Asterisk - The Open Source Telephony Project GIT-master-67613d1
Data Structures | Macros | Enumerations | Functions | Variables
res_corosync.c File Reference
#include "asterisk.h"
#include <corosync/cpg.h>
#include <corosync/cfg.h>
#include "asterisk/module.h"
#include "asterisk/logger.h"
#include "asterisk/poll-compat.h"
#include "asterisk/config.h"
#include "asterisk/event.h"
#include "asterisk/cli.h"
#include "asterisk/devicestate.h"
#include "asterisk/mwi.h"
#include "asterisk/stasis.h"
#include "asterisk/stasis_message_router.h"
#include "asterisk/stasis_system.h"
#include "asterisk/taskprocessor.h"
Include dependency graph for res_corosync.c:

Go to the source code of this file.

Data Structures

struct  corosync_node
 
struct  corosync_ping_payload
 A payload wrapper around a corosync ping event. More...
 

Macros

#define COROSYNC_IPC_BUFFER_SIZE   (8192 * 128)
 Corosync ipc dispatch/request and reply size. More...
 
#define COROSYNC_POLL_TIMEOUT   (10 * 1000)
 Timeout for Corosync's poll process. More...
 
#define corosync_pthread_create_background(a, b, c, d)
 Version of pthread_create to ensure stack is large enough. More...
 

Enumerations

enum  { PUBLISH , SUBSCRIBE }
 

Functions

 AST_MODULE_INFO_STANDARD_EXTENDED (ASTERISK_GPL_KEY, "Corosync")
 
static void cfg_shutdown_cb (corosync_cfg_handle_t cfg_handle, corosync_cfg_shutdown_flags_t flags)
 
static void cleanup_module (void)
 
static int clear_node_cache (void *obj, void *arg, int flags)
 
static struct corosync_node * corosync_node_alloc (struct ast_event *event)
 
static int corosync_node_cmp_fn (void *obj, void *arg, int flags)
 
static int corosync_node_hash_fn (const void *obj, const int flags)
 
static char * corosync_ping (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static void corosync_ping_payload_dtor (void *obj)
 Destructor for the corosync_ping_payload wrapper object. More...
 
static struct ast_event * corosync_ping_to_event (struct stasis_message *message)
 Convert a Corosync PING to an ast_event. More...
 
static char * corosync_show_config (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static char * corosync_show_members (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static struct stasis_topic * corosync_topic (void)
 Internal accessor for our topic. More...
 
static void cpg_confchg_cb (cpg_handle_t handle, const struct cpg_name *group_name, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
 
static void cpg_deliver_cb (cpg_handle_t handle, const struct cpg_name *group_name, uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
 
static void * dispatch_thread_handler (void *data)
 
static int dump_cache_cb (void *obj, void *arg, int flags)
 
static int load_config (unsigned int reload)
 
static int load_general_config (struct ast_config *cfg)
 
static int load_module (void)
 
static void publish_cluster_discovery_to_stasis (struct ast_event *event)
 Publish a received cluster discovery ast_event to Stasis Message Bus API. More...
 
static void publish_cluster_discovery_to_stasis_full (struct corosync_node *node, int joined)
 Publish cluster discovery to Stasis Message Bus API. More...
 
static void publish_corosync_ping_to_stasis (struct ast_event *event)
 Publish a Corosync ping to Stasis Message Bus API. More...
 
static void publish_device_state_to_stasis (struct ast_event *event)
 Publish a received device state ast_event to Stasis Message Bus API. More...
 
static void publish_event_to_corosync (struct ast_event *event)
 
static void publish_mwi_to_stasis (struct ast_event *event)
 Publish a received MWI ast_event to Stasis Message Bus API. More...
 
static void publish_to_corosync (struct stasis_message *message)
 
static void send_cluster_notify (void)
 Informs the cluster of our EID and our IP addresses. More...
 
static int set_event (const char *event_type, int pubsub)
 
static void stasis_message_cb (void *data, struct stasis_subscription *sub, struct stasis_message *message)
 
 STASIS_MESSAGE_TYPE_DEFN_LOCAL (corosync_ping_message_type,.to_event=corosync_ping_to_event,)
 
static int unload_module (void)
 

Variables

static corosync_cfg_callbacks_t cfg_callbacks
 
static corosync_cfg_handle_t cfg_handle
 
static struct stasis_topic * corosync_aggregate_topic
 The internal topic used for message forwarding and pings. More...
 
static struct ast_cli_entry corosync_cli []
 
static int corosync_node_joined = 0
 Join to corosync. More...
 
static cpg_callbacks_t cpg_callbacks
 
static cpg_handle_t cpg_handle
 
struct {
   int   alert_pipe [2]
 
   pthread_t   id
 
   unsigned int   stop:1
 
}   dispatch_thread
 
struct {
   struct stasis_cache *(*   cache_fn )(void)
 
   struct stasis_message_type *(*   message_type_fn )(void)
 
   const char *   name
 
   unsigned char   publish
 
   unsigned char   publish_default
 
   void(*   publish_to_stasis )(struct ast_event *)
 
   struct stasis_forward *   sub
 
   unsigned char   subscribe
 
   unsigned char   subscribe_default
 
   struct stasis_topic *(*   topic_fn )(void)
 
}   event_types []
 
static ast_rwlock_t event_types_lock = { PTHREAD_RWLOCK_INITIALIZER , NULL, {1, 0} }
 
static ast_rwlock_t init_cpg_lock = { PTHREAD_RWLOCK_INITIALIZER , NULL, {1, 0} }
 
static struct ao2_container * nodes
 All the nodes that we're aware of. More...
 
static struct stasis_message_router * stasis_router
 Our Stasis Message Bus API message router. More...
 

Detailed Description

This module publishes ast_event representations of information to other Asterisk instances in a cluster.

Events have an associated event type, as well as information elements. The information elements are the meta data that go along with each event. For example, in the case of message waiting indication, the event type is MWI, and each MWI event contains at least three information elements: the mailbox, the number of new messages, and the number of old messages.

Author
Russell Bryant russe.nosp@m.ll@r.nosp@m.ussel.nosp@m.lbry.nosp@m.ant.n.nosp@m.et

This module is based on and replaces the previous res_ais module.

Definition in file res_corosync.c.

Macro Definition Documentation

◆ COROSYNC_IPC_BUFFER_SIZE

#define COROSYNC_IPC_BUFFER_SIZE   (8192 * 128)

Corosync ipc dispatch/request and reply size.

Definition at line 89 of file res_corosync.c.

◆ COROSYNC_POLL_TIMEOUT

#define COROSYNC_POLL_TIMEOUT   (10 * 1000)

Timeout for Corosync's poll process.

Definition at line 55 of file res_corosync.c.

◆ corosync_pthread_create_background

#define corosync_pthread_create_background (   a,
  b,
  c,
  d 
)
Value:
__FILE__, __FUNCTION__, __LINE__, #c)
#define AST_BACKGROUND_STACKSIZE
Definition: ooh323cDriver.c:26
#define COROSYNC_IPC_BUFFER_SIZE
Corosync ipc dispatch/request and reply size.
Definition: res_corosync.c:89
static struct test_val b
static struct test_val a
static struct test_val d
static struct test_val c
int ast_pthread_create_stack(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine)(void *), void *data, size_t stacksize, const char *file, const char *caller, int line, const char *start_fn)
Definition: utils.c:1625

Version of pthread_create to ensure stack is large enough.

Definition at line 92 of file res_corosync.c.

Enumeration Type Documentation

◆ anonymous enum

anonymous enum
Enumerator
PUBLISH 
SUBSCRIBE 

Definition at line 1148 of file res_corosync.c.

1148 {
1149 PUBLISH,
1150 SUBSCRIBE,
1151};
@ SUBSCRIBE
@ PUBLISH

Function Documentation

◆ AST_MODULE_INFO_STANDARD_EXTENDED()

AST_MODULE_INFO_STANDARD_EXTENDED ( ASTERISK_GPL_KEY  ,
"Corosync"   
)

◆ cfg_shutdown_cb()

static void cfg_shutdown_cb ( corosync_cfg_handle_t  cfg_handle,
corosync_cfg_shutdown_flags_t  flags 
)
static

Definition at line 479 of file res_corosync.c.

481{
482}

◆ cleanup_module()

static void cleanup_module ( void  )
static

Definition at line 1249 of file res_corosync.c.

1250{
1251 cs_error_t cs_err;
1252 unsigned int i;
1253
1254 if (stasis_router) {
1255
1256 /* Unsubscribe all topic forwards and cancel all message routes */
1257 for (i = 0; i < ARRAY_LEN(event_types); i++) {
1258 struct ao2_container *messages = NULL;
1259 int messages_count;
1260 unsigned char subscribe = 0;
1261
1263 ast_debug(5, "cleanup_module wrlock\n");
1264 subscribe = event_types[i].subscribe;
1265
1266 if (event_types[i].sub) {
1269 }
1270 event_types[i].publish = 0;
1271 event_types[i].subscribe = 0;
1273 ast_debug(5, "cleanup_module unlock\n");
1274
1277 messages_count = ao2_container_count(messages);
1278 ast_log(LOG_NOTICE, "Clearing %i events of type %s of other nodes from stasis cache.\n", messages_count, event_types[i].name);
1280 ast_log(LOG_NOTICE, "Cleared events of type %s from stasis cache.\n", event_types[i].name);
1281 ao2_t_ref(messages, -1, "Dispose of flushed cache");
1282 }
1283 }
1284
1287 }
1288
1290 ao2_t_ref(corosync_aggregate_topic, -1, "Dispose of topic on cleanup");
1292 }
1293
1294 STASIS_MESSAGE_TYPE_CLEANUP(corosync_ping_message_type);
1295
1297 char meepmeep = 'x';
1298 dispatch_thread.stop = 1;
1299 if (ast_carefulwrite(dispatch_thread.alert_pipe[1], &meepmeep, 1,
1300 5000) == -1) {
1301 ast_log(LOG_ERROR, "Failed to write to pipe: %s (%d)\n",
1302 strerror(errno), errno);
1303 }
1304 pthread_join(dispatch_thread.id, NULL);
1305 }
1306
1307 if (dispatch_thread.alert_pipe[0] != -1) {
1308 close(dispatch_thread.alert_pipe[0]);
1309 dispatch_thread.alert_pipe[0] = -1;
1310 }
1311
1312 if (dispatch_thread.alert_pipe[1] != -1) {
1313 close(dispatch_thread.alert_pipe[1]);
1314 dispatch_thread.alert_pipe[1] = -1;
1315 }
1316
1318 ast_debug(5, "cleanup_module wrlock\n");
1319 if (cpg_handle && (cs_err = cpg_finalize(cpg_handle)) != CS_OK) {
1320 ast_log(LOG_ERROR, "Failed to finalize cpg (%d)\n", (int) cs_err);
1321 }
1322 cpg_handle = 0;
1323
1324 if (cfg_handle && (cs_err = corosync_cfg_finalize(cfg_handle)) != CS_OK) {
1325 ast_log(LOG_ERROR, "Failed to finalize cfg (%d)\n", (int) cs_err);
1326 }
1327 cfg_handle = 0;
1330 ast_debug(5, "cleanup_module unlock\n");
1331 }
1333 nodes = NULL;
1334}
#define ast_log
Definition: astobj2.c:42
#define ao2_t_ref(o, delta, tag)
Definition: astobj2.h:460
#define ao2_callback(c, flags, cb_fn, arg)
ao2_callback() is a generic function that applies cb_fn() to all objects in a container,...
Definition: astobj2.h:1693
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
@ OBJ_NODATA
Definition: astobj2.h:1044
#define ast_debug(level,...)
Log a DEBUG message.
#define LOG_ERROR
#define LOG_NOTICE
#define ast_rwlock_wrlock(a)
Definition: lock.h:236
#define AST_PTHREADT_NULL
Definition: lock.h:66
#define ast_rwlock_trywrlock(a)
Definition: lock.h:238
#define ast_rwlock_unlock(a)
Definition: lock.h:234
int errno
static int clear_node_cache(void *obj, void *arg, int flags)
Definition: res_corosync.c:623
static struct ao2_container * nodes
All the nodes that we're aware of.
Definition: res_corosync.c:65
struct stasis_message_type *(* message_type_fn)(void)
Definition: res_corosync.c:247
static int corosync_node_joined
Join to corosync.
Definition: res_corosync.c:62
static struct stasis_message_router * stasis_router
Our Stasis Message Bus API message router.
Definition: res_corosync.c:71
static struct @433 dispatch_thread
unsigned char subscribe
Definition: res_corosync.c:243
struct stasis_cache *(* cache_fn)(void)
Definition: res_corosync.c:246
struct stasis_forward * sub
Definition: res_corosync.c:240
const char * name
Definition: res_corosync.c:239
struct stasis_topic *(* topic_fn)(void)
Definition: res_corosync.c:245
static struct stasis_topic * corosync_aggregate_topic
The internal topic used for message forwarding and pings.
Definition: res_corosync.c:68
static ast_rwlock_t event_types_lock
Definition: res_corosync.c:51
static corosync_cfg_handle_t cfg_handle
Definition: res_corosync.c:284
static ast_rwlock_t init_cpg_lock
Definition: res_corosync.c:52
static cpg_handle_t cpg_handle
Definition: res_corosync.c:283
static struct @432 event_types[]
#define NULL
Definition: resample.c:96
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
Definition: stasis.h:1515
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
Definition: stasis.c:1548
struct ao2_container * stasis_cache_dump_all(struct stasis_cache *cache, struct stasis_message_type *type)
Dump all entity items from the cache to a subscription.
Definition: stasis_cache.c:757
void stasis_message_router_remove(struct stasis_message_router *router, struct stasis_message_type *message_type)
Remove a route from a message router.
void stasis_message_router_unsubscribe_and_join(struct stasis_message_router *router)
Unsubscribe the router from the upstream topic, blocking until the final message has been processed.
Generic container type.
int ast_carefulwrite(int fd, char *s, int len, int timeoutms)
Try to write string, but wait no more than ms milliseconds before timing out.
Definition: utils.c:1771
#define ARRAY_LEN(a)
Definition: utils.h:666

