Asterisk - The Open Source Telephony Project GIT-master-b023714
Loading...
Searching...
No Matches
test_stasis_state.c
Go to the documentation of this file.
1/*
2 * Asterisk -- An open source telephony toolkit.
3 *
4 * Copyright (C) 2019, Sangoma Technologies Corporation
5 *
6 * Kevin Harwell <kharwell@digium.com>
7 *
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
13 *
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
17 */
18
19/*** MODULEINFO
20 <depend>TEST_FRAMEWORK</depend>
21 <support_level>core</support_level>
22 ***/
23
24#include "asterisk.h"
25
26#include "asterisk/astobj2.h"
28#include "asterisk/module.h"
30#include "asterisk/test.h"
31
32#define test_category "/stasis/core/state/"
33
34#define TOPIC_COUNT 500
35
36#define MANAGER_TOPIC "foo"
37
39
40/*! foo stasis message type */
42
43/*! foo_type data */
44struct foo_data {
/* Numeric form of the state id this payload belongs to; validate_data() compares it against the parsed id */
45 size_t bar;
46};
47
50
51/*!
52 * For testing purposes each subscribed state's id is a number. This value is
53 * the sum of all ids.
54 */
55static size_t sum_total;
56
57/*! Test variable that tracks the running total of state ids */
58static size_t running_total;
59
60/*! This value is set to check if state data is NULL before publishing */
61static int expect_null;
62
63static int validate_data(const char *id, struct foo_data *foo)
64{
65 size_t num;
66 uintmax_t tmp;
67
68 if (ast_str_to_umax(id, &tmp)) {
69 ast_log(LOG_ERROR, "Unable to convert the state's id '%s' to numeric\n", id);
70 return -1;
71 }
72 num = (size_t) tmp;
73
74 running_total += num;
75
76 if (!foo) {
77 if (expect_null) {
78 return 0;
79 }
80
81 ast_log(LOG_ERROR, "Expected state data for '%s'\n", id);
82 return -1;
83 }
84
85 if (expect_null) {
86 ast_log(LOG_ERROR, "Expected NULL state data for '%s'\n", id);
87 return -1;
88 }
89
90 if (foo->bar != num) {
91 ast_log(LOG_ERROR, "Unexpected state data for '%s'\n", id);
92 return -1;
93 }
94
95 return 0;
96}
97
/*
 * Validate the state data for the given id and then release it with
 * ao2_cleanup(). The result of validate_data() is ignored here.
 * NOTE(review): the listing omits the line that declares and assigns 'foo'
 * (presumably fetched from 'sub') - confirm against the full source.
 */
98static void handle_validate(const char *id, struct stasis_state_subscriber *sub)
99{
101 validate_data(id, foo);
102 ao2_cleanup(foo);
103}
104
109
/*!
 * \brief Stasis subscription callback that ignores every message.
 *
 * \param data    Unused
 * \param sub     Unused
 * \param message Unused
 */
static void foo_type_cb(void *data,
	struct stasis_subscription *sub,
	struct stasis_message *message)
{
	/* Intentionally empty: topic message handling is not what is under test here */
}
114
/*
 * Tear down the subscriber vector and verify the bookkeeping totals.
 * Returns 0 when running_total equals sum_total after teardown, otherwise
 * logs an error and returns -1.
 * NOTE(review): this listing omits the statements before and after
 * AST_VECTOR_FREE() and the argument line of the ast_log() call - consult the
 * full source before editing.
 */
