Asterisk - The Open Source Telephony Project  GIT-master-e8cda4b
Data Structures | Macros | Enumerations | Functions | Variables
res_corosync.c File Reference
#include "asterisk.h"
#include <corosync/cpg.h>
#include <corosync/cfg.h>
#include "asterisk/module.h"
#include "asterisk/logger.h"
#include "asterisk/poll-compat.h"
#include "asterisk/config.h"
#include "asterisk/event.h"
#include "asterisk/cli.h"
#include "asterisk/devicestate.h"
#include "asterisk/mwi.h"
#include "asterisk/stasis.h"
#include "asterisk/stasis_message_router.h"
#include "asterisk/stasis_system.h"
#include "asterisk/taskprocessor.h"

Go to the source code of this file.

Data Structures

struct  corosync_node
 
struct  corosync_ping_payload
 A payload wrapper around a corosync ping event. More...
 

Macros

#define COROSYNC_IPC_BUFFER_SIZE   (8192 * 128)
 Corosync ipc dispatch/request and reply size. More...
 
#define COROSYNC_POLL_TIMEOUT   (10 * 1000)
 Timeout for Corosync's poll process. More...
 
#define corosync_pthread_create_background(a, b, c, d)
 Version of pthread_create to ensure stack is large enough. More...
 

Enumerations

enum  { PUBLISH, SUBSCRIBE }
 

Functions

 AST_MODULE_INFO_STANDARD_EXTENDED (ASTERISK_GPL_KEY, "Corosync")
 
static void cfg_shutdown_cb (corosync_cfg_handle_t cfg_handle, corosync_cfg_shutdown_flags_t flags)
 
static void cleanup_module (void)
 
static int clear_node_cache (void *obj, void *arg, int flags)
 
static struct corosync_nodecorosync_node_alloc (struct ast_event *event)
 
static int corosync_node_cmp_fn (void *obj, void *arg, int flags)
 
static int corosync_node_hash_fn (const void *obj, const int flags)
 
