328#define INITIAL_SUBSCRIBERS_MAX 4
331#define TOPIC_POOL_BUCKETS 57
338#if defined(LOW_MEMORY)
340#define TOPIC_ALL_BUCKETS 257
344#define TOPIC_ALL_BUCKETS 997
351#define TOPIC_STATISTICS_BUCKETS 57
354#define SUBSCRIPTION_STATISTICS_BUCKETS 57
363struct stasis_message_type_statistics {
376static AST_VECTOR(,
struct stasis_message_type_statistics) message_type_statistics;
379struct stasis_topic_statistics {
381 long highest_time_dispatched;
383 long lowest_time_dispatched;
385 int messages_not_dispatched;
387 int messages_dispatched;
452#define topic_lock_both(topic1, topic2) \
455 while (ao2_trylock(topic2)) { \
456 AO2_DEADLOCK_AVOIDANCE(topic1); \
467 ast_debug(2,
"Destroying topic. name: %s, detail: %s\n",
476 ast_debug(1,
"Topic '%s': %p destroyed\n", topic->
name, topic);
479 if (topic->statistics) {
485 ao2_ref(topic->statistics, -1);
491static void topic_statistics_destroy(
void *obj)
493 struct stasis_topic_statistics *
statistics = obj;
498static struct stasis_topic_statistics *stasis_topic_statistics_create(
struct stasis_topic *topic)
548 detail_len = strlen(
detail) + 1;
551 sizeof(*proxy) + strlen(
name) + 1 + detail_len,
NULL,
name);
609 ast_debug(2,
"Topic is already exist. name: %s, detail: %s\n",
633 topic->statistics = stasis_topic_statistics_create(topic);
634 if (!topic->statistics) {
639 ast_debug(1,
"Topic '%s': %p created\n", topic->
name, topic);
676struct stasis_subscription_statistics {
686 long highest_time_invoked;
688 long lowest_time_invoked;
690 int messages_dropped;
737 struct stasis_subscription_statistics *
statistics;
765 if (
sub->statistics) {
767 if (subscription_stats) {
769 ao2_ref(subscription_stats, -1);
787 struct timeval start;
796 sub->final_message_rxed = 1;
814 sub->final_message_processed = 1;
821 if (elapsed >
sub->statistics->highest_time_invoked) {
822 sub->statistics->highest_time_invoked = elapsed;
827 if (elapsed < sub->
statistics->lowest_time_invoked) {
828 sub->statistics->lowest_time_invoked = elapsed;
841static void subscription_statistics_destroy(
void *obj)
843 struct stasis_subscription_statistics *
statistics = obj;
848static struct stasis_subscription_statistics *stasis_subscription_statistics_create(
struct stasis_subscription *
sub,
849 int needs_mailbox,
int use_thread_pool,
const char *
file,
int lineno,
852 struct stasis_subscription_statistics *
statistics;
855 if (!subscription_stats) {
874 statistics->uses_threadpool = use_thread_pool;
908 sub->statistics = stasis_subscription_statistics_create(
sub, needs_mailbox, use_thread_pool,
file, lineno, func);
926 use_thread_pool ?
'p' :
'm',
934 if (use_thread_pool) {
1014 "Internal error: subscription has invalid topic\n");
1039 long low_water,
long high_water)
1045 low_water, high_water);
1053 if (!subscription) {
1083 if (!subscription) {
1107 if (!subscription) {
1164 if (!subscription) {
1198 return sub->uniqueid;
1354 int type_filter_specified = 0;
1355 int formatter_filter_specified = 0;
1356 int type_filter_passed = 0;
1357 int formatter_filter_passed = 0;
1368 if (!type_filter_specified && !formatter_filter_specified) {
1372 type_filter_passed = type_filter_specified
1380 if (type_filter_passed) {
1384 formatter_filter_passed = formatter_filter_specified
1387 if (formatter_filter_passed) {
1403 if (!
sub->mailbox) {
1462 unsigned int dispatched = 0;
1464 struct stasis_message_type_statistics *
statistics;
1465 struct timeval start;
1475 struct stasis_message_type_statistics new_statistics = {
1521 if (elapsed >
topic->statistics->highest_time_dispatched) {
1522 topic->statistics->highest_time_dispatched = elapsed;
1524 if (elapsed < topic->
statistics->lowest_time_dispatched) {
1525 topic->statistics->lowest_time_dispatched = elapsed;
1657 size_t uniqueid_len = strlen(
uniqueid) + 1;
1777 char *container_name =
1815 const char *right_key = arg;
1820 right_key = object_right->
name;
1823 cmp = strcasecmp(object_left->
name, right_key);
1849static void topic_pool_prnt_obj(
void *v_obj,
void *where,
ao2_prnt_fn *prnt)
1878 char *container_name =
1899 int pool_topic_name_len = strlen(pool_topic_name);
1900 const char *search_topic_name;
1902 if (strncmp(pool_topic_name, topic_name, pool_topic_name_len) == 0) {
1903 search_topic_name = topic_name + pool_topic_name_len + 1;
1905 search_topic_name = topic_name;
1915 char *new_topic_name;
2054 if (!channel_snapshot) {
2143 ami_snapshot =
NULL;
2176 if (!strcmp(
"eventname", key)) {
2188 const char *eventname;
2193 if (!object_string || !body) {
2231 .name =
"threadpool",
2233 .category =
"threadpool",
2242 .name =
"declined_message_types",
2243 .item_offset = offsetof(
struct stasis_config, declined_message_types),
2245 .category =
"declined_message_types",
2313 char *name_in_declined;
2322 res = name_in_declined ? 1 : 0;
2326 ast_debug(4,
"Declining to allocate Stasis message type '%s' due to configuration\n",
name);
2367#define FMT_HEADERS "%-64s %-64s\n"
2368#define FMT_FIELDS "%-64s %-64s\n"
2372 e->
command =
"stasis show topics";
2374 "Usage: stasis show topics\n"
2375 " Shows a list of topics\n";
2381 if (
a->argc != e->
args) {
2388 topic_proxy_sort_fn,
NULL);
2406 ast_cli(
a->fd,
"\n%d Total topics\n\n", count);
2422 int wordlen = strlen(
word);
2427 if (!strncasecmp(
word, topic->
name, wordlen)) {
2447 char print_time[32];
2452 e->
command =
"stasis show topic";
2454 "Usage: stasis show topic <name>\n"
2455 " Show stasis topic detail info.\n";
2471 ast_cli(
a->fd,
"Specified topic '%s' does not exist\n",
a->argv[3]);
2480 ast_cli(
a->fd,
"Duration time: %s\n", print_time);
2483 ast_cli(
a->fd,
"\nSubscribers:\n");
2486 ast_cli(
a->fd,
" UniqueID: %s, Topic: %s, Detail: %s\n",
2490 ast_cli(
a->fd,
"\nForwarded topics:\n");
2522 struct stasis_subscription_statistics *
statistics;
2526#define FMT_HEADERS "%-64s %10s %10s %16s %16s\n"
2527#define FMT_FIELDS "%-64s %10d %10d %16ld %16ld\n"
2528#define FMT_FIELDS2 "%-64s %10d %10d\n"
2532 e->
command =
"stasis statistics show subscriptions";
2534 "Usage: stasis statistics show subscriptions\n"
2535 " Shows a list of subscriptions and their general statistics\n";
2541 if (
a->argc != e->
args) {
2546 if (!subscription_stats) {
2547 ast_cli(
a->fd,
"Could not fetch subscription_statistics container\n");
2552 stasis_subscription_statistics_sort_fn,
NULL);
2553 if (!sorted_subscriptions) {
2554 ao2_ref(subscription_stats, -1);
2555 ast_cli(
a->fd,
"Could not create container for sorting subscription statistics\n");
2560 ao2_ref(sorted_subscriptions, -1);
2561 ao2_ref(subscription_stats, -1);
2562 ast_cli(
a->fd,
"Could not sort subscription statistics\n");
2566 ao2_ref(subscription_stats, -1);
2568 ast_cli(
a->fd,
"\n" FMT_HEADERS,
"Subscription",
"Dropped",
"Passed",
"Lowest Invoke",
"Highest Invoke");
2581 ao2_ref(sorted_subscriptions, -1);
2583 ast_cli(
a->fd, FMT_FIELDS2,
"Total", dropped, passed);
2584 ast_cli(
a->fd,
"\n%d subscriptions\n\n", count);
2597static char *subscription_statistics_complete_name(
const char *
word,
int state)
2599 struct stasis_subscription_statistics *
statistics;
2602 int wordlen = strlen(
word);
2607 if (!subscription_stats) {
2614 && ++which >
state) {
2623 ao2_ref(subscription_stats, -1);
2633 struct stasis_subscription_statistics *
statistics;
2640 e->
command =
"stasis statistics show subscription";
2642 "Usage: stasis statistics show subscription <uniqueid>\n"
2643 " Show stasis subscription statistics.\n";
2647 return subscription_statistics_complete_name(
a->word,
a->n);
2658 if (!subscription_stats) {
2659 ast_cli(
a->fd,
"Could not fetch subscription_statistics container\n");
2665 ao2_ref(subscription_stats, -1);
2666 ast_cli(
a->fd,
"Specified subscription '%s' does not exist\n",
a->argv[4]);
2670 ao2_ref(subscription_stats, -1);
2677 ast_cli(
a->fd,
"Number of messages dropped due to filtering: %d\n",
statistics->messages_dropped);
2678 ast_cli(
a->fd,
"Number of messages passed to subscriber callback: %d\n",
statistics->messages_passed);
2679 ast_cli(
a->fd,
"Using mailbox to queue messages: %s\n",
statistics->uses_mailbox ?
"Yes" :
"No");
2680 ast_cli(
a->fd,
"Using stasis threadpool for handling messages: %s\n",
statistics->uses_threadpool ?
"Yes" :
"No");
2681 ast_cli(
a->fd,
"Lowest amount of time (in milliseconds) spent invoking message: %ld\n",
statistics->lowest_time_invoked);
2682 ast_cli(
a->fd,
"Highest amount of time (in milliseconds) spent invoking message: %ld\n",
statistics->highest_time_invoked);
2692 ast_cli(
a->fd,
"Subscribed topics:\n");
2718 int not_dispatched = 0;
2720#define FMT_HEADERS "%-64s %10s %10s %10s %16s %16s\n"
2721#define FMT_FIELDS "%-64s %10d %10d %10d %16ld %16ld\n"
2722#define FMT_FIELDS2 "%-64s %10s %10d %10d\n"
2726 e->
command =
"stasis statistics show topics";
2728 "Usage: stasis statistics show topics\n"
2729 " Shows a list of topics and their general statistics\n";
2735 if (
a->argc != e->
args) {
2741 ast_cli(
a->fd,
"Could not fetch topic_statistics container\n");
2746 stasis_topic_statistics_sort_fn,
NULL);
2747 if (!sorted_topics) {
2749 ast_cli(
a->fd,
"Could not create container for sorting topic statistics\n");
2756 ast_cli(
a->fd,
"Could not sort topic statistics\n");
2762 ast_cli(
a->fd,
"\n" FMT_HEADERS,
"Topic",
"Subscribers",
"Dropped",
"Dispatched",
"Lowest Dispatch",
"Highest Dispatch");
2769 not_dispatched +=
statistics->messages_not_dispatched;
2770 dispatched +=
statistics->messages_dispatched;
2778 ast_cli(
a->fd, FMT_FIELDS2,
"Total",
"", not_dispatched, dispatched);
2779 ast_cli(
a->fd,
"\n%d topics\n\n", count);
2792static char *topic_statistics_complete_name(
const char *
word,
int state)
2797 int wordlen = strlen(
word);
2809 && ++which >
state) {
2835 e->
command =
"stasis statistics show topic";
2837 "Usage: stasis statistics show topic <name>\n"
2838 " Show stasis topic statistics.\n";
2842 return topic_statistics_complete_name(
a->word,
a->n);
2854 ast_cli(
a->fd,
"Could not fetch topic_statistics container\n");
2861 ast_cli(
a->fd,
"Specified topic '%s' does not exist\n",
a->argv[4]);
2869 ast_cli(
a->fd,
"Number of messages published that went to no subscriber: %d\n",
statistics->messages_not_dispatched);
2870 ast_cli(
a->fd,
"Number of messages that went to at least one subscriber: %d\n",
statistics->messages_dispatched);
2871 ast_cli(
a->fd,
"Lowest amount of time (in milliseconds) spent dispatching message: %ld\n",
statistics->lowest_time_dispatched);
2872 ast_cli(
a->fd,
"Highest amount of time (in milliseconds) spent dispatching messages: %ld\n",
statistics->highest_time_dispatched);
2878 ast_cli(
a->fd,
"\t%s\n", uniqueid);
2898#define FMT_HEADERS "%-64s %10s %10s\n"
2899#define FMT_FIELDS "%-64s %10d %10d\n"
2903 e->
command =
"stasis statistics show messages";
2905 "Usage: stasis statistics show messages\n"
2906 " Shows a list of message types and their general statistics\n";
2912 if (
a->argc != e->
args) {
2935 ast_cli(
a->fd,
"\n%d seen message types\n\n", count);
2944 AST_CLI_DEFINE(statistics_show_subscriptions,
"Show subscriptions with general statistics"),
2945 AST_CLI_DEFINE(statistics_show_subscription,
"Show subscription statistics"),
2946 AST_CLI_DEFINE(statistics_show_topics,
"Show topics with general statistics"),
2948 AST_CLI_DEFINE(statistics_show_messages,
"Show message types with general statistics"),
2951static int subscription_statistics_hash(
const void *obj,
const int flags)
2953 const struct stasis_subscription_statistics *object;
2962 key =
object->uniqueid;
2972static int subscription_statistics_cmp(
void *obj,
void *arg,
int flags)
2974 const struct stasis_subscription_statistics *object_left = obj;
2975 const struct stasis_subscription_statistics *object_right = arg;
2976 const char *right_key = arg;
2981 right_key = object_right->uniqueid;
2984 cmp = strcasecmp(object_left->uniqueid, right_key);
3009static int topic_statistics_hash(
const void *obj,
const int flags)
3011 const struct stasis_topic_statistics *object;
3030static int topic_statistics_cmp(
void *obj,
void *arg,
int flags)
3032 const struct stasis_topic_statistics *object_left = obj;
3033 const struct stasis_topic_statistics *object_right = arg;
3034 const char *right_key = arg;
3039 right_key = object_right->name;
3042 cmp = strcasecmp(object_left->name, right_key);
3128 ast_log(
LOG_ERROR,
"Failed to initialize defaults on Stasis configuration object\n");
3135 ast_log(
LOG_ERROR,
"Failed to load stasis.conf and failed to initialize defaults.\n");
3167 if (cache_init != 0) {
3179 topic_proxy_hash_fn, 0, topic_proxy_cmp_fn);
3193 subscription_statistics_hash, 0, subscription_statistics_cmp);
3194 if (!subscription_stats) {
3201 topic_statistics_hash, 0, topic_statistics_cmp);
Asterisk main include file. File version handling, generic pbx functions.
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
#define ast_strdup(str)
A wrapper for strdup()
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
#define ast_calloc(num, len)
A wrapper for calloc()
int ao2_container_dup(struct ao2_container *dest, struct ao2_container *src, enum search_flags flags)
Copy all object references in the src container into the dest container.
int ao2_weakproxy_subscribe(void *weakproxy, ao2_weakproxy_notification_cb cb, void *data, int flags)
Request notification when weakproxy points to NULL.
#define ao2_iterator_next(iter)
#define ao2_link(container, obj)
Add an object to a container.
@ AO2_ALLOC_OPT_LOCK_NOLOCK
@ AO2_ALLOC_OPT_LOCK_MUTEX
#define ao2_global_obj_replace_unref(holder, obj)
Replace an ao2 object in the global holder, throwing away any old object.
void ao2_container_unregister(const char *name)
Unregister a container for CLI stats and integrity check.
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
#define ao2_unlink(container, obj)
Remove an object from a container.
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
#define ao2_global_obj_ref(holder)
Get a reference to the object stored in the global holder.
#define ao2_find(container, arg, flags)
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
#define ao2_t_weakproxy_alloc(data_size, destructor_fn, tag)
#define AO2_STRING_FIELD_SORT_FN(stype, field)
Creates a sort function for a structure string field.
#define ao2_container_alloc_rbtree(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a red-black tree container.
#define ao2_weakproxy_find(c, arg, flags, tag)
Perform an ao2_find on a container with ao2_weakproxy objects, returning the real object.
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
#define ao2_alloc_options(data_size, destructor_fn, options)
int ao2_container_register(const char *name, struct ao2_container *self, ao2_prnt_obj_fn *prnt_obj)
Register a container for CLI stats and integrity check.
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
#define ao2_global_obj_release(holder)
Release the ao2 object held in the global holder.
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
#define ao2_t_alloc(data_size, destructor_fn, debug_msg)
@ OBJ_SEARCH_PARTIAL_KEY
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
@ OBJ_SEARCH_OBJECT
The arg parameter is an object of the same type.
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
@ OBJ_SEARCH_MASK
Search option field mask.
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
#define ao2_container_alloc_list(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a list container.
#define ao2_alloc(data_size, destructor_fn)
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
void() ao2_prnt_fn(void *where, const char *fmt,...)
Print output.
#define ao2_t_weakproxy_set_object(weakproxy, obj, flags, tag)
static struct console_pvt globals
struct stasis_topic * ast_channel_topic(struct ast_channel *chan)
A topic which publishes the events for a particular channel.
Standard Command Line Interface.
int ast_cli_unregister_multiple(struct ast_cli_entry *e, int len)
Unregister multiple commands.
#define AST_CLI_DEFINE(fn, txt,...)
int ast_cli_completion_add(char *value)
Add a result to a request for completion options.
void ast_cli(int fd, const char *fmt,...)
#define ast_cli_register_multiple(e, len)
Register multiple commands.
Configuration option-handling.
int aco_set_defaults(struct aco_type *type, const char *category, void *obj)
Set all default options of obj.
void aco_info_destroy(struct aco_info *info)
Destroy an initialized aco_info struct.
@ ACO_PROCESS_ERROR
There was an error and no changes were applied.
int aco_info_init(struct aco_info *info)
Initialize an aco_info structure.
#define FLDSET(type,...)
Convert a struct and list of fields to an argument list of field offsets.
#define aco_option_register(info, name, matchtype, types, default_val, opt_type, flags,...)
Register a config option.
@ OPT_INT_T
Type for default option handler for signed integers.
#define aco_option_register_custom(info, name, matchtype, types, default_val, handler, flags)
Register a config option.
enum aco_process_status aco_process_config(struct aco_info *info, int reload)
Process a config info via the options registered with an aco_info.
#define ACO_TYPES(...)
A helper macro to ensure that aco_info types always have a sentinel.
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
struct ast_str * ast_manager_str_from_json_object(struct ast_json *blob, key_exclusion_cb exclusion_cb)
Convert a JSON object into an AMI compatible string.
void ast_multi_object_blob_add(struct ast_multi_object_blob *multi, enum stasis_user_multi_object_snapshot_type type, void *object)
Add an object (snapshot) to the blob.
stasis_user_multi_object_snapshot_type
Object type code for multi user object snapshots.
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
struct ast_channel_snapshot * ast_channel_snapshot_create(struct ast_channel *chan)
Generate a snapshot of the channel state. This is an ao2 object, so ao2_cleanup() to deallocate.
void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob)
Publish single channel user event (for app_userevent compatibility)
struct ast_multi_object_blob * ast_multi_object_blob_create(struct ast_json *blob)
Create a stasis user event multi object blob.
struct stasis_message_type * ast_multi_user_event_type(void)
Message type for custom user defined events with multi object blobs.
#define STASIS_UMOS_MAX
Number of snapshot types.
#define ast_debug(level,...)
Log a DEBUG message.
struct ast_json * ast_json_string_create(const char *value)
Construct a JSON string from value.
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
struct ast_json * ast_json_object_create(void)
Create a new JSON object.
struct ast_json * ast_json_timeval(const struct timeval tv, const char *zone)
Construct a timeval as JSON.
struct ast_json * ast_json_ref(struct ast_json *value)
Increase refcount on value.
int ast_json_object_set(struct ast_json *object, const char *key, struct ast_json *value)
Set a field in a JSON object.
const char * ast_json_string_get(const struct ast_json *string)
Get the value of a JSON string.
struct ast_json * ast_json_object_get(struct ast_json *object, const char *key)
Get a field from a JSON object.
#define ast_cond_destroy(cond)
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
#define ast_cond_wait(cond, mutex)
#define ast_cond_init(cond, attr)
#define ast_mutex_init(pmutex)
#define ast_mutex_unlock(a)
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
pthread_cond_t ast_cond_t
#define ast_mutex_destroy(a)
#define ast_mutex_lock(a)
#define AST_MUTEX_DEFINE_STATIC(mutex)
#define ast_cond_signal(cond)
struct ast_str * ast_manager_build_bridge_state_string_prefix(const struct ast_bridge_snapshot *snapshot, const char *prefix)
Generate the AMI message body from a bridge snapshot.
struct ast_manager_event_blob * ast_manager_event_blob_create(int event_flags, const char *manager_event, const char *extra_fields_fmt,...)
Construct a ast_manager_event_blob.
struct ast_str * ast_manager_build_channel_state_string_prefix(const struct ast_channel_snapshot *snapshot, const char *prefix)
Generate the AMI message body from a channel snapshot.
struct stasis_forward * sub
struct ao2_container * container
static void to_ami(struct ast_sip_subscription *sub, struct ast_str **buf)
static struct ast_str * multi_object_blob_to_ami(void *obj)
int stasis_subscription_is_done(struct stasis_subscription *subscription)
Returns whether subscription has received its final message.
static char * topic_complete_name(const char *word)
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *sub)
Cancel a subscription.
static struct ast_cli_entry cli_stasis[]
#define INITIAL_SUBSCRIBERS_MAX
static struct ast_json * multi_user_event_to_json(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
static void subscription_change_dtor(void *obj)
void stasis_topic_pool_delete_topic(struct stasis_topic_pool *pool, const char *topic_name)
Delete a topic from the topic pool.
static struct ast_manager_event_blob * multi_user_event_to_ami(struct stasis_message *message)
int stasis_subscription_decline_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are not interested in a message type.
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription, long low_water, long high_water)
Set the high and low alert water marks of the stasis subscription.
AO2_STRING_FIELD_HASH_FN(topic_proxy, name)
struct stasis_topic * stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
Find or create a topic in the pool.
static int sub_cleanup(void *data)
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
static void forward_dtor(void *obj)
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
size_t stasis_topic_subscribers(const struct stasis_topic *topic)
Return the number of subscribers of a topic.
static void topic_pool_dtor(void *obj)
void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
Publish a message to a topic's subscribers, synchronizing on the specified subscriber.
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
static void * stasis_config_alloc(void)
static struct topic_pool_entry * topic_pool_entry_alloc(const char *topic_name)
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
static void topic_dtor(void *obj)
#define TOPIC_POOL_BUCKETS
static struct stasis_subscription_change * subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
static void stasis_cleanup(void)
Cleanup function for graceful shutdowns.
struct stasis_topic * stasis_topic_get(const char *name)
Get a topic of the given name.
int stasis_init(void)
Initialize the Stasis subsystem.
static void proxy_dtor(void *weakproxy, void *container)
void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Stasis subscription callback function that does nothing.
struct stasis_subscription * __stasis_subscribe_pool(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription whose callbacks occur on a thread pool.
static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
static void stasis_config_destructor(void *obj)
void stasis_subscription_join(struct stasis_subscription *subscription)
Block until the last message is processed on a subscription.
static AO2_GLOBAL_OBJ_STATIC(globals)
A global object container that will contain the stasis_config that gets swapped out on reloads.
int stasis_message_type_declined(const char *name)
Check whether a message type is declined.
static struct ast_threadpool * threadpool
static int topic_pool_entry_hash(const void *obj, const int flags)
CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,.files=ACO_FILES(&stasis_conf),)
Register information about the configs being processed by this module.
void stasis_log_bad_type_access(const char *name)
static struct aco_type threadpool_option
static void multi_object_blob_dtor(void *obj)
static int link_topic_proxy(struct stasis_topic *topic, const char *name, const char *detail)
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Add a subscriber to a topic.
static void subscription_dtor(void *obj)
struct stasis_topic * stasis_topic_create_with_detail(const char *name, const char *detail)
Create a new topic with given detail.
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
const char * stasis_topic_detail(const struct stasis_topic *topic)
Return the detail of a topic.
struct stasis_subscription * stasis_unsubscribe_and_join(struct stasis_subscription *subscription)
Cancel a subscription, blocking until the last message is processed.
static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name)
Check if a topic exists in a pool.
static void publish_msg(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sync_sub)
struct aco_file stasis_conf
static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
static void stasis_declined_config_destructor(void *obj)
static struct aco_type * threadpool_options[]
static unsigned int dispatch_message(struct stasis_subscription *sub, struct stasis_message *message, int synchronous)
void stasis_subscription_accept_formatters(struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters)
Indicate to a subscription that we are interested in messages with one or more formatters.
const char * stasis_subscription_uniqueid(const struct stasis_subscription *sub)
Get the unique ID for the subscription.
struct stasis_topic_pool * stasis_topic_pool_create(struct stasis_topic *pooled_topic)
Create a topic pool that routes messages from dynamically generated topics to the given topic.
static void subscription_invoke(struct stasis_subscription *sub, struct stasis_message *message)
Invoke the subscription's callback.
AO2_STRING_FIELD_CASE_SORT_FN(topic_proxy, name)
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
struct stasis_subscription * __stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription.
static void topic_pool_entry_dtor(void *obj)
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
static int dispatch_exec_async(struct ast_taskprocessor_local *local)
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
Returns whether a subscription is currently subscribed.
struct stasis_subscription * internal_stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func)
Create a subscription.
static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
AO2_STRING_FIELD_CMP_FN(topic_proxy, name)
static int userevent_exclusion_cb(const char *key)
struct aco_type * declined_options[]
#define topic_lock_both(topic1, topic2)
Lock two topics.
struct ao2_container * topic_all
static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
static char * stasis_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
#define TOPIC_ALL_BUCKETS
STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type)
static struct aco_type declined_option
An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type.
static char * stasis_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
Stasis Message Bus API. See Stasis Message Bus API for detailed documentation.
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
void(* stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Callback function type for Stasis subscriptions.
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
stasis_subscription_message_filter
Stasis subscription message filters.
@ STASIS_SUBSCRIPTION_FILTER_SELECTIVE
@ STASIS_SUBSCRIPTION_FILTER_FORCED_NONE
@ STASIS_SUBSCRIPTION_FILTER_NONE
stasis_subscription_message_formatters
Stasis subscription formatter filters.
@ STASIS_SUBSCRIPTION_FORMATTER_NONE
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
int stasis_cache_init(void)
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
int stasis_message_type_id(const struct stasis_message_type *type)
Gets the id of a given message type.
enum stasis_subscription_message_formatters stasis_message_type_available_formatters(const struct stasis_message_type *message_type)
Get a bitmap of available formatters for a message type.
const struct timeval * stasis_message_timestamp(const struct stasis_message *msg)
Get the time when a message was created.
struct ast_json * ast_bridge_snapshot_to_json(const struct ast_bridge_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_bridge_snapshot.
struct ast_json * ast_channel_snapshot_to_json(const struct ast_channel_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_channel_snapshot.
struct ast_json * ast_endpoint_snapshot_to_json(const struct ast_endpoint_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_endpoint_snapshot.
static int message_type_id
int ast_str_append(struct ast_str **buf, ssize_t max_len, const char *fmt,...)
Append to a thread local dynamic string.
char * ast_str_buffer(const struct ast_str *buf)
Returns the string buffer within the ast_str buf.
#define S_OR(a, b)
Returns the equivalent of logical OR for strings: the first one if not empty, otherwise the second one.
static force_inline int attribute_pure ast_strlen_zero(const char *s)
#define ast_str_container_alloc(buckets)
Allocates a hash container for bare strings.
#define ast_str_create(init_len)
Create a malloc'ed dynamic length string.
static force_inline int attribute_pure ast_str_case_hash(const char *str)
Compute a hash value on a case-insensitive string.
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
void ast_str_container_remove(struct ao2_container *str_container, const char *remove)
Removes a string from a string container allocated by ast_str_container_alloc.
int ast_str_container_add(struct ao2_container *str_container, const char *add)
Adds a string to a string container allocated by ast_str_container_alloc.
The representation of a single configuration file to be processed.
Type information about a category-level configurable object.
When we need to walk through a container, we use an ao2_iterator to keep track of the current position...
Structure representing a snapshot of channel state.
Main Channel structure associated with a channel.
Descriptor for a CLI entry.
int args
This gets set in ast_cli_register()
Abstract JSON element (object, array, string, int, ...).
Struct containing info for an AMI event to send out.
A multi-object blob data structure to carry user event stasis messages.
struct ast_multi_object_blob::@398 snapshots[STASIS_UMOS_MAX]
Structure for mutex and tracking information.
Support for dynamic strings.
An ast_taskprocessor structure is a singleton by name.
int idle_timeout
Time limit in seconds for idle threads.
int max_size
Maximum number of threads a pool may have.
int auto_increment
Number of threads to increment pool by.
int initial_size
Number of threads the pool will start with.
An opaque threadpool structure.
Structure for variables, used for configurations and for channel variables.
struct stasis_declined_config * declined_message_types
struct stasis_threadpool_conf * threadpool_options
A structure to hold global configuration-related options.
struct ao2_container * declined
struct stasis_topic * from_topic
struct stasis_topic * to_topic
Structure containing callbacks for Stasis message sanitization.
Holds details about changes to subscriptions for the specified topic.
struct stasis_topic * topic
struct stasis_topic * topic
stasis_subscription_cb callback
struct stasis_subscription::@397 accepted_message_types
struct ast_taskprocessor * mailbox
enum stasis_subscription_message_filter filter
int final_message_processed
enum stasis_subscription_message_formatters accepted_formatters
Threadpool configuration options.
struct ao2_container * pool_container
struct stasis_topic * pool_topic
struct stasis_topic::@396 upstream_topics
struct stasis_topic::@395 subscribers
struct timeval * creationtime
struct stasis_topic * topic
struct stasis_forward * forward
struct timeval creationtime
An API for managing task processing threads that can be shared across modules.
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
Sets the local data associated with a taskprocessor.
@ TPS_REF_DEFAULT
Return a reference to a taskprocessor, creating one if it does not exist.
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int(*task_exe)(struct ast_taskprocessor_local *local), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format,...)
Build a taskprocessor name with a sequence number on the end.
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.
#define AST_TASKPROCESSOR_MAX_NAME
Suggested maximum taskprocessor name length (less null terminator).
void ast_threadpool_shutdown(struct ast_threadpool *pool)
Shut down a threadpool and destroy it.
struct ast_threadpool * ast_threadpool_create(const char *name, struct ast_threadpool_listener *listener, const struct ast_threadpool_options *options)
Create a new threadpool.
#define AST_THREADPOOL_OPTIONS_VERSION
struct ast_taskprocessor * ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
Serialized execution of tasks within an ast_threadpool.
void ast_format_duration_hh_mm_ss(int duration, char *buf, size_t length)
Formats a duration into HH:MM:SS.
int64_t ast_tvdiff_ms(struct timeval end, struct timeval start)
Computes the difference (in milliseconds) between two struct timeval instances.
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
static void statistics(void)
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Universally unique identifier support.
Vector container support.
#define AST_VECTOR_REPLACE(vec, idx, elem)
Replace an element at a specific position in a vector, growing the vector if needed.
#define AST_VECTOR_ELEM_CLEANUP_NOOP(elem)
Vector element cleanup that does nothing.
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
#define AST_VECTOR_REMOVE_ELEM_UNORDERED(vec, elem, cleanup)
Remove an element from a vector.
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
#define AST_VECTOR(name, type)
Define a vector structure.
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
#define AST_VECTOR_GET_ADDR(vec, idx)
Get an address of element in a vector.