Asterisk - The Open Source Telephony Project GIT-master-77d630f
taskpool.c
Go to the documentation of this file.
1/*
2 * Asterisk -- An open source telephony toolkit.
3 *
4 * Copyright (C) 2025, Sangoma Technologies Corporation
5 *
6 * Joshua Colp <jcolp@sangoma.com>
7 *
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
13 *
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
17 */
18
19
20#include "asterisk.h"
21
22#include "asterisk/_private.h"
23#include "asterisk/taskpool.h"
25#include "asterisk/astobj2.h"
27#include "asterisk/utils.h"
28#include "asterisk/time.h"
29#include "asterisk/sched.h"
30
31/*!
32 * \brief A taskpool taskprocessor
33 */
35 /*! The underlying taskprocessor */
37 /*! The last time a task was pushed to this taskprocessor */
38 struct timeval last_pushed;
39};
40
41/*!
42 * \brief A container of taskprocessors
43 */
45 /*! A vector of taskprocessors */
47 /*! The next taskprocessor to use for pushing */
48 unsigned int taskprocessor_num;
49};
50
52 struct taskpool_taskprocessor **taskprocessor, unsigned int *growth_threshold_reached);
53
54/*!
55 * \brief An opaque taskpool structure
56 *
57 * A taskpool is a collection of taskprocessors that
58 * execute tasks, each from their own queue. A selector
59 * determines which taskprocessor to queue to at push
60 * time.
61 */
63 /*! The static taskprocessors, those which will always exist */
65 /*! The dynamic taskprocessors, those which will be created as needed */
67 /*! True if the taskpool is in the process of shutting down */
69 /*! Taskpool-specific options */
71 /*! Dynamic pool shrinking scheduled item */
73 /*! The taskprocessor selector to use */
75 /*! The name of the taskpool */
76 char name[0];
77};
78
/*! \brief The threshold for a taskprocessor at which we consider the pool needing to grow (50% of high water threshold)
 *
 * \note The expansion is fully parenthesized so that the macro behaves as a
 * single value in any expression (e.g. `x / TASKPOOL_GROW_THRESHOLD` would
 * previously have parsed as `(x / (HWL * 5)) / 10`).
 */
#define TASKPOOL_GROW_THRESHOLD (((AST_TASKPROCESSOR_HIGH_WATER_LEVEL) * 5) / 10)
81
82/*! \brief Scheduler used for dynamic pool shrinking */
83static struct ast_sched_context *sched;
84
85/*! \brief Thread storage for the current taskpool */
86AST_THREADSTORAGE_RAW(current_taskpool_pool);
87
88/*!
89 * \internal
90 * \brief Get the current taskpool associated with this thread.
91 */
/* NOTE(review): the signature line was lost in extraction; per the
 * cross-reference table it is:
 *   static struct ast_taskpool *ast_taskpool_get_current(void)
 * Confirm against upstream taskpool.c. */
{
	/* Returns the pool stashed by taskpool_taskprocessor_start(), or NULL
	 * when this thread has no associated taskpool. */
	return ast_threadstorage_get_ptr(&current_taskpool_pool);
}
96
97/*!
98 * \internal
99 * \brief Shutdown task for taskpool taskprocessor
100 */
101 static int taskpool_taskprocessor_stop(void *data)
102 {
103 struct ast_taskpool *pool = ast_taskpool_get_current();
104
105 /* If a thread stop callback is set on the options, call it */
106 if (pool->options.thread_end) {
107 pool->options.thread_end();
108 }
109
110 ao2_cleanup(pool);
111
112 return 0;
113 }
114
115/*! \internal */
/* Destructor for a taskpool taskprocessor wrapper. */
static void taskpool_taskprocessor_dtor(void *obj)
{
	/* NOTE(review): original lines 118, 120 and 124 were lost in extraction.
	 * From the surviving comment they appear to have cast \a obj to the
	 * wrapper, pushed the stop task to the underlying taskprocessor, and
	 * unreferenced it - confirm against upstream taskpool.c. */

	/* We can't actually do anything if this fails, so just accept reality */
	}

}
126
127/*!
128 * \internal
129 * \brief Startup task for taskpool taskprocessor
130 */
131static int taskpool_taskprocessor_start(void *data)
132{
133 struct ast_taskpool *pool = data;
134
135 /* Set the pool on the thread for this taskprocessor, inheriting the
136 * reference passed to the task itself.
137 */
138 ast_threadstorage_set_ptr(&current_taskpool_pool, pool);
139
140 /* If a thread start callback is set on the options, call it */
141 if (pool->options.thread_start) {
142 pool->options.thread_start();
143 }
144
145 return 0;
146}
147
148/*!
149 * \internal
150 * \brief Allocate a taskpool specific taskprocessor
151 */
/* NOTE(review): the signature line was lost in extraction; per the
 * cross-reference table it is:
 *   static struct taskpool_taskprocessor *taskpool_taskprocessor_alloc(struct ast_taskpool *pool, char type)
 * Lines 154 (wrapper declaration/allocation), 160, 170, 176 and 181 are also
 * missing - confirm against upstream taskpool.c. */
{
	char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];

	/* We don't actually need locking for each pool taskprocessor, as the only thing
	 * mutable is the underlying taskprocessor which has its own internal locking.
	 */
	if (!taskprocessor) {
		return NULL;
	}

	/* Create name with seq number appended. */
	ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "taskpool/%c:%s", type, pool->name);

	/* Acquire (or create) the named taskprocessor backing this wrapper */
	taskprocessor->taskprocessor = ast_taskprocessor_get(tps_name, TPS_REF_DEFAULT);
	if (!taskprocessor->taskprocessor) {
		return NULL;
	}

	/* Treat creation time as the first "push" for idle accounting */
	taskprocessor->last_pushed = ast_tvnow();

	/* NOTE(review): failure path below - the call that failed (likely the
	 * push of the startup task, line 176) was lost in extraction. */
	ao2_ref(pool, -1);
	/* Prevent the taskprocessor from queueing the stop task by explicitly unreferencing and setting it to
	 * NULL here.
	 */
	taskprocessor->taskprocessor = NULL;
	return NULL;
	}

	return taskprocessor;
}
188
189/*!
190 * \internal
191 * \brief Initialize the taskpool taskprocessors structure
192 */
193static int taskpool_taskprocessors_init(struct taskpool_taskprocessors *taskprocessors, unsigned int size)
194{
195 if (AST_VECTOR_INIT(&taskprocessors->taskprocessors, size)) {
196 return -1;
197 }
198
199 return 0;
200}
201
202/*!
203 * \internal
204 * \brief Clean up the taskpool taskprocessors structure
205 */
/* NOTE(review): the signature line was lost in extraction; per the
 * cross-reference table it is:
 *   static void taskpool_taskprocessors_cleanup(struct taskpool_taskprocessors *taskprocessors)
 * Line 213 (presumably the per-element unreference/reset before the free)
 * is also missing - confirm against upstream taskpool.c. */
{
	/* Access/manipulation of taskprocessors is done with the lock held, and
	 * with a check of the shutdown flag done. This means that outside of holding
	 * the lock we can safely muck with it. Pushing to the taskprocessor is done
	 * outside of the lock, but with a reference to the taskprocessor held.
	 */
	AST_VECTOR_FREE(&taskprocessors->taskprocessors);
}
216
/*!
 * \internal
 * \brief Determine if a taskpool taskprocessor is idle
 *
 * True when more than \a timeout milliseconds have elapsed since the
 * taskprocessor last had a task pushed to it.
 *
 * \note \a tps is now parenthesized so any expression (e.g. a conditional)
 * can be passed safely as the argument.
 */
