Asterisk - The Open Source Telephony Project  GIT-master-a24979a
stasis.h
Go to the documentation of this file.
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 
19 #ifndef _ASTERISK_STASIS_H
20 #define _ASTERISK_STASIS_H
21 
22 /*! \file
23  *
24  * \brief Stasis Message Bus API. See \ref stasis "Stasis Message Bus API" for
25  * detailed documentation.
26  *
27  * \author David M. Lee, II <dlee@digium.com>
28  * \since 12
29  *
30  * \page stasis Stasis Message Bus API
31  *
32  * \par Intro
33  *
34  * The Stasis Message Bus is a loosely typed mechanism for distributing messages
35  * within Asterisk. It is designed to be:
36  * - Loosely coupled; new message types can be added in seperate modules.
37  * - Easy to use; publishing and subscribing are straightforward operations.
38  *
39  * There are three main concepts for using the Stasis Message Bus:
40  * - \ref stasis_message
41  * - \ref stasis_topic
42  * - \ref stasis_subscription
43  *
44  * \par stasis_message
45  *
46  * Central to the Stasis Message Bus is the \ref stasis_message, the messages
47  * that are sent on the bus. These messages have:
48  * - a type (as defined by a \ref stasis_message_type)
49  * - a value - a \c void pointer to an AO2 object
50  * - a timestamp when it was created
51  *
52  * Once a \ref stasis_message has been created, it is immutable and cannot
53  * change. The same goes for the value of the message (although this cannot be
54  * enforced in code). Messages themselves are reference-counted, AO2 objects,
55  * along with their values. By being both reference counted and immutable,
56  * messages can be shared throughout the system without any concerns for
57  * threading.
58  *
59  * The type of a message is defined by an instance of \ref stasis_message_type,
60  * which can be created by calling stasis_message_type_create(). Message types
61  * are named, which is useful in debugging. It is recommended that the string
62  * name for a message type match the name of the struct that's stored in the
63  * message. For example, name for \ref stasis_cache_update's message type is \c
64  * "stasis_cache_update".
65  *
66  * \par stasis_topic
67  *
68  * A \ref stasis_topic is an object to which stasis_topic_subscriber's may be
69  * subscribed, and \ref stasis_message's may be published. Any message published
70  * to the topic is dispatched to all of its subscribers. The topic itself may be
71  * named, which is useful in debugging.
72  *
73  * Topics themselves are reference counted objects. Since topics are referred to
74  * by their subscribers, they will not be freed until all of their subscribers
75  * have unsubscribed. Topics are also thread safe, so no worries about
76  * publishing/subscribing/unsubscribing to a topic concurrently from multiple
77  * threads. It's also designed to handle the case of unsubscribing from a topic
78  * from within the subscription handler.
79  *
80  * \par Forwarding
81  *
82  * There is one special case of topics that's worth noting: forwarding
83  * messages. It's a fairly common use case to want to forward all the messages
84  * published on one topic to another one (for example, an aggregator topic that
85  * publishes all the events from a set of other topics). This can be
86  * accomplished easily using stasis_forward_all(). This sets up the forwarding
87  * between the two topics, and returns a \ref stasis_subscription, which can be
88  * unsubscribed to stop the forwarding.
89  *
90  * \par Caching
91  *
92  * Another common use case is to want to cache certain messages that are
93  * published on the bus. Usually these events are snapshots of the current state
94  * in the system, and it's desirable to query that state from the cache without
95  * locking the original object. It's also desirable for subscribers of the
96  * caching topic to receive messages that have both the old cache value and the
97  * new value being put into the cache. For this, we have stasis_cache_create()
98  * and stasis_caching_topic_create(), providing them with the topic which
99  * publishes the messages that you wish to cache, and a function that can
100  * identify cacheable messages.
101  *
102  * The \ref stasis_cache is designed so that it may be shared amongst several
103  * \ref stasis_caching_topic objects. This allows you to have individual caching
104  * topics per-object (i.e. so you can subscribe to updates for a single object),
105  * and still have a single cache to query for the state of all objects. While a
106  * cache may be shared amongst different message types, such a usage is probably
107  * not a good idea.
108  *
109  * The \ref stasis_cache can only be written to by \ref stasis_caching_topic.
110  * It's a thread safe container, so freely use the stasis_cache_get() and
111  * stasis_cache_dump() to query the cache.
112  *
113  * The \ref stasis_caching_topic discards non-cacheable messages. A cacheable
114  * message is wrapped in a \ref stasis_cache_update message which provides the
115  * old snapshot (or \c NULL if this is a new cache entry), and the new snapshot
116  * (or \c NULL if the entry was removed from the cache). A
117  * stasis_cache_clear_create() message must be sent to the topic in order to
118  * remove entries from the cache.
119  *
120  * In order to unsubscribe a \ref stasis_caching_topic from the upstream topic,
121  * call stasis_caching_unsubscribe(). Due to cyclic references, the \ref
122  * stasis_caching_topic will not be freed until after it has been unsubscribed,
123  * and all other ao2_ref()'s have been cleaned up.
124  *
125  * The \ref stasis_cache object is a normal AO2 managed object, which can be
126  * release with ao2_cleanup().
127  *
128  * \par stasis_topic_subscriber
129  *
130  * Any topic may be subscribed to by simply providing stasis_subscribe() the
131  * \ref stasis_topic to subscribe to, a handler function and \c void pointer to
132  * data that is passed back to the handler. Invocations on the subscription's
133  * handler are serialized, but different invocations may occur on different
134  * threads (this usually isn't important unless you use thread locals or
135  * something similar).
136  *
137  * In order to stop receiving messages, call stasis_unsubscribe() with your \ref
138  * stasis_subscription. Due to cyclic references, the \ref
139  * stasis_subscription will not be freed until after it has been unsubscribed,
140  * and all other ao2_ref()'s have been cleaned up.
141  *
142  * \par Shutdown
143  *
144  * Subscriptions have two options for unsubscribing, depending upon the context
145  * in which you need to unsubscribe.
146  *
147  * If your subscription is owned by a module, and you must unsubscribe from the
148  * module_unload() function, then you'll want to use the
149  * stasis_unsubscribe_and_join() function. This will block until the final
150  * message has been received on the subscription. Otherwise, there's the danger
151  * of invoking the callback function after it has been unloaded.
152  *
153  * If your subscription is owned by an object, then your object should have an
154  * explicit shutdown() function, which calls stasis_unsubscribe(). In your
155  * subscription handler, when the stasis_subscription_final_message() has been
156  * received, decrement the refcount on your object. In your object's destructor,
157  * you may assert that stasis_subscription_is_done() to validate that the
158  * subscription's callback will no longer be invoked.
159  *
160  * \b Note: You may be tempted to simply call stasis_unsubscribe_and_join() from
161  * an object's destructor. While code that does this may work most of the time,
162  * it's got one big downside. There's a general assumption that object
163  * destruction is non-blocking. If you block the destruction waiting for the
164  * subscription to complete, there's the danger that the subscription may
165  * process a message which will bump the refcount up by one. Then it does
166  * whatever it does, decrements the refcount, which then proceeds to re-destroy
167  * the object. Now you've got hard to reproduce bugs that only show up under
168  * certain loads.
169  */
170 
171 #include "asterisk/json.h"
172 #include "asterisk/manager.h"
173 #include "asterisk/utils.h"
174 #include "asterisk/event.h"
175 
176 /*!
177  * \brief Metadata about a \ref stasis_message.
178  * \since 12
179  */
180 struct stasis_message_type;
181 
182 /*!
183  * \brief Opaque type for a Stasis message.
184  * \since 12
185  */
186 struct stasis_message;
187 
188 /*!
189  * \brief Opaque type for a Stasis subscription.
190  * \since 12
191  */
192 struct stasis_subscription;
193 
194 /*!
195  * \brief Structure containing callbacks for Stasis message sanitization
196  *
197  * \note If either callback is implemented, both should be implemented since
198  * not all callers may have access to the full snapshot.
199  */
201  /*!
202  * \brief Callback which determines whether a channel should be sanitized from
203  * a message based on the channel's unique ID
204  *
205  * \param channel_id The unique ID of the channel
206  *
207  * \retval non-zero if the channel should be left out of the message
208  * \retval zero if the channel should remain in the message
209  */
210  int (*channel_id)(const char *channel_id);
211 
212  /*!
213  * \brief Callback which determines whether a channel should be sanitized from
214  * a message based on the channel's snapshot
215  *
216  * \param snapshot A snapshot generated from the channel
217  *
218  * \retval non-zero if the channel should be left out of the message
219  * \retval zero if the channel should remain in the message
220  */
221  int (*channel_snapshot)(const struct ast_channel_snapshot *snapshot);
222 
223  /*!
224  * \brief Callback which determines whether a channel should be sanitized from
225  * a message based on the channel
226  *
227  * \param chan The channel to be checked
228  *
229  * \retval non-zero if the channel should be left out of the message
230  * \retval zero if the channel should remain in the message
231  */
232  int (*channel)(const struct ast_channel *chan);
233 };
234 
235 /*!
236  * \brief Virtual table providing methods for messages.
237  * \since 12
238  */
240  /*!
241  * \brief Build the JSON representation of the message.
242  *
243  * May be \c NULL, or may return \c NULL, to indicate no representation.
244  * The returned object should be ast_json_unref()'ed.
245  *
246  * \param message Message to convert to JSON string.
247  * \param sanitize Snapshot sanitization callback.
248  *
249  * \return Newly allocated JSON message.
250  * \retval NULL if JSON format is not supported.
251  */
252  struct ast_json *(*to_json)(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize);
253 
254  /*!
255  * \brief Build the AMI representation of the message.
256  *
257  * May be \c NULL, or may return \c NULL, to indicate no representation.
258  * The returned object should be ao2_cleanup()'ed.
259  *
260  * \param message Message to convert to AMI string.
261  * \return Newly allocated \ref ast_manager_event_blob.
262  * \retval NULL if AMI format is not supported.
263  */
264  struct ast_manager_event_blob *(*to_ami)(
265  struct stasis_message *message);
266 
267  /*!
268  * \since 12.3.0
269  * \brief Build the \ref ast_event representation of the message.
270  *
271  * May be \c NULL, or may return \c NULL, to indicate no representation.
272  * The returned object should be free'd.
273  *
274  * \param message Message to convert to an \ref ast_event.
275  * \return Newly allocated \ref ast_event.
276  * \retval NULL if AMI format is not supported.
277  */
278  struct ast_event *(*to_event)(
279  struct stasis_message *message);
280 };
281 
282 /*!
283  * \brief Return code for Stasis message type creation attempts
284  */
286  STASIS_MESSAGE_TYPE_ERROR = -1, /*!< Message type was not created due to allocation failure */
287  STASIS_MESSAGE_TYPE_SUCCESS, /*!< Message type was created successfully */
288  STASIS_MESSAGE_TYPE_DECLINED, /*!< Message type was not created due to configuration */
289 };
290 
291 /*!
292  * \brief Stasis subscription message filters
293  */
295  STASIS_SUBSCRIPTION_FILTER_NONE = 0, /*!< No filter is in place, all messages are raised */
296  STASIS_SUBSCRIPTION_FILTER_FORCED_NONE, /*!< No filter is in place or can be set, all messages are raised */
297  STASIS_SUBSCRIPTION_FILTER_SELECTIVE, /*!< Only messages of allowed message types are raised */
298 };
299 
300 /*!
301  * \brief Stasis subscription formatter filters
302  *
303  * There should be an entry here for each member of \ref stasis_message_vtable
304  *
305  * \since 13.25.0
306  * \since 16.2.0
307  */
310  STASIS_SUBSCRIPTION_FORMATTER_JSON = 1 << 0, /*!< Allow messages with a to_json formatter */
311  STASIS_SUBSCRIPTION_FORMATTER_AMI = 1 << 1, /*!< Allow messages with a to_ami formatter */
312  STASIS_SUBSCRIPTION_FORMATTER_EVENT = 1 << 2, /*!< Allow messages with a to_event formatter */
313 };
314 
315 /*!
316  * \brief Create a new message type.
317  *
318  * \ref stasis_message_type is an AO2 object, so ao2_cleanup() when you're done
319  * with it.
320  *
321  * \param name Name of the new type.
322  * \param vtable Virtual table of message methods. May be \c NULL.
323  * \param[out] result The location where the new message type will be placed
324  *
325  * \note Stasis message type creation may be declined if the message type is disabled
326  *
327  * \returns A stasis_message_type_result enum
328  * \since 12
329  */
331  struct stasis_message_vtable *vtable, struct stasis_message_type **result);
332 
333 /*!
334  * \brief Gets the name of a given message type
335  * \param type The type to get.
336  * \return Name of the type.
337  * \retval NULL if \a type is \c NULL.
338  * \since 12
339  */
340 const char *stasis_message_type_name(const struct stasis_message_type *type);
341 
342 /*!
343  * \brief Gets the hash of a given message type
344  * \param type The type to get the hash of.
345  * \return The hash
346  * \since 13.24.0
347  */
348 unsigned int stasis_message_type_hash(const struct stasis_message_type *type);
349 
350 /*!
351  * \brief Gets the id of a given message type
352  * \param type The type to get the id of.
353  * \return The id
354  * \since 17.0.0
355  */
357 
358 /*!
359  * \brief Check whether a message type is declined
360  *
361  * \param name The name of the message type to check
362  *
363  * \retval zero The message type is not declined
364  * \retval non-zero The message type is declined
365  */
366 int stasis_message_type_declined(const char *name);
367 
368 /*!
369  * \brief Create a new message.
370  *
371  * This message is an \c ao2 object, and must be ao2_cleanup()'ed when you are done
372  * with it. Messages are also immutable, and must not be modified after they
373  * are initialized. Especially the \a data in the message.
374  *
375  * \param type Type of the message
376  * \param data Immutable data that is the actual contents of the message
377  *
378  * \return New message
379  * \retval NULL on error
380  *
381  * \since 12
382  */
384 
385 /*!
386  * \brief Create a new message for an entity.
387  *
388  * This message is an \c ao2 object, and must be ao2_cleanup()'ed when you are done
389  * with it. Messages are also immutable, and must not be modified after they
390  * are initialized. Especially the \a data in the message.
391  *
392  * \param type Type of the message
393  * \param data Immutable data that is the actual contents of the message
394  * \param eid What entity originated this message. (NULL for aggregate)
395  *
396  * \note An aggregate message is a combined representation of the local
397  * and remote entities publishing the message data. e.g., An aggregate
398  * device state represents the combined device state from the local and
399  * any remote entities publishing state for a device. e.g., An aggregate
400  * MWI message is the old/new MWI counts accumulated from the local and
401  * any remote entities publishing to a mailbox.
402  *
403  * \retval New message
404  * \retval NULL on error
405  *
406  * \since 12.2.0
407  */
408 struct stasis_message *stasis_message_create_full(struct stasis_message_type *type, void *data, const struct ast_eid *eid);
409 
410 /*!
411  * \brief Get the entity id for a \ref stasis_message.
412  * \since 12.2.0
413  *
414  * \param msg Message to get eid.
415  *
416  * \retval Entity id of \a msg
417  * \retval NULL if \a msg is an aggregate or \a msg is \c NULL.
418  */
419 const struct ast_eid *stasis_message_eid(const struct stasis_message *msg);
420 
421 /*!
422  * \brief Get the message type for a \ref stasis_message.
423  * \param msg Message to type
424  * \return Type of \a msg
425  * \retval NULL if \a msg is \c NULL.
426  * \since 12
427  */
428 struct stasis_message_type *stasis_message_type(const struct stasis_message *msg);
429 
430 /*!
431  * \brief Get the data contained in a message.
432  * \param msg Message.
433  * \return Immutable data pointer
434  * \retval NULL if msg is \c NULL.
435  * \since 12
436  */
437 void *stasis_message_data(const struct stasis_message *msg);
438 
439 /*!
440  * \brief Get the time when a message was created.
441  * \param msg Message.
442  * \return Pointer to the \a timeval when the message was created.
443  * \retval NULL if msg is \c NULL.
444  * \since 12
445  */
446 const struct timeval *stasis_message_timestamp(const struct stasis_message *msg);
447 
448 /*!
449  * \brief Build the JSON representation of the message.
450  *
451  * May return \c NULL, to indicate no representation. The returned object should
452  * be ast_json_unref()'ed.
453  *
454  * \param msg Message to convert to JSON string.
455  * \param sanitize Snapshot sanitization callback.
456  *
457  * \return Newly allocated string with JSON message.
458  * \retval NULL if JSON format is not supported.
459  */
460 struct ast_json *stasis_message_to_json(struct stasis_message *msg, struct stasis_message_sanitizer *sanitize);
461 
462 /*!
463  * \brief Build the AMI representation of the message.
464  *
465  * May return \c NULL, to indicate no representation. The returned object should
466  * be ao2_cleanup()'ed.
467  *
468  * \param msg Message to convert to AMI.
469  * \retval NULL if AMI format is not supported.
470  */
472 
473 /*!
474  * \brief Determine if the given message can be converted to AMI.
475  *
476  * \param msg Message to see if can be converted to AMI.
477  *
478  * \retval 0 Cannot be converted
479  * \retval non-zero Can be converted
480  */
482 
483 /*!
484  * \brief Build the \ref AstGenericEvents representation of the message.
485  *
486  * May return \c NULL, to indicate no representation. The returned object should
487  * be disposed of via \ref ast_event_destroy.
488  *
489  * \param msg Message to convert to AMI.
490  * \retval NULL if AMI format is not supported.
491  */
493 
494 /*!
495  * \brief A topic to which messages may be posted, and subscribers, well, subscribe
496  * \since 12
497  */
498 struct stasis_topic;
499 
500 /*!
501  * \brief Create a new topic.
502  * \param name Name of the new topic.
503  * \return New topic instance.
504  * \retval NULL on error.
505  * \since 12
506  *
507  * \note There is no explicit ability to unsubscribe all subscribers
508  * from a topic and destroy it. As a result the topic can persist until
509  * the last subscriber unsubscribes itself even if there is no
510  * publisher.
511  *
512  * \note Topic names should be in the form of
513  * \verbatim <subsystem>:<functionality>[/<object>] \endverbatim
514  */
515 struct stasis_topic *stasis_topic_create(const char *name);
516 
517 /*!
518  * \brief Create a new topic with given detail.
519  * \param name Name of the new topic.
520  * \param detail Detail description of the new topic. i.e. "Queue main topic for subscribing every queue event"
521  * \return New topic instance.
522  * \retval NULL on error.
523  *
524  * \note There is no explicit ability to unsubscribe all subscribers
525  * from a topic and destroy it. As a result the topic can persist until
526  * the last subscriber unsubscribes itself even if there is no
527  * publisher.
528  */
530  const char *name, const char *detail);
531 
532 /*!
533  * \brief Get a topic of the given name.
534  * \param name Topic's name.
535  * \return Name of the topic.
536  * \retval NULL on error or not exist.
537  *
538  * \note This SHOULD NOT be used in normal operation for publishing messages.
539  */
540 struct stasis_topic *stasis_topic_get(const char *name);
541 
542 /*!
543  * \brief Return the uniqueid of a topic.
544  * \param topic Topic.
545  * \return Uniqueid of the topic.
546  * \retval NULL if topic is \c NULL.
547  */
548 const char *stasis_topic_uniqueid(const struct stasis_topic *topic);
549 
550 /*!
551  * \brief Return the name of a topic.
552  * \param topic Topic.
553  * \return Name of the topic.
554  * \retval NULL if topic is \c NULL.
555  */
556 const char *stasis_topic_name(const struct stasis_topic *topic);
557 
558 /*!
559  * \brief Return the detail of a topic.
560  * \param topic Topic.
561  * \return Detail of the topic.
562  * \retval NULL if topic is \c NULL.
563  * \since 12
564  */
565 const char *stasis_topic_detail(const struct stasis_topic *topic);
566 
567 /*!
568  * \brief Return the number of subscribers of a topic.
569  * \param topic Topic.
570  * \return Number of subscribers of the topic.
571  * \since 17.0.0
572  */
573 size_t stasis_topic_subscribers(const struct stasis_topic *topic);
574 
575 /*!
576  * \brief Publish a message to a topic's subscribers.
577  * \param topic Topic.
578  * \param message Message to publish.
579  *
580  * This call is asynchronous and will return immediately upon queueing
581  * the message for delivery to the topic's subscribers.
582  *
583  * \since 12
584  */
585 void stasis_publish(struct stasis_topic *topic, struct stasis_message *message);
586 
587 /*!
588  * \brief Publish a message to a topic's subscribers, synchronizing
589  * on the specified subscriber
590  * \param sub Subscription to synchronize on.
591  * \param message Message to publish.
592  *
593  * The caller of stasis_publish_sync will block until the specified
594  * subscriber completes handling of the message.
595  *
596  * All other subscribers to the topic the \ref stasis_subscription
597  * is subscribed to are also delivered the message; this delivery however
598  * happens asynchronously.
599  *
600  * \since 12.1.0
601  */
603 
604 /*!
605  * \brief Callback function type for Stasis subscriptions.
606  * \param data Data field provided with subscription.
607  * \param sub Subscription published on.
608  * \param message Published message.
609  * \since 12
610  */
611 typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message);
612 
613 /*!
614  * \brief Stasis subscription callback function that does nothing.
615  *
616  * \note This callback should be used for events are not directly processed, but need
617  * to be generated so data can be retrieved from cache later. Subscriptions with this
618  * callback can be released with \ref stasis_unsubscribe, even during module unload.
619  *
620  * \since 13.5
621  */
623 
624 /*!
625  * \brief Create a subscription.
626  *
627  * In addition to being AO2 managed memory (requiring an ao2_cleanup() to free
628  * up this reference), the subscription must be explicitly unsubscribed from its
629  * topic using stasis_unsubscribe().
630  *
631  * The invocations of the callback are serialized, but may not always occur on
632  * the same thread. The invocation order of different subscriptions is
633  * unspecified.
634  *
635  * \param topic Topic to subscribe to.
636  * \param callback Callback function for subscription messages.
637  * \param data Data to be passed to the callback, in addition to the message.
638  * \param file, lineno, func
639  * \return New \ref stasis_subscription object.
640  * \retval NULL on error.
641  * \since 12
642  *
643  * \note This callback will receive a callback with a message indicating it
644  * has been subscribed. This occurs immediately before accepted message
645  * types can be set and the callback must expect to receive it.
646  */
648  stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func);
649 #define stasis_subscribe(topic, callback, data) __stasis_subscribe(topic, callback, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
650 
651 /*!
652  * \brief Create a subscription whose callbacks occur on a thread pool
653  *
654  * In addition to being AO2 managed memory (requiring an ao2_cleanup() to free
655  * up this reference), the subscription must be explicitly unsubscribed from its
656  * topic using stasis_unsubscribe().
657  *
658  * The invocations of the callback are serialized, but will almost certainly not
659  * always happen on the same thread. The invocation order of different subscriptions
660  * is unspecified.
661  *
662  * Unlike \ref stasis_subscribe, this function will explicitly use a threadpool to
663  * dispatch items to its \c callback. This form of subscription should be used
664  * when many subscriptions may be made to the specified \c topic.
665  *
666  * \param topic Topic to subscribe to.
667  * \param callback Callback function for subscription messages.
668  * \param data Data to be passed to the callback, in addition to the message.
669  * \param file, lineno, func
670  * \return New \ref stasis_subscription object.
671  * \retval NULL on error.
672  * \since 12.8.0
673  *
674  * \note This callback will receive a callback with a message indicating it
675  * has been subscribed. This occurs immediately before accepted message
676  * types can be set and the callback must expect to receive it.
677  */
679  stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func);
680 #define stasis_subscribe_pool(topic, callback, data) __stasis_subscribe_pool(topic, callback, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
681 
682 /*!
683  * \brief Indicate to a subscription that we are interested in a message type.
684  *
685  * This will cause the subscription to allow the given message type to be
686  * raised to our subscription callback. This enables internal filtering in
687  * the stasis message bus to reduce messages.
688  *
689  * \param subscription Subscription to add message type to.
690  * \param type The message type we wish to receive.
691  * \retval 0 on success
692  * \retval -1 failure
693  *
694  * \since 17.0.0
695  *
696  * \note If you are wanting to use stasis_final_message you will need to accept
697  * \ref stasis_subscription_change_type as a message type.
698  *
699  * \note Until the subscription is set to selective filtering it is possible for it
700  * to receive messages of message types that would not normally be accepted.
701  */
703  const struct stasis_message_type *type);
704 
705 /*!
706  * \brief Indicate to a subscription that we are not interested in a message type.
707  *
708  * \param subscription Subscription to remove message type from.
709  * \param type The message type we don't wish to receive.
710  * \retval 0 on success
711  * \retval -1 failure
712  *
713  * \since 17.0.0
714  */
716  const struct stasis_message_type *type);
717 
718 /*!
719  * \brief Set the message type filtering level on a subscription
720  *
721  * This will cause the subscription to filter messages according to the
722  * provided filter level. For example if selective is used then only
723  * messages matching those provided to \ref stasis_subscription_accept_message_type
724  * will be raised to the subscription callback.
725  *
726  * \param subscription Subscription that should receive all messages.
727  * \param filter What filter to use
728  * \retval 0 on success
729  * \retval -1 failure
730  *
731  * \since 17.0.0
732  */
733 int stasis_subscription_set_filter(struct stasis_subscription *subscription,
735 
736 /*!
737  * \brief Indicate to a subscription that we are interested in messages with one or more formatters.
738  *
739  * \param subscription Subscription to alter.
740  * \param formatters A bitmap of \ref stasis_subscription_message_formatters we wish to receive.
741  *
742  * \since 13.25.0
743  * \since 16.2.0
744  */
747 
748 /*!
749  * \brief Get a bitmap of available formatters for a message type
750  *
751  * \param message_type Message type
752  * \return A bitmap of \ref stasis_subscription_message_formatters
753  *
754  * \since 13.25.0
755  * \since 16.2.0
756  */
758  const struct stasis_message_type *message_type);
759 
760 /*!
761  * \brief Cancel a subscription.
762  *
763  * Note that in an asynchronous system, there may still be messages queued or
764  * in transit to the subscription's callback. These will still be delivered.
765  * There will be a final 'SubscriptionCancelled' message, indicating the
766  * delivery of the final message.
767  *
768  * \param subscription Subscription to cancel.
769  * \retval NULL for convenience
770  * \since 12
771  */
773  struct stasis_subscription *subscription);
774 
775 /*!
776  * \brief Set the high and low alert water marks of the stasis subscription.
777  * \since 13.10.0
778  *
779  * \param subscription Pointer to a stasis subscription
780  * \param low_water New queue low water mark. (-1 to set as 90% of high_water)
781  * \param high_water New queue high water mark.
782  *
783  * \retval 0 on success.
784  * \retval -1 on error (water marks not changed).
785  */
787  long low_water, long high_water);
788 
789 /*!
790  * \brief Block until the last message is processed on a subscription.
791  *
792  * This function will not return until the \a subscription's callback for the
793  * stasis_subscription_final_message() completes. This allows cleanup routines
794  * to run before unblocking the joining thread.
795  *
796  * \param subscription Subscription to block on.
797  * \since 12
798  */
799 void stasis_subscription_join(struct stasis_subscription *subscription);
800 
801 /*!
802  * \brief Returns whether \a subscription has received its final message.
803  *
804  * Note that a subscription is considered done even while the
805  * stasis_subscription_final_message() is being processed. This allows cleanup
806  * routines to check the status of the subscription.
807  *
808  * \param subscription Subscription.
809  * \return True (non-zero) if stasis_subscription_final_message() has been
810  * received.
811  * \return False (zero) if waiting for the end.
812  */
813 int stasis_subscription_is_done(struct stasis_subscription *subscription);
814 
815 /*!
816  * \brief Cancel a subscription, blocking until the last message is processed.
817  *
818  * While normally it's recommended to stasis_unsubscribe() and wait for
819  * stasis_subscription_final_message(), there are times (like during a module
820  * unload) where you have to wait for the final message (otherwise you'll call
821  * a function in a shared module that no longer exists).
822  *
823  * \param subscription Subscription to cancel.
824  * \retval NULL for convenience
825  * \since 12
826  */
828  struct stasis_subscription *subscription);
829 
830 struct stasis_forward;
831 
832 /*!
833  * \brief Create a subscription which forwards all messages from one topic to
834  * another.
835  *
836  * Note that the \a topic parameter of the invoked callback will the be the
837  * \a topic the message was sent to, not the topic the subscriber subscribed to.
838  *
839  * \param from_topic Topic to forward.
840  * \param to_topic Destination topic of forwarded messages.
841  * \return New forwarding subscription.
842  * \retval NULL on error.
843  * \since 12
844  */
846  struct stasis_topic *to_topic);
847 
848 struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward);
849 
850 /*!
851  * \brief Get the unique ID for the subscription.
852  *
853  * \param sub Subscription for which to get the unique ID.
854  * \return Unique ID for the subscription.
855  * \since 12
856  */
857 const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub);
858 
859 /*!
860  * \brief Returns whether a subscription is currently subscribed.
861  *
862  * Note that there may still be messages queued up to be dispatched to this
863  * subscription, but the stasis_subscription_final_message() has been enqueued.
864  *
865  * \param sub Subscription to check
866  * \return False (zero) if subscription is not subscribed.
867  * \return True (non-zero) if still subscribed.
868  */
870 
871 /*!
872  * \brief Determine whether a message is the final message to be received on a subscription.
873  *
874  * \param sub Subscription on which the message was received.
875  * \param msg Message to check.
876  * \return zero if the provided message is not the final message.
877  * \return non-zero if the provided message is the final message.
878  * \since 12
879  */
881 
882 /*! \addtogroup StasisTopicsAndMessages
883  * @{
884  */
885 
886 /*!
887  * \brief Holds details about changes to subscriptions for the specified topic
888  * \since 12
889  */
891  struct stasis_topic *topic; /*!< The topic the subscription is/was subscribing to */
892  char *uniqueid; /*!< The unique ID associated with this subscription */
893  char description[0]; /*!< The description of the change to the subscription associated with the uniqueid */
894 };
895 
896 /*!
897  * \brief Gets the message type for subscription change notices
898  * \return The stasis_message_type for subscription change notices
899  * \since 12
900  */
902 
903 /*! @} */
904 
905 /*!
906  * \brief Pool for topic aggregation
907  */
908 struct stasis_topic_pool;
909 
910 /*!
911  * \brief Create a topic pool that routes messages from dynamically generated topics to the given topic
912  * \param pooled_topic Topic to which messages will be routed
913  * \return the new stasis_topic_pool
914  * \retval NULL on failure
915  */
916 struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic);
917 
918 /*!
919  * \brief Find or create a topic in the pool
920  * \param pool Pool for which to get the topic
921  * \param topic_name Name of the topic to get
922  * \return The already stored or newly allocated topic
923  * \retval NULL if the topic was not found and could not be allocated
924  */
925 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name);
926 
927 /*!
928  * \brief Delete a topic from the topic pool
929  *
930  * \param pool Pool from which to delete the topic
931  * \param topic_name Name of the topic to delete in the form of
932  * \verbatim [<pool_topic_name>/]<topic_name> \endverbatim
933  *
934  * \since 13.24
935  * \since 15.6
936  * \since 16.1
937  */
938 void stasis_topic_pool_delete_topic(struct stasis_topic_pool *pool, const char *topic_name);
939 
940 /*!
941  * \brief Check if a topic exists in a pool
942  * \param pool Pool to check
943  * \param topic_name Name of the topic to check
944  * \retval 1 exists
945  * \retval 0 does not exist
946  * \since 13.23.0
947  */
948 int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name);
949 
950 /*! \addtogroup StasisTopicsAndMessages
951  * @{
952  */
953 
954 /*!
955  * \brief Message type for cache update messages.
956  * \return Message type for cache update messages.
957  * \since 12
958  */
960 
961 /*!
962  * \brief Cache update message
963  * \since 12
964  */
966  /*! \brief Convenience reference to snapshot type */
968  /*! \brief Old value from the cache */
970  /*! \brief New value */
972 };
973 
974 /*!
975  * \brief Message type for clearing a message from a stasis cache.
976  * \since 12
977  */
979 
980 /*! @} */
981 
982 /*!
983  * \brief A message cache, for use with \ref stasis_caching_topic.
984  * \since 12
985  */
986 struct stasis_cache;
987 
988 /*! Cache entry used for calculating the aggregate snapshot. */
989 struct stasis_cache_entry;
990 
991 /*!
992  * \brief A topic wrapper, which caches certain messages.
993  * \since 12
994  */
995 struct stasis_caching_topic;
996 
997 
998 /*!
999  * \brief Callback extract a unique identity from a snapshot message.
1000  *
1001  * This identity is unique to the underlying object of the snapshot, such as the
1002  * UniqueId field of a channel.
1003  *
1004  * \param message Message to extract id from.
1005  * \return String representing the snapshot's id.
1006  * \retval NULL if the message_type of the message isn't a handled snapshot.
1007  * \since 12
1008  */
1009 typedef const char *(*snapshot_get_id)(struct stasis_message *message);
1010 
1011 /*!
1012  * \brief Callback to calculate the aggregate cache entry.
1013  * \since 12.2.0
1014  *
1015  * \param entry Cache entry to calculate a new aggregate snapshot.
1016  * \param new_snapshot The shapshot that is being updated.
1017  *
1018  * \note Return a ref bumped pointer from stasis_cache_entry_get_aggregate()
1019  * if a new aggregate could not be calculated because of error.
1020  *
1021  * \note An aggregate message is a combined representation of the local
1022  * and remote entities publishing the message data. e.g., An aggregate
1023  * device state represents the combined device state from the local and
1024  * any remote entities publishing state for a device. e.g., An aggregate
1025  * MWI message is the old/new MWI counts accumulated from the local and
1026  * any remote entities publishing to a mailbox.
1027  *
1028  * \return New aggregate-snapshot calculated on success.
1029  * Caller has a reference on return.
1030  */
1031 typedef struct stasis_message *(*cache_aggregate_calc_fn)(struct stasis_cache_entry *entry, struct stasis_message *new_snapshot);
1032 
1033 /*!
1034  * \brief Callback to publish the aggregate cache entry message.
1035  * \since 12.2.0
1036  *
1037  * \details
1038  * Once an aggregate message is calculated. This callback publishes the
1039  * message so subscribers will know the new value of an aggregated state.
1040  *
1041  * \param topic The aggregate message may be published to this topic.
1042  * It is the topic to which the cache itself is subscribed.
1043  * \param aggregate The aggregate shapshot message to publish.
1044  *
1045  * \note It is up to the function to determine if there is a better topic
1046  * the aggregate message should be published over.
1047  *
1048  * \note An aggregate message is a combined representation of the local
1049  * and remote entities publishing the message data. e.g., An aggregate
1050  * device state represents the combined device state from the local and
1051  * any remote entities publishing state for a device. e.g., An aggregate
1052  * MWI message is the old/new MWI counts accumulated from the local and
1053  * any remote entities publishing to a mailbox.
1054  */
1055 typedef void (*cache_aggregate_publish_fn)(struct stasis_topic *topic, struct stasis_message *aggregate);
1056 
1057 /*!
1058  * \brief Get the aggregate cache entry snapshot.
1059  * \since 12.2.0
1060  *
1061  * \param entry Cache entry to get the aggregate snapshot.
1062  *
1063  * \note A reference is not given to the returned pointer so don't unref it.
1064  *
1065  * \note An aggregate message is a combined representation of the local
1066  * and remote entities publishing the message data. e.g., An aggregate
1067  * device state represents the combined device state from the local and
1068  * any remote entities publishing state for a device. e.g., An aggregate
1069  * MWI message is the old/new MWI counts accumulated from the local and
1070  * any remote entities publishing to a mailbox.
1071  *
1072  * \retval Aggregate-snapshot in cache.
1073  * \retval NULL if not present.
1074  */
1076 
1077 /*!
1078  * \brief Get the local entity's cache entry snapshot.
1079  * \since 12.2.0
1080  *
1081  * \param entry Cache entry to get the local entity's snapshot.
1082  *
1083  * \note A reference is not given to the returned pointer so don't unref it.
1084  *
1085  * \retval Internal-snapshot in cache.
1086  * \retval NULL if not present.
1087  */
1089 
1090 /*!
1091  * \brief Get a remote entity's cache entry snapshot by index.
1092  * \since 12.2.0
1093  *
1094  * \param entry Cache entry to get a remote entity's snapshot.
1095  * \param idx Which remote entity's snapshot to get.
1096  *
1097  * \note A reference is not given to the returned pointer so don't unref it.
1098  *
1099  * \retval Remote-entity-snapshot in cache.
1100  * \retval NULL if not present.
1101  */
1103 
1104 /*!
1105  * \brief Create a cache.
1106  *
1107  * This is the backend store for a \ref stasis_caching_topic. The cache is
1108  * thread safe, allowing concurrent reads and writes.
1109  *
1110  * The returned object is AO2 managed, so ao2_cleanup() when you're done.
1111  *
1112  * \param id_fn Callback to extract the id from a snapshot message.
1113  *
1114  * \return New cache indexed by \a id_fn.
1115  * \retval NULL on error
1116  *
1117  * \since 12
1118  */
1120 
1121 /*!
1122  * \brief Create a cache.
1123  *
1124  * This is the backend store for a \ref stasis_caching_topic. The cache is
1125  * thread safe, allowing concurrent reads and writes.
1126  *
1127  * The returned object is AO2 managed, so ao2_cleanup() when you're done.
1128  *
1129  * \param id_fn Callback to extract the id from a snapshot message.
1130  * \param aggregate_calc_fn Callback to calculate the aggregate cache entry.
1131  * \param aggregate_publish_fn Callback to publish the aggregate cache entry.
1132  *
1133  * \note An aggregate message is a combined representation of the local
1134  * and remote entities publishing the message data. e.g., An aggregate
1135  * device state represents the combined device state from the local and
1136  * any remote entities publishing state for a device. e.g., An aggregate
1137  * MWI message is the old/new MWI counts accumulated from the local and
1138  * any remote entities publishing to a mailbox.
1139  *
1140  * \return New cache indexed by \a id_fn.
1141  * \retval NULL on error
1142  *
1143  * \since 12.2.0
1144  */
1146 
1147 /*!
1148  * \brief Create a topic which monitors and caches messages from another topic.
1149  *
1150  * The idea is that some topics publish 'snapshots' of some other object's state
1151  * that should be cached. When these snapshot messages are received, the cache
1152  * is updated, and a stasis_cache_update() message is forwarded, which has both
1153  * the original snapshot message and the new message.
1154  *
1155  * The returned object is AO2 managed, so ao2_cleanup() when done with it.
1156  *
1157  * \param original_topic Topic publishing snapshot messages.
1158  * \param cache Backend cache in which to keep snapshots.
1159  * \return New topic which changes snapshot messages to stasis_cache_update()
1160  * messages, and forwards all other messages from the original topic.
1161  * \retval NULL on error
1162  * \since 12
1163  */
1165  struct stasis_topic *original_topic, struct stasis_cache *cache);
1166 
1167 /*!
1168  * \brief Unsubscribes a caching topic from its upstream topic.
1169  *
1170  * This function returns immediately, so be sure to cleanup when
1171  * stasis_subscription_final_message() is received.
1172  *
1173  * \param caching_topic Caching topic to unsubscribe
1174  * \retval NULL for convenience
1175  * \since 12
1176  */
1178  struct stasis_caching_topic *caching_topic);
1179 
1180 /*!
1181  * \brief Unsubscribes a caching topic from its upstream topic, blocking until
1182  * all messages have been forwarded.
1183  *
1184  * See stasis_unsubscribe_and_join() for more info on when to use this as
1185  * opposed to stasis_caching_unsubscribe().
1186  *
1187  * \param caching_topic Caching topic to unsubscribe
1188  * \retval NULL for convenience
1189  * \since 12
1190  */
1192  struct stasis_caching_topic *caching_topic);
1193 
1194 /*!
1195  * \brief Returns the topic of cached events from a caching topics.
1196  * \param caching_topic The caching topic.
1197  * \return The topic that publishes cache update events, along with passthrough
1198  * events from the underlying topic.
1199  * \retval NULL if \a caching_topic is \c NULL.
1200  * \since 12
1201  */
1203  struct stasis_caching_topic *caching_topic);
1204 
1205 /*!
1206  * \brief Indicate to a caching topic that we are interested in a message type.
1207  *
1208  * This will cause the caching topic to receive messages of the given message
1209  * type. This enables internal filtering in the stasis message bus to reduce
1210  * messages.
1211  *
1212  * \param caching_topic The caching topic.
1213  * \param type The message type we wish to receive.
1214  * \retval 0 on success
1215  * \retval -1 failure
1216  *
1217  * \since 17.0.0
1218  */
1220  struct stasis_message_type *type);
1221 
1222 /*!
1223  * \brief Set the message type filtering level on a cache
1224  *
1225  * This will cause the underlying subscription to filter messages according to the
1226  * provided filter level. For example if selective is used then only
1227  * messages matching those provided to \ref stasis_subscription_accept_message_type
1228  * will be raised to the subscription callback.
1229  *
1230  * \param caching_topic The caching topic.
1231  * \param filter What filter to use
1232  * \retval 0 on success
1233  * \retval -1 failure
1234  *
1235  * \since 17.0.0
1236  */
1237 int stasis_caching_set_filter(struct stasis_caching_topic *caching_topic,
1239 
1240 /*!
1241  * \brief A message which instructs the caching topic to remove an entry from
1242  * its cache.
1243  *
1244  * \param message Message representative of the cache entry that should be
1245  * cleared. This will become the data held in the
1246  * stasis_cache_clear message.
1247  *
1248  * \return Message which, when sent to a \ref stasis_caching_topic, will clear
1249  * the item from the cache.
1250  * \retval NULL on error.
1251  * \since 12
1252  */
1254 
1255 /*!
1256  * \brief Retrieve an item from the cache for the ast_eid_default entity.
1257  *
1258  * The returned item is AO2 managed, so ao2_cleanup() when you're done with it.
1259  *
1260  * \param cache The cache to query.
1261  * \param type Type of message to retrieve.
1262  * \param id Identity of the snapshot to retrieve.
1263  *
1264  * \return Message from the cache.
1265  * \retval NULL if message is not found.
1266  *
1267  * \since 12
1268  */
1269 struct stasis_message *stasis_cache_get(struct stasis_cache *cache, struct stasis_message_type *type, const char *id);
1270 
1271 /*!
1272  * \brief Retrieve an item from the cache for a specific entity.
1273  *
1274  * The returned item is AO2 managed, so ao2_cleanup() when you're done with it.
1275  *
1276  * \param cache The cache to query.
1277  * \param type Type of message to retrieve.
1278  * \param id Identity of the snapshot to retrieve.
1279  * \param eid Specific entity id to retrieve. NULL for aggregate.
1280  *
1281  * \note An aggregate message is a combined representation of the local
1282  * and remote entities publishing the message data. e.g., An aggregate
1283  * device state represents the combined device state from the local and
1284  * any remote entities publishing state for a device. e.g., An aggregate
1285  * MWI message is the old/new MWI counts accumulated from the local and
1286  * any remote entities publishing to a mailbox.
1287  *
1288  * \return Message from the cache.
1289  * \retval NULL if message is not found.
1290  *
1291  * \since 12.2.0
1292  */
1293 struct stasis_message *stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid);
1294 
1295 /*!
1296  * \brief Retrieve all matching entity items from the cache.
1297  * \since 12.2.0
1298  *
1299  * \param cache The cache to query.
1300  * \param type Type of message to retrieve.
1301  * \param id Identity of the snapshot to retrieve.
1302  *
1303  * \return Container of matching items found.
1304  * \retval NULL if error.
1305  */
1306 struct ao2_container *stasis_cache_get_all(struct stasis_cache *cache, struct stasis_message_type *type, const char *id);
1307 
1308 /*!
1309  * \brief Dump cached items to a subscription for the ast_eid_default entity.
1310  *
1311  * \param cache The cache to query.
1312  * \param type Type of message to dump (any type if \c NULL).
1313  *
1314  * \return ao2_container containing all matches (must be unreffed by caller)
1315  * \retval NULL on allocation error
1316  *
1317  * \since 12
1318  */
1320 
1321 /*!
1322  * \brief Dump cached items to a subscription for a specific entity.
1323  * \since 12.2.0
1324  *
1325  * \param cache The cache to query.
1326  * \param type Type of message to dump (any type if \c NULL).
1327  * \param eid Specific entity id to retrieve. NULL for aggregate.
1328  *
1329  * \return ao2_container containing all matches (must be unreffed by caller)
1330  * \retval NULL on allocation error
1331  */
1332 struct ao2_container *stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid);
1333 
1334 /*!
1335  * \brief Dump all entity items from the cache to a subscription.
1336  * \since 12.2.0
1337  *
1338  * \param cache The cache to query.
1339  * \param type Type of message to dump (any type if \c NULL).
1340  *
1341  * \return ao2_container containing all matches (must be unreffed by caller)
1342  * \retval NULL on allocation error
1343  */
1345 
1346 /*! \addtogroup StasisTopicsAndMessages
1347  * @{
1348  */
1349 
1350 /*!
1351  * \brief Object type code for multi user object snapshots
1352  */
1354  STASIS_UMOS_CHANNEL = 0, /*!< Channel Snapshots */
1355  STASIS_UMOS_BRIDGE, /*!< Bridge Snapshots */
1356  STASIS_UMOS_ENDPOINT, /*!< Endpoint Snapshots */
1357 };
1358 
1359 /*! \brief Number of snapshot types */
1360 #define STASIS_UMOS_MAX (STASIS_UMOS_ENDPOINT + 1)
1361 
1362 /*!
1363  * \brief Message type for custom user defined events with multi object blobs
1364  * \return The stasis_message_type for user event
1365  * \since 12.3.0
1366  */
1368 
1369 /*!
1370  * \brief Create a stasis multi object blob
1371  * \since 12.3.0
1372  *
1373  * \details
1374  * Multi object blob can store a combination of arbitrary json values
1375  * (the blob) and also snapshots of various other system objects (such
1376  * as channels, bridges, etc) for delivery through a stasis message.
1377  * The multi object blob is first created, then optionally objects
1378  * are added to it, before being attached to a message and delivered
1379  * to stasis topic.
1380  *
1381  * \param blob Json blob
1382  *
1383  * \note When used for an ast_multi_user_event_type message, the
1384  * json blob should contain at minimum {eventname: name}.
1385  *
1386  * \retval ast_multi_object_blob* if succeeded
1387  * \retval NULL if creation failed
1388  */
1390 
1391 /*!
1392  * \brief Add an object to a multi object blob previously created
1393  * \since 12.3.0
1394  *
1395  * \param multi The multi object blob previously created
1396  * \param type Type code for the object such as channel, bridge, etc.
1397  * \param object Snapshot object of the type supplied to typename
1398  */
1400 
1401 /*!
1402  * \brief Create and publish a stasis message blob on a channel with it's snapshot
1403  * \since 12.3.0
1404  *
1405  * \details
1406  * For compatibility with app_userevent, this creates a multi object
1407  * blob message, attaches the channel snapshot to it, and publishes it
1408  * to the channel's topic.
1409  *
1410  * \param chan The channel to snapshot and publish event to
1411  * \param type The message type
1412  * \param blob A json blob to publish with the snapshot
1413  */
1415 
1416 
1417 /*! @} */
1418 
1419 /*!
1420  * \internal
1421  * \brief Log a message about invalid attempt to access a type.
1422  */
1423 void stasis_log_bad_type_access(const char *name);
1424 
1425 /*!
1426  * \brief Boiler-plate messaging macro for defining public message types.
1427  *
1428  * \code
1429  * STASIS_MESSAGE_TYPE_DEFN(ast_foo_type,
1430  * .to_ami = foo_to_ami,
1431  * .to_json = foo_to_json,
1432  * .to_event = foo_to_event,
1433  * );
1434  * \endcode
1435  *
1436  * \param name Name of message type.
1437  * \param ... Virtual table methods for messages of this type.
1438  * \since 12
1439  */
1440 #define STASIS_MESSAGE_TYPE_DEFN(name, ...) \
1441  static struct stasis_message_vtable _priv_ ## name ## _v = { \
1442  __VA_ARGS__ \
1443  }; \
1444  static struct stasis_message_type *_priv_ ## name; \
1445  struct stasis_message_type *name(void) { \
1446  if (_priv_ ## name == NULL) { \
1447  stasis_log_bad_type_access(#name); \
1448  } \
1449  return _priv_ ## name; \
1450  }
1451 
1452 /*!
1453  * \brief Boiler-plate messaging macro for defining local message types.
1454  *
1455  * \code
1456  * STASIS_MESSAGE_TYPE_DEFN_LOCAL(ast_foo_type,
1457  * .to_ami = foo_to_ami,
1458  * .to_json = foo_to_json,
1459  * .to_event = foo_to_event,
1460  * );
1461  * \endcode
1462  *
1463  * \param name Name of message type.
1464  * \param ... Virtual table methods for messages of this type.
1465  * \since 12
1466  */
1467 #define STASIS_MESSAGE_TYPE_DEFN_LOCAL(name, ...) \
1468  static struct stasis_message_vtable _priv_ ## name ## _v = { \
1469  __VA_ARGS__ \
1470  }; \
1471  static struct stasis_message_type *_priv_ ## name; \
1472  static struct stasis_message_type *name(void) { \
1473  if (_priv_ ## name == NULL) { \
1474  stasis_log_bad_type_access(#name); \
1475  } \
1476  return _priv_ ## name; \
1477  }
1478 
1479 /*!
1480 * \brief Boiler-plate messaging macro for initializing message types.
1481  *
1482  * \code
1483  * if (STASIS_MESSAGE_TYPE_INIT(ast_foo_type) != 0) {
1484  * return -1;
1485  * }
1486  * \endcode
1487  *
1488  * \param name Name of message type.
1489  * \return 0 if initialization is successful.
1490  * \return Non-zero on failure.
1491  * \since 12
1492  */
1493 #define STASIS_MESSAGE_TYPE_INIT(name) \
1494  ({ \
1495  ast_assert(_priv_ ## name == NULL); \
1496  stasis_message_type_create(#name, \
1497  &_priv_ ## name ## _v, &_priv_ ## name) == STASIS_MESSAGE_TYPE_ERROR ? 1 : 0; \
1498  })
1499 
1500 /*!
1501  * \brief Boiler-plate messaging macro for cleaning up message types.
1502  *
1503  * Note that if your type is defined in core instead of a loadable module, you
1504  * should call message type cleanup from an ast_register_cleanup() handler
1505  * instead of an ast_register_atexit() handler.
1506  *
1507  * The reason is that during an immediate shutdown, loadable modules (which may
1508  * refer to core message types) are not unloaded. While the atexit handlers are
1509  * run, there's a window of time where a module subscription might reference a
1510  * core message type after it's been cleaned up. Which is bad.
1511  *
1512  * \param name Name of message type.
1513  * \since 12
1514  */
1515 #define STASIS_MESSAGE_TYPE_CLEANUP(name) \
1516  ({ \
1517  ao2_cleanup(_priv_ ## name); \
1518  _priv_ ## name = NULL; \
1519  })
1520 
1521 /*!
1522  * \brief Initialize the Stasis subsystem.
1523  * \return 0 on success.
1524  * \return Non-zero on error.
1525  * \since 12
1526  */
1527 int stasis_init(void);
1528 
1529 /*!
1530  * \internal
1531  * \brief called by stasis_init() for cache initialization.
1532  * \return 0 on success.
1533  * \return Non-zero on error.
1534  * \since 12
1535  */
1536 int stasis_cache_init(void);
1537 
1538 /*!
1539  * \internal
1540  * \brief called by stasis_init() for config initialization.
1541  * \return 0 on success.
1542  * \return Non-zero on error.
1543  * \since 12
1544  */
1546 
1547 /*!
1548  * \defgroup StasisTopicsAndMessages Stasis topics, and their messages.
1549  *
1550  * \brief This group contains the topics, messages and corresponding message types
1551  * found within Asterisk.
1552  */
1553 
1554 #endif /* _ASTERISK_STASIS_H */
static PGresult * result
Definition: cel_pgsql.c:84
static const char type[]
Definition: chan_ooh323.c:109
static const char name[]
Definition: format_mp3.c:68
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
Definition: func_strings.c:734
struct ast_multi_object_blob * ast_multi_object_blob_create(struct ast_json *blob)
Create a stasis multi object blob.
Definition: stasis.c:1977
void ast_multi_object_blob_add(struct ast_multi_object_blob *multi, enum stasis_user_multi_object_snapshot_type type, void *object)
Add an object to a multi object blob previously created.
Definition: stasis.c:2003
stasis_user_multi_object_snapshot_type
Object type code for multi user object snapshots.
Definition: stasis.h:1353
struct stasis_message_type * stasis_cache_clear_type(void)
Message type for clearing a message from a stasis cache.
void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob)
Create and publish a stasis message blob on a channel with it's snapshot.
Definition: stasis.c:2012
struct stasis_message_type * ast_multi_user_event_type(void)
Message type for custom user defined events with multi object blobs.
struct stasis_message_type * stasis_cache_update_type(void)
Message type for cache update messages.
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
@ STASIS_UMOS_ENDPOINT
Definition: stasis.h:1356
@ STASIS_UMOS_BRIDGE
Definition: stasis.h:1355
@ STASIS_UMOS_CHANNEL
Definition: stasis.h:1354
Asterisk JSON abstraction layer.
The AMI - Asterisk Manager Interface - is a TCP protocol created to manage Asterisk with third-party ...
struct ao2_container * cache
Definition: pbx_realtime.c:77
struct stasis_forward * sub
Definition: res_corosync.c:240
struct stasis_topic * stasis_topic_get(const char *name)
Get a topic of the given name.
Definition: stasis.c:624
int stasis_subscription_is_done(struct stasis_subscription *subscription)
Returns whether subscription has received its final message.
Definition: stasis.c:1120
struct stasis_message * stasis_message_create_full(struct stasis_message_type *type, void *data, const struct ast_eid *eid)
Create a new message for an entity.
const struct timeval * stasis_message_timestamp(const struct stasis_message *msg)
Get the time when a message was created.
struct stasis_message * stasis_cache_entry_get_aggregate(struct stasis_cache_entry *entry)
Get the aggregate cache entry snapshot.
Definition: stasis_cache.c:365
struct stasis_message * stasis_cache_clear_create(struct stasis_message *message)
A message which instructs the caching topic to remove an entry from its cache.
Definition: stasis_cache.c:778
unsigned int stasis_message_type_hash(const struct stasis_message_type *type)
Gets the hash of a given message type.
struct stasis_message * stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid)
Retrieve an item from the cache for a specific entity.
Definition: stasis_cache.c:659
void stasis_topic_pool_delete_topic(struct stasis_topic_pool *pool, const char *topic_name)
Delete a topic from the topic pool.
Definition: stasis.c:1866
struct stasis_topic * stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
Find or create a topic in the pool.
Definition: stasis.c:1886
struct stasis_subscription * stasis_unsubscribe_and_join(struct stasis_subscription *subscription)
Cancel a subscription, blocking until the last message is processed.
Definition: stasis.c:1136
int stasis_subscription_decline_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are not interested in a message type.
Definition: stasis.c:1055
struct stasis_subscription * __stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription.
Definition: stasis.c:944
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition: stasis.c:1580
void(* stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Callback function type for Stasis subscriptions.
Definition: stasis.h:611
int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription, long low_water, long high_water)
Set the high and low alert water marks of the stasis subscription.
Definition: stasis.c:1013
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *subscription)
Cancel a subscription.
Definition: stasis.c:973
const char * stasis_topic_uniqueid(const struct stasis_topic *topic)
Return the uniqueid of a topic.
stasis_subscription_message_filter
Stasis subscription message filters.
Definition: stasis.h:294
@ STASIS_SUBSCRIPTION_FILTER_SELECTIVE
Definition: stasis.h:297
@ STASIS_SUBSCRIPTION_FILTER_FORCED_NONE
Definition: stasis.h:296
@ STASIS_SUBSCRIPTION_FILTER_NONE
Definition: stasis.h:295
struct ast_json * stasis_message_to_json(struct stasis_message *msg, struct stasis_message_sanitizer *sanitize)
Build the JSON representation of the message.
struct stasis_caching_topic * stasis_caching_topic_create(struct stasis_topic *original_topic, struct stasis_cache *cache)
Create a topic which monitors and caches messages from another topic.
Definition: stasis_cache.c:948
size_t stasis_topic_subscribers(const struct stasis_topic *topic)
Return the number of subscribers of a topic.
Definition: stasis.c:645
void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
Publish a message to a topic's subscribers, synchronizing on the specified subscriber.
Definition: stasis.c:1518
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
Definition: stasis.c:1025
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
Definition: stasis.c:1079
struct stasis_message * stasis_cache_entry_get_local(struct stasis_cache_entry *entry)
Get the local entity's cache entry snapshot.
Definition: stasis_cache.c:370
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
Definition: stasis.c:1550
stasis_subscription_message_formatters
Stasis subscription formatter filters.
Definition: stasis.h:308
@ STASIS_SUBSCRIPTION_FORMATTER_EVENT
Definition: stasis.h:312
@ STASIS_SUBSCRIPTION_FORMATTER_AMI
Definition: stasis.h:311
@ STASIS_SUBSCRIPTION_FORMATTER_JSON
Definition: stasis.h:310
@ STASIS_SUBSCRIPTION_FORMATTER_NONE
Definition: stasis.h:309
int stasis_config_init(void)
int stasis_caching_set_filter(struct stasis_caching_topic *caching_topic, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a cache.
Definition: stasis_cache.c:109
int stasis_init(void)
Initialize the Stasis subsystem.
Definition: stasis.c:3063
struct stasis_topic * stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
Returns the topic of cached events from a caching topics.
Definition: stasis_cache.c:85
void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Stasis subscription callback function that does nothing.
Definition: stasis.c:811
struct stasis_cache * stasis_cache_create(snapshot_get_id id_fn)
Create a cache.
Definition: stasis_cache.c:360
void stasis_subscription_join(struct stasis_subscription *subscription)
Block until the last message is processed on a subscription.
Definition: stasis.c:1107
struct stasis_topic * stasis_topic_create_with_detail(const char *name, const char *detail)
Create a new topic with given detail.
Definition: stasis.c:570
struct stasis_subscription * __stasis_subscribe_pool(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription whose callbacks occur on a thread pool.
Definition: stasis.c:955
int stasis_message_type_declined(const char *name)
Check whether a message type is declined.
Definition: stasis.c:2285
const char * stasis_topic_detail(const struct stasis_topic *topic)
Return the detail of a topic.
Definition: stasis.c:637
void stasis_log_bad_type_access(const char *name)
Definition: stasis.c:1942
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
enum stasis_message_type_result stasis_message_type_create(const char *name, struct stasis_message_vtable *vtable, struct stasis_message_type **result)
Create a new message type.
int stasis_cache_init(void)
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
Definition: stasis.c:1176
const char * stasis_subscription_uniqueid(const struct stasis_subscription *sub)
Get the unique ID for the subscription.
Definition: stasis.c:1171
struct ao2_container * stasis_cache_get_all(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
Retrieve all matching entity items from the cache.
Definition: stasis_cache.c:587
struct ao2_container * stasis_cache_dump(struct stasis_cache *cache, struct stasis_message_type *type)
Dump cached items to a subscription for the ast_eid_default entity.
Definition: stasis_cache.c:736
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name)
Check if a topic exists in a pool.
Definition: stasis.c:1929
const char *(* snapshot_get_id)(struct stasis_message *message)
Callback extract a unique identity from a snapshot message.
Definition: stasis.h:1009
void(* cache_aggregate_publish_fn)(struct stasis_topic *topic, struct stasis_message *aggregate)
Callback to publish the aggregate cache entry message.
Definition: stasis.h:1055
struct ao2_container * stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
Dump cached items to a subscription for a specific entity.
Definition: stasis_cache.c:718
struct stasis_message *(* cache_aggregate_calc_fn)(struct stasis_cache_entry *entry, struct stasis_message *new_snapshot)
Callback to calculate the aggregate cache entry.
Definition: stasis.h:1031
struct ast_event * stasis_message_to_event(struct stasis_message *msg)
Build the Generic event system representation of the message.
int stasis_message_type_id(const struct stasis_message_type *type)
Gets the id of a given message type.
void stasis_subscription_accept_formatters(struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters)
Indicate to a subscription that we are interested in messages with one or more formatters.
Definition: stasis.c:1095
struct stasis_message * stasis_cache_entry_get_remote(struct stasis_cache_entry *entry, int idx)
Get a remote entity's cache entry snapshot by index.
Definition: stasis_cache.c:375
struct stasis_caching_topic * stasis_caching_unsubscribe_and_join(struct stasis_caching_topic *caching_topic)
Unsubscribes a caching topic from its upstream topic, blocking until all messages have been forwarded...
Definition: stasis_cache.c:146
struct stasis_message * stasis_cache_get(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
Retrieve an item from the cache for the ast_eid_default entity.
Definition: stasis_cache.c:686
struct ao2_container * stasis_cache_dump_all(struct stasis_cache *cache, struct stasis_message_type *type)
Dump all entity items from the cache to a subscription.
Definition: stasis_cache.c:757
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
Definition: stasis.c:1513
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition: stasis.c:619
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
Returns whether a subscription is currently subscribed.
Definition: stasis.c:1152
enum stasis_subscription_message_formatters stasis_message_type_available_formatters(const struct stasis_message_type *message_type)
Get a bitmap of available formatters for a message type.
struct stasis_cache * stasis_cache_create_full(snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_publish_fn aggregate_publish_fn)
Create a cache.
Definition: stasis_cache.c:334
struct ast_manager_event_blob * stasis_message_to_ami(struct stasis_message *msg)
Build the AMI representation of the message.
const struct ast_eid * stasis_message_eid(const struct stasis_message *msg)
Get the entity id for a stasis_message.
struct stasis_topic_pool * stasis_topic_pool_create(struct stasis_topic *pooled_topic)
Create a topic pool that routes messages from dynamically generated topics to the given topic.
Definition: stasis.c:1835
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:629
stasis_message_type_result
Return code for Stasis message type creation attempts.
Definition: stasis.h:285
@ STASIS_MESSAGE_TYPE_DECLINED
Definition: stasis.h:288
@ STASIS_MESSAGE_TYPE_ERROR
Definition: stasis.h:286
@ STASIS_MESSAGE_TYPE_SUCCESS
Definition: stasis.h:287
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
int stasis_caching_accept_message_type(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type)
Indicate to a caching topic that we are interested in a message type.
Definition: stasis_cache.c:90
struct stasis_caching_topic * stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
Unsubscribes a caching topic from its upstream topic.
Definition: stasis_cache.c:119
int stasis_message_can_be_ami(struct stasis_message *msg)
Determine if the given message can be converted to AMI.
Generic container type.
Structure representing a snapshot of channel state.
Main Channel structure associated with a channel.
An Entity ID is essentially a MAC address, brief and unique.
Definition: utils.h:808
An event.
Definition: event.c:81
Abstract JSON element (object, array, string, int, ...).
Struct containing info for an AMI event to send out.
Definition: manager.h:490
A multi object blob data structure to carry user event stasis messages.
Definition: stasis.c:1952
struct ast_json * blob
Definition: stasis.c:1953
Definition: search.h:40
Definition: stasis_cache.c:173
Cache update message.
Definition: stasis.h:965
struct stasis_message_type * type
Convenience reference to snapshot type.
Definition: stasis.h:967
struct stasis_message * old_snapshot
Old value from the cache.
Definition: stasis.h:969
struct stasis_message * new_snapshot
New value.
Definition: stasis.h:971
cache_aggregate_calc_fn aggregate_calc_fn
Definition: stasis_cache.c:49
snapshot_get_id id_fn
Definition: stasis_cache.c:48
cache_aggregate_publish_fn aggregate_publish_fn
Definition: stasis_cache.c:50
struct stasis_topic * original_topic
Definition: stasis_cache.c:58
Forwarding information.
Definition: stasis.c:1533
struct stasis_topic * from_topic
Definition: stasis.c:1535
struct stasis_topic * to_topic
Definition: stasis.c:1537
Structure containing callbacks for Stasis message sanitization.
Definition: stasis.h:200
int(* channel)(const struct ast_channel *chan)
Callback which determines whether a channel should be sanitized from a message based on the channel.
Definition: stasis.h:232
int(* channel_snapshot)(const struct ast_channel_snapshot *snapshot)
Callback which determines whether a channel should be sanitized from a message based on the channel's...
Definition: stasis.h:221
int(* channel_id)(const char *channel_id)
Callback which determines whether a channel should be sanitized from a message based on the channel's...
Definition: stasis.h:210
Virtual table providing methods for messages.
Definition: stasis.h:239
struct ast_eid eid
Holds details about changes to subscriptions for the specified topic.
Definition: stasis.h:890
struct stasis_topic * topic
Definition: stasis.h:891
struct stasis_topic * topic
Definition: stasis.c:686
stasis_subscription_cb callback
Definition: stasis.c:690
char * detail
Definition: stasis.c:391
Utility functions.