References ao2_callback, ao2_cleanup, ao2_container_count(), ao2_t_ref, ARRAY_LEN, ast_carefulwrite(), ast_debug, ast_log, AST_PTHREADT_NULL, ast_rwlock_trywrlock, ast_rwlock_unlock, ast_rwlock_wrlock, cache_fn, cfg_handle, clear_node_cache(), corosync_aggregate_topic, corosync_node_joined, cpg_handle, dispatch_thread, errno, event_types, event_types_lock, init_cpg_lock, LOG_ERROR, LOG_NOTICE, message_type_fn, name, nodes, NULL, OBJ_NODATA, stasis_cache_dump_all(), stasis_forward_cancel(), stasis_message_router_remove(), stasis_message_router_unsubscribe_and_join(), STASIS_MESSAGE_TYPE_CLEANUP, stasis_router, sub, subscribe, and topic_fn.

Referenced by load_module(), and unload_module().

◆ clear_node_cache()

static int clear_node_cache ( void *  obj,
void *  arg,
int  flags 
)
static

Definition at line 623 of file res_corosync.c.

624{
625 struct stasis_message *cached_msg = obj;
626 struct stasis_topic *topic = arg;
627 struct stasis_message *msg;
628 struct ast_eid *msg_eid;
629
630 if (!cached_msg) {
631 return 0;
632 }
633
634 msg_eid = (struct ast_eid *)stasis_message_eid(cached_msg);
635 if(msg_eid && ast_eid_cmp(&ast_eid_default, msg_eid))
636 {
637 msg = stasis_cache_clear_create(cached_msg);
638 if (msg) {
639 stasis_publish(topic, msg);
640 ao2_cleanup(msg);
641 }
642 }
643
644 return 0;
645}
struct stasis_message * stasis_cache_clear_create(struct stasis_message *message)
A message which instructs the caching topic to remove an entry from its cache.
Definition: stasis_cache.c:778
const struct ast_eid * stasis_message_eid(const struct stasis_message *msg)
Get the entity id for a stasis_message.
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
Definition: stasis.c:1511
An Entity ID is essentially a MAC address, brief and unique.
Definition: utils.h:813
int ast_eid_cmp(const struct ast_eid *eid1, const struct ast_eid *eid2)
Compare two EIDs.
Definition: utils.c:3094
struct ast_eid ast_eid_default
Global EID.
Definition: options.c:93

References ao2_cleanup, ast_eid_cmp(), ast_eid_default, stasis_cache_clear_create(), stasis_message_eid(), and stasis_publish().

Referenced by cleanup_module(), and cpg_confchg_cb().

◆ corosync_node_alloc()

static struct corosync_node * corosync_node_alloc ( struct ast_event * event)
static

Definition at line 97 of file res_corosync.c.

98{
99 struct corosync_node *node;
100
102 if (!node) {
103 return NULL;
104 }
105
106 memcpy(&node->eid, (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID), sizeof(node->eid));
109
110 return node;
111}
@ AO2_ALLOC_OPT_LOCK_NOLOCK
Definition: astobj2.h:367
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:404
uint32_t ast_event_get_ie_uint(const struct ast_event *event, enum ast_event_ie_type ie_type)
Get the value of an information element that has an integer payload.
Definition: event.c:293
const char * ast_event_get_ie_str(const struct ast_event *event, enum ast_event_ie_type ie_type)
Get the value of an information element that has a string payload.
Definition: event.c:302
const void * ast_event_get_ie_raw(const struct ast_event *event, enum ast_event_ie_type ie_type)
Get the value of an information element that has a raw payload.
Definition: event.c:311
@ AST_EVENT_IE_LOCAL_ADDR
Definition: event_defs.h:281
@ AST_EVENT_IE_EID
Entity ID Used by All events Payload type: RAW This IE indicates which server the event originated fr...
Definition: event_defs.h:272
@ AST_EVENT_IE_NODE_ID
Cluster node ID Used by: Corosync Payload type: UINT.
Definition: event_defs.h:313
int ast_sockaddr_parse(struct ast_sockaddr *addr, const char *str, int flags)
Parse an IPv4 or IPv6 address string.
Definition: netsock2.c:230
Definition: astman.c:222
Definition: test_heap.c:38

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_alloc_options, ast_event_get_ie_raw(), ast_event_get_ie_str(), ast_event_get_ie_uint(), AST_EVENT_IE_EID, AST_EVENT_IE_LOCAL_ADDR, AST_EVENT_IE_NODE_ID, ast_sockaddr_parse(), NULL, and PARSE_PORT_IGNORE.

Referenced by publish_cluster_discovery_to_stasis().

◆ corosync_node_cmp_fn()

static int corosync_node_cmp_fn ( void *  obj,
void *  arg,
int  flags 
)
static

Definition at line 133 of file res_corosync.c.

134{
135 struct corosync_node *left = obj;
136 struct corosync_node *right = arg;
137 const int *id = arg;
138 int cmp;
139
140 switch (flags & OBJ_SEARCH_MASK) {
141 case OBJ_SEARCH_OBJECT:
142 id = &right->id;
143 /* Fall through */
144 case OBJ_SEARCH_KEY:
145 cmp = (left->id == *id);
146 break;
147 case OBJ_SEARCH_PARTIAL_KEY:
148 cmp = (left->id == right->id);
149 break;
150 default:
151 /* Sort can only work on something with a full or partial key. */
152 ast_assert(0);
153 cmp = 1;
154 break;
155 }
156 return cmp ? CMP_MATCH : 0;
157}
@ CMP_MATCH
Definition: astobj2.h:1027
@ OBJ_SEARCH_PARTIAL_KEY
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
Definition: astobj2.h:1116
@ OBJ_SEARCH_OBJECT
The arg parameter is an object of the same type.
Definition: astobj2.h:1087
@ OBJ_SEARCH_MASK
Search option field mask.
Definition: astobj2.h:1072
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1101
pthread_t id
Definition: res_corosync.c:275
#define ast_assert(a)
Definition: utils.h:739

References ast_assert, CMP_MATCH, corosync_node::id, id, OBJ_SEARCH_KEY, OBJ_SEARCH_MASK, OBJ_SEARCH_OBJECT, and OBJ_SEARCH_PARTIAL_KEY.

Referenced by load_module().

◆ corosync_node_hash_fn()

static int corosync_node_hash_fn ( const void *  obj,
const int  flags 
)
static

Definition at line 113 of file res_corosync.c.

114{
115 const struct corosync_node *node;
116 const int *id;
117
118 switch (flags & OBJ_SEARCH_MASK) {
119 case OBJ_SEARCH_KEY:
120 id = obj;
121 break;
122 case OBJ_SEARCH_OBJECT:
123 node = obj;
124 id = &node->id;
125 break;
126 default:
127 ast_assert(0);
128 return 0;
129 }
130 return *id;
131}

References ast_assert, id, OBJ_SEARCH_KEY, OBJ_SEARCH_MASK, and OBJ_SEARCH_OBJECT.

Referenced by load_module().

◆ corosync_ping()

