Asterisk - The Open Source Telephony Project GIT-master-77d630f
Data Structures | Macros | Typedefs | Functions | Variables
taskpool.c File Reference
#include "asterisk.h"
#include "asterisk/_private.h"
#include "asterisk/taskpool.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/astobj2.h"
#include "asterisk/serializer_shutdown_group.h"
#include "asterisk/utils.h"
#include "asterisk/time.h"
#include "asterisk/sched.h"
Include dependency graph for taskpool.c:

Go to the source code of this file.

Data Structures

struct  ast_taskpool
 An opaque taskpool structure. More...
 
struct  serializer
 
struct  taskpool_sync_task
 
struct  taskpool_taskprocessor
 A taskpool taskprocessor. More...
 
struct  taskpool_taskprocessors
 A container of taskprocessors. More...
 

Macros

#define TASKPOOL_GROW_THRESHOLD   (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 5) / 10
 The threshold for a taskprocessor at which we consider the pool needing to grow (50% of high water threshold) More...
 
#define TASKPOOL_QUEUE_SIZE_ADD(tps, size)   (size += ast_taskprocessor_size(tps->taskprocessor))
 
#define TASKPROCESSOR_IS_IDLE(tps, timeout)   (ast_tvdiff_ms(ast_tvnow(), tps->last_pushed) > (timeout))
 

Typedefs

typedef void(* taskpool_selector) (struct ast_taskpool *pool, struct taskpool_taskprocessors *taskprocessors, struct taskpool_taskprocessor **taskprocessor, unsigned int *growth_threshold_reached)
 

Functions

struct ast_taskpoolast_taskpool_create (const char *name, const struct ast_taskpool_options *options)
 Create a new taskpool. More...
 
static struct ast_taskpoolast_taskpool_get_current (void)
 
int ast_taskpool_init (void)
 
int ast_taskpool_push (struct ast_taskpool *pool, int(*task)(void *data), void *data)
 Push a task to the taskpool. More...
 
int ast_taskpool_push_wait (struct ast_taskpool *pool, int(*task)(void *data), void *data)
 Push a task to the taskpool, and wait for completion. More...
 
long ast_taskpool_queue_size (struct ast_taskpool *pool)
 Get the current number of queued tasks in the taskpool. More...
 
struct ast_taskprocessorast_taskpool_serializer (const char *name, struct ast_taskpool *pool)
 Serialized execution of tasks within a ast_taskpool. More...
 
struct ast_taskprocessorast_taskpool_serializer_get_current (void)
 Get the taskpool serializer currently associated with this thread. More...
 
struct ast_taskprocessorast_taskpool_serializer_group (const char *name, struct ast_taskpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
 Serialized execution of tasks within a ast_taskpool. More...
 
int ast_taskpool_serializer_push_wait (struct ast_taskprocessor *serializer, int(*task)(void *data), void *data)
 Push a task to a serializer, and wait for completion. More...
 
void ast_taskpool_shutdown (struct ast_taskpool *pool)
 Shut down a taskpool and remove the underlying taskprocessors. More...
 
size_t ast_taskpool_taskprocessors_count (struct ast_taskpool *pool)
 Get the current number of taskprocessors in the taskpool. More...
 
 AST_THREADSTORAGE_RAW (current_taskpool_pool)
 Thread storage for the current taskpool. More...
 
 AST_THREADSTORAGE_RAW (current_taskpool_serializer)
 
static int execute_tasks (void *data)
 
static struct serializerserializer_create (struct ast_taskpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
 
static void serializer_dtor (void *obj)
 
static void serializer_shutdown (struct ast_taskprocessor_listener *listener)
 
static int serializer_start (struct ast_taskprocessor_listener *listener)
 
static void serializer_task_pushed (struct ast_taskprocessor_listener *listener, int was_empty)
 
static void taskpool_dynamic_pool_grow (struct ast_taskpool *pool, struct taskpool_taskprocessor **taskprocessor)
 
static int taskpool_dynamic_pool_shrink (const void *data)
 
static void taskpool_least_full_selector (struct ast_taskpool *pool, struct taskpool_taskprocessors *taskprocessors, struct taskpool_taskprocessor **taskprocessor, unsigned int *growth_threshold_reached)
 Least full taskprocessor selector. More...
 
static void taskpool_sequential_selector (struct ast_taskpool *pool, struct taskpool_taskprocessors *taskprocessors, struct taskpool_taskprocessor **taskprocessor, unsigned int *growth_threshold_reached)
 
static int taskpool_serializer_empty_task (void *data)
 
static void taskpool_shutdown (void)
 
static int taskpool_sync_task (void *data)
 
static void taskpool_sync_task_cleanup (struct taskpool_sync_task *sync_task)
 
static int taskpool_sync_task_init (struct taskpool_sync_task *sync_task, int(*task)(void *), void *data)
 
static struct taskpool_taskprocessortaskpool_taskprocessor_alloc (struct ast_taskpool *pool, char type)
 
static void taskpool_taskprocessor_dtor (void *obj)
 
static int taskpool_taskprocessor_start (void *data)
 
static int taskpool_taskprocessor_stop (void *data)
 
static void taskpool_taskprocessors_cleanup (struct taskpool_taskprocessors *taskprocessors)
 
static int taskpool_taskprocessors_init (struct taskpool_taskprocessors *taskprocessors, unsigned int size)
 

Variables

static struct ast_sched_contextsched
 Scheduler used for dynamic pool shrinking. More...
 
static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks
 

Macro Definition Documentation

◆ TASKPOOL_GROW_THRESHOLD

#define TASKPOOL_GROW_THRESHOLD   (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 5) / 10

The threshold for a taskprocessor at which we consider the pool needing to grow (50% of high water threshold)

Definition at line 80 of file taskpool.c.

◆ TASKPOOL_QUEUE_SIZE_ADD

#define TASKPOOL_QUEUE_SIZE_ADD (   tps,
  size 
)    (size += ast_taskprocessor_size(tps->taskprocessor))

Definition at line 462 of file taskpool.c.

◆ TASKPROCESSOR_IS_IDLE

#define TASKPROCESSOR_IS_IDLE (   tps,
  timeout 
)    (ast_tvdiff_ms(ast_tvnow(), tps->last_pushed) > (timeout))

Definition at line 221 of file taskpool.c.

Typedef Documentation

◆ taskpool_selector

typedef void(* taskpool_selector) (struct ast_taskpool *pool, struct taskpool_taskprocessors *taskprocessors, struct taskpool_taskprocessor **taskprocessor, unsigned int *growth_threshold_reached)

Definition at line 51 of file taskpool.c.

Function Documentation

◆ ast_taskpool_create()

struct ast_taskpool * ast_taskpool_create ( const char *  name,
const struct ast_taskpool_options options 
)

Create a new taskpool.

Since
23.1.0
22.7.0
20.17.0

This function creates a taskpool. Tasks may be pushed onto this task pool and will be automatically acted upon by taskprocessors within the pool.

Only a single taskpool with a given name may exist. This function will fail if a taskpool with the given name already exists.

