Asterisk - The Open Source Telephony Project GIT-master-77d630f
Data Structures | Macros | Functions | Variables
taskprocessor.c File Reference

Maintain a container of uniquely-named taskprocessor threads that can be shared across modules. More...

#include "asterisk.h"
#include "asterisk/_private.h"
#include "asterisk/module.h"
#include "asterisk/time.h"
#include "asterisk/astobj2.h"
#include "asterisk/cli.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/sem.h"
Include dependency graph for taskprocessor.c:

Go to the source code of this file.

Data Structures

struct  ast_taskprocessor
 A ast_taskprocessor structure is a singleton by name. More...
 
struct  ast_taskprocessor_listener
 A listener for taskprocessors. More...
 
struct  default_taskprocessor_listener_pvt
 
struct  subsystem_alert
 
struct  ast_taskprocessor::tps_queue
 Taskprocessor queue. More...
 
struct  tps_task
 tps_task structure is queued to a taskprocessor More...
 
struct  tps_taskprocessor_stats
 tps_taskprocessor_stats maintain statistics for a taskprocessor. More...
 

Macros

#define AST_TASKPROCESSOR_SHUTDOWN_MAX_WAIT   10
 How many seconds to wait for running taskprocessors to finish on shutdown. More...
 
#define FMT_FIELDS   "%-70s %10lu %10lu %10lu %10lu %10lu\n"
 
#define FMT_FIELDS_SUBSYSTEM   "%-32s %12u\n"
 
#define FMT_HEADERS   "%-70s %10s %10s %10s %10s %10s\n"
 
#define FMT_HEADERS_SUBSYSTEM   "%-32s %12s\n"
 
#define SEQ_STR_SIZE   (1 + 8 + 1) /* Dash plus 8 hex digits plus null terminator */
 
#define TPS_MAX_BUCKETS   1567
 

Functions

static struct ast_taskprocessor__allocate_taskprocessor (const char *name, struct ast_taskprocessor_listener *listener)
 
static struct ast_taskprocessor__start_taskprocessor (struct ast_taskprocessor *p)
 
unsigned int ast_taskprocessor_alert_get (void)
 Get the current taskprocessor high water alert count. More...
 
int ast_taskprocessor_alert_set_levels (struct ast_taskprocessor *tps, long low_water, long high_water)
 Set the high and low alert water marks of the given taskprocessor queue. More...
 
void ast_taskprocessor_build_name (char *buf, unsigned int size, const char *format,...)
 Build a taskprocessor name with a sequence number on the end. More...
 
struct ast_taskprocessorast_taskprocessor_create_with_listener (const char *name, struct ast_taskprocessor_listener *listener)
 Create a taskprocessor with a custom listener. More...
 
int ast_taskprocessor_execute (struct ast_taskprocessor *tps)
 Pop a task off the taskprocessor and execute it. More...
 
struct ast_taskprocessorast_taskprocessor_get (const char *name, enum ast_tps_options create)
 Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary. More...
 
unsigned int ast_taskprocessor_get_subsystem_alert (const char *subsystem)
 Get the current taskprocessor high water alert count by subsystem. More...
 
int ast_taskprocessor_is_suspended (struct ast_taskprocessor *tps)
 Get the task processor suspend status. More...
 
int ast_taskprocessor_is_task (struct ast_taskprocessor *tps)
 Am I the given taskprocessor's current task. More...
 
struct ast_taskprocessor_listenerast_taskprocessor_listener (struct ast_taskprocessor *tps)
 Return the listener associated with the taskprocessor. More...
 
