Asterisk - The Open Source Telephony Project  GIT-master-09303e8
test_stasis.c
Go to the documentation of this file.
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 
19 /*!
20  * \file
21  * \brief Test Stasis message bus.
22  *
23  * \author\verbatim David M. Lee, II <dlee@digium.com> \endverbatim
24  *
25  * \ingroup tests
26  */
27 
28 /*** MODULEINFO
29  <depend>TEST_FRAMEWORK</depend>
30  <support_level>core</support_level>
31  ***/
32 
33 #include "asterisk.h"
34 
35 #include "asterisk/astobj2.h"
36 #include "asterisk/module.h"
37 #include "asterisk/stasis.h"
39 #include "asterisk/test.h"
40 
41 #define test_category "/stasis/core/"
42 
43 static struct ast_event *fake_event(struct stasis_message *message)
44 {
47 }
48 
/*!
 * \brief Test \c to_json hook: render a message's payload as a JSON string.
 *
 * The message data is treated as a NUL-terminated C string and handed
 * straight to ast_json_string_create().
 *
 * \param message Message whose payload is converted.
 * \param sanitize Not used by this test implementation.
 *
 * \return Whatever ast_json_string_create() returns for the payload text.
 */
static struct ast_json *fake_json(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
{
	const char *payload_text;

	payload_text = stasis_message_data(message);
	return ast_json_string_create(payload_text);
}
55 
57 {
59  const char *text = stasis_message_data(message);
60 
62  "Message: %s\r\n", text);
63 
64  if (res == NULL) {
65  return NULL;
66  }
67 
68  ao2_ref(res, +1);
69  return res;
70 }
71 
73  .to_json = fake_json,
74  .to_ami = fake_ami
75 };
76 
77 AST_TEST_DEFINE(message_type)
78 {
80 
81  switch (cmd) {
82  case TEST_INIT:
83  info->name = __func__;
84  info->category = test_category;
85  info->summary = "Test basic message_type functions";
86  info->description = "Test basic message_type functions";
87  return AST_TEST_NOT_RUN;
88  case TEST_EXECUTE:
89  break;
90  }
91 
93  ast_test_validate(test, stasis_message_type_create("SomeMessage", NULL, &uut) == STASIS_MESSAGE_TYPE_SUCCESS);
94  ast_test_validate(test, 0 == strcmp(stasis_message_type_name(uut), "SomeMessage"));
95 
96  return AST_TEST_PASS;
97 }
98 
100 {
102  RAII_VAR(struct stasis_message *, uut1, NULL, ao2_cleanup);
103  RAII_VAR(struct stasis_message *, uut2, NULL, ao2_cleanup);
104  RAII_VAR(char *, data, NULL, ao2_cleanup);
105  char *expected = "SomeData";
106  struct timeval expected_timestamp;
107  struct timeval time_diff;
108  struct ast_eid foreign_eid;
109 
110  switch (cmd) {
111  case TEST_INIT:
112  info->name = __func__;
113  info->category = test_category;
114  info->summary = "Test basic message functions";
115  info->description = "Test basic message functions";
116  return AST_TEST_NOT_RUN;
117  case TEST_EXECUTE:
118  break;
119  }
120 
121 
122  memset(&foreign_eid, 0xFF, sizeof(foreign_eid));
123 
124  ast_test_validate(test, stasis_message_type_create("SomeMessage", NULL, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
125 
126  ast_test_validate(test, NULL == stasis_message_create_full(NULL, NULL, NULL));
127  ast_test_validate(test, NULL == stasis_message_create_full(type, NULL, NULL));
128 
129  data = ao2_alloc(strlen(expected) + 1, NULL);
130  strcpy(data, expected);/* Safe */
131  expected_timestamp = ast_tvnow();
132  uut1 = stasis_message_create_full(type, data, &foreign_eid);
133  uut2 = stasis_message_create_full(type, data, NULL);
134 
135  ast_test_validate(test, NULL != uut1);
136  ast_test_validate(test, NULL != uut2);
137  ast_test_validate(test, type == stasis_message_type(uut1));
138  ast_test_validate(test, type == stasis_message_type(uut2));
139  ast_test_validate(test, 0 == strcmp(expected, stasis_message_data(uut1)));
140  ast_test_validate(test, 0 == strcmp(expected, stasis_message_data(uut2)));
141  ast_test_validate(test, NULL != stasis_message_eid(uut1));
142  ast_test_validate(test, NULL == stasis_message_eid(uut2));
143  ast_test_validate(test, !ast_eid_cmp(&foreign_eid, stasis_message_eid(uut1)));
144 
145  ast_test_validate(test, 3 == ao2_ref(data, 0)); /* uut1 and uut2 have ref to data */
146 
147  time_diff = ast_tvsub(*stasis_message_timestamp(uut1), expected_timestamp);
148  /* 10ms is certainly long enough for the two calls to complete */
149  ast_test_validate(test, time_diff.tv_sec == 0);
150  ast_test_validate(test, time_diff.tv_usec < 10000);
151 
152  ao2_ref(uut1, -1);
153  uut1 = NULL;
154  ast_test_validate(test, 2 == ao2_ref(data, 0)); /* uut1 unreffed data */
155  ao2_ref(uut2, -1);
156  uut2 = NULL;
157  ast_test_validate(test, 1 == ao2_ref(data, 0)); /* uut2 unreffed data */
158 
159  return AST_TEST_PASS;
160 }
161 
162 struct consumer {
163  ast_cond_t out;
167  int complete;
168 };
169 
170 static void consumer_dtor(void *obj)
171 {
172  struct consumer *consumer = obj;
173 
174  ast_cond_destroy(&consumer->out);
175 
176  while (consumer->messages_rxed_len > 0) {
177  ao2_cleanup(consumer->messages_rxed[--consumer->messages_rxed_len]);
178  }
179  ast_free(consumer->messages_rxed);
180  consumer->messages_rxed = NULL;
181 }
182 
184 {
185  struct consumer *consumer;
186 
187  consumer = ao2_alloc(sizeof(*consumer), consumer_dtor);
188  if (!consumer) {
189  return NULL;
190  }
191 
193  consumer->messages_rxed = ast_malloc(sizeof(*consumer->messages_rxed));
194  if (!consumer->messages_rxed) {
195  ao2_cleanup(consumer);
196  return NULL;
197  }
198 
199  ast_cond_init(&consumer->out, NULL);
200 
201  return consumer;
202 }
203 
204 static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_message *message)
205 {
206  struct consumer *consumer = data;
207  RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
208  SCOPED_AO2LOCK(lock, consumer);
209 
211  ++consumer->messages_rxed_len;
212  consumer->messages_rxed = ast_realloc(consumer->messages_rxed, sizeof(*consumer->messages_rxed) * consumer->messages_rxed_len);
213  ast_assert(consumer->messages_rxed != NULL);
214  consumer->messages_rxed[consumer->messages_rxed_len - 1] = message;
215  ao2_ref(message, +1);
216  }
217 
218  if (stasis_subscription_final_message(sub, message)) {
219  consumer->complete = 1;
220  consumer_needs_cleanup = consumer;
221  }
222 
223  ast_cond_signal(&consumer->out);
224 }
225 
226 static void consumer_exec_sync(void *data, struct stasis_subscription *sub, struct stasis_message *message)
227 {
228  struct consumer *consumer = data;
229  RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
230  SCOPED_AO2LOCK(lock, consumer);
231 
233  ++consumer->messages_rxed_len;
234  consumer->messages_rxed = ast_realloc(consumer->messages_rxed, sizeof(*consumer->messages_rxed) * consumer->messages_rxed_len);
235  ast_assert(consumer->messages_rxed != NULL);
236  consumer->messages_rxed[consumer->messages_rxed_len - 1] = message;
237  ao2_ref(message, +1);
238  }
239 
240  if (stasis_subscription_final_message(sub, message)) {
241  consumer->complete = 1;
242  consumer_needs_cleanup = consumer;
243  }
244 }
245 
246 static int consumer_wait_for(struct consumer *consumer, size_t expected_len)
247 {
248  struct timeval start = ast_tvnow();
249  struct timespec end = {
250  .tv_sec = start.tv_sec + 30,
251  .tv_nsec = start.tv_usec * 1000
252  };
253 
254  SCOPED_AO2LOCK(lock, consumer);
255 
256  while (consumer->messages_rxed_len < expected_len) {
257  int r = ast_cond_timedwait(&consumer->out, ao2_object_get_lockaddr(consumer), &end);
258 
259  if (r == ETIMEDOUT) {
260  break;
261  }
262  ast_assert(r == 0); /* Not expecting any othet types of errors */
263  }
264  return consumer->messages_rxed_len;
265 }
266 
268 {
269  struct timeval start = ast_tvnow();
270  struct timespec end = {
271  .tv_sec = start.tv_sec + 3,
272  .tv_nsec = start.tv_usec * 1000
273  };
274 
275  SCOPED_AO2LOCK(lock, consumer);
276 
277  while (!consumer->complete) {
278  int r = ast_cond_timedwait(&consumer->out, ao2_object_get_lockaddr(consumer), &end);
279 
280  if (r == ETIMEDOUT) {
281  break;
282  }
283  ast_assert(r == 0); /* Not expecting any other types of errors */
284  }
285  return consumer->complete;
286 }
287 
288 static int consumer_should_stay(struct consumer *consumer, size_t expected_len)
289 {
290  struct timeval start = ast_tvnow();
291  struct timeval diff = {
292  .tv_sec = 0,
293  .tv_usec = 100000 /* wait for 100ms */
294  };
295  struct timeval end_tv = ast_tvadd(start, diff);
296  struct timespec end = {
297  .tv_sec = end_tv.tv_sec,
298  .tv_nsec = end_tv.tv_usec * 1000
299  };
300 
301  SCOPED_AO2LOCK(lock, consumer);
302 
303  while (consumer->messages_rxed_len == expected_len) {
304  int r = ast_cond_timedwait(&consumer->out, ao2_object_get_lockaddr(consumer), &end);
305 
306  if (r == ETIMEDOUT) {
307  break;
308  }
309  ast_assert(r == 0); /* Not expecting any othet types of errors */
310  }
311  return consumer->messages_rxed_len;
312 }
313 
314 AST_TEST_DEFINE(subscription_messages)
315 {
316  RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
318  RAII_VAR(char *, test_data, NULL, ao2_cleanup);
319  RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
320  RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
322  RAII_VAR(char *, expected_uniqueid, NULL, ast_free);
323  int complete;
324  struct stasis_subscription_change *change;
325 
326  switch (cmd) {
327  case TEST_INIT:
328  info->name = __func__;
329  info->category = test_category;
330  info->summary = "Test subscribe/unsubscribe messages";
331  info->description = "Test subscribe/unsubscribe messages";
332  return AST_TEST_NOT_RUN;
333  case TEST_EXECUTE:
334  break;
335  }
336 
337  topic = stasis_topic_create("TestTopic");
338  ast_test_validate(test, NULL != topic);
339 
341  ast_test_validate(test, NULL != consumer);
342 
344  ast_test_validate(test, NULL != uut);
345  ao2_ref(consumer, +1);
346  expected_uniqueid = ast_strdup(stasis_subscription_uniqueid(uut));
347 
348  uut = stasis_unsubscribe(uut);
350  ast_test_validate(test, 1 == complete);
351 
352  ast_test_validate(test, 2 == consumer->messages_rxed_len);
355 
357  ast_test_validate(test, topic == change->topic);
358  ast_test_validate(test, 0 == strcmp("Subscribe", change->description));
359  ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
360 
362  ast_test_validate(test, topic == change->topic);
363  ast_test_validate(test, 0 == strcmp("Unsubscribe", change->description));
364  ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
365 
366  return AST_TEST_PASS;
367 }
368 
369 AST_TEST_DEFINE(subscription_pool_messages)
370 {
373  RAII_VAR(char *, test_data, NULL, ao2_cleanup);
374  RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
375  RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
377  RAII_VAR(char *, expected_uniqueid, NULL, ast_free);
378  int complete;
379  struct stasis_subscription_change *change;
380 
381  switch (cmd) {
382  case TEST_INIT:
383  info->name = __func__;
384  info->category = test_category;
385  info->summary = "Test subscribe/unsubscribe messages using a threadpool subscription";
386  info->description = "Test subscribe/unsubscribe messages using a threadpool subscription";
387  return AST_TEST_NOT_RUN;
388  case TEST_EXECUTE:
389  break;
390  }
391 
392  topic = stasis_topic_create("TestTopic");
393  ast_test_validate(test, NULL != topic);
394 
396  ast_test_validate(test, NULL != consumer);
397 
399  ast_test_validate(test, NULL != uut);
400  ao2_ref(consumer, +1);
401  expected_uniqueid = ast_strdup(stasis_subscription_uniqueid(uut));
402 
403  uut = stasis_unsubscribe(uut);
405  ast_test_validate(test, 1 == complete);
406 
407  ast_test_validate(test, 2 == consumer->messages_rxed_len);
410 
412  ast_test_validate(test, topic == change->topic);
413  ast_test_validate(test, 0 == strcmp("Subscribe", change->description));
414  ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
415 
417  ast_test_validate(test, topic == change->topic);
418  ast_test_validate(test, 0 == strcmp("Unsubscribe", change->description));
419  ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
420 
421  return AST_TEST_PASS;
422 }
423 
425 {
428  RAII_VAR(char *, test_data, NULL, ao2_cleanup);
429  RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
430  RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
432  int actual_len;
433  const char *actual;
434 
435  switch (cmd) {
436  case TEST_INIT:
437  info->name = __func__;
438  info->category = test_category;
439  info->summary = "Test publishing";
440  info->description = "Test publishing";
441  return AST_TEST_NOT_RUN;
442  case TEST_EXECUTE:
443  break;
444  }
445 
446  topic = stasis_topic_create("TestTopic");
447  ast_test_validate(test, NULL != topic);
448 
450  ast_test_validate(test, NULL != consumer);
451 
453  ast_test_validate(test, NULL != uut);
454  ao2_ref(consumer, +1);
455 
456  test_data = ao2_alloc(1, NULL);
457  ast_test_validate(test, NULL != test_data);
458  ast_test_validate(test, stasis_message_type_create("TestMessage", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
459  test_message = stasis_message_create(test_message_type, test_data);
460 
461  stasis_publish(topic, test_message);
462 
463  actual_len = consumer_wait_for(consumer, 1);
464  ast_test_validate(test, 1 == actual_len);
466  ast_test_validate(test, test_data == actual);
467 
468  return AST_TEST_PASS;
469 }
470 
471 AST_TEST_DEFINE(publish_sync)
472 {
475  RAII_VAR(char *, test_data, NULL, ao2_cleanup);
476  RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
477  RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
479  int actual_len;
480  const char *actual;
481 
482  switch (cmd) {
483  case TEST_INIT:
484  info->name = __func__;
485  info->category = test_category;
486  info->summary = "Test synchronous publishing";
487  info->description = "Test synchronous publishing";
488  return AST_TEST_NOT_RUN;
489  case TEST_EXECUTE:
490  break;
491  }
492 
493  topic = stasis_topic_create("TestTopic");
494  ast_test_validate(test, NULL != topic);
495 
497  ast_test_validate(test, NULL != consumer);
498 
500  ast_test_validate(test, NULL != uut);
501  ao2_ref(consumer, +1);
502 
503  test_data = ao2_alloc(1, NULL);
504  ast_test_validate(test, NULL != test_data);
505  ast_test_validate(test, stasis_message_type_create("TestMessage", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
506  test_message = stasis_message_create(test_message_type, test_data);
507 
508  stasis_publish_sync(uut, test_message);
509 
510  actual_len = consumer->messages_rxed_len;
511  ast_test_validate(test, 1 == actual_len);
513  ast_test_validate(test, test_data == actual);
514 
515  return AST_TEST_PASS;
516 }
517 
518 AST_TEST_DEFINE(publish_pool)
519 {
522  RAII_VAR(char *, test_data, NULL, ao2_cleanup);
523  RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
524  RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
526  int actual_len;
527  const char *actual;
528 
529  switch (cmd) {
530  case TEST_INIT:
531  info->name = __func__;
532  info->category = test_category;
533  info->summary = "Test publishing with a threadpool";
534  info->description = "Test publishing to a subscriber whose\n"
535  "subscription dictates messages are received through a\n"
536  "threadpool.";
537  return AST_TEST_NOT_RUN;
538  case TEST_EXECUTE:
539  break;
540  }
541 
542  topic = stasis_topic_create("TestTopic");
543  ast_test_validate(test, NULL != topic);
544 
546  ast_test_validate(test, NULL != consumer);
547 
549  ast_test_validate(test, NULL != uut);
550  ao2_ref(consumer, +1);
551 
552  test_data = ao2_alloc(1, NULL);
553  ast_test_validate(test, NULL != test_data);
554  ast_test_validate(test, stasis_message_type_create("TestMessage", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
555  test_message = stasis_message_create(test_message_type, test_data);
556 
557  stasis_publish(topic, test_message);
558 
559  actual_len = consumer_wait_for(consumer, 1);
560  ast_test_validate(test, 1 == actual_len);
562  ast_test_validate(test, test_data == actual);
563 
564  return AST_TEST_PASS;
565 }
566 
567 AST_TEST_DEFINE(unsubscribe_stops_messages)
568 {
572  RAII_VAR(char *, test_data, NULL, ao2_cleanup);
573  RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
574  RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
575  int actual_len;
576 
577  switch (cmd) {
578  case TEST_INIT:
579  info->name = __func__;
580  info->category = test_category;
581  info->summary = "Test simple subscriptions";
582  info->description = "Test simple subscriptions";
583  return AST_TEST_NOT_RUN;
584  case TEST_EXECUTE:
585  break;
586  }
587 
588  topic = stasis_topic_create("TestTopic");
589  ast_test_validate(test, NULL != topic);
590 
592  ast_test_validate(test, NULL != consumer);
593 
595  ast_test_validate(test, NULL != uut);
596  ao2_ref(consumer, +1);
597 
598  uut = stasis_unsubscribe(uut);
599 
600  test_data = ao2_alloc(1, NULL);
601  ast_test_validate(test, NULL != test_data);
602  ast_test_validate(test, stasis_message_type_create("TestMessage", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
603  test_message = stasis_message_create(test_message_type, test_data);
604 
605  stasis_publish(topic, test_message);
606 
607  actual_len = consumer_should_stay(consumer, 0);
608  ast_test_validate(test, 0 == actual_len);
609 
610  return AST_TEST_PASS;
611 }
612 
614 {
615  RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
617 
618  RAII_VAR(struct consumer *, parent_consumer, NULL, ao2_cleanup);
620 
621  RAII_VAR(struct stasis_forward *, forward_sub, NULL, stasis_forward_cancel);
622  RAII_VAR(struct stasis_subscription *, parent_sub, NULL, stasis_unsubscribe);
624 
625  RAII_VAR(char *, test_data, NULL, ao2_cleanup);
626  RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
627  RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
628  int actual_len;
629 
630  switch (cmd) {
631  case TEST_INIT:
632  info->name = __func__;
633  info->category = test_category;
634  info->summary = "Test sending events to a parent topic";
635  info->description = "Test sending events to a parent topic.\n"
636  "This test creates three topics (one parent, two children)\n"
637  "and publishes a message to one child, and verifies it's\n"
638  "only seen by that child and the parent";
639  return AST_TEST_NOT_RUN;
640  case TEST_EXECUTE:
641  break;
642  }
643 
644  parent_topic = stasis_topic_create("ParentTestTopic");
645  ast_test_validate(test, NULL != parent_topic);
646  topic = stasis_topic_create("TestTopic");
647  ast_test_validate(test, NULL != topic);
648 
649  forward_sub = stasis_forward_all(topic, parent_topic);
650  ast_test_validate(test, NULL != forward_sub);
651 
652  parent_consumer = consumer_create(1);
653  ast_test_validate(test, NULL != parent_consumer);
655  ast_test_validate(test, NULL != consumer);
656 
657  parent_sub = stasis_subscribe(parent_topic, consumer_exec, parent_consumer);
658  ast_test_validate(test, NULL != parent_sub);
659  ao2_ref(parent_consumer, +1);
661  ast_test_validate(test, NULL != sub);
662  ao2_ref(consumer, +1);
663 
664  test_data = ao2_alloc(1, NULL);
665  ast_test_validate(test, NULL != test_data);
666  ast_test_validate(test, stasis_message_type_create("TestMessage", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
667  test_message = stasis_message_create(test_message_type, test_data);
668 
669  stasis_publish(topic, test_message);
670 
671  actual_len = consumer_wait_for(consumer, 1);
672  ast_test_validate(test, 1 == actual_len);
673  actual_len = consumer_wait_for(parent_consumer, 1);
674  ast_test_validate(test, 1 == actual_len);
675 
676  return AST_TEST_PASS;
677 }
678 
679 AST_TEST_DEFINE(interleaving)
680 {
681  RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
682  RAII_VAR(struct stasis_topic *, topic1, NULL, ao2_cleanup);
683  RAII_VAR(struct stasis_topic *, topic2, NULL, ao2_cleanup);
684 
685  RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
686 
687  RAII_VAR(char *, test_data, NULL, ao2_cleanup);
688 
689  RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
690  RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
691  RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
692 
693  RAII_VAR(struct stasis_forward *, forward_sub1, NULL, stasis_forward_cancel);
694  RAII_VAR(struct stasis_forward *, forward_sub2, NULL, stasis_forward_cancel);
696 
698 
699  int actual_len;
700 
701  switch (cmd) {
702  case TEST_INIT:
703  info->name = __func__;
704  info->category = test_category;
705  info->summary = "Test sending interleaved events to a parent topic";
706  info->description = "Test sending events to a parent topic.\n"
707  "This test creates three topics (one parent, two children)\n"
708  "and publishes messages alternately between the children.\n"
709  "It verifies that the messages are received in the expected\n"
710  "order.";
711  return AST_TEST_NOT_RUN;
712  case TEST_EXECUTE:
713  break;
714  }
715 
716  ast_test_validate(test, stasis_message_type_create("test", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
717  ast_test_validate(test, NULL != test_message_type);
718 
719  test_data = ao2_alloc(1, NULL);
720  ast_test_validate(test, NULL != test_data);
721 
722  test_message1 = stasis_message_create(test_message_type, test_data);
723  ast_test_validate(test, NULL != test_message1);
724  test_message2 = stasis_message_create(test_message_type, test_data);
725  ast_test_validate(test, NULL != test_message2);
726  test_message3 = stasis_message_create(test_message_type, test_data);
727  ast_test_validate(test, NULL != test_message3);
728 
729  parent_topic = stasis_topic_create("ParentTestTopic");
730  ast_test_validate(test, NULL != parent_topic);
731  topic1 = stasis_topic_create("Topic1");
732  ast_test_validate(test, NULL != topic1);
733  topic2 = stasis_topic_create("Topic2");
734  ast_test_validate(test, NULL != topic2);
735 
736  forward_sub1 = stasis_forward_all(topic1, parent_topic);
737  ast_test_validate(test, NULL != forward_sub1);
738  forward_sub2 = stasis_forward_all(topic2, parent_topic);
739  ast_test_validate(test, NULL != forward_sub2);
740 
742  ast_test_validate(test, NULL != consumer);
743 
744  sub = stasis_subscribe(parent_topic, consumer_exec, consumer);
745  ast_test_validate(test, NULL != sub);
746  ao2_ref(consumer, +1);
747 
748  stasis_publish(topic1, test_message1);
749  stasis_publish(topic2, test_message2);
750  stasis_publish(topic1, test_message3);
751 
752  actual_len = consumer_wait_for(consumer, 3);
753  ast_test_validate(test, 3 == actual_len);
754 
755  ast_test_validate(test, test_message1 == consumer->messages_rxed[0]);
756  ast_test_validate(test, test_message2 == consumer->messages_rxed[1]);
757  ast_test_validate(test, test_message3 == consumer->messages_rxed[2]);
758 
759  return AST_TEST_PASS;
760 }
761 
762 AST_TEST_DEFINE(subscription_interleaving)
763 {
764  RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
765  RAII_VAR(struct stasis_topic *, topic1, NULL, ao2_cleanup);
766  RAII_VAR(struct stasis_topic *, topic2, NULL, ao2_cleanup);
767 
768  RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
769 
770  RAII_VAR(char *, test_data, NULL, ao2_cleanup);
771 
772  RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
773  RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
774  RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
775 
776  RAII_VAR(struct stasis_forward *, forward_sub1, NULL, stasis_forward_cancel);
777  RAII_VAR(struct stasis_forward *, forward_sub2, NULL, stasis_forward_cancel);
780 
781  RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
782  RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
783 
784  int actual_len;
785 
786  switch (cmd) {
787  case TEST_INIT:
788  info->name = __func__;
789  info->category = test_category;
790  info->summary = "Test sending interleaved events to a parent topic with different subscribers";
791  info->description = "Test sending events to a parent topic.\n"
792  "This test creates three topics (one parent, two children)\n"
793  "and publishes messages alternately between the children.\n"
794  "It verifies that the messages are received in the expected\n"
795  "order, for different subscription types: one with a dedicated\n"
796  "thread, the other on the Stasis threadpool.";
797  return AST_TEST_NOT_RUN;
798  case TEST_EXECUTE:
799  break;
800  }
801 
802  ast_test_validate(test, stasis_message_type_create("test", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
803  ast_test_validate(test, NULL != test_message_type);
804 
805  test_data = ao2_alloc(1, NULL);
806  ast_test_validate(test, NULL != test_data);
807 
808  test_message1 = stasis_message_create(test_message_type, test_data);
809  ast_test_validate(test, NULL != test_message1);
810  test_message2 = stasis_message_create(test_message_type, test_data);
811  ast_test_validate(test, NULL != test_message2);
812  test_message3 = stasis_message_create(test_message_type, test_data);
813  ast_test_validate(test, NULL != test_message3);
814 
815  parent_topic = stasis_topic_create("ParentTestTopic");
816  ast_test_validate(test, NULL != parent_topic);
817  topic1 = stasis_topic_create("Topic1");
818  ast_test_validate(test, NULL != topic1);
819  topic2 = stasis_topic_create("Topic2");
820  ast_test_validate(test, NULL != topic2);
821 
822  forward_sub1 = stasis_forward_all(topic1, parent_topic);
823  ast_test_validate(test, NULL != forward_sub1);
824  forward_sub2 = stasis_forward_all(topic2, parent_topic);
825  ast_test_validate(test, NULL != forward_sub2);
826 
827  consumer1 = consumer_create(1);
828  ast_test_validate(test, NULL != consumer1);
829 
830  consumer2 = consumer_create(1);
831  ast_test_validate(test, NULL != consumer2);
832 
833  sub1 = stasis_subscribe(parent_topic, consumer_exec, consumer1);
834  ast_test_validate(test, NULL != sub1);
835  ao2_ref(consumer1, +1);
836 
837  sub2 = stasis_subscribe_pool(parent_topic, consumer_exec, consumer2);
838  ast_test_validate(test, NULL != sub2);
839  ao2_ref(consumer2, +1);
840 
841  stasis_publish(topic1, test_message1);
842  stasis_publish(topic2, test_message2);
843  stasis_publish(topic1, test_message3);
844 
845  actual_len = consumer_wait_for(consumer1, 3);
846  ast_test_validate(test, 3 == actual_len);
847 
848  actual_len = consumer_wait_for(consumer2, 3);
849  ast_test_validate(test, 3 == actual_len);
850 
851  ast_test_validate(test, test_message1 == consumer1->messages_rxed[0]);
852  ast_test_validate(test, test_message2 == consumer1->messages_rxed[1]);
853  ast_test_validate(test, test_message3 == consumer1->messages_rxed[2]);
854 
855  ast_test_validate(test, test_message1 == consumer2->messages_rxed[0]);
856  ast_test_validate(test, test_message2 == consumer2->messages_rxed[1]);
857  ast_test_validate(test, test_message3 == consumer2->messages_rxed[2]);
858 
859  return AST_TEST_PASS;
860 }
861 
863  char *id;
864  char *value;
865 };
866 
867 static void cache_test_data_dtor(void *obj)
868 {
869  struct cache_test_data *data = obj;
870 
871  ast_free(data->id);
872  ast_free(data->value);
873 }
874 
875 static struct stasis_message *cache_test_message_create_full(struct stasis_message_type *type, const char *name, const char *value, struct ast_eid *eid)
876 {
878 
880  if (data == NULL) {
881  return NULL;
882  }
883 
884  ast_assert(name != NULL);
885  ast_assert(value != NULL);
886 
887  data->id = ast_strdup(name);
888  data->value = ast_strdup(value);
889  if (!data->id || !data->value) {
890  return NULL;
891  }
892 
893  return stasis_message_create_full(type, data, eid);
894 }
895 
896 static struct stasis_message *cache_test_message_create(struct stasis_message_type *type, const char *name, const char *value)
897 {
898  return cache_test_message_create_full(type, name, value, &ast_eid_default);
899 }
900 
901 static const char *cache_test_data_id(struct stasis_message *message)
902 {
903  struct cache_test_data *cachable = stasis_message_data(message);
904 
905  if (0 != strcmp("Cacheable", stasis_message_type_name(stasis_message_type(message)))) {
906  return NULL;
907  }
908  return cachable->id;
909 }
910 
912 {
913  struct stasis_message *aggregate_snapshot;
914  struct stasis_message *snapshot;
915  struct stasis_message_type *type = NULL;
916  struct cache_test_data *test_data = NULL;
917  int idx;
918  int accumulated = 0;
919  char aggregate_str[30];
920 
921  /* Accumulate the aggregate value. */
922  snapshot = stasis_cache_entry_get_local(entry);
923  if (snapshot) {
924  type = stasis_message_type(snapshot);
925  test_data = stasis_message_data(snapshot);
926  accumulated += atoi(test_data->value);
927  }
928  for (idx = 0; ; ++idx) {
929  snapshot = stasis_cache_entry_get_remote(entry, idx);
930  if (!snapshot) {
931  break;
932  }
933 
934  type = stasis_message_type(snapshot);
935  test_data = stasis_message_data(snapshot);
936  accumulated += atoi(test_data->value);
937  }
938 
939  if (!test_data) {
940  /* There are no test entries cached. Delete the aggregate. */
941  return NULL;
942  }
943 
944  snapshot = stasis_cache_entry_get_aggregate(entry);
945  if (snapshot) {
946  type = stasis_message_type(snapshot);
947  test_data = stasis_message_data(snapshot);
948  if (accumulated == atoi(test_data->value)) {
949  /* Aggregate test entry did not change. */
950  return ao2_bump(snapshot);
951  }
952  }
953 
954  snprintf(aggregate_str, sizeof(aggregate_str), "%d", accumulated);
955  aggregate_snapshot = cache_test_message_create_full(type, test_data->id, aggregate_str, NULL);
956  if (!aggregate_snapshot) {
957  /* Bummer. We have to keep the old aggregate snapshot. */
958  ast_log(LOG_ERROR, "Could not create aggregate snapshot.\n");
959  return ao2_bump(snapshot);
960  }
961 
962  return aggregate_snapshot;
963 }
964 
/*!
 * \brief Publish an aggregate message for the cache tests.
 *
 * Forwards the aggregate to stasis_publish() on the given topic without
 * any further processing.
 *
 * \param topic Topic to publish to.
 * \param aggregate Aggregate message to publish.
 */
static void cache_test_aggregate_publish_fn(struct stasis_topic *topic, struct stasis_message *aggregate)
{
	/* Nothing test-specific to do: hand the aggregate straight to the bus. */
	stasis_publish(topic, aggregate);
}
969 
970 static int check_cache_aggregate(struct stasis_cache *cache, struct stasis_message_type *cache_type, const char *id, const char *value)
971 {
972  RAII_VAR(struct stasis_message *, aggregate, NULL, ao2_cleanup);
973  struct cache_test_data *test_data;
974 
975  aggregate = stasis_cache_get_by_eid(cache, cache_type, id, NULL);
976  if (!aggregate) {
977  /* No aggregate, return true if given no value. */
978  return !value;
979  }
980 
981  /* Return true if the given value matches the aggregate value. */
982  test_data = stasis_message_data(aggregate);
983  return value && !strcmp(value, test_data->value);
984 }
985 
/*
 * Test: per the test summary, caching topics only forward cache_update
 * messages. A message of the "NonCacheable" type is published to the
 * underlying topic and the test validates that the consumer's message count
 * reported by consumer_should_stay(consumer, 0) is 0.
 *
 * NOTE(review): the embedded listing numbers skip 990-993, 1012, 1016 and
 * 1018. As shown, `cache`, `caching_topic`, `consumer` and `sub` are used
 * without visible declarations, and `cache`, `consumer` and `sub` are never
 * visibly assigned. These lines appear to have been lost from this excerpt;
 * restore them from the upstream file.
 */
986 AST_TEST_DEFINE(cache_filter)
987 {
988  RAII_VAR(struct stasis_message_type *, non_cache_type, NULL, ao2_cleanup);
989  RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
994  RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
995  int actual_len;
996 
 /* TEST_INIT only fills in the test metadata and returns without running. */
997  switch (cmd) {
998  case TEST_INIT:
999  info->name = __func__;
1000  info->category = test_category;
1001  info->summary = "Test caching topics only forward cache_update messages.";
1002  info->description = "Test caching topics only forward cache_update messages.";
1003  return AST_TEST_NOT_RUN;
1004  case TEST_EXECUTE:
1005  break;
1006  }
1007 
 /* Set up the message type, the topic and the caching topic layered on it. */
1008  ast_test_validate(test, stasis_message_type_create("NonCacheable", NULL, &non_cache_type) == STASIS_MESSAGE_TYPE_SUCCESS);
1009  ast_test_validate(test, NULL != non_cache_type);
1010  topic = stasis_topic_create("SomeTopic");
1011  ast_test_validate(test, NULL != topic);
1013  ast_test_validate(test, NULL != cache);
1014  caching_topic = stasis_caching_topic_create(topic, cache);
1015  ast_test_validate(test, NULL != caching_topic);
1017  ast_test_validate(test, NULL != consumer);
1019  ast_test_validate(test, NULL != sub);
 /* Extra consumer reference; presumably held on behalf of the subscription - TODO confirm against consumer_exec. */
1020  ao2_ref(consumer, +1);
1021 
1022  test_message = cache_test_message_create(non_cache_type, "1", "1");
1023  ast_test_validate(test, NULL != test_message);
1024 
 /* Publish to the underlying topic, not to the caching topic. */
1025  stasis_publish(topic, test_message);
1026 
 /* The consumer is expected to have received nothing. */
1027  actual_len = consumer_should_stay(consumer, 0);
1028  ast_test_validate(test, 0 == actual_len);
1029 
1030  return AST_TEST_PASS;
1031 }
1032 
/*
 * Test body: per the test summary, "Test passing messages through cache
 * topic unscathed." Snapshots are published to the topic and the test
 * inspects the stasis_cache_update payloads received by the consumer
 * (old_snapshot / new_snapshot) together with the result of
 * stasis_cache_get() after a create, an update and a clear.
 *
 * NOTE(review): the embedded listing numbers skip 1033, 1037, 1040, 1063,
 * 1067, 1069, 1085 and 1093. As shown, the AST_TEST_DEFINE(...) header line
 * is absent, `cache` and `sub` have no visible declaration, and `cache`,
 * `consumer` and `sub` are never visibly assigned. Restore the missing lines
 * from the upstream file.
 */
1034 {
1035  RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
1036  RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1038  RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
1039  RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
1041  RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
1042  RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
1043  RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
1044  RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
1045  int actual_len;
1046  struct stasis_cache_update *actual_update;
1047 
 /* TEST_INIT only fills in the test metadata and returns without running. */
1048  switch (cmd) {
1049  case TEST_INIT:
1050  info->name = __func__;
1051  info->category = test_category;
1052  info->summary = "Test passing messages through cache topic unscathed.";
1053  info->description = "Test passing messages through cache topic unscathed.";
1054  return AST_TEST_NOT_RUN;
1055  case TEST_EXECUTE:
1056  break;
1057  }
1058 
1059  ast_test_validate(test, stasis_message_type_create("Cacheable", NULL, &cache_type) == STASIS_MESSAGE_TYPE_SUCCESS);
1060  ast_test_validate(test, NULL != cache_type);
1061  topic = stasis_topic_create("SomeTopic");
1062  ast_test_validate(test, NULL != topic);
1064  ast_test_validate(test, NULL != cache);
1065  caching_topic = stasis_caching_topic_create(topic, cache);
1066  ast_test_validate(test, NULL != caching_topic);
1068  ast_test_validate(test, NULL != consumer);
1070  ast_test_validate(test, NULL != sub);
 /* Extra consumer reference; presumably held on behalf of the subscription - TODO confirm against consumer_exec. */
1071  ao2_ref(consumer, +1);
1072 
 /* Two snapshots with different ids ("1" and "2"), both carrying value "1". */
1073  test_message1_1 = cache_test_message_create(cache_type, "1", "1");
1074  ast_test_validate(test, NULL != test_message1_1);
1075  test_message2_1 = cache_test_message_create(cache_type, "2", "1");
1076  ast_test_validate(test, NULL != test_message2_1);
1077 
1078  /* Post a couple of snapshots */
1079  stasis_publish(topic, test_message1_1);
1080  stasis_publish(topic, test_message2_1);
1081  actual_len = consumer_wait_for(consumer, 2);
1082  ast_test_validate(test, 2 == actual_len);
1083 
1084  /* Check for new snapshot messages */
 /* A brand-new entry: no old snapshot, new snapshot is the published message. */
1086  actual_update = stasis_message_data(consumer->messages_rxed[0]);
1087  ast_test_validate(test, NULL == actual_update->old_snapshot);
1088  ast_test_validate(test, test_message1_1 == actual_update->new_snapshot);
1089  ast_test_validate(test, test_message1_1 == stasis_cache_get(cache, cache_type, "1"));
1090  /* stasis_cache_get returned a ref, so unref test_message1_1 */
1091  ao2_ref(test_message1_1, -1);
1092 
1094  actual_update = stasis_message_data(consumer->messages_rxed[1]);
1095  ast_test_validate(test, NULL == actual_update->old_snapshot);
1096  ast_test_validate(test, test_message2_1 == actual_update->new_snapshot);
1097  ast_test_validate(test, test_message2_1 == stasis_cache_get(cache, cache_type, "2"));
1098  /* stasis_cache_get returned a ref, so unref test_message2_1 */
1099  ao2_ref(test_message2_1, -1);
1100 
1101  /* Update snapshot 2 */
1102  test_message2_2 = cache_test_message_create(cache_type, "2", "2");
1103  ast_test_validate(test, NULL != test_message2_2);
1104  stasis_publish(topic, test_message2_2);
1105 
1106  actual_len = consumer_wait_for(consumer, 3);
1107  ast_test_validate(test, 3 == actual_len);
1108 
 /* An update: old snapshot is the previous entry, new snapshot replaces it in the cache. */
1109  actual_update = stasis_message_data(consumer->messages_rxed[2]);
1110  ast_test_validate(test, test_message2_1 == actual_update->old_snapshot);
1111  ast_test_validate(test, test_message2_2 == actual_update->new_snapshot);
1112  ast_test_validate(test, test_message2_2 == stasis_cache_get(cache, cache_type, "2"));
1113  /* stasis_cache_get returned a ref, so unref test_message2_2 */
1114  ao2_ref(test_message2_2, -1);
1115 
1116  /* Clear snapshot 1 */
1117  test_message1_clear = stasis_cache_clear_create(test_message1_1);
1118  ast_test_validate(test, NULL != test_message1_clear);
1119  stasis_publish(topic, test_message1_clear);
1120 
1121  actual_len = consumer_wait_for(consumer, 4);
1122  ast_test_validate(test, 4 == actual_len);
1123 
 /* A clear: old snapshot is the removed entry, no new snapshot, and the cache lookup yields NULL. */
1124  actual_update = stasis_message_data(consumer->messages_rxed[3]);
1125  ast_test_validate(test, test_message1_1 == actual_update->old_snapshot);
1126  ast_test_validate(test, NULL == actual_update->new_snapshot);
1127  ast_test_validate(test, NULL == stasis_cache_get(cache, cache_type, "1"));
1128 
1129  return AST_TEST_PASS;
1130 }
1131 
/*
 * Test: exercise the cache dump routines. After each of three steps (two
 * initial snapshots, an update of snapshot 2, a clear of snapshot 1) the
 * cache is dumped with stasis_cache_dump(cache, NULL) and the test validates
 * both the number of entries and that every entry is one of the expected
 * messages.
 *
 * NOTE(review): the embedded listing numbers skip 1136, 1139, 1164, 1168,
 * 1170, 1195, 1215, 1235 and 1239. As shown, `cache` and `sub` have no
 * visible declaration, `cache`, `consumer` and `sub` are never visibly
 * assigned, and no ao2_iterator_destroy() is visible after any of the
 * iterator loops. Restore the missing lines from the upstream file.
 */
1132 AST_TEST_DEFINE(cache_dump)
1133 {
1134  RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
1135  RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1137  RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
1138  RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
1140  RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
1141  RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
1142  RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
1143  RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
1144  RAII_VAR(struct ao2_container *, cache_dump, NULL, ao2_cleanup);
1145  int actual_len;
1146  struct ao2_iterator i;
1147  void *obj;
1148 
 /* TEST_INIT only fills in the test metadata and returns without running. */
1149  switch (cmd) {
1150  case TEST_INIT:
1151  info->name = __func__;
1152  info->category = test_category;
1153  info->summary = "Test cache dump routines.";
1154  info->description = "Test cache dump routines.";
1155  return AST_TEST_NOT_RUN;
1156  case TEST_EXECUTE:
1157  break;
1158  }
1159 
1160  ast_test_validate(test, stasis_message_type_create("Cacheable", NULL, &cache_type) == STASIS_MESSAGE_TYPE_SUCCESS);
1161  ast_test_validate(test, NULL != cache_type);
1162  topic = stasis_topic_create("SomeTopic");
1163  ast_test_validate(test, NULL != topic);
1165  ast_test_validate(test, NULL != cache);
1166  caching_topic = stasis_caching_topic_create(topic, cache);
1167  ast_test_validate(test, NULL != caching_topic);
1169  ast_test_validate(test, NULL != consumer);
1171  ast_test_validate(test, NULL != sub);
 /* Extra consumer reference; presumably held on behalf of the subscription - TODO confirm against consumer_exec. */
1172  ao2_ref(consumer, +1);
1173 
1174  test_message1_1 = cache_test_message_create(cache_type, "1", "1");
1175  ast_test_validate(test, NULL != test_message1_1);
1176  test_message2_1 = cache_test_message_create(cache_type, "2", "1");
1177  ast_test_validate(test, NULL != test_message2_1);
1178 
1179  /* Post a couple of snapshots */
1180  stasis_publish(topic, test_message1_1);
1181  stasis_publish(topic, test_message2_1);
1182  actual_len = consumer_wait_for(consumer, 2);
1183  ast_test_validate(test, 2 == actual_len);
1184 
1185  /* Check the cache */
 /* Drop any previous dump before taking a new one (a no-op the first time, when cache_dump is still NULL). */
1186  ao2_cleanup(cache_dump);
1187  cache_dump = stasis_cache_dump(cache, NULL);
1188  ast_test_validate(test, NULL != cache_dump);
1189  ast_test_validate(test, 2 == ao2_container_count(cache_dump));
1190  i = ao2_iterator_init(cache_dump, 0);
1191  while ((obj = ao2_iterator_next(&i))) {
 /* Each iterated object is released at the end of its loop iteration. */
1192  RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1193  ast_test_validate(test, actual_cache_entry == test_message1_1 || actual_cache_entry == test_message2_1);
1194  }
1196 
1197  /* Update snapshot 2 */
1198  test_message2_2 = cache_test_message_create(cache_type, "2", "2");
1199  ast_test_validate(test, NULL != test_message2_2);
1200  stasis_publish(topic, test_message2_2);
1201 
1202  actual_len = consumer_wait_for(consumer, 3);
1203  ast_test_validate(test, 3 == actual_len);
1204 
1205  /* Check the cache */
1206  ao2_cleanup(cache_dump);
1207  cache_dump = stasis_cache_dump(cache, NULL);
1208  ast_test_validate(test, NULL != cache_dump);
 /* Still two entries: the update replaced the entry for id "2" rather than adding one. */
1209  ast_test_validate(test, 2 == ao2_container_count(cache_dump));
1210  i = ao2_iterator_init(cache_dump, 0);
1211  while ((obj = ao2_iterator_next(&i))) {
1212  RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1213  ast_test_validate(test, actual_cache_entry == test_message1_1 || actual_cache_entry == test_message2_2);
1214  }
1216 
1217  /* Clear snapshot 1 */
1218  test_message1_clear = stasis_cache_clear_create(test_message1_1);
1219  ast_test_validate(test, NULL != test_message1_clear);
1220  stasis_publish(topic, test_message1_clear);
1221 
1222  actual_len = consumer_wait_for(consumer, 4);
1223  ast_test_validate(test, 4 == actual_len);
1224 
1225  /* Check the cache */
1226  ao2_cleanup(cache_dump);
1227  cache_dump = stasis_cache_dump(cache, NULL);
1228  ast_test_validate(test, NULL != cache_dump);
1229  ast_test_validate(test, 1 == ao2_container_count(cache_dump));
1230  i = ao2_iterator_init(cache_dump, 0);
1231  while ((obj = ao2_iterator_next(&i))) {
1232  RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1233  ast_test_validate(test, actual_cache_entry == test_message2_2);
1234  }
1236 
1237  /* Dump the cache to ensure that it has no subscription change items in it since those aren't cached */
 /*
  * NOTE(review): embedded line 1239 is missing here. As shown, cache_dump is
  * read right after ao2_cleanup() with no re-assignment in between; the
  * missing line presumably re-dumps the cache - confirm against upstream.
  */
1238  ao2_cleanup(cache_dump);
1240  ast_test_validate(test, 0 == ao2_container_count(cache_dump));
1241 
1242  return AST_TEST_PASS;
1243 }
1244 
/*
 * Test: cache EID and aggregate support. Snapshots for ids "1" and "2" are
 * published from the local EID (ast_eid_default) and from two fabricated
 * foreign EIDs. After each step the test validates:
 *  - the aggregate value for the id, via check_cache_aggregate();
 *  - the number of messages seen by a consumer on the plain topic and by a
 *    consumer on the caching topic;
 *  - the contents of the cache as returned by the dump/get routines.
 *
 * NOTE(review): the embedded listing numbers skip 1249, 1297-1298, 1351,
 * 1355, 1366, 1392, 1405, 1431, 1447, 1474 and 1500. As shown, `cache` has
 * no visible declaration or assignment, and no ao2_iterator_destroy() is
 * visible after any of the iterator loops. Restore the missing lines from
 * the upstream file.
 */
1245 AST_TEST_DEFINE(cache_eid_aggregate)
1246 {
1247  RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
1248  RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1250  RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
1251  RAII_VAR(struct consumer *, cache_consumer, NULL, ao2_cleanup);
1252  RAII_VAR(struct consumer *, topic_consumer, NULL, ao2_cleanup);
1253  RAII_VAR(struct stasis_subscription *, topic_sub, NULL, stasis_unsubscribe);
1254  RAII_VAR(struct stasis_subscription *, cache_sub, NULL, stasis_unsubscribe);
1255  RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
1256  RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
1257  RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
1258  RAII_VAR(struct stasis_message *, test_message2_3, NULL, ao2_cleanup);
1259  RAII_VAR(struct stasis_message *, test_message2_4, NULL, ao2_cleanup);
1260  RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
1261  RAII_VAR(struct stasis_message *, test_message2_clear, NULL, ao2_cleanup);
1262  RAII_VAR(struct ao2_container *, cache_dump, NULL, ao2_cleanup);
1263  int actual_len;
1264  struct ao2_iterator i;
1265  void *obj;
1266  struct ast_eid foreign_eid1;
1267  struct ast_eid foreign_eid2;
1268 
 /* TEST_INIT only fills in the test metadata and returns without running. */
1269  switch (cmd) {
1270  case TEST_INIT:
1271  info->name = __func__;
1272  info->category = test_category;
1273  info->summary = "Test cache eid and aggregate support.";
1274  info->description = "Test cache eid and aggregate support.";
1275  return AST_TEST_NOT_RUN;
1276  case TEST_EXECUTE:
1277  break;
1278  }
1279 
 /* Fabricate two distinct foreign EIDs by filling them with fixed byte patterns. */
1280  memset(&foreign_eid1, 0xAA, sizeof(foreign_eid1));
1281  memset(&foreign_eid2, 0xBB, sizeof(foreign_eid2));
1282 
1283  ast_test_validate(test, stasis_message_type_create("Cacheable", NULL, &cache_type) == STASIS_MESSAGE_TYPE_SUCCESS);
1284  ast_test_validate(test, NULL != cache_type);
1285 
1286  topic = stasis_topic_create("SomeTopic");
1287  ast_test_validate(test, NULL != topic);
1288 
1289  /* To consume events published to the topic. */
1290  topic_consumer = consumer_create(1);
1291  ast_test_validate(test, NULL != topic_consumer);
1292 
1293  topic_sub = stasis_subscribe(topic, consumer_exec, topic_consumer);
1294  ast_test_validate(test, NULL != topic_sub);
 /* Extra consumer reference; presumably held on behalf of the subscription - TODO confirm against consumer_exec. */
1295  ao2_ref(topic_consumer, +1);
1296 
 /* NOTE(review): the assignment of `cache` (embedded lines 1297-1298) is missing from this excerpt. */
1299  ast_test_validate(test, NULL != cache);
1300 
1301  caching_topic = stasis_caching_topic_create(topic, cache);
1302  ast_test_validate(test, NULL != caching_topic);
1303 
1304  /* To consume update events published to the caching_topic. */
1305  cache_consumer = consumer_create(1);
1306  ast_test_validate(test, NULL != cache_consumer);
1307 
1308  cache_sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, cache_consumer);
1309  ast_test_validate(test, NULL != cache_sub);
1310  ao2_ref(cache_consumer, +1);
1311 
1312  /* Create test messages. */
 /* Arguments are (type, id, value, eid): 1_1 and 2_1 are local; 2_2 is from foreign_eid1; 2_3 and 2_4 are both from foreign_eid2. */
1313  test_message1_1 = cache_test_message_create_full(cache_type, "1", "1", &ast_eid_default);
1314  ast_test_validate(test, NULL != test_message1_1);
1315  test_message2_1 = cache_test_message_create_full(cache_type, "2", "1", &ast_eid_default);
1316  ast_test_validate(test, NULL != test_message2_1);
1317  test_message2_2 = cache_test_message_create_full(cache_type, "2", "2", &foreign_eid1);
1318  ast_test_validate(test, NULL != test_message2_2);
1319  test_message2_3 = cache_test_message_create_full(cache_type, "2", "3", &foreign_eid2);
1320  ast_test_validate(test, NULL != test_message2_3);
1321  test_message2_4 = cache_test_message_create_full(cache_type, "2", "4", &foreign_eid2);
1322  ast_test_validate(test, NULL != test_message2_4);
1323 
1324  /* Post some snapshots */
1325  stasis_publish(topic, test_message1_1);
1326  ast_test_validate(test, check_cache_aggregate(cache, cache_type, "1", "1"));
1327  stasis_publish(topic, test_message2_1);
1328  ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "1"));
1329  stasis_publish(topic, test_message2_2);
1330  ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "3"));
1331 
1332  actual_len = consumer_wait_for(cache_consumer, 6);
1333  ast_test_validate(test, 6 == actual_len);
1334  actual_len = consumer_wait_for(topic_consumer, 6);
1335  ast_test_validate(test, 6 == actual_len);
1336 
1337  /* Check the cache */
1338  ao2_cleanup(cache_dump);
1339  cache_dump = stasis_cache_dump_all(cache, NULL);
1340  ast_test_validate(test, NULL != cache_dump);
1341  ast_test_validate(test, 3 == ao2_container_count(cache_dump));
1342  i = ao2_iterator_init(cache_dump, 0);
1343  while ((obj = ao2_iterator_next(&i))) {
 /* Each iterated object is released at the end of its loop iteration. */
1344  RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1345 
1346  ast_test_validate(test,
1347  actual_cache_entry == test_message1_1
1348  || actual_cache_entry == test_message2_1
1349  || actual_cache_entry == test_message2_2);
1350  }
1352 
1353  /* Check the local cached items */
 /*
  * NOTE(review): embedded line 1355 is missing here. As shown, cache_dump is
  * read right after ao2_cleanup() with no re-assignment in between; the
  * missing line presumably dumps the entries for the local EID - confirm
  * against upstream.
  */
1354  ao2_cleanup(cache_dump);
1356  ast_test_validate(test, NULL != cache_dump);
1357  ast_test_validate(test, 2 == ao2_container_count(cache_dump));
1358  i = ao2_iterator_init(cache_dump, 0);
1359  while ((obj = ao2_iterator_next(&i))) {
1360  RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1361 
1362  ast_test_validate(test,
1363  actual_cache_entry == test_message1_1
1364  || actual_cache_entry == test_message2_1);
1365  }
1367 
1368  /* Post snapshot 2 from another eid. */
1369  stasis_publish(topic, test_message2_3);
1370  ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "6"));
1371 
1372  actual_len = consumer_wait_for(cache_consumer, 8);
1373  ast_test_validate(test, 8 == actual_len);
1374  actual_len = consumer_wait_for(topic_consumer, 8);
1375  ast_test_validate(test, 8 == actual_len);
1376 
1377  /* Check the cache */
1378  ao2_cleanup(cache_dump);
1379  cache_dump = stasis_cache_dump_all(cache, NULL);
1380  ast_test_validate(test, NULL != cache_dump);
1381  ast_test_validate(test, 4 == ao2_container_count(cache_dump));
1382  i = ao2_iterator_init(cache_dump, 0);
1383  while ((obj = ao2_iterator_next(&i))) {
1384  RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1385 
1386  ast_test_validate(test,
1387  actual_cache_entry == test_message1_1
1388  || actual_cache_entry == test_message2_1
1389  || actual_cache_entry == test_message2_2
1390  || actual_cache_entry == test_message2_3);
1391  }
1393 
1394  /* Check the remote cached items */
1395  ao2_cleanup(cache_dump);
1396  cache_dump = stasis_cache_dump_by_eid(cache, NULL, &foreign_eid1);
1397  ast_test_validate(test, NULL != cache_dump);
1398  ast_test_validate(test, 1 == ao2_container_count(cache_dump));
1399  i = ao2_iterator_init(cache_dump, 0);
1400  while ((obj = ao2_iterator_next(&i))) {
1401  RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1402 
1403  ast_test_validate(test, actual_cache_entry == test_message2_2);
1404  }
1406 
1407  /* Post snapshot 2 from a repeated eid. */
1408  stasis_publish(topic, test_message2_4);
1409  ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "7"));
1410 
1411  actual_len = consumer_wait_for(cache_consumer, 10);
1412  ast_test_validate(test, 10 == actual_len);
1413  actual_len = consumer_wait_for(topic_consumer, 10);
1414  ast_test_validate(test, 10 == actual_len);
1415 
1416  /* Check the cache */
1417  ao2_cleanup(cache_dump);
1418  cache_dump = stasis_cache_dump_all(cache, NULL);
1419  ast_test_validate(test, NULL != cache_dump);
 /* Still four entries: test_message2_4 is expected in place of test_message2_3 (same id and EID). */
1420  ast_test_validate(test, 4 == ao2_container_count(cache_dump));
1421  i = ao2_iterator_init(cache_dump, 0);
1422  while ((obj = ao2_iterator_next(&i))) {
1423  RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1424 
1425  ast_test_validate(test,
1426  actual_cache_entry == test_message1_1
1427  || actual_cache_entry == test_message2_1
1428  || actual_cache_entry == test_message2_2
1429  || actual_cache_entry == test_message2_4);
1430  }
1432 
1433  /* Check all snapshot 2 cache entries. */
1434  ao2_cleanup(cache_dump);
1435  cache_dump = stasis_cache_get_all(cache, cache_type, "2");
1436  ast_test_validate(test, NULL != cache_dump);
1437  ast_test_validate(test, 3 == ao2_container_count(cache_dump));
1438  i = ao2_iterator_init(cache_dump, 0);
1439  while ((obj = ao2_iterator_next(&i))) {
1440  RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1441 
1442  ast_test_validate(test,
1443  actual_cache_entry == test_message2_1
1444  || actual_cache_entry == test_message2_2
1445  || actual_cache_entry == test_message2_4);
1446  }
1448 
1449  /* Clear snapshot 1 */
1450  test_message1_clear = stasis_cache_clear_create(test_message1_1);
1451  ast_test_validate(test, NULL != test_message1_clear);
1452  stasis_publish(topic, test_message1_clear);
 /* A NULL expected value means no aggregate entry should remain for id "1". */
1453  ast_test_validate(test, check_cache_aggregate(cache, cache_type, "1", NULL));
1454 
 /* From here on the two consumers are expected to see different message counts. */
1455  actual_len = consumer_wait_for(cache_consumer, 12);
1456  ast_test_validate(test, 12 == actual_len);
1457  actual_len = consumer_wait_for(topic_consumer, 11);
1458  ast_test_validate(test, 11 == actual_len);
1459 
1460  /* Check the cache */
1461  ao2_cleanup(cache_dump);
1462  cache_dump = stasis_cache_dump_all(cache, NULL);
1463  ast_test_validate(test, NULL != cache_dump);
1464  ast_test_validate(test, 3 == ao2_container_count(cache_dump));
1465  i = ao2_iterator_init(cache_dump, 0);
1466  while ((obj = ao2_iterator_next(&i))) {
1467  RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1468 
1469  ast_test_validate(test,
1470  actual_cache_entry == test_message2_1
1471  || actual_cache_entry == test_message2_2
1472  || actual_cache_entry == test_message2_4);
1473  }
1475 
1476  /* Clear snapshot 2 from a remote eid */
1477  test_message2_clear = stasis_cache_clear_create(test_message2_2);
1478  ast_test_validate(test, NULL != test_message2_clear);
1479  stasis_publish(topic, test_message2_clear);
1480  ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "5"));
1481 
1482  actual_len = consumer_wait_for(cache_consumer, 14);
1483  ast_test_validate(test, 14 == actual_len);
1484  actual_len = consumer_wait_for(topic_consumer, 13);
1485  ast_test_validate(test, 13 == actual_len);
1486 
1487  /* Check the cache */
1488  ao2_cleanup(cache_dump);
1489  cache_dump = stasis_cache_dump_all(cache, NULL);
1490  ast_test_validate(test, NULL != cache_dump);
1491  ast_test_validate(test, 2 == ao2_container_count(cache_dump));
1492  i = ao2_iterator_init(cache_dump, 0);
1493  while ((obj = ao2_iterator_next(&i))) {
1494  RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1495 
1496  ast_test_validate(test,
1497  actual_cache_entry == test_message2_1
1498  || actual_cache_entry == test_message2_4);
1499  }
1501 
1502  return AST_TEST_PASS;
1503 }
1504 
/*
 * Test body: per the test summary, "Test simple message routing". Three
 * message types and three consumers are created; consumer1 and consumer2 are
 * registered for test_message_type1 and test_message_type2 respectively, and
 * consumer3 is installed as the default route. One message of each type is
 * published and the test validates that each consumer received exactly the
 * message expected for it.
 *
 * NOTE(review): the embedded listing numbers skip 1505, 1508, 1553 and 1557.
 * As shown, the AST_TEST_DEFINE(...) header line is absent, `uut` has no
 * visible declaration, and the two route registrations have lost their
 * first line (only the argument continuation lines remain). Restore the
 * missing lines from the upstream file.
 */
1506 {
1507  RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1509  RAII_VAR(char *, test_data, NULL, ao2_cleanup);
1510  RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
1511  RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
1512  RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
1513  RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
1514  RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
1515  RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
1516  RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
1517  RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
1518  RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
1519  int actual_len, ret;
1520  struct stasis_message *actual;
1521 
 /* TEST_INIT only fills in the test metadata and returns without running. */
1522  switch (cmd) {
1523  case TEST_INIT:
1524  info->name = __func__;
1525  info->category = test_category;
1526  info->summary = "Test simple message routing";
1527  info->description = "Test simple message routing";
1528  return AST_TEST_NOT_RUN;
1529  case TEST_EXECUTE:
1530  break;
1531  }
1532 
1533  topic = stasis_topic_create("TestTopic");
1534  ast_test_validate(test, NULL != topic);
1535 
1536  consumer1 = consumer_create(1);
1537  ast_test_validate(test, NULL != consumer1);
1538  consumer2 = consumer_create(1);
1539  ast_test_validate(test, NULL != consumer2);
1540  consumer3 = consumer_create(1);
1541  ast_test_validate(test, NULL != consumer3);
1542 
1543  ast_test_validate(test, stasis_message_type_create("TestMessage1", NULL, &test_message_type1) == STASIS_MESSAGE_TYPE_SUCCESS);
1544  ast_test_validate(test, NULL != test_message_type1);
1545  ast_test_validate(test, stasis_message_type_create("TestMessage2", NULL, &test_message_type2) == STASIS_MESSAGE_TYPE_SUCCESS);
1546  ast_test_validate(test, NULL != test_message_type2);
1547  ast_test_validate(test, stasis_message_type_create("TestMessage3", NULL, &test_message_type3) == STASIS_MESSAGE_TYPE_SUCCESS);
1548  ast_test_validate(test, NULL != test_message_type3);
1549 
 /* Unit under test: a message router subscribed to the topic. */
1550  uut = stasis_message_router_create(topic);
1551  ast_test_validate(test, NULL != uut);
1552 
 /* Route registrations. Each successful registration is followed by an extra consumer reference, presumably held on behalf of the route - TODO confirm against consumer_exec. */
1554  uut, test_message_type1, consumer_exec, consumer1);
1555  ast_test_validate(test, 0 == ret);
1556  ao2_ref(consumer1, +1);
1558  uut, test_message_type2, consumer_exec, consumer2);
1559  ast_test_validate(test, 0 == ret);
1560  ao2_ref(consumer2, +1);
1561  ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
1562  ast_test_validate(test, 0 == ret);
1563  ao2_ref(consumer3, +1);
1564 
 /* All three messages share the same one-byte ao2 payload; only the message type differs. */
1565  test_data = ao2_alloc(1, NULL);
1566  ast_test_validate(test, NULL != test_data);
1567  test_message1 = stasis_message_create(test_message_type1, test_data);
1568  ast_test_validate(test, NULL != test_message1);
1569  test_message2 = stasis_message_create(test_message_type2, test_data);
1570  ast_test_validate(test, NULL != test_message2);
1571  test_message3 = stasis_message_create(test_message_type3, test_data);
1572  ast_test_validate(test, NULL != test_message3);
1573 
1574  stasis_publish(topic, test_message1);
1575  stasis_publish(topic, test_message2);
1576  stasis_publish(topic, test_message3);
1577 
1578  actual_len = consumer_wait_for(consumer1, 1);
1579  ast_test_validate(test, 1 == actual_len);
1580  actual_len = consumer_wait_for(consumer2, 1);
1581  ast_test_validate(test, 1 == actual_len);
1582  actual_len = consumer_wait_for(consumer3, 1);
1583  ast_test_validate(test, 1 == actual_len);
1584 
 /* Each consumer must hold exactly the message routed to it; type 3 has no explicit route and goes to the default. */
1585  actual = consumer1->messages_rxed[0];
1586  ast_test_validate(test, test_message1 == actual);
1587 
1588  actual = consumer2->messages_rxed[0];
1589  ast_test_validate(test, test_message2 == actual);
1590 
1591  actual = consumer3->messages_rxed[0];
1592  ast_test_validate(test, test_message3 == actual);
1593 
1594  /* consumer1 and consumer2 do not get the final message. */
 /* So the extra references taken above for consumer1 and consumer2 are dropped here by hand. */
1595  ao2_cleanup(consumer1);
1596  ao2_cleanup(consumer2);
1597 
1598  return AST_TEST_PASS;
1599 }
1600 
/*
 * Test: message routing via the Stasis threadpool. The flow is: create three
 * message types and three consumers, register consumer1 and consumer2 for
 * test_message_type1 and test_message_type2, install consumer3 as the
 * default route, publish one message of each type, and validate that each
 * consumer received exactly the message expected for it. The router is
 * created with stasis_message_router_create_pool().
 *
 * NOTE(review): the embedded listing numbers skip 1604, 1651 and 1655. As
 * shown, `uut` has no visible declaration and the two route registrations
 * have lost their first line (only the argument continuation lines remain).
 * Restore the missing lines from the upstream file.
 */
