Asterisk - The Open Source Telephony Project  GIT-master-a24979a
Data Structures | Macros | Enumerations | Functions | Variables
res_corosync.c File Reference
#include "asterisk.h"
#include <corosync/cpg.h>
#include <corosync/cfg.h>
#include "asterisk/module.h"
#include "asterisk/logger.h"
#include "asterisk/poll-compat.h"
#include "asterisk/config.h"
#include "asterisk/event.h"
#include "asterisk/cli.h"
#include "asterisk/devicestate.h"
#include "asterisk/mwi.h"
#include "asterisk/stasis.h"
#include "asterisk/stasis_message_router.h"
#include "asterisk/stasis_system.h"
#include "asterisk/taskprocessor.h"
Include dependency graph for res_corosync.c:

Go to the source code of this file.

Data Structures

struct  corosync_node
 
struct  corosync_ping_payload
 A payload wrapper around a corosync ping event. More...
 

Macros

#define COROSYNC_IPC_BUFFER_SIZE   (8192 * 128)
 Corosync ipc dispatch/request and reply size. More...
 
#define COROSYNC_POLL_TIMEOUT   (10 * 1000)
 Timeout for Corosync's poll process. More...
 
#define corosync_pthread_create_background(a, b, c, d)
 Version of pthread_create to ensure stack is large enough. More...
 

Enumerations

enum  { PUBLISH , SUBSCRIBE }
 

Functions

 AST_MODULE_INFO_STANDARD_EXTENDED (ASTERISK_GPL_KEY, "Corosync")
 
static void cfg_shutdown_cb (corosync_cfg_handle_t cfg_handle, corosync_cfg_shutdown_flags_t flags)
 
static void cleanup_module (void)
 
static int clear_node_cache (void *obj, void *arg, int flags)
 
static struct corosync_nodecorosync_node_alloc (struct ast_event *event)
 
static int corosync_node_cmp_fn (void *obj, void *arg, int flags)
 
static int corosync_node_hash_fn (const void *obj, const int flags)
 
