Asterisk - The Open Source Telephony Project GIT-master-3dae2cf
stasis_cache_pattern.h File Reference

Caching pattern for Stasis Message Bus API topics.

#include "asterisk/stasis.h"

Functions

struct stasis_cache * stasis_cp_all_cache (struct stasis_cp_all *all)
 Get the cache.

struct stasis_cp_all * stasis_cp_all_create (const char *name, snapshot_get_id id_fn)
 Create an all instance of the cache pattern.

struct stasis_topic * stasis_cp_all_topic (struct stasis_cp_all *all)
 Get the aggregate topic.

struct stasis_topic * stasis_cp_all_topic_cached (struct stasis_cp_all *all)
 Get the caching topic.

int stasis_cp_single_accept_message_type (struct stasis_cp_single *one, struct stasis_message_type *type)
 Indicate to an instance that we are interested in a message type.

struct stasis_cp_single * stasis_cp_single_create (struct stasis_cp_all *all, const char *name)
 Create the 'one' side of the cache pattern.

int stasis_cp_single_set_filter (struct stasis_cp_single *one, enum stasis_subscription_message_filter filter)
 Set the message type filtering level on a cache.

struct stasis_topic * stasis_cp_single_topic (struct stasis_cp_single *one)
 Get the topic for this instance.

struct stasis_topic * stasis_cp_single_topic_cached (struct stasis_cp_single *one)
 Get the caching topic for this instance.

void stasis_cp_single_unsubscribe (struct stasis_cp_single *one)
 Stops caching and forwarding messages.

struct stasis_cp_single * stasis_cp_sink_create (struct stasis_cp_all *all, const char *name)
 Create a sink in the cache pattern.

Detailed Description

Caching pattern for Stasis Message Bus API topics.

A typical pattern for Stasis objects is for each individual object to have its own topic and caching topic. These individual topics feed upstream aggregate topics and a shared cache.

The stasis_cp_all object contains the aggregate topics and the shared cache. It is created with the base name for the topics and the identity function used to identify messages in the cache.

The stasis_cp_single object contains the stasis_topic for a single instance, and the corresponding stasis_caching_topic.

Since the stasis_cp_single object has subscriptions for forwarding and caching, it must be disposed of using stasis_cp_single_unsubscribe() instead of simply ao2_cleanup().
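
The relationship between the 'one' and 'all' sides can be illustrated with a small self-contained model. The sketch below does not use the Asterisk API: every `toy_` name is a hypothetical stand-in, and it assumes that a forward simply relays each message to an upstream topic and that the shared cache keeps the latest value per identity.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define TOY_MAX_ENTRIES 8
#define TOY_ID_LEN 32

/* Hypothetical stand-in for a topic: counts deliveries, may relay upstream. */
struct toy_topic {
	const char *name;
	int delivered;             /* messages seen by this topic */
	struct toy_topic *forward; /* upstream (aggregate) topic, or NULL */
};

/* Hypothetical stand-in for the shared cache: latest value per identity. */
struct toy_cache {
	char ids[TOY_MAX_ENTRIES][TOY_ID_LEN];
	int values[TOY_MAX_ENTRIES];
	int count;
};

/*
 * Insert or replace the entry for id. Returns 0 on success, -1 if full.
 * Ids are assumed to be shorter than TOY_ID_LEN characters.
 */
static int toy_cache_update(struct toy_cache *cache, const char *id, int value)
{
	int i;

	assert(cache != NULL && id != NULL);
	for (i = 0; i < cache->count; i++) {
		if (strcmp(cache->ids[i], id) == 0) {
			cache->values[i] = value;
			return 0;
		}
	}
	if (cache->count == TOY_MAX_ENTRIES) {
		return -1;
	}
	snprintf(cache->ids[cache->count], TOY_ID_LEN, "%s", id);
	cache->values[cache->count] = value;
	cache->count++;
	return 0;
}

/* Deliver one message to a topic and to every topic upstream of it. */
static void toy_publish(struct toy_topic *topic)
{
	for (; topic != NULL; topic = topic->forward) {
		topic->delivered++;
	}
}

/* Publish on one instance: refresh the shared cache, then deliver. */
static int toy_single_publish(struct toy_topic *one, struct toy_cache *cache,
	const char *id, int value)
{
	if (toy_cache_update(cache, id, value) != 0) {
		return -1;
	}
	toy_publish(one);
	return 0;
}
```

In this model, two instances that forward to one aggregate each see only their own messages, while the aggregate sees all of them and the cache holds one entry per identity.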

Definition in file stasis_cache_pattern.h.

Function Documentation

◆ stasis_cp_all_cache()

struct stasis_cache * stasis_cp_all_cache ( struct stasis_cp_all * all )

Get the cache.

This is the shared cache for all corresponding stasis_cp_single objects.

Parameters
all    All side caching pattern object.
Returns
The cache.
Return values
NULL    if all is NULL

Definition at line 118 of file stasis_cache_pattern.c.

struct stasis_cache *stasis_cp_all_cache(struct stasis_cp_all *all)
{
	if (!all) {
		return NULL;
	}
	return all->cache;
}

References stasis_cp_all::cache, and NULL.

Referenced by ast_endpoint_cache().

◆ stasis_cp_all_create()

struct stasis_cp_all * stasis_cp_all_create ( const char *  name,
snapshot_get_id  id_fn 
)

Create an all instance of the cache pattern.

This object is AO2 managed, so dispose of it with ao2_cleanup().

Parameters
name     Base name of the topics.
id_fn    Identity function for the cache.
Returns
All side instance.
Return values
NULL    on error.

Definition at line 65 of file stasis_cache_pattern.c.

struct stasis_cp_all *stasis_cp_all_create(const char *name,
	snapshot_get_id id_fn)
{
	char *cached_name = NULL;
	struct stasis_cp_all *all;
	static int cache_id;

	all = ao2_t_alloc(sizeof(*all), all_dtor, name);
	if (!all) {
		return NULL;
	}

	ast_asprintf(&cached_name, "cache_pattern:%d/%s", ast_atomic_fetchadd_int(&cache_id, +1), name);
	if (!cached_name) {
		ao2_ref(all, -1);

		return NULL;
	}

	all->topic = stasis_topic_create(name);
	all->topic_cached = stasis_topic_create(cached_name);
	ast_free(cached_name);
	all->cache = stasis_cache_create(id_fn);
	all->forward_all_to_cached =
		stasis_forward_all(all->topic, all->topic_cached);

	if (!all->topic || !all->topic_cached || !all->cache ||
		!all->forward_all_to_cached) {
		ao2_ref(all, -1);

		return NULL;
	}

	return all;
}

References all_dtor(), ao2_ref, ao2_t_alloc, ast_asprintf, ast_atomic_fetchadd_int(), ast_free, stasis_cp_all::cache, stasis_cp_all::forward_all_to_cached, name, NULL, stasis_cache_create(), stasis_forward_all(), stasis_topic_create(), stasis_cp_all::topic, and stasis_cp_all::topic_cached.

Referenced by ast_endpoint_stasis_init().
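
The listing above derives the name of the cached topic from a counter and the base name, using the format "cache_pattern:%d/%s". A minimal standalone sketch of that naming step follows. It substitutes C11 atomic_fetch_add() and snprintf() for ast_atomic_fetchadd_int() and ast_asprintf(); the helper names `toy_cached_name` and `toy_cached_name_example`, and the explicit counter parameter, are assumptions made for illustration only.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdio.h>
#include <string.h>

/*
 * Format "cache_pattern:<id>/<name>" into buf, taking <id> from an atomic
 * counter that is incremented on each call. Returns 0 on success, -1 if an
 * argument is invalid or the result does not fit in buf.
 */
static int toy_cached_name(atomic_int *counter, const char *name,
	char *buf, size_t len)
{
	int id;
	int written;

	if (!counter || !name || !buf || len == 0) {
		return -1;
	}

	/* Fetch-and-add returns the previous value of the counter. */
	id = atomic_fetch_add(counter, 1);
	written = snprintf(buf, len, "cache_pattern:%d/%s", id, name);
	if (written < 0 || (size_t) written >= len) {
		return -1;
	}
	return 0;
}

/* Usage: two instances sharing a base name still get distinct cached names. */
static void toy_cached_name_example(void)
{
	atomic_int counter;
	char first[64];
	char second[64];
	int rc_first;
	int rc_second;

	atomic_init(&counter, 0);
	rc_first = toy_cached_name(&counter, "endpoints", first, sizeof(first));
	rc_second = toy_cached_name(&counter, "endpoints", second, sizeof(second));
	assert(rc_first == 0 && rc_second == 0);
	assert(strcmp(first, second) != 0);
}
```

Because the counter is bumped atomically, concurrent callers in this sketch cannot receive the same id, which keeps the generated names unique even when the base name repeats.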

◆ stasis_cp_all_topic()

struct stasis_topic * stasis_cp_all_topic ( struct stasis_cp_all * all )

Get the aggregate topic.

This topic aggregates all messages published to corresponding stasis_cp_single_topic() topics.

Parameters
all    All side caching pattern object.
Returns
The aggregate topic.
Return values
NULL    if all is NULL

Definition at line 101 of file stasis_cache_pattern.c.

struct stasis_topic *stasis_cp_all_topic(struct stasis_cp_all *all)
{
	if (!all) {
		return NULL;
	}
	return all->topic;
}

References NULL, and stasis_cp_all::topic.

Referenced by ast_endpoint_topic_all().

◆ stasis_cp_all_topic_cached()

struct stasis_topic * stasis_cp_all_topic_cached ( struct stasis_cp_all * all )

Get the caching topic.

This topic aggregates all messages from the corresponding stasis_cp_single_topic_cached() topics.

Note that one normally only subscribes to the caching topic, since data is fed to it from its upstream topic.

Parameters
all    All side caching pattern object.
Returns
The aggregate caching topic.
Return values
NULL    if all is NULL

Definition at line 109 of file stasis_cache_pattern.c.

struct stasis_topic *stasis_cp_all_topic_cached(
	struct stasis_cp_all *all)
{
	if (!all) {
		return NULL;
	}
	return all->topic_cached;
}

References NULL, and stasis_cp_all::topic_cached.

Referenced by ast_endpoint_topic_all_cached().

◆ stasis_cp_single_accept_message_type()

int stasis_cp_single_accept_message_type ( struct stasis_cp_single *  one,
struct stasis_message_type *  type
)

Indicate to an instance that we are interested in a message type.

This will cause the caching topic to receive messages of the given message type. This enables internal filtering in the stasis message bus to reduce messages.

Parameters
one     One side of the cache pattern.
type    The message type we wish to receive.
Return values
0     on success
-1    on failure
Since
17.0.0

Definition at line 222 of file stasis_cache_pattern.c.

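int stasis_cp_single_accept_message_type(struct stasis_cp_single *one,
	struct stasis_message_type *type)
{
	if (!one) {
		return -1;
	}
	return stasis_caching_accept_message_type(one->topic_cached, type);
}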

References stasis_caching_accept_message_type(), stasis_cp_single::topic_cached, and type.

Referenced by endpoint_internal_create().

◆ stasis_cp_single_create()

struct stasis_cp_single * stasis_cp_single_create ( struct stasis_cp_all *  all,
const char *  name
)

Create the 'one' side of the cache pattern.

Create the 'one' and forward to all's topic and topic_cached.

Dispose of using stasis_cp_single_unsubscribe().

Parameters
all     Corresponding all side.
name    Base name for the topics.
Returns
One side instance

Definition at line 139 of file stasis_cache_pattern.c.

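struct stasis_cp_single *stasis_cp_single_create(struct stasis_cp_all *all,
	const char *name)
{
	struct stasis_cp_single *one;

	one = stasis_cp_sink_create(all, name);
	if (!one) {
		return NULL;
	}

	one->forward_topic_to_all = stasis_forward_all(one->topic, all->topic);
	one->forward_cached_to_all = stasis_forward_all(
		stasis_caching_get_topic(one->topic_cached), all->topic_cached);

	if (!one->forward_topic_to_all || !one->forward_cached_to_all) {
		ao2_ref(one, -1);

		return NULL;
	}

	return one;
}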

References ao2_ref, stasis_cp_single::forward_cached_to_all, stasis_cp_single::forward_topic_to_all, name, NULL, stasis_caching_get_topic(), stasis_cp_sink_create(), stasis_forward_all(), stasis_cp_all::topic, stasis_cp_single::topic, stasis_cp_all::topic_cached, and stasis_cp_single::topic_cached.

Referenced by endpoint_internal_create().

◆ stasis_cp_single_set_filter()

int stasis_cp_single_set_filter ( struct stasis_cp_single *  one,
enum stasis_subscription_message_filter  filter
)

Set the message type filtering level on a cache.

This will cause the underlying subscription to filter messages according to the provided filter level. For example, if selective is used, then only messages matching those provided to stasis_subscription_accept_message_type() will be raised to the subscription callback.

Parameters
one       One side of the cache pattern.
filter    What filter to use
Return values
0     on success
-1    on failure
Since
17.0.0

Definition at line 231 of file stasis_cache_pattern.c.

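int stasis_cp_single_set_filter(struct stasis_cp_single *one,
	enum stasis_subscription_message_filter filter)
{
	if (!one) {
		return -1;
	}
	return stasis_caching_set_filter(one->topic_cached, filter);
}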

References filter(), stasis_caching_set_filter(), and stasis_cp_single::topic_cached.

Referenced by endpoint_internal_create().
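
The interaction between accepting message types and choosing a filter level can be modelled in a few lines. This is a simplified sketch, not the Stasis implementation: the `toy_` types and functions and the two filter levels shown are hypothetical, and integers stand in for message types. It assumes only what the descriptions above state, namely that a selective filter delivers just the accepted types.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_MAX_TYPES 8

/* Hypothetical filter levels for this model. */
enum toy_filter {
	TOY_FILTER_NONE,      /* deliver every message type */
	TOY_FILTER_SELECTIVE  /* deliver only explicitly accepted types */
};

/* Hypothetical subscription state: a filter level plus accepted types. */
struct toy_subscription {
	enum toy_filter filter;
	int accepted[TOY_MAX_TYPES];
	size_t accepted_count;
};

/* Record interest in a message type. Returns 0 on success, -1 on failure. */
static int toy_accept_type(struct toy_subscription *sub, int type)
{
	if (!sub || sub->accepted_count == TOY_MAX_TYPES) {
		return -1;
	}
	sub->accepted[sub->accepted_count++] = type;
	return 0;
}

/* Choose the filtering level. Returns 0 on success, -1 on failure. */
static int toy_set_filter(struct toy_subscription *sub, enum toy_filter filter)
{
	if (!sub) {
		return -1;
	}
	sub->filter = filter;
	return 0;
}

/* Returns 1 if a message of this type would reach the callback, else 0. */
static int toy_would_deliver(const struct toy_subscription *sub, int type)
{
	size_t i;

	assert(sub != NULL);
	if (sub->filter == TOY_FILTER_NONE) {
		return 1;
	}
	for (i = 0; i < sub->accepted_count; i++) {
		if (sub->accepted[i] == type) {
			return 1;
		}
	}
	return 0;
}
```

In this model, accepting a type has no visible effect until the filter is switched to selective, after which unaccepted types are dropped before they reach the callback.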

◆ stasis_cp_single_topic()

struct stasis_topic * stasis_cp_single_topic ( struct stasis_cp_single * one )

Get the topic for this instance.

This is the topic to which one would post instance-specific messages, or subscribe for single-instance, uncached messages.

Parameters
one    One side of the cache pattern.
Returns
The main topic.
Return values
NULL    if one is NULL

Definition at line 205 of file stasis_cache_pattern.c.

struct stasis_topic *stasis_cp_single_topic(struct stasis_cp_single *one)
{
	if (!one) {
		return NULL;
	}
	return one->topic;
}

References NULL, and stasis_cp_single::topic.

Referenced by ast_endpoint_topic(), and endpoint_internal_create().

◆ stasis_cp_single_topic_cached()

struct stasis_topic * stasis_cp_single_topic_cached ( struct stasis_cp_single * one )

Get the caching topic for this instance.

Note that one normally only subscribes to the caching topic, since data is fed to it from its upstream topic.

Parameters
one    One side of the cache pattern.
Returns
The caching topic.
Return values
NULL    if one is NULL

Definition at line 213 of file stasis_cache_pattern.c.

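struct stasis_topic *stasis_cp_single_topic_cached(
	struct stasis_cp_single *one)
{
	if (!one) {
		return NULL;
	}
	return stasis_caching_get_topic(one->topic_cached);
}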

References NULL, stasis_caching_get_topic(), and stasis_cp_single::topic_cached.

Referenced by ast_endpoint_topic_cached().

◆ stasis_cp_single_unsubscribe()

void stasis_cp_single_unsubscribe ( struct stasis_cp_single * one )

Stops caching and forwarding messages.

Parameters
one    One side of the cache pattern.

Definition at line 189 of file stasis_cache_pattern.c.

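void stasis_cp_single_unsubscribe(struct stasis_cp_single *one)
{
	if (!one) {
		return;
	}

	stasis_forward_cancel(one->forward_topic_to_all);
	one->forward_topic_to_all = NULL;
	stasis_forward_cancel(one->forward_cached_to_all);
	one->forward_cached_to_all = NULL;
	stasis_caching_unsubscribe(one->topic_cached);
	one->topic_cached = NULL;

	ao2_cleanup(one);
}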

References ao2_cleanup, stasis_cp_single::forward_cached_to_all, stasis_cp_single::forward_topic_to_all, NULL, stasis_caching_unsubscribe(), stasis_forward_cancel(), and stasis_cp_single::topic_cached.

Referenced by endpoint_dtor().

◆ stasis_cp_sink_create()

struct stasis_cp_single * stasis_cp_sink_create ( struct stasis_cp_all *  all,
const char *  name
)

Create a sink in the cache pattern.

Create the 'one' but do not automatically forward to the all's topic. This is useful when aggregating messages from other topics created with stasis_cp_single_create() into another caching topic without replicating those messages in the all's topics.

Dispose of using stasis_cp_single_unsubscribe().

Parameters
all     Corresponding all side.
name    Base name for the topics.
Returns
One side instance

Definition at line 162 of file stasis_cache_pattern.c.

struct stasis_cp_single *stasis_cp_sink_create(struct stasis_cp_all *all,
	const char *name)
{
	struct stasis_cp_single *one;

	one = ao2_t_alloc(sizeof(*one), one_dtor, name);
	if (!one) {
		return NULL;
	}

	one->topic = stasis_topic_create(name);
	if (!one->topic) {
		ao2_ref(one, -1);

		return NULL;
	}

	one->topic_cached = stasis_caching_topic_create(one->topic, all->cache);
	if (!one->topic_cached) {
		ao2_ref(one, -1);

		return NULL;
	}

	return one;
}

References ao2_ref, ao2_t_alloc, stasis_cp_all::cache, name, NULL, one_dtor(), stasis_caching_topic_create(), stasis_topic_create(), stasis_cp_single::topic, and stasis_cp_single::topic_cached.

Referenced by endpoint_internal_create(), and stasis_cp_single_create().
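
The difference between a sink and a regular 'one' can be shown with a reduced model in which the regular instance is a sink plus a forward to the aggregate. The `toy_` names below are hypothetical and the code is a sketch of the idea only; it does not call into Asterisk.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for a topic that may relay to one upstream topic. */
struct toy_topic {
	int delivered;              /* messages seen by this topic */
	struct toy_topic *upstream; /* aggregate topic, or NULL for a sink */
};

/* Sink: a fresh topic with no forward to the aggregate. */
static void toy_sink_init(struct toy_topic *one)
{
	assert(one != NULL);
	one->delivered = 0;
	one->upstream = NULL;
}

/* Single: a sink plus a forward to the aggregate topic. */
static void toy_single_init(struct toy_topic *one, struct toy_topic *all)
{
	toy_sink_init(one);
	one->upstream = all;
}

/* Deliver one message to a topic and to every topic upstream of it. */
static void toy_publish(struct toy_topic *topic)
{
	for (; topic != NULL; topic = topic->upstream) {
		topic->delivered++;
	}
}
```

Publishing once on each kind of instance leaves the aggregate with a single delivery, since only the regular instance relays upstream.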