Asterisk - The Open Source Telephony Project GIT-master-6144b6b
Loading...
Searching...
No Matches
Data Structures | Macros | Typedefs | Enumerations | Functions
stasis.h File Reference

Stasis Message Bus API. See Stasis Message Bus API for detailed documentation. More...

#include "asterisk/json.h"
#include "asterisk/manager.h"
#include "asterisk/utils.h"
#include "asterisk/event.h"
Include dependency graph for stasis.h:

Go to the source code of this file.

Data Structures

struct  stasis_cache_update
 Cache update message. More...
 
struct  stasis_message_sanitizer
 Structure containing callbacks for Stasis message sanitization. More...
 
struct  stasis_message_vtable
 Virtual table providing methods for messages. More...
 
struct  stasis_subscription_change
 Holds details about changes to subscriptions for the specified topic. More...
 

Macros

#define STASIS_MESSAGE_TYPE_CLEANUP(name)
 Boiler-plate messaging macro for cleaning up message types.
 
#define STASIS_MESSAGE_TYPE_DEFN(name, ...)
 Boiler-plate messaging macro for defining public message types.
 
#define STASIS_MESSAGE_TYPE_DEFN_LOCAL(name, ...)
 Boiler-plate messaging macro for defining local message types.
 
#define STASIS_MESSAGE_TYPE_INIT(name)
 Boiler-plate messaging macro for initializing message types.
 
#define stasis_subscribe(topic, callback, data)   __stasis_subscribe(topic, callback, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
 
#define stasis_subscribe_pool(topic, callback, data)   __stasis_subscribe_pool(topic, callback, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
 
#define stasis_subscribe_synchronous(topic, callback, data)   __stasis_subscribe_synchronous(topic, callback, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
 
#define STASIS_UMOS_MAX   (STASIS_UMOS_ENDPOINT + 1)
 Number of snapshot types.
 

Typedefs

typedef struct stasis_message *(* cache_aggregate_calc_fn) (struct stasis_cache_entry *entry, struct stasis_message *new_snapshot)
 Callback to calculate the aggregate cache entry.
 
typedef void(* cache_aggregate_publish_fn) (struct stasis_topic *topic, struct stasis_message *aggregate)
 Callback to publish the aggregate cache entry message.
 
typedef const char *(* snapshot_get_id) (struct stasis_message *message)
 Callback to extract a unique identity from a snapshot message.
 
typedef void(* stasis_subscription_cb) (void *data, struct stasis_subscription *sub, struct stasis_message *message)
 Callback function type for Stasis subscriptions.
 

Enumerations

enum  stasis_message_type_result { STASIS_MESSAGE_TYPE_ERROR = -1 , STASIS_MESSAGE_TYPE_SUCCESS , STASIS_MESSAGE_TYPE_DECLINED }
 Return code for Stasis message type creation attempts. More...
 
enum  stasis_subscription_message_filter { STASIS_SUBSCRIPTION_FILTER_NONE = 0 , STASIS_SUBSCRIPTION_FILTER_FORCED_NONE , STASIS_SUBSCRIPTION_FILTER_SELECTIVE }
 Stasis subscription message filters. More...
 
enum  stasis_subscription_message_formatters { STASIS_SUBSCRIPTION_FORMATTER_NONE = 0 , STASIS_SUBSCRIPTION_FORMATTER_JSON = 1 << 0 , STASIS_SUBSCRIPTION_FORMATTER_AMI = 1 << 1 , STASIS_SUBSCRIPTION_FORMATTER_EVENT = 1 << 2 }
 Stasis subscription formatter filters. More...
 
enum  stasis_user_multi_object_snapshot_type { STASIS_UMOS_CHANNEL = 0 , STASIS_UMOS_BRIDGE , STASIS_UMOS_ENDPOINT }
 Object type code for multi user object snapshots. More...
 

Functions

