Asterisk - The Open Source Telephony Project GIT-master-f36a736
pjsip_transport_management.c
Go to the documentation of this file.
1/*
2 * Asterisk -- An open source telephony toolkit.
3 *
4 * Copyright (C) 2014, Digium, Inc.
5 *
6 * Joshua Colp <jcolp@digium.com>
7 *
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
13 *
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
17 */
18
19#include "asterisk.h"
20
21#include <signal.h>
22
23#include <pjsip.h>
24#include <pjsip_ua.h>
25
26#include "asterisk/res_pjsip.h"
27#include "asterisk/module.h"
28#include "asterisk/astobj2.h"
30
31/*! \brief Number of buckets for monitored transports */
32#define TRANSPORTS_BUCKETS 127
33
34#define IDLE_TIMEOUT (pjsip_cfg()->tsx.td)
35
36/*! \brief The keep alive packet to send */
37static const pj_str_t keepalive_packet = { "\r\n\r\n", 4 };
38
39/*! \brief Global container of active transports */
40static AO2_GLOBAL_OBJ_STATIC(monitored_transports);
41
42/*! \brief Scheduler context for timing out connections with no data received */
43static struct ast_sched_context *sched;
44
45/*! \brief Thread keeping things alive */
47
48/*! \brief The global interval at which to send keepalives */
49static unsigned int keepalive_interval;
50
51/*! \brief Structure for transport to be monitored */
53 /*! \brief The underlying PJSIP transport */
54 pjsip_transport *transport;
55 /*! \brief Non-zero if a PJSIP request was received */
57};
58
60{
61 pjsip_tpselector selector = {
62 .type = PJSIP_TPSELECTOR_TRANSPORT,
63 .u.transport = monitored->transport,
64 };
65
66 pjsip_tpmgr_send_raw(pjsip_endpt_get_tpmgr(ast_sip_get_pjsip_endpoint()),
67 monitored->transport->key.type,
68 &selector,
69 NULL,
72 &monitored->transport->key.rem_addr,
73 pj_sockaddr_get_len(&monitored->transport->key.rem_addr),
74 NULL, NULL);
75}
76
77/*! \brief Thread which sends keepalives to all active connection-oriented transports */
78static void *keepalive_transport_thread(void *data)
79{
80 struct ao2_container *transports;
81 pj_thread_desc desc = { 0 };
82 pj_thread_t *thread;
83
84 if (pj_thread_register("Asterisk Keepalive Thread", desc, &thread) != PJ_SUCCESS) {
85 ast_log(LOG_ERROR, "Could not register keepalive thread with PJLIB, keepalives will not occur.\n");
86 return NULL;
87 }
88
89 transports = ao2_global_obj_ref(monitored_transports);
90 if (!transports) {
91 return NULL;
92 }
93
94 /*
95 * Once loaded this module just keeps on going as it is unsafe to stop
96 * and change the underlying callback for the transport manager.
97 */
98 while (keepalive_interval) {
99 struct ao2_iterator iter;
100 struct monitored_transport *monitored;
101
102 sleep(keepalive_interval);
103
104 /*
105 * We must use the iterator to avoid deadlock between the container lock
106 * and the pjproject transport manager group lock when sending
107 * the keepalive packet.
108 */
109 iter = ao2_iterator_init(transports, 0);
110 for (; (monitored = ao2_iterator_next(&iter)); ao2_ref(monitored, -1)) {
112 }
114 }
115
116 ao2_ref(transports, -1);
117 return NULL;
118}
119
121
123{
124 if (!pj_thread_is_registered()) {
125 pj_thread_t *thread;
126 pj_thread_desc *desc;
127
128 desc = ast_threadstorage_get(&desc_storage, sizeof(pj_thread_desc));
129 if (!desc) {
130 ast_log(LOG_ERROR, "Could not get thread desc from thread-local storage.\n");
131 return -1;
132 }
133
134 pj_bzero(*desc, sizeof(*desc));
135
136 pj_thread_register("Transport Monitor", *desc, &thread);
137 }
138
139 return 0;
140}
141
142static struct monitored_transport *get_monitored_transport_by_name(const char *obj_name)
143{
144 struct ao2_container *transports;
145 struct monitored_transport *monitored = NULL;
146
147 transports = ao2_global_obj_ref(monitored_transports);
148 if (transports) {
149 monitored = ao2_find(transports, obj_name, OBJ_SEARCH_KEY);
150 }
151 ao2_cleanup(transports);
152
153 /* Caller is responsible for cleaning up reference */
154 return monitored;
155}
156
157static int idle_sched_cb(const void *data)
158{
159 char *obj_name = (char *) data;
160 struct monitored_transport *monitored;
161
163 ast_free(obj_name);
164 return 0;
165 }
166
167 monitored = get_monitored_transport_by_name(obj_name);
168 if (monitored) {
169 if (!monitored->sip_received) {
170 ast_log(LOG_NOTICE, "Shutting down transport '%s' since no request was received in %d seconds\n",
171 monitored->transport->info, IDLE_TIMEOUT / 1000);
172 pjsip_transport_shutdown(monitored->transport);
173 }
174 ao2_ref(monitored, -1);
175 }
176
177 ast_free(obj_name);
178 return 0;
179}
180
181static int idle_sched_cleanup(const void *data)
182{
183 char *obj_name = (char *) data;
184 struct monitored_transport *monitored;
185
187 ast_free(obj_name);
188 return 0;
189 }
190
191 monitored = get_monitored_transport_by_name(obj_name);
192 if (monitored) {
193 pjsip_transport_shutdown(monitored->transport);
194 ao2_ref(monitored, -1);
195 }
196
197 ast_free(obj_name);
198 return 0;
199}
200
201/*! \brief Destructor for keepalive transport */
202static void monitored_transport_destroy(void *obj)
203{
204 struct monitored_transport *monitored = obj;
205
206 pjsip_transport_dec_ref(monitored->transport);
207}
208
209/*! \brief Callback invoked when transport changes occur */
210static void monitored_transport_state_callback(pjsip_transport *transport, pjsip_transport_state state,
211 const pjsip_transport_state_info *info)
212{
213 struct ao2_container *transports;
214
215 /* We only care about reliable transports */
216 if (PJSIP_TRANSPORT_IS_RELIABLE(transport)
217 && (transport->dir == PJSIP_TP_DIR_INCOMING || keepalive_interval)
218 && (transports = ao2_global_obj_ref(monitored_transports))) {
219 struct monitored_transport *monitored;
220
221 switch (state) {
222 case PJSIP_TP_STATE_CONNECTED:
223 monitored = ao2_alloc_options(sizeof(*monitored),
225 if (!monitored) {
226 break;
227 }
228 monitored->transport = transport;
229 pjsip_transport_add_ref(monitored->transport);
230
231 ao2_link(transports, monitored);
232
233 if (transport->dir == PJSIP_TP_DIR_INCOMING) {
234 char *obj_name = ast_strdup(transport->obj_name);
235
236 if (!obj_name
238 /* Shut down the transport if anything fails */
239 pjsip_transport_shutdown(transport);
240 ast_free(obj_name);
241 }
242 }
243 ao2_ref(monitored, -1);
244 break;
245 case PJSIP_TP_STATE_SHUTDOWN:
246 case PJSIP_TP_STATE_DISCONNECTED:
247 ao2_find(transports, transport->obj_name, OBJ_SEARCH_KEY | OBJ_NODATA | OBJ_UNLINK);
248 break;
249 default:
250 break;
251 }
252
253 ao2_ref(transports, -1);
254 }
255}
256
259};
260
261/*! \brief Hashing function for monitored transport */
262static int monitored_transport_hash_fn(const void *obj, int flags)
263{
264 const struct monitored_transport *object;
265 const char *key;
266
267 switch (flags & OBJ_SEARCH_MASK) {
268 case OBJ_SEARCH_KEY:
269 key = obj;
270 break;
272 object = obj;
273 key = object->transport->obj_name;
274 break;
275 default:
276 /* Hash can only work on something with a full key. */
277 ast_assert(0);
278 return 0;
279 }
280 return ast_str_hash(key);
281}
282
283/*! \brief Comparison function for monitored transport */
284static int monitored_transport_cmp_fn(void *obj, void *arg, int flags)
285{
286 const struct monitored_transport *object_left = obj;
287 const struct monitored_transport *object_right = arg;
288 const char *right_key = arg;
289 int cmp;
290
291 switch (flags & OBJ_SEARCH_MASK) {
293 right_key = object_right->transport->obj_name;
294 /* Fall through */
295 case OBJ_SEARCH_KEY:
296 cmp = strcmp(object_left->transport->obj_name, right_key);
297 break;
299 /*
300 * We could also use a partial key struct containing a length
301 * so strlen() does not get called for every comparison instead.
302 */
303 cmp = strncmp(object_left->transport->obj_name, right_key, strlen(right_key));
304 break;
305 default:
306 /*
307 * What arg points to is specific to this traversal callback
308 * and has no special meaning to astobj2.
309 */
310 cmp = 0;
311 break;
312 }
313
314 return !cmp ? CMP_MATCH : 0;
315}
316
317static void keepalive_global_loaded(const char *object_type)
318{
319 unsigned int new_interval = ast_sip_get_keep_alive_interval();
320
321 if (new_interval) {
322 keepalive_interval = new_interval;
323 } else if (keepalive_interval) {
324 ast_log(LOG_NOTICE, "Keepalive support can not be disabled once activated.\n");
325 return;
326 } else {
327 /* This will occur if no keepalive interval has been specified at initial start */
328 return;
329 }
330
332 return;
333 }
334
336 ast_log(LOG_ERROR, "Could not create thread for sending keepalive messages.\n");
339 }
340}
341
342/*! \brief Observer which is used to update our interval when the global setting changes */
345};
346
347/*!
348 * \brief
349 * On incoming TCP connections, when we receive a SIP request, we mark that we have
350 * received a valid SIP request. This way, we will not shut the transport down for
351 * idleness
352 */
353static pj_bool_t idle_monitor_on_rx_request(pjsip_rx_data *rdata)
354{
355 struct monitored_transport *idle_trans;
356
357 idle_trans = get_monitored_transport_by_name(rdata->tp_info.transport->obj_name);
358 if (idle_trans) {
359 idle_trans->sip_received = 1;
360 ao2_ref(idle_trans, -1);
361 }
362
363 return PJ_FALSE;
364}
365
366static pjsip_module idle_monitor_module = {
367 .name = {"idle monitor module", 19},
368 .priority = PJSIP_MOD_PRIORITY_TRANSPORT_LAYER + 3,
369 .on_rx_request = idle_monitor_on_rx_request,
370};
371
373{
374 struct ao2_container *transports;
375
378 if (!transports) {
379 ast_log(LOG_ERROR, "Could not create container for transports to perform keepalive on.\n");
381 }
382 ao2_global_obj_replace_unref(monitored_transports, transports);
383 ao2_ref(transports, -1);
384
386 if (!sched) {
387 ast_log(LOG_ERROR, "Failed to create keepalive scheduler context.\n");
388 ao2_global_obj_release(monitored_transports);
390 }
391
393 ast_log(LOG_ERROR, "Failed to start keepalive scheduler thread\n");
395 sched = NULL;
396 ao2_global_obj_release(monitored_transports);
398 }
399
401
403
406
408}
409
411{
412 if (keepalive_interval) {
415 pthread_kill(keepalive_thread, SIGURG);
416 pthread_join(keepalive_thread, NULL);
418 }
419 }
420
422
424
426
429 sched = NULL;
430
431 ao2_global_obj_release(monitored_transports);
432}
pthread_t thread
Definition: app_sla.c:329
Asterisk main include file. File version handling, generic pbx functions.
#define ast_free(a)
Definition: astmm.h:180
#define ast_strdup(str)
A wrapper for strdup()
Definition: astmm.h:241
#define ast_log
Definition: astobj2.c:42
#define ao2_iterator_next(iter)
Definition: astobj2.h:1911
#define ao2_link(container, obj)
Add an object to a container.
Definition: astobj2.h:1532
@ CMP_MATCH
Definition: astobj2.h:1027
@ AO2_ALLOC_OPT_LOCK_NOLOCK
Definition: astobj2.h:367
@ AO2_ALLOC_OPT_LOCK_MUTEX
Definition: astobj2.h:363
#define ao2_global_obj_replace_unref(holder, obj)
Replace an ao2 object in the global holder, throwing away any old object.
Definition: astobj2.h:901
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
#define ao2_global_obj_ref(holder)
Get a reference to the object stored in the global holder.
Definition: astobj2.h:918
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1736
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:404
#define ao2_global_obj_release(holder)
Release the ao2 object held in the global holder.
Definition: astobj2.h:859
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
@ OBJ_SEARCH_PARTIAL_KEY
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
Definition: astobj2.h:1116
@ OBJ_SEARCH_OBJECT
The arg parameter is an object of the same type.
Definition: astobj2.h:1087
@ OBJ_NODATA
Definition: astobj2.h:1044
@ OBJ_SEARCH_MASK
Search option field mask.
Definition: astobj2.h:1072
@ OBJ_UNLINK
Definition: astobj2.h:1039
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1101
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition: astobj2.h:1303
static const char desc[]
Definition: cdr_radius.c:84
#define LOG_ERROR
#define LOG_NOTICE
#define AST_PTHREADT_NULL
Definition: lock.h:66
Asterisk module definitions.
@ AST_MODULE_LOAD_SUCCESS
Definition: module.h:70
@ AST_MODULE_LOAD_DECLINE
Module has failed to load, may be in an inconsistent state.
Definition: module.h:78
def info(msg)
static pjsip_module idle_monitor_module
int ast_sip_initialize_transport_management(void)
static int idle_sched_init_pj_thread(void)
static void keepalive_transport_send_keepalive(struct monitored_transport *monitored)
static void * keepalive_transport_thread(void *data)
Thread which sends keepalives to all active connection-oriented transports.
static void monitored_transport_state_callback(pjsip_transport *transport, pjsip_transport_state state, const pjsip_transport_state_info *info)
Callback invoked when transport changes occur.
#define TRANSPORTS_BUCKETS
Number of buckets for monitored transports.
static struct ast_sched_context * sched
Scheduler context for timing out connections with no data received.
static AO2_GLOBAL_OBJ_STATIC(monitored_transports)
Global container of active transports.
static const pj_str_t keepalive_packet
The keep alive packet to send.
static int monitored_transport_hash_fn(const void *obj, int flags)
Hashing function for monitored transport.
static pj_bool_t idle_monitor_on_rx_request(pjsip_rx_data *rdata)
On incoming TCP connections, when we receive a SIP request, we mark that we have received a valid SIP...
static int idle_sched_cb(const void *data)
static unsigned int keepalive_interval
The global interval at which to send keepalives.
void ast_sip_destroy_transport_management(void)
#define IDLE_TIMEOUT
static int idle_sched_cleanup(const void *data)
static pthread_t keepalive_thread
Thread keeping things alive.
static void keepalive_global_loaded(const char *object_type)
struct ast_sip_tpmgr_state_callback monitored_transport_reg
static struct ast_threadstorage desc_storage
static struct monitored_transport * get_monitored_transport_by_name(const char *obj_name)
static struct ast_sorcery_observer keepalive_global_observer
Observer which is used to update our interval when the global setting changes.
static int monitored_transport_cmp_fn(void *obj, void *arg, int flags)
Comparison function for monitored transport.
static void monitored_transport_destroy(void *obj)
Destructor for keepalive transport.
unsigned int ast_sip_get_keep_alive_interval(void)
Retrieve the system keep alive interval setting.
void ast_sip_unregister_service(pjsip_module *module)
Definition: res_pjsip.c:133
int ast_sip_register_service(pjsip_module *module)
Register a SIP service in Asterisk.
Definition: res_pjsip.c:117
void ast_sip_transport_state_register(struct ast_sip_tpmgr_state_callback *element)
Register a transport state notification callback element.
pjsip_endpoint * ast_sip_get_pjsip_endpoint(void)
Get a pointer to the PJSIP endpoint.
Definition: res_pjsip.c:520
void ast_sip_transport_state_unregister(struct ast_sip_tpmgr_state_callback *element)
Unregister a transport state notification callback element.
struct ast_sorcery * ast_sip_get_sorcery(void)
Get a pointer to the SIP sorcery structure.
#define NULL
Definition: resample.c:96
void ast_sched_clean_by_callback(struct ast_sched_context *con, ast_sched_cb match, ast_sched_cb cleanup_cb)
Clean all scheduled events with matching callback.
Definition: sched.c:409
void ast_sched_context_destroy(struct ast_sched_context *c)
destroys a schedule context
Definition: sched.c:271
int ast_sched_start_thread(struct ast_sched_context *con)
Start a thread for processing scheduler entries.
Definition: sched.c:197
int ast_sched_add_variable(struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data, int variable) attribute_warn_unused_result
Adds a scheduled event with rescheduling support.
Definition: sched.c:526
struct ast_sched_context * ast_sched_context_create(void)
Create a scheduler context.
Definition: sched.c:238
void ast_sorcery_observer_remove(const struct ast_sorcery *sorcery, const char *type, const struct ast_sorcery_observer *callbacks)
Remove an observer from a specific object type.
Definition: sorcery.c:2423
int ast_sorcery_observer_add(const struct ast_sorcery *sorcery, const char *type, const struct ast_sorcery_observer *callbacks)
Add an observer to a specific object type.
Definition: sorcery.c:2391
void ast_sorcery_reload_object(const struct ast_sorcery *sorcery, const char *type)
Inform any wizards of a specific object type to reload persistent objects.
Definition: sorcery.c:1442
static force_inline int attribute_pure ast_str_hash(const char *str)
Compute a hash value on a string.
Definition: strings.h:1259
Generic container type.
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
Definition: astobj2.h:1821
Interface for a sorcery object type observer.
Definition: sorcery.h:332
void(* loaded)(const char *object_type)
Callback for when an object type is loaded/reloaded.
Definition: sorcery.h:343
Structure for transport to be monitored.
pjsip_transport * transport
The underlying PJSIP transport.
int sip_received
Non-zero if a PJSIP request was received.
Definition: sched.c:76
void * ast_threadstorage_get(struct ast_threadstorage *ts, size_t init_size)
Retrieve thread storage.
#define AST_THREADSTORAGE(name)
Define a thread storage variable.
Definition: threadstorage.h:86
#define ast_assert(a)
Definition: utils.h:739
#define ast_pthread_create(a, b, c, d)
Definition: utils.h:584