Asterisk - The Open Source Telephony Project GIT-master-a358458
Data Structures | Macros | Functions | Variables
messaging.c File Reference

Stasis out-of-call text message support. More...

#include "asterisk.h"
#include "asterisk/message.h"
#include "asterisk/endpoints.h"
#include "asterisk/astobj2.h"
#include "asterisk/vector.h"
#include "asterisk/lock.h"
#include "asterisk/utils.h"
#include "asterisk/test.h"
#include "messaging.h"
Include dependency graph for messaging.c:

Go to the source code of this file.

Data Structures

struct  application_tuple
 Storage object for an application. More...
 
struct  message_subscription
 A subscription to some endpoint or technology. More...
 

Macros

#define ENDPOINTS_NUM_BUCKETS   127
 Number of buckets for the endpoint_subscriptions container. More...
 
#define TECH_WILDCARD   "__AST_ALL_TECH"
 Subscription to all technologies. More...
 

Functions

static struct application_tuple * application_tuple_alloc (const char *app_name, message_received_cb callback, void *pvt)
 
static int application_tuple_cmp (struct application_tuple *item, const char *key)
 
static void application_tuple_dtor (void *obj)
 
static void dispatch_message (struct message_subscription *sub, const char *endpoint_name, struct ast_json *json_msg)
 
static struct message_subscription * get_or_create_subscription (struct ast_endpoint *endpoint)
 
static struct message_subscription * get_subscription (struct ast_endpoint *endpoint)
 
static int handle_msg_cb (struct ast_msg *msg)
 
static int has_destination_cb (const struct ast_msg *msg)
 
static int is_app_subscribed (struct message_subscription *sub, const char *app_name)
 
static struct message_subscription * message_subscription_alloc (const char *token)
 
static int message_subscription_compare_cb (void *obj, void *arg, int flags)
 
static void message_subscription_dtor (void *obj)
 
static int message_subscription_hash_cb (const void *obj, const int flags)
 
int messaging_app_subscribe_endpoint (const char *app_name, struct ast_endpoint *endpoint, message_received_cb callback, void *pvt)
 Subscribe an application to an endpoint for messages. More...
 
void messaging_app_unsubscribe_endpoint (const char *app_name, const char *endpoint_id)
 Unsubscribe an application from messages from a particular endpoint. More...
 
int messaging_cleanup (void)
 Tidy up the messaging layer. More...
 
int messaging_init (void)
 Initialize the messaging layer. More...
 
static int messaging_subscription_cmp (struct message_subscription *sub, const char *key)
 
static void msg_to_endpoint (const struct ast_msg *msg, char *buf, size_t len)
 
static struct ast_json * msg_to_json (struct ast_msg *msg)
 

Variables

struct ast_msg_handler ari_msg_handler
 
static struct ao2_container * endpoint_subscriptions
 The subscriptions to endpoints. More...
 
struct {
   size_t   current
 
   struct message_subscription **   elems
 
   size_t   max
 
} tech_subscriptions
 The subscriptions to technologies. More...
 
static ast_rwlock_t tech_subscriptions_lock
 RWLock for tech_subscriptions. More...
 

Detailed Description

Stasis out-of-call text message support.

Author
Matt Jordan mjord.nosp@m.an@d.nosp@m.igium.nosp@m..com

Definition in file messaging.c.

Macro Definition Documentation

◆ ENDPOINTS_NUM_BUCKETS

#define ENDPOINTS_NUM_BUCKETS   127

Number of buckets for the endpoint_subscriptions container.

Definition at line 46 of file messaging.c.

◆ TECH_WILDCARD

#define TECH_WILDCARD   "__AST_ALL_TECH"

Subscription to all technologies.

Definition at line 41 of file messaging.c.

Function Documentation

◆ application_tuple_alloc()

static struct application_tuple * application_tuple_alloc ( const char *  app_name,
message_received_cb  callback,
void *  pvt 
)
static

Definition at line 89 of file messaging.c.