1601 AST_TEST_DEFINE(router_pool)
1602 {
1603  RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1605  RAII_VAR(char *, test_data, NULL, ao2_cleanup);
1606  RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
1607  RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
1608  RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
1609  RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
1610  RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
1611  RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
1612  RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
1613  RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
1614  RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
1615  int actual_len, ret;
1616  struct stasis_message *actual;
1617 
 /* TEST_INIT only fills in the test metadata and returns without running. */
1618  switch (cmd) {
1619  case TEST_INIT:
1620  info->name = __func__;
1621  info->category = test_category;
1622  info->summary = "Test message routing via threadpool";
1623  info->description = "Test simple message routing when\n"
1624  "the subscriptions dictate usage of the Stasis\n"
1625  "threadpool.";
1626  return AST_TEST_NOT_RUN;
1627  case TEST_EXECUTE:
1628  break;
1629  }
1630 
1631  topic = stasis_topic_create("TestTopic");
1632  ast_test_validate(test, NULL != topic);
1633 
1634  consumer1 = consumer_create(1);
1635  ast_test_validate(test, NULL != consumer1);
1636  consumer2 = consumer_create(1);
1637  ast_test_validate(test, NULL != consumer2);
1638  consumer3 = consumer_create(1);
1639  ast_test_validate(test, NULL != consumer3);
1640 
1641  ast_test_validate(test, stasis_message_type_create("TestMessage1", NULL, &test_message_type1) == STASIS_MESSAGE_TYPE_SUCCESS);
1642  ast_test_validate(test, NULL != test_message_type1);
1643  ast_test_validate(test, stasis_message_type_create("TestMessage2", NULL, &test_message_type2) == STASIS_MESSAGE_TYPE_SUCCESS);
1644  ast_test_validate(test, NULL != test_message_type2);
1645  ast_test_validate(test, stasis_message_type_create("TestMessage3", NULL, &test_message_type3) == STASIS_MESSAGE_TYPE_SUCCESS);
1646  ast_test_validate(test, NULL != test_message_type3);
1647 
 /* Unit under test: the pool variant of the message router. */
1648  uut = stasis_message_router_create_pool(topic);
1649  ast_test_validate(test, NULL != uut);
1650 
 /* Route registrations. Each successful registration is followed by an extra consumer reference, presumably held on behalf of the route - TODO confirm against consumer_exec. */
1652  uut, test_message_type1, consumer_exec, consumer1);
1653  ast_test_validate(test, 0 == ret);
1654  ao2_ref(consumer1, +1);
1656  uut, test_message_type2, consumer_exec, consumer2);
1657  ast_test_validate(test, 0 == ret);
1658  ao2_ref(consumer2, +1);
1659  ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
1660  ast_test_validate(test, 0 == ret);
1661  ao2_ref(consumer3, +1);
1662 
 /* All three messages share the same one-byte ao2 payload; only the message type differs. */
