Asterisk - The Open Source Telephony Project GIT-master-7988d11
Loading...
Searching...
No Matches
Macros | Enumerations | Functions | Variables
ari_websockets.c File Reference

WebSocket support for RESTful API's. More...

#include "asterisk.h"
#include "resource_events.h"
#include "ari_websockets.h"
#include "internal.h"
#include "ari_model_validators.h"
#include "asterisk/app.h"
#include "asterisk/ari.h"
#include "asterisk/astobj2.h"
#include "asterisk/http_websocket.h"
#include "asterisk/module.h"
#include "asterisk/pbx.h"
#include "asterisk/stasis_app.h"
#include "asterisk/time.h"
#include "asterisk/uuid.h"
#include "asterisk/vector.h"
#include "asterisk/websocket_client.h"
Include dependency graph for ari_websockets.c:

Go to the source code of this file.

Macros

#define APPS_INIT_SIZE   7
 
#define ARI_CONTEXT_REGISTRAR   "res_ari"
 
#define handle_create_error(ser, code, msg, reason)
 
#define MESSAGES_INIT_SIZE   23
 
#define PER_CALL_FAIL_SAFE_TIMEOUT(owc)
 
#define SESSION_REGISTRY_NUM_BUCKETS   23
 
#define STASIS_END_MAX_WAIT_MS   5000
 
#define STASIS_END_POST_WAIT_US   (3000 * 1000)
 
#define VALIDATION_FAILED
 

Enumerations

enum  session_apply_result { SESSION_APPLY_NO_CHANGE , SESSION_APPLY_OK , SESSION_APPLY_RECONNECT_REQUIRED , SESSION_APPLY_FAILED }
 

Functions

void ari_handle_websocket (struct ast_tcptls_session_instance *ser, const char *uri, enum ast_http_method method, struct ast_variable *get_params, struct ast_variable *headers)
 Wrapper for invoking the websocket code for an incoming connection.
 
int ari_outbound_websocket_start (struct ari_conf_outbound_websocket *owc)
 
struct ari_ws_sessionari_websocket_get_session (const char *session_id)
 
struct ao2_containerari_websocket_get_sessions (void)
 
int ari_websocket_load_module (int is_enabled)
 
void ari_websocket_send_event (struct ari_ws_session *session, const char *app_name, struct ast_json *message, int debug_app)
 Callback handler for Stasis application messages.
 
void ari_websocket_shutdown (struct ari_ws_session *session)
 
void ari_websocket_shutdown_all (void)
 
int ari_websocket_unload_module (void)
 
void ast_ari_close_per_call_websocket (char *app_name)
 This function is called by the app_stasis dialplan app to close a per-call websocket after stasis_app_exec() returns.
 
char * ast_ari_create_per_call_websocket (const char *app_name, struct ast_channel *chan)
 This function gets called by app_stasis when a call arrives but a Stasis application isn't already registered. We check to see if a per-call config exists for the application and if so, we create a per-call websocket connection and return a unique app id which app_stasis can use to call stasis_app_exec() with.
 
static int null_validator (struct ast_json *json)
 Validator that always succeeds.
 
static enum session_apply_result outbound_session_apply_config (struct ari_ws_session *session, struct ari_conf_outbound_websocket *new_owc)
 
static int outbound_session_create (void *obj, void *args, int flags)
 
static void * outbound_session_handler_thread (void *obj)
 
static void * outbound_session_pc_close_thread (void *data)
 
static void outbound_sessions_load (const char *name)
 
static void session_cleanup (void *obj)
 
static struct ari_ws_sessionsession_create (struct ast_tcptls_session_instance *ser, const char *apps, int subscribe_all, const char *session_id, struct ari_conf_outbound_websocket *ows, enum ast_websocket_type ws_type)
 
static void session_dtor (void *obj)
 
static struct ari_ws_sessionsession_find_by_app (const char *app_name, unsigned int ws_type)
 
static struct ast_jsonsession_read (struct ari_ws_session *session)
 
static int session_register_apps (struct ari_ws_session *session, const char *_apps, int subscribe_all)
 
static void session_registry_dtor (void)
 
static void session_reset (struct ari_ws_session *session)
 
static void session_send_app_event (struct ari_ws_session *session, const char *event_type, const char *app_name)
 
static void session_send_or_queue (struct ari_ws_session *session, struct ast_json *message, const char *msg_type, const char *app_name, int debug_app)
 
static int session_shutdown_cb (void *obj, void *arg, int flags)
 
static void session_unref (struct ari_ws_session *session)
 
static void session_unregister_app_cb (char *app_name, struct ari_ws_session *session)
 
static void session_unregister_apps (struct ari_ws_session *session)
 
static int session_update (struct ari_ws_session *ari_ws_session, struct ast_websocket *ast_ws_session, int send_registered_events)
 
static int session_write (struct ari_ws_session *session, struct ast_json *message)
 
static void stasis_app_message_handler (void *data, const char *app_name, struct ast_json *message)
 
static int websocket_attempted_cb (struct ast_tcptls_session_instance *ser, struct ast_variable *get_params, struct ast_variable *headers, const char *session_id)
 
static void websocket_established_cb (struct ast_websocket *ast_ws_session, struct ast_variable *get_params, struct ast_variable *upgrade_headers)
 

Variables

ari_validator ari_validate_message_fn = null_validator
 
struct ast_websocket_serverast_ws_server
 
static struct ast_sorcery_observer observer_callbacks
 
static struct ao2_containersession_registry
 Local registry for created ari_ws_session objects.
 

Detailed Description

WebSocket support for RESTful API's.

Author
David M. Lee, II dlee@.nosp@m.digi.nosp@m.um.co.nosp@m.m

Definition in file ari_websockets.c.

Macro Definition Documentation

◆ APPS_INIT_SIZE

#define APPS_INIT_SIZE   7

Initial size of websocket session apps vector

Definition at line 48 of file ari_websockets.c.

◆ ARI_CONTEXT_REGISTRAR

#define ARI_CONTEXT_REGISTRAR   "res_ari"

Definition at line 53 of file ari_websockets.c.

◆ handle_create_error

#define handle_create_error (   ser,
  code,
  msg,
  reason 
)

Definition at line 619 of file ari_websockets.c.

620 { \
621 if (ser) { \
622 ast_http_error(ser, code, msg, reason); \
623 } \
624 ast_log(LOG_WARNING, "Failed to create ARI websocket session: %d %s %s\n", \
625 code, msg, reason); \
626})
#define LOG_WARNING

◆ MESSAGES_INIT_SIZE

#define MESSAGES_INIT_SIZE   23

Initial size of the websocket session message queue.

Definition at line 51 of file ari_websockets.c.

◆ PER_CALL_FAIL_SAFE_TIMEOUT

#define PER_CALL_FAIL_SAFE_TIMEOUT (   owc)
Value:
(int64_t)((owc->websocket_client->connect_timeout + owc->websocket_client->reconnect_interval) \
* (owc->websocket_client->reconnect_attempts + 3))

Definition at line 1178 of file ari_websockets.c.

◆ SESSION_REGISTRY_NUM_BUCKETS

#define SESSION_REGISTRY_NUM_BUCKETS   23

Number of buckets for the ari_ws_session registry. Remember to keep it a prime number!

Definition at line 45 of file ari_websockets.c.

◆ STASIS_END_MAX_WAIT_MS

#define STASIS_END_MAX_WAIT_MS   5000

Definition at line 1289 of file ari_websockets.c.

◆ STASIS_END_POST_WAIT_US

#define STASIS_END_POST_WAIT_US   (3000 * 1000)

Definition at line 1290 of file ari_websockets.c.

◆ VALIDATION_FAILED

#define VALIDATION_FAILED
Value:
"{" \
" \"error\": \"InvalidMessage\"," \
" \"message\": \"Message validation failed\"" \
"}"

Definition at line 75 of file ari_websockets.c.

