Asterisk - The Open Source Telephony Project  GIT-master-a24979a
threadpool.c
Go to the documentation of this file.
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012-2013, Digium, Inc.
5  *
6  * Mark Michelson <mmmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 
19 
20 #include "asterisk.h"
21 
22 #include "asterisk/threadpool.h"
23 #include "asterisk/taskprocessor.h"
24 #include "asterisk/astobj2.h"
25 #include "asterisk/utils.h"
26 
27 /* Needs to stay prime if increased */
28 #define THREAD_BUCKETS 89
29 
30 /*!
31  * \brief An opaque threadpool structure
32  *
33  * A threadpool is a collection of threads that execute
34  * tasks from a common queue.
35  */
37  /*! Threadpool listener */
39  /*!
40  * \brief The container of active threads.
41  * Active threads are those that are currently running tasks
42  */
44  /*!
45  * \brief The container of idle threads.
46  * Idle threads are those that are currently waiting to run tasks
47  */
49  /*!
50  * \brief The container of zombie threads.
51  * Zombie threads may be running tasks, but they are scheduled to die soon
52  */
54  /*!
55  * \brief The main taskprocessor
56  *
57  * Tasks that are queued in this taskprocessor are
58  * doled out to the worker threads. Worker threads that
59  * execute tasks from the threadpool are executing tasks
60  * in this taskprocessor.
61  *
62  * The threadpool itself is actually the private data for
63  * this taskprocessor's listener. This way, as taskprocessor
64  * changes occur, the threadpool can alert its listeners
65  * appropriately.
66  */
68  /*!
69  * \brief The control taskprocessor
70  *
71  * This is a standard taskprocessor that uses the default
72  * taskprocessor listener. In other words, all tasks queued to
73  * this taskprocessor have a single thread that executes the
74  * tasks.
75  *
76  * All tasks that modify the state of the threadpool and all tasks
77  * that call out to threadpool listeners are pushed to this
78  * taskprocessor.
79  *
80  * For instance, when the threadpool changes sizes, a task is put
81  * into this taskprocessor to do so. When it comes time to tell the
82  * threadpool listener that worker threads have changed state,
83  * the task is placed in this taskprocessor.
84  *
85  * This is done for three main reasons
86  * 1) It ensures that listeners are given an accurate portrayal
87  * of the threadpool's current state. In other words, when a listener
88  * gets told a count of active, idle and zombie threads, it does not
89  * need to worry that internal state of the threadpool might be different
90  * from what it has been told.
91  * 2) It minimizes the locking required in both the threadpool and in
92  * threadpool listener's callbacks.
93  * 3) It ensures that listener callbacks are called in the same order
94  * that the threadpool had its state change.
95  */
97  /*! True if the threadpool is in the process of shutting down */
99  /*! Threadpool-specific options */
101 };
102 
103 /*!
104  * \brief listener for a threadpool
105  *
106  * The listener is notified of changes in a threadpool. It can
107  * react by doing things like increasing the number of threads
108  * in the pool
109  */
111  /*! Callbacks called by the threadpool */
113  /*! User data for the listener */
114  void *user_data;
115 };
116 
117 /*!
118  * \brief states for worker threads
119  */
121  /*! The worker is either active or idle */
123  /*!
124  * The worker has been asked to shut down but
125  * may still be in the process of executing tasks.
126  * This transition happens when the threadpool needs
127  * to shrink and needs to kill active threads in order
128  * to do so.
129  */
131  /*!
132  * The worker has been asked to shut down. Typically
133  * only idle threads go to this state directly, but
134  * active threads may go straight to this state when
135  * the threadpool is shut down.
136  */
138 };
139 
140 /*!
141  * A thread that executes threadpool tasks
142  */
144  /*! A unique (within a run of Asterisk) ID for the thread. Used for hashing and searching */
145  int id;
146  /*! Condition used in conjunction with state changes */
148  /*! Lock used alongside the condition for state changes */
150  /*! The actual thread that is executing tasks */
151  pthread_t thread;
152  /*! A pointer to the threadpool. Needed to be able to execute tasks */
154  /*! The current state of the worker thread */
155  enum worker_state state;
156  /*! A boolean used to determine if an idle thread should become active */
157  int wake_up;
158  /*! Options for this threadpool */
160 };
161 
162 /* Worker thread forward declarations. See definitions for documentation */
163 static int worker_thread_hash(const void *obj, int flags);
164 static int worker_thread_cmp(void *obj, void *arg, int flags);
165 static void worker_thread_destroy(void *obj);
166 static void worker_active(struct worker_thread *worker);
167 static void *worker_start(void *arg);
168 static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
169 static int worker_thread_start(struct worker_thread *worker);
170 static int worker_idle(struct worker_thread *worker);
171 static int worker_set_state(struct worker_thread *worker, enum worker_state state);
172 static void worker_shutdown(struct worker_thread *worker);
173 
174 /*!
175  * \brief Notify the threadpool listener that the state has changed.
176  *
177  * This notifies the threadpool listener via its state_changed callback.
178  * \param pool The threadpool whose state has changed
179  */
181 {
182  int active_size = ao2_container_count(pool->active_threads);
183  int idle_size = ao2_container_count(pool->idle_threads);
184 
186  pool->listener->callbacks->state_changed(pool, pool->listener, active_size, idle_size);
187  }
188 }
189 
190 /*!
191  * \brief Struct used for queued operations involving worker state changes
192  */
194  /*! Threadpool that contains the worker whose state has changed */
196  /*! Worker whose state has changed */
198 };
199 
200 /*!
201  * \brief Destructor for thread_worker_pair
202  */
204 {
205  ao2_ref(pair->worker, -1);
206  ast_free(pair);
207 }
208 
209 /*!
210  * \brief Allocate and initialize a thread_worker_pair
211  * \param pool Threadpool to assign to the thread_worker_pair
212  * \param worker Worker thread to assign to the thread_worker_pair
213  */
215  struct worker_thread *worker)
216 {
217  struct thread_worker_pair *pair = ast_malloc(sizeof(*pair));
218  if (!pair) {
219  return NULL;
220  }
221  pair->pool = pool;
222  ao2_ref(worker, +1);
223  pair->worker = worker;
224 
225  return pair;
226 }
227 
228 /*!
229  * \brief Move a worker thread from the active container to the idle container.
230  *
231  * This function is called from the threadpool's control taskprocessor thread.
232  * \param data A thread_worker_pair containing the threadpool and the worker to move.
233  * \return 0
234  */
235 static int queued_active_thread_idle(void *data)
236 {
237  struct thread_worker_pair *pair = data;
238 
239  ao2_link(pair->pool->idle_threads, pair->worker);
240  ao2_unlink(pair->pool->active_threads, pair->worker);
241 
243 
245  return 0;
246 }
247 
248 /*!
249  * \brief Queue a task to move a thread from the active list to the idle list
250  *
251  * This is called by a worker thread when it runs out of tasks to perform and
252  * goes idle.
253  * \param pool The threadpool to which the worker belongs
254  * \param worker The worker thread that has gone idle
255  */
257  struct worker_thread *worker)
258 {
259  struct thread_worker_pair *pair;
261 
262  if (pool->shutting_down) {
263  return;
264  }
265 
267  if (!pair) {
268  return;
269  }
270 
273  }
274 }
275 
276 /*!
277  * \brief Kill a zombie thread
278  *
279  * This runs from the threadpool's control taskprocessor thread.
280  *
281  * \param data A thread_worker_pair containing the threadpool and the zombie thread
282  * \return 0
283  */
284 static int queued_zombie_thread_dead(void *data)
285 {
286  struct thread_worker_pair *pair = data;
287 
288  ao2_unlink(pair->pool->zombie_threads, pair->worker);
290 
292  return 0;
293 }
294 
295 /*!
296  * \brief Queue a task to kill a zombie thread
297  *
298  * This is called by a worker thread when it acknowledges that it is time for
299  * it to die.
300  */
302  struct worker_thread *worker)
303 {
304  struct thread_worker_pair *pair;
306 
307  if (pool->shutting_down) {
308  return;
309  }
310 
312  if (!pair) {
313  return;
314  }
315 
318  }
319 }
320 
321 static int queued_idle_thread_dead(void *data)
322 {
323  struct thread_worker_pair *pair = data;
324 
325  ao2_unlink(pair->pool->idle_threads, pair->worker);
327 
329  return 0;
330 }
331 
333  struct worker_thread *worker)
334 {
335  struct thread_worker_pair *pair;
337 
338  if (pool->shutting_down) {
339  return;
340  }
341 
343  if (!pair) {
344  return;
345  }
346 
349  }
350 }
351 
352 /*!
353  * \brief Execute a task in the threadpool
354  *
355  * This is the function that worker threads call in order to execute tasks
356  * in the threadpool
357  *
358  * \param pool The pool to which the tasks belong.
359  * \retval 0 Either the pool has been shut down or there are no tasks.
360  * \retval 1 There are still tasks remaining in the pool.
361  */
363 {
364  ao2_lock(pool);
365  if (!pool->shutting_down) {
366  ao2_unlock(pool);
368  }
369  ao2_unlock(pool);
370  return 0;
371 }
372 
373 /*!
374  * \brief Destroy a threadpool's components.
375  *
376  * This is the destructor called automatically when the threadpool's
377  * reference count reaches zero. This is not to be confused with
378  * threadpool_destroy.
379  *
380  * By the time this actually gets called, most of the cleanup has already
381  * been done in the pool. The only thing left to do is to release the
382  * final reference to the threadpool listener.
383  *
384  * \param obj The pool to destroy
385  */
386 static void threadpool_destructor(void *obj)
387 {
388  struct ast_threadpool *pool = obj;
389  ao2_cleanup(pool->listener);
390 }
391 
392 /*!
393  * \brief Allocate a threadpool
394  *
395  * This is implemented as a taskprocessor listener's alloc callback. This
396  * is because the threadpool exists as the private data on a taskprocessor
397  * listener.
398  *
399  * \param name The name of the threadpool.
400  * \param options The options the threadpool uses.
401  * \retval NULL Could not initialize threadpool properly
402  * \retval non-NULL The newly-allocated threadpool
403  */
404 static struct ast_threadpool *threadpool_alloc(const char *name, const struct ast_threadpool_options *options)
405 {
406  RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
407  struct ast_str *control_tps_name;
408 
409  pool = ao2_alloc(sizeof(*pool), threadpool_destructor);
410  control_tps_name = ast_str_create(64);
411  if (!pool || !control_tps_name) {
412  ast_free(control_tps_name);
413  return NULL;
414  }
415 
416  ast_str_set(&control_tps_name, 0, "%s/pool-control", name);
417 
418  pool->control_tps = ast_taskprocessor_get(ast_str_buffer(control_tps_name), TPS_REF_DEFAULT);
419  ast_free(control_tps_name);
420  if (!pool->control_tps) {
421  return NULL;
422  }
423  pool->active_threads = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
425  if (!pool->active_threads) {
426  return NULL;
427  }
428  pool->idle_threads = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
430  if (!pool->idle_threads) {
431  return NULL;
432  }
433  pool->zombie_threads = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
435  if (!pool->zombie_threads) {
436  return NULL;
437  }
438  pool->options = *options;
439 
440  ao2_ref(pool, +1);
441  return pool;
442 }
443 
445 {
446  return 0;
447 }
448 
449 /*!
450  * \brief helper used for queued task when tasks are pushed
451  */
453  /*! Pool into which a task was pushed */
455  /*! Indicator of whether the pool had no tasks prior to the new task being added */
457 };
458 
459 /*!
460  * \brief Allocate and initialize a task_pushed_data
461  * \param pool The threadpool to set in the task_pushed_data
462  * \param was_empty The was_empty value to set in the task_pushed_data
463  * \retval NULL Unable to allocate task_pushed_data
464  * \retval non-NULL The newly-allocated task_pushed_data
465  */
467  int was_empty)
468 {
469  struct task_pushed_data *tpd = ast_malloc(sizeof(*tpd));
470 
471  if (!tpd) {
472  return NULL;
473  }
474  tpd->pool = pool;
475  tpd->was_empty = was_empty;
476  return tpd;
477 }
478 
479 /*!
480  * \brief Activate idle threads
481  *
482  * This function always returns CMP_MATCH because all workers that this
483  * function acts on need to be seen as matches so they are unlinked from the
484  * list of idle threads.
485  *
486  * Called as an ao2_callback in the threadpool's control taskprocessor thread.
487  * \param obj The worker to activate
488  * \param arg The pool where the worker belongs
489  * \param flags
490  * \retval CMP_MATCH
491  */
492 static int activate_thread(void *obj, void *arg, int flags)
493 {
494  struct worker_thread *worker = obj;
495  struct ast_threadpool *pool = arg;
496 
497  if (!ao2_link(pool->active_threads, worker)) {
498  /* If we can't link the idle thread into the active container, then
499  * we'll just leave the thread idle and not wake it up.
500  */
501  ast_log(LOG_WARNING, "Failed to activate thread %d. Remaining idle\n",
502  worker->id);
503  return 0;
504  }
505 
506  if (worker_set_state(worker, ALIVE)) {
507  ast_debug(1, "Failed to activate thread %d. It is dead\n",
508  worker->id);
509  /* The worker thread will no longer exist in the active threads or
510  * idle threads container after this.
511  */
512  ao2_unlink(pool->active_threads, worker);
513  }
514 
515  return CMP_MATCH;
516 }
517 
518 /*!
519  * \brief Add threads to the threadpool
520  *
521  * This function is called from the threadpool's control taskprocessor thread.
522  * \param pool The pool that is expanding
523  * \param delta The number of threads to add to the pool
524  */
525 static void grow(struct ast_threadpool *pool, int delta)
526 {
527  int i;
528 
529  int current_size = ao2_container_count(pool->active_threads) +
531 
532  if (pool->options.max_size && current_size + delta > pool->options.max_size) {
533  delta = pool->options.max_size - current_size;
534  }
535 
536  ast_debug(3, "Increasing threadpool %s's size by %d\n",
537  ast_taskprocessor_name(pool->tps), delta);
538 
539  for (i = 0; i < delta; ++i) {
540  struct worker_thread *worker = worker_thread_alloc(pool);
541  if (!worker) {
542  return;
543  }
544  if (ao2_link(pool->idle_threads, worker)) {
545  if (worker_thread_start(worker)) {
546  ast_log(LOG_ERROR, "Unable to start worker thread %d. Destroying.\n", worker->id);
547  ao2_unlink(pool->active_threads, worker);
548  }
549  } else {
550  ast_log(LOG_WARNING, "Failed to activate worker thread %d. Destroying.\n", worker->id);
551  }
552  ao2_ref(worker, -1);
553  }
554 }
555 
556 /*!
557  * \brief Queued task called when tasks are pushed into the threadpool
558  *
559  * This function first calls into the threadpool's listener to let it know
560  * that a task has been pushed. It then wakes up all idle threads and moves
561  * them into the active thread container.
562  * \param data A task_pushed_data
563  * \return 0
564  */
565 static int queued_task_pushed(void *data)
566 {
567  struct task_pushed_data *tpd = data;
568  struct ast_threadpool *pool = tpd->pool;
569  int was_empty = tpd->was_empty;
570  unsigned int existing_active;
571 
572  ast_free(tpd);
573 
574  if (pool->listener && pool->listener->callbacks->task_pushed) {
575  pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
576  }
577 
578  existing_active = ao2_container_count(pool->active_threads);
579 
580  /* The first pass transitions any existing idle threads to be active, and
581  * will also remove any worker threads that have recently entered the dead
582  * state.
583  */
585  activate_thread, pool);
586 
587  /* If no idle threads could be transitioned to active grow the pool as permitted. */
588  if (ao2_container_count(pool->active_threads) == existing_active) {
589  if (!pool->options.auto_increment) {
590  return 0;
591  }
592  grow(pool, pool->options.auto_increment);
593  /* An optional second pass transitions any newly added threads. */
595  activate_thread, pool);
596  }
597 
599  return 0;
600 }
601 
602 /*!
603  * \brief Taskprocessor listener callback called when a task is added
604  *
605  * The threadpool uses this opportunity to queue a task on its control taskprocessor
606  * in order to activate idle threads and notify the threadpool listener that the
607  * task has been pushed.
608  * \param listener The taskprocessor listener. The threadpool is the listener's private data
609  * \param was_empty True if the taskprocessor was empty prior to the task being pushed
610  */
612  int was_empty)
613 {
615  struct task_pushed_data *tpd;
617 
618  if (pool->shutting_down) {
619  return;
620  }
621 
623  if (!tpd) {
624  return;
625  }
626 
628  ast_free(tpd);
629  }
630 }
631 
632 /*!
633  * \brief Queued task that handles the case where the threadpool's taskprocessor is emptied
634  *
635  * This simply lets the threadpool's listener know that the threadpool is devoid of tasks
636  * \param data The pool that has become empty
637  * \return 0
638  */
639 static int queued_emptied(void *data)
640 {
641  struct ast_threadpool *pool = data;
642 
643  /* We already checked for existence of this callback when this was queued */
644  pool->listener->callbacks->emptied(pool, pool->listener);
645  return 0;
646 }
647 
648 /*!
649  * \brief Taskprocessor listener emptied callback
650  *
651  * The threadpool queues a task to let the threadpool listener know that
652  * the threadpool no longer contains any tasks.
653  * \param listener The taskprocessor listener. The threadpool is the listener's private data.
654  */
656 {
658  SCOPED_AO2LOCK(lock, pool);
659 
660  if (pool->shutting_down) {
661  return;
662  }
663 
664  if (pool->listener && pool->listener->callbacks->emptied) {
666  /* Nothing to do here but we need the check to keep the compiler happy. */
667  }
668  }
669 }
670 
671 /*!
672  * \brief Taskprocessor listener shutdown callback
673  *
674  * The threadpool will shut down and destroy all of its worker threads when
675  * this is called back. By the time this gets called, the taskprocessor's
676  * control taskprocessor has already been destroyed. Therefore there is no risk
677  * in outright destroying the worker threads here.
678  * \param listener The taskprocessor listener. The threadpool is the listener's private data.
679  */
681 {
683 
684  if (pool->listener && pool->listener->callbacks->shutdown) {
685  pool->listener->callbacks->shutdown(pool->listener);
686  }
688  ao2_cleanup(pool->idle_threads);
690  ao2_cleanup(pool);
691 }
692 
693 /*!
694  * \brief Table of taskprocessor listener callbacks for threadpool's main taskprocessor
695  */
698  .task_pushed = threadpool_tps_task_pushed,
699  .emptied = threadpool_tps_emptied,
700  .shutdown = threadpool_tps_shutdown,
701 };
702 
703 /*!
704  * \brief ao2 callback to kill a set number of threads.
705  *
706  * Threads will be unlinked from the container as long as the
707  * counter has not reached zero. The counter is decremented with
708  * each thread that is removed.
709  * \param obj The worker thread up for possible destruction
710  * \param arg The counter
711  * \param flags Unused
712  * \retval CMP_MATCH The counter has not reached zero, so this flag should be removed.
713  * \retval CMP_STOP The counter has reached zero so no more threads should be removed.
714  */
715 static int kill_threads(void *obj, void *arg, int flags)
716 {
717  int *num_to_kill = arg;
718 
719  if (*num_to_kill > 0) {
720  --(*num_to_kill);
721  return CMP_MATCH;
722  } else {
723  return CMP_STOP;
724  }
725 }
726 
727 /*!
728  * \brief ao2 callback to zombify a set number of threads.
729  *
730  * Threads will be zombified as long as the counter has not reached
731  * zero. The counter is decremented with each thread that is zombified.
732  *
733  * Zombifying a thread involves removing it from its current container,
734  * adding it to the zombie container, and changing the state of the
735  * worker to a zombie
736  *
737  * This callback is called from the threadpool control taskprocessor thread.
738  *
739  * \param obj The worker thread that may be zombified
740  * \param arg The pool to which the worker belongs
741  * \param data The counter
742  * \param flags Unused
743  * \retval CMP_MATCH The zombified thread should be removed from its current container
744  * \retval CMP_STOP Stop attempting to zombify threads
745  */
746 static int zombify_threads(void *obj, void *arg, void *data, int flags)
747 {
748  struct worker_thread *worker = obj;
749  struct ast_threadpool *pool = arg;
750  int *num_to_zombify = data;
751 
752  if ((*num_to_zombify)-- > 0) {
753  if (!ao2_link(pool->zombie_threads, worker)) {
754  ast_log(LOG_WARNING, "Failed to zombify active thread %d. Thread will remain active\n", worker->id);
755  return 0;
756  }
757  worker_set_state(worker, ZOMBIE);
758  return CMP_MATCH;
759  } else {
760  return CMP_STOP;
761  }
762 }
763 
764 /*!
765  * \brief Remove threads from the threadpool
766  *
767  * The preference is to kill idle threads. However, if there are
768  * more threads to remove than there are idle threads, then active
769  * threads will be zombified instead.
770  *
771  * This function is called from the threadpool control taskprocessor thread.
772  *
773  * \param pool The threadpool to remove threads from
774  * \param delta The number of threads to remove
775  */
776 static void shrink(struct ast_threadpool *pool, int delta)
777 {
778  /*
779  * Preference is to kill idle threads, but
780  * we'll move on to deactivating active threads
781  * if we have to
782  */
784  int idle_threads_to_kill = MIN(delta, idle_threads);
785  int active_threads_to_zombify = delta - idle_threads_to_kill;
786 
787  ast_debug(3, "Destroying %d idle threads in threadpool %s\n", idle_threads_to_kill,
788  ast_taskprocessor_name(pool->tps));
789 
791  kill_threads, &idle_threads_to_kill);
792 
793  ast_debug(3, "Destroying %d active threads in threadpool %s\n", active_threads_to_zombify,
794  ast_taskprocessor_name(pool->tps));
795 
797  zombify_threads, pool, &active_threads_to_zombify);
798 }
799 
800 /*!
801  * \brief Helper struct used for queued operations that change the size of the threadpool
802  */
804  /*! The pool whose size is to change */
806  /*! The requested new size of the pool */
807  unsigned int size;
808 };
809 
810 /*!
811  * \brief Allocate and initialize a set_size_data
812  * \param pool The pool for the set_size_data
813  * \param size The size to store in the set_size_data
814  */
816  unsigned int size)
817 {
818  struct set_size_data *ssd = ast_malloc(sizeof(*ssd));
819  if (!ssd) {
820  return NULL;
821  }
822 
823  ssd->pool = pool;
824  ssd->size = size;
825  return ssd;
826 }
827 
828 /*!
829  * \brief Change the size of the threadpool
830  *
831  * This can either result in shrinking or growing the threadpool depending
832  * on the new desired size and the current size.
833  *
834  * This function is run from the threadpool control taskprocessor thread
835  *
836  * \param data A set_size_data used for determining how to act
837  * \return 0
838  */
839 static int queued_set_size(void *data)
840 {
841  struct set_size_data *ssd = data;
842  struct ast_threadpool *pool = ssd->pool;
843  unsigned int num_threads = ssd->size;
844 
845  /* We don't count zombie threads as being "live" when potentially resizing */
846  unsigned int current_size = ao2_container_count(pool->active_threads) +
848 
849  ast_free(ssd);
850 
851  if (current_size == num_threads) {
852  ast_debug(3, "Not changing threadpool size since new size %u is the same as current %u\n",
853  num_threads, current_size);
854  return 0;
855  }
856 
857  if (current_size < num_threads) {
859  activate_thread, pool);
860 
861  /* As the above may have altered the number of current threads update it */
862  current_size = ao2_container_count(pool->active_threads) +
864  grow(pool, num_threads - current_size);
866  activate_thread, pool);
867  } else {
868  shrink(pool, current_size - num_threads);
869  }
870 
872  return 0;
873 }
874 
875 void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
876 {
877  struct set_size_data *ssd;
879 
880  if (pool->shutting_down) {
881  return;
882  }
883 
884  ssd = set_size_data_alloc(pool, size);
885  if (!ssd) {
886  return;
887  }
888 
890  ast_free(ssd);
891  }
892 }
893 
896 {
898  if (!listener) {
899  return NULL;
900  }
901  listener->callbacks = callbacks;
902  listener->user_data = user_data;
903  return listener;
904 }
905 
907 {
908  return listener->user_data;
909 }
910 
914 };
915 
918  const struct ast_threadpool_options *options)
919 {
920  struct ast_taskprocessor *tps;
921  RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, NULL, ao2_cleanup);
922  RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
923  char *fullname;
924 
925  pool = threadpool_alloc(name, options);
926  if (!pool) {
927  return NULL;
928  }
929 
931  if (!tps_listener) {
932  return NULL;
933  }
934 
935  if (options->version != AST_THREADPOOL_OPTIONS_VERSION) {
936  ast_log(LOG_WARNING, "Incompatible version of threadpool options in use.\n");
937  return NULL;
938  }
939 
940  fullname = ast_alloca(strlen(name) + strlen("/pool") + 1);
941  sprintf(fullname, "%s/pool", name); /* Safe */
942  tps = ast_taskprocessor_create_with_listener(fullname, tps_listener);
943  if (!tps) {
944  return NULL;
945  }
946 
947  pool->tps = tps;
948  if (listener) {
949  ao2_ref(listener, +1);
950  pool->listener = listener;
951  }
952  ast_threadpool_set_size(pool, pool->options.initial_size);
953  ao2_ref(pool, +1);
954  return pool;
955 }
956 
957 int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
958 {
959  SCOPED_AO2LOCK(lock, pool);
960  if (!pool->shutting_down) {
961  return ast_taskprocessor_push(pool->tps, task, data);
962  }
963  return -1;
964 }
965 
967 {
968  if (!pool) {
969  return;
970  }
971  /* Shut down the taskprocessors and everything else just
972  * takes care of itself via the taskprocessor callbacks
973  */
974  ao2_lock(pool);
975  pool->shutting_down = 1;
976  ao2_unlock(pool);
979 }
980 
981 /*!
982  * A monotonically increasing integer used for worker
983  * thread identification.
984  */
985 static int worker_id_counter;
986 
987 static int worker_thread_hash(const void *obj, int flags)
988 {
989  const struct worker_thread *worker = obj;
990 
991  return worker->id;
992 }
993 
994 static int worker_thread_cmp(void *obj, void *arg, int flags)
995 {
996  struct worker_thread *worker1 = obj;
997  struct worker_thread *worker2 = arg;
998 
999  return worker1->id == worker2->id ? CMP_MATCH : 0;
1000 }
1001 
1002 /*!
1003  * \brief shut a worker thread down
1004  *
1005  * Set the worker dead and then wait for its thread
1006  * to finish executing.
1007  *
1008  * \param worker The worker thread to shut down
1009  */
1010 static void worker_shutdown(struct worker_thread *worker)
1011 {
1012  worker_set_state(worker, DEAD);
1013  if (worker->thread != AST_PTHREADT_NULL) {
1014  pthread_join(worker->thread, NULL);
1015  worker->thread = AST_PTHREADT_NULL;
1016  }
1017 }
1018 
1019 /*!
1020  * \brief Worker thread destructor
1021  *
1022  * Called automatically when refcount reaches 0. Shuts
1023  * down the worker thread and destroys its component
1024  * parts
1025  */
1026 static void worker_thread_destroy(void *obj)
1027 {
1028  struct worker_thread *worker = obj;
1029  ast_debug(3, "Destroying worker thread %d\n", worker->id);
1030  worker_shutdown(worker);
1031  ast_mutex_destroy(&worker->lock);
1032  ast_cond_destroy(&worker->cond);
1033 }
1034 
1035 /*!
1036  * \brief start point for worker threads
1037  *
1038  * Worker threads start in the active state but may
1039  * immediately go idle if there is no work to be
1040  * done
1041  *
1042  * \param arg The worker thread
1043  */
1044 static void *worker_start(void *arg)
1045 {
1046  struct worker_thread *worker = arg;
1047  enum worker_state saved_state;
1048 
1049  if (worker->options.thread_start) {
1050  worker->options.thread_start();
1051  }
1052 
1053  ast_mutex_lock(&worker->lock);
1054  while (worker_idle(worker)) {
1055  ast_mutex_unlock(&worker->lock);
1056  worker_active(worker);
1057  ast_mutex_lock(&worker->lock);
1058  if (worker->state != ALIVE) {
1059  break;
1060  }
1061  threadpool_active_thread_idle(worker->pool, worker);
1062  }
1063  saved_state = worker->state;
1064  ast_mutex_unlock(&worker->lock);
1065 
1066  /* Reaching this portion means the thread is
1067  * on death's door. It may have been killed while
1068  * it was idle, in which case it can just die
1069  * peacefully. If it's a zombie, though, then
1070  * it needs to let the pool know so
1071  * that the thread can be removed from the
1072  * list of zombie threads.
1073  */
1074  if (saved_state == ZOMBIE) {
1075  threadpool_zombie_thread_dead(worker->pool, worker);
1076  }
1077 
1078  if (worker->options.thread_end) {
1079  worker->options.thread_end();
1080  }
1081  return NULL;
1082 }
1083 
1084 /*!
1085  * \brief Allocate and initialize a new worker thread
1086  *
1087  * This allocates and initializes the worker. The thread itself is
1087  * started separately by worker_thread_start().
1088  *
1089  * \param pool The threadpool to which the worker will be added
1090  * \retval NULL Failed to allocate the worker thread
1091  * \retval non-NULL The newly-created worker thread
1092  */
1094 {
1095  struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy);
1096  if (!worker) {
1097  return NULL;
1098  }
1100  ast_mutex_init(&worker->lock);
1101  ast_cond_init(&worker->cond, NULL);
1102  worker->pool = pool;
1103  worker->thread = AST_PTHREADT_NULL;
1104  worker->state = ALIVE;
1105  worker->options = pool->options;
1106  return worker;
1107 }
1108 
1109 static int worker_thread_start(struct worker_thread *worker)
1110 {
1111  return ast_pthread_create(&worker->thread, NULL, worker_start, worker);
1112 }
1113 
1114 /*!
1115  * \brief Active loop for worker threads
1116  *
1117  * The worker will stay in this loop for its lifetime,
1118  * executing tasks as they become available. If there
1119  * are no tasks currently available, then the thread
1120  * will go idle.
1121  *
1122  * \param worker The worker thread executing tasks.
1123  */
1124 static void worker_active(struct worker_thread *worker)
1125 {
1126  int alive;
1127 
1128  /* The following is equivalent to
1129  *
1130  * while (threadpool_execute(worker->pool));
1131  *
1132  * However, reviewers have suggested in the past
1133  * doing that can cause optimizers to (wrongly)
1134  * optimize the code away.
1135  */
1136  do {
1137  alive = threadpool_execute(worker->pool);
1138  } while (alive);
1139 }
1140 
1141 /*!
1142  * \brief Idle function for worker threads
1143  *
1144  * The worker waits here until it gets told by the threadpool
1145  * to wake up.
1146  *
1147  * worker is locked before entering this function.
1148  *
1149  * \param worker The idle worker
1150  * \retval 0 The thread is being woken up so that it can conclude.
1151  * \retval non-zero The thread is being woken up to do more work.
1152  */
1153 static int worker_idle(struct worker_thread *worker)
1154 {
1155  struct timeval start = ast_tvnow();
1156  struct timespec end = {
1157  .tv_sec = start.tv_sec + worker->options.idle_timeout,
1158  .tv_nsec = start.tv_usec * 1000,
1159  };
1160  while (!worker->wake_up) {
1161  if (worker->options.idle_timeout <= 0) {
1162  ast_cond_wait(&worker->cond, &worker->lock);
1163  } else if (ast_cond_timedwait(&worker->cond, &worker->lock, &end) == ETIMEDOUT) {
1164  break;
1165  }
1166  }
1167 
1168  if (!worker->wake_up) {
1169  ast_debug(1, "Worker thread idle timeout reached. Dying.\n");
1170  threadpool_idle_thread_dead(worker->pool, worker);
1171  worker->state = DEAD;
1172  }
1173  worker->wake_up = 0;
1174  return worker->state == ALIVE;
1175 }
1176 
1177 /*!
1178  * \brief Change a worker's state
1179  *
1180  * The threadpool calls into this function in order to let a worker know
1181  * how it should proceed.
1182  *
1183  * \retval -1 failure (state transition not permitted)
1184  * \retval 0 success
1185  */
1186 static int worker_set_state(struct worker_thread *worker, enum worker_state state)
1187 {
1188  SCOPED_MUTEX(lock, &worker->lock);
1189 
1190  switch (state) {
1191  case ALIVE:
1192  /* This can occur due to a race condition between being told to go active
1193  * and an idle timeout happening.
1194  */
1195  if (worker->state == DEAD) {
1196  return -1;
1197  }
1198  ast_assert(worker->state != ZOMBIE);
1199  break;
1200  case DEAD:
1201  break;
1202  case ZOMBIE:
1203  ast_assert(worker->state != DEAD);
1204  break;
1205  }
1206 
1207  worker->state = state;
1208  worker->wake_up = 1;
1209  ast_cond_signal(&worker->cond);
1210 
1211  return 0;
1212 }
1213 
1214 /*! Serializer group shutdown control object. */
1216  /*! Shutdown thread waits on this conditional. */
1218  /*! Count of serializers needing to shutdown. */
1219  int count;
1220 };
1221 
1222 static void serializer_shutdown_group_dtor(void *vdoomed)
1223 {
1224  struct ast_serializer_shutdown_group *doomed = vdoomed;
1225 
1226  ast_cond_destroy(&doomed->cond);
1227 }
1228 
1230 {
1232 
1234  if (!shutdown_group) {
1235  return NULL;
1236  }
1238  return shutdown_group;
1239 }
1240 
1242 {
1243  int remaining;
1244  ast_mutex_t *lock;
1245 
1246  if (!shutdown_group) {
1247  return 0;
1248  }
1249 
1251  ast_assert(lock != NULL);
1252 
1254  if (timeout) {
1255  struct timeval start;
1256  struct timespec end;
1257 
1258  start = ast_tvnow();
1259  end.tv_sec = start.tv_sec + timeout;
1260  end.tv_nsec = start.tv_usec * 1000;
1261  while (shutdown_group->count) {
1263  /* Error or timed out waiting for the count to reach zero. */
1264  break;
1265  }
1266  }
1267  } else {
1268  while (shutdown_group->count) {
1270  /* Error */
1271  break;
1272  }
1273  }
1274  }
1275  remaining = shutdown_group->count;
1277  return remaining;
1278 }
1279 
1280 /*!
1281  * \internal
1282  * \brief Increment the number of serializer members in the group.
1283  * \since 13.5.0
1284  *
1285  * \param shutdown_group Group shutdown controller.
1286  */
1288 {
1290  ++shutdown_group->count;
1292 }
1293 
1294 /*!
1295  * \internal
1296  * \brief Decrement the number of serializer members in the group.
1297  * \since 13.5.0
1298  *
1299  * \param shutdown_group Group shutdown controller.
1300  */
1302 {
1304  --shutdown_group->count;
1305  if (!shutdown_group->count) {
1307  }
1309 }
1310 
1311 struct serializer {
1312  /*! Threadpool the serializer will use to process the jobs. */
1314  /*! Which group will wait for this serializer to shutdown. */
1316 };
1317 
1318 static void serializer_dtor(void *obj)
1319 {
1320  struct serializer *ser = obj;
1321 
1322  ao2_cleanup(ser->pool);
1323  ser->pool = NULL;
1325  ser->shutdown_group = NULL;
1326 }
1327 
1330 {
1331  struct serializer *ser;
1332 
1334  if (!ser) {
1335  return NULL;
1336  }
1337  ao2_ref(pool, +1);
1338  ser->pool = pool;
1340  return ser;
1341 }
1342 
1343 AST_THREADSTORAGE_RAW(current_serializer);
1344 
1345 static int execute_tasks(void *data)
1346 {
1347  struct ast_taskprocessor *tps = data;
1348 
1349  ast_threadstorage_set_ptr(&current_serializer, tps);
1350  while (ast_taskprocessor_execute(tps)) {
1351  /* No-op */
1352  }
1353  ast_threadstorage_set_ptr(&current_serializer, NULL);
1354 
1356  return 0;
1357 }
1358 
1360 {
1361  if (was_empty) {
1364 
1365  if (ast_threadpool_push(ser->pool, execute_tasks, tps)) {
1367  }
1368  }
1369 }
1370 
/*!
 * \internal
 * \brief Taskprocessor listener start callback for serializers.
 *
 * Nothing needs to be done when a serializer's taskprocessor starts.
 *
 * \param listener The taskprocessor listener (unused).
 * \retval 0 Always.
 */
