Asterisk - The Open Source Telephony Project GIT-master-77d630f
Data Structures | Macros | Enumerations | Functions
taskpool.h File Reference
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  ast_taskpool_options
 

Macros

#define AST_TASKPOOL_OPTIONS_VERSION   1
 

Enumerations

enum  ast_taskpool_selector { AST_TASKPOOL_SELECTOR_DEFAULT = 0 , AST_TASKPOOL_SELECTOR_LEAST_FULL , AST_TASKPOOL_SELECTOR_SEQUENTIAL }
 Selectors for choosing which taskprocessor in a pool to use. More...
 

Functions

struct ast_taskpool * ast_taskpool_create (const char *name, const struct ast_taskpool_options *options)
 Create a new taskpool. More...
 
int ast_taskpool_push (struct ast_taskpool *pool, int(*task)(void *data), void *data) attribute_warn_unused_result
 Push a task to the taskpool. More...
 
int ast_taskpool_push_wait (struct ast_taskpool *pool, int(*task)(void *data), void *data) attribute_warn_unused_result
 Push a task to the taskpool, and wait for completion. More...
 
long ast_taskpool_queue_size (struct ast_taskpool *pool)
 Get the current number of queued tasks in the taskpool. More...
 
struct ast_taskprocessor * ast_taskpool_serializer (const char *name, struct ast_taskpool *pool)
 Serialized execution of tasks within a ast_taskpool. More...
 
struct ast_taskprocessor * ast_taskpool_serializer_get_current (void)
 Get the taskpool serializer currently associated with this thread. More...
 
