Asterisk - The Open Source Telephony Project GIT-master-8f1982c
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Macros Modules Pages
Data Structures | Macros | Functions | Variables
stasis.c File Reference

Stasis Message Bus API. More...

#include "asterisk.h"
#include "asterisk/astobj2.h"
#include "asterisk/stasis_internal.h"
#include "asterisk/stasis.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/threadpool.h"
#include "asterisk/utils.h"
#include "asterisk/uuid.h"
#include "asterisk/vector.h"
#include "asterisk/stasis_channels.h"
#include "asterisk/stasis_bridges.h"
#include "asterisk/stasis_endpoints.h"
#include "asterisk/config_options.h"
#include "asterisk/cli.h"
Include dependency graph for stasis.c:

Go to the source code of this file.

Data Structures

struct  ast_multi_object_blob
 A multi object blob data structure to carry user event stasis messages. More...
 
struct  stasis_config
 
struct  stasis_declined_config
 A structure to hold global configuration-related options. More...
 
struct  stasis_forward
 Forwarding information. More...
 
struct  stasis_subscription
 
struct  stasis_threadpool_conf
 Threadpool configuration options. More...
 
struct  stasis_topic
 
struct  stasis_topic_pool
 
struct  sync_task_data
 
struct  topic_pool_entry
 
struct  topic_proxy
 

Macros

#define FMT_FIELDS   "%-64s %-64s\n"
 
#define FMT_HEADERS   "%-64s %-64s\n"
 
#define INITIAL_SUBSCRIBERS_MAX   4
 
#define TOPIC_ALL_BUCKETS   997
 
#define topic_lock_both(topic1, topic2)
 Lock two topics. More...
 
#define TOPIC_POOL_BUCKETS   57
 

Functions