static char * corosync_ping (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static void corosync_ping_payload_dtor (void *obj)
 Destructor for the corosync_ping_payload wrapper object. More...
 
static struct ast_eventcorosync_ping_to_event (struct stasis_message *message)
 Convert a Corosync PING to a ast_event. More...
 
static char * corosync_show_config (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static char * corosync_show_members (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static struct stasis_topiccorosync_topic (void)
 Internal accessor for our topic. More...
 
static void cpg_confchg_cb (cpg_handle_t handle, const struct cpg_name *group_name, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
 
static void cpg_deliver_cb (cpg_handle_t handle, const struct cpg_name *group_name, uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
 
static void * dispatch_thread_handler (void *data)
 
static int dump_cache_cb (void *obj, void *arg, int flags)
 
static int load_config (unsigned int reload)
 
static int load_general_config (struct ast_config *cfg)
 
static int load_module (void)
 
static void publish_cluster_discovery_to_stasis (struct ast_event *event)
 Publish a received cluster discovery ast_event to Stasis Message Bus API. More...
 
static void publish_cluster_discovery_to_stasis_full (struct corosync_node *node, int joined)
 Publish cluster discovery to Stasis Message Bus API. More...
 
static void publish_corosync_ping_to_stasis (struct ast_event *event)
 Publish a Corosync ping to Stasis Message Bus API. More...
 
static void publish_device_state_to_stasis (struct ast_event *event)
 Publish a received device state ast_event to Stasis Message Bus API. More...
 
static void publish_event_to_corosync (struct ast_event *event)
 
static void publish_mwi_to_stasis (struct ast_event *event)
 Publish a received MWI ast_event to Stasis Message Bus API. More...
 
static void publish_to_corosync (struct stasis_message *message)
 
static void send_cluster_notify (void)
 Informs the cluster of our EID and our IP addresses. More...
 
static int set_event (const char *event_type, int pubsub)
 
static void stasis_message_cb (void *data, struct stasis_subscription *sub, struct stasis_message *message)
 
 STASIS_MESSAGE_TYPE_DEFN_LOCAL (corosync_ping_message_type,.to_event=corosync_ping_to_event,)
 
static int unload_module (void)
 

Variables

static corosync_cfg_callbacks_t cfg_callbacks
 
static corosync_cfg_handle_t cfg_handle
 
static struct stasis_topiccorosync_aggregate_topic
 The internal topic used for message forwarding and pings. More...
 
static struct ast_cli_entry corosync_cli []
 
static int corosync_node_joined = 0
 Join to corosync. More...
 
static cpg_callbacks_t cpg_callbacks
 
static cpg_handle_t cpg_handle
 
struct {
   int   alert_pipe [2]
 
   pthread_t   id
 
   unsigned int   stop:1
 
dispatch_thread
 
struct {
   struct stasis_cache *(*   cache_fn )(void)
 
   struct stasis_message_type *(*   message_type_fn )(void)
 
   const char *   name
 
   unsigned char   publish
 
   unsigned char   publish_default
 
   void(*   publish_to_stasis )(struct ast_event *)
 
   struct stasis_forward *   sub
 
   unsigned char   subscribe
 
   unsigned char   subscribe_default
 
   struct stasis_topic *(*   topic_fn )(void)
 
event_types []
 
static ast_rwlock_t event_types_lock = { PTHREAD_RWLOCK_INITIALIZER , NULL, {1, 0} }
 
static ast_rwlock_t init_cpg_lock = { PTHREAD_RWLOCK_INITIALIZER , NULL, {1, 0} }
 
static struct ao2_containernodes
 All the nodes that we're aware of. More...
 
static struct stasis_message_routerstasis_router
 Our Stasis Message Bus API message router. More...
 

Detailed Description

module publishes ast_event representations of information to other Asterisk instances in a cluster.

Events have an associated event type, as well as information elements. The information elements are the meta data that go along with each event. For example, in the case of message waiting indication, the event type is MWI, and each MWI event contains at least three information elements: the mailbox, the number of new messages, and the number of old messages.

Author
Russell Bryant russe.nosp@m.ll@r.nosp@m.ussel.nosp@m.lbry.nosp@m.ant.n.nosp@m.et

This module is based on and replaces the previous res_ais module.

Definition in file res_corosync.c.

Macro Definition Documentation

◆ COROSYNC_IPC_BUFFER_SIZE

#define COROSYNC_IPC_BUFFER_SIZE   (8192 * 128)

Corosync ipc dispatch/request and reply size.

Definition at line 89 of file res_corosync.c.

◆ COROSYNC_POLL_TIMEOUT

#define COROSYNC_POLL_TIMEOUT   (10 * 1000)

Timeout for Corosync's poll process.

Definition at line 55 of file res_corosync.c.

Referenced by dispatch_thread_handler().

◆ corosync_pthread_create_background

#define corosync_pthread_create_background (   a,
  b,
  c,
  d 
)
Value:
__FILE__, __FUNCTION__, __LINE__, #c)
static struct test_val d
#define COROSYNC_IPC_BUFFER_SIZE
Corosync ipc dispatch/request and reply size.
Definition: res_corosync.c:89
static struct test_val c
#define AST_BACKGROUND_STACKSIZE
Definition: ooh323cDriver.c:26
static struct test_val b
int ast_pthread_create_stack(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine)(void *), void *data, size_t stacksize, const char *file, const char *caller, int line, const char *start_fn)
Definition: main/utils.c:1446
static struct test_val a

Version of pthread_create to ensure stack is large enough.

Definition at line 92 of file res_corosync.c.

Referenced by load_module().

Enumeration Type Documentation

◆ anonymous enum

anonymous enum
Enumerator
PUBLISH 
SUBSCRIBE 

Definition at line 1148 of file res_corosync.c.

1148  {
1149  PUBLISH,
1150  SUBSCRIBE,
1151 };

Function Documentation

◆ AST_MODULE_INFO_STANDARD_EXTENDED()

AST_MODULE_INFO_STANDARD_EXTENDED ( ASTERISK_GPL_KEY  ,
"Corosync"   
)

Referenced by unload_module().

◆ cfg_shutdown_cb()

static void cfg_shutdown_cb ( corosync_cfg_handle_t  cfg_handle,
corosync_cfg_shutdown_flags_t  flags 
)
static

Definition at line 479 of file res_corosync.c.

481 {
482 }

◆ cleanup_module()

static void cleanup_module ( void  )
static

Definition at line 1249 of file res_corosync.c.

References ao2_callback, ao2_cleanup, ao2_container_count(), ao2_t_ref, ARRAY_LEN, ast_carefulwrite(), ast_debug, ast_log, AST_PTHREADT_NULL, ast_rwlock_trywrlock, ast_rwlock_unlock, ast_rwlock_wrlock, cache_fn, cfg_handle, clear_node_cache(), corosync_node_joined, cpg_handle, dispatch_thread, errno, event_types, event_types_lock, init_cpg_lock, LOG_ERROR, LOG_NOTICE, message_type_fn, name, NULL, OBJ_NODATA, stasis_cache_dump_all(), stasis_forward_cancel(), stasis_message_router_remove(), stasis_message_router_unsubscribe_and_join(), STASIS_MESSAGE_TYPE_CLEANUP, sub, subscribe, and topic_fn.

Referenced by load_module(), and unload_module().

1250 {
1251  cs_error_t cs_err;
1252  unsigned int i;
1253 
1254  if (stasis_router) {
1255 
1256  /* Unsubscribe all topic forwards and cancel all message routes */
1257  for (i = 0; i < ARRAY_LEN(event_types); i++) {
1258  struct ao2_container *messages = NULL;
1259  int messages_count;
1260  unsigned char subscribe = 0;
1261 
1263  ast_debug(5, "cleanup_module wrlock\n");
1264  subscribe = event_types[i].subscribe;
1265 
1266  if (event_types[i].sub) {
1269  }
1270  event_types[i].publish = 0;
1271  event_types[i].subscribe = 0;
1273  ast_debug(5, "cleanup_module unlock\n");
1274 
1275  if (subscribe && event_types[i].cache_fn && event_types[i].message_type_fn) {
1277  messages_count = ao2_container_count(messages);
1278  ast_log(LOG_NOTICE, "Clearing %i events of type %s of other nodes from stasis cache.\n", messages_count, event_types[i].name);
1280  ast_log(LOG_NOTICE, "Cleared events of type %s from stasis cache.\n", event_types[i].name);
1281  ao2_t_ref(messages, -1, "Dispose of flushed cache");
1282  }
1283  }
1284 
1286  stasis_router = NULL;
1287  }
1288 
1290  ao2_t_ref(corosync_aggregate_topic, -1, "Dispose of topic on cleanup");
1292  }
1293 
1294  STASIS_MESSAGE_TYPE_CLEANUP(corosync_ping_message_type);
1295 
1296  if (dispatch_thread.id != AST_PTHREADT_NULL) {
1297  char meepmeep = 'x';
1298  dispatch_thread.stop = 1;
1299  if (ast_carefulwrite(dispatch_thread.alert_pipe[1], &meepmeep, 1,
1300  5000) == -1) {
1301  ast_log(LOG_ERROR, "Failed to write to pipe: %s (%d)\n",
1302  strerror(errno), errno);
1303  }
1304  pthread_join(dispatch_thread.id, NULL);
1305  }
1306 
1307  if (dispatch_thread.alert_pipe[0] != -1) {
1308  close(dispatch_thread.alert_pipe[0]);
1309  dispatch_thread.alert_pipe[0] = -1;
1310  }
1311 
1312  if (dispatch_thread.alert_pipe[1] != -1) {
1313  close(dispatch_thread.alert_pipe[1]);
1314  dispatch_thread.alert_pipe[1] = -1;
1315  }
1316 
1318  ast_debug(5, "cleanup_module wrlock\n");
1319  if (cpg_handle && (cs_err = cpg_finalize(cpg_handle)) != CS_OK) {
1320  ast_log(LOG_ERROR, "Failed to finalize cpg (%d)\n", (int) cs_err);
1321  }
1322  cpg_handle = 0;
1323 
1324  if (cfg_handle && (cs_err = corosync_cfg_finalize(cfg_handle)) != CS_OK) {
1325  ast_log(LOG_ERROR, "Failed to finalize cfg (%d)\n", (int) cs_err);
1326  }
1327  cfg_handle = 0;
1330  ast_debug(5, "cleanup_module unlock\n");
1331  }
1332  ao2_cleanup(nodes);
1333  nodes = NULL;
1334 }
struct ao2_container * stasis_cache_dump_all(struct stasis_cache *cache, struct stasis_message_type *type)
Dump all entity items from the cache to a subscription.
Definition: stasis_cache.c:757
#define ao2_t_ref(o, delta, tag)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:463
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
#define ARRAY_LEN(a)
Definition: isdn_lib.c:42
static cpg_handle_t cpg_handle
Definition: res_corosync.c:283
int ast_carefulwrite(int fd, char *s, int len, int timeoutms)
Try to write string, but wait no more than ms milliseconds before timing out.
Definition: main/utils.c:1592
static struct @449 dispatch_thread
static ast_rwlock_t event_types_lock
Definition: res_corosync.c:51
static int clear_node_cache(void *obj, void *arg, int flags)
Definition: res_corosync.c:623
#define ao2_callback(c, flags, cb_fn, arg)
Definition: astobj2.h:1716
void stasis_message_router_unsubscribe_and_join(struct stasis_message_router *router)
Unsubscribe the router from the upstream topic, blocking until the final message has been processed...
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
Definition: stasis.h:1523
#define NULL
Definition: resample.c:96
#define ast_rwlock_unlock(a)
Definition: lock.h:232
const char * name
Definition: res_corosync.c:239
#define ast_debug(level,...)
Log a DEBUG message.
Definition: logger.h:444
#define ast_log
Definition: astobj2.c:42
#define AST_PTHREADT_NULL
Definition: lock.h:66
static int corosync_node_joined
Join to corosync.
Definition: res_corosync.c:62
#define LOG_ERROR
Definition: logger.h:285
void stasis_message_router_remove(struct stasis_message_router *router, struct stasis_message_type *message_type)
Remove a route from a message router.
int errno
#define LOG_NOTICE
Definition: logger.h:263
struct stasis_topic *(* topic_fn)(void)
Definition: res_corosync.c:245
static ast_rwlock_t init_cpg_lock
Definition: res_corosync.c:52
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
Definition: stasis.c:1548
static corosync_cfg_handle_t cfg_handle
Definition: res_corosync.c:284
#define ast_rwlock_wrlock(a)
Definition: lock.h:234
unsigned char subscribe
Definition: res_corosync.c:243
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
#define ast_rwlock_trywrlock(a)
Definition: lock.h:236
static struct stasis_message_router * stasis_router
Our Stasis Message Bus API message router.
Definition: res_corosync.c:71
struct stasis_forward * sub
Definition: res_corosync.c:240
static struct stasis_topic * corosync_aggregate_topic
The internal topic used for message forwarding and pings.
Definition: res_corosync.c:68
Generic container type.
static struct @448 event_types[]
struct stasis_message_type *(* message_type_fn)(void)
Definition: res_corosync.c:247
struct stasis_cache *(* cache_fn)(void)
Definition: res_corosync.c:246
static struct ao2_container * nodes
All the nodes that we&#39;re aware of.
Definition: res_corosync.c:65

◆ clear_node_cache()

static int clear_node_cache ( void *  obj,
void *  arg,
int  flags 
)
static

Definition at line 623 of file res_corosync.c.

References ao2_cleanup, ast_eid_cmp(), ast_eid_default, stasis_cache_clear_create(), stasis_message_eid(), and stasis_publish().

Referenced by cleanup_module(), and cpg_confchg_cb().

624 {
625  struct stasis_message *cached_msg = obj;
626  struct stasis_topic *topic = arg;
627  struct stasis_message *msg;
628  struct ast_eid *msg_eid;
629 
630  if (!cached_msg) {
631  return 0;
632  }
633 
634  msg_eid = (struct ast_eid *)stasis_message_eid(cached_msg);
635  if(msg_eid && ast_eid_cmp(&ast_eid_default, msg_eid))
636  {
637  msg = stasis_cache_clear_create(cached_msg);
638  if (msg) {
639  stasis_publish(topic, msg);
640  ao2_cleanup(msg);
641  }
642  }
643 
644  return 0;
645 }
struct stasis_message * stasis_cache_clear_create(struct stasis_message *message)
A message which instructs the caching topic to remove an entry from its cache.
Definition: stasis_cache.c:778
An Entity ID is essentially a MAC address, brief and unique.
Definition: utils.h:786
int ast_eid_cmp(const struct ast_eid *eid1, const struct ast_eid *eid2)
Compare two EIDs.
Definition: main/utils.c:2842
const struct ast_eid * stasis_message_eid(const struct stasis_message *msg)
Get the entity id for a stasis_message.
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic&#39;s subscribers.
Definition: stasis.c:1511
struct ast_eid ast_eid_default
Global EID.
Definition: options.c:93
#define ao2_cleanup(obj)
Definition: astobj2.h:1958

◆ corosync_node_alloc()

static struct corosync_node* corosync_node_alloc ( struct ast_event event)
static

Definition at line 97 of file res_corosync.c.

References corosync_node::addr, AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_alloc_options, ast_event_get_ie_raw(), ast_event_get_ie_str(), ast_event_get_ie_uint(), AST_EVENT_IE_EID, AST_EVENT_IE_LOCAL_ADDR, AST_EVENT_IE_NODE_ID, ast_sockaddr_parse(), corosync_node::eid, corosync_node::id, NULL, and PARSE_PORT_IGNORE.

Referenced by publish_cluster_discovery_to_stasis().

98 {
99  struct corosync_node *node;
100 
101  node = ao2_alloc_options(sizeof(*node), NULL, AO2_ALLOC_OPT_LOCK_NOLOCK);
102  if (!node) {
103  return NULL;
104  }
105 
106  memcpy(&node->eid, (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID), sizeof(node->eid));
109 
110  return node;
111 }
Definition: test_heap.c:38
struct ast_eid eid
Definition: res_corosync.c:83
int ast_sockaddr_parse(struct ast_sockaddr *addr, const char *str, int flags)
Parse an IPv4 or IPv6 address string.
Definition: netsock2.c:230
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:406
#define NULL
Definition: resample.c:96
An Entity ID is essentially a MAC address, brief and unique.
Definition: utils.h:786
Entity ID Used by All events Payload type: RAW This IE indicates which server the event originated fr...
Definition: event_defs.h:272
struct ast_sockaddr addr
Definition: res_corosync.c:85
const void * ast_event_get_ie_raw(const struct ast_event *event, enum ast_event_ie_type ie_type)
Get the value of an information element that has a raw payload.
Definition: event.c:311
uint32_t ast_event_get_ie_uint(const struct ast_event *event, enum ast_event_ie_type ie_type)
Get the value of an information element that has an integer payload.
Definition: event.c:293
const char * ast_event_get_ie_str(const struct ast_event *event, enum ast_event_ie_type ie_type)
Get the value of an information element that has a string payload.
Definition: event.c:302
Cluster node ID Used by: Corosync Payload type: UINT.
Definition: event_defs.h:313

◆ corosync_node_cmp_fn()

static int corosync_node_cmp_fn ( void *  obj,
void *  arg,
int  flags 
)
static

Definition at line 133 of file res_corosync.c.

References ast_assert, CMP_MATCH, corosync_node::id, OBJ_SEARCH_KEY, OBJ_SEARCH_MASK, OBJ_SEARCH_OBJECT, and OBJ_SEARCH_PARTIAL_KEY.

Referenced by load_module().

134 {
135  struct corosync_node *left = obj;
136  struct corosync_node *right = arg;
137  const int *id = arg;
138  int cmp;
139 
140  switch (flags & OBJ_SEARCH_MASK) {
141  case OBJ_SEARCH_OBJECT:
142  id = &right->id;
143  /* Fall through */
144  case OBJ_SEARCH_KEY:
145  cmp = (left->id == *id);
146  break;
148  cmp = (left->id == right->id);
149  break;
150  default:
151  /* Sort can only work on something with a full or partial key. */
152  ast_assert(0);
153  cmp = 1;
154  break;
155  }
156  return cmp ? CMP_MATCH : 0;
157 }
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1105
#define ast_assert(a)
Definition: utils.h:710
pthread_t id
Definition: res_corosync.c:275
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
Definition: astobj2.h:1120
The arg parameter is an object of the same type.
Definition: astobj2.h:1091
Search option field mask.
Definition: astobj2.h:1076

◆ corosync_node_hash_fn()

static int corosync_node_hash_fn ( const void *  obj,
const int  flags 
)
static

Definition at line 113 of file res_corosync.c.

References ast_assert, corosync_node::id, OBJ_SEARCH_KEY, OBJ_SEARCH_MASK, and OBJ_SEARCH_OBJECT.

Referenced by load_module().

114 {
115  const struct corosync_node *node;
116  const int *id;
117 
118  switch (flags & OBJ_SEARCH_MASK) {
119  case OBJ_SEARCH_KEY:
120  id = obj;
121  break;
122  case OBJ_SEARCH_OBJECT:
123  node = obj;
124  id = &node->id;
125  break;
126  default:
127  ast_assert(0);
128  return 0;
129  }
130  return *id;
131 }
Definition: test_heap.c:38
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1105
#define ast_assert(a)
Definition: utils.h:710
pthread_t id
Definition: res_corosync.c:275
The arg parameter is an object of the same type.
Definition: astobj2.h:1091
Search option field mask.
Definition: astobj2.h:1076

◆ corosync_ping()

static char* corosync_ping ( struct ast_cli_entry e,
int  cmd,
struct ast_cli_args a 
)
static

Definition at line 1059 of file res_corosync.c.

References ast_cli_args::argc, ast_cli_entry::args, ast_event_destroy(), AST_EVENT_IE_END, ast_event_new(), AST_EVENT_PING, CLI_FAILURE, CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, event_types, NULL, and ast_cli_entry::usage.

1060 {
1061  struct ast_event *event;
1062 
1063  switch (cmd) {
1064  case CLI_INIT:
1065  e->command = "corosync ping";
1066  e->usage =
1067  "Usage: corosync ping\n"
1068  " Send a test ping to the cluster.\n"
1069  "A NOTICE will be in the log for every ping received\n"
1070  "on a server.\n If you send a ping, you should see a NOTICE\n"
1071  "in the log for every server in the cluster.\n";
1072  return NULL;
1073 
1074  case CLI_GENERATE:
1075  return NULL; /* no completion */
1076  }
1077 
1078  if (a->argc != e->args) {
1079  return CLI_SHOWUSAGE;
1080  }
1081 
1083 
1084  if (!event) {
1085  return CLI_FAILURE;
1086  }
1087 
1088  event_types[AST_EVENT_PING].publish_to_stasis(event);
1089 
1090  ast_event_destroy(event);
1091  return CLI_SUCCESS;
1092 }
An event.
Definition: event.c:81
const int argc
Definition: cli.h:160
Definition: cli.h:152
Definition: astman.c:222
#define NULL
Definition: resample.c:96
int args
This gets set in ast_cli_register()
Definition: cli.h:185
#define CLI_SHOWUSAGE
Definition: cli.h:45
#define CLI_FAILURE
Definition: cli.h:46
char * command
Definition: cli.h:186
const char * usage
Definition: cli.h:177
#define CLI_SUCCESS
Definition: cli.h:44
void ast_event_destroy(struct ast_event *event)
Destroy an event.
Definition: event.c:524
struct ast_event * ast_event_new(enum ast_event_type event_type,...)
Create a new event.
Definition: event.c:402
static struct @448 event_types[]

◆ corosync_ping_payload_dtor()

static void corosync_ping_payload_dtor ( void *  obj)
static

Destructor for the corosync_ping_payload wrapper object.

Definition at line 167 of file res_corosync.c.

References ast_free, and corosync_ping_payload::event.

Referenced by publish_corosync_ping_to_stasis().

168 {
169  struct corosync_ping_payload *payload = obj;
170 
171  ast_free(payload->event);
172 }
struct ast_event * event
Definition: res_corosync.c:163
A payload wrapper around a corosync ping event.
Definition: res_corosync.c:161
#define ast_free(a)
Definition: astmm.h:182

◆ corosync_ping_to_event()

static struct ast_event* corosync_ping_to_event ( struct stasis_message message)
static

Convert a Corosync PING to a ast_event.

Definition at line 175 of file res_corosync.c.

References ast_event_get_ie_raw(), AST_EVENT_IE_EID, AST_EVENT_IE_END, AST_EVENT_IE_PLTYPE_RAW, ast_event_new(), AST_EVENT_PING, corosync_ping_payload::event, NULL, stasis_message_data(), and STASIS_MESSAGE_TYPE_DEFN_LOCAL().

176 {
177  struct corosync_ping_payload *payload;
178  struct ast_event *event;
179  struct ast_eid *event_eid;
180 
181  if (!message) {
182  return NULL;
183  }
184 
185  payload = stasis_message_data(message);
186 
187  if (!payload->event) {
188  return NULL;
189  }
190 
191  event_eid = (struct ast_eid *)ast_event_get_ie_raw(payload->event, AST_EVENT_IE_EID);
192 
194  AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW, event_eid, sizeof(*event_eid),
196 
197  return event;
198 }
An event.
Definition: event.c:81
Definition: astman.c:222
#define NULL
Definition: resample.c:96
An Entity ID is essentially a MAC address, brief and unique.
Definition: utils.h:786
Entity ID Used by All events Payload type: RAW This IE indicates which server the event originated fr...
Definition: event_defs.h:272
const void * ast_event_get_ie_raw(const struct ast_event *event, enum ast_event_ie_type ie_type)
Get the value of an information element that has a raw payload.
Definition: event.c:311
struct ast_event * event
Definition: res_corosync.c:163
A payload wrapper around a corosync ping event.
Definition: res_corosync.c:161
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
struct ast_event * ast_event_new(enum ast_event_type event_type,...)
Create a new event.
Definition: event.c:402

◆ corosync_show_config()

static char* corosync_show_config ( struct ast_cli_entry e,
int  cmd,
struct ast_cli_args a 
)
static

Definition at line 1094 of file res_corosync.c.

References ast_cli_args::argc, ast_cli_entry::args, ARRAY_LEN, ast_cli(), ast_debug, ast_rwlock_rdlock, ast_rwlock_unlock, CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, event_types, event_types_lock, ast_cli_args::fd, NULL, publish, subscribe, and ast_cli_entry::usage.

1095 {
1096  unsigned int i;
1097 
1098  switch (cmd) {
1099  case CLI_INIT:
1100  e->command = "corosync show config";
1101  e->usage =
1102  "Usage: corosync show config\n"
1103  " Show configuration loaded from res_corosync.conf\n";
1104  return NULL;
1105 
1106  case CLI_GENERATE:
1107  return NULL; /* no completion */
1108  }
1109 
1110  if (a->argc != e->args) {
1111  return CLI_SHOWUSAGE;
1112  }
1113 
1114  ast_cli(a->fd, "\n"
1115  "=============================================================\n"
1116  "=== res_corosync config =====================================\n"
1117  "=============================================================\n"
1118  "===\n");
1119 
1121  ast_debug(5, "corosync_show_config rdlock\n");
1122  for (i = 0; i < ARRAY_LEN(event_types); i++) {
1123  if (event_types[i].publish) {
1124  ast_cli(a->fd, "=== ==> Publishing Event Type: %s\n",
1125  event_types[i].name);
1126  }
1127  if (event_types[i].subscribe) {
1128  ast_cli(a->fd, "=== ==> Subscribing to Event Type: %s\n",
1129  event_types[i].name);
1130  }
1131  }
1133  ast_debug(5, "corosync_show_config unlock\n");
1134 
1135  ast_cli(a->fd, "===\n"
1136  "=============================================================\n"
1137  "\n");
1138 
1139  return CLI_SUCCESS;
1140 }
#define ast_rwlock_rdlock(a)
Definition: lock.h:233
#define ARRAY_LEN(a)
Definition: isdn_lib.c:42
const int argc
Definition: cli.h:160
static ast_rwlock_t event_types_lock
Definition: res_corosync.c:51
Definition: cli.h:152
#define NULL
Definition: resample.c:96
void ast_cli(int fd, const char *fmt,...)
Definition: clicompat.c:6
#define ast_rwlock_unlock(a)
Definition: lock.h:232
unsigned char publish
Definition: res_corosync.c:241
int args
This gets set in ast_cli_register()
Definition: cli.h:185
#define ast_debug(level,...)
Log a DEBUG message.
Definition: logger.h:444
const int fd
Definition: cli.h:159
#define CLI_SHOWUSAGE
Definition: cli.h:45
char * command
Definition: cli.h:186
const char * usage
Definition: cli.h:177
#define CLI_SUCCESS
Definition: cli.h:44
unsigned char subscribe
Definition: res_corosync.c:243
static struct @448 event_types[]

◆ corosync_show_members()

static char* corosync_show_members ( struct ast_cli_entry e,
int  cmd,
struct ast_cli_args a 
)
static

Definition at line 965 of file res_corosync.c.

References ast_cli_args::argc, ast_cli_entry::args, ARRAY_LEN, ast_cli(), ast_debug, ast_log, ast_rwlock_tryrdlock, ast_rwlock_unlock, buf, cfg_handle, CLI_FAILURE, CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, cpg_handle, ast_cli_args::fd, init_cpg_lock, LOG_WARNING, NULL, and ast_cli_entry::usage.

966 {
967  cs_error_t cs_err;
968  cpg_iteration_handle_t cpg_iter;
969  struct cpg_iteration_description_t cpg_desc;
970  unsigned int i;
971 
972  switch (cmd) {
973  case CLI_INIT:
974  e->command = "corosync show members";
975  e->usage =
976  "Usage: corosync show members\n"
977  " Show corosync cluster members\n";
978  return NULL;
979 
980  case CLI_GENERATE:
981  return NULL; /* no completion */
982  }
983 
984  if (a->argc != e->args) {
985  return CLI_SHOWUSAGE;
986  }
987 
989  ast_debug(5, "corosync_show_members rdlock\n");
990  cs_err = cpg_iteration_initialize(cpg_handle, CPG_ITERATION_ALL, NULL, &cpg_iter);
991 
992  if (cs_err != CS_OK) {
993  ast_cli(a->fd, "Failed to initialize CPG iterator: %u.\n", cs_err);
994  cpg_iteration_finalize(cpg_iter);
996  ast_debug(5, "corosync_show_members unlock\n");
997  return CLI_FAILURE;
998  }
999 
1000  ast_cli(a->fd, "\n"
1001  "=============================================================\n"
1002  "=== Cluster members =========================================\n"
1003  "=============================================================\n"
1004  "===\n");
1005 
1006  for (i = 1, cs_err = cpg_iteration_next(cpg_iter, &cpg_desc);
1007  cs_err == CS_OK;
1008  cs_err = cpg_iteration_next(cpg_iter, &cpg_desc), i++) {
1009  #ifdef HAVE_COROSYNC_CFG_STATE_TRACK
1010  corosync_cfg_node_address_t addrs[8];
1011  int num_addrs = 0;
1012  unsigned int j;
1013  #endif
1014 
1015  ast_cli(a->fd, "=== Node %u\n", i);
1016  ast_cli(a->fd, "=== --> Group: %s\n", cpg_desc.group.value);
1017 
1018  #ifdef HAVE_COROSYNC_CFG_STATE_TRACK
1019  /*
1020  * Corosync 2.x cfg lib needs to allocate 1M on stack after calling
1021  * corosync_cfg_get_node_addrs. netconsole thread has allocated only 0.5M
1022  * resulting in crash.
1023  */
1024  cs_err = corosync_cfg_get_node_addrs(cfg_handle, cpg_desc.nodeid,
1025  ARRAY_LEN(addrs), &num_addrs, addrs);
1026  if (cs_err != CS_OK) {
1027  ast_log(LOG_WARNING, "Failed to get node addresses\n");
1028  continue;
1029  }
1030 
1031  for (j = 0; j < num_addrs; j++) {
1032  struct sockaddr *sa = (struct sockaddr *) addrs[j].address;
1033  size_t sa_len = (size_t) addrs[j].address_length;
1034  char buf[128];
1035 
1036  getnameinfo(sa, sa_len, buf, sizeof(buf), NULL, 0, NI_NUMERICHOST);
1037 
1038  ast_cli(a->fd, "=== --> Address %u: %s\n", j + 1, buf);
1039  }
1040  #else
1041  ast_cli(a->fd, "=== --> Nodeid: %"PRIu32"\n", cpg_desc.nodeid);
1042  #endif
1043  }
1044 
1045  ast_cli(a->fd, "===\n"
1046  "=============================================================\n"
1047  "\n");
1048 
1049  cpg_iteration_finalize(cpg_iter);
1051  ast_debug(5, "corosync_show_members unlock\n");
1052  } else {
1053  ast_cli(a->fd, "Failed to initialize CPG iterator: initializing CPG.\n");
1054  }
1055 
1056  return CLI_SUCCESS;
1057 }
#define ARRAY_LEN(a)
Definition: isdn_lib.c:42
static cpg_handle_t cpg_handle
Definition: res_corosync.c:283
char * address
Definition: f2c.h:59
char buf[BUFSIZE]
Definition: eagi_proxy.c:66
const int argc
Definition: cli.h:160
#define LOG_WARNING
Definition: logger.h:274
Definition: cli.h:152
#define NULL
Definition: resample.c:96
void ast_cli(int fd, const char *fmt,...)
Definition: clicompat.c:6
#define ast_rwlock_unlock(a)
Definition: lock.h:232
int args
This gets set in ast_cli_register()
Definition: cli.h:185
#define ast_rwlock_tryrdlock(a)
Definition: lock.h:235
#define ast_debug(level,...)
Log a DEBUG message.
Definition: logger.h:444
#define ast_log
Definition: astobj2.c:42
const int fd
Definition: cli.h:159
#define CLI_SHOWUSAGE
Definition: cli.h:45
static ast_rwlock_t init_cpg_lock
Definition: res_corosync.c:52
#define CLI_FAILURE
Definition: cli.h:46
char * command
Definition: cli.h:186
const char * usage
Definition: cli.h:177
static corosync_cfg_handle_t cfg_handle
Definition: res_corosync.c:284
#define CLI_SUCCESS
Definition: cli.h:44

◆ corosync_topic()

static struct stasis_topic* corosync_topic ( void  )
static

Internal accessor for our topic.

Definition at line 74 of file res_corosync.c.

References corosync_aggregate_topic.

Referenced by load_general_config(), and publish_corosync_ping_to_stasis().

75 {
77 }
static struct stasis_topic * corosync_aggregate_topic
The internal topic used for message forwarding and pings.
Definition: res_corosync.c:68

◆ cpg_confchg_cb()

static void cpg_confchg_cb ( cpg_handle_t  handle,
const struct cpg_name *  group_name,
const struct cpg_address *  member_list,
size_t  member_list_entries,
const struct cpg_address *  left_list,
size_t  left_list_entries,
const struct cpg_address *  joined_list,
size_t  joined_list_entries 
)
static

Definition at line 647 of file res_corosync.c.

References ao2_callback, ao2_container_count(), ao2_find, ao2_ref, ao2_t_ref, ARRAY_LEN, ast_debug, ast_eid_default, ast_log, ast_rwlock_rdlock, ast_rwlock_unlock, cache_fn, clear_node_cache(), dump_cache_cb(), corosync_node::eid, event_types, event_types_lock, corosync_node::id, LOG_NOTICE, message_type_fn, name, NULL, OBJ_NODATA, OBJ_SEARCH_KEY, OBJ_UNLINK, publish, publish_cluster_discovery_to_stasis_full(), stasis_cache_dump_by_eid(), subscribe, and topic_fn.

Referenced by publish_device_state_to_stasis().

651 {
652  unsigned int i;
653 
654 
655  for (i = 0; i < left_list_entries; i++) {
656  const struct cpg_address *cpg_node = &left_list[i];
657  struct corosync_node* node;
658  unsigned int j;
659 
660  node = ao2_find(nodes, &cpg_node->nodeid, OBJ_UNLINK | OBJ_SEARCH_KEY);
661  if (!node) {
662  continue;
663  }
664 
665  for (j = 0; j < ARRAY_LEN(event_types); j++) {
666  struct ao2_container *messages;
667  int messages_count;
668 
670  ast_debug(5, "cpg_confchg_cb rdlock\n");
671  if (!event_types[j].subscribe) {
673  ast_debug(5, "cpg_confchg_cb unlock\n");
674  continue;
675  }
676 
679  ast_debug(5, "cpg_confchg_cb unlock\n");
680  continue;
681  }
683  ast_debug(5, "cpg_confchg_cb unlock\n");
684 
686 
687  messages_count = ao2_container_count(messages);
688  ast_log(LOG_NOTICE, "Clearing %i events of type %s of node %i from stasis cache.\n", messages_count, event_types[j].name, node->id);
690  ast_log(LOG_NOTICE, "Cleared events of type %s from stasis cache.\n", event_types[j].name);
691 
692  ao2_t_ref(messages, -1, "Dispose of flushed cache");
693  }
694 
696  ao2_ref(node, -1);
697  }
698 
699  /* If any new nodes have joined, dump our cache of events we are publishing
700  * that originated from this server. */
701  if (!joined_list_entries) {
702  return;
703  }
704 
705  for (i = 0; i < ARRAY_LEN(event_types); i++) {
706  struct ao2_container *messages;
707  int messages_count;
708 
710  ast_debug(5, "cpg_confchg_cb rdlock\n");
711  if (!event_types[i].publish) {
713  ast_debug(5, "cpg_confchg_cb unlock\n");
714  continue;
715  }
716 
717  if (!event_types[i].cache_fn || !event_types[i].message_type_fn) {
719  ast_debug(5, "cpg_confchg_cb unlock\n");
720  continue;
721  }
723  ast_debug(5, "cpg_confchg_cb unlock\n");
724 
726 
727  messages_count = ao2_container_count(messages);
728  ast_log(LOG_NOTICE, "Sending %i events of type %s to corosync.\n", messages_count, event_types[i].name);
730  ast_log(LOG_NOTICE, "Sent events of type %s to corosync.\n", event_types[i].name);
731 
732  ao2_t_ref(messages, -1, "Dispose of dumped cache");
733  }
734 }
#define ao2_t_ref(o, delta, tag)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:463
#define ast_rwlock_rdlock(a)
Definition: lock.h:233
Definition: test_heap.c:38
struct ast_eid eid
Definition: res_corosync.c:83
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
#define ARRAY_LEN(a)
Definition: isdn_lib.c:42
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1105
static int dump_cache_cb(void *obj, void *arg, int flags)
Definition: res_corosync.c:610
static ast_rwlock_t event_types_lock
Definition: res_corosync.c:51
static int clear_node_cache(void *obj, void *arg, int flags)
Definition: res_corosync.c:623
#define ao2_callback(c, flags, cb_fn, arg)
Definition: astobj2.h:1716
#define NULL
Definition: resample.c:96
#define ast_rwlock_unlock(a)
Definition: lock.h:232
static void publish_cluster_discovery_to_stasis_full(struct corosync_node *node, int joined)
Publish cluster discovery to Stasis Message Bus API.
Definition: res_corosync.c:303
unsigned char publish
Definition: res_corosync.c:241
const char * name
Definition: res_corosync.c:239
#define ast_debug(level,...)
Log a DEBUG message.
Definition: logger.h:444
#define ast_log
Definition: astobj2.c:42
#define ao2_ref(o, delta)
Definition: astobj2.h:464
struct ao2_container * stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
Dump cached items to a subscription for a specific entity.
Definition: stasis_cache.c:718
#define LOG_NOTICE
Definition: logger.h:263
struct stasis_topic *(* topic_fn)(void)
Definition: res_corosync.c:245
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1756
struct ast_eid ast_eid_default
Global EID.
Definition: options.c:93
unsigned char subscribe
Definition: res_corosync.c:243
Generic container type.
static struct @448 event_types[]
struct stasis_message_type *(* message_type_fn)(void)
Definition: res_corosync.c:247
struct stasis_cache *(* cache_fn)(void)
Definition: res_corosync.c:246
static struct ao2_container * nodes
All the nodes that we&#39;re aware of.
Definition: res_corosync.c:65

◆ cpg_deliver_cb()

static void cpg_deliver_cb ( cpg_handle_t  handle,
const struct cpg_name *  group_name,
uint32_t  nodeid,
uint32_t  pid,
void *  msg,
size_t  msg_len 
)
static

Definition at line 484 of file res_corosync.c.

References ast_debug, ast_eid_cmp(), ast_eid_default, ast_eid_to_str(), ast_event_get_ie_raw(), ast_event_get_type(), ast_event_get_type_name(), AST_EVENT_IE_EID, ast_event_minimum_length(), AST_EVENT_PING, AST_EVENT_TOTAL, ast_free, ast_log, ast_malloc, ast_rwlock_rdlock, ast_rwlock_unlock, buf, corosync_node::eid, event_types, event_types_lock, LOG_NOTICE, NULL, and subscribe.

Referenced by publish_device_state_to_stasis().

486 {
487  struct ast_event *event;
488  void (*publish_handler)(struct ast_event *) = NULL;
489  enum ast_event_type event_type;
490  struct ast_eid *event_eid;
491 
492  if (msg_len < ast_event_minimum_length()) {
493  ast_debug(1, "Ignoring event that's too small. %u < %u\n",
494  (unsigned int) msg_len,
495  (unsigned int) ast_event_minimum_length());
496  return;
497  }
498 
499  event_eid = (struct ast_eid *)ast_event_get_ie_raw(msg, AST_EVENT_IE_EID);
500  if (!event_eid || !ast_eid_cmp(&ast_eid_default, event_eid)) {
501  /* Don't feed events back in that originated locally. */
502  return;
503  }
504 
505  event_type = ast_event_get_type(msg);
506  if (event_type > AST_EVENT_TOTAL) {
507  /* Egads, we don't support this */
508  return;
509  }
510 
512  ast_debug(5, "cpg_deliver_cb rdlock\n");
513  publish_handler = event_types[event_type].publish_to_stasis;
514  if (!event_types[event_type].subscribe || !publish_handler) {
515  /* We are not configured to subscribe to these events or
516  we have no way to publish it internally. */
518  ast_debug(5, "cpg_deliver_cb unlock\n");
519  return;
520  }
522  ast_debug(5, "cpg_deliver_cb unlock\n");
523 
524  if (!(event = ast_malloc(msg_len))) {
525  return;
526  }
527 
528  memcpy(event, msg, msg_len);
529 
530  if (event_type == AST_EVENT_PING) {
531  const struct ast_eid *eid;
532  char buf[128] = "";
533 
534  eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
535  ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid);
536  ast_log(LOG_NOTICE, "Got event PING from server with EID: '%s'\n", buf);
537  }
538  ast_debug(5, "Publishing event %s (%u) to stasis\n",
539  ast_event_get_type_name(event), event_type);
540  publish_handler(event);
541  ast_free(event);
542 }
#define ast_rwlock_rdlock(a)
Definition: lock.h:233
An event.
Definition: event.c:81
enum ast_event_type ast_event_get_type(const struct ast_event *event)
Get the type for an event.
Definition: event.c:288
char * ast_eid_to_str(char *s, int maxlen, struct ast_eid *eid)
Convert an EID to a string.
Definition: main/utils.c:2587
char buf[BUFSIZE]
Definition: eagi_proxy.c:66
static ast_rwlock_t event_types_lock
Definition: res_corosync.c:51
Definition: astman.c:222
#define NULL
Definition: resample.c:96
#define ast_rwlock_unlock(a)
Definition: lock.h:232
An Entity ID is essentially a MAC address, brief and unique.
Definition: utils.h:786
Entity ID Used by All events Payload type: RAW This IE indicates which server the event originated fr...
Definition: event_defs.h:272
int ast_eid_cmp(const struct ast_eid *eid1, const struct ast_eid *eid2)
Compare two EIDs.
Definition: main/utils.c:2842
#define ast_debug(level,...)
Log a DEBUG message.
Definition: logger.h:444
#define ast_log
Definition: astobj2.c:42
ast_event_type
Definition: event_defs.h:28
#define ast_malloc(len)
A wrapper for malloc()
Definition: astmm.h:193
const void * ast_event_get_ie_raw(const struct ast_event *event, enum ast_event_ie_type ie_type)
Get the value of an information element that has a raw payload.
Definition: event.c:311
unsigned char eid[6]
Definition: utils.h:787
#define LOG_NOTICE
Definition: logger.h:263
#define ast_free(a)
Definition: astmm.h:182
size_t ast_event_minimum_length(void)
Get the minimum length of an ast_event.
Definition: event.c:529
struct ast_eid ast_eid_default
Global EID.
Definition: options.c:93
unsigned char subscribe
Definition: res_corosync.c:243
const char * ast_event_get_type_name(const struct ast_event *event)
Get the string representation of the type of the given event.
Definition: event.c:194
static struct @448 event_types[]

