Asterisk - The Open Source Telephony Project GIT-master-1f1c5bb
stasis.c
Go to the documentation of this file.
1/*
2 * Asterisk -- An open source telephony toolkit.
3 *
4 * Copyright (C) 2013, Digium, Inc.
5 *
6 * David M. Lee, II <dlee@digium.com>
7 *
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
13 *
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
17 */
18
19/*! \file
20 *
21 * \brief Stasis Message Bus API.
22 *
23 * \author David M. Lee, II <dlee@digium.com>
24 */
25
26/*** MODULEINFO
27 <support_level>core</support_level>
28 ***/
29
30#include "asterisk.h"
31
32#include "asterisk/astobj2.h"
34#include "asterisk/stasis.h"
36#include "asterisk/threadpool.h"
37#include "asterisk/utils.h"
38#include "asterisk/uuid.h"
39#include "asterisk/vector.h"
44#include "asterisk/cli.h"
45
46/*** DOCUMENTATION
47 <managerEvent language="en_US" name="UserEvent">
48 <managerEventInstance class="EVENT_FLAG_USER">
49 <synopsis>A user defined event raised from the dialplan.</synopsis>
50 <syntax>
51 <channel_snapshot/>
52 <parameter name="UserEvent">
53 <para>The event name, as specified in the dialplan.</para>
54 </parameter>
55 </syntax>
56 <description>
57 <para>Event may contain additional arbitrary parameters in addition to optional bridge and endpoint snapshots. Multiple snapshots of the same type are prefixed with a numeric value.</para>
58 </description>
59 <see-also>
60 <ref type="application">UserEvent</ref>
61 <ref type="managerEvent">UserEvent</ref>
62 </see-also>
63 </managerEventInstance>
64 </managerEvent>
65 <configInfo name="stasis" language="en_US">
66 <configFile name="stasis.conf">
67 <configObject name="threadpool">
68 <synopsis>Settings that configure the threadpool Stasis uses to deliver some messages.</synopsis>
69 <configOption name="initial_size" default="5">
70 <synopsis>Initial number of threads in the message bus threadpool.</synopsis>
71 </configOption>
72 <configOption name="idle_timeout_sec" default="20">
73 <synopsis>Number of seconds before an idle thread is disposed of.</synopsis>
74 </configOption>
75 <configOption name="max_size" default="50">
76 <synopsis>Maximum number of threads in the threadpool.</synopsis>
77 </configOption>
78 </configObject>
79 <configObject name="declined_message_types">
80 <synopsis>Stasis message types for which to decline creation.</synopsis>
81 <configOption name="decline">
82 <synopsis>The message type to decline.</synopsis>
83 <description>
84 <para>This configuration option defines the name of the Stasis
85 message type that Asterisk is forbidden from creating and can be
86 specified as many times as necessary to achieve the desired result.</para>
87 <enumlist>
88 <enum name="stasis_app_recording_snapshot_type" />
89 <enum name="stasis_app_playback_snapshot_type" />
90 <enum name="stasis_test_message_type" />
91 <enum name="confbridge_start_type" />
92 <enum name="confbridge_end_type" />
93 <enum name="confbridge_join_type" />
94 <enum name="confbridge_leave_type" />
95 <enum name="confbridge_start_record_type" />
96 <enum name="confbridge_stop_record_type" />
97 <enum name="confbridge_mute_type" />
98 <enum name="confbridge_unmute_type" />
99 <enum name="confbridge_talking_type" />
100 <enum name="cel_generic_type" />
101 <enum name="ast_bridge_snapshot_type" />
102 <enum name="ast_bridge_merge_message_type" />
103 <enum name="ast_channel_entered_bridge_type" />
104 <enum name="ast_channel_left_bridge_type" />
105 <enum name="ast_blind_transfer_type" />
106 <enum name="ast_attended_transfer_type" />
107 <enum name="ast_endpoint_snapshot_type" />
108 <enum name="ast_endpoint_state_type" />
109 <enum name="ast_device_state_message_type" />
110 <enum name="ast_test_suite_message_type" />
111 <enum name="ast_mwi_state_type" />
112 <enum name="ast_mwi_vm_app_type" />
113 <enum name="ast_format_register_type" />
114 <enum name="ast_format_unregister_type" />
115 <enum name="ast_manager_get_generic_type" />
116 <enum name="ast_parked_call_type" />
117 <enum name="ast_channel_snapshot_type" />
118 <enum name="ast_channel_dial_type" />
119 <enum name="ast_channel_varset_type" />
120 <enum name="ast_channel_hangup_request_type" />
121 <enum name="ast_channel_dtmf_begin_type" />
122 <enum name="ast_channel_dtmf_end_type" />
123 <enum name="ast_channel_flash_type" />
124 <enum name="ast_channel_wink_type" />
125 <enum name="ast_channel_hold_type" />
126 <enum name="ast_channel_unhold_type" />
127 <enum name="ast_channel_chanspy_start_type" />
128 <enum name="ast_channel_chanspy_stop_type" />
129 <enum name="ast_channel_fax_type" />
130 <enum name="ast_channel_hangup_handler_type" />
131 <enum name="ast_channel_moh_start_type" />
132 <enum name="ast_channel_moh_stop_type" />
133 <enum name="ast_channel_mixmonitor_start_type" />
134 <enum name="ast_channel_mixmonitor_stop_type" />
135 <enum name="ast_channel_mixmonitor_mute_type" />
136 <enum name="ast_channel_agent_login_type" />
137 <enum name="ast_channel_agent_logoff_type" />
138 <enum name="ast_channel_talking_start" />
139 <enum name="ast_channel_talking_stop" />
140 <enum name="ast_security_event_type" />
141 <enum name="ast_named_acl_change_type" />
142 <enum name="ast_local_bridge_type" />
143 <enum name="ast_local_optimization_begin_type" />
144 <enum name="ast_local_optimization_end_type" />
145 <enum name="stasis_subscription_change_type" />
146 <enum name="ast_multi_user_event_type" />
147 <enum name="stasis_cache_clear_type" />
148 <enum name="stasis_cache_update_type" />
149 <enum name="ast_network_change_type" />
150 <enum name="ast_system_registry_type" />
151 <enum name="ast_cc_available_type" />
152 <enum name="ast_cc_offertimerstart_type" />
153 <enum name="ast_cc_requested_type" />
154 <enum name="ast_cc_requestacknowledged_type" />
155 <enum name="ast_cc_callerstopmonitoring_type" />
156 <enum name="ast_cc_callerstartmonitoring_type" />
157 <enum name="ast_cc_callerrecalling_type" />
158 <enum name="ast_cc_recallcomplete_type" />
159 <enum name="ast_cc_failure_type" />
160 <enum name="ast_cc_monitorfailed_type" />
161 <enum name="ast_presence_state_message_type" />
162 <enum name="ast_rtp_rtcp_sent_type" />
163 <enum name="ast_rtp_rtcp_received_type" />
164 <enum name="ast_call_pickup_type" />
165 <enum name="aoc_s_type" />
166 <enum name="aoc_d_type" />
167 <enum name="aoc_e_type" />
168 <enum name="dahdichannel_type" />
169 <enum name="mcid_type" />
170 <enum name="session_timeout_type" />
171 <enum name="cdr_read_message_type" />
172 <enum name="cdr_write_message_type" />
173 <enum name="cdr_prop_write_message_type" />
174 <enum name="corosync_ping_message_type" />
175 <enum name="agi_exec_start_type" />
176 <enum name="agi_exec_end_type" />
177 <enum name="agi_async_start_type" />
178 <enum name="agi_async_exec_type" />
179 <enum name="agi_async_end_type" />
180 <enum name="queue_caller_join_type" />
181 <enum name="queue_caller_leave_type" />
182 <enum name="queue_caller_abandon_type" />
183 <enum name="queue_member_status_type" />
184 <enum name="queue_member_added_type" />
185 <enum name="queue_member_removed_type" />
186 <enum name="queue_member_pause_type" />
187 <enum name="queue_member_penalty_type" />
188 <enum name="queue_member_ringinuse_type" />
189 <enum name="queue_agent_called_type" />
190 <enum name="queue_agent_connect_type" />
191 <enum name="queue_agent_complete_type" />
192 <enum name="queue_agent_dump_type" />
193 <enum name="queue_agent_ringnoanswer_type" />
194 <enum name="meetme_join_type" />
195 <enum name="meetme_leave_type" />
196 <enum name="meetme_end_type" />
197 <enum name="meetme_mute_type" />
198 <enum name="meetme_talking_type" />
199 <enum name="meetme_talk_request_type" />
200 <enum name="appcdr_message_type" />
201 <enum name="forkcdr_message_type" />
202 <enum name="cdr_sync_message_type" />
203 </enumlist>
204 </description>
205 </configOption>
206 </configObject>
207 </configFile>
208 </configInfo>
209***/
210
211/*!
212 * \page stasis-impl Stasis Implementation Notes
213 *
214 * \par Reference counting
215 *
216 * Stasis introduces a number of objects, which are tightly related to one
217 * another. Because we rely on ref-counting for memory management, understanding
218 * these relationships is important to understanding this code.
219 *
220 * \code{.txt}
221 *
222 * stasis_topic <----> stasis_subscription
223 * ^ ^
224 * \ /
225 * \ /
226 * dispatch
227 * |
228 * |
229 * v
230 * stasis_message
231 * |
232 * |
233 * v
234 * stasis_message_type
235 *
236 * \endcode
237 *
238 * The most troubling thing in this chart is the cyclic reference between
239 * stasis_topic and stasis_subscription. This is both unfortunate, and
240 * necessary. Topics need the subscription in order to dispatch messages;
241 * subscriptions need the topics to unsubscribe and check subscription status.
242 *
243 * The cycle is broken by stasis_unsubscribe(). The unsubscribe will remove the
244 * topic's reference to a subscription. When the subscription is destroyed, it
245 * will remove its reference to the topic.
246 *
247 * This means that until a subscription has been explicitly unsubscribed, it will
248 * not be destroyed. Neither will a topic be destroyed while it has subscribers.
249 * The destructors of both have assertions regarding this to catch ref-counting
250 * problems where a subscription or topic has had an extra ao2_cleanup().
251 *
252 * The \ref dispatch_exec_sync object is a transient object, which is posted to
253 * a subscription's taskprocessor to send a message to the subscriber. They have
254 * short life cycles, allocated on one thread, destroyed on another.
255 *
256 * During shutdown, or the deletion of a domain object, there are a flurry of
257 * ao2_cleanup()s on subscriptions and topics, as the final in-flight messages
258 * are processed. Any one of these cleanups could be the one to actually destroy
259 * a given object, so care must be taken to ensure that an object isn't
260 * referenced after an ao2_cleanup(). This includes the implicit ao2_unlock()
261 * that might happen when a RAII_VAR() goes out of scope.
262 *
263 * \par Typical life cycles
264 *
265 * \li stasis_topic - There are several topics which live for the duration of
266 * the Asterisk process (ast_channel_topic_all(), etc.) but most of these
267 * are actually fed by shorter-lived topics whose lifetime is associated
268 * with some domain object (like ast_channel_topic() for a given
269 * ast_channel).
270 *
271 * \li stasis_subscription - Subscriptions have a similar mix of lifetimes as
272 * topics, for similar reasons.
273 *
274 * \li dispatch - Very short lived; just long enough to post a message to a
275 * subscriber.
276 *
277 * \li stasis_message - Short to intermediate lifetimes, but that is mostly
278 * irrelevant. Messages are strictly data and have no behavior associated
279 * with them, so it doesn't really matter if/when they are destroyed. By
280 * design, a component could hold a ref to a message forever without any
281 * ill consequences (aside from consuming more memory).
282 *
283 * \li stasis_message_type - Long life cycles, typically only destroyed on
284 * module unloading or _clean_ process exit.
285 *
286 * \par Subscriber shutdown sequencing
287 *
288 * Subscribers are sensitive to shutdown sequencing, specifically in how they
289 * reference message types. This is fully detailed in the documentation at
290 * https://docs.asterisk.org/Development/Roadmap/Asterisk-12-Projects/Asterisk-12-API-Improvements/Stasis-Message-Bus/Using-the-Stasis-Message-Bus/Stasis-Subscriber-Shutdown-Problem/.
291 *
292 * In short, the lifetime of the \a data (and \a callback, if in a module) must
293 * be held until the stasis_subscription_final_message() has been received.
294 * Depending on the structure of the subscriber code, this can be handled by
295 * using stasis_subscription_final_message() to free resources on the final
296 * message, or using stasis_subscription_join()/stasis_unsubscribe_and_join() to
297 * block until the unsubscribe has completed.
298 */
299
300/*! Initial size of the subscribers list. */
301#define INITIAL_SUBSCRIBERS_MAX 4
302
303/*! The number of buckets to use for topic pools */
304#define TOPIC_POOL_BUCKETS 57
305
306/*! Thread pool for topics that don't want a dedicated taskprocessor */
308
310
311#if defined(LOW_MEMORY)
312
313#define TOPIC_ALL_BUCKETS 257
314
315#else
316
317#define TOPIC_ALL_BUCKETS 997
318
319#endif
320
321#ifdef AST_DEVMODE
322
323/*! The number of buckets to use for topic statistics */
324#define TOPIC_STATISTICS_BUCKETS 57
325
326/*! The number of buckets to use for subscription statistics */
327#define SUBSCRIPTION_STATISTICS_BUCKETS 57
328
329/*! Global container which stores statistics for topics */
330static AO2_GLOBAL_OBJ_STATIC(topic_statistics);
331
332/*! Global container which stores statistics for subscriptions */
333static AO2_GLOBAL_OBJ_STATIC(subscription_statistics);
334
/*!
 * \internal
 * \brief Per-message-type counters kept in the message_type_statistics vector.
 */
struct stasis_message_type_statistics {
	/*! \brief How many messages of this type have been published */
	int published;
	/*! \brief How many messages of this type did not reach a subscriber */
	int unused;
	/*! \brief The stasis message type these counters belong to */
	struct stasis_message_type *message_type;
};
344
345/*! Lock to protect the message types vector */
346AST_MUTEX_DEFINE_STATIC(message_type_statistics_lock);
347
348/*! Vector containing message type information */
349static AST_VECTOR(, struct stasis_message_type_statistics) message_type_statistics;
350
/*!
 * \internal
 * \brief Dispatch statistics recorded for a single topic.
 *
 * The object is allocated with extra room after the structure so that the
 * topic name can be stored inline in \a name.
 */
struct stasis_topic_statistics {
	/*! \brief Longest time spent dispatching a message to subscribers */
	long highest_time_dispatched;
	/*! \brief Shortest time spent dispatching a message to subscribers */
	long lowest_time_dispatched;
	/*! \brief Count of messages that were dispatched to no subscriber at all */
	int messages_not_dispatched;
	/*! \brief Count of messages that were dispatched to at least 1 subscriber */
	int messages_dispatched;
	/*! \brief Container holding the ids of the subscribers to this topic */
	struct ao2_container *subscribers;
	/*! \brief Address of the topic (NOT refcounted, and must NOT be accessed) */
	struct stasis_topic *topic;
	/*! \brief Topic name, stored inline after the structure */
	char name[0];
};
368#endif
369
370/*! \internal */
372 /*! Variable length array of the subscribers */
374
375 /*! Topics forwarding into this topic */
377
378#ifdef AST_DEVMODE
379 struct stasis_topic_statistics *statistics;
380#endif
381
382 /*! Unique incrementing integer for subscriber ids */
384
385 /*! Name of the topic */
386 char *name;
387
388 /*! Detail of the topic */
389 char *detail;
390
391 /*! Creation time */
392 struct timeval *creationtime;
393};
394
396
399
400 char *name;
401 char *detail;
402
403 struct timeval creationtime;
404
405 char buf[0];
406};
407
411
412static void proxy_dtor(void *weakproxy, void *container)
413{
414 ao2_unlink(container, weakproxy);
416}
417
418/* Forward declarations for the tightly-coupled subscription object */
419static int topic_add_subscription(struct stasis_topic *topic,
420 struct stasis_subscription *sub);
421
422static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
423
/*!
 * \brief Lock two topics.
 *
 * \a topic1 is locked first. \a topic2 is then acquired with ao2_trylock();
 * every time that attempt fails, AO2_DEADLOCK_AVOIDANCE() is run on
 * \a topic1 before trying again.
 *
 * \note Both arguments are expanded more than once, so they must not have
 * side effects.
 */
