Asterisk - The Open Source Telephony Project GIT-master-a358458
Data Structures | Macros | Enumerations | Functions | Variables
threadpool.c File Reference
#include "asterisk.h"
#include "asterisk/threadpool.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/astobj2.h"
#include "asterisk/utils.h"
Include dependency graph for threadpool.c:

Go to the source code of this file.

Data Structures

struct  ast_serializer_shutdown_group
 
struct  ast_threadpool
 An opaque threadpool structure. More...
 
struct  ast_threadpool_listener
 listener for a threadpool More...
 
struct  pool_options_pair
 
struct  serializer
 
struct  set_size_data
 Helper struct used for queued operations that change the size of the threadpool. More...
 
struct  task_pushed_data
 helper used for queued task when tasks are pushed More...
 
struct  thread_worker_pair
 Struct used for queued operations involving worker state changes. More...
 
struct  worker_thread
 

Macros

#define THREAD_BUCKETS   89
 

Enumerations

enum  worker_state { ALIVE , ZOMBIE , DEAD }
 states for worker threads More...
 

Functions

static int activate_thread (void *obj, void *arg, int flags)
 Activate idle threads. More...
 
struct ast_serializer_shutdown_group * ast_serializer_shutdown_group_alloc (void)
 Create a serializer group shutdown control object. More...
 
int ast_serializer_shutdown_group_join (struct ast_serializer_shutdown_group *shutdown_group, int timeout)
 Wait for the serializers in the group to shutdown with timeout. More...
 
struct ast_threadpool * ast_threadpool_create (const char *name, struct ast_threadpool_listener *listener, const struct ast_threadpool_options *options)
 Create a new threadpool. More...
 
struct ast_threadpool_listener * ast_threadpool_listener_alloc (const struct ast_threadpool_listener_callbacks *callbacks, void *user_data)
 Allocate a threadpool listener. More...
 
void * ast_threadpool_listener_get_user_data (const struct ast_threadpool_listener *listener)
 Get the threadpool listener's user data. More...
 
int ast_threadpool_push (struct ast_threadpool *pool, int(*task)(void *data), void *data)
 Push a task to the threadpool. More...
 
long ast_threadpool_queue_size (struct ast_threadpool *pool)
 Return the size of the threadpool's task queue. More...
 
struct ast_taskprocessor * ast_threadpool_serializer (const char *name, struct ast_threadpool *pool)
 Serialized execution of tasks within an ast_threadpool. More...
 
struct ast_taskprocessor * ast_threadpool_serializer_get_current (void)
 Get the threadpool serializer currently associated with this thread. More...
 
struct ast_taskprocessor * ast_threadpool_serializer_group (const char *name, struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
 Serialized execution of tasks within an ast_threadpool. More...
 
void ast_threadpool_set_size (struct ast_threadpool *pool, unsigned int size)
 Set the number of threads for the thread pool. More...
 
void ast_threadpool_shutdown (struct ast_threadpool *pool)
 Shut down a threadpool and destroy it. More...
 
 AST_THREADSTORAGE_RAW (current_serializer)
 
static int execute_tasks (void *data)
 
static void grow (struct ast_threadpool *pool, int delta)
 Add threads to the threadpool. More...
 
static int kill_threads (void *obj, void *arg, int flags)
 ao2 callback to kill a set number of threads. More...
 
static int queued_active_thread_idle (void *data)
 Move a worker thread from the active container to the idle container. More...
 
static int queued_emptied (void *data)
 Queued task that handles the case where the threadpool's taskprocessor is emptied. More...
 
static int queued_idle_thread_dead (void *data)
 
static int queued_set_size (void *data)
 Change the size of the threadpool. More...
 
static int queued_task_pushed (void *data)
 Queued task called when tasks are pushed into the threadpool. More...
 
static int queued_zombie_thread_dead (void *data)
 Kill a zombie thread. More...
 
static struct serializer * serializer_create (struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
 
static void serializer_dtor (void *obj)
 
static void serializer_shutdown (struct ast_taskprocessor_listener *listener)
 
static void serializer_shutdown_group_dec (struct ast_serializer_shutdown_group *shutdown_group)
 
static void serializer_shutdown_group_dtor (void *vdoomed)
 
static void serializer_shutdown_group_inc (struct ast_serializer_shutdown_group *shutdown_group)
 
static int serializer_start (struct ast_taskprocessor_listener *listener)
 
static void serializer_task_pushed (struct ast_taskprocessor_listener *listener, int was_empty)
 
static struct set_size_data * set_size_data_alloc (struct ast_threadpool *pool, unsigned int size)
 Allocate and initialize a set_size_data. More...
 
static void shrink (struct ast_threadpool *pool, int delta)
 Remove threads from the threadpool. More...
 
static struct task_pushed_data * task_pushed_data_alloc (struct ast_threadpool *pool, int was_empty)
 Allocate and initialize a task_pushed_data. More...
 
static struct thread_worker_pair * thread_worker_pair_alloc (struct ast_threadpool *pool, struct worker_thread *worker)
 Allocate and initialize a thread_worker_pair. More...
 
static void thread_worker_pair_free (struct thread_worker_pair *pair)
 Destructor for thread_worker_pair. More...
 
static void threadpool_active_thread_idle (struct ast_threadpool *pool, struct worker_thread *worker)
 Queue a task to move a thread from the active list to the idle list. More...
 
static struct ast_threadpool * threadpool_alloc (const char *name, const struct ast_threadpool_options *options)
 Allocate a threadpool. More...
 
static void threadpool_destructor (void *obj)
 Destroy a threadpool's components. More...
 
static int threadpool_execute (struct ast_threadpool *pool)
 Execute a task in the threadpool. More...
 
static void threadpool_idle_thread_dead (struct ast_threadpool *pool, struct worker_thread *worker)
 
static void threadpool_send_state_changed (struct ast_threadpool *pool)
 Notify the threadpool listener that the state has changed. More...
 
static void threadpool_tps_emptied (struct ast_taskprocessor_listener *listener)
 Taskprocessor listener emptied callback. More...
 
static void threadpool_tps_shutdown (struct ast_taskprocessor_listener *listener)
 Taskprocessor listener shutdown callback. More...
 
static int threadpool_tps_start (struct ast_taskprocessor_listener *listener)
 
static void threadpool_tps_task_pushed (struct ast_taskprocessor_listener *listener, int was_empty)
 Taskprocessor listener callback called when a task is added. More...
 
static void threadpool_zombie_thread_dead (struct ast_threadpool *pool, struct worker_thread *worker)
 Queue a task to kill a zombie thread. More...
 
static void worker_active (struct worker_thread *worker)
 Active loop for worker threads. More...
 
static int worker_idle (struct worker_thread *worker)
 Idle function for worker threads. More...
 
static int worker_set_state (struct worker_thread *worker, enum worker_state state)
 Change a worker's state. More...
 
static void worker_shutdown (struct worker_thread *worker)
 shut a worker thread down More...
 
static void * worker_start (void *arg)
 start point for worker threads More...
 
static struct worker_thread * worker_thread_alloc (struct ast_threadpool *pool)
 Allocate and initialize a new worker thread. More...
 
static int worker_thread_cmp (void *obj, void *arg, int flags)
 
static void worker_thread_destroy (void *obj)
 Worker thread destructor. More...
 
static int worker_thread_hash (const void *obj, int flags)
 
static int worker_thread_start (struct worker_thread *worker)
 
static int zombify_threads (void *obj, void *arg, void *data, int flags)
 ao2 callback to zombify a set number of threads. More...
 

Variables

static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks
 
static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks
 Table of taskprocessor listener callbacks for threadpool's main taskprocessor. More...
 
static int worker_id_counter
 

Macro Definition Documentation

◆ THREAD_BUCKETS

#define THREAD_BUCKETS   89

Definition at line 28 of file threadpool.c.

Enumeration Type Documentation

◆ worker_state

states for worker threads

Enumerator
ALIVE 

The worker is either active or idle

ZOMBIE 

The worker has been asked to shut down but may still be in the process of executing tasks. This transition happens when the threadpool needs to shrink and needs to kill active threads in order to do so.

DEAD 

The worker has been asked to shut down. Typically only idle threads go to this state directly, but active threads may go straight to this state when the threadpool is shut down.

Definition at line 120 of file threadpool.c.

120 {
121 /*! The worker is either active or idle */
122 ALIVE,
123 /*!
124 * The worker has been asked to shut down but
125 * may still be in the process of executing tasks.
126 * This transition happens when the threadpool needs
127 * to shrink and needs to kill active threads in order
128 * to do so.
129 */
130 ZOMBIE,
131 /*!
132 * The worker has been asked to shut down. Typically
133 * only idle threads go to this state directly, but
134 * active threads may go straight to this state when
135 * the threadpool is shut down.
136 */
137 DEAD,
138};
@ DEAD
Definition: threadpool.c:137
@ ALIVE
Definition: threadpool.c:122
@ ZOMBIE
Definition: threadpool.c:130

Function Documentation

◆ activate_thread()

static int activate_thread ( void *  obj,
void *  arg,
int  flags 
)
static

Activate idle threads.

This function always returns CMP_MATCH because all workers that this function acts on need to be seen as matches so they are unlinked from the list of idle threads.

Called as an ao2_callback in the threadpool's control taskprocessor thread.

Parameters
obj: The worker to activate
arg: The pool where the worker belongs
flags
Return values
CMP_MATCH

Definition at line 492 of file threadpool.c.

493{
494 struct worker_thread *worker = obj;
495 struct ast_threadpool *pool = arg;
496
497 if (!ao2_link(pool->active_threads, worker)) {
498 /* If we can't link the idle thread into the active container, then
499 * we'll just leave the thread idle and not wake it up.
500 */
501 ast_log(LOG_WARNING, "Failed to activate thread %d. Remaining idle\n",
502 worker->id);
503 return 0;
504 }
505
506 if (worker_set_state(worker, ALIVE)) {
507 ast_debug(1, "Failed to activate thread %d. It is dead\n",
508 worker->id);
509 /* The worker thread will no longer exist in the active threads or
510 * idle threads container after this.
511 */
512 ao2_unlink(pool->active_threads, worker);
513 }
514
515 return CMP_MATCH;
516}
#define ast_log
Definition: astobj2.c:42
#define ao2_link(container, obj)
Add an object to a container.
Definition: astobj2.h:1532
@ CMP_MATCH
Definition: astobj2.h:1027
#define ao2_unlink(container, obj)
Remove an object from a container.
Definition: astobj2.h:1578
#define ast_debug(level,...)
Log a DEBUG message.
#define LOG_WARNING
An opaque threadpool structure.
Definition: threadpool.c:36
struct ao2_container * active_threads
The container of active threads. Active threads are those that are currently running tasks.
Definition: threadpool.c:43
static int worker_set_state(struct worker_thread *worker, enum worker_state state)
Change a worker's state.
Definition: threadpool.c:1186

References ast_threadpool::active_threads, ALIVE, ao2_link, ao2_unlink, ast_debug, ast_log, CMP_MATCH, worker_thread::id, LOG_WARNING, and worker_set_state().

Referenced by queued_set_size(), and queued_task_pushed().

◆ ast_serializer_shutdown_group_alloc()

struct ast_serializer_shutdown_group * ast_serializer_shutdown_group_alloc ( void  )

Create a serializer group shutdown control object.

Since
13.5.0
Returns
ao2 object to control shutdown of a serializer group.

Definition at line 1229 of file threadpool.c.

1230{
1232
1234 if (!shutdown_group) {
1235 return NULL;
1236 }
1238 return shutdown_group;
1239}
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:409
#define ast_cond_init(cond, attr)
Definition: lock.h:201
static struct ast_serializer_shutdown_group * shutdown_group
Shutdown group for options serializers.
#define NULL
Definition: resample.c:96
static void serializer_shutdown_group_dtor(void *vdoomed)
Definition: threadpool.c:1222

References ao2_alloc, ast_cond_init, ast_serializer_shutdown_group::cond, NULL, serializer_shutdown_group_dtor(), and shutdown_group.

Referenced by ast_serializer_pool_create(), load_module(), and sip_options_init_task().

◆ ast_serializer_shutdown_group_join()

int ast_serializer_shutdown_group_join ( struct ast_serializer_shutdown_group * shutdown_group,
int  timeout 
)

Wait for the serializers in the group to shutdown with timeout.

Since
13.5.0
Parameters
shutdown_group: Group shutdown controller. (Returns 0 immediately if NULL)
timeout: Number of seconds to wait for the serializers in the group to shut down. Zero if the timeout is disabled.
Returns
Number of serializers that did not shut down within the timeout.

Definition at line 1241 of file threadpool.c.

1242{
1243 int remaining;
1245
1246 if (!shutdown_group) {
1247 return 0;
1248 }
1249
1251 ast_assert(lock != NULL);
1252
1254 if (timeout) {
1255 struct timeval start;
1256 struct timespec end;
1257
1258 start = ast_tvnow();
1259 end.tv_sec = start.tv_sec + timeout;
1260 end.tv_nsec = start.tv_usec * 1000;
1261 while (shutdown_group->count) {
1263 /* Error or timed out waiting for the count to reach zero. */
1264 break;
1265 }
1266 }
1267 } else {
1268 while (shutdown_group->count) {
1270 /* Error */
1271 break;
1272 }
1273 }
1274 }
1275 remaining = shutdown_group->count;
1277 return remaining;
1278}
ast_mutex_t lock
Definition: app_sla.c:331
#define ao2_unlock(a)
Definition: astobj2.h:729
#define ao2_lock(a)
Definition: astobj2.h:717
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
Definition: astobj2.c:476
char * end
Definition: eagi_proxy.c:73
#define ast_cond_wait(cond, mutex)
Definition: lock.h:205
#define ast_cond_timedwait(cond, mutex, time)
Definition: lock.h:206
Structure for mutex and tracking information.
Definition: lock.h:135
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:159
#define ast_assert(a)
Definition: utils.h:739