1663  test_data = ao2_alloc(1, NULL);
1664  ast_test_validate(test, NULL != test_data);
1665  test_message1 = stasis_message_create(test_message_type1, test_data);
1666  ast_test_validate(test, NULL != test_message1);
1667  test_message2 = stasis_message_create(test_message_type2, test_data);
1668  ast_test_validate(test, NULL != test_message2);
1669  test_message3 = stasis_message_create(test_message_type3, test_data);
1670  ast_test_validate(test, NULL != test_message3);
1671 
1672  stasis_publish(topic, test_message1);
1673  stasis_publish(topic, test_message2);
1674  stasis_publish(topic, test_message3);
1675 
1676  actual_len = consumer_wait_for(consumer1, 1);
1677  ast_test_validate(test, 1 == actual_len);
1678  actual_len = consumer_wait_for(consumer2, 1);
1679  ast_test_validate(test, 1 == actual_len);
1680  actual_len = consumer_wait_for(consumer3, 1);
1681  ast_test_validate(test, 1 == actual_len);
1682 
 /* Each consumer must hold exactly the message routed to it; type 3 has no explicit route and goes to the default. */
1683  actual = consumer1->messages_rxed[0];
1684  ast_test_validate(test, test_message1 == actual);
1685 
1686  actual = consumer2->messages_rxed[0];
1687  ast_test_validate(test, test_message2 == actual);
1688 
1689  actual = consumer3->messages_rxed[0];
1690  ast_test_validate(test, test_message3 == actual);
1691 
1692  /* consumer1 and consumer2 do not get the final message. */
 /* So the extra references taken above for consumer1 and consumer2 are dropped here by hand. */
