Asterisk - The Open Source Telephony Project  GIT-master-e8cda4b
Data Structures | Macros | Enumerations | Functions | Variables
threadpool.c File Reference
#include "asterisk.h"
#include "asterisk/threadpool.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/astobj2.h"
#include "asterisk/utils.h"

Go to the source code of this file.

Data Structures

struct  ast_serializer_shutdown_group
 
struct  ast_threadpool
 An opaque threadpool structure. More...
 
struct  ast_threadpool_listener
 listener for a threadpool More...
 
struct  pool_options_pair
 
struct  serializer
 
struct  set_size_data
 Helper struct used for queued operations that change the size of the threadpool. More...
 
struct  task_pushed_data
 helper used for queued task when tasks are pushed More...
 
struct  thread_worker_pair
 Struct used for queued operations involving worker state changes. More...
 
struct  worker_thread
 

Macros

#define THREAD_BUCKETS   89
 

Enumerations

enum  worker_state { ALIVE, ZOMBIE, DEAD }
 states for worker threads More...
 

Functions

static int activate_thread (void *obj, void *arg, int flags)
 Activate idle threads. More...
 
struct ast_serializer_shutdown_group * ast_serializer_shutdown_group_alloc (void)
 Create a serializer group shutdown control object. More...
 
int ast_serializer_shutdown_group_join (struct ast_serializer_shutdown_group *shutdown_group, int timeout)
 Wait for the serializers in the group to shutdown with timeout. More...
 
struct ast_threadpool * ast_threadpool_create (const char *name, struct ast_threadpool_listener *listener, const struct ast_threadpool_options *options)
 Create a new threadpool. More...
 
struct ast_threadpool_listener * ast_threadpool_listener_alloc (const struct ast_threadpool_listener_callbacks *callbacks, void *user_data)
 Allocate a threadpool listener. More...
 
void * ast_threadpool_listener_get_user_data (const struct ast_threadpool_listener *listener)
 Get the threadpool listener's user data. More...
 
int ast_threadpool_push (struct ast_threadpool *pool, int(*task)(void *data), void *data)
 Push a task to the threadpool. More...
 
long ast_threadpool_queue_size (struct ast_threadpool *pool)
 Return the size of the threadpool's task queue. More...
 
struct ast_taskprocessor * ast_threadpool_serializer (const char *name, struct ast_threadpool *pool)
 Serialized execution of tasks within an ast_threadpool. More...
 
struct ast_taskprocessor * ast_threadpool_serializer_get_current (void)
 Get the threadpool serializer currently associated with this thread. More...
 
