Asterisk - The Open Source Telephony Project GIT-master-4f2b068
Loading...
Searching...
No Matches
Data Structures | Macros | Functions | Variables
stasis.c File Reference

Stasis Message Bus API. More...

#include "asterisk.h"
#include "asterisk/astobj2.h"
#include "asterisk/stasis_internal.h"
#include "asterisk/stasis.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/taskpool.h"
#include "asterisk/utils.h"
#include "asterisk/uuid.h"
#include "asterisk/vector.h"
#include "asterisk/stasis_channels.h"
#include "asterisk/stasis_bridges.h"
#include "asterisk/stasis_endpoints.h"
#include "asterisk/config_options.h"
#include "asterisk/cli.h"
Include dependency graph for stasis.c:

Go to the source code of this file.

Data Structures

struct  ast_multi_object_blob
 A multi object blob data structure to carry user event stasis messages. More...
 
struct  stasis_config
 
struct  stasis_declined_config
 A structure to hold global configuration-related options. More...
 
struct  stasis_forward
 Forwarding information. More...
 
struct  stasis_subscription
 
struct  stasis_taskpool_conf
 Taskpool configuration options. More...
 
struct  stasis_topic
 
struct  stasis_topic_pool
 
struct  sync_task_data
 
struct  topic_pool_entry
 
struct  topic_proxy
 

Macros

#define FMT_FIELDS   "%-64s %-64s\n"
 
#define FMT_HEADERS   "%-64s %-64s\n"
 
#define INITIAL_SUBSCRIBERS_MAX   4
 
#define TOPIC_ALL_BUCKETS   997
 
#define topic_lock_both(topic1, topic2)
 Lock two topics.
 
#define TOPIC_POOL_BUCKETS   57
 

Functions