struct ast_taskprocessor * ast_taskpool_serializer_group (const char *name, struct ast_taskpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
 Serialized execution of tasks within a ast_taskpool. More...
 
int ast_taskpool_serializer_push_wait (struct ast_taskprocessor *serializer, int(*task)(void *data), void *data)
 Push a task to a serializer, and wait for completion. More...
 
void ast_taskpool_shutdown (struct ast_taskpool *pool)
 Shut down a taskpool and remove the underlying taskprocessors. More...
 
size_t ast_taskpool_taskprocessors_count (struct ast_taskpool *pool)
 Get the current number of taskprocessors in the taskpool. More...
 

Detailed Description

API providing queued task execution across threads.

Definition in file taskpool.h.

Macro Definition Documentation

◆ AST_TASKPOOL_OPTIONS_VERSION

#define AST_TASKPOOL_OPTIONS_VERSION   1

Definition at line 69 of file taskpool.h.

Enumeration Type Documentation

◆ ast_taskpool_selector

Selectors for choosing which taskprocessor in a pool to use.

Enumerator
AST_TASKPOOL_SELECTOR_DEFAULT 
AST_TASKPOOL_SELECTOR_LEAST_FULL 
AST_TASKPOOL_SELECTOR_SEQUENTIAL 

Definition at line 62 of file taskpool.h.

62 {
63 AST_TASKPOOL_SELECTOR_DEFAULT = 0, /* The selector that is generally the best for most use cases */
64 AST_TASKPOOL_SELECTOR_LEAST_FULL, /* Select the least full taskprocessor */
65 AST_TASKPOOL_SELECTOR_SEQUENTIAL, /* Select taskprocessors in a sequential manner */
66};
@ AST_TASKPOOL_SELECTOR_SEQUENTIAL
Definition: taskpool.h:65
@ AST_TASKPOOL_SELECTOR_LEAST_FULL
Definition: taskpool.h:64
@ AST_TASKPOOL_SELECTOR_DEFAULT
Definition: taskpool.h:63

Function Documentation

◆ ast_taskpool_create()

struct ast_taskpool * ast_taskpool_create ( const char *  name,
const struct ast_taskpool_options *  options 
)

Create a new taskpool.

Since
23.1.0
22.7.0
20.17.0

This function creates a taskpool. Tasks may be pushed onto this task pool and will be automatically acted upon by taskprocessors within the pool.

Only a single taskpool with a given name may exist. This function will fail if a taskpool with the given name already exists.

Parameters
name: The unique name for the taskpool
options: The behavioral options for this taskpool
Return values
NULL: Failed to create the taskpool
non-NULL: The newly-created taskpool
Note
The ast_taskpool_shutdown function must be called to shut down the taskpool and clean up underlying resources fully.

Definition at line 324 of file taskpool.c.

326{
327 struct ast_taskpool *pool;
328
329 /* Enforce versioning on the passed-in options */
330 if (options->version != AST_TASKPOOL_OPTIONS_VERSION) {
331 return NULL;
332 }
333
334 pool = ao2_alloc(sizeof(*pool) + strlen(name) + 1, NULL);
335 if (!pool) {
336 return NULL;
337 }
338
339 strcpy(pool->name, name); /* Safe */
340 memcpy(&pool->options, options, sizeof(pool->options));
341 pool->shrink_sched_id = -1;
342
343 /* Verify the passed-in options are valid, and adjust if needed */
344 if (options->initial_size < options->minimum_size) {
345 pool->options.initial_size = options->minimum_size;
346 ast_log(LOG_WARNING, "Taskpool '%s' has an initial size of %d, which is less than the minimum size of %d. Adjusting to %d.\n",
347 name, options->initial_size, options->minimum_size, options->minimum_size);
348 }
349
350 if (options->max_size && pool->options.initial_size > options->max_size) {
351 pool->options.max_size = pool->options.initial_size;
352 ast_log(LOG_WARNING, "Taskpool '%s' has a max size of %d, which is less than the initial size of %d. Adjusting to %d.\n",
353 name, options->max_size, pool->options.initial_size, pool->options.initial_size);
354 }
355
356 if (!options->auto_increment) {
357 if (!pool->options.minimum_size) {
358 pool->options.minimum_size = 1;
359 ast_log(LOG_WARNING, "Taskpool '%s' has a minimum size of 0, which is not valid without auto increment. Adjusting to 1.\n", name);
360 }
361 if (!pool->options.max_size) {
362 pool->options.max_size = pool->options.minimum_size;
363 ast_log(LOG_WARNING, "Taskpool '%s' has a max size of 0, which is not valid without auto increment. Adjusting to %d.\n", name, pool->options.minimum_size);
364 }
365 if (pool->options.minimum_size != pool->options.max_size) {
366 pool->options.minimum_size = pool->options.max_size;
367 pool->options.initial_size = pool->options.max_size;
368 ast_log(LOG_WARNING, "Taskpool '%s' has a minimum size of %d, while max size is %d. Adjusting all sizes to %d due to lack of auto increment.\n",
369 name, options->minimum_size, pool->options.max_size, pool->options.max_size);
370 }
371 } else if (!options->growth_threshold) {
373 }
374
377 } else if (options->selector == AST_TASKPOOL_SELECTOR_SEQUENTIAL) {
379 } else {
380 ast_log(LOG_WARNING, "Taskpool '%s' has an invalid selector of %d. Adjusting to default selector.\n",
381 name, options->selector);
383 }
384
386 ao2_ref(pool, -1);
387 return NULL;
388 }
389
390 /* Create the static taskprocessors based on the passed-in options */
391 for (int i = 0; i < pool->options.minimum_size; i++) {
393
395 if (!taskprocessor) {
396 /* The reference to pool is passed to ast_taskpool_shutdown */
398 return NULL;
399 }
400
403 /* The reference to pool is passed to ast_taskpool_shutdown */
405 return NULL;
406 }
407 }
408
410 pool->options.initial_size - pool->options.minimum_size)) {
412 return NULL;
413 }
414
415 /* Create the dynamic taskprocessor based on the passed-in options */
416 for (int i = 0; i < (pool->options.initial_size - pool->options.minimum_size); i++) {
418
420 if (!taskprocessor) {
421 /* The reference to pool is passed to ast_taskpool_shutdown */
423 return NULL;
424 }
425
428 /* The reference to pool is passed to ast_taskpool_shutdown */
430 return NULL;
431 }
432 }
433
434 /* If idle timeout support is enabled kick off a scheduled task to shrink the dynamic pool periodically, we do
435 * this no matter if there are dynamic taskprocessor present to reduce the work needed within the push function
436 * and to reduce complexity.
437 */
438 if (options->idle_timeout && options->auto_increment) {
440 if (pool->shrink_sched_id < 0) {
441 ao2_ref(pool, -1);
442 /* The second reference to pool is passed to ast_taskpool_shutdown */
444 return NULL;
445 }
446 }
447
448 return pool;
449}
#define ast_log
Definition: astobj2.c:42
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition: astobj2.h:480
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:409
static const char name[]
Definition: format_mp3.c:68
#define LOG_WARNING
#define NULL
Definition: resample.c:96
int ast_sched_add(struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data) attribute_warn_unused_result
Adds a scheduled event.
Definition: sched.c:567
int max_size
Maximum number of taskprocessors a pool may have.
Definition: taskpool.h:115
int growth_threshold
The threshold for when to grow the pool.
Definition: taskpool.h:124
int minimum_size
Number of taskprocessors that will always exist.
Definition: taskpool.h:92
int initial_size
Number of taskprocessors the pool will start with.
Definition: taskpool.h:102
An opaque taskpool structure.
Definition: taskpool.c:62
taskpool_selector selector
Definition: taskpool.c:74
struct ast_taskpool_options options
Definition: taskpool.c:70
int shrink_sched_id
Definition: taskpool.c:72
struct taskpool_taskprocessors static_taskprocessors
Definition: taskpool.c:64
struct taskpool_taskprocessors dynamic_taskprocessors
Definition: taskpool.c:66
char name[0]
Definition: taskpool.c:76
Definition: sched.c:76
A taskpool taskprocessor.
Definition: taskpool.c:34
struct ast_taskprocessor * taskprocessor
Definition: taskpool.c:36
struct taskpool_taskprocessors::@413 taskprocessors
static void taskpool_least_full_selector(struct ast_taskpool *pool, struct taskpool_taskprocessors *taskprocessors, struct taskpool_taskprocessor **taskprocessor, unsigned int *growth_threshold_reached)
Least full taskprocessor selector.
Definition: taskpool.c:287
void ast_taskpool_shutdown(struct ast_taskpool *pool)
Shut down a taskpool and remove the underlying taskprocessors.
Definition: taskpool.c:653
static int taskpool_dynamic_pool_shrink(const void *data)
Definition: taskpool.c:226
static int taskpool_taskprocessors_init(struct taskpool_taskprocessors *taskprocessors, unsigned int size)
Definition: taskpool.c:193
static struct taskpool_taskprocessor * taskpool_taskprocessor_alloc(struct ast_taskpool *pool, char type)
Definition: taskpool.c:152
static void taskpool_sequential_selector(struct ast_taskpool *pool, struct taskpool_taskprocessors *taskprocessors, struct taskpool_taskprocessor **taskprocessor, unsigned int *growth_threshold_reached)
Definition: taskpool.c:262
#define TASKPOOL_GROW_THRESHOLD
The threshold for a taskprocessor at which we consider the pool needing to grow (50% of high water th...
Definition: taskpool.c:80
#define AST_TASKPOOL_OPTIONS_VERSION
Definition: taskpool.h:69
static struct test_options options
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:267

References ao2_alloc, ao2_bump, ao2_ref, ast_log, ast_sched_add(), AST_TASKPOOL_OPTIONS_VERSION, AST_TASKPOOL_SELECTOR_DEFAULT, AST_TASKPOOL_SELECTOR_LEAST_FULL, AST_TASKPOOL_SELECTOR_SEQUENTIAL, ast_taskpool_shutdown(), AST_VECTOR_APPEND, ast_taskpool::dynamic_taskprocessors, ast_taskpool_options::growth_threshold, ast_taskpool_options::initial_size, LOG_WARNING, ast_taskpool_options::max_size, ast_taskpool_options::minimum_size, ast_taskpool::name, name, NULL, ast_taskpool::options, options, ast_taskpool::selector, ast_taskpool::shrink_sched_id, ast_taskpool::static_taskprocessors, taskpool_dynamic_pool_shrink(), TASKPOOL_GROW_THRESHOLD, taskpool_least_full_selector(), taskpool_sequential_selector(), taskpool_taskprocessor_alloc(), taskpool_taskprocessors_init(), taskpool_taskprocessor::taskprocessor, and taskpool_taskprocessors::taskprocessors.

Referenced by AST_TEST_DEFINE(), handle_cli_taskpool_push_efficiency(), handle_cli_taskpool_push_serializer_efficiency(), and stasis_init().

◆ ast_taskpool_push()

int ast_taskpool_push ( struct ast_taskpool *  pool,
int(*)(void *data)  task,
void *  data 
)

Push a task to the taskpool.

Since
23.1.0
22.7.0
20.17.0

Tasks pushed into the taskpool will be automatically taken by one of the taskprocessors within the pool.

Parameters
pool: The taskpool to add the task to
task: The task to add
data: The parameter for the task
Return values
0: success
-1: failure

Definition at line 522 of file taskpool.c.

523{
524 RAII_VAR(struct taskpool_taskprocessor *, taskprocessor, NULL, ao2_cleanup);
525
526 /* Select the taskprocessor in the pool to use for pushing this task */
527 ao2_lock(pool);
528 if (!pool->shutting_down) {
529 unsigned int growth_threshold_reached = 0;
530
531 /* A selector doesn't set taskprocessor to NULL, it will only change the value if a better
532 * taskprocessor is found. This means that even if the selector for a dynamic taskprocessor
533 * fails for some reason, it will still fall back to the initially found static one if
534 * it is present.
535 */
536 pool->selector(pool, &pool->static_taskprocessors, &taskprocessor, &growth_threshold_reached);
537 if (pool->options.auto_increment && growth_threshold_reached) {
538 /* If we need to grow then try dynamic taskprocessors */
539 pool->selector(pool, &pool->dynamic_taskprocessors, &taskprocessor, &growth_threshold_reached);
540 if (growth_threshold_reached) {
541 /* If we STILL need to grow then grow the dynamic taskprocessor pool if allowed */
542 taskpool_dynamic_pool_grow(pool, &taskprocessor);
543 }
544
545 /* If a dynamic taskprocessor was used update its last push time */
546 if (taskprocessor) {
547 taskprocessor->last_pushed = ast_tvnow();
548 }
549 }
550 ao2_bump(taskprocessor);
551 }
552 ao2_unlock(pool);
553
554 if (!taskprocessor) {
555 return -1;
556 }
557
558 if (ast_taskprocessor_push(taskprocessor->taskprocessor, task, data)) {
559 return -1;
560 }
561
562 return 0;
563}
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
#define ao2_unlock(a)
Definition: astobj2.h:729
#define ao2_lock(a)
Definition: astobj2.h:717
int auto_increment
Number of taskprocessors to increment the pool by.
Definition: taskpool.h:85
int shutting_down
Definition: taskpool.c:68
static void taskpool_dynamic_pool_grow(struct ast_taskpool *pool, struct taskpool_taskprocessor **taskprocessor)
Definition: taskpool.c:479
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
static int task(void *data)
Queued task for baseline test.
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:159
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:978

References ao2_bump, ao2_cleanup, ao2_lock, ao2_unlock, ast_taskprocessor_push(), ast_tvnow(), ast_taskpool_options::auto_increment, ast_taskpool::dynamic_taskprocessors, NULL, ast_taskpool::options, RAII_VAR, ast_taskpool::selector, ast_taskpool::shutting_down, ast_taskpool::static_taskprocessors, task(), taskpool_dynamic_pool_grow(), and taskpool_taskprocessor::taskprocessor.

Referenced by ast_taskpool_push_wait(), AST_TEST_DEFINE(), efficiency_task(), execute_tasks(), handle_cli_taskpool_push_efficiency(), and serializer_task_pushed().

◆ ast_taskpool_push_wait()

int ast_taskpool_push_wait ( struct ast_taskpool *  pool,
int(*)(void *data)  task,
void *  data 
)

Push a task to the taskpool, and wait for completion.

Since
23.1.0
22.7.0
20.17.0

Tasks pushed into the taskpool will be automatically taken by one of the taskprocessors within the pool.

Parameters
pool: The taskpool to add the task to
task: The task to add
data: The parameter for the task
Return values
0: success
-1: failure

Definition at line 623 of file taskpool.c.

624{
626
627 /* If we are already executing within a taskpool taskprocessor then
628 * don't bother pushing a new task, just directly execute the task.
629 */
631 return task(data);
632 }
633
635 return -1;
636 }
637
640 return -1;
641 }
642
644 while (!sync_task.complete) {
645 ast_cond_wait(&sync_task.cond, &sync_task.lock);
646 }
648
650 return sync_task.fail;
651}
#define ast_cond_wait(cond, mutex)
Definition: lock.h:212
#define ast_mutex_unlock(a)
Definition: lock.h:197
#define ast_mutex_lock(a)
Definition: lock.h:196
static int sync_task(void *data)
Definition: res_pjsip.c:2117
static void taskpool_sync_task_cleanup(struct taskpool_sync_task *sync_task)
Definition: taskpool.c:594
int ast_taskpool_push(struct ast_taskpool *pool, int(*task)(void *data), void *data)
Push a task to the taskpool.
Definition: taskpool.c:522
static struct ast_taskpool * ast_taskpool_get_current(void)
Definition: taskpool.c:92
static int taskpool_sync_task_init(struct taskpool_sync_task *sync_task, int(*task)(void *), void *data)
Definition: taskpool.c:580