struct ast_taskprocessor_listenerast_taskprocessor_listener_alloc (const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
 Allocate a taskprocessor listener. More...
 
struct ast_taskprocessorast_taskprocessor_listener_get_tps (const struct ast_taskprocessor_listener *listener)
 Get a reference to the listener's taskprocessor. More...
 
void * ast_taskprocessor_listener_get_user_data (const struct ast_taskprocessor_listener *listener)
 Get the user data from the listener. More...
 
const char * ast_taskprocessor_name (struct ast_taskprocessor *tps)
 Return the name of the taskprocessor singleton. More...
 
void ast_taskprocessor_name_append (char *buf, unsigned int size, const char *name)
 Append the next sequence number to the given string, and copy into the buffer. More...
 
int ast_taskprocessor_push (struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap)
 Push a task into the specified taskprocessor queue and signal the taskprocessor thread. More...
 
int ast_taskprocessor_push_local (struct ast_taskprocessor *tps, int(*task_exe)(struct ast_taskprocessor_local *datap), void *datap)
 
unsigned int ast_taskprocessor_seq_num (void)
 Get the next sequence number to create a human friendly taskprocessor name. More...
 
void ast_taskprocessor_set_local (struct ast_taskprocessor *tps, void *local_data)
 Sets the local data associated with a taskprocessor. More...
 
long ast_taskprocessor_size (struct ast_taskprocessor *tps)
 Return the current size of the taskprocessor queue. More...
 
int ast_taskprocessor_suspend (struct ast_taskprocessor *tps)
 Indicate the taskprocessor is suspended. More...
 
void * ast_taskprocessor_unreference (struct ast_taskprocessor *tps)
 Unreference the specified taskprocessor and its reference count will decrement. More...
 
int ast_taskprocessor_unsuspend (struct ast_taskprocessor *tps)
 Indicate the taskprocessor is unsuspended. More...
 
int ast_tps_init (void)
 
static AST_VECTOR_RW (subsystem_alert_vector, struct subsystem_alert *)
 CLI taskprocessor ping <blah>operation requires a ping condition lock. More...
 
static char * cli_subsystem_alert_report (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static char * cli_tps_ping (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static char * cli_tps_report (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static char * cli_tps_reset_stats (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static char * cli_tps_reset_stats_all (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static int default_listener_die (void *data)
 
static void * default_listener_pvt_alloc (void)
 
static void default_listener_pvt_destroy (struct default_taskprocessor_listener_pvt *pvt)
 
static void default_listener_pvt_dtor (struct ast_taskprocessor_listener *listener)
 
static void default_listener_shutdown (struct ast_taskprocessor_listener *listener)
 
static int default_listener_start (struct ast_taskprocessor_listener *listener)
 
static void default_task_pushed (struct ast_taskprocessor_listener *listener, int was_empty)
 
static void * default_tps_processing_function (void *data)
 Function that processes tasks in the taskprocessor. More...
 
static void listener_shutdown (struct ast_taskprocessor_listener *listener)
 
static void subsystem_alert_decrement (const char *subsystem)
 
static void subsystem_alert_increment (const char *subsystem)
 
static int subsystem_cmp (struct subsystem_alert *a, struct subsystem_alert *b)
 
static void subsystem_copy (struct subsystem_alert *alert, struct subsystem_alert_vector *vector)
 
static int subsystem_match (struct subsystem_alert *alert, const char *subsystem)
 
static void taskprocessor_listener_dtor (void *obj)
 
static int taskprocessor_push (struct ast_taskprocessor *tps, struct tps_task *t)
 
static void tps_alert_add (struct ast_taskprocessor *tps, int delta)
 
static int tps_cmp_cb (void *obj, void *arg, int flags)
 The astobj2 compare callback for taskprocessors. More...
 
static int tps_hash_cb (const void *obj, const int flags)
 The astobj2 hash callback for taskprocessors. More...
 
static int tps_ping_handler (void *datap)
 CLI taskprocessor ping <blah>handler function. More...
 
static int tps_report_taskprocessor_list (int fd, const char *like)
 
static void tps_report_taskprocessor_list_helper (int fd, struct ast_taskprocessor *tps)
 
static void tps_reset_stats (struct ast_taskprocessor *tps)
 
static void tps_shutdown (void)
 
static int tps_sort_cb (const void *obj_left, const void *obj_right, int flags)
 
static struct tps_tasktps_task_alloc (int(*task_exe)(void *datap), void *datap)
 
static struct tps_tasktps_task_alloc_local (int(*task_exe)(struct ast_taskprocessor_local *local), void *datap)
 
static void * tps_task_free (struct tps_task *task)
 
static void tps_taskprocessor_dtor (void *tps)
 
static struct tps_tasktps_taskprocessor_pop (struct ast_taskprocessor *tps)
 
static char * tps_taskprocessor_tab_complete (struct ast_cli_args *a)
 

Variables

static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks
 
static struct ast_cli_entry taskprocessor_clis []
 
static unsigned int tps_alert_count
 
static ast_rwlock_t tps_alert_lock = { PTHREAD_RWLOCK_INITIALIZER , NULL, {1, 0} }
 

Detailed Description

Maintain a container of uniquely-named taskprocessor threads that can be shared across modules.

Author
Dwayne Hubbard dhubb.nosp@m.ard@.nosp@m.digiu.nosp@m.m.co.nosp@m.m

Definition in file taskprocessor.c.

Macro Definition Documentation

◆ AST_TASKPROCESSOR_SHUTDOWN_MAX_WAIT

#define AST_TASKPROCESSOR_SHUTDOWN_MAX_WAIT   10

How many seconds to wait for running taskprocessors to finish on shutdown.

Definition at line 291 of file taskprocessor.c.

◆ FMT_FIELDS

#define FMT_FIELDS   "%-70s %10lu %10lu %10lu %10lu %10lu\n"

Definition at line 578 of file taskprocessor.c.

◆ FMT_FIELDS_SUBSYSTEM

#define FMT_FIELDS_SUBSYSTEM   "%-32s %12u\n"

◆ FMT_HEADERS

#define FMT_HEADERS   "%-70s %10s %10s %10s %10s %10s\n"

Definition at line 577 of file taskprocessor.c.

◆ FMT_HEADERS_SUBSYSTEM

#define FMT_HEADERS_SUBSYSTEM   "%-32s %12s\n"

◆ SEQ_STR_SIZE

#define SEQ_STR_SIZE   (1 + 8 + 1) /* Dash plus 8 hex digits plus null terminator */

Definition at line 1353 of file taskprocessor.c.

◆ TPS_MAX_BUCKETS

#define TPS_MAX_BUCKETS   1567

Function Documentation

◆ __allocate_taskprocessor()

static struct ast_taskprocessor * __allocate_taskprocessor ( const char *  name,
struct ast_taskprocessor_listener listener 
)
static

Definition at line 1053 of file taskprocessor.c.

1054{
1055 struct ast_taskprocessor *p;
1056 char *subsystem_separator;
1057 size_t subsystem_length = 0;
1058 size_t name_length;
1059
1060 name_length = strlen(name);
1061 subsystem_separator = strchr(name, '/');
1062 if (subsystem_separator) {
1063 subsystem_length = subsystem_separator - name;
1064 }
1065
1066 p = ao2_alloc(sizeof(*p) + name_length + subsystem_length + 2, tps_taskprocessor_dtor);
1067 if (!p) {
1068 ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
1069 return NULL;
1070 }
1071
1072 /* Set default congestion water level alert triggers. */
1075
1076 strcpy(p->name, name); /* Safe */
1077 p->subsystem = p->name + name_length + 1;
1078 ast_copy_string(p->subsystem, name, subsystem_length + 1);
1079
1080 ao2_ref(listener, +1);
1081 p->listener = listener;
1082
1084
1085 ao2_ref(p, +1);
1086 listener->tps = p;
1087
1088 if (!(ao2_link_flags(tps_singletons, p, OBJ_NOLOCK))) {
1089 ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
1090 listener->tps = NULL;
1091 ao2_ref(p, -2);
1092 return NULL;
1093 }
1094
1095 return p;
1096}
static void * listener(void *unused)
Definition: asterisk.c:1530
#define ast_log
Definition: astobj2.c:42
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
Definition: astobj2.h:1554
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
Definition: astobj2.h:1063
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:409
static const char name[]
Definition: format_mp3.c:68
#define LOG_ERROR
#define LOG_WARNING
#define AST_PTHREADT_NULL
Definition: lock.h:73
#define NULL
Definition: resample.c:96
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
Definition: strings.h:425
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
long tps_queue_low
Taskprocessor low water clear alert level.
Definition: taskprocessor.c:76
long tps_queue_high
Taskprocessor high water alert trigger level.
Definition: taskprocessor.c:78
struct ast_taskprocessor_listener * listener
Definition: taskprocessor.c:81
char * subsystem
Anything before the first '/' in the name (if there is one)
Definition: taskprocessor.c:93
char name[0]
Friendly name of the taskprocessor. Subsystem is appended after the name's NULL terminator.
Definition: taskprocessor.c:97
static void tps_taskprocessor_dtor(void *tps)
#define AST_TASKPROCESSOR_HIGH_WATER_LEVEL
Definition: taskprocessor.h:64

References ao2_alloc, ao2_link_flags, ao2_ref, ast_copy_string(), ast_log, AST_PTHREADT_NULL, AST_TASKPROCESSOR_HIGH_WATER_LEVEL, listener(), ast_taskprocessor::listener, LOG_ERROR, LOG_WARNING, ast_taskprocessor::name, name, NULL, OBJ_NOLOCK, ast_taskprocessor::subsystem, ast_taskprocessor::thread, ast_taskprocessor::tps_queue_high, ast_taskprocessor::tps_queue_low, and tps_taskprocessor_dtor().

Referenced by ast_taskprocessor_create_with_listener(), and ast_taskprocessor_get().

◆ __start_taskprocessor()

static struct ast_taskprocessor * __start_taskprocessor ( struct ast_taskprocessor p)
static

Definition at line 1098 of file taskprocessor.c.

1099{
1100 if (p && p->listener->callbacks->start(p->listener)) {
1101 ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n",
1102 p->name);
1104
1105 return NULL;
1106 }
1107
1108 return p;
1109}
int(* start)(struct ast_taskprocessor_listener *listener)
The taskprocessor has started completely.
Definition: taskprocessor.h:92
const struct ast_taskprocessor_listener_callbacks * callbacks
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.

References ast_log, ast_taskprocessor_unreference(), ast_taskprocessor_listener::callbacks, ast_taskprocessor::listener, LOG_ERROR, ast_taskprocessor::name, NULL, and ast_taskprocessor_listener_callbacks::start.

Referenced by ast_taskprocessor_create_with_listener(), and ast_taskprocessor_get().

◆ ast_taskprocessor_alert_get()

unsigned int ast_taskprocessor_alert_get ( void  )

Get the current taskprocessor high water alert count.

Since
13.10.0
Return values
0if no taskprocessors are in high water alert.
non-zeroif some task processors are in high water alert.

Definition at line 884 of file taskprocessor.c.

885{
886 unsigned int count;
887
889 count = tps_alert_count;
891
892 return count;
893}
#define ast_rwlock_rdlock(a)
Definition: lock.h:242
#define ast_rwlock_unlock(a)
Definition: lock.h:241
static unsigned int tps_alert_count
static ast_rwlock_t tps_alert_lock

References ast_rwlock_rdlock, ast_rwlock_unlock, tps_alert_count, and tps_alert_lock.

Referenced by AST_TEST_DEFINE(), and distributor().

◆ ast_taskprocessor_alert_set_levels()

int ast_taskprocessor_alert_set_levels ( struct ast_taskprocessor tps,
long  low_water,
long  high_water 
)

Set the high and low alert water marks of the given taskprocessor queue.

Since
13.10.0
Parameters
tpsTaskprocessor to update queue water marks.
low_waterNew queue low water mark. (-1 to set as 90% of high_water)
high_waterNew queue high water mark.
Return values
0on success.
-1on error (water marks not changed).

Definition at line 895 of file taskprocessor.c.

896{
897 if (!tps || high_water < 0 || high_water < low_water) {
898 return -1;
899 }
900
901 if (low_water < 0) {
902 /* Set low water level to 90% of high water level */
903 low_water = (high_water * 9) / 10;
904 }
905
906 ao2_lock(tps);
907
908 tps->tps_queue_low = low_water;
909 tps->tps_queue_high = high_water;
910
911 if (tps->high_water_alert) {
912 if (!tps->tps_queue_size || tps->tps_queue_size < low_water) {
913 /* Update water mark alert immediately */
914 tps->high_water_alert = 0;
915 tps_alert_add(tps, -1);
916 }
917 } else {
918 if (high_water < tps->tps_queue_size) {
919 /* Update water mark alert immediately */
920 tps->high_water_alert = 1;
921 tps_alert_add(tps, +1);
922 }
923 }
924
925 ao2_unlock(tps);
926
927 return 0;
928}
#define ao2_unlock(a)
Definition: astobj2.h:729
#define ao2_lock(a)
Definition: astobj2.h:717
unsigned int high_water_alert
Definition: taskprocessor.c:89
long tps_queue_size
Taskprocessor current queue size.
Definition: taskprocessor.c:74
static void tps_alert_add(struct ast_taskprocessor *tps, int delta)

References ao2_lock, ao2_unlock, ast_taskprocessor::high_water_alert, tps_alert_add(), ast_taskprocessor::tps_queue_high, ast_taskprocessor::tps_queue_low, and ast_taskprocessor::tps_queue_size.

Referenced by actual_load_config(), ast_res_pjsip_init_options_handling(), ast_serializer_pool_set_alerts(), ast_sorcery_object_set_congestion_levels(), AST_TEST_DEFINE(), and stasis_subscription_set_congestion_limits().

◆ ast_taskprocessor_build_name()

void ast_taskprocessor_build_name ( char *  buf,
unsigned int  size,
const char *  format,
  ... 
)

Build a taskprocessor name with a sequence number on the end.

Since
13.8.0
Parameters
bufWhere to put the built taskprocessor name.
sizeHow large is buf including null terminator.
formatprintf format to create the non-sequenced part of the name.
Note
The user supplied part of the taskprocessor name is truncated to allow the full sequence number to be appended within the supplied buffer size.

Definition at line 1365 of file taskprocessor.c.

1366{
1367 va_list ap;
1368 int user_size;
1369
1370 ast_assert(buf != NULL);
1371 ast_assert(SEQ_STR_SIZE <= size);
1372
1373 va_start(ap, format);
1374 user_size = vsnprintf(buf, size - (SEQ_STR_SIZE - 1), format, ap);
1375 va_end(ap);
1376 if (user_size < 0) {
1377 /*
1378 * Wow! We got an output error to a memory buffer.
1379 * Assume no user part of name written.
1380 */
1381 user_size = 0;
1382 } else if (size < user_size + SEQ_STR_SIZE) {
1383 /* Truncate user part of name to make sequence number fit. */
1384 user_size = size - SEQ_STR_SIZE;
1385 }
1386
1387 /* Append sequence number to end of user name. */
1388 snprintf(buf + user_size, SEQ_STR_SIZE, "-%08x", ast_taskprocessor_seq_num());
1389}
char buf[BUFSIZE]
Definition: eagi_proxy.c:66
unsigned int ast_taskprocessor_seq_num(void)
Get the next sequence number to create a human friendly taskprocessor name.
#define SEQ_STR_SIZE
#define ast_assert(a)
Definition: utils.h:776

References ast_assert, ast_taskprocessor_seq_num(), buf, NULL, and SEQ_STR_SIZE.

Referenced by alloc_playback_chan(), allocate_subscription_tree(), ast_sip_session_alloc(), create_websocket_serializer(), distributor_pool_setup(), handle_cli_taskpool_push_serializer_efficiency(), handle_cli_threadpool_push_serializer_efficiency(), internal_stasis_subscribe(), refer_progress_alloc(), sip_options_aor_alloc(), sip_outbound_publisher_alloc(), sip_outbound_registration_state_alloc(), sorcery_object_type_alloc(), and taskpool_taskprocessor_alloc().

◆ ast_taskprocessor_create_with_listener()

struct ast_taskprocessor * ast_taskprocessor_create_with_listener ( const char *  name,
struct ast_taskprocessor_listener listener 
)

Create a taskprocessor with a custom listener.

Since
12.0.0

Note that when a taskprocessor is created in this way, it does not create any threads to execute the tasks. This job is left up to the listener. The listener's start() callback will be called during this function.

Parameters
nameThe name of the taskprocessor to create
listenerThe listener for operations on this taskprocessor
Return values
NULLFailure
non-NULLsuccess

Definition at line 1153 of file taskprocessor.c.

1154{
1155 struct ast_taskprocessor *p;
1156
1157 ao2_lock(tps_singletons);
1158 p = ao2_find(tps_singletons, name, OBJ_KEY | OBJ_NOLOCK);
1159 if (p) {
1160 ao2_unlock(tps_singletons);
1162 return NULL;
1163 }
1164
1166 ao2_unlock(tps_singletons);
1167
1168 return __start_taskprocessor(p);
1169}
#define OBJ_KEY
Definition: astobj2.h:1151
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1736
static struct ast_taskprocessor * __allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener)
static struct ast_taskprocessor * __start_taskprocessor(struct ast_taskprocessor *p)

References __allocate_taskprocessor(), __start_taskprocessor(), ao2_find, ao2_lock, ao2_unlock, ast_taskprocessor_unreference(), listener(), name, NULL, OBJ_KEY, and OBJ_NOLOCK.

Referenced by ast_taskpool_serializer_group(), AST_TEST_DEFINE(), ast_threadpool_create(), and ast_threadpool_serializer_group().

◆ ast_taskprocessor_execute()

int ast_taskprocessor_execute ( struct ast_taskprocessor tps)

Pop a task off the taskprocessor and execute it.

Since
12.0.0
Parameters
tpsThe taskprocessor from which to execute.
Return values
0There is no further work to be done.
1Tasks still remain in the taskprocessor queue.

Definition at line 1282 of file taskprocessor.c.

1283{
1284 struct ast_taskprocessor_local local;
1285 struct tps_task *t;
1286 long size;
1287
1288 ao2_lock(tps);
1289 t = tps_taskprocessor_pop(tps);
1290 if (!t) {
1291 ao2_unlock(tps);
1292 return 0;
1293 }
1294
1295 tps->thread = pthread_self();
1296 tps->executing = 1;
1297
1298 if (t->wants_local) {
1299 local.local_data = tps->local_data;
1300 local.data = t->datap;
1301 }
1302 ao2_unlock(tps);
1303
1304 if (t->wants_local) {
1305 t->callback.execute_local(&local);
1306 } else {
1307 t->callback.execute(t->datap);
1308 }
1309 tps_task_free(t);
1310
1311 ao2_lock(tps);
1313 /* We need to check size in the same critical section where we reset the
1314 * executing bit. Avoids a race condition where a task is pushed right
1315 * after we pop an empty stack.
1316 */
1317 tps->executing = 0;
1318 size = ast_taskprocessor_size(tps);
1319
1320 /* Update the stats */
1322
1323 /* Include the task we just executed as part of the queue size. */
1324 if (size >= tps->stats.max_qsize) {
1325 tps->stats.max_qsize = size + 1;
1326 }
1327 ao2_unlock(tps);
1328
1329 /* If we executed a task, check for the transition to empty */
1330 if (size == 0 && tps->listener->callbacks->emptied) {
1331 tps->listener->callbacks->emptied(tps->listener);
1332 }
1333 return size > 0;
1334}
void(* emptied)(struct ast_taskprocessor_listener *listener)
Indicates the task processor has become empty.
Local data parameter.
unsigned int executing
Definition: taskprocessor.c:85
struct tps_taskprocessor_stats stats
Taskprocessor statistics.
Definition: taskprocessor.c:71
tps_task structure is queued to a taskprocessor
Definition: taskprocessor.c:47
int(* execute)(void *datap)
Definition: taskprocessor.c:50
int(* execute_local)(struct ast_taskprocessor_local *local)
Definition: taskprocessor.c:51
unsigned int wants_local
Definition: taskprocessor.c:57
union tps_task::@414 callback
The execute() task callback function pointer.
void * datap
The data pointer for the task execute() function.
Definition: taskprocessor.c:54
unsigned long max_qsize
This is the maximum number of tasks queued at any one time.
Definition: taskprocessor.c:63
unsigned long _tasks_processed_count
This is the current number of tasks processed.
Definition: taskprocessor.c:65
static void * tps_task_free(struct tps_task *task)
static struct tps_task * tps_taskprocessor_pop(struct ast_taskprocessor *tps)
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.

References tps_taskprocessor_stats::_tasks_processed_count, ao2_lock, ao2_unlock, AST_PTHREADT_NULL, ast_taskprocessor_size(), tps_task::callback, ast_taskprocessor_listener::callbacks, ast_taskprocessor_local::data, tps_task::datap, ast_taskprocessor_listener_callbacks::emptied, tps_task::execute, tps_task::execute_local, ast_taskprocessor::executing, ast_taskprocessor::listener, ast_taskprocessor_local::local_data, ast_taskprocessor::local_data, tps_taskprocessor_stats::max_qsize, ast_taskprocessor::stats, ast_taskprocessor::thread, tps_task_free(), tps_taskprocessor_pop(), and tps_task::wants_local.

Referenced by ast_taskpool_serializer_push_wait(), AST_TEST_DEFINE(), default_tps_processing_function(), execute_tasks(), and threadpool_execute().

◆ ast_taskprocessor_get()

struct ast_taskprocessor * ast_taskprocessor_get ( const char *  name,
enum ast_tps_options  create 
)

Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.

The default behavior of instantiating a taskprocessor if one does not already exist can be disabled by specifying the TPS_REF_IF_EXISTS ast_tps_options as the second argument to ast_taskprocessor_get().

Parameters
nameThe name of the taskprocessor
createUse 0 by default or specify TPS_REF_IF_EXISTS to return NULL if the taskprocessor does not already exist return A pointer to a reference counted taskprocessor under normal conditions, or NULL if the TPS_REF_IF_EXISTS reference type is specified and the taskprocessor does not exist
Since
1.6.1

Definition at line 1114 of file taskprocessor.c.

1115{
1116 struct ast_taskprocessor *p;
1119
1120 if (ast_strlen_zero(name)) {
1121 ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
1122 return NULL;
1123 }
1124 ao2_lock(tps_singletons);
1125 p = ao2_find(tps_singletons, name, OBJ_KEY | OBJ_NOLOCK);
1126 if (p || (create & TPS_REF_IF_EXISTS)) {
1127 /* calling function does not want a new taskprocessor to be created if it doesn't already exist */
1128 ao2_unlock(tps_singletons);
1129 return p;
1130 }
1131
1132 /* Create a new taskprocessor. Start by creating a default listener */
1134 if (!pvt) {
1135 ao2_unlock(tps_singletons);
1136 return NULL;
1137 }
1139 if (!listener) {
1140 ao2_unlock(tps_singletons);
1142 return NULL;
1143 }
1144
1146 ao2_unlock(tps_singletons);
1147 p = __start_taskprocessor(p);
1148 ao2_ref(listener, -1);
1149
1150 return p;
1151}
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition: strings.h:65
A listener for taskprocessors.
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
static void * default_listener_pvt_alloc(void)
static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks
static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
@ TPS_REF_IF_EXISTS
return a reference to a taskprocessor ONLY if it already exists
Definition: taskprocessor.h:78

References __allocate_taskprocessor(), __start_taskprocessor(), ao2_find, ao2_lock, ao2_ref, ao2_unlock, ast_log, ast_strlen_zero(), ast_taskprocessor_listener_alloc(), default_listener_callbacks, default_listener_pvt_alloc(), default_listener_pvt_destroy(), listener(), LOG_ERROR, name, NULL, OBJ_KEY, OBJ_NOLOCK, and TPS_REF_IF_EXISTS.

Referenced by alloc_playback_chan(), ast_dns_system_resolver_init(), ast_msg_init(), AST_TEST_DEFINE(), cli_tps_ping(), cli_tps_reset_stats(), find_request_serializer(), internal_stasis_subscribe(), load_module(), load_objects(), taskpool_taskprocessor_alloc(), and threadpool_alloc().

◆ ast_taskprocessor_get_subsystem_alert()

unsigned int ast_taskprocessor_get_subsystem_alert ( const char *  subsystem)

Get the current taskprocessor high water alert count by subsystem.

Since
13.26.0
16.3.0
Parameters
subsystemThe subsystem name
Return values
0if no taskprocessors are in high water alert.
non-zeroif some task processors are in high water alert.

Definition at line 704 of file taskprocessor.c.

705{
706 struct subsystem_alert *alert;
707 unsigned int count = 0;
708 int idx;
709
710 AST_VECTOR_RW_RDLOCK(&overloaded_subsystems);
711 idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
712 if (idx >= 0) {
713 alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
714 count = alert->alert_count;
715 }
716 AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
717
718 return count;
719}
unsigned int alert_count
static int subsystem_match(struct subsystem_alert *alert, const char *subsystem)
#define AST_VECTOR_GET_INDEX(vec, value, cmp)
Get the 1st index from a vector that matches the given comparison.
Definition: vector.h:730
#define AST_VECTOR_RW_UNLOCK(vec)
Unlock vector.
Definition: vector.h:908
#define AST_VECTOR_RW_RDLOCK(vec)
Obtain read lock on vector.
Definition: vector.h:888
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:691

References subsystem_alert::alert_count, AST_VECTOR_GET, AST_VECTOR_GET_INDEX, AST_VECTOR_RW_RDLOCK, AST_VECTOR_RW_UNLOCK, subsystem_alert::subsystem, and subsystem_match().

Referenced by AST_TEST_DEFINE(), and distributor().

◆ ast_taskprocessor_is_suspended()

int ast_taskprocessor_is_suspended ( struct ast_taskprocessor tps)

Get the task processor suspend status.

Since
13.12.0
Parameters
tpsTask processor.
Return values
non-zeroif the task processor is suspended

Definition at line 1277 of file taskprocessor.c.

1278{
1279 return tps ? tps->suspended : -1;
1280}
unsigned int suspended
Definition: taskprocessor.c:91

References ast_taskprocessor::suspended.

Referenced by ast_sip_session_suspend().

◆ ast_taskprocessor_is_task()

int ast_taskprocessor_is_task ( struct ast_taskprocessor tps)

Am I the given taskprocessor's current task.

Since
12.7.0
Parameters
tpsTaskprocessor to check.
Return values
non-zeroif current thread is the taskprocessor thread.

Definition at line 1336 of file taskprocessor.c.

1337{
1338 int is_task;
1339
1340 ao2_lock(tps);
1341 is_task = pthread_equal(tps->thread, pthread_self());
1342 ao2_unlock(tps);
1343 return is_task;
1344}

References ao2_lock, ao2_unlock, and ast_taskprocessor::thread.

Referenced by ast_sip_push_task_wait_serializer(), ast_sip_session_suspend(), and handle_new_invite_request().

◆ ast_taskprocessor_listener()

Return the listener associated with the taskprocessor.

Definition at line 970 of file taskprocessor.c.

971{
972 return tps ? tps->listener : NULL;
973}

References ast_taskprocessor::listener, NULL, and ast_taskprocessor_listener::tps.

Referenced by ast_taskpool_serializer_push_wait(), and execute_tasks().

◆ ast_taskprocessor_listener_alloc()

struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc ( const struct ast_taskprocessor_listener_callbacks callbacks,
void *  user_data 
)

Allocate a taskprocessor listener.

Since
12.0.0

This will result in the listener being allocated with the specified callbacks.

Parameters
callbacksThe callbacks to assign to the listener
user_dataThe user data for the listener
Return values
NULLFailure
non-NULLThe newly allocated taskprocessor listener

Definition at line 1000 of file taskprocessor.c.

1001{
1003
1005 if (!listener) {
1006 return NULL;
1007 }
1008 listener->callbacks = callbacks;
1009 listener->user_data = user_data;
1010
1011 return listener;
1012}
struct @476 callbacks
static void taskprocessor_listener_dtor(void *obj)

References ao2_alloc, callbacks, listener(), NULL, taskprocessor_listener_dtor(), and ast_taskprocessor_listener::user_data.

Referenced by ast_taskpool_serializer_group(), ast_taskprocessor_get(), AST_TEST_DEFINE(), ast_threadpool_create(), and ast_threadpool_serializer_group().

◆ ast_taskprocessor_listener_get_tps()

struct ast_taskprocessor * ast_taskprocessor_listener_get_tps ( const struct ast_taskprocessor_listener listener)

Get a reference to the listener's taskprocessor.

This will return the taskprocessor with its reference count increased. Release the reference to this object by using ast_taskprocessor_unreference()

Parameters
listenerThe listener that has the taskprocessor
Returns
The taskprocessor

Definition at line 1014 of file taskprocessor.c.

1015{
1016 ao2_ref(listener->tps, +1);
1017 return listener->tps;
1018}

References ao2_ref, and listener().

Referenced by serializer_task_pushed().

◆ ast_taskprocessor_listener_get_user_data()

void * ast_taskprocessor_listener_get_user_data ( const struct ast_taskprocessor_listener listener)

Get the user data from the listener.

Parameters
listenerThe taskprocessor listener
Returns
The listener's user data

Definition at line 1020 of file taskprocessor.c.

1021{
1022 return listener->user_data;
1023}

References listener().

Referenced by ast_taskpool_serializer_push_wait(), execute_tasks(), serializer_shutdown(), serializer_task_pushed(), test_emptied(), test_shutdown(), test_task_pushed(), threadpool_tps_emptied(), threadpool_tps_shutdown(), and threadpool_tps_task_pushed().

◆ ast_taskprocessor_name()

const char * ast_taskprocessor_name ( struct ast_taskprocessor tps)

Return the name of the taskprocessor singleton.

Since
1.6.1

Definition at line 976 of file taskprocessor.c.

977{
978 if (!tps) {
979 ast_log(LOG_ERROR, "no taskprocessor specified!\n");
980 return NULL;
981 }
982 return tps->name;
983}

References ast_log, LOG_ERROR, ast_taskprocessor::name, NULL, and ast_taskprocessor_listener::tps.

Referenced by ast_serializer_pool_set_alerts(), ast_sip_get_distributor_serializer(), distributor(), grow(), record_serializer(), shrink(), sip_options_apply_aor_configuration(), and sip_options_contact_add_task().

◆ ast_taskprocessor_name_append()

void ast_taskprocessor_name_append ( char *  buf,
unsigned int  size,
const char *  name 
)

Append the next sequence number to the given string, and copy into the buffer.

Parameters
bufWhere to copy the appended taskprocessor name.
sizeHow large is buf including null terminator.
nameA name to append the sequence number to.

Definition at line 1355 of file taskprocessor.c.

1356{
1357 int final_size = strlen(name) + SEQ_STR_SIZE;
1358
1359 ast_assert(buf != NULL && name != NULL);
1360 ast_assert(final_size <= size);
1361
1362 snprintf(buf, final_size, "%s-%08x", name, ast_taskprocessor_seq_num());
1363}

References ast_assert, ast_taskprocessor_seq_num(), buf, name, NULL, and SEQ_STR_SIZE.

Referenced by ast_serializer_pool_create(), and ast_serializer_taskpool_create().

◆ ast_taskprocessor_push()

int ast_taskprocessor_push ( struct ast_taskprocessor tps,
int(*)(void *datap)  task_exe,
void *  datap 
)

Push a task into the specified taskprocessor queue and signal the taskprocessor thread.

Parameters
tpsThe taskprocessor structure
task_exeThe task handling function to push into the taskprocessor queue
datapThe data to be used by the task handling function
Return values
0success
-1failure
Since
1.6.1

Definition at line 1245 of file taskprocessor.c.

1246{
1247 return taskprocessor_push(tps, tps_task_alloc(task_exe, datap));
1248}
static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
static struct tps_task * tps_task_alloc(int(*task_exe)(void *datap), void *datap)

References taskprocessor_push(), and tps_task_alloc().

Referenced by ast_cc_agent_status_response(), ast_cc_monitor_failed(), ast_cc_monitor_party_b_free(), ast_cc_monitor_status_request(), ast_cc_monitor_stop_ringing(), ast_msg_queue(), ast_sip_push_task(), ast_sorcery_create(), ast_sorcery_delete(), ast_sorcery_update(), ast_taskpool_push(), ast_taskpool_serializer_push_wait(), AST_TEST_DEFINE(), ast_threadpool_push(), ast_threadpool_set_size(), async_delete_name_rec(), async_play_sound_helper(), cc_request_state_change(), cli_tps_ping(), default_listener_shutdown(), destroy_conference_bridge(), dns_system_resolver_resolve(), generic_monitor_devstate_cb(), handle_cc_status(), handle_cli_taskpool_push_serializer_efficiency(), handle_cli_threadpool_push_serializer_efficiency(), hepv3_send_packet(), iax2_transmit(), mwi_handle_subscribe(), mwi_handle_unsubscribe(), play_sound_helper(), serializer_efficiency_task(), sorcery_object_load(), stasis_unsubscribe(), taskpool_taskprocessor_alloc(), taskpool_taskprocessor_dtor(), threadpool_active_thread_idle(), threadpool_idle_thread_dead(), threadpool_tps_emptied(), threadpool_tps_task_pushed(), and threadpool_zombie_thread_dead().

◆ ast_taskprocessor_push_local()

int ast_taskprocessor_push_local ( struct ast_taskprocessor tps,
int(*)(struct ast_taskprocessor_local *datap)  task_exe,
void *  datap 
)

Definition at line 1250 of file taskprocessor.c.

1251{
1252 return taskprocessor_push(tps, tps_task_alloc_local(task_exe, datap));
1253}
static struct tps_task * tps_task_alloc_local(int(*task_exe)(struct ast_taskprocessor_local *local), void *datap)

References taskprocessor_push(), and tps_task_alloc_local().

◆ ast_taskprocessor_seq_num()

unsigned int ast_taskprocessor_seq_num ( void  )

Get the next sequence number to create a human friendly taskprocessor name.

Since
13.8.0
Returns
Sequence number for use in creating human friendly taskprocessor names.

Definition at line 1346 of file taskprocessor.c.

1347{
1348 static int seq_num;
1349
1350 return (unsigned int) ast_atomic_fetchadd_int(&seq_num, +1);
1351}
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition: lock.h:764

References ast_atomic_fetchadd_int().

Referenced by ast_taskprocessor_build_name(), and ast_taskprocessor_name_append().

◆ ast_taskprocessor_set_local()

void ast_taskprocessor_set_local ( struct ast_taskprocessor tps,
void *  local_data 
)

Sets the local data associated with a taskprocessor.

Since
12.0.0

See ast_taskprocessor_push_local().

Parameters
tpsTask processor.
local_dataLocal data to associate with tps.

Definition at line 1171 of file taskprocessor.c.

1173{
1174 SCOPED_AO2LOCK(lock, tps);
1175 tps->local_data = local_data;
1176}
ast_mutex_t lock
Definition: app_sla.c:337
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:611

References ast_taskprocessor::local_data, lock, and SCOPED_AO2LOCK.

Referenced by AST_TEST_DEFINE(), and internal_stasis_subscribe().

◆ ast_taskprocessor_size()

long ast_taskprocessor_size ( struct ast_taskprocessor tps)

Return the current size of the taskprocessor queue.

Since
13.7.0

Definition at line 965 of file taskprocessor.c.

966{
967 return (tps) ? tps->tps_queue_size : -1;
968}

References ast_taskprocessor::tps_queue_size.

Referenced by ast_serializer_pool_get(), ast_taskpool_serializer_push_wait(), ast_taskprocessor_execute(), AST_TEST_DEFINE(), ast_threadpool_queue_size(), execute_tasks(), taskpool_least_full_selector(), and taskpool_sequential_selector().

◆ ast_taskprocessor_suspend()

int ast_taskprocessor_suspend ( struct ast_taskprocessor tps)

Indicate the taskprocessor is suspended.

Since
13.12.0
Parameters
tpsTask processor.
Return values
0success
-1failure

Definition at line 1255 of file taskprocessor.c.

1256{
1257 if (tps) {
1258 ao2_lock(tps);
1259 tps->suspended = 1;
1260 ao2_unlock(tps);
1261 return 0;
1262 }
1263 return -1;
1264}

References ao2_lock, ao2_unlock, and ast_taskprocessor::suspended.

Referenced by ast_sip_session_suspend(), and AST_TEST_DEFINE().

◆ ast_taskprocessor_unreference()

void * ast_taskprocessor_unreference ( struct ast_taskprocessor tps)

Unreference the specified taskprocessor and its reference count will decrement.

Taskprocessors use astobj2 and will unlink from the taskprocessor singleton container and destroy themself when the taskprocessor reference count reaches zero.

Parameters
tpstaskprocessor to unreference
Returns
NULL
Since
1.6.1

Definition at line 1179 of file taskprocessor.c.

1180{
1181 if (!tps) {
1182 return NULL;
1183 }
1184
1185 /* To prevent another thread from finding and getting a reference to this
1186 * taskprocessor we hold the singletons lock. If we didn't do this then
1187 * they may acquire it and find that the listener has been shut down.
1188 */
1189 ao2_lock(tps_singletons);
1190
1191 if (ao2_ref(tps, -1) > 3) {
1192 ao2_unlock(tps_singletons);
1193 return NULL;
1194 }
1195
1196 /* If we're down to 3 references, then those must be:
1197 * 1. The reference we just got rid of
1198 * 2. The container
1199 * 3. The listener
1200 */
1201 ao2_unlink_flags(tps_singletons, tps, OBJ_NOLOCK);
1202 ao2_unlock(tps_singletons);
1203
1205 return NULL;
1206}
#define ao2_unlink_flags(container, obj, flags)
Remove an object from a container.
Definition: astobj2.h:1600
static void listener_shutdown(struct ast_taskprocessor_listener *listener)

References ao2_lock, ao2_ref, ao2_unlink_flags, ao2_unlock, ast_taskprocessor::listener, listener_shutdown(), NULL, and OBJ_NOLOCK.

Referenced by __start_taskprocessor(), __unload_module(), ast_msg_shutdown(), ast_res_pjsip_cleanup_options_handling(), ast_serializer_pool_destroy(), ast_taskprocessor_create_with_listener(), AST_TEST_DEFINE(), ast_threadpool_shutdown(), cli_tps_ping(), cli_tps_reset_stats(), cli_tps_reset_stats_all(), destroy_conference_bridge(), distributor(), distributor_pool_shutdown(), dns_system_resolver_destroy(), execute_tasks(), exten_state_subscription_destructor(), handle_cli_taskpool_push_serializer_efficiency(), handle_cli_threadpool_push_serializer_efficiency(), refer_progress_destroy(), scheduler(), schtd_dtor(), serializer_task_pushed(), session_destructor(), sip_options_aor_dtor(), sip_outbound_publisher_destroy(), sip_outbound_registration_client_state_destroy(), sorcery_object_type_destructor(), subscription_dtor(), subscription_persistence_recreate(), subscription_tree_destructor(), taskpool_taskprocessor_alloc(), taskpool_taskprocessor_dtor(), tps_report_taskprocessor_list(), tps_shutdown_thread(), tps_taskprocessor_tab_complete(), unload_module(), and websocket_cb().

◆ ast_taskprocessor_unsuspend()

int ast_taskprocessor_unsuspend ( struct ast_taskprocessor tps)

Indicate the taskprocessor is unsuspended.

Since
13.12.0
Parameters
tpsTask processor.
Return values
0success
-1failure

Definition at line 1266 of file taskprocessor.c.

1267{
1268 if (tps) {
1269 ao2_lock(tps);
1270 tps->suspended = 0;
1271 ao2_unlock(tps);
1272 return 0;
1273 }
1274 return -1;
1275}

References ao2_lock, ao2_unlock, and ast_taskprocessor::suspended.

Referenced by ast_sip_session_unsuspend(), and AST_TEST_DEFINE().

◆ ast_tps_init()

int ast_tps_init ( void  )

Provided by taskprocessor.c

Definition at line 368 of file taskprocessor.c.

369{
372 if (!tps_singletons) {
373 ast_log(LOG_ERROR, "taskprocessor container failed to initialize!\n");
374 return -1;
375 }
376
377 if (AST_VECTOR_RW_INIT(&overloaded_subsystems, 10)) {
378 ao2_ref(tps_singletons, -1);
379 ast_log(LOG_ERROR, "taskprocessor subsystems vector failed to initialize!\n");
380 return -1;
381 }
382
383 ast_cond_init(&cli_ping_cond, NULL);
384
386
388
389 return 0;
390}
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
Definition: clicompat.c:19
@ AO2_ALLOC_OPT_LOCK_MUTEX
Definition: astobj2.h:363
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition: astobj2.h:1303
#define ast_cli_register_multiple(e, len)
Register multiple commands.
Definition: cli.h:265
#define ast_cond_init(cond, attr)
Definition: lock.h:208
static void tps_shutdown(void)
#define TPS_MAX_BUCKETS
static struct ast_cli_entry taskprocessor_clis[]
static int tps_cmp_cb(void *obj, void *arg, int flags)
The astobj2 compare callback for taskprocessors.
static int tps_hash_cb(const void *obj, const int flags)
The astobj2 hash callback for taskprocessors.
#define ARRAY_LEN(a)
Definition: utils.h:703
#define AST_VECTOR_RW_INIT(vec, size)
Initialize a vector with a read/write lock.
Definition: vector.h:169

References AO2_ALLOC_OPT_LOCK_MUTEX, ao2_container_alloc_hash, ao2_ref, ARRAY_LEN, ast_cli_register_multiple, ast_cond_init, ast_log, ast_register_cleanup(), AST_VECTOR_RW_INIT, LOG_ERROR, NULL, taskprocessor_clis, tps_cmp_cb(), tps_hash_cb(), TPS_MAX_BUCKETS, and tps_shutdown().

Referenced by asterisk_daemon().

◆ AST_VECTOR_RW()

static AST_VECTOR_RW ( subsystem_alert_vector  ,
struct subsystem_alert  
)
static

CLI taskprocessor ping <blah>operation requires a ping condition lock.

Definition at line 127 of file taskprocessor.c.

162 {

◆ cli_subsystem_alert_report()

static char * cli_subsystem_alert_report ( struct ast_cli_entry e,
int  cmd,
struct ast_cli_args a 
)
static

Definition at line 796 of file taskprocessor.c.

797{
798 struct subsystem_alert_vector sorted_subsystems;
799 int i;
800
801#define FMT_HEADERS_SUBSYSTEM "%-32s %12s\n"
802#define FMT_FIELDS_SUBSYSTEM "%-32s %12u\n"
803
804 switch (cmd) {
805 case CLI_INIT:
806 e->command = "core show taskprocessor alerted subsystems";
807 e->usage =
808 "Usage: core show taskprocessor alerted subsystems\n"
809 " Shows a list of task processor subsystems that are currently alerted\n";
810 return NULL;
811 case CLI_GENERATE:
812 return NULL;
813 }
814
815 if (a->argc != e->args) {
816 return CLI_SHOWUSAGE;
817 }
818
819 if (AST_VECTOR_INIT(&sorted_subsystems, AST_VECTOR_SIZE(&overloaded_subsystems))) {
820 return CLI_FAILURE;
821 }
822
823 AST_VECTOR_RW_RDLOCK(&overloaded_subsystems);
824 for (i = 0; i < AST_VECTOR_SIZE(&overloaded_subsystems); i++) {
825 subsystem_copy(AST_VECTOR_GET(&overloaded_subsystems, i), &sorted_subsystems);
826 }
827 AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
828
829 ast_cli(a->fd, "\n" FMT_HEADERS_SUBSYSTEM, "Subsystem", "Alert Count");
830
831 for (i = 0; i < AST_VECTOR_SIZE(&sorted_subsystems); i++) {
832 struct subsystem_alert *alert = AST_VECTOR_GET(&sorted_subsystems, i);
833 ast_cli(a->fd, FMT_FIELDS_SUBSYSTEM, alert->subsystem, alert->alert_count);
834 }
835
836 ast_cli(a->fd, "\n%zu subsystems\n\n", AST_VECTOR_SIZE(&sorted_subsystems));
837
838 AST_VECTOR_CALLBACK_VOID(&sorted_subsystems, ast_free);
839 AST_VECTOR_FREE(&sorted_subsystems);
840
841 return CLI_SUCCESS;
842}
#define ast_free(a)
Definition: astmm.h:180
#define CLI_SHOWUSAGE
Definition: cli.h:45
#define CLI_SUCCESS
Definition: cli.h:44
void ast_cli(int fd, const char *fmt,...)
Definition: clicompat.c:6
@ CLI_INIT
Definition: cli.h:152
@ CLI_GENERATE
Definition: cli.h:153
#define CLI_FAILURE
Definition: cli.h:46
int args
This gets set in ast_cli_register()
Definition: cli.h:185
char * command
Definition: cli.h:186
const char * usage
Definition: cli.h:177
#define FMT_HEADERS_SUBSYSTEM
static void subsystem_copy(struct subsystem_alert *alert, struct subsystem_alert_vector *vector)
#define FMT_FIELDS_SUBSYSTEM
static struct test_val a
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:620
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition: vector.h:185
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:124
#define AST_VECTOR_CALLBACK_VOID(vec, callback,...)
Execute a callback on every element in a vector disregarding callback return.
Definition: vector.h:873

References a, subsystem_alert::alert_count, ast_cli_entry::args, ast_cli(), ast_free, AST_VECTOR_CALLBACK_VOID, AST_VECTOR_FREE, AST_VECTOR_GET, AST_VECTOR_INIT, AST_VECTOR_RW_RDLOCK, AST_VECTOR_RW_UNLOCK, AST_VECTOR_SIZE, CLI_FAILURE, CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, FMT_FIELDS_SUBSYSTEM, FMT_HEADERS_SUBSYSTEM, NULL, subsystem_alert::subsystem, subsystem_copy(), and ast_cli_entry::usage.

◆ cli_tps_ping()

static char * cli_tps_ping ( struct ast_cli_entry e,
int  cmd,
struct ast_cli_args a 
)
static

Definition at line 478 of file taskprocessor.c.

479{
480 struct timeval begin, end, delta;
481 const char *name;
482 struct timeval when;
483 struct timespec ts;
484 struct ast_taskprocessor *tps;
485
486 switch (cmd) {
487 case CLI_INIT:
488 e->command = "core ping taskprocessor";
489 e->usage =
490 "Usage: core ping taskprocessor <taskprocessor>\n"
491 " Displays the time required for a task to be processed\n";
492 return NULL;
493 case CLI_GENERATE:
494 if (a->pos == 3) {
496 } else {
497 return NULL;
498 }
499 }
500
501 if (a->argc != 4)
502 return CLI_SHOWUSAGE;
503
504 name = a->argv[3];
506 ast_cli(a->fd, "\nping failed: %s not found\n\n", name);
507 return CLI_SUCCESS;
508 }
509 ast_cli(a->fd, "\npinging %s ...", name);
510
511 /*
512 * Wait up to 5 seconds for a ping reply.
513 *
514 * On a very busy system it could take awhile to get a
515 * ping response from some taskprocessors.
516 */
517 begin = ast_tvnow();
518 when = ast_tvadd(begin, ast_samp2tv(5000, 1000));
519 ts.tv_sec = when.tv_sec;
520 ts.tv_nsec = when.tv_usec * 1000;
521
522 ast_mutex_lock(&cli_ping_cond_lock);
523 if (ast_taskprocessor_push(tps, tps_ping_handler, 0) < 0) {
524 ast_mutex_unlock(&cli_ping_cond_lock);
525 ast_cli(a->fd, "\nping failed: could not push task to %s\n\n", name);
527 return CLI_FAILURE;
528 }
529 ast_cond_timedwait(&cli_ping_cond, &cli_ping_cond_lock, &ts);
530 ast_mutex_unlock(&cli_ping_cond_lock);
531
532 end = ast_tvnow();
533 delta = ast_tvsub(end, begin);
534 ast_cli(a->fd, "\n\t%24s ping time: %.1ld.%.6ld sec\n\n", name, (long)delta.tv_sec, (long int)delta.tv_usec);
536 return CLI_SUCCESS;
537}
char * end
Definition: eagi_proxy.c:73
#define ast_cond_timedwait(cond, mutex, time)
Definition: lock.h:213
#define ast_mutex_unlock(a)
Definition: lock.h:197
#define ast_mutex_lock(a)
Definition: lock.h:196
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.
static int tps_ping_handler(void *datap)
CLI taskprocessor ping <blah>handler function.
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap)
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
static char * tps_taskprocessor_tab_complete(struct ast_cli_args *a)
struct timeval ast_samp2tv(unsigned int _nsamp, unsigned int _rate)
Returns a timeval corresponding to the duration of n samples at rate r. Useful to convert samples to ...
Definition: time.h:282
struct timeval ast_tvadd(struct timeval a, struct timeval b)
Returns the sum of two timevals a + b.
Definition: extconf.c:2280
struct timeval ast_tvsub(struct timeval a, struct timeval b)
Returns the difference of two timevals a - b.
Definition: extconf.c:2295
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:159

References a, ast_cli(), ast_cond_timedwait, ast_mutex_lock, ast_mutex_unlock, ast_samp2tv(), ast_taskprocessor_get(), ast_taskprocessor_push(), ast_taskprocessor_unreference(), ast_tvadd(), ast_tvnow(), ast_tvsub(), CLI_FAILURE, CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, end, name, NULL, tps_ping_handler(), TPS_REF_IF_EXISTS, tps_taskprocessor_tab_complete(), and ast_cli_entry::usage.

◆ cli_tps_report()

static char * cli_tps_report ( struct ast_cli_entry e,
int  cmd,
struct ast_cli_args a 
)
static

Definition at line 643 of file taskprocessor.c.

644{
645 const char *like;
646
647 switch (cmd) {
648 case CLI_INIT:
649 e->command = "core show taskprocessors [like]";
650 e->usage =
651 "Usage: core show taskprocessors [like keyword]\n"
652 " Shows a list of instantiated task processors and their statistics\n";
653 return NULL;
654 case CLI_GENERATE:
655 if (a->pos == e->args) {
657 } else {
658 return NULL;
659 }
660 }
661
662 if (a->argc == e->args - 1) {
663 like = "";
664 } else if (a->argc == e->args + 1 && !strcasecmp(a->argv[e->args-1], "like")) {
665 like = a->argv[e->args];
666 } else {
667 return CLI_SHOWUSAGE;
668 }
669
670 ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth", "Low water", "High water");
671 ast_cli(a->fd, "\n%d taskprocessors\n\n", tps_report_taskprocessor_list(a->fd, like));
672
673 return CLI_SUCCESS;
674}
#define FMT_HEADERS
static int tps_report_taskprocessor_list(int fd, const char *like)

References a, ast_cli_entry::args, ast_cli(), CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, FMT_HEADERS, NULL, tps_report_taskprocessor_list(), tps_taskprocessor_tab_complete(), and ast_cli_entry::usage.

◆ cli_tps_reset_stats()

static char * cli_tps_reset_stats ( struct ast_cli_entry e,
int  cmd,
struct ast_cli_args a 
)
static

Definition at line 1399 of file taskprocessor.c.

1400{
1401 const char *name;
1402 struct ast_taskprocessor *tps;
1403
1404 switch (cmd) {
1405 case CLI_INIT:
1406 e->command = "core reset taskprocessor";
1407 e->usage =
1408 "Usage: core reset taskprocessor <taskprocessor>\n"
1409 " Resets stats for the specified taskprocessor\n";
1410 return NULL;
1411 case CLI_GENERATE:
1413 }
1414
1415 if (a->argc != 4) {
1416 return CLI_SHOWUSAGE;
1417 }
1418
1419 name = a->argv[3];
1421 ast_cli(a->fd, "\nReset failed: %s not found\n\n", name);
1422 return CLI_SUCCESS;
1423 }
1424 ast_cli(a->fd, "\nResetting %s\n\n", name);
1425
1426 tps_reset_stats(tps);
1427
1429
1430 return CLI_SUCCESS;
1431}
static void tps_reset_stats(struct ast_taskprocessor *tps)

References a, ast_cli(), ast_taskprocessor_get(), ast_taskprocessor_unreference(), CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, name, NULL, TPS_REF_IF_EXISTS, tps_reset_stats(), tps_taskprocessor_tab_complete(), and ast_cli_entry::usage.

◆ cli_tps_reset_stats_all()

static char * cli_tps_reset_stats_all ( struct ast_cli_entry e,
int  cmd,
struct ast_cli_args a 
)
static

Definition at line 1433 of file taskprocessor.c.

1434{
1435 struct ast_taskprocessor *tps;
1436 struct ao2_iterator iter;
1437
1438 switch (cmd) {
1439 case CLI_INIT:
1440 e->command = "core reset taskprocessors";
1441 e->usage =
1442 "Usage: core reset taskprocessors\n"
1443 " Resets stats for all taskprocessors\n";
1444 return NULL;
1445 case CLI_GENERATE:
1446 return NULL;
1447 }
1448
1449 if (a->argc != e->args) {
1450 return CLI_SHOWUSAGE;
1451 }
1452
1453 ast_cli(a->fd, "\nResetting stats for all taskprocessors\n\n");
1454
1455 iter = ao2_iterator_init(tps_singletons, 0);
1456 while ((tps = ao2_iterator_next(&iter))) {
1457 tps_reset_stats(tps);
1459 }
1460 ao2_iterator_destroy(&iter);
1461
1462 return CLI_SUCCESS;
1463}
#define ao2_iterator_next(iter)
Definition: astobj2.h:1911
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
Definition: astobj2.h:1821

References a, ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, ast_cli_entry::args, ast_cli(), ast_taskprocessor_unreference(), CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, NULL, tps_reset_stats(), and ast_cli_entry::usage.

◆ default_listener_die()

static int default_listener_die ( void *  data)
static

Definition at line 246 of file taskprocessor.c.

247{
248 struct default_taskprocessor_listener_pvt *pvt = data;
249 pvt->dead = 1;
250 return 0;
251}

References default_taskprocessor_listener_pvt::dead.

Referenced by default_listener_shutdown().

◆ default_listener_pvt_alloc()

static void * default_listener_pvt_alloc ( void  )
static

Definition at line 1025 of file taskprocessor.c.

1026{
1028
1029 pvt = ast_calloc(1, sizeof(*pvt));
1030 if (!pvt) {
1031 return NULL;
1032 }
1034 if (ast_sem_init(&pvt->sem, 0, 0) != 0) {
1035 ast_log(LOG_ERROR, "ast_sem_init(): %s\n", strerror(errno));
1036 ast_free(pvt);
1037 return NULL;
1038 }
1039 return pvt;
1040}
#define ast_calloc(num, len)
A wrapper for calloc()
Definition: astmm.h:202
int errno
int ast_sem_init(struct ast_sem *sem, int pshared, unsigned int value)
Initialize a semaphore.

References ast_calloc, ast_free, ast_log, AST_PTHREADT_NULL, ast_sem_init(), errno, LOG_ERROR, NULL, default_taskprocessor_listener_pvt::poll_thread, and default_taskprocessor_listener_pvt::sem.

Referenced by ast_taskprocessor_get().

◆ default_listener_pvt_destroy()

static void default_listener_pvt_destroy ( struct default_taskprocessor_listener_pvt pvt)
static

Definition at line 176 of file taskprocessor.c.

177{
178 ast_assert(pvt->dead);
179 ast_sem_destroy(&pvt->sem);
180 ast_free(pvt);
181}
int ast_sem_destroy(struct ast_sem *sem)
Destroy a semaphore.

References ast_assert, ast_free, ast_sem_destroy(), default_taskprocessor_listener_pvt::dead, and default_taskprocessor_listener_pvt::sem.

Referenced by ast_taskprocessor_get(), and default_listener_pvt_dtor().

◆ default_listener_pvt_dtor()

static void default_listener_pvt_dtor ( struct ast_taskprocessor_listener listener)
static

Definition at line 183 of file taskprocessor.c.

184{
185 struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
186
188
189 listener->user_data = NULL;
190}

References default_listener_pvt_destroy(), listener(), and NULL.

◆ default_listener_shutdown()

static void default_listener_shutdown ( struct ast_taskprocessor_listener listener)
static

Definition at line 253 of file taskprocessor.c.

254{
255 struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
256 int res;
257
258 /* Hold a reference during shutdown */
259 ao2_t_ref(listener->tps, +1, "tps-shutdown");
260
262 /* This will cause the thread to exit early without completing tasks already
263 * in the queue. This is probably the least bad option in this situation. */
265 }
266
268
269 if (pthread_equal(pthread_self(), pvt->poll_thread)) {
270 res = pthread_detach(pvt->poll_thread);
271 if (res != 0) {
272 ast_log(LOG_ERROR, "pthread_detach(): %s\n", strerror(errno));
273 }
274 } else {
275 res = pthread_join(pvt->poll_thread, NULL);
276 if (res != 0) {
277 ast_log(LOG_ERROR, "pthread_join(): %s\n", strerror(errno));
278 }
279 }
281}
#define ao2_t_ref(o, delta, tag)
Definition: astobj2.h:460
static int default_listener_die(void *data)

References ao2_t_ref, ast_assert, ast_log, AST_PTHREADT_NULL, ast_taskprocessor_push(), default_listener_die(), errno, listener(), LOG_ERROR, NULL, and default_taskprocessor_listener_pvt::poll_thread.

◆ default_listener_start()

static int default_listener_start ( struct ast_taskprocessor_listener listener)
static

Definition at line 225 of file taskprocessor.c.

226{
227 struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
228
230 return -1;
231 }
232
233 return 0;
234}
static void * default_tps_processing_function(void *data)
Function that processes tasks in the taskprocessor.
#define ast_pthread_create(a, b, c, d)
Definition: utils.h:621

