Asterisk - The Open Source Telephony Project GIT-master-f36a736
Data Structures | Macros | Functions | Variables
taskprocessor.c File Reference

Maintain a container of uniquely-named taskprocessor threads that can be shared across modules. More...

#include "asterisk.h"
#include "asterisk/_private.h"
#include "asterisk/module.h"
#include "asterisk/time.h"
#include "asterisk/astobj2.h"
#include "asterisk/cli.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/sem.h"
Include dependency graph for taskprocessor.c:

Go to the source code of this file.

Data Structures

struct  ast_taskprocessor
 An ast_taskprocessor structure is a singleton by name. More...
 
struct  ast_taskprocessor_listener
 A listener for taskprocessors. More...
 
struct  default_taskprocessor_listener_pvt
 
struct  subsystem_alert
 
struct  ast_taskprocessor::tps_queue
 Taskprocessor queue. More...
 
struct  tps_task
 tps_task structure is queued to a taskprocessor More...
 
struct  tps_taskprocessor_stats
 tps_taskprocessor_stats maintain statistics for a taskprocessor. More...
 

Macros

#define AST_TASKPROCESSOR_SHUTDOWN_MAX_WAIT   10
 How many seconds to wait for running taskprocessors to finish on shutdown. More...
 
#define FMT_FIELDS   "%-70s %10lu %10lu %10lu %10lu %10lu\n"
 
#define FMT_FIELDS_SUBSYSTEM   "%-32s %12u\n"
 
#define FMT_HEADERS   "%-70s %10s %10s %10s %10s %10s\n"
 
#define FMT_HEADERS_SUBSYSTEM   "%-32s %12s\n"
 
#define SEQ_STR_SIZE   (1 + 8 + 1) /* Dash plus 8 hex digits plus null terminator */
 
#define TPS_MAX_BUCKETS   1567
 

Functions

static struct ast_taskprocessor * __allocate_taskprocessor (const char *name, struct ast_taskprocessor_listener *listener)
 
static struct ast_taskprocessor * __start_taskprocessor (struct ast_taskprocessor *p)
 
unsigned int ast_taskprocessor_alert_get (void)
 Get the current taskprocessor high water alert count. More...
 
int ast_taskprocessor_alert_set_levels (struct ast_taskprocessor *tps, long low_water, long high_water)
 Set the high and low alert water marks of the given taskprocessor queue. More...
 
void ast_taskprocessor_build_name (char *buf, unsigned int size, const char *format,...)
 Build a taskprocessor name with a sequence number on the end. More...
 
struct ast_taskprocessor * ast_taskprocessor_create_with_listener (const char *name, struct ast_taskprocessor_listener *listener)
 Create a taskprocessor with a custom listener. More...
 
int ast_taskprocessor_execute (struct ast_taskprocessor *tps)
 Pop a task off the taskprocessor and execute it. More...
 
struct ast_taskprocessor * ast_taskprocessor_get (const char *name, enum ast_tps_options create)
 Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary. More...
 
unsigned int ast_taskprocessor_get_subsystem_alert (const char *subsystem)
 Get the current taskprocessor high water alert count by subsystem. More...
 
int ast_taskprocessor_is_suspended (struct ast_taskprocessor *tps)
 Get the task processor suspend status. More...
 
