Asterisk - The Open Source Telephony Project GIT-master-0644429
Data Structures | Macros | Functions
threadpool.h File Reference
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  ast_threadpool_listener_callbacks
 
struct  ast_threadpool_options
 

Macros

#define AST_THREADPOOL_OPTIONS_VERSION   1
 

Functions

struct ast_serializer_shutdown_group * ast_serializer_shutdown_group_alloc (void)
 Create a serializer group shutdown control object. More...
 
int ast_serializer_shutdown_group_join (struct ast_serializer_shutdown_group *shutdown_group, int timeout)
 Wait for the serializers in the group to shut down, with a timeout. More...
 
struct ast_threadpool * ast_threadpool_create (const char *name, struct ast_threadpool_listener *listener, const struct ast_threadpool_options *options)
 Create a new threadpool. More...
 
struct ast_threadpool_listener * ast_threadpool_listener_alloc (const struct ast_threadpool_listener_callbacks *callbacks, void *user_data)
 Allocate a threadpool listener. More...
 
void * ast_threadpool_listener_get_user_data (const struct ast_threadpool_listener *listener)
 Get the threadpool listener's user data. More...
 
int ast_threadpool_push (struct ast_threadpool *pool, int(*task)(void *data), void *data) attribute_warn_unused_result
 Push a task to the threadpool. More...
 
long ast_threadpool_queue_size (struct ast_threadpool *pool)
 Return the size of the threadpool's task queue. More...
 
struct ast_taskprocessor * ast_threadpool_serializer (const char *name, struct ast_threadpool *pool)
 Serialized execution of tasks within an ast_threadpool. More...
 
struct ast_taskprocessor * ast_threadpool_serializer_get_current (void)
 Get the threadpool serializer currently associated with this thread. More...
 
struct ast_taskprocessor * ast_threadpool_serializer_group (const char *name, struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
 Serialized execution of tasks within an ast_threadpool. More...
 
void ast_threadpool_set_size (struct ast_threadpool *threadpool, unsigned int size)
 Set the number of threads for the thread pool. More...
 
void ast_threadpool_shutdown (struct ast_threadpool *pool)
 Shut down a threadpool and destroy it. More...
 

Macro Definition Documentation

◆ AST_THREADPOOL_OPTIONS_VERSION

#define AST_THREADPOOL_OPTIONS_VERSION   1

Definition at line 71 of file threadpool.h.

Function Documentation

◆ ast_serializer_shutdown_group_alloc()

struct ast_serializer_shutdown_group * ast_serializer_shutdown_group_alloc ( void  )

Create a serializer group shutdown control object.

Since
13.5.0
Returns
ao2 object to control shutdown of a serializer group.

Definition at line 1229 of file threadpool.c.

1230{
1232
1234 if (!shutdown_group) {
1235 return NULL;
1236 }
1238 return shutdown_group;
1239}
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:409
#define ast_cond_init(cond, attr)
Definition: lock.h:201
static struct ast_serializer_shutdown_group * shutdown_group
Shutdown group for options serializers.
#define NULL
Definition: resample.c:96
static void serializer_shutdown_group_dtor(void *vdoomed)
Definition: threadpool.c:1222

References ao2_alloc, ast_cond_init, ast_serializer_shutdown_group::cond, NULL, serializer_shutdown_group_dtor(), and shutdown_group.

Referenced by ast_serializer_pool_create(), load_module(), and sip_options_init_task().

◆ ast_serializer_shutdown_group_join()

int ast_serializer_shutdown_group_join ( struct ast_serializer_shutdown_group * shutdown_group,
int  timeout 
)

Wait for the serializers in the group to shut down, with a timeout.

Since
13.5.0
Parameters
shutdown_group: Group shutdown controller. (Returns 0 immediately if NULL)
timeout: Number of seconds to wait for the serializers in the group to shut down. Zero disables the timeout.
Returns
Number of serializers that did not shut down within the timeout.

Definition at line 1241 of file threadpool.c.

1242{
1243 int remaining;
1245
1246 if (!shutdown_group) {
1247 return 0;
1248 }
1249
1251 ast_assert(lock != NULL);
1252
1254 if (timeout) {
1255 struct timeval start;
1256 struct timespec end;
1257
1258 start = ast_tvnow();
1259 end.tv_sec = start.tv_sec + timeout;
1260 end.tv_nsec = start.tv_usec * 1000;
1261 while (shutdown_group->count) {
1263 /* Error or timed out waiting for the count to reach zero. */
1264 break;
1265 }
1266 }
1267 } else {
1268 while (shutdown_group->count) {
1270 /* Error */
1271 break;
1272 }
1273 }
1274 }
1275 remaining = shutdown_group->count;
1277 return remaining;
1278}
ast_mutex_t lock
Definition: app_sla.c:331
#define ao2_unlock(a)
Definition: astobj2.h:729
#define ao2_lock(a)
Definition: astobj2.h:717
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
Definition: astobj2.c:476
char * end
Definition: eagi_proxy.c:73
#define ast_cond_wait(cond, mutex)
Definition: lock.h:205
#define ast_cond_timedwait(cond, mutex, time)
Definition: lock.h:206
Structure for mutex and tracking information.
Definition: lock.h:135
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:159
#define ast_assert(a)
Definition: utils.h:739