struct stasis_subscription__stasis_subscribe (struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
 Create a subscription. More...
 
struct stasis_subscription__stasis_subscribe_pool (struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
 Create a subscription whose callbacks occur on a thread pool. More...
 
static AO2_GLOBAL_OBJ_STATIC (globals)
 A global object container that will contain the stasis_config that gets swapped out on reloads. More...
 
 AO2_STRING_FIELD_CASE_SORT_FN (topic_proxy, name)
 
 AO2_STRING_FIELD_CMP_FN (topic_proxy, name)
 
 AO2_STRING_FIELD_HASH_FN (topic_proxy, name)
 
void ast_multi_object_blob_add (struct ast_multi_object_blob *multi, enum stasis_user_multi_object_snapshot_type type, void *object)
 Add an object (snapshot) to the blob. More...
 
struct ast_multi_object_blobast_multi_object_blob_create (struct ast_json *blob)
 Create a stasis user event multi object blob. More...
 
void ast_multi_object_blob_single_channel_publish (struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob)
 Publish single channel user event (for app_userevent compatibility) More...
 
 CONFIG_INFO_CORE ("stasis", cfg_info, globals, stasis_config_alloc,.files=ACO_FILES(&stasis_conf),)
 Register information about the configs being processed by this module. More...
 
static int declined_handler (const struct aco_option *opt, struct ast_variable *var, void *obj)
 
static int dispatch_exec_async (struct ast_taskprocessor_local *local)
 
static int dispatch_exec_sync (struct ast_taskprocessor_local *local)
 
static unsigned int dispatch_message (struct stasis_subscription *sub, struct stasis_message *message, int synchronous)
 
static void forward_dtor (void *obj)
 
struct stasis_subscriptioninternal_stasis_subscribe (struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func)
 Create a subscription. More...
 
static int link_topic_proxy (struct stasis_topic *topic, const char *name, const char *detail)
 
static void multi_object_blob_dtor (void *obj)
 
static struct ast_strmulti_object_blob_to_ami (void *obj)
 
static struct ast_manager_event_blobmulti_user_event_to_ami (struct stasis_message *message)
 
static struct ast_jsonmulti_user_event_to_json (struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
 
static void proxy_dtor (void *weakproxy, void *container)
 
static void publish_msg (struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sync_sub)
 
static void send_subscription_subscribe (struct stasis_topic *topic, struct stasis_subscription *sub)
 
static void send_subscription_unsubscribe (struct stasis_topic *topic, struct stasis_subscription *sub)
 
static void stasis_cleanup (void)
 Cleanup function for graceful shutdowns. More...
 
static void * stasis_config_alloc (void)
 
static void stasis_config_destructor (void *obj)
 
static void stasis_declined_config_destructor (void *obj)
 
struct stasis_forwardstasis_forward_all (struct stasis_topic *from_topic, struct stasis_topic *to_topic)
 Create a subscription which forwards all messages from one topic to another. More...
 
struct stasis_forwardstasis_forward_cancel (struct stasis_forward *forward)
 
int stasis_init (void)
 Initialize the Stasis subsystem. More...
 
void stasis_log_bad_type_access (const char *name)
 
int stasis_message_type_declined (const char *name)
 Check whether a message type is declined. More...
 
 STASIS_MESSAGE_TYPE_DEFN (stasis_subscription_change_type)
 
void stasis_publish (struct stasis_topic *topic, struct stasis_message *message)
 Publish a message to a topic's subscribers. More...
 
void stasis_publish_sync (struct stasis_subscription *sub, struct stasis_message *message)
 Publish a message to a topic's subscribers, synchronizing on the specified subscriber. More...
 
static char * stasis_show_topic (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static char * stasis_show_topics (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
void stasis_subscription_accept_formatters (struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters)
 Indicate to a subscription that we are interested in messages with one or more formatters. More...
 
int stasis_subscription_accept_message_type (struct stasis_subscription *subscription, const struct stasis_message_type *type)
 Indicate to a subscription that we are interested in a message type. More...
 
void stasis_subscription_cb_noop (void *data, struct stasis_subscription *sub, struct stasis_message *message)
 Stasis subscription callback function that does nothing. More...
 
int stasis_subscription_decline_message_type (struct stasis_subscription *subscription, const struct stasis_message_type *type)
 Indicate to a subscription that we are not interested in a message type. More...
 
int stasis_subscription_final_message (struct stasis_subscription *sub, struct stasis_message *msg)
 Determine whether a message is the final message to be received on a subscription. More...
 
int stasis_subscription_is_done (struct stasis_subscription *subscription)
 Returns whether subscription has received its final message. More...
 
int stasis_subscription_is_subscribed (const struct stasis_subscription *sub)
 Returns whether a subscription is currently subscribed. More...
 
void stasis_subscription_join (struct stasis_subscription *subscription)
 Block until the last message is processed on a subscription. More...
 
int stasis_subscription_set_congestion_limits (struct stasis_subscription *subscription, long low_water, long high_water)
 Set the high and low alert water marks of the stasis subscription. More...
 
int stasis_subscription_set_filter (struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
 Set the message type filtering level on a subscription. More...
 
const char * stasis_subscription_uniqueid (const struct stasis_subscription *sub)
 Get the unique ID for the subscription. More...
 
struct stasis_topicstasis_topic_create (const char *name)
 Create a new topic. More...
 
struct stasis_topicstasis_topic_create_with_detail (const char *name, const char *detail)
 Create a new topic with given detail. More...
 
const char * stasis_topic_detail (const struct stasis_topic *topic)
 Return the detail of a topic. More...
 
struct stasis_topicstasis_topic_get (const char *name)
 Get a topic of the given name. More...
 
const char * stasis_topic_name (const struct stasis_topic *topic)
 Return the name of a topic. More...
 
struct stasis_topic_poolstasis_topic_pool_create (struct stasis_topic *pooled_topic)
 Create a topic pool that routes messages from dynamically generated topics to the given topic. More...
 
void stasis_topic_pool_delete_topic (struct stasis_topic_pool *pool, const char *topic_name)
 Delete a topic from the topic pool. More...
 
struct stasis_topicstasis_topic_pool_get_topic (struct stasis_topic_pool *pool, const char *topic_name)
 Find or create a topic in the pool. More...
 
int stasis_topic_pool_topic_exists (const struct stasis_topic_pool *pool, const char *topic_name)
 Check if a topic exists in a pool. More...
 
size_t stasis_topic_subscribers (const struct stasis_topic *topic)
 Return the number of subscribers of a topic. More...
 
struct stasis_subscriptionstasis_unsubscribe (struct stasis_subscription *sub)
 Cancel a subscription. More...
 
struct stasis_subscriptionstasis_unsubscribe_and_join (struct stasis_subscription *subscription)
 Cancel a subscription, blocking until the last message is processed. More...
 
static int sub_cleanup (void *data)
 
static struct stasis_subscription_changesubscription_change_alloc (struct stasis_topic *topic, const char *uniqueid, const char *description)
 
static void subscription_change_dtor (void *obj)
 
static void subscription_dtor (void *obj)
 
static void subscription_invoke (struct stasis_subscription *sub, struct stasis_message *message)
 Invoke the subscription's callback. More...
 
static int topic_add_subscription (struct stasis_topic *topic, struct stasis_subscription *sub)
 Add a subscriber to a topic. More...
 
static char * topic_complete_name (const char *word)
 
static void topic_dtor (void *obj)
 
static void topic_pool_dtor (void *obj)
 
static struct topic_pool_entrytopic_pool_entry_alloc (const char *topic_name)
 
static int topic_pool_entry_cmp (void *obj, void *arg, int flags)
 
static void topic_pool_entry_dtor (void *obj)
 
static int topic_pool_entry_hash (const void *obj, const int flags)
 
static int topic_remove_subscription (struct stasis_topic *topic, struct stasis_subscription *sub)
 
static int userevent_exclusion_cb (const char *key)
 
 STASIS_MESSAGE_TYPE_DEFN (ast_multi_user_event_type,.to_json=multi_user_event_to_json,.to_ami=multi_user_event_to_ami,)
 Define multi user event message type(s). More...
 

Variables

static struct ast_cli_entry cli_stasis []
 
static struct aco_type declined_option
 An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type. More...
 
struct aco_typedeclined_options [] = ACO_TYPES(&declined_option)
 
struct aco_file stasis_conf
 
static struct ast_threadpoolthreadpool
 
static struct aco_type threadpool_option
 
static struct aco_typethreadpool_options [] = ACO_TYPES(&threadpool_option)
 
struct ao2_containertopic_all
 

Detailed Description

Stasis Message Bus API.

Author
David M. Lee, II dlee@.nosp@m.digi.nosp@m.um.co.nosp@m.m

Definition in file stasis.c.

Macro Definition Documentation

◆ FMT_FIELDS

#define FMT_FIELDS   "%-64s %-64s\n"

◆ FMT_HEADERS

#define FMT_HEADERS   "%-64s %-64s\n"

◆ INITIAL_SUBSCRIBERS_MAX

#define INITIAL_SUBSCRIBERS_MAX   4

Initial size of the subscribers list.

Definition at line 328 of file stasis.c.

◆ TOPIC_ALL_BUCKETS

#define TOPIC_ALL_BUCKETS   997

Definition at line 344 of file stasis.c.

◆ topic_lock_both

#define topic_lock_both (   topic1,
  topic2 
)

Lock two topics.

Definition at line 452 of file stasis.c.

◆ TOPIC_POOL_BUCKETS

#define TOPIC_POOL_BUCKETS   57

The number of buckets to use for topic pools

Definition at line 331 of file stasis.c.

Function Documentation

◆ __stasis_subscribe()

struct stasis_subscription * __stasis_subscribe ( struct stasis_topic topic,
stasis_subscription_cb  callback,
void *  data,
const char *  file,
int  lineno,
const char *  func 
)

Create a subscription.

In addition to being AO2 managed memory (requiring an ao2_cleanup() to free up this reference), the subscription must be explicitly unsubscribed from its topic using stasis_unsubscribe().

The invocations of the callback are serialized, but may not always occur on the same thread. The invocation order of different subscriptions is unspecified.

Parameters
topicTopic to subscribe to.
callbackCallback function for subscription messages.
dataData to be passed to the callback, in addition to the message.
file,lineno,func
Returns
New stasis_subscription object.
Return values
NULLon error.
Since
12
Note
This callback will receive a callback with a message indicating it has been subscribed. This occurs immediately before accepted message types can be set and the callback must expect to receive it.

Definition at line 969 of file stasis.c.

976{
977 return internal_stasis_subscribe(topic, callback, data, 1, 0, file, lineno, func);
978}
static struct ast_channel * callback(struct ast_channelstorage_instance *driver, ao2_callback_data_fn *cb_fn, void *arg, void *data, int ao2_flags)
struct stasis_subscription * internal_stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func)
Create a subscription.
Definition: stasis.c:883

References callback(), stasis_subscription::data, make_ari_stubs::file, internal_stasis_subscribe(), and stasis_subscription::topic.

Referenced by stasis_message_router_create_internal().

◆ __stasis_subscribe_pool()

struct stasis_subscription * __stasis_subscribe_pool ( struct stasis_topic topic,
stasis_subscription_cb  callback,
void *  data,
const char *  file,
int  lineno,
const char *  func 
)

Create a subscription whose callbacks occur on a thread pool.

In addition to being AO2 managed memory (requiring an ao2_cleanup() to free up this reference), the subscription must be explicitly unsubscribed from its topic using stasis_unsubscribe().

The invocations of the callback are serialized, but will almost certainly not always happen on the same thread. The invocation order of different subscriptions is unspecified.

Unlike stasis_subscribe, this function will explicitly use a threadpool to dispatch items to its callback. This form of subscription should be used when many subscriptions may be made to the specified topic.

Parameters
topicTopic to subscribe to.
callbackCallback function for subscription messages.
dataData to be passed to the callback, in addition to the message.
file,lineno,func
Returns
New stasis_subscription object.
Return values
NULLon error.
Since
12.8.0
Note
This callback will receive a callback with a message indicating it has been subscribed. This occurs immediately before accepted message types can be set and the callback must expect to receive it.

Definition at line 980 of file stasis.c.

987{
988 return internal_stasis_subscribe(topic, callback, data, 1, 1, file, lineno, func);
989}

References callback(), stasis_subscription::data, make_ari_stubs::file, internal_stasis_subscribe(), and stasis_subscription::topic.

Referenced by stasis_message_router_create_internal().

◆ AO2_GLOBAL_OBJ_STATIC()

static AO2_GLOBAL_OBJ_STATIC ( globals  )
static

A global object container that will contain the stasis_config that gets swapped out on reloads.

◆ AO2_STRING_FIELD_CASE_SORT_FN()

AO2_STRING_FIELD_CASE_SORT_FN ( topic_proxy  ,
name   
)

◆ AO2_STRING_FIELD_CMP_FN()

AO2_STRING_FIELD_CMP_FN ( topic_proxy  ,
name   
)

◆ AO2_STRING_FIELD_HASH_FN()

AO2_STRING_FIELD_HASH_FN ( topic_proxy  ,
name   
)

◆ CONFIG_INFO_CORE()

CONFIG_INFO_CORE ( "stasis"  ,
cfg_info  ,
globals  ,
stasis_config_alloc  ,
files = ACO_FILES(&stasis_conf) 
)

Register information about the configs being processed by this module.

◆ declined_handler()

static int declined_handler ( const struct aco_option opt,
struct ast_variable var,
void *  obj 
)
static

Definition at line 2331 of file stasis.c.

2332{
2333 struct stasis_declined_config *declined = obj;
2334
2335 if (ast_strlen_zero(var->value)) {
2336 return 0;
2337 }
2338
2339 if (ast_str_container_add(declined->declined, var->value)) {
2340 return -1;
2341 }
2342
2343 return 0;
2344}
#define var
Definition: ast_expr2f.c:605
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition: strings.h:65
int ast_str_container_add(struct ao2_container *str_container, const char *add)
Adds a string to a string container allocated by ast_str_container_alloc.
Definition: strings.c:205
A structure to hold global configuration-related options.
Definition: stasis.c:2207
struct ao2_container * declined
Definition: stasis.c:2209

References ast_str_container_add(), ast_strlen_zero(), stasis_declined_config::declined, and var.

Referenced by stasis_init().

◆ dispatch_exec_async()

static int dispatch_exec_async ( struct ast_taskprocessor_local local)
static

Definition at line 1286 of file stasis.c.

1287{
1288 struct stasis_subscription *sub = local->local_data;
1289 struct stasis_message *message = local->data;
1290
1293
1294 return 0;
1295}
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
struct stasis_forward * sub
Definition: res_corosync.c:240
static void subscription_invoke(struct stasis_subscription *sub, struct stasis_message *message)
Invoke the subscription's callback.
Definition: stasis.c:781

References ao2_cleanup, ast_taskprocessor_local::data, ast_taskprocessor_local::local_data, sub, and subscription_invoke().

Referenced by dispatch_message().

◆ dispatch_exec_sync()

static int dispatch_exec_sync ( struct ast_taskprocessor_local local)
static

Definition at line 1313 of file stasis.c.

1314{
1315 struct stasis_subscription *sub = local->local_data;
1316 struct sync_task_data *std = local->data;
1317 struct stasis_message *message = std->task_data;
1318
1321
1322 ast_mutex_lock(&std->lock);
1323 std->complete = 1;
1324 ast_cond_signal(&std->cond);
1325 ast_mutex_unlock(&std->lock);
1326
1327 return 0;
1328}
#define ast_mutex_unlock(a)
Definition: lock.h:197
#define ast_mutex_lock(a)
Definition: lock.h:196
#define ast_cond_signal(cond)
Definition: lock.h:210
ast_cond_t cond
Definition: stasis.c:1303
void * task_data
Definition: stasis.c:1305
ast_mutex_t lock
Definition: stasis.c:1302

References ao2_cleanup, ast_cond_signal, ast_mutex_lock, ast_mutex_unlock, sync_task_data::complete, sync_task_data::cond, ast_taskprocessor_local::data, ast_taskprocessor_local::local_data, sync_task_data::lock, sub, subscription_invoke(), and sync_task_data::task_data.

Referenced by dispatch_message().

◆ dispatch_message()

static unsigned int dispatch_message ( struct stasis_subscription sub,
struct stasis_message message,
int  synchronous 
)
static

Definition at line 1339 of file stasis.c.

1342{
1344
1345 /*
1346 * The 'do while' gives us an easy way to skip remaining logic once
1347 * we determine the message should be accepted.
1348 * The code looks more verbose than it needs to be but it optimizes
1349 * down very nicely. It's just easier to understand and debug this way.
1350 */
1351 do {
1352 struct stasis_message_type *message_type = stasis_message_type(message);
1353 int type_id = stasis_message_type_id(message_type);
1354 int type_filter_specified = 0;
1355 int formatter_filter_specified = 0;
1356 int type_filter_passed = 0;
1357 int formatter_filter_passed = 0;
1358
1359 /* We always accept final messages so only run the filter logic if not final */
1360 if (is_final) {
1361 break;
1362 }
1363
1364 type_filter_specified = sub->filter & STASIS_SUBSCRIPTION_FILTER_SELECTIVE;
1365 formatter_filter_specified = sub->accepted_formatters != STASIS_SUBSCRIPTION_FORMATTER_NONE;
1366
1367 /* Accept if no filters of either type were specified */
1368 if (!type_filter_specified && !formatter_filter_specified) {
1369 break;
1370 }
1371
1372 type_filter_passed = type_filter_specified
1373 && type_id < AST_VECTOR_SIZE(&sub->accepted_message_types)
1374 && AST_VECTOR_GET(&sub->accepted_message_types, type_id);
1375
1376 /*
1377 * Since the type and formatter filters are OR'd, we can skip
1378 * the formatter check if the type check passes.
1379 */
1380 if (type_filter_passed) {
1381 break;
1382 }
1383
1384 formatter_filter_passed = formatter_filter_specified
1385 && (sub->accepted_formatters & stasis_message_type_available_formatters(message_type));
1386
1387 if (formatter_filter_passed) {
1388 break;
1389 }
1390
1391#ifdef AST_DEVMODE
1392 ast_atomic_fetchadd_int(&sub->statistics->messages_dropped, +1);
1393#endif
1394
1395 return 0;
1396
1397 } while (0);
1398
1399#ifdef AST_DEVMODE
1400 ast_atomic_fetchadd_int(&sub->statistics->messages_passed, +1);
1401#endif
1402
1403 if (!sub->mailbox) {
1404 /* Dispatch directly */
1406 return 1;
1407 }
1408
1409 /* Bump the message for the taskprocessor push. This will get de-ref'd
1410 * by the task processor callback.
1411 */
1413 if (!synchronous) {
1415 /* Push failed; ugh. */
1416 ast_log(LOG_ERROR, "Dropping async dispatch\n");
1418 return 0;
1419 }
1420 } else {
1421 struct sync_task_data std;
1422
1423 ast_mutex_init(&std.lock);
1424 ast_cond_init(&std.cond, NULL);
1425 std.complete = 0;
1426 std.task_data = message;
1427
1429 /* Push failed; ugh. */
1430 ast_log(LOG_ERROR, "Dropping sync dispatch\n");
1432 ast_mutex_destroy(&std.lock);
1433 ast_cond_destroy(&std.cond);
1434 return 0;
1435 }
1436
1437 ast_mutex_lock(&std.lock);
1438 while (!std.complete) {
1439 ast_cond_wait(&std.cond, &std.lock);
1440 }
1441 ast_mutex_unlock(&std.lock);
1442
1443 ast_mutex_destroy(&std.lock);
1444 ast_cond_destroy(&std.cond);
1445 }
1446
1447 return 1;
1448}
#define ast_log
Definition: astobj2.c:42
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition: astobj2.h:480
#define LOG_ERROR
#define ast_cond_destroy(cond)
Definition: lock.h:209
#define ast_cond_wait(cond, mutex)
Definition: lock.h:212
#define ast_cond_init(cond, attr)
Definition: lock.h:208
#define ast_mutex_init(pmutex)
Definition: lock.h:193
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition: lock.h:764
#define ast_mutex_destroy(a)
Definition: lock.h:195
#define NULL
Definition: resample.c:96
static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
Definition: stasis.c:1313
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
Definition: stasis.c:1201
static int dispatch_exec_async(struct ast_taskprocessor_local *local)
Definition: stasis.c:1286
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
@ STASIS_SUBSCRIPTION_FILTER_SELECTIVE
Definition: stasis.h:297
@ STASIS_SUBSCRIPTION_FORMATTER_NONE
Definition: stasis.h:309
int stasis_message_type_id(const struct stasis_message_type *type)
Gets the id of a given message type.
enum stasis_subscription_message_formatters stasis_message_type_available_formatters(const struct stasis_message_type *message_type)
Get a bitmap of available formatters for a message type.
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int(*task_exe)(struct ast_taskprocessor_local *local), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:609
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:680

References ao2_bump, ao2_cleanup, ast_atomic_fetchadd_int(), ast_cond_destroy, ast_cond_init, ast_cond_wait, ast_log, ast_mutex_destroy, ast_mutex_init, ast_mutex_lock, ast_mutex_unlock, ast_taskprocessor_push_local(), AST_VECTOR_GET, AST_VECTOR_SIZE, sync_task_data::complete, sync_task_data::cond, dispatch_exec_async(), dispatch_exec_sync(), sync_task_data::lock, LOG_ERROR, NULL, stasis_message_type(), stasis_message_type_available_formatters(), stasis_message_type_id(), STASIS_SUBSCRIPTION_FILTER_SELECTIVE, stasis_subscription_final_message(), STASIS_SUBSCRIPTION_FORMATTER_NONE, sub, subscription_invoke(), and sync_task_data::task_data.

Referenced by publish_msg(), and send_subscription_unsubscribe().

◆ forward_dtor()

static void forward_dtor ( void *  obj)
static

Definition at line 1565 of file stasis.c.

1566{
1567 struct stasis_forward *forward = obj;
1568
1569 ao2_cleanup(forward->from_topic);
1570 forward->from_topic = NULL;
1571 ao2_cleanup(forward->to_topic);
1572 forward->to_topic = NULL;
1573}
Forwarding information.
Definition: stasis.c:1558
struct stasis_topic * from_topic
Definition: stasis.c:1560
struct stasis_topic * to_topic
Definition: stasis.c:1562

References ao2_cleanup, stasis_forward::from_topic, NULL, and stasis_forward::to_topic.

Referenced by stasis_forward_all().

◆ internal_stasis_subscribe()

struct stasis_subscription * internal_stasis_subscribe ( struct stasis_topic topic,
stasis_subscription_cb  callback,
void *  data,
int  needs_mailbox,
int  use_thread_pool,
const char *  file,
int  lineno,
const char *  func 
)

Create a subscription.

In addition to being AO2 managed memory (requiring an ao2_cleanup() to free up this reference), the subscription must be explicitly unsubscribed from its topic using stasis_unsubscribe().

The invocations of the callback are serialized, but may not always occur on the same thread. The invocation order of different subscriptions is unspecified.

Note: modules outside of Stasis should use stasis_subscribe.

Parameters
topicTopic to subscribe to.
callbackCallback function for subscription messages.
dataData to be passed to the callback, in addition to the message.
needs_mailboxDetermines whether or not the subscription requires a mailbox. Subscriptions with mailboxes will be delivered on some non-publisher thread; subscriptions without mailboxes will be delivered on the publisher thread.
use_thread_poolUse the thread pool for the subscription. This is only relevant if needs_mailbox is non-zero.
file,lineno,func
Returns
New stasis_subscription object.
Return values
NULLon error.
Since
12

Definition at line 883 of file stasis.c.

892{
893 struct stasis_subscription *sub;
894 int ret;
895
896 if (!topic) {
897 return NULL;
898 }
899
900 /* The ao2 lock is used for join_cond. */
902 if (!sub) {
903 return NULL;
904 }
905
906#ifdef AST_DEVMODE
908 sub->statistics = stasis_subscription_statistics_create(sub, needs_mailbox, use_thread_pool, file, lineno, func);
909 if (ret < 0 || !sub->statistics) {
910 ao2_ref(sub, -1);
911 return NULL;
912 }
913#else
915 if (ret < 0) {
916 ao2_ref(sub, -1);
917 return NULL;
918 }
919#endif
920
921 if (needs_mailbox) {
922 char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
923
924 /* Create name with seq number appended. */
925 ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "stasis/%c:%s",
926 use_thread_pool ? 'p' : 'm',
928
929 /*
930 * With a small number of subscribers, a thread-per-sub is
931 * acceptable. For a large number of subscribers, a thread
932 * pool should be used.
933 */
934 if (use_thread_pool) {
935 sub->mailbox = ast_threadpool_serializer(tps_name, threadpool);
936 } else {
937 sub->mailbox = ast_taskprocessor_get(tps_name, TPS_REF_DEFAULT);
938 }
939 if (!sub->mailbox) {
940 ao2_ref(sub, -1);
941
942 return NULL;
943 }
945 /* Taskprocessor has a reference */
946 ao2_ref(sub, +1);
947 }
948
949 ao2_ref(topic, +1);
950 sub->topic = topic;
951 sub->callback = callback;
952 sub->data = data;
953 ast_cond_init(&sub->join_cond, NULL);
955 AST_VECTOR_INIT(&sub->accepted_message_types, 0);
956 sub->accepted_formatters = STASIS_SUBSCRIPTION_FORMATTER_NONE;
957
958 if (topic_add_subscription(topic, sub) != 0) {
959 ao2_ref(sub, -1);
960 ao2_ref(topic, -1);
961
962 return NULL;
963 }
965
966 return sub;
967}
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:267
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
#define ao2_t_alloc(data_size, destructor_fn, debug_msg)
Definition: astobj2.h:407
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:654
static struct ast_threadpool * threadpool
Definition: stasis.c:334
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Add a subscriber to a topic.
Definition: stasis.c:1228
static void subscription_dtor(void *obj)
Definition: stasis.c:741
static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition: stasis.c:1675
@ STASIS_SUBSCRIPTION_FILTER_NONE
Definition: stasis.h:295
struct stasis_topic * topic
Definition: stasis.c:711
int subscriber_id
Definition: stasis.c:410
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
Sets the local data associated with a taskprocessor.
@ TPS_REF_DEFAULT
return a reference to a taskprocessor, create one if it does not exist
Definition: taskprocessor.h:76
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format,...)
Build a taskprocessor name with a sequence number on the end.
#define AST_TASKPROCESSOR_MAX_NAME
Suggested maximum taskprocessor name length (less null terminator).
Definition: taskprocessor.h:61
struct ast_taskprocessor * ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
Serialized execution of tasks within a ast_threadpool.
Definition: threadpool.c:1428
static void statistics(void)
Definition: utils/frame.c:287
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113

References ao2_ref, ao2_t_alloc, ast_asprintf, ast_atomic_fetchadd_int(), ast_cond_init, ast_taskprocessor_build_name(), ast_taskprocessor_get(), AST_TASKPROCESSOR_MAX_NAME, ast_taskprocessor_set_local(), ast_threadpool_serializer(), AST_VECTOR_INIT, callback(), stasis_subscription::data, make_ari_stubs::file, NULL, send_subscription_subscribe(), STASIS_SUBSCRIPTION_FILTER_NONE, STASIS_SUBSCRIPTION_FORMATTER_NONE, stasis_topic_name(), statistics(), sub, stasis_topic::subscriber_id, subscription_dtor(), threadpool, stasis_subscription::topic, topic_add_subscription(), and TPS_REF_DEFAULT.

Referenced by __stasis_subscribe(), __stasis_subscribe_pool(), and stasis_caching_topic_create().

◆ link_topic_proxy()

static int link_topic_proxy ( struct stasis_topic topic,
const char *  name,
const char *  detail 
)
static

Definition at line 527 of file stasis.c.

528{
529 struct topic_proxy *proxy;
530 struct stasis_topic* topic_tmp;
531 size_t detail_len;
532
533 if (!topic || !name || !strlen(name) || !detail) {
534 return -1;
535 }
536
538
539 topic_tmp = stasis_topic_get(name);
540 if (topic_tmp) {
541 ast_log(LOG_ERROR, "The same topic is already exist. name: %s\n", name);
542 ao2_ref(topic_tmp, -1);
544
545 return -1;
546 }
547
548 detail_len = strlen(detail) + 1;
549
550 proxy = ao2_t_weakproxy_alloc(
551 sizeof(*proxy) + strlen(name) + 1 + detail_len, NULL, name);
552 if (!proxy) {
554
555 return -1;
556 }
557
558 /* set the proxy info */
559 proxy->name = proxy->buf;
560 proxy->detail = proxy->name + strlen(name) + 1;
561
562 strcpy(proxy->name, name); /* SAFE */
563 ast_copy_string(proxy->detail, detail, detail_len); /* SAFE */
564 proxy->creationtime = ast_tvnow();
565
566 /* We have exclusive access to proxy, no need for locking here. */
567 if (ao2_t_weakproxy_set_object(proxy, topic, OBJ_NOLOCK, "weakproxy link")) {
568 ao2_cleanup(proxy);
570
571 return -1;
572 }
573
575 ao2_cleanup(proxy);
578
579 return -1;
580 }
581
582 /* setting the topic point to the proxy */
583 topic->name = proxy->name;
584 topic->detail = proxy->detail;
585 topic->creationtime = &(proxy->creationtime);
586
588 ao2_ref(proxy, -1);
589
591
592 return 0;
593}
int ao2_weakproxy_subscribe(void *weakproxy, ao2_weakproxy_notification_cb cb, void *data, int flags)
Request notification when weakproxy points to NULL.
Definition: astobj2.c:934
#define ao2_wrlock(a)
Definition: astobj2.h:719
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
Definition: astobj2.h:1554
#define ao2_t_weakproxy_alloc(data_size, destructor_fn, tag)
Definition: astobj2.h:553
#define ao2_unlock(a)
Definition: astobj2.h:729
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
Definition: astobj2.h:1063
#define ao2_t_weakproxy_set_object(weakproxy, obj, flags, tag)
Definition: astobj2.h:582
static const char name[]
Definition: format_mp3.c:68
struct stasis_topic * stasis_topic_get(const char *name)
Get a topic of the given name.
Definition: stasis.c:649
static void proxy_dtor(void *weakproxy, void *container)
Definition: stasis.c:439
struct ao2_container * topic_all
Definition: stasis.c:422
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
Definition: strings.h:425
char * name
Definition: stasis.c:413
char * detail
Definition: stasis.c:416
struct timeval * creationtime
Definition: stasis.c:419
char buf[0]
Definition: stasis.c:432
struct timeval creationtime
Definition: stasis.c:430
char * name
Definition: stasis.c:427
char * detail
Definition: stasis.c:428
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:159

