Asterisk - The Open Source Telephony Project GIT-master-20e40a9
Loading...
Searching...
No Matches
threadpool.c
Go to the documentation of this file.
1/*
2 * Asterisk -- An open source telephony toolkit.
3 *
4 * Copyright (C) 2012-2013, Digium, Inc.
5 *
6 * Mark Michelson <mmmichelson@digium.com>
7 *
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
13 *
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
17 */
18
19
20#include "asterisk.h"
21
22#include "asterisk/threadpool.h"
24#include "asterisk/astobj2.h"
25#include "asterisk/utils.h"
26
27/* Needs to stay prime if increased */
28#define THREAD_BUCKETS 89
29
30/*!
31 * \brief An opaque threadpool structure
32 *
33 * A threadpool is a collection of threads that execute
34 * tasks from a common queue.
35 */
37 /*! Threadpool listener */
39 /*!
40 * \brief The container of active threads.
41 * Active threads are those that are currently running tasks
42 */
44 /*!
45 * \brief The container of idle threads.
46 * Idle threads are those that are currently waiting to run tasks
47 */
49 /*!
50 * \brief The container of zombie threads.
51 * Zombie threads may be running tasks, but they are scheduled to die soon
52 */
54 /*!
55 * \brief The main taskprocessor
56 *
57 * Tasks that are queued in this taskprocessor are
58 * doled out to the worker threads. Worker threads that
59 * execute tasks from the threadpool are executing tasks
60 * in this taskprocessor.
61 *
62 * The threadpool itself is actually the private data for
63 * this taskprocessor's listener. This way, as taskprocessor
64 * changes occur, the threadpool can alert its listeners
65 * appropriately.
66 */
68 /*!
69 * \brief The control taskprocessor
70 *
71 * This is a standard taskprocessor that uses the default
72 * taskprocessor listener. In other words, all tasks queued to
73 * this taskprocessor have a single thread that executes the
74 * tasks.
75 *
76 * All tasks that modify the state of the threadpool and all tasks
77 * that call out to threadpool listeners are pushed to this
78 * taskprocessor.
79 *
80 * For instance, when the threadpool changes sizes, a task is put
81 * into this taskprocessor to do so. When it comes time to tell the
82 * threadpool listener that worker threads have changed state,
83 * the task is placed in this taskprocessor.
84 *
85 * This is done for three main reasons
86 * 1) It ensures that listeners are given an accurate portrayal
87 * of the threadpool's current state. In other words, when a listener
88 * gets told a count of active, idle and zombie threads, it does not
89 * need to worry that internal state of the threadpool might be different
90 * from what it has been told.
91 * 2) It minimizes the locking required in both the threadpool and in
92 * threadpool listener's callbacks.
93 * 3) It ensures that listener callbacks are called in the same order
94 * that the threadpool had its state change.
95 */
97 /*! True if the threadpool is in the process of shutting down */
99 /*! Threadpool-specific options */
101};
102
103/*!
104 * \brief listener for a threadpool
105 *
106 * The listener is notified of changes in a threadpool. It can
107 * react by doing things like increasing the number of threads
108 * in the pool
109 */
111 /*! Callbacks called by the threadpool */
113 /*! User data for the listener */
115};
116
117/*!
118 * \brief states for worker threads
119 */
121 /*! The worker is either active or idle */
123 /*!
124 * The worker has been asked to shut down but
125 * may still be in the process of executing tasks.
126 * This transition happens when the threadpool needs
127 * to shrink and needs to kill active threads in order
128 * to do so.
129 */
131 /*!
132 * The worker has been asked to shut down. Typically
133 * only idle threads go to this state directly, but
134 * active threads may go straight to this state when
135 * the threadpool is shut down.
136 */
138};
139
140/*!
141 * A thread that executes threadpool tasks
142 */
144 /*! A unique (within a run of Asterisk) ID for the thread. Used for hashing and searching */
145 int id;
146 /*! Condition used in conjunction with state changes */
148 /*! Lock used alongside the condition for state changes */
150 /*! The actual thread that is executing tasks */
151 pthread_t thread;
152 /*! A pointer to the threadpool. Needed to be able to execute tasks */
154 /*! The current state of the worker thread */
156 /*! A boolean used to determine if an idle thread should become active */
158 /*! Options for this threadpool */
160};
161
162/* Worker thread forward declarations. See definitions for documentation */
163static int worker_thread_hash(const void *obj, int flags);
164static int worker_thread_cmp(void *obj, void *arg, int flags);
165static void worker_thread_destroy(void *obj);
166static void worker_active(struct worker_thread *worker);
167static void *worker_start(void *arg);
169static int worker_thread_start(struct worker_thread *worker);
170static int worker_idle(struct worker_thread *worker);
171static int worker_set_state(struct worker_thread *worker, enum worker_state state);
172static void worker_shutdown(struct worker_thread *worker);
173
174/*!
175 * \brief Notify the threadpool listener that the state has changed.
176 *
177 * This notifies the threadpool listener via its state_changed callback.
178 * \param pool The threadpool whose state has changed
179 */
181{
182 int active_size = ao2_container_count(pool->active_threads);
183 int idle_size = ao2_container_count(pool->idle_threads);
184
186 pool->listener->callbacks->state_changed(pool, pool->listener, active_size, idle_size);
187 }
188}
189
190/*!
191 * \brief Struct used for queued operations involving worker state changes
192 */
194 /*! Threadpool that contains the worker whose state has changed */
196 /*! Worker whose state has changed */
198};
199
200/*!
201 * \brief Destructor for thread_worker_pair
202 */
204{
205 ao2_ref(pair->worker, -1);
206 ast_free(pair);
207}
208
209/*!
210 * \brief Allocate and initialize a thread_worker_pair
211 * \param pool Threadpool to assign to the thread_worker_pair
212 * \param worker Worker thread to assign to the thread_worker_pair
213 */
215 struct worker_thread *worker)
216{
217 struct thread_worker_pair *pair = ast_malloc(sizeof(*pair));
218 if (!pair) {
219 return NULL;
220 }
221 pair->pool = pool;
222 ao2_ref(worker, +1);
223 pair->worker = worker;
224
225 return pair;
226}
227
228/*!
229 * \brief Move a worker thread from the active container to the idle container.
230 *
231 * This function is called from the threadpool's control taskprocessor thread.
232 * \param data A thread_worker_pair containing the threadpool and the worker to move.
233 * \return 0
234 */
235static int queued_active_thread_idle(void *data)
236{
237 struct thread_worker_pair *pair = data;
238
239 ao2_link(pair->pool->idle_threads, pair->worker);
240 ao2_unlink(pair->pool->active_threads, pair->worker);
241
243
245 return 0;
246}
247
248/*!
249 * \brief Queue a task to move a thread from the active list to the idle list
250 *
251 * This is called by a worker thread when it runs out of tasks to perform and
252 * goes idle.
253 * \param pool The threadpool to which the worker belongs
254 * \param worker The worker thread that has gone idle
255 */
257 struct worker_thread *worker)
258{
259 struct thread_worker_pair *pair;
261
262 if (pool->shutting_down) {
263 return;
264 }
265
267 if (!pair) {
268 return;
269 }
270
273 }
274}
275
276/*!
277 * \brief Kill a zombie thread
278 *
279 * This runs from the threadpool's control taskprocessor thread.
280 *
281 * \param data A thread_worker_pair containing the threadpool and the zombie thread
282 * \return 0
283 */
284static int queued_zombie_thread_dead(void *data)
285{
286 struct thread_worker_pair *pair = data;
287
288 ao2_unlink(pair->pool->zombie_threads, pair->worker);
290
292 return 0;
293}
294
295/*!
296 * \brief Queue a task to kill a zombie thread
297 *
298 * This is called by a worker thread when it acknowledges that it is time for
299 * it to die.
300 */
302 struct worker_thread *worker)
303{
304 struct thread_worker_pair *pair;
306
307 if (pool->shutting_down) {
308 return;
309 }
310
312 if (!pair) {
313 return;
314 }
315
318 }
319}
320
321static int queued_idle_thread_dead(void *data)
322{
323 struct thread_worker_pair *pair = data;
324
325 ao2_unlink(pair->pool->idle_threads, pair->worker);
327
329 return 0;
330}
331
333 struct worker_thread *worker)
334{
335 struct thread_worker_pair *pair;
337
338 if (pool->shutting_down) {
339 return;
340 }
341
343 if (!pair) {
344 return;
345 }
346
349 }
350}
351
352/*!
353 * \brief Execute a task in the threadpool
354 *
355 * This is the function that worker threads call in order to execute tasks
356 * in the threadpool
357 *
358 * \param pool The pool to which the tasks belong.
359 * \retval 0 Either the pool has been shut down or there are no tasks.
360 * \retval 1 There are still tasks remaining in the pool.
361 */
363{
364 ao2_lock(pool);
365 if (!pool->shutting_down) {
368 }
370 return 0;
371}
372
373/*!
374 * \brief Destroy a threadpool's components.
375 *
376 * This is the destructor called automatically when the threadpool's
377 * reference count reaches zero. This is not to be confused with
378 * threadpool_destroy.
379 *
380 * By the time this actually gets called, most of the cleanup has already
381 * been done in the pool. The only thing left to do is to release the
382 * final reference to the threadpool listener.
383 *
384 * \param obj The pool to destroy
385 */
386static void threadpool_destructor(void *obj)
387{
388 struct ast_threadpool *pool = obj;
389 ao2_cleanup(pool->listener);
390}
391
392/*!
393 * \brief Allocate a threadpool
394 *
395 * This is implemented as a taskprocessor listener's alloc callback. This
396 * is because the threadpool exists as the private data on a taskprocessor
397 * listener.
398 *
399 * \param name The name of the threadpool.
400 * \param options The options the threadpool uses.
401 * \retval NULL Could not initialize threadpool properly
402 * \retval non-NULL The newly-allocated threadpool
403 */
404static struct ast_threadpool *threadpool_alloc(const char *name, const struct ast_threadpool_options *options)
405{
406 RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
407 struct ast_str *control_tps_name;
408
409 pool = ao2_alloc(sizeof(*pool), threadpool_destructor);
410 control_tps_name = ast_str_create(64);
411 if (!pool || !control_tps_name) {
412 ast_free(control_tps_name);
413 return NULL;
414 }
415
416 ast_str_set(&control_tps_name, 0, "%s/pool-control", name);
417
418 pool->control_tps = ast_taskprocessor_get(ast_str_buffer(control_tps_name), TPS_REF_DEFAULT);
419 ast_free(control_tps_name);
420 if (!pool->control_tps) {
421 return NULL;
422 }
423 pool->active_threads = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
425 if (!pool->active_threads) {
426 return NULL;
427 }
430 if (!pool->idle_threads) {
431 return NULL;
432 }
433 pool->zombie_threads = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
435 if (!pool->zombie_threads) {
436 return NULL;
437 }
438 pool->options = *options;
439
440 ao2_ref(pool, +1);
441 return pool;
442}
443
445{
446 return 0;
447}
448
449/*!
450 * \brief helper used for queued task when tasks are pushed
451 */
453 /*! Pool into which a task was pushed */
455 /*! Indicator of whether the pool had no tasks prior to the new task being added */
457};
458
459/*!
460 * \brief Allocate and initialize a task_pushed_data
461 * \param pool The threadpool to set in the task_pushed_data
462 * \param was_empty The was_empty value to set in the task_pushed_data
463 * \retval NULL Unable to allocate task_pushed_data
464 * \retval non-NULL The newly-allocated task_pushed_data
465 */
467 int was_empty)
468{
469 struct task_pushed_data *tpd = ast_malloc(sizeof(*tpd));
470
471 if (!tpd) {
472 return NULL;
473 }
474 tpd->pool = pool;
475 tpd->was_empty = was_empty;
476 return tpd;
477}
478
479/*!
480 * \brief Activate idle threads
481 *
482 * This function always returns CMP_MATCH because all workers that this
483 * function acts on need to be seen as matches so they are unlinked from the
484 * list of idle threads.
485 *
486 * Called as an ao2_callback in the threadpool's control taskprocessor thread.
487 * \param obj The worker to activate
488 * \param arg The pool where the worker belongs
489 * \param flags
490 * \retval CMP_MATCH
491 */
492static int activate_thread(void *obj, void *arg, int flags)
493{
494 struct worker_thread *worker = obj;
495 struct ast_threadpool *pool = arg;
496
497 if (!ao2_link(pool->active_threads, worker)) {
498 /* If we can't link the idle thread into the active container, then
499 * we'll just leave the thread idle and not wake it up.
500 */
501 ast_log(LOG_WARNING, "Failed to activate thread %d. Remaining idle\n",
502 worker->id);
503 return 0;
504 }
505
506 if (worker_set_state(worker, ALIVE)) {
507 ast_debug(1, "Failed to activate thread %d. It is dead\n",
508 worker->id);
509 /* The worker thread will no longer exist in the active threads or
510 * idle threads container after this.
511 */
512 ao2_unlink(pool->active_threads, worker);
513 }
514
515 return CMP_MATCH;
516}
517
518/*!
519 * \brief Add threads to the threadpool
520 *
521 * This function is called from the threadpool's control taskprocessor thread.
522 * \param pool The pool that is expanding
523 * \param delta The number of threads to add to the pool
524 */
525static void grow(struct ast_threadpool *pool, int delta)
526{
527 int i;
528
529 int current_size = ao2_container_count(pool->active_threads) +
531
532 if (pool->options.max_size && current_size + delta > pool->options.max_size) {
533 delta = pool->options.max_size - current_size;
534 }
535
536 ast_debug(3, "Increasing threadpool %s's size by %d\n",
537 ast_taskprocessor_name(pool->tps), delta);
538
539 for (i = 0; i < delta; ++i) {
540 struct worker_thread *worker = worker_thread_alloc(pool);
541 if (!worker) {
542 return;
543 }
544 if (ao2_link(pool->idle_threads, worker)) {
545 if (worker_thread_start(worker)) {
546 ast_log(LOG_ERROR, "Unable to start worker thread %d. Destroying.\n", worker->id);
548 }
549 } else {
550 ast_log(LOG_WARNING, "Failed to activate worker thread %d. Destroying.\n", worker->id);
551 }
552 ao2_ref(worker, -1);
553 }
554}
555
556/*!
557 * \brief Queued task called when tasks are pushed into the threadpool
558 *
559 * This function first calls into the threadpool's listener to let it know
560 * that a task has been pushed. It then wakes up all idle threads and moves
561 * them into the active thread container.
562 * \param data A task_pushed_data
563 * \return 0
564 */
565static int queued_task_pushed(void *data)
566{
567 struct task_pushed_data *tpd = data;
568 struct ast_threadpool *pool = tpd->pool;
569 int was_empty = tpd->was_empty;
570 unsigned int existing_active;
571
572 ast_free(tpd);
573
574 if (pool->listener && pool->listener->callbacks->task_pushed) {
575 pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
576 }
577
578 existing_active = ao2_container_count(pool->active_threads);
579
580 /* The first pass transitions any existing idle threads to be active, and
581 * will also remove any worker threads that have recently entered the dead
582 * state.
583 */
585 activate_thread, pool);
586
587 /* If no idle threads could be transitioned to active grow the pool as permitted. */
588 if (ao2_container_count(pool->active_threads) == existing_active) {
589 if (!pool->options.auto_increment) {
590 return 0;
591 }
592 grow(pool, pool->options.auto_increment);
593 /* An optional second pass transitions any newly added threads. */
595 activate_thread, pool);
596 }
597
599 return 0;
600}
601
602/*!
603 * \brief Taskprocessor listener callback called when a task is added
604 *
605 * The threadpool uses this opportunity to queue a task on its control taskprocessor
606 * in order to activate idle threads and notify the threadpool listener that the
607 * task has been pushed.
608 * \param listener The taskprocessor listener. The threadpool is the listener's private data
609 * \param was_empty True if the taskprocessor was empty prior to the task being pushed
610 */
612{
614 struct task_pushed_data *tpd;
616
617 if (pool->shutting_down) {
618 return;
619 }
620
622 if (!tpd) {
623 return;
624 }
625
627 ast_free(tpd);
628 }
629}
630
631/*!
632 * \brief Queued task that handles the case where the threadpool's taskprocessor is emptied
633 *
634 * This simply lets the threadpool's listener know that the threadpool is devoid of tasks
635 * \param data The pool that has become empty
636 * \return 0
637 */
638static int queued_emptied(void *data)
639{
640 struct ast_threadpool *pool = data;
641
642 /* We already checked for existence of this callback when this was queued */
643 pool->listener->callbacks->emptied(pool, pool->listener);
644 return 0;
645}
646
647/*!
648 * \brief Taskprocessor listener emptied callback
649 *
650 * The threadpool queues a task to let the threadpool listener know that
651 * the threadpool no longer contains any tasks.
652 * \param listener The taskprocessor listener. The threadpool is the listener's private data.
653 */
655{
657 SCOPED_AO2LOCK(lock, pool);
658
659 if (pool->shutting_down) {
660 return;
661 }
662
663 if (pool->listener && pool->listener->callbacks->emptied) {
665 /* Nothing to do here but we need the check to keep the compiler happy. */
666 }
667 }
668}
669
670/*!
671 * \brief Taskprocessor listener shutdown callback
672 *
673 * The threadpool will shut down and destroy all of its worker threads when
674 * this is called back. By the time this gets called, the threadpool's
675 * control taskprocessor has already been destroyed. Therefore there is no risk
676 * in outright destroying the worker threads here.
677 * \param listener The taskprocessor listener. The threadpool is the listener's private data.
678 */
691
692/*!
693 * \brief Table of taskprocessor listener callbacks for threadpool's main taskprocessor
694 */
701
702/*!
703 * \brief ao2 callback to kill a set number of threads.
704 *
705 * Threads will be unlinked from the container as long as the
706 * counter has not reached zero. The counter is decremented with
707 * each thread that is removed.
708 * \param obj The worker thread up for possible destruction
709 * \param arg The counter
710 * \param flags Unused
711 * \retval CMP_MATCH The counter has not reached zero, so this thread should be removed.
712 * \retval CMP_STOP The counter has reached zero so no more threads should be removed.
713 */
714static int kill_threads(void *obj, void *arg, int flags)
715{
716 int *num_to_kill = arg;
717
718 if (*num_to_kill > 0) {
719 --(*num_to_kill);
720 return CMP_MATCH;
721 } else {
722 return CMP_STOP;
723 }
724}
725
726/*!
727 * \brief ao2 callback to zombify a set number of threads.
728 *
729 * Threads will be zombified as long as the counter has not reached
730 * zero. The counter is decremented with each thread that is zombified.
731 *
732 * Zombifying a thread involves removing it from its current container,
733 * adding it to the zombie container, and changing the state of the
734 * worker to a zombie
735 *
736 * This callback is called from the threadpool control taskprocessor thread.
737 *
738 * \param obj The worker thread that may be zombified
739 * \param arg The pool to which the worker belongs
740 * \param data The counter
741 * \param flags Unused
742 * \retval CMP_MATCH The zombified thread should be removed from its current container
743 * \retval CMP_STOP Stop attempting to zombify threads
744 */
745static int zombify_threads(void *obj, void *arg, void *data, int flags)
746{
747 struct worker_thread *worker = obj;
748 struct ast_threadpool *pool = arg;
749 int *num_to_zombify = data;
750
751 if ((*num_to_zombify)-- > 0) {
752 if (!ao2_link(pool->zombie_threads, worker)) {
753 ast_log(LOG_WARNING, "Failed to zombify active thread %d. Thread will remain active\n", worker->id);
754 return 0;
755 }
756 worker_set_state(worker, ZOMBIE);
757 return CMP_MATCH;
758 } else {
759 return CMP_STOP;
760 }
761}
762
763/*!
764 * \brief Remove threads from the threadpool
765 *
766 * The preference is to kill idle threads. However, if there are
767 * more threads to remove than there are idle threads, then active
768 * threads will be zombified instead.
769 *
770 * This function is called from the threadpool control taskprocessor thread.
771 *
772 * \param pool The threadpool to remove threads from
773 * \param delta The number of threads to remove
774 */
775static void shrink(struct ast_threadpool *pool, int delta)
776{
777 /*
778 * Preference is to kill idle threads, but
779 * we'll move on to deactivating active threads
780 * if we have to
781 */
783 int idle_threads_to_kill = MIN(delta, idle_threads);
784 int active_threads_to_zombify = delta - idle_threads_to_kill;
785
786 ast_debug(3, "Destroying %d idle threads in threadpool %s\n", idle_threads_to_kill,
788
790 kill_threads, &idle_threads_to_kill);
791
792 ast_debug(3, "Destroying %d active threads in threadpool %s\n", active_threads_to_zombify,
794
796 zombify_threads, pool, &active_threads_to_zombify);
797}
798
799/*!
800 * \brief Helper struct used for queued operations that change the size of the threadpool
801 */
803 /*! The pool whose size is to change */
805 /*! The requested new size of the pool */
806 unsigned int size;
807};
808
809/*!
810 * \brief Allocate and initialize a set_size_data
811 * \param pool The pool for the set_size_data
812 * \param size The size to store in the set_size_data
813 */
815 unsigned int size)
816{
817 struct set_size_data *ssd = ast_malloc(sizeof(*ssd));
818 if (!ssd) {
819 return NULL;
820 }
821
822 ssd->pool = pool;
823 ssd->size = size;
824 return ssd;
825}
826
827/*!
828 * \brief Change the size of the threadpool
829 *
830 * This can either result in shrinking or growing the threadpool depending
831 * on the new desired size and the current size.
832 *
833 * This function is run from the threadpool control taskprocessor thread
834 *
835 * \param data A set_size_data used for determining how to act
836 * \return 0
837 */
838static int queued_set_size(void *data)
839{
840 struct set_size_data *ssd = data;
841 struct ast_threadpool *pool = ssd->pool;
842 unsigned int num_threads = ssd->size;
843
844 /* We don't count zombie threads as being "live" when potentially resizing */
845 unsigned int current_size = ao2_container_count(pool->active_threads) +
847
848 ast_free(ssd);
849
850 if (current_size == num_threads) {
851 ast_debug(3, "Not changing threadpool size since new size %u is the same as current %u\n",
852 num_threads, current_size);
853 return 0;
854 }
855
856 if (current_size < num_threads) {
858 activate_thread, pool);
859
860 /* As the above may have altered the number of current threads update it */
861 current_size = ao2_container_count(pool->active_threads) +
863 grow(pool, num_threads - current_size);
865 activate_thread, pool);
866 } else {
867 shrink(pool, current_size - num_threads);
868 }
869
871 return 0;
872}
873
874void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
875{
876 struct set_size_data *ssd;
878
879 if (pool->shutting_down) {
880 return;
881 }
882
884 if (!ssd) {
885 return;
886 }
887
889 ast_free(ssd);
890 }
891}
892
895{
897 if (!listener) {
898 return NULL;
899 }
900 listener->callbacks = callbacks;
901 listener->user_data = user_data;
902 return listener;
903}
904
906{
907 return listener->user_data;
908}
909
914
917 const struct ast_threadpool_options *options)
918{
919 struct ast_taskprocessor *tps;
920 RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, NULL, ao2_cleanup);
921 RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
922 char *fullname;
923
925 if (!pool) {
926 return NULL;
927 }
928
930 if (!tps_listener) {
931 return NULL;
932 }
933
934 if (options->version != AST_THREADPOOL_OPTIONS_VERSION) {
935 ast_log(LOG_WARNING, "Incompatible version of threadpool options in use.\n");
936 return NULL;
937 }
938
939 fullname = ast_alloca(strlen(name) + strlen("/pool") + 1);
940 sprintf(fullname, "%s/pool", name); /* Safe */
941 tps = ast_taskprocessor_create_with_listener(fullname, tps_listener);
942 if (!tps) {
943 return NULL;
944 }
945
946 pool->tps = tps;
947 if (listener) {
948 ao2_ref(listener, +1);
949 pool->listener = listener;
950 }
951 ast_threadpool_set_size(pool, pool->options.initial_size);
952 ao2_ref(pool, +1);
953 return pool;
954}
955
/*
 * Remove any macro named ast_threadpool_push (presumably provided by
 * asterisk/threadpool.h -- TODO confirm) so that a real function with that
 * name can be declared and defined in this file.
 *
 * ast_threadpool_push_internal forwards to __ast_threadpool_push, passing
 * the call site's file, line and function name along with the task.
 */
