Asterisk - The Open Source Telephony Project GIT-master-4f2b068
Loading...
Searching...
No Matches
test_taskpool.c
Go to the documentation of this file.
1/*
2 * Asterisk -- An open source telephony toolkit.
3 *
4 * Copyright (C) 2025, Sangoma Technologies Inc
5 *
6 * Joshua Colp <jcolp@sangoma.com>
7 *
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
13 *
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
17 */
18
19/*!
20 * \file
21 * \brief taskpool unit tests
22 *
23 * \author Joshua Colp <jcolp@sangoma.com>
24 *
25 */
26
27/*** MODULEINFO
28 <depend>TEST_FRAMEWORK</depend>
29 <support_level>core</support_level>
30 ***/
31
32#include "asterisk.h"
33
34#include "asterisk/astobj2.h"
35#include "asterisk/lock.h"
36#include "asterisk/logger.h"
37#include "asterisk/module.h"
39#include "asterisk/test.h"
40#include "asterisk/taskpool.h"
41#include "asterisk/cli.h"
42
43struct test_data {
49};
50
51static struct test_data *test_alloc(void)
52{
53 struct test_data *td = ast_calloc(1, sizeof(*td));
54 if (!td) {
55 return NULL;
56 }
57 ast_mutex_init(&td->lock);
58 ast_cond_init(&td->cond, NULL);
59 return td;
60}
61
62static void test_destroy(struct test_data *td)
63{
66 ast_free(td);
67}
68
69static int simple_task(void *data)
70{
71 struct test_data *td = data;
72 SCOPED_MUTEX(lock, &td->lock);
74 td->executed = 1;
76 return 0;
77}
78
79AST_TEST_DEFINE(taskpool_push)
80{
81 struct ast_taskpool *pool = NULL;
82 struct test_data *td = NULL;
85 .idle_timeout = 0,
86 .auto_increment = 0,
87 .minimum_size = 1,
88 .initial_size = 1,
89 .max_size = 1,
90 };
92 struct timeval start;
93 struct timespec end;
94
95 switch (cmd) {
96 case TEST_INIT:
97 info->name = "push";
98 info->category = "/main/taskpool/";
99 info->summary = "Taskpool pushing test";
100 info->description =
101 "Pushes a single task into a taskpool asynchronously and ensures it is executed.";
102 return AST_TEST_NOT_RUN;
103 case TEST_EXECUTE:
104 break;
105 }
106 td = test_alloc();
107 if (!td) {
108 return AST_TEST_FAIL;
109 }
110
111 pool = ast_taskpool_create(info->name, &options);
112 if (!pool) {
113 goto end;
114 }
115
116 if (ast_taskpool_push(pool, simple_task, td)) {
117 goto end;
118 }
119
120 /* It should not take more than 5 seconds for a single simple task to execute */
121 start = ast_tvnow();
122 end.tv_sec = start.tv_sec + 5;
123 end.tv_nsec = start.tv_usec * 1000;
124
125 ast_mutex_lock(&td->lock);
126 while (!td->executed && ast_cond_timedwait(&td->cond, &td->lock, &end) != ETIMEDOUT) {
127 }
129
130 if (!td->executed) {
131 ast_test_status_update(test, "Expected simple task to be executed but it was not\n");
132 res = AST_TEST_FAIL;
133 }
134
135end:
137 test_destroy(td);
138 return res;
139}
140
141AST_TEST_DEFINE(taskpool_push_synchronous)
142{
143 struct ast_taskpool *pool = NULL;
144 struct test_data *td = NULL;
147 .idle_timeout = 0,
148 .auto_increment = 0,
149 .minimum_size = 1,
150 .initial_size = 1,
151 .max_size = 1,
152 };
154
155 switch (cmd) {
156 case TEST_INIT:
157 info->name = "push_synchronous";
158 info->category = "/main/taskpool/";
159 info->summary = "Taskpool synchronous pushing test";
160 info->description =
161 "Pushes a single task into a taskpool synchronously and ensures it is executed.";
162 return AST_TEST_NOT_RUN;
163 case TEST_EXECUTE:
164 break;
165 }
166 td = test_alloc();
167 if (!td) {
168 return AST_TEST_FAIL;
169 }
170
171 pool = ast_taskpool_create(info->name, &options);
172 if (!pool) {
173 goto end;
174 }
175
176 if (ast_taskpool_push_wait(pool, simple_task, td)) {
177 goto end;
178 }
179
180 if (!td->executed) {
181 ast_test_status_update(test, "Expected simple task to be executed but it was not\n");
182 res = AST_TEST_FAIL;
183 }
184
185end:
187 test_destroy(td);
188 return res;
189}
190
191AST_TEST_DEFINE(taskpool_push_serializer)
192{
193 struct ast_taskpool *pool = NULL;
194 struct test_data *td = NULL;
197 .idle_timeout = 0,
198 .auto_increment = 0,
199 .minimum_size = 1,
200 .initial_size = 1,
201 .max_size = 1,
202 };
205 struct timeval start;
206 struct timespec end;
207
208 switch (cmd) {
209 case TEST_INIT:
210 info->name = "push_serializer";
211 info->category = "/main/taskpool/";
212 info->summary = "Taskpool serializer pushing test";
213 info->description =
214 "Pushes a single task into a taskpool serializer and ensures it is executed.";
215 return AST_TEST_NOT_RUN;
216 case TEST_EXECUTE:
217 break;
218 }
219 td = test_alloc();
220 if (!td) {
221 return AST_TEST_FAIL;
222 }
223
224 pool = ast_taskpool_create(info->name, &options);
225 if (!pool) {
226 goto end;
227 }
228
229 serializer = ast_taskpool_serializer("serializer", pool);
230 if (!serializer) {
231 goto end;
232 }
233
235 goto end;
236 }
237
238 /* It should not take more than 5 seconds for a single simple task to execute */
239 start = ast_tvnow();
240 end.tv_sec = start.tv_sec + 5;
241 end.tv_nsec = start.tv_usec * 1000;
242
243 ast_mutex_lock(&td->lock);
244 while (!td->executed && ast_cond_timedwait(&td->cond, &td->lock, &end) != ETIMEDOUT) {
245 }
247
248 if (!td->executed) {
249 ast_test_status_update(test, "Expected simple task to be executed but it was not\n");
250 res = AST_TEST_FAIL;
251 }
252
253 if (td->taskprocessor != serializer) {
254 ast_test_status_update(test, "Expected taskprocessor to be same as serializer but it was not\n");
255 res = AST_TEST_FAIL;
256 }
257
258end:
261 test_destroy(td);
262 return res;
263}
264
265AST_TEST_DEFINE(taskpool_push_serializer_synchronous)
266{
267 struct ast_taskpool *pool = NULL;
268 struct test_data *td = NULL;
271 .idle_timeout = 0,
272 .auto_increment = 0,
273 .minimum_size = 1,
274 .initial_size = 1,
275 .max_size = 1,
276 };
279
280 switch (cmd) {
281 case TEST_INIT:
282 info->name = "push_serializer_synchronous";
283 info->category = "/main/taskpool/";
284 info->summary = "Taskpool serializer synchronous pushing test";
285 info->description =
286 "Pushes a single task into a taskpool serializer synchronously and ensures it is executed.";
287 return AST_TEST_NOT_RUN;
288 case TEST_EXECUTE:
289 break;
290 }
291 td = test_alloc();
292 if (!td) {
293 return AST_TEST_FAIL;
294 }
295
296 pool = ast_taskpool_create(info->name, &options);
297 if (!pool) {
298 goto end;
299 }
300
301 serializer = ast_taskpool_serializer("serializer", pool);
302 if (!serializer) {
303 goto end;
304 }
305
307 goto end;
308 }
309
310 if (!td->executed) {
311 ast_test_status_update(test, "Expected simple task to be executed but it was not\n");
312 res = AST_TEST_FAIL;
313 }
314
315 if (td->taskprocessor != serializer) {
316 ast_test_status_update(test, "Expected taskprocessor to be same as serializer but it was not\n");
317 res = AST_TEST_FAIL;
318 }
319
320end:
323 test_destroy(td);
324 return res;
325}
326
331
332AST_TEST_DEFINE(taskpool_push_serializer_synchronous_requeue)
333{
334 struct ast_taskpool *pool = NULL;
335 struct test_data *td = NULL;
338 .idle_timeout = 0,
339 .auto_increment = 0,
340 .minimum_size = 1,
341 .initial_size = 1,
342 .max_size = 1,
343 };
346
347 switch (cmd) {
348 case TEST_INIT:
349 info->name = "push_serializer_synchronous_requeue";
350 info->category = "/main/taskpool/";
351 info->summary = "Taskpool serializer synchronous requeueing test";
352 info->description =
353 "Pushes a single task into a taskpool serializer synchronously and ensures it is requeued and executed.";
354 return AST_TEST_NOT_RUN;
355 case TEST_EXECUTE:
356 break;
357 }
358 td = test_alloc();
359 if (!td) {
360 return AST_TEST_FAIL;
361 }
362
363 pool = ast_taskpool_create(info->name, &options);
364 if (!pool) {
365 goto end;
366 }
367
368 serializer = ast_taskpool_serializer("serializer", pool);
369 if (!serializer) {
370 goto end;
371 }
372
374 goto end;
375 }
376
377 if (!td->executed) {
378 ast_test_status_update(test, "Expected simple task to be executed but it was not\n");
379 res = AST_TEST_FAIL;
380 }
381
382 if (td->taskprocessor != serializer) {
383 ast_test_status_update(test, "Expected taskprocessor to be same as serializer but it was not\n");
384 res = AST_TEST_FAIL;
385 }
386
387end:
390 test_destroy(td);
391 return res;
392}
393
394AST_TEST_DEFINE(taskpool_push_grow)
395{
396 struct ast_taskpool *pool = NULL;
397 struct test_data *td = NULL;
400 .idle_timeout = 0,
401 .auto_increment = 1,
402 .minimum_size = 0,
403 .initial_size = 0,
404 .max_size = 1,
405 };
407 struct timeval start;
408 struct timespec end;
409
410 switch (cmd) {
411 case TEST_INIT:
412 info->name = "push_grow";
413 info->category = "/main/taskpool/";
414 info->summary = "Taskpool pushing test with auto-grow enabled";
415 info->description =
416 "Pushes a single task into a taskpool asynchronously, ensures it is executed and the pool grows.";
417 return AST_TEST_NOT_RUN;
418 case TEST_EXECUTE:
419 break;
420 }
421 td = test_alloc();
422 if (!td) {
423 return AST_TEST_FAIL;
424 }
425
426 pool = ast_taskpool_create(info->name, &options);
427 if (!pool) {
428 goto end;
429 }
430
431 if (ast_taskpool_taskprocessors_count(pool) != 0) {
432 ast_test_status_update(test, "Expected taskpool to have 0 taskprocessors but it has %zu\n", ast_taskpool_taskprocessors_count(pool));
433 res = AST_TEST_FAIL;
434 goto end;
435 }
436
437 if (ast_taskpool_push(pool, simple_task, td)) {
438 goto end;
439 }
440
441 if (ast_taskpool_taskprocessors_count(pool) != 1) {
442 ast_test_status_update(test, "Expected taskpool to have 1 taskprocessor but it has %zu\n", ast_taskpool_taskprocessors_count(pool));
443 res = AST_TEST_FAIL;
444 goto end;
445 }
446
447 /* It should not take more than 5 seconds for a single simple task to execute */
448 start = ast_tvnow();
449 end.tv_sec = start.tv_sec + 5;
450 end.tv_nsec = start.tv_usec * 1000;
451
452 ast_mutex_lock(&td->lock);
453 while (!td->executed && ast_cond_timedwait(&td->cond, &td->lock, &end) != ETIMEDOUT) {
454 }
456
457 if (!td->executed) {
458 ast_test_status_update(test, "Expected simple task to be executed but it was not\n");
459 res = AST_TEST_FAIL;
460 }
461
462end:
464 test_destroy(td);
465 return res;
466}
467
468AST_TEST_DEFINE(taskpool_push_shrink)
469{
470 struct ast_taskpool *pool = NULL;
471 struct test_data *td = NULL;
474 .idle_timeout = 1,
475 .auto_increment = 1,
476 .minimum_size = 0,
477 .initial_size = 0,
478 .max_size = 1,
479 };
481 struct timeval start;
482 struct timespec end;
483 int iterations = 0;
484
485 switch (cmd) {
486 case TEST_INIT:
487 info->name = "push_shrink";
488 info->category = "/main/taskpool/";
489 info->summary = "Taskpool pushing test with auto-shrink enabled";
490 info->description =
491 "Pushes a single task into a taskpool asynchronously, ensures it is executed and the pool shrinks.";
492 return AST_TEST_NOT_RUN;
493 case TEST_EXECUTE:
494 break;
495 }
496 td = test_alloc();
497 if (!td) {
498 return AST_TEST_FAIL;
499 }
500
501 pool = ast_taskpool_create(info->name, &options);
502 if (!pool) {
503 goto end;
504 }
505
506 if (ast_taskpool_taskprocessors_count(pool) != 0) {
507 ast_test_status_update(test, "Expected taskpool to have 0 taskprocessors but it has %zu\n", ast_taskpool_taskprocessors_count(pool));
508 res = AST_TEST_FAIL;
509 goto end;
510 }
511
512 if (ast_taskpool_push(pool, simple_task, td)) {
513 res = AST_TEST_FAIL;
514 goto end;
515 }
516
517 if (ast_taskpool_taskprocessors_count(pool) != 1) {
518 ast_test_status_update(test, "Expected taskpool to have 1 taskprocessor but it has %zu\n", ast_taskpool_taskprocessors_count(pool));
519 res = AST_TEST_FAIL;
520 goto end;
521 }
522
523 /* We give 10 seconds for the pool to shrink back to normal, but if it happens earlier we
524 * stop our check early.
525 */
526 ast_mutex_lock(&td->lock);
527 do {
528 start = ast_tvnow();
529 end.tv_sec = start.tv_sec + 1;
530 end.tv_nsec = start.tv_usec * 1000;
531
532 if (ast_cond_timedwait(&td->cond, &td->lock, &end) == ETIMEDOUT) {
533 iterations++;
534 }
535 } while (ast_taskpool_taskprocessors_count(pool) != 0 && iterations != 10);
536
537 if (!td->executed) {
538 ast_test_status_update(test, "Expected simple task to be executed but it was not\n");
539 res = AST_TEST_FAIL;
540 }
541
542 if (ast_taskpool_taskprocessors_count(pool) != 0) {
543 ast_test_status_update(test, "Expected taskpool to have 0 taskprocessors but it has %zu\n", ast_taskpool_taskprocessors_count(pool));
544 res = AST_TEST_FAIL;
545 goto end;
546 }
547
548end:
550 test_destroy(td);
551 return res;
552}
553
554AST_TEST_DEFINE(taskpool_serializer_suspension)
555{
556 struct ast_taskpool *pool = NULL;
557 struct test_data *td = NULL;
561 .idle_timeout = 0,
562 .auto_increment = 0,
563 .minimum_size = 1,
564 .initial_size = 1,
565 .max_size = 1,
566 };
568 struct timeval start;
569 struct timespec end;
570
571 switch (cmd) {
572 case TEST_INIT:
573 info->name = "serializer_suspension";
574 info->category = "/main/taskpool/";
575 info->summary = "Taskpool serializer suspension test";
576 info->description =
577 "Pushes a single task into a taskpool serializer asynchronously while suspended, and ensures it only executes after unsuspension.";
578 return AST_TEST_NOT_RUN;
579 case TEST_EXECUTE:
580 break;
581 }
582 td = test_alloc();
583 if (!td) {
584 return AST_TEST_FAIL;
585 }
586
587 pool = ast_taskpool_create(info->name, &options);
588 if (!pool) {
589 goto end;
590 }
591
592 serializer = ast_taskpool_serializer("serializer", pool);
593 if (!serializer) {
594 goto end;
595 }
596
598
600 goto end;
601 }
602
603 /* Give 5 seconds to ensure the task isn't executed */
604 start = ast_tvnow();
605 end.tv_sec = start.tv_sec + 5;
606 end.tv_nsec = start.tv_usec * 1000;
607
608 ast_mutex_lock(&td->lock);
609 while (!td->executed && ast_cond_timedwait(&td->cond, &td->lock, &end) != ETIMEDOUT) {
610 }
612
613 if (td->executed) {
614 ast_test_status_update(test, "Expected simple task to not be executed but it was\n");
615 res = AST_TEST_FAIL;
616 }
617
619
620 /* Give 5 seconds to ensure the task is executed */
621 start = ast_tvnow();
622 end.tv_sec = start.tv_sec + 5;
623 end.tv_nsec = start.tv_usec * 1000;
624
625 ast_mutex_lock(&td->lock);
626 while (!td->executed && ast_cond_timedwait(&td->cond, &td->lock, &end) != ETIMEDOUT) {
627 }
629
630 if (!td->executed) {
631 ast_test_status_update(test, "Expected simple task to be executed but it was not\n");
632 res = AST_TEST_FAIL;
633 }
634
635end:
638 test_destroy(td);
639 return res;
640}
641
642AST_TEST_DEFINE(taskpool_serializer_multiple_suspension)
643{
644 struct ast_taskpool *pool = NULL;
645 struct test_data *td = NULL;
649 .idle_timeout = 0,
650 .auto_increment = 0,
651 .minimum_size = 1,
652 .initial_size = 1,
653 .max_size = 1,
654 };
656 struct timeval start;
657 struct timespec end;
658
659 switch (cmd) {
660 case TEST_INIT:
661 info->name = "serializer_multiple_suspension";
662 info->category = "/main/taskpool/";
663 info->summary = "Taskpool serializer multiple suspension test";
664 info->description =
665 "Pushes a single task into a taskpool serializer asynchronously while suspended multiple times, and ensures it only executes after unsuspension.";
666 return AST_TEST_NOT_RUN;
667 case TEST_EXECUTE:
668 break;
669 }
670 td = test_alloc();
671 if (!td) {
672 return AST_TEST_FAIL;
673 }
674
675 pool = ast_taskpool_create(info->name, &options);
676 if (!pool) {
677 goto end;
678 }
679
680 serializer = ast_taskpool_serializer("serializer", pool);
681 if (!serializer) {
682 goto end;
683 }
684
687
689 goto end;
690 }
691
692 /* Give 5 seconds to ensure the task isn't executed */
693 start = ast_tvnow();
694 end.tv_sec = start.tv_sec + 5;
695 end.tv_nsec = start.tv_usec * 1000;
696
697 ast_mutex_lock(&td->lock);
698 while (!td->executed && ast_cond_timedwait(&td->cond, &td->lock, &end) != ETIMEDOUT) {
699 }
701
702 if (td->executed) {
703 ast_test_status_update(test, "Expected simple task to not be executed but it was\n");
704 res = AST_TEST_FAIL;
705 }
706
709
710 /* Give 5 seconds to ensure the task is executed */
711 start = ast_tvnow();
712 end.tv_sec = start.tv_sec + 5;
713 end.tv_nsec = start.tv_usec * 1000;
714
715 ast_mutex_lock(&td->lock);
716 while (!td->executed && ast_cond_timedwait(&td->cond, &td->lock, &end) != ETIMEDOUT) {
717 }
719
720 if (!td->executed) {
721 ast_test_status_update(test, "Expected simple task to be executed but it was not\n");
722 res = AST_TEST_FAIL;
723 }
724
725end:
728 test_destroy(td);
729 return res;
730}
731
732static int cascade_task(void *data)
733{
734 struct test_data *td = data;
735
737 return -1;
738 }
739
740 return 0;
741}
742
743AST_TEST_DEFINE(taskpool_serializer_push_wait_while_suspended_from_other_serializer)
744{
745 struct ast_taskpool *pool = NULL;
746 struct test_data *td = NULL;
747 struct ast_taskprocessor *suspended_serializer = NULL;
748 struct ast_taskprocessor *push_serializer = NULL;
751 .idle_timeout = 0,
752 .auto_increment = 0,
753 .minimum_size = 1,
754 .initial_size = 1,
755 .max_size = 1,
756 };
758 struct timeval start;
759 struct timespec end;
760
761 switch (cmd) {
762 case TEST_INIT:
763 info->name = "serializer_push_wait_while_suspended_from_other_serializer";
764 info->category = "/main/taskpool/";
765 info->summary = "Taskpool serializer push wait while suspended from other serializer test";
766 info->description =
767 "Pushes a single task into a taskpool serializer synchronously from another serializer while suspended, and ensures it only executes after unsuspension.";
768 return AST_TEST_NOT_RUN;
769 case TEST_EXECUTE:
770 break;
771 }
772 td = test_alloc();
773 if (!td) {
774 return AST_TEST_FAIL;
775 }
776
777 pool = ast_taskpool_create(info->name, &options);
778 if (!pool) {
779 goto end;
780 }
781
782 suspended_serializer = ast_taskpool_serializer("suspended_serializer", pool);
783 if (!suspended_serializer) {
784 goto end;
785 }
786
787 td->test_specific_data = suspended_serializer;
788
789 push_serializer = ast_taskpool_serializer("push_serializer", pool);
790 if (!push_serializer) {
791 goto end;
792 }
793
794 ast_taskpool_serializer_suspend(suspended_serializer);
795
796 /* We push a task to the unsuspended serializer which will push a task to the suspended serializer
797 * in a synchronous fashion. That task can not execute until the suspended serializer becomes
798 * unsuspended.
799 */
800 if (ast_taskprocessor_push(push_serializer, cascade_task, td)) {
801 goto end;
802 }
803
804 /* Give 5 seconds to ensure the task isn't executed */
805 start = ast_tvnow();
806 end.tv_sec = start.tv_sec + 5;
807 end.tv_nsec = start.tv_usec * 1000;
808
809 ast_mutex_lock(&td->lock);
810 while (!td->executed && ast_cond_timedwait(&td->cond, &td->lock, &end) != ETIMEDOUT) {
811 }
813
814 if (td->executed) {
815 ast_test_status_update(test, "Expected simple task to not be executed but it was\n");
816 res = AST_TEST_FAIL;
817 }
818
819 ast_taskpool_serializer_unsuspend(suspended_serializer);
820
821 /* Give 5 seconds to ensure the task is executed */
822 start = ast_tvnow();
823 end.tv_sec = start.tv_sec + 5;
824 end.tv_nsec = start.tv_usec * 1000;
825
826 ast_mutex_lock(&td->lock);
827 while (!td->executed && ast_cond_timedwait(&td->cond, &td->lock, &end) != ETIMEDOUT) {
828 }
830
831 if (!td->executed) {
832 ast_test_status_update(test, "Expected simple task to be executed but it was not\n");
833 res = AST_TEST_FAIL;
834 }
835
836end:
837 ast_taskprocessor_unreference(suspended_serializer);
838 ast_taskprocessor_unreference(push_serializer);
840 test_destroy(td);
841 return res;
842}
843
849
/*!
 * \brief Shared state for the "taskpool push efficiency" CLI command
 *
 * NOTE(review): this definition is absent from the listing; it is restored
 * from the designated initialiser in handle_cli_taskpool_push_efficiency()
 * (.pool, .num_tasks_executed, .shutdown) and from how efficiency_task() uses
 * the members. Confirm types against upstream.
 */
