Asterisk - The Open Source Telephony Project GIT-master-a63eec2
Loading...
Searching...
No Matches
Data Structures | Macros | Enumerations | Functions | Variables
res_corosync.c File Reference
#include "asterisk.h"
#include <corosync/cpg.h>
#include <corosync/cfg.h>
#include "asterisk/module.h"
#include "asterisk/logger.h"
#include "asterisk/poll-compat.h"
#include "asterisk/config.h"
#include "asterisk/event.h"
#include "asterisk/cli.h"
#include "asterisk/devicestate.h"
#include "asterisk/mwi.h"
#include "asterisk/stasis.h"
#include "asterisk/stasis_message_router.h"
#include "asterisk/stasis_system.h"
#include "asterisk/taskprocessor.h"
Include dependency graph for res_corosync.c:

Go to the source code of this file.

Data Structures

struct  corosync_node
 
struct  corosync_ping_payload
 A payload wrapper around a corosync ping event. More...
 

Macros

#define COROSYNC_IPC_BUFFER_SIZE   (8192 * 128)
 Corosync ipc dispatch/request and reply size.
 
#define COROSYNC_POLL_TIMEOUT   (10 * 1000)
 Timeout for Corosync's poll process.
 
#define corosync_pthread_create_background(a, b, c, d)
 Version of pthread_create to ensure stack is large enough.
 

Enumerations

enum  { PUBLISH , SUBSCRIBE }
 

Functions

 AST_MODULE_INFO_STANDARD_EXTENDED (ASTERISK_GPL_KEY, "Corosync")
 
static void cfg_shutdown_cb (corosync_cfg_handle_t cfg_handle, corosync_cfg_shutdown_flags_t flags)
 
static void cleanup_module (void)
 
static int clear_node_cache (void *obj, void *arg, int flags)
 
static struct corosync_nodecorosync_node_alloc (struct ast_event *event)
 
static int corosync_node_cmp_fn (void *obj, void *arg, int flags)
 
static int corosync_node_hash_fn (const void *obj, const int flags)
 