References ao2_bump, ao2_cleanup, ao2_link_flags, ao2_ref, ao2_t_weakproxy_alloc, ao2_t_weakproxy_set_object, ao2_unlock, ao2_weakproxy_subscribe(), ao2_wrlock, ast_copy_string(), ast_log, ast_tvnow(), topic_proxy::buf, stasis_topic::creationtime, topic_proxy::creationtime, stasis_topic::detail, topic_proxy::detail, LOG_ERROR, name, stasis_topic::name, topic_proxy::name, NULL, OBJ_NOLOCK, proxy_dtor(), stasis_topic_get(), and topic_all.

Referenced by stasis_topic_create_with_detail().

◆ multi_object_blob_dtor()

static void multi_object_blob_dtor ( void *  obj)
static

Definition at line 1986 of file stasis.c.

1987{
1988 struct ast_multi_object_blob *multi = obj;
1989 int type;
1990 int i;
1991
1992 for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1993 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1995 }
1996 AST_VECTOR_FREE(&multi->snapshots[type]);
1997 }
1998 ast_json_unref(multi->blob);
1999}
static const char type[]
Definition: chan_ooh323.c:109
#define STASIS_UMOS_MAX
Number of snapshot types.
Definition: stasis.h:1360
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition: json.c:73
A multi object blob data structure to carry user event stasis messages.
Definition: stasis.c:1977
struct ast_multi_object_blob::@398 snapshots[STASIS_UMOS_MAX]
struct ast_json * blob
Definition: stasis.c:1978
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition: vector.h:174

References ao2_cleanup, ast_json_unref(), AST_VECTOR_FREE, AST_VECTOR_GET, AST_VECTOR_SIZE, ast_multi_object_blob::blob, ast_multi_object_blob::snapshots, STASIS_UMOS_MAX, and type.

Referenced by ast_multi_object_blob_create().

◆ multi_object_blob_to_ami()

static struct ast_str * multi_object_blob_to_ami ( void *  obj)
static

Definition at line 2123 of file stasis.c.

2124{
2125 struct ast_str *ami_str=ast_str_create(1024);
2126 struct ast_str *ami_snapshot;
2127 const struct ast_multi_object_blob *multi = obj;
2129 int i;
2130
2131 if (!ami_str) {
2132 return NULL;
2133 }
2134 if (!multi) {
2135 ast_free(ami_str);
2136 return NULL;
2137 }
2138
2139 for (type = 0; type < STASIS_UMOS_MAX; ++type) {
2140 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
2141 char *name = NULL;
2142 void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
2143 ami_snapshot = NULL;
2144
2145 if (i > 0) {
2146 ast_asprintf(&name, "%d", i + 1);
2147 }
2148
2149 switch (type) {
2151 ami_snapshot = ast_manager_build_channel_state_string_prefix(snapshot, name ?: "");
2152 break;
2153
2154 case STASIS_UMOS_BRIDGE:
2155 ami_snapshot = ast_manager_build_bridge_state_string_prefix(snapshot, name ?: "");
2156 break;
2157
2159 /* currently not sending endpoint snapshots to AMI */
2160 break;
2161 }
2162 if (ami_snapshot) {
2163 ast_str_append(&ami_str, 0, "%s", ast_str_buffer(ami_snapshot));
2164 ast_free(ami_snapshot);
2165 }
2166 ast_free(name);
2167 }
2168 }
2169
2170 return ami_str;
2171}
#define ast_free(a)
Definition: astmm.h:180
stasis_user_multi_object_snapshot_type
Object type code for multi user object snapshots.
Definition: stasis.h:1353
@ STASIS_UMOS_ENDPOINT
Definition: stasis.h:1356
@ STASIS_UMOS_BRIDGE
Definition: stasis.h:1355
@ STASIS_UMOS_CHANNEL
Definition: stasis.h:1354
struct ast_str * ast_manager_build_bridge_state_string_prefix(const struct ast_bridge_snapshot *snapshot, const char *prefix)
Generate the AMI message body from a bridge snapshot.
struct ast_str * ast_manager_build_channel_state_string_prefix(const struct ast_channel_snapshot *snapshot, const char *prefix)
Generate the AMI message body from a channel snapshot.
int ast_str_append(struct ast_str **buf, ssize_t max_len, const char *fmt,...)
Append to a thread local dynamic string.
Definition: strings.h:1139
char * ast_str_buffer(const struct ast_str *buf)
Returns the string buffer within the ast_str buf.
Definition: strings.h:761
#define ast_str_create(init_len)
Create a malloc'ed dynamic length string.
Definition: strings.h:659
Support for dynamic strings.
Definition: strings.h:623

References ast_asprintf, ast_free, ast_manager_build_bridge_state_string_prefix(), ast_manager_build_channel_state_string_prefix(), ast_str_append(), ast_str_buffer(), ast_str_create, AST_VECTOR_GET, AST_VECTOR_SIZE, name, NULL, ast_multi_object_blob::snapshots, STASIS_UMOS_BRIDGE, STASIS_UMOS_CHANNEL, STASIS_UMOS_ENDPOINT, STASIS_UMOS_MAX, and type.

Referenced by multi_user_event_to_ami().

◆ multi_user_event_to_ami()

static struct ast_manager_event_blob * multi_user_event_to_ami ( struct stasis_message message)
static

Definition at line 2182 of file stasis.c.

2184{
2185 RAII_VAR(struct ast_str *, object_string, NULL, ast_free);
2186 RAII_VAR(struct ast_str *, body, NULL, ast_free);
2188 const char *eventname;
2189
2190 eventname = ast_json_string_get(ast_json_object_get(multi->blob, "eventname"));
2192 object_string = multi_object_blob_to_ami(multi);
2193 if (!object_string || !body) {
2194 return NULL;
2195 }
2196
2198 "%s"
2199 "UserEvent: %s\r\n"
2200 "%s",
2201 ast_str_buffer(object_string),
2202 eventname,
2203 ast_str_buffer(body));
2204}
struct ast_str * ast_manager_str_from_json_object(struct ast_json *blob, key_exclusion_cb exclusion_cb)
Convert a JSON object into an AMI compatible string.
Definition: manager.c:555
const char * ast_json_string_get(const struct ast_json *string)
Get the value of a JSON string.
Definition: json.c:283
struct ast_json * ast_json_object_get(struct ast_json *object, const char *key)
Get a field from a JSON object.
Definition: json.c:407
struct ast_manager_event_blob * ast_manager_event_blob_create(int event_flags, const char *manager_event, const char *extra_fields_fmt,...)
Construct a ast_manager_event_blob.
Definition: manager.c:10237
#define EVENT_FLAG_USER
Definition: manager.h:81
static struct ast_str * multi_object_blob_to_ami(void *obj)
Definition: stasis.c:2123
static int userevent_exclusion_cb(const char *key)
Definition: stasis.c:2174
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:941

References ast_free, ast_json_object_get(), ast_json_string_get(), ast_manager_event_blob_create(), ast_manager_str_from_json_object(), ast_str_buffer(), ast_multi_object_blob::blob, EVENT_FLAG_USER, multi_object_blob_to_ami(), NULL, RAII_VAR, stasis_message_data(), and userevent_exclusion_cb().

◆ multi_user_event_to_json()

static struct ast_json * multi_user_event_to_json ( struct stasis_message message,
const struct stasis_message_sanitizer sanitize 
)
static

Definition at line 2072 of file stasis.c.

2075{
2076 struct ast_json *out;
2078 struct ast_json *blob = multi->blob;
2079 const struct timeval *tv = stasis_message_timestamp(message);
2081 int i;
2082
2084 if (!out) {
2085 return NULL;
2086 }
2087
2088 ast_json_object_set(out, "type", ast_json_string_create("ChannelUserevent"));
2089 ast_json_object_set(out, "timestamp", ast_json_timeval(*tv, NULL));
2090 ast_json_object_set(out, "eventname", ast_json_ref(ast_json_object_get(blob, "eventname")));
2091 ast_json_object_set(out, "userevent", ast_json_ref(blob));
2092
2093 for (type = 0; type < STASIS_UMOS_MAX; ++type) {
2094 for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
2095 struct ast_json *json_object = NULL;
2096 char *name = NULL;
2097 void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
2098
2099 switch (type) {
2101 json_object = ast_channel_snapshot_to_json(snapshot, sanitize);
2102 name = "channel";
2103 break;
2104 case STASIS_UMOS_BRIDGE:
2105 json_object = ast_bridge_snapshot_to_json(snapshot, sanitize);
2106 name = "bridge";
2107 break;
2109 json_object = ast_endpoint_snapshot_to_json(snapshot, sanitize);
2110 name = "endpoint";
2111 break;
2112 }
2113 if (json_object) {
2114 ast_json_object_set(out, name, json_object);
2115 }
2116 }
2117 }
2118
2119 return out;
2120}
struct ast_json * ast_json_string_create(const char *value)
Construct a JSON string from value.
Definition: json.c:278
struct ast_json * ast_json_object_create(void)
Create a new JSON object.
Definition: json.c:399
struct ast_json * ast_json_timeval(const struct timeval tv, const char *zone)
Construct a timeval as JSON.
Definition: json.c:670
struct ast_json * ast_json_ref(struct ast_json *value)
Increase refcount on value.
Definition: json.c:67
int ast_json_object_set(struct ast_json *object, const char *key, struct ast_json *value)
Set a field in a JSON object.
Definition: json.c:414
const struct timeval * stasis_message_timestamp(const struct stasis_message *msg)
Get the time when a message was created.
struct ast_json * ast_bridge_snapshot_to_json(const struct ast_bridge_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_bridge_snapshot.
struct ast_json * ast_channel_snapshot_to_json(const struct ast_channel_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_channel_snapshot.
struct ast_json * ast_endpoint_snapshot_to_json(const struct ast_endpoint_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_endpoint_snapshot.
Abstract JSON element (object, array, string, int, ...).
FILE * out
Definition: utils/frame.c:33

References ast_bridge_snapshot_to_json(), ast_channel_snapshot_to_json(), ast_endpoint_snapshot_to_json(), ast_json_object_create(), ast_json_object_get(), ast_json_object_set(), ast_json_ref(), ast_json_string_create(), ast_json_timeval(), AST_VECTOR_GET, AST_VECTOR_SIZE, ast_multi_object_blob::blob, name, NULL, out, ast_multi_object_blob::snapshots, stasis_message_data(), stasis_message_timestamp(), STASIS_UMOS_BRIDGE, STASIS_UMOS_CHANNEL, STASIS_UMOS_ENDPOINT, STASIS_UMOS_MAX, and type.

◆ proxy_dtor()

static void proxy_dtor ( void *  weakproxy,
void *  container 
)
static

Definition at line 439 of file stasis.c.

440{
441 ao2_unlink(container, weakproxy);
443}
#define ao2_unlink(container, obj)
Remove an object from a container.
Definition: astobj2.h:1578
struct ao2_container * container
Definition: res_fax.c:531

References ao2_cleanup, ao2_unlink, and container.

Referenced by link_topic_proxy().

◆ publish_msg()

static void publish_msg ( struct stasis_topic topic,
struct stasis_message message,
struct stasis_subscription sync_sub 
)
static

Definition at line 1457 of file stasis.c.

1459{
1460 size_t i;
1461#ifdef AST_DEVMODE
1462 unsigned int dispatched = 0;
1464 struct stasis_message_type_statistics *statistics;
1465 struct timeval start;
1466 long elapsed;
1467#endif
1468
1469 ast_assert(topic != NULL);
1471
1472#ifdef AST_DEVMODE
1473 ast_mutex_lock(&message_type_statistics_lock);
1474 if (message_type_id >= AST_VECTOR_SIZE(&message_type_statistics)) {
1475 struct stasis_message_type_statistics new_statistics = {
1476 .published = 0,
1477 };
1478 if (AST_VECTOR_REPLACE(&message_type_statistics, message_type_id, new_statistics)) {
1479 ast_mutex_unlock(&message_type_statistics_lock);
1480 return;
1481 }
1482 }
1483 statistics = AST_VECTOR_GET_ADDR(&message_type_statistics, message_type_id);
1484 statistics->message_type = stasis_message_type(message);
1485 ast_mutex_unlock(&message_type_statistics_lock);
1486
1487 ast_atomic_fetchadd_int(&statistics->published, +1);
1488#endif
1489
1490 /* If there are no subscribers don't bother */
1491 if (!stasis_topic_subscribers(topic)) {
1492#ifdef AST_DEVMODE
1493 ast_atomic_fetchadd_int(&statistics->unused, +1);
1494 ast_atomic_fetchadd_int(&topic->statistics->messages_not_dispatched, +1);
1495#endif
1496 return;
1497 }
1498
1499 /*
1500 * The topic may be unref'ed by the subscription invocation.
1501 * Make sure we hold onto a reference while dispatching.
1502 */
1503 ao2_ref(topic, +1);
1504#ifdef AST_DEVMODE
1505 start = ast_tvnow();
1506#endif
1507 ao2_lock(topic);
1508 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
1510
1511 ast_assert(sub != NULL);
1512#ifdef AST_DEVMODE
1513 dispatched +=
1514#endif
1515 dispatch_message(sub, message, (sub == sync_sub));
1516 }
1518
1519#ifdef AST_DEVMODE
1520 elapsed = ast_tvdiff_ms(ast_tvnow(), start);
1521 if (elapsed > topic->statistics->highest_time_dispatched) {
1522 topic->statistics->highest_time_dispatched = elapsed;
1523 }
1524 if (elapsed < topic->statistics->lowest_time_dispatched) {
1525 topic->statistics->lowest_time_dispatched = elapsed;
1526 }
1527 if (dispatched) {
1528 ast_atomic_fetchadd_int(&topic->statistics->messages_dispatched, +1);
1529 } else {
1530 ast_atomic_fetchadd_int(&statistics->unused, +1);
1531 ast_atomic_fetchadd_int(&topic->statistics->messages_not_dispatched, +1);
1532 }
1533#endif
1534
1535 ao2_ref(topic, -1);
1536}
#define ao2_lock(a)
Definition: astobj2.h:717
size_t stasis_topic_subscribers(const struct stasis_topic *topic)
Return the number of subscribers of a topic.
Definition: stasis.c:670
static unsigned int dispatch_message(struct stasis_subscription *sub, struct stasis_message *message, int synchronous)
Definition: stasis.c:1339
static int message_type_id
struct stasis_topic::@395 subscribers
int64_t ast_tvdiff_ms(struct timeval end, struct timeval start)
Computes the difference (in milliseconds) between two struct timeval instances.
Definition: time.h:107
#define ast_assert(a)
Definition: utils.h:739
#define AST_VECTOR_REPLACE(vec, idx, elem)
Replace an element at a specific position in a vector, growing the vector if needed.
Definition: vector.h:284
#define AST_VECTOR_GET_ADDR(vec, idx)
Get an address of element in a vector.
Definition: vector.h:668

References ao2_lock, ao2_ref, ao2_unlock, ast_assert, ast_atomic_fetchadd_int(), ast_mutex_lock, ast_mutex_unlock, ast_tvdiff_ms(), ast_tvnow(), AST_VECTOR_GET, AST_VECTOR_GET_ADDR, AST_VECTOR_REPLACE, AST_VECTOR_SIZE, dispatch_message(), message_type_id, NULL, stasis_message_type(), stasis_message_type_id(), stasis_topic_subscribers(), statistics(), sub, stasis_topic::subscribers, and stasis_subscription::topic.

Referenced by stasis_publish(), and stasis_publish_sync().

◆ send_subscription_subscribe()

static void send_subscription_subscribe ( struct stasis_topic topic,
struct stasis_subscription sub 
)
static

Definition at line 1675 of file stasis.c.

1676{
1677 struct stasis_subscription_change *change;
1678 struct stasis_message *msg;
1679
1680 /* This assumes that we have already unsubscribed */
1682
1684 return;
1685 }
1686
1687 change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
1688 if (!change) {
1689 return;
1690 }
1691
1693 if (!msg) {
1694 ao2_cleanup(change);
1695 return;
1696 }
1697
1698 stasis_publish(topic, msg);
1699 ao2_cleanup(msg);
1700 ao2_cleanup(change);
1701}
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
static struct stasis_subscription_change * subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
Definition: stasis.c:1654
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
Definition: stasis.c:1538
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
Returns whether a subscription is currently subscribed.
Definition: stasis.c:1177
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
Holds details about changes to subscriptions for the specified topic.
Definition: stasis.h:890

References ao2_cleanup, ast_assert, stasis_message_create(), stasis_publish(), stasis_subscription_change_type(), stasis_subscription_is_subscribed(), sub, and subscription_change_alloc().

Referenced by internal_stasis_subscribe().

◆ send_subscription_unsubscribe()

static void send_subscription_unsubscribe ( struct stasis_topic topic,
struct stasis_subscription sub 
)
static

Definition at line 1703 of file stasis.c.

1705{
1706 struct stasis_subscription_change *change;
1707 struct stasis_message *msg;
1708
1709 /* This assumes that we have already unsubscribed */
1711
1713 return;
1714 }
1715
1716 change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
1717 if (!change) {
1718 return;
1719 }
1720
1722 if (!msg) {
1723 ao2_cleanup(change);
1724 return;
1725 }
1726
1727 stasis_publish(topic, msg);
1728
1729 /* Now we have to dispatch to the subscription itself */
1730 dispatch_message(sub, msg, 0);
1731
1732 ao2_cleanup(msg);
1733 ao2_cleanup(change);
1734}

