Asterisk - The Open Source Telephony Project GIT-master-4f2b068
Loading...
Searching...
No Matches
Data Structures | Macros | Enumerations | Functions
taskpool.h File Reference
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  ast_taskpool_options
 

Macros

#define AST_TASKPOOL_OPTIONS_VERSION   1
 
#define ast_taskpool_push(pool, task, data)    __ast_taskpool_push(pool, task, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
 
#define ast_taskpool_push_wait(pool, task, data)    __ast_taskpool_push_wait(pool, task, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
 
#define ast_taskpool_serializer_push_wait(pool, task, data)    __ast_taskpool_serializer_push_wait(pool, task, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
 

Enumerations

enum  ast_taskpool_selector { AST_TASKPOOL_SELECTOR_DEFAULT = 0 , AST_TASKPOOL_SELECTOR_LEAST_FULL , AST_TASKPOOL_SELECTOR_SEQUENTIAL }
 Selectors for choosing which taskprocessor in a pool to use. More...
 

Functions

int __ast_taskpool_push (struct ast_taskpool *pool, int(*task)(void *data), void *data, const char *file, int line, const char *function) attribute_warn_unused_result
 Push a task to the taskpool.
 
int __ast_taskpool_push_wait (struct ast_taskpool *pool, int(*task)(void *data), void *data, const char *file, int line, const char *function) attribute_warn_unused_result
 Push a task to the taskpool, and wait for completion.
 
int __ast_taskpool_serializer_push_wait (struct ast_taskprocessor *serializer, int(*task)(void *data), void *data, const char *file, int line, const char *function) attribute_warn_unused_result
 Push a task to a serializer, and wait for completion.
 
struct ast_taskpoolast_taskpool_create (const char *name, const struct ast_taskpool_options *options)
 Create a new taskpool.
 
long ast_taskpool_queue_size (struct ast_taskpool *pool)
 Get the current number of queued tasks in the taskpool.
 
struct ast_taskprocessorast_taskpool_serializer (const char *name, struct ast_taskpool *pool)
 Serialized execution of tasks within a ast_taskpool.
 
struct ast_taskprocessorast_taskpool_serializer_get_current (void)
 Get the taskpool serializer currently associated with this thread.
 
struct ast_taskprocessorast_taskpool_serializer_group (const char *name, struct ast_taskpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
 Serialized execution of tasks within a ast_taskpool.
 
int ast_taskpool_serializer_suspend (struct ast_taskprocessor *serializer)
 Suspend a serializer, causing tasks to be queued until unsuspended.
 
int ast_taskpool_serializer_unsuspend (struct ast_taskprocessor *serializer)
 Unsuspend a serializer, causing tasks to be executed.
 
void ast_taskpool_shutdown (struct ast_taskpool *pool)
 Shut down a taskpool and remove the underlying taskprocessors.
 
size_t ast_taskpool_taskprocessors_count (struct ast_taskpool *pool)
 Get the current number of taskprocessors in the taskpool.
 

Detailed Description

API providing queued task execution across threads.

Definition in file taskpool.h.

Macro Definition Documentation

◆ AST_TASKPOOL_OPTIONS_VERSION

#define AST_TASKPOOL_OPTIONS_VERSION   1

Definition at line 76 of file taskpool.h.

◆ ast_taskpool_push

#define ast_taskpool_push (   pool,
  task,
  data 
)     __ast_taskpool_push(pool, task, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)

Definition at line 210 of file taskpool.h.

◆ ast_taskpool_push_wait

#define ast_taskpool_push_wait (   pool,
  task,
  data 
)     __ast_taskpool_push_wait(pool, task, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)

Definition at line 230 of file taskpool.h.

◆ ast_taskpool_serializer_push_wait

#define ast_taskpool_serializer_push_wait (   pool,
  task,
  data 
)     __ast_taskpool_serializer_push_wait(pool, task, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)

Definition at line 335 of file taskpool.h.

Enumeration Type Documentation

◆ ast_taskpool_selector

Selectors for choosing which taskprocessor in a pool to use.

Enumerator
AST_TASKPOOL_SELECTOR_DEFAULT 
AST_TASKPOOL_SELECTOR_LEAST_FULL 
AST_TASKPOOL_SELECTOR_SEQUENTIAL 

Definition at line 69 of file taskpool.h.

69 {
70 AST_TASKPOOL_SELECTOR_DEFAULT = 0, /* The selector that is generally the best for most use cases */
71 AST_TASKPOOL_SELECTOR_LEAST_FULL, /* Select the least full taskprocessor */
72 AST_TASKPOOL_SELECTOR_SEQUENTIAL, /* Select taskprocessors in a sequential manner */
73};
@ AST_TASKPOOL_SELECTOR_SEQUENTIAL
Definition taskpool.h:72
@ AST_TASKPOOL_SELECTOR_LEAST_FULL
Definition taskpool.h:71
@ AST_TASKPOOL_SELECTOR_DEFAULT
Definition taskpool.h:70