90{
91 struct application_tuple *tuple;
92 size_t size = sizeof(*tuple) + strlen(app_name) + 1;
93
95
97 if (!tuple) {
98 return NULL;
99 }
100
101 strcpy(tuple->app_name, app_name); /* Safe */
102 tuple->pvt = ao2_bump(pvt);
103 tuple->callback = callback;
104
105 return tuple;
106}
@ AO2_ALLOC_OPT_LOCK_NOLOCK
Definition: astobj2.h:367
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:404
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition: astobj2.h:480
static void application_tuple_dtor(void *obj)
Definition: messaging.c:81
const char * app_name(struct ast_app *app)
Definition: pbx_app.c:463
#define NULL
Definition: resample.c:96
Storage object for an application.
Definition: messaging.c:49
message_received_cb callback
Definition: messaging.c:53
#define ast_assert(a)
Definition: utils.h:739

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_alloc_options, ao2_bump, app_name(), application_tuple::app_name, application_tuple_dtor(), ast_assert, application_tuple::callback, NULL, and application_tuple::pvt.

Referenced by messaging_app_subscribe_endpoint().

◆ application_tuple_cmp()

static int application_tuple_cmp ( struct application_tuple *  item,
const char *  key 
)
static

Definition at line 373 of file messaging.c.

374{
375 return !strcmp(item->app_name, key) ? 1 : 0;
376}
static struct aco_type item
Definition: test_config.c:1463

References item.

Referenced by messaging_app_unsubscribe_endpoint().

◆ application_tuple_dtor()

static void application_tuple_dtor ( void *  obj)
static

Definition at line 81 of file messaging.c.

82{
83 struct application_tuple *tuple = obj;
84
85 ao2_cleanup(tuple->pvt);
86}
#define ao2_cleanup(obj)
Definition: astobj2.h:1934

References ao2_cleanup, and application_tuple::pvt.

Referenced by application_tuple_alloc().

◆ dispatch_message()

static void dispatch_message ( struct message_subscription *  sub,
const char *  endpoint_name,
struct ast_json *  json_msg 
)
static

Definition at line 292 of file messaging.c.

293{
294 int i;
295
296 ast_debug(3, "Dispatching message to subscription %s for endpoint %s\n",
297 sub->token,
298 endpoint_name);
299 for (i = 0; i < AST_VECTOR_SIZE(&sub->applications); i++) {
300 struct application_tuple *tuple = AST_VECTOR_GET(&sub->applications, i);
301
302 tuple->callback(endpoint_name, json_msg, tuple->pvt);
303 }
304}
#define ast_debug(level,...)
Log a DEBUG message.
struct stasis_forward * sub
Definition: res_corosync.c:240
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:609
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:680

References ast_debug, AST_VECTOR_GET, AST_VECTOR_SIZE, application_tuple::callback, application_tuple::pvt, and sub.

Referenced by handle_msg_cb().

◆ get_or_create_subscription()

static struct message_subscription * get_or_create_subscription ( struct ast_endpoint *  endpoint)
static

Definition at line 459 of file messaging.c.

460{
461 struct message_subscription *sub = get_subscription(endpoint);
462
463 if (sub) {
464 return sub;
465 }
466
468 if (!sub) {
469 return NULL;
470 }
471
472 /* Either endpoint_subscriptions or tech_subscriptions will hold a reference to
473 * the subscription. This reference is released to allow the subscription to
474 * eventually destruct when there are no longer any applications receiving
475 * events from the subscription.
476 */
477 if (endpoint && !ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
479 } else {
481 ao2_ref(sub, +1);
483 /* Release the refs that were for the vector and the allocation. */
484 ao2_ref(sub, -2);
485 sub = NULL;
486 }
488 }
489
490 return sub;
491}
#define ao2_link(container, obj)
Add an object to a container.
Definition: astobj2.h:1532
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
const char * ast_endpoint_get_resource(const struct ast_endpoint *endpoint)
Gets the resource name of the given endpoint.
const char * ast_endpoint_get_id(const struct ast_endpoint *endpoint)
Gets the tech/resource id of the given endpoint.
#define ast_rwlock_wrlock(a)
Definition: lock.h:236
#define ast_rwlock_unlock(a)
Definition: lock.h:234
static struct ao2_container * endpoint_subscriptions
The subscriptions to endpoints.
Definition: messaging.c:67
#define TECH_WILDCARD
Subscription to all technologies.
Definition: messaging.c:41
static struct @490 tech_subscriptions
The subscriptions to technologies.
static struct message_subscription * message_subscription_alloc(const char *token)
Definition: messaging.c:123
static struct message_subscription * get_subscription(struct ast_endpoint *endpoint)
Definition: messaging.c:394
static ast_rwlock_t tech_subscriptions_lock
RWLock for tech_subscriptions.
Definition: messaging.c:78
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition: strings.h:65
A subscription to some endpoint or technology.
Definition: messaging.c:59
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256

