Asterisk - The Open Source Telephony Project GIT-master-1f1c5bb
main/endpoints.c
Go to the documentation of this file.
1/*
2 * Asterisk -- An open source telephony toolkit.
3 *
4 * Copyright (C) 2013, Digium, Inc.
5 *
6 * David M. Lee, II <dlee@digium.com>
7 *
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
13 *
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
17 */
18
19/*! \file
20 *
21 * \brief Asterisk endpoint API.
22 *
23 * \author David M. Lee, II <dlee@digium.com>
24 */
25
26/*** MODULEINFO
27 <support_level>core</support_level>
28 ***/
29
30#include "asterisk.h"
31
32#include "asterisk/astobj2.h"
33#include "asterisk/endpoints.h"
34#include "asterisk/stasis.h"
39#include "asterisk/_private.h"
40
41/*! Buckets for endpoint->channel mappings. Keep it prime! */
42#define ENDPOINT_CHANNEL_BUCKETS 127
43
44/*! Buckets for endpoint hash. Keep it prime! */
45#define ENDPOINT_BUCKETS 127
46
47/*! Buckets for technology endpoints. */
48#define TECH_ENDPOINT_BUCKETS 11
49
50static struct ao2_container *endpoints;
51
53
56 AST_STRING_FIELD(tech); /*!< Technology (SIP, IAX2, etc.). */
57 AST_STRING_FIELD(resource); /*!< Name, unique to the tech. */
58 AST_STRING_FIELD(id); /*!< tech/resource id */
59 );
60 /*! Endpoint's current state */
62 /*!
63 * \brief Max channels for this endpoint. -1 means unlimited or unknown.
64 *
65 * Note that this simply documents the limits of an endpoint, and does
66 * nothing to try to enforce the limit.
67 */
69 /*! Topic for this endpoint's messages */
71 /*! Router for handling this endpoint's messages */
73 /*! ast_str_container of channels associated with this endpoint */
75 /*! Forwarding subscription from an endpoint to its tech endpoint */
77};
78
81
83{
84 struct ast_endpoint *endpoint = ao2_find(endpoints, id, OBJ_KEY);
85
86 if (!endpoint) {
87 endpoint = ao2_find(tech_endpoints, id, OBJ_KEY);
88 }
89
90 return endpoint;
91}
92
94{
95 if (!endpoint) {
97 }
98 return stasis_cp_single_topic(endpoint->topics);
99}
100
102{
103 if (!endpoint) {
105 }
106 return stasis_cp_single_topic_cached(endpoint->topics);
107}
108
110{
111 switch (state) {
113 return "unknown";
115 return "offline";
117 return "online";
118 }
119 return "?";
120}
121
/*!
 * \brief Create a snapshot of an endpoint as the first step of publishing it.
 *
 * The snapshot is held in a RAII_VAR, so ao2_cleanup() is applied to it when
 * it goes out of scope, whichever return path is taken.
 *
 * NOTE(review): this listing skips original lines 125, 130, 138 and 142, so
 * the declaration and creation of \c message, the condition guarding the
 * first early return, and the final publish call are not visible here.
 *
 * \param endpoint Endpoint to snapshot. Asserted to be non-NULL and to have
 *        non-NULL topics.
 */
122static void endpoint_publish_snapshot(struct ast_endpoint *endpoint)
123{
124 RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
126
127 ast_assert(endpoint != NULL);
128 ast_assert(endpoint->topics != NULL);
129
131 return;
132 }
133
134 snapshot = ast_endpoint_snapshot_create(endpoint);
135 if (!snapshot) {
 /* Snapshot allocation failed; nothing to publish. */
136 return;
137 }
139 if (!message) {
140 return;
141 }
143}
144
/*!
 * \brief Destructor for an endpoint object.
 *
 * Installed as the ao2 destructor by endpoint_internal_create(). The visible
 * code drops the endpoint's references on its router and on its channel id
 * container, and NULLs the router, topics and channel_ids pointers.
 *
 * NOTE(review): original lines 150, 154 and 160 are missing from this
 * listing, so whatever releases \c topics and any remaining resources is
 * not visible here.
 *
 * \param obj The endpoint being destroyed.
 */
