Asterisk - The Open Source Telephony Project GIT-master-20e40a9
Loading...
Searching...
No Matches
Data Structures | Macros | Functions
threadpool.h File Reference
#include "asterisk/serializer_shutdown_group.h"
Include dependency graph for threadpool.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  ast_threadpool_listener_callbacks
 
struct  ast_threadpool_options
 

Macros

#define AST_THREADPOOL_OPTIONS_VERSION   1
 
#define ast_threadpool_push(pool, task, data)    __ast_threadpool_push(pool, task, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
 

Functions

int __ast_threadpool_push (struct ast_threadpool *pool, int(*task)(void *data), void *data, const char *file, int line, const char *function) attribute_warn_unused_result
 Push a task to the threadpool.
 
struct ast_threadpool * ast_threadpool_create (const char *name, struct ast_threadpool_listener *listener, const struct ast_threadpool_options *options)
 Create a new threadpool.
 
struct ast_threadpool_listener * ast_threadpool_listener_alloc (const struct ast_threadpool_listener_callbacks *callbacks, void *user_data)
 Allocate a threadpool listener.
 
void * ast_threadpool_listener_get_user_data (const struct ast_threadpool_listener *listener)
 Get the threadpool listener's user data.
 
long ast_threadpool_queue_size (struct ast_threadpool *pool)
 Return the size of the threadpool's task queue.
 
struct ast_taskprocessor * ast_threadpool_serializer (const char *name, struct ast_threadpool *pool)
 Serialized execution of tasks within an ast_threadpool.
 
struct ast_taskprocessor * ast_threadpool_serializer_get_current (void)
 Get the threadpool serializer currently associated with this thread.
 
struct ast_taskprocessor * ast_threadpool_serializer_group (const char *name, struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
 Serialized execution of tasks within an ast_threadpool.
 
void ast_threadpool_set_size (struct ast_threadpool *threadpool, unsigned int size)
 Set the number of threads for the thread pool.
 
void ast_threadpool_shutdown (struct ast_threadpool *pool)
 Shut down a threadpool and destroy it.
 

Macro Definition Documentation

◆ AST_THREADPOOL_OPTIONS_VERSION

#define AST_THREADPOOL_OPTIONS_VERSION   1

Definition at line 73 of file threadpool.h.

◆ ast_threadpool_push

#define ast_threadpool_push (   pool,
  task,
  data 
)     __ast_threadpool_push(pool, task, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)

Definition at line 194 of file threadpool.h.

Function Documentation

◆ __ast_threadpool_push()

int __ast_threadpool_push ( struct ast_threadpool *  pool,
int(*)(void *data)  task,
void *  data,
const char *  file,
int  line,
const char *  function 
)

Push a task to the threadpool.

Tasks pushed into the threadpool will be automatically taken by one of the threads within the pool.

Parameters
pool: The threadpool to add the task to
task: The task to add
data: The parameter for the task
Return values
0: success
-1: failure

Definition at line 961 of file threadpool.c.

963{
964 SCOPED_AO2LOCK(lock, pool);
965 if (!pool->shutting_down) {
966 return __ast_taskprocessor_push(pool->tps, task, data, file, line, function);
967 }
968 return -1;
969}
ast_mutex_t lock
Definition app_sla.c:337
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition lock.h:611
struct ast_taskprocessor * tps
The main taskprocessor.
Definition threadpool.c:67
int __ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap, const char *file, int line, const char *function) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
static int task(void *data)
Queued task for baseline test.

References __ast_taskprocessor_push(), lock, SCOPED_AO2LOCK, ast_threadpool::shutting_down, task(), and ast_threadpool::tps.

Referenced by ast_threadpool_push().

◆ ast_threadpool_create()

struct ast_threadpool * ast_threadpool_create ( const char *  name,
struct ast_threadpool_listener *  listener,
const struct ast_threadpool_options *  options 
)

Create a new threadpool.

This function creates a threadpool. Tasks may be pushed onto this thread pool and will be automatically acted upon by threads within the pool.

Only a single threadpool with a given name may exist. This function will fail if a threadpool with the given name already exists.

