Asterisk - The Open Source Telephony Project GIT-master-f36a736
Data Structures | Macros | Functions
stasis_cache.c File Reference

Stasis Message API. More...

#include "asterisk.h"
#include "asterisk/astobj2.h"
#include "asterisk/hashtab.h"
#include "asterisk/stasis_internal.h"
#include "asterisk/stasis.h"
#include "asterisk/utils.h"
#include "asterisk/vector.h"
Include dependency graph for stasis_cache.c:

Go to the source code of this file.

Data Structures

struct  cache_dump_data
 
struct  cache_entry_key
 The key for an entry in the cache. More...
 
struct  cache_put_snapshots
 
struct  stasis_cache
 
struct  stasis_cache_entry
 
struct  stasis_caching_topic
 

Macros

#define NUM_CACHE_BUCKETS   563
 

Functions

static void cache_dtor (void *obj)
 
static int cache_dump_all_cb (void *obj, void *arg, int flags)
 
static int cache_dump_by_eid_cb (void *obj, void *arg, int flags)
 
static struct stasis_message * cache_entry_by_eid (const struct stasis_cache_entry *entry, const struct ast_eid *eid)
 
static int cache_entry_cmp (void *obj, void *arg, int flags)
 
static void cache_entry_compute_hash (struct cache_entry_key *key)
 
static struct stasis_cache_entry * cache_entry_create (struct stasis_message_type *type, const char *id, struct stasis_message *snapshot)
 
static void cache_entry_dtor (void *obj)
 
static int cache_entry_dump (struct ao2_container *snapshots, const struct stasis_cache_entry *entry)
 
static int cache_entry_hash (const void *obj, int flags)
 
static struct stasis_cache_entry * cache_find (struct ao2_container *entries, struct stasis_message_type *type, const char *id)
 
static struct cache_put_snapshots cache_put (struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid, struct stasis_message *new_snapshot)
 
static struct stasis_message * cache_remove (struct ao2_container *entries, struct stasis_cache_entry *cached_entry, const struct ast_eid *eid)
 
static struct stasis_message * cache_update (struct stasis_cache_entry *cached_entry, const struct ast_eid *eid, struct stasis_message *new_snapshot)
 
static void caching_topic_exec (void *data, struct stasis_subscription *sub, struct stasis_message *message)
 
static void print_cache_entry (void *v_obj, void *where, ao2_prnt_fn *prnt)
 
static void stasis_cache_cleanup (void)
 
struct stasis_message * stasis_cache_clear_create (struct stasis_message *id_message)
 A message which instructs the caching topic to remove an entry from its cache. More...
 
struct stasis_cache * stasis_cache_create (snapshot_get_id id_fn)
 Create a cache. More...
 
struct stasis_cache * stasis_cache_create_full (snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_publish_fn aggregate_publish_fn)
 Create a cache. More...
 
struct ao2_container * stasis_cache_dump (struct stasis_cache *cache, struct stasis_message_type *type)
 Dump cached items to a subscription for the ast_eid_default entity. More...
 
struct ao2_container * stasis_cache_dump_all (struct stasis_cache *cache, struct stasis_message_type *type)
 Dump all entity items from the cache to a subscription. More...
 