int ast_taskprocessor_is_task (struct ast_taskprocessor *tps)
 Am I the given taskprocessor's current task. More...
 
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc (const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
 Allocate a taskprocessor listener. More...
 
struct ast_taskprocessor * ast_taskprocessor_listener_get_tps (const struct ast_taskprocessor_listener *listener)
 Get a reference to the listener's taskprocessor. More...
 
void * ast_taskprocessor_listener_get_user_data (const struct ast_taskprocessor_listener *listener)
 Get the user data from the listener. More...
 
const char * ast_taskprocessor_name (struct ast_taskprocessor *tps)
 Return the name of the taskprocessor singleton. More...
 
void ast_taskprocessor_name_append (char *buf, unsigned int size, const char *name)
 Append the next sequence number to the given string, and copy into the buffer. More...
 
int ast_taskprocessor_push (struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap)
 Push a task into the specified taskprocessor queue and signal the taskprocessor thread. More...
 
int ast_taskprocessor_push_local (struct ast_taskprocessor *tps, int(*task_exe)(struct ast_taskprocessor_local *datap), void *datap)
 
unsigned int ast_taskprocessor_seq_num (void)
 Get the next sequence number to create a human friendly taskprocessor name. More...
 
void ast_taskprocessor_set_local (struct ast_taskprocessor *tps, void *local_data)
 Sets the local data associated with a taskprocessor. More...
 
long ast_taskprocessor_size (struct ast_taskprocessor *tps)
 Return the current size of the taskprocessor queue. More...
 
int ast_taskprocessor_suspend (struct ast_taskprocessor *tps)
 Indicate the taskprocessor is suspended. More...
 
void * ast_taskprocessor_unreference (struct ast_taskprocessor *tps)
 Unreference the specified taskprocessor, decrementing its reference count. More...
 
int ast_taskprocessor_unsuspend (struct ast_taskprocessor *tps)
 Indicate the taskprocessor is unsuspended. More...
 
int ast_tps_init (void)
 
static AST_VECTOR_RW (subsystem_alert_vector, struct subsystem_alert *)
 CLI taskprocessor ping <blah> operation requires a ping condition lock. More...
 
static char * cli_subsystem_alert_report (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static char * cli_tps_ping (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static char * cli_tps_report (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static char * cli_tps_reset_stats (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static char * cli_tps_reset_stats_all (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static int default_listener_die (void *data)
 
static void * default_listener_pvt_alloc (void)
 
static void default_listener_pvt_destroy (struct default_taskprocessor_listener_pvt *pvt)
 
static void default_listener_pvt_dtor (struct ast_taskprocessor_listener *listener)
 
static void default_listener_shutdown (struct ast_taskprocessor_listener *listener)
 
static int default_listener_start (struct ast_taskprocessor_listener *listener)
 
static void default_task_pushed (struct ast_taskprocessor_listener *listener, int was_empty)
 
static void * default_tps_processing_function (void *data)
 Function that processes tasks in the taskprocessor. More...
 
static void listener_shutdown (struct ast_taskprocessor_listener *listener)
 
static void subsystem_alert_decrement (const char *subsystem)
 
static void subsystem_alert_increment (const char *subsystem)
 
static int subsystem_cmp (struct subsystem_alert *a, struct subsystem_alert *b)
 
static void subsystem_copy (struct subsystem_alert *alert, struct subsystem_alert_vector *vector)
 
static int subsystem_match (struct subsystem_alert *alert, const char *subsystem)
 
static void taskprocessor_listener_dtor (void *obj)
 
static int taskprocessor_push (struct ast_taskprocessor *tps, struct tps_task *t)
 
static void tps_alert_add (struct ast_taskprocessor *tps, int delta)
 
static int tps_cmp_cb (void *obj, void *arg, int flags)
 The astobj2 compare callback for taskprocessors. More...
 
static int tps_hash_cb (const void *obj, const int flags)
 The astobj2 hash callback for taskprocessors. More...
 
static int tps_ping_handler (void *datap)
 CLI taskprocessor ping <blah> handler function. More...
 
static int tps_report_taskprocessor_list (int fd, const char *like)
 
static void tps_report_taskprocessor_list_helper (int fd, struct ast_taskprocessor *tps)
 
static void tps_reset_stats (struct ast_taskprocessor *tps)
 
static void tps_shutdown (void)
 
static int tps_sort_cb (const void *obj_left, const void *obj_right, int flags)
 
static struct tps_task * tps_task_alloc (int(*task_exe)(void *datap), void *datap)
 
static struct tps_task * tps_task_alloc_local (int(*task_exe)(struct ast_taskprocessor_local *local), void *datap)
 
static void * tps_task_free (struct tps_task *task)
 
static void tps_taskprocessor_dtor (void *tps)
 
static struct tps_task * tps_taskprocessor_pop (struct ast_taskprocessor *tps)
 
static char * tps_taskprocessor_tab_complete (struct ast_cli_args *a)
 

Variables

static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks
 
static struct ast_cli_entry taskprocessor_clis []
 
static unsigned int tps_alert_count
 
static ast_rwlock_t tps_alert_lock = { PTHREAD_RWLOCK_INITIALIZER , NULL, {1, 0} }
 

Detailed Description

Maintain a container of uniquely-named taskprocessor threads that can be shared across modules.

Author
Dwayne Hubbard dhubb.nosp@m.ard@.nosp@m.digiu.nosp@m.m.co.nosp@m.m

Definition in file taskprocessor.c.

Macro Definition Documentation

◆ AST_TASKPROCESSOR_SHUTDOWN_MAX_WAIT

#define AST_TASKPROCESSOR_SHUTDOWN_MAX_WAIT   10

How many seconds to wait for running taskprocessors to finish on shutdown.

Definition at line 291 of file taskprocessor.c.

◆ FMT_FIELDS

#define FMT_FIELDS   "%-70s %10lu %10lu %10lu %10lu %10lu\n"

Definition at line 578 of file taskprocessor.c.

◆ FMT_FIELDS_SUBSYSTEM

#define FMT_FIELDS_SUBSYSTEM   "%-32s %12u\n"

◆ FMT_HEADERS

#define FMT_HEADERS   "%-70s %10s %10s %10s %10s %10s\n"

Definition at line 577 of file taskprocessor.c.

◆ FMT_HEADERS_SUBSYSTEM

#define FMT_HEADERS_SUBSYSTEM   "%-32s %12s\n"

◆ SEQ_STR_SIZE

#define SEQ_STR_SIZE   (1 + 8 + 1) /* Dash plus 8 hex digits plus null terminator */

Definition at line 1348 of file taskprocessor.c.

◆ TPS_MAX_BUCKETS

#define TPS_MAX_BUCKETS   1567

Function Documentation

◆ __allocate_taskprocessor()

static struct ast_taskprocessor * __allocate_taskprocessor ( const char *  name,
struct ast_taskprocessor_listener * listener 
)
static

Definition at line 1048 of file taskprocessor.c.

1049{
1050 struct ast_taskprocessor *p;
1051 char *subsystem_separator;
1052 size_t subsystem_length = 0;
1053 size_t name_length;
1054
1055 name_length = strlen(name);
1056 subsystem_separator = strchr(name, '/');
1057 if (subsystem_separator) {
1058 subsystem_length = subsystem_separator - name;
1059 }
1060
1061 p = ao2_alloc(sizeof(*p) + name_length + subsystem_length + 2, tps_taskprocessor_dtor);
1062 if (!p) {
1063 ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
1064 return NULL;
1065 }
1066
1067 /* Set default congestion water level alert triggers. */
1070
1071 strcpy(p->name, name); /* Safe */
1072 p->subsystem = p->name + name_length + 1;
1073 ast_copy_string(p->subsystem, name, subsystem_length + 1);
1074
1075 ao2_ref(listener, +1);
1076 p->listener = listener;
1077
1079
1080 ao2_ref(p, +1);
1081 listener->tps = p;
1082
1083 if (!(ao2_link_flags(tps_singletons, p, OBJ_NOLOCK))) {
1084 ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
1085 listener->tps = NULL;
1086 ao2_ref(p, -2);
1087 return NULL;
1088 }
1089
1090 return p;
1091}
static void * listener(void *unused)
Definition: asterisk.c:1519
#define ast_log
Definition: astobj2.c:42
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
Definition: astobj2.h:1554
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
Definition: astobj2.h:1063
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:409
static const char name[]
Definition: format_mp3.c:68
#define LOG_ERROR
#define LOG_WARNING
#define AST_PTHREADT_NULL
Definition: lock.h:66
#define NULL
Definition: resample.c:96
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
Definition: strings.h:425
An ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
long tps_queue_low
Taskprocessor low water clear alert level.
Definition: taskprocessor.c:76
long tps_queue_high
Taskprocessor high water alert trigger level.
Definition: taskprocessor.c:78
struct ast_taskprocessor_listener * listener
Definition: taskprocessor.c:81
char * subsystem
Anything before the first '/' in the name (if there is one)
Definition: taskprocessor.c:93
char name[0]
Friendly name of the taskprocessor. Subsystem is appended after the name's NULL terminator.
Definition: taskprocessor.c:97
static void tps_taskprocessor_dtor(void *tps)
#define AST_TASKPROCESSOR_HIGH_WATER_LEVEL
Definition: taskprocessor.h:64

References ao2_alloc, ao2_link_flags, ao2_ref, ast_copy_string(), ast_log, AST_PTHREADT_NULL, AST_TASKPROCESSOR_HIGH_WATER_LEVEL, listener(), ast_taskprocessor::listener, LOG_ERROR, LOG_WARNING, ast_taskprocessor::name, name, NULL, OBJ_NOLOCK, ast_taskprocessor::subsystem, ast_taskprocessor::thread, ast_taskprocessor::tps_queue_high, ast_taskprocessor::tps_queue_low, and tps_taskprocessor_dtor().

Referenced by ast_taskprocessor_create_with_listener(), and ast_taskprocessor_get().

◆ __start_taskprocessor()

static struct ast_taskprocessor * __start_taskprocessor ( struct ast_taskprocessor * p)
static

Definition at line 1093 of file taskprocessor.c.

1094{
1095 if (p && p->listener->callbacks->start(p->listener)) {
1096 ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n",
1097 p->name);
1099
1100 return NULL;
1101 }
1102
1103 return p;
1104}
int(* start)(struct ast_taskprocessor_listener *listener)
The taskprocessor has started completely.
Definition: taskprocessor.h:92
const struct ast_taskprocessor_listener_callbacks * callbacks
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor, decrementing its reference count.

References ast_log, ast_taskprocessor_unreference(), ast_taskprocessor_listener::callbacks, ast_taskprocessor::listener, LOG_ERROR, ast_taskprocessor::name, NULL, and ast_taskprocessor_listener_callbacks::start.

Referenced by ast_taskprocessor_create_with_listener(), and ast_taskprocessor_get().

◆ ast_taskprocessor_alert_get()

unsigned int ast_taskprocessor_alert_get ( void  )

Get the current taskprocessor high water alert count.

Since
13.10.0
Return values
0: if no taskprocessors are in high water alert.
non-zero: if some task processors are in high water alert.

Definition at line 884 of file taskprocessor.c.

885{
886 unsigned int count;
887
889 count = tps_alert_count;
891
892 return count;
893}
#define ast_rwlock_rdlock(a)
Definition: lock.h:235
#define ast_rwlock_unlock(a)
Definition: lock.h:234
static unsigned int tps_alert_count
static ast_rwlock_t tps_alert_lock

References ast_rwlock_rdlock, ast_rwlock_unlock, tps_alert_count, and tps_alert_lock.

Referenced by AST_TEST_DEFINE(), and distributor().

◆ ast_taskprocessor_alert_set_levels()

int ast_taskprocessor_alert_set_levels ( struct ast_taskprocessor * tps,
long  low_water,
long  high_water 
)

Set the high and low alert water marks of the given taskprocessor queue.

Since
13.10.0
Parameters
tps: Taskprocessor to update queue water marks.
low_water: New queue low water mark. (-1 to set as 90% of high_water)
high_water: New queue high water mark.
Return values
0: on success.
-1: on error (water marks not changed).

Definition at line 895 of file taskprocessor.c.

896{
897 if (!tps || high_water < 0 || high_water < low_water) {
898 return -1;
899 }
900
901 if (low_water < 0) {
902 /* Set low water level to 90% of high water level */
903 low_water = (high_water * 9) / 10;
904 }
905
906 ao2_lock(tps);
907
908 tps->tps_queue_low = low_water;
909 tps->tps_queue_high = high_water;
910
911 if (tps->high_water_alert) {
912 if (!tps->tps_queue_size || tps->tps_queue_size < low_water) {
913 /* Update water mark alert immediately */
914 tps->high_water_alert = 0;
915 tps_alert_add(tps, -1);
916 }
917 } else {
918 if (high_water < tps->tps_queue_size) {
919 /* Update water mark alert immediately */
920 tps->high_water_alert = 1;
921 tps_alert_add(tps, +1);
922 }
923 }
924
925 ao2_unlock(tps);
926
927 return 0;
928}
#define ao2_unlock(a)
Definition: astobj2.h:729
#define ao2_lock(a)
Definition: astobj2.h:717
unsigned int high_water_alert
Definition: taskprocessor.c:89
long tps_queue_size
Taskprocessor current queue size.
Definition: taskprocessor.c:74
static void tps_alert_add(struct ast_taskprocessor *tps, int delta)

References ao2_lock, ao2_unlock, ast_taskprocessor::high_water_alert, tps_alert_add(), ast_taskprocessor::tps_queue_high, ast_taskprocessor::tps_queue_low, and ast_taskprocessor::tps_queue_size.

Referenced by actual_load_config(), ast_res_pjsip_init_options_handling(), ast_serializer_pool_set_alerts(), ast_sorcery_object_set_congestion_levels(), AST_TEST_DEFINE(), and stasis_subscription_set_congestion_limits().

◆ ast_taskprocessor_build_name()

void ast_taskprocessor_build_name ( char *  buf,
unsigned int  size,
const char *  format,
  ... 
)

Build a taskprocessor name with a sequence number on the end.

Since
13.8.0
Parameters
buf: Where to put the built taskprocessor name.
size: How large buf is, including the null terminator.
format: printf format to create the non-sequenced part of the name.
Note
The user supplied part of the taskprocessor name is truncated to allow the full sequence number to be appended within the supplied buffer size.

Definition at line 1360 of file taskprocessor.c.

1361{
1362 va_list ap;
1363 int user_size;
1364
1365 ast_assert(buf != NULL);
1366 ast_assert(SEQ_STR_SIZE <= size);
1367
1368 va_start(ap, format);
1369 user_size = vsnprintf(buf, size - (SEQ_STR_SIZE - 1), format, ap);
1370 va_end(ap);
1371 if (user_size < 0) {
1372 /*
1373 * Wow! We got an output error to a memory buffer.
1374 * Assume no user part of name written.
1375 */
1376 user_size = 0;
1377 } else if (size < user_size + SEQ_STR_SIZE) {
1378 /* Truncate user part of name to make sequence number fit. */
1379 user_size = size - SEQ_STR_SIZE;
1380 }
1381
1382 /* Append sequence number to end of user name. */
1383 snprintf(buf + user_size, SEQ_STR_SIZE, "-%08x", ast_taskprocessor_seq_num());
1384}
char buf[BUFSIZE]
Definition: eagi_proxy.c:66
unsigned int ast_taskprocessor_seq_num(void)
Get the next sequence number to create a human friendly taskprocessor name.
#define SEQ_STR_SIZE
#define ast_assert(a)
Definition: utils.h:739

References ast_assert, ast_taskprocessor_seq_num(), buf, NULL, and SEQ_STR_SIZE.

Referenced by alloc_playback_chan(), allocate_subscription_tree(), ast_sip_session_alloc(), create_websocket_serializer(), distributor_pool_setup(), internal_stasis_subscribe(), refer_progress_alloc(), sip_options_aor_alloc(), sip_outbound_publisher_alloc(), sip_outbound_registration_state_alloc(), and sorcery_object_type_alloc().

◆ ast_taskprocessor_create_with_listener()

struct ast_taskprocessor * ast_taskprocessor_create_with_listener ( const char *  name,
struct ast_taskprocessor_listener * listener 
)

Create a taskprocessor with a custom listener.

Since
12.0.0

Note that when a taskprocessor is created in this way, it does not create any threads to execute the tasks. This job is left up to the listener. The listener's start() callback will be called during this function.

Parameters
name: The name of the taskprocessor to create
listener: The listener for operations on this taskprocessor
Return values
NULL: Failure
non-NULL: Success

Definition at line 1148 of file taskprocessor.c.

1149{
1150 struct ast_taskprocessor *p;
1151
1152 ao2_lock(tps_singletons);
1153 p = ao2_find(tps_singletons, name, OBJ_KEY | OBJ_NOLOCK);
1154 if (p) {
1155 ao2_unlock(tps_singletons);
1157 return NULL;
1158 }
1159
1161 ao2_unlock(tps_singletons);
1162
1163 return __start_taskprocessor(p);
1164}
#define OBJ_KEY
Definition: astobj2.h:1151
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1736
static struct ast_taskprocessor * __allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener)
static struct ast_taskprocessor * __start_taskprocessor(struct ast_taskprocessor *p)

References __allocate_taskprocessor(), __start_taskprocessor(), ao2_find, ao2_lock, ao2_unlock, ast_taskprocessor_unreference(), listener(), name, NULL, OBJ_KEY, and OBJ_NOLOCK.

Referenced by AST_TEST_DEFINE(), ast_threadpool_create(), and ast_threadpool_serializer_group().

◆ ast_taskprocessor_execute()

int ast_taskprocessor_execute ( struct ast_taskprocessor * tps)

Pop a task off the taskprocessor and execute it.

Since
12.0.0
Parameters
tps: The taskprocessor from which to execute.
Return values
0: There is no further work to be done.
1: Tasks still remain in the taskprocessor queue.

Definition at line 1277 of file taskprocessor.c.

1278{
1279 struct ast_taskprocessor_local local;
1280 struct tps_task *t;
1281 long size;
1282
1283 ao2_lock(tps);
1284 t = tps_taskprocessor_pop(tps);
1285 if (!t) {
1286 ao2_unlock(tps);
1287 return 0;
1288 }
1289
1290 tps->thread = pthread_self();
1291 tps->executing = 1;
1292
1293 if (t->wants_local) {
1294 local.local_data = tps->local_data;
1295 local.data = t->datap;
1296 }
1297 ao2_unlock(tps);
1298
1299 if (t->wants_local) {
1300 t->callback.execute_local(&local);
1301 } else {
1302 t->callback.execute(t->datap);
1303 }
1304 tps_task_free(t);
1305
1306 ao2_lock(tps);
1308 /* We need to check size in the same critical section where we reset the
1309 * executing bit. Avoids a race condition where a task is pushed right
1310 * after we pop an empty stack.
1311 */
1312 tps->executing = 0;
1313 size = ast_taskprocessor_size(tps);
1314
1315 /* Update the stats */
1317
1318 /* Include the task we just executed as part of the queue size. */
1319 if (size >= tps->stats.max_qsize) {
1320 tps->stats.max_qsize = size + 1;
1321 }
1322 ao2_unlock(tps);
1323
1324 /* If we executed a task, check for the transition to empty */
1325 if (size == 0 && tps->listener->callbacks->emptied) {
1326 tps->listener->callbacks->emptied(tps->listener);
1327 }
1328 return size > 0;
1329}
void(* emptied)(struct ast_taskprocessor_listener *listener)
Indicates the task processor has become empty.
Local data parameter.
unsigned int executing
Definition: taskprocessor.c:85
struct tps_taskprocessor_stats stats
Taskprocessor statistics.
Definition: taskprocessor.c:71
tps_task structure is queued to a taskprocessor
Definition: taskprocessor.c:47
int(* execute)(void *datap)
Definition: taskprocessor.c:50
int(* execute_local)(struct ast_taskprocessor_local *local)
Definition: taskprocessor.c:51
unsigned int wants_local
Definition: taskprocessor.c:57
union tps_task::@406 callback
The execute() task callback function pointer.
void * datap
The data pointer for the task execute() function.
Definition: taskprocessor.c:54
unsigned long max_qsize
This is the maximum number of tasks queued at any one time.
Definition: taskprocessor.c:63
unsigned long _tasks_processed_count
This is the current number of tasks processed.
Definition: taskprocessor.c:65
static void * tps_task_free(struct tps_task *task)
static struct tps_task * tps_taskprocessor_pop(struct ast_taskprocessor *tps)
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.

References tps_taskprocessor_stats::_tasks_processed_count, ao2_lock, ao2_unlock, AST_PTHREADT_NULL, ast_taskprocessor_size(), tps_task::callback, ast_taskprocessor_listener::callbacks, ast_taskprocessor_local::data, tps_task::datap, ast_taskprocessor_listener_callbacks::emptied, tps_task::execute, tps_task::execute_local, ast_taskprocessor::executing, ast_taskprocessor::listener, ast_taskprocessor_local::local_data, ast_taskprocessor::local_data, tps_taskprocessor_stats::max_qsize, ast_taskprocessor::stats, ast_taskprocessor::thread, tps_task_free(), tps_taskprocessor_pop(), and tps_task::wants_local.

Referenced by AST_TEST_DEFINE(), default_tps_processing_function(), execute_tasks(), and threadpool_execute().

◆ ast_taskprocessor_get()

struct ast_taskprocessor * ast_taskprocessor_get ( const char *  name,
enum ast_tps_options  create 
)

Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.

The default behavior of instantiating a taskprocessor if one does not already exist can be disabled by specifying the TPS_REF_IF_EXISTS ast_tps_options as the second argument to ast_taskprocessor_get().

Parameters
name: The name of the taskprocessor
create: Use 0 by default, or specify TPS_REF_IF_EXISTS to return NULL if the taskprocessor does not already exist.
Returns
A pointer to a reference-counted taskprocessor under normal conditions, or NULL if the TPS_REF_IF_EXISTS reference type is specified and the taskprocessor does not exist.
Since
1.6.1

Definition at line 1109 of file taskprocessor.c.

1110{
1111 struct ast_taskprocessor *p;
1114
1115 if (ast_strlen_zero(name)) {
1116 ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
1117 return NULL;
1118 }
1119 ao2_lock(tps_singletons);
1120 p = ao2_find(tps_singletons, name, OBJ_KEY | OBJ_NOLOCK);
1121 if (p || (create & TPS_REF_IF_EXISTS)) {
1122 /* calling function does not want a new taskprocessor to be created if it doesn't already exist */
1123 ao2_unlock(tps_singletons);
1124 return p;
1125 }
1126
1127 /* Create a new taskprocessor. Start by creating a default listener */
1129 if (!pvt) {
1130 ao2_unlock(tps_singletons);
1131 return NULL;
1132 }
1134 if (!listener) {
1135 ao2_unlock(tps_singletons);
1137 return NULL;
1138 }
1139
1141 ao2_unlock(tps_singletons);
1142 p = __start_taskprocessor(p);
1143 ao2_ref(listener, -1);
1144
1145 return p;
1146}
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition: strings.h:65
A listener for taskprocessors.
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
static void * default_listener_pvt_alloc(void)
static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks
static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
@ TPS_REF_IF_EXISTS
return a reference to a taskprocessor ONLY if it already exists
Definition: taskprocessor.h:78

References __allocate_taskprocessor(), __start_taskprocessor(), ao2_find, ao2_lock, ao2_ref, ao2_unlock, ast_log, ast_strlen_zero(), ast_taskprocessor_listener_alloc(), default_listener_callbacks, default_listener_pvt_alloc(), default_listener_pvt_destroy(), listener(), LOG_ERROR, name, NULL, OBJ_KEY, OBJ_NOLOCK, and TPS_REF_IF_EXISTS.

Referenced by alloc_playback_chan(), ast_dns_system_resolver_init(), ast_msg_init(), AST_TEST_DEFINE(), cli_tps_ping(), cli_tps_reset_stats(), find_request_serializer(), internal_stasis_subscribe(), load_module(), load_objects(), and threadpool_alloc().

◆ ast_taskprocessor_get_subsystem_alert()

unsigned int ast_taskprocessor_get_subsystem_alert ( const char *  subsystem)

Get the current taskprocessor high water alert count by subsystem.

Since
13.26.0
16.3.0
Parameters
subsystem: The subsystem name
Return values
0: if no taskprocessors are in high water alert.
non-zero: if some task processors are in high water alert.

Definition at line 704 of file taskprocessor.c.

705{
706 struct subsystem_alert *alert;
707 unsigned int count = 0;
708 int idx;
709
710 AST_VECTOR_RW_RDLOCK(&overloaded_subsystems);
711 idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
712 if (idx >= 0) {
713 alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
714 count = alert->alert_count;
715 }
716 AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
717
718 return count;
719}
unsigned int alert_count
static int subsystem_match(struct subsystem_alert *alert, const char *subsystem)
#define AST_VECTOR_GET_INDEX(vec, value, cmp)
Get the 1st index from a vector that matches the given comparison.
Definition: vector.h:719
#define AST_VECTOR_RW_UNLOCK(vec)
Unlock vector.
Definition: vector.h:897
#define AST_VECTOR_RW_RDLOCK(vec)
Obtain read lock on vector.
Definition: vector.h:877
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:680

References subsystem_alert::alert_count, AST_VECTOR_GET, AST_VECTOR_GET_INDEX, AST_VECTOR_RW_RDLOCK, AST_VECTOR_RW_UNLOCK, subsystem_alert::subsystem, and subsystem_match().

Referenced by AST_TEST_DEFINE(), and distributor().

◆ ast_taskprocessor_is_suspended()

int ast_taskprocessor_is_suspended ( struct ast_taskprocessor * tps)

Get the task processor suspend status.

Since
13.12.0
Parameters
tps: Task processor.
Return values
non-zero: if the task processor is suspended

Definition at line 1272 of file taskprocessor.c.

1273{
1274 return tps ? tps->suspended : -1;
1275}
unsigned int suspended
Definition: taskprocessor.c:91

References ast_taskprocessor::suspended.

Referenced by ast_sip_session_suspend().

◆ ast_taskprocessor_is_task()

int ast_taskprocessor_is_task ( struct ast_taskprocessor * tps)

Am I the given taskprocessor's current task.

Since
12.7.0
Parameters
tps: Taskprocessor to check.
Return values
non-zero: if current thread is the taskprocessor thread.

Definition at line 1331 of file taskprocessor.c.

1332{
1333 int is_task;
1334
1335 ao2_lock(tps);
1336 is_task = pthread_equal(tps->thread, pthread_self());
1337 ao2_unlock(tps);
1338 return is_task;
1339}

References ao2_lock, ao2_unlock, and ast_taskprocessor::thread.

Referenced by ast_sip_push_task_wait_serializer(), ast_sip_session_suspend(), and handle_new_invite_request().

◆ ast_taskprocessor_listener_alloc()

struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc ( const struct ast_taskprocessor_listener_callbacks * callbacks,
void *  user_data 
)

Allocate a taskprocessor listener.

Since
12.0.0

This will result in the listener being allocated with the specified callbacks.

Parameters
callbacks: The callbacks to assign to the listener
user_data: The user data for the listener
Return values
NULL: Failure
non-NULL: The newly allocated taskprocessor listener

Definition at line 995 of file taskprocessor.c.

996{
998
1000 if (!listener) {
1001 return NULL;
1002 }
1003 listener->callbacks = callbacks;
1004 listener->user_data = user_data;
1005
1006 return listener;
1007}
struct @468 callbacks
static void taskprocessor_listener_dtor(void *obj)

References ao2_alloc, callbacks, listener(), NULL, taskprocessor_listener_dtor(), and ast_taskprocessor_listener::user_data.

Referenced by ast_taskprocessor_get(), AST_TEST_DEFINE(), ast_threadpool_create(), and ast_threadpool_serializer_group().

◆ ast_taskprocessor_listener_get_tps()

struct ast_taskprocessor * ast_taskprocessor_listener_get_tps ( const struct ast_taskprocessor_listener * listener)

Get a reference to the listener's taskprocessor.

This will return the taskprocessor with its reference count increased. Release the reference to this object by using ast_taskprocessor_unreference()

Parameters
listener: The listener that has the taskprocessor
Returns
The taskprocessor

Definition at line 1009 of file taskprocessor.c.

1010{
1011 ao2_ref(listener->tps, +1);
1012 return listener->tps;
1013}

References ao2_ref, and listener().

Referenced by serializer_task_pushed().

◆ ast_taskprocessor_listener_get_user_data()

void * ast_taskprocessor_listener_get_user_data ( const struct ast_taskprocessor_listener * listener)

Get the user data from the listener.

Parameters
listener: The taskprocessor listener
Returns
The listener's user data

Definition at line 1015 of file taskprocessor.c.

1016{
1017 return listener->user_data;
1018}

References listener().

Referenced by serializer_shutdown(), serializer_task_pushed(), test_emptied(), test_shutdown(), test_task_pushed(), threadpool_tps_emptied(), threadpool_tps_shutdown(), and threadpool_tps_task_pushed().

◆ ast_taskprocessor_name()

const char * ast_taskprocessor_name ( struct ast_taskprocessor * tps)

Return the name of the taskprocessor singleton.

Since
1.6.1

Definition at line 971 of file taskprocessor.c.

972{
973 if (!tps) {
974 ast_log(LOG_ERROR, "no taskprocessor specified!\n");
975 return NULL;
976 }
977 return tps->name;
978}

References ast_log, LOG_ERROR, ast_taskprocessor::name, and NULL.

Referenced by ast_serializer_pool_set_alerts(), ast_sip_get_distributor_serializer(), distributor(), grow(), record_serializer(), shrink(), sip_options_apply_aor_configuration(), and sip_options_contact_add_task().

◆ ast_taskprocessor_name_append()

void ast_taskprocessor_name_append ( char *  buf,
unsigned int  size,
const char *  name 
)

Append the next sequence number to the given string, and copy into the buffer.

Parameters
buf: Where to copy the appended taskprocessor name.
size: How large buf is, including the null terminator.
name: A name to append the sequence number to.

Definition at line 1350 of file taskprocessor.c.

1351{
1352 int final_size = strlen(name) + SEQ_STR_SIZE;
1353
1354 ast_assert(buf != NULL && name != NULL);
1355 ast_assert(final_size <= size);
1356
1357 snprintf(buf, final_size, "%s-%08x", name, ast_taskprocessor_seq_num());
1358}

References ast_assert, ast_taskprocessor_seq_num(), buf, name, NULL, and SEQ_STR_SIZE.

Referenced by ast_serializer_pool_create().

◆ ast_taskprocessor_push()

int ast_taskprocessor_push ( struct ast_taskprocessor * tps,
int(*)(void *datap)  task_exe,
void *  datap 
)

Push a task into the specified taskprocessor queue and signal the taskprocessor thread.

Parameters
tps: The taskprocessor structure
task_exe: The task handling function to push into the taskprocessor queue
datap: The data to be used by the task handling function
Return values
0: success
-1: failure
Since
1.6.1

Definition at line 1240 of file taskprocessor.c.

1241{
1242 return taskprocessor_push(tps, tps_task_alloc(task_exe, datap));
1243}
static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
static struct tps_task * tps_task_alloc(int(*task_exe)(void *datap), void *datap)

References taskprocessor_push(), and tps_task_alloc().

Referenced by ast_cc_agent_status_response(), ast_cc_monitor_failed(), ast_cc_monitor_party_b_free(), ast_cc_monitor_status_request(), ast_cc_monitor_stop_ringing(), ast_msg_queue(), ast_sip_push_task(), ast_sorcery_create(), ast_sorcery_delete(), ast_sorcery_update(), AST_TEST_DEFINE(), ast_threadpool_push(), ast_threadpool_set_size(), async_delete_name_rec(), async_play_sound_helper(), cc_request_state_change(), cli_tps_ping(), default_listener_shutdown(), destroy_conference_bridge(), dns_system_resolver_resolve(), generic_monitor_devstate_cb(), handle_cc_status(), hepv3_send_packet(), iax2_transmit(), mwi_handle_subscribe(), mwi_handle_unsubscribe(), play_sound_helper(), sorcery_object_load(), stasis_unsubscribe(), threadpool_active_thread_idle(), threadpool_idle_thread_dead(), threadpool_tps_emptied(), threadpool_tps_task_pushed(), and threadpool_zombie_thread_dead().

◆ ast_taskprocessor_push_local()

int ast_taskprocessor_push_local ( struct ast_taskprocessor * tps,
int(*)(struct ast_taskprocessor_local *datap)  task_exe,
void *  datap 
)

Definition at line 1245 of file taskprocessor.c.

1246{
1247 return taskprocessor_push(tps, tps_task_alloc_local(task_exe, datap));
1248}
static struct tps_task * tps_task_alloc_local(int(*task_exe)(struct ast_taskprocessor_local *local), void *datap)

References taskprocessor_push(), and tps_task_alloc_local().

◆ ast_taskprocessor_seq_num()

unsigned int ast_taskprocessor_seq_num ( void  )

Get the next sequence number to create a human friendly taskprocessor name.

Since
13.8.0
Returns
Sequence number for use in creating human friendly taskprocessor names.

Definition at line 1341 of file taskprocessor.c.

1342{
1343 static int seq_num;
1344
1345 return (unsigned int) ast_atomic_fetchadd_int(&seq_num, +1);
1346}
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition: lock.h:757

References ast_atomic_fetchadd_int().

Referenced by ast_taskprocessor_build_name(), and ast_taskprocessor_name_append().

◆ ast_taskprocessor_set_local()

void ast_taskprocessor_set_local ( struct ast_taskprocessor * tps,
void *  local_data 
)

Sets the local data associated with a taskprocessor.

Since
12.0.0

See ast_taskprocessor_push_local().

Parameters
tps: Task processor.
local_data: Local data to associate with tps.

Definition at line 1166 of file taskprocessor.c.

1168{
1169 SCOPED_AO2LOCK(lock, tps);
1170 tps->local_data = local_data;
1171}
ast_mutex_t lock
Definition: app_sla.c:331
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:604

References ast_taskprocessor::local_data, lock, and SCOPED_AO2LOCK.

Referenced by AST_TEST_DEFINE(), and internal_stasis_subscribe().

◆ ast_taskprocessor_size()

long ast_taskprocessor_size ( struct ast_taskprocessor * tps)

Return the current size of the taskprocessor queue.

Since
13.7.0

Definition at line 965 of file taskprocessor.c.

966{
967 return (tps) ? tps->tps_queue_size : -1;
968}

References ast_taskprocessor::tps_queue_size.

Referenced by ast_serializer_pool_get(), ast_taskprocessor_execute(), AST_TEST_DEFINE(), and ast_threadpool_queue_size().

◆ ast_taskprocessor_suspend()

int ast_taskprocessor_suspend ( struct ast_taskprocessor * tps)

Indicate the taskprocessor is suspended.

Since
13.12.0
Parameters
tps: Task processor.
Return values
0: success
-1: failure

Definition at line 1250 of file taskprocessor.c.

1251{
1252 if (tps) {
1253 ao2_lock(tps);
1254 tps->suspended = 1;
1255 ao2_unlock(tps);
1256 return 0;
1257 }
1258 return -1;
1259}

References ao2_lock, ao2_unlock, and ast_taskprocessor::suspended.

Referenced by ast_sip_session_suspend(), and AST_TEST_DEFINE().

◆ ast_taskprocessor_unreference()

void * ast_taskprocessor_unreference ( struct ast_taskprocessor * tps)

Unreference the specified taskprocessor, decrementing its reference count.

Taskprocessors use astobj2 and will unlink from the taskprocessor singleton container and destroy themselves when the taskprocessor reference count reaches zero.

Parameters
tps: taskprocessor to unreference
Returns
NULL
Since
1.6.1

Definition at line 1174 of file taskprocessor.c.

1175{
1176 if (!tps) {
1177 return NULL;
1178 }
1179
1180 /* To prevent another thread from finding and getting a reference to this
1181 * taskprocessor we hold the singletons lock. If we didn't do this then
1182 * they may acquire it and find that the listener has been shut down.
1183 */
1184 ao2_lock(tps_singletons);
1185
1186 if (ao2_ref(tps, -1) > 3) {
1187 ao2_unlock(tps_singletons);
1188 return NULL;
1189 }
1190
1191 /* If we're down to 3 references, then those must be:
1192 * 1. The reference we just got rid of
1193 * 2. The container
1194 * 3. The listener
1195 */
1196 ao2_unlink_flags(tps_singletons, tps, OBJ_NOLOCK);
1197 ao2_unlock(tps_singletons);
1198
1200 return NULL;
1201}
#define ao2_unlink_flags(container, obj, flags)
Remove an object from a container.
Definition: astobj2.h:1600
static void listener_shutdown(struct ast_taskprocessor_listener *listener)

References ao2_lock, ao2_ref, ao2_unlink_flags, ao2_unlock, ast_taskprocessor::listener, listener_shutdown(), NULL, and OBJ_NOLOCK.

Referenced by __start_taskprocessor(), __unload_module(), ast_msg_shutdown(), ast_res_pjsip_cleanup_options_handling(), ast_serializer_pool_destroy(), ast_taskprocessor_create_with_listener(), AST_TEST_DEFINE(), ast_threadpool_shutdown(), cli_tps_ping(), cli_tps_reset_stats(), cli_tps_reset_stats_all(), destroy_conference_bridge(), distributor(), distributor_pool_shutdown(), dns_system_resolver_destroy(), execute_tasks(), exten_state_subscription_destructor(), refer_progress_destroy(), scheduler(), schtd_dtor(), serializer_task_pushed(), session_destructor(), sip_options_aor_dtor(), sip_outbound_publisher_destroy(), sip_outbound_registration_client_state_destroy(), sorcery_object_type_destructor(), subscription_dtor(), subscription_persistence_recreate(), subscription_tree_destructor(), tps_report_taskprocessor_list(), tps_shutdown_thread(), tps_taskprocessor_tab_complete(), unload_module(), and websocket_cb().

◆ ast_taskprocessor_unsuspend()

int ast_taskprocessor_unsuspend ( struct ast_taskprocessor * tps)

Indicate the taskprocessor is unsuspended.

Since
13.12.0
Parameters
tpsTask processor.
Return values
0success
-1failure

Definition at line 1261 of file taskprocessor.c.

1262{
1263 if (tps) {
1264 ao2_lock(tps);
1265 tps->suspended = 0;
1266 ao2_unlock(tps);
1267 return 0;
1268 }
1269 return -1;
1270}

References ao2_lock, ao2_unlock, and ast_taskprocessor::suspended.

Referenced by ast_sip_session_unsuspend(), and AST_TEST_DEFINE().

◆ ast_tps_init()

int ast_tps_init ( void  )

Provided by taskprocessor.c

Definition at line 368 of file taskprocessor.c.

369{
372 if (!tps_singletons) {
373 ast_log(LOG_ERROR, "taskprocessor container failed to initialize!\n");
374 return -1;
375 }
376
377 if (AST_VECTOR_RW_INIT(&overloaded_subsystems, 10)) {
378 ao2_ref(tps_singletons, -1);
379 ast_log(LOG_ERROR, "taskprocessor subsystems vector failed to initialize!\n");
380 return -1;
381 }
382
383 ast_cond_init(&cli_ping_cond, NULL);
384
386
388
389 return 0;
390}
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
Definition: clicompat.c:19
@ AO2_ALLOC_OPT_LOCK_MUTEX
Definition: astobj2.h:363
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition: astobj2.h:1303
#define ast_cli_register_multiple(e, len)
Register multiple commands.
Definition: cli.h:265
#define ast_cond_init(cond, attr)
Definition: lock.h:201
static void tps_shutdown(void)
#define TPS_MAX_BUCKETS
static struct ast_cli_entry taskprocessor_clis[]
static int tps_cmp_cb(void *obj, void *arg, int flags)
The astobj2 compare callback for taskprocessors.
static int tps_hash_cb(const void *obj, const int flags)
The astobj2 hash callback for taskprocessors.
#define ARRAY_LEN(a)
Definition: utils.h:666
#define AST_VECTOR_RW_INIT(vec, size)
Initialize a vector with a read/write lock.
Definition: vector.h:158

References AO2_ALLOC_OPT_LOCK_MUTEX, ao2_container_alloc_hash, ao2_ref, ARRAY_LEN, ast_cli_register_multiple, ast_cond_init, ast_log, ast_register_cleanup(), AST_VECTOR_RW_INIT, LOG_ERROR, NULL, taskprocessor_clis, tps_cmp_cb(), tps_hash_cb(), TPS_MAX_BUCKETS, and tps_shutdown().

Referenced by asterisk_daemon().

◆ AST_VECTOR_RW()

static AST_VECTOR_RW ( subsystem_alert_vector  ,
struct subsystem_alert  
)
static

CLI taskprocessor ping <blah> operation requires a ping condition lock.

Definition at line 127 of file taskprocessor.c.

162 {

◆ cli_subsystem_alert_report()

static char * cli_subsystem_alert_report ( struct ast_cli_entry e,
int  cmd,
struct ast_cli_args a 
)
static

Definition at line 796 of file taskprocessor.c.

797{
798 struct subsystem_alert_vector sorted_subsystems;
799 int i;
800
801#define FMT_HEADERS_SUBSYSTEM "%-32s %12s\n"
802#define FMT_FIELDS_SUBSYSTEM "%-32s %12u\n"
803
804 switch (cmd) {
805 case CLI_INIT:
806 e->command = "core show taskprocessor alerted subsystems";
807 e->usage =
808 "Usage: core show taskprocessor alerted subsystems\n"
809 " Shows a list of task processor subsystems that are currently alerted\n";
810 return NULL;
811 case CLI_GENERATE:
812 return NULL;
813 }
814
815 if (a->argc != e->args) {
816 return CLI_SHOWUSAGE;
817 }
818
819 if (AST_VECTOR_INIT(&sorted_subsystems, AST_VECTOR_SIZE(&overloaded_subsystems))) {
820 return CLI_FAILURE;
821 }
822
823 AST_VECTOR_RW_RDLOCK(&overloaded_subsystems);
824 for (i = 0; i < AST_VECTOR_SIZE(&overloaded_subsystems); i++) {
825 subsystem_copy(AST_VECTOR_GET(&overloaded_subsystems, i), &sorted_subsystems);
826 }
827 AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
828
829 ast_cli(a->fd, "\n" FMT_HEADERS_SUBSYSTEM, "Subsystem", "Alert Count");
830
831 for (i = 0; i < AST_VECTOR_SIZE(&sorted_subsystems); i++) {
832 struct subsystem_alert *alert = AST_VECTOR_GET(&sorted_subsystems, i);
833 ast_cli(a->fd, FMT_FIELDS_SUBSYSTEM, alert->subsystem, alert->alert_count);
834 }
835
836 ast_cli(a->fd, "\n%zu subsystems\n\n", AST_VECTOR_SIZE(&sorted_subsystems));
837
838 AST_VECTOR_CALLBACK_VOID(&sorted_subsystems, ast_free);
839 AST_VECTOR_FREE(&sorted_subsystems);
840
841 return CLI_SUCCESS;
842}
#define ast_free(a)
Definition: astmm.h:180
#define CLI_SHOWUSAGE
Definition: cli.h:45
#define CLI_SUCCESS
Definition: cli.h:44
void ast_cli(int fd, const char *fmt,...)
Definition: clicompat.c:6
@ CLI_INIT
Definition: cli.h:152
@ CLI_GENERATE
Definition: cli.h:153
#define CLI_FAILURE
Definition: cli.h:46
int args
This gets set in ast_cli_register()
Definition: cli.h:185
char * command
Definition: cli.h:186
const char * usage
Definition: cli.h:177
#define FMT_HEADERS_SUBSYSTEM
static void subsystem_copy(struct subsystem_alert *alert, struct subsystem_alert_vector *vector)
#define FMT_FIELDS_SUBSYSTEM
static struct test_val a
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:609
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition: vector.h:174
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113
#define AST_VECTOR_CALLBACK_VOID(vec, callback,...)
Execute a callback on every element in a vector disregarding callback return.
Definition: vector.h:862

References a, subsystem_alert::alert_count, ast_cli_entry::args, ast_cli(), ast_free, AST_VECTOR_CALLBACK_VOID, AST_VECTOR_FREE, AST_VECTOR_GET, AST_VECTOR_INIT, AST_VECTOR_RW_RDLOCK, AST_VECTOR_RW_UNLOCK, AST_VECTOR_SIZE, CLI_FAILURE, CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, FMT_FIELDS_SUBSYSTEM, FMT_HEADERS_SUBSYSTEM, NULL, subsystem_alert::subsystem, subsystem_copy(), and ast_cli_entry::usage.

◆ cli_tps_ping()

static char * cli_tps_ping ( struct ast_cli_entry e,
int  cmd,
struct ast_cli_args a 
)
static

Definition at line 478 of file taskprocessor.c.

479{
480 struct timeval begin, end, delta;
481 const char *name;
482 struct timeval when;
483 struct timespec ts;
484 struct ast_taskprocessor *tps;
485
486 switch (cmd) {
487 case CLI_INIT:
488 e->command = "core ping taskprocessor";
489 e->usage =
490 "Usage: core ping taskprocessor <taskprocessor>\n"
491 " Displays the time required for a task to be processed\n";
492 return NULL;
493 case CLI_GENERATE:
494 if (a->pos == 3) {
496 } else {
497 return NULL;
498 }
499 }
500
501 if (a->argc != 4)
502 return CLI_SHOWUSAGE;
503
504 name = a->argv[3];
506 ast_cli(a->fd, "\nping failed: %s not found\n\n", name);
507 return CLI_SUCCESS;
508 }
509 ast_cli(a->fd, "\npinging %s ...", name);
510
511 /*
512 * Wait up to 5 seconds for a ping reply.
513 *
514 * On a very busy system it could take awhile to get a
515 * ping response from some taskprocessors.
516 */
517 begin = ast_tvnow();
518 when = ast_tvadd(begin, ast_samp2tv(5000, 1000));
519 ts.tv_sec = when.tv_sec;
520 ts.tv_nsec = when.tv_usec * 1000;
521
522 ast_mutex_lock(&cli_ping_cond_lock);
523 if (ast_taskprocessor_push(tps, tps_ping_handler, 0) < 0) {
524 ast_mutex_unlock(&cli_ping_cond_lock);
525 ast_cli(a->fd, "\nping failed: could not push task to %s\n\n", name);
527 return CLI_FAILURE;
528 }
529 ast_cond_timedwait(&cli_ping_cond, &cli_ping_cond_lock, &ts);
530 ast_mutex_unlock(&cli_ping_cond_lock);
531
532 end = ast_tvnow();
533 delta = ast_tvsub(end, begin);
534 ast_cli(a->fd, "\n\t%24s ping time: %.1ld.%.6ld sec\n\n", name, (long)delta.tv_sec, (long int)delta.tv_usec);
536 return CLI_SUCCESS;
537}
char * end
Definition: eagi_proxy.c:73
#define ast_cond_timedwait(cond, mutex, time)
Definition: lock.h:206
#define ast_mutex_unlock(a)
Definition: lock.h:190
#define ast_mutex_lock(a)
Definition: lock.h:189
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.
static int tps_ping_handler(void *datap)
CLI taskprocessor ping <blah> handler function.
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap)
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
static char * tps_taskprocessor_tab_complete(struct ast_cli_args *a)
struct timeval ast_samp2tv(unsigned int _nsamp, unsigned int _rate)
Returns a timeval corresponding to the duration of n samples at rate r. Useful to convert samples to ...
Definition: time.h:282
struct timeval ast_tvadd(struct timeval a, struct timeval b)
Returns the sum of two timevals a + b.
Definition: extconf.c:2282
struct timeval ast_tvsub(struct timeval a, struct timeval b)
Returns the difference of two timevals a - b.
Definition: extconf.c:2297
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:159

References a, ast_cli(), ast_cond_timedwait, ast_mutex_lock, ast_mutex_unlock, ast_samp2tv(), ast_taskprocessor_get(), ast_taskprocessor_push(), ast_taskprocessor_unreference(), ast_tvadd(), ast_tvnow(), ast_tvsub(), CLI_FAILURE, CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, end, name, NULL, tps_ping_handler(), TPS_REF_IF_EXISTS, tps_taskprocessor_tab_complete(), and ast_cli_entry::usage.

◆ cli_tps_report()

static char * cli_tps_report ( struct ast_cli_entry e,
int  cmd,
struct ast_cli_args a 
)
static

Definition at line 643 of file taskprocessor.c.

644{
645 const char *like;
646
647 switch (cmd) {
648 case CLI_INIT:
649 e->command = "core show taskprocessors [like]";
650 e->usage =
651 "Usage: core show taskprocessors [like keyword]\n"
652 " Shows a list of instantiated task processors and their statistics\n";
653 return NULL;
654 case CLI_GENERATE:
655 if (a->pos == e->args) {
657 } else {
658 return NULL;
659 }
660 }
661
662 if (a->argc == e->args - 1) {
663 like = "";
664 } else if (a->argc == e->args + 1 && !strcasecmp(a->argv[e->args-1], "like")) {
665 like = a->argv[e->args];
666 } else {
667 return CLI_SHOWUSAGE;
668 }
669
670 ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth", "Low water", "High water");
671 ast_cli(a->fd, "\n%d taskprocessors\n\n", tps_report_taskprocessor_list(a->fd, like));
672
673 return CLI_SUCCESS;
674}
#define FMT_HEADERS
static int tps_report_taskprocessor_list(int fd, const char *like)

References a, ast_cli_entry::args, ast_cli(), CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, FMT_HEADERS, NULL, tps_report_taskprocessor_list(), tps_taskprocessor_tab_complete(), and ast_cli_entry::usage.

◆ cli_tps_reset_stats()

static char * cli_tps_reset_stats ( struct ast_cli_entry e,
int  cmd,
struct ast_cli_args a 
)
static

Definition at line 1394 of file taskprocessor.c.

1395{
1396 const char *name;
1397 struct ast_taskprocessor *tps;
1398
1399 switch (cmd) {
1400 case CLI_INIT:
1401 e->command = "core reset taskprocessor";
1402 e->usage =
1403 "Usage: core reset taskprocessor <taskprocessor>\n"
1404 " Resets stats for the specified taskprocessor\n";
1405 return NULL;
1406 case CLI_GENERATE:
1408 }
1409
1410 if (a->argc != 4) {
1411 return CLI_SHOWUSAGE;
1412 }
1413
1414 name = a->argv[3];
1416 ast_cli(a->fd, "\nReset failed: %s not found\n\n", name);
1417 return CLI_SUCCESS;
1418 }
1419 ast_cli(a->fd, "\nResetting %s\n\n", name);
1420
1421 tps_reset_stats(tps);
1422
1424
1425 return CLI_SUCCESS;
1426}
static void tps_reset_stats(struct ast_taskprocessor *tps)

References a, ast_cli(), ast_taskprocessor_get(), ast_taskprocessor_unreference(), CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, name, NULL, TPS_REF_IF_EXISTS, tps_reset_stats(), tps_taskprocessor_tab_complete(), and ast_cli_entry::usage.

◆ cli_tps_reset_stats_all()

static char * cli_tps_reset_stats_all ( struct ast_cli_entry e,
int  cmd,
struct ast_cli_args a 
)
static

Definition at line 1428 of file taskprocessor.c.

1429{
1430 struct ast_taskprocessor *tps;
1431 struct ao2_iterator iter;
1432
1433 switch (cmd) {
1434 case CLI_INIT:
1435 e->command = "core reset taskprocessors";
1436 e->usage =
1437 "Usage: core reset taskprocessors\n"
1438 " Resets stats for all taskprocessors\n";
1439 return NULL;
1440 case CLI_GENERATE:
1441 return NULL;
1442 }
1443
1444 if (a->argc != e->args) {
1445 return CLI_SHOWUSAGE;
1446 }
1447
1448 ast_cli(a->fd, "\nResetting stats for all taskprocessors\n\n");
1449
1450 iter = ao2_iterator_init(tps_singletons, 0);
1451 while ((tps = ao2_iterator_next(&iter))) {
1452 tps_reset_stats(tps);
1454 }
1455 ao2_iterator_destroy(&iter);
1456
1457 return CLI_SUCCESS;
1458}
#define ao2_iterator_next(iter)
Definition: astobj2.h:1911
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
Definition: astobj2.h:1821

References a, ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, ast_cli_entry::args, ast_cli(), ast_taskprocessor_unreference(), CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, NULL, tps_reset_stats(), and ast_cli_entry::usage.

◆ default_listener_die()

static int default_listener_die ( void *  data)
static

Definition at line 246 of file taskprocessor.c.

247{
248 struct default_taskprocessor_listener_pvt *pvt = data;
249 pvt->dead = 1;
250 return 0;
251}

References default_taskprocessor_listener_pvt::dead.

Referenced by default_listener_shutdown().

◆ default_listener_pvt_alloc()

static void * default_listener_pvt_alloc ( void  )
static

Definition at line 1020 of file taskprocessor.c.

1021{
1023
1024 pvt = ast_calloc(1, sizeof(*pvt));
1025 if (!pvt) {
1026 return NULL;
1027 }
1029 if (ast_sem_init(&pvt->sem, 0, 0) != 0) {
1030 ast_log(LOG_ERROR, "ast_sem_init(): %s\n", strerror(errno));
1031 ast_free(pvt);
1032 return NULL;
1033 }
1034 return pvt;
1035}
#define ast_calloc(num, len)
A wrapper for calloc()
Definition: astmm.h:202
int errno
int ast_sem_init(struct ast_sem *sem, int pshared, unsigned int value)
Initialize a semaphore.

References ast_calloc, ast_free, ast_log, AST_PTHREADT_NULL, ast_sem_init(), errno, LOG_ERROR, NULL, default_taskprocessor_listener_pvt::poll_thread, and default_taskprocessor_listener_pvt::sem.

Referenced by ast_taskprocessor_get().

◆ default_listener_pvt_destroy()

static void default_listener_pvt_destroy ( struct default_taskprocessor_listener_pvt pvt)
static

Definition at line 176 of file taskprocessor.c.

177{
178 ast_assert(pvt->dead);
179 ast_sem_destroy(&pvt->sem);
180 ast_free(pvt);
181}
int ast_sem_destroy(struct ast_sem *sem)
Destroy a semaphore.

References ast_assert, ast_free, ast_sem_destroy(), default_taskprocessor_listener_pvt::dead, and default_taskprocessor_listener_pvt::sem.

Referenced by ast_taskprocessor_get(), and default_listener_pvt_dtor().

◆ default_listener_pvt_dtor()

static void default_listener_pvt_dtor ( struct ast_taskprocessor_listener listener)
static

Definition at line 183 of file taskprocessor.c.

184{
185 struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
186
188
189 listener->user_data = NULL;
190}

References default_listener_pvt_destroy(), listener(), and NULL.

◆ default_listener_shutdown()

static void default_listener_shutdown ( struct ast_taskprocessor_listener listener)
static

Definition at line 253 of file taskprocessor.c.

254{
255 struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
256 int res;
257
258 /* Hold a reference during shutdown */
259 ao2_t_ref(listener->tps, +1, "tps-shutdown");
260
262 /* This will cause the thread to exit early without completing tasks already
263 * in the queue. This is probably the least bad option in this situation. */
265 }
266
268
269 if (pthread_equal(pthread_self(), pvt->poll_thread)) {
270 res = pthread_detach(pvt->poll_thread);
271 if (res != 0) {
272 ast_log(LOG_ERROR, "pthread_detach(): %s\n", strerror(errno));
273 }
274 } else {
275 res = pthread_join(pvt->poll_thread, NULL);
276 if (res != 0) {
277 ast_log(LOG_ERROR, "pthread_join(): %s\n", strerror(errno));
278 }
279 }
281}
#define ao2_t_ref(o, delta, tag)
Definition: astobj2.h:460
static int default_listener_die(void *data)

References ao2_t_ref, ast_assert, ast_log, AST_PTHREADT_NULL, ast_taskprocessor_push(), default_listener_die(), errno, listener(), LOG_ERROR, NULL, and default_taskprocessor_listener_pvt::poll_thread.

◆ default_listener_start()

static int default_listener_start ( struct ast_taskprocessor_listener listener)
static

Definition at line 225 of file taskprocessor.c.

226{
227 struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
228
230 return -1;
231 }
232
233 return 0;
234}
static void * default_tps_processing_function(void *data)
Function that processes tasks in the taskprocessor.
#define ast_pthread_create(a, b, c, d)
Definition: utils.h:584

