Asterisk - The Open Source Telephony Project GIT-master-009e3ef
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Macros Modules Pages
Data Structures | Macros | Enumerations | Functions | Variables
chan_websocket.c File Reference

Websocket Media Channel. More...

#include "asterisk.h"
#include "asterisk/app.h"
#include "asterisk/causes.h"
#include "asterisk/channel.h"
#include "asterisk/codec.h"
#include "asterisk/http_websocket.h"
#include "asterisk/format_cache.h"
#include "asterisk/frame.h"
#include "asterisk/lock.h"
#include "asterisk/mod_format.h"
#include "asterisk/module.h"
#include "asterisk/pbx.h"
#include "asterisk/uuid.h"
#include "asterisk/timing.h"
#include "asterisk/translate.h"
#include "asterisk/websocket_client.h"
Include dependency graph for chan_websocket.c:

Go to the source code of this file.

Data Structures

struct  instance_proxy
 
struct  websocket_pvt
 

Macros

#define ANSWER_CHANNEL   "ANSWER"
 
#define CONTINUE_MEDIA   "CONTINUE_MEDIA"
 
#define DRIVER_STATUS   "STATUS"
 
#define FLUSH_MEDIA   "FLUSH_MEDIA"
 
#define GET_DRIVER_STATUS   "GET_STATUS"
 
#define HANGUP_CHANNEL   "HANGUP"
 
#define INCOMING_CONNECTION_ID   "INCOMING"
 
#define MAX_TEXT_MESSAGE_LEN   MIN(128, (AST_WEBSOCKET_MAX_RX_PAYLOAD_SIZE - 1))
 
#define MEDIA_BUFFERING_COMPLETED   "MEDIA_BUFFERING_COMPLETED"
 
#define MEDIA_START   "MEDIA_START"
 
#define MEDIA_WEBSOCKET_CONNECTION_ID   "MEDIA_WEBSOCKET_CONNECTION_ID"
 
#define MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE   "MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE"
 
#define MEDIA_XOFF   "MEDIA_XOFF"
 
#define MEDIA_XON   "MEDIA_XON"
 
#define PAUSE_MEDIA   "PAUSE_MEDIA"
 
#define QUEUE_DRAINED   "QUEUE_DRAINED"
 
#define QUEUE_LENGTH_MAX   1000
 
#define QUEUE_LENGTH_XOFF_LEVEL   900
 
#define QUEUE_LENGTH_XON_LEVEL   800
 
#define REPORT_QUEUE_DRAINED   "REPORT_QUEUE_DRAINED"
 
#define START_MEDIA_BUFFERING   "START_MEDIA_BUFFERING"
 
#define STOP_MEDIA_BUFFERING   "STOP_MEDIA_BUFFERING"
 

Enumerations

enum  { OPT_WS_CODEC = (1 << 0) , OPT_WS_NO_AUTO_ANSWER = (1 << 1) }
 
enum  { OPT_ARG_WS_CODEC , OPT_ARG_WS_NO_AUTO_ANSWER , OPT_ARG_ARRAY_SIZE }
 

Functions

static void __reg_module (void)
 
static void __unreg_module (void)
 
struct ast_module * AST_MODULE_SELF_SYM (void)
 
static struct ast_frame * dequeue_frame (struct websocket_pvt *instance)
 
static void incoming_ws_established_cb (struct ast_websocket *ast_ws_session, struct ast_variable *get_params, struct ast_variable *upgrade_headers)
 
static int incoming_ws_http_callback (struct ast_tcptls_session_instance *ser, const struct ast_http_uri *urih, const char *uri, enum ast_http_method method, struct ast_variable *get_params, struct ast_variable *headers)
 
static void instance_proxy_cb (void *weakproxy, void *data)
 
static int load_module (void)
 Function called when our module is loaded. More...
 
static int process_binary_message (struct websocket_pvt *instance, char *payload, uint64_t payload_len)
 
static int process_text_message (struct websocket_pvt *instance, char *payload, uint64_t payload_len)
 
static int queue_frame_from_buffer (struct websocket_pvt *instance, char *buffer, size_t len)
 
static int queue_option_frame (struct websocket_pvt *instance, char *buffer)
 
static int read_from_ws_and_queue (struct websocket_pvt *instance)
 
static void * read_thread_handler (void *obj)
 
static void set_channel_format (struct websocket_pvt *instance, struct ast_format *fmt)
 
static int set_channel_timer (struct websocket_pvt *instance)
 
static int set_channel_variables (struct websocket_pvt *instance)
 
static int set_instance_silence_frame (struct websocket_pvt *instance)
 
static int set_instance_translator (struct websocket_pvt *instance)
 
static int unload_module (void)
 Function called when our module is unloaded. More...
 
static int webchan_call (struct ast_channel *ast, const char *dest, int timeout)
 
static int webchan_hangup (struct ast_channel *ast)
 
static struct ast_frame * webchan_read (struct ast_channel *ast)
 
static struct ast_channel * webchan_request (const char *type, struct ast_format_cap *cap, const struct ast_assigned_ids *assignedids, const struct ast_channel *requestor, const char *data, int *cause)
 
static int webchan_write (struct ast_channel *ast, struct ast_frame *f)
 Function called when we should write a frame to the channel. More...
 
static void websocket_destructor (void *data)
 
static struct websocket_pvt * websocket_new (const char *chan_name, const char *connection_id, struct ast_format *fmt)
 

Variables

static struct ast_module_info __mod_info = { .name = AST_MODULE, .flags = AST_MODFLAG_LOAD_ORDER , .description = "Websocket Media Channel" , .key = "This paragraph is copyright (c) 2006 by Digium, Inc. \In order for your module to load, it must return this \key via a function called \"key\". Any code which \includes this paragraph must be licensed under the GNU \General Public License version 2 or later (at your \option). In addition to Digium's general reservations \of rights, Digium expressly reserves the right to \allow other parties to license this paragraph under \different terms. Any use of Digium, Inc. trademarks or \logos (including \"Asterisk\" or \"Digium\") without \express written permission of Digium, Inc. is prohibited.\n" , .buildopt_sum = AST_BUILDOPT_SUM, .support_level = AST_MODULE_SUPPORT_CORE, .load = load_module, .unload = unload_module, .load_pri = AST_MODPRI_CHANNEL_DRIVER, .requires = "res_http_websocket,res_websocket_client", }
 
static const struct ast_module_info * ast_module_info = &__mod_info
 
static struct ast_websocket_server * ast_ws_server
 
static struct ast_http_uri http_uri
 
static struct ao2_container * instances = NULL
 
static const struct ast_app_option websocket_options [128] = { [ 'c' ] = { .flag = OPT_WS_CODEC , .arg_index = OPT_ARG_WS_CODEC + 1 }, [ 'n' ] = { .flag = OPT_WS_NO_AUTO_ANSWER }, }
 
static struct ast_channel_tech websocket_tech
 

Detailed Description

Websocket Media Channel.

Author
George Joseph gjose.nosp@m.ph@s.nosp@m.angom.nosp@m.a.co.nosp@m.m

Definition in file chan_websocket.c.

Macro Definition Documentation

◆ ANSWER_CHANNEL

#define ANSWER_CHANNEL   "ANSWER"

Definition at line 87 of file chan_websocket.c.

◆ CONTINUE_MEDIA

#define CONTINUE_MEDIA   "CONTINUE_MEDIA"

Definition at line 95 of file chan_websocket.c.

◆ DRIVER_STATUS

#define DRIVER_STATUS   "STATUS"

Definition at line 101 of file chan_websocket.c.

◆ FLUSH_MEDIA

#define FLUSH_MEDIA   "FLUSH_MEDIA"

Definition at line 91 of file chan_websocket.c.

◆ GET_DRIVER_STATUS

#define GET_DRIVER_STATUS   "GET_STATUS"

Definition at line 92 of file chan_websocket.c.

◆ HANGUP_CHANNEL

#define HANGUP_CHANNEL   "HANGUP"

Definition at line 88 of file chan_websocket.c.

◆ INCOMING_CONNECTION_ID

#define INCOMING_CONNECTION_ID   "INCOMING"

Definition at line 85 of file chan_websocket.c.

◆ MAX_TEXT_MESSAGE_LEN

#define MAX_TEXT_MESSAGE_LEN   MIN(128, (AST_WEBSOCKET_MAX_RX_PAYLOAD_SIZE - 1))

Definition at line 107 of file chan_websocket.c.

◆ MEDIA_BUFFERING_COMPLETED

#define MEDIA_BUFFERING_COMPLETED   "MEDIA_BUFFERING_COMPLETED"

Definition at line 102 of file chan_websocket.c.

◆ MEDIA_START

#define MEDIA_START   "MEDIA_START"

Definition at line 97 of file chan_websocket.c.

◆ MEDIA_WEBSOCKET_CONNECTION_ID

#define MEDIA_WEBSOCKET_CONNECTION_ID   "MEDIA_WEBSOCKET_CONNECTION_ID"

Definition at line 84 of file chan_websocket.c.

◆ MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE

#define MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE   "MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE"

Definition at line 83 of file chan_websocket.c.

◆ MEDIA_XOFF

#define MEDIA_XOFF   "MEDIA_XOFF"

Definition at line 99 of file chan_websocket.c.

◆ MEDIA_XON

#define MEDIA_XON   "MEDIA_XON"

Definition at line 98 of file chan_websocket.c.

◆ PAUSE_MEDIA

#define PAUSE_MEDIA   "PAUSE_MEDIA"

Definition at line 94 of file chan_websocket.c.

◆ QUEUE_DRAINED

#define QUEUE_DRAINED   "QUEUE_DRAINED"

Definition at line 100 of file chan_websocket.c.

◆ QUEUE_LENGTH_MAX

#define QUEUE_LENGTH_MAX   1000

Definition at line 104 of file chan_websocket.c.

◆ QUEUE_LENGTH_XOFF_LEVEL

#define QUEUE_LENGTH_XOFF_LEVEL   900

Definition at line 105 of file chan_websocket.c.

◆ QUEUE_LENGTH_XON_LEVEL

#define QUEUE_LENGTH_XON_LEVEL   800

Definition at line 106 of file chan_websocket.c.

◆ REPORT_QUEUE_DRAINED

#define REPORT_QUEUE_DRAINED   "REPORT_QUEUE_DRAINED"

Definition at line 93 of file chan_websocket.c.

◆ START_MEDIA_BUFFERING

#define START_MEDIA_BUFFERING   "START_MEDIA_BUFFERING"

Definition at line 89 of file chan_websocket.c.

◆ STOP_MEDIA_BUFFERING

#define STOP_MEDIA_BUFFERING   "STOP_MEDIA_BUFFERING"

Definition at line 90 of file chan_websocket.c.

Enumeration Type Documentation

◆ anonymous enum

anonymous enum
Enumerator
OPT_WS_CODEC 
OPT_WS_NO_AUTO_ANSWER 

Definition at line 1100 of file chan_websocket.c.

1100 {
1101 OPT_WS_CODEC = (1 << 0),
1102 OPT_WS_NO_AUTO_ANSWER = (1 << 1),
1103};
@ OPT_WS_CODEC
@ OPT_WS_NO_AUTO_ANSWER

◆ anonymous enum

anonymous enum
Enumerator
OPT_ARG_WS_CODEC 
OPT_ARG_WS_NO_AUTO_ANSWER 
OPT_ARG_ARRAY_SIZE 

Definition at line 1105 of file chan_websocket.c.

1105 {
1106 OPT_ARG_WS_CODEC,
1107 OPT_ARG_WS_NO_AUTO_ANSWER,
1108 OPT_ARG_ARRAY_SIZE
1109};
@ OPT_ARG_WS_CODEC
@ OPT_ARG_WS_NO_AUTO_ANSWER
@ OPT_ARG_ARRAY_SIZE

Function Documentation

◆ __reg_module()

static void __reg_module ( void  )
static

Definition at line 1518 of file chan_websocket.c.

