Asterisk - The Open Source Telephony Project GIT-master-d856a3e
test_taskprocessor.c
Go to the documentation of this file.
1/*
2 * Asterisk -- An open source telephony toolkit.
3 *
4 * Copyright (C) 2012-2013, Digium, Inc.
5 *
6 * Mark Michelson <mmichelson@digium.com>
7 *
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
13 *
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
17 */
18
19/*!
20 * \file
21 * \brief taskprocessor unit tests
22 *
23 * \author Mark Michelson <mmichelson@digium.com>
24 *
25 */
26
27/*** MODULEINFO
28 <depend>TEST_FRAMEWORK</depend>
29 <support_level>core</support_level>
30 ***/
31
32#include "asterisk.h"
33
34#include "asterisk/test.h"
36#include "asterisk/module.h"
37#include "asterisk/astobj2.h"
38#include "asterisk/serializer.h"
39#include "asterisk/threadpool.h"
40
41/*!
42 * \brief userdata associated with baseline taskprocessor test
43 */
44struct task_data {
45 /* Condition used to signal to queuing thread that task was executed */
47 /* Lock protecting the condition */
49 /*! Boolean indicating that the task was run */
51 /*! Milliseconds to wait before returning */
52 unsigned long wait_time;
53};
54
55static void task_data_dtor(void *obj)
56{
57 struct task_data *task_data = obj;
58
61}
62
63/*! \brief Create a task_data object */
64static struct task_data *task_data_create(void)
65{
66 struct task_data *task_data =
68
69 if (!task_data) {
70 return NULL;
71 }
72
77
78 return task_data;
79}
80
81/*!
82 * \brief Queued task for baseline test.
83 *
84 * The task simply sets a boolean to indicate the
85 * task has been run and then signals a condition
86 * saying it's complete
87 */
88static int task(void *data)
89{
90 struct task_data *task_data = data;
91
93 if (task_data->wait_time > 0) {
94 usleep(task_data->wait_time * 1000);
95 }
98 return 0;
99}
100
101/*!
102 * \brief Wait for a task to execute.
103 */
104static int task_wait(struct task_data *task_data)
105{
106 struct timeval start = ast_tvnow();
107 struct timespec end;
109
110 end.tv_sec = start.tv_sec + 30;
111 end.tv_nsec = start.tv_usec * 1000;
112
113 while (!task_data->task_complete) {
114 int res;
116 &end);
117 if (res == ETIMEDOUT) {
118 return -1;
119 }
120 }
121
122 return 0;
123}
124
125/*!
126 * \brief Baseline test for default taskprocessor
127 *
128 * This test ensures that when a task is added to a taskprocessor that
129 * has been allocated with a default listener that the task gets executed
130 * as expected
131 */
132AST_TEST_DEFINE(default_taskprocessor)
133{
136 int res;
137
138 switch (cmd) {
139 case TEST_INIT:
140 info->name = "default_taskprocessor";
141 info->category = "/main/taskprocessor/";
142 info->summary = "Test of default taskprocessor";
143 info->description =
144 "Ensures that a queued task gets executed.";
145 return AST_TEST_NOT_RUN;
146 case TEST_EXECUTE:
147 break;
148 }
149
151
152 if (!tps) {
153 ast_test_status_update(test, "Unable to create test taskprocessor\n");
154 return AST_TEST_FAIL;
155 }
156
158 if (!task_data) {
159 ast_test_status_update(test, "Unable to create task_data\n");
160 return AST_TEST_FAIL;
161 }
162
164 ast_test_status_update(test, "Failed to queue task\n");
165 return AST_TEST_FAIL;
166 }
167
168 res = task_wait(task_data);
169 if (res != 0) {
170 ast_test_status_update(test, "Queued task did not execute!\n");
171 return AST_TEST_FAIL;
172 }
173
174 return AST_TEST_PASS;
175}
176
177/*!
178 * \brief Baseline test for subsystem alert
179 */
181{
183#define TEST_DATA_ARRAY_SIZE 10
184#define LOW_WATER_MARK 3
185#define HIGH_WATER_MARK 6
186 struct task_data *task_data[(TEST_DATA_ARRAY_SIZE + 1)] = { 0 };
187 int res = 0;
188 int i;
189 long queue_count;
190 unsigned int alert_level;
191 unsigned int subsystem_alert_level;
192
193 switch (cmd) {
194 case TEST_INIT:
195 info->name = "subsystem_alert";
196 info->category = "/main/taskprocessor/";
197 info->summary = "Test of subsystem alerts";
198 info->description =
199 "Ensures alerts are generated properly.";
200 return AST_TEST_NOT_RUN;
201 case TEST_EXECUTE:
202 break;
203 }
204
205 tps = ast_taskprocessor_get("test_subsystem/test", TPS_REF_DEFAULT);
206
207 if (!tps) {
208 ast_test_status_update(test, "Unable to create test taskprocessor\n");
209 return AST_TEST_FAIL;
210 }
211
214
215 for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
217 if (!task_data[i]) {
218 ast_test_status_update(test, "Unable to create task_data\n");
219 res = -1;
220 goto data_cleanup;
221 }
222 task_data[i]->wait_time = 500;
223
224 ast_test_status_update(test, "Pushing task %d\n", i);
225 if (ast_taskprocessor_push(tps, task, task_data[i])) {
226 ast_test_status_update(test, "Failed to queue task\n");
227 res = -1;
228 goto data_cleanup;
229 }
230
231 queue_count = ast_taskprocessor_size(tps);
232 alert_level = ast_taskprocessor_alert_get();
233 subsystem_alert_level = ast_taskprocessor_get_subsystem_alert("test_subsystem");
234
235 if (queue_count == HIGH_WATER_MARK) {
236 if (subsystem_alert_level) {
237 ast_test_status_update(test, "Subsystem alert triggered correctly at %ld\n", queue_count);
238 }
239 if (alert_level) {
240 ast_test_status_update(test, "Global alert triggered correctly at %ld\n", queue_count);
241 }
242 } else if (queue_count < HIGH_WATER_MARK) {
243 if (subsystem_alert_level > 0) {
244 ast_test_status_update(test, "Subsystem alert triggered unexpectedly at %ld\n", queue_count);
245 res = -1;
246 }
247 if (alert_level > 0) {
248 ast_test_status_update(test, "Global alert triggered unexpectedly at %ld\n", queue_count);
249 res = -1;
250 }
251 } else {
252 if (subsystem_alert_level == 0) {
253 ast_test_status_update(test, "Subsystem alert failed to trigger at %ld\n", queue_count);
254 res = -1;
255 }
256 if (alert_level == 0) {
257 ast_test_status_update(test, "Global alert failed to trigger at %ld\n", queue_count);
258 res = -1;
259 }
260 }
261 }
262
264
265 for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
266 ast_test_status_update(test, "Waiting on task %d\n", i);
267 if (task_wait(task_data[i])) {
268 ast_test_status_update(test, "Queued task '%d' did not execute!\n", i);
269 res = -1;
270 goto data_cleanup;
271 }
272
273 queue_count = ast_taskprocessor_size(tps);
274 alert_level = ast_taskprocessor_alert_get();
275 subsystem_alert_level = ast_taskprocessor_get_subsystem_alert("test_subsystem");
276
277 if (queue_count == LOW_WATER_MARK) {
278 if (!subsystem_alert_level) {
279 ast_test_status_update(test, "Subsystem alert cleared correctly at %ld\n", queue_count);
280 }
281 if (!alert_level) {
282 ast_test_status_update(test, "Global alert cleared correctly at %ld\n", queue_count);
283 }
284 } else if (queue_count > LOW_WATER_MARK) {
285 if (subsystem_alert_level == 0) {
286 ast_test_status_update(test, "Subsystem alert cleared unexpectedly at %ld\n", queue_count);
287 res = -1;
288 }
289 if (alert_level == 0) {
290 ast_test_status_update(test, "Global alert cleared unexpectedly at %ld\n", queue_count);
291 res = -1;
292 }
293 } else {
294 if (subsystem_alert_level > 0) {
295 ast_test_status_update(test, "Subsystem alert failed to clear at %ld\n", queue_count);
296 res = -1;
297 }
298 if (alert_level > 0) {
299 ast_test_status_update(test, "Global alert failed to clear at %ld\n", queue_count);
300 res = -1;
301 }
302 }
303
304 }
305
307 for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
309 }
310
311 return res ? AST_TEST_FAIL : AST_TEST_PASS;
312}
313
314#define NUM_TASKS 20000
315
316/*!
317 * \brief Relevant data associated with taskprocessor load test
318 */
319static struct load_task_data {
320 /*! Condition used to indicate a task has completed executing */
322 /*! Lock used to protect the condition */
324 /*! Counter of the number of completed tasks */
326 /*! Storage for task-specific data */
329
330/*!
331 * \brief a queued task to be used in the taskprocessor load test
332 *
333 * The task increments the number of tasks executed and puts the passed-in
334 * data into the next slot in the array of random data.
335 */
336static int load_task(void *data)
337{
338 int *randdata = data;
342 return 0;
343}
344
345/*!
346 * \brief Load test for taskprocessor with default listener
347 *
348 * This test queues a large number of tasks, each with random data associated.
349 * The test ensures that all of the tasks are run and that the tasks are executed
350 * in the same order that they were queued
351 */
352AST_TEST_DEFINE(default_taskprocessor_load)
353{
354 struct ast_taskprocessor *tps;
355 struct timeval start;
356 struct timespec ts;
358 int timedwait_res;
359 int i;
360 int rand_data[NUM_TASKS];
361
362 switch (cmd) {
363 case TEST_INIT:
364 info->name = "default_taskprocessor_load";
365 info->category = "/main/taskprocessor/";
366 info->summary = "Load test of default taskprocessor";
367 info->description =
368 "Ensure that a large number of queued tasks are executed in the proper order.";
369 return AST_TEST_NOT_RUN;
370 case TEST_EXECUTE:
371 break;
372 }
373
375
376 if (!tps) {
377 ast_test_status_update(test, "Unable to create test taskprocessor\n");
378 return AST_TEST_FAIL;
379 }
380
381 start = ast_tvnow();
382
383 ts.tv_sec = start.tv_sec + 60;
384 ts.tv_nsec = start.tv_usec * 1000;
385
389
390 for (i = 0; i < NUM_TASKS; ++i) {
391 rand_data[i] = ast_random();
392 if (ast_taskprocessor_push(tps, load_task, &rand_data[i])) {
393 ast_test_status_update(test, "Failed to queue task\n");
394 res = AST_TEST_FAIL;
395 goto test_end;
396 }
397 }
398
402 if (timedwait_res == ETIMEDOUT) {
403 break;
404 }
405 }
407
409 ast_test_status_update(test, "Unexpected number of tasks executed. Expected %d but got %d\n",
411 res = AST_TEST_FAIL;
412 goto test_end;
413 }
414
415 for (i = 0; i < NUM_TASKS; ++i) {
416 if (rand_data[i] != load_task_results.task_rand[i]) {
417 ast_test_status_update(test, "Queued tasks did not execute in order\n");
418 res = AST_TEST_FAIL;
419 goto test_end;
420 }
421 }
422
423test_end:
427 return res;
428}
429
430/*!
431 * \brief Private data for the test taskprocessor listener
432 */
434 /* Counter of number of tasks pushed to the queue */
436 /* Counter of number of times the queue was emptied */
438 /* Counter of number of times that a pushed task occurred on an empty queue */
440 /* Boolean indicating whether the shutdown callback was called */
442};
443
444/*!
445 * \brief test taskprocessor listener's alloc callback
446 */
447static void *test_listener_pvt_alloc(void)
448{
449 struct test_listener_pvt *pvt;
450
451 pvt = ast_calloc(1, sizeof(*pvt));
452 return pvt;
453}
454
455/*!
456 * \brief test taskprocessor listener's start callback
457 */
459{
460 return 0;
461}
462
463/*!
464 * \brief test taskprocessor listener's task_pushed callback
465 *
466 * Adjusts private data's stats as indicated by the parameters.
467 */
468static void test_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
469{
471 ++pvt->num_pushed;
472 if (was_empty) {
473 ++pvt->num_was_empty;
474 }
475}
476
477/*!
478 * \brief test taskprocessor listener's emptied callback.
479 */
481{
483 ++pvt->num_emptied;
484}
485
486/*!
487 * \brief test taskprocessor listener's shutdown callback.
488 */
490{
492 pvt->shutdown = 1;
493}
494
496 .start = test_start,
497 .task_pushed = test_task_pushed,
498 .emptied = test_emptied,
499 .shutdown = test_shutdown,
500};
501
/*!
 * \brief Queued task for taskprocessor listener test.
 *
 * Intentionally a no-op; it only exists so there is something to push.
 *
 * \param ignore Task data, never examined
 * \retval 0 always
 */