References ast_pthread_create, default_tps_processing_function(), listener(), NULL, and default_taskprocessor_listener_pvt::poll_thread.

◆ default_task_pushed()

static void default_task_pushed ( struct ast_taskprocessor_listener listener,
int  was_empty 
)
static

Definition at line 236 of file taskprocessor.c.

237{
238 struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
239
240 if (ast_sem_post(&pvt->sem) != 0) {
241 ast_log(LOG_ERROR, "Failed to notify of enqueued task: %s\n",
242 strerror(errno));
243 }
244}
int ast_sem_post(struct ast_sem *sem)
Increments the semaphore, unblocking a waiter if necessary.

References ast_log, ast_sem_post(), errno, listener(), LOG_ERROR, and default_taskprocessor_listener_pvt::sem.

◆ default_tps_processing_function()

static void * default_tps_processing_function ( void *  data)
static

Function that processes tasks in the taskprocessor.

Definition at line 196 of file taskprocessor.c.

197{
199 struct ast_taskprocessor *tps = listener->tps;
200 struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
201 int sem_value;
202 int res;
203
204 while (!pvt->dead) {
205 res = ast_sem_wait(&pvt->sem);
206 if (res != 0 && errno != EINTR) {
207 ast_log(LOG_ERROR, "ast_sem_wait(): %s\n",
208 strerror(errno));
209 /* Just give up */
210 break;
211 }
213 }
214
215 /* No posting to a dead taskprocessor! */
216 res = ast_sem_getvalue(&pvt->sem, &sem_value);
217 ast_assert(res == 0 && sem_value == 0);
218
219 /* Free the shutdown reference (see default_listener_shutdown) */
220 ao2_t_ref(listener->tps, -1, "tps-shutdown");
221
222 return NULL;
223}
int ast_sem_getvalue(struct ast_sem *sem, int *sval)
Gets the current value of the semaphore.
int ast_sem_wait(struct ast_sem *sem)
Decrements the semaphore.
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
Pop a task off the taskprocessor and execute it.

