Asterisk - The Open Source Telephony Project GIT-master-67613d1
Macros | Functions
stasis_message_router.h File Reference
#include "asterisk/stasis.h"
Include dependency graph for stasis_message_router.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Macros

#define stasis_message_router_create(topic)   __stasis_message_router_create(topic, __FILE__, __LINE__, __PRETTY_FUNCTION__)
 Create a new message router object. More...
 
#define stasis_message_router_create_pool(topic)   __stasis_message_router_create_pool(topic, __FILE__, __LINE__, __PRETTY_FUNCTION__)
 Create a new message router object. More...
 

Functions

struct stasis_message_router__stasis_message_router_create (struct stasis_topic *topic, const char *file, int lineno, const char *func)
 
struct stasis_message_router__stasis_message_router_create_pool (struct stasis_topic *topic, const char *file, int lineno, const char *func)
 
void stasis_message_router_accept_formatters (struct stasis_message_router *router, enum stasis_subscription_message_formatters formatters)
 Indicate to a message router that we are interested in messages with one or more formatters. More...
 
int stasis_message_router_add (struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
 Add a route to a message router. More...
 
int stasis_message_router_add_cache_update (struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
 Add a route for stasis_cache_update messages to a message router. More...
 
int stasis_message_router_is_done (struct stasis_message_router *router)
 Returns whether router has received its final message. More...
 
void stasis_message_router_publish_sync (struct stasis_message_router *router, struct stasis_message *message)
 Publish a message to a message router's subscription synchronously. More...
 
void stasis_message_router_remove (struct stasis_message_router *router, struct stasis_message_type *message_type)
 Remove a route from a message router. More...
 
void stasis_message_router_remove_cache_update (struct stasis_message_router *router, struct stasis_message_type *message_type)
 Remove a cache route from a message router. More...
 
int stasis_message_router_set_congestion_limits (struct stasis_message_router *router, long low_water, long high_water)
 Set the high and low alert water marks of the stasis message router. More...
 
int stasis_message_router_set_default (struct stasis_message_router *router, stasis_subscription_cb callback, void *data)
 Sets the default route of a router. More...
 
void stasis_message_router_set_formatters_default (struct stasis_message_router *router, stasis_subscription_cb callback, void *data, enum stasis_subscription_message_formatters formatters)
 Sets the default route of a router with formatters. More...
 
void stasis_message_router_unsubscribe (struct stasis_message_router *router)
 Unsubscribe the router from the upstream topic. More...
 
void stasis_message_router_unsubscribe_and_join (struct stasis_message_router *router)
 Unsubscribe the router from the upstream topic, blocking until the final message has been processed. More...
 

Macro Definition Documentation

◆ stasis_message_router_create

#define stasis_message_router_create (   topic)    __stasis_message_router_create(topic, __FILE__, __LINE__, __PRETTY_FUNCTION__)

Create a new message router object.

Parameters
topicTopic to subscribe route to.
Returns
New stasis_message_router.
Return values
NULLon error.
Since
12

Definition at line 58 of file stasis_message_router.h.

◆ stasis_message_router_create_pool

#define stasis_message_router_create_pool (   topic)    __stasis_message_router_create_pool(topic, __FILE__, __LINE__, __PRETTY_FUNCTION__)

Create a new message router object.

The subscription created for this message router will dispatch callbacks on a thread pool.

Parameters
topicTopic to subscribe route to.
Returns
New stasis_message_router.
Return values
NULLon error.
Since
12.8.0

Definition at line 75 of file stasis_message_router.h.

Function Documentation

◆ __stasis_message_router_create()

struct stasis_message_router * __stasis_message_router_create ( struct stasis_topic topic,
const char *  file,
int  lineno,
const char *  func 
)

Definition at line 244 of file stasis_message_router.c.

246{
247 return stasis_message_router_create_internal(topic, 0, file, lineno, func);
248}
static struct stasis_message_router * stasis_message_router_create_internal(struct stasis_topic *topic, int use_thread_pool, const char *file, int lineno, const char *func)

References make_ari_stubs::file, and stasis_message_router_create_internal().

◆ __stasis_message_router_create_pool()

struct stasis_message_router * __stasis_message_router_create_pool ( struct stasis_topic topic,
const char *  file,
int  lineno,
const char *  func 
)

Definition at line 250 of file stasis_message_router.c.

252{
253 return stasis_message_router_create_internal(topic, 1, file, lineno, func);
254}

References make_ari_stubs::file, and stasis_message_router_create_internal().

◆ stasis_message_router_accept_formatters()

void stasis_message_router_accept_formatters ( struct stasis_message_router router,
enum stasis_subscription_message_formatters  formatters 
)

Indicate to a message router that we are interested in messages with one or more formatters.

The formatters are passed on to the underlying subscription.

Warning
With direct subscriptions, adding a formatter filter is an OR operation with any message type filters. In the current implementation of message router however, it's an AND operation. Even when setting a default route, the callback will only get messages that have the formatters provides in this call.
Parameters
routerRouter to set the formatters of.
formattersA bitmap of stasis_subscription_message_formatters we wish to receive.
Since
13.25.0
16.2.0

Definition at line 418 of file stasis_message_router.c.

420{
422
424
425 return;
426}
static struct stasis_message_router * router
#define NULL
Definition: resample.c:96
void stasis_subscription_accept_formatters(struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters)
Indicate to a subscription that we are interested in messages with one or more formatters.
Definition: stasis.c:1093
struct stasis_subscription * subscription
#define ast_assert(a)
Definition: utils.h:739

References ast_assert, NULL, router, stasis_subscription_accept_formatters(), and stasis_message_router::subscription.

◆ stasis_message_router_add()

int stasis_message_router_add ( struct stasis_message_router router,
struct stasis_message_type message_type,
stasis_subscription_cb  callback,
void *  data 
)

Add a route to a message router.

A particular message_type may have at most one route per router. If you route stasis_cache_update messages, the callback will only receive updates for types not handled by routes added with stasis_message_router_add_cache_update().

Adding multiple routes for the same message type results in undefined behavior.

Parameters
routerRouter to add the route to.
message_typeType of message to route.
callbackCallback to forward messages of message_type to.
dataData pointer to pass to callback.
Return values
0on success
-1on failure
Since
12

Definition at line 308 of file stasis_message_router.c.

311{
312 int res;
313
315
316 if (!message_type) {
317 /* Cannot route to NULL type. */
318 return -1;
319 }
321 res = route_table_add(&router->routes, message_type, callback, data);
322 if (!res) {
324 /* Until a specific message type was added we would already drop the message, so being
325 * selective now doesn't harm us. If we have a default route then we are already forced
326 * to filter nothing and messages will come in regardless.
327 */
329 }
331 return res;
332}
#define ao2_unlock(a)
Definition: astobj2.h:729
#define ao2_lock(a)
Definition: astobj2.h:717
@ STASIS_SUBSCRIPTION_FILTER_SELECTIVE
Definition: stasis.h:297
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
Definition: stasis.c:1023
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
Definition: stasis.c:1077
static int route_table_add(struct route_table *table, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)

References ao2_lock, ao2_unlock, ast_assert, NULL, route_table_add(), router, stasis_message_router::routes, stasis_subscription_accept_message_type(), STASIS_SUBSCRIPTION_FILTER_SELECTIVE, stasis_subscription_set_filter(), and stasis_message_router::subscription.

Referenced by app_create(), AST_TEST_DEFINE(), create_routes(), endpoint_internal_create(), forwards_create_endpoint(), load_general_config(), load_module(), manager_bridging_init(), manager_channels_init(), manager_confbridge_init(), manager_endpoints_init(), manager_mwi_init(), manager_subscriptions_init(), meetme_stasis_init(), pjsip_outbound_registration_metrics_init(), and setup_stasis_subs().

◆ stasis_message_router_add_cache_update()

int stasis_message_router_add_cache_update ( struct stasis_message_router router,
struct stasis_message_type message_type,
stasis_subscription_cb  callback,
void *  data 
)

Add a route for stasis_cache_update messages to a message router.

A particular message_type may have at most one cache route per router. These are distinct from regular routes, so one could have both a regular route and a cache route for the same message_type.

Adding multiple routes for the same message type results in undefined behavior.

Parameters
routerRouter to add the route to.
message_typeSubtype of cache update to route.
callbackCallback to forward messages of message_type to.
dataData pointer to pass to callback.
Return values
0on success
-1on failure
Since
12

Definition at line 334 of file stasis_message_router.c.

337{
338 int res;
339
341
342 if (!message_type) {
343 /* Cannot cache a route to NULL type. */
344 return -1;
345 }
347 res = route_table_add(&router->cache_routes, message_type, callback, data);
348 if (!res) {
351 }
353 return res;
354}
struct stasis_message_type * stasis_cache_update_type(void)
Message type for cache update messages.
struct route_table cache_routes

References ao2_lock, ao2_unlock, ast_assert, stasis_message_router::cache_routes, NULL, route_table_add(), router, stasis_cache_update_type(), stasis_subscription_accept_message_type(), STASIS_SUBSCRIPTION_FILTER_SELECTIVE, stasis_subscription_set_filter(), and stasis_message_router::subscription.

Referenced by app_create(), and AST_TEST_DEFINE().

◆ stasis_message_router_is_done()

int stasis_message_router_is_done ( struct stasis_message_router router)

Returns whether router has received its final message.

Parameters
routerRouter.
Return values
True(non-zero) if stasis_subscription_final_message() has been received.
False(zero) if waiting for the end.

Definition at line 276 of file stasis_message_router.c.

277{
278 if (!router) {
279 /* Null router is about as done as you can get */
280 return 1;
281 }
282
284}
int stasis_subscription_is_done(struct stasis_subscription *subscription)
Returns whether subscription has received its final message.
Definition: stasis.c:1118

References router, stasis_subscription_is_done(), and stasis_message_router::subscription.

Referenced by endpoint_dtor().

◆ stasis_message_router_publish_sync()

void stasis_message_router_publish_sync ( struct stasis_message_router router,
struct stasis_message message 
)

Publish a message to a message router's subscription synchronously.

Parameters
routerRouter
messageThe Stasis Message Bus API message

This should be used when a message needs to be published synchronously to the underlying subscription created by a message router. This is analagous to stasis_publish_sync.

Note that the caller will be blocked until the thread servicing the message on the message router's subscription completes handling of the message.

Since
12.1.0

Definition at line 286 of file stasis_message_router.c.

288{
290
294}
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition: astobj2.h:480
void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
Publish a message to a topic's subscribers, synchronizing on the specified subscriber.
Definition: stasis.c:1516

References ao2_bump, ao2_cleanup, ast_assert, NULL, router, stasis_publish_sync(), and stasis_message_router::subscription.

Referenced by ast_cdr_engine_term(), cdr_prop_write(), cdr_read(), cdr_write(), forkcdr_exec(), and publish_app_cdr_message().

◆ stasis_message_router_remove()

void stasis_message_router_remove ( struct stasis_message_router router,
struct stasis_message_type message_type 
)

Remove a route from a message router.

If a route is removed from another thread, there is no notification that all messages using this route have been processed. This typically means that the associated data pointer for this route must be kept until the route itself is disposed of.

Parameters
routerRouter to remove the route from.
message_typeType of message to route.
Since
12

Definition at line 356 of file stasis_message_router.c.

358{
360
361 if (!message_type) {
362 /* Cannot remove a NULL type. */
363 return;
364 }
366 route_table_remove(&router->routes, message_type);
368}
static int route_table_remove(struct route_table *table, struct stasis_message_type *message_type)

References ao2_lock, ao2_unlock, ast_assert, NULL, route_table_remove(), router, and stasis_message_router::routes.

Referenced by cleanup_module(), load_general_config(), and unload_module().

◆ stasis_message_router_remove_cache_update()

void stasis_message_router_remove_cache_update ( struct stasis_message_router router,
struct stasis_message_type message_type 
)

Remove a cache route from a message router.

If a route is removed from another thread, there is no notification that all messages using this route have been processed. This typically means that the associated data pointer for this route must be kept until the route itself is disposed of.

Parameters
routerRouter to remove the route from.
message_typeType of message to route.
Since
12

Definition at line 370 of file stasis_message_router.c.

373{
375
376 if (!message_type) {
377 /* Cannot remove a NULL type. */
378 return;
379 }
381 route_table_remove(&router->cache_routes, message_type);
383}

References ao2_lock, ao2_unlock, ast_assert, stasis_message_router::cache_routes, NULL, route_table_remove(), and router.

◆ stasis_message_router_set_congestion_limits()

int stasis_message_router_set_congestion_limits ( struct stasis_message_router router,
long  low_water,
long  high_water 
)

Set the high and low alert water marks of the stasis message router.

Since
13.10.0
Parameters
routerPointer to a stasis message router
low_waterNew queue low water mark. (-1 to set as 90% of high_water)
high_waterNew queue high water mark.
Return values
0on success.
-1on error (water marks not changed).

Definition at line 296 of file stasis_message_router.c.

298{
299 int res = -1;
300
301 if (router) {
303 low_water, high_water);
304 }
305 return res;
306}
int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription, long low_water, long high_water)
Set the high and low alert water marks of the stasis subscription.
Definition: stasis.c:1011