◆ dispatch_thread_handler()

static void* dispatch_thread_handler ( void *  data)
static

Definition at line 782 of file res_corosync.c.

References ARRAY_LEN, ast_copy_string(), ast_debug, ast_log, ast_poll, ast_rwlock_tryrdlock, ast_rwlock_trywrlock, ast_rwlock_unlock, cfg_callbacks, cfg_handle, corosync_node_joined, COROSYNC_POLL_TIMEOUT, cpg_callbacks, cpg_handle, dispatch_thread, errno, init_cpg_lock, LOG_ERROR, LOG_NOTICE, LOG_WARNING, NULL, and send_cluster_notify().

Referenced by load_module().

783 {
784  cs_error_t cs_err;
785  struct pollfd pfd[3] = {
786  { .events = POLLIN, },
787  { .events = POLLIN, },
788  { .events = POLLIN, },
789  };
790 
792  ast_debug(5, "dispatch_thread_handler rdlock\n");
793  if ((cs_err = cpg_fd_get(cpg_handle, &pfd[0].fd)) != CS_OK) {
794  ast_log(LOG_ERROR, "Failed to get CPG fd. This module is now broken.\n");
796  ast_debug(5, "dispatch_thread_handler unlock\n");
797  return NULL;
798  }
799 
800  if ((cs_err = corosync_cfg_fd_get(cfg_handle, &pfd[1].fd)) != CS_OK) {
801  ast_log(LOG_ERROR, "Failed to get CFG fd. This module is now broken.\n");
803  ast_debug(5, "dispatch_thread_handler unlock\n");
804  return NULL;
805  }
806 
807  pfd[2].fd = dispatch_thread.alert_pipe[0];
809  ast_debug(5, "dispatch_thread_handler unlock\n");
810  } else {
811  ast_log(LOG_ERROR, "Failed to get fd: initiliazing CPG. This module is now broken.\n");
812  return NULL;
813  }
815  while (!dispatch_thread.stop) {
816  int res;
817 
818  cs_err = CS_OK;
819 
820  pfd[0].revents = 0;
821  pfd[1].revents = 0;
822  pfd[2].revents = 0;
823 
824  res = ast_poll(pfd, ARRAY_LEN(pfd), COROSYNC_POLL_TIMEOUT);
825  if (res == -1 && errno != EINTR && errno != EAGAIN) {
826  ast_log(LOG_ERROR, "poll() error: %s (%d)\n", strerror(errno), errno);
827  cs_err = CS_ERR_BAD_HANDLE;
828  } else if (res == 0) {
829  unsigned int local_nodeid;
830 
832  ast_debug(5, "dispatch_thread_handler rdlock\n");
833  if ((cs_err = cpg_local_get(cpg_handle, &local_nodeid)) == CS_OK) {
834  struct cpg_name name;
835  struct cpg_address address[CPG_MEMBERS_MAX];
836  int entries = CPG_MEMBERS_MAX;
837 
838  ast_copy_string(name.value, "asterisk", sizeof(name.value));
839  name.length = strlen(name.value);
840  if ((cs_err = cpg_membership_get(cpg_handle, &name, address, &entries)) == CS_OK) {
841  int i;
842  int found = 0;
843 
844  ast_debug(1, "CPG group has %i node membership\n", entries);
845  for (i = 0; (i < entries) && !found; i++) {
846  if (address[i].nodeid == local_nodeid)
847  found = 1;
848  }
849  if (!found) {
850  ast_log(LOG_WARNING, "Failed to check CPG node membership\n");
852  cs_err = CS_ERR_BAD_HANDLE;
853  }
854  } else {
855  ast_log(LOG_WARNING, "Failed to get CPG node membership: %u\n", cs_err);
857  cs_err = CS_ERR_BAD_HANDLE;
858  }
859  } else {
860  ast_log(LOG_WARNING, "Failed to get CPG local node id: %u\n", cs_err);
862  cs_err = CS_ERR_BAD_HANDLE;
863  }
865  ast_debug(5, "dispatch_thread_handler unlock\n");
866  } else {
867  ast_log(LOG_WARNING, "Failed to check CPG node membership: initializing CPG.\n");
869  cs_err = CS_ERR_BAD_HANDLE;
870  }
871  } else {
873  ast_debug(5, "dispatch_thread_handler rdlock\n");
874  if (pfd[0].revents & POLLIN) {
875  if ((cs_err = cpg_dispatch(cpg_handle, CS_DISPATCH_ALL)) != CS_OK) {
876  ast_log(LOG_WARNING, "Failed CPG dispatch: %u\n", cs_err);
877  }
878  }
879 
880  if (pfd[1].revents & POLLIN) {
881  if ((cs_err = corosync_cfg_dispatch(cfg_handle, CS_DISPATCH_ALL)) != CS_OK) {
882  ast_log(LOG_WARNING, "Failed CFG dispatch: %u\n", cs_err);
883  }
884  }
886  ast_debug(5, "dispatch_thread_handler unlock\n");
887  } else {
888  ast_log(LOG_WARNING, "Failed to dispatch: initializing CPG.\n");
889  }
890  }
891  if (cs_err == CS_ERR_LIBRARY || cs_err == CS_ERR_BAD_HANDLE) {
892 
893  /* If corosync gets restarted out from under Asterisk, try to recover. */
894 
895  ast_log(LOG_NOTICE, "Attempting to recover from corosync failure.\n");
896 
898  struct cpg_name name;
899  ast_debug(5, "dispatch_thread_handler wrlock\n");
900 
902  if (cpg_handle && (cs_err = cpg_finalize(cpg_handle)) != CS_OK) {
903  ast_log(LOG_ERROR, "Failed to finalize cpg (%d)\n", (int) cs_err);
904  }
905 
906  if (cfg_handle && (cs_err = corosync_cfg_finalize(cfg_handle)) != CS_OK) {
907  ast_log(LOG_ERROR, "Failed to finalize cfg (%d)\n", (int) cs_err);
908  }
909 
910  if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) {
911  ast_log(LOG_ERROR, "Failed to initialize cfg (%d)\n", (int) cs_err);
913  ast_debug(5, "dispatch_thread_handler unlock\n");
914  sleep(5);
915  continue;
916  }
917 
918  if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks) != CS_OK)) {
919  ast_log(LOG_ERROR, "Failed to initialize cpg (%d)\n", (int) cs_err);
921  ast_debug(5, "dispatch_thread_handler unlock\n");
922  sleep(5);
923  continue;
924  }
925 
926  if ((cs_err = cpg_fd_get(cpg_handle, &pfd[0].fd)) != CS_OK) {
927  ast_log(LOG_ERROR, "Failed to get CPG fd.\n");
929  ast_debug(5, "dispatch_thread_handler unlock\n");
930  sleep(5);
931  continue;
932  }
933 
934  if ((cs_err = corosync_cfg_fd_get(cfg_handle, &pfd[1].fd)) != CS_OK) {
935  ast_log(LOG_ERROR, "Failed to get CFG fd.\n");
937  ast_debug(5, "dispatch_thread_handler unlock\n");
938  sleep(5);
939  continue;
940  }
941 
942  ast_copy_string(name.value, "asterisk", sizeof(name.value));
943  name.length = strlen(name.value);
944  if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) {
945  ast_log(LOG_ERROR, "Failed to join cpg (%d)\n", (int) cs_err);
947  ast_debug(5, "dispatch_thread_handler unlock\n");
948  sleep(5);
949  continue;
950  }
953  ast_debug(5, "dispatch_thread_handler unlock\n");
954  ast_log(LOG_NOTICE, "Corosync recovery complete.\n");
956  } else {
957  ast_log(LOG_NOTICE, "Failed to recover from corosync failure: initializing CPG.\n");
958  }
959  }
960  }
961 
962  return NULL;
963 }
#define ARRAY_LEN(a)
Definition: isdn_lib.c:42
static cpg_handle_t cpg_handle
Definition: res_corosync.c:283
#define COROSYNC_POLL_TIMEOUT
Timeout for Corosync&#39;s poll process.
Definition: res_corosync.c:55
char * address
Definition: f2c.h:59
static struct @449 dispatch_thread
#define LOG_WARNING
Definition: logger.h:274
static void send_cluster_notify(void)
Informs the cluster of our EID and our IP addresses.
Definition: res_corosync.c:737
#define NULL
Definition: resample.c:96
#define ast_rwlock_unlock(a)
Definition: lock.h:232
const char * name
Definition: res_corosync.c:239
#define ast_rwlock_tryrdlock(a)
Definition: lock.h:235
#define ast_debug(level,...)
Log a DEBUG message.
Definition: logger.h:444
#define ast_log
Definition: astobj2.c:42
#define ast_poll(a, b, c)
Definition: poll-compat.h:88
static int corosync_node_joined
Join to corosync.
Definition: res_corosync.c:62
static cpg_callbacks_t cpg_callbacks
Definition: res_corosync.c:466
#define LOG_ERROR
Definition: logger.h:285
int errno
#define LOG_NOTICE
Definition: logger.h:263
static ast_rwlock_t init_cpg_lock
Definition: res_corosync.c:52
static corosync_cfg_callbacks_t cfg_callbacks
Definition: res_corosync.c:295
static corosync_cfg_handle_t cfg_handle
Definition: res_corosync.c:284
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
Definition: strings.h:401
#define ast_rwlock_trywrlock(a)
Definition: lock.h:236