struct stasis_subscription * __stasis_subscribe (struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
 Create a subscription.
 
struct stasis_subscription * __stasis_subscribe_pool (struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
 Create a subscription whose callbacks occur on a task pool.
 
struct stasis_subscription * __stasis_subscribe_synchronous (struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
 Create a subscription whose callbacks occur synchronously on message publishing.
 
void ast_multi_object_blob_add (struct ast_multi_object_blob *multi, enum stasis_user_multi_object_snapshot_type type, void *object)
 Add an object to a multi object blob previously created.
 
struct ast_multi_object_blob * ast_multi_object_blob_create (struct ast_json *blob)
 Create a stasis multi object blob.
 
void ast_multi_object_blob_single_channel_publish (struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob)
 Create and publish a stasis message blob on a channel with its snapshot.
 
struct stasis_message_type * ast_multi_user_event_type (void)
 Message type for custom user defined events with multi object blobs.
 
struct stasis_message * stasis_cache_clear_create (struct stasis_message *message)
 A message which instructs the caching topic to remove an entry from its cache.
 
struct stasis_message_type * stasis_cache_clear_type (void)
 Message type for clearing a message from a stasis cache.
 
struct stasis_cache * stasis_cache_create (snapshot_get_id id_fn)
 Create a cache.
 
struct stasis_cache * stasis_cache_create_full (snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_publish_fn aggregate_publish_fn)
 Create a cache.
 
struct ao2_container * stasis_cache_dump (struct stasis_cache *cache, struct stasis_message_type *type)
 Dump cached items to a subscription for the ast_eid_default entity.
 
struct ao2_container * stasis_cache_dump_all (struct stasis_cache *cache, struct stasis_message_type *type)
 Dump all entity items from the cache to a subscription.
 
struct ao2_container * stasis_cache_dump_by_eid (struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
 Dump cached items to a subscription for a specific entity.
 
struct stasis_message * stasis_cache_entry_get_aggregate (struct stasis_cache_entry *entry)
 Get the aggregate cache entry snapshot.
 
struct stasis_message * stasis_cache_entry_get_local (struct stasis_cache_entry *entry)
 Get the local entity's cache entry snapshot.
 
struct stasis_message * stasis_cache_entry_get_remote (struct stasis_cache_entry *entry, int idx)
 Get a remote entity's cache entry snapshot by index.
 
struct stasis_message * stasis_cache_get (struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
 Retrieve an item from the cache for the ast_eid_default entity.
 
struct ao2_container * stasis_cache_get_all (struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
 Retrieve all matching entity items from the cache.
 
struct stasis_message * stasis_cache_get_by_eid (struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid)
 Retrieve an item from the cache for a specific entity.
 
int stasis_cache_init (void)
 
struct stasis_message_type * stasis_cache_update_type (void)
 Message type for cache update messages.
 
int stasis_caching_accept_message_type (struct stasis_caching_topic *caching_topic, struct stasis_message_type *type)
 Indicate to a caching topic that we are interested in a message type.
 
struct stasis_topic * stasis_caching_get_topic (struct stasis_caching_topic *caching_topic)
 Returns the topic of cached events from a caching topic.
 
int stasis_caching_set_filter (struct stasis_caching_topic *caching_topic, enum stasis_subscription_message_filter filter)
 Set the message type filtering level on a cache.
 
struct stasis_caching_topic * stasis_caching_topic_create (struct stasis_topic *original_topic, struct stasis_cache *cache)
 Create a topic which monitors and caches messages from another topic.
 
struct stasis_caching_topic * stasis_caching_unsubscribe (struct stasis_caching_topic *caching_topic)
 Unsubscribes a caching topic from its upstream topic.
 
struct stasis_caching_topic * stasis_caching_unsubscribe_and_join (struct stasis_caching_topic *caching_topic)
 Unsubscribes a caching topic from its upstream topic, blocking until all messages have been forwarded.
 
int stasis_config_init (void)
 
struct stasis_forward * stasis_forward_all (struct stasis_topic *from_topic, struct stasis_topic *to_topic)
 Create a subscription which forwards all messages from one topic to another.
 
struct stasis_forward * stasis_forward_cancel (struct stasis_forward *forward)
 
int stasis_init (void)
 Initialize the Stasis subsystem.
 
void stasis_log_bad_type_access (const char *name)
 
int stasis_message_can_be_ami (struct stasis_message *msg)
 Determine if the given message can be converted to AMI.
 
struct stasis_message * stasis_message_create (struct stasis_message_type *type, void *data)
 Create a new message.
 
struct stasis_message * stasis_message_create_full (struct stasis_message_type *type, void *data, const struct ast_eid *eid)
 Create a new message for an entity.
 
void * stasis_message_data (const struct stasis_message *msg)
 Get the data contained in a message.
 
const struct ast_eid * stasis_message_eid (const struct stasis_message *msg)
 Get the entity id for a stasis_message.
 
const struct timeval * stasis_message_timestamp (const struct stasis_message *msg)
 Get the time when a message was created.
 
struct ast_manager_event_blob * stasis_message_to_ami (struct stasis_message *msg)
 Build the AMI representation of the message.
 
struct ast_event * stasis_message_to_event (struct stasis_message *msg)
 Build the Generic event system representation of the message.
 
struct ast_json * stasis_message_to_json (struct stasis_message *msg, struct stasis_message_sanitizer *sanitize)
 Build the JSON representation of the message.
 
struct stasis_message_type * stasis_message_type (const struct stasis_message *msg)
 Get the message type for a stasis_message.
 
enum stasis_subscription_message_formatters stasis_message_type_available_formatters (const struct stasis_message_type *message_type)
 Get a bitmap of available formatters for a message type.
 
enum stasis_message_type_result stasis_message_type_create (const char *name, struct stasis_message_vtable *vtable, struct stasis_message_type **result)
 Create a new message type.
 
int stasis_message_type_declined (const char *name)
 Check whether a message type is declined.
 
unsigned int stasis_message_type_hash (const struct stasis_message_type *type)
 Gets the hash of a given message type.
 
int stasis_message_type_id (const struct stasis_message_type *type)
 Gets the id of a given message type.
 
const char * stasis_message_type_name (const struct stasis_message_type *type)
 Gets the name of a given message type.
 
void stasis_publish (struct stasis_topic *topic, struct stasis_message *message)
 Publish a message to a topic's subscribers.
 
void stasis_publish_sync (struct stasis_subscription *sub, struct stasis_message *message)
 Publish a message to a topic's subscribers, synchronizing on the specified subscriber.
 
void stasis_subscription_accept_formatters (struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters)
 Indicate to a subscription that we are interested in messages with one or more formatters.
 
int stasis_subscription_accept_message_type (struct stasis_subscription *subscription, const struct stasis_message_type *type)
 Indicate to a subscription that we are interested in a message type.
 
void stasis_subscription_cb_noop (void *data, struct stasis_subscription *sub, struct stasis_message *message)
 Stasis subscription callback function that does nothing.
 
struct stasis_message_type * stasis_subscription_change_type (void)
 Gets the message type for subscription change notices.
 
int stasis_subscription_decline_message_type (struct stasis_subscription *subscription, const struct stasis_message_type *type)
 Indicate to a subscription that we are not interested in a message type.
 
int stasis_subscription_final_message (struct stasis_subscription *sub, struct stasis_message *msg)
 Determine whether a message is the final message to be received on a subscription.
 
int stasis_subscription_is_done (struct stasis_subscription *subscription)
 Returns whether subscription has received its final message.
 
int stasis_subscription_is_subscribed (const struct stasis_subscription *sub)
 Returns whether a subscription is currently subscribed.
 
void stasis_subscription_join (struct stasis_subscription *subscription)
 Block until the last message is processed on a subscription.
 
int stasis_subscription_set_congestion_limits (struct stasis_subscription *subscription, long low_water, long high_water)
 Set the high and low alert water marks of the stasis subscription.
 
int stasis_subscription_set_filter (struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
 Set the message type filtering level on a subscription.
 
const char * stasis_subscription_uniqueid (const struct stasis_subscription *sub)
 Get the unique ID for the subscription.
 
struct stasis_topic * stasis_topic_create (const char *name)
 Create a new topic.
 
struct stasis_topic * stasis_topic_create_with_detail (const char *name, const char *detail)
 Create a new topic with given detail.
 
const char * stasis_topic_detail (const struct stasis_topic *topic)
 Return the detail of a topic.
 
struct stasis_topic * stasis_topic_get (const char *name)
 Get a topic of the given name.
 
const char * stasis_topic_name (const struct stasis_topic *topic)
 Return the name of a topic.
 
struct stasis_topic_pool * stasis_topic_pool_create (struct stasis_topic *pooled_topic)
 Create a topic pool that routes messages from dynamically generated topics to the given topic.
 
void stasis_topic_pool_delete_topic (struct stasis_topic_pool *pool, const char *topic_name)
 Delete a topic from the topic pool.
 
struct stasis_topic * stasis_topic_pool_get_topic (struct stasis_topic_pool *pool, const char *topic_name)
 Find or create a topic in the pool.
 
int stasis_topic_pool_topic_exists (const struct stasis_topic_pool *pool, const char *topic_name)
 Check if a topic exists in a pool.
 
size_t stasis_topic_subscribers (const struct stasis_topic *topic)
 Return the number of subscribers of a topic.
 
const char * stasis_topic_uniqueid (const struct stasis_topic *topic)
 Return the uniqueid of a topic.
 
struct stasis_subscription * stasis_unsubscribe (struct stasis_subscription *subscription)
 Cancel a subscription.
 
struct stasis_subscription * stasis_unsubscribe_and_join (struct stasis_subscription *subscription)
 Cancel a subscription, blocking until the last message is processed.
 

Detailed Description

Stasis Message Bus API. See Stasis Message Bus API for detailed documentation.

Author
David M. Lee, II dlee@.nosp@m.digi.nosp@m.um.co.nosp@m.m
Since
12

Definition in file stasis.h.

Macro Definition Documentation

◆ STASIS_MESSAGE_TYPE_CLEANUP

#define STASIS_MESSAGE_TYPE_CLEANUP (   name)
Value:
({ \
ao2_cleanup(_priv_ ## name); \
_priv_ ## name = NULL; \
})
static const char name[]
Definition format_mp3.c:68
#define NULL
Definition resample.c:96

Boiler-plate messaging macro for cleaning up message types.

Note that if your type is defined in core instead of a loadable module, you should call message type cleanup from an ast_register_cleanup() handler instead of an ast_register_atexit() handler.

The reason is that during an immediate shutdown, loadable modules (which may refer to core message types) are not unloaded. While the atexit handlers are run, there's a window of time where a module subscription might reference a core message type after it's been cleaned up. Which is bad.

Parameters
name: Name of message type.
Since
12

Definition at line 1546 of file stasis.h.

1547 ({ \
1548 ao2_cleanup(_priv_ ## name); \
1549 _priv_ ## name = NULL; \
1550 })

◆ STASIS_MESSAGE_TYPE_DEFN

#define STASIS_MESSAGE_TYPE_DEFN (   name,
  ... 
)

Boiler-plate messaging macro for defining public message types.

STASIS_MESSAGE_TYPE_DEFN(ast_foo_type,
.to_ami = foo_to_ami,
.to_json = foo_to_json,
.to_event = foo_to_event,
);
static void to_ami(struct ast_sip_subscription *sub, struct ast_str **buf)
#define STASIS_MESSAGE_TYPE_DEFN(name,...)
Boiler-plate messaging macro for defining public message types.
Definition stasis.h:1471
Parameters
name: Name of message type.
...: Virtual table methods for messages of this type.
Since
12

Definition at line 1471 of file stasis.h.

1472 static struct stasis_message_vtable _priv_ ## name ## _v = { \
1473 __VA_ARGS__ \
1474 }; \
1475 static struct stasis_message_type *_priv_ ## name; \
1476 struct stasis_message_type *name(void) { \
1477 if (_priv_ ## name == NULL) { \
1478 stasis_log_bad_type_access(#name); \
1479 } \
1480 return _priv_ ## name; \
1481 }

◆ STASIS_MESSAGE_TYPE_DEFN_LOCAL

#define STASIS_MESSAGE_TYPE_DEFN_LOCAL (   name,
  ... 
)

Boiler-plate messaging macro for defining local message types.

STASIS_MESSAGE_TYPE_DEFN_LOCAL(ast_foo_type,
.to_ami = foo_to_ami,
.to_json = foo_to_json,
.to_event = foo_to_event,
);
#define STASIS_MESSAGE_TYPE_DEFN_LOCAL(name,...)
Boiler-plate messaging macro for defining local message types.
Definition stasis.h:1498
Parameters
name: Name of message type.
...: Virtual table methods for messages of this type.
Since
12

Definition at line 1498 of file stasis.h.

1499 static struct stasis_message_vtable _priv_ ## name ## _v = { \
1500 __VA_ARGS__ \
1501 }; \
1502 static struct stasis_message_type *_priv_ ## name; \
1503 static struct stasis_message_type *name(void) { \
1504 if (_priv_ ## name == NULL) { \
1505 stasis_log_bad_type_access(#name); \
1506 } \
1507 return _priv_ ## name; \
1508 }

◆ STASIS_MESSAGE_TYPE_INIT

#define STASIS_MESSAGE_TYPE_INIT (   name)
Value:
({ \
ast_assert(_priv_ ## name == NULL); \
stasis_message_type_create(#name, \
&_priv_ ## name ## _v, &_priv_ ## name) == STASIS_MESSAGE_TYPE_ERROR ? 1 : 0; \
})
@ STASIS_MESSAGE_TYPE_ERROR
Definition stasis.h:286

Boiler-plate messaging macro for initializing message types.

if (STASIS_MESSAGE_TYPE_INIT(ast_foo_type) != 0) {
return -1;
}
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
Definition stasis.h:1524
Parameters
name: Name of message type.
Returns
0 if initialization is successful.
Non-zero on failure.
Since
12

Definition at line 1524 of file stasis.h.

1525 ({ \
1526 ast_assert(_priv_ ## name == NULL); \
1527 stasis_message_type_create(#name, \
1528 &_priv_ ## name ## _v, &_priv_ ## name) == STASIS_MESSAGE_TYPE_ERROR ? 1 : 0; \
1529 })

◆ stasis_subscribe

#define stasis_subscribe (   topic,
  callback,
  data 
)    __stasis_subscribe(topic, callback, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)

Definition at line 649 of file stasis.h.

◆ stasis_subscribe_pool

#define stasis_subscribe_pool (   topic,
  callback,
  data 
)    __stasis_subscribe_pool(topic, callback, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)

Definition at line 680 of file stasis.h.

◆ stasis_subscribe_synchronous

#define stasis_subscribe_synchronous (   topic,
  callback,
  data 
)    __stasis_subscribe_synchronous(topic, callback, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)

Definition at line 711 of file stasis.h.

Typedef Documentation

◆ cache_aggregate_calc_fn

typedef struct stasis_message *(* cache_aggregate_calc_fn) (struct stasis_cache_entry *entry, struct stasis_message *new_snapshot)

Callback to calculate the aggregate cache entry.

Since
12.2.0
Parameters
entry: Cache entry to calculate a new aggregate snapshot.
new_snapshot: The snapshot that is being updated.
Note
Return a ref bumped pointer from stasis_cache_entry_get_aggregate() if a new aggregate could not be calculated because of error.
An aggregate message is a combined representation of the local and remote entities publishing the message data. e.g., An aggregate device state represents the combined device state from the local and any remote entities publishing state for a device. e.g., An aggregate MWI message is the old/new MWI counts accumulated from the local and any remote entities publishing to a mailbox.
Returns
New aggregate-snapshot calculated on success. Caller has a reference on return.

Definition at line 1040 of file stasis.h.

◆ cache_aggregate_publish_fn

typedef void(* cache_aggregate_publish_fn) (struct stasis_topic *topic, struct stasis_message *aggregate)

Callback to publish the aggregate cache entry message.

Since
12.2.0

Once an aggregate message is calculated, this callback publishes the message so subscribers will know the new value of an aggregated state.

Parameters
topic: The aggregate message may be published to this topic. It is the topic to which the cache itself is subscribed.
aggregate: The aggregate snapshot message to publish.
Note
It is up to the function to determine if there is a better topic the aggregate message should be published over.
An aggregate message is a combined representation of the local and remote entities publishing the message data. e.g., An aggregate device state represents the combined device state from the local and any remote entities publishing state for a device. e.g., An aggregate MWI message is the old/new MWI counts accumulated from the local and any remote entities publishing to a mailbox.

Definition at line 1086 of file stasis.h.

◆ snapshot_get_id

typedef const char *(* snapshot_get_id) (struct stasis_message *message)

Callback to extract a unique identity from a snapshot message.

This identity is unique to the underlying object of the snapshot, such as the UniqueId field of a channel.

Parameters
message: Message to extract id from.
Returns
String representing the snapshot's id.
Return values
NULL if the message_type of the message isn't a handled snapshot.
Since
12

Definition at line 1040 of file stasis.h.

◆ stasis_subscription_cb

typedef void(* stasis_subscription_cb) (void *data, struct stasis_subscription *sub, struct stasis_message *message)

Callback function type for Stasis subscriptions.

Parameters
data: Data field provided with subscription.
sub: Subscription published on.
message: Published message.
Since
12

Definition at line 611 of file stasis.h.

Enumeration Type Documentation

◆ stasis_message_type_result

Return code for Stasis message type creation attempts.

Enumerator
STASIS_MESSAGE_TYPE_ERROR 

Message type was not created due to allocation failure

STASIS_MESSAGE_TYPE_SUCCESS 

Message type was created successfully

STASIS_MESSAGE_TYPE_DECLINED 

Message type was not created due to configuration

Definition at line 285 of file stasis.h.

285 {
286 STASIS_MESSAGE_TYPE_ERROR = -1, /*!< Message type was not created due to allocation failure */
287 STASIS_MESSAGE_TYPE_SUCCESS, /*!< Message type was created successfully */
288 STASIS_MESSAGE_TYPE_DECLINED, /*!< Message type was not created due to configuration */
289};
@ STASIS_MESSAGE_TYPE_DECLINED
Definition stasis.h:288
@ STASIS_MESSAGE_TYPE_SUCCESS
Definition stasis.h:287

◆ stasis_subscription_message_filter

Stasis subscription message filters.

Enumerator
STASIS_SUBSCRIPTION_FILTER_NONE 

No filter is in place, all messages are raised

STASIS_SUBSCRIPTION_FILTER_FORCED_NONE 

No filter is in place or can be set, all messages are raised

STASIS_SUBSCRIPTION_FILTER_SELECTIVE 

Only messages of allowed message types are raised

Definition at line 294 of file stasis.h.

294 {
295 STASIS_SUBSCRIPTION_FILTER_NONE = 0, /*!< No filter is in place, all messages are raised */
296 STASIS_SUBSCRIPTION_FILTER_FORCED_NONE, /*!< No filter is in place or can be set, all messages are raised */
297 STASIS_SUBSCRIPTION_FILTER_SELECTIVE, /*!< Only messages of allowed message types are raised */
298};
@ STASIS_SUBSCRIPTION_FILTER_SELECTIVE
Definition stasis.h:297
@ STASIS_SUBSCRIPTION_FILTER_FORCED_NONE
Definition stasis.h:296
@ STASIS_SUBSCRIPTION_FILTER_NONE
Definition stasis.h:295

◆ stasis_subscription_message_formatters

Stasis subscription formatter filters.

There should be an entry here for each member of stasis_message_vtable

Since
13.25.0
16.2.0
Enumerator
STASIS_SUBSCRIPTION_FORMATTER_NONE 
STASIS_SUBSCRIPTION_FORMATTER_JSON 

Allow messages with a to_json formatter

STASIS_SUBSCRIPTION_FORMATTER_AMI 

Allow messages with a to_ami formatter

STASIS_SUBSCRIPTION_FORMATTER_EVENT 

Allow messages with a to_event formatter

Definition at line 308 of file stasis.h.

308 {
309 STASIS_SUBSCRIPTION_FORMATTER_NONE = 0,
310 STASIS_SUBSCRIPTION_FORMATTER_JSON = 1 << 0, /*!< Allow messages with a to_json formatter */
311 STASIS_SUBSCRIPTION_FORMATTER_AMI = 1 << 1, /*!< Allow messages with a to_ami formatter */
312 STASIS_SUBSCRIPTION_FORMATTER_EVENT = 1 << 2, /*!< Allow messages with a to_event formatter */
313};
@ STASIS_SUBSCRIPTION_FORMATTER_EVENT
Definition stasis.h:312
@ STASIS_SUBSCRIPTION_FORMATTER_AMI
Definition stasis.h:311
@ STASIS_SUBSCRIPTION_FORMATTER_JSON
Definition stasis.h:310
@ STASIS_SUBSCRIPTION_FORMATTER_NONE
Definition stasis.h:309

Function Documentation

◆ __stasis_subscribe()

struct stasis_subscription * __stasis_subscribe ( struct stasis_topic * topic,
stasis_subscription_cb  callback,
void *  data,
const char *  file,
int  lineno,
const char *  func 
)

Create a subscription.

In addition to being AO2 managed memory (requiring an ao2_cleanup() to free up this reference), the subscription must be explicitly unsubscribed from its topic using stasis_unsubscribe().

The invocations of the callback are serialized, but may not always occur on the same thread. The invocation order of different subscriptions is unspecified.

Parameters
topic: Topic to subscribe to.
callback: Callback function for subscription messages.
data: Data to be passed to the callback, in addition to the message.
file, lineno, func: Caller's source file, line number, and function name.
Returns
New stasis_subscription object.
Return values
NULL on error.
Since
12
Note
The subscription's callback will be invoked with a message indicating it has been subscribed. This occurs immediately, before accepted message types can be set, so the callback must expect to receive it.

Definition at line 1009 of file stasis.c.

1016{
1017 return internal_stasis_subscribe(topic, callback, data, 1, 0, file, lineno, func);
1018}
static struct ast_channel * callback(struct ast_channelstorage_instance *driver, ao2_callback_data_fn *cb_fn, void *arg, void *data, int ao2_flags, int rdlock)
struct stasis_subscription * internal_stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_taskpool, const char *file, int lineno, const char *func)
Create a subscription.
Definition stasis.c:923

References callback(), stasis_subscription::data, internal_stasis_subscribe(), and stasis_subscription::topic.

Referenced by stasis_message_router_create_internal().

◆ __stasis_subscribe_pool()

struct stasis_subscription * __stasis_subscribe_pool ( struct stasis_topic * topic,
stasis_subscription_cb  callback,
void *  data,
const char *  file,
int  lineno,
const char *  func 
)

Create a subscription whose callbacks occur on a task pool.

In addition to being AO2 managed memory (requiring an ao2_cleanup() to free up this reference), the subscription must be explicitly unsubscribed from its topic using stasis_unsubscribe().

The invocations of the callback are serialized, but will almost certainly not always happen on the same thread. The invocation order of different subscriptions is unspecified.

Unlike stasis_subscribe, this function will explicitly use a taskpool to dispatch items to its callback. This form of subscription should be used when many subscriptions may be made to the specified topic.

Parameters
topic: Topic to subscribe to.
callback: Callback function for subscription messages.
data: Data to be passed to the callback, in addition to the message.
file, lineno, func: Caller's source file, line number, and function name.
Returns
New stasis_subscription object.
Return values
NULL on error.
Since
12.8.0
Note
The subscription's callback will be invoked with a message indicating it has been subscribed. This occurs immediately, before accepted message types can be set, so the callback must expect to receive it.

Definition at line 1020 of file stasis.c.

1027{
1028 return internal_stasis_subscribe(topic, callback, data, 1, 1, file, lineno, func);
1029}

References callback(), stasis_subscription::data, internal_stasis_subscribe(), and stasis_subscription::topic.

Referenced by stasis_message_router_create_internal().

◆ __stasis_subscribe_synchronous()

struct stasis_subscription * __stasis_subscribe_synchronous ( struct stasis_topic * topic,
stasis_subscription_cb  callback,
void *  data,
const char *  file,
int  lineno,
const char *  func 
)

Create a subscription whose callbacks occur synchronously on message publishing.

Since
23.5.0
22.11.0
20.21.0

In addition to being AO2 managed memory (requiring an ao2_cleanup() to free up this reference), the subscription must be explicitly unsubscribed from its topic using stasis_unsubscribe().

The invocations of the callback are serialized, but will almost certainly not always happen on the same thread. The invocation order of different subscriptions is unspecified.

This subscription will be invoked on the same thread that is publishing the message.

Parameters
topic: Topic to subscribe to.
callback: Callback function for subscription messages.
data: Data to be passed to the callback, in addition to the message.
file, lineno, func: Caller's source file, line number, and function name.
Returns
New stasis_subscription object.
Return values
NULL on error.
Note
The subscription's callback will be invoked with a message indicating it has been subscribed. This occurs immediately, before accepted message types can be set, so the callback must expect to receive it.

Definition at line 1031 of file stasis.c.

1038{
1039 return internal_stasis_subscribe(topic, callback, data, 0, 0, file, lineno, func);
1040}

References callback(), stasis_subscription::data, internal_stasis_subscribe(), and stasis_subscription::topic.

◆ stasis_cache_clear_create()

struct stasis_message * stasis_cache_clear_create ( struct stasis_message * message)

A message which instructs the caching topic to remove an entry from its cache.

Parameters
message: Message representative of the cache entry that should be cleared. This will become the data held in the stasis_cache_clear message.
Returns
Message which, when sent to a stasis_caching_topic, will clear the item from the cache.
Return values
NULL on error.
Since
12

Definition at line 778 of file stasis_cache.c.

779{
781}
struct stasis_message_type * stasis_cache_clear_type(void)
Message type for clearing a message from a stasis cache.
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.

References stasis_cache_clear_type(), and stasis_message_create().

Referenced by ast_delete_mwi_state_full(), ast_device_state_clear_cache(), ast_endpoint_shutdown(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), clear_node_cache(), and remove_device_states_cb().

◆ stasis_cache_create()

struct stasis_cache * stasis_cache_create ( snapshot_get_id  id_fn)

Create a cache.

This is the backend store for a stasis_caching_topic. The cache is thread safe, allowing concurrent reads and writes.

The returned object is AO2 managed, so ao2_cleanup() when you're done.

Parameters
id_fn: Callback to extract the id from a snapshot message.
Returns
New cache indexed by id_fn.
Return values
NULL on error.
Since
12

Definition at line 360 of file stasis_cache.c.

361{
362 return stasis_cache_create_full(id_fn, NULL, NULL);
363}
struct stasis_cache * stasis_cache_create_full(snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_publish_fn aggregate_publish_fn)
Create a cache.

References stasis_cache::id_fn, NULL, and stasis_cache_create_full().

Referenced by ast_presence_state_engine_init(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), mwi_init(), and stasis_cp_all_create().

◆ stasis_cache_create_full()

struct stasis_cache * stasis_cache_create_full ( snapshot_get_id  id_fn,
cache_aggregate_calc_fn  aggregate_calc_fn,
cache_aggregate_publish_fn  aggregate_publish_fn 
)

Create a cache.

This is the backend store for a stasis_caching_topic. The cache is thread safe, allowing concurrent reads and writes.

The returned object is AO2 managed, so ao2_cleanup() when you're done.

Parameters
id_fn: Callback to extract the id from a snapshot message.
aggregate_calc_fn: Callback to calculate the aggregate cache entry.
aggregate_publish_fn: Callback to publish the aggregate cache entry.
Note
An aggregate message is a combined representation of the local and remote entities publishing the message data. e.g., An aggregate device state represents the combined device state from the local and any remote entities publishing state for a device. e.g., An aggregate MWI message is the old/new MWI counts accumulated from the local and any remote entities publishing to a mailbox.
Returns
New cache indexed by id_fn.
Return values
NULL on error
Since
12.2.0

Definition at line 334 of file stasis_cache.c.

337{
338 struct stasis_cache *cache;
339
342 if (!cache) {
343 return NULL;
344 }
345
348 if (!cache->entries) {
350 return NULL;
351 }
352
353 cache->id_fn = id_fn;
354 cache->aggregate_calc_fn = aggregate_calc_fn;
355 cache->aggregate_publish_fn = aggregate_publish_fn;
356
357 return cache;
358}
@ AO2_ALLOC_OPT_LOCK_NOLOCK
Definition astobj2.h:367
@ AO2_ALLOC_OPT_LOCK_RWLOCK
Definition astobj2.h:365
#define ao2_cleanup(obj)
Definition astobj2.h:1934
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition astobj2.h:404
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition astobj2.h:1303
struct ao2_container * cache
static int cache_entry_hash(const void *obj, int flags)
#define NUM_CACHE_BUCKETS
static int cache_entry_cmp(void *obj, void *arg, int flags)
static void cache_dtor(void *obj)
cache_aggregate_calc_fn aggregate_calc_fn
snapshot_get_id id_fn
cache_aggregate_publish_fn aggregate_publish_fn

References stasis_cache::aggregate_calc_fn, stasis_cache::aggregate_publish_fn, AO2_ALLOC_OPT_LOCK_NOLOCK, AO2_ALLOC_OPT_LOCK_RWLOCK, ao2_alloc_options, ao2_cleanup, ao2_container_alloc_hash, cache, cache_dtor(), cache_entry_cmp(), cache_entry_hash(), stasis_cache::id_fn, NULL, and NUM_CACHE_BUCKETS.

Referenced by AST_TEST_DEFINE(), devstate_init(), and stasis_cache_create().

◆ stasis_cache_dump()

struct ao2_container * stasis_cache_dump ( struct stasis_cache cache,
struct stasis_message_type type 
)

Dump cached items to a subscription for the ast_eid_default entity.

Parameters
cache: The cache to query.
type: Type of message to dump (any type if NULL).
Returns
ao2_container containing all matches (must be unreffed by caller)
Return values
NULL on allocation error
Since
12

Definition at line 736 of file stasis_cache.c.

737{
739}
static const char type[]
struct ao2_container * stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
Dump cached items to a subscription for a specific entity.
struct ast_eid ast_eid_default
Global EID.
Definition options.c:94

References ast_eid_default, cache, stasis_cache_dump_by_eid(), and type.

Referenced by action_presencestatelist(), ast_ari_endpoints_list(), ast_ari_endpoints_list_by_tech(), AST_TEST_DEFINE(), asterisk_publication_devicestate_refresh(), asterisk_publication_mwi_refresh(), asterisk_start_devicestate_publishing(), asterisk_start_mwi_publishing(), endpoints_scrape_cb(), load_module(), load_module(), unload_module(), and xmpp_init_event_distribution().

◆ stasis_cache_dump_all()

struct ao2_container * stasis_cache_dump_all ( struct stasis_cache cache,
struct stasis_message_type type 
)

Dump all entity items from the cache to a subscription.

Since
12.2.0
Parameters
cache: The cache to query.
type: Type of message to dump (any type if NULL).
Returns
ao2_container containing all matches (must be unreffed by caller)
Return values
NULL on allocation error

Definition at line 757 of file stasis_cache.c.

758{
759 struct cache_dump_data cache_dump;
760
762 ast_assert(cache->entries != NULL);
763
764 cache_dump.eid = NULL;
765 cache_dump.type = type;
766 cache_dump.container = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, NULL, NULL);
767 if (!cache_dump.container) {
768 return NULL;
769 }
770
772 return cache_dump.container;
773}
#define ao2_callback(c, flags, cb_fn, arg)
ao2_callback() is a generic function that applies cb_fn() to all objects in a container,...
Definition astobj2.h:1693
@ OBJ_NODATA
Definition astobj2.h:1044
@ OBJ_MULTIPLE
Definition astobj2.h:1049
#define ao2_container_alloc_list(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a list container.
Definition astobj2.h:1327
static int cache_dump_all_cb(void *obj, void *arg, int flags)
#define ast_assert(a)
Definition utils.h:779

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_callback, ao2_container_alloc_list, ast_assert, cache, cache_dump_all_cb(), cache_dump_data::container, cache_dump_data::eid, NULL, OBJ_MULTIPLE, OBJ_NODATA, type, and cache_dump_data::type.

Referenced by AST_TEST_DEFINE(), cache_cleanup(), and cleanup_module().

◆ stasis_cache_dump_by_eid()

struct ao2_container * stasis_cache_dump_by_eid ( struct stasis_cache cache,
struct stasis_message_type type,
const struct ast_eid eid 
)

Dump cached items to a subscription for a specific entity.

Since
12.2.0
Parameters
cache: The cache to query.
type: Type of message to dump (any type if NULL).
eid: Specific entity id to retrieve. NULL for aggregate.
Returns
ao2_container containing all matches (must be unreffed by caller)
Return values
NULL on allocation error

Definition at line 718 of file stasis_cache.c.

719{
720 struct cache_dump_data cache_dump;
721
723 ast_assert(cache->entries != NULL);
724
725 cache_dump.eid = eid;
726 cache_dump.type = type;
727 cache_dump.container = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, NULL, NULL);
728 if (!cache_dump.container) {
729 return NULL;
730 }
731
733 return cache_dump.container;
734}
static int cache_dump_by_eid_cb(void *obj, void *arg, int flags)
const struct ast_eid * eid

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_callback, ao2_container_alloc_list, ast_assert, cache, cache_dump_by_eid_cb(), cache_dump_data::container, cache_dump_data::eid, NULL, OBJ_MULTIPLE, OBJ_NODATA, type, and cache_dump_data::type.

Referenced by action_devicestatelist(), AST_TEST_DEFINE(), cpg_confchg_cb(), and stasis_cache_dump().

◆ stasis_cache_entry_get_aggregate()

struct stasis_message * stasis_cache_entry_get_aggregate ( struct stasis_cache_entry entry)

Get the aggregate cache entry snapshot.

Since
12.2.0
Parameters
entry: Cache entry to get the aggregate snapshot.
Note
A reference is not given to the returned pointer so don't unref it.
An aggregate message is a combined representation of the local and remote entities publishing the message data. e.g., An aggregate device state represents the combined device state from the local and any remote entities publishing state for a device. e.g., An aggregate MWI message is the old/new MWI counts accumulated from the local and any remote entities publishing to a mailbox.
Return values
Aggregate-snapshot in cache.
NULL if not present.

Definition at line 365 of file stasis_cache.c.

366{
367 return entry->aggregate;
368}
struct stasis_message * aggregate

References stasis_cache_entry::aggregate.

Referenced by cache_test_aggregate_calc_fn(), and device_state_aggregate_calc().

◆ stasis_cache_entry_get_local()

struct stasis_message * stasis_cache_entry_get_local ( struct stasis_cache_entry entry)

Get the local entity's cache entry snapshot.

Since
12.2.0
Parameters
entry: Cache entry to get the local entity's snapshot.
Note
A reference is not given to the returned pointer so don't unref it.
Return values
Internal-snapshot in cache.
NULL if not present.

Definition at line 370 of file stasis_cache.c.

371{
372 return entry->local;
373}
struct stasis_message * local

References stasis_cache_entry::local.

Referenced by cache_test_aggregate_calc_fn(), and device_state_aggregate_calc().

◆ stasis_cache_entry_get_remote()

struct stasis_message * stasis_cache_entry_get_remote ( struct stasis_cache_entry entry,
int  idx 
)

Get a remote entity's cache entry snapshot by index.

Since
12.2.0
Parameters
entry: Cache entry to get a remote entity's snapshot.
idx: Which remote entity's snapshot to get.
Note
A reference is not given to the returned pointer so don't unref it.
Return values
Remote-entity-snapshot in cache.
NULL if not present.

Definition at line 375 of file stasis_cache.c.

376{
377 if (idx < AST_VECTOR_SIZE(&entry->remote)) {
378 return AST_VECTOR_GET(&entry->remote, idx);
379 }
380 return NULL;
381}
struct stasis_cache_entry::@426 remote
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition vector.h:620
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition vector.h:691

References AST_VECTOR_GET, AST_VECTOR_SIZE, NULL, and stasis_cache_entry::remote.

Referenced by cache_test_aggregate_calc_fn(), and device_state_aggregate_calc().

◆ stasis_cache_get()

struct stasis_message * stasis_cache_get ( struct stasis_cache cache,
struct stasis_message_type type,
const char *  id 
)

Retrieve an item from the cache for the ast_eid_default entity.

The returned item is AO2 managed, so ao2_cleanup() when you're done with it.

Parameters
cache: The cache to query.
type: Type of message to retrieve.
id: Identity of the snapshot to retrieve.
Returns
Message from the cache.
Return values
NULL if message is not found.
Since
12

Definition at line 686 of file stasis_cache.c.

687{
689}
struct stasis_message * stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid)
Retrieve an item from the cache for a specific entity.

References ast_eid_default, cache, stasis_cache_get_by_eid(), and type.

Referenced by ast_endpoint_latest_snapshot(), AST_TEST_DEFINE(), has_voicemail(), presence_state_cached(), unistim_send_mwi_to_peer(), and update_registry().

◆ stasis_cache_get_all()

struct ao2_container * stasis_cache_get_all ( struct stasis_cache cache,
struct stasis_message_type type,
const char *  id 
)

Retrieve all matching entity items from the cache.

Since
12.2.0
Parameters
cache: The cache to query.
type: Type of message to retrieve.
id: Identity of the snapshot to retrieve.
Returns
Container of matching items found.
Return values
NULL if error.

Definition at line 587 of file stasis_cache.c.

588{
589 struct stasis_cache_entry *cached_entry;
590 struct ao2_container *found;
591
593 ast_assert(cache->entries != NULL);
594 ast_assert(id != NULL);
595
596 if (!type) {
597 return NULL;
598 }
599
601 if (!found) {
602 return NULL;
603 }
604
605 ao2_rdlock(cache->entries);
606
607 cached_entry = cache_find(cache->entries, type, id);
608 if (cached_entry && cache_entry_dump(found, cached_entry)) {
609 ao2_cleanup(found);
610 found = NULL;
611 }
612
613 ao2_unlock(cache->entries);
614
615 ao2_cleanup(cached_entry);
616 return found;
617}
#define ao2_rdlock(a)
Definition astobj2.h:718
#define ao2_unlock(a)
Definition astobj2.h:729
static int cache_entry_dump(struct ao2_container *snapshots, const struct stasis_cache_entry *entry)
static struct stasis_cache_entry * cache_find(struct ao2_container *entries, struct stasis_message_type *type, const char *id)
Generic container type.

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_cleanup, ao2_container_alloc_list, ao2_rdlock, ao2_unlock, ast_assert, cache, cache_entry_dump(), cache_find(), NULL, and type.

Referenced by AST_TEST_DEFINE().

◆ stasis_cache_get_by_eid()

struct stasis_message * stasis_cache_get_by_eid ( struct stasis_cache cache,
struct stasis_message_type type,
const char *  id,
const struct ast_eid eid 
)

Retrieve an item from the cache for a specific entity.

The returned item is AO2 managed, so ao2_cleanup() when you're done with it.

Parameters
cache: The cache to query.
type: Type of message to retrieve.
id: Identity of the snapshot to retrieve.
eid: Specific entity id to retrieve. NULL for aggregate.
Note
An aggregate message is a combined representation of the local and remote entities publishing the message data. e.g., An aggregate device state represents the combined device state from the local and any remote entities publishing state for a device. e.g., An aggregate MWI message is the old/new MWI counts accumulated from the local and any remote entities publishing to a mailbox.
Returns
Message from the cache.
Return values
NULL if message is not found.
Since
12.2.0

Definition at line 659 of file stasis_cache.c.

660{
661 struct stasis_cache_entry *cached_entry;
662 struct stasis_message *snapshot = NULL;
663
665 ast_assert(cache->entries != NULL);
666 ast_assert(id != NULL);
667
668 if (!type) {
669 return NULL;
670 }
671
672 ao2_rdlock(cache->entries);
673
674 cached_entry = cache_find(cache->entries, type, id);
675 if (cached_entry) {
676 snapshot = cache_entry_by_eid(cached_entry, eid);
677 ao2_bump(snapshot);
678 }
679
680 ao2_unlock(cache->entries);
681
682 ao2_cleanup(cached_entry);
683 return snapshot;
684}
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition astobj2.h:480
static struct stasis_message * cache_entry_by_eid(const struct stasis_cache_entry *entry, const struct ast_eid *eid)
struct ast_eid eid

References ao2_bump, ao2_cleanup, ao2_rdlock, ao2_unlock, ast_assert, cache, cache_entry_by_eid(), cache_find(), stasis_message::eid, NULL, and type.

Referenced by ast_delete_mwi_state_full(), ast_device_state_clear_cache(), AST_TEST_DEFINE(), check_cache_aggregate(), devstate_cached(), and stasis_cache_get().

◆ stasis_cache_init()

int stasis_cache_init ( void  )

Definition at line 1008 of file stasis_cache.c.

1009{
1011
1013 return -1;
1014 }
1015
1017 return -1;
1018 }
1019
1020 return 0;
1021}
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
Definition clicompat.c:19
struct stasis_message_type * stasis_cache_update_type(void)
Message type for cache update messages.
static void stasis_cache_cleanup(void)

References ast_register_cleanup(), stasis_cache_cleanup(), stasis_cache_clear_type(), stasis_cache_update_type(), and STASIS_MESSAGE_TYPE_INIT.

Referenced by stasis_init().

◆ stasis_caching_accept_message_type()

int stasis_caching_accept_message_type ( struct stasis_caching_topic caching_topic,
struct stasis_message_type type 
)

Indicate to a caching topic that we are interested in a message type.

This will cause the caching topic to receive messages of the given message type. This enables internal filtering in the stasis message bus to reduce messages.

Parameters
caching_topic: The caching topic.
type: The message type we wish to receive.
Return values
0 on success
-1 on failure
Since
17.0.0

Definition at line 90 of file stasis_cache.c.

92{
93 int res;
94
95 if (!caching_topic) {
96 return -1;
97 }
98
99 /* We wait to accept the stasis specific message types until now so that by default everything
100 * will flow to us.
101 */
104 res |= stasis_subscription_accept_message_type(caching_topic->sub, type);
105
106 return res;
107}
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
Definition stasis.c:1101
struct stasis_subscription * sub

References stasis_cache_clear_type(), stasis_subscription_accept_message_type(), stasis_subscription_change_type(), stasis_caching_topic::sub, and type.

Referenced by ast_presence_state_engine_init(), devstate_init(), and stasis_cp_single_accept_message_type().

◆ stasis_caching_get_topic()

struct stasis_topic * stasis_caching_get_topic ( struct stasis_caching_topic caching_topic)

Returns the topic of cached events from a caching topic.

Parameters
caching_topic: The caching topic.
Returns
The topic that publishes cache update events, along with passthrough events from the underlying topic.
Return values
NULL if caching_topic is NULL.
Since
12

Definition at line 85 of file stasis_cache.c.

86{
87 return caching_topic->topic;
88}
struct stasis_topic * topic

References stasis_caching_topic::topic.

Referenced by ast_device_state_topic_cached(), ast_mwi_topic_cached(), ast_presence_state_topic_cached(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), stasis_cp_single_create(), and stasis_cp_single_topic_cached().

◆ stasis_caching_set_filter()

int stasis_caching_set_filter ( struct stasis_caching_topic caching_topic,
enum stasis_subscription_message_filter  filter 
)

Set the message type filtering level on a cache.

This will cause the underlying subscription to filter messages according to the provided filter level. For example, if selective is used, then only messages matching those provided to stasis_subscription_accept_message_type will be raised to the subscription callback.

Parameters
caching_topic: The caching topic.
filter: What filter to use
Return values
0 on success
-1 on failure
Since
17.0.0

Definition at line 109 of file stasis_cache.c.

111{
112 if (!caching_topic) {
113 return -1;
114 }
115 return stasis_subscription_set_filter(caching_topic->sub, filter);
116}
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
Definition stasis.c:1155

References filter(), stasis_subscription_set_filter(), and stasis_caching_topic::sub.

Referenced by ast_presence_state_engine_init(), devstate_init(), and stasis_cp_single_set_filter().

◆ stasis_caching_topic_create()

struct stasis_caching_topic * stasis_caching_topic_create ( struct stasis_topic original_topic,
struct stasis_cache cache 
)

Create a topic which monitors and caches messages from another topic.

The idea is that some topics publish 'snapshots' of some other object's state that should be cached. When these snapshot messages are received, the cache is updated, and a stasis_cache_update() message is forwarded, which has both the original snapshot message and the new message.

The returned object is AO2 managed, so ao2_cleanup() when done with it.

Parameters
original_topic: Topic publishing snapshot messages.
cache: Backend cache in which to keep snapshots.
Returns
New topic which changes snapshot messages to stasis_cache_update() messages, and forwards all other messages from the original topic.
Return values
NULL on error
Since
12

Definition at line 948 of file stasis_cache.c.

949{
950 struct stasis_caching_topic *caching_topic;
951 static int caching_id;
952 char *new_name;
953 int ret;
954
955 ret = ast_asprintf(&new_name, "cache:%d/%s", ast_atomic_fetchadd_int(&caching_id, +1), stasis_topic_name(original_topic));
956 if (ret < 0) {
957 return NULL;
958 }
959
960 caching_topic = ao2_alloc_options(sizeof(*caching_topic),
962 if (caching_topic == NULL) {
963 ast_free(new_name);
964
965 return NULL;
966 }
967
968 caching_topic->topic = stasis_topic_create(new_name);
969 if (caching_topic->topic == NULL) {
970 ao2_ref(caching_topic, -1);
971 ast_free(new_name);
972
973 return NULL;
974 }
975
976 ao2_ref(cache, +1);
977 caching_topic->cache = cache;
978 if (!cache->registered) {
979 if (ao2_container_register(new_name, cache->entries, print_cache_entry)) {
980 ast_log(LOG_ERROR, "Stasis cache container '%p' for '%s' did not register\n",
981 cache->entries, new_name);
982 } else {
983 cache->registered = 1;
984 }
985 }
986 ast_free(new_name);
987
988 caching_topic->sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0, 0, __FILE__, __LINE__, __PRETTY_FUNCTION__);
989 if (caching_topic->sub == NULL) {
990 ao2_ref(caching_topic, -1);
991
992 return NULL;
993 }
994
996 caching_topic->original_topic = original_topic;
997
998 /* The subscription holds the reference, so no additional ref bump. */
999 return caching_topic;
1000}
#define ast_free(a)
Definition astmm.h:180
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition astmm.h:267
#define ast_log
Definition astobj2.c:42
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition astobj2.h:459
int ao2_container_register(const char *name, struct ao2_container *self, ao2_prnt_obj_fn *prnt_obj)
Register a container for CLI stats and integrity check.
#define LOG_ERROR
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition lock.h:764
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition stasis.c:694
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition stasis.c:684
static void print_cache_entry(void *v_obj, void *where, ao2_prnt_fn *prnt)
static void stasis_caching_topic_dtor(void *obj)
static void caching_topic_exec(void *data, struct stasis_subscription *sub, struct stasis_message *message)
struct stasis_subscription * internal_stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_task_pool, const char *file, int lineno, const char *func)
Create a subscription.
Definition stasis.c:923
struct stasis_cache * cache
struct stasis_topic * original_topic

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_alloc_options, ao2_container_register(), ao2_ref, ast_asprintf, ast_atomic_fetchadd_int(), ast_free, ast_log, stasis_caching_topic::cache, cache, caching_topic_exec(), internal_stasis_subscribe(), LOG_ERROR, NULL, stasis_caching_topic::original_topic, print_cache_entry(), stasis_caching_topic_dtor(), stasis_topic_create(), stasis_topic_name(), stasis_caching_topic::sub, and stasis_caching_topic::topic.

Referenced by ast_presence_state_engine_init(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), devstate_init(), mwi_init(), and stasis_cp_sink_create().

◆ stasis_caching_unsubscribe()

struct stasis_caching_topic * stasis_caching_unsubscribe ( struct stasis_caching_topic caching_topic)

Unsubscribes a caching topic from its upstream topic.

This function returns immediately, so be sure to clean up when stasis_subscription_final_message() is received.

Parameters
caching_topic: Caching topic to unsubscribe
Return values
NULL for convenience
Since
12

Definition at line 119 of file stasis_cache.c.

120{
121 if (!caching_topic) {
122 return NULL;
123 }
124
125 /*
126 * The subscription may hold the last reference to this caching
127 * topic, but we want to make sure the unsubscribe finishes
128 * before kicking off the caching topic's dtor.
129 */
130 ao2_ref(caching_topic, +1);
131
132 if (stasis_subscription_is_subscribed(caching_topic->sub)) {
133 /*
134 * Increment the reference to hold on to it past the
135 * unsubscribe. Will be cleaned up in dtor.
136 */
137 ao2_ref(caching_topic->sub, +1);
138 stasis_unsubscribe(caching_topic->sub);
139 } else {
140 ast_log(LOG_ERROR, "stasis_caching_topic unsubscribed multiple times\n");
141 }
142 ao2_cleanup(caching_topic);
143 return NULL;
144}
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *subscription)
Cancel a subscription.
Definition stasis.c:1049
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
Returns whether a subscription is currently subscribed.
Definition stasis.c:1228

