Asterisk - The Open Source Telephony Project GIT-master-ff80666
Data Structures | Macros | Functions
threadpool.h File Reference
#include "asterisk/serializer_shutdown_group.h"
Include dependency graph for threadpool.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  ast_threadpool_listener_callbacks
 
struct  ast_threadpool_options
 

Macros

#define AST_THREADPOOL_OPTIONS_VERSION   1
 

Functions

struct ast_threadpool * ast_threadpool_create (const char *name, struct ast_threadpool_listener *listener, const struct ast_threadpool_options *options)
 Create a new threadpool. More...
 
struct ast_threadpool_listener * ast_threadpool_listener_alloc (const struct ast_threadpool_listener_callbacks *callbacks, void *user_data)
 Allocate a threadpool listener. More...
 
void * ast_threadpool_listener_get_user_data (const struct ast_threadpool_listener *listener)
 Get the threadpool listener's user data. More...
 
int ast_threadpool_push (struct ast_threadpool *pool, int(*task)(void *data), void *data) attribute_warn_unused_result
 Push a task to the threadpool. More...
 
long ast_threadpool_queue_size (struct ast_threadpool *pool)
 Return the size of the threadpool's task queue. More...
 
struct ast_taskprocessor * ast_threadpool_serializer (const char *name, struct ast_threadpool *pool)
 Serialized execution of tasks within an ast_threadpool. More...
 
struct ast_taskprocessor * ast_threadpool_serializer_get_current (void)
 Get the threadpool serializer currently associated with this thread. More...
 
struct ast_taskprocessor * ast_threadpool_serializer_group (const char *name, struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
 Serialized execution of tasks within an ast_threadpool. More...
 
void ast_threadpool_set_size (struct ast_threadpool *threadpool, unsigned int size)
 Set the number of threads for the thread pool. More...
 
void ast_threadpool_shutdown (struct ast_threadpool *pool)
 Shut down a threadpool and destroy it. More...
 

Macro Definition Documentation

◆ AST_THREADPOOL_OPTIONS_VERSION

#define AST_THREADPOOL_OPTIONS_VERSION   1

Definition at line 73 of file threadpool.h.

Function Documentation

◆ ast_threadpool_create()

struct ast_threadpool * ast_threadpool_create ( const char *  name,
struct ast_threadpool_listener *  listener,
const struct ast_threadpool_options *  options 
)

Create a new threadpool.

This function creates a threadpool. Tasks may be pushed onto this thread pool and will be automatically acted upon by threads within the pool.

Only a single threadpool with a given name may exist. This function will fail if a threadpool with the given name already exists.

Parameters
name: The unique name for the threadpool
listener: The listener the threadpool will notify of changes. Can be NULL.
options: The behavioral options for this threadpool
Return values
NULL: Failed to create the threadpool
non-NULL: The newly-created threadpool

Definition at line 916 of file threadpool.c.

919{
920 struct ast_taskprocessor *tps;
921 RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, NULL, ao2_cleanup);
922 RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
923 char *fullname;
924
926 if (!pool) {
927 return NULL;
928 }
929
931 if (!tps_listener) {
932 return NULL;
933 }
934
935 if (options->version != AST_THREADPOOL_OPTIONS_VERSION) {
936 ast_log(LOG_WARNING, "Incompatible version of threadpool options in use.\n");
937 return NULL;
938 }
939
940 fullname = ast_alloca(strlen(name) + strlen("/pool") + 1);
941 sprintf(fullname, "%s/pool", name); /* Safe */
942 tps = ast_taskprocessor_create_with_listener(fullname, tps_listener);
943 if (!tps) {
944 return NULL;
945 }
946
947 pool->tps = tps;
948 if (listener) {
949 ao2_ref(listener, +1);
950 pool->listener = listener;
951 }
952 ast_threadpool_set_size(pool, pool->options.initial_size);
953 ao2_ref(pool, +1);
954 return pool;
955}
static void * listener(void *unused)
Definition: asterisk.c:1530
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
Definition: astmm.h:288
#define ast_log
Definition: astobj2.c:42
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
static const char name[]
Definition: format_mp3.c:68
#define LOG_WARNING
#define NULL
Definition: resample.c:96
A listener for taskprocessors.
An ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
An opaque threadpool structure.
Definition: threadpool.c:36
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
struct ast_taskprocessor * ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
Create a taskprocessor with a custom listener.
static struct test_options options
void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
Set the number of threads for the thread pool.
Definition: threadpool.c:875
static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks
Table of taskprocessor listener callbacks for threadpool's main taskprocessor.
Definition: threadpool.c:696
static struct ast_threadpool * threadpool_alloc(const char *name, const struct ast_threadpool_options *options)
Allocate a threadpool.
Definition: threadpool.c:404
#define AST_THREADPOOL_OPTIONS_VERSION
Definition: threadpool.h:73
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:978