#define TASKPROCESSOR_IS_IDLE(tps, timeout) (ast_tvdiff_ms(ast_tvnow(), (tps)->last_pushed) > (timeout))
222
223/*! \internal
224 * \brief Taskpool dynamic pool shrink function
225 */
/* Scheduler callback that periodically removes idle dynamic taskprocessors.
 * Returns the next interval in ms, or 0 to stop rescheduling. */
static int taskpool_dynamic_pool_shrink(const void *data)
{
	struct ast_taskpool *pool = (struct ast_taskpool *)data;
	int num_removed;

	ao2_lock(pool);

	/* If the pool is shutting down, do nothing and don't reschedule */
	if (pool->shutting_down) {
		ao2_unlock(pool);
		/* Drop the scheduler's reference to the pool */
		ao2_ref(pool, -1);
		return 0;
	}

	/* Go through the dynamic taskprocessors and find any which have been idle long enough and remove them */
	/* NOTE(review): lines 241-242 (the vector removal that assigns
	 * num_removed, presumably keyed on TASKPROCESSOR_IS_IDLE) and 245-246
	 * (the taskprocessor_num fix-up) were lost in extraction - confirm
	 * against upstream taskpool.c. */
	if (num_removed) {
		/* If we've removed any taskprocessors the taskprocessor_num may no longer be valid, so update it */
		}
	}

	ao2_unlock(pool);

	/* It is possible for the pool to have been shut down between unlocking and returning, this is
	 * inherently a race condition we can't eliminate so we will catch it on the next iteration.
	 */
	return pool->options.idle_timeout * 1000;
}
257
258/*!
259 * \internal
260 * \brief Sequential taskprocessor selector
261 */
262 static void taskpool_sequential_selector(struct ast_taskpool *pool, struct taskpool_taskprocessors *taskprocessors,
263 struct taskpool_taskprocessor **taskprocessor, unsigned int *growth_threshold_reached)
264{
265 unsigned int taskprocessor_num = taskprocessors->taskprocessor_num;
266
267 if (!AST_VECTOR_SIZE(&taskprocessors->taskprocessors)) {
268 *growth_threshold_reached = 1;
269 return;
270 }
271
272 taskprocessors->taskprocessor_num++;
273 if (taskprocessors->taskprocessor_num == AST_VECTOR_SIZE(&taskprocessors->taskprocessors)) {
274 taskprocessors->taskprocessor_num = 0;
275 }
276
277 *taskprocessor = AST_VECTOR_GET(&taskprocessors->taskprocessors, taskprocessor_num);
278
279 /* Check to see if this has reached the growth threshold */
280 *growth_threshold_reached = (ast_taskprocessor_size((*taskprocessor)->taskprocessor) >= pool->options.growth_threshold) ? 1 : 0;
281}
282
283/*!
284 * \interal
285 * \brief Least full taskprocessor selector
286 */
static void taskpool_least_full_selector(struct ast_taskpool *pool, struct taskpool_taskprocessors *taskprocessors,
	struct taskpool_taskprocessor **taskprocessor, unsigned int *growth_threshold_reached)
{
	struct taskpool_taskprocessor *least_full = NULL;
	unsigned int i;

	/* An empty container cannot satisfy the push; request growth */
	if (!AST_VECTOR_SIZE(&taskprocessors->taskprocessors)) {
		*growth_threshold_reached = 1;
		return;
	}

	/* We assume that the growth threshold has not yet been reached, until proven otherwise */
	*growth_threshold_reached = 0;

	for (i = 0; i < AST_VECTOR_SIZE(&taskprocessors->taskprocessors); i++) {
		struct taskpool_taskprocessor *tp = AST_VECTOR_GET(&taskprocessors->taskprocessors, i);

		/* If this taskprocessor has no outstanding tasks, it is the best choice */
		/* NOTE(review): the condition line (305) was lost in extraction -
		 * presumably `if (!ast_taskprocessor_size(tp->taskprocessor)) {`;
		 * confirm against upstream taskpool.c. */
			*taskprocessor = tp;
			return;
		}