Function Documentation

◆ __ast_taskpool_push()

int __ast_taskpool_push ( struct ast_taskpool pool,
int(*)(void *data)  task,
void *  data,
const char *  file,
int  line,
const char *  function 
)

Push a task to the taskpool.

Since
23.1.0
22.7.0
20.17.0

Tasks pushed into the taskpool will be automatically taken by one of the taskprocessors within

Parameters
poolThe taskpool to add the task to
taskThe task to add
dataThe parameter for the task
Return values
0success
-1failure

Definition at line 527 of file taskpool.c.

529{
530 RAII_VAR(struct taskpool_taskprocessor *, taskprocessor, NULL, ao2_cleanup);
531
532 /* Select the taskprocessor in the pool to use for pushing this task */
533 ao2_lock(pool);
534 if (!pool->shutting_down) {
535 unsigned int growth_threshold_reached = 0;
536
537 /* A selector doesn't set taskprocessor to NULL, it will only change the value if a better
538 * taskprocessor is found. This means that even if the selector for a dynamic taskprocessor
539 * fails for some reason, it will still fall back to the initially found static one if
540 * it is present.
541 */
542 pool->selector(pool, &pool->static_taskprocessors, &taskprocessor, &growth_threshold_reached);
543 if (pool->options.auto_increment && growth_threshold_reached) {
544 /* If we need to grow then try dynamic taskprocessors */
545 pool->selector(pool, &pool->dynamic_taskprocessors, &taskprocessor, &growth_threshold_reached);
546 if (growth_threshold_reached) {
547 /* If we STILL need to grow then grow the dynamic taskprocessor pool if allowed */
548 taskpool_dynamic_pool_grow(pool, &taskprocessor);
549 }
550
551 /* If a dynamic taskprocessor was used update its last push time */
552 if (taskprocessor) {
553 taskprocessor->last_pushed = ast_tvnow();
554 }
555 }
556 ao2_bump(taskprocessor);
557 }
558 ao2_unlock(pool);
559
560 if (!taskprocessor) {
561 return -1;
562 }
563
564 if (__ast_taskprocessor_push(taskprocessor->taskprocessor, task, data, file, line, function)) {
565 return -1;
566 }
567
568 return 0;
569}
#define ao2_cleanup(obj)
Definition astobj2.h:1934
#define ao2_unlock(a)
Definition astobj2.h:729
#define ao2_lock(a)
Definition astobj2.h:717
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition astobj2.h:480
#define NULL
Definition resample.c:96
int auto_increment
Number of taskprocessors to increment the pool by.
Definition taskpool.h:92
taskpool_selector selector
Definition taskpool.c:74
struct ast_taskpool_options options
Definition taskpool.c:70
struct taskpool_taskprocessors static_taskprocessors
Definition taskpool.c:64
int shutting_down
Definition taskpool.c:68
struct taskpool_taskprocessors dynamic_taskprocessors
Definition taskpool.c:66
A taskpool taskprocessor.
Definition taskpool.c:34
static void taskpool_dynamic_pool_grow(struct ast_taskpool *pool, struct taskpool_taskprocessor **taskprocessor)
Definition taskpool.c:479
int __ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap, const char *file, int line, const char *function) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
static int task(void *data)
Queued task for baseline test.
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition time.h:159
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition utils.h:981

References __ast_taskprocessor_push(), ao2_bump, ao2_cleanup, ao2_lock, ao2_unlock, ast_tvnow(), ast_taskpool_options::auto_increment, ast_taskpool::dynamic_taskprocessors, NULL, ast_taskpool::options, RAII_VAR, ast_taskpool::selector, ast_taskpool::shutting_down, ast_taskpool::static_taskprocessors, task(), taskpool_dynamic_pool_grow(), and taskpool_taskprocessor::taskprocessor.

Referenced by __ast_sip_push_task(), __ast_taskpool_push_wait(), and ast_taskpool_push().

◆ __ast_taskpool_push_wait()

int __ast_taskpool_push_wait ( struct ast_taskpool pool,
int(*)(void *data)  task,
void *  data,
const char *  file,
int  line,
const char *  function 
)

Push a task to the taskpool, and wait for completion.

Since
23.1.0
22.7.0
20.17.0

Tasks pushed into the taskpool will be automatically taken by one of the taskprocessors within

Parameters
poolThe taskpool to add the task to
taskThe task to add
dataThe parameter for the task
Return values
0success
-1failure

Definition at line 635 of file taskpool.c.

