Asterisk - The Open Source Telephony Project GIT-master-6144b6b
Loading...
Searching...
No Matches
ari_websockets.c
Go to the documentation of this file.
1/*
2 * Asterisk -- An open source telephony toolkit.
3 *
4 * Copyright (C) 2013, Digium, Inc.
5 *
6 * David M. Lee, II <dlee@digium.com>
7 *
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
13 *
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
17 */
18
19#include "asterisk.h"
20
21#include "resource_events.h"
22#include "ari_websockets.h"
23#include "internal.h"
25#include "asterisk/app.h"
26#include "asterisk/ari.h"
27#include "asterisk/astobj2.h"
29#include "asterisk/module.h"
30#include "asterisk/pbx.h"
31#include "asterisk/stasis_app.h"
32#include "asterisk/time.h"
33#include "asterisk/uuid.h"
34#include "asterisk/vector.h"
36
37
38/*! \file
39 *
40 * \brief WebSocket support for RESTful API's.
41 * \author David M. Lee, II <dlee@digium.com>
42 */
43
44/*! Number of buckets for the ari_ws_session registry. Remember to keep it a prime number! */
45#define SESSION_REGISTRY_NUM_BUCKETS 23
46
47/*! Initial size of websocket session apps vector */
48#define APPS_INIT_SIZE 7
49
50/*! Initial size of the websocket session message queue. */
51#define MESSAGES_INIT_SIZE 23
52
53#define ARI_CONTEXT_REGISTRAR "res_ari"
54
55/*! \brief Local registry for created \ref ari_ws_session objects. */
57
59
60#if defined(AST_DEVMODE)
62#else
63 /*!
64 * \brief Validator that always succeeds.
65 */
66 static int null_validator(struct ast_json *json)
67 {
68 return 1;
69 }
70
72#endif
73
74
75#define VALIDATION_FAILED \
76 "{" \
77 " \"error\": \"InvalidMessage\"," \
78 " \"message\": \"Message validation failed\"" \
79 "}"
80
82{
83 RAII_VAR(char *, str, NULL, ast_json_free);
84
85 if (!session || !session->ast_ws_session || !message) {
86 return -1;
87 }
88
89#ifdef AST_DEVMODE
90 if (!session->validator(message)) {
91 ast_log(LOG_ERROR, "Outgoing message failed validation\n");
93 }
94#endif
95
97
98 if (str == NULL) {
99 ast_log(LOG_ERROR, "Failed to encode JSON object\n");
100 return -1;
101 }
102
103 if (ast_websocket_write_string(session->ast_ws_session, str)) {
104 ast_log(LOG_NOTICE, "Problem occurred during websocket write to %s, websocket closed\n",
106 return -1;
107 }
108 return 0;
109}
110
112 struct ast_json *message, const char *msg_type, const char *app_name,
113 int debug_app)
114{
115 const char *msg_timestamp, *msg_ast_id;
116
122 "%s: Failed to dispatch '%s' message from Stasis app '%s'; could not update message\n",
123 session->remote_addr, msg_type, app_name);
124 return;
125 }
126 }
127
128 msg_ast_id = S_OR(
129 ast_json_string_get(ast_json_object_get(message, "asterisk_id")), "");
130 if (ast_strlen_zero(msg_ast_id)) {
131 char eid[20];
132
133 if (ast_json_object_set(message, "asterisk_id",
137 "%s: Failed to dispatch '%s' message from Stasis app '%s'; could not update message\n",
138 session->remote_addr, msg_type, app_name);
139 }
140 }
141
142 if (!session->ast_ws_session) {
143 /* If the websocket is NULL, the message goes to the queue */
144 if (AST_VECTOR_APPEND(&session->message_queue, message) == 0) {
146 }
147 /*
148 * If the msg_type one of the Application* types, the websocket
149 * might not be there yet so don't log.
150 */
151 if (!ast_begins_with(msg_type, "Application")) {
153 "%s: Queued '%s' message for Stasis app '%s'; websocket is not ready\n",
154 session->remote_addr,
155 msg_type,
156 app_name);
157 }
158 } else {
159
160 if (DEBUG_ATLEAST(4) || debug_app) {
162
163 ast_verbose("<--- Sending ARI event to %s --->\n%s\n",
164 session->remote_addr,
165 str);
167 }
169 }
170}
171
173 const char *event_type, const char *app_name)
174{
175 char eid[20];
176 int debug_app = stasis_app_get_debug_by_name(app_name);
177 struct ast_json *msg = ast_json_pack("{s:s, s:o?, s:s, s:s }",
178 "type", event_type,
179 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
180 "application", app_name,
181 "asterisk_id", ast_eid_to_str(eid, sizeof(eid), &ast_eid_default));
182
183 if (!msg) {
184 return;
185 }
186 ast_debug(3, "%s: Sending '%s' event to app '%s'\n", session->session_id,
187 event_type, app_name);
188 /*
189 * We don't want to use ari_websocket_send_event() here because
190 * the app may be unregistered which will cause stasis_app_event_allowed
191 * to return false.
192 */
193 session_send_or_queue(session, msg, event_type, app_name, debug_app);
194 ast_json_unref(msg);
195}
196
198{
200
201 if (!session || !session->ast_ws_session) {
202 return NULL;
203 }
204 if (ast_websocket_fd(session->ast_ws_session) < 0) {
205 return NULL;
206 }
207
208 while (!message) {
209 int res;
210 char *payload;
211 uint64_t payload_len;
212 enum ast_websocket_opcode opcode;
213 int fragmented;
214
215 res = ast_wait_for_input(
216 ast_websocket_fd(session->ast_ws_session), -1);
217
218 if (res <= 0) {
219 ast_log(LOG_WARNING, "WebSocket poll error: %s\n",
220 strerror(errno));
221 return NULL;
222 }
223
224 res = ast_websocket_read(session->ast_ws_session, &payload,
225 &payload_len, &opcode, &fragmented);
226
227 if (res != 0) {
228 ast_log(LOG_WARNING, "WebSocket read error: %s\n",
229 strerror(errno));
230 return NULL;
231 }
232
233 switch (opcode) {
235 ast_debug(1, "WebSocket closed\n");
236 return NULL;
238 message = ast_json_load_buf(payload, payload_len, NULL);
239 if (message == NULL) {
240 struct ast_json *error = ast_json_pack(
241 "{s:s, s:s, s:s, s:i, s:s, s:s }",
242 "type", "RESTResponse",
243 "transaction_id", "",
244 "request_id", "",
245 "status_code", 400,
246 "reason_phrase", "Failed to parse request message JSON",
247 "uri", ""
248 );
250 error, 0);
253 "WebSocket input failed to parse\n");
254
255 }
256
257 break;
258 default:
259 /* Ignore all other message types */
260 break;
261 }
262 }
263
264 return ast_json_ref(message);
265}
266
268 struct ast_tcptls_session_instance *ser, const char *uri,
269 enum ast_http_method method, struct ast_variable *get_params,
270 struct ast_variable *headers)
271{
272 struct ast_http_uri fake_urih = {
274 };
275
276 ast_websocket_uri_cb(ser, &fake_urih, uri, method, get_params,
277 headers);
278}
279
280/*!
281 * \brief Callback handler for Stasis application messages.
282 *
283 * \internal
284 *
285 * \param data Void pointer to the event session (\ref event_session).
286 * \param app_name Name of the Stasis application that dispatched the message.
287 * \param message The dispatched message.
288 * \param debug_app Debug flag for the application.
289 */
291 const char *app_name, struct ast_json *message, int debug_app)
292{
293 char *remote_addr = session->ast_ws_session ? ast_sockaddr_stringify(
294 ast_websocket_remote_address(session->ast_ws_session)) : "";
295 const char *msg_type, *msg_application;
296 SCOPE_ENTER(4, "%s: Dispatching message from Stasis app '%s'\n", remote_addr, app_name);
297
299
301
302 msg_type = S_OR(ast_json_string_get(ast_json_object_get(message, "type")), "");
303 msg_application = S_OR(
305
306 /* If we've been replaced, remove the application from our local
307 websocket_apps container */
308 if (session->type == AST_WS_TYPE_INBOUND
309 && strcmp(msg_type, "ApplicationReplaced") == 0 &&
310 strcmp(msg_application, app_name) == 0) {
311 AST_VECTOR_REMOVE_CMP_ORDERED(&session->websocket_apps,
313 }
314
315 /* Now, we need to determine our state to see how we will handle the message */
319 "%s: Failed to dispatch '%s' message from Stasis app '%s'; could not update message\n",
320 remote_addr, msg_type, msg_application);
321 }
322
325 app_name, debug_app);
326 }
327
329 && !ast_strlen_zero(session->channel_id)
330 && ast_strings_equal(msg_type, "StasisEnd")) {
331 struct ast_json *chan = ast_json_object_get(message, "channel");
332 struct ast_json *id_obj = ast_json_object_get(chan, "id");
333 const char *id = ast_json_string_get(id_obj);
334 if (!ast_strlen_zero(id)
335 && ast_strings_equal(id, session->channel_id)) {
336 ast_debug(3, "%s: StasisEnd message sent for channel '%s'\n",
337 remote_addr, id);
338 session->stasis_end_sent = 1;
339 }
340 }
342 SCOPE_EXIT("%s: Dispatched '%s' message from Stasis app '%s'\n",
343 remote_addr, msg_type, app_name);
344}
345
346static void stasis_app_message_handler(void *data, const char *app_name,
347 struct ast_json *message)
348{
349 int debug_app = stasis_app_get_debug_by_name(app_name);
350 struct ari_ws_session *session = data;
351
352 if (!session) {
353 ast_debug(3, "Stasis app '%s' message handler called with NULL session. OK for per_call_config websocket.\n",
354 app_name);
355 return;
356 }
357
359}
360
362{
363 if (!session) {
364 return;
365 }
366 ast_debug(4, "%s: Unreffing ARI websocket session\n", session->session_id);
367 ao2_ref(session, -1);
368}
369
371{
372 ast_debug(3, "%s: Trying to unregister app '%s'\n",
373 session->session_id, app_name);
377 ast_debug(3, "%s: Unregistering context '%s' for app '%s'\n",
378 session->session_id, context_name, app_name);
380 } else {
381 ast_debug(3, "%s: Unregistering stasis app '%s' and unsubscribing from all events.\n",
382 session->session_id, app_name);
384 }
385
386 /*
387 * We don't send ApplicationUnregistered events for outbound per-call
388 * configs because there's no websocket to send them via or to
389 * inbound websockets because the websocket is probably closed already.
390 */
391 if (!(session->type
393 session_send_app_event(session, "ApplicationUnregistered", app_name);
394 }
395}
396
398{
399 int app_count = (int)AST_VECTOR_SIZE(&session->websocket_apps);
400
401 if (app_count == 0) {
402 return;
403 }
404 ast_debug(3, "%s: Unregistering stasis apps.\n", session->session_id);
405
407 session);
408 AST_VECTOR_RESET(&session->websocket_apps, ast_free_ptr);
409
410 return;
411}
412
414 const char *_apps, int subscribe_all)
415{
416 char *apps = ast_strdupa(_apps);
417 char *app_name;
418 int app_counter = 0;
419
420 ast_debug(3, "%s: Registering apps '%s'. Subscribe all: %s\n",
421 session->session_id, apps, subscribe_all ? "yes" : "no");
422
423 while ((app_name = ast_strsep(&apps, ',', AST_STRSEP_STRIP))) {
424
426 ast_log(LOG_WARNING, "%s: Invalid application name\n", session->session_id);
427 return -1;
428 }
429
430 if (strlen(app_name) > ARI_MAX_APP_NAME_LEN) {
431 ast_log(LOG_WARNING, "%s: Websocket app '%s' > %d characters\n",
432 session->session_id, app_name, (int)ARI_MAX_APP_NAME_LEN);
433 return -1;
434 }
435
437 /*
438 * Outbound per-call configs only create a dialplan context.
439 * If they registered stasis apps there'd be no way for the
440 * Stasis dialplan app to know that it needs to start a
441 * per-call websocket connection.
442 */
444
449 ast_log(LOG_WARNING, "%s: Could not create context '%s'\n",
450 session->session_id, context_name);
451 return -1;
452 } else {
454 "Stasis", ast_strdup(app_name), ast_free_ptr,
458 }
459 } else {
460 ast_debug(3, "%s: Context '%s' already exists\n", session->session_id,
462 }
463 } else {
464 int already_registered = stasis_app_is_registered(app_name);
465 int res = 0;
466
467 if (subscribe_all) {
469 session);
470 } else {
472 session);
473 }
474
475 if (res != 0) {
476 return -1;
477 }
478
479 /*
480 * If there was an existing app by the same name, the register handler
481 * will have sent an ApplicationReplaced event. If it's a new app, we
482 * send an ApplicationRegistered event.
483 *
484 * Except... There's no websocket to send it on for outbound per-call
485 * configs and inbound websockets don't need them because they aready
486 * know what apps they've registered for.
487 */
488 if (!already_registered
490 session_send_app_event(session, "ApplicationRegistered",
491 app_name);
492 }
493 }
494
495 if (AST_VECTOR_ADD_SORTED(&session->websocket_apps, ast_strdup(app_name), strcmp)) {
496 ast_log(LOG_WARNING, "%s: Unable to add app '%s' to apps container\n",
497 session->session_id, app_name);
498 return -1;
499 }
500
501 app_counter++;
502 if (app_counter == 1) {
503 ast_free(session->app_name);
504 session->app_name = ast_strdup(app_name);
505 if (!session->app_name) {
506 ast_log(LOG_WARNING, "%s: Unable to duplicate app name\n",
507 session->session_id);
508 return -1;
509 }
510 }
511 }
512
513 return 0;
514}
515
516/*
517 * Websocket session cleanup is a bit complicated because it can be
518 * in different states, it may or may not be in the registry container,
519 * and stasis may be sending asynchronous events to it and some
520 * stages of cleanup need to lock it.
521 *
522 * That's why there are 3 different cleanup functions.
523 */
524
525/*!
526 * \internal
527 * \brief Reset the ari_ws_session without destroying it.
528 * It can't be reused and will be cleaned up by the caller.
529 * This should only be called by session_create()
530 * and session_cleanup().
531 */
533{
535
536 ast_debug(3, "%s: Resetting ARI websocket session\n",
537 session->session_id);
538
539 /* Clean up the websocket_apps container */
540 if (AST_VECTOR_SIZE(&session->websocket_apps) > 0) {
542 }
543 AST_VECTOR_RESET(&session->websocket_apps, ast_free_ptr);
544 AST_VECTOR_FREE(&session->websocket_apps);
545
546 AST_VECTOR_RESET(&session->message_queue, ast_json_unref);
547 AST_VECTOR_FREE(&session->message_queue);
548}
549
550/*!
551 * \internal
552 * \brief RAII_VAR and container ari_ws_session cleanup function.
553 * This unlinks the ari_ws_session from the registry and cleans up the
554 * decrements the reference count.
555 */
556static void session_cleanup(void *obj)
557{
558 struct ari_ws_session *session = obj;
559 enum ast_websocket_type wstype;
560
561 if (!session) {
562 return;
563 }
564
565 /*
566 * We need to save the websocket type because client-per-call websockets
567 * will get destroyed when they're unlinked from the session_registry
568 * so we won't be able to test it afterwards.
569 */
570 wstype = session->type;
571
572 ast_debug(3, "%s: Cleaning up ARI %s websocket session. Refcount: %d\n",
573 session->session_id, ast_websocket_type_to_str(wstype), (int)ao2_ref(session, 0));
574
576
577 if (session_registry) {
578 ast_debug(3, "%s: Unlinking %s websocket session from registry Refcount: %d\n",
579 session->session_id, ast_websocket_type_to_str(wstype), (int)ao2_ref(session, 0));
581 }
582
583 /*
584 * If this is a per-call config then its only reference
585 * was held by the registry container so we don't need
586 * to unref it here.
587 */
590 }
591}
592
593/*!
594 * \internal
595 * \brief The ao2 destructor.
596 * This cleans up the reference to the parent ast_websocket and the
597 * outbound connection websocket if any.
598 */
599static void session_dtor(void *obj)
600{
601 struct ari_ws_session *session = obj;
602
603 ast_debug(3, "%s: Destroying ARI websocket session\n",
604 session->session_id);
605
606 ast_free(session->app_name);
607 ast_free(session->remote_addr);
608 ast_free(session->channel_id);
609 ast_free(session->channel_name);
610 ao2_cleanup(session->owc);
611 session->owc = NULL;
612 if (!session->ast_ws_session) {
613 return;
614 }
615 ast_websocket_unref(session->ast_ws_session);
616 session->ast_ws_session = NULL;
617}
618
619#define handle_create_error(ser, code, msg, reason) \
620({ \
621 if (ser) { \
622 ast_http_error(ser, code, msg, reason); \
623 } \
624 ast_log(LOG_WARNING, "Failed to create ARI websocket session: %d %s %s\n", \
625 code, msg, reason); \
626})
627
629 struct ast_tcptls_session_instance *ser,
630 const char *apps,
631 int subscribe_all,
632 const char *session_id,
633 struct ari_conf_outbound_websocket *ows,
634 enum ast_websocket_type ws_type)
635{
637 size_t size;
638
639 ast_debug(3, "%s: Creating ARI websocket session for apps '%s'\n",
641
642 size = sizeof(*session) + strlen(session_id) + 1;
643
645 if (!session) {
646 return NULL;
647 }
648
649 session->type = ws_type;
650 session->subscribe_all = subscribe_all;
651
652 strcpy(session->session_id, session_id); /* Safe */
653
654 /* Instantiate the hash table for Stasis apps */
655 if (AST_VECTOR_INIT(&session->websocket_apps, APPS_INIT_SIZE)) {
656 handle_create_error(ser, 500, "Internal Server Error",
657 "Allocation failed");
658 return NULL;
659 }
660
661 /* Instantiate the message queue */
662 if (AST_VECTOR_INIT(&session->message_queue, MESSAGES_INIT_SIZE)) {
663 handle_create_error(ser, 500, "Internal Server Error",
664 "Allocation failed");
665 AST_VECTOR_FREE(&session->websocket_apps);
666 return NULL;
667 }
668
669 session->validator = ari_validate_message_fn;
670
671 if (ows) {
672 session->owc = ao2_bump(ows);
673 }
674
676 handle_create_error(ser, 500, "Internal Server Error",
677 "Stasis app registration failed");
679 return NULL;
680 }
681
683 handle_create_error(ser, 500, "Internal Server Error",
684 "Allocation failed");
686 return NULL;
687 }
688
689 return ao2_bump(session);
690}
691
692/*!
693 * \internal
694 * \brief Updates the websocket session.
695 *
696 * \details If the value of the \c ws_session is not \c NULL and there are messages in the
697 * event session's \c message_queue, the messages are dispatched and removed from
698 * the queue.
699 *
700 * \param ari_ws_session The ARI websocket session
701 * \param ast_ws_session The Asterisk websocket session
702 */
704 struct ast_websocket *ast_ws_session, int send_registered_events)
705{
707 int i;
708
709 if (ast_ws_session == NULL) {
710 return -1;
711 }
712
713 if (!general) {
714 return -1;
715 }
716
720 ast_log(LOG_ERROR, "Failed to copy remote address\n");
721 return -1;
722 }
723
726 "ARI web socket failed to set nonblock; closing: %s\n",
727 strerror(errno));
728 return -1;
729 }
730
731 if (ast_websocket_set_timeout(ast_ws_session, general->write_timeout)) {
732 ast_log(LOG_WARNING, "Failed to set write timeout %d on ARI web socket\n",
733 general->write_timeout);
734 }
735
739 for (i = 0; i < AST_VECTOR_SIZE(&ari_ws_session->message_queue); i++) {
742 ast_json_unref(msg);
743 }
744
747
748 if (send_registered_events) {
749 int i;
750 char *app;
751
752 for (i = 0; i < AST_VECTOR_SIZE(&ari_ws_session->websocket_apps); i++) {
755 "ApplicationRegistered", app);
756 }
757 }
758
759 return 0;
760}
761
762/*!
763 * \internal
764 * \brief This function gets called for incoming websocket connections
765 * before the upgrade process is completed.
766 *
767 * The point is to be able to report early errors via HTTP rather
768 * than letting res_http_websocket create an ast_websocket session
769 * then immediately close it if there's an error.
770 */
772 struct ast_variable *get_params, struct ast_variable *headers,
773 const char *session_id)
774{
775 const char *subscribe_all = NULL;
776 const char *apps = NULL;
777 struct ari_ws_session *session = NULL;
778
779 apps = ast_variable_find_in_list(get_params, "app");
780 if (ast_strlen_zero(apps)) {
781 handle_create_error(ser, 400, "Bad Request",
782 "HTTP request is missing param: [app]");
783 return -1;
784 }
785
786 subscribe_all = ast_variable_find_in_list(get_params, "subscribeAll");
787
790 if (!session) {
791 handle_create_error(ser, 500, "Server Error",
792 "Failed to create ARI websocket session");
793 return -1;
794 }
795 /* It's in the session registry now so we can release our reference */
797
798 return 0;
799}
800
801/*!
802 * \internal
803 * \brief This function gets called for incoming websocket connections
804 * after the upgrade process is completed.
805 */
807 struct ast_variable *get_params, struct ast_variable *upgrade_headers)
808{
809 /*
810 * ast_ws_session is passed in with it's refcount bumped so
811 * we need to unref it when we're done. The refcount will
812 * be bumped again when we add it to the ari_ws_session.
813 */
816 struct ast_json *msg;
817 struct ast_variable *v;
818 char *remote_addr = ast_sockaddr_stringify(
819 ast_websocket_remote_address(ast_ws_session));
820 const char *session_id = ast_websocket_session_id(ast_ws_session);
821
822 SCOPE_ENTER(2, "%s: WebSocket established\n", remote_addr);
823
824 if (TRACE_ATLEAST(2)) {
825 ast_trace(2, "%s: Websocket Upgrade Headers:\n", remote_addr);
826 for (v = upgrade_headers; v; v = v->next) {
827 ast_trace(3, "--> %s: %s\n", v->name, v->value);
828 }
829 }
830
831 /*
832 * Find the ari_ws_session that was created by websocket_attempted_cb
833 * and update its ast_websocket.
834 */
836 if (!ari_ws_session) {
838 "%s: Failed to locate an event session for the websocket session %s\n",
839 remote_addr, session_id);
840 }
841
842 /*
843 * Since this is a new inbound websocket session,
844 * session_register_apps() will have already sent "ApplicationRegistered"
845 * events for the apps. We don't want to do it again.
846 */
847 session_update(ari_ws_session, ast_ws_session, 0);
848
850 ast_trace(-1, "%s: Waiting for messages\n", remote_addr);
851 while ((msg = session_read(ari_ws_session))) {
853 upgrade_headers, ari_ws_session->app_name, msg);
854 ast_json_unref(msg);
855 }
858
859 SCOPE_EXIT("%s: Websocket closed\n", remote_addr);
860}
861
862static int session_shutdown_cb(void *obj, void *arg, int flags)
863{
864 struct ari_ws_session *session = obj;
865
866 /* Per-call configs have no actual websocket */
868 ast_log(LOG_NOTICE, "%s: Shutting down %s ARI websocket session\n",
869 session->session_id,
872 return 0;
873 }
874 if (session->type == AST_WS_TYPE_INBOUND) {
875 ast_log(LOG_NOTICE, "%s: Shutting down inbound ARI websocket session from %s\n",
876 session->session_id, session->remote_addr);
877 } else {
878 ast_log(LOG_NOTICE, "%s: Shutting down %s ARI websocket session to %s\n",
879 session->session_id,
881 session->remote_addr);
882 }
883
884 /*
885 * We need to ensure the session is kept around after the cleanup
886 * so we can close the websocket.
887 */
889 session->closing = 1;
891 if (session->ast_ws_session) {
892 ast_debug(3, "%s: Closing websocket\n", session->session_id);
893 ast_websocket_close(session->ast_ws_session, 1000);
894 } else if (session->thread) {
895 ast_debug(3, "%s: Cancelling handler thread\n", session->session_id);
896 pthread_cancel(session->thread);
897 }
898
899 return 0;
900}
901
902
907
908static struct ari_ws_session *session_find_by_app(const char *app_name,
909 unsigned int ws_type)
910{
911 struct ari_ws_session *session = NULL;
912 struct ao2_iterator i;
913
915 return NULL;
916 }
917
919 while ((session = ao2_iterator_next(&i))) {
920 char *app = NULL;
921 if (!(session->type & ws_type)) {
923 continue;
924 }
925
926 app = AST_VECTOR_GET_CMP(&session->websocket_apps,
928 if (app) {
929 break;
930 }
932 }
934 return session;
935}
936
937/*!
938 * \internal
939 * \brief Connection and request handler thread for outbound websockets.
940 *
941 * This thread handles the connection and reconnection logic for outbound
942 * websockets. Once connected, it waits for incoming REST over Websocket
943 * requests and dispatches them to ari_websocket_process_request()).
944 */
945static void *outbound_session_handler_thread(void *obj)
946{
947 struct ari_ws_session *session = obj;
948 int already_sent_registers = 1;
949
950 /*
951 * We use pthread_cleanup_push because RAII destructors don't run
952 * if we cancel the thread.
953 */
954 pthread_cleanup_push(session_cleanup, obj);
955
956 ast_debug(3, "%s: Starting outbound websocket thread RC: %d\n",
957 session->session_id, (int)ao2_ref(session, 0));
958 session->thread = pthread_self();
959 session->connected = 0;
960
961 while(1) {
963 RAII_VAR(struct ast_variable *, upgrade_headers, NULL, ast_variables_destroy);
965 struct ast_json *msg;
966
967 ast_debug(3, "%s: Attempting to connect to %s\n", session->session_id,
968 session->owc->websocket_client->uri);
969
970 astws = ast_websocket_client_connect(session->owc->websocket_client,
971 NULL, session->session_id, &result);
972 if (!astws || result != WS_OK) {
974 struct stasis_app_control *control =
976 if (control) {
977 ast_debug(3, "%s: Connection failed. Returning to dialplan.\n",
978 session->session_id);
981 ao2_cleanup(control);
982 } else {
983 ast_debug(3, "%s: Connection failed. No control object found.\n",
984 session->session_id);
985 }
986
987 break;
988 }
989
990 if (session->closing) {
991 ast_debug(3, "%s: Websocket closing RC: %d\n",
992 session->session_id, (int)ao2_ref(session, 0));
993 break;
994 }
995
996 usleep(session->owc->websocket_client->reconnect_interval * 1000);
997 continue;
998 }
999 ast_log(LOG_NOTICE, "%s: Outbound websocket connected to %s\n",
1000 session->type == AST_WS_TYPE_CLIENT_PERSISTENT ? session->session_id : session->channel_name,
1001 session->owc->websocket_client->uri);
1002
1003 /*
1004 * We only want to send "ApplicationRegistered" events in the
1005 * case of a reconnect. The initial connection will have already sent
1006 * the events when outbound_register_apps() was called.
1007 *
1008 * Note: session_update() bumps astws.
1009 */
1010 session_update(session, astws, !already_sent_registers);
1011 already_sent_registers = 0;
1012
1013 /*
1014 * This is the Authorization header that would normally be taken
1015 * from the incoming HTTP request that is being upgraded to a websocket.
1016 * Since this is an outbound websocket, we have to create it ourselves.
1017 *
1018 * This is NOT the same as the Authorization header that is used for
1019 * authentication with the remote websocket server.
1020 */
1021 upgrade_headers = ast_http_create_basic_auth_header(
1022 session->owc->local_ari_user, session->owc->local_ari_password);
1023 if (!upgrade_headers) {
1024 ast_log(LOG_WARNING, "%s: Failed to create upgrade header\n", session->session_id);
1025 session->thread = 0;
1026 session->connected = 0;
1027 ast_websocket_close(astws, 1000);
1028 /* Clean up the reference held by session_update() */
1029 ast_websocket_unref(astws);
1030 session->ast_ws_session = NULL;
1031 break;
1032 }
1033
1034 session->connected = 1;
1035 ast_debug(3, "%s: Websocket connected\n", session->session_id);
1036 ast_debug(3, "%s: Waiting for messages RC: %d\n",
1037 session->session_id, (int)ao2_ref(session, 0));
1038
1039 /*
1040 * The websocket is connected. Now we need to wait for messages
1041 * from the server.
1042 */
1043 while ((msg = session_read(session))) {
1045 upgrade_headers, session->app_name, msg);
1046 ast_json_unref(msg);
1047 }
1048
1049 session->connected = 0;
1051 /* Clean up the reference held by session_update() */
1052 ast_websocket_unref(session->ast_ws_session);
1053 session->ast_ws_session = NULL;
1054 if (session->closing) {
1055 ast_debug(3, "%s: Websocket closing RC: %d\n",
1056 session->session_id, (int)ao2_ref(session, 0));
1057 break;
1058 }
1059
1060 ast_log(LOG_WARNING, "%s: Websocket disconnected. Reconnecting\n",
1061 session->session_id);
1062 }
1063
1064 ast_debug(3, "%s: Stopping outbound websocket thread RC: %d\n",
1065 session->session_id, (int)ao2_ref(session, 0));
1066 session->thread = 0;
1067
1068 pthread_cleanup_pop(1);
1069
1070 return NULL;
1071}
1072
1079
1081 struct ari_ws_session *session,
1082 struct ari_conf_outbound_websocket *new_owc)
1083{
1084 enum session_apply_result apply_result;
1085 enum ari_conf_owc_fields what_changed;
1086 const char *new_owc_id = ast_sorcery_object_get_id(new_owc);
1087
1088 what_changed = ari_conf_owc_detect_changes(session->owc, new_owc);
1089
1090 if (what_changed == ARI_OWC_FIELD_NONE) {
1091 ast_debug(2, "%s: No changes detected\n", new_owc_id);
1093 }
1094 ast_debug(2, "%s: Config change detected. Checking details\n", new_owc_id);
1095
1096 if (what_changed & ARI_OWC_NEEDS_REREGISTER) {
1097 ast_debug(2, "%s: Re-registering apps\n", new_owc_id);
1098
1099 if (!(what_changed & ARI_OWC_FIELD_SUBSCRIBE_ALL)) {
1100 /*
1101 * If subscribe_all didn't change, we don't have to
1102 * unregister apps that are already registered and
1103 * also in the new config. We'll remove them from
1104 * the session->websocket_apps container so that
1105 * session_unregister_apps will only clean up
1106 * the ones that are going away. session_register_apps
1107 * will add them back in again and cause ApplicationReplaced
1108 * messages to be sent.
1109 *
1110 * If subscribe_all did change, we have no choice but to
1111 * unregister all apps and register all the ones in
1112 * the new config even if they already existed.
1113 */
1114 int i = 0;
1115 char *app;
1116
1117 while(i < (int) AST_VECTOR_SIZE(&session->websocket_apps)) {
1118 app = AST_VECTOR_GET(&session->websocket_apps, i);
1119 if (ast_in_delimited_string(app, new_owc->apps, ',')) {
1120 AST_VECTOR_REMOVE_ORDERED(&session->websocket_apps, i);
1121 ast_debug(3, "%s: Unlinked app '%s' to keep it from being unregistered\n",
1122 new_owc_id, app);
1123 ast_free(app);
1124 } else {
1125 i++;
1126 }
1127 }
1128 }
1129
1131
1132 /*
1133 * Register the new apps. This will also replace any
1134 * existing apps that are in the new config sending
1135 * ApplicationRegistered or ApplicationReplaced events
1136 * as necessary.
1137 */
1138 if (session_register_apps(session, new_owc->apps,
1139 new_owc->subscribe_all) < 0) {
1140 ast_log(LOG_WARNING, "%s: Failed to register apps '%s'\n",
1141 new_owc_id, new_owc->apps);
1142 /* Roll back. */
1144 /* Re-register the original apps. */
1145 if (session_register_apps(session, session->owc->apps,
1146 session->owc->subscribe_all) < 0) {
1147 ast_log(LOG_WARNING, "%s: Failed to re-register apps '%s'\n",
1148 new_owc_id, session->owc->apps);
1149 }
1150 return SESSION_APPLY_FAILED;
1151 }
1152 }
1153 /*
1154 * We need to update the session with the new config
1155 * but it has to be done after re-registering apps and
1156 * before we reconnect.
1157 */
1158 ao2_replace(session->owc, new_owc);
1159 session->type = new_owc->websocket_client->connection_type;
1160 session->subscribe_all = new_owc->subscribe_all;
1161
1162 apply_result = SESSION_APPLY_OK;
1163
1164 if (what_changed & ARI_OWC_NEEDS_RECONNECT) {
1165 ast_debug(2, "%s: Reconnect required\n", new_owc_id);
1166 apply_result = SESSION_APPLY_RECONNECT_REQUIRED;
1167 if (session->ast_ws_session) {
1168 ast_debug(2, "%s: Closing websocket\n", new_owc_id);
1169 ast_websocket_close(session->ast_ws_session, 1000);
1170 }
1171 }
1172
1173 return apply_result;
1174}
1175
1176/*
1177 * This is the fail-safe timeout for the per-call websocket
1178 * connection. To prevent a cleanup race condition, we wait
1179 * 3 times the timeout the thread will use to connect to the
1180 * websocket server. This way we're sure the thread will be
1181 * done before we do final cleanup. This timeout is only used
1182 * if the thread is cancelled somehow and can't indicate
1183 * whether it actually connected or not.
1184 */
1185#define PER_CALL_FAIL_SAFE_TIMEOUT(owc) \
1186 (int64_t)((owc->websocket_client->connect_timeout + owc->websocket_client->reconnect_interval) \
1187 * (owc->websocket_client->reconnect_attempts + 3))
1188
1189/*!
1190 * \brief This function gets called by app_stasis when a call arrives
1191 * but a Stasis application isn't already registered. We check to see
1192 * if a per-call config exists for the application and if so, we create a
1193 * per-call websocket connection and return a unique app id which app_stasis
1194 * can use to call stasis_app_exec() with.
1195 */
1197 struct ast_channel *chan)
1198{
1201 RAII_VAR(char *, session_id, NULL, ast_free);
1202 RAII_VAR(char *, app_id, NULL, ast_free);
1203 enum ari_conf_owc_fields invalid_fields;
1204 const char *owc_id = NULL;
1205 char *app_id_rtn = NULL;
1206 struct timeval tv_start;
1207 int res = 0;
1208
1210 if (!owc) {
1211 ast_log(LOG_WARNING, "%s: Failed to find outbound websocket per-call config for app '%s'\n",
1212 ast_channel_name(chan), app_name);
1213 return NULL;
1214 }
1215 owc_id = ast_sorcery_object_get_id(owc);
1216 invalid_fields = ari_conf_owc_get_invalid_fields(owc_id);
1217
1218 if (invalid_fields) {
1219 ast_log(LOG_WARNING, "%s: Unable to create per-call websocket. Outbound websocket config is invalid\n",
1220 owc_id);
1221 return NULL;
1222 }
1223
1224 res = ast_asprintf(&session_id, "%s:%s", owc_id, ast_channel_name(chan));
1225 if (res < 0) {
1226 return NULL;
1227 }
1228 res = ast_asprintf(&app_id, "%s:%s", app_name, ast_channel_name(chan));
1229 if (res < 0) {
1230 ast_free(app_id);
1231 return NULL;
1232 }
1233
1234 session = session_create(NULL, app_id, owc->subscribe_all,
1235 session_id, owc, AST_WS_TYPE_CLIENT_PER_CALL);
1236 if (!session) {
1237 ast_log(LOG_WARNING, "%s: Failed to create websocket session\n", session_id);
1238 return NULL;
1239 }
1240
1241 session->channel_id = ast_strdup(ast_channel_uniqueid(chan));
1242 session->channel_name = ast_strdup(ast_channel_name(chan));
1243
1244 /*
1245 * We have to bump the session reference count here because
1246 * we need to check that the session is connected before we return.
1247 * If it didn't connect, then the thread will have cleaned up the
1248 * session while we're in the loop checking for the connection
1249 * which will result in a SEGV or FRACK.
1250 * RAII will clean up this bump.
1251 */
1253 ast_debug(2, "%s: Starting thread RC: %d\n", session->session_id,
1254 (int)ao2_ref(session, 0));
1255
1259 ast_log(LOG_WARNING, "%s: Failed to create thread.\n", session->session_id);
1260 return NULL;
1261 }
1262
1263 /*
1264 * We need to make sure the session connected and is processing
1265 * requests before we return but we don't want to block forever
1266 * in case the thread never starts or gets cancelled so we have
1267 * a fail-safe timeout.
1268 */
1269 tv_start = ast_tvnow();
1270 while (session->thread > 0 && !session->connected) {
1271 struct timeval tv_now = ast_tvnow();
1272 if (ast_tvdiff_ms(tv_now, tv_start) > PER_CALL_FAIL_SAFE_TIMEOUT(owc)) {
1273 break;
1274 }
1275 /* Sleep for 500ms before checking again. */
1276 usleep(500 * 1000);
1277 }
1278
1279 if (session->thread <= 0 || !session->connected) {
1280 ast_log(LOG_WARNING, "%s: Failed to create per call websocket thread\n",
1281 session_id);
1282 return NULL;
1283 }
1284
1285 ast_debug(3, "%s: Created per call websocket for app '%s'\n",
1286 session_id, app_id);
1287
1288 /*
1289 * We now need to prevent RAII from freeing the app_id.
1290 */
1291 app_id_rtn = app_id;
1292 app_id = NULL;
1293 return app_id_rtn;
1294}
1295
1296#define STASIS_END_MAX_WAIT_MS 5000
1297#define STASIS_END_POST_WAIT_US (3000 * 1000)
1298
1299/*
1300 * This thread is used to close the websocket after the StasisEnd
1301 * event has been sent and control has been returned to the dialplan.
1302 * We wait a few seconds to allow additional events to be sent
1303 * like ChannelVarset and ChannelDestroyed.
1304 */
1305static void *outbound_session_pc_close_thread(void *data)
1306{
1307 /*
1308 * We're using RAII because we want to show a debug message
1309 * after we run ast_websocket_close().
1310 */
1311 RAII_VAR(struct ari_ws_session *, session, data, session_unref);
1312
1313 /*
1314 * We're going to wait 3 seconds to allow stasis to send additional
1315 * events like ChannelVarset and ChannelDestroyed after the StasisEnd.
1316 */
1317 ast_debug(3, "%s: Waiting for %dms before closing websocket RC: %d\n",
1318 session->session_id, (int)(STASIS_END_POST_WAIT_US / 1000),
1319 (int)ao2_ref(session, 0));
1321 session->closing = 1;
1322 if (session->ast_ws_session) {
1323 ast_websocket_close(session->ast_ws_session, 1000);
1324 }
1325 ast_debug(3, "%s: Websocket closed RC: %d\n", session->session_id,
1326 (int)ao2_ref(session, 0));
1327 return NULL;
1328}
1329
1330/*!
1331 * \brief This function is called by the app_stasis dialplan app
1332 * to close a per-call websocket after stasis_app_exec() returns.
1333 */
1335{
1336 struct ari_ws_session *session = NULL;
1337 pthread_t thread;
1338 struct timeval tv_start;
1339
1341 if (!session) {
1342 ast_debug(3, "%s: Per call websocket not found\n", app_name);
1344 return;
1345 }
1347
1348 /*
1349 * When stasis_app_exec() returns, the StasisEnd event for the
1350 * channel has been queued but since actually sending it is done
1351 * in a separate thread, it probably won't have been sent yet.
1352 * We need to wait for it to go out on the wire before we close the
1353 * websocket. ari_websocket_send_event will set a flag on the session
1354 * when a StasisEnd event is sent for the channel that originally
1355 * triggered the connection. We'll wait for that but we don't want
1356 * to wait forever so there's a fail-safe timeout in case a thread
1357 * got cancelled or we missed the StasisEnd event somehow.
1358 */
1359 ast_debug(3, "%s: Waiting for StasisEnd event to be sent RC: %d\n",
1360 session->session_id, (int)ao2_ref(session, 0));
1361
1362 tv_start = ast_tvnow();
1363 while (session->thread > 0 && !session->stasis_end_sent) {
1364 struct timeval tv_now = ast_tvnow();
1365 int64_t diff = ast_tvdiff_ms(tv_now, tv_start);
1366 ast_debug(3, "%s: Waiting for StasisEnd event %lu %d %ld\n",
1367 session->session_id, (unsigned long)session->thread,
1368 session->stasis_end_sent, diff);
1369 if (diff > STASIS_END_MAX_WAIT_MS) {
1370 break;
1371 }
1372 /* Sleep for 500ms before checking again. */
1373 usleep(500 * 1000);
1374 }
1375 ast_debug(3, "%s: StasisEnd event sent. Scheduling websocket close. RC: %d\n",
1376 session->session_id, (int)ao2_ref(session, 0));
1377
1378 /*
1379 * We can continue to send events like ChannelVarset and ChannelDestroyed
1380 * to the websocket after the StasisEnd event but those events won't be
1381 * generated until after the Stasis() dialplan app returns. We don't want
1382 * to hold up the dialplan while we wait so we'll create a thread that waits
1383 * a few seconds more before closing the websocket.
1384 *
1385 * We transferring ownership of the session to the thread.
1386 */
1389 ast_log(LOG_WARNING, "%s: Failed to create websocket close thread\n",
1390 session->session_id);
1392 }
1393 ast_debug(3, "%s: Scheduled websocket close RC: %d\n",
1394 session->session_id, (int)ao2_ref(session, 0));
1395
1396 return;
1397}
1398
1403
1404static int outbound_session_create(void *obj, void *args, int flags)
1405{
1406 struct ari_conf_outbound_websocket *owc = obj;
1407 const char *owc_id = ast_sorcery_object_get_id(owc);
1408 struct ari_ws_session *session = NULL;
1409 enum session_apply_result apply_result;
1410 enum ari_conf_owc_fields invalid_fields = ari_conf_owc_get_invalid_fields(owc_id);
1411
1413 if (session) {
1414 ast_debug(2, "%s: Found existing connection\n", owc_id);
1415 if (invalid_fields) {
1418 "%s: Unable to update websocket session. Outbound websocket config is invalid\n",
1419 owc_id);
1420 return 0;
1421 }
1422
1424 apply_result = outbound_session_apply_config(session, owc);
1427 if (apply_result == SESSION_APPLY_FAILED) {
1429 "%s: Failed to apply new configuration. Existing connection preserved.\n",
1430 owc_id);
1431 }
1432 return 0;
1433 }
1434
1435 if (invalid_fields) {
1437 "%s: Unable to create websocket session. Outbound websocket config is invalid\n",
1438 owc_id);
1439 return 0;
1440 }
1441
1444 if (!session) {
1445 ast_log(LOG_WARNING, "%s: Failed to create websocket session\n", owc_id);
1446 return 0;
1447 }
1448
1450 /* There's no thread to transfer the reference to */
1452 return 0;
1453 }
1454
1455 ast_debug(2, "%s: Starting thread RC: %d\n", session->session_id,
1456 (int)ao2_ref(session, 0));
1457 /* We're transferring the session reference to the thread. */
1461 ast_log(LOG_WARNING, "%s: Failed to create thread.\n", session->session_id);
1462 return 0;
1463 }
1464 ast_debug(2, "%s: launched thread\n", session->session_id);
1465
1466 return 0;
1467}
1468
1469static void outbound_sessions_load(const char *name)
1470{
1472 struct ao2_iterator i;
1473 struct ari_ws_session *session;
1474
1475 ast_debug(2, "Reloading ARI websockets\n");
1476
1478
1480 while ((session = ao2_iterator_next(&i))) {
1481 int cleanup = 1;
1482 if (session->owc
1483 && (session->type &
1485 struct ari_conf_outbound_websocket *ows =
1486 ari_conf_get_owc(session->session_id);
1487 if (!ows) {
1488 ast_debug(3, "Cleaning up outbound websocket %s\n",
1489 session->session_id);
1490 session->closing = 1;
1492 if (session->ast_ws_session) {
1493 ast_debug(3, "%s: Closing websocket\n", session->session_id);
1494 ast_websocket_close(session->ast_ws_session, 1000);
1495 } else if (session->thread) {
1496 ast_debug(3, "%s: Cancelling handler thread\n", session->session_id);
1497 pthread_cancel(session->thread);
1498 }
1499
1501 /*
1502 * If persistent, session_cleanup will cleanup
1503 * this reference so we don't want to double clean it up.
1504 * session_cleanup doesn't cleanup the reference
1505 * for per-call configs so we need to do that ourselves.
1506 */
1507 cleanup = 0;
1508 }
1509 }
1510 ao2_cleanup(ows);
1511 }
1512 /* We don't want to double cleanup if its been closed. */
1513 if (cleanup) {
1515 }
1516 }
1518
1519 return;
1520}
1521
1523{
1524 if (owc) {
1525 return outbound_session_create(owc, NULL, 0);
1526 }
1527 return -1;
1528}
1529
1531{
1532 if (session) {
1534 }
1535}
1536
1544
1554
1558
1560{
1561 ari_sorcery_observer_remove("outbound_websocket", &observer_callbacks);
1565 return 0;
1566}
1567
1570
1572{
1573 int res = 0;
1574 struct ast_websocket_protocol *protocol;
1575
1576 ast_debug(2, "Initializing ARI websockets. Enabled: %s\n", is_enabled ? "yes" : "no");
1577
1580 ari_ws_session_sort_fn, ari_ws_session_cmp_fn);
1581 if (!session_registry) {
1583 "Failed to allocate the local registry for websocket applications\n");
1585 }
1586
1587 res = ari_sorcery_observer_add("outbound_websocket", &observer_callbacks);
1588 if (res < 0) {
1589 ast_log(LOG_WARNING, "Failed to register ARI websocket observer\n");
1592 }
1593
1594 /*
1595 * The global "enabled" flag only controls whether the REST and
1596 * inbound websockets are enabled. The outbound websocket
1597 * configs are always enabled.
1598 if (!is_enabled) {
1599 return AST_MODULE_LOAD_SUCCESS;
1600 }
1601 */
1602
1604 if (!ast_ws_server) {
1607 }
1608
1609 protocol = ast_websocket_sub_protocol_alloc("ari");
1610 if (!protocol) {
1613 }
1617
1619}
1620
static const char app[]
const char * str
Definition app_jack.c:150
pthread_t thread
Definition app_sla.c:335
ast_mutex_t lock
Definition app_sla.c:337
Asterisk RESTful API hooks.
enum ast_json_encoding_format ast_ari_json_format(void)
Configured encoding format for JSON output.
Definition res_ari.c:913
int ast_ari_validate_message(struct ast_json *json)
Validator for Message.
Generated file - Build validators for ARI model objects.
int(* ari_validator)(struct ast_json *json)
Function type for validator functions. Allows for.
int ari_websocket_process_request(struct ari_ws_session *ari_ws_session, const char *remote_addr, struct ast_variable *upgrade_headers, const char *app_name, struct ast_json *request_msg)
static struct ast_sorcery_observer observer_callbacks
static struct ari_ws_session * session_find_by_app(const char *app_name, unsigned int ws_type)
int ari_outbound_websocket_start(struct ari_conf_outbound_websocket *owc)
static int null_validator(struct ast_json *json)
Validator that always succeeds.
static void session_reset(struct ari_ws_session *session)
static void session_send_or_queue(struct ari_ws_session *session, struct ast_json *message, const char *msg_type, const char *app_name, int debug_app)
static struct ari_ws_session * session_create(struct ast_tcptls_session_instance *ser, const char *apps, int subscribe_all, const char *session_id, struct ari_conf_outbound_websocket *ows, enum ast_websocket_type ws_type)
static void session_dtor(void *obj)
static void stasis_app_message_handler(void *data, const char *app_name, struct ast_json *message)
#define STASIS_END_POST_WAIT_US
static void * outbound_session_handler_thread(void *obj)
struct ari_ws_session * ari_websocket_get_session(const char *session_id)
static int websocket_attempted_cb(struct ast_tcptls_session_instance *ser, struct ast_variable *get_params, struct ast_variable *headers, const char *session_id)
static void outbound_sessions_load(const char *name)
static int outbound_session_create(void *obj, void *args, int flags)
void ari_websocket_shutdown_all(void)
session_apply_result
@ SESSION_APPLY_FAILED
@ SESSION_APPLY_RECONNECT_REQUIRED
@ SESSION_APPLY_OK
@ SESSION_APPLY_NO_CHANGE
#define MESSAGES_INIT_SIZE
static struct ao2_container * session_registry
Local registry for created ari_ws_session objects.
void ari_websocket_send_event(struct ari_ws_session *session, const char *app_name, struct ast_json *message, int debug_app)
Callback handler for Stasis application messages.
static void session_unregister_apps(struct ari_ws_session *session)
int ari_websocket_load_module(int is_enabled)
static void session_unref(struct ari_ws_session *session)
#define STASIS_END_MAX_WAIT_MS
static void * outbound_session_pc_close_thread(void *data)
void ast_ari_close_per_call_websocket(char *app_name)
This function is called by the app_stasis dialplan app to close a per-call websocket after stasis_app...
static void session_cleanup(void *obj)
#define APPS_INIT_SIZE
char * ast_ari_create_per_call_websocket(const char *app_name, struct ast_channel *chan)
This function gets called by app_stasis when a call arrives but a Stasis application isn't already re...
#define PER_CALL_FAIL_SAFE_TIMEOUT(owc)
static void session_unregister_app_cb(char *app_name, struct ari_ws_session *session)
static void session_send_app_event(struct ari_ws_session *session, const char *event_type, const char *app_name)
static int session_write(struct ari_ws_session *session, struct ast_json *message)
ari_validator ari_validate_message_fn
static void websocket_established_cb(struct ast_websocket *ast_ws_session, struct ast_variable *get_params, struct ast_variable *upgrade_headers)
void ari_websocket_shutdown(struct ari_ws_session *session)
#define ARI_CONTEXT_REGISTRAR
static int session_shutdown_cb(void *obj, void *arg, int flags)
struct ast_websocket_server * ast_ws_server
static int session_register_apps(struct ari_ws_session *session, const char *_apps, int subscribe_all)
static enum session_apply_result outbound_session_apply_config(struct ari_ws_session *session, struct ari_conf_outbound_websocket *new_owc)
int ari_websocket_unload_module(void)
#define handle_create_error(ser, code, msg, reason)
void ari_handle_websocket(struct ast_tcptls_session_instance *ser, const char *uri, enum ast_http_method method, struct ast_variable *get_params, struct ast_variable *headers)
Wrapper for invoking the websocket code for an incoming connection.
#define VALIDATION_FAILED
struct ao2_container * ari_websocket_get_sessions(void)
static void session_registry_dtor(void)
static struct ast_json * session_read(struct ari_ws_session *session)
static int session_update(struct ari_ws_session *ari_ws_session, struct ast_websocket *ast_ws_session, int send_registered_events)
Internal API's for websockets.
#define STASIS_CONTEXT_PREFIX
#define ARI_MAX_APP_NAME_LEN
const char * ari_websocket_type_to_str(enum ast_websocket_type type)
Asterisk main include file. File version handling, generic pbx functions.
static struct ast_mansession session
#define ast_free(a)
Definition astmm.h:180
#define ast_strdup(str)
A wrapper for strdup()
Definition astmm.h:241
#define ast_strdupa(s)
duplicate a string in memory from the stack
Definition astmm.h:298
void ast_free_ptr(void *ptr)
free() wrapper
Definition astmm.c:1739
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition astmm.h:267
#define ast_log
Definition astobj2.c:42
#define ao2_iterator_next(iter)
Definition astobj2.h:1911
#define ao2_link(container, obj)
Add an object to a container.
Definition astobj2.h:1532
@ AO2_ALLOC_OPT_LOCK_MUTEX
Definition astobj2.h:363
#define ao2_callback(c, flags, cb_fn, arg)
ao2_callback() is a generic function that applies cb_fn() to all objects in a container,...
Definition astobj2.h:1693
#define AO2_STRING_FIELD_CMP_FN(stype, field)
Creates a compare function for a structure string field.
Definition astobj2.h:2048
#define ao2_cleanup(obj)
Definition astobj2.h:1934
#define ao2_unlink(container, obj)
Remove an object from a container.
Definition astobj2.h:1578
#define ao2_find(container, arg, flags)
Definition astobj2.h:1736
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
#define AO2_STRING_FIELD_SORT_FN(stype, field)
Creates a sort function for a structure string field.
Definition astobj2.h:2064
#define ao2_container_alloc_rbtree(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a red-black tree container.
Definition astobj2.h:1349
#define ao2_unlock(a)
Definition astobj2.h:729
#define ao2_replace(dst, src)
Replace one object reference with another cleaning up the original.
Definition astobj2.h:501
#define ao2_lock(a)
Definition astobj2.h:717
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition astobj2.h:459
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition astobj2.h:480
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
@ OBJ_NODATA
Definition astobj2.h:1044
@ OBJ_MULTIPLE
Definition astobj2.h:1049
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
Definition astobj2.h:1101
#define ao2_alloc(data_size, destructor_fn)
Definition astobj2.h:409
@ AO2_CONTAINER_ALLOC_OPT_DUPS_REPLACE
Replace objects with duplicate keys in container.
Definition astobj2.h:1211
static PGresult * result
Definition cel_pgsql.c:84
const char * ast_channel_name(const struct ast_channel *chan)
const char * ast_channel_uniqueid(const struct ast_channel *chan)
#define AST_MAX_CONTEXT
Definition channel.h:135
static const char name[]
Definition format_mp3.c:68
static const char context_name[]
#define TRACE_ATLEAST(level)
#define SCOPE_ENTER(level,...)
#define SCOPE_EXIT(...)
#define SCOPE_EXIT_LOG_RTN(__log_level,...)
#define ast_trace(level,...)
ast_http_method
HTTP Request methods known by Asterisk.
Definition http.h:58
struct ast_variable * ast_http_create_basic_auth_header(const char *userid, const char *password)
Create an HTTP authorization header.
Definition http.c:1726
Support for WebSocket connections within the Asterisk HTTP server and client WebSocket connections to...
void AST_OPTIONAL_API_NAME() ast_websocket_ref(struct ast_websocket *session)
Increase the reference count for a WebSocket session.
const char *AST_OPTIONAL_API_NAME() ast_websocket_session_id(struct ast_websocket *session)
Get the session ID for a WebSocket session.
@ AST_WEBSOCKET_STATUS_GOING_AWAY
int AST_OPTIONAL_API_NAME() ast_websocket_server_add_protocol2(struct ast_websocket_server *server, struct ast_websocket_protocol *protocol)
Add a sub-protocol handler to the given server.
int AST_OPTIONAL_API_NAME() ast_websocket_write_string(struct ast_websocket *ws, const char *buf)
Construct and transmit a WebSocket frame containing string data.
struct ast_sockaddr *AST_OPTIONAL_API_NAME() ast_websocket_remote_address(struct ast_websocket *session)
Get the remote address for a WebSocket connected session.
int AST_OPTIONAL_API_NAME() ast_websocket_uri_cb(struct ast_tcptls_session_instance *ser, const struct ast_http_uri *urih, const char *uri, enum ast_http_method method, struct ast_variable *get_vars, struct ast_variable *headers)
Callback suitable for use with a ast_http_uri.
ast_websocket_result
Result code for a websocket client.
@ WS_OK
const char * ast_websocket_type_to_str(enum ast_websocket_type type)
int AST_OPTIONAL_API_NAME() ast_websocket_fd(struct ast_websocket *session)
Get the file descriptor for a WebSocket session.
ast_websocket_opcode
WebSocket operation codes.
@ AST_WEBSOCKET_OPCODE_CLOSE
@ AST_WEBSOCKET_OPCODE_TEXT
int AST_OPTIONAL_API_NAME() ast_websocket_set_nonblock(struct ast_websocket *session)
Set the socket of a WebSocket session to be non-blocking.
ast_websocket_type
WebSocket connection/configuration types.
@ AST_WS_TYPE_INBOUND
@ AST_WS_TYPE_CLIENT_PER_CALL_CONFIG
@ AST_WS_TYPE_CLIENT_PERSISTENT
@ AST_WS_TYPE_CLIENT_PER_CALL
struct ast_websocket_protocol *AST_OPTIONAL_API_NAME() ast_websocket_sub_protocol_alloc(const char *name)
Allocate a websocket sub-protocol instance.
int AST_OPTIONAL_API_NAME() ast_websocket_set_timeout(struct ast_websocket *session, int timeout)
Set the timeout on a non-blocking WebSocket session.
int AST_OPTIONAL_API_NAME() ast_websocket_read(struct ast_websocket *session, char **payload, uint64_t *payload_len, enum ast_websocket_opcode *opcode, int *fragmented)
Read a WebSocket frame and handle it.
int AST_OPTIONAL_API_NAME() ast_websocket_close(struct ast_websocket *session, uint16_t reason)
Close a WebSocket session by sending a message with the CLOSE opcode and an optional code.
struct ast_websocket_server *AST_OPTIONAL_API_NAME() ast_websocket_server_create(void)
Creates a ast_websocket_server.
void AST_OPTIONAL_API_NAME() ast_websocket_unref(struct ast_websocket *session)
Decrease the reference count for a WebSocket session.
Application convenience functions, designed to give consistent look and feel to Asterisk apps.
const char * ast_variable_find_in_list(const struct ast_variable *list, const char *variable)
Gets the value of a variable from a variable list by name.
void ast_variables_destroy(struct ast_variable *var)
Free variable list.
Definition extconf.c:1260
#define DEBUG_ATLEAST(level)
#define ast_debug(level,...)
Log a DEBUG message.
#define LOG_ERROR
#define LOG_NOTICE
#define LOG_WARNING
#define ast_verbose(...)
Internal API's for res_ari.
ari_conf_owc_fields
Definition internal.h:103
@ ARI_OWC_NEEDS_REREGISTER
Definition internal.h:113
@ ARI_OWC_FIELD_NONE
Definition internal.h:104
@ ARI_OWC_FIELD_SUBSCRIBE_ALL
Definition internal.h:109
@ ARI_OWC_NEEDS_RECONNECT
Definition internal.h:110
struct ast_json * ast_json_string_create(const char *value)
Construct a JSON string from value.
Definition json.c:278
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition json.c:73
void ast_json_free(void *p)
Asterisk's custom JSON allocator. Exposed for use by unit tests.
Definition json.c:52
struct ast_json * ast_json_pack(char const *format,...)
Helper for creating complex JSON values.
Definition json.c:612
struct ast_json * ast_json_timeval(const struct timeval tv, const char *zone)
Construct a timeval as JSON.
Definition json.c:670
@ AST_JSON_PRETTY
Definition json.h:795
struct ast_json * ast_json_ref(struct ast_json *value)
Increase refcount on value.
Definition json.c:67
int ast_json_object_set(struct ast_json *object, const char *key, struct ast_json *value)
Set a field in a JSON object.
Definition json.c:414
struct ast_json * ast_json_load_buf(const char *buffer, size_t buflen, struct ast_json_error *error)
Parse buffer with known length into a JSON object or array.
Definition json.c:585
const char * ast_json_string_get(const struct ast_json *string)
Get the value of a JSON string.
Definition json.c:283
struct ast_json * ast_json_object_get(struct ast_json *object, const char *key)
Get a field from a JSON object.
Definition json.c:407
char * ast_json_dump_string_format(struct ast_json *root, enum ast_json_encoding_format format)
Encode a JSON value to a string.
Definition json.c:484
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition lock.h:611
int errno
static int subscribe_all(void)
Definition manager.c:9641
Asterisk module definitions.
@ AST_MODULE_LOAD_SUCCESS
Definition module.h:70
@ AST_MODULE_LOAD_DECLINE
Module has failed to load, may be in an inconsistent state.
Definition module.h:78
static char * ast_sockaddr_stringify(const struct ast_sockaddr *addr)
Wrapper around ast_sockaddr_stringify_fmt() with default format.
Definition netsock2.h:256
Core PBX routines and definitions.
int ast_add_extension(const char *context, int replace, const char *extension, int priority, const char *label, const char *callerid, const char *application, void *data, void(*datad)(void *), const char *registrar)
Add and extension to an extension context.
Definition pbx.c:5206
int ast_context_destroy_by_name(const char *context, const char *registrar)
Destroy a context by name.
Definition pbx.c:6491
struct ast_context * ast_context_find(const char *name)
Find a context.
Definition extconf.c:4170
struct ast_context * ast_context_find_or_create(struct ast_context **extcontexts, struct ast_hashtab *exttable, const char *name, const char *registrar)
Register a new context or find an existing one.
Definition pbx.c:4542
const char * app_name(struct ast_app *app)
Definition pbx_app.c:475
struct ari_conf_outbound_websocket * ari_conf_get_owc_for_app(const char *app_name, unsigned int ws_type)
Get the outbound websocket configuration for a Stasis app.
int ari_sorcery_observer_remove(const char *object_type, const struct ast_sorcery_observer *callbacks)
enum ari_conf_owc_fields ari_conf_owc_get_invalid_fields(const char *id)
struct ao2_container * ari_conf_get_owcs(void)
enum ari_conf_owc_fields ari_conf_owc_detect_changes(struct ari_conf_outbound_websocket *old_owc, struct ari_conf_outbound_websocket *new_owc)
Detect changes between two outbound websocket configurations.
int ari_sorcery_observer_add(const char *object_type, const struct ast_sorcery_observer *callbacks)
struct ari_conf_general * ari_conf_get_general(void)
struct ari_conf_outbound_websocket * ari_conf_get_owc(const char *id)
static int is_enabled(void)
Helper function to check if module is enabled.
Definition res_ari.c:96
const char * method
Definition res_pjsip.c:1273
static struct timeval msg_timestamp(void *msg, enum smdi_message_type type)
Definition res_smdi.c:366
static void cleanup(void)
Clean up any old apps that we don't need any more.
Definition res_stasis.c:327
static struct @522 args
#define NULL
Definition resample.c:96
Generated file - declares stubs to be implemented in res/ari/resource_events.c.
const char * ast_sorcery_object_get_id(const void *object)
Get the unique identifier of a sorcery object.
Definition sorcery.c:2381
Stasis Application API. See Stasis Application API for detailed documentation.
void stasis_app_control_mark_failed(struct stasis_app_control *control)
Set the failed flag on a control structure.
Definition control.c:377
int stasis_app_event_allowed(const char *app_name, struct ast_json *event)
Check if the given event should be filtered.
int stasis_app_is_registered(const char *name)
Check if a Stasis application is registered.
int stasis_app_control_continue(struct stasis_app_control *control, const char *context, const char *extension, int priority)
Exit res_stasis and continue execution in the dialplan.
Definition control.c:415
int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
Register a new Stasis application.
void stasis_app_unregister(const char *app_name)
Unregister a Stasis application and unsubscribe from all event sources.
struct stasis_app_control * stasis_app_control_find_by_channel_id(const char *channel_id)
Returns the handler for the channel with the given id.
Definition res_stasis.c:349
int stasis_app_register_all(const char *app_name, stasis_app_cb handler, void *data)
Register a new Stasis application that receives all Asterisk events.
int stasis_app_get_debug_by_name(const char *app_name)
Get debug status of an application.
int ast_strings_equal(const char *str1, const char *str2)
Compare strings for equality checking for NULL.
Definition strings.c:238
#define S_OR(a, b)
returns the equivalent of logic or for strings: first one if not empty, otherwise second one.
Definition strings.h:80
int attribute_pure ast_true(const char *val)
Make sure something is true. Determine if a string containing a boolean value is "true"....
Definition utils.c:2233
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition strings.h:65
@ AST_STRSEP_STRIP
Definition strings.h:255
int ast_in_delimited_string(const char *needle, const char *haystack, char delim)
Check if there is an exact match for 'needle' between delimiters in 'haystack'.
Definition strings.c:466
static int force_inline attribute_pure ast_begins_with(const char *str, const char *prefix)
Checks whether a string begins with another.
Definition strings.h:97
char * ast_strsep(char **s, const char sep, uint32_t flags)
Act like strsep but ignore separators inside quotes.
Definition utils.c:1869
Generic container type.
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
Definition astobj2.h:1821
Registered applications container.
Definition pbx_app.c:69
Global configuration options for ARI.
Definition internal.h:58
const ast_string_field apps
Definition internal.h:123
struct ast_websocket_client * websocket_client
Definition internal.h:126
struct ari_ws_session::@453 message_queue
struct ast_vector_string websocket_apps
struct ari_conf_outbound_websocket * owc
struct ast_websocket * ast_ws_session
Main Channel structure associated with a channel.
Definition of a URI handler.
Definition http.h:102
const char * uri
Definition http.h:105
void * data
Definition http.h:116
Abstract JSON element (object, array, string, int, ...).
Interface for a sorcery object type observer.
Definition sorcery.h:332
void(* loaded)(const char *object_type)
Callback for when an object type is loaded/reloaded.
Definition sorcery.h:343
describes a server instance
Definition tcptls.h:151
Structure for variables, used for configurations and for channel variables.
struct ast_variable * next
enum ast_websocket_type connection_type
A websocket protocol implementation.
ast_websocket_callback session_established
Callback called when a new session is established. Mandatory.
ast_websocket_pre_callback session_attempted
Callback called when a new session is attempted. Optional.
Structure for a WebSocket server.
Structure definition for session.
Time-related functions and macros.
int64_t ast_tvdiff_ms(struct timeval end, struct timeval start)
Computes the difference (in milliseconds) between two struct timeval instances.
Definition time.h:107
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition time.h:159
int error(const char *format,...)
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition utils.h:981
#define ast_assert(a)
Definition utils.h:779
int ast_wait_for_input(int fd, int ms)
Definition utils.c:1732
char * ast_eid_to_str(char *s, int maxlen, struct ast_eid *eid)
Convert an EID to a string.
Definition utils.c:2873
#define ast_pthread_create_detached_background(a, b, c, d)
Definition utils.h:637
struct ast_eid ast_eid_default
Global EID.
Definition options.c:94
Universally unique identifier support.
Vector container support.
#define AST_VECTOR_REMOVE_ORDERED(vec, idx)
Remove an element from a vector by index while maintaining order.
Definition vector.h:459
#define AST_VECTOR_RESET(vec, cleanup)
Reset vector.
Definition vector.h:636
#define AST_VECTOR_ELEM_CLEANUP_NOOP(elem)
Vector element cleanup that does nothing.
Definition vector.h:582
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition vector.h:620
#define AST_VECTOR_REMOVE_CMP_ORDERED(vec, value, cmp, cleanup)
Remove an element from a vector that matches the given comparison while maintaining order.
Definition vector.h:551
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition vector.h:185
#define AST_VECTOR_GET_CMP(vec, value, cmp)
Get an element from a vector that matches the given comparison.
Definition vector.h:742
#define AST_VECTOR_ADD_SORTED(vec, elem, cmp)
Add an element into a sorted vector.
Definition vector.h:382
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition vector.h:124
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition vector.h:267
#define AST_VECTOR_CALLBACK_VOID(vec, callback,...)
Execute a callback on every element in a vector disregarding callback return.
Definition vector.h:873
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition vector.h:691
struct ast_websocket * ast_websocket_client_connect(struct ast_websocket_client *wc, void *lock_obj, const char *display_name, enum ast_websocket_result *result)
Connect to a websocket server using the configured authentication, retry and TLS options.