References ao2_t_ref, ast_assert, ast_log, ast_sem_getvalue(), ast_sem_wait(), ast_taskprocessor_execute(), default_taskprocessor_listener_pvt::dead, errno, listener(), LOG_ERROR, NULL, and default_taskprocessor_listener_pvt::sem.

Referenced by default_listener_start().

◆ listener_shutdown()

static void listener_shutdown ( struct ast_taskprocessor_listener listener)
static

Definition at line 980 of file taskprocessor.c.

981{
982 listener->callbacks->shutdown(listener);
983 ao2_ref(listener->tps, -1);
984}

References ao2_ref, and listener().

Referenced by ast_taskprocessor_unreference().

◆ subsystem_alert_decrement()

static void subsystem_alert_decrement ( const char *  subsystem)
static

Definition at line 753 of file taskprocessor.c.

754{
755 struct subsystem_alert *alert;
756 int idx;
757
759 return;
760 }
761
762 AST_VECTOR_RW_WRLOCK(&overloaded_subsystems);
763 idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
764 if (idx < 0) {
766 "Can't decrement alert count for subsystem '%s' as it wasn't in alert\n", subsystem);
767 AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
768 return;
769 }
770 alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
771
772 alert->alert_count--;
773 if (alert->alert_count <= 0) {
774 AST_VECTOR_REMOVE(&overloaded_subsystems, idx, 0);
775 ast_free(alert);
776 }
777
778 AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
779}
#define AST_VECTOR_RW_WRLOCK(vec)
Obtain write lock on vector.
Definition: vector.h:887
#define AST_VECTOR_REMOVE(vec, idx, preserve_ordered)
Remove an element from a vector by index.
Definition: vector.h:412

