Asterisk - The Open Source Telephony Project  GIT-master-a24979a
stasis_state.c
Go to the documentation of this file.
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2019, Sangoma Technologies Corporation
5  *
6  * Kevin Harwell <kharwell@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 
19 /*** MODULEINFO
20  <support_level>core</support_level>
21  ***/
22 
23 #include "asterisk.h"
24 
25 #include "asterisk/stasis_state.h"
26 
27 /*!
28  * \internal
29  * \brief Used to link a stasis_state to its manager
30  */
33  /*! The manager that owns and handles this state */
35  /*! A unique id for this state object. */
36  char id[0];
37 };
38 
39 /*!
40  * \internal
41  * \brief Associates a stasis topic to its last known published message
42  *
43  * This object's lifetime is tracked by the number of publishers and subscribers to it.
44  * Once all publishers and subscribers have been removed this object is removed from the
45  * manager's collection and destroyed. While a single object type (namely this one) could
46  * be utilized for both publishers, and subscribers this implementation purposely keeps
47  * them separated. This was done to maintain readability, make debugging easier, and allow
48  * for better logging and future enhancements.
49  */
50 struct stasis_state {
51  /*! The number of state subscribers */
52  unsigned int num_subscribers;
53  /*!
54  * \brief The manager that owns and handles this state
55  * \note This reference is owned by stasis_state_proxy
56  */
58  /*! Forwarding information, i.e. this topic to manager's topic */
60  /*! The managed topic */
62  /*! The actual state data */
64  /*!
65  * A container of eids. It's assumed that there is only a single publisher per
66  * eid per topic. Thus the publisher is tracked by the system's eid.
67  */
68  AST_VECTOR(, struct ast_eid) eids;
69  /*! A unique id for this state object. */
70  char *id;
71 };
72 
75 
76 /*! The number of buckets to use for managed states */
77 #define STATE_BUCKETS 57
78 
80  /*! Holds all state objects handled by this manager */
82  /*! The manager's topic. All state topics are forwarded to this one */
84  /*! A collection of manager event handlers */
86 };
87 
88 /*!
89  * \internal
90  * \brief Retrieve a state's topic name without the manager topic.
91  *
92  * State topics have names that consist of the manager's topic name
93  * combined with a unique id separated by a slash. For instance:
94  *
95  * manager topic's name/unique id
96  *
97  * This method retrieves the unique id part from the state's topic name.
98  *
99  * \param manager_topic The manager's topic
100  * \param state_topic A state topic
101  *
102  * \return The state's topic unique id
103  */
104 static const char *state_id_by_topic(struct stasis_topic *manager_topic,
105  const struct stasis_topic *state_topic)
106 {
107  const char *id;
108 
109  /* This topic should always belong to the manager */
111  stasis_topic_name(state_topic)));
112 
113  id = strchr(stasis_topic_name(state_topic), '/');
114 
115  /* The state's unique id should always exist */
116  ast_assert(id != NULL && *(id + 1) != '\0');
117 
118  return (id + 1);
119 }
120 
121 static void state_dtor(void *obj)
122 {
123  struct stasis_state *state = obj;
124 
125  state->forward = stasis_forward_cancel(state->forward);
126  ao2_cleanup(state->topic);
127  state->topic = NULL;
128  ao2_cleanup(state->msg);
129  state->msg = NULL;
130 
131  /* All eids should have been removed */
132  ast_assert(AST_VECTOR_SIZE(&state->eids) == 0);
133  AST_VECTOR_FREE(&state->eids);
134 }
135 
136 static void state_proxy_dtor(void *obj) {
137  struct stasis_state_proxy *proxy = obj;
138 
139  ao2_cleanup(proxy->manager);
140 }
141 
142 static void state_proxy_sub_cb(void *obj, void *data)
143 {
144  struct stasis_state_proxy *proxy = obj;
145 
146  ao2_unlink(proxy->manager->states, proxy);
147 }
148 
149 /*!
150  * \internal
151  * \brief Allocate a stasis state object and add it to the manager.
152  *
153  * Create and initialize a state structure. It's required that either a state
154  * topic, or an id is specified. If a state topic is not given then one will be
155  * created using the given id.
156  *
157  * \param manager The owning manager
158  * \param state_topic A state topic to be managed
159  * \param id The unique id for the state
160  * \param file, line, func
161  *
162  * \return A stasis_state object or NULL
163  * \retval NULL on error
164  *
165  * \pre manager->states must be locked.
166  * \pre manager->states does not contain an object matching key \a id.
167  */
169  struct stasis_topic *state_topic, const char *id,
170  const char *file, int line, const char *func)
171 {
172  struct stasis_state_proxy *proxy = NULL;
173  struct stasis_state *state = NULL;
174 
175  if (!id) {
176  /* If not given an id, then a state topic is required */
177  ast_assert(state_topic != NULL);
178 
179  /* Get the id we'll key off of from the state topic */
180  id = state_id_by_topic(manager->all_topic, state_topic);
181  }
182 
183  state = __ao2_alloc(sizeof(*state), state_dtor, AO2_ALLOC_OPT_LOCK_MUTEX, id, file, line, func);
184  if (!state) {
185  goto error_return;
186  }
187 
188  if (!state_topic) {
189  char *name;
190 
191  /*
192  * To provide further detail and to ensure that the topic is unique within the
193  * scope of the system we prefix it with the manager's topic name, which should
194  * itself already be unique.
195  */
196  if (ast_asprintf(&name, "%s/%s", stasis_topic_name(manager->all_topic), id) < 0) {
197  goto error_return;
198  }
199 
200  state->topic = stasis_topic_create(name);
201 
202  ast_free(name);
203  if (!state->topic) {
204  goto error_return;
205  }
206  } else {
207  /*
208  * Since the state topic was passed in, go ahead and bump its reference.
209  * By doing this here first, it allows us to consistently decrease the reference on
210  * state allocation error.
211  */
212  ao2_ref(state_topic, +1);
213  state->topic = state_topic;
214  }
215 
216  proxy = ao2_t_weakproxy_alloc(sizeof(*proxy) + strlen(id) + 1, state_proxy_dtor, id);
217  if (!proxy) {
218  goto error_return;
219  }
220 
221  strcpy(proxy->id, id); /* Safe */
222 
223  state->id = proxy->id;
224  proxy->manager = ao2_bump(manager);
225  state->manager = proxy->manager; /* state->manager is owned by the proxy */
226 
227  state->forward = stasis_forward_all(state->topic, manager->all_topic);
228  if (!state->forward) {
229  goto error_return;
230  }
231 
232  if (AST_VECTOR_INIT(&state->eids, 2)) {
233  goto error_return;
234  }
235 
236  if (ao2_t_weakproxy_set_object(proxy, state, OBJ_NOLOCK, "weakproxy link")) {
237  goto error_return;
238  }
239 
241  goto error_return;
242  }
243 
244  if (!ao2_link_flags(manager->states, proxy, OBJ_NOLOCK)) {
245  goto error_return;
246  }
247 
248  ao2_ref(proxy, -1);
249 
250  return state;
251 
252 error_return:
253  ast_log(LOG_ERROR, "Unable to allocate state '%s' in manager '%s'\n",
256  ao2_cleanup(proxy);
257  return NULL;
258 }
259 
260 /*!
261  * \internal
262  * \brief Find a state by id, or create one if not found and add it to the manager.
263  *
264  * \param manager The manager to be added to
265  * \param state_topic A state topic to be managed (if NULL id is required)
266  * \param id The unique id for the state (if NULL state_topic is required)
267  *
268  * \return The added state object
269  * \retval NULL on error
270  */
271 #define state_find_or_add(manager, state_topic, id) __state_find_or_add(manager, state_topic, id, __FILE__, __LINE__, __PRETTY_FUNCTION__)
273  struct stasis_topic *state_topic, const char *id,
274  const char *file, int line, const char *func)
275 {
276  struct stasis_state *state;
277 
279  if (ast_strlen_zero(id)) {
280  id = state_id_by_topic(manager->all_topic, state_topic);
281  }
282 
284  if (!state) {
285  state = state_alloc(manager, state_topic, id, file, line, func);
286  }
287 
289 
290  return state;
291 }
292 
293 static void state_manager_dtor(void *obj)
294 {
295  struct stasis_state_manager *manager = obj;
296 
297 #ifdef AO2_DEBUG
298  {
299  char *container_name =
300  ast_alloca(strlen(stasis_topic_name(manager->all_topic)) + strlen("-manager") + 1);
301  sprintf(container_name, "%s-manager", stasis_topic_name(manager->all_topic));
302  ao2_container_unregister(container_name);
303  }
304 #endif
305 
306  ao2_cleanup(manager->states);
307  manager->states = NULL;
308  ao2_cleanup(manager->all_topic);
309  manager->all_topic = NULL;
310  AST_VECTOR_RW_FREE(&manager->observers);
311 }
312 
#ifdef AO2_DEBUG
/*!
 * \internal
 * \brief Print the key of an object held in a manager's states container.
 *
 * The states container links stasis_state_proxy objects rather than
 * stasis_state objects, so the real state has to be resolved through the
 * weak proxy before its topic can be read. This mirrors what
 * handle_stasis_state_proxy does for the same container. Nothing is printed
 * when the proxy no longer points at a live state.
 *
 * \param v_obj The container object (a stasis_state_proxy), may be NULL
 * \param where Opaque destination passed through to \a prnt
 * \param prnt The print function to emit the key with
 */