struct ast_taskprocessor * ast_threadpool_serializer_group (const char *name, struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
 Serialized execution of tasks within an ast_threadpool. More...
 
void ast_threadpool_set_size (struct ast_threadpool *pool, unsigned int size)
 Set the number of threads for the thread pool. More...
 
void ast_threadpool_shutdown (struct ast_threadpool *pool)
 Shut down a threadpool and destroy it. More...
 
 AST_THREADSTORAGE_RAW (current_serializer)
 
static int execute_tasks (void *data)
 
static void grow (struct ast_threadpool *pool, int delta)
 Add threads to the threadpool. More...
 
static int kill_threads (void *obj, void *arg, int flags)
 ao2 callback to kill a set number of threads. More...
 
static int queued_active_thread_idle (void *data)
 Move a worker thread from the active container to the idle container. More...
 
static int queued_emptied (void *data)
 Queued task that handles the case where the threadpool's taskprocessor is emptied. More...
 
static int queued_idle_thread_dead (void *data)
 
static int queued_set_size (void *data)
 Change the size of the threadpool. More...
 
static int queued_task_pushed (void *data)
 Queued task called when tasks are pushed into the threadpool. More...
 
static int queued_zombie_thread_dead (void *data)
 Kill a zombie thread. More...
 
static struct serializer * serializer_create (struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
 
static void serializer_dtor (void *obj)
 
static void serializer_shutdown (struct ast_taskprocessor_listener *listener)
 
static void serializer_shutdown_group_dec (struct ast_serializer_shutdown_group *shutdown_group)
 
static void serializer_shutdown_group_dtor (void *vdoomed)
 
static void serializer_shutdown_group_inc (struct ast_serializer_shutdown_group *shutdown_group)
 
static int serializer_start (struct ast_taskprocessor_listener *listener)
 
static void serializer_task_pushed (struct ast_taskprocessor_listener *listener, int was_empty)
 
static struct set_size_data * set_size_data_alloc (struct ast_threadpool *pool, unsigned int size)
 Allocate and initialize a set_size_data. More...
 
static void shrink (struct ast_threadpool *pool, int delta)
 Remove threads from the threadpool. More...
 
static struct task_pushed_data * task_pushed_data_alloc (struct ast_threadpool *pool, int was_empty)
 Allocate and initialize a task_pushed_data. More...
 
static struct thread_worker_pair * thread_worker_pair_alloc (struct ast_threadpool *pool, struct worker_thread *worker)
 Allocate and initialize a thread_worker_pair. More...
 
static void thread_worker_pair_free (struct thread_worker_pair *pair)
 Destructor for thread_worker_pair. More...
 
static void threadpool_active_thread_idle (struct ast_threadpool *pool, struct worker_thread *worker)
 Queue a task to move a thread from the active list to the idle list. More...
 
static struct ast_threadpool * threadpool_alloc (const char *name, const struct ast_threadpool_options *options)
 
static void threadpool_destructor (void *obj)
 Destroy a threadpool's components. More...
 
static int threadpool_execute (struct ast_threadpool *pool)
 Execute a task in the threadpool. More...
 
static void threadpool_idle_thread_dead (struct ast_threadpool *pool, struct worker_thread *worker)
 
static void threadpool_send_state_changed (struct ast_threadpool *pool)
 Notify the threadpool listener that the state has changed. More...
 
static void threadpool_tps_emptied (struct ast_taskprocessor_listener *listener)
 Taskprocessor listener emptied callback. More...
 
static void threadpool_tps_shutdown (struct ast_taskprocessor_listener *listener)
 Taskprocessor listener shutdown callback. More...
 
static int threadpool_tps_start (struct ast_taskprocessor_listener *listener)
 
static void threadpool_tps_task_pushed (struct ast_taskprocessor_listener *listener, int was_empty)
 Taskprocessor listener callback called when a task is added. More...
 
static void threadpool_zombie_thread_dead (struct ast_threadpool *pool, struct worker_thread *worker)
 Queue a task to kill a zombie thread. More...
 
static void worker_active (struct worker_thread *worker)
 Active loop for worker threads. More...
 
static int worker_idle (struct worker_thread *worker)
 Idle function for worker threads. More...
 
static int worker_set_state (struct worker_thread *worker, enum worker_state state)
 Change a worker's state. More...
 
static void worker_shutdown (struct worker_thread *worker)
 shut a worker thread down More...
 
static void * worker_start (void *arg)
 start point for worker threads More...
 
static struct worker_thread * worker_thread_alloc (struct ast_threadpool *pool)
 Allocate and initialize a new worker thread. More...
 
static int worker_thread_cmp (void *obj, void *arg, int flags)
 
static void worker_thread_destroy (void *obj)
 Worker thread destructor. More...
 
static int worker_thread_hash (const void *obj, int flags)
 
static int worker_thread_start (struct worker_thread *worker)
 
static int zombify_threads (void *obj, void *arg, void *data, int flags)
 ao2 callback to zombify a set number of threads. More...
 

Variables

static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks
 
static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks
 Table of taskprocessor listener callbacks for threadpool's main taskprocessor. More...
 
static int worker_id_counter
 

Macro Definition Documentation

◆ THREAD_BUCKETS

#define THREAD_BUCKETS   89

Definition at line 28 of file threadpool.c.

Referenced by threadpool_alloc().

Enumeration Type Documentation

◆ worker_state

states for worker threads

Enumerator
ALIVE 

The worker is either active or idle

ZOMBIE 

The worker has been asked to shut down but may still be in the process of executing tasks. This transition happens when the threadpool needs to shrink and needs to kill active threads in order to do so.

DEAD 

The worker has been asked to shut down. Typically only idle threads go to this state directly, but active threads may go straight to this state when the threadpool is shut down.

Definition at line 120 of file threadpool.c.

120  {
121  /*! The worker is either active or idle */
122  ALIVE,
123  /*!
124  * The worker has been asked to shut down but
125  * may still be in the process of executing tasks.
126  * This transition happens when the threadpool needs
127  * to shrink and needs to kill active threads in order
128  * to do so.
129  */
130  ZOMBIE,
131  /*!
132  * The worker has been asked to shut down. Typically
133  * only idle threads go to this state directly, but
134  * active threads may go straight to this state when
135  * the threadpool is shut down.
136  */
137  DEAD,
138 };

Function Documentation

◆ activate_thread()

static int activate_thread ( void *  obj,
void *  arg,
int  flags 
)
static

Activate idle threads.

This function always returns CMP_MATCH because all workers that this function acts on need to be seen as matches so they are unlinked from the list of idle threads.

Called as an ao2_callback in the threadpool's control taskprocessor thread.

Parameters
obj: The worker to activate
arg: The pool where the worker belongs
Return values
CMP_MATCH

Definition at line 491 of file threadpool.c.

References ast_threadpool::active_threads, ALIVE, ao2_link, ao2_unlink, ast_debug, ast_log, CMP_MATCH, worker_thread::id, LOG_WARNING, and worker_set_state().

Referenced by queued_set_size(), and queued_task_pushed().

492 {
493  struct worker_thread *worker = obj;
494  struct ast_threadpool *pool = arg;
495 
496  if (!ao2_link(pool->active_threads, worker)) {
497  /* If we can't link the idle thread into the active container, then
498  * we'll just leave the thread idle and not wake it up.
499  */
500  ast_log(LOG_WARNING, "Failed to activate thread %d. Remaining idle\n",
501  worker->id);
502  return 0;
503  }
504 
505  if (worker_set_state(worker, ALIVE)) {
506  ast_debug(1, "Failed to activate thread %d. It is dead\n",
507  worker->id);
508  /* The worker thread will no longer exist in the active threads or
509  * idle threads container after this.
510  */
511  ao2_unlink(pool->active_threads, worker);
512  }
513 
514  return CMP_MATCH;
515 }
#define LOG_WARNING
Definition: logger.h:274
#define ast_debug(level,...)
Log a DEBUG message.
Definition: logger.h:444
#define ast_log
Definition: astobj2.c:42
#define ao2_unlink(container, obj)
Definition: astobj2.h:1598
static int worker_set_state(struct worker_thread *worker, enum worker_state state)
Change a worker's state.
Definition: threadpool.c:1186
struct ao2_container * active_threads
The container of active threads. Active threads are those that are currently running tasks...
Definition: threadpool.c:43
An opaque threadpool structure.
Definition: threadpool.c:36
#define ao2_link(container, obj)
Definition: astobj2.h:1549

◆ ast_serializer_shutdown_group_alloc()

struct ast_serializer_shutdown_group* ast_serializer_shutdown_group_alloc ( void  )

Create a serializer group shutdown control object.

Since
13.5.0
Returns
ao2 object to control shutdown of a serializer group.

Definition at line 1229 of file threadpool.c.

References ao2_alloc, ast_cond_init, ast_serializer_shutdown_group::cond, NULL, serializer_shutdown_group_dtor(), and shutdown_group.

Referenced by ast_serializer_pool_create(), load_module(), and sip_options_init_task().

1230 {
1232 
1233  shutdown_group = ao2_alloc(sizeof(*shutdown_group), serializer_shutdown_group_dtor);
1234  if (!shutdown_group) {
1235  return NULL;
1236  }
1237  ast_cond_init(&shutdown_group->cond, NULL);
1238  return shutdown_group;
1239 }
#define ast_cond_init(cond, attr)
Definition: lock.h:199
#define NULL
Definition: resample.c:96
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:411
static void serializer_shutdown_group_dtor(void *vdoomed)
Definition: threadpool.c:1222
static struct ast_serializer_shutdown_group * shutdown_group
Shutdown group for options serializers.

◆ ast_serializer_shutdown_group_join()

int ast_serializer_shutdown_group_join ( struct ast_serializer_shutdown_group *  shutdown_group,
int  timeout 
)

Wait for the serializers in the group to shutdown with timeout.

Since
13.5.0
Parameters
shutdown_group: Group shutdown controller. (Returns 0 immediately if NULL)
timeout: Number of seconds to wait for the serializers in the group to shut down. Zero if the timeout is disabled.
Returns
Number of serializers that did not get shut down within the timeout.

Definition at line 1241 of file threadpool.c.

References ao2_lock, ao2_object_get_lockaddr(), ao2_unlock, ast_assert, ast_cond_timedwait, ast_cond_wait, ast_tvnow(), ast_serializer_shutdown_group::cond, ast_serializer_shutdown_group::count, lock, NULL, and timeout.

Referenced by ast_res_pjsip_cleanup_options_handling(), ast_serializer_pool_destroy(), and unload_module().

1242 {
1243  int remaining;
1244  ast_mutex_t *lock;
1245 
1246  if (!shutdown_group) {
1247  return 0;
1248  }
1249 
1250  lock = ao2_object_get_lockaddr(shutdown_group);
1251  ast_assert(lock != NULL);
1252 
1253  ao2_lock(shutdown_group);
1254  if (timeout) {
1255  struct timeval start;
1256  struct timespec end;
1257 
1258  start = ast_tvnow();
1259  end.tv_sec = start.tv_sec + timeout;
1260  end.tv_nsec = start.tv_usec * 1000;
1261  while (shutdown_group->count) {
1262  if (ast_cond_timedwait(&shutdown_group->cond, lock, &end)) {
1263  /* Error or timed out waiting for the count to reach zero. */
1264  break;
1265  }
1266  }
1267  } else {
1268  while (shutdown_group->count) {
1269  if (ast_cond_wait(&shutdown_group->cond, lock)) {
1270  /* Error */
1271  break;
1272  }
1273  }
1274  }
1275  remaining = shutdown_group->count;
1276  ao2_unlock(shutdown_group);
1277  return remaining;
1278 }
static int timeout
Definition: cdr_mysql.c:86
#define ast_cond_wait(cond, mutex)
Definition: lock.h:203
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:150
#define ast_assert(a)
Definition: utils.h:710
#define ao2_unlock(a)
Definition: astobj2.h:730
#define NULL
Definition: resample.c:96
char * end
Definition: eagi_proxy.c:73
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
Definition: astobj2.c:476
ast_mutex_t lock
Definition: app_meetme.c:1091
#define ao2_lock(a)
Definition: astobj2.h:718
#define ast_cond_timedwait(cond, mutex, time)
Definition: lock.h:204
Structure for mutex and tracking information.
Definition: lock.h:135

◆ ast_threadpool_create()

struct ast_threadpool* ast_threadpool_create ( const char *  name,
struct ast_threadpool_listener *  listener,
const struct ast_threadpool_options *  options 
)

Create a new threadpool.

This function creates a threadpool. Tasks may be pushed onto this thread pool and will be automatically acted upon by threads within the pool.

Only a single threadpool with a given name may exist. This function will fail if a threadpool with the given name already exists.

Parameters
name: The unique name for the threadpool
listener: The listener the threadpool will notify of changes. Can be NULL.
options: The behavioral options for this threadpool
Return values
NULL: Failed to create the threadpool
non-NULL: The newly-created threadpool

Definition at line 915 of file threadpool.c.

References ao2_cleanup, ao2_ref, ast_alloca, ast_log, ast_taskprocessor_create_with_listener(), ast_taskprocessor_listener_alloc(), AST_THREADPOOL_OPTIONS_VERSION, ast_threadpool_set_size(), ast_threadpool::listener, LOG_WARNING, NULL, RAII_VAR, threadpool_alloc(), ast_threadpool::tps, and ast_threadpool_options::version.

Referenced by ast_sorcery_init(), AST_TEST_DEFINE(), load_module(), and stasis_init().

918 {
919  struct ast_taskprocessor *tps;
920  RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, NULL, ao2_cleanup);
921  RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
922  char *fullname;
923 
924  pool = threadpool_alloc(name, options);
925  if (!pool) {
926  return NULL;
927  }
928 
930  if (!tps_listener) {
931  return NULL;
932  }
933 
934  if (options->version != AST_THREADPOOL_OPTIONS_VERSION) {
935  ast_log(LOG_WARNING, "Incompatible version of threadpool options in use.\n");
936  return NULL;
937  }
938 
939  fullname = ast_alloca(strlen(name) + strlen("/pool") + 1);
940  sprintf(fullname, "%s/pool", name); /* Safe */
941  tps = ast_taskprocessor_create_with_listener(fullname, tps_listener);
942  if (!tps) {
943  return NULL;
944  }
945 
946  pool->tps = tps;
947  if (listener) {
948  ao2_ref(listener, +1);
949  pool->listener = listener;
950  }
951  ast_threadpool_set_size(pool, pool->options.initial_size);
952  ao2_ref(pool, +1);
953  return pool;
954 }
A listener for taskprocessors.
void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
Set the number of threads for the thread pool.
Definition: threadpool.c:874
#define AST_THREADPOOL_OPTIONS_VERSION
Definition: threadpool.h:71
#define LOG_WARNING
Definition: logger.h:274
static struct ast_threadpool * threadpool_alloc(const char *name, const struct ast_threadpool_options *options)
Definition: threadpool.c:404
#define NULL
Definition: resample.c:96
#define ast_log
Definition: astobj2.c:42
static void * listener(void *unused)
Definition: asterisk.c:1476
static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks
Table of taskprocessor listener callbacks for threadpool's main taskprocessor.
Definition: threadpool.c:695
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:911
#define ao2_ref(o, delta)
Definition: astobj2.h:464
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
Definition: astmm.h:290
static const char name[]
Definition: cdr_mysql.c:74
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
An opaque threadpool structure.
Definition: threadpool.c:36
struct ast_taskprocessor * ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
Create a taskprocessor with a custom listener.

◆ ast_threadpool_listener_alloc()

struct ast_threadpool_listener* ast_threadpool_listener_alloc ( const struct ast_threadpool_listener_callbacks *  callbacks,
void *  user_data 
)

Allocate a threadpool listener.

This function will call back into the alloc callback for the listener.

Parameters
callbacks: Listener callbacks to assign to the listener
user_data: User data to be stored in the threadpool listener
Return values
NULL: Failed to allocate the listener
non-NULL: The newly-created threadpool listener

Definition at line 893 of file threadpool.c.

References ao2_alloc, ast_threadpool_listener::callbacks, ast_threadpool::listener, NULL, and ast_threadpool_listener::user_data.

Referenced by AST_TEST_DEFINE().

895 {
896  struct ast_threadpool_listener *listener = ao2_alloc(sizeof(*listener), NULL);
897  if (!listener) {
898  return NULL;
899  }
900  listener->callbacks = callbacks;
901  listener->user_data = user_data;
902  return listener;
903 }
#define NULL
Definition: resample.c:96
const struct ast_threadpool_listener_callbacks * callbacks
Definition: threadpool.c:112
static void * listener(void *unused)
Definition: asterisk.c:1476
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:411
listener for a threadpool
Definition: threadpool.c:110

◆ ast_threadpool_listener_get_user_data()

void* ast_threadpool_listener_get_user_data ( const struct ast_threadpool_listener *  listener)

Get the threadpool listener's user data.

Parameters
listener: The threadpool listener
Returns
The user data

Definition at line 905 of file threadpool.c.

References ast_threadpool_listener::user_data.

Referenced by listener_check(), test_emptied(), test_shutdown(), test_state_changed(), test_task_pushed(), and wait_for_task_pushed().

906 {
907  return listener->user_data;
908 }

◆ ast_threadpool_push()

int ast_threadpool_push ( struct ast_threadpool *  pool,
int(*)(void *data)  task,
void *  data 
)

Push a task to the threadpool.

Tasks pushed into the threadpool will be automatically taken by one of the threads within the threadpool.

Parameters
pool: The threadpool to add the task to
task: The task to add
data: The parameter for the task
Return values
0: success
-1: failure

Definition at line 956 of file threadpool.c.

References ast_taskprocessor_push(), lock, SCOPED_AO2LOCK, ast_threadpool::shutting_down, task(), and ast_threadpool::tps.

Referenced by AST_TEST_DEFINE(), and serializer_task_pushed().

957 {
958  SCOPED_AO2LOCK(lock, pool);
959  if (!pool->shutting_down) {
960  return ast_taskprocessor_push(pool->tps, task, data);
961  }
962  return -1;
963 }
static int task(void *data)
Queued task for baseline test.
ast_mutex_t lock
Definition: app_meetme.c:1091
struct ast_taskprocessor * tps
The main taskprocessor.
Definition: threadpool.c:67
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:602
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.

◆ ast_threadpool_queue_size()

long ast_threadpool_queue_size ( struct ast_threadpool *  pool)

Return the size of the threadpool's task queue.

Since
13.7.0

Definition at line 1437 of file threadpool.c.

References ast_taskprocessor_size(), and ast_threadpool::tps.

Referenced by ast_sip_threadpool_queue_size().

1438 {
1439  return ast_taskprocessor_size(pool->tps);
1440 }
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
struct ast_taskprocessor * tps
The main taskprocessor.
Definition: threadpool.c:67

◆ ast_threadpool_serializer()

struct ast_taskprocessor* ast_threadpool_serializer ( const char *  name,
struct ast_threadpool *  pool 
)

Serialized execution of tasks within an ast_threadpool.

Since
12.0.0

An ast_taskprocessor with the same contract as a default taskprocessor (tasks execute serially) except instead of executing out of a dedicated thread, execution occurs in a thread from an ast_threadpool. Think of it as a lightweight thread.

While it guarantees that each task will complete before executing the next, there is no guarantee as to which thread from the pool individual tasks will execute. This normally only matters if your code relies on thread-specific information, such as thread locals.

Use ast_taskprocessor_unreference() to dispose of the returned ast_taskprocessor.

Only a single taskprocessor with a given name may exist. This function will fail if a taskprocessor with the given name already exists.

Parameters
name: Name of the serializer. (must be unique)
pool: ast_threadpool for execution.
Returns
ast_taskprocessor for enqueuing work.
NULL on error.

Definition at line 1432 of file threadpool.c.

References ast_threadpool_serializer_group(), and NULL.

Referenced by AST_TEST_DEFINE(), internal_stasis_subscribe(), and sorcery_object_type_alloc().

1433 {
1434  return ast_threadpool_serializer_group(name, pool, NULL);
1435 }
struct ast_taskprocessor * ast_threadpool_serializer_group(const char *name, struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Serialized execution of tasks within a ast_threadpool.
Definition: threadpool.c:1402
#define NULL
Definition: resample.c:96
static const char name[]
Definition: cdr_mysql.c:74

◆ ast_threadpool_serializer_get_current()

struct ast_taskprocessor* ast_threadpool_serializer_get_current ( void  )

Get the threadpool serializer currently associated with this thread.

Since
14.0.0
Note
The returned pointer is valid while the serializer thread is running.
Use ao2_ref() on serializer if you are going to keep it for another thread. To unref it you must then use ast_taskprocessor_unreference().
Return values
serializer: on success.
NULL: on error or no serializer associated with the thread.

Definition at line 1397 of file threadpool.c.

References ast_threadstorage_get_ptr().

Referenced by record_serializer(), rfc3326_outgoing_request(), and rfc3326_outgoing_response().

1398 {
1399  return ast_threadstorage_get_ptr(&current_serializer);
1400 }
void * ast_threadstorage_get_ptr(struct ast_threadstorage *ts)
Retrieve a raw pointer from threadstorage.

◆ ast_threadpool_serializer_group()

struct ast_taskprocessor* ast_threadpool_serializer_group ( const char *  name,
struct ast_threadpool *  pool,
struct ast_serializer_shutdown_group *  shutdown_group 
)

Serialized execution of tasks within an ast_threadpool.

Since
13.5.0

An ast_taskprocessor with the same contract as a default taskprocessor (tasks execute serially) except instead of executing out of a dedicated thread, execution occurs in a thread from an ast_threadpool. Think of it as a lightweight thread.

While it guarantees that each task will complete before executing the next, there is no guarantee as to which thread from the pool individual tasks will execute. This normally only matters if your code relies on thread-specific information, such as thread locals.

Use ast_taskprocessor_unreference() to dispose of the returned ast_taskprocessor.

Only a single taskprocessor with a given name may exist. This function will fail if a taskprocessor with the given name already exists.

Parameters
name: Name of the serializer. (must be unique)
pool: ast_threadpool for execution.
shutdown_group: Group shutdown controller. (NULL if no group association)
Returns
ast_taskprocessor for enqueuing work.
NULL on error.

Definition at line 1402 of file threadpool.c.

References ao2_ref, ast_taskprocessor_create_with_listener(), ast_taskprocessor_listener_alloc(), ast_threadpool::listener, NULL, serializer_create(), serializer_shutdown_group_inc(), and ast_threadpool::tps.

Referenced by ast_serializer_pool_create(), ast_sip_create_serializer_group(), and ast_threadpool_serializer().

1404 {
1405  struct serializer *ser;
1407  struct ast_taskprocessor *tps;
1408 
1409  ser = serializer_create(pool, shutdown_group);
1410  if (!ser) {
1411  return NULL;
1412  }
1413 
1415  if (!listener) {
1416  ao2_ref(ser, -1);
1417  return NULL;
1418  }
1419 
1421  if (!tps) {
1422  /* ser ref transferred to listener but not cleaned without tps */
1423  ao2_ref(ser, -1);
1424  } else if (shutdown_group) {
1425  serializer_shutdown_group_inc(shutdown_group);
1426  }
1427 
1428  ao2_ref(listener, -1);
1429  return tps;
1430 }
A listener for taskprocessors.
#define NULL
Definition: resample.c:96
static void * listener(void *unused)
Definition: asterisk.c:1476
static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks
Definition: threadpool.c:1391
#define ao2_ref(o, delta)
Definition: astobj2.h:464
static void serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group)
Definition: threadpool.c:1289
static const char name[]
Definition: cdr_mysql.c:74
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
struct ast_taskprocessor * ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
Create a taskprocessor with a custom listener.
static struct serializer * serializer_create(struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Definition: threadpool.c:1332

◆ ast_threadpool_set_size()

void ast_threadpool_set_size ( struct ast_threadpool *  threadpool,
unsigned int  size 
)

Set the number of threads for the thread pool.

This number may be more or less than the current number of threads in the threadpool.

Parameters
threadpool: The threadpool to adjust
size: The new desired size of the threadpool

Definition at line 874 of file threadpool.c.

References ast_free, ast_taskprocessor_push(), ast_threadpool::control_tps, lock, queued_set_size(), SCOPED_AO2LOCK, set_size_data_alloc(), and ast_threadpool::shutting_down.

Referenced by AST_TEST_DEFINE(), and ast_threadpool_create().

875 {
876  struct set_size_data *ssd;
877  SCOPED_AO2LOCK(lock, pool);
878 
879  if (pool->shutting_down) {
880  return;
881  }
882 
883  ssd = set_size_data_alloc(pool, size);
884  if (!ssd) {
885  return;
886  }
887 
889  ast_free(ssd);
890  }
891 }
static struct set_size_data * set_size_data_alloc(struct ast_threadpool *pool, unsigned int size)
Allocate and initialize a set_size_data.
Definition: threadpool.c:814
Helper struct used for queued operations that change the size of the threadpool.
Definition: threadpool.c:802
ast_mutex_t lock
Definition: app_meetme.c:1091
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:602
#define ast_free(a)
Definition: astmm.h:182
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
static int queued_set_size(void *data)
Change the size of the threadpool.
Definition: threadpool.c:838
unsigned int size
Definition: threadpool.c:806
struct ast_taskprocessor * control_tps
The control taskprocessor.
Definition: threadpool.c:96

◆ ast_threadpool_shutdown()

void ast_threadpool_shutdown ( struct ast_threadpool *  pool)

Shut down a threadpool and destroy it.

Parameters
pool: The pool to shut down

Definition at line 965 of file threadpool.c.

References ao2_lock, ao2_unlock, ast_taskprocessor_unreference(), ast_threadpool::control_tps, ast_threadpool::shutting_down, and ast_threadpool::tps.

Referenced by AST_TEST_DEFINE(), load_module(), sorcery_cleanup(), stasis_cleanup(), and unload_module().

966 {
967  if (!pool) {
968  return;
969  }
970  /* Shut down the taskprocessors and everything else just
971  * takes care of itself via the taskprocessor callbacks
972  */
973  ao2_lock(pool);
974  pool->shutting_down = 1;
975  ao2_unlock(pool);
978 }
#define ao2_unlock(a)
Definition: astobj2.h:730
#define ao2_lock(a)
Definition: astobj2.h:718
struct ast_taskprocessor * tps
The main taskprocessor.
Definition: threadpool.c:67
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
struct ast_taskprocessor * control_tps
The control taskprocessor.
Definition: threadpool.c:96

◆ AST_THREADSTORAGE_RAW()

AST_THREADSTORAGE_RAW ( current_serializer  )

Referenced by serializer_create().

◆ execute_tasks()

static int execute_tasks ( void *  data)
static

Definition at line 1349 of file threadpool.c.

References ast_taskprocessor_execute(), ast_taskprocessor_unreference(), ast_threadstorage_set_ptr(), NULL, and ast_threadpool::tps.

Referenced by serializer_task_pushed().

1350 {
1351  struct ast_taskprocessor *tps = data;
1352 
1353  ast_threadstorage_set_ptr(&current_serializer, tps);
1354  while (ast_taskprocessor_execute(tps)) {
1355  /* No-op */
1356  }
1357  ast_threadstorage_set_ptr(&current_serializer, NULL);
1358 
1360  return 0;
1361 }
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
Pop a task off the taskprocessor and execute it.
#define NULL
Definition: resample.c:96
int ast_threadstorage_set_ptr(struct ast_threadstorage *ts, void *ptr)
Set a raw pointer from threadstorage.
An ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor, decrementing its reference count.

◆ grow()

static void grow ( struct ast_threadpool * pool,
int  delta 
)
static

Add threads to the threadpool.

This function is called from the threadpool's control taskprocessor thread.

Parameters
pool: The pool that is expanding
delta: The number of threads to add to the pool

Definition at line 524 of file threadpool.c.

References ast_threadpool::active_threads, ao2_container_count(), ao2_link, ao2_ref, ao2_unlink, ast_debug, ast_log, ast_taskprocessor_name(), worker_thread::id, ast_threadpool::idle_threads, LOG_ERROR, LOG_WARNING, ast_threadpool_options::max_size, ast_threadpool::options, ast_threadpool::tps, worker_thread_alloc(), and worker_thread_start().

Referenced by __ast_string_field_ptr_build_va(), __ast_string_field_ptr_grow(), queued_set_size(), and queued_task_pushed().

525 {
526  int i;
527 
528  int current_size = ao2_container_count(pool->active_threads) +
530 
531  if (pool->options.max_size && current_size + delta > pool->options.max_size) {
532  delta = pool->options.max_size - current_size;
533  }
534 
535  ast_debug(3, "Increasing threadpool %s's size by %d\n",
536  ast_taskprocessor_name(pool->tps), delta);
537 
538  for (i = 0; i < delta; ++i) {
539  struct worker_thread *worker = worker_thread_alloc(pool);
540  if (!worker) {
541  return;
542  }
543  if (ao2_link(pool->idle_threads, worker)) {
544  if (worker_thread_start(worker)) {
545  ast_log(LOG_ERROR, "Unable to start worker thread %d. Destroying.\n", worker->id);
546  ao2_unlink(pool->active_threads, worker);
547  }
548  } else {
549  ast_log(LOG_WARNING, "Failed to activate worker thread %d. Destroying.\n", worker->id);
550  }
551  ao2_ref(worker, -1);
552  }
553 }
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
int max_size
Maximum number of threads a pool may have.
Definition: threadpool.h:110
#define LOG_WARNING
Definition: logger.h:274
struct ast_threadpool_options options
Definition: threadpool.c:100
struct ao2_container * idle_threads
The container of idle threads. Idle threads are those that are currently waiting to run tasks...
Definition: threadpool.c:48
const char * ast_taskprocessor_name(struct ast_taskprocessor *tps)
Return the name of the taskprocessor singleton.
static struct worker_thread * worker_thread_alloc(struct ast_threadpool *pool)
Allocate and initialize a new worker thread.
Definition: threadpool.c:1093
#define ast_debug(level,...)
Log a DEBUG message.
Definition: logger.h:444
#define ast_log
Definition: astobj2.c:42
#define ao2_ref(o, delta)
Definition: astobj2.h:464
#define LOG_ERROR
Definition: logger.h:285
#define ao2_unlink(container, obj)
Definition: astobj2.h:1598
struct ast_taskprocessor * tps
The main taskprocessor.
Definition: threadpool.c:67
static int worker_thread_start(struct worker_thread *worker)
Definition: threadpool.c:1109
struct ao2_container * active_threads
The container of active threads. Active threads are those that are currently running tasks...
Definition: threadpool.c:43
#define ao2_link(container, obj)
Definition: astobj2.h:1549

◆ kill_threads()

static int kill_threads ( void *  obj,
void *  arg,
int  flags 
)
static

ao2 callback to kill a set number of threads.

Threads will be unlinked from the container as long as the counter has not reached zero. The counter is decremented with each thread that is removed.

Parameters
obj: The worker thread up for possible destruction
arg: The counter
flags: Unused
Return values
CMP_MATCH: The counter has not reached zero, so this thread should be removed.
CMP_STOP: The counter has reached zero, so no more threads should be removed.

Definition at line 714 of file threadpool.c.

References CMP_MATCH, and CMP_STOP.

Referenced by shrink().

715 {
716  int *num_to_kill = arg;
717 
718  if (*num_to_kill > 0) {
719  --(*num_to_kill);
720  return CMP_MATCH;
721  } else {
722  return CMP_STOP;
723  }
724 }

◆ queued_active_thread_idle()

static int queued_active_thread_idle ( void *  data)
static

Move a worker thread from the active container to the idle container.

This function is called from the threadpool's control taskprocessor thread.

Parameters
data: A thread_worker_pair containing the threadpool and the worker to move.
Returns
0

Definition at line 235 of file threadpool.c.

References ast_threadpool::active_threads, ao2_link, ao2_unlink, ast_threadpool::idle_threads, thread_worker_pair::pool, thread_worker_pair_free(), threadpool_send_state_changed(), and thread_worker_pair::worker.

Referenced by threadpool_active_thread_idle().

236 {
237  struct thread_worker_pair *pair = data;
238 
239  ao2_link(pair->pool->idle_threads, pair->worker);
240  ao2_unlink(pair->pool->active_threads, pair->worker);
241 
243 
245  return 0;
246 }
Struct used for queued operations involving worker state changes.
Definition: threadpool.c:193
struct ao2_container * idle_threads
The container of idle threads. Idle threads are those that are currently waiting to run tasks...
Definition: threadpool.c:48
static void threadpool_send_state_changed(struct ast_threadpool *pool)
Notify the threadpool listener that the state has changed.
Definition: threadpool.c:180
struct worker_thread * worker
Definition: threadpool.c:197
#define ao2_unlink(container, obj)
Definition: astobj2.h:1598
struct ao2_container * active_threads
The container of active threads. Active threads are those that are currently running tasks...
Definition: threadpool.c:43
static void thread_worker_pair_free(struct thread_worker_pair *pair)
Destructor for thread_worker_pair.
Definition: threadpool.c:203
struct ast_threadpool * pool
Definition: threadpool.c:195
#define ao2_link(container, obj)
Definition: astobj2.h:1549

◆ queued_emptied()

static int queued_emptied ( void *  data)
static

Queued task that handles the case where the threadpool's taskprocessor is emptied.

This simply lets the threadpool's listener know that the threadpool is devoid of tasks.

Parameters
data: The pool that has become empty
Returns
0

Definition at line 638 of file threadpool.c.

References ast_threadpool_listener::callbacks, ast_threadpool_listener_callbacks::emptied, and ast_threadpool::listener.

Referenced by threadpool_tps_emptied().

639 {
640  struct ast_threadpool *pool = data;
641 
642  /* We already checked for existence of this callback when this was queued */
643  pool->listener->callbacks->emptied(pool, pool->listener);
644  return 0;
645 }
const struct ast_threadpool_listener_callbacks * callbacks
Definition: threadpool.c:112
void(* emptied)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener)
Indicates the threadpool's taskprocessor has become empty.
Definition: threadpool.h:56
struct ast_threadpool_listener * listener
Definition: threadpool.c:38
An opaque threadpool structure.
Definition: threadpool.c:36