◆ dump_cache_cb()

static int dump_cache_cb ( void *  obj,
void *  arg,
int  flags 
)
static

Definition at line 610 of file res_corosync.c.

References publish_to_corosync().

Referenced by cpg_confchg_cb().

611 {
612  struct stasis_message *message = obj;
613 
614  if (!message) {
615  return 0;
616  }
617 
618  publish_to_corosync(message);
619 
620  return 0;
621 }
static void publish_to_corosync(struct stasis_message *message)
Definition: res_corosync.c:571

◆ load_config()

static int load_config ( unsigned int  reload)
static

Definition at line 1222 of file res_corosync.c.

References ast_category_browse(), ast_config_destroy(), ast_config_load, ast_log, CONFIG_STATUS_FILEINVALID, CONFIG_STATUS_FILEMISSING, load_general_config(), LOG_WARNING, and NULL.

Referenced by load_module().

1223 {
1224  static const char filename[] = "res_corosync.conf";
1225  struct ast_config *cfg;
1226  const char *cat = NULL;
1227  struct ast_flags config_flags = { 0 };
1228  int res = 0;
1229 
1230  cfg = ast_config_load(filename, config_flags);
1231 
1233  return -1;
1234  }
1235 
1236  while ((cat = ast_category_browse(cfg, cat))) {
1237  if (!strcasecmp(cat, "general")) {
1238  res = load_general_config(cfg);
1239  } else {
1240  ast_log(LOG_WARNING, "Unknown configuration section '%s'\n", cat);
1241  }
1242  }
1243 
1244  ast_config_destroy(cfg);
1245 
1246  return res;
1247 }
#define LOG_WARNING
Definition: logger.h:274
#define CONFIG_STATUS_FILEINVALID
char * ast_category_browse(struct ast_config *config, const char *prev_name)
Browse categories.
Definition: extconf.c:3328
#define NULL
Definition: resample.c:96
static int load_general_config(struct ast_config *cfg)
#define ast_log
Definition: astobj2.c:42
#define ast_config_load(filename, flags)
Load a config file.
void ast_config_destroy(struct ast_config *config)
Destroys a config.
Definition: extconf.c:1290
Structure used to handle boolean flags.
Definition: utils.h:199
#define CONFIG_STATUS_FILEMISSING