Parameters
nameThe unique name for the taskpool
optionsThe behavioral options for this taskpool
Return values
NULLFailed to create the taskpool
non-NULLThe newly-created taskpool
Note
The ast_taskpool_shutdown function must be called to shut down the taskpool and clean up underlying resources fully.

Definition at line 324 of file taskpool.c.

326{
327 struct ast_taskpool *pool;
328
329 /* Enforce versioning on the passed-in options */
330 if (options->version != AST_TASKPOOL_OPTIONS_VERSION) {
331 return NULL;
332 }
333
334 pool = ao2_alloc(sizeof(*pool) + strlen(name) + 1, NULL);
335 if (!pool) {
336 return NULL;
337 }
338
339 strcpy(pool->name, name); /* Safe */
340 memcpy(&pool->options, options, sizeof(pool->options));
341 pool->shrink_sched_id = -1;
342
343 /* Verify the passed-in options are valid, and adjust if needed */
344 if (options->initial_size < options->minimum_size) {
345 pool->options.initial_size = options->minimum_size;
346 ast_log(LOG_WARNING, "Taskpool '%s' has an initial size of %d, which is less than the minimum size of %d. Adjusting to %d.\n",
347 name, options->initial_size, options->minimum_size, options->minimum_size);
348 }
349
350 if (options->max_size && pool->options.initial_size > options->max_size) {
351 pool->options.max_size = pool->options.initial_size;
352 ast_log(LOG_WARNING, "Taskpool '%s' has a max size of %d, which is less than the initial size of %d. Adjusting to %d.\n",
353 name, options->max_size, pool->options.initial_size, pool->options.initial_size);
354 }
355
356 if (!options->auto_increment) {
357 if (!pool->options.minimum_size) {
358 pool->options.minimum_size = 1;
359 ast_log(LOG_WARNING, "Taskpool '%s' has a minimum size of 0, which is not valid without auto increment. Adjusting to 1.\n", name);
360 }
361 if (!pool->options.max_size) {
362 pool->options.max_size = pool->options.minimum_size;
363 ast_log(LOG_WARNING, "Taskpool '%s' has a max size of 0, which is not valid without auto increment. Adjusting to %d.\n", name, pool->options.minimum_size);
364 }
365 if (pool->options.minimum_size != pool->options.max_size) {
366 pool->options.minimum_size = pool->options.max_size;
367 pool->options.initial_size = pool->options.max_size;
368 ast_log(LOG_WARNING, "Taskpool '%s' has a minimum size of %d, while max size is %d. Adjusting all sizes to %d due to lack of auto increment.\n",
369 name, options->minimum_size, pool->options.max_size, pool->options.max_size);
370 }
371 } else if (!options->growth_threshold) {
373 }
374
377 } else if (options->selector == AST_TASKPOOL_SELECTOR_SEQUENTIAL) {
379 } else {
380 ast_log(LOG_WARNING, "Taskpool '%s' has an invalid selector of %d. Adjusting to default selector.\n",
381 name, options->selector);
383 }
384
386 ao2_ref(pool, -1);
387 return NULL;
388 }
389
390 /* Create the static taskprocessors based on the passed-in options */
391 for (int i = 0; i < pool->options.minimum_size; i++) {
393
395 if (!taskprocessor) {
396 /* The reference to pool is passed to ast_taskpool_shutdown */
398 return NULL;
399 }
400
403 /* The reference to pool is passed to ast_taskpool_shutdown */
405 return NULL;
406 }
407 }
408
410 pool->options.initial_size - pool->options.minimum_size)) {
412 return NULL;
413 }
414
415 /* Create the dynamic taskprocessor based on the passed-in options */
416 for (int i = 0; i < (pool->options.initial_size - pool->options.minimum_size); i++) {
418
420 if (!taskprocessor) {
421 /* The reference to pool is passed to ast_taskpool_shutdown */
423 return NULL;
424 }
425
428 /* The reference to pool is passed to ast_taskpool_shutdown */
430 return NULL;
431 }
432 }
433
434 /* If idle timeout support is enabled kick off a scheduled task to shrink the dynamic pool periodically, we do
435 * this no matter if there are dynamic taskprocessor present to reduce the work needed within the push function
436 * and to reduce complexity.
437 */
438 if (options->idle_timeout && options->auto_increment) {
440 if (pool->shrink_sched_id < 0) {
441 ao2_ref(pool, -1);
442 /* The second reference to pool is passed to ast_taskpool_shutdown */
444 return NULL;
445 }
446 }
447
448 return pool;
449}
#define ast_log
Definition: astobj2.c:42
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition: astobj2.h:480
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:409
static const char name[]
Definition: format_mp3.c:68
#define LOG_WARNING
#define NULL
Definition: resample.c:96
int ast_sched_add(struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data) attribute_warn_unused_result
Adds a scheduled event.
Definition: sched.c:567
int max_size
Maximum number of taskprocessors a pool may have.
Definition: taskpool.h:115
int growth_threshold
The threshold for when to grow the pool.
Definition: taskpool.h:124
int minimum_size
Number of taskprocessors that will always exist.
Definition: taskpool.h:92
int initial_size
Number of taskprocessors the pool will start with.
Definition: taskpool.h:102
An opaque taskpool structure.
Definition: taskpool.c:62
taskpool_selector selector
Definition: taskpool.c:74
struct ast_taskpool_options options
Definition: taskpool.c:70
int shrink_sched_id
Definition: taskpool.c:72
struct taskpool_taskprocessors static_taskprocessors
Definition: taskpool.c:64
struct taskpool_taskprocessors dynamic_taskprocessors
Definition: taskpool.c:66
char name[0]
Definition: taskpool.c:76
Definition: sched.c:76
A taskpool taskprocessor.
Definition: taskpool.c:34
struct ast_taskprocessor * taskprocessor
Definition: taskpool.c:36
struct taskpool_taskprocessors::@413 taskprocessors
static void taskpool_least_full_selector(struct ast_taskpool *pool, struct taskpool_taskprocessors *taskprocessors, struct taskpool_taskprocessor **taskprocessor, unsigned int *growth_threshold_reached)
Least full taskprocessor selector.
Definition: taskpool.c:287
void ast_taskpool_shutdown(struct ast_taskpool *pool)
Shut down a taskpool and remove the underlying taskprocessors.
Definition: taskpool.c:653
static int taskpool_dynamic_pool_shrink(const void *data)
Definition: taskpool.c:226
static int taskpool_taskprocessors_init(struct taskpool_taskprocessors *taskprocessors, unsigned int size)
Definition: taskpool.c:193
static struct taskpool_taskprocessor * taskpool_taskprocessor_alloc(struct ast_taskpool *pool, char type)
Definition: taskpool.c:152
static void taskpool_sequential_selector(struct ast_taskpool *pool, struct taskpool_taskprocessors *taskprocessors, struct taskpool_taskprocessor **taskprocessor, unsigned int *growth_threshold_reached)
Definition: taskpool.c:262
#define TASKPOOL_GROW_THRESHOLD
The threshold for a taskprocessor at which we consider the pool needing to grow (50% of high water th...
Definition: taskpool.c:80
#define AST_TASKPOOL_OPTIONS_VERSION
Definition: taskpool.h:69
@ AST_TASKPOOL_SELECTOR_SEQUENTIAL
Definition: taskpool.h:65
@ AST_TASKPOOL_SELECTOR_LEAST_FULL
Definition: taskpool.h:64
@ AST_TASKPOOL_SELECTOR_DEFAULT
Definition: taskpool.h:63
static struct test_options options
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:267

References ao2_alloc, ao2_bump, ao2_ref, ast_log, ast_sched_add(), AST_TASKPOOL_OPTIONS_VERSION, AST_TASKPOOL_SELECTOR_DEFAULT, AST_TASKPOOL_SELECTOR_LEAST_FULL, AST_TASKPOOL_SELECTOR_SEQUENTIAL, ast_taskpool_shutdown(), AST_VECTOR_APPEND, ast_taskpool::dynamic_taskprocessors, ast_taskpool_options::growth_threshold, ast_taskpool_options::initial_size, LOG_WARNING, ast_taskpool_options::max_size, ast_taskpool_options::minimum_size, ast_taskpool::name, name, NULL, ast_taskpool::options, options, ast_taskpool::selector, ast_taskpool::shrink_sched_id, ast_taskpool::static_taskprocessors, taskpool_dynamic_pool_shrink(), TASKPOOL_GROW_THRESHOLD, taskpool_least_full_selector(), taskpool_sequential_selector(), taskpool_taskprocessor_alloc(), taskpool_taskprocessors_init(), taskpool_taskprocessor::taskprocessor, and taskpool_taskprocessors::taskprocessors.

Referenced by AST_TEST_DEFINE(), handle_cli_taskpool_push_efficiency(), handle_cli_taskpool_push_serializer_efficiency(), and stasis_init().

◆ ast_taskpool_get_current()

static struct ast_taskpool * ast_taskpool_get_current ( void  )
static

Definition at line 92 of file taskpool.c.

93{
94 return ast_threadstorage_get_ptr(&current_taskpool_pool);
95}
void * ast_threadstorage_get_ptr(struct ast_threadstorage *ts)
Retrieve a raw pointer from threadstorage.

References ast_threadstorage_get_ptr().

Referenced by ast_taskpool_push_wait(), ast_taskpool_serializer_push_wait(), execute_tasks(), and taskpool_taskprocessor_stop().

◆ ast_taskpool_init()

int ast_taskpool_init ( void  )

Provided by taskpool.c

Definition at line 931 of file taskpool.c.

932{
934 if (!sched) {
935 return -1;
936 }
937
939 return -1;
940 }
941
943
944 return 0;
945}
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
Definition: clicompat.c:19
int ast_sched_start_thread(struct ast_sched_context *con)
Start a thread for processing scheduler entries.
Definition: sched.c:197
struct ast_sched_context * ast_sched_context_create(void)
Create a scheduler context.
Definition: sched.c:238
static void taskpool_shutdown(void)
Definition: taskpool.c:923