76 {" \
77 " \"error\": \"InvalidMessage\"," \
78 " \"message\": \"Message validation failed\"" \
79 "}"

Enumeration Type Documentation

◆ session_apply_result

Enumerator
SESSION_APPLY_NO_CHANGE 
SESSION_APPLY_OK 
SESSION_APPLY_RECONNECT_REQUIRED 
SESSION_APPLY_FAILED 

Definition at line 1066 of file ari_websockets.c.

1066 {
1071};
@ SESSION_APPLY_FAILED
@ SESSION_APPLY_RECONNECT_REQUIRED
@ SESSION_APPLY_OK
@ SESSION_APPLY_NO_CHANGE

Function Documentation

◆ ari_handle_websocket()

void ari_handle_websocket ( struct ast_tcptls_session_instance ser,
const char *  uri,
enum ast_http_method  method,
struct ast_variable get_params,
struct ast_variable headers 
)

Wrapper for invoking the websocket code for an incoming connection.

Parameters
ws_serverWebSocket server to invoke.
serHTTP session.
uriRequested URI.
methodRequested HTTP method.
get_paramsParsed query parameters.
headersParsed HTTP headers.

Definition at line 267 of file ari_websockets.c.

271{
272 struct ast_http_uri fake_urih = {
274 };
275
276 ast_websocket_uri_cb(ser, &fake_urih, uri, method, get_params,
277 headers);
278}
struct ast_websocket_server * ast_ws_server
int AST_OPTIONAL_API_NAME() ast_websocket_uri_cb(struct ast_tcptls_session_instance *ser, const struct ast_http_uri *urih, const char *uri, enum ast_http_method method, struct ast_variable *get_vars, struct ast_variable *headers)
Callback suitable for use with a ast_http_uri.
const char * method
Definition res_pjsip.c:1273
Definition of a URI handler.
Definition http.h:102
const char * uri
Definition http.h:105
void * data
Definition http.h:116

References ast_websocket_uri_cb(), ast_ws_server, ast_http_uri::data, method, and ast_http_uri::uri.

Referenced by ast_ari_invoke().

◆ ari_outbound_websocket_start()

int ari_outbound_websocket_start ( struct ari_conf_outbound_websocket owc)

Definition at line 1515 of file ari_websockets.c.

1516{
1517 if (owc) {
1518 return outbound_session_create(owc, NULL, 0);
1519 }
1520 return -1;
1521}
static int outbound_session_create(void *obj, void *args, int flags)
#define NULL
Definition resample.c:96

References NULL, and outbound_session_create().

Referenced by ari_start_owc().

◆ ari_websocket_get_session()

struct ari_ws_session * ari_websocket_get_session ( const char *  session_id)

Definition at line 902 of file ari_websockets.c.

903{
904 return ao2_find(session_registry, session_id, OBJ_SEARCH_KEY);
905}
static struct ao2_container * session_registry
Local registry for created ari_ws_session objects.
#define ao2_find(container, arg, flags)
Definition astobj2.h:1736
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
Definition astobj2.h:1101

References ao2_find, OBJ_SEARCH_KEY, ari_ws_session::session_id, and session_registry.

Referenced by ari_shut_session(), and outbound_session_create().

◆ ari_websocket_get_sessions()

struct ao2_container * ari_websocket_get_sessions ( void  )

Definition at line 1392 of file ari_websockets.c.

1393{
1394 return ao2_bump(session_registry);
1395}
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition astobj2.h:480

References ao2_bump, and session_registry.

Referenced by ari_show_sessions(), and ari_shut_session().

◆ ari_websocket_load_module()

int ari_websocket_load_module ( int  is_enabled)

Definition at line 1564 of file ari_websockets.c.

1565{
1566 int res = 0;
1567 struct ast_websocket_protocol *protocol;
1568
1569 ast_debug(2, "Initializing ARI websockets. Enabled: %s\n", is_enabled ? "yes" : "no");
1570
1573 ari_ws_session_sort_fn, ari_ws_session_cmp_fn);
1574 if (!session_registry) {
1576 "Failed to allocate the local registry for websocket applications\n");
1578 }
1579
1580 res = ari_sorcery_observer_add("outbound_websocket", &observer_callbacks);
1581 if (res < 0) {
1582 ast_log(LOG_WARNING, "Failed to register ARI websocket observer\n");
1585 }
1586
1587 /*
1588 * The global "enabled" flag only controls whether the REST and
1589 * inbound websockets are enabled. The outbound websocket
1590 * configs are always enabled.
1591 if (!is_enabled) {
1592 return AST_MODULE_LOAD_SUCCESS;
1593 }
1594 */
1595
1597 if (!ast_ws_server) {
1600 }
1601
1602 protocol = ast_websocket_sub_protocol_alloc("ari");
1603 if (!protocol) {
1606 }
1610
1612}
static struct ast_sorcery_observer observer_callbacks
static int websocket_attempted_cb(struct ast_tcptls_session_instance *ser, struct ast_variable *get_params, struct ast_variable *headers, const char *session_id)
static void websocket_established_cb(struct ast_websocket *ast_ws_session, struct ast_variable *get_params, struct ast_variable *upgrade_headers)
int ari_websocket_unload_module(void)
#define ast_log
Definition astobj2.c:42
@ AO2_ALLOC_OPT_LOCK_MUTEX
Definition astobj2.h:363
#define ao2_container_alloc_rbtree(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a red-black tree container.
Definition astobj2.h:1349
@ AO2_CONTAINER_ALLOC_OPT_DUPS_REPLACE
Replace objects with duplicate keys in container.
Definition astobj2.h:1211
int AST_OPTIONAL_API_NAME() ast_websocket_server_add_protocol2(struct ast_websocket_server *server, struct ast_websocket_protocol *protocol)
Add a sub-protocol handler to the given server.
struct ast_websocket_protocol *AST_OPTIONAL_API_NAME() ast_websocket_sub_protocol_alloc(const char *name)
Allocate a websocket sub-protocol instance.
struct ast_websocket_server *AST_OPTIONAL_API_NAME() ast_websocket_server_create(void)
Creates a ast_websocket_server.
#define ast_debug(level,...)
Log a DEBUG message.
@ AST_MODULE_LOAD_SUCCESS
Definition module.h:70
@ AST_MODULE_LOAD_DECLINE
Module has failed to load, may be in an inconsistent state.
Definition module.h:78
int ari_sorcery_observer_add(const char *object_type, const struct ast_sorcery_observer *callbacks)
static int is_enabled(void)
Helper function to check if module is enabled.
Definition res_ari.c:96
A websocket protocol implementation.
ast_websocket_callback session_established
Callback called when a new session is established. Mandatory.
ast_websocket_pre_callback session_attempted
Callback called when a new session is attempted. Optional.

References AO2_ALLOC_OPT_LOCK_MUTEX, AO2_CONTAINER_ALLOC_OPT_DUPS_REPLACE, ao2_container_alloc_rbtree, ari_sorcery_observer_add(), ari_websocket_unload_module(), ast_debug, ast_log, AST_MODULE_LOAD_DECLINE, AST_MODULE_LOAD_SUCCESS, ast_websocket_server_add_protocol2(), ast_websocket_server_create(), ast_websocket_sub_protocol_alloc(), ast_ws_server, is_enabled(), LOG_WARNING, observer_callbacks, ast_websocket_protocol::session_attempted, ast_websocket_protocol::session_established, session_registry, websocket_attempted_cb(), and websocket_established_cb().

Referenced by load_module().

◆ ari_websocket_send_event()

void ari_websocket_send_event ( struct ari_ws_session session,
const char *  app_name,
struct ast_json message,
int  debug_app 
)

Callback handler for Stasis application messages.

Definition at line 290 of file ari_websockets.c.

292{
293 char *remote_addr = session->ast_ws_session ? ast_sockaddr_stringify(
294 ast_websocket_remote_address(session->ast_ws_session)) : "";
295 const char *msg_type, *msg_application;
296 SCOPE_ENTER(4, "%s: Dispatching message from Stasis app '%s'\n", remote_addr, app_name);
297
299
301
302 msg_type = S_OR(ast_json_string_get(ast_json_object_get(message, "type")), "");
303 msg_application = S_OR(
305
306 /* If we've been replaced, remove the application from our local
307 websocket_apps container */
308 if (session->type == AST_WS_TYPE_INBOUND
309 && strcmp(msg_type, "ApplicationReplaced") == 0 &&
310 strcmp(msg_application, app_name) == 0) {
311 AST_VECTOR_REMOVE_CMP_ORDERED(&session->websocket_apps,
313 }
314
315 /* Now, we need to determine our state to see how we will handle the message */
319 "%s: Failed to dispatch '%s' message from Stasis app '%s'; could not update message\n",
320 remote_addr, msg_type, msg_application);
321 }
322
325 app_name, debug_app);
326 }
327
329 && !ast_strlen_zero(session->channel_id)
330 && ast_strings_equal(msg_type, "StasisEnd")) {
331 struct ast_json *chan = ast_json_object_get(message, "channel");
332 struct ast_json *id_obj = ast_json_object_get(chan, "id");
333 const char *id = ast_json_string_get(id_obj);
334 if (!ast_strlen_zero(id)
335 && ast_strings_equal(id, session->channel_id)) {
336 ast_debug(3, "%s: StasisEnd message sent for channel '%s'\n",
337 remote_addr, id);
338 session->stasis_end_sent = 1;
339 }
340 }
342 SCOPE_EXIT("%s: Dispatched '%s' message from Stasis app '%s'\n",
343 remote_addr, msg_type, app_name);
344}
static void session_send_or_queue(struct ari_ws_session *session, struct ast_json *message, const char *msg_type, const char *app_name, int debug_app)
static struct ast_mansession session
void ast_free_ptr(void *ptr)
free() wrapper
Definition astmm.c:1739
#define ao2_unlock(a)
Definition astobj2.h:729
#define ao2_lock(a)
Definition astobj2.h:717
#define SCOPE_ENTER(level,...)
#define SCOPE_EXIT(...)
#define SCOPE_EXIT_LOG_RTN(__log_level,...)
struct ast_sockaddr *AST_OPTIONAL_API_NAME() ast_websocket_remote_address(struct ast_websocket *session)
Get the remote address for a WebSocket connected session.
@ AST_WS_TYPE_INBOUND
@ AST_WS_TYPE_CLIENT_PER_CALL
struct ast_json * ast_json_string_create(const char *value)
Construct a JSON string from value.
Definition json.c:278
int ast_json_object_set(struct ast_json *object, const char *key, struct ast_json *value)
Set a field in a JSON object.
Definition json.c:414
const char * ast_json_string_get(const struct ast_json *string)
Get the value of a JSON string.
Definition json.c:283
struct ast_json * ast_json_object_get(struct ast_json *object, const char *key)
Get a field from a JSON object.
Definition json.c:407
static char * ast_sockaddr_stringify(const struct ast_sockaddr *addr)
Wrapper around ast_sockaddr_stringify_fmt() with default format.
Definition netsock2.h:256
const char * app_name(struct ast_app *app)
Definition pbx_app.c:475
int stasis_app_event_allowed(const char *app_name, struct ast_json *event)
Check if the given event should be filtered.
int ast_strings_equal(const char *str1, const char *str2)
Compare strings for equality checking for NULL.
Definition strings.c:238
#define S_OR(a, b)
returns the equivalent of logic or for strings: first one if not empty, otherwise second one.
Definition strings.h:80
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition strings.h:65
Abstract JSON element (object, array, string, int, ...).
#define ast_assert(a)
Definition utils.h:779
#define AST_VECTOR_REMOVE_CMP_ORDERED(vec, value, cmp, cleanup)
Remove an element from a vector that matches the given comparison while maintaining order.
Definition vector.h:551

References ao2_lock, ao2_unlock, app_name(), ast_assert, ast_debug, ast_free_ptr(), ast_json_object_get(), ast_json_object_set(), ast_json_string_create(), ast_json_string_get(), ast_sockaddr_stringify(), ast_strings_equal(), ast_strlen_zero(), AST_VECTOR_REMOVE_CMP_ORDERED, ast_websocket_remote_address(), AST_WS_TYPE_CLIENT_PER_CALL, AST_WS_TYPE_INBOUND, LOG_WARNING, NULL, S_OR, SCOPE_ENTER, SCOPE_EXIT, SCOPE_EXIT_LOG_RTN, session, session_send_or_queue(), and stasis_app_event_allowed().

Referenced by send_rest_response(), session_read(), and stasis_app_message_handler().

◆ ari_websocket_shutdown()

void ari_websocket_shutdown ( struct ari_ws_session session)

Definition at line 1523 of file ari_websockets.c.

1524{
1525 if (session) {
1527 }
1528}
static int session_shutdown_cb(void *obj, void *arg, int flags)

References NULL, session, and session_shutdown_cb().

Referenced by ari_shut_session().

◆ ari_websocket_shutdown_all()

void ari_websocket_shutdown_all ( void  )

Definition at line 1530 of file ari_websockets.c.

1531{
1532 if (session_registry) {
1535 }
1536}
#define ao2_callback(c, flags, cb_fn, arg)
ao2_callback() is a generic function that applies cb_fn() to all objects in a container,...
Definition astobj2.h:1693
@ OBJ_NODATA
Definition astobj2.h:1044
@ OBJ_MULTIPLE
Definition astobj2.h:1049

References ao2_callback, NULL, OBJ_MULTIPLE, OBJ_NODATA, session_registry, and session_shutdown_cb().

Referenced by ari_shut_sessions().

◆ ari_websocket_unload_module()

int ari_websocket_unload_module ( void  )

Definition at line 1552 of file ari_websockets.c.

1553{
1554 ari_sorcery_observer_remove("outbound_websocket", &observer_callbacks);
1558 return 0;
1559}
static void session_registry_dtor(void)
#define ao2_cleanup(obj)
Definition astobj2.h:1934
int ari_sorcery_observer_remove(const char *object_type, const struct ast_sorcery_observer *callbacks)

References ao2_cleanup, ari_sorcery_observer_remove(), ast_ws_server, NULL, observer_callbacks, and session_registry_dtor().

Referenced by ari_websocket_load_module(), and unload_module().

◆ ast_ari_close_per_call_websocket()

void ast_ari_close_per_call_websocket ( char *  app_name)