References ast_pthread_create, default_tps_processing_function(), listener(), NULL, and default_taskprocessor_listener_pvt::poll_thread.

◆ default_task_pushed()

static void default_task_pushed ( struct ast_taskprocessor_listener listener,
int  was_empty 
)
static

Definition at line 236 of file taskprocessor.c.

237{
238 struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
239
240 if (ast_sem_post(&pvt->sem) != 0) {
241 ast_log(LOG_ERROR, "Failed to notify of enqueued task: %s\n",
242 strerror(errno));
243 }
244}
int ast_sem_post(struct ast_sem *sem)
Increments the semaphore, unblocking a waiter if necessary.

References ast_log, ast_sem_post(), errno, listener(), LOG_ERROR, and default_taskprocessor_listener_pvt::sem.

◆ default_tps_processing_function()

static void * default_tps_processing_function ( void *  data)
static

Function that processes tasks in the taskprocessor.

Definition at line 196 of file taskprocessor.c.

197{
199 struct ast_taskprocessor *tps = listener->tps;
200 struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
201 int sem_value;
202 int res;
203
204 while (!pvt->dead) {
205 res = ast_sem_wait(&pvt->sem);
206 if (res != 0 && errno != EINTR) {
207 ast_log(LOG_ERROR, "ast_sem_wait(): %s\n",
208 strerror(errno));
209 /* Just give up */
210 break;
211 }
213 }
214
215 /* No posting to a dead taskprocessor! */
216 res = ast_sem_getvalue(&pvt->sem, &sem_value);
217 ast_assert(res == 0 && sem_value == 0);
218
219 /* Free the shutdown reference (see default_listener_shutdown) */
220 ao2_t_ref(listener->tps, -1, "tps-shutdown");
221
222 return NULL;
223}
int ast_sem_getvalue(struct ast_sem *sem, int *sval)
Gets the current value of the semaphore.
int ast_sem_wait(struct ast_sem *sem)
Decrements the semaphore.
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
Pop a task off the taskprocessor and execute it.