static char * corosync_ping ( struct ast_cli_entry * e,
int  cmd,
struct ast_cli_args * a 
)
static

Definition at line 1059 of file res_corosync.c.

1060{
1061 struct ast_event *event;
1062
1063 switch (cmd) {
1064 case CLI_INIT:
1065 e->command = "corosync ping";
1066 e->usage =
1067 "Usage: corosync ping\n"
1068 " Send a test ping to the cluster.\n"
1069 "A NOTICE will be in the log for every ping received\n"
1070 "on a server.\n If you send a ping, you should see a NOTICE\n"
1071 "in the log for every server in the cluster.\n";
1072 return NULL;
1073
1074 case CLI_GENERATE:
1075 return NULL; /* no completion */
1076 }
1077
1078 if (a->argc != e->args) {
1079 return CLI_SHOWUSAGE;
1080 }
1081
1083
1084 if (!event) {
1085 return CLI_FAILURE;
1086 }
1087
1088 event_types[AST_EVENT_PING].publish_to_stasis(event);
1089
1091 return CLI_SUCCESS;
1092}
#define CLI_SHOWUSAGE
Definition: cli.h:45
#define CLI_SUCCESS
Definition: cli.h:44
@ CLI_INIT
Definition: cli.h:152
@ CLI_GENERATE
Definition: cli.h:153
#define CLI_FAILURE
Definition: cli.h:46
struct ast_event * ast_event_new(enum ast_event_type event_type,...)
Create a new event.
Definition: event.c:402
void ast_event_destroy(struct ast_event *event)
Destroy an event.
Definition: event.c:524
@ AST_EVENT_IE_END
Definition: event_defs.h:70
@ AST_EVENT_PING
Definition: event_defs.h:60
int args
This gets set in ast_cli_register()
Definition: cli.h:185
char * command
Definition: cli.h:186
const char * usage
Definition: cli.h:177
An event.
Definition: event.c:81

References a, ast_cli_entry::args, ast_event_destroy(), AST_EVENT_IE_END, ast_event_new(), AST_EVENT_PING, CLI_FAILURE, CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, event_types, NULL, and ast_cli_entry::usage.

◆ corosync_ping_payload_dtor()

static void corosync_ping_payload_dtor ( void *  obj)
static

Destructor for the corosync_ping_payload wrapper object.

Definition at line 167 of file res_corosync.c.

168{
169 struct corosync_ping_payload *payload = obj;
170
171 ast_free(payload->event);
172}
#define ast_free(a)
Definition: astmm.h:180
A payload wrapper around a corosync ping event.
Definition: res_corosync.c:161
struct ast_event * event
Definition: res_corosync.c:163

References ast_free, and corosync_ping_payload::event.

Referenced by publish_corosync_ping_to_stasis().

◆ corosync_ping_to_event()

static struct ast_event * corosync_ping_to_event ( struct stasis_message * message)
static

Convert a Corosync PING to an ast_event.

Definition at line 175 of file res_corosync.c.

176{
177 struct corosync_ping_payload *payload;
178 struct ast_event *event;
179 struct ast_eid *event_eid;
180
181 if (!message) {
182 return NULL;
183 }
184
185 payload = stasis_message_data(message);
186
187 if (!payload->event) {
188 return NULL;
189 }
190
191 event_eid = (struct ast_eid *)ast_event_get_ie_raw(payload->event, AST_EVENT_IE_EID);
192
194 AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW, event_eid, sizeof(*event_eid),
196
197 return event;
198}
@ AST_EVENT_IE_PLTYPE_RAW
Definition: event_defs.h:330
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.

References ast_event_get_ie_raw(), AST_EVENT_IE_EID, AST_EVENT_IE_END, AST_EVENT_IE_PLTYPE_RAW, ast_event_new(), AST_EVENT_PING, corosync_ping_payload::event, NULL, and stasis_message_data().

◆ corosync_show_config()

static char * corosync_show_config ( struct ast_cli_entry * e,
int  cmd,
struct ast_cli_args * a 
)
static

Definition at line 1094 of file res_corosync.c.

1095{
1096 unsigned int i;
1097
1098 switch (cmd) {
1099 case CLI_INIT:
1100 e->command = "corosync show config";
1101 e->usage =
1102 "Usage: corosync show config\n"
1103 " Show configuration loaded from res_corosync.conf\n";
1104 return NULL;
1105
1106 case CLI_GENERATE:
1107 return NULL; /* no completion */
1108 }
1109
1110 if (a->argc != e->args) {
1111 return CLI_SHOWUSAGE;
1112 }
1113
1114 ast_cli(a->fd, "\n"
1115 "=============================================================\n"
1116 "=== res_corosync config =====================================\n"
1117 "=============================================================\n"
1118 "===\n");
1119
1121 ast_debug(5, "corosync_show_config rdlock\n");
1122 for (i = 0; i < ARRAY_LEN(event_types); i++) {
1123 if (event_types[i].publish) {
1124 ast_cli(a->fd, "=== ==> Publishing Event Type: %s\n",
1125 event_types[i].name);
1126 }
1127 if (event_types[i].subscribe) {
1128 ast_cli(a->fd, "=== ==> Subscribing to Event Type: %s\n",
1129 event_types[i].name);
1130 }
1131 }
1133 ast_debug(5, "corosync_show_config unlock\n");
1134
1135 ast_cli(a->fd, "===\n"
1136 "=============================================================\n"
1137 "\n");
1138
1139 return CLI_SUCCESS;
1140}
void ast_cli(int fd, const char *fmt,...)
Definition: clicompat.c:6
#define ast_rwlock_rdlock(a)
Definition: lock.h:235
unsigned char publish
Definition: res_corosync.c:241

References a, ast_cli_entry::args, ARRAY_LEN, ast_cli(), ast_debug, ast_rwlock_rdlock, ast_rwlock_unlock, CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, event_types, event_types_lock, NULL, publish, subscribe, and ast_cli_entry::usage.

◆ corosync_show_members()

static char * corosync_show_members ( struct ast_cli_entry * e,
int  cmd,
struct ast_cli_args * a 
)
static

Definition at line 965 of file res_corosync.c.

966{
967 cs_error_t cs_err;
968 cpg_iteration_handle_t cpg_iter;
969 struct cpg_iteration_description_t cpg_desc;
970 unsigned int i;
971
972 switch (cmd) {
973 case CLI_INIT:
974 e->command = "corosync show members";
975 e->usage =
976 "Usage: corosync show members\n"
977 " Show corosync cluster members\n";
978 return NULL;
979
980 case CLI_GENERATE:
981 return NULL; /* no completion */
982 }
983
984 if (a->argc != e->args) {
985 return CLI_SHOWUSAGE;
986 }
987
989 ast_debug(5, "corosync_show_members rdlock\n");
990 cs_err = cpg_iteration_initialize(cpg_handle, CPG_ITERATION_ALL, NULL, &cpg_iter);
991
992 if (cs_err != CS_OK) {
993 ast_cli(a->fd, "Failed to initialize CPG iterator: %u.\n", cs_err);
994 cpg_iteration_finalize(cpg_iter);
996 ast_debug(5, "corosync_show_members unlock\n");
997 return CLI_FAILURE;
998 }
999
1000 ast_cli(a->fd, "\n"
1001 "=============================================================\n"
1002 "=== Cluster members =========================================\n"
1003 "=============================================================\n"
1004 "===\n");
1005
1006 for (i = 1, cs_err = cpg_iteration_next(cpg_iter, &cpg_desc);
1007 cs_err == CS_OK;
1008 cs_err = cpg_iteration_next(cpg_iter, &cpg_desc), i++) {
1009 #ifdef HAVE_COROSYNC_CFG_STATE_TRACK
1010 corosync_cfg_node_address_t addrs[8];
1011 int num_addrs = 0;
1012 unsigned int j;
1013 #endif
1014
1015 ast_cli(a->fd, "=== Node %u\n", i);
1016 ast_cli(a->fd, "=== --> Group: %s\n", cpg_desc.group.value);
1017
1018 #ifdef HAVE_COROSYNC_CFG_STATE_TRACK
1019 /*
1020 * Corosync 2.x cfg lib needs to allocate 1M on stack after calling
1021 * corosync_cfg_get_node_addrs. netconsole thread has allocated only 0.5M
1022 * resulting in crash.
1023 */
1024 cs_err = corosync_cfg_get_node_addrs(cfg_handle, cpg_desc.nodeid,
1025 ARRAY_LEN(addrs), &num_addrs, addrs);
1026 if (cs_err != CS_OK) {
1027 ast_log(LOG_WARNING, "Failed to get node addresses\n");
1028 continue;
1029 }
1030
1031 for (j = 0; j < num_addrs; j++) {
1032 struct sockaddr *sa = (struct sockaddr *) addrs[j].address;
1033 size_t sa_len = (size_t) addrs[j].address_length;
1034 char buf[128];
1035
1036 getnameinfo(sa, sa_len, buf, sizeof(buf), NULL, 0, NI_NUMERICHOST);
1037
1038 ast_cli(a->fd, "=== --> Address %u: %s\n", j + 1, buf);
1039 }
1040 #else
1041 ast_cli(a->fd, "=== --> Nodeid: %"PRIu32"\n", cpg_desc.nodeid);
1042 #endif
1043 }
1044
1045 ast_cli(a->fd, "===\n"
1046 "=============================================================\n"
1047 "\n");
1048
1049 cpg_iteration_finalize(cpg_iter);
1051 ast_debug(5, "corosync_show_members unlock\n");
1052 } else {
1053 ast_cli(a->fd, "Failed to initialize CPG iterator: initializing CPG.\n");
1054 }
1055
1056 return CLI_SUCCESS;
1057}
char buf[BUFSIZE]
Definition: eagi_proxy.c:66
char * address
Definition: f2c.h:59
#define LOG_WARNING
#define ast_rwlock_tryrdlock(a)
Definition: lock.h:237

References a, ast_cli_entry::args, ARRAY_LEN, ast_cli(), ast_debug, ast_log, ast_rwlock_tryrdlock, ast_rwlock_unlock, buf, cfg_handle, CLI_FAILURE, CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, cpg_handle, init_cpg_lock, LOG_WARNING, NULL, and ast_cli_entry::usage.

◆ corosync_topic()

static struct stasis_topic * corosync_topic ( void  )
static

Internal accessor for our topic.

Definition at line 74 of file res_corosync.c.

75{
76 return corosync_aggregate_topic;
77}

References corosync_aggregate_topic.

Referenced by load_general_config(), and publish_corosync_ping_to_stasis().

◆ cpg_confchg_cb()

