35#include <corosync/cpg.h>
36#include <corosync/cfg.h>
55#define COROSYNC_POLL_TIMEOUT (10 * 1000)
89#define COROSYNC_IPC_BUFFER_SIZE (8192 * 128)
92#define corosync_pthread_create_background(a, b, c, d) \
93 ast_pthread_create_stack(a, b, c, d, \
94 (AST_BACKGROUND_STACKSIZE + (3 * COROSYNC_IPC_BUFFER_SIZE)), \
95 __FILE__, __FUNCTION__, __LINE__, #c)
145 cmp = (left->
id == *
id);
148 cmp = (left->
id == right->
id);
187 if (!payload->
event) {
213 if (!corosync_ping_message_type()) {
228 ao2_t_ref(payload, -1,
"Destroy payload on off nominal");
234 ao2_t_ref(payload, -1,
"Hand ref to stasis");
261 .publish_default = 1,
262 .subscribe_default = 1,
264 .message_type_fn = corosync_ping_message_type,
267 .publish_default = 1,
268 .subscribe_default = 1,
280 .alert_pipe = { -1, -1 },
286#ifdef HAVE_COROSYNC_CFG_STATE_TRACK
287static void cfg_state_track_cb(
288 corosync_cfg_state_notification_buffer_t *notification_buffer,
293 corosync_cfg_shutdown_flags_t flags);
296#ifdef HAVE_COROSYNC_CFG_STATE_TRACK
297 .corosync_cfg_state_track_callback = cfg_state_track_cb,
318 joined ?
"joined" :
"left");
398 unsigned int new_msgs;
399 unsigned int old_msgs;
414 if (new_msgs > INT_MAX) {
418 if (old_msgs > INT_MAX) {
423 (
int)old_msgs,
NULL, event_eid)) {
436 unsigned int cachable;
458static void cpg_deliver_cb(cpg_handle_t handle,
const struct cpg_name *group_name,
459 uint32_t nodeid, uint32_t pid,
void *msg,
size_t msg_len);
461static void cpg_confchg_cb(cpg_handle_t handle,
const struct cpg_name *group_name,
462 const struct cpg_address *member_list,
size_t member_list_entries,
463 const struct cpg_address *left_list,
size_t left_list_entries,
464 const struct cpg_address *joined_list,
size_t joined_list_entries);
471#ifdef HAVE_COROSYNC_CFG_STATE_TRACK
472static void cfg_state_track_cb(
473 corosync_cfg_state_notification_buffer_t *notification_buffer,
480 corosync_cfg_shutdown_flags_t flags)
484static void cpg_deliver_cb(cpg_handle_t handle,
const struct cpg_name *group_name,
485 uint32_t nodeid, uint32_t pid,
void *msg,
size_t msg_len)
493 ast_debug(1,
"Ignoring event that's too small. %u < %u\n",
494 (
unsigned int) msg_len,
513 publish_handler =
event_types[event_type].publish_to_stasis;
528 memcpy(
event, msg, msg_len);
538 ast_debug(5,
"Publishing event %s (%u) to stasis\n",
540 publish_handler(
event);
549 iov.iov_base = (
void *)
event;
552 ast_debug(5,
"Publishing event %s (%u) to corosync\n",
558 ast_debug(5,
"publish_event_to_corosync rdlock\n");
559 if ((cs_err = cpg_mcast_joined(
cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) {
564 ast_debug(5,
"publish_event_to_corosync unlock\n");
647static void cpg_confchg_cb(cpg_handle_t handle,
const struct cpg_name *group_name,
648 const struct cpg_address *member_list,
size_t member_list_entries,
649 const struct cpg_address *left_list,
size_t left_list_entries,
650 const struct cpg_address *joined_list,
size_t joined_list_entries)
655 for (i = 0; i < left_list_entries; i++) {
656 const struct cpg_address *cpg_node = &left_list[i];
692 ao2_t_ref(messages, -1,
"Dispose of flushed cache");
701 if (!joined_list_entries) {
732 ao2_t_ref(messages, -1,
"Dispose of dumped cache");
740 unsigned int node_id;
742 corosync_cfg_node_address_t corosync_addr;
750 ast_debug(5,
"send_cluster_notify rdlock\n");
752 if ((cs_err = corosync_cfg_local_get(
cfg_handle, &node_id)) != CS_OK) {
753 ast_log(
LOG_WARNING,
"Failed to extract Corosync node ID for this node. Not informing cluster of existance.\n");
757 if (((cs_err = corosync_cfg_get_node_addrs(
cfg_handle, node_id, 1, &num_addrs, &corosync_addr)) != CS_OK) || (num_addrs < 1)) {
758 ast_log(
LOG_WARNING,
"Failed to get local Corosync address. Not informing cluster of existance.\n");
763 ast_debug(5,
"send_cluster_notify unlock\n");
766 sa = (
struct sockaddr *)corosync_addr.address;
767 sa_len = (
size_t)corosync_addr.address_length;
768 if ((res = getnameinfo(sa, sa_len,
buf,
sizeof(
buf),
NULL, 0, NI_NUMERICHOST))) {
769 ast_log(
LOG_WARNING,
"Failed to determine name of local Corosync address: %s (%d). Not informing cluster of existance.\n",
770 gai_strerror(res), res);
785 struct pollfd pfd[3] = {
786 { .events = POLLIN, },
787 { .events = POLLIN, },
788 { .events = POLLIN, },
792 ast_debug(5,
"dispatch_thread_handler rdlock\n");
793 if ((cs_err = cpg_fd_get(
cpg_handle, &pfd[0].fd)) != CS_OK) {
796 ast_debug(5,
"dispatch_thread_handler unlock\n");
800 if ((cs_err = corosync_cfg_fd_get(
cfg_handle, &pfd[1].fd)) != CS_OK) {
803 ast_debug(5,
"dispatch_thread_handler unlock\n");
809 ast_debug(5,
"dispatch_thread_handler unlock\n");
811 ast_log(
LOG_ERROR,
"Failed to get fd: initializing CPG. This module is now broken.\n");
825 if (res == -1 &&
errno != EINTR &&
errno != EAGAIN) {
827 cs_err = CS_ERR_BAD_HANDLE;
828 }
else if (res == 0) {
829 unsigned int local_nodeid;
832 ast_debug(5,
"dispatch_thread_handler rdlock\n");
833 if ((cs_err = cpg_local_get(
cpg_handle, &local_nodeid)) == CS_OK) {
834 struct cpg_name
name;
835 struct cpg_address
address[CPG_MEMBERS_MAX];
836 int entries = CPG_MEMBERS_MAX;
844 ast_debug(1,
"CPG group has %i node membership\n", entries);
845 for (i = 0; (i < entries) && !found; i++) {
846 if (
address[i].nodeid == local_nodeid)
852 cs_err = CS_ERR_BAD_HANDLE;
857 cs_err = CS_ERR_BAD_HANDLE;
862 cs_err = CS_ERR_BAD_HANDLE;
865 ast_debug(5,
"dispatch_thread_handler unlock\n");
869 cs_err = CS_ERR_BAD_HANDLE;
873 ast_debug(5,
"dispatch_thread_handler rdlock\n");
874 if (pfd[0].revents & POLLIN) {
875 if ((cs_err = cpg_dispatch(
cpg_handle, CS_DISPATCH_ALL)) != CS_OK) {
880 if (pfd[1].revents & POLLIN) {
881 if ((cs_err = corosync_cfg_dispatch(
cfg_handle, CS_DISPATCH_ALL)) != CS_OK) {
886 ast_debug(5,
"dispatch_thread_handler unlock\n");
891 if (cs_err == CS_ERR_LIBRARY || cs_err == CS_ERR_BAD_HANDLE) {
898 struct cpg_name
name;
899 ast_debug(5,
"dispatch_thread_handler wrlock\n");
913 ast_debug(5,
"dispatch_thread_handler unlock\n");
921 ast_debug(5,
"dispatch_thread_handler unlock\n");
926 if ((cs_err = cpg_fd_get(
cpg_handle, &pfd[0].fd)) != CS_OK) {
929 ast_debug(5,
"dispatch_thread_handler unlock\n");
934 if ((cs_err = corosync_cfg_fd_get(
cfg_handle, &pfd[1].fd)) != CS_OK) {
937 ast_debug(5,
"dispatch_thread_handler unlock\n");
947 ast_debug(5,
"dispatch_thread_handler unlock\n");
953 ast_debug(5,
"dispatch_thread_handler unlock\n");
957 ast_log(
LOG_NOTICE,
"Failed to recover from corosync failure: initializing CPG.\n");
968 cpg_iteration_handle_t cpg_iter;
969 struct cpg_iteration_description_t cpg_desc;
974 e->
command =
"corosync show members";
976 "Usage: corosync show members\n"
977 " Show corosync cluster members\n";
984 if (
a->argc != e->
args) {
989 ast_debug(5,
"corosync_show_members rdlock\n");
990 cs_err = cpg_iteration_initialize(
cpg_handle, CPG_ITERATION_ALL,
NULL, &cpg_iter);
992 if (cs_err != CS_OK) {
993 ast_cli(
a->fd,
"Failed to initialize CPG iterator: %u.\n", cs_err);
994 cpg_iteration_finalize(cpg_iter);
996 ast_debug(5,
"corosync_show_members unlock\n");
1001 "=============================================================\n"
1002 "=== Cluster members =========================================\n"
1003 "=============================================================\n"
1006 for (i = 1, cs_err = cpg_iteration_next(cpg_iter, &cpg_desc);
1008 cs_err = cpg_iteration_next(cpg_iter, &cpg_desc), i++) {
1009 #ifdef HAVE_COROSYNC_CFG_STATE_TRACK
1010 corosync_cfg_node_address_t addrs[8];
1015 ast_cli(
a->fd,
"=== Node %u\n", i);
1016 ast_cli(
a->fd,
"=== --> Group: %s\n", cpg_desc.group.value);
1018 #ifdef HAVE_COROSYNC_CFG_STATE_TRACK
1024 cs_err = corosync_cfg_get_node_addrs(
cfg_handle, cpg_desc.nodeid,
1026 if (cs_err != CS_OK) {
1031 for (j = 0; j < num_addrs; j++) {
1032 struct sockaddr *sa = (
struct sockaddr *) addrs[j].
address;
1033 size_t sa_len = (size_t) addrs[j].address_length;
1036 getnameinfo(sa, sa_len,
buf,
sizeof(
buf),
NULL, 0, NI_NUMERICHOST);
1038 ast_cli(
a->fd,
"=== --> Address %u: %s\n", j + 1,
buf);
1041 ast_cli(
a->fd,
"=== --> Nodeid: %"PRIu32
"\n", cpg_desc.nodeid);
1046 "=============================================================\n"
1049 cpg_iteration_finalize(cpg_iter);
1051 ast_debug(5,
"corosync_show_members unlock\n");
1053 ast_cli(
a->fd,
"Failed to initialize CPG iterator: initializing CPG.\n");
1067 "Usage: corosync ping\n"
1068 " Send a test ping to the cluster.\n"
1069 "A NOTICE will be in the log for every ping received\n"
1070 "on a server.\n If you send a ping, you should see a NOTICE\n"
1071 "in the log for every server in the cluster.\n";
1078 if (
a->argc != e->
args) {
1100 e->
command =
"corosync show config";
1102 "Usage: corosync show config\n"
1103 " Show configuration loaded from res_corosync.conf\n";
1110 if (
a->argc != e->
args) {
1115 "=============================================================\n"
1116 "=== res_corosync config =====================================\n"
1117 "=============================================================\n"
1121 ast_debug(5,
"corosync_show_config rdlock\n");
1124 ast_cli(
a->fd,
"=== ==> Publishing Event Type: %s\n",
1128 ast_cli(
a->fd,
"=== ==> Subscribing to Event Type: %s\n",
1133 ast_debug(5,
"corosync_show_config unlock\n");
1136 "=============================================================\n"
1184 ast_debug(5,
"load_general_config wrlock\n");
1192 if (!strcasecmp(v->
name,
"publish_event")) {
1194 }
else if (!strcasecmp(v->
name,
"subscribe_event")) {
1217 ast_debug(5,
"load_general_config unlock\n");
1224 static const char filename[] =
"res_corosync.conf";
1226 const char *cat =
NULL;
1237 if (!strcasecmp(cat,
"general")) {
1263 ast_debug(5,
"cleanup_module wrlock\n");
1273 ast_debug(5,
"cleanup_module unlock\n");
1281 ao2_t_ref(messages, -1,
"Dispose of flushed cache");
1297 char meepmeep =
'x';
1318 ast_debug(5,
"cleanup_module wrlock\n");
1330 ast_debug(5,
"cleanup_module unlock\n");
1339 struct cpg_name
name;
Asterisk main include file. File version handling, generic pbx functions.
#define ast_malloc(len)
A wrapper for malloc()
#define ao2_t_ref(o, delta, tag)
@ AO2_ALLOC_OPT_LOCK_NOLOCK
@ AO2_ALLOC_OPT_LOCK_MUTEX
#define ao2_callback(c, flags, cb_fn, arg)
ao2_callback() is a generic function that applies cb_fn() to all objects in a container,...
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
#define ao2_find(container, arg, flags)
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
#define ao2_alloc_options(data_size, destructor_fn, options)
#define ao2_t_alloc(data_size, destructor_fn, debug_msg)
@ OBJ_SEARCH_PARTIAL_KEY
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
@ OBJ_SEARCH_OBJECT
The arg parameter is an object of the same type.
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
@ OBJ_SEARCH_MASK
Search option field mask.
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Standard Command Line Interface.
int ast_cli_unregister_multiple(struct ast_cli_entry *e, int len)
Unregister multiple commands.
#define AST_CLI_DEFINE(fn, txt,...)
void ast_cli(int fd, const char *fmt,...)
#define ast_cli_register_multiple(e, len)
Register multiple commands.
struct stasis_message_type * ast_device_state_message_type(void)
Get the Stasis message type for device state messages.
struct stasis_cache * ast_device_state_cache(void)
Backend cache for ast_device_state_topic_cached()
struct stasis_topic * ast_device_state_topic_all(void)
Get the Stasis topic for device state messages.
ast_device_state
Device States.
int ast_publish_device_state_full(const char *device, enum ast_device_state state, enum ast_devstate_cache cachable, struct ast_eid *eid)
Publish a device state update with EID.
size_t ast_event_minimum_length(void)
Get the minimum length of an ast_event.
struct ast_event * ast_event_new(enum ast_event_type event_type,...)
Create a new event.
size_t ast_event_get_size(const struct ast_event *event)
Get the size of an event.
uint32_t ast_event_get_ie_uint(const struct ast_event *event, enum ast_event_ie_type ie_type)
Get the value of an information element that has an integer payload.
void ast_event_destroy(struct ast_event *event)
Destroy an event.
const char * ast_event_get_ie_str(const struct ast_event *event, enum ast_event_ie_type ie_type)
Get the value of an information element that has a string payload.
const void * ast_event_get_ie_raw(const struct ast_event *event, enum ast_event_ie_type ie_type)
Get the value of an information element that has a raw payload.
const char * ast_event_get_type_name(const struct ast_event *event)
Get the string representation of the type of the given event.
enum ast_event_type ast_event_get_type(const struct ast_event *event)
Get the type for an event.
@ AST_EVENT_IE_STATE
Generic State IE Used by AST_EVENT_DEVICE_STATE_CHANGE Payload type: UINT The actual state values dep...
@ AST_EVENT_IE_CONTEXT
Context IE Used by AST_EVENT_MWI Payload type: str.
@ AST_EVENT_IE_LOCAL_ADDR
@ AST_EVENT_IE_DEVICE
Device Name Used by AST_EVENT_DEVICE_STATE_CHANGE Payload type: STR.
@ AST_EVENT_IE_EID
Entity ID Used by All events Payload type: RAW This IE indicates which server the event originated fr...
@ AST_EVENT_IE_MAILBOX
Mailbox name.
@ AST_EVENT_IE_CACHABLE
Event non-cacheability flag Used by: All events Payload type: UINT.
@ AST_EVENT_IE_NODE_ID
Cluster node ID Used by: Corosync Payload type: UINT.
@ AST_EVENT_IE_OLDMSGS
Number of Used by: AST_EVENT_MWI Payload type: UINT.
@ AST_EVENT_IE_NEWMSGS
Number of new messages Used by: AST_EVENT_MWI Payload type: UINT.
@ AST_EVENT_DEVICE_STATE_CHANGE
@ AST_EVENT_CLUSTER_DISCOVERY
@ AST_EVENT_IE_PLTYPE_RAW
@ AST_EVENT_IE_PLTYPE_UINT
@ AST_EVENT_IE_PLTYPE_STR
Configuration File Parser.
#define ast_config_load(filename, flags)
Load a config file.
char * ast_category_browse(struct ast_config *config, const char *prev_name)
Browse categories.
#define CONFIG_STATUS_FILEMISSING
#define CONFIG_STATUS_FILEINVALID
void ast_config_destroy(struct ast_config *cfg)
Destroys a config.
struct ast_variable * ast_variable_browse(const struct ast_config *config, const char *category_name)
Support for logging to various files, console and syslog Configuration in file logger....
#define ast_debug(level,...)
Log a DEBUG message.
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
struct ast_json_payload * ast_json_payload_create(struct ast_json *json)
Create an ao2 object to pass json blobs as data payloads for stasis.
struct ast_json * ast_json_pack(char const *format,...)
Helper for creating complex JSON values.
#define ast_rwlock_wrlock(a)
#define AST_PTHREADT_NULL
#define ast_rwlock_rdlock(a)
#define AST_RWLOCK_DEFINE_STATIC(rwlock)
#define ast_rwlock_trywrlock(a)
#define ast_rwlock_unlock(a)
#define ast_rwlock_tryrdlock(a)
Asterisk module definitions.
#define ASTERISK_GPL_KEY
The text the key() function should return.
@ AST_MODULE_LOAD_SUCCESS
@ AST_MODULE_LOAD_DECLINE
Module has failed to load, may be in an inconsistent state.
struct stasis_message_type * ast_mwi_state_type(void)
Get the Stasis Message Bus API message type for MWI messages.
struct stasis_cache * ast_mwi_state_cache(void)
Backend cache for ast_mwi_topic_cached().
int ast_publish_mwi_state_full(const char *mailbox, const char *context, int new_msgs, int old_msgs, const char *channel_id, struct ast_eid *eid)
Publish a MWI state update via stasis with all parameters.
struct stasis_topic * ast_mwi_topic_all(void)
Get the Stasis Message Bus API topic for MWI messages.
int ast_sockaddr_parse(struct ast_sockaddr *addr, const char *str, int flags)
Parse an IPv4 or IPv6 address string.
static char * ast_sockaddr_stringify_addr(const struct ast_sockaddr *addr)
Wrapper around ast_sockaddr_stringify_fmt() to return an address only.
#define ast_poll(a, b, c)
static char * corosync_show_config(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_name, uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
static void publish_event_to_corosync(struct ast_event *event)
static struct ast_event * corosync_ping_to_event(struct stasis_message *message)
Convert a Corosync PING to a ast_event.
void(* publish_to_stasis)(struct ast_event *)
static void cfg_shutdown_cb(corosync_cfg_handle_t cfg_handle, corosync_cfg_shutdown_flags_t flags)
static int clear_node_cache(void *obj, void *arg, int flags)
static cpg_callbacks_t cpg_callbacks
static void publish_to_corosync(struct stasis_message *message)
static void publish_cluster_discovery_to_stasis_full(struct corosync_node *node, int joined)
Publish cluster discovery to Stasis Message Bus API.
static void * dispatch_thread_handler(void *data)
#define COROSYNC_POLL_TIMEOUT
Timeout for Corosync's poll process.
static struct ao2_container * nodes
All the nodes that we're aware of.
static void publish_corosync_ping_to_stasis(struct ast_event *event)
Publish a Corosync ping to Stasis Message Bus API.
static struct corosync_node * corosync_node_alloc(struct ast_event *event)
struct stasis_message_type *(* message_type_fn)(void)
static struct ast_cli_entry corosync_cli[]
unsigned char publish_default
#define corosync_pthread_create_background(a, b, c, d)
Version of pthread_create to ensure stack is large enough.
static void cleanup_module(void)
static int corosync_node_joined
Join to corosync.
static void corosync_ping_payload_dtor(void *obj)
Destructor for the corosync_ping_payload wrapper object.
static struct stasis_message_router * stasis_router
Our Stasis Message Bus API message router.
static corosync_cfg_callbacks_t cfg_callbacks
static struct @433 dispatch_thread
static void send_cluster_notify(void)
Informs the cluster of our EID and our IP addresses.
static int load_general_config(struct ast_config *cfg)
static void publish_device_state_to_stasis(struct ast_event *event)
Publish a received device state ast_event to Stasis Message Bus API.
static struct stasis_topic * corosync_topic(void)
Internal accessor for our topic.
static int corosync_node_hash_fn(const void *obj, const int flags)
struct stasis_cache *(* cache_fn)(void)
struct stasis_forward * sub
unsigned char subscribe_default
struct stasis_topic *(* topic_fn)(void)
static char * corosync_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static struct stasis_topic * corosync_aggregate_topic
The internal topic used for message forwarding and pings.
static ast_rwlock_t event_types_lock
static void publish_cluster_discovery_to_stasis(struct ast_event *event)
Publish a received cluster discovery ast_event to Stasis Message Bus API.
static char * corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static int load_module(void)
static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
STASIS_MESSAGE_TYPE_DEFN_LOCAL(corosync_ping_message_type,.to_event=corosync_ping_to_event,)
static corosync_cfg_handle_t cfg_handle
static int unload_module(void)
static void stasis_message_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
static int load_config(unsigned int reload)
static int set_event(const char *event_type, int pubsub)
static void publish_mwi_to_stasis(struct ast_event *event)
Publish a received MWI ast_event to Stasis Message Bus API.
static ast_rwlock_t init_cpg_lock
AST_MODULE_INFO_STANDARD_EXTENDED(ASTERISK_GPL_KEY, "Corosync")
static cpg_handle_t cpg_handle
static int dump_cache_cb(void *obj, void *arg, int flags)
static int corosync_node_cmp_fn(void *obj, void *arg, int flags)
static struct @432 event_types[]
Stasis Message Bus API. See Stasis Message Bus API for detailed documentation.
struct stasis_message * stasis_cache_clear_create(struct stasis_message *message)
A message which instructs the caching topic to remove an entry from its cache.
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
struct ao2_container * stasis_cache_dump_all(struct stasis_cache *cache, struct stasis_message_type *type)
Dump all entity items from the cache to a subscription.
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
struct ao2_container * stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
Dump cached items to a subscription for a specific entity.
const struct ast_eid * stasis_message_eid(const struct stasis_message *msg)
Get the entity id for a stasis_message.
struct ast_event * stasis_message_to_event(struct stasis_message *msg)
Build the Generic event system representation of the message.
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
int stasis_message_router_set_congestion_limits(struct stasis_message_router *router, long low_water, long high_water)
Set the high and low alert water marks of the stasis message router.
#define stasis_message_router_create(topic)
Create a new message router object.
void stasis_message_router_remove(struct stasis_message_router *router, struct stasis_message_type *message_type)
Remove a route from a message router.
int stasis_message_router_add(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
Add a route to a message router.
void stasis_message_router_unsubscribe_and_join(struct stasis_message_router *router)
Unsubscribe the router from the upstream topic, blocking until the final message has been processed.
struct stasis_message_type * ast_cluster_discovery_type(void)
A stasis_message_type for Cluster discovery.
struct stasis_topic * ast_system_topic(void)
A Stasis Message Bus API topic which publishes messages regarding system changes.
static force_inline int attribute_pure ast_strlen_zero(const char *s)
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
descriptor for a cli entry.
int args
This gets set in ast_cli_register()
An Entity ID is essentially a MAC address, brief and unique.
Structure used to handle boolean flags.
Abstract JSON element (object, array, string, int, ...).
Socket address structure.
Structure for variables, used for configurations and for channel variables.
struct ast_variable * next
A payload wrapper around a corosync ping event.
An API for managing task processing threads that can be shared across modules.
#define AST_TASKPROCESSOR_HIGH_WATER_LEVEL
int error(const char *format,...)
int ast_carefulwrite(int fd, char *s, int len, int timeoutms)
Try to write string, but wait no more than ms milliseconds before timing out.
int ast_eid_cmp(const struct ast_eid *eid1, const struct ast_eid *eid2)
Compare two EIDs.
char * ast_eid_to_str(char *s, int maxlen, struct ast_eid *eid)
Convert an EID to a string.
int ast_eid_is_empty(const struct ast_eid *eid)
Check if EID is empty.
struct ast_eid ast_eid_default
Global EID.