#define topic_lock_both(topic1, topic2) \
	do { \
		ao2_lock(topic1); \
		for (;;) { \
			if (!ao2_trylock(topic2)) { \
				break; \
			} \
			AO2_DEADLOCK_AVOIDANCE(topic1); \
		} \
	} while (0)
432
433static void topic_dtor(void *obj)
434{
435 struct stasis_topic *topic = obj;
436#ifdef AST_DEVMODE
437 struct ao2_container *topic_stats;
438#endif
439
440 ast_debug(2, "Destroying topic. name: %s, detail: %s\n",
441 topic->name, topic->detail);
442
443 /* Subscribers hold a reference to topics, so they should all be
444 * unsubscribed before we get here. */
446
449 ast_debug(1, "Topic '%s': %p destroyed\n", topic->name, topic);
450
451#ifdef AST_DEVMODE
452 if (topic->statistics) {
453 topic_stats = ao2_global_obj_ref(topic_statistics);
454 if (topic_stats) {
455 ao2_unlink(topic_stats, topic->statistics);
456 ao2_ref(topic_stats, -1);
457 }
458 ao2_ref(topic->statistics, -1);
459 }
460#endif
461}
462
463#ifdef AST_DEVMODE
464static void topic_statistics_destroy(void *obj)
465{
466 struct stasis_topic_statistics *statistics = obj;
467
468 ao2_cleanup(statistics->subscribers);
469}
470
471static struct stasis_topic_statistics *stasis_topic_statistics_create(struct stasis_topic *topic)
472{
473 struct stasis_topic_statistics *statistics;
474 RAII_VAR(struct ao2_container *, topic_stats, ao2_global_obj_ref(topic_statistics), ao2_cleanup);
475
476 if (!topic_stats) {
477 return NULL;
478 }
479
480 statistics = ao2_alloc(sizeof(*statistics) + strlen(topic->name) + 1, topic_statistics_destroy);
481 if (!statistics) {
482 return NULL;
483 }
484
485 statistics->subscribers = ast_str_container_alloc(1);
486 if (!statistics->subscribers) {
487 ao2_ref(statistics, -1);
488 return NULL;
489 }
490
491 /* This is strictly used for the pointer address when showing the topic */
492 statistics->topic = topic;
493 strcpy(statistics->name, topic->name); /* SAFE */
494 ao2_link(topic_stats, statistics);
495
496 return statistics;
497}
498#endif
499
500static int link_topic_proxy(struct stasis_topic *topic, const char *name, const char *detail)
501{
502 struct topic_proxy *proxy;
503 struct stasis_topic* topic_tmp;
504 size_t detail_len;
505
506 if (!topic || !name || !strlen(name) || !detail) {
507 return -1;
508 }
509
511
512 topic_tmp = stasis_topic_get(name);
513 if (topic_tmp) {
514 ast_log(LOG_ERROR, "The same topic is already exist. name: %s\n", name);
515 ao2_ref(topic_tmp, -1);
517
518 return -1;
519 }
520
521 detail_len = strlen(detail) + 1;
522
523 proxy = ao2_t_weakproxy_alloc(
524 sizeof(*proxy) + strlen(name) + 1 + detail_len, NULL, name);
525 if (!proxy) {
527
528 return -1;
529 }
530
531 /* set the proxy info */
532 proxy->name = proxy->buf;
533 proxy->detail = proxy->name + strlen(name) + 1;
534
535 strcpy(proxy->name, name); /* SAFE */
536 ast_copy_string(proxy->detail, detail, detail_len); /* SAFE */
537 proxy->creationtime = ast_tvnow();
538
539 /* We have exclusive access to proxy, no need for locking here. */
540 if (ao2_t_weakproxy_set_object(proxy, topic, OBJ_NOLOCK, "weakproxy link")) {
541 ao2_cleanup(proxy);
543
544 return -1;
545 }
546
548 ao2_cleanup(proxy);
551
552 return -1;
553 }
554
555 /* setting the topic point to the proxy */
556 topic->name = proxy->name;
557 topic->detail = proxy->detail;
558 topic->creationtime = &(proxy->creationtime);
559
561 ao2_ref(proxy, -1);
562
564
565 return 0;
566}
567
569 const char *name, const char* detail
570 )
571{
572 struct stasis_topic *topic;
573 int res = 0;
574
575 if (!name|| !strlen(name) || !detail) {
576 return NULL;
577 }
578 ast_debug(2, "Creating topic. name: %s, detail: %s\n", name, detail);
579
580 topic = stasis_topic_get(name);
581 if (topic) {
582 ast_debug(2, "Topic is already exist. name: %s, detail: %s\n",
583 name, detail);
584 return topic;
585 }
586
587 topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
588 if (!topic) {
589 return NULL;
590 }
591
593 res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
594 if (res) {
595 ao2_ref(topic, -1);
596 return NULL;
597 }
598
599 /* link to the proxy */
600 if (link_topic_proxy(topic, name, detail)) {
601 ao2_ref(topic, -1);
602 return NULL;
603 }
604
605#ifdef AST_DEVMODE
606 topic->statistics = stasis_topic_statistics_create(topic);
607 if (!topic->statistics) {
608 ao2_ref(topic, -1);
609 return NULL;
610 }
611#endif
612 ast_debug(1, "Topic '%s': %p created\n", topic->name, topic);
613
614 return topic;
615}
616
618{
620}
621
623{
625}
626
627const char *stasis_topic_name(const struct stasis_topic *topic)
628{
629 if (!topic) {
630 return NULL;
631 }
632 return topic->name;
633}
634
635const char *stasis_topic_detail(const struct stasis_topic *topic)
636{
637 if (!topic) {
638 return NULL;
639 }
640 return topic->detail;
641}
642
643size_t stasis_topic_subscribers(const struct stasis_topic *topic)
644{
645 return AST_VECTOR_SIZE(&topic->subscribers);
646}
647
648#ifdef AST_DEVMODE
/*!
 * \brief Statistics recorded for a single subscription.
 *
 * The object is allocated with extra room after the structure so that the
 * subscription's unique id can be stored inline in \a uniqueid.
 */