References ao2_lock, ao2_object_get_lockaddr(), ao2_unlock, ast_assert, ast_cond_timedwait, ast_cond_wait, ast_tvnow(), ast_serializer_shutdown_group::cond, ast_serializer_shutdown_group::count, end, lock, NULL, and shutdown_group.

Referenced by ast_res_pjsip_cleanup_options_handling(), ast_serializer_pool_destroy(), and unload_module().

◆ ast_threadpool_create()

struct ast_threadpool * ast_threadpool_create ( const char *  name,
struct ast_threadpool_listener * listener,
const struct ast_threadpool_options * options 
)

Create a new threadpool.

This function creates a threadpool. Tasks may be pushed onto this thread pool and will be automatically acted upon by threads within the pool.

Only a single threadpool with a given name may exist. This function will fail if a threadpool with the given name already exists.

Parameters
name: The unique name for the threadpool
listener: The listener the threadpool will notify of changes. Can be NULL.
options: The behavioral options for this threadpool
Return values
NULL: Failed to create the threadpool
non-NULL: The newly-created threadpool

Definition at line 916 of file threadpool.c.

919{
920 struct ast_taskprocessor *tps;
921 RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, NULL, ao2_cleanup);
922 RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
923 char *fullname;
924
926 if (!pool) {
927 return NULL;
928 }
929
931 if (!tps_listener) {
932 return NULL;
933 }
934
935 if (options->version != AST_THREADPOOL_OPTIONS_VERSION) {
936 ast_log(LOG_WARNING, "Incompatible version of threadpool options in use.\n");
937 return NULL;
938 }
939
940 fullname = ast_alloca(strlen(name) + strlen("/pool") + 1);
941 sprintf(fullname, "%s/pool", name); /* Safe */
942 tps = ast_taskprocessor_create_with_listener(fullname, tps_listener);
943 if (!tps) {
944 return NULL;
945 }
946
947 pool->tps = tps;
948 if (listener) {
949 ao2_ref(listener, +1);
950 pool->listener = listener;
951 }
952 ast_threadpool_set_size(pool, pool->options.initial_size);
953 ao2_ref(pool, +1);
954 return pool;
955}
static void * listener(void *unused)
Definition: asterisk.c:1514
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
Definition: astmm.h:288
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
static const char name[]
Definition: format_mp3.c:68
A listener for taskprocessors.
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
struct ast_taskprocessor * ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
Create a taskprocessor with a custom listener.
static struct test_options options
void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
Set the number of threads for the thread pool.
Definition: threadpool.c:875
static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks
Table of taskprocessor listener callbacks for threadpool's main taskprocessor.
Definition: threadpool.c:696
static struct ast_threadpool * threadpool_alloc(const char *name, const struct ast_threadpool_options *options)
Allocate a threadpool.
Definition: threadpool.c:404
#define AST_THREADPOOL_OPTIONS_VERSION
Definition: threadpool.h:71
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:941

References ao2_cleanup, ao2_ref, ast_alloca, ast_log, ast_taskprocessor_create_with_listener(), ast_taskprocessor_listener_alloc(), AST_THREADPOOL_OPTIONS_VERSION, ast_threadpool_set_size(), listener(), LOG_WARNING, name, NULL, options, RAII_VAR, threadpool_alloc(), and threadpool_tps_listener_callbacks.

Referenced by ast_sorcery_init(), AST_TEST_DEFINE(), load_module(), and stasis_init().

◆ ast_threadpool_listener_alloc()

struct ast_threadpool_listener * ast_threadpool_listener_alloc ( const struct ast_threadpool_listener_callbacks * callbacks,
void *  user_data 
)

Allocate a threadpool listener.

This function will call back into the alloc callback for the listener.

Parameters
callbacks: Listener callbacks to assign to the listener
user_data: User data to be stored in the threadpool listener
Return values
NULL: Failed to allocate the listener
non-NULL: The newly-created threadpool listener

Definition at line 894 of file threadpool.c.

896{
898 if (!listener) {
899 return NULL;
900 }
901 listener->callbacks = callbacks;
902 listener->user_data = user_data;
903 return listener;
904}
struct @468 callbacks
listener for a threadpool
Definition: threadpool.c:110

References ao2_alloc, callbacks, listener(), NULL, and ast_threadpool_listener::user_data.

Referenced by AST_TEST_DEFINE().

◆ ast_threadpool_listener_get_user_data()

void * ast_threadpool_listener_get_user_data ( const struct ast_threadpool_listener * listener)

Get the threadpool listener's user data.

Parameters
listener: The threadpool listener
Returns
The user data