◆ __unreg_module()

static void __unreg_module ( void  )
static

Definition at line 1518 of file chan_websocket.c.

◆ AST_MODULE_SELF_SYM()

struct ast_module * AST_MODULE_SELF_SYM ( void  )

Definition at line 1518 of file chan_websocket.c.

◆ dequeue_frame()

static struct ast_frame * dequeue_frame ( struct websocket_pvt * instance)
static

Definition at line 142 of file chan_websocket.c.

143{
144 struct ast_frame *queued_frame = NULL;
145 SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK,
146 AST_LIST_UNLOCK);
147
148 /*
149 * If the queue is paused, don't read a frame. Processing
150 * will continue down the function and a silence frame will
151 * be sent in its place.
152 */
153 if (instance->queue_paused) {
154 return NULL;
155 }
156
157 /*
158 * We need to check if we need to send an XON before anything
159 * else because there are multiple escape paths in this function
160 * and we don't want to accidentally keep the queue in a "full"
161 * state.
162 */
163 if (instance->queue_full && instance->frame_queue_length < QUEUE_LENGTH_XON_LEVEL) {
164 instance->queue_full = 0;
165 ast_debug(4, "%s: WebSocket sending MEDIA_XON\n",
166 ast_channel_name(instance->channel));
167 ast_websocket_write_string(instance->websocket, MEDIA_XON);
168 }
169
170 queued_frame = AST_LIST_REMOVE_HEAD(&instance->frame_queue, frame_list);
171
172 /*
173 * If there are no frames in the queue, we need to
174 * return NULL so we can send a silence frame. We also need
175 * to send the QUEUE_DRAINED notification if we were requested
176 * to do so.
177 */
178 if (!queued_frame) {
179 if (instance->report_queue_drained) {
180 instance->report_queue_drained = 0;
181 ast_debug(4, "%s: WebSocket sending QUEUE_DRAINED\n",
182 ast_channel_name(instance->channel));
183 ast_websocket_write_string(instance->websocket, QUEUE_DRAINED);
184 }
185 return NULL;
186 }
187
188 /*
189 * The only way a control frame could be present here is as
190 * a result of us calling queue_option_frame() in response
191 * to an incoming TEXT command from the websocket.
192 * We'll be safe and make sure it's an AST_CONTROL_OPTION
193 * frame anyway.
194 *
195 * It's quite possible that there are multiple control frames
196 * in a row in the queue so we need to process consecutive ones
197 * immediately.
198 *
199 * In any case, processing a control frame MUST not use up
200 * a media timeslot so after all control frames have been
201 * processed, we need to read an audio frame and process it.
202 */
203 while (queued_frame && queued_frame->frametype == AST_FRAME_CONTROL) {
204 if (queued_frame->subclass.integer == AST_CONTROL_OPTION) {
205 /*
206 * We just need to send the data to the websocket.
207 * The data should already be NULL terminated.
208 */
209 ast_websocket_write_string(instance->websocket,
210 queued_frame->data.ptr);
211 ast_debug(4, "%s: WebSocket sending %s\n",
212 ast_channel_name(instance->channel), (char *)queued_frame->data.ptr);
213 }
214 /*
215 * We do NOT send these to the core so we need to free
216 * the frame and grab the next one. If it's also a
217 * control frame, we need to process it otherwise
218 * continue down in the function.
219 */
220 ast_frame_free(queued_frame, 0);
221 queued_frame = AST_LIST_REMOVE_HEAD(&instance->frame_queue, frame_list);
222 /*
223 * Just FYI... We didn't bump the queue length when we added the control
224 * frames so we don't need to decrement it here.
225 */
226 }
227
228 /*
229 * If, after reading all control frames, there are no frames
230 * left in the queue, we need to return NULL so we can send
231 * a silence frame.
232 */
233 if (!queued_frame) {
234 return NULL;
235 }
236
237 instance->frame_queue_length--;
238
239 return queued_frame;
240}
#define QUEUE_LENGTH_XON_LEVEL
#define QUEUE_DRAINED
#define MEDIA_XON
const char * ast_channel_name(const struct ast_channel *chan)
int ast_websocket_write_string(struct ast_websocket *ws, const char *buf)
Construct and transmit a WebSocket frame containing string data.
void ast_frame_free(struct ast_frame *frame, int cache)
Frees a frame or list of frames.
Definition: main/frame.c:176
@ AST_FRAME_CONTROL
@ AST_CONTROL_OPTION
#define ast_debug(level,...)
Log a DEBUG message.
#define AST_LIST_LOCK(head)
Locks a list.
Definition: linkedlists.h:40
#define AST_LIST_REMOVE_HEAD(head, field)
Removes and returns the head entry from a list.
Definition: linkedlists.h:833
#define AST_LIST_UNLOCK(head)
Attempts to unlock a list.
Definition: linkedlists.h:140
#define SCOPED_LOCK(varname, lock, lockfunc, unlockfunc)
Scoped Locks.
Definition: lock.h:590
#define NULL
Definition: resample.c:96
Data structure associated with a single frame of data.
struct ast_frame_subclass subclass
enum ast_frame_type frametype
union ast_frame::@231 data
struct ast_channel * channel
struct websocket_pvt::@139 frame_queue
int report_queue_drained
struct ast_websocket * websocket

References ast_channel_name(), AST_CONTROL_OPTION, ast_debug, AST_FRAME_CONTROL, ast_frame_free(), AST_LIST_LOCK, AST_LIST_REMOVE_HEAD, AST_LIST_UNLOCK, ast_websocket_write_string(), websocket_pvt::channel, ast_frame::data, websocket_pvt::frame_queue, websocket_pvt::frame_queue_length, ast_frame::frametype, ast_frame_subclass::integer, MEDIA_XON, NULL, ast_frame::ptr, QUEUE_DRAINED, websocket_pvt::queue_full, QUEUE_LENGTH_XON_LEVEL, websocket_pvt::queue_paused, websocket_pvt::report_queue_drained, SCOPED_LOCK, ast_frame::subclass, and websocket_pvt::websocket.

Referenced by webchan_read().

◆ incoming_ws_established_cb()

static void incoming_ws_established_cb ( struct ast_websocket * ast_ws_session,
struct ast_variable * get_params,
struct ast_variable * upgrade_headers 
)
static

Definition at line 1293 of file chan_websocket.c.

1295{
1296 RAII_VAR(struct ast_websocket *, s, ast_ws_session, ast_websocket_unref);
1297 struct ast_variable *v;
1298 const char *connection_id = NULL;
1299 struct websocket_pvt *instance = NULL;
1300 int nodelay = 1;
1301
1302 ast_debug(3, "WebSocket established\n");
1303
1304 for (v = upgrade_headers; v; v = v->next) {
1305 ast_debug(4, "Header-> %s: %s\n", v->name, v->value);
1306 }
1307 for (v = get_params; v; v = v->next) {
1308 ast_debug(4, " Param-> %s: %s\n", v->name, v->value);
1309 }
1310
1311 connection_id = ast_variable_find_in_list(get_params, "CONNECTION_ID");
1312 if (!connection_id) {
1313 /*
1314 * This can't really happen because incoming_ws_http_callback won't
1315 * let it get this far if it can't add the connection_id to the
1316 * get_params.
1317 * Just in case though...
1318 */
1319 ast_log(LOG_WARNING, "WebSocket connection id not found\n");
1321 ast_websocket_close(ast_ws_session, 1000);
1322 return;
1323 }
1324
1326 if (!instance) {
1327 /*
1328 * This also can't really happen because incoming_ws_http_callback won't
1329 * let it get this far if it can't find the instance.
1330 * Just in case though...
1331 */
1332 ast_log(LOG_WARNING, "%s: WebSocket instance not found\n", connection_id);
1334 ast_websocket_close(ast_ws_session, 1000);
1335 return;
1336 }
1337 instance->websocket = ao2_bump(ast_ws_session);
1338
1339 if (setsockopt(ast_websocket_fd(instance->websocket),
1340 IPPROTO_TCP, TCP_NODELAY, (char *) &nodelay, sizeof(nodelay)) < 0) {
1341 ast_log(LOG_WARNING, "Failed to set TCP_NODELAY on manager connection: %s\n", strerror(errno));
1342 }
1343
1344 /* read_thread_handler cleans up the bump */
1345 read_thread_handler(ao2_bump(instance));
1346
1347 ao2_cleanup(instance);
1348 ast_debug(3, "WebSocket closed\n");
1349}
#define ast_log
Definition: astobj2.c:42
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
#define ao2_weakproxy_find(c, arg, flags, tag)
Perform an ao2_find on a container with ao2_weakproxy objects, returning the real object.
Definition: astobj2.h:1748
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition: astobj2.h:480
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
Definition: astobj2.h:1063
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1101
static void * read_thread_handler(void *obj)
static struct ao2_container * instances
int ast_queue_control(struct ast_channel *chan, enum ast_control_frame_type control)
Queue a control frame without payload.
Definition: channel.c:1269
int ast_websocket_close(struct ast_websocket *session, uint16_t reason)
Close a WebSocket session by sending a message with the CLOSE opcode and an optional code.
void ast_websocket_unref(struct ast_websocket *session)
Decrease the reference count for a WebSocket session.
int ast_websocket_fd(struct ast_websocket *session)
Get the file descriptor for a WebSocket session.
const char * ast_variable_find_in_list(const struct ast_variable *list, const char *variable)
Gets the value of a variable from a variable list by name.
Definition: main/config.c:1013
@ AST_CONTROL_HANGUP
#define LOG_WARNING
int errno
Structure for variables, used for configurations and for channel variables.
struct ast_variable * next
Structure definition for session.
char connection_id[0]
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:947

References ao2_bump, ao2_cleanup, ao2_weakproxy_find, AST_CONTROL_HANGUP, ast_debug, ast_log, ast_queue_control(), ast_variable_find_in_list(), ast_websocket_close(), ast_websocket_fd(), ast_websocket_unref(), websocket_pvt::channel, websocket_pvt::connection_id, errno, instances, LOG_WARNING, ast_variable::name, ast_variable::next, NULL, OBJ_NOLOCK, OBJ_SEARCH_KEY, RAII_VAR, read_thread_handler(), ast_variable::value, and websocket_pvt::websocket.

Referenced by load_module().

◆ incoming_ws_http_callback()

static int incoming_ws_http_callback ( struct ast_tcptls_session_instance * ser,
const struct ast_http_uri * urih,
const char *  uri,
enum ast_http_method  method,
struct ast_variable * get_params,
struct ast_variable * headers 
)
static

Definition at line 1361 of file chan_websocket.c.