static char * corosync_ping (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static void corosync_ping_payload_dtor (void *obj)
 Destructor for the corosync_ping_payload wrapper object. More...
 
static struct ast_eventcorosync_ping_to_event (struct stasis_message *message)
 Convert a Corosync PING to a ast_event. More...
 
static char * corosync_show_config (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static char * corosync_show_members (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static struct stasis_topiccorosync_topic (void)
 Internal accessor for our topic. More...
 
static void cpg_confchg_cb (cpg_handle_t handle, const struct cpg_name *group_name, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
 
static void cpg_deliver_cb (cpg_handle_t handle, const struct cpg_name *group_name, uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
 
static void * dispatch_thread_handler (void *data)
 
static int dump_cache_cb (void *obj, void *arg, int flags)
 
static int load_config (unsigned int reload)
 
static int load_general_config (struct ast_config *cfg)
 
static int load_module (void)
 
static void publish_cluster_discovery_to_stasis (struct ast_event *event)
 Publish a received cluster discovery ast_event to Stasis Message Bus API. More...
 
static void publish_cluster_discovery_to_stasis_full (struct corosync_node *node, int joined)
 Publish cluster discovery to Stasis Message Bus API. More...
 
static void publish_corosync_ping_to_stasis (struct ast_event *event)
 Publish a Corosync ping to Stasis Message Bus API. More...
 
static void publish_device_state_to_stasis (struct ast_event *event)
 Publish a received device state ast_event to Stasis Message Bus API. More...
 
static void publish_event_to_corosync (struct ast_event *event)
 
static void publish_mwi_to_stasis (struct ast_event *event)
 Publish a received MWI ast_event to Stasis Message Bus API. More...
 
static void publish_to_corosync (struct stasis_message *message)
 
static void send_cluster_notify (void)
 Informs the cluster of our EID and our IP addresses. More...
 
static int set_event (const char *event_type, int pubsub)
 
static void stasis_message_cb (void *data, struct stasis_subscription *sub, struct stasis_message *message)
 
 STASIS_MESSAGE_TYPE_DEFN_LOCAL (corosync_ping_message_type,.to_event=corosync_ping_to_event,)
 
static int unload_module (void)
 

Variables

static corosync_cfg_callbacks_t cfg_callbacks
 
static corosync_cfg_handle_t cfg_handle
 
static struct stasis_topiccorosync_aggregate_topic
 The internal topic used for message forwarding and pings. More...
 
static struct ast_cli_entry corosync_cli []
 
static int corosync_node_joined = 0
 Join to corosync. More...
 
static cpg_callbacks_t cpg_callbacks
 
static cpg_handle_t cpg_handle
 
struct {
   int   alert_pipe [2]
 
   pthread_t   id
 
   unsigned int   stop:1
 
dispatch_thread
 
struct {
   struct stasis_cache *(*   cache_fn )(void)
 
   struct stasis_message_type *(*   message_type_fn )(void)
 
   const char *   name
 
   unsigned char   publish
 
   unsigned char   publish_default
 
   void(*   publish_to_stasis )(struct ast_event *)
 
   struct stasis_forward *   sub
 
   unsigned char   subscribe
 
   unsigned char   subscribe_default
 
   struct stasis_topic *(*   topic_fn )(void)
 
event_types []
 
static ast_rwlock_t event_types_lock = { PTHREAD_RWLOCK_INITIALIZER , NULL, {1, 0} }
 
static ast_rwlock_t init_cpg_lock = { PTHREAD_RWLOCK_INITIALIZER , NULL, {1, 0} }
 
static struct ao2_containernodes
 All the nodes that we're aware of. More...
 
static struct stasis_message_routerstasis_router
 Our Stasis Message Bus API message router. More...
 

Detailed Description

module publishes ast_event representations of information to other Asterisk instances in a cluster.

Events have an associated event type, as well as information elements. The information elements are the meta data that go along with each event. For example, in the case of message waiting indication, the event type is MWI, and each MWI event contains at least three information elements: the mailbox, the number of new messages, and the number of old messages.

Author
Russell Bryant russe.nosp@m.ll@r.nosp@m.ussel.nosp@m.lbry.nosp@m.ant.n.nosp@m.et

This module is based on and replaces the previous res_ais module.

Definition in file res_corosync.c.

Macro Definition Documentation

◆ COROSYNC_IPC_BUFFER_SIZE

#define COROSYNC_IPC_BUFFER_SIZE   (8192 * 128)

Corosync ipc dispatch/request and reply size.

Definition at line 89 of file res_corosync.c.

◆ COROSYNC_POLL_TIMEOUT

#define COROSYNC_POLL_TIMEOUT   (10 * 1000)

Timeout for Corosync's poll process.

Definition at line 55 of file res_corosync.c.

◆ corosync_pthread_create_background

#define corosync_pthread_create_background (   a,
  b,
  c,
  d 
)
Value:
__FILE__, __FUNCTION__, __LINE__, #c)
#define AST_BACKGROUND_STACKSIZE
Definition: ooh323cDriver.c:26
#define COROSYNC_IPC_BUFFER_SIZE
Corosync ipc dispatch/request and reply size.
Definition: res_corosync.c:89
static struct test_val b
static struct test_val a
static struct test_val d
static struct test_val c
int ast_pthread_create_stack(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine)(void *), void *data, size_t stacksize, const char *file, const char *caller, int line, const char *start_fn)
Definition: main/utils.c:1592

Version of pthread_create to ensure stack is large enough.

Definition at line 92 of file res_corosync.c.

Enumeration Type Documentation

◆ anonymous enum

anonymous enum
Enumerator
PUBLISH 
SUBSCRIBE 

Definition at line 1148 of file res_corosync.c.

1148  {
1149  PUBLISH,
1150  SUBSCRIBE,
1151 };
@ SUBSCRIBE
@ PUBLISH

Function Documentation

◆ AST_MODULE_INFO_STANDARD_EXTENDED()

AST_MODULE_INFO_STANDARD_EXTENDED ( ASTERISK_GPL_KEY  ,
"Corosync"   
)

◆ cfg_shutdown_cb()

static void cfg_shutdown_cb ( corosync_cfg_handle_t  cfg_handle,
corosync_cfg_shutdown_flags_t  flags 
)
static

Definition at line 479 of file res_corosync.c.

481 {
482 }

◆ cleanup_module()

static void cleanup_module ( void  )
static

Definition at line 1249 of file res_corosync.c.

1250 {
1251  cs_error_t cs_err;
1252  unsigned int i;
1253 
1254  if (stasis_router) {
1255 
1256  /* Unsubscribe all topic forwards and cancel all message routes */
1257  for (i = 0; i < ARRAY_LEN(event_types); i++) {
1258  struct ao2_container *messages = NULL;
1259  int messages_count;
1260  unsigned char subscribe = 0;
1261 
1263  ast_debug(5, "cleanup_module wrlock\n");
1264  subscribe = event_types[i].subscribe;
1265 
1266  if (event_types[i].sub) {
1269  }
1270  event_types[i].publish = 0;
1271  event_types[i].subscribe = 0;
1273  ast_debug(5, "cleanup_module unlock\n");
1274 
1277  messages_count = ao2_container_count(messages);
1278  ast_log(LOG_NOTICE, "Clearing %i events of type %s of other nodes from stasis cache.\n", messages_count, event_types[i].name);
1280  ast_log(LOG_NOTICE, "Cleared events of type %s from stasis cache.\n", event_types[i].name);
1281  ao2_t_ref(messages, -1, "Dispose of flushed cache");
1282  }
1283  }
1284 
1286  stasis_router = NULL;
1287  }
1288 
1290  ao2_t_ref(corosync_aggregate_topic, -1, "Dispose of topic on cleanup");
1292  }
1293 
1294  STASIS_MESSAGE_TYPE_CLEANUP(corosync_ping_message_type);
1295 
1296  if (dispatch_thread.id != AST_PTHREADT_NULL) {
1297  char meepmeep = 'x';
1298  dispatch_thread.stop = 1;
1299  if (ast_carefulwrite(dispatch_thread.alert_pipe[1], &meepmeep, 1,
1300  5000) == -1) {
1301  ast_log(LOG_ERROR, "Failed to write to pipe: %s (%d)\n",
1302  strerror(errno), errno);
1303  }
1304  pthread_join(dispatch_thread.id, NULL);
1305  }
1306 
1307  if (dispatch_thread.alert_pipe[0] != -1) {
1308  close(dispatch_thread.alert_pipe[0]);
1309  dispatch_thread.alert_pipe[0] = -1;
1310  }
1311 
1312  if (dispatch_thread.alert_pipe[1] != -1) {
1313  close(dispatch_thread.alert_pipe[1]);
1314  dispatch_thread.alert_pipe[1] = -1;
1315  }
1316 
1318  ast_debug(5, "cleanup_module wrlock\n");
1319  if (cpg_handle && (cs_err = cpg_finalize(cpg_handle)) != CS_OK) {
1320  ast_log(LOG_ERROR, "Failed to finalize cpg (%d)\n", (int) cs_err);
1321  }
1322  cpg_handle = 0;
1323 
1324  if (cfg_handle && (cs_err = corosync_cfg_finalize(cfg_handle)) != CS_OK) {
1325  ast_log(LOG_ERROR, "Failed to finalize cfg (%d)\n", (int) cs_err);
1326  }
1327  cfg_handle = 0;
1330  ast_debug(5, "cleanup_module unlock\n");
1331  }
1332  ao2_cleanup(nodes);
1333  nodes = NULL;
1334 }
#define ast_log
Definition: astobj2.c:42
#define ao2_t_ref(o, delta, tag)
Definition: astobj2.h:460
#define ao2_callback(c, flags, cb_fn, arg)
ao2_callback() is a generic function that applies cb_fn() to all objects in a container,...
Definition: astobj2.h:1693
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
@ OBJ_NODATA
Definition: astobj2.h:1044
#define ast_debug(level,...)
Log a DEBUG message.
#define LOG_ERROR
#define LOG_NOTICE
#define ast_rwlock_wrlock(a)
Definition: lock.h:234
#define AST_PTHREADT_NULL
Definition: lock.h:66
#define ast_rwlock_trywrlock(a)
Definition: lock.h:236
#define ast_rwlock_unlock(a)
Definition: lock.h:232
int errno
static int clear_node_cache(void *obj, void *arg, int flags)
Definition: res_corosync.c:623
static struct ao2_container * nodes
All the nodes that we're aware of.
Definition: res_corosync.c:65
static struct @459 event_types[]
static int corosync_node_joined
Join to corosync.
Definition: res_corosync.c:62
static struct @460 dispatch_thread
static struct stasis_message_router * stasis_router
Our Stasis Message Bus API message router.
Definition: res_corosync.c:71
struct stasis_message_type *(* message_type_fn)(void)
Definition: res_corosync.c:247
struct stasis_cache *(* cache_fn)(void)
Definition: res_corosync.c:246
unsigned char subscribe
Definition: res_corosync.c:243
struct stasis_forward * sub
Definition: res_corosync.c:240
const char * name
Definition: res_corosync.c:239
static struct stasis_topic * corosync_aggregate_topic
The internal topic used for message forwarding and pings.
Definition: res_corosync.c:68
static ast_rwlock_t event_types_lock
Definition: res_corosync.c:51
static corosync_cfg_handle_t cfg_handle
Definition: res_corosync.c:284
struct stasis_topic *(* topic_fn)(void)
Definition: res_corosync.c:245
static ast_rwlock_t init_cpg_lock
Definition: res_corosync.c:52
static cpg_handle_t cpg_handle
Definition: res_corosync.c:283
#define NULL
Definition: resample.c:96
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
Definition: stasis.h:1515
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
Definition: stasis.c:1550
struct ao2_container * stasis_cache_dump_all(struct stasis_cache *cache, struct stasis_message_type *type)
Dump all entity items from the cache to a subscription.
Definition: stasis_cache.c:757
void stasis_message_router_remove(struct stasis_message_router *router, struct stasis_message_type *message_type)
Remove a route from a message router.
void stasis_message_router_unsubscribe_and_join(struct stasis_message_router *router)
Unsubscribe the router from the upstream topic, blocking until the final message has been processed.
Generic container type.
int ast_carefulwrite(int fd, char *s, int len, int timeoutms)
Try to write string, but wait no more than ms milliseconds before timing out.
Definition: main/utils.c:1738
#define ARRAY_LEN(a)
Definition: utils.h:661

References ao2_callback, ao2_cleanup, ao2_container_count(), ao2_t_ref, ARRAY_LEN, ast_carefulwrite(), ast_debug, ast_log, AST_PTHREADT_NULL, ast_rwlock_trywrlock, ast_rwlock_unlock, ast_rwlock_wrlock, cache_fn, cfg_handle, clear_node_cache(), corosync_aggregate_topic, corosync_node_joined, cpg_handle, dispatch_thread, errno, event_types, event_types_lock, init_cpg_lock, LOG_ERROR, LOG_NOTICE, message_type_fn, name, nodes, NULL, OBJ_NODATA, stasis_cache_dump_all(), stasis_forward_cancel(), stasis_message_router_remove(), stasis_message_router_unsubscribe_and_join(), STASIS_MESSAGE_TYPE_CLEANUP, stasis_router, sub, subscribe, and topic_fn.

Referenced by load_module(), and unload_module().

◆ clear_node_cache()

static int clear_node_cache ( void *  obj,
void *  arg,
int  flags 
)
static

Definition at line 623 of file res_corosync.c.

624 {
625  struct stasis_message *cached_msg = obj;
626  struct stasis_topic *topic = arg;
627  struct stasis_message *msg;
628  struct ast_eid *msg_eid;
629 
630  if (!cached_msg) {
631  return 0;
632  }
633 
634  msg_eid = (struct ast_eid *)stasis_message_eid(cached_msg);
635  if(msg_eid && ast_eid_cmp(&ast_eid_default, msg_eid))
636  {
637  msg = stasis_cache_clear_create(cached_msg);
638  if (msg) {
639  stasis_publish(topic, msg);
640  ao2_cleanup(msg);
641  }
642  }
643 
644  return 0;
645 }
struct stasis_message * stasis_cache_clear_create(struct stasis_message *message)
A message which instructs the caching topic to remove an entry from its cache.
Definition: stasis_cache.c:778
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
Definition: stasis.c:1513
const struct ast_eid * stasis_message_eid(const struct stasis_message *msg)
Get the entity id for a stasis_message.
An Entity ID is essentially a MAC address, brief and unique.
Definition: utils.h:808
int ast_eid_cmp(const struct ast_eid *eid1, const struct ast_eid *eid2)
Compare two EIDs.
Definition: main/utils.c:2990
struct ast_eid ast_eid_default
Global EID.
Definition: options.c:93

References ao2_cleanup, ast_eid_cmp(), ast_eid_default, stasis_cache_clear_create(), stasis_message_eid(), and stasis_publish().

Referenced by cleanup_module(), and cpg_confchg_cb().

◆ corosync_node_alloc()

static struct corosync_node* corosync_node_alloc ( struct ast_event event)
static

Definition at line 97 of file res_corosync.c.

98 {
99  struct corosync_node *node;
100 
102  if (!node) {
103  return NULL;
104  }
105 
106  memcpy(&node->eid, (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID), sizeof(node->eid));
109 
110  return node;
111 }
@ AO2_ALLOC_OPT_LOCK_NOLOCK
Definition: astobj2.h:367
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:404
const char * ast_event_get_ie_str(const struct ast_event *event, enum ast_event_ie_type ie_type)
Get the value of an information element that has a string payload.
Definition: event.c:302
uint32_t ast_event_get_ie_uint(const struct ast_event *event, enum ast_event_ie_type ie_type)
Get the value of an information element that has an integer payload.
Definition: event.c:293
const void * ast_event_get_ie_raw(const struct ast_event *event, enum ast_event_ie_type ie_type)
Get the value of an information element that has a raw payload.
Definition: event.c:311
@ AST_EVENT_IE_LOCAL_ADDR
Definition: event_defs.h:281
@ AST_EVENT_IE_EID
Entity ID Used by All events Payload type: RAW This IE indicates which server the event originated fr...
Definition: event_defs.h:272
@ AST_EVENT_IE_NODE_ID
Cluster node ID Used by: Corosync Payload type: UINT.
Definition: event_defs.h:313
int ast_sockaddr_parse(struct ast_sockaddr *addr, const char *str, int flags)
Parse an IPv4 or IPv6 address string.
Definition: netsock2.c:230
Definition: astman.c:222
Definition: test_heap.c:38

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_alloc_options, ast_event_get_ie_raw(), ast_event_get_ie_str(), ast_event_get_ie_uint(), AST_EVENT_IE_EID, AST_EVENT_IE_LOCAL_ADDR, AST_EVENT_IE_NODE_ID, ast_sockaddr_parse(), NULL, and PARSE_PORT_IGNORE.

Referenced by publish_cluster_discovery_to_stasis().

◆ corosync_node_cmp_fn()

static int corosync_node_cmp_fn ( void *  obj,
void *  arg,
int  flags 
)
static

Definition at line 133 of file res_corosync.c.

134 {
135  struct corosync_node *left = obj;
136  struct corosync_node *right = arg;
137  const int *id = arg;
138  int cmp;
139 
140  switch (flags & OBJ_SEARCH_MASK) {
141  case OBJ_SEARCH_OBJECT:
142  id = &right->id;
143  /* Fall through */
144  case OBJ_SEARCH_KEY:
145  cmp = (left->id == *id);
146  break;
148  cmp = (left->id == right->id);
149  break;
150  default:
151  /* Sort can only work on something with a full or partial key. */
152  ast_assert(0);
153  cmp = 1;
154  break;
155  }
156  return cmp ? CMP_MATCH : 0;
157 }
@ CMP_MATCH
Definition: astobj2.h:1027
@ OBJ_SEARCH_PARTIAL_KEY
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
Definition: astobj2.h:1116
@ OBJ_SEARCH_OBJECT
The arg parameter is an object of the same type.
Definition: astobj2.h:1087
@ OBJ_SEARCH_MASK
Search option field mask.
Definition: astobj2.h:1072
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1101
pthread_t id
Definition: res_corosync.c:275
#define ast_assert(a)
Definition: utils.h:734

References ast_assert, CMP_MATCH, corosync_node::id, id, OBJ_SEARCH_KEY, OBJ_SEARCH_MASK, OBJ_SEARCH_OBJECT, and OBJ_SEARCH_PARTIAL_KEY.

Referenced by load_module().

◆ corosync_node_hash_fn()

static int corosync_node_hash_fn ( const void *  obj,
const int  flags 
)
static

Definition at line 113 of file res_corosync.c.

114 {
115  const struct corosync_node *node;
116  const int *id;
117 
118  switch (flags & OBJ_SEARCH_MASK) {
119  case OBJ_SEARCH_KEY:
120  id = obj;
121  break;
122  case OBJ_SEARCH_OBJECT:
123  node = obj;
124  id = &node->id;
125  break;
126  default:
127  ast_assert(0);
128  return 0;
129  }
130  return *id;
131 }

References ast_assert, id, OBJ_SEARCH_KEY, OBJ_SEARCH_MASK, and OBJ_SEARCH_OBJECT.

Referenced by load_module().

◆ corosync_ping()

static char* corosync_ping ( struct ast_cli_entry e,
int  cmd,
struct ast_cli_args a 
)
static

Definition at line 1059 of file res_corosync.c.

1060 {
1061  struct ast_event *event;
1062 
1063  switch (cmd) {
1064  case CLI_INIT:
1065  e->command = "corosync ping";
1066  e->usage =
1067  "Usage: corosync ping\n"
1068  " Send a test ping to the cluster.\n"
1069  "A NOTICE will be in the log for every ping received\n"
1070  "on a server.\n If you send a ping, you should see a NOTICE\n"
1071  "in the log for every server in the cluster.\n";
1072  return NULL;
1073 
1074  case CLI_GENERATE:
1075  return NULL; /* no completion */
1076  }
1077 
1078  if (a->argc != e->args) {
1079  return CLI_SHOWUSAGE;
1080  }
1081 
1083 
1084  if (!event) {
1085  return CLI_FAILURE;
1086  }
1087 
1088  event_types[AST_EVENT_PING].publish_to_stasis(event);
1089 
1091  return CLI_SUCCESS;
1092 }
#define CLI_SHOWUSAGE
Definition: cli.h:45
#define CLI_SUCCESS
Definition: cli.h:44
@ CLI_INIT
Definition: cli.h:152
@ CLI_GENERATE
Definition: cli.h:153
#define CLI_FAILURE
Definition: cli.h:46
void ast_event_destroy(struct ast_event *event)
Destroy an event.
Definition: event.c:524
struct ast_event * ast_event_new(enum ast_event_type event_type,...)
Create a new event.
Definition: event.c:402
@ AST_EVENT_IE_END
Definition: event_defs.h:70
@ AST_EVENT_PING
Definition: event_defs.h:60
int args
This gets set in ast_cli_register()
Definition: cli.h:185
char * command
Definition: cli.h:186
const char * usage
Definition: cli.h:177
An event.
Definition: event.c:81

References a, ast_cli_entry::args, ast_event_destroy(), AST_EVENT_IE_END, ast_event_new(), AST_EVENT_PING, CLI_FAILURE, CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, event_types, NULL, and ast_cli_entry::usage.

◆ corosync_ping_payload_dtor()

static void corosync_ping_payload_dtor ( void *  obj)
static

Destructor for the corosync_ping_payload wrapper object.

Definition at line 167 of file res_corosync.c.

168 {
169  struct corosync_ping_payload *payload = obj;
170 
171  ast_free(payload->event);
172 }
#define ast_free(a)
Definition: astmm.h:180
A payload wrapper around a corosync ping event.
Definition: res_corosync.c:161
struct ast_event * event
Definition: res_corosync.c:163

References ast_free, and corosync_ping_payload::event.

Referenced by publish_corosync_ping_to_stasis().

◆ corosync_ping_to_event()

static struct ast_event* corosync_ping_to_event ( struct stasis_message message)
static

Convert a Corosync PING to a ast_event.

Definition at line 175 of file res_corosync.c.

176 {
177  struct corosync_ping_payload *payload;
178  struct ast_event *event;
179  struct ast_eid *event_eid;
180 
181  if (!message) {
182  return NULL;
183  }
184 
185  payload = stasis_message_data(message);
186 
187  if (!payload->event) {
188  return NULL;
189  }
190 
191  event_eid = (struct ast_eid *)ast_event_get_ie_raw(payload->event, AST_EVENT_IE_EID);
192 
194  AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW, event_eid, sizeof(*event_eid),
196 
197  return event;
198 }
@ AST_EVENT_IE_PLTYPE_RAW
Definition: event_defs.h:330
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.

References ast_event_get_ie_raw(), AST_EVENT_IE_EID, AST_EVENT_IE_END, AST_EVENT_IE_PLTYPE_RAW, ast_event_new(), AST_EVENT_PING, corosync_ping_payload::event, NULL, and stasis_message_data().

◆ corosync_show_config()

static char* corosync_show_config ( struct ast_cli_entry e,
int  cmd,
struct ast_cli_args a 
)
static

Definition at line 1094 of file res_corosync.c.

1095 {
1096  unsigned int i;
1097 
1098  switch (cmd) {
1099  case CLI_INIT:
1100  e->command = "corosync show config";
1101  e->usage =
1102  "Usage: corosync show config\n"
1103  " Show configuration loaded from res_corosync.conf\n";
1104  return NULL;
1105 
1106  case CLI_GENERATE:
1107  return NULL; /* no completion */
1108  }
1109 
1110  if (a->argc != e->args) {
1111  return CLI_SHOWUSAGE;
1112  }
1113 
1114  ast_cli(a->fd, "\n"
1115  "=============================================================\n"
1116  "=== res_corosync config =====================================\n"
1117  "=============================================================\n"
1118  "===\n");
1119 
1121  ast_debug(5, "corosync_show_config rdlock\n");
1122  for (i = 0; i < ARRAY_LEN(event_types); i++) {
1123  if (event_types[i].publish) {
1124  ast_cli(a->fd, "=== ==> Publishing Event Type: %s\n",
1125  event_types[i].name);
1126  }
1127  if (event_types[i].subscribe) {
1128  ast_cli(a->fd, "=== ==> Subscribing to Event Type: %s\n",
1129  event_types[i].name);
1130  }
1131  }
1133  ast_debug(5, "corosync_show_config unlock\n");
1134 
1135  ast_cli(a->fd, "===\n"
1136  "=============================================================\n"
1137  "\n");
1138 
1139  return CLI_SUCCESS;
1140 }
void ast_cli(int fd, const char *fmt,...)
Definition: clicompat.c:6
#define ast_rwlock_rdlock(a)
Definition: lock.h:233
unsigned char publish
Definition: res_corosync.c:241

◆ corosync_show_members()

static char* corosync_show_members ( struct ast_cli_entry e,
int  cmd,
struct ast_cli_args a 
)
static

Definition at line 965 of file res_corosync.c.

966 {
967  cs_error_t cs_err;
968  cpg_iteration_handle_t cpg_iter;
969  struct cpg_iteration_description_t cpg_desc;
970  unsigned int i;
971 
972  switch (cmd) {
973  case CLI_INIT:
974  e->command = "corosync show members";
975  e->usage =
976  "Usage: corosync show members\n"
977  " Show corosync cluster members\n";
978  return NULL;
979 
980  case CLI_GENERATE:
981  return NULL; /* no completion */
982  }
983 
984  if (a->argc != e->args) {
985  return CLI_SHOWUSAGE;
986  }
987 
989  ast_debug(5, "corosync_show_members rdlock\n");
990  cs_err = cpg_iteration_initialize(cpg_handle, CPG_ITERATION_ALL, NULL, &cpg_iter);
991 
992  if (cs_err != CS_OK) {
993  ast_cli(a->fd, "Failed to initialize CPG iterator: %u.\n", cs_err);
994  cpg_iteration_finalize(cpg_iter);
996  ast_debug(5, "corosync_show_members unlock\n");
997  return CLI_FAILURE;
998  }
999 
1000  ast_cli(a->fd, "\n"
1001  "=============================================================\n"
1002  "=== Cluster members =========================================\n"
1003  "=============================================================\n"
1004  "===\n");
1005 
1006  for (i = 1, cs_err = cpg_iteration_next(cpg_iter, &cpg_desc);
1007  cs_err == CS_OK;
1008  cs_err = cpg_iteration_next(cpg_iter, &cpg_desc), i++) {
1009  #ifdef HAVE_COROSYNC_CFG_STATE_TRACK
1010  corosync_cfg_node_address_t addrs[8];
1011  int num_addrs = 0;
1012  unsigned int j;
1013  #endif
1014 
1015  ast_cli(a->fd, "=== Node %u\n", i);
1016  ast_cli(a->fd, "=== --> Group: %s\n", cpg_desc.group.value);
1017 
1018  #ifdef HAVE_COROSYNC_CFG_STATE_TRACK
1019  /*
1020  * Corosync 2.x cfg lib needs to allocate 1M on stack after calling
1021  * corosync_cfg_get_node_addrs. netconsole thread has allocated only 0.5M
1022  * resulting in crash.
1023  */
1024  cs_err = corosync_cfg_get_node_addrs(cfg_handle, cpg_desc.nodeid,
1025  ARRAY_LEN(addrs), &num_addrs, addrs);
1026  if (cs_err != CS_OK) {
1027  ast_log(LOG_WARNING, "Failed to get node addresses\n");
1028  continue;
1029  }
1030 
1031  for (j = 0; j < num_addrs; j++) {
1032  struct sockaddr *sa = (struct sockaddr *) addrs[j].address;
1033  size_t sa_len = (size_t) addrs[j].address_length;
1034  char buf[128];
1035 
1036  getnameinfo(sa, sa_len, buf, sizeof(buf), NULL, 0, NI_NUMERICHOST);
1037 
1038  ast_cli(a->fd, "=== --> Address %u: %s\n", j + 1, buf);
1039  }
1040  #else
1041  ast_cli(a->fd, "=== --> Nodeid: %"PRIu32"\n", cpg_desc.nodeid);
1042  #endif
1043  }
1044 
1045  ast_cli(a->fd, "===\n"
1046  "=============================================================\n"
1047  "\n");
1048 
1049  cpg_iteration_finalize(cpg_iter);
1051  ast_debug(5, "corosync_show_members unlock\n");
1052  } else {
1053  ast_cli(a->fd, "Failed to initialize CPG iterator: initializing CPG.\n");
1054  }
1055 
1056  return CLI_SUCCESS;
1057 }
char buf[BUFSIZE]
Definition: eagi_proxy.c:66
char * address
Definition: f2c.h:59
#define LOG_WARNING
#define ast_rwlock_tryrdlock(a)
Definition: lock.h:235

References a, ast_cli_entry::args, ARRAY_LEN, ast_cli(), ast_debug, ast_log, ast_rwlock_tryrdlock, ast_rwlock_unlock, buf, cfg_handle, CLI_FAILURE, CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, cpg_handle, init_cpg_lock, LOG_WARNING, NULL, and ast_cli_entry::usage.

◆ corosync_topic()

static struct stasis_topic* corosync_topic ( void  )
static

Internal accessor for our topic.

Definition at line 74 of file res_corosync.c.

75 {
77 }

References corosync_aggregate_topic.

Referenced by load_general_config(), and publish_corosync_ping_to_stasis().

◆ cpg_confchg_cb()

static void cpg_confchg_cb ( cpg_handle_t  handle,
const struct cpg_name *  group_name,
const struct cpg_address *  member_list,
size_t  member_list_entries,
const struct cpg_address *  left_list,
size_t  left_list_entries,
const struct cpg_address *  joined_list,
size_t  joined_list_entries 
)
static

Definition at line 647 of file res_corosync.c.

651 {
652  unsigned int i;
653 
654 
655  for (i = 0; i < left_list_entries; i++) {
656  const struct cpg_address *cpg_node = &left_list[i];
657  struct corosync_node* node;
658  unsigned int j;
659 
660  node = ao2_find(nodes, &cpg_node->nodeid, OBJ_UNLINK | OBJ_SEARCH_KEY);
661  if (!node) {
662  continue;
663  }
664 
665  for (j = 0; j < ARRAY_LEN(event_types); j++) {
666  struct ao2_container *messages;
667  int messages_count;
668 
670  ast_debug(5, "cpg_confchg_cb rdlock\n");
671  if (!event_types[j].subscribe) {
673  ast_debug(5, "cpg_confchg_cb unlock\n");
674  continue;
675  }
676 
679  ast_debug(5, "cpg_confchg_cb unlock\n");
680  continue;
681  }
683  ast_debug(5, "cpg_confchg_cb unlock\n");
684 
686 
687  messages_count = ao2_container_count(messages);
688  ast_log(LOG_NOTICE, "Clearing %i events of type %s of node %i from stasis cache.\n", messages_count, event_types[j].name, node->id);
690  ast_log(LOG_NOTICE, "Cleared events of type %s from stasis cache.\n", event_types[j].name);
691 
692  ao2_t_ref(messages, -1, "Dispose of flushed cache");
693  }
694 
696  ao2_ref(node, -1);
697  }
698 
699  /* If any new nodes have joined, dump our cache of events we are publishing
700  * that originated from this server. */
701  if (!joined_list_entries) {
702  return;
703  }
704 
705  for (i = 0; i < ARRAY_LEN(event_types); i++) {
706  struct ao2_container *messages;
707  int messages_count;
708 
710  ast_debug(5, "cpg_confchg_cb rdlock\n");
711  if (!event_types[i].publish) {
713  ast_debug(5, "cpg_confchg_cb unlock\n");
714  continue;
715  }
716 
719  ast_debug(5, "cpg_confchg_cb unlock\n");
720  continue;
721  }
723  ast_debug(5, "cpg_confchg_cb unlock\n");
724 
726 
727  messages_count = ao2_container_count(messages);
728  ast_log(LOG_NOTICE, "Sending %i events of type %s to corosync.\n", messages_count, event_types[i].name);
730  ast_log(LOG_NOTICE, "Sent events of type %s to corosync.\n", event_types[i].name);
731 
732  ao2_t_ref(messages, -1, "Dispose of dumped cache");
733  }
734 }
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1736
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
@ OBJ_UNLINK
Definition: astobj2.h:1039
static void publish_cluster_discovery_to_stasis_full(struct corosync_node *node, int joined)
Publish cluster discovery to Stasis Message Bus API.
Definition: res_corosync.c:303
static int dump_cache_cb(void *obj, void *arg, int flags)
Definition: res_corosync.c:610
struct ao2_container * stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
Dump cached items to a subscription for a specific entity.
Definition: stasis_cache.c:718