struct stasis_subscription * __stasis_subscribe (struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
 Create a subscription.
 
struct stasis_subscription * __stasis_subscribe_pool (struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
 Create a subscription whose callbacks occur on a task pool.
 
static AO2_GLOBAL_OBJ_STATIC (globals)
 A global object container that will contain the stasis_config that gets swapped out on reloads.
 
 AO2_STRING_FIELD_CASE_SORT_FN (topic_proxy, name)
 
 AO2_STRING_FIELD_CMP_FN (topic_proxy, name)
 
 AO2_STRING_FIELD_HASH_FN (topic_proxy, name)
 
void ast_multi_object_blob_add (struct ast_multi_object_blob *multi, enum stasis_user_multi_object_snapshot_type type, void *object)
 Add an object (snapshot) to the blob.
 
struct ast_multi_object_blob * ast_multi_object_blob_create (struct ast_json *blob)
 Create a stasis user event multi object blob.
 
void ast_multi_object_blob_single_channel_publish (struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob)
 Publish single channel user event (for app_userevent compatibility)
 
 CONFIG_INFO_CORE ("stasis", cfg_info, globals, stasis_config_alloc,.files=ACO_FILES(&stasis_conf),)
 Register information about the configs being processed by this module.
 
static int declined_handler (const struct aco_option *opt, struct ast_variable *var, void *obj)
 
static int dispatch_exec_async (struct ast_taskprocessor_local *local)
 
static int dispatch_exec_sync (struct ast_taskprocessor_local *local)
 
static unsigned int dispatch_message (struct stasis_subscription *sub, struct stasis_message *message, int synchronous)
 
static void forward_dtor (void *obj)
 
struct stasis_subscription * internal_stasis_subscribe (struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_taskpool, const char *file, int lineno, const char *func)
 Create a subscription.
 
static int link_topic_proxy (struct stasis_topic *topic, const char *name, const char *detail)
 
static void multi_object_blob_dtor (void *obj)
 
static struct ast_str * multi_object_blob_to_ami (void *obj)
 
static struct ast_manager_event_blob * multi_user_event_to_ami (struct stasis_message *message)
 
static struct ast_json * multi_user_event_to_json (struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
 
static void proxy_dtor (void *weakproxy, void *container)
 
static void publish_msg (struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sync_sub)
 
static void send_subscription_subscribe (struct stasis_topic *topic, struct stasis_subscription *sub)
 
static void send_subscription_unsubscribe (struct stasis_topic *topic, struct stasis_subscription *sub)
 
static void stasis_cleanup (void)
 Cleanup function for graceful shutdowns.
 
static void * stasis_config_alloc (void)
 
static void stasis_config_destructor (void *obj)
 
static void stasis_declined_config_destructor (void *obj)
 
struct stasis_forward * stasis_forward_all (struct stasis_topic *from_topic, struct stasis_topic *to_topic)
 Create a subscription which forwards all messages from one topic to another.
 
struct stasis_forward * stasis_forward_cancel (struct stasis_forward *forward)
 
int stasis_init (void)
 Initialize the Stasis subsystem.
 
void stasis_log_bad_type_access (const char *name)
 
int stasis_message_type_declined (const char *name)
 Check whether a message type is declined.
 
 STASIS_MESSAGE_TYPE_DEFN (stasis_subscription_change_type)
 
void stasis_publish (struct stasis_topic *topic, struct stasis_message *message)
 Publish a message to a topic's subscribers.
 
void stasis_publish_sync (struct stasis_subscription *sub, struct stasis_message *message)
 Publish a message to a topic's subscribers, synchronizing on the specified subscriber.
 
static char * stasis_show_topic (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static char * stasis_show_topics (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
void stasis_subscription_accept_formatters (struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters)
 Indicate to a subscription that we are interested in messages with one or more formatters.
 
int stasis_subscription_accept_message_type (struct stasis_subscription *subscription, const struct stasis_message_type *type)
 Indicate to a subscription that we are interested in a message type.
 
void stasis_subscription_cb_noop (void *data, struct stasis_subscription *sub, struct stasis_message *message)
 Stasis subscription callback function that does nothing.
 
int stasis_subscription_decline_message_type (struct stasis_subscription *subscription, const struct stasis_message_type *type)
 Indicate to a subscription that we are not interested in a message type.
 
int stasis_subscription_final_message (struct stasis_subscription *sub, struct stasis_message *msg)
 Determine whether a message is the final message to be received on a subscription.
 
int stasis_subscription_is_done (struct stasis_subscription *subscription)
 Returns whether subscription has received its final message.
 
int stasis_subscription_is_subscribed (const struct stasis_subscription *sub)
 Returns whether a subscription is currently subscribed.
 
void stasis_subscription_join (struct stasis_subscription *subscription)
 Block until the last message is processed on a subscription.
 
int stasis_subscription_set_congestion_limits (struct stasis_subscription *subscription, long low_water, long high_water)
 Set the high and low alert water marks of the stasis subscription.
 
int stasis_subscription_set_filter (struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
 Set the message type filtering level on a subscription.
 
const char * stasis_subscription_uniqueid (const struct stasis_subscription *sub)
 Get the unique ID for the subscription.
 
struct stasis_topic * stasis_topic_create (const char *name)
 Create a new topic.
 
struct stasis_topic * stasis_topic_create_with_detail (const char *name, const char *detail)
 Create a new topic with given detail.
 
const char * stasis_topic_detail (const struct stasis_topic *topic)
 Return the detail of a topic.
 
struct stasis_topic * stasis_topic_get (const char *name)
 Get a topic of the given name.
 
const char * stasis_topic_name (const struct stasis_topic *topic)
 Return the name of a topic.
 
struct stasis_topic_pool * stasis_topic_pool_create (struct stasis_topic *pooled_topic)
 Create a topic pool that routes messages from dynamically generated topics to the given topic.
 
void stasis_topic_pool_delete_topic (struct stasis_topic_pool *pool, const char *topic_name)
 Delete a topic from the topic pool.
 
struct stasis_topic * stasis_topic_pool_get_topic (struct stasis_topic_pool *pool, const char *topic_name)
 Get a topic from the pool for the given name.
 
int stasis_topic_pool_topic_exists (const struct stasis_topic_pool *pool, const char *topic_name)
 Check if a topic exists in a pool.
 
size_t stasis_topic_subscribers (const struct stasis_topic *topic)
 Return the number of subscribers of a topic.
 
struct stasis_subscription * stasis_unsubscribe (struct stasis_subscription *sub)
 Cancel a subscription.
 
struct stasis_subscription * stasis_unsubscribe_and_join (struct stasis_subscription *subscription)
 Cancel a subscription, blocking until the last message is processed.
 
static int sub_cleanup (void *data)
 
static struct stasis_subscription_change * subscription_change_alloc (struct stasis_topic *topic, const char *uniqueid, const char *description)
 
static void subscription_change_dtor (void *obj)
 
static void subscription_dtor (void *obj)
 
static void subscription_invoke (struct stasis_subscription *sub, struct stasis_message *message)
 Invoke the subscription's callback.
 
static int topic_add_subscription (struct stasis_topic *topic, struct stasis_subscription *sub)
 Add a subscriber to a topic.
 
static char * topic_complete_name (const char *word)
 
static void topic_dtor (void *obj)
 
static void topic_pool_dtor (void *obj)
 
static struct topic_pool_entry * topic_pool_entry_alloc (const char *topic_name)
 
static int topic_pool_entry_cmp (void *obj, void *arg, int flags)
 
static void topic_pool_entry_dtor (void *obj)
 
static int topic_pool_entry_hash (const void *obj, const int flags)
 
static int topic_remove_subscription (struct stasis_topic *topic, struct stasis_subscription *sub)
 
static int userevent_exclusion_cb (const char *key)
 
 STASIS_MESSAGE_TYPE_DEFN (ast_multi_user_event_type,.to_json=multi_user_event_to_json,.to_ami=multi_user_event_to_ami,)
 Define multi user event message type(s).
 

Variables

static struct ast_cli_entry cli_stasis []
 
static struct aco_type declined_option
 An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type.
 
struct aco_type * declined_options [] = ACO_TYPES(&declined_option)
 
struct aco_file stasis_conf
 
static struct ast_taskpool * taskpool
 
static struct aco_type taskpool_option
 
static struct aco_type * taskpool_options [] = ACO_TYPES(&threadpool_option, &taskpool_option)
 
static struct aco_type threadpool_option
 
struct ao2_container * topic_all
 

Detailed Description

Stasis Message Bus API.

Author
David M. Lee, II dlee@digium.com

Definition in file stasis.c.

Macro Definition Documentation

◆ FMT_FIELDS

#define FMT_FIELDS   "%-64s %-64s\n"

◆ FMT_HEADERS

#define FMT_HEADERS   "%-64s %-64s\n"

◆ INITIAL_SUBSCRIBERS_MAX

#define INITIAL_SUBSCRIBERS_MAX   4

Initial size of the subscribers list.

Definition at line 368 of file stasis.c.

◆ TOPIC_ALL_BUCKETS

#define TOPIC_ALL_BUCKETS   997

Definition at line 384 of file stasis.c.

◆ topic_lock_both

#define topic_lock_both (   topic1,
  topic2 
)

Lock two topics.

Definition at line 492 of file stasis.c.

493 { \
494 ao2_lock(topic1); \
495 while (ao2_trylock(topic2)) { \
496 AO2_DEADLOCK_AVOIDANCE(topic1); \
497 } \
498 } while (0)
#define ao2_trylock(a)
Definition astobj2.h:739

◆ TOPIC_POOL_BUCKETS

#define TOPIC_POOL_BUCKETS   57

The number of buckets to use for topic pools

Definition at line 371 of file stasis.c.

Function Documentation

◆ __stasis_subscribe()

struct stasis_subscription * __stasis_subscribe ( struct stasis_topic *  topic,
stasis_subscription_cb  callback,
void *  data,
const char *  file,
int  lineno,
const char *  func 
)

Create a subscription.

In addition to being AO2 managed memory (requiring an ao2_cleanup() to free up this reference), the subscription must be explicitly unsubscribed from its topic using stasis_unsubscribe().

The invocations of the callback are serialized, but may not always occur on the same thread. The invocation order of different subscriptions is unspecified.

Parameters
topic: Topic to subscribe to.
callback: Callback function for subscription messages.
data: Data to be passed to the callback, in addition to the message.
file, lineno, func
Returns
New stasis_subscription object.
Return values
NULL: on error.
Since
12
Note
The subscription's callback will receive a message indicating that it has been subscribed. This occurs before accepted message types can be set, so the callback must expect to receive it.

Definition at line 1009 of file stasis.c.

1016{
1017 return internal_stasis_subscribe(topic, callback, data, 1, 0, file, lineno, func);
1018}
static struct ast_channel * callback(struct ast_channelstorage_instance *driver, ao2_callback_data_fn *cb_fn, void *arg, void *data, int ao2_flags, int rdlock)
struct stasis_subscription * internal_stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_taskpool, const char *file, int lineno, const char *func)
Create a subscription.
Definition stasis.c:923

References callback(), stasis_subscription::data, internal_stasis_subscribe(), and stasis_subscription::topic.

Referenced by stasis_message_router_create_internal().

◆ __stasis_subscribe_pool()

struct stasis_subscription * __stasis_subscribe_pool ( struct stasis_topic *  topic,
stasis_subscription_cb  callback,
void *  data,
const char *  file,
int  lineno,
const char *  func 
)

Create a subscription whose callbacks occur on a task pool.

In addition to being AO2 managed memory (requiring an ao2_cleanup() to free up this reference), the subscription must be explicitly unsubscribed from its topic using stasis_unsubscribe().

The invocations of the callback are serialized, but will almost certainly not always happen on the same thread. The invocation order of different subscriptions is unspecified.

Unlike stasis_subscribe, this function will explicitly use a taskpool to dispatch items to its callback. This form of subscription should be used when many subscriptions may be made to the specified topic.

Parameters
topic: Topic to subscribe to.
callback: Callback function for subscription messages.
data: Data to be passed to the callback, in addition to the message.
file, lineno, func
Returns
New stasis_subscription object.
Return values
NULL: on error.
Since
12.8.0
Note
The subscription's callback will receive a message indicating that it has been subscribed. This occurs before accepted message types can be set, so the callback must expect to receive it.

Definition at line 1020 of file stasis.c.

1027{
1028 return internal_stasis_subscribe(topic, callback, data, 1, 1, file, lineno, func);
1029}

References callback(), stasis_subscription::data, internal_stasis_subscribe(), and stasis_subscription::topic.

Referenced by stasis_message_router_create_internal().

◆ AO2_GLOBAL_OBJ_STATIC()

static AO2_GLOBAL_OBJ_STATIC ( globals  )
static

A global object container that will contain the stasis_config that gets swapped out on reloads.

◆ AO2_STRING_FIELD_CASE_SORT_FN()

AO2_STRING_FIELD_CASE_SORT_FN ( topic_proxy  ,
name   
)

◆ AO2_STRING_FIELD_CMP_FN()

AO2_STRING_FIELD_CMP_FN ( topic_proxy  ,
name   
)

◆ AO2_STRING_FIELD_HASH_FN()

AO2_STRING_FIELD_HASH_FN ( topic_proxy  ,
name   
)

◆ CONFIG_INFO_CORE()

CONFIG_INFO_CORE ( "stasis"  ,
cfg_info  ,
globals  ,
stasis_config_alloc  ,
.files = ACO_FILES(&stasis_conf) 
)

Register information about the configs being processed by this module.

◆ declined_handler()

static int declined_handler ( const struct aco_option *  opt,
struct ast_variable *  var,
void *  obj 
)
static

Definition at line 2515 of file stasis.c.

2516{
2517 struct stasis_declined_config *declined = obj;
2518
2519 if (ast_strlen_zero(var->value)) {
2520 return 0;
2521 }
2522
2523 if (ast_str_container_add(declined->declined, var->value)) {
2524 return -1;
2525 }
2526
2527 return 0;
2528}
#define var
Definition ast_expr2f.c:605
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition strings.h:65
int ast_str_container_add(struct ao2_container *str_container, const char *add)
Adds a string to a string container allocated by ast_str_container_alloc.
Definition strings.c:205
A structure to hold global configuration-related options.
Definition stasis.c:2381
struct ao2_container * declined
Definition stasis.c:2383

References ast_str_container_add(), ast_strlen_zero(), stasis_declined_config::declined, and var.

Referenced by stasis_init().

◆ dispatch_exec_async()

static int dispatch_exec_async ( struct ast_taskprocessor_local *  local)
static

Definition at line 1326 of file stasis.c.

1327{
1328 struct stasis_subscription *sub = local->local_data;
1329 struct stasis_message *message = local->data;
1330
1333
1334 return 0;
1335}
#define ao2_cleanup(obj)
Definition astobj2.h:1934
static struct stasis_subscription * sub
Statsd channel stats. Example of how to subscribe to Stasis events.
static void subscription_invoke(struct stasis_subscription *sub, struct stasis_message *message)
Invoke the subscription's callback.
Definition stasis.c:821

References ao2_cleanup, ast_taskprocessor_local::data, ast_taskprocessor_local::local_data, sub, and subscription_invoke().

Referenced by dispatch_message().

◆ dispatch_exec_sync()

static int dispatch_exec_sync ( struct ast_taskprocessor_local *  local)
static

Definition at line 1353 of file stasis.c.

1354{
1355 struct stasis_subscription *sub = local->local_data;
1356 struct sync_task_data *std = local->data;
1357 struct stasis_message *message = std->task_data;
1358
1361
1362 ast_mutex_lock(&std->lock);
1363 std->complete = 1;
1364 ast_cond_signal(&std->cond);
1365 ast_mutex_unlock(&std->lock);
1366
1367 return 0;
1368}
#define ast_mutex_unlock(a)
Definition lock.h:197
#define ast_mutex_lock(a)
Definition lock.h:196
#define ast_cond_signal(cond)
Definition lock.h:210
ast_cond_t cond
Definition stasis.c:1343
void * task_data
Definition stasis.c:1345
ast_mutex_t lock
Definition stasis.c:1342

References ao2_cleanup, ast_cond_signal, ast_mutex_lock, ast_mutex_unlock, sync_task_data::complete, sync_task_data::cond, ast_taskprocessor_local::data, ast_taskprocessor_local::local_data, sync_task_data::lock, sub, subscription_invoke(), and sync_task_data::task_data.

Referenced by dispatch_message().

◆ dispatch_message()

static unsigned int dispatch_message ( struct stasis_subscription *  sub,
struct stasis_message *  message,
int  synchronous 
)
static

Definition at line 1379 of file stasis.c.

1382{
1384
1385 /*
1386 * The 'do while' gives us an easy way to skip remaining logic once
1387 * we determine the message should be accepted.
1388 * The code looks more verbose than it needs to be but it optimizes
1389 * down very nicely. It's just easier to understand and debug this way.
1390 */
1391 do {
1392 struct stasis_message_type *message_type = stasis_message_type(message);
1393 int type_id = stasis_message_type_id(message_type);
1394 int type_filter_specified = 0;
1395 int formatter_filter_specified = 0;
1396 int type_filter_passed = 0;
1397 int formatter_filter_passed = 0;
1398
1399 /* We always accept final messages so only run the filter logic if not final */
1400 if (is_final) {
1401 break;
1402 }
1403
1404 type_filter_specified = sub->filter & STASIS_SUBSCRIPTION_FILTER_SELECTIVE;
1405 formatter_filter_specified = sub->accepted_formatters != STASIS_SUBSCRIPTION_FORMATTER_NONE;
1406
1407 /* Accept if no filters of either type were specified */
1408 if (!type_filter_specified && !formatter_filter_specified) {
1409 break;
1410 }
1411
1412 type_filter_passed = type_filter_specified
1415
1416 /*
1417 * Since the type and formatter filters are OR'd, we can skip
1418 * the formatter check if the type check passes.
1419 */
1420 if (type_filter_passed) {
1421 break;
1422 }
1423
1424 formatter_filter_passed = formatter_filter_specified
1426
1427 if (formatter_filter_passed) {
1428 break;
1429 }
1430
1431#ifdef AST_DEVMODE
1432 ast_atomic_fetchadd_int(&sub->statistics->messages_dropped, +1);
1433#endif
1434
1435 return 0;
1436
1437 } while (0);
1438
1439#ifdef AST_DEVMODE
1440 ast_atomic_fetchadd_int(&sub->statistics->messages_passed, +1);
1441#endif
1442
1443 if (!sub->mailbox) {
1444 /* Dispatch directly */
1446 return 1;
1447 }
1448
1449 /* Bump the message for the taskprocessor push. This will get de-ref'd
1450 * by the task processor callback.
1451 */
1453 if (!synchronous) {
1455 /* Push failed; ugh. */
1456 ast_log(LOG_ERROR, "Dropping async dispatch\n");
1458 return 0;
1459 }
1460 } else {
1461 struct sync_task_data std;
1462
1463 ast_mutex_init(&std.lock);
1464 ast_cond_init(&std.cond, NULL);
1465 std.complete = 0;
1466 std.task_data = message;
1467
1469 /* Push failed; ugh. */
1470 ast_log(LOG_ERROR, "Dropping sync dispatch\n");
1472 ast_mutex_destroy(&std.lock);
1473 ast_cond_destroy(&std.cond);
1474 return 0;
1475 }
1476
1477 ast_mutex_lock(&std.lock);
1478 while (!std.complete) {
1479 ast_cond_wait(&std.cond, &std.lock);
1480 }
1481 ast_mutex_unlock(&std.lock);
1482
1483 ast_mutex_destroy(&std.lock);
1484 ast_cond_destroy(&std.cond);
1485 }
1486
1487 return 1;
1488}
#define ast_log
Definition astobj2.c:42
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition astobj2.h:480
#define LOG_ERROR
#define ast_cond_destroy(cond)
Definition lock.h:209
#define ast_cond_wait(cond, mutex)
Definition lock.h:212
#define ast_cond_init(cond, attr)
Definition lock.h:208
#define ast_mutex_init(pmutex)
Definition lock.h:193
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition lock.h:764
#define ast_mutex_destroy(a)
Definition lock.h:195
#define NULL
Definition resample.c:96
static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
Definition stasis.c:1353
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
Definition stasis.c:1241
static int dispatch_exec_async(struct ast_taskprocessor_local *local)
Definition stasis.c:1326
@ STASIS_SUBSCRIPTION_FILTER_SELECTIVE
Definition stasis.h:297
@ STASIS_SUBSCRIPTION_FORMATTER_NONE
Definition stasis.h:309
int stasis_message_type_id(const struct stasis_message_type *type)
Gets the id of a given message type.
enum stasis_subscription_message_formatters stasis_message_type_available_formatters(const struct stasis_message_type *message_type)
Get a bitmap of available formatters for a message type.
struct stasis_subscription::@427 accepted_message_types
struct ast_taskprocessor * mailbox
Definition stasis.c:753
enum stasis_subscription_message_filter filter
Definition stasis.c:773
enum stasis_subscription_message_formatters accepted_formatters
Definition stasis.c:771
#define ast_taskprocessor_push_local(tps, task_exe, datap)
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition vector.h:620
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition vector.h:691

References stasis_subscription::accepted_formatters, stasis_subscription::accepted_message_types, ao2_bump, ao2_cleanup, ast_atomic_fetchadd_int(), ast_cond_destroy, ast_cond_init, ast_cond_wait, ast_log, ast_mutex_destroy, ast_mutex_init, ast_mutex_lock, ast_mutex_unlock, ast_taskprocessor_push_local, AST_VECTOR_GET, AST_VECTOR_SIZE, sync_task_data::complete, sync_task_data::cond, dispatch_exec_async(), dispatch_exec_sync(), stasis_subscription::filter, sync_task_data::lock, LOG_ERROR, stasis_subscription::mailbox, NULL, stasis_message_type_available_formatters(), stasis_message_type_id(), STASIS_SUBSCRIPTION_FILTER_SELECTIVE, stasis_subscription_final_message(), STASIS_SUBSCRIPTION_FORMATTER_NONE, sub, subscription_invoke(), and sync_task_data::task_data.

Referenced by publish_msg(), and send_subscription_unsubscribe().

◆ forward_dtor()

static void forward_dtor ( void *  obj)
static

Definition at line 1605 of file stasis.c.

1606{
1607 struct stasis_forward *forward = obj;
1608
1609 ao2_cleanup(forward->from_topic);
1610 forward->from_topic = NULL;
1611 ao2_cleanup(forward->to_topic);
1612 forward->to_topic = NULL;
1613}
Forwarding information.
Definition stasis.c:1598
struct stasis_topic * from_topic
Definition stasis.c:1600
struct stasis_topic * to_topic
Definition stasis.c:1602

References ao2_cleanup, stasis_forward::from_topic, NULL, and stasis_forward::to_topic.

Referenced by stasis_forward_all().

◆ internal_stasis_subscribe()

struct stasis_subscription * internal_stasis_subscribe ( struct stasis_topic *  topic,
stasis_subscription_cb  callback,
void *  data,
int  needs_mailbox,
int  use_task_pool,
const char *  file,
int  lineno,
const char *  func 
)

Create a subscription.

In addition to being AO2 managed memory (requiring an ao2_cleanup() to free up this reference), the subscription must be explicitly unsubscribed from its topic using stasis_unsubscribe().

The invocations of the callback are serialized, but may not always occur on the same thread. The invocation order of different subscriptions is unspecified.

Note: modules outside of Stasis should use stasis_subscribe.

Parameters
topic: Topic to subscribe to.
callback: Callback function for subscription messages.
data: Data to be passed to the callback, in addition to the message.
needs_mailbox: Determines whether or not the subscription requires a mailbox. Subscriptions with mailboxes will be delivered on some non-publisher thread; subscriptions without mailboxes will be delivered on the publisher thread.
use_task_pool: Use the task pool for the subscription. This is only relevant if needs_mailbox is non-zero.
file, lineno, func
Returns
New stasis_subscription object.
Return values
NULL: on error.
Since
12

Definition at line 923 of file stasis.c.

932{
933 struct stasis_subscription *sub;
934 int ret;
935
936 if (!topic) {
937 return NULL;
938 }
939
940 /* The ao2 lock is used for join_cond. */
942 if (!sub) {
943 return NULL;
944 }
945
946#ifdef AST_DEVMODE
948 sub->statistics = stasis_subscription_statistics_create(sub, needs_mailbox, use_taskpool, file, lineno, func);
949 if (ret < 0 || !sub->statistics) {
950 ao2_ref(sub, -1);
951 return NULL;
952 }
953#else
955 if (ret < 0) {
956 ao2_ref(sub, -1);
957 return NULL;
958 }
959#endif
960
961 if (needs_mailbox) {
962 char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
963
964 /* Create name with seq number appended. */
965 ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "stasis/%c:%s",
966 use_taskpool ? 'p' : 'm',
968
969 /*
970 * With a small number of subscribers, a thread-per-sub is
971 * acceptable. For a large number of subscribers, a thread
972 * pool should be used.
973 */
974 if (use_taskpool) {
976 } else {
978 }
979 if (!sub->mailbox) {
980 ao2_ref(sub, -1);
981
982 return NULL;
983 }
985 /* Taskprocessor has a reference */
986 ao2_ref(sub, +1);
987 }
988
989 ao2_ref(topic, +1);
990 sub->topic = topic;
992 sub->data = data;
997
998 if (topic_add_subscription(topic, sub) != 0) {
999 ao2_ref(sub, -1);
1000 ao2_ref(topic, -1);
1001
1002 return NULL;
1003 }
1005
1006 return sub;
1007}
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition astmm.h:267
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition astobj2.h:459
#define ao2_t_alloc(data_size, destructor_fn, debug_msg)
Definition astobj2.h:407
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition stasis.c:694
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Add a subscriber to a topic.
Definition stasis.c:1268
static void subscription_dtor(void *obj)
Definition stasis.c:781
static struct ast_taskpool * taskpool
Definition stasis.c:374
static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition stasis.c:1715
@ STASIS_SUBSCRIPTION_FILTER_NONE
Definition stasis.h:295
struct stasis_topic * topic
Definition stasis.c:751
ast_cond_t join_cond
Definition stasis.c:760
stasis_subscription_cb callback
Definition stasis.c:755
int subscriber_id
Definition stasis.c:450
struct ast_taskprocessor * ast_taskpool_serializer(const char *name, struct ast_taskpool *pool)
Serialized execution of tasks within a ast_taskpool.
Definition taskpool.c:860
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
Sets the local data associated with a taskprocessor.
@ TPS_REF_DEFAULT
return a reference to a taskprocessor, create one if it does not exist
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format,...)
Build a taskprocessor name with a sequence number on the end.
#define AST_TASKPROCESSOR_MAX_NAME
Suggested maximum taskprocessor name length (less null terminator).
static void statistics(void)
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition vector.h:124

References stasis_subscription::accepted_formatters, stasis_subscription::accepted_message_types, ao2_ref, ao2_t_alloc, ast_asprintf, ast_atomic_fetchadd_int(), ast_cond_init, ast_taskpool_serializer(), ast_taskprocessor_build_name(), ast_taskprocessor_get(), AST_TASKPROCESSOR_MAX_NAME, ast_taskprocessor_set_local(), AST_VECTOR_INIT, callback(), stasis_subscription::callback, stasis_subscription::data, stasis_subscription::filter, stasis_subscription::join_cond, stasis_subscription::mailbox, NULL, send_subscription_subscribe(), STASIS_SUBSCRIPTION_FILTER_NONE, STASIS_SUBSCRIPTION_FORMATTER_NONE, stasis_topic_name(), statistics(), sub, stasis_topic::subscriber_id, subscription_dtor(), taskpool, stasis_subscription::topic, topic_add_subscription(), TPS_REF_DEFAULT, and stasis_subscription::uniqueid.

Referenced by __stasis_subscribe(), __stasis_subscribe_pool(), and stasis_caching_topic_create().

◆ link_topic_proxy()

static int link_topic_proxy ( struct stasis_topic *  topic,
const char *  name,
const char *  detail 
)
static

Definition at line 567 of file stasis.c.

568{
569 struct topic_proxy *proxy;
570 struct stasis_topic* topic_tmp;
571 size_t detail_len;
572
573 if (!topic || !name || !strlen(name) || !detail) {
574 return -1;
575 }
576
578
579 topic_tmp = stasis_topic_get(name);
580 if (topic_tmp) {
581 ast_log(LOG_ERROR, "The same topic is already exist. name: %s\n", name);
582 ao2_ref(topic_tmp, -1);
584
585 return -1;
586 }
587
588 detail_len = strlen(detail) + 1;
589
590 proxy = ao2_t_weakproxy_alloc(
591 sizeof(*proxy) + strlen(name) + 1 + detail_len, NULL, name);
592 if (!proxy) {
594
595 return -1;
596 }
597
598 /* set the proxy info */
599 proxy->name = proxy->buf;
600 proxy->detail = proxy->name + strlen(name) + 1;
601
602 strcpy(proxy->name, name); /* SAFE */
603 ast_copy_string(proxy->detail, detail, detail_len); /* SAFE */
604 proxy->creationtime = ast_tvnow();
605
606 /* We have exclusive access to proxy, no need for locking here. */
607 if (ao2_t_weakproxy_set_object(proxy, topic, OBJ_NOLOCK, "weakproxy link")) {
608 ao2_cleanup(proxy);
610
611 return -1;
612 }
613
615 ao2_cleanup(proxy);
618
619 return -1;
620 }
621
622 /* setting the topic point to the proxy */
623 topic->name = proxy->name;
624 topic->detail = proxy->detail;
625 topic->creationtime = &(proxy->creationtime);
626
628 ao2_ref(proxy, -1);
629
631
632 return 0;
633}
int ao2_weakproxy_subscribe(void *weakproxy, ao2_weakproxy_notification_cb cb, void *data, int flags)
Request notification when weakproxy points to NULL.
Definition astobj2.c:934
#define ao2_wrlock(a)
Definition astobj2.h:719
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
Definition astobj2.h:1554
#define ao2_t_weakproxy_alloc(data_size, destructor_fn, tag)
Definition astobj2.h:553
#define ao2_unlock(a)
Definition astobj2.h:729
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
Definition astobj2.h:1063
#define ao2_t_weakproxy_set_object(weakproxy, obj, flags, tag)
Definition astobj2.h:582
static const char name[]
Definition format_mp3.c:68
struct stasis_topic * stasis_topic_get(const char *name)
Get a topic of the given name.
Definition stasis.c:689
static void proxy_dtor(void *weakproxy, void *container)
Definition stasis.c:479
struct ao2_container * topic_all
Definition stasis.c:462
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
Definition strings.h:425
char * name
Definition stasis.c:453
char * detail
Definition stasis.c:456
struct timeval * creationtime
Definition stasis.c:459
char buf[0]
Definition stasis.c:472
struct timeval creationtime
Definition stasis.c:470
char * name
Definition stasis.c:467
char * detail
Definition stasis.c:468
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition time.h:159

References ao2_bump, ao2_cleanup, ao2_link_flags, ao2_ref, ao2_t_weakproxy_alloc, ao2_t_weakproxy_set_object, ao2_unlock, ao2_weakproxy_subscribe(), ao2_wrlock, ast_copy_string(), ast_log, ast_tvnow(), topic_proxy::buf, stasis_topic::creationtime, topic_proxy::creationtime, stasis_topic::detail, topic_proxy::detail, LOG_ERROR, name, stasis_topic::name, topic_proxy::name, NULL, OBJ_NOLOCK, proxy_dtor(), stasis_topic_get(), and topic_all.

Referenced by stasis_topic_create_with_detail().

◆ multi_object_blob_dtor()

static void multi_object_blob_dtor ( void *  obj)
static

Definition at line 2160 of file stasis.c.