References ao2_t_ref, ast_assert, ast_log, ast_sem_getvalue(), ast_sem_wait(), ast_taskprocessor_execute(), default_taskprocessor_listener_pvt::dead, errno, listener(), LOG_ERROR, NULL, and default_taskprocessor_listener_pvt::sem.

Referenced by default_listener_start().

◆ listener_shutdown()

static void listener_shutdown ( struct ast_taskprocessor_listener listener)
static

Definition at line 985 of file taskprocessor.c.

986{
987 listener->callbacks->shutdown(listener);
988 ao2_ref(listener->tps, -1);
989}

References ao2_ref, and listener().

Referenced by ast_taskprocessor_unreference().

◆ subsystem_alert_decrement()

static void subsystem_alert_decrement ( const char *  subsystem)
static

Definition at line 753 of file taskprocessor.c.

754{
755 struct subsystem_alert *alert;
756 int idx;
757
759 return;
760 }
761
762 AST_VECTOR_RW_WRLOCK(&overloaded_subsystems);
763 idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
764 if (idx < 0) {
766 "Can't decrement alert count for subsystem '%s' as it wasn't in alert\n", subsystem);
767 AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
768 return;
769 }
770 alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
771
772 alert->alert_count--;
773 if (alert->alert_count <= 0) {
774 AST_VECTOR_REMOVE(&overloaded_subsystems, idx, 0);
775 ast_free(alert);
776 }
777
778 AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
779}
#define AST_VECTOR_RW_WRLOCK(vec)
Obtain write lock on vector.
Definition: vector.h:898
#define AST_VECTOR_REMOVE(vec, idx, preserve_ordered)
Remove an element from a vector by index.
Definition: vector.h:423