static int serializer_start(struct ast_taskprocessor_listener *listener)
{
	/* No-op */
	return 0;
}
1376 
1378 {
1380 
1381  if (ser->shutdown_group) {
1383  }
1384  ao2_cleanup(ser);
1385 }
1386 
1389  .start = serializer_start,
1390  .shutdown = serializer_shutdown,
1391 };
1392 
1394 {
1395  return ast_threadstorage_get_ptr(&current_serializer);
1396 }
1397 
1400 {
1401  struct serializer *ser;
1403  struct ast_taskprocessor *tps;
1404 
1405  ser = serializer_create(pool, shutdown_group);
1406  if (!ser) {
1407  return NULL;
1408  }
1409 
1411  if (!listener) {
1412  ao2_ref(ser, -1);
1413  return NULL;
1414  }
1415 
1417  if (!tps) {
1418  /* ser ref transferred to listener but not cleaned without tps */
1419  ao2_ref(ser, -1);
1420  } else if (shutdown_group) {
1422  }
1423 
1424  ao2_ref(listener, -1);
1425  return tps;
1426 }
1427 
/*
 * Convenience wrapper around ast_threadpool_serializer_group() that
 * passes NULL for the shutdown group.
 */
struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
{
	return ast_threadpool_serializer_group(name, pool, NULL);
}
1432 
1434 {
1435  return ast_taskprocessor_size(pool->tps);
1436 }
ast_mutex_t lock
Definition: app_meetme.c:1093
static void * listener(void *unused)
Definition: asterisk.c:1512
Asterisk main include file. File version handling, generic pbx functions.
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
Definition: astmm.h:288
#define ast_free(a)
Definition: astmm.h:180
#define ast_malloc(len)
A wrapper for malloc()
Definition: astmm.h:191
#define ast_log
Definition: astobj2.c:42
#define ao2_link(container, obj)
Add an object to a container.
Definition: astobj2.h:1532
@ CMP_MATCH
Definition: astobj2.h:1027
@ CMP_STOP
Definition: astobj2.h:1028
@ AO2_ALLOC_OPT_LOCK_NOLOCK
Definition: astobj2.h:367
@ AO2_ALLOC_OPT_LOCK_MUTEX
Definition: astobj2.h:363
#define ao2_callback(c, flags, cb_fn, arg)
ao2_callback() is a generic function that applies cb_fn() to all objects in a container,...
Definition: astobj2.h:1693
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
Definition: astobj2.c:476
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
#define ao2_callback_data(container, flags, cb_fn, arg, data)
Definition: astobj2.h:1723
#define ao2_unlink(container, obj)
Remove an object from a container.
Definition: astobj2.h:1578
#define ao2_unlock(a)
Definition: astobj2.h:729
#define ao2_lock(a)
Definition: astobj2.h:717
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:404
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition: astobj2.h:480
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
Definition: astobj2.h:1063
@ OBJ_NODATA
Definition: astobj2.h:1044
@ OBJ_MULTIPLE
Definition: astobj2.h:1049
@ OBJ_UNLINK
Definition: astobj2.h:1039
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:409
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition: astobj2.h:1303
enum sip_cc_notify_state state
Definition: chan_sip.c:966
char * end
Definition: eagi_proxy.c:73
static const char name[]
Definition: format_mp3.c:68
#define ast_debug(level,...)
Log a DEBUG message.
#define LOG_ERROR
#define LOG_WARNING
#define ast_cond_destroy(cond)
Definition: lock.h:200
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:602
#define ast_cond_wait(cond, mutex)
Definition: lock.h:203
#define AST_PTHREADT_NULL
Definition: lock.h:66
#define ast_cond_init(cond, attr)
Definition: lock.h:199
#define ast_cond_timedwait(cond, mutex, time)
Definition: lock.h:204
#define ast_mutex_init(pmutex)
Definition: lock.h:184
#define ast_mutex_unlock(a)
Definition: lock.h:188
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition: lock.h:755
pthread_cond_t ast_cond_t
Definition: lock.h:176
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
Definition: lock.h:587
#define ast_mutex_destroy(a)
Definition: lock.h:186
#define ast_mutex_lock(a)
Definition: lock.h:187
#define ast_cond_signal(cond)
Definition: lock.h:201
static struct ast_serializer_shutdown_group * shutdown_group
Shutdown group for options serializers.
struct @498 callbacks
#define NULL
Definition: resample.c:96
char * ast_str_buffer(const struct ast_str *buf)
Returns the string buffer within the ast_str buf.
Definition: strings.h:739
#define ast_str_create(init_len)
Create a malloc'ed dynamic length string.
Definition: strings.h:640
int ast_str_set(struct ast_str **buf, ssize_t max_len, const char *fmt,...)
Set a dynamic string using variable arguments.
Definition: strings.h:1091
Generic container type.
Structure for mutex and tracking information.
Definition: lock.h:135
Support for dynamic strings.
Definition: strings.h:604
void(* task_pushed)(struct ast_taskprocessor_listener *listener, int was_empty)
Indicates a task was pushed to the processor.
Definition: taskprocessor.h:99
int(* start)(struct ast_taskprocessor_listener *listener)
The taskprocessor has started completely.
Definition: taskprocessor.h:92
A listener for taskprocessors.
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
void(* shutdown)(struct ast_threadpool_listener *listener)
The threadpool is shutting down.
Definition: threadpool.h:67
void(* emptied)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener)
Indicates the threadpool's taskprocessor has become empty.
Definition: threadpool.h:56
void(* task_pushed)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener, int was_empty)
Indicates that a task was pushed to the threadpool.
Definition: threadpool.h:47
void(* state_changed)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener, int active_threads, int idle_threads)
Indicates that the state of threads in the pool has changed.
Definition: threadpool.h:36
listener for a threadpool
Definition: threadpool.c:110
const struct ast_threadpool_listener_callbacks * callbacks
Definition: threadpool.c:112
void(* thread_start)(void)
Function to call when a thread starts.
Definition: threadpool.h:117
int idle_timeout
Time limit in seconds for idle threads.
Definition: threadpool.h:79
int max_size
Maximum number of threads a pool may have.
Definition: threadpool.h:110
void(* thread_end)(void)
Function to call when a thread ends.
Definition: threadpool.h:124
int auto_increment
Number of threads to increment pool by.
Definition: threadpool.h:90
An opaque threadpool structure.
Definition: threadpool.c:36
struct ao2_container * idle_threads
The container of idle threads. Idle threads are those that are currently waiting to run tasks.
Definition: threadpool.c:48
struct ast_taskprocessor * tps
The main taskprocessor.
Definition: threadpool.c:67
struct ast_taskprocessor * control_tps
The control taskprocessor.
Definition: threadpool.c:96
struct ast_threadpool_options options
Definition: threadpool.c:100
struct ao2_container * zombie_threads
The container of zombie threads. Zombie threads may be running tasks, but they are scheduled to die s...
Definition: threadpool.c:53
struct ast_threadpool_listener * listener
Definition: threadpool.c:38
struct ao2_container * active_threads
The container of active threads. Active threads are those that are currently running tasks.
Definition: threadpool.c:43
struct ast_threadpool_options options
Definition: threadpool.c:913
struct ast_threadpool * pool
Definition: threadpool.c:912
struct ast_serializer_shutdown_group * shutdown_group
Definition: threadpool.c:1315
struct ast_threadpool * pool
Definition: threadpool.c:1313
Helper struct used for queued operations that change the size of the threadpool.
Definition: threadpool.c:803
unsigned int size
Definition: threadpool.c:807
struct ast_threadpool * pool
Definition: threadpool.c:805
helper used for queued task when tasks are pushed
Definition: threadpool.c:452
struct ast_threadpool * pool
Definition: threadpool.c:454
Struct used for queued operations involving worker state changes.
Definition: threadpool.c:193
struct ast_threadpool * pool
Definition: threadpool.c:195
struct worker_thread * worker
Definition: threadpool.c:197
pthread_t thread
Definition: threadpool.c:151
ast_cond_t cond
Definition: threadpool.c:147
struct ast_threadpool_options options
Definition: threadpool.c:159
enum worker_state state
Definition: threadpool.c:155
struct ast_threadpool * pool
Definition: threadpool.c:153
ast_mutex_t lock
Definition: threadpool.c:149
An API for managing task processing threads that can be shared across modules.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
@ TPS_REF_DEFAULT
return a reference to a taskprocessor, create one if it does not exist
Definition: taskprocessor.h:76
void * ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
Get the user data from the listener.
struct ast_taskprocessor * ast_taskprocessor_listener_get_tps(const struct ast_taskprocessor_listener *listener)
Get a reference to the listener's taskprocessor.
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
Pop a task off the taskprocessor and execute it.
const char * ast_taskprocessor_name(struct ast_taskprocessor *tps)
Return the name of the taskprocessor singleton.
struct ast_taskprocessor * ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
Create a taskprocessor with a custom listener.
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
static struct test_options options
static int task(void *data)
Queued task for baseline test.
static void thread_worker_pair_free(struct thread_worker_pair *pair)
Destructor for thread_worker_pair.
Definition: threadpool.c:203
static struct worker_thread * worker_thread_alloc(struct ast_threadpool *pool)
Allocate and initialize a new worker thread.
Definition: threadpool.c:1093
static void worker_active(struct worker_thread *worker)
Active loop for worker threads.
Definition: threadpool.c:1124
void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
Set the number of threads for the thread pool.
Definition: threadpool.c:875
static void serializer_shutdown_group_dec(struct ast_serializer_shutdown_group *shutdown_group)
Definition: threadpool.c:1301
static int queued_task_pushed(void *data)
Queued task called when tasks are pushed into the threadpool.
Definition: threadpool.c:565
static int activate_thread(void *obj, void *arg, int flags)
Activate idle threads.
Definition: threadpool.c:492
static void threadpool_active_thread_idle(struct ast_threadpool *pool, struct worker_thread *worker)
Queue a task to move a thread from the active list to the idle list.
Definition: threadpool.c:256
static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks
Definition: threadpool.c:1387
static int queued_emptied(void *data)
Queued task that handles the case where the threadpool's taskprocessor is emptied.
Definition: threadpool.c:639
static int worker_thread_hash(const void *obj, int flags)
Definition: threadpool.c:987
static int worker_thread_start(struct worker_thread *worker)
Definition: threadpool.c:1109
struct ast_serializer_shutdown_group * ast_serializer_shutdown_group_alloc(void)
Create a serializer group shutdown control object.
Definition: threadpool.c:1229
static struct thread_worker_pair * thread_worker_pair_alloc(struct ast_threadpool *pool, struct worker_thread *worker)
Allocate and initialize a thread_worker_pair.
Definition: threadpool.c:214
static void serializer_shutdown_group_dtor(void *vdoomed)
Definition: threadpool.c:1222
static struct set_size_data * set_size_data_alloc(struct ast_threadpool *pool, unsigned int size)
Allocate and initialize a set_size_data.
Definition: threadpool.c:815
struct ast_threadpool_listener * ast_threadpool_listener_alloc(const struct ast_threadpool_listener_callbacks *callbacks, void *user_data)
Allocate a threadpool listener.
Definition: threadpool.c:894
static void serializer_dtor(void *obj)
Definition: threadpool.c:1318
static void threadpool_destructor(void *obj)
Destroy a threadpool's components.
Definition: threadpool.c:386
void ast_threadpool_shutdown(struct ast_threadpool *pool)
Shut down a threadpool and destroy it.
Definition: threadpool.c:966
static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
Taskprocessor listener shutdown callback.
Definition: threadpool.c:680
struct ast_taskprocessor * ast_threadpool_serializer_group(const char *name, struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Serialized execution of tasks within a ast_threadpool.
Definition: threadpool.c:1398
static void grow(struct ast_threadpool *pool, int delta)
Add threads to the threadpool.
Definition: threadpool.c:525
static void worker_shutdown(struct worker_thread *worker)
shut a worker thread down
Definition: threadpool.c:1010
static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks
Table of taskprocessor listener callbacks for threadpool's main taskprocessor.
Definition: threadpool.c:696
static int worker_set_state(struct worker_thread *worker, enum worker_state state)
Change a worker's state.
Definition: threadpool.c:1186
static int worker_idle(struct worker_thread *worker)
Idle function for worker threads.
Definition: threadpool.c:1153
static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
Taskprocessor listener emptied callback.
Definition: threadpool.c:655
static void serializer_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
Definition: threadpool.c:1359
static int queued_set_size(void *data)
Change the size of the threadpool.
Definition: threadpool.c:839
long ast_threadpool_queue_size(struct ast_threadpool *pool)
Return the size of the threadpool's task queue.
Definition: threadpool.c:1433
struct ast_taskprocessor * ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
Serialized execution of tasks within a ast_threadpool.
Definition: threadpool.c:1428
static int threadpool_execute(struct ast_threadpool *pool)
Execute a task in the threadpool.
Definition: threadpool.c:362
worker_state
states for worker threads
Definition: threadpool.c:120
@ DEAD
Definition: threadpool.c:137
@ ALIVE
Definition: threadpool.c:122
@ ZOMBIE
Definition: threadpool.c:130
static void threadpool_send_state_changed(struct ast_threadpool *pool)
Notify the threadpool listener that the state has changed.
Definition: threadpool.c:180
AST_THREADSTORAGE_RAW(current_serializer)
static int worker_id_counter
Definition: threadpool.c:985
static void shrink(struct ast_threadpool *pool, int delta)
Remove threads from the threadpool.
Definition: threadpool.c:776
static int queued_active_thread_idle(void *data)
Move a worker thread from the active container to the idle container.
Definition: threadpool.c:235
static struct serializer * serializer_create(struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Definition: threadpool.c:1328
static void serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group)
Definition: threadpool.c:1287
static int queued_zombie_thread_dead(void *data)
Kill a zombie thread.
Definition: threadpool.c:284
static void threadpool_zombie_thread_dead(struct ast_threadpool *pool, struct worker_thread *worker)
Queue a task to kill a zombie thread.
Definition: threadpool.c:301
int ast_threadpool_push(struct ast_threadpool *pool, int(*task)(void *data), void *data)
Push a task to the threadpool.
Definition: threadpool.c:957
static void serializer_shutdown(struct ast_taskprocessor_listener *listener)
Definition: threadpool.c:1377
static int queued_idle_thread_dead(void *data)
Definition: threadpool.c:321
static int threadpool_tps_start(struct ast_taskprocessor_listener *listener)
Definition: threadpool.c:444
void * ast_threadpool_listener_get_user_data(const struct ast_threadpool_listener *listener)
Get the threadpool listener's user data.
Definition: threadpool.c:906
static void threadpool_idle_thread_dead(struct ast_threadpool *pool, struct worker_thread *worker)
Definition: threadpool.c:332
int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout)
Wait for the serializers in the group to shutdown with timeout.
Definition: threadpool.c:1241
struct ast_threadpool * ast_threadpool_create(const char *name, struct ast_threadpool_listener *listener, const struct ast_threadpool_options *options)
Create a new threadpool.
Definition: threadpool.c:916
static int worker_thread_cmp(void *obj, void *arg, int flags)
Definition: threadpool.c:994
static void worker_thread_destroy(void *obj)
Worker thread destructor.
Definition: threadpool.c:1026
#define THREAD_BUCKETS
Definition: threadpool.c:28
static int execute_tasks(void *data)
Definition: threadpool.c:1345
struct ast_taskprocessor * ast_threadpool_serializer_get_current(void)
Get the threadpool serializer currently associated with this thread.
Definition: threadpool.c:1393
static int serializer_start(struct ast_taskprocessor_listener *listener)
Definition: threadpool.c:1371
static struct task_pushed_data * task_pushed_data_alloc(struct ast_threadpool *pool, int was_empty)
Allocate and initialize a task_pushed_data.
Definition: threadpool.c:466
static void * worker_start(void *arg)
start point for worker threads
Definition: threadpool.c:1044
static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
Taskprocessor listener callback called when a task is added.
Definition: threadpool.c:611
static int kill_threads(void *obj, void *arg, int flags)
ao2 callback to kill a set number of threads.
Definition: threadpool.c:715
static struct ast_threadpool * threadpool_alloc(const char *name, const struct ast_threadpool_options *options)
Allocate a threadpool.
Definition: threadpool.c:404
static int zombify_threads(void *obj, void *arg, void *data, int flags)
ao2 callback to zombify a set number of threads.
Definition: threadpool.c:746
#define AST_THREADPOOL_OPTIONS_VERSION
Definition: threadpool.h:71
int ast_threadstorage_set_ptr(struct ast_threadstorage *ts, void *ptr)
Set a raw pointer from threadstorage.
void * ast_threadstorage_get_ptr(struct ast_threadstorage *ts)
Retrieve a raw pointer from threadstorage.
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:157
Utility functions.
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:936
#define ast_assert(a)
Definition: utils.h:734
#define MIN(a, b)
Definition: utils.h:226
#define ast_pthread_create(a, b, c, d)
Definition: utils.h:579