80#define TASKPOOL_GROW_THRESHOLD (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 5) / 10
221#define TASKPROCESSOR_IS_IDLE(tps, timeout) (ast_tvdiff_ms(ast_tvnow(), tps->last_pushed) > (timeout))
268 *growth_threshold_reached = 1;
294 *growth_threshold_reached = 1;
299 *growth_threshold_reached = 0;
312 *growth_threshold_reached = 1;
346 ast_log(
LOG_WARNING,
"Taskpool '%s' has an initial size of %d, which is less than the minimum size of %d. Adjusting to %d.\n",
352 ast_log(
LOG_WARNING,
"Taskpool '%s' has a max size of %d, which is less than the initial size of %d. Adjusting to %d.\n",
356 if (!
options->auto_increment) {
359 ast_log(
LOG_WARNING,
"Taskpool '%s' has a minimum size of 0, which is not valid without auto increment. Adjusting to 1.\n",
name);
368 ast_log(
LOG_WARNING,
"Taskpool '%s' has a minimum size of %d, while max size is %d. Adjusting all sizes to %d due to lack of auto increment.\n",
371 }
else if (!
options->growth_threshold) {
380 ast_log(
LOG_WARNING,
"Taskpool '%s' has an invalid selector of %d. Adjusting to default selector.\n",
462#define TASKPOOL_QUEUE_SIZE_ADD(tps, size) (size += ast_taskprocessor_size(tps->taskprocessor))
497 for (i = 0; i < num_to_add; i++) {
501 if (!new_taskprocessor) {
506 ao2_ref(new_taskprocessor, -1);
529 unsigned int growth_threshold_reached = 0;
540 if (growth_threshold_reached) {
716 size_t remaining, requeue = 0;
Prototypes for public functions only of internal interest,.
static void * listener(void *unused)
Asterisk main include file. File version handling, generic pbx functions.
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
@ AO2_ALLOC_OPT_LOCK_NOLOCK
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
#define ao2_alloc_options(data_size, destructor_fn, options)
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
#define ao2_alloc(data_size, destructor_fn)
#define ast_cond_destroy(cond)
#define ast_cond_wait(cond, mutex)
#define ast_cond_init(cond, attr)
#define ast_mutex_init(pmutex)
#define ast_mutex_unlock(a)
pthread_cond_t ast_cond_t
#define ast_mutex_destroy(a)
#define ast_mutex_lock(a)
#define ast_cond_signal(cond)
static struct ast_serializer_shutdown_group * shutdown_group
Shutdown group for options serializers.
static int sync_task(void *data)
Scheduler Routines (derived from cheops)
#define AST_SCHED_DEL_UNREF(sched, id, refcall)
schedule task to get deleted and call unref function
int ast_sched_add(struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data) attribute_warn_unused_result
Adds a scheduled event.
void ast_sched_context_destroy(struct ast_sched_context *c)
destroys a schedule context
int ast_sched_start_thread(struct ast_sched_context *con)
Start a thread for processing scheduler entries.
struct ast_sched_context * ast_sched_context_create(void)
Create a scheduler context.
void ast_serializer_shutdown_group_dec(struct ast_serializer_shutdown_group *shutdown_group)
Decrement the number of serializer members in the group.
void ast_serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group)
Increment the number of serializer members in the group.
Structure for mutex and tracking information.
void(* thread_start)(void)
Function to call when a taskprocessor starts.
int idle_timeout
Time limit in seconds for idle dynamic taskprocessors.
int max_size
Maximum number of taskprocessors a pool may have.
void(* thread_end)(void)
Function to call when a taskprocessor ends.
int auto_increment
Number of taskprocessors to increment the pool by.
int growth_threshold
The threshold for when to grow the pool.
int minimum_size
Number of taskprocessors that will always exist.
int initial_size
Number of taskprocessors the pool will start with.
An opaque taskpool structure.
taskpool_selector selector
struct ast_taskpool_options options
struct taskpool_taskprocessors static_taskprocessors
struct taskpool_taskprocessors dynamic_taskprocessors
void(* task_pushed)(struct ast_taskprocessor_listener *listener, int was_empty)
Indicates a task was pushed to the processor.
A listener for taskprocessors.
struct ast_taskprocessor * tps
A ast_taskprocessor structure is a singleton by name.
struct ast_taskpool * pool
struct ast_serializer_shutdown_group * shutdown_group
A taskpool taskprocessor.
struct ast_taskprocessor * taskprocessor
struct timeval last_pushed
A container of taskprocessors.
struct taskpool_taskprocessors::@413 taskprocessors
unsigned int taskprocessor_num
AST_THREADSTORAGE_RAW(current_taskpool_pool)
Thread storage for the current taskpool.
static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks
static void taskpool_least_full_selector(struct ast_taskpool *pool, struct taskpool_taskprocessors *taskprocessors, struct taskpool_taskprocessor **taskprocessor, unsigned int *growth_threshold_reached)
Least full taskprocessor selector.
static void taskpool_sync_task_cleanup(struct taskpool_sync_task *sync_task)
static int taskpool_sync_task(void *data)
static struct ast_sched_context * sched
Scheduler used for dynamic pool shrinking.
static int taskpool_taskprocessor_start(void *data)
static void serializer_dtor(void *obj)
size_t ast_taskpool_taskprocessors_count(struct ast_taskpool *pool)
Get the current number of taskprocessors in the taskpool.
void(* taskpool_selector)(struct ast_taskpool *pool, struct taskpool_taskprocessors *taskprocessors, struct taskpool_taskprocessor **taskprocessor, unsigned int *growth_threshold_reached)
static void taskpool_shutdown(void)
static void serializer_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
#define TASKPOOL_QUEUE_SIZE_ADD(tps, size)
struct ast_taskprocessor * ast_taskpool_serializer(const char *name, struct ast_taskpool *pool)
Serialized execution of tasks within a ast_taskpool.
int ast_taskpool_push_wait(struct ast_taskpool *pool, int(*task)(void *data), void *data)
Push a task to the taskpool, and wait for completion.
void ast_taskpool_shutdown(struct ast_taskpool *pool)
Shut down a taskpool and remove the underlying taskprocessors.
static int taskpool_dynamic_pool_shrink(const void *data)
static int taskpool_taskprocessors_init(struct taskpool_taskprocessors *taskprocessors, unsigned int size)
int ast_taskpool_push(struct ast_taskpool *pool, int(*task)(void *data), void *data)
Push a task to the taskpool.
static void taskpool_taskprocessors_cleanup(struct taskpool_taskprocessors *taskprocessors)
static void taskpool_taskprocessor_dtor(void *obj)
int ast_taskpool_init(void)
static struct taskpool_taskprocessor * taskpool_taskprocessor_alloc(struct ast_taskpool *pool, char type)
static void serializer_shutdown(struct ast_taskprocessor_listener *listener)
static struct ast_taskpool * ast_taskpool_get_current(void)
struct ast_taskprocessor * ast_taskpool_serializer_get_current(void)
Get the taskpool serializer currently associated with this thread.
struct ast_taskprocessor * ast_taskpool_serializer_group(const char *name, struct ast_taskpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Serialized execution of tasks within a ast_taskpool.
static struct serializer * serializer_create(struct ast_taskpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
static int taskpool_taskprocessor_stop(void *data)
static int execute_tasks(void *data)
static void taskpool_sequential_selector(struct ast_taskpool *pool, struct taskpool_taskprocessors *taskprocessors, struct taskpool_taskprocessor **taskprocessor, unsigned int *growth_threshold_reached)
#define TASKPROCESSOR_IS_IDLE(tps, timeout)
#define TASKPOOL_GROW_THRESHOLD
The threshold for a taskprocessor at which we consider the pool needing to grow (50% of high water th...
static int taskpool_sync_task_init(struct taskpool_sync_task *sync_task, int(*task)(void *), void *data)
static int serializer_start(struct ast_taskprocessor_listener *listener)
int ast_taskpool_serializer_push_wait(struct ast_taskprocessor *serializer, int(*task)(void *data), void *data)
Push a task to a serializer, and wait for completion.
static int taskpool_serializer_empty_task(void *data)
struct ast_taskpool * ast_taskpool_create(const char *name, const struct ast_taskpool_options *options)
Create a new taskpool.
static void taskpool_dynamic_pool_grow(struct ast_taskpool *pool, struct taskpool_taskprocessor **taskprocessor)
long ast_taskpool_queue_size(struct ast_taskpool *pool)
Get the current number of queued tasks in the taskpool.
#define AST_TASKPOOL_OPTIONS_VERSION
@ AST_TASKPOOL_SELECTOR_SEQUENTIAL
@ AST_TASKPOOL_SELECTOR_LEAST_FULL
@ AST_TASKPOOL_SELECTOR_DEFAULT
An API for managing task processing threads that can be shared across modules.
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
@ TPS_REF_DEFAULT
return a reference to a taskprocessor, create one if it does not exist
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
struct ast_taskprocessor_listener * ast_taskprocessor_listener(struct ast_taskprocessor *tps)
Return the listener associated with the taskprocessor.
void * ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
Get the user data from the listener.
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
Pop a task off the taskprocessor and execute it.
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format,...)
Build a taskprocessor name with a sequence number on the end.
struct ast_taskprocessor * ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
Create a taskprocessor with a custom listener.
struct ast_taskprocessor * ast_taskprocessor_listener_get_tps(const struct ast_taskprocessor_listener *listener)
Get a reference to the listener's taskprocessor.
#define AST_TASKPROCESSOR_MAX_NAME
Suggested maximum taskprocessor name length (less null terminator).
static int task(void *data)
Queued task for baseline test.
int ast_threadstorage_set_ptr(struct ast_threadstorage *ts, void *ptr)
Set a raw pointer from threadstorage.
void * ast_threadstorage_get_ptr(struct ast_threadstorage *ts)
Retrieve a raw pointer from threadstorage.
Time-related functions and macros.
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
#define AST_VECTOR_REMOVE_ALL_CMP_UNORDERED(vec, value, cmp, cleanup)
Remove all elements from a vector that matches the given comparison.
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
#define AST_VECTOR_CALLBACK_VOID(vec, callback,...)
Execute a callback on every element in a vector disregarding callback return.
#define AST_VECTOR(name, type)
Define a vector structure.
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.