References ao2_cleanup, ao2_ref, ast_log, LOG_ERROR, NULL, stasis_subscription_is_subscribed(), stasis_unsubscribe(), and stasis_caching_topic::sub.

Referenced by AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), stasis_caching_unsubscribe_and_join(), and stasis_cp_single_unsubscribe().

◆ stasis_caching_unsubscribe_and_join()

struct stasis_caching_topic * stasis_caching_unsubscribe_and_join ( struct stasis_caching_topic caching_topic)

Unsubscribes a caching topic from its upstream topic, blocking until all messages have been forwarded.

See stasis_unsubscribe_and_join() for more info on when to use this as opposed to stasis_caching_unsubscribe().

Parameters
caching_topic: Caching topic to unsubscribe
Return values
NULL for convenience
Since
12

Definition at line 146 of file stasis_cache.c.

147{
148 if (!caching_topic) {
149 return NULL;
150 }
151
152 /* Hold a ref past the unsubscribe */
153 ao2_ref(caching_topic, +1);
154 stasis_caching_unsubscribe(caching_topic);
155 stasis_subscription_join(caching_topic->sub);
156 ao2_cleanup(caching_topic);
157 return NULL;
158}
void stasis_subscription_join(struct stasis_subscription *subscription)
Block until the last message is processed on a subscription.
Definition stasis.c:1183
struct stasis_caching_topic * stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
Unsubscribes a caching topic from its upstream topic.

