Asterisk - The Open Source Telephony Project GIT-master-4f2b068
Loading...
Searching...
No Matches
Data Structures | Macros | Typedefs | Enumerations | Functions | Variables
taskpool.c File Reference
#include "asterisk.h"
#include "asterisk/_private.h"
#include "asterisk/taskpool.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/astobj2.h"
#include "asterisk/serializer_shutdown_group.h"
#include "asterisk/utils.h"
#include "asterisk/time.h"
#include "asterisk/sched.h"
Include dependency graph for taskpool.c:

Go to the source code of this file.

Data Structures

struct  ast_taskpool
 An opaque taskpool structure. More...
 
struct  serializer
 
struct  taskpool_sync_task
 
struct  taskpool_taskprocessor
 A taskpool taskprocessor. More...
 
struct  taskpool_taskprocessors
 A container of taskprocessors. More...
 

Macros

#define ast_taskpool_push_internal(pool, task, data)    __ast_taskpool_push(pool, task, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
 
#define TASKPOOL_GROW_THRESHOLD   (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 5) / 10
 The threshold for a taskprocessor at which we consider the pool needing to grow (50% of high water threshold)
 
#define TASKPOOL_QUEUE_SIZE_ADD(tps, size)   (size += ast_taskprocessor_size(tps->taskprocessor))
 
#define TASKPROCESSOR_IS_IDLE(tps, timeout)   (ast_tvdiff_ms(ast_tvnow(), tps->last_pushed) > (timeout))
 

Typedefs

typedef void(* taskpool_selector) (struct ast_taskpool *pool, struct taskpool_taskprocessors *taskprocessors, struct taskpool_taskprocessor **taskprocessor, unsigned int *growth_threshold_reached)
 

Enumerations

enum  serializer_suspension { SERIALIZER_UNSUSPENDED = 0 , SERIALIZER_SUSPENDING , SERIALIZER_SUSPENDED }
 

Functions

int __ast_taskpool_push (struct ast_taskpool *pool, int(*task)(void *data), void *data, const char *file, int line, const char *function)
 Push a task to the taskpool.
 
int __ast_taskpool_push_wait (struct ast_taskpool *pool, int(*task)(void *data), void *data, const char *file, int line, const char *function)
 Push a task to the taskpool, and wait for completion.
 
int __ast_taskpool_serializer_push_wait (struct ast_taskprocessor *serializer, int(*task)(void *data), void *data, const char *file, int line, const char *function)
 Push a task to a serializer, and wait for completion.
 
struct ast_taskpoolast_taskpool_create (const char *name, const struct ast_taskpool_options *options)
 Create a new taskpool.
 
static struct ast_taskpoolast_taskpool_get_current (void)
 
int ast_taskpool_init (void)
 
int ast_taskpool_push (struct ast_taskpool *pool, int(*task)(void *data), void *data)
 
int ast_taskpool_push_wait (struct ast_taskpool *pool, int(*task)(void *data), void *data)
 
long ast_taskpool_queue_size (struct ast_taskpool *pool)
 Get the current number of queued tasks in the taskpool.
 
struct ast_taskprocessorast_taskpool_serializer (const char *name, struct ast_taskpool *pool)
 Serialized execution of tasks within a ast_taskpool.
 
struct ast_taskprocessorast_taskpool_serializer_get_current (void)
 Get the taskpool serializer currently associated with this thread.
 
struct ast_taskprocessorast_taskpool_serializer_group (const char *name, struct ast_taskpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
 Serialized execution of tasks within a ast_taskpool.
 
int ast_taskpool_serializer_push_wait (struct ast_taskprocessor *serializer, int(*task)(void *data), void *data)
 
int ast_taskpool_serializer_suspend (struct ast_taskprocessor *serializer)
 Suspend a serializer, causing tasks to be queued until unsuspended.
 
int ast_taskpool_serializer_unsuspend (struct ast_taskprocessor *serializer)
 Unsuspend a serializer, causing tasks to be executed.
 
void ast_taskpool_shutdown (struct ast_taskpool *pool)
 Shut down a taskpool and remove the underlying taskprocessors.
 
size_t ast_taskpool_taskprocessors_count (struct ast_taskpool *pool)
 Get the current number of taskprocessors in the taskpool.
 
 AST_THREADSTORAGE_RAW (current_taskpool_pool)
 Thread storage for the current taskpool.
 
 AST_THREADSTORAGE_RAW (current_taskpool_serializer)
 
static int execute_tasks (void *data)
 
static struct serializerserializer_create (struct ast_taskpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
 
static void serializer_dtor (void *obj)
 
static void serializer_shutdown (struct ast_taskprocessor_listener *listener)
 
static int serializer_start (struct ast_taskprocessor_listener *listener)
 
static void serializer_task_pushed (struct ast_taskprocessor_listener *listener, int was_empty)
 
static void taskpool_dynamic_pool_grow (struct ast_taskpool *pool, struct taskpool_taskprocessor **taskprocessor)
 
static int taskpool_dynamic_pool_shrink (const void *data)
 
static void taskpool_least_full_selector (struct ast_taskpool *pool, struct taskpool_taskprocessors *taskprocessors, struct taskpool_taskprocessor **taskprocessor, unsigned int *growth_threshold_reached)
 Least full taskprocessor selector.
 
static void taskpool_sequential_selector (struct ast_taskpool *pool, struct taskpool_taskprocessors *taskprocessors, struct taskpool_taskprocessor **taskprocessor, unsigned int *growth_threshold_reached)
 
static int taskpool_serializer_empty_task (void *data)
 
static int taskpool_serializer_suspend_task (void *data)
 
static void taskpool_shutdown (void)
 
static int taskpool_sync_task (void *data)
 
static void taskpool_sync_task_cleanup (struct taskpool_sync_task *sync_task)
 
static int taskpool_sync_task_init (struct taskpool_sync_task *sync_task, int(*task)(void *), void *data)
 
static struct taskpool_taskprocessortaskpool_taskprocessor_alloc (struct ast_taskpool *pool, char type)
 
static void taskpool_taskprocessor_dtor (void *obj)
 
static int taskpool_taskprocessor_start (void *data)
 
static int taskpool_taskprocessor_stop (void *data)
 
static void taskpool_taskprocessors_cleanup (struct taskpool_taskprocessors *taskprocessors)
 
static int taskpool_taskprocessors_init (struct taskpool_taskprocessors *taskprocessors, unsigned int size)
 

Variables

static struct ast_sched_contextsched
 Scheduler used for dynamic pool shrinking.
 
static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks
 

Macro Definition Documentation

◆ ast_taskpool_push_internal

#define ast_taskpool_push_internal (   pool,
  task,
  data 
)     __ast_taskpool_push(pool, task, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)

Definition at line 523 of file taskpool.c.

◆ TASKPOOL_GROW_THRESHOLD

#define TASKPOOL_GROW_THRESHOLD   (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 5) / 10

The threshold for a taskprocessor at which we consider the pool needing to grow (50% of high water threshold)

Definition at line 80 of file taskpool.c.

◆ TASKPOOL_QUEUE_SIZE_ADD

#define TASKPOOL_QUEUE_SIZE_ADD (   tps,
  size 
)    (size += ast_taskprocessor_size(tps->taskprocessor))

Definition at line 462 of file taskpool.c.

◆ TASKPROCESSOR_IS_IDLE

#define TASKPROCESSOR_IS_IDLE (   tps,
  timeout 
)    (ast_tvdiff_ms(ast_tvnow(), tps->last_pushed) > (timeout))

Definition at line 221 of file taskpool.c.

Typedef Documentation

◆ taskpool_selector

typedef void(* taskpool_selector) (struct ast_taskpool *pool, struct taskpool_taskprocessors *taskprocessors, struct taskpool_taskprocessor **taskprocessor, unsigned int *growth_threshold_reached)

Definition at line 51 of file taskpool.c.

Enumeration Type Documentation

◆ serializer_suspension

Enumerator
SERIALIZER_UNSUSPENDED 
SERIALIZER_SUSPENDING 
SERIALIZER_SUSPENDED 

Definition at line 696 of file taskpool.c.

696 {
697 SERIALIZER_UNSUSPENDED = 0, /* The serializer is unsuspended */
698 SERIALIZER_SUSPENDING, /* The serializer is pending suspension */
699 SERIALIZER_SUSPENDED, /* The serializer is suspended */
700};
@ SERIALIZER_SUSPENDED
Definition taskpool.c:699
@ SERIALIZER_SUSPENDING
Definition taskpool.c:698
@ SERIALIZER_UNSUSPENDED
Definition taskpool.c:697