References ast_register_cleanup(), ast_sched_context_create(), ast_sched_start_thread(), and taskpool_shutdown().

Referenced by asterisk_daemon().

◆ ast_taskpool_push()

int ast_taskpool_push ( struct ast_taskpool pool,
int(*)(void *data)  task,
void *  data 
)

Push a task to the taskpool.

Since
23.1.0
22.7.0
20.17.0

Tasks pushed into the taskpool will be automatically taken by one of the taskprocessors within

Parameters
poolThe taskpool to add the task to
taskThe task to add
dataThe parameter for the task
Return values
0success
-1failure

Definition at line 522 of file taskpool.c.

523{
524 RAII_VAR(struct taskpool_taskprocessor *, taskprocessor, NULL, ao2_cleanup);
525
526 /* Select the taskprocessor in the pool to use for pushing this task */
527 ao2_lock(pool);
528 if (!pool->shutting_down) {
529 unsigned int growth_threshold_reached = 0;
530
531 /* A selector doesn't set taskprocessor to NULL, it will only change the value if a better
532 * taskprocessor is found. This means that even if the selector for a dynamic taskprocessor
533 * fails for some reason, it will still fall back to the initially found static one if
534 * it is present.
535 */
536 pool->selector(pool, &pool->static_taskprocessors, &taskprocessor, &growth_threshold_reached);
537 if (pool->options.auto_increment && growth_threshold_reached) {
538 /* If we need to grow then try dynamic taskprocessors */
539 pool->selector(pool, &pool->dynamic_taskprocessors, &taskprocessor, &growth_threshold_reached);
540 if (growth_threshold_reached) {
541 /* If we STILL need to grow then grow the dynamic taskprocessor pool if allowed */
542 taskpool_dynamic_pool_grow(pool, &taskprocessor);
543 }
544
545 /* If a dynamic taskprocessor was used update its last push time */
546 if (taskprocessor) {
547 taskprocessor->last_pushed = ast_tvnow();
548 }
549 }
550 ao2_bump(taskprocessor);
551 }
552 ao2_unlock(pool);
553
554 if (!taskprocessor) {
555 return -1;
556 }
557
558 if (ast_taskprocessor_push(taskprocessor->taskprocessor, task, data)) {
559 return -1;
560 }
561
562 return 0;
563}
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
#define ao2_unlock(a)
Definition: astobj2.h:729
#define ao2_lock(a)
Definition: astobj2.h:717
int auto_increment
Number of taskprocessors to increment the pool by.
Definition: taskpool.h:85
int shutting_down
Definition: taskpool.c:68
static void taskpool_dynamic_pool_grow(struct ast_taskpool *pool, struct taskpool_taskprocessor **taskprocessor)
Definition: taskpool.c:479
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
static int task(void *data)
Queued task for baseline test.
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:159
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:978

References ao2_bump, ao2_cleanup, ao2_lock, ao2_unlock, ast_taskprocessor_push(), ast_tvnow(), ast_taskpool_options::auto_increment, ast_taskpool::dynamic_taskprocessors, NULL, ast_taskpool::options, RAII_VAR, ast_taskpool::selector, ast_taskpool::shutting_down, ast_taskpool::static_taskprocessors, task(), taskpool_dynamic_pool_grow(), and taskpool_taskprocessor::taskprocessor.

Referenced by ast_taskpool_push_wait(), AST_TEST_DEFINE(), efficiency_task(), execute_tasks(), handle_cli_taskpool_push_efficiency(), and serializer_task_pushed().

◆ ast_taskpool_push_wait()

int ast_taskpool_push_wait ( struct ast_taskpool pool,
int(*)(void *data)  task,
void *  data 
)

Push a task to the taskpool, and wait for completion.

Since
23.1.0
22.7.0
20.17.0

Tasks pushed into the taskpool will be automatically taken by one of the taskprocessors within

Parameters
poolThe taskpool to add the task to
taskThe task to add
dataThe parameter for the task
Return values
0success
-1failure

