Asterisk - The Open Source Telephony Project GIT-master-7e7a603
test_stasis.c
Go to the documentation of this file.
1/*
2 * Asterisk -- An open source telephony toolkit.
3 *
4 * Copyright (C) 2013, Digium, Inc.
5 *
6 * David M. Lee, II <dlee@digium.com>
7 *
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
13 *
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
17 */
18
19/*!
20 * \file
21 * \brief Test Stasis message bus.
22 *
23 * \author\verbatim David M. Lee, II <dlee@digium.com> \endverbatim
24 *
25 * \ingroup tests
26 */
27
28/*** MODULEINFO
29 <depend>TEST_FRAMEWORK</depend>
30 <support_level>core</support_level>
31 ***/
32
33#include "asterisk.h"
34
35#include "asterisk/astobj2.h"
36#include "asterisk/module.h"
37#include "asterisk/stasis.h"
39#include "asterisk/test.h"
40
41#define test_category "/stasis/core/"
42
44{
47}
48
/*
 * Fake to-JSON conversion callback used by the test message type.
 *
 * Reads the message payload via stasis_message_data() and treats it as a
 * C string. The sanitize argument is not referenced by the visible lines.
 *
 * NOTE(review): listing line 53 (the return statement) is absent from this
 * extract; presumably it wraps `text` in an ast_json object - confirm
 * against the real source.
 */
49static struct ast_json *fake_json(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
50{
51 const char *text = stasis_message_data(message);
52
54}
55
57{
59 const char *text = stasis_message_data(message);
60
62 "Message: %s\r\n", text);
63
64 if (res == NULL) {
65 return NULL;
66 }
67
68 ao2_ref(res, +1);
69 return res;
70}
71
74 .to_ami = fake_ami
75};
76
/*
 * Unit test: create a message type named "SomeMessage" and verify that
 * stasis_message_type_name() reports the same name.
 *
 * NOTE(review): the declaration of `uut` (listing line 79) and listing
 * line 92 are absent from this extract.
 */
77AST_TEST_DEFINE(message_type)
78{
80
81 switch (cmd) {
82 case TEST_INIT:
 /* Registration pass: describe the test and do not execute it. */
83 info->name = __func__;
84 info->category = test_category;
85 info->summary = "Test basic message_type functions";
86 info->description = "Test basic message_type functions";
87 return AST_TEST_NOT_RUN;
88 case TEST_EXECUTE:
89 break;
90 }
91
 /* Creation must report success before the name is inspected. */
93 ast_test_validate(test, stasis_message_type_create("SomeMessage", NULL, &uut) == STASIS_MESSAGE_TYPE_SUCCESS);
94 ast_test_validate(test, 0 == strcmp(stasis_message_type_name(uut), "SomeMessage"));
95
96 return AST_TEST_PASS;
97}
98
100{
102 RAII_VAR(struct stasis_message *, uut1, NULL, ao2_cleanup);
103 RAII_VAR(struct stasis_message *, uut2, NULL, ao2_cleanup);
104 RAII_VAR(char *, data, NULL, ao2_cleanup);
105 char *expected = "SomeData";
106 struct timeval expected_timestamp;
107 struct timeval time_diff;
108 struct ast_eid foreign_eid;
109
110 switch (cmd) {
111 case TEST_INIT:
112 info->name = __func__;
113 info->category = test_category;
114 info->summary = "Test basic message functions";
115 info->description = "Test basic message functions";
116 return AST_TEST_NOT_RUN;
117 case TEST_EXECUTE:
118 break;
119 }
120
121
122 memset(&foreign_eid, 0xFF, sizeof(foreign_eid));
123
124 ast_test_validate(test, stasis_message_type_create("SomeMessage", NULL, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
125
126 ast_test_validate(test, NULL == stasis_message_create_full(NULL, NULL, NULL));
127 ast_test_validate(test, NULL == stasis_message_create_full(type, NULL, NULL));
128
129 data = ao2_alloc(strlen(expected) + 1, NULL);
130 strcpy(data, expected);/* Safe */
131 expected_timestamp = ast_tvnow();
132 uut1 = stasis_message_create_full(type, data, &foreign_eid);
133 uut2 = stasis_message_create_full(type, data, NULL);
134
135 ast_test_validate(test, NULL != uut1);
136 ast_test_validate(test, NULL != uut2);
137 ast_test_validate(test, type == stasis_message_type(uut1));
138 ast_test_validate(test, type == stasis_message_type(uut2));
139 ast_test_validate(test, 0 == strcmp(expected, stasis_message_data(uut1)));
140 ast_test_validate(test, 0 == strcmp(expected, stasis_message_data(uut2)));
141 ast_test_validate(test, NULL != stasis_message_eid(uut1));
142 ast_test_validate(test, NULL == stasis_message_eid(uut2));
143 ast_test_validate(test, !ast_eid_cmp(&foreign_eid, stasis_message_eid(uut1)));
144
145 ast_test_validate(test, 3 == ao2_ref(data, 0)); /* uut1 and uut2 have ref to data */
146
147 time_diff = ast_tvsub(*stasis_message_timestamp(uut1), expected_timestamp);
148 /* 10ms is certainly long enough for the two calls to complete */
149 ast_test_validate(test, time_diff.tv_sec == 0);
150 ast_test_validate(test, time_diff.tv_usec < 10000);
151
152 ao2_ref(uut1, -1);
153 uut1 = NULL;
154 ast_test_validate(test, 2 == ao2_ref(data, 0)); /* uut1 unreffed data */
155 ao2_ref(uut2, -1);
156 uut2 = NULL;
157 ast_test_validate(test, 1 == ao2_ref(data, 0)); /* uut2 unreffed data */
158
159 return AST_TEST_PASS;
160}
161
/*
 * State shared between a test and its subscription callback.
 *
 * NOTE(review): the member declarations (listing lines 163-167) are absent
 * from this extract. Other code in this file accesses the members
 * `messages_rxed`, `messages_rxed_len` and `complete`; their exact types
 * cannot be confirmed from here.
 */
162struct consumer {
168};
169
/*
 * Destructor for struct consumer (signature matches an ao2 destructor
 * callback: a single void * pointing at the object).
 *
 * The visible code loops while messages_rxed_len is greater than zero.
 *
 * NOTE(review): the loop body (listing line 177) and the statements on
 * listing lines 174, 179 and 180 are absent from this extract; presumably
 * the loop releases each stored message and decrements the count -
 * confirm against the real source.
 */
170static void consumer_dtor(void *obj)
171{
172 struct consumer *consumer = obj;
173
175
176 while (consumer->messages_rxed_len > 0) {
178 }
181}
182
184{
185 struct consumer *consumer;
186
188 if (!consumer) {
189 return NULL;
190 }
191
194 if (!consumer->messages_rxed) {
196 return NULL;
197 }
198
200
201 return consumer;
202}
203
/*
 * Subscription callback: records activity on the consumer passed as `data`.
 *
 * Visible behaviour: under one condition a reference is taken on the
 * incoming message; under another the consumer is flagged complete and
 * handed to the RAII cleanup variable.
 *
 * NOTE(review): the locking statement, both `if` conditions, the lines
 * that store the message, and the statement on listing line 223 are absent
 * from this extract.
 */
204static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_message *message)
205{
206 struct consumer *consumer = data;
 /* Starts NULL, so ao2_cleanup only acts if it is assigned below. */
207 RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
209
 /* Extra reference on the message; presumably because it is stored. */
215 ao2_ref(message, +1);
216 }
217
219 consumer->complete = 1;
 /* Hands the consumer to ao2_cleanup when this function returns. */
220 consumer_needs_cleanup = consumer;
221 }
222
224}
225
/*
 * Subscription callback with the same visible shape as consumer_exec():
 * conditionally takes a reference on the message, and conditionally marks
 * the consumer complete and schedules it for ao2_cleanup.
 *
 * NOTE(review): both `if` conditions and the lines that store the message
 * are absent from this extract, so the difference from consumer_exec()
 * cannot be confirmed from here.
 */
226static void consumer_exec_sync(void *data, struct stasis_subscription *sub, struct stasis_message *message)
227{
228 struct consumer *consumer = data;
 /* Starts NULL, so ao2_cleanup only acts if it is assigned below. */
229 RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
231
 /* Extra reference on the message; presumably because it is stored. */
237 ao2_ref(message, +1);
238 }
239
241 consumer->complete = 1;
 /* Hands the consumer to ao2_cleanup when this function returns. */
242 consumer_needs_cleanup = consumer;
243 }
244}
245
/*
 * Wait until the consumer has received at least expected_len messages, or
 * until a deadline 30 seconds from the time of the call.
 *
 * The loop exits early when the wait result `r` is ETIMEDOUT; any other
 * non-zero result trips ast_assert().
 *
 * NOTE(review): the lock acquisition (listing line 254), the timed wait
 * that assigns `r` (line 257) and the return statement (line 264) are
 * absent from this extract; callers compare the result with a message
 * count, so it presumably returns messages_rxed_len - confirm.
 */
246static int consumer_wait_for(struct consumer *consumer, size_t expected_len)
247{
248 struct timeval start = ast_tvnow();
 /* Absolute deadline: now + 30 s, converting microseconds to nanoseconds. */
249 struct timespec end = {
250 .tv_sec = start.tv_sec + 30,
251 .tv_nsec = start.tv_usec * 1000
252 };
253
255
256 while (consumer->messages_rxed_len < expected_len) {
258
259 if (r == ETIMEDOUT) {
260 break;
261 }
262 ast_assert(r == 0); /* Not expecting any other types of errors */
263 }
265}
266
268{
269 struct timeval start = ast_tvnow();
270 struct timespec end = {
271 .tv_sec = start.tv_sec + 3,
272 .tv_nsec = start.tv_usec * 1000
273 };
274
276
277 while (!consumer->complete) {
279
280 if (r == ETIMEDOUT) {
281 break;
282 }
283 ast_assert(r == 0); /* Not expecting any other types of errors */
284 }
285 return consumer->complete;
286}
287
/*
 * Verify that the consumer's message count stays at expected_len for a
 * short window (100 ms from the time of the call).
 *
 * The loop keeps waiting while the count still equals expected_len and
 * stops when the wait result `r` is ETIMEDOUT; any other non-zero result
 * trips ast_assert().
 *
 * NOTE(review): the lock acquisition (listing line 301), the timed wait
 * that assigns `r` (line 304) and the return statement (line 311) are
 * absent from this extract; callers compare the result with a message
 * count, so it presumably returns messages_rxed_len - confirm.
 */
288static int consumer_should_stay(struct consumer *consumer, size_t expected_len)
289{
290 struct timeval start = ast_tvnow();
291 struct timeval diff = {
292 .tv_sec = 0,
293 .tv_usec = 100000 /* wait for 100ms */
294 };
 /* ast_tvadd() is used so the microsecond carry is handled for us. */
295 struct timeval end_tv = ast_tvadd(start, diff);
296 struct timespec end = {
297 .tv_sec = end_tv.tv_sec,
298 .tv_nsec = end_tv.tv_usec * 1000
299 };
300
302
303 while (consumer->messages_rxed_len == expected_len) {
305
306 if (r == ETIMEDOUT) {
307 break;
308 }
309 ast_assert(r == 0); /* Not expecting any other types of errors */
310 }
312}
313
/*
 * Unit test: subscribing to and unsubscribing from a topic should deliver
 * exactly two subscription-change messages to the subscriber, described as
 * "Subscribe" and "Unsubscribe", both carrying the topic and the
 * subscription's unique id.
 *
 * NOTE(review): the declarations of `uut` and `consumer`, the consumer
 * creation, the subscribe call, the wait that sets `complete`, and the
 * lines that point `change` at each received message are absent from this
 * extract.
 */
314AST_TEST_DEFINE(subscription_messages)
315{
316 RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
319 RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
320 RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
321 RAII_VAR(char *, expected_uniqueid, NULL, ast_free);
323 int complete;
324 struct stasis_subscription_change *change;
325
326 switch (cmd) {
327 case TEST_INIT:
 /* Registration pass: describe the test and do not execute it. */
328 info->name = __func__;
329 info->category = test_category;
330 info->summary = "Test subscribe/unsubscribe messages";
331 info->description = "Test subscribe/unsubscribe messages";
332 return AST_TEST_NOT_RUN;
333 case TEST_EXECUTE:
334 break;
335 }
336
337 topic = stasis_topic_create("TestTopic");
338 ast_test_validate(test, NULL != topic);
339
341 ast_test_validate(test, NULL != consumer);
342
344 ast_test_validate(test, NULL != uut);
 /* Extra consumer reference; presumably owned by the subscription - confirm. */
345 ao2_ref(consumer, +1);
 /* Copy the id now: `uut` is overwritten by stasis_unsubscribe() below. */
 /* NOTE(review): the ast_strdup() result reaches strcmp() below without a NULL check. */
346 expected_uniqueid = ast_strdup(stasis_subscription_uniqueid(uut));
347
348 uut = stasis_unsubscribe(uut);
350 ast_test_validate(test, 1 == complete);
351
352 ast_test_validate(test, 2 == consumer->messages_rxed_len);
355
 /* First received change: the subscribe notification. */
357 ast_test_validate(test, topic == change->topic);
358 ast_test_validate(test, 0 == strcmp("Subscribe", change->description));
359 ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
360
 /* Second received change: the unsubscribe notification. */
362 ast_test_validate(test, topic == change->topic);
363 ast_test_validate(test, 0 == strcmp("Unsubscribe", change->description));
364 ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
365
366 return AST_TEST_PASS;
367}
368
/*
 * Unit test: same expectations as the plain subscribe/unsubscribe test
 * (two change messages, "Subscribe" then "Unsubscribe", with matching topic
 * and unique id), but, per the test summary, for a threadpool subscription.
 *
 * NOTE(review): the declarations of `topic`, `uut` and `consumer`, the
 * consumer creation, the subscribe call, the wait that sets `complete`,
 * and the lines that point `change` at each received message are absent
 * from this extract.
 */
