Asterisk - The Open Source Telephony Project  GIT-master-44aef04
Data Structures | Macros | Typedefs | Enumerations | Functions
stasis.h File Reference

Stasis Message Bus API. See Stasis Message Bus API for detailed documentation. More...

#include "asterisk/json.h"
#include "asterisk/manager.h"
#include "asterisk/utils.h"
#include "asterisk/event.h"

Go to the source code of this file.

Data Structures

struct  stasis_cache_update
 Cache update message. More...
 
struct  stasis_message_sanitizer
 Structure containing callbacks for Stasis message sanitization. More...
 
struct  stasis_message_vtable
 Virtual table providing methods for messages. More...
 
struct  stasis_subscription_change
 Holds details about changes to subscriptions for the specified topic. More...
 

Macros

#define STASIS_MESSAGE_TYPE_CLEANUP(name)
 Boiler-plate messaging macro for cleaning up message types. More...
 
#define STASIS_MESSAGE_TYPE_DEFN(name, ...)
 Boiler-plate messaging macro for defining public message types. More...
 
#define STASIS_MESSAGE_TYPE_DEFN_LOCAL(name, ...)
 Boiler-plate messaging macro for defining local message types. More...
 
#define STASIS_MESSAGE_TYPE_INIT(name)
 Boiler-plate messaging macro for initializing message types. More...
 
#define stasis_subscribe(topic, callback, data)   __stasis_subscribe(topic, callback, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
 
#define stasis_subscribe_pool(topic, callback, data)   __stasis_subscribe_pool(topic, callback, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
 
#define STASIS_UMOS_MAX   (STASIS_UMOS_ENDPOINT + 1)
 Number of snapshot types. More...
 

Typedefs

typedef struct stasis_message *(* cache_aggregate_calc_fn) (struct stasis_cache_entry *entry, struct stasis_message *new_snapshot)
 Callback to calculate the aggregate cache entry. More...
 
typedef void(* cache_aggregate_publish_fn) (struct stasis_topic *topic, struct stasis_message *aggregate)
 Callback to publish the aggregate cache entry message. More...
 
typedef const char *(* snapshot_get_id) (struct stasis_message *message)
 Callback to extract a unique identity from a snapshot message. More...
 
typedef void(* stasis_subscription_cb) (void *data, struct stasis_subscription *sub, struct stasis_message *message)
 Callback function type for Stasis subscriptions. More...
 

Enumerations

enum  stasis_message_type_result { STASIS_MESSAGE_TYPE_ERROR = -1, STASIS_MESSAGE_TYPE_SUCCESS, STASIS_MESSAGE_TYPE_DECLINED }
 Return code for Stasis message type creation attempts. More...
 
enum  stasis_subscription_message_filter { STASIS_SUBSCRIPTION_FILTER_NONE = 0, STASIS_SUBSCRIPTION_FILTER_FORCED_NONE, STASIS_SUBSCRIPTION_FILTER_SELECTIVE }
 Stasis subscription message filters. More...
 
enum  stasis_subscription_message_formatters { STASIS_SUBSCRIPTION_FORMATTER_NONE = 0, STASIS_SUBSCRIPTION_FORMATTER_JSON = 1 << 0, STASIS_SUBSCRIPTION_FORMATTER_AMI = 1 << 1, STASIS_SUBSCRIPTION_FORMATTER_EVENT = 1 << 2 }
 Stasis subscription formatter filters. More...
 
enum  stasis_user_multi_object_snapshot_type { STASIS_UMOS_CHANNEL = 0, STASIS_UMOS_BRIDGE, STASIS_UMOS_ENDPOINT }
 Object type code for multi user object snapshots. More...
 

Functions