static char * corosync_ping (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static void corosync_ping_payload_dtor (void *obj)
 Destructor for the corosync_ping_payload wrapper object.
 
static struct ast_eventcorosync_ping_to_event (struct stasis_message *message)
 Convert a Corosync PING to a ast_event.
 
static char * corosync_show_config (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static char * corosync_show_members (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static struct stasis_topiccorosync_topic (void)
 Internal accessor for our topic.
 
static void cpg_confchg_cb (cpg_handle_t handle, const struct cpg_name *group_name, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
 
static void cpg_deliver_cb (cpg_handle_t handle, const struct cpg_name *group_name, uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
 
static void * dispatch_thread_handler (void *data)
 
static int dump_cache_cb (void *obj, void *arg, int flags)
 
static int load_config (unsigned int reload)
 
static int load_general_config (struct ast_config *cfg)
 
static int load_module (void)
 
static void publish_cluster_discovery_to_stasis (struct ast_event *event)
 Publish a received cluster discovery ast_event to Stasis Message Bus API.
 
static void publish_cluster_discovery_to_stasis_full (struct corosync_node *node, int joined)
 Publish cluster discovery to Stasis Message Bus API.
 
static void publish_corosync_ping_to_stasis (struct ast_event *event)
 Publish a Corosync ping to Stasis Message Bus API.
 
static void publish_device_state_to_stasis (struct ast_event *event)
 Publish a received device state ast_event to Stasis Message Bus API.
 
static void publish_event_to_corosync (struct ast_event *event)
 
static void publish_mwi_to_stasis (struct ast_event *event)
 Publish a received MWI ast_event to Stasis Message Bus API.
 
static void publish_to_corosync (struct stasis_message *message)
 
static void send_cluster_notify (void)
 Informs the cluster of our EID and our IP addresses.
 
static int set_event (const char *event_type, int pubsub)
 
static void stasis_message_cb (void *data, struct stasis_subscription *sub, struct stasis_message *message)
 
 STASIS_MESSAGE_TYPE_DEFN_LOCAL (corosync_ping_message_type,.to_event=corosync_ping_to_event,)
 
static int unload_module (void)
 

Variables

static corosync_cfg_callbacks_t cfg_callbacks
 
static corosync_cfg_handle_t cfg_handle
 
static struct stasis_topiccorosync_aggregate_topic
 The internal topic used for message forwarding and pings.
 
static struct ast_cli_entry corosync_cli []
 
static int corosync_node_joined = 0
 Join to corosync.
 
static cpg_callbacks_t cpg_callbacks
 
static cpg_handle_t cpg_handle
 
struct { 
 
   int   alert_pipe [2] 
 
   pthread_t   id 
 
   unsigned int   stop:1 
 
dispatch_thread 
 
struct { 
 
   struct stasis_cache *(*   cache_fn )(void) 
 
   struct stasis_message_type *(*   message_type_fn )(void) 
 
   const char *   name 
 
   unsigned char   publish 
 
   unsigned char   publish_default 
 
   void(*   publish_to_stasis )(struct ast_event *) 
 
   struct stasis_forward *   sub 
 
   unsigned char   subscribe 
 
   unsigned char   subscribe_default 
 
   struct stasis_topic *(*   topic_fn )(void) 
 
event_types [] 
 
static ast_rwlock_t event_types_lock = AST_RWLOCK_INIT_VALUE
 
static ast_rwlock_t init_cpg_lock = AST_RWLOCK_INIT_VALUE
 
static struct ao2_containernodes
 All the nodes that we're aware of.
 
static struct stasis_message_routerstasis_router
 Our Stasis Message Bus API message router.
 

Detailed Description

module publishes ast_event representations of information to other Asterisk instances in a cluster.

Events have an associated event type, as well as information elements. The information elements are the meta data that go along with each event. For example, in the case of message waiting indication, the event type is MWI, and each MWI event contains at least three information elements: the mailbox, the number of new messages, and the number of old messages.

Author
Russell Bryant russe.nosp@m.ll@r.nosp@m.ussel.nosp@m.lbry.nosp@m.ant.n.nosp@m.et

This module is based on and replaces the previous res_ais module.

Definition in file res_corosync.c.

Macro Definition Documentation

◆ COROSYNC_IPC_BUFFER_SIZE

#define COROSYNC_IPC_BUFFER_SIZE   (8192 * 128)

Corosync ipc dispatch/request and reply size.

Definition at line 89 of file res_corosync.c.

◆ COROSYNC_POLL_TIMEOUT

#define COROSYNC_POLL_TIMEOUT   (10 * 1000)

Timeout for Corosync's poll process.

Definition at line 55 of file res_corosync.c.

◆ corosync_pthread_create_background

#define corosync_pthread_create_background (   a,
  b,
  c,
  d 
)
Value:
__FILE__, __FUNCTION__, __LINE__, #c)
#define AST_BACKGROUND_STACKSIZE
#define COROSYNC_IPC_BUFFER_SIZE
Corosync ipc dispatch/request and reply size.
static struct test_val b
static struct test_val a
static struct test_val d
static struct test_val c
int ast_pthread_create_stack(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine)(void *), void *data, size_t stacksize, const char *file, const char *caller, int line, const char *start_fn)
Definition utils.c:1661

Version of pthread_create to ensure stack is large enough.

Definition at line 92 of file res_corosync.c.

97{
98 struct corosync_node *node;
99
101 if (!node) {
102 return NULL;
103 }
104
105 memcpy(&node->eid, (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID), sizeof(node->eid));
108
109 return node;
110}
111
112static int corosync_node_hash_fn(const void *obj, const int flags)
113{
114 const struct corosync_node *node;
115 const int *id;
116
117 switch (flags & OBJ_SEARCH_MASK) {
118 case OBJ_SEARCH_KEY:
119 id = obj;
120 break;
122 node = obj;
123 id = &node->id;
124 break;
125 default:
126 ast_assert(0);
127 return 0;
128 }
129 return *id;
130}
131
132static int corosync_node_cmp_fn(void *obj, void *arg, int flags)
133{
134 struct corosync_node *left = obj;
135 struct corosync_node *right = arg;
136 const int *id = arg;
137 int cmp;
138
139 switch (flags & OBJ_SEARCH_MASK) {
141 id = &right->id;
142 /* Fall through */
143 case OBJ_SEARCH_KEY:
144 cmp = (left->id == *id);
145 break;
147 cmp = (left->id == right->id);
148 break;
149 default:
150 /* Sort can only work on something with a full or partial key. */
151 ast_assert(0);
152 cmp = 1;
153 break;
154 }
155 return cmp ? CMP_MATCH : 0;
156}
157
158
159/*! \brief A payload wrapper around a corosync ping event */
161 /*! The corosync ping event being passed over \ref stasis */
162 struct ast_event *event;
163};
164
165/*! \brief Destructor for the \ref corosync_ping_payload wrapper object */
166static void corosync_ping_payload_dtor(void *obj)
167{
168 struct corosync_ping_payload *payload = obj;
169
170 ast_free(payload->event);
171}
172
173/*! \brief Convert a Corosync PING to a \ref ast_event */
175{
176 struct corosync_ping_payload *payload;
177 struct ast_event *event;
178 struct ast_eid *event_eid;
179
180 if (!message) {
181 return NULL;
182 }
183
184 payload = stasis_message_data(message);
185
186 if (!payload->event) {
187 return NULL;
188 }
189
190 event_eid = (struct ast_eid *)ast_event_get_ie_raw(payload->event, AST_EVENT_IE_EID);
191
193 AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW, event_eid, sizeof(*event_eid),
195
196 return event;
197}
198
199STASIS_MESSAGE_TYPE_DEFN_LOCAL(corosync_ping_message_type,
200 .to_event = corosync_ping_to_event, );
201
202/*! \brief Publish a Corosync ping to \ref stasis */
204{
205 struct corosync_ping_payload *payload;
206 struct stasis_message *message;
207 struct ast_eid *event_eid;
208
211
212 if (!corosync_ping_message_type()) {
213 return;
214 }
215
216 payload = ao2_t_alloc(sizeof(*payload), corosync_ping_payload_dtor, "Create ping payload");
217 if (!payload) {
218 return;
219 }
220 event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
222 AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW, event_eid, sizeof(*event_eid),
224
225 message = stasis_message_create(corosync_ping_message_type(), payload);
226 if (!message) {
227 ao2_t_ref(payload, -1, "Destroy payload on off nominal");
228 return;
229 }
230
232
233 ao2_t_ref(payload, -1, "Hand ref to stasis");
234 ao2_t_ref(message, -1, "Hand ref to stasis");
235}
236
237static struct {
238 const char *name;
239 struct stasis_forward *sub;
240 unsigned char publish;
241 unsigned char publish_default;
242 unsigned char subscribe;
243 unsigned char subscribe_default;
244 struct stasis_topic *(* topic_fn)(void);
245 struct stasis_cache *(* cache_fn)(void);
246 struct stasis_message_type *(* message_type_fn)(void);
247 void (* publish_to_stasis)(struct ast_event *);
248} event_types[] = {
249 [AST_EVENT_MWI] = { .name = "mwi",
250 .topic_fn = ast_mwi_topic_all,
251 .cache_fn = ast_mwi_state_cache,
252 .message_type_fn = ast_mwi_state_type,
253 .publish_to_stasis = publish_mwi_to_stasis, },
254 [AST_EVENT_DEVICE_STATE_CHANGE] = { .name = "device_state",
255 .topic_fn = ast_device_state_topic_all,
256 .cache_fn = ast_device_state_cache,
257 .message_type_fn = ast_device_state_message_type,
258 .publish_to_stasis = publish_device_state_to_stasis, },
259 [AST_EVENT_PING] = { .name = "ping",
260 .publish_default = 1,
261 .subscribe_default = 1,
262 .topic_fn = corosync_topic,
263 .message_type_fn = corosync_ping_message_type,
264 .publish_to_stasis = publish_corosync_ping_to_stasis, },
265 [AST_EVENT_CLUSTER_DISCOVERY] = { .name = "cluster_discovery",
266 .publish_default = 1,
267 .subscribe_default = 1,
268 .topic_fn = ast_system_topic,
269 .message_type_fn = ast_cluster_discovery_type,
270 .publish_to_stasis = publish_cluster_discovery_to_stasis, },
271};
272
273static struct {
274 pthread_t id;
275 int alert_pipe[2];
276 unsigned int stop:1;
277} dispatch_thread = {
279 .alert_pipe = { -1, -1 },
280};
281
282static cpg_handle_t cpg_handle;
283static corosync_cfg_handle_t cfg_handle;
284
285#ifdef HAVE_COROSYNC_CFG_STATE_TRACK
286static void cfg_state_track_cb(
287 corosync_cfg_state_notification_buffer_t *notification_buffer,
288 cs_error_t error);
289#endif /* HAVE_COROSYNC_CFG_STATE_TRACK */
290
291static void cfg_shutdown_cb(corosync_cfg_handle_t cfg_handle,
292 corosync_cfg_shutdown_flags_t flags);
293
294static corosync_cfg_callbacks_t cfg_callbacks = {
295#ifdef HAVE_COROSYNC_CFG_STATE_TRACK
296 .corosync_cfg_state_track_callback = cfg_state_track_cb,
297#endif /* HAVE_COROSYNC_CFG_STATE_TRACK */
298 .corosync_cfg_shutdown_callback = cfg_shutdown_cb,
299};
300
301/*! \brief Publish cluster discovery to \ref stasis */
302static void publish_cluster_discovery_to_stasis_full(struct corosync_node *node, int joined)
303{
304 struct ast_json *json;
305 struct ast_json_payload *payload;
306 struct stasis_message *message;
307 char eid[18];
308 const char *addr;
309
310 ast_eid_to_str(eid, sizeof(eid), &node->eid);
311 addr = ast_sockaddr_stringify_addr(&node->addr);
312
313 ast_log(AST_LOG_NOTICE, "Node %u (%s) at %s %s the cluster\n",
314 node->id,
315 eid,
316 addr,
317 joined ? "joined" : "left");
318
319 json = ast_json_pack("{s: s, s: i, s: s, s: i}",
320 "address", addr,
321 "node_id", node->id,
322 "eid", eid,
323 "joined", joined);
324 if (!json) {
325 return;
326 }
327
328 payload = ast_json_payload_create(json);
329 if (!payload) {
330 ast_json_unref(json);
331 return;
332 }
333
335 if (!message) {
336 ast_json_unref(json);
337 ao2_ref(payload, -1);
338 return;
339 }
340
342 ast_json_unref(json);
343 ao2_ref(payload, -1);
344 ao2_ref(message, -1);
345}
346
347static void send_cluster_notify(void);
348
349/*! \brief Publish a received cluster discovery \ref ast_event to \ref stasis */
351{
352 struct corosync_node *node;
354 struct ast_eid *event_eid;
355
357
358 event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
359 if (!event_eid || !ast_eid_cmp(&ast_eid_default, event_eid)) {
360 /* Don't feed events back in that originated locally. */
361 return;
362 }
363
366 if (node) {
367 /* We already know about this node */
369 ao2_ref(node, -1);
370 return;
371 }
372
374 if (!node) {
376 return;
377 }
380
382
383 ao2_ref(node, -1);
384
385 /*
386 * When we get news that someone else has joined, we need to let them
387 * know we exist as well.
388 */
390}
391
392/*! \brief Publish a received MWI \ref ast_event to \ref stasis */
393static void publish_mwi_to_stasis(struct ast_event *event)
394{
395 const char *mailbox;
396 const char *context;
397 unsigned int new_msgs;
398 unsigned int old_msgs;
399 struct ast_eid *event_eid;
400
402
407 event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
408
409 if (ast_strlen_zero(mailbox) || ast_strlen_zero(context)) {
410 return;
411 }
412
413 if (new_msgs > INT_MAX) {
414 new_msgs = INT_MAX;
415 }
416
417 if (old_msgs > INT_MAX) {
418 old_msgs = INT_MAX;
419 }
420
421 if (ast_publish_mwi_state_full(mailbox, context, (int)new_msgs,
422 (int)old_msgs, NULL, event_eid)) {
423 char eid[18];
424 ast_eid_to_str(eid, sizeof(eid), event_eid);
425 ast_log(LOG_WARNING, "Failed to publish MWI message for %s@%s from %s\n",
426 mailbox, context, eid);
427 }
428}
429
430/*! \brief Publish a received device state \ref ast_event to \ref stasis */
432{
433 const char *device;
435 unsigned int cachable;
436 struct ast_eid *event_eid;
437
439
443 event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
444
445 if (ast_strlen_zero(device)) {
446 return;
447 }
448
449 if (ast_publish_device_state_full(device, state, cachable, event_eid)) {
450 char eid[18];
451 ast_eid_to_str(eid, sizeof(eid), event_eid);
452 ast_log(LOG_WARNING, "Failed to publish device state message for %s from %s\n",
453 device, eid);
454 }
455}
456
457static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_name,
458 uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len);
459
460static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name,
461 const struct cpg_address *member_list, size_t member_list_entries,
462 const struct cpg_address *left_list, size_t left_list_entries,
463 const struct cpg_address *joined_list, size_t joined_list_entries);
464
465static cpg_callbacks_t cpg_callbacks = {
466 .cpg_deliver_fn = cpg_deliver_cb,
467 .cpg_confchg_fn = cpg_confchg_cb,
468};
469
470#ifdef HAVE_COROSYNC_CFG_STATE_TRACK
471static void cfg_state_track_cb(
472 corosync_cfg_state_notification_buffer_t *notification_buffer,
473 cs_error_t error)
474{
475}
476#endif /* HAVE_COROSYNC_CFG_STATE_TRACK */
477
478static void cfg_shutdown_cb(corosync_cfg_handle_t cfg_handle,
479 corosync_cfg_shutdown_flags_t flags)
480{
481}
482
483static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_name,
484 uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
485{
486 struct ast_event *event;
487 void (*publish_handler)(struct ast_event *) = NULL;
488 enum ast_event_type event_type;
489 struct ast_eid *event_eid;
490
491 if (msg_len < ast_event_minimum_length()) {
492 ast_debug(1, "Ignoring event that's too small. %u < %u\n",
493 (unsigned int) msg_len,
494 (unsigned int) ast_event_minimum_length());
495 return;
496 }
497
498 event_eid = (struct ast_eid *)ast_event_get_ie_raw(msg, AST_EVENT_IE_EID);
499 if (!event_eid || !ast_eid_cmp(&ast_eid_default, event_eid)) {
500 /* Don't feed events back in that originated locally. */
501 return;
502 }
503
504 event_type = ast_event_get_type(msg);
505 if (event_type > AST_EVENT_TOTAL) {
506 /* Egads, we don't support this */
507 return;
508 }
509
511 ast_debug(5, "cpg_deliver_cb rdlock\n");
512 publish_handler = event_types[event_type].publish_to_stasis;
513 if (!event_types[event_type].subscribe || !publish_handler) {
514 /* We are not configured to subscribe to these events or
515 we have no way to publish it internally. */
517 ast_debug(5, "cpg_deliver_cb unlock\n");
518 return;
519 }
521 ast_debug(5, "cpg_deliver_cb unlock\n");
522
523 if (!(event = ast_malloc(msg_len))) {
524 return;
525 }
526
527 memcpy(event, msg, msg_len);
528
529 if (event_type == AST_EVENT_PING) {
530 const struct ast_eid *eid;
531 char buf[128] = "";
532
534 ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid);
535 ast_log(LOG_NOTICE, "Got event PING from server with EID: '%s'\n", buf);
536 }
537 ast_debug(5, "Publishing event %s (%u) to stasis\n",
538 ast_event_get_type_name(event), event_type);
539 publish_handler(event);
541}
542
543static void publish_event_to_corosync(struct ast_event *event)
544{
545 cs_error_t cs_err;
546 struct iovec iov;
547
548 iov.iov_base = (void *)event;
549 iov.iov_len = ast_event_get_size(event);
550
551 ast_debug(5, "Publishing event %s (%u) to corosync\n",
553
554 /* The stasis subscription will only exist if we are configured to publish
555 * these events, so just send away. */
557 ast_debug(5, "publish_event_to_corosync rdlock\n");
558 if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) {
559 ast_log(LOG_WARNING, "CPG mcast failed (%u) for event %s (%u)\n",
561 }
563 ast_debug(5, "publish_event_to_corosync unlock\n");
564 } else {
565 ast_log(LOG_WARNING, "CPG mcast not executed for event %s (%u): initializing CPG.\n",
567 }
568}
569
570static void publish_to_corosync(struct stasis_message *message)
571{
572 struct ast_event *event;
573 struct ast_eid *event_eid;
574
576 if (!event) {
577 return;
578 }
579
580 event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
581 if (!event_eid || ast_eid_cmp(&ast_eid_default, event_eid)) {
582 /* If the event didn't originate from this server, don't send it back out. */
584 return;
585 }
586
588 const struct ast_eid *eid;
589 char buf[128] = "";
590
592 ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid);
593 ast_log(LOG_NOTICE, "Sending event PING from this server with EID: '%s'\n", buf);
594 }
595
598}
599
600static void stasis_message_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
601{
602 if (!message) {
603 return;
604 }
605
607}
608
609static int dump_cache_cb(void *obj, void *arg, int flags)
610{
611 struct stasis_message *message = obj;
612
613 if (!message) {
614 return 0;
615 }
616
618
619 return 0;
620}
621
622static int clear_node_cache(void *obj, void *arg, int flags)
623{
624 struct stasis_message *cached_msg = obj;
625 struct stasis_topic *topic = arg;
626 struct stasis_message *msg;
627 struct ast_eid *msg_eid;
628
629 if (!cached_msg) {
630 return 0;
631 }
632
633 msg_eid = (struct ast_eid *)stasis_message_eid(cached_msg);
634 if(msg_eid && ast_eid_cmp(&ast_eid_default, msg_eid))
635 {
636 msg = stasis_cache_clear_create(cached_msg);
637 if (msg) {
638 stasis_publish(topic, msg);
639 ao2_cleanup(msg);
640 }
641 }
642
643 return 0;
644}
645
646static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name,
647 const struct cpg_address *member_list, size_t member_list_entries,
648 const struct cpg_address *left_list, size_t left_list_entries,
649 const struct cpg_address *joined_list, size_t joined_list_entries)
650{
651 unsigned int i;
652
653
654 for (i = 0; i < left_list_entries; i++) {
655 const struct cpg_address *cpg_node = &left_list[i];
656 struct corosync_node* node;
657 unsigned int j;
658
659 node = ao2_find(nodes, &cpg_node->nodeid, OBJ_UNLINK | OBJ_SEARCH_KEY);
660 if (!node) {
661 continue;
662 }
663
664 for (j = 0; j < ARRAY_LEN(event_types); j++) {
665 struct ao2_container *messages;
666 int messages_count;
667
669 ast_debug(5, "cpg_confchg_cb rdlock\n");
670 if (!event_types[j].subscribe) {
672 ast_debug(5, "cpg_confchg_cb unlock\n");
673 continue;
674 }
675
678 ast_debug(5, "cpg_confchg_cb unlock\n");
679 continue;
680 }
682 ast_debug(5, "cpg_confchg_cb unlock\n");
683
685
686 messages_count = ao2_container_count(messages);
687 ast_log(LOG_NOTICE, "Clearing %i events of type %s of node %i from stasis cache.\n", messages_count, event_types[j].name, node->id);
689 ast_log(LOG_NOTICE, "Cleared events of type %s from stasis cache.\n", event_types[j].name);
690
691 ao2_t_ref(messages, -1, "Dispose of flushed cache");
692 }
693
695 ao2_ref(node, -1);
696 }
697
698 /* If any new nodes have joined, dump our cache of events we are publishing
699 * that originated from this server. */
700 if (!joined_list_entries) {
701 return;
702 }
703
704 for (i = 0; i < ARRAY_LEN(event_types); i++) {
705 struct ao2_container *messages;
706 int messages_count;
707
709 ast_debug(5, "cpg_confchg_cb rdlock\n");
710 if (!event_types[i].publish) {
712 ast_debug(5, "cpg_confchg_cb unlock\n");
713 continue;
714 }
715
718 ast_debug(5, "cpg_confchg_cb unlock\n");
719 continue;
720 }
722 ast_debug(5, "cpg_confchg_cb unlock\n");
723
725
726 messages_count = ao2_container_count(messages);
727 ast_log(LOG_NOTICE, "Sending %i events of type %s to corosync.\n", messages_count, event_types[i].name);
729 ast_log(LOG_NOTICE, "Sent events of type %s to corosync.\n", event_types[i].name);
730
731 ao2_t_ref(messages, -1, "Dispose of dumped cache");
732 }
733}
734
735/*! \brief Informs the cluster of our EID and our IP addresses */
736static void send_cluster_notify(void)
737{
738 struct ast_event *event;
739 unsigned int node_id;
740 cs_error_t cs_err;
741 corosync_cfg_node_address_t corosync_addr;
742 int num_addrs = 0;
743 struct sockaddr *sa;
744 size_t sa_len;
745 char buf[128];
746 int res;
747
749 ast_debug(5, "send_cluster_notify rdlock\n");
750
751 if ((cs_err = corosync_cfg_local_get(cfg_handle, &node_id)) != CS_OK) {
752 ast_log(LOG_WARNING, "Failed to extract Corosync node ID for this node. Not informing cluster of existance.\n");
753 return;
754 }
755
756 if (((cs_err = corosync_cfg_get_node_addrs(cfg_handle, node_id, 1, &num_addrs, &corosync_addr)) != CS_OK) || (num_addrs < 1)) {
757 ast_log(LOG_WARNING, "Failed to get local Corosync address. Not informing cluster of existance.\n");
758 return;
759 }
760
762 ast_debug(5, "send_cluster_notify unlock\n");
763 }
764
765 sa = (struct sockaddr *)corosync_addr.address;
766 sa_len = (size_t)corosync_addr.address_length;
767 if ((res = getnameinfo(sa, sa_len, buf, sizeof(buf), NULL, 0, NI_NUMERICHOST))) {
768 ast_log(LOG_WARNING, "Failed to determine name of local Corosync address: %s (%d). Not informing cluster of existance.\n",
769 gai_strerror(res), res);
770 return;
771 }
772
779}
780
781static void *dispatch_thread_handler(void *data)
782{
783 cs_error_t cs_err;
784 struct pollfd pfd[3] = {
785 { .events = POLLIN, },
786 { .events = POLLIN, },
787 { .events = POLLIN, },
788 };
789
791 ast_debug(5, "dispatch_thread_handler rdlock\n");
792 if ((cs_err = cpg_fd_get(cpg_handle, &pfd[0].fd)) != CS_OK) {
793 ast_log(LOG_ERROR, "Failed to get CPG fd. This module is now broken.\n");
795 ast_debug(5, "dispatch_thread_handler unlock\n");
796 return NULL;
797 }
798
799 if ((cs_err = corosync_cfg_fd_get(cfg_handle, &pfd[1].fd)) != CS_OK) {
800 ast_log(LOG_ERROR, "Failed to get CFG fd. This module is now broken.\n");
802 ast_debug(5, "dispatch_thread_handler unlock\n");
803 return NULL;
804 }
805
806 pfd[2].fd = dispatch_thread.alert_pipe[0];
808 ast_debug(5, "dispatch_thread_handler unlock\n");
809 } else {
810 ast_log(LOG_ERROR, "Failed to get fd: initializing CPG. This module is now broken.\n");
811 return NULL;
812 }
814 while (!dispatch_thread.stop) {
815 int res;
816
817 cs_err = CS_OK;
818
819 pfd[0].revents = 0;
820 pfd[1].revents = 0;
821 pfd[2].revents = 0;
822
823 res = ast_poll(pfd, ARRAY_LEN(pfd), COROSYNC_POLL_TIMEOUT);
824 if (res == -1 && errno != EINTR && errno != EAGAIN) {
825 ast_log(LOG_ERROR, "poll() error: %s (%d)\n", strerror(errno), errno);
826 cs_err = CS_ERR_BAD_HANDLE;
827 } else if (res == 0) {
828 unsigned int local_nodeid;
829
831 ast_debug(5, "dispatch_thread_handler rdlock\n");
832 if ((cs_err = cpg_local_get(cpg_handle, &local_nodeid)) == CS_OK) {
833 struct cpg_name name;
834 struct cpg_address address[CPG_MEMBERS_MAX];
835 int entries = CPG_MEMBERS_MAX;
836
837 ast_copy_string(name.value, "asterisk", sizeof(name.value));
838 name.length = strlen(name.value);
839 if ((cs_err = cpg_membership_get(cpg_handle, &name, address, &entries)) == CS_OK) {
840 int i;
841 int found = 0;
842
843 ast_debug(1, "CPG group has %i node membership\n", entries);
844 for (i = 0; (i < entries) && !found; i++) {
845 if (address[i].nodeid == local_nodeid)
846 found = 1;
847 }
848 if (!found) {
849 ast_log(LOG_WARNING, "Failed to check CPG node membership\n");
851 cs_err = CS_ERR_BAD_HANDLE;
852 }
853 } else {
854 ast_log(LOG_WARNING, "Failed to get CPG node membership: %u\n", cs_err);
856 cs_err = CS_ERR_BAD_HANDLE;
857 }
858 } else {
859 ast_log(LOG_WARNING, "Failed to get CPG local node id: %u\n", cs_err);
861 cs_err = CS_ERR_BAD_HANDLE;
862 }
864 ast_debug(5, "dispatch_thread_handler unlock\n");
865 } else {
866 ast_log(LOG_WARNING, "Failed to check CPG node membership: initializing CPG.\n");
868 cs_err = CS_ERR_BAD_HANDLE;
869 }
870 } else {
872 ast_debug(5, "dispatch_thread_handler rdlock\n");
873 if (pfd[0].revents & POLLIN) {
874 if ((cs_err = cpg_dispatch(cpg_handle, CS_DISPATCH_ALL)) != CS_OK) {
875 ast_log(LOG_WARNING, "Failed CPG dispatch: %u\n", cs_err);
876 }
877 }
878
879 if (pfd[1].revents & POLLIN) {
880 if ((cs_err = corosync_cfg_dispatch(cfg_handle, CS_DISPATCH_ALL)) != CS_OK) {
881 ast_log(LOG_WARNING, "Failed CFG dispatch: %u\n", cs_err);
882 }
883 }
885 ast_debug(5, "dispatch_thread_handler unlock\n");
886 } else {
887 ast_log(LOG_WARNING, "Failed to dispatch: initializing CPG.\n");
888 }
889 }
890 if (cs_err == CS_ERR_LIBRARY || cs_err == CS_ERR_BAD_HANDLE) {
891
892 /* If corosync gets restarted out from under Asterisk, try to recover. */
893
894 ast_log(LOG_NOTICE, "Attempting to recover from corosync failure.\n");
895
897 struct cpg_name name;
898 ast_debug(5, "dispatch_thread_handler wrlock\n");
899
901 if (cpg_handle && (cs_err = cpg_finalize(cpg_handle)) != CS_OK) {
902 ast_log(LOG_ERROR, "Failed to finalize cpg (%d)\n", (int) cs_err);
903 }
904
905 if (cfg_handle && (cs_err = corosync_cfg_finalize(cfg_handle)) != CS_OK) {
906 ast_log(LOG_ERROR, "Failed to finalize cfg (%d)\n", (int) cs_err);
907 }
908
909 if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) {
910 ast_log(LOG_ERROR, "Failed to initialize cfg (%d)\n", (int) cs_err);
912 ast_debug(5, "dispatch_thread_handler unlock\n");
913 sleep(5);
914 continue;
915 }
916
917 if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks) != CS_OK)) {
918 ast_log(LOG_ERROR, "Failed to initialize cpg (%d)\n", (int) cs_err);
920 ast_debug(5, "dispatch_thread_handler unlock\n");
921 sleep(5);
922 continue;
923 }
924
925 if ((cs_err = cpg_fd_get(cpg_handle, &pfd[0].fd)) != CS_OK) {
926 ast_log(LOG_ERROR, "Failed to get CPG fd.\n");
928 ast_debug(5, "dispatch_thread_handler unlock\n");
929 sleep(5);
930 continue;
931 }
932
933 if ((cs_err = corosync_cfg_fd_get(cfg_handle, &pfd[1].fd)) != CS_OK) {
934 ast_log(LOG_ERROR, "Failed to get CFG fd.\n");
936 ast_debug(5, "dispatch_thread_handler unlock\n");
937 sleep(5);
938 continue;
939 }
940
941 ast_copy_string(name.value, "asterisk", sizeof(name.value));
942 name.length = strlen(name.value);
943 if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) {
944 ast_log(LOG_ERROR, "Failed to join cpg (%d)\n", (int) cs_err);
946 ast_debug(5, "dispatch_thread_handler unlock\n");
947 sleep(5);
948 continue;
949 }
952 ast_debug(5, "dispatch_thread_handler unlock\n");
953 ast_log(LOG_NOTICE, "Corosync recovery complete.\n");
955 } else {
956 ast_log(LOG_NOTICE, "Failed to recover from corosync failure: initializing CPG.\n");
957 }
958 }
959 }
960
961 return NULL;
962}
963
964static char *corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
965{
966 cs_error_t cs_err;
967 cpg_iteration_handle_t cpg_iter;
968 struct cpg_iteration_description_t cpg_desc;
969 unsigned int i;
970
971 switch (cmd) {
972 case CLI_INIT:
973 e->command = "corosync show members";
974 e->usage =
975 "Usage: corosync show members\n"
976 " Show corosync cluster members\n";
977 return NULL;
978
979 case CLI_GENERATE:
980 return NULL; /* no completion */
981 }
982
983 if (a->argc != e->args) {
984 return CLI_SHOWUSAGE;
985 }
986
988 ast_debug(5, "corosync_show_members rdlock\n");
989 cs_err = cpg_iteration_initialize(cpg_handle, CPG_ITERATION_ALL, NULL, &cpg_iter);
990
991 if (cs_err != CS_OK) {
992 ast_cli(a->fd, "Failed to initialize CPG iterator: %u.\n", cs_err);
993 cpg_iteration_finalize(cpg_iter);
995 ast_debug(5, "corosync_show_members unlock\n");
996 return CLI_FAILURE;
997 }
998
999 ast_cli(a->fd, "\n"
1000 "=============================================================\n"
1001 "=== Cluster members =========================================\n"
1002 "=============================================================\n"
1003 "===\n");
1004
1005 for (i = 1, cs_err = cpg_iteration_next(cpg_iter, &cpg_desc);
1006 cs_err == CS_OK;
1007 cs_err = cpg_iteration_next(cpg_iter, &cpg_desc), i++) {
1008 #ifdef HAVE_COROSYNC_CFG_STATE_TRACK
1009 corosync_cfg_node_address_t addrs[8];
1010 int num_addrs = 0;
1011 unsigned int j;
1012 #endif
1013
1014 ast_cli(a->fd, "=== Node %u\n", i);
1015 ast_cli(a->fd, "=== --> Group: %s\n", cpg_desc.group.value);
1016
1017 #ifdef HAVE_COROSYNC_CFG_STATE_TRACK
1018 /*
1019 * Corosync 2.x cfg lib needs to allocate 1M on stack after calling
1020 * corosync_cfg_get_node_addrs. netconsole thread has allocated only 0.5M
1021 * resulting in crash.
1022 */
1023 cs_err = corosync_cfg_get_node_addrs(cfg_handle, cpg_desc.nodeid,
1024 ARRAY_LEN(addrs), &num_addrs, addrs);
1025 if (cs_err != CS_OK) {
1026 ast_log(LOG_WARNING, "Failed to get node addresses\n");
1027 continue;
1028 }
1029
1030 for (j = 0; j < num_addrs; j++) {
1031 struct sockaddr *sa = (struct sockaddr *) addrs[j].address;
1032 size_t sa_len = (size_t) addrs[j].address_length;
1033 char buf[128];
1034
1035 getnameinfo(sa, sa_len, buf, sizeof(buf), NULL, 0, NI_NUMERICHOST);
1036
1037 ast_cli(a->fd, "=== --> Address %u: %s\n", j + 1, buf);
1038 }
1039 #else
1040 ast_cli(a->fd, "=== --> Nodeid: %"PRIu32"\n", cpg_desc.nodeid);
1041 #endif
1042 }
1043
1044 ast_cli(a->fd, "===\n"
1045 "=============================================================\n"
1046 "\n");
1047
1048 cpg_iteration_finalize(cpg_iter);
1050 ast_debug(5, "corosync_show_members unlock\n");
1051 } else {
1052 ast_cli(a->fd, "Failed to initialize CPG iterator: initializing CPG.\n");
1053 }
1054
1055 return CLI_SUCCESS;
1056}
1057
1058static char *corosync_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
1059{
1060 struct ast_event *event;
1061
1062 switch (cmd) {
1063 case CLI_INIT:
1064 e->command = "corosync ping";
1065 e->usage =
1066 "Usage: corosync ping\n"
1067 " Send a test ping to the cluster.\n"
1068 "A NOTICE will be in the log for every ping received\n"
1069 "on a server.\n If you send a ping, you should see a NOTICE\n"
1070 "in the log for every server in the cluster.\n";
1071 return NULL;
1072
1073 case CLI_GENERATE:
1074 return NULL; /* no completion */
1075 }
1076
1077 if (a->argc != e->args) {
1078 return CLI_SHOWUSAGE;
1079 }
1080
1082
1083 if (!event) {
1084 return CLI_FAILURE;
1085 }
1086
1087 event_types[AST_EVENT_PING].publish_to_stasis(event);
1088
1090 return CLI_SUCCESS;
1091}
1092
1093static char *corosync_show_config(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
1094{
1095 unsigned int i;
1096
1097 switch (cmd) {
1098 case CLI_INIT:
1099 e->command = "corosync show config";
1100 e->usage =
1101 "Usage: corosync show config\n"
1102 " Show configuration loaded from res_corosync.conf\n";
1103 return NULL;
1104
1105 case CLI_GENERATE:
1106 return NULL; /* no completion */
1107 }
1108
1109 if (a->argc != e->args) {
1110 return CLI_SHOWUSAGE;
1111 }
1112
1113 ast_cli(a->fd, "\n"
1114 "=============================================================\n"
1115 "=== res_corosync config =====================================\n"
1116 "=============================================================\n"
1117 "===\n");
1118
1120 ast_debug(5, "corosync_show_config rdlock\n");
1121 for (i = 0; i < ARRAY_LEN(event_types); i++) {
1122 if (event_types[i].publish) {
1123 ast_cli(a->fd, "=== ==> Publishing Event Type: %s\n",
1124 event_types[i].name);
1125 }
1126 if (event_types[i].subscribe) {
1127 ast_cli(a->fd, "=== ==> Subscribing to Event Type: %s\n",
1128 event_types[i].name);
1129 }
1130 }
1132 ast_debug(5, "corosync_show_config unlock\n");
1133
1134 ast_cli(a->fd, "===\n"
1135 "=============================================================\n"
1136 "\n");
1137
1138 return CLI_SUCCESS;
1139}
1140
1141static struct ast_cli_entry corosync_cli[] = {
1142 AST_CLI_DEFINE(corosync_show_config, "Show configuration"),
1143 AST_CLI_DEFINE(corosync_show_members, "Show cluster members"),
1144 AST_CLI_DEFINE(corosync_ping, "Send a test ping to the cluster"),
1145};
1146
1147enum {
1148 PUBLISH,
1149 SUBSCRIBE,
1150};
1151
1152static int set_event(const char *event_type, int pubsub)
1153{
1154 unsigned int i;
1155
1156 for (i = 0; i < ARRAY_LEN(event_types); i++) {
1157 if (!event_types[i].name || strcasecmp(event_type, event_types[i].name)) {
1158 continue;
1159 }
1160
1161 switch (pubsub) {
1162 case PUBLISH:
1163 event_types[i].publish = 1;
1164 break;
1165 case SUBSCRIBE:
1166 event_types[i].subscribe = 1;
1167 break;
1168 }
1169
1170 break;
1171 }
1172
1173 return (i == ARRAY_LEN(event_types)) ? -1 : 0;
1174}
1175
1176static int load_general_config(struct ast_config *cfg)
1177{
1178 struct ast_variable *v;
1179 int res = 0;
1180 unsigned int i;
1181
1183 ast_debug(5, "load_general_config wrlock\n");
1184
1185 for (i = 0; i < ARRAY_LEN(event_types); i++) {
1186 event_types[i].publish = event_types[i].publish_default;
1187 event_types[i].subscribe = event_types[i].subscribe_default;
1188 }
1189
1190 for (v = ast_variable_browse(cfg, "general"); v && !res; v = v->next) {
1191 if (!strcasecmp(v->name, "publish_event")) {
1192 res = set_event(v->value, PUBLISH);
1193 } else if (!strcasecmp(v->name, "subscribe_event")) {
1194 res = set_event(v->value, SUBSCRIBE);
1195 } else {
1196 ast_log(LOG_WARNING, "Unknown option '%s'\n", v->name);
1197 }
1198 }
1199
1200 for (i = 0; i < ARRAY_LEN(event_types); i++) {
1201 if (event_types[i].publish && !event_types[i].sub) {
1203 corosync_topic());
1207 NULL);
1208 } else if (!event_types[i].publish && event_types[i].sub) {
1212 }
1213 }
1214
1216 ast_debug(5, "load_general_config unlock\n");
1217
1218 return res;
1219}
1220
1221static int load_config(unsigned int reload)
1222{
1223 static const char filename[] = "res_corosync.conf";
1224 struct ast_config *cfg;
1225 const char *cat = NULL;
1226 struct ast_flags config_flags = { 0 };
1227 int res = 0;
1228
1229 cfg = ast_config_load(filename, config_flags);
1230
1232 return -1;
1233 }
1234
1235 while ((cat = ast_category_browse(cfg, cat))) {
1236 if (!strcasecmp(cat, "general")) {
1237 res = load_general_config(cfg);
1238 } else {
1239 ast_log(LOG_WARNING, "Unknown configuration section '%s'\n", cat);
1240 }
1241 }
1242
1243 ast_config_destroy(cfg);
1244
1245 return res;
1246}
1247
1248static void cleanup_module(void)
1249{
1250 cs_error_t cs_err;
1251 unsigned int i;
1252
1253 if (stasis_router) {
1254
1255 /* Unsubscribe all topic forwards and cancel all message routes */
1256 for (i = 0; i < ARRAY_LEN(event_types); i++) {
1257 struct ao2_container *messages = NULL;
1258 int messages_count;
1259 unsigned char subscribe = 0;
1260
1262 ast_debug(5, "cleanup_module wrlock\n");
1263 subscribe = event_types[i].subscribe;
1264
1265 if (event_types[i].sub) {
1268 }
1269 event_types[i].publish = 0;
1270 event_types[i].subscribe = 0;
1272 ast_debug(5, "cleanup_module unlock\n");
1273
1276 messages_count = ao2_container_count(messages);
1277 ast_log(LOG_NOTICE, "Clearing %i events of type %s of other nodes from stasis cache.\n", messages_count, event_types[i].name);
1279 ast_log(LOG_NOTICE, "Cleared events of type %s from stasis cache.\n", event_types[i].name);
1280 ao2_t_ref(messages, -1, "Dispose of flushed cache");
1281 }
1282 }
1283
1286 }
1287
1289 ao2_t_ref(corosync_aggregate_topic, -1, "Dispose of topic on cleanup");
1291 }
1292
1293 STASIS_MESSAGE_TYPE_CLEANUP(corosync_ping_message_type);
1294
1296 char meepmeep = 'x';
1297 dispatch_thread.stop = 1;
1298 if (ast_carefulwrite(dispatch_thread.alert_pipe[1], &meepmeep, 1,
1299 5000) == -1) {
1300 ast_log(LOG_ERROR, "Failed to write to pipe: %s (%d)\n",
1301 strerror(errno), errno);
1302 }
1303 pthread_join(dispatch_thread.id, NULL);
1304 }
1305
1306 if (dispatch_thread.alert_pipe[0] != -1) {
1307 close(dispatch_thread.alert_pipe[0]);
1308 dispatch_thread.alert_pipe[0] = -1;
1309 }
1310
1311 if (dispatch_thread.alert_pipe[1] != -1) {
1312 close(dispatch_thread.alert_pipe[1]);
1313 dispatch_thread.alert_pipe[1] = -1;
1314 }
1315
1317 ast_debug(5, "cleanup_module wrlock\n");
1318 if (cpg_handle && (cs_err = cpg_finalize(cpg_handle)) != CS_OK) {
1319 ast_log(LOG_ERROR, "Failed to finalize cpg (%d)\n", (int) cs_err);
1320 }
1321 cpg_handle = 0;
1322
1323 if (cfg_handle && (cs_err = corosync_cfg_finalize(cfg_handle)) != CS_OK) {
1324 ast_log(LOG_ERROR, "Failed to finalize cfg (%d)\n", (int) cs_err);
1325 }
1326 cfg_handle = 0;
1329 ast_debug(5, "cleanup_module unlock\n");
1330 }
1332 nodes = NULL;
1333}
1334
1335static int load_module(void)
1336{
1337 cs_error_t cs_err;
1338 struct cpg_name name;
1339
1341 ast_log(LOG_ERROR, "Entity ID is not set.\n");
1343 }
1344
1347 if (!nodes) {
1348 goto failed;
1349 }
1350
1351 corosync_aggregate_topic = stasis_topic_create("corosync:aggregator");
1353 ast_log(AST_LOG_ERROR, "Failed to create stasis topic for corosync\n");
1354 goto failed;
1355 }
1356
1358 if (!stasis_router) {
1359 ast_log(AST_LOG_ERROR, "Failed to create message router for corosync topic\n");
1360 goto failed;
1361 }
1364
1365 if (STASIS_MESSAGE_TYPE_INIT(corosync_ping_message_type) != 0) {
1366 ast_log(AST_LOG_ERROR, "Failed to initialize corosync ping message type\n");
1367 goto failed;
1368 }
1369
1370 if (load_config(0)) {
1371 /* simply not configured is not a fatal error */
1372 goto failed;
1373 }
1374
1377 ast_debug(5, "load_module wrlock\n");
1378 if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) {
1379 ast_log(LOG_ERROR, "Failed to initialize cfg: (%d)\n", (int) cs_err);
1381 ast_debug(5, "load_module unlock\n");
1382 goto failed;
1383 }
1384
1385 if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks)) != CS_OK) {
1386 ast_log(LOG_ERROR, "Failed to initialize cpg: (%d)\n", (int) cs_err);
1388 ast_debug(5, "load_module unlock\n");
1389 goto failed;
1390 }
1391
1392 ast_copy_string(name.value, "asterisk", sizeof(name.value));
1393 name.length = strlen(name.value);
1394
1395 if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) {
1396 ast_log(LOG_ERROR, "Failed to join: (%d)\n", (int) cs_err);
1398 ast_debug(5, "load_module unlock\n");
1399 goto failed;
1400 }
1401
1402 if (pipe(dispatch_thread.alert_pipe) == -1) {
1403 ast_log(LOG_ERROR, "Failed to create alert pipe: %s (%d)\n",
1404 strerror(errno), errno);
1406 ast_debug(5, "load_module unlock\n");
1407 goto failed;
1408 }
1410
1412 ast_debug(5, "load_module unlock\n");
1415 ast_log(LOG_ERROR, "Error starting CPG dispatch thread.\n");
1416 goto failed;
1417 }
1418
1420 } else {
1421 goto failed;
1422 }
1423
1425
1426failed:
1428
1430}
1431
1432static int unload_module(void)
1433{
1435
1437
1438 return 0;
1439}
1440
void ast_cli_unregister_multiple(void)
Definition ael_main.c:408
static int load_config(void)
enum queue_result id
Definition app_queue.c:1767
unsigned int stop
Definition app_sla.c:342
#define ast_free(a)
Definition astmm.h:180
#define ast_malloc(len)
A wrapper for malloc()
Definition astmm.h:191
#define ast_log
Definition astobj2.c:42
#define ao2_t_ref(o, delta, tag)
Definition astobj2.h:460
@ CMP_MATCH
Definition astobj2.h:1027
@ AO2_ALLOC_OPT_LOCK_NOLOCK
Definition astobj2.h:367
@ AO2_ALLOC_OPT_LOCK_MUTEX
Definition astobj2.h:363
#define ao2_callback(c, flags, cb_fn, arg)
ao2_callback() is a generic function that applies cb_fn() to all objects in a container,...
Definition astobj2.h:1693
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
#define ao2_cleanup(obj)
Definition astobj2.h:1934
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
Definition astobj2.h:1554
#define ao2_find(container, arg, flags)
Definition astobj2.h:1736
#define ao2_unlock(a)
Definition astobj2.h:729
#define ao2_lock(a)
Definition astobj2.h:717
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition astobj2.h:459
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition astobj2.h:404
#define ao2_t_alloc(data_size, destructor_fn, debug_msg)
Definition astobj2.h:407
@ OBJ_SEARCH_PARTIAL_KEY
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
Definition astobj2.h:1116
@ OBJ_SEARCH_OBJECT
The arg parameter is an object of the same type.
Definition astobj2.h:1087
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
Definition astobj2.h:1063
@ OBJ_NODATA
Definition astobj2.h:1044
@ OBJ_SEARCH_MASK
Search option field mask.
Definition astobj2.h:1072
@ OBJ_UNLINK
Definition astobj2.h:1039
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
Definition astobj2.h:1101
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition astobj2.h:1303
#define CLI_SHOWUSAGE
Definition cli.h:45
#define CLI_SUCCESS
Definition cli.h:44
#define AST_CLI_DEFINE(fn, txt,...)
Definition cli.h:197
void ast_cli(int fd, const char *fmt,...)
Definition clicompat.c:6
@ CLI_INIT
Definition cli.h:152
@ CLI_GENERATE
Definition cli.h:153
#define CLI_FAILURE
Definition cli.h:46
#define ast_cli_register_multiple(e, len)
Register multiple commands.
Definition cli.h:265
struct stasis_message_type * ast_device_state_message_type(void)
Get the Stasis message type for device state messages.
struct stasis_cache * ast_device_state_cache(void)
Backend cache for ast_device_state_topic_cached()
struct stasis_topic * ast_device_state_topic_all(void)
Get the Stasis topic for device state messages.
ast_device_state
Device States.
Definition devicestate.h:52
int ast_publish_device_state_full(const char *device, enum ast_device_state state, enum ast_devstate_cache cachable, struct ast_eid *eid)
Publish a device state update with EID.
char buf[BUFSIZE]
Definition eagi_proxy.c:66
size_t ast_event_minimum_length(void)
Get the minimum length of an ast_event.
Definition event.c:530
struct ast_event * ast_event_new(enum ast_event_type event_type,...)
Create a new event.
Definition event.c:403
size_t ast_event_get_size(const struct ast_event *event)
Get the size of an event.
Definition event.c:229
uint32_t ast_event_get_ie_uint(const struct ast_event *event, enum ast_event_ie_type ie_type)
Get the value of an information element that has an integer payload.
Definition event.c:294
void ast_event_destroy(struct ast_event *event)
Destroy an event.
Definition event.c:525
const char * ast_event_get_ie_str(const struct ast_event *event, enum ast_event_ie_type ie_type)
Get the value of an information element that has a string payload.
Definition event.c:303
const void * ast_event_get_ie_raw(const struct ast_event *event, enum ast_event_ie_type ie_type)
Get the value of an information element that has a raw payload.
Definition event.c:312
const char * ast_event_get_type_name(const struct ast_event *event)
Get the string representation of the type of the given event.
Definition event.c:195
enum ast_event_type ast_event_get_type(const struct ast_event *event)
Get the type for an event.
Definition event.c:289
@ AST_EVENT_IE_END
Definition event_defs.h:70
@ AST_EVENT_IE_STATE
Generic State IE Used by AST_EVENT_DEVICE_STATE_CHANGE Payload type: UINT The actual state values dep...
Definition event_defs.h:121
@ AST_EVENT_IE_CONTEXT
Context IE Used by AST_EVENT_MWI Payload type: str.
Definition event_defs.h:127
@ AST_EVENT_IE_LOCAL_ADDR
Definition event_defs.h:281
@ AST_EVENT_IE_DEVICE
Device Name Used by AST_EVENT_DEVICE_STATE_CHANGE Payload type: STR.
Definition event_defs.h:113
@ AST_EVENT_IE_EID
Entity ID Used by All events Payload type: RAW This IE indicates which server the event originated fr...
Definition event_defs.h:272
@ AST_EVENT_IE_MAILBOX
Mailbox name.
Definition event_defs.h:89
@ AST_EVENT_IE_CACHABLE
Event non-cacheability flag Used by: All events Payload type: UINT.
Definition event_defs.h:306
@ AST_EVENT_IE_NODE_ID
Cluster node ID Used by: Corosync Payload type: UINT.
Definition event_defs.h:313
@ AST_EVENT_IE_OLDMSGS
Number of Used by: AST_EVENT_MWI Payload type: UINT.
Definition event_defs.h:83
@ AST_EVENT_IE_NEWMSGS
Number of new messages Used by: AST_EVENT_MWI Payload type: UINT.
Definition event_defs.h:77
ast_event_type
Definition event_defs.h:28
@ AST_EVENT_TOTAL
Definition event_defs.h:64
@ AST_EVENT_PING
Definition event_defs.h:60
@ AST_EVENT_DEVICE_STATE_CHANGE
Definition event_defs.h:48
@ AST_EVENT_MWI
Definition event_defs.h:38
@ AST_EVENT_CLUSTER_DISCOVERY
Definition event_defs.h:62
@ AST_EVENT_IE_PLTYPE_RAW
Definition event_defs.h:337
@ AST_EVENT_IE_PLTYPE_UINT
Definition event_defs.h:333
@ AST_EVENT_IE_PLTYPE_STR
Definition event_defs.h:335
char * address
Definition f2c.h:59
static const char name[]
Definition format_mp3.c:68
#define ast_config_load(filename, flags)
Load a config file.
char * ast_category_browse(struct ast_config *config, const char *prev_name)
Browse categories.
Definition extconf.c:3324
#define CONFIG_STATUS_FILEMISSING
#define CONFIG_STATUS_FILEINVALID
void ast_config_destroy(struct ast_config *cfg)
Destroys a config.
Definition extconf.c:1287
struct ast_variable * ast_variable_browse(const struct ast_config *config, const char *category_name)
Definition extconf.c:1213
#define AST_LOG_ERROR
#define ast_debug(level,...)
Log a DEBUG message.
#define LOG_ERROR
#define AST_LOG_NOTICE
#define LOG_NOTICE
#define LOG_WARNING
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition json.c:73
struct ast_json_payload * ast_json_payload_create(struct ast_json *json)
Create an ao2 object to pass json blobs as data payloads for stasis.
Definition json.c:756
struct ast_json * ast_json_pack(char const *format,...)
Helper for creating complex JSON values.
Definition json.c:612
#define ast_rwlock_wrlock(a)
Definition lock.h:243
#define AST_PTHREADT_NULL
Definition lock.h:73
#define ast_rwlock_rdlock(a)
Definition lock.h:242
#define ast_rwlock_trywrlock(a)
Definition lock.h:245
#define ast_rwlock_unlock(a)
Definition lock.h:241
#define ast_rwlock_tryrdlock(a)
Definition lock.h:244
int errno
#define ASTERISK_GPL_KEY
The text the key() function should return.
Definition module.h:46
#define AST_MODULE_INFO_STANDARD_EXTENDED(keystr, desc)
Definition module.h:589
@ AST_MODULE_LOAD_SUCCESS
Definition module.h:70
@ AST_MODULE_LOAD_DECLINE
Module has failed to load, may be in an inconsistent state.
Definition module.h:78
struct stasis_message_type * ast_mwi_state_type(void)
Get the Stasis Message Bus API message type for MWI messages.
struct stasis_cache * ast_mwi_state_cache(void)
Backend cache for ast_mwi_topic_cached().
Definition mwi.c:94
int ast_publish_mwi_state_full(const char *mailbox, const char *context, int new_msgs, int old_msgs, const char *channel_id, struct ast_eid *eid)
Publish a MWI state update via stasis with all parameters.
Definition mwi.c:393
struct stasis_topic * ast_mwi_topic_all(void)
Get the Stasis Message Bus API topic for MWI messages.
Definition mwi.c:89
int ast_sockaddr_parse(struct ast_sockaddr *addr, const char *str, int flags)
Parse an IPv4 or IPv6 address string.
Definition netsock2.c:230
static char * ast_sockaddr_stringify_addr(const struct ast_sockaddr *addr)
Wrapper around ast_sockaddr_stringify_fmt() to return an address only.
Definition netsock2.h:286
#define ast_poll(a, b, c)
Definition poll-compat.h:88
static struct stasis_subscription * sub
Statsd channel stats. Exmaple of how to subscribe to Stasis events.
static int reload(void)
static char * corosync_show_config(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_name, uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
static void publish_event_to_corosync(struct ast_event *event)
static struct ast_event * corosync_ping_to_event(struct stasis_message *message)
Convert a Corosync PING to a ast_event.
void(* publish_to_stasis)(struct ast_event *)
static void cfg_shutdown_cb(corosync_cfg_handle_t cfg_handle, corosync_cfg_shutdown_flags_t flags)
pthread_t id
static int clear_node_cache(void *obj, void *arg, int flags)
static cpg_callbacks_t cpg_callbacks
static void publish_to_corosync(struct stasis_message *message)
static void publish_cluster_discovery_to_stasis_full(struct corosync_node *node, int joined)
Publish cluster discovery to Stasis Message Bus API.
static void * dispatch_thread_handler(void *data)
#define COROSYNC_POLL_TIMEOUT
Timeout for Corosync's poll process.
static struct ao2_container * nodes
All the nodes that we're aware of.
static void publish_corosync_ping_to_stasis(struct ast_event *event)
Publish a Corosync ping to Stasis Message Bus API.
static struct corosync_node * corosync_node_alloc(struct ast_event *event)
struct stasis_message_type *(* message_type_fn)(void)
static struct ast_cli_entry corosync_cli[]
unsigned char publish_default
#define corosync_pthread_create_background(a, b, c, d)
Version of pthread_create to ensure stack is large enough.
int alert_pipe[2]
static void cleanup_module(void)
static int corosync_node_joined
Join to corosync.
static void corosync_ping_payload_dtor(void *obj)
Destructor for the corosync_ping_payload wrapper object.
static struct stasis_message_router * stasis_router
Our Stasis Message Bus API message router.
static corosync_cfg_callbacks_t cfg_callbacks
static void send_cluster_notify(void)
Informs the cluster of our EID and our IP addresses.
static int load_general_config(struct ast_config *cfg)
static void publish_device_state_to_stasis(struct ast_event *event)
Publish a received device state ast_event to Stasis Message Bus API.
unsigned char subscribe
static struct stasis_topic * corosync_topic(void)
Internal accessor for our topic.
static int corosync_node_hash_fn(const void *obj, const int flags)
struct stasis_cache *(* cache_fn)(void)
struct stasis_forward * sub
const char * name
unsigned char subscribe_default
struct stasis_topic *(* topic_fn)(void)
static char * corosync_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static struct stasis_topic * corosync_aggregate_topic
The internal topic used for message forwarding and pings.
static struct @468 dispatch_thread
unsigned char publish
static ast_rwlock_t event_types_lock
static void publish_cluster_discovery_to_stasis(struct ast_event *event)
Publish a received cluster discovery ast_event to Stasis Message Bus API.
static char * corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static int load_module(void)
static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
static struct @467 event_types[]
static corosync_cfg_handle_t cfg_handle
static int unload_module(void)
@ SUBSCRIBE
@ PUBLISH
static void stasis_message_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
static int set_event(const char *event_type, int pubsub)
static void publish_mwi_to_stasis(struct ast_event *event)
Publish a received MWI ast_event to Stasis Message Bus API.
static ast_rwlock_t init_cpg_lock
static cpg_handle_t cpg_handle
static int dump_cache_cb(void *obj, void *arg, int flags)
static int corosync_node_cmp_fn(void *obj, void *arg, int flags)
#define NULL
Definition resample.c:96
struct stasis_message * stasis_cache_clear_create(struct stasis_message *message)
A message which instructs the caching topic to remove an entry from its cache.
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
Definition stasis.h:1515
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
Definition stasis.c:1615
struct ao2_container * stasis_cache_dump_all(struct stasis_cache *cache, struct stasis_message_type *type)
Dump all entity items from the cache to a subscription.
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition stasis.c:684
struct ao2_container * stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
Dump cached items to a subscription for a specific entity.
#define STASIS_MESSAGE_TYPE_DEFN_LOCAL(name,...)
Boiler-plate messaging macro for defining local message types.
Definition stasis.h:1467
const struct ast_eid * stasis_message_eid(const struct stasis_message *msg)
Get the entity id for a stasis_message.
struct ast_event * stasis_message_to_event(struct stasis_message *msg)
Build the Generic event system representation of the message.
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
Definition stasis.h:1493
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition stasis.c:1645
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
Definition stasis.c:1578
int stasis_message_router_set_congestion_limits(struct stasis_message_router *router, long low_water, long high_water)
Set the high and low alert water marks of the stasis message router.
#define stasis_message_router_create(topic)
Create a new message router object.
void stasis_message_router_remove(struct stasis_message_router *router, struct stasis_message_type *message_type)
Remove a route from a message router.
int stasis_message_router_add(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
Add a route to a message router.
void stasis_message_router_unsubscribe_and_join(struct stasis_message_router *router)
Unsubscribe the router from the upstream topic, blocking until the final message has been processed.
struct stasis_message_type * ast_cluster_discovery_type(void)
A stasis_message_type for Cluster discovery.
struct stasis_topic * ast_system_topic(void)
A Stasis Message Bus API topic which publishes messages regarding system changes.
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition strings.h:65
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
Definition strings.h:425
Generic container type.
descriptor for a cli entry.
Definition cli.h:171
int args
This gets set in ast_cli_register()
Definition cli.h:185
char * command
Definition cli.h:186
const char * usage
Definition cli.h:177
An Entity ID is essentially a MAC address, brief and unique.
Definition utils.h:850
unsigned char eid[6]
Definition utils.h:851
An event.
Definition event.c:81
Structure used to handle boolean flags.
Definition utils.h:217
Abstract JSON element (object, array, string, int, ...).
Structure for variables, used for configurations and for channel variables.
struct ast_variable * next
A payload wrapper around a corosync ping event.
struct ast_event * event
Forwarding information.
Definition stasis.c:1598
struct ast_eid eid
char * name
Definition stasis.c:453
const char * name
#define AST_TASKPROCESSOR_HIGH_WATER_LEVEL
#define ast_assert(a)
Definition utils.h:776
int ast_carefulwrite(int fd, char *s, int len, int timeoutms)
Try to write string, but wait no more than ms milliseconds before timing out.
Definition utils.c:1807
int ast_eid_cmp(const struct ast_eid *eid1, const struct ast_eid *eid2)
Compare two EIDs.
Definition utils.c:3130
char * ast_eid_to_str(char *s, int maxlen, struct ast_eid *eid)
Convert an EID to a string.
Definition utils.c:2875
int ast_eid_is_empty(const struct ast_eid *eid)
Check if EID is empty.
Definition utils.c:3135
#define ARRAY_LEN(a)
Definition utils.h:703
struct ast_eid ast_eid_default
Global EID.
Definition options.c:94

Enumeration Type Documentation

◆ anonymous enum

anonymous enum
Enumerator
PUBLISH 
SUBSCRIBE 

Definition at line 1148 of file res_corosync.c.

1148 {
1149 PUBLISH,
1150 SUBSCRIBE,
1151};

Function Documentation

◆ AST_MODULE_INFO_STANDARD_EXTENDED()

AST_MODULE_INFO_STANDARD_EXTENDED ( ASTERISK_GPL_KEY  ,
"Corosync"   
)

◆ cfg_shutdown_cb()

static void cfg_shutdown_cb ( corosync_cfg_handle_t  cfg_handle,
corosync_cfg_shutdown_flags_t  flags 
)
static

Definition at line 479 of file res_corosync.c.

481{
482}

◆ cleanup_module()

static void cleanup_module ( void  )
static

Definition at line 1249 of file res_corosync.c.

1250{
1251 cs_error_t cs_err;
1252 unsigned int i;
1253
1254 if (stasis_router) {
1255
1256 /* Unsubscribe all topic forwards and cancel all message routes */
1257 for (i = 0; i < ARRAY_LEN(event_types); i++) {
1258 struct ao2_container *messages = NULL;
1259 int messages_count;
1260 unsigned char subscribe = 0;
1261
1263 ast_debug(5, "cleanup_module wrlock\n");
1264 subscribe = event_types[i].subscribe;
1265
1266 if (event_types[i].sub) {
1269 }
1270 event_types[i].publish = 0;
1271 event_types[i].subscribe = 0;
1273 ast_debug(5, "cleanup_module unlock\n");
1274
1277 messages_count = ao2_container_count(messages);
1278 ast_log(LOG_NOTICE, "Clearing %i events of type %s of other nodes from stasis cache.\n", messages_count, event_types[i].name);
1280 ast_log(LOG_NOTICE, "Cleared events of type %s from stasis cache.\n", event_types[i].name);
1281 ao2_t_ref(messages, -1, "Dispose of flushed cache");
1282 }
1283 }
1284
1287 }
1288
1290 ao2_t_ref(corosync_aggregate_topic, -1, "Dispose of topic on cleanup");
1292 }
1293
1294 STASIS_MESSAGE_TYPE_CLEANUP(corosync_ping_message_type);
1295
1297 char meepmeep = 'x';
1298 dispatch_thread.stop = 1;
1299 if (ast_carefulwrite(dispatch_thread.alert_pipe[1], &meepmeep, 1,
1300 5000) == -1) {
1301 ast_log(LOG_ERROR, "Failed to write to pipe: %s (%d)\n",
1302 strerror(errno), errno);
1303 }
1304 pthread_join(dispatch_thread.id, NULL);
1305 }
1306
1307 if (dispatch_thread.alert_pipe[0] != -1) {
1308 close(dispatch_thread.alert_pipe[0]);
1309 dispatch_thread.alert_pipe[0] = -1;
1310 }
1311
1312 if (dispatch_thread.alert_pipe[1] != -1) {
1313 close(dispatch_thread.alert_pipe[1]);
1314 dispatch_thread.alert_pipe[1] = -1;
1315 }
1316
1318 ast_debug(5, "cleanup_module wrlock\n");
1319 if (cpg_handle && (cs_err = cpg_finalize(cpg_handle)) != CS_OK) {
1320 ast_log(LOG_ERROR, "Failed to finalize cpg (%d)\n", (int) cs_err);
1321 }
1322 cpg_handle = 0;
1323
1324 if (cfg_handle && (cs_err = corosync_cfg_finalize(cfg_handle)) != CS_OK) {
1325 ast_log(LOG_ERROR, "Failed to finalize cfg (%d)\n", (int) cs_err);
1326 }
1327 cfg_handle = 0;
1330 ast_debug(5, "cleanup_module unlock\n");
1331 }
1333 nodes = NULL;
1334}

