Asterisk - The Open Source Telephony Project GIT-master-8f1982c
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Macros Modules Pages
Functions
stasis_internal.h File Reference

Internal Stasis APIs. More...

#include "asterisk/stasis.h"
Include dependency graph for stasis_internal.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

struct stasis_subscriptioninternal_stasis_subscribe (struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func)
 Create a subscription. More...
 

Detailed Description

Internal Stasis APIs.

This header file is used to define functions that are shared between files that make up Stasis Message Bus API. Functions declared here should not be used by any module outside of Stasis.

If you find yourself needing to call one of these functions directly, something has probably gone horribly wrong.

Author
Matt Jordan mjord.nosp@m.an@d.nosp@m.igium.nosp@m..com

Definition in file stasis_internal.h.

Function Documentation

◆ internal_stasis_subscribe()

struct stasis_subscription * internal_stasis_subscribe ( struct stasis_topic topic,
stasis_subscription_cb  callback,
void *  data,
int  needs_mailbox,
int  use_thread_pool,
const char *  file,
int  lineno,
const char *  func 
)

Create a subscription.

In addition to being AO2 managed memory (requiring an ao2_cleanup() to free up this reference), the subscription must be explicitly unsubscribed from its topic using stasis_unsubscribe().

The invocations of the callback are serialized, but may not always occur on the same thread. The invocation order of different subscriptions is unspecified.

Note: modules outside of Stasis should use stasis_subscribe.

Parameters
topicTopic to subscribe to.
callbackCallback function for subscription messages.
dataData to be passed to the callback, in addition to the message.
needs_mailboxDetermines whether or not the subscription requires a mailbox. Subscriptions with mailboxes will be delivered on some non-publisher thread; subscriptions without mailboxes will be delivered on the publisher thread.
use_thread_poolUse the thread pool for the subscription. This is only relevant if needs_mailbox is non-zero.
file,lineno,func
Returns
New stasis_subscription object.
Return values
NULLon error.
Since
12

Definition at line 883 of file stasis.c.

892{
893 struct stasis_subscription *sub;
894 int ret;
895
896 if (!topic) {
897 return NULL;
898 }
899
900 /* The ao2 lock is used for join_cond. */
902 if (!sub) {
903 return NULL;
904 }
905
906#ifdef AST_DEVMODE
908 sub->statistics = stasis_subscription_statistics_create(sub, needs_mailbox, use_thread_pool, file, lineno, func);
909 if (ret < 0 || !sub->statistics) {
910 ao2_ref(sub, -1);
911 return NULL;
912 }
913#else
915 if (ret < 0) {
916 ao2_ref(sub, -1);
917 return NULL;
918 }
919#endif
920
921 if (needs_mailbox) {
922 char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
923
924 /* Create name with seq number appended. */
925 ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "stasis/%c:%s",
926 use_thread_pool ? 'p' : 'm',
928
929 /*
930 * With a small number of subscribers, a thread-per-sub is
931 * acceptable. For a large number of subscribers, a thread
932 * pool should be used.
933 */
934 if (use_thread_pool) {
935 sub->mailbox = ast_threadpool_serializer(tps_name, threadpool);
936 } else {
937 sub->mailbox = ast_taskprocessor_get(tps_name, TPS_REF_DEFAULT);
938 }
939 if (!sub->mailbox) {
940 ao2_ref(sub, -1);
941
942 return NULL;
943 }
945 /* Taskprocessor has a reference */
946 ao2_ref(sub, +1);
947 }
948
949 ao2_ref(topic, +1);
950 sub->topic = topic;
951 sub->callback = callback;
952 sub->data = data;
953 ast_cond_init(&sub->join_cond, NULL);
955 AST_VECTOR_INIT(&sub->accepted_message_types, 0);
956 sub->accepted_formatters = STASIS_SUBSCRIPTION_FORMATTER_NONE;
957
958 if (topic_add_subscription(topic, sub) != 0) {
959 ao2_ref(sub, -1);
960 ao2_ref(topic, -1);
961
962 return NULL;
963 }
965
966 return sub;
967}
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:267
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
#define ao2_t_alloc(data_size, destructor_fn, debug_msg)
Definition: astobj2.h:407
static struct ast_channel * callback(struct ast_channelstorage_instance *driver, ao2_callback_data_fn *cb_fn, void *arg, void *data, int ao2_flags)
#define ast_cond_init(cond, attr)
Definition: lock.h:208
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition: lock.h:764
struct stasis_forward * sub
Definition: res_corosync.c:240
#define NULL
Definition: resample.c:96
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:654
static struct ast_threadpool * threadpool
Definition: stasis.c:334
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Add a subscriber to a topic.
Definition: stasis.c:1228
static void subscription_dtor(void *obj)
Definition: stasis.c:741
static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition: stasis.c:1675
@ STASIS_SUBSCRIPTION_FILTER_NONE
Definition: stasis.h:295
@ STASIS_SUBSCRIPTION_FORMATTER_NONE
Definition: stasis.h:309
struct stasis_topic * topic
Definition: stasis.c:711
int subscriber_id
Definition: stasis.c:410
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
Sets the local data associated with a taskprocessor.
@ TPS_REF_DEFAULT
return a reference to a taskprocessor, create one if it does not exist
Definition: taskprocessor.h:76
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format,...)
Build a taskprocessor name with a sequence number on the end.
#define AST_TASKPROCESSOR_MAX_NAME
Suggested maximum taskprocessor name length (less null terminator).
Definition: taskprocessor.h:61
struct ast_taskprocessor * ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
Serialized execution of tasks within a ast_threadpool.
Definition: threadpool.c:1428
static void statistics(void)
Definition: utils/frame.c:287
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113

References ao2_ref, ao2_t_alloc, ast_asprintf, ast_atomic_fetchadd_int(), ast_cond_init, ast_taskprocessor_build_name(), ast_taskprocessor_get(), AST_TASKPROCESSOR_MAX_NAME, ast_taskprocessor_set_local(), ast_threadpool_serializer(), AST_VECTOR_INIT, callback(), stasis_subscription::data, make_ari_stubs::file, NULL, send_subscription_subscribe(), STASIS_SUBSCRIPTION_FILTER_NONE, STASIS_SUBSCRIPTION_FORMATTER_NONE, stasis_topic_name(), statistics(), sub, stasis_topic::subscriber_id, subscription_dtor(), threadpool, stasis_subscription::topic, topic_add_subscription(), and TPS_REF_DEFAULT.

Referenced by __stasis_subscribe(), __stasis_subscribe_pool(), and stasis_caching_topic_create().