This function is called by the app_stasis dialplan app to close a per-call websocket after stasis_app_exec() returns.

Close a per-call outbound websocket connection.

Definition at line 1327 of file ari_websockets.c.

1328{
1329 struct ari_ws_session *session = NULL;
1330 pthread_t thread;
1331 struct timeval tv_start;
1332
1334 if (!session) {
1335 ast_debug(3, "%s: Per call websocket not found\n", app_name);
1337 return;
1338 }
1340
1341 /*
1342 * When stasis_app_exec() returns, the StasisEnd event for the
1343 * channel has been queued but since actually sending it is done
1344 * in a separate thread, it probably won't have been sent yet.
1345 * We need to wait for it to go out on the wire before we close the
1346 * websocket. ari_websocket_send_event will set a flag on the session
1347 * when a StasisEnd event is sent for the channel that originally
1348 * triggered the connection. We'll wait for that but we don't want
1349 * to wait forever so there's a fail-safe timeout in case a thread
1350 * got cancelled or we missed the StasisEnd event somehow.
1351 */
1352 ast_debug(3, "%s: Waiting for StasisEnd event to be sent RC: %d\n",
1353 session->session_id, (int)ao2_ref(session, 0));
1354
1355 tv_start = ast_tvnow();
1356 while (session->thread > 0 && !session->stasis_end_sent) {
1357 struct timeval tv_now = ast_tvnow();
1358 int64_t diff = ast_tvdiff_ms(tv_now, tv_start);
1359 ast_debug(3, "%s: Waiting for StasisEnd event %lu %d %ld\n",
1360 session->session_id, (unsigned long)session->thread,
1361 session->stasis_end_sent, diff);
1362 if (diff > STASIS_END_MAX_WAIT_MS) {
1363 break;
1364 }
1365 /* Sleep for 500ms before checking again. */
1366 usleep(500 * 1000);
1367 }
1368 ast_debug(3, "%s: StasisEnd event sent. Scheduling websocket close. RC: %d\n",
1369 session->session_id, (int)ao2_ref(session, 0));
1370
1371 /*
1372 * We can continue to send events like ChannelVarset and ChannelDestroyed
1373 * to the websocket after the StasisEnd event but those events won't be
1374 * generated until after the Stasis() dialplan app returns. We don't want
1375 * to hold up the dialplan while we wait so we'll create a thread that waits
1376 * a few seconds more before closing the websocket.
1377 *
1378 * We transferring ownership of the session to the thread.
1379 */
1382 ast_log(LOG_WARNING, "%s: Failed to create websocket close thread\n",
1383 session->session_id);
1385 }
1386 ast_debug(3, "%s: Scheduled websocket close RC: %d\n",
1387 session->session_id, (int)ao2_ref(session, 0));
1388
1389 return;
1390}
pthread_t thread
Definition app_sla.c:335
static struct ari_ws_session * session_find_by_app(const char *app_name, unsigned int ws_type)
static void session_unref(struct ari_ws_session *session)
#define STASIS_END_MAX_WAIT_MS
static void * outbound_session_pc_close_thread(void *data)
#define ast_free(a)
Definition astmm.h:180
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition astobj2.h:459
int64_t ast_tvdiff_ms(struct timeval end, struct timeval start)
Computes the difference (in milliseconds) between two struct timeval instances.
Definition time.h:107
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition time.h:159
#define ast_pthread_create_detached_background(a, b, c, d)
Definition utils.h:637

References ao2_ref, app_name(), ast_debug, ast_free, ast_log, ast_pthread_create_detached_background, ast_tvdiff_ms(), ast_tvnow(), AST_WS_TYPE_CLIENT_PER_CALL, LOG_WARNING, NULL, outbound_session_pc_close_thread(), session, session_find_by_app(), session_unref(), STASIS_END_MAX_WAIT_MS, and thread.

Referenced by app_exec().

◆ ast_ari_create_per_call_websocket()

char * ast_ari_create_per_call_websocket ( const char *  app_name,
struct ast_channel chan 
)

This function gets called by app_stasis when a call arrives but a Stasis application isn't already registered. We check to see if a per-call config exists for the application and if so, we create a per-call websocket connection and return a unique app id which app_stasis can use to call stasis_app_exec() with.

Create a per-call outbound websocket connection.

Definition at line 1189 of file ari_websockets.c.

1191{
1194 RAII_VAR(char *, session_id, NULL, ast_free);
1195 RAII_VAR(char *, app_id, NULL, ast_free);
1196 enum ari_conf_owc_fields invalid_fields;
1197 const char *owc_id = NULL;
1198 char *app_id_rtn = NULL;
1199 struct timeval tv_start;
1200 int res = 0;
1201
1203 if (!owc) {
1204 ast_log(LOG_WARNING, "%s: Failed to find outbound websocket per-call config for app '%s'\n",
1205 ast_channel_name(chan), app_name);
1206 return NULL;
1207 }
1208 owc_id = ast_sorcery_object_get_id(owc);
1209 invalid_fields = ari_conf_owc_get_invalid_fields(owc_id);
1210
1211 if (invalid_fields) {
1212 ast_log(LOG_WARNING, "%s: Unable to create per-call websocket. Outbound websocket config is invalid\n",
1213 owc_id);
1214 return NULL;
1215 }
1216
1217 res = ast_asprintf(&session_id, "%s:%s", owc_id, ast_channel_name(chan));
1218 if (res < 0) {
1219 return NULL;
1220 }
1221 res = ast_asprintf(&app_id, "%s:%s", app_name, ast_channel_name(chan));
1222 if (res < 0) {
1223 ast_free(app_id);
1224 return NULL;
1225 }
1226
1227 session = session_create(NULL, app_id, owc->subscribe_all,
1228 session_id, owc, AST_WS_TYPE_CLIENT_PER_CALL);
1229 if (!session) {
1230 ast_log(LOG_WARNING, "%s: Failed to create websocket session\n", session_id);
1231 return NULL;
1232 }
1233
1234 session->channel_id = ast_strdup(ast_channel_uniqueid(chan));
1235 session->channel_name = ast_strdup(ast_channel_name(chan));
1236
1237 /*
1238 * We have to bump the session reference count here because
1239 * we need to check that the session is connected before we return.
1240 * If it didn't connect, then the thread will have cleaned up the
1241 * session while we're in the loop checking for the connection
1242 * which will result in a SEGV or FRACK.
1243 * RAII will clean up this bump.
1244 */
1246 ast_debug(2, "%s: Starting thread RC: %d\n", session->session_id,
1247 (int)ao2_ref(session, 0));
1248
1252 ast_log(LOG_WARNING, "%s: Failed to create thread.\n", session->session_id);
1253 return NULL;
1254 }
1255
1256 /*
1257 * We need to make sure the session connected and is processing
1258 * requests before we return but we don't want to block forever
1259 * in case the thread never starts or gets cancelled so we have
1260 * a fail-safe timeout.
1261 */
1262 tv_start = ast_tvnow();
1263 while (session->thread > 0 && !session->connected) {
1264 struct timeval tv_now = ast_tvnow();
1265 if (ast_tvdiff_ms(tv_now, tv_start) > PER_CALL_FAIL_SAFE_TIMEOUT(owc)) {
1266 break;
1267 }
1268 /* Sleep for 500ms before checking again. */
1269 usleep(500 * 1000);
1270 }
1271
1272 if (session->thread <= 0 || !session->connected) {
1273 ast_log(LOG_WARNING, "%s: Failed to create per call websocket thread\n",
1274 session_id);
1275 return NULL;
1276 }
1277
1278 ast_debug(3, "%s: Created per call websocket for app '%s'\n",
1279 session_id, app_id);
1280
1281 /*
1282 * We now need to prevent RAII from freeing the app_id.
1283 */
1284 app_id_rtn = app_id;
1285 app_id = NULL;
1286 return app_id_rtn;
1287}
static struct ari_ws_session * session_create(struct ast_tcptls_session_instance *ser, const char *apps, int subscribe_all, const char *session_id, struct ari_conf_outbound_websocket *ows, enum ast_websocket_type ws_type)
static void * outbound_session_handler_thread(void *obj)
static void session_cleanup(void *obj)
#define PER_CALL_FAIL_SAFE_TIMEOUT(owc)
#define ast_strdup(str)
A wrapper for strdup()
Definition astmm.h:241
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition astmm.h:267
const char * ast_channel_name(const struct ast_channel *chan)
const char * ast_channel_uniqueid(const struct ast_channel *chan)
@ AST_WS_TYPE_CLIENT_PER_CALL_CONFIG
ari_conf_owc_fields
Definition internal.h:99
struct ari_conf_outbound_websocket * ari_conf_get_owc_for_app(const char *app_name, unsigned int ws_type)
Get the outbound websocket configuration for a Stasis app.
enum ari_conf_owc_fields ari_conf_owc_get_invalid_fields(const char *id)
const char * ast_sorcery_object_get_id(const void *object)
Get the unique identifier of a sorcery object.
Definition sorcery.c:2381
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition utils.h:981

References ao2_bump, ao2_cleanup, ao2_ref, app_name(), ari_conf_get_owc_for_app(), ari_conf_owc_get_invalid_fields(), ast_asprintf, ast_channel_name(), ast_channel_uniqueid(), ast_debug, ast_free, ast_log, ast_pthread_create_detached_background, ast_sorcery_object_get_id(), ast_strdup, ast_tvdiff_ms(), ast_tvnow(), AST_WS_TYPE_CLIENT_PER_CALL, AST_WS_TYPE_CLIENT_PER_CALL_CONFIG, LOG_WARNING, NULL, outbound_session_handler_thread(), PER_CALL_FAIL_SAFE_TIMEOUT, RAII_VAR, session, session_cleanup(), session_create(), and session_unref().

Referenced by app_exec().

◆ null_validator()

static int null_validator ( struct ast_json json)
static

Validator that always succeeds.

Definition at line 66 of file ari_websockets.c.

67 {
68 return 1;
69 }

◆ outbound_session_apply_config()

static enum session_apply_result outbound_session_apply_config ( struct ari_ws_session session,
struct ari_conf_outbound_websocket new_owc 
)
static

Definition at line 1073 of file ari_websockets.c.

