Asterisk - The Open Source Telephony Project GIT-master-0644429
stasis.c
Go to the documentation of this file.
1/*
2 * Asterisk -- An open source telephony toolkit.
3 *
4 * Copyright (C) 2013, Digium, Inc.
5 *
6 * David M. Lee, II <dlee@digium.com>
7 *
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
13 *
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
17 */
18
19/*! \file
20 *
21 * \brief Stasis Message Bus API.
22 *
23 * \author David M. Lee, II <dlee@digium.com>
24 */
25
26/*** MODULEINFO
27 <support_level>core</support_level>
28 ***/
29
30#include "asterisk.h"
31
32#include "asterisk/astobj2.h"
34#include "asterisk/stasis.h"
36#include "asterisk/threadpool.h"
37#include "asterisk/utils.h"
38#include "asterisk/uuid.h"
39#include "asterisk/vector.h"
44#include "asterisk/cli.h"
45
46/*** DOCUMENTATION
47 <managerEvent language="en_US" name="UserEvent">
48 <managerEventInstance class="EVENT_FLAG_USER">
49 <synopsis>A user defined event raised from the dialplan.</synopsis>
50 <syntax>
51 <channel_snapshot/>
52 <parameter name="UserEvent">
53 <para>The event name, as specified in the dialplan.</para>
54 </parameter>
55 </syntax>
56 <description>
57 <para>Event may contain additional arbitrary parameters in addition to optional bridge and endpoint snapshots. Multiple snapshots of the same type are prefixed with a numeric value.</para>
58 </description>
59 <see-also>
60 <ref type="application">UserEvent</ref>
61 <ref type="managerEvent">UserEvent</ref>
62 </see-also>
63 </managerEventInstance>
64 </managerEvent>
65 <configInfo name="stasis" language="en_US">
66 <configFile name="stasis.conf">
67 <configObject name="threadpool">
68 <synopsis>Settings that configure the threadpool Stasis uses to deliver some messages.</synopsis>
69 <configOption name="initial_size" default="5">
70 <synopsis>Initial number of threads in the message bus threadpool.</synopsis>
71 </configOption>
72 <configOption name="idle_timeout_sec" default="20">
73 <synopsis>Number of seconds before an idle thread is disposed of.</synopsis>
74 </configOption>
75 <configOption name="max_size" default="50">
76 <synopsis>Maximum number of threads in the threadpool.</synopsis>
77 </configOption>
78 </configObject>
79 <configObject name="declined_message_types">
80 <synopsis>Stasis message types for which to decline creation.</synopsis>
81 <configOption name="decline">
82 <synopsis>The message type to decline.</synopsis>
83 <description>
84 <para>This configuration option defines the name of the Stasis
85 message type that Asterisk is forbidden from creating and can be
86 specified as many times as necessary to achieve the desired result.</para>
87 <enumlist>
88 <enum name="stasis_app_recording_snapshot_type" />
89 <enum name="stasis_app_playback_snapshot_type" />
90 <enum name="stasis_test_message_type" />
91 <enum name="confbridge_start_type" />
92 <enum name="confbridge_end_type" />
93 <enum name="confbridge_join_type" />
94 <enum name="confbridge_leave_type" />
95 <enum name="confbridge_start_record_type" />
96 <enum name="confbridge_stop_record_type" />
97 <enum name="confbridge_mute_type" />
98 <enum name="confbridge_unmute_type" />
99 <enum name="confbridge_talking_type" />
100 <enum name="cel_generic_type" />
101 <enum name="ast_bridge_snapshot_type" />
102 <enum name="ast_bridge_merge_message_type" />
103 <enum name="ast_channel_entered_bridge_type" />
104 <enum name="ast_channel_left_bridge_type" />
105 <enum name="ast_blind_transfer_type" />
106 <enum name="ast_attended_transfer_type" />
107 <enum name="ast_endpoint_snapshot_type" />
108 <enum name="ast_endpoint_state_type" />
109 <enum name="ast_device_state_message_type" />
110 <enum name="ast_test_suite_message_type" />
111 <enum name="ast_mwi_state_type" />
112 <enum name="ast_mwi_vm_app_type" />
113 <enum name="ast_format_register_type" />
114 <enum name="ast_format_unregister_type" />
115 <enum name="ast_manager_get_generic_type" />
116 <enum name="ast_parked_call_type" />
117 <enum name="ast_channel_snapshot_type" />
118 <enum name="ast_channel_dial_type" />
119 <enum name="ast_channel_varset_type" />
120 <enum name="ast_channel_hangup_request_type" />
121 <enum name="ast_channel_dtmf_begin_type" />
122 <enum name="ast_channel_dtmf_end_type" />
123 <enum name="ast_channel_flash_type" />
124 <enum name="ast_channel_wink_type" />
125 <enum name="ast_channel_hold_type" />
126 <enum name="ast_channel_unhold_type" />
127 <enum name="ast_channel_chanspy_start_type" />
128 <enum name="ast_channel_chanspy_stop_type" />
129 <enum name="ast_channel_fax_type" />
130 <enum name="ast_channel_hangup_handler_type" />
131 <enum name="ast_channel_moh_start_type" />
132 <enum name="ast_channel_moh_stop_type" />
133 <enum name="ast_channel_mixmonitor_start_type" />
134 <enum name="ast_channel_mixmonitor_stop_type" />
135 <enum name="ast_channel_mixmonitor_mute_type" />
136 <enum name="ast_channel_agent_login_type" />
137 <enum name="ast_channel_agent_logoff_type" />
138 <enum name="ast_channel_talking_start" />
139 <enum name="ast_channel_talking_stop" />
140 <enum name="ast_channel_tone_detect" />
141 <enum name="ast_security_event_type" />
142 <enum name="ast_named_acl_change_type" />
143 <enum name="ast_local_bridge_type" />
144 <enum name="ast_local_optimization_begin_type" />
145 <enum name="ast_local_optimization_end_type" />
146 <enum name="stasis_subscription_change_type" />
147 <enum name="ast_multi_user_event_type" />
148 <enum name="stasis_cache_clear_type" />
149 <enum name="stasis_cache_update_type" />
150 <enum name="ast_network_change_type" />
151 <enum name="ast_system_registry_type" />
152 <enum name="ast_cc_available_type" />
153 <enum name="ast_cc_offertimerstart_type" />
154 <enum name="ast_cc_requested_type" />
155 <enum name="ast_cc_requestacknowledged_type" />
156 <enum name="ast_cc_callerstopmonitoring_type" />
157 <enum name="ast_cc_callerstartmonitoring_type" />
158 <enum name="ast_cc_callerrecalling_type" />
159 <enum name="ast_cc_recallcomplete_type" />
160 <enum name="ast_cc_failure_type" />
161 <enum name="ast_cc_monitorfailed_type" />
162 <enum name="ast_presence_state_message_type" />
163 <enum name="ast_rtp_rtcp_sent_type" />
164 <enum name="ast_rtp_rtcp_received_type" />
165 <enum name="ast_call_pickup_type" />
166 <enum name="aoc_s_type" />
167 <enum name="aoc_d_type" />
168 <enum name="aoc_e_type" />
169 <enum name="dahdichannel_type" />
170 <enum name="mcid_type" />
171 <enum name="session_timeout_type" />
172 <enum name="cdr_read_message_type" />
173 <enum name="cdr_write_message_type" />
174 <enum name="cdr_prop_write_message_type" />
175 <enum name="corosync_ping_message_type" />
176 <enum name="agi_exec_start_type" />
177 <enum name="agi_exec_end_type" />
178 <enum name="agi_async_start_type" />
179 <enum name="agi_async_exec_type" />
180 <enum name="agi_async_end_type" />
181 <enum name="queue_caller_join_type" />
182 <enum name="queue_caller_leave_type" />
183 <enum name="queue_caller_abandon_type" />
184 <enum name="queue_member_status_type" />
185 <enum name="queue_member_added_type" />
186 <enum name="queue_member_removed_type" />
187 <enum name="queue_member_pause_type" />
188 <enum name="queue_member_penalty_type" />
189 <enum name="queue_member_ringinuse_type" />
190 <enum name="queue_agent_called_type" />
191 <enum name="queue_agent_connect_type" />
192 <enum name="queue_agent_complete_type" />
193 <enum name="queue_agent_dump_type" />
194 <enum name="queue_agent_ringnoanswer_type" />
195 <enum name="meetme_join_type" />
196 <enum name="meetme_leave_type" />
197 <enum name="meetme_end_type" />
198 <enum name="meetme_mute_type" />
199 <enum name="meetme_talking_type" />
200 <enum name="meetme_talk_request_type" />
201 <enum name="appcdr_message_type" />
202 <enum name="forkcdr_message_type" />
203 <enum name="cdr_sync_message_type" />
204 </enumlist>
205 </description>
206 </configOption>
207 </configObject>
208 </configFile>
209 </configInfo>
210***/
211
212/*!
213 * \page stasis-impl Stasis Implementation Notes
214 *
215 * \par Reference counting
216 *
217 * Stasis introduces a number of objects, which are tightly related to one
218 * another. Because we rely on ref-counting for memory management, understanding
219 * these relationships is important to understanding this code.
220 *
221 * \code{.txt}
222 *
223 * stasis_topic <----> stasis_subscription
224 * ^ ^
225 * \ /
226 * \ /
227 * dispatch
228 * |
229 * |
230 * v
231 * stasis_message
232 * |
233 * |
234 * v
235 * stasis_message_type
236 *
237 * \endcode
238 *
239 * The most troubling thing in this chart is the cyclic reference between
240 * stasis_topic and stasis_subscription. This is both unfortunate, and
241 * necessary. Topics need the subscription in order to dispatch messages;
242 * subscriptions need the topics to unsubscribe and check subscription status.
243 *
244 * The cycle is broken by stasis_unsubscribe(). The unsubscribe will remove the
245 * topic's reference to a subscription. When the subcription is destroyed, it
246 * will remove its reference to the topic.
247 *
248 * This means that until a subscription has be explicitly unsubscribed, it will
249 * not be destroyed. Neither will a topic be destroyed while it has subscribers.
250 * The destructors of both have assertions regarding this to catch ref-counting
251 * problems where a subscription or topic has had an extra ao2_cleanup().
252 *
253 * The \ref dispatch_exec_sync object is a transient object, which is posted to
254 * a subscription's taskprocessor to send a message to the subscriber. They have
255 * short life cycles, allocated on one thread, destroyed on another.
256 *
257 * During shutdown, or the deletion of a domain object, there are a flurry of
258 * ao2_cleanup()s on subscriptions and topics, as the final in-flight messages
259 * are processed. Any one of these cleanups could be the one to actually destroy
260 * a given object, so care must be taken to ensure that an object isn't
261 * referenced after an ao2_cleanup(). This includes the implicit ao2_unlock()
262 * that might happen when a RAII_VAR() goes out of scope.
263 *
264 * \par Typical life cycles
265 *
266 * \li stasis_topic - There are several topics which live for the duration of
267 * the Asterisk process (ast_channel_topic_all(), etc.) but most of these
268 * are actually fed by shorter-lived topics whose lifetime is associated
269 * with some domain object (like ast_channel_topic() for a given
270 * ast_channel).
271 *
272 * \li stasis_subscription - Subscriptions have a similar mix of lifetimes as
273 * topics, for similar reasons.
274 *
275 * \li dispatch - Very short lived; just long enough to post a message to a
276 * subscriber.
277 *
278 * \li stasis_message - Short to intermediate lifetimes, but that is mostly
279 * irrelevant. Messages are strictly data and have no behavior associated
280 * with them, so it doesn't really matter if/when they are destroyed. By
281 * design, a component could hold a ref to a message forever without any
282 * ill consequences (aside from consuming more memory).
283 *
284 * \li stasis_message_type - Long life cycles, typically only destroyed on
285 * module unloading or _clean_ process exit.
286 *
287 * \par Subscriber shutdown sequencing
288 *
289 * Subscribers are sensitive to shutdown sequencing, specifically in how the
290 * reference message types. This is fully detailed in the documentation at
291 * https://docs.asterisk.org/Development/Roadmap/Asterisk-12-Projects/Asterisk-12-API-Improvements/Stasis-Message-Bus/Using-the-Stasis-Message-Bus/Stasis-Subscriber-Shutdown-Problem/.
292 *
293 * In short, the lifetime of the \a data (and \a callback, if in a module) must
294 * be held until the stasis_subscription_final_message() has been received.
295 * Depending on the structure of the subscriber code, this can be handled by
296 * using stasis_subscription_final_message() to free resources on the final
297 * message, or using stasis_subscription_join()/stasis_unsubscribe_and_join() to
298 * block until the unsubscribe has completed.
299 */
300
301/*! Initial size of the subscribers list. */
302#define INITIAL_SUBSCRIBERS_MAX 4
303
304/*! The number of buckets to use for topic pools */
305#define TOPIC_POOL_BUCKETS 57
306
307/*! Thread pool for topics that don't want a dedicated taskprocessor */
309
311
312#if defined(LOW_MEMORY)
313
314#define TOPIC_ALL_BUCKETS 257
315
316#else
317
318#define TOPIC_ALL_BUCKETS 997
319
320#endif
321
322#ifdef AST_DEVMODE
323
324/*! The number of buckets to use for topic statistics */
325#define TOPIC_STATISTICS_BUCKETS 57
326
327/*! The number of buckets to use for subscription statistics */
328#define SUBSCRIPTION_STATISTICS_BUCKETS 57
329
330/*! Global container which stores statistics for topics */
331static AO2_GLOBAL_OBJ_STATIC(topic_statistics);
332
333/*! Global container which stores statistics for subscriptions */
334static AO2_GLOBAL_OBJ_STATIC(subscription_statistics);
335
336/*! \internal */
337struct stasis_message_type_statistics {
338 /*! \brief The number of messages of this published */
339 int published;
340 /*! \brief The number of messages of this that did not reach a subscriber */
341 int unused;
342 /*! \brief The stasis message type */
343 struct stasis_message_type *message_type;
344};
345
346/*! Lock to protect the message types vector */
347AST_MUTEX_DEFINE_STATIC(message_type_statistics_lock);
348
349/*! Vector containing message type information */
350static AST_VECTOR(, struct stasis_message_type_statistics) message_type_statistics;
351
352/*! \internal */
353struct stasis_topic_statistics {
354 /*! \brief Highest time spent dispatching messages to subscribers */
355 long highest_time_dispatched;
356 /*! \brief Lowest time spent dispatching messages to subscribers */
357 long lowest_time_dispatched;
358 /*! \brief The number of messages that were not dispatched to any subscriber */
359 int messages_not_dispatched;
360 /*! \brief The number of messages that were dispatched to at least 1 subscriber */
361 int messages_dispatched;
362 /*! \brief The ids of the subscribers to this topic */
363 struct ao2_container *subscribers;
364 /*! \brief Pointer to the topic (NOT refcounted, and must NOT be accessed) */
365 struct stasis_topic *topic;
366 /*! \brief Name of the topic */
367 char name[0];
368};
369#endif
370
371/*! \internal */
373 /*! Variable length array of the subscribers */
375
376 /*! Topics forwarding into this topic */
378
379#ifdef AST_DEVMODE
380 struct stasis_topic_statistics *statistics;
381#endif
382
383 /*! Unique incrementing integer for subscriber ids */
385
386 /*! Name of the topic */
387 char *name;
388
389 /*! Detail of the topic */
390 char *detail;
391
392 /*! Creation time */
393 struct timeval *creationtime;
394};
395
397
400
401 char *name;
402 char *detail;
403
404 struct timeval creationtime;
405
406 char buf[0];
407};
408
412
413static void proxy_dtor(void *weakproxy, void *container)
414{
415 ao2_unlink(container, weakproxy);
417}
418
419/* Forward declarations for the tightly-coupled subscription object */
420static int topic_add_subscription(struct stasis_topic *topic,
421 struct stasis_subscription *sub);
422
423static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
424
425/*! \brief Lock two topics. */
426#define topic_lock_both(topic1, topic2) \
427 do { \
428 ao2_lock(topic1); \
429 while (ao2_trylock(topic2)) { \
430 AO2_DEADLOCK_AVOIDANCE(topic1); \
431 } \
432 } while (0)
433
434static void topic_dtor(void *obj)
435{
436 struct stasis_topic *topic = obj;
437#ifdef AST_DEVMODE
438 struct ao2_container *topic_stats;
439#endif
440
441 ast_debug(2, "Destroying topic. name: %s, detail: %s\n",
442 topic->name, topic->detail);
443
444 /* Subscribers hold a reference to topics, so they should all be
445 * unsubscribed before we get here. */
447
450 ast_debug(1, "Topic '%s': %p destroyed\n", topic->name, topic);
451
452#ifdef AST_DEVMODE
453 if (topic->statistics) {
454 topic_stats = ao2_global_obj_ref(topic_statistics);
455 if (topic_stats) {
456 ao2_unlink(topic_stats, topic->statistics);
457 ao2_ref(topic_stats, -1);
458 }
459 ao2_ref(topic->statistics, -1);
460 }
461#endif
462}
463
464#ifdef AST_DEVMODE
465static void topic_statistics_destroy(void *obj)
466{
467 struct stasis_topic_statistics *statistics = obj;
468
469 ao2_cleanup(statistics->subscribers);
470}
471
472static struct stasis_topic_statistics *stasis_topic_statistics_create(struct stasis_topic *topic)
473{
474 struct stasis_topic_statistics *statistics;
475 RAII_VAR(struct ao2_container *, topic_stats, ao2_global_obj_ref(topic_statistics), ao2_cleanup);
476
477 if (!topic_stats) {
478 return NULL;
479 }
480
481 statistics = ao2_alloc(sizeof(*statistics) + strlen(topic->name) + 1, topic_statistics_destroy);
482 if (!statistics) {
483 return NULL;
484 }
485
486 statistics->subscribers = ast_str_container_alloc(1);
487 if (!statistics->subscribers) {
488 ao2_ref(statistics, -1);
489 return NULL;
490 }
491
492 /* This is strictly used for the pointer address when showing the topic */
493 statistics->topic = topic;
494 strcpy(statistics->name, topic->name); /* SAFE */
495 ao2_link(topic_stats, statistics);
496
497 return statistics;
498}
499#endif
500
501static int link_topic_proxy(struct stasis_topic *topic, const char *name, const char *detail)
502{
503 struct topic_proxy *proxy;
504 struct stasis_topic* topic_tmp;
505 size_t detail_len;
506
507 if (!topic || !name || !strlen(name) || !detail) {
508 return -1;
509 }
510
512
513 topic_tmp = stasis_topic_get(name);
514 if (topic_tmp) {
515 ast_log(LOG_ERROR, "The same topic is already exist. name: %s\n", name);
516 ao2_ref(topic_tmp, -1);
518
519 return -1;
520 }
521
522 detail_len = strlen(detail) + 1;
523
524 proxy = ao2_t_weakproxy_alloc(
525 sizeof(*proxy) + strlen(name) + 1 + detail_len, NULL, name);
526 if (!proxy) {
528
529 return -1;
530 }
531
532 /* set the proxy info */
533 proxy->name = proxy->buf;
534 proxy->detail = proxy->name + strlen(name) + 1;
535
536 strcpy(proxy->name, name); /* SAFE */
537 ast_copy_string(proxy->detail, detail, detail_len); /* SAFE */
538 proxy->creationtime = ast_tvnow();
539
540 /* We have exclusive access to proxy, no need for locking here. */
541 if (ao2_t_weakproxy_set_object(proxy, topic, OBJ_NOLOCK, "weakproxy link")) {
542 ao2_cleanup(proxy);
544
545 return -1;
546 }
547
549 ao2_cleanup(proxy);
552
553 return -1;
554 }
555
556 /* setting the topic point to the proxy */
557 topic->name = proxy->name;
558 topic->detail = proxy->detail;
559 topic->creationtime = &(proxy->creationtime);
560
562 ao2_ref(proxy, -1);
563
565
566 return 0;
567}
568
570 const char *name, const char* detail
571 )
572{
573 struct stasis_topic *topic;
574 int res = 0;
575
576 if (!name|| !strlen(name) || !detail) {
577 return NULL;
578 }
579 ast_debug(2, "Creating topic. name: %s, detail: %s\n", name, detail);
580
581 topic = stasis_topic_get(name);
582 if (topic) {
583 ast_debug(2, "Topic is already exist. name: %s, detail: %s\n",
584 name, detail);
585 return topic;
586 }
587
588 topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
589 if (!topic) {
590 return NULL;
591 }
592
594 res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
595 if (res) {
596 ao2_ref(topic, -1);
597 return NULL;
598 }
599
600 /* link to the proxy */
601 if (link_topic_proxy(topic, name, detail)) {
602 ao2_ref(topic, -1);
603 return NULL;
604 }
605
606#ifdef AST_DEVMODE
607 topic->statistics = stasis_topic_statistics_create(topic);
608 if (!topic->statistics) {
609 ao2_ref(topic, -1);
610 return NULL;
611 }
612#endif
613 ast_debug(1, "Topic '%s': %p created\n", topic->name, topic);
614
615 return topic;
616}
617
619{
621}
622
624{
626}
627
628const char *stasis_topic_name(const struct stasis_topic *topic)
629{
630 if (!topic) {
631 return NULL;
632 }
633 return topic->name;
634}
635
636const char *stasis_topic_detail(const struct stasis_topic *topic)
637{
638 if (!topic) {
639 return NULL;
640 }
641 return topic->detail;
642}
643
644size_t stasis_topic_subscribers(const struct stasis_topic *topic)
645{
646 return AST_VECTOR_SIZE(&topic->subscribers);
647}
648
649#ifdef AST_DEVMODE
650struct stasis_subscription_statistics {
651 /*! \brief The filename where the subscription originates */
652 const char *file;
653 /*! \brief The function where the subscription originates */
654 const char *func;
655 /*! \brief Names of the topics we are subscribed to */
656 struct ao2_container *topics;
657 /*! \brief The message type that currently took the longest to process */
658 struct stasis_message_type *highest_time_message_type;
659 /*! \brief Highest time spent invoking a message */
660 long highest_time_invoked;
661 /*! \brief Lowest time spent invoking a message */
662 long lowest_time_invoked;
663 /*! \brief The number of messages that were filtered out */
664 int messages_dropped;
665 /*! \brief The number of messages that passed filtering */
666 int messages_passed;
667 /*! \brief Using a mailbox to queue messages */
668 int uses_mailbox;
669 /*! \brief Using stasis threadpool for handling messages */
670 int uses_threadpool;
671 /*! \brief The line number where the subscription originates */
672 int lineno;
673 /*! \brief Pointer to the subscription (NOT refcounted, and must NOT be accessed) */
674 struct stasis_subscription *sub;
675 /*! \brief Unique ID of the subscription */
676 char uniqueid[0];
677};
678#endif
679
680/*! \internal */
682 /*! Unique ID for this subscription */
683 char *uniqueid;
684 /*! Topic subscribed to. */
686 /*! Mailbox for processing incoming messages. */
688 /*! Callback function for incoming message processing. */
690 /*! Data pointer to be handed to the callback. */
691 void *data;
692
693 /*! Condition for joining with subscription. */
695 /*! Flag set when final message for sub has been received.
696 * Be sure join_lock is held before reading/setting. */
698 /*! Flag set when final message for sub has been processed.
699 * Be sure join_lock is held before reading/setting. */
701
702 /*! The message types this subscription is accepting */
704 /*! The message formatters this subscription is accepting */
706 /*! The message filter currently in use */
708
709#ifdef AST_DEVMODE
710 /*! Statistics information */
711 struct stasis_subscription_statistics *statistics;
712#endif
713};
714
715static void subscription_dtor(void *obj)
716{
717 struct stasis_subscription *sub = obj;
718#ifdef AST_DEVMODE
719 struct ao2_container *subscription_stats;
720#endif
721
722 /* Subscriptions need to be manually unsubscribed before destruction
723 * b/c there's a cyclic reference between topics and subscriptions */
725 /* If there are any messages in flight to this subscription; that would
726 * be bad. */
728
729 ast_free(sub->uniqueid);
730 ao2_cleanup(sub->topic);
731 sub->topic = NULL;
733 sub->mailbox = NULL;
734 ast_cond_destroy(&sub->join_cond);
735
736 AST_VECTOR_FREE(&sub->accepted_message_types);
737
738#ifdef AST_DEVMODE
739 if (sub->statistics) {
740 subscription_stats = ao2_global_obj_ref(subscription_statistics);
741 if (subscription_stats) {
742 ao2_unlink(subscription_stats, sub->statistics);
743 ao2_ref(subscription_stats, -1);
744 }
745 ao2_ref(sub->statistics, -1);
746 }
747#endif
748}
749
750/*!
751 * \brief Invoke the subscription's callback.
752 * \param sub Subscription to invoke.
753 * \param message Message to send.
754 */
756 struct stasis_message *message)
757{
758 unsigned int final = stasis_subscription_final_message(sub, message);
760#ifdef AST_DEVMODE
761 struct timeval start;
762 long elapsed;
763
764 start = ast_tvnow();
765#endif
766
767 /* Notify that the final message has been received */
768 if (final) {
769 ao2_lock(sub);
770 sub->final_message_rxed = 1;
771 ast_cond_signal(&sub->join_cond);
773 }
774
775 /*
776 * If filtering is turned on and this is a 'final' message, we only invoke the callback
777 * if the subscriber accepts subscription_change message types.
778 */
779 if (!final || sub->filter != STASIS_SUBSCRIPTION_FILTER_SELECTIVE ||
780 (message_type_id < AST_VECTOR_SIZE(&sub->accepted_message_types) && AST_VECTOR_GET(&sub->accepted_message_types, message_type_id))) {
781 /* Since sub is mostly immutable, no need to lock sub */
782 sub->callback(sub->data, sub, message);
783 }
784
785 /* Notify that the final message has been processed */
786 if (final) {
787 ao2_lock(sub);
788 sub->final_message_processed = 1;
789 ast_cond_signal(&sub->join_cond);
791 }
792
793#ifdef AST_DEVMODE
794 elapsed = ast_tvdiff_ms(ast_tvnow(), start);
795 if (elapsed > sub->statistics->highest_time_invoked) {
796 sub->statistics->highest_time_invoked = elapsed;
797 ao2_lock(sub->statistics);
798 sub->statistics->highest_time_message_type = stasis_message_type(message);
799 ao2_unlock(sub->statistics);
800 }
801 if (elapsed < sub->statistics->lowest_time_invoked) {
802 sub->statistics->lowest_time_invoked = elapsed;
803 }
804#endif
805}
806
807static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
808static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
809
811{
812}
813
814#ifdef AST_DEVMODE
815static void subscription_statistics_destroy(void *obj)
816{
817 struct stasis_subscription_statistics *statistics = obj;
818
819 ao2_cleanup(statistics->topics);
820}
821
822static struct stasis_subscription_statistics *stasis_subscription_statistics_create(struct stasis_subscription *sub,
823 int needs_mailbox, int use_thread_pool, const char *file, int lineno,
824 const char *func)
825{
826 struct stasis_subscription_statistics *statistics;
827 RAII_VAR(struct ao2_container *, subscription_stats, ao2_global_obj_ref(subscription_statistics), ao2_cleanup);
828
829 if (!subscription_stats) {
830 return NULL;
831 }
832
833 statistics = ao2_alloc(sizeof(*statistics) + strlen(sub->uniqueid) + 1, subscription_statistics_destroy);
834 if (!statistics) {
835 return NULL;
836 }
837
839 if (!statistics->topics) {
840 ao2_ref(statistics, -1);
841 return NULL;
842 }
843
844 statistics->file = file;
845 statistics->lineno = lineno;
846 statistics->func = func;
847 statistics->uses_mailbox = needs_mailbox;
848 statistics->uses_threadpool = use_thread_pool;
849 strcpy(statistics->uniqueid, sub->uniqueid); /* SAFE */
850 statistics->sub = sub;
851 ao2_link(subscription_stats, statistics);
852
853 return statistics;
854}
855#endif
856
858 struct stasis_topic *topic,
860 void *data,
861 int needs_mailbox,
862 int use_thread_pool,
863 const char *file,
864 int lineno,
865 const char *func)
866{
867 struct stasis_subscription *sub;
868 int ret;
869
870 if (!topic) {
871 return NULL;
872 }
873
874 /* The ao2 lock is used for join_cond. */
876 if (!sub) {
877 return NULL;
878 }
879
880#ifdef AST_DEVMODE
882 sub->statistics = stasis_subscription_statistics_create(sub, needs_mailbox, use_thread_pool, file, lineno, func);
883 if (ret < 0 || !sub->statistics) {
884 ao2_ref(sub, -1);
885 return NULL;
886 }
887#else
889 if (ret < 0) {
890 ao2_ref(sub, -1);
891 return NULL;
892 }
893#endif
894
895 if (needs_mailbox) {
896 char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
897
898 /* Create name with seq number appended. */
899 ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "stasis/%c:%s",
900 use_thread_pool ? 'p' : 'm',
902
903 /*
904 * With a small number of subscribers, a thread-per-sub is
905 * acceptable. For a large number of subscribers, a thread
906 * pool should be used.
907 */
908 if (use_thread_pool) {
909 sub->mailbox = ast_threadpool_serializer(tps_name, threadpool);
910 } else {
911 sub->mailbox = ast_taskprocessor_get(tps_name, TPS_REF_DEFAULT);
912 }
913 if (!sub->mailbox) {
914 ao2_ref(sub, -1);
915
916 return NULL;
917 }
919 /* Taskprocessor has a reference */
920 ao2_ref(sub, +1);
921 }
922
923 ao2_ref(topic, +1);
924 sub->topic = topic;
925 sub->callback = callback;
926 sub->data = data;
927 ast_cond_init(&sub->join_cond, NULL);
929 AST_VECTOR_INIT(&sub->accepted_message_types, 0);
930 sub->accepted_formatters = STASIS_SUBSCRIPTION_FORMATTER_NONE;
931
932 if (topic_add_subscription(topic, sub) != 0) {
933 ao2_ref(sub, -1);
934 ao2_ref(topic, -1);
935
936 return NULL;
937 }
939
940 return sub;
941}
942
944 struct stasis_topic *topic,
946 void *data,
947 const char *file,
948 int lineno,
949 const char *func)
950{
951 return internal_stasis_subscribe(topic, callback, data, 1, 0, file, lineno, func);
952}
953
955 struct stasis_topic *topic,
957 void *data,
958 const char *file,
959 int lineno,
960 const char *func)
961{
962 return internal_stasis_subscribe(topic, callback, data, 1, 1, file, lineno, func);
963}
964
965static int sub_cleanup(void *data)
966{
967 struct stasis_subscription *sub = data;
969 return 0;
970}
971
973{
974 /* The subscription may be the last ref to this topic. Hold
975 * the topic ref open until after the unlock. */
976 struct stasis_topic *topic;
977
978 if (!sub) {
979 return NULL;
980 }
981
982 topic = ao2_bump(sub->topic);
983
984 /* We have to remove the subscription first, to ensure the unsubscribe
985 * is the final message */
986 if (topic_remove_subscription(sub->topic, sub) != 0) {
988 "Internal error: subscription has invalid topic\n");
989 ao2_cleanup(topic);
990
991 return NULL;
992 }
993
994 /* Now let everyone know about the unsubscribe */
996
997 /* When all that's done, remove the ref the mailbox has on the sub */
998 if (sub->mailbox) {
999 if (ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub)) {
1000 /* Nothing we can do here, the conditional is just to keep
1001 * the compiler happy that we're not ignoring the result. */
1002 }
1003 }
1004
1005 /* Unsubscribing unrefs the subscription */
1007 ao2_cleanup(topic);
1008
1009 return NULL;
1010}
1011
1013 long low_water, long high_water)
1014{
1015 int res = -1;
1016
1017 if (subscription) {
1018 res = ast_taskprocessor_alert_set_levels(subscription->mailbox,
1019 low_water, high_water);
1020 }
1021 return res;
1022}
1023
1025 const struct stasis_message_type *type)
1026{
1027 if (!subscription) {
1028 return -1;
1029 }
1030
1031 ast_assert(type != NULL);
1033
1035 /* Filtering is unreliable as this message type is not yet initialized
1036 * so force all messages through.
1037 */
1039 return 0;
1040 }
1041
1042 ao2_lock(subscription->topic);
1044 /* We do this for the same reason as above. The subscription can still operate, so allow
1045 * it to do so by forcing all messages through.
1046 */
1048 }
1049 ao2_unlock(subscription->topic);
1050
1051 return 0;
1052}
1053
1055 const struct stasis_message_type *type)
1056{
1057 if (!subscription) {
1058 return -1;
1059 }
1060
1061 ast_assert(type != NULL);
1063
1065 return 0;
1066 }
1067
1068 ao2_lock(subscription->topic);
1070 /* The memory is already allocated so this can't fail */
1072 }
1073 ao2_unlock(subscription->topic);
1074
1075 return 0;
1076}
1077
1080{
1081 if (!subscription) {
1082 return -1;
1083 }
1084
1085 ao2_lock(subscription->topic);
1086 if (subscription->filter != STASIS_SUBSCRIPTION_FILTER_FORCED_NONE) {
1087 subscription->filter = filter;
1088 }
1089 ao2_unlock(subscription->topic);
1090
1091 return 0;
1092}
1093
1096{
1097 ast_assert(subscription != NULL);
1098
1099 ao2_lock(subscription->topic);
1100 subscription->accepted_formatters = formatters;
1101 ao2_unlock(subscription->topic);
1102
1103 return;
1104}
1105
1107{
1108 if (subscription) {
1109 ao2_lock(subscription);
1110 /* Wait until the processed flag has been set */
1111 while (!subscription->final_message_processed) {
1112 ast_cond_wait(&subscription->join_cond,
1113 ao2_object_get_lockaddr(subscription));
1114 }
1115 ao2_unlock(subscription);
1116 }
1117}
1118
1120{
1121 if (subscription) {
1122 int ret;
1123
1124 ao2_lock(subscription);
1125 ret = subscription->final_message_rxed;
1126 ao2_unlock(subscription);
1127
1128 return ret;
1129 }
1130
1131 /* Null subscription is about as done as you can get */
1132 return 1;
1133}
1134
1136 struct stasis_subscription *subscription)
1137{
1138 if (!subscription) {
1139 return NULL;
1140 }
1141
1142 /* Bump refcount to hold it past the unsubscribe */
1143 ao2_ref(subscription, +1);
1144 stasis_unsubscribe(subscription);
1145 stasis_subscription_join(subscription);
1146 /* Now decrement the refcount back */
1147 ao2_cleanup(subscription);
1148 return NULL;
1149}
1150
1152{
1153 if (sub) {
1154 size_t i;
1155 struct stasis_topic *topic = sub->topic;
1156
1157 ao2_lock(topic);
1158 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
1159 if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
1160 ao2_unlock(topic);
1161 return 1;
1162 }
1163 }
1164 ao2_unlock(topic);
1165 }
1166
1167 return 0;
1168}
1169
1171{
1172 return sub->uniqueid;
1173}
1174
1176{
1177 struct stasis_subscription_change *change;
1178
1180 return 0;
1181 }
1182
1183 change = stasis_message_data(msg);
1184 if (strcmp("Unsubscribe", change->description)) {
1185 return 0;
1186 }
1187
1188 if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
1189 return 0;
1190 }
1191
1192 return 1;
1193}
1194
1195/*!
1196 * \brief Add a subscriber to a topic.
1197 * \param topic Topic
1198 * \param sub Subscriber
1199 * \return 0 on success
1200 * \return Non-zero on error
1201 */
1203{
1204 size_t idx;
1205
1206 ao2_lock(topic);
1207 /* The reference from the topic to the subscription is shared with
1208 * the owner of the subscription, which will explicitly unsubscribe
1209 * to release it.
1210 *
1211 * If we bumped the refcount here, the owner would have to unsubscribe
1212 * and cleanup, which is a bit awkward. */
1214
1215 for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
1218 }
1219
1220#ifdef AST_DEVMODE
1222 ast_str_container_add(sub->statistics->topics, stasis_topic_name(topic));
1223#endif
1224
1226
1227 return 0;
1228}
1229
1231{
1232 size_t idx;
1233 int res;
1234
1235 ao2_lock(topic);
1236 for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
1239 }
1242
1243#ifdef AST_DEVMODE
1244 if (!res) {
1246 ast_str_container_remove(sub->statistics->topics, stasis_topic_name(topic));
1247 }
1248#endif
1249
1251
1252 return res;
1253}
1254
1255/*!
1256 * \internal \brief Dispatch a message to a subscriber asynchronously
1257 * \param local \ref ast_taskprocessor_local object
1258 * \return 0
1259 */
1261{
1262 struct stasis_subscription *sub = local->local_data;
1263 struct stasis_message *message = local->data;
1264
1267
1268 return 0;
1269}
1270
1271/*!
1272 * \internal \brief Data passed to \ref dispatch_exec_sync to synchronize
1273 * a published message to a subscriber
1274 */
1280};
1281
1282/*!
1283 * \internal \brief Dispatch a message to a subscriber synchronously
1284 * \param local \ref ast_taskprocessor_local object
1285 * \return 0
1286 */
1288{
1289 struct stasis_subscription *sub = local->local_data;
1290 struct sync_task_data *std = local->data;
1291 struct stasis_message *message = std->task_data;
1292
1295
1296 ast_mutex_lock(&std->lock);
1297 std->complete = 1;
1298 ast_cond_signal(&std->cond);
1299 ast_mutex_unlock(&std->lock);
1300
1301 return 0;
1302}
1303
1304/*!
1305 * \internal \brief Dispatch a message to a subscriber
1306 * \param sub The subscriber to dispatch to
1307 * \param message The message to send
1308 * \param synchronous If non-zero, synchronize on the subscriber receiving
1309 * the message
1310 * \retval 0 if message was not dispatched
1311 * \retval 1 if message was dispatched
1312 */
1313static unsigned int dispatch_message(struct stasis_subscription *sub,
1314 struct stasis_message *message,
1315 int synchronous)
1316{
1318
1319 /*
1320 * The 'do while' gives us an easy way to skip remaining logic once
1321 * we determine the message should be accepted.
1322 * The code looks more verbose than it needs to be but it optimizes
1323 * down very nicely. It's just easier to understand and debug this way.
1324 */
1325 do {
1326 struct stasis_message_type *message_type = stasis_message_type(message);
1327 int type_id = stasis_message_type_id(message_type);
1328 int type_filter_specified = 0;
1329 int formatter_filter_specified = 0;
1330 int type_filter_passed = 0;
1331 int formatter_filter_passed = 0;
1332
1333 /* We always accept final messages so only run the filter logic if not final */
1334 if (is_final) {
1335 break;
1336 }
1337
1338 type_filter_specified = sub->filter & STASIS_SUBSCRIPTION_FILTER_SELECTIVE;
1339 formatter_filter_specified = sub->accepted_formatters != STASIS_SUBSCRIPTION_FORMATTER_NONE;
1340
1341 /* Accept if no filters of either type were specified */
1342 if (!type_filter_specified && !formatter_filter_specified) {
1343 break;
1344 }
1345
1346 type_filter_passed = type_filter_specified
1347 && type_id < AST_VECTOR_SIZE(&sub->accepted_message_types)
1348 && AST_VECTOR_GET(&sub->accepted_message_types, type_id);
1349
1350 /*
1351 * Since the type and formatter filters are OR'd, we can skip
1352 * the formatter check if the type check passes.
1353 */
1354 if (type_filter_passed) {
1355 break;
1356 }
1357
1358 formatter_filter_passed = formatter_filter_specified
1359 && (sub->accepted_formatters & stasis_message_type_available_formatters(message_type));
1360
1361 if (formatter_filter_passed) {
1362 break;
1363 }
1364
1365#ifdef AST_DEVMODE
1366 ast_atomic_fetchadd_int(&sub->statistics->messages_dropped, +1);
1367#endif
1368
1369 return 0;
1370
1371 } while (0);
1372
1373#ifdef AST_DEVMODE
1374 ast_atomic_fetchadd_int(&sub->statistics->messages_passed, +1);
1375#endif
1376
1377 if (!sub->mailbox) {
1378 /* Dispatch directly */
1380 return 1;
1381 }
1382
1383 /* Bump the message for the taskprocessor push. This will get de-ref'd
1384 * by the task processor callback.
1385 */
1387 if (!synchronous) {
1389 /* Push failed; ugh. */
1390 ast_log(LOG_ERROR, "Dropping async dispatch\n");
1392 return 0;
1393 }
1394 } else {
1395 struct sync_task_data std;
1396
1397 ast_mutex_init(&std.lock);
1398 ast_cond_init(&std.cond, NULL);
1399 std.complete = 0;
1400 std.task_data = message;
1401
1403 /* Push failed; ugh. */
1404 ast_log(LOG_ERROR, "Dropping sync dispatch\n");
1406 ast_mutex_destroy(&std.lock);
1407 ast_cond_destroy(&std.cond);
1408 return 0;
1409 }
1410
1411 ast_mutex_lock(&std.lock);
1412 while (!std.complete) {
1413 ast_cond_wait(&std.cond, &std.lock);
1414 }
1415 ast_mutex_unlock(&std.lock);
1416
1417 ast_mutex_destroy(&std.lock);
1418 ast_cond_destroy(&std.cond);
1419 }
1420
1421 return 1;
1422}
1423
1424/*!
1425 * \internal \brief Publish a message to a topic's subscribers
1426 * \brief topic The topic to publish to
1427 * \brief message The message to publish
1428 * \brief sync_sub An optional subscriber of the topic to publish synchronously
1429 * to
1430 */
1431static void publish_msg(struct stasis_topic *topic,
1432 struct stasis_message *message, struct stasis_subscription *sync_sub)
1433{
1434 size_t i;
1435#ifdef AST_DEVMODE
1436 unsigned int dispatched = 0;
1438 struct stasis_message_type_statistics *statistics;
1439 struct timeval start;
1440 long elapsed;
1441#endif
1442
1443 ast_assert(topic != NULL);
1445
1446#ifdef AST_DEVMODE
1447 ast_mutex_lock(&message_type_statistics_lock);
1448 if (message_type_id >= AST_VECTOR_SIZE(&message_type_statistics)) {
1449 struct stasis_message_type_statistics new_statistics = {
1450 .published = 0,
1451 };
1452 if (AST_VECTOR_REPLACE(&message_type_statistics, message_type_id, new_statistics)) {
1453 ast_mutex_unlock(&message_type_statistics_lock);
1454 return;
1455 }
1456 }
1457 statistics = AST_VECTOR_GET_ADDR(&message_type_statistics, message_type_id);
1458 statistics->message_type = stasis_message_type(message);
1459 ast_mutex_unlock(&message_type_statistics_lock);
1460
1461 ast_atomic_fetchadd_int(&statistics->published, +1);
1462#endif
1463
1464 /* If there are no subscribers don't bother */
1465 if (!stasis_topic_subscribers(topic)) {
1466#ifdef AST_DEVMODE
1467 ast_atomic_fetchadd_int(&statistics->unused, +1);
1468 ast_atomic_fetchadd_int(&topic->statistics->messages_not_dispatched, +1);
1469#endif
1470 return;
1471 }
1472
1473 /*
1474 * The topic may be unref'ed by the subscription invocation.
1475 * Make sure we hold onto a reference while dispatching.
1476 */
1477 ao2_ref(topic, +1);
1478#ifdef AST_DEVMODE
1479 start = ast_tvnow();
1480#endif
1481 ao2_lock(topic);
1482 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
1484
1485 ast_assert(sub != NULL);
1486#ifdef AST_DEVMODE
1487 dispatched +=
1488#endif
1489 dispatch_message(sub, message, (sub == sync_sub));
1490 }
1492
1493#ifdef AST_DEVMODE
1494 elapsed = ast_tvdiff_ms(ast_tvnow(), start);
1495 if (elapsed > topic->statistics->highest_time_dispatched) {
1496 topic->statistics->highest_time_dispatched = elapsed;
1497 }
1498 if (elapsed < topic->statistics->lowest_time_dispatched) {
1499 topic->statistics->lowest_time_dispatched = elapsed;
1500 }
1501 if (dispatched) {
1502 ast_atomic_fetchadd_int(&topic->statistics->messages_dispatched, +1);
1503 } else {
1504 ast_atomic_fetchadd_int(&statistics->unused, +1);
1505 ast_atomic_fetchadd_int(&topic->statistics->messages_not_dispatched, +1);
1506 }
1507#endif
1508
1509 ao2_ref(topic, -1);
1510}
1511
1513{
1515}
1516
1518{
1519 ast_assert(sub != NULL);
1520
1521 publish_msg(sub->topic, message, sub);
1522}
1523
1524/*!
1525 * \brief Forwarding information
1526 *
1527 * Any message posted to \a from_topic is forwarded to \a to_topic.
1528 *
1529 * In cases where both the \a from_topic and \a to_topic need to be locked,
1530 * always lock the \a to_topic first, then the \a from_topic. Lest you deadlock.
1531 */
1533 /*! Originating topic */
1535 /*! Destination topic */
1537};
1538
1539static void forward_dtor(void *obj)
1540{
1541 struct stasis_forward *forward = obj;
1542
1543 ao2_cleanup(forward->from_topic);
1544 forward->from_topic = NULL;
1545 ao2_cleanup(forward->to_topic);
1546 forward->to_topic = NULL;
1547}
1548
1550{
1551 int idx;
1552 struct stasis_topic *from;
1553 struct stasis_topic *to;
1554
1555 if (!forward) {
1556 return NULL;
1557 }
1558
1559 from = forward->from_topic;
1560 to = forward->to_topic;
1561
1562 if (from && to) {
1563 topic_lock_both(to, from);
1566
1567 for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
1569 }
1570 ao2_unlock(from);
1571 ao2_unlock(to);
1572 }
1573
1574 ao2_cleanup(forward);
1575
1576 return NULL;
1577}
1578
1580 struct stasis_topic *to_topic)
1581{
1582 int res;
1583 size_t idx;
1584 struct stasis_forward *forward;
1585
1586 if (!from_topic || !to_topic) {
1587 return NULL;
1588 }
1589
1590 forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1591 if (!forward) {
1592 return NULL;
1593 }
1594
1595 /* Forwards to ourselves are implicit. */
1596 if (to_topic == from_topic) {
1597 return forward;
1598 }
1599
1600 forward->from_topic = ao2_bump(from_topic);
1601 forward->to_topic = ao2_bump(to_topic);
1602
1605 if (res != 0) {
1608 ao2_ref(forward, -1);
1609 return NULL;
1610 }
1611
1612 for (idx = 0; idx < AST_VECTOR_SIZE(&to_topic->subscribers); ++idx) {
1614 }
1617
1618 return forward;
1619}
1620
1621static void subscription_change_dtor(void *obj)
1622{
1623 struct stasis_subscription_change *change = obj;
1624
1625 ao2_cleanup(change->topic);
1626}
1627
1629{
1630 size_t description_len = strlen(description) + 1;
1631 size_t uniqueid_len = strlen(uniqueid) + 1;
1632 struct stasis_subscription_change *change;
1633
1634 change = ao2_alloc_options(sizeof(*change) + description_len + uniqueid_len,
1636 if (!change) {
1637 return NULL;
1638 }
1639
1640 strcpy(change->description, description); /* SAFE */
1641 change->uniqueid = change->description + description_len;
1642 ast_copy_string(change->uniqueid, uniqueid, uniqueid_len); /* SAFE */
1643 ao2_ref(topic, +1);
1644 change->topic = topic;
1645
1646 return change;
1647}
1648
1650{
1651 struct stasis_subscription_change *change;
1652 struct stasis_message *msg;
1653
1654 /* This assumes that we have already unsubscribed */
1656
1658 return;
1659 }
1660
1661 change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
1662 if (!change) {
1663 return;
1664 }
1665
1667 if (!msg) {
1668 ao2_cleanup(change);
1669 return;
1670 }
1671
1672 stasis_publish(topic, msg);
1673 ao2_cleanup(msg);
1674 ao2_cleanup(change);
1675}
1676
1678 struct stasis_subscription *sub)
1679{
1680 struct stasis_subscription_change *change;
1681 struct stasis_message *msg;
1682
1683 /* This assumes that we have already unsubscribed */
1685
1687 return;
1688 }
1689
1690 change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
1691 if (!change) {
1692 return;
1693 }
1694
1696 if (!msg) {
1697 ao2_cleanup(change);
1698 return;
1699 }
1700
1701 stasis_publish(topic, msg);
1702
1703 /* Now we have to dispatch to the subscription itself */
1704 dispatch_message(sub, msg, 0);
1705
1706 ao2_cleanup(msg);
1707 ao2_cleanup(change);
1708}
1709
1713 char name[0];
1714};
1715
1716static void topic_pool_entry_dtor(void *obj)
1717{
1718 struct topic_pool_entry *entry = obj;
1719
1720 entry->forward = stasis_forward_cancel(entry->forward);
1721 ao2_cleanup(entry->topic);
1722 entry->topic = NULL;
1723}
1724
1725static struct topic_pool_entry *topic_pool_entry_alloc(const char *topic_name)
1726{
1728
1729 topic_pool_entry = ao2_alloc_options(sizeof(*topic_pool_entry) + strlen(topic_name) + 1,
1731 if (!topic_pool_entry) {
1732 return NULL;
1733 }
1734
1735 strcpy(topic_pool_entry->name, topic_name); /* Safe */
1736
1737 return topic_pool_entry;
1738}
1739
1743};
1744
1745static void topic_pool_dtor(void *obj)
1746{
1747 struct stasis_topic_pool *pool = obj;
1748
1749#ifdef AO2_DEBUG
1750 {
1751 char *container_name =
1752 ast_alloca(strlen(stasis_topic_name(pool->pool_topic)) + strlen("-pool") + 1);
1753 sprintf(container_name, "%s-pool", stasis_topic_name(pool->pool_topic));
1754 ao2_container_unregister(container_name);
1755 }
1756#endif
1757
1759 pool->pool_container = NULL;
1760 ao2_cleanup(pool->pool_topic);
1761 pool->pool_topic = NULL;
1762}
1763
1764static int topic_pool_entry_hash(const void *obj, const int flags)
1765{
1766 const struct topic_pool_entry *object;
1767 const char *key;
1768
1769 switch (flags & OBJ_SEARCH_MASK) {
1770 case OBJ_SEARCH_KEY:
1771 key = obj;
1772 break;
1773 case OBJ_SEARCH_OBJECT:
1774 object = obj;
1775 key = object->name;
1776 break;
1777 default:
1778 /* Hash can only work on something with a full key. */
1779 ast_assert(0);
1780 return 0;
1781 }
1782 return ast_str_case_hash(key);
1783}
1784
1785static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
1786{
1787 const struct topic_pool_entry *object_left = obj;
1788 const struct topic_pool_entry *object_right = arg;
1789 const char *right_key = arg;
1790 int cmp;
1791
1792 switch (flags & OBJ_SEARCH_MASK) {
1793 case OBJ_SEARCH_OBJECT:
1794 right_key = object_right->name;
1795 /* Fall through */
1796 case OBJ_SEARCH_KEY:
1797 cmp = strcasecmp(object_left->name, right_key);
1798 break;
1800 /* Not supported by container */
1801 ast_assert(0);
1802 cmp = -1;
1803 break;
1804 default:
1805 /*
1806 * What arg points to is specific to this traversal callback
1807 * and has no special meaning to astobj2.
1808 */
1809 cmp = 0;
1810 break;
1811 }
1812 if (cmp) {
1813 return 0;
1814 }
1815 /*
1816 * At this point the traversal callback is identical to a sorted
1817 * container.
1818 */
1819 return CMP_MATCH;
1820}
1821
1822#ifdef AO2_DEBUG
1823static void topic_pool_prnt_obj(void *v_obj, void *where, ao2_prnt_fn *prnt)
1824{
1825 struct topic_pool_entry *entry = v_obj;
1826
1827 if (!entry) {
1828 return;
1829 }
1830 prnt(where, "%s", stasis_topic_name(entry->topic));
1831}
1832#endif
1833
1835{
1836 struct stasis_topic_pool *pool;
1837
1839 if (!pool) {
1840 return NULL;
1841 }
1842
1845 if (!pool->pool_container) {
1846 ao2_cleanup(pool);
1847 return NULL;
1848 }
1849
1850#ifdef AO2_DEBUG
1851 {
1852 char *container_name =
1853 ast_alloca(strlen(stasis_topic_name(pooled_topic)) + strlen("-pool") + 1);
1854 sprintf(container_name, "%s-pool", stasis_topic_name(pooled_topic));
1855 ao2_container_register(container_name, pool->pool_container, topic_pool_prnt_obj);
1856 }
1857#endif
1858
1859 ao2_ref(pooled_topic, +1);
1860 pool->pool_topic = pooled_topic;
1861
1862 return pool;
1863}
1864
1865void stasis_topic_pool_delete_topic(struct stasis_topic_pool *pool, const char *topic_name)
1866{
1867 /*
1868 * The topic_name passed in could be a fully-qualified name like <pool_topic_name>/<topic_name>
1869 * or just <topic_name>. If it's fully qualified, we need to skip past <pool_topic_name>
1870 * name and search only on <topic_name>.
1871 */
1872 const char *pool_topic_name = stasis_topic_name(pool->pool_topic);
1873 int pool_topic_name_len = strlen(pool_topic_name);
1874 const char *search_topic_name;
1875
1876 if (strncmp(pool_topic_name, topic_name, pool_topic_name_len) == 0) {
1877 search_topic_name = topic_name + pool_topic_name_len + 1;
1878 } else {
1879 search_topic_name = topic_name;
1880 }
1881
1882 ao2_find(pool->pool_container, search_topic_name, OBJ_SEARCH_KEY | OBJ_NODATA | OBJ_UNLINK);
1883}
1884
1885struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
1886{
1888 SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
1889 char *new_topic_name;
1890 int ret;
1891
1893 if (topic_pool_entry) {
1894 return topic_pool_entry->topic;
1895 }
1896
1898 if (!topic_pool_entry) {
1899 return NULL;
1900 }
1901
1902 /* To provide further detail and to ensure that the topic is unique within the scope of the
1903 * system we prefix it with the pooling topic name, which should itself already be unique.
1904 */
1905 ret = ast_asprintf(&new_topic_name, "%s/%s", stasis_topic_name(pool->pool_topic), topic_name);
1906 if (ret < 0) {
1907 return NULL;
1908 }
1909
1910 topic_pool_entry->topic = stasis_topic_create(new_topic_name);
1911 ast_free(new_topic_name);
1912 if (!topic_pool_entry->topic) {
1913 return NULL;
1914 }
1915
1917 if (!topic_pool_entry->forward) {
1918 return NULL;
1919 }
1920
1922 return NULL;
1923 }
1924
1925 return topic_pool_entry->topic;
1926}
1927
1928int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name)
1929{
1931
1933 if (!topic_pool_entry) {
1934 return 0;
1935 }
1936
1938 return 1;
1939}
1940
1942{
1943#ifdef AST_DEVMODE
1945 ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
1946 }
1947#endif
1948}
1949
1950/*! \brief A multi object blob data structure to carry user event stasis messages */
1952 struct ast_json *blob; /*< A blob of JSON data */
1953 AST_VECTOR(, void *) snapshots[STASIS_UMOS_MAX]; /*< Vector of snapshots for each type */
1954};
1955
1956/*!
1957 * \internal
1958 * \brief Destructor for \ref ast_multi_object_blob objects
1959 */
1960static void multi_object_blob_dtor(void *obj)
1961{
1962 struct ast_multi_object_blob *multi = obj;
1963 int type;
1964 int i;
1965
1966 for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1967 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1969 }
1970 AST_VECTOR_FREE(&multi->snapshots[type]);
1971 }
1972 ast_json_unref(multi->blob);
1973}
1974
1975/*! \brief Create a stasis user event multi object blob */
1977{
1978 int type;
1979 struct ast_multi_object_blob *multi;
1980
1981 ast_assert(blob != NULL);
1982
1983 multi = ao2_alloc(sizeof(*multi), multi_object_blob_dtor);
1984 if (!multi) {
1985 return NULL;
1986 }
1987
1988 for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1989 if (AST_VECTOR_INIT(&multi->snapshots[type], 0)) {
1990 ao2_ref(multi, -1);
1991
1992 return NULL;
1993 }
1994 }
1995
1996 multi->blob = ast_json_ref(blob);
1997
1998 return multi;
1999}
2000
2001/*! \brief Add an object (snapshot) to the blob */
2004{
2005 if (!multi || !object || AST_VECTOR_APPEND(&multi->snapshots[type], object)) {
2006 ao2_cleanup(object);
2007 }
2008}
2009
2010/*! \brief Publish single channel user event (for app_userevent compatibility) */
2012 struct stasis_message_type *type, struct ast_json *blob)
2013{
2014 struct stasis_message *message;
2015 struct ast_channel_snapshot *channel_snapshot;
2016 struct ast_multi_object_blob *multi;
2017
2018 if (!type) {
2019 return;
2020 }
2021
2023 if (!multi) {
2024 return;
2025 }
2026
2027 channel_snapshot = ast_channel_snapshot_create(chan);
2028 if (!channel_snapshot) {
2029 ao2_ref(multi, -1);
2030 return;
2031 }
2032
2033 /* this call steals the channel_snapshot reference */
2034 ast_multi_object_blob_add(multi, STASIS_UMOS_CHANNEL, channel_snapshot);
2035
2037 ao2_ref(multi, -1);
2038 if (message) {
2039 /* app_userevent still publishes to channel */
2041 ao2_ref(message, -1);
2042 }
2043}
2044
2045/*! \internal \brief convert multi object blob to ari json */
2047 struct stasis_message *message,
2048 const struct stasis_message_sanitizer *sanitize)
2049{
2050 struct ast_json *out;
2052 struct ast_json *blob = multi->blob;
2053 const struct timeval *tv = stasis_message_timestamp(message);
2055 int i;
2056
2058 if (!out) {
2059 return NULL;
2060 }
2061
2062 ast_json_object_set(out, "type", ast_json_string_create("ChannelUserevent"));
2063 ast_json_object_set(out, "timestamp", ast_json_timeval(*tv, NULL));
2064 ast_json_object_set(out, "eventname", ast_json_ref(ast_json_object_get(blob, "eventname")));
2065 ast_json_object_set(out, "userevent", ast_json_ref(blob));
2066
2067 for (type = 0; type < STASIS_UMOS_MAX; ++type) {
2068 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
2069 struct ast_json *json_object = NULL;
2070 char *name = NULL;
2071 void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
2072
2073 switch (type) {
2075 json_object = ast_channel_snapshot_to_json(snapshot, sanitize);
2076 name = "channel";
2077 break;
2078 case STASIS_UMOS_BRIDGE:
2079 json_object = ast_bridge_snapshot_to_json(snapshot, sanitize);
2080 name = "bridge";
2081 break;
2083 json_object = ast_endpoint_snapshot_to_json(snapshot, sanitize);
2084 name = "endpoint";
2085 break;
2086 }
2087 if (json_object) {
2088 ast_json_object_set(out, name, json_object);
2089 }
2090 }
2091 }
2092
2093 return out;
2094}
2095
2096/*! \internal \brief convert multi object blob to ami string */
2097static struct ast_str *multi_object_blob_to_ami(void *obj)
2098{
2099 struct ast_str *ami_str=ast_str_create(1024);
2100 struct ast_str *ami_snapshot;
2101 const struct ast_multi_object_blob *multi = obj;
2103 int i;
2104
2105 if (!ami_str) {
2106 return NULL;
2107 }
2108 if (!multi) {
2109 ast_free(ami_str);
2110 return NULL;
2111 }
2112
2113 for (type = 0; type < STASIS_UMOS_MAX; ++type) {
2114 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
2115 char *name = NULL;
2116 void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
2117 ami_snapshot = NULL;
2118
2119 if (i > 0) {
2120 ast_asprintf(&name, "%d", i + 1);
2121 }
2122
2123 switch (type) {
2125 ami_snapshot = ast_manager_build_channel_state_string_prefix(snapshot, name ?: "");
2126 break;
2127
2128 case STASIS_UMOS_BRIDGE:
2129 ami_snapshot = ast_manager_build_bridge_state_string_prefix(snapshot, name ?: "");
2130 break;
2131
2133 /* currently not sending endpoint snapshots to AMI */
2134 break;
2135 }
2136 if (ami_snapshot) {
2137 ast_str_append(&ami_str, 0, "%s", ast_str_buffer(ami_snapshot));
2138 ast_free(ami_snapshot);
2139 }
2140 ast_free(name);
2141 }
2142 }
2143
2144 return ami_str;
2145}
2146
2147/*! \internal \brief Callback to pass only user defined parameters from blob */
2148static int userevent_exclusion_cb(const char *key)
2149{
2150 if (!strcmp("eventname", key)) {
2151 return 1;
2152 }
2153 return 0;
2154}
2155
2157 struct stasis_message *message)
2158{
2159 RAII_VAR(struct ast_str *, object_string, NULL, ast_free);
2160 RAII_VAR(struct ast_str *, body, NULL, ast_free);
2162 const char *eventname;
2163
2164 eventname = ast_json_string_get(ast_json_object_get(multi->blob, "eventname"));
2166 object_string = multi_object_blob_to_ami(multi);
2167 if (!object_string || !body) {
2168 return NULL;
2169 }
2170
2172 "%s"
2173 "UserEvent: %s\r\n"
2174 "%s",
2175 ast_str_buffer(object_string),
2176 eventname,
2177 ast_str_buffer(body));
2178}
2179
2180/*! \brief A structure to hold global configuration-related options */
2182 /*! The list of message types to decline */
2184};
2185
2186/*! \brief Threadpool configuration options */
2188 /*! Initial size of the thread pool */
2190 /*! Time, in seconds, before we expire a thread */
2192 /*! Maximum number of thread to allow */
2194};
2195
2197 /*! Thread pool configuration options */
2199 /*! Declined message types */
2201};
2202
2204 .type = ACO_GLOBAL,
2205 .name = "threadpool",
2206 .item_offset = offsetof(struct stasis_config, threadpool_options),
2207 .category = "threadpool",
2208 .category_match = ACO_WHITELIST_EXACT,
2209};
2210
2212
2213/*! \brief An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type */
2214static struct aco_type declined_option = {
2215 .type = ACO_GLOBAL,
2216 .name = "declined_message_types",
2217 .item_offset = offsetof(struct stasis_config, declined_message_types),
2218 .category_match = ACO_WHITELIST_EXACT,
2219 .category = "declined_message_types",
2220};
2221
2223
2225 .filename = "stasis.conf",
2227};
2228
2229/*! \brief A global object container that will contain the stasis_config that gets swapped out on reloads */
2231
2232static void *stasis_config_alloc(void);
2233
2234/*! \brief Register information about the configs being processed by this module */
2236 .files = ACO_FILES(&stasis_conf),
2237);
2238
2240{
2241 struct stasis_declined_config *declined = obj;
2242
2243 ao2_cleanup(declined->declined);
2244}
2245
2246static void stasis_config_destructor(void *obj)
2247{
2248 struct stasis_config *cfg = obj;
2249
2252}
2253
2254static void *stasis_config_alloc(void)
2255{
2256 struct stasis_config *cfg;
2257
2258 if (!(cfg = ao2_alloc(sizeof(*cfg), stasis_config_destructor))) {
2259 return NULL;
2260 }
2261
2262 cfg->threadpool_options = ast_calloc(1, sizeof(*cfg->threadpool_options));
2263 if (!cfg->threadpool_options) {
2264 ao2_ref(cfg, -1);
2265 return NULL;
2266 }
2267
2270 if (!cfg->declined_message_types) {
2271 ao2_ref(cfg, -1);
2272 return NULL;
2273 }
2274
2276 if (!cfg->declined_message_types->declined) {
2277 ao2_ref(cfg, -1);
2278 return NULL;
2279 }
2280
2281 return cfg;
2282}
2283
2285{
2287 char *name_in_declined;
2288 int res;
2289
2290 if (!cfg || !cfg->declined_message_types) {
2291 ao2_cleanup(cfg);
2292 return 0;
2293 }
2294
2295 name_in_declined = ao2_find(cfg->declined_message_types->declined, name, OBJ_SEARCH_KEY);
2296 res = name_in_declined ? 1 : 0;
2297 ao2_cleanup(name_in_declined);
2298 ao2_ref(cfg, -1);
2299 if (res) {
2300 ast_debug(4, "Declining to allocate Stasis message type '%s' due to configuration\n", name);
2301 }
2302 return res;
2303}
2304
2305static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
2306{
2307 struct stasis_declined_config *declined = obj;
2308
2309 if (ast_strlen_zero(var->value)) {
2310 return 0;
2311 }
2312
2313 if (ast_str_container_add(declined->declined, var->value)) {
2314 return -1;
2315 }
2316
2317 return 0;
2318}
2319
2320/*!
2321 * @{ \brief Define multi user event message type(s).
2322 */
2323
2325 .to_json = multi_user_event_to_json,
2327 );
2328
2329/*! @} */
2330
2331/*!
2332 * \internal
2333 * \brief CLI command implementation for 'stasis show topics'
2334 */
2335static char *stasis_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2336{
2337 struct ao2_iterator iter;
2338 struct topic_proxy *topic;
2339 struct ao2_container *tmp_container;
2340 int count = 0;
2341#define FMT_HEADERS "%-64s %-64s\n"
2342#define FMT_FIELDS "%-64s %-64s\n"
2343
2344 switch (cmd) {
2345 case CLI_INIT:
2346 e->command = "stasis show topics";
2347 e->usage =
2348 "Usage: stasis show topics\n"
2349 " Shows a list of topics\n";
2350 return NULL;
2351 case CLI_GENERATE:
2352 return NULL;
2353 }
2354
2355 if (a->argc != e->args) {
2356 return CLI_SHOWUSAGE;
2357 }
2358
2359 ast_cli(a->fd, "\n" FMT_HEADERS, "Name", "Detail");
2360
2362 topic_proxy_sort_fn, NULL);
2363
2364 if (!tmp_container || ao2_container_dup(tmp_container, topic_all, OBJ_SEARCH_OBJECT)) {
2365 ao2_cleanup(tmp_container);
2366
2367 return NULL;
2368 }
2369
2370 /* getting all topic in order */
2371 iter = ao2_iterator_init(tmp_container, AO2_ITERATOR_UNLINK);
2372 while ((topic = ao2_iterator_next(&iter))) {
2373 ast_cli(a->fd, FMT_FIELDS, topic->name, topic->detail);
2374 ao2_ref(topic, -1);
2375 ++count;
2376 }
2377 ao2_iterator_destroy(&iter);
2378 ao2_cleanup(tmp_container);
2379
2380 ast_cli(a->fd, "\n%d Total topics\n\n", count);
2381
2382#undef FMT_HEADERS
2383#undef FMT_FIELDS
2384
2385 return CLI_SUCCESS;
2386}
2387
2388/*!
2389 * \internal
2390 * \brief CLI tab completion for topic names
2391 */
2392static char *topic_complete_name(const char *word)
2393{
2394 struct topic_proxy *topic;
2395 struct ao2_iterator it;
2396 int wordlen = strlen(word);
2397 int ret;
2398
2400 while ((topic = ao2_iterator_next(&it))) {
2401 if (!strncasecmp(word, topic->name, wordlen)) {
2402 ret = ast_cli_completion_add(ast_strdup(topic->name));
2403 if (ret) {
2404 ao2_ref(topic, -1);
2405 break;
2406 }
2407 }
2408 ao2_ref(topic, -1);
2409 }
2411 return NULL;
2412}
2413
2414/*!
2415 * \internal
2416 * \brief CLI command implementation for 'stasis show topic'
2417 */
2418static char *stasis_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2419{
2420 struct stasis_topic *topic;
2421 char print_time[32];
2422 int i;
2423
2424 switch (cmd) {
2425 case CLI_INIT:
2426 e->command = "stasis show topic";
2427 e->usage =
2428 "Usage: stasis show topic <name>\n"
2429 " Show stasis topic detail info.\n";
2430 return NULL;
2431 case CLI_GENERATE:
2432 if (a->pos == 3) {
2433 return topic_complete_name(a->word);
2434 } else {
2435 return NULL;
2436 }
2437 }
2438
2439 if (a->argc != 4) {
2440 return CLI_SHOWUSAGE;
2441 }
2442
2443 topic = stasis_topic_get(a->argv[3]);
2444 if (!topic) {
2445 ast_cli(a->fd, "Specified topic '%s' does not exist\n", a->argv[3]);
2446 return CLI_FAILURE;
2447 }
2448
2449 ast_cli(a->fd, "Name: %s\n", topic->name);
2450 ast_cli(a->fd, "Detail: %s\n", topic->detail);
2451 ast_cli(a->fd, "Subscribers count: %zu\n", AST_VECTOR_SIZE(&topic->subscribers));
2452 ast_cli(a->fd, "Forwarding topic count: %zu\n", AST_VECTOR_SIZE(&topic->upstream_topics));
2453 ast_format_duration_hh_mm_ss(ast_tvnow().tv_sec - topic->creationtime->tv_sec, print_time, sizeof(print_time));
2454 ast_cli(a->fd, "Duration time: %s\n", print_time);
2455
2456 ao2_lock(topic);
2457 ast_cli(a->fd, "\nSubscribers:\n");
2458 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); i++) {
2459 struct stasis_subscription *subscription_tmp = AST_VECTOR_GET(&topic->subscribers, i);
2460 ast_cli(a->fd, " UniqueID: %s, Topic: %s, Detail: %s\n",
2461 subscription_tmp->uniqueid, subscription_tmp->topic->name, subscription_tmp->topic->detail);
2462 }
2463
2464 ast_cli(a->fd, "\nForwarded topics:\n");
2465 for (i = 0; i < AST_VECTOR_SIZE(&topic->upstream_topics); i++) {
2466 struct stasis_topic *topic_tmp = AST_VECTOR_GET(&topic->upstream_topics, i);
2467 ast_cli(a->fd, " Topic: %s, Detail: %s\n", topic_tmp->name, topic_tmp->detail);
2468 }
2469 ao2_unlock(topic);
2470
2471 ao2_ref(topic, -1);
2472
2473 return CLI_SUCCESS;
2474}
2475
2476
2477static struct ast_cli_entry cli_stasis[] = {
2478 AST_CLI_DEFINE(stasis_show_topics, "Show all topics"),
2479 AST_CLI_DEFINE(stasis_show_topic, "Show topic"),
2480};
2481
2482
2483#ifdef AST_DEVMODE
2484
2485AO2_STRING_FIELD_SORT_FN(stasis_subscription_statistics, uniqueid);
2486
2487/*!
2488 * \internal
2489 * \brief CLI command implementation for 'stasis statistics show subscriptions'
2490 */
2491static char *statistics_show_subscriptions(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2492{
2493 struct ao2_container *sorted_subscriptions;
2494 struct ao2_container *subscription_stats;
2495 struct ao2_iterator iter;
2496 struct stasis_subscription_statistics *statistics;
2497 int count = 0;
2498 int dropped = 0;
2499 int passed = 0;
2500#define FMT_HEADERS "%-64s %10s %10s %16s %16s\n"
2501#define FMT_FIELDS "%-64s %10d %10d %16ld %16ld\n"
2502#define FMT_FIELDS2 "%-64s %10d %10d\n"
2503
2504 switch (cmd) {
2505 case CLI_INIT:
2506 e->command = "stasis statistics show subscriptions";
2507 e->usage =
2508 "Usage: stasis statistics show subscriptions\n"
2509 " Shows a list of subscriptions and their general statistics\n";
2510 return NULL;
2511 case CLI_GENERATE:
2512 return NULL;
2513 }
2514
2515 if (a->argc != e->args) {
2516 return CLI_SHOWUSAGE;
2517 }
2518
2519 subscription_stats = ao2_global_obj_ref(subscription_statistics);
2520 if (!subscription_stats) {
2521 ast_cli(a->fd, "Could not fetch subscription_statistics container\n");
2522 return CLI_FAILURE;
2523 }
2524
2525 sorted_subscriptions = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_NOLOCK, 0,
2526 stasis_subscription_statistics_sort_fn, NULL);
2527 if (!sorted_subscriptions) {
2528 ao2_ref(subscription_stats, -1);
2529 ast_cli(a->fd, "Could not create container for sorting subscription statistics\n");
2530 return CLI_SUCCESS;
2531 }
2532
2533 if (ao2_container_dup(sorted_subscriptions, subscription_stats, 0)) {
2534 ao2_ref(sorted_subscriptions, -1);
2535 ao2_ref(subscription_stats, -1);
2536 ast_cli(a->fd, "Could not sort subscription statistics\n");
2537 return CLI_SUCCESS;
2538 }
2539
2540 ao2_ref(subscription_stats, -1);
2541
2542 ast_cli(a->fd, "\n" FMT_HEADERS, "Subscription", "Dropped", "Passed", "Lowest Invoke", "Highest Invoke");
2543
2544 iter = ao2_iterator_init(sorted_subscriptions, 0);
2545 while ((statistics = ao2_iterator_next(&iter))) {
2546 ast_cli(a->fd, FMT_FIELDS, statistics->uniqueid, statistics->messages_dropped, statistics->messages_passed,
2547 statistics->lowest_time_invoked, statistics->highest_time_invoked);
2548 dropped += statistics->messages_dropped;
2549 passed += statistics->messages_passed;
2550 ao2_ref(statistics, -1);
2551 ++count;
2552 }
2553 ao2_iterator_destroy(&iter);
2554
2555 ao2_ref(sorted_subscriptions, -1);
2556
2557 ast_cli(a->fd, FMT_FIELDS2, "Total", dropped, passed);
2558 ast_cli(a->fd, "\n%d subscriptions\n\n", count);
2559
2560#undef FMT_HEADERS
2561#undef FMT_FIELDS
2562#undef FMT_FIELDS2
2563
2564 return CLI_SUCCESS;
2565}
2566
2567/*!
2568 * \internal
2569 * \brief CLI tab completion for subscription statistics names
2570 */
2571static char *subscription_statistics_complete_name(const char *word, int state)
2572{
2573 struct stasis_subscription_statistics *statistics;
2574 struct ao2_container *subscription_stats;
2575 struct ao2_iterator it_statistics;
2576 int wordlen = strlen(word);
2577 int which = 0;
2578 char *result = NULL;
2579
2580 subscription_stats = ao2_global_obj_ref(subscription_statistics);
2581 if (!subscription_stats) {
2582 return result;
2583 }
2584
2585 it_statistics = ao2_iterator_init(subscription_stats, 0);
2586 while ((statistics = ao2_iterator_next(&it_statistics))) {
2587 if (!strncasecmp(word, statistics->uniqueid, wordlen)
2588 && ++which > state) {
2589 result = ast_strdup(statistics->uniqueid);
2590 }
2591 ao2_ref(statistics, -1);
2592 if (result) {
2593 break;
2594 }
2595 }
2596 ao2_iterator_destroy(&it_statistics);
2597 ao2_ref(subscription_stats, -1);
2598 return result;
2599}
2600
2601/*!
2602 * \internal
2603 * \brief CLI command implementation for 'stasis statistics show subscription'
2604 */
2605static char *statistics_show_subscription(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2606{
2607 struct stasis_subscription_statistics *statistics;
2608 struct ao2_container *subscription_stats;
2609 struct ao2_iterator i;
2610 char *name;
2611
2612 switch (cmd) {
2613 case CLI_INIT:
2614 e->command = "stasis statistics show subscription";
2615 e->usage =
2616 "Usage: stasis statistics show subscription <uniqueid>\n"
2617 " Show stasis subscription statistics.\n";
2618 return NULL;
2619 case CLI_GENERATE:
2620 if (a->pos == 4) {
2621 return subscription_statistics_complete_name(a->word, a->n);
2622 } else {
2623 return NULL;
2624 }
2625 }
2626
2627 if (a->argc != 5) {
2628 return CLI_SHOWUSAGE;
2629 }
2630
2631 subscription_stats = ao2_global_obj_ref(subscription_statistics);
2632 if (!subscription_stats) {
2633 ast_cli(a->fd, "Could not fetch subcription_statistics container\n");
2634 return CLI_FAILURE;
2635 }
2636
2637 statistics = ao2_find(subscription_stats, a->argv[4], OBJ_SEARCH_KEY);
2638 if (!statistics) {
2639 ao2_ref(subscription_stats, -1);
2640 ast_cli(a->fd, "Specified subscription '%s' does not exist\n", a->argv[4]);
2641 return CLI_FAILURE;
2642 }
2643
2644 ao2_ref(subscription_stats, -1);
2645
2646 ast_cli(a->fd, "Subscription: %s\n", statistics->uniqueid);
2647 ast_cli(a->fd, "Pointer Address: %p\n", statistics->sub);
2648 ast_cli(a->fd, "Source filename: %s\n", S_OR(statistics->file, "<unavailable>"));
2649 ast_cli(a->fd, "Source line number: %d\n", statistics->lineno);
2650 ast_cli(a->fd, "Source function: %s\n", S_OR(statistics->func, "<unavailable>"));
2651 ast_cli(a->fd, "Number of messages dropped due to filtering: %d\n", statistics->messages_dropped);
2652 ast_cli(a->fd, "Number of messages passed to subscriber callback: %d\n", statistics->messages_passed);
2653 ast_cli(a->fd, "Using mailbox to queue messages: %s\n", statistics->uses_mailbox ? "Yes" : "No");
2654 ast_cli(a->fd, "Using stasis threadpool for handling messages: %s\n", statistics->uses_threadpool ? "Yes" : "No");
2655 ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->lowest_time_invoked);
2656 ast_cli(a->fd, "Highest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->highest_time_invoked);
2657
2659 if (statistics->highest_time_message_type) {
2660 ast_cli(a->fd, "Offender message type for highest invoking time: %s\n", stasis_message_type_name(statistics->highest_time_message_type));
2661 }
2663
2664 ast_cli(a->fd, "Number of topics: %d\n", ao2_container_count(statistics->topics));
2665
2666 ast_cli(a->fd, "Subscribed topics:\n");
2667 i = ao2_iterator_init(statistics->topics, 0);
2668 while ((name = ao2_iterator_next(&i))) {
2669 ast_cli(a->fd, "\t%s\n", name);
2670 ao2_ref(name, -1);
2671 }
2673
2674 ao2_ref(statistics, -1);
2675
2676 return CLI_SUCCESS;
2677}
2678
2679AO2_STRING_FIELD_SORT_FN(stasis_topic_statistics, name);
2680
2681/*!
2682 * \internal
2683 * \brief CLI command implementation for 'stasis statistics show topics'
2684 */
2685static char *statistics_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2686{
2687 struct ao2_container *sorted_topics;
2688 struct ao2_container *topic_stats;
2689 struct ao2_iterator iter;
2690 struct stasis_topic_statistics *statistics;
2691 int count = 0;
2692 int not_dispatched = 0;
2693 int dispatched = 0;
2694#define FMT_HEADERS "%-64s %10s %10s %10s %16s %16s\n"
2695#define FMT_FIELDS "%-64s %10d %10d %10d %16ld %16ld\n"
2696#define FMT_FIELDS2 "%-64s %10s %10d %10d\n"
2697
2698 switch (cmd) {
2699 case CLI_INIT:
2700 e->command = "stasis statistics show topics";
2701 e->usage =
2702 "Usage: stasis statistics show topics\n"
2703 " Shows a list of topics and their general statistics\n";
2704 return NULL;
2705 case CLI_GENERATE:
2706 return NULL;
2707 }
2708
2709 if (a->argc != e->args) {
2710 return CLI_SHOWUSAGE;
2711 }
2712
2713 topic_stats = ao2_global_obj_ref(topic_statistics);
2714 if (!topic_stats) {
2715 ast_cli(a->fd, "Could not fetch topic_statistics container\n");
2716 return CLI_FAILURE;
2717 }
2718
2720 stasis_topic_statistics_sort_fn, NULL);
2721 if (!sorted_topics) {
2722 ao2_ref(topic_stats, -1);
2723 ast_cli(a->fd, "Could not create container for sorting topic statistics\n");
2724 return CLI_SUCCESS;
2725 }
2726
2727 if (ao2_container_dup(sorted_topics, topic_stats, 0)) {
2728 ao2_ref(sorted_topics, -1);
2729 ao2_ref(topic_stats, -1);
2730 ast_cli(a->fd, "Could not sort topic statistics\n");
2731 return CLI_SUCCESS;
2732 }
2733
2734 ao2_ref(topic_stats, -1);
2735
2736 ast_cli(a->fd, "\n" FMT_HEADERS, "Topic", "Subscribers", "Dropped", "Dispatched", "Lowest Dispatch", "Highest Dispatch");
2737
2738 iter = ao2_iterator_init(sorted_topics, 0);
2739 while ((statistics = ao2_iterator_next(&iter))) {
2740 ast_cli(a->fd, FMT_FIELDS, statistics->name, ao2_container_count(statistics->subscribers),
2741 statistics->messages_not_dispatched, statistics->messages_dispatched,
2742 statistics->lowest_time_dispatched, statistics->highest_time_dispatched);
2743 not_dispatched += statistics->messages_not_dispatched;
2744 dispatched += statistics->messages_dispatched;
2745 ao2_ref(statistics, -1);
2746 ++count;
2747 }
2748 ao2_iterator_destroy(&iter);
2749
2750 ao2_ref(sorted_topics, -1);
2751
2752 ast_cli(a->fd, FMT_FIELDS2, "Total", "", not_dispatched, dispatched);
2753 ast_cli(a->fd, "\n%d topics\n\n", count);
2754
2755#undef FMT_HEADERS
2756#undef FMT_FIELDS
2757#undef FMT_FIELDS2
2758
2759 return CLI_SUCCESS;
2760}
2761
2762/*!
2763 * \internal
2764 * \brief CLI tab completion for topic statistics names
2765 */
2766static char *topic_statistics_complete_name(const char *word, int state)
2767{
2768 struct stasis_topic_statistics *statistics;
2769 struct ao2_container *topic_stats;
2770 struct ao2_iterator it_statistics;
2771 int wordlen = strlen(word);
2772 int which = 0;
2773 char *result = NULL;
2774
2775 topic_stats = ao2_global_obj_ref(topic_statistics);
2776 if (!topic_stats) {
2777 return result;
2778 }
2779
2780 it_statistics = ao2_iterator_init(topic_stats, 0);
2781 while ((statistics = ao2_iterator_next(&it_statistics))) {
2782 if (!strncasecmp(word, statistics->name, wordlen)
2783 && ++which > state) {
2784 result = ast_strdup(statistics->name);
2785 }
2786 ao2_ref(statistics, -1);
2787 if (result) {
2788 break;
2789 }
2790 }
2791 ao2_iterator_destroy(&it_statistics);
2792 ao2_ref(topic_stats, -1);
2793 return result;
2794}
2795
2796/*!
2797 * \internal
2798 * \brief CLI command implementation for 'stasis statistics show topic'
2799 */
2800static char *statistics_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2801{
2802 struct stasis_topic_statistics *statistics;
2803 struct ao2_container *topic_stats;
2804 struct ao2_iterator i;
2805 char *uniqueid;
2806
2807 switch (cmd) {
2808 case CLI_INIT:
2809 e->command = "stasis statistics show topic";
2810 e->usage =
2811 "Usage: stasis statistics show topic <name>\n"
2812 " Show stasis topic statistics.\n";
2813 return NULL;
2814 case CLI_GENERATE:
2815 if (a->pos == 4) {
2816 return topic_statistics_complete_name(a->word, a->n);
2817 } else {
2818 return NULL;
2819 }
2820 }
2821
2822 if (a->argc != 5) {
2823 return CLI_SHOWUSAGE;
2824 }
2825
2826 topic_stats = ao2_global_obj_ref(topic_statistics);
2827 if (!topic_stats) {
2828 ast_cli(a->fd, "Could not fetch topic_statistics container\n");
2829 return CLI_FAILURE;
2830 }
2831
2832 statistics = ao2_find(topic_stats, a->argv[4], OBJ_SEARCH_KEY);
2833 if (!statistics) {
2834 ao2_ref(topic_stats, -1);
2835 ast_cli(a->fd, "Specified topic '%s' does not exist\n", a->argv[4]);
2836 return CLI_FAILURE;
2837 }
2838
2839 ao2_ref(topic_stats, -1);
2840
2841 ast_cli(a->fd, "Topic: %s\n", statistics->name);
2842 ast_cli(a->fd, "Pointer Address: %p\n", statistics->topic);
2843 ast_cli(a->fd, "Number of messages published that went to no subscriber: %d\n", statistics->messages_not_dispatched);
2844 ast_cli(a->fd, "Number of messages that went to at least one subscriber: %d\n", statistics->messages_dispatched);
2845 ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent dispatching message: %ld\n", statistics->lowest_time_dispatched);
2846 ast_cli(a->fd, "Highest amount of time (in milliseconds) spent dispatching messages: %ld\n", statistics->highest_time_dispatched);
2847 ast_cli(a->fd, "Number of subscribers: %d\n", ao2_container_count(statistics->subscribers));
2848
2849 ast_cli(a->fd, "Subscribers:\n");
2850 i = ao2_iterator_init(statistics->subscribers, 0);
2851 while ((uniqueid = ao2_iterator_next(&i))) {
2852 ast_cli(a->fd, "\t%s\n", uniqueid);
2853 ao2_ref(uniqueid, -1);
2854 }
2856
2857 ao2_ref(statistics, -1);
2858
2859 return CLI_SUCCESS;
2860}
2861
2862/*!
2863 * \internal
2864 * \brief CLI command implementation for 'stasis statistics show messages'
2865 */
2866static char *statistics_show_messages(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2867{
2868 int i;
2869 int count = 0;
2870 int published = 0;
2871 int unused = 0;
2872#define FMT_HEADERS "%-64s %10s %10s\n"
2873#define FMT_FIELDS "%-64s %10d %10d\n"
2874
2875 switch (cmd) {
2876 case CLI_INIT:
2877 e->command = "stasis statistics show messages";
2878 e->usage =
2879 "Usage: stasis statistics show messages\n"
2880 " Shows a list of message types and their general statistics\n";
2881 return NULL;
2882 case CLI_GENERATE:
2883 return NULL;
2884 }
2885
2886 if (a->argc != e->args) {
2887 return CLI_SHOWUSAGE;
2888 }
2889
2890 ast_cli(a->fd, "\n" FMT_HEADERS, "Message Type", "Published", "Unused");
2891
2892 ast_mutex_lock(&message_type_statistics_lock);
2893 for (i = 0; i < AST_VECTOR_SIZE(&message_type_statistics); ++i) {
2894 struct stasis_message_type_statistics *statistics = AST_VECTOR_GET_ADDR(&message_type_statistics, i);
2895
2896 if (!statistics->message_type) {
2897 continue;
2898 }
2899
2900 ast_cli(a->fd, FMT_FIELDS, stasis_message_type_name(statistics->message_type), statistics->published,
2901 statistics->unused);
2902 published += statistics->published;
2903 unused += statistics->unused;
2904 ++count;
2905 }
2906 ast_mutex_unlock(&message_type_statistics_lock);
2907
2908 ast_cli(a->fd, FMT_FIELDS, "Total", published, unused);
2909 ast_cli(a->fd, "\n%d seen message types\n\n", count);
2910
2911#undef FMT_HEADERS
2912#undef FMT_FIELDS
2913
2914 return CLI_SUCCESS;
2915}
2916
2917static struct ast_cli_entry cli_stasis_statistics[] = {
2918 AST_CLI_DEFINE(statistics_show_subscriptions, "Show subscriptions with general statistics"),
2919 AST_CLI_DEFINE(statistics_show_subscription, "Show subscription statistics"),
2920 AST_CLI_DEFINE(statistics_show_topics, "Show topics with general statistics"),
2921 AST_CLI_DEFINE(statistics_show_topic, "Show topic statistics"),
2922 AST_CLI_DEFINE(statistics_show_messages, "Show message types with general statistics"),
2923};
2924
2925static int subscription_statistics_hash(const void *obj, const int flags)
2926{
2927 const struct stasis_subscription_statistics *object;
2928 const char *key;
2929
2930 switch (flags & OBJ_SEARCH_MASK) {
2931 case OBJ_SEARCH_KEY:
2932 key = obj;
2933 break;
2934 case OBJ_SEARCH_OBJECT:
2935 object = obj;
2936 key = object->uniqueid;
2937 break;
2938 default:
2939 /* Hash can only work on something with a full key. */
2940 ast_assert(0);
2941 return 0;
2942 }
2943 return ast_str_case_hash(key);
2944}
2945
2946static int subscription_statistics_cmp(void *obj, void *arg, int flags)
2947{
2948 const struct stasis_subscription_statistics *object_left = obj;
2949 const struct stasis_subscription_statistics *object_right = arg;
2950 const char *right_key = arg;
2951 int cmp;
2952
2953 switch (flags & OBJ_SEARCH_MASK) {
2954 case OBJ_SEARCH_OBJECT:
2955 right_key = object_right->uniqueid;
2956 /* Fall through */
2957 case OBJ_SEARCH_KEY:
2958 cmp = strcasecmp(object_left->uniqueid, right_key);
2959 break;
2961 /* Not supported by container */
2962 ast_assert(0);
2963 cmp = -1;
2964 break;
2965 default:
2966 /*
2967 * What arg points to is specific to this traversal callback
2968 * and has no special meaning to astobj2.
2969 */
2970 cmp = 0;
2971 break;
2972 }
2973 if (cmp) {
2974 return 0;
2975 }
2976 /*
2977 * At this point the traversal callback is identical to a sorted
2978 * container.
2979 */
2980 return CMP_MATCH;
2981}
2982
2983static int topic_statistics_hash(const void *obj, const int flags)
2984{
2985 const struct stasis_topic_statistics *object;
2986 const char *key;
2987
2988 switch (flags & OBJ_SEARCH_MASK) {
2989 case OBJ_SEARCH_KEY:
2990 key = obj;
2991 break;
2992 case OBJ_SEARCH_OBJECT:
2993 object = obj;
2994 key = object->name;
2995 break;
2996 default:
2997 /* Hash can only work on something with a full key. */
2998 ast_assert(0);
2999 return 0;
3000 }
3001 return ast_str_case_hash(key);
3002}
3003
3004static int topic_statistics_cmp(void *obj, void *arg, int flags)
3005{
3006 const struct stasis_topic_statistics *object_left = obj;
3007 const struct stasis_topic_statistics *object_right = arg;
3008 const char *right_key = arg;
3009 int cmp;
3010
3011 switch (flags & OBJ_SEARCH_MASK) {
3012 case OBJ_SEARCH_OBJECT:
3013 right_key = object_right->name;
3014 /* Fall through */
3015 case OBJ_SEARCH_KEY:
3016 cmp = strcasecmp(object_left->name, right_key);
3017 break;
3019 /* Not supported by container */
3020 ast_assert(0);
3021 cmp = -1;
3022 break;
3023 default:
3024 /*
3025 * What arg points to is specific to this traversal callback
3026 * and has no special meaning to astobj2.
3027 */
3028 cmp = 0;
3029 break;
3030 }
3031 if (cmp) {
3032 return 0;
3033 }
3034 /*
3035 * At this point the traversal callback is identical to a sorted
3036 * container.
3037 */
3038 return CMP_MATCH;
3039}
3040#endif
3041
3042/*! \brief Cleanup function for graceful shutdowns */
3043static void stasis_cleanup(void)
3044{
3045#ifdef AST_DEVMODE
3046 ast_cli_unregister_multiple(cli_stasis_statistics, ARRAY_LEN(cli_stasis_statistics));
3047 AST_VECTOR_FREE(&message_type_statistics);
3048 ao2_global_obj_release(subscription_statistics);
3049 ao2_global_obj_release(topic_statistics);
3050#endif
3053 topic_all = NULL;
3055 threadpool = NULL;
3058 aco_info_destroy(&cfg_info);
3060}
3061
3063{
3064 struct stasis_config *cfg;
3065 int cache_init;
3066 struct ast_threadpool_options threadpool_opts = { 0, };
3067#ifdef AST_DEVMODE
3068 struct ao2_container *subscription_stats;
3069 struct ao2_container *topic_stats;
3070#endif
3071
3072 /* Be sure the types are cleaned up after the message bus */
3074
3075 if (aco_info_init(&cfg_info)) {
3076 return -1;
3077 }
3078
3079 aco_option_register_custom(&cfg_info, "decline", ACO_EXACT,
3081 aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
3083 FLDSET(struct stasis_threadpool_conf, initial_size), 0,
3084 INT_MAX);
3085 aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
3087 FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
3088 INT_MAX);
3089 aco_option_register(&cfg_info, "max_size", ACO_EXACT,
3091 FLDSET(struct stasis_threadpool_conf, max_size), 0,
3092 INT_MAX);
3093
3094 if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
3095 struct stasis_config *default_cfg = stasis_config_alloc();
3096
3097 if (!default_cfg) {
3098 return -1;
3099 }
3100
3101 if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) {
3102 ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
3103 ao2_ref(default_cfg, -1);
3104
3105 return -1;
3106 }
3107
3108 if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) {
3109 ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
3110 ao2_ref(default_cfg, -1);
3111
3112 return -1;
3113 }
3114
3115 ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n");
3117 cfg = default_cfg;
3118 } else {
3120 if (!cfg) {
3121 ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
3122
3123 return -1;
3124 }
3125 }
3126
3127 threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION;
3128 threadpool_opts.initial_size = cfg->threadpool_options->initial_size;
3129 threadpool_opts.auto_increment = 1;
3130 threadpool_opts.max_size = cfg->threadpool_options->max_size;
3131 threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
3132 threadpool = ast_threadpool_create("stasis", NULL, &threadpool_opts);
3133 ao2_ref(cfg, -1);
3134 if (!threadpool) {
3135 ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
3136
3137 return -1;
3138 }
3139
3140 cache_init = stasis_cache_init();
3141 if (cache_init != 0) {
3142 return -1;
3143 }
3144
3146 return -1;
3147 }
3149 return -1;
3150 }
3151
3153 topic_proxy_hash_fn, 0, topic_proxy_cmp_fn);
3154 if (!topic_all) {
3155 return -1;
3156 }
3157
3159 return -1;
3160 }
3161
3162#ifdef AST_DEVMODE
3163 /* Statistics information is stored separately so that we don't alter or interrupt the lifetime of the underlying
3164 * topic or subscripton.
3165 */
3166 subscription_stats = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, SUBSCRIPTION_STATISTICS_BUCKETS,
3167 subscription_statistics_hash, 0, subscription_statistics_cmp);
3168 if (!subscription_stats) {
3169 return -1;
3170 }
3171 ao2_global_obj_replace_unref(subscription_statistics, subscription_stats);
3172 ao2_cleanup(subscription_stats);
3173
3174 topic_stats = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, TOPIC_STATISTICS_BUCKETS,
3175 topic_statistics_hash, 0, topic_statistics_cmp);
3176 if (!topic_stats) {
3177 return -1;
3178 }
3179 ao2_global_obj_replace_unref(topic_statistics, topic_stats);
3180 ao2_cleanup(topic_stats);
3181 if (!topic_stats) {
3182 return -1;
3183 }
3184
3185 AST_VECTOR_INIT(&message_type_statistics, 0);
3186
3187 if (ast_cli_register_multiple(cli_stasis_statistics, ARRAY_LEN(cli_stasis_statistics))) {
3188 return -1;
3189 }
3190#endif
3191
3192 return 0;
3193}
#define var
Definition: ast_expr2f.c:605
Asterisk main include file. File version handling, generic pbx functions.
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
Definition: clicompat.c:19
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
Definition: astmm.h:288
#define ast_free(a)
Definition: astmm.h:180
#define ast_strdup(str)
A wrapper for strdup()
Definition: astmm.h:241
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:267
#define ast_calloc(num, len)
A wrapper for calloc()
Definition: astmm.h:202
#define ast_log
Definition: astobj2.c:42
int ao2_container_dup(struct ao2_container *dest, struct ao2_container *src, enum search_flags flags)
Copy all object references in the src container into the dest container.
int ao2_weakproxy_subscribe(void *weakproxy, ao2_weakproxy_notification_cb cb, void *data, int flags)
Request notification when weakproxy points to NULL.
Definition: astobj2.c:934
#define ao2_iterator_next(iter)
Definition: astobj2.h:1911
#define ao2_link(container, obj)
Add an object to a container.
Definition: astobj2.h:1532
@ CMP_MATCH
Definition: astobj2.h:1027
@ AO2_ALLOC_OPT_LOCK_NOLOCK
Definition: astobj2.h:367
@ AO2_ALLOC_OPT_LOCK_MUTEX
Definition: astobj2.h:363
#define ao2_wrlock(a)
Definition: astobj2.h:719
#define ao2_global_obj_replace_unref(holder, obj)
Replace an ao2 object in the global holder, throwing away any old object.
Definition: astobj2.h:901
void ao2_container_unregister(const char *name)
Unregister a container for CLI stats and integrity check.
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
#define ao2_unlink(container, obj)
Remove an object from a container.
Definition: astobj2.h:1578
@ AO2_ITERATOR_UNLINK
Definition: astobj2.h:1863
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
Definition: astobj2.h:1554
#define ao2_global_obj_ref(holder)
Get a reference to the object stored in the global holder.
Definition: astobj2.h:918
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1736
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
#define ao2_t_weakproxy_alloc(data_size, destructor_fn, tag)
Definition: astobj2.h:553
#define AO2_STRING_FIELD_SORT_FN(stype, field)
Creates a sort function for a structure string field.
Definition: astobj2.h:2064
#define ao2_container_alloc_rbtree(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a red-black tree container.
Definition: astobj2.h:1349
#define ao2_unlock(a)
Definition: astobj2.h:729
#define ao2_weakproxy_find(c, arg, flags, tag)
Perform an ao2_find on a container with ao2_weakproxy objects, returning the real object.
Definition: astobj2.h:1748
#define ao2_lock(a)
Definition: astobj2.h:717
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:404
int ao2_container_register(const char *name, struct ao2_container *self, ao2_prnt_obj_fn *prnt_obj)
Register a container for CLI stats and integrity check.
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
Definition: astobj2.c:476
#define ao2_global_obj_release(holder)
Release the ao2 object held in the global holder.
Definition: astobj2.h:859
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition: astobj2.h:480
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
#define ao2_t_alloc(data_size, destructor_fn, debug_msg)
Definition: astobj2.h:407
@ OBJ_SEARCH_PARTIAL_KEY
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
Definition: astobj2.h:1116
@ OBJ_SEARCH_OBJECT
The arg parameter is an object of the same type.
Definition: astobj2.h:1087
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
Definition: astobj2.h:1063
@ OBJ_NODATA
Definition: astobj2.h:1044
@ OBJ_SEARCH_MASK
Search option field mask.
Definition: astobj2.h:1072
@ OBJ_UNLINK
Definition: astobj2.h:1039
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1101
#define ao2_container_alloc_list(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a list container.
Definition: astobj2.h:1327
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:409
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition: astobj2.h:1303
void() ao2_prnt_fn(void *where, const char *fmt,...)
Print output.
Definition: astobj2.h:1435
#define ao2_t_weakproxy_set_object(weakproxy, obj, flags, tag)
Definition: astobj2.h:582
static PGresult * result
Definition: cel_pgsql.c:84
static struct console_pvt globals
static const char type[]
Definition: chan_ooh323.c:109
struct stasis_topic * ast_channel_topic(struct ast_channel *chan)
A topic which publishes the events for a particular channel.
Standard Command Line Interface.
#define CLI_SHOWUSAGE
Definition: cli.h:45
#define CLI_SUCCESS
Definition: cli.h:44
int ast_cli_unregister_multiple(struct ast_cli_entry *e, int len)
Unregister multiple commands.
Definition: clicompat.c:30
#define AST_CLI_DEFINE(fn, txt,...)
Definition: cli.h:197
int ast_cli_completion_add(char *value)
Add a result to a request for completion options.
Definition: main/cli.c:2768
void ast_cli(int fd, const char *fmt,...)
Definition: clicompat.c:6
@ CLI_INIT
Definition: cli.h:152
@ CLI_GENERATE
Definition: cli.h:153
#define CLI_FAILURE
Definition: cli.h:46
#define ast_cli_register_multiple(e, len)
Register multiple commands.
Definition: cli.h:265
short word
Configuration option-handling.
@ ACO_EXACT
int aco_set_defaults(struct aco_type *type, const char *category, void *obj)
Set all default options of obj.
void aco_info_destroy(struct aco_info *info)
Destroy an initialized aco_info struct.
@ ACO_PROCESS_ERROR
Their was an error and no changes were applied.
int aco_info_init(struct aco_info *info)
Initialize an aco_info structure.
#define FLDSET(type,...)
Convert a struct and list of fields to an argument list of field offsets.
#define aco_option_register(info, name, matchtype, types, default_val, opt_type, flags,...)
Register a config option.
#define ACO_FILES(...)
@ OPT_INT_T
Type for default option handler for signed integers.
#define aco_option_register_custom(info, name, matchtype, types, default_val, handler, flags)
Register a config option.
@ ACO_GLOBAL
@ ACO_WHITELIST_EXACT
enum aco_process_status aco_process_config(struct aco_info *info, int reload)
Process a config info via the options registered with an aco_info.
#define ACO_TYPES(...)
A helper macro to ensure that aco_info types always have a sentinel.
static const char name[]
Definition: format_mp3.c:68
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
Definition: func_strings.c:807
struct ast_str * ast_manager_str_from_json_object(struct ast_json *blob, key_exclusion_cb exclusion_cb)
Convert a JSON object into an AMI compatible string.
Definition: manager.c:554
void ast_multi_object_blob_add(struct ast_multi_object_blob *multi, enum stasis_user_multi_object_snapshot_type type, void *object)
Add an object (snapshot) to the blob.
Definition: stasis.c:2002
stasis_user_multi_object_snapshot_type
Object type code for multi user object snapshots.
Definition: stasis.h:1353
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
struct ast_channel_snapshot * ast_channel_snapshot_create(struct ast_channel *chan)
Generate a snapshot of the channel state. This is an ao2 object, so ao2_cleanup() to deallocate.
void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob)
Publish single channel user event (for app_userevent compatibility)
Definition: stasis.c:2011
struct ast_multi_object_blob * ast_multi_object_blob_create(struct ast_json *blob)
Create a stasis user event multi object blob.
Definition: stasis.c:1976
struct stasis_message_type * ast_multi_user_event_type(void)
Message type for custom user defined events with multi object blobs.
#define STASIS_UMOS_MAX
Number of snapshot types.
Definition: stasis.h:1360
@ STASIS_UMOS_ENDPOINT
Definition: stasis.h:1356
@ STASIS_UMOS_BRIDGE
Definition: stasis.h:1355
@ STASIS_UMOS_CHANNEL
Definition: stasis.h:1354
#define ast_debug(level,...)
Log a DEBUG message.
#define LOG_ERROR
#define LOG_NOTICE
struct ast_json * ast_json_string_create(const char *value)
Construct a JSON string from value.
Definition: json.c:278
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition: json.c:73
struct ast_json * ast_json_object_create(void)
Create a new JSON object.
Definition: json.c:399
struct ast_json * ast_json_timeval(const struct timeval tv, const char *zone)
Construct a timeval as JSON.
Definition: json.c:670
struct ast_json * ast_json_ref(struct ast_json *value)
Increase refcount on value.
Definition: json.c:67
int ast_json_object_set(struct ast_json *object, const char *key, struct ast_json *value)
Set a field in a JSON object.
Definition: json.c:414
const char * ast_json_string_get(const struct ast_json *string)
Get the value of a JSON string.
Definition: json.c:283
struct ast_json * ast_json_object_get(struct ast_json *object, const char *key)
Get a field from a JSON object.
Definition: json.c:407
#define ast_cond_destroy(cond)
Definition: lock.h:202
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:604
#define ast_cond_wait(cond, mutex)
Definition: lock.h:205
#define ast_cond_init(cond, attr)
Definition: lock.h:201
#define ast_mutex_init(pmutex)
Definition: lock.h:186
#define ast_mutex_unlock(a)
Definition: lock.h:190
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition: lock.h:757
pthread_cond_t ast_cond_t
Definition: lock.h:178
#define ast_mutex_destroy(a)
Definition: lock.h:188
#define ast_mutex_lock(a)
Definition: lock.h:189
#define AST_MUTEX_DEFINE_STATIC(mutex)
Definition: lock.h:520
#define ast_cond_signal(cond)
Definition: lock.h:203
struct ast_str * ast_manager_build_bridge_state_string_prefix(const struct ast_bridge_snapshot *snapshot, const char *prefix)
Generate the AMI message body from a bridge snapshot.
struct ast_manager_event_blob * ast_manager_event_blob_create(int event_flags, const char *manager_event, const char *extra_fields_fmt,...)
Construct a ast_manager_event_blob.
Definition: manager.c:10126
#define EVENT_FLAG_USER
Definition: manager.h:81
struct ast_str * ast_manager_build_channel_state_string_prefix(const struct ast_channel_snapshot *snapshot, const char *prefix)
Generate the AMI message body from a channel snapshot.
struct stasis_forward * sub
Definition: res_corosync.c:240
struct ao2_container * container
Definition: res_fax.c:501
static void to_ami(struct ast_sip_subscription *sub, struct ast_str **buf)
#define NULL
Definition: resample.c:96
static struct ast_str * multi_object_blob_to_ami(void *obj)
Definition: stasis.c:2097
int stasis_subscription_is_done(struct stasis_subscription *subscription)
Returns whether subscription has received its final message.
Definition: stasis.c:1119
static char * topic_complete_name(const char *word)
Definition: stasis.c:2392
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *sub)
Cancel a subscription.
Definition: stasis.c:972
static struct ast_cli_entry cli_stasis[]
Definition: stasis.c:2477
#define INITIAL_SUBSCRIBERS_MAX
Definition: stasis.c:302
static struct ast_json * multi_user_event_to_json(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
Definition: stasis.c:2046
static void subscription_change_dtor(void *obj)
Definition: stasis.c:1621
void stasis_topic_pool_delete_topic(struct stasis_topic_pool *pool, const char *topic_name)
Delete a topic from the topic pool.
Definition: stasis.c:1865
static struct ast_manager_event_blob * multi_user_event_to_ami(struct stasis_message *message)
Definition: stasis.c:2156
int stasis_subscription_decline_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are not interested in a message type.
Definition: stasis.c:1054
#define FMT_HEADERS
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:628
static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
Definition: stasis.c:1287
#define FMT_FIELDS
int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription, long low_water, long high_water)
Set the high and low alert water marks of the stasis subscription.
Definition: stasis.c:1012
AO2_STRING_FIELD_HASH_FN(topic_proxy, name)
struct stasis_topic * stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
Find or create a topic in the pool.
Definition: stasis.c:1885
static int sub_cleanup(void *data)
Definition: stasis.c:965
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
Definition: stasis.c:1549
static void forward_dtor(void *obj)
Definition: stasis.c:1539
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition: stasis.c:618
size_t stasis_topic_subscribers(const struct stasis_topic *topic)
Return the number of subscribers of a topic.
Definition: stasis.c:644
static void topic_pool_dtor(void *obj)
Definition: stasis.c:1745
void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
Publish a message to a topic's subscribers, synchronizing on the specified subscriber.
Definition: stasis.c:1517
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
Definition: stasis.c:1024
static void * stasis_config_alloc(void)
Definition: stasis.c:2254
static struct topic_pool_entry * topic_pool_entry_alloc(const char *topic_name)
Definition: stasis.c:1725
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
Definition: stasis.c:1078
static void topic_dtor(void *obj)
Definition: stasis.c:434
#define TOPIC_POOL_BUCKETS
Definition: stasis.c:305
static struct stasis_subscription_change * subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
Definition: stasis.c:1628
static void stasis_cleanup(void)
Cleanup function for graceful shutdowns.
Definition: stasis.c:3043
struct stasis_topic * stasis_topic_get(const char *name)
Get a topic of the given name.
Definition: stasis.c:623
int stasis_init(void)
Initialize the Stasis subsystem.
Definition: stasis.c:3062
static void proxy_dtor(void *weakproxy, void *container)
Definition: stasis.c:413
void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Stasis subscription callback function that does nothing.
Definition: stasis.c:810
struct stasis_subscription * __stasis_subscribe_pool(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription whose callbacks occur on a thread pool.
Definition: stasis.c:954
static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
Definition: stasis.c:1785
static void stasis_config_destructor(void *obj)
Definition: stasis.c:2246
void stasis_subscription_join(struct stasis_subscription *subscription)
Block until the last message is processed on a subscription.
Definition: stasis.c:1106
static AO2_GLOBAL_OBJ_STATIC(globals)
A global object container that will contain the stasis_config that gets swapped out on reloads.
int stasis_message_type_declined(const char *name)
Check whether a message type is declined.
Definition: stasis.c:2284
static struct ast_threadpool * threadpool
Definition: stasis.c:308
static int topic_pool_entry_hash(const void *obj, const int flags)
Definition: stasis.c:1764
CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,.files=ACO_FILES(&stasis_conf),)
Register information about the configs being processed by this module.
void stasis_log_bad_type_access(const char *name)
Definition: stasis.c:1941
static struct aco_type threadpool_option
Definition: stasis.c:2203
static void multi_object_blob_dtor(void *obj)
Definition: stasis.c:1960
static int link_topic_proxy(struct stasis_topic *topic, const char *name, const char *detail)
Definition: stasis.c:501
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Add a subscriber to a topic.
Definition: stasis.c:1202
static void subscription_dtor(void *obj)
Definition: stasis.c:715
struct stasis_topic * stasis_topic_create_with_detail(const char *name, const char *detail)
Create a new topic with given detail.
Definition: stasis.c:569
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
Definition: stasis.c:1175
const char * stasis_topic_detail(const struct stasis_topic *topic)
Return the detail of a topic.
Definition: stasis.c:636
struct stasis_subscription * stasis_unsubscribe_and_join(struct stasis_subscription *subscription)
Cancel a subscription, blocking until the last message is processed.
Definition: stasis.c:1135
static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition: stasis.c:1677
int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name)
Check if a topic exists in a pool.
Definition: stasis.c:1928
static void publish_msg(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sync_sub)
Definition: stasis.c:1431
struct aco_file stasis_conf
Definition: stasis.c:2224
static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition: stasis.c:1649
static void stasis_declined_config_destructor(void *obj)
Definition: stasis.c:2239
static struct aco_type * threadpool_options[]
Definition: stasis.c:2211
static unsigned int dispatch_message(struct stasis_subscription *sub, struct stasis_message *message, int synchronous)
Definition: stasis.c:1313
void stasis_subscription_accept_formatters(struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters)
Indicate to a subscription that we are interested in messages with one or more formatters.
Definition: stasis.c:1094
const char * stasis_subscription_uniqueid(const struct stasis_subscription *sub)
Get the unique ID for the subscription.
Definition: stasis.c:1170
struct stasis_topic_pool * stasis_topic_pool_create(struct stasis_topic *pooled_topic)
Create a topic pool that routes messages from dynamically generated topics to the given topic.
Definition: stasis.c:1834
static void subscription_invoke(struct stasis_subscription *sub, struct stasis_message *message)
Invoke the subscription's callback.
Definition: stasis.c:755
AO2_STRING_FIELD_CASE_SORT_FN(topic_proxy, name)
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition: stasis.c:1579
struct stasis_subscription * __stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription.
Definition: stasis.c:943
static void topic_pool_entry_dtor(void *obj)
Definition: stasis.c:1716
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
Definition: stasis.c:1512
static int dispatch_exec_async(struct ast_taskprocessor_local *local)
Definition: stasis.c:1260
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
Returns whether a subscription is currently subscribed.
Definition: stasis.c:1151
struct stasis_subscription * internal_stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func)
Create a subscription.
Definition: stasis.c:857
static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
Definition: stasis.c:2305
AO2_STRING_FIELD_CMP_FN(topic_proxy, name)
static int userevent_exclusion_cb(const char *key)
Definition: stasis.c:2148
struct aco_type * declined_options[]
Definition: stasis.c:2222
#define topic_lock_both(topic1, topic2)
Lock two topics.
Definition: stasis.c:426
struct ao2_container * topic_all
Definition: stasis.c:396
static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition: stasis.c:1230
static char * stasis_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
Definition: stasis.c:2335
#define TOPIC_ALL_BUCKETS
Definition: stasis.c:318
STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type)
static struct aco_type declined_option
An aco_type structure to link the "declined_message_types" category to the stasis_declined_config typ...
Definition: stasis.c:2214
static char * stasis_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
Definition: stasis.c:2418
Stasis Message Bus API. See Stasis Message Bus API for detailed documentation.
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
void(* stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Callback function type for Stasis subscriptions.
Definition: stasis.h:611
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
Definition: stasis.h:1515
stasis_subscription_message_filter
Stasis subscription message filters.
Definition: stasis.h:294
@ STASIS_SUBSCRIPTION_FILTER_SELECTIVE
Definition: stasis.h:297
@ STASIS_SUBSCRIPTION_FILTER_FORCED_NONE
Definition: stasis.h:296
@ STASIS_SUBSCRIPTION_FILTER_NONE
Definition: stasis.h:295
stasis_subscription_message_formatters
Stasis subscription formatter filters.
Definition: stasis.h:308
@ STASIS_SUBSCRIPTION_FORMATTER_NONE
Definition: stasis.h:309
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
Definition: stasis.h:1493
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
int stasis_cache_init(void)
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
int stasis_message_type_id(const struct stasis_message_type *type)
Gets the id of a given message type.
enum stasis_subscription_message_formatters stasis_message_type_available_formatters(const struct stasis_message_type *message_type)
Get a bitmap of available formatters for a message type.
const struct timeval * stasis_message_timestamp(const struct stasis_message *msg)
Get the time when a message was created.
struct ast_json * ast_bridge_snapshot_to_json(const struct ast_bridge_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_bridge_snapshot.
struct ast_json * ast_channel_snapshot_to_json(const struct ast_channel_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_channel_snapshot.
Endpoint abstractions.
struct ast_json * ast_endpoint_snapshot_to_json(const struct ast_endpoint_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_endpoint_snapshot.
Internal Stasis APIs.
static int message_type_id
int ast_str_append(struct ast_str **buf, ssize_t max_len, const char *fmt,...)
Append to a thread local dynamic string.
Definition: strings.h:1139
char * ast_str_buffer(const struct ast_str *buf)
Returns the string buffer within the ast_str buf.
Definition: strings.h:761
#define S_OR(a, b)
returns the equivalent of logic or for strings: first one if not empty, otherwise second one.
Definition: strings.h:80
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition: strings.h:65
#define ast_str_container_alloc(buckets)
Allocates a hash container for bare strings.
Definition: strings.h:1365
#define ast_str_create(init_len)
Create a malloc'ed dynamic length string.
Definition: strings.h:659
static force_inline int attribute_pure ast_str_case_hash(const char *str)
Compute a hash value on a case-insensitive string.
Definition: strings.h:1303
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
Definition: strings.h:425
void ast_str_container_remove(struct ao2_container *str_container, const char *remove)
Removes a string from a string container allocated by ast_str_container_alloc.
Definition: strings.c:221
int ast_str_container_add(struct ao2_container *str_container, const char *add)
Adds a string to a string container allocated by ast_str_container_alloc.
Definition: strings.c:205
The representation of a single configuration file to be processed.
const char * filename
Type information about a category-level configurable object.
enum aco_type_t type
Generic container type.
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
Definition: astobj2.h:1821
Structure representing a snapshot of channel state.
Main Channel structure associated with a channel.
descriptor for a cli entry.
Definition: cli.h:171
int args
This gets set in ast_cli_register()
Definition: cli.h:185
char * command
Definition: cli.h:186
const char * usage
Definition: cli.h:177
Abstract JSON element (object, array, string, int, ...).
Struct containing info for an AMI event to send out.
Definition: manager.h:502
A multi object blob data structure to carry user event stasis messages.
Definition: stasis.c:1951
struct ast_multi_object_blob::@395 snapshots[STASIS_UMOS_MAX]
struct ast_json * blob
Definition: stasis.c:1952
Structure for mutex and tracking information.
Definition: lock.h:135
Support for dynamic strings.
Definition: strings.h:623
Local data parameter.
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
int idle_timeout
Time limit in seconds for idle threads.
Definition: threadpool.h:79
int max_size
Maximum number of threads a pool may have.
Definition: threadpool.h:110
int auto_increment
Number of threads to increment pool by.
Definition: threadpool.h:90
int initial_size
Number of threads the pool will start with.
Definition: threadpool.h:100
An opaque threadpool structure.
Definition: threadpool.c:36
Structure for variables, used for configurations and for channel variables.
Definition: search.h:40
struct stasis_declined_config * declined_message_types
Definition: stasis.c:2200
struct stasis_threadpool_conf * threadpool_options
Definition: stasis.c:2198
A structure to hold global configuration-related options.
Definition: stasis.c:2181
struct ao2_container * declined
Definition: stasis.c:2183
Forwarding information.
Definition: stasis.c:1532
struct stasis_topic * from_topic
Definition: stasis.c:1534
struct stasis_topic * to_topic
Definition: stasis.c:1536
Structure containing callbacks for Stasis message sanitization.
Definition: stasis.h:200
Holds details about changes to subscriptions for the specified topic.
Definition: stasis.h:890
struct stasis_topic * topic
Definition: stasis.h:891
int final_message_rxed
Definition: stasis.c:697
struct stasis_topic * topic
Definition: stasis.c:685
ast_cond_t join_cond
Definition: stasis.c:694
stasis_subscription_cb callback
Definition: stasis.c:689
struct stasis_subscription::@394 accepted_message_types
struct ast_taskprocessor * mailbox
Definition: stasis.c:687
enum stasis_subscription_message_filter filter
Definition: stasis.c:707
int final_message_processed
Definition: stasis.c:700
enum stasis_subscription_message_formatters accepted_formatters
Definition: stasis.c:705
Threadpool configuration options.
Definition: stasis.c:2187
struct ao2_container * pool_container
Definition: stasis.c:1741
struct stasis_topic * pool_topic
Definition: stasis.c:1742
struct stasis_topic::@393 upstream_topics
char * name
Definition: stasis.c:387
int subscriber_id
Definition: stasis.c:384
char * detail
Definition: stasis.c:390
struct timeval * creationtime
Definition: stasis.c:393
struct stasis_topic::@392 subscribers
ast_cond_t cond
Definition: stasis.c:1277
void * task_data
Definition: stasis.c:1279
ast_mutex_t lock
Definition: stasis.c:1276
Definition: stasis.c:1710
struct stasis_topic * topic
Definition: stasis.c:1712
struct stasis_forward * forward
Definition: stasis.c:1711
char name[0]
Definition: stasis.c:1713
char buf[0]
Definition: stasis.c:406
struct timeval creationtime
Definition: stasis.c:404
char * name
Definition: stasis.c:401
char * detail
Definition: stasis.c:402
An API for managing task processing threads that can be shared across modules.
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
Sets the local data associated with a taskprocessor.
@ TPS_REF_DEFAULT
return a reference to a taskprocessor, create one if it does not exist
Definition: taskprocessor.h:76
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int(*task_exe)(struct ast_taskprocessor_local *local), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format,...)
Build a taskprocessor name with a sequence number on the end.
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.
#define AST_TASKPROCESSOR_MAX_NAME
Suggested maximum taskprocessor name length (less null terminator).
Definition: taskprocessor.h:61
static struct test_val a
void ast_threadpool_shutdown(struct ast_threadpool *pool)
Shut down a threadpool and destroy it.
Definition: threadpool.c:966
struct ast_threadpool * ast_threadpool_create(const char *name, struct ast_threadpool_listener *listener, const struct ast_threadpool_options *options)
Create a new threadpool.
Definition: threadpool.c:916
#define AST_THREADPOOL_OPTIONS_VERSION
Definition: threadpool.h:71
struct ast_taskprocessor * ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
Serialized execution of tasks within a ast_threadpool.
Definition: threadpool.c:1428
void ast_format_duration_hh_mm_ss(int duration, char *buf, size_t length)
Formats a duration into HH:MM:SS.
Definition: utils.c:2297
int64_t ast_tvdiff_ms(struct timeval end, struct timeval start)
Computes the difference (in milliseconds) between two struct timeval instances.
Definition: time.h:107
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:159
FILE * out
Definition: utils/frame.c:33
static void statistics(void)
Definition: utils/frame.c:287
Utility functions.
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:941
#define ast_assert(a)
Definition: utils.h:739
#define ARRAY_LEN(a)
Definition: utils.h:666
Universally unique identifier support.
Vector container support.
#define AST_VECTOR_REPLACE(vec, idx, elem)
Replace an element at a specific position in a vector, growing the vector if needed.
Definition: vector.h:284
#define AST_VECTOR_ELEM_CLEANUP_NOOP(elem)
Vector element cleanup that does nothing.
Definition: vector.h:571
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:609
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition: vector.h:174
#define AST_VECTOR_REMOVE_ELEM_UNORDERED(vec, elem, cleanup)
Remove an element from a vector.
Definition: vector.h:583
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256
#define AST_VECTOR(name, type)
Define a vector structure.
Definition: vector.h:44
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:680
#define AST_VECTOR_GET_ADDR(vec, idx)
Get an address of element in a vector.
Definition: vector.h:668