637{
638 struct taskpool_sync_task sync_task;
639
640 /* If we are already executing within a taskpool taskprocessor then
641 * don't bother pushing a new task, just directly execute the task.
642 */
644 return task(data);
645 }
646
647 if (taskpool_sync_task_init(&sync_task, task, data)) {
648 return -1;
649 }
650
651 if (__ast_taskpool_push(pool, taskpool_sync_task, &sync_task, file, line, function)) {
652 taskpool_sync_task_cleanup(&sync_task);
653 return -1;
654 }
655
656 ast_mutex_lock(&sync_task.lock);
657 while (!sync_task.complete) {
658 ast_cond_wait(&sync_task.cond, &sync_task.lock);
659 }
660 ast_mutex_unlock(&sync_task.lock);
661
662 taskpool_sync_task_cleanup(&sync_task);
663 return sync_task.fail;
664}
#define ast_cond_wait(cond, mutex)
Definition lock.h:212
#define ast_mutex_unlock(a)
Definition lock.h:197
#define ast_mutex_lock(a)
Definition lock.h:196
static void taskpool_sync_task_cleanup(struct taskpool_sync_task *sync_task)
Definition taskpool.c:606
int __ast_taskpool_push(struct ast_taskpool *pool, int(*task)(void *data), void *data, const char *file, int line, const char *function)
Push a task to the taskpool.
Definition taskpool.c:527
static struct ast_taskpool * ast_taskpool_get_current(void)
Definition taskpool.c:92
static int taskpool_sync_task_init(struct taskpool_sync_task *sync_task, int(*task)(void *), void *data)
Definition taskpool.c:592

References __ast_taskpool_push(), ast_cond_wait, ast_mutex_lock, ast_mutex_unlock, ast_taskpool_get_current(), taskpool_sync_task::complete, taskpool_sync_task::cond, taskpool_sync_task::fail, taskpool_sync_task::lock, task(), taskpool_sync_task_cleanup(), and taskpool_sync_task_init().

Referenced by __ast_sip_push_task_wait(), __ast_sip_push_task_wait_serializer(), and ast_taskpool_push_wait().

◆ __ast_taskpool_serializer_push_wait()

int __ast_taskpool_serializer_push_wait ( struct ast_taskprocessor serializer,
int(*)(void *data)  task,
void *  data,
const char *  file,
int  line,
const char *  function 
)

Push a task to a serializer, and wait for completion.

Since
23.1.0
22.7.0
20.17.0
Parameters
serializerThe serializer to add the task to
taskThe task to add
dataThe parameter for the task
Return values
0success
-1failure

Definition at line 882 of file taskpool.c.

884{
887 struct ast_taskprocessor *prior_serializer;
888 struct taskpool_sync_task sync_task;
889
890 /* If not in a taskpool taskprocessor we can just queue the task like normal and
891 * wait. */
893 if (taskpool_sync_task_init(&sync_task, task, data)) {
894 return -1;
895 }
896
897 if (__ast_taskprocessor_push(serializer, taskpool_sync_task, &sync_task, file, line, function)) {
898 taskpool_sync_task_cleanup(&sync_task);
899 return -1;
900 }
901
902 ast_mutex_lock(&sync_task.lock);
903 while (!sync_task.complete) {
904 ast_cond_wait(&sync_task.cond, &sync_task.lock);
905 }
906 ast_mutex_unlock(&sync_task.lock);
907
908 taskpool_sync_task_cleanup(&sync_task);
909 return sync_task.fail;
910 }
911
912 /* It is possible that we are already executing within a serializer, so stash the existing
913 * away so we can restore it.
914 */
915 prior_serializer = ast_taskpool_serializer_get_current();
916
917 ao2_lock(ser);
918
919 /* There are two cases where we can or have to directly execute this task:
920 * 1. There are no other tasks in the serializer
921 * 2. We are already in the serializer
922 * In the second case if we don't execute the task now, we will deadlock waiting
923 * on it as it will never occur.
924 */
925 if (!ast_taskprocessor_size(serializer) || prior_serializer == serializer) {
926 ast_threadstorage_set_ptr(&current_taskpool_serializer, serializer);
927 sync_task.fail = task(data);
928 ao2_unlock(ser);
929 ast_threadstorage_set_ptr(&current_taskpool_serializer, prior_serializer);
930 return sync_task.fail;
931 }
932
933 if (taskpool_sync_task_init(&sync_task, task, data)) {
934 ao2_unlock(ser);
935 return -1;
936 }
937
938 /* First we queue the serialized task */
939 if (__ast_taskprocessor_push(serializer, taskpool_sync_task, &sync_task, file, line, function)) {
940 taskpool_sync_task_cleanup(&sync_task);
941 ao2_unlock(ser);
942 return -1;
943 }
944
945 /* Next we queue the empty task to ensure the serializer doesn't reach empty, this
946 * stops two tasks from being queued for the same serializer at the same time.
947 */
949 taskpool_sync_task_cleanup(&sync_task);
950 ao2_unlock(ser);
951 return -1;
952 }
953
954 /* Now we execute the tasks on the serializer until our sync task is complete */
955 ast_threadstorage_set_ptr(&current_taskpool_serializer, serializer);
956 while (!sync_task.complete) {
957 /* If the serializer is suspended wait until it unsuspends */
958 while (ser->suspended == SERIALIZER_SUSPENDED) {
960 }
961
962 /* The sync task is guaranteed to be executed, so doing a while loop on the complete
963 * flag is safe.
964 */
966 }
967 taskpool_sync_task_cleanup(&sync_task);
968 ao2_unlock(ser);
969
970 ast_threadstorage_set_ptr(&current_taskpool_serializer, prior_serializer);
971
972 return sync_task.fail;
973}
static void * listener(void *unused)
Definition asterisk.c:1531
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
Definition astobj2.c:476
A listener for taskprocessors.
A ast_taskprocessor structure is a singleton by name.
ast_cond_t cond
Definition taskpool.c:708
enum serializer_suspension suspended
Definition taskpool.c:710
struct ast_taskprocessor * ast_taskpool_serializer_get_current(void)
Get the taskpool serializer currently associated with this thread.
Definition taskpool.c:825
@ SERIALIZER_SUSPENDED
Definition taskpool.c:699
static int taskpool_serializer_empty_task(void *data)
Definition taskpool.c:868
void * ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
Get the user data from the listener.
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
Pop a task off the taskprocessor and execute it.
#define ast_taskprocessor_push(tps, task_exe, datap)
int ast_threadstorage_set_ptr(struct ast_threadstorage *ts, void *ptr)
Set a raw pointer from threadstorage.

