Asterisk - The Open Source Telephony Project  GIT-master-932eae6
stasis.c
Go to the documentation of this file.
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 
19 /*! \file
20  *
21  * \brief Stasis Message Bus API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25 
26 /*** MODULEINFO
27  <support_level>core</support_level>
28  ***/
29 
30 #include "asterisk.h"
31 
32 #include "asterisk/astobj2.h"
34 #include "asterisk/stasis.h"
35 #include "asterisk/taskprocessor.h"
36 #include "asterisk/threadpool.h"
37 #include "asterisk/utils.h"
38 #include "asterisk/uuid.h"
39 #include "asterisk/vector.h"
44 #include "asterisk/cli.h"
45 
46 /*** DOCUMENTATION
47  <managerEvent language="en_US" name="UserEvent">
48  <managerEventInstance class="EVENT_FLAG_USER">
49  <synopsis>A user defined event raised from the dialplan.</synopsis>
50  <syntax>
51  <channel_snapshot/>
52  <parameter name="UserEvent">
53  <para>The event name, as specified in the dialplan.</para>
54  </parameter>
55  </syntax>
56  <description>
57  <para>Event may contain additional arbitrary parameters in addition to optional bridge and endpoint snapshots. Multiple snapshots of the same type are prefixed with a numeric value.</para>
58  </description>
59  <see-also>
60  <ref type="application">UserEvent</ref>
61  <ref type="managerEvent">UserEvent</ref>
62  </see-also>
63  </managerEventInstance>
64  </managerEvent>
65  <configInfo name="stasis" language="en_US">
66  <configFile name="stasis.conf">
67  <configObject name="threadpool">
68  <synopsis>Settings that configure the threadpool Stasis uses to deliver some messages.</synopsis>
69  <configOption name="initial_size" default="5">
70  <synopsis>Initial number of threads in the message bus threadpool.</synopsis>
71  </configOption>
72  <configOption name="idle_timeout_sec" default="20">
73  <synopsis>Number of seconds before an idle thread is disposed of.</synopsis>
74  </configOption>
75  <configOption name="max_size" default="50">
76  <synopsis>Maximum number of threads in the threadpool.</synopsis>
77  </configOption>
78  </configObject>
79  <configObject name="declined_message_types">
80  <synopsis>Stasis message types for which to decline creation.</synopsis>
81  <configOption name="decline">
82  <synopsis>The message type to decline.</synopsis>
83  <description>
84  <para>This configuration option defines the name of the Stasis
85  message type that Asterisk is forbidden from creating and can be
86  specified as many times as necessary to achieve the desired result.</para>
87  <enumlist>
88  <enum name="stasis_app_recording_snapshot_type" />
89  <enum name="stasis_app_playback_snapshot_type" />
90  <enum name="stasis_test_message_type" />
91  <enum name="confbridge_start_type" />
92  <enum name="confbridge_end_type" />
93  <enum name="confbridge_join_type" />
94  <enum name="confbridge_leave_type" />
95  <enum name="confbridge_start_record_type" />
96  <enum name="confbridge_stop_record_type" />
97  <enum name="confbridge_mute_type" />
98  <enum name="confbridge_unmute_type" />
99  <enum name="confbridge_talking_type" />
100  <enum name="cel_generic_type" />
101  <enum name="ast_bridge_snapshot_type" />
102  <enum name="ast_bridge_merge_message_type" />
103  <enum name="ast_channel_entered_bridge_type" />
104  <enum name="ast_channel_left_bridge_type" />
105  <enum name="ast_blind_transfer_type" />
106  <enum name="ast_attended_transfer_type" />
107  <enum name="ast_endpoint_snapshot_type" />
108  <enum name="ast_endpoint_state_type" />
109  <enum name="ast_device_state_message_type" />
110  <enum name="ast_test_suite_message_type" />
111  <enum name="ast_mwi_state_type" />
112  <enum name="ast_mwi_vm_app_type" />
113  <enum name="ast_format_register_type" />
114  <enum name="ast_format_unregister_type" />
115  <enum name="ast_manager_get_generic_type" />
116  <enum name="ast_parked_call_type" />
117  <enum name="ast_channel_snapshot_type" />
118  <enum name="ast_channel_dial_type" />
119  <enum name="ast_channel_varset_type" />
120  <enum name="ast_channel_hangup_request_type" />
121  <enum name="ast_channel_dtmf_begin_type" />
122  <enum name="ast_channel_dtmf_end_type" />
123  <enum name="ast_channel_hold_type" />
124  <enum name="ast_channel_unhold_type" />
125  <enum name="ast_channel_chanspy_start_type" />
126  <enum name="ast_channel_chanspy_stop_type" />
127  <enum name="ast_channel_fax_type" />
128  <enum name="ast_channel_hangup_handler_type" />
129  <enum name="ast_channel_moh_start_type" />
130  <enum name="ast_channel_moh_stop_type" />
131  <enum name="ast_channel_monitor_start_type" />
132  <enum name="ast_channel_monitor_stop_type" />
133  <enum name="ast_channel_mixmonitor_start_type" />
134  <enum name="ast_channel_mixmonitor_stop_type" />
135  <enum name="ast_channel_mixmonitor_mute_type" />
136  <enum name="ast_channel_agent_login_type" />
137  <enum name="ast_channel_agent_logoff_type" />
138  <enum name="ast_channel_talking_start" />
139  <enum name="ast_channel_talking_stop" />
140  <enum name="ast_security_event_type" />
141  <enum name="ast_named_acl_change_type" />
142  <enum name="ast_local_bridge_type" />
143  <enum name="ast_local_optimization_begin_type" />
144  <enum name="ast_local_optimization_end_type" />
145  <enum name="stasis_subscription_change_type" />
146  <enum name="ast_multi_user_event_type" />
147  <enum name="stasis_cache_clear_type" />
148  <enum name="stasis_cache_update_type" />
149  <enum name="ast_network_change_type" />
150  <enum name="ast_system_registry_type" />
151  <enum name="ast_cc_available_type" />
152  <enum name="ast_cc_offertimerstart_type" />
153  <enum name="ast_cc_requested_type" />
154  <enum name="ast_cc_requestacknowledged_type" />
155  <enum name="ast_cc_callerstopmonitoring_type" />
156  <enum name="ast_cc_callerstartmonitoring_type" />
157  <enum name="ast_cc_callerrecalling_type" />
158  <enum name="ast_cc_recallcomplete_type" />
159  <enum name="ast_cc_failure_type" />
160  <enum name="ast_cc_monitorfailed_type" />
161  <enum name="ast_presence_state_message_type" />
162  <enum name="ast_rtp_rtcp_sent_type" />
163  <enum name="ast_rtp_rtcp_received_type" />
164  <enum name="ast_call_pickup_type" />
165  <enum name="aoc_s_type" />
166  <enum name="aoc_d_type" />
167  <enum name="aoc_e_type" />
168  <enum name="dahdichannel_type" />
169  <enum name="mcid_type" />
170  <enum name="session_timeout_type" />
171  <enum name="cdr_read_message_type" />
172  <enum name="cdr_write_message_type" />
173  <enum name="cdr_prop_write_message_type" />
174  <enum name="corosync_ping_message_type" />
175  <enum name="agi_exec_start_type" />
176  <enum name="agi_exec_end_type" />
177  <enum name="agi_async_start_type" />
178  <enum name="agi_async_exec_type" />
179  <enum name="agi_async_end_type" />
180  <enum name="queue_caller_join_type" />
181  <enum name="queue_caller_leave_type" />
182  <enum name="queue_caller_abandon_type" />
183  <enum name="queue_member_status_type" />
184  <enum name="queue_member_added_type" />
185  <enum name="queue_member_removed_type" />
186  <enum name="queue_member_pause_type" />
187  <enum name="queue_member_penalty_type" />
188  <enum name="queue_member_ringinuse_type" />
189  <enum name="queue_agent_called_type" />
190  <enum name="queue_agent_connect_type" />
191  <enum name="queue_agent_complete_type" />
192  <enum name="queue_agent_dump_type" />
193  <enum name="queue_agent_ringnoanswer_type" />
194  <enum name="meetme_join_type" />
195  <enum name="meetme_leave_type" />
196  <enum name="meetme_end_type" />
197  <enum name="meetme_mute_type" />
198  <enum name="meetme_talking_type" />
199  <enum name="meetme_talk_request_type" />
200  <enum name="appcdr_message_type" />
201  <enum name="forkcdr_message_type" />
202  <enum name="cdr_sync_message_type" />
203  </enumlist>
204  </description>
205  </configOption>
206  </configObject>
207  </configFile>
208  </configInfo>
209 ***/
210 
211 /*!
212  * \page stasis-impl Stasis Implementation Notes
213  *
214  * \par Reference counting
215  *
216  * Stasis introduces a number of objects, which are tightly related to one
217  * another. Because we rely on ref-counting for memory management, understanding
218  * these relationships is important to understanding this code.
219  *
220  * \code{.txt}
221  *
222  * stasis_topic <----> stasis_subscription
223  * ^ ^
224  * \ /
225  * \ /
226  * dispatch
227  * |
228  * |
229  * v
230  * stasis_message
231  * |
232  * |
233  * v
234  * stasis_message_type
235  *
236  * \endcode
237  *
238  * The most troubling thing in this chart is the cyclic reference between
239  * stasis_topic and stasis_subscription. This is both unfortunate, and
240  * necessary. Topics need the subscription in order to dispatch messages;
241  * subscriptions need the topics to unsubscribe and check subscription status.
242  *
243  * The cycle is broken by stasis_unsubscribe(). The unsubscribe will remove the
244  * topic's reference to a subscription. When the subcription is destroyed, it
245  * will remove its reference to the topic.
246  *
247  * This means that until a subscription has be explicitly unsubscribed, it will
248  * not be destroyed. Neither will a topic be destroyed while it has subscribers.
249  * The destructors of both have assertions regarding this to catch ref-counting
250  * problems where a subscription or topic has had an extra ao2_cleanup().
251  *
252  * The \ref dispatch object is a transient object, which is posted to a
253  * subscription's taskprocessor to send a message to the subscriber. They have
254  * short life cycles, allocated on one thread, destroyed on another.
255  *
256  * During shutdown, or the deletion of a domain object, there are a flurry of
257  * ao2_cleanup()s on subscriptions and topics, as the final in-flight messages
258  * are processed. Any one of these cleanups could be the one to actually destroy
259  * a given object, so care must be taken to ensure that an object isn't
260  * referenced after an ao2_cleanup(). This includes the implicit ao2_unlock()
261  * that might happen when a RAII_VAR() goes out of scope.
262  *
263  * \par Typical life cycles
264  *
265  * \li stasis_topic - There are several topics which live for the duration of
266  * the Asterisk process (ast_channel_topic_all(), etc.) but most of these
267  * are actually fed by shorter-lived topics whose lifetime is associated
268  * with some domain object (like ast_channel_topic() for a given
269  * ast_channel).
270  *
271  * \li stasis_subscription - Subscriptions have a similar mix of lifetimes as
272  * topics, for similar reasons.
273  *
274  * \li dispatch - Very short lived; just long enough to post a message to a
275  * subscriber.
276  *
277  * \li stasis_message - Short to intermediate lifetimes, but that is mostly
278  * irrelevant. Messages are strictly data and have no behavior associated
279  * with them, so it doesn't really matter if/when they are destroyed. By
280  * design, a component could hold a ref to a message forever without any
281  * ill consequences (aside from consuming more memory).
282  *
283  * \li stasis_message_type - Long life cycles, typically only destroyed on
284  * module unloading or _clean_ process exit.
285  *
286  * \par Subscriber shutdown sequencing
287  *
288  * Subscribers are sensitive to shutdown sequencing, specifically in how the
289  * reference message types. This is fully detailed on the wiki at
290  * https://wiki.asterisk.org/wiki/x/K4BqAQ.
291  *
292  * In short, the lifetime of the \a data (and \a callback, if in a module) must
293  * be held until the stasis_subscription_final_message() has been received.
294  * Depending on the structure of the subscriber code, this can be handled by
295  * using stasis_subscription_final_message() to free resources on the final
296  * message, or using stasis_subscription_join()/stasis_unsubscribe_and_join() to
297  * block until the unsubscribe has completed.
298  */
299 
300 /*! Initial size of the subscribers list. */
301 #define INITIAL_SUBSCRIBERS_MAX 4
302 
303 /*! The number of buckets to use for topic pools */
304 #define TOPIC_POOL_BUCKETS 57
305 
306 /*! Thread pool for topics that don't want a dedicated taskprocessor */
307 static struct ast_threadpool *threadpool;
308 
310 
311 #if defined(LOW_MEMORY)
312 
313 #define TOPIC_ALL_BUCKETS 257
314 
315 #else
316 
317 #define TOPIC_ALL_BUCKETS 997
318 
319 #endif
320 
321 #ifdef AST_DEVMODE
322 
323 /*! The number of buckets to use for topic statistics */
324 #define TOPIC_STATISTICS_BUCKETS 57
325 
326 /*! The number of buckets to use for subscription statistics */
327 #define SUBSCRIPTION_STATISTICS_BUCKETS 57
328 
329 /*! Global container which stores statistics for topics */
330 static AO2_GLOBAL_OBJ_STATIC(topic_statistics);
331 
332 /*! Global container which stores statistics for subscriptions */
333 static AO2_GLOBAL_OBJ_STATIC(subscription_statistics);
334 
335 /*! \internal */
336 struct stasis_message_type_statistics {
337  /*! \brief The number of messages of this published */
338  int published;
339  /*! \brief The number of messages of this that did not reach a subscriber */
340  int unused;
341  /*! \brief The stasis message type */
342  struct stasis_message_type *message_type;
343 };
344 
345 /*! Lock to protect the message types vector */
346 AST_MUTEX_DEFINE_STATIC(message_type_statistics_lock);
347 
348 /*! Vector containing message type information */
349 static AST_VECTOR(, struct stasis_message_type_statistics) message_type_statistics;
350 
351 /*! \internal */
352 struct stasis_topic_statistics {
353  /*! \brief Highest time spent dispatching messages to subscribers */
354  long highest_time_dispatched;
355  /*! \brief Lowest time spent dispatching messages to subscribers */
356  long lowest_time_dispatched;
357  /*! \brief The number of messages that were not dispatched to any subscriber */
358  int messages_not_dispatched;
359  /*! \brief The number of messages that were dispatched to at least 1 subscriber */
360  int messages_dispatched;
361  /*! \brief The ids of the subscribers to this topic */
362  struct ao2_container *subscribers;
363  /*! \brief Pointer to the topic (NOT refcounted, and must NOT be accessed) */
364  struct stasis_topic *topic;
365  /*! \brief Name of the topic */
366  char name[0];
367 };
368 #endif
369 
370 /*! \internal */
371 struct stasis_topic {
372  /*! Variable length array of the subscribers */
373  AST_VECTOR(, struct stasis_subscription *) subscribers;
374 
375  /*! Topics forwarding into this topic */
376  AST_VECTOR(, struct stasis_topic *) upstream_topics;
377 
378 #ifdef AST_DEVMODE
379  struct stasis_topic_statistics *statistics;
380 #endif
381 
382  /*! Unique incrementing integer for subscriber ids */
383  int subscriber_id;
384 
385  /*! Name of the topic */
386  char *name;
387 
388  /*! Detail of the topic */
389  char *detail;
390 
391  /*! Creation time */
392  struct timeval *creationtime;
393 };
394 
396 
397 struct topic_proxy {
398  AO2_WEAKPROXY();
399 
400  char *name;
401  char *detail;
402 
403  struct timeval creationtime;
404 
405  char buf[0];
406 };
407 
411 
412 static void proxy_dtor(void *weakproxy, void *container)
413 {
414  ao2_unlink(container, weakproxy);
415  ao2_cleanup(container);
416 }
417 
418 /* Forward declarations for the tightly-coupled subscription object */
419 static int topic_add_subscription(struct stasis_topic *topic,
420  struct stasis_subscription *sub);
421 
422 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
423 
424 /*! \brief Lock two topics. */
425 #define topic_lock_both(topic1, topic2) \
426  do { \
427  ao2_lock(topic1); \
428  while (ao2_trylock(topic2)) { \
429  AO2_DEADLOCK_AVOIDANCE(topic1); \
430  } \
431  } while (0)
432 
433 static void topic_dtor(void *obj)
434 {
435  struct stasis_topic *topic = obj;
436 #ifdef AST_DEVMODE
437  struct ao2_container *topic_stats;
438 #endif
439 
440  ast_debug(2, "Destroying topic. name: %s, detail: %s\n",
441  topic->name, topic->detail);
442 
443  /* Subscribers hold a reference to topics, so they should all be
444  * unsubscribed before we get here. */
445  ast_assert(AST_VECTOR_SIZE(&topic->subscribers) == 0);
446 
447  AST_VECTOR_FREE(&topic->subscribers);
448  AST_VECTOR_FREE(&topic->upstream_topics);
449  ast_debug(1, "Topic '%s': %p destroyed\n", topic->name, topic);
450 
451 #ifdef AST_DEVMODE
452  if (topic->statistics) {
453  topic_stats = ao2_global_obj_ref(topic_statistics);
454  if (topic_stats) {
455  ao2_unlink(topic_stats, topic->statistics);
456  ao2_ref(topic_stats, -1);
457  }
458  ao2_ref(topic->statistics, -1);
459  }
460 #endif
461 }
462 
463 #ifdef AST_DEVMODE
464 static void topic_statistics_destroy(void *obj)
465 {
466  struct stasis_topic_statistics *statistics = obj;
467 
468  ao2_cleanup(statistics->subscribers);
469 }
470 
471 static struct stasis_topic_statistics *stasis_topic_statistics_create(struct stasis_topic *topic)
472 {
473  struct stasis_topic_statistics *statistics;
474  RAII_VAR(struct ao2_container *, topic_stats, ao2_global_obj_ref(topic_statistics), ao2_cleanup);
475 
476  if (!topic_stats) {
477  return NULL;
478  }
479 
480  statistics = ao2_alloc(sizeof(*statistics) + strlen(topic->name) + 1, topic_statistics_destroy);
481  if (!statistics) {
482  return NULL;
483  }
484 
485  statistics->subscribers = ast_str_container_alloc(1);
486  if (!statistics->subscribers) {
487  ao2_ref(statistics, -1);
488  return NULL;
489  }
490 
491  /* This is strictly used for the pointer address when showing the topic */
492  statistics->topic = topic;
493  strcpy(statistics->name, topic->name); /* SAFE */
494  ao2_link(topic_stats, statistics);
495 
496  return statistics;
497 }
498 #endif
499 
500 static int link_topic_proxy(struct stasis_topic *topic, const char *name, const char *detail)
501 {
502  struct topic_proxy *proxy;
503  struct stasis_topic* topic_tmp;
504  size_t detail_len;
505 
506  if (!topic || !name || !strlen(name) || !detail) {
507  return -1;
508  }
509 
510  ao2_wrlock(topic_all);
511 
512  topic_tmp = stasis_topic_get(name);
513  if (topic_tmp) {
514  ast_log(LOG_ERROR, "The same topic is already exist. name: %s\n", name);
515  ao2_ref(topic_tmp, -1);
516  ao2_unlock(topic_all);
517 
518  return -1;
519  }
520 
521  detail_len = strlen(detail) + 1;
522 
523  proxy = ao2_t_weakproxy_alloc(
524  sizeof(*proxy) + strlen(name) + 1 + detail_len, NULL, name);
525  if (!proxy) {
526  ao2_unlock(topic_all);
527 
528  return -1;
529  }
530 
531  /* set the proxy info */
532  proxy->name = proxy->buf;
533  proxy->detail = proxy->name + strlen(name) + 1;
534 
535  strcpy(proxy->name, name); /* SAFE */
536  ast_copy_string(proxy->detail, detail, detail_len); /* SAFE */
537  proxy->creationtime = ast_tvnow();
538 
539  /* We have exclusive access to proxy, no need for locking here. */
540  if (ao2_t_weakproxy_set_object(proxy, topic, OBJ_NOLOCK, "weakproxy link")) {
541  ao2_cleanup(proxy);
542  ao2_unlock(topic_all);
543 
544  return -1;
545  }
546 
547  if (ao2_weakproxy_subscribe(proxy, proxy_dtor, ao2_bump(topic_all), OBJ_NOLOCK)) {
548  ao2_cleanup(proxy);
549  ao2_unlock(topic_all);
550  ao2_cleanup(topic_all);
551 
552  return -1;
553  }
554 
555  /* setting the topic point to the proxy */
556  topic->name = proxy->name;
557  topic->detail = proxy->detail;
558  topic->creationtime = &(proxy->creationtime);
559 
560  ao2_link_flags(topic_all, proxy, OBJ_NOLOCK);
561  ao2_ref(proxy, -1);
562 
563  ao2_unlock(topic_all);
564 
565  return 0;
566 }
567 
569  const char *name, const char* detail
570  )
571 {
572  struct stasis_topic *topic;
573  int res = 0;
574 
575  if (!name|| !strlen(name) || !detail) {
576  return NULL;
577  }
578  ast_debug(2, "Creating topic. name: %s, detail: %s\n", name, detail);
579 
580  topic = stasis_topic_get(name);
581  if (topic) {
582  ast_debug(2, "Topic is already exist. name: %s, detail: %s\n",
583  name, detail);
584  return topic;
585  }
586 
587  topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
588  if (!topic) {
589  return NULL;
590  }
591 
592  res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
593  res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
594  if (res) {
595  ao2_ref(topic, -1);
596  return NULL;
597  }
598 
599  /* link to the proxy */
600  if (link_topic_proxy(topic, name, detail)) {
601  ao2_ref(topic, -1);
602  return NULL;
603  }
604 
605 #ifdef AST_DEVMODE
606  topic->statistics = stasis_topic_statistics_create(topic);
607  if (!topic->statistics) {
608  ao2_ref(topic, -1);
609  return NULL;
610  }
611 #endif
612  ast_debug(1, "Topic '%s': %p created\n", topic->name, topic);
613 
614  return topic;
615 }
616 
618 {
619  return stasis_topic_create_with_detail(name, "");
620 }
621 
622 struct stasis_topic *stasis_topic_get(const char *name)
623 {
624  return ao2_weakproxy_find(topic_all, name, OBJ_SEARCH_KEY, "");
625 }
626 
627 const char *stasis_topic_name(const struct stasis_topic *topic)
628 {
629  if (!topic) {
630  return NULL;
631  }
632  return topic->name;
633 }
634 
635 const char *stasis_topic_detail(const struct stasis_topic *topic)
636 {
637  if (!topic) {
638  return NULL;
639  }
640  return topic->detail;
641 }
642 
643 size_t stasis_topic_subscribers(const struct stasis_topic *topic)
644 {
645  return AST_VECTOR_SIZE(&topic->subscribers);
646 }
647 
648 #ifdef AST_DEVMODE
649 struct stasis_subscription_statistics {
650  /*! \brief The filename where the subscription originates */
651  const char *file;
652  /*! \brief The function where the subscription originates */
653  const char *func;
654  /*! \brief Names of the topics we are subscribed to */
655  struct ao2_container *topics;
656  /*! \brief The message type that currently took the longest to process */
657  struct stasis_message_type *highest_time_message_type;
658  /*! \brief Highest time spent invoking a message */
659  long highest_time_invoked;
660  /*! \brief Lowest time spent invoking a message */
661  long lowest_time_invoked;
662  /*! \brief The number of messages that were filtered out */
663  int messages_dropped;
664  /*! \brief The number of messages that passed filtering */
665  int messages_passed;
666  /*! \brief Using a mailbox to queue messages */
667  int uses_mailbox;
668  /*! \brief Using stasis threadpool for handling messages */
669  int uses_threadpool;
670  /*! \brief The line number where the subscription originates */
671  int lineno;
672  /*! \brief Pointer to the subscription (NOT refcounted, and must NOT be accessed) */
673  struct stasis_subscription *sub;
674  /*! \brief Unique ID of the subscription */
675  char uniqueid[0];
676 };
677 #endif
678 
679 /*! \internal */
681  /*! Unique ID for this subscription */
682  char *uniqueid;
683  /*! Topic subscribed to. */
685  /*! Mailbox for processing incoming messages. */
687  /*! Callback function for incoming message processing. */
689  /*! Data pointer to be handed to the callback. */
690  void *data;
691 
692  /*! Condition for joining with subscription. */
694  /*! Flag set when final message for sub has been received.
695  * Be sure join_lock is held before reading/setting. */
697  /*! Flag set when final message for sub has been processed.
698  * Be sure join_lock is held before reading/setting. */
700 
701  /*! The message types this subscription is accepting */
702  AST_VECTOR(, char) accepted_message_types;
703  /*! The message formatters this subscription is accepting */
704  enum stasis_subscription_message_formatters accepted_formatters;
705  /*! The message filter currently in use */
707 
708 #ifdef AST_DEVMODE
709  /*! Statistics information */
710  struct stasis_subscription_statistics *statistics;
711 #endif
712 };
713 
714 static void subscription_dtor(void *obj)
715 {
716  struct stasis_subscription *sub = obj;
717 #ifdef AST_DEVMODE
718  struct ao2_container *subscription_stats;
719 #endif
720 
721  /* Subscriptions need to be manually unsubscribed before destruction
722  * b/c there's a cyclic reference between topics and subscriptions */
724  /* If there are any messages in flight to this subscription; that would
725  * be bad. */
727 
728  ast_free(sub->uniqueid);
729  ao2_cleanup(sub->topic);
730  sub->topic = NULL;
732  sub->mailbox = NULL;
734 
735  AST_VECTOR_FREE(&sub->accepted_message_types);
736 
737 #ifdef AST_DEVMODE
738  if (sub->statistics) {
739  subscription_stats = ao2_global_obj_ref(subscription_statistics);
740  if (subscription_stats) {
741  ao2_unlink(subscription_stats, sub->statistics);
742  ao2_ref(subscription_stats, -1);
743  }
744  ao2_ref(sub->statistics, -1);
745  }
746 #endif
747 }
748 
749 /*!
750  * \brief Invoke the subscription's callback.
751  * \param sub Subscription to invoke.
752  * \param topic Topic message was published to.
753  * \param message Message to send.
754  */
755 static void subscription_invoke(struct stasis_subscription *sub,
756  struct stasis_message *message)
757 {
758  unsigned int final = stasis_subscription_final_message(sub, message);
760 #ifdef AST_DEVMODE
761  struct timeval start;
762  long elapsed;
763 
764  start = ast_tvnow();
765 #endif
766 
767  /* Notify that the final message has been received */
768  if (final) {
769  ao2_lock(sub);
770  sub->final_message_rxed = 1;
771  ast_cond_signal(&sub->join_cond);
772  ao2_unlock(sub);
773  }
774 
775  /*
776  * If filtering is turned on and this is a 'final' message, we only invoke the callback
777  * if the subscriber accepts subscription_change message types.
778  */
779  if (!final || sub->filter != STASIS_SUBSCRIPTION_FILTER_SELECTIVE ||
780  (message_type_id < AST_VECTOR_SIZE(&sub->accepted_message_types) && AST_VECTOR_GET(&sub->accepted_message_types, message_type_id))) {
781  /* Since sub is mostly immutable, no need to lock sub */
782  sub->callback(sub->data, sub, message);
783  }
784 
785  /* Notify that the final message has been processed */
786  if (final) {
787  ao2_lock(sub);
788  sub->final_message_processed = 1;
789  ast_cond_signal(&sub->join_cond);
790  ao2_unlock(sub);
791  }
792 
793 #ifdef AST_DEVMODE
794  elapsed = ast_tvdiff_ms(ast_tvnow(), start);
795  if (elapsed > sub->statistics->highest_time_invoked) {
796  sub->statistics->highest_time_invoked = elapsed;
797  ao2_lock(sub->statistics);
798  sub->statistics->highest_time_message_type = stasis_message_type(message);
799  ao2_unlock(sub->statistics);
800  }
801  if (elapsed < sub->statistics->lowest_time_invoked) {
802  sub->statistics->lowest_time_invoked = elapsed;
803  }
804 #endif
805 }
806 
807 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
808 static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
809 
811 {
812 }
813 
814 #ifdef AST_DEVMODE
815 static void subscription_statistics_destroy(void *obj)
816 {
817  struct stasis_subscription_statistics *statistics = obj;
818 
819  ao2_cleanup(statistics->topics);
820 }
821 
822 static struct stasis_subscription_statistics *stasis_subscription_statistics_create(struct stasis_subscription *sub,
823  int needs_mailbox, int use_thread_pool, const char *file, int lineno,
824  const char *func)
825 {
826  struct stasis_subscription_statistics *statistics;
827  RAII_VAR(struct ao2_container *, subscription_stats, ao2_global_obj_ref(subscription_statistics), ao2_cleanup);
828 
829  if (!subscription_stats) {
830  return NULL;
831  }
832 
833  statistics = ao2_alloc(sizeof(*statistics) + strlen(sub->uniqueid) + 1, subscription_statistics_destroy);
834  if (!statistics) {
835  return NULL;
836  }
837 
838  statistics->topics = ast_str_container_alloc(1);
839  if (!statistics->topics) {
840  ao2_ref(statistics, -1);
841  return NULL;
842  }
843 
844  statistics->file = file;
845  statistics->lineno = lineno;
846  statistics->func = func;
847  statistics->uses_mailbox = needs_mailbox;
848  statistics->uses_threadpool = use_thread_pool;
849  strcpy(statistics->uniqueid, sub->uniqueid); /* SAFE */
850  statistics->sub = sub;
851  ao2_link(subscription_stats, statistics);
852 
853  return statistics;
854 }
855 #endif
856 
858  struct stasis_topic *topic,
860  void *data,
861  int needs_mailbox,
862  int use_thread_pool,
863  const char *file,
864  int lineno,
865  const char *func)
866 {
867  struct stasis_subscription *sub;
868  int ret;
869 
870  if (!topic) {
871  return NULL;
872  }
873 
874  /* The ao2 lock is used for join_cond. */
875  sub = ao2_t_alloc(sizeof(*sub), subscription_dtor, stasis_topic_name(topic));
876  if (!sub) {
877  return NULL;
878  }
879 
880 #ifdef AST_DEVMODE
881  ret = ast_asprintf(&sub->uniqueid, "%s:%s-%d", file, stasis_topic_name(topic), ast_atomic_fetchadd_int(&topic->subscriber_id, +1));
882  sub->statistics = stasis_subscription_statistics_create(sub, needs_mailbox, use_thread_pool, file, lineno, func);
883  if (ret < 0 || !sub->statistics) {
884  ao2_ref(sub, -1);
885  return NULL;
886  }
887 #else
888  ret = ast_asprintf(&sub->uniqueid, "%s-%d", stasis_topic_name(topic), ast_atomic_fetchadd_int(&topic->subscriber_id, +1));
889  if (ret < 0) {
890  ao2_ref(sub, -1);
891  return NULL;
892  }
893 #endif
894 
895  if (needs_mailbox) {
896  char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
897 
898  /* Create name with seq number appended. */
899  ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "stasis/%c:%s",
900  use_thread_pool ? 'p' : 'm',
901  stasis_topic_name(topic));
902 
903  /*
904  * With a small number of subscribers, a thread-per-sub is
905  * acceptable. For a large number of subscribers, a thread
906  * pool should be used.
907  */
908  if (use_thread_pool) {
909  sub->mailbox = ast_threadpool_serializer(tps_name, threadpool);
910  } else {
912  }
913  if (!sub->mailbox) {
914  ao2_ref(sub, -1);
915 
916  return NULL;
917  }
919  /* Taskprocessor has a reference */
920  ao2_ref(sub, +1);
921  }
922 
923  ao2_ref(topic, +1);
924  sub->topic = topic;
925  sub->callback = callback;
926  sub->data = data;
927  ast_cond_init(&sub->join_cond, NULL);
928  sub->filter = STASIS_SUBSCRIPTION_FILTER_NONE;
929  AST_VECTOR_INIT(&sub->accepted_message_types, 0);
930  sub->accepted_formatters = STASIS_SUBSCRIPTION_FORMATTER_NONE;
931 
932  if (topic_add_subscription(topic, sub) != 0) {
933  ao2_ref(sub, -1);
934  ao2_ref(topic, -1);
935 
936  return NULL;
937  }
938  send_subscription_subscribe(topic, sub);
939 
940  return sub;
941 }
942 
944  struct stasis_topic *topic,
946  void *data,
947  const char *file,
948  int lineno,
949  const char *func)
950 {
951  return internal_stasis_subscribe(topic, callback, data, 1, 0, file, lineno, func);
952 }
953 
955  struct stasis_topic *topic,
957  void *data,
958  const char *file,
959  int lineno,
960  const char *func)
961 {
962  return internal_stasis_subscribe(topic, callback, data, 1, 1, file, lineno, func);
963 }
964 
965 static int sub_cleanup(void *data)
966 {
967  struct stasis_subscription *sub = data;
968  ao2_cleanup(sub);
969  return 0;
970 }
971 
973 {
974  /* The subscription may be the last ref to this topic. Hold
975  * the topic ref open until after the unlock. */
976  struct stasis_topic *topic;
977 
978  if (!sub) {
979  return NULL;
980  }
981 
982  topic = ao2_bump(sub->topic);
983 
984  /* We have to remove the subscription first, to ensure the unsubscribe
985  * is the final message */
986  if (topic_remove_subscription(sub->topic, sub) != 0) {
988  "Internal error: subscription has invalid topic\n");
989  ao2_cleanup(topic);
990 
991  return NULL;
992  }
993 
994  /* Now let everyone know about the unsubscribe */
995  send_subscription_unsubscribe(topic, sub);
996 
997  /* When all that's done, remove the ref the mailbox has on the sub */
998  if (sub->mailbox) {
999  if (ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub)) {
1000  /* Nothing we can do here, the conditional is just to keep
1001  * the compiler happy that we're not ignoring the result. */
1002  }
1003  }
1004 
1005  /* Unsubscribing unrefs the subscription */
1006  ao2_cleanup(sub);
1007  ao2_cleanup(topic);
1008 
1009  return NULL;
1010 }
1011 
1013  long low_water, long high_water)
1014 {
1015  int res = -1;
1016 
1017  if (subscription) {
1018  res = ast_taskprocessor_alert_set_levels(subscription->mailbox,
1019  low_water, high_water);
1020  }
1021  return res;
1022 }
1023 
1025  const struct stasis_message_type *type)
1026 {
1027  if (!subscription) {
1028  return -1;
1029  }
1030 
1031  ast_assert(type != NULL);
1033 
1034  if (!type || !stasis_message_type_name(type)) {
1035  /* Filtering is unreliable as this message type is not yet initialized
1036  * so force all messages through.
1037  */
1038  subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE;
1039  return 0;
1040  }
1041 
1042  ao2_lock(subscription->topic);
1043  if (AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 1)) {
1044  /* We do this for the same reason as above. The subscription can still operate, so allow
1045  * it to do so by forcing all messages through.
1046  */
1047  subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE;
1048  }
1049  ao2_unlock(subscription->topic);
1050 
1051  return 0;
1052 }
1053 
1055  const struct stasis_message_type *type)
1056 {
1057  if (!subscription) {
1058  return -1;
1059  }
1060 
1061  ast_assert(type != NULL);
1063 
1064  if (!type || !stasis_message_type_name(type)) {
1065  return 0;
1066  }
1067 
1068  ao2_lock(subscription->topic);
1069  if (stasis_message_type_id(type) < AST_VECTOR_SIZE(&subscription->accepted_message_types)) {
1070  /* The memory is already allocated so this can't fail */
1071  AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 0);
1072  }
1073  ao2_unlock(subscription->topic);
1074 
1075  return 0;
1076 }
1077 
1080 {
1081  if (!subscription) {
1082  return -1;
1083  }
1084 
1085  ao2_lock(subscription->topic);
1086  if (subscription->filter != STASIS_SUBSCRIPTION_FILTER_FORCED_NONE) {
1087  subscription->filter = filter;
1088  }
1089  ao2_unlock(subscription->topic);
1090 
1091  return 0;
1092 }
1093 
1096 {
1097  ast_assert(subscription != NULL);
1098 
1099  ao2_lock(subscription->topic);
1100  subscription->accepted_formatters = formatters;
1101  ao2_unlock(subscription->topic);
1102 
1103  return;
1104 }
1105 
1107 {
1108  if (subscription) {
1109  ao2_lock(subscription);
1110  /* Wait until the processed flag has been set */
1111  while (!subscription->final_message_processed) {
1112  ast_cond_wait(&subscription->join_cond,
1113  ao2_object_get_lockaddr(subscription));
1114  }
1115  ao2_unlock(subscription);
1116  }
1117 }
1118 
1120 {
1121  if (subscription) {
1122  int ret;
1123 
1124  ao2_lock(subscription);
1125  ret = subscription->final_message_rxed;
1126  ao2_unlock(subscription);
1127 
1128  return ret;
1129  }
1130 
1131  /* Null subscription is about as done as you can get */
1132  return 1;
1133 }
1134 
1136  struct stasis_subscription *subscription)
1137 {
1138  if (!subscription) {
1139  return NULL;
1140  }
1141 
1142  /* Bump refcount to hold it past the unsubscribe */
1143  ao2_ref(subscription, +1);
1144  stasis_unsubscribe(subscription);
1145  stasis_subscription_join(subscription);
1146  /* Now decrement the refcount back */
1147  ao2_cleanup(subscription);
1148  return NULL;
1149 }
1150 
1152 {
1153  if (sub) {
1154  size_t i;
1155  struct stasis_topic *topic = sub->topic;
1156 
1157  ao2_lock(topic);
1158  for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
1159  if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
1160  ao2_unlock(topic);
1161  return 1;
1162  }
1163  }
1164  ao2_unlock(topic);
1165  }
1166 
1167  return 0;
1168 }
1169 
1171 {
1172  return sub->uniqueid;
1173 }
1174 
1176 {
1177  struct stasis_subscription_change *change;
1178 
1180  return 0;
1181  }
1182 
1183  change = stasis_message_data(msg);
1184  if (strcmp("Unsubscribe", change->description)) {
1185  return 0;
1186  }
1187 
1188  if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
1189  return 0;
1190  }
1191 
1192  return 1;
1193 }
1194 
1195 /*!
1196  * \brief Add a subscriber to a topic.
1197  * \param topic Topic
1198  * \param sub Subscriber
1199  * \return 0 on success
1200  * \return Non-zero on error
1201  */
1202 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
1203 {
1204  size_t idx;
1205 
1206  ao2_lock(topic);
1207  /* The reference from the topic to the subscription is shared with
1208  * the owner of the subscription, which will explicitly unsubscribe
1209  * to release it.
1210  *
1211  * If we bumped the refcount here, the owner would have to unsubscribe
1212  * and cleanup, which is a bit awkward. */
1213  AST_VECTOR_APPEND(&topic->subscribers, sub);
1214 
1215  for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
1217  AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
1218  }
1219 
1220 #ifdef AST_DEVMODE
1221  ast_str_container_add(topic->statistics->subscribers, stasis_subscription_uniqueid(sub));
1222  ast_str_container_add(sub->statistics->topics, stasis_topic_name(topic));
1223 #endif
1224 
1225  ao2_unlock(topic);
1226 
1227  return 0;
1228 }
1229 
1230 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
1231 {
1232  size_t idx;
1233  int res;
1234 
1235  ao2_lock(topic);
1236  for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
1238  AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
1239  }
1240  res = AST_VECTOR_REMOVE_ELEM_UNORDERED(&topic->subscribers, sub,
1242 
1243 #ifdef AST_DEVMODE
1244  if (!res) {
1245  ast_str_container_remove(topic->statistics->subscribers, stasis_subscription_uniqueid(sub));
1246  ast_str_container_remove(sub->statistics->topics, stasis_topic_name(topic));
1247  }
1248 #endif
1249 
1250  ao2_unlock(topic);
1251 
1252  return res;
1253 }
1254 
1255 /*!
1256  * \internal \brief Dispatch a message to a subscriber asynchronously
1257  * \param local \ref ast_taskprocessor_local object
1258  * \return 0
1259  */
1261 {
1262  struct stasis_subscription *sub = local->local_data;
1263  struct stasis_message *message = local->data;
1264 
1265  subscription_invoke(sub, message);
1266  ao2_cleanup(message);
1267 
1268  return 0;
1269 }
1270 
1271 /*!
1272  * \internal \brief Data passed to \ref dispatch_exec_sync to synchronize
1273  * a published message to a subscriber
1274  */
1279  void *task_data;
1280 };
1281 
1282 /*!
1283  * \internal \brief Dispatch a message to a subscriber synchronously
1284  * \param local \ref ast_taskprocessor_local object
1285  * \return 0
1286  */
1288 {
1289  struct stasis_subscription *sub = local->local_data;
1290  struct sync_task_data *std = local->data;
1291  struct stasis_message *message = std->task_data;
1292 
1293  subscription_invoke(sub, message);
1294  ao2_cleanup(message);
1295 
1296  ast_mutex_lock(&std->lock);
1297  std->complete = 1;
1298  ast_cond_signal(&std->cond);
1299  ast_mutex_unlock(&std->lock);
1300 
1301  return 0;
1302 }
1303 
1304 /*!
1305  * \internal \brief Dispatch a message to a subscriber
1306  * \param sub The subscriber to dispatch to
1307  * \param message The message to send
1308  * \param synchronous If non-zero, synchronize on the subscriber receiving
1309  * the message
1310  * \retval 0 if message was not dispatched
1311  * \retval 1 if message was dispatched
1312  */
1313 static unsigned int dispatch_message(struct stasis_subscription *sub,
1314  struct stasis_message *message,
1315  int synchronous)
1316 {
1317  int is_final = stasis_subscription_final_message(sub, message);
1318 
1319  /*
1320  * The 'do while' gives us an easy way to skip remaining logic once
1321  * we determine the message should be accepted.
1322  * The code looks more verbose than it needs to be but it optimizes
1323  * down very nicely. It's just easier to understand and debug this way.
1324  */
1325  do {
1326  struct stasis_message_type *message_type = stasis_message_type(message);
1327  int type_id = stasis_message_type_id(message_type);
1328  int type_filter_specified = 0;
1329  int formatter_filter_specified = 0;
1330  int type_filter_passed = 0;
1331  int formatter_filter_passed = 0;
1332 
1333  /* We always accept final messages so only run the filter logic if not final */
1334  if (is_final) {
1335  break;
1336  }
1337 
1338  type_filter_specified = sub->filter & STASIS_SUBSCRIPTION_FILTER_SELECTIVE;
1339  formatter_filter_specified = sub->accepted_formatters != STASIS_SUBSCRIPTION_FORMATTER_NONE;
1340 
1341  /* Accept if no filters of either type were specified */
1342  if (!type_filter_specified && !formatter_filter_specified) {
1343  break;
1344  }
1345 
1346  type_filter_passed = type_filter_specified
1347  && type_id < AST_VECTOR_SIZE(&sub->accepted_message_types)
1348  && AST_VECTOR_GET(&sub->accepted_message_types, type_id);
1349 
1350  /*
1351  * Since the type and formatter filters are OR'd, we can skip
1352  * the formatter check if the type check passes.
1353  */
1354  if (type_filter_passed) {
1355  break;
1356  }
1357 
1358  formatter_filter_passed = formatter_filter_specified
1359  && (sub->accepted_formatters & stasis_message_type_available_formatters(message_type));
1360 
1361  if (formatter_filter_passed) {
1362  break;
1363  }
1364 
1365 #ifdef AST_DEVMODE
1366  ast_atomic_fetchadd_int(&sub->statistics->messages_dropped, +1);
1367 #endif
1368 
1369  return 0;
1370 
1371  } while (0);
1372 
1373 #ifdef AST_DEVMODE
1374  ast_atomic_fetchadd_int(&sub->statistics->messages_passed, +1);
1375 #endif
1376 
1377  if (!sub->mailbox) {
1378  /* Dispatch directly */
1379  subscription_invoke(sub, message);
1380  return 1;
1381  }
1382 
1383  /* Bump the message for the taskprocessor push. This will get de-ref'd
1384  * by the task processor callback.
1385  */
1386  ao2_bump(message);
1387  if (!synchronous) {
1389  /* Push failed; ugh. */
1390  ast_log(LOG_ERROR, "Dropping async dispatch\n");
1391  ao2_cleanup(message);
1392  return 0;
1393  }
1394  } else {
1395  struct sync_task_data std;
1396 
1397  ast_mutex_init(&std.lock);
1398  ast_cond_init(&std.cond, NULL);
1399  std.complete = 0;
1400  std.task_data = message;
1401 
1403  /* Push failed; ugh. */
1404  ast_log(LOG_ERROR, "Dropping sync dispatch\n");
1405  ao2_cleanup(message);
1406  ast_mutex_destroy(&std.lock);
1407  ast_cond_destroy(&std.cond);
1408  return 0;
1409  }
1410 
1411  ast_mutex_lock(&std.lock);
1412  while (!std.complete) {
1413  ast_cond_wait(&std.cond, &std.lock);
1414  }
1415  ast_mutex_unlock(&std.lock);
1416 
1417  ast_mutex_destroy(&std.lock);
1418  ast_cond_destroy(&std.cond);
1419  }
1420 
1421  return 1;
1422 }
1423 
1424 /*!
1425  * \internal \brief Publish a message to a topic's subscribers
1426  * \brief topic The topic to publish to
1427  * \brief message The message to publish
1428  * \brief sync_sub An optional subscriber of the topic to publish synchronously
1429  * to
1430  */
1431 static void publish_msg(struct stasis_topic *topic,
1432  struct stasis_message *message, struct stasis_subscription *sync_sub)
1433 {
1434  size_t i;
1435  unsigned int dispatched = 0;
1436 #ifdef AST_DEVMODE
1438  struct stasis_message_type_statistics *statistics;
1439  struct timeval start;
1440  long elapsed;
1441 #endif
1442 
1443  ast_assert(topic != NULL);
1444  ast_assert(message != NULL);
1445 
1446 #ifdef AST_DEVMODE
1447  ast_mutex_lock(&message_type_statistics_lock);
1448  if (message_type_id >= AST_VECTOR_SIZE(&message_type_statistics)) {
1449  struct stasis_message_type_statistics new_statistics = {
1450  .published = 0,
1451  };
1452  if (AST_VECTOR_REPLACE(&message_type_statistics, message_type_id, new_statistics)) {
1453  ast_mutex_unlock(&message_type_statistics_lock);
1454  return;
1455  }
1456  }
1457  statistics = AST_VECTOR_GET_ADDR(&message_type_statistics, message_type_id);
1458  statistics->message_type = stasis_message_type(message);
1459  ast_mutex_unlock(&message_type_statistics_lock);
1460 
1461  ast_atomic_fetchadd_int(&statistics->published, +1);
1462 #endif
1463 
1464  /* If there are no subscribers don't bother */
1465  if (!stasis_topic_subscribers(topic)) {
1466 #ifdef AST_DEVMODE
1467  ast_atomic_fetchadd_int(&statistics->unused, +1);
1468  ast_atomic_fetchadd_int(&topic->statistics->messages_not_dispatched, +1);
1469 #endif
1470  return;
1471  }
1472 
1473  /*
1474  * The topic may be unref'ed by the subscription invocation.
1475  * Make sure we hold onto a reference while dispatching.
1476  */
1477  ao2_ref(topic, +1);
1478 #ifdef AST_DEVMODE
1479  start = ast_tvnow();
1480 #endif
1481  ao2_lock(topic);
1482  for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
1483  struct stasis_subscription *sub = AST_VECTOR_GET(&topic->subscribers, i);
1484 
1485  ast_assert(sub != NULL);
1486 
1487  dispatched += dispatch_message(sub, message, (sub == sync_sub));
1488  }
1489  ao2_unlock(topic);
1490 
1491 #ifdef AST_DEVMODE
1492  elapsed = ast_tvdiff_ms(ast_tvnow(), start);
1493  if (elapsed > topic->statistics->highest_time_dispatched) {
1494  topic->statistics->highest_time_dispatched = elapsed;
1495  }
1496  if (elapsed < topic->statistics->lowest_time_dispatched) {
1497  topic->statistics->lowest_time_dispatched = elapsed;
1498  }
1499  if (dispatched) {
1500  ast_atomic_fetchadd_int(&topic->statistics->messages_dispatched, +1);
1501  } else {
1502  ast_atomic_fetchadd_int(&statistics->unused, +1);
1503  ast_atomic_fetchadd_int(&topic->statistics->messages_not_dispatched, +1);
1504  }
1505 #endif
1506 
1507  ao2_ref(topic, -1);
1508 }
1509 
1511 {
1512  publish_msg(topic, message, NULL);
1513 }
1514 
1516 {
1517  ast_assert(sub != NULL);
1518 
1519  publish_msg(sub->topic, message, sub);
1520 }
1521 
1522 /*!
1523  * \brief Forwarding information
1524  *
1525  * Any message posted to \a from_topic is forwarded to \a to_topic.
1526  *
1527  * In cases where both the \a from_topic and \a to_topic need to be locked,
1528  * always lock the \a to_topic first, then the \a from_topic. Lest you deadlock.
1529  */
1531  /*! Originating topic */
1533  /*! Destination topic */
1535 };
1536 
1537 static void forward_dtor(void *obj)
1538 {
1539  struct stasis_forward *forward = obj;
1540 
1541  ao2_cleanup(forward->from_topic);
1542  forward->from_topic = NULL;
1543  ao2_cleanup(forward->to_topic);
1544  forward->to_topic = NULL;
1545 }
1546 
1548 {
1549  int idx;
1550  struct stasis_topic *from;
1551  struct stasis_topic *to;
1552 
1553  if (!forward) {
1554  return NULL;
1555  }
1556 
1557  from = forward->from_topic;
1558  to = forward->to_topic;
1559 
1560  if (from && to) {
1561  topic_lock_both(to, from);
1562  AST_VECTOR_REMOVE_ELEM_UNORDERED(&to->upstream_topics, from,
1564 
1565  for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
1566  topic_remove_subscription(from, AST_VECTOR_GET(&to->subscribers, idx));
1567  }
1568  ao2_unlock(from);
1569  ao2_unlock(to);
1570  }
1571 
1572  ao2_cleanup(forward);
1573 
1574  return NULL;
1575 }
1576 
1578  struct stasis_topic *to_topic)
1579 {
1580  int res;
1581  size_t idx;
1582  struct stasis_forward *forward;
1583 
1584  if (!from_topic || !to_topic) {
1585  return NULL;
1586  }
1587 
1588  forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1589  if (!forward) {
1590  return NULL;
1591  }
1592 
1593  /* Forwards to ourselves are implicit. */
1594  if (to_topic == from_topic) {
1595  return forward;
1596  }
1597 
1598  forward->from_topic = ao2_bump(from_topic);
1599  forward->to_topic = ao2_bump(to_topic);
1600 
1601  topic_lock_both(to_topic, from_topic);
1602  res = AST_VECTOR_APPEND(&to_topic->upstream_topics, from_topic);
1603  if (res != 0) {
1604  ao2_unlock(from_topic);
1605  ao2_unlock(to_topic);
1606  ao2_ref(forward, -1);
1607  return NULL;
1608  }
1609 
1610  for (idx = 0; idx < AST_VECTOR_SIZE(&to_topic->subscribers); ++idx) {
1611  topic_add_subscription(from_topic, AST_VECTOR_GET(&to_topic->subscribers, idx));
1612  }
1613  ao2_unlock(from_topic);
1614  ao2_unlock(to_topic);
1615 
1616  return forward;
1617 }
1618 
1619 static void subscription_change_dtor(void *obj)
1620 {
1621  struct stasis_subscription_change *change = obj;
1622 
1623  ao2_cleanup(change->topic);
1624 }
1625 
1626 static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
1627 {
1628  size_t description_len = strlen(description) + 1;
1629  size_t uniqueid_len = strlen(uniqueid) + 1;
1630  struct stasis_subscription_change *change;
1631 
1632  change = ao2_alloc_options(sizeof(*change) + description_len + uniqueid_len,
1634  if (!change) {
1635  return NULL;
1636  }
1637 
1638  strcpy(change->description, description); /* SAFE */
1639  change->uniqueid = change->description + description_len;
1640  ast_copy_string(change->uniqueid, uniqueid, uniqueid_len); /* SAFE */
1641  ao2_ref(topic, +1);
1642  change->topic = topic;
1643 
1644  return change;
1645 }
1646 
1647 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
1648 {
1649  struct stasis_subscription_change *change;
1650  struct stasis_message *msg;
1651 
1652  /* This assumes that we have already unsubscribed */
1654 
1656  return;
1657  }
1658 
1659  change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
1660  if (!change) {
1661  return;
1662  }
1663 
1665  if (!msg) {
1666  ao2_cleanup(change);
1667  return;
1668  }
1669 
1670  stasis_publish(topic, msg);
1671  ao2_cleanup(msg);
1672  ao2_cleanup(change);
1673 }
1674 
1676  struct stasis_subscription *sub)
1677 {
1678  struct stasis_subscription_change *change;
1679  struct stasis_message *msg;
1680 
1681  /* This assumes that we have already unsubscribed */
1683 
1685  return;
1686  }
1687 
1688  change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
1689  if (!change) {
1690  return;
1691  }
1692 
1694  if (!msg) {
1695  ao2_cleanup(change);
1696  return;
1697  }
1698 
1699  stasis_publish(topic, msg);
1700 
1701  /* Now we have to dispatch to the subscription itself */
1702  dispatch_message(sub, msg, 0);
1703 
1704  ao2_cleanup(msg);
1705  ao2_cleanup(change);
1706 }
1707 
1711  char name[0];
1712 };
1713 
1714 static void topic_pool_entry_dtor(void *obj)
1715 {
1716  struct topic_pool_entry *entry = obj;
1717 
1718  entry->forward = stasis_forward_cancel(entry->forward);
1719  ao2_cleanup(entry->topic);
1720  entry->topic = NULL;
1721 }
1722 
1723 static struct topic_pool_entry *topic_pool_entry_alloc(const char *topic_name)
1724 {
1726 
1727  topic_pool_entry = ao2_alloc_options(sizeof(*topic_pool_entry) + strlen(topic_name) + 1,
1729  if (!topic_pool_entry) {
1730  return NULL;
1731  }
1732 
1733  strcpy(topic_pool_entry->name, topic_name); /* Safe */
1734 
1735  return topic_pool_entry;
1736 }
1737 
1741 };
1742 
1743 static void topic_pool_dtor(void *obj)
1744 {
1745  struct stasis_topic_pool *pool = obj;
1746 
1747 #ifdef AO2_DEBUG
1748  {
1749  char *container_name =
1750  ast_alloca(strlen(stasis_topic_name(pool->pool_topic)) + strlen("-pool") + 1);
1751  sprintf(container_name, "%s-pool", stasis_topic_name(pool->pool_topic));
1752  ao2_container_unregister(container_name);
1753  }
1754 #endif
1755 
1756  ao2_cleanup(pool->pool_container);
1757  pool->pool_container = NULL;
1758  ao2_cleanup(pool->pool_topic);
1759  pool->pool_topic = NULL;
1760 }
1761 
1762 static int topic_pool_entry_hash(const void *obj, const int flags)
1763 {
1764  const struct topic_pool_entry *object;
1765  const char *key;
1766 
1767  switch (flags & OBJ_SEARCH_MASK) {
1768  case OBJ_SEARCH_KEY:
1769  key = obj;
1770  break;
1771  case OBJ_SEARCH_OBJECT:
1772  object = obj;
1773  key = object->name;
1774  break;
1775  default:
1776  /* Hash can only work on something with a full key. */
1777  ast_assert(0);
1778  return 0;
1779  }
1780  return ast_str_case_hash(key);
1781 }
1782 
1783 static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
1784 {
1785  const struct topic_pool_entry *object_left = obj;
1786  const struct topic_pool_entry *object_right = arg;
1787  const char *right_key = arg;
1788  int cmp;
1789 
1790  switch (flags & OBJ_SEARCH_MASK) {
1791  case OBJ_SEARCH_OBJECT:
1792  right_key = object_right->name;
1793  /* Fall through */
1794  case OBJ_SEARCH_KEY:
1795  cmp = strcasecmp(object_left->name, right_key);
1796  break;
1798  /* Not supported by container */
1799  ast_assert(0);
1800  cmp = -1;
1801  break;
1802  default:
1803  /*
1804  * What arg points to is specific to this traversal callback
1805  * and has no special meaning to astobj2.
1806  */
1807  cmp = 0;
1808  break;
1809  }
1810  if (cmp) {
1811  return 0;
1812  }
1813  /*
1814  * At this point the traversal callback is identical to a sorted
1815  * container.
1816  */
1817  return CMP_MATCH;
1818 }
1819 
1820 #ifdef AO2_DEBUG
1821 static void topic_pool_prnt_obj(void *v_obj, void *where, ao2_prnt_fn *prnt)
1822 {
1823  struct topic_pool_entry *entry = v_obj;
1824 
1825  if (!entry) {
1826  return;
1827  }
1828  prnt(where, "%s", stasis_topic_name(entry->topic));
1829 }
1830 #endif
1831 
1833 {
1834  struct stasis_topic_pool *pool;
1835 
1837  if (!pool) {
1838  return NULL;
1839  }
1840 
1843  if (!pool->pool_container) {
1844  ao2_cleanup(pool);
1845  return NULL;
1846  }
1847 
1848 #ifdef AO2_DEBUG
1849  {
1850  char *container_name =
1851  ast_alloca(strlen(stasis_topic_name(pooled_topic)) + strlen("-pool") + 1);
1852  sprintf(container_name, "%s-pool", stasis_topic_name(pooled_topic));
1853  ao2_container_register(container_name, pool->pool_container, topic_pool_prnt_obj);
1854  }
1855 #endif
1856 
1857  ao2_ref(pooled_topic, +1);
1858  pool->pool_topic = pooled_topic;
1859 
1860  return pool;
1861 }
1862 
1863 void stasis_topic_pool_delete_topic(struct stasis_topic_pool *pool, const char *topic_name)
1864 {
1865  /*
1866  * The topic_name passed in could be a fully-qualified name like <pool_topic_name>/<topic_name>
1867  * or just <topic_name>. If it's fully qualified, we need to skip past <pool_topic_name>
1868  * name and search only on <topic_name>.
1869  */
1870  const char *pool_topic_name = stasis_topic_name(pool->pool_topic);
1871  int pool_topic_name_len = strlen(pool_topic_name);
1872  const char *search_topic_name;
1873 
1874  if (strncmp(pool_topic_name, topic_name, pool_topic_name_len) == 0) {
1875  search_topic_name = topic_name + pool_topic_name_len + 1;
1876  } else {
1877  search_topic_name = topic_name;
1878  }
1879 
1880  ao2_find(pool->pool_container, search_topic_name, OBJ_SEARCH_KEY | OBJ_NODATA | OBJ_UNLINK);
1881 }
1882 
1883 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
1884 {
1886  SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
1887  char *new_topic_name;
1888  int ret;
1889 
1891  if (topic_pool_entry) {
1892  return topic_pool_entry->topic;
1893  }
1894 
1896  if (!topic_pool_entry) {
1897  return NULL;
1898  }
1899 
1900  /* To provide further detail and to ensure that the topic is unique within the scope of the
1901  * system we prefix it with the pooling topic name, which should itself already be unique.
1902  */
1903  ret = ast_asprintf(&new_topic_name, "%s/%s", stasis_topic_name(pool->pool_topic), topic_name);
1904  if (ret < 0) {
1905  return NULL;
1906  }
1907 
1908  topic_pool_entry->topic = stasis_topic_create(new_topic_name);
1909  ast_free(new_topic_name);
1910  if (!topic_pool_entry->topic) {
1911  return NULL;
1912  }
1913 
1915  if (!topic_pool_entry->forward) {
1916  return NULL;
1917  }
1918 
1920  return NULL;
1921  }
1922 
1923  return topic_pool_entry->topic;
1924 }
1925 
1926 int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name)
1927 {
1929 
1930  topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY);
1931  if (!topic_pool_entry) {
1932  return 0;
1933  }
1934 
1935  ao2_ref(topic_pool_entry, -1);
1936  return 1;
1937 }
1938 
1940 {
1941 #ifdef AST_DEVMODE
1942  if (!stasis_message_type_declined(name)) {
1943  ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
1944  }
1945 #endif
1946 }
1947 
1948 /*! \brief A multi object blob data structure to carry user event stasis messages */
1950  struct ast_json *blob; /*< A blob of JSON data */
1951  AST_VECTOR(, void *) snapshots[STASIS_UMOS_MAX]; /*< Vector of snapshots for each type */
1952 };
1953 
1954 /*!
1955  * \internal
1956  * \brief Destructor for \ref ast_multi_object_blob objects
1957  */
1958 static void multi_object_blob_dtor(void *obj)
1959 {
1960  struct ast_multi_object_blob *multi = obj;
1961  int type;
1962  int i;
1963 
1964  for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1965  for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1966  ao2_cleanup(AST_VECTOR_GET(&multi->snapshots[type], i));
1967  }
1968  AST_VECTOR_FREE(&multi->snapshots[type]);
1969  }
1970  ast_json_unref(multi->blob);
1971 }
1972 
1973 /*! \brief Create a stasis user event multi object blob */
1975 {
1976  int type;
1977  struct ast_multi_object_blob *multi;
1978 
1979  ast_assert(blob != NULL);
1980 
1981  multi = ao2_alloc(sizeof(*multi), multi_object_blob_dtor);
1982  if (!multi) {
1983  return NULL;
1984  }
1985 
1986  for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1987  if (AST_VECTOR_INIT(&multi->snapshots[type], 0)) {
1988  ao2_ref(multi, -1);
1989 
1990  return NULL;
1991  }
1992  }
1993 
1994  multi->blob = ast_json_ref(blob);
1995 
1996  return multi;
1997 }
1998 
1999 /*! \brief Add an object (snapshot) to the blob */
2001  enum stasis_user_multi_object_snapshot_type type, void *object)
2002 {
2003  if (!multi || !object || AST_VECTOR_APPEND(&multi->snapshots[type], object)) {
2004  ao2_cleanup(object);
2005  }
2006 }
2007 
2008 /*! \brief Publish single channel user event (for app_userevent compatibility) */
2010  struct stasis_message_type *type, struct ast_json *blob)
2011 {
2012  struct stasis_message *message;
2013  struct ast_channel_snapshot *channel_snapshot;
2014  struct ast_multi_object_blob *multi;
2015 
2016  if (!type) {
2017  return;
2018  }
2019 
2020  multi = ast_multi_object_blob_create(blob);
2021  if (!multi) {
2022  return;
2023  }
2024 
2025  channel_snapshot = ast_channel_snapshot_create(chan);
2026  if (!channel_snapshot) {
2027  ao2_ref(multi, -1);
2028  return;
2029  }
2030 
2031  /* this call steals the channel_snapshot reference */
2032  ast_multi_object_blob_add(multi, STASIS_UMOS_CHANNEL, channel_snapshot);
2033 
2034  message = stasis_message_create(type, multi);
2035  ao2_ref(multi, -1);
2036  if (message) {
2037  /* app_userevent still publishes to channel */
2038  stasis_publish(ast_channel_topic(chan), message);
2039  ao2_ref(message, -1);
2040  }
2041 }
2042 
2043 /*! \internal \brief convert multi object blob to ari json */
2045  struct stasis_message *message,
2046  const struct stasis_message_sanitizer *sanitize)
2047 {
2048  struct ast_json *out;
2049  struct ast_multi_object_blob *multi = stasis_message_data(message);
2050  struct ast_json *blob = multi->blob;
2051  const struct timeval *tv = stasis_message_timestamp(message);
2053  int i;
2054 
2055  out = ast_json_object_create();
2056  if (!out) {
2057  return NULL;
2058  }
2059 
2060  ast_json_object_set(out, "type", ast_json_string_create("ChannelUserevent"));
2061  ast_json_object_set(out, "timestamp", ast_json_timeval(*tv, NULL));
2062  ast_json_object_set(out, "eventname", ast_json_ref(ast_json_object_get(blob, "eventname")));
2063  ast_json_object_set(out, "userevent", ast_json_ref(blob));
2064 
2065  for (type = 0; type < STASIS_UMOS_MAX; ++type) {
2066  for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
2067  struct ast_json *json_object = NULL;
2068  char *name = NULL;
2069  void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
2070 
2071  switch (type) {
2072  case STASIS_UMOS_CHANNEL:
2073  json_object = ast_channel_snapshot_to_json(snapshot, sanitize);
2074  name = "channel";
2075  break;
2076  case STASIS_UMOS_BRIDGE:
2077  json_object = ast_bridge_snapshot_to_json(snapshot, sanitize);
2078  name = "bridge";
2079  break;
2080  case STASIS_UMOS_ENDPOINT:
2081  json_object = ast_endpoint_snapshot_to_json(snapshot, sanitize);
2082  name = "endpoint";
2083  break;
2084  }
2085  if (json_object) {
2086  ast_json_object_set(out, name, json_object);
2087  }
2088  }
2089  }
2090 
2091  return out;
2092 }
2093 
2094 /*! \internal \brief convert multi object blob to ami string */
2095 static struct ast_str *multi_object_blob_to_ami(void *obj)
2096 {
2097  struct ast_str *ami_str=ast_str_create(1024);
2098  struct ast_str *ami_snapshot;
2099  const struct ast_multi_object_blob *multi = obj;
2101  int i;
2102 
2103  if (!ami_str) {
2104  return NULL;
2105  }
2106  if (!multi) {
2107  ast_free(ami_str);
2108  return NULL;
2109  }
2110 
2111  for (type = 0; type < STASIS_UMOS_MAX; ++type) {
2112  for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
2113  char *name = NULL;
2114  void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
2115  ami_snapshot = NULL;
2116 
2117  if (i > 0) {
2118  ast_asprintf(&name, "%d", i + 1);
2119  }
2120 
2121  switch (type) {
2122  case STASIS_UMOS_CHANNEL:
2123  ami_snapshot = ast_manager_build_channel_state_string_prefix(snapshot, name ?: "");
2124  break;
2125 
2126  case STASIS_UMOS_BRIDGE:
2127  ami_snapshot = ast_manager_build_bridge_state_string_prefix(snapshot, name ?: "");
2128  break;
2129 
2130  case STASIS_UMOS_ENDPOINT:
2131  /* currently not sending endpoint snapshots to AMI */
2132  break;
2133  }
2134  if (ami_snapshot) {
2135  ast_str_append(&ami_str, 0, "%s", ast_str_buffer(ami_snapshot));
2136  ast_free(ami_snapshot);
2137  }
2138  ast_free(name);
2139  }
2140  }
2141 
2142  return ami_str;
2143 }
2144 
2145 /*! \internal \brief Callback to pass only user defined parameters from blob */
2146 static int userevent_exclusion_cb(const char *key)
2147 {
2148  if (!strcmp("eventname", key)) {
2149  return 1;
2150  }
2151  return 0;
2152 }
2153 
2155  struct stasis_message *message)
2156 {
2157  RAII_VAR(struct ast_str *, object_string, NULL, ast_free);
2158  RAII_VAR(struct ast_str *, body, NULL, ast_free);
2159  struct ast_multi_object_blob *multi = stasis_message_data(message);
2160  const char *eventname;
2161 
2162  eventname = ast_json_string_get(ast_json_object_get(multi->blob, "eventname"));
2164  object_string = multi_object_blob_to_ami(multi);
2165  if (!object_string || !body) {
2166  return NULL;
2167  }
2168 
2169  return ast_manager_event_blob_create(EVENT_FLAG_USER, "UserEvent",
2170  "%s"
2171  "UserEvent: %s\r\n"
2172  "%s",
2173  ast_str_buffer(object_string),
2174  eventname,
2175  ast_str_buffer(body));
2176 }
2177 
2178 /*! \brief A structure to hold global configuration-related options */
2180  /*! The list of message types to decline */
2182 };
2183 
2184 /*! \brief Threadpool configuration options */
2186  /*! Initial size of the thread pool */
2188  /*! Time, in seconds, before we expire a thread */
2190  /*! Maximum number of thread to allow */
2192 };
2193 
2195  /*! Thread pool configuration options */
2197  /*! Declined message types */
2199 };
2200 
2201 static struct aco_type threadpool_option = {
2202  .type = ACO_GLOBAL,
2203  .name = "threadpool",
2204  .item_offset = offsetof(struct stasis_config, threadpool_options),
2205  .category = "threadpool",
2206  .category_match = ACO_WHITELIST_EXACT,
2207 };
2208 
2209 static struct aco_type *threadpool_options[] = ACO_TYPES(&threadpool_option);
2210 
2211 /*! \brief An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type */
2212 static struct aco_type declined_option = {
2213  .type = ACO_GLOBAL,
2214  .name = "declined_message_types",
2215  .item_offset = offsetof(struct stasis_config, declined_message_types),
2216  .category_match = ACO_WHITELIST_EXACT,
2217  .category = "declined_message_types",
2218 };
2219 
2220 struct aco_type *declined_options[] = ACO_TYPES(&declined_option);
2221 
2223  .filename = "stasis.conf",
2224  .types = ACO_TYPES(&declined_option, &threadpool_option),
2225 };
2226 
2227 /*! \brief A global object container that will contain the stasis_config that gets swapped out on reloads */
2229 
2230 static void *stasis_config_alloc(void);
2231 
2232 /*! \brief Register information about the configs being processed by this module */
2233 CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,
2234  .files = ACO_FILES(&stasis_conf),
2235 );
2236 
2238 {
2239  struct stasis_declined_config *declined = obj;
2240 
2241  ao2_cleanup(declined->declined);
2242 }
2243 
2244 static void stasis_config_destructor(void *obj)
2245 {
2246  struct stasis_config *cfg = obj;
2247 
2250 }
2251 
2252 static void *stasis_config_alloc(void)
2253 {
2254  struct stasis_config *cfg;
2255 
2256  if (!(cfg = ao2_alloc(sizeof(*cfg), stasis_config_destructor))) {
2257  return NULL;
2258  }
2259 
2260  cfg->threadpool_options = ast_calloc(1, sizeof(*cfg->threadpool_options));
2261  if (!cfg->threadpool_options) {
2262  ao2_ref(cfg, -1);
2263  return NULL;
2264  }
2265 
2268  if (!cfg->declined_message_types) {
2269  ao2_ref(cfg, -1);
2270  return NULL;
2271  }
2272 
2274  if (!cfg->declined_message_types->declined) {
2275  ao2_ref(cfg, -1);
2276  return NULL;
2277  }
2278 
2279  return cfg;
2280 }
2281 
2283 {
2284  struct stasis_config *cfg = ao2_global_obj_ref(globals);
2285  char *name_in_declined;
2286  int res;
2287 
2288  if (!cfg || !cfg->declined_message_types) {
2289  ao2_cleanup(cfg);
2290  return 0;
2291  }
2292 
2293  name_in_declined = ao2_find(cfg->declined_message_types->declined, name, OBJ_SEARCH_KEY);
2294  res = name_in_declined ? 1 : 0;
2295  ao2_cleanup(name_in_declined);
2296  ao2_ref(cfg, -1);
2297  if (res) {
2298  ast_log(LOG_NOTICE, "Declining to allocate Stasis message type '%s' due to configuration\n", name);
2299  }
2300  return res;
2301 }
2302 
2303 static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
2304 {
2305  struct stasis_declined_config *declined = obj;
2306 
2307  if (ast_strlen_zero(var->value)) {
2308  return 0;
2309  }
2310 
2311  if (ast_str_container_add(declined->declined, var->value)) {
2312  return -1;
2313  }
2314 
2315  return 0;
2316 }
2317 
2318 /*!
2319  * @{ \brief Define multi user event message type(s).
2320  */
2321 
2323  .to_json = multi_user_event_to_json,
2325  );
2326 
2327 /*! @} */
2328 
2329 /*!
2330  * \internal
2331  * \brief CLI command implementation for 'stasis show topics'
2332  */
2333 static char *stasis_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2334 {
2335  struct ao2_iterator iter;
2336  struct topic_proxy *topic;
2337  struct ao2_container *tmp_container;
2338  int count = 0;
2339 #define FMT_HEADERS "%-64s %-64s\n"
2340 #define FMT_FIELDS "%-64s %-64s\n"
2341 
2342  switch (cmd) {
2343  case CLI_INIT:
2344  e->command = "stasis show topics";
2345  e->usage =
2346  "Usage: stasis show topics\n"
2347  " Shows a list of topics\n";
2348  return NULL;
2349  case CLI_GENERATE:
2350  return NULL;
2351  }
2352 
2353  if (a->argc != e->args) {
2354  return CLI_SHOWUSAGE;
2355  }
2356 
2357  ast_cli(a->fd, "\n" FMT_HEADERS, "Name", "Detail");
2358 
2360  topic_proxy_sort_fn, NULL);
2361 
2362  if (!tmp_container || ao2_container_dup(tmp_container, topic_all, OBJ_SEARCH_OBJECT)) {
2363  ao2_cleanup(tmp_container);
2364 
2365  return NULL;
2366  }
2367 
2368  /* getting all topic in order */
2369  iter = ao2_iterator_init(tmp_container, AO2_ITERATOR_UNLINK);
2370  while ((topic = ao2_iterator_next(&iter))) {
2371  ast_cli(a->fd, FMT_FIELDS, topic->name, topic->detail);
2372  ao2_ref(topic, -1);
2373  ++count;
2374  }
2375  ao2_iterator_destroy(&iter);
2376  ao2_cleanup(tmp_container);
2377 
2378  ast_cli(a->fd, "\n%d Total topics\n\n", count);
2379 
2380 #undef FMT_HEADERS
2381 #undef FMT_FIELDS
2382 
2383  return CLI_SUCCESS;
2384 }
2385 
2386 /*!
2387  * \internal
2388  * \brief CLI tab completion for topic names
2389  */
2390 static char *topic_complete_name(const char *word)
2391 {
2392  struct topic_proxy *topic;
2393  struct ao2_iterator it;
2394  int wordlen = strlen(word);
2395  int ret;
2396 
2397  it = ao2_iterator_init(topic_all, 0);
2398  while ((topic = ao2_iterator_next(&it))) {
2399  if (!strncasecmp(word, topic->name, wordlen)) {
2400  ret = ast_cli_completion_add(ast_strdup(topic->name));
2401  if (ret) {
2402  ao2_ref(topic, -1);
2403  break;
2404  }
2405  }
2406  ao2_ref(topic, -1);
2407  }
2408  ao2_iterator_destroy(&it);
2409  return NULL;
2410 }
2411 
2412 /*!
2413  * \internal
2414  * \brief CLI command implementation for 'stasis show topic'
2415  */
2416 static char *stasis_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2417 {
2418  struct stasis_topic *topic;
2419  char print_time[32];
2420  int i;
2421 
2422  switch (cmd) {
2423  case CLI_INIT:
2424  e->command = "stasis show topic";
2425  e->usage =
2426  "Usage: stasis show topic <name>\n"
2427  " Show stasis topic detail info.\n";
2428  return NULL;
2429  case CLI_GENERATE:
2430  if (a->pos == 3) {
2431  return topic_complete_name(a->word);
2432  } else {
2433  return NULL;
2434  }
2435  }
2436 
2437  if (a->argc != 4) {
2438  return CLI_SHOWUSAGE;
2439  }
2440 
2441  topic = stasis_topic_get(a->argv[3]);
2442  if (!topic) {
2443  ast_cli(a->fd, "Specified topic '%s' does not exist\n", a->argv[3]);
2444  return CLI_FAILURE;
2445  }
2446 
2447  ast_cli(a->fd, "Name: %s\n", topic->name);
2448  ast_cli(a->fd, "Detail: %s\n", topic->detail);
2449  ast_cli(a->fd, "Subscribers count: %zu\n", AST_VECTOR_SIZE(&topic->subscribers));
2450  ast_cli(a->fd, "Forwarding topic count: %zu\n", AST_VECTOR_SIZE(&topic->upstream_topics));
2451  ast_format_duration_hh_mm_ss(ast_tvnow().tv_sec - topic->creationtime->tv_sec, print_time, sizeof(print_time));
2452  ast_cli(a->fd, "Duration time: %s\n", print_time);
2453 
2454  ao2_lock(topic);
2455  ast_cli(a->fd, "\nSubscribers:\n");
2456  for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); i++) {
2457  struct stasis_subscription *subscription_tmp = AST_VECTOR_GET(&topic->subscribers, i);
2458  ast_cli(a->fd, " UniqueID: %s, Topic: %s, Detail: %s\n",
2459  subscription_tmp->uniqueid, subscription_tmp->topic->name, subscription_tmp->topic->detail);
2460  }
2461 
2462  ast_cli(a->fd, "\nForwarded topics:\n");
2463  for (i = 0; i < AST_VECTOR_SIZE(&topic->upstream_topics); i++) {
2464  struct stasis_topic *topic_tmp = AST_VECTOR_GET(&topic->upstream_topics, i);
2465  ast_cli(a->fd, " Topic: %s, Detail: %s\n", topic_tmp->name, topic_tmp->detail);
2466  }
2467  ao2_unlock(topic);
2468 
2469  ao2_ref(topic, -1);
2470 
2471  return CLI_SUCCESS;
2472 }
2473 
2474 
2475 static struct ast_cli_entry cli_stasis[] = {
2476  AST_CLI_DEFINE(stasis_show_topics, "Show all topics"),
2477  AST_CLI_DEFINE(stasis_show_topic, "Show topic"),
2478 };
2479 
2480 
2481 #ifdef AST_DEVMODE
2482 
2483 AO2_STRING_FIELD_SORT_FN(stasis_subscription_statistics, uniqueid);
2484 
2485 /*!
2486  * \internal
2487  * \brief CLI command implementation for 'stasis statistics show subscriptions'
2488  */
2489 static char *statistics_show_subscriptions(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2490 {
2491  struct ao2_container *sorted_subscriptions;
2492  struct ao2_container *subscription_stats;
2493  struct ao2_iterator iter;
2494  struct stasis_subscription_statistics *statistics;
2495  int count = 0;
2496  int dropped = 0;
2497  int passed = 0;
2498 #define FMT_HEADERS "%-64s %10s %10s %16s %16s\n"
2499 #define FMT_FIELDS "%-64s %10d %10d %16ld %16ld\n"
2500 #define FMT_FIELDS2 "%-64s %10d %10d\n"
2501 
2502  switch (cmd) {
2503  case CLI_INIT:
2504  e->command = "stasis statistics show subscriptions";
2505  e->usage =
2506  "Usage: stasis statistics show subscriptions\n"
2507  " Shows a list of subscriptions and their general statistics\n";
2508  return NULL;
2509  case CLI_GENERATE:
2510  return NULL;
2511  }
2512 
2513  if (a->argc != e->args) {
2514  return CLI_SHOWUSAGE;
2515  }
2516 
2517  subscription_stats = ao2_global_obj_ref(subscription_statistics);
2518  if (!subscription_stats) {
2519  ast_cli(a->fd, "Could not fetch subscription_statistics container\n");
2520  return CLI_FAILURE;
2521  }
2522 
2523  sorted_subscriptions = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_NOLOCK, 0,
2524  stasis_subscription_statistics_sort_fn, NULL);
2525  if (!sorted_subscriptions) {
2526  ao2_ref(subscription_stats, -1);
2527  ast_cli(a->fd, "Could not create container for sorting subscription statistics\n");
2528  return CLI_SUCCESS;
2529  }
2530 
2531  if (ao2_container_dup(sorted_subscriptions, subscription_stats, 0)) {
2532  ao2_ref(sorted_subscriptions, -1);
2533  ao2_ref(subscription_stats, -1);
2534  ast_cli(a->fd, "Could not sort subscription statistics\n");
2535  return CLI_SUCCESS;
2536  }
2537 
2538  ao2_ref(subscription_stats, -1);
2539 
2540  ast_cli(a->fd, "\n" FMT_HEADERS, "Subscription", "Dropped", "Passed", "Lowest Invoke", "Highest Invoke");
2541 
2542  iter = ao2_iterator_init(sorted_subscriptions, 0);
2543  while ((statistics = ao2_iterator_next(&iter))) {
2544  ast_cli(a->fd, FMT_FIELDS, statistics->uniqueid, statistics->messages_dropped, statistics->messages_passed,
2545  statistics->lowest_time_invoked, statistics->highest_time_invoked);
2546  dropped += statistics->messages_dropped;
2547  passed += statistics->messages_passed;
2548  ao2_ref(statistics, -1);
2549  ++count;
2550  }
2551  ao2_iterator_destroy(&iter);
2552 
2553  ao2_ref(sorted_subscriptions, -1);
2554 
2555  ast_cli(a->fd, FMT_FIELDS2, "Total", dropped, passed);
2556  ast_cli(a->fd, "\n%d subscriptions\n\n", count);
2557 
2558 #undef FMT_HEADERS
2559 #undef FMT_FIELDS
2560 #undef FMT_FIELDS2
2561 
2562  return CLI_SUCCESS;
2563 }
2564 
2565 /*!
2566  * \internal
2567  * \brief CLI tab completion for subscription statistics names
2568  */
2569 static char *subscription_statistics_complete_name(const char *word, int state)
2570 {
2571  struct stasis_subscription_statistics *statistics;
2572  struct ao2_container *subscription_stats;
2573  struct ao2_iterator it_statistics;
2574  int wordlen = strlen(word);
2575  int which = 0;
2576  char *result = NULL;
2577 
2578  subscription_stats = ao2_global_obj_ref(subscription_statistics);
2579  if (!subscription_stats) {
2580  return result;
2581  }
2582 
2583  it_statistics = ao2_iterator_init(subscription_stats, 0);
2584  while ((statistics = ao2_iterator_next(&it_statistics))) {
2585  if (!strncasecmp(word, statistics->uniqueid, wordlen)
2586  && ++which > state) {
2587  result = ast_strdup(statistics->uniqueid);
2588  }
2589  ao2_ref(statistics, -1);
2590  if (result) {
2591  break;
2592  }
2593  }
2594  ao2_iterator_destroy(&it_statistics);
2595  ao2_ref(subscription_stats, -1);
2596  return result;
2597 }
2598 
2599 /*!
2600  * \internal
2601  * \brief CLI command implementation for 'stasis statistics show subscription'
2602  */
2603 static char *statistics_show_subscription(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2604 {
2605  struct stasis_subscription_statistics *statistics;
2606  struct ao2_container *subscription_stats;
2607  struct ao2_iterator i;
2608  char *name;
2609 
2610  switch (cmd) {
2611  case CLI_INIT:
2612  e->command = "stasis statistics show subscription";
2613  e->usage =
2614  "Usage: stasis statistics show subscription <uniqueid>\n"
2615  " Show stasis subscription statistics.\n";
2616  return NULL;
2617  case CLI_GENERATE:
2618  if (a->pos == 4) {
2619  return subscription_statistics_complete_name(a->word, a->n);
2620  } else {
2621  return NULL;
2622  }
2623  }
2624 
2625  if (a->argc != 5) {
2626  return CLI_SHOWUSAGE;
2627  }
2628 
2629  subscription_stats = ao2_global_obj_ref(subscription_statistics);
2630  if (!subscription_stats) {
2631  ast_cli(a->fd, "Could not fetch subcription_statistics container\n");
2632  return CLI_FAILURE;
2633  }
2634 
2635  statistics = ao2_find(subscription_stats, a->argv[4], OBJ_SEARCH_KEY);
2636  if (!statistics) {
2637  ao2_ref(subscription_stats, -1);
2638  ast_cli(a->fd, "Specified subscription '%s' does not exist\n", a->argv[4]);
2639  return CLI_FAILURE;
2640  }
2641 
2642  ao2_ref(subscription_stats, -1);
2643 
2644  ast_cli(a->fd, "Subscription: %s\n", statistics->uniqueid);
2645  ast_cli(a->fd, "Pointer Address: %p\n", statistics->sub);
2646  ast_cli(a->fd, "Source filename: %s\n", S_OR(statistics->file, "<unavailable>"));
2647  ast_cli(a->fd, "Source line number: %d\n", statistics->lineno);
2648  ast_cli(a->fd, "Source function: %s\n", S_OR(statistics->func, "<unavailable>"));
2649  ast_cli(a->fd, "Number of messages dropped due to filtering: %d\n", statistics->messages_dropped);
2650  ast_cli(a->fd, "Number of messages passed to subscriber callback: %d\n", statistics->messages_passed);
2651  ast_cli(a->fd, "Using mailbox to queue messages: %s\n", statistics->uses_mailbox ? "Yes" : "No");
2652  ast_cli(a->fd, "Using stasis threadpool for handling messages: %s\n", statistics->uses_threadpool ? "Yes" : "No");
2653  ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->lowest_time_invoked);
2654  ast_cli(a->fd, "Highest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->highest_time_invoked);
2655 
2656  ao2_lock(statistics);
2657  if (statistics->highest_time_message_type) {
2658  ast_cli(a->fd, "Offender message type for highest invoking time: %s\n", stasis_message_type_name(statistics->highest_time_message_type));
2659  }
2660  ao2_unlock(statistics);
2661 
2662  ast_cli(a->fd, "Number of topics: %d\n", ao2_container_count(statistics->topics));
2663 
2664  ast_cli(a->fd, "Subscribed topics:\n");
2665  i = ao2_iterator_init(statistics->topics, 0);
2666  while ((name = ao2_iterator_next(&i))) {
2667  ast_cli(a->fd, "\t%s\n", name);
2668  ao2_ref(name, -1);
2669  }
2671 
2672  ao2_ref(statistics, -1);
2673 
2674  return CLI_SUCCESS;
2675 }
2676 
2677 AO2_STRING_FIELD_SORT_FN(stasis_topic_statistics, name);
2678 
2679 /*!
2680  * \internal
2681  * \brief CLI command implementation for 'stasis statistics show topics'
2682  */
2683 static char *statistics_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2684 {
2685  struct ao2_container *sorted_topics;
2686  struct ao2_container *topic_stats;
2687  struct ao2_iterator iter;
2688  struct stasis_topic_statistics *statistics;
2689  int count = 0;
2690  int not_dispatched = 0;
2691  int dispatched = 0;
2692 #define FMT_HEADERS "%-64s %10s %10s %10s %16s %16s\n"
2693 #define FMT_FIELDS "%-64s %10d %10d %10d %16ld %16ld\n"
2694 #define FMT_FIELDS2 "%-64s %10s %10d %10d\n"
2695 
2696  switch (cmd) {
2697  case CLI_INIT:
2698  e->command = "stasis statistics show topics";
2699  e->usage =
2700  "Usage: stasis statistics show topics\n"
2701  " Shows a list of topics and their general statistics\n";
2702  return NULL;
2703  case CLI_GENERATE:
2704  return NULL;
2705  }
2706 
2707  if (a->argc != e->args) {
2708  return CLI_SHOWUSAGE;
2709  }
2710 
2711  topic_stats = ao2_global_obj_ref(topic_statistics);
2712  if (!topic_stats) {
2713  ast_cli(a->fd, "Could not fetch topic_statistics container\n");
2714  return CLI_FAILURE;
2715  }
2716 
2718  stasis_topic_statistics_sort_fn, NULL);
2719  if (!sorted_topics) {
2720  ao2_ref(topic_stats, -1);
2721  ast_cli(a->fd, "Could not create container for sorting topic statistics\n");
2722  return CLI_SUCCESS;
2723  }
2724 
2725  if (ao2_container_dup(sorted_topics, topic_stats, 0)) {
2726  ao2_ref(sorted_topics, -1);
2727  ao2_ref(topic_stats, -1);
2728  ast_cli(a->fd, "Could not sort topic statistics\n");
2729  return CLI_SUCCESS;
2730  }
2731 
2732  ao2_ref(topic_stats, -1);
2733 
2734  ast_cli(a->fd, "\n" FMT_HEADERS, "Topic", "Subscribers", "Dropped", "Dispatched", "Lowest Dispatch", "Highest Dispatch");
2735 
2736  iter = ao2_iterator_init(sorted_topics, 0);
2737  while ((statistics = ao2_iterator_next(&iter))) {
2738  ast_cli(a->fd, FMT_FIELDS, statistics->name, ao2_container_count(statistics->subscribers),
2739  statistics->messages_not_dispatched, statistics->messages_dispatched,
2740  statistics->lowest_time_dispatched, statistics->highest_time_dispatched);
2741  not_dispatched += statistics->messages_not_dispatched;
2742  dispatched += statistics->messages_dispatched;
2743  ao2_ref(statistics, -1);
2744  ++count;
2745  }
2746  ao2_iterator_destroy(&iter);
2747 
2748  ao2_ref(sorted_topics, -1);
2749 
2750  ast_cli(a->fd, FMT_FIELDS2, "Total", "", not_dispatched, dispatched);
2751  ast_cli(a->fd, "\n%d topics\n\n", count);
2752 
2753 #undef FMT_HEADERS
2754 #undef FMT_FIELDS
2755 #undef FMT_FIELDS2
2756 
2757  return CLI_SUCCESS;
2758 }
2759 
2760 /*!
2761  * \internal
2762  * \brief CLI tab completion for topic statistics names
2763  */
2764 static char *topic_statistics_complete_name(const char *word, int state)
2765 {
2766  struct stasis_topic_statistics *statistics;
2767  struct ao2_container *topic_stats;
2768  struct ao2_iterator it_statistics;
2769  int wordlen = strlen(word);
2770  int which = 0;
2771  char *result = NULL;
2772 
2773  topic_stats = ao2_global_obj_ref(topic_statistics);
2774  if (!topic_stats) {
2775  return result;
2776  }
2777 
2778  it_statistics = ao2_iterator_init(topic_stats, 0);
2779  while ((statistics = ao2_iterator_next(&it_statistics))) {
2780  if (!strncasecmp(word, statistics->name, wordlen)
2781  && ++which > state) {
2782  result = ast_strdup(statistics->name);
2783  }
2784  ao2_ref(statistics, -1);
2785  if (result) {
2786  break;
2787  }
2788  }
2789  ao2_iterator_destroy(&it_statistics);
2790  ao2_ref(topic_stats, -1);
2791  return result;
2792 }
2793 
2794 /*!
2795  * \internal
2796  * \brief CLI command implementation for 'stasis statistics show topic'
2797  */
2798 static char *statistics_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2799 {
2800  struct stasis_topic_statistics *statistics;
2801  struct ao2_container *topic_stats;
2802  struct ao2_iterator i;
2803  char *uniqueid;
2804 
2805  switch (cmd) {
2806  case CLI_INIT:
2807  e->command = "stasis statistics show topic";
2808  e->usage =
2809  "Usage: stasis statistics show topic <name>\n"
2810  " Show stasis topic statistics.\n";
2811  return NULL;
2812  case CLI_GENERATE:
2813  if (a->pos == 4) {
2814  return topic_statistics_complete_name(a->word, a->n);
2815  } else {
2816  return NULL;
2817  }
2818  }
2819 
2820  if (a->argc != 5) {
2821  return CLI_SHOWUSAGE;
2822  }
2823 
2824  topic_stats = ao2_global_obj_ref(topic_statistics);
2825  if (!topic_stats) {
2826  ast_cli(a->fd, "Could not fetch topic_statistics container\n");
2827  return CLI_FAILURE;
2828  }
2829 
2830  statistics = ao2_find(topic_stats, a->argv[4], OBJ_SEARCH_KEY);
2831  if (!statistics) {
2832  ao2_ref(topic_stats, -1);
2833  ast_cli(a->fd, "Specified topic '%s' does not exist\n", a->argv[4]);
2834  return CLI_FAILURE;
2835  }
2836 
2837  ao2_ref(topic_stats, -1);
2838 
2839  ast_cli(a->fd, "Topic: %s\n", statistics->name);
2840  ast_cli(a->fd, "Pointer Address: %p\n", statistics->topic);
2841  ast_cli(a->fd, "Number of messages published that went to no subscriber: %d\n", statistics->messages_not_dispatched);
2842  ast_cli(a->fd, "Number of messages that went to at least one subscriber: %d\n", statistics->messages_dispatched);
2843  ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent dispatching message: %ld\n", statistics->lowest_time_dispatched);
2844  ast_cli(a->fd, "Highest amount of time (in milliseconds) spent dispatching messages: %ld\n", statistics->highest_time_dispatched);
2845  ast_cli(a->fd, "Number of subscribers: %d\n", ao2_container_count(statistics->subscribers));
2846 
2847  ast_cli(a->fd, "Subscribers:\n");
2848  i = ao2_iterator_init(statistics->subscribers, 0);
2849  while ((uniqueid = ao2_iterator_next(&i))) {
2850  ast_cli(a->fd, "\t%s\n", uniqueid);
2851  ao2_ref(uniqueid, -1);
2852  }
2854 
2855  ao2_ref(statistics, -1);
2856 
2857  return CLI_SUCCESS;
2858 }
2859 
2860 /*!
2861  * \internal
2862  * \brief CLI command implementation for 'stasis statistics show messages'
2863  */
2864 static char *statistics_show_messages(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2865 {
2866  int i;
2867  int count = 0;
2868  int published = 0;
2869  int unused = 0;
2870 #define FMT_HEADERS "%-64s %10s %10s\n"
2871 #define FMT_FIELDS "%-64s %10d %10d\n"
2872 
2873  switch (cmd) {
2874  case CLI_INIT:
2875  e->command = "stasis statistics show messages";
2876  e->usage =
2877  "Usage: stasis statistics show messages\n"
2878  " Shows a list of message types and their general statistics\n";
2879  return NULL;
2880  case CLI_GENERATE:
2881  return NULL;
2882  }
2883 
2884  if (a->argc != e->args) {
2885  return CLI_SHOWUSAGE;
2886  }
2887 
2888  ast_cli(a->fd, "\n" FMT_HEADERS, "Message Type", "Published", "Unused");
2889 
2890  ast_mutex_lock(&message_type_statistics_lock);
2891  for (i = 0; i < AST_VECTOR_SIZE(&message_type_statistics); ++i) {
2892  struct stasis_message_type_statistics *statistics = AST_VECTOR_GET_ADDR(&message_type_statistics, i);
2893 
2894  if (!statistics->message_type) {
2895  continue;
2896  }
2897 
2898  ast_cli(a->fd, FMT_FIELDS, stasis_message_type_name(statistics->message_type), statistics->published,
2899  statistics->unused);
2900  published += statistics->published;
2901  unused += statistics->unused;
2902  ++count;
2903  }
2904  ast_mutex_unlock(&message_type_statistics_lock);
2905 
2906  ast_cli(a->fd, FMT_FIELDS, "Total", published, unused);
2907  ast_cli(a->fd, "\n%d seen message types\n\n", count);
2908 
2909 #undef FMT_HEADERS
2910 #undef FMT_FIELDS
2911 
2912  return CLI_SUCCESS;
2913 }
2914 
2915 static struct ast_cli_entry cli_stasis_statistics[] = {
2916  AST_CLI_DEFINE(statistics_show_subscriptions, "Show subscriptions with general statistics"),
2917  AST_CLI_DEFINE(statistics_show_subscription, "Show subscription statistics"),
2918  AST_CLI_DEFINE(statistics_show_topics, "Show topics with general statistics"),
2919  AST_CLI_DEFINE(statistics_show_topic, "Show topic statistics"),
2920  AST_CLI_DEFINE(statistics_show_messages, "Show message types with general statistics"),
2921 };
2922 
2923 static int subscription_statistics_hash(const void *obj, const int flags)
2924 {
2925  const struct stasis_subscription_statistics *object;
2926  const char *key;
2927 
2928  switch (flags & OBJ_SEARCH_MASK) {
2929  case OBJ_SEARCH_KEY:
2930  key = obj;
2931  break;
2932  case OBJ_SEARCH_OBJECT:
2933  object = obj;
2934  key = object->uniqueid;
2935  break;
2936  default:
2937  /* Hash can only work on something with a full key. */
2938  ast_assert(0);
2939  return 0;
2940  }
2941  return ast_str_case_hash(key);
2942 }
2943 
2944 static int subscription_statistics_cmp(void *obj, void *arg, int flags)
2945 {
2946  const struct stasis_subscription_statistics *object_left = obj;
2947  const struct stasis_subscription_statistics *object_right = arg;
2948  const char *right_key = arg;
2949  int cmp;
2950 
2951  switch (flags & OBJ_SEARCH_MASK) {
2952  case OBJ_SEARCH_OBJECT:
2953  right_key = object_right->uniqueid;
2954  /* Fall through */
2955  case OBJ_SEARCH_KEY:
2956  cmp = strcasecmp(object_left->uniqueid, right_key);
2957  break;
2959  /* Not supported by container */
2960  ast_assert(0);
2961  cmp = -1;
2962  break;
2963  default:
2964  /*
2965  * What arg points to is specific to this traversal callback
2966  * and has no special meaning to astobj2.
2967  */
2968  cmp = 0;
2969  break;
2970  }
2971  if (cmp) {
2972  return 0;
2973  }
2974  /*
2975  * At this point the traversal callback is identical to a sorted
2976  * container.
2977  */
2978  return CMP_MATCH;
2979 }
2980 
2981 static int topic_statistics_hash(const void *obj, const int flags)
2982 {
2983  const struct stasis_topic_statistics *object;
2984  const char *key;
2985 
2986  switch (flags & OBJ_SEARCH_MASK) {
2987  case OBJ_SEARCH_KEY:
2988  key = obj;
2989  break;
2990  case OBJ_SEARCH_OBJECT:
2991  object = obj;
2992  key = object->name;
2993  break;
2994  default:
2995  /* Hash can only work on something with a full key. */
2996  ast_assert(0);
2997  return 0;
2998  }
2999  return ast_str_case_hash(key);
3000 }
3001 
3002 static int topic_statistics_cmp(void *obj, void *arg, int flags)
3003 {
3004  const struct stasis_topic_statistics *object_left = obj;
3005  const struct stasis_topic_statistics *object_right = arg;
3006  const char *right_key = arg;
3007  int cmp;
3008 
3009  switch (flags & OBJ_SEARCH_MASK) {
3010  case OBJ_SEARCH_OBJECT:
3011  right_key = object_right->name;
3012  /* Fall through */
3013  case OBJ_SEARCH_KEY:
3014  cmp = strcasecmp(object_left->name, right_key);
3015  break;
3017  /* Not supported by container */
3018  ast_assert(0);
3019  cmp = -1;
3020  break;
3021  default:
3022  /*
3023  * What arg points to is specific to this traversal callback
3024  * and has no special meaning to astobj2.
3025  */
3026  cmp = 0;
3027  break;
3028  }
3029  if (cmp) {
3030  return 0;
3031  }
3032  /*
3033  * At this point the traversal callback is identical to a sorted
3034  * container.
3035  */
3036  return CMP_MATCH;
3037 }
3038 #endif
3039 
3040 /*! \brief Cleanup function for graceful shutdowns */
3041 static void stasis_cleanup(void)
3042 {
3043 #ifdef AST_DEVMODE
3044  ast_cli_unregister_multiple(cli_stasis_statistics, ARRAY_LEN(cli_stasis_statistics));
3045  AST_VECTOR_FREE(&message_type_statistics);
3046  ao2_global_obj_release(subscription_statistics);
3047  ao2_global_obj_release(topic_statistics);
3048 #endif
3049  ast_cli_unregister_multiple(cli_stasis, ARRAY_LEN(cli_stasis));
3050  ao2_cleanup(topic_all);
3051  topic_all = NULL;
3052  ast_threadpool_shutdown(threadpool);
3053  threadpool = NULL;
3056  aco_info_destroy(&cfg_info);
3058 }
3059 
3060 int stasis_init(void)
3061 {
3062  struct stasis_config *cfg;
3063  int cache_init;
3064  struct ast_threadpool_options threadpool_opts = { 0, };
3065 #ifdef AST_DEVMODE
3066  struct ao2_container *subscription_stats;
3067  struct ao2_container *topic_stats;
3068 #endif
3069 
3070  /* Be sure the types are cleaned up after the message bus */
3072 
3073  if (aco_info_init(&cfg_info)) {
3074  return -1;
3075  }
3076 
3077  aco_option_register_custom(&cfg_info, "decline", ACO_EXACT,
3078  declined_options, "", declined_handler, 0);
3079  aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
3080  threadpool_options, "5", OPT_INT_T, PARSE_IN_RANGE,
3081  FLDSET(struct stasis_threadpool_conf, initial_size), 0,
3082  INT_MAX);
3083  aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
3084  threadpool_options, "20", OPT_INT_T, PARSE_IN_RANGE,
3085  FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
3086  INT_MAX);
3087  aco_option_register(&cfg_info, "max_size", ACO_EXACT,
3088  threadpool_options, "50", OPT_INT_T, PARSE_IN_RANGE,
3089  FLDSET(struct stasis_threadpool_conf, max_size), 0,
3090  INT_MAX);
3091 
3092  if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
3093  struct stasis_config *default_cfg = stasis_config_alloc();
3094 
3095  if (!default_cfg) {
3096  return -1;
3097  }
3098 
3099  if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) {
3100  ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
3101  ao2_ref(default_cfg, -1);
3102 
3103  return -1;
3104  }
3105 
3106  if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) {
3107  ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
3108  ao2_ref(default_cfg, -1);
3109 
3110  return -1;
3111  }
3112 
3113  ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n");
3114  ao2_global_obj_replace_unref(globals, default_cfg);
3115  cfg = default_cfg;
3116  } else {
3117  cfg = ao2_global_obj_ref(globals);
3118  if (!cfg) {
3119  ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
3120 
3121  return -1;
3122  }
3123  }
3124 
3125  threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION;
3126  threadpool_opts.initial_size = cfg->threadpool_options->initial_size;
3127  threadpool_opts.auto_increment = 1;
3128  threadpool_opts.max_size = cfg->threadpool_options->max_size;
3129  threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
3130  threadpool = ast_threadpool_create("stasis", NULL, &threadpool_opts);
3131  ao2_ref(cfg, -1);
3132  if (!threadpool) {
3133  ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
3134 
3135  return -1;
3136  }
3137 
3138  cache_init = stasis_cache_init();
3139  if (cache_init != 0) {
3140  return -1;
3141  }
3142 
3144  return -1;
3145  }
3147  return -1;
3148  }
3149 
3151  topic_proxy_hash_fn, 0, topic_proxy_cmp_fn);
3152  if (!topic_all) {
3153  return -1;
3154  }
3155 
3156  if (ast_cli_register_multiple(cli_stasis, ARRAY_LEN(cli_stasis))) {
3157  return -1;
3158  }
3159 
3160 #ifdef AST_DEVMODE
3161  /* Statistics information is stored separately so that we don't alter or interrupt the lifetime of the underlying
3162  * topic or subscripton.
3163  */
3164  subscription_stats = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, SUBSCRIPTION_STATISTICS_BUCKETS,
3165  subscription_statistics_hash, 0, subscription_statistics_cmp);
3166  if (!subscription_stats) {
3167  return -1;
3168  }
3169  ao2_global_obj_replace_unref(subscription_statistics, subscription_stats);
3170  ao2_cleanup(subscription_stats);
3171 
3172  topic_stats = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, TOPIC_STATISTICS_BUCKETS,
3173  topic_statistics_hash, 0, topic_statistics_cmp);
3174  if (!topic_stats) {
3175  return -1;
3176  }
3177  ao2_global_obj_replace_unref(topic_statistics, topic_stats);
3178  ao2_cleanup(topic_stats);
3179  if (!topic_stats) {
3180  return -1;
3181  }
3182 
3183  AST_VECTOR_INIT(&message_type_statistics, 0);
3184 
3185  if (ast_cli_register_multiple(cli_stasis_statistics, ARRAY_LEN(cli_stasis_statistics))) {
3186  return -1;
3187  }
3188 #endif
3189 
3190  return 0;
3191 }
static struct stasis_subscription_change * subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
Definition: stasis.c:1626
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition: vector.h:174
static struct aco_type threadpool_option
Definition: stasis.c:2201
static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition: stasis.c:1230
static const char type[]
Definition: chan_ooh323.c:109
Struct containing info for an AMI event to send out.
Definition: manager.h:491
static char * stasis_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
Definition: stasis.c:2416
int auto_increment
Number of threads to increment pool by.
Definition: threadpool.h:90
static void statistics(void)
Definition: utils/frame.c:287
Main Channel structure associated with a channel.
#define AST_CLI_DEFINE(fn, txt,...)
Definition: cli.h:197
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.
#define FMT_FIELDS
struct ast_json * ast_json_ref(struct ast_json *value)
Increase refcount on value.
Definition: json.c:67
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format,...)
Build a taskprocessor name with a sequence number on the end.
static AO2_GLOBAL_OBJ_STATIC(globals)
A global object container that will contain the stasis_config that gets swapped out on reloads...
Internal Stasis APIs.
Asterisk main include file. File version handling, generic pbx functions.
#define AST_VECTOR_REMOVE_ELEM_UNORDERED(vec, elem, cleanup)
Remove an element from a vector.
Definition: vector.h:585
int stasis_message_type_id(const struct stasis_message_type *type)
Gets the id of a given message type.
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
#define ARRAY_LEN(a)
Definition: isdn_lib.c:42
struct ast_multi_object_blob * ast_multi_object_blob_create(struct ast_json *blob)
Create a stasis user event multi object blob.
Definition: stasis.c:1974
int idle_timeout
Time limit in seconds for idle threads.
Definition: threadpool.h:79
struct stasis_topic * topic
Definition: stasis.c:684
static int message_type_id
#define STASIS_UMOS_MAX
Number of snapshot types.
Definition: stasis.h:1364
#define TOPIC_POOL_BUCKETS
Definition: stasis.c:304
#define ao2_t_alloc(data_size, destructor_fn, debug_msg)
Definition: astobj2.h:409
#define aco_option_register_custom(info, name, matchtype, types, default_val, handler, flags)
Register a config option.
static void topic_dtor(void *obj)
Definition: stasis.c:433
size_t stasis_topic_subscribers(const struct stasis_topic *topic)
Return the number of subscribers of a topic.
Definition: stasis.c:643
int ast_cli_unregister_multiple(struct ast_cli_entry *e, int len)
Unregister multiple commands.
Definition: clicompat.c:30
Definition: stasis.c:1708
static void topic_pool_dtor(void *obj)
Definition: stasis.c:1743
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1105
int initial_size
Number of threads the pool will start with.
Definition: threadpool.h:100
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition: json.c:73
static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
Definition: stasis.c:1783
static void stasis_cleanup(void)
Cleanup function for graceful shutdowns.
Definition: stasis.c:3041
char buf[BUFSIZE]
Definition: eagi_proxy.c:66
STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type)
int max_size
Maximum number of threads a pool may have.
Definition: threadpool.h:110
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
Definition: stasis.h:1501
struct ast_taskprocessor * ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
Serialized execution of tasks within a ast_threadpool.
Definition: threadpool.c:1432
const char * stasis_subscription_uniqueid(const struct stasis_subscription *sub)
Get the unique ID for the subscription.
Definition: stasis.c:1170
Stasis Message Bus API. See Stasis Message Bus API for detailed documentation.
#define AST_THREADPOOL_OPTIONS_VERSION
Definition: threadpool.h:71
descriptor for a cli entry.
Definition: cli.h:171
const int argc
Definition: cli.h:160
char * ast_str_buffer(const struct ast_str *buf)
Returns the string buffer within the ast_str buf.
Definition: strings.h:714
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary...
#define aco_option_register(info, name, matchtype, types, default_val, opt_type, flags,...)
Register a config option.
ast_cond_t join_cond
Definition: stasis.c:693
int stasis_subscription_decline_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are not interested in a message type.
Definition: stasis.c:1054
#define ao2_container_alloc_list(ao2_options, container_options, sort_fn, cmp_fn)
Definition: astobj2.h:1335
void stasis_log_bad_type_access(const char *name)
Definition: stasis.c:1939
Threadpool configuration options.
Definition: stasis.c:2185
int stasis_subscription_is_done(struct stasis_subscription *subscription)
Returns whether subscription has received its final message.
Definition: stasis.c:1119
Structure for variables, used for configurations and for channel variables.
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256
#define var
Definition: ast_expr2f.c:614
Structure representing a snapshot of channel state.
Universally unique identifier support.
return a reference to a taskprocessor, create one if it does not exist
Definition: taskprocessor.h:75
#define ast_str_container_alloc(buckets)
Allocates a hash container for bare strings.
Definition: strings.h:1312
struct ast_str * ast_manager_build_channel_state_string_prefix(const struct ast_channel_snapshot *snapshot, const char *prefix)
Generate the AMI message body from a channel snapshot.
static int sub_cleanup(void *data)
Definition: stasis.c:965
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
Definition: stasis.h:1523
static struct topic_pool_entry * topic_pool_entry_alloc(const char *topic_name)
Definition: stasis.c:1723
Definition: cli.h:152
Assume that the ao2_container is already locked.
Definition: astobj2.h:1067
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
struct stasis_topic * stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
Find or create a topic in the pool.
Definition: stasis.c:1883
static void topic_pool_entry_dtor(void *obj)
Definition: stasis.c:1714
AO2_STRING_FIELD_CASE_SORT_FN(topic_proxy, name)
void ast_str_container_remove(struct ao2_container *str_container, const char *remove)
Removes a string from a string container allocated by ast_str_container_alloc.
Definition: strings.c:222
#define ast_cond_wait(cond, mutex)
Definition: lock.h:203
struct aco_type * declined_options[]
Definition: stasis.c:2220
int ast_str_append(struct ast_str **buf, ssize_t max_len, const char *fmt,...)
Append to a thread local dynamic string.
Definition: strings.h:1091
#define ast_cond_init(cond, attr)
Definition: lock.h:199
#define ast_cli_register_multiple(e, len)
Register multiple commands.
Definition: cli.h:265
enum aco_process_status aco_process_config(struct aco_info *info, int reload)
Process a config info via the options registered with an aco_info.
#define ao2_global_obj_ref(holder)
Definition: astobj2.h:925
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
static void stasis_declined_config_destructor(void *obj)
Definition: stasis.c:2237
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:150
CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,.files=ACO_FILES(&stasis_conf),)
Register information about the configs being processed by this module.
static void forward_dtor(void *obj)
Definition: stasis.c:1537
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:406
#define ast_assert(a)
Definition: utils.h:650
#define ao2_link_flags(container, obj, flags)
Definition: astobj2.h:1572
#define ast_mutex_lock(a)
Definition: lock.h:187
char * detail
Definition: stasis.c:401
int ao2_weakproxy_subscribe(void *weakproxy, ao2_weakproxy_notification_cb cb, void *data, int flags)
Request notification when weakproxy points to NULL.
Definition: astobj2.c:931
#define ao2_unlock(a)
Definition: astobj2.h:730
int64_t ast_tvdiff_ms(struct timeval end, struct timeval start)
Computes the difference (in milliseconds) between two struct timeval instances.
Definition: time.h:98
#define ast_strdup(str)
A wrapper for strdup()
Definition: astmm.h:243
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.
#define NULL
Definition: resample.c:96
static struct ast_manager_event_blob * multi_user_event_to_ami(struct stasis_message *message)
Definition: stasis.c:2154
#define ao2_wrlock(a)
Definition: astobj2.h:720
The representation of a single configuration file to be processed.
void ast_cli(int fd, const char *fmt,...)
Definition: clicompat.c:6
#define AST_TASKPROCESSOR_MAX_NAME
Suggested maximum taskprocessor name length (less null terminator).
Definition: taskprocessor.h:60
AO2_STRING_FIELD_CMP_FN(topic_proxy, name)
struct aco_file stasis_conf
Definition: stasis.c:2222
enum aco_type_t type
static void subscription_change_dtor(void *obj)
Definition: stasis.c:1619
#define ast_cond_signal(cond)
Definition: lock.h:201
#define ACO_TYPES(...)
A helper macro to ensure that aco_info types always have a sentinel.
struct ao2_container * pool_container
Definition: stasis.c:1739
int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name)
Check if a topic exists in a pool.
Definition: stasis.c:1926
#define topic_lock_both(topic1, topic2)
Lock two topics.
Definition: stasis.c:425
struct ast_manager_event_blob * ast_manager_event_blob_create(int event_flags, const char *manager_event, const char *extra_fields_fmt,...)
Construct a ast_manager_event_blob.
Definition: manager.c:9727
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition: lock.h:755
Structure containing callbacks for Stasis message sanitization.
Definition: stasis.h:200
struct ast_channel_snapshot * ast_channel_snapshot_create(struct ast_channel *chan)
Generate a snapshot of the channel state. This is an ao2 object, so ao2_cleanup() to deallocate...
struct stasis_topic * from_topic
Definition: stasis.c:1532
int ast_json_object_set(struct ast_json *object, const char *key, struct ast_json *value)
Set a field in a JSON object.
Definition: json.c:404
Utility functions.
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:269
struct ast_str * ast_manager_build_bridge_state_string_prefix(const struct ast_bridge_snapshot *snapshot, const char *prefix)
Generate the AMI message body from a bridge snapshot.
int final_message_processed
Definition: stasis.c:699
int args
This gets set in ast_cli_register()
Definition: cli.h:185
void() ao2_prnt_fn(void *where, const char *fmt,...)
Print output.
Definition: astobj2.h:1442
pthread_cond_t ast_cond_t
Definition: lock.h:176
char * name
Definition: stasis.c:400
struct ast_str * ast_manager_str_from_json_object(struct ast_json *blob, key_exclusion_cb exclusion_cb)
Convert a JSON object into an AMI compatible string.
Definition: manager.c:1821
static struct ast_json * multi_user_event_to_json(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
Definition: stasis.c:2044
#define ao2_bump(obj)
Definition: astobj2.h:491
ast_cond_t cond
Definition: stasis.c:1277
static int topic_pool_entry_hash(const void *obj, const int flags)
Definition: stasis.c:1762
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
Definition: astobj2.c:476
struct stasis_subscription * __stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription.
Definition: stasis.c:943
struct ao2_container * declined
Definition: stasis.c:2181
int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription, long low_water, long high_water)
Set the high and low alert water marks of the stasis subscription.
Definition: stasis.c:1012
void ao2_container_unregister(const char *name)
Unregister a container for CLI stats and integrity check.
int ao2_container_register(const char *name, struct ao2_container *self, ao2_prnt_obj_fn *prnt_obj)
Register a container for CLI stats and integrity check.
#define ast_debug(level,...)
Log a DEBUG message.
Definition: logger.h:444
#define ast_log
Definition: astobj2.c:42
struct stasis_declined_config * declined_message_types
Definition: stasis.c:2198
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
Definition: astobj2.h:1120
static void publish_msg(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sync_sub)
Definition: stasis.c:1431
#define FLDSET(type,...)
Convert a struct and list of fields to an argument list of field offsets.
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
Definition: stasis.c:1024
char buf[0]
Definition: stasis.c:405
int aco_info_init(struct aco_info *info)
Initialize an aco_info structure.
A multi object blob data structure to carry user event stasis messages.
Definition: stasis.c:1949
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
Definition: stasis.c:1175
#define AST_VECTOR_GET_ADDR(vec, idx)
Get an address of element in a vector.
Definition: vector.h:670
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
Definition: clicompat.c:19
static char * topic_complete_name(const char *word)
Definition: stasis.c:2390
ast_mutex_t lock
Definition: stasis.c:1276
#define AST_VECTOR_ELEM_CLEANUP_NOOP(elem)
Vector element cleanup that does nothing.
Definition: vector.h:573
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:851
struct ast_json * ast_json_string_create(const char *value)
Construct a JSON string from value.
Definition: json.c:268
const int fd
Definition: cli.h:159
int stasis_message_type_declined(const char *name)
Check whether a message type is declined.
Definition: stasis.c:2282
const int n
Definition: cli.h:165
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition: stasis.c:617
const struct timeval * stasis_message_timestamp(const struct stasis_message *msg)
Get the time when a message was created.
static int link_topic_proxy(struct stasis_topic *topic, const char *name, const char *detail)
Definition: stasis.c:500
static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition: stasis.c:1647
static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition: stasis.c:1675
#define ao2_ref(o, delta)
Definition: astobj2.h:464
#define ao2_lock(a)
Definition: astobj2.h:718
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:627
static char * stasis_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
Definition: stasis.c:2333
const char * ast_json_string_get(const struct ast_json *string)
Get the value of a JSON string.
Definition: json.c:273
stasis_user_multi_object_snapshot_type
Object type code for multi user object snapshots.
Definition: stasis.h:1357
struct stasis_topic * to_topic
Definition: stasis.c:1534
struct ao2_container * container
Definition: res_fax.c:502
static struct console_pvt globals
#define AST_VECTOR(name, type)
Define a vector structure.
Definition: vector.h:44
int final_message_rxed
Definition: stasis.c:696
struct stasis_topic * topic
Definition: stasis.c:1710
static int userevent_exclusion_cb(const char *key)
Definition: stasis.c:2146
struct ast_json * ast_json_timeval(const struct timeval tv, const char *zone)
Construct a timeval as JSON.
Definition: json.c:649
void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
Publish a message to a topic&#39;s subscribers, synchronizing on the specified subscriber.
Definition: stasis.c:1515
Their was an error and no changes were applied.
struct stasis_topic * topic
Definition: stasis.h:893
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
Definition: astmm.h:290
static unsigned int dispatch_message(struct stasis_subscription *sub, struct stasis_message *message, int synchronous)
Definition: stasis.c:1313
const char *const * argv
Definition: cli.h:161
struct stasis_topic * ast_channel_topic(struct ast_channel *chan)
A topic which publishes the events for a particular channel.
struct timeval creationtime
Definition: stasis.c:403
Configuration option-handling.
static struct ast_cli_entry cli_stasis[]
Definition: stasis.c:2475
#define LOG_ERROR
Definition: logger.h:285
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Definition: astobj2.h:1310
The descriptor of a dynamic string XXX storage will be optimized later if needed We use the ts field ...
Definition: strings.h:584
struct ast_taskprocessor * mailbox
Definition: stasis.c:686
static void * stasis_config_alloc(void)
Definition: stasis.c:2252
#define ao2_unlink(container, obj)
Definition: astobj2.h:1598
int ao2_container_dup(struct ao2_container *dest, struct ao2_container *src, enum search_flags flags)
Copy all object references in the src container into the dest container.
#define CLI_SHOWUSAGE
Definition: cli.h:45
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
void aco_info_destroy(struct aco_info *info)
Destroy an initialized aco_info struct.
AO2_STRING_FIELD_HASH_FN(topic_proxy, name)
#define ao2_t_weakproxy_set_object(weakproxy, obj, flags, tag)
Definition: astobj2.h:586
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
#define ao2_global_obj_release(holder)
Definition: astobj2.h:865
#define EVENT_FLAG_USER
Definition: manager.h:77
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int(*task_exe)(struct ast_taskprocessor_local *local), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
Local data parameter.
struct ao2_container * topic_all
Definition: stasis.c:395
#define ao2_iterator_next(iter)
Definition: astobj2.h:1933
#define ast_cond_destroy(cond)
Definition: lock.h:200
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:411
void(* stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Callback function type for Stasis subscriptions.
Definition: stasis.h:615
#define LOG_NOTICE
Definition: logger.h:263
struct stasis_subscription * stasis_unsubscribe_and_join(struct stasis_subscription *subscription)
Cancel a subscription, blocking until the last message is processed.
Definition: stasis.c:1135
static struct ast_str * multi_object_blob_to_ami(void *obj)
Definition: stasis.c:2095
#define ast_strlen_zero(a)
Definition: muted.c:73
#define ao2_weakproxy_find(c, arg, flags, tag)
Perform an ao2_find on a container with ao2_weakproxy objects, returning the real object...
Definition: astobj2.h:1768
struct ast_json * ast_channel_snapshot_to_json(const struct ast_channel_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_channel_snapshot.
#define CLI_FAILURE
Definition: cli.h:46
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *sub)
Cancel a subscription.
Definition: stasis.c:972
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:602
struct stasis_topic * stasis_topic_create_with_detail(const char *name, const char *detail)
Create a new topic with given detail.
Definition: stasis.c:568
static const char name[]
Definition: cdr_mysql.c:74
#define ast_free(a)
Definition: astmm.h:182
char * command
Definition: cli.h:186
#define ast_calloc(num, len)
A wrapper for calloc()
Definition: astmm.h:204
static void multi_object_blob_dtor(void *obj)
Definition: stasis.c:1958
struct ast_threadpool * ast_threadpool_create(const char *name, struct ast_threadpool_listener *listener, const struct ast_threadpool_options *options)
Create a new threadpool.
Definition: threadpool.c:915
int aco_set_defaults(struct aco_type *type, const char *category, void *obj)
Set all default options of obj.
static void to_ami(struct ast_sip_subscription *sub, struct ast_str **buf)
void stasis_topic_pool_delete_topic(struct stasis_topic_pool *pool, const char *topic_name)
Delete a topic from the topic pool.
Definition: stasis.c:1863
const char * word
Definition: cli.h:163
struct stasis_subscription * __stasis_subscribe_pool(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription whose callbacks occur on a thread pool.
Definition: stasis.c:954
Holds details about changes to subscriptions for the specified topic.
Definition: stasis.h:892
static struct aco_type declined_option
An aco_type structure to link the "declined_message_types" category to the stasis_declined_config typ...
Definition: stasis.c:2212
Vector container support.
void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Stasis subscription callback function that does nothing.
Definition: stasis.c:810
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1756
An API for managing task processing threads that can be shared across modules.
void ast_threadpool_shutdown(struct ast_threadpool *pool)
Shut down a threadpool and destroy it.
Definition: threadpool.c:965
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Add a subscriber to a topic.
Definition: stasis.c:1202
static void proxy_dtor(void *weakproxy, void *container)
Definition: stasis.c:412
int stasis_cache_init(void)
static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
Definition: stasis.c:1287
void ast_multi_object_blob_add(struct ast_multi_object_blob *multi, enum stasis_user_multi_object_snapshot_type type, void *object)
Add an object (snapshot) to the blob.
Definition: stasis.c:2000
const char * usage
Definition: cli.h:177
stasis_subscription_cb callback
Definition: stasis.c:688
struct stasis_message_type * ast_multi_user_event_type(void)
Message type for custom user defined events with multi object blobs.
void stasis_subscription_accept_formatters(struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters)
Indicate to a subscription that we are interested in messages with one or more formatters.
Definition: stasis.c:1094
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
Sets the local data associated with a taskprocessor.
#define CLI_SUCCESS
Definition: cli.h:44
struct ast_json * ast_json_object_create(void)
Create a new JSON object.
Definition: json.c:389
#define ao2_global_obj_replace_unref(holder, obj)
Definition: astobj2.h:908
void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob)
Publish single channel user event (for app_userevent compatibility)
Definition: stasis.c:2009
#define ACO_FILES(...)
enum stasis_subscription_message_formatters stasis_message_type_available_formatters(const struct stasis_message_type *message_type)
Get a bitmap of available formatters for a message type.
The arg parameter is an object of the same type.
Definition: astobj2.h:1091
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:682
char name[0]
Definition: stasis.c:1711
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
const char * stasis_topic_detail(const struct stasis_topic *topic)
Return the detail of a topic.
Definition: stasis.c:635
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
struct stasis_topic * pool_topic
Definition: stasis.c:1740
FILE * out
Definition: utils/frame.c:33
stasis_subscription_message_filter
Stasis subscription message filters.
Definition: stasis.h:297
struct stasis_subscription * internal_stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func)
Create a subscription.
Definition: stasis.c:857
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
Definition: astobj2.h:1841
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
Standard Command Line Interface.
struct ast_json * ast_json_object_get(struct ast_json *object, const char *key)
Get a field from a JSON object.
Definition: json.c:397
struct stasis_forward * forward
Definition: stasis.c:1709
void stasis_subscription_join(struct stasis_subscription *subscription)
Block until the last message is processed on a subscription.
Definition: stasis.c:1106
Type information about a category-level configurable object.
An opaque threadpool structure.
Definition: threadpool.c:36
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
Definition: strings.h:401
#define INITIAL_SUBSCRIBERS_MAX
Definition: stasis.c:301
#define S_OR(a, b)
returns the equivalent of logic or for strings: first one if not empty, otherwise second one...
Definition: strings.h:79
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
struct ast_json * ast_bridge_snapshot_to_json(const struct ast_bridge_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_bridge_snapshot.
const int pos
Definition: cli.h:164
const char * filename
A structure to hold global configuration-related options.
Definition: stasis.c:2179
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
Definition: stasis.c:1078
static PGresult * result
Definition: cel_pgsql.c:88
#define AST_VECTOR_REPLACE(vec, idx, elem)
Replace an element at a specific position in a vector, growing the vector if needed.
Definition: vector.h:284
#define AO2_WEAKPROXY()
Macro which must be used at the beginning of weakproxy capable objects.
Definition: astobj2.h:539
static void subscription_invoke(struct stasis_subscription *sub, struct stasis_message *message)
Invoke the subscription&#39;s callback.
Definition: stasis.c:755
struct stasis_forward * sub
Definition: res_corosync.c:240
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
Definition: stasis.c:1547
struct stasis_threadpool_conf * threadpool_options
Definition: stasis.c:2196
#define FMT_HEADERS
Abstract JSON element (object, array, string, int, ...).
struct ast_json * blob
Definition: stasis.c:1950
#define ao2_t_weakproxy_alloc(data_size, destructor_fn, tag)
Definition: astobj2.h:557
Definition: search.h:40
Forwarding information.
Definition: stasis.c:1530
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
#define ao2_container_alloc_rbtree(ao2_options, container_options, sort_fn, cmp_fn)
Definition: astobj2.h:1358
void * task_data
Definition: stasis.c:1279
struct stasis_topic * stasis_topic_get(const char *name)
Get a topic of the given name.
Definition: stasis.c:622
#define ast_mutex_init(pmutex)
Definition: lock.h:184
Generic container type.
Search option field mask.
Definition: astobj2.h:1076
void ast_format_duration_hh_mm_ss(int duration, char *buf, size_t length)
Formats a duration into HH:MM:SS.
Definition: main/utils.c:1920
static struct aco_type * threadpool_options[]
Definition: stasis.c:2209
#define ast_mutex_destroy(a)
Definition: lock.h:186
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
Definition: func_strings.c:709
int ast_cli_completion_add(char *value)
Add a result to a request for completion options.
Definition: main/cli.c:2726
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic&#39;s subscribers.
Definition: stasis.c:1510
static void subscription_dtor(void *obj)
Definition: stasis.c:714
Type for default option handler for signed integers.
static struct ast_threadpool * threadpool
Definition: stasis.c:307
#define AO2_STRING_FIELD_SORT_FN(stype, field)
Creates a sort function for a structure string field.
Definition: astobj2.h:2087
static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
Definition: stasis.c:2303
stasis_subscription_message_formatters
Stasis subscription formatter filters.
Definition: stasis.h:311
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition: stasis.c:1577
Endpoint abstractions.
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
static void stasis_config_destructor(void *obj)
Definition: stasis.c:2244
struct ast_json * ast_endpoint_snapshot_to_json(const struct ast_endpoint_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_endpoint_snapshot.
struct stasis_topic_pool * stasis_topic_pool_create(struct stasis_topic *pooled_topic)
Create a topic pool that routes messages from dynamically generated topics to the given topic...
Definition: stasis.c:1832
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611
int ast_str_container_add(struct ao2_container *str_container, const char *add)
Adds a string to a string container allocated by ast_str_container_alloc.
Definition: strings.c:206
#define AST_MUTEX_DEFINE_STATIC(mutex)
Definition: lock.h:518
Structure for mutex and tracking information.
Definition: lock.h:135
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
Returns whether a subscription is currently subscribed.
Definition: stasis.c:1151
static force_inline int attribute_pure ast_str_case_hash(const char *str)
Compute a hash value on a case-insensitive string.
Definition: strings.h:1250
#define ast_str_create(init_len)
Create a malloc&#39;ed dynamic length string.
Definition: strings.h:620
#define TOPIC_ALL_BUCKETS
Definition: stasis.c:317
short word
#define ast_mutex_unlock(a)
Definition: lock.h:188
static int dispatch_exec_async(struct ast_taskprocessor_local *local)
Definition: stasis.c:1260
static struct test_val a
#define ao2_link(container, obj)
Definition: astobj2.h:1549
int stasis_init(void)
Initialize the Stasis subsystem.
Definition: stasis.c:3060