2161{
2162 struct ast_multi_object_blob *multi = obj;
2163 int type;
2164 int i;
2165
2166 for (type = 0; type < STASIS_UMOS_MAX; ++type) {
2167 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
2169 }
2170 AST_VECTOR_FREE(&multi->snapshots[type]);
2171 }
2172 ast_json_unref(multi->blob);
2173}
static const char type[]
#define STASIS_UMOS_MAX
Number of snapshot types.
Definition stasis.h:1360
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition json.c:73
A multi object blob data structure to carry user event stasis messages.
Definition stasis.c:2151
struct ast_multi_object_blob::@428 snapshots[STASIS_UMOS_MAX]
struct ast_json * blob
Definition stasis.c:2152
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition vector.h:185

References ao2_cleanup, ast_json_unref(), AST_VECTOR_FREE, AST_VECTOR_GET, AST_VECTOR_SIZE, ast_multi_object_blob::blob, ast_multi_object_blob::snapshots, STASIS_UMOS_MAX, and type.

Referenced by ast_multi_object_blob_create().

◆ multi_object_blob_to_ami()

static struct ast_str * multi_object_blob_to_ami ( void *  obj)
static

Definition at line 2297 of file stasis.c.

2298{
2299 struct ast_str *ami_str=ast_str_create(1024);
2300 struct ast_str *ami_snapshot;
2301 const struct ast_multi_object_blob *multi = obj;
2303 int i;
2304
2305 if (!ami_str) {
2306 return NULL;
2307 }
2308 if (!multi) {
2309 ast_free(ami_str);
2310 return NULL;
2311 }
2312
2313 for (type = 0; type < STASIS_UMOS_MAX; ++type) {
2314 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
2315 char *name = NULL;
2316 void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
2317 ami_snapshot = NULL;
2318
2319 if (i > 0) {
2320 ast_asprintf(&name, "%d", i + 1);
2321 }
2322
2323 switch (type) {
2325 ami_snapshot = ast_manager_build_channel_state_string_prefix(snapshot, name ?: "");
2326 break;
2327
2328 case STASIS_UMOS_BRIDGE:
2329 ami_snapshot = ast_manager_build_bridge_state_string_prefix(snapshot, name ?: "");
2330 break;
2331
2333 /* currently not sending endpoint snapshots to AMI */
2334 break;
2335 }
2336 if (ami_snapshot) {
2337 ast_str_append(&ami_str, 0, "%s", ast_str_buffer(ami_snapshot));
2338 ast_free(ami_snapshot);
2339 }
2340 ast_free(name);
2341 }
2342 }
2343
2344 return ami_str;
2345}
#define ast_free(a)
Definition astmm.h:180
stasis_user_multi_object_snapshot_type
Object type code for multi user object snapshots.
Definition stasis.h:1353
@ STASIS_UMOS_ENDPOINT
Definition stasis.h:1356
@ STASIS_UMOS_BRIDGE
Definition stasis.h:1355
@ STASIS_UMOS_CHANNEL
Definition stasis.h:1354
struct ast_str * ast_manager_build_bridge_state_string_prefix(const struct ast_bridge_snapshot *snapshot, const char *prefix)
Generate the AMI message body from a bridge snapshot.
struct ast_str * ast_manager_build_channel_state_string_prefix(const struct ast_channel_snapshot *snapshot, const char *prefix)
Generate the AMI message body from a channel snapshot.
int ast_str_append(struct ast_str **buf, ssize_t max_len, const char *fmt,...)
Append to a thread local dynamic string.
Definition strings.h:1139
#define ast_str_create(init_len)
Create a malloc'ed dynamic length string.
Definition strings.h:659
char *attribute_pure ast_str_buffer(const struct ast_str *buf)
Returns the string buffer within the ast_str buf.
Definition strings.h:761
Support for dynamic strings.
Definition strings.h:623

References ast_asprintf, ast_free, ast_manager_build_bridge_state_string_prefix(), ast_manager_build_channel_state_string_prefix(), ast_str_append(), ast_str_buffer(), ast_str_create, AST_VECTOR_GET, AST_VECTOR_SIZE, name, NULL, ast_multi_object_blob::snapshots, STASIS_UMOS_BRIDGE, STASIS_UMOS_CHANNEL, STASIS_UMOS_ENDPOINT, STASIS_UMOS_MAX, and type.

Referenced by multi_user_event_to_ami().

◆ multi_user_event_to_ami()

static struct ast_manager_event_blob * multi_user_event_to_ami ( struct stasis_message * message)
static

Definition at line 2356 of file stasis.c.

2358{
2359 RAII_VAR(struct ast_str *, object_string, NULL, ast_free);
2360 RAII_VAR(struct ast_str *, body, NULL, ast_free);
2362 const char *eventname;
2363
2364 eventname = ast_json_string_get(ast_json_object_get(multi->blob, "eventname"));
2366 object_string = multi_object_blob_to_ami(multi);
2367 if (!object_string || !body) {
2368 return NULL;
2369 }
2370
2372 "%s"
2373 "UserEvent: %s\r\n"
2374 "%s",
2375 ast_str_buffer(object_string),
2376 eventname,
2377 ast_str_buffer(body));
2378}
struct ast_str * ast_manager_str_from_json_object(struct ast_json *blob, key_exclusion_cb exclusion_cb)
Convert a JSON object into an AMI compatible string.
Definition manager.c:551
const char * ast_json_string_get(const struct ast_json *string)
Get the value of a JSON string.
Definition json.c:283
struct ast_json * ast_json_object_get(struct ast_json *object, const char *key)
Get a field from a JSON object.
Definition json.c:407
struct ast_manager_event_blob * ast_manager_event_blob_create(int event_flags, const char *manager_event, const char *extra_fields_fmt,...)
Construct a ast_manager_event_blob.
Definition manager.c:10144
#define EVENT_FLAG_USER
Definition manager.h:81
static struct ast_str * multi_object_blob_to_ami(void *obj)
Definition stasis.c:2297
static int userevent_exclusion_cb(const char *key)
Definition stasis.c:2348
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition utils.h:981

References ast_free, ast_json_object_get(), ast_json_string_get(), ast_manager_event_blob_create(), ast_manager_str_from_json_object(), ast_str_buffer(), ast_multi_object_blob::blob, EVENT_FLAG_USER, multi_object_blob_to_ami(), NULL, RAII_VAR, stasis_message_data(), and userevent_exclusion_cb().

◆ multi_user_event_to_json()

static struct ast_json * multi_user_event_to_json ( struct stasis_message * message,
const struct stasis_message_sanitizer * sanitize 
)
static

Definition at line 2246 of file stasis.c.

2249{
2250 struct ast_json *out;
2252 struct ast_json *blob = multi->blob;
2253 const struct timeval *tv = stasis_message_timestamp(message);
2255 int i;
2256
2258 if (!out) {
2259 return NULL;
2260 }
2261
2262 ast_json_object_set(out, "type", ast_json_string_create("ChannelUserevent"));
2263 ast_json_object_set(out, "timestamp", ast_json_timeval(*tv, NULL));
2264 ast_json_object_set(out, "eventname", ast_json_ref(ast_json_object_get(blob, "eventname")));
2265 ast_json_object_set(out, "userevent", ast_json_ref(blob));
2266
2267 for (type = 0; type < STASIS_UMOS_MAX; ++type) {
2268 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
2269 struct ast_json *json_object = NULL;
2270 char *name = NULL;
2271 void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
2272
2273 switch (type) {
2275 json_object = ast_channel_snapshot_to_json(snapshot, sanitize);
2276 name = "channel";
2277 break;
2278 case STASIS_UMOS_BRIDGE:
2279 json_object = ast_bridge_snapshot_to_json(snapshot, sanitize);
2280 name = "bridge";
2281 break;
2283 json_object = ast_endpoint_snapshot_to_json(snapshot, sanitize);
2284 name = "endpoint";
2285 break;
2286 }
2287 if (json_object) {
2288 ast_json_object_set(out, name, json_object);
2289 }
2290 }
2291 }
2292
2293 return out;
2294}
struct ast_json * ast_json_string_create(const char *value)
Construct a JSON string from value.
Definition json.c:278
struct ast_json * ast_json_object_create(void)
Create a new JSON object.
Definition json.c:399
struct ast_json * ast_json_timeval(const struct timeval tv, const char *zone)
Construct a timeval as JSON.
Definition json.c:670
struct ast_json * ast_json_ref(struct ast_json *value)
Increase refcount on value.
Definition json.c:67
int ast_json_object_set(struct ast_json *object, const char *key, struct ast_json *value)
Set a field in a JSON object.
Definition json.c:414
const struct timeval * stasis_message_timestamp(const struct stasis_message *msg)
Get the time when a message was created.
struct ast_json * ast_bridge_snapshot_to_json(const struct ast_bridge_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_bridge_snapshot.
struct ast_json * ast_channel_snapshot_to_json(const struct ast_channel_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_channel_snapshot.
struct ast_json * ast_endpoint_snapshot_to_json(const struct ast_endpoint_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_endpoint_snapshot.
Abstract JSON element (object, array, string, int, ...).
FILE * out
Definition utils/frame.c:33

References ast_bridge_snapshot_to_json(), ast_channel_snapshot_to_json(), ast_endpoint_snapshot_to_json(), ast_json_object_create(), ast_json_object_get(), ast_json_object_set(), ast_json_ref(), ast_json_string_create(), ast_json_timeval(), AST_VECTOR_GET, AST_VECTOR_SIZE, ast_multi_object_blob::blob, name, NULL, out, ast_multi_object_blob::snapshots, stasis_message_data(), stasis_message_timestamp(), STASIS_UMOS_BRIDGE, STASIS_UMOS_CHANNEL, STASIS_UMOS_ENDPOINT, STASIS_UMOS_MAX, and type.

◆ proxy_dtor()

static void proxy_dtor ( void *  weakproxy,
void *  container 
)
static

Definition at line 479 of file stasis.c.

480{
481 ao2_unlink(container, weakproxy);
483}
#define ao2_unlink(container, obj)
Remove an object from a container.
Definition astobj2.h:1578
struct ao2_container * container
Definition res_fax.c:603

References ao2_cleanup, ao2_unlink, and container.

Referenced by link_topic_proxy().

◆ publish_msg()

static void publish_msg ( struct stasis_topic * topic,
struct stasis_message * message,
struct stasis_subscription * sync_sub 
)
static

Definition at line 1497 of file stasis.c.

1499{
1500 size_t i;
1501#ifdef AST_DEVMODE
1502 unsigned int dispatched = 0;
1504 struct stasis_message_type_statistics *statistics;
1505 struct timeval start;
1506 long elapsed;
1507#endif
1508
1509 ast_assert(topic != NULL);
1511
1512#ifdef AST_DEVMODE
1513 ast_mutex_lock(&message_type_statistics_lock);
1514 if (message_type_id >= AST_VECTOR_SIZE(&message_type_statistics)) {
1515 struct stasis_message_type_statistics new_statistics = {
1516 .published = 0,
1517 };
1518 if (AST_VECTOR_REPLACE(&message_type_statistics, message_type_id, new_statistics)) {
1519 ast_mutex_unlock(&message_type_statistics_lock);
1520 return;
1521 }
1522 }
1523 statistics = AST_VECTOR_GET_ADDR(&message_type_statistics, message_type_id);
1524 statistics->message_type = stasis_message_type(message);
1525 ast_mutex_unlock(&message_type_statistics_lock);
1526
1527 ast_atomic_fetchadd_int(&statistics->published, +1);
1528#endif
1529
1530 /* If there are no subscribers don't bother */
1531 if (!stasis_topic_subscribers(topic)) {
1532#ifdef AST_DEVMODE
1533 ast_atomic_fetchadd_int(&statistics->unused, +1);
1534 ast_atomic_fetchadd_int(&topic->statistics->messages_not_dispatched, +1);
1535#endif
1536 return;
1537 }
1538
1539 /*
1540 * The topic may be unref'ed by the subscription invocation.
1541 * Make sure we hold onto a reference while dispatching.
1542 */
1543 ao2_ref(topic, +1);
1544#ifdef AST_DEVMODE
1545 start = ast_tvnow();
1546#endif
1547 ao2_lock(topic);
1548 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
1550
1551 ast_assert(sub != NULL);
1552#ifdef AST_DEVMODE
1553 dispatched +=
1554#endif
1555 dispatch_message(sub, message, (sub == sync_sub));
1556 }
1558
1559#ifdef AST_DEVMODE
1560 elapsed = ast_tvdiff_ms(ast_tvnow(), start);
1561 if (elapsed > topic->statistics->highest_time_dispatched) {
1562 topic->statistics->highest_time_dispatched = elapsed;
1563 }
1564 if (elapsed < topic->statistics->lowest_time_dispatched) {
1565 topic->statistics->lowest_time_dispatched = elapsed;
1566 }
1567 if (dispatched) {
1568 ast_atomic_fetchadd_int(&topic->statistics->messages_dispatched, +1);
1569 } else {
1570 ast_atomic_fetchadd_int(&statistics->unused, +1);
1571 ast_atomic_fetchadd_int(&topic->statistics->messages_not_dispatched, +1);
1572 }
1573#endif
1574
1575 ao2_ref(topic, -1);
1576}
#define ao2_lock(a)
Definition astobj2.h:717
size_t stasis_topic_subscribers(const struct stasis_topic *topic)
Return the number of subscribers of a topic.
Definition stasis.c:710
static unsigned int dispatch_message(struct stasis_subscription *sub, struct stasis_message *message, int synchronous)
Definition stasis.c:1379
static int message_type_id
struct stasis_topic::@425 subscribers
int64_t ast_tvdiff_ms(struct timeval end, struct timeval start)
Computes the difference (in milliseconds) between two struct timeval instances.
Definition time.h:107
#define ast_assert(a)
Definition utils.h:779
#define AST_VECTOR_REPLACE(vec, idx, elem)
Replace an element at a specific position in a vector, growing the vector if needed.
Definition vector.h:295
#define AST_VECTOR_GET_ADDR(vec, idx)
Get an address of element in a vector.
Definition vector.h:679

References ao2_lock, ao2_ref, ao2_unlock, ast_assert, ast_atomic_fetchadd_int(), ast_mutex_lock, ast_mutex_unlock, ast_tvdiff_ms(), ast_tvnow(), AST_VECTOR_GET, AST_VECTOR_GET_ADDR, AST_VECTOR_REPLACE, AST_VECTOR_SIZE, dispatch_message(), message_type_id, NULL, stasis_message_type_id(), stasis_topic_subscribers(), statistics(), sub, stasis_topic::subscribers, and stasis_subscription::topic.

Referenced by stasis_publish(), and stasis_publish_sync().

◆ send_subscription_subscribe()

static void send_subscription_subscribe ( struct stasis_topic * topic,
struct stasis_subscription * sub 
)
static

Definition at line 1715 of file stasis.c.

1716{
1717 struct stasis_subscription_change *change;
1718 struct stasis_message *msg;
1719
1720 /* This assumes that we have already unsubscribed */
1722
1724 return;
1725 }
1726
1727 change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
1728 if (!change) {
1729 return;
1730 }
1731
1733 if (!msg) {
1734 ao2_cleanup(change);
1735 return;
1736 }
1737
1738 stasis_publish(topic, msg);
1739 ao2_cleanup(msg);
1740 ao2_cleanup(change);
1741}
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
static struct stasis_subscription_change * subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
Definition stasis.c:1694
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
Definition stasis.c:1578
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
Returns whether a subscription is currently subscribed.
Definition stasis.c:1217
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
Holds details about changes to subscriptions for the specified topic.
Definition stasis.h:890

References ao2_cleanup, ast_assert, stasis_message_create(), stasis_publish(), stasis_subscription_change_type(), stasis_subscription_is_subscribed(), sub, subscription_change_alloc(), and stasis_subscription::uniqueid.

Referenced by internal_stasis_subscribe().

◆ send_subscription_unsubscribe()

static void send_subscription_unsubscribe ( struct stasis_topic * topic,
struct stasis_subscription * sub 
)
static

Definition at line 1743 of file stasis.c.

1745{
1746 struct stasis_subscription_change *change;
1747 struct stasis_message *msg;
1748
1749 /* This assumes that we have already unsubscribed */
1751
1753 return;
1754 }
1755
1756 change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
1757 if (!change) {
1758 return;
1759 }
1760
1762 if (!msg) {
1763 ao2_cleanup(change);
1764 return;
1765 }
1766
1767 stasis_publish(topic, msg);
1768
1769 /* Now we have to dispatch to the subscription itself */
1770 dispatch_message(sub, msg, 0);
1771
1772 ao2_cleanup(msg);
1773 ao2_cleanup(change);
1774}

References ao2_cleanup, ast_assert, dispatch_message(), stasis_message_create(), stasis_publish(), stasis_subscription_change_type(), stasis_subscription_is_subscribed(), sub, subscription_change_alloc(), and stasis_subscription::uniqueid.

Referenced by stasis_unsubscribe().

◆ stasis_cleanup()

static void stasis_cleanup ( void  )
static

Cleanup function for graceful shutdowns.