struct stasis_subscription * __stasis_subscribe (struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
 Create a subscription. More...
 
struct stasis_subscription * __stasis_subscribe_pool (struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
 Create a subscription whose callbacks occur on a thread pool. More...
 
void ast_multi_object_blob_add (struct ast_multi_object_blob *multi, enum stasis_user_multi_object_snapshot_type type, void *object)
 Add an object to a multi object blob previously created. More...
 
struct ast_multi_object_blobast_multi_object_blob_create (struct ast_json *blob)
 Create a stasis multi object blob. More...
 
void ast_multi_object_blob_single_channel_publish (struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob)
 Create and publish a stasis message blob on a channel with its snapshot. More...
 
struct stasis_message_typeast_multi_user_event_type (void)
 Message type for custom user defined events with multi object blobs. More...
 
struct stasis_message * stasis_cache_clear_create (struct stasis_message *message)
 A message which instructs the caching topic to remove an entry from its cache. More...
 
struct stasis_message_type * stasis_cache_clear_type (void)
 Message type for clearing a message from a stasis cache. More...
 
struct stasis_cache * stasis_cache_create (snapshot_get_id id_fn)
 Create a cache. More...
 
struct stasis_cache * stasis_cache_create_full (snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_publish_fn aggregate_publish_fn)
 Create a cache. More...
 
struct ao2_container * stasis_cache_dump (struct stasis_cache *cache, struct stasis_message_type *type)
 Dump cached items to a subscription for the ast_eid_default entity. More...
 
struct ao2_container * stasis_cache_dump_all (struct stasis_cache *cache, struct stasis_message_type *type)
 Dump all entity items from the cache to a subscription. More...
 
struct ao2_container * stasis_cache_dump_by_eid (struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
 Dump cached items to a subscription for a specific entity. More...
 
struct stasis_messagestasis_cache_entry_get_aggregate (struct stasis_cache_entry *entry)
 Get the aggregate cache entry snapshot. More...
 
struct stasis_messagestasis_cache_entry_get_local (struct stasis_cache_entry *entry)
 Get the local entity's cache entry snapshot. More...
 
struct stasis_messagestasis_cache_entry_get_remote (struct stasis_cache_entry *entry, int idx)
 Get a remote entity's cache entry snapshot by index. More...
 
struct stasis_messagestasis_cache_get (struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
 Retrieve an item from the cache for the ast_eid_default entity. More...
 
struct ao2_containerstasis_cache_get_all (struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
 Retrieve all matching entity items from the cache. More...
 
struct stasis_messagestasis_cache_get_by_eid (struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid)
 Retrieve an item from the cache for a specific entity. More...
 
int stasis_cache_init (void)
 
struct stasis_message_typestasis_cache_update_type (void)
 Message type for cache update messages. More...
 
int stasis_caching_accept_message_type (struct stasis_caching_topic *caching_topic, struct stasis_message_type *type)
 Indicate to a caching topic that we are interested in a message type. More...
 
struct stasis_topicstasis_caching_get_topic (struct stasis_caching_topic *caching_topic)
 Returns the topic of cached events from a caching topic. More...
 
int stasis_caching_set_filter (struct stasis_caching_topic *caching_topic, enum stasis_subscription_message_filter filter)
 Set the message type filtering level on a cache. More...
 
struct stasis_caching_topicstasis_caching_topic_create (struct stasis_topic *original_topic, struct stasis_cache *cache)
 Create a topic which monitors and caches messages from another topic. More...
 
struct stasis_caching_topicstasis_caching_unsubscribe (struct stasis_caching_topic *caching_topic)
 Unsubscribes a caching topic from its upstream topic. More...
 
struct stasis_caching_topicstasis_caching_unsubscribe_and_join (struct stasis_caching_topic *caching_topic)
 Unsubscribes a caching topic from its upstream topic, blocking until all messages have been forwarded. More...
 
int stasis_config_init (void)
 
struct stasis_forwardstasis_forward_all (struct stasis_topic *from_topic, struct stasis_topic *to_topic)
 Create a subscription which forwards all messages from one topic to another. More...
 
struct stasis_forwardstasis_forward_cancel (struct stasis_forward *forward)
 
int stasis_init (void)
 Initialize the Stasis subsystem. More...
 
void stasis_log_bad_type_access (const char *name)
 
int stasis_message_can_be_ami (struct stasis_message *msg)
 Determine if the given message can be converted to AMI. More...
 
struct stasis_messagestasis_message_create (struct stasis_message_type *type, void *data)
 Create a new message. More...
 
struct stasis_messagestasis_message_create_full (struct stasis_message_type *type, void *data, const struct ast_eid *eid)
 Create a new message for an entity. More...
 
void * stasis_message_data (const struct stasis_message *msg)
 Get the data contained in a message. More...
 
const struct ast_eidstasis_message_eid (const struct stasis_message *msg)
 Get the entity id for a stasis_message. More...
 
const struct timeval * stasis_message_timestamp (const struct stasis_message *msg)
 Get the time when a message was created. More...
 
struct ast_manager_event_blobstasis_message_to_ami (struct stasis_message *msg)
 Build the AMI representation of the message. More...
 
struct ast_eventstasis_message_to_event (struct stasis_message *msg)
 Build the Generic event system representation of the message. More...
 
struct ast_jsonstasis_message_to_json (struct stasis_message *msg, struct stasis_message_sanitizer *sanitize)
 Build the JSON representation of the message. More...
 
struct stasis_message_typestasis_message_type (const struct stasis_message *msg)
 Get the message type for a stasis_message. More...
 
enum stasis_subscription_message_formatters stasis_message_type_available_formatters (const struct stasis_message_type *message_type)
 Get a bitmap of available formatters for a message type. More...
 
enum stasis_message_type_result stasis_message_type_create (const char *name, struct stasis_message_vtable *vtable, struct stasis_message_type **result)
 Create a new message type. More...
 
int stasis_message_type_declined (const char *name)
 Check whether a message type is declined. More...
 
unsigned int stasis_message_type_hash (const struct stasis_message_type *type)
 Gets the hash of a given message type. More...
 
int stasis_message_type_id (const struct stasis_message_type *type)
 Gets the id of a given message type. More...
 
const char * stasis_message_type_name (const struct stasis_message_type *type)
 Gets the name of a given message type. More...
 
void stasis_publish (struct stasis_topic *topic, struct stasis_message *message)
 Publish a message to a topic's subscribers. More...
 
void stasis_publish_sync (struct stasis_subscription *sub, struct stasis_message *message)
 Publish a message to a topic's subscribers, synchronizing on the specified subscriber. More...
 
void stasis_subscription_accept_formatters (struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters)
 Indicate to a subscription that we are interested in messages with one or more formatters. More...
 
int stasis_subscription_accept_message_type (struct stasis_subscription *subscription, const struct stasis_message_type *type)
 Indicate to a subscription that we are interested in a message type. More...
 
void stasis_subscription_cb_noop (void *data, struct stasis_subscription *sub, struct stasis_message *message)
 Stasis subscription callback function that does nothing. More...
 
struct stasis_message_typestasis_subscription_change_type (void)
 Gets the message type for subscription change notices. More...
 
int stasis_subscription_decline_message_type (struct stasis_subscription *subscription, const struct stasis_message_type *type)
 Indicate to a subscription that we are not interested in a message type. More...
 
int stasis_subscription_final_message (struct stasis_subscription *sub, struct stasis_message *msg)
 Determine whether a message is the final message to be received on a subscription. More...
 
int stasis_subscription_is_done (struct stasis_subscription *subscription)
 Returns whether subscription has received its final message. More...
 
int stasis_subscription_is_subscribed (const struct stasis_subscription *sub)
 Returns whether a subscription is currently subscribed. More...
 
void stasis_subscription_join (struct stasis_subscription *subscription)
 Block until the last message is processed on a subscription. More...
 
int stasis_subscription_set_congestion_limits (struct stasis_subscription *subscription, long low_water, long high_water)
 Set the high and low alert water marks of the stasis subscription. More...
 
int stasis_subscription_set_filter (struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
 Set the message type filtering level on a subscription. More...
 
const char * stasis_subscription_uniqueid (const struct stasis_subscription *sub)
 Get the unique ID for the subscription. More...
 
struct stasis_topicstasis_topic_create (const char *name)
 Create a new topic. More...
 
struct stasis_topicstasis_topic_create_with_detail (const char *name, const char *detail)
 Create a new topic with given detail. More...
 
const char * stasis_topic_detail (const struct stasis_topic *topic)
 Return the detail of a topic. More...
 
struct stasis_topicstasis_topic_get (const char *name)
 Get a topic of the given name. More...
 
const char * stasis_topic_name (const struct stasis_topic *topic)
 Return the name of a topic. More...
 
struct stasis_topic_poolstasis_topic_pool_create (struct stasis_topic *pooled_topic)
 Create a topic pool that routes messages from dynamically generated topics to the given topic. More...
 
void stasis_topic_pool_delete_topic (struct stasis_topic_pool *pool, const char *topic_name)
 Delete a topic from the topic pool. More...
 
struct stasis_topicstasis_topic_pool_get_topic (struct stasis_topic_pool *pool, const char *topic_name)
 Find or create a topic in the pool. More...
 
int stasis_topic_pool_topic_exists (const struct stasis_topic_pool *pool, const char *topic_name)
 Check if a topic exists in a pool. More...
 
size_t stasis_topic_subscribers (const struct stasis_topic *topic)
 Return the number of subscribers of a topic. More...
 
const char * stasis_topic_uniqueid (const struct stasis_topic *topic)
 Return the uniqueid of a topic. More...
 
struct stasis_subscriptionstasis_unsubscribe (struct stasis_subscription *subscription)
 Cancel a subscription. More...
 
struct stasis_subscriptionstasis_unsubscribe_and_join (struct stasis_subscription *subscription)
 Cancel a subscription, blocking until the last message is processed. More...
 

Detailed Description

Stasis Message Bus API. See Stasis Message Bus API for detailed documentation.

Author
David M. Lee, II <dlee@digium.com>
Since
12

Definition in file stasis.h.

Macro Definition Documentation

◆ STASIS_MESSAGE_TYPE_CLEANUP

#define STASIS_MESSAGE_TYPE_CLEANUP (   name)
Value:
({ \
ao2_cleanup(_priv_ ## name); \
_priv_ ## name = NULL; \
})
#define NULL
Definition: resample.c:96
static const char name[]
Definition: cdr_mysql.c:74

Boiler-plate messaging macro for cleaning up message types.

Note that if your type is defined in core instead of a loadable module, you should call message type cleanup from an ast_register_cleanup() handler instead of an ast_register_atexit() handler.

The reason is that during an immediate shutdown, loadable modules (which may refer to core message types) are not unloaded. While the atexit handlers are run, there's a window of time where a module subscription might reference a core message type after it's been cleaned up. Which is bad.

Parameters
name: Name of message type.
Since
12

Definition at line 1523 of file stasis.h.

Referenced by __unload_module(), aoc_shutdown(), cdr_engine_shutdown(), cleanup_module(), devstate_cleanup(), endpoints_stasis_cleanup(), file_shutdown(), load_module(), local_shutdown(), manager_confbridge_shutdown(), manager_shutdown(), meetme_stasis_cleanup(), mwi_cleanup(), parking_stasis_cleanup(), pbx_shutdown(), pickup_shutdown(), presence_state_engine_cleanup(), rtp_engine_shutdown(), security_stasis_cleanup(), stasis_bridging_cleanup(), stasis_cache_cleanup(), stasis_channels_cleanup(), stasis_cleanup(), stasis_system_cleanup(), and unload_module().

◆ STASIS_MESSAGE_TYPE_DEFN

#define STASIS_MESSAGE_TYPE_DEFN (   name,
  ... 
)

Boiler-plate messaging macro for defining public message types.

STASIS_MESSAGE_TYPE_DEFN(ast_foo_type,
.to_ami = foo_to_ami,
.to_json = foo_to_json,
.to_event = foo_to_event,
);
Parameters
name: Name of message type.
...: Virtual table methods for messages of this type.
Since
12

Definition at line 1448 of file stasis.h.

◆ STASIS_MESSAGE_TYPE_DEFN_LOCAL

#define STASIS_MESSAGE_TYPE_DEFN_LOCAL (   name,
  ... 
)

Boiler-plate messaging macro for defining local message types.

STASIS_MESSAGE_TYPE_DEFN_LOCAL(ast_foo_type,
.to_ami = foo_to_ami,
.to_json = foo_to_json,
.to_event = foo_to_event,
);
Parameters
name: Name of message type.
...: Virtual table methods for messages of this type.
Since
12

Definition at line 1475 of file stasis.h.

◆ STASIS_MESSAGE_TYPE_INIT

#define STASIS_MESSAGE_TYPE_INIT (   name)
Value:
({ \
ast_assert(_priv_ ## name == NULL); \
stasis_message_type_create(#name, \
&_priv_ ## name ## _v, &_priv_ ## name) == STASIS_MESSAGE_TYPE_ERROR ? 1 : 0; \
})
#define NULL
Definition: resample.c:96
static const char name[]
Definition: cdr_mysql.c:74

Boiler-plate messaging macro for initializing message types.

if (STASIS_MESSAGE_TYPE_INIT(ast_foo_type) != 0) {
return -1;
}
Parameters
name: Name of message type.
Returns
0 if initialization is successful.
Non-zero on failure.
Since
12

Definition at line 1501 of file stasis.h.

Referenced by __init_manager(), ast_aoc_cli_init(), ast_endpoint_stasis_init(), ast_file_init(), ast_local_init(), ast_parking_stasis_init(), ast_pbx_init(), ast_pickup_init(), ast_presence_state_engine_init(), ast_rtp_engine_init(), ast_security_stasis_init(), ast_stasis_bridging_init(), ast_stasis_channels_init(), ast_stasis_system_init(), ast_test_init(), devstate_init(), load_module(), manager_confbridge_init(), meetme_stasis_init(), mwi_init(), stasis_cache_init(), and stasis_init().

◆ stasis_subscribe

#define stasis_subscribe (   topic,
  callback,
  data 
)    __stasis_subscribe(topic, callback, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)

◆ stasis_subscribe_pool

#define stasis_subscribe_pool (   topic,
  callback,
  data 
)    __stasis_subscribe_pool(topic, callback, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)

Typedef Documentation

◆ cache_aggregate_calc_fn

typedef struct stasis_message*(* cache_aggregate_calc_fn) (struct stasis_cache_entry *entry, struct stasis_message *new_snapshot)

Callback to calculate the aggregate cache entry.

Since
12.2.0
Parameters
entry: Cache entry to calculate a new aggregate snapshot.
new_snapshot: The snapshot that is being updated.
Note
Return a ref bumped pointer from stasis_cache_entry_get_aggregate() if a new aggregate could not be calculated because of error.
An aggregate message is a combined representation of the local and remote entities publishing the message data. e.g., An aggregate device state represents the combined device state from the local and any remote entities publishing state for a device. e.g., An aggregate MWI message is the old/new MWI counts accumulated from the local and any remote entities publishing to a mailbox.
Returns
New aggregate-snapshot calculated on success. Caller has a reference on return.

Definition at line 1033 of file stasis.h.

◆ cache_aggregate_publish_fn

typedef void(* cache_aggregate_publish_fn) (struct stasis_topic *topic, struct stasis_message *aggregate)

Callback to publish the aggregate cache entry message.

Since
12.2.0

Once an aggregate message is calculated, this callback publishes the message so subscribers will know the new value of an aggregated state.

Parameters
topic: The aggregate message may be published to this topic. It is the topic to which the cache itself is subscribed.
aggregate: The aggregate snapshot message to publish.
Note
It is up to the function to determine if there is a better topic the aggregate message should be published over.
An aggregate message is a combined representation of the local and remote entities publishing the message data. e.g., An aggregate device state represents the combined device state from the local and any remote entities publishing state for a device. e.g., An aggregate MWI message is the old/new MWI counts accumulated from the local and any remote entities publishing to a mailbox.
Returns
Nothing

Definition at line 1059 of file stasis.h.

◆ snapshot_get_id

typedef const char*(* snapshot_get_id) (struct stasis_message *message)

Callback to extract a unique identity from a snapshot message.

This identity is unique to the underlying object of the snapshot, such as the UniqueId field of a channel.

Parameters
message: Message to extract id from.
Returns
String representing the snapshot's id.
NULL if the message_type of the message isn't a handled snapshot.
Since
12

Definition at line 1011 of file stasis.h.

◆ stasis_subscription_cb

typedef void(* stasis_subscription_cb) (void *data, struct stasis_subscription *sub, struct stasis_message *message)

Callback function type for Stasis subscriptions.

Parameters
data: Data field provided with subscription.
message: Published message.
Since
12

Definition at line 615 of file stasis.h.

Enumeration Type Documentation

◆ stasis_message_type_result

Return code for Stasis message type creation attempts.

Enumerator
STASIS_MESSAGE_TYPE_ERROR 

Message type was not created due to allocation failure

STASIS_MESSAGE_TYPE_SUCCESS 

Message type was created successfully

STASIS_MESSAGE_TYPE_DECLINED 

Message type was not created due to configuration

Definition at line 288 of file stasis.h.

288  {
289  STASIS_MESSAGE_TYPE_ERROR = -1, /*!< Message type was not created due to allocation failure */
290  STASIS_MESSAGE_TYPE_SUCCESS, /*!< Message type was created successfully */
291  STASIS_MESSAGE_TYPE_DECLINED, /*!< Message type was not created due to configuration */
292 };

◆ stasis_subscription_message_filter

Stasis subscription message filters.

Enumerator
STASIS_SUBSCRIPTION_FILTER_NONE 

No filter is in place, all messages are raised

STASIS_SUBSCRIPTION_FILTER_FORCED_NONE 

No filter is in place or can be set, all messages are raised

STASIS_SUBSCRIPTION_FILTER_SELECTIVE 

Only messages of allowed message types are raised

Definition at line 297 of file stasis.h.

297  {
298  STASIS_SUBSCRIPTION_FILTER_NONE = 0, /*!< No filter is in place, all messages are raised */
299  STASIS_SUBSCRIPTION_FILTER_FORCED_NONE, /*!< No filter is in place or can be set, all messages are raised */
300  STASIS_SUBSCRIPTION_FILTER_SELECTIVE, /*!< Only messages of allowed message types are raised */
301 };

◆ stasis_subscription_message_formatters

Stasis subscription formatter filters.

There should be an entry here for each member of stasis_message_vtable

Since
13.25.0
16.2.0
Enumerator
STASIS_SUBSCRIPTION_FORMATTER_NONE 
STASIS_SUBSCRIPTION_FORMATTER_JSON 

Allow messages with a to_json formatter

STASIS_SUBSCRIPTION_FORMATTER_AMI 

Allow messages with a to_ami formatter

STASIS_SUBSCRIPTION_FORMATTER_EVENT 

Allow messages with a to_event formatter

Definition at line 311 of file stasis.h.

311  {
312  STASIS_SUBSCRIPTION_FORMATTER_NONE = 0,
313  STASIS_SUBSCRIPTION_FORMATTER_JSON = 1 << 0, /*!< Allow messages with a to_json formatter */
314  STASIS_SUBSCRIPTION_FORMATTER_AMI = 1 << 1, /*!< Allow messages with a to_ami formatter */
315  STASIS_SUBSCRIPTION_FORMATTER_EVENT = 1 << 2, /*!< Allow messages with a to_event formatter */
316 };

Function Documentation

◆ __stasis_subscribe()

struct stasis_subscription* __stasis_subscribe ( struct stasis_topic *  topic,
stasis_subscription_cb  callback,
void *  data,
const char *  file,
int  lineno,
const char *  func 
)

Create a subscription.

In addition to being AO2 managed memory (requiring an ao2_cleanup() to free up this reference), the subscription must be explicitly unsubscribed from its topic using stasis_unsubscribe().

The invocations of the callback are serialized, but may not always occur on the same thread. The invocation order of different subscriptions is unspecified.

Parameters
topic: Topic to subscribe to.
callback: Callback function for subscription messages.
data: Data to be passed to the callback, in addition to the message.
Returns
New stasis_subscription object.
NULL on error.
Since
12
Note
The subscription's callback will receive a message indicating it has been subscribed. This occurs immediately, before accepted message types can be set, and the callback must expect to receive it.

Definition at line 943 of file stasis.c.

References internal_stasis_subscribe().

Referenced by stasis_message_router_create_internal().

950 {
951  return internal_stasis_subscribe(topic, callback, data, 1, 0, file, lineno, func);
952 }
struct stasis_subscription * internal_stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func)
Create a subscription.
Definition: stasis.c:857

◆ __stasis_subscribe_pool()

struct stasis_subscription* __stasis_subscribe_pool ( struct stasis_topic *  topic,
stasis_subscription_cb  callback,
void *  data,
const char *  file,
int  lineno,
const char *  func 
)

Create a subscription whose callbacks occur on a thread pool.

In addition to being AO2 managed memory (requiring an ao2_cleanup() to free up this reference), the subscription must be explicitly unsubscribed from its topic using stasis_unsubscribe().

The invocations of the callback are serialized, but will almost certainly not always happen on the same thread. The invocation order of different subscriptions is unspecified.

Unlike stasis_subscribe, this function will explicitly use a threadpool to dispatch items to its callback. This form of subscription should be used when many subscriptions may be made to the specified topic.

Parameters
topic: Topic to subscribe to.
callback: Callback function for subscription messages.
data: Data to be passed to the callback, in addition to the message.
Returns
New stasis_subscription object.
NULL on error.
Since
12.8.0
Note
The subscription's callback will receive a message indicating it has been subscribed. This occurs immediately, before accepted message types can be set, and the callback must expect to receive it.

Definition at line 954 of file stasis.c.

References internal_stasis_subscribe().

Referenced by stasis_message_router_create_internal().

961 {
962  return internal_stasis_subscribe(topic, callback, data, 1, 1, file, lineno, func);
963 }
struct stasis_subscription * internal_stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func)
Create a subscription.
Definition: stasis.c:857

◆ stasis_cache_clear_create()

struct stasis_message* stasis_cache_clear_create ( struct stasis_message *  message)

A message which instructs the caching topic to remove an entry from its cache.

Parameters
message: Message representative of the cache entry that should be cleared. This will become the data held in the stasis_cache_clear message.
Returns
Message which, when sent to a stasis_caching_topic, will clear the item from the cache.
NULL on error.
Since
12

Definition at line 778 of file stasis_cache.c.

References stasis_cache_clear_type(), and stasis_message_create().

Referenced by ast_delete_mwi_state_full(), ast_device_state_clear_cache(), ast_endpoint_shutdown(), AST_TEST_DEFINE(), clear_node_cache(), and remove_device_states_cb().

779 {
780  return stasis_message_create(stasis_cache_clear_type(), id_message);
781 }
struct stasis_message_type * stasis_cache_clear_type(void)
Message type for clearing a message from a stasis cache.
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.

◆ stasis_cache_create()

struct stasis_cache* stasis_cache_create ( snapshot_get_id  id_fn)

Create a cache.

This is the backend store for a stasis_caching_topic. The cache is thread safe, allowing concurrent reads and writes.

The returned object is AO2 managed, so ao2_cleanup() when you're done.

Parameters
id_fn: Callback to extract the id from a snapshot message.
Return values
New cache indexed by id_fn.

Definition at line 360 of file stasis_cache.c.

References NULL, and stasis_cache_create_full().

Referenced by ast_presence_state_engine_init(), AST_TEST_DEFINE(), mwi_init(), and stasis_cp_all_create().

361 {
362  return stasis_cache_create_full(id_fn, NULL, NULL);
363 }
#define NULL
Definition: resample.c:96
struct stasis_cache * stasis_cache_create_full(snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_publish_fn aggregate_publish_fn)
Create a cache.
Definition: stasis_cache.c:334

◆ stasis_cache_create_full()

struct stasis_cache* stasis_cache_create_full ( snapshot_get_id  id_fn,
cache_aggregate_calc_fn  aggregate_calc_fn,
cache_aggregate_publish_fn  aggregate_publish_fn 
)

Create a cache.

This is the backend store for a stasis_caching_topic. The cache is thread safe, allowing concurrent reads and writes.

The returned object is AO2 managed, so ao2_cleanup() when you're done.

Parameters
id_fn: Callback to extract the id from a snapshot message.
aggregate_calc_fn: Callback to calculate the aggregate cache entry.
aggregate_publish_fn: Callback to publish the aggregate cache entry.
Note
An aggregate message is a combined representation of the local and remote entities publishing the message data. e.g., An aggregate device state represents the combined device state from the local and any remote entities publishing state for a device. e.g., An aggregate MWI message is the old/new MWI counts accumulated from the local and any remote entities publishing to a mailbox.
Return values
New cache indexed by id_fn.

Definition at line 334 of file stasis_cache.c.

References stasis_cache::aggregate_calc_fn, stasis_cache::aggregate_publish_fn, AO2_ALLOC_OPT_LOCK_NOLOCK, AO2_ALLOC_OPT_LOCK_RWLOCK, ao2_alloc_options, ao2_cleanup, ao2_container_alloc_hash, cache, cache_dtor(), cache_entry_cmp(), cache_entry_hash(), stasis_cache::entries, stasis_cache::id_fn, NULL, and NUM_CACHE_BUCKETS.

Referenced by AST_TEST_DEFINE(), devstate_init(), and stasis_cache_create().

337 {
338  struct stasis_cache *cache;
339 
340  cache = ao2_alloc_options(sizeof(*cache), cache_dtor,
342  if (!cache) {
343  return NULL;
344  }
345 
348  if (!cache->entries) {
349  ao2_cleanup(cache);
350  return NULL;
351  }
352 
353  cache->id_fn = id_fn;
356 
357  return cache;
358 }
cache_aggregate_calc_fn aggregate_calc_fn
Definition: stasis_cache.c:49
static int cache_entry_cmp(void *obj, void *arg, int flags)
Definition: stasis_cache.c:288
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:406
#define NULL
Definition: resample.c:96
struct ao2_container * entries
Definition: stasis_cache.c:47
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Definition: astobj2.h:1310
struct ao2_container * cache
Definition: pbx_realtime.c:77
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
snapshot_get_id id_fn
Definition: stasis_cache.c:48
#define NUM_CACHE_BUCKETS
Definition: stasis_cache.c:42
cache_aggregate_publish_fn aggregate_publish_fn
Definition: stasis_cache.c:50
static int cache_entry_hash(const void *obj, int flags)
Definition: stasis_cache.c:266
static void cache_dtor(void *obj)
Definition: stasis_cache.c:326

◆ stasis_cache_dump()

struct ao2_container* stasis_cache_dump ( struct stasis_cache *  cache,
struct stasis_message_type *  type 
)

Dump cached items to a subscription for the ast_eid_default entity.

Parameters
cache: The cache to query.
type: Type of message to dump (any type if NULL).
Return values
ao2_container containing all matches (must be unreffed by caller)

Definition at line 736 of file stasis_cache.c.

References ast_eid_default, and stasis_cache_dump_by_eid().

Referenced by action_presencestatelist(), ast_ari_endpoints_list(), ast_ari_endpoints_list_by_tech(), AST_TEST_DEFINE(), asterisk_publication_devicestate_refresh(), asterisk_publication_mwi_refresh(), asterisk_start_devicestate_publishing(), asterisk_start_mwi_publishing(), endpoints_scrape_cb(), load_module(), unload_module(), and xmpp_init_event_distribution().

737 {
738  return stasis_cache_dump_by_eid(cache, type, &ast_eid_default);
739 }
struct ao2_container * stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
Dump cached items to a subscription for a specific entity.
Definition: stasis_cache.c:718
struct ast_eid ast_eid_default
Global EID.
Definition: options.c:93

◆ stasis_cache_dump_all()

struct ao2_container* stasis_cache_dump_all ( struct stasis_cache cache,
struct stasis_message_type type 
)

Dump all entity items from the cache to a subscription.

Since
12.2.0
Parameters
cache: The cache to query.
type: Type of message to dump (any type if NULL).
Return values
ao2_container containing all matches (must be unreffed by caller)

Definition at line 757 of file stasis_cache.c.

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_callback, ao2_container_alloc_list, ast_assert, cache_dump_all_cb(), cache_dump_data::container, cache_dump_data::eid, stasis_cache::entries, NULL, OBJ_MULTIPLE, OBJ_NODATA, stasis_cache_clear_type(), stasis_cache_update_type(), STASIS_MESSAGE_TYPE_DEFN(), type, and cache_dump_data::type.

Referenced by AST_TEST_DEFINE(), cache_cleanup(), and cleanup_module().

758 {
759  struct cache_dump_data cache_dump;
760 
761  ast_assert(cache != NULL);
762  ast_assert(cache->entries != NULL);
763 
764  cache_dump.eid = NULL;
765  cache_dump.type = type;
766  cache_dump.container = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, NULL, NULL);
767  if (!cache_dump.container) {
768  return NULL;
769  }
770 
772  return cache_dump.container;
773 }
static const char type[]
Definition: chan_ooh323.c:109
static int cache_dump_all_cb(void *obj, void *arg, int flags)
Definition: stasis_cache.c:741
#define ao2_callback(c, flags, cb_fn, arg)
Definition: astobj2.h:1716
#define ao2_container_alloc_list(ao2_options, container_options, sort_fn, cmp_fn)
Definition: astobj2.h:1335
#define ast_assert(a)
Definition: utils.h:650
#define NULL
Definition: resample.c:96
struct ao2_container * entries
Definition: stasis_cache.c:47

◆ stasis_cache_dump_by_eid()

struct ao2_container* stasis_cache_dump_by_eid ( struct stasis_cache cache,
struct stasis_message_type type,
const struct ast_eid eid 
)

Dump cached items to a subscription for a specific entity.

Since
12.2.0
Parameters
cache: The cache to query.
type: Type of message to dump (any type if NULL).
eid: Specific entity id to retrieve. NULL for aggregate.
Return values
ao2_container containing all matches (must be unreffed by caller)

Definition at line 718 of file stasis_cache.c.

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_callback, ao2_container_alloc_list, ast_assert, cache_dump_by_eid_cb(), cache_dump_data::container, cache_dump_data::eid, stasis_cache::entries, NULL, OBJ_MULTIPLE, OBJ_NODATA, type, and cache_dump_data::type.

Referenced by action_devicestatelist(), AST_TEST_DEFINE(), cpg_confchg_cb(), and stasis_cache_dump().

719 {
720  struct cache_dump_data cache_dump;
721 
722  ast_assert(cache != NULL);
723  ast_assert(cache->entries != NULL);
724 
725  cache_dump.eid = eid;
726  cache_dump.type = type;
727  cache_dump.container = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, NULL, NULL);
728  if (!cache_dump.container) {
729  return NULL;
730  }
731 
733  return cache_dump.container;
734 }
static const char type[]
Definition: chan_ooh323.c:109
#define ao2_callback(c, flags, cb_fn, arg)
Definition: astobj2.h:1716
#define ao2_container_alloc_list(ao2_options, container_options, sort_fn, cmp_fn)
Definition: astobj2.h:1335
#define ast_assert(a)
Definition: utils.h:650
#define NULL
Definition: resample.c:96
struct ao2_container * entries
Definition: stasis_cache.c:47
const struct ast_eid * eid
Definition: stasis_cache.c:694
static int cache_dump_by_eid_cb(void *obj, void *arg, int flags)
Definition: stasis_cache.c:697

◆ stasis_cache_entry_get_aggregate()

struct stasis_message* stasis_cache_entry_get_aggregate ( struct stasis_cache_entry entry)

Get the aggregate cache entry snapshot.

Since
12.2.0
Parameters
entry: Cache entry to get the aggregate snapshot.
Note
A reference is not given to the returned pointer so don't unref it.
An aggregate message is a combined representation of the local and remote entities publishing the message data. For example, an aggregate device state represents the combined device state from the local and any remote entities publishing state for a device. Likewise, an aggregate MWI message is the old/new MWI counts accumulated from the local and any remote entities publishing to a mailbox.
Return values
Aggregate-snapshot in cache.
NULL if not present.

Definition at line 365 of file stasis_cache.c.

References stasis_cache_entry::aggregate.

Referenced by cache_test_aggregate_calc_fn(), and device_state_aggregate_calc().

366 {
367  return entry->aggregate;
368 }
struct stasis_message * aggregate
Definition: stasis_cache.c:176

◆ stasis_cache_entry_get_local()

struct stasis_message* stasis_cache_entry_get_local ( struct stasis_cache_entry entry)

Get the local entity's cache entry snapshot.

Since
12.2.0
Parameters
entry: Cache entry to get the local entity's snapshot.
Note
A reference is not given to the returned pointer so don't unref it.
Return values
Internal-snapshot in cache.
NULL if not present.

Definition at line 370 of file stasis_cache.c.

References stasis_cache_entry::local.

Referenced by cache_test_aggregate_calc_fn(), and device_state_aggregate_calc().

371 {
372  return entry->local;
373 }
struct stasis_message * local
Definition: stasis_cache.c:178

◆ stasis_cache_entry_get_remote()

struct stasis_message* stasis_cache_entry_get_remote ( struct stasis_cache_entry entry,
int  idx 
)

Get a remote entity's cache entry snapshot by index.

Since
12.2.0
Parameters
entry: Cache entry to get a remote entity's snapshot.
idx: Which remote entity's snapshot to get.
Note
A reference is not given to the returned pointer so don't unref it.
Return values
Remote-entity-snapshot in cache.
NULL if not present.

Definition at line 375 of file stasis_cache.c.

References AST_VECTOR_GET, AST_VECTOR_SIZE, and NULL.

Referenced by cache_test_aggregate_calc_fn(), and device_state_aggregate_calc().

376 {
377  if (idx < AST_VECTOR_SIZE(&entry->remote)) {
378  return AST_VECTOR_GET(&entry->remote, idx);
379  }
380  return NULL;
381 }
#define NULL
Definition: resample.c:96
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:682
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611

◆ stasis_cache_get()

struct stasis_message* stasis_cache_get ( struct stasis_cache cache,
struct stasis_message_type type,
const char *  id 
)

Retrieve an item from the cache for the ast_eid_default entity.

The returned item is AO2 managed, so ao2_cleanup() when you're done with it.

Parameters
cache: The cache to query.
type: Type of message to retrieve.
id: Identity of the snapshot to retrieve.
Return values
Message from the cache.

Definition at line 686 of file stasis_cache.c.

References ast_eid_default, and stasis_cache_get_by_eid().

Referenced by ast_endpoint_latest_snapshot(), AST_TEST_DEFINE(), get_cached_mwi(), has_voicemail(), presence_state_cached(), unistim_send_mwi_to_peer(), and update_registry().

687 {
688  return stasis_cache_get_by_eid(cache, type, id, &ast_eid_default);
689 }
struct stasis_message * stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid)
Retrieve an item from the cache for a specific entity.
Definition: stasis_cache.c:659
struct ast_eid ast_eid_default
Global EID.
Definition: options.c:93

◆ stasis_cache_get_all()

struct ao2_container* stasis_cache_get_all ( struct stasis_cache cache,
struct stasis_message_type type,
const char *  id 
)

Retrieve all matching entity items from the cache.

Since
12.2.0
Parameters
cache: The cache to query.
type: Type of message to retrieve.
id: Identity of the snapshot to retrieve.
Return values
Container of matching items found.

Definition at line 587 of file stasis_cache.c.

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_cleanup, ao2_container_alloc_list, ao2_rdlock, ao2_unlock, ast_assert, cache_entry_dump(), cache_find(), stasis_cache::entries, and NULL.

Referenced by AST_TEST_DEFINE().

588 {
589  struct stasis_cache_entry *cached_entry;
590  struct ao2_container *found;
591 
592  ast_assert(cache != NULL);
593  ast_assert(cache->entries != NULL);
594  ast_assert(id != NULL);
595 
596  if (!type) {
597  return NULL;
598  }
599 
601  if (!found) {
602  return NULL;
603  }
604 
605  ao2_rdlock(cache->entries);
606 
607  cached_entry = cache_find(cache->entries, type, id);
608  if (cached_entry && cache_entry_dump(found, cached_entry)) {
609  ao2_cleanup(found);
610  found = NULL;
611  }
612 
613  ao2_unlock(cache->entries);
614 
615  ao2_cleanup(cached_entry);
616  return found;
617 }
#define ao2_container_alloc_list(ao2_options, container_options, sort_fn, cmp_fn)
Definition: astobj2.h:1335
#define ast_assert(a)
Definition: utils.h:650
#define ao2_unlock(a)
Definition: astobj2.h:730
#define NULL
Definition: resample.c:96
struct ao2_container * entries
Definition: stasis_cache.c:47
static struct stasis_cache_entry * cache_find(struct ao2_container *entries, struct stasis_message_type *type, const char *id)
Definition: stasis_cache.c:396
#define ao2_rdlock(a)
Definition: astobj2.h:719
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
Generic container type.
static int cache_entry_dump(struct ao2_container *snapshots, const struct stasis_cache_entry *entry)
Definition: stasis_cache.c:563
Definition: stasis_cache.c:173

◆ stasis_cache_get_by_eid()

struct stasis_message* stasis_cache_get_by_eid ( struct stasis_cache cache,
struct stasis_message_type type,
const char *  id,
const struct ast_eid eid 
)

Retrieve an item from the cache for a specific entity.

The returned item is AO2 managed, so ao2_cleanup() when you're done with it.

Parameters
cache: The cache to query.
type: Type of message to retrieve.
id: Identity of the snapshot to retrieve.
eid: Specific entity id to retrieve. NULL for aggregate.
Note
An aggregate message is a combined representation of the local and remote entities publishing the message data. For example, an aggregate device state represents the combined device state from the local and any remote entities publishing state for a device. Likewise, an aggregate MWI message is the old/new MWI counts accumulated from the local and any remote entities publishing to a mailbox.
Return values
Message from the cache.

Definition at line 659 of file stasis_cache.c.

References ao2_bump, ao2_cleanup, ao2_rdlock, ao2_unlock, ast_assert, cache_entry_by_eid(), cache_find(), stasis_cache::entries, and NULL.

Referenced by ast_delete_mwi_state_full(), ast_device_state_clear_cache(), AST_TEST_DEFINE(), check_cache_aggregate(), devstate_cached(), and stasis_cache_get().

660 {
661  struct stasis_cache_entry *cached_entry;
662  struct stasis_message *snapshot = NULL;
663 
664  ast_assert(cache != NULL);
665  ast_assert(cache->entries != NULL);
666  ast_assert(id != NULL);
667 
668  if (!type) {
669  return NULL;
670  }
671 
672  ao2_rdlock(cache->entries);
673 
674  cached_entry = cache_find(cache->entries, type, id);
675  if (cached_entry) {
676  snapshot = cache_entry_by_eid(cached_entry, eid);
677  ao2_bump(snapshot);
678  }
679 
680  ao2_unlock(cache->entries);
681 
682  ao2_cleanup(cached_entry);
683  return snapshot;
684 }
#define ast_assert(a)
Definition: utils.h:650
#define ao2_unlock(a)
Definition: astobj2.h:730
#define NULL
Definition: resample.c:96
struct ao2_container * entries
Definition: stasis_cache.c:47
#define ao2_bump(obj)
Definition: astobj2.h:491
static struct stasis_cache_entry * cache_find(struct ao2_container *entries, struct stasis_message_type *type, const char *id)
Definition: stasis_cache.c:396
#define ao2_rdlock(a)
Definition: astobj2.h:719
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
static struct stasis_message * cache_entry_by_eid(const struct stasis_cache_entry *entry, const struct ast_eid *eid)
Definition: stasis_cache.c:631
Definition: stasis_cache.c:173

◆ stasis_cache_init()

int stasis_cache_init ( void  )

Definition at line 1008 of file stasis_cache.c.

References ast_register_cleanup(), stasis_cache_cleanup(), stasis_cache_clear_type(), stasis_cache_update_type(), and STASIS_MESSAGE_TYPE_INIT.

Referenced by stasis_init().

1009 {
1011 
1013  return -1;
1014  }
1015 
1017  return -1;
1018  }
1019 
1020  return 0;
1021 }
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
Definition: stasis.h:1501
struct stasis_message_type * stasis_cache_clear_type(void)
Message type for clearing a message from a stasis cache.
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
Definition: clicompat.c:19
struct stasis_message_type * stasis_cache_update_type(void)
Message type for cache update messages.
static void stasis_cache_cleanup(void)