struct efficiency_task_data {
	/*! Pool the task requeues itself into; each queued task owns one reference */
	struct ast_taskpool *pool;
	/*! Running count of task executions */
	int num_tasks_executed;
	/*! Set non-zero to make tasks stop requeueing */
	int shutdown;
};

/*!
 * \brief Self-requeueing task used to measure taskpool throughput
 *
 * \param data The struct efficiency_task_data shared by all tasks
 *
 * Each invocation counts itself and pushes itself back into the pool. The
 * pool reference held on the task's behalf is released when the chain ends,
 * either because shutdown was requested or because the requeue failed.
 *
 * NOTE(review): the counter increment was dropped from this listing and is
 * restored as an atomic add (the file references ast_atomic_fetchadd_int);
 * confirm against upstream.
 *
 * \retval 0 on success or when stopping due to shutdown
 * \retval -1 if the task could not be requeued
 */
static int efficiency_task(void *data)
{
	struct efficiency_task_data *etd = data;

	if (etd->shutdown) {
		ao2_ref(etd->pool, -1);
		return 0;
	}

	ast_atomic_fetchadd_int(&etd->num_tasks_executed, +1);

	if (ast_taskpool_push(etd->pool, efficiency_task, etd)) {
		ao2_ref(etd->pool, -1);
		return -1;
	}

	return 0;
}
868
869static char *handle_cli_taskpool_push_efficiency(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
870{
871 struct ast_taskpool *pool = NULL;
872 struct test_data *td = NULL;
875 .idle_timeout = 0,
876 .auto_increment = 0,
877 .minimum_size = 5,
878 .initial_size = 5,
879 .max_size = 5,
880 };
881 struct efficiency_task_data etd = {
882 .pool = NULL,
883 .num_tasks_executed = 0,
884 .shutdown = 0,
885 };
886 struct timeval start;
887 struct timespec end;
888 int i;
889
890 switch (cmd) {
891 case CLI_INIT:
892 e->command = "taskpool push efficiency";
893 e->usage =
894 "Usage: taskpool push efficiency\n"
895 " Pushes 200 tasks to a taskpool and measures\n"
896 " the number of tasks executed within 30 seconds.\n";
897 return NULL;
898 case CLI_GENERATE:
899 return NULL;
900 }
901
902 td = test_alloc();
903 if (!td) {
904 return CLI_SUCCESS;
905 }
906
907 pool = ast_taskpool_create("taskpool_push_efficiency", &options);
908 if (!pool) {
909 goto end;
910 }
911
912 etd.pool = pool;
913
914 /* Push in 200 tasks, cause why not */
915 for (i = 0; i < 200; i++) {
916 /* Ensure that the task has a reference to the pool */
917 ao2_bump(pool);
918 if (ast_taskpool_push(pool, efficiency_task, &etd)) {
919 goto end;
920 }
921 }
922
923 /* Wait for 30 seconds */
924 start = ast_tvnow();
925 end.tv_sec = start.tv_sec + 30;
926 end.tv_nsec = start.tv_usec * 1000;
927
928 ast_mutex_lock(&td->lock);
929 while (ast_cond_timedwait(&td->cond, &td->lock, &end) != ETIMEDOUT) {
930 }
932
933 /* Give the total tasks executed, and tell each task to not requeue */
934 ast_cli(a->fd, "Total tasks executed in 30 seconds: %d (%d per second)\n", etd.num_tasks_executed, etd.num_tasks_executed / 30);
935
936end:
937 etd.shutdown = 1;
939 test_destroy(td);
940 return CLI_SUCCESS;
941}
942
948
/*!
 * \brief Per-pair state for the "taskpool push serializer efficiency" CLI command
 *
 * NOTE(review): this definition is absent from the listing; it is restored
 * from how the CLI handler fills it (serializer[0], serializer[1], and the
 * addresses of its local counter and shutdown flag). Confirm against upstream.
 */