References router, stasis_subscription_set_congestion_limits(), and stasis_message_router::subscription.

Referenced by create_routes(), load_module(), and manager_subscriptions_init().

◆ stasis_message_router_set_default()

int stasis_message_router_set_default ( struct stasis_message_router router,
stasis_subscription_cb  callback,
void *  data 
)

Sets the default route of a router.

Parameters
routerRouter to set the default route of.
callbackCallback to forward messages which otherwise have no home.
dataData pointer to pass to callback.
Return values
0on success
-1on failure
Since
12
Note
Setting a default callback will automatically cause the underlying subscription to receive all messages and not be filtered. If filtering is desired then a specific route for each message type should be provided.

Definition at line 385 of file stasis_message_router.c.

388{
390
391 /* While this implementation can never fail, it used to be able to */
392 return 0;
393}
@ STASIS_SUBSCRIPTION_FORMATTER_NONE
Definition: stasis.h:309
void stasis_message_router_set_formatters_default(struct stasis_message_router *router, stasis_subscription_cb callback, void *data, enum stasis_subscription_message_formatters formatters)
Sets the default route of a router with formatters.

References router, stasis_message_router_set_formatters_default(), and STASIS_SUBSCRIPTION_FORMATTER_NONE.

Referenced by AST_TEST_DEFINE(), load_module(), and setup_stasis_subs().

