Asterisk - The Open Source Telephony Project GIT-master-a358458
Data Structures | Macros | Enumerations | Functions | Variables
resource_events.c File Reference

/api-docs/events.{format} implementation - WebSocket resource More...

#include "asterisk.h"
#include "resource_events.h"
#include "asterisk/astobj2.h"
#include "asterisk/http_websocket.h"
#include "asterisk/stasis_app.h"
#include "asterisk/vector.h"
Include dependency graph for resource_events.c:

Go to the source code of this file.

Data Structures

struct  event_session
 A wrapper for the ast_ari_websocket_session. More...
 

Macros

#define APPS_NUM_BUCKETS   7
 
#define EVENT_SESSION_NUM_BUCKETS   23
 
#define MESSAGES_INIT_SIZE   23
 

Enumerations

enum  event_session_error_type { ERROR_TYPE_STASIS_REGISTRATION = 1 , ERROR_TYPE_OOM = 2 , ERROR_TYPE_MISSING_APP_PARAM = 3 , ERROR_TYPE_INVALID_APP_PARAM = 4 }
 event_session error types. More...
 

Functions

void ast_ari_events_user_event (struct ast_variable *headers, struct ast_ari_events_user_event_args *args, struct ast_ari_response *response)
 Generate a user event. More...
 
int ast_ari_websocket_events_event_websocket_attempted (struct ast_tcptls_session_instance *ser, struct ast_variable *headers, struct ast_ari_events_event_websocket_args *args, const char *session_id)
 WebSocket connection for events. More...
 
void ast_ari_websocket_events_event_websocket_dtor (void)
 WebSocket connection for events. More...
 
void ast_ari_websocket_events_event_websocket_established (struct ast_ari_websocket_session *ws_session, struct ast_variable *headers, struct ast_ari_events_event_websocket_args *args)
 WebSocket connection for events. More...
 
int ast_ari_websocket_events_event_websocket_init (void)
 WebSocket connection for events. More...
 
static int event_session_alloc (struct ast_tcptls_session_instance *ser, struct ast_ari_events_event_websocket_args *args, const char *session_id)
 Creates an event_session object and registers its apps with Stasis. More...
 
static int event_session_allocation_error_handler (struct event_session *session, enum event_session_error_type error, struct ast_tcptls_session_instance *ser)
 Handles event_session error processing. More...
 
static void event_session_cleanup (struct event_session *session)
 Processes cleanup actions for an event_session object. More...
 
static int event_session_compare (void *obj, void *arg, int flags)
 AO2 comparison function for event_session objects. More...
 
static void event_session_dtor (void *obj)
 Event session object destructor (event_session). More...
 
static int event_session_hash (const void *obj, const int flags)
 AO2 hash function for event_session objects. More...
 
static void event_session_shutdown (struct event_session *session)
 Explicitly shutdown a session. More...
 
static int event_session_shutdown_cb (void *session, void *arg, int flags)
 
static void event_session_update_websocket (struct event_session *session, struct ast_ari_websocket_session *ws_session)
 Updates the websocket session for an event_session. More...
 
static void stasis_app_message_handler (void *data, const char *app_name, struct ast_json *message)
 Callback handler for Stasis application messages. More...
 

Variables

static struct ao2_container * event_session_registry
 Local registry for created event_session objects. More...
 

Detailed Description

/api-docs/events.{format} implementation - WebSocket resource

Author
David M. Lee, II <dlee@digium.com>

Definition in file resource_events.c.

Macro Definition Documentation

◆ APPS_NUM_BUCKETS

#define APPS_NUM_BUCKETS   7

Number of buckets for a websocket apps container. Remember to keep it a prime number!

Definition at line 42 of file resource_events.c.

◆ EVENT_SESSION_NUM_BUCKETS

#define EVENT_SESSION_NUM_BUCKETS   23

Number of buckets for the event session registry. Remember to keep it a prime number!

Definition at line 39 of file resource_events.c.

◆ MESSAGES_INIT_SIZE