◆ queued_idle_thread_dead()

static int queued_idle_thread_dead ( void *  data)
static

Definition at line 321 of file threadpool.c.

References ao2_unlink, ast_threadpool::idle_threads, thread_worker_pair::pool, thread_worker_pair_free(), threadpool_send_state_changed(), and thread_worker_pair::worker.

Referenced by threadpool_idle_thread_dead().

322 {
323  struct thread_worker_pair *pair = data;
324 
325  ao2_unlink(pair->pool->idle_threads, pair->worker);
327 
329  return 0;
330 }
Struct used for queued operations involving worker state changes.
Definition: threadpool.c:193
struct ao2_container * idle_threads
The container of idle threads. Idle threads are those that are currently waiting to run tasks...
Definition: threadpool.c:48
static void threadpool_send_state_changed(struct ast_threadpool *pool)
Notify the threadpool listener that the state has changed.
Definition: threadpool.c:180
struct worker_thread * worker
Definition: threadpool.c:197
#define ao2_unlink(container, obj)
Definition: astobj2.h:1598
static void thread_worker_pair_free(struct thread_worker_pair *pair)
Destructor for thread_worker_pair.
Definition: threadpool.c:203
struct ast_threadpool * pool
Definition: threadpool.c:195

◆ queued_set_size()