Definition at line 623 of file taskpool.c.

624{
626
627 /* If we are already executing within a taskpool taskprocessor then
628 * don't bother pushing a new task, just directly execute the task.
629 */
631 return task(data);
632 }
633
635 return -1;
636 }
637
640 return -1;
641 }
642
644 while (!sync_task.complete) {
645 ast_cond_wait(&sync_task.cond, &sync_task.lock);
646 }
648
650 return sync_task.fail;
651}
#define ast_cond_wait(cond, mutex)
Definition: lock.h:212
#define ast_mutex_unlock(a)
Definition: lock.h:197
#define ast_mutex_lock(a)
Definition: lock.h:196
static int sync_task(void *data)
Definition: res_pjsip.c:2117
static void taskpool_sync_task_cleanup(struct taskpool_sync_task *sync_task)
Definition: taskpool.c:594
int ast_taskpool_push(struct ast_taskpool *pool, int(*task)(void *data), void *data)
Push a task to the taskpool.
Definition: taskpool.c:522
static struct ast_taskpool * ast_taskpool_get_current(void)
Definition: taskpool.c:92
static int taskpool_sync_task_init(struct taskpool_sync_task *sync_task, int(*task)(void *), void *data)
Definition: taskpool.c:580

References ast_cond_wait, ast_mutex_lock, ast_mutex_unlock, ast_taskpool_get_current(), ast_taskpool_push(), sync_task(), task(), taskpool_sync_task_cleanup(), and taskpool_sync_task_init().

Referenced by AST_TEST_DEFINE().

◆ ast_taskpool_queue_size()

long ast_taskpool_queue_size ( struct ast_taskpool pool)

Get the current number of queued tasks in the taskpool.

Since
23.1.0
22.7.0
20.17.0
Parameters
poolThe taskpool to query
Return values
Thenumber of queued tasks in the taskpool

Definition at line 464 of file taskpool.c.

465{
466 long queue_size = 0;
467
468 ao2_lock(pool);
471 ao2_unlock(pool);
472
473 return queue_size;
474}
#define TASKPOOL_QUEUE_SIZE_ADD(tps, size)
Definition: taskpool.c:462
#define AST_VECTOR_CALLBACK_VOID(vec, callback,...)
Execute a callback on every element in a vector disregarding callback return.
Definition: vector.h:873

References ao2_lock, ao2_unlock, AST_VECTOR_CALLBACK_VOID, ast_taskpool::dynamic_taskprocessors, ast_taskpool::static_taskprocessors, TASKPOOL_QUEUE_SIZE_ADD, and taskpool_taskprocessors::taskprocessors.

◆ ast_taskpool_serializer()

struct ast_taskprocessor * ast_taskpool_serializer ( const char *  name,
struct ast_taskpool pool 
)

Serialized execution of tasks within a ast_taskpool.

Since
23.1.0
22.7.0
20.17.0

A ast_taskprocessor with the same contract as a default taskprocessor (tasks execute serially) except instead of executing out of a dedicated thread, execution occurs in a taskprocessor from a ast_taskpool.

While it guarantees that each task will complete before executing the next, there is no guarantee as to which thread from the pool individual tasks will execute. This normally only matters if your code relies on thread specific information, such as thread locals.

Use ast_taskprocessor_unreference() to dispose of the returned ast_taskprocessor.

Only a single taskprocessor with a given name may exist. This function will fail if a taskprocessor with the given name already exists.

Parameters
nameName of the serializer. (must be unique)
poolast_taskpool for execution.
Returns
ast_taskprocessor for enqueuing work.
Return values
NULLon error.

Definition at line 819 of file taskpool.c.