References __ast_taskprocessor_push(), ao2_lock, ao2_object_get_lockaddr(), ao2_unlock, ast_cond_wait, ast_mutex_lock, ast_mutex_unlock, ast_taskpool_get_current(), ast_taskpool_serializer_get_current(), ast_taskprocessor_execute(), ast_taskprocessor_listener_get_user_data(), ast_taskprocessor_push, ast_taskprocessor_size(), ast_threadstorage_set_ptr(), taskpool_sync_task::complete, taskpool_sync_task::cond, serializer::cond, taskpool_sync_task::fail, listener(), taskpool_sync_task::lock, NULL, SERIALIZER_SUSPENDED, serializer::suspended, task(), taskpool_serializer_empty_task(), taskpool_sync_task_cleanup(), and taskpool_sync_task_init().

Referenced by __ast_sip_push_task_wait(), __ast_sip_push_task_wait_serializer(), and ast_taskpool_serializer_push_wait().

◆ ast_taskpool_create()

struct ast_taskpool * ast_taskpool_create ( const char *  name,
const struct ast_taskpool_options options 
)

Create a new taskpool.

Since
23.1.0
22.7.0
20.17.0

This function creates a taskpool. Tasks may be pushed onto this task pool and will be automatically acted upon by taskprocessors within the pool.

Only a single taskpool with a given name may exist. This function will fail if a taskpool with the given name already exists.

Parameters
nameThe unique name for the taskpool
optionsThe behavioral options for this taskpool
Return values
NULLFailed to create the taskpool
non-NULLThe newly-created taskpool
Note
The ast_taskpool_shutdown function must be called to shut down the taskpool and clean up underlying resources fully.

Definition at line 324 of file taskpool.c.