References ao2_lock, ao2_object_get_lockaddr(), ao2_unlock, ast_assert, ast_cond_timedwait, ast_cond_wait, ast_tvnow(), ast_serializer_shutdown_group::cond, ast_serializer_shutdown_group::count, end, lock, NULL, and shutdown_group.

Referenced by ast_res_pjsip_cleanup_options_handling(), ast_serializer_pool_destroy(), and unload_module().

◆ ast_threadpool_create()

struct ast_threadpool * ast_threadpool_create ( const char *  name,
struct ast_threadpool_listener * listener,
const struct ast_threadpool_options * options
)

Create a new threadpool.

This function creates a threadpool. Tasks may be pushed onto this thread pool and will be automatically acted upon by threads within the pool.

Only a single threadpool with a given name may exist. This function will fail if a threadpool with the given name already exists.

Parameters
name: The unique name for the threadpool
listener: The listener the threadpool will notify of changes. Can be NULL.
options: The behavioral options for this threadpool
Return values
NULL: Failed to create the threadpool
non-NULL: The newly-created threadpool

Definition at line 916 of file threadpool.c.

919{
920 struct ast_taskprocessor *tps;
921 RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, NULL, ao2_cleanup);
922 RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
923 char *fullname;
924
926 if (!pool) {
927 return NULL;
928 }
929
931 if (!tps_listener) {
932 return NULL;
933 }
934
935 if (options->version != AST_THREADPOOL_OPTIONS_VERSION) {
936 ast_log(LOG_WARNING, "Incompatible version of threadpool options in use.\n");
937 return NULL;
938 }
939
940 fullname = ast_alloca(strlen(name) + strlen("/pool") + 1);
941 sprintf(fullname, "%s/pool", name); /* Safe */
942 tps = ast_taskprocessor_create_with_listener(fullname, tps_listener);
943 if (!tps) {
944 return NULL;
945 }
946
947 pool->tps = tps;
948 if (listener) {
949 ao2_ref(listener, +1);
950 pool->listener = listener;
951 }
952 ast_threadpool_set_size(pool, pool->options.initial_size);
953 ao2_ref(pool, +1);
954 return pool;
955}
static void * listener(void *unused)
Definition: asterisk.c:1519
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
Definition: astmm.h:288
#define ast_log
Definition: astobj2.c:42
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
static const char name[]
Definition: format_mp3.c:68
#define LOG_WARNING
A listener for taskprocessors.
An ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
An opaque threadpool structure.
Definition: threadpool.c:36
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
struct ast_taskprocessor * ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
Create a taskprocessor with a custom listener.
static struct test_options options
void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
Set the number of threads for the thread pool.
Definition: threadpool.c:875
static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks
Table of taskprocessor listener callbacks for threadpool's main taskprocessor.
Definition: threadpool.c:696
static struct ast_threadpool * threadpool_alloc(const char *name, const struct ast_threadpool_options *options)
Allocate a threadpool.
Definition: threadpool.c:404
#define AST_THREADPOOL_OPTIONS_VERSION
Definition: threadpool.h:71
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:941

References ao2_cleanup, ao2_ref, ast_alloca, ast_log, ast_taskprocessor_create_with_listener(), ast_taskprocessor_listener_alloc(), AST_THREADPOOL_OPTIONS_VERSION, ast_threadpool_set_size(), listener(), LOG_WARNING, name, NULL, options, RAII_VAR, threadpool_alloc(), and threadpool_tps_listener_callbacks.

Referenced by ast_sorcery_init(), AST_TEST_DEFINE(), load_module(), and stasis_init().

◆ ast_threadpool_listener_alloc()

struct ast_threadpool_listener * ast_threadpool_listener_alloc ( const struct ast_threadpool_listener_callbacks * callbacks,
void *  user_data 
)

Allocate a threadpool listener.

This function will call back into the alloc callback for the listener.

Parameters
callbacks: Listener callbacks to assign to the listener
user_data: User data to be stored in the threadpool listener
Return values
NULL: Failed to allocate the listener
non-NULL: The newly-created threadpool listener