References subsystem_alert::alert_count, ast_free, ast_log, ast_strlen_zero(), AST_VECTOR_GET, AST_VECTOR_GET_INDEX, AST_VECTOR_REMOVE, AST_VECTOR_RW_UNLOCK, AST_VECTOR_RW_WRLOCK, LOG_ERROR, subsystem_alert::subsystem, and subsystem_match().

Referenced by tps_alert_add().

◆ subsystem_alert_increment()

static void subsystem_alert_increment ( const char *  subsystem)
static

Definition at line 721 of file taskprocessor.c.

722{
723 struct subsystem_alert *alert;
724 int idx;
725
727 return;
728 }
729
730 AST_VECTOR_RW_WRLOCK(&overloaded_subsystems);
731 idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
732 if (idx >= 0) {
733 alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
734 alert->alert_count++;
735 AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
736 return;
737 }
738
739 alert = ast_malloc(sizeof(*alert) + strlen(subsystem) + 1);
740 if (!alert) {
741 AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
742 return;
743 }
744 alert->alert_count = 1;
745 strcpy(alert->subsystem, subsystem); /* Safe */
746
747 if (AST_VECTOR_APPEND(&overloaded_subsystems, alert)) {
748 ast_free(alert);
749 }
750 AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
751}
#define ast_malloc(len)
A wrapper for malloc()
Definition: astmm.h:191
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:267