		/* If any of the taskprocessors have reached the growth threshold then we should grow the pool */
		/* NOTE(review): condition line (311) lost in extraction - presumably
		 * a size-vs-growth_threshold comparison; confirm upstream. */
			*growth_threshold_reached = 1;
		}

		/* The taskprocessor with the fewest tasks should be used */
		if (!least_full || ast_taskprocessor_size(tp->taskprocessor) < ast_taskprocessor_size(least_full->taskprocessor)) {
			least_full = tp;
		}
	}

	*taskprocessor = least_full;
}
323
/* NOTE(review): the first signature line (324) was lost in extraction; per
 * the cross-reference table this is:
 *   struct ast_taskpool *ast_taskpool_create(const char *name,
 * Several interior lines are also missing (marked below) - confirm all of
 * them against upstream taskpool.c. */
	const struct ast_taskpool_options *options)
{
	struct ast_taskpool *pool;

	/* Enforce versioning on the passed-in options */
	if (options->version != AST_TASKPOOL_OPTIONS_VERSION) {
		return NULL;
	}

	/* Allocate with room for the flexible-length name; no ao2 destructor,
	 * teardown is handled by ast_taskpool_shutdown() */
	pool = ao2_alloc(sizeof(*pool) + strlen(name) + 1, NULL);
	if (!pool) {
		return NULL;
	}

	strcpy(pool->name, name); /* Safe */
	memcpy(&pool->options, options, sizeof(pool->options));
	pool->shrink_sched_id = -1;

	/* Verify the passed-in options are valid, and adjust if needed */
	if (options->initial_size < options->minimum_size) {
		pool->options.initial_size = options->minimum_size;
		ast_log(LOG_WARNING, "Taskpool '%s' has an initial size of %d, which is less than the minimum size of %d. Adjusting to %d.\n",
			name, options->initial_size, options->minimum_size, options->minimum_size);
	}

	if (options->max_size && pool->options.initial_size > options->max_size) {
		pool->options.max_size = pool->options.initial_size;
		ast_log(LOG_WARNING, "Taskpool '%s' has a max size of %d, which is less than the initial size of %d. Adjusting to %d.\n",
			name, options->max_size, pool->options.initial_size, pool->options.initial_size);
	}

	if (!options->auto_increment) {
		/* Without auto increment the pool is fixed size, so all three size
		 * options are forced to agree */
		if (!pool->options.minimum_size) {
			pool->options.minimum_size = 1;
			ast_log(LOG_WARNING, "Taskpool '%s' has a minimum size of 0, which is not valid without auto increment. Adjusting to 1.\n", name);
		}
		if (!pool->options.max_size) {
			pool->options.max_size = pool->options.minimum_size;
			ast_log(LOG_WARNING, "Taskpool '%s' has a max size of 0, which is not valid without auto increment. Adjusting to %d.\n", name, pool->options.minimum_size);
		}
		if (pool->options.minimum_size != pool->options.max_size) {
			pool->options.minimum_size = pool->options.max_size;
			pool->options.initial_size = pool->options.max_size;
			ast_log(LOG_WARNING, "Taskpool '%s' has a minimum size of %d, while max size is %d. Adjusting all sizes to %d due to lack of auto increment.\n",
				name, options->minimum_size, pool->options.max_size, pool->options.max_size);
		}
	} else if (!options->growth_threshold) {
		/* NOTE(review): line 372 (the default growth_threshold assignment,
		 * presumably TASKPOOL_GROW_THRESHOLD) lost in extraction. */
	}

	/* NOTE(review): lines 375-378 and 382 (selector assignment chain for
	 * LEAST_FULL/DEFAULT) lost in extraction. */
	} else if (options->selector == AST_TASKPOOL_SELECTOR_SEQUENTIAL) {
	} else {
		ast_log(LOG_WARNING, "Taskpool '%s' has an invalid selector of %d. Adjusting to default selector.\n",
			name, options->selector);
	}

	/* NOTE(review): line 385 (likely taskpool_taskprocessors_init for the
	 * static container) lost in extraction. */
		ao2_ref(pool, -1);
		return NULL;
	}

	/* Create the static taskprocessors based on the passed-in options */
	for (int i = 0; i < pool->options.minimum_size; i++) {
		/* NOTE(review): lines 392-394 (taskprocessor declaration and
		 * taskpool_taskprocessor_alloc(pool, 's') call) lost in extraction. */

		if (!taskprocessor) {
			/* The reference to pool is passed to ast_taskpool_shutdown */
			return NULL;
		}

		/* NOTE(review): lines 401-404 (vector append + failure unref) lost. */
			/* The reference to pool is passed to ast_taskpool_shutdown */
			return NULL;
		}
	}

	/* NOTE(review): line 409 (taskpool_taskprocessors_init for the dynamic
	 * container) lost in extraction. */
		pool->options.initial_size - pool->options.minimum_size)) {
		return NULL;
	}

	/* Create the dynamic taskprocessor based on the passed-in options */
	for (int i = 0; i < (pool->options.initial_size - pool->options.minimum_size); i++) {
		/* NOTE(review): lines 417-419 (taskpool_taskprocessor_alloc(pool, 'd')) lost. */

		if (!taskprocessor) {
			/* The reference to pool is passed to ast_taskpool_shutdown */
			return NULL;
		}

		/* NOTE(review): lines 426-429 (vector append + failure unref) lost. */
			/* The reference to pool is passed to ast_taskpool_shutdown */
			return NULL;
		}
	}

	/* If idle timeout support is enabled kick off a scheduled task to shrink the dynamic pool periodically, we do
	 * this no matter if there are dynamic taskprocessor present to reduce the work needed within the push function
	 * and to reduce complexity.
	 */
	if (options->idle_timeout && options->auto_increment) {
		/* NOTE(review): line 439 (ast_sched_add of taskpool_dynamic_pool_shrink
		 * with a bumped pool reference) lost in extraction. */
		if (pool->shrink_sched_id < 0) {
			ao2_ref(pool, -1);
			/* The second reference to pool is passed to ast_taskpool_shutdown */
			return NULL;
		}
	}

	return pool;
}
450
/* NOTE(review): the signature line (451) was lost in extraction; per the
 * cross-reference table it is:
 *   size_t ast_taskpool_taskprocessors_count(struct ast_taskpool *pool)
 * Line 456 (the computation assigning count, presumably the sum of the
 * static and dynamic vector sizes) is also missing - as shown, count is
 * read uninitialized; confirm against upstream taskpool.c. */
{
	size_t count;

	ao2_lock(pool);
	ao2_unlock(pool);

	return count;
}
461
462#define TASKPOOL_QUEUE_SIZE_ADD(tps, size) (size += ast_taskprocessor_size(tps->taskprocessor))
463
/* NOTE(review): the signature line (464) was lost in extraction; per the
 * cross-reference table it is:
 *   long ast_taskpool_queue_size(struct ast_taskpool *pool)
 * Lines 469-470 (iteration over both vectors applying
 * TASKPOOL_QUEUE_SIZE_ADD) are also missing - confirm upstream. */
{
	long queue_size = 0;

	ao2_lock(pool);
	ao2_unlock(pool);

	return queue_size;
}
475
476/*! \internal
477 * \brief Taskpool dynamic pool grow function
478 */
/* NOTE(review): the signature line (479) was lost in extraction; per the
 * cross-reference table it is:
 *   static void taskpool_dynamic_pool_grow(struct ast_taskpool *pool, struct taskpool_taskprocessor **taskprocessor)
 * Called with the pool lock held from ast_taskpool_push(). */
{
	unsigned int num_to_add = pool->options.auto_increment;
	int i;

	if (!num_to_add) {
		return;
	}

	/* If a maximum size is enforced, then determine if we have to limit how many taskprocessors we add */
	if (pool->options.max_size) {
		/* NOTE(review): line 490 (the current_size declaration/computation)
		 * lost in extraction - confirm upstream. */

		if (current_size + num_to_add > pool->options.max_size) {
			num_to_add = pool->options.max_size - current_size;
		}
	}

	for (i = 0; i < num_to_add; i++) {
		struct taskpool_taskprocessor *new_taskprocessor;

		/* 'd' marks this as a dynamic taskprocessor in its name */
		new_taskprocessor = taskpool_taskprocessor_alloc(pool, 'd');
		if (!new_taskprocessor) {
			return;
		}

		if (AST_VECTOR_APPEND(&pool->dynamic_taskprocessors.taskprocessors, new_taskprocessor)) {
			ao2_ref(new_taskprocessor, -1);
			return;
		}

		if (i == 0) {
			/* On the first iteration we return the taskprocessor we just added */
			*taskprocessor = new_taskprocessor;
			/* We assume we will be going back to the first taskprocessor, since we are at the end of the vector */
			/* NOTE(review): line 514 (taskprocessor_num assignment) lost. */
		} else if (i == 1) {
			/* On the second iteration we update the next taskprocessor to use to be this one */
			/* NOTE(review): line 517 (taskprocessor_num assignment) lost. */
		}
	}
}
521
/* Push a task to the taskpool.
 * \retval 0 on success, -1 if the pool is shutting down, no taskprocessor
 * could be selected, or the underlying push fails. */
int ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *data)
{
	/* NOTE(review): line 524 (the `taskprocessor` declaration, presumably
	 * initialized to NULL) was lost in extraction - confirm upstream. */

	/* Select the taskprocessor in the pool to use for pushing this task */
	ao2_lock(pool);
	if (!pool->shutting_down) {
		unsigned int growth_threshold_reached = 0;

		/* A selector doesn't set taskprocessor to NULL, it will only change the value if a better
		 * taskprocessor is found. This means that even if the selector for a dynamic taskprocessor
		 * fails for some reason, it will still fall back to the initially found static one if
		 * it is present.
		 */
		pool->selector(pool, &pool->static_taskprocessors, &taskprocessor, &growth_threshold_reached);
		if (pool->options.auto_increment && growth_threshold_reached) {
			/* If we need to grow then try dynamic taskprocessors */
			pool->selector(pool, &pool->dynamic_taskprocessors, &taskprocessor, &growth_threshold_reached);
			if (growth_threshold_reached) {
				/* If we STILL need to grow then grow the dynamic taskprocessor pool if allowed */
				/* NOTE(review): line 542 (taskpool_dynamic_pool_grow call) lost. */
			}

			/* If a dynamic taskprocessor was used update its last push time */
			if (taskprocessor) {
				taskprocessor->last_pushed = ast_tvnow();
			}
		}
		/* NOTE(review): line 550 lost in extraction - per the comment in
		 * taskpool_taskprocessors_cleanup() this likely took a reference to
		 * the chosen taskprocessor before unlocking; confirm upstream. */
	}
	ao2_unlock(pool);

	if (!taskprocessor) {
		return -1;
	}

	if (ast_taskprocessor_push(taskprocessor->taskprocessor, task, data)) {
		return -1;
	}

	return 0;
}
564
565/*!
566 * \internal Structure used for synchronous task
567 */
572 int fail;
573 int (*task)(void *);
575};
576
577/*!
578 * \internal Initialization function for synchronous task
579 */
/* Initialize a stack-allocated synchronous-task tracker.
 * NOTE(review): lines 582-583 (presumably ast_mutex_init/ast_cond_init for
 * the lock and cond members) were lost in extraction - confirm upstream. */
static int taskpool_sync_task_init(struct taskpool_sync_task *sync_task, int (*task)(void *), void *data)
{
	sync_task->complete = 0;
	sync_task->fail = 0;
	sync_task->task = task;
	sync_task->task_data = data;
	return 0;
}
590
591/*!
592 * \internal Cleanup function for synchronous task
593 */
/* NOTE(review): the signature (line 594, taskpool_sync_task_cleanup per the
 * cross-reference table) and the body lines 596-597 (presumably
 * ast_mutex_destroy/ast_cond_destroy) were lost in extraction. */
{
}
599
600/*!
601 * \internal Function for executing a sychronous task
602 */
/* Task body that runs a caller-supplied task and signals the waiter. */
static int taskpool_sync_task(void *data)
{
	struct taskpool_sync_task *sync_task = data;
	int ret;

	sync_task->fail = sync_task->task(sync_task->task_data);

	/*
	 * Once we unlock sync_task->lock after signaling, we cannot access
	 * sync_task again. The thread waiting within ast_taskpool_push_wait()
	 * is free to continue and release its local variable (sync_task).
	 */
	/* NOTE(review): lines 615, 617 and 619 (ast_mutex_lock, ast_cond_signal
	 * and ast_mutex_unlock around the completion flag) were lost in
	 * extraction - confirm upstream. */
	sync_task->complete = 1;
	ret = sync_task->fail;
	return ret;
}
622
/* Push a task to the taskpool and block until it has completed.
 * NOTE(review): several lines lost in extraction (marked below) - confirm
 * all against upstream taskpool.c. */
int ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), void *data)
{
	/* NOTE(review): line 625 (local `struct taskpool_sync_task sync_task;`) lost. */

	/* If we are already executing within a taskpool taskprocessor then
	 * don't bother pushing a new task, just directly execute the task.
	 */
	/* NOTE(review): line 630 (the condition, presumably comparing
	 * ast_taskpool_get_current() against pool) lost. */
		return task(data);
	}

	/* NOTE(review): line 634 (taskpool_sync_task_init failure check) lost. */
		return -1;
	}

	/* NOTE(review): lines 638-639 (ast_taskpool_push of taskpool_sync_task
	 * with cleanup on failure) lost. */
		return -1;
	}

	/* Wait for the pushed task to flag completion */
	/* NOTE(review): lines 643, 647, 649 (lock/unlock/cleanup around the wait) lost. */
	while (!sync_task.complete) {
		ast_cond_wait(&sync_task.cond, &sync_task.lock);
	}

	return sync_task.fail;
}
652
/* NOTE(review): the signature line (653) was lost in extraction; per the
 * cross-reference table it is:
 *   void ast_taskpool_shutdown(struct ast_taskpool *pool)
 */
{
	if (!pool) {
		return;
	}

	/* Mark this pool as shutting down so nothing new is pushed */
	ao2_lock(pool);
	pool->shutting_down = 1;
	ao2_unlock(pool);

	/* Stop the shrink scheduled item if present */
	/* NOTE(review): line 665 (presumably AST_SCHED_DEL_UNREF on
	 * shrink_sched_id) lost in extraction. */

	/* Clean up all the taskprocessors */
	/* NOTE(review): lines 668-669 (taskpool_taskprocessors_cleanup on the
	 * static and dynamic containers) lost in extraction. */

	/* Drop the caller's reference */
	ao2_ref(pool, -1);
}
673
675 /*! Taskpool the serializer will use to process the jobs. */
677 /*! Which group will wait for this serializer to shutdown. */
679};
680
/* ao2 destructor for a serializer: releases its pool (and, per the
 * NULL-ing of shutdown_group below, presumably its shutdown group on the
 * missing line 687 - confirm upstream). */
static void serializer_dtor(void *obj)
{
	struct serializer *ser = obj;

	ao2_cleanup(ser->pool);
	ser->pool = NULL;
	ser->shutdown_group = NULL;
}
690
/* NOTE(review): the signature lines (691-692) were lost in extraction; per
 * the cross-reference table this is:
 *   static struct serializer *serializer_create(struct ast_taskpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
 * Line 704 (presumably stashing a bumped shutdown_group) is also missing. */
{
	struct serializer *ser;

	/* This object has a lock so it can be used to ensure exclusive access
	 * to the execution of tasks within the serializer.
	 */
	ser = ao2_alloc(sizeof(*ser), serializer_dtor);
	if (!ser) {
		return NULL;
	}
	/* Hold a reference to the pool for the serializer's lifetime */
	ser->pool = ao2_bump(pool);
	return ser;
}
707
708AST_THREADSTORAGE_RAW(current_taskpool_serializer);
709
/* Pool task that drains a serializer's queued tasks while holding the
 * serializer lock, requeueing itself if tasks remain afterwards. */
static int execute_tasks(void *data)
{
	struct ast_taskpool *pool = ast_taskpool_get_current();
	struct ast_taskprocessor *tps = data;
	/* NOTE(review): lines 714-715 (presumably obtaining the listener and
	 * `ser` from the taskprocessor) were lost in extraction. */
	size_t remaining, requeue = 0;

	/* In a normal scenario this lock will not be in contention with
	 * anything else. It is only if a synchronous task is pushed to
	 * the serializer that it may be blocked on the synchronous
	 * task thread. This is done to ensure that only one thread is executing
	 * tasks from the serializer at a given time, and not out of order
	 * either.
	 */
	ao2_lock(ser);

	ast_threadstorage_set_ptr(&current_taskpool_serializer, tps);
	/* Only drain the tasks present at entry; new arrivals are handled by requeue */
	for (remaining = ast_taskprocessor_size(tps); remaining > 0; remaining--) {
		requeue = ast_taskprocessor_execute(tps);
	}
	ast_threadstorage_set_ptr(&current_taskpool_serializer, NULL);

	ao2_unlock(ser);

	/* If there are remaining tasks we requeue, this way the serializer
	 * does not hold exclusivity of the taskpool taskprocessor
	 */
	if (requeue) {
		/* Ownership passes to the new task */
		/* NOTE(review): lines 740-741 (the ast_taskpool_push(pool, ...) call
		 * that uses `pool`, with failure handling) were lost in extraction. */
		}
	} else {
		/* NOTE(review): line 744 (presumably releasing the taskprocessor
		 * reference) lost in extraction. */
	}

	return 0;
}
749
/* NOTE(review): the signature line (750) was lost in extraction; per the
 * cross-reference table it is:
 *   static void serializer_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
 * Lines 753-754 (obtaining `ser` and a referenced `tps` from the listener)
 * and 757 (failure cleanup) are also missing - confirm upstream. */
{
	/* Only schedule a drain when the queue went from empty to non-empty */
	if (was_empty) {

		if (ast_taskpool_push(ser->pool, execute_tasks, tps)) {
		}
	}
}
761
/* NOTE(review): the signature line (762) was lost in extraction; per the
 * cross-reference table it is:
 *   static int serializer_start(struct ast_taskprocessor_listener *listener)
 */
{
	/* No-op */
	return 0;
}
767
/* NOTE(review): the signature line (768) was lost in extraction; per the
 * cross-reference table it is:
 *   static void serializer_shutdown(struct ast_taskprocessor_listener *listener)
 * Lines 770 (obtaining `ser` from the listener) and 773 (presumably
 * ast_serializer_shutdown_group_dec) are also missing - confirm upstream. */
{

	if (ser->shutdown_group) {
	}
	ao2_cleanup(ser);
}
777
780 .start = serializer_start,
781 .shutdown = serializer_shutdown,
782};
783
/* NOTE(review): the signature line (784) was lost in extraction; per the
 * cross-reference table it is:
 *   struct ast_taskprocessor *ast_taskpool_serializer_get_current(void)
 */
{
	/* Set/cleared by execute_tasks() and ast_taskpool_serializer_push_wait() */
	return ast_threadstorage_get_ptr(&current_taskpool_serializer);
}
788
/* NOTE(review): the signature lines (789-790) were lost in extraction; per
 * the cross-reference table this is:
 *   struct ast_taskprocessor *ast_taskpool_serializer_group(const char *name, struct ast_taskpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
 * Interior lines 793, 796, 801, 807 and 812 are also missing (marked below). */
{
	struct serializer *ser;
	/* NOTE(review): line 793 (the `listener` declaration) lost. */
	struct ast_taskprocessor *tps;

	/* NOTE(review): line 796 (serializer_create call) lost. */
	if (!ser) {
		return NULL;
	}

	/* NOTE(review): line 801 (ast_taskprocessor_listener_alloc call) lost. */
	if (!listener) {
		ao2_ref(ser, -1);
		return NULL;
	}

	/* NOTE(review): line 807 (taskprocessor creation from the listener) lost. */
	if (!tps) {
		/* ser ref transferred to listener but not cleaned without tps */
		ao2_ref(ser, -1);
	} else if (shutdown_group) {
		/* NOTE(review): line 812 (ast_serializer_shutdown_group_inc) lost. */
	}

	ao2_ref(listener, -1);
	return tps;
}
818
820{
822}
823
/*!
 * \internal
 * \brief A task that intentionally does nothing.
 *
 * Queued by ast_taskpool_serializer_push_wait() so the serializer never
 * drains to empty while tasks are being executed inline.
 */