1693  ao2_cleanup(consumer1);
1694  ao2_cleanup(consumer2);
1695 
1696  return AST_TEST_PASS;
1697 }
1698 
1699 static const char *cache_simple(struct stasis_message *message)
1700 {
1701  const char *type_name =
1703  if (!ast_begins_with(type_name, "Cache")) {
1704  return NULL;
1705  }
1706 
1707  return "cached";
1708 }
1709 
1710 AST_TEST_DEFINE(router_cache_updates)
1711 {
1712  RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1715  RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
1716  RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
1717  RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
1719  RAII_VAR(char *, test_data, NULL, ao2_cleanup);
1720  RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
1721  RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
1722  RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
1723  RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
1724  RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
1725  RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
1726  RAII_VAR(struct stasis_message *, message1, NULL, ao2_cleanup);
1727  RAII_VAR(struct stasis_message *, message2, NULL, ao2_cleanup);
1728  struct stasis_cache_update *update;
1729  int actual_len, ret;
1730  struct stasis_message *actual;
1731 
1732  switch (cmd) {
1733  case TEST_INIT:
1734  info->name = __func__;
1735  info->category = test_category;
1736  info->summary = "Test special handling cache_update messages";
1737  info->description = "Test special handling cache_update messages";
1738  return AST_TEST_NOT_RUN;
1739  case TEST_EXECUTE:
1740  break;
1741  }
1742 
1743  topic = stasis_topic_create("TestTopic");
1744  ast_test_validate(test, NULL != topic);
1745 
1747  ast_test_validate(test, NULL != cache);
1748  caching_topic = stasis_caching_topic_create(topic, cache);
1749  ast_test_validate(test, NULL != caching_topic);
1750 
1751  consumer1 = consumer_create(1);
1752  ast_test_validate(test, NULL != consumer1);
1753  consumer2 = consumer_create(1);
1754  ast_test_validate(test, NULL != consumer2);
1755  consumer3 = consumer_create(1);
1756  ast_test_validate(test, NULL != consumer3);
1757 
1758  ast_test_validate(test, stasis_message_type_create("Cache1", NULL, &test_message_type1) == STASIS_MESSAGE_TYPE_SUCCESS);
1759  ast_test_validate(test, NULL != test_message_type1);
1760  ast_test_validate(test, stasis_message_type_create("Cache2", NULL, &test_message_type2) == STASIS_MESSAGE_TYPE_SUCCESS);
1761  ast_test_validate(test, NULL != test_message_type2);
1762  ast_test_validate(test, stasis_message_type_create("NonCache", NULL, &test_message_type3) == STASIS_MESSAGE_TYPE_SUCCESS);
1763  ast_test_validate(test, NULL != test_message_type3);
1764 
1766  stasis_caching_get_topic(caching_topic));
1767  ast_test_validate(test, NULL != uut);
1768 
1770  uut, test_message_type1, consumer_exec, consumer1);
1771  ast_test_validate(test, 0 == ret);
1772  ao2_ref(consumer1, +1);
1774  uut, stasis_cache_update_type(), consumer_exec, consumer2);
1775  ast_test_validate(test, 0 == ret);
1776  ao2_ref(consumer2, +1);
1777  ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
1778  ast_test_validate(test, 0 == ret);
1779  ao2_ref(consumer3, +1);
1780 
1781  test_data = ao2_alloc(1, NULL);
1782  ast_test_validate(test, NULL != test_data);
1783  test_message1 = stasis_message_create(test_message_type1, test_data);
1784  ast_test_validate(test, NULL != test_message1);
1785  test_message2 = stasis_message_create(test_message_type2, test_data);
1786  ast_test_validate(test, NULL != test_message2);
1787  test_message3 = stasis_message_create(test_message_type3, test_data);
1788  ast_test_validate(test, NULL != test_message3);
1789 
1790  stasis_publish(topic, test_message1);
1791  stasis_publish(topic, test_message2);
1792  stasis_publish(topic, test_message3);
1793 
1794  actual_len = consumer_wait_for(consumer1, 1);
1795  ast_test_validate(test, 1 == actual_len);
1796  actual_len = consumer_wait_for(consumer2, 1);
1797  ast_test_validate(test, 1 == actual_len);
1798  /* Uncacheable message should not be passed through */
1799  actual_len = consumer_should_stay(consumer3, 0);
1800  ast_test_validate(test, 0 == actual_len);
1801 
1802  actual = consumer1->messages_rxed[0];
1803  ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual));
1804  update = stasis_message_data(actual);
1805  ast_test_validate(test, test_message_type1 == update->type);
1806  ast_test_validate(test, test_message1 == update->new_snapshot);
1807 
1808  actual = consumer2->messages_rxed[0];
1809  ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual));
1810  update = stasis_message_data(actual);
1811  ast_test_validate(test, test_message_type2 == update->type);
1812  ast_test_validate(test, test_message2 == update->new_snapshot);
1813 
1814  /* consumer1 and consumer2 do not get the final message. */
1815  ao2_cleanup(consumer1);
1816  ao2_cleanup(consumer2);
1817 
1818  return AST_TEST_PASS;
1819 }
1820 
1821 AST_TEST_DEFINE(no_to_json)
1822 {
1824  RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1825  RAII_VAR(char *, data, NULL, ao2_cleanup);
1826  RAII_VAR(struct ast_json *, actual, NULL, ast_json_unref);
1827  char *expected = "SomeData";
1828 
1829  switch (cmd) {
1830  case TEST_INIT:
1831  info->name = __func__;
1832  info->category = test_category;
1833  info->summary = "Test message to_json function";
1834  info->description = "Test message to_json function";
1835  return AST_TEST_NOT_RUN;
1836  case TEST_EXECUTE:
1837  break;
1838  }
1839 
1840  /* Test NULL */
1841  actual = stasis_message_to_json(NULL, NULL);
1842  ast_test_validate(test, NULL == actual);
1843 
1844  /* Test message with NULL to_json function */
1845  ast_test_validate(test, stasis_message_type_create("SomeMessage", NULL, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
1846 
1847  data = ao2_alloc(strlen(expected) + 1, NULL);
1848  strcpy(data, expected);
1850  ast_test_validate(test, NULL != uut);
1851 
1852  actual = stasis_message_to_json(uut, NULL);
1853  ast_test_validate(test, NULL == actual);
1854 
1855  return AST_TEST_PASS;
1856 }
1857 
1859 {
1861  RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1862  RAII_VAR(char *, data, NULL, ao2_cleanup);
1863  RAII_VAR(struct ast_json *, actual, NULL, ast_json_unref);
1864  const char *expected_text = "SomeData";
1865  RAII_VAR(struct ast_json *, expected, NULL, ast_json_unref);
1866 
1867  switch (cmd) {
1868  case TEST_INIT:
1869  info->name = __func__;
1870  info->category = test_category;
1871  info->summary = "Test message to_json function when NULL";
1872  info->description = "Test message to_json function when NULL";
1873  return AST_TEST_NOT_RUN;
1874  case TEST_EXECUTE:
1875  break;
1876  }
1877 
1878  ast_test_validate(test, stasis_message_type_create("SomeMessage", &fake_vtable, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
1879 
1880  data = ao2_alloc(strlen(expected_text) + 1, NULL);
1881  strcpy(data, expected_text);
1883  ast_test_validate(test, NULL != uut);
1884 
1885  expected = ast_json_string_create(expected_text);
1886  actual = stasis_message_to_json(uut, NULL);
1887  ast_test_validate(test, ast_json_equal(expected, actual));
1888 
1889  return AST_TEST_PASS;
1890 }
1891 
1893 {
1895  RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1896  RAII_VAR(char *, data, NULL, ao2_cleanup);
1897  RAII_VAR(struct ast_manager_event_blob *, actual, NULL, ao2_cleanup);
1898  char *expected = "SomeData";
1899 
1900  switch (cmd) {
1901  case TEST_INIT:
1902  info->name = __func__;
1903  info->category = test_category;
1904  info->summary = "Test message to_ami function when NULL";
1905  info->description = "Test message to_ami function when NULL";
1906  return AST_TEST_NOT_RUN;
1907  case TEST_EXECUTE:
1908  break;
1909  }
1910 
1911  /* Test NULL */
1912  actual = stasis_message_to_ami(NULL);
1913  ast_test_validate(test, NULL == actual);
1914 
1915  /* Test message with NULL to_ami function */
1916  ast_test_validate(test, stasis_message_type_create("SomeMessage", NULL, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
1917 
1918  data = ao2_alloc(strlen(expected) + 1, NULL);
1919  strcpy(data, expected);
1921  ast_test_validate(test, NULL != uut);
1922 
1923  actual = stasis_message_to_ami(uut);
1924  ast_test_validate(test, NULL == actual);
1925 
1926  return AST_TEST_PASS;
1927 }
1928 
1930 {
1932  RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1933  RAII_VAR(char *, data, NULL, ao2_cleanup);
1934  RAII_VAR(struct ast_manager_event_blob *, actual, NULL, ao2_cleanup);
1935  const char *expected_text = "SomeData";
1936  const char *expected = "Message: SomeData\r\n";
1937 
1938  switch (cmd) {
1939  case TEST_INIT:
1940  info->name = __func__;
1941  info->category = test_category;
1942  info->summary = "Test message to_ami function";
1943  info->description = "Test message to_ami function";
1944  return AST_TEST_NOT_RUN;
1945  case TEST_EXECUTE:
1946  break;
1947  }
1948 
1949  ast_test_validate(test, stasis_message_type_create("SomeMessage", &fake_vtable, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
1950 
1951  data = ao2_alloc(strlen(expected_text) + 1, NULL);
1952  strcpy(data, expected_text);
1954  ast_test_validate(test, NULL != uut);
1955 
1956  actual = stasis_message_to_ami(uut);
1957  ast_test_validate(test, strcmp(expected, actual->extra_fields) == 0);
1958 
1959  return AST_TEST_PASS;
1960 }
1961 
/*! \brief Subscription callback that deliberately ignores every message it is handed. */
static void noop(void *data, struct stasis_subscription *sub,
	struct stasis_message *message)
{
	/* Intentionally empty: the tests using this only need a valid callback. */
	return;
}
1967 
1968 AST_TEST_DEFINE(dtor_order)
1969 {
1970  RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1972 
1973  switch (cmd) {
1974  case TEST_INIT:
1975  info->name = __func__;
1976  info->category = test_category;
1977  info->summary = "Test that destruction order doesn't bomb stuff";
1978  info->description = "Test that destruction order doesn't bomb stuff";
1979  return AST_TEST_NOT_RUN;
1980  case TEST_EXECUTE:
1981  break;
1982  }
1983 
1984  topic = stasis_topic_create("test-topic");
1985  ast_test_validate(test, NULL != topic);
1986 
1987  sub = stasis_subscribe(topic, noop, NULL);
1988  ast_test_validate(test, NULL != sub);
1989 
1990  /* With any luck, this won't completely blow everything up */
1991  ao2_cleanup(topic);
1993 
1994  /* These refs were cleaned up manually */
1995  topic = NULL;
1996  sub = NULL;
1997 
1998  return AST_TEST_PASS;
1999 }
2000 
2001 static const char *noop_get_id(struct stasis_message *message)
2002 {
2003  return NULL;
2004 }
2005 
2006 AST_TEST_DEFINE(caching_dtor_order)
2007 {
2008  RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
2010  RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL,
2013 
2014  switch (cmd) {
2015  case TEST_INIT:
2016  info->name = __func__;
2017  info->category = test_category;
2018  info->summary = "Test that destruction order doesn't bomb stuff";
2019  info->description = "Test that destruction order doesn't bomb stuff";
2020  return AST_TEST_NOT_RUN;
2021  case TEST_EXECUTE:
2022  break;
2023  }
2024 
2026  ast_test_validate(test, NULL != cache);
2027 
2028  topic = stasis_topic_create("test-topic");
2029  ast_test_validate(test, NULL != topic);
2030 
2031  caching_topic = stasis_caching_topic_create(topic, cache);
2032  ast_test_validate(test, NULL != caching_topic);
2033 
2035  NULL);
2036  ast_test_validate(test, NULL != sub);
2037 
2038  /* With any luck, this won't completely blow everything up */
2039  ao2_cleanup(cache);
2040  ao2_cleanup(topic);
2041  stasis_caching_unsubscribe(caching_topic);
2043 
2044  /* These refs were cleaned up manually */
2045  cache = NULL;
2046  topic = NULL;
2047  caching_topic = NULL;
2048  sub = NULL;
2049 
2050  return AST_TEST_PASS;
2051 }
2052 
/*!
 * \brief Message types shared by the filtering tests.
 *
 * Every member except \c change is created by create_message_types() and
 * released by destroy_message_types().
 */
struct test_message_types {
	/*! Created with a vtable that has no formatters set */
	struct stasis_message_type *none;
	/*! Created with only to_ami set */
	struct stasis_message_type *ami;
	/*! Created with only to_json set */
	struct stasis_message_type *json;
	/*! Created with only to_event set */
	struct stasis_message_type *event;
	/*! Created with to_ami and to_event set */
	struct stasis_message_type *amievent;
	/*! Created with a NULL vtable */
	struct stasis_message_type *type1;
	/*! Created with a NULL vtable */
	struct stasis_message_type *type2;
	/*! Created with a NULL vtable */
	struct stasis_message_type *type3;
	/*! Not released by destroy_message_types() */
	struct stasis_message_type *change;
};
2064 
2065 static void destroy_message_types(void *obj)
2066 {
2067  struct test_message_types *types = obj;
2068 
2069  ao2_cleanup(types->none);
2070  ao2_cleanup(types->ami);
2071  ao2_cleanup(types->json);
2072  ao2_cleanup(types->event);
2073  ao2_cleanup(types->amievent);
2074  ao2_cleanup(types->type1);
2075  ao2_cleanup(types->type2);
2076  ao2_cleanup(types->type3);
2077  /* N.B. Don't cleanup types->change! */
2078 }
2079 
2080 static struct test_message_types *create_message_types(struct ast_test *test)
2081 {
2082  struct stasis_message_vtable vtable = { 0 };
2083  struct test_message_types *types;
2084  enum ast_test_result_state __attribute__ ((unused)) rc;
2085 
2086  types = ao2_alloc(sizeof(*types), destroy_message_types);
2087  if (!types) {
2088  return NULL;
2089  }
2090 
2091  ast_test_validate_cleanup(test,
2092  stasis_message_type_create("TestMessageNONE", &vtable, &types->none) == STASIS_MESSAGE_TYPE_SUCCESS,
2093  rc, cleanup);
2094 
2095  vtable.to_ami = fake_ami;
2096  ast_test_validate_cleanup(test,
2097  stasis_message_type_create("TestMessageAMI", &vtable, &types->ami) == STASIS_MESSAGE_TYPE_SUCCESS,
2098  rc, cleanup);
2099 
2100  vtable.to_ami = NULL;
2101  vtable.to_json = fake_json;
2102  ast_test_validate_cleanup(test,
2103  stasis_message_type_create("TestMessageJSON", &vtable, &types->json) == STASIS_MESSAGE_TYPE_SUCCESS,
2104  rc, cleanup);
2105 
2106  vtable.to_ami = NULL;
2107  vtable.to_json = NULL;
2108  vtable.to_event = fake_event;
2109  ast_test_validate_cleanup(test,
2110  stasis_message_type_create("TestMessageEVENT", &vtable, &types->event) == STASIS_MESSAGE_TYPE_SUCCESS,
2111  rc, cleanup);
2112 
2113  vtable.to_ami = fake_ami;
2114  ast_test_validate_cleanup(test,
2115  stasis_message_type_create("TestMessageAMIEVENT", &vtable, &types->amievent) == STASIS_MESSAGE_TYPE_SUCCESS,
2116  rc, cleanup);
2117 
2118  ast_test_validate_cleanup(test,
2119  stasis_message_type_create("TestMessageType1", NULL, &types->type1) == STASIS_MESSAGE_TYPE_SUCCESS,
2120  rc, cleanup);
2121 
2122  ast_test_validate_cleanup(test,
2123  stasis_message_type_create("TestMessageType2", NULL, &types->type2) == STASIS_MESSAGE_TYPE_SUCCESS,
2124  rc, cleanup);
2125 
2126  ast_test_validate_cleanup(test,
2127  stasis_message_type_create("TestMessageType3", NULL, &types->type3) == STASIS_MESSAGE_TYPE_SUCCESS,
2128  rc, cleanup);
2129 
2131 
2132  return types;
2133 
2134 cleanup:
2135  ao2_cleanup(types);
2136  return NULL;
2137 }
2138 
/*!
 * \brief Fixture bundling a consumer, the topic it listens on and its subscription.
 *
 * Built by create_cts() and torn down by destroy_cts().
 */
struct cts {
	/*! Consumer passed to the subscription as callback data */
	struct consumer *consumer;
	/*! Topic the test publishes to */
	struct stasis_topic *topic;
	/*! Subscription on \c topic; set to NULL by tests that unsubscribe early */
	struct stasis_subscription *sub;
};
2144 
2145 static void destroy_cts(void *obj)
2146 {
2147  struct cts *c = obj;
2148 
2149  stasis_unsubscribe(c->sub);
2150  ao2_cleanup(c->topic);
2151  ao2_cleanup(c->consumer);
2152 }
2153 
2154 static struct cts *create_cts(struct ast_test *test)
2155 {
2156  struct cts *cts = ao2_alloc(sizeof(*cts), destroy_cts);
2157  enum ast_test_result_state __attribute__ ((unused)) rc;
2158 
2159  ast_test_validate_cleanup(test, cts, rc, cleanup);
2160 
2161  cts->topic = stasis_topic_create("TestTopic");
2162  ast_test_validate_cleanup(test, NULL != cts->topic, rc, cleanup);
2163 
2164  cts->consumer = consumer_create(0);
2165  ast_test_validate_cleanup(test, NULL != cts->consumer, rc, cleanup);
2166 
2167  ao2_ref(cts->consumer, +1);
2168  cts->sub = stasis_subscribe(cts->topic, consumer_exec, cts->consumer);
2169  ast_test_validate_cleanup(test, NULL != cts->sub, rc, cleanup);
2170 
2171  return cts;
2172 
2173 cleanup:
2174  ao2_cleanup(cts);
2175  return NULL;
2176 }
2177 
2178 static int is_msg(struct stasis_message *msg, struct stasis_message_type *mtype, const char *data)
2179 {
2181 
2182  if (stasis_message_type(msg) != mtype) {
2183  return 0;
2184  }
2185 
2186  if (data) {
2187  return (strcmp(data, msg_data->description) == 0);
2188  }
2189 
2190  return 1;
2191 }
2192 
2193 static void dump_consumer(struct ast_test *test, struct cts *cts)
2194 {
2195  int i;
2196  struct stasis_subscription_change *data;
2197 
2198  ast_test_status_update(test, "Messages received: %ld Final? %s\n", cts->consumer->messages_rxed_len,
2199  cts->consumer->complete ? "yes" : "no");
2200  for (i = 0; i < cts->consumer->messages_rxed_len; i++) {
2201  data = stasis_message_data(cts->consumer->messages_rxed[i]);
2202  ast_test_status_update(test, "Message type received: %s %s\n",
2204  data && !ast_strlen_zero(data->description) ? data->description : "no data");
2205  }
2206 }
2207 
2208 static int send_msg(struct ast_test *test, struct cts *cts, struct stasis_message_type *msg_type,
2209  const char *data)
2210 {
2211  struct stasis_message *msg;
2213  ao2_alloc(sizeof(*test_data) + (data ? strlen(data) : strlen("no data")) + 1, NULL);
2214 
2215  if (!test_data) {
2216  return 0;
2217  }
2218  strcpy(test_data->description, S_OR(data, "no data")); /* Safe */
2219 
2220  msg = stasis_message_create(msg_type, test_data);
2221  ao2_ref(test_data, -1);
2222  if (!msg) {
2223  ast_test_status_update(test, "Unable to create %s message\n",
2224  stasis_message_type_name(msg_type));
2225  return 0;
2226  }
2227 
2228  stasis_publish(cts->topic, msg);
2229  ao2_ref(msg, -1);
2230 
2231  return 1;
2232 }
2233 
/*
 * Test message filtering by message type on a single subscription.
 *
 * The subscription is told to accept type1, type2 and the change type,
 * messages of type1/type2/type3 are published, type1 is then declined and
 * another round is published. The test passes when the consumer is complete
 * and holds exactly five messages in this order: change "Subscribe",
 * type1 "Pass", type2 "Pass", type2 "Pass2", change "Unsubscribe".
 *
 * NOTE(review): the embedded listing numbers skip 2260, 2269, 2282, 2284,
 * 2286 and 2288, so the statements on those lines are absent from this copy
 * (the surviving comments suggest waits on the consumer and an unsubscribe);
 * restore them from the upstream file before building.
 */
2234 AST_TEST_DEFINE(type_filters)
2235 {
2236  RAII_VAR(struct cts *, cts, NULL, ao2_cleanup);
2237  RAII_VAR(struct test_message_types *, types, NULL, ao2_cleanup);
2238  int ix = 0;
2239 
2240  switch (cmd) {
2241  case TEST_INIT:
2242  info->name = __func__;
2243  info->category = test_category "filtering/";
2244  info->summary = "Test message filtering by type";
2245  info->description = "Test message filtering by type";
2246  return AST_TEST_NOT_RUN;
2247  case TEST_EXECUTE:
2248  break;
2249  }
2250 
2251  types = create_message_types(test);
2252  ast_test_validate(test, NULL != types);
2253 
2254  cts = create_cts(test);
2255  ast_test_validate(test, NULL != cts);
2256 
2257  ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type1) == 0);
2258  ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type2) == 0);
2259  ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->change) == 0);
2261 
2262  /* We should get these */
2263  ast_test_validate(test, send_msg(test, cts, types->type1, "Pass"));
2264  ast_test_validate(test, send_msg(test, cts, types->type2, "Pass"));
2265  /* ... but not this one */
2266  ast_test_validate(test, send_msg(test, cts, types->type3, "FAIL"));
2267 
2268  /* Wait for change(subscribe) and "Pass" messages */
2270 
2271  /* Remove type 1 */
2272  ast_test_validate(test, stasis_subscription_decline_message_type(cts->sub, types->type1) == 0);
2273 
2274  /* We should now NOT get this one */
2275  ast_test_validate(test, send_msg(test, cts, types->type1, "FAIL"));
2276  /* We should get this one (again) */
2277  ast_test_validate(test, send_msg(test, cts, types->type2, "Pass2"));
2278  /* We still should NOT get this one */
2279  ast_test_validate(test, send_msg(test, cts, types->type3, "FAIL"));
2280 
2281  /* We should now have a second type2 */
2283 
2285  cts->sub = NULL;
2287 
2289 
2290  ast_test_validate(test, 1 == cts->consumer->complete);
2291  ast_test_validate(test, 5 == cts->consumer->messages_rxed_len);
2292  ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Subscribe"));
2293  ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type1, "Pass"));
2294  ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type2, "Pass"));
2295  ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type2, "Pass2"));
2296  ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Unsubscribe"));
2297 
2298  return AST_TEST_PASS;
2299 }
2300 
/*
 * Test message filtering by the formatters a message type provides.
 *
 * In the first round ami, json and amievent messages are expected to arrive
 * while none, event and type1 messages are not. After the subscription is
 * changed (per the surviving comment, to accept only event formatters), ami
 * and json messages are expected to be dropped while amievent and event
 * messages arrive. The test passes when the consumer is complete and holds
 * exactly seven messages in the order checked at the end.
 *
 * NOTE(review): the embedded listing numbers skip 2324-2325, 2338, 2341,
 * 2352, 2354, 2356 and 2358, so the statements that configure the filter,
 * wait on the consumer and unsubscribe are absent from this copy; restore
 * them from the upstream file before building.
 */