326{
327 struct ast_taskpool *pool;
328
329 /* Enforce versioning on the passed-in options */
330 if (options->version != AST_TASKPOOL_OPTIONS_VERSION) {
331 return NULL;
332 }
333
334 pool = ao2_alloc(sizeof(*pool) + strlen(name) + 1, NULL);
335 if (!pool) {
336 return NULL;
337 }
338
339 strcpy(pool->name, name); /* Safe */
340 memcpy(&pool->options, options, sizeof(pool->options));
341 pool->shrink_sched_id = -1;
342
343 /* Verify the passed-in options are valid, and adjust if needed */
344 if (options->initial_size < options->minimum_size) {
345 pool->options.initial_size = options->minimum_size;
346 ast_log(LOG_WARNING, "Taskpool '%s' has an initial size of %d, which is less than the minimum size of %d. Adjusting to %d.\n",
347 name, options->initial_size, options->minimum_size, options->minimum_size);
348 }
349
350 if (options->max_size && pool->options.initial_size > options->max_size) {
351 pool->options.max_size = pool->options.initial_size;
352 ast_log(LOG_WARNING, "Taskpool '%s' has a max size of %d, which is less than the initial size of %d. Adjusting to %d.\n",
353 name, options->max_size, pool->options.initial_size, pool->options.initial_size);
354 }
355
356 if (!options->auto_increment) {
357 if (!pool->options.minimum_size) {
358 pool->options.minimum_size = 1;
359 ast_log(LOG_WARNING, "Taskpool '%s' has a minimum size of 0, which is not valid without auto increment. Adjusting to 1.\n", name);
360 }
361 if (!pool->options.max_size) {
362 pool->options.max_size = pool->options.minimum_size;
363 ast_log(LOG_WARNING, "Taskpool '%s' has a max size of 0, which is not valid without auto increment. Adjusting to %d.\n", name, pool->options.minimum_size);
364 }
365 if (pool->options.minimum_size != pool->options.max_size) {
366 pool->options.minimum_size = pool->options.max_size;
367 pool->options.initial_size = pool->options.max_size;
368 ast_log(LOG_WARNING, "Taskpool '%s' has a minimum size of %d, while max size is %d. Adjusting all sizes to %d due to lack of auto increment.\n",
369 name, options->minimum_size, pool->options.max_size, pool->options.max_size);
370 }
371 } else if (!options->growth_threshold) {
373 }
374
377 } else if (options->selector == AST_TASKPOOL_SELECTOR_SEQUENTIAL) {
379 } else {
380 ast_log(LOG_WARNING, "Taskpool '%s' has an invalid selector of %d. Adjusting to default selector.\n",
381 name, options->selector);
383 }
384
386 ao2_ref(pool, -1);
387 return NULL;
388 }
389
390 /* Create the static taskprocessors based on the passed-in options */
391 for (int i = 0; i < pool->options.minimum_size; i++) {
393
395 if (!taskprocessor) {
396 /* The reference to pool is passed to ast_taskpool_shutdown */
398 return NULL;
399 }
400
403 /* The reference to pool is passed to ast_taskpool_shutdown */
405 return NULL;
406 }
407 }
408
410 pool->options.initial_size - pool->options.minimum_size)) {
412 return NULL;
413 }
414
415 /* Create the dynamic taskprocessor based on the passed-in options */
416 for (int i = 0; i < (pool->options.initial_size - pool->options.minimum_size); i++) {
418
420 if (!taskprocessor) {
421 /* The reference to pool is passed to ast_taskpool_shutdown */
423 return NULL;
424 }
425
428 /* The reference to pool is passed to ast_taskpool_shutdown */
430 return NULL;
431 }
432 }
433
434 /* If idle timeout support is enabled kick off a scheduled task to shrink the dynamic pool periodically, we do
435 * this no matter if there are dynamic taskprocessor present to reduce the work needed within the push function
436 * and to reduce complexity.
437 */
438 if (options->idle_timeout && options->auto_increment) {
440 if (pool->shrink_sched_id < 0) {
441 ao2_ref(pool, -1);
442 /* The second reference to pool is passed to ast_taskpool_shutdown */
444 return NULL;
445 }
446 }
447
448 return pool;
449}
#define ast_log
Definition astobj2.c:42
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition astobj2.h:459
#define ao2_alloc(data_size, destructor_fn)
Definition astobj2.h:409
static const char name[]
Definition format_mp3.c:68
#define LOG_WARNING
int ast_sched_add(struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data) attribute_warn_unused_result
Adds a scheduled event.
Definition sched.c:567
int max_size
Maximum number of taskprocessors a pool may have.
Definition taskpool.h:122
int growth_threshold
The threshold for when to grow the pool.
Definition taskpool.h:131
int minimum_size
Number of taskprocessors that will always exist.
Definition taskpool.h:99
int initial_size
Number of taskprocessors the pool will start with.
Definition taskpool.h:109
An opaque taskpool structure.
Definition taskpool.c:62
int shrink_sched_id
Definition taskpool.c:72
char name[0]
Definition taskpool.c:76
Definition sched.c:76
struct ast_taskprocessor * taskprocessor
Definition taskpool.c:36
struct taskpool_taskprocessors::@439 taskprocessors
static void taskpool_least_full_selector(struct ast_taskpool *pool, struct taskpool_taskprocessors *taskprocessors, struct taskpool_taskprocessor **taskprocessor, unsigned int *growth_threshold_reached)
Least full taskprocessor selector.
Definition taskpool.c:287
void ast_taskpool_shutdown(struct ast_taskpool *pool)
Shut down a taskpool and remove the underlying taskprocessors.
Definition taskpool.c:675
static int taskpool_dynamic_pool_shrink(const void *data)
Definition taskpool.c:226
static int taskpool_taskprocessors_init(struct taskpool_taskprocessors *taskprocessors, unsigned int size)
Definition taskpool.c:193
static struct taskpool_taskprocessor * taskpool_taskprocessor_alloc(struct ast_taskpool *pool, char type)
Definition taskpool.c:152
static void taskpool_sequential_selector(struct ast_taskpool *pool, struct taskpool_taskprocessors *taskprocessors, struct taskpool_taskprocessor **taskprocessor, unsigned int *growth_threshold_reached)
Definition taskpool.c:262
#define TASKPOOL_GROW_THRESHOLD
The threshold for a taskprocessor at which we consider the pool needing to grow (50% of high water th...
Definition taskpool.c:80
#define AST_TASKPOOL_OPTIONS_VERSION
Definition taskpool.h:76
static struct test_options options
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition vector.h:267

References ao2_alloc, ao2_bump, ao2_ref, ast_log, ast_sched_add(), AST_TASKPOOL_OPTIONS_VERSION, AST_TASKPOOL_SELECTOR_DEFAULT, AST_TASKPOOL_SELECTOR_LEAST_FULL, AST_TASKPOOL_SELECTOR_SEQUENTIAL, ast_taskpool_shutdown(), AST_VECTOR_APPEND, ast_taskpool::dynamic_taskprocessors, ast_taskpool_options::growth_threshold, ast_taskpool_options::initial_size, LOG_WARNING, ast_taskpool_options::max_size, ast_taskpool_options::minimum_size, ast_taskpool::name, name, NULL, ast_taskpool::options, options, ast_taskpool::selector, ast_taskpool::shrink_sched_id, ast_taskpool::static_taskprocessors, taskpool_dynamic_pool_shrink(), TASKPOOL_GROW_THRESHOLD, taskpool_least_full_selector(), taskpool_sequential_selector(), taskpool_taskprocessor_alloc(), taskpool_taskprocessors_init(), taskpool_taskprocessor::taskprocessor, and taskpool_taskprocessors::taskprocessors.

Referenced by ast_sorcery_init(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), handle_cli_taskpool_push_efficiency(), handle_cli_taskpool_push_serializer_efficiency(), load_module(), and stasis_init().

◆ ast_taskpool_queue_size()

long ast_taskpool_queue_size ( struct ast_taskpool pool)

Get the current number of queued tasks in the taskpool.

Since
23.1.0
22.7.0
20.17.0
Parameters
poolThe taskpool to query
Return values
Thenumber of queued tasks in the taskpool

Definition at line 464 of file taskpool.c.

465{
466 long queue_size = 0;
467
468 ao2_lock(pool);
471 ao2_unlock(pool);
472
473 return queue_size;
474}
#define TASKPOOL_QUEUE_SIZE_ADD(tps, size)
Definition taskpool.c:462
#define AST_VECTOR_CALLBACK_VOID(vec, callback,...)
Execute a callback on every element in a vector disregarding callback return.
Definition vector.h:873

References ao2_lock, ao2_unlock, AST_VECTOR_CALLBACK_VOID, ast_taskpool::dynamic_taskprocessors, ast_taskpool::static_taskprocessors, TASKPOOL_QUEUE_SIZE_ADD, and taskpool_taskprocessors::taskprocessors.

Referenced by ast_sip_taskpool_queue_size().

◆ ast_taskpool_serializer()

struct ast_taskprocessor * ast_taskpool_serializer ( const char *  name,
struct ast_taskpool pool 
)

Serialized execution of tasks within a ast_taskpool.

Since
23.1.0
22.7.0
20.17.0

A ast_taskprocessor with the same contract as a default taskprocessor (tasks execute serially) except instead of executing out of a dedicated thread, execution occurs in a taskprocessor from a ast_taskpool.

While it guarantees that each task will complete before executing the next, there is no guarantee as to which thread from the pool individual tasks will execute. This normally only matters if your code relies on thread specific information, such as thread locals.

Use ast_taskprocessor_unreference() to dispose of the returned ast_taskprocessor.

Only a single taskprocessor with a given name may exist. This function will fail if a taskprocessor with the given name already exists.

Parameters
nameName of the serializer. (must be unique)
poolast_taskpool for execution.
Returns
ast_taskprocessor for enqueuing work.
Return values
NULLon error.

Definition at line 860 of file taskpool.c.

861{
863}
struct ast_taskprocessor * ast_taskpool_serializer_group(const char *name, struct ast_taskpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Serialized execution of tasks within a ast_taskpool.
Definition taskpool.c:830

References ast_taskpool_serializer_group(), name, and NULL.

Referenced by AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), handle_cli_taskpool_push_serializer_efficiency(), internal_stasis_subscribe(), and sorcery_object_type_alloc().

◆ ast_taskpool_serializer_get_current()

struct ast_taskprocessor * ast_taskpool_serializer_get_current ( void  )

Get the taskpool serializer currently associated with this thread.

Note
The returned pointer is valid while the serializer thread is running.
Use ao2_ref() on serializer if you are going to keep it for another thread. To unref it you must then use ast_taskprocessor_unreference().
Return values
serializeron success.
NULLon error or no serializer associated with the thread.

Definition at line 825 of file taskpool.c.

826{
827 return ast_threadstorage_get_ptr(&current_taskpool_serializer);
828}
void * ast_threadstorage_get_ptr(struct ast_threadstorage *ts)
Retrieve a raw pointer from threadstorage.

References ast_threadstorage_get_ptr().

Referenced by __ast_taskpool_serializer_push_wait(), record_serializer(), requeue_task(), rfc3326_outgoing_request(), rfc3326_outgoing_response(), serializer_efficiency_task(), and simple_task().

◆ ast_taskpool_serializer_group()

struct ast_taskprocessor * ast_taskpool_serializer_group ( const char *  name,
struct ast_taskpool pool,
struct ast_serializer_shutdown_group shutdown_group 
)

Serialized execution of tasks within a ast_taskpool.

Since
23.1.0
22.7.0
20.17.0

A ast_taskprocessor with the same contract as a default taskprocessor (tasks execute serially) except instead of executing out of a dedicated thread, execution occurs in a taskprocessor from a ast_taskpool.

While it guarantees that each task will complete before executing the next, there is no guarantee as to which thread from the pool individual tasks will execute. This normally only matters if your code relies on thread specific information, such as thread locals.

Use ast_taskprocessor_unreference() to dispose of the returned ast_taskprocessor.

Only a single taskprocessor with a given name may exist. This function will fail if a taskprocessor with the given name already exists.

Parameters
nameName of the serializer. (must be unique)
poolast_taskpool for execution.
shutdown_groupGroup shutdown controller. (NULL if no group association)
Returns
ast_taskprocessor for enqueuing work.
Return values
NULLon error.

Definition at line 830 of file taskpool.c.

832{
833 struct serializer *ser;
835 struct ast_taskprocessor *tps;
836
838 if (!ser) {
839 return NULL;
840 }
841
843 if (!listener) {
844 ao2_ref(ser, -1);
845 return NULL;
846 }
847
849 if (!tps) {
850 /* ser ref transferred to listener but not cleaned without tps */
851 ao2_ref(ser, -1);
852 } else if (shutdown_group) {
854 }
855
856 ao2_ref(listener, -1);
857 return tps;
858}
static struct ast_serializer_shutdown_group * shutdown_group
Shutdown group for options serializers.
void ast_serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group)
Increment the number of serializer members in the group.
static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks
Definition taskpool.c:819
static struct serializer * serializer_create(struct ast_taskpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Definition taskpool.c:722
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
struct ast_taskprocessor * ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
Create a taskprocessor with a custom listener.

References ao2_ref, ast_serializer_shutdown_group_inc(), ast_taskprocessor_create_with_listener(), ast_taskprocessor_listener_alloc(), listener(), name, NULL, serializer_create(), serializer_tps_listener_callbacks, and shutdown_group.

Referenced by ast_serializer_taskpool_create(), ast_sip_create_serializer_group(), and ast_taskpool_serializer().

◆ ast_taskpool_serializer_suspend()

int ast_taskpool_serializer_suspend ( struct ast_taskprocessor serializer)

Suspend a serializer, causing tasks to be queued until unsuspended.

Since
23.2.0
22.8.0
20.18.0
Parameters
serializerThe serializer to suspend
Return values
0success
-1failure
Note
May only be invoked from outside of the taskpool

Definition at line 1001 of file taskpool.c.

1002{
1005
1006 /* This suspension process works by inserting a checkpoint into the queue of the
1007 * serializer. Once this checkpoint is reached the taskpool taskprocessor handling
1008 * the queue stops prematurely and does not get requeued. For the case where a
1009 * synchronous task wait is in progress it is instead paused temporarily. Once
1010 * the serializer is unsuspended a new execution task is queued into the taskpool
1011 * to resume execution and any paused synchronous task waits are awoken to resume
1012 * their own execution as well. This approach minimizes the number of threads that
1013 * are paused waiting, nominally to 0.
1014 */
1015
1017 return -1;
1018 }
1019
1020 ao2_lock(ser);
1021
1022 /* If the serializer is already suspending or suspended, just return immediately.
1023 * This mirrors the original behavior from PJSIP.
1024 */
1025 if (ser->suspended != SERIALIZER_UNSUSPENDED) {
1026 ao2_unlock(ser);
1027 return 0;
1028 }
1029
1031
1032 ao2_unlock(ser);
1033
1034 /* Once this returns successfully there is no thread executing the tasks on the serializer,
1035 * so they will accumulate until the serializer is unsuspended.
1036 */
1038 /* Suspension failed, so unsuspend as doing otherwise would leave the serializer in a stuck
1039 * state.
1040 */
1041 ao2_lock(ser);
1043 ao2_unlock(ser);
1044 return -1;
1045 }
1046
1047 return 0;
1048}
static int taskpool_serializer_suspend_task(void *data)
Definition taskpool.c:978
@ SERIALIZER_SUSPENDING
Definition taskpool.c:698
@ SERIALIZER_UNSUSPENDED
Definition taskpool.c:697
#define ast_taskpool_serializer_push_wait(pool, task, data)
Definition taskpool.h:335