References ao2_cleanup, ao2_ref, ast_alloca, ast_log, ast_taskprocessor_create_with_listener(), ast_taskprocessor_listener_alloc(), AST_THREADPOOL_OPTIONS_VERSION, ast_threadpool_set_size(), listener(), LOG_WARNING, name, NULL, options, RAII_VAR, threadpool_alloc(), and threadpool_tps_listener_callbacks.

Referenced by ast_sorcery_init(), AST_TEST_DEFINE(), handle_cli_threadpool_push_efficiency(), handle_cli_threadpool_push_serializer_efficiency(), and load_module().

◆ ast_threadpool_listener_alloc()

struct ast_threadpool_listener * ast_threadpool_listener_alloc ( const struct ast_threadpool_listener_callbacks *  callbacks,
void *  user_data 
)

Allocate a threadpool listener.

This function will call back into the alloc callback for the listener.

Parameters
callbacks: Listener callbacks to assign to the listener
user_data: User data to be stored in the threadpool listener
Return values
NULL: Failed to allocate the listener
non-NULL: The newly-created threadpool listener

Definition at line 894 of file threadpool.c.

896{
898 if (!listener) {
899 return NULL;
900 }
901 listener->callbacks = callbacks;
902 listener->user_data = user_data;
903 return listener;
904}
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:409
struct @476 callbacks
listener for a threadpool
Definition: threadpool.c:110

References ao2_alloc, callbacks, listener(), NULL, and ast_threadpool_listener::user_data.

Referenced by AST_TEST_DEFINE(), handle_cli_threadpool_push_efficiency(), and handle_cli_threadpool_push_serializer_efficiency().

◆ ast_threadpool_listener_get_user_data()

void * ast_threadpool_listener_get_user_data ( const struct ast_threadpool_listener *  listener)

Get the threadpool listener's user data.

Parameters
listener: The threadpool listener
Returns
The user data

Definition at line 906 of file threadpool.c.

907{
908 return listener->user_data;
909}

References listener().

Referenced by listener_check(), test_emptied(), test_shutdown(), test_state_changed(), test_task_pushed(), and wait_for_task_pushed().

◆ ast_threadpool_push()

int ast_threadpool_push ( struct ast_threadpool *  pool,
int(*)(void *data)  task,
void *  data 
)

Push a task to the threadpool.

Tasks pushed into the threadpool will be automatically taken by one of the threads within the pool.

Parameters
pool: The threadpool to add the task to
task: The task to add
data: The parameter for the task
Return values
0: success
-1: failure

Definition at line 957 of file threadpool.c.

958{
959 SCOPED_AO2LOCK(lock, pool);
960 if (!pool->shutting_down) {
961 return ast_taskprocessor_push(pool->tps, task, data);
962 }
963 return -1;
964}
ast_mutex_t lock
Definition: app_sla.c:337
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:611
struct ast_taskprocessor * tps
The main taskprocessor.
Definition: threadpool.c:67
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
static int task(void *data)
Queued task for baseline test.

References ast_taskprocessor_push(), lock, SCOPED_AO2LOCK, ast_threadpool::shutting_down, task(), and ast_threadpool::tps.

