Asterisk - The Open Source Telephony Project GIT-master-f36a736
test_stasis_state.c
Go to the documentation of this file.
1/*
2 * Asterisk -- An open source telephony toolkit.
3 *
4 * Copyright (C) 2019, Sangoma Technologies Corporation
5 *
6 * Kevin Harwell <kharwell@digium.com>
7 *
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
13 *
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
17 */
18
19/*** MODULEINFO
20 <depend>TEST_FRAMEWORK</depend>
21 <support_level>core</support_level>
22 ***/
23
24#include "asterisk.h"
25
26#include "asterisk/astobj2.h"
28#include "asterisk/module.h"
30#include "asterisk/test.h"
31
32#define test_category "/stasis/core/state/"
33
34#define TOPIC_COUNT 500
35
36#define MANAGER_TOPIC "foo"
37
39
40/*! foo stasis message type */
42
/*!
 * \brief Payload carried by foo_type test messages.
 *
 * The tests store the numeric form of a state's id here so that the value
 * read back from a state can be checked against the id it was published on.
 */
struct foo_data {
	/*! Numeric value of the owning state's id */
	size_t bar;
};
47
50
51/*!
52 * For testing purposes each subscribed state's id is a number. This value is
53 * the summation of all id's.
54 */
55static size_t sum_total;
56
57/*! Test variable that tracks the running total of state ids */
58static size_t running_total;
59
60/*! This value is set to check if state data is NULL before publishing */
61static int expect_null;
62
63static int validate_data(const char *id, struct foo_data *foo)
64{
65 size_t num;
66 uintmax_t tmp;
67
68 if (ast_str_to_umax(id, &tmp)) {
69 ast_log(LOG_ERROR, "Unable to convert the state's id '%s' to numeric\n", id);
70 return -1;
71 }
72 num = (size_t) tmp;
73
74 running_total += num;
75
76 if (!foo) {
77 if (expect_null) {
78 return 0;
79 }
80
81 ast_log(LOG_ERROR, "Expected state data for '%s'\n", id);
82 return -1;
83 }
84
85 if (expect_null) {
86 ast_log(LOG_ERROR, "Expected NULL state data for '%s'\n", id);
87 return -1;
88 }
89
90 if (foo->bar != num) {
91 ast_log(LOG_ERROR, "Unexpected state data for '%s'\n", id);
92 return -1;
93 }
94
95 return 0;
96}
97
98static void handle_validate(const char *id, struct stasis_state_subscriber *sub)
99{
101 validate_data(id, foo);
102 ao2_cleanup(foo);
103}
104
107 .on_unsubscribe = handle_validate
108};
109
/*!
 * \internal
 * \brief Stasis subscription callback that deliberately does nothing.
 *
 * \param data Unused
 * \param sub Unused
 * \param message Unused
 */