Definition at line 906 of file threadpool.c.

907{
908 return listener->user_data;
909}

References listener().

Referenced by listener_check(), test_emptied(), test_shutdown(), test_state_changed(), test_task_pushed(), and wait_for_task_pushed().

◆ ast_threadpool_push()

int ast_threadpool_push ( struct ast_threadpool * pool,
int(*)(void *data)  task,
void *  data 
)

Push a task to the threadpool.

Tasks pushed into the threadpool will be automatically taken by one of the threads within the threadpool.

Parameters
pool: The threadpool to add the task to
task: The task to add
data: The parameter for the task
Return values
0: success
-1: failure

Definition at line 957 of file threadpool.c.

958{
959 SCOPED_AO2LOCK(lock, pool);
960 if (!pool->shutting_down) {
961 return ast_taskprocessor_push(pool->tps, task, data);
962 }
963 return -1;
964}
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:604
struct ast_taskprocessor * tps
The main taskprocessor.
Definition: threadpool.c:67
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
static int task(void *data)
Queued task for baseline test.

References ast_taskprocessor_push(), lock, SCOPED_AO2LOCK, ast_threadpool::shutting_down, task(), and ast_threadpool::tps.

Referenced by AST_TEST_DEFINE(), and serializer_task_pushed().

◆ ast_threadpool_queue_size()

long ast_threadpool_queue_size ( struct ast_threadpool * pool)

Return the size of the threadpool's task queue.

Since
13.7.0

Definition at line 1433 of file threadpool.c.

1434{
1435 return ast_taskprocessor_size(pool->tps);
1436}
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.

References ast_taskprocessor_size(), and ast_threadpool::tps.

Referenced by ast_sip_threadpool_queue_size().

◆ ast_threadpool_serializer()

struct ast_taskprocessor * ast_threadpool_serializer ( const char *  name,
struct ast_threadpool * pool 
)

Serialized execution of tasks within an ast_threadpool.

Since
12.0.0

An ast_taskprocessor with the same contract as a default taskprocessor (tasks execute serially) except instead of executing out of a dedicated thread, execution occurs in a thread from an ast_threadpool. Think of it as a lightweight thread.

While it guarantees that each task will complete before executing the next, there is no guarantee as to which thread from the pool individual tasks will execute. This normally only matters if your code relies on thread specific information, such as thread locals.

Use ast_taskprocessor_unreference() to dispose of the returned ast_taskprocessor.

Only a single taskprocessor with a given name may exist. This function will fail if a taskprocessor with the given name already exists.

Parameters
name: Name of the serializer. (must be unique)
pool: ast_threadpool for execution.
Returns
ast_taskprocessor for enqueuing work.
Return values
NULL: on error.

Definition at line 1428 of file threadpool.c.

1429{
1431}
struct ast_taskprocessor * ast_threadpool_serializer_group(const char *name, struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Serialized execution of tasks within a ast_threadpool.
Definition: threadpool.c:1398

References ast_threadpool_serializer_group(), name, and NULL.

Referenced by AST_TEST_DEFINE(), internal_stasis_subscribe(), and sorcery_object_type_alloc().

◆ ast_threadpool_serializer_get_current()

struct ast_taskprocessor * ast_threadpool_serializer_get_current ( void  )

Get the threadpool serializer currently associated with this thread.

Since
14.0.0
Note
The returned pointer is valid while the serializer thread is running.
Use ao2_ref() on serializer if you are going to keep it for another thread. To unref it you must then use ast_taskprocessor_unreference().
Return values
serializer: on success.
NULL: on error or no serializer associated with the thread.

Definition at line 1393 of file threadpool.c.

1394{
1395 return ast_threadstorage_get_ptr(&current_serializer);
1396}
void * ast_threadstorage_get_ptr(struct ast_threadstorage *ts)
Retrieve a raw pointer from threadstorage.

References ast_threadstorage_get_ptr().

Referenced by record_serializer(), rfc3326_outgoing_request(), and rfc3326_outgoing_response().

◆ ast_threadpool_serializer_group()

struct ast_taskprocessor * ast_threadpool_serializer_group ( const char *  name,
struct ast_threadpool * pool,
struct ast_serializer_shutdown_group * shutdown_group 
)

Serialized execution of tasks within an ast_threadpool.

Since
13.5.0

An ast_taskprocessor with the same contract as a default taskprocessor (tasks execute serially) except instead of executing out of a dedicated thread, execution occurs in a thread from an ast_threadpool. Think of it as a lightweight thread.

While it guarantees that each task will complete before executing the next, there is no guarantee as to which thread from the pool individual tasks will execute. This normally only matters if your code relies on thread specific information, such as thread locals.

Use ast_taskprocessor_unreference() to dispose of the returned ast_taskprocessor.

Only a single taskprocessor with a given name may exist. This function will fail if a taskprocessor with the given name already exists.

Parameters
name: Name of the serializer. (must be unique)
pool: ast_threadpool for execution.
shutdown_group: Group shutdown controller. (NULL if no group association)
Returns
ast_taskprocessor for enqueuing work.
Return values
NULL: on error.

Definition at line 1398 of file threadpool.c.

1400{
1401 struct serializer *ser;
1403 struct ast_taskprocessor *tps;
1404
1405 ser = serializer_create(pool, shutdown_group);
1406 if (!ser) {
1407 return NULL;
1408 }
1409
1411 if (!listener) {
1412 ao2_ref(ser, -1);
1413 return NULL;
1414 }
1415
1417 if (!tps) {
1418 /* ser ref transferred to listener but not cleaned without tps */
1419 ao2_ref(ser, -1);
1420 } else if (shutdown_group) {
1422 }
1423
1424 ao2_ref(listener, -1);
1425 return tps;
1426}
static struct serializer * serializer_create(struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Definition: threadpool.c:1328
static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks
Definition: threadpool.c:1387
static void serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group)
Definition: threadpool.c:1287

References ao2_ref, ast_taskprocessor_create_with_listener(), ast_taskprocessor_listener_alloc(), listener(), name, NULL, serializer_create(), serializer_shutdown_group_inc(), serializer_tps_listener_callbacks, and shutdown_group.

Referenced by ast_serializer_pool_create(), ast_sip_create_serializer_group(), and ast_threadpool_serializer().

◆ ast_threadpool_set_size()

void ast_threadpool_set_size ( struct ast_threadpool * threadpool,
unsigned int  size 
)

Set the number of threads for the thread pool.

This number may be more or less than the current number of threads in the threadpool.

Parameters
threadpool: The threadpool to adjust
size: The new desired size of the threadpool

Definition at line 875 of file threadpool.c.

876{
877 struct set_size_data *ssd;
879
880 if (pool->shutting_down) {
881 return;
882 }
883
885 if (!ssd) {
886 return;
887 }
888
890 ast_free(ssd);
891 }
892}
#define ast_free(a)
Definition: astmm.h:180
struct ast_taskprocessor * control_tps
The control taskprocessor.
Definition: threadpool.c:96
Helper struct used for queued operations that change the size of the threadpool.
Definition: threadpool.c:803
unsigned int size
Definition: threadpool.c:807
struct ast_threadpool * pool
Definition: threadpool.c:805
static int queued_set_size(void *data)
Change the size of the threadpool.
Definition: threadpool.c:839
static struct set_size_data * set_size_data_alloc(struct ast_threadpool *pool, unsigned int size)
Allocate and initialize a set_size_data.
Definition: threadpool.c:815

References ast_free, ast_taskprocessor_push(), ast_threadpool::control_tps, lock, set_size_data::pool, queued_set_size(), SCOPED_AO2LOCK, set_size_data_alloc(), ast_threadpool::shutting_down, and set_size_data::size.

Referenced by AST_TEST_DEFINE(), and ast_threadpool_create().

◆ ast_threadpool_shutdown()

void ast_threadpool_shutdown ( struct ast_threadpool * pool)

Shut down a threadpool and destroy it.

Parameters
pool: The pool to shut down

Definition at line 966 of file threadpool.c.

967{
968 if (!pool) {
969 return;
970 }
971 /* Shut down the taskprocessors and everything else just
972 * takes care of itself via the taskprocessor callbacks
973 */
974 ao2_lock(pool);
975 pool->shutting_down = 1;
976 ao2_unlock(pool);
979}
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.

References ao2_lock, ao2_unlock, ast_taskprocessor_unreference(), ast_threadpool::control_tps, ast_threadpool::shutting_down, and ast_threadpool::tps.

Referenced by AST_TEST_DEFINE(), load_module(), sorcery_cleanup(), stasis_cleanup(), and unload_module().

◆ AST_THREADSTORAGE_RAW()

AST_THREADSTORAGE_RAW ( current_serializer  )

◆ execute_tasks()

static int execute_tasks ( void *  data)
static

Definition at line 1345 of file threadpool.c.

1346{
1347 struct ast_taskprocessor *tps = data;
1348
1349 ast_threadstorage_set_ptr(&current_serializer, tps);
1350 while (ast_taskprocessor_execute(tps)) {
1351 /* No-op */
1352 }
1353 ast_threadstorage_set_ptr(&current_serializer, NULL);
1354
1356 return 0;
1357}
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
Pop a task off the taskprocessor and execute it.
int ast_threadstorage_set_ptr(struct ast_threadstorage *ts, void *ptr)
Set a raw pointer from threadstorage.

