Asterisk - The Open Source Telephony Project GIT-master-c7a8271
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Macros Modules Pages
ari_websockets.c
Go to the documentation of this file.
1/*
2 * Asterisk -- An open source telephony toolkit.
3 *
4 * Copyright (C) 2013, Digium, Inc.
5 *
6 * David M. Lee, II <dlee@digium.com>
7 *
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
13 *
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
17 */
18
19#include "asterisk.h"
20
21#include "resource_events.h"
22#include "ari_websockets.h"
23#include "internal.h"
25#include "asterisk/app.h"
26#include "asterisk/ari.h"
27#include "asterisk/astobj2.h"
29#include "asterisk/module.h"
30#include "asterisk/pbx.h"
31#include "asterisk/stasis_app.h"
32#include "asterisk/time.h"
33#include "asterisk/uuid.h"
34#include "asterisk/vector.h"
36
37
38/*! \file
39 *
40 * \brief WebSocket support for RESTful API's.
41 * \author David M. Lee, II <dlee@digium.com>
42 */
43
44/*! Number of buckets for the ari_ws_session registry. Remember to keep it a prime number! */
45#define SESSION_REGISTRY_NUM_BUCKETS 23
46
47/*! Initial size of websocket session apps vector */
48#define APPS_INIT_SIZE 7
49
50/*! Initial size of the websocket session message queue. */
51#define MESSAGES_INIT_SIZE 23
52
53#define ARI_CONTEXT_REGISTRAR "res_ari"
54
55/*! \brief Local registry for created \ref ari_ws_session objects. */
57
59
60#if defined(AST_DEVMODE)
62#else
63 /*!
64 * \brief Validator that always succeeds.
65 */
66 static int null_validator(struct ast_json *json)
67 {
68 return 1;
69 }
70
72#endif
73
74
75#define VALIDATION_FAILED \
76 "{" \
77 " \"error\": \"InvalidMessage\"," \
78 " \"message\": \"Message validation failed\"" \
79 "}"
80
82{
83 RAII_VAR(char *, str, NULL, ast_json_free);
84
85 if (!session || !session->ast_ws_session || !message) {
86 return -1;
87 }
88
89#ifdef AST_DEVMODE
90 if (!session->validator(message)) {
91 ast_log(LOG_ERROR, "Outgoing message failed validation\n");
93 }
94#endif
95
97
98 if (str == NULL) {
99 ast_log(LOG_ERROR, "Failed to encode JSON object\n");
100 return -1;
101 }
102
103 if (ast_websocket_write_string(session->ast_ws_session, str)) {
104 ast_log(LOG_NOTICE, "Problem occurred during websocket write to %s, websocket closed\n",
106 return -1;
107 }
108 return 0;
109}
110
112 struct ast_json *message, const char *msg_type, const char *app_name,
113 int debug_app)
114{
115 const char *msg_timestamp, *msg_ast_id;
116
122 "%s: Failed to dispatch '%s' message from Stasis app '%s'; could not update message\n",
123 session->remote_addr, msg_type, app_name);
124 return;
125 }
126 }
127
128 msg_ast_id = S_OR(
129 ast_json_string_get(ast_json_object_get(message, "asterisk_id")), "");
130 if (ast_strlen_zero(msg_ast_id)) {
131 char eid[20];
132
133 if (ast_json_object_set(message, "asterisk_id",
137 "%s: Failed to dispatch '%s' message from Stasis app '%s'; could not update message\n",
138 session->remote_addr, msg_type, app_name);
139 }
140 }
141
142 if (!session->ast_ws_session) {
143 /* If the websocket is NULL, the message goes to the queue */
144 if (AST_VECTOR_APPEND(&session->message_queue, message) == 0) {
146 }
147 /*
148 * If the msg_type one of the Application* types, the websocket
149 * might not be there yet so don't log.
150 */
151 if (!ast_begins_with(msg_type, "Application")) {
153 "%s: Queued '%s' message for Stasis app '%s'; websocket is not ready\n",
154 session->remote_addr,
155 msg_type,
156 app_name);
157 }
158 } else {
159
160 if (DEBUG_ATLEAST(4) || debug_app) {
162
163 ast_verbose("<--- Sending ARI event to %s --->\n%s\n",
164 session->remote_addr,
165 str);
167 }
169 }
170}
171
173 const char *event_type, const char *app_name)
174{
175 char eid[20];
176 int debug_app = stasis_app_get_debug_by_name(app_name);
177 struct ast_json *msg = ast_json_pack("{s:s, s:o?, s:s, s:s }",
178 "type", event_type,
179 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
180 "application", app_name,
181 "asterisk_id", ast_eid_to_str(eid, sizeof(eid), &ast_eid_default));
182
183 if (!msg) {
184 return;
185 }
186 ast_debug(3, "%s: Sending '%s' event to app '%s'\n", session->session_id,
187 event_type, app_name);
188 /*
189 * We don't want to use ari_websocket_send_event() here because
190 * the app may be unregistered which will cause stasis_app_event_allowed
191 * to return false.
192 */
193 session_send_or_queue(session, msg, event_type, app_name, debug_app);
194 ast_json_unref(msg);
195}
196
198{
200
201 if (!session || !session->ast_ws_session) {
202 return NULL;
203 }
204 if (ast_websocket_fd(session->ast_ws_session) < 0) {
205 return NULL;
206 }
207
208 while (!message) {
209 int res;
210 char *payload;
211 uint64_t payload_len;
212 enum ast_websocket_opcode opcode;
213 int fragmented;
214
215 res = ast_wait_for_input(
216 ast_websocket_fd(session->ast_ws_session), -1);
217
218 if (res <= 0) {
219 ast_log(LOG_WARNING, "WebSocket poll error: %s\n",
220 strerror(errno));
221 return NULL;
222 }
223
224 res = ast_websocket_read(session->ast_ws_session, &payload,
225 &payload_len, &opcode, &fragmented);
226
227 if (res != 0) {
228 ast_log(LOG_WARNING, "WebSocket read error: %s\n",
229 strerror(errno));
230 return NULL;
231 }
232
233 switch (opcode) {
235 ast_debug(1, "WebSocket closed\n");
236 return NULL;
238 message = ast_json_load_buf(payload, payload_len, NULL);
239 if (message == NULL) {
240 struct ast_json *error = ast_json_pack(
241 "{s:s, s:s, s:s, s:i, s:s, s:s }",
242 "type", "RESTResponse",
243 "transaction_id", "",
244 "request_id", "",
245 "status_code", 400,
246 "reason_phrase", "Failed to parse request message JSON",
247 "uri", ""
248 );
250 error, 0);
253 "WebSocket input failed to parse\n");
254
255 }
256
257 break;
258 default:
259 /* Ignore all other message types */
260 break;
261 }
262 }
263
264 return ast_json_ref(message);
265}
266
268 struct ast_tcptls_session_instance *ser, const char *uri,
269 enum ast_http_method method, struct ast_variable *get_params,
270 struct ast_variable *headers)
271{
272 struct ast_http_uri fake_urih = {
274 };
275
276 ast_websocket_uri_cb(ser, &fake_urih, uri, method, get_params,
277 headers);
278}
279
280/*!
281 * \brief Callback handler for Stasis application messages.
282 *
283 * \internal
284 *
285 * \param data Void pointer to the event session (\ref event_session).
286 * \param app_name Name of the Stasis application that dispatched the message.
287 * \param message The dispatched message.
288 * \param debug_app Debug flag for the application.
289 */
291 const char *app_name, struct ast_json *message, int debug_app)
292{
293 char *remote_addr = session->ast_ws_session ? ast_sockaddr_stringify(
294 ast_websocket_remote_address(session->ast_ws_session)) : "";
295 const char *msg_type, *msg_application;
296 SCOPE_ENTER(4, "%s: Dispatching message from Stasis app '%s'\n", remote_addr, app_name);
297
299
301
302 msg_type = S_OR(ast_json_string_get(ast_json_object_get(message, "type")), "");
303 msg_application = S_OR(
305
306 /* If we've been replaced, remove the application from our local
307 websocket_apps container */
308 if (session->type == AST_WS_TYPE_INBOUND
309 && strcmp(msg_type, "ApplicationReplaced") == 0 &&
310 strcmp(msg_application, app_name) == 0) {
311 AST_VECTOR_REMOVE_CMP_ORDERED(&session->websocket_apps,
313 }
314
315 /* Now, we need to determine our state to see how we will handle the message */
319 "%s: Failed to dispatch '%s' message from Stasis app '%s'; could not update message\n",
320 remote_addr, msg_type, msg_application);
321 }
322
325 app_name, debug_app);
326 }
327
329 && !ast_strlen_zero(session->channel_id)
330 && ast_strings_equal(msg_type, "StasisEnd")) {
331 struct ast_json *chan = ast_json_object_get(message, "channel");
332 struct ast_json *id_obj = ast_json_object_get(chan, "id");
333 const char *id = ast_json_string_get(id_obj);
334 if (!ast_strlen_zero(id)
335 && ast_strings_equal(id, session->channel_id)) {
336 ast_debug(3, "%s: StasisEnd message sent for channel '%s'\n",
337 remote_addr, id);
338 session->stasis_end_sent = 1;
339 }
340 }
342 SCOPE_EXIT("%s: Dispatched '%s' message from Stasis app '%s'\n",
343 remote_addr, msg_type, app_name);
344}
345
346static void stasis_app_message_handler(void *data, const char *app_name,
347 struct ast_json *message)
348{
349 int debug_app = stasis_app_get_debug_by_name(app_name);
350 struct ari_ws_session *session = data;
351
352 if (!session) {
353 ast_debug(3, "Stasis app '%s' message handler called with NULL session. OK for per_call_config websocket.\n",
354 app_name);
355 return;
356 }
357
359}
360
362{
363 if (!session) {
364 return;
365 }
366 ast_debug(4, "%s: Unreffing ARI websocket session\n", session->session_id);
367 ao2_ref(session, -1);
368}
369
371{
372 ast_debug(3, "%s: Trying to unregister app '%s'\n",
373 session->session_id, app_name);
377 ast_debug(3, "%s: Unregistering context '%s' for app '%s'\n",
378 session->session_id, context_name, app_name);
380 } else {
381 ast_debug(3, "%s: Unregistering stasis app '%s' and unsubscribing from all events.\n",
382 session->session_id, app_name);
384 }
385
386 /*
387 * We don't send ApplicationUnregistered events for outbound per-call
388 * configs because there's no websocket to send them via or to
389 * inbound websockets because the websocket is probably closed already.
390 */
391 if (!(session->type
393 session_send_app_event(session, "ApplicationUnregistered", app_name);
394 }
395}
396
398{
399 int app_count = (int)AST_VECTOR_SIZE(&session->websocket_apps);
400
401 if (app_count == 0) {
402 return;
403 }
404 ast_debug(3, "%s: Unregistering stasis apps.\n", session->session_id);
405
407 session);
408 AST_VECTOR_RESET(&session->websocket_apps, ast_free_ptr);
409
410 return;
411}
412
414 const char *_apps, int subscribe_all)
415{
416 char *apps = ast_strdupa(_apps);
417 char *app_name;
418 int app_counter = 0;
419
420 ast_debug(3, "%s: Registering apps '%s'. Subscribe all: %s\n",
421 session->session_id, apps, subscribe_all ? "yes" : "no");
422
423 while ((app_name = ast_strsep(&apps, ',', AST_STRSEP_STRIP))) {
424
426 ast_log(LOG_WARNING, "%s: Invalid application name\n", session->session_id);
427 return -1;
428 }
429
430 if (strlen(app_name) > ARI_MAX_APP_NAME_LEN) {
431 ast_log(LOG_WARNING, "%s: Websocket app '%s' > %d characters\n",
432 session->session_id, app_name, (int)ARI_MAX_APP_NAME_LEN);
433 return -1;
434 }
435
437 /*
438 * Outbound per-call configs only create a dialplan context.
439 * If they registered stasis apps there'd be no way for the
440 * Stasis dialplan app to know that it needs to start a
441 * per-call websocket connection.
442 */
444
449 ast_log(LOG_WARNING, "%s: Could not create context '%s'\n",
450 session->session_id, context_name);
451 return -1;
452 } else {
454 "Stasis", ast_strdup(app_name), ast_free_ptr,
458 }
459 } else {
460 ast_debug(3, "%s: Context '%s' already exists\n", session->session_id,
462 }
463 } else {
464 int already_registered = stasis_app_is_registered(app_name);
465 int res = 0;
466
467 if (subscribe_all) {
469 session);
470 } else {
472 session);
473 }
474
475 if (res != 0) {
476 return -1;
477 }
478
479 /*
480 * If there was an existing app by the same name, the register handler
481 * will have sent an ApplicationReplaced event. If it's a new app, we
482 * send an ApplicationRegistered event.
483 *
484 * Except... There's no websocket to send it on for outbound per-call
485 * configs and inbound websockets don't need them because they aready
486 * know what apps they've registered for.
487 */
488 if (!already_registered
490 session_send_app_event(session, "ApplicationRegistered",
491 app_name);
492 }
493 }
494
495 if (AST_VECTOR_ADD_SORTED(&session->websocket_apps, ast_strdup(app_name), strcmp)) {
496 ast_log(LOG_WARNING, "%s: Unable to add app '%s' to apps container\n",
497 session->session_id, app_name);
498 return -1;
499 }
500
501 app_counter++;
502 if (app_counter == 1) {
503 ast_free(session->app_name);
504 session->app_name = ast_strdup(app_name);
505 if (!session->app_name) {
506 ast_log(LOG_WARNING, "%s: Unable to duplicate app name\n",
507 session->session_id);
508 return -1;
509 }
510 }
511 }
512
513 return 0;
514}
515
516/*
517 * Websocket session cleanup is a bit complicated because it can be
518 * in different states, it may or may not be in the registry container,
519 * and stasis may be sending asynchronous events to it and some
520 * stages of cleanup need to lock it.
521 *
522 * That's why there are 3 different cleanup functions.
523 */
524
525/*!
526 * \internal
527 * \brief Reset the ari_ws_session without destroying it.
528 * It can't be reused and will be cleaned up by the caller.
529 * This should only be called by session_create()
530 * and session_cleanup().
531 */
533{
535
536 ast_debug(3, "%s: Resetting ARI websocket session\n",
537 session->session_id);
538
539 /* Clean up the websocket_apps container */
540 if (AST_VECTOR_SIZE(&session->websocket_apps) > 0) {
542 }
543 AST_VECTOR_RESET(&session->websocket_apps, ast_free_ptr);
544 AST_VECTOR_FREE(&session->websocket_apps);
545
546 AST_VECTOR_RESET(&session->message_queue, ast_json_unref);
547 AST_VECTOR_FREE(&session->message_queue);
548}
549
550/*!
551 * \internal
552 * \brief RAII_VAR and container ari_ws_session cleanup function.
553 * This unlinks the ari_ws_session from the registry and cleans up the
554 * decrements the reference count.
555 */
557{
558 if (!session) {
559 return;
560 }
561 ast_debug(3, "%s: Cleaning up ARI websocket session RC: %d\n",
562 session->session_id, (int)ao2_ref(session, 0));
563
565
566 if (session_registry) {
567 ast_debug(3, "%s: Unlinking websocket session from registry RC: %d\n",
568 session->session_id, (int)ao2_ref(session, 0));
570 }
571
572 /*
573 * If this is a per-call config then its only reference
574 * was held by the registry container so we don't need
575 * to unref it here.
576 */
579 }
580}
581
582/*!
583 * \internal
584 * \brief The ao2 destructor.
585 * This cleans up the reference to the parent ast_websocket and the
586 * outbound connection websocket if any.
587 */
588static void session_dtor(void *obj)
589{
590 struct ari_ws_session *session = obj;
591
592 ast_debug(3, "%s: Destroying ARI websocket session\n",
593 session->session_id);
594
595 ast_free(session->app_name);
596 ast_free(session->remote_addr);
597 ast_free(session->channel_id);
598 ast_free(session->channel_name);
599 ao2_cleanup(session->owc);
600 session->owc = NULL;
601 if (!session->ast_ws_session) {
602 return;
603 }
604 ast_websocket_unref(session->ast_ws_session);
605 session->ast_ws_session = NULL;
606}
607
608#define handle_create_error(ser, code, msg, reason) \
609({ \
610 if (ser) { \
611 ast_http_error(ser, code, msg, reason); \
612 } \
613 ast_log(LOG_WARNING, "Failed to create ARI websocket session: %d %s %s\n", \
614 code, msg, reason); \
615})
616
618 struct ast_tcptls_session_instance *ser,
619 const char *apps,
620 int subscribe_all,
621 const char *session_id,
622 struct ari_conf_outbound_websocket *ows,
623 enum ast_websocket_type ws_type)
624{
626 size_t size;
627
628 ast_debug(3, "%s: Creating ARI websocket session for apps '%s'\n",
630
631 size = sizeof(*session) + strlen(session_id) + 1;
632
634 if (!session) {
635 return NULL;
636 }
637
638 session->type = ws_type;
639 session->subscribe_all = subscribe_all;
640
641 strcpy(session->session_id, session_id); /* Safe */
642
643 /* Instantiate the hash table for Stasis apps */
644 if (AST_VECTOR_INIT(&session->websocket_apps, APPS_INIT_SIZE)) {
645 handle_create_error(ser, 500, "Internal Server Error",
646 "Allocation failed");
647 return NULL;
648 }
649
650 /* Instantiate the message queue */
651 if (AST_VECTOR_INIT(&session->message_queue, MESSAGES_INIT_SIZE)) {
652 handle_create_error(ser, 500, "Internal Server Error",
653 "Allocation failed");
654 AST_VECTOR_FREE(&session->websocket_apps);
655 return NULL;
656 }
657
658 session->validator = ari_validate_message_fn;
659
660 if (ows) {
661 session->owc = ao2_bump(ows);
662 }
663
665 handle_create_error(ser, 500, "Internal Server Error",
666 "Stasis app registration failed");
668 return NULL;
669 }
670
672 handle_create_error(ser, 500, "Internal Server Error",
673 "Allocation failed");
675 return NULL;
676 }
677
678 return ao2_bump(session);
679}
680
681/*!
682 * \internal
683 * \brief Updates the websocket session.
684 *
685 * \details If the value of the \c ws_session is not \c NULL and there are messages in the
686 * event session's \c message_queue, the messages are dispatched and removed from
687 * the queue.
688 *
689 * \param ari_ws_session The ARI websocket session
690 * \param ast_ws_session The Asterisk websocket session
691 */
693 struct ast_websocket *ast_ws_session, int send_registered_events)
694{
696 int i;
697
698 if (ast_ws_session == NULL) {
699 return -1;
700 }
701
702 if (!general) {
703 return -1;
704 }
705
709 ast_log(LOG_ERROR, "Failed to copy remote address\n");
710 return -1;
711 }
712
715 "ARI web socket failed to set nonblock; closing: %s\n",
716 strerror(errno));
717 return -1;
718 }
719
720 if (ast_websocket_set_timeout(ast_ws_session, general->write_timeout)) {
721 ast_log(LOG_WARNING, "Failed to set write timeout %d on ARI web socket\n",
722 general->write_timeout);
723 }
724
728 for (i = 0; i < AST_VECTOR_SIZE(&ari_ws_session->message_queue); i++) {
731 ast_json_unref(msg);
732 }
733
736
737 if (send_registered_events) {
738 int i;
739 char *app;
740
741 for (i = 0; i < AST_VECTOR_SIZE(&ari_ws_session->websocket_apps); i++) {
744 "ApplicationRegistered", app);
745 }
746 }
747
748 return 0;
749}
750
751/*!
752 * \internal
753 * \brief This function gets called for incoming websocket connections
754 * before the upgrade process is completed.
755 *
756 * The point is to be able to report early errors via HTTP rather
757 * than letting res_http_websocket create an ast_websocket session
758 * then immediately close it if there's an error.
759 */
761 struct ast_variable *get_params, struct ast_variable *headers,
762 const char *session_id)
763{
764 const char *subscribe_all = NULL;
765 const char *apps = NULL;
766 struct ari_ws_session *session = NULL;
767
768 apps = ast_variable_find_in_list(get_params, "app");
769 if (ast_strlen_zero(apps)) {
770 handle_create_error(ser, 400, "Bad Request",
771 "HTTP request is missing param: [app]");
772 return -1;
773 }
774
775 subscribe_all = ast_variable_find_in_list(get_params, "subscribeAll");
776
779 if (!session) {
780 handle_create_error(ser, 500, "Server Error",
781 "Failed to create ARI websocket session");
782 return -1;
783 }
784 /* It's in the session registry now so we can release our reference */
786
787 return 0;
788}
789
790/*!
791 * \internal
792 * \brief This function gets called for incoming websocket connections
793 * after the upgrade process is completed.
794 */
796 struct ast_variable *get_params, struct ast_variable *upgrade_headers)
797{
798 /*
799 * ast_ws_session is passed in with it's refcount bumped so
800 * we need to unref it when we're done. The refcount will
801 * be bumped again when we add it to the ari_ws_session.
802 */
805 struct ast_json *msg;
806 struct ast_variable *v;
807 char *remote_addr = ast_sockaddr_stringify(
808 ast_websocket_remote_address(ast_ws_session));
809 const char *session_id = ast_websocket_session_id(ast_ws_session);
810
811 SCOPE_ENTER(2, "%s: WebSocket established\n", remote_addr);
812
813 if (TRACE_ATLEAST(2)) {
814 ast_trace(2, "%s: Websocket Upgrade Headers:\n", remote_addr);
815 for (v = upgrade_headers; v; v = v->next) {
816 ast_trace(3, "--> %s: %s\n", v->name, v->value);
817 }
818 }
819
820 /*
821 * Find the ari_ws_session that was created by websocket_attempted_cb
822 * and update its ast_websocket.
823 */
825 if (!ari_ws_session) {
827 "%s: Failed to locate an event session for the websocket session %s\n",
828 remote_addr, session_id);
829 }
830
831 /*
832 * Since this is a new inbound websocket session,
833 * session_register_apps() will have already sent "ApplicationRegistered"
834 * events for the apps. We don't want to do it again.
835 */
836 session_update(ari_ws_session, ast_ws_session, 0);
837
839 ast_trace(-1, "%s: Waiting for messages\n", remote_addr);
840 while ((msg = session_read(ari_ws_session))) {
842 upgrade_headers, ari_ws_session->app_name, msg);
843 ast_json_unref(msg);
844 }
846
847 SCOPE_EXIT("%s: Websocket closed\n", remote_addr);
848}
849
850static int session_shutdown_cb(void *obj, void *arg, int flags)
851{
852 struct ari_ws_session *session = obj;
853
854 /* Per-call configs have no actual websocket */
856 ast_log(LOG_NOTICE, "%s: Shutting down %s ARI websocket session\n",
857 session->session_id,
860 return 0;
861 }
862 if (session->type == AST_WS_TYPE_INBOUND) {
863 ast_log(LOG_NOTICE, "%s: Shutting down inbound ARI websocket session from %s\n",
864 session->session_id, session->remote_addr);
865 } else {
866 ast_log(LOG_NOTICE, "%s: Shutting down %s ARI websocket session to %s\n",
867 session->session_id,
869 session->remote_addr);
870 }
871
872 /*
873 * We need to ensure the session is kept around after the cleanup
874 * so we can close the websocket.
875 */
877 session->closing = 1;
879 if (session->ast_ws_session) {
880 ast_websocket_close(session->ast_ws_session, 1000);
881 }
882
883 return 0;
884}
885
886
888{
890}
891
892static struct ari_ws_session *session_find_by_app(const char *app_name,
893 unsigned int ws_type)
894{
895 struct ari_ws_session *session = NULL;
896 struct ao2_iterator i;
897
899 return NULL;
900 }
901
903 while ((session = ao2_iterator_next(&i))) {
904 char *app = NULL;
905 if (!(session->type & ws_type)) {
907 continue;
908 }
909
910 app = AST_VECTOR_GET_CMP(&session->websocket_apps,
912 if (app) {
913 break;
914 }
916 }
918 return session;
919}
920
921/*!
922 * \internal
923 * \brief Connection and request handler thread for outbound websockets.
924 *
925 * This thread handles the connection and reconnection logic for outbound
926 * websockets. Once connected, it waits for incoming REST over Websocket
927 * requests and dispatches them to ari_websocket_process_request()).
928 */
929static void *outbound_session_handler_thread(void *obj)
930{
932 int already_sent_registers = 1;
933
934 ast_debug(3, "%s: Starting outbound websocket thread RC: %d\n",
935 session->session_id, (int)ao2_ref(session, 0));
936 session->thread = pthread_self();
937 session->connected = 0;
938
939 while(1) {
941 RAII_VAR(struct ast_variable *, upgrade_headers, NULL, ast_variables_destroy);
943 struct ast_json *msg;
944
945 ast_debug(3, "%s: Attempting to connect to %s\n", session->session_id,
946 session->owc->websocket_client->uri);
947
948 astws = ast_websocket_client_connect(session->owc->websocket_client,
949 NULL, session->session_id, &result);
950 if (!astws || result != WS_OK) {
952 struct stasis_app_control *control =
954 if (control) {
955 ast_debug(3, "%s: Connection failed. Returning to dialplan.\n",
956 session->session_id);
959 ao2_cleanup(control);
960 } else {
961 ast_debug(3, "%s: Connection failed. No control object found.\n",
962 session->session_id);
963 }
964
965 break;
966 }
967 usleep(session->owc->websocket_client->reconnect_interval * 1000);
968 continue;
969 }
970 ast_log(LOG_NOTICE, "%s: Outbound websocket connected to %s\n",
971 session->type == AST_WS_TYPE_CLIENT_PERSISTENT ? session->session_id : session->channel_name,
972 session->owc->websocket_client->uri);
973
974 /*
975 * We only want to send "ApplicationRegistered" events in the
976 * case of a reconnect. The initial connection will have already sent
977 * the events when outbound_register_apps() was called.
978 */
979 session_update(session, astws, !already_sent_registers);
980 already_sent_registers = 0;
981
982 /*
983 * This is the Authorization header that would normally be taken
984 * from the incoming HTTP request that is being upgraded to a websocket.
985 * Since this is an outbound websocket, we have to create it ourselves.
986 *
987 * This is NOT the same as the Authorization header that is used for
988 * authentication with the remote websocket server.
989 */
990 upgrade_headers = ast_http_create_basic_auth_header(
991 session->owc->local_ari_user, session->owc->local_ari_password);
992 if (!upgrade_headers) {
993 ast_log(LOG_WARNING, "%s: Failed to create upgrade header\n", session->session_id);
994 session->thread = 0;
995 ast_websocket_close(astws, 1000);
996 return NULL;
997 }
998
999 session->connected = 1;
1000 ast_debug(3, "%s: Websocket connected\n", session->session_id);
1001 ast_debug(3, "%s: Waiting for messages RC: %d\n",
1002 session->session_id, (int)ao2_ref(session, 0));
1003
1004 /*
1005 * The websocket is connected. Now we need to wait for messages
1006 * from the server.
1007 */
1008 while ((msg = session_read(session))) {
1010 upgrade_headers, session->app_name, msg);
1011 ast_json_unref(msg);
1012 }
1013
1014 session->connected = 0;
1015 ast_websocket_unref(session->ast_ws_session);
1016 session->ast_ws_session = NULL;
1017 if (session->closing) {
1018 ast_debug(3, "%s: Websocket closing RC: %d\n",
1019 session->session_id, (int)ao2_ref(session, 0));
1020 break;
1021 }
1022
1023 ast_log(LOG_WARNING, "%s: Websocket disconnected. Reconnecting\n",
1024 session->session_id);
1025 }
1026
1027 ast_debug(3, "%s: Stopping outbound websocket thread RC: %d\n",
1028 session->session_id, (int)ao2_ref(session, 0));
1029 session->thread = 0;
1030
1031 return NULL;
1032}
1033
1039};
1040
1042 struct ari_ws_session *session,
1043 struct ari_conf_outbound_websocket *new_owc)
1044{
1045 enum session_apply_result apply_result;
1046 enum ari_conf_owc_fields what_changed;
1047 const char *new_owc_id = ast_sorcery_object_get_id(new_owc);
1048
1049 what_changed = ari_conf_owc_detect_changes(session->owc, new_owc);
1050
1051 if (what_changed == ARI_OWC_FIELD_NONE) {
1052 ast_debug(2, "%s: No changes detected\n", new_owc_id);
1054 }
1055 ast_debug(2, "%s: Config change detected. Checking details\n", new_owc_id);
1056
1057 if (what_changed & ARI_OWC_NEEDS_REREGISTER) {
1058 ast_debug(2, "%s: Re-registering apps\n", new_owc_id);
1059
1060 if (!(what_changed & ARI_OWC_FIELD_SUBSCRIBE_ALL)) {
1061 /*
1062 * If subscribe_all didn't change, we don't have to
1063 * unregister apps that are already registered and
1064 * also in the new config. We'll remove them from
1065 * the session->websocket_apps container so that
1066 * session_unregister_apps will only clean up
1067 * the ones that are going away. session_register_apps
1068 * will add them back in again and cause ApplicationReplaced
1069 * messages to be sent.
1070 *
1071 * If subscribe_all did change, we have no choice but to
1072 * unregister all apps and register all the ones in
1073 * the new config even if they already existed.
1074 */
1075 int i = 0;
1076 char *app;
1077
1078 while(i < (int) AST_VECTOR_SIZE(&session->websocket_apps)) {
1079 app = AST_VECTOR_GET(&session->websocket_apps, i);
1080 if (ast_in_delimited_string(app, new_owc->apps, ',')) {
1081 AST_VECTOR_REMOVE_ORDERED(&session->websocket_apps, i);
1082 ast_debug(3, "%s: Unlinked app '%s' to keep it from being unregistered\n",
1083 new_owc_id, app);
1084 ast_free(app);
1085 } else {
1086 i++;
1087 }
1088 }
1089 }
1090
1092
1093 /*
1094 * Register the new apps. This will also replace any
1095 * existing apps that are in the new config sending
1096 * ApplicationRegistered or ApplicationReplaced events
1097 * as necessary.
1098 */
1099 if (session_register_apps(session, new_owc->apps,
1100 new_owc->subscribe_all) < 0) {
1101 ast_log(LOG_WARNING, "%s: Failed to register apps '%s'\n",
1102 new_owc_id, new_owc->apps);
1103 /* Roll back. */
1105 /* Re-register the original apps. */
1106 if (session_register_apps(session, session->owc->apps,
1107 session->owc->subscribe_all) < 0) {
1108 ast_log(LOG_WARNING, "%s: Failed to re-register apps '%s'\n",
1109 new_owc_id, session->owc->apps);
1110 }
1111 return SESSION_APPLY_FAILED;
1112 }
1113 }
1114 /*
1115 * We need to update the session with the new config
1116 * but it has to be done after re-registering apps and
1117 * before we reconnect.
1118 */
1119 ao2_replace(session->owc, new_owc);
1120 session->type = new_owc->websocket_client->connection_type;
1121 session->subscribe_all = new_owc->subscribe_all;
1122
1123 apply_result = SESSION_APPLY_OK;
1124
1125 if (what_changed & ARI_OWC_NEEDS_RECONNECT) {
1126 ast_debug(2, "%s: Reconnect required\n", new_owc_id);
1127 apply_result = SESSION_APPLY_RECONNECT_REQUIRED;
1128 if (session->ast_ws_session) {
1129 ast_debug(2, "%s: Closing websocket\n", new_owc_id);
1130 ast_websocket_close(session->ast_ws_session, 1000);
1131 }
1132 }
1133
1134 return apply_result;
1135}
1136
1137/*
1138 * This is the fail-safe timeout for the per-call websocket
1139 * connection. To prevent a cleanup race condition, we wait
1140 * 3 times the timeout the thread will use to connect to the
1141 * websocket server. This way we're sure the thread will be
1142 * done before we do final cleanup. This timeout is only used
1143 * if the thread is cancelled somehow and can't indicate
1144 * whether it actually connected or not.
1145 */
1146#define PER_CALL_FAIL_SAFE_TIMEOUT(owc) \
1147 (int64_t)((owc->websocket_client->connect_timeout + owc->websocket_client->reconnect_interval) \
1148 * (owc->websocket_client->reconnect_attempts + 3))
1149
1150/*!
1151 * \brief This function gets called by app_stasis when a call arrives
1152 * but a Stasis application isn't already registered. We check to see
1153 * if a per-call config exists for the application and if so, we create a
1154 * per-call websocket connection and return a unique app id which app_stasis
1155 * can use to call stasis_app_exec() with.
1156 */
1158 struct ast_channel *chan)
1159{
1162 RAII_VAR(char *, session_id, NULL, ast_free);
1163 RAII_VAR(char *, app_id, NULL, ast_free);
1164 enum ari_conf_owc_fields invalid_fields;
1165 const char *owc_id = NULL;
1166 char *app_id_rtn = NULL;
1167 struct timeval tv_start;
1168 int res = 0;
1169
1171 if (!owc) {
1172 ast_log(LOG_WARNING, "%s: Failed to find outbound websocket per-call config for app '%s'\n",
1173 ast_channel_name(chan), app_name);
1174 return NULL;
1175 }
1176 owc_id = ast_sorcery_object_get_id(owc);
1177 invalid_fields = ari_conf_owc_get_invalid_fields(owc_id);
1178
1179 if (invalid_fields) {
1180 ast_log(LOG_WARNING, "%s: Unable to create per-call websocket. Outbound websocket config is invalid\n",
1181 owc_id);
1182 return NULL;
1183 }
1184
1185 res = ast_asprintf(&session_id, "%s:%s", owc_id, ast_channel_name(chan));
1186 if (res < 0) {
1187 return NULL;
1188 }
1189 res = ast_asprintf(&app_id, "%s:%s", app_name, ast_channel_name(chan));
1190 if (res < 0) {
1191 ast_free(app_id);
1192 return NULL;
1193 }
1194
1195 session = session_create(NULL, app_id, owc->subscribe_all,
1196 session_id, owc, AST_WS_TYPE_CLIENT_PER_CALL);
1197 if (!session) {
1198 ast_log(LOG_WARNING, "%s: Failed to create websocket session\n", session_id);
1199 return NULL;
1200 }
1201
1202 session->channel_id = ast_strdup(ast_channel_uniqueid(chan));
1203 session->channel_name = ast_strdup(ast_channel_name(chan));
1204
1205 /*
1206 * We have to bump the session reference count here because
1207 * we need to check that the session is connected before we return.
1208 * If it didn't connect, then the thread will have cleaned up the
1209 * session while we're in the loop checking for the connection
1210 * which will result in a SEGV or FRACK.
1211 * RAII will clean up this bump.
1212 */
1214 ast_debug(2, "%s: Starting thread RC: %d\n", session->session_id,
1215 (int)ao2_ref(session, 0));
1216
1220 ast_log(LOG_WARNING, "%s: Failed to create thread.\n", session->session_id);
1221 return NULL;
1222 }
1223
1224 /*
1225 * We need to make sure the session connected and is processing
1226 * requests before we return but we don't want to block forever
1227 * in case the thread never starts or gets cancelled so we have
1228 * a fail-safe timeout.
1229 */
1230 tv_start = ast_tvnow();
1231 while (session->thread > 0 && !session->connected) {
1232 struct timeval tv_now = ast_tvnow();
1233 if (ast_tvdiff_ms(tv_now, tv_start) > PER_CALL_FAIL_SAFE_TIMEOUT(owc)) {
1234 break;
1235 }
1236 /* Sleep for 500ms before checking again. */
1237 usleep(500 * 1000);
1238 }
1239
1240 if (session->thread <= 0 || !session->connected) {
1241 ast_log(LOG_WARNING, "%s: Failed to create per call websocket thread\n",
1242 session_id);
1243 return NULL;
1244 }
1245
1246 ast_debug(3, "%s: Created per call websocket for app '%s'\n",
1247 session_id, app_id);
1248
1249 /*
1250 * We now need to prevent RAII from freeing the app_id.
1251 */
1252 app_id_rtn = app_id;
1253 app_id = NULL;
1254 return app_id_rtn;
1255}
1256
1257#define STASIS_END_MAX_WAIT_MS 5000
1258#define STASIS_END_POST_WAIT_US (3000 * 1000)
1259
1260/*
1261 * This thread is used to close the websocket after the StasisEnd
1262 * event has been sent and control has been returned to the dialplan.
1263 * We wait a few seconds to allow additional events to be sent
1264 * like ChannelVarset and ChannelDestroyed.
1265 */
1266static void *outbound_session_pc_close_thread(void *data)
1267{
1268 /*
1269 * We're using RAII because we want to show a debug message
1270 * after we run ast_websocket_close().
1271 */
1272 RAII_VAR(struct ari_ws_session *, session, data, session_unref);
1273
1274 /*
1275 * We're going to wait 3 seconds to allow stasis to send additional
1276 * events like ChannelVarset and ChannelDestroyed after the StasisEnd.
1277 */
1278 ast_debug(3, "%s: Waiting for %dms before closing websocket RC: %d\n",
1279 session->session_id, (int)(STASIS_END_POST_WAIT_US / 1000),
1280 (int)ao2_ref(session, 0));
1282 session->closing = 1;
1283 if (session->ast_ws_session) {
1284 ast_websocket_close(session->ast_ws_session, 1000);
1285 }
1286 ast_debug(3, "%s: Websocket closed RC: %d\n", session->session_id,
1287 (int)ao2_ref(session, 0));
1288 return NULL;
1289}
1290
1291/*!
1292 * \brief This function is called by the app_stasis dialplan app
1293 * to close a per-call websocket after stasis_app_exec() returns.
1294 */
1296{
1297 struct ari_ws_session *session = NULL;
1298 pthread_t thread;
1299 struct timeval tv_start;
1300
1302 if (!session) {
1303 ast_debug(3, "%s: Per call websocket not found\n", app_name);
1305 return;
1306 }
1308
1309 /*
1310 * When stasis_app_exec() returns, the StasisEnd event for the
1311 * channel has been queued but since actually sending it is done
1312 * in a separate thread, it probably won't have been sent yet.
1313 * We need to wait for it to go out on the wire before we close the
1314 * websocket. ari_websocket_send_event will set a flag on the session
1315 * when a StasisEnd event is sent for the channel that originally
1316 * triggered the connection. We'll wait for that but we don't want
1317 * to wait forever so there's a fail-safe timeout in case a thread
1318 * got cancelled or we missed the StasisEnd event somehow.
1319 */
1320 ast_debug(3, "%s: Waiting for StasisEnd event to be sent RC: %d\n",
1321 session->session_id, (int)ao2_ref(session, 0));
1322
1323 tv_start = ast_tvnow();
1324 while (session->thread > 0 && !session->stasis_end_sent) {
1325 struct timeval tv_now = ast_tvnow();
1326 int64_t diff = ast_tvdiff_ms(tv_now, tv_start);
1327 ast_debug(3, "%s: Waiting for StasisEnd event %lu %d %ld\n",
1328 session->session_id, (unsigned long)session->thread,
1329 session->stasis_end_sent, diff);
1330 if (diff > STASIS_END_MAX_WAIT_MS) {
1331 break;
1332 }
1333 /* Sleep for 500ms before checking again. */
1334 usleep(500 * 1000);
1335 }
1336 ast_debug(3, "%s: StasisEnd event sent. Scheduling websocket close. RC: %d\n",
1337 session->session_id, (int)ao2_ref(session, 0));
1338
1339 /*
1340 * We can continue to send events like ChannelVarset and ChannelDestroyed
1341 * to the websocket after the StasisEnd event but those events won't be
1342 * generated until after the Stasis() dialplan app returns. We don't want
1343 * to hold up the dialplan while we wait so we'll create a thread that waits
1344 * a few seconds more before closing the websocket.
1345 *
1346 * We transferring ownership of the session to the thread.
1347 */
1350 ast_log(LOG_WARNING, "%s: Failed to create websocket close thread\n",
1351 session->session_id);
1353 }
1354 ast_debug(3, "%s: Scheduled websocket close RC: %d\n",
1355 session->session_id, (int)ao2_ref(session, 0));
1356
1357 return;
1358}
1359
1361{
1362 return ao2_bump(session_registry);
1363}
1364
1365static int outbound_session_create(void *obj, void *args, int flags)
1366{
1367 struct ari_conf_outbound_websocket *owc = obj;
1368 const char *owc_id = ast_sorcery_object_get_id(owc);
1369 struct ari_ws_session *session = NULL;
1370 enum session_apply_result apply_result;
1371 enum ari_conf_owc_fields invalid_fields = ari_conf_owc_get_invalid_fields(owc_id);
1372
1374 if (session) {
1375 ast_debug(2, "%s: Found existing connection\n", owc_id);
1376 if (invalid_fields) {
1379 "%s: Unable to update websocket session. Outbound websocket config is invalid\n",
1380 owc_id);
1381 return 0;
1382 }
1383
1385 apply_result = outbound_session_apply_config(session, owc);
1388 if (apply_result == SESSION_APPLY_FAILED) {
1390 "%s: Failed to apply new configuration. Existing connection preserved.\n",
1391 owc_id);
1392 }
1393 return 0;
1394 }
1395
1396 if (invalid_fields) {
1398 "%s: Unable to create websocket session. Outbound websocket config is invalid\n",
1399 owc_id);
1400 return 0;
1401 }
1402
1405 if (!session) {
1406 ast_log(LOG_WARNING, "%s: Failed to create websocket session\n", owc_id);
1407 return 0;
1408 }
1409
1411 /* There's no thread to transfer the reference to */
1413 return 0;
1414 }
1415
1416 ast_debug(2, "%s: Starting thread RC: %d\n", session->session_id,
1417 (int)ao2_ref(session, 0));
1418 /* We're transferring the session reference to the thread. */
1422 ast_log(LOG_WARNING, "%s: Failed to create thread.\n", session->session_id);
1423 return 0;
1424 }
1425 ast_debug(2, "%s: launched thread\n", session->session_id);
1426
1427 return 0;
1428}
1429
1430static void outbound_sessions_load(const char *name)
1431{
1433 struct ao2_iterator i;
1434 struct ari_ws_session *session;
1435
1436 ast_debug(2, "Reloading ARI websockets\n");
1437
1439
1441 while ((session = ao2_iterator_next(&i))) {
1442 int cleanup = 1;
1443 if (session->owc
1444 && (session->type &
1446 struct ari_conf_outbound_websocket *ows =
1447 ari_conf_get_owc(session->session_id);
1448 if (!ows) {
1449 ast_debug(3, "Cleaning up outbound websocket %s\n",
1450 session->session_id);
1451 session->closing = 1;
1453 if (session->ast_ws_session) {
1454 ast_websocket_close(session->ast_ws_session, 1000);
1455 }
1456
1458 /*
1459 * If persistent, session_cleanup will cleanup
1460 * this reference so we don't want to double clean it up.
1461 * session_cleanup doesn't cleanup the reference
1462 * for per-call configs so we need to do that ourselves.
1463 */
1464 cleanup = 0;
1465 }
1466 }
1467 ao2_cleanup(ows);
1468 }
1469 /* We don't want to double cleanup if its been closed. */
1470 if (cleanup) {
1472 }
1473 }
1475
1476 return;
1477}
1478
1480{
1481 if (owc) {
1482 return outbound_session_create(owc, NULL, 0);
1483 }
1484 return -1;
1485}
1486
1488{
1489 if (session) {
1491 }
1492}
1493
1495{
1496 if (session_registry) {
1499 }
1500}
1501
1502static void session_registry_dtor(void)
1503{
1504 if (session_registry) {
1509 }
1510}
1511
1514};
1515
1517{
1518 ari_sorcery_observer_remove("outbound_websocket", &observer_callbacks);
1522 return 0;
1523}
1524
1527
1529{
1530 int res = 0;
1531 struct ast_websocket_protocol *protocol;
1532
1533 ast_debug(2, "Initializing ARI websockets. Enabled: %s\n", is_enabled ? "yes" : "no");
1534
1537 ari_ws_session_sort_fn, ari_ws_session_cmp_fn);
1538 if (!session_registry) {
1540 "Failed to allocate the local registry for websocket applications\n");
1542 }
1543
1544 res = ari_sorcery_observer_add("outbound_websocket", &observer_callbacks);
1545 if (res < 0) {
1546 ast_log(LOG_WARNING, "Failed to register ARI websocket observer\n");
1549 }
1550
1551 /*
1552 * The global "enabled" flag only controls whether the REST and
1553 * inbound websockets are enabled. The outbound websocket
1554 * configs are always enabled.
1555 if (!is_enabled) {
1556 return AST_MODULE_LOAD_SUCCESS;
1557 }
1558 */
1559
1561 if (!ast_ws_server) {
1564 }
1565
1566 protocol = ast_websocket_sub_protocol_alloc("ari");
1567 if (!protocol) {
1570 }
1574
1576}
1577
static const char app[]
Definition: app_adsiprog.c:56
const char * str
Definition: app_jack.c:150
pthread_t thread
Definition: app_sla.c:335
ast_mutex_t lock
Definition: app_sla.c:337
Asterisk RESTful API hooks.
enum ast_json_encoding_format ast_ari_json_format(void)
Configured encoding format for JSON output.
Definition: res_ari.c:908
int ast_ari_validate_message(struct ast_json *json)
Validator for Message.
Generated file - Build validators for ARI model objects.
int(* ari_validator)(struct ast_json *json)
Function type for validator functions. Allows for.
int ari_websocket_process_request(struct ari_ws_session *ari_ws_session, const char *remote_addr, struct ast_variable *upgrade_headers, const char *app_name, struct ast_json *request_msg)
static struct ast_sorcery_observer observer_callbacks
static struct ari_ws_session * session_find_by_app(const char *app_name, unsigned int ws_type)
int ari_outbound_websocket_start(struct ari_conf_outbound_websocket *owc)
static int null_validator(struct ast_json *json)
Validator that always succeeds.
static void session_reset(struct ari_ws_session *session)
static void session_send_or_queue(struct ari_ws_session *session, struct ast_json *message, const char *msg_type, const char *app_name, int debug_app)
static struct ari_ws_session * session_create(struct ast_tcptls_session_instance *ser, const char *apps, int subscribe_all, const char *session_id, struct ari_conf_outbound_websocket *ows, enum ast_websocket_type ws_type)
static void session_dtor(void *obj)
static void stasis_app_message_handler(void *data, const char *app_name, struct ast_json *message)
#define STASIS_END_POST_WAIT_US
static void * outbound_session_handler_thread(void *obj)
struct ari_ws_session * ari_websocket_get_session(const char *session_id)
static int websocket_attempted_cb(struct ast_tcptls_session_instance *ser, struct ast_variable *get_params, struct ast_variable *headers, const char *session_id)
static void outbound_sessions_load(const char *name)
static int outbound_session_create(void *obj, void *args, int flags)
void ari_websocket_shutdown_all(void)
session_apply_result
@ SESSION_APPLY_FAILED
@ SESSION_APPLY_RECONNECT_REQUIRED
@ SESSION_APPLY_OK
@ SESSION_APPLY_NO_CHANGE
#define MESSAGES_INIT_SIZE
static struct ao2_container * session_registry
Local registry for created ari_ws_session objects.
void ari_websocket_send_event(struct ari_ws_session *session, const char *app_name, struct ast_json *message, int debug_app)
Callback handler for Stasis application messages.
static void session_unregister_apps(struct ari_ws_session *session)
int ari_websocket_load_module(int is_enabled)
static void session_unref(struct ari_ws_session *session)
#define STASIS_END_MAX_WAIT_MS
static void * outbound_session_pc_close_thread(void *data)
static void session_cleanup(struct ari_ws_session *session)
void ast_ari_close_per_call_websocket(char *app_name)
This function is called by the app_stasis dialplan app to close a per-call websocket after stasis_app...
#define APPS_INIT_SIZE
char * ast_ari_create_per_call_websocket(const char *app_name, struct ast_channel *chan)
This function gets called by app_stasis when a call arrives but a Stasis application isn't already re...
#define PER_CALL_FAIL_SAFE_TIMEOUT(owc)
static void session_unregister_app_cb(char *app_name, struct ari_ws_session *session)
static void session_send_app_event(struct ari_ws_session *session, const char *event_type, const char *app_name)
static int session_write(struct ari_ws_session *session, struct ast_json *message)
ari_validator ari_validate_message_fn
static void websocket_established_cb(struct ast_websocket *ast_ws_session, struct ast_variable *get_params, struct ast_variable *upgrade_headers)
void ari_websocket_shutdown(struct ari_ws_session *session)
#define ARI_CONTEXT_REGISTRAR
static int session_shutdown_cb(void *obj, void *arg, int flags)
struct ast_websocket_server * ast_ws_server
static int session_register_apps(struct ari_ws_session *session, const char *_apps, int subscribe_all)
static enum session_apply_result outbound_session_apply_config(struct ari_ws_session *session, struct ari_conf_outbound_websocket *new_owc)
int ari_websocket_unload_module(void)
#define handle_create_error(ser, code, msg, reason)
void ari_handle_websocket(struct ast_tcptls_session_instance *ser, const char *uri, enum ast_http_method method, struct ast_variable *get_params, struct ast_variable *headers)
Wrapper for invoking the websocket code for an incoming connection.
#define VALIDATION_FAILED
struct ao2_container * ari_websocket_get_sessions(void)
static void session_registry_dtor(void)
static struct ast_json * session_read(struct ari_ws_session *session)
static int session_update(struct ari_ws_session *ari_ws_session, struct ast_websocket *ast_ws_session, int send_registered_events)
Internal API's for websockets.
#define STASIS_CONTEXT_PREFIX
#define ARI_MAX_APP_NAME_LEN
const char * ari_websocket_type_to_str(enum ast_websocket_type type)
Asterisk main include file. File version handling, generic pbx functions.
static struct ast_mansession session
#define ast_free(a)
Definition: astmm.h:180
#define ast_strdup(str)
A wrapper for strdup()
Definition: astmm.h:241
#define ast_strdupa(s)
duplicate a string in memory from the stack
Definition: astmm.h:298
void ast_free_ptr(void *ptr)
free() wrapper
Definition: astmm.c:1739
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:267
#define ast_log
Definition: astobj2.c:42
#define ao2_iterator_next(iter)
Definition: astobj2.h:1911
#define ao2_link(container, obj)
Add an object to a container.
Definition: astobj2.h:1532
@ AO2_ALLOC_OPT_LOCK_MUTEX
Definition: astobj2.h:363
#define ao2_callback(c, flags, cb_fn, arg)
ao2_callback() is a generic function that applies cb_fn() to all objects in a container,...
Definition: astobj2.h:1693
#define AO2_STRING_FIELD_CMP_FN(stype, field)
Creates a compare function for a structure string field.
Definition: astobj2.h:2048
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
#define ao2_unlink(container, obj)
Remove an object from a container.
Definition: astobj2.h:1578
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1736
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
#define AO2_STRING_FIELD_SORT_FN(stype, field)
Creates a sort function for a structure string field.
Definition: astobj2.h:2064
#define ao2_container_alloc_rbtree(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a red-black tree container.
Definition: astobj2.h:1349
#define ao2_unlock(a)
Definition: astobj2.h:729
#define ao2_replace(dst, src)
Replace one object reference with another cleaning up the original.
Definition: astobj2.h:501
#define ao2_lock(a)
Definition: astobj2.h:717
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition: astobj2.h:480
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
@ OBJ_NODATA
Definition: astobj2.h:1044
@ OBJ_MULTIPLE
Definition: astobj2.h:1049
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1101
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:409
@ AO2_CONTAINER_ALLOC_OPT_DUPS_REPLACE
Replace objects with duplicate keys in container.
Definition: astobj2.h:1211
static PGresult * result
Definition: cel_pgsql.c:84
const char * ast_channel_name(const struct ast_channel *chan)
const char * ast_channel_uniqueid(const struct ast_channel *chan)
#define AST_MAX_CONTEXT
Definition: channel.h:135
void ast_verbose(const char *fmt,...)
Definition: extconf.c:2206
static const char name[]
Definition: format_mp3.c:68
static const char context_name[]
#define TRACE_ATLEAST(level)
#define SCOPE_ENTER(level,...)
#define SCOPE_EXIT(...)
#define SCOPE_EXIT_LOG_RTN(__log_level,...)
#define ast_trace(level,...)
ast_http_method
HTTP Request methods known by Asterisk.
Definition: http.h:58
struct ast_variable * ast_http_create_basic_auth_header(const char *userid, const char *password)
Create an HTTP authorization header.
Definition: http.c:1668
Support for WebSocket connections within the Asterisk HTTP server and client WebSocket connections to...
struct ast_websocket_protocol * ast_websocket_sub_protocol_alloc(const char *name)
Allocate a websocket sub-protocol instance.
int ast_websocket_set_timeout(struct ast_websocket *session, int timeout)
Set the timeout on a non-blocking WebSocket session.
int ast_websocket_close(struct ast_websocket *session, uint16_t reason)
Close a WebSocket session by sending a message with the CLOSE opcode and an optional code.
int ast_websocket_set_nonblock(struct ast_websocket *session)
Set the socket of a WebSocket session to be non-blocking.
const char * ast_websocket_session_id(struct ast_websocket *session)
Get the session ID for a WebSocket session.
int ast_websocket_read(struct ast_websocket *session, char **payload, uint64_t *payload_len, enum ast_websocket_opcode *opcode, int *fragmented)
Read a WebSocket frame and handle it.
struct ast_sockaddr * ast_websocket_remote_address(struct ast_websocket *session)
Get the remote address for a WebSocket connected session.
ast_websocket_result
Result code for a websocket client.
@ WS_OK
ast_websocket_opcode
WebSocket operation codes.
@ AST_WEBSOCKET_OPCODE_CLOSE
@ AST_WEBSOCKET_OPCODE_TEXT
void ast_websocket_unref(struct ast_websocket *session)
Decrease the reference count for a WebSocket session.
int ast_websocket_uri_cb(struct ast_tcptls_session_instance *ser, const struct ast_http_uri *urih, const char *uri, enum ast_http_method method, struct ast_variable *get_vars, struct ast_variable *headers)
Callback suitable for use with a ast_http_uri.
int ast_websocket_server_add_protocol2(struct ast_websocket_server *server, struct ast_websocket_protocol *protocol)
Add a sub-protocol handler to the given server.
int ast_websocket_write_string(struct ast_websocket *ws, const char *buf)
Construct and transmit a WebSocket frame containing string data.
int ast_websocket_fd(struct ast_websocket *session)
Get the file descriptor for a WebSocket session.
ast_websocket_type
WebSocket connection/configuration types.
@ AST_WS_TYPE_INBOUND
@ AST_WS_TYPE_CLIENT_PER_CALL_CONFIG
@ AST_WS_TYPE_CLIENT_PERSISTENT
@ AST_WS_TYPE_CLIENT_PER_CALL
struct ast_websocket_server * ast_websocket_server_create(void)
Creates a ast_websocket_server.
Application convenience functions, designed to give consistent look and feel to Asterisk apps.
const char * ast_variable_find_in_list(const struct ast_variable *list, const char *variable)
Gets the value of a variable from a variable list by name.
Definition: main/config.c:1013
void ast_variables_destroy(struct ast_variable *var)
Free variable list.
Definition: extconf.c:1262
#define DEBUG_ATLEAST(level)
#define ast_debug(level,...)
Log a DEBUG message.
#define LOG_ERROR
#define LOG_NOTICE
#define LOG_WARNING
Internal API's for res_ari.
ari_conf_owc_fields
Definition: internal.h:96
@ ARI_OWC_NEEDS_REREGISTER
Definition: internal.h:106
@ ARI_OWC_FIELD_NONE
Definition: internal.h:97
@ ARI_OWC_FIELD_SUBSCRIBE_ALL
Definition: internal.h:102
@ ARI_OWC_NEEDS_RECONNECT
Definition: internal.h:103
struct ast_json * ast_json_string_create(const char *value)
Construct a JSON string from value.
Definition: json.c:278
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition: json.c:73
void ast_json_free(void *p)
Asterisk's custom JSON allocator. Exposed for use by unit tests.
Definition: json.c:52
struct ast_json * ast_json_pack(char const *format,...)
Helper for creating complex JSON values.
Definition: json.c:612
struct ast_json * ast_json_timeval(const struct timeval tv, const char *zone)
Construct a timeval as JSON.
Definition: json.c:670
@ AST_JSON_PRETTY
Definition: json.h:795
struct ast_json * ast_json_ref(struct ast_json *value)
Increase refcount on value.
Definition: json.c:67
int ast_json_object_set(struct ast_json *object, const char *key, struct ast_json *value)
Set a field in a JSON object.
Definition: json.c:414
struct ast_json * ast_json_load_buf(const char *buffer, size_t buflen, struct ast_json_error *error)
Parse buffer with known length into a JSON object or array.
Definition: json.c:585
const char * ast_json_string_get(const struct ast_json *string)
Get the value of a JSON string.
Definition: json.c:283
struct ast_json * ast_json_object_get(struct ast_json *object, const char *key)
Get a field from a JSON object.
Definition: json.c:407
char * ast_json_dump_string_format(struct ast_json *root, enum ast_json_encoding_format format)
Encode a JSON value to a string.
Definition: json.c:484
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:611
int errno
static int subscribe_all(void)
Definition: manager.c:9588
Asterisk module definitions.
@ AST_MODULE_LOAD_SUCCESS
Definition: module.h:70
@ AST_MODULE_LOAD_DECLINE
Module has failed to load, may be in an inconsistent state.
Definition: module.h:78
static char * ast_sockaddr_stringify(const struct ast_sockaddr *addr)
Wrapper around ast_sockaddr_stringify_fmt() with default format.
Definition: netsock2.h:256
Core PBX routines and definitions.
int ast_add_extension(const char *context, int replace, const char *extension, int priority, const char *label, const char *callerid, const char *application, void *data, void(*datad)(void *), const char *registrar)
Add and extension to an extension context.
Definition: pbx.c:6943
int ast_context_destroy_by_name(const char *context, const char *registrar)
Destroy a context by name.
Definition: pbx.c:8220
struct ast_context * ast_context_find(const char *name)
Find a context.
Definition: extconf.c:4172
struct ast_context * ast_context_find_or_create(struct ast_context **extcontexts, struct ast_hashtab *exttable, const char *name, const char *registrar)
Register a new context or find an existing one.
Definition: pbx.c:6164
const char * app_name(struct ast_app *app)
Definition: pbx_app.c:463
static void * cleanup(void *unused)
Definition: pbx_realtime.c:124
struct ari_conf_outbound_websocket * ari_conf_get_owc_for_app(const char *app_name, unsigned int ws_type)
Get the outbound websocket configuration for a Stasis app.
int ari_sorcery_observer_remove(const char *object_type, const struct ast_sorcery_observer *callbacks)
enum ari_conf_owc_fields ari_conf_owc_get_invalid_fields(const char *id)
struct ao2_container * ari_conf_get_owcs(void)
enum ari_conf_owc_fields ari_conf_owc_detect_changes(struct ari_conf_outbound_websocket *old_owc, struct ari_conf_outbound_websocket *new_owc)
Detect changes between two outbound websocket configurations.
int ari_sorcery_observer_add(const char *object_type, const struct ast_sorcery_observer *callbacks)
struct ari_conf_general * ari_conf_get_general(void)
struct ari_conf_outbound_websocket * ari_conf_get_owc(const char *id)
static int is_enabled(void)
Helper function to check if module is enabled.
Definition: res_ari.c:96
const char * method
Definition: res_pjsip.c:1279
static struct timeval msg_timestamp(void *msg, enum smdi_message_type type)
Definition: res_smdi.c:366
#define NULL
Definition: resample.c:96
Generated file - declares stubs to be implemented in res/ari/resource_events.c.
const char * ast_sorcery_object_get_id(const void *object)
Get the unique identifier of a sorcery object.
Definition: sorcery.c:2317
Stasis Application API. See Stasis Application API for detailed documentation.
void stasis_app_control_mark_failed(struct stasis_app_control *control)
Set the failed flag on a control structure.
Definition: control.c:377
int stasis_app_event_allowed(const char *app_name, struct ast_json *event)
Check if the given event should be filtered.
int stasis_app_is_registered(const char *name)
Check if a Stasis application is registered.
Definition: res_stasis.c:1747
int stasis_app_control_continue(struct stasis_app_control *control, const char *context, const char *extension, int priority)
Exit res_stasis and continue execution in the dialplan.
Definition: control.c:415
int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
Register a new Stasis application.
Definition: res_stasis.c:1838
void stasis_app_unregister(const char *app_name)
Unregister a Stasis application and unsubscribe from all event sources.
Definition: res_stasis.c:1848
struct stasis_app_control * stasis_app_control_find_by_channel_id(const char *channel_id)
Returns the handler for the channel with the given id.
Definition: res_stasis.c:349
int stasis_app_register_all(const char *app_name, stasis_app_cb handler, void *data)
Register a new Stasis application that receives all Asterisk events.
Definition: res_stasis.c:1843
int stasis_app_get_debug_by_name(const char *app_name)
Get debug status of an application.
int ast_strings_equal(const char *str1, const char *str2)
Compare strings for equality checking for NULL.
Definition: strings.c:238
#define S_OR(a, b)
returns the equivalent of logic or for strings: first one if not empty, otherwise second one.
Definition: strings.h:80
int attribute_pure ast_true(const char *val)
Make sure something is true. Determine if a string containing a boolean value is "true"....
Definition: utils.c:2199
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition: strings.h:65
@ AST_STRSEP_STRIP
Definition: strings.h:255
int ast_in_delimited_string(const char *needle, const char *haystack, char delim)
Check if there is an exact match for 'needle' between delimiters in 'haystack'.
Definition: strings.c:466
static int force_inline attribute_pure ast_begins_with(const char *str, const char *prefix)
Checks whether a string begins with another.
Definition: strings.h:97
char * ast_strsep(char **s, const char sep, uint32_t flags)
Act like strsep but ignore separators inside quotes.
Definition: utils.c:1835
Generic container type.
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
Definition: astobj2.h:1821
Registered applications container.
Definition: pbx_app.c:68
Global configuration options for ARI.
Definition: internal.h:57
const ast_string_field apps
Definition: internal.h:116
struct ast_websocket_client * websocket_client
Definition: internal.h:119
struct ast_vector_string websocket_apps
struct ari_conf_outbound_websocket * owc
struct ari_ws_session::@424 message_queue
struct ast_websocket * ast_ws_session
Main Channel structure associated with a channel.
Definition of a URI handler.
Definition: http.h:102
const char * uri
Definition: http.h:105
void * data
Definition: http.h:116
Abstract JSON element (object, array, string, int, ...).
Interface for a sorcery object type observer.
Definition: sorcery.h:332
void(* loaded)(const char *object_type)
Callback for when an object type is loaded/reloaded.
Definition: sorcery.h:343
describes a server instance
Definition: tcptls.h:151
Structure for variables, used for configurations and for channel variables.
struct ast_variable * next
enum ast_websocket_type connection_type
A websocket protocol implementation.
ast_websocket_callback session_established
Callback called when a new session is established. Mandatory.
ast_websocket_pre_callback session_attempted
Callback called when a new session is attempted. Optional.
Structure for a WebSocket server.
Structure definition for session.
const char * args
Time-related functions and macros.
int64_t ast_tvdiff_ms(struct timeval end, struct timeval start)
Computes the difference (in milliseconds) between two struct timeval instances.
Definition: time.h:107
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:159
int error(const char *format,...)
Definition: utils/frame.c:999
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:941
#define ast_assert(a)
Definition: utils.h:739
int ast_wait_for_input(int fd, int ms)
Definition: utils.c:1698
char * ast_eid_to_str(char *s, int maxlen, struct ast_eid *eid)
Convert an EID to a string.
Definition: utils.c:2839
#define ast_pthread_create_detached_background(a, b, c, d)
Definition: utils.h:597
struct ast_eid ast_eid_default
Global EID.
Definition: options.c:94
Universally unique identifier support.
Vector container support.
#define AST_VECTOR_REMOVE_ORDERED(vec, idx)
Remove an element from a vector by index while maintaining order.
Definition: vector.h:459
#define AST_VECTOR_RESET(vec, cleanup)
Reset vector.
Definition: vector.h:636
#define AST_VECTOR_ELEM_CLEANUP_NOOP(elem)
Vector element cleanup that does nothing.
Definition: vector.h:582
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:620
#define AST_VECTOR_REMOVE_CMP_ORDERED(vec, value, cmp, cleanup)
Remove an element from a vector that matches the given comparison while maintaining order.
Definition: vector.h:551
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition: vector.h:185
#define AST_VECTOR_GET_CMP(vec, value, cmp)
Get an element from a vector that matches the given comparison.
Definition: vector.h:742
#define AST_VECTOR_ADD_SORTED(vec, elem, cmp)
Add an element into a sorted vector.
Definition: vector.h:382
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:124
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:267
#define AST_VECTOR_CALLBACK_VOID(vec, callback,...)
Execute a callback on every element in a vector disregarding callback return.
Definition: vector.h:873
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:691
struct ast_websocket * ast_websocket_client_connect(struct ast_websocket_client *wc, void *lock_obj, const char *display_name, enum ast_websocket_result *result)
Connect to a websocket server using the configured authentication, retry and TLS options.