#define MESSAGES_INIT_SIZE   23

Initial size of a message queue.

Definition at line 45 of file resource_events.c.

Enumeration Type Documentation

◆ event_session_error_type

event_session error types.

Enumerator
ERROR_TYPE_STASIS_REGISTRATION 

Stasis failed to register the application.

ERROR_TYPE_OOM 

Insufficient memory to create the event session.

ERROR_TYPE_MISSING_APP_PARAM 

HTTP request was missing an [app] parameter.

ERROR_TYPE_INVALID_APP_PARAM 

HTTP request contained an invalid [app] parameter.

Definition at line 58 of file resource_events.c.

58 {
59 ERROR_TYPE_STASIS_REGISTRATION = 1, /*!< Stasis failed to register the application. */
60 ERROR_TYPE_OOM = 2, /*!< Insufficient memory to create the event
61 session. */
62 ERROR_TYPE_MISSING_APP_PARAM = 3, /*!< HTTP request was missing an [app] parameter. */
63 ERROR_TYPE_INVALID_APP_PARAM = 4, /*!< HTTP request contained an invalid [app]
64 parameter. */
65};
@ ERROR_TYPE_INVALID_APP_PARAM
@ ERROR_TYPE_MISSING_APP_PARAM
@ ERROR_TYPE_OOM
@ ERROR_TYPE_STASIS_REGISTRATION

Function Documentation

◆ ast_ari_events_user_event()

void ast_ari_events_user_event ( struct ast_variable * headers,
struct ast_ari_events_user_event_args * args,
struct ast_ari_response * response
)

Generate a user event.

Parameters
headers: HTTP headers
args: Swagger parameters
[out] response: HTTP response

Definition at line 532 of file resource_events.c.

535{
537 struct ast_json *json_variables = NULL;
538
539 if (args->variables) {
541 json_variables = ast_json_object_get(args->variables, "variables");
542 }
543
544 if (ast_strlen_zero(args->application)) {
545 ast_ari_response_error(response, 400, "Bad Request",
546 "Missing parameter application");
547 return;
548 }
549
550 res = stasis_app_user_event(args->application,
551 args->event_name,
552 args->source, args->source_count,
553 json_variables);
554
555 switch (res) {
558 break;
559
561 ast_ari_response_error(response, 404, "Not Found",
562 "Application not found");
563 break;
564
566 ast_ari_response_error(response, 422, "Unprocessable Entity",
567 "Event source was not found");
568 break;
569
571 ast_ari_response_error(response, 400, "Bad Request",
572 "Invalid event source URI scheme");
573 break;
574
576 ast_ari_response_error(response, 400, "Bad Request",
577 "Invalid userevent data");
578 break;
579
581 default:
582 ast_ari_response_error(response, 500, "Internal Server Error",
583 "Error processing request");
584 }
585}
void ast_ari_response_error(struct ast_ari_response *response, int response_code, const char *response_text, const char *message_fmt,...)
Fill in an error ast_ari_response.
Definition: res_ari.c:259
void ast_ari_response_no_content(struct ast_ari_response *response)
Fill in a No Content (204) ast_ari_response.
Definition: res_ari.c:284
struct ast_json * ast_json_object_get(struct ast_json *object, const char *key)
Get a field from a JSON object.
Definition: json.c:407
#define NULL
Definition: resample.c:96
int ast_ari_events_user_event_parse_body(struct ast_json *body, struct ast_ari_events_user_event_args *args)
Body parsing function for /events/user/{eventName}.
stasis_app_user_event_res
Return code for stasis_app_user_event.
Definition: stasis_app.h:255
@ STASIS_APP_USER_APP_NOT_FOUND
Definition: stasis_app.h:257
@ STASIS_APP_USER_EVENT_SOURCE_NOT_FOUND
Definition: stasis_app.h:258
@ STASIS_APP_USER_EVENT_SOURCE_BAD_SCHEME
Definition: stasis_app.h:259
@ STASIS_APP_USER_USEREVENT_INVALID
Definition: stasis_app.h:260
@ STASIS_APP_USER_OK
Definition: stasis_app.h:256
@ STASIS_APP_USER_INTERNAL_ERROR
Definition: stasis_app.h:261
enum stasis_app_user_event_res stasis_app_user_event(const char *app_name, const char *event_name, const char **source_uris, int sources_count, struct ast_json *json_variables)
Generate a Userevent for stasis app (echo to AMI)
Definition: res_stasis.c:2102
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition: strings.h:65
Abstract JSON element (object, array, string, int, ...).
const char * args