References ast_cond_wait, ast_mutex_lock, ast_mutex_unlock, ast_taskpool_get_current(), ast_taskpool_push(), sync_task(), task(), taskpool_sync_task_cleanup(), and taskpool_sync_task_init().

Referenced by AST_TEST_DEFINE().

◆ ast_taskpool_queue_size()

long ast_taskpool_queue_size ( struct ast_taskpool *  pool)

Get the current number of queued tasks in the taskpool.

Since
23.1.0
22.7.0
20.17.0
Parameters
pool: The taskpool to query
Return values
The number of queued tasks in the taskpool

Definition at line 464 of file taskpool.c.

465{
466 long queue_size = 0;
467
468 ao2_lock(pool);
471 ao2_unlock(pool);
472
473 return queue_size;
474}
#define TASKPOOL_QUEUE_SIZE_ADD(tps, size)
Definition: taskpool.c:462
#define AST_VECTOR_CALLBACK_VOID(vec, callback,...)
Execute a callback on every element in a vector disregarding callback return.
Definition: vector.h:873

References ao2_lock, ao2_unlock, AST_VECTOR_CALLBACK_VOID, ast_taskpool::dynamic_taskprocessors, ast_taskpool::static_taskprocessors, TASKPOOL_QUEUE_SIZE_ADD, and taskpool_taskprocessors::taskprocessors.