struct ao2_container * stasis_cache_dump_by_eid (struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
 Dump cached items to a subscription for a specific entity. More...
 
struct stasis_message * stasis_cache_entry_get_aggregate (struct stasis_cache_entry *entry)
 Get the aggregate cache entry snapshot. More...
 
struct stasis_message * stasis_cache_entry_get_local (struct stasis_cache_entry *entry)
 Get the local entity's cache entry snapshot. More...
 
struct stasis_message * stasis_cache_entry_get_remote (struct stasis_cache_entry *entry, int idx)
 Get a remote entity's cache entry snapshot by index. More...
 
struct stasis_message * stasis_cache_get (struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
 Retrieve an item from the cache for the ast_eid_default entity. More...
 
struct ao2_container * stasis_cache_get_all (struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
 Retrieve all matching entity items from the cache. More...
 
struct stasis_message * stasis_cache_get_by_eid (struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid)
 Retrieve an item from the cache for a specific entity. More...
 
int stasis_cache_init (void)
 
static void stasis_cache_update_dtor (void *obj)
 
int stasis_caching_accept_message_type (struct stasis_caching_topic *caching_topic, struct stasis_message_type *type)
 Indicate to a caching topic that we are interested in a message type. More...
 
struct stasis_topic * stasis_caching_get_topic (struct stasis_caching_topic *caching_topic)
 Returns the topic of cached events from a caching topic. More...
 
int stasis_caching_set_filter (struct stasis_caching_topic *caching_topic, enum stasis_subscription_message_filter filter)
 Set the message type filtering level on a cache. More...
 
struct stasis_caching_topic * stasis_caching_topic_create (struct stasis_topic *original_topic, struct stasis_cache *cache)
 Create a topic which monitors and caches messages from another topic. More...
 
static void stasis_caching_topic_dtor (void *obj)
 
struct stasis_caching_topic * stasis_caching_unsubscribe (struct stasis_caching_topic *caching_topic)
 Unsubscribes a caching topic from its upstream topic. More...
 
struct stasis_caching_topic * stasis_caching_unsubscribe_and_join (struct stasis_caching_topic *caching_topic)
 Unsubscribes a caching topic from its upstream topic, blocking until all messages have been forwarded. More...
 
 STASIS_MESSAGE_TYPE_DEFN (stasis_cache_clear_type)
 
 STASIS_MESSAGE_TYPE_DEFN (stasis_cache_update_type)
 
static struct stasis_message * update_create (struct stasis_message *old_snapshot, struct stasis_message *new_snapshot)
 

Detailed Description

Stasis Message API.

Author
David M. Lee, II <dlee@digium.com>

Definition in file stasis_cache.c.

Macro Definition Documentation

◆ NUM_CACHE_BUCKETS

#define NUM_CACHE_BUCKETS   563

Definition at line 42 of file stasis_cache.c.

Function Documentation

◆ cache_dtor()

static void cache_dtor ( void *  obj)
static

Definition at line 326 of file stasis_cache.c.

327{
328 struct stasis_cache *cache = obj;
329
330 ao2_cleanup(cache->entries);
331 cache->entries = NULL;
332}
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
struct ao2_container * cache
Definition: pbx_realtime.c:77
#define NULL
Definition: resample.c:96

References ao2_cleanup, cache, and NULL.

Referenced by stasis_cache_create_full().

◆ cache_dump_all_cb()

static int cache_dump_all_cb ( void *  obj,
void *  arg,
int  flags 
)
static

Definition at line 741 of file stasis_cache.c.

742{
743 struct cache_dump_data *cache_dump = arg;
744 struct stasis_cache_entry *entry = obj;
745
746 if (!cache_dump->type || entry->key.type == cache_dump->type) {
747 if (cache_entry_dump(cache_dump->container, entry)) {
748 ao2_cleanup(cache_dump->container);
749 cache_dump->container = NULL;
750 return CMP_STOP;
751 }
752 }
753
754 return 0;
755}
@ CMP_STOP
Definition: astobj2.h:1028
static int cache_entry_dump(struct ao2_container *snapshots, const struct stasis_cache_entry *entry)
Definition: stasis_cache.c:563
struct stasis_message_type * type
Definition: stasis_cache.c:693
struct ao2_container * container
Definition: stasis_cache.c:692
Definition: search.h:40
char * key
Definition: search.h:41
Definition: stasis_cache.c:173

References ao2_cleanup, cache_entry_dump(), CMP_STOP, cache_dump_data::container, entry::key, NULL, and cache_dump_data::type.

Referenced by stasis_cache_dump_all().

◆ cache_dump_by_eid_cb()

static int cache_dump_by_eid_cb ( void *  obj,
void *  arg,
int  flags 
)
static

Definition at line 697 of file stasis_cache.c.

698{
699 struct cache_dump_data *cache_dump = arg;
700 struct stasis_cache_entry *entry = obj;
701
702 if (!cache_dump->type || entry->key.type == cache_dump->type) {
703 struct stasis_message *snapshot;
704
705 snapshot = cache_entry_by_eid(entry, cache_dump->eid);
706 if (snapshot) {
707 if (!ao2_link(cache_dump->container, snapshot)) {
708 ao2_cleanup(cache_dump->container);
709 cache_dump->container = NULL;
710 return CMP_STOP;
711 }
712 }
713 }
714
715 return 0;
716}
#define ao2_link(container, obj)
Add an object to a container.
Definition: astobj2.h:1532
static struct stasis_message * cache_entry_by_eid(const struct stasis_cache_entry *entry, const struct ast_eid *eid)
Definition: stasis_cache.c:631
const struct ast_eid * eid
Definition: stasis_cache.c:694

References ao2_cleanup, ao2_link, cache_entry_by_eid(), CMP_STOP, cache_dump_data::container, cache_dump_data::eid, entry::key, NULL, and cache_dump_data::type.

Referenced by stasis_cache_dump_by_eid().

◆ cache_entry_by_eid()

static struct stasis_message * cache_entry_by_eid ( const struct stasis_cache_entry *  entry,
const struct ast_eid *  eid 
)
static

Definition at line 631 of file stasis_cache.c.

632{
633 int is_remote;
634 int idx;
635
636 if (!eid) {
637 /* Get aggregate. */
638 return entry->aggregate;
639 }
640
641 /* Get snapshot with specific eid. */
642 is_remote = ast_eid_cmp(eid, &ast_eid_default);
643 if (!is_remote) {
644 return entry->local;
645 }
646
647 for (idx = 0; idx < AST_VECTOR_SIZE(&entry->remote); ++idx) {
648 struct stasis_message *cur;
649
650 cur = AST_VECTOR_GET(&entry->remote, idx);
651 if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
652 return cur;
653 }
654 }
655
656 return NULL;
657}
const struct ast_eid * stasis_message_eid(const struct stasis_message *msg)
Get the entity id for a stasis_message.
struct ast_eid eid
int ast_eid_cmp(const struct ast_eid *eid1, const struct ast_eid *eid2)
Compare two EIDs.
Definition: utils.c:3094
struct ast_eid ast_eid_default
Global EID.
Definition: options.c:93
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:609
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:680

References ast_eid_cmp(), ast_eid_default, AST_VECTOR_GET, AST_VECTOR_SIZE, stasis_message::eid, NULL, and stasis_message_eid().

Referenced by cache_dump_by_eid_cb(), and stasis_cache_get_by_eid().

◆ cache_entry_cmp()

static int cache_entry_cmp ( void *  obj,
void *  arg,
int  flags 
)
static

Definition at line 288 of file stasis_cache.c.

289{
290 const struct stasis_cache_entry *object_left = obj;
291 const struct stasis_cache_entry *object_right = arg;
292 const struct cache_entry_key *right_key = arg;
293 int cmp;
294
295 switch (flags & OBJ_SEARCH_MASK) {
297 right_key = &object_right->key;
298 /* Fall through */
299 case OBJ_SEARCH_KEY:
300 cmp = object_left->key.type != right_key->type
301 || strcmp(object_left->key.id, right_key->id);
302 break;
304 /* Not supported by container */
305 ast_assert(0);
306 cmp = -1;
307 break;
308 default:
309 /*
310 * What arg points to is specific to this traversal callback
311 * and has no special meaning to astobj2.
312 */
313 cmp = 0;
314 break;
315 }
316 if (cmp) {
317 return 0;
318 }
319 /*
320 * At this point the traversal callback is identical to a sorted
321 * container.
322 */
323 return CMP_MATCH;
324}
@ CMP_MATCH
Definition: astobj2.h:1027
@ OBJ_SEARCH_PARTIAL_KEY
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
Definition: astobj2.h:1116
@ OBJ_SEARCH_OBJECT
The arg parameter is an object of the same type.
Definition: astobj2.h:1087
@ OBJ_SEARCH_MASK
Search option field mask.
Definition: astobj2.h:1072
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1101
The key for an entry in the cache.
Definition: stasis_cache.c:164
struct stasis_message_type * type
Definition: stasis_cache.c:166
const char * id
Definition: stasis_cache.c:168
struct cache_entry_key key
Definition: stasis_cache.c:174
#define ast_assert(a)
Definition: utils.h:739

References ast_assert, CMP_MATCH, cache_entry_key::id, stasis_cache_entry::key, OBJ_SEARCH_KEY, OBJ_SEARCH_MASK, OBJ_SEARCH_OBJECT, OBJ_SEARCH_PARTIAL_KEY, and cache_entry_key::type.

Referenced by stasis_cache_create_full().

◆ cache_entry_compute_hash()

static void cache_entry_compute_hash ( struct cache_entry_key *  key)
static

Definition at line 206 of file stasis_cache.c.

207{
209 key->hash += ast_hashtab_hash_string(key->id);
210}
unsigned int ast_hashtab_hash_string(const void *obj)
Hashes a string to a number.
Definition: hashtab.c:153
unsigned int stasis_message_type_hash(const struct stasis_message_type *type)
Gets the hash of a given message type.
unsigned int hash
Definition: stasis_cache.c:170

References ast_hashtab_hash_string(), cache_entry_key::hash, cache_entry_key::id, stasis_message_type_hash(), and cache_entry_key::type.

Referenced by cache_entry_create(), and cache_find().

◆ cache_entry_create()

static struct stasis_cache_entry * cache_entry_create ( struct stasis_message_type *  type,
const char *  id,
struct stasis_message *  snapshot 
)
static

Definition at line 212 of file stasis_cache.c.

213{
215 int is_remote;
216
217 ast_assert(id != NULL);
218 ast_assert(snapshot != NULL);
219
220 if (!type) {
221 return NULL;
222 }
223
226 if (!entry) {
227 return NULL;
228 }
229
230 entry->key.id = ast_strdup(id);
231 if (!entry->key.id) {
233 return NULL;
234 }
235 /*
236 * Normal ao2 ref counting rules says we should increment the message
237 * type ref here and decrement it in cache_entry_dtor(). However, the
238 * stasis message snapshot is cached here, will always have the same type
239 * as the cache entry, and can legitimately cause the type ref count to
240 * hit the excessive ref count assertion. Since the cache entry will
241 * always have a snapshot we can get away with not holding a ref here.
242 */
243 ast_assert(type == stasis_message_type(snapshot));
244 entry->key.type = type;
246
247 is_remote = ast_eid_cmp(&ast_eid_default, stasis_message_eid(snapshot)) ? 1 : 0;
248 if (AST_VECTOR_INIT(&entry->remote, is_remote)) {
250 return NULL;
251 }
252
253 if (is_remote) {
254 if (AST_VECTOR_APPEND(&entry->remote, snapshot)) {
256 return NULL;
257 }
258 } else {
259 entry->local = snapshot;
260 }
261 ao2_bump(snapshot);
262
263 return entry;
264}
#define ast_strdup(str)
A wrapper for strdup()
Definition: astmm.h:241
@ AO2_ALLOC_OPT_LOCK_NOLOCK
Definition: astobj2.h:367
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:404
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition: astobj2.h:480
static const char type[]
Definition: chan_ooh323.c:109
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
static void cache_entry_dtor(void *obj)
Definition: stasis_cache.c:183
static void cache_entry_compute_hash(struct cache_entry_key *key)
Definition: stasis_cache.c:206
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_alloc_options, ao2_bump, ao2_cleanup, ast_assert, ast_eid_cmp(), ast_eid_default, ast_strdup, AST_VECTOR_APPEND, AST_VECTOR_INIT, cache_entry_compute_hash(), cache_entry_dtor(), entry::key, NULL, stasis_message_eid(), stasis_message_type(), and type.

Referenced by cache_put().

◆ cache_entry_dtor()

static void cache_entry_dtor ( void *  obj)
static

Definition at line 183 of file stasis_cache.c.

184{
185 struct stasis_cache_entry *entry = obj;
186 size_t idx;
187
188 entry->key.type = NULL;
189 ast_free((char *) entry->key.id);
190 entry->key.id = NULL;
191
192 ao2_cleanup(entry->aggregate);
193 entry->aggregate = NULL;
194 ao2_cleanup(entry->local);
195 entry->local = NULL;
196
197 for (idx = 0; idx < AST_VECTOR_SIZE(&entry->remote); ++idx) {
198 struct stasis_message *remote;
199
200 remote = AST_VECTOR_GET(&entry->remote, idx);
201 ao2_cleanup(remote);
202 }
203 AST_VECTOR_FREE(&entry->remote);
204}
#define ast_free(a)
Definition: astmm.h:180
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition: vector.h:174

References ao2_cleanup, ast_free, AST_VECTOR_FREE, AST_VECTOR_GET, AST_VECTOR_SIZE, entry::key, and NULL.

Referenced by cache_entry_create().

◆ cache_entry_dump()

static int cache_entry_dump ( struct ao2_container *  snapshots,
const struct stasis_cache_entry *  entry 
)
static

Definition at line 563 of file stasis_cache.c.

564{
565 int idx;
566 int err = 0;
567
568 ast_assert(snapshots != NULL);
570
571 /* The aggregate snapshot is not a snapshot from an entity. */
572
573 if (entry->local) {
574 err |= !ao2_link(snapshots, entry->local);
575 }
576
577 for (idx = 0; !err && idx < AST_VECTOR_SIZE(&entry->remote); ++idx) {
578 struct stasis_message *snapshot;
579
580 snapshot = AST_VECTOR_GET(&entry->remote, idx);
581 err |= !ao2_link(snapshots, snapshot);
582 }
583
584 return err;
585}

References ao2_link, ast_assert, AST_VECTOR_GET, AST_VECTOR_SIZE, and NULL.

Referenced by cache_dump_all_cb(), and stasis_cache_get_all().

◆ cache_entry_hash()

static int cache_entry_hash ( const void *  obj,
int  flags 
)
static

Definition at line 266 of file stasis_cache.c.

267{
268 const struct stasis_cache_entry *object;
269 const struct cache_entry_key *key;
270
271 switch (flags & OBJ_SEARCH_MASK) {
272 case OBJ_SEARCH_KEY:
273 key = obj;
274 break;
276 object = obj;
277 key = &object->key;
278 break;
279 default:
280 /* Hash can only work on something with a full key. */
281 ast_assert(0);
282 return 0;
283 }
284
285 return (int)key->hash;
286}

References ast_assert, cache_entry_key::hash, OBJ_SEARCH_KEY, OBJ_SEARCH_MASK, and OBJ_SEARCH_OBJECT.

Referenced by stasis_cache_create_full().

◆ cache_find()

static struct stasis_cache_entry * cache_find ( struct ao2_container *  entries,
struct stasis_message_type *  type,
const char *  id 
)
static

Definition at line 396 of file stasis_cache.c.

397{
398 struct cache_entry_key search_key;
400
401 search_key.type = type;
402 search_key.id = id;
403 cache_entry_compute_hash(&search_key);
404 entry = ao2_find(entries, &search_key, OBJ_SEARCH_KEY | OBJ_NOLOCK);
405
406 /* Ensure that what we looked for is what we found. */
408 || (!strcmp(stasis_message_type_name(entry->key.type),
409 stasis_message_type_name(type)) && !strcmp(entry->key.id, id)));
410 return entry;
411}
enum queue_result id
Definition: app_queue.c:1667
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1736
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
Definition: astobj2.h:1063
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.

References ao2_find, ast_assert, cache_entry_compute_hash(), id, cache_entry_key::id, entry::key, OBJ_NOLOCK, OBJ_SEARCH_KEY, stasis_message_type_name(), type, and cache_entry_key::type.

Referenced by cache_put(), caching_topic_exec(), stasis_cache_get_all(), and stasis_cache_get_by_eid().

◆ cache_put()

static struct cache_put_snapshots cache_put ( struct stasis_cache *  cache,
struct stasis_message_type *  type,
const char *  id,
const struct ast_eid *  eid,
struct stasis_message *  new_snapshot 
)
static

Definition at line 505 of file stasis_cache.c.

508{
509 struct stasis_cache_entry *cached_entry;
510 struct cache_put_snapshots snapshots;
511
512 ast_assert(cache->entries != NULL);
513 ast_assert(eid != NULL);/* Aggregate snapshots not allowed to be put directly. */
514 ast_assert(new_snapshot == NULL ||
515 type == stasis_message_type(new_snapshot));
516
517 memset(&snapshots, 0, sizeof(snapshots));
518
519 ao2_wrlock(cache->entries);
520
521 cached_entry = cache_find(cache->entries, type, id);
522
523 /* Update the eid snapshot. */
524 if (!new_snapshot) {
525 /* Remove snapshot from cache */
526 if (cached_entry) {
527 snapshots.old = cache_remove(cache->entries, cached_entry, eid);
528 }
529 } else if (cached_entry) {
530 /* Update snapshot in cache */
531 snapshots.old = cache_update(cached_entry, eid, new_snapshot);
532 } else {
533 /* Insert into the cache */
534 cached_entry = cache_entry_create(type, id, new_snapshot);
535 if (cached_entry) {
536 ao2_link_flags(cache->entries, cached_entry, OBJ_NOLOCK);
537 }
538 }
539
540 /* Update the aggregate snapshot. */
541 if (cache->aggregate_calc_fn && cached_entry) {
542 snapshots.aggregate_new = cache->aggregate_calc_fn(cached_entry, new_snapshot);
543 snapshots.aggregate_old = cached_entry->aggregate;
544 cached_entry->aggregate = ao2_bump(snapshots.aggregate_new);
545 }
546
547 ao2_unlock(cache->entries);
548
549 ao2_cleanup(cached_entry);
550 return snapshots;
551}
#define ao2_wrlock(a)
Definition: astobj2.h:719
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
Definition: astobj2.h:1554
#define ao2_unlock(a)
Definition: astobj2.h:729
static struct stasis_cache_entry * cache_entry_create(struct stasis_message_type *type, const char *id, struct stasis_message *snapshot)
Definition: stasis_cache.c:212
static struct stasis_message * cache_update(struct stasis_cache_entry *cached_entry, const struct ast_eid *eid, struct stasis_message *new_snapshot)
Definition: stasis_cache.c:466
static struct stasis_message * cache_remove(struct ao2_container *entries, struct stasis_cache_entry *cached_entry, const struct ast_eid *eid)
Definition: stasis_cache.c:425
static struct stasis_cache_entry * cache_find(struct ao2_container *entries, struct stasis_message_type *type, const char *id)
Definition: stasis_cache.c:396
struct stasis_message * aggregate
Definition: stasis_cache.c:176

References stasis_cache_entry::aggregate, cache_put_snapshots::aggregate_new, cache_put_snapshots::aggregate_old, ao2_bump, ao2_cleanup, ao2_link_flags, ao2_unlock, ao2_wrlock, ast_assert, cache, cache_entry_create(), cache_find(), cache_remove(), cache_update(), NULL, OBJ_NOLOCK, cache_put_snapshots::old, stasis_message_type(), and type.

Referenced by caching_topic_exec().

◆ cache_remove()

static struct stasis_message * cache_remove ( struct ao2_container *  entries,
struct stasis_cache_entry *  cached_entry,
const struct ast_eid *  eid 
)
static

Definition at line 425 of file stasis_cache.c.

426{
427 struct stasis_message *old_snapshot;
428 int is_remote;
429
430 is_remote = ast_eid_cmp(eid, &ast_eid_default);
431 if (!is_remote) {
432 old_snapshot = cached_entry->local;
433 cached_entry->local = NULL;
434 } else {
435 int idx;
436
437 old_snapshot = NULL;
438 for (idx = 0; idx < AST_VECTOR_SIZE(&cached_entry->remote); ++idx) {
439 struct stasis_message *cur;
440
441 cur = AST_VECTOR_GET(&cached_entry->remote, idx);
442 if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
443 old_snapshot = AST_VECTOR_REMOVE_UNORDERED(&cached_entry->remote, idx);
444 break;
445 }
446 }
447 }
448
449 if (!cached_entry->local && !AST_VECTOR_SIZE(&cached_entry->remote)) {
450 ao2_unlink_flags(entries, cached_entry, OBJ_NOLOCK);
451 }
452
453 return old_snapshot;
454}
#define ao2_unlink_flags(container, obj, flags)
Remove an object from a container.
Definition: astobj2.h:1600
struct stasis_cache_entry::@396 remote
struct stasis_message * local
Definition: stasis_cache.c:178
#define AST_VECTOR_REMOVE_UNORDERED(vec, idx)
Remove an element from an unordered vector by index.
Definition: vector.h:438