References args, ast_ari_events_user_event_parse_body(), ast_ari_response_error(), ast_ari_response_no_content(), ast_json_object_get(), ast_strlen_zero(), NULL, STASIS_APP_USER_APP_NOT_FOUND, stasis_app_user_event(), STASIS_APP_USER_EVENT_SOURCE_BAD_SCHEME, STASIS_APP_USER_EVENT_SOURCE_NOT_FOUND, STASIS_APP_USER_INTERNAL_ERROR, STASIS_APP_USER_OK, and STASIS_APP_USER_USEREVENT_INVALID.

Referenced by ast_ari_events_user_event_cb().

◆ ast_ari_websocket_events_event_websocket_attempted()

int ast_ari_websocket_events_event_websocket_attempted ( struct ast_tcptls_session_instance * ser,
struct ast_variable * headers,
struct ast_ari_events_event_websocket_args * args,
const char * session_id
)

WebSocket connection for events.

Parameters
ser: HTTP TCP/TLS Server Session
headers: HTTP headers
args: Swagger parameters
session_id: The id of the current session.
Return values
0: success
non-zero: error

Definition at line 488 of file resource_events.c.

491{
492 ast_debug(3, "/events WebSocket attempted\n");
493
494 /* Create the event session */
495 return event_session_alloc(ser, args, session_id);
496}
#define ast_debug(level,...)
Log a DEBUG message.
static int event_session_alloc(struct ast_tcptls_session_instance *ser, struct ast_ari_events_event_websocket_args *args, const char *session_id)
Creates an event_session object and registers its apps with Stasis.

References args, ast_debug, event_session_alloc(), and event_session::session_id.

Referenced by ast_ari_events_event_websocket_ws_attempted_cb().

◆ ast_ari_websocket_events_event_websocket_dtor()

void ast_ari_websocket_events_event_websocket_dtor ( void  )

WebSocket connection for events.

Definition at line 465 of file resource_events.c.

466{
468
471}
#define ao2_callback(c, flags, cb_fn, arg)
ao2_callback() is a generic function that applies cb_fn() to all objects in a container,...
Definition: astobj2.h:1693
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
@ OBJ_NODATA
Definition: astobj2.h:1044
@ OBJ_MULTIPLE
Definition: astobj2.h:1049
static int event_session_shutdown_cb(void *session, void *arg, int flags)
static struct ao2_container * event_session_registry
Local registry for created event_session objects.

References ao2_callback, ao2_cleanup, event_session_registry, event_session_shutdown_cb(), NULL, OBJ_MULTIPLE, and OBJ_NODATA.

Referenced by load_module(), and unload_module().

◆ ast_ari_websocket_events_event_websocket_established()

void ast_ari_websocket_events_event_websocket_established ( struct ast_ari_websocket_session * session,
struct ast_variable * headers,
struct ast_ari_events_event_websocket_args * args
)

WebSocket connection for events.

Parameters
session: ARI WebSocket.
headers: HTTP headers.
args: Swagger parameters.

Definition at line 498 of file resource_events.c.