struct stasis_subscription_statistics {
	/*! \brief Source file in which the subscription was created */
	const char *file;
	/*! \brief Function in which the subscription was created */
	const char *func;
	/*! \brief Container of the names of the topics we are subscribed to */
	struct ao2_container *topics;
	/*! \brief Message type that has taken the longest to process so far */
	struct stasis_message_type *highest_time_message_type;
	/*! \brief Longest time spent invoking the callback for a message */
	long highest_time_invoked;
	/*! \brief Shortest time spent invoking the callback for a message */
	long lowest_time_invoked;
	/*! \brief Count of messages that were filtered out */
	int messages_dropped;
	/*! \brief Count of messages that passed filtering */
	int messages_passed;
	/*! \brief Non-zero when a mailbox is used to queue messages */
	int uses_mailbox;
	/*! \brief Non-zero when the stasis threadpool handles the messages */
	int uses_threadpool;
	/*! \brief Source line at which the subscription was created */
	int lineno;
	/*! \brief Address of the subscription (NOT refcounted, and must NOT be accessed) */
	struct stasis_subscription *sub;
	/*! \brief Unique ID of the subscription, stored inline after the structure */
	char uniqueid[0];
};
677#endif
678
679/*! \internal */
681 /*! Unique ID for this subscription */
682 char *uniqueid;
683 /*! Topic subscribed to. */
685 /*! Mailbox for processing incoming messages. */
687 /*! Callback function for incoming message processing. */
689 /*! Data pointer to be handed to the callback. */
690 void *data;
691
692 /*! Condition for joining with subscription. */
694 /*! Flag set when final message for sub has been received.
695 * Be sure join_lock is held before reading/setting. */
697 /*! Flag set when final message for sub has been processed.
698 * Be sure join_lock is held before reading/setting. */
700
701 /*! The message types this subscription is accepting */
703 /*! The message formatters this subscription is accepting */
705 /*! The message filter currently in use */
707
708#ifdef AST_DEVMODE
709 /*! Statistics information */
710 struct stasis_subscription_statistics *statistics;
711#endif
712};
713
714static void subscription_dtor(void *obj)
715{
716 struct stasis_subscription *sub = obj;
717#ifdef AST_DEVMODE
718 struct ao2_container *subscription_stats;
719#endif
720
721 /* Subscriptions need to be manually unsubscribed before destruction
722 * b/c there's a cyclic reference between topics and subscriptions */
724 /* If there are any messages in flight to this subscription; that would
725 * be bad. */
727
728 ast_free(sub->uniqueid);
729 ao2_cleanup(sub->topic);
730 sub->topic = NULL;
732 sub->mailbox = NULL;
733 ast_cond_destroy(&sub->join_cond);
734
735 AST_VECTOR_FREE(&sub->accepted_message_types);
736
737#ifdef AST_DEVMODE
738 if (sub->statistics) {
739 subscription_stats = ao2_global_obj_ref(subscription_statistics);
740 if (subscription_stats) {
741 ao2_unlink(subscription_stats, sub->statistics);
742 ao2_ref(subscription_stats, -1);
743 }
744 ao2_ref(sub->statistics, -1);
745 }
746#endif
747}
748
749/*!
750 * \brief Invoke the subscription's callback.
751 * \param sub Subscription to invoke.
752 * \param message Message to send.
753 */
755 struct stasis_message *message)
756{
757 unsigned int final = stasis_subscription_final_message(sub, message);
759#ifdef AST_DEVMODE
760 struct timeval start;
761 long elapsed;
762
763 start = ast_tvnow();
764#endif
765
766 /* Notify that the final message has been received */
767 if (final) {
768 ao2_lock(sub);
769 sub->final_message_rxed = 1;
770 ast_cond_signal(&sub->join_cond);
772 }
773
774 /*
775 * If filtering is turned on and this is a 'final' message, we only invoke the callback
776 * if the subscriber accepts subscription_change message types.
777 */
778 if (!final || sub->filter != STASIS_SUBSCRIPTION_FILTER_SELECTIVE ||
779 (message_type_id < AST_VECTOR_SIZE(&sub->accepted_message_types) && AST_VECTOR_GET(&sub->accepted_message_types, message_type_id))) {
780 /* Since sub is mostly immutable, no need to lock sub */
781 sub->callback(sub->data, sub, message);
782 }
783
784 /* Notify that the final message has been processed */
785 if (final) {
786 ao2_lock(sub);
787 sub->final_message_processed = 1;
788 ast_cond_signal(&sub->join_cond);
790 }
791
792#ifdef AST_DEVMODE
793 elapsed = ast_tvdiff_ms(ast_tvnow(), start);
794 if (elapsed > sub->statistics->highest_time_invoked) {
795 sub->statistics->highest_time_invoked = elapsed;
796 ao2_lock(sub->statistics);
797 sub->statistics->highest_time_message_type = stasis_message_type(message);
798 ao2_unlock(sub->statistics);
799 }
800 if (elapsed < sub->statistics->lowest_time_invoked) {
801 sub->statistics->lowest_time_invoked = elapsed;
802 }
803#endif
804}
805
806static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
807static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
808
810{
811}
812
813#ifdef AST_DEVMODE
814static void subscription_statistics_destroy(void *obj)
815{
816 struct stasis_subscription_statistics *statistics = obj;
817
818 ao2_cleanup(statistics->topics);
819}
820
821static struct stasis_subscription_statistics *stasis_subscription_statistics_create(struct stasis_subscription *sub,
822 int needs_mailbox, int use_thread_pool, const char *file, int lineno,
823 const char *func)
824{
825 struct stasis_subscription_statistics *statistics;
826 RAII_VAR(struct ao2_container *, subscription_stats, ao2_global_obj_ref(subscription_statistics), ao2_cleanup);
827
828 if (!subscription_stats) {
829 return NULL;
830 }
831
832 statistics = ao2_alloc(sizeof(*statistics) + strlen(sub->uniqueid) + 1, subscription_statistics_destroy);
833 if (!statistics) {
834 return NULL;
835 }
836
838 if (!statistics->topics) {
839 ao2_ref(statistics, -1);
840 return NULL;
841 }
842
843 statistics->file = file;
844 statistics->lineno = lineno;
845 statistics->func = func;
846 statistics->uses_mailbox = needs_mailbox;
847 statistics->uses_threadpool = use_thread_pool;
848 strcpy(statistics->uniqueid, sub->uniqueid); /* SAFE */
849 statistics->sub = sub;
850 ao2_link(subscription_stats, statistics);
851
852 return statistics;
853}
854#endif
855
857 struct stasis_topic *topic,
859 void *data,
860 int needs_mailbox,
861 int use_thread_pool,
862 const char *file,
863 int lineno,
864 const char *func)
865{
866 struct stasis_subscription *sub;
867 int ret;
868
869 if (!topic) {
870 return NULL;
871 }
872
873 /* The ao2 lock is used for join_cond. */
875 if (!sub) {
876 return NULL;
877 }
878
879#ifdef AST_DEVMODE
881 sub->statistics = stasis_subscription_statistics_create(sub, needs_mailbox, use_thread_pool, file, lineno, func);
882 if (ret < 0 || !sub->statistics) {
883 ao2_ref(sub, -1);
884 return NULL;
885 }
886#else
888 if (ret < 0) {
889 ao2_ref(sub, -1);
890 return NULL;
891 }
892#endif
893
894 if (needs_mailbox) {
895 char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
896
897 /* Create name with seq number appended. */
898 ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "stasis/%c:%s",
899 use_thread_pool ? 'p' : 'm',
901
902 /*
903 * With a small number of subscribers, a thread-per-sub is
904 * acceptable. For a large number of subscribers, a thread
905 * pool should be used.
906 */
907 if (use_thread_pool) {
908 sub->mailbox = ast_threadpool_serializer(tps_name, threadpool);
909 } else {
910 sub->mailbox = ast_taskprocessor_get(tps_name, TPS_REF_DEFAULT);
911 }
912 if (!sub->mailbox) {
913 ao2_ref(sub, -1);
914
915 return NULL;
916 }
918 /* Taskprocessor has a reference */
919 ao2_ref(sub, +1);
920 }
921
922 ao2_ref(topic, +1);
923 sub->topic = topic;
924 sub->callback = callback;
925 sub->data = data;
926 ast_cond_init(&sub->join_cond, NULL);
928 AST_VECTOR_INIT(&sub->accepted_message_types, 0);
929 sub->accepted_formatters = STASIS_SUBSCRIPTION_FORMATTER_NONE;
930
931 if (topic_add_subscription(topic, sub) != 0) {
932 ao2_ref(sub, -1);
933 ao2_ref(topic, -1);
934
935 return NULL;
936 }
938
939 return sub;
940}
941
943 struct stasis_topic *topic,
945 void *data,
946 const char *file,
947 int lineno,
948 const char *func)
949{
950 return internal_stasis_subscribe(topic, callback, data, 1, 0, file, lineno, func);
951}
952
954 struct stasis_topic *topic,
956 void *data,
957 const char *file,
958 int lineno,
959 const char *func)
960{
961 return internal_stasis_subscribe(topic, callback, data, 1, 1, file, lineno, func);
962}
963
964static int sub_cleanup(void *data)
965{
966 struct stasis_subscription *sub = data;
968 return 0;
969}
970
972{
973 /* The subscription may be the last ref to this topic. Hold
974 * the topic ref open until after the unlock. */
975 struct stasis_topic *topic;
976
977 if (!sub) {
978 return NULL;
979 }
980
981 topic = ao2_bump(sub->topic);
982
983 /* We have to remove the subscription first, to ensure the unsubscribe
984 * is the final message */
985 if (topic_remove_subscription(sub->topic, sub) != 0) {
987 "Internal error: subscription has invalid topic\n");
988 ao2_cleanup(topic);
989
990 return NULL;
991 }
992
993 /* Now let everyone know about the unsubscribe */
995
996 /* When all that's done, remove the ref the mailbox has on the sub */
997 if (sub->mailbox) {
998 if (ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub)) {
999 /* Nothing we can do here, the conditional is just to keep
1000 * the compiler happy that we're not ignoring the result. */
1001 }
1002 }
1003
1004 /* Unsubscribing unrefs the subscription */
1006 ao2_cleanup(topic);
1007
1008 return NULL;
1009}
1010
1012 long low_water, long high_water)
1013{
1014 int res = -1;
1015
1016 if (subscription) {
1017 res = ast_taskprocessor_alert_set_levels(subscription->mailbox,
1018 low_water, high_water);
1019 }
1020 return res;
1021}
1022
1024 const struct stasis_message_type *type)
1025{
1026 if (!subscription) {
1027 return -1;
1028 }
1029
1030 ast_assert(type != NULL);
1032
1034 /* Filtering is unreliable as this message type is not yet initialized
1035 * so force all messages through.
1036 */
1038 return 0;
1039 }
1040
1041 ao2_lock(subscription->topic);
1043 /* We do this for the same reason as above. The subscription can still operate, so allow
1044 * it to do so by forcing all messages through.
1045 */
1047 }
1048 ao2_unlock(subscription->topic);
1049
1050 return 0;
1051}
1052
1054 const struct stasis_message_type *type)
1055{
1056 if (!subscription) {
1057 return -1;
1058 }
1059
1060 ast_assert(type != NULL);
1062
1064 return 0;
1065 }
1066
1067 ao2_lock(subscription->topic);
1069 /* The memory is already allocated so this can't fail */
1071 }
1072 ao2_unlock(subscription->topic);
1073
1074 return 0;
1075}
1076
1079{
1080 if (!subscription) {
1081 return -1;
1082 }
1083
1084 ao2_lock(subscription->topic);
1085 if (subscription->filter != STASIS_SUBSCRIPTION_FILTER_FORCED_NONE) {
1086 subscription->filter = filter;
1087 }
1088 ao2_unlock(subscription->topic);
1089
1090 return 0;
1091}
1092
1095{
1096 ast_assert(subscription != NULL);
1097
1098 ao2_lock(subscription->topic);
1099 subscription->accepted_formatters = formatters;
1100 ao2_unlock(subscription->topic);
1101
1102 return;
1103}
1104
1106{
1107 if (subscription) {
1108 ao2_lock(subscription);
1109 /* Wait until the processed flag has been set */
1110 while (!subscription->final_message_processed) {
1111 ast_cond_wait(&subscription->join_cond,
1112 ao2_object_get_lockaddr(subscription));
1113 }
1114 ao2_unlock(subscription);
1115 }
1116}
1117
1119{
1120 if (subscription) {
1121 int ret;
1122
1123 ao2_lock(subscription);
1124 ret = subscription->final_message_rxed;
1125 ao2_unlock(subscription);
1126
1127 return ret;
1128 }
1129
1130 /* Null subscription is about as done as you can get */
1131 return 1;
1132}
1133
1135 struct stasis_subscription *subscription)
1136{
1137 if (!subscription) {
1138 return NULL;
1139 }
1140
1141 /* Bump refcount to hold it past the unsubscribe */
1142 ao2_ref(subscription, +1);
1143 stasis_unsubscribe(subscription);
1144 stasis_subscription_join(subscription);
1145 /* Now decrement the refcount back */
1146 ao2_cleanup(subscription);
1147 return NULL;
1148}
1149
1151{
1152 if (sub) {
1153 size_t i;
1154 struct stasis_topic *topic = sub->topic;
1155
1156 ao2_lock(topic);
1157 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
1158 if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
1159 ao2_unlock(topic);
1160 return 1;
1161 }
1162 }
1163 ao2_unlock(topic);
1164 }
1165
1166 return 0;
1167}
1168
1170{
1171 return sub->uniqueid;
1172}
1173
1175{
1176 struct stasis_subscription_change *change;
1177
1179 return 0;
1180 }
1181
1182 change = stasis_message_data(msg);
1183 if (strcmp("Unsubscribe", change->description)) {
1184 return 0;
1185 }
1186
1187 if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
1188 return 0;
1189 }
1190
1191 return 1;
1192}
1193
1194/*!
1195 * \brief Add a subscriber to a topic.
1196 * \param topic Topic
1197 * \param sub Subscriber
1198 * \return 0 on success
1199 * \return Non-zero on error
1200 */
1202{
1203 size_t idx;
1204
1205 ao2_lock(topic);
1206 /* The reference from the topic to the subscription is shared with
1207 * the owner of the subscription, which will explicitly unsubscribe
1208 * to release it.
1209 *
1210 * If we bumped the refcount here, the owner would have to unsubscribe
1211 * and cleanup, which is a bit awkward. */
1213
1214 for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
1217 }
1218
1219#ifdef AST_DEVMODE
1221 ast_str_container_add(sub->statistics->topics, stasis_topic_name(topic));
1222#endif
1223
1225
1226 return 0;
1227}
1228
1230{
1231 size_t idx;
1232 int res;
1233
1234 ao2_lock(topic);
1235 for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
1238 }
1241
1242#ifdef AST_DEVMODE
1243 if (!res) {
1245 ast_str_container_remove(sub->statistics->topics, stasis_topic_name(topic));
1246 }
1247#endif
1248
1250
1251 return res;
1252}
1253
1254/*!
1255 * \internal \brief Dispatch a message to a subscriber asynchronously
1256 * \param local \ref ast_taskprocessor_local object
1257 * \return 0
1258 */
1260{
1261 struct stasis_subscription *sub = local->local_data;
1262 struct stasis_message *message = local->data;
1263
1266
1267 return 0;
1268}
1269
1270/*!
1271 * \internal \brief Data passed to \ref dispatch_exec_sync to synchronize
1272 * a published message to a subscriber
1273 */
1279};
1280
1281/*!
1282 * \internal \brief Dispatch a message to a subscriber synchronously
1283 * \param local \ref ast_taskprocessor_local object
1284 * \return 0
1285 */
1287{
1288 struct stasis_subscription *sub = local->local_data;
1289 struct sync_task_data *std = local->data;
1290 struct stasis_message *message = std->task_data;
1291
1294
1295 ast_mutex_lock(&std->lock);
1296 std->complete = 1;
1297 ast_cond_signal(&std->cond);
1298 ast_mutex_unlock(&std->lock);
1299
1300 return 0;
1301}
1302
1303/*!
1304 * \internal \brief Dispatch a message to a subscriber
1305 * \param sub The subscriber to dispatch to
1306 * \param message The message to send
1307 * \param synchronous If non-zero, synchronize on the subscriber receiving
1308 * the message
1309 * \retval 0 if message was not dispatched
1310 * \retval 1 if message was dispatched
1311 */
1312static unsigned int dispatch_message(struct stasis_subscription *sub,
1313 struct stasis_message *message,
1314 int synchronous)
1315{
1317
1318 /*
1319 * The 'do while' gives us an easy way to skip remaining logic once
1320 * we determine the message should be accepted.
1321 * The code looks more verbose than it needs to be but it optimizes
1322 * down very nicely. It's just easier to understand and debug this way.
1323 */
1324 do {
1325 struct stasis_message_type *message_type = stasis_message_type(message);
1326 int type_id = stasis_message_type_id(message_type);
1327 int type_filter_specified = 0;
1328 int formatter_filter_specified = 0;
1329 int type_filter_passed = 0;
1330 int formatter_filter_passed = 0;
1331
1332 /* We always accept final messages so only run the filter logic if not final */
1333 if (is_final) {
1334 break;
1335 }
1336
1337 type_filter_specified = sub->filter & STASIS_SUBSCRIPTION_FILTER_SELECTIVE;
1338 formatter_filter_specified = sub->accepted_formatters != STASIS_SUBSCRIPTION_FORMATTER_NONE;
1339
1340 /* Accept if no filters of either type were specified */
1341 if (!type_filter_specified && !formatter_filter_specified) {
1342 break;
1343 }
1344
1345 type_filter_passed = type_filter_specified
1346 && type_id < AST_VECTOR_SIZE(&sub->accepted_message_types)
1347 && AST_VECTOR_GET(&sub->accepted_message_types, type_id);
1348
1349 /*
1350 * Since the type and formatter filters are OR'd, we can skip
1351 * the formatter check if the type check passes.
1352 */
1353 if (type_filter_passed) {
1354 break;
1355 }
1356
1357 formatter_filter_passed = formatter_filter_specified
1358 && (sub->accepted_formatters & stasis_message_type_available_formatters(message_type));
1359
1360 if (formatter_filter_passed) {
1361 break;
1362 }
1363
1364#ifdef AST_DEVMODE
1365 ast_atomic_fetchadd_int(&sub->statistics->messages_dropped, +1);
1366#endif
1367
1368 return 0;
1369
1370 } while (0);
1371
1372#ifdef AST_DEVMODE
1373 ast_atomic_fetchadd_int(&sub->statistics->messages_passed, +1);
1374#endif
1375
1376 if (!sub->mailbox) {
1377 /* Dispatch directly */
1379 return 1;
1380 }
1381
1382 /* Bump the message for the taskprocessor push. This will get de-ref'd
1383 * by the task processor callback.
1384 */
1386 if (!synchronous) {
1388 /* Push failed; ugh. */
1389 ast_log(LOG_ERROR, "Dropping async dispatch\n");
1391 return 0;
1392 }
1393 } else {
1394 struct sync_task_data std;
1395
1396 ast_mutex_init(&std.lock);
1397 ast_cond_init(&std.cond, NULL);
1398 std.complete = 0;
1399 std.task_data = message;
1400
1402 /* Push failed; ugh. */
1403 ast_log(LOG_ERROR, "Dropping sync dispatch\n");
1405 ast_mutex_destroy(&std.lock);
1406 ast_cond_destroy(&std.cond);
1407 return 0;
1408 }
1409
1410 ast_mutex_lock(&std.lock);
1411 while (!std.complete) {
1412 ast_cond_wait(&std.cond, &std.lock);
1413 }
1414 ast_mutex_unlock(&std.lock);
1415
1416 ast_mutex_destroy(&std.lock);
1417 ast_cond_destroy(&std.cond);
1418 }
1419
1420 return 1;
1421}
1422
1423/*!
1424 * \internal \brief Publish a message to a topic's subscribers
1425 * \param topic The topic to publish to
1426 * \param message The message to publish
1427 * \param sync_sub An optional subscriber of the topic to publish synchronously
1428 * to
1429 */
1430static void publish_msg(struct stasis_topic *topic,
1431 struct stasis_message *message, struct stasis_subscription *sync_sub)
1432{
1433 size_t i;
1434#ifdef AST_DEVMODE
1435 unsigned int dispatched = 0;
1437 struct stasis_message_type_statistics *statistics;
1438 struct timeval start;
1439 long elapsed;
1440#endif
1441
1442 ast_assert(topic != NULL);
1444
1445#ifdef AST_DEVMODE
1446 ast_mutex_lock(&message_type_statistics_lock);
1447 if (message_type_id >= AST_VECTOR_SIZE(&message_type_statistics)) {
1448 struct stasis_message_type_statistics new_statistics = {
1449 .published = 0,
1450 };
1451 if (AST_VECTOR_REPLACE(&message_type_statistics, message_type_id, new_statistics)) {
1452 ast_mutex_unlock(&message_type_statistics_lock);
1453 return;
1454 }
1455 }
1456 statistics = AST_VECTOR_GET_ADDR(&message_type_statistics, message_type_id);
1457 statistics->message_type = stasis_message_type(message);
1458 ast_mutex_unlock(&message_type_statistics_lock);
1459
1460 ast_atomic_fetchadd_int(&statistics->published, +1);
1461#endif
1462
1463 /* If there are no subscribers don't bother */
1464 if (!stasis_topic_subscribers(topic)) {
1465#ifdef AST_DEVMODE
1466 ast_atomic_fetchadd_int(&statistics->unused, +1);
1467 ast_atomic_fetchadd_int(&topic->statistics->messages_not_dispatched, +1);
1468#endif
1469 return;
1470 }
1471
1472 /*
1473 * The topic may be unref'ed by the subscription invocation.
1474 * Make sure we hold onto a reference while dispatching.
1475 */
1476 ao2_ref(topic, +1);
1477#ifdef AST_DEVMODE
1478 start = ast_tvnow();
1479#endif
1480 ao2_lock(topic);
1481 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
1483
1484 ast_assert(sub != NULL);
1485#ifdef AST_DEVMODE
1486 dispatched +=
1487#endif
1488 dispatch_message(sub, message, (sub == sync_sub));
1489 }
1491
1492#ifdef AST_DEVMODE
1493 elapsed = ast_tvdiff_ms(ast_tvnow(), start);
1494 if (elapsed > topic->statistics->highest_time_dispatched) {
1495 topic->statistics->highest_time_dispatched = elapsed;
1496 }
1497 if (elapsed < topic->statistics->lowest_time_dispatched) {
1498 topic->statistics->lowest_time_dispatched = elapsed;
1499 }
1500 if (dispatched) {
1501 ast_atomic_fetchadd_int(&topic->statistics->messages_dispatched, +1);
1502 } else {
1503 ast_atomic_fetchadd_int(&statistics->unused, +1);
1504 ast_atomic_fetchadd_int(&topic->statistics->messages_not_dispatched, +1);
1505 }
1506#endif
1507
1508 ao2_ref(topic, -1);
1509}
1510
1512{
1514}
1515
1517{
1518 ast_assert(sub != NULL);
1519
1520 publish_msg(sub->topic, message, sub);
1521}
1522
1523/*!
1524 * \brief Forwarding information
1525 *
1526 * Any message posted to \a from_topic is forwarded to \a to_topic.
1527 *
1528 * In cases where both the \a from_topic and \a to_topic need to be locked,
1529 * always lock the \a to_topic first, then the \a from_topic. Lest you deadlock.
1530 */
1532 /*! Originating topic */
1534 /*! Destination topic */
1536};
1537
1538static void forward_dtor(void *obj)
1539{
1540 struct stasis_forward *forward = obj;
1541
1542 ao2_cleanup(forward->from_topic);
1543 forward->from_topic = NULL;
1544 ao2_cleanup(forward->to_topic);
1545 forward->to_topic = NULL;
1546}
1547
1549{
1550 int idx;
1551 struct stasis_topic *from;
1552 struct stasis_topic *to;
1553
1554 if (!forward) {
1555 return NULL;
1556 }
1557
1558 from = forward->from_topic;
1559 to = forward->to_topic;
1560
1561 if (from && to) {
1562 topic_lock_both(to, from);
1565
1566 for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
1568 }
1569 ao2_unlock(from);
1570 ao2_unlock(to);
1571 }
1572
1573 ao2_cleanup(forward);
1574
1575 return NULL;
1576}
1577
1579 struct stasis_topic *to_topic)
1580{
1581 int res;
1582 size_t idx;
1583 struct stasis_forward *forward;
1584
1585 if (!from_topic || !to_topic) {
1586 return NULL;
1587 }
1588
1589 forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1590 if (!forward) {
1591 return NULL;
1592 }
1593
1594 /* Forwards to ourselves are implicit. */
1595 if (to_topic == from_topic) {
1596 return forward;
1597 }
1598
1599 forward->from_topic = ao2_bump(from_topic);
1600 forward->to_topic = ao2_bump(to_topic);
1601
1604 if (res != 0) {
1607 ao2_ref(forward, -1);
1608 return NULL;
1609 }
1610
1611 for (idx = 0; idx < AST_VECTOR_SIZE(&to_topic->subscribers); ++idx) {
1613 }
1616
1617 return forward;
1618}
1619
1620static void subscription_change_dtor(void *obj)
1621{
1622 struct stasis_subscription_change *change = obj;
1623
1624 ao2_cleanup(change->topic);
1625}
1626
1628{
1629 size_t description_len = strlen(description) + 1;
1630 size_t uniqueid_len = strlen(uniqueid) + 1;
1631 struct stasis_subscription_change *change;
1632
1633 change = ao2_alloc_options(sizeof(*change) + description_len + uniqueid_len,
1635 if (!change) {
1636 return NULL;
1637 }
1638
1639 strcpy(change->description, description); /* SAFE */
1640 change->uniqueid = change->description + description_len;
1641 ast_copy_string(change->uniqueid, uniqueid, uniqueid_len); /* SAFE */
1642 ao2_ref(topic, +1);
1643 change->topic = topic;
1644
1645 return change;
1646}
1647
1649{
1650 struct stasis_subscription_change *change;
1651 struct stasis_message *msg;
1652
1653 /* This assumes that we have already unsubscribed */
1655
1657 return;
1658 }
1659
1660 change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
1661 if (!change) {
1662 return;
1663 }
1664
1666 if (!msg) {
1667 ao2_cleanup(change);
1668 return;
1669 }
1670
1671 stasis_publish(topic, msg);
1672 ao2_cleanup(msg);
1673 ao2_cleanup(change);
1674}
1675
1677 struct stasis_subscription *sub)
1678{
1679 struct stasis_subscription_change *change;
1680 struct stasis_message *msg;
1681
1682 /* This assumes that we have already unsubscribed */
1684
1686 return;
1687 }
1688
1689 change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
1690 if (!change) {
1691 return;
1692 }
1693
1695 if (!msg) {
1696 ao2_cleanup(change);
1697 return;
1698 }
1699
1700 stasis_publish(topic, msg);
1701
1702 /* Now we have to dispatch to the subscription itself */
1703 dispatch_message(sub, msg, 0);
1704
1705 ao2_cleanup(msg);
1706 ao2_cleanup(change);
1707}
1708
1712 char name[0];
1713};
1714
1715static void topic_pool_entry_dtor(void *obj)
1716{
1717 struct topic_pool_entry *entry = obj;
1718
1719 entry->forward = stasis_forward_cancel(entry->forward);
1720 ao2_cleanup(entry->topic);
1721 entry->topic = NULL;
1722}
1723
1724static struct topic_pool_entry *topic_pool_entry_alloc(const char *topic_name)
1725{
1727
1728 topic_pool_entry = ao2_alloc_options(sizeof(*topic_pool_entry) + strlen(topic_name) + 1,
1730 if (!topic_pool_entry) {
1731 return NULL;
1732 }
1733
1734 strcpy(topic_pool_entry->name, topic_name); /* Safe */
1735
1736 return topic_pool_entry;
1737}
1738
1742};
1743
1744static void topic_pool_dtor(void *obj)
1745{
1746 struct stasis_topic_pool *pool = obj;
1747
1748#ifdef AO2_DEBUG
1749 {
1750 char *container_name =
1751 ast_alloca(strlen(stasis_topic_name(pool->pool_topic)) + strlen("-pool") + 1);
1752 sprintf(container_name, "%s-pool", stasis_topic_name(pool->pool_topic));
1753 ao2_container_unregister(container_name);
1754 }
1755#endif
1756
1758 pool->pool_container = NULL;
1759 ao2_cleanup(pool->pool_topic);
1760 pool->pool_topic = NULL;
1761}
1762
1763static int topic_pool_entry_hash(const void *obj, const int flags)
1764{
1765 const struct topic_pool_entry *object;
1766 const char *key;
1767
1768 switch (flags & OBJ_SEARCH_MASK) {
1769 case OBJ_SEARCH_KEY:
1770 key = obj;
1771 break;
1772 case OBJ_SEARCH_OBJECT:
1773 object = obj;
1774 key = object->name;
1775 break;
1776 default:
1777 /* Hash can only work on something with a full key. */
1778 ast_assert(0);
1779 return 0;
1780 }
1781 return ast_str_case_hash(key);
1782}
1783
1784static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
1785{
1786 const struct topic_pool_entry *object_left = obj;
1787 const struct topic_pool_entry *object_right = arg;
1788 const char *right_key = arg;
1789 int cmp;
1790
1791 switch (flags & OBJ_SEARCH_MASK) {
1792 case OBJ_SEARCH_OBJECT:
1793 right_key = object_right->name;
1794 /* Fall through */
1795 case OBJ_SEARCH_KEY:
1796 cmp = strcasecmp(object_left->name, right_key);
1797 break;
1799 /* Not supported by container */
1800 ast_assert(0);
1801 cmp = -1;
1802 break;
1803 default:
1804 /*
1805 * What arg points to is specific to this traversal callback
1806 * and has no special meaning to astobj2.
1807 */
1808 cmp = 0;
1809 break;
1810 }
1811 if (cmp) {
1812 return 0;
1813 }
1814 /*
1815 * At this point the traversal callback is identical to a sorted
1816 * container.
1817 */
1818 return CMP_MATCH;
1819}
1820
#ifdef AO2_DEBUG
/*!
 * \internal
 * \brief Print callback for topic pool entries
 *
 * Writes the name of the entry's topic through \a prnt. A NULL entry
 * produces no output.
 *
 * \param v_obj The \ref topic_pool_entry to print (may be NULL)
 * \param where Opaque destination handed through to \a prnt
 * \param prnt Print function to emit the text with
 */