References ao2_lock, ao2_unlock, ast_taskpool_get_current(), ast_taskpool_serializer_push_wait, ast_taskprocessor_listener_get_user_data(), listener(), SERIALIZER_SUSPENDING, SERIALIZER_UNSUSPENDED, serializer::suspended, and taskpool_serializer_suspend_task().

Referenced by ast_sip_session_suspend(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), and AST_TEST_DEFINE().

◆ ast_taskpool_serializer_unsuspend()

int ast_taskpool_serializer_unsuspend ( struct ast_taskprocessor serializer)

Unsuspend a serializer, causing tasks to be executed.

Since
23.2.0
22.8.0
20.18.0
Parameters
serializerThe serializer to unsuspend
Return values
0success
-1failure
Note
May only be invoked from outside of the taskpool

Definition at line 1050 of file taskpool.c.

1051{
1054
1056 return -1;
1057 }
1058
1059 ao2_lock(ser);
1060
1061 if (ser->suspended != SERIALIZER_SUSPENDED) {
1062 ao2_unlock(ser);
1063 return 0;
1064 }
1065
1067
1068 /* Notify any other interested threads that this one has awoken */
1069 ast_cond_broadcast(&ser->cond);
1070
1071 /* And now we kick off handling of the queued tasks once again */
1074 }
1075
1076 ao2_unlock(ser);
1077
1078 return 0;
1079}
#define ast_cond_broadcast(cond)
Definition lock.h:211
struct ast_taskpool * pool
Definition taskpool.c:704
static int execute_tasks(void *data)
Definition taskpool.c:742
#define ast_taskpool_push(pool, task, data)
Definition taskpool.h:210
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.