References ao2_cleanup, ao2_ref, NULL, stasis_caching_unsubscribe(), stasis_subscription_join(), and stasis_caching_topic::sub.

Referenced by AST_TEST_DEFINE(), devstate_cleanup(), mwi_cleanup(), and presence_state_engine_cleanup().

◆ stasis_config_init()

int stasis_config_init ( void  )

◆ stasis_forward_all()

struct stasis_forward * stasis_forward_all ( struct stasis_topic from_topic,
struct stasis_topic to_topic 
)

Create a subscription which forwards all messages from one topic to another.

Note that the topic parameter of the invoked callback will be the topic the message was sent to, not the topic the subscriber subscribed to.

Parameters
from_topic: Topic to forward.
to_topic: Destination topic of forwarded messages.
Returns
New forwarding subscription.
Return values
NULL on error.
Since
12

Definition at line 1656 of file stasis.c.

1658{
1659 int res;
1660 size_t idx;
1661 struct stasis_forward *forward;
1662
1663 if (!from_topic || !to_topic) {
1664 return NULL;
1665 }
1666
1667 forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1668 if (!forward) {
1669 return NULL;
1670 }
1671
1672 /* Forwards to ourselves are implicit. */
1673 if (to_topic == from_topic) {
1674 return forward;
1675 }
1676
1677 forward->from_topic = ao2_bump(from_topic);
1678 forward->to_topic = ao2_bump(to_topic);
1679
1682 if (res != 0) {
1685 ao2_ref(forward, -1);
1686 return NULL;
1687 }
1688
1689 for (idx = 0; idx < AST_VECTOR_SIZE(&to_topic->subscribers); ++idx) {
1691 }
1694
1695 return forward;
1696}
static void forward_dtor(void *obj)
Definition stasis.c:1616
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Add a subscriber to a topic.
Definition stasis.c:1279
#define topic_lock_both(topic1, topic2)
Lock two topics.
Definition stasis.c:492
Forwarding information.
Definition stasis.c:1609
struct stasis_topic * from_topic
Definition stasis.c:1611
struct stasis_topic * to_topic
Definition stasis.c:1613
struct stasis_topic::@423 upstream_topics
struct stasis_topic::@422 subscribers
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition vector.h:267

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_alloc_options, ao2_bump, ao2_ref, ao2_unlock, AST_VECTOR_APPEND, AST_VECTOR_GET, AST_VECTOR_SIZE, forward_dtor(), stasis_forward::from_topic, NULL, stasis_topic::subscribers, stasis_forward::to_topic, topic_add_subscription(), topic_lock_both, and stasis_topic::upstream_topics.

Referenced by __init_manager(), ari_bridges_play_new(), ast_ari_bridges_record(), ast_channel_forward_endpoint(), ast_channel_internal_setup_topics(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), create_subscriptions(), create_subscriptions(), endpoint_internal_create(), extension_state_alloc(), forwards_create_bridge(), forwards_create_channel(), forwards_create_endpoint(), load_general_config(), load_module(), load_module(), load_module(), manager_bridging_init(), manager_channels_init(), manager_mwi_init(), manager_subscriptions_init(), manager_system_init(), stasis_cp_all_create(), stasis_cp_single_create(), stasis_topic_pool_get_topic(), and state_alloc().

◆ stasis_forward_cancel()

struct stasis_forward * stasis_forward_cancel ( struct stasis_forward forward)

Definition at line 1626 of file stasis.c.

1627{
1628 int idx;
1629 struct stasis_topic *from;
1630 struct stasis_topic *to;
1631
1632 if (!forward) {
1633 return NULL;
1634 }
1635
1636 from = forward->from_topic;
1637 to = forward->to_topic;
1638
1639 if (from && to) {
1640 topic_lock_both(to, from);
1643
1644 for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
1646 }
1647 ao2_unlock(from);
1648 ao2_unlock(to);
1649 }
1650
1651 ao2_cleanup(forward);
1652
1653 return NULL;
1654}
static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition stasis.c:1307
#define AST_VECTOR_ELEM_CLEANUP_NOOP(elem)
Vector element cleanup that does nothing.
Definition vector.h:582
#define AST_VECTOR_REMOVE_ELEM_UNORDERED(vec, elem, cleanup)
Remove an element from a vector.
Definition vector.h:594

References ao2_cleanup, ao2_unlock, AST_VECTOR_ELEM_CLEANUP_NOOP, AST_VECTOR_GET, AST_VECTOR_REMOVE_ELEM_UNORDERED, AST_VECTOR_SIZE, stasis_forward::from_topic, NULL, stasis_topic::subscribers, stasis_forward::to_topic, topic_lock_both, topic_remove_subscription(), and stasis_topic::upstream_topics.

Referenced by all_dtor(), ari_bridges_play_new(), ast_ari_bridges_record(), ast_channel_internal_cleanup(), ast_endpoint_shutdown(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), bridge_channel_control_thread(), cleanup_module(), destroy_subscriptions(), destroy_subscriptions(), extension_state_shutdown(), forwards_unsubscribe(), load_general_config(), load_module(), load_module(), manager_bridging_cleanup(), manager_channels_shutdown(), manager_mwi_shutdown(), manager_shutdown(), manager_system_shutdown(), stasis_cp_single_unsubscribe(), state_dtor(), topic_pool_entry_dtor(), unload_module(), unload_module(), and unload_module().

◆ stasis_init()

int stasis_init ( void  )

Initialize the Stasis subsystem.

Returns
0 on success.
Non-zero on error.
Since
12

Definition at line 3283 of file stasis.c.

3284{
3285 struct stasis_config *cfg;
3286 int cache_init;
3287 struct ast_taskpool_options taskpool_opts = { 0, };
3288#ifdef AST_DEVMODE
3289 struct ao2_container *subscription_stats;
3290 struct ao2_container *topic_stats;
3291#endif
3292
3293 /* Be sure the types are cleaned up after the message bus */
3295
3296 if (aco_info_init(&cfg_info)) {
3297 return -1;
3298 }
3299
3300 aco_option_register_custom(&cfg_info, "decline", ACO_EXACT,
3302 aco_option_register(&cfg_info, "minimum_size", ACO_EXACT,
3304 FLDSET(struct stasis_taskpool_conf, minimum_size), 0,
3305 INT_MAX);
3306 aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
3308 FLDSET(struct stasis_taskpool_conf, initial_size), 0,
3309 INT_MAX);
3310 aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
3312 FLDSET(struct stasis_taskpool_conf, idle_timeout_sec), 0,
3313 INT_MAX);
3314 aco_option_register(&cfg_info, "max_size", ACO_EXACT,
3316 FLDSET(struct stasis_taskpool_conf, max_size), 0,
3317 INT_MAX);
3318
3319 if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
3320 struct stasis_config *default_cfg = stasis_config_alloc();
3321
3322 if (!default_cfg) {
3323 return -1;
3324 }
3325
3326 if (aco_set_defaults(&taskpool_option, "taskpool", default_cfg->taskpool_options)) {
3327 ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
3328 ao2_ref(default_cfg, -1);
3329
3330 return -1;
3331 }
3332
3333 if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) {
3334 ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
3335 ao2_ref(default_cfg, -1);
3336
3337 return -1;
3338 }
3339
3340 ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n");
3342 cfg = default_cfg;
3343 } else {
3345 if (!cfg) {
3346 ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
3347
3348 return -1;
3349 }
3350 }
3351
3352 taskpool_opts.version = AST_TASKPOOL_OPTIONS_VERSION;
3353 taskpool_opts.minimum_size = cfg->taskpool_options->minimum_size;
3354 taskpool_opts.initial_size = cfg->taskpool_options->initial_size;
3355 taskpool_opts.auto_increment = 1;
3356 taskpool_opts.max_size = cfg->taskpool_options->max_size;
3357 taskpool_opts.idle_timeout = cfg->taskpool_options->idle_timeout_sec;
3358 taskpool = ast_taskpool_create("stasis", &taskpool_opts);
3359 ao2_ref(cfg, -1);
3360 if (!taskpool) {
3361 ast_log(LOG_ERROR, "Failed to create 'stasis-core' taskpool\n");
3362
3363 return -1;
3364 }
3365
3366 cache_init = stasis_cache_init();
3367 if (cache_init != 0) {
3368 return -1;
3369 }
3370
3372 return -1;
3373 }
3375 return -1;
3376 }
3377
3379 topic_proxy_hash_fn, 0, topic_proxy_cmp_fn);
3380 if (!topic_all) {
3381 return -1;
3382 }
3383
3385 return -1;
3386 }
3387
3388#ifdef AST_DEVMODE
3389 /* Statistics information is stored separately so that we don't alter or interrupt the lifetime of the underlying
3390 * topic or subscription.
3391 */
3392 subscription_stats = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, SUBSCRIPTION_STATISTICS_BUCKETS,
3393 subscription_statistics_hash, 0, subscription_statistics_cmp);
3394 if (!subscription_stats) {
3395 return -1;
3396 }
3397 ao2_global_obj_replace_unref(subscription_statistics, subscription_stats);
3398 ao2_cleanup(subscription_stats);
3399
3400 topic_stats = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, TOPIC_STATISTICS_BUCKETS,
3401 topic_statistics_hash, 0, topic_statistics_cmp);
3402 if (!topic_stats) {
3403 return -1;
3404 }
3405 ao2_global_obj_replace_unref(topic_statistics, topic_stats);
3406 ao2_cleanup(topic_stats);
3407 if (!topic_stats) {
3408 return -1;
3409 }
3410
3411 AST_VECTOR_INIT(&message_type_statistics, 0);
3412
3413 if (ast_cli_register_multiple(cli_stasis_statistics, ARRAY_LEN(cli_stasis_statistics))) {
3414 return -1;
3415 }
3416#endif
3417
3418 return 0;
3419}
@ AO2_ALLOC_OPT_LOCK_MUTEX
Definition astobj2.h:363
#define ao2_global_obj_replace_unref(holder, obj)
Replace an ao2 object in the global holder, throwing away any old object.
Definition astobj2.h:901
#define ao2_global_obj_ref(holder)
Get a reference to the object stored in the global holder.
Definition astobj2.h:918
static struct console_pvt globals
#define ast_cli_register_multiple(e, len)
Register multiple commands.
Definition cli.h:265
@ ACO_EXACT
int aco_set_defaults(struct aco_type *type, const char *category, void *obj)
Set all default options of obj.
@ ACO_PROCESS_ERROR
There was an error and no changes were applied.
int aco_info_init(struct aco_info *info)
Initialize an aco_info structure.
#define FLDSET(type,...)
Convert a struct and list of fields to an argument list of field offsets.
#define aco_option_register(info, name, matchtype, types, default_val, opt_type, flags,...)
Register a config option.
@ OPT_INT_T
Type for default option handler for signed integers.
#define aco_option_register_custom(info, name, matchtype, types, default_val, handler, flags)
Register a config option.
enum aco_process_status aco_process_config(struct aco_info *info, int reload)
Process a config info via the options registered with an aco_info.
struct stasis_message_type * ast_multi_user_event_type(void)
Message type for custom user defined events with multi object blobs.
#define LOG_NOTICE
static struct ast_cli_entry cli_stasis[]
Definition stasis.c:2698
static void * stasis_config_alloc(void)
Definition stasis.c:2475
static void stasis_cleanup(void)
Cleanup function for graceful shutdowns.
Definition stasis.c:3264
static struct aco_type taskpool_option
Definition stasis.c:2424
static struct ast_taskpool * taskpool
Definition stasis.c:374
static struct aco_type * taskpool_options[]
Definition stasis.c:2432
static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
Definition stasis.c:2526
struct aco_type * declined_options[]
Definition stasis.c:2443
struct ao2_container * topic_all
Definition stasis.c:462
#define TOPIC_ALL_BUCKETS
Definition stasis.c:384
static struct aco_type declined_option
An aco_type structure to link the "declined_message_types" category to the stasis_declined_config typ...
Definition stasis.c:2435
int stasis_cache_init(void)
int idle_timeout
Time limit in seconds for idle dynamic taskprocessors.
Definition taskpool.h:88
int max_size
Maximum number of taskprocessors a pool may have.
Definition taskpool.h:122
int auto_increment
Number of taskprocessors to increment the pool by.
Definition taskpool.h:92
int minimum_size
Number of taskprocessors that will always exist.
Definition taskpool.h:99
int initial_size
Number of taskprocessors the pool will start with.
Definition taskpool.h:109
struct stasis_declined_config * declined_message_types
Definition stasis.c:2413
struct stasis_taskpool_conf * taskpool_options
Definition stasis.c:2411
Taskpool configuration options.
Definition stasis.c:2398
#define AST_TASKPOOL_OPTIONS_VERSION
Definition taskpool.h:76
struct ast_taskpool * ast_taskpool_create(const char *name, const struct ast_taskpool_options *options)
Create a new taskpool.
Definition taskpool.c:324
#define ARRAY_LEN(a)
Definition utils.h:706
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition vector.h:124