References ao2_callback, ao2_cleanup, ao2_container_count(), ao2_t_ref, ARRAY_LEN, ast_carefulwrite(), ast_debug, ast_log, AST_PTHREADT_NULL, ast_rwlock_trywrlock, ast_rwlock_unlock, ast_rwlock_wrlock, cache_fn, cfg_handle, clear_node_cache(), corosync_aggregate_topic, corosync_node_joined, cpg_handle, dispatch_thread, errno, event_types, event_types_lock, init_cpg_lock, LOG_ERROR, LOG_NOTICE, message_type_fn, name, nodes, NULL, OBJ_NODATA, stasis_cache_dump_all(), stasis_forward_cancel(), stasis_message_router_remove(), stasis_message_router_unsubscribe_and_join(), STASIS_MESSAGE_TYPE_CLEANUP, stasis_router, sub, subscribe, and topic_fn.

Referenced by load_module(), and unload_module().

◆ clear_node_cache()

static int clear_node_cache ( void *  obj,
void *  arg,
int  flags 
)
static

Definition at line 623 of file res_corosync.c.

624{
625 struct stasis_message *cached_msg = obj;
626 struct stasis_topic *topic = arg;
627 struct stasis_message *msg;
628 struct ast_eid *msg_eid;
629
630 if (!cached_msg) {
631 return 0;
632 }
633
634 msg_eid = (struct ast_eid *)stasis_message_eid(cached_msg);
635 if(msg_eid && ast_eid_cmp(&ast_eid_default, msg_eid))
636 {
637 msg = stasis_cache_clear_create(cached_msg);
638 if (msg) {
639 stasis_publish(topic, msg);
640 ao2_cleanup(msg);
641 }
642 }
643
644 return 0;
645}