References ast_taskprocessor_execute(), ast_taskprocessor_unreference(), ast_threadstorage_set_ptr(), and NULL.

Referenced by serializer_task_pushed().

◆ grow()

static void grow ( struct ast_threadpool * pool,
int  delta 
)
static

Add threads to the threadpool.

This function is called from the threadpool's control taskprocessor thread.

Parameters
pool: The pool that is expanding
delta: The number of threads to add to the pool

Definition at line 525 of file threadpool.c.

526{
527 int i;
528
529 int current_size = ao2_container_count(pool->active_threads) +
531
532 if (pool->options.max_size && current_size + delta > pool->options.max_size) {
533 delta = pool->options.max_size - current_size;
534 }
535
536 ast_debug(3, "Increasing threadpool %s's size by %d\n",
537 ast_taskprocessor_name(pool->tps), delta);
538
539 for (i = 0; i < delta; ++i) {
540 struct worker_thread *worker = worker_thread_alloc(pool);
541 if (!worker) {
542 return;
543 }
544 if (ao2_link(pool->idle_threads, worker)) {
545 if (worker_thread_start(worker)) {
546 ast_log(LOG_ERROR, "Unable to start worker thread %d. Destroying.\n", worker->id);
548 }
549 } else {
550 ast_log(LOG_WARNING, "Failed to activate worker thread %d. Destroying.\n", worker->id);
551 }
552 ao2_ref(worker, -1);
553 }
554}
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
#define LOG_ERROR
int max_size
Maximum number of threads a pool may have.
Definition: threadpool.h:110
struct ao2_container * idle_threads
The container of idle threads. Idle threads are those that are currently waiting to run tasks.
Definition: threadpool.c:48
struct ast_threadpool_options options
Definition: threadpool.c:100
struct ast_threadpool * pool
Definition: threadpool.c:153
const char * ast_taskprocessor_name(struct ast_taskprocessor *tps)
Return the name of the taskprocessor singleton.
static struct worker_thread * worker_thread_alloc(struct ast_threadpool *pool)
Allocate and initialize a new worker thread.
Definition: threadpool.c:1093
static int worker_thread_start(struct worker_thread *worker)
Definition: threadpool.c:1109

References ast_threadpool::active_threads, ao2_container_count(), ao2_link, ao2_ref, ao2_unlink, ast_debug, ast_log, ast_taskprocessor_name(), worker_thread::id, ast_threadpool::idle_threads, LOG_ERROR, LOG_WARNING, ast_threadpool_options::max_size, ast_threadpool::options, worker_thread::pool, ast_threadpool::tps, worker_thread_alloc(), and worker_thread_start().

Referenced by queued_set_size() and queued_task_pushed().

◆ kill_threads()

static int kill_threads ( void *  obj,
void *  arg,
int  flags 
)
static

ao2 callback to kill a set number of threads.

Threads will be unlinked from the container as long as the counter has not reached zero. The counter is decremented with each thread that is removed.

Parameters
obj: The worker thread up for possible destruction
arg: The counter
flags: Unused
Return values
CMP_MATCH: The counter has not reached zero, so this thread should be removed.
CMP_STOP: The counter has reached zero so no more threads should be removed.

Definition at line 715 of file threadpool.c.

716{
717 int *num_to_kill = arg;
718
719 if (*num_to_kill > 0) {
720 --(*num_to_kill);
721 return CMP_MATCH;
722 } else {
723 return CMP_STOP;
724 }
725}
@ CMP_STOP
Definition: astobj2.h:1028

References CMP_MATCH, and CMP_STOP.

Referenced by shrink().

◆ queued_active_thread_idle()

static int queued_active_thread_idle ( void *  data)
static

Move a worker thread from the active container to the idle container.

This function is called from the threadpool's control taskprocessor thread.

Parameters
data: A thread_worker_pair containing the threadpool and the worker to move.
Returns
0

Definition at line 235 of file threadpool.c.

236{
237 struct thread_worker_pair *pair = data;
238
239 ao2_link(pair->pool->idle_threads, pair->worker);
240 ao2_unlink(pair->pool->active_threads, pair->worker);
241
243
245 return 0;
246}
Struct used for queued operations involving worker state changes.
Definition: threadpool.c:193
static void thread_worker_pair_free(struct thread_worker_pair *pair)
Destructor for thread_worker_pair.
Definition: threadpool.c:203
static void threadpool_send_state_changed(struct ast_threadpool *pool)
Notify the threadpool listener that the state has changed.
Definition: threadpool.c:180

References ao2_link, ao2_unlink, thread_worker_pair_free(), and threadpool_send_state_changed().

Referenced by threadpool_active_thread_idle().

◆ queued_emptied()

static int queued_emptied ( void *  data)
static

Queued task that handles the case where the threadpool's taskprocessor is emptied.

This simply lets the threadpool's listener know that the threadpool is devoid of tasks.

Parameters
data: The pool that has become empty
Returns
0

Definition at line 639 of file threadpool.c.

640{
641 struct ast_threadpool *pool = data;
642
643 /* We already checked for existence of this callback when this was queued */
644 pool->listener->callbacks->emptied(pool, pool->listener);
645 return 0;
646}
void(* emptied)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener)
Indicates the threadpool's taskprocessor has become empty.
Definition: threadpool.h:56
const struct ast_threadpool_listener_callbacks * callbacks
Definition: threadpool.c:112
struct ast_threadpool_listener * listener
Definition: threadpool.c:38

References ast_threadpool_listener::callbacks, ast_threadpool_listener_callbacks::emptied, and ast_threadpool::listener.

Referenced by threadpool_tps_emptied().

◆ queued_idle_thread_dead()

static int queued_idle_thread_dead ( void *  data)
static

Definition at line 321 of file threadpool.c.

322{
323 struct thread_worker_pair *pair = data;
324
325 ao2_unlink(pair->pool->idle_threads, pair->worker);
327
329 return 0;
330}

References ao2_unlink, thread_worker_pair_free(), and threadpool_send_state_changed().

Referenced by threadpool_idle_thread_dead().

◆ queued_set_size()

static int queued_set_size ( void *  data)
static

Change the size of the threadpool.

This can either result in shrinking or growing the threadpool depending on the new desired size and the current size.

This function is run from the threadpool control taskprocessor thread.

Parameters
data: A set_size_data used for determining how to act
Returns
0

Definition at line 839 of file threadpool.c.

840{
841 struct set_size_data *ssd = data;
842 struct ast_threadpool *pool = ssd->pool;
843 unsigned int num_threads = ssd->size;
844
845 /* We don't count zombie threads as being "live" when potentially resizing */
846 unsigned int current_size = ao2_container_count(pool->active_threads) +
848
849 ast_free(ssd);
850
851 if (current_size == num_threads) {
852 ast_debug(3, "Not changing threadpool size since new size %u is the same as current %u\n",
853 num_threads, current_size);
854 return 0;
855 }
856
857 if (current_size < num_threads) {
859 activate_thread, pool);
860
861 /* As the above may have altered the number of current threads update it */
862 current_size = ao2_container_count(pool->active_threads) +
864 grow(pool, num_threads - current_size);
866 activate_thread, pool);
867 } else {
868 shrink(pool, current_size - num_threads);
869 }
870
872 return 0;
873}
#define ao2_callback(c, flags, cb_fn, arg)
ao2_callback() is a generic function that applies cb_fn() to all objects in a container,...
Definition: astobj2.h:1693
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
Definition: astobj2.h:1063
@ OBJ_NODATA
Definition: astobj2.h:1044
@ OBJ_MULTIPLE
Definition: astobj2.h:1049
@ OBJ_UNLINK
Definition: astobj2.h:1039
static int activate_thread(void *obj, void *arg, int flags)
Activate idle threads.
Definition: threadpool.c:492
static void grow(struct ast_threadpool *pool, int delta)
Add threads to the threadpool.
Definition: threadpool.c:525
static void shrink(struct ast_threadpool *pool, int delta)
Remove threads from the threadpool.
Definition: threadpool.c:776

References activate_thread(), ast_threadpool::active_threads, ao2_callback, ao2_container_count(), ast_debug, ast_free, grow(), ast_threadpool::idle_threads, OBJ_MULTIPLE, OBJ_NODATA, OBJ_NOLOCK, OBJ_UNLINK, set_size_data::pool, shrink(), set_size_data::size, and threadpool_send_state_changed().

Referenced by ast_threadpool_set_size().

◆ queued_task_pushed()

static int queued_task_pushed ( void *  data)
static

Queued task called when tasks are pushed into the threadpool.

This function first calls into the threadpool's listener to let it know that a task has been pushed. It then wakes up all idle threads and moves them into the active thread container.

Parameters
data: A task_pushed_data
Returns
0

Definition at line 565 of file threadpool.c.