Referenced by AST_TEST_DEFINE(), efficiency_task(), handle_cli_threadpool_push_efficiency(), and serializer_task_pushed().

◆ ast_threadpool_queue_size()

long ast_threadpool_queue_size ( struct ast_threadpool *  pool)

Return the size of the threadpool's task queue.

Since
13.7.0

Definition at line 1336 of file threadpool.c.

1337{
1338 return ast_taskprocessor_size(pool->tps);
1339}
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.

References ast_taskprocessor_size(), and ast_threadpool::tps.

Referenced by ast_sip_threadpool_queue_size().

◆ ast_threadpool_serializer()

struct ast_taskprocessor * ast_threadpool_serializer ( const char *  name,
struct ast_threadpool *  pool 
)

Serialized execution of tasks within an ast_threadpool.

Since
12.0.0

An ast_taskprocessor with the same contract as a default taskprocessor (tasks execute serially) except that, instead of executing out of a dedicated thread, execution occurs in a thread from an ast_threadpool. Think of it as a lightweight thread.

While it guarantees that each task will complete before executing the next, there is no guarantee as to which thread from the pool individual tasks will execute. This normally only matters if your code relies on thread specific information, such as thread locals.

Use ast_taskprocessor_unreference() to dispose of the returned ast_taskprocessor.

Only a single taskprocessor with a given name may exist. This function will fail if a taskprocessor with the given name already exists.

Parameters
name: Name of the serializer. (must be unique)
pool: ast_threadpool for execution.
Returns
ast_taskprocessor for enqueuing work.
Return values
NULL: on error.

Definition at line 1331 of file threadpool.c.