1365{
1366 struct ast_http_uri fake_urih = {
1368 };
1369 int res = 0;
1370 /*
1371 * Normally the http server will destroy the get_params
1372 * when the session ends but if there weren't any initially
1373 * and we create some and add them to the list, the http server
1374 * won't know about it so we have to destroy it ourselves.
1375 */
1376 int destroy_get_params = (get_params == NULL);
1377 struct ast_variable *v = NULL;
1378 RAII_VAR(struct websocket_pvt *, instance, NULL, ao2_cleanup);
1379
1380 ast_debug(2, "URI: %s Starting\n", uri);
1381
1382 /*
1383 * The client will have issued the GET request with a URI of
1384 * /media/<connection_id>
1385 *
1386 * Since this callback is registered for the /media URI prefix the
1387 * http server will strip that off the front of the URI passing in
1388 * only the path components after that in the 'uri' parameter.
1389 * This should leave only the connection id without a leading '/'.
1390 */
1391 instance = ao2_weakproxy_find(instances, uri, OBJ_SEARCH_KEY | OBJ_NOLOCK, "");
1392 if (!instance) {
1393 ast_log(LOG_WARNING, "%s: WebSocket instance not found\n", uri);
1394 ast_http_error(ser, 404, "Not found", "WebSocket instance not found");
1395 return -1;
1396 }
1397
1398 /*
1399 * We don't allow additional connections using the same connection id.
1400 */
1401 if (instance->websocket) {
1402 ast_log(LOG_WARNING, "%s: Websocket already connected for channel '%s'\n",
1403 uri, instance->channel ? ast_channel_name(instance->channel) : "unknown");
1404 ast_http_error(ser, 409, "Conflict", "Another websocket connection exists for this connection id");
1405 return -1;
1406 }
1407
1408 v = ast_variable_new("CONNECTION_ID", uri, "");
1409 if (!v) {
1410 ast_http_error(ser, 500, "Server error", "");
1411 return -1;
1412 }
1413 ast_variable_list_append(&get_params, v);
1414
1415 for (v = get_params; v; v = v->next) {
1416 ast_debug(4, " Param-> %s: %s\n", v->name, v->value);
1417 }
1418
1419 /*
1420 * This will ultimately call incoming_ws_established_cb() so
1421 * this function will block until the websocket is closed and
1422 * incoming_ws_established_cb() returns.
1423 */
1424 res = ast_websocket_uri_cb(ser, &fake_urih, uri, method,
1425 get_params, headers);
1426 if (destroy_get_params) {
1427 ast_variables_destroy(get_params);
1428 }
1429
1430 ast_debug(2, "URI: %s DONE\n", uri);
1431
1432 return res;
1433}
static struct ast_websocket_server * ast_ws_server
void ast_http_error(struct ast_tcptls_session_instance *ser, int status, const char *title, const char *text)
Send HTTP error message and close socket.
Definition: http.c:664
int ast_websocket_uri_cb(struct ast_tcptls_session_instance *ser, const struct ast_http_uri *urih, const char *uri, enum ast_http_method method, struct ast_variable *get_vars, struct ast_variable *headers)
Callback suitable for use with a ast_http_uri.
#define ast_variable_new(name, value, filename)
#define ast_variable_list_append(head, new_var)
void ast_variables_destroy(struct ast_variable *var)
Free variable list.
Definition: extconf.c:1262
const char * method
Definition: res_pjsip.c:1279
Definition of a URI handler.
Definition: http.h:102
void * data
Definition: http.h:116

References ao2_cleanup, ao2_weakproxy_find, ast_channel_name(), ast_debug, ast_http_error(), ast_log, ast_variable_list_append, ast_variable_new, ast_variables_destroy(), ast_websocket_uri_cb(), ast_ws_server, ast_http_uri::data, instances, LOG_WARNING, method, ast_variable::name, ast_variable::next, NULL, OBJ_NOLOCK, OBJ_SEARCH_KEY, RAII_VAR, and ast_variable::value.

◆ instance_proxy_cb()

static void instance_proxy_cb ( void *  weakproxy,
void *  data 
)
static

Definition at line 918 of file chan_websocket.c.

919{
920 struct instance_proxy *proxy = weakproxy;
921 ast_debug(3, "%s: WebSocket instance removed from instances\n", proxy->connection_id);
922 ao2_unlink(instances, weakproxy);
923}
#define ao2_unlink(container, obj)
Remove an object from a container.
Definition: astobj2.h:1578
char connection_id[0]
The name of the module owning this sorcery instance.

References ao2_unlink, ast_debug, instance_proxy::connection_id, and instances.

Referenced by websocket_new().

◆ load_module()

static int load_module ( void  )
static

Function called when our module is loaded.

Definition at line 1467 of file chan_websocket.c.

1468{
1469 int res = 0;
1470 struct ast_websocket_protocol *protocol;
1471
1474 }
1475
1478 ast_log(LOG_ERROR, "Unable to register channel class 'WebSocket'\n");
1479 unload_module();
1481 }
1482
1484 AO2_CONTAINER_ALLOC_OPT_DUPS_REPLACE, 17, instance_proxy_hash_fn,
1485 instance_proxy_sort_fn, instance_proxy_cmp_fn);
1486 if (!instances) {
1488 "Failed to allocate the chan_websocket instance registry\n");
1489 unload_module();
1491 }
1492
1494 if (!ast_ws_server) {
1495 unload_module();
1497 }
1498
1499 protocol = ast_websocket_sub_protocol_alloc("media");
1500 if (!protocol) {
1501 unload_module();
1503 }
1506
1508
1510}
@ AO2_ALLOC_OPT_LOCK_RWLOCK
Definition: astobj2.h:365
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition: astobj2.h:1303
@ AO2_CONTAINER_ALLOC_OPT_DUPS_REPLACE
Replace objects with duplicate keys in container.
Definition: astobj2.h:1211
static struct ast_channel_tech websocket_tech
static struct ast_http_uri http_uri
static int unload_module(void)
Function called when our module is unloaded.
static void incoming_ws_established_cb(struct ast_websocket *ast_ws_session, struct ast_variable *get_params, struct ast_variable *upgrade_headers)
int ast_channel_register(const struct ast_channel_tech *tech)
Register a channel technology (a new channel driver) Called by a channel module to register the kind ...
Definition: channel.c:538
@ AST_MEDIA_TYPE_UNKNOWN
Definition: codec.h:31
int ast_format_cap_append_by_type(struct ast_format_cap *cap, enum ast_media_type type)
Add all codecs Asterisk knows about for a specific type to the capabilities structure.
Definition: format_cap.c:216
@ AST_FORMAT_CAP_FLAG_DEFAULT
Definition: format_cap.h:38
#define ast_format_cap_alloc(flags)
Allocate a new ast_format_cap structure.
Definition: format_cap.h:49
int ast_http_uri_link(struct ast_http_uri *urihandler)
Register a URI handler.
Definition: http.c:689
struct ast_websocket_protocol * ast_websocket_sub_protocol_alloc(const char *name)
Allocate a websocket sub-protocol instance.
int ast_websocket_server_add_protocol2(struct ast_websocket_server *server, struct ast_websocket_protocol *protocol)
Add a sub-protocol handler to the given server.
struct ast_websocket_server * ast_websocket_server_create(void)
Creates a ast_websocket_server.
#define LOG_ERROR
@ AST_MODULE_LOAD_SUCCESS
Definition: module.h:70
@ AST_MODULE_LOAD_DECLINE
Module has failed to load, may be in an inconsistent state.
Definition: module.h:78
struct ast_format_cap * capabilities
Definition: channel.h:652
A websocket protocol implementation.
ast_websocket_callback session_established
Callback called when a new session is established. Mandatory.

References AO2_ALLOC_OPT_LOCK_RWLOCK, ao2_container_alloc_hash, AO2_CONTAINER_ALLOC_OPT_DUPS_REPLACE, ast_channel_register(), ast_format_cap_alloc, ast_format_cap_append_by_type(), AST_FORMAT_CAP_FLAG_DEFAULT, ast_http_uri_link(), ast_log, AST_MEDIA_TYPE_UNKNOWN, AST_MODULE_LOAD_DECLINE, AST_MODULE_LOAD_SUCCESS, ast_websocket_server_add_protocol2(), ast_websocket_server_create(), ast_websocket_sub_protocol_alloc(), ast_ws_server, ast_channel_tech::capabilities, http_uri, incoming_ws_established_cb(), instances, LOG_ERROR, LOG_WARNING, ast_websocket_protocol::session_established, unload_module(), and websocket_tech.

◆ process_binary_message()

static int process_binary_message ( struct websocket_pvt * instance,
char *  payload,
uint64_t  payload_len 
)
static

Definition at line 528 of file chan_websocket.c.

530{
531 char *next_frame_ptr = NULL;
532 size_t bytes_read = 0;
533 int res = 0;
534 size_t bytes_left = 0;
535
536 {
537 SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK,
538 AST_LIST_UNLOCK);
539 if (instance->frame_queue_length >= QUEUE_LENGTH_MAX) {
540 ast_debug(4, "%s: WebSocket queue is full. Ignoring incoming binary message.\n",
541 ast_channel_name(instance->channel));
542 return 0;
543 }
544 }
545
546 next_frame_ptr = payload;
547 instance->bytes_read += payload_len;
548
549 if (instance->bulk_media_in_progress && instance->leftover_len > 0) {
550 /*
551 * We have leftover data from a previous websocket message.
552 * Try to make a complete frame by appending data from
553 * the current message to the leftover data.
554 */
555 char *append_ptr = instance->leftover_data + instance->leftover_len;
556 size_t bytes_needed_for_frame = instance->optimal_frame_size - instance->leftover_len;
557 /*
558 * It's possible that even the current message doesn't have enough
559 * data to make a complete frame.
560 */
561 size_t bytes_avail_to_copy = MIN(bytes_needed_for_frame, payload_len);
562
563 /*
564 * Append whatever we can to the end of the leftover data
565 * even if it's not enough to make a complete frame.
566 */
567 memcpy(append_ptr, payload, bytes_avail_to_copy);
568
569 /*
570 * If leftover data is still short, just return and wait for the
571 * next websocket message.
572 */
573 if (bytes_avail_to_copy < bytes_needed_for_frame) {
574 ast_debug(4, "%s: Leftover data %d bytes but only %d new bytes available of %d needed. Appending and waiting for next message.\n",
575 ast_channel_name(instance->channel), (int)instance->leftover_len, (int)bytes_avail_to_copy, (int)bytes_needed_for_frame);
576 instance->leftover_len += bytes_avail_to_copy;
577 return 0;
578 }
579
580 res = queue_frame_from_buffer(instance, instance->leftover_data, instance->optimal_frame_size);
581 if (res < 0) {
582 return -1;
583 }
584
585 /*
586 * We stole data from the current payload so decrement payload_len
587 * and set the next frame pointer after the data in payload
588 * we just copied.
589 */
590 payload_len -= bytes_avail_to_copy;
591 next_frame_ptr = payload + bytes_avail_to_copy;
592
593 ast_debug(5, "%s: --- BR: %4d FQ: %4d PL: %4d LOL: %3d P: %p NFP: %p OFF: %4d NPL: %4d BAC: %3d\n",
594 ast_channel_name(instance->channel),
595 instance->frame_queue_length,
596 (int)instance->bytes_read,
597 (int)(payload_len + bytes_avail_to_copy),
598 (int)instance->leftover_len,
599 payload,
600 next_frame_ptr,
601 (int)(next_frame_ptr - payload),
602 (int)payload_len,
603 (int)bytes_avail_to_copy
604 );
605
606
607 instance->leftover_len = 0;
608 }
609
610 if (!instance->bulk_media_in_progress && instance->leftover_len > 0) {
611 instance->leftover_len = 0;
612 }
613
614 bytes_left = payload_len;
615 while (bytes_read < payload_len && bytes_left >= instance->optimal_frame_size) {
616 res = queue_frame_from_buffer(instance, next_frame_ptr,
617 instance->optimal_frame_size);
618 if (res < 0) {
619 break;
620 }
621 bytes_read += instance->optimal_frame_size;
622 next_frame_ptr += instance->optimal_frame_size;
623 bytes_left -= instance->optimal_frame_size;
624 }
625
626 if (instance->bulk_media_in_progress && bytes_left > 0) {
627 /*
628 * We have a partial frame. Save the leftover data.
629 */
630 ast_debug(5, "%s: +++ BR: %4d FQ: %4d PL: %4d LOL: %3d P: %p NFP: %p OFF: %4d BL: %4d\n",
631 ast_channel_name(instance->channel),
632 (int)instance->bytes_read,
633 instance->frame_queue_length,
634 (int)payload_len,
635 (int)instance->leftover_len,
636 payload,
637 next_frame_ptr,
638 (int)(next_frame_ptr - payload),
639 (int)bytes_left
640 );
641 memcpy(instance->leftover_data, next_frame_ptr, bytes_left);
642 instance->leftover_len = bytes_left;
643 }
644
645 return 0;
646}
static int queue_frame_from_buffer(struct websocket_pvt *instance, char *buffer, size_t len)
#define QUEUE_LENGTH_MAX
char * leftover_data
int bulk_media_in_progress
size_t leftover_len
#define MIN(a, b)
Definition: utils.h:231