1076{
1077 enum session_apply_result apply_result;
1078 enum ari_conf_owc_fields what_changed;
1079 const char *new_owc_id = ast_sorcery_object_get_id(new_owc);
1080
1081 what_changed = ari_conf_owc_detect_changes(session->owc, new_owc);
1082
1083 if (what_changed == ARI_OWC_FIELD_NONE) {
1084 ast_debug(2, "%s: No changes detected\n", new_owc_id);
1086 }
1087 ast_debug(2, "%s: Config change detected. Checking details\n", new_owc_id);
1088
1089 if (what_changed & ARI_OWC_NEEDS_REREGISTER) {
1090 ast_debug(2, "%s: Re-registering apps\n", new_owc_id);
1091
1092 if (!(what_changed & ARI_OWC_FIELD_SUBSCRIBE_ALL)) {
1093 /*
1094 * If subscribe_all didn't change, we don't have to
1095 * unregister apps that are already registered and
1096 * also in the new config. We'll remove them from
1097 * the session->websocket_apps container so that
1098 * session_unregister_apps will only clean up
1099 * the ones that are going away. session_register_apps
1100 * will add them back in again and cause ApplicationReplaced
1101 * messages to be sent.
1102 *
1103 * If subscribe_all did change, we have no choice but to
1104 * unregister all apps and register all the ones in
1105 * the new config even if they already existed.
1106 */
1107 int i = 0;
1108 char *app;
1109
1110 while(i < (int) AST_VECTOR_SIZE(&session->websocket_apps)) {
1111 app = AST_VECTOR_GET(&session->websocket_apps, i);
1112 if (ast_in_delimited_string(app, new_owc->apps, ',')) {
1113 AST_VECTOR_REMOVE_ORDERED(&session->websocket_apps, i);
1114 ast_debug(3, "%s: Unlinked app '%s' to keep it from being unregistered\n",
1115 new_owc_id, app);
1116 ast_free(app);
1117 } else {
1118 i++;
1119 }
1120 }
1121 }
1122
1124
1125 /*
1126 * Register the new apps. This will also replace any
1127 * existing apps that are in the new config sending
1128 * ApplicationRegistered or ApplicationReplaced events
1129 * as necessary.
1130 */
1131 if (session_register_apps(session, new_owc->apps,
1132 new_owc->subscribe_all) < 0) {
1133 ast_log(LOG_WARNING, "%s: Failed to register apps '%s'\n",
1134 new_owc_id, new_owc->apps);
1135 /* Roll back. */
1137 /* Re-register the original apps. */
1138 if (session_register_apps(session, session->owc->apps,
1139 session->owc->subscribe_all) < 0) {
1140 ast_log(LOG_WARNING, "%s: Failed to re-register apps '%s'\n",
1141 new_owc_id, session->owc->apps);
1142 }
1143 return SESSION_APPLY_FAILED;
1144 }
1145 }
1146 /*
1147 * We need to update the session with the new config
1148 * but it has to be done after re-registering apps and
1149 * before we reconnect.
1150 */
1151 ao2_replace(session->owc, new_owc);
1152 session->type = new_owc->websocket_client->connection_type;
1153 session->subscribe_all = new_owc->subscribe_all;
1154
1155 apply_result = SESSION_APPLY_OK;
1156
1157 if (what_changed & ARI_OWC_NEEDS_RECONNECT) {
1158 ast_debug(2, "%s: Reconnect required\n", new_owc_id);
1159 apply_result = SESSION_APPLY_RECONNECT_REQUIRED;
1160 if (session->ast_ws_session) {
1161 ast_debug(2, "%s: Closing websocket\n", new_owc_id);
1162 ast_websocket_close(session->ast_ws_session, 1000);
1163 }
1164 }
1165
1166 return apply_result;
1167}
static const char app[]
session_apply_result
static void session_unregister_apps(struct ari_ws_session *session)
static int session_register_apps(struct ari_ws_session *session, const char *_apps, int subscribe_all)
#define ao2_replace(dst, src)
Replace one object reference with another cleaning up the original.
Definition astobj2.h:501
int AST_OPTIONAL_API_NAME() ast_websocket_close(struct ast_websocket *session, uint16_t reason)
Close a WebSocket session by sending a message with the CLOSE opcode and an optional code.
@ ARI_OWC_NEEDS_REREGISTER
Definition internal.h:109
@ ARI_OWC_FIELD_NONE
Definition internal.h:100
@ ARI_OWC_FIELD_SUBSCRIBE_ALL
Definition internal.h:105
@ ARI_OWC_NEEDS_RECONNECT
Definition internal.h:106
enum ari_conf_owc_fields ari_conf_owc_detect_changes(struct ari_conf_outbound_websocket *old_owc, struct ari_conf_outbound_websocket *new_owc)
Detect changes between two outbound websocket configurations.
int ast_in_delimited_string(const char *needle, const char *haystack, char delim)
Check if there is an exact match for 'needle' between delimiters in 'haystack'.
Definition strings.c:466
const ast_string_field apps
Definition internal.h:119
struct ast_websocket_client * websocket_client
Definition internal.h:122
enum ast_websocket_type connection_type
#define AST_VECTOR_REMOVE_ORDERED(vec, idx)
Remove an element from a vector by index while maintaining order.
Definition vector.h:459
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition vector.h:620
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition vector.h:691

References ao2_replace, app, ari_conf_outbound_websocket::apps, ari_conf_owc_detect_changes(), ARI_OWC_FIELD_NONE, ARI_OWC_FIELD_SUBSCRIBE_ALL, ARI_OWC_NEEDS_RECONNECT, ARI_OWC_NEEDS_REREGISTER, ast_debug, ast_free, ast_in_delimited_string(), ast_log, ast_sorcery_object_get_id(), AST_VECTOR_GET, AST_VECTOR_REMOVE_ORDERED, AST_VECTOR_SIZE, ast_websocket_close(), ast_websocket_client::connection_type, LOG_WARNING, session, SESSION_APPLY_FAILED, SESSION_APPLY_NO_CHANGE, SESSION_APPLY_OK, SESSION_APPLY_RECONNECT_REQUIRED, session_register_apps(), session_unregister_apps(), ari_conf_outbound_websocket::subscribe_all, and ari_conf_outbound_websocket::websocket_client.

Referenced by outbound_session_create().

◆ outbound_session_create()

static int outbound_session_create ( void *  obj,
void *  args,
int  flags 
)
static

Definition at line 1397 of file ari_websockets.c.

1398{
1399 struct ari_conf_outbound_websocket *owc = obj;
1400 const char *owc_id = ast_sorcery_object_get_id(owc);
1401 struct ari_ws_session *session = NULL;
1402 enum session_apply_result apply_result;
1403 enum ari_conf_owc_fields invalid_fields = ari_conf_owc_get_invalid_fields(owc_id);
1404
1406 if (session) {
1407 ast_debug(2, "%s: Found existing connection\n", owc_id);
1408 if (invalid_fields) {
1411 "%s: Unable to update websocket session. Outbound websocket config is invalid\n",
1412 owc_id);
1413 return 0;
1414 }
1415
1417 apply_result = outbound_session_apply_config(session, owc);
1420 if (apply_result == SESSION_APPLY_FAILED) {
1422 "%s: Failed to apply new configuration. Existing connection preserved.\n",
1423 owc_id);
1424 }
1425 return 0;
1426 }
1427
1428 if (invalid_fields) {
1430 "%s: Unable to create websocket session. Outbound websocket config is invalid\n",
1431 owc_id);
1432 return 0;
1433 }
1434
1437 if (!session) {
1438 ast_log(LOG_WARNING, "%s: Failed to create websocket session\n", owc_id);
1439 return 0;
1440 }
1441
1443 /* There's no thread to transfer the reference to */
1445 return 0;
1446 }
1447
1448 ast_debug(2, "%s: Starting thread RC: %d\n", session->session_id,
1449 (int)ao2_ref(session, 0));
1450 /* We're transferring the session reference to the thread. */
1454 ast_log(LOG_WARNING, "%s: Failed to create thread.\n", session->session_id);
1455 return 0;
1456 }
1457 ast_debug(2, "%s: launched thread\n", session->session_id);
1458
1459 return 0;
1460}
struct ari_ws_session * ari_websocket_get_session(const char *session_id)
static enum session_apply_result outbound_session_apply_config(struct ari_ws_session *session, struct ari_conf_outbound_websocket *new_owc)
struct ari_conf_outbound_websocket * owc

References ao2_lock, ao2_ref, ao2_unlock, ari_conf_outbound_websocket::apps, ari_conf_owc_get_invalid_fields(), ari_websocket_get_session(), ast_debug, ast_log, ast_pthread_create_detached_background, ast_sorcery_object_get_id(), AST_WS_TYPE_CLIENT_PER_CALL_CONFIG, ast_websocket_client::connection_type, LOG_WARNING, NULL, outbound_session_apply_config(), outbound_session_handler_thread(), ari_ws_session::owc, session, SESSION_APPLY_FAILED, session_cleanup(), session_create(), session_unref(), ari_conf_outbound_websocket::subscribe_all, and ari_conf_outbound_websocket::websocket_client.

Referenced by ari_outbound_websocket_start(), and outbound_sessions_load().

◆ outbound_session_handler_thread()

static void * outbound_session_handler_thread ( void *  obj)
static

Definition at line 944 of file ari_websockets.c.