struct serializer_efficiency_task_data {
	/*! The two serializers the task bounces between */
	struct ast_taskprocessor *serializer[2];
	/*! Shared execution counter owned by the CLI handler */
	int *num_tasks_executed;
	/*! Shared stop flag owned by the CLI handler */
	int *shutdown;
};

/*!
 * \brief Task that ping-pongs between a pair of serializers
 *
 * \param data The struct serializer_efficiency_task_data for this pair
 *
 * Each invocation counts itself and then queues itself on whichever of the
 * pair's serializers it is not currently running on, stopping once the shared
 * shutdown flag is set.
 *
 * NOTE(review): the counter increment and the tail of the push call were
 * dropped from this listing and are restored here; confirm against upstream.
 *
 * \retval 0 on success or when stopping due to shutdown
 * \retval -1 if the task could not be requeued
 */
static int serializer_efficiency_task(void *data)
{
	struct serializer_efficiency_task_data *etd = data;
	struct ast_taskprocessor *taskprocessor = etd->serializer[0];

	if (*etd->shutdown) {
		return 0;
	}

	ast_atomic_fetchadd_int(etd->num_tasks_executed, +1);

	/* We ping pong a task between a pair of taskprocessors to ensure that
	 * a single taskprocessor does not receive a thread from the threadpool
	 * exclusively.
	 */
	if (taskprocessor == ast_taskpool_serializer_get_current()) {
		taskprocessor = etd->serializer[1];
	}

	if (ast_taskprocessor_push(taskprocessor,
		serializer_efficiency_task, etd)) {
		return -1;
	}

	return 0;
}
975
977{
978 struct ast_taskpool *pool = NULL;
979 struct test_data *td = NULL;
982 .idle_timeout = 0,
983 .auto_increment = 0,
984 .minimum_size = 5,
985 .initial_size = 5,
986 .max_size = 5,
987 };
988 struct serializer_efficiency_task_data etd[200];
989 struct timeval start;
990 struct timespec end;
991 int i;
992 int num_tasks_executed = 0;
993 int shutdown = 0;
994
995 switch (cmd) {
996 case CLI_INIT:
997 e->command = "taskpool push serializer efficiency";
998 e->usage =
999 "Usage: taskpool push serializer efficiency\n"
1000 " Pushes 200 tasks to a taskpool in serializers and measures\n"
1001 " the number of tasks executed within 30 seconds.\n";
1002 return NULL;
1003 case CLI_GENERATE:
1004 return NULL;
1005 }
1006 td = test_alloc();
1007 if (!td) {
1008 return CLI_SUCCESS;
1009 }
1010
1011 memset(&etd, 0, sizeof(etd));
1012
1013 pool = ast_taskpool_create("taskpool_push_serializer_efficiency", &options);
1014 if (!pool) {
1015 goto end;
1016 }
1017
1018 /* We create 400 (200 pairs) of serializers */
1019 for (i = 0; i < 200; i++) {
1020 char serializer_name[AST_TASKPROCESSOR_MAX_NAME + 1];
1021
1022 ast_taskprocessor_build_name(serializer_name, sizeof(serializer_name), "serializer%d", i);
1023 etd[i].serializer[0] = ast_taskpool_serializer(serializer_name, pool);
1024 if (!etd[i].serializer[0]) {
1025 goto end;
1026 }
1027
1028 ast_taskprocessor_build_name(serializer_name, sizeof(serializer_name), "serializer%d", i);
1029 etd[i].serializer[1] = ast_taskpool_serializer(serializer_name, pool);
1030 if (!etd[i].serializer[1]) {
1031 goto end;
1032 }
1033
1034 etd[i].num_tasks_executed = &num_tasks_executed;
1035 etd[i].shutdown = &shutdown;
1036 }
1037
1038 /* And once created we push in 200 tasks */
1039 for (i = 0; i < 200; i++) {
1041 goto end;
1042 }
1043 }
1044
1045 /* Wait for 30 seconds */
1046 start = ast_tvnow();
1047 end.tv_sec = start.tv_sec + 30;
1048 end.tv_nsec = start.tv_usec * 1000;
1049
1050 ast_mutex_lock(&td->lock);
1051 while (ast_cond_timedwait(&td->cond, &td->lock, &end) != ETIMEDOUT) {
1052 }
1053 ast_mutex_unlock(&td->lock);
1054
1055 /* Give the total tasks executed, and tell each task to not requeue */
1056 ast_cli(a->fd, "Total tasks executed in 30 seconds: %d (%d per second)\n", num_tasks_executed, num_tasks_executed / 30);
1057 shutdown = 1;
1058
1059end:
1060 /* We need to unreference each serializer */
1061 for (i = 0; i < 200; i++) {
1064 }
1066 test_destroy(td);
1067 return CLI_SUCCESS;
1068}
1069
1070static struct ast_cli_entry cli[] = {
1071 AST_CLI_DEFINE(handle_cli_taskpool_push_efficiency, "Push tasks to a taskpool and measure efficiency"),
1072 AST_CLI_DEFINE(handle_cli_taskpool_push_serializer_efficiency, "Push tasks to a taskpool in serializers and measure efficiency"),
1073};
1074
1075static int unload_module(void)
1076{
1078 AST_TEST_UNREGISTER(taskpool_push);
1079 AST_TEST_UNREGISTER(taskpool_push_synchronous);
1080 AST_TEST_UNREGISTER(taskpool_push_serializer);
1081 AST_TEST_UNREGISTER(taskpool_push_serializer_synchronous);
1082 AST_TEST_UNREGISTER(taskpool_push_serializer_synchronous_requeue);
1083 AST_TEST_UNREGISTER(taskpool_push_grow);
1084 AST_TEST_UNREGISTER(taskpool_push_shrink);
1085 AST_TEST_UNREGISTER(taskpool_serializer_suspension);
1086 AST_TEST_UNREGISTER(taskpool_serializer_multiple_suspension);
1087 AST_TEST_UNREGISTER(taskpool_serializer_push_wait_while_suspended_from_other_serializer);
1088 return 0;
1089}
1090
1091static int load_module(void)
1092{
1094 AST_TEST_REGISTER(taskpool_push);
1095 AST_TEST_REGISTER(taskpool_push_synchronous);
1096 AST_TEST_REGISTER(taskpool_push_serializer);
1097 AST_TEST_REGISTER(taskpool_push_serializer_synchronous);
1098 AST_TEST_REGISTER(taskpool_push_serializer_synchronous_requeue);
1099 AST_TEST_REGISTER(taskpool_push_grow);
1100 AST_TEST_REGISTER(taskpool_push_shrink);
1101 AST_TEST_REGISTER(taskpool_serializer_suspension);
1102 AST_TEST_REGISTER(taskpool_serializer_multiple_suspension);
1103 AST_TEST_REGISTER(taskpool_serializer_push_wait_while_suspended_from_other_serializer);
1105}
1106
void ast_cli_unregister_multiple(void)
Definition ael_main.c:408
ast_mutex_t lock
Definition app_sla.c:337
Asterisk main include file. File version handling, generic pbx functions.
#define ast_free(a)
Definition astmm.h:180
#define ast_calloc(num, len)
A wrapper for calloc()
Definition astmm.h:202
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition astobj2.h:459
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition astobj2.h:480
Standard Command Line Interface.
#define CLI_SUCCESS
Definition cli.h:44
#define AST_CLI_DEFINE(fn, txt,...)
Definition cli.h:197
void ast_cli(int fd, const char *fmt,...)
Definition clicompat.c:6
@ CLI_INIT
Definition cli.h:152
@ CLI_GENERATE
Definition cli.h:153
#define ast_cli_register_multiple(e, len)
Register multiple commands.
Definition cli.h:265
char * end
Definition eagi_proxy.c:73
Support for logging to various files, console and syslog Configuration in file logger....
Asterisk locking-related definitions:
#define ast_cond_destroy(cond)
Definition lock.h:209
#define ast_cond_init(cond, attr)
Definition lock.h:208
#define ast_cond_timedwait(cond, mutex, time)
Definition lock.h:213
#define ast_mutex_init(pmutex)
Definition lock.h:193
#define ast_mutex_unlock(a)
Definition lock.h:197
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition lock.h:764
pthread_cond_t ast_cond_t
Definition lock.h:185
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
Definition lock.h:596
#define ast_mutex_destroy(a)
Definition lock.h:195
#define ast_mutex_lock(a)
Definition lock.h:196
#define ast_cond_signal(cond)
Definition lock.h:210
Asterisk module definitions.
#define AST_MODULE_INFO_STANDARD(keystr, desc)
Definition module.h:581
#define ASTERISK_GPL_KEY
The text the key() function should return.
Definition module.h:46
@ AST_MODULE_LOAD_SUCCESS
Definition module.h:70
#define NULL
Definition resample.c:96
descriptor for a cli entry.
Definition cli.h:171
char * command
Definition cli.h:186
const char * usage
Definition cli.h:177
Structure for mutex and tracking information.
Definition lock.h:142
An opaque taskpool structure.
Definition taskpool.c:62
A ast_taskprocessor structure is a singleton by name.
struct ast_taskpool * pool
struct ast_taskprocessor * serializer[2]
Sorcery object created based on backend data.
struct ast_taskprocessor * taskprocessor
void * test_specific_data
int ast_taskpool_serializer_unsuspend(struct ast_taskprocessor *serializer)
Unsuspend a serializer, causing tasks to be executed.
Definition taskpool.c:1050
#define AST_TASKPOOL_OPTIONS_VERSION
Definition taskpool.h:76
#define ast_taskpool_push_wait(pool, task, data)
Definition taskpool.h:230
size_t ast_taskpool_taskprocessors_count(struct ast_taskpool *pool)
Get the current number of taskprocessors in the taskpool.
Definition taskpool.c:451
int ast_taskpool_serializer_suspend(struct ast_taskprocessor *serializer)
Suspend a serializer, causing tasks to be queued until unsuspended.
Definition taskpool.c:1001
struct ast_taskprocessor * ast_taskpool_serializer(const char *name, struct ast_taskpool *pool)
Serialized execution of tasks within a ast_taskpool.
Definition taskpool.c:860
void ast_taskpool_shutdown(struct ast_taskpool *pool)
Shut down a taskpool and remove the underlying taskprocessors.
Definition taskpool.c:675
struct ast_taskprocessor * ast_taskpool_serializer_get_current(void)
Get the taskpool serializer currently associated with this thread.
Definition taskpool.c:825
#define ast_taskpool_serializer_push_wait(pool, task, data)
Definition taskpool.h:335
struct ast_taskpool * ast_taskpool_create(const char *name, const struct ast_taskpool_options *options)
Create a new taskpool.
Definition taskpool.c:324
#define ast_taskpool_push(pool, task, data)
Definition taskpool.h:210
An API for managing task processing threads that can be shared across modules.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
#define ast_taskprocessor_push(tps, task_exe, datap)
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format,...)
Build a taskprocessor name with a sequence number on the end.
#define AST_TASKPROCESSOR_MAX_NAME
Suggested maximum taskprocessor name length (less null terminator).
Test Framework API.
@ TEST_INIT
Definition test.h:200
@ TEST_EXECUTE
Definition test.h:201
#define AST_TEST_REGISTER(cb)
Definition test.h:127
#define ast_test_status_update(a, b, c...)
Definition test.h:129
#define AST_TEST_UNREGISTER(cb)
Definition test.h:128
#define AST_TEST_DEFINE(hdr)
Definition test.h:126
ast_test_result_state
Definition test.h:193
@ AST_TEST_PASS
Definition test.h:195
@ AST_TEST_FAIL
Definition test.h:196
@ AST_TEST_NOT_RUN
Definition test.h:194
static struct test_options options
static struct test_val a
static struct test_data * test_alloc(void)
static char * handle_cli_taskpool_push_efficiency(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static int cascade_task(void *data)
static int requeue_task(void *data)
static int serializer_efficiency_task(void *data)
static void test_destroy(struct test_data *td)
static struct ast_cli_entry cli[]
static int load_module(void)
static char * handle_cli_taskpool_push_serializer_efficiency(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static int unload_module(void)
static int simple_task(void *data)
static int efficiency_task(void *data)
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition time.h:159
#define ARRAY_LEN(a)
Definition utils.h:706