◆ stasis_caching_accept_message_type()

int stasis_caching_accept_message_type ( struct stasis_caching_topic caching_topic,
struct stasis_message_type type 
)

Indicate to a caching topic that we are interested in a message type.

This will cause the caching topic to receive messages of the given message type. This enables internal filtering in the stasis message bus to reduce messages.

Parameters
caching_topic: The caching topic.
type: The message type we wish to receive.
Return values
0 on success
-1 on failure
Since
17.0.0

Definition at line 90 of file stasis_cache.c.

References stasis_cache_clear_type(), stasis_subscription_accept_message_type(), stasis_subscription_change_type(), and stasis_caching_topic::sub.

Referenced by ast_presence_state_engine_init(), devstate_init(), and stasis_cp_single_accept_message_type().

92 {
93  int res;
94 
95  if (!caching_topic) {
96  return -1;
97  }
98 
99  /* We wait to accept the stasis specific message types until now so that by default everything
100  * will flow to us.
101  */
104  res |= stasis_subscription_accept_message_type(caching_topic->sub, type);
105 
106  return res;
107 }
struct stasis_subscription * sub
Definition: stasis_cache.c:59
struct stasis_message_type * stasis_cache_clear_type(void)
Message type for clearing a message from a stasis cache.
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
Definition: stasis.c:1024
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.

◆ stasis_caching_get_topic()

struct stasis_topic* stasis_caching_get_topic ( struct stasis_caching_topic caching_topic)

Returns the topic of cached events from a caching topic.

Parameters
caching_topic: The caching topic.
Returns
The topic that publishes cache update events, along with passthrough events from the underlying topic.
NULL if caching_topic is NULL.
Since
12

Definition at line 85 of file stasis_cache.c.

References stasis_caching_topic::topic.

Referenced by ast_device_state_topic_cached(), ast_mwi_topic_cached(), ast_presence_state_topic_cached(), AST_TEST_DEFINE(), stasis_cp_single_create(), and stasis_cp_single_topic_cached().

86 {
87  return caching_topic->topic;
88 }
struct stasis_topic * topic
Definition: stasis_cache.c:57

◆ stasis_caching_set_filter()

int stasis_caching_set_filter ( struct stasis_caching_topic caching_topic,
enum stasis_subscription_message_filter  filter 
)

Set the message type filtering level on a cache.

This will cause the underlying subscription to filter messages according to the provided filter level. For example if selective is used then only messages matching those provided to stasis_subscription_accept_message_type will be raised to the subscription callback.

Parameters
caching_topic: The caching topic.
filter: What filter to use
Return values
0 on success
-1 on failure
Since
17.0.0

Definition at line 109 of file stasis_cache.c.

References stasis_subscription_set_filter(), and stasis_caching_topic::sub.

Referenced by ast_presence_state_engine_init(), devstate_init(), and stasis_cp_single_set_filter().

111 {
112  if (!caching_topic) {
113  return -1;
114  }
115  return stasis_subscription_set_filter(caching_topic->sub, filter);
116 }
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
Definition: stasis.c:1078
struct stasis_subscription * sub
Definition: stasis_cache.c:59
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
Definition: func_strings.c:709

◆ stasis_caching_topic_create()

struct stasis_caching_topic* stasis_caching_topic_create ( struct stasis_topic original_topic,
struct stasis_cache cache 
)

Create a topic which monitors and caches messages from another topic.

The idea is that some topics publish 'snapshots' of some other object's state that should be cached. When these snapshot messages are received, the cache is updated, and a stasis_cache_update() message is forwarded, which has both the original snapshot message and the new message.

The returned object is AO2 managed, so ao2_cleanup() when done with it.

Parameters
original_topic: Topic publishing snapshot messages.
cache: Backend cache in which to keep snapshots.
Returns
New topic which changes snapshot messages to stasis_cache_update() messages, and forwards all other messages from the original topic.
NULL on error
Since
12

Definition at line 948 of file stasis_cache.c.

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_alloc_options, ao2_container_register(), ao2_ref, ast_asprintf, ast_atomic_fetchadd_int(), ast_free, ast_log, stasis_caching_topic::cache, cache, caching_topic_exec(), stasis_cache::entries, internal_stasis_subscribe(), LOG_ERROR, NULL, stasis_caching_topic::original_topic, print_cache_entry(), stasis_cache::registered, stasis_caching_topic_dtor(), stasis_topic_create(), stasis_topic_name(), stasis_caching_topic::sub, and stasis_caching_topic::topic.

Referenced by ast_presence_state_engine_init(), AST_TEST_DEFINE(), devstate_init(), mwi_init(), and stasis_cp_sink_create().