Definition at line 3253 of file stasis.c.

3254{
3255#ifdef AST_DEVMODE
3256 ast_cli_unregister_multiple(cli_stasis_statistics, ARRAY_LEN(cli_stasis_statistics));
3257 AST_VECTOR_FREE(&message_type_statistics);
3258 ao2_global_obj_release(subscription_statistics);
3259 ao2_global_obj_release(topic_statistics);
3260#endif
3263 topic_all = NULL;
3265 taskpool = NULL;
3268 aco_info_destroy(&cfg_info);
3270}
void ast_cli_unregister_multiple(void)
Definition ael_main.c:408
#define ao2_global_obj_release(holder)
Release the ao2 object held in the global holder.
Definition astobj2.h:859
static struct console_pvt globals
void aco_info_destroy(struct aco_info *info)
Destroy an initialized aco_info struct.
struct stasis_message_type * ast_multi_user_event_type(void)
Message type for custom user defined events with multi object blobs.
static struct ast_cli_entry cli_stasis[]
Definition stasis.c:2687
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
Definition stasis.h:1515
void ast_taskpool_shutdown(struct ast_taskpool *pool)
Shut down a taskpool and remove the underlying taskprocessors.
Definition taskpool.c:675
#define ARRAY_LEN(a)
Definition utils.h:706

References aco_info_destroy(), ao2_cleanup, ao2_global_obj_release, ARRAY_LEN, ast_cli_unregister_multiple(), ast_multi_user_event_type(), ast_taskpool_shutdown(), AST_VECTOR_FREE, cli_stasis, globals, NULL, STASIS_MESSAGE_TYPE_CLEANUP, stasis_subscription_change_type(), taskpool, and topic_all.

Referenced by stasis_init().

◆ stasis_config_alloc()

static void * stasis_config_alloc ( void  )
static

Definition at line 2464 of file stasis.c.

2465{
2466 struct stasis_config *cfg;
2467
2468 if (!(cfg = ao2_alloc(sizeof(*cfg), stasis_config_destructor))) {
2469 return NULL;
2470 }
2471
2472 cfg->taskpool_options = ast_calloc(1, sizeof(*cfg->taskpool_options));
2473 if (!cfg->taskpool_options) {
2474 ao2_ref(cfg, -1);
2475 return NULL;
2476 }
2477
2480 if (!cfg->declined_message_types) {
2481 ao2_ref(cfg, -1);
2482 return NULL;
2483 }
2484
2486 if (!cfg->declined_message_types->declined) {
2487 ao2_ref(cfg, -1);
2488 return NULL;
2489 }
2490
2491 return cfg;
2492}
#define ast_calloc(num, len)
A wrapper for calloc()
Definition astmm.h:202
#define ao2_alloc(data_size, destructor_fn)
Definition astobj2.h:409
static void stasis_config_destructor(void *obj)
Definition stasis.c:2456
static void stasis_declined_config_destructor(void *obj)
Definition stasis.c:2449
#define ast_str_container_alloc(buckets)
Allocates a hash container for bare strings.
Definition strings.h:1365
struct stasis_declined_config * declined_message_types
Definition stasis.c:2402
struct stasis_taskpool_conf * taskpool_options
Definition stasis.c:2400

References ao2_alloc, ao2_ref, ast_calloc, ast_str_container_alloc, stasis_declined_config::declined, stasis_config::declined_message_types, NULL, stasis_config_destructor(), stasis_declined_config_destructor(), and stasis_config::taskpool_options.

Referenced by stasis_init().

◆ stasis_config_destructor()

static void stasis_config_destructor ( void *  obj)
static

Definition at line 2456 of file stasis.c.

2457{
2458 struct stasis_config *cfg = obj;
2459
2462}

References ao2_cleanup, ast_free, stasis_config::declined_message_types, and stasis_config::taskpool_options.

Referenced by stasis_config_alloc().

◆ stasis_declined_config_destructor()

static void stasis_declined_config_destructor ( void *  obj)
static

Definition at line 2449 of file stasis.c.

2450{
2451 struct stasis_declined_config *declined = obj;
2452
2453 ao2_cleanup(declined->declined);
2454}

References ao2_cleanup, and stasis_declined_config::declined.

Referenced by stasis_config_alloc().

◆ stasis_forward_all()

struct stasis_forward * stasis_forward_all ( struct stasis_topic * from_topic,
struct stasis_topic * to_topic 
)

Create a subscription which forwards all messages from one topic to another.

Note that the topic parameter of the invoked callback will be the topic the message was sent to, not the topic the subscriber subscribed to.

Parameters
from_topic: Topic to forward.
to_topic: Destination topic of forwarded messages.
Returns
New forwarding subscription.
Return values
NULL on error.
Since
12

Definition at line 1645 of file stasis.c.

1647{
1648 int res;
1649 size_t idx;
1650 struct stasis_forward *forward;
1651
1652 if (!from_topic || !to_topic) {
1653 return NULL;
1654 }
1655
1656 forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1657 if (!forward) {
1658 return NULL;
1659 }
1660
1661 /* Forwards to ourselves are implicit. */
1662 if (to_topic == from_topic) {
1663 return forward;
1664 }
1665
1666 forward->from_topic = ao2_bump(from_topic);
1667 forward->to_topic = ao2_bump(to_topic);
1668
1671 if (res != 0) {
1674 ao2_ref(forward, -1);
1675 return NULL;
1676 }
1677
1678 for (idx = 0; idx < AST_VECTOR_SIZE(&to_topic->subscribers); ++idx) {
1680 }
1683
1684 return forward;
1685}
@ AO2_ALLOC_OPT_LOCK_NOLOCK
Definition astobj2.h:367
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition astobj2.h:404
static void forward_dtor(void *obj)
Definition stasis.c:1605
#define topic_lock_both(topic1, topic2)
Lock two topics.
Definition stasis.c:492
struct stasis_topic::@426 upstream_topics
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition vector.h:267

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_alloc_options, ao2_bump, ao2_ref, ao2_unlock, AST_VECTOR_APPEND, AST_VECTOR_GET, AST_VECTOR_SIZE, forward_dtor(), stasis_forward::from_topic, NULL, stasis_topic::subscribers, stasis_forward::to_topic, topic_add_subscription(), topic_lock_both, and stasis_topic::upstream_topics.

Referenced by __init_manager(), ari_bridges_play_new(), ast_ari_bridges_record(), ast_channel_forward_endpoint(), ast_channel_internal_setup_topics(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), create_subscriptions(), create_subscriptions(), endpoint_internal_create(), forwards_create_bridge(), forwards_create_channel(), forwards_create_endpoint(), load_general_config(), load_module(), load_module(), load_module(), manager_bridging_init(), manager_channels_init(), manager_mwi_init(), manager_subscriptions_init(), manager_system_init(), stasis_cp_all_create(), stasis_cp_single_create(), stasis_topic_pool_get_topic(), and state_alloc().

◆ stasis_forward_cancel()

struct stasis_forward * stasis_forward_cancel ( struct stasis_forward * forward)

Definition at line 1615 of file stasis.c.

1616{
1617 int idx;
1618 struct stasis_topic *from;
1619 struct stasis_topic *to;
1620
1621 if (!forward) {
1622 return NULL;
1623 }
1624
1625 from = forward->from_topic;
1626 to = forward->to_topic;
1627
1628 if (from && to) {
1629 topic_lock_both(to, from);
1632
1633 for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
1635 }
1636 ao2_unlock(from);
1637 ao2_unlock(to);
1638 }
1639
1640 ao2_cleanup(forward);
1641
1642 return NULL;
1643}
static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition stasis.c:1296
#define AST_VECTOR_ELEM_CLEANUP_NOOP(elem)
Vector element cleanup that does nothing.
Definition vector.h:582
#define AST_VECTOR_REMOVE_ELEM_UNORDERED(vec, elem, cleanup)
Remove an element from a vector.
Definition vector.h:594

References ao2_cleanup, ao2_unlock, AST_VECTOR_ELEM_CLEANUP_NOOP, AST_VECTOR_GET, AST_VECTOR_REMOVE_ELEM_UNORDERED, AST_VECTOR_SIZE, stasis_forward::from_topic, NULL, stasis_topic::subscribers, stasis_forward::to_topic, topic_lock_both, topic_remove_subscription(), and stasis_topic::upstream_topics.

Referenced by all_dtor(), ari_bridges_play_new(), ast_ari_bridges_record(), ast_channel_internal_cleanup(), ast_endpoint_shutdown(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), bridge_channel_control_thread(), cleanup_module(), destroy_subscriptions(), destroy_subscriptions(), forwards_unsubscribe(), load_general_config(), load_module(), load_module(), manager_bridging_cleanup(), manager_channels_shutdown(), manager_mwi_shutdown(), manager_shutdown(), manager_system_shutdown(), stasis_cp_single_unsubscribe(), state_dtor(), topic_pool_entry_dtor(), unload_module(), unload_module(), and unload_module().

◆ stasis_init()

int stasis_init ( void  )

Initialize the Stasis subsystem.

Returns
0 on success.
Non-zero on error.
Since
12

Definition at line 3272 of file stasis.c.

3273{
3274 struct stasis_config *cfg;
3275 int cache_init;
3276 struct ast_taskpool_options taskpool_opts = { 0, };
3277#ifdef AST_DEVMODE
3278 struct ao2_container *subscription_stats;
3279 struct ao2_container *topic_stats;
3280#endif
3281
3282 /* Be sure the types are cleaned up after the message bus */
3284
3285 if (aco_info_init(&cfg_info)) {
3286 return -1;
3287 }
3288
3289 aco_option_register_custom(&cfg_info, "decline", ACO_EXACT,
3291 aco_option_register(&cfg_info, "minimum_size", ACO_EXACT,
3293 FLDSET(struct stasis_taskpool_conf, minimum_size), 0,
3294 INT_MAX);
3295 aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
3297 FLDSET(struct stasis_taskpool_conf, initial_size), 0,
3298 INT_MAX);
3299 aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
3301 FLDSET(struct stasis_taskpool_conf, idle_timeout_sec), 0,
3302 INT_MAX);
3303 aco_option_register(&cfg_info, "max_size", ACO_EXACT,
3305 FLDSET(struct stasis_taskpool_conf, max_size), 0,
3306 INT_MAX);
3307
3308 if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
3309 struct stasis_config *default_cfg = stasis_config_alloc();
3310
3311 if (!default_cfg) {
3312 return -1;
3313 }
3314
3315 if (aco_set_defaults(&taskpool_option, "taskpool", default_cfg->taskpool_options)) {
3316 ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
3317 ao2_ref(default_cfg, -1);
3318
3319 return -1;
3320 }
3321
3322 if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) {
3323 ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
3324 ao2_ref(default_cfg, -1);
3325
3326 return -1;
3327 }
3328
3329 ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n");
3331 cfg = default_cfg;
3332 } else {
3334 if (!cfg) {
3335 ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
3336
3337 return -1;
3338 }
3339 }
3340
3341 taskpool_opts.version = AST_TASKPOOL_OPTIONS_VERSION;
3342 taskpool_opts.minimum_size = cfg->taskpool_options->minimum_size;
3343 taskpool_opts.initial_size = cfg->taskpool_options->initial_size;
3344 taskpool_opts.auto_increment = 1;
3345 taskpool_opts.max_size = cfg->taskpool_options->max_size;
3346 taskpool_opts.idle_timeout = cfg->taskpool_options->idle_timeout_sec;
3347 taskpool = ast_taskpool_create("stasis", &taskpool_opts);
3348 ao2_ref(cfg, -1);
3349 if (!taskpool) {
3350 ast_log(LOG_ERROR, "Failed to create 'stasis-core' taskpool\n");
3351
3352 return -1;
3353 }
3354
3355 cache_init = stasis_cache_init();
3356 if (cache_init != 0) {
3357 return -1;
3358 }
3359
3361 return -1;
3362 }
3364 return -1;
3365 }
3366
3368 topic_proxy_hash_fn, 0, topic_proxy_cmp_fn);
3369 if (!topic_all) {
3370 return -1;
3371 }
3372
3374 return -1;
3375 }
3376
3377#ifdef AST_DEVMODE
3378 /* Statistics information is stored separately so that we don't alter or interrupt the lifetime of the underlying
3379 * topic or subscription.
3380 */
3381 subscription_stats = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, SUBSCRIPTION_STATISTICS_BUCKETS,
3382 subscription_statistics_hash, 0, subscription_statistics_cmp);
3383 if (!subscription_stats) {
3384 return -1;
3385 }
3386 ao2_global_obj_replace_unref(subscription_statistics, subscription_stats);
3387 ao2_cleanup(subscription_stats);
3388
3389 topic_stats = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, TOPIC_STATISTICS_BUCKETS,
3390 topic_statistics_hash, 0, topic_statistics_cmp);
3391 if (!topic_stats) {
3392 return -1;
3393 }
3394 ao2_global_obj_replace_unref(topic_statistics, topic_stats);
3395 ao2_cleanup(topic_stats);
3396 if (!topic_stats) {
3397 return -1;
3398 }
3399
3400 AST_VECTOR_INIT(&message_type_statistics, 0);
3401
3402 if (ast_cli_register_multiple(cli_stasis_statistics, ARRAY_LEN(cli_stasis_statistics))) {
3403 return -1;
3404 }
3405#endif
3406
3407 return 0;
3408}
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
Definition clicompat.c:19
@ AO2_ALLOC_OPT_LOCK_MUTEX
Definition astobj2.h:363
#define ao2_global_obj_replace_unref(holder, obj)
Replace an ao2 object in the global holder, throwing away any old object.
Definition astobj2.h:901
#define ao2_global_obj_ref(holder)
Get a reference to the object stored in the global holder.
Definition astobj2.h:918
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition astobj2.h:1303
#define ast_cli_register_multiple(e, len)
Register multiple commands.
Definition cli.h:265
@ ACO_EXACT
int aco_set_defaults(struct aco_type *type, const char *category, void *obj)
Set all default options of obj.
@ ACO_PROCESS_ERROR
There was an error and no changes were applied.
int aco_info_init(struct aco_info *info)
Initialize an aco_info structure.
#define FLDSET(type,...)
Convert a struct and list of fields to an argument list of field offsets.
#define aco_option_register(info, name, matchtype, types, default_val, opt_type, flags,...)
Register a config option.
@ OPT_INT_T
Type for default option handler for signed integers.
#define aco_option_register_custom(info, name, matchtype, types, default_val, handler, flags)
Register a config option.
enum aco_process_status aco_process_config(struct aco_info *info, int reload)
Process a config info via the options registered with an aco_info.
#define LOG_NOTICE
static void * stasis_config_alloc(void)
Definition stasis.c:2464
static void stasis_cleanup(void)
Cleanup function for graceful shutdowns.
Definition stasis.c:3253
static struct aco_type taskpool_option
Definition stasis.c:2413
static struct aco_type * taskpool_options[]
Definition stasis.c:2421
static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
Definition stasis.c:2515
struct aco_type * declined_options[]
Definition stasis.c:2432
#define TOPIC_ALL_BUCKETS
Definition stasis.c:384
static struct aco_type declined_option
An aco_type structure to link the "declined_message_types" category to the stasis_declined_config typ...
Definition stasis.c:2424
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
Definition stasis.h:1493
int stasis_cache_init(void)
Generic container type.
int idle_timeout
Time limit in seconds for idle dynamic taskprocessors.
Definition taskpool.h:88
int max_size
Maximum number of taskprocessors a pool may have.
Definition taskpool.h:122
int auto_increment
Number of taskprocessors to increment the pool by.
Definition taskpool.h:92
int minimum_size
Number of taskprocessors that will always exist.
Definition taskpool.h:99
int initial_size
Number of taskprocessors the pool will start with.
Definition taskpool.h:109
Taskpool configuration options.
Definition stasis.c:2387
#define AST_TASKPOOL_OPTIONS_VERSION
Definition taskpool.h:76
struct ast_taskpool * ast_taskpool_create(const char *name, const struct ast_taskpool_options *options)
Create a new taskpool.
Definition taskpool.c:324

References ACO_EXACT, aco_info_init(), aco_option_register, aco_option_register_custom, aco_process_config(), ACO_PROCESS_ERROR, aco_set_defaults(), AO2_ALLOC_OPT_LOCK_MUTEX, ao2_cleanup, ao2_container_alloc_hash, ao2_global_obj_ref, ao2_global_obj_replace_unref, ao2_ref, ARRAY_LEN, ast_cli_register_multiple, ast_log, ast_multi_user_event_type(), ast_register_cleanup(), ast_taskpool_create(), AST_TASKPOOL_OPTIONS_VERSION, AST_VECTOR_INIT, ast_taskpool_options::auto_increment, cli_stasis, declined_handler(), stasis_config::declined_message_types, declined_option, declined_options, FLDSET, globals, ast_taskpool_options::idle_timeout, stasis_taskpool_conf::idle_timeout_sec, ast_taskpool_options::initial_size, stasis_taskpool_conf::initial_size, LOG_ERROR, LOG_NOTICE, ast_taskpool_options::max_size, stasis_taskpool_conf::max_size, ast_taskpool_options::minimum_size, stasis_taskpool_conf::minimum_size, OPT_INT_T, PARSE_IN_RANGE, stasis_cache_init(), stasis_cleanup(), stasis_config_alloc(), STASIS_MESSAGE_TYPE_INIT, stasis_subscription_change_type(), taskpool, taskpool_option, stasis_config::taskpool_options, taskpool_options, topic_all, TOPIC_ALL_BUCKETS, and ast_taskpool_options::version.