References ao2_bump, ao2_lock, ao2_unlock, ast_cond_broadcast, ast_taskpool_get_current(), ast_taskpool_push, ast_taskprocessor_listener_get_user_data(), ast_taskprocessor_unreference(), serializer::cond, execute_tasks(), listener(), serializer::pool, SERIALIZER_SUSPENDED, SERIALIZER_UNSUSPENDED, and serializer::suspended.

Referenced by ast_sip_session_unsuspend(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), and AST_TEST_DEFINE().

◆ ast_taskpool_shutdown()

void ast_taskpool_shutdown ( struct ast_taskpool pool)

Shut down a taskpool and remove the underlying taskprocessors.

Since
23.1.0
22.7.0
20.17.0
Parameters
poolThe pool to shut down
Note
This will decrement the reference to the pool

Definition at line 675 of file taskpool.c.

676{
677 if (!pool) {
678 return;
679 }
680
681 /* Mark this pool as shutting down so nothing new is pushed */
682 ao2_lock(pool);
683 pool->shutting_down = 1;
684 ao2_unlock(pool);
685
686 /* Stop the shrink scheduled item if present */
688
689 /* Clean up all the taskprocessors */
692
693 ao2_ref(pool, -1);
694}
#define AST_SCHED_DEL_UNREF(sched, id, refcall)
schedule task to get deleted and call unref function
Definition sched.h:82
static void taskpool_taskprocessors_cleanup(struct taskpool_taskprocessors *taskprocessors)
Definition taskpool.c:206

