Asterisk - The Open Source Telephony Project GIT-master-67613d1
stasis_message_router.c
Go to the documentation of this file.
1/*
2 * Asterisk -- An open source telephony toolkit.
3 *
4 * Copyright (C) 2013, Digium, Inc.
5 *
6 * David M. Lee, II <dlee@digium.com>
7 *
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
13 *
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
17 */
18
19/*! \file
20 *
21 * \brief Stasis message router implementation.
22 *
23 * \author David M. Lee, II <dlee@digium.com>
24 */
25
26/*** MODULEINFO
27 <support_level>core</support_level>
28 ***/
29
30#include "asterisk.h"
31
32#include "asterisk/astobj2.h"
34#include "asterisk/vector.h"
35
36/*! \internal */
38 /*! Message type handle by this route. */
40 /*! Callback function for incoming message processing. */
42 /*! Data pointer to be handed to the callback. */
43 void *data;
44};
45
47
50{
51 size_t idx;
52 struct stasis_message_route *route;
53
54 /* While a linear search for routes may seem very inefficient, most
55 * route tables have six routes or less. For such small data, it's
56 * hard to beat a linear search. If we start having larger route
57 * tables, then we can look into containers with more efficient
58 * lookups.
59 */
60 for (idx = 0; idx < AST_VECTOR_SIZE(table); ++idx) {
61 route = AST_VECTOR_GET_ADDR(table, idx);
62 if (route->message_type == message_type) {
63 return route;
64 }
65 }
66
67 return NULL;
68}
69
70/*!
71 * \brief route_table comparator for AST_VECTOR_REMOVE_CMP_UNORDERED()
72 *
73 * \param elem Element to compare against
74 * \param value Value to compare with the vector element.
75 *
76 * \return 0 if element does not match.
77 * \return Non-zero if element matches.
78 */
79#define ROUTE_TABLE_ELEM_CMP(elem, value) ((elem).message_type == (value))
80
81/*!
82 * \brief route_table vector element cleanup.
83 *
84 * \param elem Element to cleanup
85 */
86#define ROUTE_TABLE_ELEM_CLEANUP(elem) ao2_cleanup((elem).message_type)
87
90{
93}
94
98{
99 struct stasis_message_route route;
100 int res;
101
104
106 route.callback = callback;
107 route.data = data;
108
109 res = AST_VECTOR_APPEND(table, route);
110 if (res) {
112 }
113 return res;
114}
115
117{
118 size_t idx;
119 struct stasis_message_route *route;
120
121 for (idx = 0; idx < AST_VECTOR_SIZE(table); ++idx) {
122 route = AST_VECTOR_GET_ADDR(table, idx);
124 }
126}
127
128/*! \internal */
130 /*! Subscription to the upstream topic */
132 /*! Subscribed routes */
134 /*! Subscribed routes for \ref stasis_cache_update messages */
136 /*! Route of last resort */
138};
139
140static void router_dtor(void *obj)
141{
142 struct stasis_message_router *router = obj;
143
146
148
151}
152
153static int find_route(
155 struct stasis_message *message,
156 struct stasis_message_route *route_out)
157{
158 struct stasis_message_route *route = NULL;
161
162 ast_assert(route_out != NULL);
163
165 /* Find a cache route */
168 route = route_table_find(&router->cache_routes, update->type);
169 }
170
171 if (route == NULL) {
172 /* Find a regular route */
174 }
175
176 if (route == NULL && router->default_route.callback) {
177 /* Maybe the default route, then? */
178 route = &router->default_route;
179 }
180
181 if (!route) {
182 return -1;
183 }
184
185 *route_out = *route;
186 return 0;
187}
188
189static void router_dispatch(void *data,
190 struct stasis_subscription *sub,
191 struct stasis_message *message)
192{
193 struct stasis_message_router *router = data;
194 struct stasis_message_route route;
195
196 if (find_route(router, message, &route) == 0) {
197 route.callback(route.data, sub, message);
198 }
199
202 }
203}
204
206 struct stasis_topic *topic, int use_thread_pool, const char *file, int lineno,
207 const char *func)
208{
209 int res;
211
213 if (!router) {
214 return NULL;
215 }
216
217 res = 0;
218 res |= AST_VECTOR_INIT(&router->routes, 0);
220 if (res) {
221 ao2_ref(router, -1);
222
223 return NULL;
224 }
225
226 if (use_thread_pool) {
228 } else {
230 }
231
232 if (!router->subscription) {
233 ao2_ref(router, -1);
234
235 return NULL;
236 }
237
238 /* We need to receive subscription change messages so we know when our subscription goes away */
240
241 return router;
242}
243
245 struct stasis_topic *topic, const char *file, int lineno, const char *func)
246{
247 return stasis_message_router_create_internal(topic, 0, file, lineno, func);
248}
249
251 struct stasis_topic *topic, const char *file, int lineno, const char *func)
252{
253 return stasis_message_router_create_internal(topic, 1, file, lineno, func);
254}
255
257{
258 if (!router) {
259 return;
260 }
261
265}
266
269{
270 if (!router) {
271 return;
272 }
274}
275
277{
278 if (!router) {
279 /* Null router is about as done as you can get */
280 return 1;
281 }
282
284}
285
287 struct stasis_message *message)
288{
290
294}
295
297 long low_water, long high_water)
298{
299 int res = -1;
300
301 if (router) {
303 low_water, high_water);
304 }
305 return res;
306}
307
309 struct stasis_message_type *message_type,
310 stasis_subscription_cb callback, void *data)
311{
312 int res;
313
315
316 if (!message_type) {
317 /* Cannot route to NULL type. */
318 return -1;
319 }
321 res = route_table_add(&router->routes, message_type, callback, data);
322 if (!res) {
324 /* Until a specific message type was added we would already drop the message, so being
325 * selective now doesn't harm us. If we have a default route then we are already forced
326 * to filter nothing and messages will come in regardless.
327 */
329 }
331 return res;
332}
333
335 struct stasis_message_type *message_type,
336 stasis_subscription_cb callback, void *data)
337{
338 int res;
339
341
342 if (!message_type) {
343 /* Cannot cache a route to NULL type. */
344 return -1;
345 }
347 res = route_table_add(&router->cache_routes, message_type, callback, data);
348 if (!res) {
351 }
353 return res;
354}
355
357 struct stasis_message_type *message_type)
358{
360
361 if (!message_type) {
362 /* Cannot remove a NULL type. */
363 return;
364 }
366 route_table_remove(&router->routes, message_type);
368}
369
372 struct stasis_message_type *message_type)
373{
375
376 if (!message_type) {
377 /* Cannot remove a NULL type. */
378 return;
379 }
381 route_table_remove(&router->cache_routes, message_type);
383}
384
386 stasis_subscription_cb callback,
387 void *data)
388{
390
391 /* While this implementation can never fail, it used to be able to */
392 return 0;
393}
394
396 stasis_subscription_cb callback,
397 void *data,
399{
401 ast_assert(callback != NULL);
402
404
406 router->default_route.callback = callback;
407 router->default_route.data = data;
409
410 if (formatters == STASIS_SUBSCRIPTION_FORMATTER_NONE) {
411 /* Formatters govern what messages the default callback get, so it is only if none is
412 * specified that we accept all messages regardless.
413 */
415 }
416}
417
420{
422
424
425 return;
426}
ast_mutex_t lock
Definition: app_sla.c:331
Asterisk main include file. File version handling, generic pbx functions.
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
#define ao2_unlock(a)
Definition: astobj2.h:729
#define ao2_lock(a)
Definition: astobj2.h:717
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition: astobj2.h:480
#define ao2_t_alloc(data_size, destructor_fn, debug_msg)
Definition: astobj2.h:407
static char * table
Definition: cdr_odbc.c:55
static const char type[]
Definition: chan_ooh323.c:109
static void update(int code_size, int y, int wi, int fi, int dq, int sr, int dqsez, struct g726_state *state_ptr)
Definition: codec_g726.c:367
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
struct stasis_message_type * stasis_cache_update_type(void)
Message type for cache update messages.
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:604
static struct stasis_message_router * router
struct stasis_forward * sub
Definition: res_corosync.c:240
#define NULL
Definition: resample.c:96
int stasis_subscription_is_done(struct stasis_subscription *subscription)
Returns whether subscription has received its final message.
Definition: stasis.c:1118
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:627
void(* stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Callback function type for Stasis subscriptions.
Definition: stasis.h:611
int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription, long low_water, long high_water)
Set the high and low alert water marks of the stasis subscription.
Definition: stasis.c:1011
@ STASIS_SUBSCRIPTION_FILTER_SELECTIVE
Definition: stasis.h:297
@ STASIS_SUBSCRIPTION_FILTER_FORCED_NONE
Definition: stasis.h:296
void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
Publish a message to a topic's subscribers, synchronizing on the specified subscriber.
Definition: stasis.c:1516
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
Definition: stasis.c:1023
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
Definition: stasis.c:1077
stasis_subscription_message_formatters
Stasis subscription formatter filters.
Definition: stasis.h:308
@ STASIS_SUBSCRIPTION_FORMATTER_NONE
Definition: stasis.h:309
struct stasis_subscription * __stasis_subscribe_pool(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription whose callbacks occur on a thread pool.
Definition: stasis.c:953
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
Definition: stasis.c:1174
struct stasis_subscription * stasis_unsubscribe_and_join(struct stasis_subscription *subscription)
Cancel a subscription, blocking until the last message is processed.
Definition: stasis.c:1134
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *subscription)
Cancel a subscription.
Definition: stasis.c:971
void stasis_subscription_accept_formatters(struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters)
Indicate to a subscription that we are interested in messages with one or more formatters.
Definition: stasis.c:1093
struct stasis_subscription * __stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription.
Definition: stasis.c:942
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
Returns whether a subscription is currently subscribed.
Definition: stasis.c:1150
int stasis_message_router_set_congestion_limits(struct stasis_message_router *router, long low_water, long high_water)
Set the high and low alert water marks of the stasis message router.
int stasis_message_router_is_done(struct stasis_message_router *router)
Returns whether router has received its final message.
static int route_table_add(struct route_table *table, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
void stasis_message_router_remove(struct stasis_message_router *router, struct stasis_message_type *message_type)
Remove a route from a message router.
#define ROUTE_TABLE_ELEM_CLEANUP(elem)
route_table vector element cleanup.
static int find_route(struct stasis_message_router *router, struct stasis_message *message, struct stasis_message_route *route_out)
void stasis_message_router_unsubscribe(struct stasis_message_router *router)
Unsubscribe the router from the upstream topic.
int stasis_message_router_add(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
Add a route to a message router.
void stasis_message_router_unsubscribe_and_join(struct stasis_message_router *router)
Unsubscribe the router from the upstream topic, blocking until the final message has been processed.
static int route_table_remove(struct route_table *table, struct stasis_message_type *message_type)
struct stasis_message_router * __stasis_message_router_create(struct stasis_topic *topic, const char *file, int lineno, const char *func)
int stasis_message_router_add_cache_update(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
Add a route for stasis_cache_update messages to a message router.
void stasis_message_router_remove_cache_update(struct stasis_message_router *router, struct stasis_message_type *message_type)
Remove a cache route from a message router.
static void router_dispatch(void *data, struct stasis_subscription *sub, struct stasis_message *message)
void stasis_message_router_accept_formatters(struct stasis_message_router *router, enum stasis_subscription_message_formatters formatters)
Indicate to a message router that we are interested in messages with one or more formatters.
static struct stasis_message_router * stasis_message_router_create_internal(struct stasis_topic *topic, int use_thread_pool, const char *file, int lineno, const char *func)
int stasis_message_router_set_default(struct stasis_message_router *router, stasis_subscription_cb callback, void *data)
Sets the default route of a router.
struct stasis_message_router * __stasis_message_router_create_pool(struct stasis_topic *topic, const char *file, int lineno, const char *func)
#define ROUTE_TABLE_ELEM_CMP(elem, value)
route_table comparator for AST_VECTOR_REMOVE_CMP_UNORDERED()
static struct stasis_message_route * route_table_find(struct route_table *table, struct stasis_message_type *message_type)
static void router_dtor(void *obj)
void stasis_message_router_set_formatters_default(struct stasis_message_router *router, stasis_subscription_cb callback, void *data, enum stasis_subscription_message_formatters formatters)
Sets the default route of a router with formatters.
static void route_table_dtor(struct route_table *table)
void stasis_message_router_publish_sync(struct stasis_message_router *router, struct stasis_message *message)
Publish a message to a message router's subscription synchronously.
Cache update message.
Definition: stasis.h:965
stasis_subscription_cb callback
struct stasis_message_type * message_type
struct stasis_subscription * subscription
struct route_table cache_routes
struct stasis_message_route default_route
#define ast_assert(a)
Definition: utils.h:739
Vector container support.
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:609
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition: vector.h:174
#define AST_VECTOR_REMOVE_CMP_UNORDERED(vec, value, cmp, cleanup)
Remove an element from a vector that matches the given comparison.
Definition: vector.h:488
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256
#define AST_VECTOR(name, type)
Define a vector structure.
Definition: vector.h:44
#define AST_VECTOR_GET_ADDR(vec, idx)
Get an address of element in a vector.
Definition: vector.h:668