Asterisk - The Open Source Telephony Project GIT-master-66c01d8
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Macros Modules Pages
stasis_endpoints.c
Go to the documentation of this file.
1/*
2 * Asterisk -- An open source telephony toolkit.
3 *
4 * Copyright (C) 2013, Digium, Inc.
5 *
6 * David M. Lee, II <dlee@digium.com>
7 *
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
13 *
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
17 */
18
19/*! \file
20 *
21 * \brief Stasis endpoint API.
22 *
23 * \author David M. Lee, II <dlee@digium.com>
24 */
25
26/*** MODULEINFO
27 <support_level>core</support_level>
28 ***/
29
30#include "asterisk.h"
31
32#include "asterisk/astobj2.h"
33#include "asterisk/stasis.h"
35
36/*** DOCUMENTATION
37 <managerEvent language="en_US" name="PeerStatus">
38 <managerEventInstance class="EVENT_FLAG_SYSTEM">
39 <since>
40 <version>12.0.0</version>
41 </since>
42 <synopsis>Raised when the state of a peer changes.</synopsis>
43 <syntax>
44 <parameter name="ChannelType">
45 <para>The channel technology of the peer.</para>
46 </parameter>
47 <parameter name="Peer">
48 <para>The name of the peer (including channel technology).</para>
49 </parameter>
50 <parameter name="PeerStatus">
51 <para>New status of the peer.</para>
52 <enumlist>
53 <enum name="Unknown"/>
54 <enum name="Registered"/>
55 <enum name="Unregistered"/>
56 <enum name="Rejected"/>
57 <enum name="Lagged"/>
58 </enumlist>
59 </parameter>
60 <parameter name="Cause">
61 <para>The reason the status has changed.</para>
62 </parameter>
63 <parameter name="Address">
64 <para>New address of the peer.</para>
65 </parameter>
66 <parameter name="Port">
67 <para>New port for the peer.</para>
68 </parameter>
69 <parameter name="Time">
70 <para>Time it takes to reach the peer and receive a response.</para>
71 </parameter>
72 </syntax>
73 </managerEventInstance>
74 </managerEvent>
75 <managerEvent language="en_US" name="ContactStatus">
76 <managerEventInstance class="EVENT_FLAG_SYSTEM">
77 <since>
78 <version>13.5.0</version>
79 </since>
80 <synopsis>Raised when the state of a contact changes.</synopsis>
81 <syntax>
82 <parameter name="URI">
83 <para>This contact's URI.</para>
84 </parameter>
85 <parameter name="ContactStatus">
86 <para>New status of the contact.</para>
87 <enumlist>
88 <enum name="Unknown"/>
89 <enum name="Unreachable"/>
90 <enum name="Reachable"/>
91 <enum name="Unqualified"/>
92 <enum name="Removed"/>
93 <enum name="Updated"/>
94 </enumlist>
95 </parameter>
96 <parameter name="AOR">
97 <para>The name of the associated aor.</para>
98 </parameter>
99 <parameter name="EndpointName">
100 <para>The name of the associated endpoint.</para>
101 </parameter>
102 <parameter name="RoundtripUsec">
103 <para>The RTT measured during the last qualify.</para>
104 </parameter>
105 </syntax>
106 </managerEventInstance>
107 </managerEvent>
108***/
109
111
113{
114 return endpoint_cache_all;
115}
116
118{
120}
121
123{
125}
126
128{
130}
131
133
135{
136 struct ast_endpoint_blob *obj = stasis_message_data(msg);
137 RAII_VAR(struct ast_str *, peerstatus_event_string, ast_str_create(64), ast_free);
138 const char *value;
139
140 /* peer_status is the only *required* thing */
141 if (!(value = ast_json_string_get(ast_json_object_get(obj->blob, "peer_status")))) {
142 return NULL;
143 }
144 ast_str_append(&peerstatus_event_string, 0, "PeerStatus: %s\r\n", value);
145
146 if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "cause")))) {
147 ast_str_append(&peerstatus_event_string, 0, "Cause: %s\r\n", value);
148 }
149 if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "address")))) {
150 ast_str_append(&peerstatus_event_string, 0, "Address: %s\r\n", value);
151 }
152 if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "port")))) {
153 ast_str_append(&peerstatus_event_string, 0, "Port: %s\r\n", value);
154 }
155 if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "time")))) {
156 ast_str_append(&peerstatus_event_string, 0, "Time: %s\r\n", value);
157 }
158
160 "ChannelType: %s\r\n"
161 "Peer: %s/%s\r\n"
162 "%s",
163 obj->snapshot->tech,
164 obj->snapshot->tech,
165 obj->snapshot->resource,
166 ast_str_buffer(peerstatus_event_string));
167}
168
169static struct ast_json *peerstatus_to_json(struct stasis_message *msg, const struct stasis_message_sanitizer *sanitize)
170{
171 struct ast_endpoint_blob *obj = stasis_message_data(msg);
172 struct ast_json *json_endpoint;
173 struct ast_json *json_peer;
174 struct ast_json *json_final;
175 const struct timeval *tv = stasis_message_timestamp(msg);
176
177 json_endpoint = ast_endpoint_snapshot_to_json(obj->snapshot, NULL);
178 if (!json_endpoint) {
179 return NULL;
180 }
181
182 json_peer = ast_json_object_create();
183 if (!json_peer) {
184 ast_json_unref(json_endpoint);
185 return NULL;
186 }
187
188 /* Copy all fields from the blob */
189 ast_json_object_update(json_peer, obj->blob);
190
191 json_final = ast_json_pack("{s: s, s: o, s: o, s: o }",
192 "type", "PeerStatusChange",
193 "timestamp", ast_json_timeval(*tv, NULL),
194 "endpoint", json_endpoint,
195 "peer", json_peer);
196 if (!json_final) {
197 ast_json_unref(json_endpoint);
198 ast_json_unref(json_peer);
199 }
200
201 return json_final;
202}
203
206 .to_json = peerstatus_to_json,
207);
208
210{
211 struct ast_endpoint_blob *obj = stasis_message_data(msg);
212 RAII_VAR(struct ast_str *, contactstatus_event_string, ast_str_create(64), ast_free);
213 const char *value;
214
215 if (!(value = ast_json_string_get(ast_json_object_get(obj->blob, "uri")))) {
216 return NULL;
217 }
218 ast_str_append(&contactstatus_event_string, 0, "URI: %s\r\n", value);
219
220 if (!(value = ast_json_string_get(ast_json_object_get(obj->blob, "contact_status")))) {
221 return NULL;
222 }
223 ast_str_append(&contactstatus_event_string, 0, "ContactStatus: %s\r\n", value);
224
225 if (!(value = ast_json_string_get(ast_json_object_get(obj->blob, "aor")))) {
226 return NULL;
227 }
228 ast_str_append(&contactstatus_event_string, 0, "AOR: %s\r\n", value);
229
230 if (!(value = ast_json_string_get(ast_json_object_get(obj->blob, "endpoint_name")))) {
231 return NULL;
232 }
233 ast_str_append(&contactstatus_event_string, 0, "EndpointName: %s\r\n", value);
234
235 if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "roundtrip_usec")))) {
236 ast_str_append(&contactstatus_event_string, 0, "RoundtripUsec: %s\r\n", value);
237 }
238
239 return ast_manager_event_blob_create(EVENT_FLAG_SYSTEM, "ContactStatus",
240 "%s", ast_str_buffer(contactstatus_event_string));
241}
242
243static struct ast_json *contactstatus_to_json(struct stasis_message *msg, const struct stasis_message_sanitizer *sanitize)
244{
245 struct ast_endpoint_blob *obj = stasis_message_data(msg);
246 struct ast_json *json_endpoint;
247 struct ast_json *json_final;
248 const char *rtt;
249 const struct timeval *tv = stasis_message_timestamp(msg);
250
251 json_endpoint = ast_endpoint_snapshot_to_json(obj->snapshot, NULL);
252 if (!json_endpoint) {
253 return NULL;
254 }
255
256 /* The roundtrip time is optional. */
257 rtt = ast_json_string_get(ast_json_object_get(obj->blob, "roundtrip_usec"));
258 if (!ast_strlen_zero(rtt)) {
259 json_final = ast_json_pack("{s: s, s: o, s: o, s: { s: s, s: s, s: s, s: s } } ",
260 "type", "ContactStatusChange",
261 "timestamp", ast_json_timeval(*tv, NULL),
262 "endpoint", json_endpoint,
263 "contact_info",
264 "uri", ast_json_string_get(ast_json_object_get(obj->blob, "uri")),
265 "contact_status", ast_json_string_get(ast_json_object_get(obj->blob,
266 "contact_status")),
267 "aor", ast_json_string_get(ast_json_object_get(obj->blob, "aor")),
268 "roundtrip_usec", rtt);
269 } else {
270 json_final = ast_json_pack("{s: s, s: o, s: o, s: { s: s, s: s, s: s } } ",
271 "type", "ContactStatusChange",
272 "timestamp", ast_json_timeval(*tv, NULL),
273 "endpoint", json_endpoint,
274 "contact_info",
275 "uri", ast_json_string_get(ast_json_object_get(obj->blob, "uri")),
276 "contact_status", ast_json_string_get(ast_json_object_get(obj->blob,
277 "contact_status")),
278 "aor", ast_json_string_get(ast_json_object_get(obj->blob, "aor")));
279 }
280 if (!json_final) {
281 ast_json_unref(json_endpoint);
282 }
283
284 return json_final;
285}
286
289 .to_json = contactstatus_to_json
290);
291
292static void endpoint_blob_dtor(void *obj)
293{
294 struct ast_endpoint_blob *event = obj;
295 ao2_cleanup(event->snapshot);
296 ast_json_unref(event->blob);
297}
298
300 struct stasis_message_type *type, struct ast_json *blob)
301{
302 struct ast_endpoint_blob *obj;
303 struct stasis_message *msg;
304
305 if (!type) {
306 return NULL;
307 }
308 if (!blob) {
309 blob = ast_json_null();
310 }
311
312 if (!(obj = ao2_alloc(sizeof(*obj), endpoint_blob_dtor))) {
313 return NULL;
314 }
315
316 if (endpoint) {
317 if (!(obj->snapshot = ast_endpoint_snapshot_create(endpoint))) {
318 ao2_ref(obj, -1);
319
320 return NULL;
321 }
322 }
323
324 obj->blob = ast_json_ref(blob);
325 msg = stasis_message_create(type, obj);
326 ao2_ref(obj, -1);
327
328 return msg;
329}
330
332 struct ast_json *blob)
333{
334 struct stasis_message *message;
335
336 if (!blob) {
337 return;
338 }
339
340 message = ast_endpoint_blob_create(endpoint, type, blob);
341 if (message) {
343 ao2_ref(message, -1);
344 }
345}
346
348 const char *name)
349{
350 char *id = NULL;
351 struct stasis_message *msg;
352 struct ast_endpoint_snapshot *snapshot;
353
354 if (ast_strlen_zero(name)) {
355 ast_asprintf(&id, "%s", tech);
356 } else {
357 ast_asprintf(&id, "%s/%s", tech, name);
358 }
359 if (!id) {
360 return NULL;
361 }
363
365 ast_free(id);
366 if (!msg) {
367 return NULL;
368 }
369
370 snapshot = stasis_message_data(msg);
371 ast_assert(snapshot != NULL);
372
373 ao2_ref(snapshot, +1);
374 ao2_ref(msg, -1);
375
376 return snapshot;
377}
378
379/*!
380 * \brief Callback extract a unique identity from a snapshot message.
381 *
382 * This identity is unique to the underlying object of the snapshot, such as the
383 * UniqueId field of a channel.
384 *
385 * \param message Message to extract id from.
386 * \return String representing the snapshot's id.
387 * \retval NULL if the message_type of the message isn't a handled snapshot.
388 * \since 12
389 */
391{
392 struct ast_endpoint_snapshot *snapshot;
393
395 return NULL;
396 }
397
398 snapshot = stasis_message_data(message);
399
400 return snapshot->id;
401}
402
403
405 const struct ast_endpoint_snapshot *snapshot,
406 const struct stasis_message_sanitizer *sanitize)
407{
408 struct ast_json *json;
409 struct ast_json *channel_array;
410 int i;
411
412 json = ast_json_pack("{s: s, s: s, s: s, s: []}",
413 "technology", snapshot->tech,
414 "resource", snapshot->resource,
415 "state", ast_endpoint_state_to_string(snapshot->state),
416 "channel_ids");
417
418 if (json == NULL) {
419 return NULL;
420 }
421
422 if (snapshot->max_channels != -1) {
423 int res = ast_json_object_set(json, "max_channels",
425 if (res != 0) {
426 ast_json_unref(json);
427
428 return NULL;
429 }
430 }
431
432 channel_array = ast_json_object_get(json, "channel_ids");
433 ast_assert(channel_array != NULL);
434 for (i = 0; i < snapshot->num_channels; ++i) {
435 int res;
436
437 if (sanitize && sanitize->channel_id
438 && sanitize->channel_id(snapshot->channel_ids[i])) {
439 continue;
440 }
441
442 res = ast_json_array_append(channel_array,
443 ast_json_string_create(snapshot->channel_ids[i]));
444 if (res != 0) {
445 ast_json_unref(json);
446
447 return NULL;
448 }
449 }
450
451 return json;
452}
453
455{
459
462}
463
465{
466 int res = 0;
468
471 if (!endpoint_cache_all) {
472 return -1;
473 }
474
478
479 return res;
480}
Asterisk main include file. File version handling, generic pbx functions.
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
Definition: clicompat.c:19
#define ast_free(a)
Definition: astmm.h:180
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:267
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:409
static const char type[]
Definition: chan_ooh323.c:109
const char * ast_endpoint_state_to_string(enum ast_endpoint_state state)
Returns a string representation of the given endpoint state.
static const char name[]
Definition: format_mp3.c:68
void ast_endpoint_blob_publish(struct ast_endpoint *endpoint, struct stasis_message_type *type, struct ast_json *blob)
Creates and publishes a ast_endpoint_blob message.
struct stasis_topic * ast_endpoint_topic_all(void)
Topic for all endpoint related messages.
struct stasis_message_type * ast_endpoint_contact_state_type(void)
Message type for endpoint contact state changes.
struct stasis_message_type * ast_endpoint_snapshot_type(void)
Message type for ast_endpoint_snapshot.
struct stasis_topic * ast_endpoint_topic(struct ast_endpoint *endpoint)
Returns the topic for a specific endpoint.
struct stasis_topic * ast_endpoint_topic_all_cached(void)
Cached topic for all endpoint related messages.
struct stasis_cp_all * ast_endpoint_cache_all(void)
struct ast_endpoint_snapshot * ast_endpoint_snapshot_create(struct ast_endpoint *endpoint)
Create a snapshot of an endpoint.
struct ast_endpoint_snapshot * ast_endpoint_latest_snapshot(const char *tech, const char *name)
Retrieve the most recent snapshot for the endpoint with the given name.
struct stasis_message_type * ast_endpoint_state_type(void)
Message type for endpoint state changes.
struct stasis_message * ast_endpoint_blob_create(struct ast_endpoint *endpoint, struct stasis_message_type *type, struct ast_json *blob)
Creates a ast_endpoint_blob message.
struct stasis_cache * ast_endpoint_cache(void)
Backend cache for ast_endpoint_topic_all_cached().
struct ast_json * ast_json_string_create(const char *value)
Construct a JSON string from value.
Definition: json.c:278
struct ast_json * ast_json_null(void)
Get the JSON null value.
Definition: json.c:248
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition: json.c:73
int ast_json_array_append(struct ast_json *array, struct ast_json *value)
Append to an array.
Definition: json.c:378
struct ast_json * ast_json_object_create(void)
Create a new JSON object.
Definition: json.c:399
struct ast_json * ast_json_pack(char const *format,...)
Helper for creating complex JSON values.
Definition: json.c:612
struct ast_json * ast_json_timeval(const struct timeval tv, const char *zone)
Construct a timeval as JSON.
Definition: json.c:670
struct ast_json * ast_json_integer_create(intmax_t value)
Create a JSON integer.
Definition: json.c:327
struct ast_json * ast_json_ref(struct ast_json *value)
Increase refcount on value.
Definition: json.c:67
int ast_json_object_set(struct ast_json *object, const char *key, struct ast_json *value)
Set a field in a JSON object.
Definition: json.c:414
const char * ast_json_string_get(const struct ast_json *string)
Get the value of a JSON string.
Definition: json.c:283
struct ast_json * ast_json_object_get(struct ast_json *object, const char *key)
Get a field from a JSON object.
Definition: json.c:407
int ast_json_object_update(struct ast_json *object, struct ast_json *other)
Update object with all of the fields of other.
Definition: json.c:426
#define EVENT_FLAG_SYSTEM
Definition: manager.h:75
struct ast_manager_event_blob * ast_manager_event_blob_create(int event_flags, const char *manager_event, const char *extra_fields_fmt,...)
Construct a ast_manager_event_blob.
Definition: manager.c:10230
static void to_ami(struct ast_sip_subscription *sub, struct ast_str **buf)
#define NULL
Definition: resample.c:96
Stasis Message Bus API. See Stasis Message Bus API for detailed documentation.
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
Definition: stasis.h:1515
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
Definition: stasis.h:1493
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
Definition: stasis.c:1538
struct stasis_message * stasis_cache_get(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
Retrieve an item from the cache for the ast_eid_default entity.
Definition: stasis_cache.c:686
const struct timeval * stasis_message_timestamp(const struct stasis_message *msg)
Get the time when a message was created.
struct stasis_cache * stasis_cp_all_cache(struct stasis_cp_all *all)
Get the cache.
struct stasis_cp_all * stasis_cp_all_create(const char *name, snapshot_get_id id_fn)
Create an all instance of the cache pattern.
struct stasis_topic * stasis_cp_all_topic(struct stasis_cp_all *all)
Get the aggregate topic.
struct stasis_topic * stasis_cp_all_topic_cached(struct stasis_cp_all *all)
Get the caching topic.
static const char * endpoint_snapshot_get_id(struct stasis_message *message)
Callback extract a unique identity from a snapshot message.
STASIS_MESSAGE_TYPE_DEFN(ast_endpoint_snapshot_type)
struct ast_json * ast_endpoint_snapshot_to_json(const struct ast_endpoint_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_endpoint_snapshot.
static void endpoint_blob_dtor(void *obj)
static struct ast_json * contactstatus_to_json(struct stasis_message *msg, const struct stasis_message_sanitizer *sanitize)
static void endpoints_stasis_cleanup(void)
static struct ast_manager_event_blob * contactstatus_to_ami(struct stasis_message *msg)
static struct ast_json * peerstatus_to_json(struct stasis_message *msg, const struct stasis_message_sanitizer *sanitize)
static struct stasis_cp_all * endpoint_cache_all
static struct ast_manager_event_blob * peerstatus_to_ami(struct stasis_message *msg)
int ast_endpoint_stasis_init(void)
Initialization function for endpoint stasis support.
Endpoint abstractions.
int ast_str_append(struct ast_str **buf, ssize_t max_len, const char *fmt,...)
Append to a thread local dynamic string.
Definition: strings.h:1139
char * ast_str_buffer(const struct ast_str *buf)
Returns the string buffer within the ast_str buf.
Definition: strings.h:761
char * ast_tech_to_upper(char *dev_str)
Convert the tech portion of a device string to upper case.
Definition: strings.h:1236
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition: strings.h:65
#define ast_str_create(init_len)
Create a malloc'ed dynamic length string.
Definition: strings.h:659
Blob of data associated with an endpoint.
struct ast_endpoint_snapshot * snapshot
struct ast_json * blob
A snapshot of an endpoint's state.
const ast_string_field tech
enum ast_endpoint_state state
const ast_string_field id
const ast_string_field resource
Abstract JSON element (object, array, string, int, ...).
Struct containing info for an AMI event to send out.
Definition: manager.h:503
Support for dynamic strings.
Definition: strings.h:623
Definition: astman.c:222
Structure containing callbacks for Stasis message sanitization.
Definition: stasis.h:200
int(* channel_id)(const char *channel_id)
Callback which determines whether a channel should be sanitized from a message based on the channel's...
Definition: stasis.h:210
int value
Definition: syslog.c:37
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:941
#define ast_assert(a)
Definition: utils.h:739