static void topic_pool_prnt_obj(void *v_obj, void *where, ao2_prnt_fn *prnt)
{
	struct topic_pool_entry *pool_entry = v_obj;

	if (pool_entry) {
		prnt(where, "%s", stasis_topic_name(pool_entry->topic));
	}
}
#endif
1832
1834{
1835 struct stasis_topic_pool *pool;
1836
1838 if (!pool) {
1839 return NULL;
1840 }
1841
1844 if (!pool->pool_container) {
1845 ao2_cleanup(pool);
1846 return NULL;
1847 }
1848
1849#ifdef AO2_DEBUG
1850 {
1851 char *container_name =
1852 ast_alloca(strlen(stasis_topic_name(pooled_topic)) + strlen("-pool") + 1);
1853 sprintf(container_name, "%s-pool", stasis_topic_name(pooled_topic));
1854 ao2_container_register(container_name, pool->pool_container, topic_pool_prnt_obj);
1855 }
1856#endif
1857
1858 ao2_ref(pooled_topic, +1);
1859 pool->pool_topic = pooled_topic;
1860
1861 return pool;
1862}
1863
1864void stasis_topic_pool_delete_topic(struct stasis_topic_pool *pool, const char *topic_name)
1865{
1866 /*
1867 * The topic_name passed in could be a fully-qualified name like <pool_topic_name>/<topic_name>
1868 * or just <topic_name>. If it's fully qualified, we need to skip past <pool_topic_name>
1869 * name and search only on <topic_name>.
1870 */
1871 const char *pool_topic_name = stasis_topic_name(pool->pool_topic);
1872 int pool_topic_name_len = strlen(pool_topic_name);
1873 const char *search_topic_name;
1874
1875 if (strncmp(pool_topic_name, topic_name, pool_topic_name_len) == 0) {
1876 search_topic_name = topic_name + pool_topic_name_len + 1;
1877 } else {
1878 search_topic_name = topic_name;
1879 }
1880
1881 ao2_find(pool->pool_container, search_topic_name, OBJ_SEARCH_KEY | OBJ_NODATA | OBJ_UNLINK);
1882}
1883
1884struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
1885{
1887 SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
1888 char *new_topic_name;
1889 int ret;
1890
1892 if (topic_pool_entry) {
1893 return topic_pool_entry->topic;
1894 }
1895
1897 if (!topic_pool_entry) {
1898 return NULL;
1899 }
1900
1901 /* To provide further detail and to ensure that the topic is unique within the scope of the
1902 * system we prefix it with the pooling topic name, which should itself already be unique.
1903 */
1904 ret = ast_asprintf(&new_topic_name, "%s/%s", stasis_topic_name(pool->pool_topic), topic_name);
1905 if (ret < 0) {
1906 return NULL;
1907 }
1908
1909 topic_pool_entry->topic = stasis_topic_create(new_topic_name);
1910 ast_free(new_topic_name);
1911 if (!topic_pool_entry->topic) {
1912 return NULL;
1913 }
1914
1916 if (!topic_pool_entry->forward) {
1917 return NULL;
1918 }
1919
1921 return NULL;
1922 }
1923
1924 return topic_pool_entry->topic;
1925}
1926
1927int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name)
1928{
1930
1932 if (!topic_pool_entry) {
1933 return 0;
1934 }
1935
1937 return 1;
1938}
1939
1941{
1942#ifdef AST_DEVMODE
1944 ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
1945 }
1946#endif
1947}
1948
1949/*! \brief A multi object blob data structure to carry user event stasis messages */
1951 struct ast_json *blob; /*< A blob of JSON data */
1952 AST_VECTOR(, void *) snapshots[STASIS_UMOS_MAX]; /*< Vector of snapshots for each type */
1953};
1954
1955/*!
1956 * \internal
1957 * \brief Destructor for \ref ast_multi_object_blob objects
1958 */
1959static void multi_object_blob_dtor(void *obj)
1960{
1961 struct ast_multi_object_blob *multi = obj;
1962 int type;
1963 int i;
1964
1965 for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1966 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1968 }
1969 AST_VECTOR_FREE(&multi->snapshots[type]);
1970 }
1971 ast_json_unref(multi->blob);
1972}
1973
1974/*! \brief Create a stasis user event multi object blob */
1976{
1977 int type;
1978 struct ast_multi_object_blob *multi;
1979
1980 ast_assert(blob != NULL);
1981
1982 multi = ao2_alloc(sizeof(*multi), multi_object_blob_dtor);
1983 if (!multi) {
1984 return NULL;
1985 }
1986
1987 for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1988 if (AST_VECTOR_INIT(&multi->snapshots[type], 0)) {
1989 ao2_ref(multi, -1);
1990
1991 return NULL;
1992 }
1993 }
1994
1995 multi->blob = ast_json_ref(blob);
1996
1997 return multi;
1998}
1999
2000/*! \brief Add an object (snapshot) to the blob */
2003{
2004 if (!multi || !object || AST_VECTOR_APPEND(&multi->snapshots[type], object)) {
2005 ao2_cleanup(object);
2006 }
2007}
2008
2009/*! \brief Publish single channel user event (for app_userevent compatibility) */
2011 struct stasis_message_type *type, struct ast_json *blob)
2012{
2013 struct stasis_message *message;
2014 struct ast_channel_snapshot *channel_snapshot;
2015 struct ast_multi_object_blob *multi;
2016
2017 if (!type) {
2018 return;
2019 }
2020
2022 if (!multi) {
2023 return;
2024 }
2025
2026 channel_snapshot = ast_channel_snapshot_create(chan);
2027 if (!channel_snapshot) {
2028 ao2_ref(multi, -1);
2029 return;
2030 }
2031
2032 /* this call steals the channel_snapshot reference */
2033 ast_multi_object_blob_add(multi, STASIS_UMOS_CHANNEL, channel_snapshot);
2034
2036 ao2_ref(multi, -1);
2037 if (message) {
2038 /* app_userevent still publishes to channel */
2040 ao2_ref(message, -1);
2041 }
2042}
2043
2044/*! \internal \brief convert multi object blob to ari json */
2046 struct stasis_message *message,
2047 const struct stasis_message_sanitizer *sanitize)
2048{
2049 struct ast_json *out;
2051 struct ast_json *blob = multi->blob;
2052 const struct timeval *tv = stasis_message_timestamp(message);
2054 int i;
2055
2057 if (!out) {
2058 return NULL;
2059 }
2060
2061 ast_json_object_set(out, "type", ast_json_string_create("ChannelUserevent"));
2062 ast_json_object_set(out, "timestamp", ast_json_timeval(*tv, NULL));
2063 ast_json_object_set(out, "eventname", ast_json_ref(ast_json_object_get(blob, "eventname")));
2064 ast_json_object_set(out, "userevent", ast_json_ref(blob));
2065
2066 for (type = 0; type < STASIS_UMOS_MAX; ++type) {
2067 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
2068 struct ast_json *json_object = NULL;
2069 char *name = NULL;
2070 void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
2071
2072 switch (type) {
2074 json_object = ast_channel_snapshot_to_json(snapshot, sanitize);
2075 name = "channel";
2076 break;
2077 case STASIS_UMOS_BRIDGE:
2078 json_object = ast_bridge_snapshot_to_json(snapshot, sanitize);
2079 name = "bridge";
2080 break;
2082 json_object = ast_endpoint_snapshot_to_json(snapshot, sanitize);
2083 name = "endpoint";
2084 break;
2085 }
2086 if (json_object) {
2087 ast_json_object_set(out, name, json_object);
2088 }
2089 }
2090 }
2091
2092 return out;
2093}
2094
2095/*! \internal \brief convert multi object blob to ami string */
2096static struct ast_str *multi_object_blob_to_ami(void *obj)
2097{
2098 struct ast_str *ami_str=ast_str_create(1024);
2099 struct ast_str *ami_snapshot;
2100 const struct ast_multi_object_blob *multi = obj;
2102 int i;
2103
2104 if (!ami_str) {
2105 return NULL;
2106 }
2107 if (!multi) {
2108 ast_free(ami_str);
2109 return NULL;
2110 }
2111
2112 for (type = 0; type < STASIS_UMOS_MAX; ++type) {
2113 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
2114 char *name = NULL;
2115 void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
2116 ami_snapshot = NULL;
2117
2118 if (i > 0) {
2119 ast_asprintf(&name, "%d", i + 1);
2120 }
2121
2122 switch (type) {
2124 ami_snapshot = ast_manager_build_channel_state_string_prefix(snapshot, name ?: "");
2125 break;
2126
2127 case STASIS_UMOS_BRIDGE:
2128 ami_snapshot = ast_manager_build_bridge_state_string_prefix(snapshot, name ?: "");
2129 break;
2130
2132 /* currently not sending endpoint snapshots to AMI */
2133 break;
2134 }
2135 if (ami_snapshot) {
2136 ast_str_append(&ami_str, 0, "%s", ast_str_buffer(ami_snapshot));
2137 ast_free(ami_snapshot);
2138 }
2139 ast_free(name);
2140 }
2141 }
2142
2143 return ami_str;
2144}
2145
/*!
 * \internal
 * \brief Callback to pass only user defined parameters from blob
 *
 * \param key Name of the blob key being considered
 *
 * \retval 1 if \a key is exactly "eventname" (case-sensitive)
 * \retval 0 for every other key
 */