501{
502 struct event_session *session;
503
504 struct ast_json *msg;
505 const char *session_id;
506
507 ast_debug(3, "/events WebSocket established\n");
508
509 ast_assert(ws_session != NULL);
510
511 session_id = ast_ari_websocket_session_id(ws_session);
512
513 /* Find the event_session and update its websocket */
515 if (session) {
518 } else {
520 "Failed to locate an event session for the provided websocket session\n");
521 }
522
523 /* We don't process any input, but we'll consume it waiting for EOF */
524 while ((msg = ast_ari_websocket_session_read(ws_session))) {
525 ast_json_unref(msg);
526 }
527
529 ao2_ref(session, -1);
530}
struct ast_json * ast_ari_websocket_session_read(struct ast_ari_websocket_session *session)
Read a message from an ARI WebSocket.
const char * ast_ari_websocket_session_id(const struct ast_ari_websocket_session *session)
Get the Session ID for an ARI WebSocket.
static struct ast_mansession session
#define ast_log
Definition: astobj2.c:42
#define ao2_unlink(container, obj)
Remove an object from a container.
Definition: astobj2.h:1578
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1736
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1101
#define LOG_WARNING
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition: json.c:73
static void event_session_cleanup(struct event_session *session)
Processes cleanup actions for an event_session object.
static void event_session_update_websocket(struct event_session *session, struct ast_ari_websocket_session *ws_session)
Updates the websocket session for an event_session.
A wrapper for the ast_ari_websocket_session.
#define ast_assert(a)
Definition: utils.h:739

References ao2_find, ao2_ref, ao2_unlink, ast_ari_websocket_session_id(), ast_ari_websocket_session_read(), ast_assert, ast_debug, ast_json_unref(), ast_log, event_session_cleanup(), event_session_registry, event_session_update_websocket(), LOG_WARNING, NULL, OBJ_SEARCH_KEY, and session.

Referenced by ast_ari_events_event_websocket_ws_established_cb().

◆ ast_ari_websocket_events_event_websocket_init()

int ast_ari_websocket_events_event_websocket_init ( void  )

WebSocket connection for events.

Return values
0: success
-1: error

Definition at line 473 of file resource_events.c.

474{
475 /* Try to instantiate the registry */
479 /* This is bad, bad. */
481 "Failed to allocate the local registry for websocket applications\n");
482 return -1;
483 }
484
485 return 0;
486}
@ AO2_ALLOC_OPT_LOCK_MUTEX
Definition: astobj2.h:363
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition: astobj2.h:1303
static int event_session_hash(const void *obj, const int flags)
AO2 hash function for event_session objects.
#define EVENT_SESSION_NUM_BUCKETS
static int event_session_compare(void *obj, void *arg, int flags)
AO2 comparison function for event_session objects.

References AO2_ALLOC_OPT_LOCK_MUTEX, ao2_container_alloc_hash, ast_log, event_session_compare(), event_session_hash(), EVENT_SESSION_NUM_BUCKETS, event_session_registry, LOG_WARNING, and NULL.

Referenced by load_module().

◆ event_session_alloc()

static int event_session_alloc ( struct ast_tcptls_session_instance * ser,
struct ast_ari_events_event_websocket_args * args,
const char * session_id
)
static

Creates an event_session object and registers its apps with Stasis.

Definition at line 388 of file resource_events.c.