949 {
950  struct stasis_caching_topic *caching_topic;
951  static int caching_id;
952  char *new_name;
953  int ret;
954 
955  ret = ast_asprintf(&new_name, "cache:%d/%s", ast_atomic_fetchadd_int(&caching_id, +1), stasis_topic_name(original_topic));
956  if (ret < 0) {
957  return NULL;
958  }
959 
960  caching_topic = ao2_alloc_options(sizeof(*caching_topic),
962  if (caching_topic == NULL) {
963  ast_free(new_name);
964 
965  return NULL;
966  }
967 
968  caching_topic->topic = stasis_topic_create(new_name);
969  if (caching_topic->topic == NULL) {
970  ao2_ref(caching_topic, -1);
971  ast_free(new_name);
972 
973  return NULL;
974  }
975 
976  ao2_ref(cache, +1);
977  caching_topic->cache = cache;
978  if (!cache->registered) {
979  if (ao2_container_register(new_name, cache->entries, print_cache_entry)) {
980  ast_log(LOG_ERROR, "Stasis cache container '%p' for '%s' did not register\n",
981  cache->entries, new_name);
982  } else {
983  cache->registered = 1;
984  }
985  }
986  ast_free(new_name);
987 
988  caching_topic->sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0, 0, __FILE__, __LINE__, __PRETTY_FUNCTION__);
989  if (caching_topic->sub == NULL) {
990  ao2_ref(caching_topic, -1);
991 
992  return NULL;
993  }
994 
995  ao2_ref(original_topic, +1);
996  caching_topic->original_topic = original_topic;
997 
998  /* The subscription holds the reference, so no additional ref bump. */
999  return caching_topic;
1000 }
struct stasis_cache * cache
Definition: stasis_cache.c:56
struct stasis_topic * original_topic
Definition: stasis_cache.c:58
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:406
#define NULL
Definition: resample.c:96
static void caching_topic_exec(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Definition: stasis_cache.c:833
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition: lock.h:755
struct stasis_subscription * sub
Definition: stasis_cache.c:59
struct stasis_subscription * internal_stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func)
Create a subscription.
Definition: stasis.c:857
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:269
struct ao2_container * entries
Definition: stasis_cache.c:47
int ao2_container_register(const char *name, struct ao2_container *self, ao2_prnt_obj_fn *prnt_obj)
Register a container for CLI stats and integrity check.
#define ast_log
Definition: astobj2.c:42
#define ao2_ref(o, delta)
Definition: astobj2.h:464
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition: stasis.c:617
#define LOG_ERROR
Definition: logger.h:285
struct stasis_topic * topic
Definition: stasis_cache.c:57
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:627
#define ast_free(a)
Definition: astmm.h:182
struct ao2_container * cache
Definition: pbx_realtime.c:77
static void stasis_caching_topic_dtor(void *obj)
Definition: stasis_cache.c:62
static void print_cache_entry(void *v_obj, void *where, ao2_prnt_fn *prnt)
Definition: stasis_cache.c:937

◆ stasis_caching_unsubscribe()

struct stasis_caching_topic* stasis_caching_unsubscribe ( struct stasis_caching_topic caching_topic)

Unsubscribes a caching topic from its upstream topic.

This function returns immediately, so be sure to cleanup when stasis_subscription_final_message() is received.

Parameters
caching_topic: Caching topic to unsubscribe
Returns
NULL for convenience
Since
12

Definition at line 119 of file stasis_cache.c.

References ao2_cleanup, ao2_ref, ast_log, LOG_ERROR, NULL, stasis_subscription_is_subscribed(), stasis_unsubscribe(), and stasis_caching_topic::sub.

Referenced by AST_TEST_DEFINE(), stasis_caching_unsubscribe_and_join(), and stasis_cp_single_unsubscribe().

120 {
121  if (!caching_topic) {
122  return NULL;
123  }
124 
125  /*
126  * The subscription may hold the last reference to this caching
127  * topic, but we want to make sure the unsubscribe finishes
128  * before kicking of the caching topic's dtor.
129  */
130  ao2_ref(caching_topic, +1);
131 
132  if (stasis_subscription_is_subscribed(caching_topic->sub)) {
133  /*
134  * Increment the reference to hold on to it past the
135  * unsubscribe. Will be cleaned up in dtor.
136  */
137  ao2_ref(caching_topic->sub, +1);
138  stasis_unsubscribe(caching_topic->sub);
139  } else {
140  ast_log(LOG_ERROR, "stasis_caching_topic unsubscribed multiple times\n");
141  }
142  ao2_cleanup(caching_topic);
143  return NULL;
144 }
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
Returns whether a subscription is currently subscribed.
Definition: stasis.c:1151
#define NULL
Definition: resample.c:96
struct stasis_subscription * sub
Definition: stasis_cache.c:59
#define ast_log
Definition: astobj2.c:42
#define ao2_ref(o, delta)
Definition: astobj2.h:464
#define LOG_ERROR
Definition: logger.h:285
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *subscription)
Cancel a subscription.
Definition: stasis.c:972
#define ao2_cleanup(obj)
Definition: astobj2.h:1958

◆ stasis_caching_unsubscribe_and_join()

struct stasis_caching_topic* stasis_caching_unsubscribe_and_join ( struct stasis_caching_topic caching_topic)

Unsubscribes a caching topic from its upstream topic, blocking until all messages have been forwarded.

See stasis_unsubscribe_and_join() for more info on when to use this as opposed to stasis_caching_unsubscribe().

Parameters
caching_topic: Caching topic to unsubscribe
Returns
NULL for convenience
Since
12

Definition at line 146 of file stasis_cache.c.

References ao2_cleanup, ao2_ref, NULL, stasis_caching_unsubscribe(), stasis_subscription_join(), and stasis_caching_topic::sub.

Referenced by AST_TEST_DEFINE(), devstate_cleanup(), mwi_cleanup(), and presence_state_engine_cleanup().

147 {
148  if (!caching_topic) {
149  return NULL;
150  }
151 
152  /* Hold a ref past the unsubscribe */
153  ao2_ref(caching_topic, +1);
154  stasis_caching_unsubscribe(caching_topic);
155  stasis_subscription_join(caching_topic->sub);
156  ao2_cleanup(caching_topic);
157  return NULL;
158 }
#define NULL
Definition: resample.c:96
struct stasis_subscription * sub
Definition: stasis_cache.c:59
#define ao2_ref(o, delta)
Definition: astobj2.h:464
void stasis_subscription_join(struct stasis_subscription *subscription)
Block until the last message is processed on a subscription.
Definition: stasis.c:1106
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
struct stasis_caching_topic * stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
Unsubscribes a caching topic from its upstream topic.
Definition: stasis_cache.c:119

◆ stasis_config_init()

int stasis_config_init ( void  )

◆ stasis_forward_all()

struct stasis_forward* stasis_forward_all ( struct stasis_topic from_topic,
struct stasis_topic to_topic 
)

Create a subscription which forwards all messages from one topic to another.

Note that the topic parameter of the invoked callback will be the topic the message was sent to, not the topic the subscriber subscribed to.

Parameters
from_topic: Topic to forward.
to_topic: Destination topic of forwarded messages.
Returns
New forwarding subscription.
NULL on error.
Since
12

Definition at line 1577 of file stasis.c.

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_alloc_options, ao2_bump, ao2_ref, ao2_unlock, AST_VECTOR_APPEND, AST_VECTOR_GET, AST_VECTOR_SIZE, forward_dtor(), stasis_forward::from_topic, NULL, stasis_forward::to_topic, topic_add_subscription(), and topic_lock_both.

Referenced by __init_manager(), ari_bridges_play_new(), ast_ari_bridges_record(), ast_channel_forward_endpoint(), ast_channel_internal_setup_topics(), AST_TEST_DEFINE(), create_subscriptions(), endpoint_internal_create(), forwards_create_bridge(), forwards_create_channel(), forwards_create_endpoint(), load_general_config(), load_module(), manager_bridging_init(), manager_channels_init(), manager_mwi_init(), manager_subscriptions_init(), manager_system_init(), stasis_cp_all_create(), stasis_cp_single_create(), stasis_topic_pool_get_topic(), and state_alloc().

1579 {
1580  int res;
1581  size_t idx;
1582  struct stasis_forward *forward;
1583 
1584  if (!from_topic || !to_topic) {
1585  return NULL;
1586  }
1587 
1588  forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1589  if (!forward) {
1590  return NULL;
1591  }
1592 
1593  /* Forwards to ourselves are implicit. */
1594  if (to_topic == from_topic) {
1595  return forward;
1596  }
1597 
1598  forward->from_topic = ao2_bump(from_topic);
1599  forward->to_topic = ao2_bump(to_topic);
1600 
1601  topic_lock_both(to_topic, from_topic);
1602  res = AST_VECTOR_APPEND(&to_topic->upstream_topics, from_topic);
1603  if (res != 0) {
1604  ao2_unlock(from_topic);
1605  ao2_unlock(to_topic);
1606  ao2_ref(forward, -1);
1607  return NULL;
1608  }
1609 
1610  for (idx = 0; idx < AST_VECTOR_SIZE(&to_topic->subscribers); ++idx) {
1611  topic_add_subscription(from_topic, AST_VECTOR_GET(&to_topic->subscribers, idx));
1612  }
1613  ao2_unlock(from_topic);
1614  ao2_unlock(to_topic);
1615 
1616  return forward;
1617 }
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256
static void forward_dtor(void *obj)
Definition: stasis.c:1537
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:406
#define ao2_unlock(a)
Definition: astobj2.h:730
#define NULL
Definition: resample.c:96
#define topic_lock_both(topic1, topic2)
Lock two topics.
Definition: stasis.c:425
struct stasis_topic * from_topic
Definition: stasis.c:1532
#define ao2_bump(obj)
Definition: astobj2.h:491
#define ao2_ref(o, delta)
Definition: astobj2.h:464
struct stasis_topic * to_topic
Definition: stasis.c:1534
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Add a subscriber to a topic.
Definition: stasis.c:1202
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:682
Forwarding information.
Definition: stasis.c:1530
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611

◆ stasis_forward_cancel()

struct stasis_forward* stasis_forward_cancel ( struct stasis_forward forward)

Definition at line 1547 of file stasis.c.

References ao2_cleanup, ao2_unlock, AST_VECTOR_ELEM_CLEANUP_NOOP, AST_VECTOR_GET, AST_VECTOR_REMOVE_ELEM_UNORDERED, AST_VECTOR_SIZE, stasis_forward::from_topic, NULL, stasis_forward::to_topic, topic_lock_both, and topic_remove_subscription().

Referenced by all_dtor(), ari_bridges_play_new(), ast_ari_bridges_record(), ast_channel_internal_cleanup(), ast_endpoint_shutdown(), AST_TEST_DEFINE(), bridge_channel_control_thread(), cleanup_module(), destroy_subscriptions(), forwards_unsubscribe(), load_general_config(), load_module(), manager_bridging_cleanup(), manager_channels_shutdown(), manager_mwi_shutdown(), manager_shutdown(), manager_system_shutdown(), stasis_cp_single_unsubscribe(), state_dtor(), topic_pool_entry_dtor(), and unload_module().

1548 {
1549  int idx;
1550  struct stasis_topic *from;
1551  struct stasis_topic *to;
1552 
1553  if (!forward) {
1554  return NULL;
1555  }
1556 
1557  from = forward->from_topic;
1558  to = forward->to_topic;
1559 
1560  if (from && to) {
1561  topic_lock_both(to, from);
1562  AST_VECTOR_REMOVE_ELEM_UNORDERED(&to->upstream_topics, from,
1564 
1565  for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
1566  topic_remove_subscription(from, AST_VECTOR_GET(&to->subscribers, idx));
1567  }
1568  ao2_unlock(from);
1569  ao2_unlock(to);
1570  }
1571 
1572  ao2_cleanup(forward);
1573 
1574  return NULL;
1575 }
static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition: stasis.c:1230
#define AST_VECTOR_REMOVE_ELEM_UNORDERED(vec, elem, cleanup)
Remove an element from a vector.
Definition: vector.h:585
#define ao2_unlock(a)
Definition: astobj2.h:730
#define NULL
Definition: resample.c:96
#define topic_lock_both(topic1, topic2)
Lock two topics.
Definition: stasis.c:425
struct stasis_topic * from_topic
Definition: stasis.c:1532
#define AST_VECTOR_ELEM_CLEANUP_NOOP(elem)
Vector element cleanup that does nothing.
Definition: vector.h:573
struct stasis_topic * to_topic
Definition: stasis.c:1534
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:682
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611

◆ stasis_init()

int stasis_init ( void  )

Initialize the Stasis subsystem.

Returns
0 on success.
Non-zero on error.
Since
12

Definition at line 3060 of file stasis.c.

References ACO_EXACT, aco_info_init(), aco_option_register, aco_option_register_custom, aco_process_config(), ACO_PROCESS_ERROR, aco_set_defaults(), AO2_ALLOC_OPT_LOCK_MUTEX, ao2_cleanup, ao2_container_alloc_hash, ao2_global_obj_ref, ao2_global_obj_replace_unref, ao2_ref, ARRAY_LEN, ast_cli_register_multiple, ast_log, ast_multi_user_event_type(), ast_register_cleanup(), ast_threadpool_create(), AST_THREADPOOL_OPTIONS_VERSION, AST_VECTOR_INIT, ast_threadpool_options::auto_increment, declined_handler(), stasis_config::declined_message_types, FLDSET, globals, ast_threadpool_options::idle_timeout, stasis_threadpool_conf::idle_timeout_sec, ast_threadpool_options::initial_size, stasis_threadpool_conf::initial_size, LOG_ERROR, LOG_NOTICE, ast_threadpool_options::max_size, stasis_threadpool_conf::max_size, NULL, OPT_INT_T, PARSE_IN_RANGE, stasis_cache_init(), stasis_cleanup(), stasis_config_alloc(), STASIS_MESSAGE_TYPE_INIT, stasis_subscription_change_type(), stasis_config::threadpool_options, TOPIC_ALL_BUCKETS, and ast_threadpool_options::version.

Referenced by asterisk_daemon().

3061 {
3062  struct stasis_config *cfg;
3063  int cache_init;
3064  struct ast_threadpool_options threadpool_opts = { 0, };
3065 #ifdef AST_DEVMODE
3066  struct ao2_container *subscription_stats;
3067  struct ao2_container *topic_stats;
3068 #endif
3069 
3070  /* Be sure the types are cleaned up after the message bus */
3072 
3073  if (aco_info_init(&cfg_info)) {
3074  return -1;
3075  }
3076 
3077  aco_option_register_custom(&cfg_info, "decline", ACO_EXACT,
3079  aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
3081  FLDSET(struct stasis_threadpool_conf, initial_size), 0,
3082  INT_MAX);
3083  aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
3085  FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
3086  INT_MAX);
3087  aco_option_register(&cfg_info, "max_size", ACO_EXACT,
3089  FLDSET(struct stasis_threadpool_conf, max_size), 0,
3090  INT_MAX);
3091 
3092  if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
3093  struct stasis_config *default_cfg = stasis_config_alloc();
3094 
3095  if (!default_cfg) {
3096  return -1;
3097  }
3098 
3099  if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) {
3100  ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
3101  ao2_ref(default_cfg, -1);
3102 
3103  return -1;
3104  }
3105 
3106  if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) {
3107  ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
3108  ao2_ref(default_cfg, -1);
3109 
3110  return -1;
3111  }
3112 
3113  ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n");
3114  ao2_global_obj_replace_unref(globals, default_cfg);
3115  cfg = default_cfg;
3116  } else {
3117  cfg = ao2_global_obj_ref(globals);
3118  if (!cfg) {
3119  ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
3120 
3121  return -1;
3122  }
3123  }
3124 
3125  threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION;
3126  threadpool_opts.initial_size = cfg->threadpool_options->initial_size;
3127  threadpool_opts.auto_increment = 1;
3128  threadpool_opts.max_size = cfg->threadpool_options->max_size;
3129  threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
3130  threadpool = ast_threadpool_create("stasis", NULL, &threadpool_opts);
3131  ao2_ref(cfg, -1);
3132  if (!threadpool) {
3133  ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
3134 
3135  return -1;
3136  }
3137 
3138  cache_init = stasis_cache_init();
3139  if (cache_init != 0) {
3140  return -1;
3141  }
3142 
3144  return -1;
3145  }
3147  return -1;
3148  }
3149 
3151  topic_proxy_hash_fn, 0, topic_proxy_cmp_fn);
3152  if (!topic_all) {
3153  return -1;
3154  }
3155 
3157  return -1;
3158  }
3159 
3160 #ifdef AST_DEVMODE
3161  /* Statistics information is stored separately so that we don't alter or interrupt the lifetime of the underlying
3162  * topic or subscription.
3163  */
3164  subscription_stats = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, SUBSCRIPTION_STATISTICS_BUCKETS,
3165  subscription_statistics_hash, 0, subscription_statistics_cmp);
3166  if (!subscription_stats) {
3167  return -1;
3168  }
3169  ao2_global_obj_replace_unref(subscription_statistics, subscription_stats);
3170  ao2_cleanup(subscription_stats);
3171 
3172  topic_stats = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, TOPIC_STATISTICS_BUCKETS,
3173  topic_statistics_hash, 0, topic_statistics_cmp);
3174  if (!topic_stats) {
3175  return -1;
3176  }
3177  ao2_global_obj_replace_unref(topic_statistics, topic_stats);
3178  ao2_cleanup(topic_stats);
3179  if (!topic_stats) {
3180  return -1;
3181  }
3182 
3183  AST_VECTOR_INIT(&message_type_statistics, 0);
3184 
3185  if (ast_cli_register_multiple(cli_stasis_statistics, ARRAY_LEN(cli_stasis_statistics))) {
3186  return -1;
3187  }
3188 #endif
3189 
3190  return 0;
3191 }
static struct aco_type threadpool_option
Definition: stasis.c:2201
int auto_increment
Number of threads to increment pool by.
Definition: threadpool.h:90
#define ARRAY_LEN(a)
Definition: isdn_lib.c:42
int idle_timeout
Time limit in seconds for idle threads.
Definition: threadpool.h:79
#define aco_option_register_custom(info, name, matchtype, types, default_val, handler, flags)
Register a config option.
int initial_size
Number of threads the pool will start with.
Definition: threadpool.h:100
static void stasis_cleanup(void)
Cleanup function for graceful shutdowns.
Definition: stasis.c:3041
int max_size
Maximum number of threads a pool may have.
Definition: threadpool.h:110
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
Definition: stasis.h:1501
#define AST_THREADPOOL_OPTIONS_VERSION
Definition: threadpool.h:71
#define aco_option_register(info, name, matchtype, types, default_val, opt_type, flags,...)
Register a config option.
Threadpool configuration options.
Definition: stasis.c:2185
struct aco_type * declined_options[]
Definition: stasis.c:2220
#define ast_cli_register_multiple(e, len)
Register multiple commands.
Definition: cli.h:265
enum aco_process_status aco_process_config(struct aco_info *info, int reload)
Process a config info via the options registered with an aco_info.
#define ao2_global_obj_ref(holder)
Definition: astobj2.h:925
#define NULL
Definition: resample.c:96
#define ast_log
Definition: astobj2.c:42
struct stasis_declined_config * declined_message_types
Definition: stasis.c:2198
#define FLDSET(type,...)
Convert a struct and list of fields to an argument list of field offsets.
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113
int aco_info_init(struct aco_info *info)
Initialize an aco_info structure.
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
Definition: clicompat.c:19
#define ao2_ref(o, delta)
Definition: astobj2.h:464
static struct console_pvt globals
There was an error and no changes were applied.
static struct ast_cli_entry cli_stasis[]
Definition: stasis.c:2475
#define LOG_ERROR
Definition: logger.h:285
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Definition: astobj2.h:1310
static void * stasis_config_alloc(void)
Definition: stasis.c:2252
struct ao2_container * topic_all
Definition: stasis.c:395
#define LOG_NOTICE
Definition: logger.h:263
struct ast_threadpool * ast_threadpool_create(const char *name, struct ast_threadpool_listener *listener, const struct ast_threadpool_options *options)
Create a new threadpool.
Definition: threadpool.c:915
int aco_set_defaults(struct aco_type *type, const char *category, void *obj)
Set all default options of obj.
static struct aco_type declined_option
An aco_type structure to link the "declined_message_types" category to the stasis_declined_config typ...
Definition: stasis.c:2212
int stasis_cache_init(void)
struct stasis_message_type * ast_multi_user_event_type(void)
Message type for custom user defined events with multi object blobs.
#define ao2_global_obj_replace_unref(holder, obj)
Definition: astobj2.h:908
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
struct stasis_threadpool_conf * threadpool_options
Definition: stasis.c:2196
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
Generic container type.
static struct aco_type * threadpool_options[]
Definition: stasis.c:2209
Type for default option handler for signed integers.
static struct ast_threadpool * threadpool
Definition: stasis.c:307
static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
Definition: stasis.c:2303
#define TOPIC_ALL_BUCKETS
Definition: stasis.c:317