◆ load_general_config()

static int load_general_config ( struct ast_config cfg)
static

Definition at line 1177 of file res_corosync.c.

References ARRAY_LEN, ast_debug, ast_log, ast_rwlock_unlock, ast_rwlock_wrlock, ast_variable_browse(), corosync_topic(), event_types, event_types_lock, LOG_WARNING, message_type_fn, ast_variable::name, ast_variable::next, NULL, publish, PUBLISH, set_event(), stasis_forward_all(), stasis_forward_cancel(), stasis_message_cb(), stasis_message_router_add(), stasis_message_router_remove(), sub, SUBSCRIBE, topic_fn, and ast_variable::value.

Referenced by load_config().

1178 {
1179  struct ast_variable *v;
1180  int res = 0;
1181  unsigned int i;
1182 
1184  ast_debug(5, "load_general_config wrlock\n");
1185 
1186  for (i = 0; i < ARRAY_LEN(event_types); i++) {
1187  event_types[i].publish = event_types[i].publish_default;
1188  event_types[i].subscribe = event_types[i].subscribe_default;
1189  }
1190 
1191  for (v = ast_variable_browse(cfg, "general"); v && !res; v = v->next) {
1192  if (!strcasecmp(v->name, "publish_event")) {
1193  res = set_event(v->value, PUBLISH);
1194  } else if (!strcasecmp(v->name, "subscribe_event")) {
1195  res = set_event(v->value, SUBSCRIBE);
1196  } else {
1197  ast_log(LOG_WARNING, "Unknown option '%s'\n", v->name);
1198  }
1199  }
1200 
1201  for (i = 0; i < ARRAY_LEN(event_types); i++) {
1202  if (event_types[i].publish && !event_types[i].sub) {
1204  corosync_topic());
1208  NULL);
1209  } else if (!event_types[i].publish && event_types[i].sub) {
1213  }
1214  }
1215 
1217  ast_debug(5, "load_general_config unlock\n");
1218 
1219  return res;
1220 }
struct ast_variable * next
static void stasis_message_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Definition: res_corosync.c:601
#define ARRAY_LEN(a)
Definition: isdn_lib.c:42
struct ast_variable * ast_variable_browse(const struct ast_config *config, const char *category_name)
Definition: extconf.c:1216
int stasis_message_router_add(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
Add a route to a message router.
#define LOG_WARNING
Definition: logger.h:274
static ast_rwlock_t event_types_lock
Definition: res_corosync.c:51
Structure for variables, used for configurations and for channel variables.
#define NULL
Definition: resample.c:96
#define ast_rwlock_unlock(a)
Definition: lock.h:232
unsigned char publish
Definition: res_corosync.c:241
#define ast_debug(level,...)
Log a DEBUG message.
Definition: logger.h:444
#define ast_log
Definition: astobj2.c:42
void stasis_message_router_remove(struct stasis_message_router *router, struct stasis_message_type *message_type)
Remove a route from a message router.
struct stasis_topic *(* topic_fn)(void)
Definition: res_corosync.c:245
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
Definition: stasis.c:1548
#define ast_rwlock_wrlock(a)
Definition: lock.h:234
static int set_event(const char *event_type, int pubsub)
static struct stasis_message_router * stasis_router
Our Stasis Message Bus API message router.
Definition: res_corosync.c:71
struct stasis_forward * sub
Definition: res_corosync.c:240
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition: stasis.c:1578
static struct @448 event_types[]
struct stasis_message_type *(* message_type_fn)(void)
Definition: res_corosync.c:247
static struct stasis_topic * corosync_topic(void)
Internal accessor for our topic.
Definition: res_corosync.c:74

◆ load_module()

static int load_module ( void  )
static

Definition at line 1336 of file res_corosync.c.

References AO2_ALLOC_OPT_LOCK_MUTEX, ao2_container_alloc_hash, ARRAY_LEN, ast_cli_register_multiple, ast_copy_string(), ast_debug, ast_eid_default, ast_eid_is_empty(), ast_log, AST_LOG_ERROR, AST_MODULE_LOAD_DECLINE, AST_MODULE_LOAD_SUCCESS, ast_rwlock_trywrlock, ast_rwlock_unlock, AST_TASKPROCESSOR_HIGH_WATER_LEVEL, cfg_callbacks, cfg_handle, cleanup_module(), corosync_node_cmp_fn(), corosync_node_hash_fn(), corosync_node_joined, corosync_pthread_create_background, cpg_callbacks, cpg_handle, dispatch_thread, dispatch_thread_handler(), errno, init_cpg_lock, load_config(), LOG_ERROR, NULL, stasis_message_router_create, stasis_message_router_set_congestion_limits(), STASIS_MESSAGE_TYPE_INIT, and stasis_topic_create().

1337 {
1338  cs_error_t cs_err;
1339  struct cpg_name name;
1340 
1342  ast_log(LOG_ERROR, "Entity ID is not set.\n");
1343  return AST_MODULE_LOAD_DECLINE;
1344  }
1345 
1348  if (!nodes) {
1349  goto failed;
1350  }
1351 
1352  corosync_aggregate_topic = stasis_topic_create("corosync:aggregator");
1353  if (!corosync_aggregate_topic) {
1354  ast_log(AST_LOG_ERROR, "Failed to create stasis topic for corosync\n");
1355  goto failed;
1356  }
1357 
1359  if (!stasis_router) {
1360  ast_log(AST_LOG_ERROR, "Failed to create message router for corosync topic\n");
1361  goto failed;
1362  }
1365 
1366  if (STASIS_MESSAGE_TYPE_INIT(corosync_ping_message_type) != 0) {
1367  ast_log(AST_LOG_ERROR, "Failed to initialize corosync ping message type\n");
1368  goto failed;
1369  }
1370 
1371  if (load_config(0)) {
1372  /* simply not configured is not a fatal error */
1373  goto failed;
1374  }
1375 
1378  ast_debug(5, "load_module wrlock\n");
1379  if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) {
1380  ast_log(LOG_ERROR, "Failed to initialize cfg: (%d)\n", (int) cs_err);
1382  ast_debug(5, "load_module unlock\n");
1383  goto failed;
1384  }
1385 
1386  if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks)) != CS_OK) {
1387  ast_log(LOG_ERROR, "Failed to initialize cpg: (%d)\n", (int) cs_err);
1389  ast_debug(5, "load_module unlock\n");
1390  goto failed;
1391  }
1392 
1393  ast_copy_string(name.value, "asterisk", sizeof(name.value));
1394  name.length = strlen(name.value);
1395 
1396  if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) {
1397  ast_log(LOG_ERROR, "Failed to join: (%d)\n", (int) cs_err);
1399  ast_debug(5, "load_module unlock\n");
1400  goto failed;
1401  }
1402 
1403  if (pipe(dispatch_thread.alert_pipe) == -1) {
1404  ast_log(LOG_ERROR, "Failed to create alert pipe: %s (%d)\n",
1405  strerror(errno), errno);
1407  ast_debug(5, "load_module unlock\n");
1408  goto failed;
1409  }
1411 
1413  ast_debug(5, "load_module unlock\n");
1416  ast_log(LOG_ERROR, "Error starting CPG dispatch thread.\n");
1417  goto failed;
1418  }
1419 
1421  } else {
1422  goto failed;
1423  }
1424 
1425  return AST_MODULE_LOAD_SUCCESS;
1426 
1427 failed:
1428  cleanup_module();
1429 
1430  return AST_MODULE_LOAD_DECLINE;
1431 }
static void * dispatch_thread_handler(void *data)
Definition: res_corosync.c:782
#define ARRAY_LEN(a)
Definition: isdn_lib.c:42
static cpg_handle_t cpg_handle
Definition: res_corosync.c:283
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
Definition: stasis.h:1501
static struct @449 dispatch_thread
static struct ast_cli_entry corosync_cli[]
#define AST_TASKPROCESSOR_HIGH_WATER_LEVEL
Definition: taskprocessor.h:63
#define ast_cli_register_multiple(e, len)
Register multiple commands.
Definition: cli.h:265
#define NULL
Definition: resample.c:96
#define ast_rwlock_unlock(a)
Definition: lock.h:232
const char * name
Definition: res_corosync.c:239
#define ast_debug(level,...)
Log a DEBUG message.
Definition: logger.h:444
#define ast_log
Definition: astobj2.c:42
#define AST_LOG_ERROR
Definition: logger.h:290
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition: stasis.c:618
#define stasis_message_router_create(topic)
static int corosync_node_joined
Join to corosync.
Definition: res_corosync.c:62
static int corosync_node_cmp_fn(void *obj, void *arg, int flags)
Definition: res_corosync.c:133
static cpg_callbacks_t cpg_callbacks
Definition: res_corosync.c:466
#define LOG_ERROR
Definition: logger.h:285
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Definition: astobj2.h:1310
int errno
#define corosync_pthread_create_background(a, b, c, d)
Version of pthread_create to ensure stack is large enough.
Definition: res_corosync.c:92
static void cleanup_module(void)
static ast_rwlock_t init_cpg_lock
Definition: res_corosync.c:52
static corosync_cfg_callbacks_t cfg_callbacks
Definition: res_corosync.c:295
Module has failed to load, may be in an inconsistent state.
Definition: module.h:78
static corosync_cfg_handle_t cfg_handle
Definition: res_corosync.c:284
struct ast_eid ast_eid_default
Global EID.
Definition: options.c:93
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
Definition: strings.h:401
#define ast_rwlock_trywrlock(a)
Definition: lock.h:236
static struct stasis_message_router * stasis_router
Our Stasis Message Bus API message router.
Definition: res_corosync.c:71
static int load_config(unsigned int reload)
static struct stasis_topic * corosync_aggregate_topic
The internal topic used for message forwarding and pings.
Definition: res_corosync.c:68
int ast_eid_is_empty(const struct ast_eid *eid)
Check if EID is empty.
Definition: main/utils.c:2847
int stasis_message_router_set_congestion_limits(struct stasis_message_router *router, long low_water, long high_water)
Set the high and low alert water marks of the stasis message router.
static int corosync_node_hash_fn(const void *obj, const int flags)
Definition: res_corosync.c:113
static struct ao2_container * nodes
All the nodes that we&#39;re aware of.
Definition: res_corosync.c:65