References ao2_cleanup, ast_eid_cmp(), ast_eid_default, stasis_cache_clear_create(), stasis_message_eid(), and stasis_publish().

Referenced by cleanup_module(), and cpg_confchg_cb().

◆ corosync_node_alloc()

static struct corosync_node * corosync_node_alloc ( struct ast_event event)
static

◆ corosync_node_cmp_fn()

static int corosync_node_cmp_fn ( void *  obj,
void *  arg,
int  flags 
)
static

Definition at line 133 of file res_corosync.c.

134{
135 struct corosync_node *left = obj;
136 struct corosync_node *right = arg;
137 const int *id = arg;
138 int cmp;
139
140 switch (flags & OBJ_SEARCH_MASK) {
142 id = &right->id;
143 /* Fall through */
144 case OBJ_SEARCH_KEY:
145 cmp = (left->id == *id);
146 break;
148 cmp = (left->id == right->id);
149 break;
150 default:
151 /* Sort can only work on something with a full or partial key. */
152 ast_assert(0);
153 cmp = 1;
154 break;
155 }
156 return cmp ? CMP_MATCH : 0;
157}

References ast_assert, CMP_MATCH, corosync_node::id, id, OBJ_SEARCH_KEY, OBJ_SEARCH_MASK, OBJ_SEARCH_OBJECT, and OBJ_SEARCH_PARTIAL_KEY.