References ao2_unlink_flags, ast_eid_cmp(), ast_eid_default, AST_VECTOR_GET, AST_VECTOR_REMOVE_UNORDERED, AST_VECTOR_SIZE, stasis_message::eid, stasis_cache_entry::local, NULL, OBJ_NOLOCK, stasis_cache_entry::remote, and stasis_message_eid().

Referenced by cache_put(), and caching_topic_exec().

◆ cache_update()

static struct stasis_message * cache_update ( struct stasis_cache_entry *  cached_entry,
const struct ast_eid *  eid,
struct stasis_message *  new_snapshot 
)
static

Definition at line 466 of file stasis_cache.c.

467{
468 struct stasis_message *old_snapshot;
469 int is_remote;
470 int idx;
471
472 is_remote = ast_eid_cmp(eid, &ast_eid_default);
473 if (!is_remote) {
474 old_snapshot = cached_entry->local;
475 cached_entry->local = ao2_bump(new_snapshot);
476 return old_snapshot;
477 }
478
479 old_snapshot = NULL;
480 for (idx = 0; idx < AST_VECTOR_SIZE(&cached_entry->remote); ++idx) {
481 struct stasis_message *cur;
482
483 cur = AST_VECTOR_GET(&cached_entry->remote, idx);
484 if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
485 old_snapshot = AST_VECTOR_REMOVE_UNORDERED(&cached_entry->remote, idx);
486 break;
487 }
488 }
489 if (!AST_VECTOR_APPEND(&cached_entry->remote, new_snapshot)) {
490 ao2_bump(new_snapshot);
491 }
492
493 return old_snapshot;
494}