static int taskpool_serializer_empty_task(void *data)
{
	(void) data; /* unused */
	return 0;
}
831
/* Push a task to a serializer and wait for it to complete, executing
 * serializer tasks inline when called from within a pool thread to avoid
 * deadlock. NOTE(review): many lines lost in extraction (marked below) -
 * confirm all against upstream taskpool.c. */
int ast_taskpool_serializer_push_wait(struct ast_taskprocessor *serializer, int (*task)(void *data), void *data)
{
	/* NOTE(review): lines 834-835, 837 (local sync_task/ser declarations) lost. */
	struct ast_taskprocessor *prior_serializer;

	/* If not in a taskpool taskprocessor we can just queue the task like normal and
	 * wait. */
	/* NOTE(review): lines 841-842 (the not-in-pool condition and sync task
	 * init failure check) lost. */
		return -1;
	}

	/* NOTE(review): lines 846-847 (push of taskpool_sync_task with cleanup
	 * on failure) lost. */
		return -1;
	}

	/* NOTE(review): lines 851, 855, 857 (lock/unlock/cleanup around this
	 * wait) lost. */
	while (!sync_task.complete) {
		ast_cond_wait(&sync_task.cond, &sync_task.lock);
	}

	return sync_task.fail;
	}

	/* It is possible that we are already executing within a serializer, so stash the existing
	 * away so we can restore it.
	 */
	prior_serializer = ast_taskpool_serializer_get_current();

	ao2_lock(ser);

	/* There are two cases where we can or have to directly execute this task:
	 * 1. There are no other tasks in the serializer
	 * 2. We are already in the serializer
	 * In the second case if we don't execute the task now, we will deadlock waiting
	 * on it as it will never occur.
	 */
	if (!ast_taskprocessor_size(serializer) || prior_serializer == serializer) {
		ast_threadstorage_set_ptr(&current_taskpool_serializer, serializer);
		sync_task.fail = task(data);
		ao2_unlock(ser);
		ast_threadstorage_set_ptr(&current_taskpool_serializer, prior_serializer);
		return sync_task.fail;
	}

	/* NOTE(review): line 882 (sync task init failure check) lost. */
		ao2_unlock(ser);
		return -1;
	}

	/* First we queue the serialized task */
	/* NOTE(review): lines 888-889 (the taskprocessor push with cleanup) lost. */
		ao2_unlock(ser);
		return -1;
	}

	/* Next we queue the empty task to ensure the serializer doesn't reach empty, this
	 * stops two tasks from being queued for the same serializer at the same time.
	 */
	/* NOTE(review): lines 897-898 (push of taskpool_serializer_empty_task
	 * with cleanup) lost. */
		ao2_unlock(ser);
		return -1;
	}

	/* Now we execute the tasks on the serializer until our sync task is complete */
	ast_threadstorage_set_ptr(&current_taskpool_serializer, serializer);
	while (!sync_task.complete) {
		/* The sync task is guaranteed to be executed, so doing a while loop on the complete
		 * flag is safe.
		 */
		/* NOTE(review): line 909 (ast_taskprocessor_execute call) lost. */
	}
	/* NOTE(review): line 911 (sync task cleanup) lost. */
	ao2_unlock(ser);

	ast_threadstorage_set_ptr(&current_taskpool_serializer, prior_serializer);

	return sync_task.fail;
}
918
919/*!
920 * \internal
921 * \brief Clean up resources on Asterisk shutdown
922 */
/* Registered via ast_register_cleanup(); releases module-level resources.
 * NOTE(review): line 926 (presumably ast_sched_context_destroy(sched)) was
 * lost in extraction - confirm upstream. */