◆ publish_cluster_discovery_to_stasis()

static void publish_cluster_discovery_to_stasis ( struct ast_event event)
static

Publish a received cluster discovery ast_event to Stasis Message Bus API.

Definition at line 351 of file res_corosync.c.

References ao2_find, ao2_link_flags, ao2_lock, ao2_ref, ao2_unlock, ast_assert, ast_eid_cmp(), ast_eid_default, AST_EVENT_CLUSTER_DISCOVERY, ast_event_get_ie_raw(), ast_event_get_ie_uint(), ast_event_get_type(), AST_EVENT_IE_EID, AST_EVENT_IE_NODE_ID, corosync_node_alloc(), OBJ_NOLOCK, OBJ_SEARCH_KEY, publish_cluster_discovery_to_stasis_full(), and send_cluster_notify().

352 {
353  struct corosync_node *node;
355  struct ast_eid *event_eid;
356 
358 
359  event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
360  if (!event_eid || !ast_eid_cmp(&ast_eid_default, event_eid)) {
361  /* Don't feed events back in that originated locally. */
362  return;
363  }
364 
365  ao2_lock(nodes);
366  node = ao2_find(nodes, &id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
367  if (node) {
368  /* We already know about this node */
369  ao2_unlock(nodes);
370  ao2_ref(node, -1);
371  return;
372  }
373 
374  node = corosync_node_alloc(event);
375  if (!node) {
376  ao2_unlock(nodes);
377  return;
378  }
380  ao2_unlock(nodes);
381 
383 
384  ao2_ref(node, -1);
385 
386  /*
387  * When we get news that someone else has joined, we need to let them
388  * know we exist as well.
389  */
391 }
Definition: test_heap.c:38
static struct corosync_node * corosync_node_alloc(struct ast_event *event)
Definition: res_corosync.c:97
enum ast_event_type ast_event_get_type(const struct ast_event *event)
Get the type for an event.
Definition: event.c:288
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1105
static void send_cluster_notify(void)
Informs the cluster of our EID and our IP addresses.
Definition: res_corosync.c:737
Assume that the ao2_container is already locked.
Definition: astobj2.h:1067
#define ast_assert(a)
Definition: utils.h:710
#define ao2_link_flags(container, obj, flags)
Definition: astobj2.h:1572
#define ao2_unlock(a)
Definition: astobj2.h:730
static void publish_cluster_discovery_to_stasis_full(struct corosync_node *node, int joined)
Publish cluster discovery to Stasis Message Bus API.
Definition: res_corosync.c:303
An Entity ID is essentially a MAC address, brief and unique.
Definition: utils.h:786
Entity ID Used by All events Payload type: RAW This IE indicates which server the event originated fr...
Definition: event_defs.h:272
int ast_eid_cmp(const struct ast_eid *eid1, const struct ast_eid *eid2)
Compare two EIDs.
Definition: main/utils.c:2842
#define ao2_ref(o, delta)
Definition: astobj2.h:464
#define ao2_lock(a)
Definition: astobj2.h:718
const void * ast_event_get_ie_raw(const struct ast_event *event, enum ast_event_ie_type ie_type)
Get the value of an information element that has a raw payload.
Definition: event.c:311
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1756
uint32_t ast_event_get_ie_uint(const struct ast_event *event, enum ast_event_ie_type ie_type)
Get the value of an information element that has an integer payload.
Definition: event.c:293
struct ast_eid ast_eid_default
Global EID.
Definition: options.c:93
Cluster node ID Used by: Corosync Payload type: UINT.
Definition: event_defs.h:313
static struct ao2_container * nodes
All the nodes that we&#39;re aware of.
Definition: res_corosync.c:65

◆ publish_cluster_discovery_to_stasis_full()

static void publish_cluster_discovery_to_stasis_full ( struct corosync_node node,
int  joined 
)
static

Publish cluster discovery to Stasis Message Bus API.

Definition at line 303 of file res_corosync.c.

References corosync_node::addr, ao2_ref, ast_cluster_discovery_type(), ast_eid_to_str(), ast_json_pack(), ast_json_payload_create(), ast_json_unref(), ast_log, AST_LOG_NOTICE, ast_sockaddr_stringify_addr(), ast_system_topic(), corosync_node::eid, corosync_node::id, send_cluster_notify(), stasis_message_create(), and stasis_publish().

Referenced by cpg_confchg_cb(), and publish_cluster_discovery_to_stasis().

304 {
305  struct ast_json *json;
306  struct ast_json_payload *payload;
307  struct stasis_message *message;
308  char eid[18];
309  const char *addr;
310 
311  ast_eid_to_str(eid, sizeof(eid), &node->eid);
312  addr = ast_sockaddr_stringify_addr(&node->addr);
313 
314  ast_log(AST_LOG_NOTICE, "Node %u (%s) at %s %s the cluster\n",
315  node->id,
316  eid,
317  addr,
318  joined ? "joined" : "left");
319 
320  json = ast_json_pack("{s: s, s: i, s: s, s: i}",
321  "address", addr,
322  "node_id", node->id,
323  "eid", eid,
324  "joined", joined);
325  if (!json) {
326  return;
327  }
328 
329  payload = ast_json_payload_create(json);
330  if (!payload) {
331  ast_json_unref(json);
332  return;
333  }
334 
336  if (!message) {
337  ast_json_unref(json);
338  ao2_ref(payload, -1);
339  return;
340  }
341 
342  stasis_publish(ast_system_topic(), message);
343  ast_json_unref(json);
344  ao2_ref(payload, -1);
345  ao2_ref(message, -1);
346 }
static char * ast_sockaddr_stringify_addr(const struct ast_sockaddr *addr)
Wrapper around ast_sockaddr_stringify_fmt() to return an address only.
Definition: netsock2.h:290
struct ast_eid eid
Definition: res_corosync.c:83
struct ast_json * ast_json_pack(char const *format,...)
Helper for creating complex JSON values.
Definition: json.c:591
struct ast_eid eid
char * ast_eid_to_str(char *s, int maxlen, struct ast_eid *eid)
Convert an EID to a string.
Definition: main/utils.c:2587
struct ast_json_payload * ast_json_payload_create(struct ast_json *json)
Create an ao2 object to pass json blobs as data payloads for stasis.
Definition: json.c:735
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition: json.c:73
#define AST_LOG_NOTICE
Definition: logger.h:268
#define ast_log
Definition: astobj2.c:42
#define ao2_ref(o, delta)
Definition: astobj2.h:464
struct ast_sockaddr addr
Definition: res_corosync.c:85
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
struct stasis_topic * ast_system_topic(void)
A Stasis Message Bus API topic which publishes messages regarding system changes. ...
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic&#39;s subscribers.
Definition: stasis.c:1511
struct stasis_message_type * ast_cluster_discovery_type(void)
A stasis_message_type for Cluster discovery.
Abstract JSON element (object, array, string, int, ...).

◆ publish_corosync_ping_to_stasis()

static void publish_corosync_ping_to_stasis ( struct ast_event event)
static

Publish a Corosync ping to Stasis Message Bus API.

Definition at line 204 of file res_corosync.c.

References ao2_t_alloc, ao2_t_ref, ast_assert, ast_event_get_ie_raw(), ast_event_get_type(), AST_EVENT_IE_EID, AST_EVENT_IE_END, AST_EVENT_IE_PLTYPE_RAW, ast_event_new(), AST_EVENT_PING, corosync_ping_payload_dtor(), corosync_topic(), corosync_ping_payload::event, NULL, stasis_message_create(), and stasis_publish().

205 {
206  struct corosync_ping_payload *payload;
207  struct stasis_message *message;
208  struct ast_eid *event_eid;
209 
211  ast_assert(event != NULL);
212 
213  if (!corosync_ping_message_type()) {
214  return;
215  }
216 
217  payload = ao2_t_alloc(sizeof(*payload), corosync_ping_payload_dtor, "Create ping payload");
218  if (!payload) {
219  return;
220  }
221  event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
223  AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW, event_eid, sizeof(*event_eid),
225 
226  message = stasis_message_create(corosync_ping_message_type(), payload);
227  if (!message) {
228  ao2_t_ref(payload, -1, "Destroy payload on off nominal");
229  return;
230  }
231 
232  stasis_publish(corosync_topic(), message);
233 
234  ao2_t_ref(payload, -1, "Hand ref to stasis");
235  ao2_t_ref(message, -1, "Hand ref to stasis");
236 }
#define ao2_t_ref(o, delta, tag)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:463
enum ast_event_type ast_event_get_type(const struct ast_event *event)
Get the type for an event.
Definition: event.c:288
#define ao2_t_alloc(data_size, destructor_fn, debug_msg)
Definition: astobj2.h:409
static void corosync_ping_payload_dtor(void *obj)
Destructor for the corosync_ping_payload wrapper object.
Definition: res_corosync.c:167
#define ast_assert(a)
Definition: utils.h:710
#define NULL
Definition: resample.c:96
An Entity ID is essentially a MAC address, brief and unique.
Definition: utils.h:786
Entity ID Used by All events Payload type: RAW This IE indicates which server the event originated fr...
Definition: event_defs.h:272
const void * ast_event_get_ie_raw(const struct ast_event *event, enum ast_event_ie_type ie_type)
Get the value of an information element that has a raw payload.
Definition: event.c:311
struct ast_event * event
Definition: res_corosync.c:163
A payload wrapper around a corosync ping event.
Definition: res_corosync.c:161
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic&#39;s subscribers.
Definition: stasis.c:1511
struct ast_event * ast_event_new(enum ast_event_type event_type,...)
Create a new event.
Definition: event.c:402
static struct stasis_topic * corosync_topic(void)
Internal accessor for our topic.
Definition: res_corosync.c:74

◆ publish_device_state_to_stasis()

static void publish_device_state_to_stasis ( struct ast_event event)
static

Publish a received device state ast_event to Stasis Message Bus API.

Definition at line 432 of file res_corosync.c.

References ast_assert, ast_eid_to_str(), AST_EVENT_DEVICE_STATE_CHANGE, ast_event_get_ie_raw(), ast_event_get_ie_str(), ast_event_get_ie_uint(), ast_event_get_type(), AST_EVENT_IE_CACHABLE, AST_EVENT_IE_DEVICE, AST_EVENT_IE_EID, AST_EVENT_IE_STATE, ast_log, ast_publish_device_state_full(), ast_strlen_zero, cpg_confchg_cb(), cpg_deliver_cb(), corosync_node::eid, LOG_WARNING, and state.

433 {
434  const char *device;
435  enum ast_device_state state;
436  unsigned int cachable;
437  struct ast_eid *event_eid;
438 
440 
441  device = ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE);
443  cachable = ast_event_get_ie_uint(event, AST_EVENT_IE_CACHABLE);
444  event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
445 
446  if (ast_strlen_zero(device)) {
447  return;
448  }
449 
450  if (ast_publish_device_state_full(device, state, cachable, event_eid)) {
451  char eid[18];
452  ast_eid_to_str(eid, sizeof(eid), event_eid);
453  ast_log(LOG_WARNING, "Failed to publish device state message for %s from %s\n",
454  device, eid);
455  }
456 }
enum sip_cc_notify_state state
Definition: chan_sip.c:960
ast_device_state
Device States.
Definition: devicestate.h:52
enum ast_event_type ast_event_get_type(const struct ast_event *event)
Get the type for an event.
Definition: event.c:288
char * ast_eid_to_str(char *s, int maxlen, struct ast_eid *eid)
Convert an EID to a string.
Definition: main/utils.c:2587
#define LOG_WARNING
Definition: logger.h:274
#define ast_assert(a)
Definition: utils.h:710
An Entity ID is essentially a MAC address, brief and unique.
Definition: utils.h:786
Entity ID Used by All events Payload type: RAW This IE indicates which server the event originated fr...
Definition: event_defs.h:272
#define ast_log
Definition: astobj2.c:42
Event non-cachability flag Used by: All events Payload type: UINT.
Definition: event_defs.h:306
const void * ast_event_get_ie_raw(const struct ast_event *event, enum ast_event_ie_type ie_type)
Get the value of an information element that has a raw payload.
Definition: event.c:311
unsigned char eid[6]
Definition: utils.h:787
int ast_publish_device_state_full(const char *device, enum ast_device_state state, enum ast_devstate_cache cachable, struct ast_eid *eid)
Publish a device state update with EID.
Definition: devicestate.c:709
#define ast_strlen_zero(a)
Definition: muted.c:73
uint32_t ast_event_get_ie_uint(const struct ast_event *event, enum ast_event_ie_type ie_type)
Get the value of an information element that has an integer payload.
Definition: event.c:293
Generic State IE Used by AST_EVENT_DEVICE_STATE_CHANGE Payload type: UINT The actual state values dep...
Definition: event_defs.h:121
const char * ast_event_get_ie_str(const struct ast_event *event, enum ast_event_ie_type ie_type)
Get the value of an information element that has a string payload.
Definition: event.c:302
Device Name Used by AST_EVENT_DEVICE_STATE_CHANGE Payload type: STR.
Definition: event_defs.h:113