Referenced by asterisk_daemon().

◆ stasis_log_bad_type_access()

void stasis_log_bad_type_access ( const char *  name)

Definition at line 2141 of file stasis.c.

2142{
2143#ifdef AST_DEVMODE
2145 ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
2146 }
2147#endif
2148}
int stasis_message_type_declined(const char *name)
Check whether a message type is declined.
Definition stasis.c:2494

References ast_log, LOG_ERROR, name, and stasis_message_type_declined().

◆ stasis_message_type_declined()

int stasis_message_type_declined ( const char *  name)

Check whether a message type is declined.

Parameters
name: The name of the message type to check
Return values
zero: The message type is not declined
non-zero: The message type is declined

Definition at line 2494 of file stasis.c.

2495{
2497 char *name_in_declined;
2498 int res;
2499
2500 if (!cfg || !cfg->declined_message_types) {
2501 ao2_cleanup(cfg);
2502 return 0;
2503 }
2504
2505 name_in_declined = ao2_find(cfg->declined_message_types->declined, name, OBJ_SEARCH_KEY);
2506 res = name_in_declined ? 1 : 0;
2507 ao2_cleanup(name_in_declined);
2508 ao2_ref(cfg, -1);
2509 if (res) {
2510 ast_debug(4, "Declining to allocate Stasis message type '%s' due to configuration\n", name);
2511 }
2512 return res;
2513}
#define ao2_find(container, arg, flags)
Definition astobj2.h:1736
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
Definition astobj2.h:1101
#define ast_debug(level,...)
Log a DEBUG message.

References ao2_cleanup, ao2_find, ao2_global_obj_ref, ao2_ref, ast_debug, stasis_declined_config::declined, stasis_config::declined_message_types, globals, name, and OBJ_SEARCH_KEY.

Referenced by stasis_log_bad_type_access(), and stasis_message_type_create().

◆ STASIS_MESSAGE_TYPE_DEFN() [1/2]

STASIS_MESSAGE_TYPE_DEFN ( ast_multi_user_event_type  ,
to_json = multi_user_event_to_json,
to_ami = multi_user_event_to_ami 
)

Define multi user event message type(s).

◆ STASIS_MESSAGE_TYPE_DEFN() [2/2]

STASIS_MESSAGE_TYPE_DEFN ( stasis_subscription_change_type  )

◆ stasis_publish()

void stasis_publish ( struct stasis_topic topic,
struct stasis_message message 
)

Publish a message to a topic's subscribers.

Parameters
topic: Topic.
message: Message to publish.

This call is asynchronous and will return immediately upon queueing the message for delivery to the topic's subscribers.

Since
12

Definition at line 1578 of file stasis.c.

1579{
1580 publish_msg(topic, message, NULL);
1581}
static void publish_msg(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sync_sub)
Definition stasis.c:1497

References NULL, publish_msg(), and stasis_subscription::topic.

Referenced by aoc_publish_blob(), app_send_end_msg(), ast_bridge_publish_attended_transfer(), ast_bridge_publish_blind_transfer(), ast_bridge_publish_enter(), ast_bridge_publish_leave(), ast_bridge_publish_merge(), ast_bridge_publish_state(), ast_cel_publish_event(), ast_channel_publish_blob(), ast_channel_publish_cached_blob(), ast_channel_publish_final_snapshot(), ast_channel_publish_snapshot(), ast_device_state_clear_cache(), ast_endpoint_blob_publish(), ast_endpoint_shutdown(), ast_manager_publish_event(), ast_multi_object_blob_single_channel_publish(), ast_publish_device_state_full(), ast_refer_notify_transfer_request(), ast_rtp_publish_rtcp_message(), ast_system_publish_registry(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), bridge_attended_transfer_handler(), bridge_blind_transfer_handler(), bridge_merge_handler(), bridge_publish_state_from_blob(), bridge_topics_destroy(), cache_test_aggregate_publish_fn(), caching_topic_exec(), cc_publish(), clear_node_cache(), detect_callback(), device_state_aggregate_publish(), endpoint_publish_snapshot(), endpoint_state_cb(), endpoint_state_cb(), handle_security_event(), local_optimization_finished_cb(), local_optimization_started_cb(), manager_mute_mixmonitor(), meetme_stasis_generate_msg(), mixmonitor_exec(), moh_post_start(), moh_post_stop(), notify_new_message(), presence_state_event(), publish_acl_change(), publish_chanspy_message(), publish_chanspy_message(), publish_cluster_discovery_to_stasis_full(), publish_corosync_ping_to_stasis(), publish_format_update(), publish_hint_change(), publish_hint_remove(), publish_load_message_type(), publish_local_bridge_message(), publish_message_for_channel_topics(), publish_parked_call(), publish_parked_call_failure(), queue_publish_member_blob(), 
queue_publish_multi_channel_snapshot_blob(), remove_device_states_cb(), report_fax_status(), report_receive_fax_status(), report_send_fax_status(), send_call_pickup_stasis_message(), send_conf_stasis(), send_conf_stasis_snapshots(), send_msg(), send_start_msg_snapshots(), send_subscription_subscribe(), send_subscription_unsubscribe(), stasis_app_control_publish(), stasis_app_user_event(), stasis_state_publish(), stasis_state_publish_by_id(), stasis_state_remove_publish_by_id(), stop_mixmonitor_full(), stun_monitor_request(), and talk_detect_audiohook_cb().

◆ stasis_publish_sync()

void stasis_publish_sync ( struct stasis_subscription sub,
struct stasis_message message 
)

Publish a message to a topic's subscribers, synchronizing on the specified subscriber.

Parameters
sub: Subscription to synchronize on.
message: Message to publish.

The caller of stasis_publish_sync will block until the specified subscriber completes handling of the message.

All other subscribers to the topic the stasis_subscription is subscribed to are also delivered the message; this delivery however happens asynchronously.

Since
12.1.0

Definition at line 1583 of file stasis.c.

1584{
1585 ast_assert(sub != NULL);
1586
1588}

References ast_assert, NULL, publish_msg(), sub, and stasis_subscription::topic.

Referenced by AST_TEST_DEFINE(), and stasis_message_router_publish_sync().

◆ stasis_show_topic()

static char * stasis_show_topic ( struct ast_cli_entry e,
int  cmd,
struct ast_cli_args a 
)
static

Definition at line 2628 of file stasis.c.

2629{
2630 struct stasis_topic *topic;
2631 char print_time[32];
2632 int i;
2633
2634 switch (cmd) {
2635 case CLI_INIT:
2636 e->command = "stasis show topic";
2637 e->usage =
2638 "Usage: stasis show topic <name>\n"
2639 " Show stasis topic detail info.\n";
2640 return NULL;
2641 case CLI_GENERATE:
2642 if (a->pos == 3) {
2643 return topic_complete_name(a->word);
2644 } else {
2645 return NULL;
2646 }
2647 }
2648
2649 if (a->argc != 4) {
2650 return CLI_SHOWUSAGE;
2651 }
2652
2653 topic = stasis_topic_get(a->argv[3]);
2654 if (!topic) {
2655 ast_cli(a->fd, "Specified topic '%s' does not exist\n", a->argv[3]);
2656 return CLI_FAILURE;
2657 }
2658
2659 ast_cli(a->fd, "Name: %s\n", topic->name);
2660 ast_cli(a->fd, "Detail: %s\n", topic->detail);
2661 ast_cli(a->fd, "Subscribers count: %zu\n", AST_VECTOR_SIZE(&topic->subscribers));
2662 ast_cli(a->fd, "Forwarding topic count: %zu\n", AST_VECTOR_SIZE(&topic->upstream_topics));
2663 ast_format_duration_hh_mm_ss(ast_tvnow().tv_sec - topic->creationtime->tv_sec, print_time, sizeof(print_time));
2664 ast_cli(a->fd, "Duration time: %s\n", print_time);
2665
2666 ao2_lock(topic);
2667 ast_cli(a->fd, "\nSubscribers:\n");
2668 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); i++) {
2669 struct stasis_subscription *subscription_tmp = AST_VECTOR_GET(&topic->subscribers, i);
2670 ast_cli(a->fd, " UniqueID: %s, Topic: %s, Detail: %s\n",
2671 subscription_tmp->uniqueid, subscription_tmp->topic->name, subscription_tmp->topic->detail);
2672 }
2673
2674 ast_cli(a->fd, "\nForwarded topics:\n");
2675 for (i = 0; i < AST_VECTOR_SIZE(&topic->upstream_topics); i++) {
2676 struct stasis_topic *topic_tmp = AST_VECTOR_GET(&topic->upstream_topics, i);
2677 ast_cli(a->fd, " Topic: %s, Detail: %s\n", topic_tmp->name, topic_tmp->detail);
2678 }
2679 ao2_unlock(topic);
2680
2681 ao2_ref(topic, -1);
2682
2683 return CLI_SUCCESS;
2684}
#define CLI_SHOWUSAGE
Definition cli.h:45
#define CLI_SUCCESS
Definition cli.h:44
void ast_cli(int fd, const char *fmt,...)
Definition clicompat.c:6
@ CLI_INIT
Definition cli.h:152
@ CLI_GENERATE
Definition cli.h:153
#define CLI_FAILURE
Definition cli.h:46
static char * topic_complete_name(const char *word)
Definition stasis.c:2602
char * command
Definition cli.h:186
const char * usage
Definition cli.h:177
static struct test_val a
void ast_format_duration_hh_mm_ss(int duration, char *buf, size_t length)
Formats a duration into HH:MM:SS.
Definition utils.c:2333

References a, ao2_lock, ao2_ref, ao2_unlock, ast_cli(), ast_format_duration_hh_mm_ss(), ast_tvnow(), AST_VECTOR_GET, AST_VECTOR_SIZE, CLI_FAILURE, CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, stasis_topic::creationtime, stasis_topic::detail, stasis_topic::name, NULL, stasis_topic_get(), stasis_topic::subscribers, stasis_subscription::topic, topic_complete_name(), stasis_subscription::uniqueid, stasis_topic::upstream_topics, and ast_cli_entry::usage.

◆ stasis_show_topics()

static char * stasis_show_topics ( struct ast_cli_entry e,
int  cmd,
struct ast_cli_args a 
)
static

Definition at line 2545 of file stasis.c.

2546{
2547 struct ao2_iterator iter;
2548 struct topic_proxy *topic;
2549 struct ao2_container *tmp_container;
2550 int count = 0;
2551#define FMT_HEADERS "%-64s %-64s\n"
2552#define FMT_FIELDS "%-64s %-64s\n"
2553
2554 switch (cmd) {
2555 case CLI_INIT:
2556 e->command = "stasis show topics";
2557 e->usage =
2558 "Usage: stasis show topics\n"
2559 " Shows a list of topics\n";
2560 return NULL;
2561 case CLI_GENERATE:
2562 return NULL;
2563 }
2564
2565 if (a->argc != e->args) {
2566 return CLI_SHOWUSAGE;
2567 }
2568
2569 ast_cli(a->fd, "\n" FMT_HEADERS, "Name", "Detail");
2570
2572 topic_proxy_sort_fn, NULL);
2573
2574 if (!tmp_container || ao2_container_dup(tmp_container, topic_all, 0)) {
2575 ao2_cleanup(tmp_container);
2576
2577 return NULL;
2578 }
2579
2580 /* getting all topic in order */
2581 iter = ao2_iterator_init(tmp_container, AO2_ITERATOR_UNLINK);
2582 while ((topic = ao2_iterator_next(&iter))) {
2583 ast_cli(a->fd, FMT_FIELDS, topic->name, topic->detail);
2584 ao2_ref(topic, -1);
2585 ++count;
2586 }
2587 ao2_iterator_destroy(&iter);
2588 ao2_cleanup(tmp_container);
2589
2590 ast_cli(a->fd, "\n%d Total topics\n\n", count);
2591
2592#undef FMT_HEADERS
2593#undef FMT_FIELDS
2594
2595 return CLI_SUCCESS;
2596}
int ao2_container_dup(struct ao2_container *dest, struct ao2_container *src, enum search_flags flags)
Copy all object references in the src container into the dest container.
#define ao2_iterator_next(iter)
Definition astobj2.h:1911
@ AO2_ITERATOR_UNLINK
Definition astobj2.h:1863
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
#define ao2_container_alloc_rbtree(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a red-black tree container.
Definition astobj2.h:1349
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
#define FMT_HEADERS
#define FMT_FIELDS
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
Definition astobj2.h:1821
int args
This gets set in ast_cli_register()
Definition cli.h:185

References a, AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_cleanup, ao2_container_alloc_rbtree, ao2_container_dup(), ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, AO2_ITERATOR_UNLINK, ao2_ref, ast_cli_entry::args, ast_cli(), CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, topic_proxy::detail, FMT_FIELDS, FMT_HEADERS, topic_proxy::name, NULL, topic_all, and ast_cli_entry::usage.

◆ stasis_subscription_accept_formatters()

void stasis_subscription_accept_formatters ( struct stasis_subscription subscription,
enum stasis_subscription_message_formatters  formatters 
)

Indicate to a subscription that we are interested in messages with one or more formatters.

Parameters
subscription: Subscription to alter.
formatters: A bitmap of stasis_subscription_message_formatters we wish to receive.
Since
13.25.0
16.2.0

Definition at line 1160 of file stasis.c.

1162{
1163 ast_assert(subscription != NULL);
1164
1165 ao2_lock(subscription->topic);
1166 subscription->accepted_formatters = formatters;
1167 ao2_unlock(subscription->topic);
1168
1169 return;
1170}

References stasis_subscription::accepted_formatters, ao2_lock, ao2_unlock, ast_assert, NULL, and stasis_subscription::topic.

Referenced by AST_TEST_DEFINE(), AST_TEST_DEFINE(), stasis_message_router_accept_formatters(), and stasis_message_router_set_formatters_default().

◆ stasis_subscription_accept_message_type()

int stasis_subscription_accept_message_type ( struct stasis_subscription subscription,
const struct stasis_message_type type 
)

Indicate to a subscription that we are interested in a message type.

This will cause the subscription to allow the given message type to be raised to our subscription callback. This enables internal filtering in the stasis message bus to reduce messages.

Parameters
subscription: Subscription to add message type to.
type: The message type we wish to receive.
Return values
0: on success
-1: on failure
Since
17.0.0
Note
If you want to use stasis_subscription_final_message() you will need to accept stasis_subscription_change_type as a message type.
Until the subscription is set to selective filtering it is possible for it to receive messages of message types that would not normally be accepted.

Definition at line 1090 of file stasis.c.

1092{
1093 if (!subscription) {
1094 return -1;
1095 }
1096
1097 ast_assert(type != NULL);
1099
1101 /* Filtering is unreliable as this message type is not yet initialized
1102 * so force all messages through.
1103 */
1105 return 0;
1106 }
1107
1108 ao2_lock(subscription->topic);
1110 /* We do this for the same reason as above. The subscription can still operate, so allow
1111 * it to do so by forcing all messages through.
1112 */
1114 }
1115 ao2_unlock(subscription->topic);
1116
1117 return 0;
1118}
@ STASIS_SUBSCRIPTION_FILTER_FORCED_NONE
Definition stasis.h:296
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.

References stasis_subscription::accepted_message_types, ao2_lock, ao2_unlock, ast_assert, AST_VECTOR_REPLACE, stasis_subscription::filter, NULL, stasis_message_type_id(), stasis_message_type_name(), STASIS_SUBSCRIPTION_FILTER_FORCED_NONE, stasis_subscription::topic, and type.

Referenced by acl_change_stasis_subscribe(), acl_change_stasis_subscribe(), ast_mwi_subscribe_pool(), ast_res_pjsip_initialize_configuration(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), asterisk_start_devicestate_publishing(), asterisk_start_mwi_publishing(), cc_generic_agent_start_monitoring(), common_config_load(), create_new_generic_list(), create_parked_subscription_full(), devstate_init(), load_module(), load_module(), load_module(), load_module(), load_module(), load_module(), load_module(), load_module(), load_pbx(), mwi_stasis_subscription_alloc(), network_change_stasis_subscribe(), park_and_announce_app_exec(), parking_manager_enable_stasis(), refer_blind_callback(), rtp_reload(), stasis_caching_accept_message_type(), stasis_message_router_add(), stasis_message_router_add_cache_update(), stasis_message_router_create_internal(), subscribe_device_state(), and xmpp_init_event_distribution().

