Asterisk - The Open Source Telephony Project GIT-master-a358458
res_stasis_test.c
Go to the documentation of this file.
1/*
2 * Asterisk -- An open source telephony toolkit.
3 *
4 * Copyright (C) 2013, Digium, Inc.
5 *
6 * David M. Lee, II <dlee@digium.com>
7 *
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
13 *
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
17 */
18
19/*!
20 * \file
21 * \brief Test infrastructure for dealing with Stasis.
22 *
23 * \author David M. Lee, II <dlee@digium.com>
24 */
25
26/*** MODULEINFO
27 <depend>TEST_FRAMEWORK</depend>
28 <support_level>core</support_level>
29 ***/
30
31#include "asterisk.h"
32
33#include "asterisk/astobj2.h"
34#include "asterisk/module.h"
36
38
39static void stasis_message_sink_dtor(void *obj)
40{
41 struct stasis_message_sink *sink = obj;
42
43 {
44 SCOPED_MUTEX(lock, &sink->lock);
45 while (!sink->is_done) {
46 /* Normally waiting forever is bad, but if we're not
47 * done, we're not done. */
48 ast_cond_wait(&sink->cond, &sink->lock);
49 }
50 }
51
52 ast_mutex_destroy(&sink->lock);
53 ast_cond_destroy(&sink->cond);
54
55 while (sink->num_messages > 0) {
56 ao2_cleanup(sink->messages[--sink->num_messages]);
57 }
58 ast_free(sink->messages);
59 sink->messages = NULL;
60 sink->max_messages = 0;
61}
62
63static struct timespec make_deadline(int timeout_millis)
64{
65 struct timeval start = ast_tvnow();
66 struct timeval delta = {
67 .tv_sec = timeout_millis / 1000,
68 .tv_usec = (timeout_millis % 1000) * 1000,
69 };
70 struct timeval deadline_tv = ast_tvadd(start, delta);
71 struct timespec deadline = {
72 .tv_sec = deadline_tv.tv_sec,
73 .tv_nsec = 1000 * deadline_tv.tv_usec,
74 };
75
76 return deadline;
77}
78
80{
	/* Body of stasis_message_sink_create(): allocates a sink, initializes
	 * its lock, condition variable and message array, and returns it.
	 * NOTE(review): the signature line and the declaration of 'sink' are
	 * missing from this listing. */
82
	/* Allocate the sink with stasis_message_sink_dtor as its destructor. */
83 sink = ao2_alloc(sizeof(*sink), stasis_message_sink_dtor);
84 if (!sink) {
85 return NULL;
86 }
87 ast_mutex_init(&sink->lock);
88 ast_cond_init(&sink->cond, NULL);
	/* Start with room for four message pointers; message_sink_cb() doubles
	 * the array whenever it fills up. */
89 sink->max_messages = 4;
90 sink->messages =
91 ast_malloc(sizeof(*sink->messages) * sink->max_messages);
92 if (!sink->messages) {
	/* NOTE(review): 'sink' is not released explicitly on this path;
	 * presumably its (unseen) declaration attaches a scoped cleanup.
	 * Confirm, otherwise the sink leaks here. */
93 return NULL;
94 }
	/* Take an extra reference before handing the sink to the caller.
	 * NOTE(review): presumably this balances a scoped cleanup on 'sink';
	 * confirm against the declaration. */
95 ao2_ref(sink, +1);
96 return sink;
97}
98
99/*!
100 * \brief Implementation of the stasis_message_sink_cb() callback.
101 *
102 * Why the roundabout way of exposing this via stasis_message_sink_cb()? Well,
103 * it has to do with how we previously loaded modules, using \c RTLD_LAZY.
104 *
105 * The stasis_message_sink_cb() function gave us a layer of indirection so that
106 * the initial lazy binding would still work as expected.
107 */
108static void message_sink_cb(void *data, struct stasis_subscription *sub,
109 struct stasis_message *message)
110{
	/* 'data' is the sink this callback records into. */
111 struct stasis_message_sink *sink = data;
112
	/* Everything below runs with sink->lock held. */
113 SCOPED_MUTEX(lock, &sink->lock);
114
	/* NOTE(review): the condition guarding this branch (original line 115)
	 * is missing from this listing; presumably it tests
	 * stasis_subscription_final_message(sub, message) -- confirm.
	 * Marks the sink done and signals the condition the destructor waits on. */
116 sink->is_done = 1;
117 ast_cond_signal(&sink->cond);
118 return;
119 }
120
	/* NOTE(review): the condition for this branch (original line 121) is
	 * also missing; presumably it compares the message's type against
	 * stasis_subscription_change_type() -- confirm. */
122 /* Ignore subscription changes */
123 return;
124 }
125
	/* Array is full: double its capacity. If the reallocation fails the
	 * message is dropped and sink->messages is left as it was. */