956#undef ast_threadpool_push
957#define ast_threadpool_push_internal(pool, task, data) \
958 __ast_threadpool_push(pool, task, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
959int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data);
960
961int __ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data,
962 const char *file, int line, const char *function)
963{
964 SCOPED_AO2LOCK(lock, pool);
965 if (!pool->shutting_down) {
966 return __ast_taskprocessor_push(pool->tps, task, data, file, line, function);
967 }
968 return -1;
969}
970
971/* ABI compatibility: Provide actual function symbol for external modules */
972int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
973{
974 return __ast_threadpool_push(pool, task, data, NULL, 0, NULL);
975}
976
978{
979 if (!pool) {
980 return;
981 }
982 /* Shut down the taskprocessors and everything else just
983 * takes care of itself via the taskprocessor callbacks
984 */
985 ao2_lock(pool);
986 pool->shutting_down = 1;
987 ao2_unlock(pool);
990}
991
992/*!
993 * A monotonically increasing integer used for worker
994 * thread identification.
995 */
997
998static int worker_thread_hash(const void *obj, int flags)
999{
1000 const struct worker_thread *worker = obj;
1001
1002 return worker->id;
1003}
1004
1005static int worker_thread_cmp(void *obj, void *arg, int flags)
1006{
1007 struct worker_thread *worker1 = obj;
1008 struct worker_thread *worker2 = arg;
1009
1010 return worker1->id == worker2->id ? CMP_MATCH : 0;
1011}
1012
1013/*!
1014 * \brief shut a worker thread down
1015 *
1016 * Set the worker dead and then wait for its thread
1017 * to finish executing.
1018 *
1019 * \param worker The worker thread to shut down
1020 */
1021static void worker_shutdown(struct worker_thread *worker)
1022{
1023 worker_set_state(worker, DEAD);
1024 if (worker->thread != AST_PTHREADT_NULL) {
1025 pthread_join(worker->thread, NULL);
1026 worker->thread = AST_PTHREADT_NULL;
1027 }
1028}
1029
1030/*!
1031 * \brief Worker thread destructor
1032 *
1033 * Called automatically when refcount reaches 0. Shuts
1034 * down the worker thread and destroys its component
1035 * parts
1036 */
1037static void worker_thread_destroy(void *obj)
1038{
1039 struct worker_thread *worker = obj;
1040 ast_debug(3, "Destroying worker thread %d\n", worker->id);
1041 worker_shutdown(worker);
1042 ast_mutex_destroy(&worker->lock);
1043 ast_cond_destroy(&worker->cond);
1044}
1045
1046/*!
1047 * \brief start point for worker threads
1048 *
1049 * Worker threads start in the active state but may
1050 * immediately go idle if there is no work to be
1051 * done
1052 *
1053 * \param arg The worker thread
1054 */
1055static void *worker_start(void *arg)
1056{
1057 struct worker_thread *worker = arg;
1058 enum worker_state saved_state;
1059
1060 if (worker->options.thread_start) {
1061 worker->options.thread_start();
1062 }
1063
1064 ast_mutex_lock(&worker->lock);
1065 while (worker_idle(worker)) {
1066 ast_mutex_unlock(&worker->lock);
1067 worker_active(worker);
1068 ast_mutex_lock(&worker->lock);
1069 if (worker->state != ALIVE) {
1070 break;
1071 }
1072 threadpool_active_thread_idle(worker->pool, worker);
1073 }
1074 saved_state = worker->state;
1075 ast_mutex_unlock(&worker->lock);
1076
1077 /* Reaching this portion means the thread is
1078 * on death's door. It may have been killed while
1079 * it was idle, in which case it can just die
1080 * peacefully. If it's a zombie, though, then
1081 * it needs to let the pool know so
1082 * that the thread can be removed from the
1083 * list of zombie threads.
1084 */
1085 if (saved_state == ZOMBIE) {
1086 threadpool_zombie_thread_dead(worker->pool, worker);
1087 }
1088
1089 if (worker->options.thread_end) {
1090 worker->options.thread_end();
1091 }
1092 return NULL;
1093}
1094
1095/*!
1096 * \brief Allocate and initialize a new worker thread
1097 *
1098 * This will create, initialize, and start the thread.
1099 *
1100 * \param pool The threadpool to which the worker will be added
1101 * \retval NULL Failed to allocate or start the worker thread
1102 * \retval non-NULL The newly-created worker thread
1103 */
1105{
1106 struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy);
1107 if (!worker) {
1108 return NULL;
1109 }
1111 ast_mutex_init(&worker->lock);
1112 ast_cond_init(&worker->cond, NULL);
1113 worker->pool = pool;
1114 worker->thread = AST_PTHREADT_NULL;
1115 worker->state = ALIVE;
1116 worker->options = pool->options;
1117 return worker;
1118}
1119
1120static int worker_thread_start(struct worker_thread *worker)
1121{
1122 return ast_pthread_create(&worker->thread, NULL, worker_start, worker);
1123}
1124
1125/*!
1126 * \brief Active loop for worker threads
1127 *
1128 * The worker will stay in this loop for its lifetime,
1129 * executing tasks as they become available. If there
1130 * are no tasks currently available, then the thread
1131 * will go idle.
1132 *
1133 * \param worker The worker thread executing tasks.
1134 */
1135static void worker_active(struct worker_thread *worker)
1136{
1137 int alive;
1138
1139 /* The following is equivalent to
1140 *
1141 * while (threadpool_execute(worker->pool));
1142 *
1143 * However, reviewers have suggested in the past
1144 * doing that can cause optimizers to (wrongly)
1145 * optimize the code away.
1146 */
1147 do {
1148 alive = threadpool_execute(worker->pool);
1149 } while (alive);
1150}
1151
1152/*!
1153 * \brief Idle function for worker threads
1154 *
1155 * The worker waits here until it gets told by the threadpool
1156 * to wake up.
1157 *
1158 * worker is locked before entering this function.
1159 *
1160 * \param worker The idle worker
1161 * \retval 0 The thread is being woken up so that it can conclude.
1162 * \retval non-zero The thread is being woken up to do more work.
1163 */
1164static int worker_idle(struct worker_thread *worker)
1165{
1166 struct timeval start = ast_tvnow();
1167 struct timespec end = {
1168 .tv_sec = start.tv_sec + worker->options.idle_timeout,
1169 .tv_nsec = start.tv_usec * 1000,
1170 };
1171 while (!worker->wake_up) {
1172 if (worker->options.idle_timeout <= 0) {
1173 ast_cond_wait(&worker->cond, &worker->lock);
1174 } else if (ast_cond_timedwait(&worker->cond, &worker->lock, &end) == ETIMEDOUT) {
1175 break;
1176 }
1177 }
1178
1179 if (!worker->wake_up) {
1180 ast_debug(1, "Worker thread idle timeout reached. Dying.\n");
1181 threadpool_idle_thread_dead(worker->pool, worker);
1182 worker->state = DEAD;
1183 }
1184 worker->wake_up = 0;
1185 return worker->state == ALIVE;
1186}
1187
1188/*!
1189 * \brief Change a worker's state
1190 *
1191 * The threadpool calls into this function in order to let a worker know
1192 * how it should proceed.
1193 *
1194 * \retval -1 failure (state transition not permitted)
1195 * \retval 0 success
1196 */
1197static int worker_set_state(struct worker_thread *worker, enum worker_state state)
1198{
1199 SCOPED_MUTEX(lock, &worker->lock);
1200
1201 switch (state) {
1202 case ALIVE:
1203 /* This can occur due to a race condition between being told to go active
1204 * and an idle timeout happening.
1205 */
1206 if (worker->state == DEAD) {
1207 return -1;
1208 }
1209 ast_assert(worker->state != ZOMBIE);
1210 break;
1211 case DEAD:
1212 break;
1213 case ZOMBIE:
1214 ast_assert(worker->state != DEAD);
1215 break;
1216 }
1217
1218 worker->state = state;
1219 worker->wake_up = 1;
1220 ast_cond_signal(&worker->cond);
1221
1222 return 0;
1223}
1224
1225struct serializer {
1226 /*! Threadpool the serializer will use to process the jobs. */
1228 /*! Which group will wait for this serializer to shutdown. */
1230};
1231
1232static void serializer_dtor(void *obj)
1233{
1234 struct serializer *ser = obj;
1235
1236 ao2_cleanup(ser->pool);
1237 ser->pool = NULL;
1239 ser->shutdown_group = NULL;
1240}
1241
1244{
1245 struct serializer *ser;
1246
1248 if (!ser) {
1249 return NULL;
1250 }
1251 ao2_ref(pool, +1);
1252 ser->pool = pool;
1254 return ser;
1255}
1256
/*
 * Thread storage slot for the serializer taskprocessor currently running on
 * this thread. execute_tasks() stores the taskprocessor here while it runs
 * tasks and resets it to NULL afterwards.
 */