References ao2_callback, ao2_container_count(), ao2_find, ao2_ref, ao2_t_ref, ARRAY_LEN, ast_debug, ast_eid_default, ast_log, ast_rwlock_rdlock, ast_rwlock_unlock, cache_fn, clear_node_cache(), dump_cache_cb(), event_types, event_types_lock, LOG_NOTICE, message_type_fn, name, nodes, NULL, OBJ_NODATA, OBJ_SEARCH_KEY, OBJ_UNLINK, publish, publish_cluster_discovery_to_stasis_full(), stasis_cache_dump_by_eid(), subscribe, and topic_fn.

◆ cpg_deliver_cb()

static void cpg_deliver_cb ( cpg_handle_t  handle,
const struct cpg_name *  group_name,
uint32_t  nodeid,
uint32_t  pid,
void *  msg,
size_t  msg_len 
)
static

Definition at line 484 of file res_corosync.c.

486 {
487  struct ast_event *event;
488  void (*publish_handler)(struct ast_event *) = NULL;
489  enum ast_event_type event_type;
490  struct ast_eid *event_eid;
491 
492  if (msg_len < ast_event_minimum_length()) {
493  ast_debug(1, "Ignoring event that's too small. %u < %u\n",
494  (unsigned int) msg_len,
495  (unsigned int) ast_event_minimum_length());
496  return;
497  }
498 
499  event_eid = (struct ast_eid *)ast_event_get_ie_raw(msg, AST_EVENT_IE_EID);
500  if (!event_eid || !ast_eid_cmp(&ast_eid_default, event_eid)) {
501  /* Don't feed events back in that originated locally. */
502  return;
503  }
504 
505  event_type = ast_event_get_type(msg);
506  if (event_type > AST_EVENT_TOTAL) {
507  /* Egads, we don't support this */
508  return;
509  }
510 
512  ast_debug(5, "cpg_deliver_cb rdlock\n");
513  publish_handler = event_types[event_type].publish_to_stasis;
514  if (!event_types[event_type].subscribe || !publish_handler) {
515  /* We are not configured to subscribe to these events or
516  we have no way to publish it internally. */
518  ast_debug(5, "cpg_deliver_cb unlock\n");
519  return;
520  }
522  ast_debug(5, "cpg_deliver_cb unlock\n");
523 
524  if (!(event = ast_malloc(msg_len))) {
525  return;
526  }
527 
528  memcpy(event, msg, msg_len);
529 
530  if (event_type == AST_EVENT_PING) {
531  const struct ast_eid *eid;
532  char buf[128] = "";
533 
535  ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid);
536  ast_log(LOG_NOTICE, "Got event PING from server with EID: '%s'\n", buf);
537  }
538  ast_debug(5, "Publishing event %s (%u) to stasis\n",
539  ast_event_get_type_name(event), event_type);
540  publish_handler(event);
541  ast_free(event);
542 }
#define ast_malloc(len)
A wrapper for malloc()
Definition: astmm.h:191
size_t ast_event_minimum_length(void)
Get the minimum length of an ast_event.
Definition: event.c:529
enum ast_event_type ast_event_get_type(const struct ast_event *event)
Get the type for an event.
Definition: event.c:288
const char * ast_event_get_type_name(const struct ast_event *event)
Get the string representation of the type of the given event.
Definition: event.c:194
ast_event_type
Definition: event_defs.h:28
@ AST_EVENT_TOTAL
Definition: event_defs.h:64
unsigned char eid[6]
Definition: utils.h:809
char * ast_eid_to_str(char *s, int maxlen, struct ast_eid *eid)
Convert an EID to a string.
Definition: main/utils.c:2735