126 if (sink->num_messages == sink->max_messages) {
127 size_t new_max_messages = sink->max_messages * 2;
128 struct stasis_message **new_messages = ast_realloc(
129 sink->messages,
130 sizeof(*new_messages) * new_max_messages);
131 if (!new_messages) {
132 return;
133 }
134 sink->max_messages = new_max_messages;
135 sink->messages = new_messages;
136 }
137
	/* Hold a reference while the message sits in the array; the sink's
	 * destructor releases it. */
138 ao2_ref(message, +1);
139 sink->messages[sink->num_messages++] = message;
	/* Wake a thread waiting on the sink for new messages. */
140 ast_cond_signal(&sink->cond);
141}
142
144{
	/* Body of stasis_message_sink_cb() (signature line missing from this
	 * listing): returns the file-local message_sink_cb function pointer. */
145 return message_sink_cb;
146}
147
148
150 int num_messages, int timeout_millis)
151{
	/* Body of stasis_message_sink_wait_for_count() (first signature line
	 * missing from this listing). Waits until the sink holds at least
	 * num_messages messages or the timeout expires, then returns the
	 * sink's message count at that moment. */

	/* The deadline is computed once, so repeated wakeups do not extend
	 * the total wait. */
152 struct timespec deadline = make_deadline(timeout_millis);
153
154 SCOPED_MUTEX(lock, &sink->lock);
155 while (sink->num_messages < num_messages) {
156 int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
157
	/* A timeout is not treated as an error: stop waiting and report
	 * however many messages arrived. */
158 if (r == ETIMEDOUT) {
159 break;
160 }
	/* Any other failure is logged, then the current count is returned. */
161 if (r != 0) {
162 ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
163 strerror(r));
164 break;
165 }
166 }
167 return sink->num_messages;
168}
169
171 int num_messages, int timeout_millis)
172{
	/* Body of stasis_message_sink_should_stay() (first signature line
	 * missing from this listing). Waits for as long as the sink's count
	 * equals num_messages, up to the timeout, then returns the current
	 * count. A result equal to num_messages means the count had not
	 * changed when the wait ended. */
173 struct timespec deadline = make_deadline(timeout_millis);
174
175 SCOPED_MUTEX(lock, &sink->lock);
	/* The loop exits as soon as the count differs from num_messages. */
176 while (sink->num_messages == num_messages) {
177 int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
178
	/* Reaching the deadline ends the wait with the count unchanged. */
179 if (r == ETIMEDOUT) {
180 break;
181 }
	/* Any other failure is logged, then the current count is returned. */
182 if (r != 0) {
183 ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
184 strerror(r));
185 break;
186 }
187 }
188 return sink->num_messages;
189}
190
192 stasis_wait_cb cmp_cb, const void *data, int timeout_millis)
193{
	/* Body of stasis_message_sink_wait_for() (first signature line missing
	 * from this listing). Scans the sink's messages from index 'start'
	 * onward, waiting for more to arrive as needed, until cmp_cb returns
	 * non-zero for one of them.
	 *
	 * Returns the index of the matching message, -1 if the deadline passes
	 * first, or -2 on any other condition-wait error.
	 *
	 * NOTE(review): 'start' is not range-checked here; a negative value
	 * would index before the array. Confirm callers never pass one. */

	/* One deadline covers both the initial wait and the scan. */
194 struct timespec deadline = make_deadline(timeout_millis);
195
196 SCOPED_MUTEX(lock, &sink->lock);
197
198 /* wait for the start */
199 while (sink->num_messages < start + 1) {
200 int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
201
202 if (r == ETIMEDOUT) {
203 /* Timed out waiting for the start */
204 return -1;
205 }
206 if (r != 0) {
207 ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
208 strerror(r));
209 return -2;
210 }
211 }
212
213
	/* messages[start] exists at this point; advance one message at a
	 * time until the comparison callback accepts one. */
214 while (!cmp_cb(sink->messages[start], data)) {
215 ++start;
216
	/* Wait for the next message to arrive before testing it. */
217 while (sink->num_messages < start + 1) {
218 int r = ast_cond_timedwait(&sink->cond,
219 &sink->lock, &deadline);
220
221 if (r == ETIMEDOUT) {
222 return -1;
223 }
224 if (r != 0) {
	/* NOTE(review): original line 225 is missing from this listing;
	 * presumably it is the opening "ast_log(LOG_ERROR," of this call. */
226 "Unexpected condition error: %s\n",
227 strerror(r));
228 return -2;
229 }
230 }
231 }
232
233 return start;
234}
235
237{
	/* Body of stasis_test_message_create() (signature line missing from
	 * this listing). */

	/* 'data' is declared with ao2_cleanup as its scope-exit destructor. */
238 RAII_VAR(void *, data, NULL, ao2_cleanup);
239
	/* NOTE(review): the condition for this early return (original line 240)
	 * is missing; presumably it checks that stasis_test_message_type() is
	 * available -- confirm. */
241 return NULL;
242 }
243
244 /* We just need the unique pointer; don't care what's in it */
245 data = ao2_alloc(1, NULL);
246 if (!data) {
247 return NULL;
248 }
249
	/* NOTE(review): the final return (original line 250) is missing;
	 * presumably it returns stasis_message_create() built from the test
	 * message type and 'data' -- confirm. */