369AST_TEST_DEFINE(subscription_pool_messages)
370{
374 RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
375 RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
377 RAII_VAR(char *, expected_uniqueid, NULL, ast_free);
378 int complete;
379 struct stasis_subscription_change *change;
380
381 switch (cmd) {
382 case TEST_INIT:
 /* Registration pass: describe the test and do not execute it. */
383 info->name = __func__;
384 info->category = test_category;
385 info->summary = "Test subscribe/unsubscribe messages using a threadpool subscription";
386 info->description = "Test subscribe/unsubscribe messages using a threadpool subscription";
387 return AST_TEST_NOT_RUN;
388 case TEST_EXECUTE:
389 break;
390 }
391
392 topic = stasis_topic_create("TestTopic");
393 ast_test_validate(test, NULL != topic);
394
396 ast_test_validate(test, NULL != consumer);
397
399 ast_test_validate(test, NULL != uut);
 /* Extra consumer reference; presumably owned by the subscription - confirm. */
400 ao2_ref(consumer, +1);
 /* Copy the id now: `uut` is overwritten by stasis_unsubscribe() below. */
 /* NOTE(review): the ast_strdup() result reaches strcmp() below without a NULL check. */
401 expected_uniqueid = ast_strdup(stasis_subscription_uniqueid(uut));
402
403 uut = stasis_unsubscribe(uut);
405 ast_test_validate(test, 1 == complete);
406
407 ast_test_validate(test, 2 == consumer->messages_rxed_len);
410
 /* First received change: the subscribe notification. */
412 ast_test_validate(test, topic == change->topic);
413 ast_test_validate(test, 0 == strcmp("Subscribe", change->description));
414 ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
415
 /* Second received change: the unsubscribe notification. */
417 ast_test_validate(test, topic == change->topic);
418 ast_test_validate(test, 0 == strcmp("Unsubscribe", change->description));
419 ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
420
421 return AST_TEST_PASS;
422}
423
425{
429 RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
430 RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
432 int actual_len;
433 const char *actual;
434
435 switch (cmd) {
436 case TEST_INIT:
437 info->name = __func__;
438 info->category = test_category;
439 info->summary = "Test publishing";
440 info->description = "Test publishing";
441 return AST_TEST_NOT_RUN;
442 case TEST_EXECUTE:
443 break;
444 }
445
446 topic = stasis_topic_create("TestTopic");
447 ast_test_validate(test, NULL != topic);
448
450 ast_test_validate(test, NULL != consumer);
451
453 ast_test_validate(test, NULL != uut);
454 ao2_ref(consumer, +1);
455
457 ast_test_validate(test, NULL != test_data);
458 ast_test_validate(test, stasis_message_type_create("TestMessage", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
459 test_message = stasis_message_create(test_message_type, test_data);
460
461 stasis_publish(topic, test_message);
462
463 actual_len = consumer_wait_for(consumer, 1);
464 ast_test_validate(test, 1 == actual_len);
466 ast_test_validate(test, test_data == actual);
467
468 return AST_TEST_PASS;
469}
470
/*
 * Unit test: a message sent with stasis_publish_sync() has been received
 * by the consumer by the time the call returns.
 *
 * Unlike the asynchronous publish tests, the count is read straight from
 * consumer->messages_rxed_len with no wait helper in between.
 *
 * NOTE(review): the declarations of `topic`, `uut`, `consumer` and
 * `test_data`, their creation calls, and the line that assigns `actual`
 * are absent from this extract.
 */
471AST_TEST_DEFINE(publish_sync)
472{
476 RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
477 RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
479 int actual_len;
480 const char *actual;
481
482 switch (cmd) {
483 case TEST_INIT:
 /* Registration pass: describe the test and do not execute it. */
484 info->name = __func__;
485 info->category = test_category;
486 info->summary = "Test synchronous publishing";
487 info->description = "Test synchronous publishing";
488 return AST_TEST_NOT_RUN;
489 case TEST_EXECUTE:
490 break;
491 }
492
493 topic = stasis_topic_create("TestTopic");
494 ast_test_validate(test, NULL != topic);
495
497 ast_test_validate(test, NULL != consumer);
498
500 ast_test_validate(test, NULL != uut);
 /* Extra consumer reference; presumably owned by the subscription - confirm. */
501 ao2_ref(consumer, +1);
502
504 ast_test_validate(test, NULL != test_data);
505 ast_test_validate(test, stasis_message_type_create("TestMessage", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
 /* NOTE(review): test_message is not checked for NULL before publishing. */
506 test_message = stasis_message_create(test_message_type, test_data);
507
 /* Published to the subscription `uut`, not to the topic. */
508 stasis_publish_sync(uut, test_message);
509
510 actual_len = consumer->messages_rxed_len;
511 ast_test_validate(test, 1 == actual_len);
 /* Pointer comparison: the received payload must be the same object. */
513 ast_test_validate(test, test_data == actual);
514
515 return AST_TEST_PASS;
516}
517
/*
 * Unit test: a message published to a topic reaches a subscriber that, per
 * the test description, receives its messages through a threadpool.
 *
 * NOTE(review): the declarations of `topic`, `uut`, `consumer` and
 * `test_data`, their creation calls (including the subscribe call that
 * would show the threadpool variant), and the line that assigns `actual`
 * are absent from this extract.
 */
518AST_TEST_DEFINE(publish_pool)
519{
523 RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
524 RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
526 int actual_len;
527 const char *actual;
528
529 switch (cmd) {
530 case TEST_INIT:
 /* Registration pass: describe the test and do not execute it. */
531 info->name = __func__;
532 info->category = test_category;
533 info->summary = "Test publishing with a threadpool";
534 info->description = "Test publishing to a subscriber whose\n"
535 "subscription dictates messages are received through a\n"
536 "threadpool.";
537 return AST_TEST_NOT_RUN;
538 case TEST_EXECUTE:
539 break;
540 }
541
542 topic = stasis_topic_create("TestTopic");
543 ast_test_validate(test, NULL != topic);
544
546 ast_test_validate(test, NULL != consumer);
547
549 ast_test_validate(test, NULL != uut);
 /* Extra consumer reference; presumably owned by the subscription - confirm. */
550 ao2_ref(consumer, +1);
551
553 ast_test_validate(test, NULL != test_data);
554 ast_test_validate(test, stasis_message_type_create("TestMessage", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
 /* NOTE(review): test_message is not checked for NULL before publishing. */
555 test_message = stasis_message_create(test_message_type, test_data);
556
557 stasis_publish(topic, test_message);
558
 /* Delivery is asynchronous here, so block until one message arrives. */
559 actual_len = consumer_wait_for(consumer, 1);
560 ast_test_validate(test, 1 == actual_len);
 /* Pointer comparison: the received payload must be the same object. */
562 ast_test_validate(test, test_data == actual);
563
564 return AST_TEST_PASS;
565}
566
/*
 * Unit test: after stasis_unsubscribe(), a message published to the topic
 * is not delivered - the consumer's count must remain 0.
 *
 * NOTE(review): the declarations of `topic`, `uut`, `consumer` and
 * `test_data` and their creation calls are absent from this extract.
 */
567AST_TEST_DEFINE(unsubscribe_stops_messages)
568{
573 RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
574 RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
575 int actual_len;
576
577 switch (cmd) {
578 case TEST_INIT:
 /* Registration pass: describe the test and do not execute it. */
579 info->name = __func__;
580 info->category = test_category;
581 info->summary = "Test simple subscriptions";
582 info->description = "Test simple subscriptions";
583 return AST_TEST_NOT_RUN;
584 case TEST_EXECUTE:
585 break;
586 }
587
588 topic = stasis_topic_create("TestTopic");
589 ast_test_validate(test, NULL != topic);
590
592 ast_test_validate(test, NULL != consumer);
593
595 ast_test_validate(test, NULL != uut);
 /* Extra consumer reference; presumably owned by the subscription - confirm. */
596 ao2_ref(consumer, +1);
597
 /* Drop the subscription before anything is published. */
598 uut = stasis_unsubscribe(uut);
599
601 ast_test_validate(test, NULL != test_data);
602 ast_test_validate(test, stasis_message_type_create("TestMessage", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
 /* NOTE(review): test_message is not checked for NULL before publishing. */
603 test_message = stasis_message_create(test_message_type, test_data);
604
605 stasis_publish(topic, test_message);
606
 /* A negative check: wait briefly and expect the count to stay at 0. */
607 actual_len = consumer_should_stay(consumer, 0);
608 ast_test_validate(test, 0 == actual_len);
609
610 return AST_TEST_PASS;
611}
612
614{
615 RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
617
618 RAII_VAR(struct consumer *, parent_consumer, NULL, ao2_cleanup);
620
621 RAII_VAR(struct stasis_forward *, forward_sub, NULL, stasis_forward_cancel);
622 RAII_VAR(struct stasis_subscription *, parent_sub, NULL, stasis_unsubscribe);
624
626 RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
627 RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
628 int actual_len;
629
630 switch (cmd) {
631 case TEST_INIT:
632 info->name = __func__;
633 info->category = test_category;
634 info->summary = "Test sending events to a parent topic";
635 info->description = "Test sending events to a parent topic.\n"
636 "This test creates three topics (one parent, two children)\n"
637 "and publishes a message to one child, and verifies it's\n"
638 "only seen by that child and the parent";
639 return AST_TEST_NOT_RUN;
640 case TEST_EXECUTE:
641 break;
642 }
643
644 parent_topic = stasis_topic_create("ParentTestTopic");
645 ast_test_validate(test, NULL != parent_topic);
646 topic = stasis_topic_create("TestTopic");
647 ast_test_validate(test, NULL != topic);
648
649 forward_sub = stasis_forward_all(topic, parent_topic);
650 ast_test_validate(test, NULL != forward_sub);
651
652 parent_consumer = consumer_create(1);
653 ast_test_validate(test, NULL != parent_consumer);
655 ast_test_validate(test, NULL != consumer);
656
657 parent_sub = stasis_subscribe(parent_topic, consumer_exec, parent_consumer);
658 ast_test_validate(test, NULL != parent_sub);
659 ao2_ref(parent_consumer, +1);
661 ast_test_validate(test, NULL != sub);
662 ao2_ref(consumer, +1);
663
665 ast_test_validate(test, NULL != test_data);
666 ast_test_validate(test, stasis_message_type_create("TestMessage", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
667 test_message = stasis_message_create(test_message_type, test_data);
668
669 stasis_publish(topic, test_message);
670
671 actual_len = consumer_wait_for(consumer, 1);
672 ast_test_validate(test, 1 == actual_len);
673 actual_len = consumer_wait_for(parent_consumer, 1);
674 ast_test_validate(test, 1 == actual_len);
675
676 return AST_TEST_PASS;
677}
678
/*
 * Unit test: two child topics forward to one parent topic; three messages
 * are published alternately (topic1, topic2, topic1) and a subscriber on
 * the parent must receive them in exactly that order.
 *
 * NOTE(review): the declarations of `test_data`, `sub` and `consumer`,
 * and the creation of `test_data` and `consumer`, are absent from this
 * extract.
 */
679AST_TEST_DEFINE(interleaving)
680{
681 RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
682 RAII_VAR(struct stasis_topic *, topic1, NULL, ao2_cleanup);
683 RAII_VAR(struct stasis_topic *, topic2, NULL, ao2_cleanup);
684
685 RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
686
688
689 RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
690 RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
691 RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
692
693 RAII_VAR(struct stasis_forward *, forward_sub1, NULL, stasis_forward_cancel);
694 RAII_VAR(struct stasis_forward *, forward_sub2, NULL, stasis_forward_cancel);
696
698
699 int actual_len;
700
701 switch (cmd) {
702 case TEST_INIT:
 /* Registration pass: describe the test and do not execute it. */
703 info->name = __func__;
704 info->category = test_category;
705 info->summary = "Test sending interleaved events to a parent topic";
706 info->description = "Test sending events to a parent topic.\n"
707 "This test creates three topics (one parent, two children)\n"
708 "and publishes messages alternately between the children.\n"
709 "It verifies that the messages are received in the expected\n"
710 "order.";
711 return AST_TEST_NOT_RUN;
712 case TEST_EXECUTE:
713 break;
714 }
715
716 ast_test_validate(test, stasis_message_type_create("test", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
717 ast_test_validate(test, NULL != test_message_type);
718
720 ast_test_validate(test, NULL != test_data);
721
 /* Three distinct message objects that share one payload. */
722 test_message1 = stasis_message_create(test_message_type, test_data);
723 ast_test_validate(test, NULL != test_message1);
724 test_message2 = stasis_message_create(test_message_type, test_data);
725 ast_test_validate(test, NULL != test_message2);
726 test_message3 = stasis_message_create(test_message_type, test_data);
727 ast_test_validate(test, NULL != test_message3);
728
729 parent_topic = stasis_topic_create("ParentTestTopic");
730 ast_test_validate(test, NULL != parent_topic);
731 topic1 = stasis_topic_create("Topic1");
732 ast_test_validate(test, NULL != topic1);
733 topic2 = stasis_topic_create("Topic2");
734 ast_test_validate(test, NULL != topic2);
735
 /* Both children forward everything to the parent. */
736 forward_sub1 = stasis_forward_all(topic1, parent_topic);
737 ast_test_validate(test, NULL != forward_sub1);
738 forward_sub2 = stasis_forward_all(topic2, parent_topic);
739 ast_test_validate(test, NULL != forward_sub2);
740
742 ast_test_validate(test, NULL != consumer);
743
 /* The only subscriber listens on the parent topic. */
744 sub = stasis_subscribe(parent_topic, consumer_exec, consumer);
745 ast_test_validate(test, NULL != sub);
 /* Extra consumer reference; presumably owned by the subscription - confirm. */
746 ao2_ref(consumer, +1);
747
 /* Alternate between the two child topics. */
748 stasis_publish(topic1, test_message1);
749 stasis_publish(topic2, test_message2);
750 stasis_publish(topic1, test_message3);
751
752 actual_len = consumer_wait_for(consumer, 3);
753 ast_test_validate(test, 3 == actual_len);
754
 /* Order received must equal order published (pointer identity). */
755 ast_test_validate(test, test_message1 == consumer->messages_rxed[0]);
756 ast_test_validate(test, test_message2 == consumer->messages_rxed[1]);
757 ast_test_validate(test, test_message3 == consumer->messages_rxed[2]);
758
759 return AST_TEST_PASS;
760}
761
/*
 * Unit test: as the interleaving test, but with two subscribers on the
 * parent topic - one created with stasis_subscribe() and one with
 * stasis_subscribe_pool(). Both must see the three alternately published
 * messages in publication order.
 *
 * NOTE(review): the declarations of `test_data`, `sub1` and `sub2`, and
 * the creation of `test_data`, are absent from this extract.
 */
762AST_TEST_DEFINE(subscription_interleaving)
763{
764 RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
765 RAII_VAR(struct stasis_topic *, topic1, NULL, ao2_cleanup);
766 RAII_VAR(struct stasis_topic *, topic2, NULL, ao2_cleanup);
767
768 RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
769
771
772 RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
773 RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
774 RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
775
776 RAII_VAR(struct stasis_forward *, forward_sub1, NULL, stasis_forward_cancel);
777 RAII_VAR(struct stasis_forward *, forward_sub2, NULL, stasis_forward_cancel);
780
781 RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
782 RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
783
784 int actual_len;
785
786 switch (cmd) {
787 case TEST_INIT:
 /* Registration pass: describe the test and do not execute it. */
788 info->name = __func__;
789 info->category = test_category;
790 info->summary = "Test sending interleaved events to a parent topic with different subscribers";
791 info->description = "Test sending events to a parent topic.\n"
792 "This test creates three topics (one parent, two children)\n"
793 "and publishes messages alternately between the children.\n"
794 "It verifies that the messages are received in the expected\n"
795 "order, for different subscription types: one with a dedicated\n"
796 "thread, the other on the Stasis threadpool.";
797 return AST_TEST_NOT_RUN;
798 case TEST_EXECUTE:
799 break;
800 }
801
802 ast_test_validate(test, stasis_message_type_create("test", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
803 ast_test_validate(test, NULL != test_message_type);
804
806 ast_test_validate(test, NULL != test_data);
807
 /* Three distinct message objects that share one payload. */
808 test_message1 = stasis_message_create(test_message_type, test_data);
809 ast_test_validate(test, NULL != test_message1);
810 test_message2 = stasis_message_create(test_message_type, test_data);
811 ast_test_validate(test, NULL != test_message2);
812 test_message3 = stasis_message_create(test_message_type, test_data);
813 ast_test_validate(test, NULL != test_message3);
814
815 parent_topic = stasis_topic_create("ParentTestTopic");
816 ast_test_validate(test, NULL != parent_topic);
817 topic1 = stasis_topic_create("Topic1");
818 ast_test_validate(test, NULL != topic1);
819 topic2 = stasis_topic_create("Topic2");
820 ast_test_validate(test, NULL != topic2);
821
 /* Both children forward everything to the parent. */
822 forward_sub1 = stasis_forward_all(topic1, parent_topic);
823 ast_test_validate(test, NULL != forward_sub1);
824 forward_sub2 = stasis_forward_all(topic2, parent_topic);
825 ast_test_validate(test, NULL != forward_sub2);
826
827 consumer1 = consumer_create(1);
828 ast_test_validate(test, NULL != consumer1);
829
830 consumer2 = consumer_create(1);
831 ast_test_validate(test, NULL != consumer2);
832
 /* First subscriber: plain stasis_subscribe(). */
833 sub1 = stasis_subscribe(parent_topic, consumer_exec, consumer1);
834 ast_test_validate(test, NULL != sub1);
 /* Extra consumer reference; presumably owned by the subscription - confirm. */
835 ao2_ref(consumer1, +1);
836
 /* Second subscriber: the pool variant, on the same parent topic. */
837 sub2 = stasis_subscribe_pool(parent_topic, consumer_exec, consumer2);
838 ast_test_validate(test, NULL != sub2);
 /* Extra consumer reference; presumably owned by the subscription - confirm. */
839 ao2_ref(consumer2, +1);
840
 /* Alternate between the two child topics. */
841 stasis_publish(topic1, test_message1);
842 stasis_publish(topic2, test_message2);
843 stasis_publish(topic1, test_message3);
844
845 actual_len = consumer_wait_for(consumer1, 3);
846 ast_test_validate(test, 3 == actual_len);
847
848 actual_len = consumer_wait_for(consumer2, 3);
849 ast_test_validate(test, 3 == actual_len);
850
 /* Each subscriber must see publication order (pointer identity). */
851 ast_test_validate(test, test_message1 == consumer1->messages_rxed[0]);
852 ast_test_validate(test, test_message2 == consumer1->messages_rxed[1]);
853 ast_test_validate(test, test_message3 == consumer1->messages_rxed[2]);
854
855 ast_test_validate(test, test_message1 == consumer2->messages_rxed[0]);
856 ast_test_validate(test, test_message2 == consumer2->messages_rxed[1]);
857 ast_test_validate(test, test_message3 == consumer2->messages_rxed[2]);
858
859 return AST_TEST_PASS;
860}
861
863 char *id;
864 char *value;
865};
866
/*
 * Destructor for struct cache_test_data: frees the two strings the object
 * points at (`id` and `value`). The object itself is not freed here.
 */
867static void cache_test_data_dtor(void *obj)
868{
869 struct cache_test_data *data = obj;
870
871 ast_free(data->id);
872 ast_free(data->value);
873}
874
/*
 * Build a cache test message whose payload carries copies of `name` (as
 * the id) and `value`.
 *
 * Returns NULL if the payload allocation fails or if either string copy
 * fails. `name` must not be NULL (asserted).
 *
 * NOTE(review): the declaration and allocation of `data` (listing lines
 * 877 and 879), listing line 885, and the final return that builds the
 * message from `type`, `data` and `eid` (line 893) are absent from this
 * extract. Whether `data` is released on the early-return path depends on
 * how it is declared - confirm against the real source.
 */
875static struct stasis_message *cache_test_message_create_full(struct stasis_message_type *type, const char *name, const char *value, struct ast_eid *eid)
876{
878
880 if (data == NULL) {
881 return NULL;
882 }
883
884 ast_assert(name != NULL);
886
 /* The payload owns private copies of both strings. */
887 data->id = ast_strdup(name);
888 data->value = ast_strdup(value);
889 if (!data->id || !data->value) {
890 return NULL;
891 }
892
894}
895
/*
 * Convenience constructor for a cache test message from a type, name and
 * value.
 *
 * NOTE(review): the body (listing line 898) is absent from this extract;
 * presumably it delegates to cache_test_message_create_full() with a
 * default entity id - confirm against the real source.
 */
896static struct stasis_message *cache_test_message_create(struct stasis_message_type *type, const char *name, const char *value)
897{
899}
900
/*
 * Return the id string stored in a message's cache_test_data payload.
 *
 * Only messages whose type name is exactly "Cacheable" yield an id; for
 * any other type name the function returns NULL.
 */
901static const char *cache_test_data_id(struct stasis_message *message)
902{
 /* Fetched up front, but only dereferenced after the type check passes. */
903 struct cache_test_data *cachable = stasis_message_data(message);
904
905 if (0 != strcmp("Cacheable", stasis_message_type_name(stasis_message_type(message)))) {
906 return NULL;
907 }
908 return cachable->id;
909}
910
912{
913 struct stasis_message *aggregate_snapshot;
914 struct stasis_message *snapshot;
915 struct stasis_message_type *type = NULL;
917 int idx;
918 int accumulated = 0;
919 char aggregate_str[30];
920
921 /* Accumulate the aggregate value. */
923 if (snapshot) {
924 type = stasis_message_type(snapshot);
925 test_data = stasis_message_data(snapshot);
926 accumulated += atoi(test_data->value);
927 }
928 for (idx = 0; ; ++idx) {
929 snapshot = stasis_cache_entry_get_remote(entry, idx);
930 if (!snapshot) {
931 break;
932 }
933
934 type = stasis_message_type(snapshot);
935 test_data = stasis_message_data(snapshot);
936 accumulated += atoi(test_data->value);
937 }
938
939 if (!test_data) {
940 /* There are no test entries cached. Delete the aggregate. */
941 return NULL;
942 }
943
945 if (snapshot) {
946 type = stasis_message_type(snapshot);
947 test_data = stasis_message_data(snapshot);
948 if (accumulated == atoi(test_data->value)) {
949 /* Aggregate test entry did not change. */
950 return ao2_bump(snapshot);
951 }
952 }
953
954 snprintf(aggregate_str, sizeof(aggregate_str), "%d", accumulated);
955 aggregate_snapshot = cache_test_message_create_full(type, test_data->id, aggregate_str, NULL);
956 if (!aggregate_snapshot) {
957 /* Bummer. We have to keep the old aggregate snapshot. */
958 ast_log(LOG_ERROR, "Could not create aggregate snapshot.\n");
959 return ao2_bump(snapshot);
960 }
961
962 return aggregate_snapshot;
963}
964
/*
 * Aggregate publish callback: publishes the given aggregate message to the
 * given topic unchanged.
 */
965static void cache_test_aggregate_publish_fn(struct stasis_topic *topic, struct stasis_message *aggregate)
966{
967 stasis_publish(topic, aggregate);
968}
969
/*
 * Check the cached aggregate entry for `id` against an expected value.
 *
 * Looks the entry up with stasis_cache_get_by_eid(), passing NULL as the
 * eid. Returns non-zero when:
 *  - no aggregate is cached and `value` is NULL, or
 *  - an aggregate is cached, `value` is non-NULL, and it equals the
 *    aggregate payload's `value` string.
 * Returns 0 otherwise.
 *
 * NOTE(review): the declaration of `test_data` (listing line 973) is
 * absent from this extract.
 */
970static int check_cache_aggregate(struct stasis_cache *cache, struct stasis_message_type *cache_type, const char *id, const char *value)
971{
 /* The looked-up message is released through ao2_cleanup on every return path. */
972 RAII_VAR(struct stasis_message *, aggregate, NULL, ao2_cleanup);
974
975 aggregate = stasis_cache_get_by_eid(cache, cache_type, id, NULL);
976 if (!aggregate) {
977 /* No aggregate, return true if given no value. */
978 return !value;
979 }
980
981 /* Return true if the given value matches the aggregate value. */
982 test_data = stasis_message_data(aggregate);
983 return value && !strcmp(value, test_data->value);
984}
985
/*
 * Unit test: a message of type "NonCacheable" published to the underlying
 * topic must not reach the test consumer - its count must remain 0. Per
 * the test summary, caching topics only forward cache_update messages.
 *
 * NOTE(review): the declarations of `cache`, `caching_topic`, `consumer`
 * and `sub`, the cache and consumer creation, and the subscribe call
 * (which would show what the consumer is attached to) are absent from
 * this extract.
 */
986AST_TEST_DEFINE(cache_filter)
987{
988 RAII_VAR(struct stasis_message_type *, non_cache_type, NULL, ao2_cleanup);
989 RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
994 RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
995 int actual_len;
996
997 switch (cmd) {
998 case TEST_INIT:
 /* Registration pass: describe the test and do not execute it. */
999 info->name = __func__;
1000 info->category = test_category;
1001 info->summary = "Test caching topics only forward cache_update messages.";
1002 info->description = "Test caching topics only forward cache_update messages.";
1003 return AST_TEST_NOT_RUN;
1004 case TEST_EXECUTE:
1005 break;
1006 }
1007
1008 ast_test_validate(test, stasis_message_type_create("NonCacheable", NULL, &non_cache_type) == STASIS_MESSAGE_TYPE_SUCCESS);
1009 ast_test_validate(test, NULL != non_cache_type);
1010 topic = stasis_topic_create("SomeTopic");
1011 ast_test_validate(test, NULL != topic);
1013 ast_test_validate(test, NULL != cache);
 /* The caching topic is layered on `topic` and backed by `cache`. */
1014 caching_topic = stasis_caching_topic_create(topic, cache);
1015 ast_test_validate(test, NULL != caching_topic);
1017 ast_test_validate(test, NULL != consumer);
1019 ast_test_validate(test, NULL != sub);
 /* Extra consumer reference; presumably owned by the subscription - confirm. */
1020 ao2_ref(consumer, +1);
1021
1022 test_message = cache_test_message_create(non_cache_type, "1", "1");
1023 ast_test_validate(test, NULL != test_message);
1024
1025 stasis_publish(topic, test_message);
1026
 /* A negative check: wait briefly and expect the count to stay at 0. */
1027 actual_len = consumer_should_stay(consumer, 0);
1028 ast_test_validate(test, 0 == actual_len);
1029
1030 return AST_TEST_PASS;
1031}
1032
1034{
1035 RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
1036 RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1041 RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
1042 RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
1043 RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
1044 RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
1045 int actual_len;
1046 struct stasis_cache_update *actual_update;
1047
1048 switch (cmd) {
1049 case TEST_INIT:
1050 info->name = __func__;
1051 info->category = test_category;
1052 info->summary = "Test passing messages through cache topic unscathed.";
1053 info->description = "Test passing messages through cache topic unscathed.";
1054 return AST_TEST_NOT_RUN;
1055 case TEST_EXECUTE:
1056 break;
1057 }
1058
1059 ast_test_validate(test, stasis_message_type_create("Cacheable", NULL, &cache_type) == STASIS_MESSAGE_TYPE_SUCCESS);
1060 ast_test_validate(test, NULL != cache_type);
1061 topic = stasis_topic_create("SomeTopic");
1062 ast_test_validate(test, NULL != topic);
1064 ast_test_validate(test, NULL != cache);
1065 caching_topic = stasis_caching_topic_create(topic, cache);
1066 ast_test_validate(test, NULL != caching_topic);
1068 ast_test_validate(test, NULL != consumer);
1070 ast_test_validate(test, NULL != sub);
1071 ao2_ref(consumer, +1);
1072
1073 test_message1_1 = cache_test_message_create(cache_type, "1", "1");
1074 ast_test_validate(test, NULL != test_message1_1);
1075 test_message2_1 = cache_test_message_create(cache_type, "2", "1");
1076 ast_test_validate(test, NULL != test_message2_1);
1077
1078 /* Post a couple of snapshots */
1079 stasis_publish(topic, test_message1_1);
1080 stasis_publish(topic, test_message2_1);
1081 actual_len = consumer_wait_for(consumer, 2);
1082 ast_test_validate(test, 2 == actual_len);
1083
1084 /* Check for new snapshot messages */
1086 actual_update = stasis_message_data(consumer->messages_rxed[0]);
1087 ast_test_validate(test, NULL == actual_update->old_snapshot);
1088 ast_test_validate(test, test_message1_1 == actual_update->new_snapshot);
1089 ast_test_validate(test, test_message1_1 == stasis_cache_get(cache, cache_type, "1"));
1090 /* stasis_cache_get returned a ref, so unref test_message1_1 */
1091 ao2_ref(test_message1_1, -1);
1092
1094 actual_update = stasis_message_data(consumer->messages_rxed[1]);
1095 ast_test_validate(test, NULL == actual_update->old_snapshot);
1096 ast_test_validate(test, test_message2_1 == actual_update->new_snapshot);
1097 ast_test_validate(test, test_message2_1 == stasis_cache_get(cache, cache_type, "2"));
1098 /* stasis_cache_get returned a ref, so unref test_message2_1 */
1099 ao2_ref(test_message2_1, -1);
1100
1101 /* Update snapshot 2 */
1102 test_message2_2 = cache_test_message_create(cache_type, "2", "2");
1103 ast_test_validate(test, NULL != test_message2_2);
1104 stasis_publish(topic, test_message2_2);
1105
1106 actual_len = consumer_wait_for(consumer, 3);
1107 ast_test_validate(test, 3 == actual_len);
1108
1109 actual_update = stasis_message_data(consumer->messages_rxed[2]);
1110 ast_test_validate(test, test_message2_1 == actual_update->old_snapshot);
1111 ast_test_validate(test, test_message2_2 == actual_update->new_snapshot);
1112 ast_test_validate(test, test_message2_2 == stasis_cache_get(cache, cache_type, "2"));
1113 /* stasis_cache_get returned a ref, so unref test_message2_2 */
1114 ao2_ref(test_message2_2, -1);
1115
1116 /* Clear snapshot 1 */
1117 test_message1_clear = stasis_cache_clear_create(test_message1_1);
1118 ast_test_validate(test, NULL != test_message1_clear);
1119 stasis_publish(topic, test_message1_clear);
1120
1121 actual_len = consumer_wait_for(consumer, 4);
1122 ast_test_validate(test, 4 == actual_len);
1123
1124 actual_update = stasis_message_data(consumer->messages_rxed[3]);
1125 ast_test_validate(test, test_message1_1 == actual_update->old_snapshot);
1126 ast_test_validate(test, NULL == actual_update->new_snapshot);
1127 ast_test_validate(test, NULL == stasis_cache_get(cache, cache_type, "1"));
1128
1129 return AST_TEST_PASS;
1130}
1131
1133{
	/*
	 * Exercises stasis_cache_dump(): two cacheable messages are published,
	 * then the cache is re-dumped after the initial publish, after an update
	 * of snapshot 2 and after a clear of snapshot 1, checking the entry count
	 * and the identity of every entry each time.
	 * NOTE(review): this listing does not show the test's header line, the
	 * declarations of cache, caching_topic, consumer and sub, nor the
	 * statements that assign cache, consumer and sub - confirm against the
	 * full file.
	 */
1134 RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
1135 RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1140 RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
1141 RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
1142 RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
1143 RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
1144 RAII_VAR(struct ao2_container *, cache_dump, NULL, ao2_cleanup);
1145 int actual_len;
1146 struct ao2_iterator i;
1147 void *obj;
1148
	/* TEST_INIT only fills in the test metadata and returns without running. */
1149 switch (cmd) {
1150 case TEST_INIT:
1151 info->name = __func__;
1152 info->category = test_category;
1153 info->summary = "Test cache dump routines.";
1154 info->description = "Test cache dump routines.";
1155 return AST_TEST_NOT_RUN;
1156 case TEST_EXECUTE:
1157 break;
1158 }
1159
1160 ast_test_validate(test, stasis_message_type_create("Cacheable", NULL, &cache_type) == STASIS_MESSAGE_TYPE_SUCCESS);
1161 ast_test_validate(test, NULL != cache_type);
1162 topic = stasis_topic_create("SomeTopic");
1163 ast_test_validate(test, NULL != topic);
1165 ast_test_validate(test, NULL != cache);
1166 caching_topic = stasis_caching_topic_create(topic, cache);
1167 ast_test_validate(test, NULL != caching_topic);
1169 ast_test_validate(test, NULL != consumer);
1171 ast_test_validate(test, NULL != sub);
	/* NOTE(review): presumably the extra ref is the one held on behalf of sub - confirm. */
1172 ao2_ref(consumer, +1);
1173
1174 test_message1_1 = cache_test_message_create(cache_type, "1", "1");
1175 ast_test_validate(test, NULL != test_message1_1);
1176 test_message2_1 = cache_test_message_create(cache_type, "2", "1");
1177 ast_test_validate(test, NULL != test_message2_1);
1178
1179 /* Post a couple of snapshots */
1180 stasis_publish(topic, test_message1_1);
1181 stasis_publish(topic, test_message2_1);
1182 actual_len = consumer_wait_for(consumer, 2);
1183 ast_test_validate(test, 2 == actual_len);
1184
1185 /* Check the cache */
	/* Drop the previous dump (NULL on the first pass) before taking a new one. */
1186 ao2_cleanup(cache_dump);
1187 cache_dump = stasis_cache_dump(cache, NULL);
1188 ast_test_validate(test, NULL != cache_dump);
1189 ast_test_validate(test, 2 == ao2_container_count(cache_dump));
1190 i = ao2_iterator_init(cache_dump, 0);
	/* Every entry handed out by the iterator is released via ao2_cleanup per iteration. */
1191 while ((obj = ao2_iterator_next(&i))) {
1192 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1193 ast_test_validate(test, actual_cache_entry == test_message1_1 || actual_cache_entry == test_message2_1);
1194 }
1196
1197 /* Update snapshot 2 */
1198 test_message2_2 = cache_test_message_create(cache_type, "2", "2");
1199 ast_test_validate(test, NULL != test_message2_2);
1200 stasis_publish(topic, test_message2_2);
1201
1202 actual_len = consumer_wait_for(consumer, 3);
1203 ast_test_validate(test, 3 == actual_len);
1204
1205 /* Check the cache */
1206 ao2_cleanup(cache_dump);
1207 cache_dump = stasis_cache_dump(cache, NULL);
1208 ast_test_validate(test, NULL != cache_dump);
	/* Still two entries: the update replaces snapshot 2 rather than adding one. */
1209 ast_test_validate(test, 2 == ao2_container_count(cache_dump));
1210 i = ao2_iterator_init(cache_dump, 0);
1211 while ((obj = ao2_iterator_next(&i))) {
1212 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1213 ast_test_validate(test, actual_cache_entry == test_message1_1 || actual_cache_entry == test_message2_2);
1214 }
1216
1217 /* Clear snapshot 1 */
1218 test_message1_clear = stasis_cache_clear_create(test_message1_1);
1219 ast_test_validate(test, NULL != test_message1_clear);
1220 stasis_publish(topic, test_message1_clear);
1221
1222 actual_len = consumer_wait_for(consumer, 4);
1223 ast_test_validate(test, 4 == actual_len);
1224
1225 /* Check the cache */
1226 ao2_cleanup(cache_dump);
1227 cache_dump = stasis_cache_dump(cache, NULL);
1228 ast_test_validate(test, NULL != cache_dump);
1229 ast_test_validate(test, 1 == ao2_container_count(cache_dump));
1230 i = ao2_iterator_init(cache_dump, 0);
1231 while ((obj = ao2_iterator_next(&i))) {
1232 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1233 ast_test_validate(test, actual_cache_entry == test_message2_2);
1234 }
1236
1237 /* Dump the cache to ensure that it has no subscription change items in it since those aren't cached */
1238 ao2_cleanup(cache_dump);
	/* NOTE(review): the statement that refills cache_dump before this count check is not visible in this listing. */
1240 ast_test_validate(test, 0 == ao2_container_count(cache_dump));
1241
1242 return AST_TEST_PASS;
1243}
1244
/*
 * Test of cache behaviour when the same snapshot id is published from several
 * entity ids (the local ast_eid_default and two synthetic foreign eids).
 * After each publish/clear it checks the aggregate value through
 * check_cache_aggregate(), the number of events seen by a consumer on the raw
 * topic and by one on the caching topic, and the contents returned by the
 * various cache dump/get calls.
 * NOTE(review): the declarations of cache and caching_topic and a few
 * statements are not visible in this listing - confirm against the full file.
 */
1245AST_TEST_DEFINE(cache_eid_aggregate)
1246{
1247 RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
1248 RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1251 RAII_VAR(struct consumer *, cache_consumer, NULL, ao2_cleanup);
1252 RAII_VAR(struct consumer *, topic_consumer, NULL, ao2_cleanup);
1253 RAII_VAR(struct stasis_subscription *, topic_sub, NULL, stasis_unsubscribe);
1254 RAII_VAR(struct stasis_subscription *, cache_sub, NULL, stasis_unsubscribe);
1255 RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
1256 RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
1257 RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
1258 RAII_VAR(struct stasis_message *, test_message2_3, NULL, ao2_cleanup);
1259 RAII_VAR(struct stasis_message *, test_message2_4, NULL, ao2_cleanup);
1260 RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
1261 RAII_VAR(struct stasis_message *, test_message2_clear, NULL, ao2_cleanup);
1262 RAII_VAR(struct ao2_container *, cache_dump, NULL, ao2_cleanup);
1263 int actual_len;
1264 struct ao2_iterator i;
1265 void *obj;
1266 struct ast_eid foreign_eid1;
1267 struct ast_eid foreign_eid2;
1268
	/* TEST_INIT only fills in the test metadata and returns without running. */
1269 switch (cmd) {
1270 case TEST_INIT:
1271 info->name = __func__;
1272 info->category = test_category;
1273 info->summary = "Test cache eid and aggregate support.";
1274 info->description = "Test cache eid and aggregate support.";
1275 return AST_TEST_NOT_RUN;
1276 case TEST_EXECUTE:
1277 break;
1278 }
1279
	/* Two distinct byte patterns stand in for remote entity ids. */
1280 memset(&foreign_eid1, 0xAA, sizeof(foreign_eid1));
1281 memset(&foreign_eid2, 0xBB, sizeof(foreign_eid2));
1282
1283 ast_test_validate(test, stasis_message_type_create("Cacheable", NULL, &cache_type) == STASIS_MESSAGE_TYPE_SUCCESS);
1284 ast_test_validate(test, NULL != cache_type);
1285
1286 topic = stasis_topic_create("SomeTopic");
1287 ast_test_validate(test, NULL != topic);
1288
1289 /* To consume events published to the topic. */
1290 topic_consumer = consumer_create(1);
1291 ast_test_validate(test, NULL != topic_consumer);
1292
1293 topic_sub = stasis_subscribe(topic, consumer_exec, topic_consumer);
1294 ast_test_validate(test, NULL != topic_sub);
	/* Extra ref for the consumer pointer that was handed to the subscription as callback data. */
1295 ao2_ref(topic_consumer, +1);
1296
	/* NOTE(review): the statement that creates cache is not visible in this listing. */
1299 ast_test_validate(test, NULL != cache);
1300
1301 caching_topic = stasis_caching_topic_create(topic, cache);
1302 ast_test_validate(test, NULL != caching_topic);
1303
1304 /* To consume update events published to the caching_topic. */
1305 cache_consumer = consumer_create(1);
1306 ast_test_validate(test, NULL != cache_consumer);
1307
1308 cache_sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, cache_consumer);
1309 ast_test_validate(test, NULL != cache_sub);
1310 ao2_ref(cache_consumer, +1);
1311
1312 /* Create test messages. */
1313 test_message1_1 = cache_test_message_create_full(cache_type, "1", "1", &ast_eid_default);
1314 ast_test_validate(test, NULL != test_message1_1);
1315 test_message2_1 = cache_test_message_create_full(cache_type, "2", "1", &ast_eid_default);
1316 ast_test_validate(test, NULL != test_message2_1);
1317 test_message2_2 = cache_test_message_create_full(cache_type, "2", "2", &foreign_eid1);
1318 ast_test_validate(test, NULL != test_message2_2);
1319 test_message2_3 = cache_test_message_create_full(cache_type, "2", "3", &foreign_eid2);
1320 ast_test_validate(test, NULL != test_message2_3);
1321 test_message2_4 = cache_test_message_create_full(cache_type, "2", "4", &foreign_eid2);
1322 ast_test_validate(test, NULL != test_message2_4);
1323
1324 /* Post some snapshots */
	/*
	 * The expected aggregate strings used throughout this test equal the sum
	 * of the values currently cached for that id (here 1 + 2 = 3 for id "2").
	 * NOTE(review): presumably that is how the aggregate is computed by the
	 * cache callbacks, which are not visible here - confirm.
	 */
1325 stasis_publish(topic, test_message1_1);
1326 ast_test_validate(test, check_cache_aggregate(cache, cache_type, "1", "1"));
1327 stasis_publish(topic, test_message2_1);
1328 ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "1"));
1329 stasis_publish(topic, test_message2_2);
1330 ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "3"));
1331
1332 actual_len = consumer_wait_for(cache_consumer, 6);
1333 ast_test_validate(test, 6 == actual_len);
1334 actual_len = consumer_wait_for(topic_consumer, 6);
1335 ast_test_validate(test, 6 == actual_len);
1336
1337 /* Check the cache */
	/* Drop the previous dump (NULL on the first pass) before taking a new one. */
1338 ao2_cleanup(cache_dump);
1339 cache_dump = stasis_cache_dump_all(cache, NULL);
1340 ast_test_validate(test, NULL != cache_dump);
1341 ast_test_validate(test, 3 == ao2_container_count(cache_dump));
1342 i = ao2_iterator_init(cache_dump, 0);
1343 while ((obj = ao2_iterator_next(&i))) {
1344 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1345
1346 ast_test_validate(test,
1347 actual_cache_entry == test_message1_1
1348 || actual_cache_entry == test_message2_1
1349 || actual_cache_entry == test_message2_2);
1350 }
1352
1353 /* Check the local cached items */
1354 ao2_cleanup(cache_dump);
	/* NOTE(review): the statement that refills cache_dump for this check is not visible in this listing. */
1356 ast_test_validate(test, NULL != cache_dump);
1357 ast_test_validate(test, 2 == ao2_container_count(cache_dump));
1358 i = ao2_iterator_init(cache_dump, 0);
1359 while ((obj = ao2_iterator_next(&i))) {
1360 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1361
1362 ast_test_validate(test,
1363 actual_cache_entry == test_message1_1
1364 || actual_cache_entry == test_message2_1);
1365 }
1367
1368 /* Post snapshot 2 from another eid. */
1369 stasis_publish(topic, test_message2_3);
1370 ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "6"));
1371
1372 actual_len = consumer_wait_for(cache_consumer, 8);
1373 ast_test_validate(test, 8 == actual_len);
1374 actual_len = consumer_wait_for(topic_consumer, 8);
1375 ast_test_validate(test, 8 == actual_len);
1376
1377 /* Check the cache */
1378 ao2_cleanup(cache_dump);
1379 cache_dump = stasis_cache_dump_all(cache, NULL);
1380 ast_test_validate(test, NULL != cache_dump);
1381 ast_test_validate(test, 4 == ao2_container_count(cache_dump));
1382 i = ao2_iterator_init(cache_dump, 0);
1383 while ((obj = ao2_iterator_next(&i))) {
1384 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1385
1386 ast_test_validate(test,
1387 actual_cache_entry == test_message1_1
1388 || actual_cache_entry == test_message2_1
1389 || actual_cache_entry == test_message2_2
1390 || actual_cache_entry == test_message2_3);
1391 }
1393
1394 /* Check the remote cached items */
1395 ao2_cleanup(cache_dump);
1396 cache_dump = stasis_cache_dump_by_eid(cache, NULL, &foreign_eid1);
1397 ast_test_validate(test, NULL != cache_dump);
1398 ast_test_validate(test, 1 == ao2_container_count(cache_dump));
1399 i = ao2_iterator_init(cache_dump, 0);
1400 while ((obj = ao2_iterator_next(&i))) {
1401 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1402
1403 ast_test_validate(test, actual_cache_entry == test_message2_2);
1404 }
1406
1407 /* Post snapshot 2 from a repeated eid. */
1408 stasis_publish(topic, test_message2_4);
1409 ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "7"));
1410
1411 actual_len = consumer_wait_for(cache_consumer, 10);
1412 ast_test_validate(test, 10 == actual_len);
1413 actual_len = consumer_wait_for(topic_consumer, 10);
1414 ast_test_validate(test, 10 == actual_len);
1415
1416 /* Check the cache */
1417 ao2_cleanup(cache_dump);
1418 cache_dump = stasis_cache_dump_all(cache, NULL);
1419 ast_test_validate(test, NULL != cache_dump);
	/* Count stays at 4: test_message2_4 takes the place of test_message2_3 (same id, same eid). */
1420 ast_test_validate(test, 4 == ao2_container_count(cache_dump));
1421 i = ao2_iterator_init(cache_dump, 0);
1422 while ((obj = ao2_iterator_next(&i))) {
1423 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1424
1425 ast_test_validate(test,
1426 actual_cache_entry == test_message1_1
1427 || actual_cache_entry == test_message2_1
1428 || actual_cache_entry == test_message2_2
1429 || actual_cache_entry == test_message2_4);
1430 }
1432
1433 /* Check all snapshot 2 cache entries. */
1434 ao2_cleanup(cache_dump);
1435 cache_dump = stasis_cache_get_all(cache, cache_type, "2");
1436 ast_test_validate(test, NULL != cache_dump);
1437 ast_test_validate(test, 3 == ao2_container_count(cache_dump));
1438 i = ao2_iterator_init(cache_dump, 0);
1439 while ((obj = ao2_iterator_next(&i))) {
1440 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1441
1442 ast_test_validate(test,
1443 actual_cache_entry == test_message2_1
1444 || actual_cache_entry == test_message2_2
1445 || actual_cache_entry == test_message2_4);
1446 }
1448
1449 /* Clear snapshot 1 */
1450 test_message1_clear = stasis_cache_clear_create(test_message1_1);
1451 ast_test_validate(test, NULL != test_message1_clear);
1452 stasis_publish(topic, test_message1_clear);
1453 ast_test_validate(test, check_cache_aggregate(cache, cache_type, "1", NULL));
1454
	/* From here on the two consumers are expected to have seen different totals (12 vs 11). */
1455 actual_len = consumer_wait_for(cache_consumer, 12);
1456 ast_test_validate(test, 12 == actual_len);
1457 actual_len = consumer_wait_for(topic_consumer, 11);
1458 ast_test_validate(test, 11 == actual_len);
1459
1460 /* Check the cache */
1461 ao2_cleanup(cache_dump);
1462 cache_dump = stasis_cache_dump_all(cache, NULL);
1463 ast_test_validate(test, NULL != cache_dump);
1464 ast_test_validate(test, 3 == ao2_container_count(cache_dump));
1465 i = ao2_iterator_init(cache_dump, 0);
1466 while ((obj = ao2_iterator_next(&i))) {
1467 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1468
1469 ast_test_validate(test,
1470 actual_cache_entry == test_message2_1
1471 || actual_cache_entry == test_message2_2
1472 || actual_cache_entry == test_message2_4);
1473 }
1475
1476 /* Clear snapshot 2 from a remote eid */
1477 test_message2_clear = stasis_cache_clear_create(test_message2_2);
1478 ast_test_validate(test, NULL != test_message2_clear);
1479 stasis_publish(topic, test_message2_clear);
1480 ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "5"));
1481
1482 actual_len = consumer_wait_for(cache_consumer, 14);
1483 ast_test_validate(test, 14 == actual_len);
1484 actual_len = consumer_wait_for(topic_consumer, 13);
1485 ast_test_validate(test, 13 == actual_len);
1486
1487 /* Check the cache */
1488 ao2_cleanup(cache_dump);
1489 cache_dump = stasis_cache_dump_all(cache, NULL);
1490 ast_test_validate(test, NULL != cache_dump);
1491 ast_test_validate(test, 2 == ao2_container_count(cache_dump));
1492 i = ao2_iterator_init(cache_dump, 0);
1493 while ((obj = ao2_iterator_next(&i))) {
1494 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1495
1496 ast_test_validate(test,
1497 actual_cache_entry == test_message2_1
1498 || actual_cache_entry == test_message2_4);
1499 }
1501
1502 return AST_TEST_PASS;
1503}
1504
1506{
	/*
	 * Message router test: three message types are published on one topic and
	 * each of three consumers is expected to receive exactly one of them -
	 * consumer1 the type-1 message, consumer2 the type-2 message and
	 * consumer3 (installed via stasis_message_router_set_default()) the
	 * type-3 message.
	 * NOTE(review): this listing does not show the test's header line, the
	 * declarations of uut and test_data, nor the first line of the two calls
	 * that register consumer1 and consumer2 - confirm against the full file.
	 */
1507 RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1510 RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
1511 RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
1512 RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
1513 RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
1514 RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
1515 RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
1516 RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
1517 RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
1518 RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
1519 int actual_len, ret;
1520 struct stasis_message *actual;
1521
	/* TEST_INIT only fills in the test metadata and returns without running. */
1522 switch (cmd) {
1523 case TEST_INIT:
1524 info->name = __func__;
1525 info->category = test_category;
1526 info->summary = "Test simple message routing";
1527 info->description = "Test simple message routing";
1528 return AST_TEST_NOT_RUN;
1529 case TEST_EXECUTE:
1530 break;
1531 }
1532
1533 topic = stasis_topic_create("TestTopic");
1534 ast_test_validate(test, NULL != topic);
1535
1536 consumer1 = consumer_create(1);
1537 ast_test_validate(test, NULL != consumer1);
1538 consumer2 = consumer_create(1);
1539 ast_test_validate(test, NULL != consumer2);
1540 consumer3 = consumer_create(1);
1541 ast_test_validate(test, NULL != consumer3);
1542
1543 ast_test_validate(test, stasis_message_type_create("TestMessage1", NULL, &test_message_type1) == STASIS_MESSAGE_TYPE_SUCCESS);
1544 ast_test_validate(test, NULL != test_message_type1);
1545 ast_test_validate(test, stasis_message_type_create("TestMessage2", NULL, &test_message_type2) == STASIS_MESSAGE_TYPE_SUCCESS);
1546 ast_test_validate(test, NULL != test_message_type2);
1547 ast_test_validate(test, stasis_message_type_create("TestMessage3", NULL, &test_message_type3) == STASIS_MESSAGE_TYPE_SUCCESS);
1548 ast_test_validate(test, NULL != test_message_type3);
1549
1550 uut = stasis_message_router_create(topic);
1551 ast_test_validate(test, NULL != uut);
1552
1554 uut, test_message_type1, consumer_exec, consumer1);
1555 ast_test_validate(test, 0 == ret);
	/* Each consumer gets one extra ref once it has been registered with the router as callback data. */
1556 ao2_ref(consumer1, +1);
1558 uut, test_message_type2, consumer_exec, consumer2);
1559 ast_test_validate(test, 0 == ret);
1560 ao2_ref(consumer2, +1);
1561 ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
1562 ast_test_validate(test, 0 == ret);
1563 ao2_ref(consumer3, +1);
1564
	/* One shared 1-byte payload object is reused as the data of all three messages. */
1565 test_data = ao2_alloc(1, NULL);
1566 ast_test_validate(test, NULL != test_data);
1567 test_message1 = stasis_message_create(test_message_type1, test_data);
1568 ast_test_validate(test, NULL != test_message1);
1569 test_message2 = stasis_message_create(test_message_type2, test_data);
1570 ast_test_validate(test, NULL != test_message2);
1571 test_message3 = stasis_message_create(test_message_type3, test_data);
1572 ast_test_validate(test, NULL != test_message3);
1573
1574 stasis_publish(topic, test_message1);
1575 stasis_publish(topic, test_message2);
1576 stasis_publish(topic, test_message3);
1577
1578 actual_len = consumer_wait_for(consumer1, 1);
1579 ast_test_validate(test, 1 == actual_len);
1580 actual_len = consumer_wait_for(consumer2, 1);
1581 ast_test_validate(test, 1 == actual_len);
1582 actual_len = consumer_wait_for(consumer3, 1);
1583 ast_test_validate(test, 1 == actual_len);
1584
1585 actual = consumer1->messages_rxed[0];
1586 ast_test_validate(test, test_message1 == actual);
1587
1588 actual = consumer2->messages_rxed[0];
1589 ast_test_validate(test, test_message2 == actual);
1590
1591 actual = consumer3->messages_rxed[0];
1592 ast_test_validate(test, test_message3 == actual);
1593
1594 /* consumer1 and consumer2 do not get the final message. */
	/* So the extra refs taken above with ao2_ref(+1) are dropped by hand here. */
1595 ao2_cleanup(consumer1);
1596 ao2_cleanup(consumer2);
1597
1598 return AST_TEST_PASS;
1599}
1600
1602{
	/*
	 * Same routing scenario as the plain router test, but described by its
	 * own metadata as running with subscriptions that use the Stasis
	 * threadpool: three message types are published and each of three
	 * consumers is expected to receive exactly the one routed to it.
	 * NOTE(review): this listing does not show the test's header line, the
	 * declarations of uut and test_data, the statement that creates uut, nor
	 * the first line of the two calls that register consumer1 and consumer2 -
	 * confirm against the full file.
	 */
1603 RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1606 RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
1607 RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
1608 RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
1609 RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
1610 RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
1611 RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
1612 RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
1613 RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
1614 RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
1615 int actual_len, ret;
1616 struct stasis_message *actual;
1617
	/* TEST_INIT only fills in the test metadata and returns without running. */
1618 switch (cmd) {
1619 case TEST_INIT:
1620 info->name = __func__;
1621 info->category = test_category;
1622 info->summary = "Test message routing via threadpool";
1623 info->description = "Test simple message routing when\n"
1624 "the subscriptions dictate usage of the Stasis\n"
1625 "threadpool.";
1626 return AST_TEST_NOT_RUN;
1627 case TEST_EXECUTE:
1628 break;
1629 }
1630
1631 topic = stasis_topic_create("TestTopic");
1632 ast_test_validate(test, NULL != topic);
1633
1634 consumer1 = consumer_create(1);
1635 ast_test_validate(test, NULL != consumer1);
1636 consumer2 = consumer_create(1);
1637 ast_test_validate(test, NULL != consumer2);
1638 consumer3 = consumer_create(1);
1639 ast_test_validate(test, NULL != consumer3);
1640
1641 ast_test_validate(test, stasis_message_type_create("TestMessage1", NULL, &test_message_type1) == STASIS_MESSAGE_TYPE_SUCCESS);
1642 ast_test_validate(test, NULL != test_message_type1);
1643 ast_test_validate(test, stasis_message_type_create("TestMessage2", NULL, &test_message_type2) == STASIS_MESSAGE_TYPE_SUCCESS);
1644 ast_test_validate(test, NULL != test_message_type2);
1645 ast_test_validate(test, stasis_message_type_create("TestMessage3", NULL, &test_message_type3) == STASIS_MESSAGE_TYPE_SUCCESS);
1646 ast_test_validate(test, NULL != test_message_type3);
1647
1649 ast_test_validate(test, NULL != uut);
1650
1652 uut, test_message_type1, consumer_exec, consumer1);
1653 ast_test_validate(test, 0 == ret);
	/* Each consumer gets one extra ref once it has been registered with the router as callback data. */