References subsystem_alert::alert_count, ast_free, ast_log, ast_strlen_zero(), AST_VECTOR_GET, AST_VECTOR_GET_INDEX, AST_VECTOR_REMOVE, AST_VECTOR_RW_UNLOCK, AST_VECTOR_RW_WRLOCK, LOG_ERROR, subsystem_alert::subsystem, and subsystem_match().

Referenced by tps_alert_add().

◆ subsystem_alert_increment()

static void subsystem_alert_increment ( const char *  subsystem)
static

Definition at line 721 of file taskprocessor.c.

722{
723 struct subsystem_alert *alert;
724 int idx;
725
727 return;
728 }
729
730 AST_VECTOR_RW_WRLOCK(&overloaded_subsystems);
731 idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
732 if (idx >= 0) {
733 alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
734 alert->alert_count++;
735 AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
736 return;
737 }
738
739 alert = ast_malloc(sizeof(*alert) + strlen(subsystem) + 1);
740 if (!alert) {
741 AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
742 return;
743 }
744 alert->alert_count = 1;
745 strcpy(alert->subsystem, subsystem); /* Safe */
746
747 if (AST_VECTOR_APPEND(&overloaded_subsystems, alert)) {
748 ast_free(alert);
749 }
750 AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
751}
#define ast_malloc(len)
A wrapper for malloc()
Definition: astmm.h:191
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256