References ao2_bump, ast_eid_cmp(), ast_eid_default, AST_VECTOR_APPEND, AST_VECTOR_GET, AST_VECTOR_REMOVE_UNORDERED, AST_VECTOR_SIZE, stasis_message::eid, stasis_cache_entry::local, NULL, stasis_cache_entry::remote, and stasis_message_eid().

Referenced by cache_put(), and consumer_exec().

◆ caching_topic_exec()

static void caching_topic_exec ( void *  data,
struct stasis_subscription *  sub,
struct stasis_message *  message 
)
static

Definition at line 833 of file stasis_cache.c.

835{
836 struct stasis_caching_topic *caching_topic_needs_unref;
837 struct stasis_caching_topic *caching_topic = data;
838 struct stasis_message *msg;
839 struct stasis_message *msg_put;
840 struct stasis_message_type *msg_type;
841 const struct ast_eid *msg_eid;
842 const char *msg_id;
843
844 ast_assert(caching_topic != NULL);
845 ast_assert(caching_topic->topic != NULL);
846 ast_assert(caching_topic->cache != NULL);
847 ast_assert(caching_topic->cache->id_fn != NULL);
848
850 caching_topic_needs_unref = caching_topic;
851 } else {
852 caching_topic_needs_unref = NULL;
853 }
854
855 msg_type = stasis_message_type(message);
856
857 if (stasis_subscription_change_type() == msg_type) {
859
860 /*
861 * If this change type is an unsubscribe, we need to find the original
862 * subscribe and remove it from the cache otherwise the cache will
863 * continue to grow unabated.
864 */
865 if (strcmp(change->description, "Unsubscribe") == 0) {
866 struct stasis_cache_entry *cached_sub;
867
868 ao2_wrlock(caching_topic->cache->entries);
869 cached_sub = cache_find(caching_topic->cache->entries, stasis_subscription_change_type(), change->uniqueid);
870 if (cached_sub) {
871 ao2_cleanup(cache_remove(caching_topic->cache->entries, cached_sub, stasis_message_eid(message)));
872 ao2_cleanup(cached_sub);
873 }
874 ao2_unlock(caching_topic->cache->entries);
875 ao2_cleanup(caching_topic_needs_unref);
876 return;
877 }
878 msg_put = message;
879 msg = message;
880 } else if (stasis_cache_clear_type() == msg_type) {
881 /* Cache clear event. */
882 msg_put = NULL;
884 msg_type = stasis_message_type(msg);
885 } else {
886 /* Normal cache update event. */
887 msg_put = message;
888 msg = message;
889 }
890 ast_assert(msg_type != NULL);
891
892 msg_eid = stasis_message_eid(msg);/* msg_eid is NULL for aggregate message. */
893 msg_id = caching_topic->cache->id_fn(msg);
894 if (msg_id && msg_eid) {
895 struct stasis_message *update;
896 struct cache_put_snapshots snapshots;
897
898 /* Update the cache */
899 snapshots = cache_put(caching_topic->cache, msg_type, msg_id, msg_eid, msg_put);
900 if (snapshots.old || msg_put) {
901 if (stasis_topic_subscribers(caching_topic->topic)) {
902 update = update_create(snapshots.old, msg_put);
903 if (update) {
904 stasis_publish(caching_topic->topic, update);
905 ao2_ref(update, -1);
906 }
907 }
908 } else {
909 ast_debug(1,
910 "Attempting to remove an item from the %s cache that isn't there: %s %s\n",
911 stasis_topic_name(caching_topic->topic),
912 stasis_message_type_name(msg_type), msg_id);
913 }
914
915 if (snapshots.aggregate_old != snapshots.aggregate_new) {
916 if (snapshots.aggregate_new && caching_topic->cache->aggregate_publish_fn) {
917 caching_topic->cache->aggregate_publish_fn(caching_topic->original_topic,
918 snapshots.aggregate_new);
919 }
920 if (stasis_topic_subscribers(caching_topic->topic)) {
921 update = update_create(snapshots.aggregate_old, snapshots.aggregate_new);
922 if (update) {
923 stasis_publish(caching_topic->topic, update);
924 ao2_ref(update, -1);
925 }
926 }
927 }
928
929 ao2_cleanup(snapshots.old);
930 ao2_cleanup(snapshots.aggregate_old);
931 ao2_cleanup(snapshots.aggregate_new);
932 }
933
934 ao2_cleanup(caching_topic_needs_unref);
935}
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
static void update(int code_size, int y, int wi, int fi, int dq, int sr, int dqsez, struct g726_state *state_ptr)
Definition: codec_g726.c:367
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
struct stasis_message_type * stasis_cache_clear_type(void)
Message type for clearing a message from a stasis cache.
#define ast_debug(level,...)
Log a DEBUG message.
struct stasis_forward * sub
Definition: res_corosync.c:240
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:628
size_t stasis_topic_subscribers(const struct stasis_topic *topic)
Return the number of subscribers of a topic.
Definition: stasis.c:644
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
Definition: stasis.c:1175
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
Definition: stasis.c:1512
static struct cache_put_snapshots cache_put(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid, struct stasis_message *new_snapshot)
Definition: stasis_cache.c:505
static struct stasis_message * update_create(struct stasis_message *old_snapshot, struct stasis_message *new_snapshot)
Definition: stasis_cache.c:795
An Entity ID is essentially a MAC address, brief and unique.
Definition: utils.h:813
struct ao2_container * entries
Definition: stasis_cache.c:47
snapshot_get_id id_fn
Definition: stasis_cache.c:48
cache_aggregate_publish_fn aggregate_publish_fn
Definition: stasis_cache.c:50
struct stasis_topic * topic
Definition: stasis_cache.c:57
struct stasis_cache * cache
Definition: stasis_cache.c:56
struct stasis_topic * original_topic
Definition: stasis_cache.c:58
Holds details about changes to subscriptions for the specified topic.
Definition: stasis.h:890

