Asterisk - The Open Source Telephony Project GIT-master-d856a3e
test_threadpool.c
Go to the documentation of this file.
1/*
2 * Asterisk -- An open source telephony toolkit.
3 *
4 * Copyright (C) 2012-2013, Digium, Inc.
5 *
6 * Mark Michelson <mmichelson@digium.com>
7 *
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
13 *
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
17 */
18
19/*!
20 * \file
21 * \brief threadpool unit tests
22 *
23 * \author Mark Michelson <mmichelson@digium.com>
24 *
25 */
26
27/*** MODULEINFO
28 <depend>TEST_FRAMEWORK</depend>
29 <support_level>core</support_level>
30 ***/
31
32#include "asterisk.h"
33
34#include "asterisk/astobj2.h"
35#include "asterisk/lock.h"
36#include "asterisk/logger.h"
37#include "asterisk/module.h"
39#include "asterisk/test.h"
40#include "asterisk/threadpool.h"
41
51};
52
53static struct test_listener_data *test_alloc(void)
54{
55 struct test_listener_data *tld = ast_calloc(1, sizeof(*tld));
56 if (!tld) {
57 return NULL;
58 }
59 ast_mutex_init(&tld->lock);
60 ast_cond_init(&tld->cond, NULL);
61 return tld;
62}
63
64static void test_state_changed(struct ast_threadpool *pool,
66 int active_threads,
67 int idle_threads)
68{
70 SCOPED_MUTEX(lock, &tld->lock);
71 tld->num_active = active_threads;
72 tld->num_idle = idle_threads;
73 ast_log(LOG_NOTICE, "Thread state: %d active, %d idle\n", tld->num_active, tld->num_idle);
74 ast_cond_signal(&tld->cond);
75}
76
77static void test_task_pushed(struct ast_threadpool *pool,
79 int was_empty)
80{
82 SCOPED_MUTEX(lock, &tld->lock);
83 tld->task_pushed = 1;
84 ++tld->num_tasks;
85 tld->was_empty = was_empty;
86 ast_cond_signal(&tld->cond);
87}
88
89static void test_emptied(struct ast_threadpool *pool,
91{
93 SCOPED_MUTEX(lock, &tld->lock);
94 tld->empty_notice = 1;
95 ast_cond_signal(&tld->cond);
96}
97
99{
101 ast_cond_destroy(&tld->cond);
102 ast_mutex_destroy(&tld->lock);
103}
104
107 .task_pushed = test_task_pushed,
108 .emptied = test_emptied,
109 .shutdown = test_shutdown,
110};
111
116};
117
119{
120 struct simple_task_data *std = ast_calloc(1, sizeof(*std));
121
122 if (!std) {
123 return NULL;
124 }
125 ast_mutex_init(&std->lock);
126 ast_cond_init(&std->cond, NULL);
127 return std;
128}
129
131{
132 if (!std) {
133 return;
134 }
135
136 ast_mutex_destroy(&std->lock);
137 ast_cond_destroy(&std->cond);
138
139 ast_free(std);
140}
141
142static int simple_task(void *data)
143{
144 struct simple_task_data *std = data;
145 SCOPED_MUTEX(lock, &std->lock);
146 std->task_executed = 1;
147 ast_cond_signal(&std->cond);
148 return 0;
149}
150
151static enum ast_test_result_state wait_until_thread_state(struct ast_test *test, struct test_listener_data *tld, int num_active, int num_idle)
152{
153 struct timeval start = ast_tvnow();
154 struct timespec end = {
155 .tv_sec = start.tv_sec + 5,
156 .tv_nsec = start.tv_usec * 1000
157 };
159 SCOPED_MUTEX(lock, &tld->lock);
160
161 while (!(tld->num_active == num_active && tld->num_idle == num_idle)) {
162 if (ast_cond_timedwait(&tld->cond, &tld->lock, &end) == ETIMEDOUT) {
163 break;
164 }
165 }
166
167 if (tld->num_active != num_active && tld->num_idle != num_idle) {
168 ast_test_status_update(test, "Number of active threads and idle threads not what was expected.\n");
169 ast_test_status_update(test, "Expected %d active threads but got %d\n", num_active, tld->num_active);
170 ast_test_status_update(test, "Expected %d idle threads but got %d\n", num_idle, tld->num_idle);
171 res = AST_TEST_FAIL;
172 }
173
174 return res;
175}
176
178{
180 struct timeval start = ast_tvnow();
181 struct timespec end = {
182 .tv_sec = start.tv_sec + 5,
183 .tv_nsec = start.tv_usec * 1000
184 };
185 SCOPED_MUTEX(lock, &tld->lock);
186
187 while (!tld->task_pushed) {
188 if (ast_cond_timedwait(&tld->cond, lock, &end) == ETIMEDOUT) {
189 break;
190 }
191 }
192}
193
194static enum ast_test_result_state wait_for_completion(struct ast_test *test, struct simple_task_data *std)
195{
196 struct timeval start = ast_tvnow();
197 struct timespec end = {
198 .tv_sec = start.tv_sec + 5,
199 .tv_nsec = start.tv_usec * 1000
200 };
202 SCOPED_MUTEX(lock, &std->lock);
203
204 while (!std->task_executed) {
205 if (ast_cond_timedwait(&std->cond, lock, &end) == ETIMEDOUT) {
206 break;
207 }
208 }
209
210 if (!std->task_executed) {
211 ast_test_status_update(test, "Task execution did not occur\n");
212 res = AST_TEST_FAIL;
213 }
214 return res;
215}
216
217static enum ast_test_result_state wait_for_empty_notice(struct ast_test *test, struct test_listener_data *tld)
218{
219 struct timeval start = ast_tvnow();
220 struct timespec end = {
221 .tv_sec = start.tv_sec + 5,
222 .tv_nsec = start.tv_usec * 1000
223 };
225 SCOPED_MUTEX(lock, &tld->lock);
226
227 while (!tld->empty_notice) {
228 if (ast_cond_timedwait(&tld->cond, lock, &end) == ETIMEDOUT) {
229 break;
230 }
231 }
232
233 if (!tld->empty_notice) {
234 ast_test_status_update(test, "Test listener not notified that threadpool is empty\n");
235 res = AST_TEST_FAIL;
236 }
237
238 return res;
239}
240
242 struct ast_test *test,
244 int task_pushed,
245 int was_empty,
246 int num_tasks,
247 int num_active,
248 int num_idle,
249 int empty_notice)
250{
253
254 if (tld->task_pushed != task_pushed) {
255 ast_test_status_update(test, "Expected task %sto be pushed, but it was%s\n",
256 task_pushed ? "" : "not ", tld->task_pushed ? "" : " not");
257 res = AST_TEST_FAIL;
258 }
259 if (tld->was_empty != was_empty) {
260 ast_test_status_update(test, "Expected %sto be empty, but it was%s\n",
261 was_empty ? "" : "not ", tld->was_empty ? "" : " not");
262 res = AST_TEST_FAIL;
263 }
264 if (tld->num_tasks!= num_tasks) {
265 ast_test_status_update(test, "Expected %d tasks to be pushed, but got %d\n",
266 num_tasks, tld->num_tasks);
267 res = AST_TEST_FAIL;
268 }
269 if (tld->num_active != num_active) {
270 ast_test_status_update(test, "Expected %d active threads, but got %d\n",
271 num_active, tld->num_active);
272 res = AST_TEST_FAIL;
273 }
274 if (tld->num_idle != num_idle) {
275 ast_test_status_update(test, "Expected %d idle threads, but got %d\n",
276 num_idle, tld->num_idle);
277 res = AST_TEST_FAIL;
278 }
279 if (tld->empty_notice != empty_notice) {
280 ast_test_status_update(test, "Expected %s empty notice, but got %s\n",
281 was_empty ? "an" : "no", tld->task_pushed ? "one" : "none");
282 res = AST_TEST_FAIL;
283 }
284
285 return res;
286}
287
288AST_TEST_DEFINE(threadpool_push)
289{
290 struct ast_threadpool *pool = NULL;
292 struct simple_task_data *std = NULL;
293 struct test_listener_data *tld = NULL;
297 .idle_timeout = 0,
298 .auto_increment = 0,
299 .initial_size = 0,
300 .max_size = 0,
301 };
302
303 switch (cmd) {
304 case TEST_INIT:
305 info->name = "push";
306 info->category = "/main/threadpool/";
307 info->summary = "Test task";
308 info->description =
309 "Basic threadpool test";
310 return AST_TEST_NOT_RUN;
311 case TEST_EXECUTE:
312 break;
313 }
314 tld = test_alloc();
315 if (!tld) {
316 return AST_TEST_FAIL;
317 }
318
320 if (!listener) {
321 goto end;
322 }
323
325 if (!pool) {
326 goto end;
327 }
328
330 if (!std) {
331 goto end;
332 }
333
334 if (ast_threadpool_push(pool, simple_task, std)) {
335 goto end;
336 }
337
339
340 res = listener_check(test, listener, 1, 1, 1, 0, 0, 0);
341
342end:
346 ast_free(tld);
347 return res;
348}
349
350AST_TEST_DEFINE(threadpool_initial_threads)
351{
352 struct ast_threadpool *pool = NULL;
355 struct test_listener_data *tld = NULL;
358 .idle_timeout = 0,
359 .auto_increment = 0,
360 .initial_size = 3,
361 .max_size = 0,
362 };
363
364 switch (cmd) {
365 case TEST_INIT:
366 info->name = "initial_threads";
367 info->category = "/main/threadpool/";
368 info->summary = "Test threadpool initialization state";
369 info->description =
370 "Ensure that a threadpool created with a specific size contains the\n"
371 "proper number of idle threads.";
372 return AST_TEST_NOT_RUN;
373 case TEST_EXECUTE:
374 break;
375 }
376
377 tld = test_alloc();
378 if (!tld) {
379 return AST_TEST_FAIL;
380 }
381
383 if (!listener) {
384 goto end;
385 }
386
388 if (!pool) {
389 goto end;
390 }
391
392 res = wait_until_thread_state(test, tld, 0, 3);
393
394end:
397 ast_free(tld);
398 return res;
399}
400
401
402AST_TEST_DEFINE(threadpool_thread_creation)
403{
404 struct ast_threadpool *pool = NULL;
407 struct test_listener_data *tld = NULL;
410 .idle_timeout = 0,
411 .auto_increment = 0,
412 .initial_size = 0,
413 .max_size = 0,
414 };
415
416 switch (cmd) {
417 case TEST_INIT:
418 info->name = "thread_creation";
419 info->category = "/main/threadpool/";
420 info->summary = "Test threadpool thread creation";
421 info->description =
422 "Ensure that threads can be added to a threadpool";
423 return AST_TEST_NOT_RUN;
424 case TEST_EXECUTE:
425 break;
426 }
427
428 tld = test_alloc();
429 if (!tld) {
430 return AST_TEST_FAIL;
431 }
432
434 if (!listener) {
435 goto end;
436 }
437
439 if (!pool) {
440 goto end;
441 }
442
443 /* Now let's create a thread. It should start active, then go
444 * idle immediately
445 */
447
448 res = wait_until_thread_state(test, tld, 0, 1);
449
450end:
453 ast_free(tld);
454 return res;
455}
456
457AST_TEST_DEFINE(threadpool_thread_destruction)
458{
459 struct ast_threadpool *pool = NULL;
462 struct test_listener_data *tld = NULL;
465 .idle_timeout = 0,
466 .auto_increment = 0,
467 .initial_size = 0,
468 .max_size = 0,
469 };
470
471 switch (cmd) {
472 case TEST_INIT:
473 info->name = "thread_destruction";
474 info->category = "/main/threadpool/";
475 info->summary = "Test threadpool thread destruction";
476 info->description =
477 "Ensure that threads are properly destroyed in a threadpool";
478 return AST_TEST_NOT_RUN;
479 case TEST_EXECUTE:
480 break;
481 }
482
483 tld = test_alloc();
484 if (!tld) {
485 return AST_TEST_FAIL;
486 }
487
489 if (!listener) {
490 goto end;
491 }
492
494 if (!pool) {
495 goto end;
496 }
497
499
500 res = wait_until_thread_state(test, tld, 0, 3);
501 if (res == AST_TEST_FAIL) {
502 goto end;
503 }
504
505 res = listener_check(test, listener, 0, 0, 0, 0, 3, 0);
506 if (res == AST_TEST_FAIL) {
507 goto end;
508 }
509
511
512 res = wait_until_thread_state(test, tld, 0, 2);
513
514end:
517 ast_free(tld);
518 return res;
519}
520
521AST_TEST_DEFINE(threadpool_thread_timeout)
522{
523 struct ast_threadpool *pool = NULL;
526 struct test_listener_data *tld = NULL;
529 .idle_timeout = 2,
530 .auto_increment = 0,
531 .initial_size = 0,
532 .max_size = 0,
533 };
534
535 switch (cmd) {
536 case TEST_INIT:
537 info->name = "thread_timeout";
538 info->category = "/main/threadpool/";
539 info->summary = "Test threadpool thread timeout";
540 info->description =
541 "Ensure that a thread with a two second timeout dies as expected.";
542 return AST_TEST_NOT_RUN;
543 case TEST_EXECUTE:
544 break;
545 }
546
547 tld = test_alloc();
548 if (!tld) {
549 return AST_TEST_FAIL;
550 }
551
553 if (!listener) {
554 goto end;
555 }
556
558 if (!pool) {
559 goto end;
560 }
561
563
564 res = wait_until_thread_state(test, tld, 0, 1);
565 if (res == AST_TEST_FAIL) {
566 goto end;
567 }
568
569 res = listener_check(test, listener, 0, 0, 0, 0, 1, 0);
570 if (res == AST_TEST_FAIL) {
571 goto end;
572 }
573
574 res = wait_until_thread_state(test, tld, 0, 0);
575 if (res == AST_TEST_FAIL) {
576 goto end;
577 }
578
579 res = listener_check(test, listener, 0, 0, 0, 0, 0, 0);
580
581end:
584 ast_free(tld);
585 return res;
586}
587
588AST_TEST_DEFINE(threadpool_thread_timeout_thrash)
589{
590 struct ast_threadpool *pool = NULL;
593 struct test_listener_data *tld = NULL;
596 .idle_timeout = 1,
597 .auto_increment = 1,
598 .initial_size = 0,
599 .max_size = 1,
600 };
601 int iteration;
602
603 switch (cmd) {
604 case TEST_INIT:
605 info->name = "thread_timeout_thrash";
606 info->category = "/main/threadpool/";
607 info->summary = "Thrash threadpool thread timeout";
608 info->description =
609 "Repeatedly queue a task when a threadpool thread should timeout.";
610 return AST_TEST_NOT_RUN;
611 case TEST_EXECUTE:
612 break;
613 }
614
615 tld = test_alloc();
616 if (!tld) {
617 return AST_TEST_FAIL;
618 }
619
621 if (!listener) {
622 goto end;
623 }
624
626 if (!pool) {
627 goto end;
628 }
629
631
632 for (iteration = 0; iteration < 30; ++iteration) {
633 struct simple_task_data *std = NULL;
634 struct timeval start = ast_tvnow();
635 struct timespec end = {
636 .tv_sec = start.tv_sec + options.idle_timeout,
637 .tv_nsec = start.tv_usec * 1000
638 };
639
641 if (!std) {
642 goto end;
643 }
644
645 /* Wait until the threadpool thread should timeout due to being idle */
646 ast_mutex_lock(&tld->lock);
647 while (ast_cond_timedwait(&tld->cond, &tld->lock, &end) != ETIMEDOUT) {
648 /* This purposely left empty as we want to loop waiting for a time out */
649 }
650 ast_mutex_unlock(&tld->lock);
651
652 if (ast_threadpool_push(pool, simple_task, std)) {
653 res = AST_TEST_FAIL;
654 } else {
655 res = wait_for_completion(test, std);
656 }
657
659
660 if (res == AST_TEST_FAIL) {
661 goto end;
662 }
663 }
664
665 res = wait_until_thread_state(test, tld, 0, 0);
666 if (res == AST_TEST_FAIL) {
667 goto end;
668 }
669
670 res = listener_check(test, listener, 1, 1, 30, 0, 0, 1);
671
672end:
675 ast_free(tld);
676 return res;
677}
678
679AST_TEST_DEFINE(threadpool_one_task_one_thread)
680{
681 struct ast_threadpool *pool = NULL;
683 struct simple_task_data *std = NULL;
685 struct test_listener_data *tld = NULL;
688 .idle_timeout = 0,
689 .auto_increment = 0,
690 .initial_size = 0,
691 .max_size = 0,
692 };
693
694 switch (cmd) {
695 case TEST_INIT:
696 info->name = "one_task_one_thread";
697 info->category = "/main/threadpool/";
698 info->summary = "Test a single task with a single thread";
699 info->description =
700 "Push a task into an empty threadpool, then add a thread to the pool.";
701 return AST_TEST_NOT_RUN;
702 case TEST_EXECUTE:
703 break;
704 }
705
706 tld = test_alloc();
707 if (!tld) {
708 return AST_TEST_FAIL;
709 }
710
712 if (!listener) {
713 goto end;
714 }
715
717 if (!pool) {
718 goto end;
719 }
720
722 if (!std) {
723 goto end;
724 }
725
726 if (ast_threadpool_push(pool, simple_task, std)) {
727 goto end;
728 }
729
731
732 /* Threads added to the pool are active when they start,
733 * so the newly-created thread should immediately execute
734 * the waiting task.
735 */
736 res = wait_for_completion(test, std);
737 if (res == AST_TEST_FAIL) {
738 goto end;
739 }
740
741 res = wait_for_empty_notice(test, tld);
742 if (res == AST_TEST_FAIL) {
743 goto end;
744 }
745
746 /* After completing the task, the thread should go idle */
747 res = wait_until_thread_state(test, tld, 0, 1);
748 if (res == AST_TEST_FAIL) {
749 goto end;
750 }
751
752 res = listener_check(test, listener, 1, 1, 1, 0, 1, 1);
753
754end:
758 ast_free(tld);
759 return res;
760
761}
762
763AST_TEST_DEFINE(threadpool_one_thread_one_task)
764{
765 struct ast_threadpool *pool = NULL;
767 struct simple_task_data *std = NULL;
769 struct test_listener_data *tld = NULL;
772 .idle_timeout = 0,
773 .auto_increment = 0,
774 .initial_size = 0,
775 .max_size = 0,
776 };
777
778 switch (cmd) {
779 case TEST_INIT:
780 info->name = "one_thread_one_task";
781 info->category = "/main/threadpool/";
782 info->summary = "Test a single thread with a single task";
783 info->description =
784 "Add a thread to the pool and then push a task to it.";
785 return AST_TEST_NOT_RUN;
786 case TEST_EXECUTE:
787 break;
788 }
789
790 tld = test_alloc();
791 if (!tld) {
792 return AST_TEST_FAIL;
793 }
794
796 if (!listener) {
797 goto end;
798 }
799
801 if (!pool) {
802 goto end;
803 }
804
806 if (!std) {
807 goto end;
808 }
809
811
812 res = wait_until_thread_state(test, tld, 0, 1);
813 if (res == AST_TEST_FAIL) {
814 goto end;
815 }
816
817 if (ast_threadpool_push(pool, simple_task, std)) {
818 res = AST_TEST_FAIL;
819 goto end;
820 }
821
822 res = wait_for_completion(test, std);
823 if (res == AST_TEST_FAIL) {
824 goto end;
825 }
826
827 res = wait_for_empty_notice(test, tld);
828 if (res == AST_TEST_FAIL) {
829 goto end;
830 }
831
832 /* After completing the task, the thread should go idle */
833 res = wait_until_thread_state(test, tld, 0, 1);
834 if (res == AST_TEST_FAIL) {
835 goto end;
836 }
837
838 res = listener_check(test, listener, 1, 1, 1, 0, 1, 1);
839
840end:
844 ast_free(tld);
845 return res;
846}
847
848AST_TEST_DEFINE(threadpool_one_thread_multiple_tasks)
849{
850 struct ast_threadpool *pool = NULL;
852 struct simple_task_data *std1 = NULL;
853 struct simple_task_data *std2 = NULL;
854 struct simple_task_data *std3 = NULL;
856 struct test_listener_data *tld = NULL;
859 .idle_timeout = 0,
860 .auto_increment = 0,
861 .initial_size = 0,
862 .max_size = 0,
863 };
864
865 switch (cmd) {
866 case TEST_INIT:
867 info->name = "one_thread_multiple_tasks";
868 info->category = "/main/threadpool/";
869 info->summary = "Test a single thread with multiple tasks";
870 info->description =
871 "Add a thread to the pool and then push three tasks to it.";
872 return AST_TEST_NOT_RUN;
873 case TEST_EXECUTE:
874 break;
875 }
876
877 tld = test_alloc();
878 if (!tld) {
879 return AST_TEST_FAIL;
880 }
881
883 if (!listener) {
884 goto end;
885 }
886
888 if (!pool) {
889 goto end;
890 }
891
892 std1 = simple_task_data_alloc();
893 std2 = simple_task_data_alloc();
894 std3 = simple_task_data_alloc();
895 if (!std1 || !std2 || !std3) {
896 goto end;
897 }
898
900
901 res = wait_until_thread_state(test, tld, 0, 1);
902 if (res == AST_TEST_FAIL) {
903 goto end;
904 }
905
906 res = AST_TEST_FAIL;
907 if (ast_threadpool_push(pool, simple_task, std1)) {
908 goto end;
909 }
910
911 if (ast_threadpool_push(pool, simple_task, std2)) {
912 goto end;
913 }
914
915 if (ast_threadpool_push(pool, simple_task, std3)) {
916 goto end;
917 }
918
919 res = wait_for_completion(test, std1);
920 if (res == AST_TEST_FAIL) {
921 goto end;
922 }
923 res = wait_for_completion(test, std2);
924 if (res == AST_TEST_FAIL) {
925 goto end;
926 }
927 res = wait_for_completion(test, std3);
928 if (res == AST_TEST_FAIL) {
929 goto end;
930 }
931
932 res = wait_for_empty_notice(test, tld);
933 if (res == AST_TEST_FAIL) {
934 goto end;
935 }
936
937 res = wait_until_thread_state(test, tld, 0, 1);
938 if (res == AST_TEST_FAIL) {
939 goto end;
940 }
941
942 res = listener_check(test, listener, 1, 0, 3, 0, 1, 1);
943
944end:
950 ast_free(tld);
951 return res;
952}
953
955 struct test_listener_data *tld, int num_active, int num_idle, int num_tasks)
956{
958 struct timeval start;
959 struct timespec end;
960
961 res = wait_until_thread_state(test, tld, num_active, num_idle);
962 if (res == AST_TEST_FAIL) {
963 return res;
964 }
965
966 start = ast_tvnow();
967 end.tv_sec = start.tv_sec + 5;
968 end.tv_nsec = start.tv_usec * 1000;
969
970 ast_mutex_lock(&tld->lock);
971
972 while (tld->num_tasks != num_tasks) {
973 if (ast_cond_timedwait(&tld->cond, &tld->lock, &end) == ETIMEDOUT) {
974 break;
975 }
976 }
977
978 if (tld->num_tasks != num_tasks) {
979 ast_test_status_update(test, "Number of tasks pushed %d does not match expected %d\n",
980 tld->num_tasks, num_tasks);
981 res = AST_TEST_FAIL;
982 }
983
984 ast_mutex_unlock(&tld->lock);
985
986 return res;
987}
988
989AST_TEST_DEFINE(threadpool_auto_increment)
990{
991 struct ast_threadpool *pool = NULL;
993 struct simple_task_data *std1 = NULL;
994 struct simple_task_data *std2 = NULL;
995 struct simple_task_data *std3 = NULL;
996 struct simple_task_data *std4 = NULL;
998 struct test_listener_data *tld = NULL;
1001 .idle_timeout = 0,
1002 .auto_increment = 3,
1003 .initial_size = 0,
1004 .max_size = 0,
1005 };
1006
1007 switch (cmd) {
1008 case TEST_INIT:
1009 info->name = "auto_increment";
1010 info->category = "/main/threadpool/";
1011 info->summary = "Test that the threadpool grows as tasks are added";
1012 info->description =
1013 "Create an empty threadpool and push a task to it. Once the task is\n"
1014 "pushed, the threadpool should add three threads and be able to\n"
1015 "handle the task. The threads should then go idle";
1016 return AST_TEST_NOT_RUN;
1017 case TEST_EXECUTE:
1018 break;
1019 }
1020
1021 tld = test_alloc();
1022 if (!tld) {
1023 return AST_TEST_FAIL;
1024 }
1025
1027 if (!listener) {
1028 goto end;
1029 }
1030
1031 pool = ast_threadpool_create(info->name, listener, &options);
1032 if (!pool) {
1033 goto end;
1034 }
1035
1036 std1 = simple_task_data_alloc();
1037 std2 = simple_task_data_alloc();
1038 std3 = simple_task_data_alloc();
1039 std4 = simple_task_data_alloc();
1040 if (!std1 || !std2 || !std3 || !std4) {
1041 goto end;
1042 }
1043
1044 if (ast_threadpool_push(pool, simple_task, std1)) {
1045 goto end;
1046 }
1047
1048 /* Pushing the task should result in the threadpool growing
1049 * by three threads. This will allow the task to actually execute
1050 */
1051 res = wait_for_completion(test, std1);
1052 if (res == AST_TEST_FAIL) {
1053 goto end;
1054 }
1055
1056 res = wait_for_empty_notice(test, tld);
1057 if (res == AST_TEST_FAIL) {
1058 goto end;
1059 }
1060
1061 res = wait_until_thread_state(test, tld, 0, 3);
1062 if (res == AST_TEST_FAIL) {
1063 goto end;
1064 }
1065
1066 /* Now push three tasks into the pool and ensure the pool does not
1067 * grow.
1068 */
1069 res = AST_TEST_FAIL;
1070
1071 if (ast_threadpool_push(pool, simple_task, std2)) {
1072 goto end;
1073 }
1074
1075 if (ast_threadpool_push(pool, simple_task, std3)) {
1076 goto end;
1077 }
1078
1079 if (ast_threadpool_push(pool, simple_task, std4)) {
1080 goto end;
1081 }
1082
1083 res = wait_for_completion(test, std2);
1084 if (res == AST_TEST_FAIL) {
1085 goto end;
1086 }
1087 res = wait_for_completion(test, std3);
1088 if (res == AST_TEST_FAIL) {
1089 goto end;
1090 }
1091 res = wait_for_completion(test, std4);
1092 if (res == AST_TEST_FAIL) {
1093 goto end;
1094 }
1095
1096 res = wait_for_empty_notice(test, tld);
1097 if (res == AST_TEST_FAIL) {
1098 goto end;
1099 }
1100
1101 res = wait_until_thread_state_task_pushed(test, tld, 0, 3, 4);
1102 if (res == AST_TEST_FAIL) {
1103 goto end;
1104 }
1105
1106end:
1113 ast_free(tld);
1114 return res;
1115}
1116
1117AST_TEST_DEFINE(threadpool_max_size)
1118{
1119 struct ast_threadpool *pool = NULL;
1121 struct simple_task_data *std = NULL;
1123 struct test_listener_data *tld = NULL;
1126 .idle_timeout = 0,
1127 .auto_increment = 3,
1128 .initial_size = 0,
1129 .max_size = 2,
1130 };
1131
1132 switch (cmd) {
1133 case TEST_INIT:
1134 info->name = "max_size";
1135 info->category = "/main/threadpool/";
1136 info->summary = "Test that the threadpool does not exceed its maximum size restriction";
1137 info->description =
1138 "Create an empty threadpool and push a task to it. Once the task is\n"
1139 "pushed, the threadpool should attempt to grow by three threads, but the\n"
1140 "pool's restrictions should only allow two threads to be added.";
1141 return AST_TEST_NOT_RUN;
1142 case TEST_EXECUTE:
1143 break;
1144 }
1145
1146 tld = test_alloc();
1147 if (!tld) {
1148 return AST_TEST_FAIL;
1149 }
1150
1152 if (!listener) {
1153 goto end;
1154 }
1155
1156 pool = ast_threadpool_create(info->name, listener, &options);
1157 if (!pool) {
1158 goto end;
1159 }
1160
1161 std = simple_task_data_alloc();
1162 if (!std) {
1163 goto end;
1164 }
1165
1166 if (ast_threadpool_push(pool, simple_task, std)) {
1167 goto end;
1168 }
1169
1170 res = wait_for_completion(test, std);
1171 if (res == AST_TEST_FAIL) {
1172 goto end;
1173 }
1174
1175 res = wait_until_thread_state(test, tld, 0, 2);
1176 if (res == AST_TEST_FAIL) {
1177 goto end;
1178 }
1179
1180 res = listener_check(test, listener, 1, 1, 1, 0, 2, 1);
1181end:
1185 ast_free(tld);
1186 return res;
1187}
1188
1189AST_TEST_DEFINE(threadpool_reactivation)
1190{
1191 struct ast_threadpool *pool = NULL;
1193 struct simple_task_data *std1 = NULL;
1194 struct simple_task_data *std2 = NULL;
1196 struct test_listener_data *tld = NULL;
1199 .idle_timeout = 0,
1200 .auto_increment = 0,
1201 .initial_size = 0,
1202 .max_size = 0,
1203 };
1204
1205 switch (cmd) {
1206 case TEST_INIT:
1207 info->name = "reactivation";
1208 info->category = "/main/threadpool/";
1209 info->summary = "Test that a threadpool reactivates when work is added";
1210 info->description =
1211 "Push a task into a threadpool. Make sure the task executes and the\n"
1212 "thread goes idle. Then push a second task and ensure that the thread\n"
1213 "awakens and executes the second task.";
1214 return AST_TEST_NOT_RUN;
1215 case TEST_EXECUTE:
1216 break;
1217 }
1218
1219 tld = test_alloc();
1220 if (!tld) {
1221 return AST_TEST_FAIL;
1222 }
1223
1225 if (!listener) {
1226 goto end;
1227 }
1228
1229 pool = ast_threadpool_create(info->name, listener, &options);
1230 if (!pool) {
1231 goto end;
1232 }
1233
1234 std1 = simple_task_data_alloc();
1235 std2 = simple_task_data_alloc();
1236 if (!std1 || !std2) {
1237 goto end;
1238 }
1239
1240 if (ast_threadpool_push(pool, simple_task, std1)) {
1241 goto end;
1242 }
1243
1244 ast_threadpool_set_size(pool, 1);
1245
1246 res = wait_for_completion(test, std1);
1247 if (res == AST_TEST_FAIL) {
1248 goto end;
1249 }
1250
1251 res = wait_for_empty_notice(test, tld);
1252 if (res == AST_TEST_FAIL) {
1253 goto end;
1254 }
1255
1256 res = wait_until_thread_state(test, tld, 0, 1);
1257 if (res == AST_TEST_FAIL) {
1258 goto end;
1259 }
1260
1261 res = listener_check(test, listener, 1, 1, 1, 0, 1, 1);
1262 if (res == AST_TEST_FAIL) {
1263 goto end;
1264 }
1265
1266 /* Now make sure the threadpool reactivates when we add a second task */
1267 if (ast_threadpool_push(pool, simple_task, std2)) {
1268 res = AST_TEST_FAIL;
1269 goto end;
1270 }
1271
1272 res = wait_for_completion(test, std2);
1273 if (res == AST_TEST_FAIL) {
1274 goto end;
1275 }
1276
1277 res = wait_for_empty_notice(test, tld);
1278 if (res == AST_TEST_FAIL) {
1279 goto end;
1280 }
1281
1282 res = wait_until_thread_state(test, tld, 0, 1);
1283 if (res == AST_TEST_FAIL) {
1284 goto end;
1285 }
1286
1287 res = listener_check(test, listener, 1, 1, 2, 0, 1, 1);
1288
1289end:
1294 ast_free(tld);
1295 return res;
1296
1297}
1298
1306};
1307
1309{
1310 struct complex_task_data *ctd = ast_calloc(1, sizeof(*ctd));
1311
1312 if (!ctd) {
1313 return NULL;
1314 }
1315 ast_mutex_init(&ctd->lock);
1318 return ctd;
1319}
1320
1322{
1323 if (!ctd) {
1324 return;
1325 }
1326
1327 ast_mutex_destroy(&ctd->lock);
1330
1331 ast_free(ctd);
1332}
1333
1334static int complex_task(void *data)
1335{
1336 struct complex_task_data *ctd = data;
1337 SCOPED_MUTEX(lock, &ctd->lock);
1338 /* Notify that we started */
1339 ctd->task_started = 1;
1341 while (!ctd->continue_task) {
1343 }
1344 /* We got poked. Finish up */
1345 ctd->task_executed = 1;
1347 return 0;
1348}
1349
1350static void poke_worker(struct complex_task_data *ctd)
1351{
1352 SCOPED_MUTEX(lock, &ctd->lock);
1353 ctd->continue_task = 1;
1355}
1356
1358{
1359 struct timeval start = ast_tvnow();
1360 struct timespec end = {
1361 .tv_sec = start.tv_sec + 5,
1362 .tv_nsec = start.tv_usec * 1000
1363 };
1364 SCOPED_MUTEX(lock, &ctd->lock);
1365
1366 while (!ctd->task_started) {
1367 if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
1368 break;
1369 }
1370 }
1371
1372 return ctd->task_started;
1373}
1374
1376{
1377 struct timeval start = ast_tvnow();
1378 struct timespec end = {
1379 .tv_sec = start.tv_sec + 1,
1380 .tv_nsec = start.tv_usec * 1000
1381 };
1382 SCOPED_MUTEX(lock, &ctd->lock);
1383
1384 while (!ctd->task_started) {
1385 if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
1386 break;
1387 }
1388 }
1389
1390 return ctd->task_started;
1391}
1392
1394{
1395 struct timeval start = ast_tvnow();
1396 struct timespec end = {
1397 .tv_sec = start.tv_sec + 5,
1398 .tv_nsec = start.tv_usec * 1000
1399 };
1401 SCOPED_MUTEX(lock, &ctd->lock);
1402
1403 while (!ctd->task_executed) {
1404 if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
1405 break;
1406 }
1407 }
1408
1409 if (!ctd->task_executed) {
1410 res = AST_TEST_FAIL;
1411 }
1412 return res;
1413}
1414
1415AST_TEST_DEFINE(threadpool_task_distribution)
1416{
1417 struct ast_threadpool *pool = NULL;
1419 struct complex_task_data *ctd1 = NULL;
1420 struct complex_task_data *ctd2 = NULL;
1422 struct test_listener_data *tld = NULL;
1425 .idle_timeout = 0,
1426 .auto_increment = 0,
1427 .initial_size = 0,
1428 .max_size = 0,
1429 };
1430
1431 switch (cmd) {
1432 case TEST_INIT:
1433 info->name = "task_distribution";
1434 info->category = "/main/threadpool/";
1435 info->summary = "Test that tasks are evenly distributed to threads";
1436 info->description =
1437 "Push two tasks into a threadpool. Ensure that each is handled by\n"
1438 "a separate thread";
1439 return AST_TEST_NOT_RUN;
1440 case TEST_EXECUTE:
1441 break;
1442 }
1443
1444 tld = test_alloc();
1445 if (!tld) {
1446 return AST_TEST_FAIL;
1447 }
1448
1450 if (!listener) {
1451 goto end;
1452 }
1453
1454 pool = ast_threadpool_create(info->name, listener, &options);
1455 if (!pool) {
1456 goto end;
1457 }
1458
1459 ctd1 = complex_task_data_alloc();
1460 ctd2 = complex_task_data_alloc();
1461 if (!ctd1 || !ctd2) {
1462 goto end;
1463 }
1464
1465 if (ast_threadpool_push(pool, complex_task, ctd1)) {
1466 goto end;
1467 }
1468
1469 if (ast_threadpool_push(pool, complex_task, ctd2)) {
1470 goto end;
1471 }
1472
1473 ast_threadpool_set_size(pool, 2);
1474
1475 res = wait_until_thread_state(test, tld, 2, 0);
1476 if (res == AST_TEST_FAIL) {
1477 goto end;
1478 }
1479
1480 res = listener_check(test, listener, 1, 0, 2, 2, 0, 0);
1481 if (res == AST_TEST_FAIL) {
1482 goto end;
1483 }
1484
1485 /* The tasks are stalled until we poke them */
1486 poke_worker(ctd1);
1487 poke_worker(ctd2);
1488
1489 res = wait_for_complex_completion(ctd1);
1490 if (res == AST_TEST_FAIL) {
1491 goto end;
1492 }
1493 res = wait_for_complex_completion(ctd2);
1494 if (res == AST_TEST_FAIL) {
1495 goto end;
1496 }
1497
1498 res = wait_until_thread_state(test, tld, 0, 2);
1499 if (res == AST_TEST_FAIL) {
1500 goto end;
1501 }
1502
1503 res = listener_check(test, listener, 1, 0, 2, 0, 2, 1);
1504
1505end:
1510 ast_free(tld);
1511 return res;
1512}
1513
1514AST_TEST_DEFINE(threadpool_more_destruction)
1515{
1516 struct ast_threadpool *pool = NULL;
1518 struct complex_task_data *ctd1 = NULL;
1519 struct complex_task_data *ctd2 = NULL;
1521 struct test_listener_data *tld = NULL;
1524 .idle_timeout = 0,
1525 .auto_increment = 0,
1526 .initial_size = 0,
1527 .max_size = 0,
1528 };
1529
1530 switch (cmd) {
1531 case TEST_INIT:
1532 info->name = "more_destruction";
1533 info->category = "/main/threadpool/";
1534 info->summary = "Test that threads are destroyed as expected";
1535 info->description =
1536 "Push two tasks into a threadpool. Set the threadpool size to 4\n"
1537 "Ensure that there are 2 active and 2 idle threads. Then shrink the\n"
1538 "threadpool down to 1 thread. Ensure that the thread leftover is active\n"
1539 "and ensure that both tasks complete.";
1540 return AST_TEST_NOT_RUN;
1541 case TEST_EXECUTE:
1542 break;
1543 }
1544
1545 tld = test_alloc();
1546 if (!tld) {
1547 return AST_TEST_FAIL;
1548 }
1549
1551 if (!listener) {
1552 goto end;
1553 }
1554
1555 pool = ast_threadpool_create(info->name, listener, &options);
1556 if (!pool) {
1557 goto end;
1558 }
1559
1560 ctd1 = complex_task_data_alloc();
1561 ctd2 = complex_task_data_alloc();
1562 if (!ctd1 || !ctd2) {
1563 goto end;
1564 }
1565
1566 if (ast_threadpool_push(pool, complex_task, ctd1)) {
1567 goto end;
1568 }
1569
1570 if (ast_threadpool_push(pool, complex_task, ctd2)) {
1571 goto end;
1572 }
1573
1574 ast_threadpool_set_size(pool, 4);
1575
1576 res = wait_until_thread_state(test, tld, 2, 2);
1577 if (res == AST_TEST_FAIL) {
1578 goto end;
1579 }
1580
1581 res = listener_check(test, listener, 1, 0, 2, 2, 2, 0);
1582 if (res == AST_TEST_FAIL) {
1583 goto end;
1584 }
1585
1586 ast_threadpool_set_size(pool, 1);
1587
1588 /* Shrinking the threadpool should kill off the two idle threads
1589 * and one of the active threads.
1590 */
1591 res = wait_until_thread_state(test, tld, 1, 0);
1592 if (res == AST_TEST_FAIL) {
1593 goto end;
1594 }
1595
1596 res = listener_check(test, listener, 1, 0, 2, 1, 0, 0);
1597 if (res == AST_TEST_FAIL) {
1598 goto end;
1599 }
1600
1601 /* The tasks are stalled until we poke them */
1602 poke_worker(ctd1);
1603 poke_worker(ctd2);
1604
1605 res = wait_for_complex_completion(ctd1);
1606 if (res == AST_TEST_FAIL) {
1607 goto end;
1608 }
1609 res = wait_for_complex_completion(ctd2);
1610 if (res == AST_TEST_FAIL) {
1611 goto end;
1612 }
1613
1614 res = wait_until_thread_state(test, tld, 0, 1);
1615 if (res == AST_TEST_FAIL) {
1616 goto end;
1617 }
1618
1619 res = listener_check(test, listener, 1, 0, 2, 0, 1, 1);
1620
1621end:
1626 ast_free(tld);
1627 return res;
1628}
1629
1630AST_TEST_DEFINE(threadpool_serializer)
1631{
1632 int started = 0;
1633 int finished = 0;
1635 struct ast_threadpool *pool = NULL;
1636 struct ast_taskprocessor *uut = NULL;
1637 struct complex_task_data *data1 = NULL;
1638 struct complex_task_data *data2 = NULL;
1639 struct complex_task_data *data3 = NULL;
1642 .idle_timeout = 0,
1643 .auto_increment = 0,
1644 .initial_size = 2,
1645 .max_size = 0,
1646 };
1647
1648 switch (cmd) {
1649 case TEST_INIT:
1650 info->name = "threadpool_serializer";
1651 info->category = "/main/threadpool/";
1652 info->summary = "Test that serializers";
1653 info->description =
1654 "Ensures that tasks enqueued to a serialize execute in sequence.";
1655 return AST_TEST_NOT_RUN;
1656 case TEST_EXECUTE:
1657 break;
1658 }
1659
1660 pool = ast_threadpool_create("threadpool_serializer", NULL, &options);
1661 if (!pool) {
1662 ast_test_status_update(test, "Could not create threadpool\n");
1663 goto end;
1664 }
1665 uut = ast_threadpool_serializer("ser1", pool);
1666 data1 = complex_task_data_alloc();
1667 data2 = complex_task_data_alloc();
1668 data3 = complex_task_data_alloc();
1669 if (!uut || !data1 || !data2 || !data3) {
1670 ast_test_status_update(test, "Allocation failed\n");
1671 goto end;
1672 }
1673
1674 /* This should start right away */
1675 if (ast_taskprocessor_push(uut, complex_task, data1)) {
1676 ast_test_status_update(test, "Failed to enqueue data1\n");
1677 goto end;
1678 }
1679 started = wait_for_complex_start(data1);
1680 if (!started) {
1681 ast_test_status_update(test, "Failed to start data1\n");
1682 goto end;
1683 }
1684
1685 /* This should not start until data 1 is complete */
1686 if (ast_taskprocessor_push(uut, complex_task, data2)) {
1687 ast_test_status_update(test, "Failed to enqueue data2\n");
1688 goto end;
1689 }
1690 started = has_complex_started(data2);
1691 if (started) {
1692 ast_test_status_update(test, "data2 started out of order\n");
1693 goto end;
1694 }
1695
1696 /* But the free thread in the pool can still run */
1697 if (ast_threadpool_push(pool, complex_task, data3)) {
1698 ast_test_status_update(test, "Failed to enqueue data3\n");
1699 }
1700 started = wait_for_complex_start(data3);
1701 if (!started) {
1702 ast_test_status_update(test, "Failed to start data3\n");
1703 goto end;
1704 }
1705
1706 /* Finishing data1 should allow data2 to start */
1707 poke_worker(data1);
1708 finished = wait_for_complex_completion(data1) == AST_TEST_PASS;
1709 if (!finished) {
1710 ast_test_status_update(test, "data1 couldn't finish\n");
1711 goto end;
1712 }
1713 started = wait_for_complex_start(data2);
1714 if (!started) {
1715 ast_test_status_update(test, "Failed to start data2\n");
1716 goto end;
1717 }
1718
1719 /* Finish up */
1720 poke_worker(data2);
1721 finished = wait_for_complex_completion(data2) == AST_TEST_PASS;
1722 if (!finished) {
1723 ast_test_status_update(test, "data2 couldn't finish\n");
1724 goto end;
1725 }
1726 poke_worker(data3);
1727 finished = wait_for_complex_completion(data3) == AST_TEST_PASS;
1728 if (!finished) {
1729 ast_test_status_update(test, "data3 couldn't finish\n");
1730 goto end;
1731 }
1732
1733 res = AST_TEST_PASS;
1734
1735end:
1736 poke_worker(data1);
1737 poke_worker(data2);
1738 poke_worker(data3);
1744 return res;
1745}
1746
1747AST_TEST_DEFINE(threadpool_serializer_dupe)
1748{
1750 struct ast_threadpool *pool = NULL;
1751 struct ast_taskprocessor *uut = NULL;
1752 struct ast_taskprocessor *there_can_be_only_one = NULL;
1755 .idle_timeout = 0,
1756 .auto_increment = 0,
1757 .initial_size = 2,
1758 .max_size = 0,
1759 };
1760
1761 switch (cmd) {
1762 case TEST_INIT:
1763 info->name = "threadpool_serializer_dupe";
1764 info->category = "/main/threadpool/";
1765 info->summary = "Test that serializers are uniquely named";
1766 info->description =
1767 "Creating two serializers with the same name should\n"
1768 "result in error.";
1769 return AST_TEST_NOT_RUN;
1770 case TEST_EXECUTE:
1771 break;
1772 }
1773
1774 pool = ast_threadpool_create("threadpool_serializer", NULL, &options);
1775 if (!pool) {
1776 ast_test_status_update(test, "Could not create threadpool\n");
1777 goto end;
1778 }
1779
1780 uut = ast_threadpool_serializer("highlander", pool);
1781 if (!uut) {
1782 ast_test_status_update(test, "Allocation failed\n");
1783 goto end;
1784 }
1785
1786 there_can_be_only_one = ast_threadpool_serializer("highlander", pool);
1787 if (there_can_be_only_one) {
1788 ast_taskprocessor_unreference(there_can_be_only_one);
1789 ast_test_status_update(test, "Duplicate name error\n");
1790 goto end;
1791 }
1792
1793 res = AST_TEST_PASS;
1794
1795end:
1798 return res;
1799}
1800
1801static int unload_module(void)
1802{
1803 ast_test_unregister(threadpool_push);
1804 ast_test_unregister(threadpool_initial_threads);
1805 ast_test_unregister(threadpool_thread_creation);
1806 ast_test_unregister(threadpool_thread_destruction);
1807 ast_test_unregister(threadpool_thread_timeout);
1808 ast_test_unregister(threadpool_thread_timeout_thrash);
1809 ast_test_unregister(threadpool_one_task_one_thread);
1810 ast_test_unregister(threadpool_one_thread_one_task);
1811 ast_test_unregister(threadpool_one_thread_multiple_tasks);
1812 ast_test_unregister(threadpool_auto_increment);
1813 ast_test_unregister(threadpool_max_size);
1814 ast_test_unregister(threadpool_reactivation);
1815 ast_test_unregister(threadpool_task_distribution);
1816 ast_test_unregister(threadpool_more_destruction);
1817 ast_test_unregister(threadpool_serializer);
1818 ast_test_unregister(threadpool_serializer_dupe);
1819 return 0;
1820}
1821
1822static int load_module(void)
1823{
1824 ast_test_register(threadpool_push);
1825 ast_test_register(threadpool_initial_threads);
1826 ast_test_register(threadpool_thread_creation);
1827 ast_test_register(threadpool_thread_destruction);
1828 ast_test_register(threadpool_thread_timeout);
1829 ast_test_register(threadpool_thread_timeout_thrash);
1830 ast_test_register(threadpool_one_task_one_thread);
1831 ast_test_register(threadpool_one_thread_one_task);
1832 ast_test_register(threadpool_one_thread_multiple_tasks);
1833 ast_test_register(threadpool_auto_increment);
1834 ast_test_register(threadpool_max_size);
1835 ast_test_register(threadpool_reactivation);
1836 ast_test_register(threadpool_task_distribution);
1837 ast_test_register(threadpool_more_destruction);
1838 ast_test_register(threadpool_serializer);
1839 ast_test_register(threadpool_serializer_dupe);
1841}
1842
ast_mutex_t lock
Definition: app_sla.c:331
static void * listener(void *unused)
Definition: asterisk.c:1519
Asterisk main include file. File version handling, generic pbx functions.
#define ast_free(a)
Definition: astmm.h:180
#define ast_calloc(num, len)
A wrapper for calloc()
Definition: astmm.h:202
#define ast_log
Definition: astobj2.c:42
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
char * end
Definition: eagi_proxy.c:73
Support for logging to various files, console and syslog Configuration in file logger....
#define LOG_NOTICE
Asterisk locking-related definitions:
#define ast_cond_destroy(cond)
Definition: lock.h:202
#define ast_cond_wait(cond, mutex)
Definition: lock.h:205
#define ast_cond_init(cond, attr)
Definition: lock.h:201
#define ast_cond_timedwait(cond, mutex, time)
Definition: lock.h:206
#define ast_mutex_init(pmutex)
Definition: lock.h:186
#define ast_mutex_unlock(a)
Definition: lock.h:190
pthread_cond_t ast_cond_t
Definition: lock.h:178
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
Definition: lock.h:589
#define ast_mutex_destroy(a)
Definition: lock.h:188
#define ast_mutex_lock(a)
Definition: lock.h:189
#define ast_cond_signal(cond)
Definition: lock.h:203
Asterisk module definitions.
#define AST_MODULE_INFO_STANDARD(keystr, desc)
Definition: module.h:581
#define ASTERISK_GPL_KEY
The text the key() function should return.
Definition: module.h:46
@ AST_MODULE_LOAD_SUCCESS
Definition: module.h:70
def info(msg)
#define NULL
Definition: resample.c:96
Structure for mutex and tracking information.
Definition: lock.h:135
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
void(* state_changed)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener, int active_threads, int idle_threads)
Indicates that the state of threads in the pool has changed.
Definition: threadpool.h:36
listener for a threadpool
Definition: threadpool.c:110
An opaque threadpool structure.
Definition: threadpool.c:36
An API for managing task processing threads that can be shared across modules.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
Test Framework API.
@ TEST_INIT
Definition: test.h:200
@ TEST_EXECUTE
Definition: test.h:201
#define ast_test_status_update(a, b, c...)
Definition: test.h:129
ast_test_result_state
Definition: test.h:193
@ AST_TEST_PASS
Definition: test.h:195
@ AST_TEST_FAIL
Definition: test.h:196
@ AST_TEST_NOT_RUN
Definition: test.h:194
static struct test_options options
static struct test_listener_data * test_alloc(void)
static int has_complex_started(struct complex_task_data *ctd)
static struct complex_task_data * complex_task_data_alloc(void)
static enum ast_test_result_state wait_for_complex_completion(struct complex_task_data *ctd)
static enum ast_test_result_state wait_for_completion(struct ast_test *test, struct simple_task_data *std)
static void wait_for_task_pushed(struct ast_threadpool_listener *listener)
static void test_shutdown(struct ast_threadpool_listener *listener)
static const struct ast_threadpool_listener_callbacks test_callbacks
static int wait_for_complex_start(struct complex_task_data *ctd)
static void test_emptied(struct ast_threadpool *pool, struct ast_threadpool_listener *listener)
AST_TEST_DEFINE(threadpool_push)
static enum ast_test_result_state listener_check(struct ast_test *test, struct ast_threadpool_listener *listener, int task_pushed, int was_empty, int num_tasks, int num_active, int num_idle, int empty_notice)
static void complex_task_data_free(struct complex_task_data *ctd)
static enum ast_test_result_state wait_until_thread_state_task_pushed(struct ast_test *test, struct test_listener_data *tld, int num_active, int num_idle, int num_tasks)
static int complex_task(void *data)
static struct simple_task_data * simple_task_data_alloc(void)
static void poke_worker(struct complex_task_data *ctd)
static enum ast_test_result_state wait_until_thread_state(struct ast_test *test, struct test_listener_data *tld, int num_active, int num_idle)
static void test_task_pushed(struct ast_threadpool *pool, struct ast_threadpool_listener *listener, int was_empty)
static int load_module(void)
static int unload_module(void)
static int simple_task(void *data)
static enum ast_test_result_state wait_for_empty_notice(struct ast_test *test, struct test_listener_data *tld)
static void simple_task_data_free(struct simple_task_data *std)
static void test_state_changed(struct ast_threadpool *pool, struct ast_threadpool_listener *listener, int active_threads, int idle_threads)
void ast_threadpool_shutdown(struct ast_threadpool *pool)
Shut down a threadpool and destroy it.
Definition: threadpool.c:966
int ast_threadpool_push(struct ast_threadpool *pool, int(*task)(void *data), void *data) attribute_warn_unused_result
Push a task to the threadpool.
Definition: threadpool.c:957
void * ast_threadpool_listener_get_user_data(const struct ast_threadpool_listener *listener)
Get the threadpool listener's user data.
Definition: threadpool.c:906
void ast_threadpool_set_size(struct ast_threadpool *threadpool, unsigned int size)
Set the number of threads for the thread pool.
Definition: threadpool.c:875
struct ast_threadpool * ast_threadpool_create(const char *name, struct ast_threadpool_listener *listener, const struct ast_threadpool_options *options)
Create a new threadpool.
Definition: threadpool.c:916
struct ast_threadpool_listener * ast_threadpool_listener_alloc(const struct ast_threadpool_listener_callbacks *callbacks, void *user_data)
Allocate a threadpool listener.
Definition: threadpool.c:894
#define AST_THREADPOOL_OPTIONS_VERSION
Definition: threadpool.h:71
struct ast_taskprocessor * ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
Serialized execution of tasks within a ast_threadpool.
Definition: threadpool.c:1428
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:159