Asterisk - The Open Source Telephony Project GIT-master-754dea3
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Macros Modules Pages
Macros | Functions | Variables
ari_websockets.c File Reference

WebSocket support for RESTful APIs. More...

#include "asterisk.h"
#include "resource_events.h"
#include "ari_websockets.h"
#include "internal.h"
#include "asterisk/app.h"
#include "asterisk/ari.h"
#include "asterisk/astobj2.h"
#include "asterisk/http_websocket.h"
#include "asterisk/module.h"
#include "asterisk/stasis_app.h"
Include dependency graph for ari_websockets.c:

Go to the source code of this file.

Macros

#define APPS_NUM_BUCKETS   7
 
#define ARI_WS_SESSION_NUM_BUCKETS   23
 
#define MAX_VALS   128
 
#define MESSAGES_INIT_SIZE   23
 
#define VALIDATION_FAILED
 

Functions

 AO2_STRING_FIELD_CMP_FN (ari_ws_session, session_id)
 
 AO2_STRING_FIELD_HASH_FN (ari_ws_session, session_id)
 
void ari_handle_websocket (struct ast_tcptls_session_instance *ser, const char *uri, enum ast_http_method method, struct ast_variable *get_params, struct ast_variable *headers)
 Wrapper for invoking the websocket code for an incoming connection. More...
 
int ari_websocket_load_module (void)
 
void ari_websocket_send_event (struct ari_ws_session *ari_ws_session, const char *app_name, struct ast_json *message, int debug_app)
 Callback handler for Stasis application messages. More...
 
int ari_websocket_unload_module (void)
 
static void ari_ws_session_cleanup (struct ari_ws_session *ari_ws_session)
 
static int ari_ws_session_create (int(*validator)(struct ast_json *), struct ast_tcptls_session_instance *ser, struct ast_ari_events_event_websocket_args *args, const char *session_id)
 
static void ari_ws_session_dtor (void *obj)
 
 static struct ast_json * ari_ws_session_read (struct ari_ws_session *ari_ws_session)
 
static void ari_ws_session_registry_dtor (void)
 
static void ari_ws_session_reset (struct ari_ws_session *ari_ws_session)
 
static int ari_ws_session_shutdown_cb (void *ari_ws_session, void *arg, int flags)
 
static int ari_ws_session_update (struct ari_ws_session *ari_ws_session, struct ast_websocket *ast_ws_session)
 
static int ari_ws_session_write (struct ari_ws_session *ari_ws_session, struct ast_json *message)
 
static int null_validator (struct ast_json *json)
 Validator that always succeeds. More...
 
static int parse_app_args (struct ast_variable *get_params, struct ast_ari_response *response, struct ast_ari_events_event_websocket_args *args)
 
static void stasis_app_message_handler (void *data, const char *app_name, struct ast_json *message)
 
static int websocket_attempted_cb (struct ast_tcptls_session_instance *ser, struct ast_variable *get_params, struct ast_variable *headers, const char *session_id)
 
static void websocket_established_cb (struct ast_websocket *ast_ws_session, struct ast_variable *get_params, struct ast_variable *upgrade_headers)
 

Variables

static struct ao2_container * ari_ws_session_registry
 Local registry for created event_session objects. More...
 
struct ast_websocket_server * ast_ws_server
 

Detailed Description

WebSocket support for RESTful APIs.

Author
David M. Lee, II <dlee@digium.com>

Definition in file ari_websockets.c.

Macro Definition Documentation

◆ APPS_NUM_BUCKETS

#define APPS_NUM_BUCKETS   7

Number of buckets for a websocket apps container. Remember to keep it a prime number!

Definition at line 45 of file ari_websockets.c.

◆ ARI_WS_SESSION_NUM_BUCKETS

#define ARI_WS_SESSION_NUM_BUCKETS   23

Number of buckets for the event session registry. Remember to keep it a prime number!

Definition at line 42 of file ari_websockets.c.

◆ MAX_VALS

#define MAX_VALS   128

Definition at line 56 of file ari_websockets.c.

◆ MESSAGES_INIT_SIZE

#define MESSAGES_INIT_SIZE   23

Initial size of a message queue.

Definition at line 48 of file ari_websockets.c.

◆ VALIDATION_FAILED

#define VALIDATION_FAILED
Value:
"{" \
" \"error\": \"InvalidMessage\"," \
" \"message\": \"Message validation failed\"" \
"}"

Definition at line 66 of file ari_websockets.c.

Function Documentation

◆ AO2_STRING_FIELD_CMP_FN()

AO2_STRING_FIELD_CMP_FN ( ari_ws_session  ,
session_id   
)

◆ AO2_STRING_FIELD_HASH_FN()

AO2_STRING_FIELD_HASH_FN ( ari_ws_session  ,
session_id   
)

◆ ari_handle_websocket()

void ari_handle_websocket ( struct ast_tcptls_session_instance *  ser,
const char *  uri,
enum ast_http_method  method,
struct ast_variable *  get_params,
struct ast_variable *  headers 
)

Wrapper for invoking the websocket code for an incoming connection.

Parameters
ws_server    WebSocket server to invoke.
ser          HTTP session.
uri          Requested URI.
method       Requested HTTP method.
get_params   Parsed query parameters.
headers      Parsed HTTP headers.

Definition at line 221 of file ari_websockets.c.

225{
226 struct ast_http_uri fake_urih = {
228 };
229
230 ast_websocket_uri_cb(ser, &fake_urih, uri, method, get_params,
231 headers);
232}
struct ast_websocket_server * ast_ws_server
int ast_websocket_uri_cb(struct ast_tcptls_session_instance *ser, const struct ast_http_uri *urih, const char *uri, enum ast_http_method method, struct ast_variable *get_vars, struct ast_variable *headers)
Callback suitable for use with an ast_http_uri.
const char * method
Definition: res_pjsip.c:1279
Definition of a URI handler.
Definition: http.h:102
const char * uri
Definition: http.h:105
void * data
Definition: http.h:116

References ast_websocket_uri_cb(), ast_ws_server, ast_http_uri::data, method, and ast_http_uri::uri.

Referenced by ast_ari_invoke().

◆ ari_websocket_load_module()

int ari_websocket_load_module ( void  )

Definition at line 722 of file ari_websockets.c.

723{
724 int res = 0;
725 struct ast_websocket_protocol *protocol;
726
728 ARI_WS_SESSION_NUM_BUCKETS, ari_ws_session_hash_fn,
729 NULL, ari_ws_session_cmp_fn);
732 "Failed to allocate the local registry for websocket applications\n");
734 }
735
737 if (!ast_ws_server) {
740 }
741
742 protocol = ast_websocket_sub_protocol_alloc("ari");
743 if (!protocol) {
748 }
752
754}
static int websocket_attempted_cb(struct ast_tcptls_session_instance *ser, struct ast_variable *get_params, struct ast_variable *headers, const char *session_id)
static struct ao2_container * ari_ws_session_registry
Local registry for created event_session objects.
static void websocket_established_cb(struct ast_websocket *ast_ws_session, struct ast_variable *get_params, struct ast_variable *upgrade_headers)
static void ari_ws_session_registry_dtor(void)
#define ARI_WS_SESSION_NUM_BUCKETS
#define ast_log
Definition: astobj2.c:42
@ AO2_ALLOC_OPT_LOCK_MUTEX
Definition: astobj2.h:363
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition: astobj2.h:1303
struct ast_websocket_protocol * ast_websocket_sub_protocol_alloc(const char *name)
Allocate a websocket sub-protocol instance.
int ast_websocket_server_add_protocol2(struct ast_websocket_server *server, struct ast_websocket_protocol *protocol)
Add a sub-protocol handler to the given server.
struct ast_websocket_server * ast_websocket_server_create(void)
Creates an ast_websocket_server.
#define LOG_WARNING
@ AST_MODULE_LOAD_SUCCESS
Definition: module.h:70
@ AST_MODULE_LOAD_DECLINE
Module has failed to load, may be in an inconsistent state.
Definition: module.h:78
#define NULL
Definition: resample.c:96
A websocket protocol implementation.
ast_websocket_callback session_established
Callback called when a new session is established. Mandatory.
ast_websocket_pre_callback session_attempted
Callback called when a new session is attempted. Optional.

References AO2_ALLOC_OPT_LOCK_MUTEX, ao2_container_alloc_hash, ao2_ref, ARI_WS_SESSION_NUM_BUCKETS, ari_ws_session_registry, ari_ws_session_registry_dtor(), ast_log, AST_MODULE_LOAD_DECLINE, AST_MODULE_LOAD_SUCCESS, ast_websocket_server_add_protocol2(), ast_websocket_server_create(), ast_websocket_sub_protocol_alloc(), ast_ws_server, LOG_WARNING, NULL, ast_websocket_protocol::session_attempted, ast_websocket_protocol::session_established, websocket_attempted_cb(), and websocket_established_cb().

Referenced by load_module().

◆ ari_websocket_send_event()

void ari_websocket_send_event ( struct ari_ws_session *  ari_ws_session,
const char *  app_name,
struct ast_json *  message,
int  debug_app 
)

Callback handler for Stasis application messages.

Definition at line 244 of file ari_websockets.c.

246{
247 char *remote_addr = ast_sockaddr_stringify(
249 const char *msg_type, *msg_application, *msg_timestamp, *msg_ast_id;
250 SCOPE_ENTER(4, "%s: Dispatching message from Stasis app '%s'\n", remote_addr, app_name);
251
253
255
256 msg_type = S_OR(ast_json_string_get(ast_json_object_get(message, "type")), "");
257 msg_application = S_OR(
258 ast_json_string_get(ast_json_object_get(message, "application")), "");
259
260 /* If we've been replaced, remove the application from our local
261 websocket_apps container */
262 if (strcmp(msg_type, "ApplicationReplaced") == 0 &&
263 strcmp(msg_application, app_name) == 0) {
264 ao2_find(ari_ws_session->websocket_apps, msg_application,
266 }
267
274 "%s: Failed to dispatch '%s' message from Stasis app '%s'; could not update message\n",
275 remote_addr, msg_type, msg_application);
276 }
277 }
278
279 msg_ast_id = S_OR(
280 ast_json_string_get(ast_json_object_get(message, "asterisk_id")), "");
281 if (ast_strlen_zero(msg_ast_id)) {
282 char eid[20];
283
284 if (ast_json_object_set(message, "asterisk_id",
288 "%s: Failed to dispatch '%s' message from Stasis app '%s'; could not update message\n",
289 remote_addr, msg_type, msg_application);
290 }
291 }
292
293 /* Now, we need to determine our state to see how we will handle the message */
297 "%s: Failed to dispatch '%s' message from Stasis app '%s'; could not update message\n",
298 remote_addr, msg_type, msg_application);
299 }
300
301 if (!ari_ws_session) {
302 /* If the websocket is NULL, the message goes to the queue */
305 }
307 "%s: Queued '%s' message for Stasis app '%s'; websocket is not ready\n",
308 remote_addr,
309 msg_type,
310 msg_application);
312
313 if (TRACE_ATLEAST(4) || debug_app) {
315
316 ast_verbose("<--- Sending ARI event to %s --->\n%s\n",
317 remote_addr,
318 str);
320 }
321
323 }
324
326 SCOPE_EXIT("%s: Dispatched '%s' message from Stasis app '%s'\n",
327 remote_addr, msg_type, app_name);
328}
const char * str
Definition: app_jack.c:150
static int ari_ws_session_write(struct ari_ws_session *ari_ws_session, struct ast_json *message)
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1736
#define ao2_unlock(a)
Definition: astobj2.h:729
#define ao2_lock(a)
Definition: astobj2.h:717
@ OBJ_NODATA
Definition: astobj2.h:1044
@ OBJ_UNLINK
Definition: astobj2.h:1039
void ast_verbose(const char *fmt,...)
Definition: extconf.c:2206
#define TRACE_ATLEAST(level)
#define SCOPE_ENTER(level,...)
#define SCOPE_EXIT(...)
#define SCOPE_EXIT_LOG_RTN(__log_level,...)
struct ast_sockaddr * ast_websocket_remote_address(struct ast_websocket *session)
Get the remote address for a WebSocket connected session.
struct ast_json * ast_json_string_create(const char *value)
Construct a JSON string from value.
Definition: json.c:278
void ast_json_free(void *p)
Asterisk's custom JSON allocator. Exposed for use by unit tests.
Definition: json.c:52
struct ast_json * ast_json_timeval(const struct timeval tv, const char *zone)
Construct a timeval as JSON.
Definition: json.c:670
@ AST_JSON_PRETTY
Definition: json.h:795
struct ast_json * ast_json_ref(struct ast_json *value)
Increase refcount on value.
Definition: json.c:67
int ast_json_object_set(struct ast_json *object, const char *key, struct ast_json *value)
Set a field in a JSON object.
Definition: json.c:414
const char * ast_json_string_get(const struct ast_json *string)
Get the value of a JSON string.
Definition: json.c:283
struct ast_json * ast_json_object_get(struct ast_json *object, const char *key)
Get a field from a JSON object.
Definition: json.c:407
char * ast_json_dump_string_format(struct ast_json *root, enum ast_json_encoding_format format)
Encode a JSON value to a string.
Definition: json.c:484
static char * ast_sockaddr_stringify(const struct ast_sockaddr *addr)
Wrapper around ast_sockaddr_stringify_fmt() with default format.
Definition: netsock2.h:256
const char * app_name(struct ast_app *app)
Definition: pbx_app.c:463
static struct timeval msg_timestamp(void *msg, enum smdi_message_type type)
Definition: res_smdi.c:366
int stasis_app_event_allowed(const char *app_name, struct ast_json *event)
Check if the given event should be filtered.
#define S_OR(a, b)
returns the equivalent of logic or for strings: first one if not empty, otherwise second one.
Definition: strings.h:80
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition: strings.h:65
struct ari_ws_session::@424 message_queue
struct ao2_container * websocket_apps
struct ast_websocket * ast_ws_session
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:159
#define ast_assert(a)
Definition: utils.h:739
char * ast_eid_to_str(char *s, int maxlen, struct ast_eid *eid)
Convert an EID to a string.
Definition: utils.c:2839
struct ast_eid ast_eid_default
Global EID.
Definition: options.c:93
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256

References ao2_find, ao2_lock, ao2_unlock, app_name(), ari_ws_session_write(), ast_assert, ast_eid_default, ast_eid_to_str(), ast_json_dump_string_format(), ast_json_free(), ast_json_object_get(), ast_json_object_set(), AST_JSON_PRETTY, ast_json_ref(), ast_json_string_create(), ast_json_string_get(), ast_json_timeval(), ast_log, ast_sockaddr_stringify(), ast_strlen_zero(), ast_tvnow(), AST_VECTOR_APPEND, ast_verbose(), ast_websocket_remote_address(), ari_ws_session::ast_ws_session, LOG_WARNING, ari_ws_session::message_queue, msg_timestamp(), NULL, OBJ_NODATA, OBJ_UNLINK, S_OR, SCOPE_ENTER, SCOPE_EXIT, SCOPE_EXIT_LOG_RTN, stasis_app_event_allowed(), str, TRACE_ATLEAST, and ari_ws_session::websocket_apps.

Referenced by ari_ws_session_read(), send_rest_response(), and stasis_app_message_handler().

◆ ari_websocket_unload_module()

int ari_websocket_unload_module ( void  )

Definition at line 711 of file ari_websockets.c.

712{
716 return 0;
717}
#define ao2_cleanup(obj)
Definition: astobj2.h:1934

References ao2_cleanup, ari_ws_session_registry_dtor(), ast_ws_server, and NULL.

Referenced by unload_module().

◆ ari_ws_session_cleanup()

static void ari_ws_session_cleanup ( struct ari_ws_session *  ari_ws_session)
static

Definition at line 447 of file ari_websockets.c.

448{
449 if (!ari_ws_session) {
450 return;
451 }
452
456 }
458}
static void ari_ws_session_reset(struct ari_ws_session *ari_ws_session)
#define ao2_unlink(container, obj)
Remove an object from a container.
Definition: astobj2.h:1578

References ao2_ref, ao2_unlink, ari_ws_session_registry, and ari_ws_session_reset().

Referenced by ari_ws_session_shutdown_cb(), and websocket_established_cb().

◆ ari_ws_session_create()

static int ari_ws_session_create ( int(*)(struct ast_json *)  validator,
struct ast_tcptls_session_instance *  ser,
struct ast_ari_events_event_websocket_args *  args,
const char *  session_id 
)
static

Definition at line 477 of file ari_websockets.c.

482{
484 int (* register_handler)(const char *, stasis_app_cb handler, void *data);
485 size_t size, i;
486
487 if (validator == NULL) {
488 validator = null_validator;
489 }
490
491 size = sizeof(*ari_ws_session) + strlen(session_id) + 1;
492
494 if (!ari_ws_session) {
495 return -1;
496 }
497
498 ari_ws_session->app_name = ast_strdup(args->app_parse);
499 if (!ari_ws_session->app_name) {
500 ast_http_error(ser, 500, "Internal Server Error",
501 "Allocation failed");
502 return -1;
503 }
504
505 strcpy(ari_ws_session->session_id, session_id); /* Safe */
506
507 /* Instantiate the hash table for Stasis apps */
511 ast_http_error(ser, 500, "Internal Server Error",
512 "Allocation failed");
513 return -1;
514 }
515
516 /* Instantiate the message queue */
518 ast_http_error(ser, 500, "Internal Server Error",
519 "Allocation failed");
521 return -1;
522 }
523
524 /* Register the apps with Stasis */
525 if (args->subscribe_all) {
526 register_handler = &stasis_app_register_all;
527 } else {
528 register_handler = &stasis_app_register;
529 }
530
531 for (i = 0; i < args->app_count; ++i) {
532 const char *app = args->app[i];
533
534 if (ast_strlen_zero(app)) {
535 ast_http_error(ser, 400, "Bad Request",
536 "Invalid application provided in param [app].");
538 return -1;
539 }
540
542 ast_http_error(ser, 500, "Internal Server Error",
543 "Allocation failed");
545 return -1;
546 }
547
548 if (register_handler(app, stasis_app_message_handler, ari_ws_session)) {
549 ast_log(LOG_WARNING, "Stasis registration failed for application: '%s'\n", app);
550 ast_http_error(ser, 500, "Internal Server Error",
551 "Stasis registration failed");
553 return -1;
554 }
555 }
556
557 ari_ws_session->validator = validator;
558
559 /*
560 * Add the event session to the session registry.
561 * When this functions returns, the registry will have
562 * the only reference to the session.
563 */
565 ast_http_error(ser, 500, "Internal Server Error",
566 "Allocation failed");
568 return -1;
569 }
570
571 return 0;
572}
static const char app[]
Definition: app_adsiprog.c:56
static int null_validator(struct ast_json *json)
Validator that always succeeds.
static void stasis_app_message_handler(void *data, const char *app_name, struct ast_json *message)
#define MESSAGES_INIT_SIZE
#define APPS_NUM_BUCKETS
static void ari_ws_session_dtor(void *obj)
#define ast_strdup(str)
A wrapper for strdup()
Definition: astmm.h:241
#define ao2_link(container, obj)
Add an object to a container.
Definition: astobj2.h:1532
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:409
void ast_http_error(struct ast_tcptls_session_instance *ser, int status, const char *title, const char *text)
Send HTTP error message and close socket.
Definition: http.c:664
int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
Register a new Stasis application.
Definition: res_stasis.c:1838
void(* stasis_app_cb)(void *data, const char *app_name, struct ast_json *message)
Callback for Stasis application handler.
Definition: stasis_app.h:67
int stasis_app_register_all(const char *app_name, stasis_app_cb handler, void *data)
Register a new Stasis application that receives all Asterisk events.
Definition: res_stasis.c:1843
#define ast_str_container_alloc(buckets)
Allocates a hash container for bare strings.
Definition: strings.h:1365
int ast_str_container_add(struct ao2_container *str_container, const char *add)
Adds a string to a string container allocated by ast_str_container_alloc.
Definition: strings.c:205
int(* validator)(struct ast_json *)
static void handler(const char *name, int response_code, struct ast_variable *get_params, struct ast_variable *path_vars, struct ast_variable *headers, struct ast_json *body, struct ast_ari_response *response)
Definition: test_ari.c:59
const char * args
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:941
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113

References ao2_alloc, ao2_cleanup, ao2_link, app, ari_ws_session::app_name, APPS_NUM_BUCKETS, args, ari_ws_session_dtor(), ari_ws_session_registry, ari_ws_session_reset(), ast_http_error(), ast_log, ast_str_container_add(), ast_str_container_alloc, ast_strdup, ast_strlen_zero(), AST_VECTOR_INIT, handler(), LOG_WARNING, ari_ws_session::message_queue, MESSAGES_INIT_SIZE, NULL, null_validator(), RAII_VAR, ari_ws_session::session_id, stasis_app_message_handler(), stasis_app_register(), stasis_app_register_all(), ari_ws_session::validator, and ari_ws_session::websocket_apps.

Referenced by websocket_attempted_cb().

◆ ari_ws_session_dtor()

static void ari_ws_session_dtor ( void *  obj)
static

Definition at line 465 of file ari_websockets.c.

466{
467 struct ari_ws_session *ari_ws_session = obj;
468
471 return;
472 }
475}
#define ast_free(a)
Definition: astmm.h:180
void ast_websocket_unref(struct ast_websocket *session)
Decrease the reference count for a WebSocket session.

References ari_ws_session::app_name, ast_free, ast_websocket_unref(), ari_ws_session::ast_ws_session, and NULL.

Referenced by ari_ws_session_create().

◆ ari_ws_session_read()

static struct ast_json * ari_ws_session_read ( struct ari_ws_session *  ari_ws_session)
static

Definition at line 153 of file ari_websockets.c.

155{
157
159 return NULL;
160 }
161
162 while (!message) {
163 int res;
164 char *payload;
165 uint64_t payload_len;
166 enum ast_websocket_opcode opcode;
167 int fragmented;
168
169 res = ast_wait_for_input(
171
172 if (res <= 0) {
173 ast_log(LOG_WARNING, "WebSocket poll error: %s\n",
174 strerror(errno));
175 return NULL;
176 }
177
179 &payload_len, &opcode, &fragmented);
180
181 if (res != 0) {
182 ast_log(LOG_WARNING, "WebSocket read error: %s\n",
183 strerror(errno));
184 return NULL;
185 }
186
187 switch (opcode) {
189 ast_debug(1, "WebSocket closed\n");
190 return NULL;
192 message = ast_json_load_buf(payload, payload_len, NULL);
193 if (message == NULL) {
194 struct ast_json *error = ast_json_pack(
195 "{s:s, s:s, s:s, s:i, s:s, s:s }",
196 "type", "RESTResponse",
197 "transaction_id", "",
198 "request_id", "",
199 "status_code", 400,
200 "reason_phrase", "Failed to parse request message JSON",
201 "uri", ""
202 );
204 error, 0);
207 "WebSocket input failed to parse\n");
208
209 }
210
211 break;
212 default:
213 /* Ignore all other message types */
214 break;
215 }
216 }
217
218 return ast_json_ref(message);
219}
void ari_websocket_send_event(struct ari_ws_session *ari_ws_session, const char *app_name, struct ast_json *message, int debug_app)
Callback handler for Stasis application messages.
int ast_websocket_read(struct ast_websocket *session, char **payload, uint64_t *payload_len, enum ast_websocket_opcode *opcode, int *fragmented)
Read a WebSocket frame and handle it.
ast_websocket_opcode
WebSocket operation codes.
@ AST_WEBSOCKET_OPCODE_CLOSE
@ AST_WEBSOCKET_OPCODE_TEXT
int ast_websocket_fd(struct ast_websocket *session)
Get the file descriptor for a WebSocket session.
#define ast_debug(level,...)
Log a DEBUG message.
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition: json.c:73
struct ast_json * ast_json_pack(char const *format,...)
Helper for creating complex JSON values.
Definition: json.c:612
struct ast_json * ast_json_load_buf(const char *buffer, size_t buflen, struct ast_json_error *error)
Parse buffer with known length into a JSON object or array.
Definition: json.c:585
int errno
Abstract JSON element (object, array, string, int, ...).
int error(const char *format,...)
Definition: utils/frame.c:999
int ast_wait_for_input(int fd, int ms)
Definition: utils.c:1698

References ari_ws_session::app_name, ari_websocket_send_event(), ast_debug, ast_json_load_buf(), ast_json_pack(), ast_json_ref(), ast_json_unref(), ast_log, ast_wait_for_input(), ast_websocket_fd(), AST_WEBSOCKET_OPCODE_CLOSE, AST_WEBSOCKET_OPCODE_TEXT, ast_websocket_read(), ari_ws_session::ast_ws_session, errno, error(), LOG_WARNING, NULL, and RAII_VAR.

Referenced by websocket_established_cb().

◆ ari_ws_session_registry_dtor()

static void ari_ws_session_registry_dtor ( void  )
static

Definition at line 698 of file ari_websockets.c.

699{
701 return;
702 }
703
706
709}
static int ari_ws_session_shutdown_cb(void *ari_ws_session, void *arg, int flags)
#define ao2_callback(c, flags, cb_fn, arg)
ao2_callback() is a generic function that applies cb_fn() to all objects in a container,...
Definition: astobj2.h:1693
@ OBJ_MULTIPLE
Definition: astobj2.h:1049

References ao2_callback, ao2_cleanup, ari_ws_session_registry, ari_ws_session_shutdown_cb(), NULL, OBJ_MULTIPLE, and OBJ_NODATA.

Referenced by ari_websocket_load_module(), and ari_websocket_unload_module().

◆ ari_ws_session_reset()

static void ari_ws_session_reset ( struct ari_ws_session *  ari_ws_session)
static

Definition at line 414 of file ari_websockets.c.

415{
416 struct ao2_iterator i;
417 char *app;
418 int j;
420
421 /* Clean up the websocket_apps container */
424 while ((app = ao2_iterator_next(&i))) {
427 }
431 }
432
433 /* Clean up the message_queue container */
434 for (j = 0; j < AST_VECTOR_SIZE(&ari_ws_session->message_queue); j++) {
436 ast_json_unref(msg);
437 }
439}
ast_mutex_t lock
Definition: app_sla.c:337
#define ao2_iterator_next(iter)
Definition: astobj2.h:1911
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:608
void stasis_app_unregister(const char *app_name)
Unregister a Stasis application and unsubscribe from all event sources.
Definition: res_stasis.c:1848
When we need to walk through a container, we use an ao2_iterator to keep track of the current position.
Definition: astobj2.h:1821
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:609
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition: vector.h:174
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:680

References ao2_cleanup, ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, app, ast_json_unref(), AST_VECTOR_FREE, AST_VECTOR_GET, AST_VECTOR_SIZE, lock, ari_ws_session::message_queue, NULL, SCOPED_AO2LOCK, stasis_app_unregister(), and ari_ws_session::websocket_apps.

Referenced by ari_ws_session_cleanup(), and ari_ws_session_create().

◆ ari_ws_session_shutdown_cb()

static int ari_ws_session_shutdown_cb ( void *  ari_ws_session,
void *  arg,
int  flags 
)
static

Definition at line 691 of file ari_websockets.c.

692{
694
695 return 0;
696}
static void ari_ws_session_cleanup(struct ari_ws_session *ari_ws_session)

References ari_ws_session_cleanup().

Referenced by ari_ws_session_registry_dtor().

◆ ari_ws_session_update()

static int ari_ws_session_update ( struct ari_ws_session *  ari_ws_session,
struct ast_websocket *  ast_ws_session 
)
static

Definition at line 111 of file ari_websockets.c.

114{
116 int i;
117
118 if (ast_ws_session == NULL) {
119 return -1;
120 }
121
122 if (config == NULL || config->general == NULL) {
123 return -1;
124 }
125
126 if (ast_websocket_set_nonblock(ast_ws_session) != 0) {
128 "ARI web socket failed to set nonblock; closing: %s\n",
129 strerror(errno));
130 return -1;
131 }
132
133 if (ast_websocket_set_timeout(ast_ws_session, config->general->write_timeout)) {
134 ast_log(LOG_WARNING, "Failed to set write timeout %d on ARI web socket\n",
135 config->general->write_timeout);
136 }
137
138 ao2_ref(ast_ws_session, +1);
139 ari_ws_session->ast_ws_session = ast_ws_session;
141 for (i = 0; i < AST_VECTOR_SIZE(&ari_ws_session->message_queue); i++) {
144 ast_json_unref(msg);
145 }
146
149
150 return 0;
151}
static const char config[]
Definition: chan_ooh323.c:111
int ast_websocket_set_timeout(struct ast_websocket *session, int timeout)
Set the timeout on a non-blocking WebSocket session.
int ast_websocket_set_nonblock(struct ast_websocket *session)
Set the socket of a WebSocket session to be non-blocking.
#define LOG_ERROR
struct ast_ari_conf * ast_ari_config_get(void)
Get the current ARI configuration.
All configuration options for ARI.
Definition: internal.h:54
#define AST_VECTOR_RESET(vec, cleanup)
Reset vector.
Definition: vector.h:625
#define AST_VECTOR_ELEM_CLEANUP_NOOP(elem)
Vector element cleanup that does nothing.
Definition: vector.h:571

References ao2_cleanup, ao2_lock, ao2_ref, ao2_unlock, ari_ws_session_write(), ast_ari_config_get(), ast_json_unref(), ast_log, AST_VECTOR_ELEM_CLEANUP_NOOP, AST_VECTOR_GET, AST_VECTOR_RESET, AST_VECTOR_SIZE, ast_websocket_set_nonblock(), ast_websocket_set_timeout(), ari_ws_session::ast_ws_session, config, errno, LOG_ERROR, LOG_WARNING, ari_ws_session::message_queue, NULL, and RAII_VAR.

Referenced by websocket_established_cb().

◆ ari_ws_session_write()

static int ari_ws_session_write ( struct ari_ws_session *  ari_ws_session,
struct ast_json *  message 
)
static

Definition at line 72 of file ari_websockets.c.

75{
76 RAII_VAR(char *, str, NULL, ast_json_free);
77
78#ifdef AST_DEVMODE
80 ast_log(LOG_ERROR, "Outgoing message failed validation\n");
82 }
83#endif
84
86
87 if (str == NULL) {
88 ast_log(LOG_ERROR, "Failed to encode JSON object\n");
89 return -1;
90 }
91
93 ast_log(LOG_NOTICE, "Problem occurred during websocket write to %s, websocket closed\n",
95 return -1;
96 }
97 return 0;
98}
enum ast_json_encoding_format ast_ari_json_format(void)
Configured encoding format for JSON output.
Definition: res_ari.c:1015
#define VALIDATION_FAILED
int ast_websocket_write_string(struct ast_websocket *ws, const char *buf)
Construct and transmit a WebSocket frame containing string data.
#define LOG_NOTICE

References ast_ari_json_format(), ast_json_dump_string_format(), ast_json_free(), ast_log, ast_sockaddr_stringify(), ast_websocket_remote_address(), ast_websocket_write_string(), ari_ws_session::ast_ws_session, LOG_ERROR, LOG_NOTICE, NULL, RAII_VAR, str, VALIDATION_FAILED, and ari_ws_session::validator.

Referenced by ari_websocket_send_event(), and ari_ws_session_update().

◆ null_validator()

static int null_validator ( struct ast_json *  json)
static

Validator that always succeeds.

Definition at line 61 of file ari_websockets.c.

62{
63 return 1;
64}

Referenced by ari_ws_session_create().

◆ parse_app_args()

static int parse_app_args ( struct ast_variable *  get_params,
struct ast_ari_response *  response,
struct ast_ari_events_event_websocket_args *  args 
)
static

Definition at line 339 of file ari_websockets.c.

342{
343 struct ast_variable *i;
344 RAII_VAR(char *, app_parse, NULL, ast_free);
345
346 for (i = get_params; i; i = i->next) {
347 if (strcmp(i->name, "app") == 0) {
348 /* Parse comma separated list */
349 char *vals[MAX_VALS];
350 size_t j;
351
352 app_parse = ast_strdup(i->value);
353 if (!app_parse) {
355 return -1;
356 }
357
358 if (strlen(app_parse) == 0) {
359 /* ast_app_separate_args can't handle "" */
360 args->app_count = 1;
361 vals[0] = app_parse;
362 } else {
363 args->app_count = ast_app_separate_args(
364 app_parse, ',', vals,
365 ARRAY_LEN(vals));
366 }
367
368 if (args->app_count == 0) {
370 return -1;
371 }
372
373 if (args->app_count >= MAX_VALS) {
374 ast_ari_response_error(response, 400,
375 "Bad Request",
376 "Too many values for app");
377 return -1;
378 }
379
380 args->app = ast_malloc(sizeof(*args->app) * args->app_count);
381 if (!args->app) {
383 return -1;
384 }
385
386 for (j = 0; j < args->app_count; ++j) {
387 args->app[j] = (vals[j]);
388 }
389 } else if (strcmp(i->name, "subscribeAll") == 0) {
390 args->subscribe_all = ast_true(i->value);
391 }
392 }
393
394 args->app_parse = app_parse;
395 app_parse = NULL;
396
397 return 0;
398}
void ast_ari_response_error(struct ast_ari_response *response, int response_code, const char *response_text, const char *message_fmt,...)
Fill in an error ast_ari_response.
Definition: res_ari.c:319
void ast_ari_response_alloc_failed(struct ast_ari_response *response)
Fill in response with a 500 message for allocation failures.
Definition: res_ari.c:358
#define MAX_VALS
#define ast_malloc(len)
A wrapper for malloc()
Definition: astmm.h:191
#define ast_app_separate_args(a, b, c, d)
int attribute_pure ast_true(const char *val)
Make sure something is true. Determine if a string containing a boolean value is "true"....
Definition: utils.c:2199
Structure for variables, used for configurations and for channel variables.
struct ast_variable * next
#define ARRAY_LEN(a)
Definition: utils.h:666

References args, ARRAY_LEN, ast_app_separate_args, ast_ari_response_alloc_failed(), ast_ari_response_error(), ast_free, ast_malloc, ast_strdup, ast_true(), MAX_VALS, ast_variable::name, ast_variable::next, NULL, RAII_VAR, and ast_variable::value.

Referenced by websocket_attempted_cb().

◆ stasis_app_message_handler()

static void stasis_app_message_handler ( void *  data,
const char *  app_name,
struct ast_json *  message 
)
static

Definition at line 330 of file ari_websockets.c.

332{
333 int debug_app = stasis_app_get_debug_by_name(app_name);
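334 struct ari_ws_session *ari_ws_session = data;
335 ast_assert(ari_ws_session != NULL);
336 ari_websocket_send_event(ari_ws_session, app_name, message, debug_app);
337}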
int stasis_app_get_debug_by_name(const char *app_name)
Get debug status of an application.

References app_name(), ari_websocket_send_event(), ast_assert, NULL, and stasis_app_get_debug_by_name().

Referenced by ari_ws_session_create().

◆ websocket_attempted_cb()

static int websocket_attempted_cb ( struct ast_tcptls_session_instance *  ser,
struct ast_variable *  get_params,
struct ast_variable *  headers,
const char *  session_id 
)
static

Definition at line 579 of file ari_websockets.c.

582{
583 struct ast_ari_events_event_websocket_args args = {};
584 int res = 0;
585 RAII_VAR(struct ast_ari_response *, response, NULL, ast_free);
586 char *remote_addr = ast_sockaddr_stringify(&ser->remote_address);
587
588 response = ast_calloc(1, sizeof(*response));
589 if (!response) {
590 ast_log(LOG_ERROR, "Failed to create response.\n");
591 ast_http_error(ser, 500, "Server Error", "Memory allocation error");
592 return -1;
593 }
594
595 res = parse_app_args(get_params, response, &args);
596 if (res != 0) {
597 /* Param parsing failure */
598 RAII_VAR(char *, msg, NULL, ast_json_free);
599 if (response->message) {
600 msg = ast_json_dump_string(response->message);
601 } else {
602 ast_log(LOG_ERROR, "Missing response message\n");
603 }
604
605 if (msg) {
606 ast_http_error(ser, response->response_code, response->response_text, msg);
607 return -1;
608 }
609 }
610
611 if (args.app_count == 0) {
612 ast_http_error(ser, 400, "Bad Request",
613 "HTTP request is missing param: [app]");
614 return -1;
615 }
616
617#if defined(AST_DEVMODE)
618 res = ari_ws_session_create(ast_ari_validate_message_fn(),
619 ser, &args, session_id);
620#else
621 res = ari_ws_session_create(NULL, ser, &args, session_id);
622#endif
623 if (res != 0) {
624 ast_log(LOG_ERROR,
625 "%s: Failed to create ARI ari_session\n", remote_addr);
626 }
627
628 ast_free(args.app_parse);
629 ast_free(args.app);
630 return res;
631}
ari_validator ast_ari_validate_message_fn(void)
Function pointer to ast_ari_validate_message().
static int ari_ws_session_create(int(*validator)(struct ast_json *), struct ast_tcptls_session_instance *ser, struct ast_ari_events_event_websocket_args *args, const char *session_id)
static int parse_app_args(struct ast_variable *get_params, struct ast_ari_response *response, struct ast_ari_events_event_websocket_args *args)
#define ast_calloc(num, len)
A wrapper for calloc()
Definition: astmm.h:202
#define ast_json_dump_string(root)
Encode a JSON value to a compact string.
Definition: json.h:810
struct ast_sockaddr remote_address
Definition: tcptls.h:153

References args, ari_ws_session_create(), ast_ari_validate_message_fn(), ast_calloc, ast_free, ast_http_error(), ast_json_dump_string, ast_json_free(), ast_log, ast_sockaddr_stringify(), LOG_ERROR, NULL, parse_app_args(), RAII_VAR, and ast_tcptls_session_instance::remote_address.

Referenced by ari_websocket_load_module().

◆ websocket_established_cb()

static void websocket_established_cb ( struct ast_websocket *  ast_ws_session,
struct ast_variable *  get_params,
struct ast_variable *  upgrade_headers 
)
static

Definition at line 638 of file ari_websockets.c.

640{
641 RAII_VAR(struct ast_ari_response *, response, NULL, ast_free);
642 /*
643 * ast_ws_session is passed in with its refcount bumped so
644 * we need to unref it when we're done. The refcount will
645 * be bumped again when we add it to the ari_ws_session.
646 */
647 RAII_VAR(struct ast_websocket *, s, ast_ws_session, ast_websocket_unref);
648 RAII_VAR(struct ari_ws_session *, ari_ws_session, NULL, ari_ws_session_cleanup);
649 struct ast_json *msg;
650 struct ast_variable *v;
651 char *remote_addr = ast_sockaddr_stringify(
652 ast_websocket_remote_address(ast_ws_session));
653 const char *session_id = ast_websocket_session_id(ast_ws_session);
654
655 SCOPE_ENTER(2, "%s: WebSocket established\n", remote_addr);
656
657 if (TRACE_ATLEAST(2)) {
658 ast_trace(2, "%s: Websocket Upgrade Headers:\n", remote_addr);
659 for (v = upgrade_headers; v; v = v->next) {
660 ast_trace(3, "--> %s: %s\n", v->name, v->value);
661 }
662 }
663
664 response = ast_calloc(1, sizeof(*response));
665 if (!response) {
666 SCOPE_EXIT_LOG_RTN(LOG_ERROR,
667 "%s: Failed to create response\n", remote_addr);
668 }
669
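670 /* Find the event_session and update its websocket */
671 ari_ws_session = ao2_find(ari_ws_session_registry, session_id, OBJ_SEARCH_KEY);
672 if (ari_ws_session) {
673 ao2_unlink(ari_ws_session_registry, ari_ws_session);
674 ari_ws_session_update(ari_ws_session, ast_ws_session);
675 } else {
676 SCOPE_EXIT_LOG_RTN(LOG_ERROR,
677 "%s: Failed to locate an event session for the websocket session\n",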
678 remote_addr);
679 }
680
681 ast_trace(-1, "%s: Waiting for messages\n", remote_addr);
682 while ((msg = ari_ws_session_read(ari_ws_session))) {
683 ari_websocket_process_request(ari_ws_session, remote_addr,
684 upgrade_headers, ari_ws_session->app_name, msg);
685 ast_json_unref(msg);
686 }
687
688 SCOPE_EXIT("%s: Websocket closed\n", remote_addr);
689}
int ari_websocket_process_request(struct ari_ws_session *ari_ws_session, const char *remote_addr, struct ast_variable *upgrade_headers, const char *app_name, struct ast_json *request_msg)
static int ari_ws_session_update(struct ari_ws_session *ari_ws_session, struct ast_websocket *ast_ws_session)
static struct ast_json * ari_ws_session_read(struct ari_ws_session *ari_ws_session)
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1101
#define ast_trace(level,...)
const char * ast_websocket_session_id(struct ast_websocket *session)
Get the session ID for a WebSocket session.
Structure definition for session.

References ao2_find, ao2_unlink, ari_ws_session::app_name, ari_websocket_process_request(), ari_ws_session_cleanup(), ari_ws_session_read(), ari_ws_session_registry, ari_ws_session_update(), ast_calloc, ast_free, ast_json_unref(), ast_sockaddr_stringify(), ast_trace, ast_websocket_remote_address(), ast_websocket_session_id(), ast_websocket_unref(), LOG_ERROR, ast_variable::name, ast_variable::next, NULL, OBJ_SEARCH_KEY, RAII_VAR, SCOPE_ENTER, SCOPE_EXIT, SCOPE_EXIT_LOG_RTN, TRACE_ATLEAST, and ast_variable::value.

Referenced by ari_websocket_load_module().

Variable Documentation

◆ ari_ws_session_registry

struct ao2_container* ari_ws_session_registry
static

Local registry for created event_session objects.

Definition at line 52 of file ari_websockets.c.

Referenced by ari_websocket_load_module(), ari_ws_session_cleanup(), ari_ws_session_create(), ari_ws_session_registry_dtor(), and websocket_established_cb().

◆ ast_ws_server

struct ast_websocket_server* ast_ws_server