◆ ast_taskpool_serializer()

struct ast_taskprocessor * ast_taskpool_serializer ( const char *  name,
struct ast_taskpool *  pool 
)

Serialized execution of tasks within a ast_taskpool.

Since
23.1.0
22.7.0
20.17.0

A ast_taskprocessor with the same contract as a default taskprocessor (tasks execute serially) except instead of executing out of a dedicated thread, execution occurs in a taskprocessor from a ast_taskpool.

While it guarantees that each task will complete before executing the next, there is no guarantee as to which thread from the pool individual tasks will execute. This normally only matters if your code relies on thread specific information, such as thread locals.

Use ast_taskprocessor_unreference() to dispose of the returned ast_taskprocessor.

Only a single taskprocessor with a given name may exist. This function will fail if a taskprocessor with the given name already exists.

Parameters
name: Name of the serializer. (must be unique)
pool: ast_taskpool for execution.
Returns
ast_taskprocessor for enqueuing work.
Return values
NULL: on error.

Definition at line 819 of file taskpool.c.

820{
822}
struct ast_taskprocessor * ast_taskpool_serializer_group(const char *name, struct ast_taskpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Serialized execution of tasks within a ast_taskpool.
Definition: taskpool.c:789

References ast_taskpool_serializer_group(), name, and NULL.

Referenced by AST_TEST_DEFINE(), handle_cli_taskpool_push_serializer_efficiency(), and internal_stasis_subscribe().

◆ ast_taskpool_serializer_get_current()

struct ast_taskprocessor * ast_taskpool_serializer_get_current ( void  )

Get the taskpool serializer currently associated with this thread.

Note
The returned pointer is valid while the serializer thread is running.
Use ao2_ref() on serializer if you are going to keep it for another thread. To unref it you must then use ast_taskprocessor_unreference().
Return values
serializer: on success.
NULL: on error or no serializer associated with the thread.

Definition at line 784 of file taskpool.c.

785{
786 return ast_threadstorage_get_ptr(&current_taskpool_serializer);
787}
void * ast_threadstorage_get_ptr(struct ast_threadstorage *ts)
Retrieve a raw pointer from threadstorage.

References ast_threadstorage_get_ptr().

Referenced by ast_taskpool_serializer_push_wait(), requeue_task(), serializer_efficiency_task(), and simple_task().

◆ ast_taskpool_serializer_group()

struct ast_taskprocessor * ast_taskpool_serializer_group ( const char *  name,
struct ast_taskpool *  pool,
struct ast_serializer_shutdown_group *  shutdown_group 
)

Serialized execution of tasks within a ast_taskpool.

Since
23.1.0
22.7.0
20.17.0

A ast_taskprocessor with the same contract as a default taskprocessor (tasks execute serially) except instead of executing out of a dedicated thread, execution occurs in a taskprocessor from a ast_taskpool.

While it guarantees that each task will complete before executing the next, there is no guarantee as to which thread from the pool individual tasks will execute. This normally only matters if your code relies on thread specific information, such as thread locals.

Use ast_taskprocessor_unreference() to dispose of the returned ast_taskprocessor.

Only a single taskprocessor with a given name may exist. This function will fail if a taskprocessor with the given name already exists.

Parameters
name: Name of the serializer. (must be unique)
pool: ast_taskpool for execution.
shutdown_group: Group shutdown controller. (NULL if no group association)
Returns
ast_taskprocessor for enqueuing work.
Return values
NULL: on error.

Definition at line 789 of file taskpool.c.

791{
792 struct serializer *ser;
794 struct ast_taskprocessor *tps;
795
797 if (!ser) {
798 return NULL;
799 }
800
802 if (!listener) {
803 ao2_ref(ser, -1);
804 return NULL;
805 }
806
808 if (!tps) {
809 /* ser ref transferred to listener but not cleaned without tps */
810 ao2_ref(ser, -1);
811 } else if (shutdown_group) {
813 }
814
815 ao2_ref(listener, -1);
816 return tps;
817}
static void * listener(void *unused)
Definition: asterisk.c:1530
static struct ast_serializer_shutdown_group * shutdown_group
Shutdown group for options serializers.
void ast_serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group)
Increment the number of serializer members in the group.
A listener for taskprocessors.
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks
Definition: taskpool.c:778
static struct serializer * serializer_create(struct ast_taskpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Definition: taskpool.c:691
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
struct ast_taskprocessor * ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
Create a taskprocessor with a custom listener.

References ao2_ref, ast_serializer_shutdown_group_inc(), ast_taskprocessor_create_with_listener(), ast_taskprocessor_listener_alloc(), listener(), name, NULL, serializer_create(), serializer_tps_listener_callbacks, and shutdown_group.

Referenced by ast_serializer_taskpool_create(), and ast_taskpool_serializer().

◆ ast_taskpool_serializer_push_wait()

int ast_taskpool_serializer_push_wait ( struct ast_taskprocessor *  serializer,
int(*)(void *data)  task,
void *  data 
)

Push a task to a serializer, and wait for completion.

Since
23.1.0
22.7.0
20.17.0
Parameters
serializer: The serializer to add the task to
task: The task to add
data: The parameter for the task
Return values
0: success
-1: failure

Definition at line 832 of file taskpool.c.

833{
836 struct ast_taskprocessor *prior_serializer;
838
839 /* If not in a taskpool taskprocessor we can just queue the task like normal and
840 * wait. */
843 return -1;
844 }
845
848 return -1;
849 }
850
852 while (!sync_task.complete) {
853 ast_cond_wait(&sync_task.cond, &sync_task.lock);
854 }
856
858 return sync_task.fail;
859 }
860
861 /* It is possible that we are already executing within a serializer, so stash the existing
862 * away so we can restore it.
863 */
864 prior_serializer = ast_taskpool_serializer_get_current();
865
866 ao2_lock(ser);
867
868 /* There are two cases where we can or have to directly execute this task:
869 * 1. There are no other tasks in the serializer
870 * 2. We are already in the serializer
871 * In the second case if we don't execute the task now, we will deadlock waiting
872 * on it as it will never occur.
873 */
874 if (!ast_taskprocessor_size(serializer) || prior_serializer == serializer) {
875 ast_threadstorage_set_ptr(&current_taskpool_serializer, serializer);
876 sync_task.fail = task(data);
877 ao2_unlock(ser);
878 ast_threadstorage_set_ptr(&current_taskpool_serializer, prior_serializer);
879 return sync_task.fail;
880 }
881
883 ao2_unlock(ser);
884 return -1;
885 }
886
887 /* First we queue the serialized task */
890 ao2_unlock(ser);
891 return -1;
892 }
893
894 /* Next we queue the empty task to ensure the serializer doesn't reach empty, this
895 * stops two tasks from being queued for the same serializer at the same time.
896 */
899 ao2_unlock(ser);
900 return -1;
901 }
902
903 /* Now we execute the tasks on the serializer until our sync task is complete */
904 ast_threadstorage_set_ptr(&current_taskpool_serializer, serializer);
905 while (!sync_task.complete) {
906 /* The sync task is guaranteed to be executed, so doing a while loop on the complete
907 * flag is safe.
908 */
910 }
912 ao2_unlock(ser);
913
914 ast_threadstorage_set_ptr(&current_taskpool_serializer, prior_serializer);
915
916 return sync_task.fail;
917}
struct ast_taskprocessor * ast_taskpool_serializer_get_current(void)
Get the taskpool serializer currently associated with this thread.
Definition: taskpool.c:784
static int taskpool_serializer_empty_task(void *data)
Definition: taskpool.c:827
struct ast_taskprocessor_listener * ast_taskprocessor_listener(struct ast_taskprocessor *tps)
Return the listener associated with the taskprocessor.
void * ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
Get the user data from the listener.
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
Pop a task off the taskprocessor and execute it.
int ast_threadstorage_set_ptr(struct ast_threadstorage *ts, void *ptr)
Set a raw pointer from threadstorage.