References ao2_cleanup, ast_assert, dispatch_message(), stasis_message_create(), stasis_publish(), stasis_subscription_change_type(), stasis_subscription_is_subscribed(), sub, and subscription_change_alloc().

Referenced by stasis_unsubscribe().

◆ stasis_cleanup()

static void stasis_cleanup ( void  )
static

Cleanup function for graceful shutdowns.

Definition at line 3069 of file stasis.c.

3070{
3071#ifdef AST_DEVMODE
3072 ast_cli_unregister_multiple(cli_stasis_statistics, ARRAY_LEN(cli_stasis_statistics));
3073 AST_VECTOR_FREE(&message_type_statistics);
3074 ao2_global_obj_release(subscription_statistics);
3075 ao2_global_obj_release(topic_statistics);
3076#endif
3079 topic_all = NULL;
3081 threadpool = NULL;
3084 aco_info_destroy(&cfg_info);
3086}
#define ao2_global_obj_release(holder)
Release the ao2 object held in the global holder.
Definition: astobj2.h:859
static struct console_pvt globals
int ast_cli_unregister_multiple(struct ast_cli_entry *e, int len)
Unregister multiple commands.
Definition: clicompat.c:30
void aco_info_destroy(struct aco_info *info)
Destroy an initialized aco_info struct.
struct stasis_message_type * ast_multi_user_event_type(void)
Message type for custom user defined events with multi object blobs.
static struct ast_cli_entry cli_stasis[]
Definition: stasis.c:2503
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
Definition: stasis.h:1515
void ast_threadpool_shutdown(struct ast_threadpool *pool)
Shut down a threadpool and destroy it.
Definition: threadpool.c:966
#define ARRAY_LEN(a)
Definition: utils.h:666

References aco_info_destroy(), ao2_cleanup, ao2_global_obj_release, ARRAY_LEN, ast_cli_unregister_multiple(), ast_multi_user_event_type(), ast_threadpool_shutdown(), AST_VECTOR_FREE, cli_stasis, globals, NULL, STASIS_MESSAGE_TYPE_CLEANUP, stasis_subscription_change_type(), threadpool, and topic_all.

Referenced by stasis_init().

◆ stasis_config_alloc()

static void * stasis_config_alloc ( void  )
static

Definition at line 2280 of file stasis.c.

2281{
2282 struct stasis_config *cfg;
2283
2284 if (!(cfg = ao2_alloc(sizeof(*cfg), stasis_config_destructor))) {
2285 return NULL;
2286 }
2287
2288 cfg->threadpool_options = ast_calloc(1, sizeof(*cfg->threadpool_options));
2289 if (!cfg->threadpool_options) {
2290 ao2_ref(cfg, -1);
2291 return NULL;
2292 }
2293
2296 if (!cfg->declined_message_types) {
2297 ao2_ref(cfg, -1);
2298 return NULL;
2299 }
2300
2302 if (!cfg->declined_message_types->declined) {
2303 ao2_ref(cfg, -1);
2304 return NULL;
2305 }
2306
2307 return cfg;
2308}
#define ast_calloc(num, len)
A wrapper for calloc()
Definition: astmm.h:202
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:409
static void stasis_config_destructor(void *obj)
Definition: stasis.c:2272
static void stasis_declined_config_destructor(void *obj)
Definition: stasis.c:2265
#define ast_str_container_alloc(buckets)
Allocates a hash container for bare strings.
Definition: strings.h:1365
struct stasis_declined_config * declined_message_types
Definition: stasis.c:2226
struct stasis_threadpool_conf * threadpool_options
Definition: stasis.c:2224

References ao2_alloc, ao2_ref, ast_calloc, ast_str_container_alloc, stasis_declined_config::declined, stasis_config::declined_message_types, NULL, stasis_config_destructor(), stasis_declined_config_destructor(), and stasis_config::threadpool_options.

Referenced by stasis_init().

◆ stasis_config_destructor()

static void stasis_config_destructor ( void *  obj)
static

Definition at line 2272 of file stasis.c.

2273{
2274 struct stasis_config *cfg = obj;
2275
2278}

References ao2_cleanup, ast_free, stasis_config::declined_message_types, and stasis_config::threadpool_options.

Referenced by stasis_config_alloc().

◆ stasis_declined_config_destructor()

static void stasis_declined_config_destructor ( void *  obj)
static

Definition at line 2265 of file stasis.c.

2266{
2267 struct stasis_declined_config *declined = obj;
2268
2269 ao2_cleanup(declined->declined);
2270}

References ao2_cleanup, and stasis_declined_config::declined.

Referenced by stasis_config_alloc().

◆ stasis_forward_all()

struct stasis_forward * stasis_forward_all ( struct stasis_topic from_topic,
struct stasis_topic to_topic 
)

Create a subscription which forwards all messages from one topic to another.

Note that the topic parameter of the invoked callback will the be the topic the message was sent to, not the topic the subscriber subscribed to.

Parameters
from_topicTopic to forward.
to_topicDestination topic of forwarded messages.
Returns
New forwarding subscription.
Return values
NULLon error.
Since
12

Definition at line 1605 of file stasis.c.

1607{
1608 int res;
1609 size_t idx;
1610 struct stasis_forward *forward;
1611
1612 if (!from_topic || !to_topic) {
1613 return NULL;
1614 }
1615
1616 forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1617 if (!forward) {
1618 return NULL;
1619 }
1620
1621 /* Forwards to ourselves are implicit. */
1622 if (to_topic == from_topic) {
1623 return forward;
1624 }
1625
1626 forward->from_topic = ao2_bump(from_topic);
1627 forward->to_topic = ao2_bump(to_topic);
1628
1631 if (res != 0) {
1634 ao2_ref(forward, -1);
1635 return NULL;
1636 }
1637
1638 for (idx = 0; idx < AST_VECTOR_SIZE(&to_topic->subscribers); ++idx) {
1640 }
1643
1644 return forward;
1645}
@ AO2_ALLOC_OPT_LOCK_NOLOCK
Definition: astobj2.h:367
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:404
static void forward_dtor(void *obj)
Definition: stasis.c:1565
#define topic_lock_both(topic1, topic2)
Lock two topics.
Definition: stasis.c:452
struct stasis_topic::@396 upstream_topics
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_alloc_options, ao2_bump, ao2_ref, ao2_unlock, AST_VECTOR_APPEND, AST_VECTOR_GET, AST_VECTOR_SIZE, forward_dtor(), stasis_forward::from_topic, NULL, stasis_topic::subscribers, stasis_forward::to_topic, topic_add_subscription(), topic_lock_both, and stasis_topic::upstream_topics.

Referenced by __init_manager(), ari_bridges_play_new(), ast_ari_bridges_record(), ast_channel_forward_endpoint(), ast_channel_internal_setup_topics(), AST_TEST_DEFINE(), create_subscriptions(), endpoint_internal_create(), forwards_create_bridge(), forwards_create_channel(), forwards_create_endpoint(), load_general_config(), load_module(), manager_bridging_init(), manager_channels_init(), manager_mwi_init(), manager_subscriptions_init(), manager_system_init(), stasis_cp_all_create(), stasis_cp_single_create(), stasis_topic_pool_get_topic(), and state_alloc().

◆ stasis_forward_cancel()

struct stasis_forward * stasis_forward_cancel ( struct stasis_forward forward)

Definition at line 1575 of file stasis.c.

1576{
1577 int idx;
1578 struct stasis_topic *from;
1579 struct stasis_topic *to;
1580
1581 if (!forward) {
1582 return NULL;
1583 }
1584
1585 from = forward->from_topic;
1586 to = forward->to_topic;
1587
1588 if (from && to) {
1589 topic_lock_both(to, from);
1592
1593 for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
1595 }
1596 ao2_unlock(from);
1597 ao2_unlock(to);
1598 }
1599
1600 ao2_cleanup(forward);
1601
1602 return NULL;
1603}
static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition: stasis.c:1256
#define AST_VECTOR_ELEM_CLEANUP_NOOP(elem)
Vector element cleanup that does nothing.
Definition: vector.h:571
#define AST_VECTOR_REMOVE_ELEM_UNORDERED(vec, elem, cleanup)
Remove an element from a vector.
Definition: vector.h:583

References ao2_cleanup, ao2_unlock, AST_VECTOR_ELEM_CLEANUP_NOOP, AST_VECTOR_GET, AST_VECTOR_REMOVE_ELEM_UNORDERED, AST_VECTOR_SIZE, stasis_forward::from_topic, NULL, stasis_topic::subscribers, stasis_forward::to_topic, topic_lock_both, topic_remove_subscription(), and stasis_topic::upstream_topics.

Referenced by all_dtor(), ari_bridges_play_new(), ast_ari_bridges_record(), ast_channel_internal_cleanup(), ast_endpoint_shutdown(), AST_TEST_DEFINE(), bridge_channel_control_thread(), cleanup_module(), destroy_subscriptions(), forwards_unsubscribe(), load_general_config(), load_module(), manager_bridging_cleanup(), manager_channels_shutdown(), manager_mwi_shutdown(), manager_shutdown(), manager_system_shutdown(), stasis_cp_single_unsubscribe(), state_dtor(), topic_pool_entry_dtor(), and unload_module().

◆ stasis_init()

int stasis_init ( void  )

Initialize the Stasis subsystem.

Returns
0 on success.
Non-zero on error.
Since
12

Definition at line 3088 of file stasis.c.

3089{
3090 struct stasis_config *cfg;
3091 int cache_init;
3092 struct ast_threadpool_options threadpool_opts = { 0, };
3093#ifdef AST_DEVMODE
3094 struct ao2_container *subscription_stats;
3095 struct ao2_container *topic_stats;
3096#endif
3097
3098 /* Be sure the types are cleaned up after the message bus */
3100
3101 if (aco_info_init(&cfg_info)) {
3102 return -1;
3103 }
3104
3105 aco_option_register_custom(&cfg_info, "decline", ACO_EXACT,
3107 aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
3109 FLDSET(struct stasis_threadpool_conf, initial_size), 0,
3110 INT_MAX);
3111 aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
3113 FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
3114 INT_MAX);
3115 aco_option_register(&cfg_info, "max_size", ACO_EXACT,
3117 FLDSET(struct stasis_threadpool_conf, max_size), 0,
3118 INT_MAX);
3119
3120 if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
3121 struct stasis_config *default_cfg = stasis_config_alloc();
3122
3123 if (!default_cfg) {
3124 return -1;
3125 }
3126
3127 if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) {
3128 ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
3129 ao2_ref(default_cfg, -1);
3130
3131 return -1;
3132 }
3133
3134 if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) {
3135 ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
3136 ao2_ref(default_cfg, -1);
3137
3138 return -1;
3139 }
3140
3141 ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n");
3143 cfg = default_cfg;
3144 } else {
3146 if (!cfg) {
3147 ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
3148
3149 return -1;
3150 }
3151 }
3152
3153 threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION;
3154 threadpool_opts.initial_size = cfg->threadpool_options->initial_size;
3155 threadpool_opts.auto_increment = 1;
3156 threadpool_opts.max_size = cfg->threadpool_options->max_size;
3157 threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
3158 threadpool = ast_threadpool_create("stasis", NULL, &threadpool_opts);
3159 ao2_ref(cfg, -1);
3160 if (!threadpool) {
3161 ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
3162
3163 return -1;
3164 }
3165
3166 cache_init = stasis_cache_init();
3167 if (cache_init != 0) {
3168 return -1;
3169 }
3170
3172 return -1;
3173 }
3175 return -1;
3176 }
3177
3179 topic_proxy_hash_fn, 0, topic_proxy_cmp_fn);
3180 if (!topic_all) {
3181 return -1;
3182 }
3183
3185 return -1;
3186 }
3187
3188#ifdef AST_DEVMODE
3189 /* Statistics information is stored separately so that we don't alter or interrupt the lifetime of the underlying
3190 * topic or subscripton.
3191 */
3192 subscription_stats = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, SUBSCRIPTION_STATISTICS_BUCKETS,
3193 subscription_statistics_hash, 0, subscription_statistics_cmp);
3194 if (!subscription_stats) {
3195 return -1;
3196 }
3197 ao2_global_obj_replace_unref(subscription_statistics, subscription_stats);
3198 ao2_cleanup(subscription_stats);
3199
3200 topic_stats = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, TOPIC_STATISTICS_BUCKETS,
3201 topic_statistics_hash, 0, topic_statistics_cmp);
3202 if (!topic_stats) {
3203 return -1;
3204 }
3205 ao2_global_obj_replace_unref(topic_statistics, topic_stats);
3206 ao2_cleanup(topic_stats);
3207 if (!topic_stats) {
3208 return -1;
3209 }
3210
3211 AST_VECTOR_INIT(&message_type_statistics, 0);
3212
3213 if (ast_cli_register_multiple(cli_stasis_statistics, ARRAY_LEN(cli_stasis_statistics))) {
3214 return -1;
3215 }
3216#endif
3217
3218 return 0;
3219}
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
Definition: clicompat.c:19
@ AO2_ALLOC_OPT_LOCK_MUTEX
Definition: astobj2.h:363
#define ao2_global_obj_replace_unref(holder, obj)
Replace an ao2 object in the global holder, throwing away any old object.
Definition: astobj2.h:901
#define ao2_global_obj_ref(holder)
Get a reference to the object stored in the global holder.
Definition: astobj2.h:918
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition: astobj2.h:1303
#define ast_cli_register_multiple(e, len)
Register multiple commands.
Definition: cli.h:265
@ ACO_EXACT
int aco_set_defaults(struct aco_type *type, const char *category, void *obj)
Set all default options of obj.
@ ACO_PROCESS_ERROR
Their was an error and no changes were applied.
int aco_info_init(struct aco_info *info)
Initialize an aco_info structure.
#define FLDSET(type,...)
Convert a struct and list of fields to an argument list of field offsets.
#define aco_option_register(info, name, matchtype, types, default_val, opt_type, flags,...)
Register a config option.
@ OPT_INT_T
Type for default option handler for signed integers.
#define aco_option_register_custom(info, name, matchtype, types, default_val, handler, flags)
Register a config option.
enum aco_process_status aco_process_config(struct aco_info *info, int reload)
Process a config info via the options registered with an aco_info.
#define LOG_NOTICE
static void * stasis_config_alloc(void)
Definition: stasis.c:2280
static void stasis_cleanup(void)
Cleanup function for graceful shutdowns.
Definition: stasis.c:3069
static struct aco_type threadpool_option
Definition: stasis.c:2229
static struct aco_type * threadpool_options[]
Definition: stasis.c:2237
static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
Definition: stasis.c:2331
struct aco_type * declined_options[]
Definition: stasis.c:2248
#define TOPIC_ALL_BUCKETS
Definition: stasis.c:344
static struct aco_type declined_option
An aco_type structure to link the "declined_message_types" category to the stasis_declined_config typ...
Definition: stasis.c:2240
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
Definition: stasis.h:1493
int stasis_cache_init(void)
Generic container type.
int idle_timeout
Time limit in seconds for idle threads.
Definition: threadpool.h:79
int max_size
Maximum number of threads a pool may have.
Definition: threadpool.h:110
int auto_increment
Number of threads to increment pool by.
Definition: threadpool.h:90
int initial_size
Number of threads the pool will start with.
Definition: threadpool.h:100
Threadpool configuration options.
Definition: stasis.c:2213
struct ast_threadpool * ast_threadpool_create(const char *name, struct ast_threadpool_listener *listener, const struct ast_threadpool_options *options)
Create a new threadpool.
Definition: threadpool.c:916
#define AST_THREADPOOL_OPTIONS_VERSION
Definition: threadpool.h:71