static int queued_set_size ( void *  data)
static

Change the size of the threadpool.

This can either result in shrinking or growing the threadpool depending on the new desired size and the current size.

This function is run from the threadpool control taskprocessor thread.

Parameters
data: A set_size_data used for determining how to act
Returns
0

Definition at line 838 of file threadpool.c.

References activate_thread(), ast_threadpool::active_threads, ao2_callback, ao2_container_count(), ast_debug, ast_free, grow(), ast_threadpool::idle_threads, OBJ_MULTIPLE, OBJ_NODATA, OBJ_NOLOCK, OBJ_UNLINK, set_size_data::pool, shrink(), set_size_data::size, and threadpool_send_state_changed().

Referenced by ast_threadpool_set_size().

839 {
840  struct set_size_data *ssd = data;
841  struct ast_threadpool *pool = ssd->pool;
842  unsigned int num_threads = ssd->size;
843 
844  /* We don't count zombie threads as being "live" when potentially resizing */
845  unsigned int current_size = ao2_container_count(pool->active_threads) +
847 
848  ast_free(ssd);
849 
850  if (current_size == num_threads) {
851  ast_debug(3, "Not changing threadpool size since new size %u is the same as current %u\n",
852  num_threads, current_size);
853  return 0;
854  }
855 
856  if (current_size < num_threads) {
858  activate_thread, pool);
859 
860  /* As the above may have altered the number of current threads update it */
861  current_size = ao2_container_count(pool->active_threads) +
863  grow(pool, num_threads - current_size);
865  activate_thread, pool);
866  } else {
867  shrink(pool, current_size - num_threads);
868  }
869 
871  return 0;
872 }
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
static void grow(struct ast_threadpool *pool, int delta)
Add threads to the threadpool.
Definition: threadpool.c:524
static void shrink(struct ast_threadpool *pool, int delta)
Remove threads from the threadpool.
Definition: threadpool.c:775
#define ao2_callback(c, flags, cb_fn, arg)
Definition: astobj2.h:1716
struct ao2_container * idle_threads
The container of idle threads. Idle threads are those that are currently waiting to run tasks...
Definition: threadpool.c:48
Assume that the ao2_container is already locked.
Definition: astobj2.h:1067
Helper struct used for queued operations that change the size of the threadpool.
Definition: threadpool.c:802
static void threadpool_send_state_changed(struct ast_threadpool *pool)
Notify the threadpool listener that the state has changed.
Definition: threadpool.c:180
static int activate_thread(void *obj, void *arg, int flags)
Activate idle threads.
Definition: threadpool.c:491
#define ast_debug(level,...)
Log a DEBUG message.
Definition: logger.h:444
struct ast_threadpool * pool
Definition: threadpool.c:804
#define ast_free(a)
Definition: astmm.h:182
struct ao2_container * active_threads
The container of active threads. Active threads are those that are currently running tasks...
Definition: threadpool.c:43
An opaque threadpool structure.
Definition: threadpool.c:36
unsigned int size
Definition: threadpool.c:806

◆ queued_task_pushed()

static int queued_task_pushed ( void *  data)
static

Queued task called when tasks are pushed into the threadpool.

This function first calls into the threadpool's listener to let it know that a task has been pushed. It then wakes up all idle threads and moves them into the active thread container.

Parameters
data: A task_pushed_data
Returns
0

Definition at line 564 of file threadpool.c.

References activate_thread(), ast_threadpool::active_threads, ao2_callback, ao2_container_count(), ast_free, ast_threadpool_options::auto_increment, ast_threadpool_listener::callbacks, grow(), ast_threadpool::idle_threads, ast_threadpool::listener, OBJ_NODATA, OBJ_NOLOCK, OBJ_UNLINK, ast_threadpool::options, task_pushed_data::pool, ast_threadpool_listener_callbacks::task_pushed, threadpool_send_state_changed(), and task_pushed_data::was_empty.

Referenced by threadpool_tps_task_pushed().

565 {
566  struct task_pushed_data *tpd = data;
567  struct ast_threadpool *pool = tpd->pool;
568  int was_empty = tpd->was_empty;
569  unsigned int existing_active;
570 
571  ast_free(tpd);
572 
573  if (pool->listener && pool->listener->callbacks->task_pushed) {
574  pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
575  }
576 
577  existing_active = ao2_container_count(pool->active_threads);
578 
579  /* The first pass transitions any existing idle threads to be active, and
580  * will also remove any worker threads that have recently entered the dead
581  * state.
582  */
584  activate_thread, pool);
585 
586  /* If no idle threads could be transitioned to active grow the pool as permitted. */
587  if (ao2_container_count(pool->active_threads) == existing_active) {
588  if (!pool->options.auto_increment) {
589  return 0;
590  }
591  grow(pool, pool->options.auto_increment);
592  /* An optional second pass transitions any newly added threads. */
594  activate_thread, pool);
595  }
596 
598  return 0;
599 }
int auto_increment
Number of threads to increment pool by.
Definition: threadpool.h:90
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
static void grow(struct ast_threadpool *pool, int delta)
Add threads to the threadpool.
Definition: threadpool.c:524
#define ao2_callback(c, flags, cb_fn, arg)
Definition: astobj2.h:1716
struct ast_threadpool_options options
Definition: threadpool.c:100
struct ao2_container * idle_threads
The container of idle threads. Idle threads are those that are currently waiting to run tasks...
Definition: threadpool.c:48
Assume that the ao2_container is already locked.
Definition: astobj2.h:1067
const struct ast_threadpool_listener_callbacks * callbacks
Definition: threadpool.c:112
static void threadpool_send_state_changed(struct ast_threadpool *pool)
Notify the threadpool listener that the state has changed.
Definition: threadpool.c:180
static int activate_thread(void *obj, void *arg, int flags)
Activate idle threads.
Definition: threadpool.c:491
struct ast_threadpool_listener * listener
Definition: threadpool.c:38
struct ast_threadpool * pool
Definition: threadpool.c:454
#define ast_free(a)
Definition: astmm.h:182
struct ao2_container * active_threads
The container of active threads. Active threads are those that are currently running tasks...
Definition: threadpool.c:43
void(* task_pushed)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener, int was_empty)
Indicates that a task was pushed to the threadpool.
Definition: threadpool.h:47
An opaque threadpool structure.
Definition: threadpool.c:36
helper used for queued task when tasks are pushed
Definition: threadpool.c:452

◆ queued_zombie_thread_dead()

static int queued_zombie_thread_dead ( void *  data)
static

Kill a zombie thread.

This runs from the threadpool's control taskprocessor thread.

Parameters
data: A thread_worker_pair containing the threadpool and the zombie thread
Returns
0

Definition at line 284 of file threadpool.c.

References ao2_unlink, thread_worker_pair::pool, thread_worker_pair_free(), threadpool_send_state_changed(), thread_worker_pair::worker, and ast_threadpool::zombie_threads.

Referenced by threadpool_zombie_thread_dead().

285 {
286  struct thread_worker_pair *pair = data;
287 
288  ao2_unlink(pair->pool->zombie_threads, pair->worker);
290 
292  return 0;
293 }
Struct used for queued operations involving worker state changes.
Definition: threadpool.c:193
static void threadpool_send_state_changed(struct ast_threadpool *pool)
Notify the threadpool listener that the state has changed.
Definition: threadpool.c:180
struct worker_thread * worker
Definition: threadpool.c:197
#define ao2_unlink(container, obj)
Definition: astobj2.h:1598
struct ao2_container * zombie_threads
The container of zombie threads. Zombie threads may be running tasks, but they are scheduled to die s...
Definition: threadpool.c:53
static void thread_worker_pair_free(struct thread_worker_pair *pair)
Destructor for thread_worker_pair.
Definition: threadpool.c:203
struct ast_threadpool * pool
Definition: threadpool.c:195

◆ serializer_create()

static struct serializer* serializer_create ( struct ast_threadpool * pool,
struct ast_serializer_shutdown_group * shutdown_group
)
static