Parameters
name: The unique name for the threadpool
listener: The listener the threadpool will notify of changes. Can be NULL.
options: The behavioral options for this threadpool
Return values
NULL: Failed to create the threadpool
non-NULL: The newly-created threadpool

Definition at line 915 of file threadpool.c.

918{
919 struct ast_taskprocessor *tps;
920 RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, NULL, ao2_cleanup);
921 RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
922 char *fullname;
923
925 if (!pool) {
926 return NULL;
927 }
928
930 if (!tps_listener) {
931 return NULL;
932 }
933
934 if (options->version != AST_THREADPOOL_OPTIONS_VERSION) {
935 ast_log(LOG_WARNING, "Incompatible version of threadpool options in use.\n");
936 return NULL;
937 }
938
939 fullname = ast_alloca(strlen(name) + strlen("/pool") + 1);
940 sprintf(fullname, "%s/pool", name); /* Safe */
941 tps = ast_taskprocessor_create_with_listener(fullname, tps_listener);
942 if (!tps) {
943 return NULL;
944 }
945
946 pool->tps = tps;
947 if (listener) {
948 ao2_ref(listener, +1);
949 pool->listener = listener;
950 }
951 ast_threadpool_set_size(pool, pool->options.initial_size);
952 ao2_ref(pool, +1);
953 return pool;
954}
static void * listener(void *unused)
Definition asterisk.c:1530
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
Definition astmm.h:288
#define ast_log
Definition astobj2.c:42
#define ao2_cleanup(obj)
Definition astobj2.h:1934
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition astobj2.h:459
static const char name[]
Definition format_mp3.c:68
#define LOG_WARNING
#define NULL
Definition resample.c:96
A listener for taskprocessors.
A ast_taskprocessor structure is a singleton by name.
An opaque threadpool structure.
Definition threadpool.c:36
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
struct ast_taskprocessor * ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
Create a taskprocessor with a custom listener.
static struct test_options options
void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
Set the number of threads for the thread pool.
Definition threadpool.c:874
static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks
Table of taskprocessor listener callbacks for threadpool's main taskprocessor.
Definition threadpool.c:695
static struct ast_threadpool * threadpool_alloc(const char *name, const struct ast_threadpool_options *options)
Allocate a threadpool.
Definition threadpool.c:404
#define AST_THREADPOOL_OPTIONS_VERSION
Definition threadpool.h:73
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition utils.h:981

References ao2_cleanup, ao2_ref, ast_alloca, ast_log, ast_taskprocessor_create_with_listener(), ast_taskprocessor_listener_alloc(), AST_THREADPOOL_OPTIONS_VERSION, ast_threadpool_set_size(), listener(), LOG_WARNING, name, NULL, options, RAII_VAR, threadpool_alloc(), and threadpool_tps_listener_callbacks.

Referenced by AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), handle_cli_threadpool_push_efficiency(), handle_cli_threadpool_push_serializer_efficiency(), and load_module().

◆ ast_threadpool_listener_alloc()

struct ast_threadpool_listener * ast_threadpool_listener_alloc ( const struct ast_threadpool_listener_callbacks *  callbacks,
void *  user_data 
)

Allocate a threadpool listener.

This function will call back into the alloc callback for the listener.

Parameters
callbacks: Listener callbacks to assign to the listener
user_data: User data to be stored in the threadpool listener
Return values
NULL: Failed to allocate the listener
non-NULL: The newly-created threadpool listener

Definition at line 893 of file threadpool.c.

895{
897 if (!listener) {
898 return NULL;
899 }
900 listener->callbacks = callbacks;
901 listener->user_data = user_data;
902 return listener;
903}
#define ao2_alloc(data_size, destructor_fn)
Definition astobj2.h:409
struct @506 callbacks
listener for a threadpool
Definition threadpool.c:110

References ao2_alloc, callbacks, listener(), NULL, and ast_threadpool_listener::user_data.

Referenced by AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), handle_cli_threadpool_push_efficiency(), and handle_cli_threadpool_push_serializer_efficiency().