References ao2_lock, ao2_unlock, ast_cond_wait, ast_mutex_lock, ast_mutex_unlock, ast_taskpool_get_current(), ast_taskpool_serializer_get_current(), ast_taskprocessor_execute(), ast_taskprocessor_listener(), ast_taskprocessor_listener_get_user_data(), ast_taskprocessor_push(), ast_taskprocessor_size(), ast_threadstorage_set_ptr(), listener(), NULL, sync_task(), task(), taskpool_serializer_empty_task(), taskpool_sync_task_cleanup(), and taskpool_sync_task_init().

Referenced by AST_TEST_DEFINE(), and requeue_task().

◆ ast_taskpool_shutdown()

void ast_taskpool_shutdown ( struct ast_taskpool *  pool)

Shut down a taskpool and remove the underlying taskprocessors.

Since
23.1.0
22.7.0
20.17.0
Parameters
pool: The pool to shut down
Note
This will decrement the reference to the pool

Definition at line 653 of file taskpool.c.

654{
655 if (!pool) {
656 return;
657 }
658
659 /* Mark this pool as shutting down so nothing new is pushed */
660 ao2_lock(pool);
661 pool->shutting_down = 1;
662 ao2_unlock(pool);
663
664 /* Stop the shrink scheduled item if present */
666
667 /* Clean up all the taskprocessors */
670
671 ao2_ref(pool, -1);
672}
#define AST_SCHED_DEL_UNREF(sched, id, refcall)
schedule task to get deleted and call unref function
Definition: sched.h:82
static void taskpool_taskprocessors_cleanup(struct taskpool_taskprocessors *taskprocessors)
Definition: taskpool.c:206