static int userevent_exclusion_cb(const char *key)
{
	return strcmp(key, "eventname") == 0 ? 1 : 0;
}
2154
2156 struct stasis_message *message)
2157{
2158 RAII_VAR(struct ast_str *, object_string, NULL, ast_free);
2159 RAII_VAR(struct ast_str *, body, NULL, ast_free);
2161 const char *eventname;
2162
2163 eventname = ast_json_string_get(ast_json_object_get(multi->blob, "eventname"));
2165 object_string = multi_object_blob_to_ami(multi);
2166 if (!object_string || !body) {
2167 return NULL;
2168 }
2169
2171 "%s"
2172 "UserEvent: %s\r\n"
2173 "%s",
2174 ast_str_buffer(object_string),
2175 eventname,
2176 ast_str_buffer(body));
2177}
2178
2179/*! \brief A structure to hold global configuration-related options */
2181 /*! The list of message types to decline */
2183};
2184
2185/*! \brief Threadpool configuration options */
2187 /*! Initial size of the thread pool */
2189 /*! Time, in seconds, before we expire a thread */
2191 /*! Maximum number of threads to allow */
2193};
2194
2196 /*! Thread pool configuration options */
2198 /*! Declined message types */
2200};
2201
2203 .type = ACO_GLOBAL,
2204 .name = "threadpool",
2205 .item_offset = offsetof(struct stasis_config, threadpool_options),
2206 .category = "threadpool",
2207 .category_match = ACO_WHITELIST_EXACT,
2208};
2209
2211
2212/*! \brief An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type */
2213static struct aco_type declined_option = {
2214 .type = ACO_GLOBAL,
2215 .name = "declined_message_types",
2216 .item_offset = offsetof(struct stasis_config, declined_message_types),
2217 .category_match = ACO_WHITELIST_EXACT,
2218 .category = "declined_message_types",
2219};
2220
2222
2224 .filename = "stasis.conf",
2226};
2227
2228/*! \brief A global object container that will contain the stasis_config that gets swapped out on reloads */
2230
2231static void *stasis_config_alloc(void);
2232
2233/*! \brief Register information about the configs being processed by this module */
2235 .files = ACO_FILES(&stasis_conf),
2236);
2237
2239{
2240 struct stasis_declined_config *declined = obj;
2241
2242 ao2_cleanup(declined->declined);
2243}
2244
2245static void stasis_config_destructor(void *obj)
2246{
2247 struct stasis_config *cfg = obj;
2248
2251}
2252
2253static void *stasis_config_alloc(void)
2254{
2255 struct stasis_config *cfg;
2256
2257 if (!(cfg = ao2_alloc(sizeof(*cfg), stasis_config_destructor))) {
2258 return NULL;
2259 }
2260
2261 cfg->threadpool_options = ast_calloc(1, sizeof(*cfg->threadpool_options));
2262 if (!cfg->threadpool_options) {
2263 ao2_ref(cfg, -1);
2264 return NULL;
2265 }
2266
2269 if (!cfg->declined_message_types) {
2270 ao2_ref(cfg, -1);
2271 return NULL;
2272 }
2273
2275 if (!cfg->declined_message_types->declined) {
2276 ao2_ref(cfg, -1);
2277 return NULL;
2278 }
2279
2280 return cfg;
2281}
2282
2284{
2286 char *name_in_declined;
2287 int res;
2288
2289 if (!cfg || !cfg->declined_message_types) {
2290 ao2_cleanup(cfg);
2291 return 0;
2292 }
2293
2294 name_in_declined = ao2_find(cfg->declined_message_types->declined, name, OBJ_SEARCH_KEY);
2295 res = name_in_declined ? 1 : 0;
2296 ao2_cleanup(name_in_declined);
2297 ao2_ref(cfg, -1);
2298 if (res) {
2299 ast_debug(4, "Declining to allocate Stasis message type '%s' due to configuration\n", name);
2300 }
2301 return res;
2302}
2303
2304static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
2305{
2306 struct stasis_declined_config *declined = obj;
2307
2308 if (ast_strlen_zero(var->value)) {
2309 return 0;
2310 }
2311
2312 if (ast_str_container_add(declined->declined, var->value)) {
2313 return -1;
2314 }
2315
2316 return 0;
2317}
2318
2319/*!
2320 * @{ \brief Define multi user event message type(s).
2321 */
2322
2324 .to_json = multi_user_event_to_json,
2326 );
2327
2328/*! @} */
2329
2330/*!
2331 * \internal
2332 * \brief CLI command implementation for 'stasis show topics'
2333 */
2334static char *stasis_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2335{
2336 struct ao2_iterator iter;
2337 struct topic_proxy *topic;
2338 struct ao2_container *tmp_container;
2339 int count = 0;
2340#define FMT_HEADERS "%-64s %-64s\n"
2341#define FMT_FIELDS "%-64s %-64s\n"
2342
2343 switch (cmd) {
2344 case CLI_INIT:
2345 e->command = "stasis show topics";
2346 e->usage =
2347 "Usage: stasis show topics\n"
2348 " Shows a list of topics\n";
2349 return NULL;
2350 case CLI_GENERATE:
2351 return NULL;
2352 }
2353
2354 if (a->argc != e->args) {
2355 return CLI_SHOWUSAGE;
2356 }
2357
2358 ast_cli(a->fd, "\n" FMT_HEADERS, "Name", "Detail");
2359
2361 topic_proxy_sort_fn, NULL);
2362
2363 if (!tmp_container || ao2_container_dup(tmp_container, topic_all, OBJ_SEARCH_OBJECT)) {
2364 ao2_cleanup(tmp_container);
2365
2366 return NULL;
2367 }
2368
2369 /* getting all topic in order */
2370 iter = ao2_iterator_init(tmp_container, AO2_ITERATOR_UNLINK);
2371 while ((topic = ao2_iterator_next(&iter))) {
2372 ast_cli(a->fd, FMT_FIELDS, topic->name, topic->detail);
2373 ao2_ref(topic, -1);
2374 ++count;
2375 }
2376 ao2_iterator_destroy(&iter);
2377 ao2_cleanup(tmp_container);
2378
2379 ast_cli(a->fd, "\n%d Total topics\n\n", count);
2380
2381#undef FMT_HEADERS
2382#undef FMT_FIELDS
2383
2384 return CLI_SUCCESS;
2385}
2386
2387/*!
2388 * \internal
2389 * \brief CLI tab completion for topic names
2390 */
2391static char *topic_complete_name(const char *word)
2392{
2393 struct topic_proxy *topic;
2394 struct ao2_iterator it;
2395 int wordlen = strlen(word);
2396 int ret;
2397
2399 while ((topic = ao2_iterator_next(&it))) {
2400 if (!strncasecmp(word, topic->name, wordlen)) {
2401 ret = ast_cli_completion_add(ast_strdup(topic->name));
2402 if (ret) {
2403 ao2_ref(topic, -1);
2404 break;
2405 }
2406 }
2407 ao2_ref(topic, -1);
2408 }
2410 return NULL;
2411}
2412
2413/*!
2414 * \internal
2415 * \brief CLI command implementation for 'stasis show topic'
2416 */
2417static char *stasis_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2418{
2419 struct stasis_topic *topic;
2420 char print_time[32];
2421 int i;
2422
2423 switch (cmd) {
2424 case CLI_INIT:
2425 e->command = "stasis show topic";
2426 e->usage =
2427 "Usage: stasis show topic <name>\n"
2428 " Show stasis topic detail info.\n";
2429 return NULL;
2430 case CLI_GENERATE:
2431 if (a->pos == 3) {
2432 return topic_complete_name(a->word);
2433 } else {
2434 return NULL;
2435 }
2436 }
2437
2438 if (a->argc != 4) {
2439 return CLI_SHOWUSAGE;
2440 }
2441
2442 topic = stasis_topic_get(a->argv[3]);
2443 if (!topic) {
2444 ast_cli(a->fd, "Specified topic '%s' does not exist\n", a->argv[3]);
2445 return CLI_FAILURE;
2446 }
2447
2448 ast_cli(a->fd, "Name: %s\n", topic->name);
2449 ast_cli(a->fd, "Detail: %s\n", topic->detail);
2450 ast_cli(a->fd, "Subscribers count: %zu\n", AST_VECTOR_SIZE(&topic->subscribers));
2451 ast_cli(a->fd, "Forwarding topic count: %zu\n", AST_VECTOR_SIZE(&topic->upstream_topics));
2452 ast_format_duration_hh_mm_ss(ast_tvnow().tv_sec - topic->creationtime->tv_sec, print_time, sizeof(print_time));
2453 ast_cli(a->fd, "Duration time: %s\n", print_time);
2454
2455 ao2_lock(topic);
2456 ast_cli(a->fd, "\nSubscribers:\n");
2457 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); i++) {
2458 struct stasis_subscription *subscription_tmp = AST_VECTOR_GET(&topic->subscribers, i);
2459 ast_cli(a->fd, " UniqueID: %s, Topic: %s, Detail: %s\n",
2460 subscription_tmp->uniqueid, subscription_tmp->topic->name, subscription_tmp->topic->detail);
2461 }
2462
2463 ast_cli(a->fd, "\nForwarded topics:\n");
2464 for (i = 0; i < AST_VECTOR_SIZE(&topic->upstream_topics); i++) {
2465 struct stasis_topic *topic_tmp = AST_VECTOR_GET(&topic->upstream_topics, i);
2466 ast_cli(a->fd, " Topic: %s, Detail: %s\n", topic_tmp->name, topic_tmp->detail);
2467 }
2468 ao2_unlock(topic);
2469
2470 ao2_ref(topic, -1);
2471
2472 return CLI_SUCCESS;
2473}
2474
2475
/*! \brief CLI entries for the stasis_show_topics and stasis_show_topic command handlers */
2476static struct ast_cli_entry cli_stasis[] = {
2477 AST_CLI_DEFINE(stasis_show_topics, "Show all topics"),
2478 AST_CLI_DEFINE(stasis_show_topic, "Show topic"),
2479};
2480
2481
2482#ifdef AST_DEVMODE
2483
/*
 * Sort function for subscription statistics objects, keyed on their uniqueid
 * string field. The resulting stasis_subscription_statistics_sort_fn is what
 * 'stasis statistics show subscriptions' passes to its sorted container.
 */