static void foo_type_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
	/* Intentionally empty: stasis topic handling is not what is under test here */
}
114
115static int subscriptions_destroy(struct stasis_state_manager *manager, struct subscriptions *subs)
116{
118
120 AST_VECTOR_FREE(subs);
121
123
124 if (running_total != sum_total) {
125 ast_log(LOG_ERROR, "Failed to destroy all subscriptions: running=%zu, sum=%zu\n",
127 return -1;
128 }
129
130 return 0;
131}
132
134 struct subscriptions *subs)
135{
136 size_t i;
137
140 return -1;
141 }
142
144 expect_null = 1;
145
146 for (i = 0; i < TOPIC_COUNT; ++i) {
148 char id[32];
149
150 if (snprintf(id, 10, "%zu", i) == -1) {
151 ast_log(LOG_ERROR, "Unable to convert subscriber id to string\n");
152 break;
153 }
154
156 if (!sub) {
157 ast_log(LOG_ERROR, "Failed to create a state subscriber for id '%s'\n", id);
158 ao2_ref(sub, -1);
159 break;
160 }
161
162 if (AST_VECTOR_APPEND(subs, sub)) {
163 ast_log(LOG_ERROR, "Failed to add to foo_sub to vector for id '%s'\n", id);
164 ao2_ref(sub, -1);
165 break;
166 }
167
168 sum_total += i;
169 }
170
171 if (i != TOPIC_COUNT || running_total != sum_total) {
172 ast_log(LOG_ERROR, "Failed to create all subscriptions: running=%zu, sum=%zu\n",
174 subscriptions_destroy(manager, subs);
175 return -1;
176 }
177
178 return 0;
179}
180
181static int publishers_destroy(struct stasis_state_manager *manager, struct publishers *pubs)
182{
183 size_t i;
184
185 if (pubs) {
186 /* Remove explicit publishers */
188 AST_VECTOR_FREE(pubs);
189 return 0;
190 }
191
192 for (i = 0; i < TOPIC_COUNT; ++i) {
193 char id[32];
194
195 /* Remove implicit publishers */
196 if (snprintf(id, 10, "%zu", i) == -1) {
197 ast_log(LOG_ERROR, "Unable to convert publisher id to string\n");
198 return -1;
199 }
200
202 }
203
204 return 0;
205}
206
207static int publishers_create(struct stasis_state_manager *manager,
208 struct publishers *pubs)
209{
210 size_t i;
211
212 if (AST_VECTOR_INIT(pubs, TOPIC_COUNT)) {
213 return -1;
214 }
215
216 for (i = 0; i < TOPIC_COUNT; ++i) {
217 struct stasis_state_publisher *pub;
218 char id[32];
219
220 if (snprintf(id, 10, "%zu", i) == -1) {
221 ast_log(LOG_ERROR, "Unable to convert publisher id to string\n");
222 break;
223 }
224
225 /* Create the state publisher */
226 pub = stasis_state_add_publisher(manager, id);
227 if (!pub) {
228 ast_log(LOG_ERROR, "Failed to create a state publisher for id '%s'\n", id);
229 break;
230 }
231
232 if (AST_VECTOR_APPEND(pubs, pub)) {
233 ast_log(LOG_ERROR, "Failed to add to publisher to vector for id '%s'\n", id);
234 ao2_ref(pub, -1);
235 break;
236 }
237 }
238
239 if (i != TOPIC_COUNT) {
240 ast_log(LOG_ERROR, "Failed to create all publishers: count=%zu\n", i);
241 publishers_destroy(manager, pubs);
242 return -1;
243 }
244
245 return 0;
246}
247
248static struct stasis_message *create_foo_type_message(const char *id)
249{
250 struct stasis_message *msg;
251 struct foo_data *foo;
252 uintmax_t tmp;
253
254 foo = ao2_alloc(sizeof(*foo), NULL);
255 if (!foo) {
256 ast_log(LOG_ERROR, "Failed to allocate foo data for '%s'\n", id);
257 return NULL;
258 }
259
260 if (ast_str_to_umax(id, &tmp)) {
261 ast_log(LOG_ERROR, "Unable to convert the state's id '%s' to numeric\n", id);
262 ao2_ref(foo, -1);
263 return NULL;
264 }
265 foo->bar = (size_t) tmp;
266
268 if (!msg) {
269 ast_log(LOG_ERROR, "Failed to create stasis message for '%s'\n", id);
270 }
271
272 ao2_ref(foo, -1);
273 return msg;
274}
275
276static int implicit_publish_cb(const char *id, struct stasis_message *msg, void *user_data)
277{
278 /* For each state object create and publish new state data */
279 struct foo_data *foo = stasis_message_data(msg);
280
281 if (validate_data(id, foo)) {
282 return CMP_STOP;
283 }
284
285 msg = create_foo_type_message(id);
286 if (!msg) {
287 return CMP_STOP;
288 }
289
290 /* Now publish it on the managed state object */
291 stasis_state_publish_by_id(user_data, id, NULL, msg);
292 ao2_ref(msg, -1);
293
294 return 0;
295}
296
297static int explicit_publish_cb(const char *id, struct stasis_message *msg, void *user_data)
298{
299 /* For each state object create and publish new state data */
300 struct publishers *pubs = user_data;
301 struct stasis_state_publisher *pub = NULL;
302 struct foo_data *foo = stasis_message_data(msg);
303 size_t i;
304
305 if (validate_data(id, foo)) {
306 return CMP_STOP;
307 }
308
309 msg = create_foo_type_message(id);
310 if (!msg) {
311 return CMP_STOP;
312 }
313
314 for (i = 0; i < AST_VECTOR_SIZE(pubs); ++i) {
315 if (!strcmp(stasis_state_publisher_id(AST_VECTOR_GET(pubs, i)), id)) {
316 pub = AST_VECTOR_GET(pubs, i);
317 break;
318 }
319 }
320
321 if (!pub) {
322 ast_log(LOG_ERROR, "Unable to locate publisher for id '%s'\n", id);
323 return CMP_STOP;
324 }
325
326 stasis_state_publish(pub, msg);
327 ao2_ref(msg, -1);
328
329 return 0;
330}
331
332static int publish(struct stasis_state_manager *manager, on_stasis_state cb,
333 void *user_data)
334{
335 /* First time there is no state data */
336 expect_null = 1;
337
338 running_total = 0;
339 stasis_state_callback_all(manager, cb, user_data);
340
341 if (running_total != sum_total) {
342 ast_log(LOG_ERROR, "Failed manager_callback (1): running=%zu, sum=%zu\n",
344 return -1;
345 }
346
347 /* Second time check valid state data exists */
349 stasis_state_callback_all(manager, cb, user_data);
350
351 if (running_total != sum_total) {
352 ast_log(LOG_ERROR, "Failed manager_callback (2): running=%zu, sum=%zu\n",
354 return -1;
355 }
356
357 return 0;
358}
359
360AST_TEST_DEFINE(implicit_publish)
361{
362 RAII_VAR(struct stasis_state_manager *, manager, NULL, ao2_cleanup);
363 struct subscriptions subs;
364 int rc = AST_TEST_PASS;
365
366 switch (cmd) {
367 case TEST_INIT:
368 info->name = __func__;
369 info->category = test_category;
370 info->summary = "Test implicit publishing of stasis state";
371 info->description = info->summary;
372 return AST_TEST_NOT_RUN;
373 case TEST_EXECUTE:
374 break;
375 }
376
378 ast_test_validate(test, manager != NULL);
379
380 ast_test_validate(test, !subscriptions_create(manager, &subs));
381
382 ast_test_validate_cleanup(test, !publish(manager, implicit_publish_cb, manager),
383 rc, cleanup);
384
385cleanup:
386 if (subscriptions_destroy(manager, &subs) || publishers_destroy(manager, NULL)) {
387 return AST_TEST_FAIL;
388 }
389
390 /*
391 * State subscriptions add a ref to a state. The state in turn adds a ref
392 * to the manager. So if more than one ref is held on the manager before
393 * exiting, there is a ref leak some place.
394 */
395 if (ao2_ref(manager, 0) != 1) {
396 ast_log(LOG_ERROR, "Memory leak - Too many references held on manager\n");
397 return AST_TEST_FAIL;
398 }
399
400 return rc;
401}
402
403AST_TEST_DEFINE(explicit_publish)
404{
405 RAII_VAR(struct stasis_state_manager *, manager, NULL, ao2_cleanup);
406 struct subscriptions subs;
407 struct publishers pubs;
408 int rc = AST_TEST_PASS;
409
410 switch (cmd) {
411 case TEST_INIT:
412 info->name = __func__;
413 info->category = test_category;
414 info->summary = "Test explicit publishing of stasis state";
415 info->description = info->summary;
416 return AST_TEST_NOT_RUN;
417 case TEST_EXECUTE:
418 break;
419 }
420
422 ast_test_validate(test, manager != NULL);
423
424 ast_test_validate(test, !subscriptions_create(manager, &subs));
425 ast_test_validate_cleanup(test, !publishers_create(manager, &pubs), rc, cleanup);
426
427 ast_test_validate_cleanup(test, !publish(manager, explicit_publish_cb, &pubs),
428 rc, cleanup);
429
430cleanup:
431 if (subscriptions_destroy(manager, &subs) || publishers_destroy(manager, &pubs)) {
432 return AST_TEST_FAIL;
433 }
434
435 /*
436 * State subscriptions add a ref to a state. The state in turn adds a ref
437 * to the manager. So if more than one ref is held on the manager before
438 * exiting, there is a ref leak some place.
439 */
440 if (ao2_ref(manager, 0) != 1) {
441 ast_log(LOG_ERROR, "Memory leak - Too many references held on manager\n");
442 return AST_TEST_FAIL;
443 }
444
445 return rc;
446}
447
448static int unload_module(void)
449{
450 AST_TEST_UNREGISTER(implicit_publish);
451 AST_TEST_UNREGISTER(explicit_publish);
452
454
455 return 0;
456}
457
458static int load_module(void)
459{
461 return -1;
462 }
463
464 AST_TEST_REGISTER(implicit_publish);
465 AST_TEST_REGISTER(explicit_publish);
466
468}
469
Asterisk main include file. File version handling, generic pbx functions.
#define ast_log
Definition: astobj2.c:42
@ CMP_STOP
Definition: astobj2.h:1028
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:409
static int tmp()
Definition: bt_open.c:389
Conversion utility functions.
int ast_str_to_umax(const char *str, uintmax_t *res)
Convert the given string to an unsigned max size integer.
Definition: conversions.c:119
#define LOG_ERROR
Asterisk module definitions.
#define AST_MODULE_INFO_STANDARD(keystr, desc)
Definition: module.h:581
#define ASTERISK_GPL_KEY
The text the key() function should return.
Definition: module.h:46
@ AST_MODULE_LOAD_SUCCESS
Definition: module.h:70
def info(msg)
static void * cleanup(void *unused)
Definition: pbx_realtime.c:124
struct stasis_forward * sub
Definition: res_corosync.c:240
#define NULL
Definition: resample.c:96
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
Definition: stasis.h:1515
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
Definition: stasis.h:1493
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
struct stasis_message * stasis_message_create_full(struct stasis_message_type *type, void *data, const struct ast_eid *eid)
Create a new message for an entity.
Stasis State API.
void stasis_state_callback_all(struct stasis_state_manager *manager, on_stasis_state handler, void *data)
For each managed state call the given handler.
Definition: stasis_state.c:741
struct stasis_state_manager * stasis_state_manager_create(const char *topic_name)
Create a stasis state manager.
Definition: stasis_state.c:325
void stasis_state_publish_by_id(struct stasis_state_manager *manager, const char *id, const struct ast_eid *eid, struct stasis_message *msg)
Publish to a managed topic named by id, and add an implicit subscriber.
Definition: stasis_state.c:639
int(* on_stasis_state)(const char *id, struct stasis_message *msg, void *user_data)
The delegate called for each managed state.
Definition: stasis_state.h:521
void stasis_state_remove_observer(struct stasis_state_manager *manager, struct stasis_state_observer *observer)
Remove an observer (will no longer receive managed state related events).
Definition: stasis_state.c:701
void * stasis_state_unsubscribe_and_join(struct stasis_state_subscriber *sub)
Unsubscribe from the stasis topic, block until the final message is received, and then unsubscribe fr...
Definition: stasis_state.c:478
int stasis_state_add_observer(struct stasis_state_manager *manager, struct stasis_state_observer *observer)
Add an observer to receive managed state related events.
Definition: stasis_state.c:689
const char * stasis_state_publisher_id(const struct stasis_state_publisher *pub)
Retrieve the publisher's underlying state's unique id.
Definition: stasis_state.c:553
struct stasis_state_subscriber * stasis_state_subscribe_pool(struct stasis_state_manager *manager, const char *id, stasis_subscription_cb callback, void *data)
Add a subscriber, and subscribe to its underlying stasis topic.
Definition: stasis_state.c:447
void stasis_state_remove_publish_by_id(struct stasis_state_manager *manager, const char *id, const struct ast_eid *eid, struct stasis_message *msg)
Publish to a managed topic named by id, and remove an implicit publisher.
Definition: stasis_state.c:659
void stasis_state_publish(struct stasis_state_publisher *pub, struct stasis_message *msg)
Publish to a managed state (topic) using a publisher.
Definition: stasis_state.c:563
struct stasis_state_publisher * stasis_state_add_publisher(struct stasis_state_manager *manager, const char *id)
Add a publisher to the managed state for the given id.
Definition: stasis_state.c:532
void * stasis_state_subscriber_data(struct stasis_state_subscriber *sub)
Retrieve the last known state stasis message payload for the subscriber.
Definition: stasis_state.c:498
Managed stasis state event interface.
Definition: stasis_state.h:463
void(* on_subscribe)(const char *id, struct stasis_state_subscriber *sub)
Raised when any managed state is being subscribed.
Definition: stasis_state.h:470
Test Framework API.
@ TEST_INIT
Definition: test.h:200
@ TEST_EXECUTE
Definition: test.h:201
#define AST_TEST_REGISTER(cb)
Definition: test.h:127
#define AST_TEST_UNREGISTER(cb)
Definition: test.h:128
@ AST_TEST_PASS
Definition: test.h:195
@ AST_TEST_FAIL
Definition: test.h:196
@ AST_TEST_NOT_RUN
Definition: test.h:194
static int expect_null
static void handle_validate(const char *id, struct stasis_state_subscriber *sub)
struct stasis_message_type * foo_type(void)
#define MANAGER_TOPIC
static int publishers_destroy(struct stasis_state_manager *manager, struct publishers *pubs)
static int implicit_publish_cb(const char *id, struct stasis_message *msg, void *user_data)
static size_t sum_total
#define TOPIC_COUNT
AST_TEST_DEFINE(implicit_publish)
static void foo_type_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
static struct stasis_message * create_foo_type_message(const char *id)
struct stasis_state_observer foo_observer
#define test_category
static int publishers_create(struct stasis_state_manager *manager, struct publishers *pubs)
static int explicit_publish_cb(const char *id, struct stasis_message *msg, void *user_data)
STASIS_MESSAGE_TYPE_DEFN(foo_type)
static int load_module(void)
static size_t running_total
static int unload_module(void)
static int subscriptions_destroy(struct stasis_state_manager *manager, struct subscriptions *subs)
static int publish(struct stasis_state_manager *manager, on_stasis_state cb, void *user_data)
static int validate_data(const char *id, struct foo_data *foo)
static int subscriptions_create(struct stasis_state_manager *manager, struct subscriptions *subs)
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:941
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:609
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition: vector.h:174
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256
#define AST_VECTOR_CALLBACK_VOID(vec, callback,...)
Execute a callback on every element in a vector disregarding callback return.
Definition: vector.h:862
#define AST_VECTOR(name, type)
Define a vector structure.
Definition: vector.h:44
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:680