Asterisk - The Open Source Telephony Project  GIT-master-b7027de
Data Structures | Macros | Functions
threadpool.h File Reference

Go to the source code of this file.

Data Structures

struct  ast_threadpool_listener_callbacks
 
struct  ast_threadpool_options
 

Macros

#define AST_THREADPOOL_OPTIONS_VERSION   1
 

Functions

struct ast_serializer_shutdown_groupast_serializer_shutdown_group_alloc (void)
 Create a serializer group shutdown control object. More...
 
int ast_serializer_shutdown_group_join (struct ast_serializer_shutdown_group *shutdown_group, int timeout)
 Wait for the serializers in the group to shutdown with timeout. More...
 
struct ast_threadpoolast_threadpool_create (const char *name, struct ast_threadpool_listener *listener, const struct ast_threadpool_options *options)
 Create a new threadpool. More...
 
struct ast_threadpool_listenerast_threadpool_listener_alloc (const struct ast_threadpool_listener_callbacks *callbacks, void *user_data)
 Allocate a threadpool listener. More...
 
void * ast_threadpool_listener_get_user_data (const struct ast_threadpool_listener *listener)
 Get the threadpool listener's user data. More...
 
int ast_threadpool_push (struct ast_threadpool *pool, int(*task)(void *data), void *data) attribute_warn_unused_result
 Push a task to the threadpool. More...
 
long ast_threadpool_queue_size (struct ast_threadpool *pool)
 Return the size of the threadpool's task queue. More...
 
struct ast_taskprocessorast_threadpool_serializer (const char *name, struct ast_threadpool *pool)
 Serialized execution of tasks within a ast_threadpool. More...
 
struct ast_taskprocessorast_threadpool_serializer_get_current (void)
 Get the threadpool serializer currently associated with this thread. More...
 