References ao2_link, ao2_ref, ast_endpoint_get_id(), ast_endpoint_get_resource(), ast_rwlock_unlock, ast_rwlock_wrlock, ast_strlen_zero(), AST_VECTOR_APPEND, endpoint_subscriptions, get_subscription(), message_subscription_alloc(), NULL, sub, tech_subscriptions, tech_subscriptions_lock, and TECH_WILDCARD.

Referenced by messaging_app_subscribe_endpoint().

◆ get_subscription()

static struct message_subscription * get_subscription ( struct ast_endpoint *  endpoint)
static

Definition at line 394 of file messaging.c.

395{
396 struct message_subscription *sub = NULL;
397
398 if (endpoint && !ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
400 } else {
401 int i;
402
404 for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) {
406
407 if (sub && !strcmp(sub->token, endpoint ? ast_endpoint_get_tech(endpoint) : TECH_WILDCARD)) {
408 ao2_bump(sub);
409 break;
410 }
411
412 /* Need to reset the pointer at this line to prevent from using the wrong subscription due to
413 * the token check failing.
414 */
415 sub = NULL;
416 }
418 }
419
420 return sub;
421}
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1736
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1101
const char * ast_endpoint_get_tech(const struct ast_endpoint *endpoint)
Gets the technology of the given endpoint.
#define ast_rwlock_rdlock(a)
Definition: lock.h:235

References ao2_bump, ao2_find, ast_endpoint_get_id(), ast_endpoint_get_resource(), ast_endpoint_get_tech(), ast_rwlock_rdlock, ast_rwlock_unlock, ast_strlen_zero(), AST_VECTOR_GET, AST_VECTOR_SIZE, endpoint_subscriptions, NULL, OBJ_SEARCH_KEY, sub, tech_subscriptions, tech_subscriptions_lock, and TECH_WILDCARD.

Referenced by get_or_create_subscription(), and messaging_app_unsubscribe_endpoint().

◆ handle_msg_cb()

static int handle_msg_cb ( struct ast_msg *  msg)
static

Definition at line 306 of file messaging.c.

307{
308 /* We have at most 3 subscriptions: TECH_WILDCARD, tech itself, and endpoint. */
309 struct message_subscription *matching_subscriptions[3];
311 int i, j;
312 int result;
313 char buf[256];
314 const char *endpoint_name;
315 struct ast_json *json_msg;
316
317 msg_to_endpoint(msg, buf, sizeof(buf));
318 endpoint_name = buf;
319 json_msg = msg_to_json(msg);
320 if (!json_msg) {
321 return -1;
322 }
323 result = -1;
324
325 /* Find subscriptions to TECH_WILDCARD and to the endpoint's technology. */
327 for (i = 0, j = 0; i < AST_VECTOR_SIZE(&tech_subscriptions) && j < 2; i++) {
329
330 if (!sub) {
331 continue;
332 }
333
334 if (!strcmp(sub->token, TECH_WILDCARD)
335 || !strncasecmp(sub->token, buf, strlen(sub->token))) {
336 ao2_ref(sub, +1);
337 matching_subscriptions[j++] = sub;
338 }
339 }
341
342 /* Find the subscription to this particular endpoint. */
344 if (sub) {
345 matching_subscriptions[j++] = sub;
346 }
347
348 /* Dispatch the message to all matching subscriptions. */
349 for (i = 0; i < j; i++) {
350 sub = matching_subscriptions[i];
351
352 dispatch_message(sub, endpoint_name, json_msg);
353
354 ao2_ref(sub, -1);
355 result = 0;
356 }
357
358 ast_json_unref(json_msg);
359 return result;
360}
static PGresult * result
Definition: cel_pgsql.c:84
char buf[BUFSIZE]
Definition: eagi_proxy.c:66
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition: json.c:73
static void dispatch_message(struct message_subscription *sub, const char *endpoint_name, struct ast_json *json_msg)
Definition: messaging.c:292
static void msg_to_endpoint(const struct ast_msg *msg, char *buf, size_t len)
Definition: messaging.c:200
static struct ast_json * msg_to_json(struct ast_msg *msg)
Definition: messaging.c:252
Abstract JSON element (object, array, string, int, ...).