References subsystem_alert::alert_count, ast_free, ast_malloc, ast_strlen_zero(), AST_VECTOR_APPEND, AST_VECTOR_GET, AST_VECTOR_GET_INDEX, AST_VECTOR_RW_UNLOCK, AST_VECTOR_RW_WRLOCK, subsystem_alert::subsystem, and subsystem_match().

Referenced by tps_alert_add().

◆ subsystem_cmp()

static int subsystem_cmp ( struct subsystem_alert a,
struct subsystem_alert b 
)
static

Definition at line 699 of file taskprocessor.c.

700{
701 return strcmp(a->subsystem, b->subsystem);
702}
static struct test_val b

References a, and b.

Referenced by subsystem_copy().

◆ subsystem_copy()

static void subsystem_copy ( struct subsystem_alert alert,
struct subsystem_alert_vector *  vector 
)
static

Definition at line 781 of file taskprocessor.c.

783{
784 struct subsystem_alert *alert_copy;
785 alert_copy = ast_malloc(sizeof(*alert_copy) + strlen(alert->subsystem) + 1);
786 if (!alert_copy) {
787 return;
788 }
789 alert_copy->alert_count = alert->alert_count;
790 strcpy(alert_copy->subsystem, alert->subsystem); /* Safe */
791 if (AST_VECTOR_ADD_SORTED(vector, alert_copy, subsystem_cmp)) {
792 ast_free(alert_copy);
793 }
794}
static int subsystem_cmp(struct subsystem_alert *a, struct subsystem_alert *b)
#define AST_VECTOR_ADD_SORTED(vec, elem, cmp)
Add an element into a sorted vector.
Definition: vector.h:371

