83#define MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE "MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE"
84#define MEDIA_WEBSOCKET_CONNECTION_ID "MEDIA_WEBSOCKET_CONNECTION_ID"
85#define INCOMING_CONNECTION_ID "INCOMING"
87#define ANSWER_CHANNEL "ANSWER"
88#define HANGUP_CHANNEL "HANGUP"
89#define START_MEDIA_BUFFERING "START_MEDIA_BUFFERING"
90#define STOP_MEDIA_BUFFERING "STOP_MEDIA_BUFFERING"
91#define FLUSH_MEDIA "FLUSH_MEDIA"
92#define GET_DRIVER_STATUS "GET_STATUS"
93#define REPORT_QUEUE_DRAINED "REPORT_QUEUE_DRAINED"
94#define PAUSE_MEDIA "PAUSE_MEDIA"
95#define CONTINUE_MEDIA "CONTINUE_MEDIA"
97#define MEDIA_START "MEDIA_START"
98#define MEDIA_XON "MEDIA_XON"
99#define MEDIA_XOFF "MEDIA_XOFF"
100#define QUEUE_DRAINED "QUEUE_DRAINED"
101#define DRIVER_STATUS "STATUS"
102#define MEDIA_BUFFERING_COMPLETED "MEDIA_BUFFERING_COMPLETED"
104#define QUEUE_LENGTH_MAX 1000
105#define QUEUE_LENGTH_XOFF_LEVEL 900
106#define QUEUE_LENGTH_XON_LEVEL 800
107#define MAX_TEXT_MESSAGE_LEN MIN(128, (AST_WEBSOCKET_MAX_RX_PAYLOAD_SIZE - 1))
118 .description =
"Media over WebSocket Channel Driver",
165 ast_debug(4,
"%s: WebSocket sending MEDIA_XON\n",
181 ast_debug(4,
"%s: WebSocket sending QUEUE_DRAINED\n",
211 ast_debug(4,
"%s: WebSocket sending %s\n",
302 slin_frame = native_frame;
321 char *old_data = slin_frame->
data.
ptr;
322 int old_len = slin_frame->
datalen;
323 int old_offset = slin_frame->
offset;
324 ast_debug(4,
"%s: WebSocket read short frame. Expected %d got %d. Filling with silence\n",
335 memcpy(slin_frame->
data.
ptr, old_data, old_len);
348 char *buffer,
size_t len)
372 ast_debug(4,
"%s: WebSocket sending %s\n",
405 ast_debug(4,
"%s: Queued '%s' option frame\n",
412 char *payload, uint64_t payload_len)
418 ast_log(
LOG_WARNING,
"%s: WebSocket TEXT message of length %d exceeds maximum length of %d\n",
428 payload[payload_len] =
'\0';
431 ast_debug(4,
"%s: WebSocket %s command received\n",
453 ast_debug(4,
"%s: WebSocket %s '%s' with %d bytes in leftover_data.\n",
467 if (res <= 0 || !option) {
492 res =
ast_asprintf(&
status,
"%s queue_length:%d xon_level:%d xoff_level:%d queue_full:%s bulk_media:%s media_paused:%s",
500 if (res <= 0 || !
status) {
504 ast_debug(4,
"%s: WebSocket status: %s\n",
529 char *payload, uint64_t payload_len)
531 char *next_frame_ptr =
NULL;
532 size_t bytes_read = 0;
534 size_t bytes_left = 0;
540 ast_debug(4,
"%s: WebSocket queue is full. Ignoring incoming binary message.\n",
546 next_frame_ptr = payload;
561 size_t bytes_avail_to_copy =
MIN(bytes_needed_for_frame, payload_len);
567 memcpy(append_ptr, payload, bytes_avail_to_copy);
573 if (bytes_avail_to_copy < bytes_needed_for_frame) {
574 ast_debug(4,
"%s: Leftover data %d bytes but only %d new bytes available of %d needed. Appending and waiting for next message.\n",
590 payload_len -= bytes_avail_to_copy;
591 next_frame_ptr = payload + bytes_avail_to_copy;
593 ast_debug(5,
"%s: --- BR: %4d FQ: %4d PL: %4d LOL: %3d P: %p NFP: %p OFF: %4d NPL: %4d BAC: %3d\n",
597 (
int)(payload_len + bytes_avail_to_copy),
601 (
int)(next_frame_ptr - payload),
603 (
int)bytes_avail_to_copy
614 bytes_left = payload_len;
630 ast_debug(5,
"%s: +++ BR: %4d FQ: %4d PL: %4d LOL: %3d P: %p NFP: %p OFF: %4d BL: %4d\n",
638 (
int)(next_frame_ptr - payload),
650 uint64_t payload_len = 0;
651 char *payload =
NULL;
683 &opcode, &fragmented);
696 ast_debug(5,
"%s: WebSocket closed by remote\n",
702 ast_debug(5,
"%s: WebSocket frame type %d not supported. Ignoring.\n",
736 instance->optimal_frame_size);
737 if (res <= 0 || !command) {
751 if (!instance->no_auto_answer) {
781 ast_log(
LOG_WARNING,
"%s: This WebSocket channel only supports AST_FRAME_VOICE frames\n",
825 ast_debug(3,
"%s: WebSocket call requested to %s. cid: %s\n",
837 IPPROTO_TCP, TCP_NODELAY, (
char *) &nodelay,
sizeof(nodelay)) < 0) {
841 ast_debug(3,
"%s: WebSocket connection to %s established\n",
867 if (instance->
timer) {
955 instance->type = ws_type;
958 if (!instance->client) {
960 chan_name, instance->connection_id);
974 instance->native_format = fmt;
980 instance->optimal_frame_size =
981 (instance->native_codec->default_ms * instance->native_codec->minimum_bytes)
982 / instance->native_codec->minimum_ms;
984 instance->leftover_data =
ast_calloc(1, instance->optimal_frame_size);
985 if (!instance->leftover_data) {
1000 proxy->connection_id);
1003 ast_debug(3,
"%s: WebSocket instance created and linked\n", proxy->connection_id);
1022 ast_debug(3,
"%s: WebSocket channel slin format '%s' Sample rate: %d ptime: %dms\n",
1029 ast_log(
LOG_ERROR,
"%s: Unable to build translator path from '%s' to '%s'\n",
1066 if (!instance->
timer) {
1071 ast_debug(3,
"%s: WebSocket timer rate %d\n",
1085 char *pkt_size =
NULL;
1131 const char *requestor_name = requestor ?
ast_channel_name(requestor) :
"no channel";
1133 ast_debug(3,
"%s: WebSocket channel requested\n",
1137 ast_log(
LOG_ERROR,
"%s: A connection id is required for the 'WebSocket' channel\n",
1145 ast_log(
LOG_ERROR,
"%s: connection_id is required for the 'WebSocket' channel\n",
1154 requestor_name,
args.options);
1160 ast_debug(3,
"%s: Using specified format %s\n",
1168 ast_debug(3,
"%s: Using format %s from requesting channel\n",
1175 requestor_name,
args.connection_id);
1189 requestor, 0,
"WebSocket/%s/%p",
args.connection_id, instance);
1195 ast_debug(3,
"%s: WebSocket channel %s allocated for connection %s\n",
1197 instance->connection_id);
1199 instance->channel =
ao2_bump(chan);
1234 ast_debug(3,
"%s: WebSocket channel created to %s\n",
1260 ast_debug(3,
"%s: WebSocket call hangup. cid: %s\n",
1298 const char *connection_id =
NULL;
1302 ast_debug(3,
"WebSocket established\n");
1304 for (v = upgrade_headers; v; v = v->
next) {
1307 for (v = get_params; v; v = v->
next) {
1340 IPPROTO_TCP, TCP_NODELAY, (
char *) &nodelay,
sizeof(nodelay)) < 0) {
1376 int destroy_get_params = (get_params ==
NULL);
1380 ast_debug(2,
"URI: %s Starting\n", uri);
1394 ast_http_error(ser, 404,
"Not found",
"WebSocket instance not found");
1401 if (instance->websocket) {
1404 ast_http_error(ser, 409,
"Conflict",
"Another websocket connection exists for this connection id");
1415 for (v = get_params; v; v = v->
next) {
1425 get_params, headers);
1426 if (destroy_get_params) {
1437 .description =
"Media over Websocket",
1485 instance_proxy_sort_fn, instance_proxy_cmp_fn);
1488 "Failed to allocate the chan_websocket instance registry\n");
1517 .
requires =
"res_http_websocket,res_websocket_client",
Asterisk main include file. File version handling, generic pbx functions.
#define ast_strdupa(s)
duplicate a string in memory from the stack
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
#define ast_calloc(num, len)
A wrapper for calloc()
#define ao2_weakproxy_set_object(weakproxy, obj, flags)
Associate weakproxy with obj.
int ao2_weakproxy_subscribe(void *weakproxy, ao2_weakproxy_notification_cb cb, void *data, int flags)
Request notification when weakproxy points to NULL.
@ AO2_ALLOC_OPT_LOCK_RWLOCK
#define AO2_STRING_FIELD_CMP_FN(stype, field)
Creates a compare function for a structure string field.
#define ao2_unlink(container, obj)
Remove an object from a container.
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
#define AO2_STRING_FIELD_SORT_FN(stype, field)
Creates a sort function for a structure string field.
#define ao2_weakproxy_find(c, arg, flags, tag)
Perform an ao2_find on a container with ao2_weakproxy objects, returning the real object.
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
#define ao2_weakproxy_alloc(data_size, destructor_fn)
Allocate an ao2_weakproxy object.
#define AO2_STRING_FIELD_HASH_FN(stype, field)
Creates a hash function for a structure string field.
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
#define ao2_alloc(data_size, destructor_fn)
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
@ AO2_CONTAINER_ALLOC_OPT_DUPS_REPLACE
Replace objects with duplicate keys in container.
Internal Asterisk hangup causes.
#define AST_CAUSE_FAILURE
static int set_instance_translator(struct websocket_pvt *instance)
#define MEDIA_BUFFERING_COMPLETED
#define QUEUE_LENGTH_XON_LEVEL
static int incoming_ws_http_callback(struct ast_tcptls_session_instance *ser, const struct ast_http_uri *urih, const char *uri, enum ast_http_method method, struct ast_variable *get_params, struct ast_variable *headers)
static void instance_proxy_cb(void *weakproxy, void *data)
static struct ast_frame * dequeue_frame(struct websocket_pvt *instance)
static int webchan_write(struct ast_channel *ast, struct ast_frame *f)
Function called when we should write a frame to the channel.
static int webchan_hangup(struct ast_channel *ast)
static struct ast_channel_tech websocket_tech
static struct ast_frame * webchan_read(struct ast_channel *ast)
#define MEDIA_WEBSOCKET_CONNECTION_ID
static int read_from_ws_and_queue(struct websocket_pvt *instance)
static struct ast_http_uri http_uri
#define GET_DRIVER_STATUS
static int set_channel_variables(struct websocket_pvt *instance)
static void * read_thread_handler(void *obj)
static int process_text_message(struct websocket_pvt *instance, char *payload, uint64_t payload_len)
static void websocket_destructor(void *data)
static int queue_frame_from_buffer(struct websocket_pvt *instance, char *buffer, size_t len)
#define START_MEDIA_BUFFERING
static int set_channel_timer(struct websocket_pvt *instance)
#define QUEUE_LENGTH_XOFF_LEVEL
static const struct ast_app_option websocket_options[128]
static struct ao2_container * instances
#define MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE
static void set_channel_format(struct websocket_pvt *instance, struct ast_format *fmt)
#define INCOMING_CONNECTION_ID
static struct websocket_pvt * websocket_new(const char *chan_name, const char *connection_id, struct ast_format *fmt)
#define REPORT_QUEUE_DRAINED
#define STOP_MEDIA_BUFFERING
static int load_module(void)
Function called when our module is loaded.
static int webchan_call(struct ast_channel *ast, const char *dest, int timeout)
static int process_binary_message(struct websocket_pvt *instance, char *payload, uint64_t payload_len)
static int set_instance_silence_frame(struct websocket_pvt *instance)
static struct ast_websocket_server * ast_ws_server
@ OPT_ARG_WS_NO_AUTO_ANSWER
static int unload_module(void)
Function called when our module is unloaded.
#define MAX_TEXT_MESSAGE_LEN
static void incoming_ws_established_cb(struct ast_websocket *ast_ws_session, struct ast_variable *get_params, struct ast_variable *upgrade_headers)
static struct ast_channel * webchan_request(const char *type, struct ast_format_cap *cap, const struct ast_assigned_ids *assignedids, const struct ast_channel *requestor, const char *data, int *cause)
static int queue_option_frame(struct websocket_pvt *instance, char *buffer)
General Asterisk PBX channel definitions.
const char * ast_channel_name(const struct ast_channel *chan)
void * ast_channel_tech_pvt(const struct ast_channel *chan)
struct ast_format * ast_channel_rawreadformat(struct ast_channel *chan)
#define ast_channel_alloc(needqueue, state, cid_num, cid_name, acctcode, exten, context, assignedids, requestor, amaflag,...)
Create a channel structure.
void ast_channel_nativeformats_set(struct ast_channel *chan, struct ast_format_cap *value)
void ast_channel_unregister(const struct ast_channel_tech *tech)
Unregister a channel technology.
int ast_queue_control(struct ast_channel *chan, enum ast_control_frame_type control)
Queue a control frame without payload.
void ast_channel_set_rawreadformat(struct ast_channel *chan, struct ast_format *format)
void ast_channel_tech_pvt_set(struct ast_channel *chan, void *value)
void ast_channel_set_rawwriteformat(struct ast_channel *chan, struct ast_format *format)
void ast_channel_set_readformat(struct ast_channel *chan, struct ast_format *format)
int ast_channel_register(const struct ast_channel_tech *tech)
Register a channel technology (a new channel driver). Called by a channel module to register the kind of channels it supports.
#define ast_channel_unref(c)
Decrease channel reference count.
void ast_channel_set_fd(struct ast_channel *chan, int which, int fd)
void ast_channel_tech_set(struct ast_channel *chan, const struct ast_channel_tech *value)
#define ast_channel_unlock(chan)
void ast_channel_set_writeformat(struct ast_channel *chan, struct ast_format *format)
static int len(struct ast_channel *chan, const char *cmd, char *data, char *buf, size_t buflen)
static int uuid(struct ast_channel *chan, const char *cmd, char *data, char *buf, size_t len)
ast_http_method
HTTP Request methods known by Asterisk.
void ast_http_uri_unlink(struct ast_http_uri *urihandler)
Unregister a URI handler.
void ast_http_error(struct ast_tcptls_session_instance *ser, int status, const char *title, const char *text)
Send HTTP error message and close socket.
int ast_http_uri_link(struct ast_http_uri *urihandler)
Register a URI handler.
Support for WebSocket connections within the Asterisk HTTP server and client WebSocket connections to a server.
struct ast_websocket_protocol * ast_websocket_sub_protocol_alloc(const char *name)
Allocate a websocket sub-protocol instance.
int ast_websocket_close(struct ast_websocket *session, uint16_t reason)
Close a WebSocket session by sending a message with the CLOSE opcode and an optional code.
int ast_websocket_read(struct ast_websocket *session, char **payload, uint64_t *payload_len, enum ast_websocket_opcode *opcode, int *fragmented)
Read a WebSocket frame and handle it.
const char * ast_websocket_result_to_str(enum ast_websocket_result result)
Convert a websocket result code to a string.
ast_websocket_result
Result code for a websocket client.
ast_websocket_opcode
WebSocket operation codes.
@ AST_WEBSOCKET_OPCODE_BINARY
@ AST_WEBSOCKET_OPCODE_CLOSE
@ AST_WEBSOCKET_OPCODE_TEXT
void ast_websocket_unref(struct ast_websocket *session)
Decrease the reference count for a WebSocket session.
int ast_websocket_uri_cb(struct ast_tcptls_session_instance *ser, const struct ast_http_uri *urih, const char *uri, enum ast_http_method method, struct ast_variable *get_vars, struct ast_variable *headers)
Callback suitable for use with a ast_http_uri.
int ast_websocket_server_add_protocol2(struct ast_websocket_server *server, struct ast_websocket_protocol *protocol)
Add a sub-protocol handler to the given server.
int ast_websocket_write_string(struct ast_websocket *ws, const char *buf)
Construct and transmit a WebSocket frame containing string data.
int ast_websocket_fd(struct ast_websocket *session)
Get the file descriptor for a WebSocket session.
ast_websocket_type
WebSocket connection/configuration types.
struct ast_websocket_server * ast_websocket_server_create(void)
Creates a ast_websocket_server.
int ast_websocket_write(struct ast_websocket *session, enum ast_websocket_opcode opcode, char *payload, uint64_t payload_size)
Construct and transmit a WebSocket frame.
Application convenience functions, designed to give consistent look and feel to Asterisk apps.
#define AST_APP_ARG(name)
Define an application argument.
#define AST_APP_OPTIONS(holder, options...)
Declares an array of options for an application.
#define AST_APP_OPTION_ARG(option, flagno, argno)
Declares an application option that accepts an argument.
#define AST_DECLARE_APP_ARGS(name, arglist)
Declare a structure to hold an application's arguments.
#define AST_APP_OPTION(option, flagno)
Declares an application option that does not accept an argument.
#define AST_NONSTANDARD_APP_ARGS(args, parse, sep)
Performs the 'nonstandard' argument separation process for an application.
int ast_app_parse_options(const struct ast_app_option *options, struct ast_flags *flags, char **args, char *optstr)
Parses a string containing application options and sets flags/arguments.
const char * ast_variable_find_in_list(const struct ast_variable *list, const char *variable)
Gets the value of a variable from a variable list by name.
#define ast_variable_new(name, value, filename)
#define ast_variable_list_append(head, new_var)
void ast_variables_destroy(struct ast_variable *var)
Free variable list.
Asterisk internal frame definitions.
#define ast_frisolate(fr)
Makes a frame independent of any static storage.
void ast_frame_free(struct ast_frame *frame, int cache)
Frees a frame or list of frames.
#define ast_frdup(fr)
Copies a frame.
#define AST_FRAME_SET_BUFFER(fr, _base, _ofs, _datalen)
#define AST_FRIENDLY_OFFSET
Offset into a frame's data buffer.
#define ast_debug(level,...)
Log a DEBUG message.
#define AST_LIST_INSERT_TAIL(head, elm, field)
Appends a list entry to the tail of a list.
#define AST_LIST_HEAD_INIT(head)
Initializes a list head structure.
#define AST_LIST_LOCK(head)
Locks a list.
#define AST_LIST_REMOVE_HEAD(head, field)
Removes and returns the head entry from a list.
#define AST_LIST_UNLOCK(head)
Attempts to unlock a list.
#define AST_LIST_HEAD(name, type)
Defines a structure to be used to hold a list of specified type.
Asterisk locking-related definitions:
#define SCOPED_AO2WRLOCK(varname, obj)
scoped lock specialization for ao2 write locks.
#define SCOPED_LOCK(varname, lock, lockfunc, unlockfunc)
Scoped Locks.
Asterisk module definitions.
#define AST_MODULE_INFO(keystr, flags_to_set, desc, fields...)
@ AST_MODPRI_CHANNEL_DRIVER
@ AST_MODULE_SUPPORT_CORE
#define ASTERISK_GPL_KEY
The text the key() function should return.
@ AST_MODULE_LOAD_SUCCESS
@ AST_MODULE_LOAD_DECLINE
Module has failed to load, may be in an inconsistent state.
Core PBX routines and definitions.
int pbx_builtin_setvar_helper(struct ast_channel *chan, const char *name, const char *value)
Add a variable to the channel variable stack, removing the most recently set value for the same name.
int ast_strings_equal(const char *str1, const char *str2)
Compare strings for equality checking for NULL.
#define S_OR(a, b)
returns the equivalent of logic or for strings: first one if not empty, otherwise second one.
#define S_COR(a, b, c)
returns the equivalent of logic or for strings, with an additional boolean check: second one if not empty and first one is true, otherwise third one.
static force_inline int attribute_pure ast_strlen_zero(const char *s)
static int force_inline attribute_pure ast_begins_with(const char *str, const char *prefix)
Checks whether a string begins with another.
char * ast_strip(char *s)
Strip leading/trailing whitespace from a string.
Structure to pass both assignedid values to channel drivers.
Structure to describe a channel "technology", ie a channel driver See for examples:
struct ast_format_cap * capabilities
Main Channel structure associated with a channel.
Represents a media codec within Asterisk.
unsigned int sample_rate
Sample rate (number of samples carried in a second)
unsigned int minimum_bytes
Length in bytes of the data payload of a minimum_ms frame.
unsigned int default_ms
Default length of media carried (in milliseconds) in a frame.
int(* samples_count)(struct ast_frame *frame)
Retrieve the number of samples in a frame.
unsigned int minimum_ms
Minimum length of media that can be carried (in milliseconds) in a frame.
Structure used to handle boolean flags.
struct ast_format * format
Data structure associated with a single frame of data.
struct ast_frame_subclass subclass
enum ast_frame_type frametype
union ast_frame::@231 data
Definition of a URI handler.
ast_http_callback callback
describes a server instance
Default structure for translators, with the basic fields and buffers, all allocated as part of the same chunk of memory.
Structure for variables, used for configurations and for channel variables.
struct ast_variable * next
A websocket protocol implementation.
ast_websocket_callback session_established
Callback called when a new session is established. Mandatory.
Structure for a WebSocket server.
Structure definition for session.
char connection_id[0]
The name of the module owning this sorcery instance.
struct ast_format * native_format
struct ast_channel * channel
struct ast_codec * slin_codec
struct websocket_pvt::@139 frame_queue
int bulk_media_in_progress
enum ast_websocket_type type
struct ast_format * slin_format
pthread_t outbound_read_thread
struct ast_codec * native_codec
struct ast_websocket_client * client
struct ast_websocket * websocket
struct ast_trans_pvt * translator
Timing source management.
void ast_timer_close(struct ast_timer *handle)
Close an opened timing handle.
int ast_timer_ack(const struct ast_timer *handle, unsigned int quantity)
Acknowledge a timer event.
int ast_timer_set_rate(const struct ast_timer *handle, unsigned int rate)
Set the timing tick rate.
enum ast_timer_event ast_timer_get_event(const struct ast_timer *handle)
Retrieve timing event.
struct ast_timer * ast_timer_open(void)
Open a timer.
@ AST_TIMING_EVENT_EXPIRED
int ast_timer_fd(const struct ast_timer *handle)
Get a poll()-able file descriptor for a timer.
Support for translation of data formats. translate.c.
struct ast_frame * ast_translate(struct ast_trans_pvt *tr, struct ast_frame *f, int consume)
Translates one or more frames. Apply an input frame into the translator and receive zero or one output frames.
void ast_translator_free_path(struct ast_trans_pvt *tr)
Frees a translator path. Frees the given translator path structure.
struct ast_trans_pvt * ast_translator_build_path(struct ast_format *dest, struct ast_format *source)
Builds a translator path. Build a path (possibly NULL) from source to dest.
#define ast_test_flag(p, flag)
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
int ast_wait_for_input(int fd, int ms)
#define ast_pthread_create_detached_background(a, b, c, d)
Universally unique identifier support.
char * ast_uuid_generate_str(char *buf, size_t size)
Generate a UUID string.
struct ast_websocket_client * ast_websocket_client_retrieve_by_id(const char *id)
Retrieve a websocket client object by ID.
struct ast_websocket * ast_websocket_client_connect(struct ast_websocket_client *wc, void *lock_obj, const char *display_name, enum ast_websocket_result *result)
Connect to a websocket server using the configured authentication, retry and TLS options.