2301 AST_TEST_DEFINE(formatter_filters)
2302 {
2303  RAII_VAR(struct cts *, cts, NULL, ao2_cleanup);
2304  RAII_VAR(struct test_message_types *, types, NULL, ao2_cleanup) ;
2305  int ix = 0;
2306 
2307  switch (cmd) {
2308  case TEST_INIT:
2309  info->name = __func__;
2310  info->category = test_category "filtering/";
2311  info->summary = "Test message filtering by formatter";
2312  info->description = "Test message filtering by formatter";
2313  return AST_TEST_NOT_RUN;
2314  case TEST_EXECUTE:
2315  break;
2316  }
2317 
2318  types = create_message_types(test);
2319  ast_test_validate(test, NULL != types);
2320 
2321  cts = create_cts(test);
2322  ast_test_validate(test, NULL != cts);
2323 
2326 
2327  /* We should get these */
2328  ast_test_validate(test, send_msg(test, cts, types->ami, "Pass"));
2329  ast_test_validate(test, send_msg(test, cts, types->json, "Pass"));
2330  ast_test_validate(test, send_msg(test, cts, types->amievent, "Pass"));
2331 
2332  /* ... but not these */
2333  ast_test_validate(test, send_msg(test, cts, types->none, "FAIL"));
2334  ast_test_validate(test, send_msg(test, cts, types->event, "FAIL"));
2335  ast_test_validate(test, send_msg(test, cts, types->type1, "FAIL"));
2336 
2337  /* Wait for change(subscribe) and the "Pass" messages */
2339 
2340  /* Change the subscription to accept only event formatters */
2342 
2343  /* We should NOT get these now */
2344  ast_test_validate(test, send_msg(test, cts, types->ami, "FAIL"));
2345  ast_test_validate(test, send_msg(test, cts, types->json, "FAIL"));
2346  /* ... but we should still get this one */
2347  ast_test_validate(test, send_msg(test, cts, types->amievent, "Pass2"));
2348  /* ... and this one should be new */
2349  ast_test_validate(test, send_msg(test, cts, types->event, "Pass"));
2350 
2351  /* We should now have a second amievent */
2353 
2355  cts->sub = NULL;
2357 
2359 
2360  ast_test_validate(test, 1 == cts->consumer->complete);
2361  ast_test_validate(test, 7 == cts->consumer->messages_rxed_len);
2362  ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Subscribe"));
2363  ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->ami, "Pass"));
2364  ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->json, "Pass"));
2365  ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->amievent, "Pass"));
2366  ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->amievent, "Pass2"));
2367  ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->event, "Pass"));
2368  ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Unsubscribe"));
2369 
2370  return AST_TEST_PASS;
2371 }
2372 
/*
 * Test message filtering by message type and by formatter at the same time.
 *
 * The subscription is told to accept type1, type2 and the change type;
 * type1, type2, ami, amievent and json messages are then expected to arrive
 * while type3 and event messages are not. The test passes when the consumer
 * is complete and holds exactly seven messages in the order checked at the
 * end.
 *
 * NOTE(review): the embedded listing numbers skip 2399-2401, 2415, 2417,
 * 2419 and 2421, so the remaining filter setup, the wait on the consumer and
 * the unsubscribe are absent from this copy; restore them from the upstream
 * file before building.
 */