945{
946 struct ari_ws_session *session = obj;
947 int already_sent_registers = 1;
948
949 /*
950 * We use pthread_cleanup_push because RAII destructors don't run
951 * if we cancel the thread.
952 */
953 pthread_cleanup_push(session_cleanup, obj);
954
955 ast_debug(3, "%s: Starting outbound websocket thread RC: %d\n",
956 session->session_id, (int)ao2_ref(session, 0));
957 session->thread = pthread_self();
958 session->connected = 0;
959
960 while(1) {
962 RAII_VAR(struct ast_variable *, upgrade_headers, NULL, ast_variables_destroy);
964 struct ast_json *msg;
965
966 ast_debug(3, "%s: Attempting to connect to %s\n", session->session_id,
967 session->owc->websocket_client->uri);
968
969 astws = ast_websocket_client_connect(session->owc->websocket_client,
970 NULL, session->session_id, &result);
971 if (!astws || result != WS_OK) {
973 struct stasis_app_control *control =
975 if (control) {
976 ast_debug(3, "%s: Connection failed. Returning to dialplan.\n",
977 session->session_id);
980 ao2_cleanup(control);
981 } else {
982 ast_debug(3, "%s: Connection failed. No control object found.\n",
983 session->session_id);
984 }
985
986 break;
987 }
988
989 if (session->closing) {
990 ast_debug(3, "%s: Websocket closing RC: %d\n",
991 session->session_id, (int)ao2_ref(session, 0));
992 break;
993 }
994
995 usleep(session->owc->websocket_client->reconnect_interval * 1000);
996 continue;
997 }
998 ast_log(LOG_NOTICE, "%s: Outbound websocket connected to %s\n",
999 session->type == AST_WS_TYPE_CLIENT_PERSISTENT ? session->session_id : session->channel_name,
1000 session->owc->websocket_client->uri);
1001
1002 /*
1003 * We only want to send "ApplicationRegistered" events in the
1004 * case of a reconnect. The initial connection will have already sent
1005 * the events when outbound_register_apps() was called.
1006 */
1007 session_update(session, astws, !already_sent_registers);
1008 already_sent_registers = 0;
1009
1010 /*
1011 * This is the Authorization header that would normally be taken
1012 * from the incoming HTTP request that is being upgraded to a websocket.
1013 * Since this is an outbound websocket, we have to create it ourselves.
1014 *
1015 * This is NOT the same as the Authorization header that is used for
1016 * authentication with the remote websocket server.
1017 */
1018 upgrade_headers = ast_http_create_basic_auth_header(
1019 session->owc->local_ari_user, session->owc->local_ari_password);
1020 if (!upgrade_headers) {
1021 ast_log(LOG_WARNING, "%s: Failed to create upgrade header\n", session->session_id);
1022 session->thread = 0;
1023 session->connected = 0;
1024 ast_websocket_close(astws, 1000);
1025 session->ast_ws_session = NULL;
1026 break;
1027 }
1028
1029 session->connected = 1;
1030 ast_debug(3, "%s: Websocket connected\n", session->session_id);
1031 ast_debug(3, "%s: Waiting for messages RC: %d\n",
1032 session->session_id, (int)ao2_ref(session, 0));
1033
1034 /*
1035 * The websocket is connected. Now we need to wait for messages
1036 * from the server.
1037 */
1038 while ((msg = session_read(session))) {
1040 upgrade_headers, session->app_name, msg);
1041 ast_json_unref(msg);
1042 }
1043
1044 session->connected = 0;
1045 ast_websocket_unref(session->ast_ws_session);
1046 session->ast_ws_session = NULL;
1047 if (session->closing) {
1048 ast_debug(3, "%s: Websocket closing RC: %d\n",
1049 session->session_id, (int)ao2_ref(session, 0));
1050 break;
1051 }
1052
1053 ast_log(LOG_WARNING, "%s: Websocket disconnected. Reconnecting\n",
1054 session->session_id);
1055 }
1056
1057 ast_debug(3, "%s: Stopping outbound websocket thread RC: %d\n",
1058 session->session_id, (int)ao2_ref(session, 0));
1059 session->thread = 0;
1060
1061 pthread_cleanup_pop(1);
1062
1063 return NULL;
1064}
int ari_websocket_process_request(struct ari_ws_session *ari_ws_session, const char *remote_addr, struct ast_variable *upgrade_headers, const char *app_name, struct ast_json *request_msg)
static struct ast_json * session_read(struct ari_ws_session *session)
static int session_update(struct ari_ws_session *ari_ws_session, struct ast_websocket *ast_ws_session, int send_registered_events)
static PGresult * result
Definition cel_pgsql.c:84
struct ast_variable * ast_http_create_basic_auth_header(const char *userid, const char *password)
Create an HTTP authorization header.
Definition http.c:1726
ast_websocket_result
Result code for a websocket client.
@ WS_OK
@ AST_WS_TYPE_CLIENT_PERSISTENT
void AST_OPTIONAL_API_NAME() ast_websocket_unref(struct ast_websocket *session)
Decrease the reference count for a WebSocket session.
void ast_variables_destroy(struct ast_variable *var)
Free variable list.
Definition extconf.c:1260
#define LOG_NOTICE
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition json.c:73
void stasis_app_control_mark_failed(struct stasis_app_control *control)
Set the failed flag on a control structure.
Definition control.c:377
int stasis_app_control_continue(struct stasis_app_control *control, const char *context, const char *extension, int priority)
Exit res_stasis and continue execution in the dialplan.
Definition control.c:415
struct stasis_app_control * stasis_app_control_find_by_channel_id(const char *channel_id)
Returns the handler for the channel with the given id.
Definition res_stasis.c:349
Structure for variables, used for configurations and for channel variables.
Structure definition for session.
struct ast_websocket * ast_websocket_client_connect(struct ast_websocket_client *wc, void *lock_obj, const char *display_name, enum ast_websocket_result *result)
Connect to a websocket server using the configured authentication, retry and TLS options.

References ao2_cleanup, ao2_ref, ari_websocket_process_request(), ast_debug, ast_http_create_basic_auth_header(), ast_json_unref(), ast_log, ast_variables_destroy(), ast_websocket_client_connect(), ast_websocket_close(), ast_websocket_unref(), AST_WS_TYPE_CLIENT_PER_CALL, AST_WS_TYPE_CLIENT_PERSISTENT, LOG_NOTICE, LOG_WARNING, NULL, RAII_VAR, result, session, session_cleanup(), session_read(), session_update(), stasis_app_control_continue(), stasis_app_control_find_by_channel_id(), stasis_app_control_mark_failed(), and WS_OK.

Referenced by ast_ari_create_per_call_websocket(), and outbound_session_create().

◆ outbound_session_pc_close_thread()

static void * outbound_session_pc_close_thread ( void *  data)
static

Definition at line 1298 of file ari_websockets.c.

1299{
1300 /*
1301 * We're using RAII because we want to show a debug message
1302 * after we run ast_websocket_close().
1303 */
1304 RAII_VAR(struct ari_ws_session *, session, data, session_unref);
1305
1306 /*
1307 * We're going to wait 3 seconds to allow stasis to send additional
1308 * events like ChannelVarset and ChannelDestroyed after the StasisEnd.
1309 */
1310 ast_debug(3, "%s: Waiting for %dms before closing websocket RC: %d\n",
1311 session->session_id, (int)(STASIS_END_POST_WAIT_US / 1000),
1312 (int)ao2_ref(session, 0));
1314 session->closing = 1;
1315 if (session->ast_ws_session) {
1316 ast_websocket_close(session->ast_ws_session, 1000);
1317 }
1318 ast_debug(3, "%s: Websocket closed RC: %d\n", session->session_id,
1319 (int)ao2_ref(session, 0));
1320 return NULL;
1321}
#define STASIS_END_POST_WAIT_US

References ao2_ref, ast_debug, ast_websocket_close(), NULL, RAII_VAR, session, session_unref(), and STASIS_END_POST_WAIT_US.

Referenced by ast_ari_close_per_call_websocket().

◆ outbound_sessions_load()

static void outbound_sessions_load ( const char *  name)
static

Definition at line 1462 of file ari_websockets.c.

1463{
1465 struct ao2_iterator i;
1466 struct ari_ws_session *session;
1467
1468 ast_debug(2, "Reloading ARI websockets\n");
1469
1471
1473 while ((session = ao2_iterator_next(&i))) {
1474 int cleanup = 1;
1475 if (session->owc
1476 && (session->type &
1478 struct ari_conf_outbound_websocket *ows =
1479 ari_conf_get_owc(session->session_id);
1480 if (!ows) {
1481 ast_debug(3, "Cleaning up outbound websocket %s\n",
1482 session->session_id);
1483 session->closing = 1;
1485 if (session->ast_ws_session) {
1486 ast_debug(3, "%s: Closing websocket\n", session->session_id);
1487 ast_websocket_close(session->ast_ws_session, 1000);
1488 } else if (session->thread) {
1489 ast_debug(3, "%s: Cancelling handler thread\n", session->session_id);
1490 pthread_cancel(session->thread);
1491 }
1492
1494 /*
1495 * If persistent, session_cleanup will cleanup
1496 * this reference so we don't want to double clean it up.
1497 * session_cleanup doesn't cleanup the reference
1498 * for per-call configs so we need to do that ourselves.
1499 */
1500 cleanup = 0;
1501 }
1502 }
1503 ao2_cleanup(ows);
1504 }
1505 /* We don't want to double cleanup if its been closed. */
1506 if (cleanup) {
1508 }
1509 }
1511
1512 return;
1513}
#define ao2_iterator_next(iter)
Definition astobj2.h:1911
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
struct ao2_container * ari_conf_get_owcs(void)
struct ari_conf_outbound_websocket * ari_conf_get_owc(const char *id)
static void cleanup(void)
Clean up any old apps that we don't need any more.
Definition res_stasis.c:327
Generic container type.
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
Definition astobj2.h:1821

References ao2_callback, ao2_cleanup, ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, ari_conf_get_owc(), ari_conf_get_owcs(), ast_debug, ast_websocket_close(), AST_WS_TYPE_CLIENT_PER_CALL_CONFIG, AST_WS_TYPE_CLIENT_PERSISTENT, cleanup(), NULL, OBJ_NODATA, outbound_session_create(), RAII_VAR, session, session_cleanup(), and session_registry.

◆ session_cleanup()

static void session_cleanup ( void *  obj)
static

Definition at line 556 of file ari_websockets.c.

557{
558 struct ari_ws_session *session = obj;
559 enum ast_websocket_type wstype;
560
561 if (!session) {
562 return;
563 }
564
565 /*
566 * We need to save the websocket type because client-per-call websockets
567 * will get destroyed when they're unlinked from the session_registry
568 * so we won't be able to test it afterwards.
569 */
570 wstype = session->type;
571
572 ast_debug(3, "%s: Cleaning up ARI %s websocket session. Refcount: %d\n",
573 session->session_id, ast_websocket_type_to_str(wstype), (int)ao2_ref(session, 0));
574
576
577 if (session_registry) {
578 ast_debug(3, "%s: Unlinking %s websocket session from registry Refcount: %d\n",
579 session->session_id, ast_websocket_type_to_str(wstype), (int)ao2_ref(session, 0));
581 }
582
583 /*
584 * If this is a per-call config then its only reference
585 * was held by the registry container so we don't need
586 * to unref it here.
587 */
590 }
591}
static void session_reset(struct ari_ws_session *session)
#define ao2_unlink(container, obj)
Remove an object from a container.
Definition astobj2.h:1578
const char * ast_websocket_type_to_str(enum ast_websocket_type type)
ast_websocket_type
WebSocket connection/configuration types.

References ao2_ref, ao2_unlink, ast_debug, ast_websocket_type_to_str(), AST_WS_TYPE_CLIENT_PER_CALL_CONFIG, session, session_registry, session_reset(), and session_unref().

Referenced by ast_ari_create_per_call_websocket(), outbound_session_create(), outbound_session_handler_thread(), outbound_sessions_load(), session_shutdown_cb(), and websocket_established_cb().

◆ session_create()

static struct ari_ws_session * session_create ( struct ast_tcptls_session_instance ser,
const char *  apps,
int  subscribe_all,
const char *  session_id,
struct ari_conf_outbound_websocket ows,
enum ast_websocket_type  ws_type 
)
static

Definition at line 628 of file ari_websockets.c.

635{
637 size_t size;
638
639 ast_debug(3, "%s: Creating ARI websocket session for apps '%s'\n",
640 session_id, apps);
641
642 size = sizeof(*session) + strlen(session_id) + 1;
643
645 if (!session) {
646 return NULL;
647 }
648
649 session->type = ws_type;
650 session->subscribe_all = subscribe_all;
651
652 strcpy(session->session_id, session_id); /* Safe */
653
654 /* Instantiate the hash table for Stasis apps */
655 if (AST_VECTOR_INIT(&session->websocket_apps, APPS_INIT_SIZE)) {
656 handle_create_error(ser, 500, "Internal Server Error",
657 "Allocation failed");
658 return NULL;
659 }
660
661 /* Instantiate the message queue */
662 if (AST_VECTOR_INIT(&session->message_queue, MESSAGES_INIT_SIZE)) {
663 handle_create_error(ser, 500, "Internal Server Error",
664 "Allocation failed");
665 AST_VECTOR_FREE(&session->websocket_apps);
666 return NULL;
667 }
668
669 session->validator = ari_validate_message_fn;
670
671 if (ows) {
672 session->owc = ao2_bump(ows);
673 }
674
676 handle_create_error(ser, 500, "Internal Server Error",
677 "Stasis app registration failed");
679 return NULL;
680 }
681
683 handle_create_error(ser, 500, "Internal Server Error",
684 "Allocation failed");
686 return NULL;
687 }
688
689 return ao2_bump(session);
690}
static void session_dtor(void *obj)
#define MESSAGES_INIT_SIZE
#define APPS_INIT_SIZE
ari_validator ari_validate_message_fn
#define handle_create_error(ser, code, msg, reason)
#define ao2_link(container, obj)
Add an object to a container.
Definition astobj2.h:1532
#define ao2_alloc(data_size, destructor_fn)
Definition astobj2.h:409
static int subscribe_all(void)
Definition manager.c:9641
Registered applications container.
Definition pbx_app.c:69
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition vector.h:185
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition vector.h:124