Definition at line 894 of file threadpool.c.

896{
898 if (!listener) {
899 return NULL;
900 }
901 listener->callbacks = callbacks;
902 listener->user_data = user_data;
903 return listener;
904}
struct @468 callbacks
listener for a threadpool
Definition: threadpool.c:110

References ao2_alloc, callbacks, listener(), NULL, and ast_threadpool_listener::user_data.

Referenced by AST_TEST_DEFINE().

◆ ast_threadpool_listener_get_user_data()

void * ast_threadpool_listener_get_user_data ( const struct ast_threadpool_listener * listener)

Get the threadpool listener's user data.

Parameters
listener: The threadpool listener
Returns
The user data

Definition at line 906 of file threadpool.c.

907{
908 return listener->user_data;
909}

References listener().

Referenced by listener_check(), test_emptied(), test_shutdown(), test_state_changed(), test_task_pushed(), and wait_for_task_pushed().

◆ ast_threadpool_push()

int ast_threadpool_push ( struct ast_threadpool * pool,
int(*)(void *data)  task,
void *  data 
)

Push a task to the threadpool.

Tasks pushed into the threadpool will be automatically taken by one of the threads within the pool.

Parameters
pool: The threadpool to add the task to
task: The task to add
data: The parameter for the task
Return values
0: success
-1: failure

Definition at line 957 of file threadpool.c.

958{
959 SCOPED_AO2LOCK(lock, pool);
960 if (!pool->shutting_down) {
961 return ast_taskprocessor_push(pool->tps, task, data);
962 }
963 return -1;
964}
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:604
struct ast_taskprocessor * tps
The main taskprocessor.
Definition: threadpool.c:67
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
static int task(void *data)
Queued task for baseline test.

References ast_taskprocessor_push(), lock, SCOPED_AO2LOCK, ast_threadpool::shutting_down, task(), and ast_threadpool::tps.

Referenced by AST_TEST_DEFINE(), and serializer_task_pushed().

◆ ast_threadpool_queue_size()

long ast_threadpool_queue_size ( struct ast_threadpool * pool)

Return the size of the threadpool's task queue.

Since
13.7.0

Definition at line 1433 of file threadpool.c.

1434{
1435 return ast_taskprocessor_size(pool->tps);
1436}
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.

References ast_taskprocessor_size(), and ast_threadpool::tps.

Referenced by ast_sip_threadpool_queue_size().

◆ ast_threadpool_serializer()

struct ast_taskprocessor * ast_threadpool_serializer ( const char *  name,
struct ast_threadpool * pool
)

Serialized execution of tasks within an ast_threadpool.

Since
12.0.0

An ast_taskprocessor with the same contract as a default taskprocessor (tasks execute serially) except instead of executing out of a dedicated thread, execution occurs in a thread from an ast_threadpool. Think of it as a lightweight thread.

While it guarantees that each task will complete before executing the next, there is no guarantee as to which thread from the pool individual tasks will execute. This normally only matters if your code relies on thread specific information, such as thread locals.

Use ast_taskprocessor_unreference() to dispose of the returned ast_taskprocessor.

Only a single taskprocessor with a given name may exist. This function will fail if a taskprocessor with the given name already exists.

Parameters
name: Name of the serializer. (must be unique)
pool: ast_threadpool for execution.
Returns
ast_taskprocessor for enqueuing work.
Return values
NULL: on error.

Definition at line 1428 of file threadpool.c.