◆ stasis_subscription_cb_noop()

void stasis_subscription_cb_noop ( void *  data,
struct stasis_subscription sub,
struct stasis_message message 
)

Stasis subscription callback function that does nothing.

Note
This callback should be used for events that are not directly processed, but need to be generated so data can be retrieved from cache later. Subscriptions with this callback can be released with stasis_unsubscribe, even during module unload.
Since
13.5

Definition at line 876 of file stasis.c.

877{
878}

Referenced by build_peer(), and mkintf().

◆ stasis_subscription_decline_message_type()

int stasis_subscription_decline_message_type ( struct stasis_subscription subscription,
const struct stasis_message_type type 
)

Indicate to a subscription that we are not interested in a message type.

Parameters
subscription: Subscription to remove message type from.
type: The message type we don't wish to receive.
Return values
0: on success
-1: on failure
Since
17.0.0

Definition at line 1120 of file stasis.c.

1122{
1123 if (!subscription) {
1124 return -1;
1125 }
1126
1127 ast_assert(type != NULL);
1129
1131 return 0;
1132 }
1133
1134 ao2_lock(subscription->topic);
1136 /* The memory is already allocated so this can't fail */
1138 }
1139 ao2_unlock(subscription->topic);
1140
1141 return 0;
1142}

References stasis_subscription::accepted_message_types, ao2_lock, ao2_unlock, ast_assert, AST_VECTOR_REPLACE, AST_VECTOR_SIZE, NULL, stasis_message_type_id(), stasis_message_type_name(), stasis_subscription::topic, and type.

Referenced by AST_TEST_DEFINE().

◆ stasis_subscription_final_message()

int stasis_subscription_final_message ( struct stasis_subscription sub,
struct stasis_message msg 
)

Determine whether a message is the final message to be received on a subscription.

Parameters
sub: Subscription on which the message was received.
msg: Message to check.
Returns
zero if the provided message is not the final message.
non-zero if the provided message is the final message.
Since
12

Definition at line 1241 of file stasis.c.

1242{
1243 struct stasis_subscription_change *change;
1244
1246 return 0;
1247 }
1248
1249 change = stasis_message_data(msg);
1250 if (strcmp("Unsubscribe", change->description)) {
1251 return 0;
1252 }
1253
1254 if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
1255 return 0;
1256 }
1257
1258 return 1;
1259}
const char * stasis_subscription_uniqueid(const struct stasis_subscription *sub)
Get the unique ID for the subscription.
Definition stasis.c:1236

References stasis_subscription_change::description, stasis_message_data(), stasis_subscription_change_type(), stasis_subscription_uniqueid(), sub, and stasis_subscription_change::uniqueid.

Referenced by bridge_subscription_change_handler(), caching_topic_exec(), consumer_exec(), consumer_exec_sync(), consumer_finalize(), default_route(), device_state_cb(), dispatch_message(), generic_agent_devstate_cb(), message_sink_cb(), mwi_stasis_cb(), park_announce_update_cb(), parker_update_cb(), queue_bridge_cb(), queue_channel_cb(), refer_progress_bridge(), router_dispatch(), statsmaker(), sub_subscription_change_handler(), and subscription_invoke().

◆ stasis_subscription_is_done()

int stasis_subscription_is_done ( struct stasis_subscription subscription)

Returns whether subscription has received its final message.

Note that a subscription is considered done even while the stasis_subscription_final_message() is being processed. This allows cleanup routines to check the status of the subscription.

Parameters
subscription: Subscription.
Returns
True (non-zero) if stasis_subscription_final_message() has been received.
False (zero) if waiting for the end.

Definition at line 1185 of file stasis.c.

1186{
1187 if (subscription) {
1188 int ret;
1189
1190 ao2_lock(subscription);
1191 ret = subscription->final_message_rxed;
1192 ao2_unlock(subscription);
1193
1194 return ret;
1195 }
1196
1197 /* Null subscription is about as done as you can get */
1198 return 1;
1199}

References ao2_lock, ao2_unlock, and stasis_subscription::final_message_rxed.

Referenced by router_dtor(), stasis_caching_topic_dtor(), stasis_message_router_is_done(), and subscription_dtor().

◆ stasis_subscription_is_subscribed()

int stasis_subscription_is_subscribed ( const struct stasis_subscription sub)

Returns whether a subscription is currently subscribed.

Note that there may still be messages queued up to be dispatched to this subscription, but the stasis_subscription_final_message() has been enqueued.

Parameters
sub: Subscription to check
Returns
False (zero) if subscription is not subscribed.
True (non-zero) if still subscribed.

Definition at line 1217 of file stasis.c.

1218{
1219 if (sub) {
1220 size_t i;
1221 struct stasis_topic *topic = sub->topic;
1222
1223 ao2_lock(topic);
1224 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
1225 if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
1226 ao2_unlock(topic);
1227 return 1;
1228 }
1229 }
1230 ao2_unlock(topic);
1231 }
1232
1233 return 0;
1234}

References ao2_lock, ao2_unlock, AST_VECTOR_GET, AST_VECTOR_SIZE, sub, stasis_topic::subscribers, and stasis_subscription::topic.

Referenced by asterisk_publisher_devstate_cb(), asterisk_publisher_mwistate_cb(), router_dtor(), send_subscription_subscribe(), send_subscription_unsubscribe(), stasis_caching_topic_dtor(), stasis_caching_unsubscribe(), subscription_dtor(), xmpp_pubsub_devstate_cb(), and xmpp_pubsub_mwi_cb().

◆ stasis_subscription_join()

void stasis_subscription_join ( struct stasis_subscription subscription)

Block until the last message is processed on a subscription.

This function will not return until the subscription's callback for the stasis_subscription_final_message() completes. This allows cleanup routines to run before unblocking the joining thread.

Parameters
subscription: Subscription to block on.
Since
12

Definition at line 1172 of file stasis.c.

1173{
1174 if (subscription) {
1175 ao2_lock(subscription);
1176 /* Wait until the processed flag has been set */
1177 while (!subscription->final_message_processed) {
1178 ast_cond_wait(&subscription->join_cond,
1179 ao2_object_get_lockaddr(subscription));
1180 }
1181 ao2_unlock(subscription);
1182 }
1183}
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
Definition astobj2.c:476
int final_message_processed
Definition stasis.c:766

References ao2_lock, ao2_object_get_lockaddr(), ao2_unlock, ast_cond_wait, stasis_subscription::final_message_processed, and stasis_subscription::join_cond.

Referenced by stasis_caching_unsubscribe_and_join(), and stasis_unsubscribe_and_join().

◆ stasis_subscription_set_congestion_limits()

int stasis_subscription_set_congestion_limits ( struct stasis_subscription subscription,
long  low_water,
long  high_water 
)

Set the high and low alert water marks of the stasis subscription.

Since
13.10.0
Parameters
subscription: Pointer to a stasis subscription
low_water: New queue low water mark. (-1 to set as 90% of high_water)
high_water: New queue high water mark.
Return values
0: on success.
-1: on error (water marks not changed).

Definition at line 1078 of file stasis.c.

1080{
1081 int res = -1;
1082
1083 if (subscription) {
1084 res = ast_taskprocessor_alert_set_levels(subscription->mailbox,
1085 low_water, high_water);
1086 }
1087 return res;
1088}
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.

References ast_taskprocessor_alert_set_levels(), and stasis_subscription::mailbox.

Referenced by stasis_message_router_set_congestion_limits().

◆ stasis_subscription_set_filter()

int stasis_subscription_set_filter ( struct stasis_subscription subscription,
enum stasis_subscription_message_filter  filter 
)

Set the message type filtering level on a subscription.

This will cause the subscription to filter messages according to the provided filter level. For example if selective is used then only messages matching those provided to stasis_subscription_accept_message_type will be raised to the subscription callback.

Parameters
subscription: Subscription whose filter level is being set.
filter: What filter to use
Return values
0: on success
-1: on failure
Since
17.0.0

Definition at line 1144 of file stasis.c.

1146{
1147 if (!subscription) {
1148 return -1;
1149 }
1150
1151 ao2_lock(subscription->topic);
1152 if (subscription->filter != STASIS_SUBSCRIPTION_FILTER_FORCED_NONE) {
1153 subscription->filter = filter;
1154 }
1155 ao2_unlock(subscription->topic);
1156
1157 return 0;
1158}
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)

References ao2_lock, ao2_unlock, filter(), stasis_subscription::filter, STASIS_SUBSCRIPTION_FILTER_FORCED_NONE, and stasis_subscription::topic.

Referenced by acl_change_stasis_subscribe(), acl_change_stasis_subscribe(), ast_mwi_subscribe_pool(), ast_res_pjsip_initialize_configuration(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), asterisk_start_devicestate_publishing(), asterisk_start_mwi_publishing(), cc_generic_agent_start_monitoring(), create_new_generic_list(), create_parked_subscription_full(), devstate_init(), load_module(), load_module(), load_module(), load_module(), load_module(), load_module(), load_module(), load_module(), load_pbx(), network_change_stasis_subscribe(), park_and_announce_app_exec(), parking_manager_enable_stasis(), refer_blind_callback(), rtp_reload(), stasis_caching_set_filter(), stasis_message_router_add(), stasis_message_router_add_cache_update(), stasis_message_router_set_formatters_default(), subscribe_device_state(), and xmpp_init_event_distribution().

◆ stasis_subscription_uniqueid()

const char * stasis_subscription_uniqueid ( const struct stasis_subscription sub)

Get the unique ID for the subscription.

Parameters
sub: Subscription for which to get the unique ID.
Returns
Unique ID for the subscription.
Since
12

Definition at line 1236 of file stasis.c.

1237{
1238 return sub->uniqueid;
1239}

References sub, and stasis_subscription::uniqueid.

Referenced by AST_TEST_DEFINE(), AST_TEST_DEFINE(), stasis_subscription_final_message(), topic_add_subscription(), and topic_remove_subscription().

◆ stasis_topic_create()

struct stasis_topic * stasis_topic_create ( const char *  name)

Create a new topic.

Parameters
name: Name of the new topic.
Returns
New topic instance.
Return values
NULL: on error.
Since
12
Note
There is no explicit ability to unsubscribe all subscribers from a topic and destroy it. As a result the topic can persist until the last subscriber unsubscribes itself even if there is no publisher.
Topic names should be in the form of
<subsystem>:<functionality>[/<object>] 

Definition at line 684 of file stasis.c.

685{
687}
struct stasis_topic * stasis_topic_create_with_detail(const char *name, const char *detail)
Create a new topic with given detail.
Definition stasis.c:635

References name, and stasis_topic_create_with_detail().

Referenced by __init_manager(), app_create(), app_init(), ast_channel_internal_setup_topics(), ast_parking_stasis_init(), ast_presence_state_engine_init(), ast_rtp_engine_init(), ast_security_stasis_init(), ast_stasis_bridging_init(), ast_stasis_channels_init(), ast_stasis_system_init(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), ast_test_init(), create_cts(), create_subscriptions(), devstate_init(), load_module(), load_module(), stasis_caching_topic_create(), stasis_cp_all_create(), stasis_cp_sink_create(), stasis_state_manager_create(), stasis_topic_pool_get_topic(), and state_alloc().

◆ stasis_topic_create_with_detail()

struct stasis_topic * stasis_topic_create_with_detail ( const char *  name,
const char *  detail 
)

Create a new topic with given detail.

Parameters
name: Name of the new topic.
detail: Detailed description of the new topic, e.g. "Queue main topic for subscribing every queue event"
Returns
New topic instance.
Return values
NULL: on error.
Note
There is no explicit ability to unsubscribe all subscribers from a topic and destroy it. As a result the topic can persist until the last subscriber unsubscribes itself even if there is no publisher.

Definition at line 635 of file stasis.c.

638{
639 struct stasis_topic *topic;
640 int res = 0;
641
642 if (!name|| !strlen(name) || !detail) {
643 return NULL;
644 }
645 ast_debug(2, "Creating topic. name: %s, detail: %s\n", name, detail);
646
647 topic = stasis_topic_get(name);
648 if (topic) {
649 ast_debug(2, "Topic is already exist. name: %s, detail: %s\n",
650 name, detail);
651 return topic;
652 }
653
654 topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
655 if (!topic) {
656 return NULL;
657 }
658
660 res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
661 if (res) {
662 ao2_ref(topic, -1);
663 return NULL;
664 }
665
666 /* link to the proxy */
667 if (link_topic_proxy(topic, name, detail)) {
668 ao2_ref(topic, -1);
669 return NULL;
670 }
671
672#ifdef AST_DEVMODE
673 topic->statistics = stasis_topic_statistics_create(topic);
674 if (!topic->statistics) {
675 ao2_ref(topic, -1);
676 return NULL;
677 }
678#endif
679 ast_debug(1, "Topic '%s': %p created\n", topic->name, topic);
680
681 return topic;
682}
#define INITIAL_SUBSCRIBERS_MAX
Definition stasis.c:368
static void topic_dtor(void *obj)
Definition stasis.c:500
static int link_topic_proxy(struct stasis_topic *topic, const char *name, const char *detail)
Definition stasis.c:567

References ao2_ref, ao2_t_alloc, ast_debug, AST_VECTOR_INIT, stasis_topic::detail, INITIAL_SUBSCRIBERS_MAX, link_topic_proxy(), name, stasis_topic::name, NULL, stasis_topic_get(), stasis_topic::subscribers, topic_dtor(), and stasis_topic::upstream_topics.

Referenced by stasis_topic_create().

◆ stasis_topic_detail()

const char * stasis_topic_detail ( const struct stasis_topic topic)

Return the detail of a topic.

Parameters
topic - Topic.
Returns
Detail of the topic.
Return values
NULL - if topic is NULL.
Since
12

Definition at line 702 of file stasis.c.

703{
704 if (!topic) {
705 return NULL;
706 }
707 return topic->detail;
708}

References stasis_topic::detail, and NULL.

◆ stasis_topic_get()

struct stasis_topic * stasis_topic_get ( const char *  name)

Get a topic of the given name.

Parameters
name - Topic's name.
Returns
The topic with the given name.
Return values
NULL - on error or if no topic with that name exists.
Note
This SHOULD NOT be used in normal operation for publishing messages.

Definition at line 689 of file stasis.c.

690{
692}
#define ao2_weakproxy_find(c, arg, flags, tag)
Perform an ao2_find on a container with ao2_weakproxy objects, returning the real object.
Definition astobj2.h:1748

References ao2_weakproxy_find, name, OBJ_SEARCH_KEY, and topic_all.

Referenced by link_topic_proxy(), stasis_show_topic(), and stasis_topic_create_with_detail().

◆ stasis_topic_name()

const char * stasis_topic_name ( const struct stasis_topic topic)

◆ stasis_topic_pool_create()

struct stasis_topic_pool * stasis_topic_pool_create ( struct stasis_topic pooled_topic)

Create a topic pool that routes messages from dynamically generated topics to the given topic.

Parameters
pooled_topic - Topic to which messages will be routed.
Returns
the new stasis_topic_pool
Return values
NULL - on failure.

Definition at line 1918 of file stasis.c.

1919{
1920 struct stasis_topic_pool *pool;
1921
1923 if (!pool) {
1924 return NULL;
1925 }
1926
1929 if (!pool->pool_container) {
1930 ao2_cleanup(pool);
1931 return NULL;
1932 }
1933
1934#ifdef AO2_DEBUG
1935 {
1936 char *container_name =
1937 ast_alloca(strlen(stasis_topic_name(pooled_topic)) + strlen("-pool") + 1);
1938 sprintf(container_name, "%s-pool", stasis_topic_name(pooled_topic));
1939 ao2_container_register(container_name, pool->pool_container, topic_pool_prnt_obj);
1940 }
1941#endif
1942
1943 ao2_ref(pooled_topic, +1);
1944 pool->pool_topic = pooled_topic;
1945
1946 return pool;
1947}
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
Definition astmm.h:288
int ao2_container_register(const char *name, struct ao2_container *self, ao2_prnt_obj_fn *prnt_obj)
Register a container for CLI stats and integrity check.
static void topic_pool_dtor(void *obj)
Definition stasis.c:1829
#define TOPIC_POOL_BUCKETS
Definition stasis.c:371
static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
Definition stasis.c:1869
static int topic_pool_entry_hash(const void *obj, const int flags)
Definition stasis.c:1848
struct ao2_container * pool_container
Definition stasis.c:1825
struct stasis_topic * pool_topic
Definition stasis.c:1826

References AO2_ALLOC_OPT_LOCK_MUTEX, AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_alloc_options, ao2_cleanup, ao2_container_alloc_hash, ao2_container_register(), ao2_ref, ast_alloca, NULL, stasis_topic_pool::pool_container, stasis_topic_pool::pool_topic, stasis_topic_name(), TOPIC_POOL_BUCKETS, topic_pool_dtor(), topic_pool_entry_cmp(), and topic_pool_entry_hash().