◆ stasis_message_router_set_formatters_default()

void stasis_message_router_set_formatters_default ( struct stasis_message_router router,
stasis_subscription_cb  callback,
void *  data,
enum stasis_subscription_message_formatters  formatters 
)

Sets the default route of a router with formatters.

Parameters
routerRouter to set the default route of.
callbackCallback to forward messages which otherwise have no home.
dataData pointer to pass to callback.
formattersA bitmap of stasis_subscription_message_formatters we wish to receive.
Since
13.26.0
16.3.0
Note
If formatters are specified then the message router will remain in a selective filtering state. Any explicit routes will receive messages of their message type and the default callback will only receive messages that have one of the given formatters. Explicit routes will not be filtered according to the given formatters.

Definition at line 395 of file stasis_message_router.c.

399{
401 ast_assert(callback != NULL);
402
404
406 router->default_route.callback = callback;
407 router->default_route.data = data;
409
410 if (formatters == STASIS_SUBSCRIPTION_FORMATTER_NONE) {
411 /* Formatters govern what messages the default callback get, so it is only if none is
412 * specified that we accept all messages regardless.
413 */
415 }
416}
@ STASIS_SUBSCRIPTION_FILTER_FORCED_NONE
Definition: stasis.h:296
stasis_subscription_cb callback
struct stasis_message_route default_route

