130#define TPS_MAX_BUCKETS 61 
  133#define TPS_MAX_BUCKETS 1567 
  146static int tps_hash_cb(
const void *obj, 
const int flags);
 
  148static int tps_cmp_cb(
void *obj, 
void *arg, 
int flags);
 
  159static int tps_sort_cb(
const void *obj_left, 
const void *obj_right, 
int flags);
 
 
  206        if (res != 0 && 
errno != EINTR) {
 
 
  269    if (pthread_equal(pthread_self(), pvt->
poll_thread)) {
 
 
  291#define AST_TASKPROCESSOR_SHUTDOWN_MAX_WAIT 10 
  304    struct timespec delay = {1, 0};
 
  315            "waiting for taskprocessor shutdown, %d tps object(s) still allocated.\n",
 
  321            while (nanosleep(&delay, &delay));
 
  330                "waiting for taskprocessor shutdown, %d tps object(s) still allocated.\n",
 
  339            "Assertion may occur, the following taskprocessors are still running:\n");
 
  357                "All waiting taskprocessors cleared!\n");
 
  363    ao2_t_ref(tps_singletons, -1, 
"Unref tps_singletons in shutdown");
 
  364    tps_singletons = 
NULL;
 
 
  372    if (!tps_singletons) {
 
  379        ast_log(
LOG_ERROR, 
"taskprocessor subsystems vector failed to initialize!\n");
 
 
  452    tklen = strlen(
a->word);
 
  455        if (!strncasecmp(
a->word, p->
name, tklen)) {
 
 
  480    struct timeval begin, 
end, delta;
 
  488        e->
command = 
"core ping taskprocessor";
 
  490            "Usage: core ping taskprocessor <taskprocessor>\n" 
  491            "   Displays the time required for a task to be processed\n";
 
  506        ast_cli(
a->fd, 
"\nping failed: %s not found\n\n", 
name);
 
  519    ts.tv_sec = when.tv_sec;
 
  520    ts.tv_nsec = when.tv_usec * 1000;
 
  525        ast_cli(
a->fd, 
"\nping failed: could not push task to %s\n\n", 
name);
 
  534    ast_cli(
a->fd, 
"\n\t%24s ping time: %.1ld.%.6ld sec\n\n", 
name, (
long)delta.tv_sec, (
long int)delta.tv_usec);
 
 
  555static int tps_sort_cb(
const void *obj_left, 
const void *obj_right, 
int flags)
 
  559    const char *right_key = obj_right;
 
  565        right_key = tps_right->
name;
 
  568        cmp = strcasecmp(tps_left->
name, right_key);
 
  571        cmp = strncasecmp(tps_left->
name, right_key, strlen(right_key));
 
 
  577#define FMT_HEADERS     "%-70s %10s %10s %10s %10s %10s\n" 
  578#define FMT_FIELDS      "%-70s %10lu %10lu %10lu %10lu %10lu\n" 
  618        ast_debug(1, 
"Failed to retrieve sorted taskprocessors\n");
 
  623    word_len = strlen(like);
 
  627            if (!strncasecmp(like, tps->
name, word_len)) {
 
 
  649        e->
command = 
"core show taskprocessors [like]";
 
  651            "Usage: core show taskprocessors [like keyword]\n" 
  652            "   Shows a list of instantiated task processors and their statistics\n";
 
  655        if (
a->pos == e->
args) {
 
  662    if (
a->argc == e->
args - 1) {
 
  664    } 
else if (
a->argc == e->
args + 1 && !strcasecmp(
a->argv[e->
args-1], 
"like")) {
 
  665        like = 
a->argv[e->
args];
 
  670    ast_cli(
a->fd, 
"\n" FMT_HEADERS, 
"Processor", 
"Processed", 
"In Queue", 
"Max Depth", 
"Low water", 
"High water");
 
 
  689    const char *rhsname = flags & 
OBJ_KEY ? arg : rhs->
name;
 
 
  701    return strcmp(
a->subsystem, 
b->subsystem);
 
 
  707    unsigned int count = 0;
 
 
  766            "Can't decrement alert count for subsystem '%s' as it wasn't in alert\n", 
subsystem);
 
 
  782    struct subsystem_alert_vector *vector)
 
 
  798    struct subsystem_alert_vector sorted_subsystems;
 
  801#define FMT_HEADERS_SUBSYSTEM       "%-32s %12s\n" 
  802#define FMT_FIELDS_SUBSYSTEM        "%-32s %12u\n" 
  806        e->
command = 
"core show taskprocessor alerted subsystems";
 
  808            "Usage: core show taskprocessor alerted subsystems\n" 
  809            "   Shows a list of task processor subsystems that are currently alerted\n";
 
  815    if (
a->argc != e->
args) {
 
 
  897    if (!tps || high_water < 0 || high_water < low_water) {
 
  903        low_water = (high_water * 9) / 10;
 
  918        if (high_water < tps->tps_queue_size) {
 
 
 1056    char *subsystem_separator;
 
 1057    size_t subsystem_length = 0;
 
 1060    name_length = strlen(
name);
 
 1061    subsystem_separator = strchr(
name, 
'/');
 
 1062    if (subsystem_separator) {
 
 1063        subsystem_length = subsystem_separator - 
name;
 
 
 1101        ast_log(
LOG_ERROR, 
"Unable to start taskprocessor listener for taskprocessor %s\n",
 
 
 1230            ast_log(
LOG_WARNING, 
"The '%s' task processor queue reached %ld scheduled tasks%s.\n",
 
 1239    was_empty = tps->
executing ? 0 : previous_size == 0;
 
 
 1295    tps->
thread = pthread_self();
 
 
 1341    is_task = pthread_equal(tps->
thread, pthread_self());
 
 
 1353#define SEQ_STR_SIZE (1 + 8 + 1)     
 1373    va_start(ap, format);
 
 1376    if (user_size < 0) {
 
 
 1406        e->
command = 
"core reset taskprocessor";
 
 1408            "Usage: core reset taskprocessor <taskprocessor>\n" 
 1409            "    Resets stats for the specified taskprocessor\n";
 
 1421        ast_cli(
a->fd, 
"\nReset failed: %s not found\n\n", 
name);
 
 
 1440        e->
command = 
"core reset taskprocessors";
 
 1442            "Usage: core reset taskprocessors\n" 
 1443            "    Resets stats for all taskprocessors\n";
 
 1449    if (
a->argc != e->
args) {
 
 1453    ast_cli(
a->fd, 
"\nResetting stats for all taskprocessors\n\n");
 
 
Prototypes for public functions only of internal interest,.
void ast_cli_unregister_multiple(void)
static void * listener(void *unused)
Asterisk main include file. File version handling, generic pbx functions.
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
#define ast_strdup(str)
A wrapper for strdup()
#define ast_calloc(num, len)
A wrapper for calloc()
#define ast_malloc(len)
A wrapper for malloc()
int ao2_container_dup(struct ao2_container *dest, struct ao2_container *src, enum search_flags flags)
Copy all object references in the src container into the dest container.
#define ao2_t_ref(o, delta, tag)
#define ao2_iterator_next(iter)
@ AO2_ALLOC_OPT_LOCK_NOLOCK
@ AO2_ALLOC_OPT_LOCK_MUTEX
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
#define ao2_unlink_flags(container, obj, flags)
Remove an object from a container.
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
#define ao2_find(container, arg, flags)
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
#define ao2_container_alloc_rbtree(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a red-black tree container.
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
@ OBJ_SEARCH_PARTIAL_KEY
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
@ OBJ_SEARCH_OBJECT
The arg parameter is an object of the same type.
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
@ OBJ_SEARCH_MASK
Search option field mask.
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
#define ao2_alloc(data_size, destructor_fn)
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Standard Command Line Interface.
#define AST_CLI_DEFINE(fn, txt,...)
int ast_cli_completion_add(char *value)
Add a result to a request for completion options.
void ast_cli(int fd, const char *fmt,...)
#define ast_cli_register_multiple(e, len)
Register multiple commands.
#define DEBUG_ATLEAST(level)
#define ast_debug(level,...)
Log a DEBUG message.
#define AST_LIST_HEAD_NOLOCK(name, type)
Defines a structure to be used to hold a list of specified type (with no lock).
#define AST_LIST_INSERT_TAIL(head, elm, field)
Appends a list entry to the tail of a list.
#define AST_LIST_ENTRY(type)
Declare a forward link structure inside a list entry.
#define AST_LIST_REMOVE_HEAD(head, field)
Removes and returns the head entry from a list.
#define ast_rwlock_wrlock(a)
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
#define AST_PTHREADT_NULL
#define ast_cond_init(cond, attr)
#define ast_cond_timedwait(cond, mutex, time)
#define ast_rwlock_rdlock(a)
#define AST_RWLOCK_DEFINE_STATIC(rwlock)
#define ast_mutex_unlock(a)
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
pthread_cond_t ast_cond_t
#define ast_rwlock_unlock(a)
#define ast_mutex_lock(a)
#define AST_MUTEX_DEFINE_STATIC(mutex)
#define ast_cond_signal(cond)
Asterisk module definitions.
int ast_sem_getvalue(struct ast_sem *sem, int *sval)
Gets the current value of the semaphore.
int ast_sem_init(struct ast_sem *sem, int pshared, unsigned int value)
Initialize a semaphore.
int ast_sem_destroy(struct ast_sem *sem)
Destroy a semaphore.
int ast_sem_wait(struct ast_sem *sem)
Decrements the semaphore.
int ast_sem_post(struct ast_sem *sem)
Increments the semaphore, unblocking a waiter if necessary.
static force_inline int attribute_pure ast_strlen_zero(const char *s)
static force_inline int attribute_pure ast_str_case_hash(const char *str)
Compute a hash value on a case-insensitive string.
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
descriptor for a cli entry.
int args
This gets set in ast_cli_register()
void(* task_pushed)(struct ast_taskprocessor_listener *listener, int was_empty)
Indicates a task was pushed to the processor.
int(* start)(struct ast_taskprocessor_listener *listener)
The taskprocessor has started completely.
void(* emptied)(struct ast_taskprocessor_listener *listener)
Indicates the task processor has become empty.
void(* dtor)(struct ast_taskprocessor_listener *listener)
A listener for taskprocessors.
struct ast_taskprocessor * tps
const struct ast_taskprocessor_listener_callbacks * callbacks
A ast_taskprocessor structure is a singleton by name.
long tps_queue_low
Taskprocessor low water clear alert level.
long tps_queue_high
Taskprocessor high water alert trigger level.
unsigned int high_water_alert
struct ast_taskprocessor::tps_queue tps_queue
struct ast_taskprocessor_listener * listener
unsigned int high_water_warned
struct tps_taskprocessor_stats stats
Taskprocessor statistics.
char * subsystem
Anything before the first '/' in the name (if there is one)
char name[0]
Friendly name of the taskprocessor. Subsystem is appended after the name's NULL terminator.
long tps_queue_size
Taskprocessor current queue size.
tps_task structure is queued to a taskprocessor
int(* execute)(void *datap)
int(* execute_local)(struct ast_taskprocessor_local *local)
struct tps_task::@441 list
AST_LIST_ENTRY overhead.
union tps_task::@440 callback
The execute() task callback function pointer.
void * datap
The data pointer for the task execute() function.
tps_taskprocessor_stats maintain statistics for a taskprocessor.
unsigned long max_qsize
This is the maximum number of tasks queued at any one time.
unsigned long _tasks_processed_count
This is the current number of tasks processed.
static void tps_reset_stats(struct ast_taskprocessor *tps)
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.
#define AST_TASKPROCESSOR_SHUTDOWN_MAX_WAIT
How many seconds to wait for running taskprocessors to finish on shutdown.
static void * default_tps_processing_function(void *data)
Function that processes tasks in the taskprocessor.
static char * cli_tps_reset_stats_all(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
static void tps_shutdown(void)
static int tps_sort_cb(const void *obj_left, const void *obj_right, int flags)
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
Sets the local data associated with a taskprocessor.
static void * tps_task_free(struct tps_task *task)
#define FMT_HEADERS_SUBSYSTEM
int ast_taskprocessor_is_suspended(struct ast_taskprocessor *tps)
Get the task processor suspend status.
static void default_listener_pvt_dtor(struct ast_taskprocessor_listener *listener)
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int(*task_exe)(struct ast_taskprocessor_local *datap), void *datap)
static int tps_ping_handler(void *datap)
unsigned int ast_taskprocessor_alert_get(void)
Get the current taskprocessor high water alert count.
static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
static unsigned int tps_alert_count
int ast_taskprocessor_is_task(struct ast_taskprocessor *tps)
Am I the given taskprocessor's current task.
static void default_listener_shutdown(struct ast_taskprocessor_listener *listener)
static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
static struct tps_task * tps_taskprocessor_pop(struct ast_taskprocessor *tps)
static void tps_report_taskprocessor_list_helper(int fd, struct ast_taskprocessor *tps)
static void listener_shutdown(struct ast_taskprocessor_listener *listener)
static void subsystem_copy(struct subsystem_alert *alert, struct subsystem_alert_vector *vector)
unsigned int ast_taskprocessor_seq_num(void)
Get the next sequence number to create a human friendly taskprocessor name.
void ast_taskprocessor_name_append(char *buf, unsigned int size, const char *name)
Append the next sequence number to the given string, and copy into the buffer.
static int tps_report_taskprocessor_list(int fd, const char *like)
void * ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
Get the user data from the listener.
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap)
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
int ast_taskprocessor_unsuspend(struct ast_taskprocessor *tps)
Indicate the taskprocessor is unsuspended.
static void subsystem_alert_decrement(const char *subsystem)
static char * cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static void * default_listener_pvt_alloc(void)
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
Pop a task off the taskprocessor and execute it.
static int default_listener_start(struct ast_taskprocessor_listener *listener)
static char * tps_taskprocessor_tab_complete(struct ast_cli_args *a)
static char * cli_tps_reset_stats(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static struct ast_taskprocessor * __allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener)
static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks
unsigned int ast_taskprocessor_get_subsystem_alert(const char *subsystem)
Get the current taskprocessor high water alert count by subsystem.
static struct tps_task * tps_task_alloc(int(*task_exe)(void *datap), void *datap)
static int default_listener_die(void *data)
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format,...)
Build a taskprocessor name with a sequence number on the end.
struct ast_taskprocessor * ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
Create a taskprocessor with a custom listener.
static void subsystem_alert_increment(const char *subsystem)
static struct ast_taskprocessor * __start_taskprocessor(struct ast_taskprocessor *p)
static void tps_taskprocessor_dtor(void *tps)
struct ast_taskprocessor * ast_taskprocessor_listener_get_tps(const struct ast_taskprocessor_listener *listener)
Get a reference to the listener's taskprocessor.
const char * ast_taskprocessor_name(struct ast_taskprocessor *tps)
Return the name of the taskprocessor singleton.
static int tps_cmp_cb(void *obj, void *arg, int flags)
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.
static void taskprocessor_listener_dtor(void *obj)
static void tps_alert_add(struct ast_taskprocessor *tps, int delta)
static int tps_hash_cb(const void *obj, const int flags)
#define FMT_FIELDS_SUBSYSTEM
int ast_taskprocessor_suspend(struct ast_taskprocessor *tps)
Indicate the taskprocessor is suspended.
static struct tps_task * tps_task_alloc_local(int(*task_exe)(struct ast_taskprocessor_local *local), void *datap)
static char * cli_subsystem_alert_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
static char * cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static int subsystem_cmp(struct subsystem_alert *a, struct subsystem_alert *b)
static int subsystem_match(struct subsystem_alert *alert, const char *subsystem)
static ast_rwlock_t tps_alert_lock
An API for managing task processing threads that can be shared across modules.
ast_tps_options
ast_tps_options for specification of taskprocessor options
@ TPS_REF_IF_EXISTS
return a reference to a taskprocessor ONLY if it already exists
#define AST_TASKPROCESSOR_HIGH_WATER_LEVEL
static int task(void *data)
Queued task for baseline test.
Time-related functions and macros.
struct timeval ast_samp2tv(unsigned int _nsamp, unsigned int _rate)
Returns a timeval corresponding to the duration of n samples at rate r. Useful to convert samples to ...
struct timeval ast_tvadd(struct timeval a, struct timeval b)
Returns the sum of two timevals a + b.
struct timeval ast_tvsub(struct timeval a, struct timeval b)
Returns the difference of two timevals a - b.
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
#define ast_pthread_create(a, b, c, d)
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
#define AST_VECTOR_GET_INDEX(vec, value, cmp)
Get the 1st index from a vector that matches the given comparison.
#define AST_VECTOR_RW_WRLOCK(vec)
Obtain write lock on vector.
#define AST_VECTOR_RW_UNLOCK(vec)
Unlock vector.
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
#define AST_VECTOR_RW_FREE(vec)
Deallocates this locked vector.
#define AST_VECTOR_ADD_SORTED(vec, elem, cmp)
Add an element into a sorted vector.
#define AST_VECTOR_RW(name, type)
Define a vector structure with a read/write lock.
#define AST_VECTOR_REMOVE(vec, idx, preserve_ordered)
Remove an element from a vector by index.
#define AST_VECTOR_RW_RDLOCK(vec)
Obtain read lock on vector.
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
#define AST_VECTOR_CALLBACK_VOID(vec, callback,...)
Execute a callback on every element in a vector disregarding callback return.
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
#define AST_VECTOR_RW_INIT(vec, size)
Initialize a vector with a read/write lock.