Asterisk - The Open Source Telephony Project  GIT-master-e8cda4b
stasis.c
Go to the documentation of this file.
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 
19 /*! \file
20  *
21  * \brief Stasis Message Bus API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25 
26 /*** MODULEINFO
27  <support_level>core</support_level>
28  ***/
29 
30 #include "asterisk.h"
31 
32 #include "asterisk/astobj2.h"
34 #include "asterisk/stasis.h"
35 #include "asterisk/taskprocessor.h"
36 #include "asterisk/threadpool.h"
37 #include "asterisk/utils.h"
38 #include "asterisk/uuid.h"
39 #include "asterisk/vector.h"
44 #include "asterisk/cli.h"
45 
46 /*** DOCUMENTATION
47  <managerEvent language="en_US" name="UserEvent">
48  <managerEventInstance class="EVENT_FLAG_USER">
49  <synopsis>A user defined event raised from the dialplan.</synopsis>
50  <syntax>
51  <channel_snapshot/>
52  <parameter name="UserEvent">
53  <para>The event name, as specified in the dialplan.</para>
54  </parameter>
55  </syntax>
56  <description>
57  <para>Event may contain additional arbitrary parameters in addition to optional bridge and endpoint snapshots. Multiple snapshots of the same type are prefixed with a numeric value.</para>
58  </description>
59  <see-also>
60  <ref type="application">UserEvent</ref>
61  <ref type="managerEvent">UserEvent</ref>
62  </see-also>
63  </managerEventInstance>
64  </managerEvent>
65  <configInfo name="stasis" language="en_US">
66  <configFile name="stasis.conf">
67  <configObject name="threadpool">
68  <synopsis>Settings that configure the threadpool Stasis uses to deliver some messages.</synopsis>
69  <configOption name="initial_size" default="5">
70  <synopsis>Initial number of threads in the message bus threadpool.</synopsis>
71  </configOption>
72  <configOption name="idle_timeout_sec" default="20">
73  <synopsis>Number of seconds before an idle thread is disposed of.</synopsis>
74  </configOption>
75  <configOption name="max_size" default="50">
76  <synopsis>Maximum number of threads in the threadpool.</synopsis>
77  </configOption>
78  </configObject>
79  <configObject name="declined_message_types">
80  <synopsis>Stasis message types for which to decline creation.</synopsis>
81  <configOption name="decline">
82  <synopsis>The message type to decline.</synopsis>
83  <description>
84  <para>This configuration option defines the name of the Stasis
85  message type that Asterisk is forbidden from creating and can be
86  specified as many times as necessary to achieve the desired result.</para>
87  <enumlist>
88  <enum name="stasis_app_recording_snapshot_type" />
89  <enum name="stasis_app_playback_snapshot_type" />
90  <enum name="stasis_test_message_type" />
91  <enum name="confbridge_start_type" />
92  <enum name="confbridge_end_type" />
93  <enum name="confbridge_join_type" />
94  <enum name="confbridge_leave_type" />
95  <enum name="confbridge_start_record_type" />
96  <enum name="confbridge_stop_record_type" />
97  <enum name="confbridge_mute_type" />
98  <enum name="confbridge_unmute_type" />
99  <enum name="confbridge_talking_type" />
100  <enum name="cel_generic_type" />
101  <enum name="ast_bridge_snapshot_type" />
102  <enum name="ast_bridge_merge_message_type" />
103  <enum name="ast_channel_entered_bridge_type" />
104  <enum name="ast_channel_left_bridge_type" />
105  <enum name="ast_blind_transfer_type" />
106  <enum name="ast_attended_transfer_type" />
107  <enum name="ast_endpoint_snapshot_type" />
108  <enum name="ast_endpoint_state_type" />
109  <enum name="ast_device_state_message_type" />
110  <enum name="ast_test_suite_message_type" />
111  <enum name="ast_mwi_state_type" />
112  <enum name="ast_mwi_vm_app_type" />
113  <enum name="ast_format_register_type" />
114  <enum name="ast_format_unregister_type" />
115  <enum name="ast_manager_get_generic_type" />
116  <enum name="ast_parked_call_type" />
117  <enum name="ast_channel_snapshot_type" />
118  <enum name="ast_channel_dial_type" />
119  <enum name="ast_channel_varset_type" />
120  <enum name="ast_channel_hangup_request_type" />
121  <enum name="ast_channel_dtmf_begin_type" />
122  <enum name="ast_channel_dtmf_end_type" />
123  <enum name="ast_channel_flash_type" />
124  <enum name="ast_channel_hold_type" />
125  <enum name="ast_channel_unhold_type" />
126  <enum name="ast_channel_chanspy_start_type" />
127  <enum name="ast_channel_chanspy_stop_type" />
128  <enum name="ast_channel_fax_type" />
129  <enum name="ast_channel_hangup_handler_type" />
130  <enum name="ast_channel_moh_start_type" />
131  <enum name="ast_channel_moh_stop_type" />
132  <enum name="ast_channel_monitor_start_type" />
133  <enum name="ast_channel_monitor_stop_type" />
134  <enum name="ast_channel_mixmonitor_start_type" />
135  <enum name="ast_channel_mixmonitor_stop_type" />
136  <enum name="ast_channel_mixmonitor_mute_type" />
137  <enum name="ast_channel_agent_login_type" />
138  <enum name="ast_channel_agent_logoff_type" />
139  <enum name="ast_channel_talking_start" />
140  <enum name="ast_channel_talking_stop" />
141  <enum name="ast_security_event_type" />
142  <enum name="ast_named_acl_change_type" />
143  <enum name="ast_local_bridge_type" />
144  <enum name="ast_local_optimization_begin_type" />
145  <enum name="ast_local_optimization_end_type" />
146  <enum name="stasis_subscription_change_type" />
147  <enum name="ast_multi_user_event_type" />
148  <enum name="stasis_cache_clear_type" />
149  <enum name="stasis_cache_update_type" />
150  <enum name="ast_network_change_type" />
151  <enum name="ast_system_registry_type" />
152  <enum name="ast_cc_available_type" />
153  <enum name="ast_cc_offertimerstart_type" />
154  <enum name="ast_cc_requested_type" />
155  <enum name="ast_cc_requestacknowledged_type" />
156  <enum name="ast_cc_callerstopmonitoring_type" />
157  <enum name="ast_cc_callerstartmonitoring_type" />
158  <enum name="ast_cc_callerrecalling_type" />
159  <enum name="ast_cc_recallcomplete_type" />
160  <enum name="ast_cc_failure_type" />
161  <enum name="ast_cc_monitorfailed_type" />
162  <enum name="ast_presence_state_message_type" />
163  <enum name="ast_rtp_rtcp_sent_type" />
164  <enum name="ast_rtp_rtcp_received_type" />
165  <enum name="ast_call_pickup_type" />
166  <enum name="aoc_s_type" />
167  <enum name="aoc_d_type" />
168  <enum name="aoc_e_type" />
169  <enum name="dahdichannel_type" />
170  <enum name="mcid_type" />
171  <enum name="session_timeout_type" />
172  <enum name="cdr_read_message_type" />
173  <enum name="cdr_write_message_type" />
174  <enum name="cdr_prop_write_message_type" />
175  <enum name="corosync_ping_message_type" />
176  <enum name="agi_exec_start_type" />
177  <enum name="agi_exec_end_type" />
178  <enum name="agi_async_start_type" />
179  <enum name="agi_async_exec_type" />
180  <enum name="agi_async_end_type" />
181  <enum name="queue_caller_join_type" />
182  <enum name="queue_caller_leave_type" />
183  <enum name="queue_caller_abandon_type" />
184  <enum name="queue_member_status_type" />
185  <enum name="queue_member_added_type" />
186  <enum name="queue_member_removed_type" />
187  <enum name="queue_member_pause_type" />
188  <enum name="queue_member_penalty_type" />
189  <enum name="queue_member_ringinuse_type" />
190  <enum name="queue_agent_called_type" />
191  <enum name="queue_agent_connect_type" />
192  <enum name="queue_agent_complete_type" />
193  <enum name="queue_agent_dump_type" />
194  <enum name="queue_agent_ringnoanswer_type" />
195  <enum name="meetme_join_type" />
196  <enum name="meetme_leave_type" />
197  <enum name="meetme_end_type" />
198  <enum name="meetme_mute_type" />
199  <enum name="meetme_talking_type" />
200  <enum name="meetme_talk_request_type" />
201  <enum name="appcdr_message_type" />
202  <enum name="forkcdr_message_type" />
203  <enum name="cdr_sync_message_type" />
204  </enumlist>
205  </description>
206  </configOption>
207  </configObject>
208  </configFile>
209  </configInfo>
210 ***/
211 
212 /*!
213  * \page stasis-impl Stasis Implementation Notes
214  *
215  * \par Reference counting
216  *
217  * Stasis introduces a number of objects, which are tightly related to one
218  * another. Because we rely on ref-counting for memory management, understanding
219  * these relationships is important to understanding this code.
220  *
221  * \code{.txt}
222  *
223  * stasis_topic <----> stasis_subscription
224  * ^ ^
225  * \ /
226  * \ /
227  * dispatch
228  * |
229  * |
230  * v
231  * stasis_message
232  * |
233  * |
234  * v
235  * stasis_message_type
236  *
237  * \endcode
238  *
239  * The most troubling thing in this chart is the cyclic reference between
240  * stasis_topic and stasis_subscription. This is both unfortunate, and
241  * necessary. Topics need the subscription in order to dispatch messages;
242  * subscriptions need the topics to unsubscribe and check subscription status.
243  *
244  * The cycle is broken by stasis_unsubscribe(). The unsubscribe will remove the
245  * topic's reference to a subscription. When the subscription is destroyed, it
246  * will remove its reference to the topic.
247  *
248  * This means that until a subscription has been explicitly unsubscribed, it will
249  * not be destroyed. Neither will a topic be destroyed while it has subscribers.
250  * The destructors of both have assertions regarding this to catch ref-counting
251  * problems where a subscription or topic has had an extra ao2_cleanup().
252  *
253  * The \ref dispatch object is a transient object, which is posted to a
254  * subscription's taskprocessor to send a message to the subscriber. They have
255  * short life cycles, allocated on one thread, destroyed on another.
256  *
257  * During shutdown, or the deletion of a domain object, there is a flurry of
258  * ao2_cleanup()s on subscriptions and topics, as the final in-flight messages
259  * are processed. Any one of these cleanups could be the one to actually destroy
260  * a given object, so care must be taken to ensure that an object isn't
261  * referenced after an ao2_cleanup(). This includes the implicit ao2_unlock()
262  * that might happen when a RAII_VAR() goes out of scope.
263  *
264  * \par Typical life cycles
265  *
266  * \li stasis_topic - There are several topics which live for the duration of
267  * the Asterisk process (ast_channel_topic_all(), etc.) but most of these
268  * are actually fed by shorter-lived topics whose lifetime is associated
269  * with some domain object (like ast_channel_topic() for a given
270  * ast_channel).
271  *
272  * \li stasis_subscription - Subscriptions have a similar mix of lifetimes as
273  * topics, for similar reasons.
274  *
275  * \li dispatch - Very short lived; just long enough to post a message to a
276  * subscriber.
277  *
278  * \li stasis_message - Short to intermediate lifetimes, but that is mostly
279  * irrelevant. Messages are strictly data and have no behavior associated
280  * with them, so it doesn't really matter if/when they are destroyed. By
281  * design, a component could hold a ref to a message forever without any
282  * ill consequences (aside from consuming more memory).
283  *
284  * \li stasis_message_type - Long life cycles, typically only destroyed on
285  * module unloading or _clean_ process exit.
286  *
287  * \par Subscriber shutdown sequencing
288  *
289  * Subscribers are sensitive to shutdown sequencing, specifically in how they
290  * reference message types. This is fully detailed on the wiki at
291  * https://wiki.asterisk.org/wiki/x/K4BqAQ.
292  *
293  * In short, the lifetime of the \a data (and \a callback, if in a module) must
294  * be held until the stasis_subscription_final_message() has been received.
295  * Depending on the structure of the subscriber code, this can be handled by
296  * using stasis_subscription_final_message() to free resources on the final
297  * message, or using stasis_subscription_join()/stasis_unsubscribe_and_join() to
298  * block until the unsubscribe has completed.
299  */
300 
/*! Capacity a topic's subscriber vector is initialised with. */
#define INITIAL_SUBSCRIBERS_MAX 4

/*! Bucket count used for topic pools. */
#define TOPIC_POOL_BUCKETS 57

/*! Shared thread pool for topics that do not want a dedicated taskprocessor. */
static struct ast_threadpool *threadpool;

/*
 * Bucket count selected by build profile: smaller when LOW_MEMORY is defined.
 * NOTE(review): presumably sizes the topic_all container; its use is outside
 * this view.
 */
#ifdef LOW_MEMORY
#define TOPIC_ALL_BUCKETS 257
#else
#define TOPIC_ALL_BUCKETS 997
#endif
321 
322 #ifdef AST_DEVMODE
323 
/*! Bucket count for the topic statistics container. */
#define TOPIC_STATISTICS_BUCKETS 57

/*! Bucket count for the subscription statistics container. */
#define SUBSCRIPTION_STATISTICS_BUCKETS 57

/*! Global holder of the container that stores per-topic statistics. */
static AO2_GLOBAL_OBJ_STATIC(topic_statistics);

/*! Global holder of the container that stores per-subscription statistics. */
static AO2_GLOBAL_OBJ_STATIC(subscription_statistics);
335 
/*!
 * \internal
 * \brief Publication counters kept for one stasis message type.
 */
struct stasis_message_type_statistics {
	/*! \brief How many messages of this type have been published */
	int published;
	/*! \brief How many messages of this type did not reach any subscriber */
	int unused;
	/*! \brief The stasis message type these counters describe */
	struct stasis_message_type *message_type;
};
345 
346 /*! Lock to protect the message types vector */
347 AST_MUTEX_DEFINE_STATIC(message_type_statistics_lock);
348 
349 /*! Vector containing message type information */
350 static AST_VECTOR(, struct stasis_message_type_statistics) message_type_statistics;
351 
/*!
 * \internal
 * \brief Dispatch statistics kept for one topic.
 *
 * Allocated with room for the topic name after the fixed fields (see
 * stasis_topic_statistics_create()).
 */
struct stasis_topic_statistics {
	/*! \brief Longest time spent dispatching a message to subscribers */
	long highest_time_dispatched;
	/*! \brief Shortest time spent dispatching a message to subscribers */
	long lowest_time_dispatched;
	/*! \brief Count of messages that were not dispatched to any subscriber */
	int messages_not_dispatched;
	/*! \brief Count of messages that were dispatched to at least one subscriber */
	int messages_dispatched;
	/*! \brief Container with the ids of this topic's subscribers */
	struct ao2_container *subscribers;
	/*! \brief Address of the topic; NOT refcounted and must NOT be dereferenced */
	struct stasis_topic *topic;
	/*! \brief Topic name, stored inline after the structure */
	char name[0];
};
369 #endif
370 
371 /*! \internal */
372 struct stasis_topic {
373  /*! Variable length array of the subscribers */
374  AST_VECTOR(, struct stasis_subscription *) subscribers;
375 
376  /*! Topics forwarding into this topic */
377  AST_VECTOR(, struct stasis_topic *) upstream_topics;
378 
379 #ifdef AST_DEVMODE
380  struct stasis_topic_statistics *statistics;
381 #endif
382 
383  /*! Unique incrementing integer for subscriber ids */
384  int subscriber_id;
385 
386  /*! Name of the topic */
387  char *name;
388 
389  /*! Detail of the topic */
390  char *detail;
391 
392  /*! Creation time */
393  struct timeval *creationtime;
394 };
395 
397 
398 struct topic_proxy {
399  AO2_WEAKPROXY();
400 
401  char *name;
402  char *detail;
403 
404  struct timeval creationtime;
405 
406  char buf[0];
407 };
408 
412 
/*!
 * \internal
 * \brief Weak proxy callback that removes a topic proxy from its container.
 *
 * Registered through ao2_weakproxy_subscribe() in link_topic_proxy().
 *
 * \param weakproxy The proxy object to unlink.
 * \param container The container the proxy was linked into. The reference
 *        that was bumped when the callback was registered is released here.
 */
static void proxy_dtor(void *weakproxy, void *container)
{
	struct ao2_container *topics = container;

	ao2_unlink(topics, weakproxy);
	ao2_cleanup(topics);
}
418 
419 /* Forward declarations for the tightly-coupled subscription object */
420 static int topic_add_subscription(struct stasis_topic *topic,
421  struct stasis_subscription *sub);
422 
423 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
424 
/*!
 * \brief Lock two topics.
 *
 * Locks \a first, then keeps trying to lock \a second with ao2_trylock(),
 * running AO2_DEADLOCK_AVOIDANCE() on \a first after every failed attempt.
 */