566{
567 struct task_pushed_data *tpd = data;
568 struct ast_threadpool *pool = tpd->pool;
569 int was_empty = tpd->was_empty;
570 unsigned int existing_active;
571
572 ast_free(tpd);
573
574 if (pool->listener && pool->listener->callbacks->task_pushed) {
575 pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
576 }
577
578 existing_active = ao2_container_count(pool->active_threads);
579
580 /* The first pass transitions any existing idle threads to be active, and
581 * will also remove any worker threads that have recently entered the dead
582 * state.
583 */
585 activate_thread, pool);
586
587 /* If no idle threads could be transitioned to active grow the pool as permitted. */
588 if (ao2_container_count(pool->active_threads) == existing_active) {
589 if (!pool->options.auto_increment) {
590 return 0;
591 }
592 grow(pool, pool->options.auto_increment);
593 /* An optional second pass transitions any newly added threads. */
595 activate_thread, pool);
596 }
597
599 return 0;
600}
void(* task_pushed)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener, int was_empty)
Indicates that a task was pushed to the threadpool.
Definition: threadpool.h:47
int auto_increment
Number of threads to increment pool by.
Definition: threadpool.h:90
helper used for queued task when tasks are pushed
Definition: threadpool.c:452
struct ast_threadpool * pool
Definition: threadpool.c:454

References activate_thread(), ast_threadpool::active_threads, ao2_callback, ao2_container_count(), ast_free, ast_threadpool_options::auto_increment, ast_threadpool_listener::callbacks, grow(), ast_threadpool::idle_threads, ast_threadpool::listener, OBJ_NODATA, OBJ_NOLOCK, OBJ_UNLINK, ast_threadpool::options, task_pushed_data::pool, ast_threadpool_listener_callbacks::task_pushed, threadpool_send_state_changed(), and task_pushed_data::was_empty.

Referenced by threadpool_tps_task_pushed().

◆ queued_zombie_thread_dead()

static int queued_zombie_thread_dead ( void *  data)
static

Kill a zombie thread.

This runs from the threadpool's control taskprocessor thread.

Parameters
data: A thread_worker_pair containing the threadpool and the zombie thread
Returns
0

Definition at line 284 of file threadpool.c.

285{
286 struct thread_worker_pair *pair = data;
287
288 ao2_unlink(pair->pool->zombie_threads, pair->worker);
290
292 return 0;
293}

References ao2_unlink, thread_worker_pair_free(), and threadpool_send_state_changed().

Referenced by threadpool_zombie_thread_dead().

◆ serializer_create()

static struct serializer * serializer_create ( struct ast_threadpool * pool,
struct ast_serializer_shutdown_group * shutdown_group
)
static

Definition at line 1328 of file threadpool.c.

1330{
1331 struct serializer *ser;
1332
1334 if (!ser) {
1335 return NULL;
1336 }
1337 ao2_ref(pool, +1);
1338 ser->pool = pool;
1340 return ser;
1341}
@ AO2_ALLOC_OPT_LOCK_NOLOCK
Definition: astobj2.h:367
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:404
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition: astobj2.h:480
struct ast_serializer_shutdown_group * shutdown_group
Definition: threadpool.c:1315
struct ast_threadpool * pool
Definition: threadpool.c:1313
static void serializer_dtor(void *obj)
Definition: threadpool.c:1318

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_alloc_options, ao2_bump, ao2_ref, NULL, serializer::pool, serializer_dtor(), serializer::shutdown_group, and shutdown_group.

Referenced by ast_threadpool_serializer_group().

◆ serializer_dtor()

static void serializer_dtor ( void *  obj)
static

Definition at line 1318 of file threadpool.c.

1319{
1320 struct serializer *ser = obj;
1321
1322 ao2_cleanup(ser->pool);
1323 ser->pool = NULL;
1325 ser->shutdown_group = NULL;
1326}

References ao2_cleanup, NULL, serializer::pool, and serializer::shutdown_group.

Referenced by serializer_create().

◆ serializer_shutdown()

static void serializer_shutdown ( struct ast_taskprocessor_listener * listener)
static

Definition at line 1377 of file threadpool.c.

1378{
1380
1381 if (ser->shutdown_group) {
1383 }
1384 ao2_cleanup(ser);
1385}
void * ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
Get the user data from the listener.
static void serializer_shutdown_group_dec(struct ast_serializer_shutdown_group *shutdown_group)
Definition: threadpool.c:1301

References ao2_cleanup, ast_taskprocessor_listener_get_user_data(), listener(), serializer_shutdown_group_dec(), and serializer::shutdown_group.

◆ serializer_shutdown_group_dec()

static void serializer_shutdown_group_dec ( struct ast_serializer_shutdown_group * shutdown_group)
static

◆ serializer_shutdown_group_dtor()

static void serializer_shutdown_group_dtor ( void *  vdoomed)
static

Definition at line 1222 of file threadpool.c.

1223{
1224 struct ast_serializer_shutdown_group *doomed = vdoomed;
1225
1226 ast_cond_destroy(&doomed->cond);
1227}
#define ast_cond_destroy(cond)
Definition: lock.h:202

References ast_cond_destroy, and ast_serializer_shutdown_group::cond.

Referenced by ast_serializer_shutdown_group_alloc().

◆ serializer_shutdown_group_inc()

static void serializer_shutdown_group_inc ( struct ast_serializer_shutdown_group * shutdown_group)
static

◆ serializer_start()

static int serializer_start ( struct ast_taskprocessor_listener * listener)
static

Definition at line 1371 of file threadpool.c.

1372{
1373 /* No-op */
1374 return 0;
1375}

◆ serializer_task_pushed()

static void serializer_task_pushed ( struct ast_taskprocessor_listener * listener,
int  was_empty 
)
static

Definition at line 1359 of file threadpool.c.

1360{
1361 if (was_empty) {
1364
1365 if (ast_threadpool_push(ser->pool, execute_tasks, tps)) {
1367 }
1368 }
1369}
struct ast_taskprocessor * ast_taskprocessor_listener_get_tps(const struct ast_taskprocessor_listener *listener)
Get a reference to the listener's taskprocessor.
int ast_threadpool_push(struct ast_threadpool *pool, int(*task)(void *data), void *data)
Push a task to the threadpool.
Definition: threadpool.c:957
static int execute_tasks(void *data)
Definition: threadpool.c:1345

References ast_taskprocessor_listener_get_tps(), ast_taskprocessor_listener_get_user_data(), ast_taskprocessor_unreference(), ast_threadpool_push(), execute_tasks(), listener(), and serializer::pool.

◆ set_size_data_alloc()

static struct set_size_data * set_size_data_alloc ( struct ast_threadpool * pool,
unsigned int  size
)
static

Allocate and initialize a set_size_data.

Parameters
pool: The pool for the set_size_data
size: The size to store in the set_size_data

Definition at line 815 of file threadpool.c.

817{
818 struct set_size_data *ssd = ast_malloc(sizeof(*ssd));
819 if (!ssd) {
820 return NULL;
821 }
822
823 ssd->pool = pool;
824 ssd->size = size;
825 return ssd;
826}
#define ast_malloc(len)
A wrapper for malloc()
Definition: astmm.h:191

References ast_malloc, NULL, set_size_data::pool, and set_size_data::size.

Referenced by ast_threadpool_set_size().

◆ shrink()

static void shrink ( struct ast_threadpool * pool,
int  delta
)
static

Remove threads from the threadpool.

The preference is to kill idle threads. However, if there are more threads to remove than there are idle threads, then active threads will be zombified instead.

This function is called from the threadpool control taskprocessor thread.

Parameters
pool: The threadpool to remove threads from
delta: The number of threads to remove

Definition at line 776 of file threadpool.c.

777{
778 /*
779 * Preference is to kill idle threads, but
780 * we'll move on to deactivating active threads
781 * if we have to
782 */
783 int idle_threads = ao2_container_count(pool->idle_threads);
784 int idle_threads_to_kill = MIN(delta, idle_threads);
785 int active_threads_to_zombify = delta - idle_threads_to_kill;
786
787 ast_debug(3, "Destroying %d idle threads in threadpool %s\n", idle_threads_to_kill,
789
791 kill_threads, &idle_threads_to_kill);
792
793 ast_debug(3, "Destroying %d active threads in threadpool %s\n", active_threads_to_zombify,
795
797 zombify_threads, pool, &active_threads_to_zombify);
798}
#define ao2_callback_data(container, flags, cb_fn, arg, data)
Definition: astobj2.h:1723
static int kill_threads(void *obj, void *arg, int flags)
ao2 callback to kill a set number of threads.
Definition: threadpool.c:715
static int zombify_threads(void *obj, void *arg, void *data, int flags)
ao2 callback to zombify a set number of threads.
Definition: threadpool.c:746
#define MIN(a, b)
Definition: utils.h:231

References ast_threadpool::active_threads, ao2_callback, ao2_callback_data, ao2_container_count(), ast_debug, ast_taskprocessor_name(), ast_threadpool::idle_threads, kill_threads(), MIN, OBJ_MULTIPLE, OBJ_NODATA, OBJ_NOLOCK, OBJ_UNLINK, ast_threadpool::tps, and zombify_threads().

Referenced by queued_set_size().

◆ task_pushed_data_alloc()

static struct task_pushed_data * task_pushed_data_alloc ( struct ast_threadpool * pool,
int  was_empty
)
static

Allocate and initialize a task_pushed_data.

Parameters
pool: The threadpool to set in the task_pushed_data
was_empty: The was_empty value to set in the task_pushed_data
Return values
NULL: Unable to allocate task_pushed_data
non-NULL: The newly-allocated task_pushed_data

Definition at line 466 of file threadpool.c.

