Asterisk - The Open Source Telephony Project GIT-master-20e40a9
Loading...
Searching...
No Matches
test_taskprocessor.c
Go to the documentation of this file.
1/*
2 * Asterisk -- An open source telephony toolkit.
3 *
4 * Copyright (C) 2012-2013, Digium, Inc.
5 *
6 * Mark Michelson <mmichelson@digium.com>
7 *
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
13 *
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
17 */
18
19/*!
20 * \file
21 * \brief taskprocessor unit tests
22 *
23 * \author Mark Michelson <mmichelson@digium.com>
24 *
25 */
26
27/*** MODULEINFO
28 <depend>TEST_FRAMEWORK</depend>
29 <support_level>core</support_level>
30 ***/
31
32#include "asterisk.h"
33
34#include <unistd.h>
35
36#include "asterisk/test.h"
38#include "asterisk/module.h"
39#include "asterisk/astobj2.h"
40#include "asterisk/serializer.h"
41#include "asterisk/threadpool.h"
42#include "asterisk/cli.h"
43
44/*!
45 * \brief userdata associated with baseline taskprocessor test
46 */
47struct task_data {
48 /* Condition used to signal to queuing thread that task was executed */
50 /* Lock protecting the condition */
52 /*! Boolean indicating that the task was run */
54 /*! Milliseconds to wait before returning */
55 unsigned long wait_time;
56};
57
58static void task_data_dtor(void *obj)
59{
60 struct task_data *task_data = obj;
61
64}
65
66/*! \brief Create a task_data object */
67static struct task_data *task_data_create(void)
68{
69 struct task_data *task_data =
71
72 if (!task_data) {
73 return NULL;
74 }
75
80
81 return task_data;
82}
83
84/*!
85 * \brief Queued task for baseline test.
86 *
87 * The task simply sets a boolean to indicate the
88 * task has been run and then signals a condition
89 * saying it's complete
90 */
91static int task(void *data)
92{
93 struct task_data *task_data = data;
94
96 if (task_data->wait_time > 0) {
97 usleep(task_data->wait_time * 1000);
98 }
101 return 0;
102}
103
104/*!
105 * \brief Wait for a task to execute.
106 */
107static int task_wait(struct task_data *task_data)
108{
109 struct timeval start = ast_tvnow();
110 struct timespec end;
112
113 end.tv_sec = start.tv_sec + 30;
114 end.tv_nsec = start.tv_usec * 1000;
115
116 while (!task_data->task_complete) {
117 int res;
119 &end);
120 if (res == ETIMEDOUT) {
121 return -1;
122 }
123 }
124
125 return 0;
126}
127
128/*!
129 * \brief Baseline test for default taskprocessor
130 *
131 * This test ensures that when a task is added to a taskprocessor that
132 * has been allocated with a default listener that the task gets executed
133 * as expected
134 */
135AST_TEST_DEFINE(default_taskprocessor)
136{
139 int res;
140
141 switch (cmd) {
142 case TEST_INIT:
143 info->name = "default_taskprocessor";
144 info->category = "/main/taskprocessor/";
145 info->summary = "Test of default taskprocessor";
146 info->description =
147 "Ensures that a queued task gets executed.";
148 return AST_TEST_NOT_RUN;
149 case TEST_EXECUTE:
150 break;
151 }
152
154
155 if (!tps) {
156 ast_test_status_update(test, "Unable to create test taskprocessor\n");
157 return AST_TEST_FAIL;
158 }
159
161 if (!task_data) {
162 ast_test_status_update(test, "Unable to create task_data\n");
163 return AST_TEST_FAIL;
164 }
165
167 ast_test_status_update(test, "Failed to queue task\n");
168 return AST_TEST_FAIL;
169 }
170
171 res = task_wait(task_data);
172 if (res != 0) {
173 ast_test_status_update(test, "Queued task did not execute!\n");
174 return AST_TEST_FAIL;
175 }
176
177 return AST_TEST_PASS;
178}
179
180/*!
181 * \brief Baseline test for subsystem alert
182 */
184{
186#define TEST_DATA_ARRAY_SIZE 10
187#define LOW_WATER_MARK 3
188#define HIGH_WATER_MARK 6
189 struct task_data *task_data[(TEST_DATA_ARRAY_SIZE + 1)] = { 0 };
190 int res = 0;
191 int i;
192 long queue_count;
193 unsigned int alert_level;
194 unsigned int subsystem_alert_level;
195
196 switch (cmd) {
197 case TEST_INIT:
198 info->name = "subsystem_alert";
199 info->category = "/main/taskprocessor/";
200 info->summary = "Test of subsystem alerts";
201 info->description =
202 "Ensures alerts are generated properly.";
203 return AST_TEST_NOT_RUN;
204 case TEST_EXECUTE:
205 break;
206 }
207
208 tps = ast_taskprocessor_get("test_subsystem/test", TPS_REF_DEFAULT);
209
210 if (!tps) {
211 ast_test_status_update(test, "Unable to create test taskprocessor\n");
212 return AST_TEST_FAIL;
213 }
214
217
218 for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
220 if (!task_data[i]) {
221 ast_test_status_update(test, "Unable to create task_data\n");
222 res = -1;
223 goto data_cleanup;
224 }
225 task_data[i]->wait_time = 500;
226
227 ast_test_status_update(test, "Pushing task %d\n", i);
228 if (ast_taskprocessor_push(tps, task, task_data[i])) {
229 ast_test_status_update(test, "Failed to queue task\n");
230 res = -1;
231 goto data_cleanup;
232 }
233
234 queue_count = ast_taskprocessor_size(tps);
235 alert_level = ast_taskprocessor_alert_get();
236 subsystem_alert_level = ast_taskprocessor_get_subsystem_alert("test_subsystem");
237
238 if (queue_count == HIGH_WATER_MARK) {
239 if (subsystem_alert_level) {
240 ast_test_status_update(test, "Subsystem alert triggered correctly at %ld\n", queue_count);
241 }
242 if (alert_level) {
243 ast_test_status_update(test, "Global alert triggered correctly at %ld\n", queue_count);
244 }
245 } else if (queue_count < HIGH_WATER_MARK) {
246 if (subsystem_alert_level > 0) {
247 ast_test_status_update(test, "Subsystem alert triggered unexpectedly at %ld\n", queue_count);
248 res = -1;
249 }
250 if (alert_level > 0) {
251 ast_test_status_update(test, "Global alert triggered unexpectedly at %ld\n", queue_count);
252 res = -1;
253 }
254 } else {
255 if (subsystem_alert_level == 0) {
256 ast_test_status_update(test, "Subsystem alert failed to trigger at %ld\n", queue_count);
257 res = -1;
258 }
259 if (alert_level == 0) {
260 ast_test_status_update(test, "Global alert failed to trigger at %ld\n", queue_count);
261 res = -1;
262 }
263 }
264 }
265
267
268 for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
269 ast_test_status_update(test, "Waiting on task %d\n", i);
270 if (task_wait(task_data[i])) {
271 ast_test_status_update(test, "Queued task '%d' did not execute!\n", i);
272 res = -1;
273 goto data_cleanup;
274 }
275
276 queue_count = ast_taskprocessor_size(tps);
277 alert_level = ast_taskprocessor_alert_get();
278 subsystem_alert_level = ast_taskprocessor_get_subsystem_alert("test_subsystem");
279
280 if (queue_count == LOW_WATER_MARK) {
281 if (!subsystem_alert_level) {
282 ast_test_status_update(test, "Subsystem alert cleared correctly at %ld\n", queue_count);
283 }
284 if (!alert_level) {
285 ast_test_status_update(test, "Global alert cleared correctly at %ld\n", queue_count);
286 }
287 } else if (queue_count > LOW_WATER_MARK) {
288 if (subsystem_alert_level == 0) {
289 ast_test_status_update(test, "Subsystem alert cleared unexpectedly at %ld\n", queue_count);
290 res = -1;
291 }
292 if (alert_level == 0) {
293 ast_test_status_update(test, "Global alert cleared unexpectedly at %ld\n", queue_count);
294 res = -1;
295 }
296 } else {
297 if (subsystem_alert_level > 0) {
298 ast_test_status_update(test, "Subsystem alert failed to clear at %ld\n", queue_count);
299 res = -1;
300 }
301 if (alert_level > 0) {
302 ast_test_status_update(test, "Global alert failed to clear at %ld\n", queue_count);
303 res = -1;
304 }
305 }
306
307 }
308
310 for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
312 }
313
314 return res ? AST_TEST_FAIL : AST_TEST_PASS;
315}
316
317#define NUM_TASKS 20000
318
319/*!
320 * \brief Relevant data associated with taskprocessor load test
321 */
322static struct load_task_data {
323 /*! Condition used to indicate a task has completed executing */
325 /*! Lock used to protect the condition */
327 /*! Counter of the number of completed tasks */
329 /*! Storage for task-specific data */
332
333/*!
334 * \brief a queued task to be used in the taskprocessor load test
335 *
336 * The task increments the number of tasks executed and puts the passed-in
337 * data into the next slot in the array of random data.
338 */
339static int load_task(void *data)
340{
341 int *randdata = data;
345 return 0;
346}
347
348/*!
349 * \brief Load test for taskprocessor with default listener
350 *
351 * This test queues a large number of tasks, each with random data associated.
352 * The test ensures that all of the tasks are run and that the tasks are executed
353 * in the same order that they were queued
354 */
355AST_TEST_DEFINE(default_taskprocessor_load)
356{
357 struct ast_taskprocessor *tps;
358 struct timeval start;
359 struct timespec ts;
361 int timedwait_res;
362 int i;
363 int rand_data[NUM_TASKS];
364
365 switch (cmd) {
366 case TEST_INIT:
367 info->name = "default_taskprocessor_load";
368 info->category = "/main/taskprocessor/";
369 info->summary = "Load test of default taskprocessor";
370 info->description =
371 "Ensure that a large number of queued tasks are executed in the proper order.";
372 return AST_TEST_NOT_RUN;
373 case TEST_EXECUTE:
374 break;
375 }
376
378
379 if (!tps) {
380 ast_test_status_update(test, "Unable to create test taskprocessor\n");
381 return AST_TEST_FAIL;
382 }
383
384 start = ast_tvnow();
385
386 ts.tv_sec = start.tv_sec + 60;
387 ts.tv_nsec = start.tv_usec * 1000;
388
392
393 for (i = 0; i < NUM_TASKS; ++i) {
394 rand_data[i] = ast_random();
395 if (ast_taskprocessor_push(tps, load_task, &rand_data[i])) {
396 ast_test_status_update(test, "Failed to queue task\n");
397 res = AST_TEST_FAIL;
398 goto test_end;
399 }
400 }
401
405 if (timedwait_res == ETIMEDOUT) {
406 break;
407 }
408 }
410
412 ast_test_status_update(test, "Unexpected number of tasks executed. Expected %d but got %d\n",
414 res = AST_TEST_FAIL;
415 goto test_end;
416 }
417
418 for (i = 0; i < NUM_TASKS; ++i) {
419 if (rand_data[i] != load_task_results.task_rand[i]) {
420 ast_test_status_update(test, "Queued tasks did not execute in order\n");
421 res = AST_TEST_FAIL;
422 goto test_end;
423 }
424 }
425
426test_end:
430 return res;
431}
432
433/*!
434 * \brief Private data for the test taskprocessor listener
435 */
437 /* Counter of number of tasks pushed to the queue */
439 /* Counter of number of times the queue was emptied */
441 /* Counter of number of times that a pushed task occurred on an empty queue */
443 /* Boolean indicating whether the shutdown callback was called */
445};
446
447/*!
448 * \brief test taskprocessor listener's alloc callback
449 */
450static void *test_listener_pvt_alloc(void)
451{
452 struct test_listener_pvt *pvt;
453
454 pvt = ast_calloc(1, sizeof(*pvt));
455 return pvt;
456}
457
458/*!
459 * \brief test taskprocessor listener's start callback
460 */
462{
463 return 0;
464}
465
466/*!
467 * \brief test taskprocessor listener's task_pushed callback
468 *
469 * Adjusts private data's stats as indicated by the parameters.
470 */
471static void test_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
472{
474 ++pvt->num_pushed;
475 if (was_empty) {
476 ++pvt->num_was_empty;
477 }
478}
479
480/*!
481 * \brief test taskprocessor listener's emptied callback.
482 */
488
489/*!
490 * \brief test taskprocessor listener's shutdown callback.
491 */
497
499 .start = test_start,
500 .task_pushed = test_task_pushed,
501 .emptied = test_emptied,
502 .shutdown = test_shutdown,
503};
504
505/*!
506 * \brief Queued task for taskprocessor listener test.
507 *
508 * Does nothing.
509 */
510static int listener_test_task(void *ignore)
511{
512 return 0;
513}
514
515/*!
516 * \brief helper to ensure that statistics the listener is keeping are what we expect
517 *
518 * \param test The currently-running test
519 * \param pvt The private data for the taskprocessor listener
520 * \param num_pushed The expected current number of tasks pushed to the processor
521 * \param num_emptied The expected current number of times the taskprocessor has become empty
522 * \param num_was_empty The expected current number of times that tasks were pushed to an empty taskprocessor
523 * \retval -1 Stats were not as expected
524 * \retval 0 Stats were as expected
525 */
526static int check_stats(struct ast_test *test, const struct test_listener_pvt *pvt, int num_pushed, int num_emptied, int num_was_empty)
527{
528 if (pvt->num_pushed != num_pushed) {
529 ast_test_status_update(test, "Unexpected number of tasks pushed. Expected %d but got %d\n",
530 num_pushed, pvt->num_pushed);
531 return -1;
532 }
533
534 if (pvt->num_emptied != num_emptied) {
535 ast_test_status_update(test, "Unexpected number of empties. Expected %d but got %d\n",
536 num_emptied, pvt->num_emptied);
537 return -1;
538 }
539
540 if (pvt->num_was_empty != num_was_empty) {
541 ast_test_status_update(test, "Unexpected number of empties. Expected %d but got %d\n",
542 num_was_empty, pvt->num_emptied);
543 return -1;
544 }
545
546 return 0;
547}
548
549/*!
550 * \brief Test for a taskprocessor with custom listener.
551 *
552 * This test pushes tasks to a taskprocessor with a custom listener, executes the tasks,
553 * and destroys the taskprocessor.
554 *
555 * The test ensures that the listener's callbacks are called when expected and that the data
556 * being passed in is accurate.
557 */
559{
560 struct ast_taskprocessor *tps = NULL;
562 struct test_listener_pvt *pvt = NULL;
564
565 switch (cmd) {
566 case TEST_INIT:
567 info->name = "taskprocessor_listener";
568 info->category = "/main/taskprocessor/";
569 info->summary = "Test of taskprocessor listeners";
570 info->description =
571 "Ensures that listener callbacks are called when expected.";
572 return AST_TEST_NOT_RUN;
573 case TEST_EXECUTE:
574 break;
575 }
576
578 if (!pvt) {
579 ast_test_status_update(test, "Unable to allocate test taskprocessor listener user data\n");
580 return AST_TEST_FAIL;
581 }
582
584 if (!listener) {
585 ast_test_status_update(test, "Unable to allocate test taskprocessor listener\n");
586 res = AST_TEST_FAIL;
587 goto test_exit;
588 }
589
590 tps = ast_taskprocessor_create_with_listener("test_listener", listener);
591 if (!tps) {
592 ast_test_status_update(test, "Unable to allocate test taskprocessor\n");
593 res = AST_TEST_FAIL;
594 goto test_exit;
595 }
596
598 ast_test_status_update(test, "Failed to queue task\n");
599 res = AST_TEST_FAIL;
600 goto test_exit;
601 }
602
603 if (check_stats(test, pvt, 1, 0, 1) < 0) {
604 res = AST_TEST_FAIL;
605 goto test_exit;
606 }
607
609 ast_test_status_update(test, "Failed to queue task\n");
610 res = AST_TEST_FAIL;
611 goto test_exit;
612 }
613
614 if (check_stats(test, pvt, 2, 0, 1) < 0) {
615 res = AST_TEST_FAIL;
616 goto test_exit;
617 }
618
620
621 if (check_stats(test, pvt, 2, 0, 1) < 0) {
622 res = AST_TEST_FAIL;
623 goto test_exit;
624 }
625
627
628 if (check_stats(test, pvt, 2, 1, 1) < 0) {
629 res = AST_TEST_FAIL;
630 goto test_exit;
631 }
632
634
635 if (!pvt->shutdown) {
636 res = AST_TEST_FAIL;
637 goto test_exit;
638 }
639
640test_exit:
642 /* This is safe even if tps is NULL */
644 ast_free(pvt);
645 return res;
646}
647
656
664
681
695
697{
698 struct timeval start = ast_tvnow();
699 struct timespec end = {
700 .tv_sec = start.tv_sec + 5,
701 .tv_nsec = start.tv_usec * 1000
702 };
704
705 while (!shutdown_data->task_complete) {
706 if (ast_cond_timedwait(&shutdown_data->out, &shutdown_data->lock, &end) == ETIMEDOUT) {
707 break;
708 }
709 }
710
712}
713
719
721{
722 struct timeval start = ast_tvnow();
723 struct timespec end = {
724 .tv_sec = start.tv_sec + 5,
725 .tv_nsec = start.tv_usec * 1000
726 };
728
729 while (!shutdown_data->task_started) {
730 if (ast_cond_timedwait(&shutdown_data->out, &shutdown_data->lock, &end) == ETIMEDOUT) {
731 break;
732 }
733 }
734
736}
737
744
745static void *tps_shutdown_thread(void *data)
746{
747 struct ast_taskprocessor *tps = data;
749 return NULL;
750}
751
752AST_TEST_DEFINE(taskprocessor_shutdown)
753{
755 RAII_VAR(struct shutdown_data *, task1, NULL, ao2_cleanup);
756 RAII_VAR(struct shutdown_data *, task2, NULL, ao2_cleanup);
757 int push_res;
758 int wait_res;
759 int pthread_res;
760 pthread_t shutdown_thread;
761
762 switch (cmd) {
763 case TEST_INIT:
764 info->name = "taskprocessor_shutdown";
765 info->category = "/main/taskprocessor/";
766 info->summary = "Test of taskprocessor shutdown sequence";
767 info->description =
768 "Ensures that all tasks run to completion after the taskprocessor has been unref'ed.";
769 return AST_TEST_NOT_RUN;
770 case TEST_EXECUTE:
771 break;
772 }
773
774 tps = ast_taskprocessor_get("test_shutdown", TPS_REF_DEFAULT);
775 task1 = shutdown_data_create(0); /* task1 waits to be poked */
776 task2 = shutdown_data_create(1); /* task2 waits for nothing */
777
778 if (!tps || !task1 || !task2) {
779 ast_test_status_update(test, "Allocation error\n");
780 return AST_TEST_FAIL;
781 }
782
783 push_res = ast_taskprocessor_push(tps, shutdown_task_exec, task1);
784 if (push_res != 0) {
785 ast_test_status_update(test, "Could not push task1\n");
786 return AST_TEST_FAIL;
787 }
788
789 push_res = ast_taskprocessor_push(tps, shutdown_task_exec, task2);
790 if (push_res != 0) {
791 ast_test_status_update(test, "Could not push task2\n");
792 return AST_TEST_FAIL;
793 }
794
795 wait_res = shutdown_waitfor_start(task1);
796 if (!wait_res) {
797 ast_test_status_update(test, "Task1 didn't start\n");
798 return AST_TEST_FAIL;
799 }
800
801 pthread_res = ast_pthread_create(&shutdown_thread, NULL, tps_shutdown_thread, tps);
802 if (pthread_res != 0) {
803 ast_test_status_update(test, "Failed to create shutdown thread\n");
804 return AST_TEST_FAIL;
805 }
806 tps = NULL;
807
808 /* Wakeup task1; it should complete */
809 shutdown_poke(task1);
810 wait_res = shutdown_waitfor_completion(task1);
811 if (!wait_res) {
812 ast_test_status_update(test, "Task1 didn't complete\n");
813 return AST_TEST_FAIL;
814 }
815
816 /* Wait for shutdown to complete */
817 pthread_join(shutdown_thread, NULL);
818
819 /* Should have also completed task2 */
820 wait_res = shutdown_has_completed(task2);
821 if (!wait_res) {
822 ast_test_status_update(test, "Task2 didn't finish\n");
823 return AST_TEST_FAIL;
824 }
825
826 return AST_TEST_PASS;
827}
828
830{
831 int *local_data = local->local_data;
832 struct task_data *task_data = local->data;
833
834 *local_data = 1;
836
837 return 0;
838}
839
840AST_TEST_DEFINE(taskprocessor_push_local)
841{
842 RAII_VAR(struct ast_taskprocessor *, tps, NULL,
845 int local_data;
846 int res;
847
848 switch (cmd) {
849 case TEST_INIT:
850 info->name = __func__;
851 info->category = "/main/taskprocessor/";
852 info->summary = "Test of pushing local data";
853 info->description =
854 "Ensures that local data is passed along.";
855 return AST_TEST_NOT_RUN;
856 case TEST_EXECUTE:
857 break;
858 }
859
860
862 if (!tps) {
863 ast_test_status_update(test, "Unable to create test taskprocessor\n");
864 return AST_TEST_FAIL;
865 }
866
867
869 if (!task_data) {
870 ast_test_status_update(test, "Unable to create task_data\n");
871 return AST_TEST_FAIL;
872 }
873
874 local_data = 0;
875 ast_taskprocessor_set_local(tps, &local_data);
876
878 ast_test_status_update(test, "Failed to queue task\n");
879 return AST_TEST_FAIL;
880 }
881
882 res = task_wait(task_data);
883 if (res != 0) {
884 ast_test_status_update(test, "Queued task did not execute!\n");
885 return AST_TEST_FAIL;
886 }
887
888 if (local_data != 1) {
890 "Queued task did not set local_data!\n");
891 return AST_TEST_FAIL;
892 }
893
894 return AST_TEST_PASS;
895}
896
897/*!
898 * \brief Baseline test for a serializer pool
899 *
900 * This test ensures that when a task is added to a taskprocessor that
901 * has been allocated with a default listener that the task gets executed
902 * as expected
903 */
904AST_TEST_DEFINE(serializer_pool)
905{
906 RAII_VAR(struct ast_threadpool *, threadpool, NULL, ast_threadpool_shutdown);
911 .idle_timeout = 0,
912 .auto_increment = 0,
913 .initial_size = 1,
914 .max_size = 0,
915 };
916 /* struct ast_taskprocessor *tps; */
917
918 switch (cmd) {
919 case TEST_INIT:
920 info->name = "serializer_pool";
921 info->category = "/main/taskprocessor/";
922 info->summary = "Test using a serializer pool";
923 info->description =
924 "Ensures that a queued task gets executed.";
925 return AST_TEST_NOT_RUN;
926 case TEST_EXECUTE:
927 break;
928 }
929
930 ast_test_validate(test, threadpool = ast_threadpool_create("test", NULL, &options));
931 ast_test_validate(test, serializer_pool = ast_serializer_pool_create(
932 "test/test", 5, threadpool, 2)); /* 2 second shutdown group time out */
933 ast_test_validate(test, !strcmp(ast_serializer_pool_name(serializer_pool), "test/test"));
934 ast_test_validate(test, !ast_serializer_pool_set_alerts(serializer_pool, 5, 0));
935 ast_test_validate(test, task_data = task_data_create());
936
937 task_data->wait_time = 4000; /* task takes 4 seconds */
938 ast_test_validate(test, !ast_taskprocessor_push(
939 ast_serializer_pool_get(serializer_pool), task, task_data));
940
941 if (!ast_serializer_pool_destroy(serializer_pool)) {
942 ast_test_status_update(test, "Unexpected pool destruction!\n");
943 /*
944 * The pool should have timed out, so if it destruction reports success
945 * we need to fail.
946 */
947 serializer_pool = NULL;
948 return AST_TEST_FAIL;
949 }
950
951 ast_test_validate(test, !task_wait(task_data));
952
953 /* The first attempt should have failed. Second try should destroy successfully */
954 if (ast_serializer_pool_destroy(serializer_pool)) {
955 ast_test_status_update(test, "Unable to destroy serializer pool in allotted time!\n");
956 /*
957 * If this fails we'll try again on return to hopefully avoid a memory leak.
958 * If it again times out a third time, well not much we can do.
959 */
960 return AST_TEST_FAIL;
961 }
962
963 /* Test passed, so set pool to NULL to avoid "re-running" destroy */
964 serializer_pool = NULL;
965
966 return AST_TEST_PASS;
967}
968
969/*!
970 * \brief Test for CLI command "core show taskprocessor <name>"
971 *
972 * This test creates a taskprocessor, queues tasks with controlled execution,
973 * and verifies that the CLI command displays the queued tasks correctly.
974 */
975AST_TEST_DEFINE(taskprocessor_cli_show)
976{
978 struct task_data *task_data1 = NULL;
979 struct task_data *task_data2 = NULL;
980 struct task_data *task_data3 = NULL;
981 int task_queued1 = 0, task_queued2 = 0, task_queued3 = 0;
982 char cli_command[128];
983 int cli_output_fd[2];
984 char output_buffer[4096] = {0};
985 ssize_t bytes_read;
986 int res = AST_TEST_FAIL;
987
988 switch (cmd) {
989 case TEST_INIT:
990 info->name = "taskprocessor_cli_show";
991 info->category = "/main/taskprocessor/";
992 info->summary = "Test CLI command 'core show taskprocessor'";
993 info->description =
994 "Verifies that the 'core show taskprocessor <name>' CLI command\n"
995 "displays taskprocessor information and queued tasks correctly.";
996 return AST_TEST_NOT_RUN;
997 case TEST_EXECUTE:
998 break;
999 }
1000
1001 /* Create a pipe to capture CLI output */
1002 if (pipe(cli_output_fd) != 0) {
1003 ast_test_status_update(test, "Failed to create pipe for CLI output\n");
1004 return AST_TEST_FAIL;
1005 }
1006
1007 /* Create taskprocessor */
1008 tps = ast_taskprocessor_get("test_cli_taskprocessor", TPS_REF_DEFAULT);
1009 if (!tps) {
1010 ast_test_status_update(test, "Unable to create test taskprocessor\n");
1011 close(cli_output_fd[0]);
1012 close(cli_output_fd[1]);
1013 return AST_TEST_FAIL;
1014 }
1015
1016 /* Create tasks that will wait so they stay in the queue */
1017 task_data1 = task_data_create();
1018 task_data2 = task_data_create();
1019 task_data3 = task_data_create();
1020
1021 if (!task_data1 || !task_data2 || !task_data3) {
1022 ast_test_status_update(test, "Unable to create task_data\n");
1023 goto cleanup;
1024 }
1025
1026 /* Set a long wait time so tasks stay queued */
1027 task_data1->wait_time = 2000; /* 2 seconds */
1028 task_data2->wait_time = 2000;
1029 task_data3->wait_time = 2000;
1030
1031 /* Queue the tasks */
1032 if (ast_taskprocessor_push(tps, task, task_data1)) {
1033 ast_test_status_update(test, "Failed to queue task 1\n");
1034 goto cleanup;
1035 }
1036 task_queued1 = 1;
1037
1038 if (ast_taskprocessor_push(tps, task, task_data2)) {
1039 ast_test_status_update(test, "Failed to queue task 2\n");
1040 goto cleanup;
1041 }
1042 task_queued2 = 1;
1043
1044 if (ast_taskprocessor_push(tps, task, task_data3)) {
1045 ast_test_status_update(test, "Failed to queue task 3\n");
1046 goto cleanup;
1047 }
1048 task_queued3 = 1;
1049
1050 /* Execute the CLI command */
1051 snprintf(cli_command, sizeof(cli_command), "core show taskprocessor name test_cli_taskprocessor");
1052
1053 if (ast_cli_command(cli_output_fd[1], cli_command) != 0) {
1054 ast_test_status_update(test, "CLI command execution failed\n");
1055 goto cleanup;
1056 }
1057
1058 /* Close write end and read the output */
1059 close(cli_output_fd[1]);
1060 cli_output_fd[1] = -1;
1061
1062 bytes_read = read(cli_output_fd[0], output_buffer, sizeof(output_buffer) - 1);
1063 if (bytes_read <= 0) {
1064 ast_test_status_update(test, "Failed to read CLI output\n");
1065 goto cleanup;
1066 }
1067 output_buffer[bytes_read] = '\0';
1068
1069 /* Log the output for inspection */
1070 ast_test_status_update(test, "CLI Output:\n%s\n", output_buffer);
1071
1072 /* Verify the output contains expected information */
1073 if (!strstr(output_buffer, "test_cli_taskprocessor")) {
1074 ast_test_status_update(test, "Output missing taskprocessor name\n");
1075 goto cleanup;
1076 }
1077
1078 if (!strstr(output_buffer, "Current queue size")) {
1079 ast_test_status_update(test, "Output missing queue size information\n");
1080 goto cleanup;
1081 }
1082
1083 /* Check for queued tasks section (at least one task should be shown) */
1084 if (!strstr(output_buffer, "Queued Tasks") && !strstr(output_buffer, "Currently executing")) {
1085 ast_test_status_update(test, "Output missing queued tasks or execution status\n");
1086 goto cleanup;
1087 }
1088
1089 /* Verify we see task information */
1090 if (!strstr(output_buffer, "Task #")) {
1091 ast_test_status_update(test, "Output missing task information\n");
1092 goto cleanup;
1093 }
1094
1095 ast_test_status_update(test, "CLI command output validated successfully\n");
1096 res = AST_TEST_PASS;
1097
1098cleanup:
1099
1100 ast_test_status_update(test, "Waiting for tasks to complete\n");
1101
1102 /* Wait for tasks to complete */
1103 if (task_data1) {
1104 if (task_queued1) {
1105 task_wait(task_data1);
1106 }
1107 ao2_cleanup(task_data1);
1108 }
1109 if (task_data2) {
1110 if (task_queued2) {
1111 task_wait(task_data2);
1112 }
1113 ao2_cleanup(task_data2);
1114 }
1115 if (task_data3) {
1116 if (task_queued3) {
1117 task_wait(task_data3);
1118 }
1119 ao2_cleanup(task_data3);
1120 }
1121
1122 if (cli_output_fd[0] >= 0) {
1123 close(cli_output_fd[0]);
1124 }
1125 if (cli_output_fd[1] >= 0) {
1126 close(cli_output_fd[1]);
1127 }
1128
1129 ast_test_status_update(test, "Tasks complete\n");
1130 return res;
1131}
1132
1133static int unload_module(void)
1134{
1135 ast_test_unregister(default_taskprocessor);
1136 ast_test_unregister(default_taskprocessor_load);
1137 ast_test_unregister(subsystem_alert);
1138 ast_test_unregister(taskprocessor_listener);
1139 ast_test_unregister(taskprocessor_shutdown);
1140 ast_test_unregister(taskprocessor_push_local);
1141 ast_test_unregister(serializer_pool);
1142 ast_test_unregister(taskprocessor_cli_show);
1143 return 0;
1144}
1145
1146static int load_module(void)
1147{
1148 ast_test_register(default_taskprocessor);
1149 ast_test_register(default_taskprocessor_load);
1150 ast_test_register(subsystem_alert);
1151 ast_test_register(taskprocessor_listener);
1152 ast_test_register(taskprocessor_shutdown);
1153 ast_test_register(taskprocessor_push_local);
1154 ast_test_register(serializer_pool);
1155 ast_test_register(taskprocessor_cli_show);
1157}
1158
1159AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "taskprocessor test module");
ast_mutex_t lock
Definition app_sla.c:337
static void * listener(void *unused)
Definition asterisk.c:1530
Asterisk main include file. File version handling, generic pbx functions.
#define ast_free(a)
Definition astmm.h:180
#define ast_calloc(num, len)
A wrapper for calloc()
Definition astmm.h:202
#define ao2_cleanup(obj)
Definition astobj2.h:1934
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition astobj2.h:459
#define ao2_alloc(data_size, destructor_fn)
Definition astobj2.h:409
Standard Command Line Interface.
#define ast_cli_command(fd, s)
Definition cli.h:232
char * end
Definition eagi_proxy.c:73
#define ast_cond_destroy(cond)
Definition lock.h:209
#define ast_cond_wait(cond, mutex)
Definition lock.h:212
#define ast_cond_init(cond, attr)
Definition lock.h:208
#define ast_cond_timedwait(cond, mutex, time)
Definition lock.h:213
#define ast_mutex_init(pmutex)
Definition lock.h:193
#define ast_mutex_unlock(a)
Definition lock.h:197
pthread_cond_t ast_cond_t
Definition lock.h:185
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
Definition lock.h:596
#define ast_mutex_destroy(a)
Definition lock.h:195
#define ast_mutex_lock(a)
Definition lock.h:196
#define ast_cond_signal(cond)
Definition lock.h:210
Asterisk module definitions.
#define AST_MODULE_INFO_STANDARD(keystr, desc)
Definition module.h:581
#define ASTERISK_GPL_KEY
The text the key() function should return.
Definition module.h:46
@ AST_MODULE_LOAD_SUCCESS
Definition module.h:70
static void cleanup(void)
Clean up any old apps that we don't need any more.
Definition res_stasis.c:327
#define NULL
Definition resample.c:96
struct ast_taskprocessor * ast_serializer_pool_get(struct ast_serializer_pool *pool)
Retrieve a serializer from the pool.
Definition serializer.c:175
int ast_serializer_pool_set_alerts(struct ast_serializer_pool *pool, long high, long low)
Set taskprocessor alert levels for the serializers in the pool.
Definition serializer.c:204
const char * ast_serializer_pool_name(const struct ast_serializer_pool *pool)
Retrieve the base name of the serializer pool.
Definition serializer.c:170
struct ast_serializer_pool * ast_serializer_pool_create(const char *name, unsigned int size, struct ast_threadpool *threadpool, int timeout)
Create a serializer pool.
Definition serializer.c:78
int ast_serializer_pool_destroy(struct ast_serializer_pool *pool)
Destroy the serializer pool.
Definition serializer.c:41
Structure for mutex and tracking information.
Definition lock.h:142
int(* start)(struct ast_taskprocessor_listener *listener)
The taskprocessor has started completely.
A listener for taskprocessors.
Local data parameter.
A ast_taskprocessor structure is a singleton by name.
An opaque threadpool structure.
Definition threadpool.c:36
Relevant data associated with taskprocessor load test.
int task_rand[NUM_TASKS]
userdata associated with baseline taskprocessor test
ast_cond_t cond
unsigned long wait_time
ast_mutex_t lock
Private data for the test taskprocessor listener.
An API for managing task processing threads that can be shared across modules.
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
Sets the local data associated with a taskprocessor.
@ TPS_REF_DEFAULT
return a reference to a taskprocessor, create one if it does not exist
unsigned int ast_taskprocessor_alert_get(void)
Get the current taskprocessor high water alert count.
void * ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
Get the user data from the listener.
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
int ast_taskprocessor_unsuspend(struct ast_taskprocessor *tps)
Indicate the taskprocessor is unsuspended.
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
Pop a task off the taskprocessor and execute it.
#define ast_taskprocessor_push_local(tps, task_exe, datap)
unsigned int ast_taskprocessor_get_subsystem_alert(const char *subsystem)
Get the current taskprocessor high water alert count by subsystem.
#define ast_taskprocessor_push(tps, task_exe, datap)
struct ast_taskprocessor * ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
Create a taskprocessor with a custom listener.
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.
int ast_taskprocessor_suspend(struct ast_taskprocessor *tps)
Indicate the taskprocessor is suspended.
Test Framework API.
@ TEST_INIT
Definition test.h:200
@ TEST_EXECUTE
Definition test.h:201
#define ast_test_status_update(a, b, c...)
Definition test.h:129
#define AST_TEST_DEFINE(hdr)
Definition test.h:126
ast_test_result_state
Definition test.h:193
@ AST_TEST_PASS
Definition test.h:195
@ AST_TEST_FAIL
Definition test.h:196
@ AST_TEST_NOT_RUN
Definition test.h:194
static struct test_options options
static void data_cleanup(void *data)
static struct task_data * task_data_create(void)
Create a task_data object.
static void shutdown_data_dtor(void *data)
static int task(void *data)
Queued task for baseline test.
static int listener_test_task(void *ignore)
Queued task for taskprocessor listener test.
static struct shutdown_data * shutdown_data_create(int dont_wait)
static void * tps_shutdown_thread(void *data)
static int test_start(struct ast_taskprocessor_listener *listener)
test taskprocessor listener's start callback
static void * test_listener_pvt_alloc(void)
test taskprocessor listener's alloc callback
static void test_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
test taskprocessor listener's task_pushed callback
static int check_stats(struct ast_test *test, const struct test_listener_pvt *pvt, int num_pushed, int num_emptied, int num_was_empty)
helper to ensure that statistics the listener is keeping are what we expect
#define HIGH_WATER_MARK
static int shutdown_has_completed(struct shutdown_data *shutdown_data)
static int shutdown_waitfor_completion(struct shutdown_data *shutdown_data)
#define TEST_DATA_ARRAY_SIZE
#define NUM_TASKS
static void shutdown_poke(struct shutdown_data *shutdown_data)
static const struct ast_taskprocessor_listener_callbacks test_callbacks
static int shutdown_task_exec(void *data)
static int load_module(void)
static void test_shutdown(struct ast_taskprocessor_listener *listener)
test taskprocessor listener's shutdown callback.
static void task_data_dtor(void *obj)
static int local_task_exe(struct ast_taskprocessor_local *local)
static int task_wait(struct task_data *task_data)
Wait for a task to execute.
static int unload_module(void)
static struct load_task_data load_task_results
static int shutdown_waitfor_start(struct shutdown_data *shutdown_data)
static int load_task(void *data)
a queued task to be used in the taskprocessor load test
static void test_emptied(struct ast_taskprocessor_listener *listener)
test taskprocessor listener's emptied callback.
#define LOW_WATER_MARK
void ast_threadpool_shutdown(struct ast_threadpool *pool)
Shut down a threadpool and destroy it.
Definition threadpool.c:977
struct ast_threadpool * ast_threadpool_create(const char *name, struct ast_threadpool_listener *listener, const struct ast_threadpool_options *options)
Create a new threadpool.
Definition threadpool.c:915
#define AST_THREADPOOL_OPTIONS_VERSION
Definition threadpool.h:73
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition time.h:159
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition utils.h:981
#define ast_pthread_create(a, b, c, d)
Definition utils.h:624
long int ast_random(void)
Definition utils.c:2348