110 end.tv_sec = start.tv_sec + 30;
111 end.tv_nsec = start.tv_usec * 1000;
117 if (res == ETIMEDOUT) {
140 info->name =
"default_taskprocessor";
141 info->category =
"/main/taskprocessor/";
142 info->summary =
"Test of default taskprocessor";
144 "Ensures that a queued task gets executed.";
183#define TEST_DATA_ARRAY_SIZE 10
184#define LOW_WATER_MARK 3
185#define HIGH_WATER_MARK 6
190 unsigned int alert_level;
191 unsigned int subsystem_alert_level;
195 info->name =
"subsystem_alert";
196 info->category =
"/main/taskprocessor/";
197 info->summary =
"Test of subsystem alerts";
199 "Ensures alerts are generated properly.";
236 if (subsystem_alert_level) {
243 if (subsystem_alert_level > 0) {
247 if (alert_level > 0) {
252 if (subsystem_alert_level == 0) {
256 if (alert_level == 0) {
278 if (!subsystem_alert_level) {
285 if (subsystem_alert_level == 0) {
289 if (alert_level == 0) {
294 if (subsystem_alert_level > 0) {
298 if (alert_level > 0) {
314#define NUM_TASKS 20000
338 int *randdata = data;
355 struct timeval start;
364 info->name =
"default_taskprocessor_load";
365 info->category =
"/main/taskprocessor/";
366 info->summary =
"Load test of default taskprocessor";
368 "Ensure that a large number of queued tasks are executed in the proper order.";
383 ts.tv_sec = start.tv_sec + 60;
384 ts.tv_nsec = start.tv_usec * 1000;
402 if (timedwait_res == ETIMEDOUT) {
564 info->name =
"taskprocessor_listener";
565 info->category =
"/main/taskprocessor/";
566 info->summary =
"Test of taskprocessor listeners";
568 "Ensures that listener callbacks are called when expected.";
696 struct timespec
end = {
697 .tv_sec = start.tv_sec + 5,
698 .tv_nsec = start.tv_usec * 1000
720 struct timespec
end = {
721 .tv_sec = start.tv_sec + 5,
722 .tv_nsec = start.tv_usec * 1000
757 pthread_t shutdown_thread;
761 info->name =
"taskprocessor_shutdown";
762 info->category =
"/main/taskprocessor/";
763 info->summary =
"Test of taskprocessor shutdown sequence";
765 "Ensures that all tasks run to completion after the taskprocessor has been unref'ed.";
775 if (!tps || !task1 || !task2) {
799 if (pthread_res != 0) {
814 pthread_join(shutdown_thread,
NULL);
847 info->name = __func__;
848 info->category =
"/main/taskprocessor/";
849 info->summary =
"Test of pushing local data";
851 "Ensures that local data is passed along.";
885 if (local_data != 1) {
887 "Queued task did not set local_data!\n");
917 info->name =
"serializer_pool";
918 info->category =
"/main/taskprocessor/";
919 info->summary =
"Test using a serializer pool";
921 "Ensures that a queued task gets executed.";
944 serializer_pool =
NULL;
961 serializer_pool =
NULL;
968 ast_test_unregister(default_taskprocessor);
969 ast_test_unregister(default_taskprocessor_load);
971 ast_test_unregister(taskprocessor_listener);
972 ast_test_unregister(taskprocessor_shutdown);
973 ast_test_unregister(taskprocessor_push_local);
974 ast_test_unregister(serializer_pool);
980 ast_test_register(default_taskprocessor);
981 ast_test_register(default_taskprocessor_load);
983 ast_test_register(taskprocessor_listener);
984 ast_test_register(taskprocessor_shutdown);
985 ast_test_register(taskprocessor_push_local);
986 ast_test_register(serializer_pool);
static void * listener(void *unused)
Asterisk main include file. File version handling, generic pbx functions.
#define ast_calloc(num, len)
A wrapper for calloc()
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
#define ao2_alloc(data_size, destructor_fn)
#define ast_cond_destroy(cond)
#define ast_cond_wait(cond, mutex)
#define ast_cond_init(cond, attr)
#define ast_cond_timedwait(cond, mutex, time)
#define ast_mutex_init(pmutex)
#define ast_mutex_unlock(a)
pthread_cond_t ast_cond_t
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
#define ast_mutex_destroy(a)
#define ast_mutex_lock(a)
#define ast_cond_signal(cond)
Asterisk module definitions.
#define AST_MODULE_INFO_STANDARD(keystr, desc)
#define ASTERISK_GPL_KEY
The text the key() function should return.
@ AST_MODULE_LOAD_SUCCESS
def ignore(key=None, val=None, section=None, pjsip=None, nmapped=None, type='endpoint')
struct ast_taskprocessor * ast_serializer_pool_get(struct ast_serializer_pool *pool)
Retrieve a serializer from the pool.
int ast_serializer_pool_set_alerts(struct ast_serializer_pool *pool, long high, long low)
Set taskprocessor alert levels for the serializers in the pool.
const char * ast_serializer_pool_name(const struct ast_serializer_pool *pool)
Retrieve the base name of the serializer pool.
struct ast_serializer_pool * ast_serializer_pool_create(const char *name, unsigned int size, struct ast_threadpool *threadpool, int timeout)
Create a serializer pool.
int ast_serializer_pool_destroy(struct ast_serializer_pool *pool)
Destroy the serializer pool.
static struct ast_threadpool * threadpool
Thread pool for observers.
Structure for mutex and tracking information.
int(* start)(struct ast_taskprocessor_listener *listener)
The taskprocessor has started completely.
A listener for taskprocessors.
An ast_taskprocessor structure is a singleton by name.
An opaque threadpool structure.
Relevant data associated with taskprocessor load test.
User data associated with the baseline taskprocessor test.
Private data for the test taskprocessor listener.
An API for managing task processing threads that can be shared across modules.
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor, decrementing its reference count.
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
Sets the local data associated with a taskprocessor.
@ TPS_REF_DEFAULT
return a reference to a taskprocessor, create one if it does not exist
unsigned int ast_taskprocessor_alert_get(void)
Get the current taskprocessor high water alert count.
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int(*task_exe)(struct ast_taskprocessor_local *local), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
void * ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
Get the user data from the listener.
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
int ast_taskprocessor_unsuspend(struct ast_taskprocessor *tps)
Indicate the taskprocessor is unsuspended.
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
Pop a task off the taskprocessor and execute it.
unsigned int ast_taskprocessor_get_subsystem_alert(const char *subsystem)
Get the current taskprocessor high water alert count by subsystem.
struct ast_taskprocessor * ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
Create a taskprocessor with a custom listener.
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.
int ast_taskprocessor_suspend(struct ast_taskprocessor *tps)
Indicate the taskprocessor is suspended.
#define ast_test_status_update(a, b, c...)
static void data_cleanup(void *data)
AST_TEST_DEFINE(default_taskprocessor)
Baseline test for default taskprocessor.
static struct task_data * task_data_create(void)
Create a task_data object.
static void shutdown_data_dtor(void *data)
static int task(void *data)
Queued task for baseline test.
static int listener_test_task(void *ignore)
Queued task for taskprocessor listener test.
static struct shutdown_data * shutdown_data_create(int dont_wait)
static void * tps_shutdown_thread(void *data)
static int test_start(struct ast_taskprocessor_listener *listener)
test taskprocessor listener's start callback
static void * test_listener_pvt_alloc(void)
test taskprocessor listener's alloc callback
static void test_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
test taskprocessor listener's task_pushed callback
static int check_stats(struct ast_test *test, const struct test_listener_pvt *pvt, int num_pushed, int num_emptied, int num_was_empty)
Helper to ensure that the statistics the listener is keeping are what we expect.
static int shutdown_has_completed(struct shutdown_data *shutdown_data)
static int shutdown_waitfor_completion(struct shutdown_data *shutdown_data)
#define TEST_DATA_ARRAY_SIZE
static void shutdown_poke(struct shutdown_data *shutdown_data)
static const struct ast_taskprocessor_listener_callbacks test_callbacks
static int shutdown_task_exec(void *data)
static int load_module(void)
static void test_shutdown(struct ast_taskprocessor_listener *listener)
test taskprocessor listener's shutdown callback.
static void task_data_dtor(void *obj)
static int local_task_exe(struct ast_taskprocessor_local *local)
static int task_wait(struct task_data *task_data)
Wait for a task to execute.
static int unload_module(void)
static struct load_task_data load_task_results
static int shutdown_waitfor_start(struct shutdown_data *shutdown_data)
static int load_task(void *data)
A queued task to be used in the taskprocessor load test.
static void test_emptied(struct ast_taskprocessor_listener *listener)
test taskprocessor listener's emptied callback.
void ast_threadpool_shutdown(struct ast_threadpool *pool)
Shut down a threadpool and destroy it.
struct ast_threadpool * ast_threadpool_create(const char *name, struct ast_threadpool_listener *listener, const struct ast_threadpool_options *options)
Create a new threadpool.
#define AST_THREADPOOL_OPTIONS_VERSION
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
#define ast_pthread_create(a, b, c, d)
long int ast_random(void)