28#define THREAD_BUCKETS 89
407 struct ast_str *control_tps_name;
411 if (!pool || !control_tps_name) {
420 if (!pool->control_tps) {
425 if (!pool->active_threads) {
430 if (!pool->idle_threads) {
435 if (!pool->zombie_threads) {
507 ast_debug(1,
"Failed to activate thread %d. It is dead\n",
536 ast_debug(3,
"Increasing threadpool %s's size by %d\n",
539 for (i = 0; i < delta; ++i) {
570 unsigned int existing_active;
717 int *num_to_kill = arg;
719 if (*num_to_kill > 0) {
750 int *num_to_zombify = data;
752 if ((*num_to_zombify)-- > 0) {
754 ast_log(
LOG_WARNING,
"Failed to zombify active thread %d. Thread will remain active\n", worker->
id);
785 int active_threads_to_zombify = delta - idle_threads_to_kill;
787 ast_debug(3,
"Destroying %d idle threads in threadpool %s\n", idle_threads_to_kill,
793 ast_debug(3,
"Destroying %d active threads in threadpool %s\n", active_threads_to_zombify,
843 unsigned int num_threads = ssd->
size;
851 if (current_size == num_threads) {
852 ast_debug(3,
"Not changing threadpool size since new size %u is the same as current %u\n",
853 num_threads, current_size);
857 if (current_size < num_threads) {
864 grow(pool, num_threads - current_size);
868 shrink(pool, current_size - num_threads);
941 sprintf(fullname,
"%s/pool",
name);
1029 ast_debug(3,
"Destroying worker thread %d\n", worker->
id);
1063 saved_state = worker->
state;
1074 if (saved_state ==
ZOMBIE) {
1156 struct timespec
end = {
1158 .tv_nsec = start.tv_usec * 1000,
1169 ast_debug(1,
"Worker thread idle timeout reached. Dying.\n");
1255 struct timeval start;
1256 struct timespec
end;
1259 end.tv_sec = start.tv_sec + timeout;
1260 end.tv_nsec = start.tv_usec * 1000;
static void * listener(void *unused)
Asterisk main include file. File version handling, generic pbx functions.
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
#define ast_malloc(len)
A wrapper for malloc()
#define ao2_link(container, obj)
Add an object to a container.
@ AO2_ALLOC_OPT_LOCK_NOLOCK
@ AO2_ALLOC_OPT_LOCK_MUTEX
#define ao2_callback(c, flags, cb_fn, arg)
ao2_callback() is a generic function that applies cb_fn() to all objects in a container,...
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
#define ao2_callback_data(container, flags, cb_fn, arg, data)
#define ao2_unlink(container, obj)
Remove an object from a container.
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
#define ao2_alloc_options(data_size, destructor_fn, options)
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
#define ao2_alloc(data_size, destructor_fn)
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
#define ast_debug(level,...)
Log a DEBUG message.
#define ast_cond_destroy(cond)
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
#define ast_cond_wait(cond, mutex)
#define AST_PTHREADT_NULL
#define ast_cond_init(cond, attr)
#define ast_cond_timedwait(cond, mutex, time)
#define ast_mutex_init(pmutex)
#define ast_mutex_unlock(a)
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
pthread_cond_t ast_cond_t
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
#define ast_mutex_destroy(a)
#define ast_mutex_lock(a)
#define ast_cond_signal(cond)
static struct ast_serializer_shutdown_group * shutdown_group
Shutdown group for options serializers.
char * ast_str_buffer(const struct ast_str *buf)
Returns the string buffer within the ast_str buf.
#define ast_str_create(init_len)
Create a malloc'ed dynamic length string.
int ast_str_set(struct ast_str **buf, ssize_t max_len, const char *fmt,...)
Set a dynamic string using variable arguments.
Structure for mutex and tracking information.
Support for dynamic strings.
void(* task_pushed)(struct ast_taskprocessor_listener *listener, int was_empty)
Indicates a task was pushed to the processor.
int(* start)(struct ast_taskprocessor_listener *listener)
The taskprocessor has started completely.
A listener for taskprocessors.
An ast_taskprocessor structure is a singleton by name.
void(* shutdown)(struct ast_threadpool_listener *listener)
The threadpool is shutting down.
void(* emptied)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener)
Indicates the threadpool's taskprocessor has become empty.
void(* task_pushed)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener, int was_empty)
Indicates that a task was pushed to the threadpool.
void(* state_changed)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener, int active_threads, int idle_threads)
Indicates that the state of threads in the pool has changed.
listener for a threadpool
const struct ast_threadpool_listener_callbacks * callbacks
void(* thread_start)(void)
Function to call when a thread starts.
int idle_timeout
Time limit in seconds for idle threads.
int max_size
Maximum number of threads a pool may have.
void(* thread_end)(void)
Function to call when a thread ends.
int auto_increment
Number of threads to increment pool by.
An opaque threadpool structure.
struct ao2_container * idle_threads
The container of idle threads. Idle threads are those that are currently waiting to run tasks.
struct ast_taskprocessor * tps
The main taskprocessor.
struct ast_taskprocessor * control_tps
The control taskprocessor.
struct ast_threadpool_options options
struct ao2_container * zombie_threads
The container of zombie threads. Zombie threads may be running tasks, but they are scheduled to die s...
struct ast_threadpool_listener * listener
struct ao2_container * active_threads
The container of active threads. Active threads are those that are currently running tasks.
struct ast_threadpool_options options
struct ast_threadpool * pool
struct ast_serializer_shutdown_group * shutdown_group
struct ast_threadpool * pool
Helper struct used for queued operations that change the size of the threadpool.
struct ast_threadpool * pool
helper used for queued task when tasks are pushed
struct ast_threadpool * pool
Struct used for queued operations involving worker state changes.
struct ast_threadpool * pool
struct worker_thread * worker
struct ast_threadpool_options options
struct ast_threadpool * pool
An API for managing task processing threads that can be shared across modules.
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
@ TPS_REF_DEFAULT
return a reference to a taskprocessor, create one if it does not exist
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
void * ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
Get the user data from the listener.
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
Pop a task off the taskprocessor and execute it.
struct ast_taskprocessor * ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
Create a taskprocessor with a custom listener.
struct ast_taskprocessor * ast_taskprocessor_listener_get_tps(const struct ast_taskprocessor_listener *listener)
Get a reference to the listener's taskprocessor.
const char * ast_taskprocessor_name(struct ast_taskprocessor *tps)
Return the name of the taskprocessor singleton.
static int task(void *data)
Queued task for baseline test.
static void thread_worker_pair_free(struct thread_worker_pair *pair)
Destructor for thread_worker_pair.
static struct worker_thread * worker_thread_alloc(struct ast_threadpool *pool)
Allocate and initialize a new worker thread.
static struct thread_worker_pair * thread_worker_pair_alloc(struct ast_threadpool *pool, struct worker_thread *worker)
Allocate and initialize a thread_worker_pair.
static struct serializer * serializer_create(struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
static void worker_active(struct worker_thread *worker)
Active loop for worker threads.
void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
Set the number of threads for the thread pool.
static void serializer_shutdown_group_dec(struct ast_serializer_shutdown_group *shutdown_group)
static int queued_task_pushed(void *data)
Queued task called when tasks are pushed into the threadpool.
static int activate_thread(void *obj, void *arg, int flags)
Activate idle threads.
static void threadpool_active_thread_idle(struct ast_threadpool *pool, struct worker_thread *worker)
Queue a task to move a thread from the active list to the idle list.
static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks
static int queued_emptied(void *data)
Queued task that handles the case where the threadpool's taskprocessor is emptied.
static int worker_thread_hash(const void *obj, int flags)
static int worker_thread_start(struct worker_thread *worker)
static void serializer_shutdown_group_dtor(void *vdoomed)
static void serializer_dtor(void *obj)
static void threadpool_destructor(void *obj)
Destroy a threadpool's components.
void ast_threadpool_shutdown(struct ast_threadpool *pool)
Shut down a threadpool and destroy it.
static struct task_pushed_data * task_pushed_data_alloc(struct ast_threadpool *pool, int was_empty)
Allocate and initialize a task_pushed_data.
static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
Taskprocessor listener shutdown callback.
static void grow(struct ast_threadpool *pool, int delta)
Add threads to the threadpool.
static void worker_shutdown(struct worker_thread *worker)
shut a worker thread down
static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks
Table of taskprocessor listener callbacks for threadpool's main taskprocessor.
static int worker_set_state(struct worker_thread *worker, enum worker_state state)
Change a worker's state.
static int worker_idle(struct worker_thread *worker)
Idle function for worker threads.
struct ast_taskprocessor * ast_threadpool_serializer_get_current(void)
Get the threadpool serializer currently associated with this thread.
static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
Taskprocessor listener emptied callback.
static void serializer_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
static int queued_set_size(void *data)
Change the size of the threadpool.
long ast_threadpool_queue_size(struct ast_threadpool *pool)
Return the size of the threadpool's task queue.
static int threadpool_execute(struct ast_threadpool *pool)
Execute a task in the threadpool.
worker_state
states for worker threads
static void threadpool_send_state_changed(struct ast_threadpool *pool)
Notify the threadpool listener that the state has changed.
AST_THREADSTORAGE_RAW(current_serializer)
static int worker_id_counter
static void shrink(struct ast_threadpool *pool, int delta)
Remove threads from the threadpool.
static int queued_active_thread_idle(void *data)
Move a worker thread from the active container to the idle container.
static void serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group)
static int queued_zombie_thread_dead(void *data)
Kill a zombie thread.
static void threadpool_zombie_thread_dead(struct ast_threadpool *pool, struct worker_thread *worker)
Queue a task to kill a zombie thread.
int ast_threadpool_push(struct ast_threadpool *pool, int(*task)(void *data), void *data)
Push a task to the threadpool.
static void serializer_shutdown(struct ast_taskprocessor_listener *listener)
static int queued_idle_thread_dead(void *data)
void * ast_threadpool_listener_get_user_data(const struct ast_threadpool_listener *listener)
Get the threadpool listener's user data.
static int threadpool_tps_start(struct ast_taskprocessor_listener *listener)
struct ast_taskprocessor * ast_threadpool_serializer_group(const char *name, struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Serialized execution of tasks within an ast_threadpool.
static void threadpool_idle_thread_dead(struct ast_threadpool *pool, struct worker_thread *worker)
int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout)
Wait, with a timeout, for the serializers in the group to shut down.
static int worker_thread_cmp(void *obj, void *arg, int flags)
static struct ast_threadpool * threadpool_alloc(const char *name, const struct ast_threadpool_options *options)
Allocate a threadpool.
static void worker_thread_destroy(void *obj)
Worker thread destructor.
static int execute_tasks(void *data)
static struct set_size_data * set_size_data_alloc(struct ast_threadpool *pool, unsigned int size)
Allocate and initialize a set_size_data.
struct ast_threadpool * ast_threadpool_create(const char *name, struct ast_threadpool_listener *listener, const struct ast_threadpool_options *options)
Create a new threadpool.
static int serializer_start(struct ast_taskprocessor_listener *listener)
struct ast_threadpool_listener * ast_threadpool_listener_alloc(const struct ast_threadpool_listener_callbacks *callbacks, void *user_data)
Allocate a threadpool listener.
static void * worker_start(void *arg)
start point for worker threads
static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
Taskprocessor listener callback called when a task is added.
static int kill_threads(void *obj, void *arg, int flags)
ao2 callback to kill a set number of threads.
struct ast_taskprocessor * ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
Serialized execution of tasks within an ast_threadpool.
struct ast_serializer_shutdown_group * ast_serializer_shutdown_group_alloc(void)
Create a serializer group shutdown control object.
static int zombify_threads(void *obj, void *arg, void *data, int flags)
ao2 callback to zombify a set number of threads.
#define AST_THREADPOOL_OPTIONS_VERSION
int ast_threadstorage_set_ptr(struct ast_threadstorage *ts, void *ptr)
Set a raw pointer from threadstorage.
void * ast_threadstorage_get_ptr(struct ast_threadstorage *ts)
Retrieve a raw pointer from threadstorage.
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
#define ast_pthread_create(a, b, c, d)