References ao2_find, ao2_ref, ast_json_unref(), ast_rwlock_rdlock, ast_rwlock_unlock, AST_VECTOR_GET, AST_VECTOR_SIZE, buf, dispatch_message(), endpoint_subscriptions, msg_to_endpoint(), msg_to_json(), OBJ_SEARCH_KEY, result, sub, tech_subscriptions, tech_subscriptions_lock, and TECH_WILDCARD.

◆ has_destination_cb()

static int has_destination_cb ( const struct ast_msg *  msg)
static

Definition at line 213 of file messaging.c.

214{
216 int i;
217 char buf[256];
218
219 msg_to_endpoint(msg, buf, sizeof(buf));
220
222 for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) {
224
225 if (!sub) {
226 continue;
227 }
228
229 if (!strcmp(sub->token, TECH_WILDCARD)
230 || !strncasecmp(sub->token, buf, strlen(sub->token))
231 || !strncasecmp(sub->token, buf, strlen(sub->token))) {
233 goto match;
234 }
235
236 }
238
240 if (sub) {
241 ao2_ref(sub, -1);
242 goto match;
243 }
244
245 ast_debug(1, "No subscription found for %s\n", buf);
246 return 0;
247
248match:
249 return 1;
250}
static int match(struct ast_sockaddr *addr, unsigned short callno, unsigned short dcallno, const struct chan_iax2_pvt *cur, int check_dcallno)
Definition: chan_iax2.c:2362

References ao2_find, ao2_ref, ast_debug, ast_rwlock_rdlock, ast_rwlock_unlock, AST_VECTOR_GET, AST_VECTOR_SIZE, buf, endpoint_subscriptions, match(), msg_to_endpoint(), OBJ_SEARCH_KEY, sub, tech_subscriptions, tech_subscriptions_lock, and TECH_WILDCARD.

◆ is_app_subscribed()

static int is_app_subscribed ( struct message_subscription *  sub,
const char *  app_name 
)
static

Definition at line 378 of file messaging.c.

379{
380 int i;
381
382 for (i = 0; i < AST_VECTOR_SIZE(&sub->applications); i++) {
383 struct application_tuple *tuple;
384
385 tuple = AST_VECTOR_GET(&sub->applications, i);
386 if (tuple && !strcmp(tuple->app_name, app_name)) {
387 return 1;
388 }
389 }
390
391 return 0;
392}

References app_name(), application_tuple::app_name, AST_VECTOR_GET, AST_VECTOR_SIZE, and sub.

Referenced by messaging_app_subscribe_endpoint(), and messaging_app_unsubscribe_endpoint().

◆ message_subscription_alloc()

static struct message_subscription * message_subscription_alloc ( const char *  token)
static

Definition at line 123 of file messaging.c.

124{
126 size_t size = sizeof(*sub) + strlen(token) + 1;
127
129 if (!sub) {
130 return NULL;
131 }
132 strcpy(sub->token, token); /* Safe */
133
134 return sub;
135}
@ AO2_ALLOC_OPT_LOCK_RWLOCK
Definition: astobj2.h:365
static void message_subscription_dtor(void *obj)
Definition: messaging.c:109

References AO2_ALLOC_OPT_LOCK_RWLOCK, ao2_alloc_options, message_subscription_dtor(), NULL, sub, and message_subscription::token.

Referenced by get_or_create_subscription().

◆ message_subscription_compare_cb()

static int message_subscription_compare_cb ( void *  obj,
void *  arg,
int  flags 
)
static

AO2 comparison function for message_subscription

Definition at line 160 of file messaging.c.