struct ast_taskprocessorast_threadpool_serializer_group (const char *name, struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
 Serialized execution of tasks within a ast_threadpool. More...
 
void ast_threadpool_set_size (struct ast_threadpool *threadpool, unsigned int size)
 Set the number of threads for the thread pool. More...
 
void ast_threadpool_shutdown (struct ast_threadpool *pool)
 Shut down a threadpool and destroy it. More...
 

Macro Definition Documentation

◆ AST_THREADPOOL_OPTIONS_VERSION

#define AST_THREADPOOL_OPTIONS_VERSION   1

Definition at line 71 of file threadpool.h.

Referenced by ast_sorcery_init(), AST_TEST_DEFINE(), ast_threadpool_create(), and stasis_init().

Function Documentation

◆ ast_serializer_shutdown_group_alloc()

struct ast_serializer_shutdown_group* ast_serializer_shutdown_group_alloc ( void  )

Create a serializer group shutdown control object.

Since
13.5.0
Returns
ao2 object to control shutdown of a serializer group.

Definition at line 1229 of file threadpool.c.

References ao2_alloc, ast_cond_init, ast_serializer_shutdown_group::cond, NULL, serializer_shutdown_group_dtor(), and shutdown_group.

Referenced by ast_serializer_pool_create(), load_module(), and sip_options_init_task().

1230 {
1232 
1233  shutdown_group = ao2_alloc(sizeof(*shutdown_group), serializer_shutdown_group_dtor);
1234  if (!shutdown_group) {
1235  return NULL;
1236  }
1237  ast_cond_init(&shutdown_group->cond, NULL);
1238  return shutdown_group;
1239 }
#define ast_cond_init(cond, attr)
Definition: lock.h:199
#define NULL
Definition: resample.c:96
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:411
static void serializer_shutdown_group_dtor(void *vdoomed)
Definition: threadpool.c:1222
static struct ast_serializer_shutdown_group * shutdown_group
Shutdown group for options serializers.

◆ ast_serializer_shutdown_group_join()

int ast_serializer_shutdown_group_join ( struct ast_serializer_shutdown_group shutdown_group,
int  timeout 
)

Wait for the serializers in the group to shutdown with timeout.

Since
13.5.0
Parameters
shutdown_groupGroup shutdown controller. (Returns 0 immediately if NULL)
timeoutNumber of seconds to wait for the serializers in the group to shutdown. Zero if the timeout is disabled.
Returns
Number of seriaizers that did not get shutdown within the timeout.

Definition at line 1241 of file threadpool.c.

References ao2_lock, ao2_object_get_lockaddr(), ao2_unlock, ast_assert, ast_cond_timedwait, ast_cond_wait, ast_tvnow(), ast_serializer_shutdown_group::cond, ast_serializer_shutdown_group::count, lock, NULL, and timeout.

Referenced by ast_res_pjsip_cleanup_options_handling(), ast_serializer_pool_destroy(), and unload_module().

1242 {
1243  int remaining;
1244  ast_mutex_t *lock;
1245 
1246  if (!shutdown_group) {
1247  return 0;
1248  }
1249 
1250  lock = ao2_object_get_lockaddr(shutdown_group);
1251  ast_assert(lock != NULL);
1252 
1253  ao2_lock(shutdown_group);
1254  if (timeout) {
1255  struct timeval start;
1256  struct timespec end;
1257 
1258  start = ast_tvnow();
1259  end.tv_sec = start.tv_sec + timeout;
1260  end.tv_nsec = start.tv_usec * 1000;
1261  while (shutdown_group->count) {
1262  if (ast_cond_timedwait(&shutdown_group->cond, lock, &end)) {
1263  /* Error or timed out waiting for the count to reach zero. */
1264  break;
1265  }
1266  }
1267  } else {
1268  while (shutdown_group->count) {
1269  if (ast_cond_wait(&shutdown_group->cond, lock)) {
1270  /* Error */
1271  break;
1272  }
1273  }
1274  }
1275  remaining = shutdown_group->count;
1276  ao2_unlock(shutdown_group);
1277  return remaining;
1278 }
static int timeout
Definition: cdr_mysql.c:86
#define ast_cond_wait(cond, mutex)
Definition: lock.h:203
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:150
#define ast_assert(a)
Definition: utils.h:710
#define ao2_unlock(a)
Definition: astobj2.h:730
#define NULL
Definition: resample.c:96
char * end
Definition: eagi_proxy.c:73
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
Definition: astobj2.c:476
ast_mutex_t lock
Definition: app_meetme.c:1091
#define ao2_lock(a)
Definition: astobj2.h:718
#define ast_cond_timedwait(cond, mutex, time)
Definition: lock.h:204
Structure for mutex and tracking information.
Definition: lock.h:135

◆ ast_threadpool_create()

struct ast_threadpool* ast_threadpool_create ( const char *  name,
struct ast_threadpool_listener listener,
const struct ast_threadpool_options options 
)

Create a new threadpool.

This function creates a threadpool. Tasks may be pushed onto this thread pool and will be automatically acted upon by threads within the pool.

Only a single threadpool with a given name may exist. This function will fail if a threadpool with the given name already exists.

Parameters
nameThe unique name for the threadpool
listenerThe listener the threadpool will notify of changes. Can be NULL.
optionsThe behavioral options for this threadpool
Return values
NULLFailed to create the threadpool
non-NULLThe newly-created threadpool

Definition at line 915 of file threadpool.c.

References ao2_cleanup, ao2_ref, ast_alloca, ast_log, ast_taskprocessor_create_with_listener(), ast_taskprocessor_listener_alloc(), AST_THREADPOOL_OPTIONS_VERSION, ast_threadpool_set_size(), ast_threadpool::listener, LOG_WARNING, NULL, RAII_VAR, threadpool_alloc(), ast_threadpool::tps, and ast_threadpool_options::version.

Referenced by ast_sorcery_init(), AST_TEST_DEFINE(), load_module(), and stasis_init().

918 {
919  struct ast_taskprocessor *tps;
920  RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, NULL, ao2_cleanup);
921  RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
922  char *fullname;
923 
924  pool = threadpool_alloc(name, options);
925  if (!pool) {
926  return NULL;
927  }
928 
930  if (!tps_listener) {
931  return NULL;
932  }
933 
934  if (options->version != AST_THREADPOOL_OPTIONS_VERSION) {
935  ast_log(LOG_WARNING, "Incompatible version of threadpool options in use.\n");
936  return NULL;
937  }
938 
939  fullname = ast_alloca(strlen(name) + strlen("/pool") + 1);
940  sprintf(fullname, "%s/pool", name); /* Safe */
941  tps = ast_taskprocessor_create_with_listener(fullname, tps_listener);
942  if (!tps) {
943  return NULL;
944  }
945 
946  pool->tps = tps;
947  if (listener) {
948  ao2_ref(listener, +1);
949  pool->listener = listener;
950  }
951  ast_threadpool_set_size(pool, pool->options.initial_size);
952  ao2_ref(pool, +1);
953  return pool;
954 }
A listener for taskprocessors.
void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
Set the number of threads for the thread pool.
Definition: threadpool.c:874
#define AST_THREADPOOL_OPTIONS_VERSION
Definition: threadpool.h:71
#define LOG_WARNING
Definition: logger.h:274
static struct ast_threadpool * threadpool_alloc(const char *name, const struct ast_threadpool_options *options)
Definition: threadpool.c:404
#define NULL
Definition: resample.c:96
#define ast_log
Definition: astobj2.c:42
static void * listener(void *unused)
Definition: asterisk.c:1476
static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks
Table of taskprocessor listener callbacks for threadpool's main taskprocessor.
Definition: threadpool.c:695
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:911
#define ao2_ref(o, delta)
Definition: astobj2.h:464
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
Definition: astmm.h:290
static const char name[]
Definition: cdr_mysql.c:74
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
An opaque threadpool structure.
Definition: threadpool.c:36
struct ast_taskprocessor * ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
Create a taskprocessor with a custom listener.