Referenced by load_module().

◆ corosync_node_hash_fn()

static int corosync_node_hash_fn ( const void *  obj,
const int  flags 
)
static

Definition at line 113 of file res_corosync.c.

114{
115 const struct corosync_node *node;
116 const int *id;
117
118 switch (flags & OBJ_SEARCH_MASK) {
119 case OBJ_SEARCH_KEY:
120 id = obj;
121 break;
123 node = obj;
124 id = &node->id;
125 break;
126 default:
127 ast_assert(0);
128 return 0;
129 }
130 return *id;
131}

References ast_assert, id, OBJ_SEARCH_KEY, OBJ_SEARCH_MASK, and OBJ_SEARCH_OBJECT.

Referenced by load_module().

◆ corosync_ping()

static char * corosync_ping ( struct ast_cli_entry e,
int  cmd,
struct ast_cli_args a 
)
static

Definition at line 1059 of file res_corosync.c.

1060{
1061 struct ast_event *event;
1062
1063 switch (cmd) {
1064 case CLI_INIT:
1065 e->command = "corosync ping";
1066 e->usage =
1067 "Usage: corosync ping\n"
1068 " Send a test ping to the cluster.\n"
1069 "A NOTICE will be in the log for every ping received\n"
1070 "on a server.\n If you send a ping, you should see a NOTICE\n"
1071 "in the log for every server in the cluster.\n";
1072 return NULL;
1073
1074 case CLI_GENERATE:
1075 return NULL; /* no completion */
1076 }
1077
1078 if (a->argc != e->args) {
1079 return CLI_SHOWUSAGE;
1080 }
1081
1083
1084 if (!event) {
1085 return CLI_FAILURE;
1086 }
1087
1088 event_types[AST_EVENT_PING].publish_to_stasis(event);
1089
1091 return CLI_SUCCESS;
1092}

References a, ast_cli_entry::args, ast_event_destroy(), AST_EVENT_IE_END, ast_event_new(), AST_EVENT_PING, CLI_FAILURE, CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, event_types, NULL, and ast_cli_entry::usage.

◆ corosync_ping_payload_dtor()

static void corosync_ping_payload_dtor ( void *  obj)
static

Destructor for the corosync_ping_payload wrapper object.

Definition at line 167 of file res_corosync.c.

168{
169 struct corosync_ping_payload *payload = obj;
170
171 ast_free(payload->event);
172}

References ast_free, and corosync_ping_payload::event.

Referenced by publish_corosync_ping_to_stasis().

◆ corosync_ping_to_event()

static struct ast_event * corosync_ping_to_event ( struct stasis_message message)
static

Convert a Corosync PING to a ast_event.

Definition at line 175 of file res_corosync.c.

176{
177 struct corosync_ping_payload *payload;
178 struct ast_event *event;
179 struct ast_eid *event_eid;
180
181 if (!message) {
182 return NULL;
183 }
184
185 payload = stasis_message_data(message);
186
187 if (!payload->event) {
188 return NULL;
189 }
190
191 event_eid = (struct ast_eid *)ast_event_get_ie_raw(payload->event, AST_EVENT_IE_EID);
192
194 AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW, event_eid, sizeof(*event_eid),
196
197 return event;
198}

References ast_event_get_ie_raw(), AST_EVENT_IE_EID, AST_EVENT_IE_END, AST_EVENT_IE_PLTYPE_RAW, ast_event_new(), AST_EVENT_PING, corosync_ping_payload::event, NULL, and stasis_message_data().

◆ corosync_show_config()

static char * corosync_show_config ( struct ast_cli_entry e,
int  cmd,
struct ast_cli_args a 
)
static

Definition at line 1094 of file res_corosync.c.