References ACO_EXACT, aco_info_init(), aco_option_register, aco_option_register_custom, aco_process_config(), ACO_PROCESS_ERROR, aco_set_defaults(), AO2_ALLOC_OPT_LOCK_MUTEX, ao2_cleanup, ao2_container_alloc_hash, ao2_global_obj_ref, ao2_global_obj_replace_unref, ao2_ref, ARRAY_LEN, ast_cli_register_multiple, ast_log, ast_multi_user_event_type(), ast_register_cleanup(), ast_taskpool_create(), AST_TASKPOOL_OPTIONS_VERSION, AST_VECTOR_INIT, ast_taskpool_options::auto_increment, cli_stasis, declined_handler(), stasis_config::declined_message_types, declined_option, declined_options, FLDSET, globals, ast_taskpool_options::idle_timeout, stasis_taskpool_conf::idle_timeout_sec, ast_taskpool_options::initial_size, stasis_taskpool_conf::initial_size, LOG_ERROR, LOG_NOTICE, ast_taskpool_options::max_size, stasis_taskpool_conf::max_size, ast_taskpool_options::minimum_size, stasis_taskpool_conf::minimum_size, OPT_INT_T, PARSE_IN_RANGE, stasis_cache_init(), stasis_cleanup(), stasis_config_alloc(), STASIS_MESSAGE_TYPE_INIT, stasis_subscription_change_type(), taskpool, taskpool_option, stasis_config::taskpool_options, taskpool_options, topic_all, TOPIC_ALL_BUCKETS, and ast_taskpool_options::version.

Referenced by asterisk_daemon().

◆ stasis_log_bad_type_access()

void stasis_log_bad_type_access ( const char *  name)

Definition at line 2152 of file stasis.c.

2153{
2154#ifdef AST_DEVMODE
2156 ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
2157 }
2158#endif
2159}
int stasis_message_type_declined(const char *name)
Check whether a message type is declined.
Definition stasis.c:2505

References ast_log, LOG_ERROR, name, and stasis_message_type_declined().

◆ stasis_message_can_be_ami()

int stasis_message_can_be_ami ( struct stasis_message msg)

Determine if the given message can be converted to AMI.

Parameters
msg Message to check for AMI conversion support.
Return values
0 Cannot be converted
non-zero Can be converted

Definition at line 251 of file stasis_message.c.

252{
253 return HAS_VIRTUAL(to_ami, msg);
254}
#define HAS_VIRTUAL(fn, msg)

References HAS_VIRTUAL, and to_ami().

◆ stasis_message_create()

struct stasis_message * stasis_message_create ( struct stasis_message_type type,
void *  data 
)

Create a new message.

This message is an ao2 object and must be ao2_cleanup()'ed when you are done with it. Messages are also immutable and must not be modified after they are initialized; this applies especially to the data in the message.

Parameters
typeType of the message
dataImmutable data that is the actual contents of the message
Returns
New message
Return values
NULLon error
Since
12

Definition at line 174 of file stasis_message.c.

175{
177}
struct stasis_message * stasis_message_create_full(struct stasis_message_type *type, void *data, const struct ast_eid *eid)
Create a new message for an entity.

References ast_eid_default, stasis_message::data, stasis_message_create_full(), and type.

Referenced by aoc_publish_blob(), ast_bridge_blob_create(), ast_bridge_blob_create_from_snapshots(), ast_bridge_publish_attended_transfer(), ast_bridge_publish_blind_transfer(), ast_bridge_publish_merge(), ast_bridge_publish_state(), ast_cdr_engine_term(), ast_channel_publish_dial_internal(), ast_channel_publish_final_snapshot(), ast_channel_publish_snapshot(), ast_endpoint_blob_create(), ast_manager_publish_event(), ast_multi_object_blob_single_channel_publish(), ast_mwi_blob_create(), ast_refer_notify_transfer_request(), ast_rtp_publish_rtcp_message(), ast_system_publish_registry(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), bridge_publish_state_from_blob(), bridge_topics_destroy(), cc_publish(), cdr_prop_write(), cdr_read(), cdr_write(), create_channel_blob_message(), create_endpoint_snapshot_message(), endpoint_publish_snapshot(), extension_state_device_state_cb(), extension_state_presence_state_cb(), extension_state_remove_message_create(), extension_state_update_sources(), forkcdr_exec(), handle_security_event(), local_optimization_finished_cb(), local_optimization_started_cb(), presence_state_event(), publish_acl_change(), publish_app_cdr_message(), publish_chanspy_message(), publish_chanspy_message(), publish_cluster_discovery_to_stasis_full(), publish_corosync_ping_to_stasis(), publish_format_update(), publish_load_message_type(), publish_local_bridge_message(), publish_parked_call(), publish_parked_call_failure(), queue_publish_member_blob(), queue_publish_multi_channel_snapshot_blob(), send_call_pickup_stasis_message(), send_msg(), send_start_msg_snapshots(), send_subscription_subscribe(), send_subscription_unsubscribe(), stasis_app_user_event(), stasis_cache_clear_create(), stasis_test_message_create(), 
stun_monitor_request(), and update_create().

◆ stasis_message_create_full()

struct stasis_message * stasis_message_create_full ( struct stasis_message_type type,
void *  data,
const struct ast_eid eid 
)

Create a new message for an entity.

This message is an ao2 object and must be ao2_cleanup()'ed when you are done with it. Messages are also immutable and must not be modified after they are initialized; this applies especially to the data in the message.

Parameters
typeType of the message
dataImmutable data that is the actual contents of the message
eidWhat entity originated this message. (NULL for aggregate)
Note
An aggregate message is a combined representation of the local and remote entities publishing the message data. e.g., An aggregate device state represents the combined device state from the local and any remote entities publishing state for a device. e.g., An aggregate MWI message is the old/new MWI counts accumulated from the local and any remote entities publishing to a mailbox.
Return values
New message
NULL on error
Since
12.2.0

Definition at line 140 of file stasis_message.c.

141{
142 struct stasis_message *message;
143
144 if (type == NULL || data == NULL) {
145 return NULL;
146 }
147
150 if (message == NULL) {
151 return NULL;
152 }
153
154 message->timestamp = ast_tvnow();
155 /*
156 * XXX Normal ao2 ref counting rules says we should increment the message
157 * type ref here and decrement it in stasis_message_dtor(). However, the
158 * stasis message could be cached and legitimately cause the type ref count
159 * to hit the excessive ref count assertion. Since the message type
160 * practically has to be a global object anyway, we can get away with not
161 * holding a ref in the stasis message.
162 */
163 message->type = type;
164 ao2_ref(data, +1);
165 message->data = data;
166 if (eid) {
167 message->eid_ptr = &message->eid;
168 message->eid = *eid;
169 }
170
171 return message;
172}
#define ao2_t_alloc_options(data_size, destructor_fn, options, debug_msg)
Allocate and initialize an object.
Definition astobj2.h:402
static void stasis_message_dtor(void *obj)
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition time.h:159

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_ref, ao2_t_alloc_options, ast_tvnow(), stasis_message::data, stasis_message::eid, NULL, stasis_message_dtor(), and type.

Referenced by ast_publish_device_state_full(), AST_TEST_DEFINE(), cache_test_message_create_full(), create_foo_type_message(), device_state_aggregate_calc(), mwi_state_create_message(), and stasis_message_create().

◆ stasis_message_data()

void * stasis_message_data ( const struct stasis_message msg)

Get the data contained in a message.

Parameters
msgMessage.
Returns
Immutable data pointer
Return values
NULLif msg is NULL.
Since
12

Definition at line 195 of file stasis_message.c.

196{
197 if (msg == NULL) {
198 return NULL;
199 }
200 return msg->data;
201}

References stasis_message::data, and NULL.

Referenced by agent_login_to_ami(), agent_logoff_to_ami(), agi_channel_to_ami(), aoc_to_ami(), appcdr_callback(), ari_transfer_to_json(), ast_ari_endpoints_list(), ast_ari_endpoints_list_by_tech(), ast_bridge_merge_message_to_json(), ast_bridge_publish_enter(), ast_bridge_publish_leave(), ast_channel_entered_bridge_to_json(), ast_channel_left_bridge_to_json(), ast_delete_mwi_state_full(), ast_endpoint_latest_snapshot(), ast_mwi_publish_by_mailbox(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), asterisk_publisher_devstate_cb(), asterisk_publisher_mwistate_cb(), attended_transfer_to_ami(), attended_transfer_to_json(), blind_transfer_to_ami(), blind_transfer_to_json(), bridge_attended_transfer_handler(), bridge_blind_transfer_handler(), bridge_merge_cb(), bridge_merge_handler(), bridge_snapshot_update(), cache_test_aggregate_calc_fn(), cache_test_data_id(), cache_update(), cache_update_cb(), caching_topic_exec(), call_forwarded_handler(), call_pickup_to_ami(), cc_available_to_ami(), cc_callerrecalling_to_ami(), cc_callerstartmonitoring_to_ami(), cc_callerstopmonitoring_to_ami(), cc_failure_to_ami(), cc_monitorfailed_to_ami(), cc_offertimerstart_to_ami(), cc_recallcomplete_to_ami(), cc_requestacknowledged_to_ami(), cc_requested_to_ami(), cdr_prop_write_callback(), cdr_read_callback(), cdr_write_callback(), cel_attended_transfer_cb(), cel_blind_transfer_cb(), cel_bridge_enter_cb(), cel_bridge_leave_cb(), cel_dial_cb(), cel_generic_cb(), cel_local_optimization_cb_helper(), cel_parking_cb(), cel_pickup_cb(), cel_snapshot_update_cb(), channel_blob_to_json(), channel_chanspy_start_cb(), channel_chanspy_stop_cb(), channel_dial_cb(), channel_dtmf_begin_cb(), channel_dtmf_end_cb(), channel_enter_cb(), channel_fax_cb(), channel_flash_cb(), 
channel_hangup_handler_cb(), channel_hangup_request_cb(), channel_hold_cb(), channel_leave_cb(), channel_mixmonitor_mute_cb(), channel_mixmonitor_start_cb(), channel_mixmonitor_stop_cb(), channel_moh_start_cb(), channel_moh_stop_cb(), channel_snapshot_update(), channel_unhold_cb(), channel_wink_cb(), check_cache_aggregate(), conf_send_event_to_participants(), confbridge_atxfer_cb(), confbridge_publish_manager_event(), confbridge_talking_cb(), consumer_exec(), contactstatus_to_ami(), contactstatus_to_json(), corosync_ping_to_event(), dahdichannel_to_ami(), device_state_aggregate_calc(), device_state_cb(), device_state_cb(), device_state_cb(), device_state_get_id(), devstate_cached(), devstate_change_cb(), devstate_to_ami(), devstate_to_event(), dial_to_json(), dtmf_end_to_json(), dump_cache_load(), dump_cache_unload(), dump_consumer(), endpoint_snapshot_get_id(), endpoints_scrape_cb(), explicit_publish_cb(), extension_state_autohints_device_state_cb(), extension_state_device_state_cb(), extension_state_legacy_remove_cb(), extension_state_legacy_update_cb(), extension_state_presence_state_cb(), fake_ami(), fake_json(), find_route(), forkcdr_callback(), generic_agent_devstate_cb(), generic_monitor_devstate_cb(), get_bool_header(), handle_attended_transfer(), handle_blind_transfer(), handle_bridge_enter(), handle_bridge_enter_message(), handle_bridge_leave_message(), handle_channel_snapshot_update_message(), handle_dial_message(), handle_hangup(), handle_local_optimization_begin(), handle_local_optimization_end(), handle_masquerade(), handle_mwi_state(), handle_parked_call_message(), has_voicemail(), hold_to_json(), implicit_publish_cb(), is_msg(), local_message_to_ami(), manager_generic_msg_cb(), meetme_stasis_cb(), moh_post_start(), moh_post_stop(), multi_user_event_to_ami(), multi_user_event_to_json(), mwi_app_event_cb(), mwi_startup_event_cb(), mwi_state_get_id(), mwi_to_event(), mwi_update_cb(), mwi_update_cb(), park_announce_update_cb(), parker_update_cb(), 
parking_event_cb(), peerstatus_to_ami(), peerstatus_to_json(), playback_to_json(), presence_state_cached(), presence_state_get_id(), presence_state_to_ami(), queue_agent_cb(), queue_channel_to_ami(), queue_member_to_ami(), queue_multi_channel_to_ami(), recording_to_json(), refer_progress_bridge(), remove_device_states_cb(), rtcp_report_to_ami(), rtcp_report_to_json(), security_event_to_ami(), security_stasis_cb(), stasis_end_to_json(), stasis_start_to_json(), stasis_state_subscriber_data(), stasis_subscription_final_message(), sub_bridge_update_handler(), sub_channel_update_handler(), sub_endpoint_update_handler(), subscription_persistence_event_cb(), system_registry_to_ami(), talking_start_to_ami(), talking_stop_to_ami(), unhold_to_json(), unistim_send_mwi_to_peer(), update_registry(), updates(), varset_to_ami(), xmpp_pubsub_devstate_cb(), and xmpp_pubsub_mwi_cb().

◆ stasis_message_eid()

const struct ast_eid * stasis_message_eid ( const struct stasis_message msg)

Get the entity id for a stasis_message.

Since
12.2.0
Parameters
msgMessage to get eid.
Return values
Entity id of msg
NULL if msg is an aggregate or msg is NULL.

Definition at line 179 of file stasis_message.c.

180{
181 if (msg == NULL) {
182 return NULL;
183 }
184 return msg->eid_ptr;
185}
const struct ast_eid * eid_ptr

References stasis_message::eid_ptr, and NULL.

Referenced by AST_TEST_DEFINE(), cache_entry_by_eid(), cache_entry_create(), cache_remove(), cache_update(), caching_topic_exec(), and clear_node_cache().

◆ stasis_message_timestamp()

const struct timeval * stasis_message_timestamp ( const struct stasis_message msg)