static void cpg_confchg_cb ( cpg_handle_t  handle,
const struct cpg_name *  group_name,
const struct cpg_address *  member_list,
size_t  member_list_entries,
const struct cpg_address *  left_list,
size_t  left_list_entries,
const struct cpg_address *  joined_list,
size_t  joined_list_entries 
)
static

Definition at line 647 of file res_corosync.c.

651{
652 unsigned int i;
653
654
655 for (i = 0; i < left_list_entries; i++) {
656 const struct cpg_address *cpg_node = &left_list[i];
657 struct corosync_node* node;
658 unsigned int j;
659
660 node = ao2_find(nodes, &cpg_node->nodeid, OBJ_UNLINK | OBJ_SEARCH_KEY);
661 if (!node) {
662 continue;
663 }
664
665 for (j = 0; j < ARRAY_LEN(event_types); j++) {
666 struct ao2_container *messages;
667 int messages_count;
668
670 ast_debug(5, "cpg_confchg_cb rdlock\n");
671 if (!event_types[j].subscribe) {
673 ast_debug(5, "cpg_confchg_cb unlock\n");
674 continue;
675 }
676
679 ast_debug(5, "cpg_confchg_cb unlock\n");
680 continue;
681 }
683 ast_debug(5, "cpg_confchg_cb unlock\n");
684
686
687 messages_count = ao2_container_count(messages);
688 ast_log(LOG_NOTICE, "Clearing %i events of type %s of node %i from stasis cache.\n", messages_count, event_types[j].name, node->id);
690 ast_log(LOG_NOTICE, "Cleared events of type %s from stasis cache.\n", event_types[j].name);
691
692 ao2_t_ref(messages, -1, "Dispose of flushed cache");
693 }
694
696 ao2_ref(node, -1);
697 }
698
699 /* If any new nodes have joined, dump our cache of events we are publishing
700 * that originated from this server. */
701 if (!joined_list_entries) {
702 return;
703 }
704
705 for (i = 0; i < ARRAY_LEN(event_types); i++) {
706 struct ao2_container *messages;
707 int messages_count;
708
710 ast_debug(5, "cpg_confchg_cb rdlock\n");
711 if (!event_types[i].publish) {
713 ast_debug(5, "cpg_confchg_cb unlock\n");
714 continue;
715 }
716
719 ast_debug(5, "cpg_confchg_cb unlock\n");
720 continue;
721 }
723 ast_debug(5, "cpg_confchg_cb unlock\n");
724
726
727 messages_count = ao2_container_count(messages);
728 ast_log(LOG_NOTICE, "Sending %i events of type %s to corosync.\n", messages_count, event_types[i].name);
730 ast_log(LOG_NOTICE, "Sent events of type %s to corosync.\n", event_types[i].name);
731
732 ao2_t_ref(messages, -1, "Dispose of dumped cache");
733 }
734}
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1736
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
@ OBJ_UNLINK
Definition: astobj2.h:1039
static void publish_cluster_discovery_to_stasis_full(struct corosync_node *node, int joined)
Publish cluster discovery to Stasis Message Bus API.
Definition: res_corosync.c:303
static int dump_cache_cb(void *obj, void *arg, int flags)
Definition: res_corosync.c:610
struct ao2_container * stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
Dump cached items to a subscription for a specific entity.
Definition: stasis_cache.c:718

References ao2_callback, ao2_container_count(), ao2_find, ao2_ref, ao2_t_ref, ARRAY_LEN, ast_debug, ast_eid_default, ast_log, ast_rwlock_rdlock, ast_rwlock_unlock, cache_fn, clear_node_cache(), dump_cache_cb(), event_types, event_types_lock, LOG_NOTICE, message_type_fn, name, nodes, NULL, OBJ_NODATA, OBJ_SEARCH_KEY, OBJ_UNLINK, publish, publish_cluster_discovery_to_stasis_full(), stasis_cache_dump_by_eid(), subscribe, and topic_fn.

◆ cpg_deliver_cb()

static void cpg_deliver_cb ( cpg_handle_t  handle,
const struct cpg_name *  group_name,
uint32_t  nodeid,
uint32_t  pid,
void *  msg,
size_t  msg_len 
)
static

Definition at line 484 of file res_corosync.c.

486{
487 struct ast_event *event;
488 void (*publish_handler)(struct ast_event *) = NULL;
489 enum ast_event_type event_type;
490 struct ast_eid *event_eid;
491
492 if (msg_len < ast_event_minimum_length()) {
493 ast_debug(1, "Ignoring event that's too small. %u < %u\n",
494 (unsigned int) msg_len,
495 (unsigned int) ast_event_minimum_length());
496 return;
497 }
498
499 event_eid = (struct ast_eid *)ast_event_get_ie_raw(msg, AST_EVENT_IE_EID);
500 if (!event_eid || !ast_eid_cmp(&ast_eid_default, event_eid)) {
501 /* Don't feed events back in that originated locally. */
502 return;
503 }
504
505 event_type = ast_event_get_type(msg);
506 if (event_type > AST_EVENT_TOTAL) {
507 /* Egads, we don't support this */
508 return;
509 }
510
512 ast_debug(5, "cpg_deliver_cb rdlock\n");
513 publish_handler = event_types[event_type].publish_to_stasis;
514 if (!event_types[event_type].subscribe || !publish_handler) {
515 /* We are not configured to subscribe to these events or
516 we have no way to publish it internally. */
518 ast_debug(5, "cpg_deliver_cb unlock\n");
519 return;
520 }
522 ast_debug(5, "cpg_deliver_cb unlock\n");
523
524 if (!(event = ast_malloc(msg_len))) {
525 return;
526 }
527
528 memcpy(event, msg, msg_len);
529
530 if (event_type == AST_EVENT_PING) {
531 const struct ast_eid *eid;
532 char buf[128] = "";
533
535 ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid);
536 ast_log(LOG_NOTICE, "Got event PING from server with EID: '%s'\n", buf);
537 }
538 ast_debug(5, "Publishing event %s (%u) to stasis\n",
539 ast_event_get_type_name(event), event_type);
540 publish_handler(event);
542}
#define ast_malloc(len)
A wrapper for malloc()
Definition: astmm.h:191
size_t ast_event_minimum_length(void)
Get the minimum length of an ast_event.
Definition: event.c:529
const char * ast_event_get_type_name(const struct ast_event *event)
Get the string representation of the type of the given event.
Definition: event.c:194
enum ast_event_type ast_event_get_type(const struct ast_event *event)
Get the type for an event.
Definition: event.c:288
ast_event_type
Definition: event_defs.h:28
@ AST_EVENT_TOTAL
Definition: event_defs.h:64
unsigned char eid[6]
Definition: utils.h:814
char * ast_eid_to_str(char *s, int maxlen, struct ast_eid *eid)
Convert an EID to a string.
Definition: utils.c:2839

References ast_debug, ast_eid_cmp(), ast_eid_default, ast_eid_to_str(), ast_event_get_ie_raw(), ast_event_get_type(), ast_event_get_type_name(), AST_EVENT_IE_EID, ast_event_minimum_length(), AST_EVENT_PING, AST_EVENT_TOTAL, ast_free, ast_log, ast_malloc, ast_rwlock_rdlock, ast_rwlock_unlock, buf, ast_eid::eid, event_types, event_types_lock, LOG_NOTICE, NULL, and subscribe.

◆ dispatch_thread_handler()

static void * dispatch_thread_handler ( void *  data)
static

Definition at line 782 of file res_corosync.c.