References ACO_EXACT, aco_info_init(), aco_option_register, aco_option_register_custom, aco_process_config(), ACO_PROCESS_ERROR, aco_set_defaults(), AO2_ALLOC_OPT_LOCK_MUTEX, ao2_cleanup, ao2_container_alloc_hash, ao2_global_obj_ref, ao2_global_obj_replace_unref, ao2_ref, ARRAY_LEN, ast_cli_register_multiple, ast_log, ast_multi_user_event_type(), ast_register_cleanup(), ast_threadpool_create(), AST_THREADPOOL_OPTIONS_VERSION, AST_VECTOR_INIT, ast_threadpool_options::auto_increment, cli_stasis, declined_handler(), stasis_config::declined_message_types, declined_option, declined_options, FLDSET, globals, ast_threadpool_options::idle_timeout, stasis_threadpool_conf::idle_timeout_sec, ast_threadpool_options::initial_size, stasis_threadpool_conf::initial_size, LOG_ERROR, LOG_NOTICE, ast_threadpool_options::max_size, stasis_threadpool_conf::max_size, NULL, OPT_INT_T, PARSE_IN_RANGE, stasis_cache_init(), stasis_cleanup(), stasis_config_alloc(), STASIS_MESSAGE_TYPE_INIT, stasis_subscription_change_type(), threadpool, threadpool_option, stasis_config::threadpool_options, threadpool_options, topic_all, TOPIC_ALL_BUCKETS, and ast_threadpool_options::version.

Referenced by asterisk_daemon().

◆ stasis_log_bad_type_access()

void stasis_log_bad_type_access ( const char *  name)

Definition at line 1967 of file stasis.c.

1968{
1969#ifdef AST_DEVMODE
1971 ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
1972 }
1973#endif
1974}
int stasis_message_type_declined(const char *name)
Check whether a message type is declined.
Definition: stasis.c:2310

References ast_log, LOG_ERROR, name, and stasis_message_type_declined().

◆ stasis_message_type_declined()

int stasis_message_type_declined ( const char *  name)

Check whether a message type is declined.

Parameters
nameThe name of the message type to check
Return values
zeroThe message type is not declined
non-zeroThe message type is declined

Definition at line 2310 of file stasis.c.

2311{
2313 char *name_in_declined;
2314 int res;
2315
2316 if (!cfg || !cfg->declined_message_types) {
2317 ao2_cleanup(cfg);
2318 return 0;
2319 }
2320
2321 name_in_declined = ao2_find(cfg->declined_message_types->declined, name, OBJ_SEARCH_KEY);
2322 res = name_in_declined ? 1 : 0;
2323 ao2_cleanup(name_in_declined);
2324 ao2_ref(cfg, -1);
2325 if (res) {
2326 ast_debug(4, "Declining to allocate Stasis message type '%s' due to configuration\n", name);
2327 }
2328 return res;
2329}
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1736
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1101
#define ast_debug(level,...)
Log a DEBUG message.

References ao2_cleanup, ao2_find, ao2_global_obj_ref, ao2_ref, ast_debug, stasis_declined_config::declined, stasis_config::declined_message_types, globals, name, and OBJ_SEARCH_KEY.

Referenced by stasis_log_bad_type_access(), and stasis_message_type_create().

◆ STASIS_MESSAGE_TYPE_DEFN() [1/2]

STASIS_MESSAGE_TYPE_DEFN ( ast_multi_user_event_type  ,
to_json = multi_user_event_to_json,
to_ami = multi_user_event_to_ami 
)

Define multi user event message type(s).

◆ STASIS_MESSAGE_TYPE_DEFN() [2/2]

STASIS_MESSAGE_TYPE_DEFN ( stasis_subscription_change_type  )

◆ stasis_publish()

void stasis_publish ( struct stasis_topic topic,
struct stasis_message message 
)

Publish a message to a topic's subscribers.

Parameters
topicTopic.
messageMessage to publish.

This call is asynchronous and will return immediately upon queueing the message for delivery to the topic's subscribers.

Since
12

Definition at line 1538 of file stasis.c.

1539{
1540 publish_msg(topic, message, NULL);
1541}
static void publish_msg(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sync_sub)
Definition: stasis.c:1457

References NULL, publish_msg(), and stasis_subscription::topic.

Referenced by aoc_publish_blob(), app_send_end_msg(), ast_bridge_publish_attended_transfer(), ast_bridge_publish_blind_transfer(), ast_bridge_publish_enter(), ast_bridge_publish_leave(), ast_bridge_publish_merge(), ast_bridge_publish_state(), ast_cel_publish_event(), ast_channel_publish_blob(), ast_channel_publish_cached_blob(), ast_channel_publish_final_snapshot(), ast_channel_publish_snapshot(), ast_device_state_clear_cache(), ast_endpoint_blob_publish(), ast_endpoint_shutdown(), ast_manager_publish_event(), ast_multi_object_blob_single_channel_publish(), ast_publish_device_state_full(), ast_refer_notify_transfer_request(), ast_rtp_publish_rtcp_message(), ast_system_publish_registry(), AST_TEST_DEFINE(), bridge_attended_transfer_handler(), bridge_blind_transfer_handler(), bridge_merge_handler(), bridge_publish_state_from_blob(), bridge_topics_destroy(), cache_test_aggregate_publish_fn(), caching_topic_exec(), cc_publish(), clear_node_cache(), detect_callback(), device_state_aggregate_publish(), endpoint_publish_snapshot(), endpoint_state_cb(), handle_security_event(), local_optimization_finished_cb(), local_optimization_started_cb(), manager_mute_mixmonitor(), meetme_stasis_generate_msg(), mixmonitor_exec(), moh_post_start(), moh_post_stop(), notify_new_message(), presence_state_event(), publish_acl_change(), publish_chanspy_message(), publish_cluster_discovery_to_stasis_full(), publish_corosync_ping_to_stasis(), publish_format_update(), publish_hint_change(), publish_hint_remove(), publish_load_message_type(), publish_local_bridge_message(), publish_message_for_channel_topics(), publish_parked_call(), publish_parked_call_failure(), queue_publish_member_blob(), queue_publish_multi_channel_snapshot_blob(), remove_device_states_cb(), report_fax_status(), report_receive_fax_status(), report_send_fax_status(), send_call_pickup_stasis_message(), send_conf_stasis(), send_conf_stasis_snapshots(), send_msg(), send_start_msg_snapshots(), send_subscription_subscribe(), send_subscription_unsubscribe(), stasis_app_control_publish(), stasis_app_user_event(), stasis_state_publish(), stasis_state_publish_by_id(), stasis_state_remove_publish_by_id(), stop_mixmonitor_full(), stun_monitor_request(), and talk_detect_audiohook_cb().

◆ stasis_publish_sync()

void stasis_publish_sync ( struct stasis_subscription sub,
struct stasis_message message 
)

Publish a message to a topic's subscribers, synchronizing on the specified subscriber.

Parameters
subSubscription to synchronize on.
messageMessage to publish.

The caller of stasis_publish_sync will block until the specified subscriber completes handling of the message.

All other subscribers to the topic the stasis_subscription is subscribed to are also delivered the message; this delivery however happens asynchronously.

Since
12.1.0

Definition at line 1543 of file stasis.c.

1544{
1545 ast_assert(sub != NULL);
1546
1547 publish_msg(sub->topic, message, sub);
1548}

References ast_assert, NULL, publish_msg(), and sub.

Referenced by AST_TEST_DEFINE(), and stasis_message_router_publish_sync().

◆ stasis_show_topic()

static char * stasis_show_topic ( struct ast_cli_entry e,
int  cmd,
struct ast_cli_args a 
)
static

Definition at line 2444 of file stasis.c.

2445{
2446 struct stasis_topic *topic;
2447 char print_time[32];
2448 int i;
2449
2450 switch (cmd) {
2451 case CLI_INIT:
2452 e->command = "stasis show topic";
2453 e->usage =
2454 "Usage: stasis show topic <name>\n"
2455 " Show stasis topic detail info.\n";
2456 return NULL;
2457 case CLI_GENERATE:
2458 if (a->pos == 3) {
2459 return topic_complete_name(a->word);
2460 } else {
2461 return NULL;
2462 }
2463 }
2464
2465 if (a->argc != 4) {
2466 return CLI_SHOWUSAGE;
2467 }
2468
2469 topic = stasis_topic_get(a->argv[3]);
2470 if (!topic) {
2471 ast_cli(a->fd, "Specified topic '%s' does not exist\n", a->argv[3]);
2472 return CLI_FAILURE;
2473 }
2474
2475 ast_cli(a->fd, "Name: %s\n", topic->name);
2476 ast_cli(a->fd, "Detail: %s\n", topic->detail);
2477 ast_cli(a->fd, "Subscribers count: %zu\n", AST_VECTOR_SIZE(&topic->subscribers));
2478 ast_cli(a->fd, "Forwarding topic count: %zu\n", AST_VECTOR_SIZE(&topic->upstream_topics));
2479 ast_format_duration_hh_mm_ss(ast_tvnow().tv_sec - topic->creationtime->tv_sec, print_time, sizeof(print_time));
2480 ast_cli(a->fd, "Duration time: %s\n", print_time);
2481
2482 ao2_lock(topic);
2483 ast_cli(a->fd, "\nSubscribers:\n");
2484 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); i++) {
2485 struct stasis_subscription *subscription_tmp = AST_VECTOR_GET(&topic->subscribers, i);
2486 ast_cli(a->fd, " UniqueID: %s, Topic: %s, Detail: %s\n",
2487 subscription_tmp->uniqueid, subscription_tmp->topic->name, subscription_tmp->topic->detail);
2488 }
2489
2490 ast_cli(a->fd, "\nForwarded topics:\n");
2491 for (i = 0; i < AST_VECTOR_SIZE(&topic->upstream_topics); i++) {
2492 struct stasis_topic *topic_tmp = AST_VECTOR_GET(&topic->upstream_topics, i);
2493 ast_cli(a->fd, " Topic: %s, Detail: %s\n", topic_tmp->name, topic_tmp->detail);
2494 }
2495 ao2_unlock(topic);
2496
2497 ao2_ref(topic, -1);
2498
2499 return CLI_SUCCESS;
2500}
#define CLI_SHOWUSAGE
Definition: cli.h:45
#define CLI_SUCCESS
Definition: cli.h:44
void ast_cli(int fd, const char *fmt,...)
Definition: clicompat.c:6
@ CLI_INIT
Definition: cli.h:152
@ CLI_GENERATE
Definition: cli.h:153
#define CLI_FAILURE
Definition: cli.h:46
static char * topic_complete_name(const char *word)
Definition: stasis.c:2418
char * command
Definition: cli.h:186
const char * usage
Definition: cli.h:177
static struct test_val a
void ast_format_duration_hh_mm_ss(int duration, char *buf, size_t length)
Formats a duration into HH:MM:SS.
Definition: utils.c:2297

References a, ao2_lock, ao2_ref, ao2_unlock, ast_cli(), ast_format_duration_hh_mm_ss(), ast_tvnow(), AST_VECTOR_GET, AST_VECTOR_SIZE, CLI_FAILURE, CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, stasis_topic::creationtime, stasis_topic::detail, stasis_topic::name, NULL, stasis_topic_get(), stasis_topic::subscribers, stasis_subscription::topic, topic_complete_name(), stasis_subscription::uniqueid, stasis_topic::upstream_topics, and ast_cli_entry::usage.

◆ stasis_show_topics()

static char * stasis_show_topics ( struct ast_cli_entry e,
int  cmd,
struct ast_cli_args a 
)
static

Definition at line 2361 of file stasis.c.

2362{
2363 struct ao2_iterator iter;
2364 struct topic_proxy *topic;
2365 struct ao2_container *tmp_container;
2366 int count = 0;
2367#define FMT_HEADERS "%-64s %-64s\n"
2368#define FMT_FIELDS "%-64s %-64s\n"
2369
2370 switch (cmd) {
2371 case CLI_INIT:
2372 e->command = "stasis show topics";
2373 e->usage =
2374 "Usage: stasis show topics\n"
2375 " Shows a list of topics\n";
2376 return NULL;
2377 case CLI_GENERATE:
2378 return NULL;
2379 }
2380
2381 if (a->argc != e->args) {
2382 return CLI_SHOWUSAGE;
2383 }
2384
2385 ast_cli(a->fd, "\n" FMT_HEADERS, "Name", "Detail");
2386
2388 topic_proxy_sort_fn, NULL);
2389
2390 if (!tmp_container || ao2_container_dup(tmp_container, topic_all, OBJ_SEARCH_OBJECT)) {
2391 ao2_cleanup(tmp_container);
2392
2393 return NULL;
2394 }
2395
2396 /* getting all topic in order */
2397 iter = ao2_iterator_init(tmp_container, AO2_ITERATOR_UNLINK);
2398 while ((topic = ao2_iterator_next(&iter))) {
2399 ast_cli(a->fd, FMT_FIELDS, topic->name, topic->detail);
2400 ao2_ref(topic, -1);
2401 ++count;
2402 }
2403 ao2_iterator_destroy(&iter);
2404 ao2_cleanup(tmp_container);
2405
2406 ast_cli(a->fd, "\n%d Total topics\n\n", count);
2407
2408#undef FMT_HEADERS
2409#undef FMT_FIELDS
2410
2411 return CLI_SUCCESS;
2412}
int ao2_container_dup(struct ao2_container *dest, struct ao2_container *src, enum search_flags flags)
Copy all object references in the src container into the dest container.
#define ao2_iterator_next(iter)
Definition: astobj2.h:1911
@ AO2_ITERATOR_UNLINK
Definition: astobj2.h:1863
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
@ OBJ_SEARCH_OBJECT
The arg parameter is an object of the same type.
Definition: astobj2.h:1087
#define ao2_container_alloc_list(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a list container.
Definition: astobj2.h:1327
#define FMT_HEADERS
#define FMT_FIELDS
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
Definition: astobj2.h:1821
int args
This gets set in ast_cli_register()
Definition: cli.h:185

References a, AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_cleanup, ao2_container_alloc_list, ao2_container_dup(), ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, AO2_ITERATOR_UNLINK, ao2_ref, ast_cli_entry::args, ast_cli(), CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, topic_proxy::detail, FMT_FIELDS, FMT_HEADERS, topic_proxy::name, NULL, OBJ_SEARCH_OBJECT, topic_all, and ast_cli_entry::usage.

◆ stasis_subscription_accept_formatters()