1654 ao2_ref(consumer1, +1);
1656 uut, test_message_type2, consumer_exec, consumer2);
1657 ast_test_validate(test, 0 == ret);
1658 ao2_ref(consumer2, +1);
1659 ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
1660 ast_test_validate(test, 0 == ret);
1661 ao2_ref(consumer3, +1);
1662
	/* One shared 1-byte payload object is reused as the data of all three messages. */
1663 test_data = ao2_alloc(1, NULL);
1664 ast_test_validate(test, NULL != test_data);
1665 test_message1 = stasis_message_create(test_message_type1, test_data);
1666 ast_test_validate(test, NULL != test_message1);
1667 test_message2 = stasis_message_create(test_message_type2, test_data);
1668 ast_test_validate(test, NULL != test_message2);
1669 test_message3 = stasis_message_create(test_message_type3, test_data);
1670 ast_test_validate(test, NULL != test_message3);
1671
1672 stasis_publish(topic, test_message1);
1673 stasis_publish(topic, test_message2);
1674 stasis_publish(topic, test_message3);
1675
1676 actual_len = consumer_wait_for(consumer1, 1);
1677 ast_test_validate(test, 1 == actual_len);
1678 actual_len = consumer_wait_for(consumer2, 1);
1679 ast_test_validate(test, 1 == actual_len);
1680 actual_len = consumer_wait_for(consumer3, 1);
1681 ast_test_validate(test, 1 == actual_len);
1682
1683 actual = consumer1->messages_rxed[0];
1684 ast_test_validate(test, test_message1 == actual);
1685
1686 actual = consumer2->messages_rxed[0];
1687 ast_test_validate(test, test_message2 == actual);
1688
1689 actual = consumer3->messages_rxed[0];
1690 ast_test_validate(test, test_message3 == actual);
1691
1692 /* consumer1 and consumer2 do not get the final message. */
	/* So the extra refs taken above with ao2_ref(+1) are dropped by hand here. */
