303 #define INITIAL_SUBSCRIBERS_MAX 4
306 #define TOPIC_POOL_BUCKETS 57
313 #if defined(LOW_MEMORY)
315 #define TOPIC_ALL_BUCKETS 257
319 #define TOPIC_ALL_BUCKETS 997
326 #define TOPIC_STATISTICS_BUCKETS 57
329 #define SUBSCRIPTION_STATISTICS_BUCKETS 57
338 struct stasis_message_type_statistics {
351 static AST_VECTOR(,
struct stasis_message_type_statistics) message_type_statistics;
354 struct stasis_topic_statistics {
356 long highest_time_dispatched;
358 long lowest_time_dispatched;
360 int messages_not_dispatched;
362 int messages_dispatched;
427 #define topic_lock_both(topic1, topic2) \
430 while (ao2_trylock(topic2)) { \
431 AO2_DEADLOCK_AVOIDANCE(topic1); \
442 ast_debug(2,
"Destroying topic. name: %s, detail: %s\n",
451 ast_debug(1,
"Topic '%s': %p destroyed\n", topic->
name, topic);
454 if (topic->statistics) {
460 ao2_ref(topic->statistics, -1);
466 static void topic_statistics_destroy(
void *obj)
468 struct stasis_topic_statistics *
statistics = obj;
473 static struct stasis_topic_statistics *stasis_topic_statistics_create(
struct stasis_topic *topic)
523 detail_len = strlen(
detail) + 1;
526 sizeof(*proxy) + strlen(
name) + 1 + detail_len,
NULL,
name);
584 ast_debug(2,
"Topic is already exist. name: %s, detail: %s\n",
608 topic->statistics = stasis_topic_statistics_create(topic);
609 if (!topic->statistics) {
614 ast_debug(1,
"Topic '%s': %p created\n", topic->
name, topic);
651 struct stasis_subscription_statistics {
661 long highest_time_invoked;
663 long lowest_time_invoked;
665 int messages_dropped;
712 struct stasis_subscription_statistics *
statistics;
740 if (
sub->statistics) {
742 if (subscription_stats) {
744 ao2_ref(subscription_stats, -1);
762 struct timeval start;
771 sub->final_message_rxed = 1;
789 sub->final_message_processed = 1;
796 if (elapsed >
sub->statistics->highest_time_invoked) {
797 sub->statistics->highest_time_invoked = elapsed;
802 if (elapsed < sub->
statistics->lowest_time_invoked) {
803 sub->statistics->lowest_time_invoked = elapsed;
816 static void subscription_statistics_destroy(
void *obj)
818 struct stasis_subscription_statistics *
statistics = obj;
823 static struct stasis_subscription_statistics *stasis_subscription_statistics_create(
struct stasis_subscription *
sub,
824 int needs_mailbox,
int use_thread_pool,
const char *
file,
int lineno,
827 struct stasis_subscription_statistics *
statistics;
830 if (!subscription_stats) {
849 statistics->uses_threadpool = use_thread_pool;
883 sub->statistics = stasis_subscription_statistics_create(
sub, needs_mailbox, use_thread_pool,
file, lineno, func);
901 use_thread_pool ?
'p' :
'm',
909 if (use_thread_pool) {
989 "Internal error: subscription has invalid topic\n");
1014 long low_water,
long high_water)
1020 low_water, high_water);
1028 if (!subscription) {
1058 if (!subscription) {
1082 if (!subscription) {
1139 if (!subscription) {
1173 return sub->uniqueid;
1329 int type_filter_specified = 0;
1330 int formatter_filter_specified = 0;
1331 int type_filter_passed = 0;
1332 int formatter_filter_passed = 0;
1343 if (!type_filter_specified && !formatter_filter_specified) {
1347 type_filter_passed = type_filter_specified
1355 if (type_filter_passed) {
1359 formatter_filter_passed = formatter_filter_specified
1362 if (formatter_filter_passed) {
1378 if (!
sub->mailbox) {
1437 unsigned int dispatched = 0;
1439 struct stasis_message_type_statistics *
statistics;
1440 struct timeval start;
1450 struct stasis_message_type_statistics new_statistics = {
1496 if (elapsed >
topic->statistics->highest_time_dispatched) {
1497 topic->statistics->highest_time_dispatched = elapsed;
1499 if (elapsed < topic->
statistics->lowest_time_dispatched) {
1500 topic->statistics->lowest_time_dispatched = elapsed;
1632 size_t uniqueid_len = strlen(
uniqueid) + 1;
1752 char *container_name =
1790 const char *right_key = arg;
1795 right_key = object_right->
name;
1798 cmp = strcasecmp(object_left->
name, right_key);
1824 static void topic_pool_prnt_obj(
void *v_obj,
void *where,
ao2_prnt_fn *prnt)
1853 char *container_name =
1874 int pool_topic_name_len = strlen(pool_topic_name);
1875 const char *search_topic_name;
1877 if (strncmp(pool_topic_name, topic_name, pool_topic_name_len) == 0) {
1878 search_topic_name = topic_name + pool_topic_name_len + 1;
1880 search_topic_name = topic_name;
1890 char *new_topic_name;
2029 if (!channel_snapshot) {
2118 ami_snapshot =
NULL;
2151 if (!strcmp(
"eventname", key)) {
2163 const char *eventname;
2168 if (!object_string || !body) {
2206 .name =
"threadpool",
2208 .category =
"threadpool",
2217 .name =
"declined_message_types",
2218 .item_offset = offsetof(
struct stasis_config, declined_message_types),
2220 .category =
"declined_message_types",
2288 char *name_in_declined;
2297 res = name_in_declined ? 1 : 0;
2342 #define FMT_HEADERS "%-64s %-64s\n"
2343 #define FMT_FIELDS "%-64s %-64s\n"
2347 e->
command =
"stasis show topics";
2349 "Usage: stasis show topics\n"
2350 " Shows a list of topics\n";
2356 if (
a->argc != e->
args) {
2363 topic_proxy_sort_fn,
NULL);
2381 ast_cli(
a->fd,
"\n%d Total topics\n\n", count);
2397 int wordlen = strlen(
word);
2402 if (!strncasecmp(
word, topic->
name, wordlen)) {
2422 char print_time[32];
2427 e->
command =
"stasis show topic";
2429 "Usage: stasis show topic <name>\n"
2430 " Show stasis topic detail info.\n";
2446 ast_cli(
a->fd,
"Specified topic '%s' does not exist\n",
a->argv[3]);
2455 ast_cli(
a->fd,
"Duration time: %s\n", print_time);
2458 ast_cli(
a->fd,
"\nSubscribers:\n");
2461 ast_cli(
a->fd,
" UniqueID: %s, Topic: %s, Detail: %s\n",
2465 ast_cli(
a->fd,
"\nForwarded topics:\n");
2497 struct stasis_subscription_statistics *
statistics;
2501 #define FMT_HEADERS "%-64s %10s %10s %16s %16s\n"
2502 #define FMT_FIELDS "%-64s %10d %10d %16ld %16ld\n"
2503 #define FMT_FIELDS2 "%-64s %10d %10d\n"
2507 e->
command =
"stasis statistics show subscriptions";
2509 "Usage: stasis statistics show subscriptions\n"
2510 " Shows a list of subscriptions and their general statistics\n";
2516 if (
a->argc != e->
args) {
2521 if (!subscription_stats) {
2522 ast_cli(
a->fd,
"Could not fetch subscription_statistics container\n");
2527 stasis_subscription_statistics_sort_fn,
NULL);
2528 if (!sorted_subscriptions) {
2529 ao2_ref(subscription_stats, -1);
2530 ast_cli(
a->fd,
"Could not create container for sorting subscription statistics\n");
2535 ao2_ref(sorted_subscriptions, -1);
2536 ao2_ref(subscription_stats, -1);
2537 ast_cli(
a->fd,
"Could not sort subscription statistics\n");
2541 ao2_ref(subscription_stats, -1);
2543 ast_cli(
a->fd,
"\n" FMT_HEADERS,
"Subscription",
"Dropped",
"Passed",
"Lowest Invoke",
"Highest Invoke");
2556 ao2_ref(sorted_subscriptions, -1);
2558 ast_cli(
a->fd, FMT_FIELDS2,
"Total", dropped, passed);
2559 ast_cli(
a->fd,
"\n%d subscriptions\n\n", count);
2572 static char *subscription_statistics_complete_name(
const char *
word,
int state)
2574 struct stasis_subscription_statistics *
statistics;
2577 int wordlen = strlen(
word);
2582 if (!subscription_stats) {
2589 && ++which >
state) {
2598 ao2_ref(subscription_stats, -1);
2608 struct stasis_subscription_statistics *
statistics;
2615 e->
command =
"stasis statistics show subscription";
2617 "Usage: stasis statistics show subscription <uniqueid>\n"
2618 " Show stasis subscription statistics.\n";
2622 return subscription_statistics_complete_name(
a->word,
a->n);
2633 if (!subscription_stats) {
2634 ast_cli(
a->fd,
"Could not fetch subcription_statistics container\n");
2640 ao2_ref(subscription_stats, -1);
2641 ast_cli(
a->fd,
"Specified subscription '%s' does not exist\n",
a->argv[4]);
2645 ao2_ref(subscription_stats, -1);
2652 ast_cli(
a->fd,
"Number of messages dropped due to filtering: %d\n",
statistics->messages_dropped);
2653 ast_cli(
a->fd,
"Number of messages passed to subscriber callback: %d\n",
statistics->messages_passed);
2654 ast_cli(
a->fd,
"Using mailbox to queue messages: %s\n",
statistics->uses_mailbox ?
"Yes" :
"No");
2655 ast_cli(
a->fd,
"Using stasis threadpool for handling messages: %s\n",
statistics->uses_threadpool ?
"Yes" :
"No");
2656 ast_cli(
a->fd,
"Lowest amount of time (in milliseconds) spent invoking message: %ld\n",
statistics->lowest_time_invoked);
2657 ast_cli(
a->fd,
"Highest amount of time (in milliseconds) spent invoking message: %ld\n",
statistics->highest_time_invoked);
2667 ast_cli(
a->fd,
"Subscribed topics:\n");
2693 int not_dispatched = 0;
2695 #define FMT_HEADERS "%-64s %10s %10s %10s %16s %16s\n"
2696 #define FMT_FIELDS "%-64s %10d %10d %10d %16ld %16ld\n"
2697 #define FMT_FIELDS2 "%-64s %10s %10d %10d\n"
2701 e->
command =
"stasis statistics show topics";
2703 "Usage: stasis statistics show topics\n"
2704 " Shows a list of topics and their general statistics\n";
2710 if (
a->argc != e->
args) {
2716 ast_cli(
a->fd,
"Could not fetch topic_statistics container\n");
2721 stasis_topic_statistics_sort_fn,
NULL);
2722 if (!sorted_topics) {
2724 ast_cli(
a->fd,
"Could not create container for sorting topic statistics\n");
2731 ast_cli(
a->fd,
"Could not sort topic statistics\n");
2737 ast_cli(
a->fd,
"\n" FMT_HEADERS,
"Topic",
"Subscribers",
"Dropped",
"Dispatched",
"Lowest Dispatch",
"Highest Dispatch");
2744 not_dispatched +=
statistics->messages_not_dispatched;
2745 dispatched +=
statistics->messages_dispatched;
2753 ast_cli(
a->fd, FMT_FIELDS2,
"Total",
"", not_dispatched, dispatched);
2754 ast_cli(
a->fd,
"\n%d topics\n\n", count);
2767 static char *topic_statistics_complete_name(
const char *
word,
int state)
2772 int wordlen = strlen(
word);
2784 && ++which >
state) {
2810 e->
command =
"stasis statistics show topic";
2812 "Usage: stasis statistics show topic <name>\n"
2813 " Show stasis topic statistics.\n";
2817 return topic_statistics_complete_name(
a->word,
a->n);
2829 ast_cli(
a->fd,
"Could not fetch topic_statistics container\n");
2836 ast_cli(
a->fd,
"Specified topic '%s' does not exist\n",
a->argv[4]);
2844 ast_cli(
a->fd,
"Number of messages published that went to no subscriber: %d\n",
statistics->messages_not_dispatched);
2845 ast_cli(
a->fd,
"Number of messages that went to at least one subscriber: %d\n",
statistics->messages_dispatched);
2846 ast_cli(
a->fd,
"Lowest amount of time (in milliseconds) spent dispatching message: %ld\n",
statistics->lowest_time_dispatched);
2847 ast_cli(
a->fd,
"Highest amount of time (in milliseconds) spent dispatching messages: %ld\n",
statistics->highest_time_dispatched);
2853 ast_cli(
a->fd,
"\t%s\n", uniqueid);
2873 #define FMT_HEADERS "%-64s %10s %10s\n"
2874 #define FMT_FIELDS "%-64s %10d %10d\n"
2878 e->
command =
"stasis statistics show messages";
2880 "Usage: stasis statistics show messages\n"
2881 " Shows a list of message types and their general statistics\n";
2887 if (
a->argc != e->
args) {
2910 ast_cli(
a->fd,
"\n%d seen message types\n\n", count);
2919 AST_CLI_DEFINE(statistics_show_subscriptions,
"Show subscriptions with general statistics"),
2920 AST_CLI_DEFINE(statistics_show_subscription,
"Show subscription statistics"),
2921 AST_CLI_DEFINE(statistics_show_topics,
"Show topics with general statistics"),
2923 AST_CLI_DEFINE(statistics_show_messages,
"Show message types with general statistics"),
2926 static int subscription_statistics_hash(
const void *obj,
const int flags)
2928 const struct stasis_subscription_statistics *object;
2937 key =
object->uniqueid;
2947 static int subscription_statistics_cmp(
void *obj,
void *arg,
int flags)
2949 const struct stasis_subscription_statistics *object_left = obj;
2950 const struct stasis_subscription_statistics *object_right = arg;
2951 const char *right_key = arg;
2956 right_key = object_right->uniqueid;
2959 cmp = strcasecmp(object_left->uniqueid, right_key);
2984 static int topic_statistics_hash(
const void *obj,
const int flags)
2986 const struct stasis_topic_statistics *object;
3005 static int topic_statistics_cmp(
void *obj,
void *arg,
int flags)
3007 const struct stasis_topic_statistics *object_left = obj;
3008 const struct stasis_topic_statistics *object_right = arg;
3009 const char *right_key = arg;
3014 right_key = object_right->name;
3017 cmp = strcasecmp(object_left->name, right_key);
3103 ast_log(
LOG_ERROR,
"Failed to initialize defaults on Stasis configuration object\n");
3110 ast_log(
LOG_ERROR,
"Failed to load stasis.conf and failed to initialize defaults.\n");
3142 if (cache_init != 0) {
3154 topic_proxy_hash_fn, 0, topic_proxy_cmp_fn);
3168 subscription_statistics_hash, 0, subscription_statistics_cmp);
3169 if (!subscription_stats) {
3176 topic_statistics_hash, 0, topic_statistics_cmp);
Asterisk main include file. File version handling, generic pbx functions.
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
#define ast_strdup(str)
A wrapper for strdup()
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
#define ast_calloc(num, len)
A wrapper for calloc()
int ao2_container_dup(struct ao2_container *dest, struct ao2_container *src, enum search_flags flags)
Copy all object references in the src container into the dest container.
int ao2_weakproxy_subscribe(void *weakproxy, ao2_weakproxy_notification_cb cb, void *data, int flags)
Request notification when weakproxy points to NULL.
#define ao2_iterator_next(iter)
#define ao2_link(container, obj)
Add an object to a container.
@ AO2_ALLOC_OPT_LOCK_NOLOCK
@ AO2_ALLOC_OPT_LOCK_MUTEX
#define ao2_global_obj_replace_unref(holder, obj)
Replace an ao2 object in the global holder, throwing away any old object.
void ao2_container_unregister(const char *name)
Unregister a container for CLI stats and integrity check.
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
#define ao2_unlink(container, obj)
Remove an object from a container.
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
#define ao2_global_obj_ref(holder)
Get a reference to the object stored in the global holder.
#define ao2_find(container, arg, flags)
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
#define ao2_t_weakproxy_alloc(data_size, destructor_fn, tag)
#define AO2_STRING_FIELD_SORT_FN(stype, field)
Creates a sort function for a structure string field.
#define ao2_container_alloc_rbtree(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a red-black tree container.
#define ao2_weakproxy_find(c, arg, flags, tag)
Perform an ao2_find on a container with ao2_weakproxy objects, returning the real object.
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
#define ao2_alloc_options(data_size, destructor_fn, options)
int ao2_container_register(const char *name, struct ao2_container *self, ao2_prnt_obj_fn *prnt_obj)
Register a container for CLI stats and integrity check.
#define ao2_global_obj_release(holder)
Release the ao2 object held in the global holder.
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
#define ao2_t_alloc(data_size, destructor_fn, debug_msg)
@ OBJ_SEARCH_PARTIAL_KEY
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
@ OBJ_SEARCH_OBJECT
The arg parameter is an object of the same type.
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
@ OBJ_SEARCH_MASK
Search option field mask.
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
#define ao2_container_alloc_list(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a list container.
#define ao2_alloc(data_size, destructor_fn)
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
void() ao2_prnt_fn(void *where, const char *fmt,...)
Print output.
#define ao2_t_weakproxy_set_object(weakproxy, obj, flags, tag)
static struct console_pvt globals
struct stasis_topic * ast_channel_topic(struct ast_channel *chan)
A topic which publishes the events for a particular channel.
Standard Command Line Interface.
int ast_cli_unregister_multiple(struct ast_cli_entry *e, int len)
Unregister multiple commands.
#define AST_CLI_DEFINE(fn, txt,...)
int ast_cli_completion_add(char *value)
Add a result to a request for completion options.
void ast_cli(int fd, const char *fmt,...)
#define ast_cli_register_multiple(e, len)
Register multiple commands.
Configuration option-handling.
int aco_set_defaults(struct aco_type *type, const char *category, void *obj)
Set all default options of obj.
void aco_info_destroy(struct aco_info *info)
Destroy an initialized aco_info struct.
@ ACO_PROCESS_ERROR
There was an error and no changes were applied.
int aco_info_init(struct aco_info *info)
Initialize an aco_info structure.
#define FLDSET(type,...)
Convert a struct and list of fields to an argument list of field offsets.
#define aco_option_register(info, name, matchtype, types, default_val, opt_type, flags,...)
Register a config option.
@ OPT_INT_T
Type for default option handler for signed integers.
#define aco_option_register_custom(info, name, matchtype, types, default_val, handler, flags)
Register a config option.
enum aco_process_status aco_process_config(struct aco_info *info, int reload)
Process a config info via the options registered with an aco_info.
#define ACO_TYPES(...)
A helper macro to ensure that aco_info types always have a sentinel.
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
struct ast_str * ast_manager_str_from_json_object(struct ast_json *blob, key_exclusion_cb exclusion_cb)
Convert a JSON object into an AMI compatible string.
struct ast_multi_object_blob * ast_multi_object_blob_create(struct ast_json *blob)
Create a stasis user event multi object blob.
void ast_multi_object_blob_add(struct ast_multi_object_blob *multi, enum stasis_user_multi_object_snapshot_type type, void *object)
Add an object (snapshot) to the blob.
stasis_user_multi_object_snapshot_type
Object type code for multi user object snapshots.
struct ast_channel_snapshot * ast_channel_snapshot_create(struct ast_channel *chan)
Generate a snapshot of the channel state. This is an ao2 object, so use ao2_cleanup() to deallocate it.
void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob)
Publish single channel user event (for app_userevent compatibility)
struct stasis_message_type * ast_multi_user_event_type(void)
Message type for custom user defined events with multi object blobs.
#define STASIS_UMOS_MAX
Number of snapshot types.
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
#define ast_debug(level,...)
Log a DEBUG message.
struct ast_json * ast_json_ref(struct ast_json *value)
Increase refcount on value.
struct ast_json * ast_json_timeval(const struct timeval tv, const char *zone)
Construct a timeval as JSON.
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
struct ast_json * ast_json_object_get(struct ast_json *object, const char *key)
Get a field from a JSON object.
struct ast_json * ast_json_object_create(void)
Create a new JSON object.
const char * ast_json_string_get(const struct ast_json *string)
Get the value of a JSON string.
int ast_json_object_set(struct ast_json *object, const char *key, struct ast_json *value)
Set a field in a JSON object.
struct ast_json * ast_json_string_create(const char *value)
Construct a JSON string from value.
#define ast_cond_destroy(cond)
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
#define ast_cond_wait(cond, mutex)
#define ast_cond_init(cond, attr)
#define ast_mutex_init(pmutex)
#define ast_mutex_unlock(a)
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
pthread_cond_t ast_cond_t
#define ast_mutex_destroy(a)
#define ast_mutex_lock(a)
#define AST_MUTEX_DEFINE_STATIC(mutex)
#define ast_cond_signal(cond)
struct ast_manager_event_blob * ast_manager_event_blob_create(int event_flags, const char *manager_event, const char *extra_fields_fmt,...)
Construct an ast_manager_event_blob.
struct ast_str * ast_manager_build_channel_state_string_prefix(const struct ast_channel_snapshot *snapshot, const char *prefix)
Generate the AMI message body from a channel snapshot.
struct ast_str * ast_manager_build_bridge_state_string_prefix(const struct ast_bridge_snapshot *snapshot, const char *prefix)
Generate the AMI message body from a bridge snapshot.
struct stasis_forward * sub
struct ao2_container * container
static void to_ami(struct ast_sip_subscription *sub, struct ast_str **buf)
struct stasis_topic * stasis_topic_get(const char *name)
Get a topic of the given name.
int stasis_subscription_is_done(struct stasis_subscription *subscription)
Returns whether subscription has received its final message.
static char * stasis_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static struct ast_manager_event_blob * multi_user_event_to_ami(struct stasis_message *message)
static struct ast_cli_entry cli_stasis[]
#define INITIAL_SUBSCRIBERS_MAX
static void subscription_change_dtor(void *obj)
void stasis_topic_pool_delete_topic(struct stasis_topic_pool *pool, const char *topic_name)
Delete a topic from the topic pool.
struct stasis_topic * stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
Find or create a topic in the pool.
struct stasis_subscription * stasis_unsubscribe_and_join(struct stasis_subscription *subscription)
Cancel a subscription, blocking until the last message is processed.
int stasis_subscription_decline_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are not interested in a message type.
struct stasis_subscription * __stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription.
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription, long low_water, long high_water)
Set the high and low alert water marks of the stasis subscription.
static struct topic_pool_entry * topic_pool_entry_alloc(const char *topic_name)
AO2_STRING_FIELD_HASH_FN(topic_proxy, name)
static int sub_cleanup(void *data)
static void forward_dtor(void *obj)
static struct ast_json * multi_user_event_to_json(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
size_t stasis_topic_subscribers(const struct stasis_topic *topic)
Return the number of subscribers of a topic.
static void topic_pool_dtor(void *obj)
void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
Publish a message to a topic's subscribers, synchronizing on the specified subscriber.
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
static void * stasis_config_alloc(void)
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
static void topic_dtor(void *obj)
#define TOPIC_POOL_BUCKETS
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
static void stasis_cleanup(void)
Cleanup function for graceful shutdowns.
static struct ast_str * multi_object_blob_to_ami(void *obj)
int stasis_init(void)
Initialize the Stasis subsystem.
static void proxy_dtor(void *weakproxy, void *container)
void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Stasis subscription callback function that does nothing.
static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
static void stasis_config_destructor(void *obj)
void stasis_subscription_join(struct stasis_subscription *subscription)
Block until the last message is processed on a subscription.
static char * topic_complete_name(const char *word)
struct stasis_topic * stasis_topic_create_with_detail(const char *name, const char *detail)
Create a new topic with given detail.
struct stasis_subscription * __stasis_subscribe_pool(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription whose callbacks occur on a thread pool.
static AO2_GLOBAL_OBJ_STATIC(globals)
A global object container that will contain the stasis_config that gets swapped out on reloads.
int stasis_message_type_declined(const char *name)
Check whether a message type is declined.
static struct ast_threadpool * threadpool
const char * stasis_topic_detail(const struct stasis_topic *topic)
Return the detail of a topic.
static int topic_pool_entry_hash(const void *obj, const int flags)
CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,.files=ACO_FILES(&stasis_conf),)
Register information about the configs being processed by this module.
void stasis_log_bad_type_access(const char *name)
static char * stasis_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static struct aco_type threadpool_option
static void multi_object_blob_dtor(void *obj)
static int link_topic_proxy(struct stasis_topic *topic, const char *name, const char *detail)
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Add a subscriber to a topic.
static void subscription_dtor(void *obj)
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
const char * stasis_subscription_uniqueid(const struct stasis_subscription *sub)
Get the unique ID for the subscription.
static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
static struct stasis_subscription_change * subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name)
Check if a topic exists in a pool.
static void publish_msg(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sync_sub)
struct aco_file stasis_conf
static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
static void stasis_declined_config_destructor(void *obj)
static struct aco_type * threadpool_options[]
static unsigned int dispatch_message(struct stasis_subscription *sub, struct stasis_message *message, int synchronous)
void stasis_subscription_accept_formatters(struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters)
Indicate to a subscription that we are interested in messages with one or more formatters.
static void subscription_invoke(struct stasis_subscription *sub, struct stasis_message *message)
Invoke the subscription's callback.
struct stasis_subscription * internal_stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func)
Create a subscription.
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *sub)
Cancel a subscription.
AO2_STRING_FIELD_CASE_SORT_FN(topic_proxy, name)
static void topic_pool_entry_dtor(void *obj)
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
static int dispatch_exec_async(struct ast_taskprocessor_local *local)
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
Returns whether a subscription is currently subscribed.
static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
AO2_STRING_FIELD_CMP_FN(topic_proxy, name)
static int userevent_exclusion_cb(const char *key)
struct aco_type * declined_options[]
#define topic_lock_both(topic1, topic2)
Lock two topics.
struct ao2_container * topic_all
struct stasis_topic_pool * stasis_topic_pool_create(struct stasis_topic *pooled_topic)
Create a topic pool that routes messages from dynamically generated topics to the given topic.
static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
#define TOPIC_ALL_BUCKETS
STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type)
static struct aco_type declined_option
An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type.
Stasis Message Bus API. See Stasis Message Bus API for detailed documentation.
const struct timeval * stasis_message_timestamp(const struct stasis_message *msg)
Get the time when a message was created.
void(* stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Callback function type for Stasis subscriptions.
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
stasis_subscription_message_filter
Stasis subscription message filters.
@ STASIS_SUBSCRIPTION_FILTER_SELECTIVE
@ STASIS_SUBSCRIPTION_FILTER_FORCED_NONE
@ STASIS_SUBSCRIPTION_FILTER_NONE
stasis_subscription_message_formatters
Stasis subscription formatter filters.
@ STASIS_SUBSCRIPTION_FORMATTER_NONE
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
int stasis_cache_init(void)
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
int stasis_message_type_id(const struct stasis_message_type *type)
Gets the id of a given message type.
enum stasis_subscription_message_formatters stasis_message_type_available_formatters(const struct stasis_message_type *message_type)
Get a bitmap of available formatters for a message type.
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
struct ast_json * ast_bridge_snapshot_to_json(const struct ast_bridge_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from an ast_bridge_snapshot.
struct ast_json * ast_channel_snapshot_to_json(const struct ast_channel_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from an ast_channel_snapshot.
struct ast_json * ast_endpoint_snapshot_to_json(const struct ast_endpoint_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from an ast_endpoint_snapshot.
static int message_type_id
int ast_str_append(struct ast_str **buf, ssize_t max_len, const char *fmt,...)
Append to a thread local dynamic string.
char * ast_str_buffer(const struct ast_str *buf)
Returns the string buffer within the ast_str buf.
#define S_OR(a, b)
Returns the equivalent of logical OR for strings: the first one if not empty, otherwise the second one.
static force_inline int attribute_pure ast_strlen_zero(const char *s)
#define ast_str_container_alloc(buckets)
Allocates a hash container for bare strings.
#define ast_str_create(init_len)
Create a malloc'ed dynamic length string.
static force_inline int attribute_pure ast_str_case_hash(const char *str)
Compute a hash value on a case-insensitive string.
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
void ast_str_container_remove(struct ao2_container *str_container, const char *remove)
Removes a string from a string container allocated by ast_str_container_alloc.
int ast_str_container_add(struct ao2_container *str_container, const char *add)
Adds a string to a string container allocated by ast_str_container_alloc.
The representation of a single configuration file to be processed.
Type information about a category-level configurable object.
When we need to walk through a container, we use an ao2_iterator to keep track of the current position.
Structure representing a snapshot of channel state.
Main Channel structure associated with a channel.
Descriptor for a CLI entry.
int args
This gets set in ast_cli_register()
Abstract JSON element (object, array, string, int, ...).
Struct containing info for an AMI event to send out.
A multi-object blob data structure to carry user event stasis messages.
struct ast_multi_object_blob::@422 snapshots[STASIS_UMOS_MAX]
Structure for mutex and tracking information.
Support for dynamic strings.
An ast_taskprocessor structure is a singleton by name.
int idle_timeout
Time limit in seconds for idle threads.
int max_size
Maximum number of threads a pool may have.
int auto_increment
Number of threads to increment pool by.
int initial_size
Number of threads the pool will start with.
An opaque threadpool structure.
Structure for variables, used for configurations and for channel variables.
struct stasis_declined_config * declined_message_types
struct stasis_threadpool_conf * threadpool_options
A structure to hold global configuration-related options.
struct ao2_container * declined
struct stasis_topic * from_topic
struct stasis_topic * to_topic
Structure containing callbacks for Stasis message sanitization.
Holds details about changes to subscriptions for the specified topic.
struct stasis_topic * topic
struct stasis_topic * topic
struct stasis_subscription::@421 accepted_message_types
stasis_subscription_cb callback
struct ast_taskprocessor * mailbox
enum stasis_subscription_message_filter filter
int final_message_processed
enum stasis_subscription_message_formatters accepted_formatters
Threadpool configuration options.
struct ao2_container * pool_container
struct stasis_topic * pool_topic
struct stasis_topic::@419 subscribers
struct stasis_topic::@420 upstream_topics
struct timeval * creationtime
struct stasis_topic * topic
struct stasis_forward * forward
struct timeval creationtime
An API for managing task processing threads that can be shared across modules.
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
Sets the local data associated with a taskprocessor.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor, decrementing its reference count.
@ TPS_REF_DEFAULT
Return a reference to a taskprocessor, creating one if it does not exist.
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int(*task_exe)(struct ast_taskprocessor_local *local), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format,...)
Build a taskprocessor name with a sequence number on the end.
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.
#define AST_TASKPROCESSOR_MAX_NAME
Suggested maximum taskprocessor name length (less null terminator).
void ast_threadpool_shutdown(struct ast_threadpool *pool)
Shut down a threadpool and destroy it.
struct ast_taskprocessor * ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
Serialized execution of tasks within an ast_threadpool.
struct ast_threadpool * ast_threadpool_create(const char *name, struct ast_threadpool_listener *listener, const struct ast_threadpool_options *options)
Create a new threadpool.
#define AST_THREADPOOL_OPTIONS_VERSION
void ast_format_duration_hh_mm_ss(int duration, char *buf, size_t length)
Formats a duration into HH:MM:SS.
int64_t ast_tvdiff_ms(struct timeval end, struct timeval start)
Computes the difference (in milliseconds) between two struct timeval instances.
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
static void statistics(void)
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Universally unique identifier support.
Vector container support.
#define AST_VECTOR_REPLACE(vec, idx, elem)
Replace an element at a specific position in a vector, growing the vector if needed.
#define AST_VECTOR_ELEM_CLEANUP_NOOP(elem)
Vector element cleanup that does nothing.
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
#define AST_VECTOR_REMOVE_ELEM_UNORDERED(vec, elem, cleanup)
Remove an element from a vector.
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
#define AST_VECTOR(name, type)
Define a vector structure.
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
#define AST_VECTOR_GET_ADDR(vec, idx)
Get an address of element in a vector.