static void taskpool_shutdown(void)
{
	if (sched) {
		sched = NULL;
	}
}
930
/* NOTE(review): the signature line (931) was lost in extraction; per the
 * cross-reference table it is:
 *   int ast_taskpool_init(void)
 * Lines 933 (sched = ast_sched_context_create()), 938 (the
 * ast_sched_start_thread failure check) and 942 (presumably
 * ast_register_cleanup(taskpool_shutdown)) are also missing. */
{
	if (!sched) {
		return -1;
	}

		return -1;
	}

	return 0;
}
Prototypes for public functions only of internal interest,.
static void * listener(void *unused)
Definition: asterisk.c:1530
Asterisk main include file. File version handling, generic pbx functions.
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
Definition: clicompat.c:19
#define ast_log
Definition: astobj2.c:42
@ AO2_ALLOC_OPT_LOCK_NOLOCK
Definition: astobj2.h:367
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
#define ao2_unlock(a)
Definition: astobj2.h:729
#define ao2_lock(a)
Definition: astobj2.h:717
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:404
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition: astobj2.h:480
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:409
static const char type[]
Definition: chan_ooh323.c:109
static const char name[]
Definition: format_mp3.c:68
#define LOG_WARNING
#define ast_cond_destroy(cond)
Definition: lock.h:209
#define ast_cond_wait(cond, mutex)
Definition: lock.h:212
#define ast_cond_init(cond, attr)
Definition: lock.h:208
#define ast_mutex_init(pmutex)
Definition: lock.h:193
#define ast_mutex_unlock(a)
Definition: lock.h:197
pthread_cond_t ast_cond_t
Definition: lock.h:185
#define ast_mutex_destroy(a)
Definition: lock.h:195
#define ast_mutex_lock(a)
Definition: lock.h:196
#define ast_cond_signal(cond)
Definition: lock.h:210
static struct ast_serializer_shutdown_group * shutdown_group
Shutdown group for options serializers.
static int sync_task(void *data)
Definition: res_pjsip.c:2117
#define NULL
Definition: resample.c:96
Scheduler Routines (derived from cheops)
#define AST_SCHED_DEL_UNREF(sched, id, refcall)
schedule task to get deleted and call unref function
Definition: sched.h:82
int ast_sched_add(struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data) attribute_warn_unused_result
Adds a scheduled event.
Definition: sched.c:567
void ast_sched_context_destroy(struct ast_sched_context *c)
destroys a schedule context
Definition: sched.c:271
int ast_sched_start_thread(struct ast_sched_context *con)
Start a thread for processing scheduler entries.
Definition: sched.c:197
struct ast_sched_context * ast_sched_context_create(void)
Create a scheduler context.
Definition: sched.c:238
void ast_serializer_shutdown_group_dec(struct ast_serializer_shutdown_group *shutdown_group)
Decrement the number of serializer members in the group.
void ast_serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group)
Increment the number of serializer members in the group.
Structure for mutex and tracking information.
Definition: lock.h:142
void(* thread_start)(void)
Function to call when a taskprocessor starts.
Definition: taskpool.h:131
int idle_timeout
Time limit in seconds for idle dynamic taskprocessors.
Definition: taskpool.h:81
int max_size
Maximum number of taskprocessors a pool may have.
Definition: taskpool.h:115
void(* thread_end)(void)
Function to call when a taskprocessor ends.
Definition: taskpool.h:138
int auto_increment
Number of taskprocessors to increment the pool by.
Definition: taskpool.h:85
int growth_threshold
The threshold for when to grow the pool.
Definition: taskpool.h:124
int minimum_size
Number of taskprocessors that will always exist.
Definition: taskpool.h:92
int initial_size
Number of taskprocessors the pool will start with.
Definition: taskpool.h:102
An opaque taskpool structure.
Definition: taskpool.c:62
taskpool_selector selector
Definition: taskpool.c:74
struct ast_taskpool_options options
Definition: taskpool.c:70
int shrink_sched_id
Definition: taskpool.c:72
struct taskpool_taskprocessors static_taskprocessors
Definition: taskpool.c:64
int shutting_down
Definition: taskpool.c:68
struct taskpool_taskprocessors dynamic_taskprocessors
Definition: taskpool.c:66
char name[0]
Definition: taskpool.c:76
void(* task_pushed)(struct ast_taskprocessor_listener *listener, int was_empty)
Indicates a task was pushed to the processor.
Definition: taskprocessor.h:99
A listener for taskprocessors.
struct ast_taskprocessor * tps
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
Definition: sched.c:76
struct ast_taskpool * pool
Definition: taskpool.c:676
struct ast_serializer_shutdown_group * shutdown_group
Definition: taskpool.c:678
ast_cond_t cond
Definition: taskpool.c:570
int(* task)(void *)
Definition: taskpool.c:573
ast_mutex_t lock
Definition: taskpool.c:569
A taskpool taskprocessor.
Definition: taskpool.c:34
struct ast_taskprocessor * taskprocessor
Definition: taskpool.c:36
struct timeval last_pushed
Definition: taskpool.c:38
A container of taskprocessors.
Definition: taskpool.c:44
struct taskpool_taskprocessors::@413 taskprocessors
unsigned int taskprocessor_num
Definition: taskpool.c:48
AST_THREADSTORAGE_RAW(current_taskpool_pool)
Thread storage for the current taskpool.
static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks
Definition: taskpool.c:778
static void taskpool_least_full_selector(struct ast_taskpool *pool, struct taskpool_taskprocessors *taskprocessors, struct taskpool_taskprocessor **taskprocessor, unsigned int *growth_threshold_reached)
Least full taskprocessor selector.
Definition: taskpool.c:287
static void taskpool_sync_task_cleanup(struct taskpool_sync_task *sync_task)
Definition: taskpool.c:594
static int taskpool_sync_task(void *data)
Definition: taskpool.c:603
static struct ast_sched_context * sched
Scheduler used for dynamic pool shrinking.
Definition: taskpool.c:83
static int taskpool_taskprocessor_start(void *data)
Definition: taskpool.c:131
static void serializer_dtor(void *obj)
Definition: taskpool.c:681
size_t ast_taskpool_taskprocessors_count(struct ast_taskpool *pool)
Get the current number of taskprocessors in the taskpool.
Definition: taskpool.c:451
void(* taskpool_selector)(struct ast_taskpool *pool, struct taskpool_taskprocessors *taskprocessors, struct taskpool_taskprocessor **taskprocessor, unsigned int *growth_threshold_reached)
Definition: taskpool.c:51
static void taskpool_shutdown(void)
Definition: taskpool.c:923
static void serializer_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
Definition: taskpool.c:750
#define TASKPOOL_QUEUE_SIZE_ADD(tps, size)
Definition: taskpool.c:462
struct ast_taskprocessor * ast_taskpool_serializer(const char *name, struct ast_taskpool *pool)
Serialized execution of tasks within a ast_taskpool.
Definition: taskpool.c:819
int ast_taskpool_push_wait(struct ast_taskpool *pool, int(*task)(void *data), void *data)
Push a task to the taskpool, and wait for completion.
Definition: taskpool.c:623
void ast_taskpool_shutdown(struct ast_taskpool *pool)
Shut down a taskpool and remove the underlying taskprocessors.
Definition: taskpool.c:653
static int taskpool_dynamic_pool_shrink(const void *data)
Definition: taskpool.c:226
static int taskpool_taskprocessors_init(struct taskpool_taskprocessors *taskprocessors, unsigned int size)
Definition: taskpool.c:193
int ast_taskpool_push(struct ast_taskpool *pool, int(*task)(void *data), void *data)
Push a task to the taskpool.
Definition: taskpool.c:522
static void taskpool_taskprocessors_cleanup(struct taskpool_taskprocessors *taskprocessors)
Definition: taskpool.c:206
static void taskpool_taskprocessor_dtor(void *obj)
Definition: taskpool.c:116
int ast_taskpool_init(void)
Definition: taskpool.c:931
static struct taskpool_taskprocessor * taskpool_taskprocessor_alloc(struct ast_taskpool *pool, char type)
Definition: taskpool.c:152
static void serializer_shutdown(struct ast_taskprocessor_listener *listener)
Definition: taskpool.c:768
static struct ast_taskpool * ast_taskpool_get_current(void)
Definition: taskpool.c:92
struct ast_taskprocessor * ast_taskpool_serializer_get_current(void)
Get the taskpool serializer currently associated with this thread.
Definition: taskpool.c:784
struct ast_taskprocessor * ast_taskpool_serializer_group(const char *name, struct ast_taskpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Serialized execution of tasks within a ast_taskpool.
Definition: taskpool.c:789
static struct serializer * serializer_create(struct ast_taskpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Definition: taskpool.c:691
static int taskpool_taskprocessor_stop(void *data)
Definition: taskpool.c:101
static int execute_tasks(void *data)
Definition: taskpool.c:710
static void taskpool_sequential_selector(struct ast_taskpool *pool, struct taskpool_taskprocessors *taskprocessors, struct taskpool_taskprocessor **taskprocessor, unsigned int *growth_threshold_reached)
Definition: taskpool.c:262
#define TASKPROCESSOR_IS_IDLE(tps, timeout)
Definition: taskpool.c:221
#define TASKPOOL_GROW_THRESHOLD
The threshold for a taskprocessor at which we consider the pool needing to grow (50% of high water th...
Definition: taskpool.c:80
static int taskpool_sync_task_init(struct taskpool_sync_task *sync_task, int(*task)(void *), void *data)
Definition: taskpool.c:580
static int serializer_start(struct ast_taskprocessor_listener *listener)
Definition: taskpool.c:762
int ast_taskpool_serializer_push_wait(struct ast_taskprocessor *serializer, int(*task)(void *data), void *data)
Push a task to a serializer, and wait for completion.
Definition: taskpool.c:832
static int taskpool_serializer_empty_task(void *data)
Definition: taskpool.c:827
struct ast_taskpool * ast_taskpool_create(const char *name, const struct ast_taskpool_options *options)
Create a new taskpool.
Definition: taskpool.c:324
static void taskpool_dynamic_pool_grow(struct ast_taskpool *pool, struct taskpool_taskprocessor **taskprocessor)
Definition: taskpool.c:479
long ast_taskpool_queue_size(struct ast_taskpool *pool)
Get the current number of queued tasks in the taskpool.
Definition: taskpool.c:464
#define AST_TASKPOOL_OPTIONS_VERSION
Definition: taskpool.h:69
@ AST_TASKPOOL_SELECTOR_SEQUENTIAL
Definition: taskpool.h:65
@ AST_TASKPOOL_SELECTOR_LEAST_FULL
Definition: taskpool.h:64
@ AST_TASKPOOL_SELECTOR_DEFAULT
Definition: taskpool.h:63
An API for managing task processing threads that can be shared across modules.
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
@ TPS_REF_DEFAULT
return a reference to a taskprocessor, create one if it does not exist
Definition: taskprocessor.h:76
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
struct ast_taskprocessor_listener * ast_taskprocessor_listener(struct ast_taskprocessor *tps)
Return the listener associated with the taskprocessor.
void * ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
Get the user data from the listener.
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
Pop a task off the taskprocessor and execute it.
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format,...)
Build a taskprocessor name with a sequence number on the end.
struct ast_taskprocessor * ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
Create a taskprocessor with a custom listener.
struct ast_taskprocessor * ast_taskprocessor_listener_get_tps(const struct ast_taskprocessor_listener *listener)
Get a reference to the listener's taskprocessor.
#define AST_TASKPROCESSOR_MAX_NAME
Suggested maximum taskprocessor name length (less null terminator).
Definition: taskprocessor.h:61
static struct test_options options
static int task(void *data)
Queued task for baseline test.
int ast_threadstorage_set_ptr(struct ast_threadstorage *ts, void *ptr)
Set a raw pointer from threadstorage.
void * ast_threadstorage_get_ptr(struct ast_threadstorage *ts)
Retrieve a raw pointer from threadstorage.
Time-related functions and macros.
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:159
Utility functions.
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:978
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:620
#define AST_VECTOR_REMOVE_ALL_CMP_UNORDERED(vec, value, cmp, cleanup)
Remove all elements from a vector that matches the given comparison.
Definition: vector.h:472
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition: vector.h:185
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:124
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:267
#define AST_VECTOR_CALLBACK_VOID(vec, callback,...)
Execute a callback on every element in a vector disregarding callback return.
Definition: vector.h:873
#define AST_VECTOR(name, type)
Define a vector structure.
Definition: vector.h:44
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:691