115static int subscriptions_destroy(struct stasis_state_manager *manager, struct subscriptions *subs)
116{
118
120 AST_VECTOR_FREE(subs);
121
123
/* Totals must agree once every subscription has been accounted for */
124 if (running_total != sum_total) {
125 ast_log(LOG_ERROR, "Failed to destroy all subscriptions: running=%zu, sum=%zu\n",
127 return -1;
128 }
129
130 return 0;
131}
132
134 struct subscriptions *subs)
135{
/*
 * Creates one state subscriber per id "0" .. TOPIC_COUNT-1, appends each to
 * 'subs', and accumulates the ids into sum_total. Returns 0 on success. If
 * the loop ends early, or running_total differs from sum_total afterwards,
 * the subscriptions are torn down with subscriptions_destroy() and -1 is
 * returned.
 * NOTE(review): this listing omits several source lines (the first line of
 * the signature, the opening setup condition, the declaration of 'sub' and
 * the call that assigns it, and the ast_log() argument line) - consult the
 * full source before editing.
 */
136 size_t i;
137
140 return -1;
141 }
142
/* No state data has been published yet, so validation expects NULL payloads */
144 expect_null = 1;
145
146 for (i = 0; i < TOPIC_COUNT; ++i) {
148 char id[32];
149
/*
 * NOTE(review): the bound 10 does not match sizeof(id) (32), and comparing
 * with -1 does not detect truncation (snprintf then returns the untruncated
 * length). Harmless for ids below TOPIC_COUNT.
 */
150 if (snprintf(id, 10, "%zu", i) == -1) {
151 ast_log(LOG_ERROR, "Unable to convert subscriber id to string\n");
152 break;
153 }
154
156 if (!sub) {
157 ast_log(LOG_ERROR, "Failed to create a state subscriber for id '%s'\n", id);
/* NOTE(review): 'sub' is NULL on this path, so this ao2_ref() is handed a NULL object - looks unintended */
158 ao2_ref(sub, -1);
159 break;
160 }
161
162 if (AST_VECTOR_APPEND(subs, sub)) {
163 ast_log(LOG_ERROR, "Failed to add to foo_sub to vector for id '%s'\n", id);
/* Append failed, so the vector never took ownership: drop our reference */
164 ao2_ref(sub, -1);
165 break;
166 }
167
168 sum_total += i;
169 }
170
/*
 * An early break leaves i short of TOPIC_COUNT. running_total is presumably
 * advanced by an observer callback while subscribing - confirm.
 */
171 if (i != TOPIC_COUNT || running_total != sum_total) {
172 ast_log(LOG_ERROR, "Failed to create all subscriptions: running=%zu, sum=%zu\n",
174 subscriptions_destroy(manager, subs);
175 return -1;
176 }
177
178 return 0;
179}
180
/*
 * Remove the test's publishers.
 * When 'pubs' is non-NULL the explicit publisher vector is freed and 0 is
 * returned. When 'pubs' is NULL, an id string is built for every index below
 * TOPIC_COUNT so the implicit publisher for that id can be removed.
 * Returns -1 only if an id cannot be formatted.
 * NOTE(review): the listing omits the statement preceding AST_VECTOR_FREE()
 * and the per-id removal call inside the loop - consult the full source.
 */
