Asterisk - The Open Source Telephony Project  GIT-master-44aef04
res_timing_pthread.c
Go to the documentation of this file.
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2008, Digium, Inc.
5  *
6  * Russell Bryant <russell@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 
19 /*!
20  * \file
21  * \author Russell Bryant <russell@digium.com>
22  *
23  * \brief pthread timing interface
24  */
25 
26 /*** MODULEINFO
27  <support_level>extended</support_level>
28  ***/
29 
30 #include "asterisk.h"
31 
32 #include <stdbool.h>
33 #include <math.h>
34 #include <unistd.h>
35 #include <fcntl.h>
36 
37 #include "asterisk/module.h"
38 #include "asterisk/timing.h"
39 #include "asterisk/utils.h"
40 #include "asterisk/astobj2.h"
41 #include "asterisk/time.h"
42 #include "asterisk/lock.h"
43 
/* Handle returned by ast_register_timing_interface(); assigned in load_module(). */
44 static void *timing_funcs_handle;
45 
/*
 * Forward declarations of the timing interface callbacks. They are
 * referenced by the pthread_timing initializer before being defined.
 */
46 static void *pthread_timer_open(void);
47 static void pthread_timer_close(void *data);
48 static int pthread_timer_set_rate(void *data, unsigned int rate);
49 static int pthread_timer_ack(void *data, unsigned int quantity);
50 static int pthread_timer_enable_continuous(void *data);
51 static int pthread_timer_disable_continuous(void *data);
52 static enum ast_timer_event pthread_timer_get_event(void *data);
53 static unsigned int pthread_timer_get_max_rate(void *data);
54 static int pthread_timer_fd(void *data);
55 
/*
 * Designated initializer wiring each timing-interface callback to the
 * pthread_timer_* function of the same role.
 *
 * NOTE(review): original line 56, which opens this initializer, is absent
 * from this listing. load_module() passes &pthread_timing to
 * ast_register_timing_interface(), so this is presumably the body of
 * "static struct ast_timing_interface pthread_timing" -- confirm against
 * the full file.
 */
57  .name = "pthread",
58  .priority = 0, /* use this as a last resort */
59  .timer_open = pthread_timer_open,
60  .timer_close = pthread_timer_close,
61  .timer_set_rate = pthread_timer_set_rate,
62  .timer_ack = pthread_timer_ack,
63  .timer_enable_continuous = pthread_timer_enable_continuous,
64  .timer_disable_continuous = pthread_timer_disable_continuous,
65  .timer_get_event = pthread_timer_get_event,
66  .timer_get_max_rate = pthread_timer_get_max_rate,
67  .timer_fd = pthread_timer_fd,
68 };
69 
/*
 * Highest rate (ticks per second) accepted by pthread_timer_set_rate() and
 * reported by pthread_timer_get_max_rate().
 */
70 /* 1 tick / 10 ms */
71 #define MAX_RATE 100
72 
/*
 * NOTE(review): original line 73 is absent from this listing; the
 * pthread_timers container used throughout this file is presumably declared
 * there. PTHREAD_TIMER_BUCKETS is presumably that container's bucket count;
 * the allocation call is not visible here -- confirm against the full file.
 */
74 #define PTHREAD_TIMER_BUCKETS 563
75 
/*
 * Indexes into pthread_timer.pipe[].
 *
 * NOTE(review): original line 78 is absent from this listing. PIPE_WRITE is
 * used elsewhere in this file, so it is presumably defined on that line.
 */
76 enum {
77  PIPE_READ = 0,
79 };
80 
/*
 * NOTE(review): original lines 81-83 are absent from this listing; the
 * closing brace below ends a definition whose start is not visible. This
 * file uses TIMER_STATE_IDLE and TIMER_STATE_TICKING, so this is presumably
 * the end of the timer state enum -- confirm against the full file.
 */
