41#define test_category "/stasis/core/"
62 "Message: %s\r\n",
text);
83 info->name = __func__;
85 info->summary =
"Test basic message_type functions";
86 info->description =
"Test basic message_type functions";
105 char *expected =
"SomeData";
106 struct timeval expected_timestamp;
107 struct timeval time_diff;
112 info->name = __func__;
114 info->summary =
"Test basic message functions";
115 info->description =
"Test basic message functions";
122 memset(&foreign_eid, 0xFF,
sizeof(foreign_eid));
130 strcpy(data, expected);
135 ast_test_validate(
test,
NULL != uut1);
136 ast_test_validate(
test,
NULL != uut2);
149 ast_test_validate(
test, time_diff.tv_sec == 0);
150 ast_test_validate(
test, time_diff.tv_usec < 10000);
249 struct timespec
end = {
250 .tv_sec = start.tv_sec + 30,
251 .tv_nsec = start.tv_usec * 1000
259 if (r == ETIMEDOUT) {
270 struct timespec
end = {
271 .tv_sec = start.tv_sec + 3,
272 .tv_nsec = start.tv_usec * 1000
280 if (r == ETIMEDOUT) {
291 struct timeval diff = {
295 struct timeval end_tv =
ast_tvadd(start, diff);
296 struct timespec
end = {
297 .tv_sec = end_tv.tv_sec,
298 .tv_nsec = end_tv.tv_usec * 1000
306 if (r == ETIMEDOUT) {
328 info->name = __func__;
330 info->summary =
"Test subscribe/unsubscribe messages";
331 info->description =
"Test subscribe/unsubscribe messages";
344 ast_test_validate(
test,
NULL != uut);
350 ast_test_validate(
test, 1 == complete);
358 ast_test_validate(
test, 0 == strcmp(
"Subscribe", change->
description));
359 ast_test_validate(
test, 0 == strcmp(expected_uniqueid, change->
uniqueid));
363 ast_test_validate(
test, 0 == strcmp(
"Unsubscribe", change->
description));
364 ast_test_validate(
test, 0 == strcmp(expected_uniqueid, change->
uniqueid));
383 info->name = __func__;
385 info->summary =
"Test subscribe/unsubscribe messages using a threadpool subscription";
386 info->description =
"Test subscribe/unsubscribe messages using a threadpool subscription";
399 ast_test_validate(
test,
NULL != uut);
405 ast_test_validate(
test, 1 == complete);
413 ast_test_validate(
test, 0 == strcmp(
"Subscribe", change->
description));
414 ast_test_validate(
test, 0 == strcmp(expected_uniqueid, change->
uniqueid));
418 ast_test_validate(
test, 0 == strcmp(
"Unsubscribe", change->
description));
419 ast_test_validate(
test, 0 == strcmp(expected_uniqueid, change->
uniqueid));
437 info->name = __func__;
439 info->summary =
"Test publishing";
440 info->description =
"Test publishing";
453 ast_test_validate(
test,
NULL != uut);
464 ast_test_validate(
test, 1 == actual_len);
484 info->name = __func__;
486 info->summary =
"Test synchronous publishing";
487 info->description =
"Test synchronous publishing";
500 ast_test_validate(
test,
NULL != uut);
511 ast_test_validate(
test, 1 == actual_len);
531 info->name = __func__;
533 info->summary =
"Test publishing with a threadpool";
534 info->description =
"Test publishing to a subscriber whose\n"
535 "subscription dictates messages are received through a\n"
549 ast_test_validate(
test,
NULL != uut);
560 ast_test_validate(
test, 1 == actual_len);
579 info->name = __func__;
581 info->summary =
"Test simple subscriptions";
582 info->description =
"Test simple subscriptions";
595 ast_test_validate(
test,
NULL != uut);
608 ast_test_validate(
test, 0 == actual_len);
632 info->name = __func__;
634 info->summary =
"Test sending events to a parent topic";
635 info->description =
"Test sending events to a parent topic.\n"
636 "This test creates three topics (one parent, two children)\n"
637 "and publishes a message to one child, and verifies it's\n"
638 "only seen by that child and the parent";
645 ast_test_validate(
test,
NULL != parent_topic);
650 ast_test_validate(
test,
NULL != forward_sub);
653 ast_test_validate(
test,
NULL != parent_consumer);
658 ast_test_validate(
test,
NULL != parent_sub);
672 ast_test_validate(
test, 1 == actual_len);
674 ast_test_validate(
test, 1 == actual_len);
703 info->name = __func__;
705 info->summary =
"Test sending interleaved events to a parent topic";
706 info->description =
"Test sending events to a parent topic.\n"
707 "This test creates three topics (one parent, two children)\n"
708 "and publishes messages alternately between the children.\n"
709 "It verifies that the messages are received in the expected\n"
717 ast_test_validate(
test,
NULL != test_message_type);
723 ast_test_validate(
test,
NULL != test_message1);
725 ast_test_validate(
test,
NULL != test_message2);
727 ast_test_validate(
test,
NULL != test_message3);
730 ast_test_validate(
test,
NULL != parent_topic);
732 ast_test_validate(
test,
NULL != topic1);
734 ast_test_validate(
test,
NULL != topic2);
737 ast_test_validate(
test,
NULL != forward_sub1);
739 ast_test_validate(
test,
NULL != forward_sub2);
753 ast_test_validate(
test, 3 == actual_len);
788 info->name = __func__;
790 info->summary =
"Test sending interleaved events to a parent topic with different subscribers";
791 info->description =
"Test sending events to a parent topic.\n"
792 "This test creates three topics (one parent, two children)\n"
793 "and publishes messages alternately between the children.\n"
794 "It verifies that the messages are received in the expected\n"
795 "order, for different subscription types: one with a dedicated\n"
796 "thread, the other on the Stasis threadpool.";
803 ast_test_validate(
test,
NULL != test_message_type);
809 ast_test_validate(
test,
NULL != test_message1);
811 ast_test_validate(
test,
NULL != test_message2);
813 ast_test_validate(
test,
NULL != test_message3);
816 ast_test_validate(
test,
NULL != parent_topic);
818 ast_test_validate(
test,
NULL != topic1);
820 ast_test_validate(
test,
NULL != topic2);
823 ast_test_validate(
test,
NULL != forward_sub1);
825 ast_test_validate(
test,
NULL != forward_sub2);
828 ast_test_validate(
test,
NULL != consumer1);
831 ast_test_validate(
test,
NULL != consumer2);
834 ast_test_validate(
test,
NULL != sub1);
838 ast_test_validate(
test,
NULL != sub2);
846 ast_test_validate(
test, 3 == actual_len);
849 ast_test_validate(
test, 3 == actual_len);
851 ast_test_validate(
test, test_message1 == consumer1->messages_rxed[0]);
852 ast_test_validate(
test, test_message2 == consumer1->messages_rxed[1]);
853 ast_test_validate(
test, test_message3 == consumer1->messages_rxed[2]);
855 ast_test_validate(
test, test_message1 == consumer2->messages_rxed[0]);
856 ast_test_validate(
test, test_message2 == consumer2->messages_rxed[1]);
857 ast_test_validate(
test, test_message3 == consumer2->messages_rxed[2]);
919 char aggregate_str[30];
928 for (idx = 0; ; ++idx) {
948 if (accumulated == atoi(
test_data->value)) {
954 snprintf(aggregate_str,
sizeof(aggregate_str),
"%d", accumulated);
956 if (!aggregate_snapshot) {
962 return aggregate_snapshot;
999 info->name = __func__;
1001 info->summary =
"Test caching topics only forward cache_update messages.";
1002 info->description =
"Test caching topics only forward cache_update messages.";
1009 ast_test_validate(
test,
NULL != non_cache_type);
1011 ast_test_validate(
test,
NULL != topic);
1015 ast_test_validate(
test,
NULL != caching_topic);
1023 ast_test_validate(
test,
NULL != test_message);
1028 ast_test_validate(
test, 0 == actual_len);
1050 info->name = __func__;
1052 info->summary =
"Test passing messages through cache topic unscathed.";
1053 info->description =
"Test passing messages through cache topic unscathed.";
1060 ast_test_validate(
test,
NULL != cache_type);
1062 ast_test_validate(
test,
NULL != topic);
1066 ast_test_validate(
test,
NULL != caching_topic);
1074 ast_test_validate(
test,
NULL != test_message1_1);
1076 ast_test_validate(
test,
NULL != test_message2_1);
1082 ast_test_validate(
test, 2 == actual_len);
1103 ast_test_validate(
test,
NULL != test_message2_2);
1107 ast_test_validate(
test, 3 == actual_len);
1118 ast_test_validate(
test,
NULL != test_message1_clear);
1122 ast_test_validate(
test, 4 == actual_len);
1151 info->name = __func__;
1153 info->summary =
"Test cache dump routines.";
1154 info->description =
"Test cache dump routines.";
1161 ast_test_validate(
test,
NULL != cache_type);
1163 ast_test_validate(
test,
NULL != topic);
1167 ast_test_validate(
test,
NULL != caching_topic);
1175 ast_test_validate(
test,
NULL != test_message1_1);
1177 ast_test_validate(
test,
NULL != test_message2_1);
1183 ast_test_validate(
test, 2 == actual_len);
1188 ast_test_validate(
test,
NULL != cache_dump);
1193 ast_test_validate(
test, actual_cache_entry == test_message1_1 || actual_cache_entry == test_message2_1);
1199 ast_test_validate(
test,
NULL != test_message2_2);
1203 ast_test_validate(
test, 3 == actual_len);
1208 ast_test_validate(
test,
NULL != cache_dump);
1213 ast_test_validate(
test, actual_cache_entry == test_message1_1 || actual_cache_entry == test_message2_2);
1219 ast_test_validate(
test,
NULL != test_message1_clear);
1223 ast_test_validate(
test, 4 == actual_len);
1228 ast_test_validate(
test,
NULL != cache_dump);
1233 ast_test_validate(
test, actual_cache_entry == test_message2_2);
1271 info->name = __func__;
1273 info->summary =
"Test cache eid and aggregate support.";
1274 info->description =
"Test cache eid and aggregate support.";
1280 memset(&foreign_eid1, 0xAA,
sizeof(foreign_eid1));
1281 memset(&foreign_eid2, 0xBB,
sizeof(foreign_eid2));
1284 ast_test_validate(
test,
NULL != cache_type);
1287 ast_test_validate(
test,
NULL != topic);
1291 ast_test_validate(
test,
NULL != topic_consumer);
1294 ast_test_validate(
test,
NULL != topic_sub);
1302 ast_test_validate(
test,
NULL != caching_topic);
1306 ast_test_validate(
test,
NULL != cache_consumer);
1309 ast_test_validate(
test,
NULL != cache_sub);
1314 ast_test_validate(
test,
NULL != test_message1_1);
1316 ast_test_validate(
test,
NULL != test_message2_1);
1318 ast_test_validate(
test,
NULL != test_message2_2);
1320 ast_test_validate(
test,
NULL != test_message2_3);
1322 ast_test_validate(
test,
NULL != test_message2_4);
1333 ast_test_validate(
test, 6 == actual_len);
1335 ast_test_validate(
test, 6 == actual_len);
1340 ast_test_validate(
test,
NULL != cache_dump);
1346 ast_test_validate(
test,
1347 actual_cache_entry == test_message1_1
1348 || actual_cache_entry == test_message2_1
1349 || actual_cache_entry == test_message2_2);
1356 ast_test_validate(
test,
NULL != cache_dump);
1362 ast_test_validate(
test,
1363 actual_cache_entry == test_message1_1
1364 || actual_cache_entry == test_message2_1);
1373 ast_test_validate(
test, 8 == actual_len);
1375 ast_test_validate(
test, 8 == actual_len);
1380 ast_test_validate(
test,
NULL != cache_dump);
1386 ast_test_validate(
test,
1387 actual_cache_entry == test_message1_1
1388 || actual_cache_entry == test_message2_1
1389 || actual_cache_entry == test_message2_2
1390 || actual_cache_entry == test_message2_3);
1397 ast_test_validate(
test,
NULL != cache_dump);
1403 ast_test_validate(
test, actual_cache_entry == test_message2_2);
1412 ast_test_validate(
test, 10 == actual_len);
1414 ast_test_validate(
test, 10 == actual_len);
1419 ast_test_validate(
test,
NULL != cache_dump);
1425 ast_test_validate(
test,
1426 actual_cache_entry == test_message1_1
1427 || actual_cache_entry == test_message2_1
1428 || actual_cache_entry == test_message2_2
1429 || actual_cache_entry == test_message2_4);
1436 ast_test_validate(
test,
NULL != cache_dump);
1442 ast_test_validate(
test,
1443 actual_cache_entry == test_message2_1
1444 || actual_cache_entry == test_message2_2
1445 || actual_cache_entry == test_message2_4);
1451 ast_test_validate(
test,
NULL != test_message1_clear);
1456 ast_test_validate(
test, 12 == actual_len);
1458 ast_test_validate(
test, 11 == actual_len);
1463 ast_test_validate(
test,
NULL != cache_dump);
1469 ast_test_validate(
test,
1470 actual_cache_entry == test_message2_1
1471 || actual_cache_entry == test_message2_2
1472 || actual_cache_entry == test_message2_4);
1478 ast_test_validate(
test,
NULL != test_message2_clear);
1483 ast_test_validate(
test, 14 == actual_len);
1485 ast_test_validate(
test, 13 == actual_len);
1490 ast_test_validate(
test,
NULL != cache_dump);
1496 ast_test_validate(
test,
1497 actual_cache_entry == test_message2_1
1498 || actual_cache_entry == test_message2_4);
1519 int actual_len, ret;
1524 info->name = __func__;
1526 info->summary =
"Test simple message routing";
1527 info->description =
"Test simple message routing";
1534 ast_test_validate(
test,
NULL != topic);
1537 ast_test_validate(
test,
NULL != consumer1);
1539 ast_test_validate(
test,
NULL != consumer2);
1541 ast_test_validate(
test,
NULL != consumer3);
1544 ast_test_validate(
test,
NULL != test_message_type1);
1546 ast_test_validate(
test,
NULL != test_message_type2);
1548 ast_test_validate(
test,
NULL != test_message_type3);
1551 ast_test_validate(
test,
NULL != uut);
1555 ast_test_validate(
test, 0 == ret);
1559 ast_test_validate(
test, 0 == ret);
1562 ast_test_validate(
test, 0 == ret);
1568 ast_test_validate(
test,
NULL != test_message1);
1570 ast_test_validate(
test,
NULL != test_message2);
1572 ast_test_validate(
test,
NULL != test_message3);
1579 ast_test_validate(
test, 1 == actual_len);
1581 ast_test_validate(
test, 1 == actual_len);
1583 ast_test_validate(
test, 1 == actual_len);
1585 actual = consumer1->messages_rxed[0];
1586 ast_test_validate(
test, test_message1 == actual);
1588 actual = consumer2->messages_rxed[0];
1589 ast_test_validate(
test, test_message2 == actual);
1591 actual = consumer3->messages_rxed[0];
1592 ast_test_validate(
test, test_message3 == actual);
1615 int actual_len, ret;
1620 info->name = __func__;
1622 info->summary =
"Test message routing via threadpool";
1623 info->description =
"Test simple message routing when\n"
1624 "the subscriptions dictate usage of the Stasis\n"
1632 ast_test_validate(
test,
NULL != topic);
1635 ast_test_validate(
test,
NULL != consumer1);
1637 ast_test_validate(
test,
NULL != consumer2);
1639 ast_test_validate(
test,
NULL != consumer3);
1642 ast_test_validate(
test,
NULL != test_message_type1);
1644 ast_test_validate(
test,
NULL != test_message_type2);
1646 ast_test_validate(
test,
NULL != test_message_type3);
1649 ast_test_validate(
test,
NULL != uut);
1653 ast_test_validate(
test, 0 == ret);
1657 ast_test_validate(
test, 0 == ret);
1660 ast_test_validate(
test, 0 == ret);
1666 ast_test_validate(
test,
NULL != test_message1);
1668 ast_test_validate(
test,
NULL != test_message2);
1670 ast_test_validate(
test,
NULL != test_message3);
1677 ast_test_validate(
test, 1 == actual_len);
1679 ast_test_validate(
test, 1 == actual_len);
1681 ast_test_validate(
test, 1 == actual_len);
1683 actual = consumer1->messages_rxed[0];
1684 ast_test_validate(
test, test_message1 == actual);
1686 actual = consumer2->messages_rxed[0];
1687 ast_test_validate(
test, test_message2 == actual);
1689 actual = consumer3->messages_rxed[0];
1690 ast_test_validate(
test, test_message3 == actual);
1701 const char *type_name =
1729 int actual_len, ret;
1734 info->name = __func__;
1736 info->summary =
"Test special handling cache_update messages";
1737 info->description =
"Test special handling cache_update messages";
1744 ast_test_validate(
test,
NULL != topic);
1749 ast_test_validate(
test,
NULL != caching_topic);
1752 ast_test_validate(
test,
NULL != consumer1);
1754 ast_test_validate(
test,
NULL != consumer2);
1756 ast_test_validate(
test,
NULL != consumer3);
1759 ast_test_validate(
test,
NULL != test_message_type1);
1761 ast_test_validate(
test,
NULL != test_message_type2);
1763 ast_test_validate(
test,
NULL != test_message_type3);
1767 ast_test_validate(
test,
NULL != uut);
1771 ast_test_validate(
test, 0 == ret);
1775 ast_test_validate(
test, 0 == ret);
1778 ast_test_validate(
test, 0 == ret);
1784 ast_test_validate(
test,
NULL != test_message1);
1786 ast_test_validate(
test,
NULL != test_message2);
1788 ast_test_validate(
test,
NULL != test_message3);
1795 ast_test_validate(
test, 1 == actual_len);
1797 ast_test_validate(
test, 1 == actual_len);
1800 ast_test_validate(
test, 0 == actual_len);
1802 actual = consumer1->messages_rxed[0];
1805 ast_test_validate(
test, test_message_type1 ==
update->type);
1806 ast_test_validate(
test, test_message1 ==
update->new_snapshot);
1808 actual = consumer2->messages_rxed[0];
1811 ast_test_validate(
test, test_message_type2 ==
update->type);
1812 ast_test_validate(
test, test_message2 ==
update->new_snapshot);
1827 char *expected =
"SomeData";
1831 info->name = __func__;
1833 info->summary =
"Test message to_json function";
1834 info->description =
"Test message to_json function";
1842 ast_test_validate(
test,
NULL == actual);
1848 strcpy(
data, expected);
1850 ast_test_validate(
test,
NULL != uut);
1853 ast_test_validate(
test,
NULL == actual);
1864 const char *expected_text =
"SomeData";
1869 info->name = __func__;
1871 info->summary =
"Test message to_json function when NULL";
1872 info->description =
"Test message to_json function when NULL";
1881 strcpy(
data, expected_text);
1883 ast_test_validate(
test,
NULL != uut);
1898 char *expected =
"SomeData";
1902 info->name = __func__;
1904 info->summary =
"Test message to_ami function when NULL";
1905 info->description =
"Test message to_ami function when NULL";
1913 ast_test_validate(
test,
NULL == actual);
1919 strcpy(
data, expected);
1921 ast_test_validate(
test,
NULL != uut);
1924 ast_test_validate(
test,
NULL == actual);
1935 const char *expected_text =
"SomeData";
1936 const char *expected =
"Message: SomeData\r\n";
1940 info->name = __func__;
1942 info->summary =
"Test message to_ami function";
1943 info->description =
"Test message to_ami function";
1952 strcpy(
data, expected_text);
1954 ast_test_validate(
test,
NULL != uut);
1957 ast_test_validate(
test, strcmp(expected, actual->extra_fields) == 0);
1975 info->name = __func__;
1977 info->summary =
"Test that destruction order doesn't bomb stuff";
1978 info->description =
"Test that destruction order doesn't bomb stuff";
1985 ast_test_validate(
test,
NULL != topic);
2016 info->name = __func__;
2018 info->summary =
"Test that destruction order doesn't bomb stuff";
2019 info->description =
"Test that destruction order doesn't bomb stuff";
2029 ast_test_validate(
test,
NULL != topic);
2032 ast_test_validate(
test,
NULL != caching_topic);
2047 caching_topic =
NULL;
2091 ast_test_validate_cleanup(
test,
2096 ast_test_validate_cleanup(
test,
2102 ast_test_validate_cleanup(
test,
2109 ast_test_validate_cleanup(
test,
2114 ast_test_validate_cleanup(
test,
2118 ast_test_validate_cleanup(
test,
2122 ast_test_validate_cleanup(
test,
2126 ast_test_validate_cleanup(
test,
2147 struct cts *
c = obj;
2187 return (strcmp(data,
msg_data->description) == 0);
2242 info->name = __func__;
2244 info->summary =
"Test message filtering by type";
2245 info->description =
"Test message filtering by type";
2252 ast_test_validate(
test,
NULL != types);
2309 info->name = __func__;
2311 info->summary =
"Test message filtering by formatter";
2312 info->description =
"Test message filtering by formatter";
2319 ast_test_validate(
test,
NULL != types);
2381 info->name = __func__;
2383 info->summary =
"Test message filtering by type and formatter";
2384 info->description =
"Test message filtering by type and formatter";
2391 ast_test_validate(
test,
NULL != types);
Asterisk main include file. File version handling, generic pbx functions.
#define ast_realloc(p, len)
A wrapper for realloc()
#define ast_strdup(str)
A wrapper for strdup()
#define ast_malloc(len)
A wrapper for malloc()
#define ao2_iterator_next(iter)
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
#define ao2_alloc(data_size, destructor_fn)
static void update(int code_size, int y, int wi, int fi, int dq, int sr, int dqsez, struct g726_state *state_ptr)
struct ast_event * ast_event_new(enum ast_event_type event_type,...)
Create a new event.
@ AST_EVENT_IE_DESCRIPTION
Description Used by: AST_EVENT_SUB, AST_EVENT_UNSUB Payload type: STR.
@ AST_EVENT_IE_PLTYPE_STR
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
struct stasis_message_type * stasis_cache_update_type(void)
Message type for cache update messages.
struct ast_json * ast_json_string_create(const char *value)
Construct a JSON string from value.
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
int ast_json_equal(const struct ast_json *lhs, const struct ast_json *rhs)
Compare two JSON objects.
#define ast_cond_destroy(cond)
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
#define ast_cond_init(cond, attr)
#define ast_cond_timedwait(cond, mutex, time)
pthread_cond_t ast_cond_t
#define ast_cond_signal(cond)
struct ast_manager_event_blob * ast_manager_event_blob_create(int event_flags, const char *manager_event, const char *extra_fields_fmt,...)
Construct a ast_manager_event_blob.
Asterisk module definitions.
#define AST_MODULE_INFO(keystr, flags_to_set, desc, fields...)
@ AST_MODULE_SUPPORT_CORE
#define ASTERISK_GPL_KEY
The text the key() function should return.
@ AST_MODULE_LOAD_SUCCESS
static void * cleanup(void *unused)
struct ao2_container * cache
static struct stasis_message_router * router
struct stasis_forward * sub
static void to_ami(struct ast_sip_subscription *sub, struct ast_str **buf)
Stasis Message Bus API. See Stasis Message Bus API for detailed documentation.
struct ast_manager_event_blob * stasis_message_to_ami(struct stasis_message *msg)
Build the AMI representation of the message.
struct stasis_message * stasis_cache_clear_create(struct stasis_message *message)
A message which instructs the caching topic to remove an entry from its cache.
struct ao2_container * stasis_cache_get_all(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
Retrieve all matching entity items from the cache.
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
struct ao2_container * stasis_cache_dump(struct stasis_cache *cache, struct stasis_message_type *type)
Dump cached items to a subscription for the ast_eid_default entity.
int stasis_subscription_decline_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are not interested in a message type.
struct ast_json * stasis_message_to_json(struct stasis_message *msg, struct stasis_message_sanitizer *sanitize)
Build the JSON representation of the message.
struct stasis_topic * stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
Returns the topic of cached events from a caching topics.
struct stasis_caching_topic * stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
Unsubscribes a caching topic from its upstream topic.
@ STASIS_SUBSCRIPTION_FILTER_SELECTIVE
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
struct ao2_container * stasis_cache_dump_all(struct stasis_cache *cache, struct stasis_message_type *type)
Dump all entity items from the cache to a subscription.
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
Publish a message to a topic's subscribers, synchronizing on the specified subscriber.
struct stasis_caching_topic * stasis_caching_topic_create(struct stasis_topic *original_topic, struct stasis_cache *cache)
Create a topic which monitors and caches messages from another topic.
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
struct ao2_container * stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
Dump cached items to a subscription for a specific entity.
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
const struct ast_eid * stasis_message_eid(const struct stasis_message *msg)
Get the entity id for a stasis_message.
#define stasis_subscribe_pool(topic, callback, data)
@ STASIS_SUBSCRIPTION_FORMATTER_EVENT
@ STASIS_SUBSCRIPTION_FORMATTER_AMI
@ STASIS_SUBSCRIPTION_FORMATTER_JSON
struct stasis_message * stasis_cache_entry_get_aggregate(struct stasis_cache_entry *entry)
Get the aggregate cache entry snapshot.
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
enum stasis_message_type_result stasis_message_type_create(const char *name, struct stasis_message_vtable *vtable, struct stasis_message_type **result)
Create a new message type.
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
struct stasis_cache * stasis_cache_create_full(snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_publish_fn aggregate_publish_fn)
Create a cache.
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.
struct stasis_caching_topic * stasis_caching_unsubscribe_and_join(struct stasis_caching_topic *caching_topic)
Unsubscribes a caching topic from its upstream topic, blocking until all messages have been forwarded...
struct stasis_cache * stasis_cache_create(snapshot_get_id id_fn)
Create a cache.
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *subscription)
Cancel a subscription.
struct stasis_message * stasis_message_create_full(struct stasis_message_type *type, void *data, const struct ast_eid *eid)
Create a new message for an entity.
void stasis_subscription_accept_formatters(struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters)
Indicate to a subscription that we are interested in messages with one or more formatters.
const char * stasis_subscription_uniqueid(const struct stasis_subscription *sub)
Get the unique ID for the subscription.
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
struct stasis_message * stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid)
Retrieve an item from the cache for a specific entity.
struct stasis_message * stasis_cache_get(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
Retrieve an item from the cache for the ast_eid_default entity.
struct stasis_message * stasis_cache_entry_get_local(struct stasis_cache_entry *entry)
Get the local entity's cache entry snapshot.
#define stasis_subscribe(topic, callback, data)
const struct timeval * stasis_message_timestamp(const struct stasis_message *msg)
Get the time when a message was created.
@ STASIS_MESSAGE_TYPE_ERROR
@ STASIS_MESSAGE_TYPE_SUCCESS
struct stasis_message * stasis_cache_entry_get_remote(struct stasis_cache_entry *entry, int idx)
Get a remote entity's cache entry snapshot by index.
#define stasis_message_router_create(topic)
Create a new message router object.
int stasis_message_router_add(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
Add a route to a message router.
void stasis_message_router_unsubscribe_and_join(struct stasis_message_router *router)
Unsubscribe the router from the upstream topic, blocking until the final message has been processed.
int stasis_message_router_add_cache_update(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
Add a route for stasis_cache_update messages to a message router.
#define stasis_message_router_create_pool(topic)
Create a new message router object.
int stasis_message_router_set_default(struct stasis_message_router *router, stasis_subscription_cb callback, void *data)
Sets the default route of a router.
#define S_OR(a, b)
returns the equivalent of logic or for strings: first one if not empty, otherwise second one.
static force_inline int attribute_pure ast_strlen_zero(const char *s)
static int force_inline attribute_pure ast_begins_with(const char *str, const char *prefix)
Checks whether a string begins with another.
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
An Entity ID is essentially a MAC address, brief and unique.
Abstract JSON element (object, array, string, int, ...).
Struct containing info for an AMI event to send out.
struct stasis_message ** messages_rxed
struct stasis_subscription * sub
struct stasis_topic * topic
struct consumer * consumer
struct stasis_message * old_snapshot
Old value from the cache.
struct stasis_message * new_snapshot
New value.
Structure containing callbacks for Stasis message sanitization.
Virtual table providing methods for messages.
struct ast_manager_event_blob *(* to_ami)(struct stasis_message *message)
Build the AMI representation of the message.
struct ast_json *(* to_json)(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
Build the JSON representation of the message.
struct ast_event *(* to_event)(struct stasis_message *message)
Build the ast_event representation of the message.
Holds details about changes to subscriptions for the specified topic.
struct stasis_topic * topic
Sorcery object created based on backend data.
struct stasis_message_type * change
struct stasis_message_type * event
struct stasis_message_type * type3
struct stasis_message_type * ami
struct stasis_message_type * json
struct stasis_message_type * amievent
struct stasis_message_type * type2
struct stasis_message_type * none
struct stasis_message_type * type1
#define AST_TEST_REGISTER(cb)
#define ast_test_status_update(a, b, c...)
#define AST_TEST_UNREGISTER(cb)
static void cache_test_data_dtor(void *obj)
AST_TEST_DEFINE(message_type)
static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_message *message)
static struct test_message_types * create_message_types(struct ast_test *test)
static struct stasis_message * cache_test_aggregate_calc_fn(struct stasis_cache_entry *entry, struct stasis_message *new_snapshot)
static int consumer_wait_for_completion(struct consumer *consumer)
static struct stasis_message * cache_test_message_create_full(struct stasis_message_type *type, const char *name, const char *value, struct ast_eid *eid)
static int send_msg(struct ast_test *test, struct cts *cts, struct stasis_message_type *msg_type, const char *data)
static void consumer_exec_sync(void *data, struct stasis_subscription *sub, struct stasis_message *message)
static void cache_test_aggregate_publish_fn(struct stasis_topic *topic, struct stasis_message *aggregate)
static void consumer_dtor(void *obj)
static struct ast_event * fake_event(struct stasis_message *message)
static const char * cache_test_data_id(struct stasis_message *message)
static struct ast_manager_event_blob * fake_ami(struct stasis_message *message)
static void destroy_message_types(void *obj)
static struct ast_json * fake_json(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
static void destroy_cts(void *obj)
static struct stasis_message * cache_test_message_create(struct stasis_message_type *type, const char *name, const char *value)
static struct consumer * consumer_create(int ignore_subscriptions)
static int is_msg(struct stasis_message *msg, struct stasis_message_type *mtype, const char *data)
static const char * cache_simple(struct stasis_message *message)
static int consumer_should_stay(struct consumer *consumer, size_t expected_len)
static struct cts * create_cts(struct ast_test *test)
static int load_module(void)
static int unload_module(void)
static int consumer_wait_for(struct consumer *consumer, size_t expected_len)
static const char * noop_get_id(struct stasis_message *message)
static int check_cache_aggregate(struct stasis_cache *cache, struct stasis_message_type *cache_type, const char *id, const char *value)
static void dump_consumer(struct ast_test *test, struct cts *cts)
static void noop(void *data, struct stasis_subscription *sub, struct stasis_message *message)
static struct stasis_message_vtable fake_vtable
struct timeval ast_tvadd(struct timeval a, struct timeval b)
Returns the sum of two timevals a + b.
struct timeval ast_tvsub(struct timeval a, struct timeval b)
Returns the difference of two timevals a - b.
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
int ast_eid_cmp(const struct ast_eid *eid1, const struct ast_eid *eid2)
Compare two EIDs.
struct ast_eid ast_eid_default
Global EID.