251}
252
/* Module unload hook; returns 0. */
253static int unload_module(void)
254{
	/* NOTE(review): original line 255 is missing from this listing;
	 * presumably it invokes
	 * STASIS_MESSAGE_TYPE_CLEANUP(stasis_test_message_type) -- confirm. */
256 return 0;
257}
258
/* Module load hook. */
259static int load_module(void)
260{
	/* NOTE(review): original lines 261-262 are missing from this listing;
	 * presumably they run STASIS_MESSAGE_TYPE_INIT(stasis_test_message_type)
	 * and return AST_MODULE_LOAD_DECLINE if it fails -- confirm. */
263 }
264
	/* NOTE(review): original line 265 is missing; presumably it returns
	 * AST_MODULE_LOAD_SUCCESS -- confirm. */
266}
267
269 .support_level = AST_MODULE_SUPPORT_CORE,
270 .load = load_module,
271 .unload = unload_module,
272 .load_pri = AST_MODPRI_APP_DEPEND,
ast_mutex_t lock
Definition: app_sla.c:331
Asterisk main include file. File version handling, generic pbx functions.
#define ast_free(a)
Definition: astmm.h:180
#define ast_realloc(p, len)
A wrapper for realloc()
Definition: astmm.h:226
#define ast_malloc(len)
A wrapper for malloc()
Definition: astmm.h:191
#define ast_log
Definition: astobj2.c:42
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:409
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
struct stasis_message_type * stasis_test_message_type(void)
Gets the type of messages created by stasis_test_message_create().
struct stasis_message * stasis_test_message_create(void)
Creates a test message.
#define LOG_ERROR
#define ast_cond_destroy(cond)
Definition: lock.h:202
#define ast_cond_wait(cond, mutex)
Definition: lock.h:205
#define ast_cond_init(cond, attr)
Definition: lock.h:201
#define ast_cond_timedwait(cond, mutex, time)
Definition: lock.h:206
#define ast_mutex_init(pmutex)
Definition: lock.h:186
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
Definition: lock.h:589
#define ast_mutex_destroy(a)
Definition: lock.h:188
#define ast_cond_signal(cond)
Definition: lock.h:203
Asterisk module definitions.
@ AST_MODFLAG_LOAD_ORDER
Definition: module.h:317
@ AST_MODFLAG_GLOBAL_SYMBOLS
Definition: module.h:316
#define AST_MODULE_INFO(keystr, flags_to_set, desc, fields...)
Definition: module.h:543
@ AST_MODPRI_APP_DEPEND
Definition: module.h:328
@ AST_MODULE_SUPPORT_CORE
Definition: module.h:121
#define ASTERISK_GPL_KEY
The text the key() function should return.
Definition: module.h:46
@ AST_MODULE_LOAD_SUCCESS
Definition: module.h:70
@ AST_MODULE_LOAD_DECLINE
Module has failed to load, may be in an inconsistent state.
Definition: module.h:78
struct stasis_forward * sub
Definition: res_corosync.c:240
int stasis_message_sink_should_stay(struct stasis_message_sink *sink, int num_messages, int timeout_millis)
Ensures that no new messages are received.
static void message_sink_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Implementation of the stasis_message_sink_cb() callback.
static struct timespec make_deadline(int timeout_millis)
static void stasis_message_sink_dtor(void *obj)
STASIS_MESSAGE_TYPE_DEFN(stasis_test_message_type)
int stasis_message_sink_wait_for_count(struct stasis_message_sink *sink, int num_messages, int timeout_millis)
Wait for a sink's num_messages field to reach a certain level.
int stasis_message_sink_wait_for(struct stasis_message_sink *sink, int start, stasis_wait_cb cmp_cb, const void *data, int timeout_millis)
Wait for a message that matches the given criteria.
static int load_module(void)
static int unload_module(void)
struct stasis_message_sink * stasis_message_sink_create(void)
Create a message sink.
stasis_subscription_cb stasis_message_sink_cb(void)
Topic callback to receive messages.
#define NULL
Definition: resample.c:96
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
void(* stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Callback function type for Stasis subscriptions.
Definition: stasis.h:611
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
Definition: stasis.h:1515
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
Definition: stasis.h:1493
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
Definition: stasis.c:1174
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
Test infrastructure for dealing with Stasis.
int(* stasis_wait_cb)(struct stasis_message *msg, const void *data)
Definition: stasis_test.h:95
Structure that collects messages from a topic.
Definition: stasis_test.h:44
struct stasis_message ** messages
Definition: stasis_test.h:57
ast_mutex_t lock
Definition: stasis_test.h:46
struct timeval ast_tvadd(struct timeval a, struct timeval b)
Returns the sum of two timevals a + b.
Definition: extconf.c:2282
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:159
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:941