◆ stasis_message_to_ami()

struct ast_manager_event_blob * stasis_message_to_ami ( struct stasis_message msg)

Build the AMI representation of the message.

May return NULL, to indicate no representation. The returned object should be ao2_cleanup()'ed.

Parameters
msgMessage to convert to AMI.
Return values
NULLif AMI format is not supported.

Definition at line 224 of file stasis_message.c.

225{
226 return INVOKE_VIRTUAL(to_ami, msg);
227}
#define INVOKE_VIRTUAL(fn,...)

References INVOKE_VIRTUAL, and to_ami().

Referenced by action_devicestatelist(), action_presencestatelist(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), and manager_default_msg_cb().

◆ stasis_message_to_event()

struct ast_event * stasis_message_to_event ( struct stasis_message msg)

Build the Generic event system representation of the message.

May return NULL, to indicate no representation. The returned object should be disposed of via ast_event_destroy.

Parameters
msg Message to convert to an ast_event.
Return values
NULL if the generic event format is not supported.

Definition at line 236 of file stasis_message.c.

237{
238 return INVOKE_VIRTUAL(to_event, msg);
239}

References INVOKE_VIRTUAL.

Referenced by publish_to_corosync().

◆ stasis_message_to_json()

struct ast_json * stasis_message_to_json ( struct stasis_message msg,
struct stasis_message_sanitizer sanitize 
)

Build the JSON representation of the message.

May return NULL, to indicate no representation. The returned object should be ast_json_unref()'ed.

Parameters
msgMessage to convert to JSON string.
sanitizeSnapshot sanitization callback.
Returns
Newly allocated JSON object representing the message.
Return values
NULLif JSON format is not supported.

Definition at line 229 of file stasis_message.c.

232{
233 return INVOKE_VIRTUAL(to_json, msg, sanitize);
234}

References INVOKE_VIRTUAL.

Referenced by AST_TEST_DEFINE(), AST_TEST_DEFINE(), and rtcp_message_handler().

◆ stasis_message_type()

struct stasis_message_type * stasis_message_type ( const struct stasis_message msg)

Get the message type for a stasis_message.

Parameters
msg Message to get the type of.
Returns
Type of msg
Return values
NULLif msg is NULL.
Since
12

Definition at line 187 of file stasis_message.c.

188{
189 if (msg == NULL) {
190 return NULL;
191 }
192 return msg->type;
193}
struct stasis_message_type * type

References NULL, and stasis_message::type.

◆ stasis_message_type_available_formatters()

enum stasis_subscription_message_formatters stasis_message_type_available_formatters ( const struct stasis_message_type message_type)

Get a bitmap of available formatters for a message type.

Parameters
message_typeMessage type
Returns
A bitmap of stasis_subscription_message_formatters
Since
13.25.0
16.2.0

Definition at line 114 of file stasis_message.c.

116{
117 return type->available_formatters;
118}

References type.

Referenced by dispatch_message().

◆ stasis_message_type_create()

enum stasis_message_type_result stasis_message_type_create ( const char *  name,
struct stasis_message_vtable vtable,
struct stasis_message_type **  result 
)

Create a new message type.

stasis_message_type is an AO2 object, so ao2_cleanup() when you're done with it.

Parameters
nameName of the new type.
vtableVirtual table of message methods. May be NULL.
[out]resultThe location where the new message type will be placed
Note
Stasis message type creation may be declined if the message type is disabled
Returns
A stasis_message_type_result enum
Since
12

Definition at line 56 of file stasis_message.c.

59{
61
62 /* Check for declination */
65 }
66
69 if (!type) {
71 }
72 if (!vtable) {
73 /* Null object pattern, FTW! */
75 }
76
77 type->name = ast_strdup(name);
78 if (!type->name) {
81 }
83 type->vtable = vtable;
84 if (vtable->to_json) {
85 type->available_formatters |= STASIS_SUBSCRIPTION_FORMATTER_JSON;
86 }
87 if (vtable->to_ami) {
88 type->available_formatters |= STASIS_SUBSCRIPTION_FORMATTER_AMI;
89 }
90 if (vtable->to_event) {
91 type->available_formatters |= STASIS_SUBSCRIPTION_FORMATTER_EVENT;
92 }
94 *result = type;
95
97}
#define ast_strdup(str)
A wrapper for strdup()
Definition astmm.h:241
static PGresult * result
Definition cel_pgsql.c:84
unsigned int ast_hashtab_hash_string(const void *obj)
Hashes a string to a number.
Definition hashtab.c:153
int stasis_message_type_declined(const char *name)
Check whether a message type is declined.
Definition stasis.c:2505
static void message_type_dtor(void *obj)
static int message_type_id
static struct stasis_message_vtable null_vtable
struct stasis_message_vtable * vtable
struct ast_manager_event_blob *(* to_ami)(struct stasis_message *message)
Build the AMI representation of the message.
Definition stasis.h:264
struct ast_json *(* to_json)(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
Build the JSON representation of the message.
Definition stasis.h:252
struct ast_event *(* to_event)(struct stasis_message *message)
Build the ast_event representation of the message.
Definition stasis.h:278

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_cleanup, ao2_t_alloc_options, ast_atomic_fetchadd_int(), ast_hashtab_hash_string(), ast_strdup, message_type_dtor(), message_type_id, name, null_vtable, result, STASIS_MESSAGE_TYPE_DECLINED, stasis_message_type_declined(), STASIS_MESSAGE_TYPE_ERROR, STASIS_MESSAGE_TYPE_SUCCESS, STASIS_SUBSCRIPTION_FORMATTER_AMI, STASIS_SUBSCRIPTION_FORMATTER_EVENT, STASIS_SUBSCRIPTION_FORMATTER_JSON, stasis_message_vtable::to_ami, stasis_message_vtable::to_event, stasis_message_vtable::to_json, type, and stasis_message_type::vtable.

Referenced by AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), and create_message_types().

◆ stasis_message_type_declined()

int stasis_message_type_declined ( const char *  name)

Check whether a message type is declined.

Parameters
nameThe name of the message type to check
Return values
zeroThe message type is not declined
non-zeroThe message type is declined

Definition at line 2505 of file stasis.c.

2506{
2508 char *name_in_declined;
2509 int res;
2510
2511 if (!cfg || !cfg->declined_message_types) {
2512 ao2_cleanup(cfg);
2513 return 0;
2514 }
2515
2516 name_in_declined = ao2_find(cfg->declined_message_types->declined, name, OBJ_SEARCH_KEY);
2517 res = name_in_declined ? 1 : 0;
2518 ao2_cleanup(name_in_declined);
2519 ao2_ref(cfg, -1);
2520 if (res) {
2521 ast_debug(4, "Declining to allocate Stasis message type '%s' due to configuration\n", name);
2522 }
2523 return res;
2524}
#define ao2_find(container, arg, flags)
Definition astobj2.h:1736
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
Definition astobj2.h:1101
#define ast_debug(level,...)
Log a DEBUG message.
struct ao2_container * declined
Definition stasis.c:2394

References ao2_cleanup, ao2_find, ao2_global_obj_ref, ao2_ref, ast_debug, stasis_declined_config::declined, stasis_config::declined_message_types, globals, name, and OBJ_SEARCH_KEY.

Referenced by stasis_log_bad_type_access(), and stasis_message_type_create().

◆ stasis_message_type_hash()

unsigned int stasis_message_type_hash ( const struct stasis_message_type type)

Gets the hash of a given message type.

Parameters
typeThe type to get the hash of.
Returns
The hash
Since
13.24.0

Definition at line 104 of file stasis_message.c.

105{
106 return type->hash;
107}

References type.

Referenced by cache_entry_compute_hash().

◆ stasis_message_type_id()

int stasis_message_type_id ( const struct stasis_message_type type)

Gets the id of a given message type.

Parameters
typeThe type to get the id of.
Returns
The id
Since
17.0.0

Definition at line 109 of file stasis_message.c.

110{
111 return type->id;
112}

References type.

Referenced by dispatch_message(), publish_msg(), stasis_subscription_accept_message_type(), stasis_subscription_decline_message_type(), and subscription_invoke().

◆ stasis_message_type_name()

const char * stasis_message_type_name ( const struct stasis_message_type type)

Gets the name of a given message type.

Parameters
type The type to get the name of.
Returns
Name of the type.
Return values
NULLif type is NULL.
Since
12

Definition at line 99 of file stasis_message.c.

100{
101 return type->name;
102}

References type.

Referenced by AST_TEST_DEFINE(), cache_find(), cache_simple(), cache_test_data_id(), caching_topic_exec(), dump_consumer(), print_cache_entry(), send_msg(), stasis_subscription_accept_message_type(), stasis_subscription_decline_message_type(), and statsmaker().

◆ stasis_publish()

void stasis_publish ( struct stasis_topic topic,
struct stasis_message message 
)

Publish a message to a topic's subscribers.

Parameters
topicTopic.
messageMessage to publish.

This call is asynchronous and will return immediately upon queueing the message for delivery to the topic's subscribers.

Since
12

Definition at line 1589 of file stasis.c.

1590{
1591 publish_msg(topic, message, NULL);
1592}
static void publish_msg(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sync_sub)
Definition stasis.c:1508

References NULL, publish_msg(), and stasis_subscription::topic.

Referenced by aoc_publish_blob(), app_send_end_msg(), ast_bridge_publish_attended_transfer(), ast_bridge_publish_blind_transfer(), ast_bridge_publish_enter(), ast_bridge_publish_leave(), ast_bridge_publish_merge(), ast_bridge_publish_state(), ast_cel_publish_event(), ast_channel_publish_blob(), ast_channel_publish_cached_blob(), ast_channel_publish_final_snapshot(), ast_channel_publish_snapshot(), ast_device_state_clear_cache(), ast_endpoint_blob_publish(), ast_endpoint_shutdown(), ast_manager_publish_event(), ast_multi_object_blob_single_channel_publish(), ast_publish_device_state_full(), ast_refer_notify_transfer_request(), ast_rtp_publish_rtcp_message(), ast_system_publish_registry(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), bridge_attended_transfer_handler(), bridge_blind_transfer_handler(), bridge_merge_handler(), bridge_publish_state_from_blob(), bridge_topics_destroy(), cache_test_aggregate_publish_fn(), caching_topic_exec(), cc_publish(), clear_node_cache(), detect_callback(), device_state_aggregate_publish(), endpoint_publish_snapshot(), endpoint_state_cb(), endpoint_state_cb(), extension_state_device_state_cb(), extension_state_presence_state_cb(), extension_state_shutdown(), extension_state_update_sources(), handle_security_event(), local_optimization_finished_cb(), local_optimization_started_cb(), manager_mute_mixmonitor(), meetme_stasis_generate_msg(), mixmonitor_exec(), moh_post_start(), moh_post_stop(), notify_new_message(), presence_state_event(), publish_acl_change(), publish_chanspy_message(), publish_chanspy_message(), publish_cluster_discovery_to_stasis_full(), publish_corosync_ping_to_stasis(), publish_format_update(), publish_load_message_type(), publish_local_bridge_message(), publish_message_for_channel_topics(), 
publish_parked_call(), publish_parked_call_failure(), queue_publish_member_blob(), queue_publish_multi_channel_snapshot_blob(), remove_device_states_cb(), report_fax_status(), report_receive_fax_status(), report_send_fax_status(), send_call_pickup_stasis_message(), send_conf_stasis(), send_conf_stasis_snapshots(), send_msg(), send_start_msg_snapshots(), send_subscription_subscribe(), send_subscription_unsubscribe(), stasis_app_control_publish(), stasis_app_user_event(), stasis_state_publish(), stasis_state_publish_by_id(), stasis_state_remove_publish_by_id(), stop_mixmonitor_full(), stun_monitor_request(), and talk_detect_audiohook_cb().

◆ stasis_publish_sync()

void stasis_publish_sync ( struct stasis_subscription sub,
struct stasis_message message 
)

Publish a message to a topic's subscribers, synchronizing on the specified subscriber.

Parameters
subSubscription to synchronize on.
messageMessage to publish.

The caller of stasis_publish_sync will block until the specified subscriber completes handling of the message.

All other subscribers to the topic the stasis_subscription is subscribed to are also delivered the message; this delivery however happens asynchronously.

Since
12.1.0

Definition at line 1594 of file stasis.c.

1595{
1596 ast_assert(sub != NULL);
1597
1599}
static struct stasis_subscription * sub
Statsd channel stats. Example of how to subscribe to Stasis events.
struct stasis_topic * topic
Definition stasis.c:751

References ast_assert, NULL, publish_msg(), sub, and stasis_subscription::topic.

Referenced by AST_TEST_DEFINE(), and stasis_message_router_publish_sync().

◆ stasis_subscription_accept_formatters()

void stasis_subscription_accept_formatters ( struct stasis_subscription subscription,
enum stasis_subscription_message_formatters  formatters 
)

Indicate to a subscription that we are interested in messages with one or more formatters.

Parameters
subscriptionSubscription to alter.
formattersA bitmap of stasis_subscription_message_formatters we wish to receive.
Since
13.25.0
16.2.0

Definition at line 1171 of file stasis.c.

1173{
1174 ast_assert(subscription != NULL);
1175
1176 ao2_lock(subscription->topic);
1177 subscription->accepted_formatters = formatters;
1178 ao2_unlock(subscription->topic);
1179
1180 return;
1181}
#define ao2_lock(a)
Definition astobj2.h:717
enum stasis_subscription_message_formatters accepted_formatters
Definition stasis.c:771

References stasis_subscription::accepted_formatters, ao2_lock, ao2_unlock, ast_assert, NULL, and stasis_subscription::topic.

Referenced by AST_TEST_DEFINE(), AST_TEST_DEFINE(), stasis_message_router_accept_formatters(), and stasis_message_router_set_formatters_default().

◆ stasis_subscription_accept_message_type()

int stasis_subscription_accept_message_type ( struct stasis_subscription subscription,
const struct stasis_message_type type 
)

Indicate to a subscription that we are interested in a message type.

This will cause the subscription to allow the given message type to be raised to our subscription callback. This enables internal filtering in the stasis message bus to reduce messages.

Parameters
subscriptionSubscription to add message type to.
typeThe message type we wish to receive.
Return values
0on success
-1failure
Since
17.0.0
Note
If you want to use stasis_subscription_final_message() you will need to accept stasis_subscription_change_type as a message type.
Until the subscription is set to selective filtering it is possible for it to receive messages of message types that would not normally be accepted.

Definition at line 1101 of file stasis.c.

1103{
1104 if (!subscription) {
1105 return -1;
1106 }
1107
1108 ast_assert(type != NULL);
1110
1112 /* Filtering is unreliable as this message type is not yet initialized
1113 * so force all messages through.
1114 */
1116 return 0;
1117 }
1118
1119 ao2_lock(subscription->topic);
1121 /* We do this for the same reason as above. The subscription can still operate, so allow
1122 * it to do so by forcing all messages through.
1123 */
1125 }
1126 ao2_unlock(subscription->topic);
1127
1128 return 0;
1129}
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.
int stasis_message_type_id(const struct stasis_message_type *type)
Gets the id of a given message type.
struct stasis_subscription::@424 accepted_message_types
enum stasis_subscription_message_filter filter
Definition stasis.c:773
#define AST_VECTOR_REPLACE(vec, idx, elem)
Replace an element at a specific position in a vector, growing the vector if needed.
Definition vector.h:295

References stasis_subscription::accepted_message_types, ao2_lock, ao2_unlock, ast_assert, AST_VECTOR_REPLACE, stasis_subscription::filter, NULL, stasis_message_type_id(), stasis_message_type_name(), STASIS_SUBSCRIPTION_FILTER_FORCED_NONE, stasis_subscription::topic, and type.

Referenced by acl_change_stasis_subscribe(), acl_change_stasis_subscribe(), ast_extension_state_init(), ast_mwi_subscribe_pool(), ast_res_pjsip_initialize_configuration(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), asterisk_start_devicestate_publishing(), asterisk_start_mwi_publishing(), cc_generic_agent_start_monitoring(), common_config_load(), create_new_generic_list(), create_parked_subscription_full(), devstate_init(), extension_state_device_source_alloc(), load_module(), load_module(), load_module(), load_module(), load_module(), load_module(), load_module(), load_module(), mwi_stasis_subscription_alloc(), network_change_stasis_subscribe(), park_and_announce_app_exec(), parking_manager_enable_stasis(), refer_blind_callback(), rtp_reload(), stasis_caching_accept_message_type(), stasis_message_router_add(), stasis_message_router_add_cache_update(), stasis_message_router_create_internal(), subscribe_device_state(), and xmpp_init_event_distribution().