void stasis_subscription_accept_formatters ( struct stasis_subscription subscription,
enum stasis_subscription_message_formatters  formatters 
)

Indicate to a subscription that we are interested in messages with one or more formatters.

Parameters
subscriptionSubscription to alter.
formattersA bitmap of stasis_subscription_message_formatters we wish to receive.
Since
13.25.0
16.2.0

Definition at line 1120 of file stasis.c.

1122{
1123 ast_assert(subscription != NULL);
1124
1125 ao2_lock(subscription->topic);
1126 subscription->accepted_formatters = formatters;
1127 ao2_unlock(subscription->topic);
1128
1129 return;
1130}
enum stasis_subscription_message_formatters accepted_formatters
Definition: stasis.c:731

References stasis_subscription::accepted_formatters, ao2_lock, ao2_unlock, ast_assert, NULL, and stasis_subscription::topic.

Referenced by AST_TEST_DEFINE(), stasis_message_router_accept_formatters(), and stasis_message_router_set_formatters_default().

◆ stasis_subscription_accept_message_type()

int stasis_subscription_accept_message_type ( struct stasis_subscription subscription,
const struct stasis_message_type type 
)

Indicate to a subscription that we are interested in a message type.

This will cause the subscription to allow the given message type to be raised to our subscription callback. This enables internal filtering in the stasis message bus to reduce messages.

Parameters
subscriptionSubscription to add message type to.
typeThe message type we wish to receive.
Return values
0on success
-1failure
Since
17.0.0
Note
If you are wanting to use stasis_final_message you will need to accept stasis_subscription_change_type as a message type.
Until the subscription is set to selective filtering it is possible for it to receive messages of message types that would not normally be accepted.

Definition at line 1050 of file stasis.c.

1052{
1053 if (!subscription) {
1054 return -1;
1055 }
1056
1057 ast_assert(type != NULL);
1059
1061 /* Filtering is unreliable as this message type is not yet initialized
1062 * so force all messages through.
1063 */
1065 return 0;
1066 }
1067
1068 ao2_lock(subscription->topic);
1070 /* We do this for the same reason as above. The subscription can still operate, so allow
1071 * it to do so by forcing all messages through.
1072 */
1074 }
1075 ao2_unlock(subscription->topic);
1076
1077 return 0;
1078}
@ STASIS_SUBSCRIPTION_FILTER_FORCED_NONE
Definition: stasis.h:296
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.
struct stasis_subscription::@397 accepted_message_types
enum stasis_subscription_message_filter filter
Definition: stasis.c:733

References stasis_subscription::accepted_message_types, ao2_lock, ao2_unlock, ast_assert, AST_VECTOR_REPLACE, stasis_subscription::filter, NULL, stasis_message_type_id(), stasis_message_type_name(), STASIS_SUBSCRIPTION_FILTER_FORCED_NONE, stasis_subscription::topic, and type.

Referenced by acl_change_stasis_subscribe(), ast_mwi_subscribe_pool(), ast_res_pjsip_initialize_configuration(), AST_TEST_DEFINE(), asterisk_start_devicestate_publishing(), asterisk_start_mwi_publishing(), cc_generic_agent_start_monitoring(), common_config_load(), create_new_generic_list(), create_parked_subscription_full(), devstate_init(), load_module(), load_pbx(), mwi_stasis_subscription_alloc(), network_change_stasis_subscribe(), park_and_announce_app_exec(), parking_manager_enable_stasis(), refer_blind_callback(), rtp_reload(), stasis_caching_accept_message_type(), stasis_message_router_add(), stasis_message_router_add_cache_update(), stasis_message_router_create_internal(), subscribe_device_state(), and xmpp_init_event_distribution().

◆ stasis_subscription_cb_noop()

void stasis_subscription_cb_noop ( void *  data,
struct stasis_subscription sub,
struct stasis_message message 
)

Stasis subscription callback function that does nothing.

Note
This callback should be used for events are not directly processed, but need to be generated so data can be retrieved from cache later. Subscriptions with this callback can be released with stasis_unsubscribe, even during module unload.
Since
13.5

Definition at line 836 of file stasis.c.

837{
838}

Referenced by build_peer(), and mkintf().

◆ stasis_subscription_decline_message_type()

int stasis_subscription_decline_message_type ( struct stasis_subscription subscription,
const struct stasis_message_type type 
)

Indicate to a subscription that we are not interested in a message type.

Parameters
subscriptionSubscription to remove message type from.
typeThe message type we don't wish to receive.
Return values
0on success
-1failure
Since
17.0.0

Definition at line 1080 of file stasis.c.

1082{
1083 if (!subscription) {
1084 return -1;
1085 }
1086
1087 ast_assert(type != NULL);
1089
1091 return 0;
1092 }
1093
1094 ao2_lock(subscription->topic);
1096 /* The memory is already allocated so this can't fail */
1098 }
1099 ao2_unlock(subscription->topic);
1100
1101 return 0;
1102}

References stasis_subscription::accepted_message_types, ao2_lock, ao2_unlock, ast_assert, AST_VECTOR_REPLACE, AST_VECTOR_SIZE, NULL, stasis_message_type_id(), stasis_message_type_name(), stasis_subscription::topic, and type.

Referenced by AST_TEST_DEFINE().

◆ stasis_subscription_final_message()

int stasis_subscription_final_message ( struct stasis_subscription sub,
struct stasis_message msg 
)

Determine whether a message is the final message to be received on a subscription.

Parameters
subSubscription on which the message was received.
msgMessage to check.
Returns
zero if the provided message is not the final message.
non-zero if the provided message is the final message.
Since
12

Definition at line 1201 of file stasis.c.

1202{
1203 struct stasis_subscription_change *change;
1204
1206 return 0;
1207 }
1208
1209 change = stasis_message_data(msg);
1210 if (strcmp("Unsubscribe", change->description)) {
1211 return 0;
1212 }
1213
1214 if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
1215 return 0;
1216 }
1217
1218 return 1;
1219}
const char * stasis_subscription_uniqueid(const struct stasis_subscription *sub)
Get the unique ID for the subscription.
Definition: stasis.c:1196

References stasis_subscription_change::description, stasis_message_data(), stasis_message_type(), stasis_subscription_change_type(), stasis_subscription_uniqueid(), sub, and stasis_subscription_change::uniqueid.

Referenced by bridge_subscription_change_handler(), caching_topic_exec(), consumer_exec(), consumer_exec_sync(), consumer_finalize(), default_route(), device_state_cb(), dispatch_message(), endpoint_subscription_change(), generic_agent_devstate_cb(), message_sink_cb(), mwi_stasis_cb(), park_announce_update_cb(), parker_update_cb(), queue_bridge_cb(), queue_channel_cb(), refer_progress_bridge(), router_dispatch(), statsmaker(), sub_subscription_change_handler(), and subscription_invoke().

◆ stasis_subscription_is_done()

int stasis_subscription_is_done ( struct stasis_subscription subscription)

Returns whether subscription has received its final message.

Note that a subscription is considered done even while the stasis_subscription_final_message() is being processed. This allows cleanup routines to check the status of the subscription.

Parameters
subscriptionSubscription.
Returns
True (non-zero) if stasis_subscription_final_message() has been received.
False (zero) if waiting for the end.

Definition at line 1145 of file stasis.c.

1146{
1147 if (subscription) {
1148 int ret;
1149
1150 ao2_lock(subscription);
1151 ret = subscription->final_message_rxed;
1152 ao2_unlock(subscription);
1153
1154 return ret;
1155 }
1156
1157 /* Null subscription is about as done as you can get */
1158 return 1;
1159}
int final_message_rxed
Definition: stasis.c:723

References ao2_lock, ao2_unlock, and stasis_subscription::final_message_rxed.

Referenced by router_dtor(), stasis_caching_topic_dtor(), stasis_message_router_is_done(), and subscription_dtor().

◆ stasis_subscription_is_subscribed()

int stasis_subscription_is_subscribed ( const struct stasis_subscription sub)

Returns whether a subscription is currently subscribed.

Note that there may still be messages queued up to be dispatched to this subscription, but the stasis_subscription_final_message() has been enqueued.

Parameters
subSubscription to check
Returns
False (zero) if subscription is not subscribed.
True (non-zero) if still subscribed.

Definition at line 1177 of file stasis.c.

1178{
1179 if (sub) {
1180 size_t i;
1181 struct stasis_topic *topic = sub->topic;
1182
1183 ao2_lock(topic);
1184 for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
1185 if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
1186 ao2_unlock(topic);
1187 return 1;
1188 }
1189 }
1190 ao2_unlock(topic);
1191 }
1192
1193 return 0;
1194}

References ao2_lock, ao2_unlock, AST_VECTOR_GET, AST_VECTOR_SIZE, sub, and stasis_topic::subscribers.

Referenced by asterisk_publisher_devstate_cb(), asterisk_publisher_mwistate_cb(), router_dtor(), send_subscription_subscribe(), send_subscription_unsubscribe(), stasis_caching_topic_dtor(), stasis_caching_unsubscribe(), subscription_dtor(), xmpp_pubsub_devstate_cb(), and xmpp_pubsub_mwi_cb().

◆ stasis_subscription_join()

void stasis_subscription_join ( struct stasis_subscription subscription)

Block until the last message is processed on a subscription.

This function will not return until the subscription's callback for the stasis_subscription_final_message() completes. This allows cleanup routines to run before unblocking the joining thread.

Parameters
subscriptionSubscription to block on.
Since
12

Definition at line 1132 of file stasis.c.

1133{
1134 if (subscription) {
1135 ao2_lock(subscription);
1136 /* Wait until the processed flag has been set */
1137 while (!subscription->final_message_processed) {
1138 ast_cond_wait(&subscription->join_cond,
1139 ao2_object_get_lockaddr(subscription));
1140 }
1141 ao2_unlock(subscription);
1142 }
1143}
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
Definition: astobj2.c:476
ast_cond_t join_cond
Definition: stasis.c:720
int final_message_processed
Definition: stasis.c:726

References ao2_lock, ao2_object_get_lockaddr(), ao2_unlock, ast_cond_wait, stasis_subscription::final_message_processed, and stasis_subscription::join_cond.

Referenced by stasis_caching_unsubscribe_and_join(), and stasis_unsubscribe_and_join().

◆ stasis_subscription_set_congestion_limits()

int stasis_subscription_set_congestion_limits ( struct stasis_subscription subscription,
long  low_water,
long  high_water 
)

Set the high and low alert water marks of the stasis subscription.

Since
13.10.0
Parameters
subscriptionPointer to a stasis subscription
low_waterNew queue low water mark. (-1 to set as 90% of high_water)
high_waterNew queue high water mark.
Return values
0on success.
-1on error (water marks not changed).

Definition at line 1038 of file stasis.c.

1040{
1041 int res = -1;
1042
1043 if (subscription) {
1044 res = ast_taskprocessor_alert_set_levels(subscription->mailbox,
1045 low_water, high_water);
1046 }
1047 return res;
1048}
struct ast_taskprocessor * mailbox
Definition: stasis.c:713
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.

References ast_taskprocessor_alert_set_levels(), and stasis_subscription::mailbox.

Referenced by stasis_message_router_set_congestion_limits().

◆ stasis_subscription_set_filter()

int stasis_subscription_set_filter ( struct stasis_subscription subscription,
enum stasis_subscription_message_filter  filter 
)

Set the message type filtering level on a subscription.

This will cause the subscription to filter messages according to the provided filter level. For example if selective is used then only messages matching those provided to stasis_subscription_accept_message_type will be raised to the subscription callback.

Parameters
subscriptionSubscription that should receive all messages.
filterWhat filter to use
Return values
0on success
-1failure
Since
17.0.0

Definition at line 1104 of file stasis.c.

1106{
1107 if (!subscription) {
1108 return -1;
1109 }
1110
1111 ao2_lock(subscription->topic);
1112 if (subscription->filter != STASIS_SUBSCRIPTION_FILTER_FORCED_NONE) {
1113 subscription->filter = filter;
1114 }
1115 ao2_unlock(subscription->topic);
1116
1117 return 0;
1118}
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
Definition: func_strings.c:899

References ao2_lock, ao2_unlock, filter(), stasis_subscription::filter, STASIS_SUBSCRIPTION_FILTER_FORCED_NONE, and stasis_subscription::topic.

Referenced by acl_change_stasis_subscribe(), ast_mwi_subscribe_pool(), ast_res_pjsip_initialize_configuration(), AST_TEST_DEFINE(), asterisk_start_devicestate_publishing(), asterisk_start_mwi_publishing(), cc_generic_agent_start_monitoring(), create_new_generic_list(), create_parked_subscription_full(), devstate_init(), load_module(), load_pbx(), network_change_stasis_subscribe(), park_and_announce_app_exec(), parking_manager_enable_stasis(), refer_blind_callback(), rtp_reload(), stasis_caching_set_filter(), stasis_message_router_add(), stasis_message_router_add_cache_update(), stasis_message_router_set_formatters_default(), subscribe_device_state(), and xmpp_init_event_distribution().

◆ stasis_subscription_uniqueid()

const char * stasis_subscription_uniqueid ( const struct stasis_subscription sub)

Get the unique ID for the subscription.

Parameters
subSubscription for which to get the unique ID.
Returns
Unique ID for the subscription.
Since
12

Definition at line 1196 of file stasis.c.

1197{
1198 return sub->uniqueid;
1199}

References sub.

Referenced by AST_TEST_DEFINE(), stasis_subscription_final_message(), topic_add_subscription(), and topic_remove_subscription().

◆ stasis_topic_create()

struct stasis_topic * stasis_topic_create ( const char *  name)

Create a new topic.

Parameters
nameName of the new topic.
Returns
New topic instance.
Return values
NULLon error.
Since
12
Note
There is no explicit ability to unsubscribe all subscribers from a topic and destroy it. As a result the topic can persist until the last subscriber unsubscribes itself even if there is no publisher.
Topic names should be in the form of
<subsystem>:<functionality>[/<object>] 

Definition at line 644 of file stasis.c.

645{
647}
struct stasis_topic * stasis_topic_create_with_detail(const char *name, const char *detail)
Create a new topic with given detail.
Definition: stasis.c:595

References name, and stasis_topic_create_with_detail().

Referenced by __init_manager(), app_create(), app_init(), ast_channel_internal_setup_topics(), ast_parking_stasis_init(), ast_presence_state_engine_init(), ast_rtp_engine_init(), ast_security_stasis_init(), ast_stasis_bridging_init(), ast_stasis_channels_init(), ast_stasis_system_init(), AST_TEST_DEFINE(), ast_test_init(), create_cts(), create_subscriptions(), devstate_init(), load_module(), stasis_caching_topic_create(), stasis_cp_all_create(), stasis_cp_sink_create(), stasis_state_manager_create(), stasis_topic_pool_get_topic(), and state_alloc().

◆ stasis_topic_create_with_detail()

struct stasis_topic * stasis_topic_create_with_detail ( const char *  name,
const char *  detail 
)

Create a new topic with given detail.

Parameters
nameName of the new topic.
detailDetail description of the new topic. i.e. "Queue main topic for subscribing every queue event"
Returns
New topic instance.
Return values
NULLon error.
Note
There is no explicit ability to unsubscribe all subscribers from a topic and destroy it. As a result the topic can persist until the last subscriber unsubscribes itself even if there is no publisher.

Definition at line 595 of file stasis.c.

598{
599 struct stasis_topic *topic;
600 int res = 0;
601
602 if (!name|| !strlen(name) || !detail) {
603 return NULL;
604 }
605 ast_debug(2, "Creating topic. name: %s, detail: %s\n", name, detail);
606
607 topic = stasis_topic_get(name);
608 if (topic) {
609 ast_debug(2, "Topic is already exist. name: %s, detail: %s\n",
610 name, detail);
611 return topic;
612 }
613
614 topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
615 if (!topic) {
616 return NULL;
617 }
618
620 res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
621 if (res) {
622 ao2_ref(topic, -1);
623 return NULL;
624 }
625
626 /* link to the proxy */
627 if (link_topic_proxy(topic, name, detail)) {
628 ao2_ref(topic, -1);
629 return NULL;
630 }
631
632#ifdef AST_DEVMODE
633 topic->statistics = stasis_topic_statistics_create(topic);
634 if (!topic->statistics) {
635 ao2_ref(topic, -1);
636 return NULL;
637 }
638#endif
639 ast_debug(1, "Topic '%s': %p created\n", topic->name, topic);
640
641 return topic;
642}
#define INITIAL_SUBSCRIBERS_MAX
Definition: stasis.c:328
static void topic_dtor(void *obj)
Definition: stasis.c:460
static int link_topic_proxy(struct stasis_topic *topic, const char *name, const char *detail)
Definition: stasis.c:527

