Asterisk - The Open Source Telephony Project GIT-master-f36a736
Data Structures | Macros | Functions | Variables
test_taskprocessor.c File Reference

taskprocessor unit tests More...

#include "asterisk.h"
#include "asterisk/test.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/module.h"
#include "asterisk/astobj2.h"
#include "asterisk/serializer.h"
#include "asterisk/threadpool.h"
Include dependency graph for test_taskprocessor.c:

Go to the source code of this file.

Data Structures

struct  load_task_data
 Relevant data associated with taskprocessor load test. More...
 
struct  shutdown_data
 
struct  task_data
 userdata associated with baseline taskprocessor test More...
 
struct  test_listener_pvt
 Private data for the test taskprocessor listener. More...
 

Macros

#define HIGH_WATER_MARK   6
 
#define LOW_WATER_MARK   3
 
#define NUM_TASKS   20000
 
#define TEST_DATA_ARRAY_SIZE   10
 

Functions

static void __reg_module (void)
 
static void __unreg_module (void)
 
struct ast_module * AST_MODULE_SELF_SYM (void)
 
 AST_TEST_DEFINE (default_taskprocessor)
 Baseline test for default taskprocessor. More...
 
 AST_TEST_DEFINE (default_taskprocessor_load)
 Load test for taskprocessor with default listener. More...
 
 AST_TEST_DEFINE (serializer_pool)
 Baseline test for a serializer pool. More...
 
 AST_TEST_DEFINE (subsystem_alert)
 Baseline test for subsystem alert. More...
 
 AST_TEST_DEFINE (taskprocessor_listener)
 Test for a taskprocessor with custom listener. More...
 
 AST_TEST_DEFINE (taskprocessor_push_local)
 
 AST_TEST_DEFINE (taskprocessor_shutdown)
 
static int check_stats (struct ast_test *test, const struct test_listener_pvt *pvt, int num_pushed, int num_emptied, int num_was_empty)
 helper to ensure that statistics the listener is keeping are what we expect More...
 
static int listener_test_task (void *ignore)
 Queued task for taskprocessor listener test. More...
 
static int load_module (void)
 
static int load_task (void *data)
 a queued task to be used in the taskprocessor load test More...
 
static int local_task_exe (struct ast_taskprocessor_local *local)
 
static struct shutdown_data * shutdown_data_create (int dont_wait)
 
static void shutdown_data_dtor (void *data)
 
static int shutdown_has_completed (struct shutdown_data *shutdown_data)
 
static void shutdown_poke (struct shutdown_data *shutdown_data)
 
static int shutdown_task_exec (void *data)
 
static int shutdown_waitfor_completion (struct shutdown_data *shutdown_data)
 
static int shutdown_waitfor_start (struct shutdown_data *shutdown_data)
 
static int task (void *data)
 Queued task for baseline test. More...
 
static struct task_data * task_data_create (void)
 Create a task_data object. More...
 
static void task_data_dtor (void *obj)
 
static int task_wait (struct task_data *task_data)
 Wait for a task to execute. More...
 
static void test_emptied (struct ast_taskprocessor_listener *listener)
 test taskprocessor listener's emptied callback. More...
 
static void * test_listener_pvt_alloc (void)
 test taskprocessor listener's alloc callback More...
 
static void test_shutdown (struct ast_taskprocessor_listener *listener)
 test taskprocessor listener's shutdown callback. More...
 
static int test_start (struct ast_taskprocessor_listener *listener)
 test taskprocessor listener's start callback More...
 
static void test_task_pushed (struct ast_taskprocessor_listener *listener, int was_empty)
 test taskprocessor listener's task_pushed callback More...
 
static void * tps_shutdown_thread (void *data)
 
static int unload_module (void)
 

Variables

static struct ast_module_info __mod_info = { .name = AST_MODULE, .flags = AST_MODFLAG_LOAD_ORDER , .description = "taskprocessor test module" , .key = "This paragraph is copyright (c) 2006 by Digium, Inc. In order for your module to load, it must return this key via a function called \"key\". Any code which includes this paragraph must be licensed under the GNU General Public License version 2 or later (at your option). In addition to Digium's general reservations of rights, Digium expressly reserves the right to allow other parties to license this paragraph under different terms. Any use of Digium, Inc. trademarks or logos (including \"Asterisk\" or \"Digium\") without express written permission of Digium, Inc. is prohibited.\n" , .buildopt_sum = AST_BUILDOPT_SUM, .load = load_module, .unload = unload_module, .load_pri = AST_MODPRI_DEFAULT, .support_level = AST_MODULE_SUPPORT_CORE, }
 
static const struct ast_module_info * ast_module_info = &__mod_info
 
static struct load_task_data load_task_results
 
static const struct ast_taskprocessor_listener_callbacks test_callbacks
 

Detailed Description

taskprocessor unit tests

Author
Mark Michelson mmich.nosp@m.elso.nosp@m.n@dig.nosp@m.ium..nosp@m.com

Definition in file test_taskprocessor.c.

Macro Definition Documentation

◆ HIGH_WATER_MARK

#define HIGH_WATER_MARK   6

◆ LOW_WATER_MARK

#define LOW_WATER_MARK   3

◆ NUM_TASKS

#define NUM_TASKS   20000

Definition at line 314 of file test_taskprocessor.c.

◆ TEST_DATA_ARRAY_SIZE

#define TEST_DATA_ARRAY_SIZE   10

Function Documentation

◆ __reg_module()

static void __reg_module ( void  )
static

Definition at line 990 of file test_taskprocessor.c.

◆ __unreg_module()

static void __unreg_module ( void  )
static

Definition at line 990 of file test_taskprocessor.c.

◆ AST_MODULE_SELF_SYM()

struct ast_module * AST_MODULE_SELF_SYM ( void  )

Definition at line 990 of file test_taskprocessor.c.

◆ AST_TEST_DEFINE() [1/7]

AST_TEST_DEFINE ( default_taskprocessor  )

Baseline test for default taskprocessor.

This test ensures that when a task is added to a taskprocessor that has been allocated with a default listener, the task gets executed as expected.

Definition at line 132 of file test_taskprocessor.c.

133{
136 int res;
137
138 switch (cmd) {
139 case TEST_INIT:
140 info->name = "default_taskprocessor";
141 info->category = "/main/taskprocessor/";
142 info->summary = "Test of default taskprocessor";
143 info->description =
144 "Ensures that a queued task gets executed.";
145 return AST_TEST_NOT_RUN;
146 case TEST_EXECUTE:
147 break;
148 }
149
151
152 if (!tps) {
153 ast_test_status_update(test, "Unable to create test taskprocessor\n");
154 return AST_TEST_FAIL;
155 }
156
158 if (!task_data) {
159 ast_test_status_update(test, "Unable to create task_data\n");
160 return AST_TEST_FAIL;
161 }
162
164 ast_test_status_update(test, "Failed to queue task\n");
165 return AST_TEST_FAIL;
166 }
167
168 res = task_wait(task_data);
169 if (res != 0) {
170 ast_test_status_update(test, "Queued task did not execute!\n");
171 return AST_TEST_FAIL;
172 }
173
174 return AST_TEST_PASS;
175}
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
def info(msg)
#define NULL
Definition: resample.c:96
An ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
userdata associated with baseline taskprocessor test
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
@ TPS_REF_DEFAULT
return a reference to a taskprocessor, create one if it does not exist
Definition: taskprocessor.h:76
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
@ TEST_INIT
Definition: test.h:200
@ TEST_EXECUTE
Definition: test.h:201
#define ast_test_status_update(a, b, c...)
Definition: test.h:129
@ AST_TEST_PASS
Definition: test.h:195
@ AST_TEST_FAIL
Definition: test.h:196
@ AST_TEST_NOT_RUN
Definition: test.h:194
static struct task_data * task_data_create(void)
Create a task_data object.
static int task(void *data)
Queued task for baseline test.
static int task_wait(struct task_data *task_data)
Wait for a task to execute.
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:941

References ao2_cleanup, ast_taskprocessor_get(), ast_taskprocessor_push(), ast_taskprocessor_unreference(), AST_TEST_FAIL, AST_TEST_NOT_RUN, AST_TEST_PASS, ast_test_status_update, sip_to_pjsip::info(), NULL, RAII_VAR, task(), task_data_create(), task_wait(), TEST_EXECUTE, TEST_INIT, and TPS_REF_DEFAULT.

◆ AST_TEST_DEFINE() [2/7]

AST_TEST_DEFINE ( default_taskprocessor_load  )

Load test for taskprocessor with default listener.

This test queues a large number of tasks, each with associated random data. The test ensures that all of the tasks are run and that the tasks are executed in the same order in which they were queued.

Definition at line 352 of file test_taskprocessor.c.

353{
354 struct ast_taskprocessor *tps;
355 struct timeval start;
356 struct timespec ts;
358 int timedwait_res;
359 int i;
360 int rand_data[NUM_TASKS];
361
362 switch (cmd) {
363 case TEST_INIT:
364 info->name = "default_taskprocessor_load";
365 info->category = "/main/taskprocessor/";
366 info->summary = "Load test of default taskprocessor";
367 info->description =
368 "Ensure that a large number of queued tasks are executed in the proper order.";
369 return AST_TEST_NOT_RUN;
370 case TEST_EXECUTE:
371 break;
372 }
373
375
376 if (!tps) {
377 ast_test_status_update(test, "Unable to create test taskprocessor\n");
378 return AST_TEST_FAIL;
379 }
380
381 start = ast_tvnow();
382
383 ts.tv_sec = start.tv_sec + 60;
384 ts.tv_nsec = start.tv_usec * 1000;
385
389
390 for (i = 0; i < NUM_TASKS; ++i) {
391 rand_data[i] = ast_random();
392 if (ast_taskprocessor_push(tps, load_task, &rand_data[i])) {
393 ast_test_status_update(test, "Failed to queue task\n");
394 res = AST_TEST_FAIL;
395 goto test_end;
396 }
397 }
398
402 if (timedwait_res == ETIMEDOUT) {
403 break;
404 }
405 }
407
409 ast_test_status_update(test, "Unexpected number of tasks executed. Expected %d but got %d\n",
411 res = AST_TEST_FAIL;
412 goto test_end;
413 }
414
415 for (i = 0; i < NUM_TASKS; ++i) {
416 if (rand_data[i] != load_task_results.task_rand[i]) {
417 ast_test_status_update(test, "Queued tasks did not execute in order\n");
418 res = AST_TEST_FAIL;
419 goto test_end;
420 }
421 }
422
423test_end:
427 return res;
428}
#define ast_cond_destroy(cond)
Definition: lock.h:202
#define ast_cond_init(cond, attr)
Definition: lock.h:201
#define ast_cond_timedwait(cond, mutex, time)
Definition: lock.h:206
#define ast_mutex_init(pmutex)
Definition: lock.h:186
#define ast_mutex_unlock(a)
Definition: lock.h:190
#define ast_mutex_destroy(a)
Definition: lock.h:188
#define ast_mutex_lock(a)
Definition: lock.h:189
int task_rand[NUM_TASKS]
ast_test_result_state
Definition: test.h:193
#define NUM_TASKS
static struct load_task_data load_task_results
static int load_task(void *data)
a queued task to be used in the taskprocessor load test
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:159
long int ast_random(void)
Definition: utils.c:2312

References ast_cond_destroy, ast_cond_init, ast_cond_timedwait, ast_mutex_destroy, ast_mutex_init, ast_mutex_lock, ast_mutex_unlock, ast_random(), ast_taskprocessor_get(), ast_taskprocessor_push(), ast_taskprocessor_unreference(), AST_TEST_FAIL, AST_TEST_NOT_RUN, AST_TEST_PASS, ast_test_status_update, ast_tvnow(), load_task_data::cond, sip_to_pjsip::info(), load_task(), load_task_results, load_task_data::lock, NULL, NUM_TASKS, load_task_data::task_rand, load_task_data::tasks_completed, TEST_EXECUTE, TEST_INIT, and TPS_REF_DEFAULT.

◆ AST_TEST_DEFINE() [3/7]

AST_TEST_DEFINE ( serializer_pool  )

Baseline test for a serializer pool.

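This test ensures that a task pushed to a serializer obtained from a serializer pool gets executed as expected. It also checks that destroying the pool times out while the task is still running, and that a second destroy attempt succeeds once the task has completed.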

Definition at line 901 of file test_taskprocessor.c.

902{
908 .idle_timeout = 0,
909 .auto_increment = 0,
910 .initial_size = 1,
911 .max_size = 0,
912 };
913 /* struct ast_taskprocessor *tps; */
914
915 switch (cmd) {
916 case TEST_INIT:
917 info->name = "serializer_pool";
918 info->category = "/main/taskprocessor/";
919 info->summary = "Test using a serializer pool";
920 info->description =
921 "Ensures that a queued task gets executed.";
922 return AST_TEST_NOT_RUN;
923 case TEST_EXECUTE:
924 break;
925 }
926
927 ast_test_validate(test, threadpool = ast_threadpool_create("test", NULL, &options));
928 ast_test_validate(test, serializer_pool = ast_serializer_pool_create(
929 "test/test", 5, threadpool, 2)); /* 2 second shutdown group time out */
930 ast_test_validate(test, !strcmp(ast_serializer_pool_name(serializer_pool), "test/test"));
931 ast_test_validate(test, !ast_serializer_pool_set_alerts(serializer_pool, 5, 0));
932 ast_test_validate(test, task_data = task_data_create());
933
934 task_data->wait_time = 4000; /* task takes 4 seconds */
935 ast_test_validate(test, !ast_taskprocessor_push(
936 ast_serializer_pool_get(serializer_pool), task, task_data));
937
938 if (!ast_serializer_pool_destroy(serializer_pool)) {
939 ast_test_status_update(test, "Unexpected pool destruction!\n");
940 /*
941 * The pool should have timed out, so if it destruction reports success
942 * we need to fail.
943 */
944 serializer_pool = NULL;
945 return AST_TEST_FAIL;
946 }
947
948 ast_test_validate(test, !task_wait(task_data));
949
950 /* The first attempt should have failed. Second try should destroy successfully */
951 if (ast_serializer_pool_destroy(serializer_pool)) {
952 ast_test_status_update(test, "Unable to destroy serializer pool in allotted time!\n");
953 /*
954 * If this fails we'll try again on return to hopefully avoid a memory leak.
955 * If it again times out a third time, well not much we can do.
956 */
957 return AST_TEST_FAIL;
958 }
959
960 /* Test passed, so set pool to NULL to avoid "re-running" destroy */
961 serializer_pool = NULL;
962
963 return AST_TEST_PASS;
964}
struct ast_taskprocessor * ast_serializer_pool_get(struct ast_serializer_pool *pool)
Retrieve a serializer from the pool.
Definition: serializer.c:127
int ast_serializer_pool_set_alerts(struct ast_serializer_pool *pool, long high, long low)
Set taskprocessor alert levels for the serializers in the pool.
Definition: serializer.c:156
const char * ast_serializer_pool_name(const struct ast_serializer_pool *pool)
Retrieve the base name of the serializer pool.
Definition: serializer.c:122
struct ast_serializer_pool * ast_serializer_pool_create(const char *name, unsigned int size, struct ast_threadpool *threadpool, int timeout)
Create a serializer pool.
Definition: serializer.c:76
int ast_serializer_pool_destroy(struct ast_serializer_pool *pool)
Destroy the serializer pool.
Definition: serializer.c:39
static struct ast_threadpool * threadpool
Thread pool for observers.
Definition: sorcery.c:86
An opaque threadpool structure.
Definition: threadpool.c:36
unsigned long wait_time
static struct test_options options
void ast_threadpool_shutdown(struct ast_threadpool *pool)
Shut down a threadpool and destroy it.
Definition: threadpool.c:966
struct ast_threadpool * ast_threadpool_create(const char *name, struct ast_threadpool_listener *listener, const struct ast_threadpool_options *options)
Create a new threadpool.
Definition: threadpool.c:916
#define AST_THREADPOOL_OPTIONS_VERSION
Definition: threadpool.h:71

References ao2_cleanup, ast_serializer_pool_create(), ast_serializer_pool_destroy(), ast_serializer_pool_get(), ast_serializer_pool_name(), ast_serializer_pool_set_alerts(), ast_taskprocessor_push(), AST_TEST_FAIL, AST_TEST_NOT_RUN, AST_TEST_PASS, ast_test_status_update, ast_threadpool_create(), AST_THREADPOOL_OPTIONS_VERSION, ast_threadpool_shutdown(), sip_to_pjsip::info(), NULL, options, RAII_VAR, task(), task_data_create(), task_wait(), TEST_EXECUTE, TEST_INIT, threadpool, and task_data::wait_time.

◆ AST_TEST_DEFINE() [4/7]

AST_TEST_DEFINE ( subsystem_alert  )

Baseline test for subsystem alert.

Definition at line 180 of file test_taskprocessor.c.

181{
183#define TEST_DATA_ARRAY_SIZE 10
184#define LOW_WATER_MARK 3
185#define HIGH_WATER_MARK 6
186 struct task_data *task_data[(TEST_DATA_ARRAY_SIZE + 1)] = { 0 };
187 int res = 0;
188 int i;
189 long queue_count;
190 unsigned int alert_level;
191 unsigned int subsystem_alert_level;
192
193 switch (cmd) {
194 case TEST_INIT:
195 info->name = "subsystem_alert";
196 info->category = "/main/taskprocessor/";
197 info->summary = "Test of subsystem alerts";
198 info->description =
199 "Ensures alerts are generated properly.";
200 return AST_TEST_NOT_RUN;
201 case TEST_EXECUTE:
202 break;
203 }
204
205 tps = ast_taskprocessor_get("test_subsystem/test", TPS_REF_DEFAULT);
206
207 if (!tps) {
208 ast_test_status_update(test, "Unable to create test taskprocessor\n");
209 return AST_TEST_FAIL;
210 }
211
214
215 for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
217 if (!task_data[i]) {
218 ast_test_status_update(test, "Unable to create task_data\n");
219 res = -1;
220 goto data_cleanup;
221 }
222 task_data[i]->wait_time = 500;
223
224 ast_test_status_update(test, "Pushing task %d\n", i);
225 if (ast_taskprocessor_push(tps, task, task_data[i])) {
226 ast_test_status_update(test, "Failed to queue task\n");
227 res = -1;
228 goto data_cleanup;
229 }
230
231 queue_count = ast_taskprocessor_size(tps);
232 alert_level = ast_taskprocessor_alert_get();
233 subsystem_alert_level = ast_taskprocessor_get_subsystem_alert("test_subsystem");
234
235 if (queue_count == HIGH_WATER_MARK) {
236 if (subsystem_alert_level) {
237 ast_test_status_update(test, "Subsystem alert triggered correctly at %ld\n", queue_count);
238 }
239 if (alert_level) {
240 ast_test_status_update(test, "Global alert triggered correctly at %ld\n", queue_count);
241 }
242 } else if (queue_count < HIGH_WATER_MARK) {
243 if (subsystem_alert_level > 0) {
244 ast_test_status_update(test, "Subsystem alert triggered unexpectedly at %ld\n", queue_count);
245 res = -1;
246 }
247 if (alert_level > 0) {
248 ast_test_status_update(test, "Global alert triggered unexpectedly at %ld\n", queue_count);
249 res = -1;
250 }
251 } else {
252 if (subsystem_alert_level == 0) {
253 ast_test_status_update(test, "Subsystem alert failed to trigger at %ld\n", queue_count);
254 res = -1;
255 }
256 if (alert_level == 0) {
257 ast_test_status_update(test, "Global alert failed to trigger at %ld\n", queue_count);
258 res = -1;
259 }
260 }
261 }
262
264
265 for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
266 ast_test_status_update(test, "Waiting on task %d\n", i);
267 if (task_wait(task_data[i])) {
268 ast_test_status_update(test, "Queued task '%d' did not execute!\n", i);
269 res = -1;
270 goto data_cleanup;
271 }
272
273 queue_count = ast_taskprocessor_size(tps);
274 alert_level = ast_taskprocessor_alert_get();
275 subsystem_alert_level = ast_taskprocessor_get_subsystem_alert("test_subsystem");
276
277 if (queue_count == LOW_WATER_MARK) {
278 if (!subsystem_alert_level) {
279 ast_test_status_update(test, "Subsystem alert cleared correctly at %ld\n", queue_count);
280 }
281 if (!alert_level) {
282 ast_test_status_update(test, "Global alert cleared correctly at %ld\n", queue_count);
283 }
284 } else if (queue_count > LOW_WATER_MARK) {
285 if (subsystem_alert_level == 0) {
286 ast_test_status_update(test, "Subsystem alert cleared unexpectedly at %ld\n", queue_count);
287 res = -1;
288 }
289 if (alert_level == 0) {
290 ast_test_status_update(test, "Global alert cleared unexpectedly at %ld\n", queue_count);
291 res = -1;
292 }
293 } else {
294 if (subsystem_alert_level > 0) {
295 ast_test_status_update(test, "Subsystem alert failed to clear at %ld\n", queue_count);
296 res = -1;
297 }
298 if (alert_level > 0) {
299 ast_test_status_update(test, "Global alert failed to clear at %ld\n", queue_count);
300 res = -1;
301 }
302 }
303
304 }
305
307 for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
309 }
310
311 return res ? AST_TEST_FAIL : AST_TEST_PASS;
312}
unsigned int ast_taskprocessor_alert_get(void)
Get the current taskprocessor high water alert count.
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
int ast_taskprocessor_unsuspend(struct ast_taskprocessor *tps)
Indicate the taskprocessor is unsuspended.
unsigned int ast_taskprocessor_get_subsystem_alert(const char *subsystem)
Get the current taskprocessor high water alert count by subsystem.
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.
int ast_taskprocessor_suspend(struct ast_taskprocessor *tps)
Indicate the taskprocessor is suspended.
static void data_cleanup(void *data)
#define HIGH_WATER_MARK
#define TEST_DATA_ARRAY_SIZE
#define LOW_WATER_MARK

References ao2_cleanup, ast_taskprocessor_alert_get(), ast_taskprocessor_alert_set_levels(), ast_taskprocessor_get(), ast_taskprocessor_get_subsystem_alert(), ast_taskprocessor_push(), ast_taskprocessor_size(), ast_taskprocessor_suspend(), ast_taskprocessor_unreference(), ast_taskprocessor_unsuspend(), AST_TEST_FAIL, AST_TEST_NOT_RUN, AST_TEST_PASS, ast_test_status_update, data_cleanup(), HIGH_WATER_MARK, sip_to_pjsip::info(), LOW_WATER_MARK, NULL, RAII_VAR, task(), task_data_create(), task_wait(), TEST_DATA_ARRAY_SIZE, TEST_EXECUTE, TEST_INIT, TPS_REF_DEFAULT, and task_data::wait_time.

◆ AST_TEST_DEFINE() [5/7]

AST_TEST_DEFINE ( taskprocessor_listener  )

Test for a taskprocessor with custom listener.

This test pushes tasks to a taskprocessor with a custom listener, executes the tasks, and destroys the taskprocessor.

The test ensures that the listener's callbacks are called when expected and that the data being passed in is accurate.

Definition at line 555 of file test_taskprocessor.c.

556{
557 struct ast_taskprocessor *tps = NULL;
559 struct test_listener_pvt *pvt = NULL;
561
562 switch (cmd) {
563 case TEST_INIT:
564 info->name = "taskprocessor_listener";
565 info->category = "/main/taskprocessor/";
566 info->summary = "Test of taskprocessor listeners";
567 info->description =
568 "Ensures that listener callbacks are called when expected.";
569 return AST_TEST_NOT_RUN;
570 case TEST_EXECUTE:
571 break;
572 }
573
575 if (!pvt) {
576 ast_test_status_update(test, "Unable to allocate test taskprocessor listener user data\n");
577 return AST_TEST_FAIL;
578 }
579
581 if (!listener) {
582 ast_test_status_update(test, "Unable to allocate test taskprocessor listener\n");
583 res = AST_TEST_FAIL;
584 goto test_exit;
585 }
586
587 tps = ast_taskprocessor_create_with_listener("test_listener", listener);
588 if (!tps) {
589 ast_test_status_update(test, "Unable to allocate test taskprocessor\n");
590 res = AST_TEST_FAIL;
591 goto test_exit;
592 }
593
595 ast_test_status_update(test, "Failed to queue task\n");
596 res = AST_TEST_FAIL;
597 goto test_exit;
598 }
599
600 if (check_stats(test, pvt, 1, 0, 1) < 0) {
601 res = AST_TEST_FAIL;
602 goto test_exit;
603 }
604
606 ast_test_status_update(test, "Failed to queue task\n");
607 res = AST_TEST_FAIL;
608 goto test_exit;
609 }
610
611 if (check_stats(test, pvt, 2, 0, 1) < 0) {
612 res = AST_TEST_FAIL;
613 goto test_exit;
614 }
615
617
618 if (check_stats(test, pvt, 2, 0, 1) < 0) {
619 res = AST_TEST_FAIL;
620 goto test_exit;
621 }
622
624
625 if (check_stats(test, pvt, 2, 1, 1) < 0) {
626 res = AST_TEST_FAIL;
627 goto test_exit;
628 }
629
631
632 if (!pvt->shutdown) {
633 res = AST_TEST_FAIL;
634 goto test_exit;
635 }
636
637test_exit:
639 /* This is safe even if tps is NULL */
641 ast_free(pvt);
642 return res;
643}
static void * listener(void *unused)
Definition: asterisk.c:1519
#define ast_free(a)
Definition: astmm.h:180
A listener for taskprocessors.
Private data for the test taskprocessor listener.
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
Pop a task off the taskprocessor and execute it.
struct ast_taskprocessor * ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
Create a taskprocessor with a custom listener.
static int listener_test_task(void *ignore)
Queued task for taskprocessor listener test.
static void * test_listener_pvt_alloc(void)
test taskprocessor listener's alloc callback
static int check_stats(struct ast_test *test, const struct test_listener_pvt *pvt, int num_pushed, int num_emptied, int num_was_empty)
helper to ensure that statistics the listener is keeping are what we expect
static const struct ast_taskprocessor_listener_callbacks test_callbacks

References ao2_cleanup, ast_free, ast_taskprocessor_create_with_listener(), ast_taskprocessor_execute(), ast_taskprocessor_listener_alloc(), ast_taskprocessor_push(), ast_taskprocessor_unreference(), AST_TEST_FAIL, AST_TEST_NOT_RUN, AST_TEST_PASS, ast_test_status_update, check_stats(), sip_to_pjsip::info(), listener(), listener_test_task(), NULL, test_listener_pvt::shutdown, test_callbacks, TEST_EXECUTE, TEST_INIT, and test_listener_pvt_alloc().

◆ AST_TEST_DEFINE() [6/7]

AST_TEST_DEFINE ( taskprocessor_push_local  )

Definition at line 837 of file test_taskprocessor.c.

838{
839 RAII_VAR(struct ast_taskprocessor *, tps, NULL,
842 int local_data;
843 int res;
844
845 switch (cmd) {
846 case TEST_INIT:
847 info->name = __func__;
848 info->category = "/main/taskprocessor/";
849 info->summary = "Test of pushing local data";
850 info->description =
851 "Ensures that local data is passed along.";
852 return AST_TEST_NOT_RUN;
853 case TEST_EXECUTE:
854 break;
855 }
856
857
859 if (!tps) {
860 ast_test_status_update(test, "Unable to create test taskprocessor\n");
861 return AST_TEST_FAIL;
862 }
863
864
866 if (!task_data) {
867 ast_test_status_update(test, "Unable to create task_data\n");
868 return AST_TEST_FAIL;
869 }
870
871 local_data = 0;
872 ast_taskprocessor_set_local(tps, &local_data);
873
875 ast_test_status_update(test, "Failed to queue task\n");
876 return AST_TEST_FAIL;
877 }
878
879 res = task_wait(task_data);
880 if (res != 0) {
881 ast_test_status_update(test, "Queued task did not execute!\n");
882 return AST_TEST_FAIL;
883 }
884
885 if (local_data != 1) {
887 "Queued task did not set local_data!\n");
888 return AST_TEST_FAIL;
889 }
890
891 return AST_TEST_PASS;
892}
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
Sets the local data associated with a taskprocessor.
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int(*task_exe)(struct ast_taskprocessor_local *local), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
static int local_task_exe(struct ast_taskprocessor_local *local)

References ao2_cleanup, ast_taskprocessor_get(), ast_taskprocessor_push_local(), ast_taskprocessor_set_local(), ast_taskprocessor_unreference(), AST_TEST_FAIL, AST_TEST_NOT_RUN, AST_TEST_PASS, ast_test_status_update, sip_to_pjsip::info(), local_task_exe(), NULL, RAII_VAR, task_data_create(), task_wait(), TEST_EXECUTE, TEST_INIT, and TPS_REF_DEFAULT.

◆ AST_TEST_DEFINE() [7/7]

AST_TEST_DEFINE ( taskprocessor_shutdown  )

Definition at line 749 of file test_taskprocessor.c.

750{
752 RAII_VAR(struct shutdown_data *, task1, NULL, ao2_cleanup);
753 RAII_VAR(struct shutdown_data *, task2, NULL, ao2_cleanup);
754 int push_res;
755 int wait_res;
756 int pthread_res;
757 pthread_t shutdown_thread;
758
759 switch (cmd) {
760 case TEST_INIT:
761 info->name = "taskprocessor_shutdown";
762 info->category = "/main/taskprocessor/";
763 info->summary = "Test of taskprocessor shutdown sequence";
764 info->description =
765 "Ensures that all tasks run to completion after the taskprocessor has been unref'ed.";
766 return AST_TEST_NOT_RUN;
767 case TEST_EXECUTE:
768 break;
769 }
770
771 tps = ast_taskprocessor_get("test_shutdown", TPS_REF_DEFAULT);
772 task1 = shutdown_data_create(0); /* task1 waits to be poked */
773 task2 = shutdown_data_create(1); /* task2 waits for nothing */
774
775 if (!tps || !task1 || !task2) {
776 ast_test_status_update(test, "Allocation error\n");
777 return AST_TEST_FAIL;
778 }
779
780 push_res = ast_taskprocessor_push(tps, shutdown_task_exec, task1);
781 if (push_res != 0) {
782 ast_test_status_update(test, "Could not push task1\n");
783 return AST_TEST_FAIL;
784 }
785
786 push_res = ast_taskprocessor_push(tps, shutdown_task_exec, task2);
787 if (push_res != 0) {
788 ast_test_status_update(test, "Could not push task2\n");
789 return AST_TEST_FAIL;
790 }
791
792 wait_res = shutdown_waitfor_start(task1);
793 if (!wait_res) {
794 ast_test_status_update(test, "Task1 didn't start\n");
795 return AST_TEST_FAIL;
796 }
797
798 pthread_res = ast_pthread_create(&shutdown_thread, NULL, tps_shutdown_thread, tps);
799 if (pthread_res != 0) {
800 ast_test_status_update(test, "Failed to create shutdown thread\n");
801 return AST_TEST_FAIL;
802 }
803 tps = NULL;
804
805 /* Wakeup task1; it should complete */
806 shutdown_poke(task1);
807 wait_res = shutdown_waitfor_completion(task1);
808 if (!wait_res) {
809 ast_test_status_update(test, "Task1 didn't complete\n");
810 return AST_TEST_FAIL;
811 }
812
813 /* Wait for shutdown to complete */
814 pthread_join(shutdown_thread, NULL);
815
816 /* Should have also completed task2 */
817 wait_res = shutdown_has_completed(task2);
818 if (!wait_res) {
819 ast_test_status_update(test, "Task2 didn't finish\n");
820 return AST_TEST_FAIL;
821 }
822
823 return AST_TEST_PASS;
824}
static struct shutdown_data * shutdown_data_create(int dont_wait)
static void * tps_shutdown_thread(void *data)
static int shutdown_has_completed(struct shutdown_data *shutdown_data)
static int shutdown_waitfor_completion(struct shutdown_data *shutdown_data)
static void shutdown_poke(struct shutdown_data *shutdown_data)
static int shutdown_task_exec(void *data)
static int shutdown_waitfor_start(struct shutdown_data *shutdown_data)
#define ast_pthread_create(a, b, c, d)
Definition: utils.h:584

References ao2_cleanup, ast_pthread_create, ast_taskprocessor_get(), ast_taskprocessor_push(), ast_taskprocessor_unreference(), AST_TEST_FAIL, AST_TEST_NOT_RUN, AST_TEST_PASS, ast_test_status_update, sip_to_pjsip::info(), NULL, RAII_VAR, shutdown_data_create(), shutdown_has_completed(), shutdown_poke(), shutdown_task_exec(), shutdown_waitfor_completion(), shutdown_waitfor_start(), TEST_EXECUTE, TEST_INIT, TPS_REF_DEFAULT, and tps_shutdown_thread().

◆ check_stats()

static int check_stats ( struct ast_test *  test,
const struct test_listener_pvt * pvt,
int  num_pushed,
int  num_emptied,
int  num_was_empty 
)
static

helper to ensure that statistics the listener is keeping are what we expect

Parameters
test: The currently-running test
pvt: The private data for the taskprocessor listener
num_pushed: The expected current number of tasks pushed to the processor
num_emptied: The expected current number of times the taskprocessor has become empty
num_was_empty: The expected current number of times that tasks were pushed to an empty taskprocessor
Return values
-1: Stats were not as expected
0: Stats were as expected

Definition at line 523 of file test_taskprocessor.c.

524{
525 if (pvt->num_pushed != num_pushed) {
526 ast_test_status_update(test, "Unexpected number of tasks pushed. Expected %d but got %d\n",
527 num_pushed, pvt->num_pushed);
528 return -1;
529 }
530
531 if (pvt->num_emptied != num_emptied) {
532 ast_test_status_update(test, "Unexpected number of empties. Expected %d but got %d\n",
533 num_emptied, pvt->num_emptied);
534 return -1;
535 }
536
537 if (pvt->num_was_empty != num_was_empty) {
538 ast_test_status_update(test, "Unexpected number of empties. Expected %d but got %d\n",
539 num_was_empty, pvt->num_emptied);
540 return -1;
541 }
542
543 return 0;
544}

References ast_test_status_update, test_listener_pvt::num_emptied, test_listener_pvt::num_pushed, and test_listener_pvt::num_was_empty.

Referenced by AST_TEST_DEFINE().

◆ listener_test_task()

static int listener_test_task ( void *  ignore)
static

Queued task for taskprocessor listener test.

Does nothing.

Definition at line 507 of file test_taskprocessor.c.

508{
509 return 0;
510}

Referenced by AST_TEST_DEFINE().

◆ load_module()

static int load_module ( void  )
static

Definition at line 978 of file test_taskprocessor.c.

979{
980 ast_test_register(default_taskprocessor);
981 ast_test_register(default_taskprocessor_load);
982 ast_test_register(subsystem_alert);
983 ast_test_register(taskprocessor_listener);
984 ast_test_register(taskprocessor_shutdown);
985 ast_test_register(taskprocessor_push_local);
986 ast_test_register(serializer_pool);
988}
@ AST_MODULE_LOAD_SUCCESS
Definition: module.h:70

References AST_MODULE_LOAD_SUCCESS.

◆ load_task()

static int load_task ( void *  data)
static

a queued task to be used in the taskprocessor load test

The task increments the number of tasks executed and puts the passed-in data into the next slot in the array of random data.

Definition at line 336 of file test_taskprocessor.c.

337{
338 int *randdata = data;
342 return 0;
343}
ast_mutex_t lock
Definition: app_sla.c:331
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
Definition: lock.h:589
#define ast_cond_signal(cond)
Definition: lock.h:203

References ast_cond_signal, load_task_data::cond, load_task_results, lock, load_task_data::lock, SCOPED_MUTEX, load_task_data::task_rand, and load_task_data::tasks_completed.

Referenced by AST_TEST_DEFINE().

◆ local_task_exe()

static int local_task_exe ( struct ast_taskprocessor_local *  local)
static

Definition at line 826 of file test_taskprocessor.c.

827{
828 int *local_data = local->local_data;
829 struct task_data *task_data = local->data;
830
831 *local_data = 1;
833
834 return 0;
835}

References ast_taskprocessor_local::data, ast_taskprocessor_local::local_data, ast_taskprocessor::local_data, and task().

Referenced by AST_TEST_DEFINE().

◆ shutdown_data_create()

static struct shutdown_data * shutdown_data_create ( int  dont_wait)
static

Definition at line 662 of file test_taskprocessor.c.

663{
665
667 if (!shutdown_data) {
668 return NULL;
669 }
670
674 shutdown_data->task_stop_waiting = dont_wait;
676 return shutdown_data;
677}
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:409
static void shutdown_data_dtor(void *data)

References ao2_alloc, ao2_cleanup, ao2_ref, ast_cond_init, ast_mutex_init, shutdown_data::in, shutdown_data::lock, NULL, shutdown_data::out, RAII_VAR, shutdown_data_dtor(), and shutdown_data::task_stop_waiting.

Referenced by AST_TEST_DEFINE().

◆ shutdown_data_dtor()

static void shutdown_data_dtor ( void *  data)
static

◆ shutdown_has_completed()

static int shutdown_has_completed ( struct shutdown_data *  shutdown_data)
static

◆ shutdown_poke()

static void shutdown_poke ( struct shutdown_data *  shutdown_data)
static

◆ shutdown_task_exec()

static int shutdown_task_exec ( void *  data)
static

◆ shutdown_waitfor_completion()

static int shutdown_waitfor_completion ( struct shutdown_data *  shutdown_data)
static

Definition at line 693 of file test_taskprocessor.c.

694{
695 struct timeval start = ast_tvnow();
696 struct timespec end = {
697 .tv_sec = start.tv_sec + 5,
698 .tv_nsec = start.tv_usec * 1000
699 };
701
702 while (!shutdown_data->task_complete) {
703 if (ast_cond_timedwait(&shutdown_data->out, &shutdown_data->lock, &end) == ETIMEDOUT) {
704 break;
705 }
706 }
707
709}
char * end
Definition: eagi_proxy.c:73

References ast_cond_timedwait, ast_tvnow(), end, lock, shutdown_data::lock, shutdown_data::out, SCOPED_MUTEX, and shutdown_data::task_complete.

Referenced by AST_TEST_DEFINE().

◆ shutdown_waitfor_start()

static int shutdown_waitfor_start ( struct shutdown_data *  shutdown_data)
static

Definition at line 717 of file test_taskprocessor.c.

718{
719 struct timeval start = ast_tvnow();
720 struct timespec end = {
721 .tv_sec = start.tv_sec + 5,
722 .tv_nsec = start.tv_usec * 1000
723 };
725
726 while (!shutdown_data->task_started) {
727 if (ast_cond_timedwait(&shutdown_data->out, &shutdown_data->lock, &end) == ETIMEDOUT) {
728 break;
729 }
730 }
731
733}

References ast_cond_timedwait, ast_tvnow(), end, lock, shutdown_data::lock, shutdown_data::out, SCOPED_MUTEX, and shutdown_data::task_started.

Referenced by AST_TEST_DEFINE().

◆ task()

static int task ( void *  data)
static

Queued task for baseline test.

The task simply sets a boolean to indicate the task has been run and then signals a condition saying it's complete.

Definition at line 88 of file test_taskprocessor.c.

89{
90 struct task_data *task_data = data;
91
93 if (task_data->wait_time > 0) {
94 usleep(task_data->wait_time * 1000);
95 }
98 return 0;
99}
ast_cond_t cond
ast_mutex_t lock

References ast_cond_signal, task_data::cond, lock, task_data::lock, SCOPED_MUTEX, task_data::task_complete, and task_data::wait_time.

Referenced by AST_TEST_DEFINE(), ast_threadpool_push(), local_task_exe(), sched_free(), tps_task_free(), tps_taskprocessor_dtor(), and tps_taskprocessor_pop().

◆ task_data_create()

static struct task_data * task_data_create ( void  )
static

Create a task_data object.

Definition at line 64 of file test_taskprocessor.c.

65{
66 struct task_data *task_data =
68
69 if (!task_data) {
70 return NULL;
71 }
72
77
78 return task_data;
79}
static void task_data_dtor(void *obj)

References ao2_alloc, ast_cond_init, ast_mutex_init, task_data::cond, task_data::lock, NULL, task_data::task_complete, task_data_dtor(), and task_data::wait_time.

Referenced by AST_TEST_DEFINE().

◆ task_data_dtor()

static void task_data_dtor ( void *  obj)
static

Definition at line 55 of file test_taskprocessor.c.

56{
57 struct task_data *task_data = obj;
58
61}

References ast_cond_destroy, ast_mutex_destroy, task_data::cond, and task_data::lock.

Referenced by task_data_create().

◆ task_wait()

static int task_wait ( struct task_data *  task_data)
static

Wait for a task to execute.

Definition at line 104 of file test_taskprocessor.c.

105{
106 struct timeval start = ast_tvnow();
107 struct timespec end;
109
110 end.tv_sec = start.tv_sec + 30;
111 end.tv_nsec = start.tv_usec * 1000;
112
113 while (!task_data->task_complete) {
114 int res;
116 &end);
117 if (res == ETIMEDOUT) {
118 return -1;
119 }
120 }
121
122 return 0;
123}

References ast_cond_timedwait, ast_tvnow(), task_data::cond, end, lock, task_data::lock, SCOPED_MUTEX, and task_data::task_complete.

Referenced by AST_TEST_DEFINE().

◆ test_emptied()

static void test_emptied ( struct ast_taskprocessor_listener *  listener)
static

test taskprocessor listener's emptied callback.

Definition at line 480 of file test_taskprocessor.c.

481{
483 ++pvt->num_emptied;
484}
void * ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
Get the user data from the listener.

References ast_taskprocessor_listener_get_user_data(), listener(), and test_listener_pvt::num_emptied.

◆ test_listener_pvt_alloc()

static void * test_listener_pvt_alloc ( void  )
static

test taskprocessor listener's alloc callback

Definition at line 447 of file test_taskprocessor.c.

448{
449 struct test_listener_pvt *pvt;
450
451 pvt = ast_calloc(1, sizeof(*pvt));
452 return pvt;
453}
#define ast_calloc(num, len)
A wrapper for calloc()
Definition: astmm.h:202

References ast_calloc.

Referenced by AST_TEST_DEFINE().

◆ test_shutdown()

static void test_shutdown ( struct ast_taskprocessor_listener *  listener)
static

test taskprocessor listener's shutdown callback.

Definition at line 489 of file test_taskprocessor.c.

490{
492 pvt->shutdown = 1;
493}

References ast_taskprocessor_listener_get_user_data(), listener(), and test_listener_pvt::shutdown.

◆ test_start()

static int test_start ( struct ast_taskprocessor_listener *  listener)
static

test taskprocessor listener's start callback

Definition at line 458 of file test_taskprocessor.c.

459{
460 return 0;
461}

◆ test_task_pushed()

static void test_task_pushed ( struct ast_taskprocessor_listener *  listener,
int  was_empty 
)
static

test taskprocessor listener's task_pushed callback

Adjusts private data's stats as indicated by the parameters.

Definition at line 468 of file test_taskprocessor.c.

469{
471 ++pvt->num_pushed;
472 if (was_empty) {
473 ++pvt->num_was_empty;
474 }
475}

References ast_taskprocessor_listener_get_user_data(), listener(), test_listener_pvt::num_pushed, and test_listener_pvt::num_was_empty.

◆ tps_shutdown_thread()

static void * tps_shutdown_thread ( void *  data)
static

Definition at line 742 of file test_taskprocessor.c.

743{
744 struct ast_taskprocessor *tps = data;
746 return NULL;
747}

References ast_taskprocessor_unreference(), and NULL.

Referenced by AST_TEST_DEFINE().

◆ unload_module()

static int unload_module ( void  )
static

Definition at line 966 of file test_taskprocessor.c.

967{
968 ast_test_unregister(default_taskprocessor);
969 ast_test_unregister(default_taskprocessor_load);
970 ast_test_unregister(subsystem_alert);
971 ast_test_unregister(taskprocessor_listener);
972 ast_test_unregister(taskprocessor_shutdown);
973 ast_test_unregister(taskprocessor_push_local);
974 ast_test_unregister(serializer_pool);
975 return 0;
976}

Variable Documentation

◆ __mod_info

struct ast_module_info __mod_info = { .name = AST_MODULE, .flags = AST_MODFLAG_LOAD_ORDER , .description = "taskprocessor test module" , .key = "This paragraph is copyright (c) 2006 by Digium, Inc. \In order for your module to load, it must return this \key via a function called \"key\". Any code which \includes this paragraph must be licensed under the GNU \General Public License version 2 or later (at your \option). In addition to Digium's general reservations \of rights, Digium expressly reserves the right to \allow other parties to license this paragraph under \different terms. Any use of Digium, Inc. trademarks or \logos (including \"Asterisk\" or \"Digium\") without \express written permission of Digium, Inc. is prohibited.\n" , .buildopt_sum = AST_BUILDOPT_SUM, .load = load_module, .unload = unload_module, .load_pri = AST_MODPRI_DEFAULT, .support_level = AST_MODULE_SUPPORT_CORE, }
static

Definition at line 990 of file test_taskprocessor.c.

◆ ast_module_info

const struct ast_module_info* ast_module_info = &__mod_info
static

Definition at line 990 of file test_taskprocessor.c.

◆ load_task_results

struct load_task_data load_task_results
static

Referenced by AST_TEST_DEFINE(), and load_task().

◆ test_callbacks

const struct ast_taskprocessor_listener_callbacks test_callbacks
static

Definition at line 495 of file test_taskprocessor.c.

Referenced by AST_TEST_DEFINE().