390{
392 int (* register_handler)(const char *, stasis_app_cb handler, void *data);
393 size_t size, i;
394
395 /* The request must have at least one [app] parameter */
396 if (args->app_count == 0) {
399 }
400
401 size = sizeof(*session) + strlen(session_id) + 1;
402
403 /* Instantiate the event session */
405 if (!session) {
407 }
408
409 strncpy(session->session_id, session_id, size - sizeof(*session));
410
411 /* Instantiate the hash table for Stasis apps */
412 session->websocket_apps =
414
415 if (!session->websocket_apps) {
417 }
418
419 /* Instantiate the message queue */
420 if (AST_VECTOR_INIT(&session->message_queue, MESSAGES_INIT_SIZE)) {
422 }
423
424 /* Register the apps with Stasis */
425 if (args->subscribe_all) {
426 register_handler = &stasis_app_register_all;
427 } else {
428 register_handler = &stasis_app_register;
429 }
430
431 for (i = 0; i < args->app_count; ++i) {
432 const char *app = args->app[i];
433
434 if (ast_strlen_zero(app)) {
437 }
438
439 if (ast_str_container_add(session->websocket_apps, app)) {
441 }
442
443 if (register_handler(app, stasis_app_message_handler, session)) {
444 ast_log(LOG_WARNING, "Stasis registration failed for application: '%s'\n", app);
447 }
448 }
449
450 /* Add the event session to the local registry */
453 }
454
455 return 0;
456}
static const char app[]
Definition: app_adsiprog.c:56
#define ao2_link(container, obj)
Add an object to a container.
Definition: astobj2.h:1532
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:409
static void event_session_dtor(void *obj)
Event session object destructor (event_session).
static void stasis_app_message_handler(void *data, const char *app_name, struct ast_json *message)
Callback handler for Stasis application messages.
#define MESSAGES_INIT_SIZE
#define APPS_NUM_BUCKETS
static int event_session_allocation_error_handler(struct event_session *session, enum event_session_error_type error, struct ast_tcptls_session_instance *ser)
Handles event_session error processing.
int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
Register a new Stasis application.
Definition: res_stasis.c:1784
void(* stasis_app_cb)(void *data, const char *app_name, struct ast_json *message)
Callback for Stasis application handler.
Definition: stasis_app.h:67
int stasis_app_register_all(const char *app_name, stasis_app_cb handler, void *data)
Register a new Stasis application that receives all Asterisk events.
Definition: res_stasis.c:1789
#define ast_str_container_alloc(buckets)
Allocates a hash container for bare strings.
Definition: strings.h:1365
int ast_str_container_add(struct ao2_container *str_container, const char *add)
Adds a string to a string container allocated by ast_str_container_alloc.
Definition: strings.c:205
static void handler(const char *name, int response_code, struct ast_variable *get_params, struct ast_variable *path_vars, struct ast_variable *headers, struct ast_json *body, struct ast_ari_response *response)
Definition: test_ari.c:59
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:941
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113

References ao2_alloc, ao2_cleanup, ao2_link, app, APPS_NUM_BUCKETS, args, ast_log, ast_str_container_add(), ast_str_container_alloc, ast_strlen_zero(), AST_VECTOR_INIT, ERROR_TYPE_INVALID_APP_PARAM, ERROR_TYPE_MISSING_APP_PARAM, ERROR_TYPE_OOM, ERROR_TYPE_STASIS_REGISTRATION, event_session_allocation_error_handler(), event_session_dtor(), event_session_registry, handler(), LOG_WARNING, MESSAGES_INIT_SIZE, NULL, RAII_VAR, session, event_session::session_id, stasis_app_message_handler(), stasis_app_register(), and stasis_app_register_all().

Referenced by ast_ari_websocket_events_event_websocket_attempted().

◆ event_session_allocation_error_handler()

static int event_session_allocation_error_handler ( struct event_session * session,
enum event_session_error_type error,
struct ast_tcptls_session_instance * ser
)
static

Handles event_session error processing.

Definition at line 339 of file resource_events.c.

342{
343 /* Notify the client */
344 switch (error) {
346 ast_http_error(ser, 500, "Internal Server Error",
347 "Stasis registration failed");
348 break;
349
350 case ERROR_TYPE_OOM:
351 ast_http_error(ser, 500, "Internal Server Error",
352 "Allocation failed");
353 break;
354
356 ast_http_error(ser, 400, "Bad Request",
357 "HTTP request is missing param: [app]");
358 break;
359
361 ast_http_error(ser, 400, "Bad Request",
362 "Invalid application provided in param [app].");
363 break;
364
365 default:
366 break;
367 }
368
369 /* Cleanup the session */
371 return -1;
372}
void ast_http_error(struct ast_tcptls_session_instance *ser, int status, const char *title, const char *text)
Send HTTP error message and close socket.
Definition: http.c:651
int error(const char *format,...)
Definition: utils/frame.c:999

References ast_http_error(), error(), ERROR_TYPE_INVALID_APP_PARAM, ERROR_TYPE_MISSING_APP_PARAM, ERROR_TYPE_OOM, ERROR_TYPE_STASIS_REGISTRATION, event_session_cleanup(), and session.