References ao2_lock, ao2_unlock, ast_assert, stasis_message_route::callback, stasis_message_route::data, stasis_message_router::default_route, NULL, router, stasis_subscription_accept_formatters(), STASIS_SUBSCRIPTION_FILTER_FORCED_NONE, STASIS_SUBSCRIPTION_FORMATTER_NONE, stasis_subscription_set_filter(), and stasis_message_router::subscription.

Referenced by app_create(), manager_subscriptions_init(), and stasis_message_router_set_default().

◆ stasis_message_router_unsubscribe()

void stasis_message_router_unsubscribe ( struct stasis_message_router router)

Unsubscribe the router from the upstream topic.

Parameters
routerRouter to unsubscribe.
Since
12

Definition at line 256 of file stasis_message_router.c.

257{
258 if (!router) {
259 return;
260 }
261
265}
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *subscription)
Cancel a subscription.
Definition: stasis.c:971

References ao2_lock, ao2_unlock, router, stasis_unsubscribe(), and stasis_message_router::subscription.

Referenced by app_shutdown(), ast_endpoint_shutdown(), AST_TEST_DEFINE(), manager_confbridge_shutdown(), meetme_stasis_cleanup(), remove_stasis_subscriptions(), and setup_stasis_subs().

◆ stasis_message_router_unsubscribe_and_join()

void stasis_message_router_unsubscribe_and_join ( struct stasis_message_router router)

Unsubscribe the router from the upstream topic, blocking until the final message has been processed.

See stasis_unsubscribe_and_join() for info on when to use this vs. stasis_message_router_unsubscribe().

Parameters
routerRouter to unsubscribe.
Since
12

Definition at line 267 of file stasis_message_router.c.

269{
270 if (!router) {
271 return;
272 }
274}
struct stasis_subscription * stasis_unsubscribe_and_join(struct stasis_subscription *subscription)
Cancel a subscription, blocking until the last message is processed.
Definition: stasis.c:1134

References router, stasis_unsubscribe_and_join(), and stasis_message_router::subscription.

Referenced by AST_TEST_DEFINE(), cdr_engine_shutdown(), cleanup_module(), destroy_routes(), manager_endpoints_shutdown(), manager_shutdown(), pjsip_outbound_registration_metrics_unload_cb(), and unload_module().