References cache_put_snapshots::aggregate_new, cache_put_snapshots::aggregate_old, stasis_cache::aggregate_publish_fn, ao2_cleanup, ao2_ref, ao2_unlock, ao2_wrlock, ast_assert, ast_debug, stasis_caching_topic::cache, cache_find(), cache_put(), cache_remove(), stasis_subscription_change::description, stasis_cache::entries, stasis_cache::id_fn, NULL, cache_put_snapshots::old, stasis_caching_topic::original_topic, stasis_cache_clear_type(), stasis_message_data(), stasis_message_eid(), stasis_message_type(), stasis_message_type_name(), stasis_publish(), stasis_subscription_change_type(), stasis_subscription_final_message(), stasis_topic_name(), stasis_topic_subscribers(), sub, stasis_caching_topic::topic, stasis_subscription_change::uniqueid, update(), and update_create().

Referenced by stasis_caching_topic_create().

◆ print_cache_entry()

static void print_cache_entry ( void *  v_obj,
void *  where,
ao2_prnt_fn *  prnt 
)
static

Definition at line 937 of file stasis_cache.c.

938{
939 struct stasis_cache_entry *entry = v_obj;
940
941 if (!entry) {
942 return;
943 }
944 prnt(where, "Type: %s ID: %s Hash: %u", stasis_message_type_name(entry->key.type),
945 entry->key.id, entry->key.hash);
946}

References entry::key, and stasis_message_type_name().

Referenced by stasis_caching_topic_create().

◆ stasis_cache_cleanup()

static void stasis_cache_cleanup ( void  )
static

Definition at line 1002 of file stasis_cache.c.

1003{
1006}
struct stasis_message_type * stasis_cache_update_type(void)
Message type for cache update messages.
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
Definition: stasis.h:1515

References stasis_cache_clear_type(), stasis_cache_update_type(), and STASIS_MESSAGE_TYPE_CLEANUP.

Referenced by stasis_cache_init().

◆ stasis_cache_clear_create()

struct stasis_message * stasis_cache_clear_create ( struct stasis_message message)

A message which instructs the caching topic to remove an entry from its cache.

Parameters
message: Message representative of the cache entry that should be cleared. This will become the data held in the stasis_cache_clear message.
Returns
Message which, when sent to a stasis_caching_topic, will clear the item from the cache.
Return values
NULL on error.
Since
12

Definition at line 778 of file stasis_cache.c.

779{
781}
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.

References stasis_cache_clear_type(), and stasis_message_create().

Referenced by ast_delete_mwi_state_full(), ast_device_state_clear_cache(), ast_endpoint_shutdown(), AST_TEST_DEFINE(), clear_node_cache(), and remove_device_states_cb().

◆ stasis_cache_create()

struct stasis_cache * stasis_cache_create ( snapshot_get_id  id_fn)

Create a cache.

This is the backend store for a stasis_caching_topic. The cache is thread safe, allowing concurrent reads and writes.

The returned object is AO2 managed, so ao2_cleanup() when you're done.

Parameters
id_fn: Callback to extract the id from a snapshot message.
Returns
New cache indexed by id_fn.
Return values
NULL on error
Since
12

Definition at line 360 of file stasis_cache.c.

361{
362 return stasis_cache_create_full(id_fn, NULL, NULL);
363}
struct stasis_cache * stasis_cache_create_full(snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_publish_fn aggregate_publish_fn)
Create a cache.
Definition: stasis_cache.c:334

References stasis_cache::id_fn, NULL, and stasis_cache_create_full().

Referenced by ast_presence_state_engine_init(), AST_TEST_DEFINE(), mwi_init(), and stasis_cp_all_create().

◆ stasis_cache_create_full()

struct stasis_cache * stasis_cache_create_full ( snapshot_get_id  id_fn,
cache_aggregate_calc_fn  aggregate_calc_fn,
cache_aggregate_publish_fn  aggregate_publish_fn 
)

Create a cache.

This is the backend store for a stasis_caching_topic. The cache is thread safe, allowing concurrent reads and writes.

The returned object is AO2 managed, so ao2_cleanup() when you're done.

Parameters
id_fn: Callback to extract the id from a snapshot message.
aggregate_calc_fn: Callback to calculate the aggregate cache entry.
aggregate_publish_fn: Callback to publish the aggregate cache entry.
Note
An aggregate message is a combined representation of the local and remote entities publishing the message data. e.g., An aggregate device state represents the combined device state from the local and any remote entities publishing state for a device. e.g., An aggregate MWI message is the old/new MWI counts accumulated from the local and any remote entities publishing to a mailbox.
Returns
New cache indexed by id_fn.
Return values
NULL on error
Since
12.2.0

Definition at line 334 of file stasis_cache.c.

337{
338 struct stasis_cache *cache;
339
342 if (!cache) {
343 return NULL;
344 }
345
348 if (!cache->entries) {
350 return NULL;
351 }
352
353 cache->id_fn = id_fn;
354 cache->aggregate_calc_fn = aggregate_calc_fn;
355 cache->aggregate_publish_fn = aggregate_publish_fn;
356
357 return cache;
358}
@ AO2_ALLOC_OPT_LOCK_RWLOCK
Definition: astobj2.h:365
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition: astobj2.h:1303
static int cache_entry_hash(const void *obj, int flags)
Definition: stasis_cache.c:266
#define NUM_CACHE_BUCKETS
Definition: stasis_cache.c:42
static int cache_entry_cmp(void *obj, void *arg, int flags)
Definition: stasis_cache.c:288
static void cache_dtor(void *obj)
Definition: stasis_cache.c:326
cache_aggregate_calc_fn aggregate_calc_fn
Definition: stasis_cache.c:49

References stasis_cache::aggregate_calc_fn, stasis_cache::aggregate_publish_fn, AO2_ALLOC_OPT_LOCK_NOLOCK, AO2_ALLOC_OPT_LOCK_RWLOCK, ao2_alloc_options, ao2_cleanup, ao2_container_alloc_hash, cache, cache_dtor(), cache_entry_cmp(), cache_entry_hash(), stasis_cache::id_fn, NULL, and NUM_CACHE_BUCKETS.

Referenced by AST_TEST_DEFINE(), devstate_init(), and stasis_cache_create().

◆ stasis_cache_dump()

struct ao2_container * stasis_cache_dump ( struct stasis_cache cache,
struct stasis_message_type type 
)

Dump cached items to a subscription for the ast_eid_default entity.

Parameters
cache: The cache to query.
type: Type of message to dump (any type if NULL).
Returns
ao2_container containing all matches (must be unreffed by caller)
Return values
NULL on allocation error
Since
12

Definition at line 736 of file stasis_cache.c.

737{
739}
struct ao2_container * stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
Dump cached items to a subscription for a specific entity.
Definition: stasis_cache.c:718

References ast_eid_default, cache, stasis_cache_dump_by_eid(), and type.

Referenced by action_presencestatelist(), ast_ari_endpoints_list(), ast_ari_endpoints_list_by_tech(), AST_TEST_DEFINE(), asterisk_publication_devicestate_refresh(), asterisk_publication_mwi_refresh(), asterisk_start_devicestate_publishing(), asterisk_start_mwi_publishing(), endpoints_scrape_cb(), load_module(), unload_module(), and xmpp_init_event_distribution().

