302#define INITIAL_SUBSCRIBERS_MAX 4
305#define TOPIC_POOL_BUCKETS 57
312#if defined(LOW_MEMORY)
314#define TOPIC_ALL_BUCKETS 257
318#define TOPIC_ALL_BUCKETS 997
325#define TOPIC_STATISTICS_BUCKETS 57
328#define SUBSCRIPTION_STATISTICS_BUCKETS 57
337struct stasis_message_type_statistics {
350static AST_VECTOR(,
struct stasis_message_type_statistics) message_type_statistics;
353struct stasis_topic_statistics {
355 long highest_time_dispatched;
357 long lowest_time_dispatched;
359 int messages_not_dispatched;
361 int messages_dispatched;
426#define topic_lock_both(topic1, topic2) \
429 while (ao2_trylock(topic2)) { \
430 AO2_DEADLOCK_AVOIDANCE(topic1); \
441 ast_debug(2,
"Destroying topic. name: %s, detail: %s\n",
450 ast_debug(1,
"Topic '%s': %p destroyed\n", topic->
name, topic);
453 if (topic->statistics) {
459 ao2_ref(topic->statistics, -1);
465static void topic_statistics_destroy(
void *obj)
467 struct stasis_topic_statistics *
statistics = obj;
472static struct stasis_topic_statistics *stasis_topic_statistics_create(
struct stasis_topic *topic)
522 detail_len = strlen(
detail) + 1;
525 sizeof(*proxy) + strlen(
name) + 1 + detail_len,
NULL,
name);
583 ast_debug(2,
"Topic is already exist. name: %s, detail: %s\n",
607 topic->statistics = stasis_topic_statistics_create(topic);
608 if (!topic->statistics) {
613 ast_debug(1,
"Topic '%s': %p created\n", topic->
name, topic);
650struct stasis_subscription_statistics {
660 long highest_time_invoked;
662 long lowest_time_invoked;
664 int messages_dropped;
711 struct stasis_subscription_statistics *
statistics;
739 if (
sub->statistics) {
741 if (subscription_stats) {
743 ao2_ref(subscription_stats, -1);
761 struct timeval start;
770 sub->final_message_rxed = 1;
788 sub->final_message_processed = 1;
795 if (elapsed >
sub->statistics->highest_time_invoked) {
796 sub->statistics->highest_time_invoked = elapsed;
801 if (elapsed < sub->
statistics->lowest_time_invoked) {
802 sub->statistics->lowest_time_invoked = elapsed;
815static void subscription_statistics_destroy(
void *obj)
817 struct stasis_subscription_statistics *
statistics = obj;
822static struct stasis_subscription_statistics *stasis_subscription_statistics_create(
struct stasis_subscription *
sub,
823 int needs_mailbox,
int use_thread_pool,
const char *
file,
int lineno,
826 struct stasis_subscription_statistics *
statistics;
829 if (!subscription_stats) {
848 statistics->uses_threadpool = use_thread_pool;
882 sub->statistics = stasis_subscription_statistics_create(
sub, needs_mailbox, use_thread_pool,
file, lineno, func);
900 use_thread_pool ?
'p' :
'm',
908 if (use_thread_pool) {
988 "Internal error: subscription has invalid topic\n");
1013 long low_water,
long high_water)
1019 low_water, high_water);
1027 if (!subscription) {
1057 if (!subscription) {
1081 if (!subscription) {
1138 if (!subscription) {
1172 return sub->uniqueid;
1328 int type_filter_specified = 0;
1329 int formatter_filter_specified = 0;
1330 int type_filter_passed = 0;
1331 int formatter_filter_passed = 0;
1342 if (!type_filter_specified && !formatter_filter_specified) {
1346 type_filter_passed = type_filter_specified
1354 if (type_filter_passed) {
1358 formatter_filter_passed = formatter_filter_specified
1361 if (formatter_filter_passed) {
1377 if (!
sub->mailbox) {
1436 unsigned int dispatched = 0;
1438 struct stasis_message_type_statistics *
statistics;
1439 struct timeval start;
1449 struct stasis_message_type_statistics new_statistics = {
1495 if (elapsed >
topic->statistics->highest_time_dispatched) {
1496 topic->statistics->highest_time_dispatched = elapsed;
1498 if (elapsed < topic->
statistics->lowest_time_dispatched) {
1499 topic->statistics->lowest_time_dispatched = elapsed;
1631 size_t uniqueid_len = strlen(
uniqueid) + 1;
1751 char *container_name =
1789 const char *right_key = arg;
1794 right_key = object_right->
name;
1797 cmp = strcasecmp(object_left->
name, right_key);
1823static void topic_pool_prnt_obj(
void *v_obj,
void *where,
ao2_prnt_fn *prnt)
1852 char *container_name =
1873 int pool_topic_name_len = strlen(pool_topic_name);
1874 const char *search_topic_name;
1876 if (strncmp(pool_topic_name, topic_name, pool_topic_name_len) == 0) {
1877 search_topic_name = topic_name + pool_topic_name_len + 1;
1879 search_topic_name = topic_name;
1889 char *new_topic_name;
2028 if (!channel_snapshot) {
2117 ami_snapshot =
NULL;
2150 if (!strcmp(
"eventname", key)) {
2162 const char *eventname;
2167 if (!object_string || !body) {
2205 .name =
"threadpool",
2207 .category =
"threadpool",
2216 .name =
"declined_message_types",
2217 .item_offset = offsetof(
struct stasis_config, declined_message_types),
2219 .category =
"declined_message_types",
2287 char *name_in_declined;
2296 res = name_in_declined ? 1 : 0;
2300 ast_debug(4,
"Declining to allocate Stasis message type '%s' due to configuration\n",
name);
2341#define FMT_HEADERS "%-64s %-64s\n"
2342#define FMT_FIELDS "%-64s %-64s\n"
2346 e->
command =
"stasis show topics";
2348 "Usage: stasis show topics\n"
2349 " Shows a list of topics\n";
2355 if (
a->argc != e->
args) {
2362 topic_proxy_sort_fn,
NULL);
2380 ast_cli(
a->fd,
"\n%d Total topics\n\n", count);
2396 int wordlen = strlen(
word);
2401 if (!strncasecmp(
word, topic->
name, wordlen)) {
2421 char print_time[32];
2426 e->
command =
"stasis show topic";
2428 "Usage: stasis show topic <name>\n"
2429 " Show stasis topic detail info.\n";
2445 ast_cli(
a->fd,
"Specified topic '%s' does not exist\n",
a->argv[3]);
2454 ast_cli(
a->fd,
"Duration time: %s\n", print_time);
2457 ast_cli(
a->fd,
"\nSubscribers:\n");
2460 ast_cli(
a->fd,
" UniqueID: %s, Topic: %s, Detail: %s\n",
2464 ast_cli(
a->fd,
"\nForwarded topics:\n");
2496 struct stasis_subscription_statistics *
statistics;
2500#define FMT_HEADERS "%-64s %10s %10s %16s %16s\n"
2501#define FMT_FIELDS "%-64s %10d %10d %16ld %16ld\n"
2502#define FMT_FIELDS2 "%-64s %10d %10d\n"
2506 e->
command =
"stasis statistics show subscriptions";
2508 "Usage: stasis statistics show subscriptions\n"
2509 " Shows a list of subscriptions and their general statistics\n";
2515 if (
a->argc != e->
args) {
2520 if (!subscription_stats) {
2521 ast_cli(
a->fd,
"Could not fetch subscription_statistics container\n");
2526 stasis_subscription_statistics_sort_fn,
NULL);
2527 if (!sorted_subscriptions) {
2528 ao2_ref(subscription_stats, -1);
2529 ast_cli(
a->fd,
"Could not create container for sorting subscription statistics\n");
2534 ao2_ref(sorted_subscriptions, -1);
2535 ao2_ref(subscription_stats, -1);
2536 ast_cli(
a->fd,
"Could not sort subscription statistics\n");
2540 ao2_ref(subscription_stats, -1);
2542 ast_cli(
a->fd,
"\n" FMT_HEADERS,
"Subscription",
"Dropped",
"Passed",
"Lowest Invoke",
"Highest Invoke");
2555 ao2_ref(sorted_subscriptions, -1);
2557 ast_cli(
a->fd, FMT_FIELDS2,
"Total", dropped, passed);
2558 ast_cli(
a->fd,
"\n%d subscriptions\n\n", count);
2571static char *subscription_statistics_complete_name(
const char *
word,
int state)
2573 struct stasis_subscription_statistics *
statistics;
2576 int wordlen = strlen(
word);
2581 if (!subscription_stats) {
2588 && ++which >
state) {
2597 ao2_ref(subscription_stats, -1);
2607 struct stasis_subscription_statistics *
statistics;
2614 e->
command =
"stasis statistics show subscription";
2616 "Usage: stasis statistics show subscription <uniqueid>\n"
2617 " Show stasis subscription statistics.\n";
2621 return subscription_statistics_complete_name(
a->word,
a->n);
2632 if (!subscription_stats) {
2633 ast_cli(
a->fd,
"Could not fetch subcription_statistics container\n");
2639 ao2_ref(subscription_stats, -1);
2640 ast_cli(
a->fd,
"Specified subscription '%s' does not exist\n",
a->argv[4]);
2644 ao2_ref(subscription_stats, -1);
2651 ast_cli(
a->fd,
"Number of messages dropped due to filtering: %d\n",
statistics->messages_dropped);
2652 ast_cli(
a->fd,
"Number of messages passed to subscriber callback: %d\n",
statistics->messages_passed);
2653 ast_cli(
a->fd,
"Using mailbox to queue messages: %s\n",
statistics->uses_mailbox ?
"Yes" :
"No");
2654 ast_cli(
a->fd,
"Using stasis threadpool for handling messages: %s\n",
statistics->uses_threadpool ?
"Yes" :
"No");
2655 ast_cli(
a->fd,
"Lowest amount of time (in milliseconds) spent invoking message: %ld\n",
statistics->lowest_time_invoked);
2656 ast_cli(
a->fd,
"Highest amount of time (in milliseconds) spent invoking message: %ld\n",
statistics->highest_time_invoked);
2666 ast_cli(
a->fd,
"Subscribed topics:\n");
2692 int not_dispatched = 0;
2694#define FMT_HEADERS "%-64s %10s %10s %10s %16s %16s\n"
2695#define FMT_FIELDS "%-64s %10d %10d %10d %16ld %16ld\n"
2696#define FMT_FIELDS2 "%-64s %10s %10d %10d\n"
2700 e->
command =
"stasis statistics show topics";
2702 "Usage: stasis statistics show topics\n"
2703 " Shows a list of topics and their general statistics\n";
2709 if (
a->argc != e->
args) {
2715 ast_cli(
a->fd,
"Could not fetch topic_statistics container\n");
2720 stasis_topic_statistics_sort_fn,
NULL);
2721 if (!sorted_topics) {
2723 ast_cli(
a->fd,
"Could not create container for sorting topic statistics\n");
2730 ast_cli(
a->fd,
"Could not sort topic statistics\n");
2736 ast_cli(
a->fd,
"\n" FMT_HEADERS,
"Topic",
"Subscribers",
"Dropped",
"Dispatched",
"Lowest Dispatch",
"Highest Dispatch");
2743 not_dispatched +=
statistics->messages_not_dispatched;
2744 dispatched +=
statistics->messages_dispatched;
2752 ast_cli(
a->fd, FMT_FIELDS2,
"Total",
"", not_dispatched, dispatched);
2753 ast_cli(
a->fd,
"\n%d topics\n\n", count);
2766static char *topic_statistics_complete_name(
const char *
word,
int state)
2771 int wordlen = strlen(
word);
2783 && ++which >
state) {
2809 e->
command =
"stasis statistics show topic";
2811 "Usage: stasis statistics show topic <name>\n"
2812 " Show stasis topic statistics.\n";
2816 return topic_statistics_complete_name(
a->word,
a->n);
2828 ast_cli(
a->fd,
"Could not fetch topic_statistics container\n");
2835 ast_cli(
a->fd,
"Specified topic '%s' does not exist\n",
a->argv[4]);
2843 ast_cli(
a->fd,
"Number of messages published that went to no subscriber: %d\n",
statistics->messages_not_dispatched);
2844 ast_cli(
a->fd,
"Number of messages that went to at least one subscriber: %d\n",
statistics->messages_dispatched);
2845 ast_cli(
a->fd,
"Lowest amount of time (in milliseconds) spent dispatching message: %ld\n",
statistics->lowest_time_dispatched);
2846 ast_cli(
a->fd,
"Highest amount of time (in milliseconds) spent dispatching messages: %ld\n",
statistics->highest_time_dispatched);
2852 ast_cli(
a->fd,
"\t%s\n", uniqueid);
2872#define FMT_HEADERS "%-64s %10s %10s\n"
2873#define FMT_FIELDS "%-64s %10d %10d\n"
2877 e->
command =
"stasis statistics show messages";
2879 "Usage: stasis statistics show messages\n"
2880 " Shows a list of message types and their general statistics\n";
2886 if (
a->argc != e->
args) {
2909 ast_cli(
a->fd,
"\n%d seen message types\n\n", count);
2918 AST_CLI_DEFINE(statistics_show_subscriptions,
"Show subscriptions with general statistics"),
2919 AST_CLI_DEFINE(statistics_show_subscription,
"Show subscription statistics"),
2920 AST_CLI_DEFINE(statistics_show_topics,
"Show topics with general statistics"),
2922 AST_CLI_DEFINE(statistics_show_messages,
"Show message types with general statistics"),
2925static int subscription_statistics_hash(
const void *obj,
const int flags)
2927 const struct stasis_subscription_statistics *object;
2936 key =
object->uniqueid;
2946static int subscription_statistics_cmp(
void *obj,
void *arg,
int flags)
2948 const struct stasis_subscription_statistics *object_left = obj;
2949 const struct stasis_subscription_statistics *object_right = arg;
2950 const char *right_key = arg;
2955 right_key = object_right->uniqueid;
2958 cmp = strcasecmp(object_left->uniqueid, right_key);
2983static int topic_statistics_hash(
const void *obj,
const int flags)
2985 const struct stasis_topic_statistics *object;
3004static int topic_statistics_cmp(
void *obj,
void *arg,
int flags)
3006 const struct stasis_topic_statistics *object_left = obj;
3007 const struct stasis_topic_statistics *object_right = arg;
3008 const char *right_key = arg;
3013 right_key = object_right->name;
3016 cmp = strcasecmp(object_left->name, right_key);
3102 ast_log(
LOG_ERROR,
"Failed to initialize defaults on Stasis configuration object\n");
3109 ast_log(
LOG_ERROR,
"Failed to load stasis.conf and failed to initialize defaults.\n");
3141 if (cache_init != 0) {
3153 topic_proxy_hash_fn, 0, topic_proxy_cmp_fn);
3167 subscription_statistics_hash, 0, subscription_statistics_cmp);
3168 if (!subscription_stats) {
3175 topic_statistics_hash, 0, topic_statistics_cmp);
Asterisk main include file. File version handling, generic pbx functions.
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
#define ast_strdup(str)
A wrapper for strdup()
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
#define ast_calloc(num, len)
A wrapper for calloc()
int ao2_container_dup(struct ao2_container *dest, struct ao2_container *src, enum search_flags flags)
Copy all object references in the src container into the dest container.
int ao2_weakproxy_subscribe(void *weakproxy, ao2_weakproxy_notification_cb cb, void *data, int flags)
Request notification when weakproxy points to NULL.
#define ao2_iterator_next(iter)
#define ao2_link(container, obj)
Add an object to a container.
@ AO2_ALLOC_OPT_LOCK_NOLOCK
@ AO2_ALLOC_OPT_LOCK_MUTEX
#define ao2_global_obj_replace_unref(holder, obj)
Replace an ao2 object in the global holder, throwing away any old object.
void ao2_container_unregister(const char *name)
Unregister a container for CLI stats and integrity check.
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
#define ao2_unlink(container, obj)
Remove an object from a container.
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
#define ao2_global_obj_ref(holder)
Get a reference to the object stored in the global holder.
#define ao2_find(container, arg, flags)
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
#define ao2_t_weakproxy_alloc(data_size, destructor_fn, tag)
#define AO2_STRING_FIELD_SORT_FN(stype, field)
Creates a sort function for a structure string field.
#define ao2_container_alloc_rbtree(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a red-black tree container.
#define ao2_weakproxy_find(c, arg, flags, tag)
Perform an ao2_find on a container with ao2_weakproxy objects, returning the real object.
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
#define ao2_alloc_options(data_size, destructor_fn, options)
int ao2_container_register(const char *name, struct ao2_container *self, ao2_prnt_obj_fn *prnt_obj)
Register a container for CLI stats and integrity check.
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
#define ao2_global_obj_release(holder)
Release the ao2 object held in the global holder.
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
#define ao2_t_alloc(data_size, destructor_fn, debug_msg)
@ OBJ_SEARCH_PARTIAL_KEY
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
@ OBJ_SEARCH_OBJECT
The arg parameter is an object of the same type.
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
@ OBJ_SEARCH_MASK
Search option field mask.
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
#define ao2_container_alloc_list(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a list container.
#define ao2_alloc(data_size, destructor_fn)
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
void() ao2_prnt_fn(void *where, const char *fmt,...)
Print output.
#define ao2_t_weakproxy_set_object(weakproxy, obj, flags, tag)
static struct console_pvt globals
struct stasis_topic * ast_channel_topic(struct ast_channel *chan)
A topic which publishes the events for a particular channel.
Standard Command Line Interface.
int ast_cli_unregister_multiple(struct ast_cli_entry *e, int len)
Unregister multiple commands.
#define AST_CLI_DEFINE(fn, txt,...)
int ast_cli_completion_add(char *value)
Add a result to a request for completion options.
void ast_cli(int fd, const char *fmt,...)
#define ast_cli_register_multiple(e, len)
Register multiple commands.
Configuration option-handling.
int aco_set_defaults(struct aco_type *type, const char *category, void *obj)
Set all default options of obj.
void aco_info_destroy(struct aco_info *info)
Destroy an initialized aco_info struct.
@ ACO_PROCESS_ERROR
There was an error and no changes were applied.
int aco_info_init(struct aco_info *info)
Initialize an aco_info structure.
#define FLDSET(type,...)
Convert a struct and list of fields to an argument list of field offsets.
#define aco_option_register(info, name, matchtype, types, default_val, opt_type, flags,...)
Register a config option.
@ OPT_INT_T
Type for default option handler for signed integers.
#define aco_option_register_custom(info, name, matchtype, types, default_val, handler, flags)
Register a config option.
enum aco_process_status aco_process_config(struct aco_info *info, int reload)
Process a config info via the options registered with an aco_info.
#define ACO_TYPES(...)
A helper macro to ensure that aco_info types always have a sentinel.
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
struct ast_str * ast_manager_str_from_json_object(struct ast_json *blob, key_exclusion_cb exclusion_cb)
Convert a JSON object into an AMI compatible string.
void ast_multi_object_blob_add(struct ast_multi_object_blob *multi, enum stasis_user_multi_object_snapshot_type type, void *object)
Add an object (snapshot) to the blob.
stasis_user_multi_object_snapshot_type
Object type code for multi user object snapshots.
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
struct ast_channel_snapshot * ast_channel_snapshot_create(struct ast_channel *chan)
Generate a snapshot of the channel state. This is an ao2 object, so ao2_cleanup() to deallocate.
void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob)
Publish single channel user event (for app_userevent compatibility)
struct ast_multi_object_blob * ast_multi_object_blob_create(struct ast_json *blob)
Create a stasis user event multi object blob.
struct stasis_message_type * ast_multi_user_event_type(void)
Message type for custom user defined events with multi object blobs.
#define STASIS_UMOS_MAX
Number of snapshot types.
#define ast_debug(level,...)
Log a DEBUG message.
struct ast_json * ast_json_string_create(const char *value)
Construct a JSON string from value.
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
struct ast_json * ast_json_object_create(void)
Create a new JSON object.
struct ast_json * ast_json_timeval(const struct timeval tv, const char *zone)
Construct a timeval as JSON.
struct ast_json * ast_json_ref(struct ast_json *value)
Increase refcount on value.
int ast_json_object_set(struct ast_json *object, const char *key, struct ast_json *value)
Set a field in a JSON object.
const char * ast_json_string_get(const struct ast_json *string)
Get the value of a JSON string.
struct ast_json * ast_json_object_get(struct ast_json *object, const char *key)
Get a field from a JSON object.
#define ast_cond_destroy(cond)
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
#define ast_cond_wait(cond, mutex)
#define ast_cond_init(cond, attr)
#define ast_mutex_init(pmutex)
#define ast_mutex_unlock(a)
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
pthread_cond_t ast_cond_t
#define ast_mutex_destroy(a)
#define ast_mutex_lock(a)
#define AST_MUTEX_DEFINE_STATIC(mutex)
#define ast_cond_signal(cond)
struct ast_str * ast_manager_build_bridge_state_string_prefix(const struct ast_bridge_snapshot *snapshot, const char *prefix)
Generate the AMI message body from a bridge snapshot.
struct ast_manager_event_blob * ast_manager_event_blob_create(int event_flags, const char *manager_event, const char *extra_fields_fmt,...)
Construct a ast_manager_event_blob.
struct ast_str * ast_manager_build_channel_state_string_prefix(const struct ast_channel_snapshot *snapshot, const char *prefix)
Generate the AMI message body from a channel snapshot.
struct stasis_forward * sub
struct ao2_container * container
static void to_ami(struct ast_sip_subscription *sub, struct ast_str **buf)
static struct ast_str * multi_object_blob_to_ami(void *obj)
int stasis_subscription_is_done(struct stasis_subscription *subscription)
Returns whether subscription has received its final message.
static char * topic_complete_name(const char *word)
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *sub)
Cancel a subscription.
static struct ast_cli_entry cli_stasis[]
#define INITIAL_SUBSCRIBERS_MAX
static struct ast_json * multi_user_event_to_json(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
static void subscription_change_dtor(void *obj)
void stasis_topic_pool_delete_topic(struct stasis_topic_pool *pool, const char *topic_name)
Delete a topic from the topic pool.
static struct ast_manager_event_blob * multi_user_event_to_ami(struct stasis_message *message)
int stasis_subscription_decline_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are not interested in a message type.
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription, long low_water, long high_water)
Set the high and low alert water marks of the stasis subscription.
AO2_STRING_FIELD_HASH_FN(topic_proxy, name)
struct stasis_topic * stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
Find or create a topic in the pool.
static int sub_cleanup(void *data)
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
static void forward_dtor(void *obj)
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
size_t stasis_topic_subscribers(const struct stasis_topic *topic)
Return the number of subscribers of a topic.
static void topic_pool_dtor(void *obj)
void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
Publish a message to a topic's subscribers, synchronizing on the specified subscriber.
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
static void * stasis_config_alloc(void)
static struct topic_pool_entry * topic_pool_entry_alloc(const char *topic_name)
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
static void topic_dtor(void *obj)
#define TOPIC_POOL_BUCKETS
static struct stasis_subscription_change * subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
static void stasis_cleanup(void)
Cleanup function for graceful shutdowns.
struct stasis_topic * stasis_topic_get(const char *name)
Get a topic of the given name.
int stasis_init(void)
Initialize the Stasis subsystem.
static void proxy_dtor(void *weakproxy, void *container)
void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Stasis subscription callback function that does nothing.
struct stasis_subscription * __stasis_subscribe_pool(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription whose callbacks occur on a thread pool.
static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
static void stasis_config_destructor(void *obj)
void stasis_subscription_join(struct stasis_subscription *subscription)
Block until the last message is processed on a subscription.
static AO2_GLOBAL_OBJ_STATIC(globals)
A global object container that will contain the stasis_config that gets swapped out on reloads.
int stasis_message_type_declined(const char *name)
Check whether a message type is declined.
static struct ast_threadpool * threadpool
static int topic_pool_entry_hash(const void *obj, const int flags)
CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,.files=ACO_FILES(&stasis_conf),)
Register information about the configs being processed by this module.
void stasis_log_bad_type_access(const char *name)
static struct aco_type threadpool_option
static void multi_object_blob_dtor(void *obj)
static int link_topic_proxy(struct stasis_topic *topic, const char *name, const char *detail)
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Add a subscriber to a topic.
static void subscription_dtor(void *obj)
struct stasis_topic * stasis_topic_create_with_detail(const char *name, const char *detail)
Create a new topic with given detail.
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
const char * stasis_topic_detail(const struct stasis_topic *topic)
Return the detail of a topic.
struct stasis_subscription * stasis_unsubscribe_and_join(struct stasis_subscription *subscription)
Cancel a subscription, blocking until the last message is processed.
static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name)
Check if a topic exists in a pool.
static void publish_msg(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sync_sub)
struct aco_file stasis_conf
static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
static void stasis_declined_config_destructor(void *obj)
static struct aco_type * threadpool_options[]
static unsigned int dispatch_message(struct stasis_subscription *sub, struct stasis_message *message, int synchronous)
void stasis_subscription_accept_formatters(struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters)
Indicate to a subscription that we are interested in messages with one or more formatters.
const char * stasis_subscription_uniqueid(const struct stasis_subscription *sub)
Get the unique ID for the subscription.
struct stasis_topic_pool * stasis_topic_pool_create(struct stasis_topic *pooled_topic)
Create a topic pool that routes messages from dynamically generated topics to the given topic.
static void subscription_invoke(struct stasis_subscription *sub, struct stasis_message *message)
Invoke the subscription's callback.
AO2_STRING_FIELD_CASE_SORT_FN(topic_proxy, name)
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
struct stasis_subscription * __stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription.
static void topic_pool_entry_dtor(void *obj)
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
static int dispatch_exec_async(struct ast_taskprocessor_local *local)
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
Returns whether a subscription is currently subscribed.
struct stasis_subscription * internal_stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func)
Create a subscription.
static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
AO2_STRING_FIELD_CMP_FN(topic_proxy, name)
static int userevent_exclusion_cb(const char *key)
struct aco_type * declined_options[]
#define topic_lock_both(topic1, topic2)
Lock two topics.
struct ao2_container * topic_all
static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
static char * stasis_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
#define TOPIC_ALL_BUCKETS
STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type)
static struct aco_type declined_option
An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type.
static char * stasis_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
Stasis Message Bus API. See Stasis Message Bus API for detailed documentation.
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
void(* stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Callback function type for Stasis subscriptions.
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
stasis_subscription_message_filter
Stasis subscription message filters.
@ STASIS_SUBSCRIPTION_FILTER_SELECTIVE
@ STASIS_SUBSCRIPTION_FILTER_FORCED_NONE
@ STASIS_SUBSCRIPTION_FILTER_NONE
stasis_subscription_message_formatters
Stasis subscription formatter filters.
@ STASIS_SUBSCRIPTION_FORMATTER_NONE
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
int stasis_cache_init(void)
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
int stasis_message_type_id(const struct stasis_message_type *type)
Gets the id of a given message type.
enum stasis_subscription_message_formatters stasis_message_type_available_formatters(const struct stasis_message_type *message_type)
Get a bitmap of available formatters for a message type.
const struct timeval * stasis_message_timestamp(const struct stasis_message *msg)
Get the time when a message was created.
struct ast_json * ast_bridge_snapshot_to_json(const struct ast_bridge_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_bridge_snapshot.
struct ast_json * ast_channel_snapshot_to_json(const struct ast_channel_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_channel_snapshot.
struct ast_json * ast_endpoint_snapshot_to_json(const struct ast_endpoint_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_endpoint_snapshot.
static int message_type_id
int ast_str_append(struct ast_str **buf, ssize_t max_len, const char *fmt,...)
Append to a thread local dynamic string.
char * ast_str_buffer(const struct ast_str *buf)
Returns the string buffer within the ast_str buf.
#define S_OR(a, b)
Returns the equivalent of logical OR for strings: the first one if not empty, otherwise the second one.
static force_inline int attribute_pure ast_strlen_zero(const char *s)
#define ast_str_container_alloc(buckets)
Allocates a hash container for bare strings.
#define ast_str_create(init_len)
Create a malloc'ed dynamic length string.
static force_inline int attribute_pure ast_str_case_hash(const char *str)
Compute a hash value on a case-insensitive string.
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
void ast_str_container_remove(struct ao2_container *str_container, const char *remove)
Removes a string from a string container allocated by ast_str_container_alloc.
int ast_str_container_add(struct ao2_container *str_container, const char *add)
Adds a string to a string container allocated by ast_str_container_alloc.
The representation of a single configuration file to be processed.
Type information about a category-level configurable object.
When we need to walk through a container, we use an ao2_iterator to keep track of the current position.
Structure representing a snapshot of channel state.
Main Channel structure associated with a channel.
Descriptor for a CLI entry.
int args
This gets set in ast_cli_register()
Abstract JSON element (object, array, string, int, ...).
Struct containing info for an AMI event to send out.
A multi-object blob data structure to carry user event stasis messages.
struct ast_multi_object_blob::@395 snapshots[STASIS_UMOS_MAX]
Structure for mutex and tracking information.
Support for dynamic strings.
An ast_taskprocessor structure is a singleton by name.
int idle_timeout
Time limit in seconds for idle threads.
int max_size
Maximum number of threads a pool may have.
int auto_increment
Number of threads to increment pool by.
int initial_size
Number of threads the pool will start with.
An opaque threadpool structure.
Structure for variables, used for configurations and for channel variables.
struct stasis_declined_config * declined_message_types
struct stasis_threadpool_conf * threadpool_options
A structure to hold global configuration-related options.
struct ao2_container * declined
struct stasis_topic * from_topic
struct stasis_topic * to_topic
Structure containing callbacks for Stasis message sanitization.
Holds details about changes to subscriptions for the specified topic.
struct stasis_topic * topic
struct stasis_topic * topic
stasis_subscription_cb callback
struct stasis_subscription::@394 accepted_message_types
struct ast_taskprocessor * mailbox
enum stasis_subscription_message_filter filter
int final_message_processed
enum stasis_subscription_message_formatters accepted_formatters
Threadpool configuration options.
struct ao2_container * pool_container
struct stasis_topic * pool_topic
struct stasis_topic::@393 upstream_topics
struct timeval * creationtime
struct stasis_topic::@392 subscribers
struct stasis_topic * topic
struct stasis_forward * forward
struct timeval creationtime
An API for managing task processing threads that can be shared across modules.
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor, decrementing its reference count.
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
Sets the local data associated with a taskprocessor.
@ TPS_REF_DEFAULT
Return a reference to a taskprocessor, creating one if it does not exist.
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int(*task_exe)(struct ast_taskprocessor_local *local), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format,...)
Build a taskprocessor name with a sequence number on the end.
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.
#define AST_TASKPROCESSOR_MAX_NAME
Suggested maximum taskprocessor name length (less null terminator).
void ast_threadpool_shutdown(struct ast_threadpool *pool)
Shut down a threadpool and destroy it.
struct ast_threadpool * ast_threadpool_create(const char *name, struct ast_threadpool_listener *listener, const struct ast_threadpool_options *options)
Create a new threadpool.
#define AST_THREADPOOL_OPTIONS_VERSION
struct ast_taskprocessor * ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
Serialized execution of tasks within an ast_threadpool.
void ast_format_duration_hh_mm_ss(int duration, char *buf, size_t length)
Formats a duration into HH:MM:SS.
int64_t ast_tvdiff_ms(struct timeval end, struct timeval start)
Computes the difference (in milliseconds) between two struct timeval instances.
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
static void statistics(void)
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Universally unique identifier support.
Vector container support.
#define AST_VECTOR_REPLACE(vec, idx, elem)
Replace an element at a specific position in a vector, growing the vector if needed.
#define AST_VECTOR_ELEM_CLEANUP_NOOP(elem)
Vector element cleanup that does nothing.
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
#define AST_VECTOR_REMOVE_ELEM_UNORDERED(vec, elem, cleanup)
Remove an element from a vector.
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
#define AST_VECTOR(name, type)
Define a vector structure.
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
#define AST_VECTOR_GET_ADDR(vec, idx)
Get the address of an element in a vector.