Function Documentation

◆ __ast_taskpool_push()

int __ast_taskpool_push ( struct ast_taskpool pool,
int(*)(void *data)  task,
void *  data,
const char *  file,
int  line,
const char *  function 
)

Push a task to the taskpool.

Since
23.1.0
22.7.0
20.17.0

Tasks pushed into the taskpool will be automatically taken by one of the taskprocessors within

Parameters
poolThe taskpool to add the task to
taskThe task to add
dataThe parameter for the task
Return values
0success
-1failure

Definition at line 527 of file taskpool.c.

529{
530 RAII_VAR(struct taskpool_taskprocessor *, taskprocessor, NULL, ao2_cleanup);
531
532 /* Select the taskprocessor in the pool to use for pushing this task */
533 ao2_lock(pool);
534 if (!pool->shutting_down) {
535 unsigned int growth_threshold_reached = 0;
536
537 /* A selector doesn't set taskprocessor to NULL, it will only change the value if a better
538 * taskprocessor is found. This means that even if the selector for a dynamic taskprocessor
539 * fails for some reason, it will still fall back to the initially found static one if
540 * it is present.
541 */
542 pool->selector(pool, &pool->static_taskprocessors, &taskprocessor, &growth_threshold_reached);
543 if (pool->options.auto_increment && growth_threshold_reached) {
544 /* If we need to grow then try dynamic taskprocessors */
545 pool->selector(pool, &pool->dynamic_taskprocessors, &taskprocessor, &growth_threshold_reached);
546 if (growth_threshold_reached) {
547 /* If we STILL need to grow then grow the dynamic taskprocessor pool if allowed */
548 taskpool_dynamic_pool_grow(pool, &taskprocessor);
549 }
550
551 /* If a dynamic taskprocessor was used update its last push time */
552 if (taskprocessor) {
553 taskprocessor->last_pushed = ast_tvnow();
554 }
555 }
556 ao2_bump(taskprocessor);
557 }
558 ao2_unlock(pool);
559
560 if (!taskprocessor) {
561 return -1;
562 }
563
564 if (__ast_taskprocessor_push(taskprocessor->taskprocessor, task, data, file, line, function)) {
565 return -1;
566 }
567
568 return 0;
569}
#define ao2_cleanup(obj)
Definition astobj2.h:1934
#define ao2_unlock(a)
Definition astobj2.h:729
#define ao2_lock(a)
Definition astobj2.h:717
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition astobj2.h:480
#define NULL
Definition resample.c:96
int auto_increment
Number of taskprocessors to increment the pool by.
Definition taskpool.h:92
taskpool_selector selector
Definition taskpool.c:74
struct ast_taskpool_options options
Definition taskpool.c:70
struct taskpool_taskprocessors static_taskprocessors
Definition taskpool.c:64
int shutting_down
Definition taskpool.c:68
struct taskpool_taskprocessors dynamic_taskprocessors
Definition taskpool.c:66
A taskpool taskprocessor.
Definition taskpool.c:34
static void taskpool_dynamic_pool_grow(struct ast_taskpool *pool, struct taskpool_taskprocessor **taskprocessor)
Definition taskpool.c:479
int __ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap, const char *file, int line, const char *function) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
static int task(void *data)
Queued task for baseline test.
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition time.h:159
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition utils.h:981

References __ast_taskprocessor_push(), ao2_bump, ao2_cleanup, ao2_lock, ao2_unlock, ast_tvnow(), ast_taskpool_options::auto_increment, ast_taskpool::dynamic_taskprocessors, NULL, ast_taskpool::options, RAII_VAR, ast_taskpool::selector, ast_taskpool::shutting_down, ast_taskpool::static_taskprocessors, task(), taskpool_dynamic_pool_grow(), and taskpool_taskprocessor::taskprocessor.

Referenced by __ast_sip_push_task(), __ast_taskpool_push_wait(), and ast_taskpool_push().

◆ __ast_taskpool_push_wait()

int __ast_taskpool_push_wait ( struct ast_taskpool pool,
int(*)(void *data)  task,
void *  data,
const char *  file,
int  line,
const char *  function 
)

Push a task to the taskpool, and wait for completion.

Since
23.1.0
22.7.0
20.17.0

Tasks pushed into the taskpool will be automatically taken by one of the taskprocessors within

Parameters
poolThe taskpool to add the task to
taskThe task to add
dataThe parameter for the task
Return values
0success
-1failure

Definition at line 635 of file taskpool.c.

637{
638 struct taskpool_sync_task sync_task;
639
640 /* If we are already executing within a taskpool taskprocessor then
641 * don't bother pushing a new task, just directly execute the task.
642 */
644 return task(data);
645 }
646
647 if (taskpool_sync_task_init(&sync_task, task, data)) {
648 return -1;
649 }
650
651 if (__ast_taskpool_push(pool, taskpool_sync_task, &sync_task, file, line, function)) {
652 taskpool_sync_task_cleanup(&sync_task);
653 return -1;
654 }
655
656 ast_mutex_lock(&sync_task.lock);
657 while (!sync_task.complete) {
658 ast_cond_wait(&sync_task.cond, &sync_task.lock);
659 }
660 ast_mutex_unlock(&sync_task.lock);
661
662 taskpool_sync_task_cleanup(&sync_task);
663 return sync_task.fail;
664}
#define ast_cond_wait(cond, mutex)
Definition lock.h:212
#define ast_mutex_unlock(a)
Definition lock.h:197
#define ast_mutex_lock(a)
Definition lock.h:196
static void taskpool_sync_task_cleanup(struct taskpool_sync_task *sync_task)
Definition taskpool.c:606
int __ast_taskpool_push(struct ast_taskpool *pool, int(*task)(void *data), void *data, const char *file, int line, const char *function)
Push a task to the taskpool.
Definition taskpool.c:527
static struct ast_taskpool * ast_taskpool_get_current(void)
Definition taskpool.c:92
static int taskpool_sync_task_init(struct taskpool_sync_task *sync_task, int(*task)(void *), void *data)
Definition taskpool.c:592

References __ast_taskpool_push(), ast_cond_wait, ast_mutex_lock, ast_mutex_unlock, ast_taskpool_get_current(), taskpool_sync_task::complete, taskpool_sync_task::cond, taskpool_sync_task::fail, taskpool_sync_task::lock, task(), taskpool_sync_task_cleanup(), and taskpool_sync_task_init().

Referenced by __ast_sip_push_task_wait(), __ast_sip_push_task_wait_serializer(), and ast_taskpool_push_wait().

◆ __ast_taskpool_serializer_push_wait()

int __ast_taskpool_serializer_push_wait ( struct ast_taskprocessor serializer,
int(*)(void *data)  task,
void *  data,
const char *  file,
int  line,
const char *  function 
)

Push a task to a serializer, and wait for completion.

Since
23.1.0
22.7.0
20.17.0
Parameters
serializerThe serializer to add the task to
taskThe task to add
dataThe parameter for the task
Return values
0success
-1failure

Definition at line 882 of file taskpool.c.