References ast_channel_name(), ast_debug, AST_LIST_LOCK, AST_LIST_UNLOCK, websocket_pvt::bulk_media_in_progress, websocket_pvt::bytes_read, websocket_pvt::channel, websocket_pvt::frame_queue, websocket_pvt::frame_queue_length, websocket_pvt::leftover_data, websocket_pvt::leftover_len, MIN, NULL, websocket_pvt::optimal_frame_size, queue_frame_from_buffer(), QUEUE_LENGTH_MAX, and SCOPED_LOCK.

Referenced by read_from_ws_and_queue().

◆ process_text_message()

static int process_text_message ( struct websocket_pvt * instance,
char *  payload,
uint64_t  payload_len 
)
static

Definition at line 411 of file chan_websocket.c.

413{
414 int res = 0;
415 char *command;
416
417 if (payload_len > MAX_TEXT_MESSAGE_LEN) {
418 ast_log(LOG_WARNING, "%s: WebSocket TEXT message of length %d exceeds maximum length of %d\n",
419 ast_channel_name(instance->channel), (int)payload_len, MAX_TEXT_MESSAGE_LEN);
420 return 0;
421 }
422
423 /*
424 * This is safe because the payload buffer is always >= 8K
425 * even with LOW_MEMORY defined and we've already made sure the
426 * command is no longer than 128 bytes.
427 */
428 payload[payload_len] = '\0';
429 command = ast_strip(ast_strdupa(payload));
430
431 ast_debug(4, "%s: WebSocket %s command received\n",
432 ast_channel_name(instance->channel), command);
433
434 if (ast_strings_equal(command, ANSWER_CHANNEL)) {
436
437 } else if (ast_strings_equal(command, HANGUP_CHANNEL)) {
439
440 } else if (ast_strings_equal(command, START_MEDIA_BUFFERING)) {
441 AST_LIST_LOCK(&instance->frame_queue);
442 instance->bulk_media_in_progress = 1;
443 AST_LIST_UNLOCK(&instance->frame_queue);
444
445 } else if (ast_begins_with(command, STOP_MEDIA_BUFFERING)) {
446 char *id;
447 char *option;
448 SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK,
450
451 id = ast_strip(command + strlen(STOP_MEDIA_BUFFERING));
452
453 ast_debug(4, "%s: WebSocket %s '%s' with %d bytes in leftover_data.\n",
455 (int)instance->leftover_len);
456
457 instance->bulk_media_in_progress = 0;
458 if (instance->leftover_len > 0) {
459 res = queue_frame_from_buffer(instance, instance->leftover_data, instance->leftover_len);
460 if (res != 0) {
461 return res;
462 }
463 }
464 instance->leftover_len = 0;
465 res = ast_asprintf(&option, "%s%s%s", MEDIA_BUFFERING_COMPLETED,
466 S_COR(!ast_strlen_zero(id), " ", ""), S_OR(id, ""));
467 if (res <= 0 || !option) {
468 return res;
469 }
470 res = queue_option_frame(instance, option);
471 ast_free(option);
472
473 } else if (ast_strings_equal(command, FLUSH_MEDIA)) {
474 struct ast_frame *frame = NULL;
475 AST_LIST_LOCK(&instance->frame_queue);
476 while ((frame = AST_LIST_REMOVE_HEAD(&instance->frame_queue, frame_list))) {
477 ast_frfree(frame);
478 }
479 instance->frame_queue_length = 0;
480 instance->bulk_media_in_progress = 0;
481 instance->leftover_len = 0;
482 AST_LIST_UNLOCK(&instance->frame_queue);
483
484 } else if (ast_strings_equal(payload, REPORT_QUEUE_DRAINED)) {
485 AST_LIST_LOCK(&instance->frame_queue);
486 instance->report_queue_drained = 1;
487 AST_LIST_UNLOCK(&instance->frame_queue);
488
489 } else if (ast_strings_equal(command, GET_DRIVER_STATUS)) {
490 char *status = NULL;
491
492 res = ast_asprintf(&status, "%s queue_length:%d xon_level:%d xoff_level:%d queue_full:%s bulk_media:%s media_paused:%s",
496 S_COR(instance->queue_full, "true", "false"),
497 S_COR(instance->bulk_media_in_progress, "true", "false"),
498 S_COR(instance->queue_paused, "true", "false")
499 );
500 if (res <= 0 || !status) {
502 res = -1;
503 } else {
504 ast_debug(4, "%s: WebSocket status: %s\n",
505 ast_channel_name(instance->channel), status);
508 }
509
510 } else if (ast_strings_equal(payload, PAUSE_MEDIA)) {
511 AST_LIST_LOCK(&instance->frame_queue);
512 instance->queue_paused = 1;
513 AST_LIST_UNLOCK(&instance->frame_queue);
514
515 } else if (ast_strings_equal(payload, CONTINUE_MEDIA)) {
516 AST_LIST_LOCK(&instance->frame_queue);
517 instance->queue_paused = 0;
518 AST_LIST_UNLOCK(&instance->frame_queue);
519
520 } else {
521 ast_log(LOG_WARNING, "%s: WebSocket %s command unknown\n",
522 ast_channel_name(instance->channel), command);
523 }
524
525 return res;
526}
jack_status_t status
Definition: app_jack.c:149
enum queue_result id
Definition: app_queue.c:1808
#define ast_free(a)
Definition: astmm.h:180
#define ast_strdupa(s)
duplicate a string in memory from the stack
Definition: astmm.h:298
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:267
#define FLUSH_MEDIA
#define MEDIA_BUFFERING_COMPLETED
#define ANSWER_CHANNEL
#define HANGUP_CHANNEL
#define DRIVER_STATUS
#define GET_DRIVER_STATUS
#define START_MEDIA_BUFFERING
#define QUEUE_LENGTH_XOFF_LEVEL
#define PAUSE_MEDIA
#define REPORT_QUEUE_DRAINED
#define CONTINUE_MEDIA
#define STOP_MEDIA_BUFFERING
#define MAX_TEXT_MESSAGE_LEN
static int queue_option_frame(struct websocket_pvt *instance, char *buffer)
#define ast_frfree(fr)
@ AST_CONTROL_ANSWER
int ast_strings_equal(const char *str1, const char *str2)
Compare strings for equality checking for NULL.
Definition: strings.c:238
#define S_OR(a, b)
returns the equivalent of logic or for strings: first one if not empty, otherwise second one.
Definition: strings.h:80
#define S_COR(a, b, c)
returns the equivalent of logic or for strings, with an additional boolean check: second one if not empty and first one is true, otherwise third one.
Definition: strings.h:87
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition: strings.h:65
static int force_inline attribute_pure ast_begins_with(const char *str, const char *prefix)
Checks whether a string begins with another.
Definition: strings.h:97
char * ast_strip(char *s)
Strip leading/trailing whitespace from a string.
Definition: strings.h:223

References ANSWER_CHANNEL, ast_asprintf, ast_begins_with(), ast_channel_name(), AST_CONTROL_ANSWER, AST_CONTROL_HANGUP, ast_debug, ast_free, ast_frfree, AST_LIST_LOCK, AST_LIST_REMOVE_HEAD, AST_LIST_UNLOCK, ast_log, ast_queue_control(), ast_strdupa, ast_strings_equal(), ast_strip(), ast_strlen_zero(), ast_websocket_write_string(), websocket_pvt::bulk_media_in_progress, websocket_pvt::channel, CONTINUE_MEDIA, DRIVER_STATUS, FLUSH_MEDIA, websocket_pvt::frame_queue, websocket_pvt::frame_queue_length, GET_DRIVER_STATUS, HANGUP_CHANNEL, id, websocket_pvt::leftover_data, websocket_pvt::leftover_len, LOG_WARNING, MAX_TEXT_MESSAGE_LEN, MEDIA_BUFFERING_COMPLETED, NULL, PAUSE_MEDIA, queue_frame_from_buffer(), websocket_pvt::queue_full, QUEUE_LENGTH_XOFF_LEVEL, QUEUE_LENGTH_XON_LEVEL, queue_option_frame(), websocket_pvt::queue_paused, websocket_pvt::report_queue_drained, REPORT_QUEUE_DRAINED, S_COR, S_OR, SCOPED_LOCK, START_MEDIA_BUFFERING, status, STOP_MEDIA_BUFFERING, and websocket_pvt::websocket.

Referenced by read_from_ws_and_queue().

◆ queue_frame_from_buffer()

static int queue_frame_from_buffer ( struct websocket_pvt *  instance,
char *  buffer,
size_t  len 
)
static

Definition at line 347 of file chan_websocket.c.

