Asterisk - The Open Source Telephony Project GIT-master-f36a736
stasis_cache_pattern.c
Go to the documentation of this file.
1/*
2 * Asterisk -- An open source telephony toolkit.
3 *
4 * Copyright (C) 2013, Digium, Inc.
5 *
6 * David M. Lee, II <dlee@digium.com>
7 *
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
13 *
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
17 */
18
19/*! \file
20 *
21 * \brief Typical cache pattern for Stasis topics.
22 *
23 * \author David M. Lee, II <dlee@digium.com>
24 */
25
26/*** MODULEINFO
27 <support_level>core</support_level>
28 ***/
29
30#include "asterisk.h"
31
32#include "asterisk/astobj2.h"
34
39
41};
42
46
49};
50
51static void all_dtor(void *obj)
52{
53 struct stasis_cp_all *all = obj;
54
55 ao2_cleanup(all->topic);
56 all->topic = NULL;
58 all->topic_cached = NULL;
59 ao2_cleanup(all->cache);
60 all->cache = NULL;
63}
64
66 snapshot_get_id id_fn)
67{
68 char *cached_name = NULL;
69 struct stasis_cp_all *all;
70 static int cache_id;
71
72 all = ao2_t_alloc(sizeof(*all), all_dtor, name);
73 if (!all) {
74 return NULL;
75 }
76
77 ast_asprintf(&cached_name, "cache_pattern:%d/%s", ast_atomic_fetchadd_int(&cache_id, +1), name);
78 if (!cached_name) {
79 ao2_ref(all, -1);
80
81 return NULL;
82 }
83
85 all->topic_cached = stasis_topic_create(cached_name);
86 ast_free(cached_name);
87 all->cache = stasis_cache_create(id_fn);
90
91 if (!all->topic || !all->topic_cached || !all->cache ||
93 ao2_ref(all, -1);
94
95 return NULL;
96 }
97
98 return all;
99}
100
102{
103 if (!all) {
104 return NULL;
105 }
106 return all->topic;
107}
108
110 struct stasis_cp_all *all)
111{
112 if (!all) {
113 return NULL;
114 }
115 return all->topic_cached;
116}
117
119{
120 if (!all) {
121 return NULL;
122 }
123 return all->cache;
124}
125
126static void one_dtor(void *obj)
127{
128 struct stasis_cp_single *one = obj;
129
130 /* Should already be unsubscribed */
134
135 ao2_cleanup(one->topic);
136 one->topic = NULL;
137}
138
140 const char *name)
141{
142 struct stasis_cp_single *one;
143
144 one = stasis_cp_sink_create(all, name);
145 if (!one) {
146 return NULL;
147 }
148
152
153 if (!one->forward_topic_to_all || !one->forward_cached_to_all) {
154 ao2_ref(one, -1);
155
156 return NULL;
157 }
158
159 return one;
160}
161
163 const char *name)
164{
165 struct stasis_cp_single *one;
166
167 one = ao2_t_alloc(sizeof(*one), one_dtor, name);
168 if (!one) {
169 return NULL;
170 }
171
173 if (!one->topic) {
174 ao2_ref(one, -1);
175
176 return NULL;
177 }
178
180 if (!one->topic_cached) {
181 ao2_ref(one, -1);
182
183 return NULL;
184 }
185
186 return one;
187}
188
190{
191 if (!one) {
192 return;
193 }
194
200 one->topic_cached = NULL;
201
202 ao2_cleanup(one);
203}
204
206{
207 if (!one) {
208 return NULL;
209 }
210 return one->topic;
211}
212
214 struct stasis_cp_single *one)
215{
216 if (!one) {
217 return NULL;
218 }
220}
221
224{
225 if (!one) {
226 return -1;
227 }
229}
230
233{
234 if (!one) {
235 return -1;
236 }
238}
Asterisk main include file. File version handling, generic pbx functions.
#define ast_free(a)
Definition: astmm.h:180
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:267
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
#define ao2_t_alloc(data_size, destructor_fn, debug_msg)
Definition: astobj2.h:407
static const char type[]
Definition: chan_ooh323.c:109
static const char name[]
Definition: format_mp3.c:68
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
Definition: func_strings.c:807
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition: lock.h:757
#define NULL
Definition: resample.c:96
struct stasis_topic * stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
Returns the topic of cached events from a caching topics.
Definition: stasis_cache.c:85
struct stasis_caching_topic * stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
Unsubscribes a caching topic from its upstream topic.
Definition: stasis_cache.c:119
stasis_subscription_message_filter
Stasis subscription message filters.
Definition: stasis.h:294
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
Definition: stasis.c:1549
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition: stasis.c:618
struct stasis_caching_topic * stasis_caching_topic_create(struct stasis_topic *original_topic, struct stasis_cache *cache)
Create a topic which monitors and caches messages from another topic.
Definition: stasis_cache.c:948
int stasis_caching_set_filter(struct stasis_caching_topic *caching_topic, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a cache.
Definition: stasis_cache.c:109
struct stasis_cache * stasis_cache_create(snapshot_get_id id_fn)
Create a cache.
Definition: stasis_cache.c:360
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition: stasis.c:1579
const char *(* snapshot_get_id)(struct stasis_message *message)
Callback extract a unique identity from a snapshot message.
Definition: stasis.h:1009
int stasis_caching_accept_message_type(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type)
Indicate to a caching topic that we are interested in a message type.
Definition: stasis_cache.c:90
struct stasis_topic * stasis_cp_single_topic(struct stasis_cp_single *one)
Get the topic for this instance.
struct stasis_topic * stasis_cp_single_topic_cached(struct stasis_cp_single *one)
Get the caching topic for this instance.
int stasis_cp_single_set_filter(struct stasis_cp_single *one, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a cache.
struct stasis_cp_single * stasis_cp_sink_create(struct stasis_cp_all *all, const char *name)
Create a sink in the cache pattern.
struct stasis_cache * stasis_cp_all_cache(struct stasis_cp_all *all)
Get the cache.
struct stasis_cp_single * stasis_cp_single_create(struct stasis_cp_all *all, const char *name)
Create the 'one' side of the cache pattern.
int stasis_cp_single_accept_message_type(struct stasis_cp_single *one, struct stasis_message_type *type)
Indicate to an instance that we are interested in a message type.
static void all_dtor(void *obj)
static void one_dtor(void *obj)
struct stasis_cp_all * stasis_cp_all_create(const char *name, snapshot_get_id id_fn)
Create an all instance of the cache pattern.
void stasis_cp_single_unsubscribe(struct stasis_cp_single *one)
Stops caching and forwarding messages.
struct stasis_topic * stasis_cp_all_topic(struct stasis_cp_all *all)
Get the aggregate topic.
struct stasis_topic * stasis_cp_all_topic_cached(struct stasis_cp_all *all)
Get the caching topic.
Caching pattern for Stasis Message Bus API topics.
struct stasis_topic * topic
struct stasis_forward * forward_all_to_cached
struct stasis_topic * topic_cached
struct stasis_cache * cache
struct stasis_topic * topic
struct stasis_caching_topic * topic_cached
struct stasis_forward * forward_topic_to_all
struct stasis_forward * forward_cached_to_all
Forwarding information.
Definition: stasis.c:1532
#define ast_assert(a)
Definition: utils.h:739