884{
887 struct ast_taskprocessor *prior_serializer;
888 struct taskpool_sync_task sync_task;
889
890 /* If not in a taskpool taskprocessor we can just queue the task like normal and
891 * wait. */
893 if (taskpool_sync_task_init(&sync_task, task, data)) {
894 return -1;
895 }
896
897 if (__ast_taskprocessor_push(serializer, taskpool_sync_task, &sync_task, file, line, function)) {
898 taskpool_sync_task_cleanup(&sync_task);
899 return -1;
900 }
901
902 ast_mutex_lock(&sync_task.lock);
903 while (!sync_task.complete) {
904 ast_cond_wait(&sync_task.cond, &sync_task.lock);
905 }
906 ast_mutex_unlock(&sync_task.lock);
907
908 taskpool_sync_task_cleanup(&sync_task);
909 return sync_task.fail;
910 }
911
912 /* It is possible that we are already executing within a serializer, so stash the existing
913 * away so we can restore it.
914 */
915 prior_serializer = ast_taskpool_serializer_get_current();
916
917 ao2_lock(ser);
918
919 /* There are two cases where we can or have to directly execute this task:
920 * 1. There are no other tasks in the serializer
921 * 2. We are already in the serializer
922 * In the second case if we don't execute the task now, we will deadlock waiting
923 * on it as it will never occur.
924 */
925 if (!ast_taskprocessor_size(serializer) || prior_serializer == serializer) {
926 ast_threadstorage_set_ptr(&current_taskpool_serializer, serializer);
927 sync_task.fail = task(data);
928 ao2_unlock(ser);
929 ast_threadstorage_set_ptr(&current_taskpool_serializer, prior_serializer);
930 return sync_task.fail;
931 }
932
933 if (taskpool_sync_task_init(&sync_task, task, data)) {
934 ao2_unlock(ser);
935 return -1;
936 }
937
938 /* First we queue the serialized task */
939 if (__ast_taskprocessor_push(serializer, taskpool_sync_task, &sync_task, file, line, function)) {
940 taskpool_sync_task_cleanup(&sync_task);
941 ao2_unlock(ser);
942 return -1;
943 }
944
945 /* Next we queue the empty task to ensure the serializer doesn't reach empty, this
946 * stops two tasks from being queued for the same serializer at the same time.
947 */
949 taskpool_sync_task_cleanup(&sync_task);
950 ao2_unlock(ser);
951 return -1;
952 }
953
954 /* Now we execute the tasks on the serializer until our sync task is complete */
955 ast_threadstorage_set_ptr(&current_taskpool_serializer, serializer);
956 while (!sync_task.complete) {
957 /* If the serializer is suspended wait until it unsuspends */
958 while (ser->suspended == SERIALIZER_SUSPENDED) {
960 }
961
962 /* The sync task is guaranteed to be executed, so doing a while loop on the complete
963 * flag is safe.
964 */
966 }
967 taskpool_sync_task_cleanup(&sync_task);
968 ao2_unlock(ser);
969
970 ast_threadstorage_set_ptr(&current_taskpool_serializer, prior_serializer);
971
972 return sync_task.fail;
973}
static void * listener(void *unused)
Definition asterisk.c:1531
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
Definition astobj2.c:476
A listener for taskprocessors.
A ast_taskprocessor structure is a singleton by name.
ast_cond_t cond
Definition taskpool.c:708
enum serializer_suspension suspended
Definition taskpool.c:710
struct ast_taskprocessor * ast_taskpool_serializer_get_current(void)
Get the taskpool serializer currently associated with this thread.
Definition taskpool.c:825
static int taskpool_serializer_empty_task(void *data)
Definition taskpool.c:868
void * ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
Get the user data from the listener.
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
Pop a task off the taskprocessor and execute it.
#define ast_taskprocessor_push(tps, task_exe, datap)
int ast_threadstorage_set_ptr(struct ast_threadstorage *ts, void *ptr)
Set a raw pointer from threadstorage.

References __ast_taskprocessor_push(), ao2_lock, ao2_object_get_lockaddr(), ao2_unlock, ast_cond_wait, ast_mutex_lock, ast_mutex_unlock, ast_taskpool_get_current(), ast_taskpool_serializer_get_current(), ast_taskprocessor_execute(), ast_taskprocessor_listener_get_user_data(), ast_taskprocessor_push, ast_taskprocessor_size(), ast_threadstorage_set_ptr(), taskpool_sync_task::complete, taskpool_sync_task::cond, serializer::cond, taskpool_sync_task::fail, listener(), taskpool_sync_task::lock, NULL, SERIALIZER_SUSPENDED, serializer::suspended, task(), taskpool_serializer_empty_task(), taskpool_sync_task_cleanup(), and taskpool_sync_task_init().

Referenced by __ast_sip_push_task_wait(), __ast_sip_push_task_wait_serializer(), and ast_taskpool_serializer_push_wait().

◆ ast_taskpool_create()

struct ast_taskpool * ast_taskpool_create ( const char *  name,
const struct ast_taskpool_options options 
)

Create a new taskpool.

Since
23.1.0
22.7.0
20.17.0

This function creates a taskpool. Tasks may be pushed onto this task pool and will be automatically acted upon by taskprocessors within the pool.

Only a single taskpool with a given name may exist. This function will fail if a taskpool with the given name already exists.

Parameters
nameThe unique name for the taskpool
optionsThe behavioral options for this taskpool
Return values
NULLFailed to create the taskpool
non-NULLThe newly-created taskpool
Note
The ast_taskpool_shutdown function must be called to shut down the taskpool and clean up underlying resources fully.

Definition at line 324 of file taskpool.c.