84 };
85 
/*!
 * \brief State kept for one pthread-driven timer.
 *
 * NOTE(review): original line 88 is absent from this listing. Other code in
 * this file reads and writes timer->state, so the missing line presumably
 * declares that member -- confirm against the full file.
 */
86 struct pthread_timer {
/*! Pipe descriptors, indexed by PIPE_READ / PIPE_WRITE */
87  int pipe[2];
/*! Rate in ticks per second, as last set by pthread_timer_set_rate() */
89  unsigned int rate;
90  /*! Interval in ms for current rate */
91  unsigned int interval;
/*! Ticks counted by check_timer() since \c start */
92  unsigned int tick_count;
/*! Ticks raised by run_timer() and not yet removed by ack_ticks() */
93  unsigned int pending_ticks;
/*! Reference time from which check_timer() computes elapsed ticks */
94  struct timeval start;
/*! Toggled by pthread_timer_enable_continuous() / pthread_timer_disable_continuous() */
95  bool continuous:1;
/*! True while a byte written by signal_pipe() has not been read back by unsignal_pipe() */
96  bool pipe_signaled:1;
97 };
98 
/* Forward declarations for internal helpers defined later in this file. */
99 static void pthread_timer_destructor(void *obj);
100 static void signal_pipe(struct pthread_timer *timer);
101 static void unsignal_pipe(struct pthread_timer *timer);
102 static void ack_ticks(struct pthread_timer *timer, unsigned int num);
103 
104 /*!
105  * \brief Data for the timing thread
106  */
/*
 * NOTE(review): original lines 109-110 are absent from this listing, so
 * this struct has further members that are not visible here.
 */
107 static struct {
/*! Thread joined by unload_module() */
108  pthread_t thread;
/*! Set to 1 by unload_module(); do_timing() loops until it is set */
111  unsigned int stop:1;
112 } timing_thread;
113 
/*!
 * \brief Allocate a timer, create its pipe and add it to pthread_timers.
 *
 * \return the new timer, or NULL on failure (errno is set to ENOMEM when
 *         the allocation itself fails)
 *
 * NOTE(review): original lines 133-135, the body of the "container is
 * empty" branch below, are absent from this listing.
 */
114 static void *pthread_timer_open(void)
115 {
116  struct pthread_timer *timer;
117 
118  if (!(timer = ao2_alloc(sizeof(*timer), pthread_timer_destructor))) {
119  errno = ENOMEM;
120  return NULL;
121  }
122 
/* Mark both descriptors invalid so the destructor only closes what was opened. */
123  timer->pipe[PIPE_READ] = timer->pipe[PIPE_WRITE] = -1;
124  timer->state = TIMER_STATE_IDLE;
125 
126  if (ast_pipe_nonblock(timer->pipe)) {
/* Dropping the only reference runs pthread_timer_destructor(). */
127  ao2_ref(timer, -1);
128  return NULL;
129  }
130 
/* The container is locked here, hence OBJ_NOLOCK on the link below. */
131  ao2_lock(pthread_timers);
132  if (!ao2_container_count(pthread_timers)) {
/* NOTE(review): what happens for the first timer is not visible (lines 133-135 missing). */
136  }
137  ao2_link_flags(pthread_timers, timer, OBJ_NOLOCK);
138  ao2_unlock(pthread_timers);
139 
140  return timer;
141 }
142 
143 static void pthread_timer_close(void *data)
144 {
145  struct pthread_timer *timer = data;
146 
147  ao2_unlink(pthread_timers, timer);
148  ao2_ref(timer, -1);
149 }
150 
151 static int pthread_timer_set_rate(void *data, unsigned int rate)
152 {
153  struct pthread_timer *timer = data;
154 
155  if (rate > MAX_RATE) {
156  ast_log(LOG_ERROR, "res_timing_pthread only supports timers at a "
157  "max rate of %d / sec\n", MAX_RATE);
158  errno = EINVAL;
159  return -1;
160  }
161 
162  ao2_lock(timer);
163 
164  if ((timer->rate = rate)) {
165  timer->interval = roundf(1000.0 / ((float) rate));
166  timer->start = ast_tvnow();
167  timer->state = TIMER_STATE_TICKING;
168  } else {
169  timer->interval = 0;
170  timer->start = ast_tv(0, 0);
171  timer->state = TIMER_STATE_IDLE;
172  }
173  timer->tick_count = 0;
174 
175  ao2_unlock(timer);
176 
177  return 0;
178 }
179 
/*!
 * \brief Acknowledge expired ticks on a timer.
 *
 * \param data the struct pthread_timer being acknowledged
 * \param quantity number of ticks to acknowledge; asserted to be non-zero
 *
 * \retval 0 always
 *
 * Locks the timer around the call to ack_ticks().
 */