◆ stasis_log_bad_type_access()

void stasis_log_bad_type_access ( const char *  name)

Definition at line 1939 of file stasis.c.

References ast_log, LOG_ERROR, and stasis_message_type_declined().

1940 {
1941 #ifdef AST_DEVMODE
1943  ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
1944  }
1945 #endif
1946 }
#define ast_log
Definition: astobj2.c:42
int stasis_message_type_declined(const char *name)
Check whether a message type is declined.
Definition: stasis.c:2282
#define LOG_ERROR
Definition: logger.h:285
static const char name[]
Definition: cdr_mysql.c:74

◆ stasis_message_can_be_ami()

int stasis_message_can_be_ami ( struct stasis_message msg)

Determine if the given message can be converted to AMI.

Parameters
msg  Message to check for AMI convertibility.
Return values
0  Cannot be converted
non-zero  Can be converted

Definition at line 251 of file stasis_message.c.

References HAS_VIRTUAL, and to_ami().

Referenced by manager_default_msg_cb().

252 {
253  return HAS_VIRTUAL(to_ami, msg);
254 }
static void to_ami(struct ast_sip_subscription *sub, struct ast_str **buf)
#define HAS_VIRTUAL(fn, msg)

◆ stasis_message_create()

struct stasis_message* stasis_message_create ( struct stasis_message_type type,
void *  data 
)

Create a new message.

This message is an ao2 object, and must be ao2_cleanup()'ed when you are done with it. Messages are also immutable and must not be modified after they are initialized; this applies especially to the data in the message.

Parameters
type  Type of the message
data  Immutable data that is the actual contents of the message
Returns
New message
NULL on error
Since
12

Definition at line 174 of file stasis_message.c.

References ast_eid_default, and stasis_message_create_full().

Referenced by aoc_publish_blob(), ast_bridge_blob_create(), ast_bridge_blob_create_from_snapshots(), ast_bridge_publish_attended_transfer(), ast_bridge_publish_blind_transfer(), ast_bridge_publish_merge(), ast_bridge_publish_state(), ast_cdr_engine_term(), ast_channel_publish_dial_internal(), ast_channel_publish_final_snapshot(), ast_channel_publish_snapshot(), ast_endpoint_blob_create(), ast_manager_publish_event(), ast_multi_object_blob_single_channel_publish(), ast_mwi_blob_create(), ast_rtp_publish_rtcp_message(), ast_system_publish_registry(), AST_TEST_DEFINE(), bridge_publish_state_from_blob(), bridge_topics_destroy(), cc_publish(), cdr_prop_write(), cdr_read(), cdr_write(), create_channel_blob_message(), create_endpoint_snapshot_message(), endpoint_publish_snapshot(), forkcdr_exec(), handle_security_event(), local_optimization_finished_cb(), local_optimization_started_cb(), presence_state_event(), publish_acl_change(), publish_app_cdr_message(), publish_chanspy_message(), publish_cluster_discovery_to_stasis_full(), publish_corosync_ping_to_stasis(), publish_format_update(), publish_hint_change(), publish_hint_remove(), publish_load_message_type(), publish_local_bridge_message(), publish_parked_call(), publish_parked_call_failure(), queue_publish_member_blob(), queue_publish_multi_channel_snapshot_blob(), send_call_pickup_stasis_message(), send_msg(), send_start_msg_snapshots(), send_subscription_subscribe(), send_subscription_unsubscribe(), stasis_app_user_event(), stasis_cache_clear_create(), stasis_test_message_create(), stun_monitor_request(), and update_create().

175 {
176  return stasis_message_create_full(type, data, &ast_eid_default);
177 }
struct stasis_message * stasis_message_create_full(struct stasis_message_type *type, void *data, const struct ast_eid *eid)
Create a new message for an entity.
struct ast_eid ast_eid_default
Global EID.
Definition: options.c:93

◆ stasis_message_create_full()

struct stasis_message* stasis_message_create_full ( struct stasis_message_type type,
void *  data,
const struct ast_eid eid 
)

Create a new message for an entity.

This message is an ao2 object, and must be ao2_cleanup()'ed when you are done with it. Messages are also immutable and must not be modified after they are initialized; this applies especially to the data in the message.

Parameters
type  Type of the message
data  Immutable data that is the actual contents of the message
eid  What entity originated this message. (NULL for aggregate)
Note
An aggregate message is a combined representation of the local and remote entities publishing the message data. e.g., An aggregate device state represents the combined device state from the local and any remote entities publishing state for a device. e.g., An aggregate MWI message is the old/new MWI counts accumulated from the local and any remote entities publishing to a mailbox.
Returns
New message
NULL on error (including when type or data is NULL)

Definition at line 140 of file stasis_message.c.

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_ref, ao2_t_alloc_options, ast_tvnow(), stasis_message::data, stasis_message::eid, stasis_message::eid_ptr, stasis_message_type::name, NULL, stasis_message_dtor(), stasis_message::timestamp, type, and stasis_message::type.

Referenced by ast_publish_device_state_full(), AST_TEST_DEFINE(), cache_test_message_create_full(), create_foo_type_message(), device_state_aggregate_calc(), mwi_state_create_message(), and stasis_message_create().