161{
162 const struct message_subscription *object_left = obj;
163 const struct message_subscription *object_right = arg;
164 const char *right_key = arg;
165 int cmp;
166
167 switch (flags & OBJ_SEARCH_MASK) {
169 right_key = object_right->token;
170 /* Fall through */
171 case OBJ_SEARCH_KEY:
172 cmp = strcmp(object_left->token, right_key);
173 break;
175 /*
176 * We could also use a partial key struct containing a length
177 * so strlen() does not get called for every comparison instead.
178 */
179 cmp = strncmp(object_left->token, right_key, strlen(right_key));
180 break;
181 default:
182 /*
183 * What arg points to is specific to this traversal callback
184 * and has no special meaning to astobj2.
185 */
186 cmp = 0;
187 break;
188 }
189 if (cmp) {
190 return 0;
191 }
192 /*
193 * At this point the traversal callback is identical to a sorted
194 * container.
195 */
196 return CMP_MATCH;
197}
@ CMP_MATCH
Definition: astobj2.h:1027
@ OBJ_SEARCH_PARTIAL_KEY
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
Definition: astobj2.h:1116
@ OBJ_SEARCH_OBJECT
The arg parameter is an object of the same type.
Definition: astobj2.h:1087
@ OBJ_SEARCH_MASK
Search option field mask.
Definition: astobj2.h:1072

References CMP_MATCH, OBJ_SEARCH_KEY, OBJ_SEARCH_MASK, OBJ_SEARCH_OBJECT, OBJ_SEARCH_PARTIAL_KEY, and message_subscription::token.

Referenced by messaging_init().

◆ message_subscription_dtor()

static void message_subscription_dtor ( void *  obj)
static

Definition at line 109 of file messaging.c.

110{
111 struct message_subscription *sub = obj;
112 int i;
113
114 for (i = 0; i < AST_VECTOR_SIZE(&sub->applications); i++) {
115 struct application_tuple *tuple = AST_VECTOR_GET(&sub->applications, i);
116
117 ao2_cleanup(tuple);
118 }
119 AST_VECTOR_FREE(&sub->applications);
120}
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition: vector.h:174

References ao2_cleanup, AST_VECTOR_FREE, AST_VECTOR_GET, AST_VECTOR_SIZE, and sub.

Referenced by message_subscription_alloc().

◆ message_subscription_hash_cb()

static int message_subscription_hash_cb ( const void *  obj,
const int  flags 
)
static

AO2 hash function for message_subscription

Definition at line 138 of file messaging.c.

139{
140 const struct message_subscription *sub;
141 const char *key;
142
143 switch (flags & OBJ_SEARCH_MASK) {
144 case OBJ_SEARCH_KEY:
145 key = obj;
146 break;
148 sub = obj;
149 key = sub->token;
150 break;
151 default:
152 /* Hash can only work on something with a full key. */
153 ast_assert(0);
154 return 0;
155 }
156 return ast_str_hash(key);
157}
static force_inline int attribute_pure ast_str_hash(const char *str)
Compute a hash value on a string.
Definition: strings.h:1259

References ast_assert, ast_str_hash(), OBJ_SEARCH_KEY, OBJ_SEARCH_MASK, OBJ_SEARCH_OBJECT, and sub.

Referenced by messaging_init().

◆ messaging_app_subscribe_endpoint()

int messaging_app_subscribe_endpoint ( const char *  app_name,
struct ast_endpoint *  endpoint,
message_received_cb  callback,
void *  pvt 
)

Subscribe an application to an endpoint for messages.

Parameters
app_name  The name of the Stasis Message Bus API application to subscribe to endpoint
endpoint  The endpoint object to subscribe to
callback  The callback to call when a message is received
pvt  An ao2 ref counted object that will be passed to the callback.
Return values
0  subscription was successful
-1  subscription failed

Definition at line 493 of file messaging.c.