468{
469 struct task_pushed_data *tpd = ast_malloc(sizeof(*tpd));
470
471 if (!tpd) {
472 return NULL;
473 }
474 tpd->pool = pool;
475 tpd->was_empty = was_empty;
476 return tpd;
477}

References ast_malloc, NULL, task_pushed_data::pool, and task_pushed_data::was_empty.

Referenced by threadpool_tps_task_pushed().

◆ thread_worker_pair_alloc()

static struct thread_worker_pair * thread_worker_pair_alloc ( struct ast_threadpool * pool,
struct worker_thread * worker
)
static

Allocate and initialize a thread_worker_pair.

Parameters
pool: Threadpool to assign to the thread_worker_pair
worker: Worker thread to assign to the thread_worker_pair

Definition at line 214 of file threadpool.c.

216{
217 struct thread_worker_pair *pair = ast_malloc(sizeof(*pair));
218 if (!pair) {
219 return NULL;
220 }
221 pair->pool = pool;
222 ao2_ref(worker, +1);
223 pair->worker = worker;
224
225 return pair;
226}
struct ast_threadpool * pool
Definition: threadpool.c:195
struct worker_thread * worker
Definition: threadpool.c:197

References ao2_ref, ast_malloc, NULL, thread_worker_pair::pool, and thread_worker_pair::worker.

Referenced by threadpool_active_thread_idle(), threadpool_idle_thread_dead(), and threadpool_zombie_thread_dead().

◆ thread_worker_pair_free()

static void thread_worker_pair_free ( struct thread_worker_pair * pair)
static

◆ threadpool_active_thread_idle()

static void threadpool_active_thread_idle ( struct ast_threadpool * pool,
struct worker_thread * worker
)
static

Queue a task to move a thread from the active list to the idle list.

This is called by a worker thread when it runs out of tasks to perform and goes idle.

Parameters
pool: The threadpool to which the worker belongs
worker: The worker thread that has gone idle

Definition at line 256 of file threadpool.c.

258{
259 struct thread_worker_pair *pair;
261
262 if (pool->shutting_down) {
263 return;
264 }
265
267 if (!pair) {
268 return;
269 }
270
273 }
274}
static struct thread_worker_pair * thread_worker_pair_alloc(struct ast_threadpool *pool, struct worker_thread *worker)
Allocate and initialize a thread_worker_pair.
Definition: threadpool.c:214
static int queued_active_thread_idle(void *data)
Move a worker thread from the active container to the idle container.
Definition: threadpool.c:235

References ast_taskprocessor_push(), ast_threadpool::control_tps, lock, thread_worker_pair::pool, queued_active_thread_idle(), SCOPED_AO2LOCK, ast_threadpool::shutting_down, thread_worker_pair_alloc(), thread_worker_pair_free(), and thread_worker_pair::worker.

Referenced by worker_start().

◆ threadpool_alloc()

static struct ast_threadpool * threadpool_alloc ( const char *  name,
const struct ast_threadpool_options * options
)
static

Allocate a threadpool.

This is implemented as a taskprocessor listener's alloc callback. This is because the threadpool exists as the private data on a taskprocessor listener.

Parameters
name: The name of the threadpool.
options: The options the threadpool uses.
Return values
NULL: Could not initialize threadpool properly
non-NULL: The newly-allocated threadpool

Definition at line 404 of file threadpool.c.

405{
406 RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
407 struct ast_str *control_tps_name;
408
409 pool = ao2_alloc(sizeof(*pool), threadpool_destructor);
410 control_tps_name = ast_str_create(64);
411 if (!pool || !control_tps_name) {
412 ast_free(control_tps_name);
413 return NULL;
414 }
415
416 ast_str_set(&control_tps_name, 0, "%s/pool-control", name);
417
418 pool->control_tps = ast_taskprocessor_get(ast_str_buffer(control_tps_name), TPS_REF_DEFAULT);
419 ast_free(control_tps_name);
420 if (!pool->control_tps) {
421 return NULL;
422 }
423 pool->active_threads = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
425 if (!pool->active_threads) {
426 return NULL;
427 }
430 if (!pool->idle_threads) {
431 return NULL;
432 }
433 pool->zombie_threads = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
435 if (!pool->zombie_threads) {
436 return NULL;
437 }
438 pool->options = *options;
439
440 ao2_ref(pool, +1);
441 return pool;
442}
@ AO2_ALLOC_OPT_LOCK_MUTEX
Definition: astobj2.h:363
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition: astobj2.h:1303
char * ast_str_buffer(const struct ast_str *buf)
Returns the string buffer within the ast_str buf.
Definition: strings.h:761
#define ast_str_create(init_len)
Create a malloc'ed dynamic length string.
Definition: strings.h:659
int ast_str_set(struct ast_str **buf, ssize_t max_len, const char *fmt,...)
Set a dynamic string using variable arguments.
Definition: strings.h:1113
Support for dynamic strings.
Definition: strings.h:623
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.
@ TPS_REF_DEFAULT
return a reference to a taskprocessor, create one if it does not exist
Definition: taskprocessor.h:76
static int worker_thread_hash(const void *obj, int flags)
Definition: threadpool.c:987
static void threadpool_destructor(void *obj)
Destroy a threadpool's components.
Definition: threadpool.c:386
static int worker_thread_cmp(void *obj, void *arg, int flags)
Definition: threadpool.c:994
#define THREAD_BUCKETS
Definition: threadpool.c:28

References ao2_alloc, AO2_ALLOC_OPT_LOCK_MUTEX, ao2_cleanup, ao2_container_alloc_hash, ao2_ref, ast_free, ast_str_buffer(), ast_str_create, ast_str_set(), ast_taskprocessor_get(), name, NULL, options, RAII_VAR, THREAD_BUCKETS, threadpool_destructor(), TPS_REF_DEFAULT, worker_thread_cmp(), and worker_thread_hash().

Referenced by ast_threadpool_create().

◆ threadpool_destructor()

static void threadpool_destructor ( void *  obj)
static

Destroy a threadpool's components.

This is the destructor called automatically when the threadpool's reference count reaches zero. This is not to be confused with threadpool_destroy.

By the time this actually gets called, most of the cleanup has already been done in the pool. The only thing left to do is to release the final reference to the threadpool listener.

Parameters
obj: The pool to destroy

Definition at line 386 of file threadpool.c.

387{
388 struct ast_threadpool *pool = obj;
389 ao2_cleanup(pool->listener);
390}

References ao2_cleanup, and ast_threadpool::listener.

Referenced by threadpool_alloc().

◆ threadpool_execute()

static int threadpool_execute ( struct ast_threadpool * pool)
static

Execute a task in the threadpool.

This is the function that worker threads call in order to execute tasks in the threadpool.

Parameters
pool: The pool to which the tasks belong.
Return values
0: Either the pool has been shut down or there are no tasks.
1: There are still tasks remaining in the pool.

Definition at line 362 of file threadpool.c.

363{
364 ao2_lock(pool);
365 if (!pool->shutting_down) {
366 ao2_unlock(pool);
367 return ast_taskprocessor_execute(pool->tps);
368 }
369 ao2_unlock(pool);
370 return 0;
371}

References ao2_lock, ao2_unlock, ast_taskprocessor_execute(), thread_worker_pair::pool, ast_threadpool::shutting_down, and ast_threadpool::tps.

Referenced by worker_active().

◆ threadpool_idle_thread_dead()

static void threadpool_idle_thread_dead ( struct ast_threadpool * pool,
struct worker_thread * worker
)
static

◆ threadpool_send_state_changed()

static void threadpool_send_state_changed ( struct ast_threadpool * pool)
static

Notify the threadpool listener that the state has changed.

This notifies the threadpool listener via its state_changed callback.

Parameters
pool: The threadpool whose state has changed

Definition at line 180 of file threadpool.c.

181{
182 int active_size = ao2_container_count(pool->active_threads);
183 int idle_size = ao2_container_count(pool->idle_threads);
184
185 if (pool->listener && pool->listener->callbacks->state_changed) {
186 pool->listener->callbacks->state_changed(pool, pool->listener, active_size, idle_size);
187 }
188}
void(* state_changed)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener, int active_threads, int idle_threads)
Indicates that the state of threads in the pool has changed.
Definition: threadpool.h:36

References ast_threadpool::active_threads, ao2_container_count(), ast_threadpool_listener::callbacks, ast_threadpool::idle_threads, ast_threadpool::listener, worker_thread::pool, and ast_threadpool_listener_callbacks::state_changed.

Referenced by queued_active_thread_idle(), queued_idle_thread_dead(), queued_set_size(), queued_task_pushed(), and queued_zombie_thread_dead().

◆ threadpool_tps_emptied()

static void threadpool_tps_emptied ( struct ast_taskprocessor_listener * listener)
static

Taskprocessor listener emptied callback.

The threadpool queues a task to let the threadpool listener know that the threadpool no longer contains any tasks.

Parameters
listener: The taskprocessor listener. The threadpool is the listener's private data.

Definition at line 655 of file threadpool.c.

656{
658 SCOPED_AO2LOCK(lock, pool);
659
660 if (pool->shutting_down) {
661 return;
662 }
663
664 if (pool->listener && pool->listener->callbacks->emptied) {
666 /* Nothing to do here but we need the check to keep the compiler happy. */
667 }
668 }
669}
static int queued_emptied(void *data)
Queued task that handles the case where the threadpool's taskprocessor is emptied.
Definition: threadpool.c:639