1693 ao2_cleanup(consumer1);
1694 ao2_cleanup(consumer2);
1695
1696 return AST_TEST_PASS;
1697}
1698
/*
 * Returns the fixed string "cached" when type_name begins with "Cache",
 * and NULL otherwise.
 * NOTE(review): the expression that initializes type_name from the message is
 * not visible in this listing, and no caller is visible either; presumably
 * this is the id callback handed to the cache in router_cache_updates -
 * confirm against the full file.
 */
1699static const char *cache_simple(struct stasis_message *message)
1700{
1701 const char *type_name =
1703 if (!ast_begins_with(type_name, "Cache")) {
1704 return NULL;
1705 }
1706
1707 return "cached";
1708}
1709
/*
 * Test of a message router attached to a caching topic's topic. Three
 * messages of types "Cache1", "Cache2" and "NonCache" are published on the
 * underlying topic; consumer1 and consumer2 are each expected to receive one
 * stasis_cache_update_type() message wrapping test_message1 and
 * test_message2 respectively, while the default route (consumer3) is
 * expected to receive nothing.
 * NOTE(review): the declarations of cache, caching_topic, uut, test_data and
 * update, the statement creating cache, and the first lines of the router
 * creation/registration calls are not visible in this listing - confirm
 * against the full file.
 */