static int listener_test_task(void *ignore)
{
	(void) ignore;

	return 0;
}
511
512/*!
513 * \brief helper to ensure that statistics the listener is keeping are what we expect
514 *
515 * \param test The currently-running test
516 * \param pvt The private data for the taskprocessor listener
517 * \param num_pushed The expected current number of tasks pushed to the processor
518 * \param num_emptied The expected current number of times the taskprocessor has become empty
519 * \param num_was_empty The expected current number of times that tasks were pushed to an empty taskprocessor
520 * \retval -1 Stats were not as expected
521 * \retval 0 Stats were as expected
522 */
523static int check_stats(struct ast_test *test, const struct test_listener_pvt *pvt, int num_pushed, int num_emptied, int num_was_empty)
524{
525 if (pvt->num_pushed != num_pushed) {
526 ast_test_status_update(test, "Unexpected number of tasks pushed. Expected %d but got %d\n",
527 num_pushed, pvt->num_pushed);
528 return -1;
529 }
530
531 if (pvt->num_emptied != num_emptied) {
532 ast_test_status_update(test, "Unexpected number of empties. Expected %d but got %d\n",
533 num_emptied, pvt->num_emptied);
534 return -1;
535 }
536
537 if (pvt->num_was_empty != num_was_empty) {
538 ast_test_status_update(test, "Unexpected number of empties. Expected %d but got %d\n",
539 num_was_empty, pvt->num_emptied);
540 return -1;
541 }
542
543 return 0;
544}
545
546/*!
547 * \brief Test for a taskprocessor with custom listener.
548 *
549 * This test pushes tasks to a taskprocessor with a custom listener, executes the tasks,
550 * and destroys the taskprocessor.
551 *
552 * The test ensures that the listener's callbacks are called when expected and that the data
553 * being passed in is accurate.
554 */
555AST_TEST_DEFINE(taskprocessor_listener)
556{
557 struct ast_taskprocessor *tps = NULL;
559 struct test_listener_pvt *pvt = NULL;
561
562 switch (cmd) {
563 case TEST_INIT:
564 info->name = "taskprocessor_listener";
565 info->category = "/main/taskprocessor/";
566 info->summary = "Test of taskprocessor listeners";
567 info->description =
568 "Ensures that listener callbacks are called when expected.";
569 return AST_TEST_NOT_RUN;
570 case TEST_EXECUTE:
571 break;
572 }
573
575 if (!pvt) {
576 ast_test_status_update(test, "Unable to allocate test taskprocessor listener user data\n");
577 return AST_TEST_FAIL;
578 }
579
581 if (!listener) {
582 ast_test_status_update(test, "Unable to allocate test taskprocessor listener\n");
583 res = AST_TEST_FAIL;
584 goto test_exit;
585 }
586
587 tps = ast_taskprocessor_create_with_listener("test_listener", listener);
588 if (!tps) {
589 ast_test_status_update(test, "Unable to allocate test taskprocessor\n");
590 res = AST_TEST_FAIL;
591 goto test_exit;
592 }
593
595 ast_test_status_update(test, "Failed to queue task\n");
596 res = AST_TEST_FAIL;
597 goto test_exit;
598 }
599
600 if (check_stats(test, pvt, 1, 0, 1) < 0) {
601 res = AST_TEST_FAIL;
602 goto test_exit;
603 }
604
606 ast_test_status_update(test, "Failed to queue task\n");
607 res = AST_TEST_FAIL;
608 goto test_exit;
609 }
610
611 if (check_stats(test, pvt, 2, 0, 1) < 0) {
612 res = AST_TEST_FAIL;
613 goto test_exit;
614 }
615
617
618 if (check_stats(test, pvt, 2, 0, 1) < 0) {
619 res = AST_TEST_FAIL;
620 goto test_exit;
621 }
622
624
625 if (check_stats(test, pvt, 2, 1, 1) < 0) {
626 res = AST_TEST_FAIL;
627 goto test_exit;
628 }
629
631
632 if (!pvt->shutdown) {
633 res = AST_TEST_FAIL;
634 goto test_exit;
635 }
636
637test_exit:
639 /* This is safe even if tps is NULL */
641 ast_free(pvt);
642 return res;
643}
644
652};
653
654static void shutdown_data_dtor(void *data)
655{
656 struct shutdown_data *shutdown_data = data;
660}
661
662static struct shutdown_data *shutdown_data_create(int dont_wait)
663{
665
667 if (!shutdown_data) {
668 return NULL;
669 }
670
674 shutdown_data->task_stop_waiting = dont_wait;
676 return shutdown_data;
677}
678
679static int shutdown_task_exec(void *data)
680{
681 struct shutdown_data *shutdown_data = data;
687 }
690 return 0;
691}
692
694{
695 struct timeval start = ast_tvnow();
696 struct timespec end = {
697 .tv_sec = start.tv_sec + 5,
698 .tv_nsec = start.tv_usec * 1000
699 };
701
702 while (!shutdown_data->task_complete) {
703 if (ast_cond_timedwait(&shutdown_data->out, &shutdown_data->lock, &end) == ETIMEDOUT) {
704 break;
705 }
706 }
707
709}
710
712{
715}
716
718{
719 struct timeval start = ast_tvnow();
720 struct timespec end = {
721 .tv_sec = start.tv_sec + 5,
722 .tv_nsec = start.tv_usec * 1000
723 };
725
726 while (!shutdown_data->task_started) {
727 if (ast_cond_timedwait(&shutdown_data->out, &shutdown_data->lock, &end) == ETIMEDOUT) {
728 break;
729 }
730 }
731
733}
734
736{
740}
741
742static void *tps_shutdown_thread(void *data)
743{
744 struct ast_taskprocessor *tps = data;
746 return NULL;
747}
748
749AST_TEST_DEFINE(taskprocessor_shutdown)
750{
752 RAII_VAR(struct shutdown_data *, task1, NULL, ao2_cleanup);
753 RAII_VAR(struct shutdown_data *, task2, NULL, ao2_cleanup);
754 int push_res;
755 int wait_res;
756 int pthread_res;
757 pthread_t shutdown_thread;
758
759 switch (cmd) {
760 case TEST_INIT:
761 info->name = "taskprocessor_shutdown";
762 info->category = "/main/taskprocessor/";
763 info->summary = "Test of taskprocessor shutdown sequence";
764 info->description =
765 "Ensures that all tasks run to completion after the taskprocessor has been unref'ed.";
766 return AST_TEST_NOT_RUN;
767 case TEST_EXECUTE:
768 break;
769 }
770
771 tps = ast_taskprocessor_get("test_shutdown", TPS_REF_DEFAULT);
772 task1 = shutdown_data_create(0); /* task1 waits to be poked */
773 task2 = shutdown_data_create(1); /* task2 waits for nothing */
774
775 if (!tps || !task1 || !task2) {
776 ast_test_status_update(test, "Allocation error\n");
777 return AST_TEST_FAIL;
778 }
779
780 push_res = ast_taskprocessor_push(tps, shutdown_task_exec, task1);
781 if (push_res != 0) {
782 ast_test_status_update(test, "Could not push task1\n");
783 return AST_TEST_FAIL;
784 }
785
786 push_res = ast_taskprocessor_push(tps, shutdown_task_exec, task2);
787 if (push_res != 0) {
788 ast_test_status_update(test, "Could not push task2\n");
789 return AST_TEST_FAIL;
790 }
791
792 wait_res = shutdown_waitfor_start(task1);
793 if (!wait_res) {
794 ast_test_status_update(test, "Task1 didn't start\n");
795 return AST_TEST_FAIL;
796 }
797
798 pthread_res = ast_pthread_create(&shutdown_thread, NULL, tps_shutdown_thread, tps);
799 if (pthread_res != 0) {
800 ast_test_status_update(test, "Failed to create shutdown thread\n");
801 return AST_TEST_FAIL;
802 }
803 tps = NULL;
804
805 /* Wakeup task1; it should complete */
806 shutdown_poke(task1);
807 wait_res = shutdown_waitfor_completion(task1);
808 if (!wait_res) {
809 ast_test_status_update(test, "Task1 didn't complete\n");
810 return AST_TEST_FAIL;
811 }
812
813 /* Wait for shutdown to complete */
814 pthread_join(shutdown_thread, NULL);
815
816 /* Should have also completed task2 */
817 wait_res = shutdown_has_completed(task2);
818 if (!wait_res) {
819 ast_test_status_update(test, "Task2 didn't finish\n");
820 return AST_TEST_FAIL;
821 }
822
823 return AST_TEST_PASS;
824}
825
827{
828 int *local_data = local->local_data;
829 struct task_data *task_data = local->data;
830
831 *local_data = 1;
833
834 return 0;
835}
836
837AST_TEST_DEFINE(taskprocessor_push_local)
838{
839 RAII_VAR(struct ast_taskprocessor *, tps, NULL,
842 int local_data;
843 int res;
844
845 switch (cmd) {
846 case TEST_INIT:
847 info->name = __func__;
848 info->category = "/main/taskprocessor/";
849 info->summary = "Test of pushing local data";
850 info->description =
851 "Ensures that local data is passed along.";
852 return AST_TEST_NOT_RUN;
853 case TEST_EXECUTE:
854 break;
855 }
856
857
859 if (!tps) {
860 ast_test_status_update(test, "Unable to create test taskprocessor\n");
861 return AST_TEST_FAIL;
862 }
863
864
866 if (!task_data) {
867 ast_test_status_update(test, "Unable to create task_data\n");
868 return AST_TEST_FAIL;
869 }
870
871 local_data = 0;
872 ast_taskprocessor_set_local(tps, &local_data);
873
875 ast_test_status_update(test, "Failed to queue task\n");
876 return AST_TEST_FAIL;
877 }
878
879 res = task_wait(task_data);
880 if (res != 0) {
881 ast_test_status_update(test, "Queued task did not execute!\n");
882 return AST_TEST_FAIL;
883 }
884
885 if (local_data != 1) {
887 "Queued task did not set local_data!\n");
888 return AST_TEST_FAIL;
889 }
890
891 return AST_TEST_PASS;
892}
893
894/*!
895 * \brief Baseline test for a serializer pool
896 *
897 * This test queues a long-running task on a serializer from the pool and
898 * ensures that destroying the pool times out while the task is still
899 * running, then succeeds once the task has completed
900 */
901AST_TEST_DEFINE(serializer_pool)
902{
908 .idle_timeout = 0,
909 .auto_increment = 0,
910 .initial_size = 1,
911 .max_size = 0,
912 };
913 /* struct ast_taskprocessor *tps; */
914
915 switch (cmd) {
916 case TEST_INIT:
917 info->name = "serializer_pool";
918 info->category = "/main/taskprocessor/";
919 info->summary = "Test using a serializer pool";
920 info->description =
921 "Ensures that a queued task gets executed.";
922 return AST_TEST_NOT_RUN;
923 case TEST_EXECUTE:
924 break;
925 }
926
927 ast_test_validate(test, threadpool = ast_threadpool_create("test", NULL, &options));
928 ast_test_validate(test, serializer_pool = ast_serializer_pool_create(
929 "test/test", 5, threadpool, 2)); /* 2 second shutdown group time out */
930 ast_test_validate(test, !strcmp(ast_serializer_pool_name(serializer_pool), "test/test"));
931 ast_test_validate(test, !ast_serializer_pool_set_alerts(serializer_pool, 5, 0));
932 ast_test_validate(test, task_data = task_data_create());
933
934 task_data->wait_time = 4000; /* task takes 4 seconds */
935 ast_test_validate(test, !ast_taskprocessor_push(
936 ast_serializer_pool_get(serializer_pool), task, task_data));
937
938 if (!ast_serializer_pool_destroy(serializer_pool)) {
939 ast_test_status_update(test, "Unexpected pool destruction!\n");
940 /*
941 * The pool should have timed out, so if destruction reports success
942 * we need to fail.
943 */
944 serializer_pool = NULL;
945 return AST_TEST_FAIL;
946 }
947
948 ast_test_validate(test, !task_wait(task_data));
949
950 /* The first attempt should have failed. Second try should destroy successfully */
951 if (ast_serializer_pool_destroy(serializer_pool)) {
952 ast_test_status_update(test, "Unable to destroy serializer pool in allotted time!\n");
953 /*
954 * If this fails we'll try again on return to hopefully avoid a memory leak.
955 * If it times out a third time, there is not much more we can do.
956 */
957 return AST_TEST_FAIL;
958 }
959
960 /* Test passed, so set pool to NULL to avoid "re-running" destroy */
961 serializer_pool = NULL;
962
963 return AST_TEST_PASS;
964}
965
966static int unload_module(void)
967{
968 ast_test_unregister(default_taskprocessor);
969 ast_test_unregister(default_taskprocessor_load);
970 ast_test_unregister(subsystem_alert);
971 ast_test_unregister(taskprocessor_listener);
972 ast_test_unregister(taskprocessor_shutdown);
973 ast_test_unregister(taskprocessor_push_local);
974 ast_test_unregister(serializer_pool);
975 return 0;
976}
977
978static int load_module(void)
979{
980 ast_test_register(default_taskprocessor);
981 ast_test_register(default_taskprocessor_load);
982 ast_test_register(subsystem_alert);
983 ast_test_register(taskprocessor_listener);
984 ast_test_register(taskprocessor_shutdown);
985 ast_test_register(taskprocessor_push_local);
986 ast_test_register(serializer_pool);
988}
989
990AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "taskprocessor test module");
ast_mutex_t lock
Definition: app_sla.c:331
static void * listener(void *unused)
Definition: asterisk.c:1519
Asterisk main include file. File version handling, generic pbx functions.
#define ast_free(a)
Definition: astmm.h:180
#define ast_calloc(num, len)
A wrapper for calloc()
Definition: astmm.h:202
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:409
char * end
Definition: eagi_proxy.c:73
#define ast_cond_destroy(cond)
Definition: lock.h:202
#define ast_cond_wait(cond, mutex)
Definition: lock.h:205
#define ast_cond_init(cond, attr)
Definition: lock.h:201
#define ast_cond_timedwait(cond, mutex, time)
Definition: lock.h:206
#define ast_mutex_init(pmutex)
Definition: lock.h:186
#define ast_mutex_unlock(a)
Definition: lock.h:190
pthread_cond_t ast_cond_t
Definition: lock.h:178
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
Definition: lock.h:589
#define ast_mutex_destroy(a)
Definition: lock.h:188
#define ast_mutex_lock(a)
Definition: lock.h:189
#define ast_cond_signal(cond)
Definition: lock.h:203
Asterisk module definitions.
#define AST_MODULE_INFO_STANDARD(keystr, desc)
Definition: module.h:581
#define ASTERISK_GPL_KEY
The text the key() function should return.
Definition: module.h:46
@ AST_MODULE_LOAD_SUCCESS
Definition: module.h:70
def ignore(key=None, val=None, section=None, pjsip=None, nmapped=None, type='endpoint')
Definition: sip_to_pjsip.py:48
def info(msg)
#define NULL
Definition: resample.c:96
struct ast_taskprocessor * ast_serializer_pool_get(struct ast_serializer_pool *pool)
Retrieve a serializer from the pool.
Definition: serializer.c:127
int ast_serializer_pool_set_alerts(struct ast_serializer_pool *pool, long high, long low)
Set taskprocessor alert levels for the serializers in the pool.
Definition: serializer.c:156
const char * ast_serializer_pool_name(const struct ast_serializer_pool *pool)
Retrieve the base name of the serializer pool.
Definition: serializer.c:122
struct ast_serializer_pool * ast_serializer_pool_create(const char *name, unsigned int size, struct ast_threadpool *threadpool, int timeout)
Create a serializer pool.
Definition: serializer.c:76
int ast_serializer_pool_destroy(struct ast_serializer_pool *pool)
Destroy the serializer pool.
Definition: serializer.c:39
static struct ast_threadpool * threadpool
Thread pool for observers.
Definition: sorcery.c:86
Structure for mutex and tracking information.
Definition: lock.h:135
int(* start)(struct ast_taskprocessor_listener *listener)
The taskprocessor has started completely.
Definition: taskprocessor.h:92
A listener for taskprocessors.
Local data parameter.
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
An opaque threadpool structure.
Definition: threadpool.c:36
Relevant data associated with taskprocessor load test.
int task_rand[NUM_TASKS]
userdata associated with baseline taskprocessor test
ast_cond_t cond
unsigned long wait_time
ast_mutex_t lock
Private data for the test taskprocessor listener.
An API for managing task processing threads that can be shared across modules.
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
Sets the local data associated with a taskprocessor.
@ TPS_REF_DEFAULT
return a reference to a taskprocessor, create one if it does not exist
Definition: taskprocessor.h:76
unsigned int ast_taskprocessor_alert_get(void)
Get the current taskprocessor high water alert count.
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int(*task_exe)(struct ast_taskprocessor_local *local), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
void * ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
Get the user data from the listener.
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
int ast_taskprocessor_unsuspend(struct ast_taskprocessor *tps)
Indicate the taskprocessor is unsuspended.
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
Pop a task off the taskprocessor and execute it.
unsigned int ast_taskprocessor_get_subsystem_alert(const char *subsystem)
Get the current taskprocessor high water alert count by subsystem.
struct ast_taskprocessor * ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
Create a taskprocessor with a custom listener.
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.
int ast_taskprocessor_suspend(struct ast_taskprocessor *tps)
Indicate the taskprocessor is suspended.
Test Framework API.
@ TEST_INIT
Definition: test.h:200
@ TEST_EXECUTE
Definition: test.h:201
#define ast_test_status_update(a, b, c...)
Definition: test.h:129
ast_test_result_state
Definition: test.h:193
@ AST_TEST_PASS
Definition: test.h:195
@ AST_TEST_FAIL
Definition: test.h:196
@ AST_TEST_NOT_RUN
Definition: test.h:194
static struct test_options options
static void data_cleanup(void *data)
AST_TEST_DEFINE(default_taskprocessor)
Baseline test for default taskprocessor.
static struct task_data * task_data_create(void)
Create a task_data object.
static void shutdown_data_dtor(void *data)
static int task(void *data)
Queued task for baseline test.
static int listener_test_task(void *ignore)
Queued task for taskprocessor listener test.
static struct shutdown_data * shutdown_data_create(int dont_wait)
static void * tps_shutdown_thread(void *data)
static int test_start(struct ast_taskprocessor_listener *listener)
test taskprocessor listener's start callback
static void * test_listener_pvt_alloc(void)
test taskprocessor listener's alloc callback
static void test_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
test taskprocessor listener's task_pushed callback
static int check_stats(struct ast_test *test, const struct test_listener_pvt *pvt, int num_pushed, int num_emptied, int num_was_empty)
helper to ensure that statistics the listener is keeping are what we expect
#define HIGH_WATER_MARK
static int shutdown_has_completed(struct shutdown_data *shutdown_data)
static int shutdown_waitfor_completion(struct shutdown_data *shutdown_data)
#define TEST_DATA_ARRAY_SIZE
#define NUM_TASKS
static void shutdown_poke(struct shutdown_data *shutdown_data)
static const struct ast_taskprocessor_listener_callbacks test_callbacks
static int shutdown_task_exec(void *data)
static int load_module(void)
static void test_shutdown(struct ast_taskprocessor_listener *listener)
test taskprocessor listener's shutdown callback.
static void task_data_dtor(void *obj)
static int local_task_exe(struct ast_taskprocessor_local *local)
static int task_wait(struct task_data *task_data)
Wait for a task to execute.
static int unload_module(void)
static struct load_task_data load_task_results
static int shutdown_waitfor_start(struct shutdown_data *shutdown_data)
static int load_task(void *data)
a queued task to be used in the taskprocessor load test
static void test_emptied(struct ast_taskprocessor_listener *listener)
test taskprocessor listener's emptied callback.
#define LOW_WATER_MARK
void ast_threadpool_shutdown(struct ast_threadpool *pool)
Shut down a threadpool and destroy it.
Definition: threadpool.c:966
struct ast_threadpool * ast_threadpool_create(const char *name, struct ast_threadpool_listener *listener, const struct ast_threadpool_options *options)
Create a new threadpool.
Definition: threadpool.c:916
#define AST_THREADPOOL_OPTIONS_VERSION
Definition: threadpool.h:71
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:159
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:941
#define ast_pthread_create(a, b, c, d)
Definition: utils.h:584
long int ast_random(void)
Definition: utils.c:2312