Referenced by app_init(), ast_stasis_bridging_init(), and devstate_init().

◆ stasis_topic_pool_delete_topic()

void stasis_topic_pool_delete_topic ( struct stasis_topic_pool pool,
const char *  topic_name 
)

Delete a topic from the topic pool.

Parameters
pool - Pool from which to delete the topic.
topic_name - Name of the topic to delete, in the form [<pool_topic_name>/]<topic_name>.
Since
13.24
15.6
16.1

Definition at line 1949 of file stasis.c.

1950{
1951 /*
1952 * The topic_name passed in could be a fully-qualified name like <pool_topic_name>/<topic_name>
1953 * or just <topic_name>. If it's fully qualified, we need to skip past <pool_topic_name>
1954 * name and search only on <topic_name>.
1955 */
1956 const char *pool_topic_name = stasis_topic_name(pool->pool_topic);
1957 int pool_topic_name_len = strlen(pool_topic_name);
1958 const char *search_topic_name;
1959
1960 if (strncmp(pool_topic_name, topic_name, pool_topic_name_len) == 0) {
1961 search_topic_name = topic_name + pool_topic_name_len + 1;
1962 } else {
1963 search_topic_name = topic_name;
1964 }
1965
1966 ao2_find(pool->pool_container, search_topic_name, OBJ_SEARCH_KEY | OBJ_NODATA | OBJ_UNLINK);
1967}
@ OBJ_NODATA
Definition astobj2.h:1044
@ OBJ_UNLINK
Definition astobj2.h:1039

References ao2_find, OBJ_NODATA, OBJ_SEARCH_KEY, OBJ_UNLINK, stasis_topic_pool::pool_container, stasis_topic_pool::pool_topic, and stasis_topic_name().

Referenced by bridge_topics_destroy().

◆ stasis_topic_pool_get_topic()

struct stasis_topic * stasis_topic_pool_get_topic ( struct stasis_topic_pool pool,
const char *  topic_name 
)

Get a topic from the pool for the given name.

Find or create a topic in the pool.

This returns a borrowed reference: the pool container owns the topic and callers MUST NOT ao2_cleanup() the returned pointer.

To avoid both deadlocks and wasted work we use a per-name "in-flight" topic_pool_entry while a topic is being created:

  • The pool container lock is held only while looking up or inserting the topic_pool_entry for a name.
  • Exactly one thread becomes the creator for a given name and is responsible for allocating the topic and wiring up forwarding.
  • Other threads that race for the same name find the in-flight entry and wait on its condition variable until initialization completes.

Definition at line 1985 of file stasis.c.

1986{
1987 /*
1988 * Lock ordering:
1989 *
1990 * pool->pool_container (AO2 lock)
1991 * → entry->init_lock
1992 * → topic locks (inside stasis_topic_create() /
1993 * stasis_forward_all())
1994 *
1995 * We intentionally do NOT hold the pool container lock while calling
1996 * stasis_topic_create() or stasis_forward_all() to avoid deadlocks with
1997 * other code that may take topic locks first and then need the pool lock.
1998 */
1999 RAII_VAR(struct topic_pool_entry *, entry, NULL, ao2_cleanup);
2000 char *fq = NULL;
2001 int creator = 0;
2002 int ret;
2003
2004 if (!pool || ast_strlen_zero(topic_name)) {
2005 return NULL;
2006 }
2007
2008 /* Creator / waiter split:
2009 *
2010 * - The first thread to create/link an entry for topic_name becomes the
2011 * "creator" and is responsible for creating the underlying stasis
2012 * topic and wiring up forwarding.
2013 *
2014 * - Other threads that find the entry become "waiters"; they block on
2015 * entry->init_cond until either initialization succeeds or fails.
2016 */
2017
2018 /* --- Creator selection under pool container lock --- */
2019 ao2_lock(pool->pool_container);
2020
2021 entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
2022 if (!entry) {
2023 entry = topic_pool_entry_alloc(topic_name);
2024 if (!entry) {
2026 return NULL;
2027 }
2028
2029 if (!ao2_link_flags(pool->pool_container, entry, OBJ_NOLOCK)) {
2030 struct topic_pool_entry *other;
2031
2032 other = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
2033 if (other) {
2034 struct topic_pool_entry *tmp = entry;
2035
2036 entry = other;
2037 creator = 0;
2039 ao2_ref(tmp, -1);
2040 goto waiter_path;
2041 }
2042
2044 return NULL;
2045 }
2046
2047 creator = 1;
2048 }
2049
2051
2052/* --- Waiter path: wait for creator to finish --- */
2053waiter_path:
2054 if (!creator) {
2055 ast_mutex_lock(&entry->init_lock);
2056 while (!entry->initialized && !entry->failed) {
2057 ast_cond_wait(&entry->init_cond, &entry->init_lock);
2058 }
2059
2060 if (entry->initialized && !entry->failed) {
2061 struct stasis_topic *topic = entry->topic;
2062
2063 if (!topic) {
2064 ast_debug(1, "Pooled topic '%s' marked initialized but topic is NULL\n", entry->name);
2065 ast_mutex_unlock(&entry->init_lock);
2066 return NULL;
2067 }
2068 ast_mutex_unlock(&entry->init_lock);
2069 /* Borrowed reference: container owns the topic */
2070 return topic;
2071 }
2072
2073 ast_mutex_unlock(&entry->init_lock);
2074 return NULL;
2075 }
2076
2077 /* --- Creator path: perform topic creation without pool lock --- */
2078 ast_mutex_lock(&entry->init_lock);
2079 /* Defensive: entry may have been initialized/failed before we acquired init_lock. */
2080 if (entry->initialized || entry->failed) {
2081 struct stasis_topic *topic = entry->initialized ? entry->topic : NULL;
2082 ast_mutex_unlock(&entry->init_lock);
2083 return topic;
2084 }
2085
2086 ret = ast_asprintf(&fq, "%s/%s", stasis_topic_name(pool->pool_topic), topic_name);
2087 if (ret < 0) {
2088 entry->failed = 1;
2089 goto creator_fail;
2090 }
2091
2092 entry->topic = stasis_topic_create(fq);
2093 ast_free(fq);
2094 fq = NULL;
2095
2096 if (!entry->topic) {
2097 entry->failed = 1;
2098 goto creator_fail;
2099 }
2100
2101 entry->forward = stasis_forward_all(entry->topic, pool->pool_topic);
2102 if (!entry->forward) {
2103 ao2_cleanup(entry->topic);
2104 entry->topic = NULL;
2105 entry->failed = 1;
2106 goto creator_fail;
2107 }
2108
2109 entry->initialized = 1;
2110 ast_cond_broadcast(&entry->init_cond);
2111 ast_mutex_unlock(&entry->init_lock);
2112
2113 return entry->topic; /* borrowed ref */
2114
2115creator_fail:
2116 ast_debug(1, "Failed to create pooled stasis topic '%s/%s'\n", stasis_topic_name(pool->pool_topic), entry->name);
2117 ast_cond_broadcast(&entry->init_cond);
2118 ast_mutex_unlock(&entry->init_lock);
2119
2120 /* Remove failed entry so future callers can retry */
2121 ao2_lock(pool->pool_container);
2122 ao2_unlink(pool->pool_container, entry);
2124
2125 return NULL;
2126}
#define ast_cond_broadcast(cond)
Definition lock.h:211
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition stasis.c:684
static struct topic_pool_entry * topic_pool_entry_alloc(const char *topic_name)
Definition stasis.c:1809
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition stasis.c:1645
Definition stasis.c:1776

References ao2_cleanup, ao2_find, ao2_link_flags, ao2_lock, ao2_ref, ao2_unlink, ao2_unlock, ast_asprintf, ast_cond_broadcast, ast_cond_wait, ast_debug, ast_free, ast_mutex_lock, ast_mutex_unlock, ast_strlen_zero(), topic_pool_entry::failed, topic_pool_entry::forward, topic_pool_entry::init_cond, topic_pool_entry::init_lock, topic_pool_entry::initialized, topic_pool_entry::name, NULL, OBJ_NOLOCK, OBJ_SEARCH_KEY, stasis_topic_pool::pool_container, stasis_topic_pool::pool_topic, RAII_VAR, stasis_forward_all(), stasis_topic_create(), stasis_topic_name(), topic_pool_entry::topic, and topic_pool_entry_alloc().

Referenced by ast_device_state_topic(), ast_queue_topic(), and bridge_topics_init().

◆ stasis_topic_pool_topic_exists()

int stasis_topic_pool_topic_exists ( const struct stasis_topic_pool pool,
const char *  topic_name 
)

Check if a topic exists in a pool.

Parameters
pool - Pool to check.
topic_name - Name of the topic to check.
Return values
1 - the topic exists.
0 - the topic does not exist.
Since
13.23.0

Definition at line 2128 of file stasis.c.

2129{
2131
2133 if (!topic_pool_entry) {
2134 return 0;
2135 }
2136
2138 return 1;
2139}

References ao2_find, ao2_ref, OBJ_SEARCH_KEY, and stasis_topic_pool::pool_container.

Referenced by ast_bridge_topic_exists(), and ast_publish_device_state_full().

◆ stasis_topic_subscribers()

size_t stasis_topic_subscribers ( const struct stasis_topic topic)

Return the number of subscribers of a topic.

Parameters
topic - Topic.
Returns
Number of subscribers of the topic.
Since
17.0.0

Definition at line 710 of file stasis.c.

711{
712 return AST_VECTOR_SIZE(&topic->subscribers);
713}

References AST_VECTOR_SIZE, and stasis_topic::subscribers.

Referenced by caching_topic_exec(), and publish_msg().

◆ stasis_unsubscribe()

struct stasis_subscription * stasis_unsubscribe ( struct stasis_subscription subscription)

Cancel a subscription.

Note that in an asynchronous system, there may still be messages queued or in transit to the subscription's callback. These will still be delivered. There will be a final 'SubscriptionCancelled' message, indicating the delivery of the final message.

Parameters
subscription - Subscription to cancel.
Return values
NULL - for convenience.
Since
12

Definition at line 1038 of file stasis.c.

1039{
1040 /* The subscription may be the last ref to this topic. Hold
1041 * the topic ref open until after the unlock. */
1042 struct stasis_topic *topic;
1043
1044 if (!sub) {
1045 return NULL;
1046 }
1047
1048 topic = ao2_bump(sub->topic);
1049
1050 /* We have to remove the subscription first, to ensure the unsubscribe
1051 * is the final message */
1052 if (topic_remove_subscription(sub->topic, sub) != 0) {
1054 "Internal error: subscription has invalid topic\n");
1055 ao2_cleanup(topic);
1056
1057 return NULL;
1058 }
1059
1060 /* Now let everyone know about the unsubscribe */
1062
1063 /* When all that's done, remove the ref the mailbox has on the sub */
1064 if (sub->mailbox) {
1066 /* Nothing we can do here, the conditional is just to keep
1067 * the compiler happy that we're not ignoring the result. */
1068 }
1069 }
1070
1071 /* Unsubscribing unrefs the subscription */
1073 ao2_cleanup(topic);
1074
1075 return NULL;
1076}
static int sub_cleanup(void *data)
Definition stasis.c:1031
static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition stasis.c:1743
#define ast_taskprocessor_push(tps, task_exe, datap)

References ao2_bump, ao2_cleanup, ast_log, ast_taskprocessor_push, LOG_ERROR, stasis_subscription::mailbox, NULL, send_subscription_unsubscribe(), sub, sub_cleanup(), stasis_subscription::topic, and topic_remove_subscription().

Referenced by AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), cc_generic_agent_destructor(), common_config_unload(), destroy_cts(), generic_agent_devstate_cb(), generic_monitor_instance_list_destructor(), mwi_startup_event_cb(), park_and_announce_app_exec(), parked_subscription_datastore_destroy(), refer_progress_bridge(), refer_progress_destroy(), refer_progress_framehook_destroy(), stasis_caching_unsubscribe(), stasis_message_router_unsubscribe(), stasis_state_unsubscribe(), stasis_unsubscribe_and_join(), subscription_persistence_event_cb(), unload_module(), and xmpp_init_event_distribution().

◆ stasis_unsubscribe_and_join()

struct stasis_subscription * stasis_unsubscribe_and_join ( struct stasis_subscription subscription)

Cancel a subscription, blocking until the last message is processed.

While normally it's recommended to stasis_unsubscribe() and wait for stasis_subscription_final_message(), there are times (like during a module unload) where you have to wait for the final message (otherwise you'll call a function in a shared module that no longer exists).

Parameters
subscription - Subscription to cancel.
Return values
NULL - for convenience.
Since
12

Definition at line 1201 of file stasis.c.

1203{
1204 if (!subscription) {
1205 return NULL;
1206 }
1207
1208 /* Bump refcount to hold it past the unsubscribe */
1209 ao2_ref(subscription, +1);
1210 stasis_unsubscribe(subscription);
1211 stasis_subscription_join(subscription);
1212 /* Now decrement the refcount back */
1213 ao2_cleanup(subscription);
1214 return NULL;
1215}
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *sub)
Cancel a subscription.
Definition stasis.c:1038
void stasis_subscription_join(struct stasis_subscription *subscription)
Block until the last message is processed on a subscription.
Definition stasis.c:1172

References ao2_cleanup, ao2_ref, NULL, stasis_subscription_join(), and stasis_unsubscribe().

Referenced by acl_change_stasis_unsubscribe(), acl_change_stasis_unsubscribe(), ast_res_pjsip_destroy_configuration(), AST_TEST_DEFINE(), ast_xmpp_client_disconnect(), asterisk_stop_devicestate_publishing(), asterisk_stop_mwi_publishing(), devstate_cleanup(), network_change_stasis_unsubscribe(), parking_manager_disable_stasis(), remove_device_state_subscription(), rtp_reload(), stasis_message_router_unsubscribe_and_join(), stasis_state_unsubscribe_and_join(), unload_module(), unload_module(), unload_module(), unload_module(), unload_module(), unload_module(), unload_module(), and unload_pbx().

◆ sub_cleanup()

static int sub_cleanup ( void *  data)
static

Definition at line 1031 of file stasis.c.

1032{
1033 struct stasis_subscription *sub = data;
1035 return 0;
1036}

References ao2_cleanup, stasis_subscription::data, and sub.

Referenced by stasis_unsubscribe().

◆ subscription_change_alloc()

static struct stasis_subscription_change * subscription_change_alloc ( struct stasis_topic topic,
const char *  uniqueid,
const char *  description 
)
static

Definition at line 1694 of file stasis.c.

1695{
1696 size_t description_len = strlen(description) + 1;
1697 size_t uniqueid_len = strlen(uniqueid) + 1;
1698 struct stasis_subscription_change *change;
1699
1700 change = ao2_alloc_options(sizeof(*change) + description_len + uniqueid_len,
1702 if (!change) {
1703 return NULL;
1704 }
1705
1706 strcpy(change->description, description); /* SAFE */
1707 change->uniqueid = change->description + description_len;
1708 ast_copy_string(change->uniqueid, uniqueid, uniqueid_len); /* SAFE */
1709 ao2_ref(topic, +1);
1710 change->topic = topic;
1711
1712 return change;
1713}
static void subscription_change_dtor(void *obj)
Definition stasis.c:1687
struct stasis_topic * topic
Definition stasis.h:891

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_alloc_options, ao2_ref, ast_copy_string(), stasis_subscription_change::description, NULL, subscription_change_dtor(), stasis_subscription_change::topic, and stasis_subscription_change::uniqueid.

Referenced by send_subscription_subscribe(), and send_subscription_unsubscribe().

◆ subscription_change_dtor()

static void subscription_change_dtor ( void *  obj)
static

Definition at line 1687 of file stasis.c.

1688{
1689 struct stasis_subscription_change *change = obj;
1690
1691 ao2_cleanup(change->topic);
1692}

References ao2_cleanup, and stasis_subscription_change::topic.

Referenced by subscription_change_alloc().

◆ subscription_dtor()

static void subscription_dtor ( void *  obj)
static

Definition at line 781 of file stasis.c.

782{
783 struct stasis_subscription *sub = obj;
784#ifdef AST_DEVMODE
785 struct ao2_container *subscription_stats;
786#endif
787
788 /* Subscriptions need to be manually unsubscribed before destruction
789 * b/c there's a cyclic reference between topics and subscriptions */
791 /* If there are any messages in flight to this subscription; that would
792 * be bad. */
794
797 sub->topic = NULL;
799 sub->mailbox = NULL;
801
803
804#ifdef AST_DEVMODE
805 if (sub->statistics) {
806 subscription_stats = ao2_global_obj_ref(subscription_statistics);
807 if (subscription_stats) {
808 ao2_unlink(subscription_stats, sub->statistics);
809 ao2_ref(subscription_stats, -1);
810 }
811 ao2_ref(sub->statistics, -1);
812 }
813#endif
814}
int stasis_subscription_is_done(struct stasis_subscription *subscription)
Returns whether subscription has received its final message.
Definition stasis.c:1185
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.

References stasis_subscription::accepted_message_types, ao2_cleanup, ao2_global_obj_ref, ao2_ref, ao2_unlink, ast_assert, ast_cond_destroy, ast_free, ast_taskprocessor_unreference(), AST_VECTOR_FREE, stasis_subscription::join_cond, stasis_subscription::mailbox, NULL, stasis_subscription_is_done(), stasis_subscription_is_subscribed(), sub, stasis_subscription::topic, and stasis_subscription::uniqueid.

Referenced by internal_stasis_subscribe().

◆ subscription_invoke()

static void subscription_invoke ( struct stasis_subscription sub,
struct stasis_message message 
)
static

Invoke the subscription's callback.

Parameters
sub - Subscription to invoke.
message - Message to send.

Definition at line 821 of file stasis.c.

823{
824 unsigned int final = stasis_subscription_final_message(sub, message);
826#ifdef AST_DEVMODE
827 struct timeval start;
828 long elapsed;
829
830 start = ast_tvnow();
831#endif
832
833 /* Notify that the final message has been received */
834 if (final) {
835 ao2_lock(sub);
839 }
840
841 /*
842 * If filtering is turned on and this is a 'final' message, we only invoke the callback
843 * if the subscriber accepts subscription_change message types.
844 */
847 /* Since sub is mostly immutable, no need to lock sub */
849 }
850
851 /* Notify that the final message has been processed */
852 if (final) {
853 ao2_lock(sub);
857 }
858
859#ifdef AST_DEVMODE
860 elapsed = ast_tvdiff_ms(ast_tvnow(), start);
861 if (elapsed > sub->statistics->highest_time_invoked) {
862 sub->statistics->highest_time_invoked = elapsed;
863 ao2_lock(sub->statistics);
864 sub->statistics->highest_time_message_type = stasis_message_type(message);
865 ao2_unlock(sub->statistics);
866 }
867 if (elapsed < sub->statistics->lowest_time_invoked) {
868 sub->statistics->lowest_time_invoked = elapsed;
869 }
870#endif
871}