#define topic_lock_both(first, second) \
	do { \
		ao2_lock(first); \
		while (ao2_trylock(second)) { \
			AO2_DEADLOCK_AVOIDANCE(first); \
		} \
	} while (0)
433 
434 static void topic_dtor(void *obj)
435 {
436  struct stasis_topic *topic = obj;
437 #ifdef AST_DEVMODE
438  struct ao2_container *topic_stats;
439 #endif
440 
441  ast_debug(2, "Destroying topic. name: %s, detail: %s\n",
442  topic->name, topic->detail);
443 
444  /* Subscribers hold a reference to topics, so they should all be
445  * unsubscribed before we get here. */
446  ast_assert(AST_VECTOR_SIZE(&topic->subscribers) == 0);
447 
448  AST_VECTOR_FREE(&topic->subscribers);
449  AST_VECTOR_FREE(&topic->upstream_topics);
450  ast_debug(1, "Topic '%s': %p destroyed\n", topic->name, topic);
451 
452 #ifdef AST_DEVMODE
453  if (topic->statistics) {
454  topic_stats = ao2_global_obj_ref(topic_statistics);
455  if (topic_stats) {
456  ao2_unlink(topic_stats, topic->statistics);
457  ao2_ref(topic_stats, -1);
458  }
459  ao2_ref(topic->statistics, -1);
460  }
461 #endif
462 }
463 
464 #ifdef AST_DEVMODE
465 static void topic_statistics_destroy(void *obj)
466 {
467  struct stasis_topic_statistics *statistics = obj;
468 
469  ao2_cleanup(statistics->subscribers);
470 }
471 
472 static struct stasis_topic_statistics *stasis_topic_statistics_create(struct stasis_topic *topic)
473 {
474  struct stasis_topic_statistics *statistics;
475  RAII_VAR(struct ao2_container *, topic_stats, ao2_global_obj_ref(topic_statistics), ao2_cleanup);
476 
477  if (!topic_stats) {
478  return NULL;
479  }
480 
481  statistics = ao2_alloc(sizeof(*statistics) + strlen(topic->name) + 1, topic_statistics_destroy);
482  if (!statistics) {
483  return NULL;
484  }
485 
486  statistics->subscribers = ast_str_container_alloc(1);
487  if (!statistics->subscribers) {
488  ao2_ref(statistics, -1);
489  return NULL;
490  }
491 
492  /* This is strictly used for the pointer address when showing the topic */
493  statistics->topic = topic;
494  strcpy(statistics->name, topic->name); /* SAFE */
495  ao2_link(topic_stats, statistics);
496 
497  return statistics;
498 }
499 #endif
500 
501 static int link_topic_proxy(struct stasis_topic *topic, const char *name, const char *detail)
502 {
503  struct topic_proxy *proxy;
504  struct stasis_topic* topic_tmp;
505  size_t detail_len;
506 
507  if (!topic || !name || !strlen(name) || !detail) {
508  return -1;
509  }
510 
511  ao2_wrlock(topic_all);
512 
513  topic_tmp = stasis_topic_get(name);
514  if (topic_tmp) {
515  ast_log(LOG_ERROR, "The same topic is already exist. name: %s\n", name);
516  ao2_ref(topic_tmp, -1);
517  ao2_unlock(topic_all);
518 
519  return -1;
520  }
521 
522  detail_len = strlen(detail) + 1;
523 
524  proxy = ao2_t_weakproxy_alloc(
525  sizeof(*proxy) + strlen(name) + 1 + detail_len, NULL, name);
526  if (!proxy) {
527  ao2_unlock(topic_all);
528 
529  return -1;
530  }
531 
532  /* set the proxy info */
533  proxy->name = proxy->buf;
534  proxy->detail = proxy->name + strlen(name) + 1;
535 
536  strcpy(proxy->name, name); /* SAFE */
537  ast_copy_string(proxy->detail, detail, detail_len); /* SAFE */
538  proxy->creationtime = ast_tvnow();
539 
540  /* We have exclusive access to proxy, no need for locking here. */
541  if (ao2_t_weakproxy_set_object(proxy, topic, OBJ_NOLOCK, "weakproxy link")) {
542  ao2_cleanup(proxy);
543  ao2_unlock(topic_all);
544 
545  return -1;
546  }
547 
548  if (ao2_weakproxy_subscribe(proxy, proxy_dtor, ao2_bump(topic_all), OBJ_NOLOCK)) {
549  ao2_cleanup(proxy);
550  ao2_unlock(topic_all);
551  ao2_cleanup(topic_all);
552 
553  return -1;
554  }
555 
556  /* setting the topic point to the proxy */
557  topic->name = proxy->name;
558  topic->detail = proxy->detail;
559  topic->creationtime = &(proxy->creationtime);
560 
561  ao2_link_flags(topic_all, proxy, OBJ_NOLOCK);
562  ao2_ref(proxy, -1);
563 
564  ao2_unlock(topic_all);
565 
566  return 0;
567 }
568 
570  const char *name, const char* detail
571  )
572 {
573  struct stasis_topic *topic;
574  int res = 0;
575 
576  if (!name|| !strlen(name) || !detail) {
577  return NULL;
578  }
579  ast_debug(2, "Creating topic. name: %s, detail: %s\n", name, detail);
580 
581  topic = stasis_topic_get(name);
582  if (topic) {
583  ast_debug(2, "Topic is already exist. name: %s, detail: %s\n",
584  name, detail);
585  return topic;
586  }
587 
588  topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
589  if (!topic) {
590  return NULL;
591  }
592 
593  res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
594  res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
595  if (res) {
596  ao2_ref(topic, -1);
597  return NULL;
598  }
599 
600  /* link to the proxy */
601  if (link_topic_proxy(topic, name, detail)) {
602  ao2_ref(topic, -1);
603  return NULL;
604  }
605 
606 #ifdef AST_DEVMODE
607  topic->statistics = stasis_topic_statistics_create(topic);
608  if (!topic->statistics) {
609  ao2_ref(topic, -1);
610  return NULL;
611  }
612 #endif
613  ast_debug(1, "Topic '%s': %p created\n", topic->name, topic);
614 
615  return topic;
616 }
617 
619 {
620  return stasis_topic_create_with_detail(name, "");
621 }
622 
623 struct stasis_topic *stasis_topic_get(const char *name)
624 {
625  return ao2_weakproxy_find(topic_all, name, OBJ_SEARCH_KEY, "");
626 }
627 
628 const char *stasis_topic_name(const struct stasis_topic *topic)
629 {
630  if (!topic) {
631  return NULL;
632  }
633  return topic->name;
634 }
635 
636 const char *stasis_topic_detail(const struct stasis_topic *topic)
637 {
638  if (!topic) {
639  return NULL;
640  }
641  return topic->detail;
642 }
643 
644 size_t stasis_topic_subscribers(const struct stasis_topic *topic)
645 {
646  return AST_VECTOR_SIZE(&topic->subscribers);
647 }
648 
649 #ifdef AST_DEVMODE
/*!
 * \internal
 * \brief Statistics kept for one subscription.
 *
 * Allocated with room for the subscription's unique id after the fixed
 * fields (see stasis_subscription_statistics_create()).
 */
struct stasis_subscription_statistics {
	/*! \brief Source file the subscription was created from */
	const char *file;
	/*! \brief Function the subscription was created from */
	const char *func;
	/*! \brief Container with the names of the topics subscribed to */
	struct ao2_container *topics;
	/*! \brief Message type whose handling has taken the longest so far */
	struct stasis_message_type *highest_time_message_type;
	/*! \brief Longest time spent invoking the callback for a message */
	long highest_time_invoked;
	/*! \brief Shortest time spent invoking the callback for a message */
	long lowest_time_invoked;
	/*! \brief Count of messages that were filtered out */
	int messages_dropped;
	/*! \brief Count of messages that passed filtering */
	int messages_passed;
	/*! \brief Non-zero when a mailbox is used to queue messages */
	int uses_mailbox;
	/*! \brief Non-zero when the stasis threadpool handles the messages */
	int uses_threadpool;
	/*! \brief Source line the subscription was created from */
	int lineno;
	/*! \brief Address of the subscription; NOT refcounted and must NOT be dereferenced */
	struct stasis_subscription *sub;
	/*! \brief Unique id of the subscription, stored inline after the structure */
	char uniqueid[0];
};
678 #endif
679 
680 /*! \internal */
682  /*! Unique ID for this subscription */
683  char *uniqueid;
684  /*! Topic subscribed to. */
686  /*! Mailbox for processing incoming messages. */
688  /*! Callback function for incoming message processing. */
690  /*! Data pointer to be handed to the callback. */
691  void *data;
692 
693  /*! Condition for joining with subscription. */
695  /*! Flag set when final message for sub has been received.
696  * Be sure join_lock is held before reading/setting. */
698  /*! Flag set when final message for sub has been processed.
699  * Be sure join_lock is held before reading/setting. */
701 
702  /*! The message types this subscription is accepting */
703  AST_VECTOR(, char) accepted_message_types;
704  /*! The message formatters this subscription is accepting */
705  enum stasis_subscription_message_formatters accepted_formatters;
706  /*! The message filter currently in use */
708 
709 #ifdef AST_DEVMODE
710  /*! Statistics information */
711  struct stasis_subscription_statistics *statistics;
712 #endif
713 };
714 
715 static void subscription_dtor(void *obj)
716 {
717  struct stasis_subscription *sub = obj;
718 #ifdef AST_DEVMODE
719  struct ao2_container *subscription_stats;
720 #endif
721 
722  /* Subscriptions need to be manually unsubscribed before destruction
723  * b/c there's a cyclic reference between topics and subscriptions */
725  /* If there are any messages in flight to this subscription; that would
726  * be bad. */
728 
729  ast_free(sub->uniqueid);
730  ao2_cleanup(sub->topic);
731  sub->topic = NULL;
733  sub->mailbox = NULL;
735 
736  AST_VECTOR_FREE(&sub->accepted_message_types);
737 
738 #ifdef AST_DEVMODE
739  if (sub->statistics) {
740  subscription_stats = ao2_global_obj_ref(subscription_statistics);
741  if (subscription_stats) {
742  ao2_unlink(subscription_stats, sub->statistics);
743  ao2_ref(subscription_stats, -1);
744  }
745  ao2_ref(sub->statistics, -1);
746  }
747 #endif
748 }
749 
750 /*!
751  * \brief Invoke the subscription's callback.
752  * \param sub Subscription to invoke.
754  * \param message Message to send.
755  */
756 static void subscription_invoke(struct stasis_subscription *sub,
757  struct stasis_message *message)
758 {
759  unsigned int final = stasis_subscription_final_message(sub, message);
761 #ifdef AST_DEVMODE
762  struct timeval start;
763  long elapsed;
764 
765  start = ast_tvnow();
766 #endif
767 
768  /* Notify that the final message has been received */
769  if (final) {
770  ao2_lock(sub);
771  sub->final_message_rxed = 1;
772  ast_cond_signal(&sub->join_cond);
773  ao2_unlock(sub);
774  }
775 
776  /*
777  * If filtering is turned on and this is a 'final' message, we only invoke the callback
778  * if the subscriber accepts subscription_change message types.
779  */
780  if (!final || sub->filter != STASIS_SUBSCRIPTION_FILTER_SELECTIVE ||
781  (message_type_id < AST_VECTOR_SIZE(&sub->accepted_message_types) && AST_VECTOR_GET(&sub->accepted_message_types, message_type_id))) {
782  /* Since sub is mostly immutable, no need to lock sub */
783  sub->callback(sub->data, sub, message);
784  }
785 
786  /* Notify that the final message has been processed */
787  if (final) {
788  ao2_lock(sub);
789  sub->final_message_processed = 1;
790  ast_cond_signal(&sub->join_cond);
791  ao2_unlock(sub);
792  }
793 
794 #ifdef AST_DEVMODE
795  elapsed = ast_tvdiff_ms(ast_tvnow(), start);
796  if (elapsed > sub->statistics->highest_time_invoked) {
797  sub->statistics->highest_time_invoked = elapsed;
798  ao2_lock(sub->statistics);
799  sub->statistics->highest_time_message_type = stasis_message_type(message);
800  ao2_unlock(sub->statistics);
801  }
802  if (elapsed < sub->statistics->lowest_time_invoked) {
803  sub->statistics->lowest_time_invoked = elapsed;
804  }
805 #endif
806 }
807 
808 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
809 static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
810 
812 {
813 }
814 
815 #ifdef AST_DEVMODE
816 static void subscription_statistics_destroy(void *obj)
817 {
818  struct stasis_subscription_statistics *statistics = obj;
819 
820  ao2_cleanup(statistics->topics);
821 }
822 
823 static struct stasis_subscription_statistics *stasis_subscription_statistics_create(struct stasis_subscription *sub,
824  int needs_mailbox, int use_thread_pool, const char *file, int lineno,
825  const char *func)
826 {
827  struct stasis_subscription_statistics *statistics;
828  RAII_VAR(struct ao2_container *, subscription_stats, ao2_global_obj_ref(subscription_statistics), ao2_cleanup);
829 
830  if (!subscription_stats) {
831  return NULL;
832  }
833 
834  statistics = ao2_alloc(sizeof(*statistics) + strlen(sub->uniqueid) + 1, subscription_statistics_destroy);
835  if (!statistics) {
836  return NULL;
837  }
838 
839  statistics->topics = ast_str_container_alloc(1);
840  if (!statistics->topics) {
841  ao2_ref(statistics, -1);
842  return NULL;
843  }
844 
845  statistics->file = file;
846  statistics->lineno = lineno;
847  statistics->func = func;
848  statistics->uses_mailbox = needs_mailbox;
849  statistics->uses_threadpool = use_thread_pool;
850  strcpy(statistics->uniqueid, sub->uniqueid); /* SAFE */
851  statistics->sub = sub;
852  ao2_link(subscription_stats, statistics);
853 
854  return statistics;
855 }
856 #endif
857 
859  struct stasis_topic *topic,
861  void *data,
862  int needs_mailbox,
863  int use_thread_pool,
864  const char *file,
865  int lineno,
866  const char *func)
867 {
868  struct stasis_subscription *sub;
869  int ret;
870 
871  if (!topic) {
872  return NULL;
873  }
874 
875  /* The ao2 lock is used for join_cond. */
876  sub = ao2_t_alloc(sizeof(*sub), subscription_dtor, stasis_topic_name(topic));
877  if (!sub) {
878  return NULL;
879  }
880 
881 #ifdef AST_DEVMODE
882  ret = ast_asprintf(&sub->uniqueid, "%s:%s-%d", file, stasis_topic_name(topic), ast_atomic_fetchadd_int(&topic->subscriber_id, +1));
883  sub->statistics = stasis_subscription_statistics_create(sub, needs_mailbox, use_thread_pool, file, lineno, func);
884  if (ret < 0 || !sub->statistics) {
885  ao2_ref(sub, -1);
886  return NULL;
887  }
888 #else
889  ret = ast_asprintf(&sub->uniqueid, "%s-%d", stasis_topic_name(topic), ast_atomic_fetchadd_int(&topic->subscriber_id, +1));
890  if (ret < 0) {
891  ao2_ref(sub, -1);
892  return NULL;
893  }
894 #endif
895 
896  if (needs_mailbox) {
897  char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
898 
899  /* Create name with seq number appended. */
900  ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "stasis/%c:%s",
901  use_thread_pool ? 'p' : 'm',
902  stasis_topic_name(topic));
903 
904  /*
905  * With a small number of subscribers, a thread-per-sub is
906  * acceptable. For a large number of subscribers, a thread
907  * pool should be used.
908  */
909  if (use_thread_pool) {
910  sub->mailbox = ast_threadpool_serializer(tps_name, threadpool);
911  } else {
913  }
914  if (!sub->mailbox) {
915  ao2_ref(sub, -1);
916 
917  return NULL;
918  }
920  /* Taskprocessor has a reference */
921  ao2_ref(sub, +1);
922  }
923 
924  ao2_ref(topic, +1);
925  sub->topic = topic;
926  sub->callback = callback;
927  sub->data = data;
928  ast_cond_init(&sub->join_cond, NULL);
929  sub->filter = STASIS_SUBSCRIPTION_FILTER_NONE;
930  AST_VECTOR_INIT(&sub->accepted_message_types, 0);
931  sub->accepted_formatters = STASIS_SUBSCRIPTION_FORMATTER_NONE;
932 
933  if (topic_add_subscription(topic, sub) != 0) {
934  ao2_ref(sub, -1);
935  ao2_ref(topic, -1);
936 
937  return NULL;
938  }
939  send_subscription_subscribe(topic, sub);
940 
941  return sub;
942 }
943 
945  struct stasis_topic *topic,
947  void *data,
948  const char *file,
949  int lineno,
950  const char *func)
951 {
952  return internal_stasis_subscribe(topic, callback, data, 1, 0, file, lineno, func);
953 }
954 
956  struct stasis_topic *topic,
958  void *data,
959  const char *file,
960  int lineno,
961  const char *func)
962 {
963  return internal_stasis_subscribe(topic, callback, data, 1, 1, file, lineno, func);
964 }
965 
/*!
 * \internal
 * \brief Taskprocessor task that releases one reference on a subscription.
 *
 * \param data The subscription to release.
 *
 * \retval 0 always.
 */