349{
350 struct ast_frame fr = { 0, };
351 struct ast_frame *duped_frame = NULL;
352
353 AST_FRAME_SET_BUFFER(&fr, buffer, 0, len);
355 fr.subclass.format = instance->native_format;
356 fr.samples = instance->native_codec->samples_count(&fr);
357
358 duped_frame = ast_frisolate(&fr);
359 if (!duped_frame) {
360 ast_log(LOG_WARNING, "%s: Failed to isolate frame\n",
361 ast_channel_name(instance->channel));
362 return -1;
363 }
364
365 {
366 SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK,
368 AST_LIST_INSERT_TAIL(&instance->frame_queue, duped_frame, frame_list);
369 instance->frame_queue_length++;
370 if (!instance->queue_full && instance->frame_queue_length >= QUEUE_LENGTH_XOFF_LEVEL) {
371 instance->queue_full = 1;
372 ast_debug(4, "%s: WebSocket sending %s\n",
375 }
376 }
377
378 ast_debug(5, "%s: Queued %d byte frame\n", ast_channel_name(instance->channel),
379 duped_frame->datalen);
380
381 return 0;
382}
#define MEDIA_XOFF
static int len(struct ast_channel *chan, const char *cmd, char *data, char *buf, size_t buflen)
#define ast_frisolate(fr)
Makes a frame independent of any static storage.
#define AST_FRAME_SET_BUFFER(fr, _base, _ofs, _datalen)
@ AST_FRAME_VOICE
#define AST_LIST_INSERT_TAIL(head, elm, field)
Appends a list entry to the tail of a list.
Definition: linkedlists.h:731
int(* samples_count)(struct ast_frame *frame)
Retrieve the number of samples in a frame.
Definition: codec.h:68
struct ast_format * format
struct ast_format * native_format
struct ast_codec * native_codec

References ast_channel_name(), ast_debug, AST_FRAME_SET_BUFFER, AST_FRAME_VOICE, ast_frisolate, AST_LIST_INSERT_TAIL, AST_LIST_LOCK, AST_LIST_UNLOCK, ast_log, ast_websocket_write_string(), websocket_pvt::channel, ast_frame::datalen, ast_frame_subclass::format, websocket_pvt::frame_queue, websocket_pvt::frame_queue_length, ast_frame::frametype, len(), LOG_WARNING, MEDIA_XOFF, websocket_pvt::native_codec, websocket_pvt::native_format, NULL, websocket_pvt::queue_full, QUEUE_LENGTH_XOFF_LEVEL, ast_frame::samples, ast_codec::samples_count, SCOPED_LOCK, ast_frame::subclass, and websocket_pvt::websocket.

Referenced by process_binary_message(), and process_text_message().

◆ queue_option_frame()

static int queue_option_frame ( struct websocket_pvt *  instance,
char *  buffer 
)
static

Definition at line 384 of file chan_websocket.c.

386{
387 struct ast_frame fr = { 0, };
388 struct ast_frame *duped_frame = NULL;
389
390 AST_FRAME_SET_BUFFER(&fr, buffer, 0, strlen(buffer) + 1);
393
394 duped_frame = ast_frisolate(&fr);
395 if (!duped_frame) {
396 ast_log(LOG_WARNING, "%s: Failed to isolate frame\n",
397 ast_channel_name(instance->channel));
398 return -1;
399 }
400
401 AST_LIST_LOCK(&instance->frame_queue);
402 AST_LIST_INSERT_TAIL(&instance->frame_queue, duped_frame, frame_list);
403 AST_LIST_UNLOCK(&instance->frame_queue);
404
405 ast_debug(4, "%s: Queued '%s' option frame\n",
406 ast_channel_name(instance->channel), buffer);
407
408 return 0;
409}

References ast_channel_name(), AST_CONTROL_OPTION, ast_debug, AST_FRAME_CONTROL, AST_FRAME_SET_BUFFER, ast_frisolate, AST_LIST_INSERT_TAIL, AST_LIST_LOCK, AST_LIST_UNLOCK, ast_log, websocket_pvt::channel, websocket_pvt::frame_queue, ast_frame::frametype, ast_frame_subclass::integer, LOG_WARNING, NULL, and ast_frame::subclass.

Referenced by process_text_message().

◆ read_from_ws_and_queue()

static int read_from_ws_and_queue ( struct websocket_pvt *  instance)
static

Definition at line 648 of file chan_websocket.c.

649{
650 uint64_t payload_len = 0;
651 char *payload = NULL;
652 enum ast_websocket_opcode opcode;
653 int fragmented = 0;
654 int res = 0;
655
656 if (!instance || !instance->websocket) {
657 ast_log(LOG_WARNING, "%s: WebSocket instance not found\n",
658 ast_channel_name(instance->channel));
659 return -1;
660 }
661
662 ast_debug(9, "%s: Waiting for websocket to have data\n", ast_channel_name(instance->channel));
663 res = ast_wait_for_input(
664 ast_websocket_fd(instance->websocket), -1);
665 if (res <= 0) {
666 ast_log(LOG_WARNING, "%s: WebSocket read failed: %s\n",
667 ast_channel_name(instance->channel), strerror(errno));
668 return -1;
669 }
670
671 /*
672 * We need to lock here to prevent the websocket handle from
673 * being pulled out from under us if the core sends us a
674 * hangup request.
675 */
676 ao2_lock(instance);
677 if (!instance->websocket) {
678 ao2_unlock(instance);
679 return -1;
680 }
681
682 res = ast_websocket_read(instance->websocket, &payload, &payload_len,
683 &opcode, &fragmented);
684 ao2_unlock(instance);
685 if (res) {
686 return -1;
687 }
688 ast_debug(5, "%s: WebSocket read %d bytes\n", ast_channel_name(instance->channel),
689 (int)payload_len);
690
691 if (opcode == AST_WEBSOCKET_OPCODE_TEXT) {
692 return process_text_message(instance, payload, payload_len);
693 }
694
695 if (opcode == AST_WEBSOCKET_OPCODE_CLOSE) {
696 ast_debug(5, "%s: WebSocket closed by remote\n",
697 ast_channel_name(instance->channel));
698 return -1;
699 }
700
701 if (opcode != AST_WEBSOCKET_OPCODE_BINARY) {
702 ast_debug(5, "%s: WebSocket frame type %d not supported. Ignoring.\n",
703 ast_channel_name(instance->channel), (int)opcode);
704 return 0;
705 }
706
707 return process_binary_message(instance, payload, payload_len);
708}
#define ao2_unlock(a)
Definition: astobj2.h:729
#define ao2_lock(a)
Definition: astobj2.h:717
static int process_text_message(struct websocket_pvt *instance, char *payload, uint64_t payload_len)
static int process_binary_message(struct websocket_pvt *instance, char *payload, uint64_t payload_len)
int ast_websocket_read(struct ast_websocket *session, char **payload, uint64_t *payload_len, enum ast_websocket_opcode *opcode, int *fragmented)
Read a WebSocket frame and handle it.
ast_websocket_opcode
WebSocket operation codes.
@ AST_WEBSOCKET_OPCODE_BINARY
@ AST_WEBSOCKET_OPCODE_CLOSE
@ AST_WEBSOCKET_OPCODE_TEXT
int ast_wait_for_input(int fd, int ms)
Definition: utils.c:1698

References ao2_lock, ao2_unlock, ast_channel_name(), ast_debug, ast_log, ast_wait_for_input(), ast_websocket_fd(), AST_WEBSOCKET_OPCODE_BINARY, AST_WEBSOCKET_OPCODE_CLOSE, AST_WEBSOCKET_OPCODE_TEXT, ast_websocket_read(), websocket_pvt::channel, errno, LOG_WARNING, NULL, process_binary_message(), process_text_message(), and websocket_pvt::websocket.

Referenced by read_thread_handler().

◆ read_thread_handler()

static void * read_thread_handler ( void *  obj)
static

Definition at line 720 of file chan_websocket.c.

721{
722 RAII_VAR(struct websocket_pvt *, instance, obj, ao2_cleanup);
723 RAII_VAR(char *, command, NULL, ast_free);
724 int res = 0;
725
726 ast_debug(3, "%s: Read thread started\n", ast_channel_name(instance->channel));
727
728 /*
729 * We need to tell the remote app what channel this media is for.
730 * This is especially important for outbound connections otherwise
731 * the app won't know who the media is for.
732 */
733 res = ast_asprintf(&command, "%s connection_id:%s channel:%s format:%s optimal_frame_size:%d", MEDIA_START,
734 instance->connection_id, ast_channel_name(instance->channel),
735 ast_format_get_name(instance->native_format),
736 instance->optimal_frame_size);
737 if (res <= 0 || !command) {
738 ast_queue_control(instance->channel, AST_CONTROL_HANGUP);
739 ast_log(LOG_ERROR, "%s: Failed to create MEDIA_START\n", ast_channel_name(instance->channel));
740 return NULL;
741 }
742 res = ast_websocket_write_string(instance->websocket, command);
743 if (res != 0) {
744 ast_log(LOG_ERROR, "%s: Failed to send MEDIA_START\n", ast_channel_name(instance->channel));
745 ast_queue_control(instance->channel, AST_CONTROL_HANGUP);
746 return NULL;
747 }
748 ast_debug(3, "%s: Sent %s\n", ast_channel_name(instance->channel),
749 command);
750
751 if (!instance->no_auto_answer) {
752 ast_debug(3, "%s: ANSWER by auto_answer\n", ast_channel_name(instance->channel));
753 ast_queue_control(instance->channel, AST_CONTROL_ANSWER);
754 }
755
756 while (read_from_ws_and_queue(instance) == 0)
757 {
758 }
759
760 /*
761 * websocket_hangup will take care of closing the websocket if needed.
762 */
763 ast_debug(3, "%s: HANGUP by websocket close/error\n", ast_channel_name(instance->channel));
764 ast_queue_control(instance->channel, AST_CONTROL_HANGUP);
765
766 return NULL;
767}
#define MEDIA_START
static int read_from_ws_and_queue(struct websocket_pvt *instance)
const char * ast_format_get_name(const struct ast_format *format)
Get the name associated with a format.
Definition: format.c:334

References ao2_cleanup, ast_asprintf, ast_channel_name(), AST_CONTROL_ANSWER, AST_CONTROL_HANGUP, ast_debug, ast_format_get_name(), ast_free, ast_log, ast_queue_control(), ast_websocket_write_string(), LOG_ERROR, MEDIA_START, NULL, RAII_VAR, and read_from_ws_and_queue().

Referenced by incoming_ws_established_cb(), and webchan_call().

◆ set_channel_format()

static void set_channel_format ( struct websocket_pvt *  instance,
struct ast_format *  fmt 
)
static

Definition at line 126 of file chan_websocket.c.

128{
132 ast_debug(4, "Switching readformat to %s\n", ast_format_get_name(fmt));
133 }
134}
struct ast_format * ast_channel_rawreadformat(struct ast_channel *chan)
void ast_channel_set_rawreadformat(struct ast_channel *chan, struct ast_format *format)
enum ast_format_cmp_res ast_format_cmp(const struct ast_format *format1, const struct ast_format *format2)
Compare two formats.
Definition: format.c:201
@ AST_FORMAT_CMP_NOT_EQUAL
Definition: format.h:38

References ast_channel_rawreadformat(), ast_channel_set_rawreadformat(), ast_debug, ast_format_cmp(), AST_FORMAT_CMP_NOT_EQUAL, ast_format_get_name(), and websocket_pvt::channel.

Referenced by webchan_read().

◆ set_channel_timer()

static int set_channel_timer ( struct websocket_pvt *  instance)
static

Definition at line 1062 of file chan_websocket.c.

1063{
1064 int rate = 0;
1065 instance->timer = ast_timer_open();
1066 if (!instance->timer) {
1067 return -1;
1068 }
1069 /* Rate is the number of ticks per second, not the interval. */
1070 rate = 1000 / ast_format_get_default_ms(instance->native_format);
1071 ast_debug(3, "%s: WebSocket timer rate %d\n",
1072 ast_channel_name(instance->channel), rate);
1073 ast_timer_set_rate(instance->timer, rate);
1074 /*
1075 * Calling ast_channel_set_fd will cause the channel thread to call
1076 * webchan_read at 'rate' times per second.
1077 */
1078 ast_channel_set_fd(instance->channel, 0, ast_timer_fd(instance->timer));
1079
1080 return 0;
1081}
void ast_channel_set_fd(struct ast_channel *chan, int which, int fd)
Definition: channel.c:2395
unsigned int ast_format_get_default_ms(const struct ast_format *format)
Get the default framing size (in milliseconds) for a format.
Definition: format.c:359
struct ast_timer * timer
int ast_timer_set_rate(const struct ast_timer *handle, unsigned int rate)
Set the timing tick rate.
Definition: timing.c:166
struct ast_timer * ast_timer_open(void)
Open a timer.
Definition: timing.c:122
int ast_timer_fd(const struct ast_timer *handle)
Get a poll()-able file descriptor for a timer.
Definition: timing.c:161

References ast_channel_name(), ast_channel_set_fd(), ast_debug, ast_format_get_default_ms(), ast_timer_fd(), ast_timer_open(), ast_timer_set_rate(), websocket_pvt::channel, websocket_pvt::native_format, and websocket_pvt::timer.

Referenced by webchan_request().

◆ set_channel_variables()

static int set_channel_variables ( struct websocket_pvt *  instance)
static

Definition at line 1083 of file chan_websocket.c.

1084{
1085 char *pkt_size = NULL;
1086 int res = ast_asprintf(&pkt_size, "%d", instance->optimal_frame_size);
1087 if (res <= 0) {
1088 return -1;
1089 }
1090
1092 pkt_size);
1093 ast_free(pkt_size);
1095 instance->connection_id);
1096
1097 return 0;
1098}
#define MEDIA_WEBSOCKET_CONNECTION_ID
#define MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE
int pbx_builtin_setvar_helper(struct ast_channel *chan, const char *name, const char *value)
Add a variable to the channel variable stack, removing the most recently set value for the same name.

References ast_asprintf, ast_free, websocket_pvt::channel, websocket_pvt::connection_id, MEDIA_WEBSOCKET_CONNECTION_ID, MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE, NULL, websocket_pvt::optimal_frame_size, and pbx_builtin_setvar_helper().

Referenced by webchan_request().

◆ set_instance_silence_frame()

static int set_instance_silence_frame ( struct websocket_pvt *  instance)
static

Definition at line 1039 of file chan_websocket.c.

1040{
1041 instance->silence.frametype = AST_FRAME_VOICE;
1042 instance->silence.datalen =
1043 (instance->slin_codec->default_ms * instance->slin_codec->minimum_bytes) / instance->slin_codec->minimum_ms;
1044 instance->silence.samples = instance->silence.datalen / sizeof(uint16_t);
1045 /*
1046 * Even though we'll calloc the data pointer, we don't mark it as
1047 * mallocd because this frame will be around for a while and we don't
1048 * want it accidentally freed before we're done with it.
1049 */
1050 instance->silence.mallocd = 0;
1051 instance->silence.offset = 0;
1052 instance->silence.src = __PRETTY_FUNCTION__;
1053 instance->silence.subclass.format = instance->slin_format;
1054 instance->silence.data.ptr = ast_calloc(1, instance->silence.datalen);
1055 if (!instance->silence.data.ptr) {
1056 return -1;
1057 }
1058
1059 return 0;
1060}
#define ast_calloc(num, len)
A wrapper for calloc()
Definition: astmm.h:202
unsigned int minimum_bytes
Length in bytes of the data payload of a minimum_ms frame.
Definition: codec.h:60
unsigned int default_ms
Default length of media carried (in milliseconds) in a frame.
Definition: codec.h:58
unsigned int minimum_ms
Minimum length of media that can be carried (in milliseconds) in a frame.
Definition: codec.h:54
const char * src
struct ast_codec * slin_codec
struct ast_format * slin_format
struct ast_frame silence