◆ publish_event_to_corosync()

static void publish_event_to_corosync ( struct ast_event event)
static

Definition at line 544 of file res_corosync.c.

References ast_debug, ast_event_get_size(), ast_event_get_type(), ast_event_get_type_name(), ast_log, ast_rwlock_tryrdlock, ast_rwlock_unlock, corosync_node_joined, cpg_handle, init_cpg_lock, and LOG_WARNING.

Referenced by publish_to_corosync(), and send_cluster_notify().

545 {
546  cs_error_t cs_err;
547  struct iovec iov;
548 
549  iov.iov_base = (void *)event;
550  iov.iov_len = ast_event_get_size(event);
551 
552  ast_debug(5, "Publishing event %s (%u) to corosync\n",
554 
555  /* The stasis subscription will only exist if we are configured to publish
556  * these events, so just send away. */
558  ast_debug(5, "publish_event_to_corosync rdlock\n");
559  if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) {
560  ast_log(LOG_WARNING, "CPG mcast failed (%u) for event %s (%u)\n",
561  cs_err, ast_event_get_type_name(event), ast_event_get_type(event));
562  }
564  ast_debug(5, "publish_event_to_corosync unlock\n");
565  } else {
566  ast_log(LOG_WARNING, "CPG mcast not executed for event %s (%u): initializing CPG.\n",
568  }
569 }
static cpg_handle_t cpg_handle
Definition: res_corosync.c:283
enum ast_event_type ast_event_get_type(const struct ast_event *event)
Get the type for an event.
Definition: event.c:288
#define LOG_WARNING
Definition: logger.h:274
#define ast_rwlock_unlock(a)
Definition: lock.h:232
#define ast_rwlock_tryrdlock(a)
Definition: lock.h:235
#define ast_debug(level,...)
Log a DEBUG message.
Definition: logger.h:444
#define ast_log
Definition: astobj2.c:42
static int corosync_node_joined
Join to corosync.
Definition: res_corosync.c:62
static ast_rwlock_t init_cpg_lock
Definition: res_corosync.c:52
size_t ast_event_get_size(const struct ast_event *event)
Get the size of an event.
Definition: event.c:228
const char * ast_event_get_type_name(const struct ast_event *event)
Get the string representation of the type of the given event.
Definition: event.c:194

◆ publish_mwi_to_stasis()

static void publish_mwi_to_stasis ( struct ast_event event)
static

Publish a received MWI ast_event to Stasis Message Bus API.

Definition at line 394 of file res_corosync.c.

References ast_assert, ast_eid_to_str(), ast_event_get_ie_raw(), ast_event_get_ie_str(), ast_event_get_ie_uint(), ast_event_get_type(), AST_EVENT_IE_CONTEXT, AST_EVENT_IE_EID, AST_EVENT_IE_MAILBOX, AST_EVENT_IE_NEWMSGS, AST_EVENT_IE_OLDMSGS, AST_EVENT_MWI, ast_log, ast_publish_mwi_state_full(), ast_strlen_zero, context, corosync_node::eid, LOG_WARNING, mailbox, and NULL.

395 {
396  const char *mailbox;
397  const char *context;
398  unsigned int new_msgs;
399  unsigned int old_msgs;
400  struct ast_eid *event_eid;
401 
403 
404  mailbox = ast_event_get_ie_str(event, AST_EVENT_IE_MAILBOX);
405  context = ast_event_get_ie_str(event, AST_EVENT_IE_CONTEXT);
406  new_msgs = ast_event_get_ie_uint(event, AST_EVENT_IE_NEWMSGS);
407  old_msgs = ast_event_get_ie_uint(event, AST_EVENT_IE_OLDMSGS);
408  event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
409 
410  if (ast_strlen_zero(mailbox) || ast_strlen_zero(context)) {
411  return;
412  }
413 
414  if (new_msgs > INT_MAX) {
415  new_msgs = INT_MAX;
416  }
417 
418  if (old_msgs > INT_MAX) {
419  old_msgs = INT_MAX;
420  }
421 
422  if (ast_publish_mwi_state_full(mailbox, context, (int)new_msgs,
423  (int)old_msgs, NULL, event_eid)) {
424  char eid[18];
425  ast_eid_to_str(eid, sizeof(eid), event_eid);
426  ast_log(LOG_WARNING, "Failed to publish MWI message for %s@%s from %s\n",
427  mailbox, context, eid);
428  }
429 }
enum ast_event_type ast_event_get_type(const struct ast_event *event)
Get the type for an event.
Definition: event.c:288
char * ast_eid_to_str(char *s, int maxlen, struct ast_eid *eid)
Convert an EID to a string.
Definition: main/utils.c:2587
#define LOG_WARNING
Definition: logger.h:274
Number of new messages Used by: AST_EVENT_MWI Payload type: UINT.
Definition: event_defs.h:77
Number of Used by: AST_EVENT_MWI Payload type: UINT.
Definition: event_defs.h:83
#define ast_assert(a)
Definition: utils.h:710
#define NULL
Definition: resample.c:96
An Entity ID is essentially a MAC address, brief and unique.
Definition: utils.h:786
Entity ID Used by All events Payload type: RAW This IE indicates which server the event originated fr...
Definition: event_defs.h:272
static char mailbox[AST_MAX_MAILBOX_UNIQUEID]
Definition: chan_mgcp.c:204
#define ast_log
Definition: astobj2.c:42
Context IE Used by AST_EVENT_MWI Payload type: str.
Definition: event_defs.h:127
const void * ast_event_get_ie_raw(const struct ast_event *event, enum ast_event_ie_type ie_type)
Get the value of an information element that has a raw payload.
Definition: event.c:311
int ast_publish_mwi_state_full(const char *mailbox, const char *context, int new_msgs, int old_msgs, const char *channel_id, struct ast_eid *eid)
Publish a MWI state update via stasis with all parameters.
Definition: mwi.c:388
unsigned char eid[6]
Definition: utils.h:787
#define ast_strlen_zero(a)
Definition: muted.c:73
uint32_t ast_event_get_ie_uint(const struct ast_event *event, enum ast_event_ie_type ie_type)
Get the value of an information element that has an integer payload.
Definition: event.c:293
static char context[AST_MAX_CONTEXT]
Definition: chan_alsa.c:116
const char * ast_event_get_ie_str(const struct ast_event *event, enum ast_event_ie_type ie_type)
Get the value of an information element that has a string payload.
Definition: event.c:302
Mailbox name.
Definition: event_defs.h:89

◆ publish_to_corosync()

static void publish_to_corosync ( struct stasis_message message)
static

Definition at line 571 of file res_corosync.c.

References ast_eid_cmp(), ast_eid_default, ast_eid_to_str(), ast_event_destroy(), ast_event_get_ie_raw(), ast_event_get_type(), AST_EVENT_IE_EID, AST_EVENT_PING, ast_log, buf, corosync_node::eid, LOG_NOTICE, publish_event_to_corosync(), and stasis_message_to_event().

Referenced by dump_cache_cb(), and stasis_message_cb().

572 {
573  struct ast_event *event;
574  struct ast_eid *event_eid;
575 
576  event = stasis_message_to_event(message);
577  if (!event) {
578  return;
579  }
580 
581  event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
582  if (!event_eid || ast_eid_cmp(&ast_eid_default, event_eid)) {
583  /* If the event didn't originate from this server, don't send it back out. */
584  ast_event_destroy(event);
585  return;
586  }
587 
588  if (ast_event_get_type(event) == AST_EVENT_PING) {
589  const struct ast_eid *eid;
590  char buf[128] = "";
591 
592  eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
593  ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid);
594  ast_log(LOG_NOTICE, "Sending event PING from this server with EID: '%s'\n", buf);
595  }
596 
598  ast_event_destroy(event);
599 }
An event.
Definition: event.c:81
enum ast_event_type ast_event_get_type(const struct ast_event *event)
Get the type for an event.
Definition: event.c:288
char * ast_eid_to_str(char *s, int maxlen, struct ast_eid *eid)
Convert an EID to a string.
Definition: main/utils.c:2587
char buf[BUFSIZE]
Definition: eagi_proxy.c:66
Definition: astman.c:222
An Entity ID is essentially a MAC address, brief and unique.
Definition: utils.h:786
Entity ID Used by All events Payload type: RAW This IE indicates which server the event originated fr...
Definition: event_defs.h:272
int ast_eid_cmp(const struct ast_eid *eid1, const struct ast_eid *eid2)
Compare two EIDs.
Definition: main/utils.c:2842
#define ast_log
Definition: astobj2.c:42
const void * ast_event_get_ie_raw(const struct ast_event *event, enum ast_event_ie_type ie_type)
Get the value of an information element that has a raw payload.
Definition: event.c:311
unsigned char eid[6]
Definition: utils.h:787
#define LOG_NOTICE
Definition: logger.h:263
struct ast_event * stasis_message_to_event(struct stasis_message *msg)
Build the Generic event system representation of the message.
struct ast_eid ast_eid_default
Global EID.
Definition: options.c:93
void ast_event_destroy(struct ast_event *event)
Destroy an event.
Definition: event.c:524
static void publish_event_to_corosync(struct ast_event *event)
Definition: res_corosync.c:544

◆ send_cluster_notify()

static void send_cluster_notify ( void  )
static

Informs the cluster of our EID and our IP addresses.

Definition at line 737 of file res_corosync.c.

References ast_debug, AST_EVENT_CLUSTER_DISCOVERY, ast_event_destroy(), AST_EVENT_IE_END, AST_EVENT_IE_LOCAL_ADDR, AST_EVENT_IE_NODE_ID, AST_EVENT_IE_PLTYPE_STR, AST_EVENT_IE_PLTYPE_UINT, ast_event_new(), ast_log, ast_rwlock_tryrdlock, ast_rwlock_unlock, buf, cfg_handle, init_cpg_lock, LOG_WARNING, NULL, and publish_event_to_corosync().

Referenced by dispatch_thread_handler(), publish_cluster_discovery_to_stasis(), and publish_cluster_discovery_to_stasis_full().