181static int publishers_destroy(struct stasis_state_manager *manager, struct publishers *pubs)
182{
183 size_t i;
184
185 if (pubs) {
186 /* Remove explicit publishers */
188 AST_VECTOR_FREE(pubs);
189 return 0;
190 }
191
192 for (i = 0; i < TOPIC_COUNT; ++i) {
193 char id[32];
194
195 /* Remove implicit publishers */
/* NOTE(review): the bound 10 does not match sizeof(id), and "== -1" does not catch truncation */
196 if (snprintf(id, 10, "%zu", i) == -1) {
197 ast_log(LOG_ERROR, "Unable to convert publisher id to string\n");
198 return -1;
199 }
200
202 }
203
204 return 0;
205}
206
207static int publishers_create(struct stasis_state_manager *manager,
208 struct publishers *pubs)
209{
210 size_t i;
211
212 if (AST_VECTOR_INIT(pubs, TOPIC_COUNT)) {
213 return -1;
214 }
215
216 for (i = 0; i < TOPIC_COUNT; ++i) {
217 struct stasis_state_publisher *pub;
218 char id[32];
219
220 if (snprintf(id, 10, "%zu", i) == -1) {
221 ast_log(LOG_ERROR, "Unable to convert publisher id to string\n");
222 break;
223 }
224
225 /* Create the state publisher */
226 pub = stasis_state_add_publisher(manager, id);
227 if (!pub) {
228 ast_log(LOG_ERROR, "Failed to create a state publisher for id '%s'\n", id);
229 break;
230 }
231
232 if (AST_VECTOR_APPEND(pubs, pub)) {
233 ast_log(LOG_ERROR, "Failed to add to publisher to vector for id '%s'\n", id);
234 ao2_ref(pub, -1);
235 break;
236 }
237 }
238
239 if (i != TOPIC_COUNT) {
240 ast_log(LOG_ERROR, "Failed to create all publishers: count=%zu\n", i);
241 publishers_destroy(manager, pubs);
242 return -1;
243 }
244
245 return 0;
246}
247
/*
 * Build a stasis message whose payload is a foo_data carrying the numeric
 * value of 'id'. Returns NULL when the payload cannot be allocated or the id
 * cannot be parsed; otherwise returns 'msg' (an error is logged if it is
 * NULL).
 * NOTE(review): the listing omits the statement that assigns 'msg'
 * (presumably a stasis message creation call taking 'foo') - confirm against
 * the full source.
 */
248static struct stasis_message *create_foo_type_message(const char *id)
249{
250 struct stasis_message *msg;
251 struct foo_data *foo;
252 uintmax_t tmp;
253
254 foo = ao2_alloc(sizeof(*foo), NULL);
255 if (!foo) {
256 ast_log(LOG_ERROR, "Failed to allocate foo data for '%s'\n", id);
257 return NULL;
258 }
259
260 if (ast_str_to_umax(id, &tmp)) {
261 ast_log(LOG_ERROR, "Unable to convert the state's id '%s' to numeric\n", id);
/* Parsing failed: release the payload allocated above */
262 ao2_ref(foo, -1);
263 return NULL;
264 }
265 foo->bar = (size_t) tmp;
266
268 if (!msg) {
269 ast_log(LOG_ERROR, "Failed to create stasis message for '%s'\n", id);
270 }
271
/* The local payload reference is released whether or not 'msg' is NULL */
272 ao2_ref(foo, -1);
273 return msg;
274}
275
276static int implicit_publish_cb(const char *id, struct stasis_message *msg, void *user_data)
277{
278 /* For each state object create and publish new state data */
279 struct foo_data *foo = stasis_message_data(msg);
280
281 if (validate_data(id, foo)) {
282 return CMP_STOP;
283 }
284
285 msg = create_foo_type_message(id);
286 if (!msg) {
287 return CMP_STOP;
288 }
289
290 /* Now publish it on the managed state object */
291 stasis_state_publish_by_id(user_data, id, NULL, msg);
292 ao2_ref(msg, -1);
293
294 return 0;
295}
296
297static int explicit_publish_cb(const char *id, struct stasis_message *msg, void *user_data)
298{
299 /* For each state object create and publish new state data */
300 struct publishers *pubs = user_data;
301 struct stasis_state_publisher *pub = NULL;
302 struct foo_data *foo = stasis_message_data(msg);
303 size_t i;
304
305 if (validate_data(id, foo)) {
306 return CMP_STOP;
307 }
308
309 msg = create_foo_type_message(id);
310 if (!msg) {
311 return CMP_STOP;
312 }
313
314 for (i = 0; i < AST_VECTOR_SIZE(pubs); ++i) {
315 if (!strcmp(stasis_state_publisher_id(AST_VECTOR_GET(pubs, i)), id)) {
316 pub = AST_VECTOR_GET(pubs, i);
317 break;
318 }
319 }
320
321 if (!pub) {
322 ast_log(LOG_ERROR, "Unable to locate publisher for id '%s'\n", id);
323 return CMP_STOP;
324 }
325
326 stasis_state_publish(pub, msg);
327 ao2_ref(msg, -1);
328
329 return 0;
330}
331
/*
 * Run 'cb' over every managed state twice and check after each pass that
 * running_total equals sum_total. Returns 0 when both passes agree,
 * otherwise logs which pass failed and returns -1.
 * NOTE(review): the listing omits the argument lines of both ast_log() calls
 * and the statement just before the second pass (presumably resetting
 * running_total and expect_null) - confirm against the full source.
 */