Definition at line 1332 of file threadpool.c.

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_alloc_options, ao2_bump, ao2_ref, AST_THREADSTORAGE_RAW(), NULL, serializer::pool, serializer_dtor(), and serializer::shutdown_group.

Referenced by ast_threadpool_serializer_group().

1334 {
1335  struct serializer *ser;
1336 
1338  if (!ser) {
1339  return NULL;
1340  }
1341  ao2_ref(pool, +1);
1342  ser->pool = pool;
1343  ser->shutdown_group = ao2_bump(shutdown_group);
1344  return ser;
1345 }
struct ast_serializer_shutdown_group * shutdown_group
Definition: threadpool.c:1319
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:406
#define NULL
Definition: resample.c:96
#define ao2_bump(obj)
Definition: astobj2.h:491
static void serializer_dtor(void *obj)
Definition: threadpool.c:1322
#define ao2_ref(o, delta)
Definition: astobj2.h:464
struct ast_threadpool * pool
Definition: threadpool.c:1317

◆ serializer_dtor()

static void serializer_dtor ( void *  obj)
static

Definition at line 1322 of file threadpool.c.

References ao2_cleanup, NULL, serializer::pool, and serializer::shutdown_group.

Referenced by serializer_create().

1323 {
1324  struct serializer *ser = obj;
1325 
1326  ao2_cleanup(ser->pool);
1327  ser->pool = NULL;
1329  ser->shutdown_group = NULL;
1330 }
struct ast_serializer_shutdown_group * shutdown_group
Definition: threadpool.c:1319
#define NULL
Definition: resample.c:96
struct ast_threadpool * pool
Definition: threadpool.c:1317
#define ao2_cleanup(obj)
Definition: astobj2.h:1958

◆ serializer_shutdown()

static void serializer_shutdown ( struct ast_taskprocessor_listener * listener)
static

Definition at line 1381 of file threadpool.c.

References ao2_cleanup, ast_taskprocessor_listener_get_user_data(), serializer_shutdown_group_dec(), and serializer::shutdown_group.

1382 {
1383  struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
1384 
1385  if (ser->shutdown_group) {
1387  }
1388  ao2_cleanup(ser);
1389 }
struct ast_serializer_shutdown_group * shutdown_group
Definition: threadpool.c:1319
void * ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
Get the user data from the listener.
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
static void serializer_shutdown_group_dec(struct ast_serializer_shutdown_group *shutdown_group)
Definition: threadpool.c:1305

◆ serializer_shutdown_group_dec()

static void serializer_shutdown_group_dec ( struct ast_serializer_shutdown_group * shutdown_group)
static

Definition at line 1305 of file threadpool.c.

References ao2_lock, ao2_unlock, ast_cond_signal, ast_serializer_shutdown_group::cond, and ast_serializer_shutdown_group::count.

Referenced by serializer_shutdown().

1306 {
1307  ao2_lock(shutdown_group);
1308  --shutdown_group->count;
1309  if (!shutdown_group->count) {
1310  ast_cond_signal(&shutdown_group->cond);
1311  }
1312  ao2_unlock(shutdown_group);
1313 }
#define ao2_unlock(a)
Definition: astobj2.h:730
#define ast_cond_signal(cond)
Definition: lock.h:201
#define ao2_lock(a)
Definition: astobj2.h:718

◆ serializer_shutdown_group_dtor()

static void serializer_shutdown_group_dtor ( void *  vdoomed)
static

Definition at line 1222 of file threadpool.c.

References ast_cond_destroy, and ast_serializer_shutdown_group::cond.

Referenced by ast_serializer_shutdown_group_alloc().

1223 {
1224  struct ast_serializer_shutdown_group *doomed = vdoomed;
1225 
1226  ast_cond_destroy(&doomed->cond);
1227 }
#define ast_cond_destroy(cond)
Definition: lock.h:200

◆ serializer_shutdown_group_inc()

static void serializer_shutdown_group_inc ( struct ast_serializer_shutdown_group * shutdown_group)
static

Definition at line 1289 of file threadpool.c.

References ao2_lock, ao2_unlock, and ast_serializer_shutdown_group::count.

Referenced by ast_threadpool_serializer_group().

1290 {
1291  ao2_lock(shutdown_group);
1292  ++shutdown_group->count;
1293  ao2_unlock(shutdown_group);
1294 }
#define ao2_unlock(a)
Definition: astobj2.h:730
#define ao2_lock(a)
Definition: astobj2.h:718

◆ serializer_start()

static int serializer_start ( struct ast_taskprocessor_listener * listener)
static

Definition at line 1375 of file threadpool.c.

1376 {
1377  /* No-op */
1378  return 0;
1379 }

◆ serializer_task_pushed()

static void serializer_task_pushed ( struct ast_taskprocessor_listener * listener,
int  was_empty 
)
static

Definition at line 1363 of file threadpool.c.

References ast_taskprocessor_listener_get_tps(), ast_taskprocessor_listener_get_user_data(), ast_taskprocessor_unreference(), ast_threadpool_push(), execute_tasks(), serializer::pool, and ast_threadpool::tps.

1364 {
1365  if (was_empty) {
1366  struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
1367  struct ast_taskprocessor *tps = ast_taskprocessor_listener_get_tps(listener);
1368 
1369  if (ast_threadpool_push(ser->pool, execute_tasks, tps)) {
1371  }
1372  }
1373 }
struct ast_taskprocessor * ast_taskprocessor_listener_get_tps(const struct ast_taskprocessor_listener *listener)
Get a reference to the listener's taskprocessor.
struct ast_threadpool * pool
Definition: threadpool.c:1317
int ast_threadpool_push(struct ast_threadpool *pool, int(*task)(void *data), void *data)
Push a task to the threadpool.
Definition: threadpool.c:956
void * ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
Get the user data from the listener.
An ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor, decrementing its reference count.
static int execute_tasks(void *data)
Definition: threadpool.c:1349

◆ set_size_data_alloc()

static struct set_size_data* set_size_data_alloc ( struct ast_threadpool * pool,
unsigned int  size 
)
static

Allocate and initialize a set_size_data.

Parameters
pool: The pool for the set_size_data
size: The size to store in the set_size_data

Definition at line 814 of file threadpool.c.

References ast_malloc, NULL, set_size_data::pool, and set_size_data::size.

Referenced by ast_threadpool_set_size().

816 {
817  struct set_size_data *ssd = ast_malloc(sizeof(*ssd));
818  if (!ssd) {
819  return NULL;
820  }
821 
822  ssd->pool = pool;
823  ssd->size = size;
824  return ssd;
825 }
#define NULL
Definition: resample.c:96
Helper struct used for queued operations that change the size of the threadpool.
Definition: threadpool.c:802
struct ast_threadpool * pool
Definition: threadpool.c:804
#define ast_malloc(len)
A wrapper for malloc()
Definition: astmm.h:193
unsigned int size
Definition: threadpool.c:806

◆ shrink()

static void shrink ( struct ast_threadpool * pool,
int  delta 
)
static

Remove threads from the threadpool.

The preference is to kill idle threads. However, if there are more threads to remove than there are idle threads, then active threads will be zombified instead.

This function is called from the threadpool control taskprocessor thread.

Parameters
pool: The threadpool to remove threads from
delta: The number of threads to remove

Definition at line 775 of file threadpool.c.

References ast_threadpool::active_threads, ao2_callback, ao2_callback_data, ao2_container_count(), ast_debug, ast_taskprocessor_name(), ast_threadpool::idle_threads, kill_threads(), MIN, OBJ_MULTIPLE, OBJ_NODATA, OBJ_NOLOCK, OBJ_UNLINK, ast_threadpool::tps, and zombify_threads().

Referenced by queued_set_size().

776 {
777  /*
778  * Preference is to kill idle threads, but
779  * we'll move on to deactivating active threads
780  * if we have to
781  */
782  int idle_threads = ao2_container_count(pool->idle_threads);
783  int idle_threads_to_kill = MIN(delta, idle_threads);
784  int active_threads_to_zombify = delta - idle_threads_to_kill;
785 
786  ast_debug(3, "Destroying %d idle threads in threadpool %s\n", idle_threads_to_kill,
787  ast_taskprocessor_name(pool->tps));
788 
790  kill_threads, &idle_threads_to_kill);
791 
792  ast_debug(3, "Destroying %d active threads in threadpool %s\n", active_threads_to_zombify,
793  ast_taskprocessor_name(pool->tps));
794 
796  zombify_threads, pool, &active_threads_to_zombify);
797 }
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
#define ao2_callback(c, flags, cb_fn, arg)
Definition: astobj2.h:1716
struct ao2_container * idle_threads
The container of idle threads. Idle threads are those that are currently waiting to run tasks...
Definition: threadpool.c:48
Assume that the ao2_container is already locked.
Definition: astobj2.h:1067
static int zombify_threads(void *obj, void *arg, void *data, int flags)
ao2 callback to zombify a set number of threads.
Definition: threadpool.c:745
const char * ast_taskprocessor_name(struct ast_taskprocessor *tps)
Return the name of the taskprocessor singleton.
#define MIN(a, b)
Definition: utils.h:226
#define ast_debug(level,...)
Log a DEBUG message.
Definition: logger.h:444
static int kill_threads(void *obj, void *arg, int flags)
ao2 callback to kill a set number of threads.
Definition: threadpool.c:714
#define ao2_callback_data(container, flags, cb_fn, arg, data)
Definition: astobj2.h:1743
struct ast_taskprocessor * tps
The main taskprocessor.
Definition: threadpool.c:67
struct ao2_container * active_threads
The container of active threads. Active threads are those that are currently running tasks...
Definition: threadpool.c:43

◆ task_pushed_data_alloc()

static struct task_pushed_data* task_pushed_data_alloc ( struct ast_threadpool * pool,
int  was_empty 
)
static

Allocate and initialize a task_pushed_data.

Parameters
pool: The threadpool to set in the task_pushed_data
was_empty: The was_empty value to set in the task_pushed_data
Return values
NULL: Unable to allocate task_pushed_data
non-NULL: The newly-allocated task_pushed_data

Definition at line 466 of file threadpool.c.

References ast_malloc, NULL, task_pushed_data::pool, and task_pushed_data::was_empty.

Referenced by threadpool_tps_task_pushed().

468 {
469  struct task_pushed_data *tpd = ast_malloc(sizeof(*tpd));
470 
471  if (!tpd) {
472  return NULL;
473  }
474  tpd->pool = pool;
475  tpd->was_empty = was_empty;
476  return tpd;
477 }
#define NULL
Definition: resample.c:96
#define ast_malloc(len)
A wrapper for malloc()
Definition: astmm.h:193
struct ast_threadpool * pool
Definition: threadpool.c:454
helper used for queued task when tasks are pushed
Definition: threadpool.c:452

◆ thread_worker_pair_alloc()

static struct thread_worker_pair* thread_worker_pair_alloc ( struct ast_threadpool * pool,
struct worker_thread * worker
)
static

Allocate and initialize a thread_worker_pair.

Parameters
pool: Threadpool to assign to the thread_worker_pair
worker: Worker thread to assign to the thread_worker_pair

Definition at line 214 of file threadpool.c.

References ao2_ref, ast_malloc, NULL, thread_worker_pair::pool, and thread_worker_pair::worker.

Referenced by threadpool_active_thread_idle(), threadpool_idle_thread_dead(), and threadpool_zombie_thread_dead().