◆ ast_threadpool_listener_alloc()

struct ast_threadpool_listener* ast_threadpool_listener_alloc ( const struct ast_threadpool_listener_callbacks callbacks,
void *  user_data 
)

Allocate a threadpool listener.

This function will call back into the alloc callback for the listener.

Parameters
callbacksListener callbacks to assign to the listener
user_dataUser data to be stored in the threadpool listener
Return values
NULLFailed to allocate the listener
non-NULLThe newly-created threadpool listener

Definition at line 893 of file threadpool.c.

References ao2_alloc, ast_threadpool_listener::callbacks, ast_threadpool::listener, NULL, and ast_threadpool_listener::user_data.

Referenced by AST_TEST_DEFINE().

895 {
896  struct ast_threadpool_listener *listener = ao2_alloc(sizeof(*listener), NULL);
897  if (!listener) {
898  return NULL;
899  }
900  listener->callbacks = callbacks;
901  listener->user_data = user_data;
902  return listener;
903 }
#define NULL
Definition: resample.c:96
const struct ast_threadpool_listener_callbacks * callbacks
Definition: threadpool.c:112
static void * listener(void *unused)
Definition: asterisk.c:1476
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:411
listener for a threadpool
Definition: threadpool.c:110

◆ ast_threadpool_listener_get_user_data()

void* ast_threadpool_listener_get_user_data ( const struct ast_threadpool_listener listener)

Get the threadpool listener's user data.

Parameters
listenerThe threadpool listener
Returns
The user data

Definition at line 905 of file threadpool.c.

References ast_threadpool_listener::user_data.

Referenced by listener_check(), test_emptied(), test_shutdown(), test_state_changed(), test_task_pushed(), and wait_for_task_pushed().

906 {
907  return listener->user_data;
908 }

◆ ast_threadpool_push()

int ast_threadpool_push ( struct ast_threadpool pool,
int(*)(void *data)  task,
void *  data 
)

Push a task to the threadpool.

Tasks pushed into the threadpool will be automatically taken by one of the threads within

Parameters
poolThe threadpool to add the task to
taskThe task to add
dataThe parameter for the task
Return values
0success
-1failure

Definition at line 956 of file threadpool.c.

References ast_taskprocessor_push(), lock, SCOPED_AO2LOCK, ast_threadpool::shutting_down, task(), and ast_threadpool::tps.

Referenced by AST_TEST_DEFINE(), and serializer_task_pushed().

957 {
958  SCOPED_AO2LOCK(lock, pool);
959  if (!pool->shutting_down) {
960  return ast_taskprocessor_push(pool->tps, task, data);
961  }
962  return -1;
963 }
static int task(void *data)
Queued task for baseline test.
ast_mutex_t lock
Definition: app_meetme.c:1091
struct ast_taskprocessor * tps
The main taskprocessor.
Definition: threadpool.c:67
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:602
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.