References subsystem_alert::alert_count, ast_free, ast_malloc, AST_VECTOR_ADD_SORTED, subsystem_alert::subsystem, and subsystem_cmp().

Referenced by cli_subsystem_alert_report().

◆ subsystem_match()

static int subsystem_match ( struct subsystem_alert alert,
const char *  subsystem 
)
static

Definition at line 694 of file taskprocessor.c.

695{
696 return !strcmp(alert->subsystem, subsystem);
697}

References ast_taskprocessor::subsystem, and subsystem_alert::subsystem.

Referenced by ast_taskprocessor_get_subsystem_alert(), subsystem_alert_decrement(), and subsystem_alert_increment().

◆ taskprocessor_listener_dtor()

static void taskprocessor_listener_dtor ( void *  obj)
static

Definition at line 986 of file taskprocessor.c.

987{
989
990 if (listener->callbacks->dtor) {
991 listener->callbacks->dtor(listener);
992 }
993}

References listener().

Referenced by ast_taskprocessor_listener_alloc().

◆ taskprocessor_push()

static int taskprocessor_push ( struct ast_taskprocessor *  tps,
struct tps_task *  t 
)
static

Definition at line 1204 of file taskprocessor.c.

1205{
1206 int previous_size;
1207 int was_empty;
1208
1209 if (!tps) {
1210 ast_log(LOG_ERROR, "tps is NULL!\n");
1211 return -1;
1212 }
1213
1214 if (!t) {
1215 ast_log(LOG_ERROR, "t is NULL!\n");
1216 return -1;
1217 }
1218
1219 ao2_lock(tps);
1220 AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
1221 previous_size = tps->tps_queue_size++;
1222
1223 if (tps->tps_queue_high <= tps->tps_queue_size) {
1224 if (!tps->high_water_alert) {
1225 ast_log(LOG_WARNING, "The '%s' task processor queue reached %ld scheduled tasks%s.\n",
1226 tps->name, tps->tps_queue_size, tps->high_water_warned ? " again" : "");
1227 tps->high_water_warned = 1;
1228 tps->high_water_alert = 1;
1229 tps_alert_add(tps, +1);
1230 }
1231 }
1232
1233 /* The currently executing task counts as still in queue */
1234 was_empty = tps->executing ? 0 : previous_size == 0;
1235 ao2_unlock(tps);
1236 tps->listener->callbacks->task_pushed(tps->listener, was_empty);
1237 return 0;
1238}
#define AST_LIST_INSERT_TAIL(head, elm, field)
Appends a list entry to the tail of a list.
Definition: linkedlists.h:731
void(* task_pushed)(struct ast_taskprocessor_listener *listener, int was_empty)
Indicates a task was pushed to the processor.
Definition: taskprocessor.h:99
struct ast_taskprocessor::tps_queue tps_queue
unsigned int high_water_warned
Definition: taskprocessor.c:87

References ao2_lock, ao2_unlock, AST_LIST_INSERT_TAIL, ast_log, ast_taskprocessor_listener::callbacks, ast_taskprocessor::executing, ast_taskprocessor::high_water_alert, ast_taskprocessor::high_water_warned, ast_taskprocessor::listener, LOG_ERROR, LOG_WARNING, ast_taskprocessor::name, ast_taskprocessor_listener_callbacks::task_pushed, tps_alert_add(), ast_taskprocessor::tps_queue, ast_taskprocessor::tps_queue_high, and ast_taskprocessor::tps_queue_size.

Referenced by ast_taskprocessor_push(), and ast_taskprocessor_push_local().

◆ tps_alert_add()

static void tps_alert_add ( struct ast_taskprocessor *  tps,
int  delta 
)
static

Definition at line 859 of file taskprocessor.c.

860{
861 unsigned int old;
862
864 old = tps_alert_count;
865 tps_alert_count += delta;
866 if (DEBUG_ATLEAST(3)
867 /* and tps_alert_count becomes zero or non-zero */
868 && !old != !tps_alert_count) {
869 ast_log(LOG_DEBUG, "Taskprocessor '%s' %s the high water alert.\n",
870 tps->name, tps_alert_count ? "triggered" : "cleared");
871 }
872
873 if (tps->subsystem[0] != '\0') {
874 if (delta > 0) {
876 } else {
878 }
879 }
880
882}
#define DEBUG_ATLEAST(level)
#define LOG_DEBUG
#define ast_rwlock_wrlock(a)
Definition: lock.h:236
static void subsystem_alert_decrement(const char *subsystem)
static void subsystem_alert_increment(const char *subsystem)

References ast_log, ast_rwlock_unlock, ast_rwlock_wrlock, DEBUG_ATLEAST, LOG_DEBUG, ast_taskprocessor::name, ast_taskprocessor::subsystem, subsystem_alert_decrement(), subsystem_alert_increment(), tps_alert_count, and tps_alert_lock.

Referenced by ast_taskprocessor_alert_set_levels(), taskprocessor_push(), tps_taskprocessor_dtor(), and tps_taskprocessor_pop().

◆ tps_cmp_cb()

static int tps_cmp_cb ( void *  obj,
void *  arg,
int  flags 
)
static

The astobj2 compare callback for taskprocessors.

Definition at line 686 of file taskprocessor.c.

687{
688 struct ast_taskprocessor *lhs = obj, *rhs = arg;
689 const char *rhsname = flags & OBJ_KEY ? arg : rhs->name;
690
691 return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0;
692}
@ CMP_MATCH
Definition: astobj2.h:1027
@ CMP_STOP
Definition: astobj2.h:1028

References CMP_MATCH, CMP_STOP, ast_taskprocessor::name, and OBJ_KEY.

Referenced by ast_tps_init().

◆ tps_hash_cb()

static int tps_hash_cb ( const void *  obj,
const int  flags 
)
static

The astobj2 hash callback for taskprocessors.

Definition at line 677 of file taskprocessor.c.

678{
679 const struct ast_taskprocessor *tps = obj;
680 const char *name = flags & OBJ_KEY ? obj : tps->name;
681
682 return ast_str_case_hash(name);
683}
static force_inline int attribute_pure ast_str_case_hash(const char *str)
Compute a hash value on a case-insensitive string.
Definition: strings.h:1303

References ast_str_case_hash(), ast_taskprocessor::name, name, and OBJ_KEY.

Referenced by ast_tps_init().

◆ tps_ping_handler()

static int tps_ping_handler ( void *  datap)
static

CLI "taskprocessor ping <blah>" handler function.

Definition at line 469 of file taskprocessor.c.

470{
471 ast_mutex_lock(&cli_ping_cond_lock);
472 ast_cond_signal(&cli_ping_cond);
473 ast_mutex_unlock(&cli_ping_cond_lock);
474 return 0;
475}
#define ast_cond_signal(cond)
Definition: lock.h:203

References ast_cond_signal, ast_mutex_lock, and ast_mutex_unlock.

Referenced by cli_tps_ping().

◆ tps_report_taskprocessor_list()

static int tps_report_taskprocessor_list ( int  fd,
const char *  like 
)
static

Definition at line 606 of file taskprocessor.c.

607{
608 int tps_count = 0;
609 int word_len;
610 struct ao2_container *sorted_tps;
611 struct ast_taskprocessor *tps;
612 struct ao2_iterator iter;
613
615 NULL);
616 if (!sorted_tps
617 || ao2_container_dup(sorted_tps, tps_singletons, 0)) {
618 ast_debug(1, "Failed to retrieve sorted taskprocessors\n");
619 ao2_cleanup(sorted_tps);
620 return 0;
621 }
622
623 word_len = strlen(like);
624 iter = ao2_iterator_init(sorted_tps, AO2_ITERATOR_UNLINK);
625 while ((tps = ao2_iterator_next(&iter))) {
626 if (like) {
627 if (!strncasecmp(like, tps->name, word_len)) {
629 tps_count++;
630 }
631 } else {
633 tps_count++;
634 }
636 }
638 ao2_ref(sorted_tps, -1);
639
640 return tps_count;
641}
int ao2_container_dup(struct ao2_container *dest, struct ao2_container *src, enum search_flags flags)
Copy all object references in the src container into the dest container.
@ AO2_ALLOC_OPT_LOCK_NOLOCK
Definition: astobj2.h:367
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
@ AO2_ITERATOR_UNLINK
Definition: astobj2.h:1863
#define ao2_container_alloc_rbtree(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a red-black tree container.
Definition: astobj2.h:1349
#define ast_debug(level,...)
Log a DEBUG message.
Generic container type.
static int tps_sort_cb(const void *obj_left, const void *obj_right, int flags)
static void tps_report_taskprocessor_list_helper(int fd, struct ast_taskprocessor *tps)

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_cleanup, ao2_container_alloc_rbtree, ao2_container_dup(), ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, AO2_ITERATOR_UNLINK, ao2_ref, ast_debug, ast_taskprocessor_unreference(), ast_taskprocessor::name, NULL, tps_report_taskprocessor_list_helper(), and tps_sort_cb().