References ao2_ref, ao2_t_alloc, ast_debug, AST_VECTOR_INIT, stasis_topic::detail, INITIAL_SUBSCRIBERS_MAX, link_topic_proxy(), name, stasis_topic::name, NULL, stasis_topic_get(), stasis_topic::subscribers, topic_dtor(), and stasis_topic::upstream_topics.

Referenced by stasis_topic_create().

◆ stasis_topic_detail()

const char * stasis_topic_detail ( const struct stasis_topic topic)

Return the detail of a topic.

Parameters
topicTopic.
Returns
Detail of the topic.
Return values
NULLif topic is NULL.
Since
12

Definition at line 662 of file stasis.c.

663{
664 if (!topic) {
665 return NULL;
666 }
667 return topic->detail;
668}

References stasis_topic::detail, and NULL.

◆ stasis_topic_get()

struct stasis_topic * stasis_topic_get ( const char *  name)

Get a topic of the given name.

Parameters
nameTopic's name.
Returns
Name of the topic.
Return values
NULLon error or not exist.
Note
This SHOULD NOT be used in normal operation for publishing messages.

Definition at line 649 of file stasis.c.

650{
652}
#define ao2_weakproxy_find(c, arg, flags, tag)
Perform an ao2_find on a container with ao2_weakproxy objects, returning the real object.
Definition: astobj2.h:1748

References ao2_weakproxy_find, name, OBJ_SEARCH_KEY, and topic_all.

Referenced by link_topic_proxy(), stasis_show_topic(), and stasis_topic_create_with_detail().

◆ stasis_topic_name()

const char * stasis_topic_name ( const struct stasis_topic topic)

◆ stasis_topic_pool_create()

struct stasis_topic_pool * stasis_topic_pool_create ( struct stasis_topic pooled_topic)

Create a topic pool that routes messages from dynamically generated topics to the given topic.

Parameters
pooled_topicTopic to which messages will be routed
Returns
the new stasis_topic_pool
Return values
NULLon failure

Definition at line 1860 of file stasis.c.

1861{
1862 struct stasis_topic_pool *pool;
1863
1865 if (!pool) {
1866 return NULL;
1867 }
1868
1871 if (!pool->pool_container) {
1872 ao2_cleanup(pool);
1873 return NULL;
1874 }
1875
1876#ifdef AO2_DEBUG
1877 {
1878 char *container_name =
1879 ast_alloca(strlen(stasis_topic_name(pooled_topic)) + strlen("-pool") + 1);
1880 sprintf(container_name, "%s-pool", stasis_topic_name(pooled_topic));
1881 ao2_container_register(container_name, pool->pool_container, topic_pool_prnt_obj);
1882 }
1883#endif
1884
1885 ao2_ref(pooled_topic, +1);
1886 pool->pool_topic = pooled_topic;
1887
1888 return pool;
1889}
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
Definition: astmm.h:288
int ao2_container_register(const char *name, struct ao2_container *self, ao2_prnt_obj_fn *prnt_obj)
Register a container for CLI stats and integrity check.
static void topic_pool_dtor(void *obj)
Definition: stasis.c:1771
#define TOPIC_POOL_BUCKETS
Definition: stasis.c:331
static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
Definition: stasis.c:1811
static int topic_pool_entry_hash(const void *obj, const int flags)
Definition: stasis.c:1790
struct ao2_container * pool_container
Definition: stasis.c:1767
struct stasis_topic * pool_topic
Definition: stasis.c:1768

References AO2_ALLOC_OPT_LOCK_MUTEX, AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_alloc_options, ao2_cleanup, ao2_container_alloc_hash, ao2_container_register(), ao2_ref, ast_alloca, NULL, stasis_topic_pool::pool_container, stasis_topic_pool::pool_topic, stasis_topic_name(), TOPIC_POOL_BUCKETS, topic_pool_dtor(), topic_pool_entry_cmp(), and topic_pool_entry_hash().

Referenced by app_init(), ast_stasis_bridging_init(), and devstate_init().

◆ stasis_topic_pool_delete_topic()

void stasis_topic_pool_delete_topic ( struct stasis_topic_pool pool,
const char *  topic_name 
)

Delete a topic from the topic pool.

Parameters
poolPool from which to delete the topic
topic_nameName of the topic to delete in the form of
[<pool_topic_name>/]<topic_name> 
Since
13.24
15.6
16.1

Definition at line 1891 of file stasis.c.

1892{
1893 /*
1894 * The topic_name passed in could be a fully-qualified name like <pool_topic_name>/<topic_name>
1895 * or just <topic_name>. If it's fully qualified, we need to skip past <pool_topic_name>
1896 * name and search only on <topic_name>.
1897 */
1898 const char *pool_topic_name = stasis_topic_name(pool->pool_topic);
1899 int pool_topic_name_len = strlen(pool_topic_name);
1900 const char *search_topic_name;
1901
1902 if (strncmp(pool_topic_name, topic_name, pool_topic_name_len) == 0) {
1903 search_topic_name = topic_name + pool_topic_name_len + 1;
1904 } else {
1905 search_topic_name = topic_name;
1906 }
1907
1908 ao2_find(pool->pool_container, search_topic_name, OBJ_SEARCH_KEY | OBJ_NODATA | OBJ_UNLINK);
1909}
@ OBJ_NODATA
Definition: astobj2.h:1044
@ OBJ_UNLINK
Definition: astobj2.h:1039

References ao2_find, OBJ_NODATA, OBJ_SEARCH_KEY, OBJ_UNLINK, stasis_topic_pool::pool_container, stasis_topic_pool::pool_topic, and stasis_topic_name().

Referenced by bridge_topics_destroy().

◆ stasis_topic_pool_get_topic()

struct stasis_topic * stasis_topic_pool_get_topic ( struct stasis_topic_pool pool,
const char *  topic_name 
)

Find or create a topic in the pool.

Parameters
poolPool for which to get the topic
topic_nameName of the topic to get
Returns
The already stored or newly allocated topic
Return values
NULLif the topic was not found and could not be allocated

Definition at line 1911 of file stasis.c.

1912{
1914 SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
1915 char *new_topic_name;
1916 int ret;
1917
1919 if (topic_pool_entry) {
1920 return topic_pool_entry->topic;
1921 }
1922
1924 if (!topic_pool_entry) {
1925 return NULL;
1926 }
1927
1928 /* To provide further detail and to ensure that the topic is unique within the scope of the
1929 * system we prefix it with the pooling topic name, which should itself already be unique.
1930 */
1931 ret = ast_asprintf(&new_topic_name, "%s/%s", stasis_topic_name(pool->pool_topic), topic_name);
1932 if (ret < 0) {
1933 return NULL;
1934 }
1935
1936 topic_pool_entry->topic = stasis_topic_create(new_topic_name);
1937 ast_free(new_topic_name);
1938 if (!topic_pool_entry->topic) {
1939 return NULL;
1940 }
1941
1943 if (!topic_pool_entry->forward) {
1944 return NULL;
1945 }
1946
1948 return NULL;
1949 }
1950
1951 return topic_pool_entry->topic;
1952}
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:611
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition: stasis.c:644
static struct topic_pool_entry * topic_pool_entry_alloc(const char *topic_name)
Definition: stasis.c:1751
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition: stasis.c:1605
Definition: stasis.c:1736
struct stasis_topic * topic
Definition: stasis.c:1738
struct stasis_forward * forward
Definition: stasis.c:1737

References ao2_cleanup, ao2_find, ao2_link_flags, ast_asprintf, ast_free, topic_pool_entry::forward, NULL, OBJ_NOLOCK, OBJ_SEARCH_KEY, stasis_topic_pool::pool_container, stasis_topic_pool::pool_topic, RAII_VAR, SCOPED_AO2LOCK, stasis_forward_all(), stasis_topic_create(), stasis_topic_name(), topic_pool_entry::topic, and topic_pool_entry_alloc().

Referenced by ast_device_state_topic(), ast_queue_topic(), and bridge_topics_init().

◆ stasis_topic_pool_topic_exists()

int stasis_topic_pool_topic_exists ( const struct stasis_topic_pool pool,
const char *  topic_name 
)

Check if a topic exists in a pool.

Parameters
poolPool to check
topic_nameName of the topic to check
Return values
1exists
0does not exist
Since
13.23.0

Definition at line 1954 of file stasis.c.

1955{
1957
1959 if (!topic_pool_entry) {
1960 return 0;
1961 }
1962
1964 return 1;
1965}

References ao2_find, ao2_ref, OBJ_SEARCH_KEY, and stasis_topic_pool::pool_container.

Referenced by ast_bridge_topic_exists(), and ast_publish_device_state_full().

◆ stasis_topic_subscribers()

size_t stasis_topic_subscribers ( const struct stasis_topic topic)

Return the number of subscribers of a topic.

Parameters
topicTopic.
Returns
Number of subscribers of the topic.
Since
17.0.0

Definition at line 670 of file stasis.c.

671{
672 return AST_VECTOR_SIZE(&topic->subscribers);
673}

References AST_VECTOR_SIZE, and stasis_topic::subscribers.

Referenced by caching_topic_exec(), and publish_msg().

◆ stasis_unsubscribe()

struct stasis_subscription * stasis_unsubscribe ( struct stasis_subscription subscription)

Cancel a subscription.

Note that in an asynchronous system, there may still be messages queued or in transit to the subscription's callback. These will still be delivered. There will be a final 'SubscriptionCancelled' message, indicating the delivery of the final message.

Parameters
subscriptionSubscription to cancel.
Return values
NULLfor convenience
Since
12

Definition at line 998 of file stasis.c.

999{
1000 /* The subscription may be the last ref to this topic. Hold
1001 * the topic ref open until after the unlock. */
1002 struct stasis_topic *topic;
1003
1004 if (!sub) {
1005 return NULL;
1006 }
1007
1008 topic = ao2_bump(sub->topic);
1009
1010 /* We have to remove the subscription first, to ensure the unsubscribe
1011 * is the final message */
1012 if (topic_remove_subscription(sub->topic, sub) != 0) {
1014 "Internal error: subscription has invalid topic\n");
1015 ao2_cleanup(topic);
1016
1017 return NULL;
1018 }
1019
1020 /* Now let everyone know about the unsubscribe */
1022
1023 /* When all that's done, remove the ref the mailbox has on the sub */
1024 if (sub->mailbox) {
1025 if (ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub)) {
1026 /* Nothing we can do here, the conditional is just to keep
1027 * the compiler happy that we're not ignoring the result. */
1028 }
1029 }
1030
1031 /* Unsubscribing unrefs the subscription */
1033 ao2_cleanup(topic);
1034
1035 return NULL;
1036}
static int sub_cleanup(void *data)
Definition: stasis.c:991
static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition: stasis.c:1703
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.

References ao2_bump, ao2_cleanup, ast_log, ast_taskprocessor_push(), LOG_ERROR, NULL, send_subscription_unsubscribe(), sub, sub_cleanup(), and topic_remove_subscription().

Referenced by AST_TEST_DEFINE(), cc_generic_agent_destructor(), common_config_unload(), destroy_cts(), generic_agent_devstate_cb(), generic_monitor_instance_list_destructor(), mwi_startup_event_cb(), park_and_announce_app_exec(), parked_subscription_datastore_destroy(), refer_progress_bridge(), refer_progress_destroy(), refer_progress_framehook_destroy(), startup_event_cb(), stasis_caching_unsubscribe(), stasis_message_router_unsubscribe(), stasis_state_unsubscribe(), stasis_unsubscribe_and_join(), subscription_persistence_event_cb(), unload_module(), and xmpp_init_event_distribution().

◆ stasis_unsubscribe_and_join()

struct stasis_subscription * stasis_unsubscribe_and_join ( struct stasis_subscription subscription)

Cancel a subscription, blocking until the last message is processed.

While normally it's recommended to stasis_unsubscribe() and wait for stasis_subscription_final_message(), there are times (like during a module unload) where you have to wait for the final message (otherwise you'll call a function in a shared module that no longer exists).

Parameters
subscriptionSubscription to cancel.
Return values
NULLfor convenience
Since
12

Definition at line 1161 of file stasis.c.

1163{
1164 if (!subscription) {
1165 return NULL;
1166 }
1167
1168 /* Bump refcount to hold it past the unsubscribe */
1169 ao2_ref(subscription, +1);
1170 stasis_unsubscribe(subscription);
1171 stasis_subscription_join(subscription);
1172 /* Now decrement the refcount back */
1173 ao2_cleanup(subscription);
1174 return NULL;
1175}
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *sub)
Cancel a subscription.
Definition: stasis.c:998
void stasis_subscription_join(struct stasis_subscription *subscription)
Block until the last message is processed on a subscription.
Definition: stasis.c:1132

References ao2_cleanup, ao2_ref, NULL, stasis_subscription_join(), and stasis_unsubscribe().

Referenced by acl_change_stasis_unsubscribe(), ast_res_pjsip_destroy_configuration(), AST_TEST_DEFINE(), ast_xmpp_client_disconnect(), asterisk_stop_devicestate_publishing(), asterisk_stop_mwi_publishing(), devstate_cleanup(), network_change_stasis_unsubscribe(), parking_manager_disable_stasis(), remove_device_state_subscription(), rtp_reload(), stasis_message_router_unsubscribe_and_join(), stasis_state_unsubscribe_and_join(), unload_module(), and unload_pbx().

◆ sub_cleanup()

static int sub_cleanup ( void *  data)
static

Definition at line 991 of file stasis.c.

992{
993 struct stasis_subscription *sub = data;
995 return 0;
996}

References ao2_cleanup, stasis_subscription::data, and sub.

Referenced by stasis_unsubscribe().

◆ subscription_change_alloc()

static struct stasis_subscription_change * subscription_change_alloc ( struct stasis_topic topic,
const char *  uniqueid,
const char *  description 
)
static

Definition at line 1654 of file stasis.c.