216 {
217  struct thread_worker_pair *pair = ast_malloc(sizeof(*pair));
218  if (!pair) {
219  return NULL;
220  }
221  pair->pool = pool;
222  ao2_ref(worker, +1);
223  pair->worker = worker;
224 
225  return pair;
226 }
Struct used for queued operations involving worker state changes.
Definition: threadpool.c:193
#define NULL
Definition: resample.c:96
#define ao2_ref(o, delta)
Definition: astobj2.h:464
#define ast_malloc(len)
A wrapper for malloc()
Definition: astmm.h:193
struct worker_thread * worker
Definition: threadpool.c:197
struct ast_threadpool * pool
Definition: threadpool.c:195

◆ thread_worker_pair_free()

static void thread_worker_pair_free ( struct thread_worker_pair * pair)
static

Destructor for thread_worker_pair.

Definition at line 203 of file threadpool.c.

References ao2_ref, ast_free, and thread_worker_pair::worker.

Referenced by queued_active_thread_idle(), queued_idle_thread_dead(), queued_zombie_thread_dead(), threadpool_active_thread_idle(), threadpool_idle_thread_dead(), and threadpool_zombie_thread_dead().

204 {
205  ao2_ref(pair->worker, -1);
206  ast_free(pair);
207 }
#define ao2_ref(o, delta)
Definition: astobj2.h:464
struct worker_thread * worker
Definition: threadpool.c:197
#define ast_free(a)
Definition: astmm.h:182

◆ threadpool_active_thread_idle()

static void threadpool_active_thread_idle ( struct ast_threadpool * pool,
struct worker_thread * worker
)
static

Queue a task to move a thread from the active list to the idle list.

This is called by a worker thread when it runs out of tasks to perform and goes idle.

Parameters
pool: The threadpool to which the worker belongs
worker: The worker thread that has gone idle

Definition at line 256 of file threadpool.c.

References ast_taskprocessor_push(), ast_threadpool::control_tps, lock, queued_active_thread_idle(), SCOPED_AO2LOCK, ast_threadpool::shutting_down, thread_worker_pair_alloc(), and thread_worker_pair_free().

Referenced by worker_start().

258 {
259  struct thread_worker_pair *pair;
260  SCOPED_AO2LOCK(lock, pool);
261 
262  if (pool->shutting_down) {
263  return;
264  }
265 
266  pair = thread_worker_pair_alloc(pool, worker);
267  if (!pair) {
268  return;
269  }
270 
273  }
274 }
Struct used for queued operations involving worker state changes.
Definition: threadpool.c:193
ast_mutex_t lock
Definition: app_meetme.c:1091
static struct thread_worker_pair * thread_worker_pair_alloc(struct ast_threadpool *pool, struct worker_thread *worker)
Allocate and initialize a thread_worker_pair.
Definition: threadpool.c:214
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:602
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
static void thread_worker_pair_free(struct thread_worker_pair *pair)
Destructor for thread_worker_pair.
Definition: threadpool.c:203
static int queued_active_thread_idle(void *data)
Move a worker thread from the active container to the idle container.
Definition: threadpool.c:235
struct ast_taskprocessor * control_tps
The control taskprocessor.
Definition: threadpool.c:96

◆ threadpool_alloc()

static struct ast_threadpool* threadpool_alloc ( const char *  name,
const struct ast_threadpool_options *options
)
static

Definition at line 404 of file threadpool.c.

References ao2_alloc, AO2_ALLOC_OPT_LOCK_MUTEX, ao2_cleanup, ao2_container_alloc_hash, ao2_ref, ast_free, ast_str_buffer(), ast_str_create, ast_str_set(), ast_taskprocessor_get(), NULL, ast_threadpool::options, RAII_VAR, THREAD_BUCKETS, threadpool_destructor(), TPS_REF_DEFAULT, worker_thread_cmp(), and worker_thread_hash().

Referenced by ast_threadpool_create().

405 {
407  struct ast_str *control_tps_name;
408 
409  pool = ao2_alloc(sizeof(*pool), threadpool_destructor);
410  control_tps_name = ast_str_create(64);
411  if (!pool || !control_tps_name) {
412  ast_free(control_tps_name);
413  return NULL;
414  }
415 
416  ast_str_set(&control_tps_name, 0, "%s/pool-control", name);
417 
418  pool->control_tps = ast_taskprocessor_get(ast_str_buffer(control_tps_name), TPS_REF_DEFAULT);
419  ast_free(control_tps_name);
420  if (!pool->control_tps) {
421  return NULL;
422  }
423  pool->active_threads = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
425  if (!pool->active_threads) {
426  return NULL;
427  }
428  pool->idle_threads = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
430  if (!pool->idle_threads) {
431  return NULL;
432  }
433  pool->zombie_threads = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
435  if (!pool->zombie_threads) {
436  return NULL;
437  }
438  pool->options = *options;
439 
440  ao2_ref(pool, +1);
441  return pool;
442 }
char * ast_str_buffer(const struct ast_str *buf)
Returns the string buffer within the ast_str buf.
Definition: strings.h:714
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary...
return a reference to a taskprocessor, create one if it does not exist
Definition: taskprocessor.h:75
static void threadpool_destructor(void *obj)
Destroy a threadpool's components.
Definition: threadpool.c:386
#define NULL
Definition: resample.c:96
int ast_str_set(struct ast_str **buf, ssize_t max_len, const char *fmt,...)
Set a dynamic string using variable arguments.
Definition: strings.h:1065
static int worker_thread_hash(const void *obj, int flags)
Definition: threadpool.c:986
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:911
#define ao2_ref(o, delta)
Definition: astobj2.h:464
#define THREAD_BUCKETS
Definition: threadpool.c:28
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Definition: astobj2.h:1310
The descriptor of a dynamic string XXX storage will be optimized later if needed We use the ts field ...
Definition: strings.h:584
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:411
static const char name[]
Definition: cdr_mysql.c:74
#define ast_free(a)
Definition: astmm.h:182
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
static int worker_thread_cmp(void *obj, void *arg, int flags)
Definition: threadpool.c:993
An opaque threadpool structure.
Definition: threadpool.c:36
struct ast_threadpool * pool
Definition: threadpool.c:195
static struct test_options options
#define ast_str_create(init_len)
Create a malloc'ed dynamic length string.
Definition: strings.h:620

◆ threadpool_destructor()

static void threadpool_destructor ( void *  obj)
static

Destroy a threadpool's components.

This is the destructor called automatically when the threadpool's reference count reaches zero. This is not to be confused with threadpool_destroy.

By the time this actually gets called, most of the cleanup has already been done in the pool. The only thing left to do is to release the final reference to the threadpool listener.

Parameters
obj: The pool to destroy

Definition at line 386 of file threadpool.c.

References ao2_cleanup, and ast_threadpool::listener.

Referenced by threadpool_alloc().

387 {
388  struct ast_threadpool *pool = obj;
389  ao2_cleanup(pool->listener);
390 }
struct ast_threadpool_listener * listener
Definition: threadpool.c:38
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
An opaque threadpool structure.
Definition: threadpool.c:36

◆ threadpool_execute()

static int threadpool_execute ( struct ast_threadpool *pool)
static

Execute a task in the threadpool.

This is the function that worker threads call in order to execute tasks in the threadpool

Parameters
pool: The pool to which the tasks belong.
Return values
0: Either the pool has been shut down or there are no tasks.
1: There are still tasks remaining in the pool.

Definition at line 362 of file threadpool.c.

References ao2_lock, ao2_unlock, ast_taskprocessor_execute(), ast_threadpool::shutting_down, and ast_threadpool::tps.

Referenced by worker_active().

363 {
364  ao2_lock(pool);
365  if (!pool->shutting_down) {
366  ao2_unlock(pool);
367  return ast_taskprocessor_execute(pool->tps);
368  }
369  ao2_unlock(pool);
370  return 0;
371 }
#define ao2_unlock(a)
Definition: astobj2.h:730
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
Pop a task off the taskprocessor and execute it.
#define ao2_lock(a)
Definition: astobj2.h:718
struct ast_taskprocessor * tps
The main taskprocessor.
Definition: threadpool.c:67

◆ threadpool_idle_thread_dead()

static void threadpool_idle_thread_dead ( struct ast_threadpool *pool,
struct worker_thread *worker
)
static

Definition at line 332 of file threadpool.c.

References ast_taskprocessor_push(), ast_threadpool::control_tps, lock, queued_idle_thread_dead(), SCOPED_AO2LOCK, ast_threadpool::shutting_down, thread_worker_pair_alloc(), and thread_worker_pair_free().

Referenced by worker_idle().

334 {
335  struct thread_worker_pair *pair;
336  SCOPED_AO2LOCK(lock, pool);
337 
338  if (pool->shutting_down) {
339  return;
340  }
341 
342  pair = thread_worker_pair_alloc(pool, worker);
343  if (!pair) {
344  return;
345  }
346 
349  }
350 }
Struct used for queued operations involving worker state changes.
Definition: threadpool.c:193
ast_mutex_t lock
Definition: app_meetme.c:1091
static int queued_idle_thread_dead(void *data)
Definition: threadpool.c:321
static struct thread_worker_pair * thread_worker_pair_alloc(struct ast_threadpool *pool, struct worker_thread *worker)
Allocate and initialize a thread_worker_pair.
Definition: threadpool.c:214
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:602
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
static void thread_worker_pair_free(struct thread_worker_pair *pair)
Destructor for thread_worker_pair.
Definition: threadpool.c:203
struct ast_taskprocessor * control_tps
The control taskprocessor.
Definition: threadpool.c:96

◆ threadpool_send_state_changed()

static void threadpool_send_state_changed ( struct ast_threadpool *pool)
static

Notify the threadpool listener that the state has changed.

This notifies the threadpool listener via its state_changed callback.

Parameters
pool: The threadpool whose state has changed

Definition at line 180 of file threadpool.c.

References ast_threadpool::active_threads, ao2_container_count(), ast_threadpool_listener::callbacks, ast_threadpool::idle_threads, ast_threadpool::listener, and ast_threadpool_listener_callbacks::state_changed.

Referenced by queued_active_thread_idle(), queued_idle_thread_dead(), queued_set_size(), queued_task_pushed(), and queued_zombie_thread_dead().

181 {
182  int active_size = ao2_container_count(pool->active_threads);
183  int idle_size = ao2_container_count(pool->idle_threads);
184 
185  if (pool->listener && pool->listener->callbacks->state_changed) {
186  pool->listener->callbacks->state_changed(pool, pool->listener, active_size, idle_size);
187  }
188 }
void(* state_changed)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener, int active_threads, int idle_threads)
Indicates that the state of threads in the pool has changed.
Definition: threadpool.h:36
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
struct ao2_container * idle_threads
The container of idle threads. Idle threads are those that are currently waiting to run tasks...
Definition: threadpool.c:48
const struct ast_threadpool_listener_callbacks * callbacks
Definition: threadpool.c:112
struct ast_threadpool_listener * listener
Definition: threadpool.c:38
struct ao2_container * active_threads
The container of active threads. Active threads are those that are currently running tasks...
Definition: threadpool.c:43

◆ threadpool_tps_emptied()

static void threadpool_tps_emptied ( struct ast_taskprocessor_listener *listener)
static

Taskprocessor listener emptied callback.

The threadpool queues a task to let the threadpool listener know that the threadpool no longer contains any tasks.

Parameters
listener: The taskprocessor listener. The threadpool is the listener's private data.

Definition at line 654 of file threadpool.c.

References ast_taskprocessor_listener_get_user_data(), ast_taskprocessor_push(), ast_threadpool_listener::callbacks, ast_threadpool::control_tps, ast_threadpool_listener_callbacks::emptied, ast_threadpool::listener, lock, queued_emptied(), SCOPED_AO2LOCK, and ast_threadpool::shutting_down.