326{
327 struct ast_taskpool *pool;
328
329 /* Enforce versioning on the passed-in options */
330 if (options->version != AST_TASKPOOL_OPTIONS_VERSION) {
331 return NULL;
332 }
333
334 pool = ao2_alloc(sizeof(*pool) + strlen(name) + 1, NULL);
335 if (!pool) {
336 return NULL;
337 }
338
339 strcpy(pool->name, name); /* Safe */
340 memcpy(&pool->options, options, sizeof(pool->options));
341 pool->shrink_sched_id = -1;
342
343 /* Verify the passed-in options are valid, and adjust if needed */
344 if (options->initial_size < options->minimum_size) {
345 pool->options.initial_size = options->minimum_size;
346 ast_log(LOG_WARNING, "Taskpool '%s' has an initial size of %d, which is less than the minimum size of %d. Adjusting to %d.\n",
347 name, options->initial_size, options->minimum_size, options->minimum_size);
348 }
349
350 if (options->max_size && pool->options.initial_size > options->max_size) {
351 pool->options.max_size = pool->options.initial_size;
352 ast_log(LOG_WARNING, "Taskpool '%s' has a max size of %d, which is less than the initial size of %d. Adjusting to %d.\n",
353 name, options->max_size, pool->options.initial_size, pool->options.initial_size);
354 }
355
356 if (!options->auto_increment) {
357 if (!pool->options.minimum_size) {
358 pool->options.minimum_size = 1;
359 ast_log(LOG_WARNING, "Taskpool '%s' has a minimum size of 0, which is not valid without auto increment. Adjusting to 1.\n", name);
360 }
361 if (!pool->options.max_size) {
362 pool->options.max_size = pool->options.minimum_size;
363 ast_log(LOG_WARNING, "Taskpool '%s' has a max size of 0, which is not valid without auto increment. Adjusting to %d.\n", name, pool->options.minimum_size);
364 }
365 if (pool->options.minimum_size != pool->options.max_size) {
366 pool->options.minimum_size = pool->options.max_size;
367 pool->options.initial_size = pool->options.max_size;
368 ast_log(LOG_WARNING, "Taskpool '%s' has a minimum size of %d, while max size is %d. Adjusting all sizes to %d due to lack of auto increment.\n",
369 name, options->minimum_size, pool->options.max_size, pool->options.max_size);
370 }
371 } else if (!options->growth_threshold) {
373 }
374
377 } else if (options->selector == AST_TASKPOOL_SELECTOR_SEQUENTIAL) {
379 } else {
380 ast_log(LOG_WARNING, "Taskpool '%s' has an invalid selector of %d. Adjusting to default selector.\n",
381 name, options->selector);
383 }
384
386 ao2_ref(pool, -1);
387 return NULL;
388 }
389
390 /* Create the static taskprocessors based on the passed-in options */
391 for (int i = 0; i < pool->options.minimum_size; i++) {
393
395 if (!taskprocessor) {
396 /* The reference to pool is passed to ast_taskpool_shutdown */
398 return NULL;
399 }
400
403 /* The reference to pool is passed to ast_taskpool_shutdown */
405 return NULL;
406 }
407 }
408
410 pool->options.initial_size - pool->options.minimum_size)) {
412 return NULL;
413 }
414
415 /* Create the dynamic taskprocessor based on the passed-in options */
416 for (int i = 0; i < (pool->options.initial_size - pool->options.minimum_size); i++) {
418
420 if (!taskprocessor) {
421 /* The reference to pool is passed to ast_taskpool_shutdown */
423 return NULL;
424 }
425
428 /* The reference to pool is passed to ast_taskpool_shutdown */
430 return NULL;
431 }
432 }
433
434 /* If idle timeout support is enabled kick off a scheduled task to shrink the dynamic pool periodically, we do
435 * this no matter if there are dynamic taskprocessor present to reduce the work needed within the push function
436 * and to reduce complexity.
437 */
438 if (options->idle_timeout && options->auto_increment) {
440 if (pool->shrink_sched_id < 0) {
441 ao2_ref(pool, -1);
442 /* The second reference to pool is passed to ast_taskpool_shutdown */
444 return NULL;
445 }
446 }
447
448 return pool;
449}
#define ast_log
Definition astobj2.c:42
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition astobj2.h:459
#define ao2_alloc(data_size, destructor_fn)
Definition astobj2.h:409
static const char name[]
Definition format_mp3.c:68
#define LOG_WARNING
int ast_sched_add(struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data) attribute_warn_unused_result
Adds a scheduled event.
Definition sched.c:567
int max_size
Maximum number of taskprocessors a pool may have.
Definition taskpool.h:122
int growth_threshold
The threshold for when to grow the pool.
Definition taskpool.h:131
int minimum_size
Number of taskprocessors that will always exist.
Definition taskpool.h:99
int initial_size
Number of taskprocessors the pool will start with.
Definition taskpool.h:109
An opaque taskpool structure.
Definition taskpool.c:62
int shrink_sched_id
Definition taskpool.c:72
char name[0]
Definition taskpool.c:76
Definition sched.c:76
struct ast_taskprocessor * taskprocessor
Definition taskpool.c:36
struct taskpool_taskprocessors::@439 taskprocessors
static void taskpool_least_full_selector(struct ast_taskpool *pool, struct taskpool_taskprocessors *taskprocessors, struct taskpool_taskprocessor **taskprocessor, unsigned int *growth_threshold_reached)
Least full taskprocessor selector.
Definition taskpool.c:287
void ast_taskpool_shutdown(struct ast_taskpool *pool)
Shut down a taskpool and remove the underlying taskprocessors.
Definition taskpool.c:675
static int taskpool_dynamic_pool_shrink(const void *data)
Definition taskpool.c:226
static int taskpool_taskprocessors_init(struct taskpool_taskprocessors *taskprocessors, unsigned int size)
Definition taskpool.c:193
static struct taskpool_taskprocessor * taskpool_taskprocessor_alloc(struct ast_taskpool *pool, char type)
Definition taskpool.c:152
static void taskpool_sequential_selector(struct ast_taskpool *pool, struct taskpool_taskprocessors *taskprocessors, struct taskpool_taskprocessor **taskprocessor, unsigned int *growth_threshold_reached)
Definition taskpool.c:262
#define TASKPOOL_GROW_THRESHOLD
The threshold for a taskprocessor at which we consider the pool needing to grow (50% of high water th...
Definition taskpool.c:80
#define AST_TASKPOOL_OPTIONS_VERSION
Definition taskpool.h:76
@ AST_TASKPOOL_SELECTOR_SEQUENTIAL
Definition taskpool.h:72
@ AST_TASKPOOL_SELECTOR_LEAST_FULL
Definition taskpool.h:71
@ AST_TASKPOOL_SELECTOR_DEFAULT
Definition taskpool.h:70
static struct test_options options
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition vector.h:267

References ao2_alloc, ao2_bump, ao2_ref, ast_log, ast_sched_add(), AST_TASKPOOL_OPTIONS_VERSION, AST_TASKPOOL_SELECTOR_DEFAULT, AST_TASKPOOL_SELECTOR_LEAST_FULL, AST_TASKPOOL_SELECTOR_SEQUENTIAL, ast_taskpool_shutdown(), AST_VECTOR_APPEND, ast_taskpool::dynamic_taskprocessors, ast_taskpool_options::growth_threshold, ast_taskpool_options::initial_size, LOG_WARNING, ast_taskpool_options::max_size, ast_taskpool_options::minimum_size, ast_taskpool::name, name, NULL, ast_taskpool::options, options, ast_taskpool::selector, ast_taskpool::shrink_sched_id, ast_taskpool::static_taskprocessors, taskpool_dynamic_pool_shrink(), TASKPOOL_GROW_THRESHOLD, taskpool_least_full_selector(), taskpool_sequential_selector(), taskpool_taskprocessor_alloc(), taskpool_taskprocessors_init(), taskpool_taskprocessor::taskprocessor, and taskpool_taskprocessors::taskprocessors.

Referenced by ast_sorcery_init(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), handle_cli_taskpool_push_efficiency(), handle_cli_taskpool_push_serializer_efficiency(), load_module(), and stasis_init().

◆ ast_taskpool_get_current()

static struct ast_taskpool * ast_taskpool_get_current ( void  )
static

Definition at line 92 of file taskpool.c.

93{
94 return ast_threadstorage_get_ptr(&current_taskpool_pool);
95}
void * ast_threadstorage_get_ptr(struct ast_threadstorage *ts)
Retrieve a raw pointer from threadstorage.

References ast_threadstorage_get_ptr().

Referenced by __ast_taskpool_push_wait(), __ast_taskpool_serializer_push_wait(), ast_taskpool_serializer_suspend(), ast_taskpool_serializer_unsuspend(), execute_tasks(), and taskpool_taskprocessor_stop().

◆ ast_taskpool_init()

int ast_taskpool_init ( void  )

Provided by taskpool.c

Definition at line 1093 of file taskpool.c.

1094{
1096 if (!sched) {
1097 return -1;
1098 }
1099
1101 return -1;
1102 }
1103
1105
1106 return 0;
1107}
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
Definition clicompat.c:19
int ast_sched_start_thread(struct ast_sched_context *con)
Start a thread for processing scheduler entries.
Definition sched.c:197
struct ast_sched_context * ast_sched_context_create(void)
Create a scheduler context.
Definition sched.c:238
static void taskpool_shutdown(void)
Definition taskpool.c:1085

References ast_register_cleanup(), ast_sched_context_create(), ast_sched_start_thread(), and taskpool_shutdown().

Referenced by asterisk_daemon().

◆ ast_taskpool_push()

int ast_taskpool_push ( struct ast_taskpool pool,
int(*)(void *data)  task,
void *  data 
)

Definition at line 572 of file taskpool.c.

573{
574 return __ast_taskpool_push(pool, task, data, NULL, 0, NULL);
575}

References __ast_taskpool_push(), NULL, and task().

◆ ast_taskpool_push_wait()

int ast_taskpool_push_wait ( struct ast_taskpool pool,
int(*)(void *data)  task,
void *  data 
)

Definition at line 670 of file taskpool.c.

671{
672 return __ast_taskpool_push_wait(pool, task, data, NULL, 0, NULL);
673}
int __ast_taskpool_push_wait(struct ast_taskpool *pool, int(*task)(void *data), void *data, const char *file, int line, const char *function)
Push a task to the taskpool, and wait for completion.
Definition taskpool.c:635

References __ast_taskpool_push_wait(), NULL, and task().

◆ ast_taskpool_queue_size()

long ast_taskpool_queue_size ( struct ast_taskpool pool)

Get the current number of queued tasks in the taskpool.

Since
23.1.0
22.7.0
20.17.0
Parameters
poolThe taskpool to query
Return values
Thenumber of queued tasks in the taskpool

Definition at line 464 of file taskpool.c.

465{
466 long queue_size = 0;
467
468 ao2_lock(pool);
471 ao2_unlock(pool);
472
473 return queue_size;
474}
#define TASKPOOL_QUEUE_SIZE_ADD(tps, size)
Definition taskpool.c:462
#define AST_VECTOR_CALLBACK_VOID(vec, callback,...)
Execute a callback on every element in a vector disregarding callback return.
Definition vector.h:873

References ao2_lock, ao2_unlock, AST_VECTOR_CALLBACK_VOID, ast_taskpool::dynamic_taskprocessors, ast_taskpool::static_taskprocessors, TASKPOOL_QUEUE_SIZE_ADD, and taskpool_taskprocessors::taskprocessors.

Referenced by ast_sip_taskpool_queue_size().

◆ ast_taskpool_serializer()

struct ast_taskprocessor * ast_taskpool_serializer ( const char *  name,
struct ast_taskpool pool 
)

Serialized execution of tasks within a ast_taskpool.

Since
23.1.0
22.7.0
20.17.0

A ast_taskprocessor with the same contract as a default taskprocessor (tasks execute serially) except instead of executing out of a dedicated thread, execution occurs in a taskprocessor from a ast_taskpool.

While it guarantees that each task will complete before executing the next, there is no guarantee as to which thread from the pool individual tasks will execute. This normally only matters if your code relies on thread specific information, such as thread locals.

Use ast_taskprocessor_unreference() to dispose of the returned ast_taskprocessor.

Only a single taskprocessor with a given name may exist. This function will fail if a taskprocessor with the given name already exists.

Parameters
nameName of the serializer. (must be unique)
poolast_taskpool for execution.
Returns
ast_taskprocessor for enqueuing work.
Return values
NULLon error.

Definition at line 860 of file taskpool.c.

861{
863}
struct ast_taskprocessor * ast_taskpool_serializer_group(const char *name, struct ast_taskpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Serialized execution of tasks within a ast_taskpool.
Definition taskpool.c:830

References ast_taskpool_serializer_group(), name, and NULL.

Referenced by AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), handle_cli_taskpool_push_serializer_efficiency(), internal_stasis_subscribe(), and sorcery_object_type_alloc().