1332{
1334}
struct ast_taskprocessor * ast_threadpool_serializer_group(const char *name, struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Serialized execution of tasks within an ast_threadpool.
Definition: threadpool.c:1301

References ast_threadpool_serializer_group(), name, and NULL.

Referenced by AST_TEST_DEFINE(), handle_cli_threadpool_push_serializer_efficiency(), and sorcery_object_type_alloc().

◆ ast_threadpool_serializer_get_current()

struct ast_taskprocessor * ast_threadpool_serializer_get_current ( void  )

Get the threadpool serializer currently associated with this thread.

Since
14.0.0
Note
The returned pointer is valid while the serializer thread is running.
Use ao2_ref() on serializer if you are going to keep it for another thread. To unref it you must then use ast_taskprocessor_unreference().
Return values
serializer: on success.
NULL: on error or no serializer associated with the thread.

Definition at line 1296 of file threadpool.c.

1297{
1298 return ast_threadstorage_get_ptr(&current_serializer);
1299}
void * ast_threadstorage_get_ptr(struct ast_threadstorage *ts)
Retrieve a raw pointer from threadstorage.

References ast_threadstorage_get_ptr().

Referenced by record_serializer(), rfc3326_outgoing_request(), rfc3326_outgoing_response(), and serializer_efficiency_task().

◆ ast_threadpool_serializer_group()

struct ast_taskprocessor * ast_threadpool_serializer_group ( const char *  name,
struct ast_threadpool *  pool,
struct ast_serializer_shutdown_group *  shutdown_group 
)

Serialized execution of tasks within an ast_threadpool.

Since
13.5.0

An ast_taskprocessor with the same contract as a default taskprocessor (tasks execute serially) except that, instead of executing out of a dedicated thread, execution occurs in a thread from an ast_threadpool. Think of it as a lightweight thread.

While it guarantees that each task will complete before executing the next, there is no guarantee as to which thread from the pool individual tasks will execute. This normally only matters if your code relies on thread specific information, such as thread locals.

Use ast_taskprocessor_unreference() to dispose of the returned ast_taskprocessor.

Only a single taskprocessor with a given name may exist. This function will fail if a taskprocessor with the given name already exists.

Parameters
name: Name of the serializer. (must be unique)
pool: ast_threadpool for execution.
shutdown_group: Group shutdown controller. (NULL if no group association)
Returns
ast_taskprocessor for enqueuing work.
Return values
NULL: on error.

Definition at line 1301 of file threadpool.c.

1303{
1304 struct serializer *ser;
1306 struct ast_taskprocessor *tps;
1307
1308 ser = serializer_create(pool, shutdown_group);
1309 if (!ser) {
1310 return NULL;
1311 }
1312
1314 if (!listener) {
1315 ao2_ref(ser, -1);
1316 return NULL;
1317 }
1318
1320 if (!tps) {
1321 /* ser ref transferred to listener but not cleaned without tps */
1322 ao2_ref(ser, -1);
1323 } else if (shutdown_group) {
1325 }
1326
1327 ao2_ref(listener, -1);
1328 return tps;
1329}
static struct ast_serializer_shutdown_group * shutdown_group
Shutdown group for options serializers.
void ast_serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group)
Increment the number of serializer members in the group.
static struct serializer * serializer_create(struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Definition: threadpool.c:1231
static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks
Definition: threadpool.c:1290

References ao2_ref, ast_serializer_shutdown_group_inc(), ast_taskprocessor_create_with_listener(), ast_taskprocessor_listener_alloc(), listener(), name, NULL, serializer_create(), serializer_tps_listener_callbacks, and shutdown_group.

Referenced by ast_serializer_pool_create(), ast_sip_create_serializer_group(), and ast_threadpool_serializer().

◆ ast_threadpool_set_size()

void ast_threadpool_set_size ( struct ast_threadpool *  threadpool,
unsigned int  size 
)

Set the number of threads for the thread pool.

This number may be more or less than the current number of threads in the threadpool.

Parameters
threadpool: The threadpool to adjust
size: The new desired size of the threadpool

Definition at line 875 of file threadpool.c.

876{
877 struct set_size_data *ssd;
879
880 if (pool->shutting_down) {
881 return;
882 }
883
885 if (!ssd) {
886 return;
887 }
888
890 ast_free(ssd);
891 }
892}
#define ast_free(a)
Definition: astmm.h:180
struct ast_taskprocessor * control_tps
The control taskprocessor.
Definition: threadpool.c:96
Helper struct used for queued operations that change the size of the threadpool.
Definition: threadpool.c:803
unsigned int size
Definition: threadpool.c:807
struct ast_threadpool * pool
Definition: threadpool.c:805
static int queued_set_size(void *data)
Change the size of the threadpool.
Definition: threadpool.c:839
static struct set_size_data * set_size_data_alloc(struct ast_threadpool *pool, unsigned int size)
Allocate and initialize a set_size_data.
Definition: threadpool.c:815

References ast_free, ast_taskprocessor_push(), ast_threadpool::control_tps, lock, set_size_data::pool, queued_set_size(), SCOPED_AO2LOCK, set_size_data_alloc(), ast_threadpool::shutting_down, and set_size_data::size.

Referenced by AST_TEST_DEFINE(), and ast_threadpool_create().

◆ ast_threadpool_shutdown()

void ast_threadpool_shutdown ( struct ast_threadpool *  pool)

Shut down a threadpool and destroy it.

Parameters
pool: The pool to shut down

Definition at line 966 of file threadpool.c.

967{
968 if (!pool) {
969 return;
970 }
971 /* Shut down the taskprocessors and everything else just
972 * takes care of itself via the taskprocessor callbacks
973 */
974 ao2_lock(pool);
975 pool->shutting_down = 1;
976 ao2_unlock(pool);
979}
#define ao2_unlock(a)
Definition: astobj2.h:729
#define ao2_lock(a)
Definition: astobj2.h:717
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.

References ao2_lock, ao2_unlock, ast_taskprocessor_unreference(), ast_threadpool::control_tps, ast_threadpool::shutting_down, and ast_threadpool::tps.

Referenced by AST_TEST_DEFINE(), handle_cli_threadpool_push_efficiency(), handle_cli_threadpool_push_serializer_efficiency(), load_module(), sorcery_cleanup(), and unload_module().