◆ stasis_cache_dump_all()

struct ao2_container * stasis_cache_dump_all ( struct stasis_cache cache,
struct stasis_message_type type 
)

Dump all entity items from the cache to a subscription.

Since
12.2.0
Parameters
cache: The cache to query.
type: Type of message to dump (any type if NULL).
Returns
ao2_container containing all matches (must be unreffed by caller)
Return values
NULL on allocation error

Definition at line 757 of file stasis_cache.c.

758{
759 struct cache_dump_data cache_dump;
760
762 ast_assert(cache->entries != NULL);
763
764 cache_dump.eid = NULL;
765 cache_dump.type = type;
766 cache_dump.container = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, NULL, NULL);
767 if (!cache_dump.container) {
768 return NULL;
769 }
770
772 return cache_dump.container;
773}
#define ao2_callback(c, flags, cb_fn, arg)
ao2_callback() is a generic function that applies cb_fn() to all objects in a container,...
Definition: astobj2.h:1693
@ OBJ_NODATA
Definition: astobj2.h:1044
@ OBJ_MULTIPLE
Definition: astobj2.h:1049
#define ao2_container_alloc_list(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a list container.
Definition: astobj2.h:1327
static int cache_dump_all_cb(void *obj, void *arg, int flags)
Definition: stasis_cache.c:741

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_callback, ao2_container_alloc_list, ast_assert, cache, cache_dump_all_cb(), cache_dump_data::container, cache_dump_data::eid, NULL, OBJ_MULTIPLE, OBJ_NODATA, type, and cache_dump_data::type.

Referenced by AST_TEST_DEFINE(), cache_cleanup(), and cleanup_module().

◆ stasis_cache_dump_by_eid()

struct ao2_container * stasis_cache_dump_by_eid ( struct stasis_cache cache,
struct stasis_message_type type,
const struct ast_eid eid 
)

Dump cached items to a subscription for a specific entity.

Since
12.2.0
Parameters
cache: The cache to query.
type: Type of message to dump (any type if NULL).
eid: Specific entity id to retrieve. NULL for aggregate.
Returns
ao2_container containing all matches (must be unreffed by caller)
Return values
NULL on allocation error

Definition at line 718 of file stasis_cache.c.

719{
720 struct cache_dump_data cache_dump;
721
723 ast_assert(cache->entries != NULL);
724
725 cache_dump.eid = eid;
726 cache_dump.type = type;
727 cache_dump.container = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, NULL, NULL);
728 if (!cache_dump.container) {
729 return NULL;
730 }
731
733 return cache_dump.container;
734}
static int cache_dump_by_eid_cb(void *obj, void *arg, int flags)
Definition: stasis_cache.c:697

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_callback, ao2_container_alloc_list, ast_assert, cache, cache_dump_by_eid_cb(), cache_dump_data::container, cache_dump_data::eid, NULL, OBJ_MULTIPLE, OBJ_NODATA, type, and cache_dump_data::type.

Referenced by action_devicestatelist(), AST_TEST_DEFINE(), cpg_confchg_cb(), and stasis_cache_dump().

◆ stasis_cache_entry_get_aggregate()

struct stasis_message * stasis_cache_entry_get_aggregate ( struct stasis_cache_entry entry)

Get the aggregate cache entry snapshot.

Since
12.2.0
Parameters
entry: Cache entry to get the aggregate snapshot.
Note
A reference is not given to the returned pointer so don't unref it.
An aggregate message is a combined representation of the local and remote entities publishing the message data. e.g., An aggregate device state represents the combined device state from the local and any remote entities publishing state for a device. e.g., An aggregate MWI message is the old/new MWI counts accumulated from the local and any remote entities publishing to a mailbox.
Return values
Aggregate-snapshot in cache.
NULL if not present.

Definition at line 365 of file stasis_cache.c.

366{
367 return entry->aggregate;
368}

Referenced by cache_test_aggregate_calc_fn(), and device_state_aggregate_calc().

◆ stasis_cache_entry_get_local()

struct stasis_message * stasis_cache_entry_get_local ( struct stasis_cache_entry entry)

Get the local entity's cache entry snapshot.

Since
12.2.0
Parameters
entry: Cache entry to get the local entity's snapshot.
Note
A reference is not given to the returned pointer so don't unref it.
Return values
Internal-snapshot in cache.
NULL if not present.

Definition at line 370 of file stasis_cache.c.

371{
372 return entry->local;
373}

Referenced by cache_test_aggregate_calc_fn(), and device_state_aggregate_calc().

◆ stasis_cache_entry_get_remote()

struct stasis_message * stasis_cache_entry_get_remote ( struct stasis_cache_entry entry,
int  idx 
)

Get a remote entity's cache entry snapshot by index.

Since
12.2.0
Parameters
entry: Cache entry to get a remote entity's snapshot.
idx: Which remote entity's snapshot to get.
Note
A reference is not given to the returned pointer so don't unref it.
Return values
Remote-entity-snapshot in cache.
NULL if not present.

Definition at line 375 of file stasis_cache.c.

376{
377 if (idx < AST_VECTOR_SIZE(&entry->remote)) {
378 return AST_VECTOR_GET(&entry->remote, idx);
379 }
380 return NULL;
381}

References AST_VECTOR_GET, AST_VECTOR_SIZE, and NULL.

Referenced by cache_test_aggregate_calc_fn(), and device_state_aggregate_calc().

◆ stasis_cache_get()

struct stasis_message * stasis_cache_get ( struct stasis_cache cache,
struct stasis_message_type type,
const char *  id 
)

Retrieve an item from the cache for the ast_eid_default entity.

The returned item is AO2 managed, so ao2_cleanup() when you're done with it.

Parameters
cache: The cache to query.
type: Type of message to retrieve.
id: Identity of the snapshot to retrieve.
Returns
Message from the cache.
Return values
NULL if message is not found.
Since
12

Definition at line 686 of file stasis_cache.c.

687{
689}
struct stasis_message * stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid)
Retrieve an item from the cache for a specific entity.
Definition: stasis_cache.c:659

References ast_eid_default, cache, stasis_cache_get_by_eid(), and type.

Referenced by ast_endpoint_latest_snapshot(), AST_TEST_DEFINE(), has_voicemail(), presence_state_cached(), unistim_send_mwi_to_peer(), and update_registry().

◆ stasis_cache_get_all()

struct ao2_container * stasis_cache_get_all ( struct stasis_cache cache,
struct stasis_message_type type,
const char *  id 
)

Retrieve all matching entity items from the cache.

Since
12.2.0
Parameters
cache: The cache to query.
type: Type of message to retrieve.
id: Identity of the snapshot to retrieve.
Returns
Container of matching items found.
Return values
NULL on error.

Definition at line 587 of file stasis_cache.c.

588{
589 struct stasis_cache_entry *cached_entry;
590 struct ao2_container *found;
591
593 ast_assert(cache->entries != NULL);
594 ast_assert(id != NULL);
595
596 if (!type) {
597 return NULL;
598 }
599
601 if (!found) {
602 return NULL;
603 }
604
605 ao2_rdlock(cache->entries);
606
607 cached_entry = cache_find(cache->entries, type, id);
608 if (cached_entry && cache_entry_dump(found, cached_entry)) {
609 ao2_cleanup(found);
610 found = NULL;
611 }
612
613 ao2_unlock(cache->entries);
614
615 ao2_cleanup(cached_entry);
616 return found;
617}
#define ao2_rdlock(a)
Definition: astobj2.h:718
Generic container type.

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_cleanup, ao2_container_alloc_list, ao2_rdlock, ao2_unlock, ast_assert, cache, cache_entry_dump(), cache_find(), NULL, and type.

Referenced by AST_TEST_DEFINE().

◆ stasis_cache_get_by_eid()