1257AST_THREADSTORAGE_RAW(current_serializer);
1258
1259static int execute_tasks(void *data)
1260{
1261 struct ast_taskprocessor *tps = data;
1262
1263 ast_threadstorage_set_ptr(&current_serializer, tps);
1264 while (ast_taskprocessor_execute(tps)) {
1265 /* No-op */
1266 }
1267 ast_threadstorage_set_ptr(&current_serializer, NULL);
1268
1270 return 0;
1271}
1272
1274{
1275 if (was_empty) {
1278
1279 if (ast_threadpool_push(ser->pool, execute_tasks, tps)) {
1281 }
1282 }
1283}
1284
1286{
1287 /* No-op */
1288 return 0;
1289}
1290
1300
1306
1308{
1309 return ast_threadstorage_get_ptr(&current_serializer);
1310}
1311
1314{
1315 struct serializer *ser;
1317 struct ast_taskprocessor *tps;
1318
1319 ser = serializer_create(pool, shutdown_group);
1320 if (!ser) {
1321 return NULL;
1322 }
1323
1325 if (!listener) {
1326 ao2_ref(ser, -1);
1327 return NULL;
1328 }
1329
1331 if (!tps) {
1332 /* ser ref transferred to listener but not cleaned without tps */
1333 ao2_ref(ser, -1);
1334 } else if (shutdown_group) {
1336 }
1337
1338 ao2_ref(listener, -1);
1339 return tps;
1340}
1341
1343{
1345}
1346
1348{
1349 return ast_taskprocessor_size(pool->tps);
1350}
ast_mutex_t lock
Definition app_sla.c:337
static void * listener(void *unused)
Definition asterisk.c:1530
Asterisk main include file. File version handling, generic pbx functions.
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
Definition astmm.h:288
#define ast_free(a)
Definition astmm.h:180
#define ast_malloc(len)
A wrapper for malloc()
Definition astmm.h:191
#define ast_log
Definition astobj2.c:42
#define ao2_link(container, obj)
Add an object to a container.
Definition astobj2.h:1532
@ CMP_MATCH
Definition astobj2.h:1027
@ CMP_STOP
Definition astobj2.h:1028
@ AO2_ALLOC_OPT_LOCK_NOLOCK
Definition astobj2.h:367
@ AO2_ALLOC_OPT_LOCK_MUTEX
Definition astobj2.h:363
#define ao2_callback(c, flags, cb_fn, arg)
ao2_callback() is a generic function that applies cb_fn() to all objects in a container,...
Definition astobj2.h:1693
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
#define ao2_cleanup(obj)
Definition astobj2.h:1934
#define ao2_callback_data(container, flags, cb_fn, arg, data)
Definition astobj2.h:1723
#define ao2_unlink(container, obj)
Remove an object from a container.
Definition astobj2.h:1578
#define ao2_unlock(a)
Definition astobj2.h:729
#define ao2_lock(a)
Definition astobj2.h:717
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition astobj2.h:459
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition astobj2.h:404
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition astobj2.h:480
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
Definition astobj2.h:1063
@ OBJ_NODATA
Definition astobj2.h:1044
@ OBJ_MULTIPLE
Definition astobj2.h:1049
@ OBJ_UNLINK
Definition astobj2.h:1039
#define ao2_alloc(data_size, destructor_fn)
Definition astobj2.h:409
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition astobj2.h:1303
char * end
Definition eagi_proxy.c:73
static const char name[]
Definition format_mp3.c:68
#define ast_debug(level,...)
Log a DEBUG message.
#define LOG_ERROR
#define LOG_WARNING
#define ast_cond_destroy(cond)
Definition lock.h:209
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition lock.h:611
#define ast_cond_wait(cond, mutex)
Definition lock.h:212
#define AST_PTHREADT_NULL
Definition lock.h:73
#define ast_cond_init(cond, attr)
Definition lock.h:208
#define ast_cond_timedwait(cond, mutex, time)
Definition lock.h:213
#define ast_mutex_init(pmutex)
Definition lock.h:193
#define ast_mutex_unlock(a)
Definition lock.h:197
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition lock.h:764
pthread_cond_t ast_cond_t
Definition lock.h:185
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
Definition lock.h:596
#define ast_mutex_destroy(a)
Definition lock.h:195
#define ast_mutex_lock(a)
Definition lock.h:196
#define ast_cond_signal(cond)
Definition lock.h:210
static struct ast_serializer_shutdown_group * shutdown_group
Shutdown group for options serializers.
struct @506 callbacks
#define NULL
Definition resample.c:96
void ast_serializer_shutdown_group_dec(struct ast_serializer_shutdown_group *shutdown_group)
Decrement the number of serializer members in the group.
void ast_serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group)
Increment the number of serializer members in the group.
#define ast_str_create(init_len)
Create a malloc'ed dynamic length string.
Definition strings.h:659
int ast_str_set(struct ast_str **buf, ssize_t max_len, const char *fmt,...)
Set a dynamic string using variable arguments.
Definition strings.h:1113
char *attribute_pure ast_str_buffer(const struct ast_str *buf)
Returns the string buffer within the ast_str buf.
Definition strings.h:761
Generic container type.
Structure for mutex and tracking information.
Definition lock.h:142
Support for dynamic strings.
Definition strings.h:623
void(* task_pushed)(struct ast_taskprocessor_listener *listener, int was_empty)
Indicates a task was pushed to the processor.
int(* start)(struct ast_taskprocessor_listener *listener)
The taskprocessor has started completely.
A listener for taskprocessors.
An ast_taskprocessor structure is a singleton by name.
void(* shutdown)(struct ast_threadpool_listener *listener)
The threadpool is shutting down.
Definition threadpool.h:69
void(* emptied)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener)
Indicates the threadpool's taskprocessor has become empty.
Definition threadpool.h:58
void(* task_pushed)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener, int was_empty)
Indicates that a task was pushed to the threadpool.
Definition threadpool.h:49
void(* state_changed)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener, int active_threads, int idle_threads)
Indicates that the state of threads in the pool has changed.
Definition threadpool.h:38
listener for a threadpool
Definition threadpool.c:110
const struct ast_threadpool_listener_callbacks * callbacks
Definition threadpool.c:112
void(* thread_start)(void)
Function to call when a thread starts.
Definition threadpool.h:119
int idle_timeout
Time limit in seconds for idle threads.
Definition threadpool.h:81
int max_size
Maximum number of threads a pool may have.
Definition threadpool.h:112
void(* thread_end)(void)
Function to call when a thread ends.
Definition threadpool.h:126
int auto_increment
Number of threads to increment pool by.
Definition threadpool.h:92
An opaque threadpool structure.
Definition threadpool.c:36
struct ao2_container * idle_threads
The container of idle threads. Idle threads are those that are currently waiting to run tasks.
Definition threadpool.c:48
struct ast_taskprocessor * tps
The main taskprocessor.
Definition threadpool.c:67
struct ast_taskprocessor * control_tps
The control taskprocessor.
Definition threadpool.c:96
struct ast_threadpool_options options
Definition threadpool.c:100
struct ao2_container * zombie_threads
The container of zombie threads. Zombie threads may be running tasks, but they are scheduled to die soon.
Definition threadpool.c:53
struct ast_threadpool_listener * listener
Definition threadpool.c:38
struct ao2_container * active_threads
The container of active threads. Active threads are those that are currently running tasks.
Definition threadpool.c:43
struct ast_threadpool_options options
Definition threadpool.c:912
struct ast_threadpool * pool
Definition threadpool.c:911
struct ast_threadpool * pool
struct ast_taskpool * pool
Definition taskpool.c:698
struct ast_serializer_shutdown_group * shutdown_group
Definition taskpool.c:700
Helper struct used for queued operations that change the size of the threadpool.
Definition threadpool.c:802
unsigned int size
Definition threadpool.c:806
struct ast_threadpool * pool
Definition threadpool.c:804
helper used for queued task when tasks are pushed
Definition threadpool.c:452
struct ast_threadpool * pool
Definition threadpool.c:454
Struct used for queued operations involving worker state changes.
Definition threadpool.c:193
struct ast_threadpool * pool
Definition threadpool.c:195
struct worker_thread * worker
Definition threadpool.c:197
pthread_t thread
Definition threadpool.c:151
ast_cond_t cond
Definition threadpool.c:147
struct ast_threadpool_options options
Definition threadpool.c:159
enum worker_state state
Definition threadpool.c:155
struct ast_threadpool * pool
Definition threadpool.c:153
ast_mutex_t lock
Definition threadpool.c:149
An API for managing task processing threads that can be shared across modules.
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
@ TPS_REF_DEFAULT
return a reference to a taskprocessor, create one if it does not exist
void * ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
Get the user data from the listener.
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
Pop a task off the taskprocessor and execute it.
#define ast_taskprocessor_push(tps, task_exe, datap)
struct ast_taskprocessor * ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
Create a taskprocessor with a custom listener.
struct ast_taskprocessor * ast_taskprocessor_listener_get_tps(const struct ast_taskprocessor_listener *listener)
Get a reference to the listener's taskprocessor.
const char * ast_taskprocessor_name(struct ast_taskprocessor *tps)
Return the name of the taskprocessor singleton.
int __ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap, const char *file, int line, const char *function) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
static struct test_options options
static int task(void *data)
Queued task for baseline test.
static void thread_worker_pair_free(struct thread_worker_pair *pair)
Destructor for thread_worker_pair.
Definition threadpool.c:203
static struct worker_thread * worker_thread_alloc(struct ast_threadpool *pool)
Allocate and initialize a new worker thread.
static struct thread_worker_pair * thread_worker_pair_alloc(struct ast_threadpool *pool, struct worker_thread *worker)
Allocate and initialize a thread_worker_pair.
Definition threadpool.c:214
static struct serializer * serializer_create(struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
static void worker_active(struct worker_thread *worker)
Active loop for worker threads.
void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
Set the number of threads for the thread pool.
Definition threadpool.c:874
static int queued_task_pushed(void *data)
Queued task called when tasks are pushed into the threadpool.
Definition threadpool.c:565
int __ast_threadpool_push(struct ast_threadpool *pool, int(*task)(void *data), void *data, const char *file, int line, const char *function)
Push a task to the threadpool.
Definition threadpool.c:961
static int activate_thread(void *obj, void *arg, int flags)
Activate idle threads.
Definition threadpool.c:492
static void threadpool_active_thread_idle(struct ast_threadpool *pool, struct worker_thread *worker)
Queue a task to move a thread from the active list to the idle list.
Definition threadpool.c:256
static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks
static int queued_emptied(void *data)
Queued task that handles the case where the threadpool's taskprocessor is emptied.
Definition threadpool.c:638
static int worker_thread_hash(const void *obj, int flags)
Definition threadpool.c:998
static int worker_thread_start(struct worker_thread *worker)
static void serializer_dtor(void *obj)
static void threadpool_destructor(void *obj)
Destroy a threadpool's components.
Definition threadpool.c:386
void ast_threadpool_shutdown(struct ast_threadpool *pool)
Shut down a threadpool and destroy it.
Definition threadpool.c:977
static struct task_pushed_data * task_pushed_data_alloc(struct ast_threadpool *pool, int was_empty)
Allocate and initialize a task_pushed_data.
Definition threadpool.c:466
static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
Taskprocessor listener shutdown callback.
Definition threadpool.c:679
static void grow(struct ast_threadpool *pool, int delta)
Add threads to the threadpool.
Definition threadpool.c:525
static void worker_shutdown(struct worker_thread *worker)
shut a worker thread down
static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks
Table of taskprocessor listener callbacks for threadpool's main taskprocessor.
Definition threadpool.c:695
static int worker_set_state(struct worker_thread *worker, enum worker_state state)
Change a worker's state.
static int worker_idle(struct worker_thread *worker)
Idle function for worker threads.
struct ast_taskprocessor * ast_threadpool_serializer_get_current(void)
Get the threadpool serializer currently associated with this thread.
static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
Taskprocessor listener emptied callback.
Definition threadpool.c:654
static void serializer_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
static int queued_set_size(void *data)
Change the size of the threadpool.
Definition threadpool.c:838
long ast_threadpool_queue_size(struct ast_threadpool *pool)
Return the size of the threadpool's task queue.
static int threadpool_execute(struct ast_threadpool *pool)
Execute a task in the threadpool.
Definition threadpool.c:362
worker_state
states for worker threads
Definition threadpool.c:120
@ DEAD
Definition threadpool.c:137
@ ALIVE
Definition threadpool.c:122
@ ZOMBIE
Definition threadpool.c:130
static void threadpool_send_state_changed(struct ast_threadpool *pool)
Notify the threadpool listener that the state has changed.
Definition threadpool.c:180
static int worker_id_counter
Definition threadpool.c:996
static void shrink(struct ast_threadpool *pool, int delta)
Remove threads from the threadpool.
Definition threadpool.c:775
static int queued_active_thread_idle(void *data)
Move a worker thread from the active container to the idle container.
Definition threadpool.c:235
static int queued_zombie_thread_dead(void *data)
Kill a zombie thread.
Definition threadpool.c:284
static void threadpool_zombie_thread_dead(struct ast_threadpool *pool, struct worker_thread *worker)
Queue a task to kill a zombie thread.
Definition threadpool.c:301
static void serializer_shutdown(struct ast_taskprocessor_listener *listener)
static int queued_idle_thread_dead(void *data)
Definition threadpool.c:321
void * ast_threadpool_listener_get_user_data(const struct ast_threadpool_listener *listener)
Get the threadpool listener's user data.
Definition threadpool.c:905
static int threadpool_tps_start(struct ast_taskprocessor_listener *listener)
Definition threadpool.c:444
struct ast_taskprocessor * ast_threadpool_serializer_group(const char *name, struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Serialized execution of tasks within an ast_threadpool.
static void threadpool_idle_thread_dead(struct ast_threadpool *pool, struct worker_thread *worker)
Definition threadpool.c:332
static int worker_thread_cmp(void *obj, void *arg, int flags)
static struct ast_threadpool * threadpool_alloc(const char *name, const struct ast_threadpool_options *options)
Allocate a threadpool.
Definition threadpool.c:404
static void worker_thread_destroy(void *obj)
Worker thread destructor.
#define THREAD_BUCKETS
Definition threadpool.c:28
static int execute_tasks(void *data)
static struct set_size_data * set_size_data_alloc(struct ast_threadpool *pool, unsigned int size)
Allocate and initialize a set_size_data.
Definition threadpool.c:814
struct ast_threadpool * ast_threadpool_create(const char *name, struct ast_threadpool_listener *listener, const struct ast_threadpool_options *options)
Create a new threadpool.
Definition threadpool.c:915
static int serializer_start(struct ast_taskprocessor_listener *listener)
struct ast_threadpool_listener * ast_threadpool_listener_alloc(const struct ast_threadpool_listener_callbacks *callbacks, void *user_data)
Allocate a threadpool listener.
Definition threadpool.c:893
static void * worker_start(void *arg)
start point for worker threads
static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
Taskprocessor listener callback called when a task is added.
Definition threadpool.c:611
static int kill_threads(void *obj, void *arg, int flags)
ao2 callback to kill a set number of threads.
Definition threadpool.c:714
struct ast_taskprocessor * ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
Serialized execution of tasks within an ast_threadpool.
static int zombify_threads(void *obj, void *arg, void *data, int flags)
ao2 callback to zombify a set number of threads.
Definition threadpool.c:745
#define ast_threadpool_push(pool, task, data)
Definition threadpool.h:194
#define AST_THREADPOOL_OPTIONS_VERSION
Definition threadpool.h:73
int ast_threadstorage_set_ptr(struct ast_threadstorage *ts, void *ptr)
Set a raw pointer from threadstorage.
void * ast_threadstorage_get_ptr(struct ast_threadstorage *ts)
Retrieve a raw pointer from threadstorage.
#define AST_THREADSTORAGE_RAW(name)
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition time.h:159
Utility functions.
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition utils.h:981
#define ast_assert(a)
Definition utils.h:779
#define MIN(a, b)
Definition utils.h:252
#define ast_pthread_create(a, b, c, d)
Definition utils.h:624