References ao2_lock, ao2_ref, ao2_unlock, AST_SCHED_DEL_UNREF, ast_taskpool::dynamic_taskprocessors, ast_taskpool::shrink_sched_id, ast_taskpool::shutting_down, ast_taskpool::static_taskprocessors, and taskpool_taskprocessors_cleanup().

Referenced by ast_taskpool_create(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), handle_cli_taskpool_push_efficiency(), handle_cli_taskpool_push_serializer_efficiency(), load_module(), sorcery_cleanup(), stasis_cleanup(), and unload_module().

◆ ast_taskpool_taskprocessors_count()

size_t ast_taskpool_taskprocessors_count ( struct ast_taskpool pool)

Get the current number of taskprocessors in the taskpool.

Since
23.1.0
22.7.0
20.17.0
Parameters
poolThe taskpool to query
Return values
Thenumber of taskprocessors in the taskpool

Definition at line 451 of file taskpool.c.

452{
453 size_t count;
454
455 ao2_lock(pool);
457 ao2_unlock(pool);
458
459 return count;
460}
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition vector.h:620

References ao2_lock, ao2_unlock, AST_VECTOR_SIZE, ast_taskpool::dynamic_taskprocessors, ast_taskpool::static_taskprocessors, and taskpool_taskprocessors::taskprocessors.

Referenced by AST_TEST_DEFINE(), and AST_TEST_DEFINE().