494{
496 struct application_tuple *tuple;
497
499 if (!sub) {
500 return -1;
501 }
502
503 ao2_lock(sub);
506 return 0;
507 }
508
510 if (!tuple) {
512 return -1;
513 }
514 if (AST_VECTOR_APPEND(&sub->applications, tuple)) {
515 ao2_ref(tuple, -1);
517 return -1;
518 }
520
521 ast_debug(3, "App '%s' subscribed to messages from endpoint '%s'\n", app_name, endpoint ? ast_endpoint_get_id(endpoint) : "-- ALL --");
522 ast_test_suite_event_notify("StasisMessagingSubscription", "SubState: Subscribed\r\nAppName: %s\r\nToken: %s\r\n",
523 app_name, endpoint ? ast_endpoint_get_id(endpoint) : "ALL");
524
525 return 0;
526}
#define ao2_unlock(a)
Definition: astobj2.h:729
#define ao2_lock(a)
Definition: astobj2.h:717
static struct application_tuple * application_tuple_alloc(const char *app_name, message_received_cb callback, void *pvt)
Definition: messaging.c:89
static struct message_subscription * get_or_create_subscription(struct ast_endpoint *endpoint)
Definition: messaging.c:459
static int is_app_subscribed(struct message_subscription *sub, const char *app_name)
Definition: messaging.c:378
#define ast_test_suite_event_notify(s, f,...)
Definition: test.h:189
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:941

References ao2_cleanup, ao2_lock, ao2_ref, ao2_unlock, app_name(), application_tuple_alloc(), ast_debug, ast_endpoint_get_id(), ast_test_suite_event_notify, AST_VECTOR_APPEND, application_tuple::callback, get_or_create_subscription(), is_app_subscribed(), NULL, application_tuple::pvt, RAII_VAR, and sub.

Referenced by app_subscribe_endpoint().

◆ messaging_app_unsubscribe_endpoint()

void messaging_app_unsubscribe_endpoint ( const char *  app_name,
const char *  endpoint_id 
)

Unsubscribe an application from messages from a particular endpoint.

Parameters
app_name  Name of the stasis application to unsubscribe from messaging
endpoint_id  The ID of the endpoint we no longer care about

Definition at line 423 of file messaging.c.

424{
426 RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup);
427
428 endpoint = ast_endpoint_find_by_id(endpoint_id);
429 sub = get_subscription(endpoint);
430 if (!sub) {
431 return;
432 }
433
434 ao2_lock(sub);
437 return;
438 }
439
441 if (AST_VECTOR_SIZE(&sub->applications) == 0) {
442 if (endpoint && !ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
444 } else {
449 ao2_ref(sub, -1); /* Release the reference held by tech_subscriptions */
450 }
451 }
453
454 ast_debug(3, "App '%s' unsubscribed to messages from endpoint '%s'\n", app_name, endpoint ? ast_endpoint_get_id(endpoint) : "-- ALL --");
455 ast_test_suite_event_notify("StasisMessagingSubscription", "SubState: Unsubscribed\r\nAppName: %s\r\nToken: %s\r\n",
456 app_name, endpoint ? ast_endpoint_get_id(endpoint) : "ALL");
457}
#define ao2_unlink(container, obj)
Remove an object from a container.
Definition: astobj2.h:1578
struct ast_endpoint * ast_endpoint_find_by_id(const char *id)
Finds the endpoint with the given tech[/resource] id.
static int messaging_subscription_cmp(struct message_subscription *sub, const char *key)
Definition: messaging.c:368
static int application_tuple_cmp(struct application_tuple *item, const char *key)
Definition: messaging.c:373
#define AST_VECTOR_ELEM_CLEANUP_NOOP(elem)
Vector element cleanup that does nothing.
Definition: vector.h:571
#define AST_VECTOR_REMOVE_CMP_UNORDERED(vec, value, cmp, cleanup)
Remove an element from a vector that matches the given comparison.
Definition: vector.h:488

References ao2_cleanup, ao2_lock, ao2_ref, ao2_unlink, ao2_unlock, app_name(), application_tuple_cmp(), ast_debug, ast_endpoint_find_by_id(), ast_endpoint_get_id(), ast_endpoint_get_resource(), ast_rwlock_unlock, ast_rwlock_wrlock, ast_strlen_zero(), ast_test_suite_event_notify, AST_VECTOR_ELEM_CLEANUP_NOOP, AST_VECTOR_REMOVE_CMP_UNORDERED, AST_VECTOR_SIZE, endpoint_subscriptions, get_subscription(), is_app_subscribed(), messaging_subscription_cmp(), NULL, RAII_VAR, sub, tech_subscriptions, tech_subscriptions_lock, and TECH_WILDCARD.

