Asterisk - The Open Source Telephony Project GIT-master-a358458
threadpool.c
Go to the documentation of this file.
1/*
2 * Asterisk -- An open source telephony toolkit.
3 *
4 * Copyright (C) 2012-2013, Digium, Inc.
5 *
6 * Mark Michelson <mmmichelson@digium.com>
7 *
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
13 *
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
17 */
18
19
20#include "asterisk.h"
21
22#include "asterisk/threadpool.h"
24#include "asterisk/astobj2.h"
25#include "asterisk/utils.h"
26
27/* Needs to stay prime if increased */
28#define THREAD_BUCKETS 89
29
30/*!
31 * \brief An opaque threadpool structure
32 *
33 * A threadpool is a collection of threads that execute
34 * tasks from a common queue.
35 */
37 /*! Threadpool listener */
39 /*!
40 * \brief The container of active threads.
41 * Active threads are those that are currently running tasks
42 */
44 /*!
45 * \brief The container of idle threads.
46 * Idle threads are those that are currently waiting to run tasks
47 */
49 /*!
50 * \brief The container of zombie threads.
51 * Zombie threads may be running tasks, but they are scheduled to die soon
52 */
54 /*!
55 * \brief The main taskprocessor
56 *
57 * Tasks that are queued in this taskprocessor are
58 * doled out to the worker threads. Worker threads that
59 * execute tasks from the threadpool are executing tasks
60 * in this taskprocessor.
61 *
62 * The threadpool itself is actually the private data for
63 * this taskprocessor's listener. This way, as taskprocessor
64 * changes occur, the threadpool can alert its listeners
65 * appropriately.
66 */
68 /*!
69 * \brief The control taskprocessor
70 *
71 * This is a standard taskprocessor that uses the default
72 * taskprocessor listener. In other words, all tasks queued to
73 * this taskprocessor have a single thread that executes the
74 * tasks.
75 *
76 * All tasks that modify the state of the threadpool and all tasks
77 * that call out to threadpool listeners are pushed to this
78 * taskprocessor.
79 *
80 * For instance, when the threadpool changes sizes, a task is put
81 * into this taskprocessor to do so. When it comes time to tell the
82 * threadpool listener that worker threads have changed state,
83 * the task is placed in this taskprocessor.
84 *
85 * This is done for three main reasons
86 * 1) It ensures that listeners are given an accurate portrayal
87 * of the threadpool's current state. In other words, when a listener
88 * gets told a count of active, idle and zombie threads, it does not
89 * need to worry that internal state of the threadpool might be different
90 * from what it has been told.
91 * 2) It minimizes the locking required in both the threadpool and in
92 * threadpool listener's callbacks.
93 * 3) It ensures that listener callbacks are called in the same order
94 * that the threadpool had its state change.
95 */
97 /*! True if the threadpool is in the process of shutting down */
99 /*! Threadpool-specific options */
101};
102
103/*!
104 * \brief listener for a threadpool
105 *
106 * The listener is notified of changes in a threadpool. It can
107 * react by doing things like increasing the number of threads
108 * in the pool
109 */
111 /*! Callbacks called by the threadpool */
113 /*! User data for the listener */
115};
116
117/*!
118 * \brief states for worker threads
119 */
121 /*! The worker is either active or idle */
123 /*!
124 * The worker has been asked to shut down but
125 * may still be in the process of executing tasks.
126 * This transition happens when the threadpool needs
127 * to shrink and needs to kill active threads in order
128 * to do so.
129 */
131 /*!
132 * The worker has been asked to shut down. Typically
133 * only idle threads go to this state directly, but
134 * active threads may go straight to this state when
135 * the threadpool is shut down.
136 */
138};
139
140/*!
141 * A thread that executes threadpool tasks
142 */
144 /*! A unique (within a run of Asterisk) ID for the thread. Used for hashing and searching */
145 int id;
146 /*! Condition used in conjunction with state changes */
148 /*! Lock used alongside the condition for state changes */
150 /*! The actual thread that is executing tasks */
151 pthread_t thread;
152 /*! A pointer to the threadpool. Needed to be able to execute tasks */
154 /*! The current state of the worker thread */
156 /*! A boolean used to determine if an idle thread should become active */
158 /*! Options for this threadpool */
160};
161
162/* Worker thread forward declarations. See definitions for documentation */
163static int worker_thread_hash(const void *obj, int flags);
164static int worker_thread_cmp(void *obj, void *arg, int flags);
165static void worker_thread_destroy(void *obj);
166static void worker_active(struct worker_thread *worker);
167static void *worker_start(void *arg);
169static int worker_thread_start(struct worker_thread *worker);
170static int worker_idle(struct worker_thread *worker);
171static int worker_set_state(struct worker_thread *worker, enum worker_state state);
172static void worker_shutdown(struct worker_thread *worker);
173
174/*!
175 * \brief Notify the threadpool listener that the state has changed.
176 *
177 * This notifies the threadpool listener via its state_changed callback.
178 * \param pool The threadpool whose state has changed
179 */
181{
182 int active_size = ao2_container_count(pool->active_threads);
183 int idle_size = ao2_container_count(pool->idle_threads);
184
186 pool->listener->callbacks->state_changed(pool, pool->listener, active_size, idle_size);
187 }
188}
189
190/*!
191 * \brief Struct used for queued operations involving worker state changes
192 */
194 /*! Threadpool that contains the worker whose state has changed */
196 /*! Worker whose state has changed */
198};
199
200/*!
201 * \brief Destructor for thread_worker_pair
202 */
204{
205 ao2_ref(pair->worker, -1);
206 ast_free(pair);
207}
208
209/*!
210 * \brief Allocate and initialize a thread_worker_pair
211 * \param pool Threadpool to assign to the thread_worker_pair
212 * \param worker Worker thread to assign to the thread_worker_pair
213 */
215 struct worker_thread *worker)
216{
217 struct thread_worker_pair *pair = ast_malloc(sizeof(*pair));
218 if (!pair) {
219 return NULL;
220 }
221 pair->pool = pool;
222 ao2_ref(worker, +1);
223 pair->worker = worker;
224
225 return pair;
226}
227
228/*!
229 * \brief Move a worker thread from the active container to the idle container.
230 *
231 * This function is called from the threadpool's control taskprocessor thread.
232 * \param data A thread_worker_pair containing the threadpool and the worker to move.
233 * \return 0
234 */
235static int queued_active_thread_idle(void *data)
236{
237 struct thread_worker_pair *pair = data;
238
239 ao2_link(pair->pool->idle_threads, pair->worker);
240 ao2_unlink(pair->pool->active_threads, pair->worker);
241
243
245 return 0;
246}
247
248/*!
249 * \brief Queue a task to move a thread from the active list to the idle list
250 *
251 * This is called by a worker thread when it runs out of tasks to perform and
252 * goes idle.
253 * \param pool The threadpool to which the worker belongs
254 * \param worker The worker thread that has gone idle
255 */
257 struct worker_thread *worker)
258{
259 struct thread_worker_pair *pair;
261
262 if (pool->shutting_down) {
263 return;
264 }
265
267 if (!pair) {
268 return;
269 }
270
273 }
274}
275
276/*!
277 * \brief Kill a zombie thread
278 *
279 * This runs from the threadpool's control taskprocessor thread.
280 *
281 * \param data A thread_worker_pair containing the threadpool and the zombie thread
282 * \return 0
283 */
284static int queued_zombie_thread_dead(void *data)
285{
286 struct thread_worker_pair *pair = data;
287
288 ao2_unlink(pair->pool->zombie_threads, pair->worker);
290
292 return 0;
293}
294
295/*!
296 * \brief Queue a task to kill a zombie thread
297 *
298 * This is called by a worker thread when it acknowledges that it is time for
299 * it to die.
300 */
302 struct worker_thread *worker)
303{
304 struct thread_worker_pair *pair;
306
307 if (pool->shutting_down) {
308 return;
309 }
310
312 if (!pair) {
313 return;
314 }
315
318 }
319}
320
321static int queued_idle_thread_dead(void *data)
322{
323 struct thread_worker_pair *pair = data;
324
325 ao2_unlink(pair->pool->idle_threads, pair->worker);
327
329 return 0;
330}
331
333 struct worker_thread *worker)
334{
335 struct thread_worker_pair *pair;
337
338 if (pool->shutting_down) {
339 return;
340 }
341
343 if (!pair) {
344 return;
345 }
346
349 }
350}
351
352/*!
353 * \brief Execute a task in the threadpool
354 *
355 * This is the function that worker threads call in order to execute tasks
356 * in the threadpool
357 *
358 * \param pool The pool to which the tasks belong.
359 * \retval 0 Either the pool has been shut down or there are no tasks.
360 * \retval 1 There are still tasks remaining in the pool.
361 */
363{
364 ao2_lock(pool);
365 if (!pool->shutting_down) {
368 }
370 return 0;
371}
372
373/*!
374 * \brief Destroy a threadpool's components.
375 *
376 * This is the destructor called automatically when the threadpool's
377 * reference count reaches zero. This is not to be confused with
378 * threadpool_destroy.
379 *
380 * By the time this actually gets called, most of the cleanup has already
381 * been done in the pool. The only thing left to do is to release the
382 * final reference to the threadpool listener.
383 *
384 * \param obj The pool to destroy
385 */
386static void threadpool_destructor(void *obj)
387{
388 struct ast_threadpool *pool = obj;
389 ao2_cleanup(pool->listener);
390}
391
392/*!
393 * \brief Allocate a threadpool
394 *
395 * This is implemented as a taskprocessor listener's alloc callback. This
396 * is because the threadpool exists as the private data on a taskprocessor
397 * listener.
398 *
399 * \param name The name of the threadpool.
400 * \param options The options the threadpool uses.
401 * \retval NULL Could not initialize threadpool properly
402 * \retval non-NULL The newly-allocated threadpool
403 */
404static struct ast_threadpool *threadpool_alloc(const char *name, const struct ast_threadpool_options *options)
405{
406 RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
407 struct ast_str *control_tps_name;
408
409 pool = ao2_alloc(sizeof(*pool), threadpool_destructor);
410 control_tps_name = ast_str_create(64);
411 if (!pool || !control_tps_name) {
412 ast_free(control_tps_name);
413 return NULL;
414 }
415
416 ast_str_set(&control_tps_name, 0, "%s/pool-control", name);
417
418 pool->control_tps = ast_taskprocessor_get(ast_str_buffer(control_tps_name), TPS_REF_DEFAULT);
419 ast_free(control_tps_name);
420 if (!pool->control_tps) {
421 return NULL;
422 }
423 pool->active_threads = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
425 if (!pool->active_threads) {
426 return NULL;
427 }
430 if (!pool->idle_threads) {
431 return NULL;
432 }
433 pool->zombie_threads = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
435 if (!pool->zombie_threads) {
436 return NULL;
437 }
438 pool->options = *options;
439
440 ao2_ref(pool, +1);
441 return pool;
442}
443
445{
446 return 0;
447}
448
449/*!
450 * \brief helper used for queued task when tasks are pushed
451 */
453 /*! Pool into which a task was pushed */
455 /*! Indicator of whether the pool had no tasks prior to the new task being added */
457};
458
459/*!
460 * \brief Allocate and initialize a task_pushed_data
461 * \param pool The threadpool to set in the task_pushed_data
462 * \param was_empty The was_empty value to set in the task_pushed_data
463 * \retval NULL Unable to allocate task_pushed_data
464 * \retval non-NULL The newly-allocated task_pushed_data
465 */
467 int was_empty)
468{
469 struct task_pushed_data *tpd = ast_malloc(sizeof(*tpd));
470
471 if (!tpd) {
472 return NULL;
473 }
474 tpd->pool = pool;
475 tpd->was_empty = was_empty;
476 return tpd;
477}
478
479/*!
480 * \brief Activate idle threads
481 *
482 * This function always returns CMP_MATCH because all workers that this
483 * function acts on need to be seen as matches so they are unlinked from the
484 * list of idle threads.
485 *
486 * Called as an ao2_callback in the threadpool's control taskprocessor thread.
487 * \param obj The worker to activate
488 * \param arg The pool where the worker belongs
489 * \param flags
490 * \retval CMP_MATCH
491 */
492static int activate_thread(void *obj, void *arg, int flags)
493{
494 struct worker_thread *worker = obj;
495 struct ast_threadpool *pool = arg;
496
497 if (!ao2_link(pool->active_threads, worker)) {
498 /* If we can't link the idle thread into the active container, then
499 * we'll just leave the thread idle and not wake it up.
500 */
501 ast_log(LOG_WARNING, "Failed to activate thread %d. Remaining idle\n",
502 worker->id);
503 return 0;
504 }
505
506 if (worker_set_state(worker, ALIVE)) {
507 ast_debug(1, "Failed to activate thread %d. It is dead\n",
508 worker->id);
509 /* The worker thread will no longer exist in the active threads or
510 * idle threads container after this.
511 */
512 ao2_unlink(pool->active_threads, worker);
513 }
514
515 return CMP_MATCH;
516}
517
518/*!
519 * \brief Add threads to the threadpool
520 *
521 * This function is called from the threadpool's control taskprocessor thread.
522 * \param pool The pool that is expanding
523 * \param delta The number of threads to add to the pool
524 */
525static void grow(struct ast_threadpool *pool, int delta)
526{
527 int i;
528
529 int current_size = ao2_container_count(pool->active_threads) +
531
532 if (pool->options.max_size && current_size + delta > pool->options.max_size) {
533 delta = pool->options.max_size - current_size;
534 }
535
536 ast_debug(3, "Increasing threadpool %s's size by %d\n",
537 ast_taskprocessor_name(pool->tps), delta);
538
539 for (i = 0; i < delta; ++i) {
540 struct worker_thread *worker = worker_thread_alloc(pool);
541 if (!worker) {
542 return;
543 }
544 if (ao2_link(pool->idle_threads, worker)) {
545 if (worker_thread_start(worker)) {
546 ast_log(LOG_ERROR, "Unable to start worker thread %d. Destroying.\n", worker->id);
548 }
549 } else {
550 ast_log(LOG_WARNING, "Failed to activate worker thread %d. Destroying.\n", worker->id);
551 }
552 ao2_ref(worker, -1);
553 }
554}
555
556/*!
557 * \brief Queued task called when tasks are pushed into the threadpool
558 *
559 * This function first calls into the threadpool's listener to let it know
560 * that a task has been pushed. It then wakes up all idle threads and moves
561 * them into the active thread container.
562 * \param data A task_pushed_data
563 * \return 0
564 */
565static int queued_task_pushed(void *data)
566{
567 struct task_pushed_data *tpd = data;
568 struct ast_threadpool *pool = tpd->pool;
569 int was_empty = tpd->was_empty;
570 unsigned int existing_active;
571
572 ast_free(tpd);
573
574 if (pool->listener && pool->listener->callbacks->task_pushed) {
575 pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
576 }
577
578 existing_active = ao2_container_count(pool->active_threads);
579
580 /* The first pass transitions any existing idle threads to be active, and
581 * will also remove any worker threads that have recently entered the dead
582 * state.
583 */
585 activate_thread, pool);
586
587 /* If no idle threads could be transitioned to active grow the pool as permitted. */
588 if (ao2_container_count(pool->active_threads) == existing_active) {
589 if (!pool->options.auto_increment) {
590 return 0;
591 }
592 grow(pool, pool->options.auto_increment);
593 /* An optional second pass transitions any newly added threads. */
595 activate_thread, pool);
596 }
597
599 return 0;
600}
601
602/*!
603 * \brief Taskprocessor listener callback called when a task is added
604 *
605 * The threadpool uses this opportunity to queue a task on its control taskprocessor
606 * in order to activate idle threads and notify the threadpool listener that the
607 * task has been pushed.
608 * \param listener The taskprocessor listener. The threadpool is the listener's private data
609 * \param was_empty True if the taskprocessor was empty prior to the task being pushed
610 */
612 int was_empty)
613{
615 struct task_pushed_data *tpd;
617
618 if (pool->shutting_down) {
619 return;
620 }
621
623 if (!tpd) {
624 return;
625 }
626
628 ast_free(tpd);
629 }
630}
631
632/*!
633 * \brief Queued task that handles the case where the threadpool's taskprocessor is emptied
634 *
635 * This simply lets the threadpool's listener know that the threadpool is devoid of tasks
636 * \param data The pool that has become empty
637 * \return 0
638 */
639static int queued_emptied(void *data)
640{
641 struct ast_threadpool *pool = data;
642
643 /* We already checked for existence of this callback when this was queued */
644 pool->listener->callbacks->emptied(pool, pool->listener);
645 return 0;
646}
647
648/*!
649 * \brief Taskprocessor listener emptied callback
650 *
651 * The threadpool queues a task to let the threadpool listener know that
652 * the threadpool no longer contains any tasks.
653 * \param listener The taskprocessor listener. The threadpool is the listener's private data.
654 */
656{
658 SCOPED_AO2LOCK(lock, pool);
659
660 if (pool->shutting_down) {
661 return;
662 }
663
664 if (pool->listener && pool->listener->callbacks->emptied) {
666 /* Nothing to do here but we need the check to keep the compiler happy. */
667 }
668 }
669}
670
671/*!
672 * \brief Taskprocessor listener shutdown callback
673 *
674 * The threadpool will shut down and destroy all of its worker threads when
675 * this is called back. By the time this gets called, the threadpool's
676 * control taskprocessor has already been destroyed. Therefore there is no risk
677 * in outright destroying the worker threads here.
678 * \param listener The taskprocessor listener. The threadpool is the listener's private data.
679 */
681{
683
684 if (pool->listener && pool->listener->callbacks->shutdown) {
685 pool->listener->callbacks->shutdown(pool->listener);
686 }
690 ao2_cleanup(pool);
691}
692
693/*!
694 * \brief Table of taskprocessor listener callbacks for threadpool's main taskprocessor
695 */
698 .task_pushed = threadpool_tps_task_pushed,
699 .emptied = threadpool_tps_emptied,
700 .shutdown = threadpool_tps_shutdown,
701};
702
703/*!
704 * \brief ao2 callback to kill a set number of threads.
705 *
706 * Threads will be unlinked from the container as long as the
707 * counter has not reached zero. The counter is decremented with
708 * each thread that is removed.
709 * \param obj The worker thread up for possible destruction
710 * \param arg The counter
711 * \param flags Unused
712 * \retval CMP_MATCH The counter has not reached zero, so this flag should be removed.
713 * \retval CMP_STOP The counter has reached zero so no more threads should be removed.
714 */
715static int kill_threads(void *obj, void *arg, int flags)
716{
717 int *num_to_kill = arg;
718
719 if (*num_to_kill > 0) {
720 --(*num_to_kill);
721 return CMP_MATCH;
722 } else {
723 return CMP_STOP;
724 }
725}
726
727/*!
728 * \brief ao2 callback to zombify a set number of threads.
729 *
730 * Threads will be zombified as long as the counter has not reached
731 * zero. The counter is decremented with each thread that is zombified.
732 *
733 * Zombifying a thread involves removing it from its current container,
734 * adding it to the zombie container, and changing the state of the
735 * worker to a zombie
736 *
737 * This callback is called from the threadpool control taskprocessor thread.
738 *
739 * \param obj The worker thread that may be zombified
740 * \param arg The pool to which the worker belongs
741 * \param data The counter
742 * \param flags Unused
743 * \retval CMP_MATCH The zombified thread should be removed from its current container
744 * \retval CMP_STOP Stop attempting to zombify threads
745 */
746static int zombify_threads(void *obj, void *arg, void *data, int flags)
747{
748 struct worker_thread *worker = obj;
749 struct ast_threadpool *pool = arg;
750 int *num_to_zombify = data;
751
752 if ((*num_to_zombify)-- > 0) {
753 if (!ao2_link(pool->zombie_threads, worker)) {
754 ast_log(LOG_WARNING, "Failed to zombify active thread %d. Thread will remain active\n", worker->id);
755 return 0;
756 }
757 worker_set_state(worker, ZOMBIE);
758 return CMP_MATCH;
759 } else {
760 return CMP_STOP;
761 }
762}
763
764/*!
765 * \brief Remove threads from the threadpool
766 *
767 * The preference is to kill idle threads. However, if there are
768 * more threads to remove than there are idle threads, then active
769 * threads will be zombified instead.
770 *
771 * This function is called from the threadpool control taskprocessor thread.
772 *
773 * \param pool The threadpool to remove threads from
774 * \param delta The number of threads to remove
775 */
776static void shrink(struct ast_threadpool *pool, int delta)
777{
778 /*
779 * Preference is to kill idle threads, but
780 * we'll move on to deactivating active threads
781 * if we have to
782 */
784 int idle_threads_to_kill = MIN(delta, idle_threads);
785 int active_threads_to_zombify = delta - idle_threads_to_kill;
786
787 ast_debug(3, "Destroying %d idle threads in threadpool %s\n", idle_threads_to_kill,
789
791 kill_threads, &idle_threads_to_kill);
792
793 ast_debug(3, "Destroying %d active threads in threadpool %s\n", active_threads_to_zombify,
795
797 zombify_threads, pool, &active_threads_to_zombify);
798}
799
800/*!
801 * \brief Helper struct used for queued operations that change the size of the threadpool
802 */
804 /*! The pool whose size is to change */
806 /*! The requested new size of the pool */
807 unsigned int size;
808};
809
810/*!
811 * \brief Allocate and initialize a set_size_data
812 * \param pool The pool for the set_size_data
813 * \param size The size to store in the set_size_data
814 */
816 unsigned int size)
817{
818 struct set_size_data *ssd = ast_malloc(sizeof(*ssd));
819 if (!ssd) {
820 return NULL;
821 }
822
823 ssd->pool = pool;
824 ssd->size = size;
825 return ssd;
826}
827
828/*!
829 * \brief Change the size of the threadpool
830 *
831 * This can either result in shrinking or growing the threadpool depending
832 * on the new desired size and the current size.
833 *
834 * This function is run from the threadpool control taskprocessor thread
835 *
836 * \param data A set_size_data used for determining how to act
837 * \return 0
838 */
839static int queued_set_size(void *data)
840{
841 struct set_size_data *ssd = data;
842 struct ast_threadpool *pool = ssd->pool;
843 unsigned int num_threads = ssd->size;
844
845 /* We don't count zombie threads as being "live" when potentially resizing */
846 unsigned int current_size = ao2_container_count(pool->active_threads) +
848
849 ast_free(ssd);
850
851 if (current_size == num_threads) {
852 ast_debug(3, "Not changing threadpool size since new size %u is the same as current %u\n",
853 num_threads, current_size);
854 return 0;
855 }
856
857 if (current_size < num_threads) {
859 activate_thread, pool);
860
861 /* As the above may have altered the number of current threads update it */
862 current_size = ao2_container_count(pool->active_threads) +
864 grow(pool, num_threads - current_size);
866 activate_thread, pool);
867 } else {
868 shrink(pool, current_size - num_threads);
869 }
870
872 return 0;
873}
874
875void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
876{
877 struct set_size_data *ssd;
879
880 if (pool->shutting_down) {
881 return;
882 }
883
885 if (!ssd) {
886 return;
887 }
888
890 ast_free(ssd);
891 }
892}
893
896{
898 if (!listener) {
899 return NULL;
900 }
901 listener->callbacks = callbacks;
902 listener->user_data = user_data;
903 return listener;
904}
905
907{
908 return listener->user_data;
909}
910
914};
915
918 const struct ast_threadpool_options *options)
919{
920 struct ast_taskprocessor *tps;
921 RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, NULL, ao2_cleanup);
922 RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
923 char *fullname;
924
926 if (!pool) {
927 return NULL;
928 }
929
931 if (!tps_listener) {
932 return NULL;
933 }
934
935 if (options->version != AST_THREADPOOL_OPTIONS_VERSION) {
936 ast_log(LOG_WARNING, "Incompatible version of threadpool options in use.\n");
937 return NULL;
938 }
939
940 fullname = ast_alloca(strlen(name) + strlen("/pool") + 1);
941 sprintf(fullname, "%s/pool", name); /* Safe */
942 tps = ast_taskprocessor_create_with_listener(fullname, tps_listener);
943 if (!tps) {
944 return NULL;
945 }
946
947 pool->tps = tps;
948 if (listener) {
949 ao2_ref(listener, +1);
950 pool->listener = listener;
951 }
952 ast_threadpool_set_size(pool, pool->options.initial_size);
953 ao2_ref(pool, +1);
954 return pool;
955}
956
957int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
958{
959 SCOPED_AO2LOCK(lock, pool);
960 if (!pool->shutting_down) {
961 return ast_taskprocessor_push(pool->tps, task, data);
962 }
963 return -1;
964}
965
967{
968 if (!pool) {
969 return;
970 }
971 /* Shut down the taskprocessors and everything else just
972 * takes care of itself via the taskprocessor callbacks
973 */
974 ao2_lock(pool);
975 pool->shutting_down = 1;
976 ao2_unlock(pool);
979}
980
981/*!
982 * A monotonically increasing integer used for worker
983 * thread identification.
984 */
986
987static int worker_thread_hash(const void *obj, int flags)
988{
989 const struct worker_thread *worker = obj;
990
991 return worker->id;
992}
993
994static int worker_thread_cmp(void *obj, void *arg, int flags)
995{
996 struct worker_thread *worker1 = obj;
997 struct worker_thread *worker2 = arg;
998
999 return worker1->id == worker2->id ? CMP_MATCH : 0;
1000}
1001
1002/*!
1003 * \brief shut a worker thread down
1004 *
1005 * Set the worker dead and then wait for its thread
1006 * to finish executing.
1007 *
1008 * \param worker The worker thread to shut down
1009 */
1010static void worker_shutdown(struct worker_thread *worker)
1011{
1012 worker_set_state(worker, DEAD);
1013 if (worker->thread != AST_PTHREADT_NULL) {
1014 pthread_join(worker->thread, NULL);
1015 worker->thread = AST_PTHREADT_NULL;
1016 }
1017}
1018
1019/*!
1020 * \brief Worker thread destructor
1021 *
1022 * Called automatically when refcount reaches 0. Shuts
1023 * down the worker thread and destroys its component
1024 * parts
1025 */
1026static void worker_thread_destroy(void *obj)
1027{
1028 struct worker_thread *worker = obj;
1029 ast_debug(3, "Destroying worker thread %d\n", worker->id);
1030 worker_shutdown(worker);
1031 ast_mutex_destroy(&worker->lock);
1032 ast_cond_destroy(&worker->cond);
1033}
1034
1035/*!
1036 * \brief start point for worker threads
1037 *
1038 * Worker threads start in the active state but may
1039 * immediately go idle if there is no work to be
1040 * done
1041 *
1042 * \param arg The worker thread
1043 */
1044static void *worker_start(void *arg)
1045{
1046 struct worker_thread *worker = arg;
1047 enum worker_state saved_state;
1048
1049 if (worker->options.thread_start) {
1050 worker->options.thread_start();
1051 }
1052
1053 ast_mutex_lock(&worker->lock);
1054 while (worker_idle(worker)) {
1055 ast_mutex_unlock(&worker->lock);
1056 worker_active(worker);
1057 ast_mutex_lock(&worker->lock);
1058 if (worker->state != ALIVE) {
1059 break;
1060 }
1061 threadpool_active_thread_idle(worker->pool, worker);
1062 }
1063 saved_state = worker->state;
1064 ast_mutex_unlock(&worker->lock);
1065
1066 /* Reaching this portion means the thread is
1067 * on death's door. It may have been killed while
1068 * it was idle, in which case it can just die
1069 * peacefully. If it's a zombie, though, then
1070 * it needs to let the pool know so
1071 * that the thread can be removed from the
1072 * list of zombie threads.
1073 */
1074 if (saved_state == ZOMBIE) {
1075 threadpool_zombie_thread_dead(worker->pool, worker);
1076 }
1077
1078 if (worker->options.thread_end) {
1079 worker->options.thread_end();
1080 }
1081 return NULL;
1082}
1083
1084/*!
1085 * \brief Allocate and initialize a new worker thread
1086 *
1087 * This will create, initialize, and start the thread.
1088 *
1089 * \param pool The threadpool to which the worker will be added
1090 * \retval NULL Failed to allocate or start the worker thread
1091 * \retval non-NULL The newly-created worker thread
1092 */
1094{
1095 struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy);
1096 if (!worker) {
1097 return NULL;
1098 }
1100 ast_mutex_init(&worker->lock);
1101 ast_cond_init(&worker->cond, NULL);
1102 worker->pool = pool;
1103 worker->thread = AST_PTHREADT_NULL;
1104 worker->state = ALIVE;
1105 worker->options = pool->options;
1106 return worker;
1107}
1108
1109static int worker_thread_start(struct worker_thread *worker)
1110{
1111 return ast_pthread_create(&worker->thread, NULL, worker_start, worker);
1112}
1113
1114/*!
1115 * \brief Active loop for worker threads
1116 *
1117 * The worker will stay in this loop for its lifetime,
1118 * executing tasks as they become available. If there
1119 * are no tasks currently available, then the thread
1120 * will go idle.
1121 *
1122 * \param worker The worker thread executing tasks.
1123 */
1124static void worker_active(struct worker_thread *worker)
1125{
1126 int alive;
1127
1128 /* The following is equivalent to
1129 *
1130 * while (threadpool_execute(worker->pool));
1131 *
1132 * However, reviewers have suggested in the past
1133 * doing that can cause optimizers to (wrongly)
1134 * optimize the code away.
1135 */
1136 do {
1137 alive = threadpool_execute(worker->pool);
1138 } while (alive);
1139}
1140
1141/*!
1142 * \brief Idle function for worker threads
1143 *
1144 * The worker waits here until it gets told by the threadpool
1145 * to wake up.
1146 *
1147 * worker is locked before entering this function.
1148 *
1149 * \param worker The idle worker
1150 * \retval 0 The thread is being woken up so that it can conclude.
1151 * \retval non-zero The thread is being woken up to do more work.
1152 */
1153static int worker_idle(struct worker_thread *worker)
1154{
1155 struct timeval start = ast_tvnow();
1156 struct timespec end = {
1157 .tv_sec = start.tv_sec + worker->options.idle_timeout,
1158 .tv_nsec = start.tv_usec * 1000,
1159 };
1160 while (!worker->wake_up) {
1161 if (worker->options.idle_timeout <= 0) {
1162 ast_cond_wait(&worker->cond, &worker->lock);
1163 } else if (ast_cond_timedwait(&worker->cond, &worker->lock, &end) == ETIMEDOUT) {
1164 break;
1165 }
1166 }
1167
1168 if (!worker->wake_up) {
1169 ast_debug(1, "Worker thread idle timeout reached. Dying.\n");
1170 threadpool_idle_thread_dead(worker->pool, worker);
1171 worker->state = DEAD;
1172 }
1173 worker->wake_up = 0;
1174 return worker->state == ALIVE;
1175}
1176
1177/*!
1178 * \brief Change a worker's state
1179 *
1180 * The threadpool calls into this function in order to let a worker know
1181 * how it should proceed.
1182 *
1183 * \retval -1 failure (state transition not permitted)
1184 * \retval 0 success
1185 */
1186static int worker_set_state(struct worker_thread *worker, enum worker_state state)
1187{
1188 SCOPED_MUTEX(lock, &worker->lock);
1189
1190 switch (state) {
1191 case ALIVE:
1192 /* This can occur due to a race condition between being told to go active
1193 * and an idle timeout happening.
1194 */
1195 if (worker->state == DEAD) {
1196 return -1;
1197 }
1198 ast_assert(worker->state != ZOMBIE);
1199 break;
1200 case DEAD:
1201 break;
1202 case ZOMBIE:
1203 ast_assert(worker->state != DEAD);
1204 break;
1205 }
1206
1207 worker->state = state;
1208 worker->wake_up = 1;
1209 ast_cond_signal(&worker->cond);
1210
1211 return 0;
1212}
1213
1214/*! Serializer group shutdown control object. */
1216 /*! Shutdown thread waits on this conditional. */
1218 /*! Count of serializers needing to shutdown. */
1220};
1221
1222static void serializer_shutdown_group_dtor(void *vdoomed)
1223{
1224 struct ast_serializer_shutdown_group *doomed = vdoomed;
1225
1226 ast_cond_destroy(&doomed->cond);
1227}
1228
1230{
1232
1234 if (!shutdown_group) {
1235 return NULL;
1236 }
1238 return shutdown_group;
1239}
1240
1242{
1243 int remaining;
1245
1246 if (!shutdown_group) {
1247 return 0;
1248 }
1249
1251 ast_assert(lock != NULL);
1252
1254 if (timeout) {
1255 struct timeval start;
1256 struct timespec end;
1257
1258 start = ast_tvnow();
1259 end.tv_sec = start.tv_sec + timeout;
1260 end.tv_nsec = start.tv_usec * 1000;
1261 while (shutdown_group->count) {
1263 /* Error or timed out waiting for the count to reach zero. */
1264 break;
1265 }
1266 }
1267 } else {
1268 while (shutdown_group->count) {
1270 /* Error */
1271 break;
1272 }
1273 }
1274 }
1275 remaining = shutdown_group->count;
1277 return remaining;
1278}
1279
1280/*!
1281 * \internal
1282 * \brief Increment the number of serializer members in the group.
1283 * \since 13.5.0
1284 *
1285 * \param shutdown_group Group shutdown controller.
1286 */
1288{
1292}
1293
1294/*!
1295 * \internal
1296 * \brief Decrement the number of serializer members in the group.
1297 * \since 13.5.0
1298 *
1299 * \param shutdown_group Group shutdown controller.
1300 */
1302{
1305 if (!shutdown_group->count) {
1307 }
1309}
1310
1312 /*! Threadpool the serializer will use to process the jobs. */
1314 /*! Which group will wait for this serializer to shutdown. */
1316};
1317
1318static void serializer_dtor(void *obj)
1319{
1320 struct serializer *ser = obj;
1321
1322 ao2_cleanup(ser->pool);
1323 ser->pool = NULL;
1325 ser->shutdown_group = NULL;
1326}
1327
1330{
1331 struct serializer *ser;
1332
1334 if (!ser) {
1335 return NULL;
1336 }
1337 ao2_ref(pool, +1);
1338 ser->pool = pool;
1340 return ser;
1341}
1342
1343AST_THREADSTORAGE_RAW(current_serializer);
1344
1345static int execute_tasks(void *data)
1346{
1347 struct ast_taskprocessor *tps = data;
1348
1349 ast_threadstorage_set_ptr(&current_serializer, tps);
1350 while (ast_taskprocessor_execute(tps)) {
1351 /* No-op */
1352 }
1353 ast_threadstorage_set_ptr(&current_serializer, NULL);
1354
1356 return 0;
1357}
1358
1360{
1361 if (was_empty) {
1364
1365 if (ast_threadpool_push(ser->pool, execute_tasks, tps)) {
1367 }
1368 }
1369}
1370
1372{
1373 /* No-op */
1374 return 0;
1375}
1376
1378{
1380
1381 if (ser->shutdown_group) {
1383 }
1384 ao2_cleanup(ser);
1385}
1386
1389 .start = serializer_start,
1390 .shutdown = serializer_shutdown,
1391};
1392
1394{
1395 return ast_threadstorage_get_ptr(&current_serializer);
1396}
1397
1400{
1401 struct serializer *ser;
1403 struct ast_taskprocessor *tps;
1404
1405 ser = serializer_create(pool, shutdown_group);
1406 if (!ser) {
1407 return NULL;
1408 }
1409
1411 if (!listener) {
1412 ao2_ref(ser, -1);
1413 return NULL;
1414 }
1415
1417 if (!tps) {
1418 /* ser ref transferred to listener but not cleaned without tps */
1419 ao2_ref(ser, -1);
1420 } else if (shutdown_group) {
1422 }
1423
1424 ao2_ref(listener, -1);
1425 return tps;
1426}
1427
1429{
1431}
1432
1434{
1435 return ast_taskprocessor_size(pool->tps);
1436}
ast_mutex_t lock
Definition: app_sla.c:331
static void * listener(void *unused)
Definition: asterisk.c:1514
Asterisk main include file. File version handling, generic pbx functions.
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
Definition: astmm.h:288
#define ast_free(a)
Definition: astmm.h:180
#define ast_malloc(len)
A wrapper for malloc()
Definition: astmm.h:191
#define ast_log
Definition: astobj2.c:42
#define ao2_link(container, obj)
Add an object to a container.
Definition: astobj2.h:1532
@ CMP_MATCH
Definition: astobj2.h:1027
@ CMP_STOP
Definition: astobj2.h:1028
@ AO2_ALLOC_OPT_LOCK_NOLOCK
Definition: astobj2.h:367
@ AO2_ALLOC_OPT_LOCK_MUTEX
Definition: astobj2.h:363
#define ao2_callback(c, flags, cb_fn, arg)
ao2_callback() is a generic function that applies cb_fn() to all objects in a container,...
Definition: astobj2.h:1693
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
#define ao2_callback_data(container, flags, cb_fn, arg, data)
Definition: astobj2.h:1723
#define ao2_unlink(container, obj)
Remove an object from a container.
Definition: astobj2.h:1578
#define ao2_unlock(a)
Definition: astobj2.h:729
#define ao2_lock(a)
Definition: astobj2.h:717
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:404
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
Definition: astobj2.c:476
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition: astobj2.h:480
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
Definition: astobj2.h:1063
@ OBJ_NODATA
Definition: astobj2.h:1044
@ OBJ_MULTIPLE
Definition: astobj2.h:1049
@ OBJ_UNLINK
Definition: astobj2.h:1039
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:409
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition: astobj2.h:1303
enum cc_state state
Definition: ccss.c:393
char * end
Definition: eagi_proxy.c:73
static const char name[]
Definition: format_mp3.c:68
#define ast_debug(level,...)
Log a DEBUG message.
#define LOG_ERROR
#define LOG_WARNING
#define ast_cond_destroy(cond)
Definition: lock.h:202
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:604
#define ast_cond_wait(cond, mutex)
Definition: lock.h:205
#define AST_PTHREADT_NULL
Definition: lock.h:66
#define ast_cond_init(cond, attr)
Definition: lock.h:201
#define ast_cond_timedwait(cond, mutex, time)
Definition: lock.h:206
#define ast_mutex_init(pmutex)
Definition: lock.h:186
#define ast_mutex_unlock(a)
Definition: lock.h:190
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition: lock.h:757
pthread_cond_t ast_cond_t
Definition: lock.h:178
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
Definition: lock.h:589
#define ast_mutex_destroy(a)
Definition: lock.h:188
#define ast_mutex_lock(a)
Definition: lock.h:189
#define ast_cond_signal(cond)
Definition: lock.h:203
static struct ast_serializer_shutdown_group * shutdown_group
Shutdown group for options serializers.
struct @468 callbacks
#define NULL
Definition: resample.c:96
char * ast_str_buffer(const struct ast_str *buf)
Returns the string buffer within the ast_str buf.
Definition: strings.h:761
#define ast_str_create(init_len)
Create a malloc'ed dynamic length string.
Definition: strings.h:659
int ast_str_set(struct ast_str **buf, ssize_t max_len, const char *fmt,...)
Set a dynamic string using variable arguments.
Definition: strings.h:1113
Generic container type.
Structure for mutex and tracking information.
Definition: lock.h:135
Support for dynamic strings.
Definition: strings.h:623
void(* task_pushed)(struct ast_taskprocessor_listener *listener, int was_empty)
Indicates a task was pushed to the processor.
Definition: taskprocessor.h:99
int(* start)(struct ast_taskprocessor_listener *listener)
The taskprocessor has started completely.
Definition: taskprocessor.h:92
A listener for taskprocessors.
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
void(* shutdown)(struct ast_threadpool_listener *listener)
The threadpool is shutting down.
Definition: threadpool.h:67
void(* emptied)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener)
Indicates the threadpool's taskprocessor has become empty.
Definition: threadpool.h:56
void(* task_pushed)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener, int was_empty)
Indicates that a task was pushed to the threadpool.
Definition: threadpool.h:47
void(* state_changed)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener, int active_threads, int idle_threads)
Indicates that the state of threads in the pool has changed.
Definition: threadpool.h:36
listener for a threadpool
Definition: threadpool.c:110
const struct ast_threadpool_listener_callbacks * callbacks
Definition: threadpool.c:112
void(* thread_start)(void)
Function to call when a thread starts.
Definition: threadpool.h:117
int idle_timeout
Time limit in seconds for idle threads.
Definition: threadpool.h:79
int max_size
Maximum number of threads a pool may have.
Definition: threadpool.h:110
void(* thread_end)(void)
Function to call when a thread ends.
Definition: threadpool.h:124
int auto_increment
Number of threads to increment pool by.
Definition: threadpool.h:90
An opaque threadpool structure.
Definition: threadpool.c:36
struct ao2_container * idle_threads
The container of idle threads. Idle threads are those that are currently waiting to run tasks.
Definition: threadpool.c:48
struct ast_taskprocessor * tps
The main taskprocessor.
Definition: threadpool.c:67
struct ast_taskprocessor * control_tps
The control taskprocessor.
Definition: threadpool.c:96
struct ast_threadpool_options options
Definition: threadpool.c:100
struct ao2_container * zombie_threads
The container of zombie threads. Zombie threads may be running tasks, but they are scheduled to die s...
Definition: threadpool.c:53
struct ast_threadpool_listener * listener
Definition: threadpool.c:38
struct ao2_container * active_threads
The container of active threads. Active threads are those that are currently running tasks.
Definition: threadpool.c:43
struct ast_threadpool_options options
Definition: threadpool.c:913
struct ast_threadpool * pool
Definition: threadpool.c:912
struct ast_serializer_shutdown_group * shutdown_group
Definition: threadpool.c:1315
struct ast_threadpool * pool
Definition: threadpool.c:1313
Helper struct used for queued operations that change the size of the threadpool.
Definition: threadpool.c:803
unsigned int size
Definition: threadpool.c:807
struct ast_threadpool * pool
Definition: threadpool.c:805
helper used for queued task when tasks are pushed
Definition: threadpool.c:452
struct ast_threadpool * pool
Definition: threadpool.c:454
Struct used for queued operations involving worker state changes.
Definition: threadpool.c:193
struct ast_threadpool * pool
Definition: threadpool.c:195
struct worker_thread * worker
Definition: threadpool.c:197
pthread_t thread
Definition: threadpool.c:151
ast_cond_t cond
Definition: threadpool.c:147
struct ast_threadpool_options options
Definition: threadpool.c:159
enum worker_state state
Definition: threadpool.c:155
struct ast_threadpool * pool
Definition: threadpool.c:153
ast_mutex_t lock
Definition: threadpool.c:149
An API for managing task processing threads that can be shared across modules.
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
@ TPS_REF_DEFAULT
return a reference to a taskprocessor, create one if it does not exist
Definition: taskprocessor.h:76
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
void * ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
Get the user data from the listener.
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
Pop a task off the taskprocessor and execute it.
struct ast_taskprocessor * ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
Create a taskprocessor with a custom listener.
struct ast_taskprocessor * ast_taskprocessor_listener_get_tps(const struct ast_taskprocessor_listener *listener)
Get a reference to the listener's taskprocessor.
const char * ast_taskprocessor_name(struct ast_taskprocessor *tps)
Return the name of the taskprocessor singleton.
static struct test_options options
static int task(void *data)
Queued task for baseline test.
static void thread_worker_pair_free(struct thread_worker_pair *pair)
Destructor for thread_worker_pair.
Definition: threadpool.c:203
static struct worker_thread * worker_thread_alloc(struct ast_threadpool *pool)
Allocate and initialize a new worker thread.
Definition: threadpool.c:1093
static struct thread_worker_pair * thread_worker_pair_alloc(struct ast_threadpool *pool, struct worker_thread *worker)
Allocate and initialize a thread_worker_pair.
Definition: threadpool.c:214
static struct serializer * serializer_create(struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Definition: threadpool.c:1328
static void worker_active(struct worker_thread *worker)
Active loop for worker threads.
Definition: threadpool.c:1124
void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
Set the number of threads for the thread pool.
Definition: threadpool.c:875
static void serializer_shutdown_group_dec(struct ast_serializer_shutdown_group *shutdown_group)
Definition: threadpool.c:1301
static int queued_task_pushed(void *data)
Queued task called when tasks are pushed into the threadpool.
Definition: threadpool.c:565
static int activate_thread(void *obj, void *arg, int flags)
Activate idle threads.
Definition: threadpool.c:492
static void threadpool_active_thread_idle(struct ast_threadpool *pool, struct worker_thread *worker)
Queue a task to move a thread from the active list to the idle list.
Definition: threadpool.c:256
static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks
Definition: threadpool.c:1387
static int queued_emptied(void *data)
Queued task that handles the case where the threadpool's taskprocessor is emptied.
Definition: threadpool.c:639
static int worker_thread_hash(const void *obj, int flags)
Definition: threadpool.c:987
static int worker_thread_start(struct worker_thread *worker)
Definition: threadpool.c:1109
static void serializer_shutdown_group_dtor(void *vdoomed)
Definition: threadpool.c:1222
static void serializer_dtor(void *obj)
Definition: threadpool.c:1318
static void threadpool_destructor(void *obj)
Destroy a threadpool's components.
Definition: threadpool.c:386
void ast_threadpool_shutdown(struct ast_threadpool *pool)
Shut down a threadpool and destroy it.
Definition: threadpool.c:966
static struct task_pushed_data * task_pushed_data_alloc(struct ast_threadpool *pool, int was_empty)
Allocate and initialize a task_pushed_data.
Definition: threadpool.c:466
static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
Taskprocessor listener shutdown callback.
Definition: threadpool.c:680
static void grow(struct ast_threadpool *pool, int delta)
Add threads to the threadpool.
Definition: threadpool.c:525
static void worker_shutdown(struct worker_thread *worker)
shut a worker thread down
Definition: threadpool.c:1010
static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks
Table of taskprocessor listener callbacks for threadpool's main taskprocessor.
Definition: threadpool.c:696
static int worker_set_state(struct worker_thread *worker, enum worker_state state)
Change a worker's state.
Definition: threadpool.c:1186
static int worker_idle(struct worker_thread *worker)
Idle function for worker threads.
Definition: threadpool.c:1153
struct ast_taskprocessor * ast_threadpool_serializer_get_current(void)
Get the threadpool serializer currently associated with this thread.
Definition: threadpool.c:1393
static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
Taskprocessor listener emptied callback.
Definition: threadpool.c:655
static void serializer_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
Definition: threadpool.c:1359
static int queued_set_size(void *data)
Change the size of the threadpool.
Definition: threadpool.c:839
long ast_threadpool_queue_size(struct ast_threadpool *pool)
Return the size of the threadpool's task queue.
Definition: threadpool.c:1433
static int threadpool_execute(struct ast_threadpool *pool)
Execute a task in the threadpool.
Definition: threadpool.c:362
worker_state
states for worker threads
Definition: threadpool.c:120
@ DEAD
Definition: threadpool.c:137
@ ALIVE
Definition: threadpool.c:122
@ ZOMBIE
Definition: threadpool.c:130
static void threadpool_send_state_changed(struct ast_threadpool *pool)
Notify the threadpool listener that the state has changed.
Definition: threadpool.c:180
AST_THREADSTORAGE_RAW(current_serializer)
static int worker_id_counter
Definition: threadpool.c:985
static void shrink(struct ast_threadpool *pool, int delta)
Remove threads from the threadpool.
Definition: threadpool.c:776
static int queued_active_thread_idle(void *data)
Move a worker thread from the active container to the idle container.
Definition: threadpool.c:235
static void serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group)
Definition: threadpool.c:1287
static int queued_zombie_thread_dead(void *data)
Kill a zombie thread.
Definition: threadpool.c:284
static void threadpool_zombie_thread_dead(struct ast_threadpool *pool, struct worker_thread *worker)
Queue a task to kill a zombie thread.
Definition: threadpool.c:301
int ast_threadpool_push(struct ast_threadpool *pool, int(*task)(void *data), void *data)
Push a task to the threadpool.
Definition: threadpool.c:957
static void serializer_shutdown(struct ast_taskprocessor_listener *listener)
Definition: threadpool.c:1377
static int queued_idle_thread_dead(void *data)
Definition: threadpool.c:321
void * ast_threadpool_listener_get_user_data(const struct ast_threadpool_listener *listener)
Get the threadpool listener's user data.
Definition: threadpool.c:906
static int threadpool_tps_start(struct ast_taskprocessor_listener *listener)
Definition: threadpool.c:444
struct ast_taskprocessor * ast_threadpool_serializer_group(const char *name, struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Serialized execution of tasks within a ast_threadpool.
Definition: threadpool.c:1398
static void threadpool_idle_thread_dead(struct ast_threadpool *pool, struct worker_thread *worker)
Definition: threadpool.c:332
int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout)
Wait for the serializers in the group to shutdown with timeout.
Definition: threadpool.c:1241
static int worker_thread_cmp(void *obj, void *arg, int flags)
Definition: threadpool.c:994
static struct ast_threadpool * threadpool_alloc(const char *name, const struct ast_threadpool_options *options)
Allocate a threadpool.
Definition: threadpool.c:404
static void worker_thread_destroy(void *obj)
Worker thread destructor.
Definition: threadpool.c:1026
#define THREAD_BUCKETS
Definition: threadpool.c:28
static int execute_tasks(void *data)
Definition: threadpool.c:1345
static struct set_size_data * set_size_data_alloc(struct ast_threadpool *pool, unsigned int size)
Allocate and initialize a set_size_data.
Definition: threadpool.c:815
struct ast_threadpool * ast_threadpool_create(const char *name, struct ast_threadpool_listener *listener, const struct ast_threadpool_options *options)
Create a new threadpool.
Definition: threadpool.c:916
static int serializer_start(struct ast_taskprocessor_listener *listener)
Definition: threadpool.c:1371
struct ast_threadpool_listener * ast_threadpool_listener_alloc(const struct ast_threadpool_listener_callbacks *callbacks, void *user_data)
Allocate a threadpool listener.
Definition: threadpool.c:894
static void * worker_start(void *arg)
start point for worker threads
Definition: threadpool.c:1044
static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
Taskprocessor listener callback called when a task is added.
Definition: threadpool.c:611
static int kill_threads(void *obj, void *arg, int flags)
ao2 callback to kill a set number of threads.
Definition: threadpool.c:715
struct ast_taskprocessor * ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
Serialized execution of tasks within a ast_threadpool.
Definition: threadpool.c:1428
struct ast_serializer_shutdown_group * ast_serializer_shutdown_group_alloc(void)
Create a serializer group shutdown control object.
Definition: threadpool.c:1229
static int zombify_threads(void *obj, void *arg, void *data, int flags)
ao2 callback to zombify a set number of threads.
Definition: threadpool.c:746
#define AST_THREADPOOL_OPTIONS_VERSION
Definition: threadpool.h:71
int ast_threadstorage_set_ptr(struct ast_threadstorage *ts, void *ptr)
Set a raw pointer from threadstorage.
void * ast_threadstorage_get_ptr(struct ast_threadstorage *ts)
Retrieve a raw pointer from threadstorage.
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:159
Utility functions.
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:941
#define ast_assert(a)
Definition: utils.h:739
#define MIN(a, b)
Definition: utils.h:231
#define ast_pthread_create(a, b, c, d)
Definition: utils.h:584