References ao2_alloc, ao2_bump, ao2_cleanup, ao2_link, APPS_INIT_SIZE, ari_validate_message_fn, ast_debug, AST_VECTOR_FREE, AST_VECTOR_INIT, handle_create_error, MESSAGES_INIT_SIZE, NULL, RAII_VAR, session, session_dtor(), ari_ws_session::session_id, session_register_apps(), session_registry, session_reset(), and subscribe_all().

Referenced by ast_ari_create_per_call_websocket(), outbound_session_create(), and websocket_attempted_cb().

◆ session_dtor()

static void session_dtor ( void *  obj)
static

Definition at line 599 of file ari_websockets.c.

600{
601 struct ari_ws_session *session = obj;
602
603 ast_debug(3, "%s: Destroying ARI websocket session\n",
604 session->session_id);
605
606 ast_free(session->app_name);
607 ast_free(session->remote_addr);
608 ast_free(session->channel_id);
609 ast_free(session->channel_name);
610 ao2_cleanup(session->owc);
611 session->owc = NULL;
612 if (!session->ast_ws_session) {
613 return;
614 }
615 ast_websocket_unref(session->ast_ws_session);
616 session->ast_ws_session = NULL;
617}

References ao2_cleanup, ast_debug, ast_free, ast_websocket_unref(), NULL, and session.

Referenced by session_create().

◆ session_find_by_app()

static struct ari_ws_session * session_find_by_app ( const char *  app_name,
unsigned int  ws_type 
)
static

Definition at line 907 of file ari_websockets.c.

909{
910 struct ari_ws_session *session = NULL;
911 struct ao2_iterator i;
912
914 return NULL;
915 }
916
918 while ((session = ao2_iterator_next(&i))) {
919 char *app = NULL;
920 if (!(session->type & ws_type)) {
922 continue;
923 }
924
925 app = AST_VECTOR_GET_CMP(&session->websocket_apps,
927 if (app) {
928 break;
929 }
931 }
933 return session;
934}
#define AST_VECTOR_GET_CMP(vec, value, cmp)
Get an element from a vector that matches the given comparison.
Definition vector.h:742

References ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, app, app_name(), ast_strings_equal(), ast_strlen_zero(), AST_VECTOR_GET_CMP, NULL, session, session_registry, and session_unref().

Referenced by ast_ari_close_per_call_websocket().

◆ session_read()

static struct ast_json * session_read ( struct ari_ws_session session)
static

Definition at line 197 of file ari_websockets.c.

198{
200
201 if (!session || !session->ast_ws_session) {
202 return NULL;
203 }
204 if (ast_websocket_fd(session->ast_ws_session) < 0) {
205 return NULL;
206 }
207
208 while (!message) {
209 int res;
210 char *payload;
211 uint64_t payload_len;
212 enum ast_websocket_opcode opcode;
213 int fragmented;
214
215 res = ast_wait_for_input(
216 ast_websocket_fd(session->ast_ws_session), -1);
217
218 if (res <= 0) {
219 ast_log(LOG_WARNING, "WebSocket poll error: %s\n",
220 strerror(errno));
221 return NULL;
222 }
223
224 res = ast_websocket_read(session->ast_ws_session, &payload,
225 &payload_len, &opcode, &fragmented);
226
227 if (res != 0) {
228 ast_log(LOG_WARNING, "WebSocket read error: %s\n",
229 strerror(errno));
230 return NULL;
231 }
232
233 switch (opcode) {
235 ast_debug(1, "WebSocket closed\n");
236 return NULL;
238 message = ast_json_load_buf(payload, payload_len, NULL);
239 if (message == NULL) {
240 struct ast_json *error = ast_json_pack(
241 "{s:s, s:s, s:s, s:i, s:s, s:s }",
242 "type", "RESTResponse",
243 "transaction_id", "",
244 "request_id", "",
245 "status_code", 400,
246 "reason_phrase", "Failed to parse request message JSON",
247 "uri", ""
248 );
250 error, 0);
251 ast_json_unref(error);
253 "WebSocket input failed to parse\n");
254
255 }
256
257 break;
258 default:
259 /* Ignore all other message types */
260 break;
261 }
262 }
263
264 return ast_json_ref(message);
265}
void ari_websocket_send_event(struct ari_ws_session *session, const char *app_name, struct ast_json *message, int debug_app)
Callback handler for Stasis application messages.
int AST_OPTIONAL_API_NAME() ast_websocket_fd(struct ast_websocket *session)
Get the file descriptor for a WebSocket session.
ast_websocket_opcode
WebSocket operation codes.
@ AST_WEBSOCKET_OPCODE_CLOSE
@ AST_WEBSOCKET_OPCODE_TEXT
int AST_OPTIONAL_API_NAME() ast_websocket_read(struct ast_websocket *session, char **payload, uint64_t *payload_len, enum ast_websocket_opcode *opcode, int *fragmented)
Read a WebSocket frame and handle it.
struct ast_json * ast_json_pack(char const *format,...)
Helper for creating complex JSON values.
Definition json.c:612
struct ast_json * ast_json_ref(struct ast_json *value)
Increase refcount on value.
Definition json.c:67
struct ast_json * ast_json_load_buf(const char *buffer, size_t buflen, struct ast_json_error *error)
Parse buffer with known length into a JSON object or array.
Definition json.c:585
int errno
int error(const char *format,...)
int ast_wait_for_input(int fd, int ms)
Definition utils.c:1732

References ari_websocket_send_event(), ast_debug, ast_json_load_buf(), ast_json_pack(), ast_json_ref(), ast_json_unref(), ast_log, ast_wait_for_input(), ast_websocket_fd(), AST_WEBSOCKET_OPCODE_CLOSE, AST_WEBSOCKET_OPCODE_TEXT, ast_websocket_read(), errno, error(), LOG_WARNING, NULL, RAII_VAR, and session.

Referenced by outbound_session_handler_thread(), and websocket_established_cb().

◆ session_register_apps()

static int session_register_apps ( struct ari_ws_session session,
const char *  _apps,
int  subscribe_all 
)
static

Definition at line 413 of file ari_websockets.c.

415{
416 char *apps = ast_strdupa(_apps);
417 char *app_name;
418 int app_counter = 0;
419
420 ast_debug(3, "%s: Registering apps '%s'. Subscribe all: %s\n",
421 session->session_id, apps, subscribe_all ? "yes" : "no");
422
423 while ((app_name = ast_strsep(&apps, ',', AST_STRSEP_STRIP))) {
424
426 ast_log(LOG_WARNING, "%s: Invalid application name\n", session->session_id);
427 return -1;
428 }
429
430 if (strlen(app_name) > ARI_MAX_APP_NAME_LEN) {
431 ast_log(LOG_WARNING, "%s: Websocket app '%s' > %d characters\n",
432 session->session_id, app_name, (int)ARI_MAX_APP_NAME_LEN);
433 return -1;
434 }
435
437 /*
438 * Outbound per-call configs only create a dialplan context.
439 * If they registered stasis apps there'd be no way for the
440 * Stasis dialplan app to know that it needs to start a
441 * per-call websocket connection.
442 */
444
449 ast_log(LOG_WARNING, "%s: Could not create context '%s'\n",
450 session->session_id, context_name);
451 return -1;
452 } else {
454 "Stasis", ast_strdup(app_name), ast_free_ptr,
458 }
459 } else {
460 ast_debug(3, "%s: Context '%s' already exists\n", session->session_id,
462 }
463 } else {
464 int already_registered = stasis_app_is_registered(app_name);
465 int res = 0;
466
467 if (subscribe_all) {
469 session);
470 } else {
472 session);
473 }
474
475 if (res != 0) {
476 return -1;
477 }
478
479 /*
480 * If there was an existing app by the same name, the register handler
481 * will have sent an ApplicationReplaced event. If it's a new app, we
482 * send an ApplicationRegistered event.
483 *
484 * Except... There's no websocket to send it on for outbound per-call
485 * configs and inbound websockets don't need them because they aready
486 * know what apps they've registered for.
487 */
488 if (!already_registered
490 session_send_app_event(session, "ApplicationRegistered",
491 app_name);
492 }
493 }
494
495 if (AST_VECTOR_ADD_SORTED(&session->websocket_apps, ast_strdup(app_name), strcmp)) {
496 ast_log(LOG_WARNING, "%s: Unable to add app '%s' to apps container\n",
497 session->session_id, app_name);
498 return -1;
499 }
500
501 app_counter++;
502 if (app_counter == 1) {
503 ast_free(session->app_name);
504 session->app_name = ast_strdup(app_name);
505 if (!session->app_name) {
506 ast_log(LOG_WARNING, "%s: Unable to duplicate app name\n",
507 session->session_id);
508 return -1;
509 }
510 }
511 }
512
513 return 0;
514}
static void stasis_app_message_handler(void *data, const char *app_name, struct ast_json *message)
static void session_send_app_event(struct ari_ws_session *session, const char *event_type, const char *app_name)
#define ARI_CONTEXT_REGISTRAR
#define STASIS_CONTEXT_PREFIX
#define ARI_MAX_APP_NAME_LEN
#define ast_strdupa(s)
duplicate a string in memory from the stack
Definition astmm.h:298
#define AST_MAX_CONTEXT
Definition channel.h:135
static const char context_name[]
int ast_add_extension(const char *context, int replace, const char *extension, int priority, const char *label, const char *callerid, const char *application, void *data, void(*datad)(void *), const char *registrar)
Add and extension to an extension context.
Definition pbx.c:6968
struct ast_context * ast_context_find(const char *name)
Find a context.
Definition extconf.c:4170
struct ast_context * ast_context_find_or_create(struct ast_context **extcontexts, struct ast_hashtab *exttable, const char *name, const char *registrar)
Register a new context or find an existing one.
Definition pbx.c:6185
int stasis_app_is_registered(const char *name)
Check if a Stasis application is registered.
int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
Register a new Stasis application.
int stasis_app_register_all(const char *app_name, stasis_app_cb handler, void *data)
Register a new Stasis application that receives all Asterisk events.
@ AST_STRSEP_STRIP
Definition strings.h:255
char * ast_strsep(char **s, const char sep, uint32_t flags)
Act like strsep but ignore separators inside quotes.
Definition utils.c:1869
#define AST_VECTOR_ADD_SORTED(vec, elem, cmp)
Add an element into a sorted vector.
Definition vector.h:382