References ast_debug, ast_eid_cmp(), ast_eid_default, ast_eid_to_str(), ast_event_get_ie_raw(), ast_event_get_type(), ast_event_get_type_name(), AST_EVENT_IE_EID, ast_event_minimum_length(), AST_EVENT_PING, AST_EVENT_TOTAL, ast_free, ast_log, ast_malloc, ast_rwlock_rdlock, ast_rwlock_unlock, buf, ast_eid::eid, event_types, event_types_lock, LOG_NOTICE, NULL, and subscribe.

◆ dispatch_thread_handler()

static void* dispatch_thread_handler ( void *  data)
static

Definition at line 782 of file res_corosync.c.

783 {
784  cs_error_t cs_err;
785  struct pollfd pfd[3] = {
786  { .events = POLLIN, },
787  { .events = POLLIN, },
788  { .events = POLLIN, },
789  };
790 
792  ast_debug(5, "dispatch_thread_handler rdlock\n");
793  if ((cs_err = cpg_fd_get(cpg_handle, &pfd[0].fd)) != CS_OK) {
794  ast_log(LOG_ERROR, "Failed to get CPG fd. This module is now broken.\n");
796  ast_debug(5, "dispatch_thread_handler unlock\n");
797  return NULL;
798  }
799 
800  if ((cs_err = corosync_cfg_fd_get(cfg_handle, &pfd[1].fd)) != CS_OK) {
801  ast_log(LOG_ERROR, "Failed to get CFG fd. This module is now broken.\n");
803  ast_debug(5, "dispatch_thread_handler unlock\n");
804  return NULL;
805  }
806 
807  pfd[2].fd = dispatch_thread.alert_pipe[0];
809  ast_debug(5, "dispatch_thread_handler unlock\n");
810  } else {
811  ast_log(LOG_ERROR, "Failed to get fd: initializing CPG. This module is now broken.\n");
812  return NULL;
813  }
815  while (!dispatch_thread.stop) {
816  int res;
817 
818  cs_err = CS_OK;
819 
820  pfd[0].revents = 0;
821  pfd[1].revents = 0;
822  pfd[2].revents = 0;
823 
824  res = ast_poll(pfd, ARRAY_LEN(pfd), COROSYNC_POLL_TIMEOUT);
825  if (res == -1 && errno != EINTR && errno != EAGAIN) {
826  ast_log(LOG_ERROR, "poll() error: %s (%d)\n", strerror(errno), errno);
827  cs_err = CS_ERR_BAD_HANDLE;
828  } else if (res == 0) {
829  unsigned int local_nodeid;
830 
832  ast_debug(5, "dispatch_thread_handler rdlock\n");
833  if ((cs_err = cpg_local_get(cpg_handle, &local_nodeid)) == CS_OK) {
834  struct cpg_name name;
835  struct cpg_address address[CPG_MEMBERS_MAX];
836  int entries = CPG_MEMBERS_MAX;
837 
838  ast_copy_string(name.value, "asterisk", sizeof(name.value));
839  name.length = strlen(name.value);
840  if ((cs_err = cpg_membership_get(cpg_handle, &name, address, &entries)) == CS_OK) {
841  int i;
842  int found = 0;
843 
844  ast_debug(1, "CPG group has %i node membership\n", entries);
845  for (i = 0; (i < entries) && !found; i++) {
846  if (address[i].nodeid == local_nodeid)
847  found = 1;
848  }
849  if (!found) {
850  ast_log(LOG_WARNING, "Failed to check CPG node membership\n");
852  cs_err = CS_ERR_BAD_HANDLE;
853  }
854  } else {
855  ast_log(LOG_WARNING, "Failed to get CPG node membership: %u\n", cs_err);
857  cs_err = CS_ERR_BAD_HANDLE;
858  }
859  } else {
860  ast_log(LOG_WARNING, "Failed to get CPG local node id: %u\n", cs_err);
862  cs_err = CS_ERR_BAD_HANDLE;
863  }
865  ast_debug(5, "dispatch_thread_handler unlock\n");
866  } else {
867  ast_log(LOG_WARNING, "Failed to check CPG node membership: initializing CPG.\n");
869  cs_err = CS_ERR_BAD_HANDLE;
870  }
871  } else {
873  ast_debug(5, "dispatch_thread_handler rdlock\n");
874  if (pfd[0].revents & POLLIN) {
875  if ((cs_err = cpg_dispatch(cpg_handle, CS_DISPATCH_ALL)) != CS_OK) {
876  ast_log(LOG_WARNING, "Failed CPG dispatch: %u\n", cs_err);
877  }
878  }
879 
880  if (pfd[1].revents & POLLIN) {
881  if ((cs_err = corosync_cfg_dispatch(cfg_handle, CS_DISPATCH_ALL)) != CS_OK) {
882  ast_log(LOG_WARNING, "Failed CFG dispatch: %u\n", cs_err);
883  }
884  }
886  ast_debug(5, "dispatch_thread_handler unlock\n");
887  } else {
888  ast_log(LOG_WARNING, "Failed to dispatch: initializing CPG.\n");
889  }
890  }
891  if (cs_err == CS_ERR_LIBRARY || cs_err == CS_ERR_BAD_HANDLE) {
892 
893  /* If corosync gets restarted out from under Asterisk, try to recover. */
894 
895  ast_log(LOG_NOTICE, "Attempting to recover from corosync failure.\n");
896 
898  struct cpg_name name;
899  ast_debug(5, "dispatch_thread_handler wrlock\n");
900 
902  if (cpg_handle && (cs_err = cpg_finalize(cpg_handle)) != CS_OK) {
903  ast_log(LOG_ERROR, "Failed to finalize cpg (%d)\n", (int) cs_err);
904  }
905 
906  if (cfg_handle && (cs_err = corosync_cfg_finalize(cfg_handle)) != CS_OK) {
907  ast_log(LOG_ERROR, "Failed to finalize cfg (%d)\n", (int) cs_err);
908  }
909 
910  if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) {
911  ast_log(LOG_ERROR, "Failed to initialize cfg (%d)\n", (int) cs_err);
913  ast_debug(5, "dispatch_thread_handler unlock\n");
914  sleep(5);
915  continue;
916  }
917 
918  if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks) != CS_OK)) {
919  ast_log(LOG_ERROR, "Failed to initialize cpg (%d)\n", (int) cs_err);
921  ast_debug(5, "dispatch_thread_handler unlock\n");
922  sleep(5);
923  continue;
924  }
925 
926  if ((cs_err = cpg_fd_get(cpg_handle, &pfd[0].fd)) != CS_OK) {
927  ast_log(LOG_ERROR, "Failed to get CPG fd.\n");
929  ast_debug(5, "dispatch_thread_handler unlock\n");
930  sleep(5);
931  continue;
932  }
933 
934  if ((cs_err = corosync_cfg_fd_get(cfg_handle, &pfd[1].fd)) != CS_OK) {
935  ast_log(LOG_ERROR, "Failed to get CFG fd.\n");
937  ast_debug(5, "dispatch_thread_handler unlock\n");
938  sleep(5);
939  continue;
940  }
941 
942  ast_copy_string(name.value, "asterisk", sizeof(name.value));
943  name.length = strlen(name.value);
944  if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) {
945  ast_log(LOG_ERROR, "Failed to join cpg (%d)\n", (int) cs_err);
947  ast_debug(5, "dispatch_thread_handler unlock\n");
948  sleep(5);
949  continue;
950  }
953  ast_debug(5, "dispatch_thread_handler unlock\n");
954  ast_log(LOG_NOTICE, "Corosync recovery complete.\n");
956  } else {
957  ast_log(LOG_NOTICE, "Failed to recover from corosync failure: initializing CPG.\n");
958  }
959  }
960  }
961 
962  return NULL;
963 }
#define ast_poll(a, b, c)
Definition: poll-compat.h:88
static cpg_callbacks_t cpg_callbacks
Definition: res_corosync.c:466
#define COROSYNC_POLL_TIMEOUT
Timeout for Corosync's poll process.
Definition: res_corosync.c:55
static corosync_cfg_callbacks_t cfg_callbacks
Definition: res_corosync.c:295
static void send_cluster_notify(void)
Informs the cluster of our EID and our IP addresses.
Definition: res_corosync.c:737
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
Definition: strings.h:406