1710AST_TEST_DEFINE(router_cache_updates)
1711{
1712 RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1715 RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
1716 RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
1717 RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
1720 RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
1721 RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
1722 RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
1723 RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
1724 RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
1725 RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
1726 RAII_VAR(struct stasis_message *, message1, NULL, ao2_cleanup);
1727 RAII_VAR(struct stasis_message *, message2, NULL, ao2_cleanup);
1729 int actual_len, ret;
1730 struct stasis_message *actual;
1731
	/* TEST_INIT only fills in the test metadata and returns without running. */
1732 switch (cmd) {
1733 case TEST_INIT:
1734 info->name = __func__;
1735 info->category = test_category;
1736 info->summary = "Test special handling cache_update messages";
1737 info->description = "Test special handling cache_update messages";
1738 return AST_TEST_NOT_RUN;
1739 case TEST_EXECUTE:
1740 break;
1741 }
1742
1743 topic = stasis_topic_create("TestTopic");
1744 ast_test_validate(test, NULL != topic);
1745
1747 ast_test_validate(test, NULL != cache);
1748 caching_topic = stasis_caching_topic_create(topic, cache);
1749 ast_test_validate(test, NULL != caching_topic);
1750
1751 consumer1 = consumer_create(1);
1752 ast_test_validate(test, NULL != consumer1);
1753 consumer2 = consumer_create(1);
1754 ast_test_validate(test, NULL != consumer2);
1755 consumer3 = consumer_create(1);
1756 ast_test_validate(test, NULL != consumer3);
1757
	/* Two type names start with "Cache" and one does not; see the "Cache" prefix test in cache_simple(). */
1758 ast_test_validate(test, stasis_message_type_create("Cache1", NULL, &test_message_type1) == STASIS_MESSAGE_TYPE_SUCCESS);
1759 ast_test_validate(test, NULL != test_message_type1);
1760 ast_test_validate(test, stasis_message_type_create("Cache2", NULL, &test_message_type2) == STASIS_MESSAGE_TYPE_SUCCESS);
1761 ast_test_validate(test, NULL != test_message_type2);
1762 ast_test_validate(test, stasis_message_type_create("NonCache", NULL, &test_message_type3) == STASIS_MESSAGE_TYPE_SUCCESS);
1763 ast_test_validate(test, NULL != test_message_type3);
1764
1766 stasis_caching_get_topic(caching_topic));
1767 ast_test_validate(test, NULL != uut);
1768
1770 uut, test_message_type1, consumer_exec, consumer1);
1771 ast_test_validate(test, 0 == ret);
	/* Each consumer gets one extra ref once it has been registered with the router as callback data. */
1772 ao2_ref(consumer1, +1);
1774 uut, stasis_cache_update_type(), consumer_exec, consumer2);
1775 ast_test_validate(test, 0 == ret);
1776 ao2_ref(consumer2, +1);
1777 ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
1778 ast_test_validate(test, 0 == ret);
1779 ao2_ref(consumer3, +1);
1780
	/* One shared 1-byte payload object is reused as the data of all three messages. */
1781 test_data = ao2_alloc(1, NULL);
1782 ast_test_validate(test, NULL != test_data);
1783 test_message1 = stasis_message_create(test_message_type1, test_data);
1784 ast_test_validate(test, NULL != test_message1);
1785 test_message2 = stasis_message_create(test_message_type2, test_data);
1786 ast_test_validate(test, NULL != test_message2);
1787 test_message3 = stasis_message_create(test_message_type3, test_data);
1788 ast_test_validate(test, NULL != test_message3);
1789
	/* Published on the raw topic; the router listens on the caching topic's topic instead. */
1790 stasis_publish(topic, test_message1);
1791 stasis_publish(topic, test_message2);
1792 stasis_publish(topic, test_message3);
1793
1794 actual_len = consumer_wait_for(consumer1, 1);
1795 ast_test_validate(test, 1 == actual_len);
1796 actual_len = consumer_wait_for(consumer2, 1);
1797 ast_test_validate(test, 1 == actual_len);
1798 /* Uncacheable message should not be passed through */
1799 actual_len = consumer_should_stay(consumer3, 0);
1800 ast_test_validate(test, 0 == actual_len);
1801
	/* consumer1 must have received a cache update whose payload wraps test_message1. */
1802 actual = consumer1->messages_rxed[0];
1803 ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual));
1804 update = stasis_message_data(actual);
1805 ast_test_validate(test, test_message_type1 == update->type);
1806 ast_test_validate(test, test_message1 == update->new_snapshot);
1807
	/* consumer2 must have received a cache update whose payload wraps test_message2. */
1808 actual = consumer2->messages_rxed[0];
1809 ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual));
1810 update = stasis_message_data(actual);
1811 ast_test_validate(test, test_message_type2 == update->type);
1812 ast_test_validate(test, test_message2 == update->new_snapshot);
1813
1814 /* consumer1 and consumer2 do not get the final message. */
	/* So the extra refs taken above with ao2_ref(+1) are dropped by hand here. */