783{
784 cs_error_t cs_err;
785 struct pollfd pfd[3] = {
786 { .events = POLLIN, },
787 { .events = POLLIN, },
788 { .events = POLLIN, },
789 };
790
792 ast_debug(5, "dispatch_thread_handler rdlock\n");
793 if ((cs_err = cpg_fd_get(cpg_handle, &pfd[0].fd)) != CS_OK) {
794 ast_log(LOG_ERROR, "Failed to get CPG fd. This module is now broken.\n");
796 ast_debug(5, "dispatch_thread_handler unlock\n");
797 return NULL;
798 }
799
800 if ((cs_err = corosync_cfg_fd_get(cfg_handle, &pfd[1].fd)) != CS_OK) {
801 ast_log(LOG_ERROR, "Failed to get CFG fd. This module is now broken.\n");
803 ast_debug(5, "dispatch_thread_handler unlock\n");
804 return NULL;
805 }
806
807 pfd[2].fd = dispatch_thread.alert_pipe[0];
809 ast_debug(5, "dispatch_thread_handler unlock\n");
810 } else {
811 ast_log(LOG_ERROR, "Failed to get fd: initializing CPG. This module is now broken.\n");
812 return NULL;
813 }
815 while (!dispatch_thread.stop) {
816 int res;
817
818 cs_err = CS_OK;
819
820 pfd[0].revents = 0;
821 pfd[1].revents = 0;
822 pfd[2].revents = 0;
823
824 res = ast_poll(pfd, ARRAY_LEN(pfd), COROSYNC_POLL_TIMEOUT);
825 if (res == -1 && errno != EINTR && errno != EAGAIN) {
826 ast_log(LOG_ERROR, "poll() error: %s (%d)\n", strerror(errno), errno);
827 cs_err = CS_ERR_BAD_HANDLE;
828 } else if (res == 0) {
829 unsigned int local_nodeid;
830
832 ast_debug(5, "dispatch_thread_handler rdlock\n");
833 if ((cs_err = cpg_local_get(cpg_handle, &local_nodeid)) == CS_OK) {
834 struct cpg_name name;
835 struct cpg_address address[CPG_MEMBERS_MAX];
836 int entries = CPG_MEMBERS_MAX;
837
838 ast_copy_string(name.value, "asterisk", sizeof(name.value));
839 name.length = strlen(name.value);
840 if ((cs_err = cpg_membership_get(cpg_handle, &name, address, &entries)) == CS_OK) {
841 int i;
842 int found = 0;
843
844 ast_debug(1, "CPG group has %i node membership\n", entries);
845 for (i = 0; (i < entries) && !found; i++) {
846 if (address[i].nodeid == local_nodeid)
847 found = 1;
848 }
849 if (!found) {
850 ast_log(LOG_WARNING, "Failed to check CPG node membership\n");
852 cs_err = CS_ERR_BAD_HANDLE;
853 }
854 } else {
855 ast_log(LOG_WARNING, "Failed to get CPG node membership: %u\n", cs_err);
857 cs_err = CS_ERR_BAD_HANDLE;
858 }
859 } else {
860 ast_log(LOG_WARNING, "Failed to get CPG local node id: %u\n", cs_err);
862 cs_err = CS_ERR_BAD_HANDLE;
863 }
865 ast_debug(5, "dispatch_thread_handler unlock\n");
866 } else {
867 ast_log(LOG_WARNING, "Failed to check CPG node membership: initializing CPG.\n");
869 cs_err = CS_ERR_BAD_HANDLE;
870 }
871 } else {
873 ast_debug(5, "dispatch_thread_handler rdlock\n");
874 if (pfd[0].revents & POLLIN) {
875 if ((cs_err = cpg_dispatch(cpg_handle, CS_DISPATCH_ALL)) != CS_OK) {
876 ast_log(LOG_WARNING, "Failed CPG dispatch: %u\n", cs_err);
877 }
878 }
879
880 if (pfd[1].revents & POLLIN) {
881 if ((cs_err = corosync_cfg_dispatch(cfg_handle, CS_DISPATCH_ALL)) != CS_OK) {
882 ast_log(LOG_WARNING, "Failed CFG dispatch: %u\n", cs_err);
883 }
884 }
886 ast_debug(5, "dispatch_thread_handler unlock\n");
887 } else {
888 ast_log(LOG_WARNING, "Failed to dispatch: initializing CPG.\n");
889 }
890 }
891 if (cs_err == CS_ERR_LIBRARY || cs_err == CS_ERR_BAD_HANDLE) {
892
893 /* If corosync gets restarted out from under Asterisk, try to recover. */
894
895 ast_log(LOG_NOTICE, "Attempting to recover from corosync failure.\n");
896
898 struct cpg_name name;
899 ast_debug(5, "dispatch_thread_handler wrlock\n");
900
902 if (cpg_handle && (cs_err = cpg_finalize(cpg_handle)) != CS_OK) {
903 ast_log(LOG_ERROR, "Failed to finalize cpg (%d)\n", (int) cs_err);
904 }
905
906 if (cfg_handle && (cs_err = corosync_cfg_finalize(cfg_handle)) != CS_OK) {
907 ast_log(LOG_ERROR, "Failed to finalize cfg (%d)\n", (int) cs_err);
908 }
909
910 if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) {
911 ast_log(LOG_ERROR, "Failed to initialize cfg (%d)\n", (int) cs_err);
913 ast_debug(5, "dispatch_thread_handler unlock\n");
914 sleep(5);
915 continue;
916 }
917
918 if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks) != CS_OK)) {
919 ast_log(LOG_ERROR, "Failed to initialize cpg (%d)\n", (int) cs_err);
921 ast_debug(5, "dispatch_thread_handler unlock\n");
922 sleep(5);
923 continue;
924 }
925
926 if ((cs_err = cpg_fd_get(cpg_handle, &pfd[0].fd)) != CS_OK) {
927 ast_log(LOG_ERROR, "Failed to get CPG fd.\n");
929 ast_debug(5, "dispatch_thread_handler unlock\n");
930 sleep(5);
931 continue;
932 }
933
934 if ((cs_err = corosync_cfg_fd_get(cfg_handle, &pfd[1].fd)) != CS_OK) {
935 ast_log(LOG_ERROR, "Failed to get CFG fd.\n");
937 ast_debug(5, "dispatch_thread_handler unlock\n");
938 sleep(5);
939 continue;
940 }
941
942 ast_copy_string(name.value, "asterisk", sizeof(name.value));
943 name.length = strlen(name.value);
944 if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) {
945 ast_log(LOG_ERROR, "Failed to join cpg (%d)\n", (int) cs_err);
947 ast_debug(5, "dispatch_thread_handler unlock\n");
948 sleep(5);
949 continue;
950 }
953 ast_debug(5, "dispatch_thread_handler unlock\n");
954 ast_log(LOG_NOTICE, "Corosync recovery complete.\n");
956 } else {
957 ast_log(LOG_NOTICE, "Failed to recover from corosync failure: initializing CPG.\n");
958 }
959 }
960 }
961
962 return NULL;
963}
#define ast_poll(a, b, c)
Definition: poll-compat.h:88
static cpg_callbacks_t cpg_callbacks
Definition: res_corosync.c:466
#define COROSYNC_POLL_TIMEOUT
Timeout for Corosync's poll process.
Definition: res_corosync.c:55
static corosync_cfg_callbacks_t cfg_callbacks
Definition: res_corosync.c:295
static void send_cluster_notify(void)
Informs the cluster of our EID and our IP addresses.
Definition: res_corosync.c:737
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
Definition: strings.h:425

References ARRAY_LEN, ast_copy_string(), ast_debug, ast_log, ast_poll, ast_rwlock_tryrdlock, ast_rwlock_trywrlock, ast_rwlock_unlock, cfg_callbacks, cfg_handle, corosync_node_joined, COROSYNC_POLL_TIMEOUT, cpg_callbacks, cpg_handle, dispatch_thread, errno, init_cpg_lock, LOG_ERROR, LOG_NOTICE, LOG_WARNING, name, NULL, and send_cluster_notify().

Referenced by load_module().

◆ dump_cache_cb()

static int dump_cache_cb ( void *  obj,
void *  arg,
int  flags 
)
static

Definition at line 610 of file res_corosync.c.

611{
612 struct stasis_message *message = obj;
613
614 if (!message) {
615 return 0;
616 }
617
619
620 return 0;
621}
static void publish_to_corosync(struct stasis_message *message)
Definition: res_corosync.c:571

References publish_to_corosync().

Referenced by cpg_confchg_cb().

◆ load_config()

static int load_config ( unsigned int  reload)
static

Definition at line 1222 of file res_corosync.c.

1223{
1224 static const char filename[] = "res_corosync.conf";
1225 struct ast_config *cfg;
1226 const char *cat = NULL;
1227 struct ast_flags config_flags = { 0 };
1228 int res = 0;
1229
1230 cfg = ast_config_load(filename, config_flags);
1231
1233 return -1;
1234 }
1235
1236 while ((cat = ast_category_browse(cfg, cat))) {
1237 if (!strcasecmp(cat, "general")) {
1238 res = load_general_config(cfg);
1239 } else {
1240 ast_log(LOG_WARNING, "Unknown configuration section '%s'\n", cat);
1241 }
1242 }
1243
1244 ast_config_destroy(cfg);
1245
1246 return res;
1247}
#define ast_config_load(filename, flags)
Load a config file.
char * ast_category_browse(struct ast_config *config, const char *prev_name)
Browse categories.
Definition: extconf.c:3326
#define CONFIG_STATUS_FILEMISSING
#define CONFIG_STATUS_FILEINVALID
void ast_config_destroy(struct ast_config *cfg)
Destroys a config.
Definition: extconf.c:1289
static int load_general_config(struct ast_config *cfg)
Structure used to handle boolean flags.
Definition: utils.h:199

References ast_category_browse(), ast_config_destroy(), ast_config_load, ast_log, CONFIG_STATUS_FILEINVALID, CONFIG_STATUS_FILEMISSING, load_general_config(), LOG_WARNING, and NULL.

Referenced by load_module().

◆ load_general_config()

static int load_general_config ( struct ast_config *  cfg)
static

Definition at line 1177 of file res_corosync.c.

1178{
1179 struct ast_variable *v;
1180 int res = 0;
1181 unsigned int i;
1182
1184 ast_debug(5, "load_general_config wrlock\n");
1185
1186 for (i = 0; i < ARRAY_LEN(event_types); i++) {
1187 event_types[i].publish = event_types[i].publish_default;
1188 event_types[i].subscribe = event_types[i].subscribe_default;
1189 }
1190
1191 for (v = ast_variable_browse(cfg, "general"); v && !res; v = v->next) {
1192 if (!strcasecmp(v->name, "publish_event")) {
1193 res = set_event(v->value, PUBLISH);
1194 } else if (!strcasecmp(v->name, "subscribe_event")) {
1195 res = set_event(v->value, SUBSCRIBE);
1196 } else {
1197 ast_log(LOG_WARNING, "Unknown option '%s'\n", v->name);
1198 }
1199 }
1200
1201 for (i = 0; i < ARRAY_LEN(event_types); i++) {
1202 if (event_types[i].publish && !event_types[i].sub) {
1204 corosync_topic());
1208 NULL);
1209 } else if (!event_types[i].publish && event_types[i].sub) {
1213 }
1214 }
1215
1217 ast_debug(5, "load_general_config unlock\n");
1218
1219 return res;
1220}
struct ast_variable * ast_variable_browse(const struct ast_config *config, const char *category_name)
Definition: extconf.c:1215
static struct stasis_topic * corosync_topic(void)
Internal accessor for our topic.
Definition: res_corosync.c:74
static void stasis_message_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Definition: res_corosync.c:601
static int set_event(const char *event_type, int pubsub)
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition: stasis.c:1578
int stasis_message_router_add(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
Add a route to a message router.
Structure for variables, used for configurations and for channel variables.
struct ast_variable * next

References ARRAY_LEN, ast_debug, ast_log, ast_rwlock_unlock, ast_rwlock_wrlock, ast_variable_browse(), corosync_topic(), event_types, event_types_lock, LOG_WARNING, message_type_fn, ast_variable::name, ast_variable::next, NULL, publish, PUBLISH, set_event(), stasis_forward_all(), stasis_forward_cancel(), stasis_message_cb(), stasis_message_router_add(), stasis_message_router_remove(), stasis_router, sub, SUBSCRIBE, topic_fn, and ast_variable::value.

Referenced by load_config().

◆ load_module()

static int load_module ( void  )
static

Definition at line 1336 of file res_corosync.c.

1337{
1338 cs_error_t cs_err;
1339 struct cpg_name name;
1340
1342 ast_log(LOG_ERROR, "Entity ID is not set.\n");
1344 }
1345
1348 if (!nodes) {
1349 goto failed;
1350 }
1351
1352 corosync_aggregate_topic = stasis_topic_create("corosync:aggregator");
1354 ast_log(AST_LOG_ERROR, "Failed to create stasis topic for corosync\n");
1355 goto failed;
1356 }
1357
1359 if (!stasis_router) {
1360 ast_log(AST_LOG_ERROR, "Failed to create message router for corosync topic\n");
1361 goto failed;
1362 }
1365
1366 if (STASIS_MESSAGE_TYPE_INIT(corosync_ping_message_type) != 0) {
1367 ast_log(AST_LOG_ERROR, "Failed to initialize corosync ping message type\n");
1368 goto failed;
1369 }
1370
1371 if (load_config(0)) {
1372 /* simply not configured is not a fatal error */
1373 goto failed;
1374 }
1375
1378 ast_debug(5, "load_module wrlock\n");
1379 if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) {
1380 ast_log(LOG_ERROR, "Failed to initialize cfg: (%d)\n", (int) cs_err);
1382 ast_debug(5, "load_module unlock\n");
1383 goto failed;
1384 }
1385
1386 if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks)) != CS_OK) {
1387 ast_log(LOG_ERROR, "Failed to initialize cpg: (%d)\n", (int) cs_err);
1389 ast_debug(5, "load_module unlock\n");
1390 goto failed;
1391 }
1392
1393 ast_copy_string(name.value, "asterisk", sizeof(name.value));
1394 name.length = strlen(name.value);
1395
1396 if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) {
1397 ast_log(LOG_ERROR, "Failed to join: (%d)\n", (int) cs_err);
1399 ast_debug(5, "load_module unlock\n");
1400 goto failed;
1401 }
1402
1403 if (pipe(dispatch_thread.alert_pipe) == -1) {
1404 ast_log(LOG_ERROR, "Failed to create alert pipe: %s (%d)\n",
1405 strerror(errno), errno);
1407 ast_debug(5, "load_module unlock\n");
1408 goto failed;
1409 }
1411
1413 ast_debug(5, "load_module unlock\n");
1416 ast_log(LOG_ERROR, "Error starting CPG dispatch thread.\n");
1417 goto failed;
1418 }
1419
1421 } else {
1422 goto failed;
1423 }
1424
1426
1427failed:
1429
1431}
@ AO2_ALLOC_OPT_LOCK_MUTEX
Definition: astobj2.h:363
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition: astobj2.h:1303
#define ast_cli_register_multiple(e, len)
Register multiple commands.
Definition: cli.h:265
#define AST_LOG_ERROR
@ AST_MODULE_LOAD_SUCCESS
Definition: module.h:70
@ AST_MODULE_LOAD_DECLINE
Module has failed to load, may be in an inconsistent state.
Definition: module.h:78
static void * dispatch_thread_handler(void *data)
Definition: res_corosync.c:782
static struct ast_cli_entry corosync_cli[]
#define corosync_pthread_create_background(a, b, c, d)
Version of pthread_create to ensure stack is large enough.
Definition: res_corosync.c:92
static void cleanup_module(void)
static int corosync_node_hash_fn(const void *obj, const int flags)
Definition: res_corosync.c:113
static int load_config(unsigned int reload)
static int corosync_node_cmp_fn(void *obj, void *arg, int flags)
Definition: res_corosync.c:133
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition: stasis.c:617
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
Definition: stasis.h:1493
int stasis_message_router_set_congestion_limits(struct stasis_message_router *router, long low_water, long high_water)
Set the high and low alert water marks of the stasis message router.
#define stasis_message_router_create(topic)
Create a new message router object.
#define AST_TASKPROCESSOR_HIGH_WATER_LEVEL
Definition: taskprocessor.h:64
int ast_eid_is_empty(const struct ast_eid *eid)
Check if EID is empty.
Definition: utils.c:3099