References ARRAY_LEN, ast_copy_string(), ast_debug, ast_log, ast_poll, ast_rwlock_tryrdlock, ast_rwlock_trywrlock, ast_rwlock_unlock, cfg_callbacks, cfg_handle, corosync_node_joined, COROSYNC_POLL_TIMEOUT, cpg_callbacks, cpg_handle, dispatch_thread, errno, init_cpg_lock, LOG_ERROR, LOG_NOTICE, LOG_WARNING, name, NULL, and send_cluster_notify().

Referenced by load_module().

◆ dump_cache_cb()

static int dump_cache_cb ( void *  obj,
void *  arg,
int  flags 
)
static

Definition at line 610 of file res_corosync.c.

611 {
612  struct stasis_message *message = obj;
613 
614  if (!message) {
615  return 0;
616  }
617 
619 
620  return 0;
621 }
static void publish_to_corosync(struct stasis_message *message)
Definition: res_corosync.c:571

References publish_to_corosync().

Referenced by cpg_confchg_cb().

◆ load_config()

static int load_config ( unsigned int  reload)
static

Definition at line 1222 of file res_corosync.c.

1223 {
1224  static const char filename[] = "res_corosync.conf";
1225  struct ast_config *cfg;
1226  const char *cat = NULL;
1227  struct ast_flags config_flags = { 0 };
1228  int res = 0;
1229 
1230  cfg = ast_config_load(filename, config_flags);
1231 
1233  return -1;
1234  }
1235 
1236  while ((cat = ast_category_browse(cfg, cat))) {
1237  if (!strcasecmp(cat, "general")) {
1238  res = load_general_config(cfg);
1239  } else {
1240  ast_log(LOG_WARNING, "Unknown configuration section '%s'\n", cat);
1241  }
1242  }
1243 
1244  ast_config_destroy(cfg);
1245 
1246  return res;
1247 }
#define ast_config_load(filename, flags)
Load a config file.
char * ast_category_browse(struct ast_config *config, const char *prev_name)
Browse categories.
Definition: extconf.c:3327
#define CONFIG_STATUS_FILEMISSING
#define CONFIG_STATUS_FILEINVALID
void ast_config_destroy(struct ast_config *cfg)
Destroys a config.
Definition: extconf.c:1289
static int load_general_config(struct ast_config *cfg)
Structure used to handle boolean flags.
Definition: utils.h:199

References ast_category_browse(), ast_config_destroy(), ast_config_load, ast_log, CONFIG_STATUS_FILEINVALID, CONFIG_STATUS_FILEMISSING, load_general_config(), LOG_WARNING, and NULL.

Referenced by load_module().

◆ load_general_config()

static int load_general_config ( struct ast_config cfg)
static

Definition at line 1177 of file res_corosync.c.

1178 {
1179  struct ast_variable *v;
1180  int res = 0;
1181  unsigned int i;
1182 
1184  ast_debug(5, "load_general_config wrlock\n");
1185 
1186  for (i = 0; i < ARRAY_LEN(event_types); i++) {
1187  event_types[i].publish = event_types[i].publish_default;
1188  event_types[i].subscribe = event_types[i].subscribe_default;
1189  }
1190 
1191  for (v = ast_variable_browse(cfg, "general"); v && !res; v = v->next) {
1192  if (!strcasecmp(v->name, "publish_event")) {
1193  res = set_event(v->value, PUBLISH);
1194  } else if (!strcasecmp(v->name, "subscribe_event")) {
1195  res = set_event(v->value, SUBSCRIBE);
1196  } else {
1197  ast_log(LOG_WARNING, "Unknown option '%s'\n", v->name);
1198  }
1199  }
1200 
1201  for (i = 0; i < ARRAY_LEN(event_types); i++) {
1202  if (event_types[i].publish && !event_types[i].sub) {
1204  corosync_topic());
1208  NULL);
1209  } else if (!event_types[i].publish && event_types[i].sub) {
1213  }
1214  }
1215 
1217  ast_debug(5, "load_general_config unlock\n");
1218 
1219  return res;
1220 }
struct ast_variable * ast_variable_browse(const struct ast_config *config, const char *category_name)
Definition: extconf.c:1215
static void stasis_message_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Definition: res_corosync.c:601
static struct stasis_topic * corosync_topic(void)
Internal accessor for our topic.
Definition: res_corosync.c:74
static int set_event(const char *event_type, int pubsub)
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition: stasis.c:1580
int stasis_message_router_add(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
Add a route to a message router.
Structure for variables, used for configurations and for channel variables.
struct ast_variable * next

References ARRAY_LEN, ast_debug, ast_log, ast_rwlock_unlock, ast_rwlock_wrlock, ast_variable_browse(), corosync_topic(), event_types, event_types_lock, LOG_WARNING, message_type_fn, ast_variable::name, ast_variable::next, NULL, publish, PUBLISH, set_event(), stasis_forward_all(), stasis_forward_cancel(), stasis_message_cb(), stasis_message_router_add(), stasis_message_router_remove(), stasis_router, sub, SUBSCRIBE, topic_fn, and ast_variable::value.

Referenced by load_config().

◆ load_module()

static int load_module ( void  )
static

Definition at line 1336 of file res_corosync.c.

1337 {
1338  cs_error_t cs_err;
1339  struct cpg_name name;
1340 
1342  ast_log(LOG_ERROR, "Entity ID is not set.\n");
1343  return AST_MODULE_LOAD_DECLINE;
1344  }
1345 
1348  if (!nodes) {
1349  goto failed;
1350  }
1351 
1352  corosync_aggregate_topic = stasis_topic_create("corosync:aggregator");
1353  if (!corosync_aggregate_topic) {
1354  ast_log(AST_LOG_ERROR, "Failed to create stasis topic for corosync\n");
1355  goto failed;
1356  }
1357 
1359  if (!stasis_router) {
1360  ast_log(AST_LOG_ERROR, "Failed to create message router for corosync topic\n");
1361  goto failed;
1362  }
1365 
1366  if (STASIS_MESSAGE_TYPE_INIT(corosync_ping_message_type) != 0) {
1367  ast_log(AST_LOG_ERROR, "Failed to initialize corosync ping message type\n");
1368  goto failed;
1369  }
1370 
1371  if (load_config(0)) {
1372  /* simply not configured is not a fatal error */
1373  goto failed;
1374  }
1375 
1378  ast_debug(5, "load_module wrlock\n");
1379  if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) {
1380  ast_log(LOG_ERROR, "Failed to initialize cfg: (%d)\n", (int) cs_err);
1382  ast_debug(5, "load_module unlock\n");
1383  goto failed;
1384  }
1385 
1386  if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks)) != CS_OK) {
1387  ast_log(LOG_ERROR, "Failed to initialize cpg: (%d)\n", (int) cs_err);
1389  ast_debug(5, "load_module unlock\n");
1390  goto failed;
1391  }
1392 
1393  ast_copy_string(name.value, "asterisk", sizeof(name.value));
1394  name.length = strlen(name.value);
1395 
1396  if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) {
1397  ast_log(LOG_ERROR, "Failed to join: (%d)\n", (int) cs_err);
1399  ast_debug(5, "load_module unlock\n");
1400  goto failed;
1401  }
1402 
1403  if (pipe(dispatch_thread.alert_pipe) == -1) {
1404  ast_log(LOG_ERROR, "Failed to create alert pipe: %s (%d)\n",
1405  strerror(errno), errno);
1407  ast_debug(5, "load_module unlock\n");
1408  goto failed;
1409  }
1411 
1413  ast_debug(5, "load_module unlock\n");
1416  ast_log(LOG_ERROR, "Error starting CPG dispatch thread.\n");
1417  goto failed;
1418  }
1419 
1421  } else {
1422  goto failed;
1423  }
1424 
1425  return AST_MODULE_LOAD_SUCCESS;
1426 
1427 failed:
1428  cleanup_module();
1429 
1430  return AST_MODULE_LOAD_DECLINE;
1431 }
@ AO2_ALLOC_OPT_LOCK_MUTEX
Definition: astobj2.h:363
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition: astobj2.h:1303
#define ast_cli_register_multiple(e, len)
Register multiple commands.
Definition: cli.h:265
#define AST_LOG_ERROR
@ AST_MODULE_LOAD_SUCCESS
Definition: module.h:70
@ AST_MODULE_LOAD_DECLINE
Module has failed to load, may be in an inconsistent state.
Definition: module.h:78
static struct ast_cli_entry corosync_cli[]
#define corosync_pthread_create_background(a, b, c, d)
Version of pthread_create to ensure stack is large enough.
Definition: res_corosync.c:92
static void cleanup_module(void)
static int corosync_node_hash_fn(const void *obj, const int flags)
Definition: res_corosync.c:113
static void * dispatch_thread_handler(void *data)
Definition: res_corosync.c:782
static int load_config(unsigned int reload)
static int corosync_node_cmp_fn(void *obj, void *arg, int flags)
Definition: res_corosync.c:133
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
Definition: stasis.h:1493
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition: stasis.c:619
int stasis_message_router_set_congestion_limits(struct stasis_message_router *router, long low_water, long high_water)
Set the high and low alert water marks of the stasis message router.
#define stasis_message_router_create(topic)
Create a new message router object.
#define AST_TASKPROCESSOR_HIGH_WATER_LEVEL
Definition: taskprocessor.h:64
int ast_eid_is_empty(const struct ast_eid *eid)
Check if EID is empty.
Definition: main/utils.c:2995