References subsystem_alert::alert_count, ast_free, ast_malloc, ast_strlen_zero(), AST_VECTOR_APPEND, AST_VECTOR_GET, AST_VECTOR_GET_INDEX, AST_VECTOR_RW_UNLOCK, AST_VECTOR_RW_WRLOCK, subsystem_alert::subsystem, and subsystem_match().

Referenced by tps_alert_add().

◆ subsystem_cmp()

static int subsystem_cmp ( struct subsystem_alert a,
struct subsystem_alert b 
)
static

Definition at line 699 of file taskprocessor.c.

700{
701 return strcmp(a->subsystem, b->subsystem);
702}
static struct test_val b

References a, and b.

Referenced by subsystem_copy().

◆ subsystem_copy()

static void subsystem_copy ( struct subsystem_alert alert,
struct subsystem_alert_vector *  vector 
)
static

Definition at line 781 of file taskprocessor.c.

783{
784 struct subsystem_alert *alert_copy;
785 alert_copy = ast_malloc(sizeof(*alert_copy) + strlen(alert->subsystem) + 1);
786 if (!alert_copy) {
787 return;
788 }
789 alert_copy->alert_count = alert->alert_count;
790 strcpy(alert_copy->subsystem, alert->subsystem); /* Safe */
791 if (AST_VECTOR_ADD_SORTED(vector, alert_copy, subsystem_cmp)) {
792 ast_free(alert_copy);
793 }
794}
static int subsystem_cmp(struct subsystem_alert *a, struct subsystem_alert *b)
#define AST_VECTOR_ADD_SORTED(vec, elem, cmp)
Add an element into a sorted vector.
Definition: vector.h:382

