Asterisk - The Open Source Telephony Project GIT-master-20e40a9
Loading...
Searching...
No Matches
Data Structures | Macros | Enumerations | Functions | Variables
threadpool.c File Reference
#include "asterisk.h"
#include "asterisk/threadpool.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/astobj2.h"
#include "asterisk/utils.h"
Include dependency graph for threadpool.c:

Go to the source code of this file.

Data Structures

struct  ast_threadpool
 An opaque threadpool structure. More...
 
struct  ast_threadpool_listener
 listener for a threadpool More...
 
struct  pool_options_pair
 
struct  serializer
 
struct  set_size_data
 Helper struct used for queued operations that change the size of the threadpool. More...
 
struct  task_pushed_data
 helper used for queued task when tasks are pushed More...
 
struct  thread_worker_pair
 Struct used for queued operations involving worker state changes. More...
 
struct  worker_thread
 

Macros

#define ast_threadpool_push_internal(pool, task, data)    __ast_threadpool_push(pool, task, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
 
#define THREAD_BUCKETS   89
 

Enumerations

enum  worker_state { ALIVE , ZOMBIE , DEAD }
 states for worker threads More...
 

Functions

int __ast_threadpool_push (struct ast_threadpool *pool, int(*task)(void *data), void *data, const char *file, int line, const char *function)
 Push a task to the threadpool.
 
static int activate_thread (void *obj, void *arg, int flags)
 Activate idle threads.
 
struct ast_threadpool * ast_threadpool_create (const char *name, struct ast_threadpool_listener *listener, const struct ast_threadpool_options *options)
 Create a new threadpool.
 
struct ast_threadpool_listener * ast_threadpool_listener_alloc (const struct ast_threadpool_listener_callbacks *callbacks, void *user_data)
 Allocate a threadpool listener.
 
void * ast_threadpool_listener_get_user_data (const struct ast_threadpool_listener *listener)
 Get the threadpool listener's user data.
 
int ast_threadpool_push (struct ast_threadpool *pool, int(*task)(void *data), void *data)
 
long ast_threadpool_queue_size (struct ast_threadpool *pool)
 Return the size of the threadpool's task queue.
 
struct ast_taskprocessor * ast_threadpool_serializer (const char *name, struct ast_threadpool *pool)
 Serialized execution of tasks within an ast_threadpool.
 
struct ast_taskprocessor * ast_threadpool_serializer_get_current (void)
 Get the threadpool serializer currently associated with this thread.
 
struct ast_taskprocessor * ast_threadpool_serializer_group (const char *name, struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
 Serialized execution of tasks within an ast_threadpool.
 
void ast_threadpool_set_size (struct ast_threadpool *pool, unsigned int size)
 Set the number of threads for the thread pool.
 
void ast_threadpool_shutdown (struct ast_threadpool *pool)
 Shut down a threadpool and destroy it.
 
 AST_THREADSTORAGE_RAW (current_serializer)
 
static int execute_tasks (void *data)
 
static void grow (struct ast_threadpool *pool, int delta)
 Add threads to the threadpool.
 
static int kill_threads (void *obj, void *arg, int flags)
 ao2 callback to kill a set number of threads.
 
static int queued_active_thread_idle (void *data)
 Move a worker thread from the active container to the idle container.
 
static int queued_emptied (void *data)
 Queued task that handles the case where the threadpool's taskprocessor is emptied.
 
static int queued_idle_thread_dead (void *data)
 
static int queued_set_size (void *data)
 Change the size of the threadpool.
 
static int queued_task_pushed (void *data)
 Queued task called when tasks are pushed into the threadpool.
 
static int queued_zombie_thread_dead (void *data)
 Kill a zombie thread.
 
static struct serializer * serializer_create (struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
 
static void serializer_dtor (void *obj)
 
static void serializer_shutdown (struct ast_taskprocessor_listener *listener)
 
static int serializer_start (struct ast_taskprocessor_listener *listener)
 
static void serializer_task_pushed (struct ast_taskprocessor_listener *listener, int was_empty)
 
static struct set_size_data * set_size_data_alloc (struct ast_threadpool *pool, unsigned int size)
 Allocate and initialize a set_size_data.
 
static void shrink (struct ast_threadpool *pool, int delta)
 Remove threads from the threadpool.
 
static struct task_pushed_data * task_pushed_data_alloc (struct ast_threadpool *pool, int was_empty)
 Allocate and initialize a task_pushed_data.
 
static struct thread_worker_pair * thread_worker_pair_alloc (struct ast_threadpool *pool, struct worker_thread *worker)
 Allocate and initialize a thread_worker_pair.
 
static void thread_worker_pair_free (struct thread_worker_pair *pair)
 Destructor for thread_worker_pair.
 
static void threadpool_active_thread_idle (struct ast_threadpool *pool, struct worker_thread *worker)
 Queue a task to move a thread from the active list to the idle list.
 
static struct ast_threadpool * threadpool_alloc (const char *name, const struct ast_threadpool_options *options)
 Allocate a threadpool.
 
static void threadpool_destructor (void *obj)
 Destroy a threadpool's components.
 
static int threadpool_execute (struct ast_threadpool *pool)
 Execute a task in the threadpool.
 
static void threadpool_idle_thread_dead (struct ast_threadpool *pool, struct worker_thread *worker)
 
static void threadpool_send_state_changed (struct ast_threadpool *pool)
 Notify the threadpool listener that the state has changed.
 
static void threadpool_tps_emptied (struct ast_taskprocessor_listener *listener)
 Taskprocessor listener emptied callback.
 
static void threadpool_tps_shutdown (struct ast_taskprocessor_listener *listener)
 Taskprocessor listener shutdown callback.
 
static int threadpool_tps_start (struct ast_taskprocessor_listener *listener)
 
static void threadpool_tps_task_pushed (struct ast_taskprocessor_listener *listener, int was_empty)
 Taskprocessor listener callback called when a task is added.
 
static void threadpool_zombie_thread_dead (struct ast_threadpool *pool, struct worker_thread *worker)
 Queue a task to kill a zombie thread.
 
static void worker_active (struct worker_thread *worker)
 Active loop for worker threads.
 
static int worker_idle (struct worker_thread *worker)
 Idle function for worker threads.
 
static int worker_set_state (struct worker_thread *worker, enum worker_state state)
 Change a worker's state.
 
static void worker_shutdown (struct worker_thread *worker)
 shut a worker thread down
 
static void * worker_start (void *arg)
 start point for worker threads
 
static struct worker_thread * worker_thread_alloc (struct ast_threadpool *pool)
 Allocate and initialize a new worker thread.
 
static int worker_thread_cmp (void *obj, void *arg, int flags)
 
static void worker_thread_destroy (void *obj)
 Worker thread destructor.
 
static int worker_thread_hash (const void *obj, int flags)
 
static int worker_thread_start (struct worker_thread *worker)
 
static int zombify_threads (void *obj, void *arg, void *data, int flags)
 ao2 callback to zombify a set number of threads.
 

Variables

static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks
 
static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks
 Table of taskprocessor listener callbacks for threadpool's main taskprocessor.
 
static int worker_id_counter
 

Macro Definition Documentation

◆ ast_threadpool_push_internal

#define ast_threadpool_push_internal (   pool,
  task,
  data 
)     __ast_threadpool_push(pool, task, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)

Definition at line 957 of file threadpool.c.

◆ THREAD_BUCKETS

#define THREAD_BUCKETS   89

Definition at line 28 of file threadpool.c.

Enumeration Type Documentation

◆ worker_state

states for worker threads

Enumerator
ALIVE 

The worker is either active or idle

ZOMBIE 

The worker has been asked to shut down but may still be in the process of executing tasks. This transition happens when the threadpool needs to shrink and needs to kill active threads in order to do so.

DEAD 

The worker has been asked to shut down. Typically only idle threads go to this state directly, but active threads may go straight to this state when the threadpool is shut down.

Definition at line 120 of file threadpool.c.

120 {
121 /*! The worker is either active or idle */
122 ALIVE,
123 /*!
124 * The worker has been asked to shut down but
125 * may still be in the process of executing tasks.
126 * This transition happens when the threadpool needs
127 * to shrink and needs to kill active threads in order
128 * to do so.
129 */
130 ZOMBIE,
131 /*!
132 * The worker has been asked to shut down. Typically
133 * only idle threads go to this state directly, but
134 * active threads may go straight to this state when
135 * the threadpool is shut down.
136 */
137 DEAD,
138};
@ DEAD
Definition threadpool.c:137
@ ALIVE
Definition threadpool.c:122
@ ZOMBIE
Definition threadpool.c:130

Function Documentation

◆ __ast_threadpool_push()

int __ast_threadpool_push ( struct ast_threadpool *  pool,
int(*)(void *data)  task,
void *  data,
const char *  file,
int  line,
const char *  function 
)

Push a task to the threadpool.

Tasks pushed into the threadpool will be automatically taken by one of the threads within the pool.

Parameters
pool: The threadpool to add the task to
task: The task to add
data: The parameter for the task
Return values
0: success
-1: failure

Definition at line 961 of file threadpool.c.

963{
964 SCOPED_AO2LOCK(lock, pool);
965 if (!pool->shutting_down) {
966 return __ast_taskprocessor_push(pool->tps, task, data, file, line, function);
967 }
968 return -1;
969}
ast_mutex_t lock
Definition app_sla.c:337
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition lock.h:611
struct ast_taskprocessor * tps
The main taskprocessor.
Definition threadpool.c:67
int __ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap, const char *file, int line, const char *function) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
static int task(void *data)
Queued task for baseline test.

References __ast_taskprocessor_push(), lock, SCOPED_AO2LOCK, ast_threadpool::shutting_down, task(), and ast_threadpool::tps.

Referenced by ast_threadpool_push().

◆ activate_thread()

static int activate_thread ( void *  obj,
void *  arg,
int  flags 
)
static

Activate idle threads.

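This function returns CMP_MATCH for every worker it manages to link into the active container, so that those workers are seen as matches and unlinked from the list of idle threads. If a worker cannot be linked into the active container, the function returns 0 and the worker remains idle.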

Called as an ao2_callback in the threadpool's control taskprocessor thread.

Parameters
obj: The worker to activate
arg: The pool where the worker belongs
flags: Unused
Return values
CMP_MATCH

Definition at line 492 of file threadpool.c.

493{
494 struct worker_thread *worker = obj;
495 struct ast_threadpool *pool = arg;
496
497 if (!ao2_link(pool->active_threads, worker)) {
498 /* If we can't link the idle thread into the active container, then
499 * we'll just leave the thread idle and not wake it up.
500 */
501 ast_log(LOG_WARNING, "Failed to activate thread %d. Remaining idle\n",
502 worker->id);
503 return 0;
504 }
505
506 if (worker_set_state(worker, ALIVE)) {
507 ast_debug(1, "Failed to activate thread %d. It is dead\n",
508 worker->id);
509 /* The worker thread will no longer exist in the active threads or
510 * idle threads container after this.
511 */
512 ao2_unlink(pool->active_threads, worker);
513 }
514
515 return CMP_MATCH;
516}
#define ast_log
Definition astobj2.c:42
#define ao2_link(container, obj)
Add an object to a container.
Definition astobj2.h:1532
@ CMP_MATCH
Definition astobj2.h:1027
#define ao2_unlink(container, obj)
Remove an object from a container.
Definition astobj2.h:1578
#define ast_debug(level,...)
Log a DEBUG message.
#define LOG_WARNING
An opaque threadpool structure.
Definition threadpool.c:36
struct ao2_container * active_threads
The container of active threads. Active threads are those that are currently running tasks.
Definition threadpool.c:43
static int worker_set_state(struct worker_thread *worker, enum worker_state state)
Change a worker's state.

References ast_threadpool::active_threads, ALIVE, ao2_link, ao2_unlink, ast_debug, ast_log, CMP_MATCH, worker_thread::id, LOG_WARNING, and worker_set_state().

Referenced by queued_set_size(), and queued_task_pushed().

◆ ast_threadpool_create()

struct ast_threadpool * ast_threadpool_create ( const char *  name,
struct ast_threadpool_listener *  listener,
const struct ast_threadpool_options *  options 
)

Create a new threadpool.

This function creates a threadpool. Tasks may be pushed onto this thread pool and will be automatically acted upon by threads within the pool.

Only a single threadpool with a given name may exist. This function will fail if a threadpool with the given name already exists.

Parameters
name: The unique name for the threadpool
listener: The listener the threadpool will notify of changes. Can be NULL.
options: The behavioral options for this threadpool
Return values
NULL: Failed to create the threadpool
non-NULL: The newly-created threadpool

Definition at line 915 of file threadpool.c.

918{
919 struct ast_taskprocessor *tps;
920 RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, NULL, ao2_cleanup);
921 RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
922 char *fullname;
923
925 if (!pool) {
926 return NULL;
927 }
928
930 if (!tps_listener) {
931 return NULL;
932 }
933
934 if (options->version != AST_THREADPOOL_OPTIONS_VERSION) {
935 ast_log(LOG_WARNING, "Incompatible version of threadpool options in use.\n");
936 return NULL;
937 }
938
939 fullname = ast_alloca(strlen(name) + strlen("/pool") + 1);
940 sprintf(fullname, "%s/pool", name); /* Safe */
941 tps = ast_taskprocessor_create_with_listener(fullname, tps_listener);
942 if (!tps) {
943 return NULL;
944 }
945
946 pool->tps = tps;
947 if (listener) {
948 ao2_ref(listener, +1);
949 pool->listener = listener;
950 }
951 ast_threadpool_set_size(pool, pool->options.initial_size);
952 ao2_ref(pool, +1);
953 return pool;
954}
static void * listener(void *unused)
Definition asterisk.c:1530
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
Definition astmm.h:288
#define ao2_cleanup(obj)
Definition astobj2.h:1934
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition astobj2.h:459
static const char name[]
Definition format_mp3.c:68
#define NULL
Definition resample.c:96
A listener for taskprocessors.
A ast_taskprocessor structure is a singleton by name.
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
struct ast_taskprocessor * ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
Create a taskprocessor with a custom listener.
static struct test_options options
void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
Set the number of threads for the thread pool.
Definition threadpool.c:874
static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks
Table of taskprocessor listener callbacks for threadpool's main taskprocessor.
Definition threadpool.c:695
static struct ast_threadpool * threadpool_alloc(const char *name, const struct ast_threadpool_options *options)
Allocate a threadpool.
Definition threadpool.c:404
#define AST_THREADPOOL_OPTIONS_VERSION
Definition threadpool.h:73
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition utils.h:981

References ao2_cleanup, ao2_ref, ast_alloca, ast_log, ast_taskprocessor_create_with_listener(), ast_taskprocessor_listener_alloc(), AST_THREADPOOL_OPTIONS_VERSION, ast_threadpool_set_size(), listener(), LOG_WARNING, name, NULL, options, RAII_VAR, threadpool_alloc(), and threadpool_tps_listener_callbacks.

Referenced by AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), handle_cli_threadpool_push_efficiency(), handle_cli_threadpool_push_serializer_efficiency(), and load_module().

◆ ast_threadpool_listener_alloc()

struct ast_threadpool_listener * ast_threadpool_listener_alloc ( const struct ast_threadpool_listener_callbacks *  callbacks,
void *  user_data 
)

Allocate a threadpool listener.

This function will call back into the alloc callback for the listener.

Parameters
callbacks: Listener callbacks to assign to the listener
user_data: User data to be stored in the threadpool listener
Return values
NULL: Failed to allocate the listener
non-NULL: The newly-created threadpool listener

Definition at line 893 of file threadpool.c.

895{
897 if (!listener) {
898 return NULL;
899 }
900 listener->callbacks = callbacks;
901 listener->user_data = user_data;
902 return listener;
903}
#define ao2_alloc(data_size, destructor_fn)
Definition astobj2.h:409
struct @506 callbacks
listener for a threadpool
Definition threadpool.c:110

References ao2_alloc, callbacks, listener(), NULL, and ast_threadpool_listener::user_data.

Referenced by AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), handle_cli_threadpool_push_efficiency(), and handle_cli_threadpool_push_serializer_efficiency().

◆ ast_threadpool_listener_get_user_data()

void * ast_threadpool_listener_get_user_data ( const struct ast_threadpool_listener *  listener)

Get the threadpool listener's user data.

Parameters
listener: The threadpool listener
Returns
The user data

Definition at line 905 of file threadpool.c.

906{
907 return listener->user_data;
908}

References listener().

Referenced by listener_check(), test_emptied(), test_shutdown(), test_state_changed(), test_task_pushed(), and wait_for_task_pushed().

◆ ast_threadpool_push()

int ast_threadpool_push ( struct ast_threadpool *  pool,
int(*)(void *data)  task,
void *  data 
)

Definition at line 972 of file threadpool.c.

973{
974 return __ast_threadpool_push(pool, task, data, NULL, 0, NULL);
975}
int __ast_threadpool_push(struct ast_threadpool *pool, int(*task)(void *data), void *data, const char *file, int line, const char *function)
Push a task to the threadpool.
Definition threadpool.c:961

References __ast_threadpool_push(), NULL, and task().

◆ ast_threadpool_queue_size()

long ast_threadpool_queue_size ( struct ast_threadpool *  pool)

Return the size of the threadpool's task queue.

Since
13.7.0

Definition at line 1347 of file threadpool.c.

1348{
1349 return ast_taskprocessor_size(pool->tps);
1350}
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.

References ast_taskprocessor_size(), and ast_threadpool::tps.

Referenced by ast_sip_threadpool_queue_size().

◆ ast_threadpool_serializer()

struct ast_taskprocessor * ast_threadpool_serializer ( const char *  name,
struct ast_threadpool *  pool 
)

Serialized execution of tasks within an ast_threadpool.

Since
12.0.0

An ast_taskprocessor with the same contract as a default taskprocessor (tasks execute serially), except that instead of executing out of a dedicated thread, execution occurs in a thread from an ast_threadpool. Think of it as a lightweight thread.

While it guarantees that each task will complete before the next one executes, there is no guarantee as to which thread from the pool individual tasks will execute on. This normally only matters if your code relies on thread-specific information, such as thread locals.

Use ast_taskprocessor_unreference() to dispose of the returned ast_taskprocessor.

Only a single taskprocessor with a given name may exist. This function will fail if a taskprocessor with the given name already exists.

Parameters
name: Name of the serializer. (must be unique)
pool: ast_threadpool for execution.
Returns
ast_taskprocessor for enqueuing work.
Return values
NULL: on error.

Definition at line 1342 of file threadpool.c.

1343{
1345}
struct ast_taskprocessor * ast_threadpool_serializer_group(const char *name, struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Serialized execution of tasks within a ast_threadpool.

References ast_threadpool_serializer_group(), name, and NULL.

Referenced by AST_TEST_DEFINE(), AST_TEST_DEFINE(), and handle_cli_threadpool_push_serializer_efficiency().

◆ ast_threadpool_serializer_get_current()

struct ast_taskprocessor * ast_threadpool_serializer_get_current ( void  )

Get the threadpool serializer currently associated with this thread.

Since
14.0.0
Note
The returned pointer is valid while the serializer thread is running.
Use ao2_ref() on serializer if you are going to keep it for another thread. To unref it you must then use ast_taskprocessor_unreference().
Return values
serializer: on success.
NULL: on error or no serializer associated with the thread.

Definition at line 1307 of file threadpool.c.

1308{
1309 return ast_threadstorage_get_ptr(&current_serializer);
1310}
void * ast_threadstorage_get_ptr(struct ast_threadstorage *ts)
Retrieve a raw pointer from threadstorage.

References ast_threadstorage_get_ptr().

Referenced by record_serializer(), rfc3326_outgoing_request(), rfc3326_outgoing_response(), and serializer_efficiency_task().

◆ ast_threadpool_serializer_group()

struct ast_taskprocessor * ast_threadpool_serializer_group ( const char *  name,
struct ast_threadpool *  pool,
struct ast_serializer_shutdown_group *  shutdown_group 
)

Serialized execution of tasks within an ast_threadpool.

Since
13.5.0

An ast_taskprocessor with the same contract as a default taskprocessor (tasks execute serially), except that instead of executing out of a dedicated thread, execution occurs in a thread from an ast_threadpool. Think of it as a lightweight thread.

While it guarantees that each task will complete before the next one executes, there is no guarantee as to which thread from the pool individual tasks will execute on. This normally only matters if your code relies on thread-specific information, such as thread locals.

Use ast_taskprocessor_unreference() to dispose of the returned ast_taskprocessor.

Only a single taskprocessor with a given name may exist. This function will fail if a taskprocessor with the given name already exists.

Parameters
name: Name of the serializer. (must be unique)
pool: ast_threadpool for execution.
shutdown_group: Group shutdown controller. (NULL if no group association)
Returns
ast_taskprocessor for enqueuing work.
Return values
NULL: on error.

Definition at line 1312 of file threadpool.c.

1314{
1315 struct serializer *ser;
1317 struct ast_taskprocessor *tps;
1318
1319 ser = serializer_create(pool, shutdown_group);
1320 if (!ser) {
1321 return NULL;
1322 }
1323
1325 if (!listener) {
1326 ao2_ref(ser, -1);
1327 return NULL;
1328 }
1329
1331 if (!tps) {
1332 /* ser ref transferred to listener but not cleaned without tps */
1333 ao2_ref(ser, -1);
1334 } else if (shutdown_group) {
1336 }
1337
1338 ao2_ref(listener, -1);
1339 return tps;
1340}
static struct ast_serializer_shutdown_group * shutdown_group
Shutdown group for options serializers.
void ast_serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group)
Increment the number of serializer members in the group.
static struct serializer * serializer_create(struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks

References ao2_ref, ast_serializer_shutdown_group_inc(), ast_taskprocessor_create_with_listener(), ast_taskprocessor_listener_alloc(), listener(), name, NULL, serializer_create(), serializer_tps_listener_callbacks, and shutdown_group.

Referenced by ast_serializer_pool_create(), ast_sip_create_serializer_group(), and ast_threadpool_serializer().

◆ ast_threadpool_set_size()

void ast_threadpool_set_size ( struct ast_threadpool *  threadpool,
unsigned int  size 
)

Set the number of threads for the thread pool.

This number may be more or less than the current number of threads in the threadpool.

Parameters
threadpool: The threadpool to adjust
size: The new desired size of the threadpool

Definition at line 874 of file threadpool.c.

875{
876 struct set_size_data *ssd;
878
879 if (pool->shutting_down) {
880 return;
881 }
882
884 if (!ssd) {
885 return;
886 }
887
889 ast_free(ssd);
890 }
891}
#define ast_free(a)
Definition astmm.h:180
struct ast_taskprocessor * control_tps
The control taskprocessor.
Definition threadpool.c:96
Helper struct used for queued operations that change the size of the threadpool.
Definition threadpool.c:802
unsigned int size
Definition threadpool.c:806
struct ast_threadpool * pool
Definition threadpool.c:804
#define ast_taskprocessor_push(tps, task_exe, datap)
static int queued_set_size(void *data)
Change the size of the threadpool.
Definition threadpool.c:838
static struct set_size_data * set_size_data_alloc(struct ast_threadpool *pool, unsigned int size)
Allocate and initialize a set_size_data.
Definition threadpool.c:814

References ast_free, ast_taskprocessor_push, ast_threadpool::control_tps, lock, set_size_data::pool, queued_set_size(), SCOPED_AO2LOCK, set_size_data_alloc(), ast_threadpool::shutting_down, and set_size_data::size.

Referenced by AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), and ast_threadpool_create().

◆ ast_threadpool_shutdown()

void ast_threadpool_shutdown ( struct ast_threadpool *  pool)

Shut down a threadpool and destroy it.

Parameters
pool: The pool to shut down

Definition at line 977 of file threadpool.c.

978{
979 if (!pool) {
980 return;
981 }
982 /* Shut down the taskprocessors and everything else just
983 * takes care of itself via the taskprocessor callbacks
984 */
985 ao2_lock(pool);
986 pool->shutting_down = 1;
987 ao2_unlock(pool);
990}
#define ao2_unlock(a)
Definition astobj2.h:729
#define ao2_lock(a)
Definition astobj2.h:717
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.

References ao2_lock, ao2_unlock, ast_taskprocessor_unreference(), ast_threadpool::control_tps, ast_threadpool::shutting_down, and ast_threadpool::tps.

Referenced by AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), handle_cli_threadpool_push_efficiency(), handle_cli_threadpool_push_serializer_efficiency(), load_module(), and unload_module().

◆ AST_THREADSTORAGE_RAW()

AST_THREADSTORAGE_RAW ( current_serializer  )

◆ execute_tasks()

static int execute_tasks ( void *  data)
static

Definition at line 1259 of file threadpool.c.

1260{
1261 struct ast_taskprocessor *tps = data;
1262
1263 ast_threadstorage_set_ptr(&current_serializer, tps);
1264 while (ast_taskprocessor_execute(tps)) {
1265 /* No-op */
1266 }
1267 ast_threadstorage_set_ptr(&current_serializer, NULL);
1268
1270 return 0;
1271}
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
Pop a task off the taskprocessor and execute it.
int ast_threadstorage_set_ptr(struct ast_threadstorage *ts, void *ptr)
Set a raw pointer from threadstorage.

References ast_taskprocessor_execute(), ast_taskprocessor_unreference(), ast_threadstorage_set_ptr(), and NULL.

Referenced by serializer_task_pushed().

◆ grow()

static void grow ( struct ast_threadpool *  pool,
int  delta 
)
static

Add threads to the threadpool.

This function is called from the threadpool's control taskprocessor thread.

Parameters
pool: The pool that is expanding
delta: The number of threads to add to the pool

Definition at line 525 of file threadpool.c.

526{
527 int i;
528
529 int current_size = ao2_container_count(pool->active_threads) +
531
532 if (pool->options.max_size && current_size + delta > pool->options.max_size) {
533 delta = pool->options.max_size - current_size;
534 }
535
536 ast_debug(3, "Increasing threadpool %s's size by %d\n",
537 ast_taskprocessor_name(pool->tps), delta);
538
539 for (i = 0; i < delta; ++i) {
540 struct worker_thread *worker = worker_thread_alloc(pool);
541 if (!worker) {
542 return;
543 }
544 if (ao2_link(pool->idle_threads, worker)) {
545 if (worker_thread_start(worker)) {
546 ast_log(LOG_ERROR, "Unable to start worker thread %d. Destroying.\n", worker->id);
548 }
549 } else {
550 ast_log(LOG_WARNING, "Failed to activate worker thread %d. Destroying.\n", worker->id);
551 }
552 ao2_ref(worker, -1);
553 }
554}
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
#define LOG_ERROR
int max_size
Maximum number of threads a pool may have.
Definition threadpool.h:112
struct ao2_container * idle_threads
The container of idle threads. Idle threads are those that are currently waiting to run tasks.
Definition threadpool.c:48
struct ast_threadpool_options options
Definition threadpool.c:100
struct ast_threadpool * pool
Definition threadpool.c:153
const char * ast_taskprocessor_name(struct ast_taskprocessor *tps)
Return the name of the taskprocessor singleton.
static struct worker_thread * worker_thread_alloc(struct ast_threadpool *pool)
Allocate and initialize a new worker thread.
static int worker_thread_start(struct worker_thread *worker)

References ast_threadpool::active_threads, ao2_container_count(), ao2_link, ao2_ref, ao2_unlink, ast_debug, ast_log, ast_taskprocessor_name(), worker_thread::id, ast_threadpool::idle_threads, LOG_ERROR, LOG_WARNING, ast_threadpool_options::max_size, ast_threadpool::options, worker_thread::pool, ast_threadpool::tps, worker_thread_alloc(), and worker_thread_start().

Referenced by __ast_string_field_ptr_build_va(), __ast_string_field_ptr_grow(), queued_set_size(), and queued_task_pushed().

◆ kill_threads()

static int kill_threads ( void *  obj,
void *  arg,
int  flags 
)
static

ao2 callback to kill a set number of threads.

Threads will be unlinked from the container as long as the counter has not reached zero. The counter is decremented with each thread that is removed.

Parameters
obj: The worker thread up for possible destruction
arg: The counter
flags: Unused
Return values
CMP_MATCH: The counter has not reached zero, so this thread should be removed.
CMP_STOP: The counter has reached zero, so no more threads should be removed.

Definition at line 714 of file threadpool.c.

715{
716 int *num_to_kill = arg;
717
718 if (*num_to_kill > 0) {
719 --(*num_to_kill);
720 return CMP_MATCH;
721 } else {
722 return CMP_STOP;
723 }
724}
@ CMP_STOP
Definition astobj2.h:1028

References CMP_MATCH, CMP_STOP, and ast_taskprocessor_listener_callbacks::start.

Referenced by shrink().

◆ queued_active_thread_idle()

static int queued_active_thread_idle ( void *  data)
static

Move a worker thread from the active container to the idle container.

This function is called from the threadpool's control taskprocessor thread.

Parameters
data: A thread_worker_pair containing the threadpool and the worker to move.
Returns
0

Definition at line 235 of file threadpool.c.

236{
237 struct thread_worker_pair *pair = data;
238
239 ao2_link(pair->pool->idle_threads, pair->worker);
240 ao2_unlink(pair->pool->active_threads, pair->worker);
241
243
245 return 0;
246}
Struct used for queued operations involving worker state changes.
Definition threadpool.c:193
static void thread_worker_pair_free(struct thread_worker_pair *pair)
Destructor for thread_worker_pair.
Definition threadpool.c:203
static void threadpool_send_state_changed(struct ast_threadpool *pool)
Notify the threadpool listener that the state has changed.
Definition threadpool.c:180

References ao2_link, ao2_unlink, thread_worker_pair_free(), and threadpool_send_state_changed().

Referenced by threadpool_active_thread_idle().

◆ queued_emptied()

static int queued_emptied ( void *  data)
static

Queued task that handles the case where the threadpool's taskprocessor is emptied.

This simply lets the threadpool's listener know that the threadpool is devoid of tasks

Parameters
data: The pool that has become empty
Returns
0

Definition at line 638 of file threadpool.c.

639{
640 struct ast_threadpool *pool = data;
641
642 /* We already checked for existence of this callback when this was queued */
643 pool->listener->callbacks->emptied(pool, pool->listener);
644 return 0;
645}
void(* emptied)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener)
Indicates the threadpool's taskprocessor has become empty.
Definition threadpool.h:58
const struct ast_threadpool_listener_callbacks * callbacks
Definition threadpool.c:112
struct ast_threadpool_listener * listener
Definition threadpool.c:38

References ast_threadpool_listener::callbacks, ast_threadpool_listener_callbacks::emptied, and ast_threadpool::listener.

Referenced by threadpool_tps_emptied().

◆ queued_idle_thread_dead()

static int queued_idle_thread_dead ( void *  data)
static

Definition at line 321 of file threadpool.c.

322{
323 struct thread_worker_pair *pair = data;
324
325 ao2_unlink(pair->pool->idle_threads, pair->worker);
327
329 return 0;
330}

References ao2_unlink, thread_worker_pair_free(), and threadpool_send_state_changed().

Referenced by threadpool_idle_thread_dead().

◆ queued_set_size()

static int queued_set_size ( void *  data)
static

Change the size of the threadpool.

This can either result in shrinking or growing the threadpool depending on the new desired size and the current size.

This function is run from the threadpool control taskprocessor thread

Parameters
data: A set_size_data used for determining how to act
Returns
0

Definition at line 838 of file threadpool.c.

839{
840 struct set_size_data *ssd = data;
841 struct ast_threadpool *pool = ssd->pool;
842 unsigned int num_threads = ssd->size;
843
844 /* We don't count zombie threads as being "live" when potentially resizing */
845 unsigned int current_size = ao2_container_count(pool->active_threads) +
847
848 ast_free(ssd);
849
850 if (current_size == num_threads) {
851 ast_debug(3, "Not changing threadpool size since new size %u is the same as current %u\n",
852 num_threads, current_size);
853 return 0;
854 }
855
856 if (current_size < num_threads) {
858 activate_thread, pool);
859
860 /* As the above may have altered the number of current threads update it */
861 current_size = ao2_container_count(pool->active_threads) +
863 grow(pool, num_threads - current_size);
865 activate_thread, pool);
866 } else {
867 shrink(pool, current_size - num_threads);
868 }
869
871 return 0;
872}
#define ao2_callback(c, flags, cb_fn, arg)
ao2_callback() is a generic function that applies cb_fn() to all objects in a container,...
Definition astobj2.h:1693
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
Definition astobj2.h:1063
@ OBJ_NODATA
Definition astobj2.h:1044
@ OBJ_MULTIPLE
Definition astobj2.h:1049
@ OBJ_UNLINK
Definition astobj2.h:1039
static int activate_thread(void *obj, void *arg, int flags)
Activate idle threads.
Definition threadpool.c:492
static void grow(struct ast_threadpool *pool, int delta)
Add threads to the threadpool.
Definition threadpool.c:525
static void shrink(struct ast_threadpool *pool, int delta)
Remove threads from the threadpool.
Definition threadpool.c:775

References activate_thread(), ast_threadpool::active_threads, ao2_callback, ao2_container_count(), ast_debug, ast_free, grow(), ast_threadpool::idle_threads, OBJ_MULTIPLE, OBJ_NODATA, OBJ_NOLOCK, OBJ_UNLINK, set_size_data::pool, shrink(), set_size_data::size, and threadpool_send_state_changed().

Referenced by ast_threadpool_set_size().

◆ queued_task_pushed()

static int queued_task_pushed ( void *  data)
static

Queued task called when tasks are pushed into the threadpool.

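This function first calls into the threadpool's listener to let it know that a task has been pushed. It then tries to wake idle threads and move them into the active thread container. If no idle thread could be activated and the pool's auto_increment option is non-zero, the pool is grown by auto_increment threads and a second activation pass is made over the newly added threads.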

Parameters
data: A task_pushed_data
Returns
0

Definition at line 565 of file threadpool.c.

566{
567 struct task_pushed_data *tpd = data;
568 struct ast_threadpool *pool = tpd->pool;
569 int was_empty = tpd->was_empty;
570 unsigned int existing_active;
571
572 ast_free(tpd);
573
574 if (pool->listener && pool->listener->callbacks->task_pushed) {
575 pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
576 }
577
578 existing_active = ao2_container_count(pool->active_threads);
579
580 /* The first pass transitions any existing idle threads to be active, and
581 * will also remove any worker threads that have recently entered the dead
582 * state.
583 */
585 activate_thread, pool);
586
587 /* If no idle threads could be transitioned to active grow the pool as permitted. */
588 if (ao2_container_count(pool->active_threads) == existing_active) {
589 if (!pool->options.auto_increment) {
590 return 0;
591 }
592 grow(pool, pool->options.auto_increment);
593 /* An optional second pass transitions any newly added threads. */
595 activate_thread, pool);
596 }
597
599 return 0;
600}
void(* task_pushed)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener, int was_empty)
Indicates that a task was pushed to the threadpool.
Definition threadpool.h:49
int auto_increment
Number of threads to increment pool by.
Definition threadpool.h:92
helper used for queued task when tasks are pushed
Definition threadpool.c:452
struct ast_threadpool * pool
Definition threadpool.c:454

References activate_thread(), ast_threadpool::active_threads, ao2_callback, ao2_container_count(), ast_free, ast_threadpool_options::auto_increment, ast_threadpool_listener::callbacks, grow(), ast_threadpool::idle_threads, ast_threadpool::listener, OBJ_NODATA, OBJ_NOLOCK, OBJ_UNLINK, ast_threadpool::options, task_pushed_data::pool, ast_threadpool_listener_callbacks::task_pushed, threadpool_send_state_changed(), and task_pushed_data::was_empty.

Referenced by threadpool_tps_task_pushed().

◆ queued_zombie_thread_dead()

static int queued_zombie_thread_dead ( void *  data)
static

Kill a zombie thread.

This runs from the threadpool's control taskprocessor thread.

Parameters
data: A thread_worker_pair containing the threadpool and the zombie thread
Returns
0

Definition at line 284 of file threadpool.c.

285{
286 struct thread_worker_pair *pair = data;
287
288 ao2_unlink(pair->pool->zombie_threads, pair->worker);
290
292 return 0;
293}

References ao2_unlink, thread_worker_pair_free(), and threadpool_send_state_changed().

Referenced by threadpool_zombie_thread_dead().

◆ serializer_create()

static struct serializer * serializer_create ( struct ast_threadpool * pool,
struct ast_serializer_shutdown_group * shutdown_group
)
static

Definition at line 1242 of file threadpool.c.

1244{
1245 struct serializer *ser;
1246
1248 if (!ser) {
1249 return NULL;
1250 }
1251 ao2_ref(pool, +1);
1252 ser->pool = pool;
1254 return ser;
1255}
@ AO2_ALLOC_OPT_LOCK_NOLOCK
Definition astobj2.h:367
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition astobj2.h:404
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition astobj2.h:480
struct ast_taskpool * pool
Definition taskpool.c:698
struct ast_serializer_shutdown_group * shutdown_group
Definition taskpool.c:700
static void serializer_dtor(void *obj)

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_alloc_options, ao2_bump, ao2_ref, NULL, serializer::pool, serializer_dtor(), serializer::shutdown_group, and shutdown_group.

Referenced by ast_threadpool_serializer_group().

◆ serializer_dtor()

static void serializer_dtor ( void *  obj)
static

Definition at line 1232 of file threadpool.c.

1233{
1234 struct serializer *ser = obj;
1235
1236 ao2_cleanup(ser->pool);
1237 ser->pool = NULL;
1239 ser->shutdown_group = NULL;
1240}

References ao2_cleanup, NULL, serializer::pool, and serializer::shutdown_group.

Referenced by serializer_create().

◆ serializer_shutdown()

static void serializer_shutdown ( struct ast_taskprocessor_listener * listener)
static

Definition at line 1291 of file threadpool.c.

1292{
1294
1295 if (ser->shutdown_group) {
1297 }
1298 ao2_cleanup(ser);
1299}
void ast_serializer_shutdown_group_dec(struct ast_serializer_shutdown_group *shutdown_group)
Decrement the number of serializer members in the group.
void * ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
Get the user data from the listener.

References ao2_cleanup, ast_serializer_shutdown_group_dec(), ast_taskprocessor_listener_get_user_data(), listener(), and serializer::shutdown_group.

◆ serializer_start()

static int serializer_start ( struct ast_taskprocessor_listener listener)
static

Definition at line 1285 of file threadpool.c.

1286{
1287 /* No-op */
1288 return 0;
1289}

◆ serializer_task_pushed()

static void serializer_task_pushed ( struct ast_taskprocessor_listener * listener,
int  was_empty 
)
static

Definition at line 1273 of file threadpool.c.

1274{
1275 if (was_empty) {
1278
1279 if (ast_threadpool_push(ser->pool, execute_tasks, tps)) {
1281 }
1282 }
1283}
struct ast_taskprocessor * ast_taskprocessor_listener_get_tps(const struct ast_taskprocessor_listener *listener)
Get a reference to the listener's taskprocessor.
static int execute_tasks(void *data)
#define ast_threadpool_push(pool, task, data)
Definition threadpool.h:194

References ast_taskprocessor_listener_get_tps(), ast_taskprocessor_listener_get_user_data(), ast_taskprocessor_unreference(), ast_threadpool_push, execute_tasks(), listener(), and serializer::pool.

◆ set_size_data_alloc()

static struct set_size_data * set_size_data_alloc ( struct ast_threadpool * pool,
unsigned int  size 
)
static

Allocate and initialize a set_size_data.

Parameters
pool: The pool for the set_size_data
size: The size to store in the set_size_data

Definition at line 814 of file threadpool.c.

816{
817 struct set_size_data *ssd = ast_malloc(sizeof(*ssd));
818 if (!ssd) {
819 return NULL;
820 }
821
822 ssd->pool = pool;
823 ssd->size = size;
824 return ssd;
825}
#define ast_malloc(len)
A wrapper for malloc()
Definition astmm.h:191

References ast_malloc, NULL, set_size_data::pool, and set_size_data::size.

Referenced by ast_threadpool_set_size().

◆ shrink()

static void shrink ( struct ast_threadpool * pool,
int  delta 
)
static

Remove threads from the threadpool.

The preference is to kill idle threads. However, if there are more threads to remove than there are idle threads, then active threads will be zombified instead.

This function is called from the threadpool control taskprocessor thread.

Parameters
pool: The threadpool to remove threads from
delta: The number of threads to remove

Definition at line 775 of file threadpool.c.

776{
777 /*
778 * Preference is to kill idle threads, but
779 * we'll move on to deactivating active threads
780 * if we have to
781 */
782 int idle_threads = ao2_container_count(pool->idle_threads);
783 int idle_threads_to_kill = MIN(delta, idle_threads);
784 int active_threads_to_zombify = delta - idle_threads_to_kill;
785
786 ast_debug(3, "Destroying %d idle threads in threadpool %s\n", idle_threads_to_kill,
788
790 kill_threads, &idle_threads_to_kill);
791
792 ast_debug(3, "Destroying %d active threads in threadpool %s\n", active_threads_to_zombify,
794
796 zombify_threads, pool, &active_threads_to_zombify);
797}
#define ao2_callback_data(container, flags, cb_fn, arg, data)
Definition astobj2.h:1723
static int kill_threads(void *obj, void *arg, int flags)
ao2 callback to kill a set number of threads.
Definition threadpool.c:714
static int zombify_threads(void *obj, void *arg, void *data, int flags)
ao2 callback to zombify a set number of threads.
Definition threadpool.c:745
#define MIN(a, b)
Definition utils.h:252

References ast_threadpool::active_threads, ao2_callback, ao2_callback_data, ao2_container_count(), ast_debug, ast_taskprocessor_name(), ast_threadpool::idle_threads, kill_threads(), MIN, OBJ_MULTIPLE, OBJ_NODATA, OBJ_NOLOCK, OBJ_UNLINK, ast_threadpool::tps, and zombify_threads().

Referenced by queued_set_size().

◆ task_pushed_data_alloc()

static struct task_pushed_data * task_pushed_data_alloc ( struct ast_threadpool * pool,
int  was_empty 
)
static

Allocate and initialize a task_pushed_data.

Parameters
pool: The threadpool to set in the task_pushed_data
was_empty: The was_empty value to set in the task_pushed_data
Return values
NULL: Unable to allocate task_pushed_data
non-NULL: The newly-allocated task_pushed_data

Definition at line 466 of file threadpool.c.

468{
469 struct task_pushed_data *tpd = ast_malloc(sizeof(*tpd));
470
471 if (!tpd) {
472 return NULL;
473 }
474 tpd->pool = pool;
475 tpd->was_empty = was_empty;
476 return tpd;
477}

References ast_malloc, NULL, task_pushed_data::pool, and task_pushed_data::was_empty.

Referenced by threadpool_tps_task_pushed().

◆ thread_worker_pair_alloc()

static struct thread_worker_pair * thread_worker_pair_alloc ( struct ast_threadpool * pool,
struct worker_thread * worker
)
static

Allocate and initialize a thread_worker_pair.

Parameters
pool: Threadpool to assign to the thread_worker_pair
worker: Worker thread to assign to the thread_worker_pair

Definition at line 214 of file threadpool.c.

216{
217 struct thread_worker_pair *pair = ast_malloc(sizeof(*pair));
218 if (!pair) {
219 return NULL;
220 }
221 pair->pool = pool;
222 ao2_ref(worker, +1);
223 pair->worker = worker;
224
225 return pair;
226}
struct ast_threadpool * pool
Definition threadpool.c:195
struct worker_thread * worker
Definition threadpool.c:197

References ao2_ref, ast_malloc, NULL, thread_worker_pair::pool, and thread_worker_pair::worker.

Referenced by threadpool_active_thread_idle(), threadpool_idle_thread_dead(), and threadpool_zombie_thread_dead().

◆ thread_worker_pair_free()

static void thread_worker_pair_free ( struct thread_worker_pair * pair)
static

◆ threadpool_active_thread_idle()

static void threadpool_active_thread_idle ( struct ast_threadpool * pool,
struct worker_thread * worker
)
static

Queue a task to move a thread from the active list to the idle list.

This is called by a worker thread when it runs out of tasks to perform and goes idle.

Parameters
pool: The threadpool to which the worker belongs
worker: The worker thread that has gone idle

Definition at line 256 of file threadpool.c.

258{
259 struct thread_worker_pair *pair;
261
262 if (pool->shutting_down) {
263 return;
264 }
265
267 if (!pair) {
268 return;
269 }
270
273 }
274}
static struct thread_worker_pair * thread_worker_pair_alloc(struct ast_threadpool *pool, struct worker_thread *worker)
Allocate and initialize a thread_worker_pair.
Definition threadpool.c:214
static int queued_active_thread_idle(void *data)
Move a worker thread from the active container to the idle container.
Definition threadpool.c:235

References ast_taskprocessor_push, ast_threadpool::control_tps, lock, thread_worker_pair::pool, queued_active_thread_idle(), SCOPED_AO2LOCK, ast_threadpool::shutting_down, thread_worker_pair_alloc(), thread_worker_pair_free(), and thread_worker_pair::worker.

Referenced by worker_start().

◆ threadpool_alloc()

static struct ast_threadpool * threadpool_alloc ( const char *  name,
const struct ast_threadpool_options * options
)
static

Allocate a threadpool.

This is called directly by ast_threadpool_create(); it takes a name and options rather than a taskprocessor listener, so it is not itself a listener callback. The threadpool exists as the private data on a taskprocessor listener.

Parameters
name: The name of the threadpool.
options: The options the threadpool uses.
Return values
NULL: Could not initialize threadpool properly
non-NULL: The newly-allocated threadpool

Definition at line 404 of file threadpool.c.

405{
406 RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
407 struct ast_str *control_tps_name;
408
409 pool = ao2_alloc(sizeof(*pool), threadpool_destructor);
410 control_tps_name = ast_str_create(64);
411 if (!pool || !control_tps_name) {
412 ast_free(control_tps_name);
413 return NULL;
414 }
415
416 ast_str_set(&control_tps_name, 0, "%s/pool-control", name);
417
418 pool->control_tps = ast_taskprocessor_get(ast_str_buffer(control_tps_name), TPS_REF_DEFAULT);
419 ast_free(control_tps_name);
420 if (!pool->control_tps) {
421 return NULL;
422 }
423 pool->active_threads = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
425 if (!pool->active_threads) {
426 return NULL;
427 }
430 if (!pool->idle_threads) {
431 return NULL;
432 }
433 pool->zombie_threads = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
435 if (!pool->zombie_threads) {
436 return NULL;
437 }
438 pool->options = *options;
439
440 ao2_ref(pool, +1);
441 return pool;
442}
@ AO2_ALLOC_OPT_LOCK_MUTEX
Definition astobj2.h:363
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition astobj2.h:1303
#define ast_str_create(init_len)
Create a malloc'ed dynamic length string.
Definition strings.h:659
int ast_str_set(struct ast_str **buf, ssize_t max_len, const char *fmt,...)
Set a dynamic string using variable arguments.
Definition strings.h:1113
char *attribute_pure ast_str_buffer(const struct ast_str *buf)
Returns the string buffer within the ast_str buf.
Definition strings.h:761
Support for dynamic strings.
Definition strings.h:623
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.
@ TPS_REF_DEFAULT
return a reference to a taskprocessor, create one if it does not exist
static int worker_thread_hash(const void *obj, int flags)
Definition threadpool.c:998
static void threadpool_destructor(void *obj)
Destroy a threadpool's components.
Definition threadpool.c:386
static int worker_thread_cmp(void *obj, void *arg, int flags)
#define THREAD_BUCKETS
Definition threadpool.c:28

References ao2_alloc, AO2_ALLOC_OPT_LOCK_MUTEX, ao2_cleanup, ao2_container_alloc_hash, ao2_ref, ast_free, ast_str_buffer(), ast_str_create, ast_str_set(), ast_taskprocessor_get(), name, NULL, options, RAII_VAR, THREAD_BUCKETS, threadpool_destructor(), TPS_REF_DEFAULT, worker_thread_cmp(), and worker_thread_hash().

Referenced by ast_threadpool_create().

◆ threadpool_destructor()

static void threadpool_destructor ( void *  obj)
static

Destroy a threadpool's components.

This is the destructor called automatically when the threadpool's reference count reaches zero. This is not to be confused with threadpool_destroy.

By the time this actually gets called, most of the cleanup has already been done in the pool. The only thing left to do is to release the final reference to the threadpool listener.

Parameters
obj: The pool to destroy

Definition at line 386 of file threadpool.c.

387{
388 struct ast_threadpool *pool = obj;
389 ao2_cleanup(pool->listener);
390}

References ao2_cleanup, and ast_threadpool::listener.

Referenced by threadpool_alloc().

◆ threadpool_execute()

static int threadpool_execute ( struct ast_threadpool * pool)
static

Execute a task in the threadpool.

This is the function that worker threads call in order to execute tasks in the threadpool

Parameters
pool: The pool to which the tasks belong.
Return values
0: Either the pool has been shut down or there are no tasks.
1: There are still tasks remaining in the pool.

Definition at line 362 of file threadpool.c.

363{
364 ao2_lock(pool);
365 if (!pool->shutting_down) {
366 ao2_unlock(pool);
367 return ast_taskprocessor_execute(pool->tps);
368 }
369 ao2_unlock(pool);
370 return 0;
371}

References ao2_lock, ao2_unlock, ast_taskprocessor_execute(), thread_worker_pair::pool, ast_threadpool::shutting_down, and ast_threadpool::tps.

Referenced by worker_active().

◆ threadpool_idle_thread_dead()

static void threadpool_idle_thread_dead ( struct ast_threadpool * pool,
struct worker_thread * worker
)
static

◆ threadpool_send_state_changed()

static void threadpool_send_state_changed ( struct ast_threadpool * pool)
static

Notify the threadpool listener that the state has changed.

This notifies the threadpool listener via its state_changed callback.

Parameters
pool: The threadpool whose state has changed

Definition at line 180 of file threadpool.c.

181{
182 int active_size = ao2_container_count(pool->active_threads);
183 int idle_size = ao2_container_count(pool->idle_threads);
184
185 if (pool->listener && pool->listener->callbacks->state_changed) {
186 pool->listener->callbacks->state_changed(pool, pool->listener, active_size, idle_size);
187 }
188}
void(* state_changed)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener, int active_threads, int idle_threads)
Indicates that the state of threads in the pool has changed.
Definition threadpool.h:38

References ast_threadpool::active_threads, ao2_container_count(), ast_threadpool_listener::callbacks, ast_threadpool::idle_threads, ast_threadpool::listener, worker_thread::pool, and ast_threadpool_listener_callbacks::state_changed.

Referenced by queued_active_thread_idle(), queued_idle_thread_dead(), queued_set_size(), queued_task_pushed(), and queued_zombie_thread_dead().

◆ threadpool_tps_emptied()

static void threadpool_tps_emptied ( struct ast_taskprocessor_listener * listener)
static

Taskprocessor listener emptied callback.

The threadpool queues a task to let the threadpool listener know that the threadpool no longer contains any tasks.

Parameters
listener: The taskprocessor listener. The threadpool is the listener's private data.

Definition at line 654 of file threadpool.c.

655{
657 SCOPED_AO2LOCK(lock, pool);
658
659 if (pool->shutting_down) {
660 return;
661 }
662
663 if (pool->listener && pool->listener->callbacks->emptied) {
665 /* Nothing to do here but we need the check to keep the compiler happy. */
666 }
667 }
668}
static int queued_emptied(void *data)
Queued task that handles the case where the threadpool's taskprocessor is emptied.
Definition threadpool.c:638

References ast_taskprocessor_listener_get_user_data(), ast_taskprocessor_push, ast_threadpool_listener::callbacks, ast_threadpool::control_tps, ast_threadpool_listener_callbacks::emptied, listener(), ast_threadpool::listener, lock, queued_emptied(), SCOPED_AO2LOCK, and ast_threadpool::shutting_down.

◆ threadpool_tps_shutdown()

static void threadpool_tps_shutdown ( struct ast_taskprocessor_listener * listener)
static

Taskprocessor listener shutdown callback.

The threadpool will shut down and destroy all of its worker threads when this is called back. By the time this gets called, the threadpool's control taskprocessor has already been destroyed. Therefore there is no risk in outright destroying the worker threads here.

Parameters
listener: The taskprocessor listener. The threadpool is the listener's private data.

Definition at line 679 of file threadpool.c.

680{
682
683 if (pool->listener && pool->listener->callbacks->shutdown) {
684 pool->listener->callbacks->shutdown(pool->listener);
685 }
689 ao2_cleanup(pool);
690}
void(* shutdown)(struct ast_threadpool_listener *listener)
The threadpool is shutting down.
Definition threadpool.h:69
struct ao2_container * zombie_threads
The container of zombie threads. Zombie threads may be running tasks, but they are scheduled to die s...
Definition threadpool.c:53

References ast_threadpool::active_threads, ao2_cleanup, ast_taskprocessor_listener_get_user_data(), ast_threadpool_listener::callbacks, ast_threadpool::idle_threads, listener(), ast_threadpool::listener, ast_threadpool_listener_callbacks::shutdown, and ast_threadpool::zombie_threads.

◆ threadpool_tps_start()

static int threadpool_tps_start ( struct ast_taskprocessor_listener listener)
static

Definition at line 444 of file threadpool.c.

445{
446 return 0;
447}

◆ threadpool_tps_task_pushed()

static void threadpool_tps_task_pushed ( struct ast_taskprocessor_listener * listener,
int  was_empty 
)
static

Taskprocessor listener callback called when a task is added.

The threadpool uses this opportunity to queue a task on its control taskprocessor in order to activate idle threads and notify the threadpool listener that the task has been pushed.

Parameters
listener: The taskprocessor listener. The threadpool is the listener's private data
was_empty: True if the taskprocessor was empty prior to the task being pushed

Definition at line 611 of file threadpool.c.

612{
614 struct task_pushed_data *tpd;
616
617 if (pool->shutting_down) {
618 return;
619 }
620
622 if (!tpd) {
623 return;
624 }
625
627 ast_free(tpd);
628 }
629}
static int queued_task_pushed(void *data)
Queued task called when tasks are pushed into the threadpool.
Definition threadpool.c:565
static struct task_pushed_data * task_pushed_data_alloc(struct ast_threadpool *pool, int was_empty)
Allocate and initialize a task_pushed_data.
Definition threadpool.c:466

References ast_free, ast_taskprocessor_listener_get_user_data(), ast_taskprocessor_push, ast_threadpool::control_tps, listener(), lock, task_pushed_data::pool, queued_task_pushed(), SCOPED_AO2LOCK, ast_threadpool::shutting_down, task_pushed_data_alloc(), and task_pushed_data::was_empty.

◆ threadpool_zombie_thread_dead()

static void threadpool_zombie_thread_dead ( struct ast_threadpool * pool,
struct worker_thread * worker
)
static

Queue a task to kill a zombie thread.

This is called by a worker thread when it acknowledges that it is time for it to die.

Definition at line 301 of file threadpool.c.

303{
304 struct thread_worker_pair *pair;
306
307 if (pool->shutting_down) {
308 return;
309 }
310
312 if (!pair) {
313 return;
314 }
315
318 }
319}
static int queued_zombie_thread_dead(void *data)
Kill a zombie thread.
Definition threadpool.c:284

References ast_taskprocessor_push, ast_threadpool::control_tps, lock, thread_worker_pair::pool, queued_zombie_thread_dead(), SCOPED_AO2LOCK, ast_threadpool::shutting_down, thread_worker_pair_alloc(), thread_worker_pair_free(), and thread_worker_pair::worker.

Referenced by worker_start().

◆ worker_active()

static void worker_active ( struct worker_thread * worker)
static

Active loop for worker threads.

The worker will stay in this loop for its lifetime, executing tasks as they become available. If there are no tasks currently available, then the thread will go idle.

Parameters
worker: The worker thread executing tasks.

Definition at line 1135 of file threadpool.c.

1136{
1137 int alive;
1138
1139 /* The following is equivalent to
1140 *
1141 * while (threadpool_execute(worker->pool));
1142 *
1143 * However, reviewers have suggested in the past
1144 * doing that can cause optimizers to (wrongly)
1145 * optimize the code away.
1146 */
1147 do {
1148 alive = threadpool_execute(worker->pool);
1149 } while (alive);
1150}
static int threadpool_execute(struct ast_threadpool *pool)
Execute a task in the threadpool.
Definition threadpool.c:362

References worker_thread::pool, and threadpool_execute().

Referenced by worker_start().

◆ worker_idle()

static int worker_idle ( struct worker_thread * worker)
static

Idle function for worker threads.

The worker waits here until it gets told by the threadpool to wake up.

worker is locked before entering this function.

Parameters
worker: The idle worker
Return values
0: The thread is being woken up so that it can conclude.
non-zero: The thread is being woken up to do more work.

Definition at line 1164 of file threadpool.c.

1165{
1166 struct timeval start = ast_tvnow();
1167 struct timespec end = {
1168 .tv_sec = start.tv_sec + worker->options.idle_timeout,
1169 .tv_nsec = start.tv_usec * 1000,
1170 };
1171 while (!worker->wake_up) {
1172 if (worker->options.idle_timeout <= 0) {
1173 ast_cond_wait(&worker->cond, &worker->lock);
1174 } else if (ast_cond_timedwait(&worker->cond, &worker->lock, &end) == ETIMEDOUT) {
1175 break;
1176 }
1177 }
1178
1179 if (!worker->wake_up) {
1180 ast_debug(1, "Worker thread idle timeout reached. Dying.\n");
1181 threadpool_idle_thread_dead(worker->pool, worker);
1182 worker->state = DEAD;
1183 }
1184 worker->wake_up = 0;
1185 return worker->state == ALIVE;
1186}
char * end
Definition eagi_proxy.c:73
#define ast_cond_wait(cond, mutex)
Definition lock.h:212
#define ast_cond_timedwait(cond, mutex, time)
Definition lock.h:213
int idle_timeout
Time limit in seconds for idle threads.
Definition threadpool.h:81
ast_cond_t cond
Definition threadpool.c:147
struct ast_threadpool_options options
Definition threadpool.c:159
enum worker_state state
Definition threadpool.c:155
ast_mutex_t lock
Definition threadpool.c:149
static void threadpool_idle_thread_dead(struct ast_threadpool *pool, struct worker_thread *worker)
Definition threadpool.c:332
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition time.h:159

References ALIVE, ast_cond_timedwait, ast_cond_wait, ast_debug, ast_tvnow(), worker_thread::cond, DEAD, end, ast_threadpool_options::idle_timeout, worker_thread::lock, worker_thread::options, worker_thread::pool, worker_thread::state, threadpool_idle_thread_dead(), and worker_thread::wake_up.

Referenced by worker_start().

◆ worker_set_state()

static int worker_set_state ( struct worker_thread * worker,
enum worker_state  state 
)
static

Change a worker's state.

The threadpool calls into this function in order to let a worker know how it should proceed.

Return values
-1: failure (state transition not permitted)
0: success

Definition at line 1197 of file threadpool.c.

1198{
1199 SCOPED_MUTEX(lock, &worker->lock);
1200
1201 switch (state) {
1202 case ALIVE:
1203 /* This can occur due to a race condition between being told to go active
1204 * and an idle timeout happening.
1205 */
1206 if (worker->state == DEAD) {
1207 return -1;
1208 }
1209 ast_assert(worker->state != ZOMBIE);
1210 break;
1211 case DEAD:
1212 break;
1213 case ZOMBIE:
1214 ast_assert(worker->state != DEAD);
1215 break;
1216 }
1217
1218 worker->state = state;
1219 worker->wake_up = 1;
1220 ast_cond_signal(&worker->cond);
1221
1222 return 0;
1223}
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
Definition lock.h:596
#define ast_cond_signal(cond)
Definition lock.h:210
#define ast_assert(a)
Definition utils.h:779

References ALIVE, ast_assert, ast_cond_signal, worker_thread::cond, DEAD, lock, worker_thread::lock, SCOPED_MUTEX, worker_thread::state, worker_thread::wake_up, and ZOMBIE.

Referenced by activate_thread(), worker_shutdown(), and zombify_threads().

◆ worker_shutdown()

static void worker_shutdown ( struct worker_thread * worker)
static

shut a worker thread down

Set the worker dead and then wait for its thread to finish executing.

Parameters
worker: The worker thread to shut down

Definition at line 1021 of file threadpool.c.

1022{
1023 worker_set_state(worker, DEAD);
1024 if (worker->thread != AST_PTHREADT_NULL) {
1025 pthread_join(worker->thread, NULL);
1026 worker->thread = AST_PTHREADT_NULL;
1027 }
1028}
#define AST_PTHREADT_NULL
Definition lock.h:73
pthread_t thread
Definition threadpool.c:151

References AST_PTHREADT_NULL, DEAD, NULL, worker_thread::thread, and worker_set_state().

Referenced by worker_thread_destroy().

◆ worker_start()

static void * worker_start ( void *  arg)
static

start point for worker threads

Worker threads start in the active state but may immediately go idle if there is no work to be done

Parameters
arg: The worker thread

Definition at line 1055 of file threadpool.c.

1056{
1057 struct worker_thread *worker = arg;
1058 enum worker_state saved_state;
1059
1060 if (worker->options.thread_start) {
1061 worker->options.thread_start();
1062 }
1063
1064 ast_mutex_lock(&worker->lock);
1065 while (worker_idle(worker)) {
1066 ast_mutex_unlock(&worker->lock);
1067 worker_active(worker);
1068 ast_mutex_lock(&worker->lock);
1069 if (worker->state != ALIVE) {
1070 break;
1071 }
1072 threadpool_active_thread_idle(worker->pool, worker);
1073 }
1074 saved_state = worker->state;
1075 ast_mutex_unlock(&worker->lock);
1076
1077 /* Reaching this portion means the thread is
1078 * on death's door. It may have been killed while
1079 * it was idle, in which case it can just die
1080 * peacefully. If it's a zombie, though, then
1081 * it needs to let the pool know so
1082 * that the thread can be removed from the
1083 * list of zombie threads.
1084 */
1085 if (saved_state == ZOMBIE) {
1086 threadpool_zombie_thread_dead(worker->pool, worker);
1087 }
1088
1089 if (worker->options.thread_end) {
1090 worker->options.thread_end();
1091 }
1092 return NULL;
1093}
#define ast_mutex_unlock(a)
Definition lock.h:197
#define ast_mutex_lock(a)
Definition lock.h:196
void(* thread_start)(void)
Function to call when a thread starts.
Definition threadpool.h:119
void(* thread_end)(void)
Function to call when a thread ends.
Definition threadpool.h:126
static void worker_active(struct worker_thread *worker)
Active loop for worker threads.
static void threadpool_active_thread_idle(struct ast_threadpool *pool, struct worker_thread *worker)
Queue a task to move a thread from the active list to the idle list.
Definition threadpool.c:256
static int worker_idle(struct worker_thread *worker)
Idle function for worker threads.
worker_state
states for worker threads
Definition threadpool.c:120
static void threadpool_zombie_thread_dead(struct ast_threadpool *pool, struct worker_thread *worker)
Queue a task to kill a zombie thread.
Definition threadpool.c:301

References ALIVE, ast_mutex_lock, ast_mutex_unlock, worker_thread::lock, NULL, worker_thread::options, worker_thread::pool, worker_thread::state, ast_threadpool_options::thread_end, ast_threadpool_options::thread_start, threadpool_active_thread_idle(), threadpool_zombie_thread_dead(), worker_active(), worker_idle(), and ZOMBIE.

Referenced by worker_thread_start().

◆ worker_thread_alloc()

static struct worker_thread * worker_thread_alloc ( struct ast_threadpool * pool)
static

Allocate and initialize a new worker thread.

This will create, initialize, and start the thread.

Parameters
pool: The threadpool to which the worker will be added
Return values
NULL: Failed to allocate or start the worker thread
non-NULL: The newly-created worker thread

Definition at line 1104 of file threadpool.c.

1105{
1106 struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy);
1107 if (!worker) {
1108 return NULL;
1109 }
1110 worker->id = ast_atomic_fetchadd_int(&worker_id_counter, 1);
1111 ast_mutex_init(&worker->lock);
1112 ast_cond_init(&worker->cond, NULL);
1113 worker->pool = pool;
1114 worker->thread = AST_PTHREADT_NULL;
1115 worker->state = ALIVE;
1116 worker->options = pool->options;
1117 return worker;
1118}
#define ast_cond_init(cond, attr)
Definition lock.h:208
#define ast_mutex_init(pmutex)
Definition lock.h:193
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition lock.h:764
static int worker_id_counter
Definition threadpool.c:996
static void worker_thread_destroy(void *obj)
Worker thread destructor.

References ALIVE, ao2_alloc, ast_atomic_fetchadd_int(), ast_cond_init, ast_mutex_init, AST_PTHREADT_NULL, worker_thread::cond, worker_thread::id, worker_thread::lock, NULL, ast_threadpool::options, worker_thread::options, worker_thread::pool, worker_thread::state, worker_thread::thread, worker_id_counter, and worker_thread_destroy().

Referenced by grow().

◆ worker_thread_cmp()

static int worker_thread_cmp ( void *  obj,
void *  arg,
int  flags 
)
static

Definition at line 1005 of file threadpool.c.

1006{
1007 struct worker_thread *worker1 = obj;
1008 struct worker_thread *worker2 = arg;
1009
1010 return worker1->id == worker2->id ? CMP_MATCH : 0;
1011}

References CMP_MATCH, and worker_thread::id.

Referenced by threadpool_alloc().

◆ worker_thread_destroy()

static void worker_thread_destroy ( void *  obj)
static

Worker thread destructor.

Called automatically when refcount reaches 0. Shuts down the worker thread and destroys its component parts.

Definition at line 1037 of file threadpool.c.

1038{
1039 struct worker_thread *worker = obj;
1040 ast_debug(3, "Destroying worker thread %d\n", worker->id);
1041 worker_shutdown(worker);
1042 ast_mutex_destroy(&worker->lock);
1043 ast_cond_destroy(&worker->cond);
1044}
#define ast_cond_destroy(cond)
Definition lock.h:209
#define ast_mutex_destroy(a)
Definition lock.h:195
static void worker_shutdown(struct worker_thread *worker)
shut a worker thread down

References ast_cond_destroy, ast_debug, ast_mutex_destroy, worker_thread::cond, worker_thread::id, worker_thread::lock, and worker_shutdown().

Referenced by worker_thread_alloc().

◆ worker_thread_hash()

static int worker_thread_hash ( const void *  obj,
int  flags 
)
static

Definition at line 998 of file threadpool.c.

999{
1000 const struct worker_thread *worker = obj;
1001
1002 return worker->id;
1003}

References worker_thread::id.

Referenced by threadpool_alloc().

◆ worker_thread_start()

static int worker_thread_start ( struct worker_thread * worker)
static

Definition at line 1120 of file threadpool.c.

1121{
1122 return ast_pthread_create(&worker->thread, NULL, worker_start, worker);
1123}
static void * worker_start(void *arg)
start point for worker threads
#define ast_pthread_create(a, b, c, d)
Definition utils.h:624

References ast_pthread_create, NULL, worker_thread::thread, and worker_start().

Referenced by grow().

◆ zombify_threads()

static int zombify_threads ( void *  obj,
void *  arg,
void *  data,
int  flags 
)
static

ao2 callback to zombify a set number of threads.

Threads will be zombified as long as the counter has not reached zero. The counter is decremented with each thread that is zombified.

Zombifying a thread involves removing it from its current container, adding it to the zombie container, and changing the state of the worker to a zombie.

This callback is called from the threadpool control taskprocessor thread.

Parameters
obj: The worker thread that may be zombified
arg: The pool to which the worker belongs
data: The counter
flags: Unused
Return values
CMP_MATCH: The zombified thread should be removed from its current container
CMP_STOP: Stop attempting to zombify threads

Definition at line 745 of file threadpool.c.

746{
747 struct worker_thread *worker = obj;
748 struct ast_threadpool *pool = arg;
749 int *num_to_zombify = data;
750
751 if ((*num_to_zombify)-- > 0) {
752 if (!ao2_link(pool->zombie_threads, worker)) {
753 ast_log(LOG_WARNING, "Failed to zombify active thread %d. Thread will remain active\n", worker->id);
754 return 0;
755 }
756 worker_set_state(worker, ZOMBIE);
757 return CMP_MATCH;
758 } else {
759 return CMP_STOP;
760 }
761}

References ao2_link, ast_log, CMP_MATCH, CMP_STOP, worker_thread::id, LOG_WARNING, worker_set_state(), ZOMBIE, and ast_threadpool::zombie_threads.

Referenced by shrink().

Variable Documentation

◆ serializer_tps_listener_callbacks

struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks
static
Initial value:
= {
.task_pushed = serializer_task_pushed,
.start = serializer_start,
.shutdown = serializer_shutdown,
}
static void serializer_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
static void serializer_shutdown(struct ast_taskprocessor_listener *listener)
static int serializer_start(struct ast_taskprocessor_listener *listener)

Definition at line 1301 of file threadpool.c.

1301 {
1302 .task_pushed = serializer_task_pushed,
1303 .start = serializer_start,
1304 .shutdown = serializer_shutdown,
1305};

Referenced by ast_threadpool_serializer_group().

◆ threadpool_tps_listener_callbacks

struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks
static

Table of taskprocessor listener callbacks for threadpool's main taskprocessor.

Definition at line 695 of file threadpool.c.

695 {
696 .start = threadpool_tps_start,
697 .task_pushed = threadpool_tps_task_pushed,
698 .emptied = threadpool_tps_emptied,
699 .shutdown = threadpool_tps_shutdown,
700};
static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
Taskprocessor listener shutdown callback.
Definition threadpool.c:679
static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
Taskprocessor listener emptied callback.
Definition threadpool.c:654
static int threadpool_tps_start(struct ast_taskprocessor_listener *listener)
Definition threadpool.c:444
static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
Taskprocessor listener callback called when a task is added.
Definition threadpool.c:611

Referenced by ast_threadpool_create().

◆ worker_id_counter

int worker_id_counter
static

A monotonically increasing integer used for worker thread identification.

Definition at line 996 of file threadpool.c.

Referenced by worker_thread_alloc().