145static void endpoint_dtor(void *obj)
146{
147 struct ast_endpoint *endpoint = obj;
148
149 /* The router should be shut down already */
151 ao2_cleanup(endpoint->router);
152 endpoint->router = NULL;
153
155 endpoint->topics = NULL;
156
157 ao2_cleanup(endpoint->channel_ids);
158 endpoint->channel_ids = NULL;
159
161}
162
163
165 struct ast_channel *chan)
166{
167 ast_assert(chan != NULL);
168 ast_assert(endpoint != NULL);
170
171 ast_channel_forward_endpoint(chan, endpoint);
172
173 ao2_lock(endpoint);
175 ao2_unlock(endpoint);
176
178
179 return 0;
180}
181
182/*!
 * \brief Handler for channel snapshot update
 *
 * Ignores the update unless the new channel snapshot has AST_FLAG_DEAD set.
 * For a dead channel, removes the channel's uniqueid from the endpoint's
 * channel_ids container while holding the endpoint lock.
 *
 * NOTE(review): the declaration of \c update (original line 188) and original
 * line 200 are missing from this listing; \c update is presumably extracted
 * from \a message -- confirm against the full file.
 *
 * \param data Interpreted as the struct ast_endpoint the update applies to.
 * \param sub Subscription argument (not referenced in the visible code).
 * \param message Received message (not referenced in the visible code).
 */
183static void endpoint_cache_clear(void *data,
184 struct stasis_subscription *sub,
185 struct stasis_message *message)
186{
187 struct ast_endpoint *endpoint = data;
189
190 /* Only when the channel is dead do we remove it */
191 if (!ast_test_flag(&update->new_snapshot->flags, AST_FLAG_DEAD)) {
192 return;
193 }
194
195 ast_assert(endpoint != NULL);
196
 /* channel_ids is only touched with the endpoint lock held. */
197 ao2_lock(endpoint);
198 ast_str_container_remove(endpoint->channel_ids, update->new_snapshot->base->uniqueid);
199 ao2_unlock(endpoint);
201}
202
/*!
 * \brief Handler for subscription change messages on an endpoint's router.
 *
 * Under a condition that is not visible in this listing (original line 209 is
 * missing), releases one reference on the object passed as \a data.
 *
 * NOTE(review): the local is declared as struct stasis_endpoint *, whereas the
 * other handlers in this file treat \a data as struct ast_endpoint *. It is
 * only passed to ao2_cleanup() here, so the mismatch is harmless at runtime,
 * but the type looks like a typo -- confirm.
 *
 * \param data Object whose reference is released.
 * \param sub Subscription argument (use not visible in this listing).
 * \param message Received message (use not visible in this listing).
 */
203static void endpoint_subscription_change(void *data,
204 struct stasis_subscription *sub,
205 struct stasis_message *message)
206{
207 struct stasis_endpoint *endpoint = data;
208
210 ao2_cleanup(endpoint);
211 }
212}
213
/*!
 * \brief Allocate, initialise and register an endpoint or a technology endpoint.
 *
 * The visible code looks up the technology endpoint for \a tech (creating it
 * recursively with a NULL resource when absent), allocates the new endpoint
 * with endpoint_dtor() as its destructor, fills in its string fields, channel
 * id container and topics, and finally links it into either \c endpoints or
 * \c tech_endpoints.
 *
 * Both \c endpoint and \c tech_endpoint are RAII_VARs released with
 * ao2_cleanup() when they go out of scope, so every early "return NULL" drops
 * the partially built object. The ao2_ref(+1) just before the final return
 * balances that automatic release so the caller receives a live reference.
 *
 * NOTE(review): this listing skips a number of original lines (221, 243, 252,
 * 257, 266, 272-273, 280, 283, 292, 303 and 309-310), including the
 * conditions that open the if/else blocks and the calls that create
 * \c topics. The criterion separating the two branches is presumably whether
 * \a resource is empty -- confirm against the full file.
 *
 * \param tech Technology name; used as the lookup key and as the id prefix.
 * \param resource Resource name. When NULL or empty the id is just \a tech.
 * \return A referenced endpoint, or NULL when any allocation or setup step fails.
 */
214static struct ast_endpoint *endpoint_internal_create(const char *tech, const char *resource)
215{
216 RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup);
217 RAII_VAR(struct ast_endpoint *, tech_endpoint, NULL, ao2_cleanup);
218 int r = 0;
219
220 /* Get/create the technology endpoint */
222 tech_endpoint = ao2_find(tech_endpoints, tech, OBJ_KEY);
223 if (!tech_endpoint) {
224 tech_endpoint = endpoint_internal_create(tech, NULL);
225 if (!tech_endpoint) {
226 return NULL;
227 }
228 }
229 }
230
231 endpoint = ao2_alloc(sizeof(*endpoint), endpoint_dtor);
232 if (!endpoint) {
233 return NULL;
234 }
235
 /* Defaults: no known channel limit, state not yet determined. */