References app_name(), ARI_CONTEXT_REGISTRAR, ARI_MAX_APP_NAME_LEN, ast_add_extension(), ast_context_find(), ast_context_find_or_create(), ast_debug, ast_free, ast_free_ptr(), ast_log, AST_MAX_CONTEXT, ast_strdup, ast_strdupa, ast_strlen_zero(), ast_strsep(), AST_STRSEP_STRIP, AST_VECTOR_ADD_SORTED, AST_WS_TYPE_CLIENT_PER_CALL_CONFIG, AST_WS_TYPE_INBOUND, context_name, LOG_WARNING, NULL, session, session_send_app_event(), stasis_app_is_registered(), stasis_app_message_handler(), stasis_app_register(), stasis_app_register_all(), STASIS_CONTEXT_PREFIX, and subscribe_all().

Referenced by outbound_session_apply_config(), and session_create().

◆ session_registry_dtor()

static void session_registry_dtor ( void  )
static

◆ session_reset()

static void session_reset ( struct ari_ws_session session)
static

Definition at line 532 of file ari_websockets.c.

533{
535
536 ast_debug(3, "%s: Resetting ARI websocket session\n",
537 session->session_id);
538
539 /* Clean up the websocket_apps container */
540 if (AST_VECTOR_SIZE(&session->websocket_apps) > 0) {
542 }
543 AST_VECTOR_RESET(&session->websocket_apps, ast_free_ptr);
544 AST_VECTOR_FREE(&session->websocket_apps);
545
546 AST_VECTOR_RESET(&session->message_queue, ast_json_unref);
547 AST_VECTOR_FREE(&session->message_queue);
548}
ast_mutex_t lock
Definition app_sla.c:337
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition lock.h:611
#define AST_VECTOR_RESET(vec, cleanup)
Reset vector.
Definition vector.h:636

References ast_debug, ast_free_ptr(), ast_json_unref(), AST_VECTOR_FREE, AST_VECTOR_RESET, AST_VECTOR_SIZE, lock, SCOPED_AO2LOCK, session, and session_unregister_apps().

Referenced by session_cleanup(), and session_create().

◆ session_send_app_event()

static void session_send_app_event ( struct ari_ws_session session,
const char *  event_type,
const char *  app_name 
)
static

Definition at line 172 of file ari_websockets.c.

174{
175 char eid[20];
176 int debug_app = stasis_app_get_debug_by_name(app_name);
177 struct ast_json *msg = ast_json_pack("{s:s, s:o?, s:s, s:s }",
178 "type", event_type,
179 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
180 "application", app_name,
181 "asterisk_id", ast_eid_to_str(eid, sizeof(eid), &ast_eid_default));
182
183 if (!msg) {
184 return;
185 }
186 ast_debug(3, "%s: Sending '%s' event to app '%s'\n", session->session_id,
187 event_type, app_name);
188 /*
189 * We don't want to use ari_websocket_send_event() here because
190 * the app may be unregistered which will cause stasis_app_event_allowed
191 * to return false.
192 */
193 session_send_or_queue(session, msg, event_type, app_name, debug_app);
194 ast_json_unref(msg);
195}
struct ast_json * ast_json_timeval(const struct timeval tv, const char *zone)
Construct a timeval as JSON.
Definition json.c:670
int stasis_app_get_debug_by_name(const char *app_name)
Get debug status of an application.
char * ast_eid_to_str(char *s, int maxlen, struct ast_eid *eid)
Convert an EID to a string.
Definition utils.c:2873
struct ast_eid ast_eid_default
Global EID.
Definition options.c:94

References app_name(), ast_debug, ast_eid_default, ast_eid_to_str(), ast_json_pack(), ast_json_timeval(), ast_json_unref(), ast_tvnow(), NULL, session, session_send_or_queue(), and stasis_app_get_debug_by_name().

Referenced by session_register_apps(), session_unregister_app_cb(), and session_update().

◆ session_send_or_queue()

static void session_send_or_queue ( struct ari_ws_session session,
struct ast_json message,
const char *  msg_type,
const char *  app_name,
int  debug_app 
)
static

Definition at line 111 of file ari_websockets.c.

114{
115 const char *msg_timestamp, *msg_ast_id;
116
122 "%s: Failed to dispatch '%s' message from Stasis app '%s'; could not update message\n",
123 session->remote_addr, msg_type, app_name);
124 return;
125 }
126 }
127
128 msg_ast_id = S_OR(
129 ast_json_string_get(ast_json_object_get(message, "asterisk_id")), "");
130 if (ast_strlen_zero(msg_ast_id)) {
131 char eid[20];
132
133 if (ast_json_object_set(message, "asterisk_id",
137 "%s: Failed to dispatch '%s' message from Stasis app '%s'; could not update message\n",
138 session->remote_addr, msg_type, app_name);
139 }
140 }
141
142 if (!session->ast_ws_session) {
143 /* If the websocket is NULL, the message goes to the queue */
144 if (AST_VECTOR_APPEND(&session->message_queue, message) == 0) {
146 }
147 /*
148 * If the msg_type one of the Application* types, the websocket
149 * might not be there yet so don't log.
150 */
151 if (!ast_begins_with(msg_type, "Application")) {
153 "%s: Queued '%s' message for Stasis app '%s'; websocket is not ready\n",
154 session->remote_addr,
155 msg_type,
156 app_name);
157 }
158 } else {
159
160 if (DEBUG_ATLEAST(4) || debug_app) {
162
163 ast_verbose("<--- Sending ARI event to %s --->\n%s\n",
164 session->remote_addr,
165 str);
167 }
169 }
170}
const char * str
Definition app_jack.c:150
static int session_write(struct ari_ws_session *session, struct ast_json *message)
#define DEBUG_ATLEAST(level)
#define LOG_ERROR
#define ast_verbose(...)
void ast_json_free(void *p)
Asterisk's custom JSON allocator. Exposed for use by unit tests.
Definition json.c:52
@ AST_JSON_PRETTY
Definition json.h:795
char * ast_json_dump_string_format(struct ast_json *root, enum ast_json_encoding_format format)
Encode a JSON value to a string.
Definition json.c:484
static struct timeval msg_timestamp(void *msg, enum smdi_message_type type)
Definition res_smdi.c:366
static int force_inline attribute_pure ast_begins_with(const char *str, const char *prefix)
Checks whether a string begins with another.
Definition strings.h:97
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition vector.h:267

References ao2_unlock, app_name(), ast_begins_with(), ast_eid_default, ast_eid_to_str(), ast_json_dump_string_format(), ast_json_free(), ast_json_object_get(), ast_json_object_set(), AST_JSON_PRETTY, ast_json_ref(), ast_json_string_create(), ast_json_string_get(), ast_json_timeval(), ast_log, ast_strlen_zero(), ast_tvnow(), AST_VECTOR_APPEND, ast_verbose, DEBUG_ATLEAST, LOG_ERROR, LOG_WARNING, msg_timestamp(), NULL, S_OR, session, session_write(), and str.

Referenced by ari_websocket_send_event(), and session_send_app_event().

◆ session_shutdown_cb()

static int session_shutdown_cb ( void *  obj,
void *  arg,
int  flags 
)
static

Definition at line 861 of file ari_websockets.c.

862{
863 struct ari_ws_session *session = obj;
864
865 /* Per-call configs have no actual websocket */
867 ast_log(LOG_NOTICE, "%s: Shutting down %s ARI websocket session\n",
868 session->session_id,
871 return 0;
872 }
873 if (session->type == AST_WS_TYPE_INBOUND) {
874 ast_log(LOG_NOTICE, "%s: Shutting down inbound ARI websocket session from %s\n",
875 session->session_id, session->remote_addr);
876 } else {
877 ast_log(LOG_NOTICE, "%s: Shutting down %s ARI websocket session to %s\n",
878 session->session_id,
880 session->remote_addr);
881 }
882
883 /*
884 * We need to ensure the session is kept around after the cleanup
885 * so we can close the websocket.
886 */
888 session->closing = 1;
890 if (session->ast_ws_session) {
891 ast_debug(3, "%s: Closing websocket\n", session->session_id);
892 ast_websocket_close(session->ast_ws_session, 1000);
893 } else if (session->thread) {
894 ast_debug(3, "%s: Cancelling handler thread\n", session->session_id);
895 pthread_cancel(session->thread);
896 }
897
898 return 0;
899}
const char * ari_websocket_type_to_str(enum ast_websocket_type type)

References ao2_bump, ari_websocket_type_to_str(), ast_debug, ast_log, ast_websocket_close(), AST_WS_TYPE_CLIENT_PER_CALL_CONFIG, AST_WS_TYPE_INBOUND, LOG_NOTICE, session, and session_cleanup().

Referenced by ari_websocket_shutdown(), ari_websocket_shutdown_all(), and session_registry_dtor().

◆ session_unref()

static void session_unref ( struct ari_ws_session session)
static

Definition at line 361 of file ari_websockets.c.

362{
363 if (!session) {
364 return;
365 }
366 ast_debug(4, "%s: Unreffing ARI websocket session\n", session->session_id);
367 ao2_ref(session, -1);
368}

References ao2_ref, ast_debug, and session.

Referenced by ast_ari_close_per_call_websocket(), ast_ari_create_per_call_websocket(), outbound_session_create(), outbound_session_pc_close_thread(), session_cleanup(), session_find_by_app(), and websocket_attempted_cb().

◆ session_unregister_app_cb()

static void session_unregister_app_cb ( char *  app_name,
struct ari_ws_session session 
)
static

Definition at line 370 of file ari_websockets.c.

371{
372 ast_debug(3, "%s: Trying to unregister app '%s'\n",
373 session->session_id, app_name);
377 ast_debug(3, "%s: Unregistering context '%s' for app '%s'\n",
378 session->session_id, context_name, app_name);
380 } else {
381 ast_debug(3, "%s: Unregistering stasis app '%s' and unsubscribing from all events.\n",
382 session->session_id, app_name);
384 }
385
386 /*
387 * We don't send ApplicationUnregistered events for outbound per-call
388 * configs because there's no websocket to send them via or to
389 * inbound websockets because the websocket is probably closed already.
390 */
391 if (!(session->type
393 session_send_app_event(session, "ApplicationUnregistered", app_name);
394 }
395}
int ast_context_destroy_by_name(const char *context, const char *registrar)
Destroy a context by name.
Definition pbx.c:8249
void stasis_app_unregister(const char *app_name)
Unregister a Stasis application and unsubscribe from all event sources.

References app_name(), ARI_CONTEXT_REGISTRAR, ast_context_destroy_by_name(), ast_debug, AST_MAX_CONTEXT, AST_WS_TYPE_CLIENT_PER_CALL_CONFIG, AST_WS_TYPE_INBOUND, context_name, session, session_send_app_event(), stasis_app_unregister(), and STASIS_CONTEXT_PREFIX.

Referenced by session_unregister_apps().

◆ session_unregister_apps()

static void session_unregister_apps ( struct ari_ws_session session)
static

Definition at line 397 of file ari_websockets.c.