1095{
1096 unsigned int i;
1097
1098 switch (cmd) {
1099 case CLI_INIT:
1100 e->command = "corosync show config";
1101 e->usage =
1102 "Usage: corosync show config\n"
1103 " Show configuration loaded from res_corosync.conf\n";
1104 return NULL;
1105
1106 case CLI_GENERATE:
1107 return NULL; /* no completion */
1108 }
1109
1110 if (a->argc != e->args) {
1111 return CLI_SHOWUSAGE;
1112 }
1113
1114 ast_cli(a->fd, "\n"
1115 "=============================================================\n"
1116 "=== res_corosync config =====================================\n"
1117 "=============================================================\n"
1118 "===\n");
1119
1121 ast_debug(5, "corosync_show_config rdlock\n");
1122 for (i = 0; i < ARRAY_LEN(event_types); i++) {
1123 if (event_types[i].publish) {
1124 ast_cli(a->fd, "=== ==> Publishing Event Type: %s\n",
1125 event_types[i].name);
1126 }
1127 if (event_types[i].subscribe) {
1128 ast_cli(a->fd, "=== ==> Subscribing to Event Type: %s\n",
1129 event_types[i].name);
1130 }
1131 }
1133 ast_debug(5, "corosync_show_config unlock\n");
1134
1135 ast_cli(a->fd, "===\n"
1136 "=============================================================\n"
1137 "\n");
1138
1139 return CLI_SUCCESS;
1140}

References a, ast_cli_entry::args, ARRAY_LEN, ast_cli(), ast_debug, ast_rwlock_rdlock, ast_rwlock_unlock, CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, event_types, event_types_lock, test_val::name, NULL, publish, subscribe, and ast_cli_entry::usage.

◆ corosync_show_members()

static char * corosync_show_members ( struct ast_cli_entry e,
int  cmd,
struct ast_cli_args a 
)
static

Definition at line 965 of file res_corosync.c.

966{
967 cs_error_t cs_err;
968 cpg_iteration_handle_t cpg_iter;
969 struct cpg_iteration_description_t cpg_desc;
970 unsigned int i;
971
972 switch (cmd) {
973 case CLI_INIT:
974 e->command = "corosync show members";
975 e->usage =
976 "Usage: corosync show members\n"
977 " Show corosync cluster members\n";
978 return NULL;
979
980 case CLI_GENERATE:
981 return NULL; /* no completion */
982 }
983
984 if (a->argc != e->args) {
985 return CLI_SHOWUSAGE;
986 }
987
989 ast_debug(5, "corosync_show_members rdlock\n");
990 cs_err = cpg_iteration_initialize(cpg_handle, CPG_ITERATION_ALL, NULL, &cpg_iter);
991
992 if (cs_err != CS_OK) {
993 ast_cli(a->fd, "Failed to initialize CPG iterator: %u.\n", cs_err);
994 cpg_iteration_finalize(cpg_iter);
996 ast_debug(5, "corosync_show_members unlock\n");
997 return CLI_FAILURE;
998 }
999
1000 ast_cli(a->fd, "\n"
1001 "=============================================================\n"
1002 "=== Cluster members =========================================\n"
1003 "=============================================================\n"
1004 "===\n");
1005
1006 for (i = 1, cs_err = cpg_iteration_next(cpg_iter, &cpg_desc);
1007 cs_err == CS_OK;
1008 cs_err = cpg_iteration_next(cpg_iter, &cpg_desc), i++) {
1009 #ifdef HAVE_COROSYNC_CFG_STATE_TRACK
1010 corosync_cfg_node_address_t addrs[8];
1011 int num_addrs = 0;
1012 unsigned int j;
1013 #endif
1014
1015 ast_cli(a->fd, "=== Node %u\n", i);
1016 ast_cli(a->fd, "=== --> Group: %s\n", cpg_desc.group.value);
1017
1018 #ifdef HAVE_COROSYNC_CFG_STATE_TRACK
1019 /*
1020 * Corosync 2.x cfg lib needs to allocate 1M on stack after calling
1021 * corosync_cfg_get_node_addrs. netconsole thread has allocated only 0.5M
1022 * resulting in crash.
1023 */
1024 cs_err = corosync_cfg_get_node_addrs(cfg_handle, cpg_desc.nodeid,
1025 ARRAY_LEN(addrs), &num_addrs, addrs);
1026 if (cs_err != CS_OK) {
1027 ast_log(LOG_WARNING, "Failed to get node addresses\n");
1028 continue;
1029 }
1030
1031 for (j = 0; j < num_addrs; j++) {
1032 struct sockaddr *sa = (struct sockaddr *) addrs[j].address;
1033 size_t sa_len = (size_t) addrs[j].address_length;
1034 char buf[128];
1035
1036 getnameinfo(sa, sa_len, buf, sizeof(buf), NULL, 0, NI_NUMERICHOST);
1037
1038 ast_cli(a->fd, "=== --> Address %u: %s\n", j + 1, buf);
1039 }
1040 #else
1041 ast_cli(a->fd, "=== --> Nodeid: %"PRIu32"\n", cpg_desc.nodeid);
1042 #endif
1043 }
1044
1045 ast_cli(a->fd, "===\n"
1046 "=============================================================\n"
1047 "\n");
1048
1049 cpg_iteration_finalize(cpg_iter);
1051 ast_debug(5, "corosync_show_members unlock\n");
1052 } else {
1053 ast_cli(a->fd, "Failed to initialize CPG iterator: initializing CPG.\n");
1054 }
1055
1056 return CLI_SUCCESS;
1057}

References a, ast_cli_entry::args, ARRAY_LEN, ast_cli(), ast_debug, ast_log, ast_rwlock_tryrdlock, ast_rwlock_unlock, buf, cfg_handle, CLI_FAILURE, CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, cpg_handle, init_cpg_lock, LOG_WARNING, NULL, and ast_cli_entry::usage.

◆ corosync_topic()

static struct stasis_topic * corosync_topic ( void  )
static

Internal accessor for our topic.

Definition at line 74 of file res_corosync.c.

75{
77}

References corosync_aggregate_topic.

Referenced by load_general_config(), and publish_corosync_ping_to_stasis().

◆ cpg_confchg_cb()

static void cpg_confchg_cb ( cpg_handle_t  handle,
const struct cpg_name *  group_name,
const struct cpg_address *  member_list,
size_t  member_list_entries,
const struct cpg_address *  left_list,
size_t  left_list_entries,
const struct cpg_address *  joined_list,
size_t  joined_list_entries 
)
static

Definition at line 647 of file res_corosync.c.

651{
652 unsigned int i;
653
654
655 for (i = 0; i < left_list_entries; i++) {
656 const struct cpg_address *cpg_node = &left_list[i];
657 struct corosync_node* node;
658 unsigned int j;
659
660 node = ao2_find(nodes, &cpg_node->nodeid, OBJ_UNLINK | OBJ_SEARCH_KEY);
661 if (!node) {
662 continue;
663 }
664
665 for (j = 0; j < ARRAY_LEN(event_types); j++) {
666 struct ao2_container *messages;
667 int messages_count;
668
670 ast_debug(5, "cpg_confchg_cb rdlock\n");
671 if (!event_types[j].subscribe) {
673 ast_debug(5, "cpg_confchg_cb unlock\n");
674 continue;
675 }
676
679 ast_debug(5, "cpg_confchg_cb unlock\n");
680 continue;
681 }
683 ast_debug(5, "cpg_confchg_cb unlock\n");
684
686
687 messages_count = ao2_container_count(messages);
688 ast_log(LOG_NOTICE, "Clearing %i events of type %s of node %i from stasis cache.\n", messages_count, event_types[j].name, node->id);
690 ast_log(LOG_NOTICE, "Cleared events of type %s from stasis cache.\n", event_types[j].name);
691
692 ao2_t_ref(messages, -1, "Dispose of flushed cache");
693 }
694
696 ao2_ref(node, -1);
697 }
698
699 /* If any new nodes have joined, dump our cache of events we are publishing
700 * that originated from this server. */
701 if (!joined_list_entries) {
702 return;
703 }
704
705 for (i = 0; i < ARRAY_LEN(event_types); i++) {
706 struct ao2_container *messages;
707 int messages_count;
708
710 ast_debug(5, "cpg_confchg_cb rdlock\n");
711 if (!event_types[i].publish) {
713 ast_debug(5, "cpg_confchg_cb unlock\n");
714 continue;
715 }
716
719 ast_debug(5, "cpg_confchg_cb unlock\n");
720 continue;
721 }
723 ast_debug(5, "cpg_confchg_cb unlock\n");
724
726
727 messages_count = ao2_container_count(messages);
728 ast_log(LOG_NOTICE, "Sending %i events of type %s to corosync.\n", messages_count, event_types[i].name);
730 ast_log(LOG_NOTICE, "Sent events of type %s to corosync.\n", event_types[i].name);
731
732 ao2_t_ref(messages, -1, "Dispose of dumped cache");
733 }
734}

References ao2_callback, ao2_container_count(), ao2_find, ao2_ref, ao2_t_ref, ARRAY_LEN, ast_debug, ast_eid_default, ast_log, ast_rwlock_rdlock, ast_rwlock_unlock, cache_fn, clear_node_cache(), dump_cache_cb(), event_types, event_types_lock, LOG_NOTICE, message_type_fn, name, nodes, NULL, OBJ_NODATA, OBJ_SEARCH_KEY, OBJ_UNLINK, publish, publish_cluster_discovery_to_stasis_full(), stasis_cache_dump_by_eid(), subscribe, and topic_fn.

◆ cpg_deliver_cb()

static void cpg_deliver_cb ( cpg_handle_t  handle,
const struct cpg_name *  group_name,
uint32_t  nodeid,
uint32_t  pid,
void *  msg,
size_t  msg_len 
)
static

Definition at line 484 of file res_corosync.c.

486{
487 struct ast_event *event;
488 void (*publish_handler)(struct ast_event *) = NULL;
489 enum ast_event_type event_type;
490 struct ast_eid *event_eid;
491
492 if (msg_len < ast_event_minimum_length()) {
493 ast_debug(1, "Ignoring event that's too small. %u < %u\n",
494 (unsigned int) msg_len,
495 (unsigned int) ast_event_minimum_length());
496 return;
497 }
498
499 event_eid = (struct ast_eid *)ast_event_get_ie_raw(msg, AST_EVENT_IE_EID);
500 if (!event_eid || !ast_eid_cmp(&ast_eid_default, event_eid)) {
501 /* Don't feed events back in that originated locally. */
502 return;
503 }
504
505 event_type = ast_event_get_type(msg);
506 if (event_type > AST_EVENT_TOTAL) {
507 /* Egads, we don't support this */
508 return;
509 }
510
512 ast_debug(5, "cpg_deliver_cb rdlock\n");
513 publish_handler = event_types[event_type].publish_to_stasis;
514 if (!event_types[event_type].subscribe || !publish_handler) {
515 /* We are not configured to subscribe to these events or
516 we have no way to publish it internally. */
518 ast_debug(5, "cpg_deliver_cb unlock\n");
519 return;
520 }
522 ast_debug(5, "cpg_deliver_cb unlock\n");
523
524 if (!(event = ast_malloc(msg_len))) {
525 return;
526 }
527
528 memcpy(event, msg, msg_len);
529
530 if (event_type == AST_EVENT_PING) {
531 const struct ast_eid *eid;
532 char buf[128] = "";
533
535 ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid);
536 ast_log(LOG_NOTICE, "Got event PING from server with EID: '%s'\n", buf);
537 }
538 ast_debug(5, "Publishing event %s (%u) to stasis\n",
539 ast_event_get_type_name(event), event_type);
540 publish_handler(event);
542}

References ast_debug, ast_eid_cmp(), ast_eid_default, ast_eid_to_str(), ast_event_get_ie_raw(), ast_event_get_type(), ast_event_get_type_name(), AST_EVENT_IE_EID, ast_event_minimum_length(), AST_EVENT_PING, AST_EVENT_TOTAL, ast_free, ast_log, ast_malloc, ast_rwlock_rdlock, ast_rwlock_unlock, buf, ast_eid::eid, event_types, event_types_lock, LOG_NOTICE, NULL, and subscribe.

◆ dispatch_thread_handler()

static void * dispatch_thread_handler ( void *  data)
static

Definition at line 782 of file res_corosync.c.