References ast_calloc, AST_FRAME_VOICE, ast_frame::data, ast_frame::datalen, ast_codec::default_ms, ast_frame_subclass::format, ast_frame::frametype, ast_frame::mallocd, ast_codec::minimum_bytes, ast_codec::minimum_ms, ast_frame::offset, ast_frame::ptr, ast_frame::samples, websocket_pvt::silence, websocket_pvt::slin_codec, websocket_pvt::slin_format, ast_frame::src, and ast_frame::subclass.

Referenced by webchan_request().

◆ set_instance_translator()

static int set_instance_translator ( struct websocket_pvt *  instance)
static

Definition at line 1008 of file chan_websocket.c.

1009{
1011 instance->slin_format = ao2_bump(instance->native_format);
1012 instance->slin_codec = ast_format_get_codec(instance->slin_format);
1013 return 0;
1014 }
1015
1017 if (!instance->slin_format) {
1018 ast_log(LOG_ERROR, "%s: Unable to get slin format for rate %d\n",
1019 ast_channel_name(instance->channel), instance->native_codec->sample_rate);
1020 return -1;
1021 }
1022 ast_debug(3, "%s: WebSocket channel slin format '%s' Sample rate: %d ptime: %dms\n",
1026
1027 instance->translator = ast_translator_build_path(instance->slin_format, instance->native_format);
1028 if (!instance->translator) {
1029 ast_log(LOG_ERROR, "%s: Unable to build translator path from '%s' to '%s'\n",
1031 ast_format_get_name(instance->slin_format));
1032 return -1;
1033 }
1034
1035 instance->slin_codec = ast_format_get_codec(instance->slin_format);
1036 return 0;
1037}
struct ast_codec * ast_format_get_codec(const struct ast_format *format)
Get the codec associated with a format.
Definition: format.c:324
unsigned int ast_format_get_sample_rate(const struct ast_format *format)
Get the sample rate of a media format.
Definition: format.c:379
int ast_format_cache_is_slinear(struct ast_format *format)
Determines if a format is one of the cached slin formats.
Definition: format_cache.c:534
struct ast_format * ast_format_cache_get_slin_by_rate(unsigned int rate)
Retrieve the best signed linear format given a sample rate.
Definition: format_cache.c:512
unsigned int sample_rate
Sample rate (number of samples carried in a second)
Definition: codec.h:52
struct ast_trans_pvt * translator
struct ast_trans_pvt * ast_translator_build_path(struct ast_format *dest, struct ast_format *source)
Builds a translator path. Build a path (possibly NULL) from source to dest.
Definition: translate.c:486

References ao2_bump, ast_channel_name(), ast_debug, ast_format_cache_get_slin_by_rate(), ast_format_cache_is_slinear(), ast_format_get_codec(), ast_format_get_default_ms(), ast_format_get_name(), ast_format_get_sample_rate(), ast_log, ast_translator_build_path(), websocket_pvt::channel, LOG_ERROR, websocket_pvt::native_codec, websocket_pvt::native_format, ast_codec::sample_rate, websocket_pvt::slin_codec, websocket_pvt::slin_format, and websocket_pvt::translator.

Referenced by webchan_request().

◆ unload_module()

static int unload_module ( void  )
static

Function called when our module is unloaded.

Definition at line 1446 of file chan_websocket.c.

1447{
1451
1455
1457 instances = NULL;
1458
1459 return 0;
1460}
void ast_channel_unregister(const struct ast_channel_tech *tech)
Unregister a channel technology.
Definition: channel.c:569
void ast_http_uri_unlink(struct ast_http_uri *urihandler)
Unregister a URI handler.
Definition: http.c:721

References ao2_cleanup, ast_channel_unregister(), ast_http_uri_unlink(), ast_ws_server, ast_channel_tech::capabilities, http_uri, instances, NULL, and websocket_tech.

Referenced by load_module().

◆ webchan_call()

static int webchan_call ( struct ast_channel *  ast,
const char *  dest,
int  timeout 
)
static

Definition at line 800 of file chan_websocket.c.

802{
803 struct websocket_pvt *instance = ast_channel_tech_pvt(ast);
804 int nodelay = 1;
806
807 if (!instance) {
808 ast_log(LOG_WARNING, "%s: WebSocket instance not found\n",
809 ast_channel_name(ast));
810 return -1;
811 }
812
813 if (instance->type == AST_WS_TYPE_SERVER) {
814 ast_debug(3, "%s: Websocket call incoming\n", ast_channel_name(instance->channel));
815 return 0;
816 }
817 ast_debug(3, "%s: Websocket call outgoing\n", ast_channel_name(instance->channel));
818
819 if (!instance->client) {
820 ast_log(LOG_WARNING, "%s: WebSocket client not found\n",
821 ast_channel_name(ast));
822 return -1;
823 }
824
825 ast_debug(3, "%s: WebSocket call requested to %s. cid: %s\n",
826 ast_channel_name(ast), dest, instance->connection_id);
827
828 instance->websocket = ast_websocket_client_connect(instance->client,
829 instance, ast_channel_name(ast), &result);
830 if (!instance->websocket || result != WS_OK) {
831 ast_log(LOG_WARNING, "%s: WebSocket connection failed to %s: %s\n",
833 return -1;
834 }
835
836 if (setsockopt(ast_websocket_fd(instance->websocket),
837 IPPROTO_TCP, TCP_NODELAY, (char *) &nodelay, sizeof(nodelay)) < 0) {
838 ast_log(LOG_WARNING, "Failed to set TCP_NODELAY on websocket connection: %s\n", strerror(errno));
839 }
840
841 ast_debug(3, "%s: WebSocket connection to %s established\n",
842 ast_channel_name(ast), dest);
843
844 /* read_thread_handler() will clean up the bump */
846 read_thread_handler, ao2_bump(instance))) {
847 ast_log(LOG_WARNING, "%s: Failed to create thread.\n", ast_channel_name(ast));
848 ao2_cleanup(instance);
849 return -1;
850 }
851
852 return 0;
853}
static PGresult * result
Definition: cel_pgsql.c:84
void * ast_channel_tech_pvt(const struct ast_channel *chan)
const char * ast_websocket_result_to_str(enum ast_websocket_result result)
Convert a websocket result code to a string.
ast_websocket_result
Result code for a websocket client.
@ WS_OK
@ AST_WS_TYPE_SERVER
enum ast_websocket_type type
pthread_t outbound_read_thread
struct ast_websocket_client * client
#define ast_pthread_create_detached_background(a, b, c, d)
Definition: utils.h:603
struct ast_websocket * ast_websocket_client_connect(struct ast_websocket_client *wc, void *lock_obj, const char *display_name, enum ast_websocket_result *result)
Connect to a websocket server using the configured authentication, retry and TLS options.

References ao2_bump, ao2_cleanup, ast_channel_name(), ast_channel_tech_pvt(), ast_debug, ast_log, ast_pthread_create_detached_background, ast_websocket_client_connect(), ast_websocket_fd(), ast_websocket_result_to_str(), AST_WS_TYPE_SERVER, websocket_pvt::channel, websocket_pvt::client, websocket_pvt::connection_id, errno, LOG_WARNING, NULL, websocket_pvt::outbound_read_thread, read_thread_handler(), result, websocket_pvt::type, websocket_pvt::websocket, and WS_OK.

◆ webchan_hangup()

static int webchan_hangup ( struct ast_channel *  ast)
static

Definition at line 1253 of file chan_websocket.c.

1254{
1255 struct websocket_pvt *instance = ast_channel_tech_pvt(ast);
1256
1257 if (!instance) {
1258 return -1;
1259 }
1260 ast_debug(3, "%s: WebSocket call hangup. cid: %s\n",
1261 ast_channel_name(ast), instance->connection_id);
1262
1263 /*
1264 * We need to lock because read_from_ws_and_queue() is probably waiting
1265 * on the websocket file descriptor and will unblock and immediately try to
1266 * check the websocket and read from it. We don't want to pull the
1267 * websocket out from under it between the check and read.
1268 */
1269 ao2_lock(instance);
1270 if (instance->websocket) {
1271 ast_websocket_close(instance->websocket, 1000);
1272 ast_websocket_unref(instance->websocket);
1273 instance->websocket = NULL;
1274 }
1276 ao2_unlock(instance);
1277
1278 /* Clean up the reference from adding the instance to the channel */
1279 ao2_cleanup(instance);
1280
1281 return 0;
1282}
void ast_channel_tech_pvt_set(struct ast_channel *chan, void *value)

References ao2_cleanup, ao2_lock, ao2_unlock, ast_channel_name(), ast_channel_tech_pvt(), ast_channel_tech_pvt_set(), ast_debug, ast_websocket_close(), ast_websocket_unref(), websocket_pvt::connection_id, NULL, and websocket_pvt::websocket.

◆ webchan_read()

static struct ast_frame * webchan_read ( struct ast_channel *  ast)
static

Definition at line 247 of file chan_websocket.c.

248{
249 struct websocket_pvt *instance = NULL;
250 struct ast_frame *native_frame = NULL;
251 struct ast_frame *slin_frame = NULL;
252
253 instance = ast_channel_tech_pvt(ast);
254 if (!instance) {
255 return NULL;
256 }
257
259 ast_timer_ack(instance->timer, 1);
260 }
261
262 native_frame = dequeue_frame(instance);
263
264 /*
265 * No frame when the timer fires means we have to create and
266 * return a silence frame in its place.
267 */
268 if (!native_frame) {
269 ast_debug(5, "%s: WebSocket read timer fired with no frame available. Returning silence.\n", ast_channel_name(ast));
270 set_channel_format(instance, instance->slin_format);
271 slin_frame = ast_frdup(&instance->silence);
272 return slin_frame;
273 }
274
275 /*
276 * If the frame length is already optimal_frame_size, we can just
277 * return it.
278 */
279 if (native_frame->datalen == instance->optimal_frame_size) {
280 set_channel_format(instance, instance->native_format);
281 return native_frame;
282 }
283
284 /*
285 * If we're here, we have a short frame that we need to pad
286 * with silence.
287 */
288
289 if (instance->translator) {
290 slin_frame = ast_translate(instance->translator, native_frame, 0);
291 if (!slin_frame) {
292 ast_log(LOG_WARNING, "%s: Failed to translate %d byte frame\n",
293 ast_channel_name(ast), native_frame->datalen);
294 return NULL;
295 }
296 ast_frame_free(native_frame, 0);
297 } else {
298 /*
299 * If there was no translator then the native format
300 * was already slin.
301 */
302 slin_frame = native_frame;
303 }
304
305 set_channel_format(instance, instance->slin_format);
306
307 /*
308 * So now we have an slin frame but it's probably still short
309 * so we create a new data buffer with the correct length
310 * which is filled with zeros courtesy of ast_calloc.
311 * We then copy the short frame data into the new buffer
312 * and set the offset to AST_FRIENDLY_OFFSET so that
313 * the core can read the data without any issues.
314 * If the original frame data was mallocd, we need to free the old
315 * data buffer so we don't leak memory and we need to set
316 * mallocd to AST_MALLOCD_DATA so that the core knows
317 * it needs to free the new data buffer when it's done.
318 */
319
320 if (slin_frame->datalen != instance->silence.datalen) {
321 char *old_data = slin_frame->data.ptr;
322 int old_len = slin_frame->datalen;
323 int old_offset = slin_frame->offset;
324 ast_debug(4, "%s: WebSocket read short frame. Expected %d got %d. Filling with silence\n",
325 ast_channel_name(ast), instance->silence.datalen,
326 slin_frame->datalen);
327
328 slin_frame->data.ptr = ast_calloc(1, instance->silence.datalen + AST_FRIENDLY_OFFSET);
329 if (!slin_frame->data.ptr) {
330 ast_frame_free(slin_frame, 0);
331 return NULL;
332 }
333 slin_frame->data.ptr += AST_FRIENDLY_OFFSET;
334 slin_frame->offset = AST_FRIENDLY_OFFSET;
335 memcpy(slin_frame->data.ptr, old_data, old_len);
336 if (slin_frame->mallocd & AST_MALLOCD_DATA) {
337 ast_free(old_data - old_offset);
338 }
339 slin_frame->mallocd |= AST_MALLOCD_DATA;
340 slin_frame->datalen = instance->silence.datalen;
341 slin_frame->samples = instance->silence.samples;
342 }
343
344 return slin_frame;
345}
static struct ast_frame * dequeue_frame(struct websocket_pvt *instance)
static void set_channel_format(struct websocket_pvt *instance, struct ast_format *fmt)
#define ast_frdup(fr)
Copies a frame.
#define AST_MALLOCD_DATA
#define AST_FRIENDLY_OFFSET
Offset into a frame's data buffer.
int ast_timer_ack(const struct ast_timer *handle, unsigned int quantity)
Acknowledge a timer event.
Definition: timing.c:171
enum ast_timer_event ast_timer_get_event(const struct ast_timer *handle)
Retrieve timing event.
Definition: timing.c:186
@ AST_TIMING_EVENT_EXPIRED
Definition: timing.h:58
struct ast_frame * ast_translate(struct ast_trans_pvt *tr, struct ast_frame *f, int consume)
translates one or more frames Apply an input frame into the translator and receive zero or one output...
Definition: translate.c:566