struct stasis_message * stasis_cache_get_by_eid ( struct stasis_cache cache,
struct stasis_message_type type,
const char *  id,
const struct ast_eid eid 
)

Retrieve an item from the cache for a specific entity.

The returned item is AO2 managed, so ao2_cleanup() when you're done with it.

Parameters
cache: The cache to query.
type: Type of message to retrieve.
id: Identity of the snapshot to retrieve.
eid: Specific entity id to retrieve. NULL for aggregate.
Note
An aggregate message is a combined representation of the local and remote entities publishing the message data. e.g., An aggregate device state represents the combined device state from the local and any remote entities publishing state for a device. e.g., An aggregate MWI message is the old/new MWI counts accumulated from the local and any remote entities publishing to a mailbox.
Returns
Message from the cache.
Return values
NULL if message is not found.
Since
12.2.0

Definition at line 659 of file stasis_cache.c.

660{
661 struct stasis_cache_entry *cached_entry;
662 struct stasis_message *snapshot = NULL;
663
665 ast_assert(cache->entries != NULL);
666 ast_assert(id != NULL);
667
668 if (!type) {
669 return NULL;
670 }
671
672 ao2_rdlock(cache->entries);
673
674 cached_entry = cache_find(cache->entries, type, id);
675 if (cached_entry) {
676 snapshot = cache_entry_by_eid(cached_entry, eid);
677 ao2_bump(snapshot);
678 }
679
680 ao2_unlock(cache->entries);
681
682 ao2_cleanup(cached_entry);
683 return snapshot;
684}

References ao2_bump, ao2_cleanup, ao2_rdlock, ao2_unlock, ast_assert, cache, cache_entry_by_eid(), cache_find(), stasis_message::eid, NULL, and type.

Referenced by ast_delete_mwi_state_full(), ast_device_state_clear_cache(), AST_TEST_DEFINE(), check_cache_aggregate(), devstate_cached(), and stasis_cache_get().

◆ stasis_cache_init()

int stasis_cache_init ( void  )

Definition at line 1008 of file stasis_cache.c.

1009{
1011
1013 return -1;
1014 }
1015
1017 return -1;
1018 }
1019
1020 return 0;
1021}
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
Definition: clicompat.c:19
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
Definition: stasis.h:1493
static void stasis_cache_cleanup(void)

References ast_register_cleanup(), stasis_cache_cleanup(), stasis_cache_clear_type(), stasis_cache_update_type(), and STASIS_MESSAGE_TYPE_INIT.

Referenced by stasis_init().

◆ stasis_cache_update_dtor()

static void stasis_cache_update_dtor ( void *  obj)
static

Definition at line 783 of file stasis_cache.c.

784{
785 struct stasis_cache_update *update = obj;
786
787 ao2_cleanup(update->old_snapshot);
788 update->old_snapshot = NULL;
789 ao2_cleanup(update->new_snapshot);
790 update->new_snapshot = NULL;
791 ao2_cleanup(update->type);
792 update->type = NULL;
793}
Cache update message.
Definition: stasis.h:965

References ao2_cleanup, NULL, and update().

Referenced by update_create().

◆ stasis_caching_accept_message_type()

int stasis_caching_accept_message_type ( struct stasis_caching_topic caching_topic,
struct stasis_message_type type 
)

Indicate to a caching topic that we are interested in a message type.

This will cause the caching topic to receive messages of the given message type. This enables internal filtering in the stasis message bus to reduce messages.

Parameters
caching_topic: The caching topic.
type: The message type we wish to receive.
Return values
0 on success
-1 on failure
Since
17.0.0

Definition at line 90 of file stasis_cache.c.

92{
93 int res;
94
95 if (!caching_topic) {
96 return -1;
97 }
98
99 /* We wait to accept the stasis specific message types until now so that by default everything
100 * will flow to us.
101 */
104 res |= stasis_subscription_accept_message_type(caching_topic->sub, type);
105
106 return res;
107}
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
Definition: stasis.c:1024
struct stasis_subscription * sub
Definition: stasis_cache.c:59

References stasis_cache_clear_type(), stasis_subscription_accept_message_type(), stasis_subscription_change_type(), stasis_caching_topic::sub, and type.

Referenced by ast_presence_state_engine_init(), devstate_init(), and stasis_cp_single_accept_message_type().

◆ stasis_caching_get_topic()

struct stasis_topic * stasis_caching_get_topic ( struct stasis_caching_topic caching_topic)

Returns the topic of cached events from a caching topic.

Parameters
caching_topic: The caching topic.
Returns
The topic that publishes cache update events, along with passthrough events from the underlying topic.
Return values
NULL if caching_topic is NULL.
Since
12

Definition at line 85 of file stasis_cache.c.

86{
87 return caching_topic->topic;
88}

References stasis_caching_topic::topic.

Referenced by ast_device_state_topic_cached(), ast_mwi_topic_cached(), ast_presence_state_topic_cached(), AST_TEST_DEFINE(), stasis_cp_single_create(), and stasis_cp_single_topic_cached().

◆ stasis_caching_set_filter()

int stasis_caching_set_filter ( struct stasis_caching_topic caching_topic,
enum stasis_subscription_message_filter  filter 
)

Set the message type filtering level on a cache.

This will cause the underlying subscription to filter messages according to the provided filter level. For example if selective is used then only messages matching those provided to stasis_subscription_accept_message_type will be raised to the subscription callback.

Parameters
caching_topic: The caching topic.
filter: What filter to use
Return values
0 on success
-1 on failure
Since
17.0.0

Definition at line 109 of file stasis_cache.c.

111{
112 if (!caching_topic) {
113 return -1;
114 }
115 return stasis_subscription_set_filter(caching_topic->sub, filter);
116}
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
Definition: func_strings.c:807
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
Definition: stasis.c:1078

References filter(), stasis_subscription_set_filter(), and stasis_caching_topic::sub.

Referenced by ast_presence_state_engine_init(), devstate_init(), and stasis_cp_single_set_filter().

◆ stasis_caching_topic_create()

struct stasis_caching_topic * stasis_caching_topic_create ( struct stasis_topic original_topic,
struct stasis_cache cache 
)

Create a topic which monitors and caches messages from another topic.

The idea is that some topics publish 'snapshots' of some other object's state that should be cached. When these snapshot messages are received, the cache is updated, and a stasis_cache_update() message is forwarded, which has both the original snapshot message and the new message.

The returned object is AO2 managed, so ao2_cleanup() when done with it.

Parameters
original_topic: Topic publishing snapshot messages.
cache: Backend cache in which to keep snapshots.
Returns
New topic which changes snapshot messages to stasis_cache_update() messages, and forwards all other messages from the original topic.
Return values
NULL on error
Since
12

Definition at line 948 of file stasis_cache.c.

