Asterisk - The Open Source Telephony Project GIT-master-a358458
res/stasis/app.c
Go to the documentation of this file.
1/*
2 * Asterisk -- An open source telephony toolkit.
3 *
4 * Copyright (C) 2013, Digium, Inc.
5 *
6 * David M. Lee, II <dlee@digium.com>
7 *
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
13 *
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
17 */
18
19/*! \file
20 *
21 * \brief Stasis application support.
22 *
23 * \author David M. Lee, II <dlee@digium.com>
24 */
25
26#include "asterisk.h"
27
28#include "app.h"
29#include "control.h"
30#include "messaging.h"
31
32#include "asterisk/callerid.h"
33#include "asterisk/cli.h"
34#include "asterisk/stasis_app.h"
39
40#define BRIDGE_ALL "__AST_BRIDGE_ALL_TOPIC"
41#define CHANNEL_ALL "__AST_CHANNEL_ALL_TOPIC"
42#define ENDPOINT_ALL "__AST_ENDPOINT_ALL_TOPIC"
43
44/*! Global debug flag. No need for locking */
46
47static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate);
48
49struct stasis_app {
50 /*! Aggregation topic for this application. */
52 /*! Router for handling messages forwarded to \a topic. */
54 /*! Router for handling messages to the bridge all \a topic. */
56 /*! Optional router for handling endpoint messages in 'all' subscriptions */
58 /*! Container of the channel forwards to this app's topic. */
60 /*! Callback function for this application. */
62 /*! Opaque data to hand to callback function. */
63 void *data;
64 /*! Subscription model for the application */
66 /*! Whether or not someone wants to see debug messages about this app */
67 int debug;
68 /*! An array of allowed event types for this application */
70 /*! An array of disallowed event types for this application */
72 /*! Name of the Stasis application */
73 char name[];
74};
75
80};
81
82/*! Subscription info for a particular channel/bridge. */
84 /*! Count of number of times this channel/bridge has been subscribed */
86
87 /*! Forward for the regular topic */
89 /*! Forward for the caching topic */
91
92 /* Type of object being forwarded */
94 /*! Unique id of the object being forwarded */
95 char id[];
96};
97
98static void forwards_dtor(void *obj)
99{
100#ifdef AST_DEVMODE
101 struct app_forwards *forwards = obj;
102#endif /* AST_DEVMODE */
103
104 ast_assert(forwards->topic_forward == NULL);
106}
107
108static void forwards_unsubscribe(struct app_forwards *forwards)
109{
111 forwards->topic_forward = NULL;
113 forwards->topic_cached_forward = NULL;
114}
115
117 const char *id)
118{
119 struct app_forwards *forwards;
120
121 if (!app || ast_strlen_zero(id)) {
122 return NULL;
123 }
124
125 forwards = ao2_t_alloc(sizeof(*forwards) + strlen(id) + 1, forwards_dtor, id);
126 if (!forwards) {
127 return NULL;
128 }
129
130 strcpy(forwards->id, id); /* SAFE */
131
132 return forwards;
133}
134
135/*! Forward a channel's topics to an app */
137 struct ast_channel *chan)
138{
139 struct app_forwards *forwards;
140
141 if (!app) {
142 return NULL;
143 }
144
145 forwards = forwards_create(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL);
146 if (!forwards) {
147 return NULL;
148 }
149
150 forwards->forward_type = FORWARD_CHANNEL;
153 app->topic);
154
155 if (!forwards->topic_forward) {
156 ao2_ref(forwards, -1);
157 return NULL;
158 }
159
160 return forwards;
161}
162
163/*! Forward a bridge's topics to an app */
165 struct ast_bridge *bridge)
166{
167 struct app_forwards *forwards;
168
169 if (!app) {
170 return NULL;
171 }
172
173 forwards = forwards_create(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
174 if (!forwards) {
175 return NULL;
176 }
177
178 forwards->forward_type = FORWARD_BRIDGE;
179 forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge), app->topic);
180
181 if (!forwards->topic_forward && bridge) {
182 forwards_unsubscribe(forwards);
183 ao2_ref(forwards, -1);
184 return NULL;
185 }
186
187 return forwards;
188}
189
190static void endpoint_state_cb(void *data, struct stasis_subscription *sub,
191 struct stasis_message *message)
192{
193 struct stasis_app *app = data;
194
195 stasis_publish(app->topic, message);
196}
197
198/*! Forward an endpoint's topics to an app */
200 struct ast_endpoint *endpoint)
201{
202 struct app_forwards *forwards;
203 int ret = 0;
204
205 if (!app) {
206 return NULL;
207 }
208
209 forwards = forwards_create(app, endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL);
210 if (!forwards) {
211 return NULL;
212 }
213
214 forwards->forward_type = FORWARD_ENDPOINT;
215 if (endpoint) {
217 app->topic);
219 ast_endpoint_topic_cached(endpoint), app->topic);
220
221 if (!forwards->topic_forward || !forwards->topic_cached_forward) {
222 /* Half-subscribed is a bad thing */
223 forwards_unsubscribe(forwards);
224 ao2_ref(forwards, -1);
225 return NULL;
226 }
227 } else {
228 /* Since endpoint subscriptions also subscribe to channels, in the case
229 * of all endpoint subscriptions, we only want messages for the endpoints.
230 * As such, we route those particular messages and then re-publish them
231 * on the app's topic.
232 */
233 ast_assert(app->endpoint_router == NULL);
235 if (!app->endpoint_router) {
236 forwards_unsubscribe(forwards);
237 ao2_ref(forwards, -1);
238 return NULL;
239 }
240
241 ret |= stasis_message_router_add(app->endpoint_router,
243 ret |= stasis_message_router_add(app->endpoint_router,
245
246 if (ret) {
247 ao2_ref(app->endpoint_router, -1);
248 app->endpoint_router = NULL;
249 ao2_ref(forwards, -1);
250 return NULL;
251 }
252 }
253
254 return forwards;
255}
256
257static int forwards_sort(const void *obj_left, const void *obj_right, int flags)
258{
259 const struct app_forwards *object_left = obj_left;
260 const struct app_forwards *object_right = obj_right;
261 const char *right_key = obj_right;
262 int cmp;
263
264 switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) {
265 case OBJ_POINTER:
266 right_key = object_right->id;
267 /* Fall through */
268 case OBJ_KEY:
269 cmp = strcmp(object_left->id, right_key);
270 break;
271 case OBJ_PARTIAL_KEY:
272 /*
273 * We could also use a partial key struct containing a length
274 * so strlen() does not get called for every comparison instead.
275 */
276 cmp = strncmp(object_left->id, right_key, strlen(right_key));
277 break;
278 default:
279 /* Sort can only work on something with a full or partial key. */
280 ast_assert(0);
281 cmp = 0;
282 break;
283 }
284 return cmp;
285}
286
287static void app_dtor(void *obj)
288{
289 struct stasis_app *app = obj;
290 size_t size = strlen("stasis-") + strlen(app->name) + 1;
291 char context_name[size];
292
293 ast_verb(1, "Destroying Stasis app %s\n", app->name);
294
295 ast_assert(app->router == NULL);
296 ast_assert(app->bridge_router == NULL);
297 ast_assert(app->endpoint_router == NULL);
298
299 /* If we created a context for this application, remove it */
300 strcpy(context_name, "stasis-");
301 strcat(context_name, app->name);
303
304 ao2_cleanup(app->topic);
305 app->topic = NULL;
306 ao2_cleanup(app->forwards);
307 app->forwards = NULL;
308 ao2_cleanup(app->data);
309 app->data = NULL;
310
311 ast_json_unref(app->events_allowed);
312 app->events_allowed = NULL;
313 ast_json_unref(app->events_disallowed);
314 app->events_disallowed = NULL;
315
316}
317
319{
321 struct ast_channel_snapshot *snapshot = ast_multi_channel_blob_get_channel(payload, "forwarded");
322 struct ast_channel *chan;
323
324 if (!snapshot) {
325 return;
326 }
327
329 if (!chan) {
330 return;
331 }
332
334 ast_channel_unref(chan);
335}
336
338 struct stasis_message *message)
339{
340 struct stasis_app *app = data;
341
344 }
345}
346
348 struct stasis_message *message)
349{
350 struct stasis_app *app = data;
351 struct ast_json *json;
352
353 /* The dial type can be converted to JSON so it will always be passed
354 * here.
355 */
358 }
359
360 /* By default, send any message that has a JSON representation */
362 if (!json) {
363 return;
364 }
365
366 app_send(app, json);
367 ast_json_unref(json);
368}
369
370/*! \brief Typedef for callbacks that get called on channel snapshot updates
 *
 * \param old_snapshot Channel snapshot from before the update; monitors in
 *        this file treat \c NULL as "first snapshot of the channel".
 * \param new_snapshot Channel snapshot from after the update.
 * \param tv Timestamp the monitor places in the event it builds.
 *
 * \return JSON event for the application, or \c NULL when the monitor has
 *         nothing to report for this update.
 */