References ast_calloc, ast_channel_name(), ast_channel_tech_pvt(), ast_debug, ast_frame_free(), ast_frdup, ast_free, AST_FRIENDLY_OFFSET, ast_log, AST_MALLOCD_DATA, ast_timer_ack(), ast_timer_get_event(), AST_TIMING_EVENT_EXPIRED, ast_translate(), ast_frame::data, ast_frame::datalen, dequeue_frame(), LOG_WARNING, ast_frame::mallocd, websocket_pvt::native_format, NULL, ast_frame::offset, websocket_pvt::optimal_frame_size, ast_frame::ptr, ast_frame::samples, set_channel_format(), websocket_pvt::silence, websocket_pvt::slin_format, websocket_pvt::timer, and websocket_pvt::translator.

◆ webchan_request()

static struct ast_channel * webchan_request ( const char *  type,
struct ast_format_cap *  cap,
const struct ast_assigned_ids *  assignedids,
const struct ast_channel *  requestor,
const char *  data,
int *  cause 
)
static

Definition at line 1116 of file chan_websocket.c.

1119{
1120 char *parse;
1121 RAII_VAR(struct websocket_pvt *, instance, NULL, ao2_cleanup);
1122 struct ast_channel *chan = NULL;
1123 struct ast_format *fmt = NULL;
1124 struct ast_format_cap *caps = NULL;
1126 AST_APP_ARG(connection_id);
1128 );
1129 struct ast_flags opts = { 0, };
1130 char *opt_args[OPT_ARG_ARRAY_SIZE];
1131 const char *requestor_name = requestor ? ast_channel_name(requestor) : "no channel";
1132
1133 ast_debug(3, "%s: WebSocket channel requested\n",
1134 requestor_name);
1135
1136 if (ast_strlen_zero(data)) {
1137 ast_log(LOG_ERROR, "%s: A connection id is required for the 'WebSocket' channel\n",
1138 requestor_name);
1139 goto failure;
1140 }
1141 parse = ast_strdupa(data);
1142 AST_NONSTANDARD_APP_ARGS(args, parse, '/');
1143
1144 if (ast_strlen_zero(args.connection_id)) {
1145 ast_log(LOG_ERROR, "%s: connection_id is required for the 'WebSocket' channel\n",
1146 requestor_name);
1147 goto failure;
1148 }
1149
1150 if (!ast_strlen_zero(args.options)
1151 && ast_app_parse_options(websocket_options, &opts, opt_args,
1152 ast_strdupa(args.options))) {
1153 ast_log(LOG_ERROR, "%s: 'WebSocket' channel options '%s' parse error\n",
1154 requestor_name, args.options);
1155 goto failure;
1156 }
1157
1158 if (ast_test_flag(&opts, OPT_WS_CODEC)
1159 && !ast_strlen_zero(opt_args[OPT_ARG_WS_CODEC])) {
1160 ast_debug(3, "%s: Using specified format %s\n",
1161 requestor_name, opt_args[OPT_ARG_WS_CODEC]);
1162 fmt = ast_format_cache_get(opt_args[OPT_ARG_WS_CODEC]);
1163 } else {
1164 /*
1165 * If codec wasn't specified in the dial string,
1166 * use the first format in the capabilities.
1167 */
1168 ast_debug(3, "%s: Using format %s from requesting channel\n",
1169 requestor_name, opt_args[OPT_ARG_WS_CODEC]);
1170 fmt = ast_format_cap_get_format(cap, 0);
1171 }
1172
1173 if (!fmt) {
1174 ast_log(LOG_WARNING, "%s: No codec found for sending media to connection '%s'\n",
1175 requestor_name, args.connection_id);
1176 goto failure;
1177 }
1178
1179 instance = websocket_new(requestor_name, args.connection_id, fmt);
1180 if (!instance) {
1181 ast_log(LOG_ERROR, "%s: Failed to allocate WebSocket channel pvt\n",
1182 requestor_name);
1183 goto failure;
1184 }
1185
1186 instance->no_auto_answer = ast_test_flag(&opts, OPT_WS_NO_AUTO_ANSWER);
1187
1188 chan = ast_channel_alloc(1, AST_STATE_DOWN, "", "", "", "", "", assignedids,
1189 requestor, 0, "WebSocket/%s/%p", args.connection_id, instance);
1190 if (!chan) {
1191 ast_log(LOG_ERROR, "%s: Unable to alloc channel\n", ast_channel_name(requestor));
1192 goto failure;
1193 }
1194
1195 ast_debug(3, "%s: WebSocket channel %s allocated for connection %s\n",
1196 ast_channel_name(chan), requestor_name,
1197 instance->connection_id);
1198
1199 instance->channel = ao2_bump(chan);
1200 ast_channel_tech_set(instance->channel, &websocket_tech);
1201
1202 if (set_instance_translator(instance) != 0) {
1203 goto failure;
1204 }
1205
1206 if (set_instance_silence_frame(instance) != 0) {
1207 goto failure;
1208 }
1209
1210 if (set_channel_timer(instance) != 0) {
1211 goto failure;
1212 }
1213
1214 if (set_channel_variables(instance) != 0) {
1215 goto failure;
1216 }
1217
1219 if (!caps) {
1220 ast_log(LOG_ERROR, "%s: Unable to alloc caps\n", requestor_name);
1221 goto failure;
1222 }
1223
1224 ast_format_cap_append(caps, instance->native_format, 0);
1225 ast_channel_nativeformats_set(instance->channel, caps);
1226 ast_channel_set_writeformat(instance->channel, instance->native_format);
1227 ast_channel_set_rawwriteformat(instance->channel, instance->native_format);
1228 ast_channel_set_readformat(instance->channel, instance->native_format);
1229 ast_channel_set_rawreadformat(instance->channel, instance->native_format);
1230 ast_channel_tech_pvt_set(chan, ao2_bump(instance));
1231 ast_channel_unlock(chan);
1232 ao2_cleanup(caps);
1233
1234 ast_debug(3, "%s: WebSocket channel created to %s\n",
1235 ast_channel_name(chan), args.connection_id);
1236
1237 return chan;
1238
1239failure:
1240 if (chan) {
1241 ast_channel_unlock(chan);
1242 }
1243 *cause = AST_CAUSE_FAILURE;
1244 return NULL;
1245}
#define AST_CAUSE_FAILURE
Definition: causes.h:150
static int set_instance_translator(struct websocket_pvt *instance)
static int set_channel_variables(struct websocket_pvt *instance)
static int set_channel_timer(struct websocket_pvt *instance)
static const struct ast_app_option websocket_options[128]
static struct websocket_pvt * websocket_new(const char *chan_name, const char *connection_id, struct ast_format *fmt)
static int set_instance_silence_frame(struct websocket_pvt *instance)
#define ast_channel_alloc(needqueue, state, cid_num, cid_name, acctcode, exten, context, assignedids, requestor, amaflag,...)
Create a channel structure.
Definition: channel.h:1299
void ast_channel_nativeformats_set(struct ast_channel *chan, struct ast_format_cap *value)
void ast_channel_set_rawwriteformat(struct ast_channel *chan, struct ast_format *format)
void ast_channel_set_readformat(struct ast_channel *chan, struct ast_format *format)
void ast_channel_tech_set(struct ast_channel *chan, const struct ast_channel_tech *value)
#define ast_channel_unlock(chan)
Definition: channel.h:2973
void ast_channel_set_writeformat(struct ast_channel *chan, struct ast_format *format)
@ AST_STATE_DOWN
Definition: channelstate.h:36
#define ast_format_cache_get(name)
Retrieve a named format from the cache.
Definition: format_cache.h:278
struct ast_format * ast_format_cap_get_format(const struct ast_format_cap *cap, int position)
Get the format at a specific index.
Definition: format_cap.c:400
#define ast_format_cap_append(cap, format, framing)
Add format capability to capabilities structure.
Definition: format_cap.h:99
#define AST_APP_ARG(name)
Define an application argument.
#define AST_DECLARE_APP_ARGS(name, arglist)
Declare a structure to hold an application's arguments.
#define AST_NONSTANDARD_APP_ARGS(args, parse, sep)
Performs the 'nonstandard' argument separation process for an application.
int ast_app_parse_options(const struct ast_app_option *options, struct ast_flags *flags, char **args, char *optstr)
Parses a string containing application options and sets flags/arguments.
Definition: main/app.c:3066
Main Channel structure associated with a channel.
Structure used to handle boolean flags.
Definition: utils.h:199
Format capabilities structure, holds formats + preference order + etc.
Definition: format_cap.c:54
Definition of a media format.
Definition: format.c:43
const char * args
static struct test_options options
#define ast_test_flag(p, flag)
Definition: utils.h:63

References ao2_bump, ao2_cleanup, args, AST_APP_ARG, ast_app_parse_options(), AST_CAUSE_FAILURE, ast_channel_alloc, ast_channel_name(), ast_channel_nativeformats_set(), ast_channel_set_rawreadformat(), ast_channel_set_rawwriteformat(), ast_channel_set_readformat(), ast_channel_set_writeformat(), ast_channel_tech_pvt_set(), ast_channel_tech_set(), ast_channel_unlock, ast_debug, AST_DECLARE_APP_ARGS, ast_format_cache_get, ast_format_cap_alloc, ast_format_cap_append, AST_FORMAT_CAP_FLAG_DEFAULT, ast_format_cap_get_format(), ast_log, AST_NONSTANDARD_APP_ARGS, AST_STATE_DOWN, ast_strdupa, ast_strlen_zero(), ast_test_flag, LOG_ERROR, LOG_WARNING, NULL, OPT_ARG_ARRAY_SIZE, OPT_ARG_WS_CODEC, OPT_WS_CODEC, OPT_WS_NO_AUTO_ANSWER, options, RAII_VAR, set_channel_timer(), set_channel_variables(), set_instance_silence_frame(), set_instance_translator(), websocket_new(), websocket_options, and websocket_tech.

◆ webchan_write()