949{
950 struct stasis_caching_topic *caching_topic;
951 static int caching_id;
952 char *new_name;
953 int ret;
954
955 ret = ast_asprintf(&new_name, "cache:%d/%s", ast_atomic_fetchadd_int(&caching_id, +1), stasis_topic_name(original_topic));
956 if (ret < 0) {
957 return NULL;
958 }
959
960 caching_topic = ao2_alloc_options(sizeof(*caching_topic),
962 if (caching_topic == NULL) {
963 ast_free(new_name);
964
965 return NULL;
966 }
967
968 caching_topic->topic = stasis_topic_create(new_name);
969 if (caching_topic->topic == NULL) {
970 ao2_ref(caching_topic, -1);
971 ast_free(new_name);
972
973 return NULL;
974 }
975
976 ao2_ref(cache, +1);
977 caching_topic->cache = cache;
978 if (!cache->registered) {
979 if (ao2_container_register(new_name, cache->entries, print_cache_entry)) {
980 ast_log(LOG_ERROR, "Stasis cache container '%p' for '%s' did not register\n",
981 cache->entries, new_name);
982 } else {
983 cache->registered = 1;
984 }
985 }
986 ast_free(new_name);
987
988 caching_topic->sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0, 0, __FILE__, __LINE__, __PRETTY_FUNCTION__);
989 if (caching_topic->sub == NULL) {
990 ao2_ref(caching_topic, -1);
991
992 return NULL;
993 }
994
996 caching_topic->original_topic = original_topic;
997
998 /* The subscription holds the reference, so no additional ref bump. */
999 return caching_topic;
1000}
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:267
#define ast_log
Definition: astobj2.c:42
int ao2_container_register(const char *name, struct ao2_container *self, ao2_prnt_obj_fn *prnt_obj)
Register a container for CLI stats and integrity check.
#define LOG_ERROR
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition: lock.h:757
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition: stasis.c:618
static void print_cache_entry(void *v_obj, void *where, ao2_prnt_fn *prnt)
Definition: stasis_cache.c:937
static void stasis_caching_topic_dtor(void *obj)
Definition: stasis_cache.c:62
static void caching_topic_exec(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Definition: stasis_cache.c:833
struct stasis_subscription * internal_stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func)
Create a subscription.
Definition: stasis.c:857

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_alloc_options, ao2_container_register(), ao2_ref, ast_asprintf, ast_atomic_fetchadd_int(), ast_free, ast_log, stasis_caching_topic::cache, cache, caching_topic_exec(), internal_stasis_subscribe(), LOG_ERROR, NULL, stasis_caching_topic::original_topic, print_cache_entry(), stasis_caching_topic_dtor(), stasis_topic_create(), stasis_topic_name(), stasis_caching_topic::sub, and stasis_caching_topic::topic.

Referenced by ast_presence_state_engine_init(), AST_TEST_DEFINE(), devstate_init(), mwi_init(), and stasis_cp_sink_create().

◆ stasis_caching_topic_dtor()

static void stasis_caching_topic_dtor ( void *  obj)
static

Definition at line 62 of file stasis_cache.c.

63{
64 struct stasis_caching_topic *caching_topic = obj;
65
66 /* Caching topics contain subscriptions, and must be manually
67 * unsubscribed. */
69 /* If there are any messages in flight to this subscription, that would
70 * be bad. */
72
74
75 ao2_cleanup(caching_topic->sub);
76 caching_topic->sub = NULL;
77 ao2_cleanup(caching_topic->cache);
78 caching_topic->cache = NULL;
79 ao2_cleanup(caching_topic->topic);
80 caching_topic->topic = NULL;
81 ao2_cleanup(caching_topic->original_topic);
82 caching_topic->original_topic = NULL;
83}
void ao2_container_unregister(const char *name)
Unregister a container for CLI stats and integrity check.
int stasis_subscription_is_done(struct stasis_subscription *subscription)
Returns whether subscription has received its final message.
Definition: stasis.c:1119
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
Returns whether a subscription is currently subscribed.
Definition: stasis.c:1151

References ao2_cleanup, ao2_container_unregister(), ast_assert, stasis_caching_topic::cache, NULL, stasis_caching_topic::original_topic, stasis_subscription_is_done(), stasis_subscription_is_subscribed(), stasis_topic_name(), stasis_caching_topic::sub, and stasis_caching_topic::topic.

Referenced by stasis_caching_topic_create().

◆ stasis_caching_unsubscribe()

struct stasis_caching_topic * stasis_caching_unsubscribe ( struct stasis_caching_topic caching_topic)

Unsubscribes a caching topic from its upstream topic.

This function returns immediately, so be sure to cleanup when stasis_subscription_final_message() is received.

Parameters
caching_topic: Caching topic to unsubscribe
Return values
NULL for convenience
Since
12

Definition at line 119 of file stasis_cache.c.

120{
121 if (!caching_topic) {
122 return NULL;
123 }
124
125 /*
126 * The subscription may hold the last reference to this caching
127 * topic, but we want to make sure the unsubscribe finishes
128 * before kicking off the caching topic's dtor.
129 */
130 ao2_ref(caching_topic, +1);
131
132 if (stasis_subscription_is_subscribed(caching_topic->sub)) {
133 /*
134 * Increment the reference to hold on to it past the
135 * unsubscribe. Will be cleaned up in dtor.
136 */
137 ao2_ref(caching_topic->sub, +1);
138 stasis_unsubscribe(caching_topic->sub);
139 } else {
140 ast_log(LOG_ERROR, "stasis_caching_topic unsubscribed multiple times\n");
141 }
142 ao2_cleanup(caching_topic);
143 return NULL;
144}
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *subscription)
Cancel a subscription.
Definition: stasis.c:972

References ao2_cleanup, ao2_ref, ast_log, LOG_ERROR, NULL, stasis_subscription_is_subscribed(), stasis_unsubscribe(), and stasis_caching_topic::sub.

Referenced by AST_TEST_DEFINE(), stasis_caching_unsubscribe_and_join(), and stasis_cp_single_unsubscribe().

◆ stasis_caching_unsubscribe_and_join()

struct stasis_caching_topic * stasis_caching_unsubscribe_and_join ( struct stasis_caching_topic caching_topic)

Unsubscribes a caching topic from its upstream topic, blocking until all messages have been forwarded.

See stasis_unsubscribe_and_join() for more info on when to use this as opposed to stasis_caching_unsubscribe().

Parameters
caching_topic: Caching topic to unsubscribe
Return values
NULL for convenience
Since
12

Definition at line 146 of file stasis_cache.c.

147{
148 if (!caching_topic) {
149 return NULL;
150 }
151
152 /* Hold a ref past the unsubscribe */
153 ao2_ref(caching_topic, +1);
154 stasis_caching_unsubscribe(caching_topic);
155 stasis_subscription_join(caching_topic->sub);
156 ao2_cleanup(caching_topic);
157 return NULL;
158}
void stasis_subscription_join(struct stasis_subscription *subscription)
Block until the last message is processed on a subscription.
Definition: stasis.c:1106
struct stasis_caching_topic * stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
Unsubscribes a caching topic from its upstream topic.
Definition: stasis_cache.c:119

References ao2_cleanup, ao2_ref, NULL, stasis_caching_unsubscribe(), stasis_subscription_join(), and stasis_caching_topic::sub.

Referenced by AST_TEST_DEFINE(), devstate_cleanup(), mwi_cleanup(), and presence_state_engine_cleanup().

◆ STASIS_MESSAGE_TYPE_DEFN() [1/2]

STASIS_MESSAGE_TYPE_DEFN ( stasis_cache_clear_type  )

◆ STASIS_MESSAGE_TYPE_DEFN() [2/2]

STASIS_MESSAGE_TYPE_DEFN ( stasis_cache_update_type  )

◆ update_create()

static struct stasis_message * update_create ( struct stasis_message old_snapshot,
struct stasis_message new_snapshot 
)
static

Definition at line 795 of file stasis_cache.c.

796{
798 struct stasis_message *msg;
799
800 ast_assert(old_snapshot != NULL || new_snapshot != NULL);
801
803 return NULL;
804 }
805
808 if (!update) {
809 return NULL;
810 }
811
812 if (old_snapshot) {
813 ao2_ref(old_snapshot, +1);
814 update->old_snapshot = old_snapshot;
815 if (!new_snapshot) {
816 ao2_ref(stasis_message_type(old_snapshot), +1);
817 update->type = stasis_message_type(old_snapshot);
818 }
819 }
820 if (new_snapshot) {
821 ao2_ref(new_snapshot, +1);
822 update->new_snapshot = new_snapshot;
823 ao2_ref(stasis_message_type(new_snapshot), +1);
824 update->type = stasis_message_type(new_snapshot);
825 }
826
828
830 return msg;
831}
static void stasis_cache_update_dtor(void *obj)
Definition: stasis_cache.c:783

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_alloc_options, ao2_cleanup, ao2_ref, ast_assert, NULL, stasis_cache_update_dtor(), stasis_cache_update_type(), stasis_message_create(), stasis_message_type(), and update().

Referenced by caching_topic_exec().