Asterisk - The Open Source Telephony Project  GIT-master-44aef04
test_threadpool.c
Go to the documentation of this file.
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012-2013, Digium, Inc.
5  *
6  * Mark Michelson <mmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 
19 /*!
20  * \file
21  * \brief threadpool unit tests
22  *
23  * \author Mark Michelson <mmichelson@digium.com>
24  *
25  */
26 
27 /*** MODULEINFO
28  <depend>TEST_FRAMEWORK</depend>
29  <support_level>core</support_level>
30  ***/
31 
32 #include "asterisk.h"
33 
34 #include "asterisk/astobj2.h"
35 #include "asterisk/lock.h"
36 #include "asterisk/logger.h"
37 #include "asterisk/module.h"
38 #include "asterisk/taskprocessor.h"
39 #include "asterisk/test.h"
40 #include "asterisk/threadpool.h"
41 
44  int num_idle;
46  int num_tasks;
48  int was_empty;
51 };
52 
53 static struct test_listener_data *test_alloc(void)
54 {
55  struct test_listener_data *tld = ast_calloc(1, sizeof(*tld));
56  if (!tld) {
57  return NULL;
58  }
59  ast_mutex_init(&tld->lock);
60  ast_cond_init(&tld->cond, NULL);
61  return tld;
62 }
63 
64 static void test_state_changed(struct ast_threadpool *pool,
66  int active_threads,
67  int idle_threads)
68 {
70  SCOPED_MUTEX(lock, &tld->lock);
71  tld->num_active = active_threads;
72  tld->num_idle = idle_threads;
73  ast_log(LOG_NOTICE, "Thread state: %d active, %d idle\n", tld->num_active, tld->num_idle);
74  ast_cond_signal(&tld->cond);
75 }
76 
77 static void test_task_pushed(struct ast_threadpool *pool,
79  int was_empty)
80 {
82  SCOPED_MUTEX(lock, &tld->lock);
83  tld->task_pushed = 1;
84  ++tld->num_tasks;
85  tld->was_empty = was_empty;
86  ast_cond_signal(&tld->cond);
87 }
88 
89 static void test_emptied(struct ast_threadpool *pool,
91 {
93  SCOPED_MUTEX(lock, &tld->lock);
94  tld->empty_notice = 1;
95  ast_cond_signal(&tld->cond);
96 }
97 
99 {
101  ast_cond_destroy(&tld->cond);
102  ast_mutex_destroy(&tld->lock);
103 }
104 
107  .task_pushed = test_task_pushed,
108  .emptied = test_emptied,
109  .shutdown = test_shutdown,
110 };
111 
116 };
117 
119 {
120  struct simple_task_data *std = ast_calloc(1, sizeof(*std));
121 
122  if (!std) {
123  return NULL;
124  }
125  ast_mutex_init(&std->lock);
126  ast_cond_init(&std->cond, NULL);
127  return std;
128 }
129 
130 static void simple_task_data_free(struct simple_task_data *std)
131 {
132  if (!std) {
133  return;
134  }
135 
136  ast_mutex_destroy(&std->lock);
137  ast_cond_destroy(&std->cond);
138 
139  ast_free(std);
140 }
141 
142 static int simple_task(void *data)
143 {
144  struct simple_task_data *std = data;
145  SCOPED_MUTEX(lock, &std->lock);
146  std->task_executed = 1;
147  ast_cond_signal(&std->cond);
148  return 0;
149 }
150 
151 static enum ast_test_result_state wait_until_thread_state(struct ast_test *test, struct test_listener_data *tld, int num_active, int num_idle)
152 {
153  struct timeval start = ast_tvnow();
154  struct timespec end = {
155  .tv_sec = start.tv_sec + 5,
156  .tv_nsec = start.tv_usec * 1000
157  };
159  SCOPED_MUTEX(lock, &tld->lock);
160 
161  while (!(tld->num_active == num_active && tld->num_idle == num_idle)) {
162  if (ast_cond_timedwait(&tld->cond, &tld->lock, &end) == ETIMEDOUT) {
163  break;
164  }
165  }
166 
167  if (tld->num_active != num_active && tld->num_idle != num_idle) {
168  ast_test_status_update(test, "Number of active threads and idle threads not what was expected.\n");
169  ast_test_status_update(test, "Expected %d active threads but got %d\n", num_active, tld->num_active);
170  ast_test_status_update(test, "Expected %d idle threads but got %d\n", num_idle, tld->num_idle);
171  res = AST_TEST_FAIL;
172  }
173 
174  return res;
175 }
176 
178 {
180  struct timeval start = ast_tvnow();
181  struct timespec end = {
182  .tv_sec = start.tv_sec + 5,
183  .tv_nsec = start.tv_usec * 1000
184  };
185  SCOPED_MUTEX(lock, &tld->lock);
186 
187  while (!tld->task_pushed) {
188  if (ast_cond_timedwait(&tld->cond, lock, &end) == ETIMEDOUT) {
189  break;
190  }
191  }
192 }
193 
194 static enum ast_test_result_state wait_for_completion(struct ast_test *test, struct simple_task_data *std)
195 {
196  struct timeval start = ast_tvnow();
197  struct timespec end = {
198  .tv_sec = start.tv_sec + 5,
199  .tv_nsec = start.tv_usec * 1000
200  };
202  SCOPED_MUTEX(lock, &std->lock);
203 
204  while (!std->task_executed) {
205  if (ast_cond_timedwait(&std->cond, lock, &end) == ETIMEDOUT) {
206  break;
207  }
208  }
209 
210  if (!std->task_executed) {
211  ast_test_status_update(test, "Task execution did not occur\n");
212  res = AST_TEST_FAIL;
213  }
214  return res;
215 }
216 
217 static enum ast_test_result_state wait_for_empty_notice(struct ast_test *test, struct test_listener_data *tld)
218 {
219  struct timeval start = ast_tvnow();
220  struct timespec end = {
221  .tv_sec = start.tv_sec + 5,
222  .tv_nsec = start.tv_usec * 1000
223  };
225  SCOPED_MUTEX(lock, &tld->lock);
226 
227  while (!tld->empty_notice) {
228  if (ast_cond_timedwait(&tld->cond, lock, &end) == ETIMEDOUT) {
229  break;
230  }
231  }
232 
233  if (!tld->empty_notice) {
234  ast_test_status_update(test, "Test listener not notified that threadpool is empty\n");
235  res = AST_TEST_FAIL;
236  }
237 
238  return res;
239 }
240 
242  struct ast_test *test,
244  int task_pushed,
245  int was_empty,
246  int num_tasks,
247  int num_active,
248  int num_idle,
249  int empty_notice)
250 {
253 
254  if (tld->task_pushed != task_pushed) {
255  ast_test_status_update(test, "Expected task %sto be pushed, but it was%s\n",
256  task_pushed ? "" : "not ", tld->task_pushed ? "" : " not");
257  res = AST_TEST_FAIL;
258  }
259  if (tld->was_empty != was_empty) {
260  ast_test_status_update(test, "Expected %sto be empty, but it was%s\n",
261  was_empty ? "" : "not ", tld->was_empty ? "" : " not");
262  res = AST_TEST_FAIL;
263  }
264  if (tld->num_tasks!= num_tasks) {
265  ast_test_status_update(test, "Expected %d tasks to be pushed, but got %d\n",
266  num_tasks, tld->num_tasks);
267  res = AST_TEST_FAIL;
268  }
269  if (tld->num_active != num_active) {
270  ast_test_status_update(test, "Expected %d active threads, but got %d\n",
271  num_active, tld->num_active);
272  res = AST_TEST_FAIL;
273  }
274  if (tld->num_idle != num_idle) {
275  ast_test_status_update(test, "Expected %d idle threads, but got %d\n",
276  num_idle, tld->num_idle);
277  res = AST_TEST_FAIL;
278  }
279  if (tld->empty_notice != empty_notice) {
280  ast_test_status_update(test, "Expected %s empty notice, but got %s\n",
281  was_empty ? "an" : "no", tld->task_pushed ? "one" : "none");
282  res = AST_TEST_FAIL;
283  }
284 
285  return res;
286 }
287 
288 AST_TEST_DEFINE(threadpool_push)
289 {
290  struct ast_threadpool *pool = NULL;
292  struct simple_task_data *std = NULL;
293  struct test_listener_data *tld = NULL;
295  struct ast_threadpool_options options = {
297  .idle_timeout = 0,
298  .auto_increment = 0,
299  .initial_size = 0,
300  .max_size = 0,
301  };
302 
303  switch (cmd) {
304  case TEST_INIT:
305  info->name = "push";
306  info->category = "/main/threadpool/";
307  info->summary = "Test task";
308  info->description =
309  "Basic threadpool test";
310  return AST_TEST_NOT_RUN;
311  case TEST_EXECUTE:
312  break;
313  }
314  tld = test_alloc();
315  if (!tld) {
316  return AST_TEST_FAIL;
317  }
318 
319  listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
320  if (!listener) {
321  goto end;
322  }
323 
324  pool = ast_threadpool_create(info->name, listener, &options);
325  if (!pool) {
326  goto end;
327  }
328 
329  std = simple_task_data_alloc();
330  if (!std) {
331  goto end;
332  }
333 
334  if (ast_threadpool_push(pool, simple_task, std)) {
335  goto end;
336  }
337 
338  wait_for_task_pushed(listener);
339 
340  res = listener_check(test, listener, 1, 1, 1, 0, 0, 0);
341 
342 end:
344  ao2_cleanup(listener);
346  ast_free(tld);
347  return res;
348 }
349 
350 AST_TEST_DEFINE(threadpool_initial_threads)
351 {
352  struct ast_threadpool *pool = NULL;
355  struct test_listener_data *tld = NULL;
356  struct ast_threadpool_options options = {
358  .idle_timeout = 0,
359  .auto_increment = 0,
360  .initial_size = 3,
361  .max_size = 0,
362  };
363 
364  switch (cmd) {
365  case TEST_INIT:
366  info->name = "initial_threads";
367  info->category = "/main/threadpool/";
368  info->summary = "Test threadpool initialization state";
369  info->description =
370  "Ensure that a threadpool created with a specific size contains the\n"
371  "proper number of idle threads.";
372  return AST_TEST_NOT_RUN;
373  case TEST_EXECUTE:
374  break;
375  }
376 
377  tld = test_alloc();
378  if (!tld) {
379  return AST_TEST_FAIL;
380  }
381 
382  listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
383  if (!listener) {
384  goto end;
385  }
386 
387  pool = ast_threadpool_create(info->name, listener, &options);
388  if (!pool) {
389  goto end;
390  }
391 
392  res = wait_until_thread_state(test, tld, 0, 3);
393 
394 end:
396  ao2_cleanup(listener);
397  ast_free(tld);
398  return res;
399 }
400 
401 
402 AST_TEST_DEFINE(threadpool_thread_creation)
403 {
404  struct ast_threadpool *pool = NULL;
407  struct test_listener_data *tld = NULL;
408  struct ast_threadpool_options options = {
410  .idle_timeout = 0,
411  .auto_increment = 0,
412  .initial_size = 0,
413  .max_size = 0,
414  };
415 
416  switch (cmd) {
417  case TEST_INIT:
418  info->name = "thread_creation";
419  info->category = "/main/threadpool/";
420  info->summary = "Test threadpool thread creation";
421  info->description =
422  "Ensure that threads can be added to a threadpool";
423  return AST_TEST_NOT_RUN;
424  case TEST_EXECUTE:
425  break;
426  }
427 
428  tld = test_alloc();
429  if (!tld) {
430  return AST_TEST_FAIL;
431  }
432 
433  listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
434  if (!listener) {
435  goto end;
436  }
437 
438  pool = ast_threadpool_create(info->name, listener, &options);
439  if (!pool) {
440  goto end;
441  }
442 
443  /* Now let's create a thread. It should start active, then go
444  * idle immediately
445  */
446  ast_threadpool_set_size(pool, 1);
447 
448  res = wait_until_thread_state(test, tld, 0, 1);
449 
450 end:
452  ao2_cleanup(listener);
453  ast_free(tld);
454  return res;
455 }
456 
457 AST_TEST_DEFINE(threadpool_thread_destruction)
458 {
459  struct ast_threadpool *pool = NULL;
462  struct test_listener_data *tld = NULL;
463  struct ast_threadpool_options options = {
465  .idle_timeout = 0,
466  .auto_increment = 0,
467  .initial_size = 0,
468  .max_size = 0,
469  };
470 
471  switch (cmd) {
472  case TEST_INIT:
473  info->name = "thread_destruction";
474  info->category = "/main/threadpool/";
475  info->summary = "Test threadpool thread destruction";
476  info->description =
477  "Ensure that threads are properly destroyed in a threadpool";
478  return AST_TEST_NOT_RUN;
479  case TEST_EXECUTE:
480  break;
481  }
482 
483  tld = test_alloc();
484  if (!tld) {
485  return AST_TEST_FAIL;
486  }
487 
488  listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
489  if (!listener) {
490  goto end;
491  }
492 
493  pool = ast_threadpool_create(info->name, listener, &options);
494  if (!pool) {
495  goto end;
496  }
497 
498  ast_threadpool_set_size(pool, 3);
499 
500  res = wait_until_thread_state(test, tld, 0, 3);
501  if (res == AST_TEST_FAIL) {
502  goto end;
503  }
504 
505  res = listener_check(test, listener, 0, 0, 0, 0, 3, 0);
506  if (res == AST_TEST_FAIL) {
507  goto end;
508  }
509 
510  ast_threadpool_set_size(pool, 2);
511 
512  res = wait_until_thread_state(test, tld, 0, 2);
513 
514 end:
516  ao2_cleanup(listener);
517  ast_free(tld);
518  return res;
519 }
520 
521 AST_TEST_DEFINE(threadpool_thread_timeout)
522 {
523  struct ast_threadpool *pool = NULL;
526  struct test_listener_data *tld = NULL;
527  struct ast_threadpool_options options = {
529  .idle_timeout = 2,
530  .auto_increment = 0,
531  .initial_size = 0,
532  .max_size = 0,
533  };
534 
535  switch (cmd) {
536  case TEST_INIT:
537  info->name = "thread_timeout";
538  info->category = "/main/threadpool/";
539  info->summary = "Test threadpool thread timeout";
540  info->description =
541  "Ensure that a thread with a two second timeout dies as expected.";
542  return AST_TEST_NOT_RUN;
543  case TEST_EXECUTE:
544  break;
545  }
546 
547  tld = test_alloc();
548  if (!tld) {
549  return AST_TEST_FAIL;
550  }
551 
552  listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
553  if (!listener) {
554  goto end;
555  }
556 
557  pool = ast_threadpool_create(info->name, listener, &options);
558  if (!pool) {
559  goto end;
560  }
561 
562  ast_threadpool_set_size(pool, 1);
563 
564  res = wait_until_thread_state(test, tld, 0, 1);
565  if (res == AST_TEST_FAIL) {
566  goto end;
567  }
568 
569  res = listener_check(test, listener, 0, 0, 0, 0, 1, 0);
570  if (res == AST_TEST_FAIL) {
571  goto end;
572  }
573 
574  res = wait_until_thread_state(test, tld, 0, 0);
575  if (res == AST_TEST_FAIL) {
576  goto end;
577  }
578 
579  res = listener_check(test, listener, 0, 0, 0, 0, 0, 0);
580 
581 end:
583  ao2_cleanup(listener);
584  ast_free(tld);
585  return res;
586 }
587 
588 AST_TEST_DEFINE(threadpool_thread_timeout_thrash)
589 {
590  struct ast_threadpool *pool = NULL;
593  struct test_listener_data *tld = NULL;
594  struct ast_threadpool_options options = {
596  .idle_timeout = 1,
597  .auto_increment = 1,
598  .initial_size = 0,
599  .max_size = 1,
600  };
601  int iteration;
602 
603  switch (cmd) {
604  case TEST_INIT:
605  info->name = "thread_timeout_thrash";
606  info->category = "/main/threadpool/";
607  info->summary = "Thrash threadpool thread timeout";
608  info->description =
609  "Repeatedly queue a task when a threadpool thread should timeout.";
610  return AST_TEST_NOT_RUN;
611  case TEST_EXECUTE:
612  break;
613  }
614 
615  tld = test_alloc();
616  if (!tld) {
617  return AST_TEST_FAIL;
618  }
619 
620  listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
621  if (!listener) {
622  goto end;
623  }
624 
625  pool = ast_threadpool_create(info->name, listener, &options);
626  if (!pool) {
627  goto end;
628  }
629 
630  ast_threadpool_set_size(pool, 1);
631 
632  for (iteration = 0; iteration < 30; ++iteration) {
633  struct simple_task_data *std = NULL;
634  struct timeval start = ast_tvnow();
635  struct timespec end = {
636  .tv_sec = start.tv_sec + options.idle_timeout,
637  .tv_nsec = start.tv_usec * 1000
638  };
639 
640  std = simple_task_data_alloc();
641  if (!std) {
642  goto end;
643  }
644 
645  /* Wait until the threadpool thread should timeout due to being idle */
646  ast_mutex_lock(&tld->lock);
647  while (ast_cond_timedwait(&tld->cond, &tld->lock, &end) != ETIMEDOUT) {
648  /* This purposely left empty as we want to loop waiting for a time out */
649  }
650  ast_mutex_unlock(&tld->lock);
651 
652  if (ast_threadpool_push(pool, simple_task, std)) {
653  res = AST_TEST_FAIL;
654  } else {
655  res = wait_for_completion(test, std);
656  }
657 
659 
660  if (res == AST_TEST_FAIL) {
661  goto end;
662  }
663  }
664 
665  res = wait_until_thread_state(test, tld, 0, 0);
666  if (res == AST_TEST_FAIL) {
667  goto end;
668  }
669 
670  res = listener_check(test, listener, 1, 1, 30, 0, 0, 1);
671 
672 end:
674  ao2_cleanup(listener);
675  ast_free(tld);
676  return res;
677 }
678 
679 AST_TEST_DEFINE(threadpool_one_task_one_thread)
680 {
681  struct ast_threadpool *pool = NULL;
683  struct simple_task_data *std = NULL;
685  struct test_listener_data *tld = NULL;
686  struct ast_threadpool_options options = {
688  .idle_timeout = 0,
689  .auto_increment = 0,
690  .initial_size = 0,
691  .max_size = 0,
692  };
693 
694  switch (cmd) {
695  case TEST_INIT:
696  info->name = "one_task_one_thread";
697  info->category = "/main/threadpool/";
698  info->summary = "Test a single task with a single thread";
699  info->description =
700  "Push a task into an empty threadpool, then add a thread to the pool.";
701  return AST_TEST_NOT_RUN;
702  case TEST_EXECUTE:
703  break;
704  }
705 
706  tld = test_alloc();
707  if (!tld) {
708  return AST_TEST_FAIL;
709  }
710 
711  listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
712  if (!listener) {
713  goto end;
714  }
715 
716  pool = ast_threadpool_create(info->name, listener, &options);
717  if (!pool) {
718  goto end;
719  }
720 
721  std = simple_task_data_alloc();
722  if (!std) {
723  goto end;
724  }
725 
726  if (ast_threadpool_push(pool, simple_task, std)) {
727  goto end;
728  }
729 
730  ast_threadpool_set_size(pool, 1);
731 
732  /* Threads added to the pool are active when they start,
733  * so the newly-created thread should immediately execute
734  * the waiting task.
735  */
736  res = wait_for_completion(test, std);
737  if (res == AST_TEST_FAIL) {
738  goto end;
739  }
740 
741  res = wait_for_empty_notice(test, tld);
742  if (res == AST_TEST_FAIL) {
743  goto end;
744  }
745 
746  /* After completing the task, the thread should go idle */
747  res = wait_until_thread_state(test, tld, 0, 1);
748  if (res == AST_TEST_FAIL) {
749  goto end;
750  }
751 
752  res = listener_check(test, listener, 1, 1, 1, 0, 1, 1);
753 
754 end:
756  ao2_cleanup(listener);
758  ast_free(tld);
759  return res;
760 
761 }
762 
763 AST_TEST_DEFINE(threadpool_one_thread_one_task)
764 {
765  struct ast_threadpool *pool = NULL;
767  struct simple_task_data *std = NULL;
769  struct test_listener_data *tld = NULL;
770  struct ast_threadpool_options options = {
772  .idle_timeout = 0,
773  .auto_increment = 0,
774  .initial_size = 0,
775  .max_size = 0,
776  };
777 
778  switch (cmd) {
779  case TEST_INIT:
780  info->name = "one_thread_one_task";
781  info->category = "/main/threadpool/";
782  info->summary = "Test a single thread with a single task";
783  info->description =
784  "Add a thread to the pool and then push a task to it.";
785  return AST_TEST_NOT_RUN;
786  case TEST_EXECUTE:
787  break;
788  }
789 
790  tld = test_alloc();
791  if (!tld) {
792  return AST_TEST_FAIL;
793  }
794 
795  listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
796  if (!listener) {
797  goto end;
798  }
799 
800  pool = ast_threadpool_create(info->name, listener, &options);
801  if (!pool) {
802  goto end;
803  }
804 
805  std = simple_task_data_alloc();
806  if (!std) {
807  goto end;
808  }
809 
810  ast_threadpool_set_size(pool, 1);
811 
812  res = wait_until_thread_state(test, tld, 0, 1);
813  if (res == AST_TEST_FAIL) {
814  goto end;
815  }
816 
817  if (ast_threadpool_push(pool, simple_task, std)) {
818  res = AST_TEST_FAIL;
819  goto end;
820  }
821 
822  res = wait_for_completion(test, std);
823  if (res == AST_TEST_FAIL) {
824  goto end;
825  }
826 
827  res = wait_for_empty_notice(test, tld);
828  if (res == AST_TEST_FAIL) {
829  goto end;
830  }
831 
832  /* After completing the task, the thread should go idle */
833  res = wait_until_thread_state(test, tld, 0, 1);
834  if (res == AST_TEST_FAIL) {
835  goto end;
836  }
837 
838  res = listener_check(test, listener, 1, 1, 1, 0, 1, 1);
839 
840 end:
842  ao2_cleanup(listener);
844  ast_free(tld);
845  return res;
846 }
847 
848 AST_TEST_DEFINE(threadpool_one_thread_multiple_tasks)
849 {
850  struct ast_threadpool *pool = NULL;
852  struct simple_task_data *std1 = NULL;
853  struct simple_task_data *std2 = NULL;
854  struct simple_task_data *std3 = NULL;
856  struct test_listener_data *tld = NULL;
857  struct ast_threadpool_options options = {
859  .idle_timeout = 0,
860  .auto_increment = 0,
861  .initial_size = 0,
862  .max_size = 0,
863  };
864 
865  switch (cmd) {
866  case TEST_INIT:
867  info->name = "one_thread_multiple_tasks";
868  info->category = "/main/threadpool/";
869  info->summary = "Test a single thread with multiple tasks";
870  info->description =
871  "Add a thread to the pool and then push three tasks to it.";
872  return AST_TEST_NOT_RUN;
873  case TEST_EXECUTE:
874  break;
875  }
876 
877  tld = test_alloc();
878  if (!tld) {
879  return AST_TEST_FAIL;
880  }
881 
882  listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
883  if (!listener) {
884  goto end;
885  }
886 
887  pool = ast_threadpool_create(info->name, listener, &options);
888  if (!pool) {
889  goto end;
890  }
891 
892  std1 = simple_task_data_alloc();
893  std2 = simple_task_data_alloc();
894  std3 = simple_task_data_alloc();
895  if (!std1 || !std2 || !std3) {
896  goto end;
897  }
898 
899  ast_threadpool_set_size(pool, 1);
900 
901  res = wait_until_thread_state(test, tld, 0, 1);
902  if (res == AST_TEST_FAIL) {
903  goto end;
904  }
905 
906  res = AST_TEST_FAIL;
907  if (ast_threadpool_push(pool, simple_task, std1)) {
908  goto end;
909  }
910 
911  if (ast_threadpool_push(pool, simple_task, std2)) {
912  goto end;
913  }
914 
915  if (ast_threadpool_push(pool, simple_task, std3)) {
916  goto end;
917  }
918 
919  res = wait_for_completion(test, std1);
920  if (res == AST_TEST_FAIL) {
921  goto end;
922  }
923  res = wait_for_completion(test, std2);
924  if (res == AST_TEST_FAIL) {
925  goto end;
926  }
927  res = wait_for_completion(test, std3);
928  if (res == AST_TEST_FAIL) {
929  goto end;
930  }
931 
932  res = wait_for_empty_notice(test, tld);
933  if (res == AST_TEST_FAIL) {
934  goto end;
935  }
936 
937  res = wait_until_thread_state(test, tld, 0, 1);
938  if (res == AST_TEST_FAIL) {
939  goto end;
940  }
941 
942  res = listener_check(test, listener, 1, 0, 3, 0, 1, 1);
943 
944 end:
946  ao2_cleanup(listener);
947  simple_task_data_free(std1);
948  simple_task_data_free(std2);
949  simple_task_data_free(std3);
950  ast_free(tld);
951  return res;
952 }
953 
955  struct test_listener_data *tld, int num_active, int num_idle, int num_tasks)
956 {
958  struct timeval start;
959  struct timespec end;
960 
961  res = wait_until_thread_state(test, tld, num_active, num_idle);
962  if (res == AST_TEST_FAIL) {
963  return res;
964  }
965 
966  start = ast_tvnow();
967  end.tv_sec = start.tv_sec + 5;
968  end.tv_nsec = start.tv_usec * 1000;
969 
970  ast_mutex_lock(&tld->lock);
971 
972  while (tld->num_tasks != num_tasks) {
973  if (ast_cond_timedwait(&tld->cond, &tld->lock, &end) == ETIMEDOUT) {
974  break;
975  }
976  }
977 
978  if (tld->num_tasks != num_tasks) {
979  ast_test_status_update(test, "Number of tasks pushed %d does not match expected %d\n",
980  tld->num_tasks, num_tasks);
981  res = AST_TEST_FAIL;
982  }
983 
984  ast_mutex_unlock(&tld->lock);
985 
986  return res;
987 }
988 
989 AST_TEST_DEFINE(threadpool_auto_increment)
990 {
991  struct ast_threadpool *pool = NULL;
993  struct simple_task_data *std1 = NULL;
994  struct simple_task_data *std2 = NULL;
995  struct simple_task_data *std3 = NULL;
996  struct simple_task_data *std4 = NULL;
998  struct test_listener_data *tld = NULL;
999  struct ast_threadpool_options options = {
1001  .idle_timeout = 0,
1002  .auto_increment = 3,
1003  .initial_size = 0,
1004  .max_size = 0,
1005  };
1006 
1007  switch (cmd) {
1008  case TEST_INIT:
1009  info->name = "auto_increment";
1010  info->category = "/main/threadpool/";
1011  info->summary = "Test that the threadpool grows as tasks are added";
1012  info->description =
1013  "Create an empty threadpool and push a task to it. Once the task is\n"
1014  "pushed, the threadpool should add three threads and be able to\n"
1015  "handle the task. The threads should then go idle";
1016  return AST_TEST_NOT_RUN;
1017  case TEST_EXECUTE:
1018  break;
1019  }
1020 
1021  tld = test_alloc();
1022  if (!tld) {
1023  return AST_TEST_FAIL;
1024  }
1025 
1026  listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
1027  if (!listener) {
1028  goto end;
1029  }
1030 
1031  pool = ast_threadpool_create(info->name, listener, &options);
1032  if (!pool) {
1033  goto end;
1034  }
1035 
1036  std1 = simple_task_data_alloc();
1037  std2 = simple_task_data_alloc();
1038  std3 = simple_task_data_alloc();
1039  std4 = simple_task_data_alloc();
1040  if (!std1 || !std2 || !std3 || !std4) {
1041  goto end;
1042  }
1043 
1044  if (ast_threadpool_push(pool, simple_task, std1)) {
1045  goto end;
1046  }
1047 
1048  /* Pushing the task should result in the threadpool growing
1049  * by three threads. This will allow the task to actually execute
1050  */
1051  res = wait_for_completion(test, std1);
1052  if (res == AST_TEST_FAIL) {
1053  goto end;
1054  }
1055 
1056  res = wait_for_empty_notice(test, tld);
1057  if (res == AST_TEST_FAIL) {
1058  goto end;
1059  }
1060 
1061  res = wait_until_thread_state(test, tld, 0, 3);
1062  if (res == AST_TEST_FAIL) {
1063  goto end;
1064  }
1065 
1066  /* Now push three tasks into the pool and ensure the pool does not
1067  * grow.
1068  */
1069  res = AST_TEST_FAIL;
1070 
1071  if (ast_threadpool_push(pool, simple_task, std2)) {
1072  goto end;
1073  }
1074 
1075  if (ast_threadpool_push(pool, simple_task, std3)) {
1076  goto end;
1077  }
1078 
1079  if (ast_threadpool_push(pool, simple_task, std4)) {
1080  goto end;
1081  }
1082 
1083  res = wait_for_completion(test, std2);
1084  if (res == AST_TEST_FAIL) {
1085  goto end;
1086  }
1087  res = wait_for_completion(test, std3);
1088  if (res == AST_TEST_FAIL) {
1089  goto end;
1090  }
1091  res = wait_for_completion(test, std4);
1092  if (res == AST_TEST_FAIL) {
1093  goto end;
1094  }
1095 
1096  res = wait_for_empty_notice(test, tld);
1097  if (res == AST_TEST_FAIL) {
1098  goto end;
1099  }
1100 
1101  res = wait_until_thread_state_task_pushed(test, tld, 0, 3, 4);
1102  if (res == AST_TEST_FAIL) {
1103  goto end;
1104  }
1105 
1106 end:
1108  ao2_cleanup(listener);
1109  simple_task_data_free(std1);
1110  simple_task_data_free(std2);
1111  simple_task_data_free(std3);
1112  simple_task_data_free(std4);
1113  ast_free(tld);
1114  return res;
1115 }
1116 
1117 AST_TEST_DEFINE(threadpool_max_size)
1118 {
1119  struct ast_threadpool *pool = NULL;
1121  struct simple_task_data *std = NULL;
1123  struct test_listener_data *tld = NULL;
1124  struct ast_threadpool_options options = {
1126  .idle_timeout = 0,
1127  .auto_increment = 3,
1128  .initial_size = 0,
1129  .max_size = 2,
1130  };
1131 
1132  switch (cmd) {
1133  case TEST_INIT:
1134  info->name = "max_size";
1135  info->category = "/main/threadpool/";
1136  info->summary = "Test that the threadpool does not exceed its maximum size restriction";
1137  info->description =
1138  "Create an empty threadpool and push a task to it. Once the task is\n"
1139  "pushed, the threadpool should attempt to grow by three threads, but the\n"
1140  "pool's restrictions should only allow two threads to be added.";
1141  return AST_TEST_NOT_RUN;
1142  case TEST_EXECUTE:
1143  break;
1144  }
1145 
1146  tld = test_alloc();
1147  if (!tld) {
1148  return AST_TEST_FAIL;
1149  }
1150 
1151  listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
1152  if (!listener) {
1153  goto end;
1154  }
1155 
1156  pool = ast_threadpool_create(info->name, listener, &options);
1157  if (!pool) {
1158  goto end;
1159  }
1160 
1161  std = simple_task_data_alloc();
1162  if (!std) {
1163  goto end;
1164  }
1165 
1166  if (ast_threadpool_push(pool, simple_task, std)) {
1167  goto end;
1168  }
1169 
1170  res = wait_for_completion(test, std);
1171  if (res == AST_TEST_FAIL) {
1172  goto end;
1173  }
1174 
1175  res = wait_until_thread_state(test, tld, 0, 2);
1176  if (res == AST_TEST_FAIL) {
1177  goto end;
1178  }
1179 
1180  res = listener_check(test, listener, 1, 1, 1, 0, 2, 1);
1181 end:
1183  ao2_cleanup(listener);
1184  simple_task_data_free(std);
1185  ast_free(tld);
1186  return res;
1187 }
1188 
1189 AST_TEST_DEFINE(threadpool_reactivation)
1190 {
1191  struct ast_threadpool *pool = NULL;
1193  struct simple_task_data *std1 = NULL;
1194  struct simple_task_data *std2 = NULL;
1196  struct test_listener_data *tld = NULL;
1197  struct ast_threadpool_options options = {
1199  .idle_timeout = 0,
1200  .auto_increment = 0,
1201  .initial_size = 0,
1202  .max_size = 0,
1203  };
1204 
1205  switch (cmd) {
1206  case TEST_INIT:
1207  info->name = "reactivation";
1208  info->category = "/main/threadpool/";
1209  info->summary = "Test that a threadpool reactivates when work is added";
1210  info->description =
1211  "Push a task into a threadpool. Make sure the task executes and the\n"
1212  "thread goes idle. Then push a second task and ensure that the thread\n"
1213  "awakens and executes the second task.";
1214  return AST_TEST_NOT_RUN;
1215  case TEST_EXECUTE:
1216  break;
1217  }
1218 
1219  tld = test_alloc();
1220  if (!tld) {
1221  return AST_TEST_FAIL;
1222  }
1223 
1224  listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
1225  if (!listener) {
1226  goto end;
1227  }
1228 
1229  pool = ast_threadpool_create(info->name, listener, &options);
1230  if (!pool) {
1231  goto end;
1232  }
1233 
1234  std1 = simple_task_data_alloc();
1235  std2 = simple_task_data_alloc();
1236  if (!std1 || !std2) {
1237  goto end;
1238  }
1239 
1240  if (ast_threadpool_push(pool, simple_task, std1)) {
1241  goto end;
1242  }
1243 
1244  ast_threadpool_set_size(pool, 1);
1245 
1246  res = wait_for_completion(test, std1);
1247  if (res == AST_TEST_FAIL) {
1248  goto end;
1249  }
1250 
1251  res = wait_for_empty_notice(test, tld);
1252  if (res == AST_TEST_FAIL) {
1253  goto end;
1254  }
1255 
1256  res = wait_until_thread_state(test, tld, 0, 1);
1257  if (res == AST_TEST_FAIL) {
1258  goto end;
1259  }
1260 
1261  res = listener_check(test, listener, 1, 1, 1, 0, 1, 1);
1262  if (res == AST_TEST_FAIL) {
1263  goto end;
1264  }
1265 
1266  /* Now make sure the threadpool reactivates when we add a second task */
1267  if (ast_threadpool_push(pool, simple_task, std2)) {
1268  res = AST_TEST_FAIL;
1269  goto end;
1270  }
1271 
1272  res = wait_for_completion(test, std2);
1273  if (res == AST_TEST_FAIL) {
1274  goto end;
1275  }
1276 
1277  res = wait_for_empty_notice(test, tld);
1278  if (res == AST_TEST_FAIL) {
1279  goto end;
1280  }
1281 
1282  res = wait_until_thread_state(test, tld, 0, 1);
1283  if (res == AST_TEST_FAIL) {
1284  goto end;
1285  }
1286 
1287  res = listener_check(test, listener, 1, 1, 2, 0, 1, 1);
1288 
1289 end:
1291  ao2_cleanup(listener);
1292  simple_task_data_free(std1);
1293  simple_task_data_free(std2);
1294  ast_free(tld);
1295  return res;
1296 
1297 }
1298 
1306 };
1307 
1309 {
1310  struct complex_task_data *ctd = ast_calloc(1, sizeof(*ctd));
1311 
1312  if (!ctd) {
1313  return NULL;
1314  }
1315  ast_mutex_init(&ctd->lock);
1316  ast_cond_init(&ctd->stall_cond, NULL);
1317  ast_cond_init(&ctd->notify_cond, NULL);
1318  return ctd;
1319 }
1320 
1322 {
1323  if (!ctd) {
1324  return;
1325  }
1326 
1327  ast_mutex_destroy(&ctd->lock);
1330 
1331  ast_free(ctd);
1332 }
1333 
1334 static int complex_task(void *data)
1335 {
1336  struct complex_task_data *ctd = data;
1337  SCOPED_MUTEX(lock, &ctd->lock);
1338  /* Notify that we started */
1339  ctd->task_started = 1;
1341  while (!ctd->continue_task) {
1342  ast_cond_wait(&ctd->stall_cond, lock);
1343  }
1344  /* We got poked. Finish up */
1345  ctd->task_executed = 1;
1347  return 0;
1348 }
1349 
1350 static void poke_worker(struct complex_task_data *ctd)
1351 {
1352  SCOPED_MUTEX(lock, &ctd->lock);
1353  ctd->continue_task = 1;
1354  ast_cond_signal(&ctd->stall_cond);
1355 }
1356 
1358 {
1359  struct timeval start = ast_tvnow();
1360  struct timespec end = {
1361  .tv_sec = start.tv_sec + 5,
1362  .tv_nsec = start.tv_usec * 1000
1363  };
1364  SCOPED_MUTEX(lock, &ctd->lock);
1365 
1366  while (!ctd->task_started) {
1367  if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
1368  break;
1369  }
1370  }
1371 
1372  return ctd->task_started;
1373 }
1374 
1376 {
1377  struct timeval start = ast_tvnow();
1378  struct timespec end = {
1379  .tv_sec = start.tv_sec + 1,
1380  .tv_nsec = start.tv_usec * 1000
1381  };
1382  SCOPED_MUTEX(lock, &ctd->lock);
1383 
1384  while (!ctd->task_started) {
1385  if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
1386  break;
1387  }
1388  }
1389 
1390  return ctd->task_started;
1391 }
1392 
1394 {
1395  struct timeval start = ast_tvnow();
1396  struct timespec end = {
1397  .tv_sec = start.tv_sec + 5,
1398  .tv_nsec = start.tv_usec * 1000
1399  };
1401  SCOPED_MUTEX(lock, &ctd->lock);
1402 
1403  while (!ctd->task_executed) {
1404  if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
1405  break;
1406  }
1407  }
1408 
1409  if (!ctd->task_executed) {
1410  res = AST_TEST_FAIL;
1411  }
1412  return res;
1413 }
1414 
1415 AST_TEST_DEFINE(threadpool_task_distribution)
1416 {
1417  struct ast_threadpool *pool = NULL;
1419  struct complex_task_data *ctd1 = NULL;
1420  struct complex_task_data *ctd2 = NULL;
1422  struct test_listener_data *tld = NULL;
1423  struct ast_threadpool_options options = {
1425  .idle_timeout = 0,
1426  .auto_increment = 0,
1427  .initial_size = 0,
1428  .max_size = 0,
1429  };
1430 
1431  switch (cmd) {
1432  case TEST_INIT:
1433  info->name = "task_distribution";
1434  info->category = "/main/threadpool/";
1435  info->summary = "Test that tasks are evenly distributed to threads";
1436  info->description =
1437  "Push two tasks into a threadpool. Ensure that each is handled by\n"
1438  "a separate thread";
1439  return AST_TEST_NOT_RUN;
1440  case TEST_EXECUTE:
1441  break;
1442  }
1443 
1444  tld = test_alloc();
1445  if (!tld) {
1446  return AST_TEST_FAIL;
1447  }
1448 
1449  listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
1450  if (!listener) {
1451  goto end;
1452  }
1453 
1454  pool = ast_threadpool_create(info->name, listener, &options);
1455  if (!pool) {
1456  goto end;
1457  }
1458 
1459  ctd1 = complex_task_data_alloc();
1460  ctd2 = complex_task_data_alloc();
1461  if (!ctd1 || !ctd2) {
1462  goto end;
1463  }
1464 
1465  if (ast_threadpool_push(pool, complex_task, ctd1)) {
1466  goto end;
1467  }
1468 
1469  if (ast_threadpool_push(pool, complex_task, ctd2)) {
1470  goto end;
1471  }
1472 
1473  ast_threadpool_set_size(pool, 2);
1474 
1475  res = wait_until_thread_state(test, tld, 2, 0);
1476  if (res == AST_TEST_FAIL) {
1477  goto end;
1478  }
1479 
1480  res = listener_check(test, listener, 1, 0, 2, 2, 0, 0);
1481  if (res == AST_TEST_FAIL) {
1482  goto end;
1483  }
1484 
1485  /* The tasks are stalled until we poke them */
1486  poke_worker(ctd1);
1487  poke_worker(ctd2);
1488 
1489  res = wait_for_complex_completion(ctd1);
1490  if (res == AST_TEST_FAIL) {
1491  goto end;
1492  }
1493  res = wait_for_complex_completion(ctd2);
1494  if (res == AST_TEST_FAIL) {
1495  goto end;
1496  }
1497 
1498  res = wait_until_thread_state(test, tld, 0, 2);
1499  if (res == AST_TEST_FAIL) {
1500  goto end;
1501  }
1502 
1503  res = listener_check(test, listener, 1, 0, 2, 0, 2, 1);
1504 
1505 end:
1507  ao2_cleanup(listener);
1508  complex_task_data_free(ctd1);
1509  complex_task_data_free(ctd2);
1510  ast_free(tld);
1511  return res;
1512 }
1513 
1514 AST_TEST_DEFINE(threadpool_more_destruction)
1515 {
1516  struct ast_threadpool *pool = NULL;
1518  struct complex_task_data *ctd1 = NULL;
1519  struct complex_task_data *ctd2 = NULL;
1521  struct test_listener_data *tld = NULL;
1522  struct ast_threadpool_options options = {
1524  .idle_timeout = 0,
1525  .auto_increment = 0,
1526  .initial_size = 0,
1527  .max_size = 0,
1528  };
1529 
1530  switch (cmd) {
1531  case TEST_INIT:
1532  info->name = "more_destruction";
1533  info->category = "/main/threadpool/";
1534  info->summary = "Test that threads are destroyed as expected";
1535  info->description =
1536  "Push two tasks into a threadpool. Set the threadpool size to 4\n"
1537  "Ensure that there are 2 active and 2 idle threads. Then shrink the\n"
1538  "threadpool down to 1 thread. Ensure that the thread leftover is active\n"
1539  "and ensure that both tasks complete.";
1540  return AST_TEST_NOT_RUN;
1541  case TEST_EXECUTE:
1542  break;
1543  }
1544 
1545  tld = test_alloc();
1546  if (!tld) {
1547  return AST_TEST_FAIL;
1548  }
1549 
1550  listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
1551  if (!listener) {
1552  goto end;
1553  }
1554 
1555  pool = ast_threadpool_create(info->name, listener, &options);
1556  if (!pool) {
1557  goto end;
1558  }
1559 
1560  ctd1 = complex_task_data_alloc();
1561  ctd2 = complex_task_data_alloc();
1562  if (!ctd1 || !ctd2) {
1563  goto end;
1564  }
1565 
1566  if (ast_threadpool_push(pool, complex_task, ctd1)) {
1567  goto end;
1568  }
1569 
1570  if (ast_threadpool_push(pool, complex_task, ctd2)) {
1571  goto end;
1572  }
1573 
1574  ast_threadpool_set_size(pool, 4);
1575 
1576  res = wait_until_thread_state(test, tld, 2, 2);
1577  if (res == AST_TEST_FAIL) {
1578  goto end;
1579  }
1580 
1581  res = listener_check(test, listener, 1, 0, 2, 2, 2, 0);
1582  if (res == AST_TEST_FAIL) {
1583  goto end;
1584  }
1585 
1586  ast_threadpool_set_size(pool, 1);
1587 
1588  /* Shrinking the threadpool should kill off the two idle threads
1589  * and one of the active threads.
1590  */
1591  res = wait_until_thread_state(test, tld, 1, 0);
1592  if (res == AST_TEST_FAIL) {
1593  goto end;
1594  }
1595 
1596  res = listener_check(test, listener, 1, 0, 2, 1, 0, 0);
1597  if (res == AST_TEST_FAIL) {
1598  goto end;
1599  }
1600 
1601  /* The tasks are stalled until we poke them */
1602  poke_worker(ctd1);
1603  poke_worker(ctd2);
1604 
1605  res = wait_for_complex_completion(ctd1);
1606  if (res == AST_TEST_FAIL) {
1607  goto end;
1608  }
1609  res = wait_for_complex_completion(ctd2);
1610  if (res == AST_TEST_FAIL) {
1611  goto end;
1612  }
1613 
1614  res = wait_until_thread_state(test, tld, 0, 1);
1615  if (res == AST_TEST_FAIL) {
1616  goto end;
1617  }
1618 
1619  res = listener_check(test, listener, 1, 0, 2, 0, 1, 1);
1620 
1621 end:
1623  ao2_cleanup(listener);
1624  complex_task_data_free(ctd1);
1625  complex_task_data_free(ctd2);
1626  ast_free(tld);
1627  return res;
1628 }
1629 
1630 AST_TEST_DEFINE(threadpool_serializer)
1631 {
1632  int started = 0;
1633  int finished = 0;
1635  struct ast_threadpool *pool = NULL;
1636  struct ast_taskprocessor *uut = NULL;
1637  struct complex_task_data *data1 = NULL;
1638  struct complex_task_data *data2 = NULL;
1639  struct complex_task_data *data3 = NULL;
1640  struct ast_threadpool_options options = {
1642  .idle_timeout = 0,
1643  .auto_increment = 0,
1644  .initial_size = 2,
1645  .max_size = 0,
1646  };
1647 
1648  switch (cmd) {
1649  case TEST_INIT:
1650  info->name = "threadpool_serializer";
1651  info->category = "/main/threadpool/";
1652  info->summary = "Test that serializers";
1653  info->description =
1654  "Ensures that tasks enqueued to a serialize execute in sequence.";
1655  return AST_TEST_NOT_RUN;
1656  case TEST_EXECUTE:
1657  break;
1658  }
1659 
1660  pool = ast_threadpool_create("threadpool_serializer", NULL, &options);
1661  if (!pool) {
1662  ast_test_status_update(test, "Could not create threadpool\n");
1663  goto end;
1664  }
1665  uut = ast_threadpool_serializer("ser1", pool);
1666  data1 = complex_task_data_alloc();
1667  data2 = complex_task_data_alloc();
1668  data3 = complex_task_data_alloc();
1669  if (!uut || !data1 || !data2 || !data3) {
1670  ast_test_status_update(test, "Allocation failed\n");
1671  goto end;
1672  }
1673 
1674  /* This should start right away */
1675  if (ast_taskprocessor_push(uut, complex_task, data1)) {
1676  ast_test_status_update(test, "Failed to enqueue data1\n");
1677  goto end;
1678  }
1679  started = wait_for_complex_start(data1);
1680  if (!started) {
1681  ast_test_status_update(test, "Failed to start data1\n");
1682  goto end;
1683  }
1684 
1685  /* This should not start until data 1 is complete */
1686  if (ast_taskprocessor_push(uut, complex_task, data2)) {
1687  ast_test_status_update(test, "Failed to enqueue data2\n");
1688  goto end;
1689  }
1690  started = has_complex_started(data2);
1691  if (started) {
1692  ast_test_status_update(test, "data2 started out of order\n");
1693  goto end;
1694  }
1695 
1696  /* But the free thread in the pool can still run */
1697  if (ast_threadpool_push(pool, complex_task, data3)) {
1698  ast_test_status_update(test, "Failed to enqueue data3\n");
1699  }
1700  started = wait_for_complex_start(data3);
1701  if (!started) {
1702  ast_test_status_update(test, "Failed to start data3\n");
1703  goto end;
1704  }
1705 
1706  /* Finishing data1 should allow data2 to start */
1707  poke_worker(data1);
1708  finished = wait_for_complex_completion(data1) == AST_TEST_PASS;
1709  if (!finished) {
1710  ast_test_status_update(test, "data1 couldn't finish\n");
1711  goto end;
1712  }
1713  started = wait_for_complex_start(data2);
1714  if (!started) {
1715  ast_test_status_update(test, "Failed to start data2\n");
1716  goto end;
1717  }
1718 
1719  /* Finish up */
1720  poke_worker(data2);
1721  finished = wait_for_complex_completion(data2) == AST_TEST_PASS;
1722  if (!finished) {
1723  ast_test_status_update(test, "data2 couldn't finish\n");
1724  goto end;
1725  }
1726  poke_worker(data3);
1727  finished = wait_for_complex_completion(data3) == AST_TEST_PASS;
1728  if (!finished) {
1729  ast_test_status_update(test, "data3 couldn't finish\n");
1730  goto end;
1731  }
1732 
1733  res = AST_TEST_PASS;
1734 
1735 end:
1736  poke_worker(data1);
1737  poke_worker(data2);
1738  poke_worker(data3);
1741  complex_task_data_free(data1);
1742  complex_task_data_free(data2);
1743  complex_task_data_free(data3);
1744  return res;
1745 }
1746 
1747 AST_TEST_DEFINE(threadpool_serializer_dupe)
1748 {
1750  struct ast_threadpool *pool = NULL;
1751  struct ast_taskprocessor *uut = NULL;
1752  struct ast_taskprocessor *there_can_be_only_one = NULL;
1753  struct ast_threadpool_options options = {
1755  .idle_timeout = 0,
1756  .auto_increment = 0,
1757  .initial_size = 2,
1758  .max_size = 0,
1759  };
1760 
1761  switch (cmd) {
1762  case TEST_INIT:
1763  info->name = "threadpool_serializer_dupe";
1764  info->category = "/main/threadpool/";
1765  info->summary = "Test that serializers are uniquely named";
1766  info->description =
1767  "Creating two serializers with the same name should\n"
1768  "result in error.";
1769  return AST_TEST_NOT_RUN;
1770  case TEST_EXECUTE:
1771  break;
1772  }
1773 
1774  pool = ast_threadpool_create("threadpool_serializer", NULL, &options);
1775  if (!pool) {
1776  ast_test_status_update(test, "Could not create threadpool\n");
1777  goto end;
1778  }
1779 
1780  uut = ast_threadpool_serializer("highlander", pool);
1781  if (!uut) {
1782  ast_test_status_update(test, "Allocation failed\n");
1783  goto end;
1784  }
1785 
1786  there_can_be_only_one = ast_threadpool_serializer("highlander", pool);
1787  if (there_can_be_only_one) {
1788  ast_taskprocessor_unreference(there_can_be_only_one);
1789  ast_test_status_update(test, "Duplicate name error\n");
1790  goto end;
1791  }
1792 
1793  res = AST_TEST_PASS;
1794 
1795 end:
1798  return res;
1799 }
1800 
1801 static int unload_module(void)
1802 {
1803  ast_test_unregister(threadpool_push);
1804  ast_test_unregister(threadpool_initial_threads);
1805  ast_test_unregister(threadpool_thread_creation);
1806  ast_test_unregister(threadpool_thread_destruction);
1807  ast_test_unregister(threadpool_thread_timeout);
1808  ast_test_unregister(threadpool_thread_timeout_thrash);
1809  ast_test_unregister(threadpool_one_task_one_thread);
1810  ast_test_unregister(threadpool_one_thread_one_task);
1811  ast_test_unregister(threadpool_one_thread_multiple_tasks);
1812  ast_test_unregister(threadpool_auto_increment);
1813  ast_test_unregister(threadpool_max_size);
1814  ast_test_unregister(threadpool_reactivation);
1815  ast_test_unregister(threadpool_task_distribution);
1816  ast_test_unregister(threadpool_more_destruction);
1817  ast_test_unregister(threadpool_serializer);
1818  ast_test_unregister(threadpool_serializer_dupe);
1819  return 0;
1820 }
1821 
1822 static int load_module(void)
1823 {
1824  ast_test_register(threadpool_push);
1825  ast_test_register(threadpool_initial_threads);
1826  ast_test_register(threadpool_thread_creation);
1827  ast_test_register(threadpool_thread_destruction);
1828  ast_test_register(threadpool_thread_timeout);
1829  ast_test_register(threadpool_thread_timeout_thrash);
1830  ast_test_register(threadpool_one_task_one_thread);
1831  ast_test_register(threadpool_one_thread_one_task);
1832  ast_test_register(threadpool_one_thread_multiple_tasks);
1833  ast_test_register(threadpool_auto_increment);
1834  ast_test_register(threadpool_max_size);
1835  ast_test_register(threadpool_reactivation);
1836  ast_test_register(threadpool_task_distribution);
1837  ast_test_register(threadpool_more_destruction);
1838  ast_test_register(threadpool_serializer);
1839  ast_test_register(threadpool_serializer_dupe);
1840  return AST_MODULE_LOAD_SUCCESS;
1841 }
1842 
1843 AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "threadpool test module");
void ast_threadpool_set_size(struct ast_threadpool *threadpool, unsigned int size)
Set the number of threads for the thread pool.
Definition: threadpool.c:874
#define AST_MODULE_INFO_STANDARD(keystr, desc)
Definition: module.h:567
static void complex_task_data_free(struct complex_task_data *ctd)
Asterisk locking-related definitions:
Asterisk main include file. File version handling, generic pbx functions.
void(* state_changed)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener, int active_threads, int idle_threads)
Indicates that the state of threads in the pool has changed.
Definition: threadpool.h:36
int idle_timeout
Time limit in seconds for idle threads.
Definition: threadpool.h:79
static void wait_for_task_pushed(struct ast_threadpool_listener *listener)
static int unload_module(void)
static enum ast_test_result_state listener_check(struct ast_test *test, struct ast_threadpool_listener *listener, int task_pushed, int was_empty, int num_tasks, int num_active, int num_idle, int empty_notice)
struct ast_taskprocessor * ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
Serialized execution of tasks within a ast_threadpool.
Definition: threadpool.c:1432
#define AST_THREADPOOL_OPTIONS_VERSION
Definition: threadpool.h:71
static void simple_task_data_free(struct simple_task_data *std)
static int simple_task(void *data)
Test Framework API.
static struct test_listener_data * test_alloc(void)
#define ast_cond_wait(cond, mutex)
Definition: lock.h:203
#define ast_cond_init(cond, attr)
Definition: lock.h:199
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:150
#define ast_mutex_lock(a)
Definition: lock.h:187
#define NULL
Definition: resample.c:96
char * end
Definition: eagi_proxy.c:73
static int has_complex_started(struct complex_task_data *ctd)
#define ast_cond_signal(cond)
Definition: lock.h:201
pthread_cond_t ast_cond_t
Definition: lock.h:176
static int load_module(void)
#define ast_log
Definition: astobj2.c:42
static void * listener(void *unused)
Definition: asterisk.c:1476
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
Definition: lock.h:587
#define ast_test_status_update(a, b, c...)
Definition: test.h:129
static void test_shutdown(struct ast_threadpool_listener *listener)
static void poke_worker(struct complex_task_data *ctd)
void * ast_threadpool_listener_get_user_data(const struct ast_threadpool_listener *listener)
Get the threadpool listener&#39;s user data.
Definition: threadpool.c:905
static enum ast_test_result_state wait_for_completion(struct ast_test *test, struct simple_task_data *std)
static void test_state_changed(struct ast_threadpool *pool, struct ast_threadpool_listener *listener, int active_threads, int idle_threads)
static int wait_for_complex_start(struct complex_task_data *ctd)
static struct simple_task_data * simple_task_data_alloc(void)
AST_TEST_DEFINE(threadpool_push)
def info(msg)
static const struct ast_threadpool_listener_callbacks test_callbacks
#define ast_cond_destroy(cond)
Definition: lock.h:200
#define LOG_NOTICE
Definition: logger.h:263
struct ast_threadpool_listener * ast_threadpool_listener_alloc(const struct ast_threadpool_listener_callbacks *callbacks, void *user_data)
Allocate a threadpool listener.
Definition: threadpool.c:893
static enum ast_test_result_state wait_until_thread_state_task_pushed(struct ast_test *test, struct test_listener_data *tld, int num_active, int num_idle, int num_tasks)
#define ast_free(a)
Definition: astmm.h:182
#define ast_calloc(num, len)
A wrapper for calloc()
Definition: astmm.h:204
static void test_emptied(struct ast_threadpool *pool, struct ast_threadpool_listener *listener)
struct ast_threadpool * ast_threadpool_create(const char *name, struct ast_threadpool_listener *listener, const struct ast_threadpool_options *options)
Create a new threadpool.
Definition: threadpool.c:915
int ast_threadpool_push(struct ast_threadpool *pool, int(*task)(void *data), void *data) attribute_warn_unused_result
Push a task to the threadpool.
Definition: threadpool.c:956
An API for managing task processing threads that can be shared across modules.
void ast_threadpool_shutdown(struct ast_threadpool *pool)
Shut down a threadpool and destroy it.
Definition: threadpool.c:965
Support for logging to various files, console and syslog Configuration in file logger.conf.
listener for a threadpool
Definition: threadpool.c:110
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
An opaque threadpool structure.
Definition: threadpool.c:36
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
static enum ast_test_result_state wait_until_thread_state(struct ast_test *test, struct test_listener_data *tld, int num_active, int num_idle)
static enum ast_test_result_state wait_for_empty_notice(struct ast_test *test, struct test_listener_data *tld)
static enum ast_test_result_state wait_for_complex_completion(struct complex_task_data *ctd)
#define ast_mutex_init(pmutex)
Definition: lock.h:184
static void test_task_pushed(struct ast_threadpool *pool, struct ast_threadpool_listener *listener, int was_empty)
#define ast_mutex_destroy(a)
Definition: lock.h:186
static int complex_task(void *data)
#define ASTERISK_GPL_KEY
The text the key() function should return.
Definition: module.h:46
Asterisk module definitions.
static struct complex_task_data * complex_task_data_alloc(void)
#define ast_cond_timedwait(cond, mutex, time)
Definition: lock.h:204
Structure for mutex and tracking information.
Definition: lock.h:135
ast_test_result_state
Definition: test.h:200
#define ast_mutex_unlock(a)
Definition: lock.h:188