static int pthread_timer_ack(void *data, unsigned int quantity)
{
	struct pthread_timer *timer;

	ast_assert(quantity > 0);

	timer = data;

	ao2_lock(timer);
	ack_ticks(timer, quantity);
	ao2_unlock(timer);

	return 0;
}
192 
193 static int pthread_timer_enable_continuous(void *data)
194 {
195  struct pthread_timer *timer = data;
196 
197  ao2_lock(timer);
198  if (!timer->continuous) {
199  timer->continuous = true;
200  signal_pipe(timer);
201  }
202  ao2_unlock(timer);
203 
204  return 0;
205 }
206 
207 static int pthread_timer_disable_continuous(void *data)
208 {
209  struct pthread_timer *timer = data;
210 
211  ao2_lock(timer);
212  if (timer->continuous) {
213  timer->continuous = false;
214  unsignal_pipe(timer);
215  }
216  ao2_unlock(timer);
217 
218  return 0;
219 }
220 
/*
 * Body of the get_event callback: returns res after inspecting
 * timer->continuous with the timer locked.
 *
 * NOTE(review): original lines 221, 224 and 228 are absent from this
 * listing. That covers the signature (the prototype near the top of the
 * file declares "static enum ast_timer_event pthread_timer_get_event(void
 * *data)"), presumably the declaration of res, and the body of the
 * continuous branch -- confirm against the full file.
 */