1655{
1656 size_t description_len = strlen(description) + 1;
1657 size_t uniqueid_len = strlen(uniqueid) + 1;
1658 struct stasis_subscription_change *change;
1659
1660 change = ao2_alloc_options(sizeof(*change) + description_len + uniqueid_len,
1662 if (!change) {
1663 return NULL;
1664 }
1665
1666 strcpy(change->description, description); /* SAFE */
1667 change->uniqueid = change->description + description_len;
1668 ast_copy_string(change->uniqueid, uniqueid, uniqueid_len); /* SAFE */
1669 ao2_ref(topic, +1);
1670 change->topic = topic;
1671
1672 return change;
1673}
static void subscription_change_dtor(void *obj)
Definition: stasis.c:1647
struct stasis_topic * topic
Definition: stasis.h:891

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_alloc_options, ao2_ref, ast_copy_string(), stasis_subscription_change::description, NULL, subscription_change_dtor(), stasis_subscription_change::topic, and stasis_subscription_change::uniqueid.

Referenced by send_subscription_subscribe(), and send_subscription_unsubscribe().

◆ subscription_change_dtor()

static void subscription_change_dtor ( void *  obj)
static

Definition at line 1647 of file stasis.c.

1648{
1649 struct stasis_subscription_change *change = obj;
1650
1651 ao2_cleanup(change->topic);
1652}

References ao2_cleanup, and stasis_subscription_change::topic.

Referenced by subscription_change_alloc().

◆ subscription_dtor()

static void subscription_dtor ( void *  obj)
static

Definition at line 741 of file stasis.c.

742{
743 struct stasis_subscription *sub = obj;
744#ifdef AST_DEVMODE
745 struct ao2_container *subscription_stats;
746#endif
747
748 /* Subscriptions need to be manually unsubscribed before destruction
749 * b/c there's a cyclic reference between topics and subscriptions */
751 /* If there are any messages in flight to this subscription; that would
752 * be bad. */
754
755 ast_free(sub->uniqueid);
756 ao2_cleanup(sub->topic);
757 sub->topic = NULL;
759 sub->mailbox = NULL;
760 ast_cond_destroy(&sub->join_cond);
761
762 AST_VECTOR_FREE(&sub->accepted_message_types);
763
764#ifdef AST_DEVMODE
765 if (sub->statistics) {
766 subscription_stats = ao2_global_obj_ref(subscription_statistics);
767 if (subscription_stats) {
768 ao2_unlink(subscription_stats, sub->statistics);
769 ao2_ref(subscription_stats, -1);
770 }
771 ao2_ref(sub->statistics, -1);
772 }
773#endif
774}
int stasis_subscription_is_done(struct stasis_subscription *subscription)
Returns whether subscription has received its final message.
Definition: stasis.c:1145
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.

References ao2_cleanup, ao2_global_obj_ref, ao2_ref, ao2_unlink, ast_assert, ast_cond_destroy, ast_free, ast_taskprocessor_unreference(), AST_VECTOR_FREE, NULL, stasis_subscription_is_done(), stasis_subscription_is_subscribed(), and sub.

Referenced by internal_stasis_subscribe().

◆ subscription_invoke()

static void subscription_invoke ( struct stasis_subscription sub,
struct stasis_message message 
)
static

Invoke the subscription's callback.

Parameters
subSubscription to invoke.
messageMessage to send.

Definition at line 781 of file stasis.c.

783{
784 unsigned int final = stasis_subscription_final_message(sub, message);
786#ifdef AST_DEVMODE
787 struct timeval start;
788 long elapsed;
789
790 start = ast_tvnow();
791#endif
792
793 /* Notify that the final message has been received */
794 if (final) {
795 ao2_lock(sub);
796 sub->final_message_rxed = 1;
797 ast_cond_signal(&sub->join_cond);
799 }
800
801 /*
802 * If filtering is turned on and this is a 'final' message, we only invoke the callback
803 * if the subscriber accepts subscription_change message types.
804 */
805 if (!final || sub->filter != STASIS_SUBSCRIPTION_FILTER_SELECTIVE ||
806 (message_type_id < AST_VECTOR_SIZE(&sub->accepted_message_types) && AST_VECTOR_GET(&sub->accepted_message_types, message_type_id))) {
807 /* Since sub is mostly immutable, no need to lock sub */
808 sub->callback(sub->data, sub, message);
809 }
810
811 /* Notify that the final message has been processed */
812 if (final) {
813 ao2_lock(sub);
814 sub->final_message_processed = 1;
815 ast_cond_signal(&sub->join_cond);
817 }
818
819#ifdef AST_DEVMODE
820 elapsed = ast_tvdiff_ms(ast_tvnow(), start);
821 if (elapsed > sub->statistics->highest_time_invoked) {
822 sub->statistics->highest_time_invoked = elapsed;
823 ao2_lock(sub->statistics);
824 sub->statistics->highest_time_message_type = stasis_message_type(message);
825 ao2_unlock(sub->statistics);
826 }
827 if (elapsed < sub->statistics->lowest_time_invoked) {
828 sub->statistics->lowest_time_invoked = elapsed;
829 }
830#endif
831}

References ao2_lock, ao2_unlock, ast_cond_signal, ast_tvdiff_ms(), ast_tvnow(), AST_VECTOR_GET, AST_VECTOR_SIZE, message_type_id, stasis_message_type(), stasis_message_type_id(), stasis_subscription_change_type(), STASIS_SUBSCRIPTION_FILTER_SELECTIVE, stasis_subscription_final_message(), statistics(), and sub.

Referenced by dispatch_exec_async(), dispatch_exec_sync(), and dispatch_message().

◆ topic_add_subscription()

static int topic_add_subscription ( struct stasis_topic topic,
struct stasis_subscription sub 
)
static

Add a subscriber to a topic.

Parameters
topicTopic
subSubscriber
Returns
0 on success
Non-zero on error

Definition at line 1228 of file stasis.c.

1229{
1230 size_t idx;
1231
1232 ao2_lock(topic);
1233 /* The reference from the topic to the subscription is shared with
1234 * the owner of the subscription, which will explicitly unsubscribe
1235 * to release it.
1236 *
1237 * If we bumped the refcount here, the owner would have to unsubscribe
1238 * and cleanup, which is a bit awkward. */
1240
1241 for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
1243 AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
1244 }
1245
1246#ifdef AST_DEVMODE
1248 ast_str_container_add(sub->statistics->topics, stasis_topic_name(topic));
1249#endif
1250
1251 ao2_unlock(topic);
1252
1253 return 0;
1254}

References ao2_lock, ao2_unlock, ast_str_container_add(), AST_VECTOR_APPEND, AST_VECTOR_GET, AST_VECTOR_SIZE, stasis_subscription_uniqueid(), stasis_topic_name(), sub, stasis_topic::subscribers, stasis_subscription_change::topic, topic_add_subscription(), and stasis_topic::upstream_topics.

Referenced by internal_stasis_subscribe(), stasis_forward_all(), and topic_add_subscription().

◆ topic_complete_name()

static char * topic_complete_name ( const char *  word)
static

Definition at line 2418 of file stasis.c.

2419{
2420 struct topic_proxy *topic;
2421 struct ao2_iterator it;
2422 int wordlen = strlen(word);
2423 int ret;
2424
2426 while ((topic = ao2_iterator_next(&it))) {
2427 if (!strncasecmp(word, topic->name, wordlen)) {
2428 ret = ast_cli_completion_add(ast_strdup(topic->name));
2429 if (ret) {
2430 ao2_ref(topic, -1);
2431 break;
2432 }
2433 }
2434 ao2_ref(topic, -1);
2435 }
2437 return NULL;
2438}
#define ast_strdup(str)
A wrapper for strdup()
Definition: astmm.h:241
int ast_cli_completion_add(char *value)
Add a result to a request for completion options.
Definition: main/cli.c:2768
short word

References ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, ao2_ref, ast_cli_completion_add(), ast_strdup, topic_proxy::name, NULL, and topic_all.

Referenced by stasis_show_topic().

◆ topic_dtor()

static void topic_dtor ( void *  obj)
static

Definition at line 460 of file stasis.c.

461{
462 struct stasis_topic *topic = obj;
463#ifdef AST_DEVMODE
464 struct ao2_container *topic_stats;
465#endif
466
467 ast_debug(2, "Destroying topic. name: %s, detail: %s\n",
468 topic->name, topic->detail);
469
470 /* Subscribers hold a reference to topics, so they should all be
471 * unsubscribed before we get here. */
473
476 ast_debug(1, "Topic '%s': %p destroyed\n", topic->name, topic);
477
478#ifdef AST_DEVMODE
479 if (topic->statistics) {
480 topic_stats = ao2_global_obj_ref(topic_statistics);
481 if (topic_stats) {
482 ao2_unlink(topic_stats, topic->statistics);
483 ao2_ref(topic_stats, -1);
484 }
485 ao2_ref(topic->statistics, -1);
486 }
487#endif
488}

References ao2_global_obj_ref, ao2_ref, ao2_unlink, ast_assert, ast_debug, AST_VECTOR_FREE, AST_VECTOR_SIZE, stasis_topic::detail, stasis_topic::name, stasis_topic::subscribers, and stasis_topic::upstream_topics.

Referenced by stasis_topic_create_with_detail().

◆ topic_pool_dtor()

static void topic_pool_dtor ( void *  obj)
static

Definition at line 1771 of file stasis.c.

1772{
1773 struct stasis_topic_pool *pool = obj;
1774
1775#ifdef AO2_DEBUG
1776 {
1777 char *container_name =
1778 ast_alloca(strlen(stasis_topic_name(pool->pool_topic)) + strlen("-pool") + 1);
1779 sprintf(container_name, "%s-pool", stasis_topic_name(pool->pool_topic));
1780 ao2_container_unregister(container_name);
1781 }
1782#endif
1783
1785 pool->pool_container = NULL;
1786 ao2_cleanup(pool->pool_topic);
1787 pool->pool_topic = NULL;
1788}
void ao2_container_unregister(const char *name)
Unregister a container for CLI stats and integrity check.

References ao2_cleanup, ao2_container_unregister(), ast_alloca, NULL, stasis_topic_pool::pool_container, stasis_topic_pool::pool_topic, and stasis_topic_name().

Referenced by stasis_topic_pool_create().

◆ topic_pool_entry_alloc()

static struct topic_pool_entry * topic_pool_entry_alloc ( const char *  topic_name)
static

Definition at line 1751 of file stasis.c.

1752{
1754
1755 topic_pool_entry = ao2_alloc_options(sizeof(*topic_pool_entry) + strlen(topic_name) + 1,
1757 if (!topic_pool_entry) {
1758 return NULL;
1759 }
1760
1761 strcpy(topic_pool_entry->name, topic_name); /* Safe */
1762
1763 return topic_pool_entry;
1764}
static void topic_pool_entry_dtor(void *obj)
Definition: stasis.c:1742
char name[0]
Definition: stasis.c:1739

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_alloc_options, topic_pool_entry::name, NULL, and topic_pool_entry_dtor().

Referenced by stasis_topic_pool_get_topic().

◆ topic_pool_entry_cmp()

static int topic_pool_entry_cmp ( void *  obj,
void *  arg,
int  flags 
)
static

Definition at line 1811 of file stasis.c.

1812{
1813 const struct topic_pool_entry *object_left = obj;
1814 const struct topic_pool_entry *object_right = arg;
1815 const char *right_key = arg;
1816 int cmp;
1817
1818 switch (flags & OBJ_SEARCH_MASK) {
1819 case OBJ_SEARCH_OBJECT:
1820 right_key = object_right->name;
1821 /* Fall through */
1822 case OBJ_SEARCH_KEY:
1823 cmp = strcasecmp(object_left->name, right_key);
1824 break;
1826 /* Not supported by container */
1827 ast_assert(0);
1828 cmp = -1;
1829 break;
1830 default:
1831 /*
1832 * What arg points to is specific to this traversal callback
1833 * and has no special meaning to astobj2.
1834 */
1835 cmp = 0;
1836 break;
1837 }
1838 if (cmp) {
1839 return 0;
1840 }
1841 /*
1842 * At this point the traversal callback is identical to a sorted
1843 * container.
1844 */
1845 return CMP_MATCH;
1846}
@ CMP_MATCH
Definition: astobj2.h:1027
@ OBJ_SEARCH_PARTIAL_KEY
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
Definition: astobj2.h:1116
@ OBJ_SEARCH_MASK
Search option field mask.
Definition: astobj2.h:1072

References ast_assert, CMP_MATCH, topic_pool_entry::name, OBJ_SEARCH_KEY, OBJ_SEARCH_MASK, OBJ_SEARCH_OBJECT, and OBJ_SEARCH_PARTIAL_KEY.

Referenced by stasis_topic_pool_create().

◆ topic_pool_entry_dtor()

static void topic_pool_entry_dtor ( void *  obj)
static

Definition at line 1742 of file stasis.c.

1743{
1744 struct topic_pool_entry *entry = obj;
1745
1746 entry->forward = stasis_forward_cancel(entry->forward);
1747 ao2_cleanup(entry->topic);
1748 entry->topic = NULL;
1749}
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
Definition: stasis.c:1575

References ao2_cleanup, topic_pool_entry::forward, NULL, stasis_forward_cancel(), and topic_pool_entry::topic.

Referenced by topic_pool_entry_alloc().

◆ topic_pool_entry_hash()

static int topic_pool_entry_hash ( const void *  obj,
const int  flags 
)
static

Definition at line 1790 of file stasis.c.

1791{
1792 const struct topic_pool_entry *object;
1793 const char *key;
1794
1795 switch (flags & OBJ_SEARCH_MASK) {
1796 case OBJ_SEARCH_KEY:
1797 key = obj;
1798 break;
1799 case OBJ_SEARCH_OBJECT:
1800 object = obj;
1801 key = object->name;
1802 break;
1803 default:
1804 /* Hash can only work on something with a full key. */
1805 ast_assert(0);
1806 return 0;
1807 }
1808 return ast_str_case_hash(key);
1809}
static force_inline int attribute_pure ast_str_case_hash(const char *str)
Compute a hash value on a case-insensitive string.
Definition: strings.h:1303

References ast_assert, ast_str_case_hash(), OBJ_SEARCH_KEY, OBJ_SEARCH_MASK, and OBJ_SEARCH_OBJECT.

Referenced by stasis_topic_pool_create().

◆ topic_remove_subscription()

static int topic_remove_subscription ( struct stasis_topic topic,
struct stasis_subscription sub 
)
static

Definition at line 1256 of file stasis.c.

1257{
1258 size_t idx;
1259 int res;
1260
1261 ao2_lock(topic);
1262 for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
1264 AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
1265 }
1268
1269#ifdef AST_DEVMODE
1270 if (!res) {
1272 ast_str_container_remove(sub->statistics->topics, stasis_topic_name(topic));
1273 }
1274#endif
1275
1276 ao2_unlock(topic);
1277
1278 return res;
1279}
void ast_str_container_remove(struct ao2_container *str_container, const char *remove)
Removes a string from a string container allocated by ast_str_container_alloc.
Definition: strings.c:221

References ao2_lock, ao2_unlock, ast_str_container_remove(), AST_VECTOR_ELEM_CLEANUP_NOOP, AST_VECTOR_GET, AST_VECTOR_REMOVE_ELEM_UNORDERED, AST_VECTOR_SIZE, stasis_subscription_uniqueid(), stasis_topic_name(), sub, stasis_topic::subscribers, stasis_subscription_change::topic, topic_remove_subscription(), and stasis_topic::upstream_topics.

Referenced by stasis_forward_cancel(), stasis_unsubscribe(), and topic_remove_subscription().

◆ userevent_exclusion_cb()

static int userevent_exclusion_cb ( const char *  key)
static

Definition at line 2174 of file stasis.c.

2175{
2176 if (!strcmp("eventname", key)) {
2177 return 1;
2178 }
2179 return 0;
2180}

Referenced by multi_user_event_to_ami().

Variable Documentation

◆ cli_stasis

struct ast_cli_entry cli_stasis[]
static
Initial value:
= {
{ .handler = stasis_show_topics , .summary = "Show all topics" ,},
{ .handler = stasis_show_topic , .summary = "Show topic" ,},
}
static char * stasis_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
Definition: stasis.c:2361
static char * stasis_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
Definition: stasis.c:2444

Definition at line 2503 of file stasis.c.

Referenced by stasis_cleanup(), and stasis_init().

◆ declined_option

struct aco_type declined_option
static

An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type.

Definition at line 2240 of file stasis.c.

Referenced by stasis_init().

◆ declined_options

struct aco_type* declined_options[] = ACO_TYPES(&declined_option)

Definition at line 2248 of file stasis.c.

Referenced by stasis_init().

◆ stasis_conf

struct aco_file stasis_conf
Initial value:
= {
.filename = "stasis.conf",
}
#define ACO_TYPES(...)
A helper macro to ensure that aco_info types always have a sentinel.

Definition at line 2250 of file stasis.c.

◆ threadpool

struct ast_threadpool* threadpool
static

Thread pool for topics that don't want a dedicated taskprocessor

Definition at line 334 of file stasis.c.

Referenced by internal_stasis_subscribe(), stasis_cleanup(), and stasis_init().

◆ threadpool_option

struct aco_type threadpool_option
static

Definition at line 2229 of file stasis.c.

Referenced by stasis_init().

◆ threadpool_options

struct aco_type* threadpool_options[] = ACO_TYPES(&threadpool_option)
static

Definition at line 2237 of file stasis.c.

Referenced by sip_get_threadpool_options(), and stasis_init().

◆ topic_all

struct ao2_container* topic_all