236 endpoint->max_channels = -1;
237 endpoint->state = AST_ENDPOINT_UNKNOWN;
238
239 if (ast_string_field_init(endpoint, 80) != 0) {
240 return NULL;
241 }
242 ast_string_field_set(endpoint, tech, tech);
 /* id is "tech/resource", or just "tech" when resource is NULL or empty. */
244 ast_string_field_build(endpoint, id, "%s%s%s",
245 tech,
246 !ast_strlen_zero(resource) ? "/" : "",
247 S_OR(resource, ""));
248
249 /* All access to channel_ids should be covered by the endpoint's
250 * lock; no extra lock needed. */
251 endpoint->channel_ids = ast_str_container_alloc_options(
253 if (!endpoint->channel_ids) {
254 return NULL;
255 }
256
258 char *topic_name;
259 int ret;
260
261 ret = ast_asprintf(&topic_name, "endpoint:%s", endpoint->id);
262 if (ret < 0) {
263 return NULL;
264 }
265
267 topic_name);
 /* topic_name is freed before the result check, so both outcomes release it. */
268 ast_free(topic_name);
269 if (!endpoint->topics) {
270 return NULL;
271 }
274
275 endpoint->router = stasis_message_router_create_pool(ast_endpoint_topic(endpoint));
276 if (!endpoint->router) {
277 return NULL;
278 }
 /* Results of both route registrations are OR-ed into r and checked once. */
279 r |= stasis_message_router_add(endpoint->router,
281 endpoint);
282 r |= stasis_message_router_add(endpoint->router,
284 endpoint);
285 if (r) {
286 return NULL;
287 }
288
 /* Forward this endpoint's topic to the technology endpoint's topic. */
289 endpoint->tech_forward = stasis_forward_all(stasis_cp_single_topic(endpoint->topics),
290 stasis_cp_single_topic(tech_endpoint->topics));
291
293 ao2_link(endpoints, endpoint);
294 } else {
295 char *topic_name;
296 int ret;
297
298 ret = ast_asprintf(&topic_name, "endpoint:%s", endpoint->id);
299 if (ret < 0) {
300 return NULL;
301 }
302
304 topic_name);
305 ast_free(topic_name);
306 if (!endpoint->topics) {
307 return NULL;
308 }
311
312 ao2_link(tech_endpoints, endpoint);
313 }
314
 /* Balance the RAII_VAR cleanup so the returned pointer stays referenced. */
315 ao2_ref(endpoint, +1);
316 return endpoint;
317}
318
/*!
 * \brief Create an endpoint struct.
 *
 * Validates the arguments before creation: an empty \a tech is logged at
 * LOG_ERROR and rejected with NULL.
 *
 * NOTE(review): original lines 326 and 331 are missing from this listing.
 * Judging by the second log message, the missing condition rejects an empty
 * \a resource, and the missing final statement presumably delegates to
 * endpoint_internal_create() -- confirm against the full file.
 *
 * \param tech Technology name; must not be empty.
 * \param resource Resource name.
 * \return NULL when validation fails; the success path is not visible here.
 */