222 {
223  struct pthread_timer *timer = data;
225 
226  ao2_lock(timer);
227  if (timer->continuous) {
229  }
230  ao2_unlock(timer);
231 
232  return res;
233 }
234 
235 static unsigned int pthread_timer_get_max_rate(void *data)
236 {
237  return MAX_RATE;
238 }
239 
240 static int pthread_timer_fd(void *data)
241 {
242  struct pthread_timer *timer = data;
243 
244  return timer->pipe[PIPE_READ];
245 }
246 
247 static void pthread_timer_destructor(void *obj)
248 {
249  struct pthread_timer *timer = obj;
250 
251  if (timer->pipe[PIPE_READ] > -1) {
252  close(timer->pipe[PIPE_READ]);
253  timer->pipe[PIPE_READ] = -1;
254  }
255 
256  if (timer->pipe[PIPE_WRITE] > -1) {
257  close(timer->pipe[PIPE_WRITE]);
258  timer->pipe[PIPE_WRITE] = -1;
259  }
260 }
261 
262 /*!
263  * \note only PIPE_READ is guaranteed valid
264  */
265 static int pthread_timer_hash(const void *obj, const int flags)
266 {
267  const struct pthread_timer *timer = obj;
268 
269  return timer->pipe[PIPE_READ];
270 }
271 
272 /*!
273  * \note only PIPE_READ is guaranteed valid
274  */
275 static int pthread_timer_cmp(void *obj, void *arg, int flags)
276 {
277  struct pthread_timer *timer1 = obj, *timer2 = arg;
278 
279  return (timer1->pipe[PIPE_READ] == timer2->pipe[PIPE_READ]) ? CMP_MATCH | CMP_STOP : 0;
280 }
281 
282 /*!
283  * \retval 0 no timer tick needed
284  * \retval non-zero write to the timing pipe needed
285  */
286 static int check_timer(struct pthread_timer *timer)
287 {
288  struct timeval now;
289 
290  if (timer->state == TIMER_STATE_IDLE) {
291  return 0;
292  }
293 
294  now = ast_tvnow();
295 
296  if (timer->tick_count < (ast_tvdiff_ms(now, timer->start) / timer->interval)) {
297  timer->tick_count++;
298  if (!timer->tick_count) {
299  /* Handle overflow. */
300  timer->start = now;
301  }
302  return 1;
303  }
304 
305  return 0;
306 }
307 
308 /*!
309  * \internal
310  * \pre timer is locked
311  */
312 static void ack_ticks(struct pthread_timer *timer, unsigned int quantity)
313 {
314  int pending_ticks = timer->pending_ticks;
315 
316  ast_assert(quantity);
317 
318  if (quantity > pending_ticks) {
319  quantity = pending_ticks;
320  }
321 
322  if (!quantity) {
323  return;
324  }
325 
326  timer->pending_ticks -= quantity;
327 
328  if ((0 == timer->pending_ticks) && !timer->continuous) {
329  unsignal_pipe(timer);
330  }
331 }
332 
333 /*!
334  * \internal
335  * \pre timer is locked
336  */
337 static void signal_pipe(struct pthread_timer *timer)
338 {
339  ssize_t res;
340  unsigned char x = 42;
341 
342  if (timer->pipe_signaled) {
343  return;
344  }
345 
346  res = write(timer->pipe[PIPE_WRITE], &x, 1);
347  if (-1 == res) {
348  ast_log(LOG_ERROR, "Error writing to timing pipe: %s\n",
349  strerror(errno));
350  } else {
351  timer->pipe_signaled = true;
352  }
353 }
354 
355 /*!
356  * \internal
357  * \pre timer is locked
358  */
359 static void unsignal_pipe(struct pthread_timer *timer)
360 {
361  ssize_t res;
362  unsigned long buffer;
363 
364  if (!timer->pipe_signaled) {
365  return;
366  }
367 
368  res = read(timer->pipe[PIPE_READ], &buffer, sizeof(buffer));
369  if (-1 == res) {
370  ast_log(LOG_ERROR, "Error reading from pipe: %s\n",
371  strerror(errno));
372  } else {
373  timer->pipe_signaled = false;
374  }
375 }
376 
377 static int run_timer(void *obj, void *arg, int flags)
378 {
379  struct pthread_timer *timer = obj;
380 
381  if (timer->state == TIMER_STATE_IDLE) {
382  return 0;
383  }
384 
385  ao2_lock(timer);
386  if (check_timer(timer)) {
387  timer->pending_ticks++;
388  signal_pipe(timer);
389  }
390  ao2_unlock(timer);
391 
392  return 0;
393 }
394 
/*!
 * \brief Loop that services every timer until timing_thread.stop is set.
 *
 * \param arg unused in the visible code
 *
 * \return NULL
 *
 * NOTE(review): original lines 409, 412, 414 and 417 are absent from this
 * listing. They presumably hold the locking and the wait until the next
 * wakeup (ts is computed below but its use is not visible) -- confirm
 * against the full file.
 */
