Asterisk - The Open Source Telephony Project GIT-master-7e7a603
stasis_state.c
Go to the documentation of this file.
1/*
2 * Asterisk -- An open source telephony toolkit.
3 *
4 * Copyright (C) 2019, Sangoma Technologies Corporation
5 *
6 * Kevin Harwell <kharwell@digium.com>
7 *
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
13 *
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
17 */
18
19/*** MODULEINFO
20 <support_level>core</support_level>
21 ***/
22
23#include "asterisk.h"
24
26
27/*!
28 * \internal
29 * \brief Used to link a stasis_state to its manager
30 */
33 /*! The manager that owns and handles this state */
35 /*! A unique id for this state object. */
36 char id[0];
37};
38
39/*!
40 * \internal
41 * \brief Associates a stasis topic to its last known published message
42 *
43 * This object's lifetime is tracked by the number of publishers and subscribers to it.
44 * Once all publishers and subscribers have been removed this object is removed from the
45 * manager's collection and destroyed. While a single object type (namely this one) could
46 * be utilized for both publishers and subscribers, this implementation purposely keeps
47 * them separated. This was done to maintain readability, make debugging easier, and allow
48 * for better logging and future enhancements.
49 */
51 /*! The number of state subscribers */
52 unsigned int num_subscribers;
53 /*!
54 * \brief The manager that owns and handles this state
55 * \note This reference is owned by stasis_state_proxy
56 */
58 /*! Forwarding information, i.e. this topic to manager's topic */
60 /*! The managed topic */
62 /*! The actual state data */
64 /*!
65 * A container of eids. It's assumed that there is only a single publisher per
66 * eid per topic. Thus the publisher is tracked by the system's eid.
67 */
69 /*! A unique id for this state object. */
70 char *id;
71};
72
75
76/*! The number of buckets to use for managed states */
77#define STATE_BUCKETS 57
78
80 /*! Holds all state objects handled by this manager */
82 /*! The manager's topic. All state topics are forwarded to this one */
84 /*! A collection of manager event handlers */
86};
87
88/*!
89 * \internal
90 * \brief Retrieve a state's topic name without the manager topic.
91 *
92 * State topics have names that consist of the manager's topic name
93 * combined with a unique id separated by a slash. For instance:
94 *
95 * manager topic's name/unique id
96 *
97 * This method retrieves the unique id part from the state's topic name.
98 *
99 * \param manager_topic The manager's topic
100 * \param state_topic A state topic
101 *
102 * \return The state's topic unique id
103 */
105 const struct stasis_topic *state_topic)
106{
107 const char *id;
108
109 /* This topic should always belong to the manager */
111 stasis_topic_name(state_topic)));
112
113 id = strchr(stasis_topic_name(state_topic), '/');
114
115 /* The state's unique id should always exist */
116 ast_assert(id != NULL && *(id + 1) != '\0');
117
118 return (id + 1);
119}
120
121static void state_dtor(void *obj)
122{
123 struct stasis_state *state = obj;
124
125 state->forward = stasis_forward_cancel(state->forward);
126 ao2_cleanup(state->topic);
127 state->topic = NULL;
128 ao2_cleanup(state->msg);
129 state->msg = NULL;
130
131 /* All eids should have been removed */
132 ast_assert(AST_VECTOR_SIZE(&state->eids) == 0);
133 AST_VECTOR_FREE(&state->eids);
134}
135
136static void state_proxy_dtor(void *obj) {
137 struct stasis_state_proxy *proxy = obj;
138
139 ao2_cleanup(proxy->manager);
140}
141
142static void state_proxy_sub_cb(void *obj, void *data)
143{
144 struct stasis_state_proxy *proxy = obj;
145
146 ao2_unlink(proxy->manager->states, proxy);
147}
148
149/*!
150 * \internal
151 * \brief Allocate a stasis state object and add it to the manager.
152 *
153 * Create and initialize a state structure. It's required that either a state
154 * topic, or an id is specified. If a state topic is not given then one will be
155 * created using the given id.
156 *
157 * \param manager The owning manager
158 * \param state_topic A state topic to be managed
159 * \param id The unique id for the state
160 * \param file, line, func
161 *
162 * \return A stasis_state object or NULL
163 * \retval NULL on error
164 *
165 * \pre manager->states must be locked.
166 * \pre manager->states does not contain an object matching key \a id.
167 */
169 struct stasis_topic *state_topic, const char *id,
170 const char *file, int line, const char *func)
171{
172 struct stasis_state_proxy *proxy = NULL;
173 struct stasis_state *state = NULL;
174
175 if (!id) {
176 /* If not given an id, then a state topic is required */
177 ast_assert(state_topic != NULL);
178
179 /* Get the id we'll key off of from the state topic */
180 id = state_id_by_topic(manager->all_topic, state_topic);
181 }
182
183 state = __ao2_alloc(sizeof(*state), state_dtor, AO2_ALLOC_OPT_LOCK_MUTEX, id, file, line, func);
184 if (!state) {
185 goto error_return;
186 }
187
188 if (!state_topic) {
189 char *name;
190
191 /*
192 * To provide further detail and to ensure that the topic is unique within the
193 * scope of the system we prefix it with the manager's topic name, which should
194 * itself already be unique.
195 */
196 if (ast_asprintf(&name, "%s/%s", stasis_topic_name(manager->all_topic), id) < 0) {
197 goto error_return;
198 }
199
201
202 ast_free(name);
203 if (!state->topic) {
204 goto error_return;
205 }
206 } else {
207 /*
208 * Since the state topic was passed in, go ahead and bump its reference.
209 * By doing this here first, it allows us to consistently decrease the reference on
210 * state allocation error.
211 */
212 ao2_ref(state_topic, +1);
213 state->topic = state_topic;
214 }
215
216 proxy = ao2_t_weakproxy_alloc(sizeof(*proxy) + strlen(id) + 1, state_proxy_dtor, id);
217 if (!proxy) {
218 goto error_return;
219 }
220
221 strcpy(proxy->id, id); /* Safe */
222
223 state->id = proxy->id;
224 proxy->manager = ao2_bump(manager);
225 state->manager = proxy->manager; /* state->manager is owned by the proxy */
226
227 state->forward = stasis_forward_all(state->topic, manager->all_topic);
228 if (!state->forward) {
229 goto error_return;
230 }
231
232 if (AST_VECTOR_INIT(&state->eids, 2)) {
233 goto error_return;
234 }
235
236 if (ao2_t_weakproxy_set_object(proxy, state, OBJ_NOLOCK, "weakproxy link")) {
237 goto error_return;
238 }
239
241 goto error_return;
242 }
243
244 if (!ao2_link_flags(manager->states, proxy, OBJ_NOLOCK)) {
245 goto error_return;
246 }
247
248 ao2_ref(proxy, -1);
249
250 return state;
251
252error_return:
253 ast_log(LOG_ERROR, "Unable to allocate state '%s' in manager '%s'\n",
256 ao2_cleanup(proxy);
257 return NULL;
258}
259
260/*!
261 * \internal
262 * \brief Find a state by id, or create one if not found and add it to the manager.
263 *
264 * \param manager The manager to be added to
265 * \param state_topic A state topic to be managed (if NULL id is required)
266 * \param id The unique id for the state (if NULL state_topic is required)
267 *
268 * \return The added state object
269 * \retval NULL on error
270 */
271#define state_find_or_add(manager, state_topic, id) __state_find_or_add(manager, state_topic, id, __FILE__, __LINE__, __PRETTY_FUNCTION__)
273 struct stasis_topic *state_topic, const char *id,
274 const char *file, int line, const char *func)
275{
276 struct stasis_state *state;
277
279 if (ast_strlen_zero(id)) {
280 id = state_id_by_topic(manager->all_topic, state_topic);
281 }
282
284 if (!state) {
285 state = state_alloc(manager, state_topic, id, file, line, func);
286 }
287
289
290 return state;
291}
292
293static void state_manager_dtor(void *obj)
294{
295 struct stasis_state_manager *manager = obj;
296
297#ifdef AO2_DEBUG
298 {
299 char *container_name =
300 ast_alloca(strlen(stasis_topic_name(manager->all_topic)) + strlen("-manager") + 1);
301 sprintf(container_name, "%s-manager", stasis_topic_name(manager->all_topic));
302 ao2_container_unregister(container_name);
303 }
304#endif
305
306 ao2_cleanup(manager->states);
307 manager->states = NULL;
308 ao2_cleanup(manager->all_topic);
309 manager->all_topic = NULL;
310 AST_VECTOR_RW_FREE(&manager->observers);
311}
312
313#ifdef AO2_DEBUG
314static void state_prnt_obj(void *v_obj, void *where, ao2_prnt_fn *prnt)
315{
316 struct stasis_state *state = v_obj;
317
318 if (!state) {
319 return;
320 }
321 prnt(where, "%s", stasis_topic_name(state->topic));
322}
323#endif
324
326{
327 struct stasis_state_manager *manager;
328
329 manager = ao2_alloc_options(sizeof(*manager), state_manager_dtor,
331 if (!manager) {
332 return NULL;
333 }
334
336 STATE_BUCKETS, stasis_state_proxy_hash_fn, NULL, stasis_state_proxy_cmp_fn);
337 if (!manager->states) {
338 ao2_ref(manager, -1);
339 return NULL;
340 }
341
342 manager->all_topic = stasis_topic_create(topic_name);
343 if (!manager->all_topic) {
344 ao2_ref(manager, -1);
345 return NULL;
346 }
347
348 if (AST_VECTOR_RW_INIT(&manager->observers, 2) != 0) {
349 ao2_ref(manager, -1);
350 return NULL;
351 }
352
353#ifdef AO2_DEBUG
354 {
355 char *container_name =
356 ast_alloca(strlen(stasis_topic_name(manager->all_topic)) + strlen("-manager") + 1);
357 sprintf(container_name, "%s-manager", stasis_topic_name(manager->all_topic));
358 ao2_container_register(container_name, manager->states, state_prnt_obj);
359 }
360#endif
361
362 return manager;
363}
364
366{
367 return manager->all_topic;
368}
369
370struct stasis_topic *stasis_state_topic(struct stasis_state_manager *manager, const char *id)
371{
372 struct stasis_topic *topic;
373 struct stasis_state *state;
374
376 if (!state) {
377 return NULL;
378 }
379
380 topic = state->topic;
381 ao2_ref(state, -1);
382 return topic;
383}
384
386 /*! The stasis state subscribed to */
388 /*! The stasis subscription. */
390};
391
392static void subscriber_dtor(void *obj)
393{
394 size_t i;
395 struct stasis_state_subscriber *sub = obj;
396 struct stasis_state_manager *manager = sub->state->manager;
397
398 AST_VECTOR_RW_RDLOCK(&manager->observers);
399 for (i = 0; i < AST_VECTOR_SIZE(&manager->observers); ++i) {
400 if (AST_VECTOR_GET(&manager->observers, i)->on_unsubscribe) {
401 AST_VECTOR_GET(&manager->observers, i)->on_unsubscribe(sub->state->id, sub);
402 }
403 }
404 AST_VECTOR_RW_UNLOCK(&manager->observers);
405
406 ao2_lock(sub->state);
407 --sub->state->num_subscribers;
408 ao2_unlock(sub->state);
409
410 ao2_ref(sub->state, -1);
411}
412
414 struct stasis_state_manager *manager, const char *id)
415{
416 size_t i;
419
420 if (!sub) {
421 ast_log(LOG_ERROR, "Unable to create subscriber to %s/%s\n",
422 stasis_topic_name(manager->all_topic), id);
423 return NULL;
424 }
425
426 sub->state = state_find_or_add(manager, NULL, id);
427 if (!sub->state) {
428 ao2_ref(sub, -1);
429 return NULL;
430 }
431
432 ao2_lock(sub->state);
433 ++sub->state->num_subscribers;
434 ao2_unlock(sub->state);
435
436 AST_VECTOR_RW_RDLOCK(&manager->observers);
437 for (i = 0; i < AST_VECTOR_SIZE(&manager->observers); ++i) {
438 if (AST_VECTOR_GET(&manager->observers, i)->on_subscribe) {
439 AST_VECTOR_GET(&manager->observers, i)->on_subscribe(id, sub);
440 }
441 }
442 AST_VECTOR_RW_UNLOCK(&manager->observers);
443
444 return sub;
445}
446
448 const char *id, stasis_subscription_cb callback, void *data)
449{
450 struct stasis_topic *topic;
452
453 if (!sub) {
454 return NULL;
455 }
456
457 topic = sub->state->topic;
458 ast_debug(3, "Creating stasis state subscription to id '%s'. Topic: '%s':%p %d\n",
459 id, stasis_topic_name(topic), topic, (int)ao2_ref(topic, 0));
460
461 sub->stasis_sub = stasis_subscribe_pool(topic, callback, data);
462
463 if (!sub->stasis_sub) {
464 ao2_ref(sub, -1);
465 return NULL;
466 }
467
468 return sub;
469}
470
472{
473 sub->stasis_sub = stasis_unsubscribe(sub->stasis_sub);
474 ao2_ref(sub, -1);
475 return NULL;
476}
477
479{
480 if (sub) {
481 sub->stasis_sub = stasis_unsubscribe_and_join(sub->stasis_sub);
482 ao2_ref(sub, -1);
483 }
484
485 return NULL;
486}
487
489{
490 return sub->state->id;
491}
492
494{
495 return sub->state->topic;
496}
497
499{
500 void *res;
501
502 /*
503 * The data's reference needs to be bumped before returning so it doesn't disappear
504 * for the caller. Lock state, so the underlying message data is not replaced while
505 * retrieving.
506 */
507 ao2_lock(sub->state);
508 res = ao2_bump(stasis_message_data(sub->state->msg));
509 ao2_unlock(sub->state);
510
511 return res;
512}
513
516{
517 return sub->stasis_sub;
518}
519
521 /*! The stasis state to publish to */
523};
524
525static void publisher_dtor(void *obj)
526{
527 struct stasis_state_publisher *pub = obj;
528
529 ao2_ref(pub->state, -1);
530}
531
533 struct stasis_state_manager *manager, const char *id)
534{
537
538 if (!pub) {
539 ast_log(LOG_ERROR, "Unable to create publisher to %s/%s\n",
540 stasis_topic_name(manager->all_topic), id);
541 return NULL;
542 }
543
544 pub->state = state_find_or_add(manager, NULL, id);
545 if (!pub->state) {
546 ao2_ref(pub, -1);
547 return NULL;
548 }
549
550 return pub;
551}
552
554{
555 return pub->state->id;
556}
557
559{
560 return pub->state->topic;
561}
562
564{
565 ao2_lock(pub->state);
566 ao2_replace(pub->state->msg, msg);
567 ao2_unlock(pub->state);
568
569 stasis_publish(pub->state->topic, msg);
570}
571
572/*!
573 * \internal
574 * \brief Find, or add the given eid to the state object
575 *
576 * Publishers can be tracked implicitly using eids. This allows us to add, and subsequently
577 * remove state objects from the managed states container in a deterministic way. Using the
578 * eids in this way is possible because it's guaranteed that there will only ever be a single
579 * publisher for a uniquely named topic (topics tracked by this module) on a system.
580 *
581 * \note The vector does not use locking. Instead we use the state object for that, so it
582 * needs to be locked prior to calling this method.
583 *
584 * \param state The state object
585 * \param eid The system id to add to the state object
586 */
587static void state_find_or_add_eid(struct stasis_state *state, const struct ast_eid *eid)
588{
589 size_t i;
590
591 if (!eid) {
592 eid = &ast_eid_default;
593 }
594
595 for (i = 0; i < AST_VECTOR_SIZE(&state->eids); ++i) {
596 if (!ast_eid_cmp(AST_VECTOR_GET_ADDR(&state->eids, i), eid)) {
597 break;
598 }
599 }
600
601 if (i == AST_VECTOR_SIZE(&state->eids)) {
602 if (!AST_VECTOR_APPEND(&state->eids, *eid)) {
603 /* This ensures state cannot be freed if it has any eids */
604 ao2_ref(state, +1);
605 }
606 }
607}
608
609/*!
610 * \internal
611 * \brief Find, and remove the given eid from the state object
612 *
613 * Used to remove an eid from an implicit publisher.
614 *
615 * \note The vector does not use locking. Instead we use the state object for that, so it
616 * needs to be locked prior to calling this method.
617 *
618 * \param state The state object
619 * \param eid The system id to remove from the state object
620 */
621static void state_find_and_remove_eid(struct stasis_state *state, const struct ast_eid *eid)
622{
623 size_t i;
624
625 if (!eid) {
626 eid = &ast_eid_default;
627 }
628
629 for (i = 0; i < AST_VECTOR_SIZE(&state->eids); ++i) {
630 if (!ast_eid_cmp(AST_VECTOR_GET_ADDR(&state->eids, i), eid)) {
632 /* Balance the reference from state_find_or_add_eid */
633 ao2_ref(state, -1);
634 return;
635 }
636 }
637}
638
639void stasis_state_publish_by_id(struct stasis_state_manager *manager, const char *id,
640 const struct ast_eid *eid, struct stasis_message *msg)
641{
642 struct stasis_state *state;
643
645 if (!state) {
646 return;
647 }
648
651 ao2_replace(state->msg, msg);
653
654 stasis_publish(state->topic, msg);
655
656 ao2_ref(state, -1);
657}
658
660 const char *id, const struct ast_eid *eid, struct stasis_message *msg)
661{
663
664 if (!state) {
665 /*
666 * In most circumstances state should already exist here. However, if there is no
667 * state then it can mean one of a few things:
668 *
669 * 1. This function was called prior to an implicit publish for the same given
670 * manager, and id.
671 * 2. This function was called more than once for the same manager, and id.
672 * 3. There is a ref count problem with the explicit subscribers, and publishers.
673 */
674 ast_debug(5, "Attempted to remove state for id '%s', but state not found\n", id);
675 return;
676 }
677
678 if (msg) {
679 stasis_publish(state->topic, msg);
680 }
681
685
686 ao2_ref(state, -1);
687}
688
691{
692 int res;
693
694 AST_VECTOR_RW_WRLOCK(&manager->observers);
695 res = AST_VECTOR_APPEND(&manager->observers, observer);
696 AST_VECTOR_RW_UNLOCK(&manager->observers);
697
698 return res;
699}
700
703{
704 AST_VECTOR_RW_WRLOCK(&manager->observers);
706 AST_VECTOR_RW_UNLOCK(&manager->observers);
707}
708
710{
711 struct stasis_message *msg;
712 int res;
713
714 /*
715 * State needs to be locked here while we retrieve and bump the reference on its message
716 * object. Doing so guarantees the message object will live throughout its handling.
717 */
719 msg = ao2_bump(state->msg);
721
722 res = handler(state->id, msg, data);
723 ao2_cleanup(msg);
724 return res;
725}
726
727static int handle_stasis_state_proxy(void *obj, void *arg, void *data, int flags)
728{
730
731 if (state) {
732 int res;
733 res = handle_stasis_state(state, arg, data);
734 ao2_ref(state, -1);
735 return res;
736 }
737
738 return 0;
739}
740
742 void *data)
743{
745
748}
749
750static int handle_stasis_state_subscribed(void *obj, void *arg, void *data, int flags)
751{
753 int res = 0;
754
755 if (state && state->num_subscribers) {
756 res = handle_stasis_state(state, arg, data);
757 }
758
760
761 return res;
762}
763
765 void *data)
766{
768
771}
enum queue_result id
Definition: app_queue.c:1638
Asterisk main include file. File version handling, generic pbx functions.
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
Definition: astmm.h:288
#define ast_free(a)
Definition: astmm.h:180
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:267
#define ast_log
Definition: astobj2.c:42
int ao2_weakproxy_subscribe(void *weakproxy, ao2_weakproxy_notification_cb cb, void *data, int flags)
Request notification when weakproxy points to NULL.
Definition: astobj2.c:934
@ AO2_ALLOC_OPT_LOCK_NOLOCK
Definition: astobj2.h:367
@ AO2_ALLOC_OPT_LOCK_MUTEX
Definition: astobj2.h:363
void ao2_container_unregister(const char *name)
Unregister a container for CLI stats and integrity check.
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
#define ao2_callback_data(container, flags, cb_fn, arg, data)
Definition: astobj2.h:1723
#define ao2_unlink(container, obj)
Remove an object from a container.
Definition: astobj2.h:1578
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
Definition: astobj2.h:1554
#define ao2_t_weakproxy_alloc(data_size, destructor_fn, tag)
Definition: astobj2.h:553
#define ao2_unlock(a)
Definition: astobj2.h:729
#define ao2_weakproxy_find(c, arg, flags, tag)
Perform an ao2_find on a container with ao2_weakproxy objects, returning the real object.
Definition: astobj2.h:1748
#define ao2_replace(dst, src)
Replace one object reference with another cleaning up the original.
Definition: astobj2.h:501
#define ao2_lock(a)
Definition: astobj2.h:717
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:404
int ao2_container_register(const char *name, struct ao2_container *self, ao2_prnt_obj_fn *prnt_obj)
Register a container for CLI stats and integrity check.
#define ao2_weakproxy_get_object(weakproxy, flags)
Get the object associated with weakproxy.
Definition: astobj2.h:621
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition: astobj2.h:480
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
Definition: astobj2.h:1063
@ OBJ_NODATA
Definition: astobj2.h:1044
@ OBJ_MULTIPLE
Definition: astobj2.h:1049
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1101
void * __ao2_alloc(size_t data_size, ao2_destructor_fn destructor_fn, unsigned int options, const char *tag, const char *file, int line, const char *func) attribute_warn_unused_result
Definition: astobj2.c:768
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition: astobj2.h:1303
void() ao2_prnt_fn(void *where, const char *fmt,...)
Print output.
Definition: astobj2.h:1435
#define ao2_t_weakproxy_set_object(weakproxy, obj, flags, tag)
Definition: astobj2.h:582
enum cc_state state
Definition: ccss.c:393
static const char name[]
Definition: format_mp3.c:68
static struct stasis_topic * manager_topic
A stasis_topic that all topics AMI cares about will be forwarded to.
Definition: manager.c:1636
#define ast_debug(level,...)
Log a DEBUG message.
#define LOG_ERROR
struct stasis_forward * sub
Definition: res_corosync.c:240
struct ast_sorcery_instance_observer observer
#define NULL
Definition: resample.c:96
struct ao2_container * observers
Registered global observers.
Definition: sorcery.c:281
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:627
void(* stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Callback function type for Stasis subscriptions.
Definition: stasis.h:611
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
Definition: stasis.c:1548
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition: stasis.c:617
#define stasis_subscribe_pool(topic, callback, data)
Definition: stasis.h:680
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
struct stasis_subscription * stasis_unsubscribe_and_join(struct stasis_subscription *subscription)
Cancel a subscription, blocking until the last message is processed.
Definition: stasis.c:1134
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *subscription)
Cancel a subscription.
Definition: stasis.c:971
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition: stasis.c:1578
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
Definition: stasis.c:1511
void * stasis_state_unsubscribe(struct stasis_state_subscriber *sub)
Unsubscribe from the stasis topic and stasis state.
Definition: stasis_state.c:471
static void state_manager_dtor(void *obj)
Definition: stasis_state.c:293
static struct stasis_state * state_alloc(struct stasis_state_manager *manager, struct stasis_topic *state_topic, const char *id, const char *file, int line, const char *func)
Definition: stasis_state.c:168
static void state_proxy_dtor(void *obj)
Definition: stasis_state.c:136
struct stasis_topic * stasis_state_all_topic(struct stasis_state_manager *manager)
Retrieve the manager's topic (the topic that all state topics get forwarded to)
Definition: stasis_state.c:365
const char * stasis_state_subscriber_id(const struct stasis_state_subscriber *sub)
Retrieve the underlying subscribed to state's unique id.
Definition: stasis_state.c:488
void stasis_state_callback_all(struct stasis_state_manager *manager, on_stasis_state handler, void *data)
For each managed state call the given handler.
Definition: stasis_state.c:741
struct stasis_state_subscriber * stasis_state_add_subscriber(struct stasis_state_manager *manager, const char *id)
Add a subscriber to the managed stasis state for the given id.
Definition: stasis_state.c:413
#define state_find_or_add(manager, state_topic, id)
Definition: stasis_state.c:271
struct stasis_state_manager * stasis_state_manager_create(const char *topic_name)
Create a stasis state manager.
Definition: stasis_state.c:325
void stasis_state_publish_by_id(struct stasis_state_manager *manager, const char *id, const struct ast_eid *eid, struct stasis_message *msg)
Publish to a managed named by id topic, and add an implicit subscriber.
Definition: stasis_state.c:639
AO2_STRING_FIELD_CMP_FN(stasis_state_proxy, id)
static int handle_stasis_state_subscribed(void *obj, void *arg, void *data, int flags)
Definition: stasis_state.c:750
#define STATE_BUCKETS
Definition: stasis_state.c:77
void stasis_state_remove_observer(struct stasis_state_manager *manager, struct stasis_state_observer *observer)
Remove an observer (will no longer receive managed state related events).
Definition: stasis_state.c:701
void * stasis_state_unsubscribe_and_join(struct stasis_state_subscriber *sub)
Unsubscribe from the stasis topic, block until the final message is received, and then unsubscribe fr...
Definition: stasis_state.c:478
struct stasis_topic * stasis_state_publisher_topic(struct stasis_state_publisher *pub)
Retrieve the publisher's topic.
Definition: stasis_state.c:558
static int handle_stasis_state_proxy(void *obj, void *arg, void *data, int flags)
Definition: stasis_state.c:727
int stasis_state_add_observer(struct stasis_state_manager *manager, struct stasis_state_observer *observer)
Add an observer to receive managed state related events.
Definition: stasis_state.c:689
static void state_find_and_remove_eid(struct stasis_state *state, const struct ast_eid *eid)
Definition: stasis_state.c:621
static void state_find_or_add_eid(struct stasis_state *state, const struct ast_eid *eid)
Definition: stasis_state.c:587
static void state_dtor(void *obj)
Definition: stasis_state.c:121
const char * stasis_state_publisher_id(const struct stasis_state_publisher *pub)
Retrieve the publisher's underlying state's unique id.
Definition: stasis_state.c:553
struct stasis_subscription * stasis_state_subscriber_subscription(struct stasis_state_subscriber *sub)
Retrieve the stasis topic subscription if available.
Definition: stasis_state.c:514
struct stasis_state_subscriber * stasis_state_subscribe_pool(struct stasis_state_manager *manager, const char *id, stasis_subscription_cb callback, void *data)
Add a subscriber, and subscribe to its underlying stasis topic.
Definition: stasis_state.c:447
static struct stasis_state * __state_find_or_add(struct stasis_state_manager *manager, struct stasis_topic *state_topic, const char *id, const char *file, int line, const char *func)
Definition: stasis_state.c:272
struct stasis_topic * stasis_state_subscriber_topic(struct stasis_state_subscriber *sub)
Retrieve the subscriber's topic.
Definition: stasis_state.c:493
static void subscriber_dtor(void *obj)
Definition: stasis_state.c:392
static void publisher_dtor(void *obj)
Definition: stasis_state.c:525
static void state_proxy_sub_cb(void *obj, void *data)
Definition: stasis_state.c:142
void stasis_state_callback_subscribed(struct stasis_state_manager *manager, on_stasis_state handler, void *data)
For each managed, and explicitly subscribed state call the given handler.
Definition: stasis_state.c:764
static int handle_stasis_state(struct stasis_state *state, on_stasis_state handler, void *data)
Definition: stasis_state.c:709
void stasis_state_remove_publish_by_id(struct stasis_state_manager *manager, const char *id, const struct ast_eid *eid, struct stasis_message *msg)
Publish to a managed named by id topic, and remove an implicit publisher.
Definition: stasis_state.c:659
AO2_STRING_FIELD_HASH_FN(stasis_state_proxy, id)
void stasis_state_publish(struct stasis_state_publisher *pub, struct stasis_message *msg)
Publish to a managed state (topic) using a publisher.
Definition: stasis_state.c:563
struct stasis_topic * stasis_state_topic(struct stasis_state_manager *manager, const char *id)
Retrieve a managed topic creating one if not currently managed.
Definition: stasis_state.c:370
struct stasis_state_publisher * stasis_state_add_publisher(struct stasis_state_manager *manager, const char *id)
Add a publisher to the managed state for the given id.
Definition: stasis_state.c:532
static const char * state_id_by_topic(struct stasis_topic *manager_topic, const struct stasis_topic *state_topic)
Definition: stasis_state.c:104
void * stasis_state_subscriber_data(struct stasis_state_subscriber *sub)
Retrieve the last known state stasis message payload for the subscriber.
Definition: stasis_state.c:498
Stasis State API.
int(* on_stasis_state)(const char *id, struct stasis_message *msg, void *user_data)
The delegate called for each managed state.
Definition: stasis_state.h:521
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition: strings.h:65
static int force_inline attribute_pure ast_begins_with(const char *str, const char *prefix)
Checks whether a string begins with another.
Definition: strings.h:97
Generic container type.
An Entity ID is essentially a MAC address, brief and unique.
Definition: utils.h:813
Forwarding information.
Definition: stasis.c:1531
struct ao2_container * states
Definition: stasis_state.c:81
struct stasis_topic * all_topic
Definition: stasis_state.c:83
Managed stasis state event interface.
Definition: stasis_state.h:463
struct stasis_state_manager * manager
Definition: stasis_state.c:34
struct stasis_state * state
Definition: stasis_state.c:522
struct stasis_state * state
Definition: stasis_state.c:387
struct stasis_subscription * stasis_sub
Definition: stasis_state.c:389
struct stasis_topic * topic
Definition: stasis_state.c:61
struct stasis_message * msg
Definition: stasis_state.c:63
struct stasis_state::@399 eids
struct stasis_state_manager * manager
The manager that owns and handles this state.
Definition: stasis_state.c:57
unsigned int num_subscribers
Definition: stasis_state.c:52
struct stasis_forward * forward
Definition: stasis_state.c:59
static void handler(const char *name, int response_code, struct ast_variable *get_params, struct ast_variable *path_vars, struct ast_variable *headers, struct ast_json *body, struct ast_ari_response *response)
Definition: test_ari.c:59
#define ast_assert(a)
Definition: utils.h:739
int ast_eid_cmp(const struct ast_eid *eid1, const struct ast_eid *eid2)
Compare two EIDs.
Definition: utils.c:3094
struct ast_eid ast_eid_default
Global EID.
Definition: options.c:93
#define AST_VECTOR_ELEM_CLEANUP_NOOP(elem)
Vector element cleanup that does nothing.
Definition: vector.h:571
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:609
#define AST_VECTOR_RW_WRLOCK(vec)
Obtain write lock on vector.
Definition: vector.h:887
#define AST_VECTOR_RW_UNLOCK(vec)
Unlock vector.
Definition: vector.h:897
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition: vector.h:174
#define AST_VECTOR_REMOVE_ELEM_UNORDERED(vec, elem, cleanup)
Remove an element from a vector.
Definition: vector.h:583
#define AST_VECTOR_RW_FREE(vec)
Deallocates this locked vector.
Definition: vector.h:202
#define AST_VECTOR_REMOVE_UNORDERED(vec, idx)
Remove an element from an unordered vector by index.
Definition: vector.h:438
#define AST_VECTOR_RW(name, type)
Define a vector structure with a read/write lock.
Definition: vector.h:93
#define AST_VECTOR_RW_RDLOCK(vec)
Obtain read lock on vector.
Definition: vector.h:877
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256
#define AST_VECTOR(name, type)
Define a vector structure.
Definition: vector.h:44
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:680
#define AST_VECTOR_GET_ADDR(vec, idx)
Get an address of element in a vector.
Definition: vector.h:668
#define AST_VECTOR_RW_INIT(vec, size)
Initialize a vector with a read/write lock.
Definition: vector.h:158