◆ ast_threadpool_queue_size()

long ast_threadpool_queue_size ( struct ast_threadpool pool)

Return the size of the threadpool's task queue.

Since
13.7.0

Definition at line 1437 of file threadpool.c.

References ast_taskprocessor_size(), and ast_threadpool::tps.

Referenced by ast_sip_threadpool_queue_size().

1438 {
1439  return ast_taskprocessor_size(pool->tps);
1440 }
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
struct ast_taskprocessor * tps
The main taskprocessor.
Definition: threadpool.c:67

◆ ast_threadpool_serializer()

struct ast_taskprocessor* ast_threadpool_serializer ( const char *  name,
struct ast_threadpool pool 
)

Serialized execution of tasks within a ast_threadpool.

Since
12.0.0

A ast_taskprocessor with the same contract as a default taskprocessor (tasks execute serially) except instead of executing out of a dedicated thread, execution occurs in a thread from a ast_threadpool. Think of it as a lightweight thread.

While it guarantees that each task will complete before executing the next, there is no guarantee as to which thread from the pool individual tasks will execute. This normally only matters if your code relys on thread specific information, such as thread locals.

Use ast_taskprocessor_unreference() to dispose of the returned ast_taskprocessor.

Only a single taskprocessor with a given name may exist. This function will fail if a taskprocessor with the given name already exists.

Parameters
nameName of the serializer. (must be unique)
poolast_threadpool for execution.
Returns
ast_taskprocessor for enqueuing work.
NULL on error.

Definition at line 1432 of file threadpool.c.

References ast_threadpool_serializer_group(), and NULL.

Referenced by AST_TEST_DEFINE(), internal_stasis_subscribe(), and sorcery_object_type_alloc().