References AO2_ALLOC_OPT_LOCK_MUTEX, ao2_container_alloc_hash, ARRAY_LEN, ast_cli_register_multiple, ast_copy_string(), ast_debug, ast_eid_default, ast_eid_is_empty(), ast_log, AST_LOG_ERROR, AST_MODULE_LOAD_DECLINE, AST_MODULE_LOAD_SUCCESS, ast_rwlock_trywrlock, ast_rwlock_unlock, AST_TASKPROCESSOR_HIGH_WATER_LEVEL, cfg_callbacks, cfg_handle, cleanup_module(), corosync_aggregate_topic, corosync_cli, corosync_node_cmp_fn(), corosync_node_hash_fn(), corosync_node_joined, corosync_pthread_create_background, cpg_callbacks, cpg_handle, dispatch_thread, dispatch_thread_handler(), errno, init_cpg_lock, load_config(), LOG_ERROR, name, nodes, NULL, stasis_message_router_create, stasis_message_router_set_congestion_limits(), STASIS_MESSAGE_TYPE_INIT, stasis_router, and stasis_topic_create().

◆ publish_cluster_discovery_to_stasis()

static void publish_cluster_discovery_to_stasis ( struct ast_event *  event)
static

Publish a received cluster discovery ast_event to Stasis Message Bus API.

Definition at line 351 of file res_corosync.c.

352{
353 struct corosync_node *node;
355 struct ast_eid *event_eid;
356
358
359 event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
360 if (!event_eid || !ast_eid_cmp(&ast_eid_default, event_eid)) {
361 /* Don't feed events back in that originated locally. */
362 return;
363 }
364
367 if (node) {
368 /* We already know about this node */
370 ao2_ref(node, -1);
371 return;
372 }
373
375 if (!node) {
377 return;
378 }
381
383
384 ao2_ref(node, -1);
385
386 /*
387 * When we get news that someone else has joined, we need to let them
388 * know we exist as well.
389 */
391}
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
Definition: astobj2.h:1554
#define ao2_unlock(a)
Definition: astobj2.h:729
#define ao2_lock(a)
Definition: astobj2.h:717
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
Definition: astobj2.h:1063
@ AST_EVENT_CLUSTER_DISCOVERY
Definition: event_defs.h:62
static struct corosync_node * corosync_node_alloc(struct ast_event *event)
Definition: res_corosync.c:97

References ao2_find, ao2_link_flags, ao2_lock, ao2_ref, ao2_unlock, ast_assert, ast_eid_cmp(), ast_eid_default, AST_EVENT_CLUSTER_DISCOVERY, ast_event_get_ie_raw(), ast_event_get_ie_uint(), ast_event_get_type(), AST_EVENT_IE_EID, AST_EVENT_IE_NODE_ID, corosync_node_alloc(), nodes, OBJ_NOLOCK, OBJ_SEARCH_KEY, publish_cluster_discovery_to_stasis_full(), and send_cluster_notify().

◆ publish_cluster_discovery_to_stasis_full()

static void publish_cluster_discovery_to_stasis_full ( struct corosync_node *  node,
int  joined 
)
static

Publish cluster discovery to Stasis Message Bus API.

Definition at line 303 of file res_corosync.c.

304{
305 struct ast_json *json;
306 struct ast_json_payload *payload;
307 struct stasis_message *message;
308 char eid[18];
309 const char *addr;
310
311 ast_eid_to_str(eid, sizeof(eid), &node->eid);
312 addr = ast_sockaddr_stringify_addr(&node->addr);
313
314 ast_log(AST_LOG_NOTICE, "Node %u (%s) at %s %s the cluster\n",
315 node->id,
316 eid,
317 addr,
318 joined ? "joined" : "left");
319
320 json = ast_json_pack("{s: s, s: i, s: s, s: i}",
321 "address", addr,
322 "node_id", node->id,
323 "eid", eid,
324 "joined", joined);
325 if (!json) {
326 return;
327 }
328
329 payload = ast_json_payload_create(json);
330 if (!payload) {
331 ast_json_unref(json);
332 return;
333 }
334
336 if (!message) {
337 ast_json_unref(json);
338 ao2_ref(payload, -1);
339 return;
340 }
341
343 ast_json_unref(json);
344 ao2_ref(payload, -1);
345 ao2_ref(message, -1);
346}
#define AST_LOG_NOTICE
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition: json.c:73
struct ast_json_payload * ast_json_payload_create(struct ast_json *json)
Create an ao2 object to pass json blobs as data payloads for stasis.
Definition: json.c:756
struct ast_json * ast_json_pack(char const *format,...)
Helper for creating complex JSON values.
Definition: json.c:612
static char * ast_sockaddr_stringify_addr(const struct ast_sockaddr *addr)
Wrapper around ast_sockaddr_stringify_fmt() to return an address only.
Definition: netsock2.h:286
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
struct stasis_message_type * ast_cluster_discovery_type(void)
A stasis_message_type for Cluster discovery.
struct stasis_topic * ast_system_topic(void)
A Stasis Message Bus API topic which publishes messages regarding system changes.
Abstract JSON element (object, array, string, int, ...).
struct ast_eid eid

References ao2_ref, ast_cluster_discovery_type(), ast_eid_to_str(), ast_json_pack(), ast_json_payload_create(), ast_json_unref(), ast_log, AST_LOG_NOTICE, ast_sockaddr_stringify_addr(), ast_system_topic(), stasis_message::eid, stasis_message_create(), and stasis_publish().

Referenced by cpg_confchg_cb(), and publish_cluster_discovery_to_stasis().

◆ publish_corosync_ping_to_stasis()

static void publish_corosync_ping_to_stasis ( struct ast_event *  event)
static

Publish a Corosync ping to Stasis Message Bus API.

Definition at line 204 of file res_corosync.c.

205{
206 struct corosync_ping_payload *payload;
207 struct stasis_message *message;
208 struct ast_eid *event_eid;
209
212
213 if (!corosync_ping_message_type()) {
214 return;
215 }
216
217 payload = ao2_t_alloc(sizeof(*payload), corosync_ping_payload_dtor, "Create ping payload");
218 if (!payload) {
219 return;
220 }
221 event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
223 AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW, event_eid, sizeof(*event_eid),
225
226 message = stasis_message_create(corosync_ping_message_type(), payload);
227 if (!message) {
228 ao2_t_ref(payload, -1, "Destroy payload on off nominal");
229 return;
230 }
231
233
234 ao2_t_ref(payload, -1, "Hand ref to stasis");
235 ao2_t_ref(message, -1, "Hand ref to stasis");
236}
#define ao2_t_alloc(data_size, destructor_fn, debug_msg)
Definition: astobj2.h:407
static void corosync_ping_payload_dtor(void *obj)
Destructor for the corosync_ping_payload wrapper object.
Definition: res_corosync.c:167

References ao2_t_alloc, ao2_t_ref, ast_assert, ast_event_get_ie_raw(), ast_event_get_type(), AST_EVENT_IE_EID, AST_EVENT_IE_END, AST_EVENT_IE_PLTYPE_RAW, ast_event_new(), AST_EVENT_PING, corosync_ping_payload_dtor(), corosync_topic(), corosync_ping_payload::event, NULL, stasis_message_create(), and stasis_publish().

◆ publish_device_state_to_stasis()

static void publish_device_state_to_stasis ( struct ast_event *  event)
static

Publish a received device state ast_event to Stasis Message Bus API.

Definition at line 432 of file res_corosync.c.