Referenced by event_session_alloc().

◆ event_session_cleanup()

static void event_session_cleanup ( struct event_session * session)
static

Processes cleanup actions for an event_session object.

Definition at line 297 of file resource_events.c.

298{
299 if (!session) {
300 return;
301 }
302
306 }
307}
static void event_session_shutdown(struct event_session *session)
Explicitly shutdown a session.

References ao2_unlink, event_session_registry, event_session_shutdown(), and session.

Referenced by ast_ari_websocket_events_event_websocket_established(), event_session_allocation_error_handler(), and event_session_shutdown_cb().

◆ event_session_compare()

static int event_session_compare ( void *  obj,
void *  arg,
int  flags 
)
static

AO2 comparison function for event_session objects.

Definition at line 153 of file resource_events.c.

154{
155 const struct event_session *object_left = obj;
156 const struct event_session *object_right = arg;
157 const char *right_key = arg;
158 int cmp = 0;
159
160 switch (flags & OBJ_SEARCH_MASK) {
162 right_key = object_right->session_id;
163 /* Fall through */
164 case OBJ_SEARCH_KEY:
165 cmp = strcmp(object_left->session_id, right_key);
166 break;
168 cmp = strncmp(object_left->session_id, right_key, strlen(right_key));
169 break;
170 default:
171 break;
172 }
173
174 return cmp ? 0 : CMP_MATCH;
175}
@ CMP_MATCH
Definition: astobj2.h:1027
@ OBJ_SEARCH_PARTIAL_KEY
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
Definition: astobj2.h:1116
@ OBJ_SEARCH_OBJECT
The arg parameter is an object of the same type.
Definition: astobj2.h:1087
@ OBJ_SEARCH_MASK
Search option field mask.
Definition: astobj2.h:1072

References CMP_MATCH, OBJ_SEARCH_KEY, OBJ_SEARCH_MASK, OBJ_SEARCH_OBJECT, OBJ_SEARCH_PARTIAL_KEY, and event_session::session_id.

Referenced by ast_ari_websocket_events_event_websocket_init().

◆ event_session_dtor()

static void event_session_dtor ( void *  obj)
static

Event session object destructor (event_session).

Definition at line 316 of file resource_events.c.

317{
318#ifdef AST_DEVMODE /* Avoid unused variable warning */
319 struct event_session *session = obj;
320#endif
321
322 /* event_session_shutdown should have been called before now */
323 ast_assert(session->ws_session == NULL);
324 ast_assert(session->websocket_apps == NULL);
325 ast_assert(AST_VECTOR_SIZE(&session->message_queue) == 0);
326}
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:609

References ast_assert, AST_VECTOR_SIZE, NULL, and session.

Referenced by event_session_alloc().

◆ event_session_hash()

static int event_session_hash ( const void *  obj,
const int  flags 
)
static

AO2 hash function for event_session objects.

Computes hash value for the given event_session, with respect to the provided search flags.

Definition at line 191 of file resource_events.c.

192{
193 const struct event_session *session;
194 const char *key;
195
196 switch (flags & OBJ_SEARCH_MASK) {
197 case OBJ_SEARCH_KEY:
198 key = obj;
199 break;
201 session = obj;
202 key = session->session_id;
203 break;
204 default:
205 /* Hash can only work on something with a full key. */
206 ast_assert(0);
207 return 0;
208 }
209 return ast_str_hash(key);
210}
static force_inline int attribute_pure ast_str_hash(const char *str)
Compute a hash value on a string.
Definition: strings.h:1259

References ast_assert, ast_str_hash(), OBJ_SEARCH_KEY, OBJ_SEARCH_MASK, OBJ_SEARCH_OBJECT, and session.

Referenced by ast_ari_websocket_events_event_websocket_init().

◆ event_session_shutdown()

static void event_session_shutdown ( struct event_session * session)
static

Explicitly shutdown a session.