1433 {
1434  return ast_threadpool_serializer_group(name, pool, NULL);
1435 }
struct ast_taskprocessor * ast_threadpool_serializer_group(const char *name, struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Serialized execution of tasks within a ast_threadpool.
Definition: threadpool.c:1402
#define NULL
Definition: resample.c:96
static const char name[]
Definition: cdr_mysql.c:74

◆ ast_threadpool_serializer_get_current()

struct ast_taskprocessor* ast_threadpool_serializer_get_current ( void  )

Get the threadpool serializer currently associated with this thread.

Since
14.0.0
Note
The returned pointer is valid while the serializer thread is running.
Use ao2_ref() on serializer if you are going to keep it for another thread. To unref it you must then use ast_taskprocessor_unreference().
Return values
serializeron success.
NULLon error or no serializer associated with the thread.

Definition at line 1397 of file threadpool.c.

References ast_threadstorage_get_ptr().

Referenced by record_serializer(), rfc3326_outgoing_request(), and rfc3326_outgoing_response().

1398 {
1399  return ast_threadstorage_get_ptr(&current_serializer);
1400 }
void * ast_threadstorage_get_ptr(struct ast_threadstorage *ts)
Retrieve a raw pointer from threadstorage.

◆ ast_threadpool_serializer_group()

struct ast_taskprocessor* ast_threadpool_serializer_group ( const char *  name,
struct ast_threadpool pool,
struct ast_serializer_shutdown_group shutdown_group 
)

Serialized execution of tasks within a ast_threadpool.

Since
13.5.0

A ast_taskprocessor with the same contract as a default taskprocessor (tasks execute serially) except instead of executing out of a dedicated thread, execution occurs in a thread from a ast_threadpool. Think of it as a lightweight thread.

While it guarantees that each task will complete before executing the next, there is no guarantee as to which thread from the pool individual tasks will execute. This normally only matters if your code relys on thread specific information, such as thread locals.

Use ast_taskprocessor_unreference() to dispose of the returned ast_taskprocessor.

Only a single taskprocessor with a given name may exist. This function will fail if a taskprocessor with the given name already exists.

Parameters
nameName of the serializer. (must be unique)
poolast_threadpool for execution.
shutdown_groupGroup shutdown controller. (NULL if no group association)
Returns
ast_taskprocessor for enqueuing work.
NULL on error.

Definition at line 1402 of file threadpool.c.

References ao2_ref, ast_taskprocessor_create_with_listener(), ast_taskprocessor_listener_alloc(), ast_threadpool::listener, NULL, serializer_create(), serializer_shutdown_group_inc(), and ast_threadpool::tps.

Referenced by ast_serializer_pool_create(), ast_sip_create_serializer_group(), and ast_threadpool_serializer().

1404 {
1405  struct serializer *ser;
1407  struct ast_taskprocessor *tps;
1408 
1409  ser = serializer_create(pool, shutdown_group);
1410  if (!ser) {
1411  return NULL;
1412  }
1413 
1415  if (!listener) {
1416  ao2_ref(ser, -1);
1417  return NULL;
1418  }
1419 
1421  if (!tps) {
1422  /* ser ref transferred to listener but not cleaned without tps */
1423  ao2_ref(ser, -1);
1424  } else if (shutdown_group) {
1425  serializer_shutdown_group_inc(shutdown_group);
1426  }
1427 
1428  ao2_ref(listener, -1);
1429  return tps;
1430 }
A listener for taskprocessors.
#define NULL
Definition: resample.c:96
static void * listener(void *unused)
Definition: asterisk.c:1476
static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks
Definition: threadpool.c:1391
#define ao2_ref(o, delta)
Definition: astobj2.h:464
static void serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group)
Definition: threadpool.c:1289
static const char name[]
Definition: cdr_mysql.c:74
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
struct ast_taskprocessor * ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
Create a taskprocessor with a custom listener.
static struct serializer * serializer_create(struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Definition: threadpool.c:1332

◆ ast_threadpool_set_size()

void ast_threadpool_set_size ( struct ast_threadpool threadpool,
unsigned int  size 
)

Set the number of threads for the thread pool.

This number may be more or less than the current number of threads in the threadpool.

Parameters
threadpoolThe threadpool to adjust
sizeThe new desired size of the threadpool

Definition at line 874 of file threadpool.c.

References ast_free, ast_taskprocessor_push(), ast_threadpool::control_tps, lock, queued_set_size(), SCOPED_AO2LOCK, set_size_data_alloc(), and ast_threadpool::shutting_down.

Referenced by AST_TEST_DEFINE(), and ast_threadpool_create().

875 {
876  struct set_size_data *ssd;
878 
879  if (pool->shutting_down) {
880  return;
881  }
882 
883  ssd = set_size_data_alloc(pool, size);
884  if (!ssd) {
885  return;
886  }
887 
889  ast_free(ssd);
890  }
891 }
static struct set_size_data * set_size_data_alloc(struct ast_threadpool *pool, unsigned int size)
Allocate and initialize a set_size_data.
Definition: threadpool.c:814
Helper struct used for queued operations that change the size of the threadpool.
Definition: threadpool.c:802
struct ast_threadpool * pool
Definition: threadpool.c:804
ast_mutex_t lock
Definition: app_meetme.c:1091
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:602
#define ast_free(a)
Definition: astmm.h:182
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
static int queued_set_size(void *data)
Change the size of the threadpool.
Definition: threadpool.c:838
unsigned int size
Definition: threadpool.c:806
struct ast_taskprocessor * control_tps
The control taskprocessor.
Definition: threadpool.c:96

◆ ast_threadpool_shutdown()

void ast_threadpool_shutdown ( struct ast_threadpool pool)

Shut down a threadpool and destroy it.

Parameters
poolThe pool to shut down

Definition at line 965 of file threadpool.c.

References ao2_lock, ao2_unlock, ast_taskprocessor_unreference(), ast_threadpool::control_tps, ast_threadpool::shutting_down, and ast_threadpool::tps.

Referenced by AST_TEST_DEFINE(), load_module(), sorcery_cleanup(), stasis_cleanup(), and unload_module().

966 {
967  if (!pool) {
968  return;
969  }
970  /* Shut down the taskprocessors and everything else just
971  * takes care of itself via the taskprocessor callbacks
972  */
973  ao2_lock(pool);
974  pool->shutting_down = 1;
975  ao2_unlock(pool);
978 }
#define ao2_unlock(a)
Definition: astobj2.h:730
#define ao2_lock(a)
Definition: astobj2.h:718
struct ast_taskprocessor * tps
The main taskprocessor.
Definition: threadpool.c:67
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
struct ast_taskprocessor * control_tps
The control taskprocessor.
Definition: threadpool.c:96