141 {
142  struct stasis_message *message;
143 
144  if (type == NULL || data == NULL) {
145  return NULL;
146  }
147 
148  message = ao2_t_alloc_options(sizeof(*message), stasis_message_dtor,
150  if (message == NULL) {
151  return NULL;
152  }
153 
154  message->timestamp = ast_tvnow();
155  /*
156  * XXX Normal ao2 ref counting rules says we should increment the message
157  * type ref here and decrement it in stasis_message_dtor(). However, the
158  * stasis message could be cached and legitimately cause the type ref count
159  * to hit the excessive ref count assertion. Since the message type
160  * practically has to be a global object anyway, we can get away with not
161  * holding a ref in the stasis message.
162  */
163  message->type = type;
164  ao2_ref(data, +1);
165  message->data = data;
166  if (eid) {
167  message->eid_ptr = &message->eid;
168  message->eid = *eid;
169  }
170 
171  return message;
172 }
static const char type[]
Definition: chan_ooh323.c:109
struct ast_eid eid
static void stasis_message_dtor(void *obj)
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:150
#define ao2_t_alloc_options(data_size, destructor_fn, options, debug_msg)
Allocate and initialize an object.
Definition: astobj2.h:404
#define NULL
Definition: resample.c:96
struct stasis_message_type * type
struct timeval timestamp
#define ao2_ref(o, delta)
Definition: astobj2.h:464
const struct ast_eid * eid_ptr

◆ stasis_message_data()

void* stasis_message_data ( const struct stasis_message msg)

Get the data contained in a message.

Parameters
msg  Message.
Returns
Immutable data pointer
NULL if msg is NULL.
Since
12

Definition at line 195 of file stasis_message.c.

References stasis_message::data, and NULL.

Referenced by agent_login_to_ami(), agent_logoff_to_ami(), agi_channel_to_ami(), aoc_to_ami(), appcdr_callback(), ast_ari_endpoints_list(), ast_ari_endpoints_list_by_tech(), ast_bridge_merge_message_to_json(), ast_bridge_publish_enter(), ast_bridge_publish_leave(), ast_channel_entered_bridge_to_json(), ast_channel_left_bridge_to_json(), ast_delete_mwi_state_full(), ast_endpoint_latest_snapshot(), ast_mwi_publish_by_mailbox(), AST_TEST_DEFINE(), asterisk_publisher_devstate_cb(), asterisk_publisher_mwistate_cb(), attended_transfer_to_ami(), attended_transfer_to_json(), blind_transfer_to_ami(), blind_transfer_to_json(), bridge_attended_transfer_handler(), bridge_blind_transfer_handler(), bridge_merge_cb(), bridge_merge_handler(), bridge_snapshot_update(), cache_test_aggregate_calc_fn(), cache_test_data_id(), cache_update(), cache_update_cb(), caching_topic_exec(), call_forwarded_handler(), call_pickup_to_ami(), cc_available_to_ami(), cc_callerrecalling_to_ami(), cc_callerstartmonitoring_to_ami(), cc_callerstopmonitoring_to_ami(), cc_failure_to_ami(), cc_monitorfailed_to_ami(), cc_offertimerstart_to_ami(), cc_recallcomplete_to_ami(), cc_requestacknowledged_to_ami(), cc_requested_to_ami(), cdr_prop_write_callback(), cdr_read_callback(), cdr_write_callback(), cel_attended_transfer_cb(), cel_blind_transfer_cb(), cel_bridge_enter_cb(), cel_bridge_leave_cb(), cel_dial_cb(), cel_generic_cb(), cel_local_cb(), cel_parking_cb(), cel_pickup_cb(), cel_snapshot_update_cb(), channel_blob_to_json(), channel_chanspy_start_cb(), channel_chanspy_stop_cb(), channel_dial_cb(), channel_dtmf_begin_cb(), channel_dtmf_end_cb(), channel_enter_cb(), channel_fax_cb(), channel_hangup_handler_cb(), channel_hangup_request_cb(), channel_hold_cb(), channel_leave_cb(), channel_mixmonitor_mute_cb(), channel_mixmonitor_start_cb(), channel_mixmonitor_stop_cb(), channel_moh_start_cb(), channel_moh_stop_cb(), channel_monitor_start_cb(), channel_monitor_stop_cb(), channel_snapshot_update(), 
channel_unhold_cb(), check_cache_aggregate(), conf_send_event_to_participants(), confbridge_atxfer_cb(), confbridge_publish_manager_event(), confbridge_talking_cb(), consumer_exec(), contactstatus_to_ami(), contactstatus_to_json(), corosync_ping_to_event(), dahdichannel_to_ami(), device_state_aggregate_calc(), device_state_cb(), device_state_get_id(), devstate_cached(), devstate_change_cb(), devstate_to_ami(), devstate_to_event(), dial_to_json(), dtmf_end_to_json(), dump_cache_load(), dump_cache_unload(), dump_consumer(), endpoint_cache_clear(), endpoint_snapshot_get_id(), endpoints_scrape_cb(), explicit_publish_cb(), fake_ami(), fake_json(), find_route(), forkcdr_callback(), generic_agent_devstate_cb(), generic_monitor_devstate_cb(), get_bool_header(), get_cached_mwi(), handle_attended_transfer(), handle_blind_transfer(), handle_bridge_enter(), handle_bridge_enter_message(), handle_bridge_leave_message(), handle_channel_snapshot_update_message(), handle_dial_message(), handle_hangup(), handle_hint_change_message_type(), handle_local_optimization_begin(), handle_local_optimization_end(), handle_masquerade(), handle_mwi_state(), handle_parked_call_message(), has_voicemail(), hold_to_json(), implicit_publish_cb(), is_msg(), local_message_to_ami(), manager_generic_msg_cb(), meetme_stasis_cb(), moh_post_start(), moh_post_stop(), multi_user_event_to_ami(), multi_user_event_to_json(), mwi_app_event_cb(), mwi_event_cb(), mwi_startup_event_cb(), mwi_state_get_id(), mwi_to_event(), mwi_update_cb(), park_announce_update_cb(), parker_update_cb(), parking_event_cb(), peerstatus_to_ami(), peerstatus_to_json(), playback_to_json(), presence_state_cached(), presence_state_cb(), presence_state_get_id(), presence_state_to_ami(), queue_agent_cb(), queue_channel_to_ami(), queue_member_to_ami(), queue_multi_channel_to_ami(), recording_to_json(), refer_progress_bridge(), remove_device_states_cb(), rtcp_report_to_ami(), rtcp_report_to_json(), security_event_to_ami(), 
security_stasis_cb(), session_timeout_to_ami(), startup_event_cb(), stasis_end_to_json(), stasis_start_to_json(), stasis_state_subscriber_data(), stasis_subscription_final_message(), sub_bridge_update_handler(), sub_channel_update_handler(), sub_endpoint_update_handler(), subscription_persistence_event_cb(), system_registry_to_ami(), talking_start_to_ami(), talking_stop_to_ami(), unhold_to_json(), unistim_send_mwi_to_peer(), update_registry(), updates(), varset_to_ami(), xmpp_pubsub_devstate_cb(), and xmpp_pubsub_mwi_cb().

196 {
197  if (msg == NULL) {
198  return NULL;
199  }
200  return msg->data;
201 }
#define NULL
Definition: resample.c:96

◆ stasis_message_eid()

const struct ast_eid* stasis_message_eid ( const struct stasis_message msg)

Get the entity id for a stasis_message.

Since
12.2.0
Parameters
msg  Message to get eid.
Return values
Entity id of msg

Definition at line 179 of file stasis_message.c.

References stasis_message::eid_ptr, and NULL.

Referenced by AST_TEST_DEFINE(), cache_entry_by_eid(), cache_entry_create(), cache_remove(), cache_udpate(), caching_topic_exec(), and clear_node_cache().

180 {
181  if (msg == NULL) {
182  return NULL;
183  }
184  return msg->eid_ptr;
185 }
#define NULL
Definition: resample.c:96
const struct ast_eid * eid_ptr

◆ stasis_message_timestamp()

const struct timeval* stasis_message_timestamp ( const struct stasis_message msg)

◆ stasis_message_to_ami()

struct ast_manager_event_blob* stasis_message_to_ami ( struct stasis_message msg)

Build the AMI representation of the message.

May return NULL, to indicate no representation. The returned object should be ao2_cleanup()'ed.

Parameters
msgMessage to convert to AMI.
Returns
NULL on error.
NULL if AMI format is not supported.

Definition at line 224 of file stasis_message.c.

References INVOKE_VIRTUAL, and to_ami().

Referenced by action_devicestatelist(), action_presencestatelist(), AST_TEST_DEFINE(), and manager_default_msg_cb().

225 {
226  return INVOKE_VIRTUAL(to_ami, msg);
227 }
#define INVOKE_VIRTUAL(fn,...)
static void to_ami(struct ast_sip_subscription *sub, struct ast_str **buf)

◆ stasis_message_to_event()

struct ast_event* stasis_message_to_event ( struct stasis_message msg)

Build the Generic event system representation of the message.

May return NULL, to indicate no representation. The returned object should be disposed of via ast_event_destroy.

Parameters
msg  Message to convert to an ast_event.
Returns
NULL on error.
NULL if the event format is not supported.

Definition at line 236 of file stasis_message.c.

References INVOKE_VIRTUAL.

Referenced by publish_to_corosync().

237 {
238  return INVOKE_VIRTUAL(to_event, msg);
239 }
#define INVOKE_VIRTUAL(fn,...)

◆ stasis_message_to_json()

struct ast_json* stasis_message_to_json ( struct stasis_message msg,
struct stasis_message_sanitizer sanitize 
)

Build the JSON representation of the message.

May return NULL, to indicate no representation. The returned object should be ast_json_unref()'ed.

Parameters
msg  Message to convert to JSON string.
sanitize  Snapshot sanitization callback.
Returns
Newly allocated string with JSON message.
NULL on error.
NULL if JSON format is not supported.

Definition at line 229 of file stasis_message.c.

References INVOKE_VIRTUAL.

Referenced by AST_TEST_DEFINE(), rtcp_message_handler(), and sub_default_handler().

232 {
233  return INVOKE_VIRTUAL(to_json, msg, sanitize);
234 }
#define INVOKE_VIRTUAL(fn,...)

◆ stasis_message_type()

struct stasis_message_type* stasis_message_type ( const struct stasis_message msg)

Get the message type for a stasis_message.

Parameters
msg  Message to get the type of.
Returns
Type of msg
NULL if msg is NULL.
Since
12

Definition at line 187 of file stasis_message.c.

References NULL, and stasis_message::type.

Referenced by acl_change_stasis_cb(), appcdr_callback(), AST_TEST_DEFINE(), asterisk_publisher_devstate_cb(), asterisk_publisher_mwistate_cb(), cache_entry_create(), cache_put(), cache_simple(), cache_test_aggregate_calc_fn(), cache_test_data_id(), cache_update(), caching_topic_exec(), cdr_prop_write_callback(), cdr_read_callback(), cdr_write_callback(), conf_send_event_to_participants(), confbridge_publish_manager_event(), device_state_cb(), device_state_get_id(), devstate_change_cb(), dispatch_message(), dump_consumer(), endpoint_snapshot_get_id(), find_route(), forkcdr_callback(), generic_agent_devstate_cb(), generic_monitor_devstate_cb(), handle_hint_change_message_type(), is_msg(), local_message_to_ami(), meetme_stasis_cb(), message_sink_cb(), mwi_event_cb(), mwi_startup_event_cb(), mwi_stasis_cb(), mwi_state_get_id(), mwi_update_cb(), network_change_stasis_cb(), pack_bridge_and_channels(), park_announce_update_cb(), parker_update_cb(), parking_event_cb(), presence_state_cb(), presence_state_get_id(), publish_msg(), queue_agent_cb(), refer_progress_bridge(), reload_module(), rtcp_report_to_ami(), rtp_topic_handler(), security_event_to_ami(), security_stasis_cb(), startup_event_cb(), stasis_subscription_final_message(), statsmaker(), sub_default_handler(), sub_endpoint_update_handler(), subscription_invoke(), subscription_persistence_event_cb(), update_create(), xmpp_pubsub_devstate_cb(), and xmpp_pubsub_mwi_cb().

188 {
189  if (msg == NULL) {
190  return NULL;
191  }
192  return msg->type;
193 }
#define NULL
Definition: resample.c:96
struct stasis_message_type * type

◆ stasis_message_type_available_formatters()

enum stasis_subscription_message_formatters stasis_message_type_available_formatters ( const struct stasis_message_type message_type)

Get a bitmap of available formatters for a message type.

Parameters
message_type  Message type
Returns
A bitmap of stasis_subscription_message_formatters
Since
13.25.0
16.2.0

Definition at line 114 of file stasis_message.c.

References stasis_message_type::available_formatters.

Referenced by dispatch_message().

116 {
117  return type->available_formatters;
118 }
static const char type[]
Definition: chan_ooh323.c:109

◆ stasis_message_type_create()

enum stasis_message_type_result stasis_message_type_create ( const char *  name,
struct stasis_message_vtable vtable,
struct stasis_message_type **  result 
)

Create a new message type.

stasis_message_type is an AO2 object, so ao2_cleanup() when you're done with it.

Parameters
name  Name of the new type.
vtable  Virtual table of message methods. May be NULL.
[out] result  The location where the new message type will be placed
Note
Stasis message type creation may be declined if the message type is disabled
Returns
A stasis_message_type_result enum
Since
12

Definition at line 56 of file stasis_message.c.

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_cleanup, ao2_t_alloc_options, ast_atomic_fetchadd_int(), ast_hashtab_hash_string(), ast_strdup, stasis_message_type::available_formatters, stasis_message_type::hash, stasis_message_type::id, message_type_dtor(), message_type_id, stasis_message_type::name, null_vtable, STASIS_MESSAGE_TYPE_DECLINED, stasis_message_type_declined(), STASIS_MESSAGE_TYPE_ERROR, STASIS_MESSAGE_TYPE_SUCCESS, STASIS_SUBSCRIPTION_FORMATTER_AMI, STASIS_SUBSCRIPTION_FORMATTER_EVENT, STASIS_SUBSCRIPTION_FORMATTER_JSON, stasis_message_vtable::to_ami, stasis_message_vtable::to_event, stasis_message_vtable::to_json, type, and stasis_message_type::vtable.

Referenced by AST_TEST_DEFINE(), and create_message_types().

59 {
60  struct stasis_message_type *type;
61 
62  /* Check for declination */
65  }
66 
67  type = ao2_t_alloc_options(sizeof(*type), message_type_dtor,
69  if (!type) {
71  }
72  if (!vtable) {
73  /* Null object pattern, FTW! */
74  vtable = &null_vtable;
75  }
76 
77  type->name = ast_strdup(name);
78  if (!type->name) {
79  ao2_cleanup(type);
81  }
83  type->vtable = vtable;
84  if (vtable->to_json) {
86  }
87  if (vtable->to_ami) {
89  }
90  if (vtable->to_event) {
92  }
94  *result = type;
95 
97 }
static const char type[]
Definition: chan_ooh323.c:109
int stasis_message_type_declined(const char *name)
Check whether a message type is declined.
Definition: stasis.c:2282
static int message_type_id
#define ao2_t_alloc_options(data_size, destructor_fn, options, debug_msg)
Allocate and initialize an object.
Definition: astobj2.h:404
#define ast_strdup(str)
A wrapper for strdup()
Definition: astmm.h:243
struct ast_event *(* to_event)(struct stasis_message *message)
Build the ast_event representation of the message.
Definition: stasis.h:281
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition: lock.h:755
struct ast_json *(* to_json)(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
Build the JSON representation of the message.
Definition: stasis.h:253
static struct stasis_message_vtable null_vtable
static void message_type_dtor(void *obj)
struct ast_manager_event_blob *(* to_ami)(struct stasis_message *message)
Build the AMI representation of the message.
Definition: stasis.h:266
static const char name[]
Definition: cdr_mysql.c:74
unsigned int ast_hashtab_hash_string(const void *obj)
Hashes a string to a number.
Definition: hashtab.c:153
struct stasis_message_vtable * vtable
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
enum stasis_subscription_message_formatters available_formatters

◆ stasis_message_type_declined()

int stasis_message_type_declined ( const char *  name)

Check whether a message type is declined.

Parameters
name  The name of the message type to check
Return values
zero  The message type is not declined
non-zero  The message type is declined

Definition at line 2282 of file stasis.c.

References ao2_cleanup, ao2_find, ao2_global_obj_ref, ao2_ref, ast_log, stasis_declined_config::declined, stasis_config::declined_message_types, globals, LOG_NOTICE, and OBJ_SEARCH_KEY.

Referenced by stasis_log_bad_type_access(), and stasis_message_type_create().

2283 {
2284  struct stasis_config *cfg = ao2_global_obj_ref(globals);
2285  char *name_in_declined;
2286  int res;
2287 
2288  if (!cfg || !cfg->declined_message_types) {
2289  ao2_cleanup(cfg);
2290  return 0;
2291  }
2292 
2293  name_in_declined = ao2_find(cfg->declined_message_types->declined, name, OBJ_SEARCH_KEY);
2294  res = name_in_declined ? 1 : 0;
2295  ao2_cleanup(name_in_declined);
2296  ao2_ref(cfg, -1);
2297  if (res) {
2298  ast_log(LOG_NOTICE, "Declining to allocate Stasis message type '%s' due to configuration\n", name);
2299  }
2300  return res;
2301 }
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1105
#define ao2_global_obj_ref(holder)
Definition: astobj2.h:925
struct ao2_container * declined
Definition: stasis.c:2181
#define ast_log
Definition: astobj2.c:42
struct stasis_declined_config * declined_message_types
Definition: stasis.c:2198
#define ao2_ref(o, delta)
Definition: astobj2.h:464
static struct console_pvt globals
#define LOG_NOTICE
Definition: logger.h:263
static const char name[]
Definition: cdr_mysql.c:74
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1756
#define ao2_cleanup(obj)
Definition: astobj2.h:1958

◆ stasis_message_type_hash()

unsigned int stasis_message_type_hash ( const struct stasis_message_type type)

Gets the hash of a given message type.

Parameters
type  The type to get the hash of.
Returns
The hash
Since
13.24.0

Definition at line 104 of file stasis_message.c.

References stasis_message_type::hash.

Referenced by cache_entry_compute_hash().

105 {
106  return type->hash;
107 }

◆ stasis_message_type_id()

int stasis_message_type_id ( const struct stasis_message_type type)

Gets the id of a given message type.

Parameters
type  The type to get the id of.
Returns
The id
Since
17.0.0

Definition at line 109 of file stasis_message.c.

References stasis_message_type::id.

Referenced by dispatch_message(), publish_msg(), stasis_subscription_accept_message_type(), stasis_subscription_decline_message_type(), and subscription_invoke().

110 {
111  return type->id;
112 }

◆ stasis_message_type_name()

const char* stasis_message_type_name ( const struct stasis_message_type type)

Gets the name of a given message type.

Parameters
type: The type to get.
Returns
Name of the type.
NULL if type is NULL.
Since
12

Definition at line 99 of file stasis_message.c.

References stasis_message_type::name.

Referenced by AST_TEST_DEFINE(), cache_find(), cache_simple(), cache_test_data_id(), caching_topic_exec(), dump_consumer(), print_cache_entry(), send_msg(), stasis_subscription_accept_message_type(), stasis_subscription_decline_message_type(), and statsmaker().

100 {
101  return type->name;
102 }

◆ stasis_publish()

void stasis_publish ( struct stasis_topic topic,
struct stasis_message message 
)

Publish a message to a topic's subscribers.

Parameters
topic: Topic.
message: Message to publish.

This call is asynchronous and will return immediately upon queueing the message for delivery to the topic's subscribers.

Since
12

Definition at line 1510 of file stasis.c.

References NULL, and publish_msg().

Referenced by aoc_publish_blob(), app_send_end_msg(), ast_bridge_publish_attended_transfer(), ast_bridge_publish_blind_transfer(), ast_bridge_publish_enter(), ast_bridge_publish_leave(), ast_bridge_publish_merge(), ast_bridge_publish_state(), ast_cel_publish_event(), ast_channel_publish_blob(), ast_channel_publish_cached_blob(), ast_channel_publish_final_snapshot(), ast_channel_publish_snapshot(), ast_device_state_clear_cache(), ast_endpoint_blob_publish(), ast_endpoint_shutdown(), ast_manager_publish_event(), ast_monitor_start(), ast_monitor_stop(), ast_multi_object_blob_single_channel_publish(), ast_publish_device_state_full(), ast_rtp_publish_rtcp_message(), ast_system_publish_registry(), AST_TEST_DEFINE(), bridge_attended_transfer_handler(), bridge_blind_transfer_handler(), bridge_merge_handler(), bridge_publish_state_from_blob(), bridge_topics_destroy(), cache_test_aggregate_publish_fn(), caching_topic_exec(), cc_publish(), clear_node_cache(), device_state_aggregate_publish(), endpoint_publish_snapshot(), endpoint_state_cb(), handle_security_event(), local_optimization_finished_cb(), local_optimization_started_cb(), manager_mute_mixmonitor(), meetme_stasis_generate_msg(), mixmonitor_exec(), moh_post_start(), moh_post_stop(), notify_new_message(), phase_e_handler(), presence_state_event(), publish_acl_change(), publish_chanspy_message(), publish_cluster_discovery_to_stasis_full(), publish_corosync_ping_to_stasis(), publish_format_update(), publish_hint_change(), publish_hint_remove(), publish_load_message_type(), publish_local_bridge_message(), publish_message_for_channel_topics(), publish_parked_call(), publish_parked_call_failure(), queue_publish_member_blob(), queue_publish_multi_channel_snapshot_blob(), remove_device_states_cb(), report_fax_status(), report_receive_fax_status(), report_send_fax_status(), send_call_pickup_stasis_message(), send_conf_stasis(), send_conf_stasis_snapshots(), send_msg(), send_start_msg_snapshots(), send_subscription_subscribe(), 
send_subscription_unsubscribe(), stasis_app_control_publish(), stasis_app_user_event(), stasis_state_publish(), stasis_state_publish_by_id(), stasis_state_remove_publish_by_id(), stop_mixmonitor_full(), stun_monitor_request(), and talk_detect_audiohook_cb().

1511 {
1512  publish_msg(topic, message, NULL);
1513 }
#define NULL
Definition: resample.c:96
static void publish_msg(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sync_sub)
Definition: stasis.c:1431

◆ stasis_publish_sync()

void stasis_publish_sync ( struct stasis_subscription sub,
struct stasis_message message 
)

Publish a message to a topic's subscribers, synchronizing on the specified subscriber.

Parameters
sub: Subscription to synchronize on.
message: Message to publish.

The caller of stasis_publish_sync will block until the specified subscriber completes handling of the message.

All other subscribers to the topic the stasis_subscription is subscribed to are also delivered the message; this delivery however happens asynchronously.

Since
12.1.0

Definition at line 1515 of file stasis.c.

References ast_assert, NULL, publish_msg(), and stasis_subscription::topic.

Referenced by AST_TEST_DEFINE(), and stasis_message_router_publish_sync().

1516 {
1517  ast_assert(sub != NULL);
1518 
1519  publish_msg(sub->topic, message, sub);
1520 }
struct stasis_topic * topic
Definition: stasis.c:684
#define ast_assert(a)
Definition: utils.h:650
#define NULL
Definition: resample.c:96
static void publish_msg(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sync_sub)
Definition: stasis.c:1431

◆ stasis_subscription_accept_formatters()

void stasis_subscription_accept_formatters ( struct stasis_subscription subscription,
enum stasis_subscription_message_formatters  formatters 
)

Indicate to a subscription that we are interested in messages with one or more formatters.

Parameters
subscription: Subscription to alter.
formatters: A bitmap of stasis_subscription_message_formatters we wish to receive.
Since
13.25.0
16.2.0

Definition at line 1094 of file stasis.c.

References ao2_lock, ao2_unlock, ast_assert, NULL, and stasis_subscription::topic.

Referenced by AST_TEST_DEFINE(), stasis_message_router_accept_formatters(), and stasis_message_router_set_formatters_default().

1096 {
1097  ast_assert(subscription != NULL);
1098 
1099  ao2_lock(subscription->topic);
1100  subscription->accepted_formatters = formatters;
1101  ao2_unlock(subscription->topic);
1102 
1103  return;
1104 }
struct stasis_topic * topic
Definition: stasis.c:684
#define ast_assert(a)
Definition: utils.h:650
#define ao2_unlock(a)
Definition: astobj2.h:730
#define NULL
Definition: resample.c:96
#define ao2_lock(a)
Definition: astobj2.h:718

◆ stasis_subscription_accept_message_type()

int stasis_subscription_accept_message_type ( struct stasis_subscription subscription,
const struct stasis_message_type type 
)

Indicate to a subscription that we are interested in a message type.

This will cause the subscription to allow the given message type to be raised to our subscription callback. This enables internal filtering in the stasis message bus to reduce messages.

Parameters
subscription: Subscription to add message type to.
type: The message type we wish to receive.
Return values
0 on success
-1 on failure
Since
17.0.0
Note
If you want to use stasis_subscription_final_message() you will need to accept stasis_subscription_change_type as a message type.
Until the subscription is set to selective filtering it is possible for it to receive messages of message types that would not normally be accepted.

Definition at line 1024 of file stasis.c.

References ao2_lock, ao2_unlock, ast_assert, AST_VECTOR_REPLACE, NULL, stasis_message_type_id(), stasis_message_type_name(), STASIS_SUBSCRIPTION_FILTER_FORCED_NONE, and stasis_subscription::topic.

Referenced by acl_change_stasis_subscribe(), add_peer_mwi_subs(), ast_mwi_subscribe_pool(), ast_res_pjsip_initialize_configuration(), AST_TEST_DEFINE(), asterisk_start_devicestate_publishing(), asterisk_start_mwi_publishing(), cc_generic_agent_start_monitoring(), create_new_generic_list(), create_parked_subscription_full(), devstate_init(), load_module(), load_pbx(), mwi_stasis_subscription_alloc(), network_change_stasis_subscribe(), park_and_announce_app_exec(), parking_manager_enable_stasis(), refer_blind_callback(), rtp_reload(), stasis_caching_accept_message_type(), stasis_message_router_add(), stasis_message_router_add_cache_update(), stasis_message_router_create_internal(), subscribe_device_state(), and xmpp_init_event_distribution().

1026 {
1027  if (!subscription) {
1028  return -1;
1029  }
1030 
1031  ast_assert(type != NULL);
1033 
1034  if (!type || !stasis_message_type_name(type)) {
1035  /* Filtering is unreliable as this message type is not yet initialized
1036  * so force all messages through.
1037  */
1038  subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE;
1039  return 0;
1040  }
1041 
1042  ao2_lock(subscription->topic);
1043  if (AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 1)) {
1044  /* We do this for the same reason as above. The subscription can still operate, so allow
1045  * it to do so by forcing all messages through.
1046  */
1047  subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE;
1048  }
1049  ao2_unlock(subscription->topic);
1050 
1051  return 0;
1052 }
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.
int stasis_message_type_id(const struct stasis_message_type *type)
Gets the id of a given message type.
struct stasis_topic * topic
Definition: stasis.c:684
#define ast_assert(a)
Definition: utils.h:650
#define ao2_unlock(a)
Definition: astobj2.h:730
#define NULL
Definition: resample.c:96
#define ao2_lock(a)
Definition: astobj2.h:718
#define AST_VECTOR_REPLACE(vec, idx, elem)
Replace an element at a specific position in a vector, growing the vector if needed.
Definition: vector.h:284

◆ stasis_subscription_cb_noop()

void stasis_subscription_cb_noop ( void *  data,
struct stasis_subscription sub,
struct stasis_message message 
)

Stasis subscription callback function that does nothing.

Note
This callback should be used for events that are not directly processed, but need to be generated so data can be retrieved from cache later. Subscriptions with this callback can be released with stasis_unsubscribe, even during module unload.
Since
13.5

Definition at line 810 of file stasis.c.

References ao2_alloc, ao2_cleanup, ao2_global_obj_ref, ao2_link, ao2_ref, ast_str_container_alloc, make_ari_stubs::file, NULL, RAII_VAR, statistics(), sub, and stasis_subscription::uniqueid.

Referenced by build_gateway(), build_peer(), and mkintf().

811 {
812 }

◆ stasis_subscription_decline_message_type()

int stasis_subscription_decline_message_type ( struct stasis_subscription subscription,
const struct stasis_message_type type 
)

Indicate to a subscription that we are not interested in a message type.

Parameters
subscription: Subscription to remove message type from.
type: The message type we don't wish to receive.
Return values
0 on success
-1 on failure
Since
17.0.0

Definition at line 1054 of file stasis.c.

References ao2_lock, ao2_unlock, ast_assert, AST_VECTOR_REPLACE, AST_VECTOR_SIZE, NULL, stasis_message_type_id(), stasis_message_type_name(), and stasis_subscription::topic.

Referenced by AST_TEST_DEFINE().

1056 {
1057  if (!subscription) {
1058  return -1;
1059  }
1060 
1061  ast_assert(type != NULL);
1063 
1064  if (!type || !stasis_message_type_name(type)) {
1065  return 0;
1066  }
1067 
1068  ao2_lock(subscription->topic);
1069  if (stasis_message_type_id(type) < AST_VECTOR_SIZE(&subscription->accepted_message_types)) {
1070  /* The memory is already allocated so this can't fail */
1071  AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 0);
1072  }
1073  ao2_unlock(subscription->topic);
1074 
1075  return 0;
1076 }
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.
int stasis_message_type_id(const struct stasis_message_type *type)
Gets the id of a given message type.
struct stasis_topic * topic
Definition: stasis.c:684
#define ast_assert(a)
Definition: utils.h:650
#define ao2_unlock(a)
Definition: astobj2.h:730
#define NULL
Definition: resample.c:96
#define ao2_lock(a)
Definition: astobj2.h:718
#define AST_VECTOR_REPLACE(vec, idx, elem)
Replace an element at a specific position in a vector, growing the vector if needed.
Definition: vector.h:284
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611