319struct ast_endpoint *ast_endpoint_create(const char *tech, const char *resource)
320{
321 if (ast_strlen_zero(tech)) {
322 ast_log(LOG_ERROR, "Endpoint tech cannot be empty\n");
323 return NULL;
324 }
325
327 ast_log(LOG_ERROR, "Endpoint resource cannot be empty\n");
328 return NULL;
329 }
330
332}
333
335{
336 RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
337
339 return NULL;
340 }
341
342 snapshot = ast_endpoint_snapshot_create(endpoint);
343 if (!snapshot) {
344 return NULL;
345 }
346
348}
349
351{
352 RAII_VAR(struct stasis_message *, clear_msg, NULL, ao2_cleanup);
353
354 if (endpoint == NULL) {
355 return;
356 }
357
358 ao2_unlink(endpoints, endpoint);
359 endpoint->tech_forward = stasis_forward_cancel(endpoint->tech_forward);
360
361 clear_msg = create_endpoint_snapshot_message(endpoint);
362 if (clear_msg) {
365 if (message) {
367 }
368 }
369
370 /* Bump refcount to hold on to the router */
371 ao2_ref(endpoint->router, +1);
373}
374
375const char *ast_endpoint_get_tech(const struct ast_endpoint *endpoint)
376{
377 if (!endpoint) {
378 return NULL;
379 }
380 return endpoint->tech;
381}
382
383const char *ast_endpoint_get_resource(const struct ast_endpoint *endpoint)
384{
385 if (!endpoint) {
386 return NULL;
387 }
388 return endpoint->resource;
389}
390
391const char *ast_endpoint_get_id(const struct ast_endpoint *endpoint)
392{
393 if (!endpoint) {
394 return NULL;
395 }
396 return endpoint->id;
397}
398
400{
401 if (!endpoint) {
403 }
404 return endpoint->state;
405}
406
409{
410 ast_assert(endpoint != NULL);
412
413 ao2_lock(endpoint);
414 endpoint->state = state;
415 ao2_unlock(endpoint);
417}
418
420 int max_channels)
421{
422 ast_assert(endpoint != NULL);
424
425 ao2_lock(endpoint);
426 endpoint->max_channels = max_channels;
427 ao2_unlock(endpoint);
429}
430
/*!
 * \brief Destructor for an endpoint snapshot.
 *
 * Releases one reference on each of the \c num_channels entries stored in the
 * snapshot's \c channel_ids array.
 *
 * NOTE(review): original line 442 is missing from this listing; any further
 * cleanup performed after the loop is not visible here.
 *
 * \param obj The snapshot being destroyed; asserted non-NULL.
 */