References ao2_lock, ao2_ref, ao2_unlock, AST_SCHED_DEL_UNREF, ast_taskpool::dynamic_taskprocessors, ast_taskpool::shrink_sched_id, ast_taskpool::shutting_down, ast_taskpool::static_taskprocessors, and taskpool_taskprocessors_cleanup().

Referenced by ast_taskpool_create(), AST_TEST_DEFINE(), handle_cli_taskpool_push_efficiency(), handle_cli_taskpool_push_serializer_efficiency(), and stasis_cleanup().

◆ ast_taskpool_taskprocessors_count()

size_t ast_taskpool_taskprocessors_count ( struct ast_taskpool *  pool)

Get the current number of taskprocessors in the taskpool.

Since
23.1.0
22.7.0
20.17.0
Parameters
pool: The taskpool to query
Return values
The number of taskprocessors in the taskpool

Definition at line 451 of file taskpool.c.

452{
453 size_t count;
454
455 ao2_lock(pool);
457 ao2_unlock(pool);
458
459 return count;
460}
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:620

References ao2_lock, ao2_unlock, AST_VECTOR_SIZE, ast_taskpool::dynamic_taskprocessors, ast_taskpool::static_taskprocessors, and taskpool_taskprocessors::taskprocessors.

Referenced by AST_TEST_DEFINE().