References ast_taskprocessor_listener_get_user_data(), ast_taskprocessor_push(), ast_threadpool_listener::callbacks, ast_threadpool::control_tps, ast_threadpool_listener_callbacks::emptied, listener(), ast_threadpool::listener, lock, queued_emptied(), SCOPED_AO2LOCK, and ast_threadpool::shutting_down.

◆ threadpool_tps_shutdown()

static void threadpool_tps_shutdown ( struct ast_taskprocessor_listener * listener)
static

Taskprocessor listener shutdown callback.

The threadpool will shut down and destroy all of its worker threads when this is called back. By the time this gets called, the threadpool's control taskprocessor has already been destroyed. Therefore there is no risk in outright destroying the worker threads here.

Parameters
listener: The taskprocessor listener. The threadpool is the listener's private data.

Definition at line 680 of file threadpool.c.

681{
683
684 if (pool->listener && pool->listener->callbacks->shutdown) {
685 pool->listener->callbacks->shutdown(pool->listener);
686 }
690 ao2_cleanup(pool);
691}
void(* shutdown)(struct ast_threadpool_listener *listener)
The threadpool is shutting down.
Definition: threadpool.h:67
struct ao2_container * zombie_threads
The container of zombie threads. Zombie threads may be running tasks, but they are scheduled to die s...
Definition: threadpool.c:53

References ast_threadpool::active_threads, ao2_cleanup, ast_taskprocessor_listener_get_user_data(), ast_threadpool_listener::callbacks, ast_threadpool::idle_threads, listener(), ast_threadpool::listener, ast_threadpool_listener_callbacks::shutdown, and ast_threadpool::zombie_threads.

◆ threadpool_tps_start()

static int threadpool_tps_start ( struct ast_taskprocessor_listener * listener)
static

Definition at line 444 of file threadpool.c.

445{
446 return 0;
447}

◆ threadpool_tps_task_pushed()

static void threadpool_tps_task_pushed ( struct ast_taskprocessor_listener * listener,
int  was_empty
)
static

Taskprocessor listener callback called when a task is added.

The threadpool uses this opportunity to queue a task on its control taskprocessor in order to activate idle threads and notify the threadpool listener that the task has been pushed.

Parameters
listener: The taskprocessor listener. The threadpool is the listener's private data.
was_empty: True if the taskprocessor was empty prior to the task being pushed

Definition at line 611 of file threadpool.c.

613{
615 struct task_pushed_data *tpd;
617
618 if (pool->shutting_down) {
619 return;
620 }
621
623 if (!tpd) {
624 return;
625 }
626
628 ast_free(tpd);
629 }
630}
static int queued_task_pushed(void *data)
Queued task called when tasks are pushed into the threadpool.
Definition: threadpool.c:565
static struct task_pushed_data * task_pushed_data_alloc(struct ast_threadpool *pool, int was_empty)
Allocate and initialize a task_pushed_data.
Definition: threadpool.c:466

References ast_free, ast_taskprocessor_listener_get_user_data(), ast_taskprocessor_push(), ast_threadpool::control_tps, listener(), lock, task_pushed_data::pool, queued_task_pushed(), SCOPED_AO2LOCK, ast_threadpool::shutting_down, task_pushed_data_alloc(), and task_pushed_data::was_empty.

◆ threadpool_zombie_thread_dead()

static void threadpool_zombie_thread_dead ( struct ast_threadpool * pool,
struct worker_thread * worker
)
static

Queue a task to kill a zombie thread.

This is called by a worker thread when it acknowledges that it is time for it to die.

Definition at line 301 of file threadpool.c.

303{
304 struct thread_worker_pair *pair;
306
307 if (pool->shutting_down) {
308 return;
309 }
310
312 if (!pair) {
313 return;
314 }
315
318 }
319}
static int queued_zombie_thread_dead(void *data)
Kill a zombie thread.
Definition: threadpool.c:284

References ast_taskprocessor_push(), ast_threadpool::control_tps, lock, thread_worker_pair::pool, queued_zombie_thread_dead(), SCOPED_AO2LOCK, ast_threadpool::shutting_down, thread_worker_pair_alloc(), thread_worker_pair_free(), and thread_worker_pair::worker.

Referenced by worker_start().

◆ worker_active()

static void worker_active ( struct worker_thread * worker)
static

Active loop for worker threads.

The worker will stay in this loop for its lifetime, executing tasks as they become available. If there are no tasks currently available, then the thread will go idle.

Parameters
worker: The worker thread executing tasks.

Definition at line 1124 of file threadpool.c.

1125{
1126 int alive;
1127
1128 /* The following is equivalent to
1129 *
1130 * while (threadpool_execute(worker->pool));
1131 *
1132 * However, reviewers have suggested in the past
1133 * doing that can cause optimizers to (wrongly)
1134 * optimize the code away.
1135 */
1136 do {
1137 alive = threadpool_execute(worker->pool);
1138 } while (alive);
1139}
static int threadpool_execute(struct ast_threadpool *pool)
Execute a task in the threadpool.
Definition: threadpool.c:362

References worker_thread::pool, and threadpool_execute().

Referenced by worker_start().

◆ worker_idle()

static int worker_idle ( struct worker_thread * worker)
static

Idle function for worker threads.

The worker waits here until it gets told by the threadpool to wake up.

The worker is locked before entering this function.

Parameters
worker: The idle worker
Return values
0: The thread is being woken up so that it can conclude.
non-zero: The thread is being woken up to do more work.

Definition at line 1153 of file threadpool.c.

1154{
1155 struct timeval start = ast_tvnow();
1156 struct timespec end = {
1157 .tv_sec = start.tv_sec + worker->options.idle_timeout,
1158 .tv_nsec = start.tv_usec * 1000,
1159 };
1160 while (!worker->wake_up) {
1161 if (worker->options.idle_timeout <= 0) {
1162 ast_cond_wait(&worker->cond, &worker->lock);
1163 } else if (ast_cond_timedwait(&worker->cond, &worker->lock, &end) == ETIMEDOUT) {
1164 break;
1165 }
1166 }
1167
1168 if (!worker->wake_up) {
1169 ast_debug(1, "Worker thread idle timeout reached. Dying.\n");
1170 threadpool_idle_thread_dead(worker->pool, worker);
1171 worker->state = DEAD;
1172 }
1173 worker->wake_up = 0;
1174 return worker->state == ALIVE;
1175}
int idle_timeout
Time limit in seconds for idle threads.
Definition: threadpool.h:79
ast_cond_t cond
Definition: threadpool.c:147
struct ast_threadpool_options options
Definition: threadpool.c:159
enum worker_state state
Definition: threadpool.c:155
ast_mutex_t lock
Definition: threadpool.c:149
static void threadpool_idle_thread_dead(struct ast_threadpool *pool, struct worker_thread *worker)
Definition: threadpool.c:332

References ALIVE, ast_cond_timedwait, ast_cond_wait, ast_debug, ast_tvnow(), worker_thread::cond, DEAD, end, ast_threadpool_options::idle_timeout, worker_thread::lock, worker_thread::options, worker_thread::pool, worker_thread::state, threadpool_idle_thread_dead(), and worker_thread::wake_up.

Referenced by worker_start().

◆ worker_set_state()

static int worker_set_state ( struct worker_thread * worker,
enum worker_state  state 
)
static

Change a worker's state.

The threadpool calls into this function in order to let a worker know how it should proceed.

Return values
-1: failure (state transition not permitted)
0: success

Definition at line 1186 of file threadpool.c.

1187{
1188 SCOPED_MUTEX(lock, &worker->lock);
1189
1190 switch (state) {
1191 case ALIVE:
1192 /* This can occur due to a race condition between being told to go active
1193 * and an idle timeout happening.
1194 */
1195 if (worker->state == DEAD) {
1196 return -1;
1197 }
1198 ast_assert(worker->state != ZOMBIE);
1199 break;
1200 case DEAD:
1201 break;
1202 case ZOMBIE:
1203 ast_assert(worker->state != DEAD);
1204 break;
1205 }
1206
1207 worker->state = state;
1208 worker->wake_up = 1;
1209 ast_cond_signal(&worker->cond);
1210
1211 return 0;
1212}
enum cc_state state
Definition: ccss.c:393
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
Definition: lock.h:589

References ALIVE, ast_assert, ast_cond_signal, worker_thread::cond, DEAD, lock, worker_thread::lock, SCOPED_MUTEX, state, worker_thread::state, worker_thread::wake_up, and ZOMBIE.

Referenced by activate_thread(), worker_shutdown(), and zombify_threads().

◆ worker_shutdown()

static void worker_shutdown ( struct worker_thread * worker)
static

shut a worker thread down

Set the worker dead and then wait for its thread to finish executing.

Parameters
worker: The worker thread to shut down

Definition at line 1010 of file threadpool.c.

1011{
1012 worker_set_state(worker, DEAD);
1013 if (worker->thread != AST_PTHREADT_NULL) {
1014 pthread_join(worker->thread, NULL);
1015 worker->thread = AST_PTHREADT_NULL;
1016 }
1017}
#define AST_PTHREADT_NULL
Definition: lock.h:66
pthread_t thread
Definition: threadpool.c:151