398{
399 int app_count = (int)AST_VECTOR_SIZE(&session->websocket_apps);
400
401 if (app_count == 0) {
402 return;
403 }
404 ast_debug(3, "%s: Unregistering stasis apps.\n", session->session_id);
405
407 session);
408 AST_VECTOR_RESET(&session->websocket_apps, ast_free_ptr);
409
410 return;
411}
static void session_unregister_app_cb(char *app_name, struct ari_ws_session *session)
#define AST_VECTOR_CALLBACK_VOID(vec, callback,...)
Execute a callback on every element in a vector disregarding callback return.
Definition vector.h:873

References ast_debug, ast_free_ptr(), AST_VECTOR_CALLBACK_VOID, AST_VECTOR_RESET, AST_VECTOR_SIZE, session, and session_unregister_app_cb().

Referenced by outbound_session_apply_config(), and session_reset().

◆ session_update()

static int session_update ( struct ari_ws_session ari_ws_session,
struct ast_websocket ast_ws_session,
int  send_registered_events 
)
static

Definition at line 703 of file ari_websockets.c.

705{
707 int i;
708
709 if (ast_ws_session == NULL) {
710 return -1;
711 }
712
713 if (!general) {
714 return -1;
715 }
716
718 ast_websocket_remote_address(ast_ws_session)));
720 ast_log(LOG_ERROR, "Failed to copy remote address\n");
721 return -1;
722 }
723
724 if (ast_websocket_set_nonblock(ast_ws_session) != 0) {
726 "ARI web socket failed to set nonblock; closing: %s\n",
727 strerror(errno));
728 return -1;
729 }
730
731 if (ast_websocket_set_timeout(ast_ws_session, general->write_timeout)) {
732 ast_log(LOG_WARNING, "Failed to set write timeout %d on ARI web socket\n",
733 general->write_timeout);
734 }
735
736 ao2_ref(ast_ws_session, +1);
737 ari_ws_session->ast_ws_session = ast_ws_session;
739 for (i = 0; i < AST_VECTOR_SIZE(&ari_ws_session->message_queue); i++) {
742 ast_json_unref(msg);
743 }
744
747
748 if (send_registered_events) {
749 int i;
750 char *app;
751
752 for (i = 0; i < AST_VECTOR_SIZE(&ari_ws_session->websocket_apps); i++) {
755 "ApplicationRegistered", app);
756 }
757 }
758
759 return 0;
760}
int AST_OPTIONAL_API_NAME() ast_websocket_set_nonblock(struct ast_websocket *session)
Set the socket of a WebSocket session to be non-blocking.
int AST_OPTIONAL_API_NAME() ast_websocket_set_timeout(struct ast_websocket *session, int timeout)
Set the timeout on a non-blocking WebSocket session.
struct ari_conf_general * ari_conf_get_general(void)
Global configuration options for ARI.
Definition internal.h:58
struct ari_ws_session::@453 message_queue
struct ast_vector_string websocket_apps
struct ast_websocket * ast_ws_session
#define AST_VECTOR_ELEM_CLEANUP_NOOP(elem)
Vector element cleanup that does nothing.
Definition vector.h:582

References ao2_cleanup, ao2_lock, ao2_ref, ao2_unlock, app, ari_conf_get_general(), ast_json_unref(), ast_log, ast_sockaddr_stringify(), ast_strdup, AST_VECTOR_ELEM_CLEANUP_NOOP, AST_VECTOR_GET, AST_VECTOR_RESET, AST_VECTOR_SIZE, ast_websocket_remote_address(), ast_websocket_set_nonblock(), ast_websocket_set_timeout(), ari_ws_session::ast_ws_session, errno, LOG_ERROR, LOG_WARNING, ari_ws_session::message_queue, NULL, RAII_VAR, ari_ws_session::remote_addr, session_send_app_event(), session_write(), and ari_ws_session::websocket_apps.

Referenced by outbound_session_handler_thread(), and websocket_established_cb().

◆ session_write()

static int session_write ( struct ari_ws_session session,
struct ast_json message 
)
static

Definition at line 81 of file ari_websockets.c.

82{
83 RAII_VAR(char *, str, NULL, ast_json_free);
84
85 if (!session || !session->ast_ws_session || !message) {
86 return -1;
87 }
88
89#ifdef AST_DEVMODE
90 if (!session->validator(message)) {
91 ast_log(LOG_ERROR, "Outgoing message failed validation\n");
93 }
94#endif
95
97
98 if (str == NULL) {
99 ast_log(LOG_ERROR, "Failed to encode JSON object\n");
100 return -1;
101 }
102
103 if (ast_websocket_write_string(session->ast_ws_session, str)) {
104 ast_log(LOG_NOTICE, "Problem occurred during websocket write to %s, websocket closed\n",
106 return -1;
107 }
108 return 0;
109}
enum ast_json_encoding_format ast_ari_json_format(void)
Configured encoding format for JSON output.
Definition res_ari.c:913
#define VALIDATION_FAILED
int AST_OPTIONAL_API_NAME() ast_websocket_write_string(struct ast_websocket *ws, const char *buf)
Construct and transmit a WebSocket frame containing string data.

References ast_ari_json_format(), ast_json_dump_string_format(), ast_json_free(), ast_log, ast_sockaddr_stringify(), ast_websocket_remote_address(), ast_websocket_write_string(), LOG_ERROR, LOG_NOTICE, NULL, RAII_VAR, session, str, and VALIDATION_FAILED.

Referenced by session_send_or_queue(), and session_update().

◆ stasis_app_message_handler()

static void stasis_app_message_handler ( void *  data,
const char *  app_name,
struct ast_json message 
)
static

Definition at line 346 of file ari_websockets.c.

348{
349 int debug_app = stasis_app_get_debug_by_name(app_name);
350 struct ari_ws_session *session = data;
351
352 if (!session) {
353 ast_debug(3, "Stasis app '%s' message handler called with NULL session. OK for per_call_config websocket.\n",
354 app_name);
355 return;
356 }
357
359}

References app_name(), ari_websocket_send_event(), ast_debug, session, and stasis_app_get_debug_by_name().

Referenced by session_register_apps().

◆ websocket_attempted_cb()

static int websocket_attempted_cb ( struct ast_tcptls_session_instance ser,
struct ast_variable get_params,
struct ast_variable headers,
const char *  session_id 
)
static

Definition at line 771 of file ari_websockets.c.

774{
775 const char *subscribe_all = NULL;
776 const char *apps = NULL;
777 struct ari_ws_session *session = NULL;
778
779 apps = ast_variable_find_in_list(get_params, "app");
780 if (ast_strlen_zero(apps)) {
781 handle_create_error(ser, 400, "Bad Request",
782 "HTTP request is missing param: [app]");
783 return -1;
784 }
785
786 subscribe_all = ast_variable_find_in_list(get_params, "subscribeAll");
787
790 if (!session) {
791 handle_create_error(ser, 500, "Server Error",
792 "Failed to create ARI websocket session");
793 return -1;
794 }
795 /* It's in the session registry now so we can release our reference */
797
798 return 0;
799}
const char * ast_variable_find_in_list(const struct ast_variable *list, const char *variable)
Gets the value of a variable from a variable list by name.
int attribute_pure ast_true(const char *val)
Make sure something is true. Determine if a string containing a boolean value is "true"....
Definition utils.c:2233

References ast_strlen_zero(), ast_true(), ast_variable_find_in_list(), AST_WS_TYPE_INBOUND, handle_create_error, NULL, session, session_create(), ari_ws_session::session_id, session_unref(), and subscribe_all().

Referenced by ari_websocket_load_module().

◆ websocket_established_cb()

static void websocket_established_cb ( struct ast_websocket ast_ws_session,
struct ast_variable get_params,
struct ast_variable upgrade_headers 
)
static

Definition at line 806 of file ari_websockets.c.

808{
809 /*
810 * ast_ws_session is passed in with it's refcount bumped so
811 * we need to unref it when we're done. The refcount will
812 * be bumped again when we add it to the ari_ws_session.
813 */
814 RAII_VAR(struct ast_websocket *, s, ast_ws_session, ast_websocket_unref);
816 struct ast_json *msg;
817 struct ast_variable *v;
818 char *remote_addr = ast_sockaddr_stringify(
819 ast_websocket_remote_address(ast_ws_session));
820 const char *session_id = ast_websocket_session_id(ast_ws_session);
821
822 SCOPE_ENTER(2, "%s: WebSocket established\n", remote_addr);
823
824 if (TRACE_ATLEAST(2)) {
825 ast_trace(2, "%s: Websocket Upgrade Headers:\n", remote_addr);
826 for (v = upgrade_headers; v; v = v->next) {
827 ast_trace(3, "--> %s: %s\n", v->name, v->value);
828 }
829 }
830
831 /*
832 * Find the ari_ws_session that was created by websocket_attempted_cb
833 * and update its ast_websocket.
834 */
836 if (!ari_ws_session) {
838 "%s: Failed to locate an event session for the websocket session %s\n",
839 remote_addr, session_id);
840 }
841
842 /*
843 * Since this is a new inbound websocket session,
844 * session_register_apps() will have already sent "ApplicationRegistered"
845 * events for the apps. We don't want to do it again.
846 */
847 session_update(ari_ws_session, ast_ws_session, 0);
848
850 ast_trace(-1, "%s: Waiting for messages\n", remote_addr);
851 while ((msg = session_read(ari_ws_session))) {
853 upgrade_headers, ari_ws_session->app_name, msg);
854 ast_json_unref(msg);
855 }
857
858 SCOPE_EXIT("%s: Websocket closed\n", remote_addr);
859}
#define TRACE_ATLEAST(level)
#define ast_trace(level,...)
const char *AST_OPTIONAL_API_NAME() ast_websocket_session_id(struct ast_websocket *session)
Get the session ID for a WebSocket session.
struct ast_variable * next

References ao2_find, ari_ws_session::app_name, ari_websocket_process_request(), ast_json_unref(), ast_sockaddr_stringify(), ast_trace, ast_websocket_remote_address(), ast_websocket_session_id(), ast_websocket_unref(), ari_ws_session::ast_ws_session, ari_ws_session::connected, LOG_ERROR, ast_variable::name, ast_variable::next, NULL, OBJ_SEARCH_KEY, RAII_VAR, SCOPE_ENTER, SCOPE_EXIT, SCOPE_EXIT_LOG_RTN, session_cleanup(), session_read(), session_registry, session_update(), TRACE_ATLEAST, and ast_variable::value.

Referenced by ari_websocket_load_module().

Variable Documentation

◆ ari_validate_message_fn

ari_validator ari_validate_message_fn = null_validator

Definition at line 71 of file ari_websockets.c.

Referenced by session_create().

◆ ast_ws_server

struct ast_websocket_server* ast_ws_server

◆ observer_callbacks

struct ast_sorcery_observer observer_callbacks
static
Initial value:
= {
}
static void outbound_sessions_load(const char *name)

Definition at line 1548 of file ari_websockets.c.

1548 {
1549 .loaded = outbound_sessions_load,
1550};

Referenced by ari_websocket_load_module(), and ari_websocket_unload_module().

◆ session_registry

struct ao2_container* session_registry
static