◆ stasis_subscription_cb_noop()

void stasis_subscription_cb_noop ( void *  data,
struct stasis_subscription sub,
struct stasis_message message 
)

Stasis subscription callback function that does nothing.

Note
This callback should be used for events that are not directly processed, but need to be generated so data can be retrieved from cache later. Subscriptions with this callback can be released with stasis_unsubscribe, even during module unload.
Since
13.5

Definition at line 876 of file stasis.c.

877{
878}

Referenced by build_peer(), and mkintf().

◆ stasis_subscription_decline_message_type()

int stasis_subscription_decline_message_type ( struct stasis_subscription subscription,
const struct stasis_message_type type 
)

Indicate to a subscription that we are not interested in a message type.

Parameters
subscriptionSubscription to remove message type from.
typeThe message type we don't wish to receive.
Return values
0on success
-1failure
Since
17.0.0

Definition at line 1131 of file stasis.c.

1133{
1134 if (!subscription) {
1135 return -1;
1136 }
1137
1138 ast_assert(type != NULL);
1140
1142 return 0;
1143 }
1144
1145 ao2_lock(subscription->topic);
1147 /* The memory is already allocated so this can't fail */
1149 }
1150 ao2_unlock(subscription->topic);
1151
1152 return 0;
1153}

References stasis_subscription::accepted_message_types, ao2_lock, ao2_unlock, ast_assert, AST_VECTOR_REPLACE, AST_VECTOR_SIZE, NULL, stasis_message_type_id(), stasis_message_type_name(), stasis_subscription::topic, and type.

Referenced by AST_TEST_DEFINE().

◆ stasis_subscription_final_message()

int stasis_subscription_final_message ( struct stasis_subscription sub,
struct stasis_message msg 
)

Determine whether a message is the final message to be received on a subscription.

Parameters
subSubscription on which the message was received.
msgMessage to check.
Returns
zero if the provided message is not the final message.
non-zero if the provided message is the final message.
Since
12

Definition at line 1252 of file stasis.c.

1253{
1254 struct stasis_subscription_change *change;
1255
1257 return 0;
1258 }
1259
1260 change = stasis_message_data(msg);
1261 if (strcmp("Unsubscribe", change->description)) {
1262 return 0;
1263 }
1264
1265 if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
1266 return 0;
1267 }
1268
1269 return 1;
1270}
const char * stasis_subscription_uniqueid(const struct stasis_subscription *sub)
Get the unique ID for the subscription.
Definition stasis.c:1247
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
Holds details about changes to subscriptions for the specified topic.
Definition stasis.h:921

References stasis_subscription_change::description, stasis_message_data(), stasis_subscription_change_type(), stasis_subscription_uniqueid(), sub, and stasis_subscription_change::uniqueid.

Referenced by bridge_subscription_change_handler(), caching_topic_exec(), consumer_exec(), consumer_exec_sync(), consumer_finalize(), default_route(), device_state_cb(), dispatch_message(), extension_state_legacy_subscription_change_cb(), generic_agent_devstate_cb(), manager_subscription_change_msg_cb(), message_sink_cb(), mwi_stasis_cb(), park_announce_update_cb(), parker_update_cb(), queue_bridge_cb(), queue_channel_cb(), refer_progress_bridge(), router_dispatch(), statsmaker(), sub_subscription_change_handler(), and subscription_invoke().

◆ stasis_subscription_is_done()

int stasis_subscription_is_done ( struct stasis_subscription subscription)

Returns whether subscription has received its final message.

Note that a subscription is considered done even while the stasis_subscription_final_message() is being processed. This allows cleanup routines to check the status of the subscription.

Parameters
subscriptionSubscription.
Returns
True (non-zero) if stasis_subscription_final_message() has been received.
False (zero) if waiting for the end.

Definition at line 1196 of file stasis.c.

1197{
1198 if (subscription) {
1199 int ret;
1200
1201 ao2_lock(subscription);
1202 ret = subscription->final_message_rxed;
1203 ao2_unlock(subscription);
1204
1205 return ret;
1206 }
1207
1208 /* Null subscription is about as done as you can get */
1209 return 1;
1210}

References ao2_lock, ao2_unlock, and stasis_subscription::final_message_rxed.

Referenced by router_dtor(), stasis_caching_topic_dtor(), stasis_message_router_is_done(), and subscription_dtor().

◆ stasis_subscription_is_subscribed()

int stasis_subscription_is_subscribed ( const struct stasis_subscription sub)

Returns whether a subscription is currently subscribed.

Note that there may still be messages queued up to be dispatched to this subscription, but the stasis_subscription_final_message() has been enqueued.

Parameters
subSubscription to check
Returns
False (zero) if subscription is not subscribed.
True (non-zero) if still subscribed.

Definition at line 1228 of file stasis.c.

1229{
1230 if (sub) {
1231 size_t i;
1232 struct stasis_topic *topic = sub->topic;
1233
1234 ao2_lock(topic);
1235 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
1236 if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
1237 ao2_unlock(topic);
1238 return 1;
1239 }
1240 }
1241 ao2_unlock(topic);
1242 }
1243
1244 return 0;
1245}

References ao2_lock, ao2_unlock, AST_VECTOR_GET, AST_VECTOR_SIZE, sub, stasis_topic::subscribers, and stasis_subscription::topic.

Referenced by asterisk_publisher_devstate_cb(), asterisk_publisher_mwistate_cb(), router_dtor(), send_subscription_subscribe(), send_subscription_unsubscribe(), stasis_caching_topic_dtor(), stasis_caching_unsubscribe(), subscription_dtor(), xmpp_pubsub_devstate_cb(), and xmpp_pubsub_mwi_cb().

◆ stasis_subscription_join()

void stasis_subscription_join ( struct stasis_subscription subscription)

Block until the last message is processed on a subscription.

This function will not return until the subscription's callback for the stasis_subscription_final_message() completes. This allows cleanup routines to run before unblocking the joining thread.

Parameters
subscriptionSubscription to block on.
Since
12

Definition at line 1183 of file stasis.c.

1184{
1185 if (subscription) {
1186 ao2_lock(subscription);
1187 /* Wait until the processed flag has been set */
1188 while (!subscription->final_message_processed) {
1189 ast_cond_wait(&subscription->join_cond,
1190 ao2_object_get_lockaddr(subscription));
1191 }
1192 ao2_unlock(subscription);
1193 }
1194}
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
Definition astobj2.c:476
#define ast_cond_wait(cond, mutex)
Definition lock.h:212
ast_cond_t join_cond
Definition stasis.c:760
int final_message_processed
Definition stasis.c:766

References ao2_lock, ao2_object_get_lockaddr(), ao2_unlock, ast_cond_wait, stasis_subscription::final_message_processed, and stasis_subscription::join_cond.

Referenced by stasis_caching_unsubscribe_and_join(), and stasis_unsubscribe_and_join().

◆ stasis_subscription_set_congestion_limits()

int stasis_subscription_set_congestion_limits ( struct stasis_subscription subscription,
long  low_water,
long  high_water 
)

Set the high and low alert water marks of the stasis subscription.

Since
13.10.0
Parameters
subscriptionPointer to a stasis subscription
low_waterNew queue low water mark. (-1 to set as 90% of high_water)
high_waterNew queue high water mark.
Return values
0on success.
-1on error (water marks not changed).

Definition at line 1089 of file stasis.c.

1091{
1092 int res = -1;
1093
1094 if (subscription) {
1095 res = ast_taskprocessor_alert_set_levels(subscription->mailbox,
1096 low_water, high_water);
1097 }
1098 return res;
1099}
struct ast_taskprocessor * mailbox
Definition stasis.c:753
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.

References ast_taskprocessor_alert_set_levels(), and stasis_subscription::mailbox.

Referenced by stasis_message_router_set_congestion_limits().

◆ stasis_subscription_set_filter()

int stasis_subscription_set_filter ( struct stasis_subscription subscription,
enum stasis_subscription_message_filter  filter 
)

Set the message type filtering level on a subscription.

This will cause the subscription to filter messages according to the provided filter level. For example if selective is used then only messages matching those provided to stasis_subscription_accept_message_type will be raised to the subscription callback.

Parameters
subscriptionSubscription that should receive all messages.
filterWhat filter to use
Return values
0on success
-1failure
Since
17.0.0

Definition at line 1155 of file stasis.c.

1157{
1158 if (!subscription) {
1159 return -1;
1160 }
1161
1162 ao2_lock(subscription->topic);
1163 if (subscription->filter != STASIS_SUBSCRIPTION_FILTER_FORCED_NONE) {
1164 subscription->filter = filter;
1165 }
1166 ao2_unlock(subscription->topic);
1167
1168 return 0;
1169}

References ao2_lock, ao2_unlock, filter(), stasis_subscription::filter, STASIS_SUBSCRIPTION_FILTER_FORCED_NONE, and stasis_subscription::topic.

Referenced by acl_change_stasis_subscribe(), acl_change_stasis_subscribe(), ast_extension_state_init(), ast_mwi_subscribe_pool(), ast_res_pjsip_initialize_configuration(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), asterisk_start_devicestate_publishing(), asterisk_start_mwi_publishing(), cc_generic_agent_start_monitoring(), create_new_generic_list(), create_parked_subscription_full(), devstate_init(), extension_state_device_source_alloc(), load_module(), load_module(), load_module(), load_module(), load_module(), load_module(), load_module(), load_module(), network_change_stasis_subscribe(), park_and_announce_app_exec(), parking_manager_enable_stasis(), refer_blind_callback(), rtp_reload(), stasis_caching_set_filter(), stasis_message_router_add(), stasis_message_router_add_cache_update(), stasis_message_router_set_formatters_default(), subscribe_device_state(), and xmpp_init_event_distribution().

◆ stasis_subscription_uniqueid()

const char * stasis_subscription_uniqueid ( const struct stasis_subscription sub)

Get the unique ID for the subscription.

Parameters
subSubscription for which to get the unique ID.
Returns
Unique ID for the subscription.
Since
12

Definition at line 1247 of file stasis.c.

1248{
1249 return sub->uniqueid;
1250}

References sub, and stasis_subscription::uniqueid.

Referenced by AST_TEST_DEFINE(), AST_TEST_DEFINE(), stasis_subscription_final_message(), topic_add_subscription(), and topic_remove_subscription().

◆ stasis_topic_create()

struct stasis_topic * stasis_topic_create ( const char *  name)

Create a new topic.

Parameters
nameName of the new topic.
Returns
New topic instance.
Return values
NULLon error.
Since
12
Note
There is no explicit ability to unsubscribe all subscribers from a topic and destroy it. As a result the topic can persist until the last subscriber unsubscribes itself even if there is no publisher.
Topic names should be in the form of
<subsystem>:<functionality>[/<object>] 

Definition at line 684 of file stasis.c.

685{
687}
struct stasis_topic * stasis_topic_create_with_detail(const char *name, const char *detail)
Create a new topic with given detail.
Definition stasis.c:635

References name, and stasis_topic_create_with_detail().

Referenced by __init_manager(), app_create(), app_init(), ast_channel_internal_setup_topics(), ast_extension_state_init(), ast_parking_stasis_init(), ast_presence_state_engine_init(), ast_rtp_engine_init(), ast_security_stasis_init(), ast_stasis_bridging_init(), ast_stasis_channels_init(), ast_stasis_system_init(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), ast_test_init(), create_cts(), create_subscriptions(), devstate_init(), extension_state_alloc(), load_module(), load_module(), stasis_caching_topic_create(), stasis_cp_all_create(), stasis_cp_sink_create(), stasis_state_manager_create(), stasis_topic_pool_get_topic(), and state_alloc().

◆ stasis_topic_create_with_detail()

struct stasis_topic * stasis_topic_create_with_detail ( const char *  name,
const char *  detail 
)

Create a new topic with given detail.

Parameters
nameName of the new topic.
detailDetail description of the new topic, e.g. "Queue main topic for subscribing every queue event"
Returns
New topic instance.
Return values
NULLon error.
Note
There is no explicit ability to unsubscribe all subscribers from a topic and destroy it. As a result the topic can persist until the last subscriber unsubscribes itself even if there is no publisher.

Definition at line 635 of file stasis.c.

638{
639 struct stasis_topic *topic;
640 int res = 0;
641
642 if (!name|| !strlen(name) || !detail) {
643 return NULL;
644 }
645 ast_debug(2, "Creating topic. name: %s, detail: %s\n", name, detail);
646
647 topic = stasis_topic_get(name);
648 if (topic) {
649 ast_debug(2, "Topic is already exist. name: %s, detail: %s\n",
650 name, detail);
651 return topic;
652 }
653
654 topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
655 if (!topic) {
656 return NULL;
657 }
658
660 res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
661 if (res) {
662 ao2_ref(topic, -1);
663 return NULL;
664 }
665
666 /* link to the proxy */
667 if (link_topic_proxy(topic, name, detail)) {
668 ao2_ref(topic, -1);
669 return NULL;
670 }
671
672#ifdef AST_DEVMODE
673 topic->statistics = stasis_topic_statistics_create(topic);
674 if (!topic->statistics) {
675 ao2_ref(topic, -1);
676 return NULL;
677 }
678#endif
679 ast_debug(1, "Topic '%s': %p created\n", topic->name, topic);
680
681 return topic;
682}
#define ao2_t_alloc(data_size, destructor_fn, debug_msg)
Definition astobj2.h:407
#define INITIAL_SUBSCRIBERS_MAX
Definition stasis.c:368
static void topic_dtor(void *obj)
Definition stasis.c:500
struct stasis_topic * stasis_topic_get(const char *name)
Get a topic of the given name.
Definition stasis.c:689
static int link_topic_proxy(struct stasis_topic *topic, const char *name, const char *detail)
Definition stasis.c:567
char * name
Definition stasis.c:453
char * detail
Definition stasis.c:456

References ao2_ref, ao2_t_alloc, ast_debug, AST_VECTOR_INIT, stasis_topic::detail, INITIAL_SUBSCRIBERS_MAX, link_topic_proxy(), name, stasis_topic::name, NULL, stasis_topic_get(), stasis_topic::subscribers, topic_dtor(), and stasis_topic::upstream_topics.

Referenced by stasis_topic_create().

◆ stasis_topic_detail()

const char * stasis_topic_detail ( const struct stasis_topic topic)

Return the detail of a topic.

Parameters
topicTopic.
Returns
Detail of the topic.
Return values
NULLif topic is NULL.
Since
12

Definition at line 702 of file stasis.c.

703{
704 if (!topic) {
705 return NULL;
706 }
707 return topic->detail;
708}

References stasis_topic::detail, and NULL.

◆ stasis_topic_get()

struct stasis_topic * stasis_topic_get ( const char *  name)

Get a topic of the given name.

Parameters
nameTopic's name.
Returns
The topic with the given name.
Return values
NULLon error or not exist.
Note
This SHOULD NOT be used in normal operation for publishing messages.

Definition at line 689 of file stasis.c.

690{
692}
#define ao2_weakproxy_find(c, arg, flags, tag)
Perform an ao2_find on a container with ao2_weakproxy objects, returning the real object.
Definition astobj2.h:1748

References ao2_weakproxy_find, name, OBJ_SEARCH_KEY, and topic_all.

Referenced by link_topic_proxy(), stasis_show_topic(), and stasis_topic_create_with_detail().

◆ stasis_topic_name()

const char * stasis_topic_name ( const struct stasis_topic topic)

◆ stasis_topic_pool_create()

struct stasis_topic_pool * stasis_topic_pool_create ( struct stasis_topic pooled_topic)

Create a topic pool that routes messages from dynamically generated topics to the given topic.

Parameters
pooled_topicTopic to which messages will be routed
Returns
the new stasis_topic_pool
Return values
NULLon failure

Definition at line 1929 of file stasis.c.