395 static void *do_timing(void *arg)
396 {
397  struct timeval next_wakeup = ast_tvnow();
398 
399  while (!timing_thread.stop) {
400  struct timespec ts = { 0, };
401 
/* Give every timer in the container a chance to tick. */
402  ao2_callback(pthread_timers, OBJ_NODATA, run_timer, NULL);
403 
/* Advance the wakeup time by 5000 microseconds (5 ms). */
404  next_wakeup = ast_tvadd(next_wakeup, ast_tv(0, 5000));
405 
/* Same instant as a timespec: microseconds become nanoseconds. */
406  ts.tv_sec = next_wakeup.tv_sec;
407  ts.tv_nsec = next_wakeup.tv_usec * 1000;
408 
410  if (!timing_thread.stop) {
411  if (ao2_container_count(pthread_timers)) {
/* NOTE(review): action when timers exist is not visible (line 412 missing). */
413  } else {
/* NOTE(review): action when no timers exist is not visible (line 414 missing). */
415  }
416  }
418  }
419 
420  return NULL;
421 }
422 
/*!
 * \brief Set up the timing thread.
 *
 * \retval 0 success
 * \retval -1 failure (an error is logged)
 *
 * NOTE(review): original lines 425-426 and 428 are absent from this
 * listing, including the condition that guards the error path below. The
 * log message says the thread could not be started, so the missing
 * condition presumably wraps the thread creation call -- confirm against
 * the full file.
 */
423 static int init_timing_thread(void)
424 {
427 
429  ast_log(LOG_ERROR, "Unable to start timing thread.\n");
430  return -1;
431  }
432 
433  return 0;
434 }
435 
/*!
 * \brief Module load callback (registered through AST_MODULE_INFO).
 *
 * Starts the timing thread and registers pthread_timing as a timing
 * interface, keeping the returned handle in timing_funcs_handle.
 *
 * NOTE(review): original lines 438-439, 441, 447 and 451 are absent from
 * this listing. That covers the creation of pthread_timers, both failure
 * returns, and the two result values of the final conditional expression --
 * confirm against the full file.
 */
436 static int load_module(void)
437 {
440  if (!pthread_timers) {
442  }
443 
444  if (init_timing_thread()) {
/* The thread could not be set up: release the container again. */
445  ao2_ref(pthread_timers, -1);
446  pthread_timers = NULL;
448  }
449 
450  return (timing_funcs_handle = ast_register_timing_interface(&pthread_timing)) ?
452 }
453 
/*!
 * \brief Module unload callback (registered through AST_MODULE_INFO).
 *
 * \return res, whose assignment is not visible in this listing
 *
 * NOTE(review): original lines 458, 460-461 and 464 are absent from this
 * listing. That covers the assignment of res, whatever wakes the timing
 * thread after stop is set, and the condition opening the block that
 * releases pthread_timers -- confirm against the full file.
 */
454 static int unload_module(void)
455 {
456  int res;
457 
/* Ask do_timing() to leave its loop, then wait for the thread to exit. */
459  timing_thread.stop = 1;
462  pthread_join(timing_thread.thread, NULL);
463 
465  ao2_ref(pthread_timers, -1);
466  pthread_timers = NULL;
467  }
468 
469  return res;
470 }
/*
 * Module registration: names load_module() and unload_module() as the
 * load/unload callbacks and requests the AST_MODPRI_TIMING load priority.
 */