433{
434 const char *device;
436 unsigned int cachable;
437 struct ast_eid *event_eid;
438
440
444 event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
445
446 if (ast_strlen_zero(device)) {
447 return;
448 }
449
450 if (ast_publish_device_state_full(device, state, cachable, event_eid)) {
451 char eid[18];
452 ast_eid_to_str(eid, sizeof(eid), event_eid);
453 ast_log(LOG_WARNING, "Failed to publish device state message for %s from %s\n",
454 device, eid);
455 }
456}
enum cc_state state
Definition: ccss.c:393
ast_device_state
Device States.
Definition: devicestate.h:52
int ast_publish_device_state_full(const char *device, enum ast_device_state state, enum ast_devstate_cache cachable, struct ast_eid *eid)
Publish a device state update with EID.
Definition: devicestate.c:709
@ AST_EVENT_IE_STATE
Generic State IE Used by AST_EVENT_DEVICE_STATE_CHANGE Payload type: UINT The actual state values dep...
Definition: event_defs.h:121
@ AST_EVENT_IE_DEVICE
Device Name Used by AST_EVENT_DEVICE_STATE_CHANGE Payload type: STR.
Definition: event_defs.h:113
@ AST_EVENT_IE_CACHABLE
Event non-cacheability flag Used by: All events Payload type: UINT.
Definition: event_defs.h:306
@ AST_EVENT_DEVICE_STATE_CHANGE
Definition: event_defs.h:48
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition: strings.h:65

References ast_assert, ast_eid_to_str(), AST_EVENT_DEVICE_STATE_CHANGE, ast_event_get_ie_raw(), ast_event_get_ie_str(), ast_event_get_ie_uint(), ast_event_get_type(), AST_EVENT_IE_CACHABLE, AST_EVENT_IE_DEVICE, AST_EVENT_IE_EID, AST_EVENT_IE_STATE, ast_log, ast_publish_device_state_full(), ast_strlen_zero(), ast_eid::eid, LOG_WARNING, and state.

◆ publish_event_to_corosync()

static void publish_event_to_corosync ( struct ast_event *  event)
static

Definition at line 544 of file res_corosync.c.

545{
546 cs_error_t cs_err;
547 struct iovec iov;
548
549 iov.iov_base = (void *)event;
550 iov.iov_len = ast_event_get_size(event);
551
552 ast_debug(5, "Publishing event %s (%u) to corosync\n",
554
555 /* The stasis subscription will only exist if we are configured to publish
556 * these events, so just send away. */
558 ast_debug(5, "publish_event_to_corosync rdlock\n");
559 if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) {
560 ast_log(LOG_WARNING, "CPG mcast failed (%u) for event %s (%u)\n",
562 }
564 ast_debug(5, "publish_event_to_corosync unlock\n");
565 } else {
566 ast_log(LOG_WARNING, "CPG mcast not executed for event %s (%u): initializing CPG.\n",
568 }
569}
size_t ast_event_get_size(const struct ast_event *event)
Get the size of an event.
Definition: event.c:228

References ast_debug, ast_event_get_size(), ast_event_get_type(), ast_event_get_type_name(), ast_log, ast_rwlock_tryrdlock, ast_rwlock_unlock, corosync_node_joined, cpg_handle, init_cpg_lock, and LOG_WARNING.

Referenced by publish_to_corosync(), and send_cluster_notify().

◆ publish_mwi_to_stasis()

static void publish_mwi_to_stasis ( struct ast_event *  event)
static

Publish a received MWI ast_event to Stasis Message Bus API.

Definition at line 394 of file res_corosync.c.

395{
396 const char *mailbox;
397 const char *context;
398 unsigned int new_msgs;
399 unsigned int old_msgs;
400 struct ast_eid *event_eid;
401
403
408 event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
409
411 return;
412 }
413
414 if (new_msgs > INT_MAX) {
415 new_msgs = INT_MAX;
416 }
417
418 if (old_msgs > INT_MAX) {
419 old_msgs = INT_MAX;
420 }
421
422 if (ast_publish_mwi_state_full(mailbox, context, (int)new_msgs,
423 (int)old_msgs, NULL, event_eid)) {
424 char eid[18];
425 ast_eid_to_str(eid, sizeof(eid), event_eid);
426 ast_log(LOG_WARNING, "Failed to publish MWI message for %s@%s from %s\n",
428 }
429}
@ AST_EVENT_IE_CONTEXT
Context IE Used by AST_EVENT_MWI Payload type: str.
Definition: event_defs.h:127
@ AST_EVENT_IE_MAILBOX
Mailbox name.
Definition: event_defs.h:89
@ AST_EVENT_IE_OLDMSGS
Number of old messages Used by: AST_EVENT_MWI Payload type: UINT.
Definition: event_defs.h:83
@ AST_EVENT_IE_NEWMSGS
Number of new messages Used by: AST_EVENT_MWI Payload type: UINT.
Definition: event_defs.h:77
@ AST_EVENT_MWI
Definition: event_defs.h:38
int ast_publish_mwi_state_full(const char *mailbox, const char *context, int new_msgs, int old_msgs, const char *channel_id, struct ast_eid *eid)
Publish a MWI state update via stasis with all parameters.
Definition: mwi.c:393

References ast_assert, ast_eid_to_str(), ast_event_get_ie_raw(), ast_event_get_ie_str(), ast_event_get_ie_uint(), ast_event_get_type(), AST_EVENT_IE_CONTEXT, AST_EVENT_IE_EID, AST_EVENT_IE_MAILBOX, AST_EVENT_IE_NEWMSGS, AST_EVENT_IE_OLDMSGS, AST_EVENT_MWI, ast_log, ast_publish_mwi_state_full(), ast_strlen_zero(), voicemailpwcheck::context, ast_eid::eid, LOG_WARNING, voicemailpwcheck::mailbox, and NULL.

◆ publish_to_corosync()

static void publish_to_corosync ( struct stasis_message *  message)
static

Definition at line 571 of file res_corosync.c.

572{
573 struct ast_event *event;
574 struct ast_eid *event_eid;
575
577 if (!event) {
578 return;
579 }
580
581 event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
582 if (!event_eid || ast_eid_cmp(&ast_eid_default, event_eid)) {
583 /* If the event didn't originate from this server, don't send it back out. */
585 return;
586 }
587
589 const struct ast_eid *eid;
590 char buf[128] = "";
591
593 ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid);
594 ast_log(LOG_NOTICE, "Sending event PING from this server with EID: '%s'\n", buf);
595 }
596
599}
static void publish_event_to_corosync(struct ast_event *event)
Definition: res_corosync.c:544
struct ast_event * stasis_message_to_event(struct stasis_message *msg)
Build the Generic event system representation of the message.

References ast_eid_cmp(), ast_eid_default, ast_eid_to_str(), ast_event_destroy(), ast_event_get_ie_raw(), ast_event_get_type(), AST_EVENT_IE_EID, AST_EVENT_PING, ast_log, buf, ast_eid::eid, LOG_NOTICE, publish_event_to_corosync(), and stasis_message_to_event().

Referenced by dump_cache_cb(), and stasis_message_cb().

◆ send_cluster_notify()

static void send_cluster_notify ( void  )
static

Informs the cluster of our EID and our IP addresses.

Definition at line 737 of file res_corosync.c.

738{
739 struct ast_event *event;
740 unsigned int node_id;
741 cs_error_t cs_err;
742 corosync_cfg_node_address_t corosync_addr;
743 int num_addrs = 0;
744 struct sockaddr *sa;
745 size_t sa_len;
746 char buf[128];
747 int res;
748
750 ast_debug(5, "send_cluster_notify rdlock\n");
751
752 if ((cs_err = corosync_cfg_local_get(cfg_handle, &node_id)) != CS_OK) {
753 ast_log(LOG_WARNING, "Failed to extract Corosync node ID for this node. Not informing cluster of existance.\n");
754 return;
755 }
756
757 if (((cs_err = corosync_cfg_get_node_addrs(cfg_handle, node_id, 1, &num_addrs, &corosync_addr)) != CS_OK) || (num_addrs < 1)) {
758 ast_log(LOG_WARNING, "Failed to get local Corosync address. Not informing cluster of existance.\n");
759 return;
760 }
761
763 ast_debug(5, "send_cluster_notify unlock\n");
764 }
765
766 sa = (struct sockaddr *)corosync_addr.address;
767 sa_len = (size_t)corosync_addr.address_length;
768 if ((res = getnameinfo(sa, sa_len, buf, sizeof(buf), NULL, 0, NI_NUMERICHOST))) {
769 ast_log(LOG_WARNING, "Failed to determine name of local Corosync address: %s (%d). Not informing cluster of existance.\n",
770 gai_strerror(res), res);
771 return;
772 }
773
780}
@ AST_EVENT_IE_PLTYPE_UINT
Definition: event_defs.h:326
@ AST_EVENT_IE_PLTYPE_STR
Definition: event_defs.h:328

References ast_debug, AST_EVENT_CLUSTER_DISCOVERY, ast_event_destroy(), AST_EVENT_IE_END, AST_EVENT_IE_LOCAL_ADDR, AST_EVENT_IE_NODE_ID, AST_EVENT_IE_PLTYPE_STR, AST_EVENT_IE_PLTYPE_UINT, ast_event_new(), ast_log, ast_rwlock_tryrdlock, ast_rwlock_unlock, buf, cfg_handle, init_cpg_lock, LOG_WARNING, NULL, and publish_event_to_corosync().

Referenced by dispatch_thread_handler(), and publish_cluster_discovery_to_stasis().

◆ set_event()

static int set_event ( const char *  event_type,
int  pubsub 
)
static

Definition at line 1153 of file res_corosync.c.

1154{
1155 unsigned int i;
1156
1157 for (i = 0; i < ARRAY_LEN(event_types); i++) {
1158 if (!event_types[i].name || strcasecmp(event_type, event_types[i].name)) {
1159 continue;
1160 }
1161
1162 switch (pubsub) {
1163 case PUBLISH:
1164 event_types[i].publish = 1;
1165 break;
1166 case SUBSCRIBE:
1167 event_types[i].subscribe = 1;
1168 break;
1169 }
1170
1171 break;
1172 }
1173
1174 return (i == ARRAY_LEN(event_types)) ? -1 : 0;
1175}

References ARRAY_LEN, event_types, name, PUBLISH, and SUBSCRIBE.

Referenced by load_general_config().

◆ stasis_message_cb()

static void stasis_message_cb ( void *  data,
struct stasis_subscription *  sub,
struct stasis_message *  message 
)
static

Definition at line 601 of file res_corosync.c.

602{
603 if (!message) {
604 return;
605 }
606
608}

References publish_to_corosync().

Referenced by load_general_config().

◆ STASIS_MESSAGE_TYPE_DEFN_LOCAL()

STASIS_MESSAGE_TYPE_DEFN_LOCAL ( corosync_ping_message_type  ,
to_event = corosync_ping_to_event 
)

