Asterisk - The Open Source Telephony Project  GIT-master-a24979a
stasis.c
Go to the documentation of this file.
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 
19 /*! \file
20  *
21  * \brief Stasis Message Bus API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25 
26 /*** MODULEINFO
27  <support_level>core</support_level>
28  ***/
29 
30 #include "asterisk.h"
31 
32 #include "asterisk/astobj2.h"
34 #include "asterisk/stasis.h"
35 #include "asterisk/taskprocessor.h"
36 #include "asterisk/threadpool.h"
37 #include "asterisk/utils.h"
38 #include "asterisk/uuid.h"
39 #include "asterisk/vector.h"
44 #include "asterisk/cli.h"
45 
46 /*** DOCUMENTATION
47  <managerEvent language="en_US" name="UserEvent">
48  <managerEventInstance class="EVENT_FLAG_USER">
49  <synopsis>A user defined event raised from the dialplan.</synopsis>
50  <syntax>
51  <channel_snapshot/>
52  <parameter name="UserEvent">
53  <para>The event name, as specified in the dialplan.</para>
54  </parameter>
55  </syntax>
56  <description>
57  <para>Event may contain additional arbitrary parameters in addition to optional bridge and endpoint snapshots. Multiple snapshots of the same type are prefixed with a numeric value.</para>
58  </description>
59  <see-also>
60  <ref type="application">UserEvent</ref>
61  <ref type="managerEvent">UserEvent</ref>
62  </see-also>
63  </managerEventInstance>
64  </managerEvent>
65  <configInfo name="stasis" language="en_US">
66  <configFile name="stasis.conf">
67  <configObject name="threadpool">
68  <synopsis>Settings that configure the threadpool Stasis uses to deliver some messages.</synopsis>
69  <configOption name="initial_size" default="5">
70  <synopsis>Initial number of threads in the message bus threadpool.</synopsis>
71  </configOption>
72  <configOption name="idle_timeout_sec" default="20">
73  <synopsis>Number of seconds before an idle thread is disposed of.</synopsis>
74  </configOption>
75  <configOption name="max_size" default="50">
76  <synopsis>Maximum number of threads in the threadpool.</synopsis>
77  </configOption>
78  </configObject>
79  <configObject name="declined_message_types">
80  <synopsis>Stasis message types for which to decline creation.</synopsis>
81  <configOption name="decline">
82  <synopsis>The message type to decline.</synopsis>
83  <description>
84  <para>This configuration option defines the name of the Stasis
85  message type that Asterisk is forbidden from creating and can be
86  specified as many times as necessary to achieve the desired result.</para>
87  <enumlist>
88  <enum name="stasis_app_recording_snapshot_type" />
89  <enum name="stasis_app_playback_snapshot_type" />
90  <enum name="stasis_test_message_type" />
91  <enum name="confbridge_start_type" />
92  <enum name="confbridge_end_type" />
93  <enum name="confbridge_join_type" />
94  <enum name="confbridge_leave_type" />
95  <enum name="confbridge_start_record_type" />
96  <enum name="confbridge_stop_record_type" />
97  <enum name="confbridge_mute_type" />
98  <enum name="confbridge_unmute_type" />
99  <enum name="confbridge_talking_type" />
100  <enum name="cel_generic_type" />
101  <enum name="ast_bridge_snapshot_type" />
102  <enum name="ast_bridge_merge_message_type" />
103  <enum name="ast_channel_entered_bridge_type" />
104  <enum name="ast_channel_left_bridge_type" />
105  <enum name="ast_blind_transfer_type" />
106  <enum name="ast_attended_transfer_type" />
107  <enum name="ast_endpoint_snapshot_type" />
108  <enum name="ast_endpoint_state_type" />
109  <enum name="ast_device_state_message_type" />
110  <enum name="ast_test_suite_message_type" />
111  <enum name="ast_mwi_state_type" />
112  <enum name="ast_mwi_vm_app_type" />
113  <enum name="ast_format_register_type" />
114  <enum name="ast_format_unregister_type" />
115  <enum name="ast_manager_get_generic_type" />
116  <enum name="ast_parked_call_type" />
117  <enum name="ast_channel_snapshot_type" />
118  <enum name="ast_channel_dial_type" />
119  <enum name="ast_channel_varset_type" />
120  <enum name="ast_channel_hangup_request_type" />
121  <enum name="ast_channel_dtmf_begin_type" />
122  <enum name="ast_channel_dtmf_end_type" />
123  <enum name="ast_channel_flash_type" />
124  <enum name="ast_channel_wink_type" />
125  <enum name="ast_channel_hold_type" />
126  <enum name="ast_channel_unhold_type" />
127  <enum name="ast_channel_chanspy_start_type" />
128  <enum name="ast_channel_chanspy_stop_type" />
129  <enum name="ast_channel_fax_type" />
130  <enum name="ast_channel_hangup_handler_type" />
131  <enum name="ast_channel_moh_start_type" />
132  <enum name="ast_channel_moh_stop_type" />
133  <enum name="ast_channel_monitor_start_type" />
134  <enum name="ast_channel_monitor_stop_type" />
135  <enum name="ast_channel_mixmonitor_start_type" />
136  <enum name="ast_channel_mixmonitor_stop_type" />
137  <enum name="ast_channel_mixmonitor_mute_type" />
138  <enum name="ast_channel_agent_login_type" />
139  <enum name="ast_channel_agent_logoff_type" />
140  <enum name="ast_channel_talking_start" />
141  <enum name="ast_channel_talking_stop" />
142  <enum name="ast_security_event_type" />
143  <enum name="ast_named_acl_change_type" />
144  <enum name="ast_local_bridge_type" />
145  <enum name="ast_local_optimization_begin_type" />
146  <enum name="ast_local_optimization_end_type" />
147  <enum name="stasis_subscription_change_type" />
148  <enum name="ast_multi_user_event_type" />
149  <enum name="stasis_cache_clear_type" />
150  <enum name="stasis_cache_update_type" />
151  <enum name="ast_network_change_type" />
152  <enum name="ast_system_registry_type" />
153  <enum name="ast_cc_available_type" />
154  <enum name="ast_cc_offertimerstart_type" />
155  <enum name="ast_cc_requested_type" />
156  <enum name="ast_cc_requestacknowledged_type" />
157  <enum name="ast_cc_callerstopmonitoring_type" />
158  <enum name="ast_cc_callerstartmonitoring_type" />
159  <enum name="ast_cc_callerrecalling_type" />
160  <enum name="ast_cc_recallcomplete_type" />
161  <enum name="ast_cc_failure_type" />
162  <enum name="ast_cc_monitorfailed_type" />
163  <enum name="ast_presence_state_message_type" />
164  <enum name="ast_rtp_rtcp_sent_type" />
165  <enum name="ast_rtp_rtcp_received_type" />
166  <enum name="ast_call_pickup_type" />
167  <enum name="aoc_s_type" />
168  <enum name="aoc_d_type" />
169  <enum name="aoc_e_type" />
170  <enum name="dahdichannel_type" />
171  <enum name="mcid_type" />
172  <enum name="session_timeout_type" />
173  <enum name="cdr_read_message_type" />
174  <enum name="cdr_write_message_type" />
175  <enum name="cdr_prop_write_message_type" />
176  <enum name="corosync_ping_message_type" />
177  <enum name="agi_exec_start_type" />
178  <enum name="agi_exec_end_type" />
179  <enum name="agi_async_start_type" />
180  <enum name="agi_async_exec_type" />
181  <enum name="agi_async_end_type" />
182  <enum name="queue_caller_join_type" />
183  <enum name="queue_caller_leave_type" />
184  <enum name="queue_caller_abandon_type" />
185  <enum name="queue_member_status_type" />
186  <enum name="queue_member_added_type" />
187  <enum name="queue_member_removed_type" />
188  <enum name="queue_member_pause_type" />
189  <enum name="queue_member_penalty_type" />
190  <enum name="queue_member_ringinuse_type" />
191  <enum name="queue_agent_called_type" />
192  <enum name="queue_agent_connect_type" />
193  <enum name="queue_agent_complete_type" />
194  <enum name="queue_agent_dump_type" />
195  <enum name="queue_agent_ringnoanswer_type" />
196  <enum name="meetme_join_type" />
197  <enum name="meetme_leave_type" />
198  <enum name="meetme_end_type" />
199  <enum name="meetme_mute_type" />
200  <enum name="meetme_talking_type" />
201  <enum name="meetme_talk_request_type" />
202  <enum name="appcdr_message_type" />
203  <enum name="forkcdr_message_type" />
204  <enum name="cdr_sync_message_type" />
205  </enumlist>
206  </description>
207  </configOption>
208  </configObject>
209  </configFile>
210  </configInfo>
211 ***/
212 
213 /*!
214  * \page stasis-impl Stasis Implementation Notes
215  *
216  * \par Reference counting
217  *
218  * Stasis introduces a number of objects, which are tightly related to one
219  * another. Because we rely on ref-counting for memory management, understanding
220  * these relationships is important to understanding this code.
221  *
222  * \code{.txt}
223  *
224  * stasis_topic <----> stasis_subscription
225  * ^ ^
226  * \ /
227  * \ /
228  * dispatch
229  * |
230  * |
231  * v
232  * stasis_message
233  * |
234  * |
235  * v
236  * stasis_message_type
237  *
238  * \endcode
239  *
240  * The most troubling thing in this chart is the cyclic reference between
241  * stasis_topic and stasis_subscription. This is both unfortunate, and
242  * necessary. Topics need the subscription in order to dispatch messages;
243  * subscriptions need the topics to unsubscribe and check subscription status.
244  *
245  * The cycle is broken by stasis_unsubscribe(). The unsubscribe will remove the
246  * topic's reference to a subscription. When the subscription is destroyed, it
247  * will remove its reference to the topic.
248  *
249  * This means that until a subscription has been explicitly unsubscribed, it will
250  * not be destroyed. Neither will a topic be destroyed while it has subscribers.
251  * The destructors of both have assertions regarding this to catch ref-counting
252  * problems where a subscription or topic has had an extra ao2_cleanup().
253  *
254  * The \ref dispatch_exec_sync object is a transient object, which is posted to
255  * a subscription's taskprocessor to send a message to the subscriber. They have
256  * short life cycles, allocated on one thread, destroyed on another.
257  *
258  * During shutdown, or the deletion of a domain object, there is a flurry of
259  * ao2_cleanup()s on subscriptions and topics, as the final in-flight messages
260  * are processed. Any one of these cleanups could be the one to actually destroy
261  * a given object, so care must be taken to ensure that an object isn't
262  * referenced after an ao2_cleanup(). This includes the implicit ao2_unlock()
263  * that might happen when a RAII_VAR() goes out of scope.
264  *
265  * \par Typical life cycles
266  *
267  * \li stasis_topic - There are several topics which live for the duration of
268  * the Asterisk process (ast_channel_topic_all(), etc.) but most of these
269  * are actually fed by shorter-lived topics whose lifetime is associated
270  * with some domain object (like ast_channel_topic() for a given
271  * ast_channel).
272  *
273  * \li stasis_subscription - Subscriptions have a similar mix of lifetimes as
274  * topics, for similar reasons.
275  *
276  * \li dispatch - Very short lived; just long enough to post a message to a
277  * subscriber.
278  *
279  * \li stasis_message - Short to intermediate lifetimes, but that is mostly
280  * irrelevant. Messages are strictly data and have no behavior associated
281  * with them, so it doesn't really matter if/when they are destroyed. By
282  * design, a component could hold a ref to a message forever without any
283  * ill consequences (aside from consuming more memory).
284  *
285  * \li stasis_message_type - Long life cycles, typically only destroyed on
286  * module unloading or _clean_ process exit.
287  *
288  * \par Subscriber shutdown sequencing
289  *
290  * Subscribers are sensitive to shutdown sequencing, specifically in how they
291  * reference message types. This is fully detailed on the wiki at
292  * https://wiki.asterisk.org/wiki/x/K4BqAQ.
293  *
294  * In short, the lifetime of the \a data (and \a callback, if in a module) must
295  * be held until the stasis_subscription_final_message() has been received.
296  * Depending on the structure of the subscriber code, this can be handled by
297  * using stasis_subscription_final_message() to free resources on the final
298  * message, or using stasis_subscription_join()/stasis_unsubscribe_and_join() to
299  * block until the unsubscribe has completed.
300  */
301 
302 /*! Initial size of the subscribers list. */
303 #define INITIAL_SUBSCRIBERS_MAX 4
304 
305 /*! The number of buckets to use for topic pools */
306 #define TOPIC_POOL_BUCKETS 57
307 
308 /*! Thread pool handed to ast_threadpool_serializer() when a subscription is
 * created with use_thread_pool set, i.e. for subscribers that do not get a
 * dedicated taskprocessor. */
309 static struct ast_threadpool *threadpool;
310 
312 
/*
 * Bucket count selected at compile time: a smaller value is used when
 * LOW_MEMORY is defined.
 * NOTE(review): presumably sizes the container that holds all topics; the
 * container definition is not visible in this listing - confirm.
 */
313 #if defined(LOW_MEMORY)
314 
315 #define TOPIC_ALL_BUCKETS 257
316 
317 #else
318 
319 #define TOPIC_ALL_BUCKETS 997
320 
321 #endif
322 
323 #ifdef AST_DEVMODE
324 
325 /*! The number of buckets to use for topic statistics */
326 #define TOPIC_STATISTICS_BUCKETS 57
327 
328 /*! The number of buckets to use for subscription statistics */
329 #define SUBSCRIPTION_STATISTICS_BUCKETS 57
330 
331 /*! Global container which stores statistics for topics */
332 static AO2_GLOBAL_OBJ_STATIC(topic_statistics);
333 
334 /*! Global container which stores statistics for subscriptions */
335 static AO2_GLOBAL_OBJ_STATIC(subscription_statistics);
336 
337 /*! \internal
 * \brief Per-message-type counters (only built when AST_DEVMODE is defined).
 */
338 struct stasis_message_type_statistics {
339  /*! \brief The number of messages of this type that were published */
340  int published;
341  /*! \brief The number of messages of this type that did not reach a subscriber */
342  int unused;
343  /*! \brief The stasis message type */
344  struct stasis_message_type *message_type;
345 };
346 
347 /*! Lock to protect the message types vector */
348 AST_MUTEX_DEFINE_STATIC(message_type_statistics_lock);
349 
350 /*! Vector containing message type information */
351 static AST_VECTOR(, struct stasis_message_type_statistics) message_type_statistics;
352 
353 /*! \internal
 * \brief Per-topic statistics record (only built when AST_DEVMODE is defined).
 *
 * Allocated by stasis_topic_statistics_create() with extra trailing space for
 * the topic name, and released through topic_statistics_destroy().
 */
354 struct stasis_topic_statistics {
355  /*! \brief Highest time spent dispatching messages to subscribers */
356  long highest_time_dispatched;
357  /*! \brief Lowest time spent dispatching messages to subscribers */
358  long lowest_time_dispatched;
359  /*! \brief The number of messages that were not dispatched to any subscriber */
360  int messages_not_dispatched;
361  /*! \brief The number of messages that were dispatched to at least 1 subscriber */
362  int messages_dispatched;
363  /*! \brief The ids of the subscribers to this topic (string container, unreffed by the destructor) */
364  struct ao2_container *subscribers;
365  /*! \brief Pointer to the topic (NOT refcounted, and must NOT be accessed) */
366  struct stasis_topic *topic;
367  /*! \brief Name of the topic; trailing storage sized as strlen(name) + 1 at allocation */
368  char name[0];
369 };
370 #endif
371 
372 /*! \internal
 * \brief A message bus topic.
 *
 * The name, detail and creationtime members are not owned by the topic: they
 * are set by link_topic_proxy() to point at storage inside the topic's
 * topic_proxy object.
 */
373 struct stasis_topic {
374  /*! Variable length array of the subscribers */
376 
377  /*! Topics forwarding into this topic */
379 
380 #ifdef AST_DEVMODE
 /*! Statistics record for this topic; unlinked from the global container and
  * unreffed in topic_dtor() */
381  struct stasis_topic_statistics *statistics;
382 #endif
383 
384  /*! Unique incrementing integer for subscriber ids */
386 
387  /*! Name of the topic (points into the topic proxy's buffer) */
388  char *name;
389 
390  /*! Detail of the topic (points into the topic proxy's buffer) */
391  char *detail;
392 
393  /*! Creation time (points at the creationtime stored in the topic proxy) */
394  struct timeval *creationtime;
395 };
396 
398 
/*!
 * \internal
 * \brief Weak-proxy object that carries a topic's identifying strings.
 *
 * Allocated by link_topic_proxy() with ao2_t_weakproxy_alloc(), with enough
 * trailing space in \c buf for the name and the detail string, each with its
 * terminating NUL.
 */