738 {
739  struct ast_event *event;
740  unsigned int node_id;
741  cs_error_t cs_err;
742  corosync_cfg_node_address_t corosync_addr;
743  int num_addrs = 0;
744  struct sockaddr *sa;
745  size_t sa_len;
746  char buf[128];
747  int res;
748 
750  ast_debug(5, "send_cluster_notify rdlock\n");
751 
752  if ((cs_err = corosync_cfg_local_get(cfg_handle, &node_id)) != CS_OK) {
753  ast_log(LOG_WARNING, "Failed to extract Corosync node ID for this node. Not informing cluster of existance.\n");
754  return;
755  }
756 
757  if (((cs_err = corosync_cfg_get_node_addrs(cfg_handle, node_id, 1, &num_addrs, &corosync_addr)) != CS_OK) || (num_addrs < 1)) {
758  ast_log(LOG_WARNING, "Failed to get local Corosync address. Not informing cluster of existance.\n");
759  return;
760  }
761 
763  ast_debug(5, "send_cluster_notify unlock\n");
764  }
765 
766  sa = (struct sockaddr *)corosync_addr.address;
767  sa_len = (size_t)corosync_addr.address_length;
768  if ((res = getnameinfo(sa, sa_len, buf, sizeof(buf), NULL, 0, NI_NUMERICHOST))) {
769  ast_log(LOG_WARNING, "Failed to determine name of local Corosync address: %s (%d). Not informing cluster of existance.\n",
770  gai_strerror(res), res);
771  return;
772  }
773 
779  ast_event_destroy(event);
780 }
An event.
Definition: event.c:81
char buf[BUFSIZE]
Definition: eagi_proxy.c:66
#define LOG_WARNING
Definition: logger.h:274
Definition: astman.c:222
#define NULL
Definition: resample.c:96
#define ast_rwlock_unlock(a)
Definition: lock.h:232
#define ast_rwlock_tryrdlock(a)
Definition: lock.h:235
#define ast_debug(level,...)
Log a DEBUG message.
Definition: logger.h:444
#define ast_log
Definition: astobj2.c:42
static ast_rwlock_t init_cpg_lock
Definition: res_corosync.c:52
static corosync_cfg_handle_t cfg_handle
Definition: res_corosync.c:284
void ast_event_destroy(struct ast_event *event)
Destroy an event.
Definition: event.c:524
struct ast_event * ast_event_new(enum ast_event_type event_type,...)
Create a new event.
Definition: event.c:402
static void publish_event_to_corosync(struct ast_event *event)
Definition: res_corosync.c:544
Cluster node ID Used by: Corosync Payload type: UINT.
Definition: event_defs.h:313

◆ set_event()

static int set_event ( const char *  event_type,
int  pubsub 
)
static

Definition at line 1153 of file res_corosync.c.

References ARRAY_LEN, event_types, name, PUBLISH, and SUBSCRIBE.

Referenced by load_general_config().

1154 {
1155  unsigned int i;
1156 
1157  for (i = 0; i < ARRAY_LEN(event_types); i++) {
1158  if (!event_types[i].name || strcasecmp(event_type, event_types[i].name)) {
1159  continue;
1160  }
1161 
1162  switch (pubsub) {
1163  case PUBLISH:
1164  event_types[i].publish = 1;
1165  break;
1166  case SUBSCRIBE:
1167  event_types[i].subscribe = 1;
1168  break;
1169  }
1170 
1171  break;
1172  }
1173 
1174  return (i == ARRAY_LEN(event_types)) ? -1 : 0;
1175 }
#define ARRAY_LEN(a)
Definition: isdn_lib.c:42
const char * name
Definition: res_corosync.c:239
static struct @448 event_types[]

◆ stasis_message_cb()

static void stasis_message_cb ( void *  data,
struct stasis_subscription sub,
struct stasis_message message 
)
static

Definition at line 601 of file res_corosync.c.

References publish_to_corosync().

Referenced by load_general_config().

602 {
603  if (!message) {
604  return;
605  }
606 
607  publish_to_corosync(message);
608 }
static void publish_to_corosync(struct stasis_message *message)
Definition: res_corosync.c:571

◆ STASIS_MESSAGE_TYPE_DEFN_LOCAL()

STASIS_MESSAGE_TYPE_DEFN_LOCAL ( corosync_ping_message_type  ,
to_event = corosync_ping_to_event 
)

Referenced by corosync_ping_to_event().

◆ unload_module()

static int unload_module ( void  )
static

Definition at line 1433 of file res_corosync.c.

References ARRAY_LEN, ast_cli_unregister_multiple(), AST_MODULE_INFO_STANDARD_EXTENDED(), ASTERISK_GPL_KEY, and cleanup_module().

1434 {
1436 
1437  cleanup_module();
1438 
1439  return 0;
1440 }
#define ARRAY_LEN(a)
Definition: isdn_lib.c:42
int ast_cli_unregister_multiple(struct ast_cli_entry *e, int len)
Unregister multiple commands.
Definition: clicompat.c:30
static struct ast_cli_entry corosync_cli[]
static void cleanup_module(void)

Variable Documentation

◆ alert_pipe

int alert_pipe[2]

Definition at line 276 of file res_corosync.c.

◆ cache_fn

struct stasis_cache*(* cache_fn) (void)

Definition at line 246 of file res_corosync.c.

Referenced by cleanup_module(), and cpg_confchg_cb().

◆ cfg_callbacks

corosync_cfg_callbacks_t cfg_callbacks
static

Definition at line 295 of file res_corosync.c.

Referenced by dispatch_thread_handler(), and load_module().

◆ cfg_handle

corosync_cfg_handle_t cfg_handle
static

◆ corosync_aggregate_topic

struct stasis_topic* corosync_aggregate_topic
static

The internal topic used for message forwarding and pings.

Definition at line 68 of file res_corosync.c.

Referenced by corosync_topic().

◆ corosync_cli

struct ast_cli_entry corosync_cli[]
static
Initial value:
= {
{ .handler = corosync_show_config , .summary = "Show configuration" ,},
{ .handler = corosync_show_members , .summary = "Show cluster members" ,},
{ .handler = corosync_ping , .summary = "Send a test ping to the cluster" ,},
}
static char * corosync_show_config(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static char * corosync_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static char * corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
Definition: res_corosync.c:965

Definition at line 1142 of file res_corosync.c.

◆ corosync_node_joined

int corosync_node_joined = 0
static

Join to corosync.

Definition at line 62 of file res_corosync.c.

Referenced by cleanup_module(), dispatch_thread_handler(), load_module(), and publish_event_to_corosync().

◆ cpg_callbacks

cpg_callbacks_t cpg_callbacks
static
Initial value:
= {
.cpg_deliver_fn = cpg_deliver_cb,
.cpg_confchg_fn = cpg_confchg_cb,
}
static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
Definition: res_corosync.c:647
static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_name, uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
Definition: res_corosync.c:484

Definition at line 466 of file res_corosync.c.

Referenced by dispatch_thread_handler(), and load_module().

◆ cpg_handle

cpg_handle_t cpg_handle
static

◆ dispatch_thread

struct { ... } dispatch_thread
Initial value:
= {
.alert_pipe = { -1, -1 },
}
#define AST_PTHREADT_NULL
Definition: lock.h:66

Referenced by cleanup_module(), dispatch_thread_handler(), and load_module().

◆ event_types

struct { ... } event_types[]

◆ event_types_lock

ast_rwlock_t event_types_lock = { PTHREAD_RWLOCK_INITIALIZER , NULL, {1, 0} }
static

◆ id

pthread_t id

Definition at line 275 of file res_corosync.c.

◆ init_cpg_lock

ast_rwlock_t init_cpg_lock = { PTHREAD_RWLOCK_INITIALIZER , NULL, {1, 0} }
static

◆ message_type_fn

struct stasis_message_type*(* message_type_fn) (void)

Definition at line 247 of file res_corosync.c.

Referenced by cleanup_module(), cpg_confchg_cb(), and load_general_config().

◆ name

const char* name

Definition at line 239 of file res_corosync.c.

Referenced by cleanup_module(), cpg_confchg_cb(), and set_event().

◆ nodes

struct ao2_container* nodes
static

All the nodes that we're aware of.

Definition at line 65 of file res_corosync.c.

Referenced by AST_TEST_DEFINE().

◆ publish

unsigned char publish

◆ publish_default

unsigned char publish_default

Definition at line 242 of file res_corosync.c.

◆ publish_to_stasis

void(* publish_to_stasis) (struct ast_event *)

Definition at line 248 of file res_corosync.c.

◆ stasis_router

struct stasis_message_router* stasis_router
static

Our Stasis Message Bus API message router.

Definition at line 71 of file res_corosync.c.

◆ stop

unsigned int stop

Definition at line 277 of file res_corosync.c.

◆ sub

struct stasis_forward* sub

Definition at line 240 of file res_corosync.c.

Referenced by _skinny_show_lines(), acf_channel_read(), activatesub(), add_mwi_datastore(), allocate_subscription(), analog_new_ast_channel(), analog_set_inthreeway(), ao2_weakproxy_subscribe(), ao2_weakproxy_unsubscribe(), append_sub(), ast_channel_connected_line_sub(), ast_channel_redirecting_sub(), ast_mwi_subscribe_pool(), ast_sip_create_subscription(), AST_TEST_DEFINE(), attempt_transfer(), build_gateway(), channel_to_session(), cleanup_module(), close_call(), close_client(), config_device(), create_unsolicited_mwi_subscriptions(), create_virtual_subscriptions(), delete_device(), delete_sub(), destroy_endpoint(), discard_call(), exten_state_subscription_destructor(), find_rtp_port(), find_subchannel_and_lock(), find_subchannel_by_instance_reference(), find_subchannel_by_name(), find_subchannel_by_reference(), generic_monitor_instance_list_destructor(), get_devicestate(), get_or_create_subscription(), get_sub(), get_sub_holding(), get_subscription(), handle_call_incoming(), handle_call_outgoing(), handle_callforward_button(), handle_enbloc_call_message(), handle_key_fav(), handle_keypad_button_message(), handle_msg_cb(), handle_offhook_message(), handle_onhook_message(), handle_open_receive_channel_ack_message(), handle_request(), handle_soft_key_event_message(), handle_stimulus_message(), handle_transfer_button(), has_destination_cb(), internal_stasis_subscribe(), jb_debug_output(), key_call(), key_dial_page(), load_general_config(), load_module(), message_subscription_alloc(), message_subscription_dtor(), message_subscription_hash_cb(), messaging_app_subscribe_endpoint(), messaging_app_unsubscribe_endpoint(), mgcp_alloc_pktcgate(), mgcp_answer(), mgcp_call(), mgcp_fixup(), mgcp_get_codec(), mgcp_get_rtp_peer(), mgcp_hangup(), mgcp_indicate(), mgcp_pktcgate_open(), mgcp_pktcgate_remove(), mgcp_postrequest(), mgcp_prune_realtime_gateway(), mgcp_read(), mgcp_request(), mgcp_senddigit_begin(), mgcp_senddigit_end(), mgcp_set_rtp_peer(), mgcp_ss(), mgcp_write(), mgcpsock_read(), mwi_create_subscription(), mwi_ds_destroy(), mwi_on_aor(), mwi_subscribe_all(), mwi_subscribe_single(), mwi_subscription_alloc(), mwi_subscription_destructor(), mwi_subscription_established(), onevent(), proxy_dtor(), pubsub_on_rx_notify(), rcv_mac_addr(), rtp_learning_start(), send_unsolicited_mwi_notify(), send_unsolicited_mwi_notify_to_contact(), setsubstate(), skinny_answer(), skinny_autoanswer_cb(), skinny_call(), skinny_cfwd_cb(), skinny_dialer_cb(), skinny_fixup(), skinny_get_rtp_peer(), skinny_get_vrtp_peer(), skinny_hangup(), skinny_indicate(), skinny_new(), skinny_newcall(), skinny_read(), skinny_request(), skinny_senddigit_end(), skinny_set_rtp_peer(), skinny_transfer_attended(), skinny_write(), stasis_state_add_subscriber(), stasis_state_subscribe_pool(), stasis_subscription_cb_noop(), stasis_subscription_is_subscribed(), stasis_topic_subscribers(), subscriber_dtor(), subscript(), subscriptions_create(), transfer_call_step1(), transfer_cancel_step2(), transfer_refer(), unistim_alloc_sub(), unistim_answer(), unistim_call(), unistim_do_senddigit(), unistim_get_rtp_peer(), unistim_hangup(), unistim_indicate(), unistim_read(), unistim_request(), unistim_set_rtp_peer(), unistim_show_info(), unistim_sp(), unistim_ss(), unistim_write(), and unload_module().

◆ subscribe

unsigned char subscribe

◆ subscribe_default

unsigned char subscribe_default

Definition at line 244 of file res_corosync.c.

◆ topic_fn

struct stasis_topic*(* topic_fn) (void)

Definition at line 245 of file res_corosync.c.

Referenced by cleanup_module(), cpg_confchg_cb(), and load_general_config().