Asterisk - The Open Source Telephony Project GIT-master-20e40a9
Loading...
Searching...
No Matches
taskprocessor.c
Go to the documentation of this file.
1/*
2 * Asterisk -- An open source telephony toolkit.
3 *
4 * Copyright (C) 2007-2013, Digium, Inc.
5 *
6 * Dwayne M. Hubbard <dhubbard@digium.com>
7 *
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
13 *
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
17 */
18
19/*!
20 * \file
21 * \brief Maintain a container of uniquely-named taskprocessor threads that can be shared across modules.
22 *
23 * \author Dwayne Hubbard <dhubbard@digium.com>
24 */
25
26/*** MODULEINFO
27 <support_level>core</support_level>
28 ***/
29
30#include "asterisk.h"
31
32#include "asterisk/_private.h"
33#include "asterisk/module.h"
34#include "asterisk/time.h"
35#include "asterisk/astobj2.h"
36#include "asterisk/cli.h"
38#include "asterisk/sem.h"
39
40/*!
41 * \brief tps_task structure is queued to a taskprocessor
42 *
43 * tps_tasks are processed in FIFO order and freed by the taskprocessing
44 * thread after the task handler returns. The callback function that is assigned
45 * to the execute() function pointer is responsible for releasing datap resources if necessary.
46 */
47struct tps_task {
48 /*! \brief The execute() task callback function pointer */
49 union {
50 int (*execute)(void *datap);
53 /*! \brief The data pointer for the task execute() function */
54 void *datap;
55 /*! \brief AST_LIST_ENTRY overhead */
57 unsigned int wants_local:1;
58 /*! \brief Debug information about where the task was pushed from */
59 const char *file;
60 int line;
61 const char *function;
62};
63
64/*! \brief tps_taskprocessor_stats maintain statistics for a taskprocessor. */
66 /*! \brief This is the maximum number of tasks queued at any one time */
67 unsigned long max_qsize;
68 /*! \brief This is the current number of tasks processed */
70 /*! \brief Highest time (in microseconds) spent processing a task */
72 /*! \brief Lowest time (in microseconds) spent processing a task */
74 /*! \brief File where the highest time task was pushed from */
76 /*! \brief Line where the highest time task was pushed from */
78 /*! \brief Function where the highest time task was pushed from */
80};
81
82/*! \brief A ast_taskprocessor structure is a singleton by name */
84 /*! \brief Taskprocessor statistics */
87 /*! \brief Taskprocessor current queue size */
89 /*! \brief Taskprocessor low water clear alert level */
91 /*! \brief Taskprocessor high water alert trigger level */
93 /*! \brief Taskprocessor queue */
96 /*! Current thread executing the tasks */
97 pthread_t thread;
98 /*! Indicates if the taskprocessor is currently executing a task */
99 unsigned int executing:1;
100 /*! Indicates that a high water warning has been issued on this task processor */
101 unsigned int high_water_warned:1;
102 /*! Indicates that a high water alert is active on this taskprocessor */
103 unsigned int high_water_alert:1;
104 /*! Indicates if the taskprocessor is currently suspended */
105 unsigned int suspended:1;
106 /*! \brief Anything before the first '/' in the name (if there is one) */
108 /*! \brief Friendly name of the taskprocessor.
109 * Subsystem is appended after the name's NULL terminator.
110 */
111 char name[0];
112};
113
114/*!
115 * \brief A listener for taskprocessors
116 *
117 * \since 12.0.0
118 *
119 * When a taskprocessor's state changes, the listener
120 * is notified of the change. This allows for tasks
121 * to be addressed in whatever way is appropriate for
122 * the module using the taskprocessor.
123 */
125 /*! The callbacks the taskprocessor calls into to notify of state changes */
127 /*! The taskprocessor that the listener is listening to */
129 /*! Data private to the listener */
131};
132
133/*!
134 * Keep track of which subsystems are in alert
135 * and how many of their taskprocessors are overloaded.
136 */
138 unsigned int alert_count;
139 char subsystem[0];
140};
141static AST_VECTOR_RW(subsystem_alert_vector, struct subsystem_alert *) overloaded_subsystems;
142
143#ifdef LOW_MEMORY
144#define TPS_MAX_BUCKETS 61
145#else
146/*! \brief Number of buckets in the tps_singletons container. */
147#define TPS_MAX_BUCKETS 1567
148#endif
149
150/*! \brief tps_singletons is the astobj2 container for taskprocessor singletons */
151static struct ao2_container *tps_singletons;
152
153/*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> operation requires a ping condition */
154static ast_cond_t cli_ping_cond;
155
156/*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> operation requires a ping condition lock */
157AST_MUTEX_DEFINE_STATIC(cli_ping_cond_lock);
158
159/*! \brief The astobj2 hash callback for taskprocessors */
160static int tps_hash_cb(const void *obj, const int flags);
161/*! \brief The astobj2 compare callback for taskprocessors */
162static int tps_cmp_cb(void *obj, void *arg, int flags);
163
164/*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> handler function */
165static int tps_ping_handler(void *datap);
166
167static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
168static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
169static char *cli_subsystem_alert_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
170static char *cli_tps_reset_stats(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
171static char *cli_tps_reset_stats_all(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
172static char *cli_tps_show_taskprocessor(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
173
174static int tps_sort_cb(const void *obj_left, const void *obj_right, int flags);
175
176
177static struct ast_cli_entry taskprocessor_clis[] = {
178 AST_CLI_DEFINE(cli_tps_ping, "Ping a named task processor"),
179 AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"),
180 AST_CLI_DEFINE(cli_tps_show_taskprocessor, "Display detailed info about a taskprocessor"),
181 AST_CLI_DEFINE(cli_subsystem_alert_report, "List task processor subsystems in alert"),
182 AST_CLI_DEFINE(cli_tps_reset_stats, "Reset a named task processor's stats"),
183 AST_CLI_DEFINE(cli_tps_reset_stats_all, "Reset all task processors' stats"),
184};
185
191
193{
194 ast_assert(pvt->dead);
195 ast_sem_destroy(&pvt->sem);
196 ast_free(pvt);
197}
198
200{
201 struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
202
204
205 listener->user_data = NULL;
206}
207
208/* Keeping the old symbols for ABI compatibility */
209#undef ast_taskprocessor_push
210#define ast_taskprocessor_push_internal(tps, task_exe, datap) \
211 __ast_taskprocessor_push(tps, task_exe, datap, __FILE__, __LINE__, __PRETTY_FUNCTION__)
212int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap);
213
214#undef ast_taskprocessor_push_local
215#define ast_taskprocessor_push_local_internal(tps, task_exe, datap) \
216 __ast_taskprocessor_push_local(tps, task_exe, datap, __FILE__, __LINE__, __PRETTY_FUNCTION__)
217int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int (*task_exe)(struct ast_taskprocessor_local *datap), void *datap);
218
219/*!
220 * \brief Function that processes tasks in the taskprocessor
221 * \internal
222 */
223static void *default_tps_processing_function(void *data)
224{
226 struct ast_taskprocessor *tps = listener->tps;
227 struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
228 int sem_value;
229 int res;
230
231 while (!pvt->dead) {
232 res = ast_sem_wait(&pvt->sem);
233 if (res != 0 && errno != EINTR) {
234 ast_log(LOG_ERROR, "Taskprocessor '%s': Semaphore wait failed: %s\n",
235 tps->name, strerror(errno));
236 /* Just give up */
237 break;
238 }
240 }
241
242 /* No posting to a dead taskprocessor! */
243 res = ast_sem_getvalue(&pvt->sem, &sem_value);
244 ast_assert(res == 0 && sem_value == 0);
245
246 /* Free the shutdown reference (see default_listener_shutdown) */
247 ao2_t_ref(listener->tps, -1, "tps-shutdown");
248
249 return NULL;
250}
251
253{
254 struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
255
257 return -1;
258 }
259
260 return 0;
261}
262
263static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
264{
265 struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
266
267 if (ast_sem_post(&pvt->sem) != 0) {
268 ast_log(LOG_ERROR, "Taskprocessor '%s': Failed to signal task enqueue: %s\n",
269 listener->tps->name, strerror(errno));
270 }
271}
272
273static int default_listener_die(void *data)
274{
275 struct default_taskprocessor_listener_pvt *pvt = data;
276 pvt->dead = 1;
277 return 0;
278}
279
281{
282 struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
283 int res;
284
285 /* Hold a reference during shutdown */
286 ao2_t_ref(listener->tps, +1, "tps-shutdown");
287
289 /* This will cause the thread to exit early without completing tasks already
290 * in the queue. This is probably the least bad option in this situation. */
292 }
293
295
296 if (pthread_equal(pthread_self(), pvt->poll_thread)) {
297 res = pthread_detach(pvt->poll_thread);
298 if (res != 0) {
299 ast_log(LOG_ERROR, "pthread_detach(): %s\n", strerror(errno));
300 }
301 } else {
302 res = pthread_join(pvt->poll_thread, NULL);
303 if (res != 0) {
304 ast_log(LOG_ERROR, "pthread_join(): %s\n", strerror(errno));
305 }
306 }
308}
309
316
317/*! \brief How many seconds to wait for running taskprocessors to finish on shutdown. */
318#define AST_TASKPROCESSOR_SHUTDOWN_MAX_WAIT 10
319
320/*!
321 * \internal
322 * \brief Clean up resources on Asterisk shutdown
323 */
324static void tps_shutdown(void)
325{
326 int objcount;
327 int tries;
328 struct ao2_container *sorted_tps;
329 struct ast_taskprocessor *tps;
330 struct ao2_iterator iter;
331 struct timespec delay = {1, 0};
332
333 /* During shutdown there may still be taskprocessor threads running and those
334 * tasprocessors reference tps_singletons. When those taskprocessors finish
335 * they will call ast_taskprocessor_unreference, creating a race condition which
336 * can result in tps_singletons being referenced after being deleted. To try and
337 * avoid this we check the container count and if greater than zero, give the
338 * running taskprocessors a chance to finish */
339 objcount = ao2_container_count(tps_singletons);
340 if (objcount > 0) {
342 "Taskprocessor shutdown: Waiting for %d taskprocessor(s) to complete.\n",
343 objcount);
344
345 /* give the running taskprocessors a chance to finish, up to
346 * AST_TASKPROCESSOR_SHUTDOWN_MAX_WAIT seconds */
347 for (tries = 0; tries < AST_TASKPROCESSOR_SHUTDOWN_MAX_WAIT; tries++) {
348 while (nanosleep(&delay, &delay));
349 objcount = ao2_container_count(tps_singletons);
350 /* if count is 0, we are done waiting */
351 if (objcount == 0) {
352 break;
353 }
354 delay.tv_sec = 1;
355 delay.tv_nsec = 0;
357 "Taskprocessor shutdown: Still waiting for %d taskprocessor(s) after %d second(s).\n",
358 objcount, tries + 1);
359 }
360 }
361
362 /* rather than try forever, risk an assertion on shutdown. This probably indicates
363 * a taskprocessor was not cleaned up somewhere */
364 if (objcount > 0) {
366 "Taskprocessor shutdown: %d taskprocessor(s) still running after %d seconds. Assertion may occur:\n",
368
370 NULL);
371 if (!sorted_tps || ao2_container_dup(sorted_tps, tps_singletons, 0)) {
372 ast_log(LOG_ERROR, "Unable to get sorted list of taskprocessors for shutdown report\n");
373 }
374 else {
375 iter = ao2_iterator_init(sorted_tps, AO2_ITERATOR_UNLINK);
376 while ((tps = ao2_iterator_next(&iter))) {
377 ast_log(LOG_ERROR, " - Taskprocessor '%s' (queue size: %ld)\n",
378 tps->name, tps->tps_queue_size);
379 }
380 }
381
382 ao2_cleanup(sorted_tps);
383 }
384 else {
386 "Taskprocessor shutdown: All taskprocessors completed successfully.\n");
387 }
388
389 ast_cli_unregister_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
390 AST_VECTOR_CALLBACK_VOID(&overloaded_subsystems, ast_free);
391 AST_VECTOR_RW_FREE(&overloaded_subsystems);
392 ao2_t_ref(tps_singletons, -1, "Unref tps_singletons in shutdown");
393 tps_singletons = NULL;
394}
395
396/* initialize the taskprocessor container and register CLI operations */
398{
401 if (!tps_singletons) {
402 ast_log(LOG_ERROR, "Failed to initialize taskprocessor container!\n");
403 return -1;
404 }
405
406 if (AST_VECTOR_RW_INIT(&overloaded_subsystems, 10)) {
407 ao2_ref(tps_singletons, -1);
408 ast_log(LOG_ERROR, "Failed to initialize taskprocessor subsystems tracking vector!\n");
409 return -1;
410 }
411
412 ast_cond_init(&cli_ping_cond, NULL);
413
414 ast_cli_register_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
415
417
418 return 0;
419}
420
421/* allocate resources for the task */
422static struct tps_task *tps_task_alloc(int (*task_exe)(void *datap), void *datap,
423 const char *file, int line, const char *function)
424{
425 struct tps_task *t;
426 if (!task_exe) {
427 ast_log(LOG_ERROR, "Task callback function is NULL!\n");
428 return NULL;
429 }
430
431 t = ast_calloc(1, sizeof(*t));
432 if (!t) {
433 ast_log(LOG_ERROR, "Failed to allocate memory for task!\n");
434 return NULL;
435 }
436
437 t->callback.execute = task_exe;
438 t->datap = datap;
439 t->file = file;
440 t->line = line;
441 t->function = function;
442
443 return t;
444}
445
446static struct tps_task *tps_task_alloc_local(int (*task_exe)(struct ast_taskprocessor_local *local), void *datap,
447 const char *file, int line, const char *function)
448{
449 struct tps_task *t;
450 if (!task_exe) {
451 ast_log(LOG_ERROR, "Task callback function is NULL!\n");
452 return NULL;
453 }
454
455 t = ast_calloc(1, sizeof(*t));
456 if (!t) {
457 ast_log(LOG_ERROR, "Failed to allocate memory for task!\n");
458 return NULL;
459 }
460
461 t->callback.execute_local = task_exe;
462 t->datap = datap;
463 t->wants_local = 1;
464 t->file = file;
465 t->line = line;
466 t->function = function;
467
468 return t;
469}
470
471/* release task resources */
472static void *tps_task_free(struct tps_task *task)
473{
474 ast_free(task);
475 return NULL;
476}
477
478/* Taskprocessor tab completion.
479 *
480 * The caller of this function is responsible for argument
481 * position checks prior to calling.
482 */
484{
485 int tklen;
486 struct ast_taskprocessor *p;
487 struct ao2_iterator i;
488
489 tklen = strlen(a->word);
490 i = ao2_iterator_init(tps_singletons, 0);
491 while ((p = ao2_iterator_next(&i))) {
492 if (!strncasecmp(a->word, p->name, tklen)) {
495 break;
496 }
497 }
499 }
501
502 return NULL;
503}
504
505/* ping task handling function */
506static int tps_ping_handler(void *datap)
507{
508 ast_mutex_lock(&cli_ping_cond_lock);
509 ast_cond_signal(&cli_ping_cond);
510 ast_mutex_unlock(&cli_ping_cond_lock);
511 return 0;
512}
513
514/* ping the specified taskprocessor and display the ping time on the CLI */
515static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
516{
517 struct timeval begin, end, delta;
518 const char *name;
519 struct timeval when;
520 struct timespec ts;
521 struct ast_taskprocessor *tps;
522
523 switch (cmd) {
524 case CLI_INIT:
525 e->command = "core ping taskprocessor";
526 e->usage =
527 "Usage: core ping taskprocessor <taskprocessor>\n"
528 " Displays the time required for a task to be processed\n";
529 return NULL;
530 case CLI_GENERATE:
531 if (a->pos == 3) {
533 } else {
534 return NULL;
535 }
536 }
537
538 if (a->argc != 4)
539 return CLI_SHOWUSAGE;
540
541 name = a->argv[3];
543 ast_cli(a->fd, "\nping failed: %s not found\n\n", name);
544 return CLI_SUCCESS;
545 }
546 ast_cli(a->fd, "\npinging %s ...", name);
547
548 /*
549 * Wait up to 5 seconds for a ping reply.
550 *
551 * On a very busy system it could take awhile to get a
552 * ping response from some taskprocessors.
553 */
554 begin = ast_tvnow();
555 when = ast_tvadd(begin, ast_samp2tv(5000, 1000));
556 ts.tv_sec = when.tv_sec;
557 ts.tv_nsec = when.tv_usec * 1000;
558
559 ast_mutex_lock(&cli_ping_cond_lock);
561 ast_mutex_unlock(&cli_ping_cond_lock);
562 ast_cli(a->fd, "\nping failed: could not push task to %s\n\n", name);
564 return CLI_FAILURE;
565 }
566 ast_cond_timedwait(&cli_ping_cond, &cli_ping_cond_lock, &ts);
567 ast_mutex_unlock(&cli_ping_cond_lock);
568
569 end = ast_tvnow();
570 delta = ast_tvsub(end, begin);
571 ast_cli(a->fd, "\n\t%24s ping time: %.1ld.%.6ld sec\n\n", name, (long)delta.tv_sec, (long int)delta.tv_usec);
573 return CLI_SUCCESS;
574}
575
576/*!
577 * \internal
578 * \brief Taskprocessor ao2 container sort function.
579 * \since 13.8.0
580 *
581 * \param obj_left pointer to the (user-defined part) of an object.
582 * \param obj_right pointer to the (user-defined part) of an object.
583 * \param flags flags from ao2_callback()
584 * OBJ_SEARCH_OBJECT - if set, 'obj_right', is an object.
585 * OBJ_SEARCH_KEY - if set, 'obj_right', is a search key item that is not an object.
586 * OBJ_SEARCH_PARTIAL_KEY - if set, 'obj_right', is a partial search key item that is not an object.
587 *
588 * \retval negative if obj_left < obj_right
589 * \retval 0 if obj_left == obj_right
590 * \retval positive if obj_left > obj_right
591 */
592static int tps_sort_cb(const void *obj_left, const void *obj_right, int flags)
593{
594 const struct ast_taskprocessor *tps_left = obj_left;
595 const struct ast_taskprocessor *tps_right = obj_right;
596 const char *right_key = obj_right;
597 int cmp;
598
599 switch (flags & OBJ_SEARCH_MASK) {
600 default:
602 right_key = tps_right->name;
603 /* Fall through */
604 case OBJ_SEARCH_KEY:
605 cmp = strcasecmp(tps_left->name, right_key);
606 break;
608 cmp = strncasecmp(tps_left->name, right_key, strlen(right_key));
609 break;
610 }
611 return cmp;
612}
613
614#define FMT_HEADERS "%-70s %10s %10s %10s %10s %10s %10s %10s\n"
615#define FMT_FIELDS "%-70s %10lu %10lu %10lu %10lu %10lu %10ld %10ld\n"
616
617/*!
618 * \internal
619 * \brief Print taskprocessor information to CLI.
620 * \since 13.30.0
621 *
622 * \param fd the file descriptor
623 * \param tps the taskprocessor
624 */
632
633/*!
634 * \internal
635 * \brief Prints an optionally narrowed down list of taskprocessors to the CLI.
636 * \since 13.30.0
637 *
638 * \param fd the file descriptor
639 * \param like the string we are matching on
640 *
641 * \return number of taskprocessors on success
642 * \retval 0 otherwise
643 */
644static int tps_report_taskprocessor_list(int fd, const char *like)
645{
646 int tps_count = 0;
647 int word_len;
648 struct ao2_container *sorted_tps;
649 struct ast_taskprocessor *tps;
650 struct ao2_iterator iter;
651
653 NULL);
654 if (!sorted_tps
655 || ao2_container_dup(sorted_tps, tps_singletons, 0)) {
656 ast_debug(1, "Failed to retrieve sorted taskprocessors\n");
657 ao2_cleanup(sorted_tps);
658 return 0;
659 }
660
661 word_len = strlen(like);
662 iter = ao2_iterator_init(sorted_tps, AO2_ITERATOR_UNLINK);
663 while ((tps = ao2_iterator_next(&iter))) {
664 if (like) {
665 if (!strncasecmp(like, tps->name, word_len)) {
667 tps_count++;
668 }
669 } else {
671 tps_count++;
672 }
674 }
676 ao2_ref(sorted_tps, -1);
677
678 return tps_count;
679}
680
681static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
682{
683 const char *like;
684
685 switch (cmd) {
686 case CLI_INIT:
687 e->command = "core show taskprocessors [like]";
688 e->usage =
689 "Usage: core show taskprocessors [like keyword]\n"
690 " Shows a list of instantiated task processors and their statistics\n";
691 return NULL;
692 case CLI_GENERATE:
693 if (a->pos == e->args) {
695 } else {
696 return NULL;
697 }
698 }
699
700 if (a->argc == e->args - 1) {
701 like = "";
702 } else if (a->argc == e->args + 1 && !strcasecmp(a->argv[e->args-1], "like")) {
703 like = a->argv[e->args];
704 } else {
705 return CLI_SHOWUSAGE;
706 }
707
708 ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth", "Low water", "High water", "Low time(us)", "High time(us)");
709 ast_cli(a->fd, "\n%d taskprocessors\n\n", tps_report_taskprocessor_list(a->fd, like));
710
711 return CLI_SUCCESS;
712}
713
714static char *cli_tps_show_taskprocessor(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
715{
716 const char *name;
717 struct ast_taskprocessor *tps;
718 struct tps_task *task;
719 int task_count = 0;
720
721 switch (cmd) {
722 case CLI_INIT:
723 e->command = "core show taskprocessor name";
724 e->usage =
725 "Usage: core show taskprocessor name <taskprocessor>\n"
726 " Displays detailed information about a specific taskprocessor,\n"
727 " including all queued tasks and their origins (DEVMODE only).\n";
728 return NULL;
729 case CLI_GENERATE:
730 if (a->pos == 4) {
732 }
733 return NULL;
734 }
735
736 if (a->argc != 5) {
737 return CLI_SHOWUSAGE;
738 }
739
740 name = a->argv[4];
742 if (!tps) {
743 ast_cli(a->fd, "\nTaskprocessor '%s' not found\n\n", name);
744 return CLI_SUCCESS;
745 }
746
747 ao2_lock(tps);
748
749 ast_cli(a->fd, "\nTaskprocessor: %s\n", tps->name);
750 ast_cli(a->fd, "===========================================\n");
751 ast_cli(a->fd, "Subsystem: %s\n", tps->subsystem[0] ? tps->subsystem : "(none)");
752 ast_cli(a->fd, "Tasks processed: %lu\n", tps->stats._tasks_processed_count);
753 ast_cli(a->fd, "Current queue size: %ld\n", tps->tps_queue_size);
754 ast_cli(a->fd, "Max queue depth: %lu\n", tps->stats.max_qsize);
755 ast_cli(a->fd, "Low water mark: %ld\n", tps->tps_queue_low);
756 ast_cli(a->fd, "High water mark: %ld\n", tps->tps_queue_high);
757 ast_cli(a->fd, "High water alert: %s\n", tps->high_water_alert ? "Yes" : "No");
758 ast_cli(a->fd, "Suspended: %s\n", tps->suspended ? "Yes" : "No");
759 ast_cli(a->fd, "Currently executing: %s\n", tps->executing ? "Yes" : "No");
760 ast_cli(a->fd, "Highest time (us): %ld\n", tps->stats.highest_time_processed);
761 if (tps->stats.highest_time_task_file) {
762 ast_cli(a->fd, " Highest task origin: %s:%d (%s)\n",
766 }
767 ast_cli(a->fd, "Lowest time (us): %ld\n", tps->stats.lowest_time_processed);
768
769 if (tps->tps_queue_size > 0) {
770 ast_cli(a->fd, "\nQueued Tasks:\n");
771 ast_cli(a->fd, "-------------------------------------------\n");
772
774 task_count++;
775 if (task->file) {
776 ast_cli(a->fd, " Task #%d:\n", task_count);
777 ast_cli(a->fd, " Origin: %s:%d\n", task->file, task->line);
778 ast_cli(a->fd, " Function: %s\n", task->function);
779 ast_cli(a->fd, " Type: %s\n", task->wants_local ? "Local" : "Standard");
780 } else {
781 ast_cli(a->fd, " Task #%d: (origin not available)\n", task_count);
782 }
783 }
784 ast_cli(a->fd, "\nTotal queued tasks: %d\n", task_count);
785 } else {
786 ast_cli(a->fd, "\nNo tasks currently queued.\n");
787 }
788
789 ao2_unlock(tps);
791
792 ast_cli(a->fd, "\n");
793 return CLI_SUCCESS;
794}
795
796/* hash callback for astobj2 */
797static int tps_hash_cb(const void *obj, const int flags)
798{
799 const struct ast_taskprocessor *tps = obj;
800 const char *name = flags & OBJ_KEY ? obj : tps->name;
801
802 return ast_str_case_hash(name);
803}
804
805/* compare callback for astobj2 */
806static int tps_cmp_cb(void *obj, void *arg, int flags)
807{
808 struct ast_taskprocessor *lhs = obj, *rhs = arg;
809 const char *rhsname = flags & OBJ_KEY ? arg : rhs->name;
810
811 return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0;
812}
813
814static int subsystem_match(struct subsystem_alert *alert, const char *subsystem)
815{
816 return !strcmp(alert->subsystem, subsystem);
817}
818
819static int subsystem_cmp(struct subsystem_alert *a, struct subsystem_alert *b)
820{
821 return strcmp(a->subsystem, b->subsystem);
822}
823
825{
826 struct subsystem_alert *alert;
827 unsigned int count = 0;
828 int idx;
829
830 AST_VECTOR_RW_RDLOCK(&overloaded_subsystems);
831 idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
832 if (idx >= 0) {
833 alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
834 count = alert->alert_count;
835 }
836 AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
837
838 return count;
839}
840
841static void subsystem_alert_increment(const char *subsystem)
842{
843 struct subsystem_alert *alert;
844 int idx;
845
847 return;
848 }
849
850 AST_VECTOR_RW_WRLOCK(&overloaded_subsystems);
851 idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
852 if (idx >= 0) {
853 alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
854 alert->alert_count++;
855 AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
856 return;
857 }
858
859 alert = ast_malloc(sizeof(*alert) + strlen(subsystem) + 1);
860 if (!alert) {
861 AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
862 return;
863 }
864 alert->alert_count = 1;
865 strcpy(alert->subsystem, subsystem); /* Safe */
866
867 if (AST_VECTOR_APPEND(&overloaded_subsystems, alert)) {
868 ast_free(alert);
869 }
870 AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
871}
872
873static void subsystem_alert_decrement(const char *subsystem)
874{
875 struct subsystem_alert *alert;
876 int idx;
877
879 return;
880 }
881
882 AST_VECTOR_RW_WRLOCK(&overloaded_subsystems);
883 idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
884 if (idx < 0) {
886 "Can't decrement alert count for subsystem '%s' as it wasn't in alert\n", subsystem);
887 AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
888 return;
889 }
890 alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
891
892 alert->alert_count--;
893 if (alert->alert_count <= 0) {
894 AST_VECTOR_REMOVE(&overloaded_subsystems, idx, 0);
895 ast_free(alert);
896 }
897
898 AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
899}
900
901static void subsystem_copy(struct subsystem_alert *alert,
902 struct subsystem_alert_vector *vector)
903{
904 struct subsystem_alert *alert_copy;
905 alert_copy = ast_malloc(sizeof(*alert_copy) + strlen(alert->subsystem) + 1);
906 if (!alert_copy) {
907 return;
908 }
909 alert_copy->alert_count = alert->alert_count;
910 strcpy(alert_copy->subsystem, alert->subsystem); /* Safe */
911 if (AST_VECTOR_ADD_SORTED(vector, alert_copy, subsystem_cmp)) {
912 ast_free(alert_copy);
913 }
914}
915
916static char *cli_subsystem_alert_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
917{
918 struct subsystem_alert_vector sorted_subsystems;
919 int i;
920
921#define FMT_HEADERS_SUBSYSTEM "%-32s %12s\n"
922#define FMT_FIELDS_SUBSYSTEM "%-32s %12u\n"
923
924 switch (cmd) {
925 case CLI_INIT:
926 e->command = "core show taskprocessor alerted subsystems";
927 e->usage =
928 "Usage: core show taskprocessor alerted subsystems\n"
929 " Shows a list of task processor subsystems that are currently alerted\n";
930 return NULL;
931 case CLI_GENERATE:
932 return NULL;
933 }
934
935 if (a->argc != e->args) {
936 return CLI_SHOWUSAGE;
937 }
938
939 if (AST_VECTOR_INIT(&sorted_subsystems, AST_VECTOR_SIZE(&overloaded_subsystems))) {
940 return CLI_FAILURE;
941 }
942
943 AST_VECTOR_RW_RDLOCK(&overloaded_subsystems);
944 for (i = 0; i < AST_VECTOR_SIZE(&overloaded_subsystems); i++) {
945 subsystem_copy(AST_VECTOR_GET(&overloaded_subsystems, i), &sorted_subsystems);
946 }
947 AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
948
949 ast_cli(a->fd, "\n" FMT_HEADERS_SUBSYSTEM, "Subsystem", "Alert Count");
950
951 for (i = 0; i < AST_VECTOR_SIZE(&sorted_subsystems); i++) {
952 struct subsystem_alert *alert = AST_VECTOR_GET(&sorted_subsystems, i);
953 ast_cli(a->fd, FMT_FIELDS_SUBSYSTEM, alert->subsystem, alert->alert_count);
954 }
955
956 ast_cli(a->fd, "\n%zu subsystems\n\n", AST_VECTOR_SIZE(&sorted_subsystems));
957
958 AST_VECTOR_CALLBACK_VOID(&sorted_subsystems, ast_free);
959 AST_VECTOR_FREE(&sorted_subsystems);
960
961 return CLI_SUCCESS;
962}
963
964
965/*! Count of the number of taskprocessors in high water alert. */
966static unsigned int tps_alert_count;
967
968/*! Access protection for tps_alert_count */
970
971/*!
972 * \internal
973 * \brief Add a delta to tps_alert_count with protection.
974 * \since 13.10.0
975 *
976 * \param tps Taskprocessor updating queue water mark alert trigger.
977 * \param delta The amount to add to tps_alert_count.
978 */
979static void tps_alert_add(struct ast_taskprocessor *tps, int delta)
980{
981 unsigned int old;
982
984 old = tps_alert_count;
985 tps_alert_count += delta;
986 if (DEBUG_ATLEAST(3)
987 /* and tps_alert_count becomes zero or non-zero */
988 && !old != !tps_alert_count) {
989 ast_log(LOG_DEBUG, "Taskprocessor '%s' %s the high water alert (total alerts: %u).\n",
990 tps->name, tps_alert_count ? "triggered" : "cleared", tps_alert_count);
991 }
992
993 if (tps->subsystem[0] != '\0') {
994 if (delta > 0) {
996 } else {
998 }
999 }
1000
1002}
1003
1005{
1006 unsigned int count;
1007
1009 count = tps_alert_count;
1011
1012 return count;
1013}
1014
1015int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
1016{
1017 if (!tps || high_water < 0 || high_water < low_water) {
1018 return -1;
1019 }
1020
1021 if (low_water < 0) {
1022 /* Set low water level to 90% of high water level */
1023 low_water = (high_water * 9) / 10;
1024 }
1025
1026 ao2_lock(tps);
1027
1028 tps->tps_queue_low = low_water;
1029 tps->tps_queue_high = high_water;
1030
1031 if (tps->high_water_alert) {
1032 if (!tps->tps_queue_size || tps->tps_queue_size < low_water) {
1033 /* Update water mark alert immediately */
1034 tps->high_water_alert = 0;
1035 tps_alert_add(tps, -1);
1036 }
1037 } else {
1038 if (high_water < tps->tps_queue_size) {
1039 /* Update water mark alert immediately */
1040 tps->high_water_alert = 1;
1041 tps_alert_add(tps, +1);
1042 }
1043 }
1044
1045 ao2_unlock(tps);
1046
1047 return 0;
1048}
1049
1050/* destroy the taskprocessor */
1051static void tps_taskprocessor_dtor(void *tps)
1052{
1053 struct ast_taskprocessor *t = tps;
1054 struct tps_task *task;
1055
1056 while ((task = AST_LIST_REMOVE_HEAD(&t->tps_queue, list))) {
1058 }
1059 t->tps_queue_size = 0;
1060
1061 if (t->high_water_alert) {
1062 t->high_water_alert = 0;
1063 tps_alert_add(t, -1);
1064 }
1065
1067 t->listener = NULL;
1068}
1069
1070/* pop the front task and return it */
1072{
1073 struct tps_task *task;
1074
1075 if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
1076 --tps->tps_queue_size;
1077 if (tps->high_water_alert && tps->tps_queue_size <= tps->tps_queue_low) {
1078 tps->high_water_alert = 0;
1079 tps_alert_add(tps, -1);
1080 }
1081 }
1082 return task;
1083}
1084
1086{
1087 return (tps) ? tps->tps_queue_size : -1;
1088}
1089
1094
1095/* taskprocessor name accessor */
1097{
1098 if (!tps) {
1099 ast_log(LOG_ERROR, "no taskprocessor specified!\n");
1100 return NULL;
1101 }
1102 return tps->name;
1103}
1104
1106{
1107 listener->callbacks->shutdown(listener);
1108 ao2_ref(listener->tps, -1);
1109}
1110
1111static void taskprocessor_listener_dtor(void *obj)
1112{
1114
1115 if (listener->callbacks->dtor) {
1117 }
1118}
1119
1121{
1123
1125 if (!listener) {
1126 return NULL;
1127 }
1128 listener->callbacks = callbacks;
1129 listener->user_data = user_data;
1130
1131 return listener;
1132}
1133
1135{
1136 ao2_ref(listener->tps, +1);
1137 return listener->tps;
1138}
1139
1141{
1142 return listener->user_data;
1143}
1144
1146{
1148
1149 pvt = ast_calloc(1, sizeof(*pvt));
1150 if (!pvt) {
1151 ast_log(LOG_ERROR, "Failed to allocate memory for taskprocessor listener\n");
1152 return NULL;
1153 }
1155 if (ast_sem_init(&pvt->sem, 0, 0) != 0) {
1156 ast_log(LOG_ERROR, "Failed to initialize taskprocessor semaphore: %s\n", strerror(errno));
1157 ast_free(pvt);
1158 return NULL;
1159 }
1160 return pvt;
1161}
1162
1163/*!
1164 * \internal
1165 * \brief Allocate a task processor structure
1166 *
1167 * \param name Name of the task processor.
1168 * \param listener Listener to associate with the task processor.
1169 *
1170 * \return The newly allocated task processor.
1171 *
1172 * \pre tps_singletons must be locked by the caller.
1173 */
1175{
1176 struct ast_taskprocessor *p;
1177 char *subsystem_separator;
1178 size_t subsystem_length = 0;
1179 size_t name_length;
1180
1181 name_length = strlen(name);
1182 subsystem_separator = strchr(name, '/');
1183 if (subsystem_separator) {
1184 subsystem_length = subsystem_separator - name;
1185 }
1186
1187 p = ao2_alloc(sizeof(*p) + name_length + subsystem_length + 2, tps_taskprocessor_dtor);
1188 if (!p) {
1189 ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
1190 return NULL;
1191 }
1192
1193 /* Set default congestion water level alert triggers. */
1196
1197 strcpy(p->name, name); /* Safe */
1198 p->subsystem = p->name + name_length + 1;
1199 ast_copy_string(p->subsystem, name, subsystem_length + 1);
1200
1201 ao2_ref(listener, +1);
1202 p->listener = listener;
1203
1205
1206 ao2_ref(p, +1);
1207 listener->tps = p;
1208
1209 if (!(ao2_link_flags(tps_singletons, p, OBJ_NOLOCK))) {
1210 ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
1211 listener->tps = NULL;
1212 ao2_ref(p, -2);
1213 return NULL;
1214 }
1215
1216 return p;
1217}
1218
1220{
1221 if (p && p->listener->callbacks->start(p->listener)) {
1222 ast_log(LOG_ERROR, "Failed to start taskprocessor listener for '%s'\n",
1223 p->name);
1225
1226 return NULL;
1227 }
1228
1229 return p;
1230}
1231
1232/* Provide a reference to a taskprocessor. Create the taskprocessor if necessary, but don't
1233 * create the taskprocessor if we were told via ast_tps_options to return a reference only
1234 * if it already exists */
1236{
1237 struct ast_taskprocessor *p;
1240
1241 if (ast_strlen_zero(name)) {
1242 ast_log(LOG_ERROR, "Cannot get taskprocessor with empty name!\n");
1243 return NULL;
1244 }
1245 ao2_lock(tps_singletons);
1246 p = ao2_find(tps_singletons, name, OBJ_KEY | OBJ_NOLOCK);
1247 if (p || (create & TPS_REF_IF_EXISTS)) {
1248 /* calling function does not want a new taskprocessor to be created if it doesn't already exist */
1249 ao2_unlock(tps_singletons);
1250 return p;
1251 }
1252
1253 /* Create a new taskprocessor. Start by creating a default listener */
1255 if (!pvt) {
1256 ao2_unlock(tps_singletons);
1257 return NULL;
1258 }
1260 if (!listener) {
1261 ao2_unlock(tps_singletons);
1263 return NULL;
1264 }
1265
1267 ao2_unlock(tps_singletons);
1268 p = __start_taskprocessor(p);
1269 ao2_ref(listener, -1);
1270
1271 return p;
1272}
1273
1275{
1276 struct ast_taskprocessor *p;
1277
1278 ao2_lock(tps_singletons);
1279 p = ao2_find(tps_singletons, name, OBJ_KEY | OBJ_NOLOCK);
1280 if (p) {
1281 ao2_unlock(tps_singletons);
1283 return NULL;
1284 }
1285
1287 ao2_unlock(tps_singletons);
1288
1289 return __start_taskprocessor(p);
1290}
1291
1293 void *local_data)
1294{
1295 SCOPED_AO2LOCK(lock, tps);
1296 tps->local_data = local_data;
1297}
1298
1299/* decrement the taskprocessor reference count and unlink from the container if necessary */
1301{
1302 if (!tps) {
1303 return NULL;
1304 }
1305
1306 /* To prevent another thread from finding and getting a reference to this
1307 * taskprocessor we hold the singletons lock. If we didn't do this then
1308 * they may acquire it and find that the listener has been shut down.
1309 */
1310 ao2_lock(tps_singletons);
1311
1312 if (ao2_ref(tps, -1) > 3) {
1313 ao2_unlock(tps_singletons);
1314 return NULL;
1315 }
1316
1317 /* If we're down to 3 references, then those must be:
1318 * 1. The reference we just got rid of
1319 * 2. The container
1320 * 3. The listener
1321 */
1322 ao2_unlink_flags(tps_singletons, tps, OBJ_NOLOCK);
1323 ao2_unlock(tps_singletons);
1324
1326 return NULL;
1327}
1328
1329/* push the task into the taskprocessor queue */
1330static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
1331{
1332 int previous_size;
1333 int was_empty;
1334
1335 if (!tps) {
1336 ast_log(LOG_ERROR, "Taskprocessor is NULL!\n");
1337 return -1;
1338 }
1339
1340 if (!t) {
1341 ast_log(LOG_ERROR, "Task is NULL!\n");
1342 return -1;
1343 }
1344
1345 if (t->file) {
1346 ast_debug(3, "Taskprocessor '%s': Task pushed from %s:%d (%s)\n",
1347 tps->name, t->file, t->line, t->function);
1348 }
1349
1350 ao2_lock(tps);
1351 AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
1352 previous_size = tps->tps_queue_size++;
1353
1354 if (tps->tps_queue_high <= tps->tps_queue_size) {
1355 if (!tps->high_water_alert) {
1356 ast_log(LOG_WARNING, "Taskprocessor '%s' queue reached %ld scheduled tasks (high water mark: %ld)%s.\n",
1357 tps->name, tps->tps_queue_size, tps->tps_queue_high, tps->high_water_warned ? " again" : "");
1358 tps->high_water_warned = 1;
1359 tps->high_water_alert = 1;
1360 tps_alert_add(tps, +1);
1361 }
1362 }
1363
1364 /* The currently executing task counts as still in queue */
1365 was_empty = tps->executing ? 0 : previous_size == 0;
1366 ao2_unlock(tps);
1367 tps->listener->callbacks->task_pushed(tps->listener, was_empty);
1368 return 0;
1369}
1370
1371int __ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap,
1372 const char *file, int line, const char *function)
1373{
1374 return taskprocessor_push(tps, tps_task_alloc(task_exe, datap, file, line, function));
1375}
1376
1377int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
1378{
1379 return __ast_taskprocessor_push(tps, task_exe, datap, NULL, 0, NULL);
1380}
1381
1382int __ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int (*task_exe)(struct ast_taskprocessor_local *datap), void *datap,
1383 const char *file, int line, const char *function)
1384{
1385 return taskprocessor_push(tps, tps_task_alloc_local(task_exe, datap, file, line, function));
1386}
1387
1388int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int (*task_exe)(struct ast_taskprocessor_local *datap), void *datap)
1389{
1390 return __ast_taskprocessor_push_local(tps, task_exe, datap, NULL, 0, NULL);
1391}
1392
1394{
1395 if (tps) {
1396 ao2_lock(tps);
1397 tps->suspended = 1;
1398 ao2_unlock(tps);
1399 return 0;
1400 }
1401 return -1;
1402}
1403
1405{
1406 if (tps) {
1407 ao2_lock(tps);
1408 tps->suspended = 0;
1409 ao2_unlock(tps);
1410 return 0;
1411 }
1412 return -1;
1413}
1414
1416{
1417 return tps ? tps->suspended : -1;
1418}
1419
1421{
1422 struct ast_taskprocessor_local local;
1423 struct tps_task *t;
1424 long size;
1425 struct timeval start;
1426 long elapsed;
1427 const char *task_file = NULL;
1428 int task_line = 0;
1429 const char *task_function = NULL;
1430
1431 ao2_lock(tps);
1432 t = tps_taskprocessor_pop(tps);
1433 if (!t) {
1434 ao2_unlock(tps);
1435 return 0;
1436 }
1437
1438 tps->thread = pthread_self();
1439 tps->executing = 1;
1440
1441 if (t->wants_local) {
1442 local.local_data = tps->local_data;
1443 local.data = t->datap;
1444 }
1445
1446 /* Save task origin info before we free the task */
1447 task_file = t->file;
1448 task_line = t->line;
1449 task_function = t->function;
1450 ao2_unlock(tps);
1451
1452 start = ast_tvnow();
1453
1454 if (t->wants_local) {
1455 t->callback.execute_local(&local);
1456 } else {
1457 t->callback.execute(t->datap);
1458 }
1459 tps_task_free(t);
1460
1461 ao2_lock(tps);
1463 /* We need to check size in the same critical section where we reset the
1464 * executing bit. Avoids a race condition where a task is pushed right
1465 * after we pop an empty stack.
1466 */
1467 tps->executing = 0;
1468 size = ast_taskprocessor_size(tps);
1469
1470 /* Update the stats */
1472
1473 /* Include the task we just executed as part of the queue size. */
1474 if (size >= tps->stats.max_qsize) {
1475 tps->stats.max_qsize = size + 1;
1476 }
1477
1478 elapsed = ast_tvdiff_us(ast_tvnow(), start);
1479 if (elapsed > tps->stats.highest_time_processed) {
1480 tps->stats.highest_time_processed = elapsed;
1481 tps->stats.highest_time_task_file = task_file;
1482 tps->stats.highest_time_task_line = task_line;
1483 tps->stats.highest_time_task_function = task_function;
1484 }
1485 if (elapsed < tps->stats.lowest_time_processed) {
1486 tps->stats.lowest_time_processed = elapsed;
1487 }
1488
1489 ao2_unlock(tps);
1490
1491 /* If we executed a task, check for the transition to empty */
1492 if (size == 0 && tps->listener->callbacks->emptied) {
1493 tps->listener->callbacks->emptied(tps->listener);
1494 }
1495 return size > 0;
1496}
1497
1499{
1500 int is_task;
1501
1502 ao2_lock(tps);
1503 is_task = pthread_equal(tps->thread, pthread_self());
1504 ao2_unlock(tps);
1505 return is_task;
1506}
1507
1509{
1510 static int seq_num;
1511
1512 return (unsigned int) ast_atomic_fetchadd_int(&seq_num, +1);
1513}
1514
1515#define SEQ_STR_SIZE (1 + 8 + 1) /* Dash plus 8 hex digits plus null terminator */
1516
1517void ast_taskprocessor_name_append(char *buf, unsigned int size, const char *name)
1518{
1519 int final_size = strlen(name) + SEQ_STR_SIZE;
1520
1521 ast_assert(buf != NULL && name != NULL);
1522 ast_assert(final_size <= size);
1523
1524 snprintf(buf, final_size, "%s-%08x", name, ast_taskprocessor_seq_num());
1525}
1526
1527void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format, ...)
1528{
1529 va_list ap;
1530 int user_size;
1531
1532 ast_assert(buf != NULL);
1533 ast_assert(SEQ_STR_SIZE <= size);
1534
1535 va_start(ap, format);
1536 user_size = vsnprintf(buf, size - (SEQ_STR_SIZE - 1), format, ap);
1537 va_end(ap);
1538 if (user_size < 0) {
1539 /*
1540 * Wow! We got an output error to a memory buffer.
1541 * Assume no user part of name written.
1542 */
1543 user_size = 0;
1544 } else if (size < user_size + SEQ_STR_SIZE) {
1545 /* Truncate user part of name to make sequence number fit. */
1546 user_size = size - SEQ_STR_SIZE;
1547 }
1548
1549 /* Append sequence number to end of user name. */
1550 snprintf(buf + user_size, SEQ_STR_SIZE, "-%08x", ast_taskprocessor_seq_num());
1551}
1552
1553static void tps_reset_stats(struct ast_taskprocessor *tps)
1554{
1555 ao2_lock(tps);
1557 tps->stats.max_qsize = 0;
1563 ao2_unlock(tps);
1564}
1565
1566static char *cli_tps_reset_stats(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
1567{
1568 const char *name;
1569 struct ast_taskprocessor *tps;
1570
1571 switch (cmd) {
1572 case CLI_INIT:
1573 e->command = "core reset taskprocessor";
1574 e->usage =
1575 "Usage: core reset taskprocessor <taskprocessor>\n"
1576 " Resets stats for the specified taskprocessor\n";
1577 return NULL;
1578 case CLI_GENERATE:
1580 }
1581
1582 if (a->argc != 4) {
1583 return CLI_SHOWUSAGE;
1584 }
1585
1586 name = a->argv[3];
1588 ast_cli(a->fd, "\nReset failed: %s not found\n\n", name);
1589 return CLI_SUCCESS;
1590 }
1591 ast_cli(a->fd, "\nResetting %s\n\n", name);
1592
1593 tps_reset_stats(tps);
1594
1596
1597 return CLI_SUCCESS;
1598}
1599
1600static char *cli_tps_reset_stats_all(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
1601{
1602 struct ast_taskprocessor *tps;
1603 struct ao2_iterator iter;
1604
1605 switch (cmd) {
1606 case CLI_INIT:
1607 e->command = "core reset taskprocessors";
1608 e->usage =
1609 "Usage: core reset taskprocessors\n"
1610 " Resets stats for all taskprocessors\n";
1611 return NULL;
1612 case CLI_GENERATE:
1613 return NULL;
1614 }
1615
1616 if (a->argc != e->args) {
1617 return CLI_SHOWUSAGE;
1618 }
1619
1620 ast_cli(a->fd, "\nResetting stats for all taskprocessors\n\n");
1621
1622 iter = ao2_iterator_init(tps_singletons, 0);
1623 while ((tps = ao2_iterator_next(&iter))) {
1624 tps_reset_stats(tps);
1626 }
1627 ao2_iterator_destroy(&iter);
1628
1629 return CLI_SUCCESS;
1630}
Prototypes for public functions only of internal interest,.
void ast_cli_unregister_multiple(void)
Definition ael_main.c:408
ast_mutex_t lock
Definition app_sla.c:337
static void * listener(void *unused)
Definition asterisk.c:1530
Asterisk main include file. File version handling, generic pbx functions.
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
Definition clicompat.c:19
#define ast_free(a)
Definition astmm.h:180
#define ast_strdup(str)
A wrapper for strdup()
Definition astmm.h:241
#define ast_calloc(num, len)
A wrapper for calloc()
Definition astmm.h:202
#define ast_malloc(len)
A wrapper for malloc()
Definition astmm.h:191
#define ast_log
Definition astobj2.c:42
int ao2_container_dup(struct ao2_container *dest, struct ao2_container *src, enum search_flags flags)
Copy all object references in the src container into the dest container.
#define ao2_t_ref(o, delta, tag)
Definition astobj2.h:460
#define ao2_iterator_next(iter)
Definition astobj2.h:1911
@ CMP_MATCH
Definition astobj2.h:1027
@ CMP_STOP
Definition astobj2.h:1028
#define OBJ_KEY
Definition astobj2.h:1151
@ AO2_ALLOC_OPT_LOCK_NOLOCK
Definition astobj2.h:367
@ AO2_ALLOC_OPT_LOCK_MUTEX
Definition astobj2.h:363
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
#define ao2_cleanup(obj)
Definition astobj2.h:1934
#define ao2_unlink_flags(container, obj, flags)
Remove an object from a container.
Definition astobj2.h:1600
@ AO2_ITERATOR_UNLINK
Definition astobj2.h:1863
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
Definition astobj2.h:1554
#define ao2_find(container, arg, flags)
Definition astobj2.h:1736
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
#define ao2_container_alloc_rbtree(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a red-black tree container.
Definition astobj2.h:1349
#define ao2_unlock(a)
Definition astobj2.h:729
#define ao2_lock(a)
Definition astobj2.h:717
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition astobj2.h:459
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
@ OBJ_SEARCH_PARTIAL_KEY
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
Definition astobj2.h:1116
@ OBJ_SEARCH_OBJECT
The arg parameter is an object of the same type.
Definition astobj2.h:1087
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
Definition astobj2.h:1063
@ OBJ_SEARCH_MASK
Search option field mask.
Definition astobj2.h:1072
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
Definition astobj2.h:1101
#define ao2_alloc(data_size, destructor_fn)
Definition astobj2.h:409
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition astobj2.h:1303
Standard Command Line Interface.
#define CLI_SHOWUSAGE
Definition cli.h:45
#define CLI_SUCCESS
Definition cli.h:44
#define AST_CLI_DEFINE(fn, txt,...)
Definition cli.h:197
int ast_cli_completion_add(char *value)
Add a result to a request for completion options.
Definition main/cli.c:2737
void ast_cli(int fd, const char *fmt,...)
Definition clicompat.c:6
@ CLI_INIT
Definition cli.h:152
@ CLI_GENERATE
Definition cli.h:153
#define CLI_FAILURE
Definition cli.h:46
#define ast_cli_register_multiple(e, len)
Register multiple commands.
Definition cli.h:265
char * end
Definition eagi_proxy.c:73
char buf[BUFSIZE]
Definition eagi_proxy.c:66
static const char name[]
Definition format_mp3.c:68
#define DEBUG_ATLEAST(level)
#define ast_debug(level,...)
Log a DEBUG message.
#define LOG_DEBUG
#define LOG_ERROR
#define LOG_WARNING
#define AST_LIST_HEAD_NOLOCK(name, type)
Defines a structure to be used to hold a list of specified type (with no lock).
#define AST_LIST_TRAVERSE(head, var, field)
Loops over (traverses) the entries in a list.
#define AST_LIST_INSERT_TAIL(head, elm, field)
Appends a list entry to the tail of a list.
#define AST_LIST_ENTRY(type)
Declare a forward link structure inside a list entry.
#define AST_LIST_REMOVE_HEAD(head, field)
Removes and returns the head entry from a list.
#define ast_rwlock_wrlock(a)
Definition lock.h:243
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition lock.h:611
#define AST_PTHREADT_NULL
Definition lock.h:73
#define ast_cond_init(cond, attr)
Definition lock.h:208
#define ast_cond_timedwait(cond, mutex, time)
Definition lock.h:213
#define ast_rwlock_rdlock(a)
Definition lock.h:242
#define AST_RWLOCK_DEFINE_STATIC(rwlock)
Definition lock.h:550
#define ast_mutex_unlock(a)
Definition lock.h:197
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition lock.h:764
pthread_cond_t ast_cond_t
Definition lock.h:185
#define ast_rwlock_unlock(a)
Definition lock.h:241
#define ast_mutex_lock(a)
Definition lock.h:196
#define AST_MUTEX_DEFINE_STATIC(mutex)
Definition lock.h:527
#define ast_cond_signal(cond)
Definition lock.h:210
int errno
Asterisk module definitions.
static int task_count
struct @506 callbacks
#define NULL
Definition resample.c:96
Asterisk semaphore API.
int ast_sem_getvalue(struct ast_sem *sem, int *sval)
Gets the current value of the semaphore.
int ast_sem_init(struct ast_sem *sem, int pshared, unsigned int value)
Initialize a semaphore.
int ast_sem_destroy(struct ast_sem *sem)
Destroy a semaphore.
int ast_sem_wait(struct ast_sem *sem)
Decrements the semaphore.
int ast_sem_post(struct ast_sem *sem)
Increments the semaphore, unblocking a waiter if necessary.
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition strings.h:65
static force_inline int attribute_pure ast_str_case_hash(const char *str)
Compute a hash value on a case-insensitive string.
Definition strings.h:1303
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
Definition strings.h:425
Generic container type.
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
Definition astobj2.h:1821
descriptor for a cli entry.
Definition cli.h:171
int args
This gets set in ast_cli_register()
Definition cli.h:185
char * command
Definition cli.h:186
const char * usage
Definition cli.h:177
Definition sem.h:81
Taskprocessor queue.
void(* task_pushed)(struct ast_taskprocessor_listener *listener, int was_empty)
Indicates a task was pushed to the processor.
int(* start)(struct ast_taskprocessor_listener *listener)
The taskprocessor has started completely.
void(* emptied)(struct ast_taskprocessor_listener *listener)
Indicates the task processor has become empty.
void(* dtor)(struct ast_taskprocessor_listener *listener)
A listener for taskprocessors.
struct ast_taskprocessor * tps
const struct ast_taskprocessor_listener_callbacks * callbacks
Local data parameter.
A ast_taskprocessor structure is a singleton by name.
long tps_queue_low
Taskprocessor low water clear alert level.
unsigned int suspended
unsigned int executing
long tps_queue_high
Taskprocessor high water alert trigger level.
unsigned int high_water_alert
struct ast_taskprocessor::tps_queue tps_queue
struct ast_taskprocessor_listener * listener
unsigned int high_water_warned
struct tps_taskprocessor_stats stats
Taskprocessor statistics.
char * subsystem
Anything before the first '/' in the name (if there is one)
char name[0]
Friendly name of the taskprocessor. Subsystem is appended after the name's NULL terminator.
long tps_queue_size
Taskprocessor current queue size.
unsigned int alert_count
tps_task structure is queued to a taskprocessor
int(* execute)(void *datap)
int(* execute_local)(struct ast_taskprocessor_local *local)
unsigned int wants_local
struct tps_task::@441 list
AST_LIST_ENTRY overhead.
union tps_task::@440 callback
The execute() task callback function pointer.
void * datap
The data pointer for the task execute() function.
const char * file
Debug information about where the task was pushed from.
const char * function
tps_taskprocessor_stats maintain statistics for a taskprocessor.
long highest_time_processed
Highest time (in microseconds) spent processing a task.
long lowest_time_processed
Lowest time (in microseconds) spent processing a task.
unsigned long max_qsize
This is the maximum number of tasks queued at any one time.
int highest_time_task_line
Line where the highest time task was pushed from.
unsigned long _tasks_processed_count
This is the current number of tasks processed.
const char * highest_time_task_function
Function where the highest time task was pushed from.
const char * highest_time_task_file
File where the highest time task was pushed from.
#define ast_taskprocessor_push_internal(tps, task_exe, datap)
static void tps_reset_stats(struct ast_taskprocessor *tps)
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.
#define AST_TASKPROCESSOR_SHUTDOWN_MAX_WAIT
How many seconds to wait for running taskprocessors to finish on shutdown.
static void * default_tps_processing_function(void *data)
Function that processes tasks in the taskprocessor.
static char * cli_tps_reset_stats_all(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
static void tps_shutdown(void)
static int tps_sort_cb(const void *obj_left, const void *obj_right, int flags)
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
Sets the local data associated with a taskprocessor.
#define FMT_HEADERS
#define FMT_FIELDS
static void * tps_task_free(struct tps_task *task)
#define TPS_MAX_BUCKETS
#define FMT_HEADERS_SUBSYSTEM
int ast_taskprocessor_is_suspended(struct ast_taskprocessor *tps)
Get the task processor suspend status.
static char * cli_tps_show_taskprocessor(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static void default_listener_pvt_dtor(struct ast_taskprocessor_listener *listener)
static int tps_ping_handler(void *datap)
unsigned int ast_taskprocessor_alert_get(void)
Get the current taskprocessor high water alert count.
static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
static unsigned int tps_alert_count
int ast_taskprocessor_is_task(struct ast_taskprocessor *tps)
Am I the given taskprocessor's current task.
static void default_listener_shutdown(struct ast_taskprocessor_listener *listener)
static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
static struct tps_task * tps_taskprocessor_pop(struct ast_taskprocessor *tps)
static void tps_report_taskprocessor_list_helper(int fd, struct ast_taskprocessor *tps)
static void listener_shutdown(struct ast_taskprocessor_listener *listener)
int ast_tps_init(void)
static void subsystem_copy(struct subsystem_alert *alert, struct subsystem_alert_vector *vector)
unsigned int ast_taskprocessor_seq_num(void)
Get the next sequence number to create a human friendly taskprocessor name.
void ast_taskprocessor_name_append(char *buf, unsigned int size, const char *name)
Append the next sequence number to the given string, and copy into the buffer.
int __ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int(*task_exe)(struct ast_taskprocessor_local *datap), void *datap, const char *file, int line, const char *function)
static int tps_report_taskprocessor_list(int fd, const char *like)
void * ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
Get the user data from the listener.
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
int ast_taskprocessor_unsuspend(struct ast_taskprocessor *tps)
Indicate the taskprocessor is unsuspended.
static void subsystem_alert_decrement(const char *subsystem)
static char * cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static void * default_listener_pvt_alloc(void)
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
Pop a task off the taskprocessor and execute it.
static int default_listener_start(struct ast_taskprocessor_listener *listener)
static struct tps_task * tps_task_alloc(int(*task_exe)(void *datap), void *datap, const char *file, int line, const char *function)
static char * tps_taskprocessor_tab_complete(struct ast_cli_args *a)
static char * cli_tps_reset_stats(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static struct ast_taskprocessor * __allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener)
static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks
unsigned int ast_taskprocessor_get_subsystem_alert(const char *subsystem)
Get the current taskprocessor high water alert count by subsystem.
static int default_listener_die(void *data)
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format,...)
Build a taskprocessor name with a sequence number on the end.
struct ast_taskprocessor * ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
Create a taskprocessor with a custom listener.
static void subsystem_alert_increment(const char *subsystem)
static struct ast_taskprocessor * __start_taskprocessor(struct ast_taskprocessor *p)
static void tps_taskprocessor_dtor(void *tps)
int __ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap, const char *file, int line, const char *function)
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
struct ast_taskprocessor * ast_taskprocessor_listener_get_tps(const struct ast_taskprocessor_listener *listener)
Get a reference to the listener's taskprocessor.
static struct tps_task * tps_task_alloc_local(int(*task_exe)(struct ast_taskprocessor_local *local), void *datap, const char *file, int line, const char *function)
const char * ast_taskprocessor_name(struct ast_taskprocessor *tps)
Return the name of the taskprocessor singleton.
static int tps_cmp_cb(void *obj, void *arg, int flags)
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.
#define SEQ_STR_SIZE
static void taskprocessor_listener_dtor(void *obj)
static void tps_alert_add(struct ast_taskprocessor *tps, int delta)
static int tps_hash_cb(const void *obj, const int flags)
#define FMT_FIELDS_SUBSYSTEM
int ast_taskprocessor_suspend(struct ast_taskprocessor *tps)
Indicate the taskprocessor is suspended.
static char * cli_subsystem_alert_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
static char * cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static int subsystem_cmp(struct subsystem_alert *a, struct subsystem_alert *b)
static int subsystem_match(struct subsystem_alert *alert, const char *subsystem)
static ast_rwlock_t tps_alert_lock
An API for managing task processing threads that can be shared across modules.
ast_tps_options
ast_tps_options for specification of taskprocessor options
@ TPS_REF_IF_EXISTS
return a reference to a taskprocessor ONLY if it already exists
#define ast_taskprocessor_push_local(tps, task_exe, datap)
#define ast_taskprocessor_push(tps, task_exe, datap)
#define AST_TASKPROCESSOR_HIGH_WATER_LEVEL
static struct test_val b
static struct test_val a
static int task(void *data)
Queued task for baseline test.
Time-related functions and macros.
int64_t ast_tvdiff_us(struct timeval end, struct timeval start)
Computes the difference (in microseconds) between two struct timeval instances.
Definition time.h:87
struct timeval ast_samp2tv(unsigned int _nsamp, unsigned int _rate)
Returns a timeval corresponding to the duration of n samples at rate r. Useful to convert samples to ...
Definition time.h:282
struct timeval ast_tvadd(struct timeval a, struct timeval b)
Returns the sum of two timevals a + b.
Definition extconf.c:2280
struct timeval ast_tvsub(struct timeval a, struct timeval b)
Returns the difference of two timevals a - b.
Definition extconf.c:2295
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition time.h:159
#define ast_assert(a)
Definition utils.h:779
#define ast_pthread_create(a, b, c, d)
Definition utils.h:624
#define ARRAY_LEN(a)
Definition utils.h:706
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition vector.h:620
#define AST_VECTOR_GET_INDEX(vec, value, cmp)
Get the 1st index from a vector that matches the given comparison.
Definition vector.h:730
#define AST_VECTOR_RW_WRLOCK(vec)
Obtain write lock on vector.
Definition vector.h:898
#define AST_VECTOR_RW_UNLOCK(vec)
Unlock vector.
Definition vector.h:908
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition vector.h:185
#define AST_VECTOR_RW_FREE(vec)
Deallocates this locked vector.
Definition vector.h:213
#define AST_VECTOR_ADD_SORTED(vec, elem, cmp)
Add an element into a sorted vector.
Definition vector.h:382
#define AST_VECTOR_RW(name, type)
Define a vector structure with a read/write lock.
Definition vector.h:104
#define AST_VECTOR_REMOVE(vec, idx, preserve_ordered)
Remove an element from a vector by index.
Definition vector.h:423
#define AST_VECTOR_RW_RDLOCK(vec)
Obtain read lock on vector.
Definition vector.h:888
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition vector.h:124
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition vector.h:267
#define AST_VECTOR_CALLBACK_VOID(vec, callback,...)
Execute a callback on every element in a vector disregarding callback return.
Definition vector.h:873
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition vector.h:691
#define AST_VECTOR_RW_INIT(vec, size)
Initialize a vector with a read/write lock.
Definition vector.h:169