static void state_prnt_obj(void *v_obj, void *where, ao2_prnt_fn *prnt)
{
	struct stasis_state *state;

	if (!v_obj) {
		return;
	}

	/* Returns a referenced state, or NULL if the state is already gone */
	state = ao2_weakproxy_get_object(v_obj, 0);
	if (!state) {
		return;
	}

	prnt(where, "%s", stasis_topic_name(state->topic));

	/* Balance the reference obtained from the weak proxy */
	ao2_ref(state, -1);
}
#endif
324 
325 struct stasis_state_manager *stasis_state_manager_create(const char *topic_name)
326 {
327  struct stasis_state_manager *manager;
328 
329  manager = ao2_alloc_options(sizeof(*manager), state_manager_dtor,
331  if (!manager) {
332  return NULL;
333  }
334 
336  STATE_BUCKETS, stasis_state_proxy_hash_fn, NULL, stasis_state_proxy_cmp_fn);
337  if (!manager->states) {
338  ao2_ref(manager, -1);
339  return NULL;
340  }
341 
342  manager->all_topic = stasis_topic_create(topic_name);
343  if (!manager->all_topic) {
344  ao2_ref(manager, -1);
345  return NULL;
346  }
347 
348  if (AST_VECTOR_RW_INIT(&manager->observers, 2) != 0) {
349  ao2_ref(manager, -1);
350  return NULL;
351  }
352 
353 #ifdef AO2_DEBUG
354  {
355  char *container_name =
356  ast_alloca(strlen(stasis_topic_name(manager->all_topic)) + strlen("-manager") + 1);
357  sprintf(container_name, "%s-manager", stasis_topic_name(manager->all_topic));
358  ao2_container_register(container_name, manager->states, state_prnt_obj);
359  }
360 #endif
361 
362  return manager;
363 }
364 
366 {
367  return manager->all_topic;
368 }
369 
370 struct stasis_topic *stasis_state_topic(struct stasis_state_manager *manager, const char *id)
371 {
372  struct stasis_topic *topic;
373  struct stasis_state *state;
374 
376  if (!state) {
377  return NULL;
378  }
379 
380  topic = state->topic;
381  ao2_ref(state, -1);
382  return topic;
383 }
384 
386  /*! The stasis state subscribed to */
388  /*! The stasis subscription. */
390 };
391 
392 static void subscriber_dtor(void *obj)
393 {
394  size_t i;
395  struct stasis_state_subscriber *sub = obj;
396  struct stasis_state_manager *manager = sub->state->manager;
397 
398  AST_VECTOR_RW_RDLOCK(&manager->observers);
399  for (i = 0; i < AST_VECTOR_SIZE(&manager->observers); ++i) {
400  if (AST_VECTOR_GET(&manager->observers, i)->on_unsubscribe) {
401  AST_VECTOR_GET(&manager->observers, i)->on_unsubscribe(sub->state->id, sub);
402  }
403  }
404  AST_VECTOR_RW_UNLOCK(&manager->observers);
405 
406  ao2_lock(sub->state);
407  --sub->state->num_subscribers;
408  ao2_unlock(sub->state);
409 
410  ao2_ref(sub->state, -1);
411 }
412 
414  struct stasis_state_manager *manager, const char *id)
415 {
416  size_t i;
419 
420  if (!sub) {
421  ast_log(LOG_ERROR, "Unable to create subscriber to %s/%s\n",
422  stasis_topic_name(manager->all_topic), id);
423  return NULL;
424  }
425 
426  sub->state = state_find_or_add(manager, NULL, id);
427  if (!sub->state) {
428  ao2_ref(sub, -1);
429  return NULL;
430  }
431 
432  ao2_lock(sub->state);
433  ++sub->state->num_subscribers;
434  ao2_unlock(sub->state);
435 
436  AST_VECTOR_RW_RDLOCK(&manager->observers);
437  for (i = 0; i < AST_VECTOR_SIZE(&manager->observers); ++i) {
438  if (AST_VECTOR_GET(&manager->observers, i)->on_subscribe) {
439  AST_VECTOR_GET(&manager->observers, i)->on_subscribe(id, sub);
440  }
441  }
442  AST_VECTOR_RW_UNLOCK(&manager->observers);
443 
444  return sub;
445 }
446 
448  const char *id, stasis_subscription_cb callback, void *data)
449 {
450  struct stasis_topic *topic;
452 
453  if (!sub) {
454  return NULL;
455  }
456 
457  topic = sub->state->topic;
458  ast_debug(3, "Creating stasis state subscription to id '%s'. Topic: '%s':%p %d\n",
459  id, stasis_topic_name(topic), topic, (int)ao2_ref(topic, 0));
460 
461  sub->stasis_sub = stasis_subscribe_pool(topic, callback, data);
462 
463  if (!sub->stasis_sub) {
464  ao2_ref(sub, -1);
465  return NULL;
466  }
467 
468  return sub;
469 }
470 
472 {
473  sub->stasis_sub = stasis_unsubscribe(sub->stasis_sub);
474  ao2_ref(sub, -1);
475  return NULL;
476 }
477 
479 {
480  if (sub) {
481  sub->stasis_sub = stasis_unsubscribe_and_join(sub->stasis_sub);
482  ao2_ref(sub, -1);
483  }
484 
485  return NULL;
486 }
487 
489 {
490  return sub->state->id;
491 }
492 
494 {
495  return sub->state->topic;
496 }
497 
499 {
500  void *res;
501 
502  /*
503  * The data's reference needs to be bumped before returning so it doesn't disappear
504  * for the caller. Lock state, so the underlying message data is not replaced while
505  * retrieving.
506  */
507  ao2_lock(sub->state);
508  res = ao2_bump(stasis_message_data(sub->state->msg));
509  ao2_unlock(sub->state);
510 
511  return res;
512 }
513 
516 {
517  return sub->stasis_sub;
518 }
519 
521  /*! The stasis state to publish to */
523 };
524 
525 static void publisher_dtor(void *obj)
526 {
527  struct stasis_state_publisher *pub = obj;
528 
529  ao2_ref(pub->state, -1);
530 }
531 
533  struct stasis_state_manager *manager, const char *id)
534 {
537 
538  if (!pub) {
539  ast_log(LOG_ERROR, "Unable to create publisher to %s/%s\n",
540  stasis_topic_name(manager->all_topic), id);
541  return NULL;
542  }
543 
544  pub->state = state_find_or_add(manager, NULL, id);
545  if (!pub->state) {
546  ao2_ref(pub, -1);
547  return NULL;
548  }
549 
550  return pub;
551 }
552 
553 const char *stasis_state_publisher_id(const struct stasis_state_publisher *pub)
554 {
555  return pub->state->id;
556 }
557 
559 {
560  return pub->state->topic;
561 }
562 
564 {
565  ao2_lock(pub->state);
566  ao2_replace(pub->state->msg, msg);
567  ao2_unlock(pub->state);
568 
569  stasis_publish(pub->state->topic, msg);
570 }
571 
572 /*!
573  * \internal
574  * \brief Find, or add the given eid to the state object
575  *
576  * Publishers can be tracked implicitly using eids. This allows us to add, and subsequently
577  * remove state objects from the managed states container in a deterministic way. Using the
578  * eids in this way is possible because it's guaranteed that there will only ever be a single
579  * publisher for a uniquely named topic (topics tracked by this module) on a system.
580  *
581  * \note The vector does not use locking. Instead we use the state object for that, so it
582  * needs to be locked prior to calling this method.
583  *
584  * \param state The state object
585  * \param eid The system id to add to the state object
586  */
587 static void state_find_or_add_eid(struct stasis_state *state, const struct ast_eid *eid)
588 {
589  size_t i;
590 
591  if (!eid) {
592  eid = &ast_eid_default;
593  }
594 
595  for (i = 0; i < AST_VECTOR_SIZE(&state->eids); ++i) {
596  if (!ast_eid_cmp(AST_VECTOR_GET_ADDR(&state->eids, i), eid)) {
597  break;
598  }
599  }
600 
601  if (i == AST_VECTOR_SIZE(&state->eids)) {
602  if (!AST_VECTOR_APPEND(&state->eids, *eid)) {
603  /* This ensures state cannot be freed if it has any eids */
604  ao2_ref(state, +1);
605  }
606  }
607 }
608 
609 /*!
610  * \internal
611  * \brief Find, and remove the given eid from the state object
612  *
613  * Used to remove an eid from an implicit publisher.
614  *
615  * \note The vector does not use locking. Instead we use the state object for that, so it
616  * needs to be locked prior to calling this method.
617  *
618  * \param state The state object
619  * \param eid The system id to remove from the state object
620  */
621 static void state_find_and_remove_eid(struct stasis_state *state, const struct ast_eid *eid)
622 {
623  size_t i;
624 
625  if (!eid) {
626  eid = &ast_eid_default;
627  }
628 
629  for (i = 0; i < AST_VECTOR_SIZE(&state->eids); ++i) {
630  if (!ast_eid_cmp(AST_VECTOR_GET_ADDR(&state->eids, i), eid)) {
632  /* Balance the reference from state_find_or_add_eid */
633  ao2_ref(state, -1);
634  return;
635  }
636  }
637 }
638 
639 void stasis_state_publish_by_id(struct stasis_state_manager *manager, const char *id,
640  const struct ast_eid *eid, struct stasis_message *msg)
641 {
642  struct stasis_state *state;
643 
645  if (!state) {
646  return;
647  }
648 
649  ao2_lock(state);
651  ao2_replace(state->msg, msg);
652  ao2_unlock(state);
653 
654  stasis_publish(state->topic, msg);
655 
656  ao2_ref(state, -1);
657 }
658 
660  const char *id, const struct ast_eid *eid, struct stasis_message *msg)
661 {
663 
664  if (!state) {
665  /*
666  * In most circumstances state should already exist here. However, if there is no
667  * state then it can mean one of a few things:
668  *
669  * 1. This function was called prior to an implicit publish for the same given
670  * manager, and id.
671  * 2. This function was called more than once for the same manager, and id.
672  * 3. There is a ref count problem with the explicit subscribers, and publishers.
673  */
674  ast_debug(5, "Attempted to remove state for id '%s', but state not found\n", id);
675  return;
676  }
677 
678  if (msg) {
679  stasis_publish(state->topic, msg);
680  }
681 
682  ao2_lock(state);
684  ao2_unlock(state);
685 
686  ao2_ref(state, -1);
687 }
688 
691 {
692  int res;
693 
694  AST_VECTOR_RW_WRLOCK(&manager->observers);
695  res = AST_VECTOR_APPEND(&manager->observers, observer);
696  AST_VECTOR_RW_UNLOCK(&manager->observers);
697 
698  return res;
699 }
700 
703 {
704  AST_VECTOR_RW_WRLOCK(&manager->observers);
706  AST_VECTOR_RW_UNLOCK(&manager->observers);
707 }
708 
710 {
711  struct stasis_message *msg;
712  int res;
713 
714  /*
715  * State needs to be locked here while we retrieve and bump the reference on its message
716  * object. Doing so guarantees the message object will live throughout its handling.
717  */
718  ao2_lock(state);
719  msg = ao2_bump(state->msg);
720  ao2_unlock(state);
721 
722  res = handler(state->id, msg, data);
723  ao2_cleanup(msg);
724  return res;
725 }
726 
/*!
 * \internal
 * \brief Container callback that runs a handler on a proxy's state.
 *
 * Resolves the real state through the weak proxy and, if one is returned,
 * passes it to handle_stasis_state before releasing the reference obtained
 * from the proxy.
 *
 * \param obj The stasis_state_proxy held in the container
 * \param arg The handler, passed through to handle_stasis_state
 * \param data User data, passed through to handle_stasis_state
 * \param flags Unused
 *
 * \return The result of handle_stasis_state
 * \retval 0 if the proxy has no state object
 */