◆ ast_taskpool_serializer_get_current()

struct ast_taskprocessor * ast_taskpool_serializer_get_current ( void  )

Get the taskpool serializer currently associated with this thread.

Note
The returned pointer is valid while the serializer thread is running.
Use ao2_ref() on serializer if you are going to keep it for another thread. To unref it you must then use ast_taskprocessor_unreference().
Return values
serializeron success.
NULLon error or no serializer associated with the thread.

Definition at line 825 of file taskpool.c.

826{
827 return ast_threadstorage_get_ptr(&current_taskpool_serializer);
828}

References ast_threadstorage_get_ptr().

Referenced by __ast_taskpool_serializer_push_wait(), record_serializer(), requeue_task(), rfc3326_outgoing_request(), rfc3326_outgoing_response(), serializer_efficiency_task(), and simple_task().

◆ ast_taskpool_serializer_group()

struct ast_taskprocessor * ast_taskpool_serializer_group ( const char *  name,
struct ast_taskpool pool,
struct ast_serializer_shutdown_group shutdown_group 
)

Serialized execution of tasks within a ast_taskpool.

Since
23.1.0
22.7.0
20.17.0

A ast_taskprocessor with the same contract as a default taskprocessor (tasks execute serially) except instead of executing out of a dedicated thread, execution occurs in a taskprocessor from a ast_taskpool.

While it guarantees that each task will complete before executing the next, there is no guarantee as to which thread from the pool individual tasks will execute. This normally only matters if your code relies on thread specific information, such as thread locals.

Use ast_taskprocessor_unreference() to dispose of the returned ast_taskprocessor.

Only a single taskprocessor with a given name may exist. This function will fail if a taskprocessor with the given name already exists.

Parameters
nameName of the serializer. (must be unique)
poolast_taskpool for execution.
shutdown_groupGroup shutdown controller. (NULL if no group association)
Returns
ast_taskprocessor for enqueuing work.
Return values
NULLon error.

Definition at line 830 of file taskpool.c.

832{
833 struct serializer *ser;
835 struct ast_taskprocessor *tps;
836
838 if (!ser) {
839 return NULL;
840 }
841
843 if (!listener) {
844 ao2_ref(ser, -1);
845 return NULL;
846 }
847
849 if (!tps) {
850 /* ser ref transferred to listener but not cleaned without tps */
851 ao2_ref(ser, -1);
852 } else if (shutdown_group) {
854 }
855
856 ao2_ref(listener, -1);
857 return tps;
858}
static struct ast_serializer_shutdown_group * shutdown_group
Shutdown group for options serializers.
void ast_serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group)
Increment the number of serializer members in the group.
static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks
Definition taskpool.c:819
static struct serializer * serializer_create(struct ast_taskpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Definition taskpool.c:722
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
struct ast_taskprocessor * ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
Create a taskprocessor with a custom listener.

References ao2_ref, ast_serializer_shutdown_group_inc(), ast_taskprocessor_create_with_listener(), ast_taskprocessor_listener_alloc(), listener(), name, NULL, serializer_create(), serializer_tps_listener_callbacks, and shutdown_group.

Referenced by ast_serializer_taskpool_create(), ast_sip_create_serializer_group(), and ast_taskpool_serializer().

◆ ast_taskpool_serializer_push_wait()

int ast_taskpool_serializer_push_wait ( struct ast_taskprocessor serializer,
int(*)(void *data)  task,
void *  data 
)

Definition at line 877 of file taskpool.c.

878{
880}
int __ast_taskpool_serializer_push_wait(struct ast_taskprocessor *serializer, int(*task)(void *data), void *data, const char *file, int line, const char *function)
Push a task to a serializer, and wait for completion.
Definition taskpool.c:882

References __ast_taskpool_serializer_push_wait(), NULL, and task().

◆ ast_taskpool_serializer_suspend()

int ast_taskpool_serializer_suspend ( struct ast_taskprocessor serializer)

Suspend a serializer, causing tasks to be queued until unsuspended.

Since
23.2.0
22.8.0
20.18.0
Parameters
serializerThe serializer to suspend
Return values
0success
-1failure
Note
May only be invoked from outside of the taskpool

Definition at line 1001 of file taskpool.c.

1002{
1005
1006 /* This suspension process works by inserting a checkpoint into the queue of the
1007 * serializer. Once this checkpoint is reached the taskpool taskprocessor handling
1008 * the queue stops prematurely and does not get requeued. For the case where a
1009 * synchronous task wait is in progress it is instead paused temporarily. Once
1010 * the serializer is unsuspended a new execution task is queued into the taskpool
1011 * to resume execution and any paused synchronous task waits are awoken to resume
1012 * their own execution as well. This approach minimizes the number of threads that
1013 * are paused waiting, nominally to 0.
1014 */
1015
1017 return -1;
1018 }
1019
1020 ao2_lock(ser);
1021
1022 /* If the serializer is already suspending or suspended, just return immediately.
1023 * This mirrors the original behavior from PJSIP.
1024 */
1025 if (ser->suspended != SERIALIZER_UNSUSPENDED) {
1026 ao2_unlock(ser);
1027 return 0;
1028 }
1029
1031
1032 ao2_unlock(ser);
1033
1034 /* Once this returns successfully there is no thread executing the tasks on the serializer,
1035 * so they will accumulate until the serializer is unsuspended.
1036 */
1038 /* Suspension failed, so unsuspend as doing otherwise would leave the serializer in a stuck
1039 * state.
1040 */
1041 ao2_lock(ser);
1043 ao2_unlock(ser);
1044 return -1;
1045 }
1046
1047 return 0;
1048}
static int taskpool_serializer_suspend_task(void *data)
Definition taskpool.c:978
#define ast_taskpool_serializer_push_wait(pool, task, data)
Definition taskpool.h:335

References ao2_lock, ao2_unlock, ast_taskpool_get_current(), ast_taskpool_serializer_push_wait, ast_taskprocessor_listener_get_user_data(), listener(), SERIALIZER_SUSPENDING, SERIALIZER_UNSUSPENDED, serializer::suspended, and taskpool_serializer_suspend_task().

Referenced by ast_sip_session_suspend(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), and AST_TEST_DEFINE().

◆ ast_taskpool_serializer_unsuspend()

int ast_taskpool_serializer_unsuspend ( struct ast_taskprocessor serializer)

Unsuspend a serializer, causing tasks to be executed.

Since
23.2.0
22.8.0
20.18.0
Parameters
serializerThe serializer to unsuspend
Return values
0success
-1failure
Note
May only be invoked from outside of the taskpool

Definition at line 1050 of file taskpool.c.

1051{
1054
1056 return -1;
1057 }
1058
1059 ao2_lock(ser);
1060
1061 if (ser->suspended != SERIALIZER_SUSPENDED) {
1062 ao2_unlock(ser);
1063 return 0;
1064 }
1065
1067
1068 /* Notify any other interested threads that this one has awoken */
1069 ast_cond_broadcast(&ser->cond);
1070
1071 /* And now we kick off handling of the queued tasks once again */
1074 }
1075
1076 ao2_unlock(ser);
1077
1078 return 0;
1079}
#define ast_cond_broadcast(cond)
Definition lock.h:211
struct ast_taskpool * pool
Definition taskpool.c:704
static int execute_tasks(void *data)
Definition taskpool.c:742
#define ast_taskpool_push(pool, task, data)
Definition taskpool.h:210
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.

References ao2_bump, ao2_lock, ao2_unlock, ast_cond_broadcast, ast_taskpool_get_current(), ast_taskpool_push, ast_taskprocessor_listener_get_user_data(), ast_taskprocessor_unreference(), serializer::cond, execute_tasks(), listener(), serializer::pool, SERIALIZER_SUSPENDED, SERIALIZER_UNSUSPENDED, and serializer::suspended.

Referenced by ast_sip_session_unsuspend(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), and AST_TEST_DEFINE().

◆ ast_taskpool_shutdown()

void ast_taskpool_shutdown ( struct ast_taskpool pool)

Shut down a taskpool and remove the underlying taskprocessors.

Since
23.1.0
22.7.0
20.17.0
Parameters
poolThe pool to shut down
Note
This will decrement the reference to the pool

Definition at line 675 of file taskpool.c.

