113 end.tv_sec = start.tv_sec + 30;
114 end.tv_nsec = start.tv_usec * 1000;
120 if (res == ETIMEDOUT) {
143 info->name =
"default_taskprocessor";
144 info->category =
"/main/taskprocessor/";
145 info->summary =
"Test of default taskprocessor";
147 "Ensures that a queued task gets executed.";
186#define TEST_DATA_ARRAY_SIZE 10
187#define LOW_WATER_MARK 3
188#define HIGH_WATER_MARK 6
193 unsigned int alert_level;
194 unsigned int subsystem_alert_level;
198 info->name =
"subsystem_alert";
199 info->category =
"/main/taskprocessor/";
200 info->summary =
"Test of subsystem alerts";
202 "Ensures alerts are generated properly.";
239 if (subsystem_alert_level) {
246 if (subsystem_alert_level > 0) {
250 if (alert_level > 0) {
255 if (subsystem_alert_level == 0) {
259 if (alert_level == 0) {
281 if (!subsystem_alert_level) {
288 if (subsystem_alert_level == 0) {
292 if (alert_level == 0) {
297 if (subsystem_alert_level > 0) {
301 if (alert_level > 0) {
317#define NUM_TASKS 20000
341 int *randdata = data;
358 struct timeval start;
367 info->name =
"default_taskprocessor_load";
368 info->category =
"/main/taskprocessor/";
369 info->summary =
"Load test of default taskprocessor";
371 "Ensure that a large number of queued tasks are executed in the proper order.";
386 ts.tv_sec = start.tv_sec + 60;
387 ts.tv_nsec = start.tv_usec * 1000;
405 if (timedwait_res == ETIMEDOUT) {
567 info->name =
"taskprocessor_listener";
568 info->category =
"/main/taskprocessor/";
569 info->summary =
"Test of taskprocessor listeners";
571 "Ensures that listener callbacks are called when expected.";
699 struct timespec
end = {
700 .tv_sec = start.tv_sec + 5,
701 .tv_nsec = start.tv_usec * 1000
723 struct timespec
end = {
724 .tv_sec = start.tv_sec + 5,
725 .tv_nsec = start.tv_usec * 1000
760 pthread_t shutdown_thread;
764 info->name =
"taskprocessor_shutdown";
765 info->category =
"/main/taskprocessor/";
766 info->summary =
"Test of taskprocessor shutdown sequence";
768 "Ensures that all tasks run to completion after the taskprocessor has been unref'ed.";
778 if (!tps || !task1 || !task2) {
802 if (pthread_res != 0) {
817 pthread_join(shutdown_thread,
NULL);
850 info->name = __func__;
851 info->category =
"/main/taskprocessor/";
852 info->summary =
"Test of pushing local data";
854 "Ensures that local data is passed along.";
888 if (local_data != 1) {
890 "Queued task did not set local_data!\n");
920 info->name =
"serializer_pool";
921 info->category =
"/main/taskprocessor/";
922 info->summary =
"Test using a serializer pool";
924 "Ensures that a queued task gets executed.";
932 "test/test", 5, threadpool, 2));
947 serializer_pool =
NULL;
964 serializer_pool =
NULL;
981 int task_queued1 = 0, task_queued2 = 0, task_queued3 = 0;
982 char cli_command[128];
983 int cli_output_fd[2];
984 char output_buffer[4096] = {0};
990 info->name =
"taskprocessor_cli_show";
991 info->category =
"/main/taskprocessor/";
992 info->summary =
"Test CLI command 'core show taskprocessor'";
994 "Verifies that the 'core show taskprocessor <name>' CLI command\n"
995 "displays taskprocessor information and queued tasks correctly.";
1002 if (pipe(cli_output_fd) != 0) {
1011 close(cli_output_fd[0]);
1012 close(cli_output_fd[1]);
1021 if (!task_data1 || !task_data2 || !task_data3) {
1051 snprintf(cli_command,
sizeof(cli_command),
"core show taskprocessor name test_cli_taskprocessor");
1059 close(cli_output_fd[1]);
1060 cli_output_fd[1] = -1;
1062 bytes_read = read(cli_output_fd[0], output_buffer,
sizeof(output_buffer) - 1);
1063 if (bytes_read <= 0) {
1067 output_buffer[bytes_read] =
'\0';
1073 if (!strstr(output_buffer,
"test_cli_taskprocessor")) {
1078 if (!strstr(output_buffer,
"Current queue size")) {
1084 if (!strstr(output_buffer,
"Queued Tasks") && !strstr(output_buffer,
"Currently executing")) {
1090 if (!strstr(output_buffer,
"Task #")) {
1122 if (cli_output_fd[0] >= 0) {
1123 close(cli_output_fd[0]);
1125 if (cli_output_fd[1] >= 0) {
1126 close(cli_output_fd[1]);
1135 ast_test_unregister(default_taskprocessor);
1136 ast_test_unregister(default_taskprocessor_load);
1138 ast_test_unregister(taskprocessor_listener);
1139 ast_test_unregister(taskprocessor_shutdown);
1140 ast_test_unregister(taskprocessor_push_local);
1141 ast_test_unregister(serializer_pool);
1142 ast_test_unregister(taskprocessor_cli_show);
1148 ast_test_register(default_taskprocessor);
1149 ast_test_register(default_taskprocessor_load);
1151 ast_test_register(taskprocessor_listener);
1152 ast_test_register(taskprocessor_shutdown);
1153 ast_test_register(taskprocessor_push_local);
1154 ast_test_register(serializer_pool);
1155 ast_test_register(taskprocessor_cli_show);
static void * listener(void *unused)
Asterisk main include file. File version handling, generic pbx functions.
#define ast_calloc(num, len)
A wrapper for calloc()
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
#define ao2_alloc(data_size, destructor_fn)
Standard Command Line Interface.
#define ast_cli_command(fd, s)
#define ast_cond_destroy(cond)
#define ast_cond_wait(cond, mutex)
#define ast_cond_init(cond, attr)
#define ast_cond_timedwait(cond, mutex, time)
#define ast_mutex_init(pmutex)
#define ast_mutex_unlock(a)
pthread_cond_t ast_cond_t
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
#define ast_mutex_destroy(a)
#define ast_mutex_lock(a)
#define ast_cond_signal(cond)
Asterisk module definitions.
#define AST_MODULE_INFO_STANDARD(keystr, desc)
#define ASTERISK_GPL_KEY
The text the key() function should return.
@ AST_MODULE_LOAD_SUCCESS
static void cleanup(void)
Clean up any old apps that we don't need any more.
struct ast_taskprocessor * ast_serializer_pool_get(struct ast_serializer_pool *pool)
Retrieve a serializer from the pool.
int ast_serializer_pool_set_alerts(struct ast_serializer_pool *pool, long high, long low)
Set taskprocessor alert levels for the serializers in the pool.
const char * ast_serializer_pool_name(const struct ast_serializer_pool *pool)
Retrieve the base name of the serializer pool.
struct ast_serializer_pool * ast_serializer_pool_create(const char *name, unsigned int size, struct ast_threadpool *threadpool, int timeout)
Create a serializer pool.
int ast_serializer_pool_destroy(struct ast_serializer_pool *pool)
Destroy the serializer pool.
Structure for mutex and tracking information.
int(* start)(struct ast_taskprocessor_listener *listener)
The taskprocessor has started completely.
A listener for taskprocessors.
A ast_taskprocessor structure is a singleton by name.
An opaque threadpool structure.
Relevant data associated with taskprocessor load test.
userdata associated with baseline taskprocessor test
Private data for the test taskprocessor listener.
An API for managing task processing threads that can be shared across modules.
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
Sets the local data associated with a taskprocessor.
@ TPS_REF_DEFAULT
return a reference to a taskprocessor, create one if it does not exist
unsigned int ast_taskprocessor_alert_get(void)
Get the current taskprocessor high water alert count.
void * ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
Get the user data from the listener.
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
int ast_taskprocessor_unsuspend(struct ast_taskprocessor *tps)
Indicate the taskprocessor is unsuspended.
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
Pop a task off the taskprocessor and execute it.
#define ast_taskprocessor_push_local(tps, task_exe, datap)
unsigned int ast_taskprocessor_get_subsystem_alert(const char *subsystem)
Get the current taskprocessor high water alert count by subsystem.
#define ast_taskprocessor_push(tps, task_exe, datap)
struct ast_taskprocessor * ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
Create a taskprocessor with a custom listener.
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.
int ast_taskprocessor_suspend(struct ast_taskprocessor *tps)
Indicate the taskprocessor is suspended.
#define ast_test_status_update(a, b, c...)
#define AST_TEST_DEFINE(hdr)
static void data_cleanup(void *data)
static struct task_data * task_data_create(void)
Create a task_data object.
static void shutdown_data_dtor(void *data)
static int task(void *data)
Queued task for baseline test.
static int listener_test_task(void *ignore)
Queued task for taskprocessor listener test.
static struct shutdown_data * shutdown_data_create(int dont_wait)
static void * tps_shutdown_thread(void *data)
static int test_start(struct ast_taskprocessor_listener *listener)
test taskprocessor listener's start callback
static void * test_listener_pvt_alloc(void)
test taskprocessor listener's alloc callback
static void test_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
test taskprocessor listener's task_pushed callback
static int check_stats(struct ast_test *test, const struct test_listener_pvt *pvt, int num_pushed, int num_emptied, int num_was_empty)
helper to ensure that statistics the listener is keeping are what we expect
static int shutdown_has_completed(struct shutdown_data *shutdown_data)
static int shutdown_waitfor_completion(struct shutdown_data *shutdown_data)
#define TEST_DATA_ARRAY_SIZE
static void shutdown_poke(struct shutdown_data *shutdown_data)
static const struct ast_taskprocessor_listener_callbacks test_callbacks
static int shutdown_task_exec(void *data)
static int load_module(void)
static void test_shutdown(struct ast_taskprocessor_listener *listener)
test taskprocessor listener's shutdown callback.
static void task_data_dtor(void *obj)
static int local_task_exe(struct ast_taskprocessor_local *local)
static int task_wait(struct task_data *task_data)
Wait for a task to execute.
static int unload_module(void)
static struct load_task_data load_task_results
static int shutdown_waitfor_start(struct shutdown_data *shutdown_data)
static int load_task(void *data)
a queued task to be used in the taskprocessor load test
static void test_emptied(struct ast_taskprocessor_listener *listener)
test taskprocessor listener's emptied callback.
void ast_threadpool_shutdown(struct ast_threadpool *pool)
Shut down a threadpool and destroy it.
struct ast_threadpool * ast_threadpool_create(const char *name, struct ast_threadpool_listener *listener, const struct ast_threadpool_options *options)
Create a new threadpool.
#define AST_THREADPOOL_OPTIONS_VERSION
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
#define ast_pthread_create(a, b, c, d)
long int ast_random(void)