static int sub_cleanup(void *data)
{
	ao2_cleanup(data);

	return 0;
}
972 
974 {
975  /* The subscription may be the last ref to this topic. Hold
976  * the topic ref open until after the unlock. */
977  struct stasis_topic *topic;
978 
979  if (!sub) {
980  return NULL;
981  }
982 
983  topic = ao2_bump(sub->topic);
984 
985  /* We have to remove the subscription first, to ensure the unsubscribe
986  * is the final message */
987  if (topic_remove_subscription(sub->topic, sub) != 0) {
989  "Internal error: subscription has invalid topic\n");
990  ao2_cleanup(topic);
991 
992  return NULL;
993  }
994 
995  /* Now let everyone know about the unsubscribe */
996  send_subscription_unsubscribe(topic, sub);
997 
998  /* When all that's done, remove the ref the mailbox has on the sub */
999  if (sub->mailbox) {
1000  if (ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub)) {
1001  /* Nothing we can do here, the conditional is just to keep
1002  * the compiler happy that we're not ignoring the result. */
1003  }
1004  }
1005 
1006  /* Unsubscribing unrefs the subscription */
1007  ao2_cleanup(sub);
1008  ao2_cleanup(topic);
1009 
1010  return NULL;
1011 }
1012 
1014  long low_water, long high_water)
1015 {
1016  int res = -1;
1017 
1018  if (subscription) {
1019  res = ast_taskprocessor_alert_set_levels(subscription->mailbox,
1020  low_water, high_water);
1021  }
1022  return res;
1023 }
1024 
1026  const struct stasis_message_type *type)
1027 {
1028  if (!subscription) {
1029  return -1;
1030  }
1031 
1032  ast_assert(type != NULL);
1034 
1035  if (!type || !stasis_message_type_name(type)) {
1036  /* Filtering is unreliable as this message type is not yet initialized
1037  * so force all messages through.
1038  */
1039  subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE;
1040  return 0;
1041  }
1042 
1043  ao2_lock(subscription->topic);
1044  if (AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 1)) {
1045  /* We do this for the same reason as above. The subscription can still operate, so allow
1046  * it to do so by forcing all messages through.
1047  */
1048  subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE;
1049  }
1050  ao2_unlock(subscription->topic);
1051 
1052  return 0;
1053 }
1054 
1056  const struct stasis_message_type *type)
1057 {
1058  if (!subscription) {
1059  return -1;
1060  }
1061 
1062  ast_assert(type != NULL);
1064 
1065  if (!type || !stasis_message_type_name(type)) {
1066  return 0;
1067  }
1068 
1069  ao2_lock(subscription->topic);
1070  if (stasis_message_type_id(type) < AST_VECTOR_SIZE(&subscription->accepted_message_types)) {
1071  /* The memory is already allocated so this can't fail */
1072  AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 0);
1073  }
1074  ao2_unlock(subscription->topic);
1075 
1076  return 0;
1077 }
1078 
1081 {
1082  if (!subscription) {
1083  return -1;
1084  }
1085 
1086  ao2_lock(subscription->topic);
1087  if (subscription->filter != STASIS_SUBSCRIPTION_FILTER_FORCED_NONE) {
1088  subscription->filter = filter;
1089  }
1090  ao2_unlock(subscription->topic);
1091 
1092  return 0;
1093 }
1094 
1097 {
1098  ast_assert(subscription != NULL);
1099 
1100  ao2_lock(subscription->topic);
1101  subscription->accepted_formatters = formatters;
1102  ao2_unlock(subscription->topic);
1103 
1104  return;
1105 }
1106 
1108 {
1109  if (subscription) {
1110  ao2_lock(subscription);
1111  /* Wait until the processed flag has been set */
1112  while (!subscription->final_message_processed) {
1113  ast_cond_wait(&subscription->join_cond,
1114  ao2_object_get_lockaddr(subscription));
1115  }
1116  ao2_unlock(subscription);
1117  }
1118 }
1119 
1121 {
1122  if (subscription) {
1123  int ret;
1124 
1125  ao2_lock(subscription);
1126  ret = subscription->final_message_rxed;
1127  ao2_unlock(subscription);
1128 
1129  return ret;
1130  }
1131 
1132  /* Null subscription is about as done as you can get */
1133  return 1;
1134 }
1135 
1137  struct stasis_subscription *subscription)
1138 {
1139  if (!subscription) {
1140  return NULL;
1141  }
1142 
1143  /* Bump refcount to hold it past the unsubscribe */
1144  ao2_ref(subscription, +1);
1145  stasis_unsubscribe(subscription);
1146  stasis_subscription_join(subscription);
1147  /* Now decrement the refcount back */
1148  ao2_cleanup(subscription);
1149  return NULL;
1150 }
1151 
1153 {
1154  if (sub) {
1155  size_t i;
1156  struct stasis_topic *topic = sub->topic;
1157 
1158  ao2_lock(topic);
1159  for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
1160  if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
1161  ao2_unlock(topic);
1162  return 1;
1163  }
1164  }
1165  ao2_unlock(topic);
1166  }
1167 
1168  return 0;
1169 }
1170 
1172 {
1173  return sub->uniqueid;
1174 }
1175 
1177 {
1178  struct stasis_subscription_change *change;
1179 
1181  return 0;
1182  }
1183 
1184  change = stasis_message_data(msg);
1185  if (strcmp("Unsubscribe", change->description)) {
1186  return 0;
1187  }
1188 
1189  if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
1190  return 0;
1191  }
1192 
1193  return 1;
1194 }
1195 
1196 /*!
1197  * \brief Add a subscriber to a topic.
1198  * \param topic Topic
1199  * \param sub Subscriber
1200  * \return 0 on success
1201  * \return Non-zero on error
1202  */
1203 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
1204 {
1205  size_t idx;
1206 
1207  ao2_lock(topic);
1208  /* The reference from the topic to the subscription is shared with
1209  * the owner of the subscription, which will explicitly unsubscribe
1210  * to release it.
1211  *
1212  * If we bumped the refcount here, the owner would have to unsubscribe
1213  * and cleanup, which is a bit awkward. */
1214  AST_VECTOR_APPEND(&topic->subscribers, sub);
1215 
1216  for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
1218  AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
1219  }
1220 
1221 #ifdef AST_DEVMODE
1222  ast_str_container_add(topic->statistics->subscribers, stasis_subscription_uniqueid(sub));
1223  ast_str_container_add(sub->statistics->topics, stasis_topic_name(topic));
1224 #endif
1225 
1226  ao2_unlock(topic);
1227 
1228  return 0;
1229 }
1230 
1231 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
1232 {
1233  size_t idx;
1234  int res;
1235 
1236  ao2_lock(topic);
1237  for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
1239  AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
1240  }
1241  res = AST_VECTOR_REMOVE_ELEM_UNORDERED(&topic->subscribers, sub,
1243 
1244 #ifdef AST_DEVMODE
1245  if (!res) {
1246  ast_str_container_remove(topic->statistics->subscribers, stasis_subscription_uniqueid(sub));
1247  ast_str_container_remove(sub->statistics->topics, stasis_topic_name(topic));
1248  }
1249 #endif
1250 
1251  ao2_unlock(topic);
1252 
1253  return res;
1254 }
1255 
1256 /*!
1257  * \internal \brief Dispatch a message to a subscriber asynchronously
1258  * \param local \ref ast_taskprocessor_local object
1259  * \return 0
1260  */
1262 {
1263  struct stasis_subscription *sub = local->local_data;
1264  struct stasis_message *message = local->data;
1265 
1266  subscription_invoke(sub, message);
1267  ao2_cleanup(message);
1268 
1269  return 0;
1270 }
1271 
1272 /*!
1273  * \internal \brief Data passed to \ref dispatch_exec_sync to synchronize
1274  * a published message to a subscriber
1275  */
1280  void *task_data;
1281 };
1282 
1283 /*!
1284  * \internal \brief Dispatch a message to a subscriber synchronously
1285  * \param local \ref ast_taskprocessor_local object
1286  * \return 0
1287  */
1289 {
1290  struct stasis_subscription *sub = local->local_data;
1291  struct sync_task_data *std = local->data;
1292  struct stasis_message *message = std->task_data;
1293 
1294  subscription_invoke(sub, message);
1295  ao2_cleanup(message);
1296 
1297  ast_mutex_lock(&std->lock);
1298  std->complete = 1;
1299  ast_cond_signal(&std->cond);
1300  ast_mutex_unlock(&std->lock);
1301 
1302  return 0;
1303 }
1304 
1305 /*!
1306  * \internal \brief Dispatch a message to a subscriber
1307  * \param sub The subscriber to dispatch to
1308  * \param message The message to send
1309  * \param synchronous If non-zero, synchronize on the subscriber receiving
1310  * the message
1311  * \retval 0 if message was not dispatched
1312  * \retval 1 if message was dispatched
1313  */
1314 static unsigned int dispatch_message(struct stasis_subscription *sub,
1315  struct stasis_message *message,
1316  int synchronous)
1317 {
1318  int is_final = stasis_subscription_final_message(sub, message);
1319 
1320  /*
1321  * The 'do while' gives us an easy way to skip remaining logic once
1322  * we determine the message should be accepted.
1323  * The code looks more verbose than it needs to be but it optimizes
1324  * down very nicely. It's just easier to understand and debug this way.
1325  */
1326  do {
1327  struct stasis_message_type *message_type = stasis_message_type(message);
1328  int type_id = stasis_message_type_id(message_type);
1329  int type_filter_specified = 0;
1330  int formatter_filter_specified = 0;
1331  int type_filter_passed = 0;
1332  int formatter_filter_passed = 0;
1333 
1334  /* We always accept final messages so only run the filter logic if not final */
1335  if (is_final) {
1336  break;
1337  }
1338 
1339  type_filter_specified = sub->filter & STASIS_SUBSCRIPTION_FILTER_SELECTIVE;
1340  formatter_filter_specified = sub->accepted_formatters != STASIS_SUBSCRIPTION_FORMATTER_NONE;
1341 
1342  /* Accept if no filters of either type were specified */
1343  if (!type_filter_specified && !formatter_filter_specified) {
1344  break;
1345  }
1346 
1347  type_filter_passed = type_filter_specified
1348  && type_id < AST_VECTOR_SIZE(&sub->accepted_message_types)
1349  && AST_VECTOR_GET(&sub->accepted_message_types, type_id);
1350 
1351  /*
1352  * Since the type and formatter filters are OR'd, we can skip
1353  * the formatter check if the type check passes.
1354  */
1355  if (type_filter_passed) {
1356  break;
1357  }
1358 
1359  formatter_filter_passed = formatter_filter_specified
1360  && (sub->accepted_formatters & stasis_message_type_available_formatters(message_type));
1361 
1362  if (formatter_filter_passed) {
1363  break;
1364  }
1365 
1366 #ifdef AST_DEVMODE
1367  ast_atomic_fetchadd_int(&sub->statistics->messages_dropped, +1);
1368 #endif
1369 
1370  return 0;
1371 
1372  } while (0);
1373 
1374 #ifdef AST_DEVMODE
1375  ast_atomic_fetchadd_int(&sub->statistics->messages_passed, +1);
1376 #endif
1377 
1378  if (!sub->mailbox) {
1379  /* Dispatch directly */
1380  subscription_invoke(sub, message);
1381  return 1;
1382  }
1383 
1384  /* Bump the message for the taskprocessor push. This will get de-ref'd
1385  * by the task processor callback.
1386  */
1387  ao2_bump(message);
1388  if (!synchronous) {
1390  /* Push failed; ugh. */
1391  ast_log(LOG_ERROR, "Dropping async dispatch\n");
1392  ao2_cleanup(message);
1393  return 0;
1394  }
1395  } else {
1396  struct sync_task_data std;
1397 
1398  ast_mutex_init(&std.lock);
1399  ast_cond_init(&std.cond, NULL);
1400  std.complete = 0;
1401  std.task_data = message;
1402 
1404  /* Push failed; ugh. */
1405  ast_log(LOG_ERROR, "Dropping sync dispatch\n");
1406  ao2_cleanup(message);
1407  ast_mutex_destroy(&std.lock);
1408  ast_cond_destroy(&std.cond);
1409  return 0;
1410  }
1411 
1412  ast_mutex_lock(&std.lock);
1413  while (!std.complete) {
1414  ast_cond_wait(&std.cond, &std.lock);
1415  }
1416  ast_mutex_unlock(&std.lock);
1417 
1418  ast_mutex_destroy(&std.lock);
1419  ast_cond_destroy(&std.cond);
1420  }
1421 
1422  return 1;
1423 }
1424 
1425 /*!
1426  * \internal \brief Publish a message to a topic's subscribers
1427  * \param topic The topic to publish to
1428  * \param message The message to publish
1429  * \param sync_sub An optional subscriber of the topic to publish synchronously
1430  * to
1431  */
1432 static void publish_msg(struct stasis_topic *topic,
1433  struct stasis_message *message, struct stasis_subscription *sync_sub)
1434 {
1435  size_t i;
1436  unsigned int dispatched = 0;
1437 #ifdef AST_DEVMODE
1439  struct stasis_message_type_statistics *statistics;
1440  struct timeval start;
1441  long elapsed;
1442 #endif
1443 
1444  ast_assert(topic != NULL);
1445  ast_assert(message != NULL);
1446 
1447 #ifdef AST_DEVMODE
1448  ast_mutex_lock(&message_type_statistics_lock);
1449  if (message_type_id >= AST_VECTOR_SIZE(&message_type_statistics)) {
1450  struct stasis_message_type_statistics new_statistics = {
1451  .published = 0,
1452  };
1453  if (AST_VECTOR_REPLACE(&message_type_statistics, message_type_id, new_statistics)) {
1454  ast_mutex_unlock(&message_type_statistics_lock);
1455  return;
1456  }
1457  }
1458  statistics = AST_VECTOR_GET_ADDR(&message_type_statistics, message_type_id);
1459  statistics->message_type = stasis_message_type(message);
1460  ast_mutex_unlock(&message_type_statistics_lock);
1461 
1462  ast_atomic_fetchadd_int(&statistics->published, +1);
1463 #endif
1464 
1465  /* If there are no subscribers don't bother */
1466  if (!stasis_topic_subscribers(topic)) {
1467 #ifdef AST_DEVMODE
1468  ast_atomic_fetchadd_int(&statistics->unused, +1);
1469  ast_atomic_fetchadd_int(&topic->statistics->messages_not_dispatched, +1);
1470 #endif
1471  return;
1472  }
1473 
1474  /*
1475  * The topic may be unref'ed by the subscription invocation.
1476  * Make sure we hold onto a reference while dispatching.
1477  */
1478  ao2_ref(topic, +1);
1479 #ifdef AST_DEVMODE
1480  start = ast_tvnow();
1481 #endif
1482  ao2_lock(topic);
1483  for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
1484  struct stasis_subscription *sub = AST_VECTOR_GET(&topic->subscribers, i);
1485 
1486  ast_assert(sub != NULL);
1487 
1488  dispatched += dispatch_message(sub, message, (sub == sync_sub));
1489  }
1490  ao2_unlock(topic);
1491 
1492 #ifdef AST_DEVMODE
1493  elapsed = ast_tvdiff_ms(ast_tvnow(), start);
1494  if (elapsed > topic->statistics->highest_time_dispatched) {
1495  topic->statistics->highest_time_dispatched = elapsed;
1496  }
1497  if (elapsed < topic->statistics->lowest_time_dispatched) {
1498  topic->statistics->lowest_time_dispatched = elapsed;
1499  }
1500  if (dispatched) {
1501  ast_atomic_fetchadd_int(&topic->statistics->messages_dispatched, +1);
1502  } else {
1503  ast_atomic_fetchadd_int(&statistics->unused, +1);
1504  ast_atomic_fetchadd_int(&topic->statistics->messages_not_dispatched, +1);
1505  }
1506 #endif
1507 
1508  ao2_ref(topic, -1);
1509 }
1510 
1512 {
1513  publish_msg(topic, message, NULL);
1514 }
1515 
1517 {
1518  ast_assert(sub != NULL);
1519 
1520  publish_msg(sub->topic, message, sub);
1521 }
1522 
1523 /*!
1524  * \brief Forwarding information
1525  *
1526  * Any message posted to \a from_topic is forwarded to \a to_topic.
1527  *
1528  * In cases where both the \a from_topic and \a to_topic need to be locked,
1529  * always lock the \a to_topic first, then the \a from_topic. Lest you deadlock.
1530  */
1532  /*! Originating topic */
1534  /*! Destination topic */
1536 };
1537 
1538 static void forward_dtor(void *obj)
1539 {
1540  struct stasis_forward *forward = obj;
1541 
1542  ao2_cleanup(forward->from_topic);
1543  forward->from_topic = NULL;
1544  ao2_cleanup(forward->to_topic);
1545  forward->to_topic = NULL;
1546 }
1547 
1549 {
1550  int idx;
1551  struct stasis_topic *from;
1552  struct stasis_topic *to;
1553 
1554  if (!forward) {
1555  return NULL;
1556  }
1557 
1558  from = forward->from_topic;
1559  to = forward->to_topic;
1560 
1561  if (from && to) {
1562  topic_lock_both(to, from);
1563  AST_VECTOR_REMOVE_ELEM_UNORDERED(&to->upstream_topics, from,
1565 
1566  for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
1567  topic_remove_subscription(from, AST_VECTOR_GET(&to->subscribers, idx));
1568  }
1569  ao2_unlock(from);
1570  ao2_unlock(to);
1571  }
1572 
1573  ao2_cleanup(forward);
1574 
1575  return NULL;
1576 }
1577 
1579  struct stasis_topic *to_topic)
1580 {
1581  int res;
1582  size_t idx;
1583  struct stasis_forward *forward;
1584 
1585  if (!from_topic || !to_topic) {
1586  return NULL;
1587  }
1588 
1589  forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1590  if (!forward) {
1591  return NULL;
1592  }
1593 
1594  /* Forwards to ourselves are implicit. */
1595  if (to_topic == from_topic) {
1596  return forward;
1597  }
1598 
1599  forward->from_topic = ao2_bump(from_topic);
1600  forward->to_topic = ao2_bump(to_topic);
1601 
1602  topic_lock_both(to_topic, from_topic);
1603  res = AST_VECTOR_APPEND(&to_topic->upstream_topics, from_topic);
1604  if (res != 0) {
1605  ao2_unlock(from_topic);
1606  ao2_unlock(to_topic);
1607  ao2_ref(forward, -1);
1608  return NULL;
1609  }
1610 
1611  for (idx = 0; idx < AST_VECTOR_SIZE(&to_topic->subscribers); ++idx) {
1612  topic_add_subscription(from_topic, AST_VECTOR_GET(&to_topic->subscribers, idx));
1613  }
1614  ao2_unlock(from_topic);
1615  ao2_unlock(to_topic);
1616 
1617  return forward;
1618 }
1619 
1620 static void subscription_change_dtor(void *obj)
1621 {
1622  struct stasis_subscription_change *change = obj;
1623 
1624  ao2_cleanup(change->topic);
1625 }
1626 
1627 static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
1628 {
1629  size_t description_len = strlen(description) + 1;
1630  size_t uniqueid_len = strlen(uniqueid) + 1;
1631  struct stasis_subscription_change *change;
1632 
1633  change = ao2_alloc_options(sizeof(*change) + description_len + uniqueid_len,
1635  if (!change) {
1636  return NULL;
1637  }
1638 
1639  strcpy(change->description, description); /* SAFE */
1640  change->uniqueid = change->description + description_len;
1641  ast_copy_string(change->uniqueid, uniqueid, uniqueid_len); /* SAFE */
1642  ao2_ref(topic, +1);
1643  change->topic = topic;
1644 
1645  return change;
1646 }
1647 
1648 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
1649 {
1650  struct stasis_subscription_change *change;
1651  struct stasis_message *msg;
1652 
1653  /* This assumes that we have already subscribed */
1655 
1657  return;
1658  }
1659 
1660  change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
1661  if (!change) {
1662  return;
1663  }
1664 
1666  if (!msg) {
1667  ao2_cleanup(change);
1668  return;
1669  }
1670 
1671  stasis_publish(topic, msg);
1672  ao2_cleanup(msg);
1673  ao2_cleanup(change);
1674 }
1675 
1677  struct stasis_subscription *sub)
1678 {
1679  struct stasis_subscription_change *change;
1680  struct stasis_message *msg;
1681 
1682  /* This assumes that we have already unsubscribed */
1684 
1686  return;
1687  }
1688 
1689  change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
1690  if (!change) {
1691  return;
1692  }
1693 
1695  if (!msg) {
1696  ao2_cleanup(change);
1697  return;
1698  }
1699 
1700  stasis_publish(topic, msg);
1701 
1702  /* Now we have to dispatch to the subscription itself */
1703  dispatch_message(sub, msg, 0);
1704 
1705  ao2_cleanup(msg);
1706  ao2_cleanup(change);
1707 }
1708 
1712  char name[0];
1713 };
1714 
1715 static void topic_pool_entry_dtor(void *obj)
1716 {
1717  struct topic_pool_entry *entry = obj;
1718 
1719  entry->forward = stasis_forward_cancel(entry->forward);
1720  ao2_cleanup(entry->topic);
1721  entry->topic = NULL;
1722 }
1723 
1724 static struct topic_pool_entry *topic_pool_entry_alloc(const char *topic_name)
1725 {
1727 
1728  topic_pool_entry = ao2_alloc_options(sizeof(*topic_pool_entry) + strlen(topic_name) + 1,
1730  if (!topic_pool_entry) {
1731  return NULL;
1732  }
1733 
1734  strcpy(topic_pool_entry->name, topic_name); /* Safe */
1735 
1736  return topic_pool_entry;
1737 }
1738 
1742 };
1743 
1744 static void topic_pool_dtor(void *obj)
1745 {
1746  struct stasis_topic_pool *pool = obj;
1747 
1748 #ifdef AO2_DEBUG
1749  {
1750  char *container_name =
1751  ast_alloca(strlen(stasis_topic_name(pool->pool_topic)) + strlen("-pool") + 1);
1752  sprintf(container_name, "%s-pool", stasis_topic_name(pool->pool_topic));
1753  ao2_container_unregister(container_name);
1754  }
1755 #endif
1756 
1757  ao2_cleanup(pool->pool_container);
1758  pool->pool_container = NULL;
1759  ao2_cleanup(pool->pool_topic);
1760  pool->pool_topic = NULL;
1761 }
1762 
1763 static int topic_pool_entry_hash(const void *obj, const int flags)
1764 {
1765  const struct topic_pool_entry *object;
1766  const char *key;
1767 
1768  switch (flags & OBJ_SEARCH_MASK) {
1769  case OBJ_SEARCH_KEY:
1770  key = obj;
1771  break;
1772  case OBJ_SEARCH_OBJECT:
1773  object = obj;
1774  key = object->name;
1775  break;
1776  default:
1777  /* Hash can only work on something with a full key. */
1778  ast_assert(0);
1779  return 0;
1780  }
1781  return ast_str_case_hash(key);
1782 }
1783 
1784 static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
1785 {
1786  const struct topic_pool_entry *object_left = obj;
1787  const struct topic_pool_entry *object_right = arg;
1788  const char *right_key = arg;
1789  int cmp;
1790 
1791  switch (flags & OBJ_SEARCH_MASK) {
1792  case OBJ_SEARCH_OBJECT:
1793  right_key = object_right->name;
1794  /* Fall through */
1795  case OBJ_SEARCH_KEY:
1796  cmp = strcasecmp(object_left->name, right_key);
1797  break;
1799  /* Not supported by container */
1800  ast_assert(0);
1801  cmp = -1;
1802  break;
1803  default:
1804  /*
1805  * What arg points to is specific to this traversal callback
1806  * and has no special meaning to astobj2.
1807  */
1808  cmp = 0;
1809  break;
1810  }
1811  if (cmp) {
1812  return 0;
1813  }
1814  /*
1815  * At this point the traversal callback is identical to a sorted
1816  * container.
1817  */
1818  return CMP_MATCH;
1819 }
1820 
1821 #ifdef AO2_DEBUG
1822 static void topic_pool_prnt_obj(void *v_obj, void *where, ao2_prnt_fn *prnt)
1823 {
1824  struct topic_pool_entry *entry = v_obj;
1825 
1826  if (!entry) {
1827  return;
1828  }
1829  prnt(where, "%s", stasis_topic_name(entry->topic));
1830 }
1831 #endif
1832 
1834 {
1835  struct stasis_topic_pool *pool;
1836 
1838  if (!pool) {
1839  return NULL;
1840  }
1841 
1844  if (!pool->pool_container) {
1845  ao2_cleanup(pool);
1846  return NULL;
1847  }
1848 
1849 #ifdef AO2_DEBUG
1850  {
1851  char *container_name =
1852  ast_alloca(strlen(stasis_topic_name(pooled_topic)) + strlen("-pool") + 1);
1853  sprintf(container_name, "%s-pool", stasis_topic_name(pooled_topic));
1854  ao2_container_register(container_name, pool->pool_container, topic_pool_prnt_obj);
1855  }
1856 #endif
1857 
1858  ao2_ref(pooled_topic, +1);
1859  pool->pool_topic = pooled_topic;
1860 
1861  return pool;
1862 }
1863 
1864 void stasis_topic_pool_delete_topic(struct stasis_topic_pool *pool, const char *topic_name)
1865 {
1866  /*
1867  * The topic_name passed in could be a fully-qualified name like <pool_topic_name>/<topic_name>
1868  * or just <topic_name>. If it's fully qualified, we need to skip past <pool_topic_name>
1869  * name and search only on <topic_name>.
1870  */
1871  const char *pool_topic_name = stasis_topic_name(pool->pool_topic);
1872  int pool_topic_name_len = strlen(pool_topic_name);
1873  const char *search_topic_name;
1874 
1875  if (strncmp(pool_topic_name, topic_name, pool_topic_name_len) == 0) {
1876  search_topic_name = topic_name + pool_topic_name_len + 1;
1877  } else {
1878  search_topic_name = topic_name;
1879  }
1880 
1881  ao2_find(pool->pool_container, search_topic_name, OBJ_SEARCH_KEY | OBJ_NODATA | OBJ_UNLINK);
1882 }
1883 
1884 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
1885 {
1887  SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
1888  char *new_topic_name;
1889  int ret;
1890 
1892  if (topic_pool_entry) {
1893  return topic_pool_entry->topic;
1894  }
1895 
1897  if (!topic_pool_entry) {
1898  return NULL;
1899  }
1900 
1901  /* To provide further detail and to ensure that the topic is unique within the scope of the
1902  * system we prefix it with the pooling topic name, which should itself already be unique.
1903  */
1904  ret = ast_asprintf(&new_topic_name, "%s/%s", stasis_topic_name(pool->pool_topic), topic_name);
1905  if (ret < 0) {
1906  return NULL;
1907  }
1908 
1909  topic_pool_entry->topic = stasis_topic_create(new_topic_name);
1910  ast_free(new_topic_name);
1911  if (!topic_pool_entry->topic) {
1912  return NULL;
1913  }
1914 
1916  if (!topic_pool_entry->forward) {
1917  return NULL;
1918  }
1919 
1921  return NULL;
1922  }
1923 
1924  return topic_pool_entry->topic;
1925 }
1926 
1927 int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name)
1928 {
1930 
1931  topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY);
1932  if (!topic_pool_entry) {
1933  return 0;
1934  }
1935 
1936  ao2_ref(topic_pool_entry, -1);
1937  return 1;
1938 }
1939 
1941 {
1942 #ifdef AST_DEVMODE
1943  if (!stasis_message_type_declined(name)) {
1944  ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
1945  }
1946 #endif
1947 }
1948 
1949 /*! \brief A multi object blob data structure to carry user event stasis messages */
1951  struct ast_json *blob; /*< A blob of JSON data */
1952  AST_VECTOR(, void *) snapshots[STASIS_UMOS_MAX]; /*< Vector of snapshots for each type */
1953 };
1954 
1955 /*!
1956  * \internal
1957  * \brief Destructor for \ref ast_multi_object_blob objects
1958  */
1959 static void multi_object_blob_dtor(void *obj)
1960 {
1961  struct ast_multi_object_blob *multi = obj;
1962  int type;
1963  int i;
1964 
1965  for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1966  for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1967  ao2_cleanup(AST_VECTOR_GET(&multi->snapshots[type], i));
1968  }
1969  AST_VECTOR_FREE(&multi->snapshots[type]);
1970  }
1971  ast_json_unref(multi->blob);
1972 }
1973 
1974 /*! \brief Create a stasis user event multi object blob */
1976 {
1977  int type;
1978  struct ast_multi_object_blob *multi;
1979 
1980  ast_assert(blob != NULL);
1981 
1982  multi = ao2_alloc(sizeof(*multi), multi_object_blob_dtor);
1983  if (!multi) {
1984  return NULL;
1985  }
1986 
1987  for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1988  if (AST_VECTOR_INIT(&multi->snapshots[type], 0)) {
1989  ao2_ref(multi, -1);
1990 
1991  return NULL;
1992  }
1993  }
1994 
1995  multi->blob = ast_json_ref(blob);
1996 
1997  return multi;
1998 }
1999 
2000 /*! \brief Add an object (snapshot) to the blob */
2002  enum stasis_user_multi_object_snapshot_type type, void *object)
2003 {
2004  if (!multi || !object || AST_VECTOR_APPEND(&multi->snapshots[type], object)) {
2005  ao2_cleanup(object);
2006  }
2007 }
2008 
2009 /*! \brief Publish single channel user event (for app_userevent compatibility) */
2011  struct stasis_message_type *type, struct ast_json *blob)
2012 {
2013  struct stasis_message *message;
2014  struct ast_channel_snapshot *channel_snapshot;
2015  struct ast_multi_object_blob *multi;
2016 
2017  if (!type) {
2018  return;
2019  }
2020 
2021  multi = ast_multi_object_blob_create(blob);
2022  if (!multi) {
2023  return;
2024  }
2025 
2026  channel_snapshot = ast_channel_snapshot_create(chan);
2027  if (!channel_snapshot) {
2028  ao2_ref(multi, -1);
2029  return;
2030  }
2031 
2032  /* this call steals the channel_snapshot reference */
2033  ast_multi_object_blob_add(multi, STASIS_UMOS_CHANNEL, channel_snapshot);
2034 
2035  message = stasis_message_create(type, multi);
2036  ao2_ref(multi, -1);
2037  if (message) {
2038  /* app_userevent still publishes to channel */
2039  stasis_publish(ast_channel_topic(chan), message);
2040  ao2_ref(message, -1);
2041  }
2042 }
2043 
2044 /*! \internal \brief convert multi object blob to ari json */
2046  struct stasis_message *message,
2047  const struct stasis_message_sanitizer *sanitize)
2048 {
2049  struct ast_json *out;
2050  struct ast_multi_object_blob *multi = stasis_message_data(message);
2051  struct ast_json *blob = multi->blob;
2052  const struct timeval *tv = stasis_message_timestamp(message);
2054  int i;
2055 
2056  out = ast_json_object_create();
2057  if (!out) {
2058  return NULL;
2059  }
2060 
2061  ast_json_object_set(out, "type", ast_json_string_create("ChannelUserevent"));
2062  ast_json_object_set(out, "timestamp", ast_json_timeval(*tv, NULL));
2063  ast_json_object_set(out, "eventname", ast_json_ref(ast_json_object_get(blob, "eventname")));
2064  ast_json_object_set(out, "userevent", ast_json_ref(blob));
2065 
2066  for (type = 0; type < STASIS_UMOS_MAX; ++type) {
2067  for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
2068  struct ast_json *json_object = NULL;
2069  char *name = NULL;
2070  void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
2071 
2072  switch (type) {
2073  case STASIS_UMOS_CHANNEL:
2074  json_object = ast_channel_snapshot_to_json(snapshot, sanitize);
2075  name = "channel";
2076  break;
2077  case STASIS_UMOS_BRIDGE:
2078  json_object = ast_bridge_snapshot_to_json(snapshot, sanitize);
2079  name = "bridge";
2080  break;
2081  case STASIS_UMOS_ENDPOINT:
2082  json_object = ast_endpoint_snapshot_to_json(snapshot, sanitize);
2083  name = "endpoint";
2084  break;
2085  }
2086  if (json_object) {
2087  ast_json_object_set(out, name, json_object);
2088  }
2089  }
2090  }
2091 
2092  return out;
2093 }
2094 
2095 /*! \internal \brief convert multi object blob to ami string */
2096 static struct ast_str *multi_object_blob_to_ami(void *obj)
2097 {
2098  struct ast_str *ami_str=ast_str_create(1024);
2099  struct ast_str *ami_snapshot;
2100  const struct ast_multi_object_blob *multi = obj;
2102  int i;
2103 
2104  if (!ami_str) {
2105  return NULL;
2106  }
2107  if (!multi) {
2108  ast_free(ami_str);
2109  return NULL;
2110  }
2111 
2112  for (type = 0; type < STASIS_UMOS_MAX; ++type) {
2113  for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
2114  char *name = NULL;
2115  void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
2116  ami_snapshot = NULL;
2117 
2118  if (i > 0) {
2119  ast_asprintf(&name, "%d", i + 1);
2120  }
2121 
2122  switch (type) {
2123  case STASIS_UMOS_CHANNEL:
2124  ami_snapshot = ast_manager_build_channel_state_string_prefix(snapshot, name ?: "");
2125  break;
2126 
2127  case STASIS_UMOS_BRIDGE:
2128  ami_snapshot = ast_manager_build_bridge_state_string_prefix(snapshot, name ?: "");
2129  break;
2130 
2131  case STASIS_UMOS_ENDPOINT:
2132  /* currently not sending endpoint snapshots to AMI */
2133  break;
2134  }
2135  if (ami_snapshot) {
2136  ast_str_append(&ami_str, 0, "%s", ast_str_buffer(ami_snapshot));
2137  ast_free(ami_snapshot);
2138  }
2139  ast_free(name);
2140  }
2141  }
2142 
2143  return ami_str;
2144 }
2145 
/*!
 * \internal
 * \brief Callback to pass only user defined parameters from blob
 * \param key Name of the blob key being considered
 * \retval 1 if \a key is exactly "eventname" (case-sensitive)
 * \retval 0 for any other key
 */