820{
822}
struct ast_taskprocessor * ast_taskpool_serializer_group(const char *name, struct ast_taskpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Serialized execution of tasks within a ast_taskpool.
Definition: taskpool.c:789

References ast_taskpool_serializer_group(), name, and NULL.

Referenced by AST_TEST_DEFINE(), handle_cli_taskpool_push_serializer_efficiency(), and internal_stasis_subscribe().

◆ ast_taskpool_serializer_get_current()

struct ast_taskprocessor * ast_taskpool_serializer_get_current ( void  )

Get the taskpool serializer currently associated with this thread.

Note
The returned pointer is valid while the serializer thread is running.
Use ao2_ref() on serializer if you are going to keep it for another thread. To unref it you must then use ast_taskprocessor_unreference().
Return values
serializeron success.
NULLon error or no serializer associated with the thread.

Definition at line 784 of file taskpool.c.

785{
786 return ast_threadstorage_get_ptr(&current_taskpool_serializer);
787}

References ast_threadstorage_get_ptr().

Referenced by ast_taskpool_serializer_push_wait(), requeue_task(), serializer_efficiency_task(), and simple_task().

◆ ast_taskpool_serializer_group()

struct ast_taskprocessor * ast_taskpool_serializer_group ( const char *  name,
struct ast_taskpool pool,
struct ast_serializer_shutdown_group shutdown_group 
)

Serialized execution of tasks within a ast_taskpool.

Since
23.1.0
22.7.0
20.17.0

A ast_taskprocessor with the same contract as a default taskprocessor (tasks execute serially) except instead of executing out of a dedicated thread, execution occurs in a taskprocessor from a ast_taskpool.

While it guarantees that each task will complete before executing the next, there is no guarantee as to which thread from the pool individual tasks will execute. This normally only matters if your code relies on thread specific information, such as thread locals.

Use ast_taskprocessor_unreference() to dispose of the returned ast_taskprocessor.

Only a single taskprocessor with a given name may exist. This function will fail if a taskprocessor with the given name already exists.

Parameters
nameName of the serializer. (must be unique)
poolast_taskpool for execution.
shutdown_groupGroup shutdown controller. (NULL if no group association)
Returns
ast_taskprocessor for enqueuing work.
Return values
NULLon error.

Definition at line 789 of file taskpool.c.

791{
792 struct serializer *ser;
794 struct ast_taskprocessor *tps;
795
797 if (!ser) {
798 return NULL;
799 }
800
802 if (!listener) {
803 ao2_ref(ser, -1);
804 return NULL;
805 }
806
808 if (!tps) {
809 /* ser ref transferred to listener but not cleaned without tps */
810 ao2_ref(ser, -1);
811 } else if (shutdown_group) {
813 }
814
815 ao2_ref(listener, -1);
816 return tps;
817}
static void * listener(void *unused)
Definition: asterisk.c:1530
static struct ast_serializer_shutdown_group * shutdown_group
Shutdown group for options serializers.
void ast_serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group)
Increment the number of serializer members in the group.
A listener for taskprocessors.
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks
Definition: taskpool.c:778
static struct serializer * serializer_create(struct ast_taskpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Definition: taskpool.c:691
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
struct ast_taskprocessor * ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
Create a taskprocessor with a custom listener.

References ao2_ref, ast_serializer_shutdown_group_inc(), ast_taskprocessor_create_with_listener(), ast_taskprocessor_listener_alloc(), listener(), name, NULL, serializer_create(), serializer_tps_listener_callbacks, and shutdown_group.

Referenced by ast_serializer_taskpool_create(), and ast_taskpool_serializer().

◆ ast_taskpool_serializer_push_wait()

int ast_taskpool_serializer_push_wait ( struct ast_taskprocessor serializer,
int(*)(void *data)  task,
void *  data 
)

Push a task to a serializer, and wait for completion.

Since
23.1.0
22.7.0
20.17.0
Parameters
serializerThe serializer to add the task to
taskThe task to add
dataThe parameter for the task
Return values
0success
-1failure

Definition at line 832 of file taskpool.c.

833{
836 struct ast_taskprocessor *prior_serializer;
838
839 /* If not in a taskpool taskprocessor we can just queue the task like normal and
840 * wait. */
843 return -1;
844 }
845
848 return -1;
849 }
850
852 while (!sync_task.complete) {
853 ast_cond_wait(&sync_task.cond, &sync_task.lock);
854 }
856
858 return sync_task.fail;
859 }
860
861 /* It is possible that we are already executing within a serializer, so stash the existing
862 * away so we can restore it.
863 */
864 prior_serializer = ast_taskpool_serializer_get_current();
865
866 ao2_lock(ser);
867
868 /* There are two cases where we can or have to directly execute this task:
869 * 1. There are no other tasks in the serializer
870 * 2. We are already in the serializer
871 * In the second case if we don't execute the task now, we will deadlock waiting
872 * on it as it will never occur.
873 */
874 if (!ast_taskprocessor_size(serializer) || prior_serializer == serializer) {
875 ast_threadstorage_set_ptr(&current_taskpool_serializer, serializer);
876 sync_task.fail = task(data);
877 ao2_unlock(ser);
878 ast_threadstorage_set_ptr(&current_taskpool_serializer, prior_serializer);
879 return sync_task.fail;
880 }
881
883 ao2_unlock(ser);
884 return -1;
885 }
886
887 /* First we queue the serialized task */
890 ao2_unlock(ser);
891 return -1;
892 }
893
894 /* Next we queue the empty task to ensure the serializer doesn't reach empty, this
895 * stops two tasks from being queued for the same serializer at the same time.
896 */
899 ao2_unlock(ser);
900 return -1;
901 }
902
903 /* Now we execute the tasks on the serializer until our sync task is complete */
904 ast_threadstorage_set_ptr(&current_taskpool_serializer, serializer);
905 while (!sync_task.complete) {
906 /* The sync task is guaranteed to be executed, so doing a while loop on the complete
907 * flag is safe.
908 */
910 }
912 ao2_unlock(ser);
913
914 ast_threadstorage_set_ptr(&current_taskpool_serializer, prior_serializer);
915
916 return sync_task.fail;
917}
struct ast_taskprocessor * ast_taskpool_serializer_get_current(void)
Get the taskpool serializer currently associated with this thread.
Definition: taskpool.c:784
static int taskpool_serializer_empty_task(void *data)
Definition: taskpool.c:827
struct ast_taskprocessor_listener * ast_taskprocessor_listener(struct ast_taskprocessor *tps)
Return the listener associated with the taskprocessor.
void * ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
Get the user data from the listener.
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
Pop a task off the taskprocessor and execute it.
int ast_threadstorage_set_ptr(struct ast_threadstorage *ts, void *ptr)
Set a raw pointer from threadstorage.

References ao2_lock, ao2_unlock, ast_cond_wait, ast_mutex_lock, ast_mutex_unlock, ast_taskpool_get_current(), ast_taskpool_serializer_get_current(), ast_taskprocessor_execute(), ast_taskprocessor_listener(), ast_taskprocessor_listener_get_user_data(), ast_taskprocessor_push(), ast_taskprocessor_size(), ast_threadstorage_set_ptr(), listener(), NULL, sync_task(), task(), taskpool_serializer_empty_task(), taskpool_sync_task_cleanup(), and taskpool_sync_task_init().

Referenced by AST_TEST_DEFINE(), and requeue_task().

◆ ast_taskpool_shutdown()

void ast_taskpool_shutdown ( struct ast_taskpool pool)

Shut down a taskpool and remove the underlying taskprocessors.

Since
23.1.0
22.7.0
20.17.0
Parameters
poolThe pool to shut down
Note
This will decrement the reference to the pool

Definition at line 653 of file taskpool.c.

654{
655 if (!pool) {
656 return;
657 }
658
659 /* Mark this pool as shutting down so nothing new is pushed */
660 ao2_lock(pool);
661 pool->shutting_down = 1;
662 ao2_unlock(pool);
663
664 /* Stop the shrink scheduled item if present */
666
667 /* Clean up all the taskprocessors */
670
671 ao2_ref(pool, -1);
672}
#define AST_SCHED_DEL_UNREF(sched, id, refcall)
schedule task to get deleted and call unref function
Definition: sched.h:82
static void taskpool_taskprocessors_cleanup(struct taskpool_taskprocessors *taskprocessors)
Definition: taskpool.c:206

References ao2_lock, ao2_ref, ao2_unlock, AST_SCHED_DEL_UNREF, ast_taskpool::dynamic_taskprocessors, ast_taskpool::shrink_sched_id, ast_taskpool::shutting_down, ast_taskpool::static_taskprocessors, and taskpool_taskprocessors_cleanup().

Referenced by ast_taskpool_create(), AST_TEST_DEFINE(), handle_cli_taskpool_push_efficiency(), handle_cli_taskpool_push_serializer_efficiency(), and stasis_cleanup().

◆ ast_taskpool_taskprocessors_count()

size_t ast_taskpool_taskprocessors_count ( struct ast_taskpool pool)

Get the current number of taskprocessors in the taskpool.

Since
23.1.0
22.7.0
20.17.0
Parameters
poolThe taskpool to query
Return values
Thenumber of taskprocessors in the taskpool

Definition at line 451 of file taskpool.c.

452{
453 size_t count;
454
455 ao2_lock(pool);
457 ao2_unlock(pool);
458
459 return count;
460}
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:620

References ao2_lock, ao2_unlock, AST_VECTOR_SIZE, ast_taskpool::dynamic_taskprocessors, ast_taskpool::static_taskprocessors, and taskpool_taskprocessors::taskprocessors.

Referenced by AST_TEST_DEFINE().

◆ AST_THREADSTORAGE_RAW() [1/2]

AST_THREADSTORAGE_RAW ( current_taskpool_pool  )

Thread storage for the current taskpool.

◆ AST_THREADSTORAGE_RAW() [2/2]

AST_THREADSTORAGE_RAW ( current_taskpool_serializer  )

◆ execute_tasks()

static int execute_tasks ( void *  data)
static

Definition at line 710 of file taskpool.c.