Referenced by cli_tps_report().

◆ tps_report_taskprocessor_list_helper()

static void tps_report_taskprocessor_list_helper ( int  fd,
struct ast_taskprocessor *  tps 
)
static

◆ tps_reset_stats()

static void tps_reset_stats ( struct ast_taskprocessor *  tps)
static

◆ tps_shutdown()

static void tps_shutdown ( void  )
static

Definition at line 297 of file taskprocessor.c.

298{
299 int objcount;
300 int tries;
301 struct ao2_container *sorted_tps;
302 struct ast_taskprocessor *tps;
303 struct ao2_iterator iter;
304 struct timespec delay = {1, 0};
305
306 /* During shutdown there may still be taskprocessor threads running and those
307 * tasprocessors reference tps_singletons. When those taskprocessors finish
308 * they will call ast_taskprocessor_unreference, creating a race condition which
309 * can result in tps_singletons being referenced after being deleted. To try and
310 * avoid this we check the container count and if greater than zero, give the
311 * running taskprocessors a chance to finish */
312 objcount = ao2_container_count(tps_singletons);
313 if (objcount > 0) {
315 "waiting for taskprocessor shutdown, %d tps object(s) still allocated.\n",
316 objcount);
317
318 /* give the running taskprocessors a chance to finish, up to
319 * AST_TASKPROCESSOR_SHUTDOWN_MAX_WAIT seconds */
320 for (tries = 0; tries < AST_TASKPROCESSOR_SHUTDOWN_MAX_WAIT; tries++) {
321 while (nanosleep(&delay, &delay));
322 objcount = ao2_container_count(tps_singletons);
323 /* if count is 0, we are done waiting */
324 if (objcount == 0) {
325 break;
326 }
327 delay.tv_sec = 1;
328 delay.tv_nsec = 0;
330 "waiting for taskprocessor shutdown, %d tps object(s) still allocated.\n",
331 objcount);
332 }
333 }
334
335 /* rather than try forever, risk an assertion on shutdown. This probably indicates
336 * a taskprocessor was not cleaned up somewhere */
337 if (objcount > 0) {
339 "Asertion may occur, the following taskprocessors are still runing:\n");
340
342 NULL);
343 if (!sorted_tps || ao2_container_dup(sorted_tps, tps_singletons, 0)) {
344 ast_log(LOG_ERROR, "unable to get sorted list of taskprocessors");
345 }
346 else {
347 iter = ao2_iterator_init(sorted_tps, AO2_ITERATOR_UNLINK);
348 while ((tps = ao2_iterator_next(&iter))) {
349 ast_log(LOG_ERROR, "taskprocessor '%s'\n", tps->name);
350 }
351 }
352
353 ao2_cleanup(sorted_tps);
354 }
355 else {
357 "All waiting taskprocessors cleared!\n");
358 }
359
361 AST_VECTOR_CALLBACK_VOID(&overloaded_subsystems, ast_free);
362 AST_VECTOR_RW_FREE(&overloaded_subsystems);
363 ao2_t_ref(tps_singletons, -1, "Unref tps_singletons in shutdown");
364 tps_singletons = NULL;
365}
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
int ast_cli_unregister_multiple(struct ast_cli_entry *e, int len)
Unregister multiple commands.
Definition: clicompat.c:30
#define AST_TASKPROCESSOR_SHUTDOWN_MAX_WAIT
How many seconds to wait for running taskprocessors to finish on shutdown.
#define AST_VECTOR_RW_FREE(vec)
Deallocates this locked vector.
Definition: vector.h:202

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_cleanup, ao2_container_alloc_rbtree, ao2_container_count(), ao2_container_dup(), ao2_iterator_init(), ao2_iterator_next, AO2_ITERATOR_UNLINK, ao2_t_ref, ARRAY_LEN, ast_cli_unregister_multiple(), ast_free, ast_log, AST_TASKPROCESSOR_SHUTDOWN_MAX_WAIT, AST_VECTOR_CALLBACK_VOID, AST_VECTOR_RW_FREE, LOG_DEBUG, LOG_ERROR, ast_taskprocessor::name, NULL, taskprocessor_clis, and tps_sort_cb().

Referenced by ast_tps_init().

◆ tps_sort_cb()

static int tps_sort_cb ( const void *  obj_left,
const void *  obj_right,
int  flags 
)
static

Definition at line 555 of file taskprocessor.c.

556{
557 const struct ast_taskprocessor *tps_left = obj_left;
558 const struct ast_taskprocessor *tps_right = obj_right;
559 const char *right_key = obj_right;
560 int cmp;
561
562 switch (flags & OBJ_SEARCH_MASK) {
563 default:
565 right_key = tps_right->name;
566 /* Fall through */
567 case OBJ_SEARCH_KEY:
568 cmp = strcasecmp(tps_left->name, right_key);
569 break;
571 cmp = strncasecmp(tps_left->name, right_key, strlen(right_key));
572 break;
573 }
574 return cmp;
575}
@ OBJ_SEARCH_PARTIAL_KEY
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
Definition: astobj2.h:1116
@ OBJ_SEARCH_OBJECT
The arg parameter is an object of the same type.
Definition: astobj2.h:1087
@ OBJ_SEARCH_MASK
Search option field mask.
Definition: astobj2.h:1072
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1101

References ast_taskprocessor::name, OBJ_SEARCH_KEY, OBJ_SEARCH_MASK, OBJ_SEARCH_OBJECT, and OBJ_SEARCH_PARTIAL_KEY.

Referenced by tps_report_taskprocessor_list(), and tps_shutdown().

◆ tps_task_alloc()

static struct tps_task * tps_task_alloc ( int(*)(void *datap)  task_exe,
void *  datap 
)
static

Definition at line 393 of file taskprocessor.c.

394{
395 struct tps_task *t;
396 if (!task_exe) {
397 ast_log(LOG_ERROR, "task_exe is NULL!\n");
398 return NULL;
399 }
400
401 t = ast_calloc(1, sizeof(*t));
402 if (!t) {
403 ast_log(LOG_ERROR, "failed to allocate task!\n");
404 return NULL;
405 }
406
407 t->callback.execute = task_exe;
408 t->datap = datap;
409
410 return t;
411}

References ast_calloc, ast_log, tps_task::callback, tps_task::datap, tps_task::execute, LOG_ERROR, and NULL.

Referenced by ast_taskprocessor_push().

◆ tps_task_alloc_local()

static struct tps_task * tps_task_alloc_local ( int(*)(struct ast_taskprocessor_local *local)  task_exe,
void *  datap 
)
static

Definition at line 413 of file taskprocessor.c.

414{
415 struct tps_task *t;
416 if (!task_exe) {
417 ast_log(LOG_ERROR, "task_exe is NULL!\n");
418 return NULL;
419 }
420
421 t = ast_calloc(1, sizeof(*t));
422 if (!t) {
423 ast_log(LOG_ERROR, "failed to allocate task!\n");
424 return NULL;
425 }
426
427 t->callback.execute_local = task_exe;
428 t->datap = datap;
429 t->wants_local = 1;
430
431 return t;
432}

References ast_calloc, ast_log, tps_task::callback, tps_task::datap, tps_task::execute_local, LOG_ERROR, NULL, and tps_task::wants_local.

Referenced by ast_taskprocessor_push_local().

◆ tps_task_free()

static void * tps_task_free ( struct tps_task *  task)
static

Definition at line 435 of file taskprocessor.c.

436{
437 ast_free(task);
438 return NULL;
439}
static int task(void *data)
Queued task for baseline test.

References ast_free, NULL, and task().

Referenced by ast_taskprocessor_execute(), and tps_taskprocessor_dtor().

◆ tps_taskprocessor_dtor()

static void tps_taskprocessor_dtor ( void *  tps)
static

Definition at line 931 of file taskprocessor.c.

932{
933 struct ast_taskprocessor *t = tps;
934 struct tps_task *task;
935
936 while ((task = AST_LIST_REMOVE_HEAD(&t->tps_queue, list))) {
938 }
939 t->tps_queue_size = 0;
940
941 if (t->high_water_alert) {
942 t->high_water_alert = 0;
943 tps_alert_add(t, -1);
944 }
945
947 t->listener = NULL;
948}
#define AST_LIST_REMOVE_HEAD(head, field)
Removes and returns the head entry from a list.
Definition: linkedlists.h:833
struct tps_task::@407 list
AST_LIST_ENTRY overhead.

References ao2_cleanup, AST_LIST_REMOVE_HEAD, ast_taskprocessor::high_water_alert, tps_task::list, ast_taskprocessor::listener, NULL, task(), tps_alert_add(), ast_taskprocessor::tps_queue, ast_taskprocessor::tps_queue_size, and tps_task_free().

Referenced by __allocate_taskprocessor().

◆ tps_taskprocessor_pop()

static struct tps_task * tps_taskprocessor_pop ( struct ast_taskprocessor *  tps)
static

◆ tps_taskprocessor_tab_complete()

static char * tps_taskprocessor_tab_complete ( struct ast_cli_args *  a)
static

Definition at line 446 of file taskprocessor.c.

447{
448 int tklen;
449 struct ast_taskprocessor *p;
450 struct ao2_iterator i;
451
452 tklen = strlen(a->word);
453 i = ao2_iterator_init(tps_singletons, 0);
454 while ((p = ao2_iterator_next(&i))) {
455 if (!strncasecmp(a->word, p->name, tklen)) {
458 break;
459 }
460 }
462 }
464
465 return NULL;
466}
#define ast_strdup(str)
A wrapper for strdup()
Definition: astmm.h:241
int ast_cli_completion_add(char *value)
Add a result to a request for completion options.
Definition: main/cli.c:2768

References a, ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, ast_cli_completion_add(), ast_strdup, ast_taskprocessor_unreference(), ast_taskprocessor::name, and NULL.

Referenced by cli_tps_ping(), cli_tps_report(), and cli_tps_reset_stats().

Variable Documentation

◆ default_listener_callbacks

const struct ast_taskprocessor_listener_callbacks default_listener_callbacks
static

Definition at line 283 of file taskprocessor.c.

Referenced by ast_taskprocessor_get().

◆ taskprocessor_clis

struct ast_cli_entry taskprocessor_clis[]
static

Definition at line 162 of file taskprocessor.c.

Referenced by ast_tps_init(), and tps_shutdown().

◆ tps_alert_count

unsigned int tps_alert_count
static

Count of the number of taskprocessors in high water alert.

Definition at line 846 of file taskprocessor.c.

Referenced by ast_taskprocessor_alert_get(), and tps_alert_add().

◆ tps_alert_lock

ast_rwlock_t tps_alert_lock = { PTHREAD_RWLOCK_INITIALIZER , NULL, {1, 0} }
static

Access protection for tps_alert_count

Definition at line 849 of file taskprocessor.c.

Referenced by ast_taskprocessor_alert_get(), and tps_alert_add().