399 struct topic_proxy {
401 
 /*! Topic name; points at the start of buf */
402  char *name;
 /*! Topic detail; points into buf just past the name's terminating NUL */
403  char *detail;
404 
 /*! Time at which the proxy was set up (ast_tvnow() in link_topic_proxy()) */
405  struct timeval creationtime;
406 
 /*! Trailing storage holding the name followed by the detail string */
407  char buf[0];
408 };
409 
413 
/*!
 * \internal
 * \brief Callback that removes a weak proxy from a container.
 *
 * \param weakproxy The proxy object to unlink.
 * \param container The container the proxy is linked into.
 *
 * NOTE(review): presumably registered as a weak-proxy notification callback;
 * the registration site is not visible in this listing - confirm.
 */
414 static void proxy_dtor(void *weakproxy, void *container)
415 {
416  ao2_unlink(container, weakproxy);
418 }
419 
420 /* Forward declarations for the tightly-coupled subscription object */
421 static int topic_add_subscription(struct stasis_topic *topic,
422  struct stasis_subscription *sub);
423 
424 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
425 
426 /*!
 * \brief Lock two topics.
 *
 * Takes the lock on \a topic1, then repeatedly tries to take the lock on
 * \a topic2. Each time the try-lock fails, AO2_DEADLOCK_AVOIDANCE() is
 * invoked on \a topic1 before retrying. When the macro completes, both
 * locks are held.
 *
 * \note Both arguments are expanded more than once, so they must not have
 * side effects.
 */
427 #define topic_lock_both(topic1, topic2) \
428  do { \
429  ao2_lock(topic1); \
430  while (ao2_trylock(topic2)) { \
431  AO2_DEADLOCK_AVOIDANCE(topic1); \
432  } \
433  } while (0)
434 
/*!
 * \internal
 * \brief ao2 destructor for a stasis_topic.
 *
 * Logs the destruction, asserts that no subscribers remain, and frees the
 * subscribers vector. In AST_DEVMODE builds it also unlinks the topic's
 * statistics record from the global topic_statistics container (when that
 * container still exists) and drops the topic's reference to the record.
 *
 * \param obj The stasis_topic being destroyed.
 */
435 static void topic_dtor(void *obj)
436 {
437  struct stasis_topic *topic = obj;
438 #ifdef AST_DEVMODE
439  struct ao2_container *topic_stats;
440 #endif
441 
442  ast_debug(2, "Destroying topic. name: %s, detail: %s\n",
443  topic->name, topic->detail);
444 
445  /* Subscribers hold a reference to topics, so they should all be
446  * unsubscribed before we get here. */
447  ast_assert(AST_VECTOR_SIZE(&topic->subscribers) == 0);
448 
449  AST_VECTOR_FREE(&topic->subscribers);
451  ast_debug(1, "Topic '%s': %p destroyed\n", topic->name, topic);
452 
453 #ifdef AST_DEVMODE
 /* statistics is NULL when the topic is destroyed before its statistics
  * record was created (e.g. an early failure during topic creation). */
454  if (topic->statistics) {
455  topic_stats = ao2_global_obj_ref(topic_statistics);
456  if (topic_stats) {
457  ao2_unlink(topic_stats, topic->statistics);
458  ao2_ref(topic_stats, -1);
459  }
460  ao2_ref(topic->statistics, -1);
461  }
462 #endif
463 }
464 
465 #ifdef AST_DEVMODE
466 static void topic_statistics_destroy(void *obj)
467 {
468  struct stasis_topic_statistics *statistics = obj;
469 
470  ao2_cleanup(statistics->subscribers);
471 }
472 
473 static struct stasis_topic_statistics *stasis_topic_statistics_create(struct stasis_topic *topic)
474 {
475  struct stasis_topic_statistics *statistics;
476  RAII_VAR(struct ao2_container *, topic_stats, ao2_global_obj_ref(topic_statistics), ao2_cleanup);
477 
478  if (!topic_stats) {
479  return NULL;
480  }
481 
482  statistics = ao2_alloc(sizeof(*statistics) + strlen(topic->name) + 1, topic_statistics_destroy);
483  if (!statistics) {
484  return NULL;
485  }
486 
487  statistics->subscribers = ast_str_container_alloc(1);
488  if (!statistics->subscribers) {
489  ao2_ref(statistics, -1);
490  return NULL;
491  }
492 
493  /* This is strictly used for the pointer address when showing the topic */
494  statistics->topic = topic;
495  strcpy(statistics->name, topic->name); /* SAFE */
496  ao2_link(topic_stats, statistics);
497 
498  return statistics;
499 }
500 #endif
501 
/*!
 * \internal
 * \brief Create the weak proxy for a topic and attach the topic to it.
 *
 * Refuses to proceed when a topic with the same name can already be found
 * through stasis_topic_get(). Otherwise a topic_proxy is allocated with
 * trailing space for copies of \a name and \a detail, the topic is set as the
 * proxy's object, and the topic's name, detail and creationtime members are
 * pointed at the proxy's storage.
 *
 * \param topic Topic to attach; must not be NULL.
 * \param name Topic name; must be a non-empty string.
 * \param detail Topic detail; must not be NULL (may be empty).
 *
 * \retval 0 on success.
 * \retval -1 on invalid arguments, duplicate name, or allocation/link failure.
 *
 * NOTE(review): several statements of this function (apparently the locking
 * and the container link) are not visible in this listing; the comments below
 * only describe the visible statements.
 */
502 static int link_topic_proxy(struct stasis_topic *topic, const char *name, const char *detail)
503 {
504  struct topic_proxy *proxy;
505  struct stasis_topic* topic_tmp;
506  size_t detail_len;
507 
508  if (!topic || !name || !strlen(name) || !detail) {
509  return -1;
510  }
511 
513 
 /* A successful lookup returns a reference, which must be released. */
514  topic_tmp = stasis_topic_get(name);
515  if (topic_tmp) {
516  ast_log(LOG_ERROR, "The same topic is already exist. name: %s\n", name);
517  ao2_ref(topic_tmp, -1);
519 
520  return -1;
521  }
522 
523  detail_len = strlen(detail) + 1;
524 
 /* One allocation holds the proxy plus both NUL-terminated strings. */
525  proxy = ao2_t_weakproxy_alloc(
526  sizeof(*proxy) + strlen(name) + 1 + detail_len, NULL, name);
527  if (!proxy) {
529 
530  return -1;
531  }
532 
533  /* Lay out the strings in buf: name first, detail right after its NUL. */
534  proxy->name = proxy->buf;
535  proxy->detail = proxy->name + strlen(name) + 1;
536 
537  strcpy(proxy->name, name); /* SAFE */
538  ast_copy_string(proxy->detail, detail, detail_len); /* SAFE */
539  proxy->creationtime = ast_tvnow();
540 
541  /* We have exclusive access to proxy, no need for locking here. */
542  if (ao2_t_weakproxy_set_object(proxy, topic, OBJ_NOLOCK, "weakproxy link")) {
543  ao2_cleanup(proxy);
545 
546  return -1;
547  }
548 
550  ao2_cleanup(proxy);
553 
554  return -1;
555  }
556 
557  /* Point the topic's fields at the storage owned by the proxy. */
558  topic->name = proxy->name;
559  topic->detail = proxy->detail;
560  topic->creationtime = &(proxy->creationtime);
561 
 /* Drop the allocation reference held by this function. */
563  ao2_ref(proxy, -1);
564 
566 
567  return 0;
568 }
569 
571  const char *name, const char* detail
572  )
573 {
 /*
  * Returns the existing topic when one with this name can already be found
  * through stasis_topic_get(); otherwise allocates a new topic, initializes
  * its vectors, links it to a proxy via link_topic_proxy() and, in
  * AST_DEVMODE builds, creates its statistics record. Returns NULL when
  * name is NULL/empty, detail is NULL, or any step fails.
  */
574  struct stasis_topic *topic;
575  int res = 0;
576 
577  if (!name|| !strlen(name) || !detail) {
578  return NULL;
579  }
580  ast_debug(2, "Creating topic. name: %s, detail: %s\n", name, detail);
581 
 /* The reference obtained by the lookup is handed to the caller. */
582  topic = stasis_topic_get(name);
583  if (topic) {
584  ast_debug(2, "Topic is already exist. name: %s, detail: %s\n",
585  name, detail);
586  return topic;
587  }
588 
589  topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
590  if (!topic) {
591  return NULL;
592  }
593 
 /* res accumulates the results of the vector initializations. */
595  res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
596  if (res) {
597  ao2_ref(topic, -1);
598  return NULL;
599  }
600 
601  /* link to the proxy */
602  if (link_topic_proxy(topic, name, detail)) {
603  ao2_ref(topic, -1);
604  return NULL;
605  }
606 
607 #ifdef AST_DEVMODE
608  topic->statistics = stasis_topic_statistics_create(topic);
609  if (!topic->statistics) {
610  ao2_ref(topic, -1);
611  return NULL;
612  }
613 #endif
614  ast_debug(1, "Topic '%s': %p created\n", topic->name, topic);
615 
616  return topic;
617 }
618 
620 {
622 }
623 
624 struct stasis_topic *stasis_topic_get(const char *name)
625 {
627 }
628 
629 const char *stasis_topic_name(const struct stasis_topic *topic)
630 {
631  if (!topic) {
632  return NULL;
633  }
634  return topic->name;
635 }
636 
637 const char *stasis_topic_detail(const struct stasis_topic *topic)
638 {
639  if (!topic) {
640  return NULL;
641  }
642  return topic->detail;
643 }
644 
645 size_t stasis_topic_subscribers(const struct stasis_topic *topic)
646 {
647  return AST_VECTOR_SIZE(&topic->subscribers);
648 }
649 
650 #ifdef AST_DEVMODE
/*!
 * \internal
 * \brief Per-subscription statistics record (AST_DEVMODE builds only).
 *
 * Allocated by stasis_subscription_statistics_create() with trailing space
 * for a copy of the subscription's unique id.
 */
651 struct stasis_subscription_statistics {
652  /*! \brief The filename where the subscription originates (pointer stored as passed, not copied) */
653  const char *file;
654  /*! \brief The function where the subscription originates (pointer stored as passed, not copied) */
655  const char *func;
656  /*! \brief Names of the topics we are subscribed to */
657  struct ao2_container *topics;
658  /*! \brief The message type that currently took the longest to process */
659  struct stasis_message_type *highest_time_message_type;
660  /*! \brief Highest time spent invoking a message */
661  long highest_time_invoked;
662  /*! \brief Lowest time spent invoking a message */
663  long lowest_time_invoked;
664  /*! \brief The number of messages that were filtered out */
665  int messages_dropped;
666  /*! \brief The number of messages that passed filtering */
667  int messages_passed;
668  /*! \brief Using a mailbox to queue messages */
669  int uses_mailbox;
670  /*! \brief Using stasis threadpool for handling messages */
671  int uses_threadpool;
672  /*! \brief The line number where the subscription originates */
673  int lineno;
674  /*! \brief Pointer to the subscription (NOT refcounted, and must NOT be accessed) */
675  struct stasis_subscription *sub;
676  /*! \brief Unique ID of the subscription; trailing storage sized at allocation */
677  char uniqueid[0];
678 };
679 #endif
680 
681 /*! \internal */
683  /*! Unique ID for this subscription (heap string built with ast_asprintf(),
  * released with ast_free() in subscription_dtor()) */
684  char *uniqueid;
685  /*! Topic subscribed to. */
687  /*! Mailbox for processing incoming messages. */
689  /*! Callback function for incoming message processing. */
691  /*! Data pointer to be handed to the callback. */
692  void *data;
693 
694  /*! Condition for joining with subscription. */
696  /*! Flag set when final message for sub has been received.
697  * Be sure the subscription's ao2 lock is held before reading/setting. */
699  /*! Flag set when final message for sub has been processed.
700  * Be sure the subscription's ao2 lock is held before reading/setting. */
702 
703  /*! The message types this subscription is accepting */
705  /*! The message formatters this subscription is accepting */
707  /*! The message filter currently in use */
709 
710 #ifdef AST_DEVMODE
711  /*! Statistics information (unlinked and unreffed in subscription_dtor()) */
712  struct stasis_subscription_statistics *statistics;
713 #endif
714 };
715 
/*!
 * \internal
 * \brief ao2 destructor for a stasis_subscription.
 *
 * Frees the unique id, releases the reference on the topic, destroys the
 * join condition and frees the accepted-message-types vector. In AST_DEVMODE
 * builds it also unlinks the statistics record from the global
 * subscription_statistics container (when that still exists) and unrefs it.
 *
 * \param obj The stasis_subscription being destroyed.
 */
716 static void subscription_dtor(void *obj)
717 {
718  struct stasis_subscription *sub = obj;
719 #ifdef AST_DEVMODE
720  struct ao2_container *subscription_stats;
721 #endif
722 
723  /* Subscriptions need to be manually unsubscribed before destruction
724  * b/c there's a cyclic reference between topics and subscriptions */
726  /* If there are any messages in flight to this subscription; that would
727  * be bad. */
729 
730  ast_free(sub->uniqueid);
731  ao2_cleanup(sub->topic);
732  sub->topic = NULL;
734  sub->mailbox = NULL;
 /* NOTE(review): join_cond is initialized fairly late during subscription
  * setup; confirm this destroy is safe on the early-failure paths. */
735  ast_cond_destroy(&sub->join_cond);
736 
737  AST_VECTOR_FREE(&sub->accepted_message_types);
738 
739 #ifdef AST_DEVMODE
740  if (sub->statistics) {
741  subscription_stats = ao2_global_obj_ref(subscription_statistics);
742  if (subscription_stats) {
743  ao2_unlink(subscription_stats, sub->statistics);
744  ao2_ref(subscription_stats, -1);
745  }
746  ao2_ref(sub->statistics, -1);
747  }
748 #endif
749 }
750 
751 /*!
752  * \brief Invoke the subscription's callback.
 *
 * For a final message, the received flag is set (and joiners signalled)
 * before the callback runs, and the processed flag is set (and joiners
 * signalled) after it returns. In AST_DEVMODE builds the elapsed time of the
 * whole invocation is folded into the subscription's statistics.
 *
753  * \param sub Subscription to invoke.
754  * \param message Message to send.
755  */
757  struct stasis_message *message)
758 {
759  unsigned int final = stasis_subscription_final_message(sub, message);
761 #ifdef AST_DEVMODE
762  struct timeval start;
763  long elapsed;
764 
765  start = ast_tvnow();
766 #endif
767 
768  /* Notify that the final message has been received */
769  if (final) {
770  ao2_lock(sub);
771  sub->final_message_rxed = 1;
772  ast_cond_signal(&sub->join_cond);
773  ao2_unlock(sub);
774  }
775 
776  /*
777  * If filtering is turned on and this is a 'final' message, we only invoke the callback
778  * if the subscriber accepts subscription_change message types.
779  */
780  if (!final || sub->filter != STASIS_SUBSCRIPTION_FILTER_SELECTIVE ||
781  (message_type_id < AST_VECTOR_SIZE(&sub->accepted_message_types) && AST_VECTOR_GET(&sub->accepted_message_types, message_type_id))) {
782  /* Since sub is mostly immutable, no need to lock sub */
783  sub->callback(sub->data, sub, message);
784  }
785 
786  /* Notify that the final message has been processed */
787  if (final) {
788  ao2_lock(sub);
789  sub->final_message_processed = 1;
790  ast_cond_signal(&sub->join_cond);
791  ao2_unlock(sub);
792  }
793 
794 #ifdef AST_DEVMODE
795  elapsed = ast_tvdiff_ms(ast_tvnow(), start);
 /* NOTE(review): highest_time_invoked is compared and written outside the
  * statistics lock; only the message type update is locked. */
796  if (elapsed > sub->statistics->highest_time_invoked) {
797  sub->statistics->highest_time_invoked = elapsed;
798  ao2_lock(sub->statistics);
799  sub->statistics->highest_time_message_type = stasis_message_type(message);
800  ao2_unlock(sub->statistics);
801  }
 /* NOTE(review): if lowest_time_invoked starts out as 0 this branch can
  * never fire for a non-negative elapsed time - confirm the initial value. */
802  if (elapsed < sub->statistics->lowest_time_invoked) {
803  sub->statistics->lowest_time_invoked = elapsed;
804  }
805 #endif
806 }
807 
808 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
809 static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
810 
812 {
813 }
814 
815 #ifdef AST_DEVMODE
816 static void subscription_statistics_destroy(void *obj)
817 {
818  struct stasis_subscription_statistics *statistics = obj;
819 
820  ao2_cleanup(statistics->topics);
821 }
822 
823 static struct stasis_subscription_statistics *stasis_subscription_statistics_create(struct stasis_subscription *sub,
824  int needs_mailbox, int use_thread_pool, const char *file, int lineno,
825  const char *func)
826 {
827  struct stasis_subscription_statistics *statistics;
828  RAII_VAR(struct ao2_container *, subscription_stats, ao2_global_obj_ref(subscription_statistics), ao2_cleanup);
829 
830  if (!subscription_stats) {
831  return NULL;
832  }
833 
834  statistics = ao2_alloc(sizeof(*statistics) + strlen(sub->uniqueid) + 1, subscription_statistics_destroy);
835  if (!statistics) {
836  return NULL;
837  }
838 
839  statistics->topics = ast_str_container_alloc(1);
840  if (!statistics->topics) {
841  ao2_ref(statistics, -1);
842  return NULL;
843  }
844 
845  statistics->file = file;
846  statistics->lineno = lineno;
847  statistics->func = func;
848  statistics->uses_mailbox = needs_mailbox;
849  statistics->uses_threadpool = use_thread_pool;
850  strcpy(statistics->uniqueid, sub->uniqueid); /* SAFE */
851  statistics->sub = sub;
852  ao2_link(subscription_stats, statistics);
853 
854  return statistics;
855 }
856 #endif
857 
859  struct stasis_topic *topic,
861  void *data,
862  int needs_mailbox,
863  int use_thread_pool,
864  const char *file,
865  int lineno,
866  const char *func)
867 {
 /*
  * Creates a subscription on topic: builds its unique id, optionally gives
  * it a mailbox (a threadpool serializer when use_thread_pool is set,
  * otherwise a taskprocessor obtained by name), takes a reference on the
  * topic, and registers the subscription with the topic. Returns the
  * subscription, or NULL if topic is NULL or any step fails.
  */
868  struct stasis_subscription *sub;
869  int ret;
870 
871  if (!topic) {
872  return NULL;
873  }
874 
875  /* The ao2 lock is used for join_cond. */
877  if (!sub) {
878  return NULL;
879  }
880 
881 #ifdef AST_DEVMODE
882  ret = ast_asprintf(&sub->uniqueid, "%s:%s-%d", file, stasis_topic_name(topic), ast_atomic_fetchadd_int(&topic->subscriber_id, +1));
 /* NOTE(review): the statistics record is created before ret is checked,
  * so it can be asked to copy a unique id that failed to build. */
883  sub->statistics = stasis_subscription_statistics_create(sub, needs_mailbox, use_thread_pool, file, lineno, func);
884  if (ret < 0 || !sub->statistics) {
885  ao2_ref(sub, -1);
886  return NULL;
887  }
888 #else
890  if (ret < 0) {
891  ao2_ref(sub, -1);
892  return NULL;
893  }
894 #endif
895 
896  if (needs_mailbox) {
897  char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
898 
899  /* Create name with seq number appended. */
900  ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "stasis/%c:%s",
901  use_thread_pool ? 'p' : 'm',
903 
904  /*
905  * With a small number of subscribers, a thread-per-sub is
906  * acceptable. For a large number of subscribers, a thread
907  * pool should be used.
908  */
909  if (use_thread_pool) {
910  sub->mailbox = ast_threadpool_serializer(tps_name, threadpool);
911  } else {
912  sub->mailbox = ast_taskprocessor_get(tps_name, TPS_REF_DEFAULT);
913  }
914  if (!sub->mailbox) {
915  ao2_ref(sub, -1);
916 
917  return NULL;
918  }
920  /* Taskprocessor has a reference */
921  ao2_ref(sub, +1);
922  }
923 
924  ao2_ref(topic, +1);
925  sub->topic = topic;
926  sub->callback = callback;
927  sub->data = data;
928  ast_cond_init(&sub->join_cond, NULL);
930  AST_VECTOR_INIT(&sub->accepted_message_types, 0);
931  sub->accepted_formatters = STASIS_SUBSCRIPTION_FORMATTER_NONE;
932 
 /* NOTE(review): on failure sub->topic is still set, and subscription_dtor()
  * also unrefs sub->topic. Together with the explicit unref of topic below
  * this looks like a double unref when the sub is destroyed here, and when a
  * mailbox exists the extra mailbox reference on sub is never dropped.
  * Confirm against the full source. */
933  if (topic_add_subscription(topic, sub) != 0) {
934  ao2_ref(sub, -1);
935  ao2_ref(topic, -1);
936 
937  return NULL;
938  }
940 
941  return sub;
942 }
943 
945  struct stasis_topic *topic,
947  void *data,
948  const char *file,
949  int lineno,
950  const char *func)
951 {
 /* Subscribe with a mailbox (needs_mailbox = 1) and without the shared
  * thread pool (use_thread_pool = 0). */
952  return internal_stasis_subscribe(topic, callback, data, 1, 0, file, lineno, func);
953 }
954 
956  struct stasis_topic *topic,
958  void *data,
959  const char *file,
960  int lineno,
961  const char *func)
962 {
 /* Subscribe with a mailbox (needs_mailbox = 1) that is serviced by the
  * shared thread pool (use_thread_pool = 1). */
963  return internal_stasis_subscribe(topic, callback, data, 1, 1, file, lineno, func);
964 }
965 
/*!
 * \internal
 * \brief Task callback that drops one reference on a subscription.
 *
 * \param data The stasis_subscription whose reference is released.
 *
 * \retval 0 always.
 */