783{
784 cs_error_t cs_err;
785 struct pollfd pfd[3] = {
786 { .events = POLLIN, },
787 { .events = POLLIN, },
788 { .events = POLLIN, },
789 };
790
792 ast_debug(5, "dispatch_thread_handler rdlock\n");
793 if ((cs_err = cpg_fd_get(cpg_handle, &pfd[0].fd)) != CS_OK) {
794 ast_log(LOG_ERROR, "Failed to get CPG fd. This module is now broken.\n");
796 ast_debug(5, "dispatch_thread_handler unlock\n");
797 return NULL;
798 }
799
800 if ((cs_err = corosync_cfg_fd_get(cfg_handle, &pfd[1].fd)) != CS_OK) {
801 ast_log(LOG_ERROR, "Failed to get CFG fd. This module is now broken.\n");
803 ast_debug(5, "dispatch_thread_handler unlock\n");
804 return NULL;
805 }
806
807 pfd[2].fd = dispatch_thread.alert_pipe[0];
809 ast_debug(5, "dispatch_thread_handler unlock\n");
810 } else {
811 ast_log(LOG_ERROR, "Failed to get fd: initializing CPG. This module is now broken.\n");
812 return NULL;
813 }
815 while (!dispatch_thread.stop) {
816 int res;
817
818 cs_err = CS_OK;
819
820 pfd[0].revents = 0;
821 pfd[1].revents = 0;
822 pfd[2].revents = 0;
823
824 res = ast_poll(pfd, ARRAY_LEN(pfd), COROSYNC_POLL_TIMEOUT);
825 if (res == -1 && errno != EINTR && errno != EAGAIN) {
826 ast_log(LOG_ERROR, "poll() error: %s (%d)\n", strerror(errno), errno);
827 cs_err = CS_ERR_BAD_HANDLE;
828 } else if (res == 0) {
829 unsigned int local_nodeid;
830
832 ast_debug(5, "dispatch_thread_handler rdlock\n");
833 if ((cs_err = cpg_local_get(cpg_handle, &local_nodeid)) == CS_OK) {
834 struct cpg_name name;
835 struct cpg_address address[CPG_MEMBERS_MAX];
836 int entries = CPG_MEMBERS_MAX;
837
838 ast_copy_string(name.value, "asterisk", sizeof(name.value));
839 name.length = strlen(name.value);
840 if ((cs_err = cpg_membership_get(cpg_handle, &name, address, &entries)) == CS_OK) {
841 int i;
842 int found = 0;
843
844 ast_debug(1, "CPG group has %i node membership\n", entries);
845 for (i = 0; (i < entries) && !found; i++) {
846 if (address[i].nodeid == local_nodeid)
847 found = 1;
848 }
849 if (!found) {
850 ast_log(LOG_WARNING, "Failed to check CPG node membership\n");
852 cs_err = CS_ERR_BAD_HANDLE;
853 }
854 } else {
855 ast_log(LOG_WARNING, "Failed to get CPG node membership: %u\n", cs_err);
857 cs_err = CS_ERR_BAD_HANDLE;
858 }
859 } else {
860 ast_log(LOG_WARNING, "Failed to get CPG local node id: %u\n", cs_err);
862 cs_err = CS_ERR_BAD_HANDLE;
863 }
865 ast_debug(5, "dispatch_thread_handler unlock\n");
866 } else {
867 ast_log(LOG_WARNING, "Failed to check CPG node membership: initializing CPG.\n");
869 cs_err = CS_ERR_BAD_HANDLE;
870 }
871 } else {
873 ast_debug(5, "dispatch_thread_handler rdlock\n");
874 if (pfd[0].revents & POLLIN) {
875 if ((cs_err = cpg_dispatch(cpg_handle, CS_DISPATCH_ALL)) != CS_OK) {
876 ast_log(LOG_WARNING, "Failed CPG dispatch: %u\n", cs_err);
877 }
878 }
879
880 if (pfd[1].revents & POLLIN) {
881 if ((cs_err = corosync_cfg_dispatch(cfg_handle, CS_DISPATCH_ALL)) != CS_OK) {
882 ast_log(LOG_WARNING, "Failed CFG dispatch: %u\n", cs_err);
883 }
884 }
886 ast_debug(5, "dispatch_thread_handler unlock\n");
887 } else {
888 ast_log(LOG_WARNING, "Failed to dispatch: initializing CPG.\n");
889 }
890 }
891 if (cs_err == CS_ERR_LIBRARY || cs_err == CS_ERR_BAD_HANDLE) {
892
893 /* If corosync gets restarted out from under Asterisk, try to recover. */
894
895 ast_log(LOG_NOTICE, "Attempting to recover from corosync failure.\n");
896
898 struct cpg_name name;
899 ast_debug(5, "dispatch_thread_handler wrlock\n");
900
902 if (cpg_handle && (cs_err = cpg_finalize(cpg_handle)) != CS_OK) {
903 ast_log(LOG_ERROR, "Failed to finalize cpg (%d)\n", (int) cs_err);
904 }
905
906 if (cfg_handle && (cs_err = corosync_cfg_finalize(cfg_handle)) != CS_OK) {
907 ast_log(LOG_ERROR, "Failed to finalize cfg (%d)\n", (int) cs_err);
908 }
909
910 if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) {
911 ast_log(LOG_ERROR, "Failed to initialize cfg (%d)\n", (int) cs_err);
913 ast_debug(5, "dispatch_thread_handler unlock\n");
914 sleep(5);
915 continue;
916 }
917
918 if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks) != CS_OK)) {
919 ast_log(LOG_ERROR, "Failed to initialize cpg (%d)\n", (int) cs_err);
921 ast_debug(5, "dispatch_thread_handler unlock\n");
922 sleep(5);
923 continue;
924 }
925
926 if ((cs_err = cpg_fd_get(cpg_handle, &pfd[0].fd)) != CS_OK) {
927 ast_log(LOG_ERROR, "Failed to get CPG fd.\n");
929 ast_debug(5, "dispatch_thread_handler unlock\n");
930 sleep(5);
931 continue;
932 }
933
934 if ((cs_err = corosync_cfg_fd_get(cfg_handle, &pfd[1].fd)) != CS_OK) {
935 ast_log(LOG_ERROR, "Failed to get CFG fd.\n");
937 ast_debug(5, "dispatch_thread_handler unlock\n");
938 sleep(5);
939 continue;
940 }
941
942 ast_copy_string(name.value, "asterisk", sizeof(name.value));
943 name.length = strlen(name.value);
944 if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) {
945 ast_log(LOG_ERROR, "Failed to join cpg (%d)\n", (int) cs_err);
947 ast_debug(5, "dispatch_thread_handler unlock\n");
948 sleep(5);
949 continue;
950 }
953 ast_debug(5, "dispatch_thread_handler unlock\n");
954 ast_log(LOG_NOTICE, "Corosync recovery complete.\n");
956 } else {
957 ast_log(LOG_NOTICE, "Failed to recover from corosync failure: initializing CPG.\n");
958 }
959 }
960 }
961
962 return NULL;
963}

References ARRAY_LEN, ast_copy_string(), ast_debug, ast_log, ast_poll, ast_rwlock_tryrdlock, ast_rwlock_trywrlock, ast_rwlock_unlock, cfg_callbacks, cfg_handle, corosync_node_joined, COROSYNC_POLL_TIMEOUT, cpg_callbacks, cpg_handle, dispatch_thread, errno, init_cpg_lock, LOG_ERROR, LOG_NOTICE, LOG_WARNING, name, NULL, and send_cluster_notify().

Referenced by load_module().

◆ dump_cache_cb()

static int dump_cache_cb ( void *  obj,
void *  arg,
int  flags 
)
static

Definition at line 610 of file res_corosync.c.

611{
612 struct stasis_message *message = obj;
613
614 if (!message) {
615 return 0;
616 }
617
619
620 return 0;
621}

References publish_to_corosync().

Referenced by cpg_confchg_cb().

◆ load_config()

static int load_config ( unsigned int  reload)
static

Definition at line 1222 of file res_corosync.c.

1223{
1224 static const char filename[] = "res_corosync.conf";
1225 struct ast_config *cfg;
1226 const char *cat = NULL;
1227 struct ast_flags config_flags = { 0 };
1228 int res = 0;
1229
1230 cfg = ast_config_load(filename, config_flags);
1231
1233 return -1;
1234 }
1235
1236 while ((cat = ast_category_browse(cfg, cat))) {
1237 if (!strcasecmp(cat, "general")) {
1238 res = load_general_config(cfg);
1239 } else {
1240 ast_log(LOG_WARNING, "Unknown configuration section '%s'\n", cat);
1241 }
1242 }
1243
1244 ast_config_destroy(cfg);
1245
1246 return res;
1247}

References ast_category_browse(), ast_config_destroy(), ast_config_load, ast_log, CONFIG_STATUS_FILEINVALID, CONFIG_STATUS_FILEMISSING, load_general_config(), LOG_WARNING, and NULL.

◆ load_general_config()

static int load_general_config ( struct ast_config cfg)
static

Definition at line 1177 of file res_corosync.c.

1178{
1179 struct ast_variable *v;
1180 int res = 0;
1181 unsigned int i;
1182
1184 ast_debug(5, "load_general_config wrlock\n");
1185
1186 for (i = 0; i < ARRAY_LEN(event_types); i++) {
1187 event_types[i].publish = event_types[i].publish_default;
1188 event_types[i].subscribe = event_types[i].subscribe_default;
1189 }
1190
1191 for (v = ast_variable_browse(cfg, "general"); v && !res; v = v->next) {
1192 if (!strcasecmp(v->name, "publish_event")) {
1193 res = set_event(v->value, PUBLISH);
1194 } else if (!strcasecmp(v->name, "subscribe_event")) {
1195 res = set_event(v->value, SUBSCRIBE);
1196 } else {
1197 ast_log(LOG_WARNING, "Unknown option '%s'\n", v->name);
1198 }
1199 }
1200
1201 for (i = 0; i < ARRAY_LEN(event_types); i++) {
1202 if (event_types[i].publish && !event_types[i].sub) {
1204 corosync_topic());
1208 NULL);
1209 } else if (!event_types[i].publish && event_types[i].sub) {
1213 }
1214 }
1215
1217 ast_debug(5, "load_general_config unlock\n");
1218
1219 return res;
1220}

References ARRAY_LEN, ast_debug, ast_log, ast_rwlock_unlock, ast_rwlock_wrlock, ast_variable_browse(), corosync_topic(), event_types, event_types_lock, LOG_WARNING, message_type_fn, ast_variable::name, ast_variable::next, NULL, publish, PUBLISH, set_event(), stasis_forward_all(), stasis_forward_cancel(), stasis_message_cb(), stasis_message_router_add(), stasis_message_router_remove(), stasis_router, sub, SUBSCRIBE, topic_fn, and ast_variable::value.

Referenced by load_config().

◆ load_module()

static int load_module ( void  )
static

Definition at line 1336 of file res_corosync.c.

1337{
1338 cs_error_t cs_err;
1339 struct cpg_name name;
1340
1342 ast_log(LOG_ERROR, "Entity ID is not set.\n");
1344 }
1345
1348 if (!nodes) {
1349 goto failed;
1350 }
1351
1352 corosync_aggregate_topic = stasis_topic_create("corosync:aggregator");
1354 ast_log(AST_LOG_ERROR, "Failed to create stasis topic for corosync\n");
1355 goto failed;
1356 }
1357
1359 if (!stasis_router) {
1360 ast_log(AST_LOG_ERROR, "Failed to create message router for corosync topic\n");
1361 goto failed;
1362 }
1365
1366 if (STASIS_MESSAGE_TYPE_INIT(corosync_ping_message_type) != 0) {
1367 ast_log(AST_LOG_ERROR, "Failed to initialize corosync ping message type\n");
1368 goto failed;
1369 }
1370
1371 if (load_config(0)) {
1372 /* simply not configured is not a fatal error */
1373 goto failed;
1374 }
1375
1378 ast_debug(5, "load_module wrlock\n");
1379 if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) {
1380 ast_log(LOG_ERROR, "Failed to initialize cfg: (%d)\n", (int) cs_err);
1382 ast_debug(5, "load_module unlock\n");
1383 goto failed;
1384 }
1385
1386 if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks)) != CS_OK) {
1387 ast_log(LOG_ERROR, "Failed to initialize cpg: (%d)\n", (int) cs_err);
1389 ast_debug(5, "load_module unlock\n");
1390 goto failed;
1391 }
1392
1393 ast_copy_string(name.value, "asterisk", sizeof(name.value));
1394 name.length = strlen(name.value);
1395
1396 if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) {
1397 ast_log(LOG_ERROR, "Failed to join: (%d)\n", (int) cs_err);
1399 ast_debug(5, "load_module unlock\n");
1400 goto failed;
1401 }
1402
1403 if (pipe(dispatch_thread.alert_pipe) == -1) {
1404 ast_log(LOG_ERROR, "Failed to create alert pipe: %s (%d)\n",
1405 strerror(errno), errno);
1407 ast_debug(5, "load_module unlock\n");
1408 goto failed;
1409 }
1411
1413 ast_debug(5, "load_module unlock\n");
1416 ast_log(LOG_ERROR, "Error starting CPG dispatch thread.\n");
1417 goto failed;
1418 }
1419
1421 } else {
1422 goto failed;
1423 }
1424
1426
1427failed:
1429
1431}

References AO2_ALLOC_OPT_LOCK_MUTEX, ao2_container_alloc_hash, ARRAY_LEN, ast_cli_register_multiple, ast_copy_string(), ast_debug, ast_eid_default, ast_eid_is_empty(), ast_log, AST_LOG_ERROR, AST_MODULE_LOAD_DECLINE, AST_MODULE_LOAD_SUCCESS, ast_rwlock_trywrlock, ast_rwlock_unlock, AST_TASKPROCESSOR_HIGH_WATER_LEVEL, cfg_callbacks, cfg_handle, cleanup_module(), corosync_aggregate_topic, corosync_cli, corosync_node_cmp_fn(), corosync_node_hash_fn(), corosync_node_joined, corosync_pthread_create_background, cpg_callbacks, cpg_handle, dispatch_thread, dispatch_thread_handler(), errno, init_cpg_lock, load_config(), LOG_ERROR, name, nodes, NULL, stasis_message_router_create, stasis_message_router_set_congestion_limits(), STASIS_MESSAGE_TYPE_INIT, stasis_router, and stasis_topic_create().

◆ publish_cluster_discovery_to_stasis()

static void publish_cluster_discovery_to_stasis ( struct ast_event event)
static

Publish a received cluster discovery ast_event to Stasis Message Bus API.

Definition at line 351 of file res_corosync.c.

352{
353 struct corosync_node *node;
355 struct ast_eid *event_eid;
356
358
359 event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
360 if (!event_eid || !ast_eid_cmp(&ast_eid_default, event_eid)) {
361 /* Don't feed events back in that originated locally. */
362 return;
363 }
364
367 if (node) {
368 /* We already know about this node */
370 ao2_ref(node, -1);
371 return;
372 }
373
375 if (!node) {
377 return;
378 }
381
383
384 ao2_ref(node, -1);
385
386 /*
387 * When we get news that someone else has joined, we need to let them
388 * know we exist as well.
389 */
391}

References ao2_find, ao2_link_flags, ao2_lock, ao2_ref, ao2_unlock, ast_assert, ast_eid_cmp(), ast_eid_default, AST_EVENT_CLUSTER_DISCOVERY, ast_event_get_ie_raw(), ast_event_get_ie_uint(), ast_event_get_type(), AST_EVENT_IE_EID, AST_EVENT_IE_NODE_ID, corosync_node_alloc(), nodes, OBJ_NOLOCK, OBJ_SEARCH_KEY, publish_cluster_discovery_to_stasis_full(), and send_cluster_notify().

◆ publish_cluster_discovery_to_stasis_full()

static void publish_cluster_discovery_to_stasis_full ( struct corosync_node node,
int  joined 
)
static

Publish cluster discovery to Stasis Message Bus API.

Definition at line 303 of file res_corosync.c.

304{
305 struct ast_json *json;
306 struct ast_json_payload *payload;
307 struct stasis_message *message;
308 char eid[18];
309 const char *addr;
310
311 ast_eid_to_str(eid, sizeof(eid), &node->eid);
312 addr = ast_sockaddr_stringify_addr(&node->addr);
313
314 ast_log(AST_LOG_NOTICE, "Node %u (%s) at %s %s the cluster\n",
315 node->id,
316 eid,
317 addr,
318 joined ? "joined" : "left");
319
320 json = ast_json_pack("{s: s, s: i, s: s, s: i}",
321 "address", addr,
322 "node_id", node->id,
323 "eid", eid,
324 "joined", joined);
325 if (!json) {
326 return;
327 }
328
329 payload = ast_json_payload_create(json);
330 if (!payload) {
331 ast_json_unref(json);
332 return;
333 }
334
336 if (!message) {
337 ast_json_unref(json);
338 ao2_ref(payload, -1);
339 return;
340 }
341
343 ast_json_unref(json);
344 ao2_ref(payload, -1);
345 ao2_ref(message, -1);
346}

References ao2_ref, ast_cluster_discovery_type(), ast_eid_to_str(), ast_json_pack(), ast_json_payload_create(), ast_json_unref(), ast_log, AST_LOG_NOTICE, ast_sockaddr_stringify_addr(), ast_system_topic(), stasis_message::eid, stasis_message_create(), and stasis_publish().

Referenced by cpg_confchg_cb(), and publish_cluster_discovery_to_stasis().

◆ publish_corosync_ping_to_stasis()

static void publish_corosync_ping_to_stasis ( struct ast_event event)
static

Publish a Corosync ping to Stasis Message Bus API.