References AO2_ALLOC_OPT_LOCK_MUTEX, ao2_container_alloc_hash, ARRAY_LEN, ast_cli_register_multiple, ast_copy_string(), ast_debug, ast_eid_default, ast_eid_is_empty(), ast_log, AST_LOG_ERROR, AST_MODULE_LOAD_DECLINE, AST_MODULE_LOAD_SUCCESS, ast_rwlock_trywrlock, ast_rwlock_unlock, AST_TASKPROCESSOR_HIGH_WATER_LEVEL, cfg_callbacks, cfg_handle, cleanup_module(), corosync_aggregate_topic, corosync_cli, corosync_node_cmp_fn(), corosync_node_hash_fn(), corosync_node_joined, corosync_pthread_create_background, cpg_callbacks, cpg_handle, dispatch_thread, dispatch_thread_handler(), errno, init_cpg_lock, load_config(), LOG_ERROR, name, nodes, NULL, stasis_message_router_create, stasis_message_router_set_congestion_limits(), STASIS_MESSAGE_TYPE_INIT, stasis_router, and stasis_topic_create().

◆ publish_cluster_discovery_to_stasis()

static void publish_cluster_discovery_to_stasis ( struct ast_event event)
static

Publish a received cluster discovery ast_event to Stasis Message Bus API.

Definition at line 351 of file res_corosync.c.

352 {
353  struct corosync_node *node;
355  struct ast_eid *event_eid;
356 
358 
359  event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
360  if (!event_eid || !ast_eid_cmp(&ast_eid_default, event_eid)) {
361  /* Don't feed events back in that originated locally. */
362  return;
363  }
364 
365  ao2_lock(nodes);
367  if (node) {
368  /* We already know about this node */
369  ao2_unlock(nodes);
370  ao2_ref(node, -1);
371  return;
372  }
373 
375  if (!node) {
376  ao2_unlock(nodes);
377  return;
378  }
380  ao2_unlock(nodes);
381 
383 
384  ao2_ref(node, -1);
385 
386  /*
387  * When we get news that someone else has joined, we need to let them
388  * know we exist as well.
389  */
391 }
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
Definition: astobj2.h:1554
#define ao2_unlock(a)
Definition: astobj2.h:729
#define ao2_lock(a)
Definition: astobj2.h:717
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
Definition: astobj2.h:1063
@ AST_EVENT_CLUSTER_DISCOVERY
Definition: event_defs.h:62
static struct corosync_node * corosync_node_alloc(struct ast_event *event)
Definition: res_corosync.c:97

References ao2_find, ao2_link_flags, ao2_lock, ao2_ref, ao2_unlock, ast_assert, ast_eid_cmp(), ast_eid_default, AST_EVENT_CLUSTER_DISCOVERY, ast_event_get_ie_raw(), ast_event_get_ie_uint(), ast_event_get_type(), AST_EVENT_IE_EID, AST_EVENT_IE_NODE_ID, corosync_node_alloc(), nodes, OBJ_NOLOCK, OBJ_SEARCH_KEY, publish_cluster_discovery_to_stasis_full(), and send_cluster_notify().

◆ publish_cluster_discovery_to_stasis_full()

static void publish_cluster_discovery_to_stasis_full ( struct corosync_node node,
int  joined 
)
static

Publish cluster discovery to Stasis Message Bus API.

Definition at line 303 of file res_corosync.c.

304 {
305  struct ast_json *json;
306  struct ast_json_payload *payload;
307  struct stasis_message *message;
308  char eid[18];
309  const char *addr;
310 
311  ast_eid_to_str(eid, sizeof(eid), &node->eid);
312  addr = ast_sockaddr_stringify_addr(&node->addr);
313 
314  ast_log(AST_LOG_NOTICE, "Node %u (%s) at %s %s the cluster\n",
315  node->id,
316  eid,
317  addr,
318  joined ? "joined" : "left");
319 
320  json = ast_json_pack("{s: s, s: i, s: s, s: i}",
321  "address", addr,
322  "node_id", node->id,
323  "eid", eid,
324  "joined", joined);
325  if (!json) {
326  return;
327  }
328 
329  payload = ast_json_payload_create(json);
330  if (!payload) {
331  ast_json_unref(json);
332  return;
333  }
334 
336  if (!message) {
337  ast_json_unref(json);
338  ao2_ref(payload, -1);
339  return;
340  }
341 
343  ast_json_unref(json);
344  ao2_ref(payload, -1);
345  ao2_ref(message, -1);
346 }
#define AST_LOG_NOTICE
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition: json.c:73
struct ast_json_payload * ast_json_payload_create(struct ast_json *json)
Create an ao2 object to pass json blobs as data payloads for stasis.
Definition: json.c:735
struct ast_json * ast_json_pack(char const *format,...)
Helper for creating complex JSON values.
Definition: json.c:591
static char * ast_sockaddr_stringify_addr(const struct ast_sockaddr *addr)
Wrapper around ast_sockaddr_stringify_fmt() to return an address only.
Definition: netsock2.h:286
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
struct stasis_topic * ast_system_topic(void)
A Stasis Message Bus API topic which publishes messages regarding system changes.
struct stasis_message_type * ast_cluster_discovery_type(void)
A stasis_message_type for Cluster discovery.
Abstract JSON element (object, array, string, int, ...).
struct ast_eid eid

References ao2_ref, ast_cluster_discovery_type(), ast_eid_to_str(), ast_json_pack(), ast_json_payload_create(), ast_json_unref(), ast_log, AST_LOG_NOTICE, ast_sockaddr_stringify_addr(), ast_system_topic(), stasis_message::eid, stasis_message_create(), and stasis_publish().

Referenced by cpg_confchg_cb(), and publish_cluster_discovery_to_stasis().

◆ publish_corosync_ping_to_stasis()

static void publish_corosync_ping_to_stasis ( struct ast_event event)
static

Publish a Corosync ping to Stasis Message Bus API.

Definition at line 204 of file res_corosync.c.

205 {
206  struct corosync_ping_payload *payload;
207  struct stasis_message *message;
208  struct ast_eid *event_eid;
209 
211  ast_assert(event != NULL);
212 
213  if (!corosync_ping_message_type()) {
214  return;
215  }
216 
217  payload = ao2_t_alloc(sizeof(*payload), corosync_ping_payload_dtor, "Create ping payload");
218  if (!payload) {
219  return;
220  }
221  event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
223  AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW, event_eid, sizeof(*event_eid),
225 
226  message = stasis_message_create(corosync_ping_message_type(), payload);
227  if (!message) {
228  ao2_t_ref(payload, -1, "Destroy payload on off nominal");
229  return;
230  }
231 
233 
234  ao2_t_ref(payload, -1, "Hand ref to stasis");
235  ao2_t_ref(message, -1, "Hand ref to stasis");
236 }
#define ao2_t_alloc(data_size, destructor_fn, debug_msg)
Definition: astobj2.h:407
static void corosync_ping_payload_dtor(void *obj)
Destructor for the corosync_ping_payload wrapper object.
Definition: res_corosync.c:167

References ao2_t_alloc, ao2_t_ref, ast_assert, ast_event_get_ie_raw(), ast_event_get_type(), AST_EVENT_IE_EID, AST_EVENT_IE_END, AST_EVENT_IE_PLTYPE_RAW, ast_event_new(), AST_EVENT_PING, corosync_ping_payload_dtor(), corosync_topic(), corosync_ping_payload::event, NULL, stasis_message_create(), and stasis_publish().

◆ publish_device_state_to_stasis()

static void publish_device_state_to_stasis ( struct ast_event event)
static

Publish a received device state ast_event to Stasis Message Bus API.

Definition at line 432 of file res_corosync.c.

433 {
434  const char *device;
435  enum ast_device_state state;
436  unsigned int cachable;
437  struct ast_eid *event_eid;
438 
440 
444  event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
445 
446  if (ast_strlen_zero(device)) {
447  return;
448  }
449 
450  if (ast_publish_device_state_full(device, state, cachable, event_eid)) {
451  char eid[18];
452  ast_eid_to_str(eid, sizeof(eid), event_eid);
453  ast_log(LOG_WARNING, "Failed to publish device state message for %s from %s\n",
454  device, eid);
455  }
456 }
enum sip_cc_notify_state state
Definition: chan_sip.c:966
ast_device_state
Device States.
Definition: devicestate.h:52
int ast_publish_device_state_full(const char *device, enum ast_device_state state, enum ast_devstate_cache cachable, struct ast_eid *eid)
Publish a device state update with EID.
Definition: devicestate.c:709
@ AST_EVENT_IE_STATE
Generic State IE Used by AST_EVENT_DEVICE_STATE_CHANGE Payload type: UINT The actual state values dep...
Definition: event_defs.h:121
@ AST_EVENT_IE_DEVICE
Device Name Used by AST_EVENT_DEVICE_STATE_CHANGE Payload type: STR.
Definition: event_defs.h:113
@ AST_EVENT_IE_CACHABLE
Event non-cacheability flag Used by: All events Payload type: UINT.
Definition: event_defs.h:306
@ AST_EVENT_DEVICE_STATE_CHANGE
Definition: event_defs.h:48
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition: strings.h:65

References ast_assert, ast_eid_to_str(), AST_EVENT_DEVICE_STATE_CHANGE, ast_event_get_ie_raw(), ast_event_get_ie_str(), ast_event_get_ie_uint(), ast_event_get_type(), AST_EVENT_IE_CACHABLE, AST_EVENT_IE_DEVICE, AST_EVENT_IE_EID, AST_EVENT_IE_STATE, ast_log, ast_publish_device_state_full(), ast_strlen_zero(), ast_eid::eid, LOG_WARNING, and state.

◆ publish_event_to_corosync()

static void publish_event_to_corosync ( struct ast_event event)
static

Definition at line 544 of file res_corosync.c.

