Asterisk - The Open Source Telephony Project GIT-master-6144b6b
stasis.h
1/*
2 * Asterisk -- An open source telephony toolkit.
3 *
4 * Copyright (C) 2013, Digium, Inc.
5 *
6 * David M. Lee, II <dlee@digium.com>
7 *
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
13 *
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
17 */
18
19#ifndef _ASTERISK_STASIS_H
20#define _ASTERISK_STASIS_H
21
22/*! \file
23 *
24 * \brief Stasis Message Bus API. See \ref stasis "Stasis Message Bus API" for
25 * detailed documentation.
26 *
27 * \author David M. Lee, II <dlee@digium.com>
28 * \since 12
29 *
30 * \page stasis Stasis Message Bus API
31 *
32 * \par Intro
33 *
34 * The Stasis Message Bus is a loosely typed mechanism for distributing messages
35 * within Asterisk. It is designed to be:
36 * - Loosely coupled; new message types can be added in separate modules.
37 * - Easy to use; publishing and subscribing are straightforward operations.
38 *
39 * There are three main concepts for using the Stasis Message Bus:
40 * - \ref stasis_message
41 * - \ref stasis_topic
42 * - \ref stasis_subscription
43 *
44 * \par stasis_message
45 *
46 * Central to the Stasis Message Bus is the \ref stasis_message, the messages
47 * that are sent on the bus. These messages have:
48 * - a type (as defined by a \ref stasis_message_type)
49 * - a value - a \c void pointer to an AO2 object
50 * - a timestamp when it was created
51 *
52 * Once a \ref stasis_message has been created, it is immutable and cannot
53 * change. The same goes for the value of the message (although this cannot be
54 * enforced in code). Messages themselves are reference-counted, AO2 objects,
55 * along with their values. By being both reference counted and immutable,
56 * messages can be shared throughout the system without any concerns for
57 * threading.
58 *
59 * The type of a message is defined by an instance of \ref stasis_message_type,
60 * which can be created by calling stasis_message_type_create(). Message types
61 * are named, which is useful in debugging. It is recommended that the string
62 * name for a message type match the name of the struct that's stored in the
63 * message. For example, the name for \ref stasis_cache_update's message type is \c
64 * "stasis_cache_update".
65 *
66 * \par stasis_topic
67 *
68 * A \ref stasis_topic is an object to which stasis_topic_subscriber's may be
69 * subscribed, and \ref stasis_message's may be published. Any message published
70 * to the topic is dispatched to all of its subscribers. The topic itself may be
71 * named, which is useful in debugging.
72 *
73 * Topics themselves are reference counted objects. Since topics are referred to
74 * by their subscribers, they will not be freed until all of their subscribers
75 * have unsubscribed. Topics are also thread safe, so publishing, subscribing,
76 * and unsubscribing may be done concurrently from multiple threads. Topics
77 * are also designed to handle the case of unsubscribing from a topic
78 * from within the subscription handler.
79 *
80 * \par Forwarding
81 *
82 * There is one special case of topics that's worth noting: forwarding
83 * messages. It's a fairly common use case to want to forward all the messages
84 * published on one topic to another one (for example, an aggregator topic that
85 * publishes all the events from a set of other topics). This can be
86 * accomplished easily using stasis_forward_all(). This sets up the forwarding
87 * between the two topics, and returns a \ref stasis_subscription, which can be
88 * unsubscribed to stop the forwarding.
89 *
90 * \par Caching
91 *
92 * Another common use case is to want to cache certain messages that are
93 * published on the bus. Usually these events are snapshots of the current state
94 * in the system, and it's desirable to query that state from the cache without
95 * locking the original object. It's also desirable for subscribers of the
96 * caching topic to receive messages that have both the old cache value and the
97 * new value being put into the cache. For this, we have stasis_cache_create()
98 * and stasis_caching_topic_create(), providing them with the topic which
99 * publishes the messages that you wish to cache, and a function that can
100 * identify cacheable messages.
101 *
102 * The \ref stasis_cache is designed so that it may be shared amongst several
103 * \ref stasis_caching_topic objects. This allows you to have individual caching
104 * topics per-object (i.e. so you can subscribe to updates for a single object),
105 * and still have a single cache to query for the state of all objects. While a
106 * cache may be shared amongst different message types, such a usage is probably
107 * not a good idea.
108 *
109 * The \ref stasis_cache can only be written to by \ref stasis_caching_topic.
110 * It's a thread safe container, so freely use the stasis_cache_get() and
111 * stasis_cache_dump() to query the cache.
112 *
113 * The \ref stasis_caching_topic discards non-cacheable messages. A cacheable
114 * message is wrapped in a \ref stasis_cache_update message which provides the
115 * old snapshot (or \c NULL if this is a new cache entry), and the new snapshot
116 * (or \c NULL if the entry was removed from the cache). A
117 * stasis_cache_clear_create() message must be sent to the topic in order to
118 * remove entries from the cache.
119 *
120 * In order to unsubscribe a \ref stasis_caching_topic from the upstream topic,
121 * call stasis_caching_unsubscribe(). Due to cyclic references, the \ref
122 * stasis_caching_topic will not be freed until after it has been unsubscribed,
123 * and all other ao2_ref()'s have been cleaned up.
124 *
125 * The \ref stasis_cache object is a normal AO2 managed object, which can be
126 * released with ao2_cleanup().
127 *
128 * \par stasis_topic_subscriber
129 *
130 * Any topic may be subscribed to by simply providing stasis_subscribe() the
131 * \ref stasis_topic to subscribe to, a handler function and \c void pointer to
132 * data that is passed back to the handler. Invocations on the subscription's
133 * handler are serialized, but different invocations may occur on different
134 * threads (this usually isn't important unless you use thread locals or
135 * something similar).
136 *
137 * In order to stop receiving messages, call stasis_unsubscribe() with your \ref
138 * stasis_subscription. Due to cyclic references, the \ref
139 * stasis_subscription will not be freed until after it has been unsubscribed,
140 * and all other ao2_ref()'s have been cleaned up.
141 *
142 * \par Shutdown
143 *
144 * Subscriptions have two options for unsubscribing, depending upon the context
145 * in which you need to unsubscribe.
146 *
147 * If your subscription is owned by a module, and you must unsubscribe from the
148 * module_unload() function, then you'll want to use the
149 * stasis_unsubscribe_and_join() function. This will block until the final
150 * message has been received on the subscription. Otherwise, there's the danger
151 * of invoking the callback function after it has been unloaded.
152 *
153 * If your subscription is owned by an object, then your object should have an
154 * explicit shutdown() function, which calls stasis_unsubscribe(). In your
155 * subscription handler, when the stasis_subscription_final_message() has been
156 * received, decrement the refcount on your object. In your object's destructor,
157 * you may assert stasis_subscription_is_done() to validate that the
158 * subscription's callback will no longer be invoked.
159 *
160 * \b Note: You may be tempted to simply call stasis_unsubscribe_and_join() from
161 * an object's destructor. While code that does this may work most of the time,
162 * it's got one big downside. There's a general assumption that object
163 * destruction is non-blocking. If you block the destruction waiting for the
164 * subscription to complete, there's the danger that the subscription may
165 * process a message which will bump the refcount up by one. Then it does
166 * whatever it does, decrements the refcount, which then proceeds to re-destroy
167 * the object. Now you've got hard to reproduce bugs that only show up under
168 * certain loads.
169 */
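
The Caching paragraphs above describe how each cacheable message is republished as an update that carries both the old and the new snapshot. The following is a minimal, self-contained sketch of that idea only, not Asterisk's implementation: `demo_cache`, `demo_cache_update`, and `demo_cache_put()` are hypothetical names, the cache holds a single entry, and snapshots are plain strings rather than AO2 objects.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for a cache update: the old and the new snapshot. */
struct demo_cache_update {
	const char *old_snapshot; /* NULL if this is a new cache entry */
	const char *new_snapshot; /* NULL if the entry was removed */
};

/* Hypothetical single-entry cache; a real cache would be keyed per object. */
struct demo_cache {
	const char *entry;
};

/* Store a snapshot (or NULL to clear the entry) and report what changed. */
static struct demo_cache_update demo_cache_put(struct demo_cache *cache, const char *snapshot)
{
	struct demo_cache_update update;

	update.old_snapshot = cache->entry;
	update.new_snapshot = snapshot;
	cache->entry = snapshot;
	return update;
}
```

A subscriber that receives such an update can compare the two snapshots without ever locking the original object, which is the motivation given for the cache.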
170
171#include "asterisk/json.h"
172#include "asterisk/manager.h"
173#include "asterisk/utils.h"
174#include "asterisk/event.h"
175
176/*!
177 * \brief Metadata about a \ref stasis_message.
178 * \since 12
179 */
180struct stasis_message_type;
181
182/*!
183 * \brief Opaque type for a Stasis message.
184 * \since 12
185 */
186struct stasis_message;
187
188/*!
189 * \brief Opaque type for a Stasis subscription.
190 * \since 12
191 */
192struct stasis_subscription;
193
194/*!
195 * \brief Structure containing callbacks for Stasis message sanitization
196 *
197 * \note If either callback is implemented, both should be implemented since
198 * not all callers may have access to the full snapshot.
199 */
200struct stasis_message_sanitizer {
201 /*!
202 * \brief Callback which determines whether a channel should be sanitized from
203 * a message based on the channel's unique ID
204 *
205 * \param channel_id The unique ID of the channel
206 *
207 * \retval non-zero if the channel should be left out of the message
208 * \retval zero if the channel should remain in the message
209 */
210 int (*channel_id)(const char *channel_id);
211
212 /*!
213 * \brief Callback which determines whether a channel should be sanitized from
214 * a message based on the channel's snapshot
215 *
216 * \param snapshot A snapshot generated from the channel
217 *
218 * \retval non-zero if the channel should be left out of the message
219 * \retval zero if the channel should remain in the message
220 */
221 int (*channel_snapshot)(const struct ast_channel_snapshot *snapshot);
222
223 /*!
224 * \brief Callback which determines whether a channel should be sanitized from
225 * a message based on the channel
226 *
227 * \param chan The channel to be checked
228 *
229 * \retval non-zero if the channel should be left out of the message
230 * \retval zero if the channel should remain in the message
231 */
232 int (*channel)(const struct ast_channel *chan);
233};
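
A sanitizer is a table of predicates that return non-zero when a channel should be left out of a message. The sketch below illustrates that contract with a stand-in type, because the real structure depends on Asterisk headers. `demo_sanitizer`, `demo_hide_internal()`, `demo_should_sanitize()`, and the "internal-" prefix policy are all hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Stand-in that mirrors only the channel_id callback; hypothetical type. */
struct demo_sanitizer {
	int (*channel_id)(const char *channel_id);
};

/* Hypothetical policy: hide channels whose unique ID starts with "internal-". */
static int demo_hide_internal(const char *channel_id)
{
	return channel_id != NULL && strncmp(channel_id, "internal-", 9) == 0;
}

/* Returns non-zero if the channel should be left out of the message. */
static int demo_should_sanitize(const struct demo_sanitizer *sanitize, const char *channel_id)
{
	if (sanitize == NULL || sanitize->channel_id == NULL) {
		return 0; /* no sanitizer installed: keep the channel */
	}
	return sanitize->channel_id(channel_id);
}
```

Treating a missing sanitizer or a missing callback as "keep the channel" is a design choice of this sketch, not something the header states.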
234
235/*!
236 * \brief Virtual table providing methods for messages.
237 * \since 12
238 */
239struct stasis_message_vtable {
240 /*!
241 * \brief Build the JSON representation of the message.
242 *
243 * May be \c NULL, or may return \c NULL, to indicate no representation.
244 * The returned object should be ast_json_unref()'ed.
245 *
246 * \param message Message to convert to JSON string.
247 * \param sanitize Snapshot sanitization callback.
248 *
249 * \return Newly allocated JSON message.
250 * \retval NULL if JSON format is not supported.
251 */
252 struct ast_json *(*to_json)(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize);
253
254 /*!
255 * \brief Build the AMI representation of the message.
256 *
257 * May be \c NULL, or may return \c NULL, to indicate no representation.
258 * The returned object should be ao2_cleanup()'ed.
259 *
260 * \param message Message to convert to AMI string.
261 * \return Newly allocated \ref ast_manager_event_blob.
262 * \retval NULL if AMI format is not supported.
263 */
264 struct ast_manager_event_blob *(*to_ami)(
265 struct stasis_message *message);
266
267 /*!
268 * \since 12.3.0
269 * \brief Build the \ref ast_event representation of the message.
270 *
271 * May be \c NULL, or may return \c NULL, to indicate no representation.
272 * The returned object should be free'd.
273 *
274 * \param message Message to convert to an \ref ast_event.
275 * \return Newly allocated \ref ast_event.
276 * \retval NULL if \ref ast_event format is not supported.
277 */
278 struct ast_event *(*to_event)(
279 struct stasis_message *message);
280};
281
282/*!
283 * \brief Return code for Stasis message type creation attempts
284 */
285enum stasis_message_type_result {
286 STASIS_MESSAGE_TYPE_ERROR = -1, /*!< Message type was not created due to allocation failure */
287 STASIS_MESSAGE_TYPE_SUCCESS, /*!< Message type was created successfully */
288 STASIS_MESSAGE_TYPE_DECLINED, /*!< Message type was not created due to configuration */
289};
290
291/*!
292 * \brief Stasis subscription message filters
293 */
294enum stasis_subscription_message_filter {
295 STASIS_SUBSCRIPTION_FILTER_NONE = 0, /*!< No filter is in place, all messages are raised */
296 STASIS_SUBSCRIPTION_FILTER_FORCED_NONE, /*!< No filter is in place or can be set, all messages are raised */
297 STASIS_SUBSCRIPTION_FILTER_SELECTIVE, /*!< Only messages of allowed message types are raised */
298};
299
300/*!
301 * \brief Stasis subscription formatter filters
302 *
303 * There should be an entry here for each member of \ref stasis_message_vtable
304 *
305 * \since 13.25.0
306 * \since 16.2.0
307 */
308enum stasis_subscription_message_formatters {
310 STASIS_SUBSCRIPTION_FORMATTER_JSON = 1 << 0, /*!< Allow messages with a to_json formatter */
311 STASIS_SUBSCRIPTION_FORMATTER_AMI = 1 << 1, /*!< Allow messages with a to_ami formatter */
312 STASIS_SUBSCRIPTION_FORMATTER_EVENT = 1 << 2, /*!< Allow messages with a to_event formatter */
313};
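
Because the formatter values are distinct bits, several can be combined into one bitmap and tested with a bitwise AND. The sketch below copies the three bit values from the enum above into a stand-in enum so that it compiles on its own. `demo_formatters` and `demo_formatters_match()` are hypothetical names, and the matching rule (at least one wanted formatter is available) is an assumption about how such a bitmap would typically be used.

```c
#include <assert.h>

/* Stand-in with the same bit layout as the formatter flags above. */
enum demo_formatters {
	DEMO_FORMATTER_JSON = 1 << 0,
	DEMO_FORMATTER_AMI = 1 << 1,
	DEMO_FORMATTER_EVENT = 1 << 2
};

/* Non-zero if at least one wanted formatter is available on the message type. */
static int demo_formatters_match(unsigned int wanted, unsigned int available)
{
	return (wanted & available) != 0;
}
```

For example, a subscriber that wants `DEMO_FORMATTER_JSON | DEMO_FORMATTER_AMI` matches a message type that only offers AMI, but not one that only offers the event formatter.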
314
315/*!
316 * \brief Create a new message type.
317 *
318 * \ref stasis_message_type is an AO2 object, so ao2_cleanup() when you're done
319 * with it.
320 *
321 * \param name Name of the new type.
322 * \param vtable Virtual table of message methods. May be \c NULL.
323 * \param[out] result The location where the new message type will be placed
324 *
325 * \note Stasis message type creation may be declined if the message type is disabled
326 *
327 * \returns A stasis_message_type_result enum
328 * \since 12
329 */
330enum stasis_message_type_result stasis_message_type_create(const char *name,
331 struct stasis_message_vtable *vtable, struct stasis_message_type **result);
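
Message type creation has three outcomes, and a caller usually has to decide whether "declined" counts as a failure. The sketch below shows one possible policy using a stand-in enum with the same ordering as the result codes above. `demo_type_result` and `demo_handle_type_result()` are hypothetical, and treating a declined type as non-fatal is an assumption of this sketch.

```c
#include <assert.h>

/* Stand-in with the same ordering as the creation result codes. */
enum demo_type_result {
	DEMO_TYPE_ERROR = -1,
	DEMO_TYPE_SUCCESS,
	DEMO_TYPE_DECLINED
};

/* Returns 0 if initialization may continue, -1 if it should abort. */
static int demo_handle_type_result(enum demo_type_result res)
{
	switch (res) {
	case DEMO_TYPE_SUCCESS:
		return 0;
	case DEMO_TYPE_DECLINED:
		/* Disabled by configuration: no type exists, but nothing failed. */
		return 0;
	case DEMO_TYPE_ERROR:
	default:
		return -1;
	}
}
```

Under this policy, code that later uses the type must still cope with a `NULL` message type when creation was declined.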
332
333/*!
334 * \brief Gets the name of a given message type
335 * \param type The type to get.
336 * \return Name of the type.
337 * \retval NULL if \a type is \c NULL.
338 * \since 12
339 */
340const char *stasis_message_type_name(const struct stasis_message_type *type);
341
342/*!
343 * \brief Gets the hash of a given message type
344 * \param type The type to get the hash of.
345 * \return The hash
346 * \since 13.24.0
347 */
348unsigned int stasis_message_type_hash(const struct stasis_message_type *type);
349
350/*!
351 * \brief Gets the id of a given message type
352 * \param type The type to get the id of.
353 * \return The id
354 * \since 17.0.0
355 */
357
358/*!
359 * \brief Check whether a message type is declined
360 *
361 * \param name The name of the message type to check
362 *
363 * \retval zero The message type is not declined
364 * \retval non-zero The message type is declined
365 */
366int stasis_message_type_declined(const char *name);
367
368/*!
369 * \brief Create a new message.
370 *
371 * This message is an \c ao2 object, and must be ao2_cleanup()'ed when you are done
372 * with it. Messages are also immutable, and must not be modified after they
373 * are initialized. Especially the \a data in the message.
374 *
375 * \param type Type of the message
376 * \param data Immutable data that is the actual contents of the message
377 *
378 * \return New message
379 * \retval NULL on error
380 *
381 * \since 12
382 */
384
385/*!
386 * \brief Create a new message for an entity.
387 *
388 * This message is an \c ao2 object, and must be ao2_cleanup()'ed when you are done
389 * with it. Messages are also immutable, and must not be modified after they
390 * are initialized. Especially the \a data in the message.
391 *
392 * \param type Type of the message
393 * \param data Immutable data that is the actual contents of the message
394 * \param eid What entity originated this message. (NULL for aggregate)
395 *
396 * \note An aggregate message is a combined representation of the local
397 * and remote entities publishing the message data. e.g., An aggregate
398 * device state represents the combined device state from the local and
399 * any remote entities publishing state for a device. e.g., An aggregate
400 * MWI message is the old/new MWI counts accumulated from the local and
401 * any remote entities publishing to a mailbox.
402 *
403 * \return New message
404 * \retval NULL on error
405 *
406 * \since 12.2.0
407 */
409
410/*!
411 * \brief Get the entity id for a \ref stasis_message.
412 * \since 12.2.0
413 *
414 * \param msg Message to get eid.
415 *
416 * \return Entity id of \a msg
417 * \retval NULL if \a msg is an aggregate or \a msg is \c NULL.
418 */
419const struct ast_eid *stasis_message_eid(const struct stasis_message *msg);
420
421/*!
422 * \brief Get the message type for a \ref stasis_message.
423 * \param msg Message to get the type of.
424 * \return Type of \a msg
425 * \retval NULL if \a msg is \c NULL.
426 * \since 12
427 */
429
430/*!
431 * \brief Get the data contained in a message.
432 * \param msg Message.
433 * \return Immutable data pointer
434 * \retval NULL if msg is \c NULL.
435 * \since 12
436 */
437void *stasis_message_data(const struct stasis_message *msg);
438
439/*!
440 * \brief Get the time when a message was created.
441 * \param msg Message.
442 * \return Pointer to the \a timeval when the message was created.
443 * \retval NULL if msg is \c NULL.
444 * \since 12
445 */
446const struct timeval *stasis_message_timestamp(const struct stasis_message *msg);
447
448/*!
449 * \brief Build the JSON representation of the message.
450 *
451 * May return \c NULL, to indicate no representation. The returned object should
452 * be ast_json_unref()'ed.
453 *
454 * \param msg Message to convert to JSON string.
455 * \param sanitize Snapshot sanitization callback.
456 *
457 * \return Newly allocated JSON message.
458 * \retval NULL if JSON format is not supported.
459 */
460struct ast_json *stasis_message_to_json(struct stasis_message *msg, struct stasis_message_sanitizer *sanitize);
461
462/*!
463 * \brief Build the AMI representation of the message.
464 *
465 * May return \c NULL, to indicate no representation. The returned object should
466 * be ao2_cleanup()'ed.
467 *
468 * \param msg Message to convert to AMI.
469 * \retval NULL if AMI format is not supported.
470 */
472
473/*!
474 * \brief Determine if the given message can be converted to AMI.
475 *
476 * \param msg Message to see if can be converted to AMI.
477 *
478 * \retval 0 Cannot be converted
479 * \retval non-zero Can be converted
480 */
482
483/*!
484 * \brief Build the \ref AstGenericEvents representation of the message.
485 *
486 * May return \c NULL, to indicate no representation. The returned object should
487 * be disposed of via \ref ast_event_destroy.
488 *
489 * \param msg Message to convert to an \ref ast_event.
490 * \retval NULL if \ref ast_event format is not supported.
491 */
493
494/*!
495 * \brief A topic to which messages may be posted, and subscribers, well, subscribe
496 * \since 12
497 */
498struct stasis_topic;
499
500/*!
501 * \brief Create a new topic.
502 * \param name Name of the new topic.
503 * \return New topic instance.
504 * \retval NULL on error.
505 * \since 12
506 *
507 * \note There is no explicit ability to unsubscribe all subscribers
508 * from a topic and destroy it. As a result the topic can persist until
509 * the last subscriber unsubscribes itself even if there is no
510 * publisher.
511 *
512 * \note Topic names should be in the form of
513 * \verbatim <subsystem>:<functionality>[/<object>] \endverbatim
514 */
515struct stasis_topic *stasis_topic_create(const char *name);
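
The recommended naming form `<subsystem>:<functionality>[/<object>]` can be produced with a small helper before the name is handed to stasis_topic_create(). This is a minimal sketch: `demo_topic_name()` is a hypothetical helper that is not part of the Stasis API, and the example names in the usage note are illustrative only.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Build "<subsystem>:<functionality>[/<object>]" into buf.
 * object may be NULL. Returns 0 on success, -1 on error or truncation.
 */
static int demo_topic_name(char *buf, size_t len, const char *subsystem,
	const char *functionality, const char *object)
{
	int n;

	if (object != NULL) {
		n = snprintf(buf, len, "%s:%s/%s", subsystem, functionality, object);
	} else {
		n = snprintf(buf, len, "%s:%s", subsystem, functionality);
	}
	return (n < 0 || (size_t) n >= len) ? -1 : 0;
}
```

Calling it with `("channel", "all", NULL)` yields `channel:all`, and adding an object such as `"PJSIP/alice"` to `("endpoint", "state", ...)` yields `endpoint:state/PJSIP/alice`.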
516
517/*!
518 * \brief Create a new topic with given detail.
519 * \param name Name of the new topic.
520 * \param detail Detail description of the new topic, e.g. "Queue main topic for subscribing every queue event"
521 * \return New topic instance.
522 * \retval NULL on error.
523 *
524 * \note There is no explicit ability to unsubscribe all subscribers
525 * from a topic and destroy it. As a result the topic can persist until
526 * the last subscriber unsubscribes itself even if there is no
527 * publisher.
528 */
530 const char *name, const char *detail);
531
532/*!
533 * \brief Get a topic of the given name.
534 * \param name Topic's name.
535 * \return Topic with the given name.
536 * \retval NULL on error or if the topic does not exist.
537 *
538 * \note This SHOULD NOT be used in normal operation for publishing messages.
539 */
540struct stasis_topic *stasis_topic_get(const char *name);
541
542/*!
543 * \brief Return the uniqueid of a topic.
544 * \param topic Topic.
545 * \return Uniqueid of the topic.
546 * \retval NULL if topic is \c NULL.
547 */
548const char *stasis_topic_uniqueid(const struct stasis_topic *topic);
549
550/*!
551 * \brief Return the name of a topic.
552 * \param topic Topic.
553 * \return Name of the topic.
554 * \retval NULL if topic is \c NULL.
555 */
556const char *stasis_topic_name(const struct stasis_topic *topic);
557
558/*!
559 * \brief Return the detail of a topic.
560 * \param topic Topic.
561 * \return Detail of the topic.
562 * \retval NULL if topic is \c NULL.
563 * \since 12
564 */
565const char *stasis_topic_detail(const struct stasis_topic *topic);
566
567/*!
568 * \brief Return the number of subscribers of a topic.
569 * \param topic Topic.
570 * \return Number of subscribers of the topic.
571 * \since 17.0.0
572 */
573size_t stasis_topic_subscribers(const struct stasis_topic *topic);
574
575/*!
576 * \brief Publish a message to a topic's subscribers.
577 * \param topic Topic.
578 * \param message Message to publish.
579 *
580 * This call is asynchronous and will return immediately upon queueing
581 * the message for delivery to the topic's subscribers.
582 *
583 * \since 12
584 */
585void stasis_publish(struct stasis_topic *topic, struct stasis_message *message);
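
The relationship between a topic, its subscribers, and a published message can be shown with a toy model. This is a sketch under loud assumptions: every `demo_` name is hypothetical, messages are plain strings, and the callbacks run inline in the publisher, whereas stasis_publish() is documented above as asynchronous.

```c
#include <assert.h>
#include <stddef.h>

#define DEMO_MAX_SUBS 8

typedef void (*demo_cb)(void *data, const char *message);

/* Toy topic: a fixed-size list of callbacks and their user data. */
struct demo_topic {
	demo_cb callbacks[DEMO_MAX_SUBS];
	void *data[DEMO_MAX_SUBS];
	size_t count;
};

/* Register a callback; returns 0 on success, -1 if the topic is full. */
static int demo_subscribe(struct demo_topic *topic, demo_cb callback, void *data)
{
	if (topic->count >= DEMO_MAX_SUBS) {
		return -1;
	}
	topic->callbacks[topic->count] = callback;
	topic->data[topic->count] = data;
	topic->count++;
	return 0;
}

/* Deliver the message to every subscriber (inline, unlike the real bus). */
static void demo_publish(struct demo_topic *topic, const char *message)
{
	size_t i;

	for (i = 0; i < topic->count; i++) {
		topic->callbacks[i](topic->data[i], message);
	}
}

/* Example handler: counts how many messages it has seen. */
static void demo_count_cb(void *data, const char *message)
{
	(void) message;
	++*(int *) data;
}
```

The `data` pointer plays the same role as the `data` argument of stasis_subscribe(): it is opaque to the topic and handed back to the handler on every invocation.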
586
587/*!
588 * \brief Publish a message to a topic's subscribers, synchronizing
589 * on the specified subscriber
590 * \param sub Subscription to synchronize on.
591 * \param message Message to publish.
592 *
593 * The caller of stasis_publish_sync will block until the specified
594 * subscriber completes handling of the message.
595 *
596 * All other subscribers to the topic the \ref stasis_subscription
597 * is subscribed to are also delivered the message; this delivery however
598 * happens asynchronously.
599 *
600 * \since 12.1.0
601 */
603
604/*!
605 * \brief Callback function type for Stasis subscriptions.
606 * \param data Data field provided with subscription.
607 * \param sub Subscription published on.
608 * \param message Published message.
609 * \since 12
610 */
611typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message);
612
613/*!
614 * \brief Stasis subscription callback function that does nothing.
615 *
616 * \note This callback should be used for events that are not directly processed, but need
617 * to be generated so data can be retrieved from cache later. Subscriptions with this
618 * callback can be released with \ref stasis_unsubscribe, even during module unload.
619 *
620 * \since 13.5
621 */
623
624/*!
625 * \brief Create a subscription.
626 *
627 * In addition to being AO2 managed memory (requiring an ao2_cleanup() to free
628 * up this reference), the subscription must be explicitly unsubscribed from its
629 * topic using stasis_unsubscribe().
630 *
631 * The invocations of the callback are serialized, but may not always occur on
632 * the same thread. The invocation order of different subscriptions is
633 * unspecified.
634 *
635 * \param topic Topic to subscribe to.
636 * \param callback Callback function for subscription messages.
637 * \param data Data to be passed to the callback, in addition to the message.
638 * \param file, lineno, func
639 * \return New \ref stasis_subscription object.
640 * \retval NULL on error.
641 * \since 12
642 *
643 * \note The callback will be invoked with a message indicating that it
644 * has been subscribed. This occurs immediately before accepted message
645 * types can be set, so the callback must expect to receive it.
646 */
647struct stasis_subscription *__stasis_subscribe(struct stasis_topic *topic,
648 stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func);
649#define stasis_subscribe(topic, callback, data) __stasis_subscribe(topic, callback, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
650
651/*!
652 * \brief Create a subscription whose callbacks occur on a task pool
653 *
654 * In addition to being AO2 managed memory (requiring an ao2_cleanup() to free
655 * up this reference), the subscription must be explicitly unsubscribed from its
656 * topic using stasis_unsubscribe().
657 *
658 * The invocations of the callback are serialized, but will almost certainly not
659 * always happen on the same thread. The invocation order of different subscriptions
660 * is unspecified.
661 *
662 * Unlike \ref stasis_subscribe, this function will explicitly use a taskpool to
663 * dispatch items to its \c callback. This form of subscription should be used
664 * when many subscriptions may be made to the specified \c topic.
665 *
666 * \param topic Topic to subscribe to.
667 * \param callback Callback function for subscription messages.
668 * \param data Data to be passed to the callback, in addition to the message.
669 * \param file, lineno, func
670 * \return New \ref stasis_subscription object.
671 * \retval NULL on error.
672 * \since 12.8.0
673 *
674 * \note The callback will be invoked with a message indicating that it
675 * has been subscribed. This occurs immediately before accepted message
676 * types can be set, so the callback must expect to receive it.
677 */
678struct stasis_subscription *__stasis_subscribe_pool(struct stasis_topic *topic,
679 stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func);
680#define stasis_subscribe_pool(topic, callback, data) __stasis_subscribe_pool(topic, callback, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
681
682/*!
683 * \brief Create a subscription whose callbacks occur synchronously on message publishing
684 * \since 23.5.0
685 * \since 22.11.0
686 * \since 20.21.0
687 *
688 * In addition to being AO2 managed memory (requiring an ao2_cleanup() to free
689 * up this reference), the subscription must be explicitly unsubscribed from its
690 * topic using stasis_unsubscribe().
691 *
692 * The invocations of the callback are serialized, but will almost certainly not
693 * always happen on the same thread. The invocation order of different subscriptions
694 * is unspecified.
695 *
696 * This subscription will be invoked on the same thread that is publishing the message.
697 *
698 * \param topic Topic to subscribe to.
699 * \param callback Callback function for subscription messages.
700 * \param data Data to be passed to the callback, in addition to the message.
701 * \param file, lineno, func
702 * \return New \ref stasis_subscription object.
703 * \retval NULL on error.
704 *
705 * \note The callback will be invoked with a message indicating that it
706 * has been subscribed. This occurs immediately before accepted message
707 * types can be set, so the callback must expect to receive it.
708 */
709struct stasis_subscription *__stasis_subscribe_synchronous(struct stasis_topic *topic,
710 stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func);
711#define stasis_subscribe_synchronous(topic, callback, data) __stasis_subscribe_synchronous(topic, callback, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
712
713/*!
714 * \brief Indicate to a subscription that we are interested in a message type.
715 *
716 * This will cause the subscription to allow the given message type to be
717 * raised to our subscription callback. This enables internal filtering in
718 * the stasis message bus to reduce messages.
719 *
720 * \param subscription Subscription to add message type to.
721 * \param type The message type we wish to receive.
722 * \retval 0 on success
723 * \retval -1 failure
724 *
725 * \since 17.0.0
726 *
727 * \note If you want to use stasis_subscription_final_message() you will need to
728 * accept \ref stasis_subscription_change_type as a message type.
729 *
730 * \note Until the subscription is set to selective filtering it is possible for it
731 * to receive messages of message types that would not normally be accepted.
732 */
734 const struct stasis_message_type *type);
735
736/*!
737 * \brief Indicate to a subscription that we are not interested in a message type.
738 *
739 * \param subscription Subscription to remove message type from.
740 * \param type The message type we don't wish to receive.
741 * \retval 0 on success
742 * \retval -1 failure
743 *
744 * \since 17.0.0
745 */
747 const struct stasis_message_type *type);
748
749/*!
750 * \brief Set the message type filtering level on a subscription
751 *
752 * This will cause the subscription to filter messages according to the
753 * provided filter level. For example if selective is used then only
754 * messages matching those provided to \ref stasis_subscription_accept_message_type
755 * will be raised to the subscription callback.
756 *
757 * \param subscription Subscription to set the filter level on.
758 * \param filter What filter to use
759 * \retval 0 on success
760 * \retval -1 failure
761 *
762 * \since 17.0.0
763 */
766
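
The interaction between accepted message types and the filter level can be modelled in a few lines. This is only an illustrative sketch: all `demo_` names are hypothetical, message types are reduced to small integer IDs stored in a bitmask, and refusing to change a forced-none filter is an assumption derived from the "or can be set" wording of the filter enum.

```c
#include <assert.h>

enum demo_filter {
	DEMO_FILTER_NONE = 0,
	DEMO_FILTER_FORCED_NONE,
	DEMO_FILTER_SELECTIVE
};

/* Toy subscription: filter level plus one bit per accepted type ID. */
struct demo_sub {
	enum demo_filter filter;
	unsigned int accepted;
};

/* Mark a type ID (0..31) as accepted; returns 0 on success, -1 on failure. */
static int demo_accept_type(struct demo_sub *sub, unsigned int type_id)
{
	if (type_id >= 32) {
		return -1;
	}
	sub->accepted |= 1u << type_id;
	return 0;
}

/* Change the filter level; a forced-none subscription cannot be changed. */
static int demo_set_filter(struct demo_sub *sub, enum demo_filter filter)
{
	if (sub->filter == DEMO_FILTER_FORCED_NONE) {
		return -1;
	}
	sub->filter = filter;
	return 0;
}

/* Non-zero if a message of the given type would reach the callback. */
static int demo_would_deliver(const struct demo_sub *sub, unsigned int type_id)
{
	if (sub->filter != DEMO_FILTER_SELECTIVE) {
		return 1; /* no filtering in place: everything is raised */
	}
	return type_id < 32 && (sub->accepted & (1u << type_id)) != 0;
}
```

The sketch also shows why types should be accepted before switching to selective filtering: until the level changes, every message type is still delivered.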
767/*!
768 * \brief Indicate to a subscription that we are interested in messages with one or more formatters.
769 *
770 * \param subscription Subscription to alter.
771 * \param formatters A bitmap of \ref stasis_subscription_message_formatters we wish to receive.
772 *
773 * \since 13.25.0
774 * \since 16.2.0
775 */
778
779/*!
780 * \brief Get a bitmap of available formatters for a message type
781 *
782 * \param message_type Message type
783 * \return A bitmap of \ref stasis_subscription_message_formatters
784 *
785 * \since 13.25.0
786 * \since 16.2.0
787 */
789 const struct stasis_message_type *message_type);
790
791/*!
792 * \brief Cancel a subscription.
793 *
794 * Note that in an asynchronous system, there may still be messages queued or
795 * in transit to the subscription's callback. These will still be delivered.
796 * There will be a final 'SubscriptionCancelled' message, indicating the
797 * delivery of the final message.
798 *
799 * \param subscription Subscription to cancel.
800 * \retval NULL for convenience
801 * \since 12
802 */
803struct stasis_subscription *stasis_unsubscribe(
804 struct stasis_subscription *subscription);
805
806/*!
807 * \brief Set the high and low alert water marks of the stasis subscription.
808 * \since 13.10.0
809 *
810 * \param subscription Pointer to a stasis subscription
811 * \param low_water New queue low water mark. (-1 to set as 90% of high_water)
812 * \param high_water New queue high water mark.
813 *
814 * \retval 0 on success.
815 * \retval -1 on error (water marks not changed).
816 */
818 long low_water, long high_water);
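
The documented shortcut, passing -1 as the low water mark to get 90% of the high water mark, can be expressed as a small calculation. `demo_resolve_water_marks()` is a hypothetical helper, and its validation rules (rejecting a negative high mark, a low mark above the high mark, or a low mark below -1) are assumptions of this sketch rather than documented behaviour.

```c
#include <assert.h>

/*
 * Resolve the effective low water mark.
 * Returns 0 and stores the result in *low_out, or -1 if the pair is rejected.
 */
static int demo_resolve_water_marks(long low_water, long high_water, long *low_out)
{
	if (high_water < 0 || low_water < -1 || low_water > high_water) {
		return -1; /* assumed validation; marks are left unchanged */
	}
	if (low_water == -1) {
		low_water = (high_water * 9) / 10; /* 90% of the high water mark */
	}
	*low_out = low_water;
	return 0;
}
```

With a high water mark of 500 and a low water mark of -1, the effective low water mark becomes 450.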
819
820/*!
821 * \brief Block until the last message is processed on a subscription.
822 *
823 * This function will not return until the \a subscription's callback for the
824 * stasis_subscription_final_message() completes. This allows cleanup routines
825 * to run before unblocking the joining thread.
826 *
827 * \param subscription Subscription to block on.
828 * \since 12
829 */
830void stasis_subscription_join(struct stasis_subscription *subscription);
831
832/*!
833 * \brief Returns whether \a subscription has received its final message.
834 *
835 * Note that a subscription is considered done even while the
836 * stasis_subscription_final_message() is being processed. This allows cleanup
837 * routines to check the status of the subscription.
838 *
839 * \param subscription Subscription.
840 * \return True (non-zero) if stasis_subscription_final_message() has been
841 * received.
842 * \return False (zero) if waiting for the end.
843 */
844int stasis_subscription_is_done(struct stasis_subscription *subscription);
845
846/*!
847 * \brief Cancel a subscription, blocking until the last message is processed.
848 *
849 * While normally it's recommended to stasis_unsubscribe() and wait for
850 * stasis_subscription_final_message(), there are times (like during a module
851 * unload) where you have to wait for the final message (otherwise you'll call
852 * a function in a shared module that no longer exists).
853 *
854 * \param subscription Subscription to cancel.
855 * \retval NULL for convenience
856 * \since 12
857 */
858struct stasis_subscription *stasis_unsubscribe_and_join(
859 struct stasis_subscription *subscription);
860
861struct stasis_forward;
862
863/*!
864 * \brief Create a subscription which forwards all messages from one topic to
865 * another.
866 *
867 * Note that the \a topic parameter of the invoked callback will be the
868 * \a topic the message was sent to, not the topic the subscriber subscribed to.
869 *
870 * \param from_topic Topic to forward.
871 * \param to_topic Destination topic of forwarded messages.
872 * \return New forwarding subscription.
873 * \retval NULL on error.
874 * \since 12
875 */
876struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
877 struct stasis_topic *to_topic);
878
879struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward);
880
881/*!
882 * \brief Get the unique ID for the subscription.
883 *
884 * \param sub Subscription for which to get the unique ID.
885 * \return Unique ID for the subscription.
886 * \since 12
887 */
888const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub);
889
890/*!
891 * \brief Returns whether a subscription is currently subscribed.
892 *
893 * Note that there may still be messages queued up to be dispatched to this
894 * subscription, but the stasis_subscription_final_message() has been enqueued.
895 *
896 * \param sub Subscription to check
897 * \return False (zero) if subscription is not subscribed.
898 * \return True (non-zero) if still subscribed.
899 */
900int stasis_subscription_is_subscribed(const struct stasis_subscription *sub);
901
902/*!
903 * \brief Determine whether a message is the final message to be received on a subscription.
904 *
905 * \param sub Subscription on which the message was received.
906 * \param msg Message to check.
907 * \return zero if the provided message is not the final message.
908 * \return non-zero if the provided message is the final message.
909 * \since 12
910 */
911int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg);
912
913/*! \addtogroup StasisTopicsAndMessages
914 * @{
915 */
916
917/*!
918 * \brief Holds details about changes to subscriptions for the specified topic
919 * \since 12
920 */
921struct stasis_subscription_change {
922 struct stasis_topic *topic; /*!< The topic the subscription is/was subscribing to */
923 char *uniqueid; /*!< The unique ID associated with this subscription */
924 char description[0]; /*!< The description of the change to the subscription associated with the uniqueid */
925};
926
927/*!
928 * \brief Gets the message type for subscription change notices
929 * \return The stasis_message_type for subscription change notices
930 * \since 12
931 */
932struct stasis_message_type *stasis_subscription_change_type(void);
933
934/*! @} */
935
936/*!
937 * \brief Pool for topic aggregation
938 */
939struct stasis_topic_pool;
940
941/*!
942 * \brief Create a topic pool that routes messages from dynamically generated topics to the given topic
943 * \param pooled_topic Topic to which messages will be routed
944 * \return the new stasis_topic_pool
945 * \retval NULL on failure
946 */
947struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic);
948
949/*!
950 * \brief Find or create a topic in the pool
951 * \param pool Pool for which to get the topic
952 * \param topic_name Name of the topic to get
953 * \return The already stored or newly allocated topic
954 * \retval NULL if the topic was not found and could not be allocated
955 */
956struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name);
957
958/*!
959 * \brief Delete a topic from the topic pool
960 *
961 * \param pool Pool from which to delete the topic
962 * \param topic_name Name of the topic to delete in the form of
963 * \verbatim [<pool_topic_name>/]<topic_name> \endverbatim
964 *
965 * \since 13.24
966 * \since 15.6
967 * \since 16.1
968 */
969void stasis_topic_pool_delete_topic(struct stasis_topic_pool *pool, const char *topic_name);
970
971/*!
972 * \brief Check if a topic exists in a pool
973 * \param pool Pool to check
974 * \param topic_name Name of the topic to check
975 * \retval 1 exists
976 * \retval 0 does not exist
977 * \since 13.23.0
978 */
979int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name);
980
981/*! \addtogroup StasisTopicsAndMessages
982 * @{
983 */
984
985/*!
986 * \brief Message type for cache update messages.
987 * \return Message type for cache update messages.
988 * \since 12
989 */
990struct stasis_message_type *stasis_cache_update_type(void);
991
992/*!
993 * \brief Cache update message
994 * \since 12
995 */
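996struct stasis_cache_update {
997 /*! \brief Convenience reference to snapshot type */
998 struct stasis_message_type *type;
999 /*! \brief Old value from the cache */
1000 struct stasis_message *old_snapshot;
1001 /*! \brief New value */
1002 struct stasis_message *new_snapshot;
1003};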
1004
1005/*!
1006 * \brief Message type for clearing a message from a stasis cache.
1007 * \since 12
1008 */
1009struct stasis_message_type *stasis_cache_clear_type(void);
1010
1011/*! @} */
1012
1013/*!
1014 * \brief A message cache, for use with \ref stasis_caching_topic.
1015 * \since 12
1016 */
1017struct stasis_cache;
1018
1019/*! Cache entry used for calculating the aggregate snapshot. */
1020struct stasis_cache_entry;
1021
1022/*!
1023 * \brief A topic wrapper, which caches certain messages.
1024 * \since 12
1025 */
1026struct stasis_caching_topic;
1027
1028
1029/*!
1030 * \brief Callback to extract a unique identity from a snapshot message.
1031 *
1032 * This identity is unique to the underlying object of the snapshot, such as the
1033 * UniqueId field of a channel.
1034 *
1035 * \param message Message to extract id from.
1036 * \return String representing the snapshot's id.
1037 * \retval NULL if the message_type of the message isn't a handled snapshot.
1038 * \since 12
1039 */
1040typedef const char *(*snapshot_get_id)(struct stasis_message *message);
1041
1042/*!
1043 * \brief Callback to calculate the aggregate cache entry.
1044 * \since 12.2.0
1045 *
1046 * \param entry Cache entry to calculate a new aggregate snapshot.
1047 * \param new_snapshot The snapshot that is being updated.
1048 *
1049 * \note Return a ref bumped pointer from stasis_cache_entry_get_aggregate()
1050 * if a new aggregate could not be calculated because of error.
1051 *
1052 * \note An aggregate message is a combined representation of the local
1053 * and remote entities publishing the message data. e.g., An aggregate
1054 * device state represents the combined device state from the local and
1055 * any remote entities publishing state for a device. e.g., An aggregate
1056 * MWI message is the old/new MWI counts accumulated from the local and
1057 * any remote entities publishing to a mailbox.
1058 *
1059 * \return New aggregate-snapshot calculated on success.
1060 * Caller has a reference on return.
1061 */
1062typedef struct stasis_message *(*cache_aggregate_calc_fn)(struct stasis_cache_entry *entry, struct stasis_message *new_snapshot);
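
/*
 * Illustration only, not part of stasis.h: a minimal sketch of the
 * "aggregate" idea described above, using the MWI example from the
 * note (counts accumulated from the local and any remote entities).
 * struct demo_mwi_counts and demo_mwi_aggregate() are hypothetical
 * stand-ins; a real cache_aggregate_calc_fn works on stasis_message
 * and stasis_cache_entry objects and must manage AO2 references.
 */
```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for an MWI snapshot payload. */
struct demo_mwi_counts {
	int new_msgs;
	int old_msgs;
};

/*
 * Sum the local entity's counts with every remote entity's counts.
 * local may be NULL when the local entity has published nothing.
 */
static struct demo_mwi_counts demo_mwi_aggregate(
	const struct demo_mwi_counts *local,
	const struct demo_mwi_counts *remotes, size_t num_remotes)
{
	struct demo_mwi_counts total = { 0, 0 };
	size_t i;

	assert(remotes != NULL || num_remotes == 0);
	if (local) {
		total.new_msgs += local->new_msgs;
		total.old_msgs += local->old_msgs;
	}
	for (i = 0; i < num_remotes; i++) {
		total.new_msgs += remotes[i].new_msgs;
		total.old_msgs += remotes[i].old_msgs;
	}
	return total;
}
```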
1063
1064/*!
1065 * \brief Callback to publish the aggregate cache entry message.
1066 * \since 12.2.0
1067 *
1068 * \details
1069 * Once an aggregate message is calculated, this callback publishes the
1070 * message so subscribers will know the new value of an aggregated state.
1071 *
1072 * \param topic The aggregate message may be published to this topic.
1073 * It is the topic to which the cache itself is subscribed.
1074 * \param aggregate The aggregate snapshot message to publish.
1075 *
1076 * \note It is up to the function to determine if there is a better topic
1077 * the aggregate message should be published over.
1078 *
1079 * \note An aggregate message is a combined representation of the local
1080 * and remote entities publishing the message data. e.g., An aggregate
1081 * device state represents the combined device state from the local and
1082 * any remote entities publishing state for a device. e.g., An aggregate
1083 * MWI message is the old/new MWI counts accumulated from the local and
1084 * any remote entities publishing to a mailbox.
1085 */
1086typedef void (*cache_aggregate_publish_fn)(struct stasis_topic *topic, struct stasis_message *aggregate);
1087
1088/*!
1089 * \brief Get the aggregate cache entry snapshot.
1090 * \since 12.2.0
1091 *
1092 * \param entry Cache entry to get the aggregate snapshot.
1093 *
1094 * \note A reference is not given to the returned pointer so don't unref it.
1095 *
1096 * \note An aggregate message is a combined representation of the local
1097 * and remote entities publishing the message data. e.g., An aggregate
1098 * device state represents the combined device state from the local and
1099 * any remote entities publishing state for a device. e.g., An aggregate
1100 * MWI message is the old/new MWI counts accumulated from the local and
1101 * any remote entities publishing to a mailbox.
1102 *
1103 * \retval Aggregate-snapshot in cache.
1104 * \retval NULL if not present.
1105 */
1106struct stasis_message *stasis_cache_entry_get_aggregate(struct stasis_cache_entry *entry);
1107
1108/*!
1109 * \brief Get the local entity's cache entry snapshot.
1110 * \since 12.2.0
1111 *
1112 * \param entry Cache entry to get the local entity's snapshot.
1113 *
1114 * \note A reference is not given to the returned pointer so don't unref it.
1115 *
1116 * \retval Internal-snapshot in cache.
1117 * \retval NULL if not present.
1118 */
1119struct stasis_message *stasis_cache_entry_get_local(struct stasis_cache_entry *entry);
1120
1121/*!
1122 * \brief Get a remote entity's cache entry snapshot by index.
1123 * \since 12.2.0
1124 *
1125 * \param entry Cache entry to get a remote entity's snapshot.
1126 * \param idx Which remote entity's snapshot to get.
1127 *
1128 * \note A reference is not given to the returned pointer so don't unref it.
1129 *
1130 * \retval Remote-entity-snapshot in cache.
1131 * \retval NULL if not present.
1132 */
1133struct stasis_message *stasis_cache_entry_get_remote(struct stasis_cache_entry *entry, int idx);
1134
1135/*!
1136 * \brief Create a cache.
1137 *
1138 * This is the backend store for a \ref stasis_caching_topic. The cache is
1139 * thread safe, allowing concurrent reads and writes.
1140 *
1141 * The returned object is AO2 managed, so ao2_cleanup() when you're done.
1142 *
1143 * \param id_fn Callback to extract the id from a snapshot message.
1144 *
1145 * \return New cache indexed by \a id_fn.
1146 * \retval NULL on error
1147 *
1148 * \since 12
1149 */
1150struct stasis_cache *stasis_cache_create(snapshot_get_id id_fn);
1151
1152/*!
1153 * \brief Create a cache.
1154 *
1155 * This is the backend store for a \ref stasis_caching_topic. The cache is
1156 * thread safe, allowing concurrent reads and writes.
1157 *
1158 * The returned object is AO2 managed, so ao2_cleanup() when you're done.
1159 *
1160 * \param id_fn Callback to extract the id from a snapshot message.
1161 * \param aggregate_calc_fn Callback to calculate the aggregate cache entry.
1162 * \param aggregate_publish_fn Callback to publish the aggregate cache entry.
1163 *
1164 * \note An aggregate message is a combined representation of the local
1165 * and remote entities publishing the message data. e.g., An aggregate
1166 * device state represents the combined device state from the local and
1167 * any remote entities publishing state for a device. e.g., An aggregate
1168 * MWI message is the old/new MWI counts accumulated from the local and
1169 * any remote entities publishing to a mailbox.
1170 *
1171 * \return New cache indexed by \a id_fn.
1172 * \retval NULL on error
1173 *
1174 * \since 12.2.0
1175 */
1176struct stasis_cache *stasis_cache_create_full(snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_publish_fn aggregate_publish_fn);
1177
1178/*!
1179 * \brief Create a topic which monitors and caches messages from another topic.
1180 *
1181 * The idea is that some topics publish 'snapshots' of some other object's state
1182 * that should be cached. When these snapshot messages are received, the cache
1183 * is updated, and a stasis_cache_update() message is forwarded, which has both
1184 * the original snapshot message and the new message.
1185 *
1186 * The returned object is AO2 managed, so ao2_cleanup() when done with it.
1187 *
1188 * \param original_topic Topic publishing snapshot messages.
1189 * \param cache Backend cache in which to keep snapshots.
1190 * \return New topic which changes snapshot messages to stasis_cache_update()
1191 * messages, and forwards all other messages from the original topic.
1192 * \retval NULL on error
1193 * \since 12
1194 */
1195struct stasis_caching_topic *stasis_caching_topic_create(
1196 struct stasis_topic *original_topic, struct stasis_cache *cache);
1197
1198/*!
1199 * \brief Unsubscribes a caching topic from its upstream topic.
1200 *
1201 * This function returns immediately, so be sure to cleanup when
1202 * stasis_subscription_final_message() is received.
1203 *
1204 * \param caching_topic Caching topic to unsubscribe
1205 * \retval NULL for convenience
1206 * \since 12
1207 */
1208struct stasis_caching_topic *stasis_caching_unsubscribe(
1209 struct stasis_caching_topic *caching_topic);
1210
1211/*!
1212 * \brief Unsubscribes a caching topic from its upstream topic, blocking until
1213 * all messages have been forwarded.
1214 *
1215 * See stasis_unsubscribe_and_join() for more info on when to use this as
1216 * opposed to stasis_caching_unsubscribe().
1217 *
1218 * \param caching_topic Caching topic to unsubscribe
1219 * \retval NULL for convenience
1220 * \since 12
1221 */
1222struct stasis_caching_topic *stasis_caching_unsubscribe_and_join(
1223 struct stasis_caching_topic *caching_topic);
1224
1225/*!
1226 * \brief Returns the topic of cached events from a caching topic.
1227 * \param caching_topic The caching topic.
1228 * \return The topic that publishes cache update events, along with passthrough
1229 * events from the underlying topic.
1230 * \retval NULL if \a caching_topic is \c NULL.
1231 * \since 12
1232 */
1233struct stasis_topic *stasis_caching_get_topic(
1234 struct stasis_caching_topic *caching_topic);
1235
1236/*!
1237 * \brief Indicate to a caching topic that we are interested in a message type.
1238 *
1239 * This will cause the caching topic to receive messages of the given message
1240 * type. This enables internal filtering in the stasis message bus to reduce
1241 * messages.
1242 *
1243 * \param caching_topic The caching topic.
1244 * \param type The message type we wish to receive.
1245 * \retval 0 on success
1246 * \retval -1 failure
1247 *
1248 * \since 17.0.0
1249 */
1250int stasis_caching_accept_message_type(struct stasis_caching_topic *caching_topic,
1251 struct stasis_message_type *type);
1252
1253/*!
1254 * \brief Set the message type filtering level on a cache
1255 *
1256 * This will cause the underlying subscription to filter messages according to the
1257 * provided filter level. For example if selective is used then only
1258 * messages matching those provided to \ref stasis_subscription_accept_message_type
1259 * will be raised to the subscription callback.
1260 *
1261 * \param caching_topic The caching topic.
1262 * \param filter What filter to use
1263 * \retval 0 on success
1264 * \retval -1 failure
1265 *
1266 * \since 17.0.0
1267 */
1268int stasis_caching_set_filter(struct stasis_caching_topic *caching_topic,
1269 enum stasis_subscription_message_filter filter);
1270
1271/*!
1272 * \brief A message which instructs the caching topic to remove an entry from
1273 * its cache.
1274 *
1275 * \param message Message representative of the cache entry that should be
1276 * cleared. This will become the data held in the
1277 * stasis_cache_clear message.
1278 *
1279 * \return Message which, when sent to a \ref stasis_caching_topic, will clear
1280 * the item from the cache.
1281 * \retval NULL on error.
1282 * \since 12
1283 */
1284struct stasis_message *stasis_cache_clear_create(struct stasis_message *message);
1285
1286/*!
1287 * \brief Retrieve an item from the cache for the ast_eid_default entity.
1288 *
1289 * The returned item is AO2 managed, so ao2_cleanup() when you're done with it.
1290 *
1291 * \param cache The cache to query.
1292 * \param type Type of message to retrieve.
1293 * \param id Identity of the snapshot to retrieve.
1294 *
1295 * \return Message from the cache.
1296 * \retval NULL if message is not found.
1297 *
1298 * \since 12
1299 */
1300struct stasis_message *stasis_cache_get(struct stasis_cache *cache, struct stasis_message_type *type, const char *id);
1301
1302/*!
1303 * \brief Retrieve an item from the cache for a specific entity.
1304 *
1305 * The returned item is AO2 managed, so ao2_cleanup() when you're done with it.
1306 *
1307 * \param cache The cache to query.
1308 * \param type Type of message to retrieve.
1309 * \param id Identity of the snapshot to retrieve.
1310 * \param eid Specific entity id to retrieve. NULL for aggregate.
1311 *
1312 * \note An aggregate message is a combined representation of the local
1313 * and remote entities publishing the message data. e.g., An aggregate
1314 * device state represents the combined device state from the local and
1315 * any remote entities publishing state for a device. e.g., An aggregate
1316 * MWI message is the old/new MWI counts accumulated from the local and
1317 * any remote entities publishing to a mailbox.
1318 *
1319 * \return Message from the cache.
1320 * \retval NULL if message is not found.
1321 *
1322 * \since 12.2.0
1323 */
1324struct stasis_message *stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid);
1325
1326/*!
1327 * \brief Retrieve all matching entity items from the cache.
1328 * \since 12.2.0
1329 *
1330 * \param cache The cache to query.
1331 * \param type Type of message to retrieve.
1332 * \param id Identity of the snapshot to retrieve.
1333 *
1334 * \return Container of matching items found.
1335 * \retval NULL if error.
1336 */
1337struct ao2_container *stasis_cache_get_all(struct stasis_cache *cache, struct stasis_message_type *type, const char *id);
1338
1339/*!
1340 * \brief Dump cached items to a subscription for the ast_eid_default entity.
1341 *
1342 * \param cache The cache to query.
1343 * \param type Type of message to dump (any type if \c NULL).
1344 *
1345 * \return ao2_container containing all matches (must be unreffed by caller)
1346 * \retval NULL on allocation error
1347 *
1348 * \since 12
1349 */
1350struct ao2_container *stasis_cache_dump(struct stasis_cache *cache, struct stasis_message_type *type);
1351
1352/*!
1353 * \brief Dump cached items to a subscription for a specific entity.
1354 * \since 12.2.0
1355 *
1356 * \param cache The cache to query.
1357 * \param type Type of message to dump (any type if \c NULL).
1358 * \param eid Specific entity id to retrieve. NULL for aggregate.
1359 *
1360 * \return ao2_container containing all matches (must be unreffed by caller)
1361 * \retval NULL on allocation error
1362 */
1363struct ao2_container *stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid);
1364
1365/*!
1366 * \brief Dump all entity items from the cache to a subscription.
1367 * \since 12.2.0
1368 *
1369 * \param cache The cache to query.
1370 * \param type Type of message to dump (any type if \c NULL).
1371 *
1372 * \return ao2_container containing all matches (must be unreffed by caller)
1373 * \retval NULL on allocation error
1374 */
1375struct ao2_container *stasis_cache_dump_all(struct stasis_cache *cache, struct stasis_message_type *type);
1376
1377/*! \addtogroup StasisTopicsAndMessages
1378 * @{
1379 */
1380
1381/*!
1382 * \brief Object type code for multi user object snapshots
1383 */
1384enum stasis_user_multi_object_snapshot_type {
1385 STASIS_UMOS_CHANNEL = 0, /*!< Channel Snapshots */
1386 STASIS_UMOS_BRIDGE, /*!< Bridge Snapshots */
1387 STASIS_UMOS_ENDPOINT, /*!< Endpoint Snapshots */
1388};
1389
1390/*! \brief Number of snapshot types */
1391#define STASIS_UMOS_MAX (STASIS_UMOS_ENDPOINT + 1)
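
/*
 * Illustration only, not part of stasis.h: a minimal sketch of why a
 * "number of snapshot types" constant is useful, namely sizing an array
 * that is indexed by the type code. The demo_ names are hypothetical
 * and mirror the enumerators listed above so the sketch stands alone.
 */
```c
#include <assert.h>
#include <stddef.h>

/* Same ordering as the enumerators above, renamed for the sketch. */
enum demo_umos_type {
	DEMO_UMOS_CHANNEL = 0,
	DEMO_UMOS_BRIDGE,
	DEMO_UMOS_ENDPOINT,
};

/* Number of type codes, derived from the last enumerator. */
#define DEMO_UMOS_MAX (DEMO_UMOS_ENDPOINT + 1)

/* Hypothetical container: one counter per snapshot type. */
struct demo_snapshot_counts {
	size_t per_type[DEMO_UMOS_MAX];
};

/* Record one snapshot of the given type. */
static void demo_count_snapshot(struct demo_snapshot_counts *counts,
	enum demo_umos_type type)
{
	assert((int) type >= 0 && (int) type < DEMO_UMOS_MAX);
	counts->per_type[type]++;
}
```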
1392
1393/*!
1394 * \brief Message type for custom user defined events with multi object blobs
1395 * \return The stasis_message_type for user event
1396 * \since 12.3.0
1397 */
1398struct stasis_message_type *ast_multi_user_event_type(void);
1399
1400/*!
1401 * \brief Create a stasis multi object blob
1402 * \since 12.3.0
1403 *
1404 * \details
1405 * Multi object blob can store a combination of arbitrary json values
1406 * (the blob) and also snapshots of various other system objects (such
1407 * as channels, bridges, etc) for delivery through a stasis message.
1408 * The multi object blob is first created, then optionally objects
1409 * are added to it, before being attached to a message and delivered
1410 * to a stasis topic.
1411 *
1412 * \param blob Json blob
1413 *
1414 * \note When used for an ast_multi_user_event_type message, the
1415 * json blob should contain at minimum {eventname: name}.
1416 *
1417 * \retval ast_multi_object_blob* if succeeded
1418 * \retval NULL if creation failed
1419 */
1420struct ast_multi_object_blob *ast_multi_object_blob_create(struct ast_json *blob);
1421
1422/*!
1423 * \brief Add an object to a multi object blob previously created
1424 * \since 12.3.0
1425 *
1426 * \param multi The multi object blob previously created
1427 * \param type Type code for the object such as channel, bridge, etc.
1428 * \param object Snapshot object of the type given by \a type
1429 */
1430void ast_multi_object_blob_add(struct ast_multi_object_blob *multi, enum stasis_user_multi_object_snapshot_type type, void *object);
1431
1432/*!
1433 * \brief Create and publish a stasis message blob on a channel with its snapshot
1434 * \since 12.3.0
1435 *
1436 * \details
1437 * For compatibility with app_userevent, this creates a multi object
1438 * blob message, attaches the channel snapshot to it, and publishes it
1439 * to the channel's topic.
1440 *
1441 * \param chan The channel to snapshot and publish event to
1442 * \param type The message type
1443 * \param blob A json blob to publish with the snapshot
1444 */
1445void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob);
1446
1447
1448/*! @} */
1449
1450/*!
1451 * \internal
1452 * \brief Log a message about invalid attempt to access a type.
1453 */
1454void stasis_log_bad_type_access(const char *name);
1455
1456/*!
1457 * \brief Boiler-plate messaging macro for defining public message types.
1458 *
1459 * \code
1460 * STASIS_MESSAGE_TYPE_DEFN(ast_foo_type,
1461 * .to_ami = foo_to_ami,
1462 * .to_json = foo_to_json,
1463 * .to_event = foo_to_event,
1464 * );
1465 * \endcode
1466 *
1467 * \param name Name of message type.
1468 * \param ... Virtual table methods for messages of this type.
1469 * \since 12
1470 */
1471#define STASIS_MESSAGE_TYPE_DEFN(name, ...) \
1472 static struct stasis_message_vtable _priv_ ## name ## _v = { \
1473 __VA_ARGS__ \
1474 }; \
1475 static struct stasis_message_type *_priv_ ## name; \
1476 struct stasis_message_type *name(void) { \
1477 if (_priv_ ## name == NULL) { \
1478 stasis_log_bad_type_access(#name); \
1479 } \
1480 return _priv_ ## name; \
1481 }
1482
1483/*!
1484 * \brief Boiler-plate messaging macro for defining local message types.
1485 *
1486 * \code
1487 * STASIS_MESSAGE_TYPE_DEFN_LOCAL(ast_foo_type,
1488 * .to_ami = foo_to_ami,
1489 * .to_json = foo_to_json,
1490 * .to_event = foo_to_event,
1491 * );
1492 * \endcode
1493 *
1494 * \param name Name of message type.
1495 * \param ... Virtual table methods for messages of this type.
1496 * \since 12
1497 */
1498#define STASIS_MESSAGE_TYPE_DEFN_LOCAL(name, ...) \
1499 static struct stasis_message_vtable _priv_ ## name ## _v = { \
1500 __VA_ARGS__ \
1501 }; \
1502 static struct stasis_message_type *_priv_ ## name; \
1503 static struct stasis_message_type *name(void) { \
1504 if (_priv_ ## name == NULL) { \
1505 stasis_log_bad_type_access(#name); \
1506 } \
1507 return _priv_ ## name; \
1508 }
1509
1510/*!
1511 * \brief Boiler-plate messaging macro for initializing message types.
1512 *
1513 * \code
1514 * if (STASIS_MESSAGE_TYPE_INIT(ast_foo_type) != 0) {
1515 * return -1;
1516 * }
1517 * \endcode
1518 *
1519 * \param name Name of message type.
1520 * \return 0 if initialization is successful.
1521 * \return Non-zero on failure.
1522 * \since 12
1523 */
1524#define STASIS_MESSAGE_TYPE_INIT(name) \
1525 ({ \
1526 ast_assert(_priv_ ## name == NULL); \
1527 stasis_message_type_create(#name, \
1528 &_priv_ ## name ## _v, &_priv_ ## name) == STASIS_MESSAGE_TYPE_ERROR ? 1 : 0; \
1529 })
1530
1531/*!
1532 * \brief Boiler-plate messaging macro for cleaning up message types.
1533 *
1534 * Note that if your type is defined in core instead of a loadable module, you
1535 * should call message type cleanup from an ast_register_cleanup() handler
1536 * instead of an ast_register_atexit() handler.
1537 *
1538 * The reason is that during an immediate shutdown, loadable modules (which may
1539 * refer to core message types) are not unloaded. While the atexit handlers are
1540 * run, there's a window of time where a module subscription might reference a
1541 * core message type after it's been cleaned up. Which is bad.
1542 *
1543 * \param name Name of message type.
1544 * \since 12
1545 */
1546#define STASIS_MESSAGE_TYPE_CLEANUP(name) \
1547 ({ \
1548 ao2_cleanup(_priv_ ## name); \
1549 _priv_ ## name = NULL; \
1550 })
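
/*
 * Illustration only, not part of stasis.h: a minimal, self-contained
 * sketch of the define / init / cleanup pattern used by the macros
 * above. All demo_ and DEMO_ names are hypothetical. The sketch uses
 * static storage and fprintf() where the real macros rely on
 * stasis_message_type_create(), ao2_cleanup() and
 * stasis_log_bad_type_access(), so it shows only the shape of the
 * pattern, not the actual implementation.
 */
```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>

/* Hypothetical stand-in for struct stasis_message_type. */
struct demo_type {
	const char *label;
};

/* Define private storage, a private pointer, and a checked accessor. */
#define DEMO_TYPE_DEFN(name) \
	static struct demo_type demo_priv_ ## name ## _storage; \
	static struct demo_type *demo_priv_ ## name; \
	static struct demo_type *name(void) { \
		if (demo_priv_ ## name == NULL) { \
			fprintf(stderr, "type %s used before init\n", #name); \
		} \
		return demo_priv_ ## name; \
	}

/* Initialize the type; evaluates to 0, mirroring the success value above. */
#define DEMO_TYPE_INIT(name) \
	(assert(demo_priv_ ## name == NULL), \
	 demo_priv_ ## name ## _storage.label = #name, \
	 demo_priv_ ## name = &demo_priv_ ## name ## _storage, \
	 0)

/* Forget the type so later accesses are reported as invalid. */
#define DEMO_TYPE_CLEANUP(name) \
	((void) (demo_priv_ ## name = NULL))

/* Expands to the storage, the pointer, and demo_foo_type(). */
DEMO_TYPE_DEFN(demo_foo_type)
```

/*
 * The accessor returns NULL (and logs) until the init macro has run,
 * and again after cleanup, which is the behavior the documentation
 * above describes for accessing a message type outside its lifetime.
 */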
1551
1552/*!
1553 * \brief Initialize the Stasis subsystem.
1554 * \return 0 on success.
1555 * \return Non-zero on error.
1556 * \since 12
1557 */
1558int stasis_init(void);
1559
1560/*!
1561 * \internal
1562 * \brief called by stasis_init() for cache initialization.
1563 * \return 0 on success.
1564 * \return Non-zero on error.
1565 * \since 12
1566 */
1567int stasis_cache_init(void);
1568
1569/*!
1570 * \internal
1571 * \brief called by stasis_init() for config initialization.
1572 * \return 0 on success.
1573 * \return Non-zero on error.
1574 * \since 12
1575 */
1576int stasis_config_init(void);
1577
1578/*!
1579 * \defgroup StasisTopicsAndMessages Stasis topics, and their messages.
1580 *
1581 * \brief This group contains the topics, messages and corresponding message types
1582 * found within Asterisk.
1583 */
1584
1585#endif /* _ASTERISK_STASIS_H */
static PGresult * result
Definition cel_pgsql.c:84
static const char type[]
static struct ast_channel * callback(struct ast_channelstorage_instance *driver, ao2_callback_data_fn *cb_fn, void *arg, void *data, int ao2_flags, int rdlock)
static const char name[]
Definition format_mp3.c:68
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
void ast_multi_object_blob_add(struct ast_multi_object_blob *multi, enum stasis_user_multi_object_snapshot_type type, void *object)
Add an object to a multi object blob previously created.
Definition stasis.c:2213
stasis_user_multi_object_snapshot_type
Object type code for multi user object snapshots.
Definition stasis.h:1384
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob)
Create and publish a stasis message blob on a channel with it's snapshot.
Definition stasis.c:2222
struct stasis_message_type * stasis_cache_update_type(void)
Message type for cache update messages.
struct ast_multi_object_blob * ast_multi_object_blob_create(struct ast_json *blob)
Create a stasis multi object blob.
Definition stasis.c:2187
struct stasis_message_type * ast_multi_user_event_type(void)
Message type for custom user defined events with multi object blobs.
struct stasis_message_type * stasis_cache_clear_type(void)
Message type for clearing a message from a stasis cache.
@ STASIS_UMOS_ENDPOINT
Definition stasis.h:1387
@ STASIS_UMOS_BRIDGE
Definition stasis.h:1386
@ STASIS_UMOS_CHANNEL
Definition stasis.h:1385
Asterisk JSON abstraction layer.
The AMI - Asterisk Manager Interface - is a TCP protocol created to manage Asterisk with third-party ...
struct ao2_container * cache
static struct stasis_subscription * sub
Statsd channel stats. Exmaple of how to subscribe to Stasis events.
struct ast_manager_event_blob * stasis_message_to_ami(struct stasis_message *msg)
Build the AMI representation of the message.
int stasis_subscription_is_done(struct stasis_subscription *subscription)
Returns whether subscription has received its final message.
Definition stasis.c:1196
struct stasis_message * stasis_cache_clear_create(struct stasis_message *message)
A message which instructs the caching topic to remove an entry from its cache.
struct ao2_container * stasis_cache_get_all(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
Retrieve all matching entity items from the cache.
struct stasis_subscription * __stasis_subscribe_synchronous(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription whose callbacks occur synchronously on message publishing.
Definition stasis.c:1031
unsigned int stasis_message_type_hash(const struct stasis_message_type *type)
Gets the hash of a given message type.
void stasis_topic_pool_delete_topic(struct stasis_topic_pool *pool, const char *topic_name)
Delete a topic from the topic pool.
Definition stasis.c:1960
struct ao2_container * stasis_cache_dump(struct stasis_cache *cache, struct stasis_message_type *type)
Dump cached items to a subscription for the ast_eid_default entity.
int stasis_subscription_decline_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are not interested in a message type.
Definition stasis.c:1131
struct ast_json * stasis_message_to_json(struct stasis_message *msg, struct stasis_message_sanitizer *sanitize)
Build the JSON representation of the message.
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition stasis.c:694
struct stasis_topic * stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
Returns the topic of cached events from a caching topics.
void(* stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Callback function type for Stasis subscriptions.
Definition stasis.h:611
int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription, long low_water, long high_water)
Set the high and low alert water marks of the stasis subscription.
Definition stasis.c:1089
struct stasis_caching_topic * stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
Unsubscribes a caching topic from its upstream topic.
struct stasis_topic * stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
Find or create a topic in the pool.
Definition stasis.c:1996
stasis_subscription_message_filter
Stasis subscription message filters.
Definition stasis.h:294
@ STASIS_SUBSCRIPTION_FILTER_SELECTIVE
Definition stasis.h:297
@ STASIS_SUBSCRIPTION_FILTER_FORCED_NONE
Definition stasis.h:296
@ STASIS_SUBSCRIPTION_FILTER_NONE
Definition stasis.h:295
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
Definition stasis.c:1626
struct stasis_message *(* cache_aggregate_calc_fn)(struct stasis_cache_entry *entry, struct stasis_message *new_snapshot)
Callback to calculate the aggregate cache entry.
Definition stasis.h:1062
struct ao2_container * stasis_cache_dump_all(struct stasis_cache *cache, struct stasis_message_type *type)
Dump all entity items from the cache to a subscription.
const char * stasis_topic_uniqueid(const struct stasis_topic *topic)
Return the uniqueid of a topic.
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition stasis.c:684
size_t stasis_topic_subscribers(const struct stasis_topic *topic)
Return the number of subscribers of a topic.
Definition stasis.c:710
void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
Publish a message to a topic's subscribers, synchronizing on the specified subscriber.
Definition stasis.c:1594
struct stasis_caching_topic * stasis_caching_topic_create(struct stasis_topic *original_topic, struct stasis_cache *cache)
Create a topic which monitors and caches messages from another topic.
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
Definition stasis.c:1101
struct ao2_container * stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
Dump cached items to a subscription for a specific entity.
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
Definition stasis.c:1155
const struct ast_eid * stasis_message_eid(const struct stasis_message *msg)
Get the entity id for a stasis_message.
stasis_subscription_message_formatters
Stasis subscription formatter filters.
Definition stasis.h:308
@ STASIS_SUBSCRIPTION_FORMATTER_EVENT
Definition stasis.h:312
@ STASIS_SUBSCRIPTION_FORMATTER_AMI
Definition stasis.h:311
@ STASIS_SUBSCRIPTION_FORMATTER_JSON
Definition stasis.h:310
@ STASIS_SUBSCRIPTION_FORMATTER_NONE
Definition stasis.h:309
struct stasis_topic * stasis_topic_get(const char *name)
Get a topic of the given name.
Definition stasis.c:689
int stasis_config_init(void)
int stasis_caching_set_filter(struct stasis_caching_topic *caching_topic, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a cache.
int stasis_init(void)
Initialize the Stasis subsystem.
Definition stasis.c:3283
void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Stasis subscription callback function that does nothing.
Definition stasis.c:876
struct stasis_subscription * __stasis_subscribe_pool(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription whose callbacks occur on a task pool.
Definition stasis.c:1020
struct stasis_message * stasis_cache_entry_get_aggregate(struct stasis_cache_entry *entry)
Get the aggregate cache entry snapshot.
void stasis_subscription_join(struct stasis_subscription *subscription)
Block until the last message is processed on a subscription.
Definition stasis.c:1183
int stasis_message_type_declined(const char *name)
Check whether a message type is declined.
Definition stasis.c:2505
struct ast_event * stasis_message_to_event(struct stasis_message *msg)
Build the generic event system representation of the message.
void stasis_log_bad_type_access(const char *name)
Definition stasis.c:2152
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
enum stasis_message_type_result stasis_message_type_create(const char *name, struct stasis_message_vtable *vtable, struct stasis_message_type **result)
Create a new message type.
int stasis_cache_init(void)
struct stasis_topic * stasis_topic_create_with_detail(const char *name, const char *detail)
Create a new topic with given detail.
Definition stasis.c:635
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
Definition stasis.c:1252
struct stasis_cache * stasis_cache_create_full(snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_publish_fn aggregate_publish_fn)
Create a cache.
const char * stasis_topic_detail(const struct stasis_topic *topic)
Return the detail of a topic.
Definition stasis.c:702
struct stasis_subscription * stasis_unsubscribe_and_join(struct stasis_subscription *subscription)
Cancel a subscription, blocking until the last message is processed.
Definition stasis.c:1212
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.
struct stasis_caching_topic * stasis_caching_unsubscribe_and_join(struct stasis_caching_topic *caching_topic)
Unsubscribes a caching topic from its upstream topic, blocking until all messages have been forwarded...
int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name)
Check if a topic exists in a pool.
Definition stasis.c:2139
void(* cache_aggregate_publish_fn)(struct stasis_topic *topic, struct stasis_message *aggregate)
Callback to publish the aggregate cache entry message.
Definition stasis.h:1086
struct stasis_cache * stasis_cache_create(snapshot_get_id id_fn)
Create a cache.
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *subscription)
Cancel a subscription.
Definition stasis.c:1049
int stasis_message_type_id(const struct stasis_message_type *type)
Gets the id of a given message type.
struct stasis_message * stasis_message_create_full(struct stasis_message_type *type, void *data, const struct ast_eid *eid)
Create a new message for an entity.
void stasis_subscription_accept_formatters(struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters)
Indicate to a subscription that we are interested in messages with one or more formatters.
Definition stasis.c:1171
const char * stasis_subscription_uniqueid(const struct stasis_subscription *sub)
Get the unique ID for the subscription.
Definition stasis.c:1247
struct stasis_topic_pool * stasis_topic_pool_create(struct stasis_topic *pooled_topic)
Create a topic pool that routes messages from dynamically generated topics to the given topic.
Definition stasis.c:1929
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition stasis.c:1656
struct stasis_subscription * __stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription.
Definition stasis.c:1009
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
Definition stasis.c:1589
struct stasis_message * stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid)
Retrieve an item from the cache for a specific entity.
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
Returns whether a subscription is currently subscribed.
Definition stasis.c:1228
enum stasis_subscription_message_formatters stasis_message_type_available_formatters(const struct stasis_message_type *message_type)
Get a bitmap of available formatters for a message type.
struct stasis_message * stasis_cache_get(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
Retrieve an item from the cache for the ast_eid_default entity.
const char *(* snapshot_get_id)(struct stasis_message *message)
Callback to extract a unique identity from a snapshot message.
Definition stasis.h:1040
struct stasis_message * stasis_cache_entry_get_local(struct stasis_cache_entry *entry)
Get the local entity's cache entry snapshot.
const struct timeval * stasis_message_timestamp(const struct stasis_message *msg)
Get the time when a message was created.
stasis_message_type_result
Return code for Stasis message type creation attempts.
Definition stasis.h:285
@ STASIS_MESSAGE_TYPE_DECLINED
Definition stasis.h:288
@ STASIS_MESSAGE_TYPE_ERROR
Definition stasis.h:286
@ STASIS_MESSAGE_TYPE_SUCCESS
Definition stasis.h:287
struct stasis_message * stasis_cache_entry_get_remote(struct stasis_cache_entry *entry, int idx)
Get a remote entity's cache entry snapshot by index.
int stasis_caching_accept_message_type(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type)
Indicate to a caching topic that we are interested in a message type.
int stasis_message_can_be_ami(struct stasis_message *msg)
Determine if the given message can be converted to AMI.
Generic container type.
Structure representing a snapshot of channel state.
Main Channel structure associated with a channel.
An Entity ID is essentially a MAC address, brief and unique.
Definition utils.h:853
An event.
Definition event.c:81
Abstract JSON element (object, array, string, int, ...).
Struct containing info for an AMI event to send out.
Definition manager.h:504
A multi object blob data structure to carry user event stasis messages.
Definition stasis.c:2162
struct ast_json * blob
Definition stasis.c:2163
Cache update message.
Definition stasis.h:996
struct stasis_message_type * type
Convenience reference to snapshot type.
Definition stasis.h:998
struct stasis_message * old_snapshot
Old value from the cache.
Definition stasis.h:1000
struct stasis_message * new_snapshot
New value.
Definition stasis.h:1002
cache_aggregate_calc_fn aggregate_calc_fn
snapshot_get_id id_fn
cache_aggregate_publish_fn aggregate_publish_fn
struct stasis_topic * original_topic
Forwarding information.
Definition stasis.c:1609
struct stasis_topic * from_topic
Definition stasis.c:1611
struct stasis_topic * to_topic
Definition stasis.c:1613
Structure containing callbacks for Stasis message sanitization.
Definition stasis.h:200
int(* channel)(const struct ast_channel *chan)
Callback which determines whether a channel should be sanitized from a message based on the channel.
Definition stasis.h:232
int(* channel_snapshot)(const struct ast_channel_snapshot *snapshot)
Callback which determines whether a channel should be sanitized from a message based on the channel's...
Definition stasis.h:221
int(* channel_id)(const char *channel_id)
Callback which determines whether a channel should be sanitized from a message based on the channel's...
Definition stasis.h:210
Virtual table providing methods for messages.
Definition stasis.h:239
struct ast_eid eid
Holds details about changes to subscriptions for the specified topic.
Definition stasis.h:921
struct stasis_topic * topic
Definition stasis.h:922
struct stasis_topic * topic
Definition stasis.c:751
char * detail
Definition stasis.c:456
Utility functions.
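The formatter entries above (stasis_subscription_message_formatters, stasis_message_type_available_formatters, stasis_subscription_accept_formatters) describe a bitmap of formatters. The following is a minimal standalone sketch of how such flags could be combined and tested. It does not include stasis.h: the sketch_ names are hypothetical, the numeric flag values are an assumption made for illustration, and the overlap rule is one plausible reading of "one or more formatters", not a statement of how Asterisk implements the check.

```c
#include <assert.h>

/* Local stand-ins for the formatter flags listed in the index above.
 * ASSUMPTION: the bit values are illustrative only; see stasis.h for
 * the real definitions. */
enum sketch_formatters {
	SKETCH_FORMATTER_NONE  = 0,
	SKETCH_FORMATTER_JSON  = 1 << 0,
	SKETCH_FORMATTER_AMI   = 1 << 1,
	SKETCH_FORMATTER_EVENT = 1 << 2,
};

/* Hypothetical helper: returns nonzero when at least one requested
 * formatter is also present in the available bitmap. */
static int sketch_formatters_overlap(unsigned int requested, unsigned int available)
{
	return (requested & available) != 0;
}

/* Usage: a subscriber asks for JSON or AMI, while a message type offers
 * only AMI and EVENT. */
static void sketch_demo(void)
{
	unsigned int requested = SKETCH_FORMATTER_JSON | SKETCH_FORMATTER_AMI;
	unsigned int available = SKETCH_FORMATTER_AMI | SKETCH_FORMATTER_EVENT;

	/* AMI is present in both bitmaps. */
	assert(sketch_formatters_overlap(requested, available));
	/* JSON alone is not offered by this message type. */
	assert(!sketch_formatters_overlap(SKETCH_FORMATTER_JSON, available));
	/* An empty request never overlaps with anything. */
	assert(!sketch_formatters_overlap(SKETCH_FORMATTER_NONE, available));
}
```

Keeping each formatter on its own bit lets a single value describe any combination, so a request for several formatters and the set a message type supports can be compared with one bitwise AND.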