711{
712 struct ast_taskpool *pool = ast_taskpool_get_current();
713 struct ast_taskprocessor *tps = data;
716 size_t remaining, requeue = 0;
717
718 /* In a normal scenario this lock will not be in contention with
719 * anything else. It is only if a synchronous task is pushed to
720 * the serializer that it may be blocked on the synchronous
721 * task thread. This is done to ensure that only one thread is executing
722 * tasks from the serializer at a given time, and not out of order
723 * either.
724 */
725 ao2_lock(ser);
726
727 ast_threadstorage_set_ptr(&current_taskpool_serializer, tps);
728 for (remaining = ast_taskprocessor_size(tps); remaining > 0; remaining--) {
729 requeue = ast_taskprocessor_execute(tps);
730 }
731 ast_threadstorage_set_ptr(&current_taskpool_serializer, NULL);
732
733 ao2_unlock(ser);
734
735 /* If there are remaining tasks we requeue, this way the serializer
736 * does not hold exclusivity of the taskpool taskprocessor
737 */
738 if (requeue) {
739 /* Ownership passes to the new task */
742 }
743 } else {
745 }
746
747 return 0;
748}
struct ast_taskprocessor * tps
struct ast_taskpool * pool
Definition: taskpool.c:676
static int execute_tasks(void *data)
Definition: taskpool.c:710
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.

References ao2_lock, ao2_unlock, ast_taskpool_get_current(), ast_taskpool_push(), ast_taskprocessor_execute(), ast_taskprocessor_listener(), ast_taskprocessor_listener_get_user_data(), ast_taskprocessor_size(), ast_taskprocessor_unreference(), ast_threadstorage_set_ptr(), execute_tasks(), listener(), NULL, serializer::pool, and ast_taskprocessor_listener::tps.

Referenced by execute_tasks(), and serializer_task_pushed().

◆ serializer_create()

static struct serializer * serializer_create ( struct ast_taskpool pool,
struct ast_serializer_shutdown_group shutdown_group 
)
static

Definition at line 691 of file taskpool.c.

693{
694 struct serializer *ser;
695
696 /* This object has a lock so it can be used to ensure exclusive access
697 * to the execution of tasks within the serializer.
698 */
699 ser = ao2_alloc(sizeof(*ser), serializer_dtor);
700 if (!ser) {
701 return NULL;
702 }
703 ser->pool = ao2_bump(pool);
705 return ser;
706}
struct ast_serializer_shutdown_group * shutdown_group
Definition: taskpool.c:678
static void serializer_dtor(void *obj)
Definition: taskpool.c:681

References ao2_alloc, ao2_bump, NULL, serializer::pool, serializer_dtor(), serializer::shutdown_group, and shutdown_group.

Referenced by ast_taskpool_serializer_group().

◆ serializer_dtor()

static void serializer_dtor ( void *  obj)
static

Definition at line 681 of file taskpool.c.

682{
683 struct serializer *ser = obj;
684
685 ao2_cleanup(ser->pool);
686 ser->pool = NULL;
688 ser->shutdown_group = NULL;
689}

References ao2_cleanup, NULL, serializer::pool, and serializer::shutdown_group.

Referenced by serializer_create().

◆ serializer_shutdown()

static void serializer_shutdown ( struct ast_taskprocessor_listener listener)
static

Definition at line 768 of file taskpool.c.

769{
771
772 if (ser->shutdown_group) {
774 }
775 ao2_cleanup(ser);
776}
void ast_serializer_shutdown_group_dec(struct ast_serializer_shutdown_group *shutdown_group)
Decrement the number of serializer members in the group.

References ao2_cleanup, ast_serializer_shutdown_group_dec(), ast_taskprocessor_listener_get_user_data(), listener(), and serializer::shutdown_group.

◆ serializer_start()

static int serializer_start ( struct ast_taskprocessor_listener listener)
static

Definition at line 762 of file taskpool.c.

763{
764 /* No-op */
765 return 0;
766}

◆ serializer_task_pushed()

static void serializer_task_pushed ( struct ast_taskprocessor_listener listener,
int  was_empty 
)
static

Definition at line 750 of file taskpool.c.

751{
752 if (was_empty) {
755
756 if (ast_taskpool_push(ser->pool, execute_tasks, tps)) {
758 }
759 }
760}
struct ast_taskprocessor * ast_taskprocessor_listener_get_tps(const struct ast_taskprocessor_listener *listener)
Get a reference to the listener's taskprocessor.

References ast_taskpool_push(), ast_taskprocessor_listener_get_tps(), ast_taskprocessor_listener_get_user_data(), ast_taskprocessor_unreference(), execute_tasks(), listener(), and serializer::pool.

◆ taskpool_dynamic_pool_grow()

static void taskpool_dynamic_pool_grow ( struct ast_taskpool pool,
struct taskpool_taskprocessor **  taskprocessor 
)
static

Definition at line 479 of file taskpool.c.

480{
481 unsigned int num_to_add = pool->options.auto_increment;
482 int i;
483
484 if (!num_to_add) {
485 return;
486 }
487
488 /* If a maximum size is enforced, then determine if we have to limit how many taskprocessors we add */
489 if (pool->options.max_size) {
491
492 if (current_size + num_to_add > pool->options.max_size) {
493 num_to_add = pool->options.max_size - current_size;
494 }
495 }
496
497 for (i = 0; i < num_to_add; i++) {
498 struct taskpool_taskprocessor *new_taskprocessor;
499
500 new_taskprocessor = taskpool_taskprocessor_alloc(pool, 'd');
501 if (!new_taskprocessor) {
502 return;
503 }
504
505 if (AST_VECTOR_APPEND(&pool->dynamic_taskprocessors.taskprocessors, new_taskprocessor)) {
506 ao2_ref(new_taskprocessor, -1);
507 return;
508 }
509
510 if (i == 0) {
511 /* On the first iteration we return the taskprocessor we just added */
512 *taskprocessor = new_taskprocessor;
513 /* We assume we will be going back to the first taskprocessor, since we are at the end of the vector */
515 } else if (i == 1) {
516 /* On the second iteration we update the next taskprocessor to use to be this one */
518 }
519 }
520}
unsigned int taskprocessor_num
Definition: taskpool.c:48

References ao2_ref, AST_VECTOR_APPEND, AST_VECTOR_SIZE, ast_taskpool_options::auto_increment, ast_taskpool::dynamic_taskprocessors, ast_taskpool_options::max_size, ast_taskpool::options, ast_taskpool::static_taskprocessors, taskpool_taskprocessor_alloc(), taskpool_taskprocessor::taskprocessor, taskpool_taskprocessors::taskprocessor_num, and taskpool_taskprocessors::taskprocessors.

Referenced by ast_taskpool_push().

◆ taskpool_dynamic_pool_shrink()

static int taskpool_dynamic_pool_shrink ( const void *  data)
static

Definition at line 226 of file taskpool.c.

