130#define TPS_MAX_BUCKETS 61
133#define TPS_MAX_BUCKETS 1567
146static int tps_hash_cb(
const void *obj,
const int flags);
148static int tps_cmp_cb(
void *obj,
void *arg,
int flags);
159static int tps_sort_cb(
const void *obj_left,
const void *obj_right,
int flags);
206 if (res != 0 &&
errno != EINTR) {
269 if (pthread_equal(pthread_self(), pvt->
poll_thread)) {
291#define AST_TASKPROCESSOR_SHUTDOWN_MAX_WAIT 10
304 struct timespec delay = {1, 0};
315 "waiting for taskprocessor shutdown, %d tps object(s) still allocated.\n",
321 while (nanosleep(&delay, &delay));
330 "waiting for taskprocessor shutdown, %d tps object(s) still allocated.\n",
339 "Asertion may occur, the following taskprocessors are still runing:\n");
357 "All waiting taskprocessors cleared!\n");
363 ao2_t_ref(tps_singletons, -1,
"Unref tps_singletons in shutdown");
364 tps_singletons =
NULL;
372 if (!tps_singletons) {
379 ast_log(
LOG_ERROR,
"taskprocessor subsystems vector failed to initialize!\n");
452 tklen = strlen(
a->word);
455 if (!strncasecmp(
a->word, p->
name, tklen)) {
480 struct timeval begin,
end, delta;
488 e->
command =
"core ping taskprocessor";
490 "Usage: core ping taskprocessor <taskprocessor>\n"
491 " Displays the time required for a task to be processed\n";
506 ast_cli(
a->fd,
"\nping failed: %s not found\n\n",
name);
519 ts.tv_sec = when.tv_sec;
520 ts.tv_nsec = when.tv_usec * 1000;
525 ast_cli(
a->fd,
"\nping failed: could not push task to %s\n\n",
name);
534 ast_cli(
a->fd,
"\n\t%24s ping time: %.1ld.%.6ld sec\n\n",
name, (
long)delta.tv_sec, (
long int)delta.tv_usec);
555static int tps_sort_cb(
const void *obj_left,
const void *obj_right,
int flags)
559 const char *right_key = obj_right;
565 right_key = tps_right->
name;
568 cmp = strcasecmp(tps_left->
name, right_key);
571 cmp = strncasecmp(tps_left->
name, right_key, strlen(right_key));
577#define FMT_HEADERS "%-70s %10s %10s %10s %10s %10s\n"
578#define FMT_FIELDS "%-70s %10lu %10lu %10lu %10lu %10lu\n"
618 ast_debug(1,
"Failed to retrieve sorted taskprocessors\n");
623 word_len = strlen(like);
627 if (!strncasecmp(like, tps->
name, word_len)) {
649 e->
command =
"core show taskprocessors [like]";
651 "Usage: core show taskprocessors [like keyword]\n"
652 " Shows a list of instantiated task processors and their statistics\n";
655 if (
a->pos == e->
args) {
662 if (
a->argc == e->
args - 1) {
664 }
else if (
a->argc == e->
args + 1 && !strcasecmp(
a->argv[e->
args-1],
"like")) {
665 like =
a->argv[e->
args];
670 ast_cli(
a->fd,
"\n" FMT_HEADERS,
"Processor",
"Processed",
"In Queue",
"Max Depth",
"Low water",
"High water");
689 const char *rhsname = flags &
OBJ_KEY ? arg : rhs->name;
701 return strcmp(
a->subsystem,
b->subsystem);
707 unsigned int count = 0;
766 "Can't decrement alert count for subsystem '%s' as it wasn't in alert\n",
subsystem);
782 struct subsystem_alert_vector *vector)
798 struct subsystem_alert_vector sorted_subsystems;
801#define FMT_HEADERS_SUBSYSTEM "%-32s %12s\n"
802#define FMT_FIELDS_SUBSYSTEM "%-32s %12u\n"
806 e->
command =
"core show taskprocessor alerted subsystems";
808 "Usage: core show taskprocessor alerted subsystems\n"
809 " Shows a list of task processor subsystems that are currently alerted\n";
815 if (
a->argc != e->
args) {
897 if (!tps || high_water < 0 || high_water < low_water) {
903 low_water = (high_water * 9) / 10;
918 if (high_water < tps->tps_queue_size) {
1051 char *subsystem_separator;
1052 size_t subsystem_length = 0;
1055 name_length = strlen(
name);
1056 subsystem_separator = strchr(
name,
'/');
1057 if (subsystem_separator) {
1058 subsystem_length = subsystem_separator -
name;
1096 ast_log(
LOG_ERROR,
"Unable to start taskprocessor listener for taskprocessor %s\n",
1225 ast_log(
LOG_WARNING,
"The '%s' task processor queue reached %ld scheduled tasks%s.\n",
1234 was_empty = tps->
executing ? 0 : previous_size == 0;
1290 tps->
thread = pthread_self();
1336 is_task = pthread_equal(tps->
thread, pthread_self());
1348#define SEQ_STR_SIZE (1 + 8 + 1)
1368 va_start(ap, format);
1371 if (user_size < 0) {
1401 e->
command =
"core reset taskprocessor";
1403 "Usage: core reset taskprocessor <taskprocessor>\n"
1404 " Resets stats for the specified taskprocessor\n";
1416 ast_cli(
a->fd,
"\nReset failed: %s not found\n\n",
name);
1435 e->
command =
"core reset taskprocessors";
1437 "Usage: core reset taskprocessors\n"
1438 " Resets stats for all taskprocessors\n";
1444 if (
a->argc != e->
args) {
1448 ast_cli(
a->fd,
"\nResetting stats for all taskprocessors\n\n");
Prototypes for public functions only of internal interest,.
static void * listener(void *unused)
Asterisk main include file. File version handling, generic pbx functions.
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
#define ast_strdup(str)
A wrapper for strdup()
#define ast_calloc(num, len)
A wrapper for calloc()
#define ast_malloc(len)
A wrapper for malloc()
int ao2_container_dup(struct ao2_container *dest, struct ao2_container *src, enum search_flags flags)
Copy all object references in the src container into the dest container.
#define ao2_t_ref(o, delta, tag)
#define ao2_iterator_next(iter)
@ AO2_ALLOC_OPT_LOCK_NOLOCK
@ AO2_ALLOC_OPT_LOCK_MUTEX
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
#define ao2_unlink_flags(container, obj, flags)
Remove an object from a container.
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
#define ao2_find(container, arg, flags)
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
#define ao2_container_alloc_rbtree(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a red-black tree container.
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
@ OBJ_SEARCH_PARTIAL_KEY
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
@ OBJ_SEARCH_OBJECT
The arg parameter is an object of the same type.
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
@ OBJ_SEARCH_MASK
Search option field mask.
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
#define ao2_alloc(data_size, destructor_fn)
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Standard Command Line Interface.
int ast_cli_unregister_multiple(struct ast_cli_entry *e, int len)
Unregister multiple commands.
#define AST_CLI_DEFINE(fn, txt,...)
int ast_cli_completion_add(char *value)
Add a result to a request for completion options.
void ast_cli(int fd, const char *fmt,...)
#define ast_cli_register_multiple(e, len)
Register multiple commands.
#define DEBUG_ATLEAST(level)
#define ast_debug(level,...)
Log a DEBUG message.
#define AST_LIST_HEAD_NOLOCK(name, type)
Defines a structure to be used to hold a list of specified type (with no lock).
#define AST_LIST_INSERT_TAIL(head, elm, field)
Appends a list entry to the tail of a list.
#define AST_LIST_ENTRY(type)
Declare a forward link structure inside a list entry.
#define AST_LIST_REMOVE_HEAD(head, field)
Removes and returns the head entry from a list.
#define ast_rwlock_wrlock(a)
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
#define AST_PTHREADT_NULL
#define ast_cond_init(cond, attr)
#define ast_cond_timedwait(cond, mutex, time)
#define ast_rwlock_rdlock(a)
#define AST_RWLOCK_DEFINE_STATIC(rwlock)
#define ast_mutex_unlock(a)
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
pthread_cond_t ast_cond_t
#define ast_rwlock_unlock(a)
#define ast_mutex_lock(a)
#define AST_MUTEX_DEFINE_STATIC(mutex)
#define ast_cond_signal(cond)
Asterisk module definitions.
int ast_sem_getvalue(struct ast_sem *sem, int *sval)
Gets the current value of the semaphore.
int ast_sem_init(struct ast_sem *sem, int pshared, unsigned int value)
Initialize a semaphore.
int ast_sem_destroy(struct ast_sem *sem)
Destroy a semaphore.
int ast_sem_wait(struct ast_sem *sem)
Decrements the semaphore.
int ast_sem_post(struct ast_sem *sem)
Increments the semaphore, unblocking a waiter if necessary.
static force_inline int attribute_pure ast_strlen_zero(const char *s)
static force_inline int attribute_pure ast_str_case_hash(const char *str)
Compute a hash value on a case-insensitive string.
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
descriptor for a cli entry.
int args
This gets set in ast_cli_register()
void(* task_pushed)(struct ast_taskprocessor_listener *listener, int was_empty)
Indicates a task was pushed to the processor.
int(* start)(struct ast_taskprocessor_listener *listener)
The taskprocessor has started completely.
void(* emptied)(struct ast_taskprocessor_listener *listener)
Indicates the task processor has become empty.
A listener for taskprocessors.
struct ast_taskprocessor * tps
const struct ast_taskprocessor_listener_callbacks * callbacks
A ast_taskprocessor structure is a singleton by name.
long tps_queue_low
Taskprocessor low water clear alert level.
long tps_queue_high
Taskprocessor high water alert trigger level.
unsigned int high_water_alert
struct ast_taskprocessor::tps_queue tps_queue
struct ast_taskprocessor_listener * listener
unsigned int high_water_warned
struct tps_taskprocessor_stats stats
Taskprocessor statistics.
char * subsystem
Anything before the first '/' in the name (if there is one)
char name[0]
Friendly name of the taskprocessor. Subsystem is appended after the name's NULL terminator.
long tps_queue_size
Taskprocessor current queue size.
tps_task structure is queued to a taskprocessor
int(* execute)(void *datap)
int(* execute_local)(struct ast_taskprocessor_local *local)
union tps_task::@406 callback
The execute() task callback function pointer.
struct tps_task::@407 list
AST_LIST_ENTRY overhead.
void * datap
The data pointer for the task execute() function.
tps_taskprocessor_stats maintain statistics for a taskprocessor.
unsigned long max_qsize
This is the maximum number of tasks queued at any one time.
unsigned long _tasks_processed_count
This is the current number of tasks processed.
static void tps_reset_stats(struct ast_taskprocessor *tps)
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.
#define AST_TASKPROCESSOR_SHUTDOWN_MAX_WAIT
How many seconds to wait for running taskprocessors to finish on shutdown.
static void * default_tps_processing_function(void *data)
Function that processes tasks in the taskprocessor.
static char * cli_tps_reset_stats_all(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
static void tps_shutdown(void)
static int tps_sort_cb(const void *obj_left, const void *obj_right, int flags)
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
Sets the local data associated with a taskprocessor.
static void * tps_task_free(struct tps_task *task)
#define FMT_HEADERS_SUBSYSTEM
int ast_taskprocessor_is_suspended(struct ast_taskprocessor *tps)
Get the task processor suspend status.
static void default_listener_pvt_dtor(struct ast_taskprocessor_listener *listener)
static struct ast_cli_entry taskprocessor_clis[]
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int(*task_exe)(struct ast_taskprocessor_local *datap), void *datap)
static int tps_ping_handler(void *datap)
CLI taskprocessor ping <blah>handler function.
unsigned int ast_taskprocessor_alert_get(void)
Get the current taskprocessor high water alert count.
static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
static unsigned int tps_alert_count
int ast_taskprocessor_is_task(struct ast_taskprocessor *tps)
Am I the given taskprocessor's current task.
static void default_listener_shutdown(struct ast_taskprocessor_listener *listener)
static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
static struct tps_task * tps_taskprocessor_pop(struct ast_taskprocessor *tps)
static void tps_report_taskprocessor_list_helper(int fd, struct ast_taskprocessor *tps)
static void listener_shutdown(struct ast_taskprocessor_listener *listener)
static void subsystem_copy(struct subsystem_alert *alert, struct subsystem_alert_vector *vector)
unsigned int ast_taskprocessor_seq_num(void)
Get the next sequence number to create a human friendly taskprocessor name.
void ast_taskprocessor_name_append(char *buf, unsigned int size, const char *name)
Append the next sequence number to the given string, and copy into the buffer.
static int tps_report_taskprocessor_list(int fd, const char *like)
void * ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
Get the user data from the listener.
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap)
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
int ast_taskprocessor_unsuspend(struct ast_taskprocessor *tps)
Indicate the taskprocessor is unsuspended.
static void subsystem_alert_decrement(const char *subsystem)
static char * cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static void * default_listener_pvt_alloc(void)
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
Pop a task off the taskprocessor and execute it.
static int default_listener_start(struct ast_taskprocessor_listener *listener)
static char * tps_taskprocessor_tab_complete(struct ast_cli_args *a)
static char * cli_tps_reset_stats(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static struct ast_taskprocessor * __allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener)
static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks
unsigned int ast_taskprocessor_get_subsystem_alert(const char *subsystem)
Get the current taskprocessor high water alert count by subsystem.
static struct tps_task * tps_task_alloc(int(*task_exe)(void *datap), void *datap)
static int default_listener_die(void *data)
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format,...)
Build a taskprocessor name with a sequence number on the end.
struct ast_taskprocessor * ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
Create a taskprocessor with a custom listener.
static void subsystem_alert_increment(const char *subsystem)
static struct ast_taskprocessor * __start_taskprocessor(struct ast_taskprocessor *p)
static void tps_taskprocessor_dtor(void *tps)
struct ast_taskprocessor * ast_taskprocessor_listener_get_tps(const struct ast_taskprocessor_listener *listener)
Get a reference to the listener's taskprocessor.
const char * ast_taskprocessor_name(struct ast_taskprocessor *tps)
Return the name of the taskprocessor singleton.
static int tps_cmp_cb(void *obj, void *arg, int flags)
The astobj2 compare callback for taskprocessors.
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.
static AST_VECTOR_RW(subsystem_alert_vector, struct subsystem_alert *)
CLI taskprocessor ping <blah>operation requires a ping condition lock.
static void taskprocessor_listener_dtor(void *obj)
static void tps_alert_add(struct ast_taskprocessor *tps, int delta)
static int tps_hash_cb(const void *obj, const int flags)
The astobj2 hash callback for taskprocessors.
#define FMT_FIELDS_SUBSYSTEM
int ast_taskprocessor_suspend(struct ast_taskprocessor *tps)
Indicate the taskprocessor is suspended.
static struct tps_task * tps_task_alloc_local(int(*task_exe)(struct ast_taskprocessor_local *local), void *datap)
static char * cli_subsystem_alert_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
static char * cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static int subsystem_cmp(struct subsystem_alert *a, struct subsystem_alert *b)
static int subsystem_match(struct subsystem_alert *alert, const char *subsystem)
static ast_rwlock_t tps_alert_lock
An API for managing task processing threads that can be shared across modules.
ast_tps_options
ast_tps_options for specification of taskprocessor options
@ TPS_REF_IF_EXISTS
return a reference to a taskprocessor ONLY if it already exists
#define AST_TASKPROCESSOR_HIGH_WATER_LEVEL
static int task(void *data)
Queued task for baseline test.
Time-related functions and macros.
struct timeval ast_samp2tv(unsigned int _nsamp, unsigned int _rate)
Returns a timeval corresponding to the duration of n samples at rate r. Useful to convert samples to ...
struct timeval ast_tvadd(struct timeval a, struct timeval b)
Returns the sum of two timevals a + b.
struct timeval ast_tvsub(struct timeval a, struct timeval b)
Returns the difference of two timevals a - b.
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
#define ast_pthread_create(a, b, c, d)
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
#define AST_VECTOR_GET_INDEX(vec, value, cmp)
Get the 1st index from a vector that matches the given comparison.
#define AST_VECTOR_RW_WRLOCK(vec)
Obtain write lock on vector.
#define AST_VECTOR_RW_UNLOCK(vec)
Unlock vector.
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
#define AST_VECTOR_RW_FREE(vec)
Deallocates this locked vector.
#define AST_VECTOR_ADD_SORTED(vec, elem, cmp)
Add an element into a sorted vector.
#define AST_VECTOR_REMOVE(vec, idx, preserve_ordered)
Remove an element from a vector by index.
#define AST_VECTOR_RW_RDLOCK(vec)
Obtain read lock on vector.
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
#define AST_VECTOR_CALLBACK_VOID(vec, callback,...)
Execute a callback on every element in a vector disregarding callback return.
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
#define AST_VECTOR_RW_INIT(vec, size)
Initialize a vector with a read/write lock.