2373 AST_TEST_DEFINE(combo_filters)
2374 {
2375  RAII_VAR(struct cts *, cts, NULL, ao2_cleanup);
2376  RAII_VAR(struct test_message_types *, types, NULL, ao2_cleanup);
2377  int ix = 0;
2378 
2379  switch (cmd) {
2380  case TEST_INIT:
2381  info->name = __func__;
2382  info->category = test_category "filtering/";
2383  info->summary = "Test message filtering by type and formatter";
2384  info->description = "Test message filtering by type and formatter";
2385  return AST_TEST_NOT_RUN;
2386  case TEST_EXECUTE:
2387  break;
2388  }
2389 
2390  types = create_message_types(test);
2391  ast_test_validate(test, NULL != types);
2392 
2393  cts = create_cts(test);
2394  ast_test_validate(test, NULL != cts);
2395 
2396  ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type1) == 0);
2397  ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type2) == 0);
2398  ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->change) == 0);
2402 
2403  /* We should get these */
2404  ast_test_validate(test, send_msg(test, cts, types->type1, "Pass"));
2405  ast_test_validate(test, send_msg(test, cts, types->type2, "Pass"));
2406  ast_test_validate(test, send_msg(test, cts, types->ami, "Pass"));
2407  ast_test_validate(test, send_msg(test, cts, types->amievent, "Pass"));
2408  ast_test_validate(test, send_msg(test, cts, types->json, "Pass"));
2409 
2410  /* ... but not these */
2411  ast_test_validate(test, send_msg(test, cts, types->type3, "FAIL"));
2412  ast_test_validate(test, send_msg(test, cts, types->event, "FAIL"));
2413 
2414  /* Wait for change(subscribe) and the "Pass" messages */
2416 
2418  cts->sub = NULL;
2420 
2422 
2423  ast_test_validate(test, 1 == cts->consumer->complete);
2424  ast_test_validate(test, 7 == cts->consumer->messages_rxed_len);
2425  ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Subscribe"));
2426  ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type1, "Pass"));
2427  ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type2, "Pass"));
2428  ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->ami, "Pass"));
2429  ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->amievent, "Pass"));
2430  ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->json, "Pass"));
2431  ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Unsubscribe"));
2432 
2433  return AST_TEST_PASS;
2434 }
2435 
/*
 * Module unload hook: unregisters each test listed below and returns 0.
 *
 * NOTE(review): the embedded listing numbers skip 2439, 2442, 2448, 2451
 * and 2459, so five AST_TEST_UNREGISTER() lines are absent from this copy;
 * restore them from the upstream file so every registered test is also
 * unregistered.
 */