332static int publish(struct stasis_state_manager *manager, on_stasis_state cb,
333 void *user_data)
334{
335 /* First time there is no state data */
336 expect_null = 1;
337
338 running_total = 0;
339 stasis_state_callback_all(manager, cb, user_data);
340
341 if (running_total != sum_total) {
342 ast_log(LOG_ERROR, "Failed manager_callback (1): running=%zu, sum=%zu\n",
344 return -1;
345 }
346
347 /* Second time check valid state data exists */
349 stasis_state_callback_all(manager, cb, user_data);
350
351 if (running_total != sum_total) {
352 ast_log(LOG_ERROR, "Failed manager_callback (2): running=%zu, sum=%zu\n",
354 return -1;
355 }
356
357 return 0;
358}
359
/*
 * Test case: create the subscriptions, then drive publish() with
 * implicit_publish_cb, passing the manager itself as the callback's user
 * data. Cleanup destroys the subscriptions and the implicit publishers
 * (publishers_destroy() with a NULL vector), then checks for leaked manager
 * references.
 * NOTE(review): the listing omits the statement that creates 'manager' -
 * confirm against the full source.
 */
360AST_TEST_DEFINE(implicit_publish)
361{
362 RAII_VAR(struct stasis_state_manager *, manager, NULL, ao2_cleanup);
363 struct subscriptions subs;
364 int rc = AST_TEST_PASS;
365
366 switch (cmd) {
367 case TEST_INIT:
368 info->name = __func__;
369 info->category = test_category;
370 info->summary = "Test implicit publishing of stasis state";
371 info->description = info->summary;
372 return AST_TEST_NOT_RUN;
373 case TEST_EXECUTE:
374 break;
375 }
376
378 ast_test_validate(test, manager != NULL);
379
380 ast_test_validate(test, !subscriptions_create(manager, &subs));
381
382 ast_test_validate_cleanup(test, !publish(manager, implicit_publish_cb, manager),
383 rc, cleanup);
384
/* Reached on both the success path and the publish() failure path */
385cleanup:
386 if (subscriptions_destroy(manager, &subs) || publishers_destroy(manager, NULL)) {
387 return AST_TEST_FAIL;
388 }
389
390 /*
391 * State subscriptions add a ref to a state. The state in turn adds a ref
392 * to the manager. So if more than one ref is held on the manager before
393 * exiting, there is a ref leak somewhere.
394 */
/* A delta of 0 leaves the count unchanged; ao2_ref() returns the old refcount */
395 if (ao2_ref(manager, 0) != 1) {
396 ast_log(LOG_ERROR, "Memory leak - Too many references held on manager\n");
397 return AST_TEST_FAIL;
398 }
399
400 return rc;
401}
402
/*
 * Test case: create the subscriptions and one explicit publisher per id,
 * then drive publish() with explicit_publish_cb, passing the publishers
 * vector as the callback's user data. Cleanup destroys the subscriptions and
 * the explicit publishers, then checks for leaked manager references.
 * NOTE(review): the listing omits the statement that creates 'manager' -
 * confirm against the full source.
 */
403AST_TEST_DEFINE(explicit_publish)
404{
405 RAII_VAR(struct stasis_state_manager *, manager, NULL, ao2_cleanup);
406 struct subscriptions subs;
407 struct publishers pubs;
408 int rc = AST_TEST_PASS;
409
410 switch (cmd) {
411 case TEST_INIT:
412 info->name = __func__;
413 info->category = test_category;
414 info->summary = "Test explicit publishing of stasis state";
415 info->description = info->summary;
416 return AST_TEST_NOT_RUN;
417 case TEST_EXECUTE:
418 break;
419 }
420
422 ast_test_validate(test, manager != NULL);
423
424 ast_test_validate(test, !subscriptions_create(manager, &subs));
425 ast_test_validate_cleanup(test, !publishers_create(manager, &pubs), rc, cleanup);
426
427 ast_test_validate_cleanup(test, !publish(manager, explicit_publish_cb, &pubs),
428 rc, cleanup);
429
/* Reached on the success path and on publishers_create()/publish() failure */
430cleanup:
431 if (subscriptions_destroy(manager, &subs) || publishers_destroy(manager, &pubs)) {
432 return AST_TEST_FAIL;
433 }
434
435 /*
436 * State subscriptions add a ref to a state. The state in turn adds a ref
437 * to the manager. So if more than one ref is held on the manager before
438 * exiting, there is a ref leak somewhere.
439 */
/* A delta of 0 leaves the count unchanged; ao2_ref() returns the old refcount */
440 if (ao2_ref(manager, 0) != 1) {
441 ast_log(LOG_ERROR, "Memory leak - Too many references held on manager\n");
442 return AST_TEST_FAIL;
443 }
444
445 return rc;
446}
447
/*
 * Module unload hook: unregisters both tests and returns 0.
 * NOTE(review): the listing omits one statement after the unregister calls
 * (presumably message type cleanup) - confirm against the full source.
 */
448static int unload_module(void)
449{
450 AST_TEST_UNREGISTER(implicit_publish);
451 AST_TEST_UNREGISTER(explicit_publish);
452
454
455 return 0;
456}
457
/*
 * Module load hook: registers both tests. Returns -1 when the opening
 * condition is true.
 * NOTE(review): the listing omits that opening condition (presumably message
 * type initialization) and the final return statement - confirm against the
 * full source.
 */
458static int load_module(void)
459{
461 return -1;
462 }
463
464 AST_TEST_REGISTER(implicit_publish);
465 AST_TEST_REGISTER(explicit_publish);
466
468}
469
Asterisk main include file. File version handling, generic pbx functions.
#define ast_log
Definition astobj2.c:42
@ CMP_STOP
Definition astobj2.h:1028
#define ao2_cleanup(obj)
Definition astobj2.h:1934
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition astobj2.h:459
#define ao2_alloc(data_size, destructor_fn)
Definition astobj2.h:409
Conversion utility functions.
int ast_str_to_umax(const char *str, uintmax_t *res)
Convert the given string to an unsigned max size integer.
#define LOG_ERROR
Asterisk module definitions.
#define AST_MODULE_INFO_STANDARD(keystr, desc)
Definition module.h:581
#define ASTERISK_GPL_KEY
The text the key() function should return.
Definition module.h:46
@ AST_MODULE_LOAD_SUCCESS
Definition module.h:70
static struct stasis_subscription * sub
Statsd channel stats. Example of how to subscribe to Stasis events.
unsigned char publish
static void cleanup(void)
Clean up any old apps that we don't need any more.
Definition res_stasis.c:327
#define NULL
Definition resample.c:96
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
Definition stasis.h:1515
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
Definition stasis.h:1493
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
#define STASIS_MESSAGE_TYPE_DEFN(name,...)
Boiler-plate messaging macro for defining public message types.
Definition stasis.h:1440
struct stasis_message * stasis_message_create_full(struct stasis_message_type *type, void *data, const struct ast_eid *eid)
Create a new message for an entity.
Stasis State API.
void stasis_state_callback_all(struct stasis_state_manager *manager, on_stasis_state handler, void *data)
For each managed state call the given handler.
struct stasis_state_manager * stasis_state_manager_create(const char *topic_name)
Create a stasis state manager.
void stasis_state_publish_by_id(struct stasis_state_manager *manager, const char *id, const struct ast_eid *eid, struct stasis_message *msg)
Publish to a managed named by id topic, and add an implicit subscriber.
int(* on_stasis_state)(const char *id, struct stasis_message *msg, void *user_data)
The delegate called for each managed state.
void stasis_state_remove_observer(struct stasis_state_manager *manager, struct stasis_state_observer *observer)
Remove an observer (will no longer receive managed state related events).
void * stasis_state_unsubscribe_and_join(struct stasis_state_subscriber *sub)
Unsubscribe from the stasis topic, block until the final message is received, and then unsubscribe fr...
int stasis_state_add_observer(struct stasis_state_manager *manager, struct stasis_state_observer *observer)
Add an observer to receive managed state related events.
const char * stasis_state_publisher_id(const struct stasis_state_publisher *pub)
Retrieve the publisher's underlying state's unique id.
struct stasis_state_subscriber * stasis_state_subscribe_pool(struct stasis_state_manager *manager, const char *id, stasis_subscription_cb callback, void *data)
Add a subscriber, and subscribe to its underlying stasis topic.
void stasis_state_remove_publish_by_id(struct stasis_state_manager *manager, const char *id, const struct ast_eid *eid, struct stasis_message *msg)
Publish to a managed named by id topic, and remove an implicit publisher.
void stasis_state_publish(struct stasis_state_publisher *pub, struct stasis_message *msg)
Publish to a managed state (topic) using a publisher.
struct stasis_state_publisher * stasis_state_add_publisher(struct stasis_state_manager *manager, const char *id)
Add a publisher to the managed state for the given id.
void * stasis_state_subscriber_data(struct stasis_state_subscriber *sub)
Retrieve the last known state stasis message payload for the subscriber.
Managed stasis state event interface.
void(* on_subscribe)(const char *id, struct stasis_state_subscriber *sub)
Raised when any managed state is being subscribed.
Test Framework API.
@ TEST_INIT
Definition test.h:200
@ TEST_EXECUTE
Definition test.h:201
#define AST_TEST_REGISTER(cb)
Definition test.h:127
#define AST_TEST_UNREGISTER(cb)
Definition test.h:128
#define AST_TEST_DEFINE(hdr)
Definition test.h:126
@ AST_TEST_PASS
Definition test.h:195
@ AST_TEST_FAIL
Definition test.h:196
@ AST_TEST_NOT_RUN
Definition test.h:194
static int expect_null
static void handle_validate(const char *id, struct stasis_state_subscriber *sub)
struct stasis_message_type * foo_type(void)
#define MANAGER_TOPIC
static int publishers_destroy(struct stasis_state_manager *manager, struct publishers *pubs)
static int implicit_publish_cb(const char *id, struct stasis_message *msg, void *user_data)
static size_t sum_total
#define TOPIC_COUNT
static void foo_type_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
static struct stasis_message * create_foo_type_message(const char *id)
struct stasis_state_observer foo_observer
#define test_category
static int publishers_create(struct stasis_state_manager *manager, struct publishers *pubs)
static int explicit_publish_cb(const char *id, struct stasis_message *msg, void *user_data)
static int load_module(void)
static size_t running_total
static int unload_module(void)
static int subscriptions_destroy(struct stasis_state_manager *manager, struct subscriptions *subs)
static int validate_data(const char *id, struct foo_data *foo)
static int subscriptions_create(struct stasis_state_manager *manager, struct subscriptions *subs)
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition utils.h:978
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition vector.h:620
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition vector.h:185
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition vector.h:124
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition vector.h:267
#define AST_VECTOR_CALLBACK_VOID(vec, callback,...)
Execute a callback on every element in a vector disregarding callback return.
Definition vector.h:873
#define AST_VECTOR(name, type)
Define a vector structure.
Definition vector.h:44
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition vector.h:691