static int userevent_exclusion_cb(const char *key)
{
	return strcmp(key, "eventname") == 0;
}
2154 
2156  struct stasis_message *message)
2157 {
2158  RAII_VAR(struct ast_str *, object_string, NULL, ast_free);
2159  RAII_VAR(struct ast_str *, body, NULL, ast_free);
2160  struct ast_multi_object_blob *multi = stasis_message_data(message);
2161  const char *eventname;
2162 
2163  eventname = ast_json_string_get(ast_json_object_get(multi->blob, "eventname"));
2165  object_string = multi_object_blob_to_ami(multi);
2166  if (!object_string || !body) {
2167  return NULL;
2168  }
2169 
2170  return ast_manager_event_blob_create(EVENT_FLAG_USER, "UserEvent",
2171  "%s"
2172  "UserEvent: %s\r\n"
2173  "%s",
2174  ast_str_buffer(object_string),
2175  eventname,
2176  ast_str_buffer(body));
2177 }
2178 
2179 /*! \brief A structure to hold global configuration-related options */
2181  /*! The list of message types to decline */
2183 };
2184 
2185 /*! \brief Threadpool configuration options */
2187  /*! Initial size of the thread pool */
2189  /*! Time, in seconds, before we expire a thread */
2191  /*! Maximum number of threads to allow */
2193 };
2194 
2196  /*! Thread pool configuration options */
2198  /*! Declined message types */
2200 };
2201 
2202 static struct aco_type threadpool_option = {
2203  .type = ACO_GLOBAL,
2204  .name = "threadpool",
2205  .item_offset = offsetof(struct stasis_config, threadpool_options),
2206  .category = "threadpool",
2207  .category_match = ACO_WHITELIST_EXACT,
2208 };
2209 
/*! \brief Type list passed to aco_option_register() for the threadpool options (initial_size, idle_timeout_sec, max_size) */
2210 static struct aco_type *threadpool_options[] = ACO_TYPES(&threadpool_option);
2211 
2212 /*! \brief An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type */
2213 static struct aco_type declined_option = {
2214  .type = ACO_GLOBAL,
2215  .name = "declined_message_types",
2216  .item_offset = offsetof(struct stasis_config, declined_message_types),
2217  .category_match = ACO_WHITELIST_EXACT,
2218  .category = "declined_message_types",
2219 };
2220 
2221 struct aco_type *declined_options[] = ACO_TYPES(&declined_option);
2222 
/*
 * Description of the stasis.conf file and the category types it may hold.
 * NOTE(review): the opening line of this initializer (listing line 2223)
 * is absent from this listing; the object is referenced further down as
 * `stasis_conf` in ACO_FILES(&stasis_conf).
 */