1429{
1431}
struct ast_taskprocessor * ast_threadpool_serializer_group(const char *name, struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Serialized execution of tasks within a ast_threadpool.
Definition: threadpool.c:1398

References ast_threadpool_serializer_group(), name, and NULL.

Referenced by AST_TEST_DEFINE(), internal_stasis_subscribe(), and sorcery_object_type_alloc().

◆ ast_threadpool_serializer_get_current()

struct ast_taskprocessor * ast_threadpool_serializer_get_current ( void  )

Get the threadpool serializer currently associated with this thread.

Since
14.0.0
Note
The returned pointer is valid while the serializer thread is running.
Use ao2_ref() on serializer if you are going to keep it for another thread. To unref it you must then use ast_taskprocessor_unreference().
Return values
serializer: on success.
NULL: on error or no serializer associated with the thread.

Definition at line 1393 of file threadpool.c.

1394{
1395 return ast_threadstorage_get_ptr(&current_serializer);
1396}
void * ast_threadstorage_get_ptr(struct ast_threadstorage *ts)
Retrieve a raw pointer from threadstorage.

References ast_threadstorage_get_ptr().

Referenced by record_serializer(), rfc3326_outgoing_request(), and rfc3326_outgoing_response().

◆ ast_threadpool_serializer_group()

struct ast_taskprocessor * ast_threadpool_serializer_group ( const char *  name,
struct ast_threadpool * pool,
struct ast_serializer_shutdown_group * shutdown_group
)

Serialized execution of tasks within an ast_threadpool.

Since
13.5.0

An ast_taskprocessor with the same contract as a default taskprocessor (tasks execute serially) except instead of executing out of a dedicated thread, execution occurs in a thread from an ast_threadpool. Think of it as a lightweight thread.

While it guarantees that each task will complete before executing the next, there is no guarantee as to which thread from the pool individual tasks will execute. This normally only matters if your code relies on thread specific information, such as thread locals.

Use ast_taskprocessor_unreference() to dispose of the returned ast_taskprocessor.

Only a single taskprocessor with a given name may exist. This function will fail if a taskprocessor with the given name already exists.

Parameters
name: Name of the serializer. (must be unique)
pool: ast_threadpool for execution.
shutdown_group: Group shutdown controller. (NULL if no group association)
Returns
ast_taskprocessor for enqueuing work.
Return values
NULL: on error.

Definition at line 1398 of file threadpool.c.

1400{
1401 struct serializer *ser;
1403 struct ast_taskprocessor *tps;
1404
1405 ser = serializer_create(pool, shutdown_group);
1406 if (!ser) {
1407 return NULL;
1408 }
1409
1411 if (!listener) {
1412 ao2_ref(ser, -1);
1413 return NULL;
1414 }
1415
1417 if (!tps) {
1418 /* ser ref transferred to listener but not cleaned without tps */
1419 ao2_ref(ser, -1);
1420 } else if (shutdown_group) {
1422 }
1423
1424 ao2_ref(listener, -1);
1425 return tps;
1426}
static struct serializer * serializer_create(struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Definition: threadpool.c:1328
static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks
Definition: threadpool.c:1387
static void serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group)
Definition: threadpool.c:1287

References ao2_ref, ast_taskprocessor_create_with_listener(), ast_taskprocessor_listener_alloc(), listener(), name, NULL, serializer_create(), serializer_shutdown_group_inc(), serializer_tps_listener_callbacks, and shutdown_group.

Referenced by ast_serializer_pool_create(), ast_sip_create_serializer_group(), and ast_threadpool_serializer().

◆ ast_threadpool_set_size()

void ast_threadpool_set_size ( struct ast_threadpool * threadpool,
unsigned int  size 
)

Set the number of threads for the thread pool.

This number may be more or less than the current number of threads in the threadpool.

Parameters
threadpool: The threadpool to adjust
size: The new desired size of the threadpool

Definition at line 875 of file threadpool.c.

876{
877 struct set_size_data *ssd;
879
880 if (pool->shutting_down) {
881 return;
882 }
883
885 if (!ssd) {
886 return;
887 }
888
890 ast_free(ssd);
891 }
892}
#define ast_free(a)
Definition: astmm.h:180
struct ast_taskprocessor * control_tps
The control taskprocessor.
Definition: threadpool.c:96
Helper struct used for queued operations that change the size of the threadpool.
Definition: threadpool.c:803
unsigned int size
Definition: threadpool.c:807
struct ast_threadpool * pool
Definition: threadpool.c:805
static int queued_set_size(void *data)
Change the size of the threadpool.
Definition: threadpool.c:839
static struct set_size_data * set_size_data_alloc(struct ast_threadpool *pool, unsigned int size)
Allocate and initialize a set_size_data.
Definition: threadpool.c:815

References ast_free, ast_taskprocessor_push(), ast_threadpool::control_tps, lock, set_size_data::pool, queued_set_size(), SCOPED_AO2LOCK, set_size_data_alloc(), ast_threadpool::shutting_down, and set_size_data::size.

Referenced by AST_TEST_DEFINE(), and ast_threadpool_create().

◆ ast_threadpool_shutdown()

void ast_threadpool_shutdown ( struct ast_threadpool * pool)

Shut down a threadpool and destroy it.

Parameters
pool: The pool to shut down

Definition at line 966 of file threadpool.c.

967{
968 if (!pool) {
969 return;
970 }
971 /* Shut down the taskprocessors and everything else just
972 * takes care of itself via the taskprocessor callbacks
973 */
974 ao2_lock(pool);
975 pool->shutting_down = 1;
976 ao2_unlock(pool);
979}
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor, decrementing its reference count.

References ao2_lock, ao2_unlock, ast_taskprocessor_unreference(), ast_threadpool::control_tps, ast_threadpool::shutting_down, and ast_threadpool::tps.

Referenced by AST_TEST_DEFINE(), load_module(), sorcery_cleanup(), stasis_cleanup(), and unload_module().