2436 static int unload_module(void)
2437 {
2438  AST_TEST_UNREGISTER(message_type);
2440  AST_TEST_UNREGISTER(subscription_messages);
2441  AST_TEST_UNREGISTER(subscription_pool_messages);
2443  AST_TEST_UNREGISTER(publish_sync);
2444  AST_TEST_UNREGISTER(publish_pool);
2445  AST_TEST_UNREGISTER(unsubscribe_stops_messages);
2446  AST_TEST_UNREGISTER(forward);
2447  AST_TEST_UNREGISTER(cache_filter);
2449  AST_TEST_UNREGISTER(cache_dump);
2450  AST_TEST_UNREGISTER(cache_eid_aggregate);
2452  AST_TEST_UNREGISTER(router_pool);
2453  AST_TEST_UNREGISTER(router_cache_updates);
2454  AST_TEST_UNREGISTER(interleaving);
2455  AST_TEST_UNREGISTER(subscription_interleaving);
2456  AST_TEST_UNREGISTER(no_to_json);
2457  AST_TEST_UNREGISTER(to_json);
2458  AST_TEST_UNREGISTER(no_to_ami);
2460  AST_TEST_UNREGISTER(dtor_order);
2461  AST_TEST_UNREGISTER(caching_dtor_order);
2462  AST_TEST_UNREGISTER(type_filters);
2463  AST_TEST_UNREGISTER(formatter_filters);
2464  AST_TEST_UNREGISTER(combo_filters);
2465  return 0;
2466 }
2467 
/*
 * Module load hook: registers each test listed below and returns
 * AST_MODULE_LOAD_SUCCESS.
 *
 * NOTE(review): the embedded listing numbers skip 2471, 2474, 2480, 2483
 * and 2491, so five AST_TEST_REGISTER() lines are absent from this copy;
 * restore them from the upstream file, otherwise those tests never run.
 */