471 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "pthread Timing Interface",
472  .support_level = AST_MODULE_SUPPORT_EXTENDED,
473  .load = load_module,
474  .unload = unload_module,
475  .load_pri = AST_MODPRI_TIMING,
476 );
const char * name
Definition: timing.h:70
Timing module interface.
Definition: timing.h:69
static unsigned int pthread_timer_get_max_rate(void *data)
static struct ast_timing_interface pthread_timing
Asterisk locking-related definitions:
Asterisk main include file. File version handling, generic pbx functions.
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
static int pthread_timer_fd(void *data)
static void ack_ticks(struct pthread_timer *timer, unsigned int num)
unsigned int rate
struct timeval start
#define ast_pipe_nonblock(filedes)
Create a non-blocking pipe.
Definition: utils.h:1000
static struct ao2_container * pthread_timers
static int check_timer(struct pthread_timer *timer)
Time-related functions and macros.
int ast_unregister_timing_interface(void *handle)
Unregister a previously registered timing interface.
Definition: timing.c:104
unsigned int tick_count
unsigned int pending_ticks
ast_timer_event
Definition: timing.h:57
static void pthread_timer_destructor(void *obj)
unsigned int interval
#define ao2_callback(c, flags, cb_fn, arg)
Definition: astobj2.h:1716
static void * pthread_timer_open(void)
Assume that the ao2_container is already locked.
Definition: astobj2.h:1067
#define PTHREAD_TIMER_BUCKETS
#define ast_cond_wait(cond, mutex)
Definition: lock.h:203
#define ast_cond_init(cond, attr)
Definition: lock.h:199
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:150
#define ast_assert(a)
Definition: utils.h:650
#define ao2_link_flags(container, obj, flags)
Definition: astobj2.h:1572
#define ast_mutex_lock(a)
Definition: lock.h:187
#define ao2_unlock(a)
Definition: astobj2.h:730
int64_t ast_tvdiff_ms(struct timeval end, struct timeval start)
Computes the difference (in milliseconds) between two struct timeval instances.
Definition: time.h:98
static int unload_module(void)
static void signal_pipe(struct pthread_timer *timer)
#define NULL
Definition: resample.c:96
#define ast_cond_signal(cond)
Definition: lock.h:201
static int pthread_timer_disable_continuous(void *data)
Utility functions.
pthread_cond_t ast_cond_t
Definition: lock.h:176
pthread_t thread
#define ast_pthread_create_background(a, b, c, d)
Definition: utils.h:507
#define ast_log
Definition: astobj2.c:42
static int run_timer(void *obj, void *arg, int flags)
#define ao2_ref(o, delta)
Definition: astobj2.h:464
#define ao2_lock(a)
Definition: astobj2.h:718
static int pthread_timer_cmp(void *obj, void *arg, int flags)
static void * do_timing(void *arg)
static int load_module(void)
enum pthread_timer_state state
static int pthread_timer_enable_continuous(void *data)
#define LOG_ERROR
Definition: logger.h:285
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Definition: astobj2.h:1310
#define ao2_unlink(container, obj)
Definition: astobj2.h:1598
int errno
struct timeval ast_tvadd(struct timeval a, struct timeval b)
Returns the sum of two timevals a + b.
Definition: extconf.c:2283
static int pthread_timer_set_rate(void *data, unsigned int rate)
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:411
ast_mutex_t lock
Module has failed to load, may be in an inconsistent state.
Definition: module.h:78
AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS|AST_MODFLAG_LOAD_ORDER, "HTTP Phone Provisioning",.support_level=AST_MODULE_SUPPORT_EXTENDED,.load=load_module,.unload=unload_module,.reload=reload,.load_pri=AST_MODPRI_CHANNEL_DEPEND,.requires="http",)
static void unsignal_pipe(struct pthread_timer *timer)
#define MAX_RATE
unsigned int stop
struct timeval ast_tv(ast_time_t sec, ast_suseconds_t usec)
Returns a timeval from sec, usec.
Definition: time.h:226
float roundf(float x)
static void * timing_funcs_handle
#define ast_register_timing_interface(i)
Register a set of timing functions.
Definition: timing.h:95
static int init_timing_thread(void)
#define ast_mutex_init(pmutex)
Definition: lock.h:184
static enum ast_timer_event pthread_timer_get_event(void *data)
Generic container type.
ast_cond_t cond
#define ASTERISK_GPL_KEY
The text the key() function should return.
Definition: module.h:46
static struct ast_timer * timer
Definition: chan_iax2.c:360
static struct @491 timing_thread
Data for the timing thread.
Asterisk module definitions.
#define ast_cond_timedwait(cond, mutex, time)
Definition: lock.h:204
Timing source management.
Structure for mutex and tracking information.
Definition: lock.h:135
static void pthread_timer_close(void *data)
#define ast_mutex_unlock(a)
Definition: lock.h:188
pthread_timer_state
static int pthread_timer_ack(void *data, unsigned int quantity)
static int pthread_timer_hash(const void *obj, const int flags)