References stasis_subscription::accepted_message_types, ao2_lock, ao2_unlock, ast_cond_signal, ast_tvdiff_ms(), ast_tvnow(), AST_VECTOR_GET, AST_VECTOR_SIZE, stasis_subscription::callback, stasis_subscription::data, stasis_subscription::filter, stasis_subscription::final_message_processed, stasis_subscription::final_message_rxed, stasis_subscription::join_cond, message_type_id, stasis_message_type_id(), stasis_subscription_change_type(), STASIS_SUBSCRIPTION_FILTER_SELECTIVE, stasis_subscription_final_message(), statistics(), and sub.

Referenced by dispatch_exec_async(), dispatch_exec_sync(), and dispatch_message().

◆ topic_add_subscription()

static int topic_add_subscription ( struct stasis_topic topic,
struct stasis_subscription sub 
)
static

Add a subscriber to a topic.

Parameters
topic - Topic.
sub - Subscriber.
Returns
0 on success
Non-zero on error

Definition at line 1268 of file stasis.c.

1269{
1270 size_t idx;
1271
1272 ao2_lock(topic);
1273 /* The reference from the topic to the subscription is shared with
1274 * the owner of the subscription, which will explicitly unsubscribe
1275 * to release it.
1276 *
1277 * If we bumped the refcount here, the owner would have to unsubscribe
1278 * and cleanup, which is a bit awkward. */
1280
1281 for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
1283 AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
1284 }
1285
1286#ifdef AST_DEVMODE
1288 ast_str_container_add(sub->statistics->topics, stasis_topic_name(topic));
1289#endif
1290
1291 ao2_unlock(topic);
1292
1293 return 0;
1294}

References ao2_lock, ao2_unlock, ast_str_container_add(), AST_VECTOR_APPEND, AST_VECTOR_GET, AST_VECTOR_SIZE, stasis_subscription_uniqueid(), stasis_topic_name(), sub, stasis_topic::subscribers, stasis_subscription_change::topic, topic_add_subscription(), and stasis_topic::upstream_topics.

Referenced by internal_stasis_subscribe(), stasis_forward_all(), and topic_add_subscription().

◆ topic_complete_name()

static char * topic_complete_name ( const char *  word)
static

Definition at line 2602 of file stasis.c.

2603{
2604 struct topic_proxy *topic;
2605 struct ao2_iterator it;
2606 int wordlen = strlen(word);
2607 int ret;
2608
2610 while ((topic = ao2_iterator_next(&it))) {
2611 if (!strncasecmp(word, topic->name, wordlen)) {
2612 ret = ast_cli_completion_add(ast_strdup(topic->name));
2613 if (ret) {
2614 ao2_ref(topic, -1);
2615 break;
2616 }
2617 }
2618 ao2_ref(topic, -1);
2619 }
2621 return NULL;
2622}
#define ast_strdup(str)
A wrapper for strdup()
Definition astmm.h:241
int ast_cli_completion_add(char *value)
Add a result to a request for completion options.
Definition main/cli.c:2845
short word

References ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, ao2_ref, ast_cli_completion_add(), ast_strdup, topic_proxy::name, NULL, and topic_all.

Referenced by stasis_show_topic().

◆ topic_dtor()

static void topic_dtor ( void *  obj)
static

Definition at line 500 of file stasis.c.

501{
502 struct stasis_topic *topic = obj;
503#ifdef AST_DEVMODE
504 struct ao2_container *topic_stats;
505#endif
506
507 ast_debug(2, "Destroying topic. name: %s, detail: %s\n",
508 topic->name, topic->detail);
509
510 /* Subscribers hold a reference to topics, so they should all be
511 * unsubscribed before we get here. */
513
516 ast_debug(1, "Topic '%s': %p destroyed\n", topic->name, topic);
517
518#ifdef AST_DEVMODE
519 if (topic->statistics) {
520 topic_stats = ao2_global_obj_ref(topic_statistics);
521 if (topic_stats) {
522 ao2_unlink(topic_stats, topic->statistics);
523 ao2_ref(topic_stats, -1);
524 }
525 ao2_ref(topic->statistics, -1);
526 }
527#endif
528}

References ao2_global_obj_ref, ao2_ref, ao2_unlink, ast_assert, ast_debug, AST_VECTOR_FREE, AST_VECTOR_SIZE, stasis_topic::detail, stasis_topic::name, stasis_topic::subscribers, and stasis_topic::upstream_topics.

Referenced by stasis_topic_create_with_detail().

◆ topic_pool_dtor()

static void topic_pool_dtor ( void *  obj)
static

Definition at line 1829 of file stasis.c.

1830{
1831 struct stasis_topic_pool *pool = obj;
1832
1833#ifdef AO2_DEBUG
1834 {
1835 char *container_name =
1836 ast_alloca(strlen(stasis_topic_name(pool->pool_topic)) + strlen("-pool") + 1);
1837 sprintf(container_name, "%s-pool", stasis_topic_name(pool->pool_topic));
1838 ao2_container_unregister(container_name);
1839 }
1840#endif
1841
1843 pool->pool_container = NULL;
1844 ao2_cleanup(pool->pool_topic);
1845 pool->pool_topic = NULL;
1846}
void ao2_container_unregister(const char *name)
Unregister a container for CLI stats and integrity check.

References ao2_cleanup, ao2_container_unregister(), ast_alloca, NULL, stasis_topic_pool::pool_container, stasis_topic_pool::pool_topic, and stasis_topic_name().

Referenced by stasis_topic_pool_create().

◆ topic_pool_entry_alloc()

static struct topic_pool_entry * topic_pool_entry_alloc ( const char *  topic_name)
static

Definition at line 1809 of file stasis.c.

1810{
1812
1813 topic_pool_entry = ao2_alloc_options(sizeof(*topic_pool_entry) + strlen(topic_name) + 1,
1815 if (!topic_pool_entry) {
1816 return NULL;
1817 }
1820 strcpy(topic_pool_entry->name, topic_name); /* Safe */
1821 return topic_pool_entry;
1822}
static void topic_pool_entry_dtor(void *obj)
Definition stasis.c:1798
ast_cond_t init_cond
Definition stasis.c:1792
ast_mutex_t init_lock
Definition stasis.c:1791
char name[0]
Definition stasis.c:1795

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_alloc_options, ast_cond_init, ast_mutex_init, topic_pool_entry::init_cond, topic_pool_entry::init_lock, topic_pool_entry::name, NULL, and topic_pool_entry_dtor().

Referenced by stasis_topic_pool_get_topic().

◆ topic_pool_entry_cmp()

static int topic_pool_entry_cmp ( void *  obj,
void *  arg,
int  flags 
)
static

Definition at line 1869 of file stasis.c.

1870{
1871 const struct topic_pool_entry *object_left = obj;
1872 const struct topic_pool_entry *object_right = arg;
1873 const char *right_key = arg;
1874 int cmp;
1875
1876 switch (flags & OBJ_SEARCH_MASK) {
1877 case OBJ_SEARCH_OBJECT:
1878 right_key = object_right->name;
1879 /* Fall through */
1880 case OBJ_SEARCH_KEY:
1881 cmp = strcasecmp(object_left->name, right_key);
1882 break;
1884 /* Not supported by container */
1885 ast_assert(0);
1886 cmp = -1;
1887 break;
1888 default:
1889 /*
1890 * What arg points to is specific to this traversal callback
1891 * and has no special meaning to astobj2.
1892 */
1893 cmp = 0;
1894 break;
1895 }
1896 if (cmp) {
1897 return 0;
1898 }
1899 /*
1900 * At this point the traversal callback is identical to a sorted
1901 * container.
1902 */
1903 return CMP_MATCH;
1904}
@ CMP_MATCH
Definition astobj2.h:1027
@ OBJ_SEARCH_PARTIAL_KEY
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
Definition astobj2.h:1116
@ OBJ_SEARCH_OBJECT
The arg parameter is an object of the same type.
Definition astobj2.h:1087
@ OBJ_SEARCH_MASK
Search option field mask.
Definition astobj2.h:1072

References ast_assert, CMP_MATCH, topic_pool_entry::name, OBJ_SEARCH_KEY, OBJ_SEARCH_MASK, OBJ_SEARCH_OBJECT, and OBJ_SEARCH_PARTIAL_KEY.

Referenced by stasis_topic_pool_create().

◆ topic_pool_entry_dtor()

static void topic_pool_entry_dtor ( void *  obj)
static

Definition at line 1798 of file stasis.c.

1799{
1800 struct topic_pool_entry *entry = obj;
1801
1802 entry->forward = stasis_forward_cancel(entry->forward);
1803 ao2_cleanup(entry->topic);
1804 entry->topic = NULL;
1805 ast_cond_destroy(&entry->init_cond);
1807}
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
Definition stasis.c:1615
struct stasis_topic * topic
Definition stasis.c:1778
struct stasis_forward * forward
Definition stasis.c:1777

References ao2_cleanup, ast_cond_destroy, ast_mutex_destroy, topic_pool_entry::forward, topic_pool_entry::init_cond, topic_pool_entry::init_lock, NULL, stasis_forward_cancel(), and topic_pool_entry::topic.

Referenced by topic_pool_entry_alloc().

◆ topic_pool_entry_hash()

static int topic_pool_entry_hash ( const void *  obj,
const int  flags 
)
static

Definition at line 1848 of file stasis.c.

1849{
1850 const struct topic_pool_entry *object;
1851 const char *key;
1852
1853 switch (flags & OBJ_SEARCH_MASK) {
1854 case OBJ_SEARCH_KEY:
1855 key = obj;
1856 break;
1857 case OBJ_SEARCH_OBJECT:
1858 object = obj;
1859 key = object->name;
1860 break;
1861 default:
1862 /* Hash can only work on something with a full key. */
1863 ast_assert(0);
1864 return 0;
1865 }
1866 return ast_str_case_hash(key);
1867}
static force_inline int attribute_pure ast_str_case_hash(const char *str)
Compute a hash value on a case-insensitive string.
Definition strings.h:1303

References ast_assert, ast_str_case_hash(), topic_pool_entry::name, OBJ_SEARCH_KEY, OBJ_SEARCH_MASK, and OBJ_SEARCH_OBJECT.

Referenced by stasis_topic_pool_create().

◆ topic_remove_subscription()

static int topic_remove_subscription ( struct stasis_topic topic,
struct stasis_subscription sub 
)
static

Definition at line 1296 of file stasis.c.

1297{
1298 size_t idx;
1299 int res;
1300
1301 ao2_lock(topic);
1302 for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
1304 AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
1305 }
1308
1309#ifdef AST_DEVMODE
1310 if (!res) {
1312 ast_str_container_remove(sub->statistics->topics, stasis_topic_name(topic));
1313 }
1314#endif
1315
1316 ao2_unlock(topic);
1317
1318 return res;
1319}
void ast_str_container_remove(struct ao2_container *str_container, const char *remove)
Removes a string from a string container allocated by ast_str_container_alloc.
Definition strings.c:221

References ao2_lock, ao2_unlock, ast_str_container_remove(), AST_VECTOR_ELEM_CLEANUP_NOOP, AST_VECTOR_GET, AST_VECTOR_REMOVE_ELEM_UNORDERED, AST_VECTOR_SIZE, stasis_subscription_uniqueid(), stasis_topic_name(), sub, stasis_topic::subscribers, stasis_subscription_change::topic, topic_remove_subscription(), and stasis_topic::upstream_topics.

Referenced by stasis_forward_cancel(), stasis_unsubscribe(), and topic_remove_subscription().

◆ userevent_exclusion_cb()

static int userevent_exclusion_cb ( const char *  key)
static

Definition at line 2348 of file stasis.c.

2349{
2350 if (!strcmp("eventname", key)) {
2351 return 1;
2352 }
2353 return 0;
2354}

Referenced by multi_user_event_to_ami().

Variable Documentation

◆ cli_stasis

struct ast_cli_entry cli_stasis[]
static
Initial value:
= {
{ .handler = stasis_show_topics , .summary = "Show all topics" ,},
{ .handler = stasis_show_topic , .summary = "Show topic" ,},
}
static char * stasis_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
Definition stasis.c:2545
static char * stasis_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
Definition stasis.c:2628

Definition at line 2687 of file stasis.c.

2687 {
2688 AST_CLI_DEFINE(stasis_show_topics, "Show all topics"),
2689 AST_CLI_DEFINE(stasis_show_topic, "Show topic"),
2690};
#define AST_CLI_DEFINE(fn, txt,...)
Definition cli.h:197

Referenced by stasis_cleanup(), and stasis_init().

◆ declined_option

struct aco_type declined_option
static

An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type.

Definition at line 2424 of file stasis.c.

2424 {
2425 .type = ACO_GLOBAL,
2426 .name = "declined_message_types",
2427 .item_offset = offsetof(struct stasis_config, declined_message_types),
2428 .category_match = ACO_WHITELIST_EXACT,
2429 .category = "declined_message_types",
2430};
@ ACO_GLOBAL
@ ACO_WHITELIST_EXACT

Referenced by stasis_init().

◆ declined_options

struct aco_type* declined_options[] = ACO_TYPES(&declined_option)

Definition at line 2432 of file stasis.c.

Referenced by stasis_init().

◆ stasis_conf

struct aco_file stasis_conf
Initial value:
= {
.filename = "stasis.conf",
}
#define ACO_TYPES(...)
A helper macro to ensure that aco_info types always have a sentinel.
static struct aco_type threadpool_option
Definition stasis.c:2405

Definition at line 2434 of file stasis.c.

2434 {
2435 .filename = "stasis.conf",
2437};

◆ taskpool

struct ast_taskpool* taskpool
static

Taskpool for topics that don't want a dedicated taskprocessor

Definition at line 374 of file stasis.c.

Referenced by internal_stasis_subscribe(), stasis_cleanup(), and stasis_init().

◆ taskpool_option

struct aco_type taskpool_option
static

Definition at line 2413 of file stasis.c.

2413 {
2414 .type = ACO_GLOBAL,
2415 .name = "taskpool",
2416 .item_offset = offsetof(struct stasis_config, taskpool_options),
2417 .category = "taskpool",
2418 .category_match = ACO_WHITELIST_EXACT,
2419};

Referenced by stasis_init().

◆ taskpool_options

struct aco_type* taskpool_options[] = ACO_TYPES(&threadpool_option, &taskpool_option)
static

Definition at line 2421 of file stasis.c.

Referenced by sip_get_taskpool_options(), and stasis_init().

◆ threadpool_option

struct aco_type threadpool_option
static

Definition at line 2405 of file stasis.c.

2405 {
2406 .type = ACO_GLOBAL,
2407 .name = "threadpool",
2408 .item_offset = offsetof(struct stasis_config, taskpool_options),
2409 .category = "threadpool",
2410 .category_match = ACO_WHITELIST_EXACT,
2411};

◆ topic_all

struct ao2_container* topic_all