676{
677 if (!pool) {
678 return;
679 }
680
681 /* Mark this pool as shutting down so nothing new is pushed */
682 ao2_lock(pool);
683 pool->shutting_down = 1;
684 ao2_unlock(pool);
685
686 /* Stop the shrink scheduled item if present */
688
689 /* Clean up all the taskprocessors */
692
693 ao2_ref(pool, -1);
694}
#define AST_SCHED_DEL_UNREF(sched, id, refcall)
schedule task to get deleted and call unref function
Definition sched.h:82
static void taskpool_taskprocessors_cleanup(struct taskpool_taskprocessors *taskprocessors)
Definition taskpool.c:206

References ao2_lock, ao2_ref, ao2_unlock, AST_SCHED_DEL_UNREF, ast_taskpool::dynamic_taskprocessors, ast_taskpool::shrink_sched_id, ast_taskpool::shutting_down, ast_taskpool::static_taskprocessors, and taskpool_taskprocessors_cleanup().

Referenced by ast_taskpool_create(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), handle_cli_taskpool_push_efficiency(), handle_cli_taskpool_push_serializer_efficiency(), load_module(), sorcery_cleanup(), stasis_cleanup(), and unload_module().

◆ ast_taskpool_taskprocessors_count()

size_t ast_taskpool_taskprocessors_count ( struct ast_taskpool pool)

Get the current number of taskprocessors in the taskpool.

Since
23.1.0
22.7.0
20.17.0
Parameters
poolThe taskpool to query
Return values
Thenumber of taskprocessors in the taskpool

Definition at line 451 of file taskpool.c.

452{
453 size_t count;
454
455 ao2_lock(pool);
457 ao2_unlock(pool);
458
459 return count;
460}
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition vector.h:620

References ao2_lock, ao2_unlock, AST_VECTOR_SIZE, ast_taskpool::dynamic_taskprocessors, ast_taskpool::static_taskprocessors, and taskpool_taskprocessors::taskprocessors.

Referenced by AST_TEST_DEFINE(), and AST_TEST_DEFINE().

◆ AST_THREADSTORAGE_RAW() [1/2]

AST_THREADSTORAGE_RAW ( current_taskpool_pool  )

Thread storage for the current taskpool.

◆ AST_THREADSTORAGE_RAW() [2/2]

AST_THREADSTORAGE_RAW ( current_taskpool_serializer  )

◆ execute_tasks()

static int execute_tasks ( void *  data)
static

Definition at line 742 of file taskpool.c.

743{
744 struct ast_taskpool *pool = ast_taskpool_get_current();
745 struct ast_taskprocessor *tps = data;
748 size_t remaining, requeue = 0;
749
750 /* In a normal scenario this lock will not be in contention with
751 * anything else. It is only if a synchronous task is pushed to
752 * the serializer that it may be blocked on the synchronous
753 * task thread. This is done to ensure that only one thread is executing
754 * tasks from the serializer at a given time, and not out of order
755 * either.
756 */
757 ao2_lock(ser);
758
759 ast_threadstorage_set_ptr(&current_taskpool_serializer, tps);
760 for (remaining = ast_taskprocessor_size(tps); remaining > 0; remaining--) {
761 requeue = ast_taskprocessor_execute(tps);
762
763 /* If the serializer is suspended we will not execute any more tasks and
764 * we will not requeue the taskpool task. Instead it will be requeued when
765 * the serializer is unsuspended.
766 */
767 if (ser->suspended == SERIALIZER_SUSPENDED) {
768 requeue = 0;
769 break;
770 }
771 }
772 ast_threadstorage_set_ptr(&current_taskpool_serializer, NULL);
773
774 ao2_unlock(ser);
775
776 /* If there are remaining tasks we requeue, this way the serializer
777 * does not hold exclusivity of the taskpool taskprocessor
778 */
779 if (requeue) {
780 /* Ownership passes to the new task */
783 }
784 } else {
786 }
787
788 return 0;
789}
struct ast_taskprocessor * tps

References ao2_lock, ao2_unlock, ast_taskpool_get_current(), ast_taskpool_push, ast_taskprocessor_execute(), ast_taskprocessor_listener_get_user_data(), ast_taskprocessor_size(), ast_taskprocessor_unreference(), ast_threadstorage_set_ptr(), execute_tasks(), listener(), NULL, serializer::pool, SERIALIZER_SUSPENDED, serializer::suspended, and ast_taskprocessor_listener::tps.

Referenced by ast_taskpool_serializer_unsuspend(), execute_tasks(), and serializer_task_pushed().

◆ serializer_create()

static struct serializer * serializer_create ( struct ast_taskpool pool,
struct ast_serializer_shutdown_group shutdown_group 
)
static

Definition at line 722 of file taskpool.c.

724{
725 struct serializer *ser;
726
727 /* This object has a lock so it can be used to ensure exclusive access
728 * to the execution of tasks within the serializer.
729 */
730 ser = ao2_alloc(sizeof(*ser), serializer_dtor);
731 if (!ser) {
732 return NULL;
733 }
734 ser->pool = ao2_bump(pool);
736 ast_cond_init(&ser->cond, NULL);
737 return ser;
738}
#define ast_cond_init(cond, attr)
Definition lock.h:208
struct ast_serializer_shutdown_group * shutdown_group
Definition taskpool.c:706
static void serializer_dtor(void *obj)
Definition taskpool.c:713

References ao2_alloc, ao2_bump, ast_cond_init, serializer::cond, NULL, serializer::pool, serializer_dtor(), serializer::shutdown_group, and shutdown_group.

Referenced by ast_taskpool_serializer_group().

◆ serializer_dtor()

static void serializer_dtor ( void *  obj)
static

Definition at line 713 of file taskpool.c.

714{
715 struct serializer *ser = obj;
716
717 ao2_cleanup(ser->pool);
719 ast_cond_destroy(&ser->cond);
720}
#define ast_cond_destroy(cond)
Definition lock.h:209

References ao2_cleanup, ast_cond_destroy, serializer::cond, serializer::pool, and serializer::shutdown_group.

Referenced by serializer_create().

◆ serializer_shutdown()

static void serializer_shutdown ( struct ast_taskprocessor_listener listener)
static

Definition at line 809 of file taskpool.c.

810{
812
813 if (ser->shutdown_group) {
815 }
816 ao2_cleanup(ser);
817}
void ast_serializer_shutdown_group_dec(struct ast_serializer_shutdown_group *shutdown_group)
Decrement the number of serializer members in the group.

References ao2_cleanup, ast_serializer_shutdown_group_dec(), ast_taskprocessor_listener_get_user_data(), listener(), and serializer::shutdown_group.

◆ serializer_start()

static int serializer_start ( struct ast_taskprocessor_listener listener)
static

Definition at line 803 of file taskpool.c.

804{
805 /* No-op */
806 return 0;
807}

◆ serializer_task_pushed()

static void serializer_task_pushed ( struct ast_taskprocessor_listener listener,
int  was_empty 
)
static

Definition at line 791 of file taskpool.c.

792{
793 if (was_empty) {
796
797 if (ast_taskpool_push(ser->pool, execute_tasks, tps)) {
799 }
800 }
801}
struct ast_taskprocessor * ast_taskprocessor_listener_get_tps(const struct ast_taskprocessor_listener *listener)
Get a reference to the listener's taskprocessor.

References ast_taskpool_push, ast_taskprocessor_listener_get_tps(), ast_taskprocessor_listener_get_user_data(), ast_taskprocessor_unreference(), execute_tasks(), listener(), and serializer::pool.

◆ taskpool_dynamic_pool_grow()

static void taskpool_dynamic_pool_grow ( struct ast_taskpool pool,
struct taskpool_taskprocessor **  taskprocessor 
)
static

Definition at line 479 of file taskpool.c.

480{
481 unsigned int num_to_add = pool->options.auto_increment;
482 int i;
483
484 if (!num_to_add) {
485 return;
486 }
487
488 /* If a maximum size is enforced, then determine if we have to limit how many taskprocessors we add */
489 if (pool->options.max_size) {
491
492 if (current_size + num_to_add > pool->options.max_size) {
493 num_to_add = pool->options.max_size - current_size;
494 }
495 }
496
497 for (i = 0; i < num_to_add; i++) {
498 struct taskpool_taskprocessor *new_taskprocessor;
499
500 new_taskprocessor = taskpool_taskprocessor_alloc(pool, 'd');
501 if (!new_taskprocessor) {
502 return;
503 }
504
505 if (AST_VECTOR_APPEND(&pool->dynamic_taskprocessors.taskprocessors, new_taskprocessor)) {
506 ao2_ref(new_taskprocessor, -1);
507 return;
508 }
509
510 if (i == 0) {
511 /* On the first iteration we return the taskprocessor we just added */
512 *taskprocessor = new_taskprocessor;
513 /* We assume we will be going back to the first taskprocessor, since we are at the end of the vector */
515 } else if (i == 1) {
516 /* On the second iteration we update the next taskprocessor to use to be this one */
518 }
519 }
520}
unsigned int taskprocessor_num
Definition taskpool.c:48