2484AO2_STRING_FIELD_SORT_FN(stasis_subscription_statistics, uniqueid);
2485
2486/*!
2487 * \internal
2488 * \brief CLI command implementation for 'stasis statistics show subscriptions'
2489 */
2490static char *statistics_show_subscriptions(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2491{
2492 struct ao2_container *sorted_subscriptions;
2493 struct ao2_container *subscription_stats;
2494 struct ao2_iterator iter;
2495 struct stasis_subscription_statistics *statistics;
2496 int count = 0;
2497 int dropped = 0;
2498 int passed = 0;
2499#define FMT_HEADERS "%-64s %10s %10s %16s %16s\n"
2500#define FMT_FIELDS "%-64s %10d %10d %16ld %16ld\n"
2501#define FMT_FIELDS2 "%-64s %10d %10d\n"
2502
2503 switch (cmd) {
2504 case CLI_INIT:
2505 e->command = "stasis statistics show subscriptions";
2506 e->usage =
2507 "Usage: stasis statistics show subscriptions\n"
2508 " Shows a list of subscriptions and their general statistics\n";
2509 return NULL;
2510 case CLI_GENERATE:
2511 return NULL;
2512 }
2513
2514 if (a->argc != e->args) {
2515 return CLI_SHOWUSAGE;
2516 }
2517
2518 subscription_stats = ao2_global_obj_ref(subscription_statistics);
2519 if (!subscription_stats) {
2520 ast_cli(a->fd, "Could not fetch subscription_statistics container\n");
2521 return CLI_FAILURE;
2522 }
2523
2524 sorted_subscriptions = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_NOLOCK, 0,
2525 stasis_subscription_statistics_sort_fn, NULL);
2526 if (!sorted_subscriptions) {
2527 ao2_ref(subscription_stats, -1);
2528 ast_cli(a->fd, "Could not create container for sorting subscription statistics\n");
2529 return CLI_SUCCESS;
2530 }
2531
2532 if (ao2_container_dup(sorted_subscriptions, subscription_stats, 0)) {
2533 ao2_ref(sorted_subscriptions, -1);
2534 ao2_ref(subscription_stats, -1);
2535 ast_cli(a->fd, "Could not sort subscription statistics\n");
2536 return CLI_SUCCESS;
2537 }
2538
2539 ao2_ref(subscription_stats, -1);
2540
2541 ast_cli(a->fd, "\n" FMT_HEADERS, "Subscription", "Dropped", "Passed", "Lowest Invoke", "Highest Invoke");
2542
2543 iter = ao2_iterator_init(sorted_subscriptions, 0);
2544 while ((statistics = ao2_iterator_next(&iter))) {
2545 ast_cli(a->fd, FMT_FIELDS, statistics->uniqueid, statistics->messages_dropped, statistics->messages_passed,
2546 statistics->lowest_time_invoked, statistics->highest_time_invoked);
2547 dropped += statistics->messages_dropped;
2548 passed += statistics->messages_passed;
2549 ao2_ref(statistics, -1);
2550 ++count;
2551 }
2552 ao2_iterator_destroy(&iter);
2553
2554 ao2_ref(sorted_subscriptions, -1);
2555
2556 ast_cli(a->fd, FMT_FIELDS2, "Total", dropped, passed);
2557 ast_cli(a->fd, "\n%d subscriptions\n\n", count);
2558
2559#undef FMT_HEADERS
2560#undef FMT_FIELDS
2561#undef FMT_FIELDS2
2562
2563 return CLI_SUCCESS;
2564}
2565
2566/*!
2567 * \internal
2568 * \brief CLI tab completion for subscription statistics names
2569 */
2570static char *subscription_statistics_complete_name(const char *word, int state)
2571{
2572 struct stasis_subscription_statistics *statistics;
2573 struct ao2_container *subscription_stats;
2574 struct ao2_iterator it_statistics;
2575 int wordlen = strlen(word);
2576 int which = 0;
2577 char *result = NULL;
2578
2579 subscription_stats = ao2_global_obj_ref(subscription_statistics);
2580 if (!subscription_stats) {
2581 return result;
2582 }
2583
2584 it_statistics = ao2_iterator_init(subscription_stats, 0);
2585 while ((statistics = ao2_iterator_next(&it_statistics))) {
2586 if (!strncasecmp(word, statistics->uniqueid, wordlen)
2587 && ++which > state) {
2588 result = ast_strdup(statistics->uniqueid);
2589 }
2590 ao2_ref(statistics, -1);
2591 if (result) {
2592 break;
2593 }
2594 }
2595 ao2_iterator_destroy(&it_statistics);
2596 ao2_ref(subscription_stats, -1);
2597 return result;
2598}
2599
2600/*!
2601 * \internal
2602 * \brief CLI command implementation for 'stasis statistics show subscription'
2603 */
2604static char *statistics_show_subscription(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2605{
2606 struct stasis_subscription_statistics *statistics;
2607 struct ao2_container *subscription_stats;
2608 struct ao2_iterator i;
2609 char *name;
2610
2611 switch (cmd) {
2612 case CLI_INIT:
2613 e->command = "stasis statistics show subscription";
2614 e->usage =
2615 "Usage: stasis statistics show subscription <uniqueid>\n"
2616 " Show stasis subscription statistics.\n";
2617 return NULL;
2618 case CLI_GENERATE:
2619 if (a->pos == 4) {
2620 return subscription_statistics_complete_name(a->word, a->n);
2621 } else {
2622 return NULL;
2623 }
2624 }
2625
2626 if (a->argc != 5) {
2627 return CLI_SHOWUSAGE;
2628 }
2629
2630 subscription_stats = ao2_global_obj_ref(subscription_statistics);
2631 if (!subscription_stats) {
2632 ast_cli(a->fd, "Could not fetch subcription_statistics container\n");
2633 return CLI_FAILURE;
2634 }
2635
2636 statistics = ao2_find(subscription_stats, a->argv[4], OBJ_SEARCH_KEY);
2637 if (!statistics) {
2638 ao2_ref(subscription_stats, -1);
2639 ast_cli(a->fd, "Specified subscription '%s' does not exist\n", a->argv[4]);
2640 return CLI_FAILURE;
2641 }
2642
2643 ao2_ref(subscription_stats, -1);
2644
2645 ast_cli(a->fd, "Subscription: %s\n", statistics->uniqueid);
2646 ast_cli(a->fd, "Pointer Address: %p\n", statistics->sub);
2647 ast_cli(a->fd, "Source filename: %s\n", S_OR(statistics->file, "<unavailable>"));
2648 ast_cli(a->fd, "Source line number: %d\n", statistics->lineno);
2649 ast_cli(a->fd, "Source function: %s\n", S_OR(statistics->func, "<unavailable>"));
2650 ast_cli(a->fd, "Number of messages dropped due to filtering: %d\n", statistics->messages_dropped);
2651 ast_cli(a->fd, "Number of messages passed to subscriber callback: %d\n", statistics->messages_passed);
2652 ast_cli(a->fd, "Using mailbox to queue messages: %s\n", statistics->uses_mailbox ? "Yes" : "No");
2653 ast_cli(a->fd, "Using stasis threadpool for handling messages: %s\n", statistics->uses_threadpool ? "Yes" : "No");
2654 ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->lowest_time_invoked);
2655 ast_cli(a->fd, "Highest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->highest_time_invoked);
2656
2658 if (statistics->highest_time_message_type) {
2659 ast_cli(a->fd, "Offender message type for highest invoking time: %s\n", stasis_message_type_name(statistics->highest_time_message_type));
2660 }
2662
2663 ast_cli(a->fd, "Number of topics: %d\n", ao2_container_count(statistics->topics));
2664
2665 ast_cli(a->fd, "Subscribed topics:\n");
2666 i = ao2_iterator_init(statistics->topics, 0);
2667 while ((name = ao2_iterator_next(&i))) {
2668 ast_cli(a->fd, "\t%s\n", name);
2669 ao2_ref(name, -1);
2670 }
2672
2673 ao2_ref(statistics, -1);
2674
2675 return CLI_SUCCESS;
2676}
2677
/*
 * Sort function for topic statistics objects, keyed on their name string
 * field. The resulting stasis_topic_statistics_sort_fn is what
 * 'stasis statistics show topics' passes to its sorted container.
 */
2678AO2_STRING_FIELD_SORT_FN(stasis_topic_statistics, name);
2679
2680/*!
2681 * \internal
2682 * \brief CLI command implementation for 'stasis statistics show topics'
2683 */
2684static char *statistics_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2685{
2686 struct ao2_container *sorted_topics;
2687 struct ao2_container *topic_stats;
2688 struct ao2_iterator iter;
2689 struct stasis_topic_statistics *statistics;
2690 int count = 0;
2691 int not_dispatched = 0;
2692 int dispatched = 0;
2693#define FMT_HEADERS "%-64s %10s %10s %10s %16s %16s\n"
2694#define FMT_FIELDS "%-64s %10d %10d %10d %16ld %16ld\n"
2695#define FMT_FIELDS2 "%-64s %10s %10d %10d\n"
2696
2697 switch (cmd) {
2698 case CLI_INIT:
2699 e->command = "stasis statistics show topics";
2700 e->usage =
2701 "Usage: stasis statistics show topics\n"
2702 " Shows a list of topics and their general statistics\n";
2703 return NULL;
2704 case CLI_GENERATE:
2705 return NULL;
2706 }
2707
2708 if (a->argc != e->args) {
2709 return CLI_SHOWUSAGE;
2710 }
2711
2712 topic_stats = ao2_global_obj_ref(topic_statistics);
2713 if (!topic_stats) {
2714 ast_cli(a->fd, "Could not fetch topic_statistics container\n");
2715 return CLI_FAILURE;
2716 }
2717
2719 stasis_topic_statistics_sort_fn, NULL);
2720 if (!sorted_topics) {
2721 ao2_ref(topic_stats, -1);
2722 ast_cli(a->fd, "Could not create container for sorting topic statistics\n");
2723 return CLI_SUCCESS;
2724 }
2725
2726 if (ao2_container_dup(sorted_topics, topic_stats, 0)) {
2727 ao2_ref(sorted_topics, -1);
2728 ao2_ref(topic_stats, -1);
2729 ast_cli(a->fd, "Could not sort topic statistics\n");
2730 return CLI_SUCCESS;
2731 }
2732
2733 ao2_ref(topic_stats, -1);
2734
2735 ast_cli(a->fd, "\n" FMT_HEADERS, "Topic", "Subscribers", "Dropped", "Dispatched", "Lowest Dispatch", "Highest Dispatch");
2736
2737 iter = ao2_iterator_init(sorted_topics, 0);
2738 while ((statistics = ao2_iterator_next(&iter))) {
2739 ast_cli(a->fd, FMT_FIELDS, statistics->name, ao2_container_count(statistics->subscribers),
2740 statistics->messages_not_dispatched, statistics->messages_dispatched,
2741 statistics->lowest_time_dispatched, statistics->highest_time_dispatched);
2742 not_dispatched += statistics->messages_not_dispatched;
2743 dispatched += statistics->messages_dispatched;
2744 ao2_ref(statistics, -1);
2745 ++count;
2746 }
2747 ao2_iterator_destroy(&iter);
2748
2749 ao2_ref(sorted_topics, -1);
2750
2751 ast_cli(a->fd, FMT_FIELDS2, "Total", "", not_dispatched, dispatched);
2752 ast_cli(a->fd, "\n%d topics\n\n", count);
2753
2754#undef FMT_HEADERS
2755#undef FMT_FIELDS
2756#undef FMT_FIELDS2
2757
2758 return CLI_SUCCESS;
2759}
2760
2761/*!
2762 * \internal
2763 * \brief CLI tab completion for topic statistics names
2764 */
2765static char *topic_statistics_complete_name(const char *word, int state)
2766{
2767 struct stasis_topic_statistics *statistics;
2768 struct ao2_container *topic_stats;
2769 struct ao2_iterator it_statistics;
2770 int wordlen = strlen(word);
2771 int which = 0;
2772 char *result = NULL;
2773
2774 topic_stats = ao2_global_obj_ref(topic_statistics);
2775 if (!topic_stats) {
2776 return result;
2777 }
2778
2779 it_statistics = ao2_iterator_init(topic_stats, 0);
2780 while ((statistics = ao2_iterator_next(&it_statistics))) {
2781 if (!strncasecmp(word, statistics->name, wordlen)
2782 && ++which > state) {
2783 result = ast_strdup(statistics->name);
2784 }
2785 ao2_ref(statistics, -1);
2786 if (result) {
2787 break;
2788 }
2789 }
2790 ao2_iterator_destroy(&it_statistics);
2791 ao2_ref(topic_stats, -1);
2792 return result;
2793}
2794
2795/*!
2796 * \internal
2797 * \brief CLI command implementation for 'stasis statistics show topic'
2798 */
2799static char *statistics_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2800{
2801 struct stasis_topic_statistics *statistics;
2802 struct ao2_container *topic_stats;
2803 struct ao2_iterator i;
2804 char *uniqueid;
2805
2806 switch (cmd) {
2807 case CLI_INIT:
2808 e->command = "stasis statistics show topic";
2809 e->usage =
2810 "Usage: stasis statistics show topic <name>\n"
2811 " Show stasis topic statistics.\n";
2812 return NULL;
2813 case CLI_GENERATE:
2814 if (a->pos == 4) {
2815 return topic_statistics_complete_name(a->word, a->n);
2816 } else {
2817 return NULL;
2818 }
2819 }
2820
2821 if (a->argc != 5) {
2822 return CLI_SHOWUSAGE;
2823 }
2824
2825 topic_stats = ao2_global_obj_ref(topic_statistics);
2826 if (!topic_stats) {
2827 ast_cli(a->fd, "Could not fetch topic_statistics container\n");
2828 return CLI_FAILURE;
2829 }
2830
2831 statistics = ao2_find(topic_stats, a->argv[4], OBJ_SEARCH_KEY);
2832 if (!statistics) {
2833 ao2_ref(topic_stats, -1);
2834 ast_cli(a->fd, "Specified topic '%s' does not exist\n", a->argv[4]);
2835 return CLI_FAILURE;
2836 }
2837
2838 ao2_ref(topic_stats, -1);
2839
2840 ast_cli(a->fd, "Topic: %s\n", statistics->name);
2841 ast_cli(a->fd, "Pointer Address: %p\n", statistics->topic);
2842 ast_cli(a->fd, "Number of messages published that went to no subscriber: %d\n", statistics->messages_not_dispatched);
2843 ast_cli(a->fd, "Number of messages that went to at least one subscriber: %d\n", statistics->messages_dispatched);
2844 ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent dispatching message: %ld\n", statistics->lowest_time_dispatched);
2845 ast_cli(a->fd, "Highest amount of time (in milliseconds) spent dispatching messages: %ld\n", statistics->highest_time_dispatched);
2846 ast_cli(a->fd, "Number of subscribers: %d\n", ao2_container_count(statistics->subscribers));
2847
2848 ast_cli(a->fd, "Subscribers:\n");
2849 i = ao2_iterator_init(statistics->subscribers, 0);
2850 while ((uniqueid = ao2_iterator_next(&i))) {
2851 ast_cli(a->fd, "\t%s\n", uniqueid);
2852 ao2_ref(uniqueid, -1);
2853 }
2855
2856 ao2_ref(statistics, -1);
2857
2858 return CLI_SUCCESS;
2859}
2860
2861/*!
2862 * \internal
2863 * \brief CLI command implementation for 'stasis statistics show messages'
2864 */
2865static char *statistics_show_messages(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2866{
2867 int i;
2868 int count = 0;
2869 int published = 0;
2870 int unused = 0;
2871#define FMT_HEADERS "%-64s %10s %10s\n"
2872#define FMT_FIELDS "%-64s %10d %10d\n"
2873
2874 switch (cmd) {
2875 case CLI_INIT:
2876 e->command = "stasis statistics show messages";
2877 e->usage =
2878 "Usage: stasis statistics show messages\n"
2879 " Shows a list of message types and their general statistics\n";
2880 return NULL;
2881 case CLI_GENERATE:
2882 return NULL;
2883 }
2884
2885 if (a->argc != e->args) {
2886 return CLI_SHOWUSAGE;
2887 }
2888
2889 ast_cli(a->fd, "\n" FMT_HEADERS, "Message Type", "Published", "Unused");
2890
2891 ast_mutex_lock(&message_type_statistics_lock);
2892 for (i = 0; i < AST_VECTOR_SIZE(&message_type_statistics); ++i) {
2893 struct stasis_message_type_statistics *statistics = AST_VECTOR_GET_ADDR(&message_type_statistics, i);
2894
2895 if (!statistics->message_type) {
2896 continue;
2897 }
2898
2899 ast_cli(a->fd, FMT_FIELDS, stasis_message_type_name(statistics->message_type), statistics->published,
2900 statistics->unused);
2901 published += statistics->published;
2902 unused += statistics->unused;
2903 ++count;
2904 }
2905 ast_mutex_unlock(&message_type_statistics_lock);
2906
2907 ast_cli(a->fd, FMT_FIELDS, "Total", published, unused);
2908 ast_cli(a->fd, "\n%d seen message types\n\n", count);
2909
2910#undef FMT_HEADERS
2911#undef FMT_FIELDS
2912
2913 return CLI_SUCCESS;
2914}
2915
2916static struct ast_cli_entry cli_stasis_statistics[] = {
2917 AST_CLI_DEFINE(statistics_show_subscriptions, "Show subscriptions with general statistics"),
2918 AST_CLI_DEFINE(statistics_show_subscription, "Show subscription statistics"),
2919 AST_CLI_DEFINE(statistics_show_topics, "Show topics with general statistics"),
2920 AST_CLI_DEFINE(statistics_show_topic, "Show topic statistics"),
2921 AST_CLI_DEFINE(statistics_show_messages, "Show message types with general statistics"),
2922};
2923
2924static int subscription_statistics_hash(const void *obj, const int flags)
2925{
2926 const struct stasis_subscription_statistics *object;
2927 const char *key;
2928
2929 switch (flags & OBJ_SEARCH_MASK) {
2930 case OBJ_SEARCH_KEY:
2931 key = obj;
2932 break;
2933 case OBJ_SEARCH_OBJECT:
2934 object = obj;
2935 key = object->uniqueid;
2936 break;
2937 default:
2938 /* Hash can only work on something with a full key. */
2939 ast_assert(0);
2940 return 0;
2941 }
2942 return ast_str_case_hash(key);
2943}
2944
2945static int subscription_statistics_cmp(void *obj, void *arg, int flags)
2946{
2947 const struct stasis_subscription_statistics *object_left = obj;
2948 const struct stasis_subscription_statistics *object_right = arg;
2949 const char *right_key = arg;
2950 int cmp;
2951
2952 switch (flags & OBJ_SEARCH_MASK) {
2953 case OBJ_SEARCH_OBJECT:
2954 right_key = object_right->uniqueid;
2955 /* Fall through */
2956 case OBJ_SEARCH_KEY:
2957 cmp = strcasecmp(object_left->uniqueid, right_key);
2958 break;
2960 /* Not supported by container */
2961 ast_assert(0);
2962 cmp = -1;
2963 break;
2964 default:
2965 /*
2966 * What arg points to is specific to this traversal callback
2967 * and has no special meaning to astobj2.
2968 */
2969 cmp = 0;
2970 break;
2971 }
2972 if (cmp) {
2973 return 0;
2974 }
2975 /*
2976 * At this point the traversal callback is identical to a sorted
2977 * container.
2978 */
2979 return CMP_MATCH;
2980}
2981
2982static int topic_statistics_hash(const void *obj, const int flags)
2983{
2984 const struct stasis_topic_statistics *object;
2985 const char *key;
2986
2987 switch (flags & OBJ_SEARCH_MASK) {
2988 case OBJ_SEARCH_KEY:
2989 key = obj;
2990 break;
2991 case OBJ_SEARCH_OBJECT:
2992 object = obj;
2993 key = object->name;
2994 break;
2995 default:
2996 /* Hash can only work on something with a full key. */
2997 ast_assert(0);
2998 return 0;
2999 }
3000 return ast_str_case_hash(key);
3001}
3002
3003static int topic_statistics_cmp(void *obj, void *arg, int flags)
3004{
3005 const struct stasis_topic_statistics *object_left = obj;
3006 const struct stasis_topic_statistics *object_right = arg;
3007 const char *right_key = arg;
3008 int cmp;
3009
3010 switch (flags & OBJ_SEARCH_MASK) {
3011 case OBJ_SEARCH_OBJECT:
3012 right_key = object_right->name;
3013 /* Fall through */
3014 case OBJ_SEARCH_KEY:
3015 cmp = strcasecmp(object_left->name, right_key);
3016 break;
3018 /* Not supported by container */
3019 ast_assert(0);
3020 cmp = -1;
3021 break;
3022 default:
3023 /*
3024 * What arg points to is specific to this traversal callback
3025 * and has no special meaning to astobj2.
3026 */
3027 cmp = 0;
3028 break;
3029 }
3030 if (cmp) {
3031 return 0;
3032 }
3033 /*
3034 * At this point the traversal callback is identical to a sorted
3035 * container.
3036 */
3037 return CMP_MATCH;
3038}
3039#endif
3040
3041/*! \brief Cleanup function for graceful shutdowns */
/*
 * In AST_DEVMODE builds this first unregisters the statistics CLI commands,
 * frees the message type statistics vector and releases the two global
 * statistics containers. It then clears the topic_all and threadpool
 * pointers and destroys the configuration info.
 *
 * NOTE(review): the embedded source numbering skips several lines in this
 * function (3050-3051, 3053, 3055-3056, 3058), so this listing is presumably
 * missing statements; confirm against the full file before editing.
 */
3042static void stasis_cleanup(void)
3043{
3044#ifdef AST_DEVMODE
 /* Undo the AST_DEVMODE-only statistics setup. */
3045 ast_cli_unregister_multiple(cli_stasis_statistics, ARRAY_LEN(cli_stasis_statistics));
3046 AST_VECTOR_FREE(&message_type_statistics);
3047 ao2_global_obj_release(subscription_statistics);
3048 ao2_global_obj_release(topic_statistics);
3049#endif
 /* Clear the file-level pointers so stale values are not left behind. */
3052 topic_all = NULL;
3054 threadpool = NULL;
3057 aco_info_destroy(&cfg_info);
3059}
3060
3062{
 /*
  * Body of the Stasis initialization routine: sets up the configuration
  * framework, loads the configuration (or falls back to defaults), creates
  * the threadpool, initializes the cache, and in AST_DEVMODE builds creates
  * the statistics containers and registers the statistics CLI commands.
  * Every failure path returns -1; success returns 0.
  *
  * NOTE(review): the function signature line and a number of statement
  * lines are absent from this listing (gaps in the embedded numbering), so
  * several calls below appear truncated; confirm against the full file.
  */
3063 struct stasis_config *cfg;
3064 int cache_init;
3065 struct ast_threadpool_options threadpool_opts = { 0, };
3066#ifdef AST_DEVMODE
3067 struct ao2_container *subscription_stats;
3068 struct ao2_container *topic_stats;
3069#endif
3070
3071 /* Be sure the types are cleaned up after the message bus */
3073
3074 if (aco_info_init(&cfg_info)) {
3075 return -1;
3076 }
3077
3078 aco_option_register_custom(&cfg_info, "decline", ACO_EXACT,
3080 aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
3082 FLDSET(struct stasis_threadpool_conf, initial_size), 0,
3083 INT_MAX);
3084 aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
3086 FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
3087 INT_MAX);
3088 aco_option_register(&cfg_info, "max_size", ACO_EXACT,
3090 FLDSET(struct stasis_threadpool_conf, max_size), 0,
3091 INT_MAX);
3092
 /*
  * If the configuration cannot be processed, build a configuration object
  * populated with the default option values and use that instead.
  */
3093 if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
3094 struct stasis_config *default_cfg = stasis_config_alloc();
3095
3096 if (!default_cfg) {
3097 return -1;
3098 }
3099
3100 if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) {
3101 ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
3102 ao2_ref(default_cfg, -1);
3103
3104 return -1;
3105 }
3106
3107 if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) {
3108 ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
3109 ao2_ref(default_cfg, -1);
3110
3111 return -1;
3112 }
3113
3114 ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n");
3116 cfg = default_cfg;
3117 } else {
3119 if (!cfg) {
3120 ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
3121
3122 return -1;
3123 }
3124 }
3125
 /* Size the threadpool from the configuration, then drop our reference to cfg. */