545 {
546  cs_error_t cs_err;
547  struct iovec iov;
548 
549  iov.iov_base = (void *)event;
550  iov.iov_len = ast_event_get_size(event);
551 
552  ast_debug(5, "Publishing event %s (%u) to corosync\n",
554 
555  /* The stasis subscription will only exist if we are configured to publish
556  * these events, so just send away. */
558  ast_debug(5, "publish_event_to_corosync rdlock\n");
559  if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) {
560  ast_log(LOG_WARNING, "CPG mcast failed (%u) for event %s (%u)\n",
562  }
564  ast_debug(5, "publish_event_to_corosync unlock\n");
565  } else {
566  ast_log(LOG_WARNING, "CPG mcast not executed for event %s (%u): initializing CPG.\n",
568  }
569 }
size_t ast_event_get_size(const struct ast_event *event)
Get the size of an event.
Definition: event.c:228

References ast_debug, ast_event_get_size(), ast_event_get_type(), ast_event_get_type_name(), ast_log, ast_rwlock_tryrdlock, ast_rwlock_unlock, corosync_node_joined, cpg_handle, init_cpg_lock, and LOG_WARNING.

Referenced by publish_to_corosync(), and send_cluster_notify().

◆ publish_mwi_to_stasis()

static void publish_mwi_to_stasis ( struct ast_event event)
static

Publish a received MWI ast_event to Stasis Message Bus API.

Definition at line 394 of file res_corosync.c.

395 {
396  const char *mailbox;
397  const char *context;
398  unsigned int new_msgs;
399  unsigned int old_msgs;
400  struct ast_eid *event_eid;
401 
403 
408  event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
409 
411  return;
412  }
413 
414  if (new_msgs > INT_MAX) {
415  new_msgs = INT_MAX;
416  }
417 
418  if (old_msgs > INT_MAX) {
419  old_msgs = INT_MAX;
420  }
421 
422  if (ast_publish_mwi_state_full(mailbox, context, (int)new_msgs,
423  (int)old_msgs, NULL, event_eid)) {
424  char eid[18];
425  ast_eid_to_str(eid, sizeof(eid), event_eid);
426  ast_log(LOG_WARNING, "Failed to publish MWI message for %s@%s from %s\n",
427  mailbox, context, eid);
428  }
429 }
static char context[AST_MAX_CONTEXT]
Definition: chan_alsa.c:120
static char mailbox[AST_MAX_MAILBOX_UNIQUEID]
Definition: chan_mgcp.c:207
@ AST_EVENT_IE_CONTEXT
Context IE Used by AST_EVENT_MWI Payload type: str.
Definition: event_defs.h:127
@ AST_EVENT_IE_MAILBOX
Mailbox name.
Definition: event_defs.h:89
@ AST_EVENT_IE_OLDMSGS
Number of Used by: AST_EVENT_MWI Payload type: UINT.
Definition: event_defs.h:83
@ AST_EVENT_IE_NEWMSGS
Number of new messages Used by: AST_EVENT_MWI Payload type: UINT.
Definition: event_defs.h:77
@ AST_EVENT_MWI
Definition: event_defs.h:38
int ast_publish_mwi_state_full(const char *mailbox, const char *context, int new_msgs, int old_msgs, const char *channel_id, struct ast_eid *eid)
Publish a MWI state update via stasis with all parameters.
Definition: mwi.c:393

References ast_assert, ast_eid_to_str(), ast_event_get_ie_raw(), ast_event_get_ie_str(), ast_event_get_ie_uint(), ast_event_get_type(), AST_EVENT_IE_CONTEXT, AST_EVENT_IE_EID, AST_EVENT_IE_MAILBOX, AST_EVENT_IE_NEWMSGS, AST_EVENT_IE_OLDMSGS, AST_EVENT_MWI, ast_log, ast_publish_mwi_state_full(), ast_strlen_zero(), context, ast_eid::eid, LOG_WARNING, mailbox, and NULL.

◆ publish_to_corosync()

static void publish_to_corosync ( struct stasis_message message)
static

Definition at line 571 of file res_corosync.c.

572 {
573  struct ast_event *event;
574  struct ast_eid *event_eid;
575 
577  if (!event) {
578  return;
579  }
580 
581  event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
582  if (!event_eid || ast_eid_cmp(&ast_eid_default, event_eid)) {
583  /* If the event didn't originate from this server, don't send it back out. */
585  return;
586  }
587 
589  const struct ast_eid *eid;
590  char buf[128] = "";
591 
593  ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid);
594  ast_log(LOG_NOTICE, "Sending event PING from this server with EID: '%s'\n", buf);
595  }
596 
599 }
static void publish_event_to_corosync(struct ast_event *event)
Definition: res_corosync.c:544
struct ast_event * stasis_message_to_event(struct stasis_message *msg)
Build the Generic event system representation of the message.

References ast_eid_cmp(), ast_eid_default, ast_eid_to_str(), ast_event_destroy(), ast_event_get_ie_raw(), ast_event_get_type(), AST_EVENT_IE_EID, AST_EVENT_PING, ast_log, buf, ast_eid::eid, LOG_NOTICE, publish_event_to_corosync(), and stasis_message_to_event().

Referenced by dump_cache_cb(), and stasis_message_cb().

◆ send_cluster_notify()

static void send_cluster_notify ( void  )
static

Informs the cluster of our EID and our IP addresses.

Definition at line 737 of file res_corosync.c.

738 {
739  struct ast_event *event;
740  unsigned int node_id;
741  cs_error_t cs_err;
742  corosync_cfg_node_address_t corosync_addr;
743  int num_addrs = 0;
744  struct sockaddr *sa;
745  size_t sa_len;
746  char buf[128];
747  int res;
748 
750  ast_debug(5, "send_cluster_notify rdlock\n");
751 
752  if ((cs_err = corosync_cfg_local_get(cfg_handle, &node_id)) != CS_OK) {
753  ast_log(LOG_WARNING, "Failed to extract Corosync node ID for this node. Not informing cluster of existance.\n");
754  return;
755  }
756 
757  if (((cs_err = corosync_cfg_get_node_addrs(cfg_handle, node_id, 1, &num_addrs, &corosync_addr)) != CS_OK) || (num_addrs < 1)) {
758  ast_log(LOG_WARNING, "Failed to get local Corosync address. Not informing cluster of existance.\n");
759  return;
760  }
761 
763  ast_debug(5, "send_cluster_notify unlock\n");
764  }
765 
766  sa = (struct sockaddr *)corosync_addr.address;
767  sa_len = (size_t)corosync_addr.address_length;
768  if ((res = getnameinfo(sa, sa_len, buf, sizeof(buf), NULL, 0, NI_NUMERICHOST))) {
769  ast_log(LOG_WARNING, "Failed to determine name of local Corosync address: %s (%d). Not informing cluster of existance.\n",
770  gai_strerror(res), res);
771  return;
772  }
773 
780 }
@ AST_EVENT_IE_PLTYPE_UINT
Definition: event_defs.h:326
@ AST_EVENT_IE_PLTYPE_STR
Definition: event_defs.h:328

References ast_debug, AST_EVENT_CLUSTER_DISCOVERY, ast_event_destroy(), AST_EVENT_IE_END, AST_EVENT_IE_LOCAL_ADDR, AST_EVENT_IE_NODE_ID, AST_EVENT_IE_PLTYPE_STR, AST_EVENT_IE_PLTYPE_UINT, ast_event_new(), ast_log, ast_rwlock_tryrdlock, ast_rwlock_unlock, buf, cfg_handle, init_cpg_lock, LOG_WARNING, NULL, and publish_event_to_corosync().

Referenced by dispatch_thread_handler(), and publish_cluster_discovery_to_stasis().

◆ set_event()

static int set_event ( const char *  event_type,
int  pubsub 
)
static

Definition at line 1153 of file res_corosync.c.

1154 {
1155  unsigned int i;
1156 
1157  for (i = 0; i < ARRAY_LEN(event_types); i++) {
1158  if (!event_types[i].name || strcasecmp(event_type, event_types[i].name)) {
1159  continue;
1160  }
1161 
1162  switch (pubsub) {
1163  case PUBLISH:
1164  event_types[i].publish = 1;
1165  break;
1166  case SUBSCRIBE:
1167  event_types[i].subscribe = 1;
1168  break;
1169  }
1170 
1171  break;
1172  }
1173 
1174  return (i == ARRAY_LEN(event_types)) ? -1 : 0;
1175 }

References ARRAY_LEN, event_types, name, PUBLISH, and SUBSCRIBE.

Referenced by load_general_config().

◆ stasis_message_cb()

static void stasis_message_cb ( void *  data,
struct stasis_subscription sub,
struct stasis_message message 
)
static

Definition at line 601 of file res_corosync.c.

602 {
603  if (!message) {
604  return;
605  }
606 
608 }

References publish_to_corosync().

Referenced by load_general_config().

◆ STASIS_MESSAGE_TYPE_DEFN_LOCAL()

STASIS_MESSAGE_TYPE_DEFN_LOCAL ( corosync_ping_message_type  ,
to_event = corosync_ping_to_event 
)

◆ unload_module()

static int unload_module ( void  )
static

Definition at line 1433 of file res_corosync.c.

1434 {
1436 
1437  cleanup_module();
1438 
1439  return 0;
1440 }
int ast_cli_unregister_multiple(struct ast_cli_entry *e, int len)
Unregister multiple commands.
Definition: clicompat.c:30

References ARRAY_LEN, ast_cli_unregister_multiple(), cleanup_module(), and corosync_cli.

Variable Documentation

◆ alert_pipe

int alert_pipe[2]

◆ cache_fn

struct stasis_cache*(* cache_fn) (void) ( void  )

Definition at line 244 of file res_corosync.c.

Referenced by cleanup_module(), and cpg_confchg_cb().

◆ cfg_callbacks

corosync_cfg_callbacks_t cfg_callbacks
static

Definition at line 295 of file res_corosync.c.

Referenced by dispatch_thread_handler(), and load_module().

◆ cfg_handle

corosync_cfg_handle_t cfg_handle
static

◆ corosync_aggregate_topic

struct stasis_topic* corosync_aggregate_topic
static

The internal topic used for message forwarding and pings.

Definition at line 68 of file res_corosync.c.

Referenced by cleanup_module(), corosync_topic(), and load_module().

◆ corosync_cli

