35#include <corosync/cpg.h> 
   36#include <corosync/cfg.h> 
   55#define COROSYNC_POLL_TIMEOUT               (10 * 1000) 
   89#define COROSYNC_IPC_BUFFER_SIZE                (8192 * 128) 
   92#define corosync_pthread_create_background(a, b, c, d)              \ 
   93    ast_pthread_create_stack(a, b, c, d,                    \ 
   94        (AST_BACKGROUND_STACKSIZE + (3 * COROSYNC_IPC_BUFFER_SIZE)),    \ 
   95        __FILE__, __FUNCTION__, __LINE__, #c) 
 
  145        cmp = (left->
id == *
id);
 
  148        cmp = (left->
id == right->
id);
 
 
  187    if (!payload->
event) {
 
 
  213    if (!corosync_ping_message_type()) {
 
  228        ao2_t_ref(payload, -1, 
"Destroy payload on off nominal");
 
  234    ao2_t_ref(payload, -1, 
"Hand ref to stasis");
 
 
  261                         .publish_default = 1,
 
  262                         .subscribe_default = 1,
 
  264                         .message_type_fn = corosync_ping_message_type,
 
  267                                      .publish_default = 1,
 
  268                                      .subscribe_default = 1,
 
  280    .alert_pipe = { -1, -1 },
 
  286#ifdef HAVE_COROSYNC_CFG_STATE_TRACK 
  287static void cfg_state_track_cb(
 
  288        corosync_cfg_state_notification_buffer_t *notification_buffer,
 
  293        corosync_cfg_shutdown_flags_t flags);
 
  296#ifdef HAVE_COROSYNC_CFG_STATE_TRACK 
  297    .corosync_cfg_state_track_callback = cfg_state_track_cb,
 
 
  318        joined ? 
"joined" : 
"left");
 
 
  398    unsigned int new_msgs;
 
  399    unsigned int old_msgs;
 
  414    if (new_msgs > INT_MAX) {
 
  418    if (old_msgs > INT_MAX) {
 
  423                                   (
int)old_msgs, 
NULL, event_eid)) {
 
  427            mailbox, context, 
eid);
 
 
  436    unsigned int cachable;
 
 
  458static void cpg_deliver_cb(cpg_handle_t handle, 
const struct cpg_name *group_name,
 
  459        uint32_t nodeid, uint32_t pid, 
void *msg, 
size_t msg_len);
 
  461static void cpg_confchg_cb(cpg_handle_t handle, 
const struct cpg_name *group_name,
 
  462        const struct cpg_address *member_list, 
size_t member_list_entries,
 
  463        const struct cpg_address *left_list, 
size_t left_list_entries,
 
  464        const struct cpg_address *joined_list, 
size_t joined_list_entries);
 
  471#ifdef HAVE_COROSYNC_CFG_STATE_TRACK 
  472static void cfg_state_track_cb(
 
  473        corosync_cfg_state_notification_buffer_t *notification_buffer,
 
  480        corosync_cfg_shutdown_flags_t flags)
 
 
  484static void cpg_deliver_cb(cpg_handle_t handle, 
const struct cpg_name *group_name,
 
  485        uint32_t nodeid, uint32_t pid, 
void *msg, 
size_t msg_len)
 
  493        ast_debug(1, 
"Ignoring event that's too small. %u < %u\n",
 
  494            (
unsigned int) msg_len,
 
  513    publish_handler = 
event_types[event_type].publish_to_stasis;
 
  528    memcpy(
event, msg, msg_len);
 
  538    ast_debug(5, 
"Publishing event %s (%u) to stasis\n",
 
  540    publish_handler(
event);
 
 
  549    iov.iov_base = (
void *)
event;
 
  552    ast_debug(5, 
"Publishing event %s (%u) to corosync\n",
 
  558        ast_debug(5, 
"publish_event_to_corosync rdlock\n");
 
  559        if ((cs_err = cpg_mcast_joined(
cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) {
 
  564        ast_debug(5, 
"publish_event_to_corosync unlock\n");
 
 
  647static void cpg_confchg_cb(cpg_handle_t handle, 
const struct cpg_name *group_name,
 
  648        const struct cpg_address *member_list, 
size_t member_list_entries,
 
  649        const struct cpg_address *left_list, 
size_t left_list_entries,
 
  650        const struct cpg_address *joined_list, 
size_t joined_list_entries)
 
  655    for (i = 0; i < left_list_entries; i++) {
 
  656        const struct cpg_address *cpg_node = &left_list[i];
 
  692            ao2_t_ref(messages, -1, 
"Dispose of flushed cache");
 
  701    if (!joined_list_entries) {
 
  732        ao2_t_ref(messages, -1, 
"Dispose of dumped cache");
 
 
  740    unsigned int node_id;
 
  742    corosync_cfg_node_address_t corosync_addr;
 
  750        ast_debug(5, 
"send_cluster_notify rdlock\n");
 
  752        if ((cs_err = corosync_cfg_local_get(
cfg_handle, &node_id)) != CS_OK) {
 
  753            ast_log(
LOG_WARNING, 
"Failed to extract Corosync node ID for this node. Not informing cluster of existance.\n");
 
  757        if (((cs_err = corosync_cfg_get_node_addrs(
cfg_handle, node_id, 1, &num_addrs, &corosync_addr)) != CS_OK) || (num_addrs < 1)) {
 
  758            ast_log(
LOG_WARNING, 
"Failed to get local Corosync address. Not informing cluster of existance.\n");
 
  763        ast_debug(5, 
"send_cluster_notify unlock\n");
 
  766    sa = (
struct sockaddr *)corosync_addr.address;
 
  767    sa_len = (size_t)corosync_addr.address_length;
 
  768    if ((res = getnameinfo(sa, sa_len, 
buf, 
sizeof(
buf), 
NULL, 0, NI_NUMERICHOST))) {
 
  769        ast_log(
LOG_WARNING, 
"Failed to determine name of local Corosync address: %s (%d). Not informing cluster of existance.\n",
 
  770            gai_strerror(res), res);
 
 
  785    struct pollfd pfd[3] = {
 
  786        { .events = POLLIN, },
 
  787        { .events = POLLIN, },
 
  788        { .events = POLLIN, },
 
  792        ast_debug(5, 
"dispatch_thread_handler rdlock\n");
 
  793        if ((cs_err = cpg_fd_get(
cpg_handle, &pfd[0].fd)) != CS_OK) {
 
  796            ast_debug(5, 
"dispatch_thread_handler unlock\n");
 
  800        if ((cs_err = corosync_cfg_fd_get(
cfg_handle, &pfd[1].fd)) != CS_OK) {
 
  803            ast_debug(5, 
"dispatch_thread_handler unlock\n");
 
  809        ast_debug(5, 
"dispatch_thread_handler unlock\n");
 
  811        ast_log(
LOG_ERROR, 
"Failed to get fd: initializing CPG.  This module is now broken.\n");
 
  825        if (res == -1 && 
errno != EINTR && 
errno != EAGAIN) {
 
  827            cs_err = CS_ERR_BAD_HANDLE;
 
  828        } 
else if (res == 0) {
 
  829            unsigned int local_nodeid;
 
  832                ast_debug(5, 
"dispatch_thread_handler rdlock\n");
 
  833                if ((cs_err = cpg_local_get(
cpg_handle, &local_nodeid)) == CS_OK) {
 
  834                    struct cpg_name 
name;
 
  835                    struct cpg_address 
address[CPG_MEMBERS_MAX];
 
  836                    int entries = CPG_MEMBERS_MAX;
 
  844                        ast_debug(1, 
"CPG group has %i node membership\n", entries);
 
  845                        for (i = 0; (i < entries) && !found; i++) {
 
  846                            if (
address[i].nodeid == local_nodeid)
 
  852                            cs_err = CS_ERR_BAD_HANDLE;
 
  857                        cs_err = CS_ERR_BAD_HANDLE;
 
  862                    cs_err = CS_ERR_BAD_HANDLE;
 
  865                ast_debug(5, 
"dispatch_thread_handler unlock\n");
 
  869                cs_err = CS_ERR_BAD_HANDLE;
 
  873                ast_debug(5, 
"dispatch_thread_handler rdlock\n");
 
  874                if (pfd[0].revents & POLLIN) {
 
  875                    if ((cs_err = cpg_dispatch(
cpg_handle, CS_DISPATCH_ALL)) != CS_OK) {
 
  880                if (pfd[1].revents & POLLIN) {
 
  881                    if ((cs_err = corosync_cfg_dispatch(
cfg_handle, CS_DISPATCH_ALL)) != CS_OK) {
 
  886                ast_debug(5, 
"dispatch_thread_handler unlock\n");
 
  891        if (cs_err == CS_ERR_LIBRARY || cs_err == CS_ERR_BAD_HANDLE) {
 
  898                struct cpg_name 
name;
 
  899                ast_debug(5, 
"dispatch_thread_handler wrlock\n");
 
  913                    ast_debug(5, 
"dispatch_thread_handler unlock\n");
 
  921                    ast_debug(5, 
"dispatch_thread_handler unlock\n");
 
  926                if ((cs_err = cpg_fd_get(
cpg_handle, &pfd[0].fd)) != CS_OK) {
 
  929                    ast_debug(5, 
"dispatch_thread_handler unlock\n");
 
  934                if ((cs_err = corosync_cfg_fd_get(
cfg_handle, &pfd[1].fd)) != CS_OK) {
 
  937                    ast_debug(5, 
"dispatch_thread_handler unlock\n");
 
  947                    ast_debug(5, 
"dispatch_thread_handler unlock\n");
 
  953                ast_debug(5, 
"dispatch_thread_handler unlock\n");
 
  957                ast_log(
LOG_NOTICE, 
"Failed to recover from corosync failure: initializing CPG.\n");
 
 
  968    cpg_iteration_handle_t cpg_iter;
 
  969    struct cpg_iteration_description_t cpg_desc;
 
  974        e->
command = 
"corosync show members";
 
  976            "Usage: corosync show members\n" 
  977            "       Show corosync cluster members\n";
 
  984    if (
a->argc != e->
args) {
 
  989        ast_debug(5, 
"corosync_show_members rdlock\n");
 
  990        cs_err = cpg_iteration_initialize(
cpg_handle, CPG_ITERATION_ALL, 
NULL, &cpg_iter);
 
  992        if (cs_err != CS_OK) {
 
  993            ast_cli(
a->fd, 
"Failed to initialize CPG iterator: %u.\n", cs_err);
 
  994            cpg_iteration_finalize(cpg_iter);
 
  996            ast_debug(5, 
"corosync_show_members unlock\n");
 
 1001                    "=============================================================\n" 
 1002                    "=== Cluster members =========================================\n" 
 1003                    "=============================================================\n" 
 1006        for (i = 1, cs_err = cpg_iteration_next(cpg_iter, &cpg_desc);
 
 1008                cs_err = cpg_iteration_next(cpg_iter, &cpg_desc), i++) {
 
 1009    #ifdef HAVE_COROSYNC_CFG_STATE_TRACK 
 1010            corosync_cfg_node_address_t addrs[8];
 
 1015            ast_cli(
a->fd, 
"=== Node %u\n", i);
 
 1016            ast_cli(
a->fd, 
"=== --> Group: %s\n", cpg_desc.group.value);
 
 1018    #ifdef HAVE_COROSYNC_CFG_STATE_TRACK 
 1024            cs_err = corosync_cfg_get_node_addrs(
cfg_handle, cpg_desc.nodeid,
 
 1026            if (cs_err != CS_OK) {
 
 1031            for (j = 0; j < num_addrs; j++) {
 
 1032                struct sockaddr *sa = (
struct sockaddr *) addrs[j].
address;
 
 1033                size_t sa_len = (size_t) addrs[j].address_length;
 
 1036                getnameinfo(sa, sa_len, 
buf, 
sizeof(
buf), 
NULL, 0, NI_NUMERICHOST);
 
 1038                ast_cli(
a->fd, 
"=== --> Address %u: %s\n", j + 1, 
buf);
 
 1041            ast_cli(
a->fd, 
"=== --> Nodeid: %"PRIu32
"\n", cpg_desc.nodeid);
 
 1046                    "=============================================================\n" 
 1049        cpg_iteration_finalize(cpg_iter);
 
 1051        ast_debug(5, 
"corosync_show_members unlock\n");
 
 1053        ast_cli(
a->fd, 
"Failed to initialize CPG iterator: initializing CPG.\n");
 
 
 1067            "Usage: corosync ping\n" 
 1068            "       Send a test ping to the cluster.\n" 
 1069            "A NOTICE will be in the log for every ping received\n" 
 1070            "on a server.\n  If you send a ping, you should see a NOTICE\n" 
 1071            "in the log for every server in the cluster.\n";
 
 1078    if (
a->argc != e->
args) {
 
 
 1100        e->
command = 
"corosync show config";
 
 1102            "Usage: corosync show config\n" 
 1103            "       Show configuration loaded from res_corosync.conf\n";
 
 1110    if (
a->argc != e->
args) {
 
 1115                "=============================================================\n" 
 1116                "=== res_corosync config =====================================\n" 
 1117                "=============================================================\n" 
 1121    ast_debug(5, 
"corosync_show_config rdlock\n");
 
 1124            ast_cli(
a->fd, 
"=== ==> Publishing Event Type: %s\n",
 
 1128            ast_cli(
a->fd, 
"=== ==> Subscribing to Event Type: %s\n",
 
 1133    ast_debug(5, 
"corosync_show_config unlock\n");
 
 1136                   "=============================================================\n" 
 
 1184    ast_debug(5, 
"load_general_config wrlock\n");
 
 1192        if (!strcasecmp(v->
name, 
"publish_event")) {
 
 1194        } 
else if (!strcasecmp(v->
name, 
"subscribe_event")) {
 
 1217    ast_debug(5, 
"load_general_config unlock\n");
 
 
 1224    static const char filename[] = 
"res_corosync.conf";
 
 1226    const char *cat = 
NULL;
 
 1237        if (!strcasecmp(cat, 
"general")) {
 
 
 1263            ast_debug(5, 
"cleanup_module wrlock\n");
 
 1273            ast_debug(5, 
"cleanup_module unlock\n");
 
 1281                ao2_t_ref(messages, -1, 
"Dispose of flushed cache");
 
 1297        char meepmeep = 
'x';
 
 1318        ast_debug(5, 
"cleanup_module wrlock\n");
 
 1330        ast_debug(5, 
"cleanup_module unlock\n");
 
 
 1339    struct cpg_name 
name;
 
 
void ast_cli_unregister_multiple(void)
static int load_config(void)
Asterisk main include file. File version handling, generic pbx functions.
#define ast_malloc(len)
A wrapper for malloc()
#define ao2_t_ref(o, delta, tag)
@ AO2_ALLOC_OPT_LOCK_NOLOCK
@ AO2_ALLOC_OPT_LOCK_MUTEX
#define ao2_callback(c, flags, cb_fn, arg)
ao2_callback() is a generic function that applies cb_fn() to all objects in a container,...
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
#define ao2_find(container, arg, flags)
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
#define ao2_alloc_options(data_size, destructor_fn, options)
#define ao2_t_alloc(data_size, destructor_fn, debug_msg)
@ OBJ_SEARCH_PARTIAL_KEY
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
@ OBJ_SEARCH_OBJECT
The arg parameter is an object of the same type.
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
@ OBJ_SEARCH_MASK
Search option field mask.
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Standard Command Line Interface.
#define AST_CLI_DEFINE(fn, txt,...)
void ast_cli(int fd, const char *fmt,...)
#define ast_cli_register_multiple(e, len)
Register multiple commands.
struct stasis_message_type * ast_device_state_message_type(void)
Get the Stasis message type for device state messages.
struct stasis_cache * ast_device_state_cache(void)
Backend cache for ast_device_state_topic_cached()
struct stasis_topic * ast_device_state_topic_all(void)
Get the Stasis topic for device state messages.
ast_device_state
Device States.
int ast_publish_device_state_full(const char *device, enum ast_device_state state, enum ast_devstate_cache cachable, struct ast_eid *eid)
Publish a device state update with EID.
size_t ast_event_minimum_length(void)
Get the minimum length of an ast_event.
struct ast_event * ast_event_new(enum ast_event_type event_type,...)
Create a new event.
size_t ast_event_get_size(const struct ast_event *event)
Get the size of an event.
uint32_t ast_event_get_ie_uint(const struct ast_event *event, enum ast_event_ie_type ie_type)
Get the value of an information element that has an integer payload.
void ast_event_destroy(struct ast_event *event)
Destroy an event.
const char * ast_event_get_ie_str(const struct ast_event *event, enum ast_event_ie_type ie_type)
Get the value of an information element that has a string payload.
const void * ast_event_get_ie_raw(const struct ast_event *event, enum ast_event_ie_type ie_type)
Get the value of an information element that has a raw payload.
const char * ast_event_get_type_name(const struct ast_event *event)
Get the string representation of the type of the given event.
enum ast_event_type ast_event_get_type(const struct ast_event *event)
Get the type for an event.
@ AST_EVENT_IE_STATE
Generic State IE Used by AST_EVENT_DEVICE_STATE_CHANGE Payload type: UINT The actual state values dep...
@ AST_EVENT_IE_CONTEXT
Context IE Used by AST_EVENT_MWI Payload type: str.
@ AST_EVENT_IE_LOCAL_ADDR
@ AST_EVENT_IE_DEVICE
Device Name Used by AST_EVENT_DEVICE_STATE_CHANGE Payload type: STR.
@ AST_EVENT_IE_EID
Entity ID Used by All events Payload type: RAW This IE indicates which server the event originated fr...
@ AST_EVENT_IE_MAILBOX
Mailbox name.
@ AST_EVENT_IE_CACHABLE
Event non-cacheability flag Used by: All events Payload type: UINT.
@ AST_EVENT_IE_NODE_ID
Cluster node ID Used by: Corosync Payload type: UINT.
@ AST_EVENT_IE_OLDMSGS
Number of old messages Used by: AST_EVENT_MWI Payload type: UINT.
@ AST_EVENT_IE_NEWMSGS
Number of new messages Used by: AST_EVENT_MWI Payload type: UINT.
@ AST_EVENT_DEVICE_STATE_CHANGE
@ AST_EVENT_CLUSTER_DISCOVERY
@ AST_EVENT_IE_PLTYPE_RAW
@ AST_EVENT_IE_PLTYPE_UINT
@ AST_EVENT_IE_PLTYPE_STR
Configuration File Parser.
#define ast_config_load(filename, flags)
Load a config file.
char * ast_category_browse(struct ast_config *config, const char *prev_name)
Browse categories.
#define CONFIG_STATUS_FILEMISSING
#define CONFIG_STATUS_FILEINVALID
void ast_config_destroy(struct ast_config *cfg)
Destroys a config.
struct ast_variable * ast_variable_browse(const struct ast_config *config, const char *category_name)
Support for logging to various files, console and syslog Configuration in file logger....
#define ast_debug(level,...)
Log a DEBUG message.
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
struct ast_json_payload * ast_json_payload_create(struct ast_json *json)
Create an ao2 object to pass json blobs as data payloads for stasis.
struct ast_json * ast_json_pack(char const *format,...)
Helper for creating complex JSON values.
#define ast_rwlock_wrlock(a)
#define AST_PTHREADT_NULL
#define ast_rwlock_rdlock(a)
#define AST_RWLOCK_DEFINE_STATIC(rwlock)
#define ast_rwlock_trywrlock(a)
#define ast_rwlock_unlock(a)
#define ast_rwlock_tryrdlock(a)
Asterisk module definitions.
#define ASTERISK_GPL_KEY
The text the key() function should return.
#define AST_MODULE_INFO_STANDARD_EXTENDED(keystr, desc)
@ AST_MODULE_LOAD_SUCCESS
@ AST_MODULE_LOAD_DECLINE
Module has failed to load, may be in an inconsistent state.
struct stasis_message_type * ast_mwi_state_type(void)
Get the Stasis Message Bus API message type for MWI messages.
struct stasis_cache * ast_mwi_state_cache(void)
Backend cache for ast_mwi_topic_cached().
int ast_publish_mwi_state_full(const char *mailbox, const char *context, int new_msgs, int old_msgs, const char *channel_id, struct ast_eid *eid)
Publish a MWI state update via stasis with all parameters.
struct stasis_topic * ast_mwi_topic_all(void)
Get the Stasis Message Bus API topic for MWI messages.
int ast_sockaddr_parse(struct ast_sockaddr *addr, const char *str, int flags)
Parse an IPv4 or IPv6 address string.
static char * ast_sockaddr_stringify_addr(const struct ast_sockaddr *addr)
Wrapper around ast_sockaddr_stringify_fmt() to return an address only.
#define ast_poll(a, b, c)
static struct stasis_subscription * sub
Statsd channel stats. Example of how to subscribe to Stasis events.
static char * corosync_show_config(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_name, uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
static void publish_event_to_corosync(struct ast_event *event)
static struct ast_event * corosync_ping_to_event(struct stasis_message *message)
Convert a Corosync PING to a ast_event.
void(* publish_to_stasis)(struct ast_event *)
static void cfg_shutdown_cb(corosync_cfg_handle_t cfg_handle, corosync_cfg_shutdown_flags_t flags)
static int clear_node_cache(void *obj, void *arg, int flags)
static cpg_callbacks_t cpg_callbacks
static void publish_to_corosync(struct stasis_message *message)
static void publish_cluster_discovery_to_stasis_full(struct corosync_node *node, int joined)
Publish cluster discovery to Stasis Message Bus API.
static void * dispatch_thread_handler(void *data)
#define COROSYNC_POLL_TIMEOUT
Timeout for Corosync's poll process.
static struct ao2_container * nodes
All the nodes that we're aware of.
static void publish_corosync_ping_to_stasis(struct ast_event *event)
Publish a Corosync ping to Stasis Message Bus API.
static struct corosync_node * corosync_node_alloc(struct ast_event *event)
struct stasis_message_type *(* message_type_fn)(void)
static struct ast_cli_entry corosync_cli[]
unsigned char publish_default
#define corosync_pthread_create_background(a, b, c, d)
Version of pthread_create to ensure stack is large enough.
static void cleanup_module(void)
static int corosync_node_joined
Join to corosync.
static void corosync_ping_payload_dtor(void *obj)
Destructor for the corosync_ping_payload wrapper object.
static struct stasis_message_router * stasis_router
Our Stasis Message Bus API message router.
static corosync_cfg_callbacks_t cfg_callbacks
static void send_cluster_notify(void)
Informs the cluster of our EID and our IP addresses.
static int load_general_config(struct ast_config *cfg)
static void publish_device_state_to_stasis(struct ast_event *event)
Publish a received device state ast_event to Stasis Message Bus API.
static struct stasis_topic * corosync_topic(void)
Internal accessor for our topic.
static int corosync_node_hash_fn(const void *obj, const int flags)
struct stasis_cache *(* cache_fn)(void)
struct stasis_forward * sub
unsigned char subscribe_default
struct stasis_topic *(* topic_fn)(void)
static char * corosync_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static struct stasis_topic * corosync_aggregate_topic
The internal topic used for message forwarding and pings.
static struct @468 dispatch_thread
static ast_rwlock_t event_types_lock
static void publish_cluster_discovery_to_stasis(struct ast_event *event)
Publish a received cluster discovery ast_event to Stasis Message Bus API.
static char * corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static int load_module(void)
static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
static struct @467 event_types[]
static corosync_cfg_handle_t cfg_handle
static int unload_module(void)
static void stasis_message_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
static int set_event(const char *event_type, int pubsub)
static void publish_mwi_to_stasis(struct ast_event *event)
Publish a received MWI ast_event to Stasis Message Bus API.
static ast_rwlock_t init_cpg_lock
static cpg_handle_t cpg_handle
static int dump_cache_cb(void *obj, void *arg, int flags)
static int corosync_node_cmp_fn(void *obj, void *arg, int flags)
Stasis Message Bus API. See Stasis Message Bus API for detailed documentation.
struct stasis_message * stasis_cache_clear_create(struct stasis_message *message)
A message which instructs the caching topic to remove an entry from its cache.
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
struct ao2_container * stasis_cache_dump_all(struct stasis_cache *cache, struct stasis_message_type *type)
Dump all entity items from the cache to a subscription.
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
struct ao2_container * stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
Dump cached items to a subscription for a specific entity.
#define STASIS_MESSAGE_TYPE_DEFN_LOCAL(name,...)
Boiler-plate messaging macro for defining local message types.
const struct ast_eid * stasis_message_eid(const struct stasis_message *msg)
Get the entity id for a stasis_message.
struct ast_event * stasis_message_to_event(struct stasis_message *msg)
Build the Generic event system representation of the message.
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
int stasis_message_router_set_congestion_limits(struct stasis_message_router *router, long low_water, long high_water)
Set the high and low alert water marks of the stasis message router.
#define stasis_message_router_create(topic)
Create a new message router object.
void stasis_message_router_remove(struct stasis_message_router *router, struct stasis_message_type *message_type)
Remove a route from a message router.
int stasis_message_router_add(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
Add a route to a message router.
void stasis_message_router_unsubscribe_and_join(struct stasis_message_router *router)
Unsubscribe the router from the upstream topic, blocking until the final message has been processed.
struct stasis_message_type * ast_cluster_discovery_type(void)
A stasis_message_type for Cluster discovery.
struct stasis_topic * ast_system_topic(void)
A Stasis Message Bus API topic which publishes messages regarding system changes.
static force_inline int attribute_pure ast_strlen_zero(const char *s)
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
descriptor for a cli entry.
int args
This gets set in ast_cli_register()
An Entity ID is essentially a MAC address, brief and unique.
Structure used to handle boolean flags.
Abstract JSON element (object, array, string, int, ...).
Socket address structure.
Structure for variables, used for configurations and for channel variables.
struct ast_variable * next
A payload wrapper around a corosync ping event.
An API for managing task processing threads that can be shared across modules.
#define AST_TASKPROCESSOR_HIGH_WATER_LEVEL
int ast_carefulwrite(int fd, char *s, int len, int timeoutms)
Try to write string, but wait no more than ms milliseconds before timing out.
int ast_eid_cmp(const struct ast_eid *eid1, const struct ast_eid *eid2)
Compare two EIDs.
char * ast_eid_to_str(char *s, int maxlen, struct ast_eid *eid)
Convert an EID to a string.
int ast_eid_is_empty(const struct ast_eid *eid)
Check if EID is empty.
struct ast_eid ast_eid_default
Global EID.