Referenced by unsubscribe().

◆ messaging_cleanup()

int messaging_cleanup ( void  )

Tidy up the messaging layer.

Return values
0  success
-1  failure

Definition at line 529 of file messaging.c.

530{
535
536 return 0;
537}
#define ast_rwlock_destroy(rwlock)
Definition: lock.h:233
int ast_msg_handler_unregister(const struct ast_msg_handler *handler)
Unregister a ast_msg_handler.
struct ast_msg_handler ari_msg_handler
Definition: messaging.c:362

References ao2_ref, ari_msg_handler, ast_msg_handler_unregister(), ast_rwlock_destroy, AST_VECTOR_FREE, endpoint_subscriptions, tech_subscriptions, and tech_subscriptions_lock.

Referenced by unload_module().

◆ messaging_init()

int messaging_init ( void  )

Initialize the messaging layer.

Return values
0  success
-1  failure

Definition at line 539 of file messaging.c.

540{
543 message_subscription_compare_cb, "Endpoint messaging subscription container creation");
545 return -1;
546 }
547
550 return -1;
551 }
552
556 return -1;
557 }
558
563 return -1;
564 }
565
566 return 0;
567}
#define ao2_t_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn, tag)
Definition: astobj2.h:1306
#define ast_rwlock_init(rwlock)
wrapper for rwlock with tracking enabled
Definition: lock.h:224
int ast_msg_handler_register(const struct ast_msg_handler *handler)
Register a ast_msg_handler.
#define ENDPOINTS_NUM_BUCKETS
Number of buckets for the endpoint_subscriptions container.
Definition: messaging.c:46
static int message_subscription_hash_cb(const void *obj, const int flags)
Definition: messaging.c:138
static int message_subscription_compare_cb(void *obj, void *arg, int flags)
Definition: messaging.c:160
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113

References AO2_ALLOC_OPT_LOCK_RWLOCK, ao2_ref, ao2_t_container_alloc_hash, ari_msg_handler, ast_msg_handler_register(), ast_rwlock_destroy, ast_rwlock_init, AST_VECTOR_FREE, AST_VECTOR_INIT, endpoint_subscriptions, ENDPOINTS_NUM_BUCKETS, message_subscription_compare_cb(), message_subscription_hash_cb(), NULL, tech_subscriptions, and tech_subscriptions_lock.

Referenced by load_module().

◆ messaging_subscription_cmp()

static int messaging_subscription_cmp ( struct message_subscription *  sub,
const char *  key 
)
static

Definition at line 368 of file messaging.c.

369{
370 return !strcmp(sub->token, key) ? 1 : 0;
371}

References sub.

Referenced by messaging_app_unsubscribe_endpoint().

◆ msg_to_endpoint()

static void msg_to_endpoint ( const struct ast_msg *  msg,
char *  buf,
size_t  len 
)
static

Definition at line 200 of file messaging.c.

201{
202 const char *endpoint = ast_msg_get_endpoint(msg);
203
204 snprintf(buf, len, "%s%s%s", ast_msg_get_tech(msg),
205 ast_strlen_zero(endpoint) ? "" : "/",
206 S_OR(endpoint, ""));
207}
static int len(struct ast_channel *chan, const char *cmd, char *data, char *buf, size_t buflen)
const char * ast_msg_get_endpoint(const struct ast_msg *msg)
Retrieve the endpoint associated with this message.
Definition: main/message.c:565
const char * ast_msg_get_tech(const struct ast_msg *msg)
Retrieve the technology associated with this message.
Definition: main/message.c:560
#define S_OR(a, b)
returns the equivalent of logic or for strings: first one if not empty, otherwise second one.
Definition: strings.h:80

References ast_msg_get_endpoint(), ast_msg_get_tech(), ast_strlen_zero(), buf, len(), and S_OR.

Referenced by handle_msg_cb(), and has_destination_cb().

◆ msg_to_json()

static struct ast_json * msg_to_json ( struct ast_msg *  msg)
static

Definition at line 252 of file messaging.c.