References ao2_ref, AST_VECTOR_APPEND, AST_VECTOR_SIZE, ast_taskpool_options::auto_increment, ast_taskpool::dynamic_taskprocessors, ast_taskpool_options::max_size, ast_taskpool::options, ast_taskpool::static_taskprocessors, taskpool_taskprocessor_alloc(), taskpool_taskprocessor::taskprocessor, taskpool_taskprocessors::taskprocessor_num, and taskpool_taskprocessors::taskprocessors.

Referenced by __ast_taskpool_push().

◆ taskpool_dynamic_pool_shrink()

static int taskpool_dynamic_pool_shrink ( const void *  data)
static

Definition at line 226 of file taskpool.c.

227{
228 struct ast_taskpool *pool = (struct ast_taskpool *)data;
229 int num_removed;
230
231 ao2_lock(pool);
232
233 /* If the pool is shutting down, do nothing and don't reschedule */
234 if (pool->shutting_down) {
235 ao2_unlock(pool);
236 ao2_ref(pool, -1);
237 return 0;
238 }
239
240 /* Go through the dynamic taskprocessors and find any which have been idle long enough and remove them */
243 if (num_removed) {
244 /* If we've removed any taskprocessors the taskprocessor_num may no longer be valid, so update it */
247 }
248 }
249
250 ao2_unlock(pool);
251
252 /* It is possible for the pool to have been shut down between unlocking and returning, this is
253 * inherently a race condition we can't eliminate so we will catch it on the next iteration.
254 */
255 return pool->options.idle_timeout * 1000;
256}
int idle_timeout
Time limit in seconds for idle dynamic taskprocessors.
Definition taskpool.h:88
#define TASKPROCESSOR_IS_IDLE(tps, timeout)
Definition taskpool.c:221
#define AST_VECTOR_REMOVE_ALL_CMP_UNORDERED(vec, value, cmp, cleanup)
Remove all elements from a vector that matches the given comparison.
Definition vector.h:472

References ao2_cleanup, ao2_lock, ao2_ref, ao2_unlock, AST_VECTOR_REMOVE_ALL_CMP_UNORDERED, AST_VECTOR_SIZE, ast_taskpool::dynamic_taskprocessors, ast_taskpool_options::idle_timeout, ast_taskpool::options, ast_taskpool::shutting_down, TASKPROCESSOR_IS_IDLE, taskpool_taskprocessors::taskprocessor_num, and taskpool_taskprocessors::taskprocessors.

Referenced by ast_taskpool_create().

◆ taskpool_least_full_selector()

static void taskpool_least_full_selector ( struct ast_taskpool pool,
struct taskpool_taskprocessors taskprocessors,
struct taskpool_taskprocessor **  taskprocessor,
unsigned int *  growth_threshold_reached 
)
static

Least full taskprocessor selector.

\interal

Definition at line 287 of file taskpool.c.

289{
290 struct taskpool_taskprocessor *least_full = NULL;
291 unsigned int i;
292
293 if (!AST_VECTOR_SIZE(&taskprocessors->taskprocessors)) {
294 *growth_threshold_reached = 1;
295 return;
296 }
297
298 /* We assume that the growth threshold has not yet been reached, until proven otherwise */
299 *growth_threshold_reached = 0;
300
301 for (i = 0; i < AST_VECTOR_SIZE(&taskprocessors->taskprocessors); i++) {
302 struct taskpool_taskprocessor *tp = AST_VECTOR_GET(&taskprocessors->taskprocessors, i);
303
304 /* If this taskprocessor has no outstanding tasks, it is the best choice */
306 *taskprocessor = tp;
307 return;
308 }
309
310 /* If any of the taskprocessors have reached the growth threshold then we should grow the pool */
312 *growth_threshold_reached = 1;
313 }
314
315 /* The taskprocessor with the fewest tasks should be used */
316 if (!least_full || ast_taskprocessor_size(tp->taskprocessor) < ast_taskprocessor_size(least_full->taskprocessor)) {
317 least_full = tp;
318 }
319 }
320
321 *taskprocessor = least_full;
322}
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition vector.h:691

References ast_taskprocessor_size(), AST_VECTOR_GET, AST_VECTOR_SIZE, ast_taskpool_options::growth_threshold, NULL, ast_taskpool::options, taskpool_taskprocessor::taskprocessor, and taskpool_taskprocessors::taskprocessors.

Referenced by ast_taskpool_create().

◆ taskpool_sequential_selector()

static void taskpool_sequential_selector ( struct ast_taskpool pool,
struct taskpool_taskprocessors taskprocessors,
struct taskpool_taskprocessor **  taskprocessor,
unsigned int *  growth_threshold_reached 
)
static

Definition at line 262 of file taskpool.c.

264{
265 unsigned int taskprocessor_num = taskprocessors->taskprocessor_num;
266
267 if (!AST_VECTOR_SIZE(&taskprocessors->taskprocessors)) {
268 *growth_threshold_reached = 1;
269 return;
270 }
271
272 taskprocessors->taskprocessor_num++;
273 if (taskprocessors->taskprocessor_num == AST_VECTOR_SIZE(&taskprocessors->taskprocessors)) {
274 taskprocessors->taskprocessor_num = 0;
275 }
276
277 *taskprocessor = AST_VECTOR_GET(&taskprocessors->taskprocessors, taskprocessor_num);
278
279 /* Check to see if this has reached the growth threshold */
280 *growth_threshold_reached = (ast_taskprocessor_size((*taskprocessor)->taskprocessor) >= pool->options.growth_threshold) ? 1 : 0;
281}

References ast_taskprocessor_size(), AST_VECTOR_GET, AST_VECTOR_SIZE, ast_taskpool_options::growth_threshold, ast_taskpool::options, taskpool_taskprocessors::taskprocessor_num, and taskpool_taskprocessors::taskprocessors.

Referenced by ast_taskpool_create().

◆ taskpool_serializer_empty_task()

static int taskpool_serializer_empty_task ( void *  data)
static

Definition at line 868 of file taskpool.c.

869{
870 return 0;
871}

Referenced by __ast_taskpool_serializer_push_wait(), and taskpool_serializer_suspend_task().

◆ taskpool_serializer_suspend_task()

static int taskpool_serializer_suspend_task ( void *  data)
static

Definition at line 978 of file taskpool.c.

979{
980 struct ast_taskprocessor *serializer = data;
983
984 /* First we queue the empty task to ensure the serializer doesn't reach empty, this
985 * prevents any threads from queueing up a taskpool task that executes the serializer
986 * while it is suspended, allowing us to queue it ourselves when the serializer is
987 * unsuspended.
988 */
990 return -1;
991 }
992
993 /* Next we suspend the serializer so that the execute_tasks currently executing stops
994 * and doesn't requeue.
995 */
997
998 return 0;
999}

References ast_taskprocessor_listener_get_user_data(), ast_taskprocessor_push, listener(), NULL, SERIALIZER_SUSPENDED, serializer::suspended, and taskpool_serializer_empty_task().

Referenced by ast_taskpool_serializer_suspend().

◆ taskpool_shutdown()

static void taskpool_shutdown ( void  )
static

Definition at line 1085 of file taskpool.c.

1086{
1087 if (sched) {
1089 sched = NULL;
1090 }
1091}
void ast_sched_context_destroy(struct ast_sched_context *c)
destroys a schedule context
Definition sched.c:271

References ast_sched_context_destroy(), and NULL.

Referenced by ast_taskpool_init().

◆ taskpool_sync_task()

static int taskpool_sync_task ( void *  data)
static

Definition at line 615 of file taskpool.c.

616{
617 struct taskpool_sync_task *sync_task = data;
618 int ret;
619
620 sync_task->fail = sync_task->task(sync_task->task_data);
621
622 /*
623 * Once we unlock sync_task->lock after signaling, we cannot access
624 * sync_task again. The thread waiting within ast_taskpool_push_wait()
625 * is free to continue and release its local variable (sync_task).
626 */
627 ast_mutex_lock(&sync_task->lock);
628 sync_task->complete = 1;
629 ast_cond_signal(&sync_task->cond);
630 ret = sync_task->fail;
631 ast_mutex_unlock(&sync_task->lock);
632 return ret;
633}
#define ast_cond_signal(cond)
Definition lock.h:210
ast_cond_t cond
Definition taskpool.c:582
int(* task)(void *)
Definition taskpool.c:585
ast_mutex_t lock
Definition taskpool.c:581