2224  .filename = "stasis.conf",
2225  .types = ACO_TYPES(&declined_option, &threadpool_option),
2226 };
2227 
2228 /*! \brief A global object container that will contain the stasis_config that gets swapped out on reloads */
/*
 * NOTE(review): the declaration this comment documents (listing line 2229)
 * is absent from this listing; `globals` is still referenced by
 * CONFIG_INFO_CORE() below and by ao2_global_obj_ref(globals) elsewhere.
 */
2230 
/*! \brief Allocator for stasis_config objects; declared here because CONFIG_INFO_CORE() below takes it as an argument */
2231 static void *stasis_config_alloc(void);
2232 
2233 /*! \brief Register information about the configs being processed by this module */
2234 CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,
2235  .files = ACO_FILES(&stasis_conf),
2236 );
2237 
2239 {
/*
 * Destructor body for a struct stasis_declined_config: drops the reference
 * held on its `declined` container.
 * NOTE(review): the signature line (listing line 2238) is absent from this
 * listing; the body expects a `void *obj` parameter.
 */
2240  struct stasis_declined_config *declined = obj;
2241 
2242  ao2_cleanup(declined->declined);
2243 }
2244 
2245 static void stasis_config_destructor(void *obj)
2246 {
2247  struct stasis_config *cfg = obj;
2248 
2251 }
2252 
/*!
 * \internal
 * \brief Allocate a stasis_config object together with its sub-objects.
 *
 * \return The new configuration object, or NULL if any allocation failed.
 * On failure the partially built object is released with ao2_ref(cfg, -1).
 *
 * NOTE(review): listing lines 2267-2268 and 2274 are absent from this
 * listing. Nothing visible assigns cfg->declined_message_types or its
 * `declined` container before the NULL checks below. Restore the
 * allocations from the upstream file.
 */
2253 static void *stasis_config_alloc(void)
2254 {
2255  struct stasis_config *cfg;
2256 
2257  if (!(cfg = ao2_alloc(sizeof(*cfg), stasis_config_destructor))) {
2258  return NULL;
2259  }
2260 
/* The threadpool options are a zero-initialised plain heap block. */
2261  cfg->threadpool_options = ast_calloc(1, sizeof(*cfg->threadpool_options));
2262  if (!cfg->threadpool_options) {
2263  ao2_ref(cfg, -1);
2264  return NULL;
2265  }
2266 
2269  if (!cfg->declined_message_types) {
2270  ao2_ref(cfg, -1);
2271  return NULL;
2272  }
2273 
2275  if (!cfg->declined_message_types->declined) {
2276  ao2_ref(cfg, -1);
2277  return NULL;
2278  }
2279 
2280  return cfg;
2281 }
2282 
2284 {
/*
 * Reports whether `name` is present in the configured container of
 * declined message types. Returns 1 (and logs a notice) when it is found,
 * 0 when it is not found or when no configuration is available.
 * NOTE(review): the signature line (listing line 2283) is absent from this
 * listing; `name` is used below as the ao2_find() key and as a %s argument.
 */
2285  struct stasis_config *cfg = ao2_global_obj_ref(globals);
2286  char *name_in_declined;
2287  int res;
2288 
2289  if (!cfg || !cfg->declined_message_types) {
2290  ao2_cleanup(cfg);
2291  return 0;
2292  }
2293 
/* ao2_find() hands back a reference on a match; it is dropped right after the test. */
2294  name_in_declined = ao2_find(cfg->declined_message_types->declined, name, OBJ_SEARCH_KEY);
2295  res = name_in_declined ? 1 : 0;
2296  ao2_cleanup(name_in_declined);
2297  ao2_ref(cfg, -1);
2298  if (res) {
2299  ast_log(LOG_NOTICE, "Declining to allocate Stasis message type '%s' due to configuration\n", name);
2300  }
2301  return res;
2302 }
2303 
2304 static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
2305 {
2306  struct stasis_declined_config *declined = obj;
2307 
2308  if (ast_strlen_zero(var->value)) {
2309  return 0;
2310  }
2311 
2312  if (ast_str_container_add(declined->declined, var->value)) {
2313  return -1;
2314  }
2315 
2316  return 0;
2317 }
2318 
2319 /*!
2320  * @{ \brief Define multi user event message type(s).
2321  */
2322 
/*
 * NOTE(review): the opening line of the message type definition (listing
 * line 2323) and listing line 2325 are absent from this listing; only the
 * .to_json initializer and the closing parenthesis remain.
 */
