Asterisk - The Open Source Telephony Project GIT-master-27fb039
Loading...
Searching...
No Matches
res/stasis/app.c
Go to the documentation of this file.
1/*
2 * Asterisk -- An open source telephony toolkit.
3 *
4 * Copyright (C) 2013, Digium, Inc.
5 *
6 * David M. Lee, II <dlee@digium.com>
7 *
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
13 *
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
17 */
18
19/*! \file
20 *
21 * \brief Stasis application support.
22 *
23 * \author David M. Lee, II <dlee@digium.com>
24 */
25
26#include "asterisk.h"
27
28#include "app.h"
29#include "control.h"
30#include "messaging.h"
31
32#include "asterisk/callerid.h"
33#include "asterisk/cli.h"
34#include "asterisk/stasis_app.h"
39
40#define BRIDGE_ALL "__AST_BRIDGE_ALL_TOPIC"
41#define CHANNEL_ALL "__AST_CHANNEL_ALL_TOPIC"
42#define ENDPOINT_ALL "__AST_ENDPOINT_ALL_TOPIC"
43
44/*! Global debug flag. No need for locking */
46
47static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate);
48
49struct stasis_app {
50 /*! Aggregation topic for this application. */
52 /*! Router for handling messages forwarded to \a topic. */
54 /*! Router for handling messages to the bridge all \a topic. */
56 /*! Optional router for handling endpoint messages in 'all' subscriptions */
58 /*! Container of the channel forwards to this app's topic. */
60 /*! Callback function for this application. */
62 /*! Opaque data to hand to callback function. */
63 void *data;
64 /*! Subscription model for the application */
66 /*! Whether or not someone wants to see debug messages about this app */
67 int debug;
68 /*! An array of allowed events types for this application */
70 /*! An array of disallowed events types for this application */
72 /*! Name of the Stasis application */
73 char name[];
74};
75
81
82/*! Subscription info for a particular channel/bridge. */
84 /*! Count of number of times this channel/bridge has been subscribed */
86
87 /*! Forward for the regular topic */
89 /*! Forward for the caching topic */
91
92 /* Type of object being forwarded */
94 /*! Unique id of the object being forwarded */
95 char id[];
96};
97
98static void forwards_dtor(void *obj)
99{
100#ifdef AST_DEVMODE
101 struct app_forwards *forwards = obj;
102#endif /* AST_DEVMODE */
103
104 ast_assert(forwards->topic_forward == NULL);
106}
107
108static void forwards_unsubscribe(struct app_forwards *forwards)
109{
111 forwards->topic_forward = NULL;
113 forwards->topic_cached_forward = NULL;
114}
115
117 const char *id)
118{
119 struct app_forwards *forwards;
120
121 if (!app || ast_strlen_zero(id)) {
122 return NULL;
123 }
124
125 forwards = ao2_t_alloc(sizeof(*forwards) + strlen(id) + 1, forwards_dtor, id);
126 if (!forwards) {
127 return NULL;
128 }
129
130 strcpy(forwards->id, id); /* SAFE */
131
132 return forwards;
133}
134
135/*! Forward a channel's topics to an app */
137 struct ast_channel *chan)
138{
139 struct app_forwards *forwards;
140
141 if (!app) {
142 return NULL;
143 }
144
145 forwards = forwards_create(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL);
146 if (!forwards) {
147 return NULL;
148 }
149
150 forwards->forward_type = FORWARD_CHANNEL;
153 app->topic);
154
155 if (!forwards->topic_forward) {
156 ao2_ref(forwards, -1);
157 return NULL;
158 }
159
160 return forwards;
161}
162
163/*! Forward a bridge's topics to an app */
165 struct ast_bridge *bridge)
166{
167 struct app_forwards *forwards;
168
169 if (!app) {
170 return NULL;
171 }
172
173 forwards = forwards_create(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
174 if (!forwards) {
175 return NULL;
176 }
177
178 forwards->forward_type = FORWARD_BRIDGE;
179 forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge), app->topic);
180
181 if (!forwards->topic_forward && bridge) {
182 forwards_unsubscribe(forwards);
183 ao2_ref(forwards, -1);
184 return NULL;
185 }
186
187 return forwards;
188}
189
190static void endpoint_state_cb(void *data, struct stasis_subscription *sub,
191 struct stasis_message *message)
192{
193 struct stasis_app *app = data;
194
195 stasis_publish(app->topic, message);
196}
197
198/*! Forward a endpoint's topics to an app */
200 struct ast_endpoint *endpoint)
201{
202 struct app_forwards *forwards;
203 int ret = 0;
204
205 if (!app) {
206 return NULL;
207 }
208
209 forwards = forwards_create(app, endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL);
210 if (!forwards) {
211 return NULL;
212 }
213
214 forwards->forward_type = FORWARD_ENDPOINT;
215 if (endpoint) {
217 app->topic);
219 ast_endpoint_topic_cached(endpoint), app->topic);
220
221 if (!forwards->topic_forward || !forwards->topic_cached_forward) {
222 /* Half-subscribed is a bad thing */
223 forwards_unsubscribe(forwards);
224 ao2_ref(forwards, -1);
225 return NULL;
226 }
227 } else {
228 /* Since endpoint subscriptions also subscribe to channels, in the case
229 * of all endpoint subscriptions, we only want messages for the endpoints.
230 * As such, we route those particular messages and then re-publish them
231 * on the app's topic.
232 */
233 ast_assert(app->endpoint_router == NULL);
235 if (!app->endpoint_router) {
236 forwards_unsubscribe(forwards);
237 ao2_ref(forwards, -1);
238 return NULL;
239 }
240
241 ret |= stasis_message_router_add(app->endpoint_router,
243 ret |= stasis_message_router_add(app->endpoint_router,
245
246 if (ret) {
247 ao2_ref(app->endpoint_router, -1);
248 app->endpoint_router = NULL;
249 ao2_ref(forwards, -1);
250 return NULL;
251 }
252 }
253
254 return forwards;
255}
256
257static int forwards_sort(const void *obj_left, const void *obj_right, int flags)
258{
259 const struct app_forwards *object_left = obj_left;
260 const struct app_forwards *object_right = obj_right;
261 const char *right_key = obj_right;
262 int cmp;
263
264 switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) {
265 case OBJ_POINTER:
266 right_key = object_right->id;
267 /* Fall through */
268 case OBJ_KEY:
269 cmp = strcmp(object_left->id, right_key);
270 break;
271 case OBJ_PARTIAL_KEY:
272 /*
273 * We could also use a partial key struct containing a length
274 * so strlen() does not get called for every comparison instead.
275 */
276 cmp = strncmp(object_left->id, right_key, strlen(right_key));
277 break;
278 default:
279 /* Sort can only work on something with a full or partial key. */
280 ast_assert(0);
281 cmp = 0;
282 break;
283 }
284 return cmp;
285}
286
287static void app_dtor(void *obj)
288{
289 struct stasis_app *app = obj;
290 size_t size = strlen("stasis-") + strlen(app->name) + 1;
291 char context_name[size];
292
293 ast_verb(1, "Destroying Stasis app %s\n", app->name);
294
295 ast_assert(app->router == NULL);
296 ast_assert(app->bridge_router == NULL);
297 ast_assert(app->endpoint_router == NULL);
298
299 /* If we created a context for this application, remove it */
300 strcpy(context_name, "stasis-");
301 strcat(context_name, app->name);
303
304 ao2_cleanup(app->topic);
305 app->topic = NULL;
306 ao2_cleanup(app->forwards);
307 app->forwards = NULL;
308 ao2_cleanup(app->data);
309 app->data = NULL;
310
311 ast_json_unref(app->events_allowed);
312 app->events_allowed = NULL;
313 ast_json_unref(app->events_disallowed);
314 app->events_disallowed = NULL;
315
316}
317
319{
321 struct ast_channel_snapshot *snapshot = ast_multi_channel_blob_get_channel(payload, "forwarded");
322 struct ast_channel *chan;
323
324 if (!snapshot) {
325 return;
326 }
327
329 if (!chan) {
330 return;
331 }
332
334 ast_channel_unref(chan);
335}
336
338 struct stasis_message *message)
339{
340 struct stasis_app *app = data;
341
344 }
345}
346
348 struct stasis_message *message)
349{
350 struct stasis_app *app = data;
351 struct ast_json *json;
352
353 /* The dial type can be converted to JSON so it will always be passed
354 * here.
355 */
358 }
359
360 /* By default, send any message that has a JSON representation */
362 if (!json) {
363 return;
364 }
365
366 app_send(app, json);
367 ast_json_unref(json);
368}
369
370/*! \brief Typedef for callbacks that get called on channel snapshot updates */
371typedef struct ast_json *(*channel_snapshot_monitor)(
372 struct ast_channel_snapshot *old_snapshot,
373 struct ast_channel_snapshot *new_snapshot,
374 const struct timeval *tv);
375
377 const char *type,
378 struct ast_channel_snapshot *snapshot,
379 const struct timeval *tv)
380{
381 struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
382
383 if (!json_channel) {
384 return NULL;
385 }
386
387 return ast_json_pack("{s: s, s: o, s: o}",
388 "type", type,
389 "timestamp", ast_json_timeval(*tv, NULL),
390 "channel", json_channel);
391}
392
394 struct ast_channel_snapshot *snapshot,
395 const struct timeval *tv)
396{
397 return simple_channel_event("ChannelCreated", snapshot, tv);
398}
399
401 struct ast_channel_snapshot *snapshot,
402 const struct timeval *tv)
403{
404 struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
405 struct ast_json *blob;
406
407 if (!json_channel) {
408 return NULL;
409 }
410
411 blob = ast_json_pack("{s: s, s: o, s: i, s: s}",
412 "type", "ChannelDestroyed",
413 "timestamp", ast_json_timeval(*tv, NULL),
414 "cause", snapshot->hangup->cause,
415 "cause_txt", ast_cause2str(snapshot->hangup->cause));
416
417 if (!blob) {
418 return NULL;
419 }
420
421 if (snapshot->hangup->tech_cause) {
422 ast_json_object_set(blob, "tech_cause",
424 }
425
426 ast_json_object_set(blob, "channel", json_channel);
427
428 return blob;
429}
430
432 struct ast_channel_snapshot *snapshot,
433 const struct timeval *tv)
434{
435 return simple_channel_event("ChannelStateChange", snapshot, tv);
436}
437
438/*! \brief Handle channel state changes */
439static struct ast_json *channel_state(
440 struct ast_channel_snapshot *old_snapshot,
441 struct ast_channel_snapshot *new_snapshot,
442 const struct timeval *tv)
443{
444 struct ast_channel_snapshot *snapshot = new_snapshot ?
445 new_snapshot : old_snapshot;
446
447 if (!old_snapshot) {
448 return channel_created_event(snapshot, tv);
449 } else if (ast_test_flag(&new_snapshot->flags, AST_FLAG_DEAD)) {
450 return channel_destroyed_event(snapshot, tv);
451 } else if (old_snapshot->state != new_snapshot->state) {
452 return channel_state_change_event(snapshot, tv);
453 }
454
455 return NULL;
456}
457
459 struct ast_channel_snapshot *old_snapshot,
460 struct ast_channel_snapshot *new_snapshot,
461 const struct timeval *tv)
462{
463 struct ast_json *json_channel;
464
465 /* No Newexten event on first channel snapshot */
466 if (!old_snapshot) {
467 return NULL;
468 }
469
470 /* Empty application is not valid for a Newexten event */
471 if (ast_strlen_zero(new_snapshot->dialplan->appl)) {
472 return NULL;
473 }
474
475 if (ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) {
476 return NULL;
477 }
478
479 json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
480 if (!json_channel) {
481 return NULL;
482 }
483
484 return ast_json_pack("{s: s, s: o, s: s, s: s, s: o}",
485 "type", "ChannelDialplan",
486 "timestamp", ast_json_timeval(*tv, NULL),
487 "dialplan_app", new_snapshot->dialplan->appl,
488 "dialplan_app_data", AST_JSON_UTF8_VALIDATE(new_snapshot->dialplan->data),
489 "channel", json_channel);
490}
491
493 struct ast_channel_snapshot *old_snapshot,
494 struct ast_channel_snapshot *new_snapshot,
495 const struct timeval *tv)
496{
497 struct ast_json *json_channel;
498
499 /* No NewCallerid event on first channel snapshot */
500 if (!old_snapshot) {
501 return NULL;
502 }
503
504 if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) {
505 return NULL;
506 }
507
508 json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
509 if (!json_channel) {
510 return NULL;
511 }
512
513 return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
514 "type", "ChannelCallerId",
515 "timestamp", ast_json_timeval(*tv, NULL),
516 "caller_presentation", new_snapshot->caller->pres,
517 "caller_presentation_txt", ast_describe_caller_presentation(
518 new_snapshot->caller->pres),
519 "channel", json_channel);
520}
521
523 struct ast_channel_snapshot *old_snapshot,
524 struct ast_channel_snapshot *new_snapshot,
525 const struct timeval *tv)
526{
527 struct ast_json *json_channel;
528
529 /* No ChannelConnectedLine event on first channel snapshot */
530 if (!old_snapshot) {
531 return NULL;
532 }
533
534 if (ast_channel_snapshot_connected_line_equal(old_snapshot, new_snapshot)) {
535 return NULL;
536 }
537
538 json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
539 if (!json_channel) {
540 return NULL;
541 }
542
543 return ast_json_pack("{s: s, s: o, s: o}",
544 "type", "ChannelConnectedLine",
545 "timestamp", ast_json_timeval(*tv, NULL),
546 "channel", json_channel);
547}
548
555
556static void sub_channel_update_handler(void *data,
557 struct stasis_subscription *sub,
558 struct stasis_message *message)
559{
560 struct stasis_app *app = data;
562 int i;
563
564 for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
565 struct ast_json *msg;
566
567 msg = channel_monitors[i](update->old_snapshot, update->new_snapshot,
569 if (msg) {
570 app_send(app, msg);
571 ast_json_unref(msg);
572 }
573 }
574
575 if (ast_test_flag(&update->new_snapshot->flags, AST_FLAG_DEAD)) {
576 unsubscribe(app, "channel", update->new_snapshot->base->uniqueid, 1);
577 }
578}
579
581 const char *type,
582 struct ast_endpoint_snapshot *snapshot,
583 const struct timeval *tv)
584{
585 struct ast_json *json_endpoint = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
586
587 if (!json_endpoint) {
588 return NULL;
589 }
590
591 return ast_json_pack("{s: s, s: o, s: o}",
592 "type", type,
593 "timestamp", ast_json_timeval(*tv, NULL),
594 "endpoint", json_endpoint);
595}
596
597static int message_received_handler(const char *endpoint_id, struct ast_json *json_msg, void *pvt)
598{
599 struct ast_endpoint_snapshot *snapshot;
600 struct ast_json *json_endpoint;
601 struct ast_json *message;
602 struct stasis_app *app = pvt;
603 char *tech;
604 char *resource;
605
606 tech = ast_strdupa(endpoint_id);
607 resource = strchr(tech, '/');
608 if (resource) {
609 resource[0] = '\0';
610 resource++;
611 }
612
613 if (ast_strlen_zero(tech) || ast_strlen_zero(resource)) {
614 return -1;
615 }
616
617 snapshot = ast_endpoint_latest_snapshot(tech, resource);
618 if (!snapshot) {
619 return -1;
620 }
621
622 json_endpoint = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
623 ao2_ref(snapshot, -1);
624 if (!json_endpoint) {
625 return -1;
626 }
627
628 message = ast_json_pack("{s: s, s: o, s: o, s: o}",
629 "type", "TextMessageReceived",
630 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
631 "endpoint", json_endpoint,
632 "message", ast_json_ref(json_msg));
633 if (message) {
636 }
637
638 return 0;
639}
640
642 struct stasis_subscription *sub,
643 struct stasis_message *message)
644{
645 struct stasis_app *app = data;
647 struct ast_endpoint_snapshot *new_snapshot;
648 struct ast_endpoint_snapshot *old_snapshot;
649 const struct timeval *tv;
650
652
654
656
657 new_snapshot = stasis_message_data(update->new_snapshot);
658 old_snapshot = stasis_message_data(update->old_snapshot);
659
660 if (new_snapshot) {
661 struct ast_json *json;
662
663 tv = stasis_message_timestamp(update->new_snapshot);
664
665 json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv);
666 if (!json) {
667 return;
668 }
669
670 app_send(app, json);
671 ast_json_unref(json);
672 }
673
674 if (!new_snapshot && old_snapshot) {
675 unsubscribe(app, "endpoint", old_snapshot->id, 1);
676 }
677}
678
680 const char *type,
681 struct ast_bridge_snapshot *snapshot,
682 const struct timeval *tv)
683{
684 struct ast_json *json_bridge = ast_bridge_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
685 if (!json_bridge) {
686 return NULL;
687 }
688
689 return ast_json_pack("{s: s, s: o, s: o}",
690 "type", type,
691 "timestamp", ast_json_timeval(*tv, NULL),
692 "bridge", json_bridge);
693}
694
695static void sub_bridge_update_handler(void *data,
696 struct stasis_subscription *sub,
697 struct stasis_message *message)
698{
699 struct ast_json *json = NULL;
700 struct stasis_app *app = data;
702 const struct timeval *tv;
703
705
707
708 if (!update->new_snapshot) {
709 json = simple_bridge_event("BridgeDestroyed", update->old_snapshot, tv);
710 } else if (!update->old_snapshot) {
711 json = simple_bridge_event("BridgeCreated", update->new_snapshot, tv);
712 } else if (update->new_snapshot && update->old_snapshot
713 && strcmp(update->new_snapshot->video_source_id, update->old_snapshot->video_source_id)) {
714 json = simple_bridge_event("BridgeVideoSourceChanged", update->new_snapshot, tv);
715 if (json && !ast_strlen_zero(update->old_snapshot->video_source_id)) {
716 ast_json_object_set(json, "old_video_source_id",
717 ast_json_string_create(update->old_snapshot->video_source_id));
718 }
719 }
720
721 if (json) {
722 app_send(app, json);
723 ast_json_unref(json);
724 }
725
726 if (!update->new_snapshot && update->old_snapshot) {
727 unsubscribe(app, "bridge", update->old_snapshot->uniqueid, 1);
728 }
729}
730
731
732/*! \brief Helper function for determining if the application is subscribed to a given entity */
733static int bridge_app_subscribed(struct stasis_app *app, const char *uniqueid)
734{
735 struct app_forwards *forwards = NULL;
736
737 forwards = ao2_find(app->forwards, uniqueid, OBJ_SEARCH_KEY);
738 if (!forwards) {
739 return 0;
740 }
741
742 ao2_ref(forwards, -1);
743 return 1;
744}
745
746static void bridge_merge_handler(void *data, struct stasis_subscription *sub,
747 struct stasis_message *message)
748{
749 struct stasis_app *app = data;
750 struct ast_bridge_merge_message *merge;
751
753
754 /* Find out if we're subscribed to either bridge */
755 if (bridge_app_subscribed(app, merge->from->uniqueid) ||
757 /* Forward the message to the app */
758 stasis_publish(app->topic, message);
759 }
760}
761
762/*! \brief Callback function for checking if channels in a bridge are subscribed to */
764{
765 int subscribed = 0;
766 struct ao2_iterator iter;
767 char *uniqueid;
768
769 if (bridge_app_subscribed(app, snapshot->uniqueid)) {
770 return 1;
771 }
772
773 iter = ao2_iterator_init(snapshot->channels, 0);
774 for (; (uniqueid = ao2_iterator_next(&iter)); ao2_ref(uniqueid, -1)) {
775 if (bridge_app_subscribed(app, uniqueid)) {
776 subscribed = 1;
777 ao2_ref(uniqueid, -1);
778 break;
779 }
780 }
782
783 return subscribed;
784}
785
787 struct stasis_message *message)
788{
789 struct stasis_app *app = data;
791 struct ast_bridge_snapshot *bridge = transfer_msg->bridge;
792
793 if (bridge_app_subscribed(app, transfer_msg->transferer->base->uniqueid) ||
794 (bridge && bridge_app_subscribed_involved(app, bridge))) {
795 stasis_publish(app->topic, message);
796 }
797}
798
800 struct stasis_message *message)
801{
802 struct stasis_app *app = data;
804 int subscribed = 0;
805
807 if (!subscribed) {
809 }
810 if (!subscribed && transfer_msg->to_transferee.bridge_snapshot) {
812 }
813 if (!subscribed && transfer_msg->to_transfer_target.bridge_snapshot) {
815 }
816
817 if (!subscribed) {
818 switch (transfer_msg->dest_type) {
821 break;
824 if (!subscribed) {
826 }
827 break;
828 break;
831 if (!subscribed) {
833 }
834 break;
835 default:
836 break;
837 }
838 }
839
840 if (subscribed) {
841 stasis_publish(app->topic, message);
842 }
843}
844
846 struct stasis_message *message)
847{
848 struct stasis_app *app = data;
849
852 }
853}
854
856{
857 if (!app) {
858 return;
859 }
860
861 app->debug = debug;
862}
863
865{
867
868 if (!app) {
869 return;
870 }
871
872 app->debug = debug;
874}
875
877{
878 return (app ? app->debug : 0) || global_debug;
879}
880
882{
883 int debug_enabled = 0;
884
885 if (global_debug) {
886 debug_enabled = 1;
887 } else {
889
890 if (app) {
891 if (app->debug) {
892 debug_enabled = 1;
893 }
894 ao2_ref(app, -1);
895 }
896 }
897 return debug_enabled;
898}
899
901{
903 if (!global_debug) {
904 struct ao2_container *app_names = stasis_app_get_all();
905 struct ao2_iterator it_app_names;
906 char *app_name;
907 struct stasis_app *app;
908
909 if (!app_names || !ao2_container_count(app_names)) {
910 ao2_cleanup(app_names);
911 return;
912 }
913
914 it_app_names = ao2_iterator_init(app_names, 0);
915 while ((app_name = ao2_iterator_next(&it_app_names))) {
918 }
919
922 }
923 ao2_iterator_cleanup(&it_app_names);
924 ao2_cleanup(app_names);
925 }
926}
927
929{
931 size_t size;
932 int res = 0;
933 size_t context_size = strlen("stasis-") + strlen(name) + 1;
934 char context_name[context_size];
935 char *topic_name;
936 int ret;
937
938 ast_assert(name != NULL);
940
941 ast_verb(1, "Creating Stasis app '%s'\n", name);
942
943 size = sizeof(*app) + strlen(name) + 1;
945 if (!app) {
946 return NULL;
947 }
948 app->subscription_model = subscription_model;
949
953 if (!app->forwards) {
954 return NULL;
955 }
956
957 ret = ast_asprintf(&topic_name, "ari:application/%s", name);
958 if (ret < 0) {
959 return NULL;
960 }
961
962 app->topic = stasis_topic_create(topic_name);
963 ast_free(topic_name);
964 if (!app->topic) {
965 return NULL;
966 }
967
969 if (!app->bridge_router) {
970 return NULL;
971 }
972
973 res |= stasis_message_router_add(app->bridge_router,
975
976 res |= stasis_message_router_add(app->bridge_router,
978
979 res |= stasis_message_router_add(app->bridge_router,
981
982 res |= stasis_message_router_add(app->bridge_router,
984
985 if (res != 0) {
986 return NULL;
987 }
988 /* Bridge router holds a reference */
989 ao2_ref(app, +1);
990
991 app->router = stasis_message_router_create(app->topic);
992 if (!app->router) {
993 return NULL;
994 }
995
996 res |= stasis_message_router_add(app->router,
998
999 res |= stasis_message_router_add(app->router,
1001
1004
1005 res |= stasis_message_router_add(app->router,
1007
1010
1011 if (res != 0) {
1012 return NULL;
1013 }
1014 /* Router holds a reference */
1015 ao2_ref(app, +1);
1016
1017 strncpy(app->name, name, size - sizeof(*app));
1018 app->handler = handler;
1019 app->data = ao2_bump(data);
1020
1021 /* Create a context, a match-all extension, and a 'h' extension for this application. Note that
1022 * this should only be done if a context does not already exist. */
1023 strcpy(context_name, "stasis-");
1024 strcat(context_name, name);
1026 if (!ast_context_find_or_create(NULL, NULL, context_name, "res_stasis")) {
1027 ast_log(LOG_WARNING, "Could not create context '%s' for Stasis application '%s'\n", context_name, name);
1028 } else {
1029 ast_add_extension(context_name, 0, "_.", 1, NULL, NULL, "Stasis", ast_strdup(name), ast_free_ptr, "res_stasis");
1030 ast_add_extension(context_name, 0, "h", 1, NULL, NULL, "NoOp", NULL, NULL, "res_stasis");
1031 }
1032 } else {
1033 ast_log(LOG_WARNING, "Not creating context '%s' for Stasis application '%s' because it already exists\n",
1035 }
1036
1037 ao2_ref(app, +1);
1038 return app;
1039}
1040
1042{
1043 return app->topic;
1044}
1045
1047{
1049 char eid[20];
1050 void *data;
1051
1053 ast_eid_to_str(eid, sizeof(eid), &ast_eid_default)))) {
1054 ast_log(AST_LOG_WARNING, "Failed to append EID to outgoing event %s\n",
1056 }
1057
1058 /* Copy off mutable state with lock held */
1059 ao2_lock(app);
1060 handler = app->handler;
1061 data = ao2_bump(app->data);
1062 ao2_unlock(app);
1063 /* Name is immutable; no need to copy */
1064
1065 if (handler) {
1066 handler(data, app->name, message);
1067 } else {
1068 ast_verb(3,
1069 "Inactive Stasis app '%s' missed message\n", app->name);
1070 }
1071 ao2_cleanup(data);
1072}
1073
1075{
1076 ao2_lock(app);
1077
1078 ast_verb(1, "Deactivating Stasis app '%s'\n", app->name);
1079 app->handler = NULL;
1080 ao2_cleanup(app->data);
1081 app->data = NULL;
1082
1083 ao2_unlock(app);
1084}
1085
1087{
1088 ao2_lock(app);
1089
1091
1093 app->router = NULL;
1094 stasis_message_router_unsubscribe(app->bridge_router);
1095 app->bridge_router = NULL;
1096 stasis_message_router_unsubscribe(app->endpoint_router);
1097 app->endpoint_router = NULL;
1098
1099 ao2_unlock(app);
1100}
1101
1103{
1104 int ret;
1105
1106 ao2_lock(app);
1107 ret = app->handler != NULL;
1108 ao2_unlock(app);
1109
1110 return ret;
1111}
1112
1114{
1115 int ret;
1116
1117 ao2_lock(app);
1118 ret = app->handler == NULL && ao2_container_count(app->forwards) == 0;
1119 ao2_unlock(app);
1120
1121 return ret;
1122}
1123
1125{
1126 ao2_lock(app);
1127 if (app->handler && app->data) {
1128 struct ast_json *msg;
1129
1130 ast_verb(1, "Replacing Stasis app '%s'\n", app->name);
1131
1132 msg = ast_json_pack("{s: s, s: o?, s: s}",
1133 "type", "ApplicationReplaced",
1134 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
1135 "application", app->name);
1136 if (msg) {
1137 /*
1138 * The app must be unlocked before calling 'send' since a handler may
1139 * subsequently attempt to grab the app lock after first obtaining a
1140 * lock for another object, thus causing a deadlock.
1141 */
1142 ao2_unlock(app);
1143 app_send(app, msg);
1144 ao2_lock(app);
1145 ast_json_unref(msg);
1146 if (!app->handler) {
1147 /*
1148 * If the handler disappeared then the app was deactivated. In that
1149 * case don't replace. Re-activation will reset the handler later.
1150 */
1151 ao2_unlock(app);
1152 return;
1153 }
1154 }
1155 } else {
1156 ast_verb(1, "Activating Stasis app '%s'\n", app->name);
1157 }
1158
1159 app->handler = handler;
1160 ao2_replace(app->data, data);
1161 ao2_unlock(app);
1162}
1163
1164const char *stasis_app_name(const struct stasis_app *app)
1165{
1166 return app->name;
1167}
1168
1169static int forwards_filter_by_type(void *obj, void *arg, int flags)
1170{
1171 struct app_forwards *forward = obj;
1172 enum forward_type *forward_type = arg;
1173
1174 if (forward->forward_type == *forward_type) {
1175 return CMP_MATCH;
1176 }
1177
1178 return 0;
1179}
1180
1181void stasis_app_to_cli(const struct stasis_app *app, struct ast_cli_args *a)
1182{
1183 struct ao2_iterator *channels;
1184 struct ao2_iterator *endpoints;
1185 struct ao2_iterator *bridges;
1186 struct app_forwards *forward;
1188
1189 ast_cli(a->fd, "Name: %s\n"
1190 " Debug: %s\n"
1191 " Subscription Model: %s\n",
1192 app->name,
1193 app->debug ? "Yes" : "No",
1194 app->subscription_model == STASIS_APP_SUBSCRIBE_ALL ?
1195 "Global Resource Subscription" :
1196 "Application/Explicit Resource Subscription");
1197 ast_cli(a->fd, " Subscriptions: %d\n", ao2_container_count(app->forwards));
1198
1199 ast_cli(a->fd, " Channels:\n");
1203 if (channels) {
1204 while ((forward = ao2_iterator_next(channels))) {
1205 ast_cli(a->fd, " %s (%d)\n", forward->id, forward->interested);
1206 ao2_ref(forward, -1);
1207 }
1209 }
1210
1211 ast_cli(a->fd, " Bridges:\n");
1213 bridges = ao2_callback(app->forwards, OBJ_MULTIPLE,
1215 if (bridges) {
1216 while ((forward = ao2_iterator_next(bridges))) {
1217 ast_cli(a->fd, " %s (%d)\n", forward->id, forward->interested);
1218 ao2_ref(forward, -1);
1219 }
1221 }
1222
1223 ast_cli(a->fd, " Endpoints:\n");
1227 if (endpoints) {
1228 while ((forward = ao2_iterator_next(endpoints))) {
1229 ast_cli(a->fd, " %s (%d)\n", forward->id, forward->interested);
1230 ao2_ref(forward, -1);
1231 }
1233 }
1234}
1235
1236struct ast_json *app_to_json(const struct stasis_app *app)
1237{
1238 struct ast_json *json;
1239 struct ast_json *channels;
1240 struct ast_json *bridges;
1241 struct ast_json *endpoints;
1242 struct ao2_iterator i;
1243 struct app_forwards *forwards;
1244
1245 json = ast_json_pack("{s: s, s: [], s: [], s: []}",
1246 "name", app->name,
1247 "channel_ids", "bridge_ids", "endpoint_ids");
1248 if (!json) {
1249 return NULL;
1250 }
1251 channels = ast_json_object_get(json, "channel_ids");
1252 bridges = ast_json_object_get(json, "bridge_ids");
1253 endpoints = ast_json_object_get(json, "endpoint_ids");
1254
1255 i = ao2_iterator_init(app->forwards, 0);
1256 while ((forwards = ao2_iterator_next(&i))) {
1257 struct ast_json *array = NULL;
1258 int append_res;
1259
1260 switch (forwards->forward_type) {
1261 case FORWARD_CHANNEL:
1262 array = channels;
1263 break;
1264 case FORWARD_BRIDGE:
1265 array = bridges;
1266 break;
1267 case FORWARD_ENDPOINT:
1268 array = endpoints;
1269 break;
1270 }
1271
1272 /* If forward_type value is unexpected this will safely return an error. */
1273 append_res = ast_json_array_append(array, ast_json_string_create(forwards->id));
1274 ao2_ref(forwards, -1);
1275
1276 if (append_res != 0) {
1277 ast_log(LOG_ERROR, "Error building response\n");
1279 ast_json_unref(json);
1280
1281 return NULL;
1282 }
1283 }
1285
1286 return json;
1287}
1288
1290{
1291 struct app_forwards *forwards;
1292
1293 if (!app) {
1294 return -1;
1295 }
1296
1297 ao2_lock(app->forwards);
1298 /* If subscribed to all, don't subscribe again */
1299 forwards = ao2_find(app->forwards, CHANNEL_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1300 if (forwards) {
1301 ao2_unlock(app->forwards);
1302 ao2_ref(forwards, -1);
1303
1304 return 0;
1305 }
1306
1307 forwards = ao2_find(app->forwards,
1308 chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL,
1310 if (!forwards) {
1311 int res;
1312
1313 /* Forwards not found, create one */
1314 forwards = forwards_create_channel(app, chan);
1315 if (!forwards) {
1316 ao2_unlock(app->forwards);
1317
1318 return -1;
1319 }
1320
1321 res = ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1322 if (!res) {
1323 ao2_unlock(app->forwards);
1324 ao2_ref(forwards, -1);
1325
1326 return -1;
1327 }
1328 }
1329
1330 ++forwards->interested;
1331 ast_debug(3, "Channel '%s' is %d interested in %s\n",
1332 chan ? ast_channel_uniqueid(chan) : "ALL",
1333 forwards->interested,
1334 app->name);
1335
1336 ao2_unlock(app->forwards);
1337 ao2_ref(forwards, -1);
1338
1339 return 0;
1340}
1341
1342static int subscribe_channel(struct stasis_app *app, void *obj)
1343{
1344 return app_subscribe_channel(app, obj);
1345}
1346
1347static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate)
1348{
1349 struct app_forwards *forwards;
1350
1351 if (!id) {
1352 if (!strcmp(kind, "bridge")) {
1353 id = BRIDGE_ALL;
1354 } else if (!strcmp(kind, "channel")) {
1355 id = CHANNEL_ALL;
1356 } else if (!strcmp(kind, "endpoint")) {
1357 id = ENDPOINT_ALL;
1358 } else {
1359 ast_log(LOG_WARNING, "Unknown subscription kind '%s'\n", kind);
1360 return -1;
1361 }
1362 }
1363
1364 ao2_lock(app->forwards);
1365 forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1366 if (!forwards) {
1367 ao2_unlock(app->forwards);
1368 ast_debug(3, "App '%s' not subscribed to %s '%s'\n", app->name, kind, id);
1369 return -1;
1370 }
1371 forwards->interested--;
1372
1373 ast_debug(3, "%s '%s': is %d interested in %s\n", kind, id, forwards->interested, app->name);
1374 if (forwards->interested == 0 || terminate) {
1375 /* No one is interested any more; unsubscribe */
1376 ast_debug(3, "%s '%s' unsubscribed from %s\n", kind, id, app->name);
1377 forwards_unsubscribe(forwards);
1378 ao2_find(app->forwards, forwards,
1380 OBJ_NODATA);
1381
1382 if (!strcmp(kind, "endpoint")) {
1384 }
1385 }
1386 ao2_unlock(app->forwards);
1387 ao2_ref(forwards, -1);
1388
1389 return 0;
1390}
1391
1393{
1394 if (!app) {
1395 return -1;
1396 }
1397
1399}
1400
1401int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
1402{
1403 if (!app) {
1404 return -1;
1405 }
1406
1407 return unsubscribe(app, "channel", channel_id, 0);
1408}
1409
1410int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id)
1411{
1412 struct app_forwards *forwards;
1413
1414 if (ast_strlen_zero(channel_id)) {
1415 channel_id = CHANNEL_ALL;
1416 }
1417 forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY);
1418 ao2_cleanup(forwards);
1419
1420 return forwards != NULL;
1421}
1422
1423static void *channel_find(const struct stasis_app *app, const char *id)
1424{
1425 return ast_channel_get_by_name(id);
1426}
1427
1429 .scheme = "channel:",
1430 .find = channel_find,
1431 .subscribe = subscribe_channel,
1432 .unsubscribe = app_unsubscribe_channel_id,
1433 .is_subscribed = app_is_subscribed_channel_id
1434};
1435
1436int app_subscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
1437{
1438 struct app_forwards *forwards;
1439
1440 if (!app) {
1441 return -1;
1442 }
1443
1444 ao2_lock(app->forwards);
1445 /* If subscribed to all, don't subscribe again */
1446 forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1447 if (forwards) {
1448 ao2_unlock(app->forwards);
1449 ao2_ref(forwards, -1);
1450
1451 return 0;
1452 }
1453
1454 forwards = ao2_find(app->forwards,
1455 bridge ? bridge->uniqueid : BRIDGE_ALL,
1457 if (!forwards) {
1458 int res;
1459
1460 /* Forwards not found, create one */
1461 forwards = forwards_create_bridge(app, bridge);
1462 if (!forwards) {
1463 ao2_unlock(app->forwards);
1464
1465 return -1;
1466 }
1467
1468 res = ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1469 if (!res) {
1470 ao2_unlock(app->forwards);
1471 ao2_ref(forwards, -1);
1472
1473 return -1;
1474 }
1475 }
1476
1477 ++forwards->interested;
1478 ast_debug(3, "Bridge '%s' is %d interested in %s\n",
1479 bridge ? bridge->uniqueid : "ALL",
1480 forwards->interested,
1481 app->name);
1482
1483 ao2_unlock(app->forwards);
1484 ao2_ref(forwards, -1);
1485
1486 return 0;
1487}
1488
1489static int subscribe_bridge(struct stasis_app *app, void *obj)
1490{
1491 return app_subscribe_bridge(app, obj);
1492}
1493
1495{
1496 if (!app) {
1497 return -1;
1498 }
1499
1500 return app_unsubscribe_bridge_id(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
1501}
1502
1503int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
1504{
1505 if (!app) {
1506 return -1;
1507 }
1508
1509 return unsubscribe(app, "bridge", bridge_id, 0);
1510}
1511
1512int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id)
1513{
1514 struct app_forwards *forwards;
1515
1516 if (ast_strlen_zero(bridge_id)) {
1517 bridge_id = BRIDGE_ALL;
1518 }
1519
1520 forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY);
1521 ao2_cleanup(forwards);
1522
1523 return forwards != NULL;
1524}
1525
1526static void *bridge_find(const struct stasis_app *app, const char *id)
1527{
1529}
1530
1532 .scheme = "bridge:",
1533 .find = bridge_find,
1534 .subscribe = subscribe_bridge,
1535 .unsubscribe = app_unsubscribe_bridge_id,
1536 .is_subscribed = app_is_subscribed_bridge_id
1537};
1538
1540{
1541 struct app_forwards *forwards;
1542
1543 if (!app) {
1544 return -1;
1545 }
1546
1547 ao2_lock(app->forwards);
1548 /* If subscribed to all, don't subscribe again */
1549 forwards = ao2_find(app->forwards, ENDPOINT_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1550 if (forwards) {
1551 ao2_unlock(app->forwards);
1552 ao2_ref(forwards, -1);
1553
1554 return 0;
1555 }
1556
1557 forwards = ao2_find(app->forwards,
1558 endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL,
1560 if (!forwards) {
1561 int res;
1562
1563 /* Forwards not found, create one */
1564 forwards = forwards_create_endpoint(app, endpoint);
1565 if (!forwards) {
1566 ao2_unlock(app->forwards);
1567
1568 return -1;
1569 }
1570
1571 res = ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1572 if (!res) {
1573 ao2_unlock(app->forwards);
1574 ao2_ref(forwards, -1);
1575
1576 return -1;
1577 }
1578
1579 /* Subscribe for messages */
1581 }
1582
1583 ++forwards->interested;
1584 ast_debug(3, "Endpoint '%s' is %d interested in %s\n",
1585 endpoint ? ast_endpoint_get_id(endpoint) : "ALL",
1586 forwards->interested,
1587 app->name);
1588
1589 ao2_unlock(app->forwards);
1590 ao2_ref(forwards, -1);
1591
1592 return 0;
1593}
1594
1595static int subscribe_endpoint(struct stasis_app *app, void *obj)
1596{
1597 return app_subscribe_endpoint(app, obj);
1598}
1599
1600int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
1601{
1602 if (!app) {
1603 return -1;
1604 }
1605
1606 return unsubscribe(app, "endpoint", endpoint_id, 0);
1607}
1608
1609int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id)
1610{
1611 struct app_forwards *forwards;
1612
1613 if (ast_strlen_zero(endpoint_id)) {
1614 endpoint_id = ENDPOINT_ALL;
1615 }
1616 forwards = ao2_find(app->forwards, endpoint_id, OBJ_SEARCH_KEY);
1617 ao2_cleanup(forwards);
1618
1619 return forwards != NULL;
1620}
1621
1622static void *endpoint_find(const struct stasis_app *app, const char *id)
1623{
1624 return ast_endpoint_find_by_id(id);
1625}
1626
1628 .scheme = "endpoint:",
1629 .find = endpoint_find,
1630 .subscribe = subscribe_endpoint,
1631 .unsubscribe = app_unsubscribe_endpoint_id,
1632 .is_subscribed = app_is_subscribed_endpoint_id
1633};
1634
1641
1648
1650{
1651 if (!app || !json) {
1652 return json;
1653 }
1654
1655 ast_json_object_set(json, "events_allowed", app->events_allowed ?
1656 ast_json_ref(app->events_allowed) : ast_json_array_create());
1657 ast_json_object_set(json, "events_disallowed", app->events_disallowed ?
1658 ast_json_ref(app->events_disallowed) : ast_json_array_create());
1659
1660 return json;
1661}
1662
1664 struct ast_json *filter, const char *filter_type)
1665{
1668 /* If no filters are specified then reset this filter type */
1669 filter = NULL;
1670 } else {
1671 /* Otherwise try to get the filter array for this type */
1672 filter = ast_json_object_get(filter, filter_type);
1673 if (!filter) {
1674 /* A filter type exists, but not this one, so don't update */
1675 return 0;
1676 }
1677 }
1678 }
1679
1680 /* At this point the filter object should be an array */
1682 ast_log(LOG_ERROR, "Invalid json type event filter - app: %s, filter: %s\n",
1683 app->name, filter_type);
1684 return -1;
1685 }
1686
1687 if (filter) {
1688 /* Confirm that at least the type names are specified */
1689 struct ast_json *obj;
1690 int i;
1691
1692 for (i = 0; i < ast_json_array_size(filter) &&
1693 (obj = ast_json_array_get(filter, i)); ++i) {
1694
1695 if (ast_strlen_zero(ast_json_object_string_get(obj, "type"))) {
1696 ast_log(LOG_ERROR, "Filter event must have a type - app: %s, "
1697 "filter: %s\n", app->name, filter_type);
1698 return -1;
1699 }
1700 }
1701 }
1702
1703 ao2_lock(app);
1706 ao2_unlock(app);
1707
1708 return 0;
1709}
1710
1712{
1713 return app_event_filter_set(app, &app->events_allowed, filter, "allowed");
1714}
1715
1717{
1718 return app_event_filter_set(app, &app->events_disallowed, filter, "disallowed");
1719}
1720
1725
1726static int app_event_filter_matched(struct ast_json *array, struct ast_json *event, int empty)
1727{
1728 struct ast_json *obj;
1729 int i;
1730
1731 if (!array || !ast_json_array_size(array)) {
1732 return empty;
1733 }
1734
1735 for (i = 0; i < ast_json_array_size(array) &&
1736 (obj = ast_json_array_get(array, i)); ++i) {
1737
1740 return 1;
1741 }
1742 }
1743
1744 return 0;
1745}
1746
1748{
1750 int res;
1751
1752 if (!app) {
1753 return 0;
1754 }
1755
1756 ao2_lock(app);
1757 res = !app_event_filter_matched(app->events_disallowed, event, 0) &&
1758 app_event_filter_matched(app->events_allowed, event, 1);
1759 ao2_unlock(app);
1760 ao2_ref(app, -1);
1761
1762 return res;
1763}
static const char app[]
Asterisk main include file. File version handling, generic pbx functions.
#define ast_free(a)
Definition astmm.h:180
#define ast_strdup(str)
A wrapper for strdup()
Definition astmm.h:241
#define ast_strdupa(s)
duplicate a string in memory from the stack
Definition astmm.h:298
void ast_free_ptr(void *ptr)
free() wrapper
Definition astmm.c:1739
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition astmm.h:267
#define ast_log
Definition astobj2.c:42
#define ao2_iterator_next(iter)
Definition astobj2.h:1911
@ CMP_MATCH
Definition astobj2.h:1027
#define OBJ_KEY
Definition astobj2.h:1151
#define OBJ_POINTER
Definition astobj2.h:1150
@ AO2_ALLOC_OPT_LOCK_MUTEX
Definition astobj2.h:363
void ao2_iterator_cleanup(struct ao2_iterator *iter)
#define ao2_callback(c, flags, cb_fn, arg)
ao2_callback() is a generic function that applies cb_fn() to all objects in a container,...
Definition astobj2.h:1693
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
#define ao2_cleanup(obj)
Definition astobj2.h:1934
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
Definition astobj2.h:1554
#define ao2_find(container, arg, flags)
Definition astobj2.h:1736
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
#define ao2_container_alloc_rbtree(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a red-black tree container.
Definition astobj2.h:1349
#define ao2_unlock(a)
Definition astobj2.h:729
#define ao2_replace(dst, src)
Replace one object reference with another cleaning up the original.
Definition astobj2.h:501
#define ao2_lock(a)
Definition astobj2.h:717
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition astobj2.h:459
#define OBJ_PARTIAL_KEY
Definition astobj2.h:1152
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition astobj2.h:404
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition astobj2.h:480
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
#define ao2_t_alloc(data_size, destructor_fn, debug_msg)
Definition astobj2.h:407
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
Definition astobj2.h:1063
@ OBJ_NODATA
Definition astobj2.h:1044
@ OBJ_MULTIPLE
Definition astobj2.h:1049
@ OBJ_UNLINK
Definition astobj2.h:1039
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
Definition astobj2.h:1101
@ AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT
Reject duplicate objects in container.
Definition astobj2.h:1201
static struct ao2_container * bridges
Definition bridge.c:132
CallerID (and other GR30) management and generation Includes code and algorithms from the Zapata libr...
const char * ast_describe_caller_presentation(int data)
Convert caller ID pres value to explanatory string.
Definition callerid.c:1364
static const char type[]
struct stasis_topic * ast_channel_topic(struct ast_channel *chan)
A topic which publishes the events for a particular channel.
const char * ast_channel_uniqueid(const struct ast_channel *chan)
struct ast_channel * ast_channel_get_by_name(const char *search)
Find a channel by name or uniqueid.
Definition channel.c:1416
#define ast_channel_unref(c)
Decrease channel reference count.
Definition channel.h:3018
const char * ast_cause2str(int cause) attribute_pure
Gives the string form of a given cause code.
Definition channel.c:612
@ AST_FLAG_DEAD
Definition channel.h:1065
Standard Command Line Interface.
void ast_cli(int fd, const char *fmt,...)
Definition clicompat.c:6
static struct channel_usage channels
static void update(int code_size, int y, int wi, int fi, int dq, int sr, int dqsez, struct g726_state *state_ptr)
Definition codec_g726.c:367
Internal API for the Stasis application controller.
struct ast_endpoint * ast_endpoint_find_by_id(const char *id)
Finds the endpoint with the given tech[/resource] id.
const char * ast_endpoint_get_id(const struct ast_endpoint *endpoint)
Gets the tech/resource id of the given endpoint.
static const char name[]
Definition format_mp3.c:68
static const char context_name[]
static int array(struct ast_channel *chan, const char *cmd, char *var, const char *value)
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
static int subscribed
Definition manager.c:167
struct stasis_topic * ast_endpoint_topic_cached(struct ast_endpoint *endpoint)
Returns the topic for a specific endpoint.
struct stasis_topic * ast_channel_topic_all(void)
A topic which publishes the events for all channels.
struct ast_channel_snapshot * ast_multi_channel_blob_get_channel(struct ast_multi_channel_blob *obj, const char *role)
Retrieve a channel snapshot associated with a specific role from a ast_multi_channel_blob.
struct stasis_message_type * ast_endpoint_contact_state_type(void)
Message type for endpoint contact state changes.
struct stasis_message_type * ast_endpoint_snapshot_type(void)
Message type for ast_endpoint_snapshot.
struct stasis_topic * ast_endpoint_topic(struct ast_endpoint *endpoint)
Returns the topic for a specific endpoint.
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
struct stasis_topic * ast_endpoint_topic_all_cached(void)
Cached topic for all endpoint related messages.
struct stasis_message_type * ast_channel_dial_type(void)
Message type for when a channel dials another channel.
struct stasis_message_type * stasis_cache_update_type(void)
Message type for cache update messages.
struct ast_endpoint_snapshot * ast_endpoint_latest_snapshot(const char *tech, const char *resource)
Retrieve the most recent snapshot for the endpoint with the given name.
struct stasis_message_type * ast_channel_snapshot_type(void)
Message type for ast_channel_snapshot_update.
struct stasis_message_type * ast_endpoint_state_type(void)
Message type for endpoint state changes.
#define AST_LOG_WARNING
#define ast_debug(level,...)
Log a DEBUG message.
#define LOG_ERROR
#define ast_verb(level,...)
#define LOG_WARNING
#define ast_json_object_string_get(object, key)
Get a string field from a JSON object.
Definition json.h:600
struct ast_json * ast_json_string_create(const char *value)
Construct a JSON string from value.
Definition json.c:278
enum ast_json_type ast_json_typeof(const struct ast_json *value)
Get the type of value.
Definition json.c:78
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition json.c:73
int ast_json_array_append(struct ast_json *array, struct ast_json *value)
Append to an array.
Definition json.c:378
size_t ast_json_object_size(struct ast_json *object)
Get size of JSON object.
Definition json.c:403
struct ast_json * ast_json_array_get(const struct ast_json *array, size_t index)
Get an element from an array.
Definition json.c:370
struct ast_json * ast_json_pack(char const *format,...)
Helper for creating complex JSON values.
Definition json.c:612
struct ast_json * ast_json_timeval(const struct timeval tv, const char *zone)
Construct a timeval as JSON.
Definition json.c:670
struct ast_json * ast_json_integer_create(intmax_t value)
Create a JSON integer.
Definition json.c:327
struct ast_json * ast_json_array_create(void)
Create a empty JSON array.
Definition json.c:362
@ AST_JSON_ARRAY
Definition json.h:165
@ AST_JSON_OBJECT
Definition json.h:164
struct ast_json * ast_json_ref(struct ast_json *value)
Increase refcount on value.
Definition json.c:67
int ast_json_object_set(struct ast_json *object, const char *key, struct ast_json *value)
Set a field in a JSON object.
Definition json.c:414
const char * ast_json_string_get(const struct ast_json *string)
Get the value of a JSON string.
Definition json.c:283
#define AST_JSON_UTF8_VALIDATE(str)
Check str for UTF-8 and replace with an empty string if fails the check.
Definition json.h:224
struct ast_json * ast_json_object_get(struct ast_json *object, const char *key)
Get a field from a JSON object.
Definition json.c:407
size_t ast_json_array_size(const struct ast_json *array)
Get the size of a JSON array.
Definition json.c:366
static struct ao2_container * endpoints
int messaging_app_subscribe_endpoint(const char *app_name, struct ast_endpoint *endpoint, message_received_cb callback, void *pvt)
Subscribe an application to an endpoint for messages.
Definition messaging.c:493
void messaging_app_unsubscribe_endpoint(const char *app_name, const char *endpoint_id)
Subscribe for messages from a particular endpoint.
Definition messaging.c:423
Stasis out-of-call text message support.
int ast_add_extension(const char *context, int replace, const char *extension, int priority, const char *label, const char *callerid, const char *application, void *data, void(*datad)(void *), const char *registrar)
Add and extension to an extension context.
Definition pbx.c:6949
int ast_context_destroy_by_name(const char *context, const char *registrar)
Destroy a context by name.
Definition pbx.c:8230
struct ast_context * ast_context_find(const char *name)
Find a context.
Definition extconf.c:4170
struct ast_context * ast_context_find_or_create(struct ast_context **extcontexts, struct ast_hashtab *exttable, const char *name, const char *registrar)
Register a new context or find an existing one.
Definition pbx.c:6170
const char * app_name(struct ast_app *app)
Definition pbx_app.c:463
void stasis_app_set_debug(struct stasis_app *app, int debug)
Enable/disable request/response and event logging on an application.
int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint)
Subscribes an application to a endpoint.
static int forwards_filter_by_type(void *obj, void *arg, int flags)
static struct ast_json * channel_destroyed_event(struct ast_channel_snapshot *snapshot, const struct timeval *tv)
static struct ast_json * channel_connected_line(struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *new_snapshot, const struct timeval *tv)
static int subscribe_bridge(struct stasis_app *app, void *obj)
int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id)
Test if an app is subscribed to a endpoint.
int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
Cancel the subscription an app has for a bridge.
int app_subscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
Add a bridge subscription to an existing channel subscription.
static struct app_forwards * forwards_create_channel(struct stasis_app *app, struct ast_channel *chan)
void app_update(struct stasis_app *app, stasis_app_cb handler, void *data)
Update the handler and data for a res_stasis application.
struct stasis_topic * ast_app_get_topic(struct stasis_app *app)
Returns the stasis topic for an app.
static int app_event_filter_set(struct stasis_app *app, struct ast_json **member, struct ast_json *filter, const char *filter_type)
static struct ast_json * channel_callerid(struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *new_snapshot, const struct timeval *tv)
static struct ast_json * channel_state(struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *new_snapshot, const struct timeval *tv)
Handle channel state changes.
int stasis_app_event_allowed(const char *app_name, struct ast_json *event)
Check if the given event should be filtered.
#define CHANNEL_ALL
static void sub_endpoint_update_handler(void *data, struct stasis_subscription *sub, struct stasis_message *message)
static void * channel_find(const struct stasis_app *app, const char *id)
static int bridge_app_subscribed(struct stasis_app *app, const char *uniqueid)
Helper function for determining if the application is subscribed to a given entity.
struct ast_json *(* channel_snapshot_monitor)(struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *new_snapshot, const struct timeval *tv)
Typedef for callbacks that get called on channel snapshot updates.
forward_type
@ FORWARD_CHANNEL
@ FORWARD_ENDPOINT
@ FORWARD_BRIDGE
static int subscribe_channel(struct stasis_app *app, void *obj)
int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id)
Test if an app is subscribed to a channel.
void stasis_app_unregister_event_sources(void)
Unregister core event sources.
int app_unsubscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
Cancel the bridge subscription for an application.
void app_shutdown(struct stasis_app *app)
Tears down an application.
static int bridge_app_subscribed_involved(struct stasis_app *app, struct ast_bridge_snapshot *snapshot)
Callback function for checking if channels in a bridge are subscribed to.
int app_unsubscribe_channel(struct stasis_app *app, struct ast_channel *chan)
Cancel the subscription an app has for a channel.
static void app_dtor(void *obj)
void stasis_app_set_debug_by_name(const char *app_name, int debug)
Enable/disable request/response and event logging on an application.
static struct ast_json * channel_created_event(struct ast_channel_snapshot *snapshot, const struct timeval *tv)
int global_debug
static int message_received_handler(const char *endpoint_id, struct ast_json *json_msg, void *pvt)
int app_subscribe_channel(struct stasis_app *app, struct ast_channel *chan)
Subscribes an application to a channel.
static void call_forwarded_handler(struct stasis_app *app, struct stasis_message *message)
int app_is_finished(struct stasis_app *app)
Checks whether a deactivated app has no channels.
struct ast_json * stasis_app_event_filter_to_json(struct stasis_app *app, struct ast_json *json)
Convert and add the app's event type filter(s) to the given json object.
void stasis_app_register_event_sources(void)
Register core event sources.
static void sub_subscription_change_handler(void *data, struct stasis_subscription *sub, struct stasis_message *message)
static void sub_default_handler(void *data, struct stasis_subscription *sub, struct stasis_message *message)
static struct ast_json * simple_endpoint_event(const char *type, struct ast_endpoint_snapshot *snapshot, const struct timeval *tv)
struct stasis_app_event_source bridge_event_source
int stasis_app_event_filter_set(struct stasis_app *app, struct ast_json *filter)
Set the application's event type filter.
struct stasis_app * app_create(const char *name, stasis_app_cb handler, void *data, enum stasis_app_subscription_model subscription_model)
Create a res_stasis application.
static struct ast_json * channel_state_change_event(struct ast_channel_snapshot *snapshot, const struct timeval *tv)
int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
Cancel the subscription an app has for a endpoint.
void stasis_app_set_global_debug(int debug)
Enable/disable request/response and event logging on all applications.
int stasis_app_get_debug(struct stasis_app *app)
Get debug status of an application.
static void bridge_subscription_change_handler(void *data, struct stasis_subscription *sub, struct stasis_message *message)
void stasis_app_to_cli(const struct stasis_app *app, struct ast_cli_args *a)
Dump properties of a stasis_app to the CLI.
static struct app_forwards * forwards_create_bridge(struct stasis_app *app, struct ast_bridge *bridge)
static void bridge_blind_transfer_handler(void *data, struct stasis_subscription *sub, struct stasis_message *message)
static void endpoint_state_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
static void * endpoint_find(const struct stasis_app *app, const char *id)
static void forwards_dtor(void *obj)
#define BRIDGE_ALL
static void bridge_attended_transfer_handler(void *data, struct stasis_subscription *sub, struct stasis_message *message)
static void bridge_merge_handler(void *data, struct stasis_subscription *sub, struct stasis_message *message)
struct stasis_app_event_source channel_event_source
static void sub_bridge_update_handler(void *data, struct stasis_subscription *sub, struct stasis_message *message)
int app_is_active(struct stasis_app *app)
Checks whether an app is active.
static struct ast_json * simple_bridge_event(const char *type, struct ast_bridge_snapshot *snapshot, const struct timeval *tv)
static void forwards_unsubscribe(struct app_forwards *forwards)
static void sub_channel_update_handler(void *data, struct stasis_subscription *sub, struct stasis_message *message)
struct ast_json * app_to_json(const struct stasis_app *app)
Create a JSON representation of a stasis_app.
int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
Cancel the subscription an app has for a channel.
static struct ast_json * channel_dialplan(struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *new_snapshot, const struct timeval *tv)
int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id)
Test if an app is subscribed to a bridge.
static struct ast_json * simple_channel_event(const char *type, struct ast_channel_snapshot *snapshot, const struct timeval *tv)
const char * stasis_app_name(const struct stasis_app *app)
Retrieve an application's name.
void app_deactivate(struct stasis_app *app)
Deactivates an application.
static int forwards_sort(const void *obj_left, const void *obj_right, int flags)
static channel_snapshot_monitor channel_monitors[]
static struct app_forwards * forwards_create(struct stasis_app *app, const char *id)
#define ENDPOINT_ALL
static int app_events_allowed_set(struct stasis_app *app, struct ast_json *filter)
static void * bridge_find(const struct stasis_app *app, const char *id)
struct stasis_app_event_source endpoint_event_source
static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate)
static int app_events_disallowed_set(struct stasis_app *app, struct ast_json *filter)
static int app_event_filter_matched(struct ast_json *array, struct ast_json *event, int empty)
int stasis_app_get_debug_by_name(const char *app_name)
Get debug status of an application.
void app_send(struct stasis_app *app, struct ast_json *message)
Send a message to an application.
static struct app_forwards * forwards_create_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint)
static int subscribe_endpoint(struct stasis_app *app, void *obj)
Internal API for the Stasis application controller.
stasis_app_subscription_model
@ STASIS_APP_SUBSCRIBE_ALL
An application is automatically subscribed to all resources in Asterisk, even if it does not control ...
static struct stasis_subscription * sub
Statsd channel stats. Exmaple of how to subscribe to Stasis events.
static int debug
Global debug status.
Definition res_xmpp.c:570
#define NULL
Definition resample.c:96
struct ast_json * stasis_message_to_json(struct stasis_message *msg, struct stasis_message_sanitizer *sanitize)
Build the JSON representation of the message.
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
Definition stasis.c:1615
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition stasis.c:684
@ STASIS_SUBSCRIPTION_FORMATTER_JSON
Definition stasis.h:310
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
Definition stasis.c:1241
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition stasis.c:1645
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
Definition stasis.c:1578
const struct timeval * stasis_message_timestamp(const struct stasis_message *msg)
Get the time when a message was created.
Stasis Application API. See Stasis Application API for detailed documentation.
struct stasis_message_sanitizer * stasis_app_get_sanitizer(void)
Get the Stasis message sanitizer for app_stasis applications.
struct ast_bridge * stasis_app_bridge_find_by_id(const char *bridge_id)
Returns the bridge with the given id.
Definition res_stasis.c:800
void stasis_app_unregister_event_source(struct stasis_app_event_source *obj)
Unregister an application event source.
void(* stasis_app_cb)(void *data, const char *app_name, struct ast_json *message)
Callback for Stasis application handler.
Definition stasis_app.h:67
struct stasis_app * stasis_app_get_by_name(const char *name)
Retrieve a handle to a Stasis application by its name.
void stasis_app_register_event_source(struct stasis_app_event_source *obj)
Register an application event source.
struct ao2_container * stasis_app_get_all(void)
Gets the names of all registered Stasis applications.
struct ast_json * ast_bridge_snapshot_to_json(const struct ast_bridge_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_bridge_snapshot.
struct stasis_message_type * ast_bridge_snapshot_type(void)
Message type for ast_bridge_snapshot.
struct stasis_message_type * ast_blind_transfer_type(void)
Message type for ast_blind_transfer_message.
struct stasis_topic * ast_bridge_topic(struct ast_bridge *bridge)
A topic which publishes the events for a particular bridge.
struct stasis_message_type * ast_attended_transfer_type(void)
Message type for ast_attended_transfer_message.
struct stasis_message_type * ast_bridge_merge_message_type(void)
Message type for ast_bridge_merge_message.
struct stasis_topic * ast_bridge_topic_all(void)
A topic which publishes the events for all bridges.
@ AST_ATTENDED_TRANSFER_DEST_BRIDGE_MERGE
@ AST_ATTENDED_TRANSFER_DEST_LINK
@ AST_ATTENDED_TRANSFER_DEST_THREEWAY
int ast_channel_snapshot_connected_line_equal(const struct ast_channel_snapshot *old_snapshot, const struct ast_channel_snapshot *new_snapshot)
Compares the connected line info of two snapshots.
struct ast_json * ast_channel_snapshot_to_json(const struct ast_channel_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_channel_snapshot.
int ast_channel_snapshot_caller_id_equal(const struct ast_channel_snapshot *old_snapshot, const struct ast_channel_snapshot *new_snapshot)
Compares the callerid info of two snapshots.
int ast_channel_snapshot_cep_equal(const struct ast_channel_snapshot *old_snapshot, const struct ast_channel_snapshot *new_snapshot)
Compares the context, exten and priority of two snapshots.
Endpoint abstractions.
struct ast_json * ast_endpoint_snapshot_to_json(const struct ast_endpoint_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_endpoint_snapshot.
#define stasis_message_router_create(topic)
Create a new message router object.
void stasis_message_router_unsubscribe(struct stasis_message_router *router)
Unsubscribe the router from the upstream topic.
int stasis_message_router_add(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
Add a route to a message router.
int stasis_message_router_add_cache_update(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
Add a route for stasis_cache_update messages to a message router.
void stasis_message_router_set_formatters_default(struct stasis_message_router *router, stasis_subscription_cb callback, void *data, enum stasis_subscription_message_formatters formatters)
Sets the default route of a router with formatters.
int ast_strings_equal(const char *str1, const char *str2)
Compare strings for equality checking for NULL.
Definition strings.c:238
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition strings.h:65
Generic container type.
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
Definition astobj2.h:1821
enum forward_type forward_type
struct stasis_forward * topic_forward
struct stasis_forward * topic_cached_forward
Message representing attended transfer.
enum ast_attended_transfer_dest_type dest_type
struct ast_channel_snapshot * links[2]
struct ast_bridge_channel_snapshot_pair to_transfer_target
struct ast_bridge_channel_snapshot_pair threeway
union ast_attended_transfer_message::@305 dest
struct ast_bridge_channel_snapshot_pair to_transferee
char bridge[AST_UUID_STR_LEN]
Message published during a blind transfer.
struct ast_bridge_snapshot * bridge
struct ast_channel_snapshot * transferer
struct ast_bridge_snapshot * bridge_snapshot
struct ast_channel_snapshot * channel_snapshot
Message representing the merge of two bridges.
struct ast_bridge_snapshot * from
struct ast_bridge_snapshot * to
Structure that contains a snapshot of information about a bridge.
Definition bridge.h:318
const ast_string_field uniqueid
Definition bridge.h:332
struct ao2_container * channels
Definition bridge.h:335
Structure that contains information about a bridge.
Definition bridge.h:353
const ast_string_field uniqueid
Definition bridge.h:405
const ast_string_field uniqueid
const ast_string_field data
const ast_string_field appl
Structure representing a change of snapshot of channel state.
Structure representing a snapshot of channel state.
struct ast_channel_snapshot_dialplan * dialplan
struct ast_channel_snapshot_base * base
enum ast_channel_state state
struct ast_channel_snapshot_caller * caller
struct ast_flags flags
struct ast_channel_snapshot_hangup * hangup
Main Channel structure associated with a channel.
struct ast_channel_snapshot * snapshot
const char * data
A snapshot of an endpoint's state.
const ast_string_field id
Abstract JSON element (object, array, string, int, ...).
A multi channel blob data structure for multi_channel_blob stasis messages.
Event source information and callbacks.
Definition stasis_app.h:184
const char * scheme
The scheme to match against on [un]subscribes.
Definition stasis_app.h:186
struct stasis_topic * topic
struct ast_json * events_disallowed
struct stasis_message_router * router
stasis_app_cb handler
struct ao2_container * forwards
struct stasis_message_router * bridge_router
struct ast_json * events_allowed
enum stasis_app_subscription_model subscription_model
struct stasis_message_router * endpoint_router
Cache update message.
Definition stasis.h:965
Forwarding information.
Definition stasis.c:1598
const char * name
static void handler(const char *name, int response_code, struct ast_variable *get_params, struct ast_variable *path_vars, struct ast_variable *headers, struct ast_json *body, struct ast_ari_response *response)
Definition test_ari.c:59
static struct test_val a
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition time.h:159
#define ast_test_flag(p, flag)
Definition utils.h:64
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition utils.h:981
#define ast_assert(a)
Definition utils.h:779
char * ast_eid_to_str(char *s, int maxlen, struct ast_eid *eid)
Convert an EID to a string.
Definition utils.c:2875
#define ARRAY_LEN(a)
Definition utils.h:706
struct ast_eid ast_eid_default
Global EID.
Definition options.c:94