static int sub_cleanup(void *data)
{
	ao2_cleanup(data);

	return 0;
}
972 
974 {
 /*
  * Removes sub from its topic, queues sub_cleanup() on the mailbox (when
  * there is one) to drop the mailbox's reference, and then drops one more
  * reference on sub. Always returns NULL; a NULL sub is a no-op.
  */
975  /* The subscription may be the last ref to this topic. Hold
976  * the topic ref open until after the unlock. */
977  struct stasis_topic *topic;
978 
979  if (!sub) {
980  return NULL;
981  }
982 
983  topic = ao2_bump(sub->topic);
984 
985  /* We have to remove the subscription first, to ensure the unsubscribe
986  * is the final message */
987  if (topic_remove_subscription(sub->topic, sub) != 0) {
989  "Internal error: subscription has invalid topic\n");
990  ao2_cleanup(topic);
991 
992  return NULL;
993  }
994 
995  /* Now let everyone know about the unsubscribe */
997 
998  /* When all that's done, remove the ref the mailbox has on the sub */
999  if (sub->mailbox) {
1000  if (ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub)) {
1001  /* Nothing we can do here, the conditional is just to keep
1002  * the compiler happy that we're not ignoring the result. */
1003  }
1004  }
1005 
1006  /* Unsubscribing unrefs the subscription */
1007  ao2_cleanup(sub);
 /* Release the temporary topic reference taken with ao2_bump() above. */