3126 threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION;
3127 threadpool_opts.initial_size = cfg->threadpool_options->initial_size;
3128 threadpool_opts.auto_increment = 1;
3129 threadpool_opts.max_size = cfg->threadpool_options->max_size;
3130 threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
3131 threadpool = ast_threadpool_create("stasis", NULL, &threadpool_opts);
3132 ao2_ref(cfg, -1);
3133 if (!threadpool) {
 /* NOTE(review): the message says 'stasis-core' but the pool is created as "stasis". */
3134 ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
3135
3136 return -1;
3137 }
3138
3139 cache_init = stasis_cache_init();
3140 if (cache_init != 0) {
3141 return -1;
3142 }
3143
3145 return -1;
3146 }
3148 return -1;
3149 }
3150
3152 topic_proxy_hash_fn, 0, topic_proxy_cmp_fn);
3153 if (!topic_all) {
3154 return -1;
3155 }
3156
3158 return -1;
3159 }
3160
3161#ifdef AST_DEVMODE
3162 /* Statistics information is stored separately so that we don't alter or interrupt the lifetime of the underlying
3163 * topic or subscription.
3164 */
3165 subscription_stats = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, SUBSCRIPTION_STATISTICS_BUCKETS,
3166 subscription_statistics_hash, 0, subscription_statistics_cmp);
3167 if (!subscription_stats) {
3168 return -1;
3169 }
 /* Publish the container in the global holder, then drop the local reference. */
3170 ao2_global_obj_replace_unref(subscription_statistics, subscription_stats);
3171 ao2_cleanup(subscription_stats);
3172
3173 topic_stats = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, TOPIC_STATISTICS_BUCKETS,
3174 topic_statistics_hash, 0, topic_statistics_cmp);
3175 if (!topic_stats) {
3176 return -1;
3177 }
3178 ao2_global_obj_replace_unref(topic_statistics, topic_stats);
3179 ao2_cleanup(topic_stats);
 /* NOTE(review): topic_stats was already checked for NULL above, so this check can never be true. */
