154 struct timespec
end = {
155 .tv_sec = start.tv_sec + 5,
156 .tv_nsec = start.tv_usec * 1000
181 struct timespec
end = {
182 .tv_sec = start.tv_sec + 5,
183 .tv_nsec = start.tv_usec * 1000
197 struct timespec
end = {
198 .tv_sec = start.tv_sec + 5,
199 .tv_nsec = start.tv_usec * 1000
220 struct timespec
end = {
221 .tv_sec = start.tv_sec + 5,
222 .tv_nsec = start.tv_usec * 1000
244 struct ast_test *
test,
308 info->category =
"/main/threadpool/";
309 info->summary =
"Test task";
311 "Basic threadpool test";
390 .num_tasks_executed = 0,
393 struct timeval start;
399 e->
command =
"threadpool push efficiency";
401 "Usage: threadpool push efficiency\n"
402 " Pushes 200 tasks to a threadpool and measures\n"
403 " the number of tasks executed within 30 seconds.\n";
427 for (i = 0; i < 200; i++) {
435 end.tv_sec = start.tv_sec + 30;
436 end.tv_nsec = start.tv_usec * 1000;
475 info->name =
"initial_threads";
476 info->category =
"/main/threadpool/";
477 info->summary =
"Test threadpool initialization state";
479 "Ensure that a threadpool created with a specific size contains the\n"
480 "proper number of idle threads.";
527 info->name =
"thread_creation";
528 info->category =
"/main/threadpool/";
529 info->summary =
"Test threadpool thread creation";
531 "Ensure that threads can be added to a threadpool";
582 info->name =
"thread_destruction";
583 info->category =
"/main/threadpool/";
584 info->summary =
"Test threadpool thread destruction";
586 "Ensure that threads are properly destroyed in a threadpool";
646 info->name =
"thread_timeout";
647 info->category =
"/main/threadpool/";
648 info->summary =
"Test threadpool thread timeout";
650 "Ensure that a thread with a two second timeout dies as expected.";
714 info->name =
"thread_timeout_thrash";
715 info->category =
"/main/threadpool/";
716 info->summary =
"Thrash threadpool thread timeout";
718 "Repeatedly queue a task when a threadpool thread should timeout.";
741 for (iteration = 0; iteration < 30; ++iteration) {
744 struct timespec
end = {
745 .tv_sec = start.tv_sec +
options.idle_timeout,
746 .tv_nsec = start.tv_usec * 1000
805 info->name =
"one_task_one_thread";
806 info->category =
"/main/threadpool/";
807 info->summary =
"Test a single task with a single thread";
809 "Push a task into an empty threadpool, then add a thread to the pool.";
889 info->name =
"one_thread_one_task";
890 info->category =
"/main/threadpool/";
891 info->summary =
"Test a single thread with a single task";
893 "Add a thread to the pool and then push a task to it.";
976 info->name =
"one_thread_multiple_tasks";
977 info->category =
"/main/threadpool/";
978 info->summary =
"Test a single thread with multiple tasks";
980 "Add a thread to the pool and then push three tasks to it.";
1004 if (!std1 || !std2 || !std3) {
1067 struct timeval start;
1068 struct timespec
end;
1076 end.tv_sec = start.tv_sec + 5;
1077 end.tv_nsec = start.tv_usec * 1000;
1111 .auto_increment = 3,
1118 info->name =
"auto_increment";
1119 info->category =
"/main/threadpool/";
1120 info->summary =
"Test that the threadpool grows as tasks are added";
1122 "Create an empty threadpool and push a task to it. Once the task is\n"
1123 "pushed, the threadpool should add three threads and be able to\n"
1124 "handle the task. The threads should then go idle";
1149 if (!std1 || !std2 || !std3 || !std4) {
1236 .auto_increment = 3,
1243 info->name =
"max_size";
1244 info->category =
"/main/threadpool/";
1245 info->summary =
"Test that the threadpool does not exceed its maximum size restriction";
1247 "Create an empty threadpool and push a task to it. Once the task is\n"
1248 "pushed, the threadpool should attempt to grow by three threads, but the\n"
1249 "pool's restrictions should only allow two threads to be added.";
1309 .auto_increment = 0,
1316 info->name =
"reactivation";
1317 info->category =
"/main/threadpool/";
1318 info->summary =
"Test that a threadpool reactivates when work is added";
1320 "Push a task into a threadpool. Make sure the task executes and the\n"
1321 "thread goes idle. Then push a second task and ensure that the thread\n"
1322 "awakens and executes the second task.";
1345 if (!std1 || !std2) {
1469 struct timespec
end = {
1470 .tv_sec = start.tv_sec + 5,
1471 .tv_nsec = start.tv_usec * 1000
1487 struct timespec
end = {
1488 .tv_sec = start.tv_sec + 1,
1489 .tv_nsec = start.tv_usec * 1000
1505 struct timespec
end = {
1506 .tv_sec = start.tv_sec + 5,
1507 .tv_nsec = start.tv_usec * 1000
1535 .auto_increment = 0,
1542 info->name =
"task_distribution";
1543 info->category =
"/main/threadpool/";
1544 info->summary =
"Test that tasks are evenly distributed to threads";
1546 "Push two tasks into a threadpool. Ensure that each is handled by\n"
1547 "a separate thread";
1570 if (!ctd1 || !ctd2) {
1634 .auto_increment = 0,
1641 info->name =
"more_destruction";
1642 info->category =
"/main/threadpool/";
1643 info->summary =
"Test that threads are destroyed as expected";
1645 "Push two tasks into a threadpool. Set the threadpool size to 4\n"
1646 "Ensure that there are 2 active and 2 idle threads. Then shrink the\n"
1647 "threadpool down to 1 thread. Ensure that the thread leftover is active\n"
1648 "and ensure that both tasks complete.";
1671 if (!ctd1 || !ctd2) {
1752 .auto_increment = 0,
1759 info->name =
"threadpool_serializer";
1760 info->category =
"/main/threadpool/";
1761 info->summary =
"Test that serializers";
1763 "Ensures that tasks enqueued to a serialize execute in sequence.";
1778 if (!uut || !data1 || !data2 || !data3) {
1899 .auto_increment = 0,
1904 struct timeval start;
1905 struct timespec
end;
1907 int num_tasks_executed = 0;
1912 e->
command =
"threadpool push serializer efficiency";
1914 "Usage: threadpool push serializer efficiency\n"
1915 " Pushes 200 tasks to a threadpool in serializers and measures\n"
1916 " the number of tasks executed within 30 seconds.\n";
1927 memset(&etd, 0,
sizeof(etd));
1940 for (i = 0; i < 200; i++) {
1960 for (i = 0; i < 200; i++) {
1968 end.tv_sec = start.tv_sec + 30;
1969 end.tv_nsec = start.tv_usec * 1000;
1977 ast_cli(
a->fd,
"Total tasks executed in 30 seconds: %d\n", num_tasks_executed);
1987 for (i = 0; i < 200; i++) {
2006 .auto_increment = 0,
2013 info->name =
"threadpool_serializer_dupe";
2014 info->category =
"/main/threadpool/";
2015 info->summary =
"Test that serializers are uniquely named";
2017 "Creating two serializers with the same name should\n"
2037 if (there_can_be_only_one) {
2058 ast_test_unregister(threadpool_push);
2059 ast_test_unregister(threadpool_initial_threads);
2060 ast_test_unregister(threadpool_thread_creation);
2061 ast_test_unregister(threadpool_thread_destruction);
2062 ast_test_unregister(threadpool_thread_timeout);
2063 ast_test_unregister(threadpool_thread_timeout_thrash);
2064 ast_test_unregister(threadpool_one_task_one_thread);
2065 ast_test_unregister(threadpool_one_thread_one_task);
2066 ast_test_unregister(threadpool_one_thread_multiple_tasks);
2067 ast_test_unregister(threadpool_auto_increment);
2068 ast_test_unregister(threadpool_max_size);
2069 ast_test_unregister(threadpool_reactivation);
2070 ast_test_unregister(threadpool_task_distribution);
2071 ast_test_unregister(threadpool_more_destruction);
2072 ast_test_unregister(threadpool_serializer);
2073 ast_test_unregister(threadpool_serializer_dupe);
2081 ast_test_register(threadpool_push);
2082 ast_test_register(threadpool_initial_threads);
2083 ast_test_register(threadpool_thread_creation);
2084 ast_test_register(threadpool_thread_destruction);
2085 ast_test_register(threadpool_thread_timeout);
2086 ast_test_register(threadpool_thread_timeout_thrash);
2087 ast_test_register(threadpool_one_task_one_thread);
2088 ast_test_register(threadpool_one_thread_one_task);
2089 ast_test_register(threadpool_one_thread_multiple_tasks);
2090 ast_test_register(threadpool_auto_increment);
2091 ast_test_register(threadpool_max_size);
2092 ast_test_register(threadpool_reactivation);
2093 ast_test_register(threadpool_task_distribution);
2094 ast_test_register(threadpool_more_destruction);
2095 ast_test_register(threadpool_serializer);
2096 ast_test_register(threadpool_serializer_dupe);
static void * listener(void *unused)
Asterisk main include file. File version handling, generic pbx functions.
#define ast_calloc(num, len)
A wrapper for calloc()
int ast_cli_unregister_multiple(struct ast_cli_entry *e, int len)
Unregister multiple commands.
#define AST_CLI_DEFINE(fn, txt,...)
void ast_cli(int fd, const char *fmt,...)
#define ast_cli_register_multiple(e, len)
Register multiple commands.
Support for logging to various files, console and syslog Configuration in file logger....
Asterisk locking-related definitions:
#define ast_cond_destroy(cond)
#define ast_cond_wait(cond, mutex)
#define ast_cond_init(cond, attr)
#define ast_cond_timedwait(cond, mutex, time)
#define ast_mutex_init(pmutex)
#define ast_mutex_unlock(a)
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
pthread_cond_t ast_cond_t
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
#define ast_mutex_destroy(a)
#define ast_mutex_lock(a)
#define ast_cond_signal(cond)
Asterisk module definitions.
#define AST_MODULE_INFO_STANDARD(keystr, desc)
#define ASTERISK_GPL_KEY
The text the key() function should return.
@ AST_MODULE_LOAD_SUCCESS
descriptor for a cli entry.
Structure for mutex and tracking information.
A ast_taskprocessor structure is a singleton by name.
void(* state_changed)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener, int active_threads, int idle_threads)
Indicates that the state of threads in the pool has changed.
listener for a threadpool
An opaque threadpool structure.
struct ast_threadpool * pool
struct ast_taskpool * pool
struct ast_taskprocessor * serializer[2]
An API for managing task processing threads that can be shared across modules.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format,...)
Build a taskprocessor name with a sequence number on the end.
#define AST_TASKPROCESSOR_MAX_NAME
Suggested maximum taskprocessor name length (less null terminator).
#define ast_test_status_update(a, b, c...)
static struct test_listener_data * test_alloc(void)
static int has_complex_started(struct complex_task_data *ctd)
static struct complex_task_data * complex_task_data_alloc(void)
static enum ast_test_result_state wait_for_complex_completion(struct complex_task_data *ctd)
static enum ast_test_result_state wait_for_completion(struct ast_test *test, struct simple_task_data *std)
static void wait_for_task_pushed(struct ast_threadpool_listener *listener)
static void test_shutdown(struct ast_threadpool_listener *listener)
static const struct ast_threadpool_listener_callbacks test_callbacks
static int wait_for_complex_start(struct complex_task_data *ctd)
static void test_emptied(struct ast_threadpool *pool, struct ast_threadpool_listener *listener)
AST_TEST_DEFINE(threadpool_push)
static char * handle_cli_threadpool_push_serializer_efficiency(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static enum ast_test_result_state listener_check(struct ast_test *test, struct ast_threadpool_listener *listener, int task_pushed, int was_empty, int num_tasks, int num_active, int num_idle, int empty_notice)
static void complex_task_data_free(struct complex_task_data *ctd)
static enum ast_test_result_state wait_until_thread_state_task_pushed(struct ast_test *test, struct test_listener_data *tld, int num_active, int num_idle, int num_tasks)
static int complex_task(void *data)
static struct simple_task_data * simple_task_data_alloc(void)
static int serializer_efficiency_task(void *data)
static void poke_worker(struct complex_task_data *ctd)
static struct ast_cli_entry cli[]
static enum ast_test_result_state wait_until_thread_state(struct ast_test *test, struct test_listener_data *tld, int num_active, int num_idle)
static void test_task_pushed(struct ast_threadpool *pool, struct ast_threadpool_listener *listener, int was_empty)
static int load_module(void)
static char * handle_cli_threadpool_push_efficiency(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static int unload_module(void)
static int simple_task(void *data)
static enum ast_test_result_state wait_for_empty_notice(struct ast_test *test, struct test_listener_data *tld)
static void simple_task_data_free(struct simple_task_data *std)
static int efficiency_task(void *data)
static void test_state_changed(struct ast_threadpool *pool, struct ast_threadpool_listener *listener, int active_threads, int idle_threads)
void ast_threadpool_shutdown(struct ast_threadpool *pool)
Shut down a threadpool and destroy it.
struct ast_taskprocessor * ast_threadpool_serializer_get_current(void)
Get the threadpool serializer currently associated with this thread.
int ast_threadpool_push(struct ast_threadpool *pool, int(*task)(void *data), void *data) attribute_warn_unused_result
Push a task to the threadpool.
void * ast_threadpool_listener_get_user_data(const struct ast_threadpool_listener *listener)
Get the threadpool listener's user data.
void ast_threadpool_set_size(struct ast_threadpool *threadpool, unsigned int size)
Set the number of threads for the thread pool.
struct ast_threadpool * ast_threadpool_create(const char *name, struct ast_threadpool_listener *listener, const struct ast_threadpool_options *options)
Create a new threadpool.
struct ast_threadpool_listener * ast_threadpool_listener_alloc(const struct ast_threadpool_listener_callbacks *callbacks, void *user_data)
Allocate a threadpool listener.
#define AST_THREADPOOL_OPTIONS_VERSION
struct ast_taskprocessor * ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
Serialized execution of tasks within a ast_threadpool.
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().