◆ stasis_subscription_final_message()

int stasis_subscription_final_message ( struct stasis_subscription sub,
struct stasis_message msg 
)

Determine whether a message is the final message to be received on a subscription.

Parameters
sub: Subscription on which the message was received.
msg: Message to check.
Returns
zero if the provided message is not the final message.
non-zero if the provided message is the final message.
Since
12

Definition at line 1175 of file stasis.c.

References stasis_subscription_change::description, stasis_message_data(), stasis_message_type(), stasis_subscription_change_type(), stasis_subscription_uniqueid(), and stasis_subscription_change::uniqueid.

Referenced by bridge_subscription_change_handler(), caching_topic_exec(), consumer_exec(), consumer_exec_sync(), consumer_finalize(), default_route(), device_state_cb(), dispatch_message(), endpoint_subscription_change(), generic_agent_devstate_cb(), message_sink_cb(), mwi_event_cb(), mwi_stasis_cb(), park_announce_update_cb(), parker_update_cb(), queue_bridge_cb(), queue_channel_cb(), refer_progress_bridge(), router_dispatch(), statsmaker(), sub_subscription_change_handler(), and subscription_invoke().

1176 {
1177  struct stasis_subscription_change *change;
1178 
1180  return 0;
1181  }
1182 
1183  change = stasis_message_data(msg);
1184  if (strcmp("Unsubscribe", change->description)) {
1185  return 0;
1186  }
1187 
1188  if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
1189  return 0;
1190  }
1191 
1192  return 1;
1193 }
const char * stasis_subscription_uniqueid(const struct stasis_subscription *sub)
Get the unique ID for the subscription.
Definition: stasis.c:1170
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
Holds details about changes to subscriptions for the specified topic.
Definition: stasis.h:892
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.

◆ stasis_subscription_is_done()

int stasis_subscription_is_done ( struct stasis_subscription subscription)

Returns whether subscription has received its final message.

Note that a subscription is considered done even while the stasis_subscription_final_message() is being processed. This allows cleanup routines to check the status of the subscription.

Parameters
subscription: Subscription.
Returns
True (non-zero) if stasis_subscription_final_message() has been received.
False (zero) if waiting for the end.

Definition at line 1119 of file stasis.c.

References ao2_lock, ao2_unlock, and stasis_subscription::final_message_rxed.

Referenced by router_dtor(), stasis_caching_topic_dtor(), stasis_message_router_is_done(), and subscription_dtor().

1120 {
1121  if (subscription) {
1122  int ret;
1123 
1124  ao2_lock(subscription);
1125  ret = subscription->final_message_rxed;
1126  ao2_unlock(subscription);
1127 
1128  return ret;
1129  }
1130 
1131  /* Null subscription is about as done as you can get */
1132  return 1;
1133 }
#define ao2_unlock(a)
Definition: astobj2.h:730
#define ao2_lock(a)
Definition: astobj2.h:718
int final_message_rxed
Definition: stasis.c:696

◆ stasis_subscription_is_subscribed()

int stasis_subscription_is_subscribed ( const struct stasis_subscription sub)

Returns whether a subscription is currently subscribed.

Note that there may still be messages queued up to be dispatched to this subscription, but the stasis_subscription_final_message() has been enqueued.

Parameters
sub: Subscription to check
Returns
False (zero) if subscription is not subscribed.
True (non-zero) if still subscribed.

Definition at line 1151 of file stasis.c.

References ao2_lock, ao2_unlock, AST_VECTOR_GET, AST_VECTOR_SIZE, sub, and stasis_subscription::topic.

Referenced by asterisk_publisher_devstate_cb(), asterisk_publisher_mwistate_cb(), router_dtor(), send_subscription_subscribe(), send_subscription_unsubscribe(), stasis_caching_topic_dtor(), stasis_caching_unsubscribe(), subscription_dtor(), xmpp_pubsub_devstate_cb(), and xmpp_pubsub_mwi_cb().

1152 {
1153  if (sub) {
1154  size_t i;
1155  struct stasis_topic *topic = sub->topic;
1156 
1157  ao2_lock(topic);
1158  for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
1159  if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
1160  ao2_unlock(topic);
1161  return 1;
1162  }
1163  }
1164  ao2_unlock(topic);
1165  }
1166 
1167  return 0;
1168 }
struct stasis_topic * topic
Definition: stasis.c:684
#define ao2_unlock(a)
Definition: astobj2.h:730
#define ao2_lock(a)
Definition: astobj2.h:718
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:682
struct stasis_forward * sub
Definition: res_corosync.c:240
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611

◆ stasis_subscription_join()

void stasis_subscription_join ( struct stasis_subscription subscription)

Block until the last message is processed on a subscription.

This function will not return until the subscription's callback for the stasis_subscription_final_message() completes. This allows cleanup routines to run before unblocking the joining thread.

Parameters
subscription: Subscription to block on.
Since
12

Definition at line 1106 of file stasis.c.

References ao2_lock, ao2_object_get_lockaddr(), ao2_unlock, ast_cond_wait, stasis_subscription::final_message_processed, and stasis_subscription::join_cond.

Referenced by stasis_caching_unsubscribe_and_join(), and stasis_unsubscribe_and_join().

1107 {
1108  if (subscription) {
1109  ao2_lock(subscription);
1110  /* Wait until the processed flag has been set */
1111  while (!subscription->final_message_processed) {
1112  ast_cond_wait(&subscription->join_cond,
1113  ao2_object_get_lockaddr(subscription));
1114  }
1115  ao2_unlock(subscription);
1116  }
1117 }
ast_cond_t join_cond
Definition: stasis.c:693
#define ast_cond_wait(cond, mutex)
Definition: lock.h:203
#define ao2_unlock(a)
Definition: astobj2.h:730
int final_message_processed
Definition: stasis.c:699
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
Definition: astobj2.c:476
#define ao2_lock(a)
Definition: astobj2.h:718

◆ stasis_subscription_set_congestion_limits()

int stasis_subscription_set_congestion_limits ( struct stasis_subscription subscription,
long  low_water,
long  high_water 
)

Set the high and low alert water marks of the stasis subscription.

Since
13.10.0
Parameters
subscription: Pointer to a stasis subscription
low_water: New queue low water mark. (-1 to set as 90% of high_water)
high_water: New queue high water mark.
Return values
0 on success.
-1 on error (water marks not changed).

Definition at line 1012 of file stasis.c.

References ast_taskprocessor_alert_set_levels(), and stasis_subscription::mailbox.

Referenced by stasis_message_router_set_congestion_limits().

1014 {
1015  int res = -1;
1016 
1017  if (subscription) {
1018  res = ast_taskprocessor_alert_set_levels(subscription->mailbox,
1019  low_water, high_water);
1020  }
1021  return res;
1022 }
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.
struct ast_taskprocessor * mailbox
Definition: stasis.c:686

◆ stasis_subscription_set_filter()

int stasis_subscription_set_filter ( struct stasis_subscription subscription,
enum stasis_subscription_message_filter  filter 
)

Set the message type filtering level on a subscription.

This will cause the subscription to filter messages according to the provided filter level. For example if selective is used then only messages matching those provided to stasis_subscription_accept_message_type will be raised to the subscription callback.

Parameters
subscription: Subscription whose message type filter level is being set.
filter: What filter to use
Return values
0 on success
-1 on failure
Since
17.0.0

Definition at line 1078 of file stasis.c.

References ao2_lock, ao2_unlock, filter(), STASIS_SUBSCRIPTION_FILTER_FORCED_NONE, and stasis_subscription::topic.

Referenced by acl_change_stasis_subscribe(), ast_mwi_subscribe_pool(), ast_res_pjsip_initialize_configuration(), AST_TEST_DEFINE(), asterisk_start_devicestate_publishing(), asterisk_start_mwi_publishing(), cc_generic_agent_start_monitoring(), create_new_generic_list(), create_parked_subscription_full(), devstate_init(), load_module(), load_pbx(), network_change_stasis_subscribe(), park_and_announce_app_exec(), parking_manager_enable_stasis(), refer_blind_callback(), rtp_reload(), stasis_caching_set_filter(), stasis_message_router_add(), stasis_message_router_add_cache_update(), stasis_message_router_set_formatters_default(), subscribe_device_state(), and xmpp_init_event_distribution().

1080 {
1081  if (!subscription) {
1082  return -1;
1083  }
1084 
1085  ao2_lock(subscription->topic);
1086  if (subscription->filter != STASIS_SUBSCRIPTION_FILTER_FORCED_NONE) {
1087  subscription->filter = filter;
1088  }
1089  ao2_unlock(subscription->topic);
1090 
1091  return 0;
1092 }
struct stasis_topic * topic
Definition: stasis.c:684
#define ao2_unlock(a)
Definition: astobj2.h:730
#define ao2_lock(a)
Definition: astobj2.h:718
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
Definition: func_strings.c:709

◆ stasis_subscription_uniqueid()

const char* stasis_subscription_uniqueid ( const struct stasis_subscription sub)

Get the unique ID for the subscription.

Parameters
sub: Subscription for which to get the unique ID.
Returns
Unique ID for the subscription.
Since
12

Definition at line 1170 of file stasis.c.

References stasis_subscription::uniqueid.

Referenced by AST_TEST_DEFINE(), stasis_subscription_final_message(), topic_add_subscription(), and topic_remove_subscription().

1171 {
1172  return sub->uniqueid;
1173 }

◆ stasis_topic_create()

struct stasis_topic* stasis_topic_create ( const char *  name)

Create a new topic.

Parameters
name: Name of the new topic.
Returns
New topic instance.
NULL on error.
Since
12
Note
There is no explicit ability to unsubscribe all subscribers from a topic and destroy it. As a result the topic can persist until the last subscriber unsubscribes itself even if there is no publisher.
Topic names should be in the form of <subsystem>:<functionality>[/<object>]
Examples:
/tmp/asterisk-shallow/main/app.c.

Definition at line 617 of file stasis.c.

References stasis_topic_create_with_detail().

Referenced by __init_manager(), app_create(), app_init(), ast_channel_internal_setup_topics(), ast_parking_stasis_init(), ast_presence_state_engine_init(), ast_rtp_engine_init(), ast_security_stasis_init(), ast_stasis_bridging_init(), ast_stasis_channels_init(), ast_stasis_system_init(), AST_TEST_DEFINE(), ast_test_init(), create_cts(), create_subscriptions(), devstate_init(), load_module(), stasis_caching_topic_create(), stasis_cp_all_create(), stasis_cp_sink_create(), stasis_state_manager_create(), stasis_topic_pool_get_topic(), and state_alloc().

618 {
620 }
struct stasis_topic * stasis_topic_create_with_detail(const char *name, const char *detail)
Create a new topic with given detail.
Definition: stasis.c:568
static const char name[]
Definition: cdr_mysql.c:74

◆ stasis_topic_create_with_detail()

struct stasis_topic* stasis_topic_create_with_detail ( const char *  name,
const char *  detail 
)

Create a new topic with given detail.

Parameters
name: Name of the new topic.
detail: Detailed description of the new topic, e.g. "Queue main topic for subscribing every queue event"
Returns
New topic instance.
NULL on error.
Note
There is no explicit ability to unsubscribe all subscribers from a topic and destroy it. As a result the topic can persist until the last subscriber unsubscribes itself even if there is no publisher.

Definition at line 568 of file stasis.c.

References ao2_ref, ao2_t_alloc, ast_debug, AST_VECTOR_INIT, INITIAL_SUBSCRIBERS_MAX, link_topic_proxy(), NULL, stasis_topic_get(), and topic_dtor().

Referenced by stasis_topic_create().