227{
228 struct ast_taskpool *pool = (struct ast_taskpool *)data;
229 int num_removed;
230
231 ao2_lock(pool);
232
233 /* If the pool is shutting down, do nothing and don't reschedule */
234 if (pool->shutting_down) {
235 ao2_unlock(pool);
236 ao2_ref(pool, -1);
237 return 0;
238 }
239
240 /* Go through the dynamic taskprocessors and find any which have been idle long enough and remove them */
243 if (num_removed) {
244 /* If we've removed any taskprocessors the taskprocessor_num may no longer be valid, so update it */
247 }
248 }
249
250 ao2_unlock(pool);
251
252 /* It is possible for the pool to have been shut down between unlocking and returning, this is
253 * inherently a race condition we can't eliminate so we will catch it on the next iteration.
254 */
255 return pool->options.idle_timeout * 1000;
256}
int idle_timeout
Time limit in seconds for idle dynamic taskprocessors.
Definition: taskpool.h:81
#define TASKPROCESSOR_IS_IDLE(tps, timeout)
Definition: taskpool.c:221
#define AST_VECTOR_REMOVE_ALL_CMP_UNORDERED(vec, value, cmp, cleanup)
Remove all elements from a vector that matches the given comparison.
Definition: vector.h:472

References ao2_cleanup, ao2_lock, ao2_ref, ao2_unlock, AST_VECTOR_REMOVE_ALL_CMP_UNORDERED, AST_VECTOR_SIZE, ast_taskpool::dynamic_taskprocessors, ast_taskpool_options::idle_timeout, ast_taskpool::options, ast_taskpool::shutting_down, TASKPROCESSOR_IS_IDLE, taskpool_taskprocessors::taskprocessor_num, and taskpool_taskprocessors::taskprocessors.

Referenced by ast_taskpool_create().

◆ taskpool_least_full_selector()

static void taskpool_least_full_selector ( struct ast_taskpool pool,
struct taskpool_taskprocessors taskprocessors,
struct taskpool_taskprocessor **  taskprocessor,
unsigned int *  growth_threshold_reached 
)
static

Least full taskprocessor selector.

\interal

Definition at line 287 of file taskpool.c.

289{
290 struct taskpool_taskprocessor *least_full = NULL;
291 unsigned int i;
292
293 if (!AST_VECTOR_SIZE(&taskprocessors->taskprocessors)) {
294 *growth_threshold_reached = 1;
295 return;
296 }
297
298 /* We assume that the growth threshold has not yet been reached, until proven otherwise */
299 *growth_threshold_reached = 0;
300
301 for (i = 0; i < AST_VECTOR_SIZE(&taskprocessors->taskprocessors); i++) {
302 struct taskpool_taskprocessor *tp = AST_VECTOR_GET(&taskprocessors->taskprocessors, i);
303
304 /* If this taskprocessor has no outstanding tasks, it is the best choice */
306 *taskprocessor = tp;
307 return;
308 }
309
310 /* If any of the taskprocessors have reached the growth threshold then we should grow the pool */
312 *growth_threshold_reached = 1;
313 }
314
315 /* The taskprocessor with the fewest tasks should be used */
316 if (!least_full || ast_taskprocessor_size(tp->taskprocessor) < ast_taskprocessor_size(least_full->taskprocessor)) {
317 least_full = tp;
318 }
319 }
320
321 *taskprocessor = least_full;
322}
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:691

References ast_taskprocessor_size(), AST_VECTOR_GET, AST_VECTOR_SIZE, ast_taskpool_options::growth_threshold, NULL, ast_taskpool::options, taskpool_taskprocessor::taskprocessor, and taskpool_taskprocessors::taskprocessors.

Referenced by ast_taskpool_create().

◆ taskpool_sequential_selector()

static void taskpool_sequential_selector ( struct ast_taskpool pool,
struct taskpool_taskprocessors taskprocessors,
struct taskpool_taskprocessor **  taskprocessor,
unsigned int *  growth_threshold_reached 
)
static

Definition at line 262 of file taskpool.c.

264{
265 unsigned int taskprocessor_num = taskprocessors->taskprocessor_num;
266
267 if (!AST_VECTOR_SIZE(&taskprocessors->taskprocessors)) {
268 *growth_threshold_reached = 1;
269 return;
270 }
271
272 taskprocessors->taskprocessor_num++;
273 if (taskprocessors->taskprocessor_num == AST_VECTOR_SIZE(&taskprocessors->taskprocessors)) {
274 taskprocessors->taskprocessor_num = 0;
275 }
276
277 *taskprocessor = AST_VECTOR_GET(&taskprocessors->taskprocessors, taskprocessor_num);
278
279 /* Check to see if this has reached the growth threshold */
280 *growth_threshold_reached = (ast_taskprocessor_size((*taskprocessor)->taskprocessor) >= pool->options.growth_threshold) ? 1 : 0;
281}

References ast_taskprocessor_size(), AST_VECTOR_GET, AST_VECTOR_SIZE, ast_taskpool_options::growth_threshold, ast_taskpool::options, taskpool_taskprocessors::taskprocessor_num, and taskpool_taskprocessors::taskprocessors.

Referenced by ast_taskpool_create().

◆ taskpool_serializer_empty_task()

static int taskpool_serializer_empty_task ( void *  data)
static

Definition at line 827 of file taskpool.c.

828{
829 return 0;
830}

Referenced by ast_taskpool_serializer_push_wait().

◆ taskpool_shutdown()

static void taskpool_shutdown ( void  )
static

Definition at line 923 of file taskpool.c.

924{
925 if (sched) {
927 sched = NULL;
928 }
929}
void ast_sched_context_destroy(struct ast_sched_context *c)
destroys a schedule context
Definition: sched.c:271

References ast_sched_context_destroy(), and NULL.

Referenced by ast_taskpool_init().

◆ taskpool_sync_task()

static int taskpool_sync_task ( void *  data)
static

Definition at line 603 of file taskpool.c.

604{
605 struct taskpool_sync_task *sync_task = data;
606 int ret;
607
608 sync_task->fail = sync_task->task(sync_task->task_data);
609
610 /*
611 * Once we unlock sync_task->lock after signaling, we cannot access
612 * sync_task again. The thread waiting within ast_taskpool_push_wait()
613 * is free to continue and release its local variable (sync_task).
614 */
616 sync_task->complete = 1;
618 ret = sync_task->fail;
620 return ret;
621}
#define ast_cond_signal(cond)
Definition: lock.h:210

References ast_cond_signal, ast_mutex_lock, ast_mutex_unlock, and sync_task().

◆ taskpool_sync_task_cleanup()

static void taskpool_sync_task_cleanup ( struct taskpool_sync_task sync_task)
static

Definition at line 594 of file taskpool.c.

595{
598}
#define ast_cond_destroy(cond)
Definition: lock.h:209
#define ast_mutex_destroy(a)
Definition: lock.h:195

References ast_cond_destroy, ast_mutex_destroy, and sync_task().

Referenced by ast_taskpool_push_wait(), and ast_taskpool_serializer_push_wait().

◆ taskpool_sync_task_init()

static int taskpool_sync_task_init ( struct taskpool_sync_task sync_task,
int(*)(void *)  task,
void *  data 
)
static

Definition at line 580 of file taskpool.c.