static int webchan_write ( struct ast_channel *  ast,
struct ast_frame *  f
)
static

Function called when we should write a frame to the channel.

Definition at line 770 of file chan_websocket.c.

771{
772 struct websocket_pvt *instance = ast_channel_tech_pvt(ast);
773
774 if (!instance || !instance->websocket) {
775 ast_log(LOG_WARNING, "%s: WebSocket instance or client not found\n",
776 ast_channel_name(ast));
777 return -1;
778 }
779
780 if (f->frametype != AST_FRAME_VOICE) {
781 ast_log(LOG_WARNING, "%s: This WebSocket channel only supports AST_FRAME_VOICE frames\n",
782 ast_channel_name(ast));
783 return -1;
784 }
785 if (f->subclass.format != instance->native_format) {
786 ast_log(LOG_WARNING, "%s: This WebSocket channel only supports the '%s' format\n",
788 return -1;
789 }
790
792 (char *)f->data.ptr, (uint64_t)f->datalen);
793}
int ast_websocket_write(struct ast_websocket *session, enum ast_websocket_opcode opcode, char *payload, uint64_t payload_size)
Construct and transmit a WebSocket frame.

References ast_channel_name(), ast_channel_tech_pvt(), ast_format_get_name(), AST_FRAME_VOICE, ast_log, AST_WEBSOCKET_OPCODE_BINARY, ast_websocket_write(), ast_frame::data, ast_frame::datalen, ast_frame_subclass::format, ast_frame::frametype, LOG_WARNING, websocket_pvt::native_format, ast_frame::ptr, ast_frame::subclass, and websocket_pvt::websocket.

◆ websocket_destructor()

static void websocket_destructor ( void *  data)
static

Definition at line 855 of file chan_websocket.c.

856{
857 struct websocket_pvt *instance = data;
858 struct ast_frame *frame = NULL;
859 ast_debug(3, "%s: WebSocket instance freed\n", instance->connection_id);
860
861 AST_LIST_LOCK(&instance->frame_queue);
862 while ((frame = AST_LIST_REMOVE_HEAD(&instance->frame_queue, frame_list))) {
863 ast_frfree(frame);
864 }
865 AST_LIST_UNLOCK(&instance->frame_queue);
866
867 if (instance->timer) {
868 ast_timer_close(instance->timer);
869 instance->timer = NULL;
870 }
871
872 if (instance->channel) {
873 ast_channel_unref(instance->channel);
874 instance->channel = NULL;
875 }
876 if (instance->websocket) {
878 instance->websocket = NULL;
879 }
880
881 ao2_cleanup(instance->client);
882 instance->client = NULL;
883
884 ao2_cleanup(instance->native_codec);
885 instance->native_codec = NULL;
886
887 ao2_cleanup(instance->native_format);
888 instance->native_format = NULL;
889
890 ao2_cleanup(instance->slin_codec);
891 instance->slin_codec = NULL;
892
893 ao2_cleanup(instance->slin_format);
894 instance->slin_format = NULL;
895
896 if (instance->silence.data.ptr) {
897 ast_free(instance->silence.data.ptr);
898 instance->silence.data.ptr = NULL;
899 }
900
901 if (instance->translator) {
903 instance->translator = NULL;
904 }
905
906 if (instance->leftover_data) {
907 ast_free(instance->leftover_data);
908 instance->leftover_data = NULL;
909 }
910}
#define ast_channel_unref(c)
Decrease channel reference count.
Definition: channel.h:3008
void ast_timer_close(struct ast_timer *handle)
Close an opened timing handle.
Definition: timing.c:154
void ast_translator_free_path(struct ast_trans_pvt *tr)
Frees a translator path Frees the given translator path structure.
Definition: translate.c:476

References ao2_cleanup, ast_channel_unref, ast_debug, ast_free, ast_frfree, AST_LIST_LOCK, AST_LIST_REMOVE_HEAD, AST_LIST_UNLOCK, ast_timer_close(), ast_translator_free_path(), ast_websocket_unref(), websocket_pvt::channel, websocket_pvt::client, websocket_pvt::connection_id, ast_frame::data, websocket_pvt::frame_queue, websocket_pvt::leftover_data, websocket_pvt::native_codec, websocket_pvt::native_format, NULL, ast_frame::ptr, websocket_pvt::silence, websocket_pvt::slin_codec, websocket_pvt::slin_format, websocket_pvt::timer, websocket_pvt::translator, and websocket_pvt::websocket.

Referenced by websocket_new().

◆ websocket_new()

static struct websocket_pvt * websocket_new ( const char *  chan_name,
const char *  connection_id,
struct ast_format *  fmt
)
static

Definition at line 925 of file chan_websocket.c.

927{
928 RAII_VAR(struct instance_proxy *, proxy, NULL, ao2_cleanup);
929 RAII_VAR(struct websocket_pvt *, instance, NULL, ao2_cleanup);
931 enum ast_websocket_type ws_type;
932
934
935 if (ast_strings_equal(connection_id, INCOMING_CONNECTION_ID)) {
936 connection_id = ast_uuid_generate_str(uuid, sizeof(uuid));
937 ws_type = AST_WS_TYPE_SERVER;
938 } else {
939 ws_type = AST_WS_TYPE_CLIENT;
940 }
941
942 proxy = ao2_weakproxy_alloc(sizeof(*proxy) + strlen(connection_id) + 1, NULL);
943 if (!proxy) {
944 return NULL;
945 }
946 strcpy(proxy->connection_id, connection_id); /* Safe */
947
948 instance = ao2_alloc(sizeof(*instance) + strlen(connection_id) + 1,
950 if (!instance) {
951 return NULL;
952 }
953 strcpy(instance->connection_id, connection_id); /* Safe */
954
955 instance->type = ws_type;
956 if (ws_type == AST_WS_TYPE_CLIENT) {
957 instance->client = ast_websocket_client_retrieve_by_id(instance->connection_id);
958 if (!instance->client) {
959 ast_log(LOG_ERROR, "%s: WebSocket client connection '%s' not found\n",
960 chan_name, instance->connection_id);
961 return NULL;
962 }
963 }
964
965 AST_LIST_HEAD_INIT(&instance->frame_queue);
966
967 /*
968 * We need the codec to calculate the number of samples in a frame
969 * so we'll get it once and store it in the instance.
970 *
971 * References for native_format and native_codec are now held by the
972 * instance and will be released when the instance is destroyed.
973 */
974 instance->native_format = fmt;
975 instance->native_codec = ast_format_get_codec(instance->native_format);
976 /*
977 * References for native_format and native_codec are now held by the
978 * instance and will be released when the instance is destroyed.
979 */
980 instance->optimal_frame_size =
981 (instance->native_codec->default_ms * instance->native_codec->minimum_bytes)
982 / instance->native_codec->minimum_ms;
983
984 instance->leftover_data = ast_calloc(1, instance->optimal_frame_size);
985 if (!instance->leftover_data) {
986 return NULL;
987 }
988
989 /* We have exclusive access to proxy and sorcery, no need for locking here. */
990 if (ao2_weakproxy_set_object(proxy, instance, OBJ_NOLOCK)) {
991 return NULL;
992 }
993
995 return NULL;
996 }
997
998 if (!ao2_link_flags(instances, proxy, OBJ_NOLOCK)) {
999 ast_log(LOG_ERROR, "%s: Unable to link WebSocket instance to instances\n",
1000 proxy->connection_id);
1001 return NULL;
1002 }
1003 ast_debug(3, "%s: WebSocket instance created and linked\n", proxy->connection_id);
1004
1005 return ao2_bump(instance);
1006}
#define ao2_weakproxy_set_object(weakproxy, obj, flags)
Associate weakproxy with obj.
Definition: astobj2.h:579
int ao2_weakproxy_subscribe(void *weakproxy, ao2_weakproxy_notification_cb cb, void *data, int flags)
Request notification when weakproxy points to NULL.
Definition: astobj2.c:934
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
Definition: astobj2.h:1554
#define ao2_weakproxy_alloc(data_size, destructor_fn)
Allocate an ao2_weakproxy object.
Definition: astobj2.h:550
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:409
static void instance_proxy_cb(void *weakproxy, void *data)
static void websocket_destructor(void *data)
#define INCOMING_CONNECTION_ID
static int uuid(struct ast_channel *chan, const char *cmd, char *data, char *buf, size_t len)
Definition: func_uuid.c:52
ast_websocket_type
WebSocket connection/configuration types.
@ AST_WS_TYPE_CLIENT
#define AST_LIST_HEAD_INIT(head)
Initializes a list head structure.
Definition: linkedlists.h:626
#define SCOPED_AO2WRLOCK(varname, obj)
scoped lock specialization for ao2 write locks.
Definition: lock.h:621
#define AST_UUID_STR_LEN
Definition: uuid.h:27
char * ast_uuid_generate_str(char *buf, size_t size)
Generate a UUID string.
Definition: uuid.c:141
struct ast_websocket_client * ast_websocket_client_retrieve_by_id(const char *id)
Retrieve a websocket client object by ID.

References ao2_alloc, ao2_bump, ao2_cleanup, ao2_link_flags, ao2_weakproxy_alloc, ao2_weakproxy_set_object, ao2_weakproxy_subscribe(), ast_calloc, ast_debug, ast_format_get_codec(), AST_LIST_HEAD_INIT, ast_log, ast_strings_equal(), ast_uuid_generate_str(), AST_UUID_STR_LEN, ast_websocket_client_retrieve_by_id(), AST_WS_TYPE_CLIENT, AST_WS_TYPE_SERVER, websocket_pvt::connection_id, INCOMING_CONNECTION_ID, instance_proxy_cb(), instances, LOG_ERROR, NULL, OBJ_NOLOCK, RAII_VAR, SCOPED_AO2WRLOCK, uuid(), and websocket_destructor().

Referenced by webchan_request().

Variable Documentation

◆ __mod_info

struct ast_module_info __mod_info = { .name = AST_MODULE, .flags = AST_MODFLAG_LOAD_ORDER , .description = "Websocket Media Channel" , .key = "This paragraph is copyright (c) 2006 by Digium, Inc. \In order for your module to load, it must return this \key via a function called \"key\". Any code which \includes this paragraph must be licensed under the GNU \General Public License version 2 or later (at your \option). In addition to Digium's general reservations \of rights, Digium expressly reserves the right to \allow other parties to license this paragraph under \different terms. Any use of Digium, Inc. trademarks or \logos (including \"Asterisk\" or \"Digium\") without \express written permission of Digium, Inc. is prohibited.\n" , .buildopt_sum = AST_BUILDOPT_SUM, .support_level = AST_MODULE_SUPPORT_CORE, .load = load_module, .unload = unload_module, .load_pri = AST_MODPRI_CHANNEL_DRIVER, .requires = "res_http_websocket,res_websocket_client", }
static

Definition at line 1518 of file chan_websocket.c.

◆ ast_module_info

const struct ast_module_info* ast_module_info = &__mod_info
static

Definition at line 1518 of file chan_websocket.c.

◆ ast_ws_server

struct ast_websocket_server* ast_ws_server
static

Definition at line 52 of file chan_websocket.c.

Referenced by incoming_ws_http_callback(), load_module(), and unload_module().

◆ http_uri

struct ast_http_uri http_uri
static

Definition at line 1435 of file chan_websocket.c.

Referenced by load_module(), and unload_module().

◆ instances

struct ao2_container* instances = NULL
static

◆ websocket_options

const struct ast_app_option websocket_options[128] = { [ 'c' ] = { .flag = OPT_WS_CODEC , .arg_index = OPT_ARG_WS_CODEC + 1 }, [ 'n' ] = { .flag = OPT_WS_NO_AUTO_ANSWER }, }
static

Definition at line 1114 of file chan_websocket.c.

Referenced by webchan_request().

◆ websocket_tech

struct ast_channel_tech websocket_tech
static

Definition at line 116 of file chan_websocket.c.

Referenced by load_module(), unload_module(), and webchan_request().