431static void endpoint_snapshot_dtor(void *obj)
432{
433 struct ast_endpoint_snapshot *snapshot = obj;
434 int channel;
435
436 ast_assert(snapshot != NULL);
437
438 for (channel = 0; channel < snapshot->num_channels; channel++) {
439 ao2_ref(snapshot->channel_ids[channel], -1);
440 }
441
443}
444
446 struct ast_endpoint *endpoint)
447{
448 struct ast_endpoint_snapshot *snapshot;
449 int channel_count;
450 struct ao2_iterator i;
451 void *obj;
452 SCOPED_AO2LOCK(lock, endpoint);
453
454 ast_assert(endpoint != NULL);
456
457 channel_count = ao2_container_count(endpoint->channel_ids);
458
459 snapshot = ao2_alloc_options(
460 sizeof(*snapshot) + channel_count * sizeof(char *),
463
464 if (!snapshot || ast_string_field_init(snapshot, 80) != 0) {
465 ao2_cleanup(snapshot);
466 return NULL;
467 }
468
469 ast_string_field_build(snapshot, id, "%s/%s", endpoint->tech,
470 endpoint->resource);
471 ast_string_field_set(snapshot, tech, endpoint->tech);
472 ast_string_field_set(snapshot, resource, endpoint->resource);
473
474 snapshot->state = endpoint->state;
475 snapshot->max_channels = endpoint->max_channels;
476
477 i = ao2_iterator_init(endpoint->channel_ids, 0);
478 while ((obj = ao2_iterator_next(&i))) {
479 /* The reference is kept so the channel id does not go away until the snapshot is gone */
480 snapshot->channel_ids[snapshot->num_channels++] = obj;
481 }
483
484 return snapshot;
485}
486
/*!
 * \brief Module cleanup routine for the endpoint containers.
 *
 * The visible code resets the file-scope \c endpoints pointer to NULL.
 *
 * NOTE(review): original lines 489 and 492-493 are missing from this listing;
 * they presumably release the \c endpoints container and handle
 * \c tech_endpoints the same way -- confirm against the full file.
 */
487static void endpoint_cleanup(void)
488{
490 endpoints = NULL;
491
494}
495
497{
499
501 ast_endpoint_hash_fn, NULL, ast_endpoint_cmp_fn);
502 if (!endpoints) {
503 return -1;
504 }
505
507 TECH_ENDPOINT_BUCKETS, ast_endpoint_hash_fn, NULL, ast_endpoint_cmp_fn);
508 if (!tech_endpoints) {
509 return -1;
510 }
511
512 return 0;
513}
Prototypes for public functions only of internal interest.
ast_mutex_t lock
Definition: app_sla.c:331
Asterisk main include file. File version handling, generic pbx functions.
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
Definition: clicompat.c:19
#define ast_free(a)
Definition: astmm.h:180
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:267
#define ast_log
Definition: astobj2.c:42
#define ao2_iterator_next(iter)
Definition: astobj2.h:1911
#define ao2_link(container, obj)
Add an object to a container.
Definition: astobj2.h:1532
#define OBJ_KEY
Definition: astobj2.h:1151
@ AO2_ALLOC_OPT_LOCK_NOLOCK
Definition: astobj2.h:367
@ AO2_ALLOC_OPT_LOCK_MUTEX
Definition: astobj2.h:363
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
#define AO2_STRING_FIELD_CMP_FN(stype, field)
Creates a compare function for a structure string field.
Definition: astobj2.h:2048
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
#define ao2_unlink(container, obj)
Remove an object from a container.
Definition: astobj2.h:1578
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1736
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
#define ao2_unlock(a)
Definition: astobj2.h:729
#define ao2_lock(a)
Definition: astobj2.h:717
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:404
#define AO2_STRING_FIELD_HASH_FN(stype, field)
Creates a hash function for a structure string field.
Definition: astobj2.h:2032
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:409
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition: astobj2.h:1303
enum cc_state state
Definition: ccss.c:393
const char * ast_channel_uniqueid(const struct ast_channel *chan)
int ast_channel_forward_endpoint(struct ast_channel *chan, struct ast_endpoint *endpoint)
Forward channel stasis messages to the given endpoint.
@ AST_FLAG_DEAD
Definition: channel.h:1045
static void update(int code_size, int y, int wi, int fi, int dq, int sr, int dqsez, struct g726_state *state_ptr)
Definition: codec_g726.c:367
Endpoint abstractions.
ast_endpoint_state
Valid states for an endpoint.
Definition: endpoints.h:51
@ AST_ENDPOINT_OFFLINE
Definition: endpoints.h:55
@ AST_ENDPOINT_ONLINE
Definition: endpoints.h:57
@ AST_ENDPOINT_UNKNOWN
Definition: endpoints.h:53
struct stasis_topic * ast_endpoint_topic_cached(struct ast_endpoint *endpoint)
Returns the topic for a specific endpoint.
struct stasis_topic * ast_endpoint_topic_all(void)
Topic for all endpoint related messages.
struct stasis_message_type * ast_endpoint_snapshot_type(void)
Message type for ast_endpoint_snapshot.
struct stasis_topic * ast_endpoint_topic(struct ast_endpoint *endpoint)
Returns the topic for a specific endpoint.
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
struct stasis_topic * ast_endpoint_topic_all_cached(void)
Cached topic for all endpoint related messages.
struct stasis_cp_all * ast_endpoint_cache_all(void)
struct ast_endpoint_snapshot * ast_endpoint_snapshot_create(struct ast_endpoint *endpoint)
Create a snapshot of an endpoint.
struct stasis_message_type * ast_channel_snapshot_type(void)
Message type for ast_channel_snapshot_update.
#define LOG_ERROR
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:604
void ast_endpoint_set_state(struct ast_endpoint *endpoint, enum ast_endpoint_state state)
Updates the state of the given endpoint.
static struct ao2_container * tech_endpoints
static struct ao2_container * endpoints
#define ENDPOINT_CHANNEL_BUCKETS
struct ast_endpoint * ast_endpoint_find_by_id(const char *id)
Finds the endpoint with the given tech[/resource] id.
static struct stasis_message * create_endpoint_snapshot_message(struct ast_endpoint *endpoint)
#define ENDPOINT_BUCKETS
static void endpoint_dtor(void *obj)
const char * ast_endpoint_get_tech(const struct ast_endpoint *endpoint)
Gets the technology of the given endpoint.
void ast_endpoint_shutdown(struct ast_endpoint *endpoint)
Shuts down an ast_endpoint.
static struct ast_endpoint * endpoint_internal_create(const char *tech, const char *resource)
static void endpoint_publish_snapshot(struct ast_endpoint *endpoint)
void ast_endpoint_set_max_channels(struct ast_endpoint *endpoint, int max_channels)
Updates the maximum number of channels an endpoint supports.
static void endpoint_cache_clear(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Handler for channel snapshot update.
int ast_endpoint_add_channel(struct ast_endpoint *endpoint, struct ast_channel *chan)
Adds a channel to the given endpoint.
enum ast_endpoint_state ast_endpoint_get_state(const struct ast_endpoint *endpoint)
Gets the state of the given endpoint.
static void endpoint_cleanup(void)
int ast_endpoint_init(void)
Endpoint support initialization.
static void endpoint_subscription_change(void *data, struct stasis_subscription *sub, struct stasis_message *message)
const char * ast_endpoint_get_resource(const struct ast_endpoint *endpoint)
Gets the resource name of the given endpoint.
#define TECH_ENDPOINT_BUCKETS
const char * ast_endpoint_state_to_string(enum ast_endpoint_state state)
Returns a string representation of the given endpoint state.
struct ast_endpoint * ast_endpoint_create(const char *tech, const char *resource)
Create an endpoint struct.
const char * ast_endpoint_get_id(const struct ast_endpoint *endpoint)
Gets the tech/resource id of the given endpoint.
static void endpoint_snapshot_dtor(void *obj)
struct stasis_forward * sub
Definition: res_corosync.c:240
#define NULL
Definition: resample.c:96
Stasis Message Bus API. See Stasis Message Bus API for detailed documentation.
struct stasis_message * stasis_cache_clear_create(struct stasis_message *message)
A message which instructs the caching topic to remove an entry from its cache.
Definition: stasis_cache.c:778
@ STASIS_SUBSCRIPTION_FILTER_SELECTIVE
Definition: stasis.h:297
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
Definition: stasis.c:1548
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
Definition: stasis.c:1174
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition: stasis.c:1578
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
Definition: stasis.c:1511
struct stasis_topic * stasis_cp_single_topic(struct stasis_cp_single *one)
Get the topic for this instance.
struct stasis_topic * stasis_cp_single_topic_cached(struct stasis_cp_single *one)
Get the caching topic for this instance.
int stasis_cp_single_set_filter(struct stasis_cp_single *one, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a cache.
struct stasis_cp_single * stasis_cp_sink_create(struct stasis_cp_all *all, const char *name)
Create a sink in the cache pattern.
struct stasis_cp_single * stasis_cp_single_create(struct stasis_cp_all *all, const char *name)
Create the 'one' side of the cache pattern.
int stasis_cp_single_accept_message_type(struct stasis_cp_single *one, struct stasis_message_type *type)
Indicate to an instance that we are interested in a message type.
void stasis_cp_single_unsubscribe(struct stasis_cp_single *one)
Stops caching and forwarding messages.
Endpoint abstractions.
int stasis_message_router_is_done(struct stasis_message_router *router)
Returns whether router has received its final message.
void stasis_message_router_unsubscribe(struct stasis_message_router *router)
Unsubscribe the router from the upstream topic.
int stasis_message_router_add(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
Add a route to a message router.
#define stasis_message_router_create_pool(topic)
Create a new message router object.
#define AST_DECLARE_STRING_FIELDS(field_list)
Declare the fields needed in a structure.
Definition: stringfields.h:341
#define AST_STRING_FIELD(name)
Declare a string field.
Definition: stringfields.h:303
#define ast_string_field_set(x, field, data)
Set a field to a simple string value.
Definition: stringfields.h:521
#define ast_string_field_init(x, size)
Initialize a field pool and fields.
Definition: stringfields.h:359
#define ast_string_field_build(x, field, fmt, args...)
Set a field to a complex (built) value.
Definition: stringfields.h:555
#define ast_string_field_free_memory(x)
free all memory - to be called before destroying the object
Definition: stringfields.h:374
#define S_OR(a, b)
returns the equivalent of logic or for strings: first one if not empty, otherwise second one.
Definition: strings.h:80
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition: strings.h:65
void ast_str_container_remove(struct ao2_container *str_container, const char *remove)
Removes a string from a string container allocated by ast_str_container_alloc.
Definition: strings.c:221
struct ao2_container * ast_str_container_alloc_options(enum ao2_alloc_opts opts, int buckets)
Allocates a hash container for bare strings.
Definition: strings.c:200
int ast_str_container_add(struct ao2_container *str_container, const char *add)
Adds a string to a string container allocated by ast_str_container_alloc.
Definition: strings.c:205
Generic container type.
When we need to walk through a container, we use an ao2_iterator to keep track of the current position.
Definition: astobj2.h:1821
Structure representing a change of snapshot of channel state.
Main Channel structure associated with a channel.
A snapshot of an endpoint's state.
enum ast_endpoint_state state
struct ao2_container * channel_ids
struct stasis_cp_single * topics
const ast_string_field tech
enum ast_endpoint_state state
struct stasis_forward * tech_forward
const ast_string_field id
struct stasis_message_router * router
int max_channels
Max channels for this endpoint. -1 means unlimited or unknown.
const ast_string_field resource
Forwarding information.
Definition: stasis.c:1531
#define ast_test_flag(p, flag)
Definition: utils.h:63
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:941
#define ast_assert(a)
Definition: utils.h:739