An explicit shutdown is necessary, since the stasis_app has a reference to this session. We also need to be sure to null out the ws_session field, since the websocket is about to go away.

Definition at line 223 of file resource_events.c.

224{
225 struct ao2_iterator i;
226 char *app;
227 int j;
229
230 /* Clean up the websocket_apps container */
231 if (session->websocket_apps) {
232 i = ao2_iterator_init(session->websocket_apps, 0);
233 while ((app = ao2_iterator_next(&i))) {
236 }
238 ao2_cleanup(session->websocket_apps);
239 session->websocket_apps = NULL;
240 }
241
242 /* Clean up the message_queue container */
243 for (j = 0; j < AST_VECTOR_SIZE(&session->message_queue); j++) {
244 struct ast_json *msg = AST_VECTOR_GET(&session->message_queue, j);
245 ast_json_unref(msg);
246 }
247 AST_VECTOR_FREE(&session->message_queue);
248
249 /* Remove the handle to the underlying websocket session */
250 session->ws_session = NULL;
251}
ast_mutex_t lock
Definition: app_sla.c:331
#define ao2_iterator_next(iter)
Definition: astobj2.h:1911
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:604
void stasis_app_unregister(const char *app_name)
Unregister a Stasis application.
Definition: res_stasis.c:1794
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
Definition: astobj2.h:1821
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition: vector.h:174
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:680

References ao2_cleanup, ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, app, ast_json_unref(), AST_VECTOR_FREE, AST_VECTOR_GET, AST_VECTOR_SIZE, lock, NULL, SCOPED_AO2LOCK, session, and stasis_app_unregister().

Referenced by event_session_cleanup().

◆ event_session_shutdown_cb()

static int event_session_shutdown_cb ( void *  session,
void *  arg,
int  flags 
)
static

Definition at line 458 of file resource_events.c.

459{
461
462 return 0;
463}

References event_session_cleanup(), and session.

Referenced by ast_ari_websocket_events_event_websocket_dtor().

◆ event_session_update_websocket()

static void event_session_update_websocket ( struct event_session * session,
struct ast_ari_websocket_session * ws_session
)
static

Updates the websocket session for an event_session.

The websocket for the given event_session will be updated to the value of the ws_session argument.

If the value of the ws_session is not NULL and there are messages in the event session's message_queue, the messages are dispatched and removed from the queue.

Definition at line 269 of file resource_events.c.

271{
272 int i;
273
275
277
278 session->ws_session = ws_session;
279
280 for (i = 0; i < AST_VECTOR_SIZE(&session->message_queue); i++) {
281 struct ast_json *msg = AST_VECTOR_GET(&session->message_queue, i);
283 ast_json_unref(msg);
284 }
285
288}
int ast_ari_websocket_session_write(struct ast_ari_websocket_session *session, struct ast_json *message)
Send a message to an ARI WebSocket.
#define ao2_unlock(a)
Definition: astobj2.h:729
#define ao2_lock(a)
Definition: astobj2.h:717
#define AST_VECTOR_RESET(vec, cleanup)
Reset vector.
Definition: vector.h:625
#define AST_VECTOR_ELEM_CLEANUP_NOOP(elem)
Vector element cleanup that does nothing.
Definition: vector.h:571

References ao2_lock, ao2_unlock, ast_ari_websocket_session_write(), ast_assert, ast_json_unref(), AST_VECTOR_ELEM_CLEANUP_NOOP, AST_VECTOR_GET, AST_VECTOR_RESET, AST_VECTOR_SIZE, NULL, and session.

Referenced by ast_ari_websocket_events_event_websocket_established().

◆ stasis_app_message_handler()

static void stasis_app_message_handler ( void * data,
const char * app_name,
struct ast_json * message
)
static

Callback handler for Stasis application messages.

Definition at line 79 of file resource_events.c.