3180 if (!topic_stats) {
3181 return -1;
3182 }
3183
3184 AST_VECTOR_INIT(&message_type_statistics, 0);
3185
3186 if (ast_cli_register_multiple(cli_stasis_statistics, ARRAY_LEN(cli_stasis_statistics))) {
3187 return -1;
3188 }
3189#endif
3190
3191 return 0;
3192}
#define var
Definition: ast_expr2f.c:605
Asterisk main include file. File version handling, generic pbx functions.
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
Definition: clicompat.c:19
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
Definition: astmm.h:288
#define ast_free(a)
Definition: astmm.h:180
#define ast_strdup(str)
A wrapper for strdup()
Definition: astmm.h:241
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:267
#define ast_calloc(num, len)
A wrapper for calloc()
Definition: astmm.h:202
#define ast_log
Definition: astobj2.c:42
int ao2_container_dup(struct ao2_container *dest, struct ao2_container *src, enum search_flags flags)
Copy all object references in the src container into the dest container.
int ao2_weakproxy_subscribe(void *weakproxy, ao2_weakproxy_notification_cb cb, void *data, int flags)
Request notification when weakproxy points to NULL.
Definition: astobj2.c:934
#define ao2_iterator_next(iter)
Definition: astobj2.h:1911
#define ao2_link(container, obj)
Add an object to a container.
Definition: astobj2.h:1532
@ CMP_MATCH
Definition: astobj2.h:1027
@ AO2_ALLOC_OPT_LOCK_NOLOCK
Definition: astobj2.h:367
@ AO2_ALLOC_OPT_LOCK_MUTEX
Definition: astobj2.h:363
#define ao2_wrlock(a)
Definition: astobj2.h:719
#define ao2_global_obj_replace_unref(holder, obj)
Replace an ao2 object in the global holder, throwing away any old object.
Definition: astobj2.h:901
void ao2_container_unregister(const char *name)
Unregister a container for CLI stats and integrity check.
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
#define ao2_unlink(container, obj)
Remove an object from a container.
Definition: astobj2.h:1578
@ AO2_ITERATOR_UNLINK
Definition: astobj2.h:1863
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
Definition: astobj2.h:1554
#define ao2_global_obj_ref(holder)
Get a reference to the object stored in the global holder.
Definition: astobj2.h:918
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1736
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
#define ao2_t_weakproxy_alloc(data_size, destructor_fn, tag)
Definition: astobj2.h:553
#define AO2_STRING_FIELD_SORT_FN(stype, field)
Creates a sort function for a structure string field.
Definition: astobj2.h:2064
#define ao2_container_alloc_rbtree(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a red-black tree container.
Definition: astobj2.h:1349
#define ao2_unlock(a)
Definition: astobj2.h:729
#define ao2_weakproxy_find(c, arg, flags, tag)
Perform an ao2_find on a container with ao2_weakproxy objects, returning the real object.
Definition: astobj2.h:1748
#define ao2_lock(a)
Definition: astobj2.h:717
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:404
int ao2_container_register(const char *name, struct ao2_container *self, ao2_prnt_obj_fn *prnt_obj)
Register a container for CLI stats and integrity check.
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
Definition: astobj2.c:476
#define ao2_global_obj_release(holder)
Release the ao2 object held in the global holder.
Definition: astobj2.h:859
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition: astobj2.h:480
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
#define ao2_t_alloc(data_size, destructor_fn, debug_msg)
Definition: astobj2.h:407
@ OBJ_SEARCH_PARTIAL_KEY
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
Definition: astobj2.h:1116
@ OBJ_SEARCH_OBJECT
The arg parameter is an object of the same type.
Definition: astobj2.h:1087
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
Definition: astobj2.h:1063
@ OBJ_NODATA
Definition: astobj2.h:1044
@ OBJ_SEARCH_MASK
Search option field mask.
Definition: astobj2.h:1072
@ OBJ_UNLINK
Definition: astobj2.h:1039
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1101
#define ao2_container_alloc_list(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a list container.
Definition: astobj2.h:1327
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:409
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition: astobj2.h:1303
void() ao2_prnt_fn(void *where, const char *fmt,...)
Print output.
Definition: astobj2.h:1435
#define ao2_t_weakproxy_set_object(weakproxy, obj, flags, tag)
Definition: astobj2.h:582
static PGresult * result
Definition: cel_pgsql.c:84
static struct console_pvt globals
static const char type[]
Definition: chan_ooh323.c:109
struct stasis_topic * ast_channel_topic(struct ast_channel *chan)
A topic which publishes the events for a particular channel.
Standard Command Line Interface.
#define CLI_SHOWUSAGE
Definition: cli.h:45
#define CLI_SUCCESS
Definition: cli.h:44
int ast_cli_unregister_multiple(struct ast_cli_entry *e, int len)
Unregister multiple commands.
Definition: clicompat.c:30
#define AST_CLI_DEFINE(fn, txt,...)
Definition: cli.h:197
int ast_cli_completion_add(char *value)
Add a result to a request for completion options.
Definition: main/cli.c:2761
void ast_cli(int fd, const char *fmt,...)
Definition: clicompat.c:6
@ CLI_INIT
Definition: cli.h:152
@ CLI_GENERATE
Definition: cli.h:153
#define CLI_FAILURE
Definition: cli.h:46
#define ast_cli_register_multiple(e, len)
Register multiple commands.
Definition: cli.h:265
short word
Configuration option-handling.
@ ACO_EXACT
int aco_set_defaults(struct aco_type *type, const char *category, void *obj)
Set all default options of obj.
void aco_info_destroy(struct aco_info *info)
Destroy an initialized aco_info struct.
@ ACO_PROCESS_ERROR
There was an error and no changes were applied.
int aco_info_init(struct aco_info *info)
Initialize an aco_info structure.
#define FLDSET(type,...)
Convert a struct and list of fields to an argument list of field offsets.
#define aco_option_register(info, name, matchtype, types, default_val, opt_type, flags,...)
Register a config option.
#define ACO_FILES(...)
@ OPT_INT_T
Type for default option handler for signed integers.
#define aco_option_register_custom(info, name, matchtype, types, default_val, handler, flags)
Register a config option.
@ ACO_GLOBAL
@ ACO_WHITELIST_EXACT
enum aco_process_status aco_process_config(struct aco_info *info, int reload)
Process a config info via the options registered with an aco_info.
#define ACO_TYPES(...)
A helper macro to ensure that aco_info types always have a sentinel.
static const char name[]
Definition: format_mp3.c:68
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
Definition: func_strings.c:807
struct ast_str * ast_manager_str_from_json_object(struct ast_json *blob, key_exclusion_cb exclusion_cb)
Convert a JSON object into an AMI compatible string.
Definition: manager.c:1981
void ast_multi_object_blob_add(struct ast_multi_object_blob *multi, enum stasis_user_multi_object_snapshot_type type, void *object)
Add an object (snapshot) to the blob.
Definition: stasis.c:2001
stasis_user_multi_object_snapshot_type
Object type code for multi user object snapshots.
Definition: stasis.h:1353
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
struct ast_channel_snapshot * ast_channel_snapshot_create(struct ast_channel *chan)
Generate a snapshot of the channel state. This is an ao2 object, so ao2_cleanup() to deallocate.
void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob)
Publish single channel user event (for app_userevent compatibility)
Definition: stasis.c:2010
struct ast_multi_object_blob * ast_multi_object_blob_create(struct ast_json *blob)
Create a stasis user event multi object blob.
Definition: stasis.c:1975
struct stasis_message_type * ast_multi_user_event_type(void)
Message type for custom user defined events with multi object blobs.
#define STASIS_UMOS_MAX
Number of snapshot types.
Definition: stasis.h:1360
@ STASIS_UMOS_ENDPOINT
Definition: stasis.h:1356
@ STASIS_UMOS_BRIDGE
Definition: stasis.h:1355
@ STASIS_UMOS_CHANNEL
Definition: stasis.h:1354
#define ast_debug(level,...)
Log a DEBUG message.
#define LOG_ERROR
#define LOG_NOTICE
struct ast_json * ast_json_string_create(const char *value)
Construct a JSON string from value.
Definition: json.c:278
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition: json.c:73
struct ast_json * ast_json_object_create(void)
Create a new JSON object.
Definition: json.c:399
struct ast_json * ast_json_timeval(const struct timeval tv, const char *zone)
Construct a timeval as JSON.
Definition: json.c:670
struct ast_json * ast_json_ref(struct ast_json *value)
Increase refcount on value.
Definition: json.c:67
int ast_json_object_set(struct ast_json *object, const char *key, struct ast_json *value)
Set a field in a JSON object.
Definition: json.c:414
const char * ast_json_string_get(const struct ast_json *string)
Get the value of a JSON string.
Definition: json.c:283
struct ast_json * ast_json_object_get(struct ast_json *object, const char *key)
Get a field from a JSON object.
Definition: json.c:407
#define ast_cond_destroy(cond)
Definition: lock.h:202
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:604
#define ast_cond_wait(cond, mutex)
Definition: lock.h:205
#define ast_cond_init(cond, attr)
Definition: lock.h:201
#define ast_mutex_init(pmutex)
Definition: lock.h:186
#define ast_mutex_unlock(a)
Definition: lock.h:190
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition: lock.h:757
pthread_cond_t ast_cond_t
Definition: lock.h:178
#define ast_mutex_destroy(a)
Definition: lock.h:188
#define ast_mutex_lock(a)
Definition: lock.h:189
#define AST_MUTEX_DEFINE_STATIC(mutex)
Definition: lock.h:520
#define ast_cond_signal(cond)
Definition: lock.h:203
struct ast_str * ast_manager_build_bridge_state_string_prefix(const struct ast_bridge_snapshot *snapshot, const char *prefix)
Generate the AMI message body from a bridge snapshot.
struct ast_manager_event_blob * ast_manager_event_blob_create(int event_flags, const char *manager_event, const char *extra_fields_fmt,...)
Construct an ast_manager_event_blob.
Definition: manager.c:10564
#define EVENT_FLAG_USER
Definition: manager.h:81
struct ast_str * ast_manager_build_channel_state_string_prefix(const struct ast_channel_snapshot *snapshot, const char *prefix)
Generate the AMI message body from a channel snapshot.
struct stasis_forward * sub
Definition: res_corosync.c:240
struct ao2_container * container
Definition: res_fax.c:501
static void to_ami(struct ast_sip_subscription *sub, struct ast_str **buf)
#define NULL
Definition: resample.c:96
static struct ast_str * multi_object_blob_to_ami(void *obj)
Definition: stasis.c:2096
int stasis_subscription_is_done(struct stasis_subscription *subscription)
Returns whether subscription has received its final message.
Definition: stasis.c:1118
static char * topic_complete_name(const char *word)
Definition: stasis.c:2391
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *sub)
Cancel a subscription.
Definition: stasis.c:971
static struct ast_cli_entry cli_stasis[]
Definition: stasis.c:2476
#define INITIAL_SUBSCRIBERS_MAX
Definition: stasis.c:301
static struct ast_json * multi_user_event_to_json(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
Definition: stasis.c:2045
static void subscription_change_dtor(void *obj)
Definition: stasis.c:1620
void stasis_topic_pool_delete_topic(struct stasis_topic_pool *pool, const char *topic_name)
Delete a topic from the topic pool.
Definition: stasis.c:1864
static struct ast_manager_event_blob * multi_user_event_to_ami(struct stasis_message *message)
Definition: stasis.c:2155
int stasis_subscription_decline_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are not interested in a message type.
Definition: stasis.c:1053
#define FMT_HEADERS
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:627
static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
Definition: stasis.c:1286
#define FMT_FIELDS
int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription, long low_water, long high_water)
Set the high and low alert water marks of the stasis subscription.
Definition: stasis.c:1011
AO2_STRING_FIELD_HASH_FN(topic_proxy, name)
struct stasis_topic * stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
Find or create a topic in the pool.
Definition: stasis.c:1884
static int sub_cleanup(void *data)
Definition: stasis.c:964
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
Definition: stasis.c:1548
static void forward_dtor(void *obj)
Definition: stasis.c:1538
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition: stasis.c:617
size_t stasis_topic_subscribers(const struct stasis_topic *topic)
Return the number of subscribers of a topic.
Definition: stasis.c:643
static void topic_pool_dtor(void *obj)
Definition: stasis.c:1744
void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
Publish a message to a topic's subscribers, synchronizing on the specified subscriber.
Definition: stasis.c:1516
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
Definition: stasis.c:1023
static void * stasis_config_alloc(void)
Definition: stasis.c:2253
static struct topic_pool_entry * topic_pool_entry_alloc(const char *topic_name)
Definition: stasis.c:1724
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
Definition: stasis.c:1077
static void topic_dtor(void *obj)
Definition: stasis.c:433
#define TOPIC_POOL_BUCKETS
Definition: stasis.c:304
static struct stasis_subscription_change * subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
Definition: stasis.c:1627
static void stasis_cleanup(void)
Cleanup function for graceful shutdowns.
Definition: stasis.c:3042
struct stasis_topic * stasis_topic_get(const char *name)
Get a topic of the given name.
Definition: stasis.c:622
int stasis_init(void)
Initialize the Stasis subsystem.
Definition: stasis.c:3061
static void proxy_dtor(void *weakproxy, void *container)
Definition: stasis.c:412
void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Stasis subscription callback function that does nothing.
Definition: stasis.c:809
struct stasis_subscription * __stasis_subscribe_pool(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription whose callbacks occur on a thread pool.
Definition: stasis.c:953
static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
Definition: stasis.c:1784
static void stasis_config_destructor(void *obj)
Definition: stasis.c:2245
void stasis_subscription_join(struct stasis_subscription *subscription)
Block until the last message is processed on a subscription.
Definition: stasis.c:1105
static AO2_GLOBAL_OBJ_STATIC(globals)
A global object container that will contain the stasis_config that gets swapped out on reloads.
int stasis_message_type_declined(const char *name)
Check whether a message type is declined.
Definition: stasis.c:2283
static struct ast_threadpool * threadpool
Definition: stasis.c:307
static int topic_pool_entry_hash(const void *obj, const int flags)
Definition: stasis.c:1763
CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,.files=ACO_FILES(&stasis_conf),)
Register information about the configs being processed by this module.
void stasis_log_bad_type_access(const char *name)
Definition: stasis.c:1940
static struct aco_type threadpool_option
Definition: stasis.c:2202
static void multi_object_blob_dtor(void *obj)
Definition: stasis.c:1959
static int link_topic_proxy(struct stasis_topic *topic, const char *name, const char *detail)
Definition: stasis.c:500
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Add a subscriber to a topic.
Definition: stasis.c:1201
static void subscription_dtor(void *obj)
Definition: stasis.c:714
struct stasis_topic * stasis_topic_create_with_detail(const char *name, const char *detail)
Create a new topic with given detail.
Definition: stasis.c:568
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
Definition: stasis.c:1174
const char * stasis_topic_detail(const struct stasis_topic *topic)
Return the detail of a topic.
Definition: stasis.c:635
struct stasis_subscription * stasis_unsubscribe_and_join(struct stasis_subscription *subscription)
Cancel a subscription, blocking until the last message is processed.
Definition: stasis.c:1134
static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition: stasis.c:1676
int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name)
Check if a topic exists in a pool.
Definition: stasis.c:1927
static void publish_msg(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sync_sub)
Definition: stasis.c:1430
struct aco_file stasis_conf
Definition: stasis.c:2223
static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition: stasis.c:1648
static void stasis_declined_config_destructor(void *obj)
Definition: stasis.c:2238
static struct aco_type * threadpool_options[]
Definition: stasis.c:2210
static unsigned int dispatch_message(struct stasis_subscription *sub, struct stasis_message *message, int synchronous)
Definition: stasis.c:1312
void stasis_subscription_accept_formatters(struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters)
Indicate to a subscription that we are interested in messages with one or more formatters.
Definition: stasis.c:1093
const char * stasis_subscription_uniqueid(const struct stasis_subscription *sub)
Get the unique ID for the subscription.
Definition: stasis.c:1169
struct stasis_topic_pool * stasis_topic_pool_create(struct stasis_topic *pooled_topic)
Create a topic pool that routes messages from dynamically generated topics to the given topic.
Definition: stasis.c:1833
static void subscription_invoke(struct stasis_subscription *sub, struct stasis_message *message)
Invoke the subscription's callback.
Definition: stasis.c:754
AO2_STRING_FIELD_CASE_SORT_FN(topic_proxy, name)
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition: stasis.c:1578
struct stasis_subscription * __stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription.
Definition: stasis.c:942
static void topic_pool_entry_dtor(void *obj)
Definition: stasis.c:1715
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
Definition: stasis.c:1511
static int dispatch_exec_async(struct ast_taskprocessor_local *local)
Definition: stasis.c:1259
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
Returns whether a subscription is currently subscribed.
Definition: stasis.c:1150
struct stasis_subscription * internal_stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func)
Create a subscription.
Definition: stasis.c:856
static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
Definition: stasis.c:2304
AO2_STRING_FIELD_CMP_FN(topic_proxy, name)
static int userevent_exclusion_cb(const char *key)
Definition: stasis.c:2147
struct aco_type * declined_options[]
Definition: stasis.c:2221
#define topic_lock_both(topic1, topic2)
Lock two topics.
Definition: stasis.c:425
struct ao2_container * topic_all
Definition: stasis.c:395
static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition: stasis.c:1229
static char * stasis_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
Definition: stasis.c:2334
#define TOPIC_ALL_BUCKETS
Definition: stasis.c:317
STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type)
static struct aco_type declined_option
An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type.
Definition: stasis.c:2213
static char * stasis_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
Definition: stasis.c:2417
Stasis Message Bus API. See Stasis Message Bus API for detailed documentation.
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
void(* stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Callback function type for Stasis subscriptions.
Definition: stasis.h:611
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
Definition: stasis.h:1515
stasis_subscription_message_filter
Stasis subscription message filters.
Definition: stasis.h:294
@ STASIS_SUBSCRIPTION_FILTER_SELECTIVE
Definition: stasis.h:297
@ STASIS_SUBSCRIPTION_FILTER_FORCED_NONE
Definition: stasis.h:296
@ STASIS_SUBSCRIPTION_FILTER_NONE
Definition: stasis.h:295
stasis_subscription_message_formatters
Stasis subscription formatter filters.
Definition: stasis.h:308
@ STASIS_SUBSCRIPTION_FORMATTER_NONE
Definition: stasis.h:309
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
Definition: stasis.h:1493
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
int stasis_cache_init(void)
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
int stasis_message_type_id(const struct stasis_message_type *type)
Gets the id of a given message type.
enum stasis_subscription_message_formatters stasis_message_type_available_formatters(const struct stasis_message_type *message_type)
Get a bitmap of available formatters for a message type.
const struct timeval * stasis_message_timestamp(const struct stasis_message *msg)
Get the time when a message was created.
struct ast_json * ast_bridge_snapshot_to_json(const struct ast_bridge_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from an ast_bridge_snapshot.
struct ast_json * ast_channel_snapshot_to_json(const struct ast_channel_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from an ast_channel_snapshot.
Endpoint abstractions.
struct ast_json * ast_endpoint_snapshot_to_json(const struct ast_endpoint_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from an ast_endpoint_snapshot.
Internal Stasis APIs.
static int message_type_id
int ast_str_append(struct ast_str **buf, ssize_t max_len, const char *fmt,...)
Append to a thread local dynamic string.
Definition: strings.h:1139
char * ast_str_buffer(const struct ast_str *buf)
Returns the string buffer within the ast_str buf.
Definition: strings.h:761
#define S_OR(a, b)
returns the equivalent of logic or for strings: first one if not empty, otherwise second one.
Definition: strings.h:80
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition: strings.h:65
#define ast_str_container_alloc(buckets)
Allocates a hash container for bare strings.
Definition: strings.h:1365
#define ast_str_create(init_len)
Create a malloc'ed dynamic length string.
Definition: strings.h:659
static force_inline int attribute_pure ast_str_case_hash(const char *str)
Compute a hash value on a case-insensitive string.
Definition: strings.h:1303
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
Definition: strings.h:425
void ast_str_container_remove(struct ao2_container *str_container, const char *remove)
Removes a string from a string container allocated by ast_str_container_alloc.
Definition: strings.c:221
int ast_str_container_add(struct ao2_container *str_container, const char *add)
Adds a string to a string container allocated by ast_str_container_alloc.
Definition: strings.c:205
The representation of a single configuration file to be processed.
const char * filename
Type information about a category-level configurable object.
enum aco_type_t type
Generic container type.
When we need to walk through a container, we use an ao2_iterator to keep track of the current position.
Definition: astobj2.h:1821
Structure representing a snapshot of channel state.
Main Channel structure associated with a channel.
descriptor for a cli entry.
Definition: cli.h:171
int args
This gets set in ast_cli_register()
Definition: cli.h:185
char * command
Definition: cli.h:186
const char * usage
Definition: cli.h:177
Abstract JSON element (object, array, string, int, ...).
Struct containing info for an AMI event to send out.
Definition: manager.h:502
A multi object blob data structure to carry user event stasis messages.
Definition: stasis.c:1950
struct ast_multi_object_blob::@395 snapshots[STASIS_UMOS_MAX]
struct ast_json * blob
Definition: stasis.c:1951
Structure for mutex and tracking information.
Definition: lock.h:135
Support for dynamic strings.
Definition: strings.h:623
Local data parameter.
An ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
int idle_timeout
Time limit in seconds for idle threads.
Definition: threadpool.h:79
int max_size
Maximum number of threads a pool may have.
Definition: threadpool.h:110
int auto_increment
Number of threads to increment pool by.
Definition: threadpool.h:90
int initial_size
Number of threads the pool will start with.
Definition: threadpool.h:100
An opaque threadpool structure.
Definition: threadpool.c:36
Structure for variables, used for configurations and for channel variables.
Definition: search.h:40
struct stasis_declined_config * declined_message_types
Definition: stasis.c:2199
struct stasis_threadpool_conf * threadpool_options
Definition: stasis.c:2197
A structure to hold global configuration-related options.
Definition: stasis.c:2180
struct ao2_container * declined
Definition: stasis.c:2182
Forwarding information.
Definition: stasis.c:1531
struct stasis_topic * from_topic
Definition: stasis.c:1533
struct stasis_topic * to_topic
Definition: stasis.c:1535
Structure containing callbacks for Stasis message sanitization.
Definition: stasis.h:200
Holds details about changes to subscriptions for the specified topic.
Definition: stasis.h:890
struct stasis_topic * topic
Definition: stasis.h:891
int final_message_rxed
Definition: stasis.c:696
struct stasis_topic * topic
Definition: stasis.c:684
ast_cond_t join_cond
Definition: stasis.c:693
stasis_subscription_cb callback
Definition: stasis.c:688
struct stasis_subscription::@394 accepted_message_types
struct ast_taskprocessor * mailbox
Definition: stasis.c:686
enum stasis_subscription_message_filter filter
Definition: stasis.c:706
int final_message_processed
Definition: stasis.c:699
enum stasis_subscription_message_formatters accepted_formatters
Definition: stasis.c:704
Threadpool configuration options.
Definition: stasis.c:2186
struct ao2_container * pool_container
Definition: stasis.c:1740
struct stasis_topic * pool_topic
Definition: stasis.c:1741
struct stasis_topic::@393 upstream_topics
char * name
Definition: stasis.c:386
int subscriber_id
Definition: stasis.c:383
char * detail
Definition: stasis.c:389
struct timeval * creationtime
Definition: stasis.c:392
struct stasis_topic::@392 subscribers
ast_cond_t cond
Definition: stasis.c:1276
void * task_data
Definition: stasis.c:1278
ast_mutex_t lock
Definition: stasis.c:1275
Definition: stasis.c:1709
struct stasis_topic * topic
Definition: stasis.c:1711
struct stasis_forward * forward
Definition: stasis.c:1710
char name[0]
Definition: stasis.c:1712
char buf[0]
Definition: stasis.c:405
struct timeval creationtime
Definition: stasis.c:403
char * name
Definition: stasis.c:400
char * detail
Definition: stasis.c:401
An API for managing task processing threads that can be shared across modules.
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor, decrementing its reference count.
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
Sets the local data associated with a taskprocessor.
@ TPS_REF_DEFAULT
return a reference to a taskprocessor, create one if it does not exist
Definition: taskprocessor.h:76
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int(*task_exe)(struct ast_taskprocessor_local *local), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format,...)
Build a taskprocessor name with a sequence number on the end.
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.
#define AST_TASKPROCESSOR_MAX_NAME
Suggested maximum taskprocessor name length (less null terminator).
Definition: taskprocessor.h:61
static struct test_val a
void ast_threadpool_shutdown(struct ast_threadpool *pool)
Shut down a threadpool and destroy it.
Definition: threadpool.c:966
struct ast_threadpool * ast_threadpool_create(const char *name, struct ast_threadpool_listener *listener, const struct ast_threadpool_options *options)
Create a new threadpool.
Definition: threadpool.c:916
#define AST_THREADPOOL_OPTIONS_VERSION
Definition: threadpool.h:71
struct ast_taskprocessor * ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
Serialized execution of tasks within an ast_threadpool.
Definition: threadpool.c:1428
void ast_format_duration_hh_mm_ss(int duration, char *buf, size_t length)
Formats a duration into HH:MM:SS.
Definition: utils.c:2297
int64_t ast_tvdiff_ms(struct timeval end, struct timeval start)
Computes the difference (in milliseconds) between two struct timeval instances.
Definition: time.h:107
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:159
FILE * out
Definition: utils/frame.c:33
static void statistics(void)
Definition: utils/frame.c:287
Utility functions.
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:941
#define ast_assert(a)
Definition: utils.h:739
#define ARRAY_LEN(a)
Definition: utils.h:666
Universally unique identifier support.
Vector container support.
#define AST_VECTOR_REPLACE(vec, idx, elem)
Replace an element at a specific position in a vector, growing the vector if needed.
Definition: vector.h:284
#define AST_VECTOR_ELEM_CLEANUP_NOOP(elem)
Vector element cleanup that does nothing.
Definition: vector.h:571
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:609
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition: vector.h:174
#define AST_VECTOR_REMOVE_ELEM_UNORDERED(vec, elem, cleanup)
Remove an element from a vector.
Definition: vector.h:583
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256
#define AST_VECTOR(name, type)
Define a vector structure.
Definition: vector.h:44
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:680
#define AST_VECTOR_GET_ADDR(vec, idx)
Get an address of element in a vector.
Definition: vector.h:668