Asterisk - The Open Source Telephony Project GIT-master-d856a3e
res_timing_pthread.c
Go to the documentation of this file.
1/*
2 * Asterisk -- An open source telephony toolkit.
3 *
4 * Copyright (C) 2008, Digium, Inc.
5 *
6 * Russell Bryant <russell@digium.com>
7 *
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
13 *
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
17 */
18
19/*!
20 * \file
21 * \author Russell Bryant <russell@digium.com>
22 *
23 * \brief pthread timing interface
24 */
25
26/*** MODULEINFO
27 <support_level>extended</support_level>
28 ***/
29
30#include "asterisk.h"
31
32#include <stdbool.h>
33#include <math.h>
34#include <unistd.h>
35#include <fcntl.h>
36
37#include "asterisk/module.h"
38#include "asterisk/timing.h"
39#include "asterisk/utils.h"
40#include "asterisk/astobj2.h"
41#include "asterisk/time.h"
42#include "asterisk/lock.h"
43
45
46static void *pthread_timer_open(void);
47static void pthread_timer_close(void *data);
48static int pthread_timer_set_rate(void *data, unsigned int rate);
49static int pthread_timer_ack(void *data, unsigned int quantity);
50static int pthread_timer_enable_continuous(void *data);
51static int pthread_timer_disable_continuous(void *data);
52static enum ast_timer_event pthread_timer_get_event(void *data);
53static unsigned int pthread_timer_get_max_rate(void *data);
54static int pthread_timer_fd(void *data);
55
57 .name = "pthread",
58 .priority = 0, /* use this as a last resort */
59 .timer_open = pthread_timer_open,
60 .timer_close = pthread_timer_close,
61 .timer_set_rate = pthread_timer_set_rate,
62 .timer_ack = pthread_timer_ack,
63 .timer_enable_continuous = pthread_timer_enable_continuous,
64 .timer_disable_continuous = pthread_timer_disable_continuous,
65 .timer_get_event = pthread_timer_get_event,
66 .timer_get_max_rate = pthread_timer_get_max_rate,
67 .timer_fd = pthread_timer_fd,
68};
69
70/* 1 tick / 10 ms */
71#define MAX_RATE 100
72
74#define PTHREAD_TIMER_BUCKETS 563
75
76enum {
78 PIPE_WRITE = 1
79};
80
84};
85
87 int pipe[2];
89 unsigned int rate;
90 /*! Interval in ms for current rate */
91 unsigned int interval;
92 unsigned int tick_count;
93 unsigned int pending_ticks;
94 struct timeval start;
95 bool continuous:1;
97};
98
99static void pthread_timer_destructor(void *obj);
100static void signal_pipe(struct pthread_timer *timer);
101static void unsignal_pipe(struct pthread_timer *timer);
102static void ack_ticks(struct pthread_timer *timer, unsigned int num);
103
104/*!
105 * \brief Data for the timing thread
106 */
107static struct {
108 pthread_t thread;
111 unsigned int stop:1;
113
114static void *pthread_timer_open(void)
115{
116 struct pthread_timer *timer;
117
118 if (!(timer = ao2_alloc(sizeof(*timer), pthread_timer_destructor))) {
119 errno = ENOMEM;
120 return NULL;
121 }
122
123 timer->pipe[PIPE_READ] = timer->pipe[PIPE_WRITE] = -1;
124 timer->state = TIMER_STATE_IDLE;
125
126 if (ast_pipe_nonblock(timer->pipe)) {
127 ao2_ref(timer, -1);
128 return NULL;
129 }
130
136 }
139
140 return timer;
141}
142
143static void pthread_timer_close(void *data)
144{
145 struct pthread_timer *timer = data;
146
148 ao2_ref(timer, -1);
149}
150
151static int pthread_timer_set_rate(void *data, unsigned int rate)
152{
153 struct pthread_timer *timer = data;
154
155 if (rate > MAX_RATE) {
156 ast_log(LOG_ERROR, "res_timing_pthread only supports timers at a "
157 "max rate of %d / sec\n", MAX_RATE);
158 errno = EINVAL;
159 return -1;
160 }
161
163
164 if ((timer->rate = rate)) {
165 timer->interval = roundf(1000.0 / ((float) rate));
166 timer->start = ast_tvnow();
167 timer->state = TIMER_STATE_TICKING;
168 } else {
169 timer->interval = 0;
170 timer->start = ast_tv(0, 0);
171 timer->state = TIMER_STATE_IDLE;
172 }
173 timer->tick_count = 0;
174
176
177 return 0;
178}
179
180static int pthread_timer_ack(void *data, unsigned int quantity)
181{
182 struct pthread_timer *timer = data;
183
184 ast_assert(quantity > 0);
185
187 ack_ticks(timer, quantity);
189
190 return 0;
191}
192
194{
195 struct pthread_timer *timer = data;
196
198 if (!timer->continuous) {
199 timer->continuous = true;
201 }
203
204 return 0;
205}
206
208{
209 struct pthread_timer *timer = data;
210
212 if (timer->continuous) {
213 timer->continuous = false;
215 }
217
218 return 0;
219}
220
222{
223 struct pthread_timer *timer = data;
225
227 if (timer->continuous) {
229 }
231
232 return res;
233}
234
235static unsigned int pthread_timer_get_max_rate(void *data)
236{
237 return MAX_RATE;
238}
239
240static int pthread_timer_fd(void *data)
241{
242 struct pthread_timer *timer = data;
243
244 return timer->pipe[PIPE_READ];
245}
246
247static void pthread_timer_destructor(void *obj)
248{
249 struct pthread_timer *timer = obj;
250
251 if (timer->pipe[PIPE_READ] > -1) {
252 close(timer->pipe[PIPE_READ]);
253 timer->pipe[PIPE_READ] = -1;
254 }
255
256 if (timer->pipe[PIPE_WRITE] > -1) {
257 close(timer->pipe[PIPE_WRITE]);
258 timer->pipe[PIPE_WRITE] = -1;
259 }
260}
261
262/*!
263 * \note only PIPE_READ is guaranteed valid
264 */
265static int pthread_timer_hash(const void *obj, const int flags)
266{
267 const struct pthread_timer *timer = obj;
268
269 return timer->pipe[PIPE_READ];
270}
271
272/*!
273 * \note only PIPE_READ is guaranteed valid
274 */
275static int pthread_timer_cmp(void *obj, void *arg, int flags)
276{
277 struct pthread_timer *timer1 = obj, *timer2 = arg;
278
279 return (timer1->pipe[PIPE_READ] == timer2->pipe[PIPE_READ]) ? CMP_MATCH | CMP_STOP : 0;
280}
281
282/*!
283 * \retval 0 no timer tick needed
284 * \retval non-zero write to the timing pipe needed
285 */
286static int check_timer(struct pthread_timer *timer)
287{
288 struct timeval now;
289
290 if (timer->state == TIMER_STATE_IDLE) {
291 return 0;
292 }
293
294 now = ast_tvnow();
295
296 if (timer->tick_count < (ast_tvdiff_ms(now, timer->start) / timer->interval)) {
297 timer->tick_count++;
298 if (!timer->tick_count) {
299 /* Handle overflow. */
300 timer->start = now;
301 }
302 return 1;
303 }
304
305 return 0;
306}
307
308/*!
309 * \internal
310 * \pre timer is locked
311 */
312static void ack_ticks(struct pthread_timer *timer, unsigned int quantity)
313{
314 int pending_ticks = timer->pending_ticks;
315
316 ast_assert(quantity);
317
318 if (quantity > pending_ticks) {
319 quantity = pending_ticks;
320 }
321
322 if (!quantity) {
323 return;
324 }
325
326 timer->pending_ticks -= quantity;
327
328 if ((0 == timer->pending_ticks) && !timer->continuous) {
330 }
331}
332
333/*!
334 * \internal
335 * \pre timer is locked
336 */
337static void signal_pipe(struct pthread_timer *timer)
338{
339 ssize_t res;
340 unsigned char x = 42;
341
342 if (timer->pipe_signaled) {
343 return;
344 }
345
346 res = write(timer->pipe[PIPE_WRITE], &x, 1);
347 if (-1 == res) {
348 ast_log(LOG_ERROR, "Error writing to timing pipe: %s\n",
349 strerror(errno));
350 } else {
351 timer->pipe_signaled = true;
352 }
353}
354
355/*!
356 * \internal
357 * \pre timer is locked
358 */
360{
361 ssize_t res;
362 unsigned long buffer;
363
364 if (!timer->pipe_signaled) {
365 return;
366 }
367
368 res = read(timer->pipe[PIPE_READ], &buffer, sizeof(buffer));
369 if (-1 == res) {
370 ast_log(LOG_ERROR, "Error reading from pipe: %s\n",
371 strerror(errno));
372 } else {
373 timer->pipe_signaled = false;
374 }
375}
376
377static int run_timer(void *obj, void *arg, int flags)
378{
379 struct pthread_timer *timer = obj;
380
381 if (timer->state == TIMER_STATE_IDLE) {
382 return 0;
383 }
384
386 if (check_timer(timer)) {
387 timer->pending_ticks++;
389 }
391
392 return 0;
393}
394
395static void *do_timing(void *arg)
396{
397 struct timeval next_wakeup = ast_tvnow();
398
399 while (!timing_thread.stop) {
400 struct timespec ts = { 0, };
401
403
404 next_wakeup = ast_tvadd(next_wakeup, ast_tv(0, 5000));
405
406 ts.tv_sec = next_wakeup.tv_sec;
407 ts.tv_nsec = next_wakeup.tv_usec * 1000;
408
410 if (!timing_thread.stop) {
413 } else {
415 }
416 }
418 }
419
420 return NULL;
421}
422
423static int init_timing_thread(void)
424{
427
429 ast_log(LOG_ERROR, "Unable to start timing thread.\n");
430 return -1;
431 }
432
433 return 0;
434}
435
436static int load_module(void)
437{
440 if (!pthread_timers) {
442 }
443
444 if (init_timing_thread()) {
448 }
449
452}
453
454static int unload_module(void)
455{
456 int res;
457
459 timing_thread.stop = 1;
462 pthread_join(timing_thread.thread, NULL);
463
467 }
468
469 return res;
470}
471AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "pthread Timing Interface",
472 .support_level = AST_MODULE_SUPPORT_EXTENDED,
473 .load = load_module,
474 .unload = unload_module,
475 .load_pri = AST_MODPRI_TIMING,
Asterisk main include file. File version handling, generic pbx functions.
#define ast_log
Definition: astobj2.c:42
@ CMP_MATCH
Definition: astobj2.h:1027
@ CMP_STOP
Definition: astobj2.h:1028
@ AO2_ALLOC_OPT_LOCK_MUTEX
Definition: astobj2.h:363
#define ao2_callback(c, flags, cb_fn, arg)
ao2_callback() is a generic function that applies cb_fn() to all objects in a container,...
Definition: astobj2.h:1693
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
#define ao2_unlink(container, obj)
Remove an object from a container.
Definition: astobj2.h:1578
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
Definition: astobj2.h:1554
#define ao2_unlock(a)
Definition: astobj2.h:729
#define ao2_lock(a)
Definition: astobj2.h:717
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
Definition: astobj2.h:1063
@ OBJ_NODATA
Definition: astobj2.h:1044
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:409
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition: astobj2.h:1303
static struct ast_timer * timer
Definition: chan_iax2.c:364
float roundf(float x)
#define LOG_ERROR
Asterisk locking-related definitions:
#define ast_cond_wait(cond, mutex)
Definition: lock.h:205
#define ast_cond_init(cond, attr)
Definition: lock.h:201
#define ast_cond_timedwait(cond, mutex, time)
Definition: lock.h:206
#define ast_mutex_init(pmutex)
Definition: lock.h:186
#define ast_mutex_unlock(a)
Definition: lock.h:190
pthread_cond_t ast_cond_t
Definition: lock.h:178
#define ast_mutex_lock(a)
Definition: lock.h:189
#define ast_cond_signal(cond)
Definition: lock.h:203
int errno
Asterisk module definitions.
@ AST_MODFLAG_LOAD_ORDER
Definition: module.h:331
#define AST_MODULE_INFO(keystr, flags_to_set, desc, fields...)
Definition: module.h:557
@ AST_MODPRI_TIMING
Definition: module.h:339
@ AST_MODULE_SUPPORT_EXTENDED
Definition: module.h:122
#define ASTERISK_GPL_KEY
The text the key() function should return.
Definition: module.h:46
@ AST_MODULE_LOAD_SUCCESS
Definition: module.h:70
@ AST_MODULE_LOAD_DECLINE
Module has failed to load, may be in an inconsistent state.
Definition: module.h:78
pthread_t thread
unsigned int stop
static void pthread_timer_destructor(void *obj)
static int pthread_timer_disable_continuous(void *data)
ast_cond_t cond
static int pthread_timer_enable_continuous(void *data)
static int pthread_timer_hash(const void *obj, const int flags)
static unsigned int pthread_timer_get_max_rate(void *data)
static int init_timing_thread(void)
static int pthread_timer_fd(void *data)
@ PIPE_WRITE
@ PIPE_READ
static void * pthread_timer_open(void)
static int run_timer(void *obj, void *arg, int flags)
static void pthread_timer_close(void *data)
static void * do_timing(void *arg)
static void ack_ticks(struct pthread_timer *timer, unsigned int num)
static void unsignal_pipe(struct pthread_timer *timer)
#define PTHREAD_TIMER_BUCKETS
static void signal_pipe(struct pthread_timer *timer)
#define MAX_RATE
static int load_module(void)
static void * timing_funcs_handle
static int unload_module(void)
static struct @483 timing_thread
Data for the timing thread.
static int check_timer(struct pthread_timer *timer)
static struct ast_timing_interface pthread_timing
ast_mutex_t lock
static int pthread_timer_cmp(void *obj, void *arg, int flags)
static int pthread_timer_set_rate(void *data, unsigned int rate)
static int pthread_timer_ack(void *data, unsigned int quantity)
static struct ao2_container * pthread_timers
static enum ast_timer_event pthread_timer_get_event(void *data)
pthread_timer_state
@ TIMER_STATE_TICKING
@ TIMER_STATE_IDLE
#define NULL
Definition: resample.c:96
Generic container type.
Structure for mutex and tracking information.
Definition: lock.h:135
Timing module interface.
Definition: timing.h:69
const char * name
Definition: timing.h:70
unsigned int pending_ticks
unsigned int interval
unsigned int tick_count
enum pthread_timer_state state
struct timeval start
unsigned int rate
Time-related functions and macros.
struct timeval ast_tvadd(struct timeval a, struct timeval b)
Returns the sum of two timevals a + b.
Definition: extconf.c:2282
int64_t ast_tvdiff_ms(struct timeval end, struct timeval start)
Computes the difference (in milliseconds) between two struct timeval instances.
Definition: time.h:107
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:159
struct timeval ast_tv(ast_time_t sec, ast_suseconds_t usec)
Returns a timeval from sec, usec.
Definition: time.h:235
Timing source management.
#define ast_register_timing_interface(i)
Register a set of timing functions.
Definition: timing.h:95
int ast_unregister_timing_interface(void *handle)
Unregister a previously registered timing interface.
Definition: timing.c:104
ast_timer_event
Definition: timing.h:57
@ AST_TIMING_EVENT_CONTINUOUS
Definition: timing.h:59
@ AST_TIMING_EVENT_EXPIRED
Definition: timing.h:58
Utility functions.
#define ast_assert(a)
Definition: utils.h:739
#define ast_pthread_create_background(a, b, c, d)
Definition: utils.h:592
#define ast_pipe_nonblock(filedes)
Create a non-blocking pipe.
Definition: utils.h:1090