2324  .to_json = multi_user_event_to_json,
2326  );
2327 
2328 /*! @} */
2329 
2330 /*!
2331  * \internal
2332  * \brief CLI command implementation for 'stasis show topics'
2333  */
2334 static char *stasis_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2335 {
2336  struct ao2_iterator iter;
2337  struct topic_proxy *topic;
2338  struct ao2_container *tmp_container;
2339  int count = 0;
2340 #define FMT_HEADERS "%-64s %-64s\n"
2341 #define FMT_FIELDS "%-64s %-64s\n"
2342 
2343  switch (cmd) {
2344  case CLI_INIT:
2345  e->command = "stasis show topics";
2346  e->usage =
2347  "Usage: stasis show topics\n"
2348  " Shows a list of topics\n";
2349  return NULL;
2350  case CLI_GENERATE:
2351  return NULL;
2352  }
2353 
2354  if (a->argc != e->args) {
2355  return CLI_SHOWUSAGE;
2356  }
2357 
2358  ast_cli(a->fd, "\n" FMT_HEADERS, "Name", "Detail");
2359 
2361  topic_proxy_sort_fn, NULL);
2362 
2363  if (!tmp_container || ao2_container_dup(tmp_container, topic_all, OBJ_SEARCH_OBJECT)) {
2364  ao2_cleanup(tmp_container);
2365 
2366  return NULL;
2367  }
2368 
2369  /* getting all topic in order */
2370  iter = ao2_iterator_init(tmp_container, AO2_ITERATOR_UNLINK);
2371  while ((topic = ao2_iterator_next(&iter))) {
2372  ast_cli(a->fd, FMT_FIELDS, topic->name, topic->detail);
2373  ao2_ref(topic, -1);
2374  ++count;
2375  }
2376  ao2_iterator_destroy(&iter);
2377  ao2_cleanup(tmp_container);
2378 
2379  ast_cli(a->fd, "\n%d Total topics\n\n", count);
2380 
2381 #undef FMT_HEADERS
2382 #undef FMT_FIELDS
2383 
2384  return CLI_SUCCESS;
2385 }
2386 
2387 /*!
2388  * \internal
2389  * \brief CLI tab completion for topic names
2390  */
2391 static char *topic_complete_name(const char *word)
2392 {
2393  struct topic_proxy *topic;
2394  struct ao2_iterator it;
2395  int wordlen = strlen(word);
2396  int ret;
2397 
2398  it = ao2_iterator_init(topic_all, 0);
2399  while ((topic = ao2_iterator_next(&it))) {
2400  if (!strncasecmp(word, topic->name, wordlen)) {
2401  ret = ast_cli_completion_add(ast_strdup(topic->name));
2402  if (ret) {
2403  ao2_ref(topic, -1);
2404  break;
2405  }
2406  }
2407  ao2_ref(topic, -1);
2408  }
2409  ao2_iterator_destroy(&it);
2410  return NULL;
2411 }
2412 
2413 /*!
2414  * \internal
2415  * \brief CLI command implementation for 'stasis show topic'
2416  */
2417 static char *stasis_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2418 {
2419  struct stasis_topic *topic;
2420  char print_time[32];
2421  int i;
2422 
2423  switch (cmd) {
2424  case CLI_INIT:
2425  e->command = "stasis show topic";
2426  e->usage =
2427  "Usage: stasis show topic <name>\n"
2428  " Show stasis topic detail info.\n";
2429  return NULL;
2430  case CLI_GENERATE:
2431  if (a->pos == 3) {
2432  return topic_complete_name(a->word);
2433  } else {
2434  return NULL;
2435  }
2436  }
2437 
2438  if (a->argc != 4) {
2439  return CLI_SHOWUSAGE;
2440  }
2441 
2442  topic = stasis_topic_get(a->argv[3]);
2443  if (!topic) {
2444  ast_cli(a->fd, "Specified topic '%s' does not exist\n", a->argv[3]);
2445  return CLI_FAILURE;
2446  }
2447 
2448  ast_cli(a->fd, "Name: %s\n", topic->name);
2449  ast_cli(a->fd, "Detail: %s\n", topic->detail);
2450  ast_cli(a->fd, "Subscribers count: %zu\n", AST_VECTOR_SIZE(&topic->subscribers));
2451  ast_cli(a->fd, "Forwarding topic count: %zu\n", AST_VECTOR_SIZE(&topic->upstream_topics));
2452  ast_format_duration_hh_mm_ss(ast_tvnow().tv_sec - topic->creationtime->tv_sec, print_time, sizeof(print_time));
2453  ast_cli(a->fd, "Duration time: %s\n", print_time);
2454 
2455  ao2_lock(topic);
2456  ast_cli(a->fd, "\nSubscribers:\n");
2457  for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); i++) {
2458  struct stasis_subscription *subscription_tmp = AST_VECTOR_GET(&topic->subscribers, i);
2459  ast_cli(a->fd, " UniqueID: %s, Topic: %s, Detail: %s\n",
2460  subscription_tmp->uniqueid, subscription_tmp->topic->name, subscription_tmp->topic->detail);
2461  }
2462 
2463  ast_cli(a->fd, "\nForwarded topics:\n");
2464  for (i = 0; i < AST_VECTOR_SIZE(&topic->upstream_topics); i++) {
2465  struct stasis_topic *topic_tmp = AST_VECTOR_GET(&topic->upstream_topics, i);
2466  ast_cli(a->fd, " Topic: %s, Detail: %s\n", topic_tmp->name, topic_tmp->detail);
2467  }
2468  ao2_unlock(topic);
2469 
2470  ao2_ref(topic, -1);
2471 
2472  return CLI_SUCCESS;
2473 }
2474 
2475 
/*! \brief CLI commands registered by stasis_init() and unregistered by stasis_cleanup() */
2476 static struct ast_cli_entry cli_stasis[] = {
2477  AST_CLI_DEFINE(stasis_show_topics, "Show all topics"),
2478  AST_CLI_DEFINE(stasis_show_topic, "Show topic"),
2479 };
2480 
2481 
2482 #ifdef AST_DEVMODE
2483 
/* Presumably generates stasis_subscription_statistics_sort_fn(), used by statistics_show_subscriptions() to order entries by uniqueid (confirm against the macro definition). */
2484 AO2_STRING_FIELD_SORT_FN(stasis_subscription_statistics, uniqueid);
2485 
2486 /*!
2487  * \internal
2488  * \brief CLI command implementation for 'stasis statistics show subscriptions'
2489  */
2490 static char *statistics_show_subscriptions(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2491 {
2492  struct ao2_container *sorted_subscriptions;
2493  struct ao2_container *subscription_stats;
2494  struct ao2_iterator iter;
2495  struct stasis_subscription_statistics *statistics;
2496  int count = 0;
2497  int dropped = 0;
2498  int passed = 0;
2499 #define FMT_HEADERS "%-64s %10s %10s %16s %16s\n"
2500 #define FMT_FIELDS "%-64s %10d %10d %16ld %16ld\n"
2501 #define FMT_FIELDS2 "%-64s %10d %10d\n"
2502 
2503  switch (cmd) {
2504  case CLI_INIT:
2505  e->command = "stasis statistics show subscriptions";
2506  e->usage =
2507  "Usage: stasis statistics show subscriptions\n"
2508  " Shows a list of subscriptions and their general statistics\n";
2509  return NULL;
2510  case CLI_GENERATE:
2511  return NULL;
2512  }
2513 
2514  if (a->argc != e->args) {
2515  return CLI_SHOWUSAGE;
2516  }
2517 
2518  subscription_stats = ao2_global_obj_ref(subscription_statistics);
2519  if (!subscription_stats) {
2520  ast_cli(a->fd, "Could not fetch subscription_statistics container\n");
2521  return CLI_FAILURE;
2522  }
2523 
2524  sorted_subscriptions = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_NOLOCK, 0,
2525  stasis_subscription_statistics_sort_fn, NULL);
2526  if (!sorted_subscriptions) {
2527  ao2_ref(subscription_stats, -1);
2528  ast_cli(a->fd, "Could not create container for sorting subscription statistics\n");
2529  return CLI_SUCCESS;
2530  }
2531 
2532  if (ao2_container_dup(sorted_subscriptions, subscription_stats, 0)) {
2533  ao2_ref(sorted_subscriptions, -1);
2534  ao2_ref(subscription_stats, -1);
2535  ast_cli(a->fd, "Could not sort subscription statistics\n");
2536  return CLI_SUCCESS;
2537  }
2538 
2539  ao2_ref(subscription_stats, -1);
2540 
2541  ast_cli(a->fd, "\n" FMT_HEADERS, "Subscription", "Dropped", "Passed", "Lowest Invoke", "Highest Invoke");
2542 
2543  iter = ao2_iterator_init(sorted_subscriptions, 0);
2544  while ((statistics = ao2_iterator_next(&iter))) {
2545  ast_cli(a->fd, FMT_FIELDS, statistics->uniqueid, statistics->messages_dropped, statistics->messages_passed,
2546  statistics->lowest_time_invoked, statistics->highest_time_invoked);
2547  dropped += statistics->messages_dropped;
2548  passed += statistics->messages_passed;
2549  ao2_ref(statistics, -1);
2550  ++count;
2551  }
2552  ao2_iterator_destroy(&iter);
2553 
2554  ao2_ref(sorted_subscriptions, -1);
2555 
2556  ast_cli(a->fd, FMT_FIELDS2, "Total", dropped, passed);
2557  ast_cli(a->fd, "\n%d subscriptions\n\n", count);
2558 
2559 #undef FMT_HEADERS
2560 #undef FMT_FIELDS
2561 #undef FMT_FIELDS2
2562 
2563  return CLI_SUCCESS;
2564 }
2565 
2566 /*!
2567  * \internal
2568  * \brief CLI tab completion for subscription statistics names
2569  */
2570 static char *subscription_statistics_complete_name(const char *word, int state)
2571 {
2572  struct stasis_subscription_statistics *statistics;
2573  struct ao2_container *subscription_stats;
2574  struct ao2_iterator it_statistics;
2575  int wordlen = strlen(word);
2576  int which = 0;
2577  char *result = NULL;
2578 
2579  subscription_stats = ao2_global_obj_ref(subscription_statistics);
2580  if (!subscription_stats) {
2581  return result;
2582  }
2583 
2584  it_statistics = ao2_iterator_init(subscription_stats, 0);
2585  while ((statistics = ao2_iterator_next(&it_statistics))) {
2586  if (!strncasecmp(word, statistics->uniqueid, wordlen)
2587  && ++which > state) {
2588  result = ast_strdup(statistics->uniqueid);
2589  }
2590  ao2_ref(statistics, -1);
2591  if (result) {
2592  break;
2593  }
2594  }
2595  ao2_iterator_destroy(&it_statistics);
2596  ao2_ref(subscription_stats, -1);
2597  return result;
2598 }
2599 
2600 /*!
2601  * \internal
2602  * \brief CLI command implementation for 'stasis statistics show subscription'
2603  */
2604 static char *statistics_show_subscription(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2605 {
2606  struct stasis_subscription_statistics *statistics;
2607  struct ao2_container *subscription_stats;
2608  struct ao2_iterator i;
2609  char *name;
2610 
2611  switch (cmd) {
2612  case CLI_INIT:
2613  e->command = "stasis statistics show subscription";
2614  e->usage =
2615  "Usage: stasis statistics show subscription <uniqueid>\n"
2616  " Show stasis subscription statistics.\n";
2617  return NULL;
2618  case CLI_GENERATE:
2619  if (a->pos == 4) {
2620  return subscription_statistics_complete_name(a->word, a->n);
2621  } else {
2622  return NULL;
2623  }
2624  }
2625 
2626  if (a->argc != 5) {
2627  return CLI_SHOWUSAGE;
2628  }
2629 
2630  subscription_stats = ao2_global_obj_ref(subscription_statistics);
2631  if (!subscription_stats) {
2632  ast_cli(a->fd, "Could not fetch subcription_statistics container\n");
2633  return CLI_FAILURE;
2634  }
2635 
2636  statistics = ao2_find(subscription_stats, a->argv[4], OBJ_SEARCH_KEY);
2637  if (!statistics) {
2638  ao2_ref(subscription_stats, -1);
2639  ast_cli(a->fd, "Specified subscription '%s' does not exist\n", a->argv[4]);
2640  return CLI_FAILURE;
2641  }
2642 
2643  ao2_ref(subscription_stats, -1);
2644 
2645  ast_cli(a->fd, "Subscription: %s\n", statistics->uniqueid);
2646  ast_cli(a->fd, "Pointer Address: %p\n", statistics->sub);
2647  ast_cli(a->fd, "Source filename: %s\n", S_OR(statistics->file, "<unavailable>"));
2648  ast_cli(a->fd, "Source line number: %d\n", statistics->lineno);
2649  ast_cli(a->fd, "Source function: %s\n", S_OR(statistics->func, "<unavailable>"));
2650  ast_cli(a->fd, "Number of messages dropped due to filtering: %d\n", statistics->messages_dropped);
2651  ast_cli(a->fd, "Number of messages passed to subscriber callback: %d\n", statistics->messages_passed);
2652  ast_cli(a->fd, "Using mailbox to queue messages: %s\n", statistics->uses_mailbox ? "Yes" : "No");
2653  ast_cli(a->fd, "Using stasis threadpool for handling messages: %s\n", statistics->uses_threadpool ? "Yes" : "No");
2654  ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->lowest_time_invoked);
2655  ast_cli(a->fd, "Highest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->highest_time_invoked);
2656 
2657  ao2_lock(statistics);
2658  if (statistics->highest_time_message_type) {
2659  ast_cli(a->fd, "Offender message type for highest invoking time: %s\n", stasis_message_type_name(statistics->highest_time_message_type));
2660  }
2661  ao2_unlock(statistics);
2662 
2663  ast_cli(a->fd, "Number of topics: %d\n", ao2_container_count(statistics->topics));
2664 
2665  ast_cli(a->fd, "Subscribed topics:\n");
2666  i = ao2_iterator_init(statistics->topics, 0);
2667  while ((name = ao2_iterator_next(&i))) {
2668  ast_cli(a->fd, "\t%s\n", name);
2669  ao2_ref(name, -1);
2670  }
2672 
2673  ao2_ref(statistics, -1);
2674 
2675  return CLI_SUCCESS;
2676 }
2677 
/* Presumably generates stasis_topic_statistics_sort_fn(), used by statistics_show_topics() to order entries by name (confirm against the macro definition). */
2678 AO2_STRING_FIELD_SORT_FN(stasis_topic_statistics, name);
2679 
2680 /*!
2681  * \internal
2682  * \brief CLI command implementation for 'stasis statistics show topics'
2683  */
2684 static char *statistics_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2685 {
2686  struct ao2_container *sorted_topics;
2687  struct ao2_container *topic_stats;
2688  struct ao2_iterator iter;
2689  struct stasis_topic_statistics *statistics;
2690  int count = 0;
2691  int not_dispatched = 0;
2692  int dispatched = 0;
2693 #define FMT_HEADERS "%-64s %10s %10s %10s %16s %16s\n"
2694 #define FMT_FIELDS "%-64s %10d %10d %10d %16ld %16ld\n"
2695 #define FMT_FIELDS2 "%-64s %10s %10d %10d\n"
2696 
2697  switch (cmd) {
2698  case CLI_INIT:
2699  e->command = "stasis statistics show topics";
2700  e->usage =
2701  "Usage: stasis statistics show topics\n"
2702  " Shows a list of topics and their general statistics\n";
2703  return NULL;
2704  case CLI_GENERATE:
2705  return NULL;
2706  }
2707 
2708  if (a->argc != e->args) {
2709  return CLI_SHOWUSAGE;
2710  }
2711 
2712  topic_stats = ao2_global_obj_ref(topic_statistics);
2713  if (!topic_stats) {
2714  ast_cli(a->fd, "Could not fetch topic_statistics container\n");
2715  return CLI_FAILURE;
2716  }
2717 
2719  stasis_topic_statistics_sort_fn, NULL);
2720  if (!sorted_topics) {
2721  ao2_ref(topic_stats, -1);
2722  ast_cli(a->fd, "Could not create container for sorting topic statistics\n");
2723  return CLI_SUCCESS;
2724  }
2725 
2726  if (ao2_container_dup(sorted_topics, topic_stats, 0)) {
2727  ao2_ref(sorted_topics, -1);
2728  ao2_ref(topic_stats, -1);
2729  ast_cli(a->fd, "Could not sort topic statistics\n");
2730  return CLI_SUCCESS;
2731  }
2732 
2733  ao2_ref(topic_stats, -1);
2734 
2735  ast_cli(a->fd, "\n" FMT_HEADERS, "Topic", "Subscribers", "Dropped", "Dispatched", "Lowest Dispatch", "Highest Dispatch");
2736 
2737  iter = ao2_iterator_init(sorted_topics, 0);
2738  while ((statistics = ao2_iterator_next(&iter))) {
2739  ast_cli(a->fd, FMT_FIELDS, statistics->name, ao2_container_count(statistics->subscribers),
2740  statistics->messages_not_dispatched, statistics->messages_dispatched,
2741  statistics->lowest_time_dispatched, statistics->highest_time_dispatched);
2742  not_dispatched += statistics->messages_not_dispatched;
2743  dispatched += statistics->messages_dispatched;
2744  ao2_ref(statistics, -1);
2745  ++count;
2746  }
2747  ao2_iterator_destroy(&iter);
2748 
2749  ao2_ref(sorted_topics, -1);
2750 
2751  ast_cli(a->fd, FMT_FIELDS2, "Total", "", not_dispatched, dispatched);
2752  ast_cli(a->fd, "\n%d topics\n\n", count);
2753 
2754 #undef FMT_HEADERS
2755 #undef FMT_FIELDS
2756 #undef FMT_FIELDS2
2757 
2758  return CLI_SUCCESS;
2759 }
2760 
2761 /*!
2762  * \internal
2763  * \brief CLI tab completion for topic statistics names
2764  */
2765 static char *topic_statistics_complete_name(const char *word, int state)
2766 {
2767  struct stasis_topic_statistics *statistics;
2768  struct ao2_container *topic_stats;
2769  struct ao2_iterator it_statistics;
2770  int wordlen = strlen(word);
2771  int which = 0;
2772  char *result = NULL;
2773 
2774  topic_stats = ao2_global_obj_ref(topic_statistics);
2775  if (!topic_stats) {
2776  return result;
2777  }
2778 
2779  it_statistics = ao2_iterator_init(topic_stats, 0);
2780  while ((statistics = ao2_iterator_next(&it_statistics))) {
2781  if (!strncasecmp(word, statistics->name, wordlen)
2782  && ++which > state) {
2783  result = ast_strdup(statistics->name);
2784  }
2785  ao2_ref(statistics, -1);
2786  if (result) {
2787  break;
2788  }
2789  }
2790  ao2_iterator_destroy(&it_statistics);
2791  ao2_ref(topic_stats, -1);
2792  return result;
2793 }
2794 
2795 /*!
2796  * \internal
2797  * \brief CLI command implementation for 'stasis statistics show topic'
2798  */
2799 static char *statistics_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2800 {
2801  struct stasis_topic_statistics *statistics;
2802  struct ao2_container *topic_stats;
2803  struct ao2_iterator i;
2804  char *uniqueid;
2805 
2806  switch (cmd) {
2807  case CLI_INIT:
2808  e->command = "stasis statistics show topic";
2809  e->usage =
2810  "Usage: stasis statistics show topic <name>\n"
2811  " Show stasis topic statistics.\n";
2812  return NULL;
2813  case CLI_GENERATE:
2814  if (a->pos == 4) {
2815  return topic_statistics_complete_name(a->word, a->n);
2816  } else {
2817  return NULL;
2818  }
2819  }
2820 
2821  if (a->argc != 5) {
2822  return CLI_SHOWUSAGE;
2823  }
2824 
2825  topic_stats = ao2_global_obj_ref(topic_statistics);
2826  if (!topic_stats) {
2827  ast_cli(a->fd, "Could not fetch topic_statistics container\n");
2828  return CLI_FAILURE;
2829  }
2830 
2831  statistics = ao2_find(topic_stats, a->argv[4], OBJ_SEARCH_KEY);
2832  if (!statistics) {
2833  ao2_ref(topic_stats, -1);
2834  ast_cli(a->fd, "Specified topic '%s' does not exist\n", a->argv[4]);
2835  return CLI_FAILURE;
2836  }
2837 
2838  ao2_ref(topic_stats, -1);
2839 
2840  ast_cli(a->fd, "Topic: %s\n", statistics->name);
2841  ast_cli(a->fd, "Pointer Address: %p\n", statistics->topic);
2842  ast_cli(a->fd, "Number of messages published that went to no subscriber: %d\n", statistics->messages_not_dispatched);
2843  ast_cli(a->fd, "Number of messages that went to at least one subscriber: %d\n", statistics->messages_dispatched);
2844  ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent dispatching message: %ld\n", statistics->lowest_time_dispatched);
2845  ast_cli(a->fd, "Highest amount of time (in milliseconds) spent dispatching messages: %ld\n", statistics->highest_time_dispatched);
2846  ast_cli(a->fd, "Number of subscribers: %d\n", ao2_container_count(statistics->subscribers));
2847 
2848  ast_cli(a->fd, "Subscribers:\n");
2849  i = ao2_iterator_init(statistics->subscribers, 0);
2850  while ((uniqueid = ao2_iterator_next(&i))) {
2851  ast_cli(a->fd, "\t%s\n", uniqueid);
2852  ao2_ref(uniqueid, -1);
2853  }
2855 
2856  ao2_ref(statistics, -1);
2857 
2858  return CLI_SUCCESS;
2859 }
2860 
2861 /*!
2862  * \internal
2863  * \brief CLI command implementation for 'stasis statistics show messages'
2864  */
2865 static char *statistics_show_messages(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2866 {
2867  int i;
2868  int count = 0;
2869  int published = 0;
2870  int unused = 0;
2871 #define FMT_HEADERS "%-64s %10s %10s\n"
2872 #define FMT_FIELDS "%-64s %10d %10d\n"
2873 
2874  switch (cmd) {
2875  case CLI_INIT:
2876  e->command = "stasis statistics show messages";
2877  e->usage =
2878  "Usage: stasis statistics show messages\n"
2879  " Shows a list of message types and their general statistics\n";
2880  return NULL;
2881  case CLI_GENERATE:
2882  return NULL;
2883  }
2884 
2885  if (a->argc != e->args) {
2886  return CLI_SHOWUSAGE;
2887  }
2888 
2889  ast_cli(a->fd, "\n" FMT_HEADERS, "Message Type", "Published", "Unused");
2890 
2891  ast_mutex_lock(&message_type_statistics_lock);
2892  for (i = 0; i < AST_VECTOR_SIZE(&message_type_statistics); ++i) {
2893  struct stasis_message_type_statistics *statistics = AST_VECTOR_GET_ADDR(&message_type_statistics, i);
2894 
2895  if (!statistics->message_type) {
2896  continue;
2897  }
2898 
2899  ast_cli(a->fd, FMT_FIELDS, stasis_message_type_name(statistics->message_type), statistics->published,
2900  statistics->unused);
2901  published += statistics->published;
2902  unused += statistics->unused;
2903  ++count;
2904  }
2905  ast_mutex_unlock(&message_type_statistics_lock);
2906 
2907  ast_cli(a->fd, FMT_FIELDS, "Total", published, unused);
2908  ast_cli(a->fd, "\n%d seen message types\n\n", count);
2909 
2910 #undef FMT_HEADERS
2911 #undef FMT_FIELDS
2912 
2913  return CLI_SUCCESS;
2914 }
2915 
/*! \brief Statistics CLI commands; registered by stasis_init() and unregistered by stasis_cleanup() only when AST_DEVMODE is defined */
2916 static struct ast_cli_entry cli_stasis_statistics[] = {
2917  AST_CLI_DEFINE(statistics_show_subscriptions, "Show subscriptions with general statistics"),
2918  AST_CLI_DEFINE(statistics_show_subscription, "Show subscription statistics"),
2919  AST_CLI_DEFINE(statistics_show_topics, "Show topics with general statistics"),
2920  AST_CLI_DEFINE(statistics_show_topic, "Show topic statistics"),
2921  AST_CLI_DEFINE(statistics_show_messages, "Show message types with general statistics"),
2922 };
2923 
2924 static int subscription_statistics_hash(const void *obj, const int flags)
2925 {
2926  const struct stasis_subscription_statistics *object;
2927  const char *key;
2928 
2929  switch (flags & OBJ_SEARCH_MASK) {
2930  case OBJ_SEARCH_KEY:
2931  key = obj;
2932  break;
2933  case OBJ_SEARCH_OBJECT:
2934  object = obj;
2935  key = object->uniqueid;
2936  break;
2937  default:
2938  /* Hash can only work on something with a full key. */
2939  ast_assert(0);
2940  return 0;
2941  }
2942  return ast_str_case_hash(key);
2943 }
2944 
2945 static int subscription_statistics_cmp(void *obj, void *arg, int flags)
2946 {
2947  const struct stasis_subscription_statistics *object_left = obj;
2948  const struct stasis_subscription_statistics *object_right = arg;
2949  const char *right_key = arg;
2950  int cmp;
2951 
2952  switch (flags & OBJ_SEARCH_MASK) {
2953  case OBJ_SEARCH_OBJECT:
2954  right_key = object_right->uniqueid;
2955  /* Fall through */
2956  case OBJ_SEARCH_KEY:
2957  cmp = strcasecmp(object_left->uniqueid, right_key);
2958  break;
2960  /* Not supported by container */
2961  ast_assert(0);
2962  cmp = -1;
2963  break;
2964  default:
2965  /*
2966  * What arg points to is specific to this traversal callback
2967  * and has no special meaning to astobj2.
2968  */
2969  cmp = 0;
2970  break;
2971  }
2972  if (cmp) {
2973  return 0;
2974  }
2975  /*
2976  * At this point the traversal callback is identical to a sorted
2977  * container.
2978  */
2979  return CMP_MATCH;
2980 }
2981 
2982 static int topic_statistics_hash(const void *obj, const int flags)
2983 {
2984  const struct stasis_topic_statistics *object;
2985  const char *key;
2986 
2987  switch (flags & OBJ_SEARCH_MASK) {
2988  case OBJ_SEARCH_KEY:
2989  key = obj;
2990  break;
2991  case OBJ_SEARCH_OBJECT:
2992  object = obj;
2993  key = object->name;
2994  break;
2995  default:
2996  /* Hash can only work on something with a full key. */
2997  ast_assert(0);
2998  return 0;
2999  }
3000  return ast_str_case_hash(key);
3001 }
3002 
3003 static int topic_statistics_cmp(void *obj, void *arg, int flags)
3004 {
3005  const struct stasis_topic_statistics *object_left = obj;
3006  const struct stasis_topic_statistics *object_right = arg;
3007  const char *right_key = arg;
3008  int cmp;
3009 
3010  switch (flags & OBJ_SEARCH_MASK) {
3011  case OBJ_SEARCH_OBJECT:
3012  right_key = object_right->name;
3013  /* Fall through */
3014  case OBJ_SEARCH_KEY:
3015  cmp = strcasecmp(object_left->name, right_key);
3016  break;
3018  /* Not supported by container */
3019  ast_assert(0);
3020  cmp = -1;
3021  break;
3022  default:
3023  /*
3024  * What arg points to is specific to this traversal callback
3025  * and has no special meaning to astobj2.
3026  */
3027  cmp = 0;
3028  break;
3029  }
3030  if (cmp) {
3031  return 0;
3032  }
3033  /*
3034  * At this point the traversal callback is identical to a sorted
3035  * container.
3036  */
3037  return CMP_MATCH;
3038 }
3039 #endif
3040 
3041 /*! \brief Cleanup function for graceful shutdowns */
/*
 * Releases what stasis_init() set up: the CLI commands, the topic_all
 * container, the threadpool and the configuration info. When AST_DEVMODE
 * is defined it also releases the statistics CLI commands, the message
 * type statistics vector and both statistics containers.
 *
 * NOTE(review): listing lines 3055, 3056 and 3058 are absent from this
 * listing, so further cleanup steps may be missing here. Confirm against
 * the upstream file.
 */
3042 static void stasis_cleanup(void)
3043 {
3044 #ifdef AST_DEVMODE
3045  ast_cli_unregister_multiple(cli_stasis_statistics, ARRAY_LEN(cli_stasis_statistics));
3046  AST_VECTOR_FREE(&message_type_statistics);
3047  ao2_global_obj_release(subscription_statistics);
3048  ao2_global_obj_release(topic_statistics);
3049 #endif
3050  ast_cli_unregister_multiple(cli_stasis, ARRAY_LEN(cli_stasis));
3051  ao2_cleanup(topic_all);
/* Pointers are reset after release so stale values are not reused. */
3052  topic_all = NULL;
3053  ast_threadpool_shutdown(threadpool);
3054  threadpool = NULL;
3057  aco_info_destroy(&cfg_info);
3059 }
3060 
/*!
 * \brief Initialise the Stasis message bus.
 *
 * Registers and loads the stasis.conf options (falling back to built-in
 * defaults if the file cannot be processed), creates the "stasis"
 * threadpool from the threadpool options, initialises the cache, and
 * registers the CLI commands. When AST_DEVMODE is defined it also creates
 * the subscription and topic statistics containers, initialises the
 * message type statistics vector and registers the statistics CLI commands.
 *
 * \retval 0 on success.
 * \retval -1 on any failure.
 *
 * NOTE(review): listing lines 3072, 3144, 3147 and 3151 are absent from
 * this listing (see the notes at each gap). Confirm against the upstream
 * file.
 */
3061 int stasis_init(void)
3062 {
3063  struct stasis_config *cfg;
3064  int cache_init;
3065  struct ast_threadpool_options threadpool_opts = { 0, };
3066 #ifdef AST_DEVMODE
3067  struct ao2_container *subscription_stats;
3068  struct ao2_container *topic_stats;
3069 #endif
3070 
3071  /* Be sure the types are cleaned up after the message bus */
/* NOTE(review): the statement this comment introduces (listing line 3072) is absent here. */
3073 
3074  if (aco_info_init(&cfg_info)) {
3075  return -1;
3076  }
3077 
/* "decline" uses a custom handler; the threadpool options are integers in the range 0..INT_MAX. */
3078  aco_option_register_custom(&cfg_info, "decline", ACO_EXACT,
3079  declined_options, "", declined_handler, 0);
3080  aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
3081  threadpool_options, "5", OPT_INT_T, PARSE_IN_RANGE,
3082  FLDSET(struct stasis_threadpool_conf, initial_size), 0,
3083  INT_MAX);
3084  aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
3085  threadpool_options, "20", OPT_INT_T, PARSE_IN_RANGE,
3086  FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
3087  INT_MAX);
3088  aco_option_register(&cfg_info, "max_size", ACO_EXACT,
3089  threadpool_options, "50", OPT_INT_T, PARSE_IN_RANGE,
3090  FLDSET(struct stasis_threadpool_conf, max_size), 0,
3091  INT_MAX);
3092 
/* If the configuration cannot be processed, build a config object holding only the option defaults. */
3093  if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
3094  struct stasis_config *default_cfg = stasis_config_alloc();
3095 
3096  if (!default_cfg) {
3097  return -1;
3098  }
3099 
3100  if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) {
3101  ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
3102  ao2_ref(default_cfg, -1);
3103 
3104  return -1;
3105  }
3106 
3107  if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) {
3108  ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
3109  ao2_ref(default_cfg, -1);
3110 
3111  return -1;
3112  }
3113 
3114  ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n");
3115  ao2_global_obj_replace_unref(globals, default_cfg);
/* cfg keeps the allocation reference; it is dropped after the threadpool is created. */
3116  cfg = default_cfg;
3117  } else {
3118  cfg = ao2_global_obj_ref(globals);
3119  if (!cfg) {
3120  ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
3121 
3122  return -1;
3123  }
3124  }
3125 
3126  threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION;
3127  threadpool_opts.initial_size = cfg->threadpool_options->initial_size;
3128  threadpool_opts.auto_increment = 1;
3129  threadpool_opts.max_size = cfg->threadpool_options->max_size;
3130  threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
3131  threadpool = ast_threadpool_create("stasis", NULL, &threadpool_opts);
3132  ao2_ref(cfg, -1);
3133  if (!threadpool) {
/* NOTE(review): the message says 'stasis-core' but the pool above is created with the name "stasis". */
3134  ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
3135 
3136  return -1;
3137  }
3138 
3139  cache_init = stasis_cache_init();
3140  if (cache_init != 0) {
3141  return -1;
3142  }
3143 
/* NOTE(review): the conditions opening the next two blocks (listing lines 3144 and 3147) are absent here. */
3145  return -1;
3146  }
3148  return -1;
3149  }
3150 
/* NOTE(review): the start of the topic_all allocation (listing line 3151) is absent here; only its trailing arguments remain. */
3152  topic_proxy_hash_fn, 0, topic_proxy_cmp_fn);
3153  if (!topic_all) {
3154  return -1;
3155  }
3156 
3157  if (ast_cli_register_multiple(cli_stasis, ARRAY_LEN(cli_stasis))) {
3158  return -1;
3159  }
3160 
3161 #ifdef AST_DEVMODE
3162  /* Statistics information is stored separately so that we don't alter or interrupt the lifetime of the underlying
3163  * topic or subscription.
3164  */
3165  subscription_stats = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, SUBSCRIPTION_STATISTICS_BUCKETS,
3166  subscription_statistics_hash, 0, subscription_statistics_cmp);
3167  if (!subscription_stats) {
3168  return -1;
3169  }
3170  ao2_global_obj_replace_unref(subscription_statistics, subscription_stats);
/* Drop the local reference once the container has been handed to the global holder. */
3171  ao2_cleanup(subscription_stats);
3172 
3173  topic_stats = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, TOPIC_STATISTICS_BUCKETS,
3174  topic_statistics_hash, 0, topic_statistics_cmp);
3175  if (!topic_stats) {
3176  return -1;
3177  }
3178  ao2_global_obj_replace_unref(topic_statistics, topic_stats);
3179  ao2_cleanup(topic_stats);
/* NOTE(review): this check repeats the one at listing line 3175; topic_stats cannot be NULL here. */
3180  if (!topic_stats) {
3181  return -1;
3182  }
3183 
3184  AST_VECTOR_INIT(&message_type_statistics, 0);
3185 
3186  if (ast_cli_register_multiple(cli_stasis_statistics, ARRAY_LEN(cli_stasis_statistics))) {
3187  return -1;
3188  }
3189 #endif
3190 
3191  return 0;
3192 }
static struct stasis_subscription_change * subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
Definition: stasis.c:1627
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition: vector.h:174
static struct aco_type threadpool_option
Definition: stasis.c:2202
static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition: stasis.c:1231
static const char type[]
Definition: chan_ooh323.c:109
Struct containing info for an AMI event to send out.
Definition: manager.h:491
static char * stasis_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
Definition: stasis.c:2417
int auto_increment
Number of threads to increment pool by.
Definition: threadpool.h:90
static void statistics(void)
Definition: utils/frame.c:287
Main Channel structure associated with a channel.
#define AST_CLI_DEFINE(fn, txt,...)
Definition: cli.h:197
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.
#define FMT_FIELDS
struct ast_json * ast_json_ref(struct ast_json *value)
Increase refcount on value.
Definition: json.c:67
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format,...)
Build a taskprocessor name with a sequence number on the end.
static AO2_GLOBAL_OBJ_STATIC(globals)
A global object container that will contain the stasis_config that gets swapped out on reloads...
Internal Stasis APIs.
Asterisk main include file. File version handling, generic pbx functions.
#define AST_VECTOR_REMOVE_ELEM_UNORDERED(vec, elem, cleanup)
Remove an element from a vector.
Definition: vector.h:585
int stasis_message_type_id(const struct stasis_message_type *type)
Gets the id of a given message type.
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
#define ARRAY_LEN(a)
Definition: isdn_lib.c:42
struct ast_multi_object_blob * ast_multi_object_blob_create(struct ast_json *blob)
Create a stasis user event multi object blob.
Definition: stasis.c:1975
int idle_timeout
Time limit in seconds for idle threads.
Definition: threadpool.h:79
struct stasis_topic * topic
Definition: stasis.c:685
static int message_type_id
#define STASIS_UMOS_MAX
Number of snapshot types.
Definition: stasis.h:1364
#define TOPIC_POOL_BUCKETS
Definition: stasis.c:305
#define ao2_t_alloc(data_size, destructor_fn, debug_msg)
Definition: astobj2.h:409
#define aco_option_register_custom(info, name, matchtype, types, default_val, handler, flags)
Register a config option.
static void topic_dtor(void *obj)
Definition: stasis.c:434
size_t stasis_topic_subscribers(const struct stasis_topic *topic)
Return the number of subscribers of a topic.
Definition: stasis.c:644
int ast_cli_unregister_multiple(struct ast_cli_entry *e, int len)
Unregister multiple commands.
Definition: clicompat.c:30
Definition: stasis.c:1709
static void topic_pool_dtor(void *obj)
Definition: stasis.c:1744
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1105
int initial_size
Number of threads the pool will start with.
Definition: threadpool.h:100
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition: json.c:73
static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
Definition: stasis.c:1784
static void stasis_cleanup(void)
Cleanup function for graceful shutdowns.
Definition: stasis.c:3042
char buf[BUFSIZE]
Definition: eagi_proxy.c:66
STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type)
int max_size
Maximum number of threads a pool may have.
Definition: threadpool.h:110
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
Definition: stasis.h:1501
struct ast_taskprocessor * ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
Serialized execution of tasks within a ast_threadpool.
Definition: threadpool.c:1432
const char * stasis_subscription_uniqueid(const struct stasis_subscription *sub)
Get the unique ID for the subscription.
Definition: stasis.c:1171
Stasis Message Bus API. See Stasis Message Bus API for detailed documentation.
#define AST_THREADPOOL_OPTIONS_VERSION
Definition: threadpool.h:71
descriptor for a cli entry.
Definition: cli.h:171
const int argc
Definition: cli.h:160
char * ast_str_buffer(const struct ast_str *buf)
Returns the string buffer within the ast_str buf.
Definition: strings.h:714
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary...
#define aco_option_register(info, name, matchtype, types, default_val, opt_type, flags,...)
Register a config option.
ast_cond_t join_cond
Definition: stasis.c:694
int stasis_subscription_decline_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are not interested in a message type.
Definition: stasis.c:1055
#define ao2_container_alloc_list(ao2_options, container_options, sort_fn, cmp_fn)
Definition: astobj2.h:1335
void stasis_log_bad_type_access(const char *name)
Definition: stasis.c:1940
Threadpool configuration options.
Definition: stasis.c:2186
int stasis_subscription_is_done(struct stasis_subscription *subscription)
Returns whether subscription has received its final message.
Definition: stasis.c:1120
Structure for variables, used for configurations and for channel variables.
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256
#define var
Definition: ast_expr2f.c:614
Structure representing a snapshot of channel state.
Universally unique identifier support.
return a reference to a taskprocessor, create one if it does not exist
Definition: taskprocessor.h:75
#define ast_str_container_alloc(buckets)
Allocates a hash container for bare strings.
Definition: strings.h:1312
struct ast_str * ast_manager_build_channel_state_string_prefix(const struct ast_channel_snapshot *snapshot, const char *prefix)
Generate the AMI message body from a channel snapshot.
static int sub_cleanup(void *data)
Definition: stasis.c:966
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
Definition: stasis.h:1523
static struct topic_pool_entry * topic_pool_entry_alloc(const char *topic_name)
Definition: stasis.c:1724
Definition: cli.h:152
Assume that the ao2_container is already locked.
Definition: astobj2.h:1067
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
struct stasis_topic * stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
Find or create a topic in the pool.
Definition: stasis.c:1884
static void topic_pool_entry_dtor(void *obj)
Definition: stasis.c:1715
AO2_STRING_FIELD_CASE_SORT_FN(topic_proxy, name)
void ast_str_container_remove(struct ao2_container *str_container, const char *remove)
Removes a string from a string container allocated by ast_str_container_alloc.
Definition: strings.c:222
#define ast_cond_wait(cond, mutex)
Definition: lock.h:203
struct aco_type * declined_options[]
Definition: stasis.c:2221
int ast_str_append(struct ast_str **buf, ssize_t max_len, const char *fmt,...)
Append to a thread local dynamic string.
Definition: strings.h:1091
#define ast_cond_init(cond, attr)
Definition: lock.h:199
#define ast_cli_register_multiple(e, len)
Register multiple commands.
Definition: cli.h:265
enum aco_process_status aco_process_config(struct aco_info *info, int reload)
Process a config info via the options registered with an aco_info.
#define ao2_global_obj_ref(holder)
Definition: astobj2.h:925
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
static void stasis_declined_config_destructor(void *obj)
Definition: stasis.c:2238
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:150
CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,.files=ACO_FILES(&stasis_conf),)
Register information about the configs being processed by this module.
static void forward_dtor(void *obj)
Definition: stasis.c:1538
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:406
#define ast_assert(a)
Definition: utils.h:710
#define ao2_link_flags(container, obj, flags)
Definition: astobj2.h:1572
#define ast_mutex_lock(a)
Definition: lock.h:187
char * detail
Definition: stasis.c:402
int ao2_weakproxy_subscribe(void *weakproxy, ao2_weakproxy_notification_cb cb, void *data, int flags)
Request notification when weakproxy points to NULL.
Definition: astobj2.c:931
#define ao2_unlock(a)
Definition: astobj2.h:730
int64_t ast_tvdiff_ms(struct timeval end, struct timeval start)
Computes the difference (in milliseconds) between two struct timeval instances.
Definition: time.h:98
#define ast_strdup(str)
A wrapper for strdup()
Definition: astmm.h:243
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.
#define NULL
Definition: resample.c:96
static struct ast_manager_event_blob * multi_user_event_to_ami(struct stasis_message *message)
Definition: stasis.c:2155
#define ao2_wrlock(a)
Definition: astobj2.h:720
The representation of a single configuration file to be processed.
void ast_cli(int fd, const char *fmt,...)
Definition: clicompat.c:6
#define AST_TASKPROCESSOR_MAX_NAME
Suggested maximum taskprocessor name length (less null terminator).
Definition: taskprocessor.h:60
AO2_STRING_FIELD_CMP_FN(topic_proxy, name)
struct aco_file stasis_conf
Definition: stasis.c:2223
enum aco_type_t type
static void subscription_change_dtor(void *obj)
Definition: stasis.c:1620
#define ast_cond_signal(cond)
Definition: lock.h:201
#define ACO_TYPES(...)
A helper macro to ensure that aco_info types always have a sentinel.
struct ao2_container * pool_container
Definition: stasis.c:1740
int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name)
Check if a topic exists in a pool.
Definition: stasis.c:1927
#define topic_lock_both(topic1, topic2)
Lock two topics.
Definition: stasis.c:426
struct ast_manager_event_blob * ast_manager_event_blob_create(int event_flags, const char *manager_event, const char *extra_fields_fmt,...)
Construct a ast_manager_event_blob.
Definition: manager.c:9727
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition: lock.h:755
Structure containing callbacks for Stasis message sanitization.
Definition: stasis.h:200
struct ast_channel_snapshot * ast_channel_snapshot_create(struct ast_channel *chan)
Generate a snapshot of the channel state. This is an ao2 object, so ao2_cleanup() to deallocate...
struct stasis_topic * from_topic
Definition: stasis.c:1533
int ast_json_object_set(struct ast_json *object, const char *key, struct ast_json *value)
Set a field in a JSON object.
Definition: json.c:404
Utility functions.
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:269
struct ast_str * ast_manager_build_bridge_state_string_prefix(const struct ast_bridge_snapshot *snapshot, const char *prefix)
Generate the AMI message body from a bridge snapshot.
int final_message_processed
Definition: stasis.c:700
int args
This gets set in ast_cli_register()
Definition: cli.h:185
void() ao2_prnt_fn(void *where, const char *fmt,...)
Print output.
Definition: astobj2.h:1442
pthread_cond_t ast_cond_t
Definition: lock.h:176
char * name
Definition: stasis.c:401
struct ast_str * ast_manager_str_from_json_object(struct ast_json *blob, key_exclusion_cb exclusion_cb)
Convert a JSON object into an AMI compatible string.
Definition: manager.c:1821
static struct ast_json * multi_user_event_to_json(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
Definition: stasis.c:2045
#define ao2_bump(obj)
Definition: astobj2.h:491
ast_cond_t cond
Definition: stasis.c:1278
static int topic_pool_entry_hash(const void *obj, const int flags)
Definition: stasis.c:1763
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
Definition: astobj2.c:476
struct stasis_subscription * __stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription.
Definition: stasis.c:944
struct ao2_container * declined
Definition: stasis.c:2182
int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription, long low_water, long high_water)
Set the high and low alert water marks of the stasis subscription.
Definition: stasis.c:1013
void ao2_container_unregister(const char *name)
Unregister a container for CLI stats and integrity check.
int ao2_container_register(const char *name, struct ao2_container *self, ao2_prnt_obj_fn *prnt_obj)
Register a container for CLI stats and integrity check.
#define ast_debug(level,...)
Log a DEBUG message.
Definition: logger.h:444
#define ast_log
Definition: astobj2.c:42
struct stasis_declined_config * declined_message_types
Definition: stasis.c:2199
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
Definition: astobj2.h:1120
static void publish_msg(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sync_sub)
Definition: stasis.c:1432
#define FLDSET(type,...)
Convert a struct and list of fields to an argument list of field offsets.
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
Definition: stasis.c:1025
char buf[0]
Definition: stasis.c:406
int aco_info_init(struct aco_info *info)
Initialize an aco_info structure.
A multi object blob data structure to carry user event stasis messages.
Definition: stasis.c:1950
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
Definition: stasis.c:1176
#define AST_VECTOR_GET_ADDR(vec, idx)
Get an address of element in a vector.
Definition: vector.h:670
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
Definition: clicompat.c:19
static char * topic_complete_name(const char *word)
Definition: stasis.c:2391
ast_mutex_t lock
Definition: stasis.c:1277
#define AST_VECTOR_ELEM_CLEANUP_NOOP(elem)
Vector element cleanup that does nothing.
Definition: vector.h:573
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:911
struct ast_json * ast_json_string_create(const char *value)
Construct a JSON string from value.
Definition: json.c:268
const int fd
Definition: cli.h:159
int stasis_message_type_declined(const char *name)
Check whether a message type is declined.
Definition: stasis.c:2283
const int n
Definition: cli.h:165
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition: stasis.c:618
const struct timeval * stasis_message_timestamp(const struct stasis_message *msg)
Get the time when a message was created.
static int link_topic_proxy(struct stasis_topic *topic, const char *name, const char *detail)
Definition: stasis.c:501
static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition: stasis.c:1648
static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition: stasis.c:1676
#define ao2_ref(o, delta)
Definition: astobj2.h:464
#define ao2_lock(a)
Definition: astobj2.h:718
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:628
static char * stasis_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
Definition: stasis.c:2334
const char * ast_json_string_get(const struct ast_json *string)
Get the value of a JSON string.
Definition: json.c:273
stasis_user_multi_object_snapshot_type
Object type code for multi user object snapshots.
Definition: stasis.h:1357
struct stasis_topic * to_topic
Definition: stasis.c:1535
struct ao2_container * container
Definition: res_fax.c:502
static struct console_pvt globals
#define AST_VECTOR(name, type)
Define a vector structure.
Definition: vector.h:44
int final_message_rxed
Definition: stasis.c:697
struct stasis_topic * topic
Definition: stasis.c:1711
static int userevent_exclusion_cb(const char *key)
Definition: stasis.c:2147
struct ast_json * ast_json_timeval(const struct timeval tv, const char *zone)
Construct a timeval as JSON.
Definition: json.c:649
void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
Publish a message to a topic's subscribers, synchronizing on the specified subscriber.
Definition: stasis.c:1516
There was an error and no changes were applied.
struct stasis_topic * topic
Definition: stasis.h:893
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
Definition: astmm.h:290
static unsigned int dispatch_message(struct stasis_subscription *sub, struct stasis_message *message, int synchronous)
Definition: stasis.c:1314
const char *const * argv
Definition: cli.h:161
struct stasis_topic * ast_channel_topic(struct ast_channel *chan)
A topic which publishes the events for a particular channel.
struct timeval creationtime
Definition: stasis.c:404
Configuration option-handling.
static struct ast_cli_entry cli_stasis[]
Definition: stasis.c:2476
#define LOG_ERROR
Definition: logger.h:285
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Definition: astobj2.h:1310
The descriptor of a dynamic string XXX storage will be optimized later if needed We use the ts field ...
Definition: strings.h:584
struct ast_taskprocessor * mailbox
Definition: stasis.c:687
static void * stasis_config_alloc(void)
Definition: stasis.c:2253
#define ao2_unlink(container, obj)
Definition: astobj2.h:1598
int ao2_container_dup(struct ao2_container *dest, struct ao2_container *src, enum search_flags flags)
Copy all object references in the src container into the dest container.
#define CLI_SHOWUSAGE
Definition: cli.h:45
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
void aco_info_destroy(struct aco_info *info)
Destroy an initialized aco_info struct.
AO2_STRING_FIELD_HASH_FN(topic_proxy, name)
#define ao2_t_weakproxy_set_object(weakproxy, obj, flags, tag)
Definition: astobj2.h:586
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
#define ao2_global_obj_release(holder)
Definition: astobj2.h:865
#define EVENT_FLAG_USER
Definition: manager.h:77
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int(*task_exe)(struct ast_taskprocessor_local *local), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
Local data parameter.
struct ao2_container * topic_all
Definition: stasis.c:396
#define ao2_iterator_next(iter)
Definition: astobj2.h:1933
#define ast_cond_destroy(cond)
Definition: lock.h:200
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:411
void(* stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Callback function type for Stasis subscriptions.
Definition: stasis.h:615
#define LOG_NOTICE
Definition: logger.h:263
struct stasis_subscription * stasis_unsubscribe_and_join(struct stasis_subscription *subscription)
Cancel a subscription, blocking until the last message is processed.
Definition: stasis.c:1136
static struct ast_str * multi_object_blob_to_ami(void *obj)
Definition: stasis.c:2096
#define ast_strlen_zero(a)
Definition: muted.c:73
#define ao2_weakproxy_find(c, arg, flags, tag)
Perform an ao2_find on a container with ao2_weakproxy objects, returning the real object...
Definition: astobj2.h:1768
struct ast_json * ast_channel_snapshot_to_json(const struct ast_channel_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_channel_snapshot.
#define CLI_FAILURE
Definition: cli.h:46
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *sub)
Cancel a subscription.
Definition: stasis.c:973
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:602
struct stasis_topic * stasis_topic_create_with_detail(const char *name, const char *detail)
Create a new topic with given detail.
Definition: stasis.c:569
static const char name[]
Definition: cdr_mysql.c:74
#define ast_free(a)
Definition: astmm.h:182
char * command
Definition: cli.h:186
#define ast_calloc(num, len)
A wrapper for calloc()
Definition: astmm.h:204
static void multi_object_blob_dtor(void *obj)
Definition: stasis.c:1959
struct ast_threadpool * ast_threadpool_create(const char *name, struct ast_threadpool_listener *listener, const struct ast_threadpool_options *options)
Create a new threadpool.
Definition: threadpool.c:915
int aco_set_defaults(struct aco_type *type, const char *category, void *obj)
Set all default options of obj.
static void to_ami(struct ast_sip_subscription *sub, struct ast_str **buf)
void stasis_topic_pool_delete_topic(struct stasis_topic_pool *pool, const char *topic_name)
Delete a topic from the topic pool.
Definition: stasis.c:1864
const char * word
Definition: cli.h:163
struct stasis_subscription * __stasis_subscribe_pool(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription whose callbacks occur on a thread pool.
Definition: stasis.c:955
Holds details about changes to subscriptions for the specified topic.
Definition: stasis.h:892
static struct aco_type declined_option
An aco_type structure to link the "declined_message_types" category to the stasis_declined_config typ...
Definition: stasis.c:2213
Vector container support.
void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Stasis subscription callback function that does nothing.
Definition: stasis.c:811
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1756
An API for managing task processing threads that can be shared across modules.
void ast_threadpool_shutdown(struct ast_threadpool *pool)
Shut down a threadpool and destroy it.
Definition: threadpool.c:965
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Add a subscriber to a topic.
Definition: stasis.c:1203
static void proxy_dtor(void *weakproxy, void *container)
Definition: stasis.c:413
int stasis_cache_init(void)
static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
Definition: stasis.c:1288
void ast_multi_object_blob_add(struct ast_multi_object_blob *multi, enum stasis_user_multi_object_snapshot_type type, void *object)
Add an object (snapshot) to the blob.
Definition: stasis.c:2001
const char * usage
Definition: cli.h:177
stasis_subscription_cb callback
Definition: stasis.c:689
struct stasis_message_type * ast_multi_user_event_type(void)
Message type for custom user defined events with multi object blobs.
void stasis_subscription_accept_formatters(struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters)
Indicate to a subscription that we are interested in messages with one or more formatters.
Definition: stasis.c:1095
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
Sets the local data associated with a taskprocessor.
#define CLI_SUCCESS
Definition: cli.h:44
struct ast_json * ast_json_object_create(void)
Create a new JSON object.
Definition: json.c:389
#define ao2_global_obj_replace_unref(holder, obj)
Definition: astobj2.h:908
void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob)
Publish single channel user event (for app_userevent compatibility)
Definition: stasis.c:2010
#define ACO_FILES(...)
enum stasis_subscription_message_formatters stasis_message_type_available_formatters(const struct stasis_message_type *message_type)
Get a bitmap of available formatters for a message type.
The arg parameter is an object of the same type.
Definition: astobj2.h:1091
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:682
char name[0]
Definition: stasis.c:1712
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
const char * stasis_topic_detail(const struct stasis_topic *topic)
Return the detail of a topic.
Definition: stasis.c:636
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
struct stasis_topic * pool_topic
Definition: stasis.c:1741
FILE * out
Definition: utils/frame.c:33
stasis_subscription_message_filter
Stasis subscription message filters.
Definition: stasis.h:297
struct stasis_subscription * internal_stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func)
Create a subscription.
Definition: stasis.c:858
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
Definition: astobj2.h:1841
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
Standard Command Line Interface.
struct ast_json * ast_json_object_get(struct ast_json *object, const char *key)
Get a field from a JSON object.
Definition: json.c:397
struct stasis_forward * forward
Definition: stasis.c:1710
void stasis_subscription_join(struct stasis_subscription *subscription)
Block until the last message is processed on a subscription.
Definition: stasis.c:1107
Type information about a category-level configurable object.
An opaque threadpool structure.
Definition: threadpool.c:36
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
Definition: strings.h:401
#define INITIAL_SUBSCRIBERS_MAX
Definition: stasis.c:302
#define S_OR(a, b)
returns the equivalent of logic or for strings: first one if not empty, otherwise second one...
Definition: strings.h:79
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
struct ast_json * ast_bridge_snapshot_to_json(const struct ast_bridge_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_bridge_snapshot.
const int pos
Definition: cli.h:164
const char * filename
A structure to hold global configuration-related options.
Definition: stasis.c:2180
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
Definition: stasis.c:1079
static PGresult * result
Definition: cel_pgsql.c:88
#define AST_VECTOR_REPLACE(vec, idx, elem)
Replace an element at a specific position in a vector, growing the vector if needed.
Definition: vector.h:284
#define AO2_WEAKPROXY()
Macro which must be used at the beginning of weakproxy capable objects.
Definition: astobj2.h:539
static void subscription_invoke(struct stasis_subscription *sub, struct stasis_message *message)
Invoke the subscription's callback.
Definition: stasis.c:756
struct stasis_forward * sub
Definition: res_corosync.c:240
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
Definition: stasis.c:1548
struct stasis_threadpool_conf * threadpool_options
Definition: stasis.c:2197
#define FMT_HEADERS
Abstract JSON element (object, array, string, int, ...).
struct ast_json * blob
Definition: stasis.c:1951
#define ao2_t_weakproxy_alloc(data_size, destructor_fn, tag)
Definition: astobj2.h:557
Definition: search.h:40
Forwarding information.
Definition: stasis.c:1531
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
#define ao2_container_alloc_rbtree(ao2_options, container_options, sort_fn, cmp_fn)
Definition: astobj2.h:1358
void * task_data
Definition: stasis.c:1280
struct stasis_topic * stasis_topic_get(const char *name)
Get a topic of the given name.
Definition: stasis.c:623
#define ast_mutex_init(pmutex)
Definition: lock.h:184
Generic container type.
Search option field mask.
Definition: astobj2.h:1076
void ast_format_duration_hh_mm_ss(int duration, char *buf, size_t length)
Formats a duration into HH:MM:SS.
Definition: main/utils.c:2049
static struct aco_type * threadpool_options[]
Definition: stasis.c:2210
#define ast_mutex_destroy(a)
Definition: lock.h:186
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
Definition: func_strings.c:709
int ast_cli_completion_add(char *value)
Add a result to a request for completion options.
Definition: main/cli.c:2726
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
Definition: stasis.c:1511
static void subscription_dtor(void *obj)
Definition: stasis.c:715
Type for default option handler for signed integers.
static struct ast_threadpool * threadpool
Definition: stasis.c:308
#define AO2_STRING_FIELD_SORT_FN(stype, field)
Creates a sort function for a structure string field.
Definition: astobj2.h:2087
static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
Definition: stasis.c:2304
stasis_subscription_message_formatters
Stasis subscription formatter filters.
Definition: stasis.h:311
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition: stasis.c:1578
Endpoint abstractions.
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
static void stasis_config_destructor(void *obj)
Definition: stasis.c:2245
struct ast_json * ast_endpoint_snapshot_to_json(const struct ast_endpoint_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_endpoint_snapshot.
struct stasis_topic_pool * stasis_topic_pool_create(struct stasis_topic *pooled_topic)
Create a topic pool that routes messages from dynamically generated topics to the given topic...
Definition: stasis.c:1833
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611
int ast_str_container_add(struct ao2_container *str_container, const char *add)
Adds a string to a string container allocated by ast_str_container_alloc.
Definition: strings.c:206
#define AST_MUTEX_DEFINE_STATIC(mutex)
Definition: lock.h:518
Structure for mutex and tracking information.
Definition: lock.h:135
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
Returns whether a subscription is currently subscribed.
Definition: stasis.c:1152
static force_inline int attribute_pure ast_str_case_hash(const char *str)
Compute a hash value on a case-insensitive string.
Definition: strings.h:1250
#define ast_str_create(init_len)
Create a malloc'ed dynamic length string.
Definition: strings.h:620
#define TOPIC_ALL_BUCKETS
Definition: stasis.c:318
short word
#define ast_mutex_unlock(a)
Definition: lock.h:188
static int dispatch_exec_async(struct ast_taskprocessor_local *local)
Definition: stasis.c:1261
static struct test_val a
#define ao2_link(container, obj)
Definition: astobj2.h:1549
int stasis_init(void)
Initialize the Stasis subsystem.
Definition: stasis.c:3061