1930{
1931 struct stasis_topic_pool *pool;
1932
1934 if (!pool) {
1935 return NULL;
1936 }
1937
1940 if (!pool->pool_container) {
1941 ao2_cleanup(pool);
1942 return NULL;
1943 }
1944
1945#ifdef AO2_DEBUG
1946 {
1947 char *container_name =
1948 ast_alloca(strlen(stasis_topic_name(pooled_topic)) + strlen("-pool") + 1);
1949 sprintf(container_name, "%s-pool", stasis_topic_name(pooled_topic));
1950 ao2_container_register(container_name, pool->pool_container, topic_pool_prnt_obj);
1951 }
1952#endif
1953
1954 ao2_ref(pooled_topic, +1);
1955 pool->pool_topic = pooled_topic;
1956
1957 return pool;
1958}
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
Definition astmm.h:288
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition stasis.c:694
static void topic_pool_dtor(void *obj)
Definition stasis.c:1840
#define TOPIC_POOL_BUCKETS
Definition stasis.c:371
static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
Definition stasis.c:1880
static int topic_pool_entry_hash(const void *obj, const int flags)
Definition stasis.c:1859
struct ao2_container * pool_container
Definition stasis.c:1836
struct stasis_topic * pool_topic
Definition stasis.c:1837

References AO2_ALLOC_OPT_LOCK_MUTEX, AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_alloc_options, ao2_cleanup, ao2_container_alloc_hash, ao2_container_register(), ao2_ref, ast_alloca, NULL, stasis_topic_pool::pool_container, stasis_topic_pool::pool_topic, stasis_topic_name(), TOPIC_POOL_BUCKETS, topic_pool_dtor(), topic_pool_entry_cmp(), and topic_pool_entry_hash().

Referenced by app_init(), ast_stasis_bridging_init(), and devstate_init().

◆ stasis_topic_pool_delete_topic()

void stasis_topic_pool_delete_topic ( struct stasis_topic_pool pool,
const char *  topic_name 
)

Delete a topic from the topic pool.

Parameters
poolPool from which to delete the topic
topic_nameName of the topic to delete in the form of
[<pool_topic_name>/]<topic_name> 
Since
13.24
15.6
16.1

Definition at line 1960 of file stasis.c.

1961{
1962 /*
1963 * The topic_name passed in could be a fully-qualified name like <pool_topic_name>/<topic_name>
1964 * or just <topic_name>. If it's fully qualified, we need to skip past <pool_topic_name>
1965 * name and search only on <topic_name>.
1966 */
1967 const char *pool_topic_name = stasis_topic_name(pool->pool_topic);
1968 int pool_topic_name_len = strlen(pool_topic_name);
1969 const char *search_topic_name;
1970
1971 if (strncmp(pool_topic_name, topic_name, pool_topic_name_len) == 0) {
1972 search_topic_name = topic_name + pool_topic_name_len + 1;
1973 } else {
1974 search_topic_name = topic_name;
1975 }
1976
1977 ao2_find(pool->pool_container, search_topic_name, OBJ_SEARCH_KEY | OBJ_NODATA | OBJ_UNLINK);
1978}
@ OBJ_UNLINK
Definition astobj2.h:1039

References ao2_find, OBJ_NODATA, OBJ_SEARCH_KEY, OBJ_UNLINK, stasis_topic_pool::pool_container, stasis_topic_pool::pool_topic, and stasis_topic_name().

Referenced by bridge_topics_destroy().

◆ stasis_topic_pool_get_topic()

struct stasis_topic * stasis_topic_pool_get_topic ( struct stasis_topic_pool pool,
const char *  topic_name 
)

Find or create a topic in the pool.

Parameters
poolPool for which to get the topic
topic_nameName of the topic to get
Returns
The already stored or newly allocated topic
Return values
NULLif the topic was not found and could not be allocated

Find or create a topic in the pool.

This returns a borrowed reference: the pool container owns the topic and callers MUST NOT ao2_cleanup() the returned pointer.

To avoid both deadlocks and wasted work we use a per-name "in-flight" topic_pool_entry while a topic is being created:

  • The pool container lock is held only while looking up or inserting the topic_pool_entry for a name.
  • Exactly one thread becomes the creator for a given name and is responsible for allocating the topic and wiring up forwarding.
  • Other threads that race for the same name find the in-flight entry and wait on its condition variable until initialization completes.

Definition at line 1996 of file stasis.c.

1997{
1998 /*
1999 * Lock ordering:
2000 *
2001 * pool->pool_container (AO2 lock)
2002 * → entry->init_lock
2003 * → topic locks (inside stasis_topic_create() /
2004 * stasis_forward_all())
2005 *
2006 * We intentionally do NOT hold the pool container lock while calling
2007 * stasis_topic_create() or stasis_forward_all() to avoid deadlocks with
2008 * other code that may take topic locks first and then need the pool lock.
2009 */
2010 RAII_VAR(struct topic_pool_entry *, entry, NULL, ao2_cleanup);
2011 char *fq = NULL;
2012 int creator = 0;
2013 int ret;
2014
2015 if (!pool || ast_strlen_zero(topic_name)) {
2016 return NULL;
2017 }
2018
2019 /* Creator / waiter split:
2020 *
2021 * - The first thread to create/link an entry for topic_name becomes the
2022 * "creator" and is responsible for creating the underlying stasis
2023 * topic and wiring up forwarding.
2024 *
2025 * - Other threads that find the entry become "waiters"; they block on
2026 * entry->init_cond until either initialization succeeds or fails.
2027 */
2028
2029 /* --- Creator selection under pool container lock --- */
2030 ao2_lock(pool->pool_container);
2031
2032 entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
2033 if (!entry) {
2034 entry = topic_pool_entry_alloc(topic_name);
2035 if (!entry) {
2037 return NULL;
2038 }
2039
2040 if (!ao2_link_flags(pool->pool_container, entry, OBJ_NOLOCK)) {
2041 struct topic_pool_entry *other;
2042
2043 other = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
2044 if (other) {
2045 struct topic_pool_entry *tmp = entry;
2046
2047 entry = other;
2048 creator = 0;
2050 ao2_ref(tmp, -1);
2051 goto waiter_path;
2052 }
2053
2055 return NULL;
2056 }
2057
2058 creator = 1;
2059 }
2060
2062
2063/* --- Waiter path: wait for creator to finish --- */
2064waiter_path:
2065 if (!creator) {
2066 ast_mutex_lock(&entry->init_lock);
2067 while (!entry->initialized && !entry->failed) {
2068 ast_cond_wait(&entry->init_cond, &entry->init_lock);
2069 }
2070
2071 if (entry->initialized && !entry->failed) {
2072 struct stasis_topic *topic = entry->topic;
2073
2074 if (!topic) {
2075 ast_debug(1, "Pooled topic '%s' marked initialized but topic is NULL\n", entry->name);
2076 ast_mutex_unlock(&entry->init_lock);
2077 return NULL;
2078 }
2079 ast_mutex_unlock(&entry->init_lock);
2080 /* Borrowed reference: container owns the topic */
2081 return topic;
2082 }
2083
2084 ast_mutex_unlock(&entry->init_lock);
2085 return NULL;
2086 }
2087
2088 /* --- Creator path: perform topic creation without pool lock --- */
2089 ast_mutex_lock(&entry->init_lock);
2090 /* Defensive: entry may have been initialized/failed before we acquired init_lock. */
2091 if (entry->initialized || entry->failed) {
2092 struct stasis_topic *topic = entry->initialized ? entry->topic : NULL;
2093 ast_mutex_unlock(&entry->init_lock);
2094 return topic;
2095 }
2096
2097 ret = ast_asprintf(&fq, "%s/%s", stasis_topic_name(pool->pool_topic), topic_name);
2098 if (ret < 0) {
2099 entry->failed = 1;
2100 goto creator_fail;
2101 }
2102
2103 entry->topic = stasis_topic_create(fq);
2104 ast_free(fq);
2105 fq = NULL;
2106
2107 if (!entry->topic) {
2108 entry->failed = 1;
2109 goto creator_fail;
2110 }
2111
2112 entry->forward = stasis_forward_all(entry->topic, pool->pool_topic);
2113 if (!entry->forward) {
2114 ao2_cleanup(entry->topic);
2115 entry->topic = NULL;
2116 entry->failed = 1;
2117 goto creator_fail;
2118 }
2119
2120 entry->initialized = 1;
2121 ast_cond_broadcast(&entry->init_cond);
2122 ast_mutex_unlock(&entry->init_lock);
2123
2124 return entry->topic; /* borrowed ref */
2125
2126creator_fail:
2127 ast_debug(1, "Failed to create pooled stasis topic '%s/%s'\n", stasis_topic_name(pool->pool_topic), entry->name);
2128 ast_cond_broadcast(&entry->init_cond);
2129 ast_mutex_unlock(&entry->init_lock);
2130
2131 /* Remove failed entry so future callers can retry */
2132 ao2_lock(pool->pool_container);
2133 ao2_unlink(pool->pool_container, entry);
2135
2136 return NULL;
2137}
#define ao2_unlink(container, obj)
Remove an object from a container.
Definition astobj2.h:1578
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
Definition astobj2.h:1554
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
Definition astobj2.h:1063
#define ast_mutex_unlock(a)
Definition lock.h:197
#define ast_cond_broadcast(cond)
Definition lock.h:211
#define ast_mutex_lock(a)
Definition lock.h:196
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition stasis.c:684
static struct topic_pool_entry * topic_pool_entry_alloc(const char *topic_name)
Definition stasis.c:1820
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition stasis.c:1656
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition strings.h:65
Definition stasis.c:1787
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition utils.h:981

References ao2_cleanup, ao2_find, ao2_link_flags, ao2_lock, ao2_ref, ao2_unlink, ao2_unlock, ast_asprintf, ast_cond_broadcast, ast_cond_wait, ast_debug, ast_free, ast_mutex_lock, ast_mutex_unlock, ast_strlen_zero(), topic_pool_entry::failed, topic_pool_entry::forward, topic_pool_entry::init_cond, topic_pool_entry::init_lock, topic_pool_entry::initialized, topic_pool_entry::name, NULL, OBJ_NOLOCK, OBJ_SEARCH_KEY, stasis_topic_pool::pool_container, stasis_topic_pool::pool_topic, RAII_VAR, stasis_forward_all(), stasis_topic_create(), stasis_topic_name(), topic_pool_entry::topic, and topic_pool_entry_alloc().

Referenced by ast_device_state_topic(), ast_queue_topic(), and bridge_topics_init().

◆ stasis_topic_pool_topic_exists()

int stasis_topic_pool_topic_exists ( const struct stasis_topic_pool pool,
const char *  topic_name 
)

Check if a topic exists in a pool.

Parameters
poolPool to check
topic_nameName of the topic to check
Return values
1exists
0does not exist
Since
13.23.0

Definition at line 2139 of file stasis.c.

2140{
2142
2144 if (!topic_pool_entry) {
2145 return 0;
2146 }
2147
2149 return 1;
2150}

References ao2_find, ao2_ref, OBJ_SEARCH_KEY, and stasis_topic_pool::pool_container.

Referenced by ast_bridge_topic_exists(), and ast_publish_device_state_full().

◆ stasis_topic_subscribers()

size_t stasis_topic_subscribers ( const struct stasis_topic topic)

Return the number of subscribers of a topic.

Parameters
topicTopic.
Returns
Number of subscribers of the topic.
Since
17.0.0

Definition at line 710 of file stasis.c.

711{
712 return AST_VECTOR_SIZE(&topic->subscribers);
713}

References AST_VECTOR_SIZE, and stasis_topic::subscribers.

Referenced by caching_topic_exec(), handle_show_hint(), handle_show_hints(), and publish_msg().

◆ stasis_topic_uniqueid()

const char * stasis_topic_uniqueid ( const struct stasis_topic * topic)

Return the uniqueid of a topic.

Parameters
topic - Topic.
Returns
Uniqueid of the topic.
Return values
NULL - if topic is NULL.

◆ stasis_unsubscribe()

struct stasis_subscription * stasis_unsubscribe ( struct stasis_subscription * subscription)

Cancel a subscription.

Note that in an asynchronous system, there may still be messages queued or in transit to the subscription's callback. These will still be delivered. There will be a final 'SubscriptionCancelled' message, indicating the delivery of the final message.

Parameters
subscription - Subscription to cancel.
Return values
NULL - for convenience
Since
12

Definition at line 1049 of file stasis.c.

1050{
1051 /* The subscription may be the last ref to this topic. Hold
1052 * the topic ref open until after the unlock. */
1053 struct stasis_topic *topic;
1054
1055 if (!sub) {
1056 return NULL;
1057 }
1058
1059 topic = ao2_bump(sub->topic);
1060
1061 /* We have to remove the subscription first, to ensure the unsubscribe
1062 * is the final message */
1063 if (topic_remove_subscription(sub->topic, sub) != 0) {
1065 "Internal error: subscription has invalid topic\n");
1066 ao2_cleanup(topic);
1067
1068 return NULL;
1069 }
1070
1071 /* Now let everyone know about the unsubscribe */
1073
1074 /* When all that's done, remove the ref the mailbox has on the sub */
1075 if (sub->mailbox) {
1077 /* Nothing we can do here, the conditional is just to keep
1078 * the compiler happy that we're not ignoring the result. */
1079 }
1080 }
1081
1082 /* Unsubscribing unrefs the subscription */
1084 ao2_cleanup(topic);
1085
1086 return NULL;
1087}
static int sub_cleanup(void *data)
Definition stasis.c:1042
static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition stasis.c:1754
#define ast_taskprocessor_push(tps, task_exe, datap)

References ao2_bump, ao2_cleanup, ast_log, ast_taskprocessor_push, LOG_ERROR, stasis_subscription::mailbox, NULL, send_subscription_unsubscribe(), sub, sub_cleanup(), stasis_subscription::topic, and topic_remove_subscription().

Referenced by AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), cc_generic_agent_destructor(), common_config_unload(), destroy_cts(), extension_state_autohints_cleanup(), extension_state_device_source_destroy(), generic_agent_devstate_cb(), generic_monitor_instance_list_destructor(), mwi_startup_event_cb(), park_and_announce_app_exec(), parked_subscription_datastore_destroy(), pbx_extension_state_autohint_remove(), refer_progress_bridge(), refer_progress_destroy(), refer_progress_framehook_destroy(), stasis_caching_unsubscribe(), stasis_message_router_unsubscribe(), stasis_state_unsubscribe(), stasis_unsubscribe_and_join(), subscription_persistence_event_cb(), unload_module(), and xmpp_init_event_distribution().

◆ stasis_unsubscribe_and_join()

struct stasis_subscription * stasis_unsubscribe_and_join ( struct stasis_subscription * subscription)

Cancel a subscription, blocking until the last message is processed.

While normally it's recommended to stasis_unsubscribe() and wait for stasis_subscription_final_message(), there are times (like during a module unload) where you have to wait for the final message (otherwise you'll call a function in a shared module that no longer exists).

Parameters
subscription - Subscription to cancel.
Return values
NULL - for convenience
Since
12

Definition at line 1212 of file stasis.c.

1214{
1215 if (!subscription) {
1216 return NULL;
1217 }
1218
1219 /* Bump refcount to hold it past the unsubscribe */
1220 ao2_ref(subscription, +1);
1221 stasis_unsubscribe(subscription);
1222 stasis_subscription_join(subscription);
1223 /* Now decrement the refcount back */
1224 ao2_cleanup(subscription);
1225 return NULL;
1226}
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *sub)
Cancel a subscription.
Definition stasis.c:1049
void stasis_subscription_join(struct stasis_subscription *subscription)
Block until the last message is processed on a subscription.
Definition stasis.c:1183

References ao2_cleanup, ao2_ref, NULL, stasis_subscription_join(), and stasis_unsubscribe().

Referenced by acl_change_stasis_unsubscribe(), acl_change_stasis_unsubscribe(), ast_res_pjsip_destroy_configuration(), AST_TEST_DEFINE(), ast_xmpp_client_disconnect(), asterisk_stop_devicestate_publishing(), asterisk_stop_mwi_publishing(), devstate_cleanup(), extension_state_cleanup(), network_change_stasis_unsubscribe(), parking_manager_disable_stasis(), remove_device_state_subscription(), rtp_reload(), stasis_message_router_unsubscribe_and_join(), stasis_state_unsubscribe_and_join(), unload_module(), unload_module(), unload_module(), unload_module(), unload_module(), unload_module(), unload_module(), and unload_module().