static int handle_stasis_state_proxy(void *obj, void *arg, void *data, int flags)
{
	struct stasis_state *state;
	int res;

	state = ao2_weakproxy_get_object(obj, 0);
	if (!state) {
		return 0;
	}

	res = handle_stasis_state(state, arg, data);
	ao2_ref(state, -1);

	return res;
}
740 
742  void *data)
743 {
744  ast_assert(handler != NULL);
745 
748 }
749 
750 static int handle_stasis_state_subscribed(void *obj, void *arg, void *data, int flags)
751 {
752  struct stasis_state *state = ao2_weakproxy_get_object(obj, 0);
753  int res = 0;
754 
755  if (state && state->num_subscribers) {
756  res = handle_stasis_state(state, arg, data);
757  }
758 
760 
761  return res;
762 }
763 
765  void *data)
766 {
767  ast_assert(handler != NULL);
768 
771 }
enum queue_result id
Definition: app_queue.c:1640
Asterisk main include file. File version handling, generic pbx functions.
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
Definition: astmm.h:288
#define ast_free(a)
Definition: astmm.h:180
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:267
#define ast_log
Definition: astobj2.c:42
int ao2_weakproxy_subscribe(void *weakproxy, ao2_weakproxy_notification_cb cb, void *data, int flags)
Request notification when weakproxy points to NULL.
Definition: astobj2.c:934
@ AO2_ALLOC_OPT_LOCK_NOLOCK
Definition: astobj2.h:367
@ AO2_ALLOC_OPT_LOCK_MUTEX
Definition: astobj2.h:363
void ao2_container_unregister(const char *name)
Unregister a container for CLI stats and integrity check.
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
#define ao2_callback_data(container, flags, cb_fn, arg, data)
Definition: astobj2.h:1723
#define ao2_unlink(container, obj)
Remove an object from a container.
Definition: astobj2.h:1578
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
Definition: astobj2.h:1554
#define ao2_t_weakproxy_alloc(data_size, destructor_fn, tag)
Definition: astobj2.h:553
#define ao2_unlock(a)
Definition: astobj2.h:729
#define ao2_weakproxy_find(c, arg, flags, tag)
Perform an ao2_find on a container with ao2_weakproxy objects, returning the real object.
Definition: astobj2.h:1748
#define ao2_replace(dst, src)
Replace one object reference with another cleaning up the original.
Definition: astobj2.h:501
#define ao2_lock(a)
Definition: astobj2.h:717
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:404
int ao2_container_register(const char *name, struct ao2_container *self, ao2_prnt_obj_fn *prnt_obj)
Register a container for CLI stats and integrity check.
#define ao2_weakproxy_get_object(weakproxy, flags)
Get the object associated with weakproxy.
Definition: astobj2.h:621
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition: astobj2.h:480
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
Definition: astobj2.h:1063
@ OBJ_NODATA
Definition: astobj2.h:1044
@ OBJ_MULTIPLE
Definition: astobj2.h:1049
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1101
void * __ao2_alloc(size_t data_size, ao2_destructor_fn destructor_fn, unsigned int options, const char *tag, const char *file, int line, const char *func) attribute_warn_unused_result
Definition: astobj2.c:768
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition: astobj2.h:1303
void() ao2_prnt_fn(void *where, const char *fmt,...)
Print output.
Definition: astobj2.h:1435
#define ao2_t_weakproxy_set_object(weakproxy, obj, flags, tag)
Definition: astobj2.h:582
enum sip_cc_notify_state state
Definition: chan_sip.c:966
static const char name[]
Definition: format_mp3.c:68
static struct stasis_topic * manager_topic
A stasis_topic that all topics AMI cares about will be forwarded to.
Definition: manager.c:1497
#define ast_debug(level,...)
Log a DEBUG message.
#define LOG_ERROR
struct stasis_forward * sub
Definition: res_corosync.c:240
struct ast_sorcery_instance_observer observer
#define NULL
Definition: resample.c:96
struct ao2_container * observers
Registered global observers.
Definition: sorcery.c:281
struct stasis_subscription * stasis_unsubscribe_and_join(struct stasis_subscription *subscription)
Cancel a subscription, blocking until the last message is processed.
Definition: stasis.c:1136
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition: stasis.c:1580
void(* stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Callback function type for Stasis subscriptions.
Definition: stasis.h:611
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *subscription)
Cancel a subscription.
Definition: stasis.c:973
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
Definition: stasis.c:1550
#define stasis_subscribe_pool(topic, callback, data)
Definition: stasis.h:680
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
Definition: stasis.c:1513
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition: stasis.c:619
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:629
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
struct stasis_state_subscriber * stasis_state_add_subscriber(struct stasis_state_manager *manager, const char *id)
Add a subscriber to the managed stasis state for the given id.
Definition: stasis_state.c:413
static void state_manager_dtor(void *obj)
Definition: stasis_state.c:293
static void state_proxy_dtor(void *obj)
Definition: stasis_state.c:136
struct stasis_state_manager * stasis_state_manager_create(const char *topic_name)
Create a stasis state manager.
Definition: stasis_state.c:325
static struct stasis_state * state_alloc(struct stasis_state_manager *manager, struct stasis_topic *state_topic, const char *id, const char *file, int line, const char *func)
Definition: stasis_state.c:168
void stasis_state_callback_all(struct stasis_state_manager *manager, on_stasis_state handler, void *data)
For each managed state call the given handler.
Definition: stasis_state.c:741
#define state_find_or_add(manager, state_topic, id)
Definition: stasis_state.c:271
void stasis_state_publish_by_id(struct stasis_state_manager *manager, const char *id, const struct ast_eid *eid, struct stasis_message *msg)
Publish to a managed named by id topic, and add an implicit subscriber.
Definition: stasis_state.c:639
static struct stasis_state * __state_find_or_add(struct stasis_state_manager *manager, struct stasis_topic *state_topic, const char *id, const char *file, int line, const char *func)
Definition: stasis_state.c:272
struct stasis_topic * stasis_state_publisher_topic(struct stasis_state_publisher *pub)
Retrieve the publisher's topic.
Definition: stasis_state.c:558
AO2_STRING_FIELD_CMP_FN(stasis_state_proxy, id)
static int handle_stasis_state_subscribed(void *obj, void *arg, void *data, int flags)
Definition: stasis_state.c:750
void * stasis_state_subscriber_data(struct stasis_state_subscriber *sub)
Retrieve the last known state stasis message payload for the subscriber.
Definition: stasis_state.c:498
struct stasis_topic * stasis_state_topic(struct stasis_state_manager *manager, const char *id)
Retrieve a managed topic creating one if not currently managed.
Definition: stasis_state.c:370
#define STATE_BUCKETS
Definition: stasis_state.c:77
void * stasis_state_unsubscribe(struct stasis_state_subscriber *sub)
Unsubscribe from the stasis topic and stasis state.
Definition: stasis_state.c:471
void stasis_state_remove_observer(struct stasis_state_manager *manager, struct stasis_state_observer *observer)
Remove an observer (will no longer receive managed state related events).
Definition: stasis_state.c:701
const char * stasis_state_subscriber_id(const struct stasis_state_subscriber *sub)
Retrieve the underlying subscribed to state's unique id.
Definition: stasis_state.c:488
static int handle_stasis_state_proxy(void *obj, void *arg, void *data, int flags)
Definition: stasis_state.c:727
int stasis_state_add_observer(struct stasis_state_manager *manager, struct stasis_state_observer *observer)
Add an observer to receive managed state related events.
Definition: stasis_state.c:689
static void state_find_and_remove_eid(struct stasis_state *state, const struct ast_eid *eid)
Definition: stasis_state.c:621
static void state_find_or_add_eid(struct stasis_state *state, const struct ast_eid *eid)
Definition: stasis_state.c:587
void * stasis_state_unsubscribe_and_join(struct stasis_state_subscriber *sub)
Unsubscribe from the stasis topic, block until the final message is received, and then unsubscribe fr...
Definition: stasis_state.c:478
static void state_dtor(void *obj)
Definition: stasis_state.c:121
struct stasis_topic * stasis_state_subscriber_topic(struct stasis_state_subscriber *sub)
Retrieve the subscriber's topic.
Definition: stasis_state.c:493
static void subscriber_dtor(void *obj)
Definition: stasis_state.c:392
static void publisher_dtor(void *obj)
Definition: stasis_state.c:525
static void state_proxy_sub_cb(void *obj, void *data)
Definition: stasis_state.c:142
struct stasis_subscription * stasis_state_subscriber_subscription(struct stasis_state_subscriber *sub)
Retrieve the stasis topic subscription if available.
Definition: stasis_state.c:514
void stasis_state_callback_subscribed(struct stasis_state_manager *manager, on_stasis_state handler, void *data)
For each managed, and explicitly subscribed state call the given handler.
Definition: stasis_state.c:764
const char * stasis_state_publisher_id(const struct stasis_state_publisher *pub)
Retrieve the publisher's underlying state's unique id.
Definition: stasis_state.c:553
static int handle_stasis_state(struct stasis_state *state, on_stasis_state handler, void *data)
Definition: stasis_state.c:709
void stasis_state_remove_publish_by_id(struct stasis_state_manager *manager, const char *id, const struct ast_eid *eid, struct stasis_message *msg)
Publish to a managed named by id topic, and remove an implicit publisher.
Definition: stasis_state.c:659
AO2_STRING_FIELD_HASH_FN(stasis_state_proxy, id)
void stasis_state_publish(struct stasis_state_publisher *pub, struct stasis_message *msg)
Publish to a managed state (topic) using a publisher.
Definition: stasis_state.c:563
struct stasis_state_publisher * stasis_state_add_publisher(struct stasis_state_manager *manager, const char *id)
Add a publisher to the managed state for the given id.
Definition: stasis_state.c:532
struct stasis_topic * stasis_state_all_topic(struct stasis_state_manager *manager)
Retrieve the manager's topic (the topic that all state topics get forwarded to)
Definition: stasis_state.c:365
struct stasis_state_subscriber * stasis_state_subscribe_pool(struct stasis_state_manager *manager, const char *id, stasis_subscription_cb callback, void *data)
Add a subscriber, and subscribe to its underlying stasis topic.
Definition: stasis_state.c:447
static const char * state_id_by_topic(struct stasis_topic *manager_topic, const struct stasis_topic *state_topic)
Definition: stasis_state.c:104
Stasis State API.
int(* on_stasis_state)(const char *id, struct stasis_message *msg, void *user_data)
The delegate called for each managed state.
Definition: stasis_state.h:521
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition: strings.h:65
static int force_inline attribute_pure ast_begins_with(const char *str, const char *prefix)
Checks whether a string begins with another.
Definition: strings.h:97
Generic container type.
An Entity ID is essentially a MAC address, brief and unique.
Definition: utils.h:808
Forwarding information.
Definition: stasis.c:1533
struct ao2_container * states
Definition: stasis_state.c:81
struct stasis_topic * all_topic
Definition: stasis_state.c:83
Managed stasis state event interface.
Definition: stasis_state.h:463
struct stasis_state_manager * manager
Definition: stasis_state.c:34
struct stasis_state * state
Definition: stasis_state.c:522
struct stasis_state * state
Definition: stasis_state.c:387
struct stasis_subscription * stasis_sub
Definition: stasis_state.c:389
struct stasis_topic * topic
Definition: stasis_state.c:61
struct stasis_message * msg
Definition: stasis_state.c:63
struct stasis_state::@426 eids
struct stasis_state_manager * manager
The manager that owns and handles this state.
Definition: stasis_state.c:57
unsigned int num_subscribers
Definition: stasis_state.c:52
struct stasis_forward * forward
Definition: stasis_state.c:59
static void handler(const char *name, int response_code, struct ast_variable *get_params, struct ast_variable *path_vars, struct ast_variable *headers, struct ast_json *body, struct ast_ari_response *response)
Definition: test_ari.c:59
#define ast_assert(a)
Definition: utils.h:734
int ast_eid_cmp(const struct ast_eid *eid1, const struct ast_eid *eid2)
Compare two EIDs.
Definition: main/utils.c:2990
struct ast_eid ast_eid_default
Global EID.
Definition: options.c:93
#define AST_VECTOR_ELEM_CLEANUP_NOOP(elem)
Vector element cleanup that does nothing.
Definition: vector.h:571
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:609
#define AST_VECTOR_RW_WRLOCK(vec)
Obtain write lock on vector.
Definition: vector.h:887
#define AST_VECTOR_RW_UNLOCK(vec)
Unlock vector.
Definition: vector.h:897
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition: vector.h:174
#define AST_VECTOR_REMOVE_ELEM_UNORDERED(vec, elem, cleanup)
Remove an element from a vector.
Definition: vector.h:583
#define AST_VECTOR_RW_FREE(vec)
Deallocates this locked vector.
Definition: vector.h:202
#define AST_VECTOR_REMOVE_UNORDERED(vec, idx)
Remove an element from an unordered vector by index.
Definition: vector.h:438
#define AST_VECTOR_RW(name, type)
Define a vector structure with a read/write lock.
Definition: vector.h:93
#define AST_VECTOR_RW_RDLOCK(vec)
Obtain read lock on vector.
Definition: vector.h:877
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256
#define AST_VECTOR(name, type)
Define a vector structure.
Definition: vector.h:44
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:680
#define AST_VECTOR_GET_ADDR(vec, idx)
Get an address of element in a vector.
Definition: vector.h:668
#define AST_VECTOR_RW_INIT(vec, size)
Initialize a vector with a read/write lock.
Definition: vector.h:158