1815 ao2_cleanup(consumer1);
1816 ao2_cleanup(consumer2);
1817
1818 return AST_TEST_PASS;
1819}
1820
1822{
	/*
	 * Checks that stasis_message_to_json() yields NULL both for a NULL
	 * message and for a message whose type was created without a vtable.
	 * The summary/description now say "when NULL", matching what this body
	 * tests; the two strings were previously swapped with the sibling test
	 * that uses fake_vtable.
	 * NOTE(review): this listing does not show the test's header line, the
	 * declaration of type, nor the statement that assigns uut - confirm
	 * against the full file.
	 */
1824 RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1825 RAII_VAR(char *, data, NULL, ao2_cleanup);
1826 RAII_VAR(struct ast_json *, actual, NULL, ast_json_unref);
1827 char *expected = "SomeData";
1828
	/* TEST_INIT only fills in the test metadata and returns without running. */
1829 switch (cmd) {
1830 case TEST_INIT:
1831 info->name = __func__;
1832 info->category = test_category;
1833 info->summary = "Test message to_json function when NULL";
1834 info->description = "Test message to_json function when NULL";
1835 return AST_TEST_NOT_RUN;
1836 case TEST_EXECUTE:
1837 break;
1838 }
1839
1840 /* Test NULL */
1841 actual = stasis_message_to_json(NULL, NULL);
1842 ast_test_validate(test, NULL == actual);
1843
1844 /* Test message with NULL to_json function */
1845 ast_test_validate(test, stasis_message_type_create("SomeMessage", NULL, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
1846
1847 data = ao2_alloc(strlen(expected) + 1, NULL);
	/* Allocation can fail; fail the test instead of copying into a NULL buffer. */
	ast_test_validate(test, NULL != data);
1848 strcpy(data, expected);
1850 ast_test_validate(test, NULL != uut);
1851
1852 actual = stasis_message_to_json(uut, NULL);
1853 ast_test_validate(test, NULL == actual);
1854
1855 return AST_TEST_PASS;
1856}
1857
1859{
	/*
	 * Checks that stasis_message_to_json() on a message whose type was
	 * created with fake_vtable produces JSON equal to a JSON string built
	 * from the message's text payload.
	 * The summary/description no longer say "when NULL"; the two strings
	 * were previously swapped with the sibling test that covers the NULL
	 * cases.
	 * NOTE(review): this listing does not show the test's header line, the
	 * declaration of type, nor the statement that assigns uut - confirm
	 * against the full file.
	 */
1861 RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1862 RAII_VAR(char *, data, NULL, ao2_cleanup);
1863 RAII_VAR(struct ast_json *, actual, NULL, ast_json_unref);
1864 const char *expected_text = "SomeData";
1865 RAII_VAR(struct ast_json *, expected, NULL, ast_json_unref);
1866
	/* TEST_INIT only fills in the test metadata and returns without running. */
1867 switch (cmd) {
1868 case TEST_INIT:
1869 info->name = __func__;
1870 info->category = test_category;
1871 info->summary = "Test message to_json function";
1872 info->description = "Test message to_json function";
1873 return AST_TEST_NOT_RUN;
1874 case TEST_EXECUTE:
1875 break;
1876 }
1877
1878 ast_test_validate(test, stasis_message_type_create("SomeMessage", &fake_vtable, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
1879
1880 data = ao2_alloc(strlen(expected_text) + 1, NULL);
	/* Allocation can fail; fail the test instead of copying into a NULL buffer. */
	ast_test_validate(test, NULL != data);
1881 strcpy(data, expected_text);
1883 ast_test_validate(test, NULL != uut);
1884
1885 expected = ast_json_string_create(expected_text);
1886 actual = stasis_message_to_json(uut, NULL);
1887 ast_test_validate(test, ast_json_equal(expected, actual));
1888
1889 return AST_TEST_PASS;
1890}
1891
1893{
	/*
	 * Checks that stasis_message_to_ami() yields NULL both for a NULL
	 * message and for a message whose type was created without a vtable.
	 * NOTE(review): this listing does not show the test's header line, the
	 * declaration of type, nor the statement that assigns uut - confirm
	 * against the full file.
	 */
1895 RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1896 RAII_VAR(char *, data, NULL, ao2_cleanup);
1897 RAII_VAR(struct ast_manager_event_blob *, actual, NULL, ao2_cleanup);
1898 char *expected = "SomeData";
1899
	/* TEST_INIT only fills in the test metadata and returns without running. */
1900 switch (cmd) {
1901 case TEST_INIT:
1902 info->name = __func__;
1903 info->category = test_category;
1904 info->summary = "Test message to_ami function when NULL";
1905 info->description = "Test message to_ami function when NULL";
1906 return AST_TEST_NOT_RUN;
1907 case TEST_EXECUTE:
1908 break;
1909 }
1910
1911 /* Test NULL */
1912 actual = stasis_message_to_ami(NULL);
1913 ast_test_validate(test, NULL == actual);
1914
1915 /* Test message with NULL to_ami function */
1916 ast_test_validate(test, stasis_message_type_create("SomeMessage", NULL, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
1917
1918 data = ao2_alloc(strlen(expected) + 1, NULL);
	/* Allocation can fail; fail the test instead of copying into a NULL buffer. */
	ast_test_validate(test, NULL != data);
1919 strcpy(data, expected);
1921 ast_test_validate(test, NULL != uut);
1922
1923 actual = stasis_message_to_ami(uut);
1924 ast_test_validate(test, NULL == actual);
1925
1926 return AST_TEST_PASS;
1927}
1928
1930{
1932 RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1933 RAII_VAR(char *, data, NULL, ao2_cleanup);
1934 RAII_VAR(struct ast_manager_event_blob *, actual, NULL, ao2_cleanup);
1935 const char *expected_text = "SomeData";
1936 const char *expected = "Message: SomeData\r\n";
1937
1938 switch (cmd) {
1939 case TEST_INIT:
1940 info->name = __func__;
1941 info->category = test_category;
1942 info->summary = "Test message to_ami function";
1943 info->description = "Test message to_ami function";
1944 return AST_TEST_NOT_RUN;
1945 case TEST_EXECUTE:
1946 break;
1947 }
1948
1949 ast_test_validate(test, stasis_message_type_create("SomeMessage", &fake_vtable, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
1950
1951 data = ao2_alloc(strlen(expected_text) + 1, NULL);
1952 strcpy(data, expected_text);
1954 ast_test_validate(test, NULL != uut);
1955
1956 actual = stasis_message_to_ami(uut);
1957 ast_test_validate(test, strcmp(expected, actual->extra_fields) == 0);
1958
1959 return AST_TEST_PASS;
1960}
1961
/*!
 * \brief Subscription callback that deliberately does nothing.
 *
 * Handed to stasis_subscribe() by tests that need a subscription to exist
 * but have no interest in the messages delivered to it.
 *
 * \param data Subscription user data (ignored).
 * \param sub Subscription the message was delivered on (ignored).
 * \param message The delivered message (ignored).
 */
static void noop(void *data, struct stasis_subscription *sub,
	struct stasis_message *message)
{
	/* Intentionally empty: none of the arguments are inspected. */
	return;
}
1967
1969{
1970 RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1972
1973 switch (cmd) {
1974 case TEST_INIT:
1975 info->name = __func__;
1976 info->category = test_category;
1977 info->summary = "Test that destruction order doesn't bomb stuff";
1978 info->description = "Test that destruction order doesn't bomb stuff";
1979 return AST_TEST_NOT_RUN;
1980 case TEST_EXECUTE:
1981 break;
1982 }
1983
1984 topic = stasis_topic_create("test-topic");
1985 ast_test_validate(test, NULL != topic);
1986
1987 sub = stasis_subscribe(topic, noop, NULL);
1988 ast_test_validate(test, NULL != sub);
1989
1990 /* With any luck, this won't completely blow everything up */
1991 ao2_cleanup(topic);
1993
1994 /* These refs were cleaned up manually */
1995 topic = NULL;
1996 sub = NULL;
1997
1998 return AST_TEST_PASS;
1999}
2000
/*!
 * \brief ID getter stub that reports no ID for any message.
 *
 * \param message Message to identify (ignored).
 *
 * \return Always \c NULL.
 */
static const char *noop_get_id(struct stasis_message *message)
{
	const char *no_id = NULL;

	return no_id;
}
2005
/*!
 * \brief Test tearing down a topic and its caching topic by hand.
 *
 * The visible body creates "test-topic" and a caching topic on top of it,
 * then releases the topic and unsubscribes the caching topic explicitly
 * rather than leaving that to the RAII_VAR cleanups. The test passes if
 * it gets through that sequence and reaches the end.
 *
 * NOTE(review): several lines of this function are absent from this
 * listing (the declarations of cache and sub, and the statements that
 * assign and release them), so only the visible steps are described.
 */
2006AST_TEST_DEFINE(caching_dtor_order)
2007{
2008 RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
2010 RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL,
2013
2014 switch (cmd) {
2015 case TEST_INIT:
2016 info->name = __func__;
2017 info->category = test_category;
2018 info->summary = "Test that destruction order doesn't bomb stuff";
2019 info->description = "Test that destruction order doesn't bomb stuff";
2020 return AST_TEST_NOT_RUN;
2021 case TEST_EXECUTE:
2022 break;
2023 }
2024
2026 ast_test_validate(test, NULL != cache);
2027
2028 topic = stasis_topic_create("test-topic");
2029 ast_test_validate(test, NULL != topic);
2030
2031 caching_topic = stasis_caching_topic_create(topic, cache);
2032 ast_test_validate(test, NULL != caching_topic);
2033
2035 NULL);
2036 ast_test_validate(test, NULL != sub);
2037
2038 /* With any luck, this won't completely blow everything up */
2040 ao2_cleanup(topic);
2041 stasis_caching_unsubscribe(caching_topic);
2043
2044 /* These refs were cleaned up manually */
	/* Clearing the locals presumably keeps the scope-exit cleanups from releasing them again. */
2045 cache = NULL;
2046 topic = NULL;
2047 caching_topic = NULL;
2048 sub = NULL;
2049
2050 return AST_TEST_PASS;
2051}
2052
2063};
2064
2065static void destroy_message_types(void *obj)
2066{
2067 struct test_message_types *types = obj;
2068
2069 ao2_cleanup(types->none);
2070 ao2_cleanup(types->ami);
2071 ao2_cleanup(types->json);
2072 ao2_cleanup(types->event);
2073 ao2_cleanup(types->amievent);
2074 ao2_cleanup(types->type1);
2075 ao2_cleanup(types->type2);
2076 ao2_cleanup(types->type3);
2077 /* N.B. Don't cleanup types->change! */
2078}
2079
/*!
 * \brief Allocate a test_message_types object and fill in its message types.
 *
 * A single local vtable is mutated between calls so that each created type
 * advertises a different set of formatters:
 *  - none:     all vtable members zero
 *  - ami:      to_ami only
 *  - json:     to_json only
 *  - event:    to_event only
 *  - amievent: to_ami and to_event
 *  - type1, type2, type3: created with a NULL vtable
 *
 * \param test Test handle passed to the validation macro.
 *
 * \return The new object on success.
 * \return NULL if the allocation fails, or via the cleanup label, which
 *         releases the partially built object.
 *
 * NOTE(review): one line just before the final return is absent from this
 * listing; presumably it assigns types->change - confirm against the
 * original file.
 */
2080static struct test_message_types *create_message_types(struct ast_test *test)
2081{
2082 struct stasis_message_vtable vtable = { 0 };
2083 struct test_message_types *types;
2084 enum ast_test_result_state __attribute__ ((unused)) rc;
2085
2086 types = ao2_alloc(sizeof(*types), destroy_message_types);
2087 if (!types) {
2088 return NULL;
2089 }
2090
	/* No formatters at all. */
2091 ast_test_validate_cleanup(test,
2092 stasis_message_type_create("TestMessageNONE", &vtable, &types->none) == STASIS_MESSAGE_TYPE_SUCCESS,
2093 rc, cleanup);
2094
	/* AMI formatter only. */
2095 vtable.to_ami = fake_ami;
2096 ast_test_validate_cleanup(test,
2097 stasis_message_type_create("TestMessageAMI", &vtable, &types->ami) == STASIS_MESSAGE_TYPE_SUCCESS,
2098 rc, cleanup);
2099
	/* JSON formatter only. */
2100 vtable.to_ami = NULL;
2101 vtable.to_json = fake_json;
2102 ast_test_validate_cleanup(test,
2103 stasis_message_type_create("TestMessageJSON", &vtable, &types->json) == STASIS_MESSAGE_TYPE_SUCCESS,
2104 rc, cleanup);
2105
	/* Event formatter only. */
2106 vtable.to_ami = NULL;
2107 vtable.to_json = NULL;
2108 vtable.to_event = fake_event;
2109 ast_test_validate_cleanup(test,
2110 stasis_message_type_create("TestMessageEVENT", &vtable, &types->event) == STASIS_MESSAGE_TYPE_SUCCESS,
2111 rc, cleanup);
2112
	/* to_event is still set from above, so this type has both AMI and event formatters. */
2113 vtable.to_ami = fake_ami;
2114 ast_test_validate_cleanup(test,
2115 stasis_message_type_create("TestMessageAMIEVENT", &vtable, &types->amievent) == STASIS_MESSAGE_TYPE_SUCCESS,
2116 rc, cleanup);
2117
	/* The remaining three types are created without any vtable. */
2118 ast_test_validate_cleanup(test,
2119 stasis_message_type_create("TestMessageType1", NULL, &types->type1) == STASIS_MESSAGE_TYPE_SUCCESS,
2120 rc, cleanup);
2121
2122 ast_test_validate_cleanup(test,
2123 stasis_message_type_create("TestMessageType2", NULL, &types->type2) == STASIS_MESSAGE_TYPE_SUCCESS,
2124 rc, cleanup);
2125
2126 ast_test_validate_cleanup(test,
2127 stasis_message_type_create("TestMessageType3", NULL, &types->type3) == STASIS_MESSAGE_TYPE_SUCCESS,
2128 rc, cleanup);
2129
2131
2132 return types;
2133
2134cleanup:
2135 ao2_cleanup(types);
2136 return NULL;
2137}
2138
/*!
 * \brief Bundle of objects shared by the filtering tests.
 *
 * create_cts() and destroy_cts() access three members through this
 * struct: consumer, topic and sub.
 *
 * NOTE(review): the member declarations themselves are absent from this
 * listing - confirm their types against the original file.
 */
2139struct cts {
2143};
2144
2145static void destroy_cts(void *obj)
2146{
2147 struct cts *c = obj;
2148
2149 stasis_unsubscribe(c->sub);
2150 ao2_cleanup(c->topic);
2151 ao2_cleanup(c->consumer);
2152}
2153
/*!
 * \brief Allocate a cts object and set up its topic, consumer and subscription.
 *
 * Each step is checked with ast_test_validate_cleanup(); a failed check is
 * routed to the cleanup label, which returns NULL.
 *
 * \param test Test handle passed to the validation macro.
 *
 * \return The new cts object on success.
 * \return NULL via the cleanup label.
 *
 * NOTE(review): the lines that assign cts->consumer and cts->sub, and the
 * first line under the cleanup label, are absent from this listing.
 */
2154static struct cts *create_cts(struct ast_test *test)
2155{
2156 struct cts *cts = ao2_alloc(sizeof(*cts), destroy_cts);
2157 enum ast_test_result_state __attribute__ ((unused)) rc;
2158
2159 ast_test_validate_cleanup(test, cts, rc, cleanup);
2160
2161 cts->topic = stasis_topic_create("TestTopic");
2162 ast_test_validate_cleanup(test, NULL != cts->topic, rc, cleanup);
2163
2165 ast_test_validate_cleanup(test, NULL != cts->consumer, rc, cleanup);
2166
	/* Extra consumer reference; presumably owned by the subscription created next - TODO confirm. */
2167 ao2_ref(cts->consumer, +1);
2169 ast_test_validate_cleanup(test, NULL != cts->sub, rc, cleanup);
2170
2171 return cts;
2172
2173cleanup:
2175 return NULL;
2176}
2177
/*!
 * \brief Check a received message against an expected type and description.
 *
 * \param msg Message to examine.
 * \param mtype Message type \a msg is expected to have.
 * \param data Expected description text, or NULL to skip the text check.
 *
 * \retval 0 The message type differs from \a mtype, or \a data is given
 *           and does not equal the message's description.
 * \retval 1 The type matches and either \a data is NULL or the text matches.
 *
 * NOTE(review): the declaration of msg_data is absent from this listing;
 * presumably it is the payload of \a msg - confirm against the original.
 */
2178static int is_msg(struct stasis_message *msg, struct stasis_message_type *mtype, const char *data)
2179{
2181
2182 if (stasis_message_type(msg) != mtype) {
2183 return 0;
2184 }
2185
2186 if (data) {
2187 return (strcmp(data, msg_data->description) == 0);
2188 }
2189
2190 return 1;
2191}
2192
/*!
 * \brief Report what the consumer has received, as test status updates.
 *
 * Emits one line with the number of messages received and whether the
 * consumer is marked complete, then one line per received message; the
 * description is shown when it is non-empty and "no data" otherwise.
 *
 * \param test Test handle the status lines are reported on.
 * \param cts Holder of the consumer to dump.
 *
 * NOTE(review): the lines that assign data and supply the message type
 * name inside the loop are absent from this listing.
 */
2193static void dump_consumer(struct ast_test *test, struct cts *cts)
2194{
2195 int i;
2196 struct stasis_subscription_change *data;
2197
2198 ast_test_status_update(test, "Messages received: %zu Final? %s\n", cts->consumer->messages_rxed_len,
2199 cts->consumer->complete ? "yes" : "no");
2200 for (i = 0; i < cts->consumer->messages_rxed_len; i++) {
2202 ast_test_status_update(test, "Message type received: %s %s\n",
2204 data && !ast_strlen_zero(data->description) ? data->description : "no data");
2205 }
2206}
2207
2208static int send_msg(struct ast_test *test, struct cts *cts, struct stasis_message_type *msg_type,
2209 const char *data)
2210{
2211 struct stasis_message *msg;
2213 ao2_alloc(sizeof(*test_data) + (data ? strlen(data) : strlen("no data")) + 1, NULL);
2214
2215 if (!test_data) {
2216 return 0;
2217 }
2218 strcpy(test_data->description, S_OR(data, "no data")); /* Safe */
2219
2220 msg = stasis_message_create(msg_type, test_data);
2221 ao2_ref(test_data, -1);
2222 if (!msg) {
2223 ast_test_status_update(test, "Unable to create %s message\n",
2224 stasis_message_type_name(msg_type));
2225 return 0;
2226 }
2227
2228 stasis_publish(cts->topic, msg);
2229 ao2_ref(msg, -1);
2230
2231 return 1;
2232}
2233
/*!
 * \brief Test that a subscription only receives the message types it accepted.
 *
 * The subscription accepts type1, type2 and the change type, then type1 is
 * declined part-way through. Messages tagged "FAIL" are sent with types
 * that are not accepted at that moment. The final checks require the
 * consumer to be complete and to hold exactly five messages, in this
 * order: change "Subscribe", type1 "Pass", type2 "Pass", type2 "Pass2",
 * change "Unsubscribe".
 *
 * NOTE(review): several lines are absent from this listing (after the
 * accept calls, after each "wait" comment, and around cts->sub = NULL).
 */
2234AST_TEST_DEFINE(type_filters)
2235{
2236 RAII_VAR(struct cts *, cts, NULL, ao2_cleanup);
2237 RAII_VAR(struct test_message_types *, types, NULL, ao2_cleanup);
2238 int ix = 0;
2239
2240 switch (cmd) {
2241 case TEST_INIT:
2242 info->name = __func__;
2243 info->category = test_category "filtering/";
2244 info->summary = "Test message filtering by type";
2245 info->description = "Test message filtering by type";
2246 return AST_TEST_NOT_RUN;
2247 case TEST_EXECUTE:
2248 break;
2249 }
2250
2251 types = create_message_types(test);
2252 ast_test_validate(test, NULL != types);
2253
2254 cts = create_cts(test);
2255 ast_test_validate(test, NULL != cts);
2256
2257 ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type1) == 0);
2258 ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type2) == 0);
2259 ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->change) == 0);
2261
2262 /* We should get these */
2263 ast_test_validate(test, send_msg(test, cts, types->type1, "Pass"));
2264 ast_test_validate(test, send_msg(test, cts, types->type2, "Pass"));
2265 /* ... but not this one */
2266 ast_test_validate(test, send_msg(test, cts, types->type3, "FAIL"));
2267
2268 /* Wait for change(subscribe) and "Pass" messages */
2270
2271 /* Remove type 1 */
2272 ast_test_validate(test, stasis_subscription_decline_message_type(cts->sub, types->type1) == 0);
2273
2274 /* We should now NOT get this one */
2275 ast_test_validate(test, send_msg(test, cts, types->type1, "FAIL"));
2276 /* We should get this one (again) */
2277 ast_test_validate(test, send_msg(test, cts, types->type2, "Pass2"));
2278 /* We still should NOT get this one */
2279 ast_test_validate(test, send_msg(test, cts, types->type3, "FAIL"));
2280
2281 /* We should now have a second type2 */
2283
2285 cts->sub = NULL;
2287
2289
	/* Verify the exact sequence received; ix walks the messages in arrival order. */
2290 ast_test_validate(test, 1 == cts->consumer->complete);
2291 ast_test_validate(test, 5 == cts->consumer->messages_rxed_len);
2292 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Subscribe"));
2293 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type1, "Pass"));
2294 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type2, "Pass"));
2295 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type2, "Pass2"));
2296 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Unsubscribe"));
2297
2298 return AST_TEST_PASS;
2299}
2300
/*!
 * \brief Test that a subscription only receives messages with accepted formatters.
 *
 * In the first phase the ami, json and amievent messages are expected to
 * arrive while none, event and type1 are not. After the subscription is
 * switched to event formatters only (per the in-line comment), amievent
 * and event are expected while ami and json are not. The final checks
 * require the consumer to be complete and to hold exactly seven messages:
 * change "Subscribe", ami "Pass", json "Pass", amievent "Pass",
 * amievent "Pass2", event "Pass", change "Unsubscribe".
 *
 * NOTE(review): the calls that configure the accepted formatters, the
 * waits, and the unsubscribe are absent from this listing.
 */