371typedef struct ast_json *(*channel_snapshot_monitor)(
372 struct ast_channel_snapshot *old_snapshot,
373 struct ast_channel_snapshot *new_snapshot,
374 const struct timeval *tv);
375
377 const char *type,
378 struct ast_channel_snapshot *snapshot,
379 const struct timeval *tv)
380{
381 struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
382
383 if (!json_channel) {
384 return NULL;
385 }
386
387 return ast_json_pack("{s: s, s: o, s: o}",
388 "type", type,
389 "timestamp", ast_json_timeval(*tv, NULL),
390 "channel", json_channel);
391}
392
394 struct ast_channel_snapshot *snapshot,
395 const struct timeval *tv)
396{
397 return simple_channel_event("ChannelCreated", snapshot, tv);
398}
399
401 struct ast_channel_snapshot *snapshot,
402 const struct timeval *tv)
403{
404 struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
405
406 if (!json_channel) {
407 return NULL;
408 }
409
410 return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
411 "type", "ChannelDestroyed",
412 "timestamp", ast_json_timeval(*tv, NULL),
413 "cause", snapshot->hangup->cause,
414 "cause_txt", ast_cause2str(snapshot->hangup->cause),
415 "channel", json_channel);
416}
417
419 struct ast_channel_snapshot *snapshot,
420 const struct timeval *tv)
421{
422 return simple_channel_event("ChannelStateChange", snapshot, tv);
423}
424
425/*! \brief Handle channel state changes */
426static struct ast_json *channel_state(
427 struct ast_channel_snapshot *old_snapshot,
428 struct ast_channel_snapshot *new_snapshot,
429 const struct timeval *tv)
430{
431 struct ast_channel_snapshot *snapshot = new_snapshot ?
432 new_snapshot : old_snapshot;
433
434 if (!old_snapshot) {
435 return channel_created_event(snapshot, tv);
436 } else if (ast_test_flag(&new_snapshot->flags, AST_FLAG_DEAD)) {
437 return channel_destroyed_event(snapshot, tv);
438 } else if (old_snapshot->state != new_snapshot->state) {
439 return channel_state_change_event(snapshot, tv);
440 }
441
442 return NULL;
443}
444
446 struct ast_channel_snapshot *old_snapshot,
447 struct ast_channel_snapshot *new_snapshot,
448 const struct timeval *tv)
449{
450 struct ast_json *json_channel;
451
452 /* No Newexten event on first channel snapshot */
453 if (!old_snapshot) {
454 return NULL;
455 }
456
457 /* Empty application is not valid for a Newexten event */
458 if (ast_strlen_zero(new_snapshot->dialplan->appl)) {
459 return NULL;
460 }
461
462 if (ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) {
463 return NULL;
464 }
465
466 json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
467 if (!json_channel) {
468 return NULL;
469 }
470
471 return ast_json_pack("{s: s, s: o, s: s, s: s, s: o}",
472 "type", "ChannelDialplan",
473 "timestamp", ast_json_timeval(*tv, NULL),
474 "dialplan_app", new_snapshot->dialplan->appl,
475 "dialplan_app_data", AST_JSON_UTF8_VALIDATE(new_snapshot->dialplan->data),
476 "channel", json_channel);
477}
478
480 struct ast_channel_snapshot *old_snapshot,
481 struct ast_channel_snapshot *new_snapshot,
482 const struct timeval *tv)
483{
484 struct ast_json *json_channel;
485
486 /* No NewCallerid event on first channel snapshot */
487 if (!old_snapshot) {
488 return NULL;
489 }
490
491 if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) {
492 return NULL;
493 }
494
495 json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
496 if (!json_channel) {
497 return NULL;
498 }
499
500 return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
501 "type", "ChannelCallerId",
502 "timestamp", ast_json_timeval(*tv, NULL),
503 "caller_presentation", new_snapshot->caller->pres,
504 "caller_presentation_txt", ast_describe_caller_presentation(
505 new_snapshot->caller->pres),
506 "channel", json_channel);
507}
508
510 struct ast_channel_snapshot *old_snapshot,
511 struct ast_channel_snapshot *new_snapshot,
512 const struct timeval *tv)
513{
514 struct ast_json *json_channel;
515
516 /* No ChannelConnectedLine event on first channel snapshot */
517 if (!old_snapshot) {
518 return NULL;
519 }
520
521 if (ast_channel_snapshot_connected_line_equal(old_snapshot, new_snapshot)) {
522 return NULL;
523 }
524
525 json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
526 if (!json_channel) {
527 return NULL;
528 }
529
530 return ast_json_pack("{s: s, s: o, s: o}",
531 "type", "ChannelConnectedLine",
532 "timestamp", ast_json_timeval(*tv, NULL),
533 "channel", json_channel);
534}
535
541};
542
543static void sub_channel_update_handler(void *data,
544 struct stasis_subscription *sub,
545 struct stasis_message *message)
546{
547 struct stasis_app *app = data;
549 int i;
550
551 for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
552 struct ast_json *msg;
553
554 msg = channel_monitors[i](update->old_snapshot, update->new_snapshot,
556 if (msg) {
557 app_send(app, msg);
558 ast_json_unref(msg);
559 }
560 }
561
562 if (ast_test_flag(&update->new_snapshot->flags, AST_FLAG_DEAD)) {
563 unsubscribe(app, "channel", update->new_snapshot->base->uniqueid, 1);
564 }
565}
566
568 const char *type,
569 struct ast_endpoint_snapshot *snapshot,
570 const struct timeval *tv)
571{
572 struct ast_json *json_endpoint = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
573
574 if (!json_endpoint) {
575 return NULL;
576 }
577
578 return ast_json_pack("{s: s, s: o, s: o}",
579 "type", type,
580 "timestamp", ast_json_timeval(*tv, NULL),
581 "endpoint", json_endpoint);
582}
583
584static int message_received_handler(const char *endpoint_id, struct ast_json *json_msg, void *pvt)
585{
586 struct ast_endpoint_snapshot *snapshot;
587 struct ast_json *json_endpoint;
588 struct ast_json *message;
589 struct stasis_app *app = pvt;
590 char *tech;
591 char *resource;
592
593 tech = ast_strdupa(endpoint_id);
594 resource = strchr(tech, '/');
595 if (resource) {
596 resource[0] = '\0';
597 resource++;
598 }
599
600 if (ast_strlen_zero(tech) || ast_strlen_zero(resource)) {
601 return -1;
602 }
603
604 snapshot = ast_endpoint_latest_snapshot(tech, resource);
605 if (!snapshot) {
606 return -1;
607 }
608
609 json_endpoint = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
610 ao2_ref(snapshot, -1);
611 if (!json_endpoint) {
612 return -1;
613 }
614
615 message = ast_json_pack("{s: s, s: o, s: o, s: o}",
616 "type", "TextMessageReceived",
617 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
618 "endpoint", json_endpoint,
619 "message", ast_json_ref(json_msg));
620 if (message) {
623 }
624
625 return 0;
626}
627
629 struct stasis_subscription *sub,
630 struct stasis_message *message)
631{
632 struct stasis_app *app = data;
634 struct ast_endpoint_snapshot *new_snapshot;
635 struct ast_endpoint_snapshot *old_snapshot;
636 const struct timeval *tv;
637
639
641
643
644 new_snapshot = stasis_message_data(update->new_snapshot);
645 old_snapshot = stasis_message_data(update->old_snapshot);
646
647 if (new_snapshot) {
648 struct ast_json *json;
649
650 tv = stasis_message_timestamp(update->new_snapshot);
651
652 json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv);
653 if (!json) {
654 return;
655 }
656
657 app_send(app, json);
658 ast_json_unref(json);
659 }
660
661 if (!new_snapshot && old_snapshot) {
662 unsubscribe(app, "endpoint", old_snapshot->id, 1);
663 }
664}
665
667 const char *type,
668 struct ast_bridge_snapshot *snapshot,
669 const struct timeval *tv)
670{
671 struct ast_json *json_bridge = ast_bridge_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
672 if (!json_bridge) {
673 return NULL;
674 }
675
676 return ast_json_pack("{s: s, s: o, s: o}",
677 "type", type,
678 "timestamp", ast_json_timeval(*tv, NULL),
679 "bridge", json_bridge);
680}
681
682static void sub_bridge_update_handler(void *data,
683 struct stasis_subscription *sub,
684 struct stasis_message *message)
685{
686 struct ast_json *json = NULL;
687 struct stasis_app *app = data;
689 const struct timeval *tv;
690
692
694
695 if (!update->new_snapshot) {
696 json = simple_bridge_event("BridgeDestroyed", update->old_snapshot, tv);
697 } else if (!update->old_snapshot) {
698 json = simple_bridge_event("BridgeCreated", update->new_snapshot, tv);
699 } else if (update->new_snapshot && update->old_snapshot
700 && strcmp(update->new_snapshot->video_source_id, update->old_snapshot->video_source_id)) {
701 json = simple_bridge_event("BridgeVideoSourceChanged", update->new_snapshot, tv);
702 if (json && !ast_strlen_zero(update->old_snapshot->video_source_id)) {
703 ast_json_object_set(json, "old_video_source_id",
704 ast_json_string_create(update->old_snapshot->video_source_id));
705 }
706 }
707
708 if (json) {
709 app_send(app, json);
710 ast_json_unref(json);
711 }
712
713 if (!update->new_snapshot && update->old_snapshot) {
714 unsubscribe(app, "bridge", update->old_snapshot->uniqueid, 1);
715 }
716}
717
718
719/*! \brief Helper function for determining if the application is subscribed to a given entity */
720static int bridge_app_subscribed(struct stasis_app *app, const char *uniqueid)
721{
722 struct app_forwards *forwards = NULL;
723
724 forwards = ao2_find(app->forwards, uniqueid, OBJ_SEARCH_KEY);
725 if (!forwards) {
726 return 0;
727 }
728
729 ao2_ref(forwards, -1);
730 return 1;
731}
732
733static void bridge_merge_handler(void *data, struct stasis_subscription *sub,
734 struct stasis_message *message)
735{
736 struct stasis_app *app = data;
737 struct ast_bridge_merge_message *merge;
738
740
741 /* Find out if we're subscribed to either bridge */
742 if (bridge_app_subscribed(app, merge->from->uniqueid) ||
744 /* Forward the message to the app */
745 stasis_publish(app->topic, message);
746 }
747}
748
749/*! \brief Callback function for checking if channels in a bridge are subscribed to */
751{
752 int subscribed = 0;
753 struct ao2_iterator iter;
754 char *uniqueid;
755
756 if (bridge_app_subscribed(app, snapshot->uniqueid)) {
757 return 1;
758 }
759
760 iter = ao2_iterator_init(snapshot->channels, 0);
761 for (; (uniqueid = ao2_iterator_next(&iter)); ao2_ref(uniqueid, -1)) {
762 if (bridge_app_subscribed(app, uniqueid)) {
763 subscribed = 1;
764 ao2_ref(uniqueid, -1);
765 break;
766 }
767 }
769
770 return subscribed;
771}
772
774 struct stasis_message *message)
775{
776 struct stasis_app *app = data;
778 struct ast_bridge_snapshot *bridge = transfer_msg->bridge;
779
780 if (bridge_app_subscribed(app, transfer_msg->transferer->base->uniqueid) ||
781 (bridge && bridge_app_subscribed_involved(app, bridge))) {
782 stasis_publish(app->topic, message);
783 }
784}
785
787 struct stasis_message *message)
788{
789 struct stasis_app *app = data;
791 int subscribed = 0;
792
794 if (!subscribed) {
796 }
797 if (!subscribed && transfer_msg->to_transferee.bridge_snapshot) {
799 }
800 if (!subscribed && transfer_msg->to_transfer_target.bridge_snapshot) {
802 }
803
804 if (!subscribed) {
805 switch (transfer_msg->dest_type) {
808 break;
811 if (!subscribed) {
813 }
814 break;
815 break;
818 if (!subscribed) {
820 }
821 break;
822 default:
823 break;
824 }
825 }
826
827 if (subscribed) {
828 stasis_publish(app->topic, message);
829 }
830}
831
833 struct stasis_message *message)
834{
835 struct stasis_app *app = data;
836
839 }
840}
841
843{
844 if (!app) {
845 return;
846 }
847
848 app->debug = debug;
849}
850
852{
854
855 if (!app) {
856 return;
857 }
858
859 app->debug = debug;
861}
862
864{
865 return (app ? app->debug : 0) || global_debug;
866}
867
869{
870 int debug_enabled = 0;
871
872 if (global_debug) {
873 debug_enabled = 1;
874 } else {
876
877 if (app) {
878 if (app->debug) {
879 debug_enabled = 1;
880 }
881 ao2_ref(app, -1);
882 }
883 }
884 return debug_enabled;
885}
886
888{
890 if (!global_debug) {
891 struct ao2_container *app_names = stasis_app_get_all();
892 struct ao2_iterator it_app_names;
893 char *app_name;
894 struct stasis_app *app;
895
896 if (!app_names || !ao2_container_count(app_names)) {
897 ao2_cleanup(app_names);
898 return;
899 }
900
901 it_app_names = ao2_iterator_init(app_names, 0);
902 while ((app_name = ao2_iterator_next(&it_app_names))) {
905 }
906
909 }
910 ao2_iterator_cleanup(&it_app_names);
911 ao2_cleanup(app_names);
912 }
913}
914
916{
918 size_t size;
919 int res = 0;
920 size_t context_size = strlen("stasis-") + strlen(name) + 1;
921 char context_name[context_size];
922 char *topic_name;
923 int ret;
924
925 ast_assert(name != NULL);
927
928 ast_verb(1, "Creating Stasis app '%s'\n", name);
929
930 size = sizeof(*app) + strlen(name) + 1;
932 if (!app) {
933 return NULL;
934 }
935 app->subscription_model = subscription_model;
936
940 if (!app->forwards) {
941 return NULL;
942 }
943
944 ret = ast_asprintf(&topic_name, "ari:application/%s", name);
945 if (ret < 0) {
946 return NULL;
947 }
948
949 app->topic = stasis_topic_create(topic_name);
950 ast_free(topic_name);
951 if (!app->topic) {
952 return NULL;
953 }
954
956 if (!app->bridge_router) {
957 return NULL;
958 }
959
960 res |= stasis_message_router_add(app->bridge_router,
962
963 res |= stasis_message_router_add(app->bridge_router,
965
966 res |= stasis_message_router_add(app->bridge_router,
968
969 res |= stasis_message_router_add(app->bridge_router,
971
972 if (res != 0) {
973 return NULL;
974 }
975 /* Bridge router holds a reference */
976 ao2_ref(app, +1);
977
978 app->router = stasis_message_router_create(app->topic);
979 if (!app->router) {
980 return NULL;
981 }
982
983 res |= stasis_message_router_add(app->router,
985
986 res |= stasis_message_router_add(app->router,
988
991
992 res |= stasis_message_router_add(app->router,
994
997
998 if (res != 0) {
999 return NULL;
1000 }
1001 /* Router holds a reference */
1002 ao2_ref(app, +1);
1003
1004 strncpy(app->name, name, size - sizeof(*app));
1005 app->handler = handler;
1006 app->data = ao2_bump(data);
1007
1008 /* Create a context, a match-all extension, and a 'h' extension for this application. Note that
1009 * this should only be done if a context does not already exist. */
1010 strcpy(context_name, "stasis-");
1011 strcat(context_name, name);
1013 if (!ast_context_find_or_create(NULL, NULL, context_name, "res_stasis")) {
1014 ast_log(LOG_WARNING, "Could not create context '%s' for Stasis application '%s'\n", context_name, name);
1015 } else {
1016 ast_add_extension(context_name, 0, "_.", 1, NULL, NULL, "Stasis", ast_strdup(name), ast_free_ptr, "res_stasis");
1017 ast_add_extension(context_name, 0, "h", 1, NULL, NULL, "NoOp", NULL, NULL, "res_stasis");
1018 }
1019 } else {
1020 ast_log(LOG_WARNING, "Not creating context '%s' for Stasis application '%s' because it already exists\n",
1022 }
1023
1024 ao2_ref(app, +1);
1025 return app;
1026}
1027
1029{
1030 return app->topic;
1031}
1032
1034{
1036 char eid[20];
1037 void *data;
1038
1040 ast_eid_to_str(eid, sizeof(eid), &ast_eid_default)))) {
1041 ast_log(AST_LOG_WARNING, "Failed to append EID to outgoing event %s\n",
1043 }
1044
1045 /* Copy off mutable state with lock held */
1046 ao2_lock(app);
1047 handler = app->handler;
1048 data = ao2_bump(app->data);
1049 ao2_unlock(app);
1050 /* Name is immutable; no need to copy */
1051
1052 if (handler) {
1053 handler(data, app->name, message);
1054 } else {
1055 ast_verb(3,
1056 "Inactive Stasis app '%s' missed message\n", app->name);
1057 }
1058 ao2_cleanup(data);
1059}
1060
1062{
1063 ao2_lock(app);
1064
1065 ast_verb(1, "Deactivating Stasis app '%s'\n", app->name);
1066 app->handler = NULL;
1067 ao2_cleanup(app->data);
1068 app->data = NULL;
1069
1070 ao2_unlock(app);
1071}
1072
1074{
1075 ao2_lock(app);
1076
1078
1080 app->router = NULL;
1081 stasis_message_router_unsubscribe(app->bridge_router);
1082 app->bridge_router = NULL;
1083 stasis_message_router_unsubscribe(app->endpoint_router);
1084 app->endpoint_router = NULL;
1085
1086 ao2_unlock(app);
1087}
1088
1090{
1091 int ret;
1092
1093 ao2_lock(app);
1094 ret = app->handler != NULL;
1095 ao2_unlock(app);
1096
1097 return ret;
1098}
1099
1101{
1102 int ret;
1103
1104 ao2_lock(app);
1105 ret = app->handler == NULL && ao2_container_count(app->forwards) == 0;
1106 ao2_unlock(app);
1107
1108 return ret;
1109}
1110
1112{
1113 ao2_lock(app);
1114 if (app->handler && app->data) {
1115 struct ast_json *msg;
1116
1117 ast_verb(1, "Replacing Stasis app '%s'\n", app->name);
1118
1119 msg = ast_json_pack("{s: s, s: o?, s: s}",
1120 "type", "ApplicationReplaced",
1121 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
1122 "application", app->name);
1123 if (msg) {
1124 /*
1125 * The app must be unlocked before calling 'send' since a handler may
1126 * subsequently attempt to grab the app lock after first obtaining a
1127 * lock for another object, thus causing a deadlock.
1128 */
1129 ao2_unlock(app);
1130 app_send(app, msg);
1131 ao2_lock(app);
1132 ast_json_unref(msg);
1133 if (!app->handler) {
1134 /*
1135 * If the handler disappeared then the app was deactivated. In that
1136 * case don't replace. Re-activation will reset the handler later.
1137 */
1138 ao2_unlock(app);
1139 return;
1140 }
1141 }
1142 } else {
1143 ast_verb(1, "Activating Stasis app '%s'\n", app->name);
1144 }
1145
1146 app->handler = handler;
1147 ao2_replace(app->data, data);
1148 ao2_unlock(app);
1149}
1150
1151const char *stasis_app_name(const struct stasis_app *app)
1152{
1153 return app->name;
1154}
1155
1156static int forwards_filter_by_type(void *obj, void *arg, int flags)
1157{
1158 struct app_forwards *forward = obj;
1159 enum forward_type *forward_type = arg;
1160
1161 if (forward->forward_type == *forward_type) {
1162 return CMP_MATCH;
1163 }
1164
1165 return 0;
1166}
1167
1168void stasis_app_to_cli(const struct stasis_app *app, struct ast_cli_args *a)
1169{
1170 struct ao2_iterator *channels;
1171 struct ao2_iterator *endpoints;
1172 struct ao2_iterator *bridges;
1173 struct app_forwards *forward;
1175
1176 ast_cli(a->fd, "Name: %s\n"
1177 " Debug: %s\n"
1178 " Subscription Model: %s\n",
1179 app->name,
1180 app->debug ? "Yes" : "No",
1181 app->subscription_model == STASIS_APP_SUBSCRIBE_ALL ?
1182 "Global Resource Subscription" :
1183 "Application/Explicit Resource Subscription");
1184 ast_cli(a->fd, " Subscriptions: %d\n", ao2_container_count(app->forwards));
1185
1186 ast_cli(a->fd, " Channels:\n");
1190 if (channels) {
1191 while ((forward = ao2_iterator_next(channels))) {
1192 ast_cli(a->fd, " %s (%d)\n", forward->id, forward->interested);
1193 ao2_ref(forward, -1);
1194 }
1196 }
1197
1198 ast_cli(a->fd, " Bridges:\n");
1200 bridges = ao2_callback(app->forwards, OBJ_MULTIPLE,
1202 if (bridges) {
1203 while ((forward = ao2_iterator_next(bridges))) {
1204 ast_cli(a->fd, " %s (%d)\n", forward->id, forward->interested);
1205 ao2_ref(forward, -1);
1206 }
1208 }
1209
1210 ast_cli(a->fd, " Endpoints:\n");
1214 if (endpoints) {
1215 while ((forward = ao2_iterator_next(endpoints))) {
1216 ast_cli(a->fd, " %s (%d)\n", forward->id, forward->interested);
1217 ao2_ref(forward, -1);
1218 }
1220 }
1221}
1222
1223struct ast_json *app_to_json(const struct stasis_app *app)
1224{
1225 struct ast_json *json;
1226 struct ast_json *channels;
1227 struct ast_json *bridges;
1228 struct ast_json *endpoints;
1229 struct ao2_iterator i;
1230 struct app_forwards *forwards;
1231
1232 json = ast_json_pack("{s: s, s: [], s: [], s: []}",
1233 "name", app->name,
1234 "channel_ids", "bridge_ids", "endpoint_ids");
1235 if (!json) {
1236 return NULL;
1237 }
1238 channels = ast_json_object_get(json, "channel_ids");
1239 bridges = ast_json_object_get(json, "bridge_ids");
1240 endpoints = ast_json_object_get(json, "endpoint_ids");
1241
1242 i = ao2_iterator_init(app->forwards, 0);
1243 while ((forwards = ao2_iterator_next(&i))) {
1244 struct ast_json *array = NULL;
1245 int append_res;
1246
1247 switch (forwards->forward_type) {
1248 case FORWARD_CHANNEL:
1249 array = channels;
1250 break;
1251 case FORWARD_BRIDGE:
1252 array = bridges;
1253 break;
1254 case FORWARD_ENDPOINT:
1255 array = endpoints;
1256 break;
1257 }
1258
1259 /* If forward_type value is unexpected this will safely return an error. */
1260 append_res = ast_json_array_append(array, ast_json_string_create(forwards->id));
1261 ao2_ref(forwards, -1);
1262
1263 if (append_res != 0) {
1264 ast_log(LOG_ERROR, "Error building response\n");
1266 ast_json_unref(json);
1267
1268 return NULL;
1269 }
1270 }
1272
1273 return json;
1274}
1275
1277{
1278 struct app_forwards *forwards;
1279
1280 if (!app) {
1281 return -1;
1282 }
1283
1284 ao2_lock(app->forwards);
1285 /* If subscribed to all, don't subscribe again */
1286 forwards = ao2_find(app->forwards, CHANNEL_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1287 if (forwards) {
1288 ao2_unlock(app->forwards);
1289 ao2_ref(forwards, -1);
1290
1291 return 0;
1292 }
1293
1294 forwards = ao2_find(app->forwards,
1295 chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL,
1297 if (!forwards) {
1298 int res;
1299
1300 /* Forwards not found, create one */
1301 forwards = forwards_create_channel(app, chan);
1302 if (!forwards) {
1303 ao2_unlock(app->forwards);
1304
1305 return -1;
1306 }
1307
1308 res = ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1309 if (!res) {
1310 ao2_unlock(app->forwards);
1311 ao2_ref(forwards, -1);
1312
1313 return -1;
1314 }
1315 }
1316
1317 ++forwards->interested;
1318 ast_debug(3, "Channel '%s' is %d interested in %s\n",
1319 chan ? ast_channel_uniqueid(chan) : "ALL",
1320 forwards->interested,
1321 app->name);
1322
1323 ao2_unlock(app->forwards);
1324 ao2_ref(forwards, -1);
1325
1326 return 0;
1327}
1328
/*!
 * \brief Untyped adapter around app_subscribe_channel().
 *
 * \param app Application to subscribe.
 * \param obj Channel to subscribe to, passed as an opaque pointer.
 *
 * \return Whatever app_subscribe_channel() returns.
 */
static int subscribe_channel(struct stasis_app *app, void *obj)
{
	struct ast_channel *chan = obj;

	return app_subscribe_channel(app, chan);
}
1333
1334static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate)
1335{
1336 struct app_forwards *forwards;
1337
1338 if (!id) {
1339 if (!strcmp(kind, "bridge")) {
1340 id = BRIDGE_ALL;
1341 } else if (!strcmp(kind, "channel")) {
1342 id = CHANNEL_ALL;
1343 } else if (!strcmp(kind, "endpoint")) {
1344 id = ENDPOINT_ALL;
1345 } else {
1346 ast_log(LOG_WARNING, "Unknown subscription kind '%s'\n", kind);
1347 return -1;
1348 }
1349 }
1350
1351 ao2_lock(app->forwards);
1352 forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1353 if (!forwards) {
1354 ao2_unlock(app->forwards);
1355 ast_debug(3, "App '%s' not subscribed to %s '%s'\n", app->name, kind, id);
1356 return -1;
1357 }
1358 forwards->interested--;
1359
1360 ast_debug(3, "%s '%s': is %d interested in %s\n", kind, id, forwards->interested, app->name);
1361 if (forwards->interested == 0 || terminate) {
1362 /* No one is interested any more; unsubscribe */
1363 ast_debug(3, "%s '%s' unsubscribed from %s\n", kind, id, app->name);
1364 forwards_unsubscribe(forwards);
1365 ao2_find(app->forwards, forwards,
1367 OBJ_NODATA);
1368
1369 if (!strcmp(kind, "endpoint")) {
1371 }
1372 }
1373 ao2_unlock(app->forwards);
1374 ao2_ref(forwards, -1);
1375
1376 return 0;
1377}
1378
1380{
1381 if (!app) {
1382 return -1;
1383 }
1384
1386}
1387
/*!
 * \brief Drop one unit of interest in a channel, identified by id.
 *
 * \param app Application to unsubscribe; \c NULL is rejected.
 * \param channel_id Unique id of the channel.
 *
 * \retval -1 \a app is \c NULL.
 * \return Otherwise the result of unsubscribe() for kind "channel" with
 *         termination not forced.
 */
int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
{
	return app ? unsubscribe(app, "channel", channel_id, 0) : -1;
}
1396
1397int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id)
1398{
1399 struct app_forwards *forwards;
1400
1401 if (ast_strlen_zero(channel_id)) {
1402 channel_id = CHANNEL_ALL;
1403 }
1404 forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY);
1405 ao2_cleanup(forwards);
1406
1407 return forwards != NULL;
1408}
1409
/*!
 * \brief Look up a channel for the "channel:" event source.
 *
 * \param app Application performing the lookup (not used here).
 * \param id Identifier handed to ast_channel_get_by_name().
 *
 * \return Whatever ast_channel_get_by_name() returns for \a id.
 */
static void *channel_find(const struct stasis_app *app, const char *id)
{
	void *found = ast_channel_get_by_name(id);

	return found;
}
1414
1416 .scheme = "channel:",
1417 .find = channel_find,
1418 .subscribe = subscribe_channel,
1419 .unsubscribe = app_unsubscribe_channel_id,
1420 .is_subscribed = app_is_subscribed_channel_id
1421};
1422
1423int app_subscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
1424{
1425 struct app_forwards *forwards;
1426
1427 if (!app) {
1428 return -1;
1429 }
1430
1431 ao2_lock(app->forwards);
1432 /* If subscribed to all, don't subscribe again */
1433 forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1434 if (forwards) {
1435 ao2_unlock(app->forwards);
1436 ao2_ref(forwards, -1);
1437
1438 return 0;
1439 }
1440
1441 forwards = ao2_find(app->forwards,
1442 bridge ? bridge->uniqueid : BRIDGE_ALL,
1444 if (!forwards) {
1445 int res;
1446
1447 /* Forwards not found, create one */
1448 forwards = forwards_create_bridge(app, bridge);
1449 if (!forwards) {
1450 ao2_unlock(app->forwards);
1451
1452 return -1;
1453 }
1454
1455 res = ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1456 if (!res) {
1457 ao2_unlock(app->forwards);
1458 ao2_ref(forwards, -1);
1459
1460 return -1;
1461 }
1462 }
1463
1464 ++forwards->interested;
1465 ast_debug(3, "Bridge '%s' is %d interested in %s\n",
1466 bridge ? bridge->uniqueid : "ALL",
1467 forwards->interested,
1468 app->name);
1469
1470 ao2_unlock(app->forwards);
1471 ao2_ref(forwards, -1);
1472
1473 return 0;
1474}
1475
/*!
 * \brief Event-source subscribe callback for bridges.
 *
 * Adapts the untyped callback signature to app_subscribe_bridge().
 *
 * \param app Application to subscribe.
 * \param obj Bridge to subscribe to, passed as an opaque pointer.
 *
 * \return The result of app_subscribe_bridge().
 */
static int subscribe_bridge(struct stasis_app *app, void *obj)
{
	struct ast_bridge *bridge = obj;

	return app_subscribe_bridge(app, bridge);
}
1480
1482{
1483 if (!app) {
1484 return -1;
1485 }
1486
1487 return app_unsubscribe_bridge_id(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
1488}
1489
/*!
 * \brief Drop one unit of interest an application has in a bridge.
 *
 * \param app Application whose subscription is being cancelled.
 * \param bridge_id Unique id of the bridge. When NULL, unsubscribe()
 *        substitutes the "all bridges" key (BRIDGE_ALL).
 *
 * \retval 0 on success.
 * \retval -1 if \a app is NULL or unsubscribe() reports a failure
 *         (for example, the app holds no such subscription).
 */
int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
{
	/* terminate == 0: only decrement interest, do not force removal. */
	return app ? unsubscribe(app, "bridge", bridge_id, 0) : -1;
}
1498
1499int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id)
1500{
1501 struct app_forwards *forwards;
1502
1503 if (ast_strlen_zero(bridge_id)) {
1504 bridge_id = BRIDGE_ALL;
1505 }
1506
1507 forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY);
1508 ao2_cleanup(forwards);
1509
1510 return forwards != NULL;
1511}
1512
1513static void *bridge_find(const struct stasis_app *app, const char *id)
1514{
1516}
1517
1519 .scheme = "bridge:",
1520 .find = bridge_find,
1521 .subscribe = subscribe_bridge,
1522 .unsubscribe = app_unsubscribe_bridge_id,
1523 .is_subscribed = app_is_subscribed_bridge_id
1524};
1525
1527{
1528 struct app_forwards *forwards;
1529
1530 if (!app) {
1531 return -1;
1532 }
1533
1534 ao2_lock(app->forwards);
1535 /* If subscribed to all, don't subscribe again */
1536 forwards = ao2_find(app->forwards, ENDPOINT_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1537 if (forwards) {
1538 ao2_unlock(app->forwards);
1539 ao2_ref(forwards, -1);
1540
1541 return 0;
1542 }
1543
1544 forwards = ao2_find(app->forwards,
1545 endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL,
1547 if (!forwards) {
1548 int res;
1549
1550 /* Forwards not found, create one */
1551 forwards = forwards_create_endpoint(app, endpoint);
1552 if (!forwards) {
1553 ao2_unlock(app->forwards);
1554
1555 return -1;
1556 }
1557
1558 res = ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1559 if (!res) {
1560 ao2_unlock(app->forwards);
1561 ao2_ref(forwards, -1);
1562
1563 return -1;
1564 }
1565
1566 /* Subscribe for messages */
1568 }
1569
1570 ++forwards->interested;
1571 ast_debug(3, "Endpoint '%s' is %d interested in %s\n",
1572 endpoint ? ast_endpoint_get_id(endpoint) : "ALL",
1573 forwards->interested,
1574 app->name);
1575
1576 ao2_unlock(app->forwards);
1577 ao2_ref(forwards, -1);
1578
1579 return 0;
1580}
1581
/*!
 * \brief Event-source subscribe callback for endpoints.
 *
 * Adapts the untyped callback signature to app_subscribe_endpoint().
 *
 * \param app Application to subscribe.
 * \param obj Endpoint to subscribe to, passed as an opaque pointer.
 *
 * \return The result of app_subscribe_endpoint().
 */
static int subscribe_endpoint(struct stasis_app *app, void *obj)
{
	struct ast_endpoint *endpoint = obj;

	return app_subscribe_endpoint(app, endpoint);
}
1586
/*!
 * \brief Drop one unit of interest an application has in an endpoint.
 *
 * \param app Application whose subscription is being cancelled.
 * \param endpoint_id Id of the endpoint. When NULL, unsubscribe()
 *        substitutes the "all endpoints" key (ENDPOINT_ALL).
 *
 * \retval 0 on success.
 * \retval -1 if \a app is NULL or unsubscribe() reports a failure
 *         (for example, the app holds no such subscription).
 */
int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
{
	/* terminate == 0: only decrement interest, do not force removal. */
	return app ? unsubscribe(app, "endpoint", endpoint_id, 0) : -1;
}
1595
1596int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id)
1597{
1598 struct app_forwards *forwards;
1599
1600 if (ast_strlen_zero(endpoint_id)) {
1601 endpoint_id = ENDPOINT_ALL;
1602 }
1603 forwards = ao2_find(app->forwards, endpoint_id, OBJ_SEARCH_KEY);
1604 ao2_cleanup(forwards);
1605
1606 return forwards != NULL;
1607}
1608
/*!
 * \brief Event-source lookup callback for the "endpoint:" scheme.
 *
 * \param app Unused by this lookup.
 * \param id Endpoint id handed to ast_endpoint_find_by_id().
 *
 * \return Whatever ast_endpoint_find_by_id() returns for \a id.
 */
static void *endpoint_find(const struct stasis_app *app, const char *id)
{
	struct ast_endpoint *endpoint = ast_endpoint_find_by_id(id);

	return endpoint;
}
1613
1615 .scheme = "endpoint:",
1616 .find = endpoint_find,
1617 .subscribe = subscribe_endpoint,
1618 .unsubscribe = app_unsubscribe_endpoint_id,
1619 .is_subscribed = app_is_subscribed_endpoint_id
1620};
1621
1623{
1627}
1628
1630{
1634}
1635
1637{
1638 if (!app || !json) {
1639 return json;
1640 }
1641
1642 ast_json_object_set(json, "events_allowed", app->events_allowed ?
1643 ast_json_ref(app->events_allowed) : ast_json_array_create());
1644 ast_json_object_set(json, "events_disallowed", app->events_disallowed ?
1645 ast_json_ref(app->events_disallowed) : ast_json_array_create());
1646
1647 return json;
1648}
1649
1651 struct ast_json *filter, const char *filter_type)
1652{
1655 /* If no filters are specified then reset this filter type */
1656 filter = NULL;
1657 } else {
1658 /* Otherwise try to get the filter array for this type */
1659 filter = ast_json_object_get(filter, filter_type);
1660 if (!filter) {
1661 /* A filter type exists, but not this one, so don't update */
1662 return 0;
1663 }
1664 }
1665 }
1666
1667 /* At this point the filter object should be an array */
1669 ast_log(LOG_ERROR, "Invalid json type event filter - app: %s, filter: %s\n",
1670 app->name, filter_type);
1671 return -1;
1672 }
1673
1674 if (filter) {
1675 /* Confirm that at least the type names are specified */
1676 struct ast_json *obj;
1677 int i;
1678
1679 for (i = 0; i < ast_json_array_size(filter) &&
1680 (obj = ast_json_array_get(filter, i)); ++i) {
1681
1682 if (ast_strlen_zero(ast_json_object_string_get(obj, "type"))) {
1683 ast_log(LOG_ERROR, "Filter event must have a type - app: %s, "
1684 "filter: %s\n", app->name, filter_type);
1685 return -1;
1686 }
1687 }
1688 }
1689
1690 ao2_lock(app);
1693 ao2_unlock(app);
1694
1695 return 0;
1696}
1697
1699{
1700 return app_event_filter_set(app, &app->events_allowed, filter, "allowed");
1701}
1702
1704{
1705 return app_event_filter_set(app, &app->events_disallowed, filter, "disallowed");
1706}
1707
1709{
1711}
1712
1713static int app_event_filter_matched(struct ast_json *array, struct ast_json *event, int empty)
1714{
1715 struct ast_json *obj;
1716 int i;
1717
1718 if (!array || !ast_json_array_size(array)) {
1719 return empty;
1720 }
1721
1722 for (i = 0; i < ast_json_array_size(array) &&
1723 (obj = ast_json_array_get(array, i)); ++i) {
1724
1727 return 1;
1728 }
1729 }
1730
1731 return 0;
1732}
1733
1735{
1737 int res;
1738
1739 if (!app) {
1740 return 0;
1741 }
1742
1743 ao2_lock(app);
1744 res = !app_event_filter_matched(app->events_disallowed, event, 0) &&
1745 app_event_filter_matched(app->events_allowed, event, 1);
1746 ao2_unlock(app);
1747 ao2_ref(app, -1);
1748
1749 return res;
1750}
static const char app[]
Definition: app_adsiprog.c:56
Asterisk main include file. File version handling, generic pbx functions.
#define ast_free(a)
Definition: astmm.h:180
#define ast_strdup(str)
A wrapper for strdup()
Definition: astmm.h:241
#define ast_strdupa(s)
duplicate a string in memory from the stack
Definition: astmm.h:298
void ast_free_ptr(void *ptr)
free() wrapper
Definition: astmm.c:1739
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:267
#define ast_log
Definition: astobj2.c:42
#define ao2_iterator_next(iter)
Definition: astobj2.h:1911
@ CMP_MATCH
Definition: astobj2.h:1027
#define OBJ_KEY
Definition: astobj2.h:1151
#define OBJ_POINTER
Definition: astobj2.h:1150
@ AO2_ALLOC_OPT_LOCK_MUTEX
Definition: astobj2.h:363
void ao2_iterator_cleanup(struct ao2_iterator *iter)
#define ao2_callback(c, flags, cb_fn, arg)
ao2_callback() is a generic function that applies cb_fn() to all objects in a container,...
Definition: astobj2.h:1693
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
Definition: astobj2.h:1554
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1736
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
#define ao2_container_alloc_rbtree(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a red-black tree container.
Definition: astobj2.h:1349
#define ao2_unlock(a)
Definition: astobj2.h:729
#define ao2_replace(dst, src)
Replace one object reference with another cleaning up the original.
Definition: astobj2.h:501
#define ao2_lock(a)
Definition: astobj2.h:717
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
#define OBJ_PARTIAL_KEY
Definition: astobj2.h:1152
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:404
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition: astobj2.h:480
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
#define ao2_t_alloc(data_size, destructor_fn, debug_msg)
Definition: astobj2.h:407
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
Definition: astobj2.h:1063
@ OBJ_NODATA
Definition: astobj2.h:1044
@ OBJ_MULTIPLE
Definition: astobj2.h:1049
@ OBJ_UNLINK
Definition: astobj2.h:1039
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1101
@ AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT
Reject duplicate objects in container.
Definition: astobj2.h:1201
static struct ao2_container * bridges
Definition: bridge.c:123
CallerID (and other GR30) management and generation Includes code and algorithms from the Zapata libr...
const char * ast_describe_caller_presentation(int data)
Convert caller ID pres value to explanatory string.
Definition: callerid.c:1265
static const char type[]
Definition: chan_ooh323.c:109
struct stasis_topic * ast_channel_topic(struct ast_channel *chan)
A topic which publishes the events for a particular channel.
const char * ast_channel_uniqueid(const struct ast_channel *chan)
#define ast_channel_unref(c)
Decrease channel reference count.
Definition: channel.h:2958
const char * ast_cause2str(int cause) attribute_pure
Gives the string form of a given cause code.
Definition: channel.c:612
@ AST_FLAG_DEAD
Definition: channel.h:1045
struct ast_channel * ast_channel_get_by_name(const char *name)
Find a channel by name.
Definition: channel.c:1454
Standard Command Line Interface.
void ast_cli(int fd, const char *fmt,...)
Definition: clicompat.c:6
static struct channel_usage channels
static void update(int code_size, int y, int wi, int fi, int dq, int sr, int dqsez, struct g726_state *state_ptr)
Definition: codec_g726.c:367
Internal API for the Stasis application controller.
struct ast_endpoint * ast_endpoint_find_by_id(const char *id)
Finds the endpoint with the given tech[/resource] id.
const char * ast_endpoint_get_id(const struct ast_endpoint *endpoint)
Gets the tech/resource id of the given endpoint.
static const char name[]
Definition: format_mp3.c:68
static const char context_name[]
static int array(struct ast_channel *chan, const char *cmd, char *var, const char *value)
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
Definition: func_strings.c:807
static int subscribed
Definition: manager.c:1621
struct stasis_topic * ast_endpoint_topic_cached(struct ast_endpoint *endpoint)
Returns the topic for a specific endpoint.
struct stasis_topic * ast_channel_topic_all(void)
A topic which publishes the events for all channels.
struct ast_channel_snapshot * ast_multi_channel_blob_get_channel(struct ast_multi_channel_blob *obj, const char *role)
Retrieve a channel snapshot associated with a specific role from a ast_multi_channel_blob.
struct stasis_message_type * ast_endpoint_contact_state_type(void)
Message type for endpoint contact state changes.
struct stasis_message_type * ast_endpoint_snapshot_type(void)
Message type for ast_endpoint_snapshot.
struct stasis_topic * ast_endpoint_topic(struct ast_endpoint *endpoint)
Returns the topic for a specific endpoint.
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
struct stasis_topic * ast_endpoint_topic_all_cached(void)
Cached topic for all endpoint related messages.
struct stasis_message_type * ast_channel_dial_type(void)
Message type for when a channel dials another channel.
struct stasis_message_type * stasis_cache_update_type(void)
Message type for cache update messages.
struct ast_endpoint_snapshot * ast_endpoint_latest_snapshot(const char *tech, const char *resource)
Retrieve the most recent snapshot for the endpoint with the given name.
struct stasis_message_type * ast_channel_snapshot_type(void)
Message type for ast_channel_snapshot_update.
struct stasis_message_type * ast_endpoint_state_type(void)
Message type for endpoint state changes.
#define AST_LOG_WARNING
#define ast_debug(level,...)
Log a DEBUG message.
#define LOG_ERROR
#define ast_verb(level,...)
#define LOG_WARNING
#define ast_json_object_string_get(object, key)
Get a string field from a JSON object.
Definition: json.h:600
struct ast_json * ast_json_string_create(const char *value)
Construct a JSON string from value.
Definition: json.c:278
enum ast_json_type ast_json_typeof(const struct ast_json *value)
Get the type of value.
Definition: json.c:78
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition: json.c:73
int ast_json_array_append(struct ast_json *array, struct ast_json *value)
Append to an array.
Definition: json.c:378
size_t ast_json_object_size(struct ast_json *object)
Get size of JSON object.
Definition: json.c:403
struct ast_json * ast_json_array_get(const struct ast_json *array, size_t index)
Get an element from an array.
Definition: json.c:370
struct ast_json * ast_json_pack(char const *format,...)
Helper for creating complex JSON values.
Definition: json.c:612
struct ast_json * ast_json_timeval(const struct timeval tv, const char *zone)
Construct a timeval as JSON.
Definition: json.c:670
struct ast_json * ast_json_array_create(void)
Create a empty JSON array.
Definition: json.c:362
@ AST_JSON_ARRAY
Definition: json.h:165
@ AST_JSON_OBJECT
Definition: json.h:164
struct ast_json * ast_json_ref(struct ast_json *value)
Increase refcount on value.
Definition: json.c:67
int ast_json_object_set(struct ast_json *object, const char *key, struct ast_json *value)
Set a field in a JSON object.
Definition: json.c:414
const char * ast_json_string_get(const struct ast_json *string)
Get the value of a JSON string.
Definition: json.c:283
#define AST_JSON_UTF8_VALIDATE(str)
Check str for UTF-8 and replace with an empty string if fails the check.
Definition: json.h:224
struct ast_json * ast_json_object_get(struct ast_json *object, const char *key)
Get a field from a JSON object.
Definition: json.c:407
size_t ast_json_array_size(const struct ast_json *array)
Get the size of a JSON array.
Definition: json.c:366
static struct ao2_container * endpoints
int messaging_app_subscribe_endpoint(const char *app_name, struct ast_endpoint *endpoint, message_received_cb callback, void *pvt)
Subscribe an application to an endpoint for messages.
Definition: messaging.c:493
void messaging_app_unsubscribe_endpoint(const char *app_name, const char *endpoint_id)
Subscribe for messages from a particular endpoint.
Definition: messaging.c:423
Stasis out-of-call text message support.
int ast_add_extension(const char *context, int replace, const char *extension, int priority, const char *label, const char *callerid, const char *application, void *data, void(*datad)(void *), const char *registrar)
Add and extension to an extension context.
Definition: pbx.c:6928
int ast_context_destroy_by_name(const char *context, const char *registrar)
Destroy a context by name.
Definition: pbx.c:8205
struct ast_context * ast_context_find(const char *name)
Find a context.
Definition: extconf.c:4172
struct ast_context * ast_context_find_or_create(struct ast_context **extcontexts, struct ast_hashtab *exttable, const char *name, const char *registrar)
Register a new context or find an existing one.
Definition: pbx.c:6149
const char * app_name(struct ast_app *app)
Definition: pbx_app.c:463
void stasis_app_set_debug(struct stasis_app *app, int debug)
Enable/disable request/response and event logging on an application.
int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint)
Subscribes an application to a endpoint.
static int forwards_filter_by_type(void *obj, void *arg, int flags)
static struct ast_json * channel_destroyed_event(struct ast_channel_snapshot *snapshot, const struct timeval *tv)
static struct ast_json * channel_connected_line(struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *new_snapshot, const struct timeval *tv)
static int subscribe_bridge(struct stasis_app *app, void *obj)
int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id)
Test if an app is subscribed to a endpoint.
int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
Cancel the subscription an app has for a bridge.
int app_subscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
Add a bridge subscription to an existing channel subscription.
static struct app_forwards * forwards_create_channel(struct stasis_app *app, struct ast_channel *chan)
void app_update(struct stasis_app *app, stasis_app_cb handler, void *data)
Update the handler and data for a res_stasis application.
struct stasis_topic * ast_app_get_topic(struct stasis_app *app)
Returns the stasis topic for an app.
static int app_event_filter_set(struct stasis_app *app, struct ast_json **member, struct ast_json *filter, const char *filter_type)
static struct ast_json * channel_callerid(struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *new_snapshot, const struct timeval *tv)
static struct ast_json * channel_state(struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *new_snapshot, const struct timeval *tv)
Handle channel state changes.
int stasis_app_event_allowed(const char *app_name, struct ast_json *event)
Check if the given event should be filtered.
#define CHANNEL_ALL
static void sub_endpoint_update_handler(void *data, struct stasis_subscription *sub, struct stasis_message *message)
static void * channel_find(const struct stasis_app *app, const char *id)
static int bridge_app_subscribed(struct stasis_app *app, const char *uniqueid)
Helper function for determining if the application is subscribed to a given entity.
struct ast_json *(* channel_snapshot_monitor)(struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *new_snapshot, const struct timeval *tv)
Typedef for callbacks that get called on channel snapshot updates.
forward_type
@ FORWARD_CHANNEL
@ FORWARD_ENDPOINT
@ FORWARD_BRIDGE
static int subscribe_channel(struct stasis_app *app, void *obj)
int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id)
Test if an app is subscribed to a channel.
void stasis_app_unregister_event_sources(void)
Unregister core event sources.
int app_unsubscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
Cancel the bridge subscription for an application.
void app_shutdown(struct stasis_app *app)
Tears down an application.
static int bridge_app_subscribed_involved(struct stasis_app *app, struct ast_bridge_snapshot *snapshot)
Callback function for checking if channels in a bridge are subscribed to.
int app_unsubscribe_channel(struct stasis_app *app, struct ast_channel *chan)
Cancel the subscription an app has for a channel.
static void app_dtor(void *obj)
void stasis_app_set_debug_by_name(const char *app_name, int debug)
Enable/disable request/response and event logging on an application.
static struct ast_json * channel_created_event(struct ast_channel_snapshot *snapshot, const struct timeval *tv)
int global_debug
static int message_received_handler(const char *endpoint_id, struct ast_json *json_msg, void *pvt)
int app_subscribe_channel(struct stasis_app *app, struct ast_channel *chan)
Subscribes an application to a channel.
static void call_forwarded_handler(struct stasis_app *app, struct stasis_message *message)
int app_is_finished(struct stasis_app *app)
Checks whether a deactivated app has no channels.
struct ast_json * stasis_app_event_filter_to_json(struct stasis_app *app, struct ast_json *json)
Convert and add the app's event type filter(s) to the given json object.
void stasis_app_register_event_sources(void)
Register core event sources.
static void sub_subscription_change_handler(void *data, struct stasis_subscription *sub, struct stasis_message *message)
static void sub_default_handler(void *data, struct stasis_subscription *sub, struct stasis_message *message)
static struct ast_json * simple_endpoint_event(const char *type, struct ast_endpoint_snapshot *snapshot, const struct timeval *tv)
struct stasis_app_event_source bridge_event_source
int stasis_app_event_filter_set(struct stasis_app *app, struct ast_json *filter)
Set the application's event type filter.
struct stasis_app * app_create(const char *name, stasis_app_cb handler, void *data, enum stasis_app_subscription_model subscription_model)
Create a res_stasis application.
static struct ast_json * channel_state_change_event(struct ast_channel_snapshot *snapshot, const struct timeval *tv)
int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
Cancel the subscription an app has for a endpoint.
void stasis_app_set_global_debug(int debug)
Enable/disable request/response and event logging on all applications.
int stasis_app_get_debug(struct stasis_app *app)
Get debug status of an application.
static void bridge_subscription_change_handler(void *data, struct stasis_subscription *sub, struct stasis_message *message)
void stasis_app_to_cli(const struct stasis_app *app, struct ast_cli_args *a)
Dump properties of a stasis_app to the CLI.
static struct app_forwards * forwards_create_bridge(struct stasis_app *app, struct ast_bridge *bridge)
static void bridge_blind_transfer_handler(void *data, struct stasis_subscription *sub, struct stasis_message *message)
static void endpoint_state_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
static void * endpoint_find(const struct stasis_app *app, const char *id)
static void forwards_dtor(void *obj)
#define BRIDGE_ALL
static void bridge_attended_transfer_handler(void *data, struct stasis_subscription *sub, struct stasis_message *message)
static void bridge_merge_handler(void *data, struct stasis_subscription *sub, struct stasis_message *message)
struct stasis_app_event_source channel_event_source
static void sub_bridge_update_handler(void *data, struct stasis_subscription *sub, struct stasis_message *message)
int app_is_active(struct stasis_app *app)
Checks whether an app is active.
static struct ast_json * simple_bridge_event(const char *type, struct ast_bridge_snapshot *snapshot, const struct timeval *tv)
static void forwards_unsubscribe(struct app_forwards *forwards)
static void sub_channel_update_handler(void *data, struct stasis_subscription *sub, struct stasis_message *message)
struct ast_json * app_to_json(const struct stasis_app *app)
Create a JSON representation of a stasis_app.
int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
Cancel the subscription an app has for a channel.
static struct ast_json * channel_dialplan(struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *new_snapshot, const struct timeval *tv)
int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id)
Test if an app is subscribed to a bridge.
static struct ast_json * simple_channel_event(const char *type, struct ast_channel_snapshot *snapshot, const struct timeval *tv)
const char * stasis_app_name(const struct stasis_app *app)
Retrieve an application's name.
void app_deactivate(struct stasis_app *app)
Deactivates an application.
static int forwards_sort(const void *obj_left, const void *obj_right, int flags)
static channel_snapshot_monitor channel_monitors[]
static struct app_forwards * forwards_create(struct stasis_app *app, const char *id)
#define ENDPOINT_ALL
static int app_events_allowed_set(struct stasis_app *app, struct ast_json *filter)
static void * bridge_find(const struct stasis_app *app, const char *id)
struct stasis_app_event_source endpoint_event_source
static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate)
static int app_events_disallowed_set(struct stasis_app *app, struct ast_json *filter)
static int app_event_filter_matched(struct ast_json *array, struct ast_json *event, int empty)
int stasis_app_get_debug_by_name(const char *app_name)
Get debug status of an application.
void app_send(struct stasis_app *app, struct ast_json *message)
Send a message to an application.
static struct app_forwards * forwards_create_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint)
static int subscribe_endpoint(struct stasis_app *app, void *obj)
Internal API for the Stasis application controller.
stasis_app_subscription_model
@ STASIS_APP_SUBSCRIBE_ALL
An application is automatically subscribed to all resources in Asterisk, even if it does not control ...
struct stasis_forward * sub
Definition: res_corosync.c:240
static int debug
Global debug status.
Definition: res_xmpp.c:441
#define NULL
Definition: resample.c:96
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
struct ast_json * stasis_message_to_json(struct stasis_message *msg, struct stasis_message_sanitizer *sanitize)
Build the JSON representation of the message.
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
Definition: stasis.c:1548
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition: stasis.c:617
@ STASIS_SUBSCRIPTION_FORMATTER_JSON
Definition: stasis.h:310
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
Definition: stasis.c:1174
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition: stasis.c:1578
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
Definition: stasis.c:1511
const struct timeval * stasis_message_timestamp(const struct stasis_message *msg)
Get the time when a message was created.
Stasis Application API. See Stasis Application API for detailed documentation.
struct stasis_message_sanitizer * stasis_app_get_sanitizer(void)
Get the Stasis message sanitizer for app_stasis applications.
Definition: res_stasis.c:2271
struct ast_bridge * stasis_app_bridge_find_by_id(const char *bridge_id)
Returns the bridge with the given id.
Definition: res_stasis.c:774
void stasis_app_unregister_event_source(struct stasis_app_event_source *obj)
Unregister an application event source.
Definition: res_stasis.c:1830
void(* stasis_app_cb)(void *data, const char *app_name, struct ast_json *message)
Callback for Stasis application handler.
Definition: stasis_app.h:67
struct stasis_app * stasis_app_get_by_name(const char *name)
Retrieve a handle to a Stasis application by its name.
Definition: res_stasis.c:1701
void stasis_app_register_event_source(struct stasis_app_event_source *obj)
Register an application event source.
Definition: res_stasis.c:1823
struct ao2_container * stasis_app_get_all(void)
Gets the names of all registered Stasis applications.
Definition: res_stasis.c:1715
struct ast_json * ast_bridge_snapshot_to_json(const struct ast_bridge_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_bridge_snapshot.
struct stasis_message_type * ast_bridge_snapshot_type(void)
Message type for ast_bridge_snapshot.
struct stasis_message_type * ast_blind_transfer_type(void)
Message type for ast_blind_transfer_message.
struct stasis_topic * ast_bridge_topic(struct ast_bridge *bridge)
A topic which publishes the events for a particular bridge.
struct stasis_message_type * ast_attended_transfer_type(void)
Message type for ast_attended_transfer_message.
struct stasis_message_type * ast_bridge_merge_message_type(void)
Message type for ast_bridge_merge_message.
struct stasis_topic * ast_bridge_topic_all(void)
A topic which publishes the events for all bridges.
@ AST_ATTENDED_TRANSFER_DEST_BRIDGE_MERGE
@ AST_ATTENDED_TRANSFER_DEST_LINK
@ AST_ATTENDED_TRANSFER_DEST_THREEWAY
int ast_channel_snapshot_connected_line_equal(const struct ast_channel_snapshot *old_snapshot, const struct ast_channel_snapshot *new_snapshot)
Compares the connected line info of two snapshots.
struct ast_json * ast_channel_snapshot_to_json(const struct ast_channel_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_channel_snapshot.
int ast_channel_snapshot_caller_id_equal(const struct ast_channel_snapshot *old_snapshot, const struct ast_channel_snapshot *new_snapshot)
Compares the callerid info of two snapshots.
int ast_channel_snapshot_cep_equal(const struct ast_channel_snapshot *old_snapshot, const struct ast_channel_snapshot *new_snapshot)
Compares the context, exten and priority of two snapshots.
Endpoint abstractions.
struct ast_json * ast_endpoint_snapshot_to_json(const struct ast_endpoint_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_endpoint_snapshot.
#define stasis_message_router_create(topic)
Create a new message router object.
void stasis_message_router_unsubscribe(struct stasis_message_router *router)
Unsubscribe the router from the upstream topic.
int stasis_message_router_add(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
Add a route to a message router.
int stasis_message_router_add_cache_update(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
Add a route for stasis_cache_update messages to a message router.
void stasis_message_router_set_formatters_default(struct stasis_message_router *router, stasis_subscription_cb callback, void *data, enum stasis_subscription_message_formatters formatters)
Sets the default route of a router with formatters.
int ast_strings_equal(const char *str1, const char *str2)
Compare strings for equality checking for NULL.
Definition: strings.c:238
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition: strings.h:65
Generic container type.
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
Definition: astobj2.h:1821
enum forward_type forward_type
struct stasis_forward * topic_forward
struct stasis_forward * topic_cached_forward
Message representing attended transfer.
union ast_attended_transfer_message::@284 dest
enum ast_attended_transfer_dest_type dest_type
struct ast_channel_snapshot * links[2]
struct ast_bridge_channel_snapshot_pair to_transfer_target
struct ast_bridge_channel_snapshot_pair threeway
struct ast_bridge_channel_snapshot_pair to_transferee
char bridge[AST_UUID_STR_LEN]
Message published during a blind transfer.
struct ast_bridge_snapshot * bridge
struct ast_channel_snapshot * transferer
struct ast_bridge_snapshot * bridge_snapshot
struct ast_channel_snapshot * channel_snapshot
Message representing the merge of two bridges.
struct ast_bridge_snapshot * from
struct ast_bridge_snapshot * to
Structure that contains a snapshot of information about a bridge.
Definition: bridge.h:314
const ast_string_field uniqueid
Definition: bridge.h:328
struct ao2_container * channels
Definition: bridge.h:331
Structure that contains information about a bridge.
Definition: bridge.h:349
const ast_string_field uniqueid
Definition: bridge.h:401
const ast_string_field uniqueid
const ast_string_field data
const ast_string_field appl
Structure representing a change of snapshot of channel state.
Structure representing a snapshot of channel state.
struct ast_channel_snapshot_dialplan * dialplan
struct ast_channel_snapshot_base * base
enum ast_channel_state state
struct ast_channel_snapshot_caller * caller
struct ast_flags flags
struct ast_channel_snapshot_hangup * hangup
Main Channel structure associated with a channel.
struct ast_channel_snapshot * snapshot
const char * data
A snapshot of an endpoint's state.
const ast_string_field id
Abstract JSON element (object, array, string, int, ...).
A multi channel blob data structure for multi_channel_blob stasis messages.
Definition: astman.c:222
Event source information and callbacks.
Definition: stasis_app.h:174
const char * scheme
The scheme to match against on [un]subscribes.
Definition: stasis_app.h:176
struct stasis_topic * topic
struct ast_json * events_disallowed
struct stasis_message_router * router
stasis_app_cb handler
struct ao2_container * forwards
struct stasis_message_router * bridge_router
struct ast_json * events_allowed
enum stasis_app_subscription_model subscription_model
struct stasis_message_router * endpoint_router
Cache update message.
Definition: stasis.h:965
Forwarding information.
Definition: stasis.c:1531
static void handler(const char *name, int response_code, struct ast_variable *get_params, struct ast_variable *path_vars, struct ast_variable *headers, struct ast_json *body, struct ast_ari_response *response)
Definition: test_ari.c:59
static struct test_val a
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:159
#define ast_test_flag(p, flag)
Definition: utils.h:63
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:941
#define ast_assert(a)
Definition: utils.h:739
char * ast_eid_to_str(char *s, int maxlen, struct ast_eid *eid)
Convert an EID to a string.
Definition: utils.c:2839
#define ARRAY_LEN(a)
Definition: utils.h:666
struct ast_eid ast_eid_default
Global EID.
Definition: options.c:93