◆ ast_threadpool_listener_get_user_data()

void * ast_threadpool_listener_get_user_data ( const struct ast_threadpool_listener *  listener)

Get the threadpool listener's user data.

Parameters
listener: The threadpool listener
Returns
The user data

Definition at line 905 of file threadpool.c.

906{
907 return listener->user_data;
908}

References listener().

Referenced by listener_check(), test_emptied(), test_shutdown(), test_state_changed(), test_task_pushed(), and wait_for_task_pushed().

◆ ast_threadpool_queue_size()

long ast_threadpool_queue_size ( struct ast_threadpool *  pool)

Return the size of the threadpool's task queue.

Since
13.7.0

Definition at line 1347 of file threadpool.c.

1348{
1349 return ast_taskprocessor_size(pool->tps);
1350}
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.

References ast_taskprocessor_size(), and ast_threadpool::tps.

Referenced by ast_sip_threadpool_queue_size().

◆ ast_threadpool_serializer()

struct ast_taskprocessor * ast_threadpool_serializer ( const char *  name,
struct ast_threadpool *  pool 
)

Serialized execution of tasks within an ast_threadpool.

Since
12.0.0

An ast_taskprocessor with the same contract as a default taskprocessor (tasks execute serially) except instead of executing out of a dedicated thread, execution occurs in a thread from an ast_threadpool. Think of it as a lightweight thread.

While it guarantees that each task will complete before executing the next, there is no guarantee as to which thread from the pool individual tasks will execute in. This normally only matters if your code relies on thread specific information, such as thread locals.

Use ast_taskprocessor_unreference() to dispose of the returned ast_taskprocessor.

Only a single taskprocessor with a given name may exist. This function will fail if a taskprocessor with the given name already exists.

Parameters
name: Name of the serializer. (must be unique)
pool: ast_threadpool for execution.
Returns
ast_taskprocessor for enqueuing work.
Return values
NULL: on error.

Definition at line 1342 of file threadpool.c.