571 {
572  struct stasis_topic *topic;
573  int res = 0;
574 
575  if (!name|| !strlen(name) || !detail) {
576  return NULL;
577  }
578  ast_debug(2, "Creating topic. name: %s, detail: %s\n", name, detail);
579 
580  topic = stasis_topic_get(name);
581  if (topic) {
582  ast_debug(2, "Topic is already exist. name: %s, detail: %s\n",
583  name, detail);
584  return topic;
585  }
586 
587  topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
588  if (!topic) {
589  return NULL;
590  }
591 
592  res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
593  res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
594  if (res) {
595  ao2_ref(topic, -1);
596  return NULL;
597  }
598 
599  /* link to the proxy */
600  if (link_topic_proxy(topic, name, detail)) {
601  ao2_ref(topic, -1);
602  return NULL;
603  }
604 
605 #ifdef AST_DEVMODE
606  topic->statistics = stasis_topic_statistics_create(topic);
607  if (!topic->statistics) {
608  ao2_ref(topic, -1);
609  return NULL;
610  }
611 #endif
612  ast_debug(1, "Topic '%s': %p created\n", topic->name, topic);
613 
614  return topic;
615 }
#define ao2_t_alloc(data_size, destructor_fn, debug_msg)
Definition: astobj2.h:409
static void topic_dtor(void *obj)
Definition: stasis.c:433
#define NULL
Definition: resample.c:96
#define ast_debug(level,...)
Log a DEBUG message.
Definition: logger.h:444
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113
static int link_topic_proxy(struct stasis_topic *topic, const char *name, const char *detail)
Definition: stasis.c:500
#define ao2_ref(o, delta)
Definition: astobj2.h:464
static const char name[]
Definition: cdr_mysql.c:74
#define INITIAL_SUBSCRIBERS_MAX
Definition: stasis.c:301
struct stasis_topic * stasis_topic_get(const char *name)
Get a topic of the given name.
Definition: stasis.c:622

◆ stasis_topic_detail()

const char* stasis_topic_detail ( const struct stasis_topic topic)

Return the detail of a topic.

Parameters
topic: Topic.
Returns
Detail of the topic.
NULL if topic is NULL.
Since
12

Definition at line 635 of file stasis.c.

References NULL.

636 {
637  if (!topic) {
638  return NULL;
639  }
640  return topic->detail;
641 }
#define NULL
Definition: resample.c:96

◆ stasis_topic_get()

struct stasis_topic* stasis_topic_get ( const char *  name)

Get a topic of the given name.

Parameters
name: Topic's name.
Returns
The topic with the given name.
NULL on error or if the topic does not exist.
Note
This SHOULD NOT be used in normal operation for publishing messages.

Definition at line 622 of file stasis.c.

References ao2_weakproxy_find, and OBJ_SEARCH_KEY.

Referenced by link_topic_proxy(), stasis_show_topic(), and stasis_topic_create_with_detail().

623 {
625 }
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1105
struct ao2_container * topic_all
Definition: stasis.c:395
#define ao2_weakproxy_find(c, arg, flags, tag)
Perform an ao2_find on a container with ao2_weakproxy objects, returning the real object...
Definition: astobj2.h:1768
static const char name[]
Definition: cdr_mysql.c:74

◆ stasis_topic_name()

const char* stasis_topic_name ( const struct stasis_topic topic)

◆ stasis_topic_pool_create()

struct stasis_topic_pool* stasis_topic_pool_create ( struct stasis_topic pooled_topic)

Create a topic pool that routes messages from dynamically generated topics to the given topic.

Parameters
pooled_topic: Topic to which messages will be routed
Returns
the new stasis_topic_pool
NULL on failure
Examples:
/tmp/asterisk-shallow/main/app.c.

Definition at line 1832 of file stasis.c.

References AO2_ALLOC_OPT_LOCK_MUTEX, AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_alloc_options, ao2_cleanup, ao2_container_alloc_hash, ao2_container_register(), ao2_ref, ast_alloca, NULL, stasis_topic_pool::pool_container, stasis_topic_pool::pool_topic, stasis_topic_name(), TOPIC_POOL_BUCKETS, topic_pool_dtor(), topic_pool_entry_cmp(), and topic_pool_entry_hash().

Referenced by app_init(), ast_stasis_bridging_init(), and devstate_init().

1833 {
1834  struct stasis_topic_pool *pool;
1835 
1837  if (!pool) {
1838  return NULL;
1839  }
1840 
1843  if (!pool->pool_container) {
1844  ao2_cleanup(pool);
1845  return NULL;
1846  }
1847 
1848 #ifdef AO2_DEBUG
1849  {
1850  char *container_name =
1851  ast_alloca(strlen(stasis_topic_name(pooled_topic)) + strlen("-pool") + 1);
1852  sprintf(container_name, "%s-pool", stasis_topic_name(pooled_topic));
1853  ao2_container_register(container_name, pool->pool_container, topic_pool_prnt_obj);
1854  }
1855 #endif
1856 
1857  ao2_ref(pooled_topic, +1);
1858  pool->pool_topic = pooled_topic;
1859 
1860  return pool;
1861 }
#define TOPIC_POOL_BUCKETS
Definition: stasis.c:304
static void topic_pool_dtor(void *obj)
Definition: stasis.c:1743
static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
Definition: stasis.c:1783
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:406
#define NULL
Definition: resample.c:96
struct ao2_container * pool_container
Definition: stasis.c:1739
static int topic_pool_entry_hash(const void *obj, const int flags)
Definition: stasis.c:1762
int ao2_container_register(const char *name, struct ao2_container *self, ao2_prnt_obj_fn *prnt_obj)
Register a container for CLI stats and integrity check.
#define ao2_ref(o, delta)
Definition: astobj2.h:464
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:627
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
Definition: astmm.h:290
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Definition: astobj2.h:1310
struct stasis_topic * pool_topic
Definition: stasis.c:1740
#define ao2_cleanup(obj)
Definition: astobj2.h:1958

◆ stasis_topic_pool_delete_topic()

void stasis_topic_pool_delete_topic ( struct stasis_topic_pool pool,
const char *  topic_name 
)

Delete a topic from the topic pool.

Parameters
pool: Pool from which to delete the topic
topic_name: Name of the topic to delete in the form of <pool_topic_name>/<topic_name> or just <topic_name>
Since
13.24
15.6
16.1

Definition at line 1863 of file stasis.c.

References ao2_find, OBJ_NODATA, OBJ_SEARCH_KEY, OBJ_UNLINK, stasis_topic_pool::pool_container, stasis_topic_pool::pool_topic, and stasis_topic_name().

Referenced by bridge_topics_destroy().

1864 {
1865  /*
1866  * The topic_name passed in could be a fully-qualified name like <pool_topic_name>/<topic_name>
1867  * or just <topic_name>. If it's fully qualified, we need to skip past <pool_topic_name>
1868  * name and search only on <topic_name>.
1869  */
1870  const char *pool_topic_name = stasis_topic_name(pool->pool_topic);
1871  int pool_topic_name_len = strlen(pool_topic_name);
1872  const char *search_topic_name;
1873 
1874  if (strncmp(pool_topic_name, topic_name, pool_topic_name_len) == 0) {
1875  search_topic_name = topic_name + pool_topic_name_len + 1;
1876  } else {
1877  search_topic_name = topic_name;
1878  }
1879 
1880  ao2_find(pool->pool_container, search_topic_name, OBJ_SEARCH_KEY | OBJ_NODATA | OBJ_UNLINK);
1881 }
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1105
struct ao2_container * pool_container
Definition: stasis.c:1739
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:627
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1756
struct stasis_topic * pool_topic
Definition: stasis.c:1740

◆ stasis_topic_pool_get_topic()

struct stasis_topic* stasis_topic_pool_get_topic ( struct stasis_topic_pool pool,
const char *  topic_name 
)

Find or create a topic in the pool.

Parameters
pool  Pool for which to get the topic
topic_name  Name of the topic to get
Returns
The already stored or newly allocated topic
NULL if the topic was not found and could not be allocated
Examples:
/tmp/asterisk-shallow/main/app.c.

Definition at line 1883 of file stasis.c.

References ao2_cleanup, ao2_find, ao2_link_flags, ast_asprintf, ast_free, topic_pool_entry::forward, NULL, OBJ_NOLOCK, OBJ_SEARCH_KEY, stasis_topic_pool::pool_container, stasis_topic_pool::pool_topic, RAII_VAR, SCOPED_AO2LOCK, stasis_forward_all(), stasis_topic_create(), stasis_topic_name(), topic_pool_entry::topic, and topic_pool_entry_alloc().

Referenced by ast_device_state_topic(), ast_queue_topic(), and bridge_topics_init().

1884 {
1886  SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
1887  char *new_topic_name;
1888  int ret;
1889 
1891  if (topic_pool_entry) {
1892  return topic_pool_entry->topic;
1893  }
1894 
1896  if (!topic_pool_entry) {
1897  return NULL;
1898  }
1899 
1900  /* To provide further detail and to ensure that the topic is unique within the scope of the
1901  * system we prefix it with the pooling topic name, which should itself already be unique.
1902  */
1903  ret = ast_asprintf(&new_topic_name, "%s/%s", stasis_topic_name(pool->pool_topic), topic_name);
1904  if (ret < 0) {
1905  return NULL;
1906  }
1907 
1908  topic_pool_entry->topic = stasis_topic_create(new_topic_name);
1909  ast_free(new_topic_name);
1910  if (!topic_pool_entry->topic) {
1911  return NULL;
1912  }
1913 
1915  if (!topic_pool_entry->forward) {
1916  return NULL;
1917  }
1918 
1920  return NULL;
1921  }
1922 
1923  return topic_pool_entry->topic;
1924 }
Definition: stasis.c:1708
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1105
static struct topic_pool_entry * topic_pool_entry_alloc(const char *topic_name)
Definition: stasis.c:1723
Assume that the ao2_container is already locked.
Definition: astobj2.h:1067
#define ao2_link_flags(container, obj, flags)
Definition: astobj2.h:1572
#define NULL
Definition: resample.c:96
struct ao2_container * pool_container
Definition: stasis.c:1739
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:269
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:851
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition: stasis.c:617
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:627
struct stasis_topic * topic
Definition: stasis.c:1710
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:602
#define ast_free(a)
Definition: astmm.h:182
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1756
struct stasis_topic * pool_topic
Definition: stasis.c:1740
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
struct stasis_forward * forward
Definition: stasis.c:1709
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition: stasis.c:1577

◆ stasis_topic_pool_topic_exists()

int stasis_topic_pool_topic_exists ( const struct stasis_topic_pool *  pool,
const char *  topic_name 
)

Check if a topic exists in a pool.

Parameters
pool  Pool to check
topic_name  Name of the topic to check
Return values
1  exists
0  does not exist
Since
13.23.0

Definition at line 1926 of file stasis.c.

References ao2_find, ao2_ref, OBJ_SEARCH_KEY, and stasis_topic_pool::pool_container.

Referenced by ast_publish_device_state_full().

1927 {
1929 
1930  topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY);
1931  if (!topic_pool_entry) {
1932  return 0;
1933  }
1934 
1935  ao2_ref(topic_pool_entry, -1);
1936  return 1;
1937 }
Definition: stasis.c:1708
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1105
struct ao2_container * pool_container
Definition: stasis.c:1739
#define ao2_ref(o, delta)
Definition: astobj2.h:464
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1756

◆ stasis_topic_subscribers()

size_t stasis_topic_subscribers ( const struct stasis_topic *  topic)

Return the number of subscribers of a topic.

Parameters
topic  Topic.
Returns
Number of subscribers of the topic.
Since
17.0.0

Definition at line 643 of file stasis.c.

References AST_VECTOR_SIZE, make_ari_stubs::file, sub, and stasis_subscription::uniqueid.

Referenced by caching_topic_exec(), and publish_msg().

644 {
645  return AST_VECTOR_SIZE(&topic->subscribers);
646 }
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611

◆ stasis_topic_uniqueid()

const char* stasis_topic_uniqueid ( const struct stasis_topic *  topic)

Return the uniqueid of a topic.

Parameters
topic  Topic.
Returns
Uniqueid of the topic.
NULL if topic is NULL.

◆ stasis_unsubscribe()

struct stasis_subscription* stasis_unsubscribe ( struct stasis_subscription *  subscription)

Cancel a subscription.

Note that in an asynchronous system, there may still be messages queued or in transit to the subscription's callback. These will still be delivered. There will be a final 'SubscriptionCancelled' message, indicating the delivery of the final message.

Parameters
subscription  Subscription to cancel.
Returns
NULL for convenience
Since
12

Definition at line 972 of file stasis.c.

References ao2_bump, ao2_cleanup, ast_log, ast_taskprocessor_push(), LOG_ERROR, stasis_subscription::mailbox, NULL, send_subscription_unsubscribe(), sub_cleanup(), stasis_subscription::topic, and topic_remove_subscription().

Referenced by AST_TEST_DEFINE(), cc_generic_agent_destructor(), destroy_cts(), generic_agent_devstate_cb(), generic_monitor_instance_list_destructor(), mwi_startup_event_cb(), park_and_announce_app_exec(), parked_subscription_datastore_destroy(), refer_progress_bridge(), refer_progress_destroy(), refer_progress_framehook_destroy(), startup_event_cb(), stasis_caching_unsubscribe(), stasis_message_router_unsubscribe(), stasis_state_unsubscribe(), stasis_unsubscribe_and_join(), subscription_persistence_event_cb(), unload_module(), and xmpp_init_event_distribution().

973 {
974  /* The subscription may be the last ref to this topic. Hold
975  * the topic ref open until after the unlock. */
976  struct stasis_topic *topic;
977 
978  if (!sub) {
979  return NULL;
980  }
981 
982  topic = ao2_bump(sub->topic);
983 
984  /* We have to remove the subscription first, to ensure the unsubscribe
985  * is the final message */
986  if (topic_remove_subscription(sub->topic, sub) != 0) {
988  "Internal error: subscription has invalid topic\n");
989  ao2_cleanup(topic);
990 
991  return NULL;
992  }
993 
994  /* Now let everyone know about the unsubscribe */
996 
997  /* When all that's done, remove the ref the mailbox has on the sub */
998  if (sub->mailbox) {
999  if (ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub)) {
1000  /* Nothing we can do here, the conditional is just to keep
1001  * the compiler happy that we're not ignoring the result. */
1002  }
1003  }
1004 
1005  /* Unsubscribing unrefs the subscription */
1006  ao2_cleanup(sub);
1007  ao2_cleanup(topic);
1008 
1009  return NULL;
1010 }
static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition: stasis.c:1230
static int sub_cleanup(void *data)
Definition: stasis.c:965
#define NULL
Definition: resample.c:96
#define ao2_bump(obj)
Definition: astobj2.h:491
#define ast_log
Definition: astobj2.c:42
static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition: stasis.c:1675
#define LOG_ERROR
Definition: logger.h:285
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
struct stasis_forward * sub
Definition: res_corosync.c:240

◆ stasis_unsubscribe_and_join()

struct stasis_subscription* stasis_unsubscribe_and_join ( struct stasis_subscription *  subscription)

Cancel a subscription, blocking until the last message is processed.

While normally it's recommended to call stasis_unsubscribe() and wait for stasis_subscription_final_message(), there are times (like during a module unload) when you have to wait for the final message (otherwise you'll call a function in a shared module that no longer exists).

Parameters
subscription  Subscription to cancel.
Returns
NULL for convenience
Since
12

Definition at line 1135 of file stasis.c.

References ao2_cleanup, ao2_ref, NULL, stasis_subscription_join(), and stasis_unsubscribe().

Referenced by acl_change_event_stasis_unsubscribe(), acl_change_stasis_unsubscribe(), ast_res_pjsip_destroy_configuration(), AST_TEST_DEFINE(), ast_xmpp_client_disconnect(), asterisk_stop_devicestate_publishing(), asterisk_stop_mwi_publishing(), devstate_cleanup(), network_change_stasis_unsubscribe(), parking_manager_disable_stasis(), remove_device_state_subscription(), rtp_reload(), stasis_message_router_unsubscribe_and_join(), stasis_state_unsubscribe_and_join(), unload_module(), and unload_pbx().

1137 {
1138  if (!subscription) {
1139  return NULL;
1140  }
1141 
1142  /* Bump refcount to hold it past the unsubscribe */
1143  ao2_ref(subscription, +1);
1144  stasis_unsubscribe(subscription);
1145  stasis_subscription_join(subscription);
1146  /* Now decrement the refcount back */
1147  ao2_cleanup(subscription);
1148  return NULL;
1149 }
#define NULL
Definition: resample.c:96
#define ao2_ref(o, delta)
Definition: astobj2.h:464
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *sub)
Cancel a subscription.
Definition: stasis.c:972
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
void stasis_subscription_join(struct stasis_subscription *subscription)
Block until the last message is processed on a subscription.
Definition: stasis.c:1106