References subsystem_alert::alert_count, ast_free, ast_malloc, AST_VECTOR_ADD_SORTED, subsystem_alert::subsystem, and subsystem_cmp().

Referenced by cli_subsystem_alert_report().

◆ subsystem_match()

static int subsystem_match ( struct subsystem_alert alert,
const char *  subsystem 
)
static

Definition at line 694 of file taskprocessor.c.

695{
696 return !strcmp(alert->subsystem, subsystem);
697}

References ast_taskprocessor::subsystem, and subsystem_alert::subsystem.

Referenced by ast_taskprocessor_get_subsystem_alert(), subsystem_alert_decrement(), and subsystem_alert_increment().

◆ taskprocessor_listener_dtor()

static void taskprocessor_listener_dtor ( void *  obj)
static

Definition at line 991 of file taskprocessor.c.

992{
994
995 if (listener->callbacks->dtor) {
996 listener->callbacks->dtor(listener);
997 }
998}

References listener().

Referenced by ast_taskprocessor_listener_alloc().

◆ taskprocessor_push()

static int taskprocessor_push ( struct ast_taskprocessor tps,
struct tps_task t 
)
static

Definition at line 1209 of file taskprocessor.c.

1210{
1211 int previous_size;
1212 int was_empty;
1213
1214 if (!tps) {
1215 ast_log(LOG_ERROR, "tps is NULL!\n");
1216 return -1;
1217 }
1218
1219 if (!t) {
1220 ast_log(LOG_ERROR, "t is NULL!\n");
1221 return -1;
1222 }
1223
1224 ao2_lock(tps);
1225 AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
1226 previous_size = tps->tps_queue_size++;
1227
1228 if (tps->tps_queue_high <= tps->tps_queue_size) {
1229 if (!tps->high_water_alert) {
1230 ast_log(LOG_WARNING, "The '%s' task processor queue reached %ld scheduled tasks%s.\n",
1231 tps->name, tps->tps_queue_size, tps->high_water_warned ? " again" : "");
1232 tps->high_water_warned = 1;
1233 tps->high_water_alert = 1;
1234 tps_alert_add(tps, +1);
1235 }
1236 }
1237
1238 /* The currently executing task counts as still in queue */
1239 was_empty = tps->executing ? 0 : previous_size == 0;
1240 ao2_unlock(tps);
1241 tps->listener->callbacks->task_pushed(tps->listener, was_empty);
1242 return 0;
1243}
#define AST_LIST_INSERT_TAIL(head, elm, field)
Appends a list entry to the tail of a list.
Definition: linkedlists.h:731
void(* task_pushed)(struct ast_taskprocessor_listener *listener, int was_empty)
Indicates a task was pushed to the processor.
Definition: taskprocessor.h:99
struct ast_taskprocessor::tps_queue tps_queue
unsigned int high_water_warned
Definition: taskprocessor.c:87

References ao2_lock, ao2_unlock, AST_LIST_INSERT_TAIL, ast_log, ast_taskprocessor_listener::callbacks, ast_taskprocessor::executing, ast_taskprocessor::high_water_alert, ast_taskprocessor::high_water_warned, ast_taskprocessor::listener, LOG_ERROR, LOG_WARNING, ast_taskprocessor::name, ast_taskprocessor_listener_callbacks::task_pushed, tps_alert_add(), ast_taskprocessor::tps_queue, ast_taskprocessor::tps_queue_high, and ast_taskprocessor::tps_queue_size.

Referenced by ast_taskprocessor_push(), and ast_taskprocessor_push_local().

◆ tps_alert_add()

static void tps_alert_add ( struct ast_taskprocessor tps,
int  delta 
)
static

Definition at line 859 of file taskprocessor.c.

860{
861 unsigned int old;
862
864 old = tps_alert_count;
865 tps_alert_count += delta;
866 if (DEBUG_ATLEAST(3)
867 /* and tps_alert_count becomes zero or non-zero */
868 && !old != !tps_alert_count) {
869 ast_log(LOG_DEBUG, "Taskprocessor '%s' %s the high water alert.\n",
870 tps->name, tps_alert_count ? "triggered" : "cleared");
871 }
872
873 if (tps->subsystem[0] != '\0') {
874 if (delta > 0) {
876 } else {
878 }
879 }
880
882}
#define DEBUG_ATLEAST(level)
#define LOG_DEBUG
#define ast_rwlock_wrlock(a)
Definition: lock.h:243
static void subsystem_alert_decrement(const char *subsystem)
static void subsystem_alert_increment(const char *subsystem)

References ast_log, ast_rwlock_unlock, ast_rwlock_wrlock, DEBUG_ATLEAST, LOG_DEBUG, ast_taskprocessor::name, ast_taskprocessor::subsystem, subsystem_alert_decrement(), subsystem_alert_increment(), tps_alert_count, and tps_alert_lock.

Referenced by ast_taskprocessor_alert_set_levels(), taskprocessor_push(), tps_taskprocessor_dtor(), and tps_taskprocessor_pop().

◆ tps_cmp_cb()

static int tps_cmp_cb ( void *  obj,
void *  arg,
int  flags 
)
static

The astobj2 compare callback for taskprocessors.

Definition at line 686 of file taskprocessor.c.

687{
688 struct ast_taskprocessor *lhs = obj, *rhs = arg;
689 const char *rhsname = flags & OBJ_KEY ? arg : rhs->name;
690
691 return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0;
692}
@ CMP_MATCH
Definition: astobj2.h:1027
@ CMP_STOP
Definition: astobj2.h:1028

References CMP_MATCH, CMP_STOP, ast_taskprocessor::name, and OBJ_KEY.

Referenced by ast_tps_init().

◆ tps_hash_cb()

static int tps_hash_cb ( const void *  obj,
const int  flags 
)
static

The astobj2 hash callback for taskprocessors.

Definition at line 677 of file taskprocessor.c.

678{
679 const struct ast_taskprocessor *tps = obj;
680 const char *name = flags & OBJ_KEY ? obj : tps->name;
681
682 return ast_str_case_hash(name);
683}
static force_inline int attribute_pure ast_str_case_hash(const char *str)
Compute a hash value on a case-insensitive string.
Definition: strings.h:1303

References ast_str_case_hash(), ast_taskprocessor::name, name, and OBJ_KEY.

Referenced by ast_tps_init().

◆ tps_ping_handler()

static int tps_ping_handler ( void *  datap)
static

CLI taskprocessor ping <blah>handler function.

Definition at line 469 of file taskprocessor.c.

470{
471 ast_mutex_lock(&cli_ping_cond_lock);
472 ast_cond_signal(&cli_ping_cond);
473 ast_mutex_unlock(&cli_ping_cond_lock);
474 return 0;
475}
#define ast_cond_signal(cond)
Definition: lock.h:210

References ast_cond_signal, ast_mutex_lock, and ast_mutex_unlock.

Referenced by cli_tps_ping().

◆ tps_report_taskprocessor_list()

static int tps_report_taskprocessor_list ( int  fd,
const char *  like 
)
static

Definition at line 606 of file taskprocessor.c.

607{
608 int tps_count = 0;
609 int word_len;
610 struct ao2_container *sorted_tps;
611 struct ast_taskprocessor *tps;
612 struct ao2_iterator iter;
613
615 NULL);
616 if (!sorted_tps
617 || ao2_container_dup(sorted_tps, tps_singletons, 0)) {
618 ast_debug(1, "Failed to retrieve sorted taskprocessors\n");
619 ao2_cleanup(sorted_tps);
620 return 0;
621 }
622
623 word_len = strlen(like);
624 iter = ao2_iterator_init(sorted_tps, AO2_ITERATOR_UNLINK);
625 while ((tps = ao2_iterator_next(&iter))) {
626 if (like) {
627 if (!strncasecmp(like, tps->name, word_len)) {
629 tps_count++;
630 }
631 } else {
633 tps_count++;
634 }
636 }
638 ao2_ref(sorted_tps, -1);
639
640 return tps_count;
641}
int ao2_container_dup(struct ao2_container *dest, struct ao2_container *src, enum search_flags flags)
Copy all object references in the src container into the dest container.
@ AO2_ALLOC_OPT_LOCK_NOLOCK
Definition: astobj2.h:367
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
@ AO2_ITERATOR_UNLINK
Definition: astobj2.h:1863
#define ao2_container_alloc_rbtree(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a red-black tree container.
Definition: astobj2.h:1349
#define ast_debug(level,...)
Log a DEBUG message.
Generic container type.
static int tps_sort_cb(const void *obj_left, const void *obj_right, int flags)
static void tps_report_taskprocessor_list_helper(int fd, struct ast_taskprocessor *tps)

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_cleanup, ao2_container_alloc_rbtree, ao2_container_dup(), ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, AO2_ITERATOR_UNLINK, ao2_ref, ast_debug, ast_taskprocessor_unreference(), ast_taskprocessor::name, NULL, tps_report_taskprocessor_list_helper(), and tps_sort_cb().