Definition at line 204 of file res_corosync.c.

205{
206 struct corosync_ping_payload *payload;
207 struct stasis_message *message;
208 struct ast_eid *event_eid;
209
212
213 if (!corosync_ping_message_type()) {
214 return;
215 }
216
217 payload = ao2_t_alloc(sizeof(*payload), corosync_ping_payload_dtor, "Create ping payload");
218 if (!payload) {
219 return;
220 }
221 event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
223 AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW, event_eid, sizeof(*event_eid),
225
226 message = stasis_message_create(corosync_ping_message_type(), payload);
227 if (!message) {
228 ao2_t_ref(payload, -1, "Destroy payload on off nominal");
229 return;
230 }
231
233
234 ao2_t_ref(payload, -1, "Hand ref to stasis");
235 ao2_t_ref(message, -1, "Hand ref to stasis");
236}

References ao2_t_alloc, ao2_t_ref, ast_assert, ast_event_get_ie_raw(), ast_event_get_type(), AST_EVENT_IE_EID, AST_EVENT_IE_END, AST_EVENT_IE_PLTYPE_RAW, ast_event_new(), AST_EVENT_PING, corosync_ping_payload_dtor(), corosync_topic(), corosync_ping_payload::event, NULL, stasis_message_create(), and stasis_publish().

◆ publish_device_state_to_stasis()

static void publish_device_state_to_stasis ( struct ast_event event)
static

Publish a received device state ast_event to Stasis Message Bus API.

Definition at line 432 of file res_corosync.c.

433{
434 const char *device;
436 unsigned int cachable;
437 struct ast_eid *event_eid;
438
440
444 event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
445
446 if (ast_strlen_zero(device)) {
447 return;
448 }
449
450 if (ast_publish_device_state_full(device, state, cachable, event_eid)) {
451 char eid[18];
452 ast_eid_to_str(eid, sizeof(eid), event_eid);
453 ast_log(LOG_WARNING, "Failed to publish device state message for %s from %s\n",
454 device, eid);
455 }
456}

References ast_assert, ast_eid_to_str(), AST_EVENT_DEVICE_STATE_CHANGE, ast_event_get_ie_raw(), ast_event_get_ie_str(), ast_event_get_ie_uint(), ast_event_get_type(), AST_EVENT_IE_CACHABLE, AST_EVENT_IE_DEVICE, AST_EVENT_IE_EID, AST_EVENT_IE_STATE, ast_log, ast_publish_device_state_full(), ast_strlen_zero(), ast_eid::eid, and LOG_WARNING.

◆ publish_event_to_corosync()

static void publish_event_to_corosync ( struct ast_event event)
static

Definition at line 544 of file res_corosync.c.

545{
546 cs_error_t cs_err;
547 struct iovec iov;
548
549 iov.iov_base = (void *)event;
550 iov.iov_len = ast_event_get_size(event);
551
552 ast_debug(5, "Publishing event %s (%u) to corosync\n",
554
555 /* The stasis subscription will only exist if we are configured to publish
556 * these events, so just send away. */
558 ast_debug(5, "publish_event_to_corosync rdlock\n");
559 if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) {
560 ast_log(LOG_WARNING, "CPG mcast failed (%u) for event %s (%u)\n",
562 }
564 ast_debug(5, "publish_event_to_corosync unlock\n");
565 } else {
566 ast_log(LOG_WARNING, "CPG mcast not executed for event %s (%u): initializing CPG.\n",
568 }
569}

References ast_debug, ast_event_get_size(), ast_event_get_type(), ast_event_get_type_name(), ast_log, ast_rwlock_tryrdlock, ast_rwlock_unlock, corosync_node_joined, cpg_handle, init_cpg_lock, and LOG_WARNING.

Referenced by publish_to_corosync(), and send_cluster_notify().

◆ publish_mwi_to_stasis()

static void publish_mwi_to_stasis ( struct ast_event event)
static

Publish a received MWI ast_event to Stasis Message Bus API.

Definition at line 394 of file res_corosync.c.

395{
396 const char *mailbox;
397 const char *context;
398 unsigned int new_msgs;
399 unsigned int old_msgs;
400 struct ast_eid *event_eid;
401
403
408 event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
409
410 if (ast_strlen_zero(mailbox) || ast_strlen_zero(context)) {
411 return;
412 }
413
414 if (new_msgs > INT_MAX) {
415 new_msgs = INT_MAX;
416 }
417
418 if (old_msgs > INT_MAX) {
419 old_msgs = INT_MAX;
420 }
421
422 if (ast_publish_mwi_state_full(mailbox, context, (int)new_msgs,
423 (int)old_msgs, NULL, event_eid)) {
424 char eid[18];
425 ast_eid_to_str(eid, sizeof(eid), event_eid);
426 ast_log(LOG_WARNING, "Failed to publish MWI message for %s@%s from %s\n",
427 mailbox, context, eid);
428 }
429}

References ast_assert, ast_eid_to_str(), ast_event_get_ie_raw(), ast_event_get_ie_str(), ast_event_get_ie_uint(), ast_event_get_type(), AST_EVENT_IE_CONTEXT, AST_EVENT_IE_EID, AST_EVENT_IE_MAILBOX, AST_EVENT_IE_NEWMSGS, AST_EVENT_IE_OLDMSGS, AST_EVENT_MWI, ast_log, ast_publish_mwi_state_full(), ast_strlen_zero(), ast_eid::eid, LOG_WARNING, and NULL.

◆ publish_to_corosync()

static void publish_to_corosync ( struct stasis_message message)
static

Definition at line 571 of file res_corosync.c.

572{
573 struct ast_event *event;
574 struct ast_eid *event_eid;
575
577 if (!event) {
578 return;
579 }
580
581 event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
582 if (!event_eid || ast_eid_cmp(&ast_eid_default, event_eid)) {
583 /* If the event didn't originate from this server, don't send it back out. */
585 return;
586 }
587
589 const struct ast_eid *eid;
590 char buf[128] = "";
591
593 ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid);
594 ast_log(LOG_NOTICE, "Sending event PING from this server with EID: '%s'\n", buf);
595 }
596
599}

References ast_eid_cmp(), ast_eid_default, ast_eid_to_str(), ast_event_destroy(), ast_event_get_ie_raw(), ast_event_get_type(), AST_EVENT_IE_EID, AST_EVENT_PING, ast_log, buf, ast_eid::eid, LOG_NOTICE, publish_event_to_corosync(), and stasis_message_to_event().

Referenced by dump_cache_cb(), and stasis_message_cb().

◆ send_cluster_notify()

static void send_cluster_notify ( void  )
static

Informs the cluster of our EID and our IP addresses.

Definition at line 737 of file res_corosync.c.

738{
739 struct ast_event *event;
740 unsigned int node_id;
741 cs_error_t cs_err;
742 corosync_cfg_node_address_t corosync_addr;
743 int num_addrs = 0;
744 struct sockaddr *sa;
745 size_t sa_len;
746 char buf[128];
747 int res;
748
750 ast_debug(5, "send_cluster_notify rdlock\n");
751
752 if ((cs_err = corosync_cfg_local_get(cfg_handle, &node_id)) != CS_OK) {
753 ast_log(LOG_WARNING, "Failed to extract Corosync node ID for this node. Not informing cluster of existance.\n");
754 return;
755 }
756
757 if (((cs_err = corosync_cfg_get_node_addrs(cfg_handle, node_id, 1, &num_addrs, &corosync_addr)) != CS_OK) || (num_addrs < 1)) {
758 ast_log(LOG_WARNING, "Failed to get local Corosync address. Not informing cluster of existance.\n");
759 return;
760 }
761
763 ast_debug(5, "send_cluster_notify unlock\n");
764 }
765
766 sa = (struct sockaddr *)corosync_addr.address;
767 sa_len = (size_t)corosync_addr.address_length;
768 if ((res = getnameinfo(sa, sa_len, buf, sizeof(buf), NULL, 0, NI_NUMERICHOST))) {
769 ast_log(LOG_WARNING, "Failed to determine name of local Corosync address: %s (%d). Not informing cluster of existance.\n",
770 gai_strerror(res), res);
771 return;
772 }
773
780}

References ast_debug, AST_EVENT_CLUSTER_DISCOVERY, ast_event_destroy(), AST_EVENT_IE_END, AST_EVENT_IE_LOCAL_ADDR, AST_EVENT_IE_NODE_ID, AST_EVENT_IE_PLTYPE_STR, AST_EVENT_IE_PLTYPE_UINT, ast_event_new(), ast_log, ast_rwlock_tryrdlock, ast_rwlock_unlock, buf, cfg_handle, init_cpg_lock, LOG_WARNING, NULL, and publish_event_to_corosync().

Referenced by dispatch_thread_handler(), and publish_cluster_discovery_to_stasis().

◆ set_event()

static int set_event ( const char *  event_type,
int  pubsub 
)
static

Definition at line 1153 of file res_corosync.c.

1154{
1155 unsigned int i;
1156
1157 for (i = 0; i < ARRAY_LEN(event_types); i++) {
1158 if (!event_types[i].name || strcasecmp(event_type, event_types[i].name)) {
1159 continue;
1160 }
1161
1162 switch (pubsub) {
1163 case PUBLISH:
1164 event_types[i].publish = 1;
1165 break;
1166 case SUBSCRIBE:
1167 event_types[i].subscribe = 1;
1168 break;
1169 }
1170
1171 break;
1172 }
1173
1174 return (i == ARRAY_LEN(event_types)) ? -1 : 0;
1175}

References ARRAY_LEN, event_types, name, PUBLISH, and SUBSCRIBE.

Referenced by load_general_config().

◆ stasis_message_cb()

static void stasis_message_cb ( void *  data,
struct stasis_subscription sub,
struct stasis_message message 
)
static

Definition at line 601 of file res_corosync.c.

602{
603 if (!message) {
604 return;
605 }
606
608}

References publish_to_corosync().

Referenced by load_general_config().

◆ STASIS_MESSAGE_TYPE_DEFN_LOCAL()

STASIS_MESSAGE_TYPE_DEFN_LOCAL ( corosync_ping_message_type  ,
to_event = corosync_ping_to_event 
)

◆ unload_module()

static int unload_module ( void  )
static

Definition at line 1433 of file res_corosync.c.

1434{
1436
1438
1439 return 0;
1440}

References ARRAY_LEN, ast_cli_unregister_multiple(), cleanup_module(), and corosync_cli.

Variable Documentation

◆ alert_pipe

int alert_pipe[2]

◆ cache_fn

struct stasis_cache *(* cache_fn) (void) ( void  )

Definition at line 244 of file res_corosync.c.

Referenced by cleanup_module(), and cpg_confchg_cb().

◆ cfg_callbacks

corosync_cfg_callbacks_t cfg_callbacks
static

Definition at line 295 of file res_corosync.c.

295 {
296#ifdef HAVE_COROSYNC_CFG_STATE_TRACK
297 .corosync_cfg_state_track_callback = cfg_state_track_cb,
298#endif /* HAVE_COROSYNC_CFG_STATE_TRACK */
299 .corosync_cfg_shutdown_callback = cfg_shutdown_cb,
300};

Referenced by dispatch_thread_handler(), and load_module().

◆ cfg_handle

corosync_cfg_handle_t cfg_handle
static

◆ corosync_aggregate_topic

struct stasis_topic* corosync_aggregate_topic
static

The internal topic used for message forwarding and pings.

Definition at line 68 of file res_corosync.c.

Referenced by cleanup_module(), corosync_topic(), and load_module().

◆ corosync_cli

struct ast_cli_entry corosync_cli[]
static
Initial value:
= {
{ .handler = corosync_show_config , .summary = "Show configuration" ,},
{ .handler = corosync_show_members , .summary = "Show cluster members" ,},
{ .handler = corosync_ping , .summary = "Send a test ping to the cluster" ,},
}

Definition at line 1142 of file res_corosync.c.

1142 {
1143 AST_CLI_DEFINE(corosync_show_config, "Show configuration"),
1144 AST_CLI_DEFINE(corosync_show_members, "Show cluster members"),
1145 AST_CLI_DEFINE(corosync_ping, "Send a test ping to the cluster"),
1146};

Referenced by load_module(), and unload_module().

◆ corosync_node_joined

int corosync_node_joined = 0
static

Join to corosync.

Definition at line 62 of file res_corosync.c.

Referenced by cleanup_module(), dispatch_thread_handler(), load_module(), and publish_event_to_corosync().

◆ cpg_callbacks

cpg_callbacks_t cpg_callbacks
static
Initial value:
= {
.cpg_deliver_fn = cpg_deliver_cb,
.cpg_confchg_fn = cpg_confchg_cb,
}

Definition at line 466 of file res_corosync.c.

466 {
467 .cpg_deliver_fn = cpg_deliver_cb,
468 .cpg_confchg_fn = cpg_confchg_cb,
469};

Referenced by dispatch_thread_handler(), and load_module().

◆ cpg_handle

cpg_handle_t cpg_handle
static

◆ [struct]

struct { ... } dispatch_thread
Initial value:
= {
.alert_pipe = { -1, -1 },
}

Referenced by cleanup_module(), dispatch_thread_handler(), and load_module().

◆ [struct]

struct { ... } event_types[]

◆ event_types_lock

ast_rwlock_t event_types_lock = AST_RWLOCK_INIT_VALUE
static

◆ id

pthread_t id

Definition at line 275 of file res_corosync.c.

Referenced by corosync_node_cmp_fn(), and corosync_node_hash_fn().

◆ init_cpg_lock

ast_rwlock_t init_cpg_lock = AST_RWLOCK_INIT_VALUE
static

◆ message_type_fn

struct stasis_message_type *(* message_type_fn) (void) ( void  )

Definition at line 244 of file res_corosync.c.

Referenced by cleanup_module(), cpg_confchg_cb(), and load_general_config().

◆ name

const char* name

◆ nodes

struct ao2_container* nodes
static

◆ publish

unsigned char publish

◆ publish_default

unsigned char publish_default

Definition at line 242 of file res_corosync.c.

◆ publish_to_stasis

void(* publish_to_stasis) (struct ast_event *) ( struct ast_event )

Definition at line 248 of file res_corosync.c.

◆ stasis_router

struct stasis_message_router* stasis_router
static

Our Stasis Message Bus API message router.

Definition at line 71 of file res_corosync.c.

Referenced by cleanup_module(), load_general_config(), and load_module().

◆ stop

unsigned int stop

Definition at line 277 of file res_corosync.c.

◆ sub

struct stasis_forward* sub

Definition at line 240 of file res_corosync.c.

Referenced by cleanup_module(), and load_general_config().

◆ subscribe

unsigned char subscribe

◆ subscribe_default

unsigned char subscribe_default

Definition at line 244 of file res_corosync.c.

◆ topic_fn

struct stasis_topic *(* topic_fn) (void) ( void  )

Definition at line 244 of file res_corosync.c.

Referenced by cleanup_module(), cpg_confchg_cb(), and load_general_config().