◆ unload_module()

static int unload_module ( void  )
static

Definition at line 1433 of file res_corosync.c.

1434{
1436
1438
1439 return 0;
1440}
int ast_cli_unregister_multiple(struct ast_cli_entry *e, int len)
Unregister multiple commands.
Definition: clicompat.c:30

References ARRAY_LEN, ast_cli_unregister_multiple(), cleanup_module(), and corosync_cli.

Variable Documentation

◆ alert_pipe

int alert_pipe[2]

◆ cache_fn

struct stasis_cache *(* cache_fn) (void) ( void  )

Definition at line 244 of file res_corosync.c.

Referenced by cleanup_module(), and cpg_confchg_cb().

◆ cfg_callbacks

corosync_cfg_callbacks_t cfg_callbacks
static

Definition at line 295 of file res_corosync.c.

Referenced by dispatch_thread_handler(), and load_module().

◆ cfg_handle

corosync_cfg_handle_t cfg_handle
static

◆ corosync_aggregate_topic

struct stasis_topic* corosync_aggregate_topic
static

The internal topic used for message forwarding and pings.

Definition at line 68 of file res_corosync.c.

Referenced by cleanup_module(), corosync_topic(), and load_module().

◆ corosync_cli

struct ast_cli_entry corosync_cli[]
static
Initial value:
= {
{ .handler = corosync_show_config , .summary = "Show configuration" ,},
{ .handler = corosync_show_members , .summary = "Show cluster members" ,},
{ .handler = corosync_ping , .summary = "Send a test ping to the cluster" ,},
}
static char * corosync_show_config(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static char * corosync_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static char * corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
Definition: res_corosync.c:965

Definition at line 1142 of file res_corosync.c.

Referenced by load_module(), and unload_module().

◆ corosync_node_joined

int corosync_node_joined = 0
static

Join to corosync.

Definition at line 62 of file res_corosync.c.

Referenced by cleanup_module(), dispatch_thread_handler(), load_module(), and publish_event_to_corosync().

◆ cpg_callbacks

cpg_callbacks_t cpg_callbacks
static
Initial value:
= {
.cpg_deliver_fn = cpg_deliver_cb,
.cpg_confchg_fn = cpg_confchg_cb,
}
static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_name, uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
Definition: res_corosync.c:484
static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
Definition: res_corosync.c:647

Definition at line 466 of file res_corosync.c.

Referenced by dispatch_thread_handler(), and load_module().

◆ cpg_handle

cpg_handle_t cpg_handle
static

◆ dispatch_thread

struct { ... } dispatch_thread
Initial value:
= {
.alert_pipe = { -1, -1 },
}

Referenced by cleanup_module(), dispatch_thread_handler(), and load_module().

◆ event_types

struct { ... } event_types[]

◆ event_types_lock

ast_rwlock_t event_types_lock = { PTHREAD_RWLOCK_INITIALIZER , NULL, {1, 0} }
static

◆ id

pthread_t id

Definition at line 275 of file res_corosync.c.

Referenced by corosync_node_cmp_fn(), and corosync_node_hash_fn().

◆ init_cpg_lock

ast_rwlock_t init_cpg_lock = { PTHREAD_RWLOCK_INITIALIZER , NULL, {1, 0} }
static

◆ message_type_fn

struct stasis_message_type *(* message_type_fn) (void) ( void  )

Definition at line 244 of file res_corosync.c.

Referenced by cleanup_module(), cpg_confchg_cb(), and load_general_config().

◆ name

const char* name

◆ nodes

struct ao2_container* nodes
static

All the nodes that we're aware of.

Definition at line 65 of file res_corosync.c.

Referenced by AST_TEST_DEFINE(), cleanup_module(), cpg_confchg_cb(), load_module(), and publish_cluster_discovery_to_stasis().

◆ publish

unsigned char publish

◆ publish_default

unsigned char publish_default

Definition at line 242 of file res_corosync.c.

◆ publish_to_stasis

void(* publish_to_stasis) (struct ast_event *) ( struct ast_event *  )

Definition at line 248 of file res_corosync.c.

◆ stasis_router

struct stasis_message_router* stasis_router
static

Our Stasis Message Bus API message router.

Definition at line 71 of file res_corosync.c.

Referenced by cleanup_module(), load_general_config(), and load_module().

◆ stop

unsigned int stop

Definition at line 277 of file res_corosync.c.

◆ sub

struct stasis_forward* sub

Definition at line 240 of file res_corosync.c.

Referenced by add_mwi_datastore(), allocate_body_part(), allocate_subscription(), analog_dial_digits(), analog_get_sub_fd(), analog_new_ast_channel(), analog_play_tone(), analog_set_inthreeway(), analog_set_linear_mode(), ao2_weakproxy_subscribe(), ao2_weakproxy_unsubscribe(), ast_channel_connected_line_sub(), ast_channel_redirecting_sub(), ast_mwi_subscribe_pool(), ast_mwi_subscriber_data(), ast_mwi_subscriber_subscription(), ast_mwi_subscriber_topic(), ast_mwi_unsubscribe(), ast_mwi_unsubscribe_and_join(), ast_sip_create_subscription(), ast_sip_subscription_destroy(), ast_sip_subscription_get_body_subtype(), ast_sip_subscription_get_body_type(), ast_sip_subscription_get_dialog(), ast_sip_subscription_get_endpoint(), ast_sip_subscription_get_header(), ast_sip_subscription_get_local_uri(), ast_sip_subscription_get_remote_uri(), ast_sip_subscription_get_resource_name(), ast_sip_subscription_get_serializer(), ast_sip_subscription_get_sip_uri(), ast_sip_subscription_is_terminated(), ast_sip_subscription_notify(), AST_TEST_DEFINE(), asterisk_publisher_devstate_cb(), asterisk_publisher_mwistate_cb(), bridge_subscription_change_handler(), build_body_part(), build_rlmi_body(), caching_topic_exec(), cel_local_optimization_begin_cb(), cel_local_optimization_end_cb(), channel_to_session(), cleanup_module(), close_call(), close_client(), consumer_exec(), consumer_exec_sync(), consumer_finalize(), create_unsolicited_mwi_subscriptions(), create_virtual_subscriptions(), delete_device(), destroy_subscription(), destroy_subscriptions_task(), device_state_cb(), device_state_subscription_create(), device_state_subscription_destroy(), device_to_json_cb(), discard_call(), dispatch_exec_async(), dispatch_exec_sync(), dispatch_message(), endpoint_subscription_change(), exten_state_subscription_destructor(), find_rtp_port(), find_subchannel_by_name(), generate_content_id_hdr(), generate_initial_notify(), generate_list_body(), generic_agent_devstate_cb(), get_exten_state_sub(), 
get_notify_data(), get_or_create_subscription(), get_sub(), get_sub_holding(), get_subscription(), handle_call_incoming(), handle_call_outgoing(), handle_key_fav(), handle_msg_cb(), handle_validate(), has_destination_cb(), internal_stasis_subscribe(), is_app_subscribed(), is_subscribed_device_state(), key_call(), key_dial_page(), load_general_config(), load_module(), message_sink_cb(), message_subscription_alloc(), message_subscription_dtor(), message_subscription_hash_cb(), messaging_app_subscribe_endpoint(), messaging_app_unsubscribe_endpoint(), messaging_subscription_cmp(), mwi_create_subscription(), mwi_ds_destroy(), mwi_get_notify_data(), mwi_handle_subscribe(), mwi_handle_unsubscribe(), mwi_on_aor(), mwi_startup_event_cb(), mwi_stasis_cb(), mwi_subscribe_all(), mwi_subscribe_single(), mwi_subscription_alloc(), mwi_subscription_destructor(), mwi_subscription_established(), mwi_subscription_shutdown(), mwi_to_ami(), my_conf_add(), my_conf_del(), my_dial_digits(), my_get_sub_fd(), my_is_dialing(), my_new_analog_ast_channel(), my_play_tone(), my_set_inthreeway(), my_set_linear_mode(), my_wink(), onevent(), park_announce_update_cb(), parker_update_cb(), process_opcode(), publish_msg(), pubsub_on_rx_notify(), queue_bridge_cb(), queue_channel_cb(), rcv_mac_addr(), refer_client_on_evsub_state(), refer_progress_bridge(), refer_progress_notify(), refer_progress_on_evsub_state(), refer_send(), remove_device_state_subscription(), router_dispatch(), send_callerid_screen(), send_device_state(), send_mwi_notify(), send_start_rtp(), send_subscription_subscribe(), send_subscription_unsubscribe(), send_unsolicited_mwi_notify(), send_unsolicited_mwi_notify_to_contact(), set_state_terminated(), shutdown_subscriptions(), start_rtp(), stasis_publish_sync(), stasis_state_add_subscriber(), stasis_state_subscribe_pool(), stasis_state_subscriber_data(), stasis_state_subscriber_id(), stasis_state_subscriber_subscription(), stasis_state_subscriber_topic(), stasis_state_unsubscribe(), 
stasis_state_unsubscribe_and_join(), stasis_subscription_final_message(), stasis_subscription_is_subscribed(), stasis_subscription_uniqueid(), stasis_unsubscribe(), sub_cleanup(), sub_hold(), sub_start_silence(), sub_stop_silence(), sub_subscription_change_handler(), sub_unhold(), subscribe_device_state(), subscriber_dtor(), subscript(), subscription_dtor(), subscription_invoke(), subscription_persistence_event_cb(), subscription_shutdown(), subscriptions_create(), to_ami(), topic_add_subscription(), topic_remove_subscription(), transfer_call_step1(), transfer_cancel_step2(), transfer_refer(), unistim_alloc_sub(), unistim_answer(), unistim_call(), unistim_do_senddigit(), unistim_free_sub(), unistim_get_rtp_peer(), unistim_hangup(), unistim_hangup_clean(), unistim_indicate(), unistim_new(), unistim_read(), unistim_request(), unistim_rtp_read(), unistim_set_owner(), unistim_set_rtp_peer(), unistim_show_info(), unistim_sp(), unistim_ss(), unistim_unalloc_sub(), unistim_write(), unsubscribe_device_state(), xfer_client_on_evsub_state(), xmpp_pubsub_devstate_cb(), and xmpp_pubsub_mwi_cb().

◆ subscribe

unsigned char subscribe

◆ subscribe_default

unsigned char subscribe_default

Definition at line 244 of file res_corosync.c.

◆ topic_fn

struct stasis_topic *(* topic_fn) (void) ( void  )

Definition at line 244 of file res_corosync.c.

Referenced by cleanup_module(), cpg_confchg_cb(), and load_general_config().