2301AST_TEST_DEFINE(formatter_filters)
2302{
2303 RAII_VAR(struct cts *, cts, NULL, ao2_cleanup);
2304 RAII_VAR(struct test_message_types *, types, NULL, ao2_cleanup) ;
2305 int ix = 0;
2306
2307 switch (cmd) {
2308 case TEST_INIT:
2309 info->name = __func__;
2310 info->category = test_category "filtering/";
2311 info->summary = "Test message filtering by formatter";
2312 info->description = "Test message filtering by formatter";
2313 return AST_TEST_NOT_RUN;
2314 case TEST_EXECUTE:
2315 break;
2316 }
2317
2318 types = create_message_types(test);
2319 ast_test_validate(test, NULL != types);
2320
2321 cts = create_cts(test);
2322 ast_test_validate(test, NULL != cts);
2323
2326
2327 /* We should get these */
2328 ast_test_validate(test, send_msg(test, cts, types->ami, "Pass"));
2329 ast_test_validate(test, send_msg(test, cts, types->json, "Pass"));
2330 ast_test_validate(test, send_msg(test, cts, types->amievent, "Pass"));
2331
2332 /* ... but not these */
2333 ast_test_validate(test, send_msg(test, cts, types->none, "FAIL"));
2334 ast_test_validate(test, send_msg(test, cts, types->event, "FAIL"));
2335 ast_test_validate(test, send_msg(test, cts, types->type1, "FAIL"));
2336
2337 /* Wait for change(subscribe) and the "Pass" messages */
2339
2340 /* Change the subscription to accept only event formatters */
2342
2343 /* We should NOT get these now */
2344 ast_test_validate(test, send_msg(test, cts, types->ami, "FAIL"));
2345 ast_test_validate(test, send_msg(test, cts, types->json, "FAIL"));
2346 /* ... but we should still get this one */
2347 ast_test_validate(test, send_msg(test, cts, types->amievent, "Pass2"));
2348 /* ... and this one should be new */
2349 ast_test_validate(test, send_msg(test, cts, types->event, "Pass"));
2350
2351 /* We should now have a second amievent */
2353
2355 cts->sub = NULL;
2357
2359
	/* Verify the exact sequence received; ix walks the messages in arrival order. */
2360 ast_test_validate(test, 1 == cts->consumer->complete);
2361 ast_test_validate(test, 7 == cts->consumer->messages_rxed_len);
2362 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Subscribe"));
2363 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->ami, "Pass"));
2364 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->json, "Pass"));
2365 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->amievent, "Pass"));
2366 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->amievent, "Pass2"));
2367 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->event, "Pass"));
2368 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Unsubscribe"));
2369
2370 return AST_TEST_PASS;
2371}
2372
/*!
 * \brief Test filtering by message type and by formatter at the same time.
 *
 * The subscription accepts type1, type2 and the change type. Messages of
 * type1, type2, ami, amievent and json are expected to arrive; type3 and
 * event are not. The final checks require the consumer to be complete and
 * to hold exactly seven messages: change "Subscribe", type1 "Pass",
 * type2 "Pass", ami "Pass", amievent "Pass", json "Pass",
 * change "Unsubscribe".
 *
 * NOTE(review): the lines following the accept calls (presumably the
 * formatter/filter configuration), the wait, and the unsubscribe are
 * absent from this listing.
 */
2373AST_TEST_DEFINE(combo_filters)
2374{
2375 RAII_VAR(struct cts *, cts, NULL, ao2_cleanup);
2376 RAII_VAR(struct test_message_types *, types, NULL, ao2_cleanup);
2377 int ix = 0;
2378
2379 switch (cmd) {
2380 case TEST_INIT:
2381 info->name = __func__;
2382 info->category = test_category "filtering/";
2383 info->summary = "Test message filtering by type and formatter";
2384 info->description = "Test message filtering by type and formatter";
2385 return AST_TEST_NOT_RUN;
2386 case TEST_EXECUTE:
2387 break;
2388 }
2389
2390 types = create_message_types(test);
2391 ast_test_validate(test, NULL != types);
2392
2393 cts = create_cts(test);
2394 ast_test_validate(test, NULL != cts);
2395
2396 ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type1) == 0);
2397 ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type2) == 0);
2398 ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->change) == 0);
2402
2403 /* We should get these */
2404 ast_test_validate(test, send_msg(test, cts, types->type1, "Pass"));
2405 ast_test_validate(test, send_msg(test, cts, types->type2, "Pass"));
2406 ast_test_validate(test, send_msg(test, cts, types->ami, "Pass"));
2407 ast_test_validate(test, send_msg(test, cts, types->amievent, "Pass"));
2408 ast_test_validate(test, send_msg(test, cts, types->json, "Pass"));
2409
2410 /* ... but not these */
2411 ast_test_validate(test, send_msg(test, cts, types->type3, "FAIL"));
2412 ast_test_validate(test, send_msg(test, cts, types->event, "FAIL"));
2413
2414 /* Wait for change(subscribe) and the "Pass" messages */
2416
2418 cts->sub = NULL;
2420
2422
	/* Verify the exact sequence received; ix walks the messages in arrival order. */
2423 ast_test_validate(test, 1 == cts->consumer->complete);
2424 ast_test_validate(test, 7 == cts->consumer->messages_rxed_len);
2425 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Subscribe"));
2426 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type1, "Pass"));
2427 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type2, "Pass"));
2428 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->ami, "Pass"));
2429 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->amievent, "Pass"));
2430 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->json, "Pass"));
2431 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Unsubscribe"));
2432
2433 return AST_TEST_PASS;
2434}
2435
/*!
 * \brief Module unload hook: unregister the tests defined in this file.
 *
 * \return Always 0.
 *
 * NOTE(review): a few AST_TEST_UNREGISTER lines are absent from this
 * listing, so the visible list is not the complete set of tests.
 */
2436static int unload_module(void)
2437{
2438 AST_TEST_UNREGISTER(message_type);
2440 AST_TEST_UNREGISTER(subscription_messages);
2441 AST_TEST_UNREGISTER(subscription_pool_messages);
2443 AST_TEST_UNREGISTER(publish_sync);
2444 AST_TEST_UNREGISTER(publish_pool);
2445 AST_TEST_UNREGISTER(unsubscribe_stops_messages);
2446 AST_TEST_UNREGISTER(forward);
2447 AST_TEST_UNREGISTER(cache_filter);
2449 AST_TEST_UNREGISTER(cache_dump);
2450 AST_TEST_UNREGISTER(cache_eid_aggregate);
2452 AST_TEST_UNREGISTER(router_pool);
2453 AST_TEST_UNREGISTER(router_cache_updates);
2454 AST_TEST_UNREGISTER(interleaving);
2455 AST_TEST_UNREGISTER(subscription_interleaving);
2456 AST_TEST_UNREGISTER(no_to_json);
2457 AST_TEST_UNREGISTER(to_json);
2458 AST_TEST_UNREGISTER(no_to_ami);
2460 AST_TEST_UNREGISTER(dtor_order);
2461 AST_TEST_UNREGISTER(caching_dtor_order);
2462 AST_TEST_UNREGISTER(type_filters);
2463 AST_TEST_UNREGISTER(formatter_filters);
2464 AST_TEST_UNREGISTER(combo_filters);
2465 return 0;
2466}
2467
/*!
 * \brief Module load hook: register the tests defined in this file.
 *
 * The registrations appear in the same order as the matching
 * AST_TEST_UNREGISTER calls in unload_module().
 *
 * NOTE(review): a few AST_TEST_REGISTER lines and the return statement
 * are absent from this listing - confirm the return value against the
 * original file.
 */