253{
254 struct ast_json *json_obj;
255 struct ast_json *json_vars;
256 struct ast_msg_var_iterator *it_vars;
257 const char *name;
258 const char *value;
259
260 it_vars = ast_msg_var_iterator_init(msg);
261 if (!it_vars) {
262 return NULL;
263 }
264
265 json_vars = ast_json_object_create();
266 if (!json_vars) {
268 return NULL;
269 }
270
271 while (ast_msg_var_iterator_next_received(msg, it_vars, &name, &value)) {
272 struct ast_json *json_val = ast_json_string_create(value);
273 if (!json_val || ast_json_object_set(json_vars, name, json_val)) {
274 ast_json_unref(json_vars);
276 return NULL;
277 }
278
280 }
282
283 json_obj = ast_json_pack("{s: s, s: s, s: s, s: o}",
284 "from", ast_msg_get_from(msg),
285 "to", ast_msg_get_to(msg),
286 "body", ast_msg_get_body(msg),
287 "variables", json_vars);
288
289 return json_obj;
290}
static const char name[]
Definition: format_mp3.c:68
struct ast_json * ast_json_string_create(const char *value)
Construct a JSON string from value.
Definition: json.c:278
struct ast_json * ast_json_object_create(void)
Create a new JSON object.
Definition: json.c:399
struct ast_json * ast_json_pack(char const *format,...)
Helper for creating complex JSON values.
Definition: json.c:612
int ast_json_object_set(struct ast_json *object, const char *key, struct ast_json *value)
Set a field in a JSON object.
Definition: json.c:414
const char * ast_msg_get_from(const struct ast_msg *msg)
Retrieve the source of this message.
Definition: main/message.c:550
void ast_msg_var_iterator_destroy(struct ast_msg_var_iterator *iter)
Destroy a message variable iterator.
Definition: main/message.c:720
const char * ast_msg_get_to(const struct ast_msg *msg)
Retrieve the destination of this message.
Definition: main/message.c:555
const char * ast_msg_get_body(const struct ast_msg *msg)
Get the body of a message.
Definition: main/message.c:545
int ast_msg_var_iterator_next_received(const struct ast_msg *msg, struct ast_msg_var_iterator *iter, const char **name, const char **value)
Get the next variable name and value that was set on a received message.
Definition: main/message.c:708
void ast_msg_var_unref_current(struct ast_msg_var_iterator *iter)
Unref a message var from inside an iterator loop.
Definition: main/message.c:714
struct ast_msg_var_iterator * ast_msg_var_iterator_init(const struct ast_msg *msg)
Create a new message variable iterator.
Definition: main/message.c:658
int value
Definition: syslog.c:37

References ast_json_object_create(), ast_json_object_set(), ast_json_pack(), ast_json_string_create(), ast_json_unref(), ast_msg_get_body(), ast_msg_get_from(), ast_msg_get_to(), ast_msg_var_iterator_destroy(), ast_msg_var_iterator_init(), ast_msg_var_iterator_next_received(), ast_msg_var_unref_current(), name, NULL, and value.

Referenced by handle_msg_cb().

Variable Documentation

◆ ari_msg_handler

struct ast_msg_handler ari_msg_handler
Initial value:
= {
.name = "ari",
.handle_msg = handle_msg_cb,
.has_destination = has_destination_cb,
}
static int handle_msg_cb(struct ast_msg *msg)
Definition: messaging.c:306
static int has_destination_cb(const struct ast_msg *msg)
Definition: messaging.c:213

Definition at line 362 of file messaging.c.

Referenced by messaging_cleanup(), and messaging_init().

◆ current

size_t current

Definition at line 75 of file messaging.c.

◆ elems

struct message_subscription* * elems

Definition at line 75 of file messaging.c.

◆ endpoint_subscriptions

struct ao2_container* endpoint_subscriptions
static

◆ max

size_t max

Definition at line 75 of file messaging.c.

◆ tech_subscriptions

struct { ... } tech_subscriptions

The subscriptions to technologies.

Note
These are stored separately from standard endpoints, given how relatively few of them there are.

Referenced by get_or_create_subscription(), get_subscription(), handle_msg_cb(), has_destination_cb(), messaging_app_unsubscribe_endpoint(), messaging_cleanup(), and messaging_init().

◆ tech_subscriptions_lock

ast_rwlock_t tech_subscriptions_lock
static