1008  ao2_cleanup(topic);
1009 
1010  return NULL;
1011 }
1012 
1014  long low_water, long high_water)
1015 {
 /*
  * Forwards the water marks to ast_taskprocessor_alert_set_levels() on the
  * subscription's mailbox and returns its result; returns -1 when
  * subscription is NULL.
  * NOTE(review): subscription->mailbox is passed on without a NULL check;
  * confirm the callee tolerates a subscription that has no mailbox.
  */
1016  int res = -1;
1017 
1018  if (subscription) {
1019  res = ast_taskprocessor_alert_set_levels(subscription->mailbox,
1020  low_water, high_water);
1021  }
1022  return res;
1023 }
1024 
1026  const struct stasis_message_type *type)
1027 {
 /*
  * Returns -1 when subscription is NULL and 0 on every other visible path.
  * The subscription's state is modified while the lock of its topic (not
  * of the subscription itself) is held.
  * NOTE(review): the statements that record the type and that force all
  * messages through are not visible in this listing.
  */
1028  if (!subscription) {
1029  return -1;
1030  }
1031 
1032  ast_assert(type != NULL);
1034 
 /* The assert above only fires in builds where ast_assert() is active, so
  * a NULL or unnamed type is also handled at runtime here. */
1035  if (!type || !stasis_message_type_name(type)) {
1036  /* Filtering is unreliable as this message type is not yet initialized
1037  * so force all messages through.
1038  */
1040  return 0;
1041  }
1042 
1043  ao2_lock(subscription->topic);
1045  /* We do this for the same reason as above. The subscription can still operate, so allow
1046  * it to do so by forcing all messages through.
1047  */
1049  }
1050  ao2_unlock(subscription->topic);
1051 
1052  return 0;
1053 }
1054 
1056  const struct stasis_message_type *type)
1057 {
1058  if (!subscription) {
1059  return -1;
1060  }
1061 
1062  ast_assert(type != NULL);
1064 
1065  if (!type || !stasis_message_type_name(type)) {
1066  return 0;
1067  }
1068 
1069  ao2_lock(subscription->topic);
1071  /* The memory is already allocated so this can't fail */
1073  }
1074  ao2_unlock(subscription->topic);
1075 
1076  return 0;
1077 }
1078 
1081 {
1082  if (!subscription) {
1083  return -1;
1084  }
1085 
1086  ao2_lock(subscription->topic);
1087  if (subscription->filter != STASIS_SUBSCRIPTION_FILTER_FORCED_NONE) {
1088  subscription->filter = filter;
1089  }
1090  ao2_unlock(subscription->topic);
1091 
1092  return 0;
1093 }
1094 
1097 {
1098  ast_assert(subscription != NULL);
1099 
1100  ao2_lock(subscription->topic);
1101  subscription->accepted_formatters = formatters;
1102  ao2_unlock(subscription->topic);
1103 
1104  return;
1105 }
1106 
1108 {
1109  if (subscription) {
1110  ao2_lock(subscription);
1111  /* Wait until the processed flag has been set */
1112  while (!subscription->final_message_processed) {
1113  ast_cond_wait(&subscription->join_cond,
1114  ao2_object_get_lockaddr(subscription));
1115  }
1116  ao2_unlock(subscription);
1117  }
1118 }
1119 
1121 {
1122  if (subscription) {
1123  int ret;
1124 
1125  ao2_lock(subscription);
1126  ret = subscription->final_message_rxed;
1127  ao2_unlock(subscription);
1128 
1129  return ret;
1130  }
1131 
1132  /* Null subscription is about as done as you can get */
1133  return 1;
1134 }
1135 
1137  struct stasis_subscription *subscription)
1138 {
1139  if (!subscription) {
1140  return NULL;
1141  }
1142 
1143  /* Bump refcount to hold it past the unsubscribe */
1144  ao2_ref(subscription, +1);
1145  stasis_unsubscribe(subscription);
1146  stasis_subscription_join(subscription);
1147  /* Now decrement the refcount back */
1148  ao2_cleanup(subscription);
1149  return NULL;
1150 }
1151 
1153 {
1154  if (sub) {
1155  size_t i;
1156  struct stasis_topic *topic = sub->topic;
1157 
1158  ao2_lock(topic);
1159  for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
1160  if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
1161  ao2_unlock(topic);
1162  return 1;
1163  }
1164  }
1165  ao2_unlock(topic);
1166  }
1167 
1168  return 0;
1169 }
1170 
1172 {
1173  return sub->uniqueid;
1174 }
1175 
1177 {
1178  struct stasis_subscription_change *change;
1179 
1181  return 0;
1182  }
1183 
1184  change = stasis_message_data(msg);
1185  if (strcmp("Unsubscribe", change->description)) {
1186  return 0;
1187  }
1188 
1189  if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
1190  return 0;
1191  }
1192 
1193  return 1;
1194 }
1195 
1196 /*!
1197  * \brief Add a subscriber to a topic.
1198  * \param topic Topic
1199  * \param sub Subscriber
1200  * \return 0 on success
1201  * \return Non-zero on error
1202  */
1204 {
1205  size_t idx;
1206 
1207  ao2_lock(topic);
1208  /* The reference from the topic to the subscription is shared with
1209  * the owner of the subscription, which will explicitly unsubscribe
1210  * to release it.
1211  *
1212  * If we bumped the refcount here, the owner would have to unsubscribe
1213  * and cleanup, which is a bit awkward. */
1215 
1216  for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
1219  }
1220 
1221 #ifdef AST_DEVMODE
1223  ast_str_container_add(sub->statistics->topics, stasis_topic_name(topic));
1224 #endif
1225 
1226  ao2_unlock(topic);
1227 
1228  return 0;
1229 }
1230 
1232 {
1233  size_t idx;
1234  int res;
1235 
1236  ao2_lock(topic);
1237  for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
1240  }
1243 
1244 #ifdef AST_DEVMODE
1245  if (!res) {
1247  ast_str_container_remove(sub->statistics->topics, stasis_topic_name(topic));
1248  }
1249 #endif
1250 
1251  ao2_unlock(topic);
1252 
1253  return res;
1254 }
1255 
1256 /*!
1257  * \internal \brief Dispatch a message to a subscriber asynchronously
1258  * \param local \ref ast_taskprocessor_local object
1259  * \return 0
1260  */
1262 {
1263  struct stasis_subscription *sub = local->local_data;
1264  struct stasis_message *message = local->data;
1265 
1268 
1269  return 0;
1270 }
1271 
1272 /*!
1273  * \internal \brief Data passed to \ref dispatch_exec_sync to synchronize
1274  * a published message to a subscriber
1275  */
1280  void *task_data;
1281 };
1282 
1283 /*!
1284  * \internal \brief Dispatch a message to a subscriber synchronously
1285  * \param local \ref ast_taskprocessor_local object
1286  * \return 0
1287  */
1289 {
1290  struct stasis_subscription *sub = local->local_data;
1291  struct sync_task_data *std = local->data;
1292  struct stasis_message *message = std->task_data;
1293 
1296 
1297  ast_mutex_lock(&std->lock);
1298  std->complete = 1;
1299  ast_cond_signal(&std->cond);
1300  ast_mutex_unlock(&std->lock);
1301 
1302  return 0;
1303 }
1304 
1305 /*!
1306  * \internal \brief Dispatch a message to a subscriber
1307  * \param sub The subscriber to dispatch to
1308  * \param message The message to send
1309  * \param synchronous If non-zero, synchronize on the subscriber receiving
1310  * the message
1311  * \retval 0 if message was not dispatched
1312  * \retval 1 if message was dispatched
1313  */
1314 static unsigned int dispatch_message(struct stasis_subscription *sub,
1315  struct stasis_message *message,
1316  int synchronous)
1317 {
1319 
1320  /*
1321  * The 'do while' gives us an easy way to skip remaining logic once
1322  * we determine the message should be accepted.
1323  * The code looks more verbose than it needs to be but it optimizes
1324  * down very nicely. It's just easier to understand and debug this way.
1325  */
1326  do {
1327  struct stasis_message_type *message_type = stasis_message_type(message);
1328  int type_id = stasis_message_type_id(message_type);
1329  int type_filter_specified = 0;
1330  int formatter_filter_specified = 0;
1331  int type_filter_passed = 0;
1332  int formatter_filter_passed = 0;
1333 
1334  /* We always accept final messages so only run the filter logic if not final */
1335  if (is_final) {
1336  break;
1337  }
1338 
1339  type_filter_specified = sub->filter & STASIS_SUBSCRIPTION_FILTER_SELECTIVE;
1340  formatter_filter_specified = sub->accepted_formatters != STASIS_SUBSCRIPTION_FORMATTER_NONE;
1341 
1342  /* Accept if no filters of either type were specified */
1343  if (!type_filter_specified && !formatter_filter_specified) {
1344  break;
1345  }
1346 
1347  type_filter_passed = type_filter_specified
1348  && type_id < AST_VECTOR_SIZE(&sub->accepted_message_types)
1349  && AST_VECTOR_GET(&sub->accepted_message_types, type_id);
1350 
1351  /*
1352  * Since the type and formatter filters are OR'd, we can skip
1353  * the formatter check if the type check passes.
1354  */
1355  if (type_filter_passed) {
1356  break;
1357  }
1358 
1359  formatter_filter_passed = formatter_filter_specified
1360  && (sub->accepted_formatters & stasis_message_type_available_formatters(message_type));
1361 
1362  if (formatter_filter_passed) {
1363  break;
1364  }
1365 
1366 #ifdef AST_DEVMODE
1367  ast_atomic_fetchadd_int(&sub->statistics->messages_dropped, +1);
1368 #endif
1369 
1370  return 0;
1371 
1372  } while (0);
1373 
1374 #ifdef AST_DEVMODE
1375  ast_atomic_fetchadd_int(&sub->statistics->messages_passed, +1);
1376 #endif
1377 
1378  if (!sub->mailbox) {
1379  /* Dispatch directly */
1381  return 1;
1382  }
1383 
1384  /* Bump the message for the taskprocessor push. This will get de-ref'd
1385  * by the task processor callback.
1386  */
1387  ao2_bump(message);
1388  if (!synchronous) {
1390  /* Push failed; ugh. */
1391  ast_log(LOG_ERROR, "Dropping async dispatch\n");
1393  return 0;
1394  }
1395  } else {
1396  struct sync_task_data std;
1397 
1398  ast_mutex_init(&std.lock);
1399  ast_cond_init(&std.cond, NULL);
1400  std.complete = 0;
1401  std.task_data = message;
1402 
1403  if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_sync, &std)) {
1404  /* Push failed; ugh. */
1405  ast_log(LOG_ERROR, "Dropping sync dispatch\n");
1407  ast_mutex_destroy(&std.lock);
1408  ast_cond_destroy(&std.cond);
1409  return 0;
1410  }
1411 
1412  ast_mutex_lock(&std.lock);
1413  while (!std.complete) {
1414  ast_cond_wait(&std.cond, &std.lock);
1415  }
1416  ast_mutex_unlock(&std.lock);
1417 
1418  ast_mutex_destroy(&std.lock);
1419  ast_cond_destroy(&std.cond);
1420  }
1421 
1422  return 1;
1423 }
1424 
1425 /*!
1426  * \internal \brief Publish a message to a topic's subscribers
1427  * \param topic The topic to publish to
1428  * \param message The message to publish
1429  * \param sync_sub An optional subscriber of the topic to publish synchronously
1430  * to
1431  */
1432 static void publish_msg(struct stasis_topic *topic,
1433  struct stasis_message *message, struct stasis_subscription *sync_sub)
1434 {
1435  size_t i;
1436 #ifdef AST_DEVMODE
1437  unsigned int dispatched = 0;
1439  struct stasis_message_type_statistics *statistics;
1440  struct timeval start;
1441  long elapsed;
1442 #endif
1443 
1444  ast_assert(topic != NULL);
1445  ast_assert(message != NULL);
1446 
1447 #ifdef AST_DEVMODE
1448  ast_mutex_lock(&message_type_statistics_lock);
1449  if (message_type_id >= AST_VECTOR_SIZE(&message_type_statistics)) {
1450  struct stasis_message_type_statistics new_statistics = {
1451  .published = 0,
1452  };
1453  if (AST_VECTOR_REPLACE(&message_type_statistics, message_type_id, new_statistics)) {
1454  ast_mutex_unlock(&message_type_statistics_lock);
1455  return;
1456  }
1457  }
1458  statistics = AST_VECTOR_GET_ADDR(&message_type_statistics, message_type_id);
1459  statistics->message_type = stasis_message_type(message);
1460  ast_mutex_unlock(&message_type_statistics_lock);
1461 
1462  ast_atomic_fetchadd_int(&statistics->published, +1);
1463 #endif
1464 
1465  /* If there are no subscribers don't bother */
1466  if (!stasis_topic_subscribers(topic)) {
1467 #ifdef AST_DEVMODE
1468  ast_atomic_fetchadd_int(&statistics->unused, +1);
1469  ast_atomic_fetchadd_int(&topic->statistics->messages_not_dispatched, +1);
1470 #endif
1471  return;
1472  }
1473 
1474  /*
1475  * The topic may be unref'ed by the subscription invocation.
1476  * Make sure we hold onto a reference while dispatching.
1477  */
1478  ao2_ref(topic, +1);
1479 #ifdef AST_DEVMODE
1480  start = ast_tvnow();
1481 #endif
1482  ao2_lock(topic);
1483  for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
1485 
1486  ast_assert(sub != NULL);
1487 #ifdef AST_DEVMODE
1488  dispatched +=
1489 #endif
1490  dispatch_message(sub, message, (sub == sync_sub));
1491  }
1492  ao2_unlock(topic);
1493 
1494 #ifdef AST_DEVMODE
1495  elapsed = ast_tvdiff_ms(ast_tvnow(), start);
1496  if (elapsed > topic->statistics->highest_time_dispatched) {
1497  topic->statistics->highest_time_dispatched = elapsed;
1498  }
1499  if (elapsed < topic->statistics->lowest_time_dispatched) {
1500  topic->statistics->lowest_time_dispatched = elapsed;
1501  }
1502  if (dispatched) {
1503  ast_atomic_fetchadd_int(&topic->statistics->messages_dispatched, +1);
1504  } else {
1505  ast_atomic_fetchadd_int(&statistics->unused, +1);
1506  ast_atomic_fetchadd_int(&topic->statistics->messages_not_dispatched, +1);
1507  }
1508 #endif
1509 
1510  ao2_ref(topic, -1);
1511 }
1512 
1514 {
1516 }
1517 
1519 {
1520  ast_assert(sub != NULL);
1521 
1522  publish_msg(sub->topic, message, sub);
1523 }
1524 
1525 /*!
1526  * \brief Forwarding information
1527  *
1528  * Any message posted to \a from_topic is forwarded to \a to_topic.
1529  *
1530  * In cases where both the \a from_topic and \a to_topic need to be locked,
1531  * always lock the \a to_topic first, then the \a from_topic. Lest you deadlock.
1532  */
1534  /*! Originating topic */
1536  /*! Destination topic */
1538 };
1539 
1540 static void forward_dtor(void *obj)
1541 {
1542  struct stasis_forward *forward = obj;
1543 
1544  ao2_cleanup(forward->from_topic);
1545  forward->from_topic = NULL;
1546  ao2_cleanup(forward->to_topic);
1547  forward->to_topic = NULL;
1548 }
1549 
1551 {
1552  int idx;
1553  struct stasis_topic *from;
1554  struct stasis_topic *to;
1555 
1556  if (!forward) {
1557  return NULL;
1558  }
1559 
1560  from = forward->from_topic;
1561  to = forward->to_topic;
1562 
1563  if (from && to) {
1564  topic_lock_both(to, from);
1567 
1568  for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
1570  }
1571  ao2_unlock(from);
1572  ao2_unlock(to);
1573  }
1574 
1575  ao2_cleanup(forward);
1576 
1577  return NULL;
1578 }
1579 
1581  struct stasis_topic *to_topic)
1582 {
1583  int res;
1584  size_t idx;
1585  struct stasis_forward *forward;
1586 
1587  if (!from_topic || !to_topic) {
1588  return NULL;
1589  }
1590 
1591  forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1592  if (!forward) {
1593  return NULL;
1594  }
1595 
1596  /* Forwards to ourselves are implicit. */
1597  if (to_topic == from_topic) {
1598  return forward;
1599  }
1600 
1601  forward->from_topic = ao2_bump(from_topic);
1602  forward->to_topic = ao2_bump(to_topic);
1603 
1606  if (res != 0) {
1609  ao2_ref(forward, -1);
1610  return NULL;
1611  }
1612 
1613  for (idx = 0; idx < AST_VECTOR_SIZE(&to_topic->subscribers); ++idx) {
1615  }
1618 
1619  return forward;
1620 }
1621 
1622 static void subscription_change_dtor(void *obj)
1623 {
1624  struct stasis_subscription_change *change = obj;
1625 
1626  ao2_cleanup(change->topic);
1627 }
1628 
1630 {
1631  size_t description_len = strlen(description) + 1;
1632  size_t uniqueid_len = strlen(uniqueid) + 1;
1633  struct stasis_subscription_change *change;
1634 
1635  change = ao2_alloc_options(sizeof(*change) + description_len + uniqueid_len,
1637  if (!change) {
1638  return NULL;
1639  }
1640 
1641  strcpy(change->description, description); /* SAFE */
1642  change->uniqueid = change->description + description_len;
1643  ast_copy_string(change->uniqueid, uniqueid, uniqueid_len); /* SAFE */
1644  ao2_ref(topic, +1);
1645  change->topic = topic;
1646 
1647  return change;
1648 }
1649 
1651 {
1652  struct stasis_subscription_change *change;
1653  struct stasis_message *msg;
1654 
1655  /* This assumes that we have already unsubscribed */
1657 
1659  return;
1660  }
1661 
1662  change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
1663  if (!change) {
1664  return;
1665  }
1666 
1668  if (!msg) {
1669  ao2_cleanup(change);
1670  return;
1671  }
1672 
1673  stasis_publish(topic, msg);
1674  ao2_cleanup(msg);
1675  ao2_cleanup(change);
1676 }
1677 
1679  struct stasis_subscription *sub)
1680 {
1681  struct stasis_subscription_change *change;
1682  struct stasis_message *msg;
1683 
1684  /* This assumes that we have already unsubscribed */
1686 
1688  return;
1689  }
1690 
1691  change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
1692  if (!change) {
1693  return;
1694  }
1695 
1697  if (!msg) {
1698  ao2_cleanup(change);
1699  return;
1700  }
1701 
1702  stasis_publish(topic, msg);
1703 
1704  /* Now we have to dispatch to the subscription itself */
1705  dispatch_message(sub, msg, 0);
1706 
1707  ao2_cleanup(msg);
1708  ao2_cleanup(change);
1709 }
1710 
1714  char name[0];
1715 };
1716 
1717 static void topic_pool_entry_dtor(void *obj)
1718 {
1719  struct topic_pool_entry *entry = obj;
1720 
1721  entry->forward = stasis_forward_cancel(entry->forward);
1722  ao2_cleanup(entry->topic);
1723  entry->topic = NULL;
1724 }
1725 
1726 static struct topic_pool_entry *topic_pool_entry_alloc(const char *topic_name)
1727 {
1729 
1730  topic_pool_entry = ao2_alloc_options(sizeof(*topic_pool_entry) + strlen(topic_name) + 1,
1732  if (!topic_pool_entry) {
1733  return NULL;
1734  }
1735 
1736  strcpy(topic_pool_entry->name, topic_name); /* Safe */
1737 
1738  return topic_pool_entry;
1739 }
1740 
1744 };
1745 
1746 static void topic_pool_dtor(void *obj)
1747 {
1748  struct stasis_topic_pool *pool = obj;
1749 
1750 #ifdef AO2_DEBUG
1751  {
1752  char *container_name =
1753  ast_alloca(strlen(stasis_topic_name(pool->pool_topic)) + strlen("-pool") + 1);
1754  sprintf(container_name, "%s-pool", stasis_topic_name(pool->pool_topic));
1755  ao2_container_unregister(container_name);
1756  }
1757 #endif
1758 
1759  ao2_cleanup(pool->pool_container);
1760  pool->pool_container = NULL;
1761  ao2_cleanup(pool->pool_topic);
1762  pool->pool_topic = NULL;
1763 }
1764 
1765 static int topic_pool_entry_hash(const void *obj, const int flags)
1766 {
1767  const struct topic_pool_entry *object;
1768  const char *key;
1769 
1770  switch (flags & OBJ_SEARCH_MASK) {
1771  case OBJ_SEARCH_KEY:
1772  key = obj;
1773  break;
1774  case OBJ_SEARCH_OBJECT:
1775  object = obj;
1776  key = object->name;
1777  break;
1778  default:
1779  /* Hash can only work on something with a full key. */
1780  ast_assert(0);
1781  return 0;
1782  }
1783  return ast_str_case_hash(key);
1784 }
1785 
1786 static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
1787 {
1788  const struct topic_pool_entry *object_left = obj;
1789  const struct topic_pool_entry *object_right = arg;
1790  const char *right_key = arg;
1791  int cmp;
1792 
1793  switch (flags & OBJ_SEARCH_MASK) {
1794  case OBJ_SEARCH_OBJECT:
1795  right_key = object_right->name;
1796  /* Fall through */
1797  case OBJ_SEARCH_KEY:
1798  cmp = strcasecmp(object_left->name, right_key);
1799  break;
1801  /* Not supported by container */
1802  ast_assert(0);
1803  cmp = -1;
1804  break;
1805  default:
1806  /*
1807  * What arg points to is specific to this traversal callback
1808  * and has no special meaning to astobj2.
1809  */
1810  cmp = 0;
1811  break;
1812  }
1813  if (cmp) {
1814  return 0;
1815  }
1816  /*
1817  * At this point the traversal callback is identical to a sorted
1818  * container.
1819  */
1820  return CMP_MATCH;
1821 }
1822 
1823 #ifdef AO2_DEBUG
1824 static void topic_pool_prnt_obj(void *v_obj, void *where, ao2_prnt_fn *prnt)
1825 {
1826  struct topic_pool_entry *entry = v_obj;
1827 
1828  if (!entry) {
1829  return;
1830  }
1831  prnt(where, "%s", stasis_topic_name(entry->topic));
1832 }
1833 #endif
1834 
1836 {
1837  struct stasis_topic_pool *pool;
1838 
1840  if (!pool) {
1841  return NULL;
1842  }
1843 
1846  if (!pool->pool_container) {
1847  ao2_cleanup(pool);
1848  return NULL;
1849  }
1850 
1851 #ifdef AO2_DEBUG
1852  {
1853  char *container_name =
1854  ast_alloca(strlen(stasis_topic_name(pooled_topic)) + strlen("-pool") + 1);
1855  sprintf(container_name, "%s-pool", stasis_topic_name(pooled_topic));
1856  ao2_container_register(container_name, pool->pool_container, topic_pool_prnt_obj);
1857  }
1858 #endif
1859 
1860  ao2_ref(pooled_topic, +1);
1861  pool->pool_topic = pooled_topic;
1862 
1863  return pool;
1864 }
1865 
1866 void stasis_topic_pool_delete_topic(struct stasis_topic_pool *pool, const char *topic_name)
1867 {
1868  /*
1869  * The topic_name passed in could be a fully-qualified name like <pool_topic_name>/<topic_name>
1870  * or just <topic_name>. If it's fully qualified, we need to skip past <pool_topic_name>
1871  * name and search only on <topic_name>.
1872  */
1873  const char *pool_topic_name = stasis_topic_name(pool->pool_topic);
1874  int pool_topic_name_len = strlen(pool_topic_name);
1875  const char *search_topic_name;
1876 
1877  if (strncmp(pool_topic_name, topic_name, pool_topic_name_len) == 0) {
1878  search_topic_name = topic_name + pool_topic_name_len + 1;
1879  } else {
1880  search_topic_name = topic_name;
1881  }
1882 
1883  ao2_find(pool->pool_container, search_topic_name, OBJ_SEARCH_KEY | OBJ_NODATA | OBJ_UNLINK);
1884 }
1885 
1886 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
1887 {
1889  SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
1890  char *new_topic_name;
1891  int ret;
1892 
1894  if (topic_pool_entry) {
1895  return topic_pool_entry->topic;
1896  }
1897 
1899  if (!topic_pool_entry) {
1900  return NULL;
1901  }
1902 
1903  /* To provide further detail and to ensure that the topic is unique within the scope of the
1904  * system we prefix it with the pooling topic name, which should itself already be unique.
1905  */
1906  ret = ast_asprintf(&new_topic_name, "%s/%s", stasis_topic_name(pool->pool_topic), topic_name);
1907  if (ret < 0) {
1908  return NULL;
1909  }
1910 
1911  topic_pool_entry->topic = stasis_topic_create(new_topic_name);
1912  ast_free(new_topic_name);
1913  if (!topic_pool_entry->topic) {
1914  return NULL;
1915  }
1916 
1918  if (!topic_pool_entry->forward) {
1919  return NULL;
1920  }
1921 
1923  return NULL;
1924  }
1925 
1926  return topic_pool_entry->topic;
1927 }
1928 
1929 int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name)
1930 {
1932 
1934  if (!topic_pool_entry) {
1935  return 0;
1936  }
1937 
1939  return 1;
1940 }
1941 
1943 {
1944 #ifdef AST_DEVMODE
1946  ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
1947  }
1948 #endif
1949 }
1950 
1951 /*! \brief A multi object blob data structure to carry user event stasis messages */
1953  struct ast_json *blob; /*< A blob of JSON data */
1954  AST_VECTOR(, void *) snapshots[STASIS_UMOS_MAX]; /*< Vector of snapshots for each type */
1955 };
1956 
1957 /*!
1958  * \internal
1959  * \brief Destructor for \ref ast_multi_object_blob objects
1960  */
1961 static void multi_object_blob_dtor(void *obj)
1962 {
1963  struct ast_multi_object_blob *multi = obj;
1964  int type;
1965  int i;
1966 
1967  for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1968  for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1969  ao2_cleanup(AST_VECTOR_GET(&multi->snapshots[type], i));
1970  }
1971  AST_VECTOR_FREE(&multi->snapshots[type]);
1972  }
1973  ast_json_unref(multi->blob);
1974 }
1975 
1976 /*! \brief Create a stasis user event multi object blob */
1978 {
1979  int type;
1980  struct ast_multi_object_blob *multi;
1981 
1982  ast_assert(blob != NULL);
1983 
1984  multi = ao2_alloc(sizeof(*multi), multi_object_blob_dtor);
1985  if (!multi) {
1986  return NULL;
1987  }
1988 
1989  for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1990  if (AST_VECTOR_INIT(&multi->snapshots[type], 0)) {
1991  ao2_ref(multi, -1);
1992 
1993  return NULL;
1994  }
1995  }
1996 
1997  multi->blob = ast_json_ref(blob);
1998 
1999  return multi;
2000 }
2001 
2002 /*! \brief Add an object (snapshot) to the blob */
2004  enum stasis_user_multi_object_snapshot_type type, void *object)
2005 {
2006  if (!multi || !object || AST_VECTOR_APPEND(&multi->snapshots[type], object)) {
2007  ao2_cleanup(object);
2008  }
2009 }
2010 
2011 /*! \brief Publish single channel user event (for app_userevent compatibility) */
2013  struct stasis_message_type *type, struct ast_json *blob)
2014 {
2015  struct stasis_message *message;
2016  struct ast_channel_snapshot *channel_snapshot;
2017  struct ast_multi_object_blob *multi;
2018 
2019  if (!type) {
2020  return;
2021  }
2022 
2024  if (!multi) {
2025  return;
2026  }
2027 
2028  channel_snapshot = ast_channel_snapshot_create(chan);
2029  if (!channel_snapshot) {
2030  ao2_ref(multi, -1);
2031  return;
2032  }
2033 
2034  /* this call steals the channel_snapshot reference */
2035  ast_multi_object_blob_add(multi, STASIS_UMOS_CHANNEL, channel_snapshot);
2036 
2038  ao2_ref(multi, -1);
2039  if (message) {
2040  /* app_userevent still publishes to channel */
2042  ao2_ref(message, -1);
2043  }
2044 }
2045 
2046 /*! \internal \brief convert multi object blob to ari json */
2048  struct stasis_message *message,
2049  const struct stasis_message_sanitizer *sanitize)
2050 {
2051  struct ast_json *out;
2053  struct ast_json *blob = multi->blob;
2054  const struct timeval *tv = stasis_message_timestamp(message);
2056  int i;
2057 
2059  if (!out) {
2060  return NULL;
2061  }
2062 
2063  ast_json_object_set(out, "type", ast_json_string_create("ChannelUserevent"));
2064  ast_json_object_set(out, "timestamp", ast_json_timeval(*tv, NULL));
2065  ast_json_object_set(out, "eventname", ast_json_ref(ast_json_object_get(blob, "eventname")));
2066  ast_json_object_set(out, "userevent", ast_json_ref(blob));
2067 
2068  for (type = 0; type < STASIS_UMOS_MAX; ++type) {
2069  for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
2070  struct ast_json *json_object = NULL;
2071  char *name = NULL;
2072  void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
2073 
2074  switch (type) {
2075  case STASIS_UMOS_CHANNEL:
2076  json_object = ast_channel_snapshot_to_json(snapshot, sanitize);
2077  name = "channel";
2078  break;
2079  case STASIS_UMOS_BRIDGE:
2080  json_object = ast_bridge_snapshot_to_json(snapshot, sanitize);
2081  name = "bridge";
2082  break;
2083  case STASIS_UMOS_ENDPOINT:
2084  json_object = ast_endpoint_snapshot_to_json(snapshot, sanitize);
2085  name = "endpoint";
2086  break;
2087  }
2088  if (json_object) {
2089  ast_json_object_set(out, name, json_object);
2090  }
2091  }
2092  }
2093 
2094  return out;
2095 }
2096 
2097 /*! \internal \brief convert multi object blob to ami string */
2098 static struct ast_str *multi_object_blob_to_ami(void *obj)
2099 {
2100  struct ast_str *ami_str=ast_str_create(1024);
2101  struct ast_str *ami_snapshot;
2102  const struct ast_multi_object_blob *multi = obj;
2104  int i;
2105 
2106  if (!ami_str) {
2107  return NULL;
2108  }
2109  if (!multi) {
2110  ast_free(ami_str);
2111  return NULL;
2112  }
2113 
2114  for (type = 0; type < STASIS_UMOS_MAX; ++type) {
2115  for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
2116  char *name = NULL;
2117  void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
2118  ami_snapshot = NULL;
2119 
2120  if (i > 0) {
2121  ast_asprintf(&name, "%d", i + 1);
2122  }
2123 
2124  switch (type) {
2125  case STASIS_UMOS_CHANNEL:
2126  ami_snapshot = ast_manager_build_channel_state_string_prefix(snapshot, name ?: "");
2127  break;
2128 
2129  case STASIS_UMOS_BRIDGE:
2130  ami_snapshot = ast_manager_build_bridge_state_string_prefix(snapshot, name ?: "");
2131  break;
2132 
2133  case STASIS_UMOS_ENDPOINT:
2134  /* currently not sending endpoint snapshots to AMI */
2135  break;
2136  }
2137  if (ami_snapshot) {
2138  ast_str_append(&ami_str, 0, "%s", ast_str_buffer(ami_snapshot));
2139  ast_free(ami_snapshot);
2140  }
2141  ast_free(name);
2142  }
2143  }
2144 
2145  return ami_str;
2146 }
2147 
/*!
 * \internal \brief Callback to pass only user defined parameters from blob
 *
 * \param key Name of the blob key being considered
 *
 * \retval 1 if \a key is exactly "eventname" (case-sensitive)
 * \retval 0 for any other key
 */
