Asterisk - The Open Source Telephony Project GIT-master-ff80666
stasis.c
Go to the documentation of this file.
1/*
2 * Asterisk -- An open source telephony toolkit.
3 *
4 * Copyright (C) 2013, Digium, Inc.
5 *
6 * David M. Lee, II <dlee@digium.com>
7 *
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
13 *
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
17 */
18
19/*! \file
20 *
21 * \brief Stasis Message Bus API.
22 *
23 * \author David M. Lee, II <dlee@digium.com>
24 */
25
26/*** MODULEINFO
27 <support_level>core</support_level>
28 ***/
29
30#include "asterisk.h"
31
32#include "asterisk/astobj2.h"
34#include "asterisk/stasis.h"
36#include "asterisk/taskpool.h"
37#include "asterisk/utils.h"
38#include "asterisk/uuid.h"
39#include "asterisk/vector.h"
44#include "asterisk/cli.h"
45
46/*** DOCUMENTATION
47 <managerEvent language="en_US" name="UserEvent">
48 <managerEventInstance class="EVENT_FLAG_USER">
49 <since>
50 <version>12.3.0</version>
51 </since>
52 <synopsis>A user defined event raised from the dialplan.</synopsis>
53 <syntax>
54 <channel_snapshot/>
55 <parameter name="UserEvent">
56 <para>The event name, as specified in the dialplan.</para>
57 </parameter>
58 </syntax>
59 <description>
60 <para>Event may contain additional arbitrary parameters in addition to optional bridge and endpoint snapshots. Multiple snapshots of the same type are prefixed with a numeric value.</para>
61 </description>
62 <see-also>
63 <ref type="application">UserEvent</ref>
64 <ref type="managerEvent">UserEvent</ref>
65 </see-also>
66 </managerEventInstance>
67 </managerEvent>
68 <configInfo name="stasis" language="en_US">
69 <configFile name="stasis.conf">
70 <configObject name="threadpool">
71 <since>
72 <version>12.8.0</version>
73 <version>13.1.0</version>
74 </since>
75 <synopsis>Settings that configure the threadpool Stasis uses to deliver some messages.</synopsis>
76 <configOption name="initial_size" default="5">
77 <since>
78 <version>12.8.0</version>
79 <version>13.1.0</version>
80 </since>
81 <synopsis>Initial number of threads in the message bus threadpool.</synopsis>
82 </configOption>
83 <configOption name="idle_timeout_sec" default="20">
84 <since>
85 <version>12.8.0</version>
86 <version>13.1.0</version>
87 </since>
88 <synopsis>Number of seconds before an idle thread is disposed of.</synopsis>
89 </configOption>
90 <configOption name="max_size" default="50">
91 <since>
92 <version>12.8.0</version>
93 <version>13.1.0</version>
94 </since>
95 <synopsis>Maximum number of threads in the threadpool.</synopsis>
96 </configOption>
97 </configObject>
98 <configObject name="taskpool">
99 <since>
100 <version>23.1.0</version>
101 <version>22.7.0</version>
102 <version>20.17.0</version>
103 </since>
104 <synopsis>Settings that configure the taskpool Stasis uses to deliver some messages.</synopsis>
105 <configOption name="minimum_size" default="5">
106 <since>
107 <version>23.1.0</version>
108 <version>22.7.0</version>
109 <version>20.17.0</version>
110 </since>
111 <synopsis>Minimum number of taskprocessors in the message bus taskpool.</synopsis>
112 </configOption>
113 <configOption name="initial_size" default="5">
114 <since>
115 <version>23.1.0</version>
116 <version>22.7.0</version>
117 <version>20.17.0</version>
118 </since>
119 <synopsis>Initial number of taskprocessors in the message bus taskpool.</synopsis>
120 </configOption>
121 <configOption name="idle_timeout_sec" default="20">
122 <since>
123 <version>23.1.0</version>
124 <version>22.7.0</version>
125 <version>20.17.0</version>
126 </since>
127 <synopsis>Number of seconds before an idle taskprocessor is disposed of.</synopsis>
128 </configOption>
129 <configOption name="max_size" default="50">
130 <since>
131 <version>23.1.0</version>
132 <version>22.7.0</version>
133 <version>20.17.0</version>
134 </since>
135 <synopsis>Maximum number of taskprocessors in the taskpool.</synopsis>
136 </configOption>
137 </configObject>
138 <configObject name="declined_message_types">
139 <since>
140 <version>13.0.0</version>
141 </since>
142 <synopsis>Stasis message types for which to decline creation.</synopsis>
143 <configOption name="decline">
144 <since>
145 <version>12.8.0</version>
146 <version>13.1.0</version>
147 </since>
148 <synopsis>The message type to decline.</synopsis>
149 <description>
150 <para>This configuration option defines the name of the Stasis
151 message type that Asterisk is forbidden from creating and can be
152 specified as many times as necessary to achieve the desired result.</para>
153 <enumlist>
154 <enum name="stasis_app_recording_snapshot_type" />
155 <enum name="stasis_app_playback_snapshot_type" />
156 <enum name="stasis_test_message_type" />
157 <enum name="confbridge_start_type" />
158 <enum name="confbridge_end_type" />
159 <enum name="confbridge_join_type" />
160 <enum name="confbridge_leave_type" />
161 <enum name="confbridge_start_record_type" />
162 <enum name="confbridge_stop_record_type" />
163 <enum name="confbridge_mute_type" />
164 <enum name="confbridge_unmute_type" />
165 <enum name="confbridge_talking_type" />
166 <enum name="cel_generic_type" />
167 <enum name="ast_bridge_snapshot_type" />
168 <enum name="ast_bridge_merge_message_type" />
169 <enum name="ast_channel_entered_bridge_type" />
170 <enum name="ast_channel_left_bridge_type" />
171 <enum name="ast_blind_transfer_type" />
172 <enum name="ast_attended_transfer_type" />
173 <enum name="ast_endpoint_snapshot_type" />
174 <enum name="ast_endpoint_state_type" />
175 <enum name="ast_device_state_message_type" />
176 <enum name="ast_test_suite_message_type" />
177 <enum name="ast_mwi_state_type" />
178 <enum name="ast_mwi_vm_app_type" />
179 <enum name="ast_format_register_type" />
180 <enum name="ast_format_unregister_type" />
181 <enum name="ast_manager_get_generic_type" />
182 <enum name="ast_parked_call_type" />
183 <enum name="ast_channel_snapshot_type" />
184 <enum name="ast_channel_dial_type" />
185 <enum name="ast_channel_varset_type" />
186 <enum name="ast_channel_hangup_request_type" />
187 <enum name="ast_channel_dtmf_begin_type" />
188 <enum name="ast_channel_dtmf_end_type" />
189 <enum name="ast_channel_flash_type" />
190 <enum name="ast_channel_wink_type" />
191 <enum name="ast_channel_hold_type" />
192 <enum name="ast_channel_unhold_type" />
193 <enum name="ast_channel_chanspy_start_type" />
194 <enum name="ast_channel_chanspy_stop_type" />
195 <enum name="ast_channel_fax_type" />
196 <enum name="ast_channel_hangup_handler_type" />
197 <enum name="ast_channel_moh_start_type" />
198 <enum name="ast_channel_moh_stop_type" />
199 <enum name="ast_channel_mixmonitor_start_type" />
200 <enum name="ast_channel_mixmonitor_stop_type" />
201 <enum name="ast_channel_mixmonitor_mute_type" />
202 <enum name="ast_channel_agent_login_type" />
203 <enum name="ast_channel_agent_logoff_type" />
204 <enum name="ast_channel_talking_start" />
205 <enum name="ast_channel_talking_stop" />
206 <enum name="ast_channel_tone_detect" />
207 <enum name="ast_security_event_type" />
208 <enum name="ast_named_acl_change_type" />
209 <enum name="ast_local_bridge_type" />
210 <enum name="ast_local_optimization_begin_type" />
211 <enum name="ast_local_optimization_end_type" />
212 <enum name="stasis_subscription_change_type" />
213 <enum name="ast_multi_user_event_type" />
214 <enum name="stasis_cache_clear_type" />
215 <enum name="stasis_cache_update_type" />
216 <enum name="ast_network_change_type" />
217 <enum name="ast_system_registry_type" />
218 <enum name="ast_cc_available_type" />
219 <enum name="ast_cc_offertimerstart_type" />
220 <enum name="ast_cc_requested_type" />
221 <enum name="ast_cc_requestacknowledged_type" />
222 <enum name="ast_cc_callerstopmonitoring_type" />
223 <enum name="ast_cc_callerstartmonitoring_type" />
224 <enum name="ast_cc_callerrecalling_type" />
225 <enum name="ast_cc_recallcomplete_type" />
226 <enum name="ast_cc_failure_type" />
227 <enum name="ast_cc_monitorfailed_type" />
228 <enum name="ast_presence_state_message_type" />
229 <enum name="ast_rtp_rtcp_sent_type" />
230 <enum name="ast_rtp_rtcp_received_type" />
231 <enum name="ast_call_pickup_type" />
232 <enum name="aoc_s_type" />
233 <enum name="aoc_d_type" />
234 <enum name="aoc_e_type" />
235 <enum name="dahdichannel_type" />
236 <enum name="mcid_type" />
237 <enum name="session_timeout_type" />
238 <enum name="cdr_read_message_type" />
239 <enum name="cdr_write_message_type" />
240 <enum name="cdr_prop_write_message_type" />
241 <enum name="corosync_ping_message_type" />
242 <enum name="agi_exec_start_type" />
243 <enum name="agi_exec_end_type" />
244 <enum name="agi_async_start_type" />
245 <enum name="agi_async_exec_type" />
246 <enum name="agi_async_end_type" />
247 <enum name="queue_caller_join_type" />
248 <enum name="queue_caller_leave_type" />
249 <enum name="queue_caller_abandon_type" />
250 <enum name="queue_member_status_type" />
251 <enum name="queue_member_added_type" />
252 <enum name="queue_member_removed_type" />
253 <enum name="queue_member_pause_type" />
254 <enum name="queue_member_penalty_type" />
255 <enum name="queue_member_ringinuse_type" />
256 <enum name="queue_agent_called_type" />
257 <enum name="queue_agent_connect_type" />
258 <enum name="queue_agent_complete_type" />
259 <enum name="queue_agent_dump_type" />
260 <enum name="queue_agent_ringnoanswer_type" />
261 <enum name="meetme_join_type" />
262 <enum name="meetme_leave_type" />
263 <enum name="meetme_end_type" />
264 <enum name="meetme_mute_type" />
265 <enum name="meetme_talking_type" />
266 <enum name="meetme_talk_request_type" />
267 <enum name="appcdr_message_type" />
268 <enum name="forkcdr_message_type" />
269 <enum name="cdr_sync_message_type" />
270 </enumlist>
271 </description>
272 </configOption>
273 </configObject>
274 </configFile>
275 </configInfo>
276***/
277
278/*!
279 * \page stasis-impl Stasis Implementation Notes
280 *
281 * \par Reference counting
282 *
283 * Stasis introduces a number of objects, which are tightly related to one
284 * another. Because we rely on ref-counting for memory management, understanding
285 * these relationships is important to understanding this code.
286 *
287 * \code{.txt}
288 *
289 * stasis_topic <----> stasis_subscription
290 * ^ ^
291 * \ /
292 * \ /
293 * dispatch
294 * |
295 * |
296 * v
297 * stasis_message
298 * |
299 * |
300 * v
301 * stasis_message_type
302 *
303 * \endcode
304 *
305 * The most troubling thing in this chart is the cyclic reference between
306 * stasis_topic and stasis_subscription. This is both unfortunate, and
307 * necessary. Topics need the subscription in order to dispatch messages;
308 * subscriptions need the topics to unsubscribe and check subscription status.
309 *
310 * The cycle is broken by stasis_unsubscribe(). The unsubscribe will remove the
311 * topic's reference to a subscription. When the subscription is destroyed, it
312 * will remove its reference to the topic.
313 *
314 * This means that until a subscription has be explicitly unsubscribed, it will
315 * not be destroyed. Neither will a topic be destroyed while it has subscribers.
316 * The destructors of both have assertions regarding this to catch ref-counting
317 * problems where a subscription or topic has had an extra ao2_cleanup().
318 *
319 * The \ref dispatch_exec_sync object is a transient object, which is posted to
320 * a subscription's taskprocessor to send a message to the subscriber. They have
321 * short life cycles, allocated on one thread, destroyed on another.
322 *
323 * During shutdown, or the deletion of a domain object, there are a flurry of
324 * ao2_cleanup()s on subscriptions and topics, as the final in-flight messages
325 * are processed. Any one of these cleanups could be the one to actually destroy
326 * a given object, so care must be taken to ensure that an object isn't
327 * referenced after an ao2_cleanup(). This includes the implicit ao2_unlock()
328 * that might happen when a RAII_VAR() goes out of scope.
329 *
330 * \par Typical life cycles
331 *
332 * \li stasis_topic - There are several topics which live for the duration of
333 * the Asterisk process (ast_channel_topic_all(), etc.) but most of these
334 * are actually fed by shorter-lived topics whose lifetime is associated
335 * with some domain object (like ast_channel_topic() for a given
336 * ast_channel).
337 *
338 * \li stasis_subscription - Subscriptions have a similar mix of lifetimes as
339 * topics, for similar reasons.
340 *
341 * \li dispatch - Very short lived; just long enough to post a message to a
342 * subscriber.
343 *
344 * \li stasis_message - Short to intermediate lifetimes, but that is mostly
345 * irrelevant. Messages are strictly data and have no behavior associated
346 * with them, so it doesn't really matter if/when they are destroyed. By
347 * design, a component could hold a ref to a message forever without any
348 * ill consequences (aside from consuming more memory).
349 *
350 * \li stasis_message_type - Long life cycles, typically only destroyed on
351 * module unloading or _clean_ process exit.
352 *
353 * \par Subscriber shutdown sequencing
354 *
355 * Subscribers are sensitive to shutdown sequencing, specifically in how the
356 * reference message types. This is fully detailed in the documentation at
357 * https://docs.asterisk.org/Development/Roadmap/Asterisk-12-Projects/Asterisk-12-API-Improvements/Stasis-Message-Bus/Using-the-Stasis-Message-Bus/Stasis-Subscriber-Shutdown-Problem/.
358 *
359 * In short, the lifetime of the \a data (and \a callback, if in a module) must
360 * be held until the stasis_subscription_final_message() has been received.
361 * Depending on the structure of the subscriber code, this can be handled by
362 * using stasis_subscription_final_message() to free resources on the final
363 * message, or using stasis_subscription_join()/stasis_unsubscribe_and_join() to
364 * block until the unsubscribe has completed.
365 */
366
367/*! Initial size of the subscribers list. */
368#define INITIAL_SUBSCRIBERS_MAX 4
369
370/*! The number of buckets to use for topic pools */
371#define TOPIC_POOL_BUCKETS 57
372
373/*! Taskpool for topics that don't want a dedicated taskprocessor */
374static struct ast_taskpool *taskpool;
375
377
378#if defined(LOW_MEMORY)
379
380#define TOPIC_ALL_BUCKETS 257
381
382#else
383
384#define TOPIC_ALL_BUCKETS 997
385
386#endif
387
388#ifdef AST_DEVMODE
389
390/*! The number of buckets to use for topic statistics */
391#define TOPIC_STATISTICS_BUCKETS 57
392
393/*! The number of buckets to use for subscription statistics */
394#define SUBSCRIPTION_STATISTICS_BUCKETS 57
395
396/*! Global container which stores statistics for topics */
397static AO2_GLOBAL_OBJ_STATIC(topic_statistics);
398
399/*! Global container which stores statistics for subscriptions */
400static AO2_GLOBAL_OBJ_STATIC(subscription_statistics);
401
402/*! \internal */
403struct stasis_message_type_statistics {
404 /*! \brief The number of messages of this published */
405 int published;
406 /*! \brief The number of messages of this that did not reach a subscriber */
407 int unused;
408 /*! \brief The stasis message type */
409 struct stasis_message_type *message_type;
410};
411
412/*! Lock to protect the message types vector */
413AST_MUTEX_DEFINE_STATIC(message_type_statistics_lock);
414
415/*! Vector containing message type information */
416static AST_VECTOR(, struct stasis_message_type_statistics) message_type_statistics;
417
418/*! \internal */
419struct stasis_topic_statistics {
420 /*! \brief Highest time spent dispatching messages to subscribers */
421 long highest_time_dispatched;
422 /*! \brief Lowest time spent dispatching messages to subscribers */
423 long lowest_time_dispatched;
424 /*! \brief The number of messages that were not dispatched to any subscriber */
425 int messages_not_dispatched;
426 /*! \brief The number of messages that were dispatched to at least 1 subscriber */
427 int messages_dispatched;
428 /*! \brief The ids of the subscribers to this topic */
429 struct ao2_container *subscribers;
430 /*! \brief Pointer to the topic (NOT refcounted, and must NOT be accessed) */
431 struct stasis_topic *topic;
432 /*! \brief Name of the topic */
433 char name[0];
434};
435#endif
436
437/*! \internal */
439 /*! Variable length array of the subscribers */
441
442 /*! Topics forwarding into this topic */
444
445#ifdef AST_DEVMODE
446 struct stasis_topic_statistics *statistics;
447#endif
448
449 /*! Unique incrementing integer for subscriber ids */
451
452 /*! Name of the topic */
453 char *name;
454
455 /*! Detail of the topic */
456 char *detail;
457
458 /*! Creation time */
459 struct timeval *creationtime;
460};
461
463
466
467 char *name;
468 char *detail;
469
470 struct timeval creationtime;
471
472 char buf[0];
473};
474
478
479static void proxy_dtor(void *weakproxy, void *container)
480{
481 ao2_unlink(container, weakproxy);
483}
484
485/* Forward declarations for the tightly-coupled subscription object */
486static int topic_add_subscription(struct stasis_topic *topic,
487 struct stasis_subscription *sub);
488
489static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
490
491/*! \brief Lock two topics. */
492#define topic_lock_both(topic1, topic2) \
493 do { \
494 ao2_lock(topic1); \
495 while (ao2_trylock(topic2)) { \
496 AO2_DEADLOCK_AVOIDANCE(topic1); \
497 } \
498 } while (0)
499
500static void topic_dtor(void *obj)
501{
502 struct stasis_topic *topic = obj;
503#ifdef AST_DEVMODE
504 struct ao2_container *topic_stats;
505#endif
506
507 ast_debug(2, "Destroying topic. name: %s, detail: %s\n",
508 topic->name, topic->detail);
509
510 /* Subscribers hold a reference to topics, so they should all be
511 * unsubscribed before we get here. */
513
516 ast_debug(1, "Topic '%s': %p destroyed\n", topic->name, topic);
517
518#ifdef AST_DEVMODE
519 if (topic->statistics) {
520 topic_stats = ao2_global_obj_ref(topic_statistics);
521 if (topic_stats) {
522 ao2_unlink(topic_stats, topic->statistics);
523 ao2_ref(topic_stats, -1);
524 }
525 ao2_ref(topic->statistics, -1);
526 }
527#endif
528}
529
530#ifdef AST_DEVMODE
531static void topic_statistics_destroy(void *obj)
532{
533 struct stasis_topic_statistics *statistics = obj;
534
535 ao2_cleanup(statistics->subscribers);
536}
537
538static struct stasis_topic_statistics *stasis_topic_statistics_create(struct stasis_topic *topic)
539{
540 struct stasis_topic_statistics *statistics;
541 RAII_VAR(struct ao2_container *, topic_stats, ao2_global_obj_ref(topic_statistics), ao2_cleanup);
542
543 if (!topic_stats) {
544 return NULL;
545 }
546
547 statistics = ao2_alloc(sizeof(*statistics) + strlen(topic->name) + 1, topic_statistics_destroy);
548 if (!statistics) {
549 return NULL;
550 }
551
552 statistics->subscribers = ast_str_container_alloc(1);
553 if (!statistics->subscribers) {
554 ao2_ref(statistics, -1);
555 return NULL;
556 }
557
558 /* This is strictly used for the pointer address when showing the topic */
559 statistics->topic = topic;
560 strcpy(statistics->name, topic->name); /* SAFE */
561 ao2_link(topic_stats, statistics);
562
563 return statistics;
564}
565#endif
566
567static int link_topic_proxy(struct stasis_topic *topic, const char *name, const char *detail)
568{
569 struct topic_proxy *proxy;
570 struct stasis_topic* topic_tmp;
571 size_t detail_len;
572
573 if (!topic || !name || !strlen(name) || !detail) {
574 return -1;
575 }
576
578
579 topic_tmp = stasis_topic_get(name);
580 if (topic_tmp) {
581 ast_log(LOG_ERROR, "The same topic is already exist. name: %s\n", name);
582 ao2_ref(topic_tmp, -1);
584
585 return -1;
586 }
587
588 detail_len = strlen(detail) + 1;
589
590 proxy = ao2_t_weakproxy_alloc(
591 sizeof(*proxy) + strlen(name) + 1 + detail_len, NULL, name);
592 if (!proxy) {
594
595 return -1;
596 }
597
598 /* set the proxy info */
599 proxy->name = proxy->buf;
600 proxy->detail = proxy->name + strlen(name) + 1;
601
602 strcpy(proxy->name, name); /* SAFE */
603 ast_copy_string(proxy->detail, detail, detail_len); /* SAFE */
604 proxy->creationtime = ast_tvnow();
605
606 /* We have exclusive access to proxy, no need for locking here. */
607 if (ao2_t_weakproxy_set_object(proxy, topic, OBJ_NOLOCK, "weakproxy link")) {
608 ao2_cleanup(proxy);
610
611 return -1;
612 }
613
615 ao2_cleanup(proxy);
618
619 return -1;
620 }
621
622 /* setting the topic point to the proxy */
623 topic->name = proxy->name;
624 topic->detail = proxy->detail;
625 topic->creationtime = &(proxy->creationtime);
626
628 ao2_ref(proxy, -1);
629
631
632 return 0;
633}
634
636 const char *name, const char* detail
637 )
638{
639 struct stasis_topic *topic;
640 int res = 0;
641
642 if (!name|| !strlen(name) || !detail) {
643 return NULL;
644 }
645 ast_debug(2, "Creating topic. name: %s, detail: %s\n", name, detail);
646
647 topic = stasis_topic_get(name);
648 if (topic) {
649 ast_debug(2, "Topic is already exist. name: %s, detail: %s\n",
650 name, detail);
651 return topic;
652 }
653
654 topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
655 if (!topic) {
656 return NULL;
657 }
658
660 res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
661 if (res) {
662 ao2_ref(topic, -1);
663 return NULL;
664 }
665
666 /* link to the proxy */
667 if (link_topic_proxy(topic, name, detail)) {
668 ao2_ref(topic, -1);
669 return NULL;
670 }
671
672#ifdef AST_DEVMODE
673 topic->statistics = stasis_topic_statistics_create(topic);
674 if (!topic->statistics) {
675 ao2_ref(topic, -1);
676 return NULL;
677 }
678#endif
679 ast_debug(1, "Topic '%s': %p created\n", topic->name, topic);
680
681 return topic;
682}
683
685{
687}
688
690{
692}
693
694const char *stasis_topic_name(const struct stasis_topic *topic)
695{
696 if (!topic) {
697 return NULL;
698 }
699 return topic->name;
700}
701
702const char *stasis_topic_detail(const struct stasis_topic *topic)
703{
704 if (!topic) {
705 return NULL;
706 }
707 return topic->detail;
708}
709
710size_t stasis_topic_subscribers(const struct stasis_topic *topic)
711{
712 return AST_VECTOR_SIZE(&topic->subscribers);
713}
714
715#ifdef AST_DEVMODE
716struct stasis_subscription_statistics {
717 /*! \brief The filename where the subscription originates */
718 const char *file;
719 /*! \brief The function where the subscription originates */
720 const char *func;
721 /*! \brief Names of the topics we are subscribed to */
722 struct ao2_container *topics;
723 /*! \brief The message type that currently took the longest to process */
724 struct stasis_message_type *highest_time_message_type;
725 /*! \brief Highest time spent invoking a message */
726 long highest_time_invoked;
727 /*! \brief Lowest time spent invoking a message */
728 long lowest_time_invoked;
729 /*! \brief The number of messages that were filtered out */
730 int messages_dropped;
731 /*! \brief The number of messages that passed filtering */
732 int messages_passed;
733 /*! \brief Using a mailbox to queue messages */
734 int uses_mailbox;
735 /*! \brief Using stasis taskpool for handling messages */
736 int uses_taskpool;
737 /*! \brief The line number where the subscription originates */
738 int lineno;
739 /*! \brief Pointer to the subscription (NOT refcounted, and must NOT be accessed) */
740 struct stasis_subscription *sub;
741 /*! \brief Unique ID of the subscription */
742 char uniqueid[0];
743};
744#endif
745
746/*! \internal */
748 /*! Unique ID for this subscription */
749 char *uniqueid;
750 /*! Topic subscribed to. */
752 /*! Mailbox for processing incoming messages. */
754 /*! Callback function for incoming message processing. */
756 /*! Data pointer to be handed to the callback. */
757 void *data;
758
759 /*! Condition for joining with subscription. */
761 /*! Flag set when final message for sub has been received.
762 * Be sure join_lock is held before reading/setting. */
764 /*! Flag set when final message for sub has been processed.
765 * Be sure join_lock is held before reading/setting. */
767
768 /*! The message types this subscription is accepting */
770 /*! The message formatters this subscription is accepting */
772 /*! The message filter currently in use */
774
775#ifdef AST_DEVMODE
776 /*! Statistics information */
777 struct stasis_subscription_statistics *statistics;
778#endif
779};
780
781static void subscription_dtor(void *obj)
782{
783 struct stasis_subscription *sub = obj;
784#ifdef AST_DEVMODE
785 struct ao2_container *subscription_stats;
786#endif
787
788 /* Subscriptions need to be manually unsubscribed before destruction
789 * b/c there's a cyclic reference between topics and subscriptions */
791 /* If there are any messages in flight to this subscription; that would
792 * be bad. */
794
795 ast_free(sub->uniqueid);
796 ao2_cleanup(sub->topic);
797 sub->topic = NULL;
799 sub->mailbox = NULL;
800 ast_cond_destroy(&sub->join_cond);
801
802 AST_VECTOR_FREE(&sub->accepted_message_types);
803
804#ifdef AST_DEVMODE
805 if (sub->statistics) {
806 subscription_stats = ao2_global_obj_ref(subscription_statistics);
807 if (subscription_stats) {
808 ao2_unlink(subscription_stats, sub->statistics);
809 ao2_ref(subscription_stats, -1);
810 }
811 ao2_ref(sub->statistics, -1);
812 }
813#endif
814}
815
816/*!
817 * \brief Invoke the subscription's callback.
818 * \param sub Subscription to invoke.
819 * \param message Message to send.
820 */
822 struct stasis_message *message)
823{
824 unsigned int final = stasis_subscription_final_message(sub, message);
826#ifdef AST_DEVMODE
827 struct timeval start;
828 long elapsed;
829
830 start = ast_tvnow();
831#endif
832
833 /* Notify that the final message has been received */
834 if (final) {
835 ao2_lock(sub);
836 sub->final_message_rxed = 1;
837 ast_cond_signal(&sub->join_cond);
839 }
840
841 /*
842 * If filtering is turned on and this is a 'final' message, we only invoke the callback
843 * if the subscriber accepts subscription_change message types.
844 */
845 if (!final || sub->filter != STASIS_SUBSCRIPTION_FILTER_SELECTIVE ||
846 (message_type_id < AST_VECTOR_SIZE(&sub->accepted_message_types) && AST_VECTOR_GET(&sub->accepted_message_types, message_type_id))) {
847 /* Since sub is mostly immutable, no need to lock sub */
848 sub->callback(sub->data, sub, message);
849 }
850
851 /* Notify that the final message has been processed */
852 if (final) {
853 ao2_lock(sub);
854 sub->final_message_processed = 1;
855 ast_cond_signal(&sub->join_cond);
857 }
858
859#ifdef AST_DEVMODE
860 elapsed = ast_tvdiff_ms(ast_tvnow(), start);
861 if (elapsed > sub->statistics->highest_time_invoked) {
862 sub->statistics->highest_time_invoked = elapsed;
863 ao2_lock(sub->statistics);
864 sub->statistics->highest_time_message_type = stasis_message_type(message);
865 ao2_unlock(sub->statistics);
866 }
867 if (elapsed < sub->statistics->lowest_time_invoked) {
868 sub->statistics->lowest_time_invoked = elapsed;
869 }
870#endif
871}
872
873static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
874static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
875
877{
878}
879
880#ifdef AST_DEVMODE
881static void subscription_statistics_destroy(void *obj)
882{
883 struct stasis_subscription_statistics *statistics = obj;
884
885 ao2_cleanup(statistics->topics);
886}
887
888static struct stasis_subscription_statistics *stasis_subscription_statistics_create(struct stasis_subscription *sub,
889 int needs_mailbox, int use_taskpool, const char *file, int lineno,
890 const char *func)
891{
892 struct stasis_subscription_statistics *statistics;
893 RAII_VAR(struct ao2_container *, subscription_stats, ao2_global_obj_ref(subscription_statistics), ao2_cleanup);
894
895 if (!subscription_stats) {
896 return NULL;
897 }
898
899 statistics = ao2_alloc(sizeof(*statistics) + strlen(sub->uniqueid) + 1, subscription_statistics_destroy);
900 if (!statistics) {
901 return NULL;
902 }
903
905 if (!statistics->topics) {
906 ao2_ref(statistics, -1);
907 return NULL;
908 }
909
910 statistics->file = file;
911 statistics->lineno = lineno;
912 statistics->func = func;
913 statistics->uses_mailbox = needs_mailbox;
914 statistics->uses_taskpool = use_taskpool;
915 strcpy(statistics->uniqueid, sub->uniqueid); /* SAFE */
916 statistics->sub = sub;
917 ao2_link(subscription_stats, statistics);
918
919 return statistics;
920}
921#endif
922
924 struct stasis_topic *topic,
926 void *data,
927 int needs_mailbox,
928 int use_taskpool,
929 const char *file,
930 int lineno,
931 const char *func)
932{
933 struct stasis_subscription *sub;
934 int ret;
935
936 if (!topic) {
937 return NULL;
938 }
939
940 /* The ao2 lock is used for join_cond. */
942 if (!sub) {
943 return NULL;
944 }
945
946#ifdef AST_DEVMODE
948 sub->statistics = stasis_subscription_statistics_create(sub, needs_mailbox, use_taskpool, file, lineno, func);
949 if (ret < 0 || !sub->statistics) {
950 ao2_ref(sub, -1);
951 return NULL;
952 }
953#else
955 if (ret < 0) {
956 ao2_ref(sub, -1);
957 return NULL;
958 }
959#endif
960
961 if (needs_mailbox) {
962 char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
963
964 /* Create name with seq number appended. */
965 ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "stasis/%c:%s",
966 use_taskpool ? 'p' : 'm',
968
969 /*
970 * With a small number of subscribers, a thread-per-sub is
971 * acceptable. For a large number of subscribers, a thread
972 * pool should be used.
973 */
974 if (use_taskpool) {
975 sub->mailbox = ast_taskpool_serializer(tps_name, taskpool);
976 } else {
977 sub->mailbox = ast_taskprocessor_get(tps_name, TPS_REF_DEFAULT);
978 }
979 if (!sub->mailbox) {
980 ao2_ref(sub, -1);
981
982 return NULL;
983 }
985 /* Taskprocessor has a reference */
986 ao2_ref(sub, +1);
987 }
988
989 ao2_ref(topic, +1);
990 sub->topic = topic;
991 sub->callback = callback;
992 sub->data = data;
993 ast_cond_init(&sub->join_cond, NULL);
995 AST_VECTOR_INIT(&sub->accepted_message_types, 0);
996 sub->accepted_formatters = STASIS_SUBSCRIPTION_FORMATTER_NONE;
997
998 if (topic_add_subscription(topic, sub) != 0) {
999 ao2_ref(sub, -1);
1000 ao2_ref(topic, -1);
1001
1002 return NULL;
1003 }
1005
1006 return sub;
1007}
1008
1010 struct stasis_topic *topic,
1012 void *data,
1013 const char *file,
1014 int lineno,
1015 const char *func)
1016{
1017 return internal_stasis_subscribe(topic, callback, data, 1, 0, file, lineno, func);
1018}
1019
1021 struct stasis_topic *topic,
1023 void *data,
1024 const char *file,
1025 int lineno,
1026 const char *func)
1027{
1028 return internal_stasis_subscribe(topic, callback, data, 1, 1, file, lineno, func);
1029}
1030
1031static int sub_cleanup(void *data)
1032{
1033 struct stasis_subscription *sub = data;
1035 return 0;
1036}
1037
1039{
1040 /* The subscription may be the last ref to this topic. Hold
1041 * the topic ref open until after the unlock. */
1042 struct stasis_topic *topic;
1043
1044 if (!sub) {
1045 return NULL;
1046 }
1047
1048 topic = ao2_bump(sub->topic);
1049
1050 /* We have to remove the subscription first, to ensure the unsubscribe
1051 * is the final message */
1052 if (topic_remove_subscription(sub->topic, sub) != 0) {
1054 "Internal error: subscription has invalid topic\n");
1055 ao2_cleanup(topic);
1056
1057 return NULL;
1058 }
1059
1060 /* Now let everyone know about the unsubscribe */
1062
1063 /* When all that's done, remove the ref the mailbox has on the sub */
1064 if (sub->mailbox) {
1065 if (ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub)) {
1066 /* Nothing we can do here, the conditional is just to keep
1067 * the compiler happy that we're not ignoring the result. */
1068 }
1069 }
1070
1071 /* Unsubscribing unrefs the subscription */
1073 ao2_cleanup(topic);
1074
1075 return NULL;
1076}
1077
1079 long low_water, long high_water)
1080{
1081 int res = -1;
1082
1083 if (subscription) {
1084 res = ast_taskprocessor_alert_set_levels(subscription->mailbox,
1085 low_water, high_water);
1086 }
1087 return res;
1088}
1089
1091 const struct stasis_message_type *type)
1092{
1093 if (!subscription) {
1094 return -1;
1095 }
1096
1097 ast_assert(type != NULL);
1099
1101 /* Filtering is unreliable as this message type is not yet initialized
1102 * so force all messages through.
1103 */
1105 return 0;
1106 }
1107
1108 ao2_lock(subscription->topic);
1110 /* We do this for the same reason as above. The subscription can still operate, so allow
1111 * it to do so by forcing all messages through.
1112 */
1114 }
1115 ao2_unlock(subscription->topic);
1116
1117 return 0;
1118}
1119
1121 const struct stasis_message_type *type)
1122{
1123 if (!subscription) {
1124 return -1;
1125 }
1126
1127 ast_assert(type != NULL);
1129
1131 return 0;
1132 }
1133
1134 ao2_lock(subscription->topic);
1136 /* The memory is already allocated so this can't fail */
1138 }
1139 ao2_unlock(subscription->topic);
1140
1141 return 0;
1142}
1143
1146{
1147 if (!subscription) {
1148 return -1;
1149 }
1150
1151 ao2_lock(subscription->topic);
1152 if (subscription->filter != STASIS_SUBSCRIPTION_FILTER_FORCED_NONE) {
1153 subscription->filter = filter;
1154 }
1155 ao2_unlock(subscription->topic);
1156
1157 return 0;
1158}
1159
1162{
1163 ast_assert(subscription != NULL);
1164
1165 ao2_lock(subscription->topic);
1166 subscription->accepted_formatters = formatters;
1167 ao2_unlock(subscription->topic);
1168
1169 return;
1170}
1171
1173{
1174 if (subscription) {
1175 ao2_lock(subscription);
1176 /* Wait until the processed flag has been set */
1177 while (!subscription->final_message_processed) {
1178 ast_cond_wait(&subscription->join_cond,
1179 ao2_object_get_lockaddr(subscription));
1180 }
1181 ao2_unlock(subscription);
1182 }
1183}
1184
1186{
1187 if (subscription) {
1188 int ret;
1189
1190 ao2_lock(subscription);
1191 ret = subscription->final_message_rxed;
1192 ao2_unlock(subscription);
1193
1194 return ret;
1195 }
1196
1197 /* Null subscription is about as done as you can get */
1198 return 1;
1199}
1200
1202 struct stasis_subscription *subscription)
1203{
1204 if (!subscription) {
1205 return NULL;
1206 }
1207
1208 /* Bump refcount to hold it past the unsubscribe */
1209 ao2_ref(subscription, +1);
1210 stasis_unsubscribe(subscription);
1211 stasis_subscription_join(subscription);
1212 /* Now decrement the refcount back */
1213 ao2_cleanup(subscription);
1214 return NULL;
1215}
1216
1218{
1219 if (sub) {
1220 size_t i;
1221 struct stasis_topic *topic = sub->topic;
1222
1223 ao2_lock(topic);
1224 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
1225 if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
1226 ao2_unlock(topic);
1227 return 1;
1228 }
1229 }
1230 ao2_unlock(topic);
1231 }
1232
1233 return 0;
1234}
1235
1237{
1238 return sub->uniqueid;
1239}
1240
1242{
1243 struct stasis_subscription_change *change;
1244
1246 return 0;
1247 }
1248
1249 change = stasis_message_data(msg);
1250 if (strcmp("Unsubscribe", change->description)) {
1251 return 0;
1252 }
1253
1254 if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
1255 return 0;
1256 }
1257
1258 return 1;
1259}
1260
1261/*!
1262 * \brief Add a subscriber to a topic.
1263 * \param topic Topic
1264 * \param sub Subscriber
1265 * \return 0 on success
1266 * \return Non-zero on error
1267 */
1269{
1270 size_t idx;
1271
1272 ao2_lock(topic);
1273 /* The reference from the topic to the subscription is shared with
1274 * the owner of the subscription, which will explicitly unsubscribe
1275 * to release it.
1276 *
1277 * If we bumped the refcount here, the owner would have to unsubscribe
1278 * and cleanup, which is a bit awkward. */
1280
1281 for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
1284 }
1285
1286#ifdef AST_DEVMODE
1288 ast_str_container_add(sub->statistics->topics, stasis_topic_name(topic));
1289#endif
1290
1292
1293 return 0;
1294}
1295
1297{
1298 size_t idx;
1299 int res;
1300
1301 ao2_lock(topic);
1302 for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
1305 }
1308
1309#ifdef AST_DEVMODE
1310 if (!res) {
1312 ast_str_container_remove(sub->statistics->topics, stasis_topic_name(topic));
1313 }
1314#endif
1315
1317
1318 return res;
1319}
1320
1321/*!
1322 * \internal \brief Dispatch a message to a subscriber asynchronously
1323 * \param local \ref ast_taskprocessor_local object
1324 * \return 0
1325 */
1327{
1328 struct stasis_subscription *sub = local->local_data;
1329 struct stasis_message *message = local->data;
1330
1333
1334 return 0;
1335}
1336
1337/*!
1338 * \internal \brief Data passed to \ref dispatch_exec_sync to synchronize
1339 * a published message to a subscriber
1340 */
1346};
1347
1348/*!
1349 * \internal \brief Dispatch a message to a subscriber synchronously
1350 * \param local \ref ast_taskprocessor_local object
1351 * \return 0
1352 */
1354{
1355 struct stasis_subscription *sub = local->local_data;
1356 struct sync_task_data *std = local->data;
1357 struct stasis_message *message = std->task_data;
1358
1361
1362 ast_mutex_lock(&std->lock);
1363 std->complete = 1;
1364 ast_cond_signal(&std->cond);
1365 ast_mutex_unlock(&std->lock);
1366
1367 return 0;
1368}
1369
1370/*!
1371 * \internal \brief Dispatch a message to a subscriber
1372 * \param sub The subscriber to dispatch to
1373 * \param message The message to send
1374 * \param synchronous If non-zero, synchronize on the subscriber receiving
1375 * the message
1376 * \retval 0 if message was not dispatched
1377 * \retval 1 if message was dispatched
1378 */
1379static unsigned int dispatch_message(struct stasis_subscription *sub,
1380 struct stasis_message *message,
1381 int synchronous)
1382{
1384
1385 /*
1386 * The 'do while' gives us an easy way to skip remaining logic once
1387 * we determine the message should be accepted.
1388 * The code looks more verbose than it needs to be but it optimizes
1389 * down very nicely. It's just easier to understand and debug this way.
1390 */
1391 do {
1392 struct stasis_message_type *message_type = stasis_message_type(message);
1393 int type_id = stasis_message_type_id(message_type);
1394 int type_filter_specified = 0;
1395 int formatter_filter_specified = 0;
1396 int type_filter_passed = 0;
1397 int formatter_filter_passed = 0;
1398
1399 /* We always accept final messages so only run the filter logic if not final */
1400 if (is_final) {
1401 break;
1402 }
1403
1404 type_filter_specified = sub->filter & STASIS_SUBSCRIPTION_FILTER_SELECTIVE;
1405 formatter_filter_specified = sub->accepted_formatters != STASIS_SUBSCRIPTION_FORMATTER_NONE;
1406
1407 /* Accept if no filters of either type were specified */
1408 if (!type_filter_specified && !formatter_filter_specified) {
1409 break;
1410 }
1411
1412 type_filter_passed = type_filter_specified
1413 && type_id < AST_VECTOR_SIZE(&sub->accepted_message_types)
1414 && AST_VECTOR_GET(&sub->accepted_message_types, type_id);
1415
1416 /*
1417 * Since the type and formatter filters are OR'd, we can skip
1418 * the formatter check if the type check passes.
1419 */
1420 if (type_filter_passed) {
1421 break;
1422 }
1423
1424 formatter_filter_passed = formatter_filter_specified
1425 && (sub->accepted_formatters & stasis_message_type_available_formatters(message_type));
1426
1427 if (formatter_filter_passed) {
1428 break;
1429 }
1430
1431#ifdef AST_DEVMODE
1432 ast_atomic_fetchadd_int(&sub->statistics->messages_dropped, +1);
1433#endif
1434
1435 return 0;
1436
1437 } while (0);
1438
1439#ifdef AST_DEVMODE
1440 ast_atomic_fetchadd_int(&sub->statistics->messages_passed, +1);
1441#endif
1442
1443 if (!sub->mailbox) {
1444 /* Dispatch directly */
1446 return 1;
1447 }
1448
1449 /* Bump the message for the taskprocessor push. This will get de-ref'd
1450 * by the task processor callback.
1451 */
1453 if (!synchronous) {
1455 /* Push failed; ugh. */
1456 ast_log(LOG_ERROR, "Dropping async dispatch\n");
1458 return 0;
1459 }
1460 } else {
1461 struct sync_task_data std;
1462
1463 ast_mutex_init(&std.lock);
1464 ast_cond_init(&std.cond, NULL);
1465 std.complete = 0;
1466 std.task_data = message;
1467
1469 /* Push failed; ugh. */
1470 ast_log(LOG_ERROR, "Dropping sync dispatch\n");
1472 ast_mutex_destroy(&std.lock);
1473 ast_cond_destroy(&std.cond);
1474 return 0;
1475 }
1476
1477 ast_mutex_lock(&std.lock);
1478 while (!std.complete) {
1479 ast_cond_wait(&std.cond, &std.lock);
1480 }
1481 ast_mutex_unlock(&std.lock);
1482
1483 ast_mutex_destroy(&std.lock);
1484 ast_cond_destroy(&std.cond);
1485 }
1486
1487 return 1;
1488}
1489
1490/*!
1491 * \internal \brief Publish a message to a topic's subscribers
1492 * \brief topic The topic to publish to
1493 * \brief message The message to publish
1494 * \brief sync_sub An optional subscriber of the topic to publish synchronously
1495 * to
1496 */
1497static void publish_msg(struct stasis_topic *topic,
1498 struct stasis_message *message, struct stasis_subscription *sync_sub)
1499{
1500 size_t i;
1501#ifdef AST_DEVMODE
1502 unsigned int dispatched = 0;
1504 struct stasis_message_type_statistics *statistics;
1505 struct timeval start;
1506 long elapsed;
1507#endif
1508
1509 ast_assert(topic != NULL);
1511
1512#ifdef AST_DEVMODE
1513 ast_mutex_lock(&message_type_statistics_lock);
1514 if (message_type_id >= AST_VECTOR_SIZE(&message_type_statistics)) {
1515 struct stasis_message_type_statistics new_statistics = {
1516 .published = 0,
1517 };
1518 if (AST_VECTOR_REPLACE(&message_type_statistics, message_type_id, new_statistics)) {
1519 ast_mutex_unlock(&message_type_statistics_lock);
1520 return;
1521 }
1522 }
1523 statistics = AST_VECTOR_GET_ADDR(&message_type_statistics, message_type_id);
1524 statistics->message_type = stasis_message_type(message);
1525 ast_mutex_unlock(&message_type_statistics_lock);
1526
1527 ast_atomic_fetchadd_int(&statistics->published, +1);
1528#endif
1529
1530 /* If there are no subscribers don't bother */
1531 if (!stasis_topic_subscribers(topic)) {
1532#ifdef AST_DEVMODE
1533 ast_atomic_fetchadd_int(&statistics->unused, +1);
1534 ast_atomic_fetchadd_int(&topic->statistics->messages_not_dispatched, +1);
1535#endif
1536 return;
1537 }
1538
1539 /*
1540 * The topic may be unref'ed by the subscription invocation.
1541 * Make sure we hold onto a reference while dispatching.
1542 */
1543 ao2_ref(topic, +1);
1544#ifdef AST_DEVMODE
1545 start = ast_tvnow();
1546#endif
1547 ao2_lock(topic);
1548 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
1550
1551 ast_assert(sub != NULL);
1552#ifdef AST_DEVMODE
1553 dispatched +=
1554#endif
1555 dispatch_message(sub, message, (sub == sync_sub));
1556 }
1558
1559#ifdef AST_DEVMODE
1560 elapsed = ast_tvdiff_ms(ast_tvnow(), start);
1561 if (elapsed > topic->statistics->highest_time_dispatched) {
1562 topic->statistics->highest_time_dispatched = elapsed;
1563 }
1564 if (elapsed < topic->statistics->lowest_time_dispatched) {
1565 topic->statistics->lowest_time_dispatched = elapsed;
1566 }
1567 if (dispatched) {
1568 ast_atomic_fetchadd_int(&topic->statistics->messages_dispatched, +1);
1569 } else {
1570 ast_atomic_fetchadd_int(&statistics->unused, +1);
1571 ast_atomic_fetchadd_int(&topic->statistics->messages_not_dispatched, +1);
1572 }
1573#endif
1574
1575 ao2_ref(topic, -1);
1576}
1577
1579{
1581}
1582
1584{
1585 ast_assert(sub != NULL);
1586
1587 publish_msg(sub->topic, message, sub);
1588}
1589
1590/*!
1591 * \brief Forwarding information
1592 *
1593 * Any message posted to \a from_topic is forwarded to \a to_topic.
1594 *
1595 * In cases where both the \a from_topic and \a to_topic need to be locked,
1596 * always lock the \a to_topic first, then the \a from_topic. Lest you deadlock.
1597 */
1599 /*! Originating topic */
1601 /*! Destination topic */
1603};
1604
1605static void forward_dtor(void *obj)
1606{
1607 struct stasis_forward *forward = obj;
1608
1609 ao2_cleanup(forward->from_topic);
1610 forward->from_topic = NULL;
1611 ao2_cleanup(forward->to_topic);
1612 forward->to_topic = NULL;
1613}
1614
1616{
1617 int idx;
1618 struct stasis_topic *from;
1619 struct stasis_topic *to;
1620
1621 if (!forward) {
1622 return NULL;
1623 }
1624
1625 from = forward->from_topic;
1626 to = forward->to_topic;
1627
1628 if (from && to) {
1629 topic_lock_both(to, from);
1632
1633 for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
1635 }
1636 ao2_unlock(from);
1637 ao2_unlock(to);
1638 }
1639
1640 ao2_cleanup(forward);
1641
1642 return NULL;
1643}
1644
1646 struct stasis_topic *to_topic)
1647{
1648 int res;
1649 size_t idx;
1650 struct stasis_forward *forward;
1651
1652 if (!from_topic || !to_topic) {
1653 return NULL;
1654 }
1655
1656 forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1657 if (!forward) {
1658 return NULL;
1659 }
1660
1661 /* Forwards to ourselves are implicit. */
1662 if (to_topic == from_topic) {
1663 return forward;
1664 }
1665
1666 forward->from_topic = ao2_bump(from_topic);
1667 forward->to_topic = ao2_bump(to_topic);
1668
1671 if (res != 0) {
1674 ao2_ref(forward, -1);
1675 return NULL;
1676 }
1677
1678 for (idx = 0; idx < AST_VECTOR_SIZE(&to_topic->subscribers); ++idx) {
1680 }
1683
1684 return forward;
1685}
1686
1687static void subscription_change_dtor(void *obj)
1688{
1689 struct stasis_subscription_change *change = obj;
1690
1691 ao2_cleanup(change->topic);
1692}
1693
1695{
1696 size_t description_len = strlen(description) + 1;
1697 size_t uniqueid_len = strlen(uniqueid) + 1;
1698 struct stasis_subscription_change *change;
1699
1700 change = ao2_alloc_options(sizeof(*change) + description_len + uniqueid_len,
1702 if (!change) {
1703 return NULL;
1704 }
1705
1706 strcpy(change->description, description); /* SAFE */
1707 change->uniqueid = change->description + description_len;
1708 ast_copy_string(change->uniqueid, uniqueid, uniqueid_len); /* SAFE */
1709 ao2_ref(topic, +1);
1710 change->topic = topic;
1711
1712 return change;
1713}
1714
1716{
1717 struct stasis_subscription_change *change;
1718 struct stasis_message *msg;
1719
1720 /* This assumes that we have already unsubscribed */
1722
1724 return;
1725 }
1726
1727 change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
1728 if (!change) {
1729 return;
1730 }
1731
1733 if (!msg) {
1734 ao2_cleanup(change);
1735 return;
1736 }
1737
1738 stasis_publish(topic, msg);
1739 ao2_cleanup(msg);
1740 ao2_cleanup(change);
1741}
1742
1744 struct stasis_subscription *sub)
1745{
1746 struct stasis_subscription_change *change;
1747 struct stasis_message *msg;
1748
1749 /* This assumes that we have already unsubscribed */
1751
1753 return;
1754 }
1755
1756 change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
1757 if (!change) {
1758 return;
1759 }
1760
1762 if (!msg) {
1763 ao2_cleanup(change);
1764 return;
1765 }
1766
1767 stasis_publish(topic, msg);
1768
1769 /* Now we have to dispatch to the subscription itself */
1770 dispatch_message(sub, msg, 0);
1771
1772 ao2_cleanup(msg);
1773 ao2_cleanup(change);
1774}
1775
1779 char name[0];
1780};
1781
1782static void topic_pool_entry_dtor(void *obj)
1783{
1784 struct topic_pool_entry *entry = obj;
1785
1786 entry->forward = stasis_forward_cancel(entry->forward);
1787 ao2_cleanup(entry->topic);
1788 entry->topic = NULL;
1789}
1790
1791static struct topic_pool_entry *topic_pool_entry_alloc(const char *topic_name)
1792{
1794
1795 topic_pool_entry = ao2_alloc_options(sizeof(*topic_pool_entry) + strlen(topic_name) + 1,
1797 if (!topic_pool_entry) {
1798 return NULL;
1799 }
1800
1801 strcpy(topic_pool_entry->name, topic_name); /* Safe */
1802
1803 return topic_pool_entry;
1804}
1805
1809};
1810
1811static void topic_pool_dtor(void *obj)
1812{
1813 struct stasis_topic_pool *pool = obj;
1814
1815#ifdef AO2_DEBUG
1816 {
1817 char *container_name =
1818 ast_alloca(strlen(stasis_topic_name(pool->pool_topic)) + strlen("-pool") + 1);
1819 sprintf(container_name, "%s-pool", stasis_topic_name(pool->pool_topic));
1820 ao2_container_unregister(container_name);
1821 }
1822#endif
1823
1825 pool->pool_container = NULL;
1826 ao2_cleanup(pool->pool_topic);
1827 pool->pool_topic = NULL;
1828}
1829
1830static int topic_pool_entry_hash(const void *obj, const int flags)
1831{
1832 const struct topic_pool_entry *object;
1833 const char *key;
1834
1835 switch (flags & OBJ_SEARCH_MASK) {
1836 case OBJ_SEARCH_KEY:
1837 key = obj;
1838 break;
1839 case OBJ_SEARCH_OBJECT:
1840 object = obj;
1841 key = object->name;
1842 break;
1843 default:
1844 /* Hash can only work on something with a full key. */
1845 ast_assert(0);
1846 return 0;
1847 }
1848 return ast_str_case_hash(key);
1849}
1850
1851static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
1852{
1853 const struct topic_pool_entry *object_left = obj;
1854 const struct topic_pool_entry *object_right = arg;
1855 const char *right_key = arg;
1856 int cmp;
1857
1858 switch (flags & OBJ_SEARCH_MASK) {
1859 case OBJ_SEARCH_OBJECT:
1860 right_key = object_right->name;
1861 /* Fall through */
1862 case OBJ_SEARCH_KEY:
1863 cmp = strcasecmp(object_left->name, right_key);
1864 break;
1866 /* Not supported by container */
1867 ast_assert(0);
1868 cmp = -1;
1869 break;
1870 default:
1871 /*
1872 * What arg points to is specific to this traversal callback
1873 * and has no special meaning to astobj2.
1874 */
1875 cmp = 0;
1876 break;
1877 }
1878 if (cmp) {
1879 return 0;
1880 }
1881 /*
1882 * At this point the traversal callback is identical to a sorted
1883 * container.
1884 */
1885 return CMP_MATCH;
1886}
1887
1888#ifdef AO2_DEBUG
1889static void topic_pool_prnt_obj(void *v_obj, void *where, ao2_prnt_fn *prnt)
1890{
1891 struct topic_pool_entry *entry = v_obj;
1892
1893 if (!entry) {
1894 return;
1895 }
1896 prnt(where, "%s", stasis_topic_name(entry->topic));
1897}
1898#endif
1899
1901{
1902 struct stasis_topic_pool *pool;
1903
1905 if (!pool) {
1906 return NULL;
1907 }
1908
1911 if (!pool->pool_container) {
1912 ao2_cleanup(pool);
1913 return NULL;
1914 }
1915
1916#ifdef AO2_DEBUG
1917 {
1918 char *container_name =
1919 ast_alloca(strlen(stasis_topic_name(pooled_topic)) + strlen("-pool") + 1);
1920 sprintf(container_name, "%s-pool", stasis_topic_name(pooled_topic));
1921 ao2_container_register(container_name, pool->pool_container, topic_pool_prnt_obj);
1922 }
1923#endif
1924
1925 ao2_ref(pooled_topic, +1);
1926 pool->pool_topic = pooled_topic;
1927
1928 return pool;
1929}
1930
1931void stasis_topic_pool_delete_topic(struct stasis_topic_pool *pool, const char *topic_name)
1932{
1933 /*
1934 * The topic_name passed in could be a fully-qualified name like <pool_topic_name>/<topic_name>
1935 * or just <topic_name>. If it's fully qualified, we need to skip past <pool_topic_name>
1936 * name and search only on <topic_name>.
1937 */
1938 const char *pool_topic_name = stasis_topic_name(pool->pool_topic);
1939 int pool_topic_name_len = strlen(pool_topic_name);
1940 const char *search_topic_name;
1941
1942 if (strncmp(pool_topic_name, topic_name, pool_topic_name_len) == 0) {
1943 search_topic_name = topic_name + pool_topic_name_len + 1;
1944 } else {
1945 search_topic_name = topic_name;
1946 }
1947
1948 ao2_find(pool->pool_container, search_topic_name, OBJ_SEARCH_KEY | OBJ_NODATA | OBJ_UNLINK);
1949}
1950
1951struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
1952{
1954 SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
1955 char *new_topic_name;
1956 int ret;
1957
1959 if (topic_pool_entry) {
1960 return topic_pool_entry->topic;
1961 }
1962
1964 if (!topic_pool_entry) {
1965 return NULL;
1966 }
1967
1968 /* To provide further detail and to ensure that the topic is unique within the scope of the
1969 * system we prefix it with the pooling topic name, which should itself already be unique.
1970 */
1971 ret = ast_asprintf(&new_topic_name, "%s/%s", stasis_topic_name(pool->pool_topic), topic_name);
1972 if (ret < 0) {
1973 return NULL;
1974 }
1975
1976 topic_pool_entry->topic = stasis_topic_create(new_topic_name);
1977 ast_free(new_topic_name);
1978 if (!topic_pool_entry->topic) {
1979 return NULL;
1980 }
1981
1983 if (!topic_pool_entry->forward) {
1984 return NULL;
1985 }
1986
1988 return NULL;
1989 }
1990
1991 return topic_pool_entry->topic;
1992}
1993
1994int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name)
1995{
1997
1999 if (!topic_pool_entry) {
2000 return 0;
2001 }
2002
2004 return 1;
2005}
2006
2008{
2009#ifdef AST_DEVMODE
2011 ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
2012 }
2013#endif
2014}
2015
2016/*! \brief A multi object blob data structure to carry user event stasis messages */
2018 struct ast_json *blob; /*< A blob of JSON data */
2019 AST_VECTOR(, void *) snapshots[STASIS_UMOS_MAX]; /*< Vector of snapshots for each type */
2020};
2021
2022/*!
2023 * \internal
2024 * \brief Destructor for \ref ast_multi_object_blob objects
2025 */
2026static void multi_object_blob_dtor(void *obj)
2027{
2028 struct ast_multi_object_blob *multi = obj;
2029 int type;
2030 int i;
2031
2032 for (type = 0; type < STASIS_UMOS_MAX; ++type) {
2033 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
2035 }
2036 AST_VECTOR_FREE(&multi->snapshots[type]);
2037 }
2038 ast_json_unref(multi->blob);
2039}
2040
2041/*! \brief Create a stasis user event multi object blob */
2043{
2044 int type;
2045 struct ast_multi_object_blob *multi;
2046
2047 ast_assert(blob != NULL);
2048
2049 multi = ao2_alloc(sizeof(*multi), multi_object_blob_dtor);
2050 if (!multi) {
2051 return NULL;
2052 }
2053
2054 for (type = 0; type < STASIS_UMOS_MAX; ++type) {
2055 if (AST_VECTOR_INIT(&multi->snapshots[type], 0)) {
2056 ao2_ref(multi, -1);
2057
2058 return NULL;
2059 }
2060 }
2061
2062 multi->blob = ast_json_ref(blob);
2063
2064 return multi;
2065}
2066
2067/*! \brief Add an object (snapshot) to the blob */
2070{
2071 if (!multi || !object || AST_VECTOR_APPEND(&multi->snapshots[type], object)) {
2072 ao2_cleanup(object);
2073 }
2074}
2075
2076/*! \brief Publish single channel user event (for app_userevent compatibility) */
2078 struct stasis_message_type *type, struct ast_json *blob)
2079{
2080 struct stasis_message *message;
2081 struct ast_channel_snapshot *channel_snapshot;
2082 struct ast_multi_object_blob *multi;
2083
2084 if (!type) {
2085 return;
2086 }
2087
2089 if (!multi) {
2090 return;
2091 }
2092
2093 channel_snapshot = ast_channel_snapshot_create(chan);
2094 if (!channel_snapshot) {
2095 ao2_ref(multi, -1);
2096 return;
2097 }
2098
2099 /* this call steals the channel_snapshot reference */
2100 ast_multi_object_blob_add(multi, STASIS_UMOS_CHANNEL, channel_snapshot);
2101
2103 ao2_ref(multi, -1);
2104 if (message) {
2105 /* app_userevent still publishes to channel */
2107 ao2_ref(message, -1);
2108 }
2109}
2110
2111/*! \internal \brief convert multi object blob to ari json */
2113 struct stasis_message *message,
2114 const struct stasis_message_sanitizer *sanitize)
2115{
2116 struct ast_json *out;
2118 struct ast_json *blob = multi->blob;
2119 const struct timeval *tv = stasis_message_timestamp(message);
2121 int i;
2122
2124 if (!out) {
2125 return NULL;
2126 }
2127
2128 ast_json_object_set(out, "type", ast_json_string_create("ChannelUserevent"));
2129 ast_json_object_set(out, "timestamp", ast_json_timeval(*tv, NULL));
2130 ast_json_object_set(out, "eventname", ast_json_ref(ast_json_object_get(blob, "eventname")));
2131 ast_json_object_set(out, "userevent", ast_json_ref(blob));
2132
2133 for (type = 0; type < STASIS_UMOS_MAX; ++type) {
2134 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
2135 struct ast_json *json_object = NULL;
2136 char *name = NULL;
2137 void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
2138
2139 switch (type) {
2141 json_object = ast_channel_snapshot_to_json(snapshot, sanitize);
2142 name = "channel";
2143 break;
2144 case STASIS_UMOS_BRIDGE:
2145 json_object = ast_bridge_snapshot_to_json(snapshot, sanitize);
2146 name = "bridge";
2147 break;
2149 json_object = ast_endpoint_snapshot_to_json(snapshot, sanitize);
2150 name = "endpoint";
2151 break;
2152 }
2153 if (json_object) {
2154 ast_json_object_set(out, name, json_object);
2155 }
2156 }
2157 }
2158
2159 return out;
2160}
2161
2162/*! \internal \brief convert multi object blob to ami string */
2163static struct ast_str *multi_object_blob_to_ami(void *obj)
2164{
2165 struct ast_str *ami_str=ast_str_create(1024);
2166 struct ast_str *ami_snapshot;
2167 const struct ast_multi_object_blob *multi = obj;
2169 int i;
2170
2171 if (!ami_str) {
2172 return NULL;
2173 }
2174 if (!multi) {
2175 ast_free(ami_str);
2176 return NULL;
2177 }
2178
2179 for (type = 0; type < STASIS_UMOS_MAX; ++type) {
2180 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
2181 char *name = NULL;
2182 void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
2183 ami_snapshot = NULL;
2184
2185 if (i > 0) {
2186 ast_asprintf(&name, "%d", i + 1);
2187 }
2188
2189 switch (type) {
2191 ami_snapshot = ast_manager_build_channel_state_string_prefix(snapshot, name ?: "");
2192 break;
2193
2194 case STASIS_UMOS_BRIDGE:
2195 ami_snapshot = ast_manager_build_bridge_state_string_prefix(snapshot, name ?: "");
2196 break;
2197
2199 /* currently not sending endpoint snapshots to AMI */
2200 break;
2201 }
2202 if (ami_snapshot) {
2203 ast_str_append(&ami_str, 0, "%s", ast_str_buffer(ami_snapshot));
2204 ast_free(ami_snapshot);
2205 }
2206 ast_free(name);
2207 }
2208 }
2209
2210 return ami_str;
2211}
2212
2213/*! \internal \brief Callback to pass only user defined parameters from blob */
2214static int userevent_exclusion_cb(const char *key)
2215{
2216 if (!strcmp("eventname", key)) {
2217 return 1;
2218 }
2219 return 0;
2220}
2221
2223 struct stasis_message *message)
2224{
2225 RAII_VAR(struct ast_str *, object_string, NULL, ast_free);
2226 RAII_VAR(struct ast_str *, body, NULL, ast_free);
2228 const char *eventname;
2229
2230 eventname = ast_json_string_get(ast_json_object_get(multi->blob, "eventname"));
2232 object_string = multi_object_blob_to_ami(multi);
2233 if (!object_string || !body) {
2234 return NULL;
2235 }
2236
2238 "%s"
2239 "UserEvent: %s\r\n"
2240 "%s",
2241 ast_str_buffer(object_string),
2242 eventname,
2243 ast_str_buffer(body));
2244}
2245
2246/*! \brief A structure to hold global configuration-related options */
2248 /*! The list of message types to decline */
2250};
2251
2252/*! \brief Taskpool configuration options */
2254 /*! Minimum size of the taskpool */
2256 /*! Initial size of the taskpool */
2258 /*! Time, in seconds, before we expire a taskprocessor */
2260 /*! Maximum number of taskprocessors to allow */
2262};
2263
2265 /*! Taskpool configuration options */
2267 /*! Declined message types */
2269};
2270
2272 .type = ACO_GLOBAL,
2273 .name = "threadpool",
2274 .item_offset = offsetof(struct stasis_config, taskpool_options),
2275 .category = "threadpool",
2276 .category_match = ACO_WHITELIST_EXACT,
2277};
2278
2279static struct aco_type taskpool_option = {
2280 .type = ACO_GLOBAL,
2281 .name = "taskpool",
2282 .item_offset = offsetof(struct stasis_config, taskpool_options),
2283 .category = "taskpool",
2284 .category_match = ACO_WHITELIST_EXACT,
2285};
2286
2288
2289/*! \brief An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type */
2290static struct aco_type declined_option = {
2291 .type = ACO_GLOBAL,
2292 .name = "declined_message_types",
2293 .item_offset = offsetof(struct stasis_config, declined_message_types),
2294 .category_match = ACO_WHITELIST_EXACT,
2295 .category = "declined_message_types",
2296};
2297
2299
2301 .filename = "stasis.conf",
2303};
2304
2305/*! \brief A global object container that will contain the stasis_config that gets swapped out on reloads */
2307
2308static void *stasis_config_alloc(void);
2309
2310/*! \brief Register information about the configs being processed by this module */
2312 .files = ACO_FILES(&stasis_conf),
2313);
2314
2316{
2317 struct stasis_declined_config *declined = obj;
2318
2319 ao2_cleanup(declined->declined);
2320}
2321
2322static void stasis_config_destructor(void *obj)
2323{
2324 struct stasis_config *cfg = obj;
2325
2328}
2329
2330static void *stasis_config_alloc(void)
2331{
2332 struct stasis_config *cfg;
2333
2334 if (!(cfg = ao2_alloc(sizeof(*cfg), stasis_config_destructor))) {
2335 return NULL;
2336 }
2337
2338 cfg->taskpool_options = ast_calloc(1, sizeof(*cfg->taskpool_options));
2339 if (!cfg->taskpool_options) {
2340 ao2_ref(cfg, -1);
2341 return NULL;
2342 }
2343
2346 if (!cfg->declined_message_types) {
2347 ao2_ref(cfg, -1);
2348 return NULL;
2349 }
2350
2352 if (!cfg->declined_message_types->declined) {
2353 ao2_ref(cfg, -1);
2354 return NULL;
2355 }
2356
2357 return cfg;
2358}
2359
2361{
2363 char *name_in_declined;
2364 int res;
2365
2366 if (!cfg || !cfg->declined_message_types) {
2367 ao2_cleanup(cfg);
2368 return 0;
2369 }
2370
2371 name_in_declined = ao2_find(cfg->declined_message_types->declined, name, OBJ_SEARCH_KEY);
2372 res = name_in_declined ? 1 : 0;
2373 ao2_cleanup(name_in_declined);
2374 ao2_ref(cfg, -1);
2375 if (res) {
2376 ast_debug(4, "Declining to allocate Stasis message type '%s' due to configuration\n", name);
2377 }
2378 return res;
2379}
2380
2381static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
2382{
2383 struct stasis_declined_config *declined = obj;
2384
2385 if (ast_strlen_zero(var->value)) {
2386 return 0;
2387 }
2388
2389 if (ast_str_container_add(declined->declined, var->value)) {
2390 return -1;
2391 }
2392
2393 return 0;
2394}
2395
2396/*!
2397 * @{ \brief Define multi user event message type(s).
2398 */
2399
2401 .to_json = multi_user_event_to_json,
2403 );
2404
2405/*! @} */
2406
2407/*!
2408 * \internal
2409 * \brief CLI command implementation for 'stasis show topics'
2410 */
2411static char *stasis_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2412{
2413 struct ao2_iterator iter;
2414 struct topic_proxy *topic;
2415 struct ao2_container *tmp_container;
2416 int count = 0;
2417#define FMT_HEADERS "%-64s %-64s\n"
2418#define FMT_FIELDS "%-64s %-64s\n"
2419
2420 switch (cmd) {
2421 case CLI_INIT:
2422 e->command = "stasis show topics";
2423 e->usage =
2424 "Usage: stasis show topics\n"
2425 " Shows a list of topics\n";
2426 return NULL;
2427 case CLI_GENERATE:
2428 return NULL;
2429 }
2430
2431 if (a->argc != e->args) {
2432 return CLI_SHOWUSAGE;
2433 }
2434
2435 ast_cli(a->fd, "\n" FMT_HEADERS, "Name", "Detail");
2436
2438 topic_proxy_sort_fn, NULL);
2439
2440 if (!tmp_container || ao2_container_dup(tmp_container, topic_all, OBJ_SEARCH_OBJECT)) {
2441 ao2_cleanup(tmp_container);
2442
2443 return NULL;
2444 }
2445
2446 /* getting all topic in order */
2447 iter = ao2_iterator_init(tmp_container, AO2_ITERATOR_UNLINK);
2448 while ((topic = ao2_iterator_next(&iter))) {
2449 ast_cli(a->fd, FMT_FIELDS, topic->name, topic->detail);
2450 ao2_ref(topic, -1);
2451 ++count;
2452 }
2453 ao2_iterator_destroy(&iter);
2454 ao2_cleanup(tmp_container);
2455
2456 ast_cli(a->fd, "\n%d Total topics\n\n", count);
2457
2458#undef FMT_HEADERS
2459#undef FMT_FIELDS
2460
2461 return CLI_SUCCESS;
2462}
2463
2464/*!
2465 * \internal
2466 * \brief CLI tab completion for topic names
2467 */
2468static char *topic_complete_name(const char *word)
2469{
2470 struct topic_proxy *topic;
2471 struct ao2_iterator it;
2472 int wordlen = strlen(word);
2473 int ret;
2474
2476 while ((topic = ao2_iterator_next(&it))) {
2477 if (!strncasecmp(word, topic->name, wordlen)) {
2478 ret = ast_cli_completion_add(ast_strdup(topic->name));
2479 if (ret) {
2480 ao2_ref(topic, -1);
2481 break;
2482 }
2483 }
2484 ao2_ref(topic, -1);
2485 }
2487 return NULL;
2488}
2489
2490/*!
2491 * \internal
2492 * \brief CLI command implementation for 'stasis show topic'
2493 */
2494static char *stasis_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2495{
2496 struct stasis_topic *topic;
2497 char print_time[32];
2498 int i;
2499
2500 switch (cmd) {
2501 case CLI_INIT:
2502 e->command = "stasis show topic";
2503 e->usage =
2504 "Usage: stasis show topic <name>\n"
2505 " Show stasis topic detail info.\n";
2506 return NULL;
2507 case CLI_GENERATE:
2508 if (a->pos == 3) {
2509 return topic_complete_name(a->word);
2510 } else {
2511 return NULL;
2512 }
2513 }
2514
2515 if (a->argc != 4) {
2516 return CLI_SHOWUSAGE;
2517 }
2518
2519 topic = stasis_topic_get(a->argv[3]);
2520 if (!topic) {
2521 ast_cli(a->fd, "Specified topic '%s' does not exist\n", a->argv[3]);
2522 return CLI_FAILURE;
2523 }
2524
2525 ast_cli(a->fd, "Name: %s\n", topic->name);
2526 ast_cli(a->fd, "Detail: %s\n", topic->detail);
2527 ast_cli(a->fd, "Subscribers count: %zu\n", AST_VECTOR_SIZE(&topic->subscribers));
2528 ast_cli(a->fd, "Forwarding topic count: %zu\n", AST_VECTOR_SIZE(&topic->upstream_topics));
2529 ast_format_duration_hh_mm_ss(ast_tvnow().tv_sec - topic->creationtime->tv_sec, print_time, sizeof(print_time));
2530 ast_cli(a->fd, "Duration time: %s\n", print_time);
2531
2532 ao2_lock(topic);
2533 ast_cli(a->fd, "\nSubscribers:\n");
2534 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); i++) {
2535 struct stasis_subscription *subscription_tmp = AST_VECTOR_GET(&topic->subscribers, i);
2536 ast_cli(a->fd, " UniqueID: %s, Topic: %s, Detail: %s\n",
2537 subscription_tmp->uniqueid, subscription_tmp->topic->name, subscription_tmp->topic->detail);
2538 }
2539
2540 ast_cli(a->fd, "\nForwarded topics:\n");
2541 for (i = 0; i < AST_VECTOR_SIZE(&topic->upstream_topics); i++) {
2542 struct stasis_topic *topic_tmp = AST_VECTOR_GET(&topic->upstream_topics, i);
2543 ast_cli(a->fd, " Topic: %s, Detail: %s\n", topic_tmp->name, topic_tmp->detail);
2544 }
2545 ao2_unlock(topic);
2546
2547 ao2_ref(topic, -1);
2548
2549 return CLI_SUCCESS;
2550}
2551
2552
2553static struct ast_cli_entry cli_stasis[] = {
2554 AST_CLI_DEFINE(stasis_show_topics, "Show all topics"),
2555 AST_CLI_DEFINE(stasis_show_topic, "Show topic"),
2556};
2557
2558
2559#ifdef AST_DEVMODE
2560
2561AO2_STRING_FIELD_SORT_FN(stasis_subscription_statistics, uniqueid);
2562
2563/*!
2564 * \internal
2565 * \brief CLI command implementation for 'stasis statistics show subscriptions'
2566 */
2567static char *statistics_show_subscriptions(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2568{
2569 struct ao2_container *sorted_subscriptions;
2570 struct ao2_container *subscription_stats;
2571 struct ao2_iterator iter;
2572 struct stasis_subscription_statistics *statistics;
2573 int count = 0;
2574 int dropped = 0;
2575 int passed = 0;
2576#define FMT_HEADERS "%-64s %10s %10s %16s %16s\n"
2577#define FMT_FIELDS "%-64s %10d %10d %16ld %16ld\n"
2578#define FMT_FIELDS2 "%-64s %10d %10d\n"
2579
2580 switch (cmd) {
2581 case CLI_INIT:
2582 e->command = "stasis statistics show subscriptions";
2583 e->usage =
2584 "Usage: stasis statistics show subscriptions\n"
2585 " Shows a list of subscriptions and their general statistics\n";
2586 return NULL;
2587 case CLI_GENERATE:
2588 return NULL;
2589 }
2590
2591 if (a->argc != e->args) {
2592 return CLI_SHOWUSAGE;
2593 }
2594
2595 subscription_stats = ao2_global_obj_ref(subscription_statistics);
2596 if (!subscription_stats) {
2597 ast_cli(a->fd, "Could not fetch subscription_statistics container\n");
2598 return CLI_FAILURE;
2599 }
2600
2601 sorted_subscriptions = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_NOLOCK, 0,
2602 stasis_subscription_statistics_sort_fn, NULL);
2603 if (!sorted_subscriptions) {
2604 ao2_ref(subscription_stats, -1);
2605 ast_cli(a->fd, "Could not create container for sorting subscription statistics\n");
2606 return CLI_SUCCESS;
2607 }
2608
2609 if (ao2_container_dup(sorted_subscriptions, subscription_stats, 0)) {
2610 ao2_ref(sorted_subscriptions, -1);
2611 ao2_ref(subscription_stats, -1);
2612 ast_cli(a->fd, "Could not sort subscription statistics\n");
2613 return CLI_SUCCESS;
2614 }
2615
2616 ao2_ref(subscription_stats, -1);
2617
2618 ast_cli(a->fd, "\n" FMT_HEADERS, "Subscription", "Dropped", "Passed", "Lowest Invoke", "Highest Invoke");
2619
2620 iter = ao2_iterator_init(sorted_subscriptions, 0);
2621 while ((statistics = ao2_iterator_next(&iter))) {
2622 ast_cli(a->fd, FMT_FIELDS, statistics->uniqueid, statistics->messages_dropped, statistics->messages_passed,
2623 statistics->lowest_time_invoked, statistics->highest_time_invoked);
2624 dropped += statistics->messages_dropped;
2625 passed += statistics->messages_passed;
2626 ao2_ref(statistics, -1);
2627 ++count;
2628 }
2629 ao2_iterator_destroy(&iter);
2630
2631 ao2_ref(sorted_subscriptions, -1);
2632
2633 ast_cli(a->fd, FMT_FIELDS2, "Total", dropped, passed);
2634 ast_cli(a->fd, "\n%d subscriptions\n\n", count);
2635
2636#undef FMT_HEADERS
2637#undef FMT_FIELDS
2638#undef FMT_FIELDS2
2639
2640 return CLI_SUCCESS;
2641}
2642
2643/*!
2644 * \internal
2645 * \brief CLI tab completion for subscription statistics names
2646 */
2647static char *subscription_statistics_complete_name(const char *word, int state)
2648{
2649 struct stasis_subscription_statistics *statistics;
2650 struct ao2_container *subscription_stats;
2651 struct ao2_iterator it_statistics;
2652 int wordlen = strlen(word);
2653 int which = 0;
2654 char *result = NULL;
2655
2656 subscription_stats = ao2_global_obj_ref(subscription_statistics);
2657 if (!subscription_stats) {
2658 return result;
2659 }
2660
2661 it_statistics = ao2_iterator_init(subscription_stats, 0);
2662 while ((statistics = ao2_iterator_next(&it_statistics))) {
2663 if (!strncasecmp(word, statistics->uniqueid, wordlen)
2664 && ++which > state) {
2665 result = ast_strdup(statistics->uniqueid);
2666 }
2667 ao2_ref(statistics, -1);
2668 if (result) {
2669 break;
2670 }
2671 }
2672 ao2_iterator_destroy(&it_statistics);
2673 ao2_ref(subscription_stats, -1);
2674 return result;
2675}
2676
2677/*!
2678 * \internal
2679 * \brief CLI command implementation for 'stasis statistics show subscription'
2680 */
2681static char *statistics_show_subscription(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2682{
2683 struct stasis_subscription_statistics *statistics;
2684 struct ao2_container *subscription_stats;
2685 struct ao2_iterator i;
2686 char *name;
2687
2688 switch (cmd) {
2689 case CLI_INIT:
2690 e->command = "stasis statistics show subscription";
2691 e->usage =
2692 "Usage: stasis statistics show subscription <uniqueid>\n"
2693 " Show stasis subscription statistics.\n";
2694 return NULL;
2695 case CLI_GENERATE:
2696 if (a->pos == 4) {
2697 return subscription_statistics_complete_name(a->word, a->n);
2698 } else {
2699 return NULL;
2700 }
2701 }
2702
2703 if (a->argc != 5) {
2704 return CLI_SHOWUSAGE;
2705 }
2706
2707 subscription_stats = ao2_global_obj_ref(subscription_statistics);
2708 if (!subscription_stats) {
2709 ast_cli(a->fd, "Could not fetch subscription_statistics container\n");
2710 return CLI_FAILURE;
2711 }
2712
2713 statistics = ao2_find(subscription_stats, a->argv[4], OBJ_SEARCH_KEY);
2714 if (!statistics) {
2715 ao2_ref(subscription_stats, -1);
2716 ast_cli(a->fd, "Specified subscription '%s' does not exist\n", a->argv[4]);
2717 return CLI_FAILURE;
2718 }
2719
2720 ao2_ref(subscription_stats, -1);
2721
2722 ast_cli(a->fd, "Subscription: %s\n", statistics->uniqueid);
2723 ast_cli(a->fd, "Pointer Address: %p\n", statistics->sub);
2724 ast_cli(a->fd, "Source filename: %s\n", S_OR(statistics->file, "<unavailable>"));
2725 ast_cli(a->fd, "Source line number: %d\n", statistics->lineno);
2726 ast_cli(a->fd, "Source function: %s\n", S_OR(statistics->func, "<unavailable>"));
2727 ast_cli(a->fd, "Number of messages dropped due to filtering: %d\n", statistics->messages_dropped);
2728 ast_cli(a->fd, "Number of messages passed to subscriber callback: %d\n", statistics->messages_passed);
2729 ast_cli(a->fd, "Using mailbox to queue messages: %s\n", statistics->uses_mailbox ? "Yes" : "No");
2730 ast_cli(a->fd, "Using stasis taskpool for handling messages: %s\n", statistics->uses_taskpool ? "Yes" : "No");
2731 ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->lowest_time_invoked);
2732 ast_cli(a->fd, "Highest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->highest_time_invoked);
2733
2735 if (statistics->highest_time_message_type) {
2736 ast_cli(a->fd, "Offender message type for highest invoking time: %s\n", stasis_message_type_name(statistics->highest_time_message_type));
2737 }
2739
2740 ast_cli(a->fd, "Number of topics: %d\n", ao2_container_count(statistics->topics));
2741
2742 ast_cli(a->fd, "Subscribed topics:\n");
2743 i = ao2_iterator_init(statistics->topics, 0);
2744 while ((name = ao2_iterator_next(&i))) {
2745 ast_cli(a->fd, "\t%s\n", name);
2746 ao2_ref(name, -1);
2747 }
2749
2750 ao2_ref(statistics, -1);
2751
2752 return CLI_SUCCESS;
2753}
2754
2755AO2_STRING_FIELD_SORT_FN(stasis_topic_statistics, name);
2756
2757/*!
2758 * \internal
2759 * \brief CLI command implementation for 'stasis statistics show topics'
2760 */
2761static char *statistics_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2762{
2763 struct ao2_container *sorted_topics;
2764 struct ao2_container *topic_stats;
2765 struct ao2_iterator iter;
2766 struct stasis_topic_statistics *statistics;
2767 int count = 0;
2768 int not_dispatched = 0;
2769 int dispatched = 0;
2770#define FMT_HEADERS "%-64s %10s %10s %10s %16s %16s\n"
2771#define FMT_FIELDS "%-64s %10d %10d %10d %16ld %16ld\n"
2772#define FMT_FIELDS2 "%-64s %10s %10d %10d\n"
2773
2774 switch (cmd) {
2775 case CLI_INIT:
2776 e->command = "stasis statistics show topics";
2777 e->usage =
2778 "Usage: stasis statistics show topics\n"
2779 " Shows a list of topics and their general statistics\n";
2780 return NULL;
2781 case CLI_GENERATE:
2782 return NULL;
2783 }
2784
2785 if (a->argc != e->args) {
2786 return CLI_SHOWUSAGE;
2787 }
2788
2789 topic_stats = ao2_global_obj_ref(topic_statistics);
2790 if (!topic_stats) {
2791 ast_cli(a->fd, "Could not fetch topic_statistics container\n");
2792 return CLI_FAILURE;
2793 }
2794
2796 stasis_topic_statistics_sort_fn, NULL);
2797 if (!sorted_topics) {
2798 ao2_ref(topic_stats, -1);
2799 ast_cli(a->fd, "Could not create container for sorting topic statistics\n");
2800 return CLI_SUCCESS;
2801 }
2802
2803 if (ao2_container_dup(sorted_topics, topic_stats, 0)) {
2804 ao2_ref(sorted_topics, -1);
2805 ao2_ref(topic_stats, -1);
2806 ast_cli(a->fd, "Could not sort topic statistics\n");
2807 return CLI_SUCCESS;
2808 }
2809
2810 ao2_ref(topic_stats, -1);
2811
2812 ast_cli(a->fd, "\n" FMT_HEADERS, "Topic", "Subscribers", "Dropped", "Dispatched", "Lowest Dispatch", "Highest Dispatch");
2813
2814 iter = ao2_iterator_init(sorted_topics, 0);
2815 while ((statistics = ao2_iterator_next(&iter))) {
2816 ast_cli(a->fd, FMT_FIELDS, statistics->name, ao2_container_count(statistics->subscribers),
2817 statistics->messages_not_dispatched, statistics->messages_dispatched,
2818 statistics->lowest_time_dispatched, statistics->highest_time_dispatched);
2819 not_dispatched += statistics->messages_not_dispatched;
2820 dispatched += statistics->messages_dispatched;
2821 ao2_ref(statistics, -1);
2822 ++count;
2823 }
2824 ao2_iterator_destroy(&iter);
2825
2826 ao2_ref(sorted_topics, -1);
2827
2828 ast_cli(a->fd, FMT_FIELDS2, "Total", "", not_dispatched, dispatched);
2829 ast_cli(a->fd, "\n%d topics\n\n", count);
2830
2831#undef FMT_HEADERS
2832#undef FMT_FIELDS
2833#undef FMT_FIELDS2
2834
2835 return CLI_SUCCESS;
2836}
2837
2838/*!
2839 * \internal
2840 * \brief CLI tab completion for topic statistics names
2841 */
2842static char *topic_statistics_complete_name(const char *word, int state)
2843{
2844 struct stasis_topic_statistics *statistics;
2845 struct ao2_container *topic_stats;
2846 struct ao2_iterator it_statistics;
2847 int wordlen = strlen(word);
2848 int which = 0;
2849 char *result = NULL;
2850
2851 topic_stats = ao2_global_obj_ref(topic_statistics);
2852 if (!topic_stats) {
2853 return result;
2854 }
2855
2856 it_statistics = ao2_iterator_init(topic_stats, 0);
2857 while ((statistics = ao2_iterator_next(&it_statistics))) {
2858 if (!strncasecmp(word, statistics->name, wordlen)
2859 && ++which > state) {
2860 result = ast_strdup(statistics->name);
2861 }
2862 ao2_ref(statistics, -1);
2863 if (result) {
2864 break;
2865 }
2866 }
2867 ao2_iterator_destroy(&it_statistics);
2868 ao2_ref(topic_stats, -1);
2869 return result;
2870}
2871
2872/*!
2873 * \internal
2874 * \brief CLI command implementation for 'stasis statistics show topic'
2875 */
2876static char *statistics_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2877{
2878 struct stasis_topic_statistics *statistics;
2879 struct ao2_container *topic_stats;
2880 struct ao2_iterator i;
2881 char *uniqueid;
2882
2883 switch (cmd) {
2884 case CLI_INIT:
2885 e->command = "stasis statistics show topic";
2886 e->usage =
2887 "Usage: stasis statistics show topic <name>\n"
2888 " Show stasis topic statistics.\n";
2889 return NULL;
2890 case CLI_GENERATE:
2891 if (a->pos == 4) {
2892 return topic_statistics_complete_name(a->word, a->n);
2893 } else {
2894 return NULL;
2895 }
2896 }
2897
2898 if (a->argc != 5) {
2899 return CLI_SHOWUSAGE;
2900 }
2901
2902 topic_stats = ao2_global_obj_ref(topic_statistics);
2903 if (!topic_stats) {
2904 ast_cli(a->fd, "Could not fetch topic_statistics container\n");
2905 return CLI_FAILURE;
2906 }
2907
2908 statistics = ao2_find(topic_stats, a->argv[4], OBJ_SEARCH_KEY);
2909 if (!statistics) {
2910 ao2_ref(topic_stats, -1);
2911 ast_cli(a->fd, "Specified topic '%s' does not exist\n", a->argv[4]);
2912 return CLI_FAILURE;
2913 }
2914
2915 ao2_ref(topic_stats, -1);
2916
2917 ast_cli(a->fd, "Topic: %s\n", statistics->name);
2918 ast_cli(a->fd, "Pointer Address: %p\n", statistics->topic);
2919 ast_cli(a->fd, "Number of messages published that went to no subscriber: %d\n", statistics->messages_not_dispatched);
2920 ast_cli(a->fd, "Number of messages that went to at least one subscriber: %d\n", statistics->messages_dispatched);
2921 ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent dispatching message: %ld\n", statistics->lowest_time_dispatched);
2922 ast_cli(a->fd, "Highest amount of time (in milliseconds) spent dispatching messages: %ld\n", statistics->highest_time_dispatched);
2923 ast_cli(a->fd, "Number of subscribers: %d\n", ao2_container_count(statistics->subscribers));
2924
2925 ast_cli(a->fd, "Subscribers:\n");
2926 i = ao2_iterator_init(statistics->subscribers, 0);
2927 while ((uniqueid = ao2_iterator_next(&i))) {
2928 ast_cli(a->fd, "\t%s\n", uniqueid);
2929 ao2_ref(uniqueid, -1);
2930 }
2932
2933 ao2_ref(statistics, -1);
2934
2935 return CLI_SUCCESS;
2936}
2937
2938/*!
2939 * \internal
2940 * \brief CLI command implementation for 'stasis statistics show messages'
2941 */
2942static char *statistics_show_messages(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2943{
2944 int i;
2945 int count = 0;
2946 int published = 0;
2947 int unused = 0;
2948#define FMT_HEADERS "%-64s %10s %10s\n"
2949#define FMT_FIELDS "%-64s %10d %10d\n"
2950
2951 switch (cmd) {
2952 case CLI_INIT:
2953 e->command = "stasis statistics show messages";
2954 e->usage =
2955 "Usage: stasis statistics show messages\n"
2956 " Shows a list of message types and their general statistics\n";
2957 return NULL;
2958 case CLI_GENERATE:
2959 return NULL;
2960 }
2961
2962 if (a->argc != e->args) {
2963 return CLI_SHOWUSAGE;
2964 }
2965
2966 ast_cli(a->fd, "\n" FMT_HEADERS, "Message Type", "Published", "Unused");
2967
2968 ast_mutex_lock(&message_type_statistics_lock);
2969 for (i = 0; i < AST_VECTOR_SIZE(&message_type_statistics); ++i) {
2970 struct stasis_message_type_statistics *statistics = AST_VECTOR_GET_ADDR(&message_type_statistics, i);
2971
2972 if (!statistics->message_type) {
2973 continue;
2974 }
2975
2976 ast_cli(a->fd, FMT_FIELDS, stasis_message_type_name(statistics->message_type), statistics->published,
2977 statistics->unused);
2978 published += statistics->published;
2979 unused += statistics->unused;
2980 ++count;
2981 }
2982 ast_mutex_unlock(&message_type_statistics_lock);
2983
2984 ast_cli(a->fd, FMT_FIELDS, "Total", published, unused);
2985 ast_cli(a->fd, "\n%d seen message types\n\n", count);
2986
2987#undef FMT_HEADERS
2988#undef FMT_FIELDS
2989
2990 return CLI_SUCCESS;
2991}
2992
2993static struct ast_cli_entry cli_stasis_statistics[] = {
2994 AST_CLI_DEFINE(statistics_show_subscriptions, "Show subscriptions with general statistics"),
2995 AST_CLI_DEFINE(statistics_show_subscription, "Show subscription statistics"),
2996 AST_CLI_DEFINE(statistics_show_topics, "Show topics with general statistics"),
2997 AST_CLI_DEFINE(statistics_show_topic, "Show topic statistics"),
2998 AST_CLI_DEFINE(statistics_show_messages, "Show message types with general statistics"),
2999};
3000
3001static int subscription_statistics_hash(const void *obj, const int flags)
3002{
3003 const struct stasis_subscription_statistics *object;
3004 const char *key;
3005
3006 switch (flags & OBJ_SEARCH_MASK) {
3007 case OBJ_SEARCH_KEY:
3008 key = obj;
3009 break;
3010 case OBJ_SEARCH_OBJECT:
3011 object = obj;
3012 key = object->uniqueid;
3013 break;
3014 default:
3015 /* Hash can only work on something with a full key. */
3016 ast_assert(0);
3017 return 0;
3018 }
3019 return ast_str_case_hash(key);
3020}
3021
3022static int subscription_statistics_cmp(void *obj, void *arg, int flags)
3023{
3024 const struct stasis_subscription_statistics *object_left = obj;
3025 const struct stasis_subscription_statistics *object_right = arg;
3026 const char *right_key = arg;
3027 int cmp;
3028
3029 switch (flags & OBJ_SEARCH_MASK) {
3030 case OBJ_SEARCH_OBJECT:
3031 right_key = object_right->uniqueid;
3032 /* Fall through */
3033 case OBJ_SEARCH_KEY:
3034 cmp = strcasecmp(object_left->uniqueid, right_key);
3035 break;
3037 /* Not supported by container */
3038 ast_assert(0);
3039 cmp = -1;
3040 break;
3041 default:
3042 /*
3043 * What arg points to is specific to this traversal callback
3044 * and has no special meaning to astobj2.
3045 */
3046 cmp = 0;
3047 break;
3048 }
3049 if (cmp) {
3050 return 0;
3051 }
3052 /*
3053 * At this point the traversal callback is identical to a sorted
3054 * container.
3055 */
3056 return CMP_MATCH;
3057}
3058
3059static int topic_statistics_hash(const void *obj, const int flags)
3060{
3061 const struct stasis_topic_statistics *object;
3062 const char *key;
3063
3064 switch (flags & OBJ_SEARCH_MASK) {
3065 case OBJ_SEARCH_KEY:
3066 key = obj;
3067 break;
3068 case OBJ_SEARCH_OBJECT:
3069 object = obj;
3070 key = object->name;
3071 break;
3072 default:
3073 /* Hash can only work on something with a full key. */
3074 ast_assert(0);
3075 return 0;
3076 }
3077 return ast_str_case_hash(key);
3078}
3079
3080static int topic_statistics_cmp(void *obj, void *arg, int flags)
3081{
3082 const struct stasis_topic_statistics *object_left = obj;
3083 const struct stasis_topic_statistics *object_right = arg;
3084 const char *right_key = arg;
3085 int cmp;
3086
3087 switch (flags & OBJ_SEARCH_MASK) {
3088 case OBJ_SEARCH_OBJECT:
3089 right_key = object_right->name;
3090 /* Fall through */
3091 case OBJ_SEARCH_KEY:
3092 cmp = strcasecmp(object_left->name, right_key);
3093 break;
3095 /* Not supported by container */
3096 ast_assert(0);
3097 cmp = -1;
3098 break;
3099 default:
3100 /*
3101 * What arg points to is specific to this traversal callback
3102 * and has no special meaning to astobj2.
3103 */
3104 cmp = 0;
3105 break;
3106 }
3107 if (cmp) {
3108 return 0;
3109 }
3110 /*
3111 * At this point the traversal callback is identical to a sorted
3112 * container.
3113 */
3114 return CMP_MATCH;
3115}
3116#endif
3117
3118/*! \brief Cleanup function for graceful shutdowns */
3119static void stasis_cleanup(void)
3120{
3121#ifdef AST_DEVMODE
3122 ast_cli_unregister_multiple(cli_stasis_statistics, ARRAY_LEN(cli_stasis_statistics));
3123 AST_VECTOR_FREE(&message_type_statistics);
3124 ao2_global_obj_release(subscription_statistics);
3125 ao2_global_obj_release(topic_statistics);
3126#endif
3129 topic_all = NULL;
3131 taskpool = NULL;
3134 aco_info_destroy(&cfg_info);
3136}
3137
3139{
3140 struct stasis_config *cfg;
3141 int cache_init;
3142 struct ast_taskpool_options taskpool_opts = { 0, };
3143#ifdef AST_DEVMODE
3144 struct ao2_container *subscription_stats;
3145 struct ao2_container *topic_stats;
3146#endif
3147
3148 /* Be sure the types are cleaned up after the message bus */
3150
3151 if (aco_info_init(&cfg_info)) {
3152 return -1;
3153 }
3154
3155 aco_option_register_custom(&cfg_info, "decline", ACO_EXACT,
3157 aco_option_register(&cfg_info, "minimum_size", ACO_EXACT,
3159 FLDSET(struct stasis_taskpool_conf, minimum_size), 0,
3160 INT_MAX);
3161 aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
3163 FLDSET(struct stasis_taskpool_conf, initial_size), 0,
3164 INT_MAX);
3165 aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
3167 FLDSET(struct stasis_taskpool_conf, idle_timeout_sec), 0,
3168 INT_MAX);
3169 aco_option_register(&cfg_info, "max_size", ACO_EXACT,
3171 FLDSET(struct stasis_taskpool_conf, max_size), 0,
3172 INT_MAX);
3173
3174 if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
3175 struct stasis_config *default_cfg = stasis_config_alloc();
3176
3177 if (!default_cfg) {
3178 return -1;
3179 }
3180
3181 if (aco_set_defaults(&taskpool_option, "taskpool", default_cfg->taskpool_options)) {
3182 ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
3183 ao2_ref(default_cfg, -1);
3184
3185 return -1;
3186 }
3187
3188 if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) {
3189 ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
3190 ao2_ref(default_cfg, -1);
3191
3192 return -1;
3193 }
3194
3195 ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n");
3197 cfg = default_cfg;
3198 } else {
3200 if (!cfg) {
3201 ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
3202
3203 return -1;
3204 }
3205 }
3206
3207 taskpool_opts.version = AST_TASKPOOL_OPTIONS_VERSION;
3208 taskpool_opts.minimum_size = cfg->taskpool_options->minimum_size;
3209 taskpool_opts.initial_size = cfg->taskpool_options->initial_size;
3210 taskpool_opts.auto_increment = 1;
3211 taskpool_opts.max_size = cfg->taskpool_options->max_size;
3212 taskpool_opts.idle_timeout = cfg->taskpool_options->idle_timeout_sec;
3213 taskpool = ast_taskpool_create("stasis", &taskpool_opts);
3214 ao2_ref(cfg, -1);
3215 if (!taskpool) {
3216 ast_log(LOG_ERROR, "Failed to create 'stasis-core' taskpool\n");
3217
3218 return -1;
3219 }
3220
3221 cache_init = stasis_cache_init();
3222 if (cache_init != 0) {
3223 return -1;
3224 }
3225
3227 return -1;
3228 }
3230 return -1;
3231 }
3232
3234 topic_proxy_hash_fn, 0, topic_proxy_cmp_fn);
3235 if (!topic_all) {
3236 return -1;
3237 }
3238
3240 return -1;
3241 }
3242
3243#ifdef AST_DEVMODE
3244 /* Statistics information is stored separately so that we don't alter or interrupt the lifetime of the underlying
3245 * topic or subscripton.
3246 */
3247 subscription_stats = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, SUBSCRIPTION_STATISTICS_BUCKETS,
3248 subscription_statistics_hash, 0, subscription_statistics_cmp);
3249 if (!subscription_stats) {
3250 return -1;
3251 }
3252 ao2_global_obj_replace_unref(subscription_statistics, subscription_stats);
3253 ao2_cleanup(subscription_stats);
3254
3255 topic_stats = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, TOPIC_STATISTICS_BUCKETS,
3256 topic_statistics_hash, 0, topic_statistics_cmp);
3257 if (!topic_stats) {
3258 return -1;
3259 }
3260 ao2_global_obj_replace_unref(topic_statistics, topic_stats);
3261 ao2_cleanup(topic_stats);
3262 if (!topic_stats) {
3263 return -1;
3264 }
3265
3266 AST_VECTOR_INIT(&message_type_statistics, 0);
3267
3268 if (ast_cli_register_multiple(cli_stasis_statistics, ARRAY_LEN(cli_stasis_statistics))) {
3269 return -1;
3270 }
3271#endif
3272
3273 return 0;
3274}
#define var
Definition: ast_expr2f.c:605
Asterisk main include file. File version handling, generic pbx functions.
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
Definition: clicompat.c:19
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
Definition: astmm.h:288
#define ast_free(a)
Definition: astmm.h:180
#define ast_strdup(str)
A wrapper for strdup()
Definition: astmm.h:241
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:267
#define ast_calloc(num, len)
A wrapper for calloc()
Definition: astmm.h:202
#define ast_log
Definition: astobj2.c:42
int ao2_container_dup(struct ao2_container *dest, struct ao2_container *src, enum search_flags flags)
Copy all object references in the src container into the dest container.
int ao2_weakproxy_subscribe(void *weakproxy, ao2_weakproxy_notification_cb cb, void *data, int flags)
Request notification when weakproxy points to NULL.
Definition: astobj2.c:934
#define ao2_iterator_next(iter)
Definition: astobj2.h:1911
#define ao2_link(container, obj)
Add an object to a container.
Definition: astobj2.h:1532
@ CMP_MATCH
Definition: astobj2.h:1027
@ AO2_ALLOC_OPT_LOCK_NOLOCK
Definition: astobj2.h:367
@ AO2_ALLOC_OPT_LOCK_MUTEX
Definition: astobj2.h:363
#define ao2_wrlock(a)
Definition: astobj2.h:719
#define ao2_global_obj_replace_unref(holder, obj)
Replace an ao2 object in the global holder, throwing away any old object.
Definition: astobj2.h:901
void ao2_container_unregister(const char *name)
Unregister a container for CLI stats and integrity check.
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
#define ao2_unlink(container, obj)
Remove an object from a container.
Definition: astobj2.h:1578
@ AO2_ITERATOR_UNLINK
Definition: astobj2.h:1863
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
Definition: astobj2.h:1554
#define ao2_global_obj_ref(holder)
Get a reference to the object stored in the global holder.
Definition: astobj2.h:918
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1736
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
#define ao2_t_weakproxy_alloc(data_size, destructor_fn, tag)
Definition: astobj2.h:553
#define AO2_STRING_FIELD_SORT_FN(stype, field)
Creates a sort function for a structure string field.
Definition: astobj2.h:2064
#define ao2_container_alloc_rbtree(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a red-black tree container.
Definition: astobj2.h:1349
#define ao2_unlock(a)
Definition: astobj2.h:729
#define ao2_weakproxy_find(c, arg, flags, tag)
Perform an ao2_find on a container with ao2_weakproxy objects, returning the real object.
Definition: astobj2.h:1748
#define ao2_lock(a)
Definition: astobj2.h:717
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:404
int ao2_container_register(const char *name, struct ao2_container *self, ao2_prnt_obj_fn *prnt_obj)
Register a container for CLI stats and integrity check.
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
Definition: astobj2.c:476
#define ao2_global_obj_release(holder)
Release the ao2 object held in the global holder.
Definition: astobj2.h:859
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition: astobj2.h:480
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
#define ao2_t_alloc(data_size, destructor_fn, debug_msg)
Definition: astobj2.h:407
@ OBJ_SEARCH_PARTIAL_KEY
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
Definition: astobj2.h:1116
@ OBJ_SEARCH_OBJECT
The arg parameter is an object of the same type.
Definition: astobj2.h:1087
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
Definition: astobj2.h:1063
@ OBJ_NODATA
Definition: astobj2.h:1044
@ OBJ_SEARCH_MASK
Search option field mask.
Definition: astobj2.h:1072
@ OBJ_UNLINK
Definition: astobj2.h:1039
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1101
#define ao2_container_alloc_list(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a list container.
Definition: astobj2.h:1327
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:409
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition: astobj2.h:1303
void() ao2_prnt_fn(void *where, const char *fmt,...)
Print output.
Definition: astobj2.h:1435
#define ao2_t_weakproxy_set_object(weakproxy, obj, flags, tag)
Definition: astobj2.h:582
static PGresult * result
Definition: cel_pgsql.c:84
static struct console_pvt globals
static const char type[]
Definition: chan_ooh323.c:109
struct stasis_topic * ast_channel_topic(struct ast_channel *chan)
A topic which publishes the events for a particular channel.
static struct ast_channel * callback(struct ast_channelstorage_instance *driver, ao2_callback_data_fn *cb_fn, void *arg, void *data, int ao2_flags)
Standard Command Line Interface.
#define CLI_SHOWUSAGE
Definition: cli.h:45
#define CLI_SUCCESS
Definition: cli.h:44
int ast_cli_unregister_multiple(struct ast_cli_entry *e, int len)
Unregister multiple commands.
Definition: clicompat.c:30
#define AST_CLI_DEFINE(fn, txt,...)
Definition: cli.h:197
int ast_cli_completion_add(char *value)
Add a result to a request for completion options.
Definition: main/cli.c:2737
void ast_cli(int fd, const char *fmt,...)
Definition: clicompat.c:6
@ CLI_INIT
Definition: cli.h:152
@ CLI_GENERATE
Definition: cli.h:153
#define CLI_FAILURE
Definition: cli.h:46
#define ast_cli_register_multiple(e, len)
Register multiple commands.
Definition: cli.h:265
short word
Configuration option-handling.
@ ACO_EXACT
int aco_set_defaults(struct aco_type *type, const char *category, void *obj)
Set all default options of obj.
void aco_info_destroy(struct aco_info *info)
Destroy an initialized aco_info struct.
@ ACO_PROCESS_ERROR
Their was an error and no changes were applied.
int aco_info_init(struct aco_info *info)
Initialize an aco_info structure.
#define FLDSET(type,...)
Convert a struct and list of fields to an argument list of field offsets.
#define aco_option_register(info, name, matchtype, types, default_val, opt_type, flags,...)
Register a config option.
#define ACO_FILES(...)
@ OPT_INT_T
Type for default option handler for signed integers.
#define aco_option_register_custom(info, name, matchtype, types, default_val, handler, flags)
Register a config option.
@ ACO_GLOBAL
@ ACO_WHITELIST_EXACT
enum aco_process_status aco_process_config(struct aco_info *info, int reload)
Process a config info via the options registered with an aco_info.
#define ACO_TYPES(...)
A helper macro to ensure that aco_info types always have a sentinel.
static const char name[]
Definition: format_mp3.c:68
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
Definition: func_strings.c:899
struct ast_str * ast_manager_str_from_json_object(struct ast_json *blob, key_exclusion_cb exclusion_cb)
Convert a JSON object into an AMI compatible string.
Definition: manager.c:551
void ast_multi_object_blob_add(struct ast_multi_object_blob *multi, enum stasis_user_multi_object_snapshot_type type, void *object)
Add an object (snapshot) to the blob.
Definition: stasis.c:2068
stasis_user_multi_object_snapshot_type
Object type code for multi user object snapshots.
Definition: stasis.h:1353
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
struct ast_channel_snapshot * ast_channel_snapshot_create(struct ast_channel *chan)
Generate a snapshot of the channel state. This is an ao2 object, so ao2_cleanup() to deallocate.
void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob)
Publish single channel user event (for app_userevent compatibility)
Definition: stasis.c:2077
struct ast_multi_object_blob * ast_multi_object_blob_create(struct ast_json *blob)
Create a stasis user event multi object blob.
Definition: stasis.c:2042
struct stasis_message_type * ast_multi_user_event_type(void)
Message type for custom user defined events with multi object blobs.
#define STASIS_UMOS_MAX
Number of snapshot types.
Definition: stasis.h:1360
@ STASIS_UMOS_ENDPOINT
Definition: stasis.h:1356
@ STASIS_UMOS_BRIDGE
Definition: stasis.h:1355
@ STASIS_UMOS_CHANNEL
Definition: stasis.h:1354
#define ast_debug(level,...)
Log a DEBUG message.
#define LOG_ERROR
#define LOG_NOTICE
struct ast_json * ast_json_string_create(const char *value)
Construct a JSON string from value.
Definition: json.c:278
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition: json.c:73
struct ast_json * ast_json_object_create(void)
Create a new JSON object.
Definition: json.c:399
struct ast_json * ast_json_timeval(const struct timeval tv, const char *zone)
Construct a timeval as JSON.
Definition: json.c:670
struct ast_json * ast_json_ref(struct ast_json *value)
Increase refcount on value.
Definition: json.c:67
int ast_json_object_set(struct ast_json *object, const char *key, struct ast_json *value)
Set a field in a JSON object.
Definition: json.c:414
const char * ast_json_string_get(const struct ast_json *string)
Get the value of a JSON string.
Definition: json.c:283
struct ast_json * ast_json_object_get(struct ast_json *object, const char *key)
Get a field from a JSON object.
Definition: json.c:407
#define ast_cond_destroy(cond)
Definition: lock.h:209
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:611
#define ast_cond_wait(cond, mutex)
Definition: lock.h:212
#define ast_cond_init(cond, attr)
Definition: lock.h:208
#define ast_mutex_init(pmutex)
Definition: lock.h:193
#define ast_mutex_unlock(a)
Definition: lock.h:197
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition: lock.h:764
pthread_cond_t ast_cond_t
Definition: lock.h:185
#define ast_mutex_destroy(a)
Definition: lock.h:195
#define ast_mutex_lock(a)
Definition: lock.h:196
#define AST_MUTEX_DEFINE_STATIC(mutex)
Definition: lock.h:527
#define ast_cond_signal(cond)
Definition: lock.h:210
struct ast_str * ast_manager_build_bridge_state_string_prefix(const struct ast_bridge_snapshot *snapshot, const char *prefix)
Generate the AMI message body from a bridge snapshot.
struct ast_manager_event_blob * ast_manager_event_blob_create(int event_flags, const char *manager_event, const char *extra_fields_fmt,...)
Construct a ast_manager_event_blob.
Definition: manager.c:10144
#define EVENT_FLAG_USER
Definition: manager.h:81
struct ast_str * ast_manager_build_channel_state_string_prefix(const struct ast_channel_snapshot *snapshot, const char *prefix)
Generate the AMI message body from a channel snapshot.
struct stasis_forward * sub
Definition: res_corosync.c:240
struct ao2_container * container
Definition: res_fax.c:531
static void to_ami(struct ast_sip_subscription *sub, struct ast_str **buf)
#define NULL
Definition: resample.c:96
static struct ast_str * multi_object_blob_to_ami(void *obj)
Definition: stasis.c:2163
int stasis_subscription_is_done(struct stasis_subscription *subscription)
Returns whether subscription has received its final message.
Definition: stasis.c:1185
static char * topic_complete_name(const char *word)
Definition: stasis.c:2468
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *sub)
Cancel a subscription.
Definition: stasis.c:1038
static struct ast_cli_entry cli_stasis[]
Definition: stasis.c:2553
#define INITIAL_SUBSCRIBERS_MAX
Definition: stasis.c:368
static struct ast_json * multi_user_event_to_json(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
Definition: stasis.c:2112
static void subscription_change_dtor(void *obj)
Definition: stasis.c:1687
void stasis_topic_pool_delete_topic(struct stasis_topic_pool *pool, const char *topic_name)
Delete a topic from the topic pool.
Definition: stasis.c:1931
static struct ast_manager_event_blob * multi_user_event_to_ami(struct stasis_message *message)
Definition: stasis.c:2222
int stasis_subscription_decline_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are not interested in a message type.
Definition: stasis.c:1120
#define FMT_HEADERS
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:694
static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
Definition: stasis.c:1353
#define FMT_FIELDS
int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription, long low_water, long high_water)
Set the high and low alert water marks of the stasis subscription.
Definition: stasis.c:1078
struct stasis_subscription * internal_stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_taskpool, const char *file, int lineno, const char *func)
Create a subscription.
Definition: stasis.c:923
AO2_STRING_FIELD_HASH_FN(topic_proxy, name)
struct stasis_topic * stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
Find or create a topic in the pool.
Definition: stasis.c:1951
static int sub_cleanup(void *data)
Definition: stasis.c:1031
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
Definition: stasis.c:1615
static void forward_dtor(void *obj)
Definition: stasis.c:1605
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition: stasis.c:684
size_t stasis_topic_subscribers(const struct stasis_topic *topic)
Return the number of subscribers of a topic.
Definition: stasis.c:710
static void topic_pool_dtor(void *obj)
Definition: stasis.c:1811
void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
Publish a message to a topic's subscribers, synchronizing on the specified subscriber.
Definition: stasis.c:1583
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
Definition: stasis.c:1090
static void * stasis_config_alloc(void)
Definition: stasis.c:2330
static struct topic_pool_entry * topic_pool_entry_alloc(const char *topic_name)
Definition: stasis.c:1791
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
Definition: stasis.c:1144
static void topic_dtor(void *obj)
Definition: stasis.c:500
#define TOPIC_POOL_BUCKETS
Definition: stasis.c:371
static struct stasis_subscription_change * subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
Definition: stasis.c:1694
static void stasis_cleanup(void)
Cleanup function for graceful shutdowns.
Definition: stasis.c:3119
struct stasis_topic * stasis_topic_get(const char *name)
Get a topic of the given name.
Definition: stasis.c:689
int stasis_init(void)
Initialize the Stasis subsystem.
Definition: stasis.c:3138
static void proxy_dtor(void *weakproxy, void *container)
Definition: stasis.c:479
void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Stasis subscription callback function that does nothing.
Definition: stasis.c:876
struct stasis_subscription * __stasis_subscribe_pool(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription whose callbacks occur on a thread pool.
Definition: stasis.c:1020
static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
Definition: stasis.c:1851
static void stasis_config_destructor(void *obj)
Definition: stasis.c:2322
void stasis_subscription_join(struct stasis_subscription *subscription)
Block until the last message is processed on a subscription.
Definition: stasis.c:1172
static AO2_GLOBAL_OBJ_STATIC(globals)
A global object container that will contain the stasis_config that gets swapped out on reloads.
int stasis_message_type_declined(const char *name)
Check whether a message type is declined.
Definition: stasis.c:2360
static int topic_pool_entry_hash(const void *obj, const int flags)
Definition: stasis.c:1830
CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,.files=ACO_FILES(&stasis_conf),)
Register information about the configs being processed by this module.
void stasis_log_bad_type_access(const char *name)
Definition: stasis.c:2007
static struct aco_type threadpool_option
Definition: stasis.c:2271
static void multi_object_blob_dtor(void *obj)
Definition: stasis.c:2026
static int link_topic_proxy(struct stasis_topic *topic, const char *name, const char *detail)
Definition: stasis.c:567
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Add a subscriber to a topic.
Definition: stasis.c:1268
static struct aco_type taskpool_option
Definition: stasis.c:2279
static void subscription_dtor(void *obj)
Definition: stasis.c:781
struct stasis_topic * stasis_topic_create_with_detail(const char *name, const char *detail)
Create a new topic with given detail.
Definition: stasis.c:635
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
Definition: stasis.c:1241
const char * stasis_topic_detail(const struct stasis_topic *topic)
Return the detail of a topic.
Definition: stasis.c:702
struct stasis_subscription * stasis_unsubscribe_and_join(struct stasis_subscription *subscription)
Cancel a subscription, blocking until the last message is processed.
Definition: stasis.c:1201
static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition: stasis.c:1743
int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name)
Check if a topic exists in a pool.
Definition: stasis.c:1994
static void publish_msg(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sync_sub)
Definition: stasis.c:1497
static struct ast_taskpool * taskpool
Definition: stasis.c:374
static struct aco_type * taskpool_options[]
Definition: stasis.c:2287
struct aco_file stasis_conf
Definition: stasis.c:2300
static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition: stasis.c:1715
static void stasis_declined_config_destructor(void *obj)
Definition: stasis.c:2315
static unsigned int dispatch_message(struct stasis_subscription *sub, struct stasis_message *message, int synchronous)
Definition: stasis.c:1379
void stasis_subscription_accept_formatters(struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters)
Indicate to a subscription that we are interested in messages with one or more formatters.
Definition: stasis.c:1160
const char * stasis_subscription_uniqueid(const struct stasis_subscription *sub)
Get the unique ID for the subscription.
Definition: stasis.c:1236
struct stasis_topic_pool * stasis_topic_pool_create(struct stasis_topic *pooled_topic)
Create a topic pool that routes messages from dynamically generated topics to the given topic.
Definition: stasis.c:1900
static void subscription_invoke(struct stasis_subscription *sub, struct stasis_message *message)
Invoke the subscription's callback.
Definition: stasis.c:821
AO2_STRING_FIELD_CASE_SORT_FN(topic_proxy, name)
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition: stasis.c:1645
struct stasis_subscription * __stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription.
Definition: stasis.c:1009
static void topic_pool_entry_dtor(void *obj)
Definition: stasis.c:1782
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
Definition: stasis.c:1578
static int dispatch_exec_async(struct ast_taskprocessor_local *local)
Definition: stasis.c:1326
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
Returns whether a subscription is currently subscribed.
Definition: stasis.c:1217
static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
Definition: stasis.c:2381
AO2_STRING_FIELD_CMP_FN(topic_proxy, name)
static int userevent_exclusion_cb(const char *key)
Definition: stasis.c:2214
struct aco_type * declined_options[]
Definition: stasis.c:2298
#define topic_lock_both(topic1, topic2)
Lock two topics.
Definition: stasis.c:492
struct ao2_container * topic_all
Definition: stasis.c:462
static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition: stasis.c:1296
static char * stasis_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
Definition: stasis.c:2411
#define TOPIC_ALL_BUCKETS
Definition: stasis.c:384
STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type)
static struct aco_type declined_option
An aco_type structure to link the "declined_message_types" category to the stasis_declined_config typ...
Definition: stasis.c:2290
static char * stasis_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
Definition: stasis.c:2494
Stasis Message Bus API. See Stasis Message Bus API for detailed documentation.
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
void(* stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Callback function type for Stasis subscriptions.
Definition: stasis.h:611
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
Definition: stasis.h:1515
stasis_subscription_message_filter
Stasis subscription message filters.
Definition: stasis.h:294
@ STASIS_SUBSCRIPTION_FILTER_SELECTIVE
Definition: stasis.h:297
@ STASIS_SUBSCRIPTION_FILTER_FORCED_NONE
Definition: stasis.h:296
@ STASIS_SUBSCRIPTION_FILTER_NONE
Definition: stasis.h:295
stasis_subscription_message_formatters
Stasis subscription formatter filters.
Definition: stasis.h:308
@ STASIS_SUBSCRIPTION_FORMATTER_NONE
Definition: stasis.h:309
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
Definition: stasis.h:1493
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
int stasis_cache_init(void)
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
int stasis_message_type_id(const struct stasis_message_type *type)
Gets the id of a given message type.
enum stasis_subscription_message_formatters stasis_message_type_available_formatters(const struct stasis_message_type *message_type)
Get a bitmap of available formatters for a message type.
const struct timeval * stasis_message_timestamp(const struct stasis_message *msg)
Get the time when a message was created.
struct ast_json * ast_bridge_snapshot_to_json(const struct ast_bridge_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_bridge_snapshot.
struct ast_json * ast_channel_snapshot_to_json(const struct ast_channel_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_channel_snapshot.
Endpoint abstractions.
struct ast_json * ast_endpoint_snapshot_to_json(const struct ast_endpoint_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_endpoint_snapshot.
Internal Stasis APIs.
static int message_type_id
int ast_str_append(struct ast_str **buf, ssize_t max_len, const char *fmt,...)
Append to a thread local dynamic string.
Definition: strings.h:1139
char * ast_str_buffer(const struct ast_str *buf)
Returns the string buffer within the ast_str buf.
Definition: strings.h:761
#define S_OR(a, b)
returns the equivalent of logic or for strings: first one if not empty, otherwise second one.
Definition: strings.h:80
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition: strings.h:65
#define ast_str_container_alloc(buckets)
Allocates a hash container for bare strings.
Definition: strings.h:1365
#define ast_str_create(init_len)
Create a malloc'ed dynamic length string.
Definition: strings.h:659
static force_inline int attribute_pure ast_str_case_hash(const char *str)
Compute a hash value on a case-insensitive string.
Definition: strings.h:1303
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
Definition: strings.h:425
void ast_str_container_remove(struct ao2_container *str_container, const char *remove)
Removes a string from a string container allocated by ast_str_container_alloc.
Definition: strings.c:221
int ast_str_container_add(struct ao2_container *str_container, const char *add)
Adds a string to a string container allocated by ast_str_container_alloc.
Definition: strings.c:205
The representation of a single configuration file to be processed.
const char * filename
Type information about a category-level configurable object.
enum aco_type_t type
Generic container type.
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
Definition: astobj2.h:1821
Structure representing a snapshot of channel state.
Main Channel structure associated with a channel.
descriptor for a cli entry.
Definition: cli.h:171
int args
This gets set in ast_cli_register()
Definition: cli.h:185
char * command
Definition: cli.h:186
const char * usage
Definition: cli.h:177
Abstract JSON element (object, array, string, int, ...).
Struct containing info for an AMI event to send out.
Definition: manager.h:503
A multi object blob data structure to carry user event stasis messages.
Definition: stasis.c:2017
struct ast_multi_object_blob::@402 snapshots[STASIS_UMOS_MAX]
struct ast_json * blob
Definition: stasis.c:2018
Structure for mutex and tracking information.
Definition: lock.h:142
Support for dynamic strings.
Definition: strings.h:623
int idle_timeout
Time limit in seconds for idle dynamic taskprocessors.
Definition: taskpool.h:81
int max_size
Maximum number of taskprocessors a pool may have.
Definition: taskpool.h:115
int auto_increment
Number of taskprocessors to increment the pool by.
Definition: taskpool.h:85
int minimum_size
Number of taskprocessors that will always exist.
Definition: taskpool.h:92
int initial_size
Number of taskprocessors the pool will start with.
Definition: taskpool.h:102
An opaque taskpool structure.
Definition: taskpool.c:62
Local data parameter.
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
Structure for variables, used for configurations and for channel variables.
struct stasis_declined_config * declined_message_types
Definition: stasis.c:2268
struct stasis_taskpool_conf * taskpool_options
Definition: stasis.c:2266
A structure to hold global configuration-related options.
Definition: stasis.c:2247
struct ao2_container * declined
Definition: stasis.c:2249
Forwarding information.
Definition: stasis.c:1598
struct stasis_topic * from_topic
Definition: stasis.c:1600
struct stasis_topic * to_topic
Definition: stasis.c:1602
Structure containing callbacks for Stasis message sanitization.
Definition: stasis.h:200
Holds details about changes to subscriptions for the specified topic.
Definition: stasis.h:890
struct stasis_topic * topic
Definition: stasis.h:891
int final_message_rxed
Definition: stasis.c:763
struct stasis_topic * topic
Definition: stasis.c:751
ast_cond_t join_cond
Definition: stasis.c:760
stasis_subscription_cb callback
Definition: stasis.c:755
struct ast_taskprocessor * mailbox
Definition: stasis.c:753
enum stasis_subscription_message_filter filter
Definition: stasis.c:773
int final_message_processed
Definition: stasis.c:766
enum stasis_subscription_message_formatters accepted_formatters
Definition: stasis.c:771
struct stasis_subscription::@401 accepted_message_types
Taskpool configuration options.
Definition: stasis.c:2253
struct ao2_container * pool_container
Definition: stasis.c:1807
struct stasis_topic * pool_topic
Definition: stasis.c:1808
char * name
Definition: stasis.c:453
int subscriber_id
Definition: stasis.c:450
char * detail
Definition: stasis.c:456
struct stasis_topic::@400 upstream_topics
struct stasis_topic::@399 subscribers
struct timeval * creationtime
Definition: stasis.c:459
ast_cond_t cond
Definition: stasis.c:1343
void * task_data
Definition: stasis.c:1345
ast_mutex_t lock
Definition: stasis.c:1342
Definition: stasis.c:1776
struct stasis_topic * topic
Definition: stasis.c:1778
struct stasis_forward * forward
Definition: stasis.c:1777
char name[0]
Definition: stasis.c:1779
char buf[0]
Definition: stasis.c:472
struct timeval creationtime
Definition: stasis.c:470
char * name
Definition: stasis.c:467
char * detail
Definition: stasis.c:468
#define AST_TASKPOOL_OPTIONS_VERSION
Definition: taskpool.h:69
struct ast_taskprocessor * ast_taskpool_serializer(const char *name, struct ast_taskpool *pool)
Serialized execution of tasks within a ast_taskpool.
Definition: taskpool.c:819
void ast_taskpool_shutdown(struct ast_taskpool *pool)
Shut down a taskpool and remove the underlying taskprocessors.
Definition: taskpool.c:653
struct ast_taskpool * ast_taskpool_create(const char *name, const struct ast_taskpool_options *options)
Create a new taskpool.
Definition: taskpool.c:324
An API for managing task processing threads that can be shared across modules.
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
Sets the local data associated with a taskprocessor.
@ TPS_REF_DEFAULT
return a reference to a taskprocessor, create one if it does not exist
Definition: taskprocessor.h:76
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int(*task_exe)(struct ast_taskprocessor_local *local), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format,...)
Build a taskprocessor name with a sequence number on the end.
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.
#define AST_TASKPROCESSOR_MAX_NAME
Suggested maximum taskprocessor name length (less null terminator).
Definition: taskprocessor.h:61
static struct test_val a
void ast_format_duration_hh_mm_ss(int duration, char *buf, size_t length)
Formats a duration into HH:MM:SS.
Definition: utils.c:2333
int64_t ast_tvdiff_ms(struct timeval end, struct timeval start)
Computes the difference (in milliseconds) between two struct timeval instances.
Definition: time.h:107
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:159
FILE * out
Definition: utils/frame.c:33
static void statistics(void)
Definition: utils/frame.c:287
Utility functions.
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:978
#define ast_assert(a)
Definition: utils.h:776
#define ARRAY_LEN(a)
Definition: utils.h:703
Universally unique identifier support.
Vector container support.
#define AST_VECTOR_REPLACE(vec, idx, elem)
Replace an element at a specific position in a vector, growing the vector if needed.
Definition: vector.h:295
#define AST_VECTOR_ELEM_CLEANUP_NOOP(elem)
Vector element cleanup that does nothing.
Definition: vector.h:582
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:620
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition: vector.h:185
#define AST_VECTOR_REMOVE_ELEM_UNORDERED(vec, elem, cleanup)
Remove an element from a vector.
Definition: vector.h:594
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:124
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:267
#define AST_VECTOR(name, type)
Define a vector structure.
Definition: vector.h:44
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:691
#define AST_VECTOR_GET_ADDR(vec, idx)
Get an address of element in a vector.
Definition: vector.h:679