Asterisk - The Open Source Telephony Project GIT-master-754dea3
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Macros Modules Pages
stasis.c
Go to the documentation of this file.
1/*
2 * Asterisk -- An open source telephony toolkit.
3 *
4 * Copyright (C) 2013, Digium, Inc.
5 *
6 * David M. Lee, II <dlee@digium.com>
7 *
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
13 *
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
17 */
18
19/*! \file
20 *
21 * \brief Stasis Message Bus API.
22 *
23 * \author David M. Lee, II <dlee@digium.com>
24 */
25
26/*** MODULEINFO
27 <support_level>core</support_level>
28 ***/
29
30#include "asterisk.h"
31
32#include "asterisk/astobj2.h"
34#include "asterisk/stasis.h"
36#include "asterisk/threadpool.h"
37#include "asterisk/utils.h"
38#include "asterisk/uuid.h"
39#include "asterisk/vector.h"
44#include "asterisk/cli.h"
45
46/*** DOCUMENTATION
47 <managerEvent language="en_US" name="UserEvent">
48 <managerEventInstance class="EVENT_FLAG_USER">
49 <since>
50 <version>12.3.0</version>
51 </since>
52 <synopsis>A user defined event raised from the dialplan.</synopsis>
53 <syntax>
54 <channel_snapshot/>
55 <parameter name="UserEvent">
56 <para>The event name, as specified in the dialplan.</para>
57 </parameter>
58 </syntax>
59 <description>
60 <para>Event may contain additional arbitrary parameters in addition to optional bridge and endpoint snapshots. Multiple snapshots of the same type are prefixed with a numeric value.</para>
61 </description>
62 <see-also>
63 <ref type="application">UserEvent</ref>
64 <ref type="managerEvent">UserEvent</ref>
65 </see-also>
66 </managerEventInstance>
67 </managerEvent>
68 <configInfo name="stasis" language="en_US">
69 <configFile name="stasis.conf">
70 <configObject name="threadpool">
71 <since>
72 <version>12.8.0</version>
73 <version>13.1.0</version>
74 </since>
75 <synopsis>Settings that configure the threadpool Stasis uses to deliver some messages.</synopsis>
76 <configOption name="initial_size" default="5">
77 <since>
78 <version>12.8.0</version>
79 <version>13.1.0</version>
80 </since>
81 <synopsis>Initial number of threads in the message bus threadpool.</synopsis>
82 </configOption>
83 <configOption name="idle_timeout_sec" default="20">
84 <since>
85 <version>12.8.0</version>
86 <version>13.1.0</version>
87 </since>
88 <synopsis>Number of seconds before an idle thread is disposed of.</synopsis>
89 </configOption>
90 <configOption name="max_size" default="50">
91 <since>
92 <version>12.8.0</version>
93 <version>13.1.0</version>
94 </since>
95 <synopsis>Maximum number of threads in the threadpool.</synopsis>
96 </configOption>
97 </configObject>
98 <configObject name="declined_message_types">
99 <since>
100 <version>13.0.0</version>
101 </since>
102 <synopsis>Stasis message types for which to decline creation.</synopsis>
103 <configOption name="decline">
104 <since>
105 <version>12.8.0</version>
106 <version>13.1.0</version>
107 </since>
108 <synopsis>The message type to decline.</synopsis>
109 <description>
110 <para>This configuration option defines the name of the Stasis
111 message type that Asterisk is forbidden from creating and can be
112 specified as many times as necessary to achieve the desired result.</para>
113 <enumlist>
114 <enum name="stasis_app_recording_snapshot_type" />
115 <enum name="stasis_app_playback_snapshot_type" />
116 <enum name="stasis_test_message_type" />
117 <enum name="confbridge_start_type" />
118 <enum name="confbridge_end_type" />
119 <enum name="confbridge_join_type" />
120 <enum name="confbridge_leave_type" />
121 <enum name="confbridge_start_record_type" />
122 <enum name="confbridge_stop_record_type" />
123 <enum name="confbridge_mute_type" />
124 <enum name="confbridge_unmute_type" />
125 <enum name="confbridge_talking_type" />
126 <enum name="cel_generic_type" />
127 <enum name="ast_bridge_snapshot_type" />
128 <enum name="ast_bridge_merge_message_type" />
129 <enum name="ast_channel_entered_bridge_type" />
130 <enum name="ast_channel_left_bridge_type" />
131 <enum name="ast_blind_transfer_type" />
132 <enum name="ast_attended_transfer_type" />
133 <enum name="ast_endpoint_snapshot_type" />
134 <enum name="ast_endpoint_state_type" />
135 <enum name="ast_device_state_message_type" />
136 <enum name="ast_test_suite_message_type" />
137 <enum name="ast_mwi_state_type" />
138 <enum name="ast_mwi_vm_app_type" />
139 <enum name="ast_format_register_type" />
140 <enum name="ast_format_unregister_type" />
141 <enum name="ast_manager_get_generic_type" />
142 <enum name="ast_parked_call_type" />
143 <enum name="ast_channel_snapshot_type" />
144 <enum name="ast_channel_dial_type" />
145 <enum name="ast_channel_varset_type" />
146 <enum name="ast_channel_hangup_request_type" />
147 <enum name="ast_channel_dtmf_begin_type" />
148 <enum name="ast_channel_dtmf_end_type" />
149 <enum name="ast_channel_flash_type" />
150 <enum name="ast_channel_wink_type" />
151 <enum name="ast_channel_hold_type" />
152 <enum name="ast_channel_unhold_type" />
153 <enum name="ast_channel_chanspy_start_type" />
154 <enum name="ast_channel_chanspy_stop_type" />
155 <enum name="ast_channel_fax_type" />
156 <enum name="ast_channel_hangup_handler_type" />
157 <enum name="ast_channel_moh_start_type" />
158 <enum name="ast_channel_moh_stop_type" />
159 <enum name="ast_channel_mixmonitor_start_type" />
160 <enum name="ast_channel_mixmonitor_stop_type" />
161 <enum name="ast_channel_mixmonitor_mute_type" />
162 <enum name="ast_channel_agent_login_type" />
163 <enum name="ast_channel_agent_logoff_type" />
164 <enum name="ast_channel_talking_start" />
165 <enum name="ast_channel_talking_stop" />
166 <enum name="ast_channel_tone_detect" />
167 <enum name="ast_security_event_type" />
168 <enum name="ast_named_acl_change_type" />
169 <enum name="ast_local_bridge_type" />
170 <enum name="ast_local_optimization_begin_type" />
171 <enum name="ast_local_optimization_end_type" />
172 <enum name="stasis_subscription_change_type" />
173 <enum name="ast_multi_user_event_type" />
174 <enum name="stasis_cache_clear_type" />
175 <enum name="stasis_cache_update_type" />
176 <enum name="ast_network_change_type" />
177 <enum name="ast_system_registry_type" />
178 <enum name="ast_cc_available_type" />
179 <enum name="ast_cc_offertimerstart_type" />
180 <enum name="ast_cc_requested_type" />
181 <enum name="ast_cc_requestacknowledged_type" />
182 <enum name="ast_cc_callerstopmonitoring_type" />
183 <enum name="ast_cc_callerstartmonitoring_type" />
184 <enum name="ast_cc_callerrecalling_type" />
185 <enum name="ast_cc_recallcomplete_type" />
186 <enum name="ast_cc_failure_type" />
187 <enum name="ast_cc_monitorfailed_type" />
188 <enum name="ast_presence_state_message_type" />
189 <enum name="ast_rtp_rtcp_sent_type" />
190 <enum name="ast_rtp_rtcp_received_type" />
191 <enum name="ast_call_pickup_type" />
192 <enum name="aoc_s_type" />
193 <enum name="aoc_d_type" />
194 <enum name="aoc_e_type" />
195 <enum name="dahdichannel_type" />
196 <enum name="mcid_type" />
197 <enum name="session_timeout_type" />
198 <enum name="cdr_read_message_type" />
199 <enum name="cdr_write_message_type" />
200 <enum name="cdr_prop_write_message_type" />
201 <enum name="corosync_ping_message_type" />
202 <enum name="agi_exec_start_type" />
203 <enum name="agi_exec_end_type" />
204 <enum name="agi_async_start_type" />
205 <enum name="agi_async_exec_type" />
206 <enum name="agi_async_end_type" />
207 <enum name="queue_caller_join_type" />
208 <enum name="queue_caller_leave_type" />
209 <enum name="queue_caller_abandon_type" />
210 <enum name="queue_member_status_type" />
211 <enum name="queue_member_added_type" />
212 <enum name="queue_member_removed_type" />
213 <enum name="queue_member_pause_type" />
214 <enum name="queue_member_penalty_type" />
215 <enum name="queue_member_ringinuse_type" />
216 <enum name="queue_agent_called_type" />
217 <enum name="queue_agent_connect_type" />
218 <enum name="queue_agent_complete_type" />
219 <enum name="queue_agent_dump_type" />
220 <enum name="queue_agent_ringnoanswer_type" />
221 <enum name="meetme_join_type" />
222 <enum name="meetme_leave_type" />
223 <enum name="meetme_end_type" />
224 <enum name="meetme_mute_type" />
225 <enum name="meetme_talking_type" />
226 <enum name="meetme_talk_request_type" />
227 <enum name="appcdr_message_type" />
228 <enum name="forkcdr_message_type" />
229 <enum name="cdr_sync_message_type" />
230 </enumlist>
231 </description>
232 </configOption>
233 </configObject>
234 </configFile>
235 </configInfo>
236***/
237
238/*!
239 * \page stasis-impl Stasis Implementation Notes
240 *
241 * \par Reference counting
242 *
243 * Stasis introduces a number of objects, which are tightly related to one
244 * another. Because we rely on ref-counting for memory management, understanding
245 * these relationships is important to understanding this code.
246 *
247 * \code{.txt}
248 *
249 * stasis_topic <----> stasis_subscription
250 * ^ ^
251 * \ /
252 * \ /
253 * dispatch
254 * |
255 * |
256 * v
257 * stasis_message
258 * |
259 * |
260 * v
261 * stasis_message_type
262 *
263 * \endcode
264 *
265 * The most troubling thing in this chart is the cyclic reference between
266 * stasis_topic and stasis_subscription. This is both unfortunate, and
267 * necessary. Topics need the subscription in order to dispatch messages;
268 * subscriptions need the topics to unsubscribe and check subscription status.
269 *
270 * The cycle is broken by stasis_unsubscribe(). The unsubscribe will remove the
271 * topic's reference to a subscription. When the subscription is destroyed, it
272 * will remove its reference to the topic.
273 *
274 * This means that until a subscription has be explicitly unsubscribed, it will
275 * not be destroyed. Neither will a topic be destroyed while it has subscribers.
276 * The destructors of both have assertions regarding this to catch ref-counting
277 * problems where a subscription or topic has had an extra ao2_cleanup().
278 *
279 * The \ref dispatch_exec_sync object is a transient object, which is posted to
280 * a subscription's taskprocessor to send a message to the subscriber. They have
281 * short life cycles, allocated on one thread, destroyed on another.
282 *
283 * During shutdown, or the deletion of a domain object, there are a flurry of
284 * ao2_cleanup()s on subscriptions and topics, as the final in-flight messages
285 * are processed. Any one of these cleanups could be the one to actually destroy
286 * a given object, so care must be taken to ensure that an object isn't
287 * referenced after an ao2_cleanup(). This includes the implicit ao2_unlock()
288 * that might happen when a RAII_VAR() goes out of scope.
289 *
290 * \par Typical life cycles
291 *
292 * \li stasis_topic - There are several topics which live for the duration of
293 * the Asterisk process (ast_channel_topic_all(), etc.) but most of these
294 * are actually fed by shorter-lived topics whose lifetime is associated
295 * with some domain object (like ast_channel_topic() for a given
296 * ast_channel).
297 *
298 * \li stasis_subscription - Subscriptions have a similar mix of lifetimes as
299 * topics, for similar reasons.
300 *
301 * \li dispatch - Very short lived; just long enough to post a message to a
302 * subscriber.
303 *
304 * \li stasis_message - Short to intermediate lifetimes, but that is mostly
305 * irrelevant. Messages are strictly data and have no behavior associated
306 * with them, so it doesn't really matter if/when they are destroyed. By
307 * design, a component could hold a ref to a message forever without any
308 * ill consequences (aside from consuming more memory).
309 *
310 * \li stasis_message_type - Long life cycles, typically only destroyed on
311 * module unloading or _clean_ process exit.
312 *
313 * \par Subscriber shutdown sequencing
314 *
315 * Subscribers are sensitive to shutdown sequencing, specifically in how the
316 * reference message types. This is fully detailed in the documentation at
317 * https://docs.asterisk.org/Development/Roadmap/Asterisk-12-Projects/Asterisk-12-API-Improvements/Stasis-Message-Bus/Using-the-Stasis-Message-Bus/Stasis-Subscriber-Shutdown-Problem/.
318 *
319 * In short, the lifetime of the \a data (and \a callback, if in a module) must
320 * be held until the stasis_subscription_final_message() has been received.
321 * Depending on the structure of the subscriber code, this can be handled by
322 * using stasis_subscription_final_message() to free resources on the final
323 * message, or using stasis_subscription_join()/stasis_unsubscribe_and_join() to
324 * block until the unsubscribe has completed.
325 */
326
327/*! Initial size of the subscribers list. */
328#define INITIAL_SUBSCRIBERS_MAX 4
329
330/*! The number of buckets to use for topic pools */
331#define TOPIC_POOL_BUCKETS 57
332
333/*! Thread pool for topics that don't want a dedicated taskprocessor */
335
337
338#if defined(LOW_MEMORY)
339
340#define TOPIC_ALL_BUCKETS 257
341
342#else
343
344#define TOPIC_ALL_BUCKETS 997
345
346#endif
347
348#ifdef AST_DEVMODE
349
350/*! The number of buckets to use for topic statistics */
351#define TOPIC_STATISTICS_BUCKETS 57
352
353/*! The number of buckets to use for subscription statistics */
354#define SUBSCRIPTION_STATISTICS_BUCKETS 57
355
356/*! Global container which stores statistics for topics */
357static AO2_GLOBAL_OBJ_STATIC(topic_statistics);
358
359/*! Global container which stores statistics for subscriptions */
360static AO2_GLOBAL_OBJ_STATIC(subscription_statistics);
361
362/*! \internal */
363struct stasis_message_type_statistics {
364 /*! \brief The number of messages of this published */
365 int published;
366 /*! \brief The number of messages of this that did not reach a subscriber */
367 int unused;
368 /*! \brief The stasis message type */
369 struct stasis_message_type *message_type;
370};
371
372/*! Lock to protect the message types vector */
373AST_MUTEX_DEFINE_STATIC(message_type_statistics_lock);
374
375/*! Vector containing message type information */
376static AST_VECTOR(, struct stasis_message_type_statistics) message_type_statistics;
377
378/*! \internal */
379struct stasis_topic_statistics {
380 /*! \brief Highest time spent dispatching messages to subscribers */
381 long highest_time_dispatched;
382 /*! \brief Lowest time spent dispatching messages to subscribers */
383 long lowest_time_dispatched;
384 /*! \brief The number of messages that were not dispatched to any subscriber */
385 int messages_not_dispatched;
386 /*! \brief The number of messages that were dispatched to at least 1 subscriber */
387 int messages_dispatched;
388 /*! \brief The ids of the subscribers to this topic */
389 struct ao2_container *subscribers;
390 /*! \brief Pointer to the topic (NOT refcounted, and must NOT be accessed) */
391 struct stasis_topic *topic;
392 /*! \brief Name of the topic */
393 char name[0];
394};
395#endif
396
397/*! \internal */
399 /*! Variable length array of the subscribers */
401
402 /*! Topics forwarding into this topic */
404
405#ifdef AST_DEVMODE
406 struct stasis_topic_statistics *statistics;
407#endif
408
409 /*! Unique incrementing integer for subscriber ids */
411
412 /*! Name of the topic */
413 char *name;
414
415 /*! Detail of the topic */
416 char *detail;
417
418 /*! Creation time */
419 struct timeval *creationtime;
420};
421
423
426
427 char *name;
428 char *detail;
429
430 struct timeval creationtime;
431
432 char buf[0];
433};
434
438
439static void proxy_dtor(void *weakproxy, void *container)
440{
441 ao2_unlink(container, weakproxy);
443}
444
445/* Forward declarations for the tightly-coupled subscription object */
446static int topic_add_subscription(struct stasis_topic *topic,
447 struct stasis_subscription *sub);
448
449static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
450
451/*! \brief Lock two topics. */
452#define topic_lock_both(topic1, topic2) \
453 do { \
454 ao2_lock(topic1); \
455 while (ao2_trylock(topic2)) { \
456 AO2_DEADLOCK_AVOIDANCE(topic1); \
457 } \
458 } while (0)
459
460static void topic_dtor(void *obj)
461{
462 struct stasis_topic *topic = obj;
463#ifdef AST_DEVMODE
464 struct ao2_container *topic_stats;
465#endif
466
467 ast_debug(2, "Destroying topic. name: %s, detail: %s\n",
468 topic->name, topic->detail);
469
470 /* Subscribers hold a reference to topics, so they should all be
471 * unsubscribed before we get here. */
473
476 ast_debug(1, "Topic '%s': %p destroyed\n", topic->name, topic);
477
478#ifdef AST_DEVMODE
479 if (topic->statistics) {
480 topic_stats = ao2_global_obj_ref(topic_statistics);
481 if (topic_stats) {
482 ao2_unlink(topic_stats, topic->statistics);
483 ao2_ref(topic_stats, -1);
484 }
485 ao2_ref(topic->statistics, -1);
486 }
487#endif
488}
489
490#ifdef AST_DEVMODE
491static void topic_statistics_destroy(void *obj)
492{
493 struct stasis_topic_statistics *statistics = obj;
494
495 ao2_cleanup(statistics->subscribers);
496}
497
498static struct stasis_topic_statistics *stasis_topic_statistics_create(struct stasis_topic *topic)
499{
500 struct stasis_topic_statistics *statistics;
501 RAII_VAR(struct ao2_container *, topic_stats, ao2_global_obj_ref(topic_statistics), ao2_cleanup);
502
503 if (!topic_stats) {
504 return NULL;
505 }
506
507 statistics = ao2_alloc(sizeof(*statistics) + strlen(topic->name) + 1, topic_statistics_destroy);
508 if (!statistics) {
509 return NULL;
510 }
511
512 statistics->subscribers = ast_str_container_alloc(1);
513 if (!statistics->subscribers) {
514 ao2_ref(statistics, -1);
515 return NULL;
516 }
517
518 /* This is strictly used for the pointer address when showing the topic */
519 statistics->topic = topic;
520 strcpy(statistics->name, topic->name); /* SAFE */
521 ao2_link(topic_stats, statistics);
522
523 return statistics;
524}
525#endif
526
527static int link_topic_proxy(struct stasis_topic *topic, const char *name, const char *detail)
528{
529 struct topic_proxy *proxy;
530 struct stasis_topic* topic_tmp;
531 size_t detail_len;
532
533 if (!topic || !name || !strlen(name) || !detail) {
534 return -1;
535 }
536
538
539 topic_tmp = stasis_topic_get(name);
540 if (topic_tmp) {
541 ast_log(LOG_ERROR, "The same topic is already exist. name: %s\n", name);
542 ao2_ref(topic_tmp, -1);
544
545 return -1;
546 }
547
548 detail_len = strlen(detail) + 1;
549
550 proxy = ao2_t_weakproxy_alloc(
551 sizeof(*proxy) + strlen(name) + 1 + detail_len, NULL, name);
552 if (!proxy) {
554
555 return -1;
556 }
557
558 /* set the proxy info */
559 proxy->name = proxy->buf;
560 proxy->detail = proxy->name + strlen(name) + 1;
561
562 strcpy(proxy->name, name); /* SAFE */
563 ast_copy_string(proxy->detail, detail, detail_len); /* SAFE */
564 proxy->creationtime = ast_tvnow();
565
566 /* We have exclusive access to proxy, no need for locking here. */
567 if (ao2_t_weakproxy_set_object(proxy, topic, OBJ_NOLOCK, "weakproxy link")) {
568 ao2_cleanup(proxy);
570
571 return -1;
572 }
573
575 ao2_cleanup(proxy);
578
579 return -1;
580 }
581
582 /* setting the topic point to the proxy */
583 topic->name = proxy->name;
584 topic->detail = proxy->detail;
585 topic->creationtime = &(proxy->creationtime);
586
588 ao2_ref(proxy, -1);
589
591
592 return 0;
593}
594
596 const char *name, const char* detail
597 )
598{
599 struct stasis_topic *topic;
600 int res = 0;
601
602 if (!name|| !strlen(name) || !detail) {
603 return NULL;
604 }
605 ast_debug(2, "Creating topic. name: %s, detail: %s\n", name, detail);
606
607 topic = stasis_topic_get(name);
608 if (topic) {
609 ast_debug(2, "Topic is already exist. name: %s, detail: %s\n",
610 name, detail);
611 return topic;
612 }
613
614 topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
615 if (!topic) {
616 return NULL;
617 }
618
620 res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
621 if (res) {
622 ao2_ref(topic, -1);
623 return NULL;
624 }
625
626 /* link to the proxy */
627 if (link_topic_proxy(topic, name, detail)) {
628 ao2_ref(topic, -1);
629 return NULL;
630 }
631
632#ifdef AST_DEVMODE
633 topic->statistics = stasis_topic_statistics_create(topic);
634 if (!topic->statistics) {
635 ao2_ref(topic, -1);
636 return NULL;
637 }
638#endif
639 ast_debug(1, "Topic '%s': %p created\n", topic->name, topic);
640
641 return topic;
642}
643
645{
647}
648
650{
652}
653
654const char *stasis_topic_name(const struct stasis_topic *topic)
655{
656 if (!topic) {
657 return NULL;
658 }
659 return topic->name;
660}
661
662const char *stasis_topic_detail(const struct stasis_topic *topic)
663{
664 if (!topic) {
665 return NULL;
666 }
667 return topic->detail;
668}
669
670size_t stasis_topic_subscribers(const struct stasis_topic *topic)
671{
672 return AST_VECTOR_SIZE(&topic->subscribers);
673}
674
675#ifdef AST_DEVMODE
676struct stasis_subscription_statistics {
677 /*! \brief The filename where the subscription originates */
678 const char *file;
679 /*! \brief The function where the subscription originates */
680 const char *func;
681 /*! \brief Names of the topics we are subscribed to */
682 struct ao2_container *topics;
683 /*! \brief The message type that currently took the longest to process */
684 struct stasis_message_type *highest_time_message_type;
685 /*! \brief Highest time spent invoking a message */
686 long highest_time_invoked;
687 /*! \brief Lowest time spent invoking a message */
688 long lowest_time_invoked;
689 /*! \brief The number of messages that were filtered out */
690 int messages_dropped;
691 /*! \brief The number of messages that passed filtering */
692 int messages_passed;
693 /*! \brief Using a mailbox to queue messages */
694 int uses_mailbox;
695 /*! \brief Using stasis threadpool for handling messages */
696 int uses_threadpool;
697 /*! \brief The line number where the subscription originates */
698 int lineno;
699 /*! \brief Pointer to the subscription (NOT refcounted, and must NOT be accessed) */
700 struct stasis_subscription *sub;
701 /*! \brief Unique ID of the subscription */
702 char uniqueid[0];
703};
704#endif
705
706/*! \internal */
708 /*! Unique ID for this subscription */
709 char *uniqueid;
710 /*! Topic subscribed to. */
712 /*! Mailbox for processing incoming messages. */
714 /*! Callback function for incoming message processing. */
716 /*! Data pointer to be handed to the callback. */
717 void *data;
718
719 /*! Condition for joining with subscription. */
721 /*! Flag set when final message for sub has been received.
722 * Be sure join_lock is held before reading/setting. */
724 /*! Flag set when final message for sub has been processed.
725 * Be sure join_lock is held before reading/setting. */
727
728 /*! The message types this subscription is accepting */
730 /*! The message formatters this subscription is accepting */
732 /*! The message filter currently in use */
734
735#ifdef AST_DEVMODE
736 /*! Statistics information */
737 struct stasis_subscription_statistics *statistics;
738#endif
739};
740
741static void subscription_dtor(void *obj)
742{
743 struct stasis_subscription *sub = obj;
744#ifdef AST_DEVMODE
745 struct ao2_container *subscription_stats;
746#endif
747
748 /* Subscriptions need to be manually unsubscribed before destruction
749 * b/c there's a cyclic reference between topics and subscriptions */
751 /* If there are any messages in flight to this subscription; that would
752 * be bad. */
754
755 ast_free(sub->uniqueid);
756 ao2_cleanup(sub->topic);
757 sub->topic = NULL;
759 sub->mailbox = NULL;
760 ast_cond_destroy(&sub->join_cond);
761
762 AST_VECTOR_FREE(&sub->accepted_message_types);
763
764#ifdef AST_DEVMODE
765 if (sub->statistics) {
766 subscription_stats = ao2_global_obj_ref(subscription_statistics);
767 if (subscription_stats) {
768 ao2_unlink(subscription_stats, sub->statistics);
769 ao2_ref(subscription_stats, -1);
770 }
771 ao2_ref(sub->statistics, -1);
772 }
773#endif
774}
775
776/*!
777 * \brief Invoke the subscription's callback.
778 * \param sub Subscription to invoke.
779 * \param message Message to send.
780 */
782 struct stasis_message *message)
783{
784 unsigned int final = stasis_subscription_final_message(sub, message);
786#ifdef AST_DEVMODE
787 struct timeval start;
788 long elapsed;
789
790 start = ast_tvnow();
791#endif
792
793 /* Notify that the final message has been received */
794 if (final) {
795 ao2_lock(sub);
796 sub->final_message_rxed = 1;
797 ast_cond_signal(&sub->join_cond);
799 }
800
801 /*
802 * If filtering is turned on and this is a 'final' message, we only invoke the callback
803 * if the subscriber accepts subscription_change message types.
804 */
805 if (!final || sub->filter != STASIS_SUBSCRIPTION_FILTER_SELECTIVE ||
806 (message_type_id < AST_VECTOR_SIZE(&sub->accepted_message_types) && AST_VECTOR_GET(&sub->accepted_message_types, message_type_id))) {
807 /* Since sub is mostly immutable, no need to lock sub */
808 sub->callback(sub->data, sub, message);
809 }
810
811 /* Notify that the final message has been processed */
812 if (final) {
813 ao2_lock(sub);
814 sub->final_message_processed = 1;
815 ast_cond_signal(&sub->join_cond);
817 }
818
819#ifdef AST_DEVMODE
820 elapsed = ast_tvdiff_ms(ast_tvnow(), start);
821 if (elapsed > sub->statistics->highest_time_invoked) {
822 sub->statistics->highest_time_invoked = elapsed;
823 ao2_lock(sub->statistics);
824 sub->statistics->highest_time_message_type = stasis_message_type(message);
825 ao2_unlock(sub->statistics);
826 }
827 if (elapsed < sub->statistics->lowest_time_invoked) {
828 sub->statistics->lowest_time_invoked = elapsed;
829 }
830#endif
831}
832
833static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
834static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
835
837{
838}
839
840#ifdef AST_DEVMODE
841static void subscription_statistics_destroy(void *obj)
842{
843 struct stasis_subscription_statistics *statistics = obj;
844
845 ao2_cleanup(statistics->topics);
846}
847
848static struct stasis_subscription_statistics *stasis_subscription_statistics_create(struct stasis_subscription *sub,
849 int needs_mailbox, int use_thread_pool, const char *file, int lineno,
850 const char *func)
851{
852 struct stasis_subscription_statistics *statistics;
853 RAII_VAR(struct ao2_container *, subscription_stats, ao2_global_obj_ref(subscription_statistics), ao2_cleanup);
854
855 if (!subscription_stats) {
856 return NULL;
857 }
858
859 statistics = ao2_alloc(sizeof(*statistics) + strlen(sub->uniqueid) + 1, subscription_statistics_destroy);
860 if (!statistics) {
861 return NULL;
862 }
863
865 if (!statistics->topics) {
866 ao2_ref(statistics, -1);
867 return NULL;
868 }
869
870 statistics->file = file;
871 statistics->lineno = lineno;
872 statistics->func = func;
873 statistics->uses_mailbox = needs_mailbox;
874 statistics->uses_threadpool = use_thread_pool;
875 strcpy(statistics->uniqueid, sub->uniqueid); /* SAFE */
876 statistics->sub = sub;
877 ao2_link(subscription_stats, statistics);
878
879 return statistics;
880}
881#endif
882
884 struct stasis_topic *topic,
886 void *data,
887 int needs_mailbox,
888 int use_thread_pool,
889 const char *file,
890 int lineno,
891 const char *func)
892{
893 struct stasis_subscription *sub;
894 int ret;
895
896 if (!topic) {
897 return NULL;
898 }
899
900 /* The ao2 lock is used for join_cond. */
902 if (!sub) {
903 return NULL;
904 }
905
906#ifdef AST_DEVMODE
908 sub->statistics = stasis_subscription_statistics_create(sub, needs_mailbox, use_thread_pool, file, lineno, func);
909 if (ret < 0 || !sub->statistics) {
910 ao2_ref(sub, -1);
911 return NULL;
912 }
913#else
915 if (ret < 0) {
916 ao2_ref(sub, -1);
917 return NULL;
918 }
919#endif
920
921 if (needs_mailbox) {
922 char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
923
924 /* Create name with seq number appended. */
925 ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "stasis/%c:%s",
926 use_thread_pool ? 'p' : 'm',
928
929 /*
930 * With a small number of subscribers, a thread-per-sub is
931 * acceptable. For a large number of subscribers, a thread
932 * pool should be used.
933 */
934 if (use_thread_pool) {
935 sub->mailbox = ast_threadpool_serializer(tps_name, threadpool);
936 } else {
937 sub->mailbox = ast_taskprocessor_get(tps_name, TPS_REF_DEFAULT);
938 }
939 if (!sub->mailbox) {
940 ao2_ref(sub, -1);
941
942 return NULL;
943 }
945 /* Taskprocessor has a reference */
946 ao2_ref(sub, +1);
947 }
948
949 ao2_ref(topic, +1);
950 sub->topic = topic;
951 sub->callback = callback;
952 sub->data = data;
953 ast_cond_init(&sub->join_cond, NULL);
955 AST_VECTOR_INIT(&sub->accepted_message_types, 0);
956 sub->accepted_formatters = STASIS_SUBSCRIPTION_FORMATTER_NONE;
957
958 if (topic_add_subscription(topic, sub) != 0) {
959 ao2_ref(sub, -1);
960 ao2_ref(topic, -1);
961
962 return NULL;
963 }
965
966 return sub;
967}
968
970 struct stasis_topic *topic,
972 void *data,
973 const char *file,
974 int lineno,
975 const char *func)
976{
977 return internal_stasis_subscribe(topic, callback, data, 1, 0, file, lineno, func);
978}
979
981 struct stasis_topic *topic,
983 void *data,
984 const char *file,
985 int lineno,
986 const char *func)
987{
988 return internal_stasis_subscribe(topic, callback, data, 1, 1, file, lineno, func);
989}
990
991static int sub_cleanup(void *data)
992{
993 struct stasis_subscription *sub = data;
995 return 0;
996}
997
999{
1000 /* The subscription may be the last ref to this topic. Hold
1001 * the topic ref open until after the unlock. */
1002 struct stasis_topic *topic;
1003
1004 if (!sub) {
1005 return NULL;
1006 }
1007
1008 topic = ao2_bump(sub->topic);
1009
1010 /* We have to remove the subscription first, to ensure the unsubscribe
1011 * is the final message */
1012 if (topic_remove_subscription(sub->topic, sub) != 0) {
1014 "Internal error: subscription has invalid topic\n");
1015 ao2_cleanup(topic);
1016
1017 return NULL;
1018 }
1019
1020 /* Now let everyone know about the unsubscribe */
1022
1023 /* When all that's done, remove the ref the mailbox has on the sub */
1024 if (sub->mailbox) {
1025 if (ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub)) {
1026 /* Nothing we can do here, the conditional is just to keep
1027 * the compiler happy that we're not ignoring the result. */
1028 }
1029 }
1030
1031 /* Unsubscribing unrefs the subscription */
1033 ao2_cleanup(topic);
1034
1035 return NULL;
1036}
1037
1039 long low_water, long high_water)
1040{
1041 int res = -1;
1042
1043 if (subscription) {
1044 res = ast_taskprocessor_alert_set_levels(subscription->mailbox,
1045 low_water, high_water);
1046 }
1047 return res;
1048}
1049
1051 const struct stasis_message_type *type)
1052{
1053 if (!subscription) {
1054 return -1;
1055 }
1056
1057 ast_assert(type != NULL);
1059
1061 /* Filtering is unreliable as this message type is not yet initialized
1062 * so force all messages through.
1063 */
1065 return 0;
1066 }
1067
1068 ao2_lock(subscription->topic);
1070 /* We do this for the same reason as above. The subscription can still operate, so allow
1071 * it to do so by forcing all messages through.
1072 */
1074 }
1075 ao2_unlock(subscription->topic);
1076
1077 return 0;
1078}
1079
1081 const struct stasis_message_type *type)
1082{
1083 if (!subscription) {
1084 return -1;
1085 }
1086
1087 ast_assert(type != NULL);
1089
1091 return 0;
1092 }
1093
1094 ao2_lock(subscription->topic);
1096 /* The memory is already allocated so this can't fail */
1098 }
1099 ao2_unlock(subscription->topic);
1100
1101 return 0;
1102}
1103
1106{
1107 if (!subscription) {
1108 return -1;
1109 }
1110
1111 ao2_lock(subscription->topic);
1112 if (subscription->filter != STASIS_SUBSCRIPTION_FILTER_FORCED_NONE) {
1113 subscription->filter = filter;
1114 }
1115 ao2_unlock(subscription->topic);
1116
1117 return 0;
1118}
1119
1122{
1123 ast_assert(subscription != NULL);
1124
1125 ao2_lock(subscription->topic);
1126 subscription->accepted_formatters = formatters;
1127 ao2_unlock(subscription->topic);
1128
1129 return;
1130}
1131
1133{
1134 if (subscription) {
1135 ao2_lock(subscription);
1136 /* Wait until the processed flag has been set */
1137 while (!subscription->final_message_processed) {
1138 ast_cond_wait(&subscription->join_cond,
1139 ao2_object_get_lockaddr(subscription));
1140 }
1141 ao2_unlock(subscription);
1142 }
1143}
1144
1146{
1147 if (subscription) {
1148 int ret;
1149
1150 ao2_lock(subscription);
1151 ret = subscription->final_message_rxed;
1152 ao2_unlock(subscription);
1153
1154 return ret;
1155 }
1156
1157 /* Null subscription is about as done as you can get */
1158 return 1;
1159}
1160
1162 struct stasis_subscription *subscription)
1163{
1164 if (!subscription) {
1165 return NULL;
1166 }
1167
1168 /* Bump refcount to hold it past the unsubscribe */
1169 ao2_ref(subscription, +1);
1170 stasis_unsubscribe(subscription);
1171 stasis_subscription_join(subscription);
1172 /* Now decrement the refcount back */
1173 ao2_cleanup(subscription);
1174 return NULL;
1175}
1176
1178{
1179 if (sub) {
1180 size_t i;
1181 struct stasis_topic *topic = sub->topic;
1182
1183 ao2_lock(topic);
1184 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
1185 if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
1186 ao2_unlock(topic);
1187 return 1;
1188 }
1189 }
1190 ao2_unlock(topic);
1191 }
1192
1193 return 0;
1194}
1195
1197{
1198 return sub->uniqueid;
1199}
1200
1202{
1203 struct stasis_subscription_change *change;
1204
1206 return 0;
1207 }
1208
1209 change = stasis_message_data(msg);
1210 if (strcmp("Unsubscribe", change->description)) {
1211 return 0;
1212 }
1213
1214 if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
1215 return 0;
1216 }
1217
1218 return 1;
1219}
1220
1221/*!
1222 * \brief Add a subscriber to a topic.
1223 * \param topic Topic
1224 * \param sub Subscriber
1225 * \return 0 on success
1226 * \return Non-zero on error
1227 */
1229{
1230 size_t idx;
1231
1232 ao2_lock(topic);
1233 /* The reference from the topic to the subscription is shared with
1234 * the owner of the subscription, which will explicitly unsubscribe
1235 * to release it.
1236 *
1237 * If we bumped the refcount here, the owner would have to unsubscribe
1238 * and cleanup, which is a bit awkward. */
1240
1241 for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
1244 }
1245
1246#ifdef AST_DEVMODE
1248 ast_str_container_add(sub->statistics->topics, stasis_topic_name(topic));
1249#endif
1250
1252
1253 return 0;
1254}
1255
1257{
1258 size_t idx;
1259 int res;
1260
1261 ao2_lock(topic);
1262 for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
1265 }
1268
1269#ifdef AST_DEVMODE
1270 if (!res) {
1272 ast_str_container_remove(sub->statistics->topics, stasis_topic_name(topic));
1273 }
1274#endif
1275
1277
1278 return res;
1279}
1280
1281/*!
1282 * \internal \brief Dispatch a message to a subscriber asynchronously
1283 * \param local \ref ast_taskprocessor_local object
1284 * \return 0
1285 */
1287{
1288 struct stasis_subscription *sub = local->local_data;
1289 struct stasis_message *message = local->data;
1290
1293
1294 return 0;
1295}
1296
1297/*!
1298 * \internal \brief Data passed to \ref dispatch_exec_sync to synchronize
1299 * a published message to a subscriber
1300 */
1306};
1307
1308/*!
1309 * \internal \brief Dispatch a message to a subscriber synchronously
1310 * \param local \ref ast_taskprocessor_local object
1311 * \return 0
1312 */
1314{
1315 struct stasis_subscription *sub = local->local_data;
1316 struct sync_task_data *std = local->data;
1317 struct stasis_message *message = std->task_data;
1318
1321
1322 ast_mutex_lock(&std->lock);
1323 std->complete = 1;
1324 ast_cond_signal(&std->cond);
1325 ast_mutex_unlock(&std->lock);
1326
1327 return 0;
1328}
1329
1330/*!
1331 * \internal \brief Dispatch a message to a subscriber
1332 * \param sub The subscriber to dispatch to
1333 * \param message The message to send
1334 * \param synchronous If non-zero, synchronize on the subscriber receiving
1335 * the message
1336 * \retval 0 if message was not dispatched
1337 * \retval 1 if message was dispatched
1338 */
1339static unsigned int dispatch_message(struct stasis_subscription *sub,
1340 struct stasis_message *message,
1341 int synchronous)
1342{
1344
1345 /*
1346 * The 'do while' gives us an easy way to skip remaining logic once
1347 * we determine the message should be accepted.
1348 * The code looks more verbose than it needs to be but it optimizes
1349 * down very nicely. It's just easier to understand and debug this way.
1350 */
1351 do {
1352 struct stasis_message_type *message_type = stasis_message_type(message);
1353 int type_id = stasis_message_type_id(message_type);
1354 int type_filter_specified = 0;
1355 int formatter_filter_specified = 0;
1356 int type_filter_passed = 0;
1357 int formatter_filter_passed = 0;
1358
1359 /* We always accept final messages so only run the filter logic if not final */
1360 if (is_final) {
1361 break;
1362 }
1363
1364 type_filter_specified = sub->filter & STASIS_SUBSCRIPTION_FILTER_SELECTIVE;
1365 formatter_filter_specified = sub->accepted_formatters != STASIS_SUBSCRIPTION_FORMATTER_NONE;
1366
1367 /* Accept if no filters of either type were specified */
1368 if (!type_filter_specified && !formatter_filter_specified) {
1369 break;
1370 }
1371
1372 type_filter_passed = type_filter_specified
1373 && type_id < AST_VECTOR_SIZE(&sub->accepted_message_types)
1374 && AST_VECTOR_GET(&sub->accepted_message_types, type_id);
1375
1376 /*
1377 * Since the type and formatter filters are OR'd, we can skip
1378 * the formatter check if the type check passes.
1379 */
1380 if (type_filter_passed) {
1381 break;
1382 }
1383
1384 formatter_filter_passed = formatter_filter_specified
1385 && (sub->accepted_formatters & stasis_message_type_available_formatters(message_type));
1386
1387 if (formatter_filter_passed) {
1388 break;
1389 }
1390
1391#ifdef AST_DEVMODE
1392 ast_atomic_fetchadd_int(&sub->statistics->messages_dropped, +1);
1393#endif
1394
1395 return 0;
1396
1397 } while (0);
1398
1399#ifdef AST_DEVMODE
1400 ast_atomic_fetchadd_int(&sub->statistics->messages_passed, +1);
1401#endif
1402
1403 if (!sub->mailbox) {
1404 /* Dispatch directly */
1406 return 1;
1407 }
1408
1409 /* Bump the message for the taskprocessor push. This will get de-ref'd
1410 * by the task processor callback.
1411 */
1413 if (!synchronous) {
1415 /* Push failed; ugh. */
1416 ast_log(LOG_ERROR, "Dropping async dispatch\n");
1418 return 0;
1419 }
1420 } else {
1421 struct sync_task_data std;
1422
1423 ast_mutex_init(&std.lock);
1424 ast_cond_init(&std.cond, NULL);
1425 std.complete = 0;
1426 std.task_data = message;
1427
1429 /* Push failed; ugh. */
1430 ast_log(LOG_ERROR, "Dropping sync dispatch\n");
1432 ast_mutex_destroy(&std.lock);
1433 ast_cond_destroy(&std.cond);
1434 return 0;
1435 }
1436
1437 ast_mutex_lock(&std.lock);
1438 while (!std.complete) {
1439 ast_cond_wait(&std.cond, &std.lock);
1440 }
1441 ast_mutex_unlock(&std.lock);
1442
1443 ast_mutex_destroy(&std.lock);
1444 ast_cond_destroy(&std.cond);
1445 }
1446
1447 return 1;
1448}
1449
1450/*!
1451 * \internal \brief Publish a message to a topic's subscribers
1452 * \brief topic The topic to publish to
1453 * \brief message The message to publish
1454 * \brief sync_sub An optional subscriber of the topic to publish synchronously
1455 * to
1456 */
1457static void publish_msg(struct stasis_topic *topic,
1458 struct stasis_message *message, struct stasis_subscription *sync_sub)
1459{
1460 size_t i;
1461#ifdef AST_DEVMODE
1462 unsigned int dispatched = 0;
1464 struct stasis_message_type_statistics *statistics;
1465 struct timeval start;
1466 long elapsed;
1467#endif
1468
1469 ast_assert(topic != NULL);
1471
1472#ifdef AST_DEVMODE
1473 ast_mutex_lock(&message_type_statistics_lock);
1474 if (message_type_id >= AST_VECTOR_SIZE(&message_type_statistics)) {
1475 struct stasis_message_type_statistics new_statistics = {
1476 .published = 0,
1477 };
1478 if (AST_VECTOR_REPLACE(&message_type_statistics, message_type_id, new_statistics)) {
1479 ast_mutex_unlock(&message_type_statistics_lock);
1480 return;
1481 }
1482 }
1483 statistics = AST_VECTOR_GET_ADDR(&message_type_statistics, message_type_id);
1484 statistics->message_type = stasis_message_type(message);
1485 ast_mutex_unlock(&message_type_statistics_lock);
1486
1487 ast_atomic_fetchadd_int(&statistics->published, +1);
1488#endif
1489
1490 /* If there are no subscribers don't bother */
1491 if (!stasis_topic_subscribers(topic)) {
1492#ifdef AST_DEVMODE
1493 ast_atomic_fetchadd_int(&statistics->unused, +1);
1494 ast_atomic_fetchadd_int(&topic->statistics->messages_not_dispatched, +1);
1495#endif
1496 return;
1497 }
1498
1499 /*
1500 * The topic may be unref'ed by the subscription invocation.
1501 * Make sure we hold onto a reference while dispatching.
1502 */
1503 ao2_ref(topic, +1);
1504#ifdef AST_DEVMODE
1505 start = ast_tvnow();
1506#endif
1507 ao2_lock(topic);
1508 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
1510
1511 ast_assert(sub != NULL);
1512#ifdef AST_DEVMODE
1513 dispatched +=
1514#endif
1515 dispatch_message(sub, message, (sub == sync_sub));
1516 }
1518
1519#ifdef AST_DEVMODE
1520 elapsed = ast_tvdiff_ms(ast_tvnow(), start);
1521 if (elapsed > topic->statistics->highest_time_dispatched) {
1522 topic->statistics->highest_time_dispatched = elapsed;
1523 }
1524 if (elapsed < topic->statistics->lowest_time_dispatched) {
1525 topic->statistics->lowest_time_dispatched = elapsed;
1526 }
1527 if (dispatched) {
1528 ast_atomic_fetchadd_int(&topic->statistics->messages_dispatched, +1);
1529 } else {
1530 ast_atomic_fetchadd_int(&statistics->unused, +1);
1531 ast_atomic_fetchadd_int(&topic->statistics->messages_not_dispatched, +1);
1532 }
1533#endif
1534
1535 ao2_ref(topic, -1);
1536}
1537
1539{
1541}
1542
1544{
1545 ast_assert(sub != NULL);
1546
1547 publish_msg(sub->topic, message, sub);
1548}
1549
1550/*!
1551 * \brief Forwarding information
1552 *
1553 * Any message posted to \a from_topic is forwarded to \a to_topic.
1554 *
1555 * In cases where both the \a from_topic and \a to_topic need to be locked,
1556 * always lock the \a to_topic first, then the \a from_topic. Lest you deadlock.
1557 */
1559 /*! Originating topic */
1561 /*! Destination topic */
1563};
1564
1565static void forward_dtor(void *obj)
1566{
1567 struct stasis_forward *forward = obj;
1568
1569 ao2_cleanup(forward->from_topic);
1570 forward->from_topic = NULL;
1571 ao2_cleanup(forward->to_topic);
1572 forward->to_topic = NULL;
1573}
1574
1576{
1577 int idx;
1578 struct stasis_topic *from;
1579 struct stasis_topic *to;
1580
1581 if (!forward) {
1582 return NULL;
1583 }
1584
1585 from = forward->from_topic;
1586 to = forward->to_topic;
1587
1588 if (from && to) {
1589 topic_lock_both(to, from);
1592
1593 for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
1595 }
1596 ao2_unlock(from);
1597 ao2_unlock(to);
1598 }
1599
1600 ao2_cleanup(forward);
1601
1602 return NULL;
1603}
1604
1606 struct stasis_topic *to_topic)
1607{
1608 int res;
1609 size_t idx;
1610 struct stasis_forward *forward;
1611
1612 if (!from_topic || !to_topic) {
1613 return NULL;
1614 }
1615
1616 forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1617 if (!forward) {
1618 return NULL;
1619 }
1620
1621 /* Forwards to ourselves are implicit. */
1622 if (to_topic == from_topic) {
1623 return forward;
1624 }
1625
1626 forward->from_topic = ao2_bump(from_topic);
1627 forward->to_topic = ao2_bump(to_topic);
1628
1631 if (res != 0) {
1634 ao2_ref(forward, -1);
1635 return NULL;
1636 }
1637
1638 for (idx = 0; idx < AST_VECTOR_SIZE(&to_topic->subscribers); ++idx) {
1640 }
1643
1644 return forward;
1645}
1646
1647static void subscription_change_dtor(void *obj)
1648{
1649 struct stasis_subscription_change *change = obj;
1650
1651 ao2_cleanup(change->topic);
1652}
1653
1655{
1656 size_t description_len = strlen(description) + 1;
1657 size_t uniqueid_len = strlen(uniqueid) + 1;
1658 struct stasis_subscription_change *change;
1659
1660 change = ao2_alloc_options(sizeof(*change) + description_len + uniqueid_len,
1662 if (!change) {
1663 return NULL;
1664 }
1665
1666 strcpy(change->description, description); /* SAFE */
1667 change->uniqueid = change->description + description_len;
1668 ast_copy_string(change->uniqueid, uniqueid, uniqueid_len); /* SAFE */
1669 ao2_ref(topic, +1);
1670 change->topic = topic;
1671
1672 return change;
1673}
1674
1676{
1677 struct stasis_subscription_change *change;
1678 struct stasis_message *msg;
1679
1680 /* This assumes that we have already unsubscribed */
1682
1684 return;
1685 }
1686
1687 change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
1688 if (!change) {
1689 return;
1690 }
1691
1693 if (!msg) {
1694 ao2_cleanup(change);
1695 return;
1696 }
1697
1698 stasis_publish(topic, msg);
1699 ao2_cleanup(msg);
1700 ao2_cleanup(change);
1701}
1702
1704 struct stasis_subscription *sub)
1705{
1706 struct stasis_subscription_change *change;
1707 struct stasis_message *msg;
1708
1709 /* This assumes that we have already unsubscribed */
1711
1713 return;
1714 }
1715
1716 change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
1717 if (!change) {
1718 return;
1719 }
1720
1722 if (!msg) {
1723 ao2_cleanup(change);
1724 return;
1725 }
1726
1727 stasis_publish(topic, msg);
1728
1729 /* Now we have to dispatch to the subscription itself */
1730 dispatch_message(sub, msg, 0);
1731
1732 ao2_cleanup(msg);
1733 ao2_cleanup(change);
1734}
1735
1739 char name[0];
1740};
1741
1742static void topic_pool_entry_dtor(void *obj)
1743{
1744 struct topic_pool_entry *entry = obj;
1745
1746 entry->forward = stasis_forward_cancel(entry->forward);
1747 ao2_cleanup(entry->topic);
1748 entry->topic = NULL;
1749}
1750
1751static struct topic_pool_entry *topic_pool_entry_alloc(const char *topic_name)
1752{
1754
1755 topic_pool_entry = ao2_alloc_options(sizeof(*topic_pool_entry) + strlen(topic_name) + 1,
1757 if (!topic_pool_entry) {
1758 return NULL;
1759 }
1760
1761 strcpy(topic_pool_entry->name, topic_name); /* Safe */
1762
1763 return topic_pool_entry;
1764}
1765
1769};
1770
1771static void topic_pool_dtor(void *obj)
1772{
1773 struct stasis_topic_pool *pool = obj;
1774
1775#ifdef AO2_DEBUG
1776 {
1777 char *container_name =
1778 ast_alloca(strlen(stasis_topic_name(pool->pool_topic)) + strlen("-pool") + 1);
1779 sprintf(container_name, "%s-pool", stasis_topic_name(pool->pool_topic));
1780 ao2_container_unregister(container_name);
1781 }
1782#endif
1783
1785 pool->pool_container = NULL;
1786 ao2_cleanup(pool->pool_topic);
1787 pool->pool_topic = NULL;
1788}
1789
1790static int topic_pool_entry_hash(const void *obj, const int flags)
1791{
1792 const struct topic_pool_entry *object;
1793 const char *key;
1794
1795 switch (flags & OBJ_SEARCH_MASK) {
1796 case OBJ_SEARCH_KEY:
1797 key = obj;
1798 break;
1799 case OBJ_SEARCH_OBJECT:
1800 object = obj;
1801 key = object->name;
1802 break;
1803 default:
1804 /* Hash can only work on something with a full key. */
1805 ast_assert(0);
1806 return 0;
1807 }
1808 return ast_str_case_hash(key);
1809}
1810
1811static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
1812{
1813 const struct topic_pool_entry *object_left = obj;
1814 const struct topic_pool_entry *object_right = arg;
1815 const char *right_key = arg;
1816 int cmp;
1817
1818 switch (flags & OBJ_SEARCH_MASK) {
1819 case OBJ_SEARCH_OBJECT:
1820 right_key = object_right->name;
1821 /* Fall through */
1822 case OBJ_SEARCH_KEY:
1823 cmp = strcasecmp(object_left->name, right_key);
1824 break;
1826 /* Not supported by container */
1827 ast_assert(0);
1828 cmp = -1;
1829 break;
1830 default:
1831 /*
1832 * What arg points to is specific to this traversal callback
1833 * and has no special meaning to astobj2.
1834 */
1835 cmp = 0;
1836 break;
1837 }
1838 if (cmp) {
1839 return 0;
1840 }
1841 /*
1842 * At this point the traversal callback is identical to a sorted
1843 * container.
1844 */
1845 return CMP_MATCH;
1846}
1847
1848#ifdef AO2_DEBUG
1849static void topic_pool_prnt_obj(void *v_obj, void *where, ao2_prnt_fn *prnt)
1850{
1851 struct topic_pool_entry *entry = v_obj;
1852
1853 if (!entry) {
1854 return;
1855 }
1856 prnt(where, "%s", stasis_topic_name(entry->topic));
1857}
1858#endif
1859
1861{
1862 struct stasis_topic_pool *pool;
1863
1865 if (!pool) {
1866 return NULL;
1867 }
1868
1871 if (!pool->pool_container) {
1872 ao2_cleanup(pool);
1873 return NULL;
1874 }
1875
1876#ifdef AO2_DEBUG
1877 {
1878 char *container_name =
1879 ast_alloca(strlen(stasis_topic_name(pooled_topic)) + strlen("-pool") + 1);
1880 sprintf(container_name, "%s-pool", stasis_topic_name(pooled_topic));
1881 ao2_container_register(container_name, pool->pool_container, topic_pool_prnt_obj);
1882 }
1883#endif
1884
1885 ao2_ref(pooled_topic, +1);
1886 pool->pool_topic = pooled_topic;
1887
1888 return pool;
1889}
1890
1891void stasis_topic_pool_delete_topic(struct stasis_topic_pool *pool, const char *topic_name)
1892{
1893 /*
1894 * The topic_name passed in could be a fully-qualified name like <pool_topic_name>/<topic_name>
1895 * or just <topic_name>. If it's fully qualified, we need to skip past <pool_topic_name>
1896 * name and search only on <topic_name>.
1897 */
1898 const char *pool_topic_name = stasis_topic_name(pool->pool_topic);
1899 int pool_topic_name_len = strlen(pool_topic_name);
1900 const char *search_topic_name;
1901
1902 if (strncmp(pool_topic_name, topic_name, pool_topic_name_len) == 0) {
1903 search_topic_name = topic_name + pool_topic_name_len + 1;
1904 } else {
1905 search_topic_name = topic_name;
1906 }
1907
1908 ao2_find(pool->pool_container, search_topic_name, OBJ_SEARCH_KEY | OBJ_NODATA | OBJ_UNLINK);
1909}
1910
1911struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
1912{
1914 SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
1915 char *new_topic_name;
1916 int ret;
1917
1919 if (topic_pool_entry) {
1920 return topic_pool_entry->topic;
1921 }
1922
1924 if (!topic_pool_entry) {
1925 return NULL;
1926 }
1927
1928 /* To provide further detail and to ensure that the topic is unique within the scope of the
1929 * system we prefix it with the pooling topic name, which should itself already be unique.
1930 */
1931 ret = ast_asprintf(&new_topic_name, "%s/%s", stasis_topic_name(pool->pool_topic), topic_name);
1932 if (ret < 0) {
1933 return NULL;
1934 }
1935
1936 topic_pool_entry->topic = stasis_topic_create(new_topic_name);
1937 ast_free(new_topic_name);
1938 if (!topic_pool_entry->topic) {
1939 return NULL;
1940 }
1941
1943 if (!topic_pool_entry->forward) {
1944 return NULL;
1945 }
1946
1948 return NULL;
1949 }
1950
1951 return topic_pool_entry->topic;
1952}
1953
1954int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name)
1955{
1957
1959 if (!topic_pool_entry) {
1960 return 0;
1961 }
1962
1964 return 1;
1965}
1966
1968{
1969#ifdef AST_DEVMODE
1971 ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
1972 }
1973#endif
1974}
1975
1976/*! \brief A multi object blob data structure to carry user event stasis messages */
1978 struct ast_json *blob; /*< A blob of JSON data */
1979 AST_VECTOR(, void *) snapshots[STASIS_UMOS_MAX]; /*< Vector of snapshots for each type */
1980};
1981
1982/*!
1983 * \internal
1984 * \brief Destructor for \ref ast_multi_object_blob objects
1985 */
1986static void multi_object_blob_dtor(void *obj)
1987{
1988 struct ast_multi_object_blob *multi = obj;
1989 int type;
1990 int i;
1991
1992 for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1993 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1995 }
1996 AST_VECTOR_FREE(&multi->snapshots[type]);
1997 }
1998 ast_json_unref(multi->blob);
1999}
2000
2001/*! \brief Create a stasis user event multi object blob */
2003{
2004 int type;
2005 struct ast_multi_object_blob *multi;
2006
2007 ast_assert(blob != NULL);
2008
2009 multi = ao2_alloc(sizeof(*multi), multi_object_blob_dtor);
2010 if (!multi) {
2011 return NULL;
2012 }
2013
2014 for (type = 0; type < STASIS_UMOS_MAX; ++type) {
2015 if (AST_VECTOR_INIT(&multi->snapshots[type], 0)) {
2016 ao2_ref(multi, -1);
2017
2018 return NULL;
2019 }
2020 }
2021
2022 multi->blob = ast_json_ref(blob);
2023
2024 return multi;
2025}
2026
2027/*! \brief Add an object (snapshot) to the blob */
2030{
2031 if (!multi || !object || AST_VECTOR_APPEND(&multi->snapshots[type], object)) {
2032 ao2_cleanup(object);
2033 }
2034}
2035
2036/*! \brief Publish single channel user event (for app_userevent compatibility) */
2038 struct stasis_message_type *type, struct ast_json *blob)
2039{
2040 struct stasis_message *message;
2041 struct ast_channel_snapshot *channel_snapshot;
2042 struct ast_multi_object_blob *multi;
2043
2044 if (!type) {
2045 return;
2046 }
2047
2049 if (!multi) {
2050 return;
2051 }
2052
2053 channel_snapshot = ast_channel_snapshot_create(chan);
2054 if (!channel_snapshot) {
2055 ao2_ref(multi, -1);
2056 return;
2057 }
2058
2059 /* this call steals the channel_snapshot reference */
2060 ast_multi_object_blob_add(multi, STASIS_UMOS_CHANNEL, channel_snapshot);
2061
2063 ao2_ref(multi, -1);
2064 if (message) {
2065 /* app_userevent still publishes to channel */
2067 ao2_ref(message, -1);
2068 }
2069}
2070
2071/*! \internal \brief convert multi object blob to ari json */
2073 struct stasis_message *message,
2074 const struct stasis_message_sanitizer *sanitize)
2075{
2076 struct ast_json *out;
2078 struct ast_json *blob = multi->blob;
2079 const struct timeval *tv = stasis_message_timestamp(message);
2081 int i;
2082
2084 if (!out) {
2085 return NULL;
2086 }
2087
2088 ast_json_object_set(out, "type", ast_json_string_create("ChannelUserevent"));
2089 ast_json_object_set(out, "timestamp", ast_json_timeval(*tv, NULL));
2090 ast_json_object_set(out, "eventname", ast_json_ref(ast_json_object_get(blob, "eventname")));
2091 ast_json_object_set(out, "userevent", ast_json_ref(blob));
2092
2093 for (type = 0; type < STASIS_UMOS_MAX; ++type) {
2094 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
2095 struct ast_json *json_object = NULL;
2096 char *name = NULL;
2097 void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
2098
2099 switch (type) {
2101 json_object = ast_channel_snapshot_to_json(snapshot, sanitize);
2102 name = "channel";
2103 break;
2104 case STASIS_UMOS_BRIDGE:
2105 json_object = ast_bridge_snapshot_to_json(snapshot, sanitize);
2106 name = "bridge";
2107 break;
2109 json_object = ast_endpoint_snapshot_to_json(snapshot, sanitize);
2110 name = "endpoint";
2111 break;
2112 }
2113 if (json_object) {
2114 ast_json_object_set(out, name, json_object);
2115 }
2116 }
2117 }
2118
2119 return out;
2120}
2121
2122/*! \internal \brief convert multi object blob to ami string */
2123static struct ast_str *multi_object_blob_to_ami(void *obj)
2124{
2125 struct ast_str *ami_str=ast_str_create(1024);
2126 struct ast_str *ami_snapshot;
2127 const struct ast_multi_object_blob *multi = obj;
2129 int i;
2130
2131 if (!ami_str) {
2132 return NULL;
2133 }
2134 if (!multi) {
2135 ast_free(ami_str);
2136 return NULL;
2137 }
2138
2139 for (type = 0; type < STASIS_UMOS_MAX; ++type) {
2140 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
2141 char *name = NULL;
2142 void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
2143 ami_snapshot = NULL;
2144
2145 if (i > 0) {
2146 ast_asprintf(&name, "%d", i + 1);
2147 }
2148
2149 switch (type) {
2151 ami_snapshot = ast_manager_build_channel_state_string_prefix(snapshot, name ?: "");
2152 break;
2153
2154 case STASIS_UMOS_BRIDGE:
2155 ami_snapshot = ast_manager_build_bridge_state_string_prefix(snapshot, name ?: "");
2156 break;
2157
2159 /* currently not sending endpoint snapshots to AMI */
2160 break;
2161 }
2162 if (ami_snapshot) {
2163 ast_str_append(&ami_str, 0, "%s", ast_str_buffer(ami_snapshot));
2164 ast_free(ami_snapshot);
2165 }
2166 ast_free(name);
2167 }
2168 }
2169
2170 return ami_str;
2171}
2172
2173/*! \internal \brief Callback to pass only user defined parameters from blob */
2174static int userevent_exclusion_cb(const char *key)
2175{
2176 if (!strcmp("eventname", key)) {
2177 return 1;
2178 }
2179 return 0;
2180}
2181
2183 struct stasis_message *message)
2184{
2185 RAII_VAR(struct ast_str *, object_string, NULL, ast_free);
2186 RAII_VAR(struct ast_str *, body, NULL, ast_free);
2188 const char *eventname;
2189
2190 eventname = ast_json_string_get(ast_json_object_get(multi->blob, "eventname"));
2192 object_string = multi_object_blob_to_ami(multi);
2193 if (!object_string || !body) {
2194 return NULL;
2195 }
2196
2198 "%s"
2199 "UserEvent: %s\r\n"
2200 "%s",
2201 ast_str_buffer(object_string),
2202 eventname,
2203 ast_str_buffer(body));
2204}
2205
2206/*! \brief A structure to hold global configuration-related options */
2208 /*! The list of message types to decline */
2210};
2211
2212/*! \brief Threadpool configuration options */
2214 /*! Initial size of the thread pool */
2216 /*! Time, in seconds, before we expire a thread */
2218 /*! Maximum number of thread to allow */
2220};
2221
2223 /*! Thread pool configuration options */
2225 /*! Declined message types */
2227};
2228
2230 .type = ACO_GLOBAL,
2231 .name = "threadpool",
2232 .item_offset = offsetof(struct stasis_config, threadpool_options),
2233 .category = "threadpool",
2234 .category_match = ACO_WHITELIST_EXACT,
2235};
2236
2238
2239/*! \brief An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type */
2240static struct aco_type declined_option = {
2241 .type = ACO_GLOBAL,
2242 .name = "declined_message_types",
2243 .item_offset = offsetof(struct stasis_config, declined_message_types),
2244 .category_match = ACO_WHITELIST_EXACT,
2245 .category = "declined_message_types",
2246};
2247
2249
2251 .filename = "stasis.conf",
2253};
2254
2255/*! \brief A global object container that will contain the stasis_config that gets swapped out on reloads */
2257
2258static void *stasis_config_alloc(void);
2259
2260/*! \brief Register information about the configs being processed by this module */
2262 .files = ACO_FILES(&stasis_conf),
2263);
2264
2266{
2267 struct stasis_declined_config *declined = obj;
2268
2269 ao2_cleanup(declined->declined);
2270}
2271
2272static void stasis_config_destructor(void *obj)
2273{
2274 struct stasis_config *cfg = obj;
2275
2278}
2279
2280static void *stasis_config_alloc(void)
2281{
2282 struct stasis_config *cfg;
2283
2284 if (!(cfg = ao2_alloc(sizeof(*cfg), stasis_config_destructor))) {
2285 return NULL;
2286 }
2287
2288 cfg->threadpool_options = ast_calloc(1, sizeof(*cfg->threadpool_options));
2289 if (!cfg->threadpool_options) {
2290 ao2_ref(cfg, -1);
2291 return NULL;
2292 }
2293
2296 if (!cfg->declined_message_types) {
2297 ao2_ref(cfg, -1);
2298 return NULL;
2299 }
2300
2302 if (!cfg->declined_message_types->declined) {
2303 ao2_ref(cfg, -1);
2304 return NULL;
2305 }
2306
2307 return cfg;
2308}
2309
2311{
2313 char *name_in_declined;
2314 int res;
2315
2316 if (!cfg || !cfg->declined_message_types) {
2317 ao2_cleanup(cfg);
2318 return 0;
2319 }
2320
2321 name_in_declined = ao2_find(cfg->declined_message_types->declined, name, OBJ_SEARCH_KEY);
2322 res = name_in_declined ? 1 : 0;
2323 ao2_cleanup(name_in_declined);
2324 ao2_ref(cfg, -1);
2325 if (res) {
2326 ast_debug(4, "Declining to allocate Stasis message type '%s' due to configuration\n", name);
2327 }
2328 return res;
2329}
2330
2331static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
2332{
2333 struct stasis_declined_config *declined = obj;
2334
2335 if (ast_strlen_zero(var->value)) {
2336 return 0;
2337 }
2338
2339 if (ast_str_container_add(declined->declined, var->value)) {
2340 return -1;
2341 }
2342
2343 return 0;
2344}
2345
2346/*!
2347 * @{ \brief Define multi user event message type(s).
2348 */
2349
2351 .to_json = multi_user_event_to_json,
2353 );
2354
2355/*! @} */
2356
2357/*!
2358 * \internal
2359 * \brief CLI command implementation for 'stasis show topics'
2360 */
2361static char *stasis_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2362{
2363 struct ao2_iterator iter;
2364 struct topic_proxy *topic;
2365 struct ao2_container *tmp_container;
2366 int count = 0;
2367#define FMT_HEADERS "%-64s %-64s\n"
2368#define FMT_FIELDS "%-64s %-64s\n"
2369
2370 switch (cmd) {
2371 case CLI_INIT:
2372 e->command = "stasis show topics";
2373 e->usage =
2374 "Usage: stasis show topics\n"
2375 " Shows a list of topics\n";
2376 return NULL;
2377 case CLI_GENERATE:
2378 return NULL;
2379 }
2380
2381 if (a->argc != e->args) {
2382 return CLI_SHOWUSAGE;
2383 }
2384
2385 ast_cli(a->fd, "\n" FMT_HEADERS, "Name", "Detail");
2386
2388 topic_proxy_sort_fn, NULL);
2389
2390 if (!tmp_container || ao2_container_dup(tmp_container, topic_all, OBJ_SEARCH_OBJECT)) {
2391 ao2_cleanup(tmp_container);
2392
2393 return NULL;
2394 }
2395
2396 /* getting all topic in order */
2397 iter = ao2_iterator_init(tmp_container, AO2_ITERATOR_UNLINK);
2398 while ((topic = ao2_iterator_next(&iter))) {
2399 ast_cli(a->fd, FMT_FIELDS, topic->name, topic->detail);
2400 ao2_ref(topic, -1);
2401 ++count;
2402 }
2403 ao2_iterator_destroy(&iter);
2404 ao2_cleanup(tmp_container);
2405
2406 ast_cli(a->fd, "\n%d Total topics\n\n", count);
2407
2408#undef FMT_HEADERS
2409#undef FMT_FIELDS
2410
2411 return CLI_SUCCESS;
2412}
2413
2414/*!
2415 * \internal
2416 * \brief CLI tab completion for topic names
2417 */
2418static char *topic_complete_name(const char *word)
2419{
2420 struct topic_proxy *topic;
2421 struct ao2_iterator it;
2422 int wordlen = strlen(word);
2423 int ret;
2424
2426 while ((topic = ao2_iterator_next(&it))) {
2427 if (!strncasecmp(word, topic->name, wordlen)) {
2428 ret = ast_cli_completion_add(ast_strdup(topic->name));
2429 if (ret) {
2430 ao2_ref(topic, -1);
2431 break;
2432 }
2433 }
2434 ao2_ref(topic, -1);
2435 }
2437 return NULL;
2438}
2439
2440/*!
2441 * \internal
2442 * \brief CLI command implementation for 'stasis show topic'
2443 */
2444static char *stasis_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2445{
2446 struct stasis_topic *topic;
2447 char print_time[32];
2448 int i;
2449
2450 switch (cmd) {
2451 case CLI_INIT:
2452 e->command = "stasis show topic";
2453 e->usage =
2454 "Usage: stasis show topic <name>\n"
2455 " Show stasis topic detail info.\n";
2456 return NULL;
2457 case CLI_GENERATE:
2458 if (a->pos == 3) {
2459 return topic_complete_name(a->word);
2460 } else {
2461 return NULL;
2462 }
2463 }
2464
2465 if (a->argc != 4) {
2466 return CLI_SHOWUSAGE;
2467 }
2468
2469 topic = stasis_topic_get(a->argv[3]);
2470 if (!topic) {
2471 ast_cli(a->fd, "Specified topic '%s' does not exist\n", a->argv[3]);
2472 return CLI_FAILURE;
2473 }
2474
2475 ast_cli(a->fd, "Name: %s\n", topic->name);
2476 ast_cli(a->fd, "Detail: %s\n", topic->detail);
2477 ast_cli(a->fd, "Subscribers count: %zu\n", AST_VECTOR_SIZE(&topic->subscribers));
2478 ast_cli(a->fd, "Forwarding topic count: %zu\n", AST_VECTOR_SIZE(&topic->upstream_topics));
2479 ast_format_duration_hh_mm_ss(ast_tvnow().tv_sec - topic->creationtime->tv_sec, print_time, sizeof(print_time));
2480 ast_cli(a->fd, "Duration time: %s\n", print_time);
2481
2482 ao2_lock(topic);
2483 ast_cli(a->fd, "\nSubscribers:\n");
2484 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); i++) {
2485 struct stasis_subscription *subscription_tmp = AST_VECTOR_GET(&topic->subscribers, i);
2486 ast_cli(a->fd, " UniqueID: %s, Topic: %s, Detail: %s\n",
2487 subscription_tmp->uniqueid, subscription_tmp->topic->name, subscription_tmp->topic->detail);
2488 }
2489
2490 ast_cli(a->fd, "\nForwarded topics:\n");
2491 for (i = 0; i < AST_VECTOR_SIZE(&topic->upstream_topics); i++) {
2492 struct stasis_topic *topic_tmp = AST_VECTOR_GET(&topic->upstream_topics, i);
2493 ast_cli(a->fd, " Topic: %s, Detail: %s\n", topic_tmp->name, topic_tmp->detail);
2494 }
2495 ao2_unlock(topic);
2496
2497 ao2_ref(topic, -1);
2498
2499 return CLI_SUCCESS;
2500}
2501
2502
2503static struct ast_cli_entry cli_stasis[] = {
2504 AST_CLI_DEFINE(stasis_show_topics, "Show all topics"),
2505 AST_CLI_DEFINE(stasis_show_topic, "Show topic"),
2506};
2507
2508
2509#ifdef AST_DEVMODE
2510
2511AO2_STRING_FIELD_SORT_FN(stasis_subscription_statistics, uniqueid);
2512
2513/*!
2514 * \internal
2515 * \brief CLI command implementation for 'stasis statistics show subscriptions'
2516 */
2517static char *statistics_show_subscriptions(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2518{
2519 struct ao2_container *sorted_subscriptions;
2520 struct ao2_container *subscription_stats;
2521 struct ao2_iterator iter;
2522 struct stasis_subscription_statistics *statistics;
2523 int count = 0;
2524 int dropped = 0;
2525 int passed = 0;
2526#define FMT_HEADERS "%-64s %10s %10s %16s %16s\n"
2527#define FMT_FIELDS "%-64s %10d %10d %16ld %16ld\n"
2528#define FMT_FIELDS2 "%-64s %10d %10d\n"
2529
2530 switch (cmd) {
2531 case CLI_INIT:
2532 e->command = "stasis statistics show subscriptions";
2533 e->usage =
2534 "Usage: stasis statistics show subscriptions\n"
2535 " Shows a list of subscriptions and their general statistics\n";
2536 return NULL;
2537 case CLI_GENERATE:
2538 return NULL;
2539 }
2540
2541 if (a->argc != e->args) {
2542 return CLI_SHOWUSAGE;
2543 }
2544
2545 subscription_stats = ao2_global_obj_ref(subscription_statistics);
2546 if (!subscription_stats) {
2547 ast_cli(a->fd, "Could not fetch subscription_statistics container\n");
2548 return CLI_FAILURE;
2549 }
2550
2551 sorted_subscriptions = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_NOLOCK, 0,
2552 stasis_subscription_statistics_sort_fn, NULL);
2553 if (!sorted_subscriptions) {
2554 ao2_ref(subscription_stats, -1);
2555 ast_cli(a->fd, "Could not create container for sorting subscription statistics\n");
2556 return CLI_SUCCESS;
2557 }
2558
2559 if (ao2_container_dup(sorted_subscriptions, subscription_stats, 0)) {
2560 ao2_ref(sorted_subscriptions, -1);
2561 ao2_ref(subscription_stats, -1);
2562 ast_cli(a->fd, "Could not sort subscription statistics\n");
2563 return CLI_SUCCESS;
2564 }
2565
2566 ao2_ref(subscription_stats, -1);
2567
2568 ast_cli(a->fd, "\n" FMT_HEADERS, "Subscription", "Dropped", "Passed", "Lowest Invoke", "Highest Invoke");
2569
2570 iter = ao2_iterator_init(sorted_subscriptions, 0);
2571 while ((statistics = ao2_iterator_next(&iter))) {
2572 ast_cli(a->fd, FMT_FIELDS, statistics->uniqueid, statistics->messages_dropped, statistics->messages_passed,
2573 statistics->lowest_time_invoked, statistics->highest_time_invoked);
2574 dropped += statistics->messages_dropped;
2575 passed += statistics->messages_passed;
2576 ao2_ref(statistics, -1);
2577 ++count;
2578 }
2579 ao2_iterator_destroy(&iter);
2580
2581 ao2_ref(sorted_subscriptions, -1);
2582
2583 ast_cli(a->fd, FMT_FIELDS2, "Total", dropped, passed);
2584 ast_cli(a->fd, "\n%d subscriptions\n\n", count);
2585
2586#undef FMT_HEADERS
2587#undef FMT_FIELDS
2588#undef FMT_FIELDS2
2589
2590 return CLI_SUCCESS;
2591}
2592
2593/*!
2594 * \internal
2595 * \brief CLI tab completion for subscription statistics names
2596 */
2597static char *subscription_statistics_complete_name(const char *word, int state)
2598{
2599 struct stasis_subscription_statistics *statistics;
2600 struct ao2_container *subscription_stats;
2601 struct ao2_iterator it_statistics;
2602 int wordlen = strlen(word);
2603 int which = 0;
2604 char *result = NULL;
2605
2606 subscription_stats = ao2_global_obj_ref(subscription_statistics);
2607 if (!subscription_stats) {
2608 return result;
2609 }
2610
2611 it_statistics = ao2_iterator_init(subscription_stats, 0);
2612 while ((statistics = ao2_iterator_next(&it_statistics))) {
2613 if (!strncasecmp(word, statistics->uniqueid, wordlen)
2614 && ++which > state) {
2615 result = ast_strdup(statistics->uniqueid);
2616 }
2617 ao2_ref(statistics, -1);
2618 if (result) {
2619 break;
2620 }
2621 }
2622 ao2_iterator_destroy(&it_statistics);
2623 ao2_ref(subscription_stats, -1);
2624 return result;
2625}
2626
2627/*!
2628 * \internal
2629 * \brief CLI command implementation for 'stasis statistics show subscription'
2630 */
2631static char *statistics_show_subscription(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2632{
2633 struct stasis_subscription_statistics *statistics;
2634 struct ao2_container *subscription_stats;
2635 struct ao2_iterator i;
2636 char *name;
2637
2638 switch (cmd) {
2639 case CLI_INIT:
2640 e->command = "stasis statistics show subscription";
2641 e->usage =
2642 "Usage: stasis statistics show subscription <uniqueid>\n"
2643 " Show stasis subscription statistics.\n";
2644 return NULL;
2645 case CLI_GENERATE:
2646 if (a->pos == 4) {
2647 return subscription_statistics_complete_name(a->word, a->n);
2648 } else {
2649 return NULL;
2650 }
2651 }
2652
2653 if (a->argc != 5) {
2654 return CLI_SHOWUSAGE;
2655 }
2656
2657 subscription_stats = ao2_global_obj_ref(subscription_statistics);
2658 if (!subscription_stats) {
2659 ast_cli(a->fd, "Could not fetch subscription_statistics container\n");
2660 return CLI_FAILURE;
2661 }
2662
2663 statistics = ao2_find(subscription_stats, a->argv[4], OBJ_SEARCH_KEY);
2664 if (!statistics) {
2665 ao2_ref(subscription_stats, -1);
2666 ast_cli(a->fd, "Specified subscription '%s' does not exist\n", a->argv[4]);
2667 return CLI_FAILURE;
2668 }
2669
2670 ao2_ref(subscription_stats, -1);
2671
2672 ast_cli(a->fd, "Subscription: %s\n", statistics->uniqueid);
2673 ast_cli(a->fd, "Pointer Address: %p\n", statistics->sub);
2674 ast_cli(a->fd, "Source filename: %s\n", S_OR(statistics->file, "<unavailable>"));
2675 ast_cli(a->fd, "Source line number: %d\n", statistics->lineno);
2676 ast_cli(a->fd, "Source function: %s\n", S_OR(statistics->func, "<unavailable>"));
2677 ast_cli(a->fd, "Number of messages dropped due to filtering: %d\n", statistics->messages_dropped);
2678 ast_cli(a->fd, "Number of messages passed to subscriber callback: %d\n", statistics->messages_passed);
2679 ast_cli(a->fd, "Using mailbox to queue messages: %s\n", statistics->uses_mailbox ? "Yes" : "No");
2680 ast_cli(a->fd, "Using stasis threadpool for handling messages: %s\n", statistics->uses_threadpool ? "Yes" : "No");
2681 ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->lowest_time_invoked);
2682 ast_cli(a->fd, "Highest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->highest_time_invoked);
2683
2685 if (statistics->highest_time_message_type) {
2686 ast_cli(a->fd, "Offender message type for highest invoking time: %s\n", stasis_message_type_name(statistics->highest_time_message_type));
2687 }
2689
2690 ast_cli(a->fd, "Number of topics: %d\n", ao2_container_count(statistics->topics));
2691
2692 ast_cli(a->fd, "Subscribed topics:\n");
2693 i = ao2_iterator_init(statistics->topics, 0);
2694 while ((name = ao2_iterator_next(&i))) {
2695 ast_cli(a->fd, "\t%s\n", name);
2696 ao2_ref(name, -1);
2697 }
2699
2700 ao2_ref(statistics, -1);
2701
2702 return CLI_SUCCESS;
2703}
2704
2705AO2_STRING_FIELD_SORT_FN(stasis_topic_statistics, name);
2706
2707/*!
2708 * \internal
2709 * \brief CLI command implementation for 'stasis statistics show topics'
2710 */
2711static char *statistics_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2712{
2713 struct ao2_container *sorted_topics;
2714 struct ao2_container *topic_stats;
2715 struct ao2_iterator iter;
2716 struct stasis_topic_statistics *statistics;
2717 int count = 0;
2718 int not_dispatched = 0;
2719 int dispatched = 0;
2720#define FMT_HEADERS "%-64s %10s %10s %10s %16s %16s\n"
2721#define FMT_FIELDS "%-64s %10d %10d %10d %16ld %16ld\n"
2722#define FMT_FIELDS2 "%-64s %10s %10d %10d\n"
2723
2724 switch (cmd) {
2725 case CLI_INIT:
2726 e->command = "stasis statistics show topics";
2727 e->usage =
2728 "Usage: stasis statistics show topics\n"
2729 " Shows a list of topics and their general statistics\n";
2730 return NULL;
2731 case CLI_GENERATE:
2732 return NULL;
2733 }
2734
2735 if (a->argc != e->args) {
2736 return CLI_SHOWUSAGE;
2737 }
2738
2739 topic_stats = ao2_global_obj_ref(topic_statistics);
2740 if (!topic_stats) {
2741 ast_cli(a->fd, "Could not fetch topic_statistics container\n");
2742 return CLI_FAILURE;
2743 }
2744
2746 stasis_topic_statistics_sort_fn, NULL);
2747 if (!sorted_topics) {
2748 ao2_ref(topic_stats, -1);
2749 ast_cli(a->fd, "Could not create container for sorting topic statistics\n");
2750 return CLI_SUCCESS;
2751 }
2752
2753 if (ao2_container_dup(sorted_topics, topic_stats, 0)) {
2754 ao2_ref(sorted_topics, -1);
2755 ao2_ref(topic_stats, -1);
2756 ast_cli(a->fd, "Could not sort topic statistics\n");
2757 return CLI_SUCCESS;
2758 }
2759
2760 ao2_ref(topic_stats, -1);
2761
2762 ast_cli(a->fd, "\n" FMT_HEADERS, "Topic", "Subscribers", "Dropped", "Dispatched", "Lowest Dispatch", "Highest Dispatch");
2763
2764 iter = ao2_iterator_init(sorted_topics, 0);
2765 while ((statistics = ao2_iterator_next(&iter))) {
2766 ast_cli(a->fd, FMT_FIELDS, statistics->name, ao2_container_count(statistics->subscribers),
2767 statistics->messages_not_dispatched, statistics->messages_dispatched,
2768 statistics->lowest_time_dispatched, statistics->highest_time_dispatched);
2769 not_dispatched += statistics->messages_not_dispatched;
2770 dispatched += statistics->messages_dispatched;
2771 ao2_ref(statistics, -1);
2772 ++count;
2773 }
2774 ao2_iterator_destroy(&iter);
2775
2776 ao2_ref(sorted_topics, -1);
2777
2778 ast_cli(a->fd, FMT_FIELDS2, "Total", "", not_dispatched, dispatched);
2779 ast_cli(a->fd, "\n%d topics\n\n", count);
2780
2781#undef FMT_HEADERS
2782#undef FMT_FIELDS
2783#undef FMT_FIELDS2
2784
2785 return CLI_SUCCESS;
2786}
2787
2788/*!
2789 * \internal
2790 * \brief CLI tab completion for topic statistics names
2791 */
2792static char *topic_statistics_complete_name(const char *word, int state)
2793{
2794 struct stasis_topic_statistics *statistics;
2795 struct ao2_container *topic_stats;
2796 struct ao2_iterator it_statistics;
2797 int wordlen = strlen(word);
2798 int which = 0;
2799 char *result = NULL;
2800
2801 topic_stats = ao2_global_obj_ref(topic_statistics);
2802 if (!topic_stats) {
2803 return result;
2804 }
2805
2806 it_statistics = ao2_iterator_init(topic_stats, 0);
2807 while ((statistics = ao2_iterator_next(&it_statistics))) {
2808 if (!strncasecmp(word, statistics->name, wordlen)
2809 && ++which > state) {
2810 result = ast_strdup(statistics->name);
2811 }
2812 ao2_ref(statistics, -1);
2813 if (result) {
2814 break;
2815 }
2816 }
2817 ao2_iterator_destroy(&it_statistics);
2818 ao2_ref(topic_stats, -1);
2819 return result;
2820}
2821
2822/*!
2823 * \internal
2824 * \brief CLI command implementation for 'stasis statistics show topic'
2825 */
2826static char *statistics_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2827{
2828 struct stasis_topic_statistics *statistics;
2829 struct ao2_container *topic_stats;
2830 struct ao2_iterator i;
2831 char *uniqueid;
2832
2833 switch (cmd) {
2834 case CLI_INIT:
2835 e->command = "stasis statistics show topic";
2836 e->usage =
2837 "Usage: stasis statistics show topic <name>\n"
2838 " Show stasis topic statistics.\n";
2839 return NULL;
2840 case CLI_GENERATE:
2841 if (a->pos == 4) {
2842 return topic_statistics_complete_name(a->word, a->n);
2843 } else {
2844 return NULL;
2845 }
2846 }
2847
2848 if (a->argc != 5) {
2849 return CLI_SHOWUSAGE;
2850 }
2851
2852 topic_stats = ao2_global_obj_ref(topic_statistics);
2853 if (!topic_stats) {
2854 ast_cli(a->fd, "Could not fetch topic_statistics container\n");
2855 return CLI_FAILURE;
2856 }
2857
2858 statistics = ao2_find(topic_stats, a->argv[4], OBJ_SEARCH_KEY);
2859 if (!statistics) {
2860 ao2_ref(topic_stats, -1);
2861 ast_cli(a->fd, "Specified topic '%s' does not exist\n", a->argv[4]);
2862 return CLI_FAILURE;
2863 }
2864
2865 ao2_ref(topic_stats, -1);
2866
2867 ast_cli(a->fd, "Topic: %s\n", statistics->name);
2868 ast_cli(a->fd, "Pointer Address: %p\n", statistics->topic);
2869 ast_cli(a->fd, "Number of messages published that went to no subscriber: %d\n", statistics->messages_not_dispatched);
2870 ast_cli(a->fd, "Number of messages that went to at least one subscriber: %d\n", statistics->messages_dispatched);
2871 ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent dispatching message: %ld\n", statistics->lowest_time_dispatched);
2872 ast_cli(a->fd, "Highest amount of time (in milliseconds) spent dispatching messages: %ld\n", statistics->highest_time_dispatched);
2873 ast_cli(a->fd, "Number of subscribers: %d\n", ao2_container_count(statistics->subscribers));
2874
2875 ast_cli(a->fd, "Subscribers:\n");
2876 i = ao2_iterator_init(statistics->subscribers, 0);
2877 while ((uniqueid = ao2_iterator_next(&i))) {
2878 ast_cli(a->fd, "\t%s\n", uniqueid);
2879 ao2_ref(uniqueid, -1);
2880 }
2882
2883 ao2_ref(statistics, -1);
2884
2885 return CLI_SUCCESS;
2886}
2887
2888/*!
2889 * \internal
2890 * \brief CLI command implementation for 'stasis statistics show messages'
2891 */
2892static char *statistics_show_messages(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2893{
2894 int i;
2895 int count = 0;
2896 int published = 0;
2897 int unused = 0;
2898#define FMT_HEADERS "%-64s %10s %10s\n"
2899#define FMT_FIELDS "%-64s %10d %10d\n"
2900
2901 switch (cmd) {
2902 case CLI_INIT:
2903 e->command = "stasis statistics show messages";
2904 e->usage =
2905 "Usage: stasis statistics show messages\n"
2906 " Shows a list of message types and their general statistics\n";
2907 return NULL;
2908 case CLI_GENERATE:
2909 return NULL;
2910 }
2911
2912 if (a->argc != e->args) {
2913 return CLI_SHOWUSAGE;
2914 }
2915
2916 ast_cli(a->fd, "\n" FMT_HEADERS, "Message Type", "Published", "Unused");
2917
2918 ast_mutex_lock(&message_type_statistics_lock);
2919 for (i = 0; i < AST_VECTOR_SIZE(&message_type_statistics); ++i) {
2920 struct stasis_message_type_statistics *statistics = AST_VECTOR_GET_ADDR(&message_type_statistics, i);
2921
2922 if (!statistics->message_type) {
2923 continue;
2924 }
2925
2926 ast_cli(a->fd, FMT_FIELDS, stasis_message_type_name(statistics->message_type), statistics->published,
2927 statistics->unused);
2928 published += statistics->published;
2929 unused += statistics->unused;
2930 ++count;
2931 }
2932 ast_mutex_unlock(&message_type_statistics_lock);
2933
2934 ast_cli(a->fd, FMT_FIELDS, "Total", published, unused);
2935 ast_cli(a->fd, "\n%d seen message types\n\n", count);
2936
2937#undef FMT_HEADERS
2938#undef FMT_FIELDS
2939
2940 return CLI_SUCCESS;
2941}
2942
2943static struct ast_cli_entry cli_stasis_statistics[] = {
2944 AST_CLI_DEFINE(statistics_show_subscriptions, "Show subscriptions with general statistics"),
2945 AST_CLI_DEFINE(statistics_show_subscription, "Show subscription statistics"),
2946 AST_CLI_DEFINE(statistics_show_topics, "Show topics with general statistics"),
2947 AST_CLI_DEFINE(statistics_show_topic, "Show topic statistics"),
2948 AST_CLI_DEFINE(statistics_show_messages, "Show message types with general statistics"),
2949};
2950
2951static int subscription_statistics_hash(const void *obj, const int flags)
2952{
2953 const struct stasis_subscription_statistics *object;
2954 const char *key;
2955
2956 switch (flags & OBJ_SEARCH_MASK) {
2957 case OBJ_SEARCH_KEY:
2958 key = obj;
2959 break;
2960 case OBJ_SEARCH_OBJECT:
2961 object = obj;
2962 key = object->uniqueid;
2963 break;
2964 default:
2965 /* Hash can only work on something with a full key. */
2966 ast_assert(0);
2967 return 0;
2968 }
2969 return ast_str_case_hash(key);
2970}
2971
2972static int subscription_statistics_cmp(void *obj, void *arg, int flags)
2973{
2974 const struct stasis_subscription_statistics *object_left = obj;
2975 const struct stasis_subscription_statistics *object_right = arg;
2976 const char *right_key = arg;
2977 int cmp;
2978
2979 switch (flags & OBJ_SEARCH_MASK) {
2980 case OBJ_SEARCH_OBJECT:
2981 right_key = object_right->uniqueid;
2982 /* Fall through */
2983 case OBJ_SEARCH_KEY:
2984 cmp = strcasecmp(object_left->uniqueid, right_key);
2985 break;
2987 /* Not supported by container */
2988 ast_assert(0);
2989 cmp = -1;
2990 break;
2991 default:
2992 /*
2993 * What arg points to is specific to this traversal callback
2994 * and has no special meaning to astobj2.
2995 */
2996 cmp = 0;
2997 break;
2998 }
2999 if (cmp) {
3000 return 0;
3001 }
3002 /*
3003 * At this point the traversal callback is identical to a sorted
3004 * container.
3005 */
3006 return CMP_MATCH;
3007}
3008
3009static int topic_statistics_hash(const void *obj, const int flags)
3010{
3011 const struct stasis_topic_statistics *object;
3012 const char *key;
3013
3014 switch (flags & OBJ_SEARCH_MASK) {
3015 case OBJ_SEARCH_KEY:
3016 key = obj;
3017 break;
3018 case OBJ_SEARCH_OBJECT:
3019 object = obj;
3020 key = object->name;
3021 break;
3022 default:
3023 /* Hash can only work on something with a full key. */
3024 ast_assert(0);
3025 return 0;
3026 }
3027 return ast_str_case_hash(key);
3028}
3029
3030static int topic_statistics_cmp(void *obj, void *arg, int flags)
3031{
3032 const struct stasis_topic_statistics *object_left = obj;
3033 const struct stasis_topic_statistics *object_right = arg;
3034 const char *right_key = arg;
3035 int cmp;
3036
3037 switch (flags & OBJ_SEARCH_MASK) {
3038 case OBJ_SEARCH_OBJECT:
3039 right_key = object_right->name;
3040 /* Fall through */
3041 case OBJ_SEARCH_KEY:
3042 cmp = strcasecmp(object_left->name, right_key);
3043 break;
3045 /* Not supported by container */
3046 ast_assert(0);
3047 cmp = -1;
3048 break;
3049 default:
3050 /*
3051 * What arg points to is specific to this traversal callback
3052 * and has no special meaning to astobj2.
3053 */
3054 cmp = 0;
3055 break;
3056 }
3057 if (cmp) {
3058 return 0;
3059 }
3060 /*
3061 * At this point the traversal callback is identical to a sorted
3062 * container.
3063 */
3064 return CMP_MATCH;
3065}
3066#endif
3067
3068/*! \brief Cleanup function for graceful shutdowns */
3069static void stasis_cleanup(void)
3070{
3071#ifdef AST_DEVMODE
3072 ast_cli_unregister_multiple(cli_stasis_statistics, ARRAY_LEN(cli_stasis_statistics));
3073 AST_VECTOR_FREE(&message_type_statistics);
3074 ao2_global_obj_release(subscription_statistics);
3075 ao2_global_obj_release(topic_statistics);
3076#endif
3079 topic_all = NULL;
3081 threadpool = NULL;
3084 aco_info_destroy(&cfg_info);
3086}
3087
3089{
3090 struct stasis_config *cfg;
3091 int cache_init;
3092 struct ast_threadpool_options threadpool_opts = { 0, };
3093#ifdef AST_DEVMODE
3094 struct ao2_container *subscription_stats;
3095 struct ao2_container *topic_stats;
3096#endif
3097
3098 /* Be sure the types are cleaned up after the message bus */
3100
3101 if (aco_info_init(&cfg_info)) {
3102 return -1;
3103 }
3104
3105 aco_option_register_custom(&cfg_info, "decline", ACO_EXACT,
3107 aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
3109 FLDSET(struct stasis_threadpool_conf, initial_size), 0,
3110 INT_MAX);
3111 aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
3113 FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
3114 INT_MAX);
3115 aco_option_register(&cfg_info, "max_size", ACO_EXACT,
3117 FLDSET(struct stasis_threadpool_conf, max_size), 0,
3118 INT_MAX);
3119
3120 if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
3121 struct stasis_config *default_cfg = stasis_config_alloc();
3122
3123 if (!default_cfg) {
3124 return -1;
3125 }
3126
3127 if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) {
3128 ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
3129 ao2_ref(default_cfg, -1);
3130
3131 return -1;
3132 }
3133
3134 if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) {
3135 ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
3136 ao2_ref(default_cfg, -1);
3137
3138 return -1;
3139 }
3140
3141 ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n");
3143 cfg = default_cfg;
3144 } else {
3146 if (!cfg) {
3147 ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
3148
3149 return -1;
3150 }
3151 }
3152
3153 threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION;
3154 threadpool_opts.initial_size = cfg->threadpool_options->initial_size;
3155 threadpool_opts.auto_increment = 1;
3156 threadpool_opts.max_size = cfg->threadpool_options->max_size;
3157 threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
3158 threadpool = ast_threadpool_create("stasis", NULL, &threadpool_opts);
3159 ao2_ref(cfg, -1);
3160 if (!threadpool) {
3161 ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
3162
3163 return -1;
3164 }
3165
3166 cache_init = stasis_cache_init();
3167 if (cache_init != 0) {
3168 return -1;
3169 }
3170
3172 return -1;
3173 }
3175 return -1;
3176 }
3177
3179 topic_proxy_hash_fn, 0, topic_proxy_cmp_fn);
3180 if (!topic_all) {
3181 return -1;
3182 }
3183
3185 return -1;
3186 }
3187
3188#ifdef AST_DEVMODE
3189 /* Statistics information is stored separately so that we don't alter or interrupt the lifetime of the underlying
3190 * topic or subscripton.
3191 */
3192 subscription_stats = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, SUBSCRIPTION_STATISTICS_BUCKETS,
3193 subscription_statistics_hash, 0, subscription_statistics_cmp);
3194 if (!subscription_stats) {
3195 return -1;
3196 }
3197 ao2_global_obj_replace_unref(subscription_statistics, subscription_stats);
3198 ao2_cleanup(subscription_stats);
3199
3200 topic_stats = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, TOPIC_STATISTICS_BUCKETS,
3201 topic_statistics_hash, 0, topic_statistics_cmp);
3202 if (!topic_stats) {
3203 return -1;
3204 }
3205 ao2_global_obj_replace_unref(topic_statistics, topic_stats);
3206 ao2_cleanup(topic_stats);
3207 if (!topic_stats) {
3208 return -1;
3209 }
3210
3211 AST_VECTOR_INIT(&message_type_statistics, 0);
3212
3213 if (ast_cli_register_multiple(cli_stasis_statistics, ARRAY_LEN(cli_stasis_statistics))) {
3214 return -1;
3215 }
3216#endif
3217
3218 return 0;
3219}
#define var
Definition: ast_expr2f.c:605
Asterisk main include file. File version handling, generic pbx functions.
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
Definition: clicompat.c:19
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
Definition: astmm.h:288
#define ast_free(a)
Definition: astmm.h:180
#define ast_strdup(str)
A wrapper for strdup()
Definition: astmm.h:241
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:267
#define ast_calloc(num, len)
A wrapper for calloc()
Definition: astmm.h:202
#define ast_log
Definition: astobj2.c:42
int ao2_container_dup(struct ao2_container *dest, struct ao2_container *src, enum search_flags flags)
Copy all object references in the src container into the dest container.
int ao2_weakproxy_subscribe(void *weakproxy, ao2_weakproxy_notification_cb cb, void *data, int flags)
Request notification when weakproxy points to NULL.
Definition: astobj2.c:934
#define ao2_iterator_next(iter)
Definition: astobj2.h:1911
#define ao2_link(container, obj)
Add an object to a container.
Definition: astobj2.h:1532
@ CMP_MATCH
Definition: astobj2.h:1027
@ AO2_ALLOC_OPT_LOCK_NOLOCK
Definition: astobj2.h:367
@ AO2_ALLOC_OPT_LOCK_MUTEX
Definition: astobj2.h:363
#define ao2_wrlock(a)
Definition: astobj2.h:719
#define ao2_global_obj_replace_unref(holder, obj)
Replace an ao2 object in the global holder, throwing away any old object.
Definition: astobj2.h:901
void ao2_container_unregister(const char *name)
Unregister a container for CLI stats and integrity check.
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
#define ao2_unlink(container, obj)
Remove an object from a container.
Definition: astobj2.h:1578
@ AO2_ITERATOR_UNLINK
Definition: astobj2.h:1863
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
Definition: astobj2.h:1554
#define ao2_global_obj_ref(holder)
Get a reference to the object stored in the global holder.
Definition: astobj2.h:918
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1736
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
#define ao2_t_weakproxy_alloc(data_size, destructor_fn, tag)
Definition: astobj2.h:553
#define AO2_STRING_FIELD_SORT_FN(stype, field)
Creates a sort function for a structure string field.
Definition: astobj2.h:2064
#define ao2_container_alloc_rbtree(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a red-black tree container.
Definition: astobj2.h:1349
#define ao2_unlock(a)
Definition: astobj2.h:729
#define ao2_weakproxy_find(c, arg, flags, tag)
Perform an ao2_find on a container with ao2_weakproxy objects, returning the real object.
Definition: astobj2.h:1748
#define ao2_lock(a)
Definition: astobj2.h:717
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:404
int ao2_container_register(const char *name, struct ao2_container *self, ao2_prnt_obj_fn *prnt_obj)
Register a container for CLI stats and integrity check.
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
Definition: astobj2.c:476
#define ao2_global_obj_release(holder)
Release the ao2 object held in the global holder.
Definition: astobj2.h:859
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition: astobj2.h:480
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
#define ao2_t_alloc(data_size, destructor_fn, debug_msg)
Definition: astobj2.h:407
@ OBJ_SEARCH_PARTIAL_KEY
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
Definition: astobj2.h:1116
@ OBJ_SEARCH_OBJECT
The arg parameter is an object of the same type.
Definition: astobj2.h:1087
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
Definition: astobj2.h:1063
@ OBJ_NODATA
Definition: astobj2.h:1044
@ OBJ_SEARCH_MASK
Search option field mask.
Definition: astobj2.h:1072
@ OBJ_UNLINK
Definition: astobj2.h:1039
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1101
#define ao2_container_alloc_list(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a list container.
Definition: astobj2.h:1327
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:409
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition: astobj2.h:1303
void() ao2_prnt_fn(void *where, const char *fmt,...)
Print output.
Definition: astobj2.h:1435
#define ao2_t_weakproxy_set_object(weakproxy, obj, flags, tag)
Definition: astobj2.h:582
static PGresult * result
Definition: cel_pgsql.c:84
static struct console_pvt globals
static const char type[]
Definition: chan_ooh323.c:109
struct stasis_topic * ast_channel_topic(struct ast_channel *chan)
A topic which publishes the events for a particular channel.
Standard Command Line Interface.
#define CLI_SHOWUSAGE
Definition: cli.h:45
#define CLI_SUCCESS
Definition: cli.h:44
int ast_cli_unregister_multiple(struct ast_cli_entry *e, int len)
Unregister multiple commands.
Definition: clicompat.c:30
#define AST_CLI_DEFINE(fn, txt,...)
Definition: cli.h:197
int ast_cli_completion_add(char *value)
Add a result to a request for completion options.
Definition: main/cli.c:2768
void ast_cli(int fd, const char *fmt,...)
Definition: clicompat.c:6
@ CLI_INIT
Definition: cli.h:152
@ CLI_GENERATE
Definition: cli.h:153
#define CLI_FAILURE
Definition: cli.h:46
#define ast_cli_register_multiple(e, len)
Register multiple commands.
Definition: cli.h:265
short word
Configuration option-handling.
@ ACO_EXACT
int aco_set_defaults(struct aco_type *type, const char *category, void *obj)
Set all default options of obj.
void aco_info_destroy(struct aco_info *info)
Destroy an initialized aco_info struct.
@ ACO_PROCESS_ERROR
Their was an error and no changes were applied.
int aco_info_init(struct aco_info *info)
Initialize an aco_info structure.
#define FLDSET(type,...)
Convert a struct and list of fields to an argument list of field offsets.
#define aco_option_register(info, name, matchtype, types, default_val, opt_type, flags,...)
Register a config option.
#define ACO_FILES(...)
@ OPT_INT_T
Type for default option handler for signed integers.
#define aco_option_register_custom(info, name, matchtype, types, default_val, handler, flags)
Register a config option.
@ ACO_GLOBAL
@ ACO_WHITELIST_EXACT
enum aco_process_status aco_process_config(struct aco_info *info, int reload)
Process a config info via the options registered with an aco_info.
#define ACO_TYPES(...)
A helper macro to ensure that aco_info types always have a sentinel.
static const char name[]
Definition: format_mp3.c:68
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
Definition: func_strings.c:899
struct ast_str * ast_manager_str_from_json_object(struct ast_json *blob, key_exclusion_cb exclusion_cb)
Convert a JSON object into an AMI compatible string.
Definition: manager.c:555
void ast_multi_object_blob_add(struct ast_multi_object_blob *multi, enum stasis_user_multi_object_snapshot_type type, void *object)
Add an object (snapshot) to the blob.
Definition: stasis.c:2028
stasis_user_multi_object_snapshot_type
Object type code for multi user object snapshots.
Definition: stasis.h:1353
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
struct ast_channel_snapshot * ast_channel_snapshot_create(struct ast_channel *chan)
Generate a snapshot of the channel state. This is an ao2 object, so ao2_cleanup() to deallocate.
void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob)
Publish single channel user event (for app_userevent compatibility)
Definition: stasis.c:2037
struct ast_multi_object_blob * ast_multi_object_blob_create(struct ast_json *blob)
Create a stasis user event multi object blob.
Definition: stasis.c:2002
struct stasis_message_type * ast_multi_user_event_type(void)
Message type for custom user defined events with multi object blobs.
#define STASIS_UMOS_MAX
Number of snapshot types.
Definition: stasis.h:1360
@ STASIS_UMOS_ENDPOINT
Definition: stasis.h:1356
@ STASIS_UMOS_BRIDGE
Definition: stasis.h:1355
@ STASIS_UMOS_CHANNEL
Definition: stasis.h:1354
#define ast_debug(level,...)
Log a DEBUG message.
#define LOG_ERROR
#define LOG_NOTICE
struct ast_json * ast_json_string_create(const char *value)
Construct a JSON string from value.
Definition: json.c:278
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition: json.c:73
struct ast_json * ast_json_object_create(void)
Create a new JSON object.
Definition: json.c:399
struct ast_json * ast_json_timeval(const struct timeval tv, const char *zone)
Construct a timeval as JSON.
Definition: json.c:670
struct ast_json * ast_json_ref(struct ast_json *value)
Increase refcount on value.
Definition: json.c:67
int ast_json_object_set(struct ast_json *object, const char *key, struct ast_json *value)
Set a field in a JSON object.
Definition: json.c:414
const char * ast_json_string_get(const struct ast_json *string)
Get the value of a JSON string.
Definition: json.c:283
struct ast_json * ast_json_object_get(struct ast_json *object, const char *key)
Get a field from a JSON object.
Definition: json.c:407
#define ast_cond_destroy(cond)
Definition: lock.h:206
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:608
#define ast_cond_wait(cond, mutex)
Definition: lock.h:209
#define ast_cond_init(cond, attr)
Definition: lock.h:205
#define ast_mutex_init(pmutex)
Definition: lock.h:190
#define ast_mutex_unlock(a)
Definition: lock.h:194
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition: lock.h:761
pthread_cond_t ast_cond_t
Definition: lock.h:182
#define ast_mutex_destroy(a)
Definition: lock.h:192
#define ast_mutex_lock(a)
Definition: lock.h:193
#define AST_MUTEX_DEFINE_STATIC(mutex)
Definition: lock.h:524
#define ast_cond_signal(cond)
Definition: lock.h:207
struct ast_str * ast_manager_build_bridge_state_string_prefix(const struct ast_bridge_snapshot *snapshot, const char *prefix)
Generate the AMI message body from a bridge snapshot.
struct ast_manager_event_blob * ast_manager_event_blob_create(int event_flags, const char *manager_event, const char *extra_fields_fmt,...)
Construct a ast_manager_event_blob.
Definition: manager.c:10237
#define EVENT_FLAG_USER
Definition: manager.h:81
struct ast_str * ast_manager_build_channel_state_string_prefix(const struct ast_channel_snapshot *snapshot, const char *prefix)
Generate the AMI message body from a channel snapshot.
struct stasis_forward * sub
Definition: res_corosync.c:240
struct ao2_container * container
Definition: res_fax.c:531
static void to_ami(struct ast_sip_subscription *sub, struct ast_str **buf)
#define NULL
Definition: resample.c:96
static struct ast_str * multi_object_blob_to_ami(void *obj)
Definition: stasis.c:2123
int stasis_subscription_is_done(struct stasis_subscription *subscription)
Returns whether subscription has received its final message.
Definition: stasis.c:1145
static char * topic_complete_name(const char *word)
Definition: stasis.c:2418
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *sub)
Cancel a subscription.
Definition: stasis.c:998
static struct ast_cli_entry cli_stasis[]
Definition: stasis.c:2503
#define INITIAL_SUBSCRIBERS_MAX
Definition: stasis.c:328
static struct ast_json * multi_user_event_to_json(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
Definition: stasis.c:2072
static void subscription_change_dtor(void *obj)
Definition: stasis.c:1647
void stasis_topic_pool_delete_topic(struct stasis_topic_pool *pool, const char *topic_name)
Delete a topic from the topic pool.
Definition: stasis.c:1891
static struct ast_manager_event_blob * multi_user_event_to_ami(struct stasis_message *message)
Definition: stasis.c:2182
int stasis_subscription_decline_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are not interested in a message type.
Definition: stasis.c:1080
#define FMT_HEADERS
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:654
static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
Definition: stasis.c:1313
#define FMT_FIELDS
int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription, long low_water, long high_water)
Set the high and low alert water marks of the stasis subscription.
Definition: stasis.c:1038
AO2_STRING_FIELD_HASH_FN(topic_proxy, name)
struct stasis_topic * stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
Find or create a topic in the pool.
Definition: stasis.c:1911
static int sub_cleanup(void *data)
Definition: stasis.c:991
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
Definition: stasis.c:1575
static void forward_dtor(void *obj)
Definition: stasis.c:1565
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition: stasis.c:644
size_t stasis_topic_subscribers(const struct stasis_topic *topic)
Return the number of subscribers of a topic.
Definition: stasis.c:670
static void topic_pool_dtor(void *obj)
Definition: stasis.c:1771
void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
Publish a message to a topic's subscribers, synchronizing on the specified subscriber.
Definition: stasis.c:1543
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
Definition: stasis.c:1050
static void * stasis_config_alloc(void)
Definition: stasis.c:2280
static struct topic_pool_entry * topic_pool_entry_alloc(const char *topic_name)
Definition: stasis.c:1751
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
Definition: stasis.c:1104
static void topic_dtor(void *obj)
Definition: stasis.c:460
#define TOPIC_POOL_BUCKETS
Definition: stasis.c:331
static struct stasis_subscription_change * subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
Definition: stasis.c:1654
static void stasis_cleanup(void)
Cleanup function for graceful shutdowns.
Definition: stasis.c:3069
struct stasis_topic * stasis_topic_get(const char *name)
Get a topic of the given name.
Definition: stasis.c:649
int stasis_init(void)
Initialize the Stasis subsystem.
Definition: stasis.c:3088
static void proxy_dtor(void *weakproxy, void *container)
Definition: stasis.c:439
void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Stasis subscription callback function that does nothing.
Definition: stasis.c:836
struct stasis_subscription * __stasis_subscribe_pool(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription whose callbacks occur on a thread pool.
Definition: stasis.c:980
static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
Definition: stasis.c:1811
static void stasis_config_destructor(void *obj)
Definition: stasis.c:2272
void stasis_subscription_join(struct stasis_subscription *subscription)
Block until the last message is processed on a subscription.
Definition: stasis.c:1132
static AO2_GLOBAL_OBJ_STATIC(globals)
A global object container that will contain the stasis_config that gets swapped out on reloads.
int stasis_message_type_declined(const char *name)
Check whether a message type is declined.
Definition: stasis.c:2310
static struct ast_threadpool * threadpool
Definition: stasis.c:334
static int topic_pool_entry_hash(const void *obj, const int flags)
Definition: stasis.c:1790
CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,.files=ACO_FILES(&stasis_conf),)
Register information about the configs being processed by this module.
void stasis_log_bad_type_access(const char *name)
Definition: stasis.c:1967
static struct aco_type threadpool_option
Definition: stasis.c:2229
static void multi_object_blob_dtor(void *obj)
Definition: stasis.c:1986
static int link_topic_proxy(struct stasis_topic *topic, const char *name, const char *detail)
Definition: stasis.c:527
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Add a subscriber to a topic.
Definition: stasis.c:1228
static void subscription_dtor(void *obj)
Definition: stasis.c:741
struct stasis_topic * stasis_topic_create_with_detail(const char *name, const char *detail)
Create a new topic with given detail.
Definition: stasis.c:595
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
Definition: stasis.c:1201
const char * stasis_topic_detail(const struct stasis_topic *topic)
Return the detail of a topic.
Definition: stasis.c:662
struct stasis_subscription * stasis_unsubscribe_and_join(struct stasis_subscription *subscription)
Cancel a subscription, blocking until the last message is processed.
Definition: stasis.c:1161
static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition: stasis.c:1703
int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name)
Check if a topic exists in a pool.
Definition: stasis.c:1954
static void publish_msg(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sync_sub)
Definition: stasis.c:1457
struct aco_file stasis_conf
Definition: stasis.c:2250
static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition: stasis.c:1675
static void stasis_declined_config_destructor(void *obj)
Definition: stasis.c:2265
static struct aco_type * threadpool_options[]
Definition: stasis.c:2237
static unsigned int dispatch_message(struct stasis_subscription *sub, struct stasis_message *message, int synchronous)
Definition: stasis.c:1339
void stasis_subscription_accept_formatters(struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters)
Indicate to a subscription that we are interested in messages with one or more formatters.
Definition: stasis.c:1120
const char * stasis_subscription_uniqueid(const struct stasis_subscription *sub)
Get the unique ID for the subscription.
Definition: stasis.c:1196
struct stasis_topic_pool * stasis_topic_pool_create(struct stasis_topic *pooled_topic)
Create a topic pool that routes messages from dynamically generated topics to the given topic.
Definition: stasis.c:1860
static void subscription_invoke(struct stasis_subscription *sub, struct stasis_message *message)
Invoke the subscription's callback.
Definition: stasis.c:781
AO2_STRING_FIELD_CASE_SORT_FN(topic_proxy, name)
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition: stasis.c:1605
struct stasis_subscription * __stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription.
Definition: stasis.c:969
static void topic_pool_entry_dtor(void *obj)
Definition: stasis.c:1742
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
Definition: stasis.c:1538
static int dispatch_exec_async(struct ast_taskprocessor_local *local)
Definition: stasis.c:1286
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
Returns whether a subscription is currently subscribed.
Definition: stasis.c:1177
struct stasis_subscription * internal_stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func)
Create a subscription.
Definition: stasis.c:883
static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
Definition: stasis.c:2331
AO2_STRING_FIELD_CMP_FN(topic_proxy, name)
static int userevent_exclusion_cb(const char *key)
Definition: stasis.c:2174
struct aco_type * declined_options[]
Definition: stasis.c:2248
#define topic_lock_both(topic1, topic2)
Lock two topics.
Definition: stasis.c:452
struct ao2_container * topic_all
Definition: stasis.c:422
static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition: stasis.c:1256
static char * stasis_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
Definition: stasis.c:2361
#define TOPIC_ALL_BUCKETS
Definition: stasis.c:344
STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type)
static struct aco_type declined_option
An aco_type structure to link the "declined_message_types" category to the stasis_declined_config typ...
Definition: stasis.c:2240
static char * stasis_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
Definition: stasis.c:2444
Stasis Message Bus API. See Stasis Message Bus API for detailed documentation.
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
void(* stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Callback function type for Stasis subscriptions.
Definition: stasis.h:611
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
Definition: stasis.h:1515
stasis_subscription_message_filter
Stasis subscription message filters.
Definition: stasis.h:294
@ STASIS_SUBSCRIPTION_FILTER_SELECTIVE
Definition: stasis.h:297
@ STASIS_SUBSCRIPTION_FILTER_FORCED_NONE
Definition: stasis.h:296
@ STASIS_SUBSCRIPTION_FILTER_NONE
Definition: stasis.h:295
stasis_subscription_message_formatters
Stasis subscription formatter filters.
Definition: stasis.h:308
@ STASIS_SUBSCRIPTION_FORMATTER_NONE
Definition: stasis.h:309
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
Definition: stasis.h:1493
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
int stasis_cache_init(void)
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
int stasis_message_type_id(const struct stasis_message_type *type)
Gets the id of a given message type.
enum stasis_subscription_message_formatters stasis_message_type_available_formatters(const struct stasis_message_type *message_type)
Get a bitmap of available formatters for a message type.
const struct timeval * stasis_message_timestamp(const struct stasis_message *msg)
Get the time when a message was created.
struct ast_json * ast_bridge_snapshot_to_json(const struct ast_bridge_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_bridge_snapshot.
struct ast_json * ast_channel_snapshot_to_json(const struct ast_channel_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_channel_snapshot.
Endpoint abstractions.
struct ast_json * ast_endpoint_snapshot_to_json(const struct ast_endpoint_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_endpoint_snapshot.
Internal Stasis APIs.
static int message_type_id
int ast_str_append(struct ast_str **buf, ssize_t max_len, const char *fmt,...)
Append to a thread local dynamic string.
Definition: strings.h:1139
char * ast_str_buffer(const struct ast_str *buf)
Returns the string buffer within the ast_str buf.
Definition: strings.h:761
#define S_OR(a, b)
returns the equivalent of logic or for strings: first one if not empty, otherwise second one.
Definition: strings.h:80
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition: strings.h:65
#define ast_str_container_alloc(buckets)
Allocates a hash container for bare strings.
Definition: strings.h:1365
#define ast_str_create(init_len)
Create a malloc'ed dynamic length string.
Definition: strings.h:659
static force_inline int attribute_pure ast_str_case_hash(const char *str)
Compute a hash value on a case-insensitive string.
Definition: strings.h:1303
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
Definition: strings.h:425
void ast_str_container_remove(struct ao2_container *str_container, const char *remove)
Removes a string from a string container allocated by ast_str_container_alloc.
Definition: strings.c:221
int ast_str_container_add(struct ao2_container *str_container, const char *add)
Adds a string to a string container allocated by ast_str_container_alloc.
Definition: strings.c:205
The representation of a single configuration file to be processed.
const char * filename
Type information about a category-level configurable object.
enum aco_type_t type
Generic container type.
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
Definition: astobj2.h:1821
Structure representing a snapshot of channel state.
Main Channel structure associated with a channel.
descriptor for a cli entry.
Definition: cli.h:171
int args
This gets set in ast_cli_register()
Definition: cli.h:185
char * command
Definition: cli.h:186
const char * usage
Definition: cli.h:177
Abstract JSON element (object, array, string, int, ...).
Struct containing info for an AMI event to send out.
Definition: manager.h:503
A multi object blob data structure to carry user event stasis messages.
Definition: stasis.c:1977
struct ast_multi_object_blob::@398 snapshots[STASIS_UMOS_MAX]
struct ast_json * blob
Definition: stasis.c:1978
Structure for mutex and tracking information.
Definition: lock.h:139
Support for dynamic strings.
Definition: strings.h:623
Local data parameter.
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
int idle_timeout
Time limit in seconds for idle threads.
Definition: threadpool.h:79
int max_size
Maximum number of threads a pool may have.
Definition: threadpool.h:110
int auto_increment
Number of threads to increment pool by.
Definition: threadpool.h:90
int initial_size
Number of threads the pool will start with.
Definition: threadpool.h:100
An opaque threadpool structure.
Definition: threadpool.c:36
Structure for variables, used for configurations and for channel variables.
struct stasis_declined_config * declined_message_types
Definition: stasis.c:2226
struct stasis_threadpool_conf * threadpool_options
Definition: stasis.c:2224
A structure to hold global configuration-related options.
Definition: stasis.c:2207
struct ao2_container * declined
Definition: stasis.c:2209
Forwarding information.
Definition: stasis.c:1558
struct stasis_topic * from_topic
Definition: stasis.c:1560
struct stasis_topic * to_topic
Definition: stasis.c:1562
Structure containing callbacks for Stasis message sanitization.
Definition: stasis.h:200
Holds details about changes to subscriptions for the specified topic.
Definition: stasis.h:890
struct stasis_topic * topic
Definition: stasis.h:891
int final_message_rxed
Definition: stasis.c:723
struct stasis_topic * topic
Definition: stasis.c:711
ast_cond_t join_cond
Definition: stasis.c:720
stasis_subscription_cb callback
Definition: stasis.c:715
struct stasis_subscription::@397 accepted_message_types
struct ast_taskprocessor * mailbox
Definition: stasis.c:713
enum stasis_subscription_message_filter filter
Definition: stasis.c:733
int final_message_processed
Definition: stasis.c:726
enum stasis_subscription_message_formatters accepted_formatters
Definition: stasis.c:731
Threadpool configuration options.
Definition: stasis.c:2213
struct ao2_container * pool_container
Definition: stasis.c:1767
struct stasis_topic * pool_topic
Definition: stasis.c:1768
struct stasis_topic::@396 upstream_topics
char * name
Definition: stasis.c:413
int subscriber_id
Definition: stasis.c:410
struct stasis_topic::@395 subscribers
char * detail
Definition: stasis.c:416
struct timeval * creationtime
Definition: stasis.c:419
ast_cond_t cond
Definition: stasis.c:1303
void * task_data
Definition: stasis.c:1305
ast_mutex_t lock
Definition: stasis.c:1302
Definition: stasis.c:1736
struct stasis_topic * topic
Definition: stasis.c:1738
struct stasis_forward * forward
Definition: stasis.c:1737
char name[0]
Definition: stasis.c:1739
char buf[0]
Definition: stasis.c:432
struct timeval creationtime
Definition: stasis.c:430
char * name
Definition: stasis.c:427
char * detail
Definition: stasis.c:428
An API for managing task processing threads that can be shared across modules.
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
Sets the local data associated with a taskprocessor.
@ TPS_REF_DEFAULT
return a reference to a taskprocessor, create one if it does not exist
Definition: taskprocessor.h:76
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int(*task_exe)(struct ast_taskprocessor_local *local), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format,...)
Build a taskprocessor name with a sequence number on the end.
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.
#define AST_TASKPROCESSOR_MAX_NAME
Suggested maximum taskprocessor name length (less null terminator).
Definition: taskprocessor.h:61
static struct test_val a
void ast_threadpool_shutdown(struct ast_threadpool *pool)
Shut down a threadpool and destroy it.
Definition: threadpool.c:966
struct ast_threadpool * ast_threadpool_create(const char *name, struct ast_threadpool_listener *listener, const struct ast_threadpool_options *options)
Create a new threadpool.
Definition: threadpool.c:916
#define AST_THREADPOOL_OPTIONS_VERSION
Definition: threadpool.h:71
struct ast_taskprocessor * ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
Serialized execution of tasks within a ast_threadpool.
Definition: threadpool.c:1428
void ast_format_duration_hh_mm_ss(int duration, char *buf, size_t length)
Formats a duration into HH:MM:SS.
Definition: utils.c:2297
int64_t ast_tvdiff_ms(struct timeval end, struct timeval start)
Computes the difference (in milliseconds) between two struct timeval instances.
Definition: time.h:107
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:159
FILE * out
Definition: utils/frame.c:33
static void statistics(void)
Definition: utils/frame.c:287
Utility functions.
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:941
#define ast_assert(a)
Definition: utils.h:739
#define ARRAY_LEN(a)
Definition: utils.h:666
Universally unique identifier support.
Vector container support.
#define AST_VECTOR_REPLACE(vec, idx, elem)
Replace an element at a specific position in a vector, growing the vector if needed.
Definition: vector.h:284
#define AST_VECTOR_ELEM_CLEANUP_NOOP(elem)
Vector element cleanup that does nothing.
Definition: vector.h:571
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:609
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition: vector.h:174
#define AST_VECTOR_REMOVE_ELEM_UNORDERED(vec, elem, cleanup)
Remove an element from a vector.
Definition: vector.h:583
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256
#define AST_VECTOR(name, type)
Define a vector structure.
Definition: vector.h:44
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:680
#define AST_VECTOR_GET_ADDR(vec, idx)
Get an address of element in a vector.
Definition: vector.h:668