655 {
657  SCOPED_AO2LOCK(lock, pool);
658 
659  if (pool->shutting_down) {
660  return;
661  }
662 
663  if (pool->listener && pool->listener->callbacks->emptied) {
665  /* Nothing to do here but we need the check to keep the compiler happy. */
666  }
667  }
668 }
const struct ast_threadpool_listener_callbacks * callbacks
Definition: threadpool.c:112
void(* emptied)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener)
Indicates the threadpool's taskprocessor has become empty.
Definition: threadpool.h:56
ast_mutex_t lock
Definition: app_meetme.c:1091
struct ast_threadpool_listener * listener
Definition: threadpool.c:38
static int queued_emptied(void *data)
Queued task that handles the case where the threadpool's taskprocessor is emptied.
Definition: threadpool.c:638
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:602
void * ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
Get the user data from the listener.
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
An opaque threadpool structure.
Definition: threadpool.c:36
struct ast_taskprocessor * control_tps
The control taskprocessor.
Definition: threadpool.c:96

◆ threadpool_tps_shutdown()

static void threadpool_tps_shutdown ( struct ast_taskprocessor_listener *listener)
static

Taskprocessor listener shutdown callback.

The threadpool will shut down and destroy all of its worker threads when this is called back. By the time this gets called, the threadpool's control taskprocessor has already been destroyed. Therefore there is no risk in outright destroying the worker threads here.

Parameters
listener: The taskprocessor listener. The threadpool is the listener's private data.

Definition at line 679 of file threadpool.c.

References ast_threadpool::active_threads, ao2_cleanup, ast_taskprocessor_listener_get_user_data(), ast_threadpool_listener::callbacks, ast_threadpool::idle_threads, ast_threadpool::listener, ast_threadpool_listener_callbacks::shutdown, and ast_threadpool::zombie_threads.

680 {
682 
683  if (pool->listener && pool->listener->callbacks->shutdown) {
684  pool->listener->callbacks->shutdown(pool->listener);
685  }
687  ao2_cleanup(pool->idle_threads);
689  ao2_cleanup(pool);
690 }
struct ao2_container * idle_threads
The container of idle threads. Idle threads are those that are currently waiting to run tasks...
Definition: threadpool.c:48
void(* shutdown)(struct ast_threadpool_listener *listener)
The threadpool is shutting down.
Definition: threadpool.h:67
const struct ast_threadpool_listener_callbacks * callbacks
Definition: threadpool.c:112
struct ast_threadpool_listener * listener
Definition: threadpool.c:38
struct ao2_container * zombie_threads
The container of zombie threads. Zombie threads may be running tasks, but they are scheduled to die s...
Definition: threadpool.c:53
void * ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
Get the user data from the listener.
struct ao2_container * active_threads
The container of active threads. Active threads are those that are currently running tasks...
Definition: threadpool.c:43
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
An opaque threadpool structure.
Definition: threadpool.c:36

◆ threadpool_tps_start()

static int threadpool_tps_start ( struct ast_taskprocessor_listener *listener)
static

Definition at line 444 of file threadpool.c.

445 {
446  return 0;
447 }

◆ threadpool_tps_task_pushed()

static void threadpool_tps_task_pushed ( struct ast_taskprocessor_listener *listener,
int  was_empty 
)
static

Taskprocessor listener callback called when a task is added.

The threadpool uses this opportunity to queue a task on its control taskprocessor in order to activate idle threads and notify the threadpool listener that the task has been pushed.

Parameters
listener: The taskprocessor listener. The threadpool is the listener's private data
was_empty: True if the taskprocessor was empty prior to the task being pushed

Definition at line 610 of file threadpool.c.

References ast_free, ast_taskprocessor_listener_get_user_data(), ast_taskprocessor_push(), ast_threadpool::control_tps, lock, queued_task_pushed(), SCOPED_AO2LOCK, ast_threadpool::shutting_down, and task_pushed_data_alloc().

612 {
614  struct task_pushed_data *tpd;
615  SCOPED_AO2LOCK(lock, pool);
616 
617  if (pool->shutting_down) {
618  return;
619  }
620 
621  tpd = task_pushed_data_alloc(pool, was_empty);
622  if (!tpd) {
623  return;
624  }
625 
627  ast_free(tpd);
628  }
629 }
static struct task_pushed_data * task_pushed_data_alloc(struct ast_threadpool *pool, int was_empty)
Allocate and initialize a task_pushed_data.
Definition: threadpool.c:466
ast_mutex_t lock
Definition: app_meetme.c:1091
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:602
#define ast_free(a)
Definition: astmm.h:182
void * ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
Get the user data from the listener.
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
An opaque threadpool structure.
Definition: threadpool.c:36
static int queued_task_pushed(void *data)
Queued task called when tasks are pushed into the threadpool.
Definition: threadpool.c:564
helper used for queued task when tasks are pushed
Definition: threadpool.c:452
struct ast_taskprocessor * control_tps
The control taskprocessor.
Definition: threadpool.c:96

◆ threadpool_zombie_thread_dead()

static void threadpool_zombie_thread_dead ( struct ast_threadpool *pool,
struct worker_thread *worker
)
static

Queue a task to kill a zombie thread.

This is called by a worker thread when it acknowledges that it is time for it to die.

Definition at line 301 of file threadpool.c.

References ast_taskprocessor_push(), ast_threadpool::control_tps, lock, queued_zombie_thread_dead(), SCOPED_AO2LOCK, ast_threadpool::shutting_down, thread_worker_pair_alloc(), and thread_worker_pair_free().

Referenced by worker_start().

303 {
304  struct thread_worker_pair *pair;
305  SCOPED_AO2LOCK(lock, pool);
306 
307  if (pool->shutting_down) {
308  return;
309  }
310 
311  pair = thread_worker_pair_alloc(pool, worker);
312  if (!pair) {
313  return;
314  }
315 
318  }
319 }
Struct used for queued operations involving worker state changes.
Definition: threadpool.c:193
ast_mutex_t lock
Definition: app_meetme.c:1091
static struct thread_worker_pair * thread_worker_pair_alloc(struct ast_threadpool *pool, struct worker_thread *worker)
Allocate and initialize a thread_worker_pair.
Definition: threadpool.c:214
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:602
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
static int queued_zombie_thread_dead(void *data)
Kill a zombie thread.
Definition: threadpool.c:284
static void thread_worker_pair_free(struct thread_worker_pair *pair)
Destructor for thread_worker_pair.
Definition: threadpool.c:203
struct ast_taskprocessor * control_tps
The control taskprocessor.
Definition: threadpool.c:96

◆ worker_active()

static void worker_active ( struct worker_thread *worker)
static

Active loop for worker threads.

The worker will stay in this loop for its lifetime, executing tasks as they become available. If there are no tasks currently available, then the thread will go idle.

Parameters
worker: The worker thread executing tasks.

Definition at line 1124 of file threadpool.c.

References worker_thread::pool, and threadpool_execute().

Referenced by worker_start().

1125 {
1126  int alive;
1127 
1128  /* The following is equivalent to
1129  *
1130  * while (threadpool_execute(worker->pool));
1131  *
1132  * However, reviewers have suggested in the past
1133  * doing that can cause optimizers to (wrongly)
1134  * optimize the code away.
1135  */
1136  do {
1137  alive = threadpool_execute(worker->pool);
1138  } while (alive);
1139 }
struct ast_threadpool * pool
Definition: threadpool.c:153
static int threadpool_execute(struct ast_threadpool *pool)
Execute a task in the threadpool.
Definition: threadpool.c:362

◆ worker_idle()

static int worker_idle ( struct worker_thread *worker)
static

Idle function for worker threads.

The worker waits here until it gets told by the threadpool to wake up.

worker is locked before entering this function.

Parameters
worker: The idle worker
Return values
0: The thread is being woken up so that it can conclude.
non-zero: The thread is being woken up to do more work.

Definition at line 1153 of file threadpool.c.

References ALIVE, ast_cond_timedwait, ast_cond_wait, ast_debug, ast_tvnow(), worker_thread::cond, DEAD, ast_threadpool_options::idle_timeout, worker_thread::lock, worker_thread::options, worker_thread::pool, worker_thread::state, threadpool_idle_thread_dead(), and worker_thread::wake_up.

Referenced by worker_start().

1154 {
1155  struct timeval start = ast_tvnow();
1156  struct timespec end = {
1157  .tv_sec = start.tv_sec + worker->options.idle_timeout,
1158  .tv_nsec = start.tv_usec * 1000,
1159  };
1160  while (!worker->wake_up) {
1161  if (worker->options.idle_timeout <= 0) {
1162  ast_cond_wait(&worker->cond, &worker->lock);
1163  } else if (ast_cond_timedwait(&worker->cond, &worker->lock, &end) == ETIMEDOUT) {
1164  break;
1165  }
1166  }
1167 
1168  if (!worker->wake_up) {
1169  ast_debug(1, "Worker thread idle timeout reached. Dying.\n");
1170  threadpool_idle_thread_dead(worker->pool, worker);
1171  worker->state = DEAD;
1172  }
1173  worker->wake_up = 0;
1174  return worker->state == ALIVE;
1175 }
struct ast_threadpool_options options
Definition: threadpool.c:159
int idle_timeout
Time limit in seconds for idle threads.
Definition: threadpool.h:79
ast_mutex_t lock
Definition: threadpool.c:149
#define ast_cond_wait(cond, mutex)
Definition: lock.h:203
static void threadpool_idle_thread_dead(struct ast_threadpool *pool, struct worker_thread *worker)
Definition: threadpool.c:332
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:150
char * end
Definition: eagi_proxy.c:73
enum worker_state state
Definition: threadpool.c:155
#define ast_debug(level,...)
Log a DEBUG message.
Definition: logger.h:444
struct ast_threadpool * pool
Definition: threadpool.c:153
#define ast_cond_timedwait(cond, mutex, time)
Definition: lock.h:204
ast_cond_t cond
Definition: threadpool.c:147

◆ worker_set_state()

static int worker_set_state ( struct worker_thread *worker,
enum worker_state  state 
)
static

Change a worker's state.

The threadpool calls into this function in order to let a worker know how it should proceed.

Return values
-1: failure (state transition not permitted)
0: success

Definition at line 1186 of file threadpool.c.

References ALIVE, ast_assert, ast_cond_signal, worker_thread::cond, DEAD, worker_thread::lock, lock, SCOPED_MUTEX, worker_thread::state, state, worker_thread::wake_up, and ZOMBIE.

Referenced by activate_thread(), worker_shutdown(), and zombify_threads().

1187 {
1188  SCOPED_MUTEX(lock, &worker->lock);
1189 
1190  switch (state) {
1191  case ALIVE:
1192  /* This can occur due to a race condition between being told to go active
1193  * and an idle timeout happening.
1194  */
1195  if (worker->state == DEAD) {
1196  return -1;
1197  }
1198  ast_assert(worker->state != ZOMBIE);
1199  break;
1200  case DEAD:
1201  break;
1202  case ZOMBIE:
1203  ast_assert(worker->state != DEAD);
1204  break;
1205  }
1206 
1207  worker->state = state;
1208  worker->wake_up = 1;
1209  ast_cond_signal(&worker->cond);
1210 
1211  return 0;
1212 }
enum sip_cc_notify_state state
Definition: chan_sip.c:960
ast_mutex_t lock
Definition: threadpool.c:149
#define ast_assert(a)
Definition: utils.h:710
#define ast_cond_signal(cond)
Definition: lock.h:201
enum worker_state state
Definition: threadpool.c:155
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
Definition: lock.h:587
ast_mutex_t lock
Definition: app_meetme.c:1091
ast_cond_t cond
Definition: threadpool.c:147