2468static int load_module(void)
2469{
2470 AST_TEST_REGISTER(message_type);
2472 AST_TEST_REGISTER(subscription_messages);
2473 AST_TEST_REGISTER(subscription_pool_messages);
2475 AST_TEST_REGISTER(publish_sync);
2476 AST_TEST_REGISTER(publish_pool);
2477 AST_TEST_REGISTER(unsubscribe_stops_messages);
2478 AST_TEST_REGISTER(forward);
2479 AST_TEST_REGISTER(cache_filter);
2481 AST_TEST_REGISTER(cache_dump);
2482 AST_TEST_REGISTER(cache_eid_aggregate);
2484 AST_TEST_REGISTER(router_pool);
2485 AST_TEST_REGISTER(router_cache_updates);
2486 AST_TEST_REGISTER(interleaving);
2487 AST_TEST_REGISTER(subscription_interleaving);
2488 AST_TEST_REGISTER(no_to_json);
2489 AST_TEST_REGISTER(to_json);
2490 AST_TEST_REGISTER(no_to_ami);
2492 AST_TEST_REGISTER(dtor_order);
2493 AST_TEST_REGISTER(caching_dtor_order);
2494 AST_TEST_REGISTER(type_filters);
2495 AST_TEST_REGISTER(formatter_filters);
2496 AST_TEST_REGISTER(combo_filters);
2498}
2499
2500AST_MODULE_INFO(ASTERISK_GPL_KEY, 0, "Stasis testing",
2501 .support_level = AST_MODULE_SUPPORT_CORE,
2502 .load = load_module,
2503 .unload = unload_module
char * text
Definition: app_queue.c:1639
ast_mutex_t lock
Definition: app_sla.c:331
Asterisk main include file. File version handling, generic pbx functions.
#define ast_free(a)
Definition: astmm.h:180
#define ast_realloc(p, len)
A wrapper for realloc()
Definition: astmm.h:226
#define ast_strdup(str)
A wrapper for strdup()
Definition: astmm.h:241
#define ast_malloc(len)
A wrapper for malloc()
Definition: astmm.h:191
#define ast_log
Definition: astobj2.c:42
#define ao2_iterator_next(iter)
Definition: astobj2.h:1911
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
Definition: astobj2.c:476
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition: astobj2.h:480
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:409
static const char type[]
Definition: chan_ooh323.c:109
static void update(int code_size, int y, int wi, int fi, int dq, int sr, int dqsez, struct g726_state *state_ptr)
Definition: codec_g726.c:367
char * end
Definition: eagi_proxy.c:73
struct ast_event * ast_event_new(enum ast_event_type event_type,...)
Create a new event.
Definition: event.c:402
@ AST_EVENT_IE_END
Definition: event_defs.h:70
@ AST_EVENT_IE_DESCRIPTION
Description Used by: AST_EVENT_SUB, AST_EVENT_UNSUB Payload type: STR.
Definition: event_defs.h:265
@ AST_EVENT_CUSTOM
Definition: event_defs.h:36
@ AST_EVENT_IE_PLTYPE_STR
Definition: event_defs.h:328
static const char name[]
Definition: format_mp3.c:68
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
struct stasis_message_type * stasis_cache_update_type(void)
Message type for cache update messages.
const char * mtype
Definition: http.c:151
#define LOG_ERROR
struct ast_json * ast_json_string_create(const char *value)
Construct a JSON string from value.
Definition: json.c:278
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition: json.c:73
int ast_json_equal(const struct ast_json *lhs, const struct ast_json *rhs)
Compare two JSON objects.
Definition: json.c:357
#define ast_cond_destroy(cond)
Definition: lock.h:202
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:604
#define ast_cond_init(cond, attr)
Definition: lock.h:201
#define ast_cond_timedwait(cond, mutex, time)
Definition: lock.h:206
pthread_cond_t ast_cond_t
Definition: lock.h:178
#define ast_cond_signal(cond)
Definition: lock.h:203
#define EVENT_FLAG_TEST
Definition: manager.h:92
struct ast_manager_event_blob * ast_manager_event_blob_create(int event_flags, const char *manager_event, const char *extra_fields_fmt,...)
Construct a ast_manager_event_blob.
Definition: manager.c:10548
Asterisk module definitions.
#define AST_MODULE_INFO(keystr, flags_to_set, desc, fields...)
Definition: module.h:543
@ AST_MODULE_SUPPORT_CORE
Definition: module.h:121
#define ASTERISK_GPL_KEY
The text the key() function should return.
Definition: module.h:46
@ AST_MODULE_LOAD_SUCCESS
Definition: module.h:70
def info(msg)
static void * cleanup(void *unused)
Definition: pbx_realtime.c:124
struct ao2_container * cache
Definition: pbx_realtime.c:77
static struct stasis_message_router * router
struct stasis_forward * sub
Definition: res_corosync.c:240
unsigned char publish
Definition: res_corosync.c:241
static void to_ami(struct ast_sip_subscription *sub, struct ast_str **buf)
#define NULL
Definition: resample.c:96
Stasis Message Bus API. See Stasis Message Bus API for detailed documentation.
struct ast_manager_event_blob * stasis_message_to_ami(struct stasis_message *msg)
Build the AMI representation of the message.
struct stasis_message * stasis_cache_clear_create(struct stasis_message *message)
A message which instructs the caching topic to remove an entry from its cache.
Definition: stasis_cache.c:778
struct ao2_container * stasis_cache_get_all(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
Retrieve all matching entity items from the cache.
Definition: stasis_cache.c:587
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
struct ao2_container * stasis_cache_dump(struct stasis_cache *cache, struct stasis_message_type *type)
Dump cached items to a subscription for the ast_eid_default entity.
Definition: stasis_cache.c:736
int stasis_subscription_decline_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are not interested in a message type.
Definition: stasis.c:1053
struct ast_json * stasis_message_to_json(struct stasis_message *msg, struct stasis_message_sanitizer *sanitize)
Build the JSON representation of the message.
struct stasis_topic * stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
Returns the topic of cached events from a caching topics.
Definition: stasis_cache.c:85
struct stasis_caching_topic * stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
Unsubscribes a caching topic from its upstream topic.
Definition: stasis_cache.c:119
@ STASIS_SUBSCRIPTION_FILTER_SELECTIVE
Definition: stasis.h:297
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
Definition: stasis.c:1548
struct ao2_container * stasis_cache_dump_all(struct stasis_cache *cache, struct stasis_message_type *type)
Dump all entity items from the cache to a subscription.
Definition: stasis_cache.c:757
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition: stasis.c:617
void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
Publish a message to a topic's subscribers, synchronizing on the specified subscriber.
Definition: stasis.c:1516
struct stasis_caching_topic * stasis_caching_topic_create(struct stasis_topic *original_topic, struct stasis_cache *cache)
Create a topic which monitors and caches messages from another topic.
Definition: stasis_cache.c:948
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
Definition: stasis.c:1023
struct ao2_container * stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
Dump cached items to a subscription for a specific entity.
Definition: stasis_cache.c:718
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
Definition: stasis.c:1077
const struct ast_eid * stasis_message_eid(const struct stasis_message *msg)
Get the entity id for a stasis_message.
#define stasis_subscribe_pool(topic, callback, data)
Definition: stasis.h:680
@ STASIS_SUBSCRIPTION_FORMATTER_EVENT
Definition: stasis.h:312
@ STASIS_SUBSCRIPTION_FORMATTER_AMI
Definition: stasis.h:311
@ STASIS_SUBSCRIPTION_FORMATTER_JSON
Definition: stasis.h:310
struct stasis_message * stasis_cache_entry_get_aggregate(struct stasis_cache_entry *entry)
Get the aggregate cache entry snapshot.
Definition: stasis_cache.c:365
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
enum stasis_message_type_result stasis_message_type_create(const char *name, struct stasis_message_vtable *vtable, struct stasis_message_type **result)
Create a new message type.
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
Definition: stasis.c:1174
struct stasis_cache * stasis_cache_create_full(snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_publish_fn aggregate_publish_fn)
Create a cache.
Definition: stasis_cache.c:334
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.
struct stasis_caching_topic * stasis_caching_unsubscribe_and_join(struct stasis_caching_topic *caching_topic)
Unsubscribes a caching topic from its upstream topic, blocking until all messages have been forwarded...
Definition: stasis_cache.c:146
struct stasis_cache * stasis_cache_create(snapshot_get_id id_fn)
Create a cache.
Definition: stasis_cache.c:360
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *subscription)
Cancel a subscription.
Definition: stasis.c:971
struct stasis_message * stasis_message_create_full(struct stasis_message_type *type, void *data, const struct ast_eid *eid)
Create a new message for an entity.
void stasis_subscription_accept_formatters(struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters)
Indicate to a subscription that we are interested in messages with one or more formatters.
Definition: stasis.c:1093
const char * stasis_subscription_uniqueid(const struct stasis_subscription *sub)
Get the unique ID for the subscription.
Definition: stasis.c:1169
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition: stasis.c:1578
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
Definition: stasis.c:1511
struct stasis_message * stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid)
Retrieve an item from the cache for a specific entity.
Definition: stasis_cache.c:659
struct stasis_message * stasis_cache_get(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
Retrieve an item from the cache for the ast_eid_default entity.
Definition: stasis_cache.c:686
struct stasis_message * stasis_cache_entry_get_local(struct stasis_cache_entry *entry)
Get the local entity's cache entry snapshot.
Definition: stasis_cache.c:370
#define stasis_subscribe(topic, callback, data)
Definition: stasis.h:649
const struct timeval * stasis_message_timestamp(const struct stasis_message *msg)
Get the time when a message was created.
@ STASIS_MESSAGE_TYPE_ERROR
Definition: stasis.h:286
@ STASIS_MESSAGE_TYPE_SUCCESS
Definition: stasis.h:287
struct stasis_message * stasis_cache_entry_get_remote(struct stasis_cache_entry *entry, int idx)
Get a remote entity's cache entry snapshot by index.
Definition: stasis_cache.c:375
#define stasis_message_router_create(topic)
Create a new message router object.
int stasis_message_router_add(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
Add a route to a message router.
void stasis_message_router_unsubscribe_and_join(struct stasis_message_router *router)
Unsubscribe the router from the upstream topic, blocking until the final message has been processed.
int stasis_message_router_add_cache_update(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
Add a route for stasis_cache_update messages to a message router.
#define stasis_message_router_create_pool(topic)
Create a new message router object.
int stasis_message_router_set_default(struct stasis_message_router *router, stasis_subscription_cb callback, void *data)
Sets the default route of a router.
#define S_OR(a, b)
returns the equivalent of logic or for strings: first one if not empty, otherwise second one.
Definition: strings.h:80
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition: strings.h:65
static int force_inline attribute_pure ast_begins_with(const char *str, const char *prefix)
Checks whether a string begins with another.
Definition: strings.h:97
Generic container type.
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
Definition: astobj2.h:1821
An Entity ID is essentially a MAC address, brief and unique.
Definition: utils.h:813
An event.
Definition: event.c:81
Abstract JSON element (object, array, string, int, ...).
Struct containing info for an AMI event to send out.
Definition: manager.h:502
size_t messages_rxed_len
Definition: test_stasis.c:165
ast_cond_t out
int complete
Definition: test_stasis.c:167
struct stasis_message ** messages_rxed
Definition: test_stasis.c:164
int ignore_subscriptions
Definition: test_stasis.c:166
struct stasis_subscription * sub
Definition: test_stasis.c:2142
struct stasis_topic * topic
Definition: test_stasis.c:2141
struct consumer * consumer
Definition: test_stasis.c:2140
Definition: search.h:40
Definition: stasis_cache.c:173
Cache update message.
Definition: stasis.h:965
struct stasis_message * old_snapshot
Old value from the cache.
Definition: stasis.h:969
struct stasis_message * new_snapshot
New value.
Definition: stasis.h:971
Forwarding information.
Definition: stasis.c:1531
Structure containing callbacks for Stasis message sanitization.
Definition: stasis.h:200
Virtual table providing methods for messages.
Definition: stasis.h:239
struct ast_manager_event_blob *(* to_ami)(struct stasis_message *message)
Build the AMI representation of the message.
Definition: stasis.h:264
struct ast_json *(* to_json)(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
Build the JSON representation of the message.
Definition: stasis.h:252
struct ast_event *(* to_event)(struct stasis_message *message)
Build the ast_event representation of the message.
Definition: stasis.h:278
struct ast_eid eid
Holds details about changes to subscriptions for the specified topic.
Definition: stasis.h:890
struct stasis_topic * topic
Definition: stasis.h:891
Sorcery object created based on backend data.
struct stasis_message_type * change
Definition: test_stasis.c:2062
struct stasis_message_type * event
Definition: test_stasis.c:2057
struct stasis_message_type * type3
Definition: test_stasis.c:2061
struct stasis_message_type * ami
Definition: test_stasis.c:2055
struct stasis_message_type * json
Definition: test_stasis.c:2056
struct stasis_message_type * amievent
Definition: test_stasis.c:2058
struct stasis_message_type * type2
Definition: test_stasis.c:2060
struct stasis_message_type * none
Definition: test_stasis.c:2054
struct stasis_message_type * type1
Definition: test_stasis.c:2059
int value
Definition: syslog.c:37
Test Framework API.
@ TEST_INIT
Definition: test.h:200
@ TEST_EXECUTE
Definition: test.h:201
#define AST_TEST_REGISTER(cb)
Definition: test.h:127
#define ast_test_status_update(a, b, c...)
Definition: test.h:129
#define AST_TEST_UNREGISTER(cb)
Definition: test.h:128
ast_test_result_state
Definition: test.h:193
@ AST_TEST_PASS
Definition: test.h:195
@ AST_TEST_NOT_RUN
Definition: test.h:194
static struct test_val c
static void cache_test_data_dtor(void *obj)
Definition: test_stasis.c:867
AST_TEST_DEFINE(message_type)
Definition: test_stasis.c:77
static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Definition: test_stasis.c:204
static struct test_message_types * create_message_types(struct ast_test *test)
Definition: test_stasis.c:2080
static struct stasis_message * cache_test_aggregate_calc_fn(struct stasis_cache_entry *entry, struct stasis_message *new_snapshot)
Definition: test_stasis.c:911
static int consumer_wait_for_completion(struct consumer *consumer)
Definition: test_stasis.c:267
static struct stasis_message * cache_test_message_create_full(struct stasis_message_type *type, const char *name, const char *value, struct ast_eid *eid)
Definition: test_stasis.c:875
static int send_msg(struct ast_test *test, struct cts *cts, struct stasis_message_type *msg_type, const char *data)
Definition: test_stasis.c:2208
static void consumer_exec_sync(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Definition: test_stasis.c:226
static void cache_test_aggregate_publish_fn(struct stasis_topic *topic, struct stasis_message *aggregate)
Definition: test_stasis.c:965
static void consumer_dtor(void *obj)
Definition: test_stasis.c:170
static struct ast_event * fake_event(struct stasis_message *message)
Definition: test_stasis.c:43
static const char * cache_test_data_id(struct stasis_message *message)
Definition: test_stasis.c:901
static struct ast_manager_event_blob * fake_ami(struct stasis_message *message)
Definition: test_stasis.c:56
static void destroy_message_types(void *obj)
Definition: test_stasis.c:2065
static struct ast_json * fake_json(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
Definition: test_stasis.c:49
#define test_category
Definition: test_stasis.c:41
static void destroy_cts(void *obj)
Definition: test_stasis.c:2145
static struct stasis_message * cache_test_message_create(struct stasis_message_type *type, const char *name, const char *value)
Definition: test_stasis.c:896
static struct consumer * consumer_create(int ignore_subscriptions)
Definition: test_stasis.c:183
static int is_msg(struct stasis_message *msg, struct stasis_message_type *mtype, const char *data)
Definition: test_stasis.c:2178
static const char * cache_simple(struct stasis_message *message)
Definition: test_stasis.c:1699
static int consumer_should_stay(struct consumer *consumer, size_t expected_len)
Definition: test_stasis.c:288
static struct cts * create_cts(struct ast_test *test)
Definition: test_stasis.c:2154
static int load_module(void)
Definition: test_stasis.c:2468
static int unload_module(void)
Definition: test_stasis.c:2436
static int consumer_wait_for(struct consumer *consumer, size_t expected_len)
Definition: test_stasis.c:246
static const char * noop_get_id(struct stasis_message *message)
Definition: test_stasis.c:2001
static int check_cache_aggregate(struct stasis_cache *cache, struct stasis_message_type *cache_type, const char *id, const char *value)
Definition: test_stasis.c:970
static void dump_consumer(struct ast_test *test, struct cts *cts)
Definition: test_stasis.c:2193
static void noop(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Definition: test_stasis.c:1962
static struct stasis_message_vtable fake_vtable
Definition: test_stasis.c:72
struct timeval ast_tvadd(struct timeval a, struct timeval b)
Returns the sum of two timevals a + b.
Definition: extconf.c:2282
struct timeval ast_tvsub(struct timeval a, struct timeval b)
Returns the difference of two timevals a - b.
Definition: extconf.c:2297
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:159
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:941
#define ast_assert(a)
Definition: utils.h:739
int ast_eid_cmp(const struct ast_eid *eid1, const struct ast_eid *eid2)
Compare two EIDs.
Definition: utils.c:3094
struct ast_eid ast_eid_default
Global EID.
Definition: options.c:93