References ast_cond_signal, ast_mutex_lock, ast_mutex_unlock, taskpool_sync_task::complete, taskpool_sync_task::cond, taskpool_sync_task::fail, taskpool_sync_task::lock, taskpool_sync_task::task, and taskpool_sync_task::task_data.

◆ taskpool_sync_task_cleanup()

static void taskpool_sync_task_cleanup ( struct taskpool_sync_task sync_task)
static

Definition at line 606 of file taskpool.c.

607{
608 ast_mutex_destroy(&sync_task->lock);
609 ast_cond_destroy(&sync_task->cond);
610}
#define ast_mutex_destroy(a)
Definition lock.h:195

References ast_cond_destroy, ast_mutex_destroy, taskpool_sync_task::cond, and taskpool_sync_task::lock.

Referenced by __ast_taskpool_push_wait(), and __ast_taskpool_serializer_push_wait().

◆ taskpool_sync_task_init()

static int taskpool_sync_task_init ( struct taskpool_sync_task sync_task,
int(*)(void *)  task,
void *  data 
)
static

Definition at line 592 of file taskpool.c.

593{
594 ast_mutex_init(&sync_task->lock);
595 ast_cond_init(&sync_task->cond, NULL);
596 sync_task->complete = 0;
597 sync_task->fail = 0;
598 sync_task->task = task;
599 sync_task->task_data = data;
600 return 0;
601}
#define ast_mutex_init(pmutex)
Definition lock.h:193

References ast_cond_init, ast_mutex_init, taskpool_sync_task::complete, taskpool_sync_task::cond, taskpool_sync_task::fail, taskpool_sync_task::lock, NULL, task(), taskpool_sync_task::task, and taskpool_sync_task::task_data.

Referenced by __ast_taskpool_push_wait(), and __ast_taskpool_serializer_push_wait().

◆ taskpool_taskprocessor_alloc()

static struct taskpool_taskprocessor * taskpool_taskprocessor_alloc ( struct ast_taskpool pool,
char  type 
)
static

Definition at line 152 of file taskpool.c.

153{
155 char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
156
157 /* We don't actually need locking for each pool taskprocessor, as the only thing
158 * mutable is the underlying taskprocessor which has its own internal locking.
159 */
161 if (!taskprocessor) {
162 return NULL;
163 }
164
165 /* Create name with seq number appended. */
166 ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "taskpool/%c:%s", type, pool->name);
167
168 taskprocessor->taskprocessor = ast_taskprocessor_get(tps_name, TPS_REF_DEFAULT);
169 if (!taskprocessor->taskprocessor) {
171 return NULL;
172 }
173
174 taskprocessor->last_pushed = ast_tvnow();
175
177 ao2_ref(pool, -1);
178 /* Prevent the taskprocessor from queueing the stop task by explicitly unreferencing and setting it to
179 * NULL here.
180 */
182 taskprocessor->taskprocessor = NULL;
183 return NULL;
184 }
185
186 return taskprocessor;
187}
@ AO2_ALLOC_OPT_LOCK_NOLOCK
Definition astobj2.h:367
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition astobj2.h:404
static const char type[]
static int taskpool_taskprocessor_start(void *data)
Definition taskpool.c:131
static void taskpool_taskprocessor_dtor(void *obj)
Definition taskpool.c:116
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.
@ TPS_REF_DEFAULT
return a reference to a taskprocessor, create one if it does not exist
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format,...)
Build a taskprocessor name with a sequence number on the end.
#define AST_TASKPROCESSOR_MAX_NAME
Suggested maximum taskprocessor name length (less null terminator).

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_alloc_options, ao2_bump, ao2_ref, ast_taskprocessor_build_name(), ast_taskprocessor_get(), AST_TASKPROCESSOR_MAX_NAME, ast_taskprocessor_push, ast_taskprocessor_unreference(), ast_tvnow(), ast_taskpool::name, NULL, taskpool_taskprocessor_dtor(), taskpool_taskprocessor_start(), taskpool_taskprocessor::taskprocessor, TPS_REF_DEFAULT, and type.

Referenced by ast_taskpool_create(), and taskpool_dynamic_pool_grow().

◆ taskpool_taskprocessor_dtor()

static void taskpool_taskprocessor_dtor ( void *  obj)
static

Definition at line 116 of file taskpool.c.

117{
119
121 /* We can't actually do anything if this fails, so just accept reality */
122 }
123
125}
static int taskpool_taskprocessor_stop(void *data)
Definition taskpool.c:101

References ast_taskprocessor_push, ast_taskprocessor_unreference(), NULL, taskpool_taskprocessor_stop(), and taskpool_taskprocessor::taskprocessor.

Referenced by taskpool_taskprocessor_alloc().

◆ taskpool_taskprocessor_start()

static int taskpool_taskprocessor_start ( void *  data)
static

Definition at line 131 of file taskpool.c.

132{
133 struct ast_taskpool *pool = data;
134
135 /* Set the pool on the thread for this taskprocessor, inheriting the
136 * reference passed to the task itself.
137 */
138 ast_threadstorage_set_ptr(&current_taskpool_pool, pool);
139
140 /* If a thread start callback is set on the options, call it */
141 if (pool->options.thread_start) {
142 pool->options.thread_start();
143 }
144
145 return 0;
146}
void(* thread_start)(void)
Function to call when a taskprocessor starts.
Definition taskpool.h:138

References ast_threadstorage_set_ptr(), ast_taskpool::options, and ast_taskpool_options::thread_start.

Referenced by taskpool_taskprocessor_alloc().

◆ taskpool_taskprocessor_stop()

static int taskpool_taskprocessor_stop ( void *  data)
static

Definition at line 101 of file taskpool.c.

102 {
103 struct ast_taskpool *pool = ast_taskpool_get_current();
104
105 /* If a thread stop callback is set on the options, call it */
106 if (pool->options.thread_end) {
107 pool->options.thread_end();
108 }
109
110 ao2_cleanup(pool);
111
112 return 0;
113 }
void(* thread_end)(void)
Function to call when a taskprocessor ends.
Definition taskpool.h:145

References ao2_cleanup, ast_taskpool_get_current(), ast_taskpool::options, and ast_taskpool_options::thread_end.

Referenced by taskpool_taskprocessor_dtor().

◆ taskpool_taskprocessors_cleanup()

static void taskpool_taskprocessors_cleanup ( struct taskpool_taskprocessors taskprocessors)
static

Definition at line 206 of file taskpool.c.

207{
208 /* Access/manipulation of taskprocessors is done with the lock held, and
209 * with a check of the shutdown flag done. This means that outside of holding
210 * the lock we can safely muck with it. Pushing to the taskprocessor is done
211 * outside of the lock, but with a reference to the taskprocessor held.
212 */
214 AST_VECTOR_FREE(&taskprocessors->taskprocessors);
215}
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition vector.h:185

References ao2_cleanup, AST_VECTOR_CALLBACK_VOID, AST_VECTOR_FREE, and taskpool_taskprocessors::taskprocessors.

Referenced by ast_taskpool_shutdown().

◆ taskpool_taskprocessors_init()

static int taskpool_taskprocessors_init ( struct taskpool_taskprocessors taskprocessors,
unsigned int  size 
)
static

Definition at line 193 of file taskpool.c.

194{
195 if (AST_VECTOR_INIT(&taskprocessors->taskprocessors, size)) {
196 return -1;
197 }
198
199 return 0;
200}
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition vector.h:124

References AST_VECTOR_INIT, and taskpool_taskprocessors::taskprocessors.

Referenced by ast_taskpool_create().

Variable Documentation

◆ sched

struct ast_sched_context* sched
static

Scheduler used for dynamic pool shrinking.

Definition at line 83 of file taskpool.c.

◆ serializer_tps_listener_callbacks

struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks
static
Initial value:
= {
.task_pushed = serializer_task_pushed,
.start = serializer_start,
.shutdown = serializer_shutdown,
}
static void serializer_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
Definition taskpool.c:791
static void serializer_shutdown(struct ast_taskprocessor_listener *listener)
Definition taskpool.c:809
static int serializer_start(struct ast_taskprocessor_listener *listener)
Definition taskpool.c:803

Definition at line 819 of file taskpool.c.

819 {
820 .task_pushed = serializer_task_pushed,
821 .start = serializer_start,
822 .shutdown = serializer_shutdown,
823};

Referenced by ast_taskpool_serializer_group().