81{
82 struct event_session *session = data;
83 const char *msg_type, *msg_application;
84 int app_debug_enabled;
85
87
88 /*
89 * We need to get the debug flag before locking session
90 * to help prevent a deadlock with the apps_registry container.
91 */
92 app_debug_enabled = stasis_app_get_debug_by_name(app_name);
93
95
96 msg_type = S_OR(ast_json_string_get(ast_json_object_get(message, "type")), "");
97 msg_application = S_OR(
99
100 /* If we've been replaced, remove the application from our local
101 websocket_apps container */
102 if (strcmp(msg_type, "ApplicationReplaced") == 0 &&
103 strcmp(msg_application, app_name) == 0) {
104 ao2_find(session->websocket_apps, msg_application,
106 }
107
108 /* Now, we need to determine our state to see how we will handle the message */
110 /* We failed to add an application element to our json message */
112 "Failed to dispatch '%s' message from Stasis app '%s'; could not update message\n",
113 msg_type,
114 msg_application);
115 } else if (!session->ws_session) {
116 /* If the websocket is NULL, the message goes to the queue */
117 if (!AST_VECTOR_APPEND(&session->message_queue, message)) {
119 }
121 "Queued '%s' message for Stasis app '%s'; websocket is not ready\n",
122 msg_type,
123 msg_application);
125 if (app_debug_enabled) {
127
128 ast_verbose("<--- Sending ARI event to %s --->\n%s\n",
130 str);
132 }
133
134 /* We are ready to publish the message */
136 }
137
139}
const char * str
Definition: app_jack.c:147
enum ast_json_encoding_format ast_ari_json_format(void)
Configured encoding format for JSON output.
Definition: res_ari.c:806
struct ast_sockaddr * ast_ari_websocket_session_get_remote_addr(struct ast_ari_websocket_session *session)
Get the remote address from an ARI WebSocket.
@ OBJ_UNLINK
Definition: astobj2.h:1039
void ast_verbose(const char *fmt,...)
Definition: extconf.c:2206
struct ast_json * ast_json_string_create(const char *value)
Construct a JSON string from value.
Definition: json.c:278
void ast_json_free(void *p)
Asterisk's custom JSON allocator. Exposed for use by unit tests.
Definition: json.c:52
struct ast_json * ast_json_ref(struct ast_json *value)
Increase refcount on value.
Definition: json.c:67
int ast_json_object_set(struct ast_json *object, const char *key, struct ast_json *value)
Set a field in a JSON object.
Definition: json.c:414
const char * ast_json_string_get(const struct ast_json *string)
Get the value of a JSON string.
Definition: json.c:283
char * ast_json_dump_string_format(struct ast_json *root, enum ast_json_encoding_format format)
Encode a JSON value to a string.
Definition: json.c:484
static char * ast_sockaddr_stringify(const struct ast_sockaddr *addr)
Wrapper around ast_sockaddr_stringify_fmt() with default format.
Definition: netsock2.h:256
const char * app_name(struct ast_app *app)
Definition: pbx_app.c:463
int stasis_app_event_allowed(const char *app_name, struct ast_json *event)
Check if the given event should be filtered.
int stasis_app_get_debug_by_name(const char *app_name)
Get debug status of an application.
#define S_OR(a, b)
returns the equivalent of logic or for strings: first one if not empty, otherwise second one.
Definition: strings.h:80
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256

References ao2_find, ao2_lock, ao2_unlock, app_name(), ast_ari_json_format(), ast_ari_websocket_session_get_remote_addr(), ast_ari_websocket_session_write(), ast_assert, ast_json_dump_string_format(), ast_json_free(), ast_json_object_get(), ast_json_object_set(), ast_json_ref(), ast_json_string_create(), ast_json_string_get(), ast_log, ast_sockaddr_stringify(), AST_VECTOR_APPEND, ast_verbose(), LOG_WARNING, NULL, OBJ_NODATA, OBJ_UNLINK, S_OR, session, stasis_app_event_allowed(), stasis_app_get_debug_by_name(), and str.

Referenced by event_session_alloc().

Variable Documentation

◆ event_session_registry

struct ao2_container* event_session_registry
static