581{
584 sync_task->complete = 0;
585 sync_task->fail = 0;
586 sync_task->task = task;
587 sync_task->task_data = data;
588 return 0;
589}
#define ast_cond_init(cond, attr)
Definition: lock.h:208
#define ast_mutex_init(pmutex)
Definition: lock.h:193

References ast_cond_init, ast_mutex_init, NULL, sync_task(), and task().

Referenced by ast_taskpool_push_wait(), and ast_taskpool_serializer_push_wait().

◆ taskpool_taskprocessor_alloc()

static struct taskpool_taskprocessor * taskpool_taskprocessor_alloc ( struct ast_taskpool pool,
char  type 
)
static

Definition at line 152 of file taskpool.c.

153{
155 char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
156
157 /* We don't actually need locking for each pool taskprocessor, as the only thing
158 * mutable is the underlying taskprocessor which has its own internal locking.
159 */
161 if (!taskprocessor) {
162 return NULL;
163 }
164
165 /* Create name with seq number appended. */
166 ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "taskpool/%c:%s", type, pool->name);
167
168 taskprocessor->taskprocessor = ast_taskprocessor_get(tps_name, TPS_REF_DEFAULT);
169 if (!taskprocessor->taskprocessor) {
171 return NULL;
172 }
173
174 taskprocessor->last_pushed = ast_tvnow();
175
177 ao2_ref(pool, -1);
178 /* Prevent the taskprocessor from queueing the stop task by explicitly unreferencing and setting it to
179 * NULL here.
180 */
182 taskprocessor->taskprocessor = NULL;
183 return NULL;
184 }
185
186 return taskprocessor;
187}
@ AO2_ALLOC_OPT_LOCK_NOLOCK
Definition: astobj2.h:367
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:404
static const char type[]
Definition: chan_ooh323.c:109
static int taskpool_taskprocessor_start(void *data)
Definition: taskpool.c:131
static void taskpool_taskprocessor_dtor(void *obj)
Definition: taskpool.c:116
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.
@ TPS_REF_DEFAULT
return a reference to a taskprocessor, create one if it does not exist
Definition: taskprocessor.h:76
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format,...)
Build a taskprocessor name with a sequence number on the end.
#define AST_TASKPROCESSOR_MAX_NAME
Suggested maximum taskprocessor name length (less null terminator).
Definition: taskprocessor.h:61

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_alloc_options, ao2_bump, ao2_ref, ast_taskprocessor_build_name(), ast_taskprocessor_get(), AST_TASKPROCESSOR_MAX_NAME, ast_taskprocessor_push(), ast_taskprocessor_unreference(), ast_tvnow(), ast_taskpool::name, NULL, taskpool_taskprocessor_dtor(), taskpool_taskprocessor_start(), taskpool_taskprocessor::taskprocessor, TPS_REF_DEFAULT, and type.

Referenced by ast_taskpool_create(), and taskpool_dynamic_pool_grow().

◆ taskpool_taskprocessor_dtor()

static void taskpool_taskprocessor_dtor ( void *  obj)
static

Definition at line 116 of file taskpool.c.

117{
119
121 /* We can't actually do anything if this fails, so just accept reality */
122 }
123
125}
static int taskpool_taskprocessor_stop(void *data)
Definition: taskpool.c:101

References ast_taskprocessor_push(), ast_taskprocessor_unreference(), NULL, taskpool_taskprocessor_stop(), and taskpool_taskprocessor::taskprocessor.

Referenced by taskpool_taskprocessor_alloc().

◆ taskpool_taskprocessor_start()

static int taskpool_taskprocessor_start ( void *  data)
static

Definition at line 131 of file taskpool.c.

132{
133 struct ast_taskpool *pool = data;
134
135 /* Set the pool on the thread for this taskprocessor, inheriting the
136 * reference passed to the task itself.
137 */
138 ast_threadstorage_set_ptr(&current_taskpool_pool, pool);
139
140 /* If a thread start callback is set on the options, call it */
141 if (pool->options.thread_start) {
142 pool->options.thread_start();
143 }
144
145 return 0;
146}
void(* thread_start)(void)
Function to call when a taskprocessor starts.
Definition: taskpool.h:131

References ast_threadstorage_set_ptr(), ast_taskpool::options, and ast_taskpool_options::thread_start.

Referenced by taskpool_taskprocessor_alloc().

◆ taskpool_taskprocessor_stop()

static int taskpool_taskprocessor_stop ( void *  data)
static

Definition at line 101 of file taskpool.c.

102 {
103 struct ast_taskpool *pool = ast_taskpool_get_current();
104
105 /* If a thread stop callback is set on the options, call it */
106 if (pool->options.thread_end) {
107 pool->options.thread_end();
108 }
109
110 ao2_cleanup(pool);
111
112 return 0;
113 }
void(* thread_end)(void)
Function to call when a taskprocessor ends.
Definition: taskpool.h:138

References ao2_cleanup, ast_taskpool_get_current(), ast_taskpool::options, and ast_taskpool_options::thread_end.

Referenced by taskpool_taskprocessor_dtor().

◆ taskpool_taskprocessors_cleanup()

static void taskpool_taskprocessors_cleanup ( struct taskpool_taskprocessors taskprocessors)
static

Definition at line 206 of file taskpool.c.

207{
208 /* Access/manipulation of taskprocessors is done with the lock held, and
209 * with a check of the shutdown flag done. This means that outside of holding
210 * the lock we can safely muck with it. Pushing to the taskprocessor is done
211 * outside of the lock, but with a reference to the taskprocessor held.
212 */
214 AST_VECTOR_FREE(&taskprocessors->taskprocessors);
215}
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition: vector.h:185

References ao2_cleanup, AST_VECTOR_CALLBACK_VOID, AST_VECTOR_FREE, and taskpool_taskprocessors::taskprocessors.

Referenced by ast_taskpool_shutdown().

◆ taskpool_taskprocessors_init()

static int taskpool_taskprocessors_init ( struct taskpool_taskprocessors taskprocessors,
unsigned int  size 
)
static

Definition at line 193 of file taskpool.c.

194{
195 if (AST_VECTOR_INIT(&taskprocessors->taskprocessors, size)) {
196 return -1;
197 }
198
199 return 0;
200}
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:124

References AST_VECTOR_INIT, and taskpool_taskprocessors::taskprocessors.

Referenced by ast_taskpool_create().

Variable Documentation

◆ sched

struct ast_sched_context* sched
static

Scheduler used for dynamic pool shrinking.

Definition at line 83 of file taskpool.c.

◆ serializer_tps_listener_callbacks

struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks
static
Initial value:
= {
.task_pushed = serializer_task_pushed,
.start = serializer_start,
.shutdown = serializer_shutdown,
}
static void serializer_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
Definition: taskpool.c:750
static void serializer_shutdown(struct ast_taskprocessor_listener *listener)
Definition: taskpool.c:768
static int serializer_start(struct ast_taskprocessor_listener *listener)
Definition: taskpool.c:762

Definition at line 778 of file taskpool.c.

Referenced by ast_taskpool_serializer_group().