Asterisk - The Open Source Telephony Project  GIT-master-b7027de
Macros | Functions
stasis_message_router.h File Reference
#include "asterisk/stasis.h"

Go to the source code of this file.

Macros

#define stasis_message_router_create(topic)   __stasis_message_router_create(topic, __FILE__, __LINE__, __PRETTY_FUNCTION__)
 
#define stasis_message_router_create_pool(topic)   __stasis_message_router_create_pool(topic, __FILE__, __LINE__, __PRETTY_FUNCTION__)
 

Functions

struct stasis_message_router * __stasis_message_router_create (struct stasis_topic *topic, const char *file, int lineno, const char *func)
 Create a new message router object. More...
 
struct stasis_message_router * __stasis_message_router_create_pool (struct stasis_topic *topic, const char *file, int lineno, const char *func)
 Create a new message router object. More...
 
void stasis_message_router_accept_formatters (struct stasis_message_router *router, enum stasis_subscription_message_formatters formatters)
 Indicate to a message router that we are interested in messages with one or more formatters. More...
 
int stasis_message_router_add (struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
 Add a route to a message router. More...
 
int stasis_message_router_add_cache_update (struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
 Add a route for stasis_cache_update messages to a message router. More...
 
int stasis_message_router_is_done (struct stasis_message_router *router)
 Returns whether router has received its final message. More...
 
void stasis_message_router_publish_sync (struct stasis_message_router *router, struct stasis_message *message)
 Publish a message to a message router's subscription synchronously. More...
 
void stasis_message_router_remove (struct stasis_message_router *router, struct stasis_message_type *message_type)
 Remove a route from a message router. More...
 
void stasis_message_router_remove_cache_update (struct stasis_message_router *router, struct stasis_message_type *message_type)
 Remove a cache route from a message router. More...
 
int stasis_message_router_set_congestion_limits (struct stasis_message_router *router, long low_water, long high_water)
 Set the high and low alert water marks of the stasis message router. More...
 
int stasis_message_router_set_default (struct stasis_message_router *router, stasis_subscription_cb callback, void *data)
 Sets the default route of a router. More...
 
void stasis_message_router_set_formatters_default (struct stasis_message_router *router, stasis_subscription_cb callback, void *data, enum stasis_subscription_message_formatters formatters)
 Sets the default route of a router with formatters. More...
 
void stasis_message_router_unsubscribe (struct stasis_message_router *router)
 Unsubscribe the router from the upstream topic. More...
 
void stasis_message_router_unsubscribe_and_join (struct stasis_message_router *router)
 Unsubscribe the router from the upstream topic, blocking until the final message has been processed. More...
 

Macro Definition Documentation

◆ stasis_message_router_create

#define stasis_message_router_create (   topic)    __stasis_message_router_create(topic, __FILE__, __LINE__, __PRETTY_FUNCTION__)

◆ stasis_message_router_create_pool

#define stasis_message_router_create_pool (   topic)    __stasis_message_router_create_pool(topic, __FILE__, __LINE__, __PRETTY_FUNCTION__)

Function Documentation

◆ __stasis_message_router_create()

struct stasis_message_router* __stasis_message_router_create ( struct stasis_topic *  topic,
const char *  file,
int  lineno,
const char *  func 
)

Create a new message router object.

Parameters
topic - Topic to subscribe route to.
Returns
New stasis_message_router.
NULL on error.
Since
12

Definition at line 246 of file stasis_message_router.c.

References stasis_message_router_create_internal().

248 {
249  return stasis_message_router_create_internal(topic, 0, file, lineno, func);
250 }
static struct stasis_message_router * stasis_message_router_create_internal(struct stasis_topic *topic, int use_thread_pool, const char *file, int lineno, const char *func)

◆ __stasis_message_router_create_pool()

struct stasis_message_router* __stasis_message_router_create_pool ( struct stasis_topic *  topic,
const char *  file,
int  lineno,
const char *  func 
)

Create a new message router object.

The subscription created for this message router will dispatch callbacks on a thread pool.

Parameters
topic - Topic to subscribe route to.
Returns
New stasis_message_router.
NULL on error.
Since
12.8.0

Definition at line 252 of file stasis_message_router.c.

References stasis_message_router_create_internal().

254 {
255  return stasis_message_router_create_internal(topic, 1, file, lineno, func);
256 }
static struct stasis_message_router * stasis_message_router_create_internal(struct stasis_topic *topic, int use_thread_pool, const char *file, int lineno, const char *func)

◆ stasis_message_router_accept_formatters()

void stasis_message_router_accept_formatters ( struct stasis_message_router *  router,
enum stasis_subscription_message_formatters  formatters 
)

Indicate to a message router that we are interested in messages with one or more formatters.

The formatters are passed on to the underlying subscription.

Warning
With direct subscriptions, adding a formatter filter is an OR operation with any message type filters. In the current implementation of message router however, it's an AND operation. Even when setting a default route, the callback will only get messages that have the formatters provided in this call.
Parameters
router - Router to set the formatters of.
formatters - A bitmap of stasis_subscription_message_formatters we wish to receive.
Since
13.25.0
16.2.0

Definition at line 420 of file stasis_message_router.c.

References ast_assert, NULL, stasis_subscription_accept_formatters(), and stasis_message_router::subscription.

422 {
423  ast_assert(router != NULL);
424 
426 
427  return;
428 }
#define ast_assert(a)
Definition: utils.h:710
#define NULL
Definition: resample.c:96
void stasis_subscription_accept_formatters(struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters)
Indicate to a subscription that we are interested in messages with one or more formatters.
Definition: stasis.c:1095
struct stasis_subscription * subscription

◆ stasis_message_router_add()

int stasis_message_router_add ( struct stasis_message_router *  router,
struct stasis_message_type *  message_type,
stasis_subscription_cb  callback,
void *  data 
)

Add a route to a message router.

A particular message_type may have at most one route per router. If you route stasis_cache_update messages, the callback will only receive updates for types not handled by routes added with stasis_message_router_add_cache_update().

Adding multiple routes for the same message type results in undefined behavior.

Parameters
router - Router to add the route to.
message_type - Type of message to route.
callback - Callback to forward messages of message_type to.
data - Data pointer to pass to callback.
Return values
0 on success
-1 on failure
Since
12

Definition at line 310 of file stasis_message_router.c.

References ao2_lock, ao2_unlock, ast_assert, NULL, route_table_add(), stasis_message_router::routes, stasis_subscription_accept_message_type(), STASIS_SUBSCRIPTION_FILTER_SELECTIVE, stasis_subscription_set_filter(), and stasis_message_router::subscription.

Referenced by app_create(), AST_TEST_DEFINE(), create_routes(), endpoint_internal_create(), forwards_create_endpoint(), load_general_config(), load_module(), manager_bridging_init(), manager_channels_init(), manager_confbridge_init(), manager_endpoints_init(), manager_mwi_init(), manager_subscriptions_init(), meetme_stasis_init(), pjsip_outbound_registration_metrics_init(), and setup_stasis_subs().

313 {
314  int res;
315 
316  ast_assert(router != NULL);
317 
318  if (!message_type) {
319  /* Cannot route to NULL type. */
320  return -1;
321  }
322  ao2_lock(router);
323  res = route_table_add(&router->routes, message_type, callback, data);
324  if (!res) {
326  /* Until a specific message type was added we would already drop the message, so being
327  * selective now doesn't harm us. If we have a default route then we are already forced
328  * to filter nothing and messages will come in regardless.
329  */
331  }
332  ao2_unlock(router);
333  return res;
334 }
static int route_table_add(struct route_table *table, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
Definition: stasis.c:1079
#define ast_assert(a)
Definition: utils.h:710
#define ao2_unlock(a)
Definition: astobj2.h:730
#define NULL
Definition: resample.c:96
#define ao2_lock(a)
Definition: astobj2.h:718
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
Definition: stasis.c:1025
struct stasis_subscription * subscription

◆ stasis_message_router_add_cache_update()

int stasis_message_router_add_cache_update ( struct stasis_message_router *  router,
struct stasis_message_type *  message_type,
stasis_subscription_cb  callback,
void *  data 
)

Add a route for stasis_cache_update messages to a message router.

A particular message_type may have at most one cache route per router. These are distinct from regular routes, so one could have both a regular route and a cache route for the same message_type.

Adding multiple routes for the same message type results in undefined behavior.

Parameters
router - Router to add the route to.
message_type - Subtype of cache update to route.
callback - Callback to forward messages of message_type to.
data - Data pointer to pass to callback.
Return values
0 on success
-1 on failure
Since
12

Definition at line 336 of file stasis_message_router.c.

References ao2_lock, ao2_unlock, ast_assert, stasis_message_router::cache_routes, NULL, route_table_add(), stasis_cache_update_type(), stasis_subscription_accept_message_type(), STASIS_SUBSCRIPTION_FILTER_SELECTIVE, stasis_subscription_set_filter(), and stasis_message_router::subscription.

Referenced by app_create(), and AST_TEST_DEFINE().

339 {
340  int res;
341 
342  ast_assert(router != NULL);
343 
344  if (!message_type) {
345  /* Cannot cache a route to NULL type. */
346  return -1;
347  }
348  ao2_lock(router);
349  res = route_table_add(&router->cache_routes, message_type, callback, data);
350  if (!res) {
353  }
354  ao2_unlock(router);
355  return res;
356 }
static int route_table_add(struct route_table *table, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
struct route_table cache_routes
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
Definition: stasis.c:1079
#define ast_assert(a)
Definition: utils.h:710
#define ao2_unlock(a)
Definition: astobj2.h:730
#define NULL
Definition: resample.c:96
struct stasis_message_type * stasis_cache_update_type(void)
Message type for cache update messages.
#define ao2_lock(a)
Definition: astobj2.h:718
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
Definition: stasis.c:1025
struct stasis_subscription * subscription

◆ stasis_message_router_is_done()

int stasis_message_router_is_done ( struct stasis_message_router *  router)

Returns whether router has received its final message.

Parameters
router - Router.
Returns
True (non-zero) if stasis_subscription_final_message() has been received.
False (zero) if waiting for the end.

Definition at line 278 of file stasis_message_router.c.

References stasis_subscription_is_done(), and stasis_message_router::subscription.

Referenced by endpoint_dtor().

279 {
280  if (!router) {
281  /* Null router is about as done as you can get */
282  return 1;
283  }
284 
286 }
int stasis_subscription_is_done(struct stasis_subscription *subscription)
Returns whether subscription has received its final message.
Definition: stasis.c:1120
struct stasis_subscription * subscription

◆ stasis_message_router_publish_sync()

void stasis_message_router_publish_sync ( struct stasis_message_router *  router,
struct stasis_message *  message 
)

Publish a message to a message router's subscription synchronously.

Parameters
router - Router
message - The Stasis Message Bus API message

This should be used when a message needs to be published synchronously to the underlying subscription created by a message router. This is analogous to stasis_publish_sync.

Note that the caller will be blocked until the thread servicing the message on the message router's subscription completes handling of the message.

Since
12.1.0

Definition at line 288 of file stasis_message_router.c.

References ao2_bump, ao2_cleanup, ast_assert, NULL, stasis_publish_sync(), and stasis_message_router::subscription.

Referenced by ast_cdr_engine_term(), cdr_prop_write(), cdr_read(), cdr_write(), forkcdr_exec(), and publish_app_cdr_message().

290 {
291  ast_assert(router != NULL);
292 
293  ao2_bump(router);
294  stasis_publish_sync(router->subscription, message);
295  ao2_cleanup(router);
296 }
#define ast_assert(a)
Definition: utils.h:710
#define NULL
Definition: resample.c:96
#define ao2_bump(obj)
Definition: astobj2.h:491
void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
Publish a message to a topic's subscribers, synchronizing on the specified subscriber.
Definition: stasis.c:1516
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
struct stasis_subscription * subscription

◆ stasis_message_router_remove()

void stasis_message_router_remove ( struct stasis_message_router *  router,
struct stasis_message_type *  message_type 
)

Remove a route from a message router.

If a route is removed from another thread, there is no notification that all messages using this route have been processed. This typically means that the associated data pointer for this route must be kept until the route itself is disposed of.

Parameters
router - Router to remove the route from.
message_type - Type of message to route.
Since
12

Definition at line 358 of file stasis_message_router.c.

References ao2_lock, ao2_unlock, ast_assert, NULL, route_table_remove(), and stasis_message_router::routes.

Referenced by cleanup_module(), load_general_config(), and unload_module().

360 {
361  ast_assert(router != NULL);
362 
363  if (!message_type) {
364  /* Cannot remove a NULL type. */
365  return;
366  }
367  ao2_lock(router);
368  route_table_remove(&router->routes, message_type);
369  ao2_unlock(router);
370 }
#define ast_assert(a)
Definition: utils.h:710
#define ao2_unlock(a)
Definition: astobj2.h:730
#define NULL
Definition: resample.c:96
static int route_table_remove(struct route_table *table, struct stasis_message_type *message_type)
#define ao2_lock(a)
Definition: astobj2.h:718

◆ stasis_message_router_remove_cache_update()

void stasis_message_router_remove_cache_update ( struct stasis_message_router *  router,
struct stasis_message_type *  message_type 
)

Remove a cache route from a message router.

If a route is removed from another thread, there is no notification that all messages using this route have been processed. This typically means that the associated data pointer for this route must be kept until the route itself is disposed of.

Parameters
router - Router to remove the route from.
message_type - Type of message to route.
Since
12

Definition at line 372 of file stasis_message_router.c.

References ao2_lock, ao2_unlock, ast_assert, stasis_message_router::cache_routes, NULL, and route_table_remove().

375 {
376  ast_assert(router != NULL);
377 
378  if (!message_type) {
379  /* Cannot remove a NULL type. */
380  return;
381  }
382  ao2_lock(router);
383  route_table_remove(&router->cache_routes, message_type);
384  ao2_unlock(router);
385 }
struct route_table cache_routes
#define ast_assert(a)
Definition: utils.h:710
#define ao2_unlock(a)
Definition: astobj2.h:730
#define NULL
Definition: resample.c:96
static int route_table_remove(struct route_table *table, struct stasis_message_type *message_type)
#define ao2_lock(a)
Definition: astobj2.h:718

◆ stasis_message_router_set_congestion_limits()

int stasis_message_router_set_congestion_limits ( struct stasis_message_router *  router,
long  low_water,
long  high_water 
)

Set the high and low alert water marks of the stasis message router.

Since
13.10.0
Parameters
router - Pointer to a stasis message router
low_water - New queue low water mark. (-1 to set as 90% of high_water)
high_water - New queue high water mark.
Return values
0 on success.
-1 on error (water marks not changed).

Definition at line 298 of file stasis_message_router.c.

References stasis_subscription_set_congestion_limits(), and stasis_message_router::subscription.

Referenced by create_routes(), load_module(), and manager_subscriptions_init().

300 {
301  int res = -1;
302 
303  if (router) {
305  low_water, high_water);
306  }
307  return res;
308 }
int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription, long low_water, long high_water)
Set the high and low alert water marks of the stasis subscription.
Definition: stasis.c:1013
struct stasis_subscription * subscription

◆ stasis_message_router_set_default()

int stasis_message_router_set_default ( struct stasis_message_router *  router,
stasis_subscription_cb  callback,
void *  data 
)

Sets the default route of a router.

Parameters
router - Router to set the default route of.
callback - Callback to forward messages which otherwise have no home.
data - Data pointer to pass to callback.
Return values
0 on success
-1 on failure
Since
12
Note
Setting a default callback will automatically cause the underlying subscription to receive all messages and not be filtered. If filtering is desired then a specific route for each message type should be provided.

Definition at line 387 of file stasis_message_router.c.

References stasis_message_router_set_formatters_default(), and STASIS_SUBSCRIPTION_FORMATTER_NONE.

Referenced by AST_TEST_DEFINE(), load_module(), and setup_stasis_subs().

390 {
392 
393  /* While this implementation can never fail, it used to be able to */
394  return 0;
395 }
void stasis_message_router_set_formatters_default(struct stasis_message_router *router, stasis_subscription_cb callback, void *data, enum stasis_subscription_message_formatters formatters)
Sets the default route of a router with formatters.

◆ stasis_message_router_set_formatters_default()

void stasis_message_router_set_formatters_default ( struct stasis_message_router *  router,
stasis_subscription_cb  callback,
void *  data,
enum stasis_subscription_message_formatters  formatters 
)

Sets the default route of a router with formatters.

Parameters
router - Router to set the default route of.
callback - Callback to forward messages which otherwise have no home.
data - Data pointer to pass to callback.
formatters - A bitmap of stasis_subscription_message_formatters we wish to receive.
Since
13.26.0
16.3.0
Note
If formatters are specified then the message router will remain in a selective filtering state. Any explicit routes will receive messages of their message type and the default callback will only receive messages that have one of the given formatters. Explicit routes will not be filtered according to the given formatters.

Definition at line 397 of file stasis_message_router.c.

References ao2_lock, ao2_unlock, ast_assert, stasis_message_route::callback, stasis_message_route::data, stasis_message_router::default_route, NULL, stasis_subscription_accept_formatters(), STASIS_SUBSCRIPTION_FILTER_FORCED_NONE, STASIS_SUBSCRIPTION_FORMATTER_NONE, stasis_subscription_set_filter(), and stasis_message_router::subscription.

Referenced by app_create(), manager_subscriptions_init(), and stasis_message_router_set_default().

401 {
402  ast_assert(router != NULL);
403  ast_assert(callback != NULL);
404 
406 
407  ao2_lock(router);
408  router->default_route.callback = callback;
409  router->default_route.data = data;
410  ao2_unlock(router);
411 
412  if (formatters == STASIS_SUBSCRIPTION_FORMATTER_NONE) {
413  /* Formatters govern what messages the default callback get, so it is only if none is
414  * specified that we accept all messages regardless.
415  */
417  }
418 }
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
Definition: stasis.c:1079
#define ast_assert(a)
Definition: utils.h:710
#define ao2_unlock(a)
Definition: astobj2.h:730
#define NULL
Definition: resample.c:96
#define ao2_lock(a)
Definition: astobj2.h:718
void stasis_subscription_accept_formatters(struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters)
Indicate to a subscription that we are interested in messages with one or more formatters.
Definition: stasis.c:1095
struct stasis_message_route default_route
stasis_subscription_cb callback
struct stasis_subscription * subscription

◆ stasis_message_router_unsubscribe()

void stasis_message_router_unsubscribe ( struct stasis_message_router *  router)

Unsubscribe the router from the upstream topic.

Parameters
router - Router to unsubscribe.
Since
12

Definition at line 258 of file stasis_message_router.c.

References ao2_lock, ao2_unlock, stasis_unsubscribe(), and stasis_message_router::subscription.

Referenced by app_shutdown(), ast_endpoint_shutdown(), AST_TEST_DEFINE(), manager_confbridge_shutdown(), meetme_stasis_cleanup(), remove_stasis_subscriptions(), and setup_stasis_subs().

259 {
260  if (!router) {
261  return;
262  }
263 
264  ao2_lock(router);
265  router->subscription = stasis_unsubscribe(router->subscription);
266  ao2_unlock(router);
267 }
#define ao2_unlock(a)
Definition: astobj2.h:730
#define ao2_lock(a)
Definition: astobj2.h:718
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *subscription)
Cancel a subscription.
Definition: stasis.c:973
struct stasis_subscription * subscription

◆ stasis_message_router_unsubscribe_and_join()

void stasis_message_router_unsubscribe_and_join ( struct stasis_message_router *  router)

Unsubscribe the router from the upstream topic, blocking until the final message has been processed.

See stasis_unsubscribe_and_join() for info on when to use this vs. stasis_message_router_unsubscribe().

Parameters
router - Router to unsubscribe.
Since
12

Definition at line 269 of file stasis_message_router.c.

References stasis_unsubscribe_and_join(), and stasis_message_router::subscription.

Referenced by AST_TEST_DEFINE(), cdr_engine_shutdown(), cleanup_module(), destroy_routes(), manager_endpoints_shutdown(), manager_shutdown(), pjsip_outbound_registration_metrics_unload_cb(), and unload_module().

271 {
272  if (!router) {
273  return;
274  }
276 }
struct stasis_subscription * stasis_unsubscribe_and_join(struct stasis_subscription *subscription)
Cancel a subscription, blocking until the last message is processed.
Definition: stasis.c:1136
struct stasis_subscription * subscription