static int userevent_exclusion_cb(const char *key)
{
	return strcmp(key, "eventname") == 0 ? 1 : 0;
}
2156 
2158  struct stasis_message *message)
2159 {
2160  RAII_VAR(struct ast_str *, object_string, NULL, ast_free);
2161  RAII_VAR(struct ast_str *, body, NULL, ast_free);
2163  const char *eventname;
2164 
2165  eventname = ast_json_string_get(ast_json_object_get(multi->blob, "eventname"));
2167  object_string = multi_object_blob_to_ami(multi);
2168  if (!object_string || !body) {
2169  return NULL;
2170  }
2171 
2172  return ast_manager_event_blob_create(EVENT_FLAG_USER, "UserEvent",
2173  "%s"
2174  "UserEvent: %s\r\n"
2175  "%s",
2176  ast_str_buffer(object_string),
2177  eventname,
2178  ast_str_buffer(body));
2179 }
2180 
2181 /*! \brief A structure to hold global configuration-related options */
2183  /*! The list of message types to decline */
2185 };
2186 
2187 /*! \brief Threadpool configuration options */
2189  /*! Initial size of the thread pool */
2191  /*! Time, in seconds, before we expire a thread */
2193  /*! Maximum number of thread to allow */
2195 };
2196 
2198  /*! Thread pool configuration options */
2200  /*! Declined message types */
2202 };
2203 
2204 static struct aco_type threadpool_option = {
2205  .type = ACO_GLOBAL,
2206  .name = "threadpool",
2207  .item_offset = offsetof(struct stasis_config, threadpool_options),
2208  .category = "threadpool",
2209  .category_match = ACO_WHITELIST_EXACT,
2210 };
2211 
2213 
2214 /*! \brief An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type */
2215 static struct aco_type declined_option = {
2216  .type = ACO_GLOBAL,
2217  .name = "declined_message_types",
2218  .item_offset = offsetof(struct stasis_config, declined_message_types),
2219  .category_match = ACO_WHITELIST_EXACT,
2220  .category = "declined_message_types",
2221 };
2222 
2224 
2225 struct aco_file stasis_conf = {
2226  .filename = "stasis.conf",
2228 };
2229 
2230 /*! \brief A global object container that will contain the stasis_config that gets swapped out on reloads */
2232 
2233 static void *stasis_config_alloc(void);
2234 
2235 /*! \brief Register information about the configs being processed by this module */
2237  .files = ACO_FILES(&stasis_conf),
2238 );
2239 
2241 {
2242  struct stasis_declined_config *declined = obj;
2243 
2244  ao2_cleanup(declined->declined);
2245 }
2246 
2247 static void stasis_config_destructor(void *obj)
2248 {
2249  struct stasis_config *cfg = obj;
2250 
2253 }
2254 
2255 static void *stasis_config_alloc(void)
2256 {
2257  struct stasis_config *cfg;
2258 
2259  if (!(cfg = ao2_alloc(sizeof(*cfg), stasis_config_destructor))) {
2260  return NULL;
2261  }
2262 
2263  cfg->threadpool_options = ast_calloc(1, sizeof(*cfg->threadpool_options));
2264  if (!cfg->threadpool_options) {
2265  ao2_ref(cfg, -1);
2266  return NULL;
2267  }
2268 
2271  if (!cfg->declined_message_types) {
2272  ao2_ref(cfg, -1);
2273  return NULL;
2274  }
2275 
2277  if (!cfg->declined_message_types->declined) {
2278  ao2_ref(cfg, -1);
2279  return NULL;
2280  }
2281 
2282  return cfg;
2283 }
2284 
2286 {
2287  struct stasis_config *cfg = ao2_global_obj_ref(globals);
2288  char *name_in_declined;
2289  int res;
2290 
2291  if (!cfg || !cfg->declined_message_types) {
2292  ao2_cleanup(cfg);
2293  return 0;
2294  }
2295 
2296  name_in_declined = ao2_find(cfg->declined_message_types->declined, name, OBJ_SEARCH_KEY);
2297  res = name_in_declined ? 1 : 0;
2298  ao2_cleanup(name_in_declined);
2299  ao2_ref(cfg, -1);
2300  if (res) {
2301  ast_log(LOG_NOTICE, "Declining to allocate Stasis message type '%s' due to configuration\n", name);
2302  }
2303  return res;
2304 }
2305 
2306 static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
2307 {
2308  struct stasis_declined_config *declined = obj;
2309 
2310  if (ast_strlen_zero(var->value)) {
2311  return 0;
2312  }
2313 
2314  if (ast_str_container_add(declined->declined, var->value)) {
2315  return -1;
2316  }
2317 
2318  return 0;
2319 }
2320 
2321 /*!
2322  * @{ \brief Define multi user event message type(s).
2323  */
2324 
2326  .to_json = multi_user_event_to_json,
2328  );
2329 
2330 /*! @} */
2331 
2332 /*!
2333  * \internal
2334  * \brief CLI command implementation for 'stasis show topics'
2335  */
2336 static char *stasis_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2337 {
2338  struct ao2_iterator iter;
2339  struct topic_proxy *topic;
2340  struct ao2_container *tmp_container;
2341  int count = 0;
2342 #define FMT_HEADERS "%-64s %-64s\n"
2343 #define FMT_FIELDS "%-64s %-64s\n"
2344 
2345  switch (cmd) {
2346  case CLI_INIT:
2347  e->command = "stasis show topics";
2348  e->usage =
2349  "Usage: stasis show topics\n"
2350  " Shows a list of topics\n";
2351  return NULL;
2352  case CLI_GENERATE:
2353  return NULL;
2354  }
2355 
2356  if (a->argc != e->args) {
2357  return CLI_SHOWUSAGE;
2358  }
2359 
2360  ast_cli(a->fd, "\n" FMT_HEADERS, "Name", "Detail");
2361 
2363  topic_proxy_sort_fn, NULL);
2364 
2365  if (!tmp_container || ao2_container_dup(tmp_container, topic_all, OBJ_SEARCH_OBJECT)) {
2366  ao2_cleanup(tmp_container);
2367 
2368  return NULL;
2369  }
2370 
2371  /* getting all topic in order */
2372  iter = ao2_iterator_init(tmp_container, AO2_ITERATOR_UNLINK);
2373  while ((topic = ao2_iterator_next(&iter))) {
2374  ast_cli(a->fd, FMT_FIELDS, topic->name, topic->detail);
2375  ao2_ref(topic, -1);
2376  ++count;
2377  }
2378  ao2_iterator_destroy(&iter);
2379  ao2_cleanup(tmp_container);
2380 
2381  ast_cli(a->fd, "\n%d Total topics\n\n", count);
2382 
2383 #undef FMT_HEADERS
2384 #undef FMT_FIELDS
2385 
2386  return CLI_SUCCESS;
2387 }
2388 
2389 /*!
2390  * \internal
2391  * \brief CLI tab completion for topic names
2392  */
2393 static char *topic_complete_name(const char *word)
2394 {
2395  struct topic_proxy *topic;
2396  struct ao2_iterator it;
2397  int wordlen = strlen(word);
2398  int ret;
2399 
2400  it = ao2_iterator_init(topic_all, 0);
2401  while ((topic = ao2_iterator_next(&it))) {
2402  if (!strncasecmp(word, topic->name, wordlen)) {
2403  ret = ast_cli_completion_add(ast_strdup(topic->name));
2404  if (ret) {
2405  ao2_ref(topic, -1);
2406  break;
2407  }
2408  }
2409  ao2_ref(topic, -1);
2410  }
2411  ao2_iterator_destroy(&it);
2412  return NULL;
2413 }
2414 
2415 /*!
2416  * \internal
2417  * \brief CLI command implementation for 'stasis show topic'
2418  */
2419 static char *stasis_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2420 {
2421  struct stasis_topic *topic;
2422  char print_time[32];
2423  int i;
2424 
2425  switch (cmd) {
2426  case CLI_INIT:
2427  e->command = "stasis show topic";
2428  e->usage =
2429  "Usage: stasis show topic <name>\n"
2430  " Show stasis topic detail info.\n";
2431  return NULL;
2432  case CLI_GENERATE:
2433  if (a->pos == 3) {
2434  return topic_complete_name(a->word);
2435  } else {
2436  return NULL;
2437  }
2438  }
2439 
2440  if (a->argc != 4) {
2441  return CLI_SHOWUSAGE;
2442  }
2443 
2444  topic = stasis_topic_get(a->argv[3]);
2445  if (!topic) {
2446  ast_cli(a->fd, "Specified topic '%s' does not exist\n", a->argv[3]);
2447  return CLI_FAILURE;
2448  }
2449 
2450  ast_cli(a->fd, "Name: %s\n", topic->name);
2451  ast_cli(a->fd, "Detail: %s\n", topic->detail);
2452  ast_cli(a->fd, "Subscribers count: %zu\n", AST_VECTOR_SIZE(&topic->subscribers));
2453  ast_cli(a->fd, "Forwarding topic count: %zu\n", AST_VECTOR_SIZE(&topic->upstream_topics));
2454  ast_format_duration_hh_mm_ss(ast_tvnow().tv_sec - topic->creationtime->tv_sec, print_time, sizeof(print_time));
2455  ast_cli(a->fd, "Duration time: %s\n", print_time);
2456 
2457  ao2_lock(topic);
2458  ast_cli(a->fd, "\nSubscribers:\n");
2459  for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); i++) {
2460  struct stasis_subscription *subscription_tmp = AST_VECTOR_GET(&topic->subscribers, i);
2461  ast_cli(a->fd, " UniqueID: %s, Topic: %s, Detail: %s\n",
2462  subscription_tmp->uniqueid, subscription_tmp->topic->name, subscription_tmp->topic->detail);
2463  }
2464 
2465  ast_cli(a->fd, "\nForwarded topics:\n");
2466  for (i = 0; i < AST_VECTOR_SIZE(&topic->upstream_topics); i++) {
2467  struct stasis_topic *topic_tmp = AST_VECTOR_GET(&topic->upstream_topics, i);
2468  ast_cli(a->fd, " Topic: %s, Detail: %s\n", topic_tmp->name, topic_tmp->detail);
2469  }
2470  ao2_unlock(topic);
2471 
2472  ao2_ref(topic, -1);
2473 
2474  return CLI_SUCCESS;
2475 }
2476 
2477 
/*! \brief CLI entries for the 'stasis show topics' and 'stasis show topic' commands */
2478 static struct ast_cli_entry cli_stasis[] = {
2479  AST_CLI_DEFINE(stasis_show_topics, "Show all topics"),
2480  AST_CLI_DEFINE(stasis_show_topic, "Show topic"),
2481 };
2482 
2483 
2484 #ifdef AST_DEVMODE
2485 
2486 AO2_STRING_FIELD_SORT_FN(stasis_subscription_statistics, uniqueid);
2487 
2488 /*!
2489  * \internal
2490  * \brief CLI command implementation for 'stasis statistics show subscriptions'
2491  */
2492 static char *statistics_show_subscriptions(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2493 {
2494  struct ao2_container *sorted_subscriptions;
2495  struct ao2_container *subscription_stats;
2496  struct ao2_iterator iter;
2497  struct stasis_subscription_statistics *statistics;
2498  int count = 0;
2499  int dropped = 0;
2500  int passed = 0;
2501 #define FMT_HEADERS "%-64s %10s %10s %16s %16s\n"
2502 #define FMT_FIELDS "%-64s %10d %10d %16ld %16ld\n"
2503 #define FMT_FIELDS2 "%-64s %10d %10d\n"
2504 
2505  switch (cmd) {
2506  case CLI_INIT:
2507  e->command = "stasis statistics show subscriptions";
2508  e->usage =
2509  "Usage: stasis statistics show subscriptions\n"
2510  " Shows a list of subscriptions and their general statistics\n";
2511  return NULL;
2512  case CLI_GENERATE:
2513  return NULL;
2514  }
2515 
2516  if (a->argc != e->args) {
2517  return CLI_SHOWUSAGE;
2518  }
2519 
2520  subscription_stats = ao2_global_obj_ref(subscription_statistics);
2521  if (!subscription_stats) {
2522  ast_cli(a->fd, "Could not fetch subscription_statistics container\n");
2523  return CLI_FAILURE;
2524  }
2525 
2526  sorted_subscriptions = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_NOLOCK, 0,
2527  stasis_subscription_statistics_sort_fn, NULL);
2528  if (!sorted_subscriptions) {
2529  ao2_ref(subscription_stats, -1);
2530  ast_cli(a->fd, "Could not create container for sorting subscription statistics\n");
2531  return CLI_SUCCESS;
2532  }
2533 
2534  if (ao2_container_dup(sorted_subscriptions, subscription_stats, 0)) {
2535  ao2_ref(sorted_subscriptions, -1);
2536  ao2_ref(subscription_stats, -1);
2537  ast_cli(a->fd, "Could not sort subscription statistics\n");
2538  return CLI_SUCCESS;
2539  }
2540 
2541  ao2_ref(subscription_stats, -1);
2542 
2543  ast_cli(a->fd, "\n" FMT_HEADERS, "Subscription", "Dropped", "Passed", "Lowest Invoke", "Highest Invoke");
2544 
2545  iter = ao2_iterator_init(sorted_subscriptions, 0);
2546  while ((statistics = ao2_iterator_next(&iter))) {
2547  ast_cli(a->fd, FMT_FIELDS, statistics->uniqueid, statistics->messages_dropped, statistics->messages_passed,
2548  statistics->lowest_time_invoked, statistics->highest_time_invoked);
2549  dropped += statistics->messages_dropped;
2550  passed += statistics->messages_passed;
2551  ao2_ref(statistics, -1);
2552  ++count;
2553  }
2554  ao2_iterator_destroy(&iter);
2555 
2556  ao2_ref(sorted_subscriptions, -1);
2557 
2558  ast_cli(a->fd, FMT_FIELDS2, "Total", dropped, passed);
2559  ast_cli(a->fd, "\n%d subscriptions\n\n", count);
2560 
2561 #undef FMT_HEADERS
2562 #undef FMT_FIELDS
2563 #undef FMT_FIELDS2
2564 
2565  return CLI_SUCCESS;
2566 }
2567 
2568 /*!
2569  * \internal
2570  * \brief CLI tab completion for subscription statistics names
2571  */
2572 static char *subscription_statistics_complete_name(const char *word, int state)
2573 {
2574  struct stasis_subscription_statistics *statistics;
2575  struct ao2_container *subscription_stats;
2576  struct ao2_iterator it_statistics;
2577  int wordlen = strlen(word);
2578  int which = 0;
2579  char *result = NULL;
2580 
2581  subscription_stats = ao2_global_obj_ref(subscription_statistics);
2582  if (!subscription_stats) {
2583  return result;
2584  }
2585 
2586  it_statistics = ao2_iterator_init(subscription_stats, 0);
2587  while ((statistics = ao2_iterator_next(&it_statistics))) {
2588  if (!strncasecmp(word, statistics->uniqueid, wordlen)
2589  && ++which > state) {
2590  result = ast_strdup(statistics->uniqueid);
2591  }
2592  ao2_ref(statistics, -1);
2593  if (result) {
2594  break;
2595  }
2596  }
2597  ao2_iterator_destroy(&it_statistics);
2598  ao2_ref(subscription_stats, -1);
2599  return result;
2600 }
2601 
2602 /*!
2603  * \internal
2604  * \brief CLI command implementation for 'stasis statistics show subscription'
2605  */
2606 static char *statistics_show_subscription(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2607 {
2608  struct stasis_subscription_statistics *statistics;
2609  struct ao2_container *subscription_stats;
2610  struct ao2_iterator i;
2611  char *name;
2612 
2613  switch (cmd) {
2614  case CLI_INIT:
2615  e->command = "stasis statistics show subscription";
2616  e->usage =
2617  "Usage: stasis statistics show subscription <uniqueid>\n"
2618  " Show stasis subscription statistics.\n";
2619  return NULL;
2620  case CLI_GENERATE:
2621  if (a->pos == 4) {
2622  return subscription_statistics_complete_name(a->word, a->n);
2623  } else {
2624  return NULL;
2625  }
2626  }
2627 
2628  if (a->argc != 5) {
2629  return CLI_SHOWUSAGE;
2630  }
2631 
2632  subscription_stats = ao2_global_obj_ref(subscription_statistics);
2633  if (!subscription_stats) {
2634  ast_cli(a->fd, "Could not fetch subcription_statistics container\n");
2635  return CLI_FAILURE;
2636  }
2637 
2638  statistics = ao2_find(subscription_stats, a->argv[4], OBJ_SEARCH_KEY);
2639  if (!statistics) {
2640  ao2_ref(subscription_stats, -1);
2641  ast_cli(a->fd, "Specified subscription '%s' does not exist\n", a->argv[4]);
2642  return CLI_FAILURE;
2643  }
2644 
2645  ao2_ref(subscription_stats, -1);
2646 
2647  ast_cli(a->fd, "Subscription: %s\n", statistics->uniqueid);
2648  ast_cli(a->fd, "Pointer Address: %p\n", statistics->sub);
2649  ast_cli(a->fd, "Source filename: %s\n", S_OR(statistics->file, "<unavailable>"));
2650  ast_cli(a->fd, "Source line number: %d\n", statistics->lineno);
2651  ast_cli(a->fd, "Source function: %s\n", S_OR(statistics->func, "<unavailable>"));
2652  ast_cli(a->fd, "Number of messages dropped due to filtering: %d\n", statistics->messages_dropped);
2653  ast_cli(a->fd, "Number of messages passed to subscriber callback: %d\n", statistics->messages_passed);
2654  ast_cli(a->fd, "Using mailbox to queue messages: %s\n", statistics->uses_mailbox ? "Yes" : "No");
2655  ast_cli(a->fd, "Using stasis threadpool for handling messages: %s\n", statistics->uses_threadpool ? "Yes" : "No");
2656  ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->lowest_time_invoked);
2657  ast_cli(a->fd, "Highest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->highest_time_invoked);
2658 
2660  if (statistics->highest_time_message_type) {
2661  ast_cli(a->fd, "Offender message type for highest invoking time: %s\n", stasis_message_type_name(statistics->highest_time_message_type));
2662  }
2664 
2665  ast_cli(a->fd, "Number of topics: %d\n", ao2_container_count(statistics->topics));
2666 
2667  ast_cli(a->fd, "Subscribed topics:\n");
2668  i = ao2_iterator_init(statistics->topics, 0);
2669  while ((name = ao2_iterator_next(&i))) {
2670  ast_cli(a->fd, "\t%s\n", name);
2671  ao2_ref(name, -1);
2672  }
2674 
2675  ao2_ref(statistics, -1);
2676 
2677  return CLI_SUCCESS;
2678 }
2679 
2680 AO2_STRING_FIELD_SORT_FN(stasis_topic_statistics, name);
2681 
2682 /*!
2683  * \internal
2684  * \brief CLI command implementation for 'stasis statistics show topics'
2685  */
2686 static char *statistics_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2687 {
2688  struct ao2_container *sorted_topics;
2689  struct ao2_container *topic_stats;
2690  struct ao2_iterator iter;
2691  struct stasis_topic_statistics *statistics;
2692  int count = 0;
2693  int not_dispatched = 0;
2694  int dispatched = 0;
2695 #define FMT_HEADERS "%-64s %10s %10s %10s %16s %16s\n"
2696 #define FMT_FIELDS "%-64s %10d %10d %10d %16ld %16ld\n"
2697 #define FMT_FIELDS2 "%-64s %10s %10d %10d\n"
2698 
2699  switch (cmd) {
2700  case CLI_INIT:
2701  e->command = "stasis statistics show topics";
2702  e->usage =
2703  "Usage: stasis statistics show topics\n"
2704  " Shows a list of topics and their general statistics\n";
2705  return NULL;
2706  case CLI_GENERATE:
2707  return NULL;
2708  }
2709 
2710  if (a->argc != e->args) {
2711  return CLI_SHOWUSAGE;
2712  }
2713 
2714  topic_stats = ao2_global_obj_ref(topic_statistics);
2715  if (!topic_stats) {
2716  ast_cli(a->fd, "Could not fetch topic_statistics container\n");
2717  return CLI_FAILURE;
2718  }
2719 
2721  stasis_topic_statistics_sort_fn, NULL);
2722  if (!sorted_topics) {
2723  ao2_ref(topic_stats, -1);
2724  ast_cli(a->fd, "Could not create container for sorting topic statistics\n");
2725  return CLI_SUCCESS;
2726  }
2727 
2728  if (ao2_container_dup(sorted_topics, topic_stats, 0)) {
2729  ao2_ref(sorted_topics, -1);
2730  ao2_ref(topic_stats, -1);
2731  ast_cli(a->fd, "Could not sort topic statistics\n");
2732  return CLI_SUCCESS;
2733  }
2734 
2735  ao2_ref(topic_stats, -1);
2736 
2737  ast_cli(a->fd, "\n" FMT_HEADERS, "Topic", "Subscribers", "Dropped", "Dispatched", "Lowest Dispatch", "Highest Dispatch");
2738 
2739  iter = ao2_iterator_init(sorted_topics, 0);
2740  while ((statistics = ao2_iterator_next(&iter))) {
2741  ast_cli(a->fd, FMT_FIELDS, statistics->name, ao2_container_count(statistics->subscribers),
2742  statistics->messages_not_dispatched, statistics->messages_dispatched,
2743  statistics->lowest_time_dispatched, statistics->highest_time_dispatched);
2744  not_dispatched += statistics->messages_not_dispatched;
2745  dispatched += statistics->messages_dispatched;
2746  ao2_ref(statistics, -1);
2747  ++count;
2748  }
2749  ao2_iterator_destroy(&iter);
2750 
2751  ao2_ref(sorted_topics, -1);
2752 
2753  ast_cli(a->fd, FMT_FIELDS2, "Total", "", not_dispatched, dispatched);
2754  ast_cli(a->fd, "\n%d topics\n\n", count);
2755 
2756 #undef FMT_HEADERS
2757 #undef FMT_FIELDS
2758 #undef FMT_FIELDS2
2759 
2760  return CLI_SUCCESS;
2761 }
2762 
2763 /*!
2764  * \internal
2765  * \brief CLI tab completion for topic statistics names
2766  */
2767 static char *topic_statistics_complete_name(const char *word, int state)
2768 {
2769  struct stasis_topic_statistics *statistics;
2770  struct ao2_container *topic_stats;
2771  struct ao2_iterator it_statistics;
2772  int wordlen = strlen(word);
2773  int which = 0;
2774  char *result = NULL;
2775 
2776  topic_stats = ao2_global_obj_ref(topic_statistics);
2777  if (!topic_stats) {
2778  return result;
2779  }
2780 
2781  it_statistics = ao2_iterator_init(topic_stats, 0);
2782  while ((statistics = ao2_iterator_next(&it_statistics))) {
2783  if (!strncasecmp(word, statistics->name, wordlen)
2784  && ++which > state) {
2785  result = ast_strdup(statistics->name);
2786  }
2787  ao2_ref(statistics, -1);
2788  if (result) {
2789  break;
2790  }
2791  }
2792  ao2_iterator_destroy(&it_statistics);
2793  ao2_ref(topic_stats, -1);
2794  return result;
2795 }
2796 
2797 /*!
2798  * \internal
2799  * \brief CLI command implementation for 'stasis statistics show topic'
2800  */
2801 static char *statistics_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2802 {
2803  struct stasis_topic_statistics *statistics;
2804  struct ao2_container *topic_stats;
2805  struct ao2_iterator i;
2806  char *uniqueid;
2807 
2808  switch (cmd) {
2809  case CLI_INIT:
2810  e->command = "stasis statistics show topic";
2811  e->usage =
2812  "Usage: stasis statistics show topic <name>\n"
2813  " Show stasis topic statistics.\n";
2814  return NULL;
2815  case CLI_GENERATE:
2816  if (a->pos == 4) {
2817  return topic_statistics_complete_name(a->word, a->n);
2818  } else {
2819  return NULL;
2820  }
2821  }
2822 
2823  if (a->argc != 5) {
2824  return CLI_SHOWUSAGE;
2825  }
2826 
2827  topic_stats = ao2_global_obj_ref(topic_statistics);
2828  if (!topic_stats) {
2829  ast_cli(a->fd, "Could not fetch topic_statistics container\n");
2830  return CLI_FAILURE;
2831  }
2832 
2833  statistics = ao2_find(topic_stats, a->argv[4], OBJ_SEARCH_KEY);
2834  if (!statistics) {
2835  ao2_ref(topic_stats, -1);
2836  ast_cli(a->fd, "Specified topic '%s' does not exist\n", a->argv[4]);
2837  return CLI_FAILURE;
2838  }
2839 
2840  ao2_ref(topic_stats, -1);
2841 
2842  ast_cli(a->fd, "Topic: %s\n", statistics->name);
2843  ast_cli(a->fd, "Pointer Address: %p\n", statistics->topic);
2844  ast_cli(a->fd, "Number of messages published that went to no subscriber: %d\n", statistics->messages_not_dispatched);
2845  ast_cli(a->fd, "Number of messages that went to at least one subscriber: %d\n", statistics->messages_dispatched);
2846  ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent dispatching message: %ld\n", statistics->lowest_time_dispatched);
2847  ast_cli(a->fd, "Highest amount of time (in milliseconds) spent dispatching messages: %ld\n", statistics->highest_time_dispatched);
2848  ast_cli(a->fd, "Number of subscribers: %d\n", ao2_container_count(statistics->subscribers));
2849 
2850  ast_cli(a->fd, "Subscribers:\n");
2851  i = ao2_iterator_init(statistics->subscribers, 0);
2852  while ((uniqueid = ao2_iterator_next(&i))) {
2853  ast_cli(a->fd, "\t%s\n", uniqueid);
2854  ao2_ref(uniqueid, -1);
2855  }
2857 
2858  ao2_ref(statistics, -1);
2859 
2860  return CLI_SUCCESS;
2861 }
2862 
2863 /*!
2864  * \internal
2865  * \brief CLI command implementation for 'stasis statistics show messages'
2866  */
2867 static char *statistics_show_messages(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2868 {
2869  int i;
2870  int count = 0;
2871  int published = 0;
2872  int unused = 0;
2873 #define FMT_HEADERS "%-64s %10s %10s\n"
2874 #define FMT_FIELDS "%-64s %10d %10d\n"
2875 
2876  switch (cmd) {
2877  case CLI_INIT:
2878  e->command = "stasis statistics show messages";
2879  e->usage =
2880  "Usage: stasis statistics show messages\n"
2881  " Shows a list of message types and their general statistics\n";
2882  return NULL;
2883  case CLI_GENERATE:
2884  return NULL;
2885  }
2886 
2887  if (a->argc != e->args) {
2888  return CLI_SHOWUSAGE;
2889  }
2890 
2891  ast_cli(a->fd, "\n" FMT_HEADERS, "Message Type", "Published", "Unused");
2892 
2893  ast_mutex_lock(&message_type_statistics_lock);
2894  for (i = 0; i < AST_VECTOR_SIZE(&message_type_statistics); ++i) {
2895  struct stasis_message_type_statistics *statistics = AST_VECTOR_GET_ADDR(&message_type_statistics, i);
2896 
2897  if (!statistics->message_type) {
2898  continue;
2899  }
2900 
2901  ast_cli(a->fd, FMT_FIELDS, stasis_message_type_name(statistics->message_type), statistics->published,
2902  statistics->unused);
2903  published += statistics->published;
2904  unused += statistics->unused;
2905  ++count;
2906  }
2907  ast_mutex_unlock(&message_type_statistics_lock);
2908 
2909  ast_cli(a->fd, FMT_FIELDS, "Total", published, unused);
2910  ast_cli(a->fd, "\n%d seen message types\n\n", count);
2911 
2912 #undef FMT_HEADERS
2913 #undef FMT_FIELDS
2914 
2915  return CLI_SUCCESS;
2916 }
2917 
/*! \brief CLI entries for the 'stasis statistics show ...' commands (AST_DEVMODE only) */
2918 static struct ast_cli_entry cli_stasis_statistics[] = {
2919  AST_CLI_DEFINE(statistics_show_subscriptions, "Show subscriptions with general statistics"),
2920  AST_CLI_DEFINE(statistics_show_subscription, "Show subscription statistics"),
2921  AST_CLI_DEFINE(statistics_show_topics, "Show topics with general statistics"),
2922  AST_CLI_DEFINE(statistics_show_topic, "Show topic statistics"),
2923  AST_CLI_DEFINE(statistics_show_messages, "Show message types with general statistics"),
2924 };
2925 
2926 static int subscription_statistics_hash(const void *obj, const int flags)
2927 {
2928  const struct stasis_subscription_statistics *object;
2929  const char *key;
2930 
2931  switch (flags & OBJ_SEARCH_MASK) {
2932  case OBJ_SEARCH_KEY:
2933  key = obj;
2934  break;
2935  case OBJ_SEARCH_OBJECT:
2936  object = obj;
2937  key = object->uniqueid;
2938  break;
2939  default:
2940  /* Hash can only work on something with a full key. */
2941  ast_assert(0);
2942  return 0;
2943  }
2944  return ast_str_case_hash(key);
2945 }
2946 
2947 static int subscription_statistics_cmp(void *obj, void *arg, int flags)
2948 {
2949  const struct stasis_subscription_statistics *object_left = obj;
2950  const struct stasis_subscription_statistics *object_right = arg;
2951  const char *right_key = arg;
2952  int cmp;
2953 
2954  switch (flags & OBJ_SEARCH_MASK) {
2955  case OBJ_SEARCH_OBJECT:
2956  right_key = object_right->uniqueid;
2957  /* Fall through */
2958  case OBJ_SEARCH_KEY:
2959  cmp = strcasecmp(object_left->uniqueid, right_key);
2960  break;
2962  /* Not supported by container */
2963  ast_assert(0);
2964  cmp = -1;
2965  break;
2966  default:
2967  /*
2968  * What arg points to is specific to this traversal callback
2969  * and has no special meaning to astobj2.
2970  */
2971  cmp = 0;
2972  break;
2973  }
2974  if (cmp) {
2975  return 0;
2976  }
2977  /*
2978  * At this point the traversal callback is identical to a sorted
2979  * container.
2980  */
2981  return CMP_MATCH;
2982 }
2983 
2984 static int topic_statistics_hash(const void *obj, const int flags)
2985 {
2986  const struct stasis_topic_statistics *object;
2987  const char *key;
2988 
2989  switch (flags & OBJ_SEARCH_MASK) {
2990  case OBJ_SEARCH_KEY:
2991  key = obj;
2992  break;
2993  case OBJ_SEARCH_OBJECT:
2994  object = obj;
2995  key = object->name;
2996  break;
2997  default:
2998  /* Hash can only work on something with a full key. */
2999  ast_assert(0);
3000  return 0;
3001  }
3002  return ast_str_case_hash(key);
3003 }
3004 
3005 static int topic_statistics_cmp(void *obj, void *arg, int flags)
3006 {
3007  const struct stasis_topic_statistics *object_left = obj;
3008  const struct stasis_topic_statistics *object_right = arg;
3009  const char *right_key = arg;
3010  int cmp;
3011 
3012  switch (flags & OBJ_SEARCH_MASK) {
3013  case OBJ_SEARCH_OBJECT:
3014  right_key = object_right->name;
3015  /* Fall through */
3016  case OBJ_SEARCH_KEY:
3017  cmp = strcasecmp(object_left->name, right_key);
3018  break;
3020  /* Not supported by container */
3021  ast_assert(0);
3022  cmp = -1;
3023  break;
3024  default:
3025  /*
3026  * What arg points to is specific to this traversal callback
3027  * and has no special meaning to astobj2.
3028  */
3029  cmp = 0;
3030  break;
3031  }
3032  if (cmp) {
3033  return 0;
3034  }
3035  /*
3036  * At this point the traversal callback is identical to a sorted
3037  * container.
3038  */
3039  return CMP_MATCH;
3040 }
3041 #endif
3042 
/*
 * NOTE(review): the embedded listing numbers skip 3052-3053, 3055,
 * 3057-3058 and 3060, so statements appear to be missing from this view.
 * As shown, topic_all and threadpool are only reset to NULL without a
 * visible release -- confirm against the original file before editing.
 */
3043 /*! \brief Cleanup function for graceful shutdowns */
3044 static void stasis_cleanup(void)
3045 {
3046 #ifdef AST_DEVMODE
/* Tear down the statistics CLI commands and statistics storage first. */
3047  ast_cli_unregister_multiple(cli_stasis_statistics, ARRAY_LEN(cli_stasis_statistics));
3048  AST_VECTOR_FREE(&message_type_statistics);
3049  ao2_global_obj_release(subscription_statistics);
3050  ao2_global_obj_release(topic_statistics);
3051 #endif
3054  topic_all = NULL;
3056  threadpool = NULL;
3059  aco_info_destroy(&cfg_info);
3061 }
3062 
/*!
 * \brief Initialize the Stasis message bus.
 *
 * As far as this listing shows, the function processes the Stasis
 * configuration (installing a defaults-only configuration when processing
 * fails), creates the "stasis" threadpool from the threadpool options,
 * calls stasis_cache_init(), allocates topic_all and, under AST_DEVMODE,
 * creates the statistics containers and registers the statistics CLI
 * commands.
 *
 * NOTE(review): the embedded listing numbers skip 3074, 3081, 3083, 3087,
 * 3091, 3146, 3149, 3153 and 3159, so several statements (including the
 * conditions guarding some "return -1" lines) are not visible here.
 *
 * \retval 0 on success.
 * \retval -1 on failure.
 */
3063 int stasis_init(void)
3064 {
3065  struct stasis_config *cfg;
3066  int cache_init;
3067  struct ast_threadpool_options threadpool_opts = { 0, };
3068 #ifdef AST_DEVMODE
3069  struct ao2_container *subscription_stats;
3070  struct ao2_container *topic_stats;
3071 #endif
3072 
3073  /* Be sure the types are cleaned up after the message bus */
3075 
3076  if (aco_info_init(&cfg_info)) {
3077  return -1;
3078  }
3079 
3080  aco_option_register_custom(&cfg_info, "decline", ACO_EXACT,
3082  aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
3084  FLDSET(struct stasis_threadpool_conf, initial_size), 0,
3085  INT_MAX);
3086  aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
3088  FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
3089  INT_MAX);
3090  aco_option_register(&cfg_info, "max_size", ACO_EXACT,
3092  FLDSET(struct stasis_threadpool_conf, max_size), 0,
3093  INT_MAX);
3094 
/*
 * If the configuration cannot be processed, build a configuration object
 * holding only defaults and install it in the globals holder instead.
 */
3095  if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
3096  struct stasis_config *default_cfg = stasis_config_alloc();
3097 
3098  if (!default_cfg) {
3099  return -1;
3100  }
3101 
3102  if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) {
3103  ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
3104  ao2_ref(default_cfg, -1);
3105 
3106  return -1;
3107  }
3108 
3109  if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) {
3110  ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
3111  ao2_ref(default_cfg, -1);
3112 
3113  return -1;
3114  }
3115 
3116  ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n");
3117  ao2_global_obj_replace_unref(globals, default_cfg);
3118  cfg = default_cfg;
3119  } else {
3120  cfg = ao2_global_obj_ref(globals);
3121  if (!cfg) {
3122  ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
3123 
3124  return -1;
3125  }
3126  }
3127 
/* Size the threadpool from the configuration obtained above. */
3128  threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION;
3129  threadpool_opts.initial_size = cfg->threadpool_options->initial_size;
3130  threadpool_opts.auto_increment = 1;
3131  threadpool_opts.max_size = cfg->threadpool_options->max_size;
3132  threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
3133  threadpool = ast_threadpool_create("stasis", NULL, &threadpool_opts);
/* cfg is not used past this point, so the local reference is dropped here. */
3134  ao2_ref(cfg, -1);
3135  if (!threadpool) {
/* NOTE(review): the message says 'stasis-core' but the pool is created as "stasis". */
3136  ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
3137 
3138  return -1;
3139  }
3140 
3141  cache_init = stasis_cache_init();
3142  if (cache_init != 0) {
3143  return -1;
3144  }
3145 
3147  return -1;
3148  }
3150  return -1;
3151  }
3152 
3154  topic_proxy_hash_fn, 0, topic_proxy_cmp_fn);
3155  if (!topic_all) {
3156  return -1;
3157  }
3158 
3160  return -1;
3161  }
3162 
3163 #ifdef AST_DEVMODE
3164  /* Statistics information is stored separately so that we don't alter or interrupt the lifetime of the underlying
3165  * topic or subscription.
3166  */
3167  subscription_stats = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, SUBSCRIPTION_STATISTICS_BUCKETS,
3168  subscription_statistics_hash, 0, subscription_statistics_cmp);
3169  if (!subscription_stats) {
3170  return -1;
3171  }
/* The global holder now references the container; drop the local reference. */
3172  ao2_global_obj_replace_unref(subscription_statistics, subscription_stats);
3173  ao2_cleanup(subscription_stats);
3174 
3175  topic_stats = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, TOPIC_STATISTICS_BUCKETS,
3176  topic_statistics_hash, 0, topic_statistics_cmp);
3177  if (!topic_stats) {
3178  return -1;
3179  }
3180  ao2_global_obj_replace_unref(topic_statistics, topic_stats);
3181  ao2_cleanup(topic_stats);
/* NOTE(review): topic_stats was already checked for NULL above, so this second check can never fire. */
3182  if (!topic_stats) {
3183  return -1;
3184  }
3185 
3186  AST_VECTOR_INIT(&message_type_statistics, 0);
3187 
3188  if (ast_cli_register_multiple(cli_stasis_statistics, ARRAY_LEN(cli_stasis_statistics))) {
3189  return -1;
3190  }
3191 #endif
3192 
3193  return 0;
3194 }
#define var
Definition: ast_expr2f.c:614
Asterisk main include file. File version handling, generic pbx functions.
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
Definition: clicompat.c:19
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
Definition: astmm.h:288
#define ast_free(a)
Definition: astmm.h:180
#define ast_strdup(str)
A wrapper for strdup()
Definition: astmm.h:241
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:267
#define ast_calloc(num, len)
A wrapper for calloc()
Definition: astmm.h:202
#define ast_log
Definition: astobj2.c:42
int ao2_container_dup(struct ao2_container *dest, struct ao2_container *src, enum search_flags flags)
Copy all object references in the src container into the dest container.
int ao2_weakproxy_subscribe(void *weakproxy, ao2_weakproxy_notification_cb cb, void *data, int flags)
Request notification when weakproxy points to NULL.
Definition: astobj2.c:934
#define ao2_iterator_next(iter)
Definition: astobj2.h:1911
#define ao2_link(container, obj)
Add an object to a container.
Definition: astobj2.h:1532
@ CMP_MATCH
Definition: astobj2.h:1027
@ AO2_ALLOC_OPT_LOCK_NOLOCK
Definition: astobj2.h:367
@ AO2_ALLOC_OPT_LOCK_MUTEX
Definition: astobj2.h:363
#define ao2_wrlock(a)
Definition: astobj2.h:719
#define ao2_global_obj_replace_unref(holder, obj)
Replace an ao2 object in the global holder, throwing away any old object.
Definition: astobj2.h:901
void ao2_container_unregister(const char *name)
Unregister a container for CLI stats and integrity check.
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
Definition: astobj2.c:476
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
#define ao2_unlink(container, obj)
Remove an object from a container.
Definition: astobj2.h:1578
@ AO2_ITERATOR_UNLINK
Definition: astobj2.h:1863
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
Definition: astobj2.h:1554
#define ao2_global_obj_ref(holder)
Get a reference to the object stored in the global holder.
Definition: astobj2.h:918
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1736
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
#define ao2_t_weakproxy_alloc(data_size, destructor_fn, tag)
Definition: astobj2.h:553
#define AO2_STRING_FIELD_SORT_FN(stype, field)
Creates a sort function for a structure string field.
Definition: astobj2.h:2064
#define ao2_container_alloc_rbtree(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a red-black tree container.
Definition: astobj2.h:1349
#define ao2_unlock(a)
Definition: astobj2.h:729
#define ao2_weakproxy_find(c, arg, flags, tag)
Perform an ao2_find on a container with ao2_weakproxy objects, returning the real object.
Definition: astobj2.h:1748
#define ao2_lock(a)
Definition: astobj2.h:717
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:404
int ao2_container_register(const char *name, struct ao2_container *self, ao2_prnt_obj_fn *prnt_obj)
Register a container for CLI stats and integrity check.
#define ao2_global_obj_release(holder)
Release the ao2 object held in the global holder.
Definition: astobj2.h:859
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition: astobj2.h:480
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
#define ao2_t_alloc(data_size, destructor_fn, debug_msg)
Definition: astobj2.h:407
@ OBJ_SEARCH_PARTIAL_KEY
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
Definition: astobj2.h:1116
@ OBJ_SEARCH_OBJECT
The arg parameter is an object of the same type.
Definition: astobj2.h:1087
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
Definition: astobj2.h:1063
@ OBJ_NODATA
Definition: astobj2.h:1044
@ OBJ_SEARCH_MASK
Search option field mask.
Definition: astobj2.h:1072
@ OBJ_UNLINK
Definition: astobj2.h:1039
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1101
#define ao2_container_alloc_list(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a list container.
Definition: astobj2.h:1327
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:409
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition: astobj2.h:1303
void() ao2_prnt_fn(void *where, const char *fmt,...)
Print output.
Definition: astobj2.h:1435
#define ao2_t_weakproxy_set_object(weakproxy, obj, flags, tag)
Definition: astobj2.h:582
static PGresult * result
Definition: cel_pgsql.c:84
static struct console_pvt globals
static const char type[]
Definition: chan_ooh323.c:109
struct stasis_topic * ast_channel_topic(struct ast_channel *chan)
A topic which publishes the events for a particular channel.
Standard Command Line Interface.
#define CLI_SHOWUSAGE
Definition: cli.h:45
#define CLI_SUCCESS
Definition: cli.h:44
int ast_cli_unregister_multiple(struct ast_cli_entry *e, int len)
Unregister multiple commands.
Definition: clicompat.c:30
#define AST_CLI_DEFINE(fn, txt,...)
Definition: cli.h:197
int ast_cli_completion_add(char *value)
Add a result to a request for completion options.
Definition: main/cli.c:2760
void ast_cli(int fd, const char *fmt,...)
Definition: clicompat.c:6
@ CLI_INIT
Definition: cli.h:152
@ CLI_GENERATE
Definition: cli.h:153
#define CLI_FAILURE
Definition: cli.h:46
#define ast_cli_register_multiple(e, len)
Register multiple commands.
Definition: cli.h:265
short word
Configuration option-handling.
@ ACO_EXACT
int aco_set_defaults(struct aco_type *type, const char *category, void *obj)
Set all default options of obj.
void aco_info_destroy(struct aco_info *info)
Destroy an initialized aco_info struct.
@ ACO_PROCESS_ERROR
There was an error and no changes were applied.
int aco_info_init(struct aco_info *info)
Initialize an aco_info structure.
#define FLDSET(type,...)
Convert a struct and list of fields to an argument list of field offsets.
#define aco_option_register(info, name, matchtype, types, default_val, opt_type, flags,...)
Register a config option.
#define ACO_FILES(...)
@ OPT_INT_T
Type for default option handler for signed integers.
#define aco_option_register_custom(info, name, matchtype, types, default_val, handler, flags)
Register a config option.
@ ACO_GLOBAL
@ ACO_WHITELIST_EXACT
enum aco_process_status aco_process_config(struct aco_info *info, int reload)
Process a config info via the options registered with an aco_info.
#define ACO_TYPES(...)
A helper macro to ensure that aco_info types always have a sentinel.
static const char name[]
Definition: format_mp3.c:68
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
Definition: func_strings.c:734
struct ast_str * ast_manager_str_from_json_object(struct ast_json *blob, key_exclusion_cb exclusion_cb)
Convert a JSON object into an AMI compatible string.
Definition: manager.c:1828
struct ast_multi_object_blob * ast_multi_object_blob_create(struct ast_json *blob)
Create a stasis user event multi object blob.
Definition: stasis.c:1977
void ast_multi_object_blob_add(struct ast_multi_object_blob *multi, enum stasis_user_multi_object_snapshot_type type, void *object)
Add an object (snapshot) to the blob.
Definition: stasis.c:2003
stasis_user_multi_object_snapshot_type
Object type code for multi user object snapshots.
Definition: stasis.h:1353
struct ast_channel_snapshot * ast_channel_snapshot_create(struct ast_channel *chan)
Generate a snapshot of the channel state. This is an ao2 object, so use ao2_cleanup() to deallocate.
void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob)
Publish single channel user event (for app_userevent compatibility)
Definition: stasis.c:2012
struct stasis_message_type * ast_multi_user_event_type(void)
Message type for custom user defined events with multi object blobs.
#define STASIS_UMOS_MAX
Number of snapshot types.
Definition: stasis.h:1360
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
@ STASIS_UMOS_ENDPOINT
Definition: stasis.h:1356
@ STASIS_UMOS_BRIDGE
Definition: stasis.h:1355
@ STASIS_UMOS_CHANNEL
Definition: stasis.h:1354
#define ast_debug(level,...)
Log a DEBUG message.
#define LOG_ERROR
#define LOG_NOTICE
struct ast_json * ast_json_ref(struct ast_json *value)
Increase refcount on value.
Definition: json.c:67
struct ast_json * ast_json_timeval(const struct timeval tv, const char *zone)
Construct a timeval as JSON.
Definition: json.c:649
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition: json.c:73
struct ast_json * ast_json_object_get(struct ast_json *object, const char *key)
Get a field from a JSON object.
Definition: json.c:397
struct ast_json * ast_json_object_create(void)
Create a new JSON object.
Definition: json.c:389
const char * ast_json_string_get(const struct ast_json *string)
Get the value of a JSON string.
Definition: json.c:273
int ast_json_object_set(struct ast_json *object, const char *key, struct ast_json *value)
Set a field in a JSON object.
Definition: json.c:404
struct ast_json * ast_json_string_create(const char *value)
Construct a JSON string from value.
Definition: json.c:268
#define ast_cond_destroy(cond)
Definition: lock.h:200
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:602
#define ast_cond_wait(cond, mutex)
Definition: lock.h:203
#define ast_cond_init(cond, attr)
Definition: lock.h:199
#define ast_mutex_init(pmutex)
Definition: lock.h:184
#define ast_mutex_unlock(a)
Definition: lock.h:188
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition: lock.h:755
pthread_cond_t ast_cond_t
Definition: lock.h:176
#define ast_mutex_destroy(a)
Definition: lock.h:186
#define ast_mutex_lock(a)
Definition: lock.h:187
#define AST_MUTEX_DEFINE_STATIC(mutex)
Definition: lock.h:518
#define ast_cond_signal(cond)
Definition: lock.h:201
struct ast_manager_event_blob * ast_manager_event_blob_create(int event_flags, const char *manager_event, const char *extra_fields_fmt,...)
Construct an ast_manager_event_blob.
Definition: manager.c:9760
struct ast_str * ast_manager_build_channel_state_string_prefix(const struct ast_channel_snapshot *snapshot, const char *prefix)
Generate the AMI message body from a channel snapshot.
#define EVENT_FLAG_USER
Definition: manager.h:81
struct ast_str * ast_manager_build_bridge_state_string_prefix(const struct ast_bridge_snapshot *snapshot, const char *prefix)
Generate the AMI message body from a bridge snapshot.
struct stasis_forward * sub
Definition: res_corosync.c:240
struct ao2_container * container
Definition: res_fax.c:501
static void to_ami(struct ast_sip_subscription *sub, struct ast_str **buf)
#define NULL
Definition: resample.c:96
struct stasis_topic * stasis_topic_get(const char *name)
Get a topic of the given name.
Definition: stasis.c:624
int stasis_subscription_is_done(struct stasis_subscription *subscription)
Returns whether subscription has received its final message.
Definition: stasis.c:1120
static char * stasis_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
Definition: stasis.c:2336
static struct ast_manager_event_blob * multi_user_event_to_ami(struct stasis_message *message)
Definition: stasis.c:2157
static struct ast_cli_entry cli_stasis[]
Definition: stasis.c:2478
#define INITIAL_SUBSCRIBERS_MAX
Definition: stasis.c:303
static void subscription_change_dtor(void *obj)
Definition: stasis.c:1622
void stasis_topic_pool_delete_topic(struct stasis_topic_pool *pool, const char *topic_name)
Delete a topic from the topic pool.
Definition: stasis.c:1866
struct stasis_topic * stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
Find or create a topic in the pool.
Definition: stasis.c:1886
struct stasis_subscription * stasis_unsubscribe_and_join(struct stasis_subscription *subscription)
Cancel a subscription, blocking until the last message is processed.
Definition: stasis.c:1136
int stasis_subscription_decline_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are not interested in a message type.
Definition: stasis.c:1055
#define FMT_HEADERS
struct stasis_subscription * __stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription.
Definition: stasis.c:944
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition: stasis.c:1580
static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
Definition: stasis.c:1288
#define FMT_FIELDS
int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription, long low_water, long high_water)
Set the high and low alert water marks of the stasis subscription.
Definition: stasis.c:1013
static struct topic_pool_entry * topic_pool_entry_alloc(const char *topic_name)
Definition: stasis.c:1726
AO2_STRING_FIELD_HASH_FN(topic_proxy, name)
static int sub_cleanup(void *data)
Definition: stasis.c:966
static void forward_dtor(void *obj)
Definition: stasis.c:1540
static struct ast_json * multi_user_event_to_json(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
Definition: stasis.c:2047
size_t stasis_topic_subscribers(const struct stasis_topic *topic)
Return the number of subscribers of a topic.
Definition: stasis.c:645
static void topic_pool_dtor(void *obj)
Definition: stasis.c:1746
void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
Publish a message to a topic's subscribers, synchronizing on the specified subscriber.
Definition: stasis.c:1518
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
Definition: stasis.c:1025
static void * stasis_config_alloc(void)
Definition: stasis.c:2255
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
Definition: stasis.c:1079
static void topic_dtor(void *obj)
Definition: stasis.c:435
#define TOPIC_POOL_BUCKETS
Definition: stasis.c:306
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
Definition: stasis.c:1550
static void stasis_cleanup(void)
Cleanup function for graceful shutdowns.
Definition: stasis.c:3044
static struct ast_str * multi_object_blob_to_ami(void *obj)
Definition: stasis.c:2098
int stasis_init(void)
Initialize the Stasis subsystem.
Definition: stasis.c:3063
static void proxy_dtor(void *weakproxy, void *container)
Definition: stasis.c:414
void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Stasis subscription callback function that does nothing.
Definition: stasis.c:811
static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
Definition: stasis.c:1786
static void stasis_config_destructor(void *obj)
Definition: stasis.c:2247
void stasis_subscription_join(struct stasis_subscription *subscription)
Block until the last message is processed on a subscription.
Definition: stasis.c:1107
static char * topic_complete_name(const char *word)
Definition: stasis.c:2393
struct stasis_topic * stasis_topic_create_with_detail(const char *name, const char *detail)
Create a new topic with given detail.
Definition: stasis.c:570
struct stasis_subscription * __stasis_subscribe_pool(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription whose callbacks occur on a thread pool.
Definition: stasis.c:955
static AO2_GLOBAL_OBJ_STATIC(globals)
A global object container that will contain the stasis_config that gets swapped out on reloads.
int stasis_message_type_declined(const char *name)
Check whether a message type is declined.
Definition: stasis.c:2285
static struct ast_threadpool * threadpool
Definition: stasis.c:309
const char * stasis_topic_detail(const struct stasis_topic *topic)
Return the detail of a topic.
Definition: stasis.c:637
static int topic_pool_entry_hash(const void *obj, const int flags)
Definition: stasis.c:1765
CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,.files=ACO_FILES(&stasis_conf),)
Register information about the configs being processed by this module.
void stasis_log_bad_type_access(const char *name)
Definition: stasis.c:1942
static char * stasis_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
Definition: stasis.c:2419
static struct aco_type threadpool_option
Definition: stasis.c:2204
static void multi_object_blob_dtor(void *obj)
Definition: stasis.c:1961
static int link_topic_proxy(struct stasis_topic *topic, const char *name, const char *detail)
Definition: stasis.c:502
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Add a subscriber to a topic.
Definition: stasis.c:1203
static void subscription_dtor(void *obj)
Definition: stasis.c:716
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
Definition: stasis.c:1176
const char * stasis_subscription_uniqueid(const struct stasis_subscription *sub)
Get the unique ID for the subscription.
Definition: stasis.c:1171
static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition: stasis.c:1678
static struct stasis_subscription_change * subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
Definition: stasis.c:1629
int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name)
Check if a topic exists in a pool.
Definition: stasis.c:1929
static void publish_msg(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sync_sub)
Definition: stasis.c:1432
struct aco_file stasis_conf
Definition: stasis.c:2225
static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition: stasis.c:1650
static void stasis_declined_config_destructor(void *obj)
Definition: stasis.c:2240
static struct aco_type * threadpool_options[]
Definition: stasis.c:2212
static unsigned int dispatch_message(struct stasis_subscription *sub, struct stasis_message *message, int synchronous)
Definition: stasis.c:1314
void stasis_subscription_accept_formatters(struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters)
Indicate to a subscription that we are interested in messages with one or more formatters.
Definition: stasis.c:1095
static void subscription_invoke(struct stasis_subscription *sub, struct stasis_message *message)
Invoke the subscription's callback.
Definition: stasis.c:756
struct stasis_subscription * internal_stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func)
Create a subscription.
Definition: stasis.c:858
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *sub)
Cancel a subscription.
Definition: stasis.c:973
AO2_STRING_FIELD_CASE_SORT_FN(topic_proxy, name)
static void topic_pool_entry_dtor(void *obj)
Definition: stasis.c:1717
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
Definition: stasis.c:1513
static int dispatch_exec_async(struct ast_taskprocessor_local *local)
Definition: stasis.c:1261
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition: stasis.c:619
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
Returns whether a subscription is currently subscribed.
Definition: stasis.c:1152
static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
Definition: stasis.c:2306
AO2_STRING_FIELD_CMP_FN(topic_proxy, name)
static int userevent_exclusion_cb(const char *key)
Definition: stasis.c:2149
struct aco_type * declined_options[]
Definition: stasis.c:2223
#define topic_lock_both(topic1, topic2)
Lock two topics.
Definition: stasis.c:427
struct ao2_container * topic_all
Definition: stasis.c:397
struct stasis_topic_pool * stasis_topic_pool_create(struct stasis_topic *pooled_topic)
Create a topic pool that routes messages from dynamically generated topics to the given topic.
Definition: stasis.c:1835
static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition: stasis.c:1231
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:629
#define TOPIC_ALL_BUCKETS
Definition: stasis.c:319
STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type)
static struct aco_type declined_option
An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type.
Definition: stasis.c:2215
Stasis Message Bus API. See Stasis Message Bus API for detailed documentation.
const struct timeval * stasis_message_timestamp(const struct stasis_message *msg)
Get the time when a message was created.
void(* stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Callback function type for Stasis subscriptions.
Definition: stasis.h:611
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
Definition: stasis.h:1515
stasis_subscription_message_filter
Stasis subscription message filters.
Definition: stasis.h:294
@ STASIS_SUBSCRIPTION_FILTER_SELECTIVE
Definition: stasis.h:297
@ STASIS_SUBSCRIPTION_FILTER_FORCED_NONE
Definition: stasis.h:296
@ STASIS_SUBSCRIPTION_FILTER_NONE
Definition: stasis.h:295
stasis_subscription_message_formatters
Stasis subscription formatter filters.
Definition: stasis.h:308
@ STASIS_SUBSCRIPTION_FORMATTER_NONE
Definition: stasis.h:309
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
Definition: stasis.h:1493
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
int stasis_cache_init(void)
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
int stasis_message_type_id(const struct stasis_message_type *type)
Gets the id of a given message type.
enum stasis_subscription_message_formatters stasis_message_type_available_formatters(const struct stasis_message_type *message_type)
Get a bitmap of available formatters for a message type.
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
struct ast_json * ast_bridge_snapshot_to_json(const struct ast_bridge_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from an ast_bridge_snapshot.
struct ast_json * ast_channel_snapshot_to_json(const struct ast_channel_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from an ast_channel_snapshot.
Endpoint abstractions.
struct ast_json * ast_endpoint_snapshot_to_json(const struct ast_endpoint_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from an ast_endpoint_snapshot.
Internal Stasis APIs.
static int message_type_id
int ast_str_append(struct ast_str **buf, ssize_t max_len, const char *fmt,...)
Append to a thread local dynamic string.
Definition: strings.h:1117
char * ast_str_buffer(const struct ast_str *buf)
Returns the string buffer within the ast_str buf.
Definition: strings.h:739
#define S_OR(a, b)
Returns the equivalent of logical OR for strings: the first one if not empty, otherwise the second one.
Definition: strings.h:80
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition: strings.h:65
#define ast_str_container_alloc(buckets)
Allocates a hash container for bare strings.
Definition: strings.h:1343
#define ast_str_create(init_len)
Create a malloc'ed dynamic length string.
Definition: strings.h:640
static force_inline int attribute_pure ast_str_case_hash(const char *str)
Compute a hash value on a case-insensitive string.
Definition: strings.h:1281
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
Definition: strings.h:406
void ast_str_container_remove(struct ao2_container *str_container, const char *remove)
Removes a string from a string container allocated by ast_str_container_alloc.
Definition: strings.c:222
int ast_str_container_add(struct ao2_container *str_container, const char *add)
Adds a string to a string container allocated by ast_str_container_alloc.
Definition: strings.c:206
The representation of a single configuration file to be processed.
const char * filename
Type information about a category-level configurable object.
enum aco_type_t type
Generic container type.
When we need to walk through a container, we use an ao2_iterator to keep track of the current position.
Definition: astobj2.h:1821
Structure representing a snapshot of channel state.
Main Channel structure associated with a channel.
Descriptor for a CLI entry.
Definition: cli.h:171
int args
This gets set in ast_cli_register()
Definition: cli.h:185
char * command
Definition: cli.h:186
const char * usage
Definition: cli.h:177
Abstract JSON element (object, array, string, int, ...).
Struct containing info for an AMI event to send out.
Definition: manager.h:490
A multi object blob data structure to carry user event stasis messages.
Definition: stasis.c:1952
struct ast_multi_object_blob::@422 snapshots[STASIS_UMOS_MAX]
struct ast_json * blob
Definition: stasis.c:1953
Structure for mutex and tracking information.
Definition: lock.h:135
Support for dynamic strings.
Definition: strings.h:604
Local data parameter.
An ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
int idle_timeout
Time limit in seconds for idle threads.
Definition: threadpool.h:79
int max_size
Maximum number of threads a pool may have.
Definition: threadpool.h:110
int auto_increment
Number of threads to increment pool by.
Definition: threadpool.h:90
int initial_size
Number of threads the pool will start with.
Definition: threadpool.h:100
An opaque threadpool structure.
Definition: threadpool.c:36
Structure for variables, used for configurations and for channel variables.
Definition: search.h:40
struct stasis_declined_config * declined_message_types
Definition: stasis.c:2201
struct stasis_threadpool_conf * threadpool_options
Definition: stasis.c:2199
A structure to hold global configuration-related options.
Definition: stasis.c:2182
struct ao2_container * declined
Definition: stasis.c:2184
Forwarding information.
Definition: stasis.c:1533
struct stasis_topic * from_topic
Definition: stasis.c:1535
struct stasis_topic * to_topic
Definition: stasis.c:1537
Structure containing callbacks for Stasis message sanitization.
Definition: stasis.h:200
Holds details about changes to subscriptions for the specified topic.
Definition: stasis.h:890
struct stasis_topic * topic
Definition: stasis.h:891
int final_message_rxed
Definition: stasis.c:698
struct stasis_topic * topic
Definition: stasis.c:686
struct stasis_subscription::@421 accepted_message_types
ast_cond_t join_cond
Definition: stasis.c:695
stasis_subscription_cb callback
Definition: stasis.c:690
struct ast_taskprocessor * mailbox
Definition: stasis.c:688
enum stasis_subscription_message_filter filter
Definition: stasis.c:708
int final_message_processed
Definition: stasis.c:701
enum stasis_subscription_message_formatters accepted_formatters
Definition: stasis.c:706
Threadpool configuration options.
Definition: stasis.c:2188
struct ao2_container * pool_container
Definition: stasis.c:1742
struct stasis_topic * pool_topic
Definition: stasis.c:1743
char * name
Definition: stasis.c:388
struct stasis_topic::@419 subscribers
struct stasis_topic::@420 upstream_topics
int subscriber_id
Definition: stasis.c:385
char * detail
Definition: stasis.c:391
struct timeval * creationtime
Definition: stasis.c:394
ast_cond_t cond
Definition: stasis.c:1278
void * task_data
Definition: stasis.c:1280
ast_mutex_t lock
Definition: stasis.c:1277
Definition: stasis.c:1711
struct stasis_topic * topic
Definition: stasis.c:1713
struct stasis_forward * forward
Definition: stasis.c:1712
char name[0]
Definition: stasis.c:1714
char buf[0]
Definition: stasis.c:407
struct timeval creationtime
Definition: stasis.c:405
char * name
Definition: stasis.c:402
char * detail
Definition: stasis.c:403
An API for managing task processing threads that can be shared across modules.
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
Sets the local data associated with a taskprocessor.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor, decrementing its reference count.
@ TPS_REF_DEFAULT
Return a reference to a taskprocessor, creating one if it does not exist.
Definition: taskprocessor.h:76
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int(*task_exe)(struct ast_taskprocessor_local *local), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format,...)
Build a taskprocessor name with a sequence number on the end.
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.
#define AST_TASKPROCESSOR_MAX_NAME
Suggested maximum taskprocessor name length (less null terminator).
Definition: taskprocessor.h:61
static struct test_val a
void ast_threadpool_shutdown(struct ast_threadpool *pool)
Shut down a threadpool and destroy it.
Definition: threadpool.c:966
struct ast_taskprocessor * ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
Serialized execution of tasks within an ast_threadpool.
Definition: threadpool.c:1428
struct ast_threadpool * ast_threadpool_create(const char *name, struct ast_threadpool_listener *listener, const struct ast_threadpool_options *options)
Create a new threadpool.
Definition: threadpool.c:916
#define AST_THREADPOOL_OPTIONS_VERSION
Definition: threadpool.h:71
void ast_format_duration_hh_mm_ss(int duration, char *buf, size_t length)
Formats a duration into HH:MM:SS.
Definition: main/utils.c:2195
int64_t ast_tvdiff_ms(struct timeval end, struct timeval start)
Computes the difference (in milliseconds) between two struct timeval instances.
Definition: time.h:105
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:157
FILE * out
Definition: utils/frame.c:33
static void statistics(void)
Definition: utils/frame.c:287
Utility functions.
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:936
#define ast_assert(a)
Definition: utils.h:734
#define ARRAY_LEN(a)
Definition: utils.h:661
Universally unique identifier support.
Vector container support.
#define AST_VECTOR_REPLACE(vec, idx, elem)
Replace an element at a specific position in a vector, growing the vector if needed.
Definition: vector.h:284
#define AST_VECTOR_ELEM_CLEANUP_NOOP(elem)
Vector element cleanup that does nothing.
Definition: vector.h:571
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:609
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition: vector.h:174
#define AST_VECTOR_REMOVE_ELEM_UNORDERED(vec, elem, cleanup)
Remove an element from a vector.
Definition: vector.h:583
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256
#define AST_VECTOR(name, type)
Define a vector structure.
Definition: vector.h:44
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:680
#define AST_VECTOR_GET_ADDR(vec, idx)
Get an address of element in a vector.
Definition: vector.h:668