References AST_PTHREADT_NULL, DEAD, NULL, worker_thread::thread, and worker_set_state().

Referenced by worker_thread_destroy().

◆ worker_start()

static void * worker_start ( void *  arg)
static

start point for worker threads

Worker threads start in the active state but may immediately go idle if there is no work to be done.

Parameters
arg: The worker thread

Definition at line 1044 of file threadpool.c.

1045{
1046 struct worker_thread *worker = arg;
1047 enum worker_state saved_state;
1048
1049 if (worker->options.thread_start) {
1050 worker->options.thread_start();
1051 }
1052
1053 ast_mutex_lock(&worker->lock);
1054 while (worker_idle(worker)) {
1055 ast_mutex_unlock(&worker->lock);
1056 worker_active(worker);
1057 ast_mutex_lock(&worker->lock);
1058 if (worker->state != ALIVE) {
1059 break;
1060 }
1061 threadpool_active_thread_idle(worker->pool, worker);
1062 }
1063 saved_state = worker->state;
1064 ast_mutex_unlock(&worker->lock);
1065
1066 /* Reaching this portion means the thread is
1067 * on death's door. It may have been killed while
1068 * it was idle, in which case it can just die
1069 * peacefully. If it's a zombie, though, then
1070 * it needs to let the pool know so
1071 * that the thread can be removed from the
1072 * list of zombie threads.
1073 */
1074 if (saved_state == ZOMBIE) {
1075 threadpool_zombie_thread_dead(worker->pool, worker);
1076 }
1077
1078 if (worker->options.thread_end) {
1079 worker->options.thread_end();
1080 }
1081 return NULL;
1082}
#define ast_mutex_unlock(a)
Definition: lock.h:190
#define ast_mutex_lock(a)
Definition: lock.h:189
void(* thread_start)(void)
Function to call when a thread starts.
Definition: threadpool.h:117
void(* thread_end)(void)
Function to call when a thread ends.
Definition: threadpool.h:124
static void worker_active(struct worker_thread *worker)
Active loop for worker threads.
Definition: threadpool.c:1124
static void threadpool_active_thread_idle(struct ast_threadpool *pool, struct worker_thread *worker)
Queue a task to move a thread from the active list to the idle list.
Definition: threadpool.c:256
static int worker_idle(struct worker_thread *worker)
Idle function for worker threads.
Definition: threadpool.c:1153
worker_state
states for worker threads
Definition: threadpool.c:120
static void threadpool_zombie_thread_dead(struct ast_threadpool *pool, struct worker_thread *worker)
Queue a task to kill a zombie thread.
Definition: threadpool.c:301

References ALIVE, ast_mutex_lock, ast_mutex_unlock, worker_thread::lock, NULL, worker_thread::options, worker_thread::pool, worker_thread::state, ast_threadpool_options::thread_end, ast_threadpool_options::thread_start, threadpool_active_thread_idle(), threadpool_zombie_thread_dead(), worker_active(), worker_idle(), and ZOMBIE.

Referenced by worker_thread_start().

◆ worker_thread_alloc()

static struct worker_thread * worker_thread_alloc ( struct ast_threadpool * pool)
static

Allocate and initialize a new worker thread.

This will create and initialize the worker object only; no thread is running when it returns. The thread itself is started separately by worker_thread_start().

Parameters
pool: The threadpool to which the worker will be added
Return values
NULL: Failed to allocate the worker thread
non-NULL: The newly-created worker thread

Definition at line 1093 of file threadpool.c.

1094{
1095 struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy);
1096 if (!worker) {
1097 return NULL;
1098 }
1099 worker->id = ast_atomic_fetchadd_int(&worker_id_counter, 1);
1100 ast_mutex_init(&worker->lock);
1101 ast_cond_init(&worker->cond, NULL);
1102 worker->pool = pool;
1103 worker->thread = AST_PTHREADT_NULL;
1104 worker->state = ALIVE;
1105 worker->options = pool->options;
1106 return worker;
1107}
#define ast_mutex_init(pmutex)
Definition: lock.h:186
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition: lock.h:757
static int worker_id_counter
Definition: threadpool.c:985
static void worker_thread_destroy(void *obj)
Worker thread destructor.
Definition: threadpool.c:1026

References ALIVE, ao2_alloc, ast_atomic_fetchadd_int(), ast_cond_init, ast_mutex_init, AST_PTHREADT_NULL, worker_thread::cond, worker_thread::id, worker_thread::lock, NULL, ast_threadpool::options, worker_thread::options, worker_thread::pool, worker_thread::state, worker_thread::thread, worker_id_counter, and worker_thread_destroy().

Referenced by grow().

◆ worker_thread_cmp()

static int worker_thread_cmp ( void *  obj,
void *  arg,
int  flags 
)
static

Definition at line 994 of file threadpool.c.

995{
996 struct worker_thread *worker1 = obj;
997 struct worker_thread *worker2 = arg;
998
999 return worker1->id == worker2->id ? CMP_MATCH : 0;
1000}

References CMP_MATCH, and worker_thread::id.

Referenced by threadpool_alloc().

◆ worker_thread_destroy()

static void worker_thread_destroy ( void *  obj)
static

Worker thread destructor.

Called automatically when refcount reaches 0. Shuts down the worker thread and destroys its component parts.

Definition at line 1026 of file threadpool.c.

1027{
1028 struct worker_thread *worker = obj;
1029 ast_debug(3, "Destroying worker thread %d\n", worker->id);
1030 worker_shutdown(worker);
1031 ast_mutex_destroy(&worker->lock);
1032 ast_cond_destroy(&worker->cond);
1033}
#define ast_mutex_destroy(a)
Definition: lock.h:188
static void worker_shutdown(struct worker_thread *worker)
shut a worker thread down
Definition: threadpool.c:1010

References ast_cond_destroy, ast_debug, ast_mutex_destroy, worker_thread::cond, worker_thread::id, worker_thread::lock, and worker_shutdown().

Referenced by worker_thread_alloc().

◆ worker_thread_hash()

static int worker_thread_hash ( const void *  obj,
int  flags 
)
static

Definition at line 987 of file threadpool.c.

988{
989 const struct worker_thread *worker = obj;
990
991 return worker->id;
992}

References worker_thread::id.

Referenced by threadpool_alloc().

◆ worker_thread_start()

static int worker_thread_start ( struct worker_thread * worker)
static

Definition at line 1109 of file threadpool.c.

1110{
1111 return ast_pthread_create(&worker->thread, NULL, worker_start, worker);
1112}
static void * worker_start(void *arg)
start point for worker threads
Definition: threadpool.c:1044
#define ast_pthread_create(a, b, c, d)
Definition: utils.h:584

References ast_pthread_create, NULL, worker_thread::thread, and worker_start().

Referenced by grow().

◆ zombify_threads()

static int zombify_threads ( void *  obj,
void *  arg,
void *  data,
int  flags 
)
static

ao2 callback to zombify a set number of threads.

Threads will be zombified as long as the counter has not reached zero. The counter is decremented with each thread that is zombified.

Zombifying a thread involves removing it from its current container, adding it to the zombie container, and changing the state of the worker to a zombie.

This callback is called from the threadpool control taskprocessor thread.

Parameters
obj: The worker thread that may be zombified
arg: The pool to which the worker belongs
data: The counter
flags: Unused
Return values
CMP_MATCH: The zombified thread should be removed from its current container
CMP_STOP: Stop attempting to zombify threads

Definition at line 746 of file threadpool.c.

747{
748 struct worker_thread *worker = obj;
749 struct ast_threadpool *pool = arg;
750 int *num_to_zombify = data;
751
752 if ((*num_to_zombify)-- > 0) {
753 if (!ao2_link(pool->zombie_threads, worker)) {
754 ast_log(LOG_WARNING, "Failed to zombify active thread %d. Thread will remain active\n", worker->id);
755 return 0;
756 }
757 worker_set_state(worker, ZOMBIE);
758 return CMP_MATCH;
759 } else {
760 return CMP_STOP;
761 }
762}

References ao2_link, ast_log, CMP_MATCH, CMP_STOP, worker_thread::id, LOG_WARNING, worker_set_state(), ZOMBIE, and ast_threadpool::zombie_threads.

Referenced by shrink().

Variable Documentation

◆ serializer_tps_listener_callbacks

struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks
static
Initial value:
= {
.task_pushed = serializer_task_pushed,
.start = serializer_start,
.shutdown = serializer_shutdown,
}
static void serializer_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
Definition: threadpool.c:1359
static void serializer_shutdown(struct ast_taskprocessor_listener *listener)
Definition: threadpool.c:1377
static int serializer_start(struct ast_taskprocessor_listener *listener)
Definition: threadpool.c:1371

Definition at line 1387 of file threadpool.c.

Referenced by ast_threadpool_serializer_group().

◆ threadpool_tps_listener_callbacks

struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks
static

Table of taskprocessor listener callbacks for threadpool's main taskprocessor.

Definition at line 696 of file threadpool.c.

Referenced by ast_threadpool_create().

◆ worker_id_counter

int worker_id_counter
static

A monotonically increasing integer used for worker thread identification.

Definition at line 985 of file threadpool.c.

Referenced by worker_thread_alloc().