Asterisk - The Open Source Telephony Project GIT-master-6144b6b
Loading...
Searching...
No Matches
stasis.c
Go to the documentation of this file.
1/*
2 * Asterisk -- An open source telephony toolkit.
3 *
4 * Copyright (C) 2013, Digium, Inc.
5 *
6 * David M. Lee, II <dlee@digium.com>
7 *
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
13 *
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
17 */
18
19/*! \file
20 *
21 * \brief Stasis Message Bus API.
22 *
23 * \author David M. Lee, II <dlee@digium.com>
24 */
25
26/*** MODULEINFO
27 <support_level>core</support_level>
28 ***/
29
30#include "asterisk.h"
31
32#include "asterisk/astobj2.h"
34#include "asterisk/stasis.h"
36#include "asterisk/taskpool.h"
37#include "asterisk/utils.h"
38#include "asterisk/uuid.h"
39#include "asterisk/vector.h"
44#include "asterisk/cli.h"
45
46/*** DOCUMENTATION
47 <managerEvent language="en_US" name="UserEvent">
48 <managerEventInstance class="EVENT_FLAG_USER">
49 <since>
50 <version>12.3.0</version>
51 </since>
52 <synopsis>A user defined event raised from the dialplan.</synopsis>
53 <syntax>
54 <channel_snapshot/>
55 <parameter name="UserEvent">
56 <para>The event name, as specified in the dialplan.</para>
57 </parameter>
58 </syntax>
59 <description>
60 <para>Event may contain additional arbitrary parameters in addition to optional bridge and endpoint snapshots. Multiple snapshots of the same type are prefixed with a numeric value.</para>
61 </description>
62 <see-also>
63 <ref type="application">UserEvent</ref>
64 <ref type="managerEvent">UserEvent</ref>
65 </see-also>
66 </managerEventInstance>
67 </managerEvent>
68 <configInfo name="stasis" language="en_US">
69 <configFile name="stasis.conf">
70 <configObject name="threadpool">
71 <since>
72 <version>12.8.0</version>
73 <version>13.1.0</version>
74 </since>
75 <synopsis>Settings that configure the threadpool Stasis uses to deliver some messages.</synopsis>
76 <configOption name="initial_size" default="5">
77 <since>
78 <version>12.8.0</version>
79 <version>13.1.0</version>
80 </since>
81 <synopsis>Initial number of threads in the message bus threadpool.</synopsis>
82 </configOption>
83 <configOption name="idle_timeout_sec" default="20">
84 <since>
85 <version>12.8.0</version>
86 <version>13.1.0</version>
87 </since>
88 <synopsis>Number of seconds before an idle thread is disposed of.</synopsis>
89 </configOption>
90 <configOption name="max_size" default="50">
91 <since>
92 <version>12.8.0</version>
93 <version>13.1.0</version>
94 </since>
95 <synopsis>Maximum number of threads in the threadpool.</synopsis>
96 </configOption>
97 </configObject>
98 <configObject name="taskpool">
99 <since>
100 <version>23.1.0</version>
101 <version>22.7.0</version>
102 <version>20.17.0</version>
103 </since>
104 <synopsis>Settings that configure the taskpool Stasis uses to deliver some messages.</synopsis>
105 <configOption name="minimum_size" default="5">
106 <since>
107 <version>23.1.0</version>
108 <version>22.7.0</version>
109 <version>20.17.0</version>
110 </since>
111 <synopsis>Minimum number of taskprocessors in the message bus taskpool.</synopsis>
112 </configOption>
113 <configOption name="initial_size" default="5">
114 <since>
115 <version>23.1.0</version>
116 <version>22.7.0</version>
117 <version>20.17.0</version>
118 </since>
119 <synopsis>Initial number of taskprocessors in the message bus taskpool.</synopsis>
120 </configOption>
121 <configOption name="idle_timeout_sec" default="20">
122 <since>
123 <version>23.1.0</version>
124 <version>22.7.0</version>
125 <version>20.17.0</version>
126 </since>
127 <synopsis>Number of seconds before an idle taskprocessor is disposed of.</synopsis>
128 </configOption>
129 <configOption name="max_size" default="50">
130 <since>
131 <version>23.1.0</version>
132 <version>22.7.0</version>
133 <version>20.17.0</version>
134 </since>
135 <synopsis>Maximum number of taskprocessors in the taskpool.</synopsis>
136 </configOption>
137 </configObject>
138 <configObject name="declined_message_types">
139 <since>
140 <version>13.0.0</version>
141 </since>
142 <synopsis>Stasis message types for which to decline creation.</synopsis>
143 <configOption name="decline">
144 <since>
145 <version>12.8.0</version>
146 <version>13.1.0</version>
147 </since>
148 <synopsis>The message type to decline.</synopsis>
149 <description>
150 <para>This configuration option defines the name of the Stasis
151 message type that Asterisk is forbidden from creating and can be
152 specified as many times as necessary to achieve the desired result.</para>
153 <enumlist>
154 <enum name="stasis_app_recording_snapshot_type" />
155 <enum name="stasis_app_playback_snapshot_type" />
156 <enum name="stasis_test_message_type" />
157 <enum name="confbridge_start_type" />
158 <enum name="confbridge_end_type" />
159 <enum name="confbridge_join_type" />
160 <enum name="confbridge_leave_type" />
161 <enum name="confbridge_start_record_type" />
162 <enum name="confbridge_stop_record_type" />
163 <enum name="confbridge_mute_type" />
164 <enum name="confbridge_unmute_type" />
165 <enum name="confbridge_talking_type" />
166 <enum name="cel_generic_type" />
167 <enum name="ast_bridge_snapshot_type" />
168 <enum name="ast_bridge_merge_message_type" />
169 <enum name="ast_channel_entered_bridge_type" />
170 <enum name="ast_channel_left_bridge_type" />
171 <enum name="ast_blind_transfer_type" />
172 <enum name="ast_attended_transfer_type" />
173 <enum name="ast_endpoint_snapshot_type" />
174 <enum name="ast_endpoint_state_type" />
175 <enum name="ast_device_state_message_type" />
176 <enum name="ast_test_suite_message_type" />
177 <enum name="ast_mwi_state_type" />
178 <enum name="ast_mwi_vm_app_type" />
179 <enum name="ast_format_register_type" />
180 <enum name="ast_format_unregister_type" />
181 <enum name="ast_manager_get_generic_type" />
182 <enum name="ast_parked_call_type" />
183 <enum name="ast_channel_snapshot_type" />
184 <enum name="ast_channel_dial_type" />
185 <enum name="ast_channel_varset_type" />
186 <enum name="ast_channel_hangup_request_type" />
187 <enum name="ast_channel_dtmf_begin_type" />
188 <enum name="ast_channel_dtmf_end_type" />
189 <enum name="ast_channel_flash_type" />
190 <enum name="ast_channel_wink_type" />
191 <enum name="ast_channel_hold_type" />
192 <enum name="ast_channel_unhold_type" />
193 <enum name="ast_channel_chanspy_start_type" />
194 <enum name="ast_channel_chanspy_stop_type" />
195 <enum name="ast_channel_fax_type" />
196 <enum name="ast_channel_hangup_handler_type" />
197 <enum name="ast_channel_moh_start_type" />
198 <enum name="ast_channel_moh_stop_type" />
199 <enum name="ast_channel_mixmonitor_start_type" />
200 <enum name="ast_channel_mixmonitor_stop_type" />
201 <enum name="ast_channel_mixmonitor_mute_type" />
202 <enum name="ast_channel_agent_login_type" />
203 <enum name="ast_channel_agent_logoff_type" />
204 <enum name="ast_channel_talking_start" />
205 <enum name="ast_channel_talking_stop" />
206 <enum name="ast_channel_tone_detect" />
207 <enum name="ast_security_event_type" />
208 <enum name="ast_named_acl_change_type" />
209 <enum name="ast_local_bridge_type" />
210 <enum name="ast_local_optimization_begin_type" />
211 <enum name="ast_local_optimization_end_type" />
212 <enum name="stasis_subscription_change_type" />
213 <enum name="ast_multi_user_event_type" />
214 <enum name="stasis_cache_clear_type" />
215 <enum name="stasis_cache_update_type" />
216 <enum name="ast_network_change_type" />
217 <enum name="ast_system_registry_type" />
218 <enum name="ast_cc_available_type" />
219 <enum name="ast_cc_offertimerstart_type" />
220 <enum name="ast_cc_requested_type" />
221 <enum name="ast_cc_requestacknowledged_type" />
222 <enum name="ast_cc_callerstopmonitoring_type" />
223 <enum name="ast_cc_callerstartmonitoring_type" />
224 <enum name="ast_cc_callerrecalling_type" />
225 <enum name="ast_cc_recallcomplete_type" />
226 <enum name="ast_cc_failure_type" />
227 <enum name="ast_cc_monitorfailed_type" />
228 <enum name="ast_presence_state_message_type" />
229 <enum name="ast_rtp_rtcp_sent_type" />
230 <enum name="ast_rtp_rtcp_received_type" />
231 <enum name="ast_call_pickup_type" />
232 <enum name="aoc_s_type" />
233 <enum name="aoc_d_type" />
234 <enum name="aoc_e_type" />
235 <enum name="dahdichannel_type" />
236 <enum name="mcid_type" />
237 <enum name="session_timeout_type" />
238 <enum name="cdr_read_message_type" />
239 <enum name="cdr_write_message_type" />
240 <enum name="cdr_prop_write_message_type" />
241 <enum name="corosync_ping_message_type" />
242 <enum name="agi_exec_start_type" />
243 <enum name="agi_exec_end_type" />
244 <enum name="agi_async_start_type" />
245 <enum name="agi_async_exec_type" />
246 <enum name="agi_async_end_type" />
247 <enum name="queue_caller_join_type" />
248 <enum name="queue_caller_leave_type" />
249 <enum name="queue_caller_abandon_type" />
250 <enum name="queue_member_status_type" />
251 <enum name="queue_member_added_type" />
252 <enum name="queue_member_removed_type" />
253 <enum name="queue_member_pause_type" />
254 <enum name="queue_member_penalty_type" />
255 <enum name="queue_member_ringinuse_type" />
256 <enum name="queue_agent_called_type" />
257 <enum name="queue_agent_connect_type" />
258 <enum name="queue_agent_complete_type" />
259 <enum name="queue_agent_dump_type" />
260 <enum name="queue_agent_ringnoanswer_type" />
261 <enum name="meetme_join_type" />
262 <enum name="meetme_leave_type" />
263 <enum name="meetme_end_type" />
264 <enum name="meetme_mute_type" />
265 <enum name="meetme_talking_type" />
266 <enum name="meetme_talk_request_type" />
267 <enum name="appcdr_message_type" />
268 <enum name="forkcdr_message_type" />
269 <enum name="cdr_sync_message_type" />
270 </enumlist>
271 </description>
272 </configOption>
273 </configObject>
274 </configFile>
275 </configInfo>
276***/
277
278/*!
279 * \page stasis-impl Stasis Implementation Notes
280 *
281 * \par Reference counting
282 *
283 * Stasis introduces a number of objects, which are tightly related to one
284 * another. Because we rely on ref-counting for memory management, understanding
285 * these relationships is important to understanding this code.
286 *
287 * \code{.txt}
288 *
289 * stasis_topic <----> stasis_subscription
290 * ^ ^
291 * \ /
292 * \ /
293 * dispatch
294 * |
295 * |
296 * v
297 * stasis_message
298 * |
299 * |
300 * v
301 * stasis_message_type
302 *
303 * \endcode
304 *
305 * The most troubling thing in this chart is the cyclic reference between
306 * stasis_topic and stasis_subscription. This is both unfortunate, and
307 * necessary. Topics need the subscription in order to dispatch messages;
308 * subscriptions need the topics to unsubscribe and check subscription status.
309 *
310 * The cycle is broken by stasis_unsubscribe(). The unsubscribe will remove the
311 * topic's reference to a subscription. When the subscription is destroyed, it
312 * will remove its reference to the topic.
313 *
314 * This means that until a subscription has be explicitly unsubscribed, it will
315 * not be destroyed. Neither will a topic be destroyed while it has subscribers.
316 * The destructors of both have assertions regarding this to catch ref-counting
317 * problems where a subscription or topic has had an extra ao2_cleanup().
318 *
319 * The \ref dispatch_exec_sync object is a transient object, which is posted to
320 * a subscription's taskprocessor to send a message to the subscriber. They have
321 * short life cycles, allocated on one thread, destroyed on another.
322 *
323 * During shutdown, or the deletion of a domain object, there are a flurry of
324 * ao2_cleanup()s on subscriptions and topics, as the final in-flight messages
325 * are processed. Any one of these cleanups could be the one to actually destroy
326 * a given object, so care must be taken to ensure that an object isn't
327 * referenced after an ao2_cleanup(). This includes the implicit ao2_unlock()
328 * that might happen when a RAII_VAR() goes out of scope.
329 *
330 * \par Typical life cycles
331 *
332 * \li stasis_topic - There are several topics which live for the duration of
333 * the Asterisk process (ast_channel_topic_all(), etc.) but most of these
334 * are actually fed by shorter-lived topics whose lifetime is associated
335 * with some domain object (like ast_channel_topic() for a given
336 * ast_channel).
337 *
338 * \li stasis_subscription - Subscriptions have a similar mix of lifetimes as
339 * topics, for similar reasons.
340 *
341 * \li dispatch - Very short lived; just long enough to post a message to a
342 * subscriber.
343 *
344 * \li stasis_message - Short to intermediate lifetimes, but that is mostly
345 * irrelevant. Messages are strictly data and have no behavior associated
346 * with them, so it doesn't really matter if/when they are destroyed. By
347 * design, a component could hold a ref to a message forever without any
348 * ill consequences (aside from consuming more memory).
349 *
350 * \li stasis_message_type - Long life cycles, typically only destroyed on
351 * module unloading or _clean_ process exit.
352 *
353 * \par Subscriber shutdown sequencing
354 *
355 * Subscribers are sensitive to shutdown sequencing, specifically in how the
356 * reference message types. This is fully detailed in the documentation at
357 * https://docs.asterisk.org/Development/Roadmap/Asterisk-12-Projects/Asterisk-12-API-Improvements/Stasis-Message-Bus/Using-the-Stasis-Message-Bus/Stasis-Subscriber-Shutdown-Problem/.
358 *
359 * In short, the lifetime of the \a data (and \a callback, if in a module) must
360 * be held until the stasis_subscription_final_message() has been received.
361 * Depending on the structure of the subscriber code, this can be handled by
362 * using stasis_subscription_final_message() to free resources on the final
363 * message, or using stasis_subscription_join()/stasis_unsubscribe_and_join() to
364 * block until the unsubscribe has completed.
365 */
366
367/*! Initial size of the subscribers list. */
368#define INITIAL_SUBSCRIBERS_MAX 4
369
370/*! The number of buckets to use for topic pools */
371#define TOPIC_POOL_BUCKETS 57
372
373/*! Taskpool for topics that don't want a dedicated taskprocessor */
374static struct ast_taskpool *taskpool;
375
377
378#if defined(LOW_MEMORY)
379
380#define TOPIC_ALL_BUCKETS 257
381
382#else
383
384#define TOPIC_ALL_BUCKETS 997
385
386#endif
387
388#ifdef AST_DEVMODE
389
390/*! The number of buckets to use for topic statistics */
391#define TOPIC_STATISTICS_BUCKETS 57
392
393/*! The number of buckets to use for subscription statistics */
394#define SUBSCRIPTION_STATISTICS_BUCKETS 57
395
396/*! Global container which stores statistics for topics */
397static AO2_GLOBAL_OBJ_STATIC(topic_statistics);
398
399/*! Global container which stores statistics for subscriptions */
400static AO2_GLOBAL_OBJ_STATIC(subscription_statistics);
401
402/*! \internal */
403struct stasis_message_type_statistics {
404 /*! \brief The number of messages of this published */
405 int published;
406 /*! \brief The number of messages of this that did not reach a subscriber */
407 int unused;
408 /*! \brief The stasis message type */
409 struct stasis_message_type *message_type;
410};
411
412/*! Lock to protect the message types vector */
413AST_MUTEX_DEFINE_STATIC(message_type_statistics_lock);
414
415/*! Vector containing message type information */
416static AST_VECTOR(, struct stasis_message_type_statistics) message_type_statistics;
417
418/*! \internal */
419struct stasis_topic_statistics {
420 /*! \brief Highest time spent dispatching messages to subscribers */
421 long highest_time_dispatched;
422 /*! \brief Lowest time spent dispatching messages to subscribers */
423 long lowest_time_dispatched;
424 /*! \brief The number of messages that were not dispatched to any subscriber */
425 int messages_not_dispatched;
426 /*! \brief The number of messages that were dispatched to at least 1 subscriber */
427 int messages_dispatched;
428 /*! \brief The ids of the subscribers to this topic */
429 struct ao2_container *subscribers;
430 /*! \brief Pointer to the topic (NOT refcounted, and must NOT be accessed) */
431 struct stasis_topic *topic;
432 /*! \brief Name of the topic */
433 char name[0];
434};
435#endif
436
437/*! \internal */
439 /*! Variable length array of the subscribers */
441
442 /*! Topics forwarding into this topic */
444
445#ifdef AST_DEVMODE
446 struct stasis_topic_statistics *statistics;
447#endif
448
449 /*! Unique incrementing integer for subscriber ids */
451
452 /*! Name of the topic */
453 char *name;
454
455 /*! Detail of the topic */
456 char *detail;
457
458 /*! Creation time */
459 struct timeval *creationtime;
460};
461
463
466
467 char *name;
468 char *detail;
469
470 struct timeval creationtime;
471
472 char buf[0];
473};
474
478
479static void proxy_dtor(void *weakproxy, void *container)
480{
481 ao2_unlink(container, weakproxy);
483}
484
485/* Forward declarations for the tightly-coupled subscription object */
486static int topic_add_subscription(struct stasis_topic *topic,
487 struct stasis_subscription *sub);
488
489static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
490
491/*! \brief Lock two topics. */
492#define topic_lock_both(topic1, topic2) \
493 do { \
494 ao2_lock(topic1); \
495 while (ao2_trylock(topic2)) { \
496 AO2_DEADLOCK_AVOIDANCE(topic1); \
497 } \
498 } while (0)
499
500static void topic_dtor(void *obj)
501{
502 struct stasis_topic *topic = obj;
503#ifdef AST_DEVMODE
504 struct ao2_container *topic_stats;
505#endif
506
507 ast_debug(2, "Destroying topic. name: %s, detail: %s\n",
508 topic->name, topic->detail);
509
510 /* Subscribers hold a reference to topics, so they should all be
511 * unsubscribed before we get here. */
513
516 ast_debug(1, "Topic '%s': %p destroyed\n", topic->name, topic);
517
518#ifdef AST_DEVMODE
519 if (topic->statistics) {
520 topic_stats = ao2_global_obj_ref(topic_statistics);
521 if (topic_stats) {
522 ao2_unlink(topic_stats, topic->statistics);
523 ao2_ref(topic_stats, -1);
524 }
525 ao2_ref(topic->statistics, -1);
526 }
527#endif
528}
529
530#ifdef AST_DEVMODE
531static void topic_statistics_destroy(void *obj)
532{
533 struct stasis_topic_statistics *statistics = obj;
534
535 ao2_cleanup(statistics->subscribers);
536}
537
538static struct stasis_topic_statistics *stasis_topic_statistics_create(struct stasis_topic *topic)
539{
540 struct stasis_topic_statistics *statistics;
541 RAII_VAR(struct ao2_container *, topic_stats, ao2_global_obj_ref(topic_statistics), ao2_cleanup);
542
543 if (!topic_stats) {
544 return NULL;
545 }
546
547 statistics = ao2_alloc(sizeof(*statistics) + strlen(topic->name) + 1, topic_statistics_destroy);
548 if (!statistics) {
549 return NULL;
550 }
551
552 statistics->subscribers = ast_str_container_alloc(1);
553 if (!statistics->subscribers) {
554 ao2_ref(statistics, -1);
555 return NULL;
556 }
557
558 /* This is strictly used for the pointer address when showing the topic */
559 statistics->topic = topic;
560 strcpy(statistics->name, topic->name); /* SAFE */
561 ao2_link(topic_stats, statistics);
562
563 return statistics;
564}
565#endif
566
567static int link_topic_proxy(struct stasis_topic *topic, const char *name, const char *detail)
568{
569 struct topic_proxy *proxy;
570 struct stasis_topic* topic_tmp;
571 size_t detail_len;
572
573 if (!topic || !name || !strlen(name) || !detail) {
574 return -1;
575 }
576
578
579 topic_tmp = stasis_topic_get(name);
580 if (topic_tmp) {
581 ast_log(LOG_ERROR, "The same topic is already exist. name: %s\n", name);
582 ao2_ref(topic_tmp, -1);
584
585 return -1;
586 }
587
588 detail_len = strlen(detail) + 1;
589
590 proxy = ao2_t_weakproxy_alloc(
591 sizeof(*proxy) + strlen(name) + 1 + detail_len, NULL, name);
592 if (!proxy) {
594
595 return -1;
596 }
597
598 /* set the proxy info */
599 proxy->name = proxy->buf;
600 proxy->detail = proxy->name + strlen(name) + 1;
601
602 strcpy(proxy->name, name); /* SAFE */
603 ast_copy_string(proxy->detail, detail, detail_len); /* SAFE */
604 proxy->creationtime = ast_tvnow();
605
606 /* We have exclusive access to proxy, no need for locking here. */
607 if (ao2_t_weakproxy_set_object(proxy, topic, OBJ_NOLOCK, "weakproxy link")) {
608 ao2_cleanup(proxy);
610
611 return -1;
612 }
613
615 ao2_cleanup(proxy);
618
619 return -1;
620 }
621
622 /* setting the topic point to the proxy */
623 topic->name = proxy->name;
624 topic->detail = proxy->detail;
625 topic->creationtime = &(proxy->creationtime);
626
628 ao2_ref(proxy, -1);
629
631
632 return 0;
633}
634
636 const char *name, const char* detail
637 )
638{
639 struct stasis_topic *topic;
640 int res = 0;
641
642 if (!name|| !strlen(name) || !detail) {
643 return NULL;
644 }
645 ast_debug(2, "Creating topic. name: %s, detail: %s\n", name, detail);
646
647 topic = stasis_topic_get(name);
648 if (topic) {
649 ast_debug(2, "Topic is already exist. name: %s, detail: %s\n",
650 name, detail);
651 return topic;
652 }
653
654 topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
655 if (!topic) {
656 return NULL;
657 }
658
660 res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
661 if (res) {
662 ao2_ref(topic, -1);
663 return NULL;
664 }
665
666 /* link to the proxy */
667 if (link_topic_proxy(topic, name, detail)) {
668 ao2_ref(topic, -1);
669 return NULL;
670 }
671
672#ifdef AST_DEVMODE
673 topic->statistics = stasis_topic_statistics_create(topic);
674 if (!topic->statistics) {
675 ao2_ref(topic, -1);
676 return NULL;
677 }
678#endif
679 ast_debug(1, "Topic '%s': %p created\n", topic->name, topic);
680
681 return topic;
682}
683
685{
687}
688
690{
692}
693
694const char *stasis_topic_name(const struct stasis_topic *topic)
695{
696 if (!topic) {
697 return NULL;
698 }
699 return topic->name;
700}
701
702const char *stasis_topic_detail(const struct stasis_topic *topic)
703{
704 if (!topic) {
705 return NULL;
706 }
707 return topic->detail;
708}
709
710size_t stasis_topic_subscribers(const struct stasis_topic *topic)
711{
712 return AST_VECTOR_SIZE(&topic->subscribers);
713}
714
715#ifdef AST_DEVMODE
716struct stasis_subscription_statistics {
717 /*! \brief The filename where the subscription originates */
718 const char *file;
719 /*! \brief The function where the subscription originates */
720 const char *func;
721 /*! \brief Names of the topics we are subscribed to */
722 struct ao2_container *topics;
723 /*! \brief The message type that currently took the longest to process */
724 struct stasis_message_type *highest_time_message_type;
725 /*! \brief Highest time spent invoking a message */
726 long highest_time_invoked;
727 /*! \brief Lowest time spent invoking a message */
728 long lowest_time_invoked;
729 /*! \brief The number of messages that were filtered out */
730 int messages_dropped;
731 /*! \brief The number of messages that passed filtering */
732 int messages_passed;
733 /*! \brief Using a mailbox to queue messages */
734 int uses_mailbox;
735 /*! \brief Using stasis taskpool for handling messages */
736 int uses_taskpool;
737 /*! \brief The line number where the subscription originates */
738 int lineno;
739 /*! \brief Pointer to the subscription (NOT refcounted, and must NOT be accessed) */
740 struct stasis_subscription *sub;
741 /*! \brief Unique ID of the subscription */
742 char uniqueid[0];
743};
744#endif
745
746/*! \internal */
748 /*! Unique ID for this subscription */
749 char *uniqueid;
750 /*! Topic subscribed to. */
752 /*! Mailbox for processing incoming messages. */
754 /*! Callback function for incoming message processing. */
756 /*! Data pointer to be handed to the callback. */
757 void *data;
758
759 /*! Condition for joining with subscription. */
761 /*! Flag set when final message for sub has been received.
762 * Be sure join_lock is held before reading/setting. */
764 /*! Flag set when final message for sub has been processed.
765 * Be sure join_lock is held before reading/setting. */
767
768 /*! The message types this subscription is accepting */
770 /*! The message formatters this subscription is accepting */
772 /*! The message filter currently in use */
774
775#ifdef AST_DEVMODE
776 /*! Statistics information */
777 struct stasis_subscription_statistics *statistics;
778#endif
779};
780
781static void subscription_dtor(void *obj)
782{
783 struct stasis_subscription *sub = obj;
784#ifdef AST_DEVMODE
785 struct ao2_container *subscription_stats;
786#endif
787
788 /* Subscriptions need to be manually unsubscribed before destruction
789 * b/c there's a cyclic reference between topics and subscriptions */
791 /* If there are any messages in flight to this subscription; that would
792 * be bad. */
794
797 sub->topic = NULL;
799 sub->mailbox = NULL;
801
803
804#ifdef AST_DEVMODE
805 if (sub->statistics) {
806 subscription_stats = ao2_global_obj_ref(subscription_statistics);
807 if (subscription_stats) {
808 ao2_unlink(subscription_stats, sub->statistics);
809 ao2_ref(subscription_stats, -1);
810 }
811 ao2_ref(sub->statistics, -1);
812 }
813#endif
814}
815
816/*!
817 * \brief Invoke the subscription's callback.
818 * \param sub Subscription to invoke.
819 * \param message Message to send.
820 */
822 struct stasis_message *message)
823{
824 unsigned int final = stasis_subscription_final_message(sub, message);
826#ifdef AST_DEVMODE
827 struct timeval start;
828 long elapsed;
829
830 start = ast_tvnow();
831#endif
832
833 /* Notify that the final message has been received */
834 if (final) {
835 ao2_lock(sub);
839 }
840
841 /*
842 * If filtering is turned on and this is a 'final' message, we only invoke the callback
843 * if the subscriber accepts subscription_change message types.
844 */
847 /* Since sub is mostly immutable, no need to lock sub */
849 }
850
851 /* Notify that the final message has been processed */
852 if (final) {
853 ao2_lock(sub);
857 }
858
859#ifdef AST_DEVMODE
860 elapsed = ast_tvdiff_ms(ast_tvnow(), start);
861 if (elapsed > sub->statistics->highest_time_invoked) {
862 sub->statistics->highest_time_invoked = elapsed;
863 ao2_lock(sub->statistics);
864 sub->statistics->highest_time_message_type = stasis_message_type(message);
865 ao2_unlock(sub->statistics);
866 }
867 if (elapsed < sub->statistics->lowest_time_invoked) {
868 sub->statistics->lowest_time_invoked = elapsed;
869 }
870#endif
871}
872
873static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
874static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
875
877{
878}
879
880#ifdef AST_DEVMODE
881static void subscription_statistics_destroy(void *obj)
882{
883 struct stasis_subscription_statistics *statistics = obj;
884
885 ao2_cleanup(statistics->topics);
886}
887
888static struct stasis_subscription_statistics *stasis_subscription_statistics_create(struct stasis_subscription *sub,
889 int needs_mailbox, int use_taskpool, const char *file, int lineno,
890 const char *func)
891{
892 struct stasis_subscription_statistics *statistics;
893 RAII_VAR(struct ao2_container *, subscription_stats, ao2_global_obj_ref(subscription_statistics), ao2_cleanup);
894
895 if (!subscription_stats) {
896 return NULL;
897 }
898
899 statistics = ao2_alloc(sizeof(*statistics) + strlen(sub->uniqueid) + 1, subscription_statistics_destroy);
900 if (!statistics) {
901 return NULL;
902 }
903
905 if (!statistics->topics) {
906 ao2_ref(statistics, -1);
907 return NULL;
908 }
909
910 statistics->file = file;
911 statistics->lineno = lineno;
912 statistics->func = func;
913 statistics->uses_mailbox = needs_mailbox;
914 statistics->uses_taskpool = use_taskpool;
915 strcpy(statistics->uniqueid, sub->uniqueid); /* SAFE */
916 statistics->sub = sub;
917 ao2_link(subscription_stats, statistics);
918
919 return statistics;
920}
921#endif
922
924 struct stasis_topic *topic,
926 void *data,
927 int needs_mailbox,
928 int use_taskpool,
929 const char *file,
930 int lineno,
931 const char *func)
932{
933 struct stasis_subscription *sub;
934 int ret;
935
936 if (!topic) {
937 return NULL;
938 }
939
940 /* The ao2 lock is used for join_cond. */
942 if (!sub) {
943 return NULL;
944 }
945
946#ifdef AST_DEVMODE
948 sub->statistics = stasis_subscription_statistics_create(sub, needs_mailbox, use_taskpool, file, lineno, func);
949 if (ret < 0 || !sub->statistics) {
950 ao2_ref(sub, -1);
951 return NULL;
952 }
953#else
955 if (ret < 0) {
956 ao2_ref(sub, -1);
957 return NULL;
958 }
959#endif
960
961 if (needs_mailbox) {
962 char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
963
964 /* Create name with seq number appended. */
965 ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "stasis/%c:%s",
966 use_taskpool ? 'p' : 'm',
968
969 /*
970 * With a small number of subscribers, a thread-per-sub is
971 * acceptable. For a large number of subscribers, a thread
972 * pool should be used.
973 */
974 if (use_taskpool) {
976 } else {
978 }
979 if (!sub->mailbox) {
980 ao2_ref(sub, -1);
981
982 return NULL;
983 }
985 /* Taskprocessor has a reference */
986 ao2_ref(sub, +1);
987 }
988
989 ao2_ref(topic, +1);
990 sub->topic = topic;
992 sub->data = data;
997
998 if (topic_add_subscription(topic, sub) != 0) {
999 ao2_ref(sub, -1);
1000 ao2_ref(topic, -1);
1001
1002 return NULL;
1003 }
1005
1006 return sub;
1007}
1008
1010 struct stasis_topic *topic,
1012 void *data,
1013 const char *file,
1014 int lineno,
1015 const char *func)
1016{
1017 return internal_stasis_subscribe(topic, callback, data, 1, 0, file, lineno, func);
1018}
1019
1021 struct stasis_topic *topic,
1023 void *data,
1024 const char *file,
1025 int lineno,
1026 const char *func)
1027{
1028 return internal_stasis_subscribe(topic, callback, data, 1, 1, file, lineno, func);
1029}
1030
1032 struct stasis_topic *topic,
1034 void *data,
1035 const char *file,
1036 int lineno,
1037 const char *func)
1038{
1039 return internal_stasis_subscribe(topic, callback, data, 0, 0, file, lineno, func);
1040}
1041
1042static int sub_cleanup(void *data)
1043{
1044 struct stasis_subscription *sub = data;
1046 return 0;
1047}
1048
1050{
1051 /* The subscription may be the last ref to this topic. Hold
1052 * the topic ref open until after the unlock. */
1053 struct stasis_topic *topic;
1054
1055 if (!sub) {
1056 return NULL;
1057 }
1058
1059 topic = ao2_bump(sub->topic);
1060
1061 /* We have to remove the subscription first, to ensure the unsubscribe
1062 * is the final message */
1063 if (topic_remove_subscription(sub->topic, sub) != 0) {
1065 "Internal error: subscription has invalid topic\n");
1066 ao2_cleanup(topic);
1067
1068 return NULL;
1069 }
1070
1071 /* Now let everyone know about the unsubscribe */
1073
1074 /* When all that's done, remove the ref the mailbox has on the sub */
1075 if (sub->mailbox) {
1077 /* Nothing we can do here, the conditional is just to keep
1078 * the compiler happy that we're not ignoring the result. */
1079 }
1080 }
1081
1082 /* Unsubscribing unrefs the subscription */
1084 ao2_cleanup(topic);
1085
1086 return NULL;
1087}
1088
1090 long low_water, long high_water)
1091{
1092 int res = -1;
1093
1094 if (subscription) {
1095 res = ast_taskprocessor_alert_set_levels(subscription->mailbox,
1096 low_water, high_water);
1097 }
1098 return res;
1099}
1100
1102 const struct stasis_message_type *type)
1103{
1104 if (!subscription) {
1105 return -1;
1106 }
1107
1108 ast_assert(type != NULL);
1110
1112 /* Filtering is unreliable as this message type is not yet initialized
1113 * so force all messages through.
1114 */
1116 return 0;
1117 }
1118
1119 ao2_lock(subscription->topic);
1121 /* We do this for the same reason as above. The subscription can still operate, so allow
1122 * it to do so by forcing all messages through.
1123 */
1125 }
1126 ao2_unlock(subscription->topic);
1127
1128 return 0;
1129}
1130
1132 const struct stasis_message_type *type)
1133{
1134 if (!subscription) {
1135 return -1;
1136 }
1137
1138 ast_assert(type != NULL);
1140
1142 return 0;
1143 }
1144
1145 ao2_lock(subscription->topic);
1147 /* The memory is already allocated so this can't fail */
1149 }
1150 ao2_unlock(subscription->topic);
1151
1152 return 0;
1153}
1154
1157{
1158 if (!subscription) {
1159 return -1;
1160 }
1161
1162 ao2_lock(subscription->topic);
1163 if (subscription->filter != STASIS_SUBSCRIPTION_FILTER_FORCED_NONE) {
1164 subscription->filter = filter;
1165 }
1166 ao2_unlock(subscription->topic);
1167
1168 return 0;
1169}
1170
1173{
1174 ast_assert(subscription != NULL);
1175
1176 ao2_lock(subscription->topic);
1177 subscription->accepted_formatters = formatters;
1178 ao2_unlock(subscription->topic);
1179
1180 return;
1181}
1182
1184{
1185 if (subscription) {
1186 ao2_lock(subscription);
1187 /* Wait until the processed flag has been set */
1188 while (!subscription->final_message_processed) {
1189 ast_cond_wait(&subscription->join_cond,
1190 ao2_object_get_lockaddr(subscription));
1191 }
1192 ao2_unlock(subscription);
1193 }
1194}
1195
1197{
1198 if (subscription) {
1199 int ret;
1200
1201 ao2_lock(subscription);
1202 ret = subscription->final_message_rxed;
1203 ao2_unlock(subscription);
1204
1205 return ret;
1206 }
1207
1208 /* Null subscription is about as done as you can get */
1209 return 1;
1210}
1211
1213 struct stasis_subscription *subscription)
1214{
1215 if (!subscription) {
1216 return NULL;
1217 }
1218
1219 /* Bump refcount to hold it past the unsubscribe */
1220 ao2_ref(subscription, +1);
1221 stasis_unsubscribe(subscription);
1222 stasis_subscription_join(subscription);
1223 /* Now decrement the refcount back */
1224 ao2_cleanup(subscription);
1225 return NULL;
1226}
1227
1229{
1230 if (sub) {
1231 size_t i;
1232 struct stasis_topic *topic = sub->topic;
1233
1234 ao2_lock(topic);
1235 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
1236 if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
1237 ao2_unlock(topic);
1238 return 1;
1239 }
1240 }
1241 ao2_unlock(topic);
1242 }
1243
1244 return 0;
1245}
1246
1248{
1249 return sub->uniqueid;
1250}
1251
1253{
1254 struct stasis_subscription_change *change;
1255
1257 return 0;
1258 }
1259
1260 change = stasis_message_data(msg);
1261 if (strcmp("Unsubscribe", change->description)) {
1262 return 0;
1263 }
1264
1265 if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
1266 return 0;
1267 }
1268
1269 return 1;
1270}
1271
1272/*!
1273 * \brief Add a subscriber to a topic.
1274 * \param topic Topic
1275 * \param sub Subscriber
1276 * \return 0 on success
1277 * \return Non-zero on error
1278 */
1280{
1281 size_t idx;
1282
1283 ao2_lock(topic);
1284 /* The reference from the topic to the subscription is shared with
1285 * the owner of the subscription, which will explicitly unsubscribe
1286 * to release it.
1287 *
1288 * If we bumped the refcount here, the owner would have to unsubscribe
1289 * and cleanup, which is a bit awkward. */
1291
1292 for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
1295 }
1296
1297#ifdef AST_DEVMODE
1299 ast_str_container_add(sub->statistics->topics, stasis_topic_name(topic));
1300#endif
1301
1303
1304 return 0;
1305}
1306
1308{
1309 size_t idx;
1310 int res;
1311
1312 ao2_lock(topic);
1313 for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
1316 }
1319
1320#ifdef AST_DEVMODE
1321 if (!res) {
1323 ast_str_container_remove(sub->statistics->topics, stasis_topic_name(topic));
1324 }
1325#endif
1326
1328
1329 return res;
1330}
1331
1332/*!
1333 * \internal \brief Dispatch a message to a subscriber asynchronously
1334 * \param local \ref ast_taskprocessor_local object
1335 * \return 0
1336 */
1338{
1339 struct stasis_subscription *sub = local->local_data;
1340 struct stasis_message *message = local->data;
1341
1344
1345 return 0;
1346}
1347
1348/*!
1349 * \internal \brief Data passed to \ref dispatch_exec_sync to synchronize
1350 * a published message to a subscriber
1351 */
1358
1359/*!
1360 * \internal \brief Dispatch a message to a subscriber synchronously
1361 * \param local \ref ast_taskprocessor_local object
1362 * \return 0
1363 */
1365{
1366 struct stasis_subscription *sub = local->local_data;
1367 struct sync_task_data *std = local->data;
1368 struct stasis_message *message = std->task_data;
1369
1372
1373 ast_mutex_lock(&std->lock);
1374 std->complete = 1;
1375 ast_cond_signal(&std->cond);
1376 ast_mutex_unlock(&std->lock);
1377
1378 return 0;
1379}
1380
1381/*!
1382 * \internal \brief Dispatch a message to a subscriber
1383 * \param sub The subscriber to dispatch to
1384 * \param message The message to send
1385 * \param synchronous If non-zero, synchronize on the subscriber receiving
1386 * the message
1387 * \retval 0 if message was not dispatched
1388 * \retval 1 if message was dispatched
1389 */
1390static unsigned int dispatch_message(struct stasis_subscription *sub,
1391 struct stasis_message *message,
1392 int synchronous)
1393{
1395
1396 /*
1397 * The 'do while' gives us an easy way to skip remaining logic once
1398 * we determine the message should be accepted.
1399 * The code looks more verbose than it needs to be but it optimizes
1400 * down very nicely. It's just easier to understand and debug this way.
1401 */
1402 do {
1403 struct stasis_message_type *message_type = stasis_message_type(message);
1404 int type_id = stasis_message_type_id(message_type);
1405 int type_filter_specified = 0;
1406 int formatter_filter_specified = 0;
1407 int type_filter_passed = 0;
1408 int formatter_filter_passed = 0;
1409
1410 /* We always accept final messages so only run the filter logic if not final */
1411 if (is_final) {
1412 break;
1413 }
1414
1415 type_filter_specified = sub->filter & STASIS_SUBSCRIPTION_FILTER_SELECTIVE;
1416 formatter_filter_specified = sub->accepted_formatters != STASIS_SUBSCRIPTION_FORMATTER_NONE;
1417
1418 /* Accept if no filters of either type were specified */
1419 if (!type_filter_specified && !formatter_filter_specified) {
1420 break;
1421 }
1422
1423 type_filter_passed = type_filter_specified
1426
1427 /*
1428 * Since the type and formatter filters are OR'd, we can skip
1429 * the formatter check if the type check passes.
1430 */
1431 if (type_filter_passed) {
1432 break;
1433 }
1434
1435 formatter_filter_passed = formatter_filter_specified
1437
1438 if (formatter_filter_passed) {
1439 break;
1440 }
1441
1442#ifdef AST_DEVMODE
1443 ast_atomic_fetchadd_int(&sub->statistics->messages_dropped, +1);
1444#endif
1445
1446 return 0;
1447
1448 } while (0);
1449
1450#ifdef AST_DEVMODE
1451 ast_atomic_fetchadd_int(&sub->statistics->messages_passed, +1);
1452#endif
1453
1454 if (!sub->mailbox) {
1455 /* Dispatch directly */
1457 return 1;
1458 }
1459
1460 /* Bump the message for the taskprocessor push. This will get de-ref'd
1461 * by the task processor callback.
1462 */
1464 if (!synchronous) {
1466 /* Push failed; ugh. */
1467 ast_log(LOG_ERROR, "Dropping async dispatch\n");
1469 return 0;
1470 }
1471 } else {
1472 struct sync_task_data std;
1473
1474 ast_mutex_init(&std.lock);
1475 ast_cond_init(&std.cond, NULL);
1476 std.complete = 0;
1477 std.task_data = message;
1478
1480 /* Push failed; ugh. */
1481 ast_log(LOG_ERROR, "Dropping sync dispatch\n");
1483 ast_mutex_destroy(&std.lock);
1484 ast_cond_destroy(&std.cond);
1485 return 0;
1486 }
1487
1488 ast_mutex_lock(&std.lock);
1489 while (!std.complete) {
1490 ast_cond_wait(&std.cond, &std.lock);
1491 }
1492 ast_mutex_unlock(&std.lock);
1493
1494 ast_mutex_destroy(&std.lock);
1495 ast_cond_destroy(&std.cond);
1496 }
1497
1498 return 1;
1499}
1500
1501/*!
1502 * \internal \brief Publish a message to a topic's subscribers
1503 * \brief topic The topic to publish to
1504 * \brief message The message to publish
1505 * \brief sync_sub An optional subscriber of the topic to publish synchronously
1506 * to
1507 */
1508static void publish_msg(struct stasis_topic *topic,
1509 struct stasis_message *message, struct stasis_subscription *sync_sub)
1510{
1511 size_t i;
1512#ifdef AST_DEVMODE
1513 unsigned int dispatched = 0;
1515 struct stasis_message_type_statistics *statistics;
1516 struct timeval start;
1517 long elapsed;
1518#endif
1519
1520 ast_assert(topic != NULL);
1522
1523#ifdef AST_DEVMODE
1524 ast_mutex_lock(&message_type_statistics_lock);
1525 if (message_type_id >= AST_VECTOR_SIZE(&message_type_statistics)) {
1526 struct stasis_message_type_statistics new_statistics = {
1527 .published = 0,
1528 };
1529 if (AST_VECTOR_REPLACE(&message_type_statistics, message_type_id, new_statistics)) {
1530 ast_mutex_unlock(&message_type_statistics_lock);
1531 return;
1532 }
1533 }
1534 statistics = AST_VECTOR_GET_ADDR(&message_type_statistics, message_type_id);
1535 statistics->message_type = stasis_message_type(message);
1536 ast_mutex_unlock(&message_type_statistics_lock);
1537
1538 ast_atomic_fetchadd_int(&statistics->published, +1);
1539#endif
1540
1541 /* If there are no subscribers don't bother */
1542 if (!stasis_topic_subscribers(topic)) {
1543#ifdef AST_DEVMODE
1544 ast_atomic_fetchadd_int(&statistics->unused, +1);
1545 ast_atomic_fetchadd_int(&topic->statistics->messages_not_dispatched, +1);
1546#endif
1547 return;
1548 }
1549
1550 /*
1551 * The topic may be unref'ed by the subscription invocation.
1552 * Make sure we hold onto a reference while dispatching.
1553 */
1554 ao2_ref(topic, +1);
1555#ifdef AST_DEVMODE
1556 start = ast_tvnow();
1557#endif
1558 ao2_lock(topic);
1559 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
1561
1562 ast_assert(sub != NULL);
1563#ifdef AST_DEVMODE
1564 dispatched +=
1565#endif
1566 dispatch_message(sub, message, (sub == sync_sub));
1567 }
1569
1570#ifdef AST_DEVMODE
1571 elapsed = ast_tvdiff_ms(ast_tvnow(), start);
1572 if (elapsed > topic->statistics->highest_time_dispatched) {
1573 topic->statistics->highest_time_dispatched = elapsed;
1574 }
1575 if (elapsed < topic->statistics->lowest_time_dispatched) {
1576 topic->statistics->lowest_time_dispatched = elapsed;
1577 }
1578 if (dispatched) {
1579 ast_atomic_fetchadd_int(&topic->statistics->messages_dispatched, +1);
1580 } else {
1581 ast_atomic_fetchadd_int(&statistics->unused, +1);
1582 ast_atomic_fetchadd_int(&topic->statistics->messages_not_dispatched, +1);
1583 }
1584#endif
1585
1586 ao2_ref(topic, -1);
1587}
1588
1593
1600
1601/*!
1602 * \brief Forwarding information
1603 *
1604 * Any message posted to \a from_topic is forwarded to \a to_topic.
1605 *
1606 * In cases where both the \a from_topic and \a to_topic need to be locked,
1607 * always lock the \a to_topic first, then the \a from_topic. Lest you deadlock.
1608 */
1610 /*! Originating topic */
1612 /*! Destination topic */
1614};
1615
1616static void forward_dtor(void *obj)
1617{
1618 struct stasis_forward *forward = obj;
1619
1620 ao2_cleanup(forward->from_topic);
1621 forward->from_topic = NULL;
1622 ao2_cleanup(forward->to_topic);
1623 forward->to_topic = NULL;
1624}
1625
1627{
1628 int idx;
1629 struct stasis_topic *from;
1630 struct stasis_topic *to;
1631
1632 if (!forward) {
1633 return NULL;
1634 }
1635
1636 from = forward->from_topic;
1637 to = forward->to_topic;
1638
1639 if (from && to) {
1640 topic_lock_both(to, from);
1643
1644 for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
1646 }
1647 ao2_unlock(from);
1648 ao2_unlock(to);
1649 }
1650
1651 ao2_cleanup(forward);
1652
1653 return NULL;
1654}
1655
1657 struct stasis_topic *to_topic)
1658{
1659 int res;
1660 size_t idx;
1661 struct stasis_forward *forward;
1662
1663 if (!from_topic || !to_topic) {
1664 return NULL;
1665 }
1666
1667 forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1668 if (!forward) {
1669 return NULL;
1670 }
1671
1672 /* Forwards to ourselves are implicit. */
1673 if (to_topic == from_topic) {
1674 return forward;
1675 }
1676
1677 forward->from_topic = ao2_bump(from_topic);
1678 forward->to_topic = ao2_bump(to_topic);
1679
1682 if (res != 0) {
1685 ao2_ref(forward, -1);
1686 return NULL;
1687 }
1688
1689 for (idx = 0; idx < AST_VECTOR_SIZE(&to_topic->subscribers); ++idx) {
1691 }
1694
1695 return forward;
1696}
1697
1698static void subscription_change_dtor(void *obj)
1699{
1700 struct stasis_subscription_change *change = obj;
1701
1702 ao2_cleanup(change->topic);
1703}
1704
1706{
1707 size_t description_len = strlen(description) + 1;
1708 size_t uniqueid_len = strlen(uniqueid) + 1;
1709 struct stasis_subscription_change *change;
1710
1711 change = ao2_alloc_options(sizeof(*change) + description_len + uniqueid_len,
1713 if (!change) {
1714 return NULL;
1715 }
1716
1717 strcpy(change->description, description); /* SAFE */
1718 change->uniqueid = change->description + description_len;
1719 ast_copy_string(change->uniqueid, uniqueid, uniqueid_len); /* SAFE */
1720 ao2_ref(topic, +1);
1721 change->topic = topic;
1722
1723 return change;
1724}
1725
1727{
1728 struct stasis_subscription_change *change;
1729 struct stasis_message *msg;
1730
1731 /* This assumes that we have already unsubscribed */
1733
1735 return;
1736 }
1737
1738 change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
1739 if (!change) {
1740 return;
1741 }
1742
1744 if (!msg) {
1745 ao2_cleanup(change);
1746 return;
1747 }
1748
1749 stasis_publish(topic, msg);
1750 ao2_cleanup(msg);
1751 ao2_cleanup(change);
1752}
1753
1755 struct stasis_subscription *sub)
1756{
1757 struct stasis_subscription_change *change;
1758 struct stasis_message *msg;
1759
1760 /* This assumes that we have already unsubscribed */
1762
1764 return;
1765 }
1766
1767 change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
1768 if (!change) {
1769 return;
1770 }
1771
1773 if (!msg) {
1774 ao2_cleanup(change);
1775 return;
1776 }
1777
1778 stasis_publish(topic, msg);
1779
1780 /* Now we have to dispatch to the subscription itself */
1781 dispatch_message(sub, msg, 0);
1782
1783 ao2_cleanup(msg);
1784 ao2_cleanup(change);
1785}
1786
1790 /*
1791 * Per-entry initialization state. This lets us serialize creation of a
1792 * given topic name without holding the pool container lock while doing
1793 * the heavy lifting (topic creation, forwarding setup, etc).
1794 *
1795 * A topic_pool_entry starts life in an "in-flight" state where neither
1796 * initialized nor failed are set. The first thread to link the entry
1797 * into the pool becomes the creator and is responsible for completing
1798 * initialization, setting one of the flags, and broadcasting init_cond.
1799 * Other threads that find the same entry simply wait for initialization
1800 * to complete and then reuse the created topic.
1801 */
1804 unsigned int initialized; /* terminal success state */
1805 unsigned int failed; /* terminal failure state */
1806 char name[0];
1807};
1808
1809static void topic_pool_entry_dtor(void *obj)
1810{
1811 struct topic_pool_entry *entry = obj;
1812
1813 entry->forward = stasis_forward_cancel(entry->forward);
1814 ao2_cleanup(entry->topic);
1815 entry->topic = NULL;
1816 ast_cond_destroy(&entry->init_cond);
1818}
1819
1820static struct topic_pool_entry *topic_pool_entry_alloc(const char *topic_name)
1821{
1823
1824 topic_pool_entry = ao2_alloc_options(sizeof(*topic_pool_entry) + strlen(topic_name) + 1,
1826 if (!topic_pool_entry) {
1827 return NULL;
1828 }
1831 strcpy(topic_pool_entry->name, topic_name); /* Safe */
1832 return topic_pool_entry;
1833}
1834
1839
1840static void topic_pool_dtor(void *obj)
1841{
1842 struct stasis_topic_pool *pool = obj;
1843
1844#ifdef AO2_DEBUG
1845 {
1846 char *container_name =
1847 ast_alloca(strlen(stasis_topic_name(pool->pool_topic)) + strlen("-pool") + 1);
1848 sprintf(container_name, "%s-pool", stasis_topic_name(pool->pool_topic));
1849 ao2_container_unregister(container_name);
1850 }
1851#endif
1852
1854 pool->pool_container = NULL;
1855 ao2_cleanup(pool->pool_topic);
1856 pool->pool_topic = NULL;
1857}
1858
1859static int topic_pool_entry_hash(const void *obj, const int flags)
1860{
1861 const struct topic_pool_entry *object;
1862 const char *key;
1863
1864 switch (flags & OBJ_SEARCH_MASK) {
1865 case OBJ_SEARCH_KEY:
1866 key = obj;
1867 break;
1868 case OBJ_SEARCH_OBJECT:
1869 object = obj;
1870 key = object->name;
1871 break;
1872 default:
1873 /* Hash can only work on something with a full key. */
1874 ast_assert(0);
1875 return 0;
1876 }
1877 return ast_str_case_hash(key);
1878}
1879
1880static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
1881{
1882 const struct topic_pool_entry *object_left = obj;
1883 const struct topic_pool_entry *object_right = arg;
1884 const char *right_key = arg;
1885 int cmp;
1886
1887 switch (flags & OBJ_SEARCH_MASK) {
1888 case OBJ_SEARCH_OBJECT:
1889 right_key = object_right->name;
1890 /* Fall through */
1891 case OBJ_SEARCH_KEY:
1892 cmp = strcasecmp(object_left->name, right_key);
1893 break;
1895 /* Not supported by container */
1896 ast_assert(0);
1897 cmp = -1;
1898 break;
1899 default:
1900 /*
1901 * What arg points to is specific to this traversal callback
1902 * and has no special meaning to astobj2.
1903 */
1904 cmp = 0;
1905 break;
1906 }
1907 if (cmp) {
1908 return 0;
1909 }
1910 /*
1911 * At this point the traversal callback is identical to a sorted
1912 * container.
1913 */
1914 return CMP_MATCH;
1915}
1916
1917#ifdef AO2_DEBUG
1918static void topic_pool_prnt_obj(void *v_obj, void *where, ao2_prnt_fn *prnt)
1919{
1920 struct topic_pool_entry *entry = v_obj;
1921
1922 if (!entry) {
1923 return;
1924 }
1925 prnt(where, "%s", stasis_topic_name(entry->topic));
1926}
1927#endif
1928
1930{
1931 struct stasis_topic_pool *pool;
1932
1934 if (!pool) {
1935 return NULL;
1936 }
1937
1940 if (!pool->pool_container) {
1941 ao2_cleanup(pool);
1942 return NULL;
1943 }
1944
1945#ifdef AO2_DEBUG
1946 {
1947 char *container_name =
1948 ast_alloca(strlen(stasis_topic_name(pooled_topic)) + strlen("-pool") + 1);
1949 sprintf(container_name, "%s-pool", stasis_topic_name(pooled_topic));
1950 ao2_container_register(container_name, pool->pool_container, topic_pool_prnt_obj);
1951 }
1952#endif
1953
1954 ao2_ref(pooled_topic, +1);
1955 pool->pool_topic = pooled_topic;
1956
1957 return pool;
1958}
1959
1960void stasis_topic_pool_delete_topic(struct stasis_topic_pool *pool, const char *topic_name)
1961{
1962 /*
1963 * The topic_name passed in could be a fully-qualified name like <pool_topic_name>/<topic_name>
1964 * or just <topic_name>. If it's fully qualified, we need to skip past <pool_topic_name>
1965 * name and search only on <topic_name>.
1966 */
1967 const char *pool_topic_name = stasis_topic_name(pool->pool_topic);
1968 int pool_topic_name_len = strlen(pool_topic_name);
1969 const char *search_topic_name;
1970
1971 if (strncmp(pool_topic_name, topic_name, pool_topic_name_len) == 0) {
1972 search_topic_name = topic_name + pool_topic_name_len + 1;
1973 } else {
1974 search_topic_name = topic_name;
1975 }
1976
1977 ao2_find(pool->pool_container, search_topic_name, OBJ_SEARCH_KEY | OBJ_NODATA | OBJ_UNLINK);
1978}
1979/*!
1980 * \brief Get a topic from the pool for the given name.
1981 *
1982 * This returns a **borrowed** reference: the pool container owns the topic
1983 * and callers MUST NOT ao2_cleanup() the returned pointer.
1984 *
1985 * To avoid both deadlocks and wasted work we use a per-name "in-flight"
1986 * topic_pool_entry while a topic is being created:
1987 *
1988 * - The pool container lock is held only while looking up or inserting
1989 * the topic_pool_entry for a name.
1990 * - Exactly one thread becomes the creator for a given name and is
1991 * responsible for allocating the topic and wiring up forwarding.
1992 * - Other threads that race for the same name find the in-flight entry
1993 * and wait on its condition variable until initialization completes.
1994 */
1995
1996struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
1997{
1998 /*
1999 * Lock ordering:
2000 *
2001 * pool->pool_container (AO2 lock)
2002 * → entry->init_lock
2003 * → topic locks (inside stasis_topic_create() /
2004 * stasis_forward_all())
2005 *
2006 * We intentionally do NOT hold the pool container lock while calling
2007 * stasis_topic_create() or stasis_forward_all() to avoid deadlocks with
2008 * other code that may take topic locks first and then need the pool lock.
2009 */
2010 RAII_VAR(struct topic_pool_entry *, entry, NULL, ao2_cleanup);
2011 char *fq = NULL;
2012 int creator = 0;
2013 int ret;
2014
2015 if (!pool || ast_strlen_zero(topic_name)) {
2016 return NULL;
2017 }
2018
2019 /* Creator / waiter split:
2020 *
2021 * - The first thread to create/link an entry for topic_name becomes the
2022 * "creator" and is responsible for creating the underlying stasis
2023 * topic and wiring up forwarding.
2024 *
2025 * - Other threads that find the entry become "waiters"; they block on
2026 * entry->init_cond until either initialization succeeds or fails.
2027 */
2028
2029 /* --- Creator selection under pool container lock --- */
2030 ao2_lock(pool->pool_container);
2031
2032 entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
2033 if (!entry) {
2034 entry = topic_pool_entry_alloc(topic_name);
2035 if (!entry) {
2037 return NULL;
2038 }
2039
2040 if (!ao2_link_flags(pool->pool_container, entry, OBJ_NOLOCK)) {
2041 struct topic_pool_entry *other;
2042
2043 other = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
2044 if (other) {
2045 struct topic_pool_entry *tmp = entry;
2046
2047 entry = other;
2048 creator = 0;
2050 ao2_ref(tmp, -1);
2051 goto waiter_path;
2052 }
2053
2055 return NULL;
2056 }
2057
2058 creator = 1;
2059 }
2060
2062
2063/* --- Waiter path: wait for creator to finish --- */
2064waiter_path:
2065 if (!creator) {
2066 ast_mutex_lock(&entry->init_lock);
2067 while (!entry->initialized && !entry->failed) {
2068 ast_cond_wait(&entry->init_cond, &entry->init_lock);
2069 }
2070
2071 if (entry->initialized && !entry->failed) {
2072 struct stasis_topic *topic = entry->topic;
2073
2074 if (!topic) {
2075 ast_debug(1, "Pooled topic '%s' marked initialized but topic is NULL\n", entry->name);
2076 ast_mutex_unlock(&entry->init_lock);
2077 return NULL;
2078 }
2079 ast_mutex_unlock(&entry->init_lock);
2080 /* Borrowed reference: container owns the topic */
2081 return topic;
2082 }
2083
2084 ast_mutex_unlock(&entry->init_lock);
2085 return NULL;
2086 }
2087
2088 /* --- Creator path: perform topic creation without pool lock --- */
2089 ast_mutex_lock(&entry->init_lock);
2090 /* Defensive: entry may have been initialized/failed before we acquired init_lock. */
2091 if (entry->initialized || entry->failed) {
2092 struct stasis_topic *topic = entry->initialized ? entry->topic : NULL;
2093 ast_mutex_unlock(&entry->init_lock);
2094 return topic;
2095 }
2096
2097 ret = ast_asprintf(&fq, "%s/%s", stasis_topic_name(pool->pool_topic), topic_name);
2098 if (ret < 0) {
2099 entry->failed = 1;
2100 goto creator_fail;
2101 }
2102
2103 entry->topic = stasis_topic_create(fq);
2104 ast_free(fq);
2105 fq = NULL;
2106
2107 if (!entry->topic) {
2108 entry->failed = 1;
2109 goto creator_fail;
2110 }
2111
2112 entry->forward = stasis_forward_all(entry->topic, pool->pool_topic);
2113 if (!entry->forward) {
2114 ao2_cleanup(entry->topic);
2115 entry->topic = NULL;
2116 entry->failed = 1;
2117 goto creator_fail;
2118 }
2119
2120 entry->initialized = 1;
2122 ast_mutex_unlock(&entry->init_lock);
2123
2124 return entry->topic; /* borrowed ref */
2125
2126creator_fail:
2127 ast_debug(1, "Failed to create pooled stasis topic '%s/%s'\n", stasis_topic_name(pool->pool_topic), entry->name);
2129 ast_mutex_unlock(&entry->init_lock);
2130
2131 /* Remove failed entry so future callers can retry */
2132 ao2_lock(pool->pool_container);
2133 ao2_unlink(pool->pool_container, entry);
2135
2136 return NULL;
2137}
2138
2139int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name)
2140{
2142
2144 if (!topic_pool_entry) {
2145 return 0;
2146 }
2147
2149 return 1;
2150}
2151
2153{
2154#ifdef AST_DEVMODE
2156 ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
2157 }
2158#endif
2159}
2160
2161/*! \brief A multi object blob data structure to carry user event stasis messages */
2163 struct ast_json *blob; /*< A blob of JSON data */
2164 AST_VECTOR(, void *) snapshots[STASIS_UMOS_MAX]; /*< Vector of snapshots for each type */
2165};
2166
2167/*!
2168 * \internal
2169 * \brief Destructor for \ref ast_multi_object_blob objects
2170 */
2171static void multi_object_blob_dtor(void *obj)
2172{
2173 struct ast_multi_object_blob *multi = obj;
2174 int type;
2175 int i;
2176
2177 for (type = 0; type < STASIS_UMOS_MAX; ++type) {
2178 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
2180 }
2181 AST_VECTOR_FREE(&multi->snapshots[type]);
2182 }
2183 ast_json_unref(multi->blob);
2184}
2185
2186/*! \brief Create a stasis user event multi object blob */
2188{
2189 int type;
2190 struct ast_multi_object_blob *multi;
2191
2192 ast_assert(blob != NULL);
2193
2194 multi = ao2_alloc(sizeof(*multi), multi_object_blob_dtor);
2195 if (!multi) {
2196 return NULL;
2197 }
2198
2199 for (type = 0; type < STASIS_UMOS_MAX; ++type) {
2200 if (AST_VECTOR_INIT(&multi->snapshots[type], 0)) {
2201 ao2_ref(multi, -1);
2202
2203 return NULL;
2204 }
2205 }
2206
2207 multi->blob = ast_json_ref(blob);
2208
2209 return multi;
2210}
2211
2212/*! \brief Add an object (snapshot) to the blob */
2215{
2216 if (!multi || !object || AST_VECTOR_APPEND(&multi->snapshots[type], object)) {
2217 ao2_cleanup(object);
2218 }
2219}
2220
2221/*! \brief Publish single channel user event (for app_userevent compatibility) */
2223 struct stasis_message_type *type, struct ast_json *blob)
2224{
2225 struct stasis_message *message;
2226 struct ast_channel_snapshot *channel_snapshot;
2227 struct ast_multi_object_blob *multi;
2228
2229 if (!type) {
2230 return;
2231 }
2232
2234 if (!multi) {
2235 return;
2236 }
2237
2238 channel_snapshot = ast_channel_snapshot_create(chan);
2239 if (!channel_snapshot) {
2240 ao2_ref(multi, -1);
2241 return;
2242 }
2243
2244 /* this call steals the channel_snapshot reference */
2245 ast_multi_object_blob_add(multi, STASIS_UMOS_CHANNEL, channel_snapshot);
2246
2248 ao2_ref(multi, -1);
2249 if (message) {
2250 /* app_userevent still publishes to channel */
2252 ao2_ref(message, -1);
2253 }
2254}
2255
2256/*! \internal \brief convert multi object blob to ari json */
2258 struct stasis_message *message,
2259 const struct stasis_message_sanitizer *sanitize)
2260{
2261 struct ast_json *out;
2263 struct ast_json *blob = multi->blob;
2264 const struct timeval *tv = stasis_message_timestamp(message);
2266 int i;
2267
2269 if (!out) {
2270 return NULL;
2271 }
2272
2273 ast_json_object_set(out, "type", ast_json_string_create("ChannelUserevent"));
2274 ast_json_object_set(out, "timestamp", ast_json_timeval(*tv, NULL));
2275 ast_json_object_set(out, "eventname", ast_json_ref(ast_json_object_get(blob, "eventname")));
2276 ast_json_object_set(out, "userevent", ast_json_ref(blob));
2277
2278 for (type = 0; type < STASIS_UMOS_MAX; ++type) {
2279 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
2280 struct ast_json *json_object = NULL;
2281 char *name = NULL;
2282 void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
2283
2284 switch (type) {
2286 json_object = ast_channel_snapshot_to_json(snapshot, sanitize);
2287 name = "channel";
2288 break;
2289 case STASIS_UMOS_BRIDGE:
2290 json_object = ast_bridge_snapshot_to_json(snapshot, sanitize);
2291 name = "bridge";
2292 break;
2294 json_object = ast_endpoint_snapshot_to_json(snapshot, sanitize);
2295 name = "endpoint";
2296 break;
2297 }
2298 if (json_object) {
2299 ast_json_object_set(out, name, json_object);
2300 }
2301 }
2302 }
2303
2304 return out;
2305}
2306
2307/*! \internal \brief convert multi object blob to ami string */
2308static struct ast_str *multi_object_blob_to_ami(void *obj)
2309{
2310 struct ast_str *ami_str=ast_str_create(1024);
2311 struct ast_str *ami_snapshot;
2312 const struct ast_multi_object_blob *multi = obj;
2314 int i;
2315
2316 if (!ami_str) {
2317 return NULL;
2318 }
2319 if (!multi) {
2320 ast_free(ami_str);
2321 return NULL;
2322 }
2323
2324 for (type = 0; type < STASIS_UMOS_MAX; ++type) {
2325 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
2326 char *name = NULL;
2327 void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
2328 ami_snapshot = NULL;
2329
2330 if (i > 0) {
2331 ast_asprintf(&name, "%d", i + 1);
2332 }
2333
2334 switch (type) {
2336 ami_snapshot = ast_manager_build_channel_state_string_prefix(snapshot, name ?: "");
2337 break;
2338
2339 case STASIS_UMOS_BRIDGE:
2340 ami_snapshot = ast_manager_build_bridge_state_string_prefix(snapshot, name ?: "");
2341 break;
2342
2344 /* currently not sending endpoint snapshots to AMI */
2345 break;
2346 }
2347 if (ami_snapshot) {
2348 ast_str_append(&ami_str, 0, "%s", ast_str_buffer(ami_snapshot));
2349 ast_free(ami_snapshot);
2350 }
2351 ast_free(name);
2352 }
2353 }
2354
2355 return ami_str;
2356}
2357
2358/*! \internal \brief Callback to pass only user defined parameters from blob */
2359static int userevent_exclusion_cb(const char *key)
2360{
2361 if (!strcmp("eventname", key)) {
2362 return 1;
2363 }
2364 return 0;
2365}
2366
2368 struct stasis_message *message)
2369{
2370 RAII_VAR(struct ast_str *, object_string, NULL, ast_free);
2371 RAII_VAR(struct ast_str *, body, NULL, ast_free);
2373 const char *eventname;
2374
2375 eventname = ast_json_string_get(ast_json_object_get(multi->blob, "eventname"));
2377 object_string = multi_object_blob_to_ami(multi);
2378 if (!object_string || !body) {
2379 return NULL;
2380 }
2381
2383 "%s"
2384 "UserEvent: %s\r\n"
2385 "%s",
2386 ast_str_buffer(object_string),
2387 eventname,
2388 ast_str_buffer(body));
2389}
2390
2391/*! \brief A structure to hold global configuration-related options */
2393 /*! The list of message types to decline */
2395};
2396
2397/*! \brief Taskpool configuration options */
2399 /*! Minimum size of the taskpool */
2401 /*! Initial size of the taskpool */
2403 /*! Time, in seconds, before we expire a taskprocessor */
2405 /*! Maximum number of taskprocessors to allow */
2407};
2408
2410 /*! Taskpool configuration options */
2412 /*! Declined message types */
2414};
2415
2417 .type = ACO_GLOBAL,
2418 .name = "threadpool",
2419 .item_offset = offsetof(struct stasis_config, taskpool_options),
2420 .category = "threadpool",
2421 .category_match = ACO_WHITELIST_EXACT,
2422};
2423
2424static struct aco_type taskpool_option = {
2425 .type = ACO_GLOBAL,
2426 .name = "taskpool",
2427 .item_offset = offsetof(struct stasis_config, taskpool_options),
2428 .category = "taskpool",
2429 .category_match = ACO_WHITELIST_EXACT,
2430};
2431
2433
2434/*! \brief An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type */
2435static struct aco_type declined_option = {
2436 .type = ACO_GLOBAL,
2437 .name = "declined_message_types",
2438 .item_offset = offsetof(struct stasis_config, declined_message_types),
2439 .category_match = ACO_WHITELIST_EXACT,
2440 .category = "declined_message_types",
2441};
2442
2444
2446 .filename = "stasis.conf",
2448};
2449
2450/*! \brief A global object container that will contain the stasis_config that gets swapped out on reloads */
2452
2453static void *stasis_config_alloc(void);
2454
2455/*! \brief Register information about the configs being processed by this module */
2457 .files = ACO_FILES(&stasis_conf),
2458);
2459
2461{
2462 struct stasis_declined_config *declined = obj;
2463
2464 ao2_cleanup(declined->declined);
2465}
2466
2467static void stasis_config_destructor(void *obj)
2468{
2469 struct stasis_config *cfg = obj;
2470
2473}
2474
2475static void *stasis_config_alloc(void)
2476{
2477 struct stasis_config *cfg;
2478
2479 if (!(cfg = ao2_alloc(sizeof(*cfg), stasis_config_destructor))) {
2480 return NULL;
2481 }
2482
2483 cfg->taskpool_options = ast_calloc(1, sizeof(*cfg->taskpool_options));
2484 if (!cfg->taskpool_options) {
2485 ao2_ref(cfg, -1);
2486 return NULL;
2487 }
2488
2491 if (!cfg->declined_message_types) {
2492 ao2_ref(cfg, -1);
2493 return NULL;
2494 }
2495
2497 if (!cfg->declined_message_types->declined) {
2498 ao2_ref(cfg, -1);
2499 return NULL;
2500 }
2501
2502 return cfg;
2503}
2504
2506{
2508 char *name_in_declined;
2509 int res;
2510
2511 if (!cfg || !cfg->declined_message_types) {
2512 ao2_cleanup(cfg);
2513 return 0;
2514 }
2515
2516 name_in_declined = ao2_find(cfg->declined_message_types->declined, name, OBJ_SEARCH_KEY);
2517 res = name_in_declined ? 1 : 0;
2518 ao2_cleanup(name_in_declined);
2519 ao2_ref(cfg, -1);
2520 if (res) {
2521 ast_debug(4, "Declining to allocate Stasis message type '%s' due to configuration\n", name);
2522 }
2523 return res;
2524}
2525
2526static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
2527{
2528 struct stasis_declined_config *declined = obj;
2529
2530 if (ast_strlen_zero(var->value)) {
2531 return 0;
2532 }
2533
2534 if (ast_str_container_add(declined->declined, var->value)) {
2535 return -1;
2536 }
2537
2538 return 0;
2539}
2540
2541/*!
2542 * @{ \brief Define multi user event message type(s).
2543 */
2544
2546 .to_json = multi_user_event_to_json,
2548 );
2549
2550/*! @} */
2551
2552/*!
2553 * \internal
2554 * \brief CLI command implementation for 'stasis show topics'
2555 */
2556static char *stasis_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2557{
2558 struct ao2_iterator iter;
2559 struct topic_proxy *topic;
2560 struct ao2_container *tmp_container;
2561 int count = 0;
2562#define FMT_HEADERS "%-64s %-64s\n"
2563#define FMT_FIELDS "%-64s %-64s\n"
2564
2565 switch (cmd) {
2566 case CLI_INIT:
2567 e->command = "stasis show topics";
2568 e->usage =
2569 "Usage: stasis show topics\n"
2570 " Shows a list of topics\n";
2571 return NULL;
2572 case CLI_GENERATE:
2573 return NULL;
2574 }
2575
2576 if (a->argc != e->args) {
2577 return CLI_SHOWUSAGE;
2578 }
2579
2580 ast_cli(a->fd, "\n" FMT_HEADERS, "Name", "Detail");
2581
2583 topic_proxy_sort_fn, NULL);
2584
2585 if (!tmp_container || ao2_container_dup(tmp_container, topic_all, 0)) {
2586 ao2_cleanup(tmp_container);
2587
2588 return NULL;
2589 }
2590
2591 /* getting all topic in order */
2592 iter = ao2_iterator_init(tmp_container, AO2_ITERATOR_UNLINK);
2593 while ((topic = ao2_iterator_next(&iter))) {
2594 ast_cli(a->fd, FMT_FIELDS, topic->name, topic->detail);
2595 ao2_ref(topic, -1);
2596 ++count;
2597 }
2598 ao2_iterator_destroy(&iter);
2599 ao2_cleanup(tmp_container);
2600
2601 ast_cli(a->fd, "\n%d Total topics\n\n", count);
2602
2603#undef FMT_HEADERS
2604#undef FMT_FIELDS
2605
2606 return CLI_SUCCESS;
2607}
2608
2609/*!
2610 * \internal
2611 * \brief CLI tab completion for topic names
2612 */
2613static char *topic_complete_name(const char *word)
2614{
2615 struct topic_proxy *topic;
2616 struct ao2_iterator it;
2617 int wordlen = strlen(word);
2618 int ret;
2619
2621 while ((topic = ao2_iterator_next(&it))) {
2622 if (!strncasecmp(word, topic->name, wordlen)) {
2623 ret = ast_cli_completion_add(ast_strdup(topic->name));
2624 if (ret) {
2625 ao2_ref(topic, -1);
2626 break;
2627 }
2628 }
2629 ao2_ref(topic, -1);
2630 }
2632 return NULL;
2633}
2634
2635/*!
2636 * \internal
2637 * \brief CLI command implementation for 'stasis show topic'
2638 */
2639static char *stasis_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2640{
2641 struct stasis_topic *topic;
2642 char print_time[32];
2643 int i;
2644
2645 switch (cmd) {
2646 case CLI_INIT:
2647 e->command = "stasis show topic";
2648 e->usage =
2649 "Usage: stasis show topic <name>\n"
2650 " Show stasis topic detail info.\n";
2651 return NULL;
2652 case CLI_GENERATE:
2653 if (a->pos == 3) {
2654 return topic_complete_name(a->word);
2655 } else {
2656 return NULL;
2657 }
2658 }
2659
2660 if (a->argc != 4) {
2661 return CLI_SHOWUSAGE;
2662 }
2663
2664 topic = stasis_topic_get(a->argv[3]);
2665 if (!topic) {
2666 ast_cli(a->fd, "Specified topic '%s' does not exist\n", a->argv[3]);
2667 return CLI_FAILURE;
2668 }
2669
2670 ast_cli(a->fd, "Name: %s\n", topic->name);
2671 ast_cli(a->fd, "Detail: %s\n", topic->detail);
2672 ast_cli(a->fd, "Subscribers count: %zu\n", AST_VECTOR_SIZE(&topic->subscribers));
2673 ast_cli(a->fd, "Forwarding topic count: %zu\n", AST_VECTOR_SIZE(&topic->upstream_topics));
2674 ast_format_duration_hh_mm_ss(ast_tvnow().tv_sec - topic->creationtime->tv_sec, print_time, sizeof(print_time));
2675 ast_cli(a->fd, "Duration time: %s\n", print_time);
2676
2677 ao2_lock(topic);
2678 ast_cli(a->fd, "\nSubscribers:\n");
2679 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); i++) {
2680 struct stasis_subscription *subscription_tmp = AST_VECTOR_GET(&topic->subscribers, i);
2681 ast_cli(a->fd, " UniqueID: %s, Topic: %s, Detail: %s\n",
2682 subscription_tmp->uniqueid, subscription_tmp->topic->name, subscription_tmp->topic->detail);
2683 }
2684
2685 ast_cli(a->fd, "\nForwarded topics:\n");
2686 for (i = 0; i < AST_VECTOR_SIZE(&topic->upstream_topics); i++) {
2687 struct stasis_topic *topic_tmp = AST_VECTOR_GET(&topic->upstream_topics, i);
2688 ast_cli(a->fd, " Topic: %s, Detail: %s\n", topic_tmp->name, topic_tmp->detail);
2689 }
2690 ao2_unlock(topic);
2691
2692 ao2_ref(topic, -1);
2693
2694 return CLI_SUCCESS;
2695}
2696
2697
2698static struct ast_cli_entry cli_stasis[] = {
2699 AST_CLI_DEFINE(stasis_show_topics, "Show all topics"),
2700 AST_CLI_DEFINE(stasis_show_topic, "Show topic"),
2701};
2702
2703
2704#ifdef AST_DEVMODE
2705
2706AO2_STRING_FIELD_SORT_FN(stasis_subscription_statistics, uniqueid);
2707
2708/*!
2709 * \internal
2710 * \brief CLI command implementation for 'stasis statistics show subscriptions'
2711 */
2712static char *statistics_show_subscriptions(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2713{
2714 struct ao2_container *sorted_subscriptions;
2715 struct ao2_container *subscription_stats;
2716 struct ao2_iterator iter;
2717 struct stasis_subscription_statistics *statistics;
2718 int count = 0;
2719 int dropped = 0;
2720 int passed = 0;
2721#define FMT_HEADERS "%-64s %10s %10s %16s %16s\n"
2722#define FMT_FIELDS "%-64s %10d %10d %16ld %16ld\n"
2723#define FMT_FIELDS2 "%-64s %10d %10d\n"
2724
2725 switch (cmd) {
2726 case CLI_INIT:
2727 e->command = "stasis statistics show subscriptions";
2728 e->usage =
2729 "Usage: stasis statistics show subscriptions\n"
2730 " Shows a list of subscriptions and their general statistics\n";
2731 return NULL;
2732 case CLI_GENERATE:
2733 return NULL;
2734 }
2735
2736 if (a->argc != e->args) {
2737 return CLI_SHOWUSAGE;
2738 }
2739
2740 subscription_stats = ao2_global_obj_ref(subscription_statistics);
2741 if (!subscription_stats) {
2742 ast_cli(a->fd, "Could not fetch subscription_statistics container\n");
2743 return CLI_FAILURE;
2744 }
2745
2746 sorted_subscriptions = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_NOLOCK, 0,
2747 stasis_subscription_statistics_sort_fn, NULL);
2748 if (!sorted_subscriptions) {
2749 ao2_ref(subscription_stats, -1);
2750 ast_cli(a->fd, "Could not create container for sorting subscription statistics\n");
2751 return CLI_SUCCESS;
2752 }
2753
2754 if (ao2_container_dup(sorted_subscriptions, subscription_stats, 0)) {
2755 ao2_ref(sorted_subscriptions, -1);
2756 ao2_ref(subscription_stats, -1);
2757 ast_cli(a->fd, "Could not sort subscription statistics\n");
2758 return CLI_SUCCESS;
2759 }
2760
2761 ao2_ref(subscription_stats, -1);
2762
2763 ast_cli(a->fd, "\n" FMT_HEADERS, "Subscription", "Dropped", "Passed", "Lowest Invoke", "Highest Invoke");
2764
2765 iter = ao2_iterator_init(sorted_subscriptions, 0);
2766 while ((statistics = ao2_iterator_next(&iter))) {
2767 ast_cli(a->fd, FMT_FIELDS, statistics->uniqueid, statistics->messages_dropped, statistics->messages_passed,
2768 statistics->lowest_time_invoked, statistics->highest_time_invoked);
2769 dropped += statistics->messages_dropped;
2770 passed += statistics->messages_passed;
2771 ao2_ref(statistics, -1);
2772 ++count;
2773 }
2774 ao2_iterator_destroy(&iter);
2775
2776 ao2_ref(sorted_subscriptions, -1);
2777
2778 ast_cli(a->fd, FMT_FIELDS2, "Total", dropped, passed);
2779 ast_cli(a->fd, "\n%d subscriptions\n\n", count);
2780
2781#undef FMT_HEADERS
2782#undef FMT_FIELDS
2783#undef FMT_FIELDS2
2784
2785 return CLI_SUCCESS;
2786}
2787
2788/*!
2789 * \internal
2790 * \brief CLI tab completion for subscription statistics names
2791 */
2792static char *subscription_statistics_complete_name(const char *word, int state)
2793{
2794 struct stasis_subscription_statistics *statistics;
2795 struct ao2_container *subscription_stats;
2796 struct ao2_iterator it_statistics;
2797 int wordlen = strlen(word);
2798 int which = 0;
2799 char *result = NULL;
2800
2801 subscription_stats = ao2_global_obj_ref(subscription_statistics);
2802 if (!subscription_stats) {
2803 return result;
2804 }
2805
2806 it_statistics = ao2_iterator_init(subscription_stats, 0);
2807 while ((statistics = ao2_iterator_next(&it_statistics))) {
2808 if (!strncasecmp(word, statistics->uniqueid, wordlen)
2809 && ++which > state) {
2810 result = ast_strdup(statistics->uniqueid);
2811 }
2812 ao2_ref(statistics, -1);
2813 if (result) {
2814 break;
2815 }
2816 }
2817 ao2_iterator_destroy(&it_statistics);
2818 ao2_ref(subscription_stats, -1);
2819 return result;
2820}
2821
2822/*!
2823 * \internal
2824 * \brief CLI command implementation for 'stasis statistics show subscription'
2825 */
2826static char *statistics_show_subscription(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2827{
2828 struct stasis_subscription_statistics *statistics;
2829 struct ao2_container *subscription_stats;
2830 struct ao2_iterator i;
2831 char *name;
2832
2833 switch (cmd) {
2834 case CLI_INIT:
2835 e->command = "stasis statistics show subscription";
2836 e->usage =
2837 "Usage: stasis statistics show subscription <uniqueid>\n"
2838 " Show stasis subscription statistics.\n";
2839 return NULL;
2840 case CLI_GENERATE:
2841 if (a->pos == 4) {
2842 return subscription_statistics_complete_name(a->word, a->n);
2843 } else {
2844 return NULL;
2845 }
2846 }
2847
2848 if (a->argc != 5) {
2849 return CLI_SHOWUSAGE;
2850 }
2851
2852 subscription_stats = ao2_global_obj_ref(subscription_statistics);
2853 if (!subscription_stats) {
2854 ast_cli(a->fd, "Could not fetch subscription_statistics container\n");
2855 return CLI_FAILURE;
2856 }
2857
2858 statistics = ao2_find(subscription_stats, a->argv[4], OBJ_SEARCH_KEY);
2859 if (!statistics) {
2860 ao2_ref(subscription_stats, -1);
2861 ast_cli(a->fd, "Specified subscription '%s' does not exist\n", a->argv[4]);
2862 return CLI_FAILURE;
2863 }
2864
2865 ao2_ref(subscription_stats, -1);
2866
2867 ast_cli(a->fd, "Subscription: %s\n", statistics->uniqueid);
2868 ast_cli(a->fd, "Pointer Address: %p\n", statistics->sub);
2869 ast_cli(a->fd, "Source filename: %s\n", S_OR(statistics->file, "<unavailable>"));
2870 ast_cli(a->fd, "Source line number: %d\n", statistics->lineno);
2871 ast_cli(a->fd, "Source function: %s\n", S_OR(statistics->func, "<unavailable>"));
2872 ast_cli(a->fd, "Number of messages dropped due to filtering: %d\n", statistics->messages_dropped);
2873 ast_cli(a->fd, "Number of messages passed to subscriber callback: %d\n", statistics->messages_passed);
2874 ast_cli(a->fd, "Using mailbox to queue messages: %s\n", statistics->uses_mailbox ? "Yes" : "No");
2875 ast_cli(a->fd, "Using stasis taskpool for handling messages: %s\n", statistics->uses_taskpool ? "Yes" : "No");
2876 ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->lowest_time_invoked);
2877 ast_cli(a->fd, "Highest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->highest_time_invoked);
2878
2880 if (statistics->highest_time_message_type) {
2881 ast_cli(a->fd, "Offender message type for highest invoking time: %s\n", stasis_message_type_name(statistics->highest_time_message_type));
2882 }
2884
2885 ast_cli(a->fd, "Number of topics: %d\n", ao2_container_count(statistics->topics));
2886
2887 ast_cli(a->fd, "Subscribed topics:\n");
2888 i = ao2_iterator_init(statistics->topics, 0);
2889 while ((name = ao2_iterator_next(&i))) {
2890 ast_cli(a->fd, "\t%s\n", name);
2891 ao2_ref(name, -1);
2892 }
2894
2895 ao2_ref(statistics, -1);
2896
2897 return CLI_SUCCESS;
2898}
2899
2900AO2_STRING_FIELD_SORT_FN(stasis_topic_statistics, name);
2901
2902/*!
2903 * \internal
2904 * \brief CLI command implementation for 'stasis statistics show topics'
2905 */
2906static char *statistics_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2907{
2908 struct ao2_container *sorted_topics;
2909 struct ao2_container *topic_stats;
2910 struct ao2_iterator iter;
2911 struct stasis_topic_statistics *statistics;
2912 int count = 0;
2913 int not_dispatched = 0;
2914 int dispatched = 0;
2915#define FMT_HEADERS "%-64s %10s %10s %10s %16s %16s\n"
2916#define FMT_FIELDS "%-64s %10d %10d %10d %16ld %16ld\n"
2917#define FMT_FIELDS2 "%-64s %10s %10d %10d\n"
2918
2919 switch (cmd) {
2920 case CLI_INIT:
2921 e->command = "stasis statistics show topics";
2922 e->usage =
2923 "Usage: stasis statistics show topics\n"
2924 " Shows a list of topics and their general statistics\n";
2925 return NULL;
2926 case CLI_GENERATE:
2927 return NULL;
2928 }
2929
2930 if (a->argc != e->args) {
2931 return CLI_SHOWUSAGE;
2932 }
2933
2934 topic_stats = ao2_global_obj_ref(topic_statistics);
2935 if (!topic_stats) {
2936 ast_cli(a->fd, "Could not fetch topic_statistics container\n");
2937 return CLI_FAILURE;
2938 }
2939
2941 stasis_topic_statistics_sort_fn, NULL);
2942 if (!sorted_topics) {
2943 ao2_ref(topic_stats, -1);
2944 ast_cli(a->fd, "Could not create container for sorting topic statistics\n");
2945 return CLI_SUCCESS;
2946 }
2947
2948 if (ao2_container_dup(sorted_topics, topic_stats, 0)) {
2949 ao2_ref(sorted_topics, -1);
2950 ao2_ref(topic_stats, -1);
2951 ast_cli(a->fd, "Could not sort topic statistics\n");
2952 return CLI_SUCCESS;
2953 }
2954
2955 ao2_ref(topic_stats, -1);
2956
2957 ast_cli(a->fd, "\n" FMT_HEADERS, "Topic", "Subscribers", "Dropped", "Dispatched", "Lowest Dispatch", "Highest Dispatch");
2958
2959 iter = ao2_iterator_init(sorted_topics, 0);
2960 while ((statistics = ao2_iterator_next(&iter))) {
2962 statistics->messages_not_dispatched, statistics->messages_dispatched,
2963 statistics->lowest_time_dispatched, statistics->highest_time_dispatched);
2964 not_dispatched += statistics->messages_not_dispatched;
2965 dispatched += statistics->messages_dispatched;
2966 ao2_ref(statistics, -1);
2967 ++count;
2968 }
2969 ao2_iterator_destroy(&iter);
2970
2971 ao2_ref(sorted_topics, -1);
2972
2973 ast_cli(a->fd, FMT_FIELDS2, "Total", "", not_dispatched, dispatched);
2974 ast_cli(a->fd, "\n%d topics\n\n", count);
2975
2976#undef FMT_HEADERS
2977#undef FMT_FIELDS
2978#undef FMT_FIELDS2
2979
2980 return CLI_SUCCESS;
2981}
2982
2983/*!
2984 * \internal
2985 * \brief CLI tab completion for topic statistics names
2986 */
2987static char *topic_statistics_complete_name(const char *word, int state)
2988{
2989 struct stasis_topic_statistics *statistics;
2990 struct ao2_container *topic_stats;
2991 struct ao2_iterator it_statistics;
2992 int wordlen = strlen(word);
2993 int which = 0;
2994 char *result = NULL;
2995
2996 topic_stats = ao2_global_obj_ref(topic_statistics);
2997 if (!topic_stats) {
2998 return result;
2999 }
3000
3001 it_statistics = ao2_iterator_init(topic_stats, 0);
3002 while ((statistics = ao2_iterator_next(&it_statistics))) {
3003 if (!strncasecmp(word, statistics->name, wordlen)
3004 && ++which > state) {
3005 result = ast_strdup(statistics->name);
3006 }
3007 ao2_ref(statistics, -1);
3008 if (result) {
3009 break;
3010 }
3011 }
3012 ao2_iterator_destroy(&it_statistics);
3013 ao2_ref(topic_stats, -1);
3014 return result;
3015}
3016
3017/*!
3018 * \internal
3019 * \brief CLI command implementation for 'stasis statistics show topic'
3020 */
3021static char *statistics_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
3022{
3023 struct stasis_topic_statistics *statistics;
3024 struct ao2_container *topic_stats;
3025 struct ao2_iterator i;
3026 char *uniqueid;
3027
3028 switch (cmd) {
3029 case CLI_INIT:
3030 e->command = "stasis statistics show topic";
3031 e->usage =
3032 "Usage: stasis statistics show topic <name>\n"
3033 " Show stasis topic statistics.\n";
3034 return NULL;
3035 case CLI_GENERATE:
3036 if (a->pos == 4) {
3037 return topic_statistics_complete_name(a->word, a->n);
3038 } else {
3039 return NULL;
3040 }
3041 }
3042
3043 if (a->argc != 5) {
3044 return CLI_SHOWUSAGE;
3045 }
3046
3047 topic_stats = ao2_global_obj_ref(topic_statistics);
3048 if (!topic_stats) {
3049 ast_cli(a->fd, "Could not fetch topic_statistics container\n");
3050 return CLI_FAILURE;
3051 }
3052
3053 statistics = ao2_find(topic_stats, a->argv[4], OBJ_SEARCH_KEY);
3054 if (!statistics) {
3055 ao2_ref(topic_stats, -1);
3056 ast_cli(a->fd, "Specified topic '%s' does not exist\n", a->argv[4]);
3057 return CLI_FAILURE;
3058 }
3059
3060 ao2_ref(topic_stats, -1);
3061
3062 ast_cli(a->fd, "Topic: %s\n", statistics->name);
3063 ast_cli(a->fd, "Pointer Address: %p\n", statistics->topic);
3064 ast_cli(a->fd, "Number of messages published that went to no subscriber: %d\n", statistics->messages_not_dispatched);
3065 ast_cli(a->fd, "Number of messages that went to at least one subscriber: %d\n", statistics->messages_dispatched);
3066 ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent dispatching message: %ld\n", statistics->lowest_time_dispatched);
3067 ast_cli(a->fd, "Highest amount of time (in milliseconds) spent dispatching messages: %ld\n", statistics->highest_time_dispatched);
3068 ast_cli(a->fd, "Number of subscribers: %d\n", ao2_container_count(statistics->subscribers));
3069
3070 ast_cli(a->fd, "Subscribers:\n");
3071 i = ao2_iterator_init(statistics->subscribers, 0);
3072 while ((uniqueid = ao2_iterator_next(&i))) {
3073 ast_cli(a->fd, "\t%s\n", uniqueid);
3074 ao2_ref(uniqueid, -1);
3075 }
3077
3078 ao2_ref(statistics, -1);
3079
3080 return CLI_SUCCESS;
3081}
3082
3083/*!
3084 * \internal
3085 * \brief CLI command implementation for 'stasis statistics show messages'
3086 */
3087static char *statistics_show_messages(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
3088{
3089 int i;
3090 int count = 0;
3091 int published = 0;
3092 int unused = 0;
3093#define FMT_HEADERS "%-64s %10s %10s\n"
3094#define FMT_FIELDS "%-64s %10d %10d\n"
3095
3096 switch (cmd) {
3097 case CLI_INIT:
3098 e->command = "stasis statistics show messages";
3099 e->usage =
3100 "Usage: stasis statistics show messages\n"
3101 " Shows a list of message types and their general statistics\n";
3102 return NULL;
3103 case CLI_GENERATE:
3104 return NULL;
3105 }
3106
3107 if (a->argc != e->args) {
3108 return CLI_SHOWUSAGE;
3109 }
3110
3111 ast_cli(a->fd, "\n" FMT_HEADERS, "Message Type", "Published", "Unused");
3112
3113 ast_mutex_lock(&message_type_statistics_lock);
3114 for (i = 0; i < AST_VECTOR_SIZE(&message_type_statistics); ++i) {
3115 struct stasis_message_type_statistics *statistics = AST_VECTOR_GET_ADDR(&message_type_statistics, i);
3116
3117 if (!statistics->message_type) {
3118 continue;
3119 }
3120
3121 ast_cli(a->fd, FMT_FIELDS, stasis_message_type_name(statistics->message_type), statistics->published,
3122 statistics->unused);
3123 published += statistics->published;
3124 unused += statistics->unused;
3125 ++count;
3126 }
3127 ast_mutex_unlock(&message_type_statistics_lock);
3128
3129 ast_cli(a->fd, FMT_FIELDS, "Total", published, unused);
3130 ast_cli(a->fd, "\n%d seen message types\n\n", count);
3131
3132#undef FMT_HEADERS
3133#undef FMT_FIELDS
3134
3135 return CLI_SUCCESS;
3136}
3137
3138static struct ast_cli_entry cli_stasis_statistics[] = {
3139 AST_CLI_DEFINE(statistics_show_subscriptions, "Show subscriptions with general statistics"),
3140 AST_CLI_DEFINE(statistics_show_subscription, "Show subscription statistics"),
3141 AST_CLI_DEFINE(statistics_show_topics, "Show topics with general statistics"),
3142 AST_CLI_DEFINE(statistics_show_topic, "Show topic statistics"),
3143 AST_CLI_DEFINE(statistics_show_messages, "Show message types with general statistics"),
3144};
3145
3146static int subscription_statistics_hash(const void *obj, const int flags)
3147{
3148 const struct stasis_subscription_statistics *object;
3149 const char *key;
3150
3151 switch (flags & OBJ_SEARCH_MASK) {
3152 case OBJ_SEARCH_KEY:
3153 key = obj;
3154 break;
3155 case OBJ_SEARCH_OBJECT:
3156 object = obj;
3157 key = object->uniqueid;
3158 break;
3159 default:
3160 /* Hash can only work on something with a full key. */
3161 ast_assert(0);
3162 return 0;
3163 }
3164 return ast_str_case_hash(key);
3165}
3166
3167static int subscription_statistics_cmp(void *obj, void *arg, int flags)
3168{
3169 const struct stasis_subscription_statistics *object_left = obj;
3170 const struct stasis_subscription_statistics *object_right = arg;
3171 const char *right_key = arg;
3172 int cmp;
3173
3174 switch (flags & OBJ_SEARCH_MASK) {
3175 case OBJ_SEARCH_OBJECT:
3176 right_key = object_right->uniqueid;
3177 /* Fall through */
3178 case OBJ_SEARCH_KEY:
3179 cmp = strcasecmp(object_left->uniqueid, right_key);
3180 break;
3182 /* Not supported by container */
3183 ast_assert(0);
3184 cmp = -1;
3185 break;
3186 default:
3187 /*
3188 * What arg points to is specific to this traversal callback
3189 * and has no special meaning to astobj2.
3190 */
3191 cmp = 0;
3192 break;
3193 }
3194 if (cmp) {
3195 return 0;
3196 }
3197 /*
3198 * At this point the traversal callback is identical to a sorted
3199 * container.
3200 */
3201 return CMP_MATCH;
3202}
3203
3204static int topic_statistics_hash(const void *obj, const int flags)
3205{
3206 const struct stasis_topic_statistics *object;
3207 const char *key;
3208
3209 switch (flags & OBJ_SEARCH_MASK) {
3210 case OBJ_SEARCH_KEY:
3211 key = obj;
3212 break;
3213 case OBJ_SEARCH_OBJECT:
3214 object = obj;
3215 key = object->name;
3216 break;
3217 default:
3218 /* Hash can only work on something with a full key. */
3219 ast_assert(0);
3220 return 0;
3221 }
3222 return ast_str_case_hash(key);
3223}
3224
3225static int topic_statistics_cmp(void *obj, void *arg, int flags)
3226{
3227 const struct stasis_topic_statistics *object_left = obj;
3228 const struct stasis_topic_statistics *object_right = arg;
3229 const char *right_key = arg;
3230 int cmp;
3231
3232 switch (flags & OBJ_SEARCH_MASK) {
3233 case OBJ_SEARCH_OBJECT:
3234 right_key = object_right->name;
3235 /* Fall through */
3236 case OBJ_SEARCH_KEY:
3237 cmp = strcasecmp(object_left->name, right_key);
3238 break;
3240 /* Not supported by container */
3241 ast_assert(0);
3242 cmp = -1;
3243 break;
3244 default:
3245 /*
3246 * What arg points to is specific to this traversal callback
3247 * and has no special meaning to astobj2.
3248 */
3249 cmp = 0;
3250 break;
3251 }
3252 if (cmp) {
3253 return 0;
3254 }
3255 /*
3256 * At this point the traversal callback is identical to a sorted
3257 * container.
3258 */
3259 return CMP_MATCH;
3260}
3261#endif
3262
3263/*! \brief Cleanup function for graceful shutdowns */
3264static void stasis_cleanup(void)
3265{
3266#ifdef AST_DEVMODE
3267 ast_cli_unregister_multiple(cli_stasis_statistics, ARRAY_LEN(cli_stasis_statistics));
3268 AST_VECTOR_FREE(&message_type_statistics);
3269 ao2_global_obj_release(subscription_statistics);
3270 ao2_global_obj_release(topic_statistics);
3271#endif
3274 topic_all = NULL;
3276 taskpool = NULL;
3279 aco_info_destroy(&cfg_info);
3281}
3282
3284{
3285 struct stasis_config *cfg;
3286 int cache_init;
3287 struct ast_taskpool_options taskpool_opts = { 0, };
3288#ifdef AST_DEVMODE
3289 struct ao2_container *subscription_stats;
3290 struct ao2_container *topic_stats;
3291#endif
3292
3293 /* Be sure the types are cleaned up after the message bus */
3295
3296 if (aco_info_init(&cfg_info)) {
3297 return -1;
3298 }
3299
3300 aco_option_register_custom(&cfg_info, "decline", ACO_EXACT,
3302 aco_option_register(&cfg_info, "minimum_size", ACO_EXACT,
3304 FLDSET(struct stasis_taskpool_conf, minimum_size), 0,
3305 INT_MAX);
3306 aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
3308 FLDSET(struct stasis_taskpool_conf, initial_size), 0,
3309 INT_MAX);
3310 aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
3312 FLDSET(struct stasis_taskpool_conf, idle_timeout_sec), 0,
3313 INT_MAX);
3314 aco_option_register(&cfg_info, "max_size", ACO_EXACT,
3316 FLDSET(struct stasis_taskpool_conf, max_size), 0,
3317 INT_MAX);
3318
3319 if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
3320 struct stasis_config *default_cfg = stasis_config_alloc();
3321
3322 if (!default_cfg) {
3323 return -1;
3324 }
3325
3326 if (aco_set_defaults(&taskpool_option, "taskpool", default_cfg->taskpool_options)) {
3327 ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
3328 ao2_ref(default_cfg, -1);
3329
3330 return -1;
3331 }
3332
3333 if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) {
3334 ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
3335 ao2_ref(default_cfg, -1);
3336
3337 return -1;
3338 }
3339
3340 ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n");
3342 cfg = default_cfg;
3343 } else {
3345 if (!cfg) {
3346 ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
3347
3348 return -1;
3349 }
3350 }
3351
3352 taskpool_opts.version = AST_TASKPOOL_OPTIONS_VERSION;
3353 taskpool_opts.minimum_size = cfg->taskpool_options->minimum_size;
3354 taskpool_opts.initial_size = cfg->taskpool_options->initial_size;
3355 taskpool_opts.auto_increment = 1;
3356 taskpool_opts.max_size = cfg->taskpool_options->max_size;
3357 taskpool_opts.idle_timeout = cfg->taskpool_options->idle_timeout_sec;
3358 taskpool = ast_taskpool_create("stasis", &taskpool_opts);
3359 ao2_ref(cfg, -1);
3360 if (!taskpool) {
3361 ast_log(LOG_ERROR, "Failed to create 'stasis-core' taskpool\n");
3362
3363 return -1;
3364 }
3365
3366 cache_init = stasis_cache_init();
3367 if (cache_init != 0) {
3368 return -1;
3369 }
3370
3372 return -1;
3373 }
3375 return -1;
3376 }
3377
3379 topic_proxy_hash_fn, 0, topic_proxy_cmp_fn);
3380 if (!topic_all) {
3381 return -1;
3382 }
3383
3385 return -1;
3386 }
3387
3388#ifdef AST_DEVMODE
3389 /* Statistics information is stored separately so that we don't alter or interrupt the lifetime of the underlying
3390 * topic or subscripton.
3391 */
3392 subscription_stats = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, SUBSCRIPTION_STATISTICS_BUCKETS,
3393 subscription_statistics_hash, 0, subscription_statistics_cmp);
3394 if (!subscription_stats) {
3395 return -1;
3396 }
3397 ao2_global_obj_replace_unref(subscription_statistics, subscription_stats);
3398 ao2_cleanup(subscription_stats);
3399
3400 topic_stats = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, TOPIC_STATISTICS_BUCKETS,
3401 topic_statistics_hash, 0, topic_statistics_cmp);
3402 if (!topic_stats) {
3403 return -1;
3404 }
3405 ao2_global_obj_replace_unref(topic_statistics, topic_stats);
3406 ao2_cleanup(topic_stats);
3407 if (!topic_stats) {
3408 return -1;
3409 }
3410
3411 AST_VECTOR_INIT(&message_type_statistics, 0);
3412
3413 if (ast_cli_register_multiple(cli_stasis_statistics, ARRAY_LEN(cli_stasis_statistics))) {
3414 return -1;
3415 }
3416#endif
3417
3418 return 0;
3419}
void ast_cli_unregister_multiple(void)
Definition ael_main.c:408
#define var
Definition ast_expr2f.c:605
Asterisk main include file. File version handling, generic pbx functions.
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
Definition clicompat.c:19
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
Definition astmm.h:288
#define ast_free(a)
Definition astmm.h:180
#define ast_strdup(str)
A wrapper for strdup()
Definition astmm.h:241
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition astmm.h:267
#define ast_calloc(num, len)
A wrapper for calloc()
Definition astmm.h:202
#define ast_log
Definition astobj2.c:42
int ao2_container_dup(struct ao2_container *dest, struct ao2_container *src, enum search_flags flags)
Copy all object references in the src container into the dest container.
int ao2_weakproxy_subscribe(void *weakproxy, ao2_weakproxy_notification_cb cb, void *data, int flags)
Request notification when weakproxy points to NULL.
Definition astobj2.c:934
#define ao2_iterator_next(iter)
Definition astobj2.h:1911
#define AO2_GLOBAL_OBJ_STATIC(name)
Define a global object holder to be used to hold an ao2 object, statically initialized.
Definition astobj2.h:847
#define ao2_link(container, obj)
Add an object to a container.
Definition astobj2.h:1532
@ CMP_MATCH
Definition astobj2.h:1027
@ AO2_ALLOC_OPT_LOCK_NOLOCK
Definition astobj2.h:367
@ AO2_ALLOC_OPT_LOCK_MUTEX
Definition astobj2.h:363
#define ao2_wrlock(a)
Definition astobj2.h:719
#define ao2_global_obj_replace_unref(holder, obj)
Replace an ao2 object in the global holder, throwing away any old object.
Definition astobj2.h:901
void ao2_container_unregister(const char *name)
Unregister a container for CLI stats and integrity check.
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
#define AO2_STRING_FIELD_CMP_FN(stype, field)
Creates a compare function for a structure string field.
Definition astobj2.h:2048
#define ao2_cleanup(obj)
Definition astobj2.h:1934
#define ao2_unlink(container, obj)
Remove an object from a container.
Definition astobj2.h:1578
@ AO2_ITERATOR_UNLINK
Definition astobj2.h:1863
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
Definition astobj2.h:1554
#define ao2_global_obj_ref(holder)
Get a reference to the object stored in the global holder.
Definition astobj2.h:918
#define AO2_STRING_FIELD_CASE_SORT_FN(stype, field)
Definition astobj2.h:2066
#define ao2_find(container, arg, flags)
Definition astobj2.h:1736
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
#define ao2_t_weakproxy_alloc(data_size, destructor_fn, tag)
Definition astobj2.h:553
#define AO2_STRING_FIELD_SORT_FN(stype, field)
Creates a sort function for a structure string field.
Definition astobj2.h:2064
#define ao2_container_alloc_rbtree(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a red-black tree container.
Definition astobj2.h:1349
#define ao2_unlock(a)
Definition astobj2.h:729
#define ao2_weakproxy_find(c, arg, flags, tag)
Perform an ao2_find on a container with ao2_weakproxy objects, returning the real object.
Definition astobj2.h:1748
#define ao2_lock(a)
Definition astobj2.h:717
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition astobj2.h:459
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition astobj2.h:404
int ao2_container_register(const char *name, struct ao2_container *self, ao2_prnt_obj_fn *prnt_obj)
Register a container for CLI stats and integrity check.
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
Definition astobj2.c:476
#define ao2_global_obj_release(holder)
Release the ao2 object held in the global holder.
Definition astobj2.h:859
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition astobj2.h:480
#define AO2_STRING_FIELD_HASH_FN(stype, field)
Creates a hash function for a structure string field.
Definition astobj2.h:2032
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
#define ao2_t_alloc(data_size, destructor_fn, debug_msg)
Definition astobj2.h:407
@ OBJ_SEARCH_PARTIAL_KEY
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
Definition astobj2.h:1116
@ OBJ_SEARCH_OBJECT
The arg parameter is an object of the same type.
Definition astobj2.h:1087
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
Definition astobj2.h:1063
@ OBJ_NODATA
Definition astobj2.h:1044
@ OBJ_SEARCH_MASK
Search option field mask.
Definition astobj2.h:1072
@ OBJ_UNLINK
Definition astobj2.h:1039
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
Definition astobj2.h:1101
#define ao2_alloc(data_size, destructor_fn)
Definition astobj2.h:409
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition astobj2.h:1303
void() ao2_prnt_fn(void *where, const char *fmt,...)
Print output.
Definition astobj2.h:1435
#define ao2_t_weakproxy_set_object(weakproxy, obj, flags, tag)
Definition astobj2.h:582
static PGresult * result
Definition cel_pgsql.c:84
static struct console_pvt globals
static const char type[]
struct stasis_topic * ast_channel_topic(struct ast_channel *chan)
A topic which publishes the events for a particular channel.
static struct ast_channel * callback(struct ast_channelstorage_instance *driver, ao2_callback_data_fn *cb_fn, void *arg, void *data, int ao2_flags, int rdlock)
Standard Command Line Interface.
#define CLI_SHOWUSAGE
Definition cli.h:45
#define CLI_SUCCESS
Definition cli.h:44
#define AST_CLI_DEFINE(fn, txt,...)
Definition cli.h:197
int ast_cli_completion_add(char *value)
Add a result to a request for completion options.
Definition main/cli.c:2845
void ast_cli(int fd, const char *fmt,...)
Definition clicompat.c:6
@ CLI_INIT
Definition cli.h:152
@ CLI_GENERATE
Definition cli.h:153
#define CLI_FAILURE
Definition cli.h:46
#define ast_cli_register_multiple(e, len)
Register multiple commands.
Definition cli.h:265
short word
Configuration option-handling.
@ ACO_EXACT
int aco_set_defaults(struct aco_type *type, const char *category, void *obj)
Set all default options of obj.
void aco_info_destroy(struct aco_info *info)
Destroy an initialized aco_info struct.
@ ACO_PROCESS_ERROR
Their was an error and no changes were applied.
int aco_info_init(struct aco_info *info)
Initialize an aco_info structure.
#define FLDSET(type,...)
Convert a struct and list of fields to an argument list of field offsets.
#define aco_option_register(info, name, matchtype, types, default_val, opt_type, flags,...)
Register a config option.
#define ACO_FILES(...)
@ OPT_INT_T
Type for default option handler for signed integers.
#define aco_option_register_custom(info, name, matchtype, types, default_val, handler, flags)
Register a config option.
@ ACO_GLOBAL
@ ACO_WHITELIST_EXACT
#define CONFIG_INFO_CORE(mod, name, arr, alloc,...)
enum aco_process_status aco_process_config(struct aco_info *info, int reload)
Process a config info via the options registered with an aco_info.
#define ACO_TYPES(...)
A helper macro to ensure that aco_info types always have a sentinel.
static const char name[]
Definition format_mp3.c:68
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
struct ast_str * ast_manager_str_from_json_object(struct ast_json *blob, key_exclusion_cb exclusion_cb)
Convert a JSON object into an AMI compatible string.
Definition manager.c:551
void ast_multi_object_blob_add(struct ast_multi_object_blob *multi, enum stasis_user_multi_object_snapshot_type type, void *object)
Add an object (snapshot) to the blob.
Definition stasis.c:2213
stasis_user_multi_object_snapshot_type
Object type code for multi user object snapshots.
Definition stasis.h:1384
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
struct ast_channel_snapshot * ast_channel_snapshot_create(struct ast_channel *chan)
Generate a snapshot of the channel state. This is an ao2 object, so ao2_cleanup() to deallocate.
void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob)
Publish single channel user event (for app_userevent compatibility)
Definition stasis.c:2222
struct ast_multi_object_blob * ast_multi_object_blob_create(struct ast_json *blob)
Create a stasis user event multi object blob.
Definition stasis.c:2187
struct stasis_message_type * ast_multi_user_event_type(void)
Message type for custom user defined events with multi object blobs.
#define STASIS_UMOS_MAX
Number of snapshot types.
Definition stasis.h:1391
@ STASIS_UMOS_ENDPOINT
Definition stasis.h:1387
@ STASIS_UMOS_BRIDGE
Definition stasis.h:1386
@ STASIS_UMOS_CHANNEL
Definition stasis.h:1385
#define ast_debug(level,...)
Log a DEBUG message.
#define LOG_ERROR
#define LOG_NOTICE
struct ast_json * ast_json_string_create(const char *value)
Construct a JSON string from value.
Definition json.c:278
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition json.c:73
struct ast_json * ast_json_object_create(void)
Create a new JSON object.
Definition json.c:399
struct ast_json * ast_json_timeval(const struct timeval tv, const char *zone)
Construct a timeval as JSON.
Definition json.c:670
struct ast_json * ast_json_ref(struct ast_json *value)
Increase refcount on value.
Definition json.c:67
int ast_json_object_set(struct ast_json *object, const char *key, struct ast_json *value)
Set a field in a JSON object.
Definition json.c:414
const char * ast_json_string_get(const struct ast_json *string)
Get the value of a JSON string.
Definition json.c:283
struct ast_json * ast_json_object_get(struct ast_json *object, const char *key)
Get a field from a JSON object.
Definition json.c:407
#define ast_cond_destroy(cond)
Definition lock.h:209
#define ast_cond_wait(cond, mutex)
Definition lock.h:212
#define ast_cond_init(cond, attr)
Definition lock.h:208
#define ast_mutex_init(pmutex)
Definition lock.h:193
#define ast_mutex_unlock(a)
Definition lock.h:197
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition lock.h:764
#define ast_cond_broadcast(cond)
Definition lock.h:211
pthread_cond_t ast_cond_t
Definition lock.h:185
#define ast_mutex_destroy(a)
Definition lock.h:195
#define ast_mutex_lock(a)
Definition lock.h:196
#define AST_MUTEX_DEFINE_STATIC(mutex)
Definition lock.h:527
#define ast_cond_signal(cond)
Definition lock.h:210
struct ast_str * ast_manager_build_bridge_state_string_prefix(const struct ast_bridge_snapshot *snapshot, const char *prefix)
Generate the AMI message body from a bridge snapshot.
struct ast_manager_event_blob * ast_manager_event_blob_create(int event_flags, const char *manager_event, const char *extra_fields_fmt,...)
Construct a ast_manager_event_blob.
Definition manager.c:10198
#define EVENT_FLAG_USER
Definition manager.h:81
struct ast_str * ast_manager_build_channel_state_string_prefix(const struct ast_channel_snapshot *snapshot, const char *prefix)
Generate the AMI message body from a channel snapshot.
static struct stasis_subscription * sub
Statsd channel stats. Exmaple of how to subscribe to Stasis events.
struct ao2_container * container
Definition res_fax.c:603
static void to_ami(struct ast_sip_subscription *sub, struct ast_str **buf)
#define NULL
Definition resample.c:96
static struct ast_str * multi_object_blob_to_ami(void *obj)
Definition stasis.c:2308
int stasis_subscription_is_done(struct stasis_subscription *subscription)
Returns whether subscription has received its final message.
Definition stasis.c:1196
static char * topic_complete_name(const char *word)
Definition stasis.c:2613
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *sub)
Cancel a subscription.
Definition stasis.c:1049
static struct ast_cli_entry cli_stasis[]
Definition stasis.c:2698
struct stasis_subscription * __stasis_subscribe_synchronous(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription whose callbacks occur synchronously on message publishing.
Definition stasis.c:1031
#define INITIAL_SUBSCRIBERS_MAX
Definition stasis.c:368
static struct ast_json * multi_user_event_to_json(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
Definition stasis.c:2257
static void subscription_change_dtor(void *obj)
Definition stasis.c:1698
void stasis_topic_pool_delete_topic(struct stasis_topic_pool *pool, const char *topic_name)
Delete a topic from the topic pool.
Definition stasis.c:1960
static struct ast_manager_event_blob * multi_user_event_to_ami(struct stasis_message *message)
Definition stasis.c:2367
int stasis_subscription_decline_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are not interested in a message type.
Definition stasis.c:1131
#define FMT_HEADERS
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition stasis.c:694
static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
Definition stasis.c:1364
#define FMT_FIELDS
int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription, long low_water, long high_water)
Set the high and low alert water marks of the stasis subscription.
Definition stasis.c:1089
struct stasis_subscription * internal_stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_taskpool, const char *file, int lineno, const char *func)
Create a subscription.
Definition stasis.c:923
struct stasis_topic * stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
Get a topic from the pool for the given name.
Definition stasis.c:1996
static int sub_cleanup(void *data)
Definition stasis.c:1042
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
Definition stasis.c:1626
static void forward_dtor(void *obj)
Definition stasis.c:1616
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition stasis.c:684
size_t stasis_topic_subscribers(const struct stasis_topic *topic)
Return the number of subscribers of a topic.
Definition stasis.c:710
static void topic_pool_dtor(void *obj)
Definition stasis.c:1840
void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
Publish a message to a topic's subscribers, synchronizing on the specified subscriber.
Definition stasis.c:1594
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
Definition stasis.c:1101
static void * stasis_config_alloc(void)
Definition stasis.c:2475
static struct topic_pool_entry * topic_pool_entry_alloc(const char *topic_name)
Definition stasis.c:1820
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
Definition stasis.c:1155
static void topic_dtor(void *obj)
Definition stasis.c:500
#define TOPIC_POOL_BUCKETS
Definition stasis.c:371
static struct stasis_subscription_change * subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
Definition stasis.c:1705
static void stasis_cleanup(void)
Cleanup function for graceful shutdowns.
Definition stasis.c:3264
struct stasis_topic * stasis_topic_get(const char *name)
Get a topic of the given name.
Definition stasis.c:689
int stasis_init(void)
Initialize the Stasis subsystem.
Definition stasis.c:3283
static void proxy_dtor(void *weakproxy, void *container)
Definition stasis.c:479
void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Stasis subscription callback function that does nothing.
Definition stasis.c:876
struct stasis_subscription * __stasis_subscribe_pool(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription whose callbacks occur on a task pool.
Definition stasis.c:1020
static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
Definition stasis.c:1880
static void stasis_config_destructor(void *obj)
Definition stasis.c:2467
void stasis_subscription_join(struct stasis_subscription *subscription)
Block until the last message is processed on a subscription.
Definition stasis.c:1183
int stasis_message_type_declined(const char *name)
Check whether a message type is declined.
Definition stasis.c:2505
static int topic_pool_entry_hash(const void *obj, const int flags)
Definition stasis.c:1859
void stasis_log_bad_type_access(const char *name)
Definition stasis.c:2152
static struct aco_type threadpool_option
Definition stasis.c:2416
static void multi_object_blob_dtor(void *obj)
Definition stasis.c:2171
static int link_topic_proxy(struct stasis_topic *topic, const char *name, const char *detail)
Definition stasis.c:567
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Add a subscriber to a topic.
Definition stasis.c:1279
static struct aco_type taskpool_option
Definition stasis.c:2424
static void subscription_dtor(void *obj)
Definition stasis.c:781
struct stasis_topic * stasis_topic_create_with_detail(const char *name, const char *detail)
Create a new topic with given detail.
Definition stasis.c:635
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
Definition stasis.c:1252
const char * stasis_topic_detail(const struct stasis_topic *topic)
Return the detail of a topic.
Definition stasis.c:702
struct stasis_subscription * stasis_unsubscribe_and_join(struct stasis_subscription *subscription)
Cancel a subscription, blocking until the last message is processed.
Definition stasis.c:1212
static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition stasis.c:1754
int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name)
Check if a topic exists in a pool.
Definition stasis.c:2139
static void publish_msg(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sync_sub)
Definition stasis.c:1508
static struct ast_taskpool * taskpool
Definition stasis.c:374
static struct aco_type * taskpool_options[]
Definition stasis.c:2432
struct aco_file stasis_conf
Definition stasis.c:2445
static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition stasis.c:1726
static void stasis_declined_config_destructor(void *obj)
Definition stasis.c:2460
static unsigned int dispatch_message(struct stasis_subscription *sub, struct stasis_message *message, int synchronous)
Definition stasis.c:1390
void stasis_subscription_accept_formatters(struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters)
Indicate to a subscription that we are interested in messages with one or more formatters.
Definition stasis.c:1171
const char * stasis_subscription_uniqueid(const struct stasis_subscription *sub)
Get the unique ID for the subscription.
Definition stasis.c:1247
struct stasis_topic_pool * stasis_topic_pool_create(struct stasis_topic *pooled_topic)
Create a topic pool that routes messages from dynamically generated topics to the given topic.
Definition stasis.c:1929
static void subscription_invoke(struct stasis_subscription *sub, struct stasis_message *message)
Invoke the subscription's callback.
Definition stasis.c:821
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition stasis.c:1656
struct stasis_subscription * __stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription.
Definition stasis.c:1009
static void topic_pool_entry_dtor(void *obj)
Definition stasis.c:1809
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
Definition stasis.c:1589
static int dispatch_exec_async(struct ast_taskprocessor_local *local)
Definition stasis.c:1337
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
Returns whether a subscription is currently subscribed.
Definition stasis.c:1228
static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
Definition stasis.c:2526
static int userevent_exclusion_cb(const char *key)
Definition stasis.c:2359
struct aco_type * declined_options[]
Definition stasis.c:2443
#define topic_lock_both(topic1, topic2)
Lock two topics.
Definition stasis.c:492
struct ao2_container * topic_all
Definition stasis.c:462
static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition stasis.c:1307
static char * stasis_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
Definition stasis.c:2556
#define TOPIC_ALL_BUCKETS
Definition stasis.c:384
static struct aco_type declined_option
An aco_type structure to link the "declined_message_types" category to the stasis_declined_config typ...
Definition stasis.c:2435
static char * stasis_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
Definition stasis.c:2639
Stasis Message Bus API. See Stasis Message Bus API for detailed documentation.
void(* stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Callback function type for Stasis subscriptions.
Definition stasis.h:611
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
Definition stasis.h:1546
stasis_subscription_message_filter
Stasis subscription message filters.
Definition stasis.h:294
@ STASIS_SUBSCRIPTION_FILTER_SELECTIVE
Definition stasis.h:297
@ STASIS_SUBSCRIPTION_FILTER_FORCED_NONE
Definition stasis.h:296
@ STASIS_SUBSCRIPTION_FILTER_NONE
Definition stasis.h:295
stasis_subscription_message_formatters
Stasis subscription formatter filters.
Definition stasis.h:308
@ STASIS_SUBSCRIPTION_FORMATTER_NONE
Definition stasis.h:309
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
Definition stasis.h:1524
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
int stasis_cache_init(void)
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.
#define STASIS_MESSAGE_TYPE_DEFN(name,...)
Boiler-plate messaging macro for defining public message types.
Definition stasis.h:1471
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
int stasis_message_type_id(const struct stasis_message_type *type)
Gets the id of a given message type.
enum stasis_subscription_message_formatters stasis_message_type_available_formatters(const struct stasis_message_type *message_type)
Get a bitmap of available formatters for a message type.
const struct timeval * stasis_message_timestamp(const struct stasis_message *msg)
Get the time when a message was created.
struct ast_json * ast_bridge_snapshot_to_json(const struct ast_bridge_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_bridge_snapshot.
struct ast_json * ast_channel_snapshot_to_json(const struct ast_channel_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_channel_snapshot.
Endpoint abstractions.
struct ast_json * ast_endpoint_snapshot_to_json(const struct ast_endpoint_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_endpoint_snapshot.
Internal Stasis APIs.
static int message_type_id
int ast_str_append(struct ast_str **buf, ssize_t max_len, const char *fmt,...)
Append to a thread local dynamic string.
Definition strings.h:1139
#define S_OR(a, b)
returns the equivalent of logic or for strings: first one if not empty, otherwise second one.
Definition strings.h:80
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition strings.h:65
#define ast_str_container_alloc(buckets)
Allocates a hash container for bare strings.
Definition strings.h:1365
#define ast_str_create(init_len)
Create a malloc'ed dynamic length string.
Definition strings.h:659
static force_inline int attribute_pure ast_str_case_hash(const char *str)
Compute a hash value on a case-insensitive string.
Definition strings.h:1303
char *attribute_pure ast_str_buffer(const struct ast_str *buf)
Returns the string buffer within the ast_str buf.
Definition strings.h:761
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
Definition strings.h:425
void ast_str_container_remove(struct ao2_container *str_container, const char *remove)
Removes a string from a string container allocated by ast_str_container_alloc.
Definition strings.c:221
int ast_str_container_add(struct ao2_container *str_container, const char *add)
Adds a string to a string container allocated by ast_str_container_alloc.
Definition strings.c:205
The representation of a single configuration file to be processed.
const char * filename
Type information about a category-level configurable object.
enum aco_type_t type
Generic container type.
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
Definition astobj2.h:1821
Structure representing a snapshot of channel state.
Main Channel structure associated with a channel.
descriptor for a cli entry.
Definition cli.h:171
int args
This gets set in ast_cli_register()
Definition cli.h:185
char * command
Definition cli.h:186
const char * usage
Definition cli.h:177
Abstract JSON element (object, array, string, int, ...).
Struct containing info for an AMI event to send out.
Definition manager.h:504
A multi object blob data structure to carry user event stasis messages.
Definition stasis.c:2162
struct ast_multi_object_blob::@425 snapshots[STASIS_UMOS_MAX]
struct ast_json * blob
Definition stasis.c:2163
Structure for mutex and tracking information.
Definition lock.h:142
Support for dynamic strings.
Definition strings.h:623
int idle_timeout
Time limit in seconds for idle dynamic taskprocessors.
Definition taskpool.h:88
int max_size
Maximum number of taskprocessors a pool may have.
Definition taskpool.h:122
int auto_increment
Number of taskprocessors to increment the pool by.
Definition taskpool.h:92
int minimum_size
Number of taskprocessors that will always exist.
Definition taskpool.h:99
int initial_size
Number of taskprocessors the pool will start with.
Definition taskpool.h:109
An opaque taskpool structure.
Definition taskpool.c:62
Local data parameter.
A ast_taskprocessor structure is a singleton by name.
Structure for variables, used for configurations and for channel variables.
struct stasis_declined_config * declined_message_types
Definition stasis.c:2413
struct stasis_taskpool_conf * taskpool_options
Definition stasis.c:2411
A structure to hold global configuration-related options.
Definition stasis.c:2392
struct ao2_container * declined
Definition stasis.c:2394
Forwarding information.
Definition stasis.c:1609
struct stasis_topic * from_topic
Definition stasis.c:1611
struct stasis_topic * to_topic
Definition stasis.c:1613
Structure containing callbacks for Stasis message sanitization.
Definition stasis.h:200
Holds details about changes to subscriptions for the specified topic.
Definition stasis.h:921
struct stasis_topic * topic
Definition stasis.h:922
struct stasis_topic * topic
Definition stasis.c:751
struct stasis_subscription::@424 accepted_message_types
ast_cond_t join_cond
Definition stasis.c:760
stasis_subscription_cb callback
Definition stasis.c:755
struct ast_taskprocessor * mailbox
Definition stasis.c:753
enum stasis_subscription_message_filter filter
Definition stasis.c:773
int final_message_processed
Definition stasis.c:766
enum stasis_subscription_message_formatters accepted_formatters
Definition stasis.c:771
Taskpool configuration options.
Definition stasis.c:2398
struct ao2_container * pool_container
Definition stasis.c:1836
struct stasis_topic * pool_topic
Definition stasis.c:1837
struct stasis_topic::@423 upstream_topics
char * name
Definition stasis.c:453
int subscriber_id
Definition stasis.c:450
char * detail
Definition stasis.c:456
struct timeval * creationtime
Definition stasis.c:459
struct stasis_topic::@422 subscribers
ast_cond_t cond
Definition stasis.c:1354
void * task_data
Definition stasis.c:1356
ast_mutex_t lock
Definition stasis.c:1353
const char * name
Definition stasis.c:1787
struct stasis_topic * topic
Definition stasis.c:1789
ast_cond_t init_cond
Definition stasis.c:1803
ast_mutex_t init_lock
Definition stasis.c:1802
unsigned int failed
Definition stasis.c:1805
unsigned int initialized
Definition stasis.c:1804
struct stasis_forward * forward
Definition stasis.c:1788
char name[0]
Definition stasis.c:1806
char buf[0]
Definition stasis.c:472
struct timeval creationtime
Definition stasis.c:470
char * name
Definition stasis.c:467
char * detail
Definition stasis.c:468
#define AST_TASKPOOL_OPTIONS_VERSION
Definition taskpool.h:76
struct ast_taskprocessor * ast_taskpool_serializer(const char *name, struct ast_taskpool *pool)
Serialized execution of tasks within a ast_taskpool.
Definition taskpool.c:860
void ast_taskpool_shutdown(struct ast_taskpool *pool)
Shut down a taskpool and remove the underlying taskprocessors.
Definition taskpool.c:675
struct ast_taskpool * ast_taskpool_create(const char *name, const struct ast_taskpool_options *options)
Create a new taskpool.
Definition taskpool.c:324
An API for managing task processing threads that can be shared across modules.
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
Sets the local data associated with a taskprocessor.
@ TPS_REF_DEFAULT
return a reference to a taskprocessor, create one if it does not exist
#define ast_taskprocessor_push_local(tps, task_exe, datap)
#define ast_taskprocessor_push(tps, task_exe, datap)
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format,...)
Build a taskprocessor name with a sequence number on the end.
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.
#define AST_TASKPROCESSOR_MAX_NAME
Suggested maximum taskprocessor name length (less null terminator).
static struct test_val a
void ast_format_duration_hh_mm_ss(int duration, char *buf, size_t length)
Formats a duration into HH:MM:SS.
Definition utils.c:2331
int64_t ast_tvdiff_ms(struct timeval end, struct timeval start)
Computes the difference (in milliseconds) between two struct timeval instances.
Definition time.h:107
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition time.h:159
FILE * out
Definition utils/frame.c:33
static void statistics(void)
Utility functions.
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition utils.h:981
#define ast_assert(a)
Definition utils.h:779
#define ARRAY_LEN(a)
Definition utils.h:706
Universally unique identifier support.
Vector container support.
#define AST_VECTOR_REPLACE(vec, idx, elem)
Replace an element at a specific position in a vector, growing the vector if needed.
Definition vector.h:295
#define AST_VECTOR_ELEM_CLEANUP_NOOP(elem)
Vector element cleanup that does nothing.
Definition vector.h:582
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition vector.h:620
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition vector.h:185
#define AST_VECTOR_REMOVE_ELEM_UNORDERED(vec, elem, cleanup)
Remove an element from a vector.
Definition vector.h:594
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition vector.h:124
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition vector.h:267
#define AST_VECTOR(name, type)
Define a vector structure.
Definition vector.h:44
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition vector.h:691
#define AST_VECTOR_GET_ADDR(vec, idx)
Get an address of element in a vector.
Definition vector.h:679