Referenced by cli_tps_report().

◆ tps_report_taskprocessor_list_helper()

static void tps_report_taskprocessor_list_helper ( int  fd,
struct ast_taskprocessor tps 
)
static

◆ tps_reset_stats()

static void tps_reset_stats ( struct ast_taskprocessor tps)
static

◆ tps_shutdown()

static void tps_shutdown ( void  )
static

Definition at line 297 of file taskprocessor.c.

298{
299 int objcount;
300 int tries;
301 struct ao2_container *sorted_tps;
302 struct ast_taskprocessor *tps;
303 struct ao2_iterator iter;
304 struct timespec delay = {1, 0};
305
306 /* During shutdown there may still be taskprocessor threads running and those
307 * tasprocessors reference tps_singletons. When those taskprocessors finish
308 * they will call ast_taskprocessor_unreference, creating a race condition which
309 * can result in tps_singletons being referenced after being deleted. To try and
310 * avoid this we check the container count and if greater than zero, give the
311 * running taskprocessors a chance to finish */
312 objcount = ao2_container_count(tps_singletons);
313 if (objcount > 0) {
315 "waiting for taskprocessor shutdown, %d tps object(s) still allocated.\n",
316 objcount);
317
318 /* give the running taskprocessors a chance to finish, up to
319 * AST_TASKPROCESSOR_SHUTDOWN_MAX_WAIT seconds */
320 for (tries = 0; tries < AST_TASKPROCESSOR_SHUTDOWN_MAX_WAIT; tries++) {
321 while (nanosleep(&delay, &delay));
322 objcount = ao2_container_count(tps_singletons);
323 /* if count is 0, we are done waiting */
324 if (objcount == 0) {
325 break;
326 }
327 delay.tv_sec = 1;
328 delay.tv_nsec = 0;
330 "waiting for taskprocessor shutdown, %d tps object(s) still allocated.\n",
331 objcount);
332 }
333 }
334
335 /* rather than try forever, risk an assertion on shutdown. This probably indicates
336 * a taskprocessor was not cleaned up somewhere */
337 if (objcount > 0) {
339 "Assertion may occur, the following taskprocessors are still running:\n");
340
342 NULL);
343 if (!sorted_tps || ao2_container_dup(sorted_tps, tps_singletons, 0)) {
344 ast_log(LOG_ERROR, "unable to get sorted list of taskprocessors");
345 }
346 else {
347 iter = ao2_iterator_init(sorted_tps, AO2_ITERATOR_UNLINK);
348 while ((tps = ao2_iterator_next(&iter))) {
349 ast_log(LOG_ERROR, "taskprocessor '%s'\n", tps->name);
350 }
351 }
352
353 ao2_cleanup(sorted_tps);
354 }
355 else {
357 "All waiting taskprocessors cleared!\n");
358 }
359
361 AST_VECTOR_CALLBACK_VOID(&overloaded_subsystems, ast_free);
362 AST_VECTOR_RW_FREE(&overloaded_subsystems);
363 ao2_t_ref(tps_singletons, -1, "Unref tps_singletons in shutdown");
364 tps_singletons = NULL;
365}
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
int ast_cli_unregister_multiple(struct ast_cli_entry *e, int len)
Unregister multiple commands.
Definition: clicompat.c:30
#define AST_TASKPROCESSOR_SHUTDOWN_MAX_WAIT
How many seconds to wait for running taskprocessors to finish on shutdown.
#define AST_VECTOR_RW_FREE(vec)
Deallocates this locked vector.
Definition: vector.h:213

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_cleanup, ao2_container_alloc_rbtree, ao2_container_count(), ao2_container_dup(), ao2_iterator_init(), ao2_iterator_next, AO2_ITERATOR_UNLINK, ao2_t_ref, ARRAY_LEN, ast_cli_unregister_multiple(), ast_free, ast_log, AST_TASKPROCESSOR_SHUTDOWN_MAX_WAIT, AST_VECTOR_CALLBACK_VOID, AST_VECTOR_RW_FREE, LOG_DEBUG, LOG_ERROR, ast_taskprocessor::name, NULL, taskprocessor_clis, and tps_sort_cb().

Referenced by ast_tps_init().

◆ tps_sort_cb()

static int tps_sort_cb ( const void *  obj_left,
const void *  obj_right,
int  flags 
)
static

Definition at line 555 of file taskprocessor.c.

556{
557 const struct ast_taskprocessor *tps_left = obj_left;
558 const struct ast_taskprocessor *tps_right = obj_right;
559 const char *right_key = obj_right;
560 int cmp;
561
562 switch (flags & OBJ_SEARCH_MASK) {
563 default:
565 right_key = tps_right->name;
566 /* Fall through */
567 case OBJ_SEARCH_KEY:
568 cmp = strcasecmp(tps_left->name, right_key);
569 break;
571 cmp = strncasecmp(tps_left->name, right_key, strlen(right_key));
572 break;
573 }
574 return cmp;
575}
@ OBJ_SEARCH_PARTIAL_KEY
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
Definition: astobj2.h:1116
@ OBJ_SEARCH_OBJECT
The arg parameter is an object of the same type.
Definition: astobj2.h:1087
@ OBJ_SEARCH_MASK
Search option field mask.
Definition: astobj2.h:1072
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1101

References ast_taskprocessor::name, OBJ_SEARCH_KEY, OBJ_SEARCH_MASK, OBJ_SEARCH_OBJECT, and OBJ_SEARCH_PARTIAL_KEY.

Referenced by tps_report_taskprocessor_list(), and tps_shutdown().

◆ tps_task_alloc()

static struct tps_task * tps_task_alloc ( int(*)(void *datap)  task_exe,
void *  datap 
)
static

Definition at line 393 of file taskprocessor.c.

394{
395 struct tps_task *t;
396 if (!task_exe) {
397 ast_log(LOG_ERROR, "task_exe is NULL!\n");
398 return NULL;
399 }
400
401 t = ast_calloc(1, sizeof(*t));
402 if (!t) {
403 ast_log(LOG_ERROR, "failed to allocate task!\n");
404 return NULL;
405 }
406
407 t->callback.execute = task_exe;
408 t->datap = datap;
409
410 return t;
411}

References ast_calloc, ast_log, tps_task::callback, tps_task::datap, tps_task::execute, LOG_ERROR, and NULL.

Referenced by ast_taskprocessor_push().

◆ tps_task_alloc_local()

static struct tps_task * tps_task_alloc_local ( int(*)(struct ast_taskprocessor_local *local)  task_exe,
void *  datap 
)
static

Definition at line 413 of file taskprocessor.c.

414{
415 struct tps_task *t;
416 if (!task_exe) {
417 ast_log(LOG_ERROR, "task_exe is NULL!\n");
418 return NULL;
419 }
420
421 t = ast_calloc(1, sizeof(*t));
422 if (!t) {
423 ast_log(LOG_ERROR, "failed to allocate task!\n");
424 return NULL;
425 }
426
427 t->callback.execute_local = task_exe;
428 t->datap = datap;
429 t->wants_local = 1;
430
431 return t;
432}

References ast_calloc, ast_log, tps_task::callback, tps_task::datap, tps_task::execute_local, LOG_ERROR, NULL, and tps_task::wants_local.

Referenced by ast_taskprocessor_push_local().

◆ tps_task_free()

static void * tps_task_free ( struct tps_task task)
static

Definition at line 435 of file taskprocessor.c.

436{
437 ast_free(task);
438 return NULL;
439}
static int task(void *data)
Queued task for baseline test.

References ast_free, NULL, and task().

Referenced by ast_taskprocessor_execute(), and tps_taskprocessor_dtor().

◆ tps_taskprocessor_dtor()

static void tps_taskprocessor_dtor ( void *  tps)
static

Definition at line 931 of file taskprocessor.c.

932{
933 struct ast_taskprocessor *t = tps;
934 struct tps_task *task;
935
936 while ((task = AST_LIST_REMOVE_HEAD(&t->tps_queue, list))) {
938 }
939 t->tps_queue_size = 0;
940
941 if (t->high_water_alert) {
942 t->high_water_alert = 0;
943 tps_alert_add(t, -1);
944 }
945
947 t->listener = NULL;
948}
#define AST_LIST_REMOVE_HEAD(head, field)
Removes and returns the head entry from a list.
Definition: linkedlists.h:833
struct tps_task::@415 list
AST_LIST_ENTRY overhead.

References ao2_cleanup, AST_LIST_REMOVE_HEAD, ast_taskprocessor::high_water_alert, tps_task::list, ast_taskprocessor::listener, NULL, task(), tps_alert_add(), ast_taskprocessor::tps_queue, ast_taskprocessor::tps_queue_size, and tps_task_free().

Referenced by __allocate_taskprocessor().

◆ tps_taskprocessor_pop()

static struct tps_task * tps_taskprocessor_pop ( struct ast_taskprocessor tps)
static

◆ tps_taskprocessor_tab_complete()

static char * tps_taskprocessor_tab_complete ( struct ast_cli_args a)
static

Definition at line 446 of file taskprocessor.c.

447{
448 int tklen;
449 struct ast_taskprocessor *p;
450 struct ao2_iterator i;
451
452 tklen = strlen(a->word);
453 i = ao2_iterator_init(tps_singletons, 0);
454 while ((p = ao2_iterator_next(&i))) {
455 if (!strncasecmp(a->word, p->name, tklen)) {
458 break;
459 }
460 }
462 }
464
465 return NULL;
466}
#define ast_strdup(str)
A wrapper for strdup()
Definition: astmm.h:241
int ast_cli_completion_add(char *value)
Add a result to a request for completion options.
Definition: main/cli.c:2737

References a, ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, ast_cli_completion_add(), ast_strdup, ast_taskprocessor_unreference(), ast_taskprocessor::name, and NULL.

Referenced by cli_tps_ping(), cli_tps_report(), and cli_tps_reset_stats().

Variable Documentation

◆ default_listener_callbacks

const struct ast_taskprocessor_listener_callbacks default_listener_callbacks
static

Definition at line 283 of file taskprocessor.c.

Referenced by ast_taskprocessor_get().

◆ taskprocessor_clis

struct ast_cli_entry taskprocessor_clis[]
static

Definition at line 162 of file taskprocessor.c.

Referenced by ast_tps_init(), and tps_shutdown().

◆ tps_alert_count

unsigned int tps_alert_count
static

Count of the number of taskprocessors in high water alert.

Definition at line 846 of file taskprocessor.c.

Referenced by ast_taskprocessor_alert_get(), and tps_alert_add().

◆ tps_alert_lock

ast_rwlock_t tps_alert_lock = { PTHREAD_RWLOCK_INITIALIZER , NULL, {1, 0} }
static

Access protection for tps_alert_count

Definition at line 849 of file taskprocessor.c.

Referenced by ast_taskprocessor_alert_get(), and tps_alert_add().