1343{
1345}
struct ast_taskprocessor * ast_threadpool_serializer_group(const char *name, struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Serialized execution of tasks within an ast_threadpool.

References ast_threadpool_serializer_group(), name, and NULL.

Referenced by AST_TEST_DEFINE(), AST_TEST_DEFINE(), and handle_cli_threadpool_push_serializer_efficiency().

◆ ast_threadpool_serializer_get_current()

struct ast_taskprocessor * ast_threadpool_serializer_get_current ( void  )

Get the threadpool serializer currently associated with this thread.

Since
14.0.0
Note
The returned pointer is valid while the serializer thread is running.
Use ao2_ref() on serializer if you are going to keep it for another thread. To unref it you must then use ast_taskprocessor_unreference().
Return values
serializer: on success.
NULL: on error or no serializer associated with the thread.

Definition at line 1307 of file threadpool.c.

1308{
1309 return ast_threadstorage_get_ptr(&current_serializer);
1310}
void * ast_threadstorage_get_ptr(struct ast_threadstorage *ts)
Retrieve a raw pointer from threadstorage.

References ast_threadstorage_get_ptr().

Referenced by record_serializer(), rfc3326_outgoing_request(), rfc3326_outgoing_response(), and serializer_efficiency_task().

◆ ast_threadpool_serializer_group()

struct ast_taskprocessor * ast_threadpool_serializer_group ( const char *  name,
struct ast_threadpool *  pool,
struct ast_serializer_shutdown_group *  shutdown_group 
)

Serialized execution of tasks within an ast_threadpool.

Since
13.5.0

An ast_taskprocessor with the same contract as a default taskprocessor (tasks execute serially) except instead of executing out of a dedicated thread, execution occurs in a thread from an ast_threadpool. Think of it as a lightweight thread.

While it guarantees that each task will complete before executing the next, there is no guarantee as to which thread from the pool individual tasks will execute in. This normally only matters if your code relies on thread specific information, such as thread locals.

Use ast_taskprocessor_unreference() to dispose of the returned ast_taskprocessor.

Only a single taskprocessor with a given name may exist. This function will fail if a taskprocessor with the given name already exists.

Parameters
name: Name of the serializer. (must be unique)
pool: ast_threadpool for execution.
shutdown_group: Group shutdown controller. (NULL if no group association)
Returns
ast_taskprocessor for enqueuing work.
Return values
NULL: on error.

Definition at line 1312 of file threadpool.c.

1314{
1315 struct serializer *ser;
1317 struct ast_taskprocessor *tps;
1318
1319 ser = serializer_create(pool, shutdown_group);
1320 if (!ser) {
1321 return NULL;
1322 }
1323
1325 if (!listener) {
1326 ao2_ref(ser, -1);
1327 return NULL;
1328 }
1329
1331 if (!tps) {
1332 /* ser ref transferred to listener but not cleaned without tps */
1333 ao2_ref(ser, -1);
1334 } else if (shutdown_group) {
1336 }
1337
1338 ao2_ref(listener, -1);
1339 return tps;
1340}
static struct ast_serializer_shutdown_group * shutdown_group
Shutdown group for options serializers.
void ast_serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group)
Increment the number of serializer members in the group.
static struct serializer * serializer_create(struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks

References ao2_ref, ast_serializer_shutdown_group_inc(), ast_taskprocessor_create_with_listener(), ast_taskprocessor_listener_alloc(), listener(), name, NULL, serializer_create(), serializer_tps_listener_callbacks, and shutdown_group.

Referenced by ast_serializer_pool_create(), ast_sip_create_serializer_group(), and ast_threadpool_serializer().

◆ ast_threadpool_set_size()

void ast_threadpool_set_size ( struct ast_threadpool *  threadpool,
unsigned int  size 
)

Set the number of threads for the thread pool.

This number may be more or less than the current number of threads in the threadpool.

Parameters
threadpool: The threadpool to adjust
size: The new desired size of the threadpool

Definition at line 874 of file threadpool.c.

875{
876 struct set_size_data *ssd;
878
879 if (pool->shutting_down) {
880 return;
881 }
882
884 if (!ssd) {
885 return;
886 }
887
889 ast_free(ssd);
890 }
891}
#define ast_free(a)
Definition astmm.h:180
struct ast_taskprocessor * control_tps
The control taskprocessor.
Definition threadpool.c:96
Helper struct used for queued operations that change the size of the threadpool.
Definition threadpool.c:802
unsigned int size
Definition threadpool.c:806
struct ast_threadpool * pool
Definition threadpool.c:804
#define ast_taskprocessor_push(tps, task_exe, datap)
static int queued_set_size(void *data)
Change the size of the threadpool.
Definition threadpool.c:838
static struct set_size_data * set_size_data_alloc(struct ast_threadpool *pool, unsigned int size)
Allocate and initialize a set_size_data.
Definition threadpool.c:814

References ast_free, ast_taskprocessor_push, ast_threadpool::control_tps, lock, set_size_data::pool, queued_set_size(), SCOPED_AO2LOCK, set_size_data_alloc(), ast_threadpool::shutting_down, and set_size_data::size.

Referenced by AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), and ast_threadpool_create().

◆ ast_threadpool_shutdown()

void ast_threadpool_shutdown ( struct ast_threadpool *  pool)

Shut down a threadpool and destroy it.

Parameters
pool: The pool to shut down

Definition at line 977 of file threadpool.c.

978{
979 if (!pool) {
980 return;
981 }
982 /* Shut down the taskprocessors and everything else just
983 * takes care of itself via the taskprocessor callbacks
984 */
985 ao2_lock(pool);
986 pool->shutting_down = 1;
987 ao2_unlock(pool);
990}
#define ao2_unlock(a)
Definition astobj2.h:729
#define ao2_lock(a)
Definition astobj2.h:717
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.

References ao2_lock, ao2_unlock, ast_taskprocessor_unreference(), ast_threadpool::control_tps, ast_threadpool::shutting_down, and ast_threadpool::tps.

Referenced by AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), handle_cli_threadpool_push_efficiency(), handle_cli_threadpool_push_serializer_efficiency(), load_module(), and unload_module().