◆ worker_shutdown()

static void worker_shutdown ( struct worker_thread *worker)
static

shut a worker thread down

Set the worker dead and then wait for its thread to finish executing.

Parameters
worker: The worker thread to shut down

Definition at line 1009 of file threadpool.c.

References AST_PTHREADT_NULL, DEAD, NULL, worker_thread::thread, and worker_set_state().

Referenced by worker_thread_destroy().

1010 {
1011  worker_set_state(worker, DEAD);
1012  if (worker->thread != AST_PTHREADT_NULL) {
1013  pthread_join(worker->thread, NULL);
1014  worker->thread = AST_PTHREADT_NULL;
1015  }
1016 }
#define NULL
Definition: resample.c:96
#define AST_PTHREADT_NULL
Definition: lock.h:66
pthread_t thread
Definition: threadpool.c:151
static int worker_set_state(struct worker_thread *worker, enum worker_state state)
Change a worker's state.
Definition: threadpool.c:1186

◆ worker_start()

static void * worker_start ( void *  arg)
static

start point for worker threads

Worker threads start in the active state but may immediately go idle if there is no work to be done

Parameters
arg: The worker thread
Return values
NULL

Definition at line 1044 of file threadpool.c.

References ALIVE, ast_mutex_lock, ast_mutex_unlock, worker_thread::lock, NULL, worker_thread::options, worker_thread::pool, worker_thread::state, ast_threadpool_options::thread_end, ast_threadpool_options::thread_start, threadpool_active_thread_idle(), threadpool_zombie_thread_dead(), worker_active(), worker_idle(), and ZOMBIE.

Referenced by worker_thread_start().

1045 {
1046  struct worker_thread *worker = arg;
1047  enum worker_state saved_state;
1048 
1049  if (worker->options.thread_start) {
1050  worker->options.thread_start();
1051  }
1052 
1053  ast_mutex_lock(&worker->lock);
1054  while (worker_idle(worker)) {
1055  ast_mutex_unlock(&worker->lock);
1056  worker_active(worker);
1057  ast_mutex_lock(&worker->lock);
1058  if (worker->state != ALIVE) {
1059  break;
1060  }
1061  threadpool_active_thread_idle(worker->pool, worker);
1062  }
1063  saved_state = worker->state;
1064  ast_mutex_unlock(&worker->lock);
1065 
1066  /* Reaching this portion means the thread is
1067  * on death's door. It may have been killed while
1068  * it was idle, in which case it can just die
1069  * peacefully. If it's a zombie, though, then
1070  * it needs to let the pool know so
1071  * that the thread can be removed from the
1072  * list of zombie threads.
1073  */
1074  if (saved_state == ZOMBIE) {
1075  threadpool_zombie_thread_dead(worker->pool, worker);
1076  }
1077 
1078  if (worker->options.thread_end) {
1079  worker->options.thread_end();
1080  }
1081  return NULL;
1082 }
worker_state
states for worker threads
Definition: threadpool.c:120
struct ast_threadpool_options options
Definition: threadpool.c:159
static void threadpool_active_thread_idle(struct ast_threadpool *pool, struct worker_thread *worker)
Queue a task to move a thread from the active list to the idle list.
Definition: threadpool.c:256
static void worker_active(struct worker_thread *worker)
Active loop for worker threads.
Definition: threadpool.c:1124
ast_mutex_t lock
Definition: threadpool.c:149
#define ast_mutex_lock(a)
Definition: lock.h:187
#define NULL
Definition: resample.c:96
enum worker_state state
Definition: threadpool.c:155
void(* thread_start)(void)
Function to call when a thread starts.
Definition: threadpool.h:117
void(* thread_end)(void)
Function to call when a thread ends.
Definition: threadpool.h:124
struct ast_threadpool * pool
Definition: threadpool.c:153
static void threadpool_zombie_thread_dead(struct ast_threadpool *pool, struct worker_thread *worker)
Queue a task to kill a zombie thread.
Definition: threadpool.c:301
static int worker_idle(struct worker_thread *worker)
Idle function for worker threads.
Definition: threadpool.c:1153
#define ast_mutex_unlock(a)
Definition: lock.h:188

◆ worker_thread_alloc()

static struct worker_thread * worker_thread_alloc ( struct ast_threadpool *pool)
static

Allocate and initialize a new worker thread.

This will create, initialize, and start the thread.

Parameters
pool: The threadpool to which the worker will be added
Return values
NULL: Failed to allocate or start the worker thread
non-NULL: The newly-created worker thread

Definition at line 1093 of file threadpool.c.

References ALIVE, ao2_alloc, ast_atomic_fetchadd_int(), ast_cond_init, ast_mutex_init, AST_PTHREADT_NULL, worker_thread::cond, worker_thread::id, worker_thread::lock, NULL, ast_threadpool::options, worker_thread::options, worker_thread::pool, worker_thread::state, worker_thread::thread, and worker_thread_destroy().

Referenced by grow().

1094 {
1095  struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy);
1096  if (!worker) {
1097  return NULL;
1098  }
1100  ast_mutex_init(&worker->lock);
1101  ast_cond_init(&worker->cond, NULL);
1102  worker->pool = pool;
1103  worker->thread = AST_PTHREADT_NULL;
1104  worker->state = ALIVE;
1105  worker->options = pool->options;
1106  return worker;
1107 }
struct ast_threadpool_options options
Definition: threadpool.c:159
static void worker_thread_destroy(void *obj)
Worker thread destructor.
Definition: threadpool.c:1025
struct ast_threadpool_options options
Definition: threadpool.c:100
ast_mutex_t lock
Definition: threadpool.c:149
#define ast_cond_init(cond, attr)
Definition: lock.h:199
#define NULL
Definition: resample.c:96
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition: lock.h:755
enum worker_state state
Definition: threadpool.c:155
#define AST_PTHREADT_NULL
Definition: lock.h:66
struct ast_threadpool * pool
Definition: threadpool.c:153
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:411
pthread_t thread
Definition: threadpool.c:151
static int worker_id_counter
Definition: threadpool.c:984
#define ast_mutex_init(pmutex)
Definition: lock.h:184
ast_cond_t cond
Definition: threadpool.c:147

◆ worker_thread_cmp()

static int worker_thread_cmp ( void *  obj,
void *  arg,
int  flags 
)
static

Definition at line 993 of file threadpool.c.

References CMP_MATCH, and worker_thread::id.

Referenced by threadpool_alloc().

994 {
995  struct worker_thread *worker1 = obj;
996  struct worker_thread *worker2 = arg;
997 
998  return worker1->id == worker2->id ? CMP_MATCH : 0;
999 }

◆ worker_thread_destroy()

static void worker_thread_destroy ( void *  obj)
static

Worker thread destructor.

Called automatically when refcount reaches 0. Shuts down the worker thread and destroys its component parts

Definition at line 1025 of file threadpool.c.

References ast_cond_destroy, ast_debug, ast_mutex_destroy, worker_thread::cond, worker_thread::id, worker_thread::lock, and worker_shutdown().

Referenced by worker_thread_alloc().

1026 {
1027  struct worker_thread *worker = obj;
1028  ast_debug(3, "Destroying worker thread %d\n", worker->id);
1029  worker_shutdown(worker);
1030  ast_mutex_destroy(&worker->lock);
1031  ast_cond_destroy(&worker->cond);
1032 }
ast_mutex_t lock
Definition: threadpool.c:149
#define ast_debug(level,...)
Log a DEBUG message.
Definition: logger.h:444
#define ast_cond_destroy(cond)
Definition: lock.h:200
static void worker_shutdown(struct worker_thread *worker)
shut a worker thread down
Definition: threadpool.c:1009
#define ast_mutex_destroy(a)
Definition: lock.h:186
ast_cond_t cond
Definition: threadpool.c:147

◆ worker_thread_hash()

static int worker_thread_hash ( const void *  obj,
int  flags 
)
static

Definition at line 986 of file threadpool.c.

References worker_thread::id.

Referenced by threadpool_alloc().

987 {
988  const struct worker_thread *worker = obj;
989 
990  return worker->id;
991 }

◆ worker_thread_start()

static int worker_thread_start ( struct worker_thread *worker)
static

Definition at line 1109 of file threadpool.c.

References ast_pthread_create, NULL, worker_thread::thread, and worker_start().

Referenced by grow().

1110 {
1111  return ast_pthread_create(&worker->thread, NULL, worker_start, worker);
1112 }
#define NULL
Definition: resample.c:96
static void * worker_start(void *arg)
start point for worker threads
Definition: threadpool.c:1044
#define ast_pthread_create(a, b, c, d)
Definition: utils.h:559
pthread_t thread
Definition: threadpool.c:151

◆ zombify_threads()

static int zombify_threads ( void *  obj,
void *  arg,
void *  data,
int  flags 
)
static

ao2 callback to zombify a set number of threads.

Threads will be zombified as long as the counter has not reached zero. The counter is decremented with each thread that is zombified.

Zombifying a thread involves removing it from its current container, adding it to the zombie container, and changing the state of the worker to a zombie.

This callback is called from the threadpool control taskprocessor thread.

Parameters
obj: The worker thread that may be zombified
arg: The pool to which the worker belongs
data: The counter
flags: Unused
Return values
CMP_MATCH: The zombified thread should be removed from its current container
CMP_STOP: Stop attempting to zombify threads

Definition at line 745 of file threadpool.c.

References ao2_link, ast_log, CMP_MATCH, CMP_STOP, worker_thread::id, LOG_WARNING, worker_set_state(), ZOMBIE, and ast_threadpool::zombie_threads.

Referenced by shrink().

746 {
747  struct worker_thread *worker = obj;
748  struct ast_threadpool *pool = arg;
749  int *num_to_zombify = data;
750 
751  if ((*num_to_zombify)-- > 0) {
752  if (!ao2_link(pool->zombie_threads, worker)) {
753  ast_log(LOG_WARNING, "Failed to zombify active thread %d. Thread will remain active\n", worker->id);
754  return 0;
755  }
756  worker_set_state(worker, ZOMBIE);
757  return CMP_MATCH;
758  } else {
759  return CMP_STOP;
760  }
761 }
#define LOG_WARNING
Definition: logger.h:274
#define ast_log
Definition: astobj2.c:42
struct ao2_container * zombie_threads
The container of zombie threads. Zombie threads may be running tasks, but they are scheduled to die s...
Definition: threadpool.c:53
static int worker_set_state(struct worker_thread *worker, enum worker_state state)
Change a worker's state.
Definition: threadpool.c:1186
An opaque threadpool structure.
Definition: threadpool.c:36
#define ao2_link(container, obj)
Definition: astobj2.h:1549

Variable Documentation

◆ serializer_tps_listener_callbacks

struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks
static
Initial value:
= {
.task_pushed = serializer_task_pushed,
.start = serializer_start,
.shutdown = serializer_shutdown,
}
static int serializer_start(struct ast_taskprocessor_listener *listener)
Definition: threadpool.c:1375
static void serializer_shutdown(struct ast_taskprocessor_listener *listener)
Definition: threadpool.c:1381
static void serializer_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
Definition: threadpool.c:1363

Definition at line 1391 of file threadpool.c.

◆ threadpool_tps_listener_callbacks

struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks
static

Table of taskprocessor listener callbacks for threadpool's main taskprocessor.

Definition at line 695 of file threadpool.c.

◆ worker_id_counter

int worker_id_counter
static

A monotonically increasing integer used for worker thread identification.

Definition at line 984 of file threadpool.c.