struct ast_cli_entry corosync_cli[]
static
Initial value:
= {
{ .handler = corosync_show_config , .summary = "Show configuration" ,},
{ .handler = corosync_show_members , .summary = "Show cluster members" ,},
{ .handler = corosync_ping , .summary = "Send a test ping to the cluster" ,},
}
static char * corosync_show_config(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static char * corosync_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static char * corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
Definition: res_corosync.c:965

Definition at line 1094 of file res_corosync.c.

Referenced by load_module(), and unload_module().

◆ corosync_node_joined

int corosync_node_joined = 0
static

Join to corosync.

Definition at line 62 of file res_corosync.c.

Referenced by cleanup_module(), dispatch_thread_handler(), load_module(), and publish_event_to_corosync().

◆ cpg_callbacks

cpg_callbacks_t cpg_callbacks
static
Initial value:
= {
.cpg_deliver_fn = cpg_deliver_cb,
.cpg_confchg_fn = cpg_confchg_cb,
}
static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_name, uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
Definition: res_corosync.c:484
static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
Definition: res_corosync.c:647

Definition at line 466 of file res_corosync.c.

Referenced by dispatch_thread_handler(), and load_module().

◆ cpg_handle

cpg_handle_t cpg_handle
static

◆ 

struct { ... } dispatch_thread
Initial value:
= {
.alert_pipe = { -1, -1 },
}

Referenced by cleanup_module(), dispatch_thread_handler(), and load_module().

◆ 

struct { ... } event_types[]

◆ event_types_lock

ast_rwlock_t event_types_lock = { PTHREAD_RWLOCK_INITIALIZER , NULL, {1, 0} }
static

Definition at line 51 of file res_corosync.c.

Referenced by cleanup_module(), cpg_confchg_cb(), cpg_deliver_cb(), and load_general_config().

◆ id

pthread_t id

Definition at line 275 of file res_corosync.c.

Referenced by corosync_node_cmp_fn(), and corosync_node_hash_fn().

◆ init_cpg_lock

ast_rwlock_t init_cpg_lock = { PTHREAD_RWLOCK_INITIALIZER , NULL, {1, 0} }
static

◆ message_type_fn

struct stasis_message_type*(* message_type_fn) (void) ( void  )

Definition at line 244 of file res_corosync.c.

Referenced by cleanup_module(), cpg_confchg_cb(), and load_general_config().

◆ name

const char* name

◆ nodes

struct ao2_container* nodes
static

All the nodes that we're aware of.

Definition at line 65 of file res_corosync.c.

Referenced by AST_TEST_DEFINE(), cleanup_module(), cpg_confchg_cb(), load_module(), and publish_cluster_discovery_to_stasis().

◆ publish

unsigned char publish

◆ publish_default

unsigned char publish_default

Definition at line 242 of file res_corosync.c.

◆ publish_to_stasis

void(* publish_to_stasis) (struct ast_event *) ( struct ast_event )

Definition at line 248 of file res_corosync.c.

◆ stasis_router

struct stasis_message_router* stasis_router
static

Our Stasis Message Bus API message router.

Definition at line 71 of file res_corosync.c.

Referenced by cleanup_module(), load_general_config(), and load_module().

◆ stop

unsigned int stop

Definition at line 277 of file res_corosync.c.

◆ sub

struct stasis_forward* sub

Definition at line 240 of file res_corosync.c.

Referenced by _skinny_show_lines(), acf_channel_read(), activatesub(), add_header_offhook(), add_mwi_datastore(), add_sdp(), allocate_body_part(), allocate_subscription(), analog_dial_digits(), analog_get_sub_fd(), analog_new_ast_channel(), analog_play_tone(), analog_set_inthreeway(), analog_set_linear_mode(), ao2_weakproxy_subscribe(), ao2_weakproxy_unsubscribe(), ast_channel_connected_line_sub(), ast_channel_redirecting_sub(), ast_mwi_subscribe_pool(), ast_mwi_subscriber_data(), ast_mwi_subscriber_subscription(), ast_mwi_subscriber_topic(), ast_mwi_unsubscribe(), ast_mwi_unsubscribe_and_join(), ast_sip_create_subscription(), ast_sip_subscription_destroy(), ast_sip_subscription_get_body_subtype(), ast_sip_subscription_get_body_type(), ast_sip_subscription_get_dialog(), ast_sip_subscription_get_endpoint(), ast_sip_subscription_get_header(), ast_sip_subscription_get_local_uri(), ast_sip_subscription_get_remote_uri(), ast_sip_subscription_get_resource_name(), ast_sip_subscription_get_serializer(), ast_sip_subscription_get_sip_uri(), ast_sip_subscription_is_terminated(), ast_sip_subscription_notify(), AST_TEST_DEFINE(), asterisk_publisher_devstate_cb(), asterisk_publisher_mwistate_cb(), attempt_transfer(), bridge_subscription_change_handler(), build_body_part(), build_gateway(), build_rlmi_body(), caching_topic_exec(), channel_to_session(), cleanup_module(), close_call(), close_client(), config_device(), consumer_exec(), consumer_exec_sync(), consumer_finalize(), create_unsolicited_mwi_subscriptions(), create_virtual_subscriptions(), dahdievent_to_analogevent(), delete_device(), destroy_endpoint(), destroy_rtp(), destroy_subscription(), device_state_cb(), device_state_subscription_create(), device_state_subscription_destroy(), device_to_json_cb(), dialandactivatesub(), discard_call(), dispatch_exec_async(), dispatch_exec_sync(), dispatch_message(), dump_cmd_queues(), dumpsub(), endpoint_subscription_change(), exten_state_subscription_destructor(), find_and_retrans(), find_command(), find_rtp_port(), find_subchannel_and_lock(), find_subchannel_by_instance_reference(), find_subchannel_by_name(), find_subchannel_by_reference(), generate_content_id_hdr(), generate_initial_notify(), generate_list_body(), generic_agent_devstate_cb(), get_devicestate(), get_exten_state_sub(), get_notify_data(), get_or_create_subscription(), get_sub(), get_sub_holding(), get_subscription(), handle_call_incoming(), handle_call_outgoing(), handle_callforward_button(), handle_enbloc_call_message(), handle_hd_hf(), handle_hold_button(), handle_key_fav(), handle_keypad_button_message(), handle_offhook_message(), handle_onhook_message(), handle_open_receive_channel_ack_message(), handle_request(), handle_response(), handle_soft_key_event_message(), handle_stimulus_message(), handle_transfer_button(), handle_validate(), has_destination_cb(), internal_stasis_subscribe(), is_app_subscribed(), is_subscribed_device_state(), key_call(), key_dial_page(), load_general_config(), load_module(), message_sink_cb(), message_subscription_alloc(), message_subscription_dtor(), message_subscription_hash_cb(), messaging_app_subscribe_endpoint(), messaging_app_unsubscribe_endpoint(), messaging_subscription_cmp(), mgcp_alloc_pktcgate(), mgcp_answer(), mgcp_call(), mgcp_fixup(), mgcp_get_codec(), mgcp_hangup(), mgcp_indicate(), mgcp_new(), mgcp_pktcgate_open(), mgcp_pktcgate_remove(), mgcp_postrequest(), mgcp_prune_realtime_gateway(), mgcp_queue_control(), mgcp_queue_frame(), mgcp_queue_hangup(), mgcp_read(), mgcp_request(), mgcp_rtp_read(), mgcp_senddigit_begin(), mgcp_senddigit_end(), mgcp_set_owner(), mgcp_set_rtp_peer(), mgcp_ss(), mgcp_write(), mgcpsock_read(), mwi_create_subscription(), mwi_ds_destroy(), mwi_event_cb(), mwi_get_notify_data(), mwi_handle_subscribe(), mwi_handle_unsubscribe(), mwi_on_aor(), mwi_startup_event_cb(), mwi_stasis_cb(), mwi_subscribe_all(), mwi_subscribe_single(), mwi_subscription_alloc(), mwi_subscription_destructor(), mwi_subscription_established(), mwi_subscription_shutdown(), mwi_to_ami(), my_conf_add(), my_conf_del(), my_dial_digits(), my_get_sub_fd(), my_is_dialing(), my_new_analog_ast_channel(), my_set_inthreeway(), my_set_linear_mode(), my_wink(), park_announce_update_cb(), parker_update_cb(), process_opcode(), process_sdp(), publish_msg(), pubsub_on_rx_notify(), push_callinfo(), queue_bridge_cb(), queue_channel_cb(), rcv_mac_addr(), refer_progress_bridge(), refer_progress_notify(), refer_progress_on_evsub_state(), remove_device_state_subscription(), resend_response(), router_dispatch(), send_callerid_screen(), send_callinfo(), send_device_state(), send_mwi_notify(), send_request(), send_response(), send_start_rtp(), send_subscription_subscribe(), send_subscription_unsubscribe(), send_unsolicited_mwi_notify(), send_unsolicited_mwi_notify_to_contact(), set_state_terminated(), setsubstate(), shutdown_subscriptions(), skinny_answer(), skinny_autoanswer_cb(), skinny_call(), skinny_cfwd_cb(), skinny_dialer(), skinny_dialer_cb(), skinny_fixup(), skinny_hangup(), skinny_indicate(), skinny_locksub(), skinny_new(), skinny_newcall(), skinny_read(), skinny_request(), skinny_rtp_read(), skinny_sched_add(), skinny_sched_del(), skinny_senddigit_end(), skinny_set_owner(), skinny_transfer_attended(), skinny_transfer_blind(), skinny_unlocksub(), skinny_write(), start_rtp(), startup_event_cb(), stasis_publish_sync(), stasis_state_add_subscriber(), stasis_state_subscribe_pool(), stasis_state_subscriber_data(), stasis_state_subscriber_id(), stasis_state_subscriber_subscription(), stasis_state_subscriber_topic(), stasis_state_unsubscribe(), stasis_state_unsubscribe_and_join(), stasis_subscription_final_message(), stasis_subscription_is_subscribed(), stasis_subscription_uniqueid(), stasis_unsubscribe(), sub_cleanup(), sub_hold(), sub_start_silence(), sub_stop_silence(), sub_subscription_change_handler(), sub_unhold(), subscribe_device_state(), subscriber_dtor(), subscript(), subscription_dtor(), subscription_invoke(), subscription_persistence_event_cb(), subscription_shutdown(), subscriptions_create(), to_ami(), topic_add_subscription(), topic_remove_subscription(), transfer_call_step1(), transfer_cancel_step2(), transfer_refer(), transmit_closereceivechannel(), transmit_connect(), transmit_connect_with_sdp(), transmit_connection_del(), transmit_modify_request(), transmit_modify_with_sdp(), transmit_notify_request(), transmit_notify_request_with_callerid(), transmit_response(), transmit_startmediatransmission(), transmit_stopmediatransmission(), unalloc_sub(), unistim_alloc_sub(), unistim_answer(), unistim_call(), unistim_do_senddigit(), unistim_free_sub(), unistim_hangup(), unistim_hangup_clean(), unistim_indicate(), unistim_new(), unistim_read(), unistim_request(), unistim_rtp_read(), unistim_set_owner(), unistim_show_info(), unistim_sp(), unistim_ss(), unistim_unalloc_sub(), unistim_write(), unload_module(), unsubscribe_device_state(), update_connectedline(), xfer_client_on_evsub_state(), xmpp_pubsub_devstate_cb(), and xmpp_pubsub_mwi_cb().

◆ subscribe

unsigned char subscribe

◆ subscribe_default

unsigned char subscribe_default

Definition at line 244 of file res_corosync.c.

◆ topic_fn

struct stasis_topic*(* topic_fn) (void) ( void  )

Definition at line 244 of file res_corosync.c.

Referenced by cleanup_module(), cpg_confchg_cb(), and load_general_config().