Asterisk - The Open Source Telephony Project GIT-master-d856a3e
stasis_cache.c
Go to the documentation of this file.
1/*
2 * Asterisk -- An open source telephony toolkit.
3 *
4 * Copyright (C) 2013, Digium, Inc.
5 *
6 * David M. Lee, II <dlee@digium.com>
7 *
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
13 *
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
17 */
18
19/*! \file
20 *
21 * \brief Stasis Message API.
22 *
23 * \author David M. Lee, II <dlee@digium.com>
24 */
25
26/*** MODULEINFO
27 <support_level>core</support_level>
28 ***/
29
30#include "asterisk.h"
31
32#include "asterisk/astobj2.h"
33#include "asterisk/hashtab.h"
35#include "asterisk/stasis.h"
36#include "asterisk/utils.h"
37#include "asterisk/vector.h"
38
39#ifdef LOW_MEMORY
40#define NUM_CACHE_BUCKETS 17
41#else
42#define NUM_CACHE_BUCKETS 563
43#endif
44
45/*! \internal */
52};
53
54/*! \internal */
60};
61
62static void stasis_caching_topic_dtor(void *obj)
63{
64 struct stasis_caching_topic *caching_topic = obj;
65
66 /* Caching topics contain subscriptions, and must be manually
67 * unsubscribed. */
69 /* If there are any messages in flight to this subscription; that would
70 * be bad. */
72
74
75 ao2_cleanup(caching_topic->sub);
76 caching_topic->sub = NULL;
77 ao2_cleanup(caching_topic->cache);
78 caching_topic->cache = NULL;
79 ao2_cleanup(caching_topic->topic);
80 caching_topic->topic = NULL;
81 ao2_cleanup(caching_topic->original_topic);
82 caching_topic->original_topic = NULL;
83}
84
86{
87 return caching_topic->topic;
88}
89
92{
93 int res;
94
95 if (!caching_topic) {
96 return -1;
97 }
98
99 /* We wait to accept the stasis specific message types until now so that by default everything
100 * will flow to us.
101 */
104 res |= stasis_subscription_accept_message_type(caching_topic->sub, type);
105
106 return res;
107}
108
111{
112 if (!caching_topic) {
113 return -1;
114 }
115 return stasis_subscription_set_filter(caching_topic->sub, filter);
116}
117
118
120{
121 if (!caching_topic) {
122 return NULL;
123 }
124
125 /*
126 * The subscription may hold the last reference to this caching
127 * topic, but we want to make sure the unsubscribe finishes
128 * before kicking of the caching topic's dtor.
129 */
130 ao2_ref(caching_topic, +1);
131
132 if (stasis_subscription_is_subscribed(caching_topic->sub)) {
133 /*
134 * Increment the reference to hold on to it past the
135 * unsubscribe. Will be cleaned up in dtor.
136 */
137 ao2_ref(caching_topic->sub, +1);
138 stasis_unsubscribe(caching_topic->sub);
139 } else {
140 ast_log(LOG_ERROR, "stasis_caching_topic unsubscribed multiple times\n");
141 }
142 ao2_cleanup(caching_topic);
143 return NULL;
144}
145
147{
148 if (!caching_topic) {
149 return NULL;
150 }
151
152 /* Hold a ref past the unsubscribe */
153 ao2_ref(caching_topic, +1);
154 stasis_caching_unsubscribe(caching_topic);
155 stasis_subscription_join(caching_topic->sub);
156 ao2_cleanup(caching_topic);
157 return NULL;
158}
159
160/*!
161 * \brief The key for an entry in the cache
162 * \note The items in this struct must be immutable for the item in the cache
163 */
165 /*! The message type of the item stored in the cache */
167 /*! The unique ID of the item stored in the cache */
168 const char *id;
169 /*! The hash, computed from \c type and \c id */
170 unsigned int hash;
171};
172
175 /*! Aggregate snapshot of the stasis cache. */
177 /*! Local entity snapshot of the stasis event. */
179 /*! Remote entity snapshots of the stasis event. */
181};
182
183static void cache_entry_dtor(void *obj)
184{
185 struct stasis_cache_entry *entry = obj;
186 size_t idx;
187
188 entry->key.type = NULL;
189 ast_free((char *) entry->key.id);
190 entry->key.id = NULL;
191
192 ao2_cleanup(entry->aggregate);
193 entry->aggregate = NULL;
194 ao2_cleanup(entry->local);
195 entry->local = NULL;
196
197 for (idx = 0; idx < AST_VECTOR_SIZE(&entry->remote); ++idx) {
198 struct stasis_message *remote;
199
200 remote = AST_VECTOR_GET(&entry->remote, idx);
201 ao2_cleanup(remote);
202 }
203 AST_VECTOR_FREE(&entry->remote);
204}
205
207{
209 key->hash += ast_hashtab_hash_string(key->id);
210}
211
212static struct stasis_cache_entry *cache_entry_create(struct stasis_message_type *type, const char *id, struct stasis_message *snapshot)
213{
215 int is_remote;
216
217 ast_assert(id != NULL);
218 ast_assert(snapshot != NULL);
219
220 if (!type) {
221 return NULL;
222 }
223
226 if (!entry) {
227 return NULL;
228 }
229
230 entry->key.id = ast_strdup(id);
231 if (!entry->key.id) {
233 return NULL;
234 }
235 /*
236 * Normal ao2 ref counting rules says we should increment the message
237 * type ref here and decrement it in cache_entry_dtor(). However, the
238 * stasis message snapshot is cached here, will always have the same type
239 * as the cache entry, and can legitimately cause the type ref count to
240 * hit the excessive ref count assertion. Since the cache entry will
241 * always have a snapshot we can get away with not holding a ref here.
242 */
243 ast_assert(type == stasis_message_type(snapshot));
244 entry->key.type = type;
246
247 is_remote = ast_eid_cmp(&ast_eid_default, stasis_message_eid(snapshot)) ? 1 : 0;
248 if (AST_VECTOR_INIT(&entry->remote, is_remote)) {
250 return NULL;
251 }
252
253 if (is_remote) {
254 if (AST_VECTOR_APPEND(&entry->remote, snapshot)) {
256 return NULL;
257 }
258 } else {
259 entry->local = snapshot;
260 }
261 ao2_bump(snapshot);
262
263 return entry;
264}
265
266static int cache_entry_hash(const void *obj, int flags)
267{
268 const struct stasis_cache_entry *object;
269 const struct cache_entry_key *key;
270
271 switch (flags & OBJ_SEARCH_MASK) {
272 case OBJ_SEARCH_KEY:
273 key = obj;
274 break;
276 object = obj;
277 key = &object->key;
278 break;
279 default:
280 /* Hash can only work on something with a full key. */
281 ast_assert(0);
282 return 0;
283 }
284
285 return (int)key->hash;
286}
287
288static int cache_entry_cmp(void *obj, void *arg, int flags)
289{
290 const struct stasis_cache_entry *object_left = obj;
291 const struct stasis_cache_entry *object_right = arg;
292 const struct cache_entry_key *right_key = arg;
293 int cmp;
294
295 switch (flags & OBJ_SEARCH_MASK) {
297 right_key = &object_right->key;
298 /* Fall through */
299 case OBJ_SEARCH_KEY:
300 cmp = object_left->key.type != right_key->type
301 || strcmp(object_left->key.id, right_key->id);
302 break;
304 /* Not supported by container */
305 ast_assert(0);
306 cmp = -1;
307 break;
308 default:
309 /*
310 * What arg points to is specific to this traversal callback
311 * and has no special meaning to astobj2.
312 */
313 cmp = 0;
314 break;
315 }
316 if (cmp) {
317 return 0;
318 }
319 /*
320 * At this point the traversal callback is identical to a sorted
321 * container.
322 */
323 return CMP_MATCH;
324}
325
326static void cache_dtor(void *obj)
327{
328 struct stasis_cache *cache = obj;
329
330 ao2_cleanup(cache->entries);
331 cache->entries = NULL;
332}
333
337{
338 struct stasis_cache *cache;
339
342 if (!cache) {
343 return NULL;
344 }
345
348 if (!cache->entries) {
350 return NULL;
351 }
352
353 cache->id_fn = id_fn;
354 cache->aggregate_calc_fn = aggregate_calc_fn;
355 cache->aggregate_publish_fn = aggregate_publish_fn;
356
357 return cache;
358}
359
361{
363}
364
366{
367 return entry->aggregate;
368}
369
371{
372 return entry->local;
373}
374
376{
377 if (idx < AST_VECTOR_SIZE(&entry->remote)) {
378 return AST_VECTOR_GET(&entry->remote, idx);
379 }
380 return NULL;
381}
382
383/*!
384 * \internal
385 * \brief Find the cache entry in the cache entries container.
386 *
387 * \param entries Container of cached entries.
388 * \param type Type of message to retrieve the cache entry.
389 * \param id Identity of the snapshot to retrieve the cache entry.
390 *
391 * \note The entries container is already locked.
392 *
393 * \return Cache-entry on success.
394 * \retval NULL Not in cache.
395 */
396static struct stasis_cache_entry *cache_find(struct ao2_container *entries, struct stasis_message_type *type, const char *id)
397{
398 struct cache_entry_key search_key;
400
401 search_key.type = type;
402 search_key.id = id;
403 cache_entry_compute_hash(&search_key);
404 entry = ao2_find(entries, &search_key, OBJ_SEARCH_KEY | OBJ_NOLOCK);
405
406 /* Ensure that what we looked for is what we found. */
408 || (!strcmp(stasis_message_type_name(entry->key.type),
409 stasis_message_type_name(type)) && !strcmp(entry->key.id, id)));
410 return entry;
411}
412
413/*!
414 * \internal
415 * \brief Remove the stasis snapshot in the cache entry determined by eid.
416 *
417 * \param entries Container of cached entries.
418 * \param cached_entry The entry to remove the snapshot from.
419 * \param eid Which snapshot in the cached entry.
420 *
421 * \note The entries container is already locked.
422 *
423 * \return Previous stasis entry snapshot.
424 */
425static struct stasis_message *cache_remove(struct ao2_container *entries, struct stasis_cache_entry *cached_entry, const struct ast_eid *eid)
426{
427 struct stasis_message *old_snapshot;
428 int is_remote;
429
430 is_remote = ast_eid_cmp(eid, &ast_eid_default);
431 if (!is_remote) {
432 old_snapshot = cached_entry->local;
433 cached_entry->local = NULL;
434 } else {
435 int idx;
436
437 old_snapshot = NULL;
438 for (idx = 0; idx < AST_VECTOR_SIZE(&cached_entry->remote); ++idx) {
439 struct stasis_message *cur;
440
441 cur = AST_VECTOR_GET(&cached_entry->remote, idx);
442 if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
443 old_snapshot = AST_VECTOR_REMOVE_UNORDERED(&cached_entry->remote, idx);
444 break;
445 }
446 }
447 }
448
449 if (!cached_entry->local && !AST_VECTOR_SIZE(&cached_entry->remote)) {
450 ao2_unlink_flags(entries, cached_entry, OBJ_NOLOCK);
451 }
452
453 return old_snapshot;
454}
455
456/*!
457 * \internal
458 * \brief Update the stasis snapshot in the cache entry determined by eid.
459 *
460 * \param cached_entry The entry to remove the snapshot from.
461 * \param eid Which snapshot in the cached entry.
462 * \param new_snapshot Snapshot to replace the old snapshot.
463 *
464 * \return Previous stasis entry snapshot.
465 */
466static struct stasis_message *cache_update(struct stasis_cache_entry *cached_entry, const struct ast_eid *eid, struct stasis_message *new_snapshot)
467{
468 struct stasis_message *old_snapshot;
469 int is_remote;
470 int idx;
471
472 is_remote = ast_eid_cmp(eid, &ast_eid_default);
473 if (!is_remote) {
474 old_snapshot = cached_entry->local;
475 cached_entry->local = ao2_bump(new_snapshot);
476 return old_snapshot;
477 }
478
479 old_snapshot = NULL;
480 for (idx = 0; idx < AST_VECTOR_SIZE(&cached_entry->remote); ++idx) {
481 struct stasis_message *cur;
482
483 cur = AST_VECTOR_GET(&cached_entry->remote, idx);
484 if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
485 old_snapshot = AST_VECTOR_REMOVE_UNORDERED(&cached_entry->remote, idx);
486 break;
487 }
488 }
489 if (!AST_VECTOR_APPEND(&cached_entry->remote, new_snapshot)) {
490 ao2_bump(new_snapshot);
491 }
492
493 return old_snapshot;
494}
495
497 /*! Old cache eid snapshot. */
499 /*! Old cache aggregate snapshot. */
501 /*! New cache aggregate snapshot. */
503};
504
506 struct stasis_message_type *type, const char *id, const struct ast_eid *eid,
507 struct stasis_message *new_snapshot)
508{
509 struct stasis_cache_entry *cached_entry;
510 struct cache_put_snapshots snapshots;
511
512 ast_assert(cache->entries != NULL);
513 ast_assert(eid != NULL);/* Aggregate snapshots not allowed to be put directly. */
514 ast_assert(new_snapshot == NULL ||
515 type == stasis_message_type(new_snapshot));
516
517 memset(&snapshots, 0, sizeof(snapshots));
518
519 ao2_wrlock(cache->entries);
520
521 cached_entry = cache_find(cache->entries, type, id);
522
523 /* Update the eid snapshot. */
524 if (!new_snapshot) {
525 /* Remove snapshot from cache */
526 if (cached_entry) {
527 snapshots.old = cache_remove(cache->entries, cached_entry, eid);
528 }
529 } else if (cached_entry) {
530 /* Update snapshot in cache */
531 snapshots.old = cache_update(cached_entry, eid, new_snapshot);
532 } else {
533 /* Insert into the cache */
534 cached_entry = cache_entry_create(type, id, new_snapshot);
535 if (cached_entry) {
536 ao2_link_flags(cache->entries, cached_entry, OBJ_NOLOCK);
537 }
538 }
539
540 /* Update the aggregate snapshot. */
541 if (cache->aggregate_calc_fn && cached_entry) {
542 snapshots.aggregate_new = cache->aggregate_calc_fn(cached_entry, new_snapshot);
543 snapshots.aggregate_old = cached_entry->aggregate;
544 cached_entry->aggregate = ao2_bump(snapshots.aggregate_new);
545 }
546
547 ao2_unlock(cache->entries);
548
549 ao2_cleanup(cached_entry);
550 return snapshots;
551}
552
553/*!
554 * \internal
555 * \brief Dump all entity snapshots in the cache entry into the given container.
556 *
557 * \param snapshots Container to put all snapshots in the cache entry.
558 * \param entry Cache entry to use.
559 *
560 * \retval 0 on success.
561 * \retval non-zero on error.
562 */
563static int cache_entry_dump(struct ao2_container *snapshots, const struct stasis_cache_entry *entry)
564{
565 int idx;
566 int err = 0;
567
568 ast_assert(snapshots != NULL);
570
571 /* The aggregate snapshot is not a snapshot from an entity. */
572
573 if (entry->local) {
574 err |= !ao2_link(snapshots, entry->local);
575 }
576
577 for (idx = 0; !err && idx < AST_VECTOR_SIZE(&entry->remote); ++idx) {
578 struct stasis_message *snapshot;
579
580 snapshot = AST_VECTOR_GET(&entry->remote, idx);
581 err |= !ao2_link(snapshots, snapshot);
582 }
583
584 return err;
585}
586
588{
589 struct stasis_cache_entry *cached_entry;
590 struct ao2_container *found;
591
593 ast_assert(cache->entries != NULL);
594 ast_assert(id != NULL);
595
596 if (!type) {
597 return NULL;
598 }
599
601 if (!found) {
602 return NULL;
603 }
604
605 ao2_rdlock(cache->entries);
606
607 cached_entry = cache_find(cache->entries, type, id);
608 if (cached_entry && cache_entry_dump(found, cached_entry)) {
609 ao2_cleanup(found);
610 found = NULL;
611 }
612
613 ao2_unlock(cache->entries);
614
615 ao2_cleanup(cached_entry);
616 return found;
617}
618
619/*!
620 * \internal
621 * \brief Retrieve an item from the cache entry for a specific eid.
622 *
623 * \param entry Cache entry to use.
624 * \param eid Specific entity id to retrieve. NULL for aggregate.
625 *
626 * \note The returned snapshot has not had its reference bumped.
627 *
628 * \return Snapshot from the cache.
629 * \retval NULL if snapshot is not found.
630 */
631static struct stasis_message *cache_entry_by_eid(const struct stasis_cache_entry *entry, const struct ast_eid *eid)
632{
633 int is_remote;
634 int idx;
635
636 if (!eid) {
637 /* Get aggregate. */
638 return entry->aggregate;
639 }
640
641 /* Get snapshot with specific eid. */
642 is_remote = ast_eid_cmp(eid, &ast_eid_default);
643 if (!is_remote) {
644 return entry->local;
645 }
646
647 for (idx = 0; idx < AST_VECTOR_SIZE(&entry->remote); ++idx) {
648 struct stasis_message *cur;
649
650 cur = AST_VECTOR_GET(&entry->remote, idx);
651 if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
652 return cur;
653 }
654 }
655
656 return NULL;
657}
658
659struct stasis_message *stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid)
660{
661 struct stasis_cache_entry *cached_entry;
662 struct stasis_message *snapshot = NULL;
663
665 ast_assert(cache->entries != NULL);
666 ast_assert(id != NULL);
667
668 if (!type) {
669 return NULL;
670 }
671
672 ao2_rdlock(cache->entries);
673
674 cached_entry = cache_find(cache->entries, type, id);
675 if (cached_entry) {
676 snapshot = cache_entry_by_eid(cached_entry, eid);
677 ao2_bump(snapshot);
678 }
679
680 ao2_unlock(cache->entries);
681
682 ao2_cleanup(cached_entry);
683 return snapshot;
684}
685
687{
689}
690
694 const struct ast_eid *eid;
695};
696
697static int cache_dump_by_eid_cb(void *obj, void *arg, int flags)
698{
699 struct cache_dump_data *cache_dump = arg;
700 struct stasis_cache_entry *entry = obj;
701
702 if (!cache_dump->type || entry->key.type == cache_dump->type) {
703 struct stasis_message *snapshot;
704
705 snapshot = cache_entry_by_eid(entry, cache_dump->eid);
706 if (snapshot) {
707 if (!ao2_link(cache_dump->container, snapshot)) {
708 ao2_cleanup(cache_dump->container);
709 cache_dump->container = NULL;
710 return CMP_STOP;
711 }
712 }
713 }
714
715 return 0;
716}
717
719{
720 struct cache_dump_data cache_dump;
721
723 ast_assert(cache->entries != NULL);
724
725 cache_dump.eid = eid;
726 cache_dump.type = type;
728 if (!cache_dump.container) {
729 return NULL;
730 }
731
733 return cache_dump.container;
734}
735
737{
739}
740
741static int cache_dump_all_cb(void *obj, void *arg, int flags)
742{
743 struct cache_dump_data *cache_dump = arg;
744 struct stasis_cache_entry *entry = obj;
745
746 if (!cache_dump->type || entry->key.type == cache_dump->type) {
747 if (cache_entry_dump(cache_dump->container, entry)) {
748 ao2_cleanup(cache_dump->container);
749 cache_dump->container = NULL;
750 return CMP_STOP;
751 }
752 }
753
754 return 0;
755}
756
758{
759 struct cache_dump_data cache_dump;
760
762 ast_assert(cache->entries != NULL);
763
764 cache_dump.eid = NULL;
765 cache_dump.type = type;
767 if (!cache_dump.container) {
768 return NULL;
769 }
770
772 return cache_dump.container;
773}
774
777
779{
781}
782
783static void stasis_cache_update_dtor(void *obj)
784{
785 struct stasis_cache_update *update = obj;
786
787 ao2_cleanup(update->old_snapshot);
788 update->old_snapshot = NULL;
789 ao2_cleanup(update->new_snapshot);
790 update->new_snapshot = NULL;
791 ao2_cleanup(update->type);
792 update->type = NULL;
793}
794
795static struct stasis_message *update_create(struct stasis_message *old_snapshot, struct stasis_message *new_snapshot)
796{
798 struct stasis_message *msg;
799
800 ast_assert(old_snapshot != NULL || new_snapshot != NULL);
801
803 return NULL;
804 }
805
808 if (!update) {
809 return NULL;
810 }
811
812 if (old_snapshot) {
813 ao2_ref(old_snapshot, +1);
814 update->old_snapshot = old_snapshot;
815 if (!new_snapshot) {
816 ao2_ref(stasis_message_type(old_snapshot), +1);
817 update->type = stasis_message_type(old_snapshot);
818 }
819 }
820 if (new_snapshot) {
821 ao2_ref(new_snapshot, +1);
822 update->new_snapshot = new_snapshot;
823 ao2_ref(stasis_message_type(new_snapshot), +1);
824 update->type = stasis_message_type(new_snapshot);
825 }
826
828
830 return msg;
831}
832
834 struct stasis_message *message)
835{
836 struct stasis_caching_topic *caching_topic_needs_unref;
837 struct stasis_caching_topic *caching_topic = data;
838 struct stasis_message *msg;
839 struct stasis_message *msg_put;
840 struct stasis_message_type *msg_type;
841 const struct ast_eid *msg_eid;
842 const char *msg_id;
843
844 ast_assert(caching_topic != NULL);
845 ast_assert(caching_topic->topic != NULL);
846 ast_assert(caching_topic->cache != NULL);
847 ast_assert(caching_topic->cache->id_fn != NULL);
848
850 caching_topic_needs_unref = caching_topic;
851 } else {
852 caching_topic_needs_unref = NULL;
853 }
854
855 msg_type = stasis_message_type(message);
856
857 if (stasis_subscription_change_type() == msg_type) {
859
860 /*
861 * If this change type is an unsubscribe, we need to find the original
862 * subscribe and remove it from the cache otherwise the cache will
863 * continue to grow unabated.
864 */
865 if (strcmp(change->description, "Unsubscribe") == 0) {
866 struct stasis_cache_entry *cached_sub;
867
868 ao2_wrlock(caching_topic->cache->entries);
869 cached_sub = cache_find(caching_topic->cache->entries, stasis_subscription_change_type(), change->uniqueid);
870 if (cached_sub) {
871 ao2_cleanup(cache_remove(caching_topic->cache->entries, cached_sub, stasis_message_eid(message)));
872 ao2_cleanup(cached_sub);
873 }
874 ao2_unlock(caching_topic->cache->entries);
875 ao2_cleanup(caching_topic_needs_unref);
876 return;
877 }
878 msg_put = message;
879 msg = message;
880 } else if (stasis_cache_clear_type() == msg_type) {
881 /* Cache clear event. */
882 msg_put = NULL;
884 msg_type = stasis_message_type(msg);
885 } else {
886 /* Normal cache update event. */
887 msg_put = message;
888 msg = message;
889 }
890 ast_assert(msg_type != NULL);
891
892 msg_eid = stasis_message_eid(msg);/* msg_eid is NULL for aggregate message. */
893 msg_id = caching_topic->cache->id_fn(msg);
894 if (msg_id && msg_eid) {
895 struct stasis_message *update;
896 struct cache_put_snapshots snapshots;
897
898 /* Update the cache */
899 snapshots = cache_put(caching_topic->cache, msg_type, msg_id, msg_eid, msg_put);
900 if (snapshots.old || msg_put) {
901 if (stasis_topic_subscribers(caching_topic->topic)) {
902 update = update_create(snapshots.old, msg_put);
903 if (update) {
904 stasis_publish(caching_topic->topic, update);
905 ao2_ref(update, -1);
906 }
907 }
908 } else {
909 ast_debug(1,
910 "Attempting to remove an item from the %s cache that isn't there: %s %s\n",
911 stasis_topic_name(caching_topic->topic),
912 stasis_message_type_name(msg_type), msg_id);
913 }
914
915 if (snapshots.aggregate_old != snapshots.aggregate_new) {
916 if (snapshots.aggregate_new && caching_topic->cache->aggregate_publish_fn) {
917 caching_topic->cache->aggregate_publish_fn(caching_topic->original_topic,
918 snapshots.aggregate_new);
919 }
920 if (stasis_topic_subscribers(caching_topic->topic)) {
921 update = update_create(snapshots.aggregate_old, snapshots.aggregate_new);
922 if (update) {
923 stasis_publish(caching_topic->topic, update);
924 ao2_ref(update, -1);
925 }
926 }
927 }
928
929 ao2_cleanup(snapshots.old);
930 ao2_cleanup(snapshots.aggregate_old);
931 ao2_cleanup(snapshots.aggregate_new);
932 }
933
934 ao2_cleanup(caching_topic_needs_unref);
935}
936
937static void print_cache_entry(void *v_obj, void *where, ao2_prnt_fn *prnt)
938{
939 struct stasis_cache_entry *entry = v_obj;
940
941 if (!entry) {
942 return;
943 }
944 prnt(where, "Type: %s ID: %s Hash: %u", stasis_message_type_name(entry->key.type),
945 entry->key.id, entry->key.hash);
946}
947
949{
950 struct stasis_caching_topic *caching_topic;
951 static int caching_id;
952 char *new_name;
953 int ret;
954
955 ret = ast_asprintf(&new_name, "cache:%d/%s", ast_atomic_fetchadd_int(&caching_id, +1), stasis_topic_name(original_topic));
956 if (ret < 0) {
957 return NULL;
958 }
959
960 caching_topic = ao2_alloc_options(sizeof(*caching_topic),
962 if (caching_topic == NULL) {
963 ast_free(new_name);
964
965 return NULL;
966 }
967
968 caching_topic->topic = stasis_topic_create(new_name);
969 if (caching_topic->topic == NULL) {
970 ao2_ref(caching_topic, -1);
971 ast_free(new_name);
972
973 return NULL;
974 }
975
976 ao2_ref(cache, +1);
977 caching_topic->cache = cache;
978 if (!cache->registered) {
979 if (ao2_container_register(new_name, cache->entries, print_cache_entry)) {
980 ast_log(LOG_ERROR, "Stasis cache container '%p' for '%s' did not register\n",
981 cache->entries, new_name);
982 } else {
983 cache->registered = 1;
984 }
985 }
986 ast_free(new_name);
987
988 caching_topic->sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0, 0, __FILE__, __LINE__, __PRETTY_FUNCTION__);
989 if (caching_topic->sub == NULL) {
990 ao2_ref(caching_topic, -1);
991
992 return NULL;
993 }
994
996 caching_topic->original_topic = original_topic;
997
998 /* The subscription holds the reference, so no additional ref bump. */
999 return caching_topic;
1000}
1001
1002static void stasis_cache_cleanup(void)
1003{
1006}
1007
1009{
1011
1013 return -1;
1014 }
1015
1017 return -1;
1018 }
1019
1020 return 0;
1021}
enum queue_result id
Definition: app_queue.c:1638
Asterisk main include file. File version handling, generic pbx functions.
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
Definition: clicompat.c:19
#define ast_free(a)
Definition: astmm.h:180
#define ast_strdup(str)
A wrapper for strdup()
Definition: astmm.h:241
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:267
#define ast_log
Definition: astobj2.c:42
#define ao2_link(container, obj)
Add an object to a container.
Definition: astobj2.h:1532
@ CMP_MATCH
Definition: astobj2.h:1027
@ CMP_STOP
Definition: astobj2.h:1028
#define ao2_rdlock(a)
Definition: astobj2.h:718
@ AO2_ALLOC_OPT_LOCK_NOLOCK
Definition: astobj2.h:367
@ AO2_ALLOC_OPT_LOCK_RWLOCK
Definition: astobj2.h:365
#define ao2_wrlock(a)
Definition: astobj2.h:719
#define ao2_callback(c, flags, cb_fn, arg)
ao2_callback() is a generic function that applies cb_fn() to all objects in a container,...
Definition: astobj2.h:1693
void ao2_container_unregister(const char *name)
Unregister a container for CLI stats and integrity check.
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
#define ao2_unlink_flags(container, obj, flags)
Remove an object from a container.
Definition: astobj2.h:1600
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
Definition: astobj2.h:1554
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1736
#define ao2_unlock(a)
Definition: astobj2.h:729
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:404
int ao2_container_register(const char *name, struct ao2_container *self, ao2_prnt_obj_fn *prnt_obj)
Register a container for CLI stats and integrity check.
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition: astobj2.h:480
@ OBJ_SEARCH_PARTIAL_KEY
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
Definition: astobj2.h:1116
@ OBJ_SEARCH_OBJECT
The arg parameter is an object of the same type.
Definition: astobj2.h:1087
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
Definition: astobj2.h:1063
@ OBJ_NODATA
Definition: astobj2.h:1044
@ OBJ_SEARCH_MASK
Search option field mask.
Definition: astobj2.h:1072
@ OBJ_MULTIPLE
Definition: astobj2.h:1049
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1101
#define ao2_container_alloc_list(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a list container.
Definition: astobj2.h:1327
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition: astobj2.h:1303
void() ao2_prnt_fn(void *where, const char *fmt,...)
Print output.
Definition: astobj2.h:1435
static const char type[]
Definition: chan_ooh323.c:109
static void update(int code_size, int y, int wi, int fi, int dq, int sr, int dqsez, struct g726_state *state_ptr)
Definition: codec_g726.c:367
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
Definition: func_strings.c:807
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
struct stasis_message_type * stasis_cache_update_type(void)
Message type for cache update messages.
struct stasis_message_type * stasis_cache_clear_type(void)
Message type for clearing a message from a stasis cache.
Generic (perhaps overly so) hashtable implementation Hash Table support in Asterisk.
unsigned int ast_hashtab_hash_string(const void *obj)
Hashes a string to a number.
Definition: hashtab.c:153
#define ast_debug(level,...)
Log a DEBUG message.
#define LOG_ERROR
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition: lock.h:757
struct ao2_container * cache
Definition: pbx_realtime.c:77
struct stasis_forward * sub
Definition: res_corosync.c:240
#define NULL
Definition: resample.c:96
Stasis Message Bus API. See Stasis Message Bus API for detailed documentation.
int stasis_subscription_is_done(struct stasis_subscription *subscription)
Returns whether subscription has received its final message.
Definition: stasis.c:1119
unsigned int stasis_message_type_hash(const struct stasis_message_type *type)
Gets the hash of a given message type.
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:628
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
Definition: stasis.h:1515
stasis_subscription_message_filter
Stasis subscription message filters.
Definition: stasis.h:294
struct stasis_message *(* cache_aggregate_calc_fn)(struct stasis_cache_entry *entry, struct stasis_message *new_snapshot)
Callback to calculate the aggregate cache entry.
Definition: stasis.h:1031
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition: stasis.c:618
size_t stasis_topic_subscribers(const struct stasis_topic *topic)
Return the number of subscribers of a topic.
Definition: stasis.c:644
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
Definition: stasis.c:1024
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
Definition: stasis.c:1078
const struct ast_eid * stasis_message_eid(const struct stasis_message *msg)
Get the entity id for a stasis_message.
void stasis_subscription_join(struct stasis_subscription *subscription)
Block until the last message is processed on a subscription.
Definition: stasis.c:1106
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
Definition: stasis.h:1493
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
Definition: stasis.c:1175
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.
void(* cache_aggregate_publish_fn)(struct stasis_topic *topic, struct stasis_message *aggregate)
Callback to publish the aggregate cache entry message.
Definition: stasis.h:1055
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *subscription)
Cancel a subscription.
Definition: stasis.c:972
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
Definition: stasis.c:1512
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
Returns whether a subscription is currently subscribed.
Definition: stasis.c:1151
const char *(* snapshot_get_id)(struct stasis_message *message)
Callback extract a unique identity from a snapshot message.
Definition: stasis.h:1009
static void stasis_cache_cleanup(void)
struct ao2_container * stasis_cache_get_all(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
Retrieve all matching entity items from the cache.
Definition: stasis_cache.c:587
struct ao2_container * stasis_cache_dump(struct stasis_cache *cache, struct stasis_message_type *type)
Dump cached items to a subscription for the ast_eid_default entity.
Definition: stasis_cache.c:736
static void stasis_cache_update_dtor(void *obj)
Definition: stasis_cache.c:783
static void print_cache_entry(void *v_obj, void *where, ao2_prnt_fn *prnt)
Definition: stasis_cache.c:937
static struct stasis_cache_entry * cache_entry_create(struct stasis_message_type *type, const char *id, struct stasis_message *snapshot)
Definition: stasis_cache.c:212
struct stasis_topic * stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
Returns the topic of cached events from a caching topics.
Definition: stasis_cache.c:85
static void cache_entry_dtor(void *obj)
Definition: stasis_cache.c:183
struct stasis_caching_topic * stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
Unsubscribes a caching topic from its upstream topic.
Definition: stasis_cache.c:119
static int cache_dump_by_eid_cb(void *obj, void *arg, int flags)
Definition: stasis_cache.c:697
struct ao2_container * stasis_cache_dump_all(struct stasis_cache *cache, struct stasis_message_type *type)
Dump all entity items from the cache to a subscription.
Definition: stasis_cache.c:757
static struct stasis_message * cache_update(struct stasis_cache_entry *cached_entry, const struct ast_eid *eid, struct stasis_message *new_snapshot)
Definition: stasis_cache.c:466
static int cache_entry_dump(struct ao2_container *snapshots, const struct stasis_cache_entry *entry)
Definition: stasis_cache.c:563
struct stasis_caching_topic * stasis_caching_topic_create(struct stasis_topic *original_topic, struct stasis_cache *cache)
Create a topic which monitors and caches messages from another topic.
Definition: stasis_cache.c:948
static void cache_entry_compute_hash(struct cache_entry_key *key)
Definition: stasis_cache.c:206
struct ao2_container * stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
Dump cached items to a subscription for a specific entity.
Definition: stasis_cache.c:718
static struct stasis_message * cache_remove(struct ao2_container *entries, struct stasis_cache_entry *cached_entry, const struct ast_eid *eid)
Definition: stasis_cache.c:425
static void stasis_caching_topic_dtor(void *obj)
Definition: stasis_cache.c:62
int stasis_caching_set_filter(struct stasis_caching_topic *caching_topic, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a cache.
Definition: stasis_cache.c:109
struct stasis_message * stasis_cache_entry_get_aggregate(struct stasis_cache_entry *entry)
Get the aggregate cache entry snapshot.
Definition: stasis_cache.c:365
static void caching_topic_exec(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Definition: stasis_cache.c:833
static struct cache_put_snapshots cache_put(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid, struct stasis_message *new_snapshot)
Definition: stasis_cache.c:505
int stasis_cache_init(void)
struct stasis_cache * stasis_cache_create_full(snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_publish_fn aggregate_publish_fn)
Create a cache.
Definition: stasis_cache.c:334
static int cache_entry_hash(const void *obj, int flags)
Definition: stasis_cache.c:266
struct stasis_caching_topic * stasis_caching_unsubscribe_and_join(struct stasis_caching_topic *caching_topic)
Unsubscribes a caching topic from its upstream topic, blocking until all messages have been forwarded...
Definition: stasis_cache.c:146
STASIS_MESSAGE_TYPE_DEFN(stasis_cache_clear_type)
struct stasis_cache * stasis_cache_create(snapshot_get_id id_fn)
Create a cache.
Definition: stasis_cache.c:360
#define NUM_CACHE_BUCKETS
Definition: stasis_cache.c:42
struct stasis_message * stasis_cache_clear_create(struct stasis_message *id_message)
A message which instructs the caching topic to remove an entry from its cache.
Definition: stasis_cache.c:778
static int cache_entry_cmp(void *obj, void *arg, int flags)
Definition: stasis_cache.c:288
struct stasis_message * stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid)
Retrieve an item from the cache for a specific entity.
Definition: stasis_cache.c:659
static void cache_dtor(void *obj)
Definition: stasis_cache.c:326
struct stasis_message * stasis_cache_get(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
Retrieve an item from the cache for the ast_eid_default entity.
Definition: stasis_cache.c:686
struct stasis_message * stasis_cache_entry_get_local(struct stasis_cache_entry *entry)
Get the local entity's cache entry snapshot.
Definition: stasis_cache.c:370
static struct stasis_cache_entry * cache_find(struct ao2_container *entries, struct stasis_message_type *type, const char *id)
Definition: stasis_cache.c:396
static struct stasis_message * cache_entry_by_eid(const struct stasis_cache_entry *entry, const struct ast_eid *eid)
Definition: stasis_cache.c:631
static struct stasis_message * update_create(struct stasis_message *old_snapshot, struct stasis_message *new_snapshot)
Definition: stasis_cache.c:795
struct stasis_message * stasis_cache_entry_get_remote(struct stasis_cache_entry *entry, int idx)
Get a remote entity's cache entry snapshot by index.
Definition: stasis_cache.c:375
static int cache_dump_all_cb(void *obj, void *arg, int flags)
Definition: stasis_cache.c:741
int stasis_caching_accept_message_type(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type)
Indicate to a caching topic that we are interested in a message type.
Definition: stasis_cache.c:90
Internal Stasis APIs.
struct stasis_subscription * internal_stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func)
Create a subscription.
Definition: stasis.c:857
Generic container type.
An Entity ID is essentially a MAC address, brief and unique.
Definition: utils.h:813
unsigned char eid[6]
Definition: utils.h:814
struct stasis_message_type * type
Definition: stasis_cache.c:693
const struct ast_eid * eid
Definition: stasis_cache.c:694
struct ao2_container * container
Definition: stasis_cache.c:692
The key for an entry in the cache.
Definition: stasis_cache.c:164
unsigned int hash
Definition: stasis_cache.c:170
struct stasis_message_type * type
Definition: stasis_cache.c:166
const char * id
Definition: stasis_cache.c:168
struct stasis_message * aggregate_old
Definition: stasis_cache.c:500
struct stasis_message * old
Definition: stasis_cache.c:498
struct stasis_message * aggregate_new
Definition: stasis_cache.c:502
Definition: search.h:40
char * key
Definition: search.h:41
Definition: stasis_cache.c:173
struct stasis_cache_entry::@396 remote
struct stasis_message * local
Definition: stasis_cache.c:178
struct cache_entry_key key
Definition: stasis_cache.c:174
struct stasis_message * aggregate
Definition: stasis_cache.c:176
Cache update message.
Definition: stasis.h:965
struct ao2_container * entries
Definition: stasis_cache.c:47
cache_aggregate_calc_fn aggregate_calc_fn
Definition: stasis_cache.c:49
snapshot_get_id id_fn
Definition: stasis_cache.c:48
cache_aggregate_publish_fn aggregate_publish_fn
Definition: stasis_cache.c:50
struct stasis_subscription * sub
Definition: stasis_cache.c:59
struct stasis_topic * topic
Definition: stasis_cache.c:57
struct stasis_cache * cache
Definition: stasis_cache.c:56
struct stasis_topic * original_topic
Definition: stasis_cache.c:58
struct ast_eid eid
Holds details about changes to subscriptions for the specified topic.
Definition: stasis.h:890
Utility functions.
#define ast_assert(a)
Definition: utils.h:739
int ast_eid_cmp(const struct ast_eid *eid1, const struct ast_eid *eid2)
Compare two EIDs.
Definition: utils.c:3094
struct ast_eid ast_eid_default
Global EID.
Definition: options.c:93
Vector container support.
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:609
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition: vector.h:174
#define AST_VECTOR_REMOVE_UNORDERED(vec, idx)
Remove an element from an unordered vector by index.
Definition: vector.h:438
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256
#define AST_VECTOR(name, type)
Define a vector structure.
Definition: vector.h:44
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:680