2468 static int load_module(void)
2469 {
2470  AST_TEST_REGISTER(message_type);
2472  AST_TEST_REGISTER(subscription_messages);
2473  AST_TEST_REGISTER(subscription_pool_messages);
2475  AST_TEST_REGISTER(publish_sync);
2476  AST_TEST_REGISTER(publish_pool);
2477  AST_TEST_REGISTER(unsubscribe_stops_messages);
2478  AST_TEST_REGISTER(forward);
2479  AST_TEST_REGISTER(cache_filter);
2481  AST_TEST_REGISTER(cache_dump);
2482  AST_TEST_REGISTER(cache_eid_aggregate);
2484  AST_TEST_REGISTER(router_pool);
2485  AST_TEST_REGISTER(router_cache_updates);
2486  AST_TEST_REGISTER(interleaving);
2487  AST_TEST_REGISTER(subscription_interleaving);
2488  AST_TEST_REGISTER(no_to_json);
2489  AST_TEST_REGISTER(to_json);
2490  AST_TEST_REGISTER(no_to_ami);
2492  AST_TEST_REGISTER(dtor_order);
2493  AST_TEST_REGISTER(caching_dtor_order);
2494  AST_TEST_REGISTER(type_filters);
2495  AST_TEST_REGISTER(formatter_filters);
2496  AST_TEST_REGISTER(combo_filters);
2497  return AST_MODULE_LOAD_SUCCESS;
2498 }
2499 
2500 AST_MODULE_INFO(ASTERISK_GPL_KEY, 0, "Stasis testing",
2501  .support_level = AST_MODULE_SUPPORT_CORE,
2502  .load = load_module,
2503  .unload = unload_module
2504 );
struct ao2_container * stasis_cache_dump_all(struct stasis_cache *cache, struct stasis_message_type *type)
Dump all entity items from the cache to a subscription.
Definition: stasis_cache.c:757
static const char type[]
Definition: chan_ooh323.c:109
Struct containing info for an AMI event to send out.
Definition: manager.h:491
struct stasis_topic * topic
Definition: test_stasis.c:2141
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.
An event.
Definition: event.c:81
ast_cond_t out
static void consumer_dtor(void *obj)
Definition: test_stasis.c:170
Asterisk main include file. File version handling, generic pbx functions.
#define ast_realloc(p, len)
A wrapper for realloc()
Definition: astmm.h:228
#define stasis_message_router_create_pool(topic)
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
struct stasis_message * old_snapshot
Old value from the cache.
Definition: stasis.h:971
static struct cts * create_cts(struct ast_test *test)
Definition: test_stasis.c:2154
Virtual table providing methods for messages.
Definition: stasis.h:239
AST_TEST_DEFINE(message_type)
Definition: test_stasis.c:77
struct ast_eid eid
static void destroy_message_types(void *obj)
Definition: test_stasis.c:2065
static void update(int code_size, int y, int wi, int fi, int dq, int sr, int dqsez, struct g726_state *state_ptr)
Definition: codec_g726.c:367
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition: json.c:73
struct stasis_message_type * change
Definition: test_stasis.c:2062
struct stasis_message_type * event
Definition: test_stasis.c:2057
int ignore_subscriptions
Definition: test_stasis.c:166
static int consumer_should_stay(struct consumer *consumer, size_t expected_len)
Definition: test_stasis.c:288
int stasis_message_router_add(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
Add a route to a message router.
Stasis Message Bus API. See Stasis Message Bus API for detailed documentation.
struct stasis_caching_topic * stasis_caching_topic_create(struct stasis_topic *original_topic, struct stasis_cache *cache)
Create a topic which monitors and caches messages from another topic.
Definition: stasis_cache.c:948
static int consumer_wait_for_completion(struct consumer *consumer)
Definition: test_stasis.c:267
struct stasis_message_type * type3
Definition: test_stasis.c:2061
void stasis_message_router_unsubscribe_and_join(struct stasis_message_router *router)
Unsubscribe the router from the upstream topic, blocking until the final message has been processed...
static struct ast_event * fake_event(struct stasis_message *message)
Definition: test_stasis.c:43
Test Framework API.
int complete
Definition: test_stasis.c:167
struct stasis_message ** messages_rxed
Definition: test_stasis.c:164
#define AST_TEST_REGISTER(cb)
Definition: test.h:127
Description Used by: AST_EVENT_SUB, AST_EVENT_UNSUB Payload type: STR.
Definition: event_defs.h:265
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
Definition: stasis.c:1078
int stasis_subscription_decline_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are not interested in a message type.
Definition: stasis.c:1054
int stasis_message_router_add_cache_update(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
Add a route for stasis_cache_update messages to a message router.
enum stasis_message_type_result stasis_message_type_create(const char *name, struct stasis_message_vtable *vtable, struct stasis_message_type **result)
Create a new message type.
#define ast_cond_init(cond, attr)
Definition: lock.h:199
struct stasis_message * stasis_cache_clear_create(struct stasis_message *message)
A message which instructs the caching topic to remove an entry from its cache.
Definition: stasis_cache.c:778
struct stasis_message_type * type2
Definition: test_stasis.c:2060
struct stasis_subscription * sub
Definition: test_stasis.c:2142
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
#define EVENT_FLAG_TEST
Definition: manager.h:88
struct stasis_message * stasis_cache_entry_get_aggregate(struct stasis_cache_entry *entry)
Get the aggregate cache entry snapshot.
Definition: stasis_cache.c:365
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:150
#define ast_assert(a)
Definition: utils.h:650
struct stasis_message * stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid)
Retrieve an item from the cache for a specific entity.
Definition: stasis_cache.c:659
static struct stasis_message_vtable fake_vtable
Definition: test_stasis.c:72
struct stasis_message_type * type1
Definition: test_stasis.c:2059
static struct test_val c
char * text
Definition: app_queue.c:1508
static int is_msg(struct stasis_message *msg, struct stasis_message_type *mtype, const char *data)
Definition: test_stasis.c:2178
#define ast_strdup(str)
A wrapper for strdup()
Definition: astmm.h:243
static struct consumer * consumer_create(int ignore_subscriptions)
Definition: test_stasis.c:183
struct ao2_container * stasis_cache_dump(struct stasis_cache *cache, struct stasis_message_type *type)
Dump cached items to a subscription for the ast_eid_default entity.
Definition: stasis_cache.c:736
#define NULL
Definition: resample.c:96
struct ast_event *(* to_event)(struct stasis_message *message)
Build the ast_event representation of the message.
Definition: stasis.h:281
int value
Definition: syslog.c:37
#define ast_cond_signal(cond)
Definition: lock.h:201
An Entity ID is essentially a MAC address, brief and unique.
Definition: utils.h:726
struct stasis_caching_topic * stasis_caching_unsubscribe_and_join(struct stasis_caching_topic *caching_topic)
Unsubscribes a caching topic from its upstream topic, blocking until all messages have been forwarded...
Definition: stasis_cache.c:146
struct ast_manager_event_blob * ast_manager_event_blob_create(int event_flags, const char *manager_event, const char *extra_fields_fmt,...)
Construct a ast_manager_event_blob.
Definition: manager.c:9727
Structure containing callbacks for Stasis message sanitization.
Definition: stasis.h:200
struct ast_json * stasis_message_to_json(struct stasis_message *msg, struct stasis_message_sanitizer *sanitize)
Build the JSON representation of the message.
int ast_eid_cmp(const struct ast_eid *eid1, const struct ast_eid *eid2)
Compare two EIDs.
Definition: main/utils.c:2713
unsigned char publish
Definition: res_corosync.c:241
pthread_cond_t ast_cond_t
Definition: lock.h:176
struct stasis_caching_topic * stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
Unsubscribes a caching topic from its upstream topic.
Definition: stasis_cache.c:119
struct ast_manager_event_blob * stasis_message_to_ami(struct stasis_message *msg)
Build the AMI representation of the message.
#define ao2_bump(obj)
Definition: astobj2.h:491
static struct stasis_message * cache_test_message_create(struct stasis_message_type *type, const char *name, const char *value)
Definition: test_stasis.c:896
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
Definition: astobj2.c:476
struct stasis_message_type * amievent
Definition: test_stasis.c:2058
struct ast_json *(* to_json)(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
Build the JSON representation of the message.
Definition: stasis.h:253
static int send_msg(struct ast_test *test, struct cts *cts, struct stasis_message_type *msg_type, const char *data)
Definition: test_stasis.c:2208
#define ast_log
Definition: astobj2.c:42
struct consumer * consumer
Definition: test_stasis.c:2140
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:851
#define ast_test_status_update(a, b, c...)
Definition: test.h:129
struct ast_json * ast_json_string_create(const char *value)
Construct a JSON string from value.
Definition: json.c:268
const struct ast_eid * stasis_message_eid(const struct stasis_message *msg)
Get the entity id for a stasis_message.
ast_mutex_t lock
Definition: app_meetme.c:1091
const struct timeval * stasis_message_timestamp(const struct stasis_message *msg)
Get the time when a message was created.
#define ao2_ref(o, delta)
Definition: astobj2.h:464
struct stasis_message_type * stasis_cache_update_type(void)
Message type for cache update messages.
struct stasis_message_type * ami
Definition: test_stasis.c:2055
struct stasis_cache * stasis_cache_create_full(snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_publish_fn aggregate_publish_fn)
Create a cache.
Definition: stasis_cache.c:334
#define ast_malloc(len)
A wrapper for malloc()
Definition: astmm.h:193
struct stasis_message_type * none
Definition: test_stasis.c:2054
static struct test_message_types * create_message_types(struct ast_test *test)
Definition: test_stasis.c:2080
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition: stasis.c:617
static struct stasis_message_router * router
struct ao2_container * stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
Dump cached items to a subscription for a specific entity.
Definition: stasis_cache.c:718
Cache update message.
Definition: stasis.h:967
void stasis_subscription_accept_formatters(struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters)
Indicate to a subscription that we are interested in messages with one or more formatters.
Definition: stasis.c:1094
#define stasis_message_router_create(topic)
static struct stasis_message * cache_test_message_create_full(struct stasis_message_type *type, const char *name, const char *value, struct ast_eid *eid)
Definition: test_stasis.c:875
#define stasis_subscribe(topic, callback, data)
Definition: stasis.h:652
struct stasis_topic * topic
Definition: stasis.h:893
struct stasis_message * new_snapshot
New value.
Definition: stasis.h:973
static int consumer_wait_for(struct consumer *consumer, size_t expected_len)
Definition: test_stasis.c:246
#define LOG_ERROR
Definition: logger.h:285
struct stasis_message * stasis_cache_entry_get_remote(struct stasis_cache_entry *entry, int idx)
Get a remote entity's cache entry snapshot by index.
Definition: stasis_cache.c:375
const char * mtype
Definition: http.c:148
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
#define AST_TEST_UNREGISTER(cb)
Definition: test.h:128
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
def info(msg)
struct stasis_message_type * type
Convenience reference to snapshot type.
Definition: stasis.h:969
#define stasis_subscribe_pool(topic, callback, data)
Definition: stasis.h:682
struct timeval ast_tvadd(struct timeval a, struct timeval b)
Returns the sum of two timevals a + b.
Definition: extconf.c:2283
Sorcery object created based on backend data.
static const char * cache_simple(struct stasis_message *message)
Definition: test_stasis.c:1699
#define ao2_iterator_next(iter)
Definition: astobj2.h:1933
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *subscription)
Cancel a subscription.
Definition: stasis.c:972
#define ast_cond_destroy(cond)
Definition: lock.h:200
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:411
struct ast_manager_event_blob *(* to_ami)(struct stasis_message *message)
Build the AMI representation of the message.
Definition: stasis.h:266
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
Definition: stasis.c:1175
#define ast_strlen_zero(a)
Definition: muted.c:73
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
Definition: stasis.c:1510
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:602
static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Definition: test_stasis.c:204
static const char name[]
Definition: cdr_mysql.c:74
#define ast_free(a)
Definition: astmm.h:182
static void to_ami(struct ast_sip_subscription *sub, struct ast_str **buf)
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
Definition: stasis.c:1547
Holds details about changes to subscriptions for the specified topic.
Definition: stasis.h:892
static int check_cache_aggregate(struct stasis_cache *cache, struct stasis_message_type *cache_type, const char *id, const char *value)
Definition: test_stasis.c:970
int stasis_message_router_set_default(struct stasis_message_router *router, stasis_subscription_cb callback, void *data)
Sets the default route of a router.
static void cache_test_data_dtor(void *obj)
Definition: test_stasis.c:867
static struct stasis_message * cache_test_aggregate_calc_fn(struct stasis_cache_entry *entry, struct stasis_message *new_snapshot)
Definition: test_stasis.c:911
static void noop(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Definition: test_stasis.c:1962
static struct ast_json * fake_json(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
Definition: test_stasis.c:49
static void * cleanup(void *unused)
Definition: pbx_realtime.c:124
struct stasis_message_type * json
Definition: test_stasis.c:2056
AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS|AST_MODFLAG_LOAD_ORDER, "HTTP Phone Provisioning",.support_level=AST_MODULE_SUPPORT_EXTENDED,.load=load_module,.unload=unload_module,.reload=reload,.load_pri=AST_MODPRI_CHANNEL_DEPEND,.requires="http",)
struct ao2_container * cache
Definition: pbx_realtime.c:77
struct ast_eid ast_eid_default
Global EID.
Definition: options.c:93
static void dump_consumer(struct ast_test *test, struct cts *cts)
Definition: test_stasis.c:2193
#define test_category
Definition: test_stasis.c:41
void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
Publish a message to a topic's subscribers, synchronizing on the specified subscriber.
Definition: stasis.c:1515
struct stasis_cache * stasis_cache_create(snapshot_get_id id_fn)
Create a cache.
Definition: stasis_cache.c:360
struct stasis_message * stasis_cache_entry_get_local(struct stasis_cache_entry *entry)
Get the local entity's cache entry snapshot.
Definition: stasis_cache.c:370
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
Definition: astobj2.h:1841
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
struct ast_event * ast_event_new(enum ast_event_type event_type,...)
Create a new event.
Definition: event.c:402
int ast_json_equal(const struct ast_json *lhs, const struct ast_json *rhs)
Compare two JSON objects.
Definition: json.c:347
#define S_OR(a, b)
returns the equivalent of logic or for strings: first one if not empty, otherwise second one...
Definition: strings.h:79
static void destroy_cts(void *obj)
Definition: test_stasis.c:2145
size_t messages_rxed_len
Definition: test_stasis.c:165
static void cache_test_aggregate_publish_fn(struct stasis_topic *topic, struct stasis_message *aggregate)
Definition: test_stasis.c:965
struct ao2_container * stasis_cache_get_all(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
Retrieve all matching entity items from the cache.
Definition: stasis_cache.c:587
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
Definition: stasis.c:1024
struct stasis_forward * sub
Definition: res_corosync.c:240
static int force_inline attribute_pure ast_begins_with(const char *str, const char *prefix)
Definition: strings.h:94
struct stasis_message * stasis_cache_get(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
Retrieve an item from the cache for the ast_eid_default entity.
Definition: stasis_cache.c:686
Abstract JSON element (object, array, string, int, ...).
Definition: search.h:40
Forwarding information.
Definition: stasis.c:1530
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
struct timeval ast_tvsub(struct timeval a, struct timeval b)
Returns the difference of two timevals a - b.
Definition: extconf.c:2298
static const char * cache_test_data_id(struct stasis_message *message)
Definition: test_stasis.c:901
Generic container type.
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition: stasis.c:1577
#define ASTERISK_GPL_KEY
The text the key() function should return.
Definition: module.h:46
struct stasis_message * stasis_message_create_full(struct stasis_message_type *type, void *data, const struct ast_eid *eid)
Create a new message for an entity.
struct stasis_topic * stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
Returns the topic of cached events from a caching topic.
Definition: stasis_cache.c:85
Asterisk module definitions.
const char * stasis_subscription_uniqueid(const struct stasis_subscription *sub)
Get the unique ID for the subscription.
Definition: stasis.c:1170
static int load_module(void)
Definition: test_stasis.c:2468
static const char * noop_get_id(struct stasis_message *message)
Definition: test_stasis.c:2001
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
#define ast_cond_timedwait(cond, mutex, time)
Definition: lock.h:204
ast_test_result_state
Definition: test.h:200
static struct ast_manager_event_blob * fake_ami(struct stasis_message *message)
Definition: test_stasis.c:56
static int unload_module(void)
Definition: test_stasis.c:2436
static void consumer_exec_sync(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Definition: test_stasis.c:226
Definition: stasis_cache.c:173