Asterisk - The Open Source Telephony Project GIT-master-20e40a9
Loading...
Searching...
No Matches
Data Structures | Macros | Functions | Variables
taskprocessor.c File Reference

Maintain a container of uniquely-named taskprocessor threads that can be shared across modules. More...

#include "asterisk.h"
#include "asterisk/_private.h"
#include "asterisk/module.h"
#include "asterisk/time.h"
#include "asterisk/astobj2.h"
#include "asterisk/cli.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/sem.h"
Include dependency graph for taskprocessor.c:

Go to the source code of this file.

Data Structures

struct  ast_taskprocessor
 An ast_taskprocessor structure is a singleton by name. More...
 
struct  ast_taskprocessor_listener
 A listener for taskprocessors. More...
 
struct  default_taskprocessor_listener_pvt
 
struct  subsystem_alert
 
struct  ast_taskprocessor::tps_queue
 Taskprocessor queue. More...
 
struct  tps_task
 tps_task structure is queued to a taskprocessor More...
 
struct  tps_taskprocessor_stats
 tps_taskprocessor_stats maintain statistics for a taskprocessor. More...
 

Macros

#define ast_taskprocessor_push_internal(tps, task_exe, datap)    __ast_taskprocessor_push(tps, task_exe, datap, __FILE__, __LINE__, __PRETTY_FUNCTION__)
 
#define ast_taskprocessor_push_local_internal(tps, task_exe, datap)    __ast_taskprocessor_push_local(tps, task_exe, datap, __FILE__, __LINE__, __PRETTY_FUNCTION__)
 
#define AST_TASKPROCESSOR_SHUTDOWN_MAX_WAIT   10
 How many seconds to wait for running taskprocessors to finish on shutdown.
 
#define FMT_FIELDS   "%-70s %10lu %10lu %10lu %10lu %10lu %10ld %10ld\n"
 
#define FMT_FIELDS_SUBSYSTEM   "%-32s %12u\n"
 
#define FMT_HEADERS   "%-70s %10s %10s %10s %10s %10s %10s %10s\n"
 
#define FMT_HEADERS_SUBSYSTEM   "%-32s %12s\n"
 
#define SEQ_STR_SIZE   (1 + 8 + 1) /* Dash plus 8 hex digits plus null terminator */
 
#define TPS_MAX_BUCKETS   1567
 

Functions

static struct ast_taskprocessor * __allocate_taskprocessor (const char *name, struct ast_taskprocessor_listener *listener)
 
int __ast_taskprocessor_push (struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap, const char *file, int line, const char *function)
 Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
 
int __ast_taskprocessor_push_local (struct ast_taskprocessor *tps, int(*task_exe)(struct ast_taskprocessor_local *datap), void *datap, const char *file, int line, const char *function)
 
static struct ast_taskprocessor * __start_taskprocessor (struct ast_taskprocessor *p)
 
unsigned int ast_taskprocessor_alert_get (void)
 Get the current taskprocessor high water alert count.
 
int ast_taskprocessor_alert_set_levels (struct ast_taskprocessor *tps, long low_water, long high_water)
 Set the high and low alert water marks of the given taskprocessor queue.
 
void ast_taskprocessor_build_name (char *buf, unsigned int size, const char *format,...)
 Build a taskprocessor name with a sequence number on the end.
 
struct ast_taskprocessor * ast_taskprocessor_create_with_listener (const char *name, struct ast_taskprocessor_listener *listener)
 Create a taskprocessor with a custom listener.
 
int ast_taskprocessor_execute (struct ast_taskprocessor *tps)
 Pop a task off the taskprocessor and execute it.
 
struct ast_taskprocessor * ast_taskprocessor_get (const char *name, enum ast_tps_options create)
 Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.
 
unsigned int ast_taskprocessor_get_subsystem_alert (const char *subsystem)
 Get the current taskprocessor high water alert count by subsystem.
 
int ast_taskprocessor_is_suspended (struct ast_taskprocessor *tps)
 Get the task processor suspend status.
 
int ast_taskprocessor_is_task (struct ast_taskprocessor *tps)
 Check whether the current thread is the given taskprocessor's task thread.
 
struct ast_taskprocessor_listener * ast_taskprocessor_listener (struct ast_taskprocessor *tps)
 Return the listener associated with the taskprocessor.
 
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc (const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
 Allocate a taskprocessor listener.
 
struct ast_taskprocessor * ast_taskprocessor_listener_get_tps (const struct ast_taskprocessor_listener *listener)
 Get a reference to the listener's taskprocessor.
 
void * ast_taskprocessor_listener_get_user_data (const struct ast_taskprocessor_listener *listener)
 Get the user data from the listener.
 
const char * ast_taskprocessor_name (struct ast_taskprocessor *tps)
 Return the name of the taskprocessor singleton.
 
void ast_taskprocessor_name_append (char *buf, unsigned int size, const char *name)
 Append the next sequence number to the given string, and copy into the buffer.
 
int ast_taskprocessor_push (struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap)
 
int ast_taskprocessor_push_local (struct ast_taskprocessor *tps, int(*task_exe)(struct ast_taskprocessor_local *datap), void *datap)
 
unsigned int ast_taskprocessor_seq_num (void)
 Get the next sequence number to create a human friendly taskprocessor name.
 
void ast_taskprocessor_set_local (struct ast_taskprocessor *tps, void *local_data)
 Sets the local data associated with a taskprocessor.
 
long ast_taskprocessor_size (struct ast_taskprocessor *tps)
 Return the current size of the taskprocessor queue.
 
int ast_taskprocessor_suspend (struct ast_taskprocessor *tps)
 Indicate the taskprocessor is suspended.
 
void * ast_taskprocessor_unreference (struct ast_taskprocessor *tps)
 Unreference the specified taskprocessor, decrementing its reference count.
 
int ast_taskprocessor_unsuspend (struct ast_taskprocessor *tps)
 Indicate the taskprocessor is unsuspended.
 
int ast_tps_init (void)
 
static AST_VECTOR_RW (subsystem_alert_vector, struct subsystem_alert *)
 
static char * cli_subsystem_alert_report (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static char * cli_tps_ping (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static char * cli_tps_report (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static char * cli_tps_reset_stats (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static char * cli_tps_reset_stats_all (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static char * cli_tps_show_taskprocessor (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static int default_listener_die (void *data)
 
static void * default_listener_pvt_alloc (void)
 
static void default_listener_pvt_destroy (struct default_taskprocessor_listener_pvt *pvt)
 
static void default_listener_pvt_dtor (struct ast_taskprocessor_listener *listener)
 
static void default_listener_shutdown (struct ast_taskprocessor_listener *listener)
 
static int default_listener_start (struct ast_taskprocessor_listener *listener)
 
static void default_task_pushed (struct ast_taskprocessor_listener *listener, int was_empty)
 
static void * default_tps_processing_function (void *data)
 Function that processes tasks in the taskprocessor.
 
static void listener_shutdown (struct ast_taskprocessor_listener *listener)
 
static void subsystem_alert_decrement (const char *subsystem)
 
static void subsystem_alert_increment (const char *subsystem)
 
static int subsystem_cmp (struct subsystem_alert *a, struct subsystem_alert *b)
 
static void subsystem_copy (struct subsystem_alert *alert, struct subsystem_alert_vector *vector)
 
static int subsystem_match (struct subsystem_alert *alert, const char *subsystem)
 
static void taskprocessor_listener_dtor (void *obj)
 
static int taskprocessor_push (struct ast_taskprocessor *tps, struct tps_task *t)
 
static void tps_alert_add (struct ast_taskprocessor *tps, int delta)
 
static int tps_cmp_cb (void *obj, void *arg, int flags)
 
static int tps_hash_cb (const void *obj, const int flags)
 
static int tps_ping_handler (void *datap)
 
static int tps_report_taskprocessor_list (int fd, const char *like)
 
static void tps_report_taskprocessor_list_helper (int fd, struct ast_taskprocessor *tps)
 
static void tps_reset_stats (struct ast_taskprocessor *tps)
 
static void tps_shutdown (void)
 
static int tps_sort_cb (const void *obj_left, const void *obj_right, int flags)
 
static struct tps_task * tps_task_alloc (int(*task_exe)(void *datap), void *datap, const char *file, int line, const char *function)
 
static struct tps_task * tps_task_alloc_local (int(*task_exe)(struct ast_taskprocessor_local *local), void *datap, const char *file, int line, const char *function)
 
static void * tps_task_free (struct tps_task *task)
 
static void tps_taskprocessor_dtor (void *tps)
 
static struct tps_task * tps_taskprocessor_pop (struct ast_taskprocessor *tps)
 
static char * tps_taskprocessor_tab_complete (struct ast_cli_args *a)
 

Variables

static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks
 
static unsigned int tps_alert_count
 
static ast_rwlock_t tps_alert_lock = AST_RWLOCK_INIT_VALUE
 

Detailed Description

Maintain a container of uniquely-named taskprocessor threads that can be shared across modules.

Author
Dwayne Hubbard dhubb.nosp@m.ard@.nosp@m.digiu.nosp@m.m.co.nosp@m.m

Definition in file taskprocessor.c.

Macro Definition Documentation

◆ ast_taskprocessor_push_internal

#define ast_taskprocessor_push_internal (   tps,
  task_exe,
  datap 
)     __ast_taskprocessor_push(tps, task_exe, datap, __FILE__, __LINE__, __PRETTY_FUNCTION__)

Definition at line 210 of file taskprocessor.c.

◆ ast_taskprocessor_push_local_internal

#define ast_taskprocessor_push_local_internal (   tps,
  task_exe,
  datap 
)     __ast_taskprocessor_push_local(tps, task_exe, datap, __FILE__, __LINE__, __PRETTY_FUNCTION__)

Definition at line 215 of file taskprocessor.c.

◆ AST_TASKPROCESSOR_SHUTDOWN_MAX_WAIT

#define AST_TASKPROCESSOR_SHUTDOWN_MAX_WAIT   10

How many seconds to wait for running taskprocessors to finish on shutdown.

Definition at line 318 of file taskprocessor.c.

◆ FMT_FIELDS

#define FMT_FIELDS   "%-70s %10lu %10lu %10lu %10lu %10lu %10ld %10ld\n"

Definition at line 615 of file taskprocessor.c.

◆ FMT_FIELDS_SUBSYSTEM

#define FMT_FIELDS_SUBSYSTEM   "%-32s %12u\n"

◆ FMT_HEADERS

#define FMT_HEADERS   "%-70s %10s %10s %10s %10s %10s %10s %10s\n"

Definition at line 614 of file taskprocessor.c.

◆ FMT_HEADERS_SUBSYSTEM

#define FMT_HEADERS_SUBSYSTEM   "%-32s %12s\n"

◆ SEQ_STR_SIZE

#define SEQ_STR_SIZE   (1 + 8 + 1) /* Dash plus 8 hex digits plus null terminator */

Definition at line 1515 of file taskprocessor.c.

◆ TPS_MAX_BUCKETS

#define TPS_MAX_BUCKETS   1567

Function Documentation

◆ __allocate_taskprocessor()

static struct ast_taskprocessor * __allocate_taskprocessor ( const char *  name,
struct ast_taskprocessor_listener *  listener 
)
static

Definition at line 1174 of file taskprocessor.c.

1175{
1176 struct ast_taskprocessor *p;
1177 char *subsystem_separator;
1178 size_t subsystem_length = 0;
1179 size_t name_length;
1180
1181 name_length = strlen(name);
1182 subsystem_separator = strchr(name, '/');
1183 if (subsystem_separator) {
1184 subsystem_length = subsystem_separator - name;
1185 }
1186
1187 p = ao2_alloc(sizeof(*p) + name_length + subsystem_length + 2, tps_taskprocessor_dtor);
1188 if (!p) {
1189 ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
1190 return NULL;
1191 }
1192
1193 /* Set default congestion water level alert triggers. */
1196
1197 strcpy(p->name, name); /* Safe */
1198 p->subsystem = p->name + name_length + 1;
1199 ast_copy_string(p->subsystem, name, subsystem_length + 1);
1200
1201 ao2_ref(listener, +1);
1202 p->listener = listener;
1203
1205
1206 ao2_ref(p, +1);
1207 listener->tps = p;
1208
1209 if (!(ao2_link_flags(tps_singletons, p, OBJ_NOLOCK))) {
1210 ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
1211 listener->tps = NULL;
1212 ao2_ref(p, -2);
1213 return NULL;
1214 }
1215
1216 return p;
1217}
static void * listener(void *unused)
Definition asterisk.c:1530
#define ast_log
Definition astobj2.c:42
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
Definition astobj2.h:1554
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition astobj2.h:459
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
Definition astobj2.h:1063
#define ao2_alloc(data_size, destructor_fn)
Definition astobj2.h:409
static const char name[]
Definition format_mp3.c:68
#define LOG_ERROR
#define LOG_WARNING
#define AST_PTHREADT_NULL
Definition lock.h:73
#define NULL
Definition resample.c:96
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
Definition strings.h:425
An ast_taskprocessor structure is a singleton by name.
long tps_queue_low
Taskprocessor low water clear alert level.
long tps_queue_high
Taskprocessor high water alert trigger level.
struct ast_taskprocessor_listener * listener
char * subsystem
Anything before the first '/' in the name (if there is one)
char name[0]
Friendly name of the taskprocessor. Subsystem is appended after the name's NUL terminator.
static void tps_taskprocessor_dtor(void *tps)
#define AST_TASKPROCESSOR_HIGH_WATER_LEVEL

References ao2_alloc, ao2_link_flags, ao2_ref, ast_copy_string(), ast_log, AST_PTHREADT_NULL, AST_TASKPROCESSOR_HIGH_WATER_LEVEL, listener(), ast_taskprocessor::listener, LOG_ERROR, LOG_WARNING, ast_taskprocessor::name, name, NULL, OBJ_NOLOCK, ast_taskprocessor::subsystem, ast_taskprocessor::thread, ast_taskprocessor::tps_queue_high, ast_taskprocessor::tps_queue_low, and tps_taskprocessor_dtor().

Referenced by ast_taskprocessor_create_with_listener(), and ast_taskprocessor_get().

◆ __ast_taskprocessor_push()

int __ast_taskprocessor_push ( struct ast_taskprocessor *  tps,
int(*)(void *datap)  task_exe,
void *  datap,
const char *  file,
int  line,
const char *  function 
)

Push a task into the specified taskprocessor queue and signal the taskprocessor thread.

Parameters
tps: The taskprocessor structure
task_exe: The task handling function to push into the taskprocessor queue
datap: The data to be used by the task handling function
Return values
0: success
-1: failure
Since
1.6.1

Definition at line 1371 of file taskprocessor.c.

1373{
1374 return taskprocessor_push(tps, tps_task_alloc(task_exe, datap, file, line, function));
1375}
static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
static struct tps_task * tps_task_alloc(int(*task_exe)(void *datap), void *datap, const char *file, int line, const char *function)

References taskprocessor_push(), and tps_task_alloc().

Referenced by __ast_sip_push_task(), __ast_taskpool_push(), __ast_threadpool_push(), and ast_taskprocessor_push().

◆ __ast_taskprocessor_push_local()

int __ast_taskprocessor_push_local ( struct ast_taskprocessor *  tps,
int(*)(struct ast_taskprocessor_local *datap)  task_exe,
void *  datap,
const char *  file,
int  line,
const char *  function 
)

Definition at line 1382 of file taskprocessor.c.

1384{
1385 return taskprocessor_push(tps, tps_task_alloc_local(task_exe, datap, file, line, function));
1386}
static struct tps_task * tps_task_alloc_local(int(*task_exe)(struct ast_taskprocessor_local *local), void *datap, const char *file, int line, const char *function)

References taskprocessor_push(), and tps_task_alloc_local().

Referenced by ast_taskprocessor_push_local().

◆ __start_taskprocessor()

static struct ast_taskprocessor * __start_taskprocessor ( struct ast_taskprocessor *  p)
static

Definition at line 1219 of file taskprocessor.c.

1220{
1221 if (p && p->listener->callbacks->start(p->listener)) {
1222 ast_log(LOG_ERROR, "Failed to start taskprocessor listener for '%s'\n",
1223 p->name);
1225
1226 return NULL;
1227 }
1228
1229 return p;
1230}
int(* start)(struct ast_taskprocessor_listener *listener)
The taskprocessor has started completely.
const struct ast_taskprocessor_listener_callbacks * callbacks
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor, decrementing its reference count.

References ast_log, ast_taskprocessor_unreference(), ast_taskprocessor_listener::callbacks, ast_taskprocessor::listener, LOG_ERROR, ast_taskprocessor::name, NULL, and ast_taskprocessor_listener_callbacks::start.

Referenced by ast_taskprocessor_create_with_listener(), and ast_taskprocessor_get().

◆ ast_taskprocessor_alert_get()

unsigned int ast_taskprocessor_alert_get ( void  )

Get the current taskprocessor high water alert count.

Since
13.10.0
Return values
0: if no taskprocessors are in high water alert.
non-zero: if some task processors are in high water alert.

Definition at line 1004 of file taskprocessor.c.

1005{
1006 unsigned int count;
1007
1009 count = tps_alert_count;
1011
1012 return count;
1013}
#define ast_rwlock_rdlock(a)
Definition lock.h:242
#define ast_rwlock_unlock(a)
Definition lock.h:241
static unsigned int tps_alert_count
static ast_rwlock_t tps_alert_lock

References ast_rwlock_rdlock, ast_rwlock_unlock, tps_alert_count, and tps_alert_lock.

Referenced by AST_TEST_DEFINE(), and distributor().

◆ ast_taskprocessor_alert_set_levels()

int ast_taskprocessor_alert_set_levels ( struct ast_taskprocessor *  tps,
long  low_water,
long  high_water 
)

Set the high and low alert water marks of the given taskprocessor queue.

Since
13.10.0
Parameters
tps: Taskprocessor to update queue water marks.
low_water: New queue low water mark. (-1 to set as 90% of high_water)
high_water: New queue high water mark.
Return values
0: on success.
-1: on error (water marks not changed).

Definition at line 1015 of file taskprocessor.c.

1016{
1017 if (!tps || high_water < 0 || high_water < low_water) {
1018 return -1;
1019 }
1020
1021 if (low_water < 0) {
1022 /* Set low water level to 90% of high water level */
1023 low_water = (high_water * 9) / 10;
1024 }
1025
1026 ao2_lock(tps);
1027
1028 tps->tps_queue_low = low_water;
1029 tps->tps_queue_high = high_water;
1030
1031 if (tps->high_water_alert) {
1032 if (!tps->tps_queue_size || tps->tps_queue_size < low_water) {
1033 /* Update water mark alert immediately */
1034 tps->high_water_alert = 0;
1035 tps_alert_add(tps, -1);
1036 }
1037 } else {
1038 if (high_water < tps->tps_queue_size) {
1039 /* Update water mark alert immediately */
1040 tps->high_water_alert = 1;
1041 tps_alert_add(tps, +1);
1042 }
1043 }
1044
1045 ao2_unlock(tps);
1046
1047 return 0;
1048}
#define ao2_unlock(a)
Definition astobj2.h:729
#define ao2_lock(a)
Definition astobj2.h:717
unsigned int high_water_alert
long tps_queue_size
Taskprocessor current queue size.
static void tps_alert_add(struct ast_taskprocessor *tps, int delta)

References ao2_lock, ao2_unlock, ast_taskprocessor::high_water_alert, tps_alert_add(), ast_taskprocessor::tps_queue_high, ast_taskprocessor::tps_queue_low, and ast_taskprocessor::tps_queue_size.

Referenced by actual_load_config(), ast_res_pjsip_init_options_handling(), ast_serializer_pool_set_alerts(), ast_sorcery_object_set_congestion_levels(), AST_TEST_DEFINE(), and stasis_subscription_set_congestion_limits().

◆ ast_taskprocessor_build_name()

void ast_taskprocessor_build_name ( char *  buf,
unsigned int  size,
const char *  format,
  ... 
)

Build a taskprocessor name with a sequence number on the end.

Since
13.8.0
Parameters
buf: Where to put the built taskprocessor name.
size: How large is buf including null terminator.
format: printf format to create the non-sequenced part of the name.
Note
The user supplied part of the taskprocessor name is truncated to allow the full sequence number to be appended within the supplied buffer size.

Definition at line 1527 of file taskprocessor.c.

1528{
1529 va_list ap;
1530 int user_size;
1531
1532 ast_assert(buf != NULL);
1533 ast_assert(SEQ_STR_SIZE <= size);
1534
1535 va_start(ap, format);
1536 user_size = vsnprintf(buf, size - (SEQ_STR_SIZE - 1), format, ap);
1537 va_end(ap);
1538 if (user_size < 0) {
1539 /*
1540 * Wow! We got an output error to a memory buffer.
1541 * Assume no user part of name written.
1542 */
1543 user_size = 0;
1544 } else if (size < user_size + SEQ_STR_SIZE) {
1545 /* Truncate user part of name to make sequence number fit. */
1546 user_size = size - SEQ_STR_SIZE;
1547 }
1548
1549 /* Append sequence number to end of user name. */
1550 snprintf(buf + user_size, SEQ_STR_SIZE, "-%08x", ast_taskprocessor_seq_num());
1551}
char buf[BUFSIZE]
Definition eagi_proxy.c:66
unsigned int ast_taskprocessor_seq_num(void)
Get the next sequence number to create a human friendly taskprocessor name.
#define SEQ_STR_SIZE
#define ast_assert(a)
Definition utils.h:779

References ast_assert, ast_taskprocessor_seq_num(), buf, NULL, and SEQ_STR_SIZE.

Referenced by alloc_playback_chan(), allocate_subscription_tree(), ast_sip_session_alloc(), create_websocket_serializer(), distributor_pool_setup(), handle_cli_taskpool_push_serializer_efficiency(), handle_cli_threadpool_push_serializer_efficiency(), internal_stasis_subscribe(), refer_progress_alloc(), sip_options_aor_alloc(), sip_outbound_publisher_alloc(), sip_outbound_registration_state_alloc(), sorcery_object_type_alloc(), and taskpool_taskprocessor_alloc().

◆ ast_taskprocessor_create_with_listener()

struct ast_taskprocessor * ast_taskprocessor_create_with_listener ( const char *  name,
struct ast_taskprocessor_listener *  listener 
)

Create a taskprocessor with a custom listener.

Since
12.0.0

Note that when a taskprocessor is created in this way, it does not create any threads to execute the tasks. This job is left up to the listener. The listener's start() callback will be called during this function.

Parameters
name: The name of the taskprocessor to create
listener: The listener for operations on this taskprocessor
Return values
NULL: Failure
non-NULL: success

Definition at line 1274 of file taskprocessor.c.

1275{
1276 struct ast_taskprocessor *p;
1277
1278 ao2_lock(tps_singletons);
1279 p = ao2_find(tps_singletons, name, OBJ_KEY | OBJ_NOLOCK);
1280 if (p) {
1281 ao2_unlock(tps_singletons);
1283 return NULL;
1284 }
1285
1287 ao2_unlock(tps_singletons);
1288
1289 return __start_taskprocessor(p);
1290}
#define OBJ_KEY
Definition astobj2.h:1151
#define ao2_find(container, arg, flags)
Definition astobj2.h:1736
static struct ast_taskprocessor * __allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener)
static struct ast_taskprocessor * __start_taskprocessor(struct ast_taskprocessor *p)

References __allocate_taskprocessor(), __start_taskprocessor(), ao2_find, ao2_lock, ao2_unlock, ast_taskprocessor_unreference(), listener(), name, NULL, OBJ_KEY, and OBJ_NOLOCK.

Referenced by ast_taskpool_serializer_group(), AST_TEST_DEFINE(), ast_threadpool_create(), and ast_threadpool_serializer_group().

◆ ast_taskprocessor_execute()

int ast_taskprocessor_execute ( struct ast_taskprocessor *  tps)

Pop a task off the taskprocessor and execute it.

Since
12.0.0
Parameters
tps: The taskprocessor from which to execute.
Return values
0: There is no further work to be done.
1: Tasks still remain in the taskprocessor queue.

Definition at line 1420 of file taskprocessor.c.

1421{
1422 struct ast_taskprocessor_local local;
1423 struct tps_task *t;
1424 long size;
1425 struct timeval start;
1426 long elapsed;
1427 const char *task_file = NULL;
1428 int task_line = 0;
1429 const char *task_function = NULL;
1430
1431 ao2_lock(tps);
1432 t = tps_taskprocessor_pop(tps);
1433 if (!t) {
1434 ao2_unlock(tps);
1435 return 0;
1436 }
1437
1438 tps->thread = pthread_self();
1439 tps->executing = 1;
1440
1441 if (t->wants_local) {
1442 local.local_data = tps->local_data;
1443 local.data = t->datap;
1444 }
1445
1446 /* Save task origin info before we free the task */
1447 task_file = t->file;
1448 task_line = t->line;
1449 task_function = t->function;
1450 ao2_unlock(tps);
1451
1452 start = ast_tvnow();
1453
1454 if (t->wants_local) {
1455 t->callback.execute_local(&local);
1456 } else {
1457 t->callback.execute(t->datap);
1458 }
1459 tps_task_free(t);
1460
1461 ao2_lock(tps);
1463 /* We need to check size in the same critical section where we reset the
1464 * executing bit. Avoids a race condition where a task is pushed right
1465 * after we pop an empty stack.
1466 */
1467 tps->executing = 0;
1468 size = ast_taskprocessor_size(tps);
1469
1470 /* Update the stats */
1472
1473 /* Include the task we just executed as part of the queue size. */
1474 if (size >= tps->stats.max_qsize) {
1475 tps->stats.max_qsize = size + 1;
1476 }
1477
1478 elapsed = ast_tvdiff_us(ast_tvnow(), start);
1479 if (elapsed > tps->stats.highest_time_processed) {
1480 tps->stats.highest_time_processed = elapsed;
1481 tps->stats.highest_time_task_file = task_file;
1482 tps->stats.highest_time_task_line = task_line;
1483 tps->stats.highest_time_task_function = task_function;
1484 }
1485 if (elapsed < tps->stats.lowest_time_processed) {
1486 tps->stats.lowest_time_processed = elapsed;
1487 }
1488
1489 ao2_unlock(tps);
1490
1491 /* If we executed a task, check for the transition to empty */
1492 if (size == 0 && tps->listener->callbacks->emptied) {
1493 tps->listener->callbacks->emptied(tps->listener);
1494 }
1495 return size > 0;
1496}
void(* emptied)(struct ast_taskprocessor_listener *listener)
Indicates the task processor has become empty.
Local data parameter.
unsigned int executing
struct tps_taskprocessor_stats stats
Taskprocessor statistics.
tps_task structure is queued to a taskprocessor
int(* execute)(void *datap)
int(* execute_local)(struct ast_taskprocessor_local *local)
unsigned int wants_local
union tps_task::@440 callback
The execute() task callback function pointer.
void * datap
The data pointer for the task execute() function.
const char * file
Debug information about where the task was pushed from.
const char * function
long highest_time_processed
Highest time (in microseconds) spent processing a task.
long lowest_time_processed
Lowest time (in microseconds) spent processing a task.
unsigned long max_qsize
This is the maximum number of tasks queued at any one time.
int highest_time_task_line
Line where the highest time task was pushed from.
unsigned long _tasks_processed_count
This is the current number of tasks processed.
const char * highest_time_task_function
Function where the highest time task was pushed from.
const char * highest_time_task_file
File where the highest time task was pushed from.
static void * tps_task_free(struct tps_task *task)
static struct tps_task * tps_taskprocessor_pop(struct ast_taskprocessor *tps)
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
int64_t ast_tvdiff_us(struct timeval end, struct timeval start)
Computes the difference (in microseconds) between two struct timeval instances.
Definition time.h:87
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition time.h:159

References tps_taskprocessor_stats::_tasks_processed_count, ao2_lock, ao2_unlock, AST_PTHREADT_NULL, ast_taskprocessor_size(), ast_tvdiff_us(), ast_tvnow(), tps_task::callback, ast_taskprocessor_listener::callbacks, ast_taskprocessor_local::data, tps_task::datap, ast_taskprocessor_listener_callbacks::emptied, tps_task::execute, tps_task::execute_local, ast_taskprocessor::executing, tps_task::file, tps_task::function, tps_taskprocessor_stats::highest_time_processed, tps_taskprocessor_stats::highest_time_task_file, tps_taskprocessor_stats::highest_time_task_function, tps_taskprocessor_stats::highest_time_task_line, tps_task::line, ast_taskprocessor::listener, ast_taskprocessor_local::local_data, ast_taskprocessor::local_data, tps_taskprocessor_stats::lowest_time_processed, tps_taskprocessor_stats::max_qsize, NULL, ast_taskprocessor::stats, ast_taskprocessor::thread, tps_task_free(), tps_taskprocessor_pop(), and tps_task::wants_local.

Referenced by ast_taskpool_serializer_push_wait(), AST_TEST_DEFINE(), default_tps_processing_function(), execute_tasks(), execute_tasks(), and threadpool_execute().

◆ ast_taskprocessor_get()

struct ast_taskprocessor * ast_taskprocessor_get ( const char *  name,
enum ast_tps_options  create 
)

Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.

The default behavior of instantiating a taskprocessor if one does not already exist can be disabled by specifying the TPS_REF_IF_EXISTS ast_tps_options as the second argument to ast_taskprocessor_get().

Parameters
name: The name of the taskprocessor
create: Use 0 by default, or specify TPS_REF_IF_EXISTS to return NULL if the taskprocessor does not already exist.
Returns
A pointer to a reference-counted taskprocessor under normal conditions, or NULL if the TPS_REF_IF_EXISTS reference type is specified and the taskprocessor does not exist.
Since
1.6.1

Definition at line 1235 of file taskprocessor.c.

1236{
1237 struct ast_taskprocessor *p;
1240
1241 if (ast_strlen_zero(name)) {
1242 ast_log(LOG_ERROR, "Cannot get taskprocessor with empty name!\n");
1243 return NULL;
1244 }
1245 ao2_lock(tps_singletons);
1246 p = ao2_find(tps_singletons, name, OBJ_KEY | OBJ_NOLOCK);
1247 if (p || (create & TPS_REF_IF_EXISTS)) {
1248 /* calling function does not want a new taskprocessor to be created if it doesn't already exist */
1249 ao2_unlock(tps_singletons);
1250 return p;
1251 }
1252
1253 /* Create a new taskprocessor. Start by creating a default listener */
1255 if (!pvt) {
1256 ao2_unlock(tps_singletons);
1257 return NULL;
1258 }
1260 if (!listener) {
1261 ao2_unlock(tps_singletons);
1263 return NULL;
1264 }
1265
1267 ao2_unlock(tps_singletons);
1268 p = __start_taskprocessor(p);
1269 ao2_ref(listener, -1);
1270
1271 return p;
1272}
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition strings.h:65
A listener for taskprocessors.
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
static void * default_listener_pvt_alloc(void)
static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks
static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
@ TPS_REF_IF_EXISTS
return a reference to a taskprocessor ONLY if it already exists

References __allocate_taskprocessor(), __start_taskprocessor(), ao2_find, ao2_lock, ao2_ref, ao2_unlock, ast_log, ast_strlen_zero(), ast_taskprocessor_listener_alloc(), default_listener_callbacks, default_listener_pvt_alloc(), default_listener_pvt_destroy(), listener(), LOG_ERROR, name, NULL, OBJ_KEY, OBJ_NOLOCK, and TPS_REF_IF_EXISTS.

Referenced by alloc_playback_chan(), ast_dns_system_resolver_init(), ast_msg_init(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), cli_tps_ping(), cli_tps_reset_stats(), cli_tps_show_taskprocessor(), find_request_serializer(), internal_stasis_subscribe(), load_module(), load_module(), load_module(), load_objects(), taskpool_taskprocessor_alloc(), and threadpool_alloc().

◆ ast_taskprocessor_get_subsystem_alert()

unsigned int ast_taskprocessor_get_subsystem_alert ( const char *  subsystem)

Get the current taskprocessor high water alert count by subsystem.

Since
13.26.0
16.3.0
Parameters
subsystem: The subsystem name
Return values
0: if no taskprocessors are in high water alert.
non-zero: if some task processors are in high water alert.

Definition at line 824 of file taskprocessor.c.

825{
826 struct subsystem_alert *alert;
827 unsigned int count = 0;
828 int idx;
829
830 AST_VECTOR_RW_RDLOCK(&overloaded_subsystems);
831 idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
832 if (idx >= 0) {
833 alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
834 count = alert->alert_count;
835 }
836 AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
837
838 return count;
839}
unsigned int alert_count
static int subsystem_match(struct subsystem_alert *alert, const char *subsystem)
#define AST_VECTOR_GET_INDEX(vec, value, cmp)
Get the 1st index from a vector that matches the given comparison.
Definition vector.h:730
#define AST_VECTOR_RW_UNLOCK(vec)
Unlock vector.
Definition vector.h:908
#define AST_VECTOR_RW_RDLOCK(vec)
Obtain read lock on vector.
Definition vector.h:888
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition vector.h:691

References subsystem_alert::alert_count, AST_VECTOR_GET, AST_VECTOR_GET_INDEX, AST_VECTOR_RW_RDLOCK, AST_VECTOR_RW_UNLOCK, subsystem_alert::subsystem, and subsystem_match().

Referenced by AST_TEST_DEFINE(), and distributor().

◆ ast_taskprocessor_is_suspended()

int ast_taskprocessor_is_suspended ( struct ast_taskprocessor tps)

Get the task processor suspend status.

Since
13.12.0
Parameters
tps: Task processor.
Return values
non-zero: if the task processor is suspended (note: -1, which is also non-zero, is returned when tps is NULL)

Definition at line 1415 of file taskprocessor.c.

1416{
1417 return tps ? tps->suspended : -1;
1418}
unsigned int suspended

References ast_taskprocessor::suspended.

Referenced by ast_sip_session_suspend().

◆ ast_taskprocessor_is_task()

int ast_taskprocessor_is_task ( struct ast_taskprocessor tps)

Check whether the calling thread is the given taskprocessor's thread.

Since
12.7.0
Parameters
tps: Taskprocessor to check.
Return values
non-zero: if current thread is the taskprocessor thread.

Definition at line 1498 of file taskprocessor.c.

1499{
1500 int is_task;
1501
1502 ao2_lock(tps);
1503 is_task = pthread_equal(tps->thread, pthread_self());
1504 ao2_unlock(tps);
1505 return is_task;
1506}

References ao2_lock, ao2_unlock, and ast_taskprocessor::thread.

Referenced by __ast_sip_push_task_wait_serializer(), ast_sip_session_suspend(), and handle_new_invite_request().

◆ ast_taskprocessor_listener()

Return the listener associated with the taskprocessor.

Definition at line 1090 of file taskprocessor.c.

1091{
1092 return tps ? tps->listener : NULL;
1093}

References ast_taskprocessor::listener, NULL, and ast_taskprocessor_listener::tps.

◆ ast_taskprocessor_listener_alloc()

struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc ( const struct ast_taskprocessor_listener_callbacks callbacks,
void *  user_data 
)

Allocate a taskprocessor listener.

Since
12.0.0

This will result in the listener being allocated with the specified callbacks.

Parameters
callbacks: The callbacks to assign to the listener
user_data: The user data for the listener
Return values
NULL: Failure
non-NULL: The newly allocated taskprocessor listener

Definition at line 1120 of file taskprocessor.c.

1121{
1123
1125 if (!listener) {
1126 return NULL;
1127 }
1128 listener->callbacks = callbacks;
1129 listener->user_data = user_data;
1130
1131 return listener;
1132}
struct @506 callbacks
static void taskprocessor_listener_dtor(void *obj)

References ao2_alloc, callbacks, listener(), NULL, taskprocessor_listener_dtor(), and ast_taskprocessor_listener::user_data.

Referenced by ast_taskpool_serializer_group(), ast_taskprocessor_get(), AST_TEST_DEFINE(), ast_threadpool_create(), and ast_threadpool_serializer_group().

◆ ast_taskprocessor_listener_get_tps()

struct ast_taskprocessor * ast_taskprocessor_listener_get_tps ( const struct ast_taskprocessor_listener listener)

Get a reference to the listener's taskprocessor.

This will return the taskprocessor with its reference count increased. Release the reference to this object by using ast_taskprocessor_unreference()

Parameters
listener: The listener that has the taskprocessor
Returns
The taskprocessor

Definition at line 1134 of file taskprocessor.c.

1135{
1136 ao2_ref(listener->tps, +1);
1137 return listener->tps;
1138}

References ao2_ref, and listener().

Referenced by serializer_task_pushed(), and serializer_task_pushed().

◆ ast_taskprocessor_listener_get_user_data()

void * ast_taskprocessor_listener_get_user_data ( const struct ast_taskprocessor_listener listener)

Get the user data from the listener.

Parameters
listener: The taskprocessor listener
Returns
The listener's user data

Definition at line 1140 of file taskprocessor.c.

1141{
1142 return listener->user_data;
1143}

References listener().

Referenced by ast_taskpool_serializer_push_wait(), execute_tasks(), serializer_shutdown(), serializer_shutdown(), serializer_task_pushed(), serializer_task_pushed(), test_emptied(), test_shutdown(), test_task_pushed(), threadpool_tps_emptied(), threadpool_tps_shutdown(), and threadpool_tps_task_pushed().

◆ ast_taskprocessor_name()

const char * ast_taskprocessor_name ( struct ast_taskprocessor tps)

Return the name of the taskprocessor singleton.

Since
1.6.1

Definition at line 1096 of file taskprocessor.c.

1097{
1098 if (!tps) {
1099 ast_log(LOG_ERROR, "no taskprocessor specified!\n");
1100 return NULL;
1101 }
1102 return tps->name;
1103}

References ast_log, LOG_ERROR, ast_taskprocessor::name, NULL, and ast_taskprocessor_listener::tps.

Referenced by ast_serializer_pool_set_alerts(), ast_sip_get_distributor_serializer(), distributor(), grow(), record_serializer(), shrink(), sip_options_apply_aor_configuration(), and sip_options_contact_add_task().

◆ ast_taskprocessor_name_append()

void ast_taskprocessor_name_append ( char *  buf,
unsigned int  size,
const char *  name 
)

Append the next sequence number to the given string, and copy into the buffer.

Parameters
buf: Where to copy the appended taskprocessor name.
size: Size of buf in bytes, including the null terminator.
name: The name to which the sequence number is appended.

Definition at line 1517 of file taskprocessor.c.

1518{
1519 int final_size = strlen(name) + SEQ_STR_SIZE;
1520
1521 ast_assert(buf != NULL && name != NULL);
1522 ast_assert(final_size <= size);
1523
1524 snprintf(buf, final_size, "%s-%08x", name, ast_taskprocessor_seq_num());
1525}

References ast_assert, ast_taskprocessor_seq_num(), buf, name, NULL, and SEQ_STR_SIZE.

Referenced by ast_serializer_pool_create(), and ast_serializer_taskpool_create().

◆ ast_taskprocessor_push()

int ast_taskprocessor_push ( struct ast_taskprocessor tps,
int(*)(void *datap)  task_exe,
void *  datap 
)

Definition at line 1377 of file taskprocessor.c.

1378{
1379 return __ast_taskprocessor_push(tps, task_exe, datap, NULL, 0, NULL);
1380}
int __ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap, const char *file, int line, const char *function)
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.

References __ast_taskprocessor_push(), and NULL.

◆ ast_taskprocessor_push_local()

int ast_taskprocessor_push_local ( struct ast_taskprocessor tps,
int(*)(struct ast_taskprocessor_local *datap)  task_exe,
void *  datap 
)

Definition at line 1388 of file taskprocessor.c.

1389{
1390 return __ast_taskprocessor_push_local(tps, task_exe, datap, NULL, 0, NULL);
1391}
int __ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int(*task_exe)(struct ast_taskprocessor_local *datap), void *datap, const char *file, int line, const char *function)

References __ast_taskprocessor_push_local(), and NULL.

◆ ast_taskprocessor_seq_num()

unsigned int ast_taskprocessor_seq_num ( void  )

Get the next sequence number to create a human friendly taskprocessor name.

Since
13.8.0
Returns
Sequence number for use in creating human friendly taskprocessor names.

Definition at line 1508 of file taskprocessor.c.

1509{
1510 static int seq_num;
1511
1512 return (unsigned int) ast_atomic_fetchadd_int(&seq_num, +1);
1513}
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition lock.h:764

References ast_atomic_fetchadd_int().

Referenced by ast_taskprocessor_build_name(), and ast_taskprocessor_name_append().

◆ ast_taskprocessor_set_local()

void ast_taskprocessor_set_local ( struct ast_taskprocessor tps,
void *  local_data 
)

Sets the local data associated with a taskprocessor.

Since
12.0.0

See ast_taskprocessor_push_local().

Parameters
tps: Task processor.
local_data: Local data to associate with tps.

Definition at line 1292 of file taskprocessor.c.

1294{
1295 SCOPED_AO2LOCK(lock, tps);
1296 tps->local_data = local_data;
1297}
ast_mutex_t lock
Definition app_sla.c:337
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition lock.h:611

References ast_taskprocessor::local_data, lock, and SCOPED_AO2LOCK.

Referenced by AST_TEST_DEFINE(), and internal_stasis_subscribe().

◆ ast_taskprocessor_size()

long ast_taskprocessor_size ( struct ast_taskprocessor tps)

Return the current size of the taskprocessor queue.

Since
13.7.0

Definition at line 1085 of file taskprocessor.c.

1086{
1087 return (tps) ? tps->tps_queue_size : -1;
1088}

References ast_taskprocessor::tps_queue_size.

Referenced by ast_serializer_pool_get(), ast_taskpool_serializer_push_wait(), ast_taskprocessor_execute(), AST_TEST_DEFINE(), ast_threadpool_queue_size(), execute_tasks(), taskpool_least_full_selector(), and taskpool_sequential_selector().

◆ ast_taskprocessor_suspend()

int ast_taskprocessor_suspend ( struct ast_taskprocessor tps)

Indicate the taskprocessor is suspended.

Since
13.12.0
Parameters
tps: Task processor.
Return values
0: success
-1: failure

Definition at line 1393 of file taskprocessor.c.

1394{
1395 if (tps) {
1396 ao2_lock(tps);
1397 tps->suspended = 1;
1398 ao2_unlock(tps);
1399 return 0;
1400 }
1401 return -1;
1402}

References ao2_lock, ao2_unlock, and ast_taskprocessor::suspended.

Referenced by ast_sip_session_suspend(), and AST_TEST_DEFINE().

◆ ast_taskprocessor_unreference()

void * ast_taskprocessor_unreference ( struct ast_taskprocessor tps)

Release a reference to the specified taskprocessor, decrementing its reference count.

Taskprocessors use astobj2 and will unlink from the taskprocessor singleton container and destroy themselves when the taskprocessor reference count reaches zero.

Parameters
tps: taskprocessor to unreference
Returns
NULL
Since
1.6.1

Definition at line 1300 of file taskprocessor.c.

1301{
1302 if (!tps) {
1303 return NULL;
1304 }
1305
1306 /* To prevent another thread from finding and getting a reference to this
1307 * taskprocessor we hold the singletons lock. If we didn't do this then
1308 * they may acquire it and find that the listener has been shut down.
1309 */
1310 ao2_lock(tps_singletons);
1311
1312 if (ao2_ref(tps, -1) > 3) {
1313 ao2_unlock(tps_singletons);
1314 return NULL;
1315 }
1316
1317 /* If we're down to 3 references, then those must be:
1318 * 1. The reference we just got rid of
1319 * 2. The container
1320 * 3. The listener
1321 */
1322 ao2_unlink_flags(tps_singletons, tps, OBJ_NOLOCK);
1323 ao2_unlock(tps_singletons);
1324
1326 return NULL;
1327}
#define ao2_unlink_flags(container, obj, flags)
Remove an object from a container.
Definition astobj2.h:1600
static void listener_shutdown(struct ast_taskprocessor_listener *listener)

References ao2_lock, ao2_ref, ao2_unlink_flags, ao2_unlock, ast_taskprocessor::listener, listener_shutdown(), NULL, and OBJ_NOLOCK.

Referenced by __start_taskprocessor(), __unload_module(), ast_msg_shutdown(), ast_res_pjsip_cleanup_options_handling(), ast_serializer_pool_destroy(), ast_taskprocessor_create_with_listener(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), ast_threadpool_shutdown(), cli_tps_ping(), cli_tps_reset_stats(), cli_tps_reset_stats_all(), cli_tps_show_taskprocessor(), destroy_conference_bridge(), distributor(), distributor_pool_shutdown(), dns_system_resolver_destroy(), execute_tasks(), execute_tasks(), exten_state_subscription_destructor(), handle_cli_taskpool_push_serializer_efficiency(), handle_cli_threadpool_push_serializer_efficiency(), refer_progress_destroy(), scheduler(), schtd_dtor(), serializer_task_pushed(), serializer_task_pushed(), session_destructor(), sip_options_aor_dtor(), sip_outbound_publisher_destroy(), sip_outbound_registration_client_state_destroy(), sorcery_object_type_destructor(), subscription_dtor(), subscription_persistence_recreate(), subscription_tree_destructor(), taskpool_taskprocessor_alloc(), taskpool_taskprocessor_dtor(), tps_report_taskprocessor_list(), tps_shutdown_thread(), tps_taskprocessor_tab_complete(), unload_module(), unload_module(), unload_module(), unload_module(), unload_module(), unload_module(), and websocket_cb().

◆ ast_taskprocessor_unsuspend()

int ast_taskprocessor_unsuspend ( struct ast_taskprocessor tps)

Indicate the taskprocessor is unsuspended.

Since
13.12.0
Parameters
tps: Task processor.
Return values
0: success
-1: failure

Definition at line 1404 of file taskprocessor.c.

1405{
1406 if (tps) {
1407 ao2_lock(tps);
1408 tps->suspended = 0;
1409 ao2_unlock(tps);
1410 return 0;
1411 }
1412 return -1;
1413}

References ao2_lock, ao2_unlock, and ast_taskprocessor::suspended.

Referenced by ast_sip_session_unsuspend(), and AST_TEST_DEFINE().

◆ ast_tps_init()

int ast_tps_init ( void  )

Provided by taskprocessor.c

Definition at line 397 of file taskprocessor.c.

398{
401 if (!tps_singletons) {
402 ast_log(LOG_ERROR, "Failed to initialize taskprocessor container!\n");
403 return -1;
404 }
405
406 if (AST_VECTOR_RW_INIT(&overloaded_subsystems, 10)) {
407 ao2_ref(tps_singletons, -1);
408 ast_log(LOG_ERROR, "Failed to initialize taskprocessor subsystems tracking vector!\n");
409 return -1;
410 }
411
412 ast_cond_init(&cli_ping_cond, NULL);
413
414 ast_cli_register_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
415
417
418 return 0;
419}
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
Definition clicompat.c:19
@ AO2_ALLOC_OPT_LOCK_MUTEX
Definition astobj2.h:363
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition astobj2.h:1303
#define ast_cli_register_multiple(e, len)
Register multiple commands.
Definition cli.h:265
#define ast_cond_init(cond, attr)
Definition lock.h:208
static void tps_shutdown(void)
#define TPS_MAX_BUCKETS
static int tps_cmp_cb(void *obj, void *arg, int flags)
static int tps_hash_cb(const void *obj, const int flags)
#define ARRAY_LEN(a)
Definition utils.h:706
#define AST_VECTOR_RW_INIT(vec, size)
Initialize a vector with a read/write lock.
Definition vector.h:169

References AO2_ALLOC_OPT_LOCK_MUTEX, ao2_container_alloc_hash, ao2_ref, ARRAY_LEN, ast_cli_register_multiple, ast_cond_init, ast_log, ast_register_cleanup(), AST_VECTOR_RW_INIT, LOG_ERROR, NULL, tps_cmp_cb(), tps_hash_cb(), TPS_MAX_BUCKETS, and tps_shutdown().

Referenced by asterisk_daemon().

◆ AST_VECTOR_RW()

static AST_VECTOR_RW ( subsystem_alert_vector  ,
struct subsystem_alert  
)
static

Definition at line 141 of file taskprocessor.c.

177 {
178 AST_CLI_DEFINE(cli_tps_ping, "Ping a named task processor"),
179 AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"),
180 AST_CLI_DEFINE(cli_tps_show_taskprocessor, "Display detailed info about a taskprocessor"),
181 AST_CLI_DEFINE(cli_subsystem_alert_report, "List task processor subsystems in alert"),
182 AST_CLI_DEFINE(cli_tps_reset_stats, "Reset a named task processor's stats"),
183 AST_CLI_DEFINE(cli_tps_reset_stats_all, "Reset all task processors' stats"),
184};
#define AST_CLI_DEFINE(fn, txt,...)
Definition cli.h:197
static char * cli_tps_reset_stats_all(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static char * cli_tps_show_taskprocessor(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static char * cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static char * cli_tps_reset_stats(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static char * cli_subsystem_alert_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static char * cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)

◆ cli_subsystem_alert_report()

static char * cli_subsystem_alert_report ( struct ast_cli_entry e,
int  cmd,
struct ast_cli_args a 
)
static

Definition at line 916 of file taskprocessor.c.

917{
918 struct subsystem_alert_vector sorted_subsystems;
919 int i;
920
921#define FMT_HEADERS_SUBSYSTEM "%-32s %12s\n"
922#define FMT_FIELDS_SUBSYSTEM "%-32s %12u\n"
923
924 switch (cmd) {
925 case CLI_INIT:
926 e->command = "core show taskprocessor alerted subsystems";
927 e->usage =
928 "Usage: core show taskprocessor alerted subsystems\n"
929 " Shows a list of task processor subsystems that are currently alerted\n";
930 return NULL;
931 case CLI_GENERATE:
932 return NULL;
933 }
934
935 if (a->argc != e->args) {
936 return CLI_SHOWUSAGE;
937 }
938
939 if (AST_VECTOR_INIT(&sorted_subsystems, AST_VECTOR_SIZE(&overloaded_subsystems))) {
940 return CLI_FAILURE;
941 }
942
943 AST_VECTOR_RW_RDLOCK(&overloaded_subsystems);
944 for (i = 0; i < AST_VECTOR_SIZE(&overloaded_subsystems); i++) {
945 subsystem_copy(AST_VECTOR_GET(&overloaded_subsystems, i), &sorted_subsystems);
946 }
947 AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
948
949 ast_cli(a->fd, "\n" FMT_HEADERS_SUBSYSTEM, "Subsystem", "Alert Count");
950
951 for (i = 0; i < AST_VECTOR_SIZE(&sorted_subsystems); i++) {
952 struct subsystem_alert *alert = AST_VECTOR_GET(&sorted_subsystems, i);
953 ast_cli(a->fd, FMT_FIELDS_SUBSYSTEM, alert->subsystem, alert->alert_count);
954 }
955
956 ast_cli(a->fd, "\n%zu subsystems\n\n", AST_VECTOR_SIZE(&sorted_subsystems));
957
958 AST_VECTOR_CALLBACK_VOID(&sorted_subsystems, ast_free);
959 AST_VECTOR_FREE(&sorted_subsystems);
960
961 return CLI_SUCCESS;
962}
#define ast_free(a)
Definition astmm.h:180
#define CLI_SHOWUSAGE
Definition cli.h:45
#define CLI_SUCCESS
Definition cli.h:44
void ast_cli(int fd, const char *fmt,...)
Definition clicompat.c:6
@ CLI_INIT
Definition cli.h:152
@ CLI_GENERATE
Definition cli.h:153
#define CLI_FAILURE
Definition cli.h:46
int args
This gets set in ast_cli_register()
Definition cli.h:185
char * command
Definition cli.h:186
const char * usage
Definition cli.h:177
#define FMT_HEADERS_SUBSYSTEM
static void subsystem_copy(struct subsystem_alert *alert, struct subsystem_alert_vector *vector)
#define FMT_FIELDS_SUBSYSTEM
static struct test_val a
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition vector.h:620
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition vector.h:185
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition vector.h:124
#define AST_VECTOR_CALLBACK_VOID(vec, callback,...)
Execute a callback on every element in a vector disregarding callback return.
Definition vector.h:873

References a, subsystem_alert::alert_count, ast_cli_entry::args, ast_cli(), ast_free, AST_VECTOR_CALLBACK_VOID, AST_VECTOR_FREE, AST_VECTOR_GET, AST_VECTOR_INIT, AST_VECTOR_RW_RDLOCK, AST_VECTOR_RW_UNLOCK, AST_VECTOR_SIZE, CLI_FAILURE, CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, FMT_FIELDS_SUBSYSTEM, FMT_HEADERS_SUBSYSTEM, NULL, subsystem_alert::subsystem, subsystem_copy(), and ast_cli_entry::usage.

◆ cli_tps_ping()

static char * cli_tps_ping ( struct ast_cli_entry e,
int  cmd,
struct ast_cli_args a 
)
static

Definition at line 515 of file taskprocessor.c.

516{
517 struct timeval begin, end, delta;
518 const char *name;
519 struct timeval when;
520 struct timespec ts;
521 struct ast_taskprocessor *tps;
522
523 switch (cmd) {
524 case CLI_INIT:
525 e->command = "core ping taskprocessor";
526 e->usage =
527 "Usage: core ping taskprocessor <taskprocessor>\n"
528 " Displays the time required for a task to be processed\n";
529 return NULL;
530 case CLI_GENERATE:
531 if (a->pos == 3) {
533 } else {
534 return NULL;
535 }
536 }
537
538 if (a->argc != 4)
539 return CLI_SHOWUSAGE;
540
541 name = a->argv[3];
543 ast_cli(a->fd, "\nping failed: %s not found\n\n", name);
544 return CLI_SUCCESS;
545 }
546 ast_cli(a->fd, "\npinging %s ...", name);
547
548 /*
549 * Wait up to 5 seconds for a ping reply.
550 *
551 * On a very busy system it could take awhile to get a
552 * ping response from some taskprocessors.
553 */
554 begin = ast_tvnow();
555 when = ast_tvadd(begin, ast_samp2tv(5000, 1000));
556 ts.tv_sec = when.tv_sec;
557 ts.tv_nsec = when.tv_usec * 1000;
558
559 ast_mutex_lock(&cli_ping_cond_lock);
561 ast_mutex_unlock(&cli_ping_cond_lock);
562 ast_cli(a->fd, "\nping failed: could not push task to %s\n\n", name);
564 return CLI_FAILURE;
565 }
566 ast_cond_timedwait(&cli_ping_cond, &cli_ping_cond_lock, &ts);
567 ast_mutex_unlock(&cli_ping_cond_lock);
568
569 end = ast_tvnow();
570 delta = ast_tvsub(end, begin);
571 ast_cli(a->fd, "\n\t%24s ping time: %.1ld.%.6ld sec\n\n", name, (long)delta.tv_sec, (long int)delta.tv_usec);
573 return CLI_SUCCESS;
574}
char * end
Definition eagi_proxy.c:73
#define ast_cond_timedwait(cond, mutex, time)
Definition lock.h:213
#define ast_mutex_unlock(a)
Definition lock.h:197
#define ast_mutex_lock(a)
Definition lock.h:196
#define ast_taskprocessor_push_internal(tps, task_exe, datap)
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.
static int tps_ping_handler(void *datap)
static char * tps_taskprocessor_tab_complete(struct ast_cli_args *a)
struct timeval ast_samp2tv(unsigned int _nsamp, unsigned int _rate)
Returns a timeval corresponding to the duration of n samples at rate r. Useful to convert samples to ...
Definition time.h:282
struct timeval ast_tvadd(struct timeval a, struct timeval b)
Returns the sum of two timevals a + b.
Definition extconf.c:2280
struct timeval ast_tvsub(struct timeval a, struct timeval b)
Returns the difference of two timevals a - b.
Definition extconf.c:2295

References a, ast_cli(), ast_cond_timedwait, ast_mutex_lock, ast_mutex_unlock, ast_samp2tv(), ast_taskprocessor_get(), ast_taskprocessor_push_internal, ast_taskprocessor_unreference(), ast_tvadd(), ast_tvnow(), ast_tvsub(), CLI_FAILURE, CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, end, name, NULL, tps_ping_handler(), TPS_REF_IF_EXISTS, tps_taskprocessor_tab_complete(), and ast_cli_entry::usage.

◆ cli_tps_report()

static char * cli_tps_report ( struct ast_cli_entry e,
int  cmd,
struct ast_cli_args a 
)
static

Definition at line 681 of file taskprocessor.c.

682{
683 const char *like;
684
685 switch (cmd) {
686 case CLI_INIT:
687 e->command = "core show taskprocessors [like]";
688 e->usage =
689 "Usage: core show taskprocessors [like keyword]\n"
690 " Shows a list of instantiated task processors and their statistics\n";
691 return NULL;
692 case CLI_GENERATE:
693 if (a->pos == e->args) {
695 } else {
696 return NULL;
697 }
698 }
699
700 if (a->argc == e->args - 1) {
701 like = "";
702 } else if (a->argc == e->args + 1 && !strcasecmp(a->argv[e->args-1], "like")) {
703 like = a->argv[e->args];
704 } else {
705 return CLI_SHOWUSAGE;
706 }
707
708 ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth", "Low water", "High water", "Low time(us)", "High time(us)");
709 ast_cli(a->fd, "\n%d taskprocessors\n\n", tps_report_taskprocessor_list(a->fd, like));
710
711 return CLI_SUCCESS;
712}
#define FMT_HEADERS
static int tps_report_taskprocessor_list(int fd, const char *like)

References a, ast_cli_entry::args, ast_cli(), CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, FMT_HEADERS, NULL, tps_report_taskprocessor_list(), tps_taskprocessor_tab_complete(), and ast_cli_entry::usage.

◆ cli_tps_reset_stats()

static char * cli_tps_reset_stats ( struct ast_cli_entry e,
int  cmd,
struct ast_cli_args a 
)
static

Definition at line 1566 of file taskprocessor.c.

1567{
1568 const char *name;
1569 struct ast_taskprocessor *tps;
1570
1571 switch (cmd) {
1572 case CLI_INIT:
1573 e->command = "core reset taskprocessor";
1574 e->usage =
1575 "Usage: core reset taskprocessor <taskprocessor>\n"
1576 " Resets stats for the specified taskprocessor\n";
1577 return NULL;
1578 case CLI_GENERATE:
1580 }
1581
1582 if (a->argc != 4) {
1583 return CLI_SHOWUSAGE;
1584 }
1585
1586 name = a->argv[3];
1588 ast_cli(a->fd, "\nReset failed: %s not found\n\n", name);
1589 return CLI_SUCCESS;
1590 }
1591 ast_cli(a->fd, "\nResetting %s\n\n", name);
1592
1593 tps_reset_stats(tps);
1594
1596
1597 return CLI_SUCCESS;
1598}
static void tps_reset_stats(struct ast_taskprocessor *tps)

References a, ast_cli(), ast_taskprocessor_get(), ast_taskprocessor_unreference(), CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, name, NULL, TPS_REF_IF_EXISTS, tps_reset_stats(), tps_taskprocessor_tab_complete(), and ast_cli_entry::usage.

◆ cli_tps_reset_stats_all()

static char * cli_tps_reset_stats_all ( struct ast_cli_entry e,
int  cmd,
struct ast_cli_args a 
)
static

Definition at line 1600 of file taskprocessor.c.

1601{
1602 struct ast_taskprocessor *tps;
1603 struct ao2_iterator iter;
1604
1605 switch (cmd) {
1606 case CLI_INIT:
1607 e->command = "core reset taskprocessors";
1608 e->usage =
1609 "Usage: core reset taskprocessors\n"
1610 " Resets stats for all taskprocessors\n";
1611 return NULL;
1612 case CLI_GENERATE:
1613 return NULL;
1614 }
1615
1616 if (a->argc != e->args) {
1617 return CLI_SHOWUSAGE;
1618 }
1619
1620 ast_cli(a->fd, "\nResetting stats for all taskprocessors\n\n");
1621
1622 iter = ao2_iterator_init(tps_singletons, 0);
1623 while ((tps = ao2_iterator_next(&iter))) {
1624 tps_reset_stats(tps);
1626 }
1627 ao2_iterator_destroy(&iter);
1628
1629 return CLI_SUCCESS;
1630}
#define ao2_iterator_next(iter)
Definition astobj2.h:1911
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
Definition astobj2.h:1821

References a, ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, ast_cli_entry::args, ast_cli(), ast_taskprocessor_unreference(), CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, NULL, tps_reset_stats(), and ast_cli_entry::usage.

◆ cli_tps_show_taskprocessor()

static char * cli_tps_show_taskprocessor ( struct ast_cli_entry e,
int  cmd,
struct ast_cli_args a 
)
static

Definition at line 714 of file taskprocessor.c.

715{
716 const char *name;
717 struct ast_taskprocessor *tps;
718 struct tps_task *task;
719 int task_count = 0;
720
721 switch (cmd) {
722 case CLI_INIT:
723 e->command = "core show taskprocessor name";
724 e->usage =
725 "Usage: core show taskprocessor name <taskprocessor>\n"
726 " Displays detailed information about a specific taskprocessor,\n"
727 " including all queued tasks and their origins (DEVMODE only).\n";
728 return NULL;
729 case CLI_GENERATE:
730 if (a->pos == 4) {
732 }
733 return NULL;
734 }
735
736 if (a->argc != 5) {
737 return CLI_SHOWUSAGE;
738 }
739
740 name = a->argv[4];
742 if (!tps) {
743 ast_cli(a->fd, "\nTaskprocessor '%s' not found\n\n", name);
744 return CLI_SUCCESS;
745 }
746
747 ao2_lock(tps);
748
749 ast_cli(a->fd, "\nTaskprocessor: %s\n", tps->name);
750 ast_cli(a->fd, "===========================================\n");
751 ast_cli(a->fd, "Subsystem: %s\n", tps->subsystem[0] ? tps->subsystem : "(none)");
752 ast_cli(a->fd, "Tasks processed: %lu\n", tps->stats._tasks_processed_count);
753 ast_cli(a->fd, "Current queue size: %ld\n", tps->tps_queue_size);
754 ast_cli(a->fd, "Max queue depth: %lu\n", tps->stats.max_qsize);
755 ast_cli(a->fd, "Low water mark: %ld\n", tps->tps_queue_low);
756 ast_cli(a->fd, "High water mark: %ld\n", tps->tps_queue_high);
757 ast_cli(a->fd, "High water alert: %s\n", tps->high_water_alert ? "Yes" : "No");
758 ast_cli(a->fd, "Suspended: %s\n", tps->suspended ? "Yes" : "No");
759 ast_cli(a->fd, "Currently executing: %s\n", tps->executing ? "Yes" : "No");
760 ast_cli(a->fd, "Highest time (us): %ld\n", tps->stats.highest_time_processed);
761 if (tps->stats.highest_time_task_file) {
762 ast_cli(a->fd, " Highest task origin: %s:%d (%s)\n",
766 }
767 ast_cli(a->fd, "Lowest time (us): %ld\n", tps->stats.lowest_time_processed);
768
769 if (tps->tps_queue_size > 0) {
770 ast_cli(a->fd, "\nQueued Tasks:\n");
771 ast_cli(a->fd, "-------------------------------------------\n");
772
774 task_count++;
775 if (task->file) {
776 ast_cli(a->fd, " Task #%d:\n", task_count);
777 ast_cli(a->fd, " Origin: %s:%d\n", task->file, task->line);
778 ast_cli(a->fd, " Function: %s\n", task->function);
779 ast_cli(a->fd, " Type: %s\n", task->wants_local ? "Local" : "Standard");
780 } else {
781 ast_cli(a->fd, " Task #%d: (origin not available)\n", task_count);
782 }
783 }
784 ast_cli(a->fd, "\nTotal queued tasks: %d\n", task_count);
785 } else {
786 ast_cli(a->fd, "\nNo tasks currently queued.\n");
787 }
788
789 ao2_unlock(tps);
791
792 ast_cli(a->fd, "\n");
793 return CLI_SUCCESS;
794}
#define AST_LIST_TRAVERSE(head, var, field)
Loops over (traverses) the entries in a list.
static int task_count
struct ast_taskprocessor::tps_queue tps_queue
struct tps_task::@441 list
AST_LIST_ENTRY overhead.
static int task(void *data)
Queued task for baseline test.

References tps_taskprocessor_stats::_tasks_processed_count, a, ao2_lock, ao2_unlock, ast_cli(), AST_LIST_TRAVERSE, ast_taskprocessor_get(), ast_taskprocessor_unreference(), CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, ast_taskprocessor::executing, ast_taskprocessor::high_water_alert, tps_taskprocessor_stats::highest_time_processed, tps_taskprocessor_stats::highest_time_task_file, tps_taskprocessor_stats::highest_time_task_function, tps_taskprocessor_stats::highest_time_task_line, tps_task::list, tps_taskprocessor_stats::lowest_time_processed, tps_taskprocessor_stats::max_qsize, ast_taskprocessor::name, name, NULL, ast_taskprocessor::stats, ast_taskprocessor::subsystem, ast_taskprocessor::suspended, task(), task_count, ast_taskprocessor::tps_queue, ast_taskprocessor::tps_queue_high, ast_taskprocessor::tps_queue_low, ast_taskprocessor::tps_queue_size, TPS_REF_IF_EXISTS, tps_taskprocessor_tab_complete(), and ast_cli_entry::usage.

◆ default_listener_die()

static int default_listener_die ( void *  data)
static

Definition at line 273 of file taskprocessor.c.

274{
275 struct default_taskprocessor_listener_pvt *pvt = data;
276 pvt->dead = 1;
277 return 0;
278}

References default_taskprocessor_listener_pvt::dead.

Referenced by default_listener_shutdown().

◆ default_listener_pvt_alloc()

static void * default_listener_pvt_alloc ( void  )
static

Definition at line 1145 of file taskprocessor.c.

1146{
1148
1149 pvt = ast_calloc(1, sizeof(*pvt));
1150 if (!pvt) {
1151 ast_log(LOG_ERROR, "Failed to allocate memory for taskprocessor listener\n");
1152 return NULL;
1153 }
1155 if (ast_sem_init(&pvt->sem, 0, 0) != 0) {
1156 ast_log(LOG_ERROR, "Failed to initialize taskprocessor semaphore: %s\n", strerror(errno));
1157 ast_free(pvt);
1158 return NULL;
1159 }
1160 return pvt;
1161}
#define ast_calloc(num, len)
A wrapper for calloc()
Definition astmm.h:202
int errno
int ast_sem_init(struct ast_sem *sem, int pshared, unsigned int value)
Initialize a semaphore.

References ast_calloc, ast_free, ast_log, AST_PTHREADT_NULL, ast_sem_init(), errno, LOG_ERROR, NULL, default_taskprocessor_listener_pvt::poll_thread, and default_taskprocessor_listener_pvt::sem.

Referenced by ast_taskprocessor_get().

◆ default_listener_pvt_destroy()

static void default_listener_pvt_destroy ( struct default_taskprocessor_listener_pvt pvt)
static

Definition at line 192 of file taskprocessor.c.

193{
194 ast_assert(pvt->dead);
195 ast_sem_destroy(&pvt->sem);
196 ast_free(pvt);
197}
int ast_sem_destroy(struct ast_sem *sem)
Destroy a semaphore.

References ast_assert, ast_free, ast_sem_destroy(), default_taskprocessor_listener_pvt::dead, and default_taskprocessor_listener_pvt::sem.

Referenced by ast_taskprocessor_get(), and default_listener_pvt_dtor().

◆ default_listener_pvt_dtor()

static void default_listener_pvt_dtor ( struct ast_taskprocessor_listener listener)
static

Definition at line 199 of file taskprocessor.c.

200{
201 struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
202
204
205 listener->user_data = NULL;
206}

References default_listener_pvt_destroy(), listener(), and NULL.

◆ default_listener_shutdown()

static void default_listener_shutdown ( struct ast_taskprocessor_listener listener)
static

Definition at line 280 of file taskprocessor.c.

281{
282 struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
283 int res;
284
285 /* Hold a reference during shutdown */
286 ao2_t_ref(listener->tps, +1, "tps-shutdown");
287
289 /* This will cause the thread to exit early without completing tasks already
290 * in the queue. This is probably the least bad option in this situation. */
292 }
293
295
296 if (pthread_equal(pthread_self(), pvt->poll_thread)) {
297 res = pthread_detach(pvt->poll_thread);
298 if (res != 0) {
299 ast_log(LOG_ERROR, "pthread_detach(): %s\n", strerror(errno));
300 }
301 } else {
302 res = pthread_join(pvt->poll_thread, NULL);
303 if (res != 0) {
304 ast_log(LOG_ERROR, "pthread_join(): %s\n", strerror(errno));
305 }
306 }
308}
#define ao2_t_ref(o, delta, tag)
Definition astobj2.h:460
static int default_listener_die(void *data)

References ao2_t_ref, ast_assert, ast_log, AST_PTHREADT_NULL, ast_taskprocessor_push_internal, default_listener_die(), errno, listener(), LOG_ERROR, NULL, and default_taskprocessor_listener_pvt::poll_thread.

◆ default_listener_start()

static int default_listener_start ( struct ast_taskprocessor_listener listener)
static

Definition at line 252 of file taskprocessor.c.

253{
254 struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
255
257 return -1;
258 }
259
260 return 0;
261}
static void * default_tps_processing_function(void *data)
Function that processes tasks in the taskprocessor.
#define ast_pthread_create(a, b, c, d)
Definition utils.h:624

References ast_pthread_create, default_tps_processing_function(), listener(), NULL, and default_taskprocessor_listener_pvt::poll_thread.

◆ default_task_pushed()

static void default_task_pushed ( struct ast_taskprocessor_listener listener,
int  was_empty 
)
static

Definition at line 263 of file taskprocessor.c.

264{
265 struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
266
267 if (ast_sem_post(&pvt->sem) != 0) {
268 ast_log(LOG_ERROR, "Taskprocessor '%s': Failed to signal task enqueue: %s\n",
269 listener->tps->name, strerror(errno));
270 }
271}
int ast_sem_post(struct ast_sem *sem)
Increments the semaphore, unblocking a waiter if necessary.

References ast_log, ast_sem_post(), errno, listener(), LOG_ERROR, and default_taskprocessor_listener_pvt::sem.

◆ default_tps_processing_function()

static void * default_tps_processing_function ( void *  data)
static

Function that processes tasks in the taskprocessor.

Definition at line 223 of file taskprocessor.c.

224{
226 struct ast_taskprocessor *tps = listener->tps;
227 struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
228 int sem_value;
229 int res;
230
231 while (!pvt->dead) {
232 res = ast_sem_wait(&pvt->sem);
233 if (res != 0 && errno != EINTR) {
234 ast_log(LOG_ERROR, "Taskprocessor '%s': Semaphore wait failed: %s\n",
235 tps->name, strerror(errno));
236 /* Just give up */
237 break;
238 }
240 }
241
242 /* No posting to a dead taskprocessor! */
243 res = ast_sem_getvalue(&pvt->sem, &sem_value);
244 ast_assert(res == 0 && sem_value == 0);
245
246 /* Free the shutdown reference (see default_listener_shutdown) */
247 ao2_t_ref(listener->tps, -1, "tps-shutdown");
248
249 return NULL;
250}
int ast_sem_getvalue(struct ast_sem *sem, int *sval)
Gets the current value of the semaphore.
int ast_sem_wait(struct ast_sem *sem)
Decrements the semaphore.
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
Pop a task off the taskprocessor and execute it.

References ao2_t_ref, ast_assert, ast_log, ast_sem_getvalue(), ast_sem_wait(), ast_taskprocessor_execute(), default_taskprocessor_listener_pvt::dead, errno, listener(), LOG_ERROR, ast_taskprocessor::name, NULL, and default_taskprocessor_listener_pvt::sem.

Referenced by default_listener_start().

◆ listener_shutdown()

static void listener_shutdown ( struct ast_taskprocessor_listener listener)
static

Definition at line 1105 of file taskprocessor.c.

1106{
1107 listener->callbacks->shutdown(listener);
1108 ao2_ref(listener->tps, -1);
1109}

References ao2_ref, and listener().

Referenced by ast_taskprocessor_unreference().

◆ subsystem_alert_decrement()

static void subsystem_alert_decrement ( const char *  subsystem)
static

Definition at line 873 of file taskprocessor.c.

874{
875 struct subsystem_alert *alert;
876 int idx;
877
879 return;
880 }
881
882 AST_VECTOR_RW_WRLOCK(&overloaded_subsystems);
883 idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
884 if (idx < 0) {
886 "Can't decrement alert count for subsystem '%s' as it wasn't in alert\n", subsystem);
887 AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
888 return;
889 }
890 alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
891
892 alert->alert_count--;
893 if (alert->alert_count <= 0) {
894 AST_VECTOR_REMOVE(&overloaded_subsystems, idx, 0);
895 ast_free(alert);
896 }
897
898 AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
899}
#define AST_VECTOR_RW_WRLOCK(vec)
Obtain write lock on vector.
Definition vector.h:898
#define AST_VECTOR_REMOVE(vec, idx, preserve_ordered)
Remove an element from a vector by index.
Definition vector.h:423

References subsystem_alert::alert_count, ast_free, ast_log, ast_strlen_zero(), AST_VECTOR_GET, AST_VECTOR_GET_INDEX, AST_VECTOR_REMOVE, AST_VECTOR_RW_UNLOCK, AST_VECTOR_RW_WRLOCK, LOG_ERROR, subsystem_alert::subsystem, and subsystem_match().

Referenced by tps_alert_add().

◆ subsystem_alert_increment()

static void subsystem_alert_increment ( const char *  subsystem)
static

Definition at line 841 of file taskprocessor.c.

842{
843 struct subsystem_alert *alert;
844 int idx;
845
847 return;
848 }
849
850 AST_VECTOR_RW_WRLOCK(&overloaded_subsystems);
851 idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
852 if (idx >= 0) {
853 alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
854 alert->alert_count++;
855 AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
856 return;
857 }
858
859 alert = ast_malloc(sizeof(*alert) + strlen(subsystem) + 1);
860 if (!alert) {
861 AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
862 return;
863 }
864 alert->alert_count = 1;
865 strcpy(alert->subsystem, subsystem); /* Safe */
866
867 if (AST_VECTOR_APPEND(&overloaded_subsystems, alert)) {
868 ast_free(alert);
869 }
870 AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
871}
#define ast_malloc(len)
A wrapper for malloc()
Definition astmm.h:191
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition vector.h:267

References subsystem_alert::alert_count, ast_free, ast_malloc, ast_strlen_zero(), AST_VECTOR_APPEND, AST_VECTOR_GET, AST_VECTOR_GET_INDEX, AST_VECTOR_RW_UNLOCK, AST_VECTOR_RW_WRLOCK, subsystem_alert::subsystem, and subsystem_match().

Referenced by tps_alert_add().

◆ subsystem_cmp()

static int subsystem_cmp ( struct subsystem_alert a,
struct subsystem_alert b 
)
static

Definition at line 819 of file taskprocessor.c.

820{
821 return strcmp(a->subsystem, b->subsystem);
822}
static struct test_val b

References a, and b.

Referenced by subsystem_copy().

◆ subsystem_copy()

static void subsystem_copy ( struct subsystem_alert alert,
struct subsystem_alert_vector *  vector 
)
static

Definition at line 901 of file taskprocessor.c.

903{
904 struct subsystem_alert *alert_copy;
905 alert_copy = ast_malloc(sizeof(*alert_copy) + strlen(alert->subsystem) + 1);
906 if (!alert_copy) {
907 return;
908 }
909 alert_copy->alert_count = alert->alert_count;
910 strcpy(alert_copy->subsystem, alert->subsystem); /* Safe */
911 if (AST_VECTOR_ADD_SORTED(vector, alert_copy, subsystem_cmp)) {
912 ast_free(alert_copy);
913 }
914}
static int subsystem_cmp(struct subsystem_alert *a, struct subsystem_alert *b)
#define AST_VECTOR_ADD_SORTED(vec, elem, cmp)
Add an element into a sorted vector.
Definition vector.h:382

References subsystem_alert::alert_count, ast_free, ast_malloc, AST_VECTOR_ADD_SORTED, subsystem_alert::subsystem, and subsystem_cmp().

Referenced by cli_subsystem_alert_report().

◆ subsystem_match()

static int subsystem_match ( struct subsystem_alert alert,
const char *  subsystem 
)
static

Definition at line 814 of file taskprocessor.c.

815{
816 return !strcmp(alert->subsystem, subsystem);
817}

References ast_taskprocessor::subsystem, and subsystem_alert::subsystem.

Referenced by ast_taskprocessor_get_subsystem_alert(), subsystem_alert_decrement(), and subsystem_alert_increment().

◆ taskprocessor_listener_dtor()

static void taskprocessor_listener_dtor ( void *  obj)
static

Definition at line 1111 of file taskprocessor.c.

1112{
1114
1115 if (listener->callbacks->dtor) {
1117 }
1118}
void(* dtor)(struct ast_taskprocessor_listener *listener)

References ast_taskprocessor_listener::callbacks, ast_taskprocessor_listener_callbacks::dtor, and listener().

Referenced by ast_taskprocessor_listener_alloc().

◆ taskprocessor_push()

static int taskprocessor_push ( struct ast_taskprocessor tps,
struct tps_task t 
)
static

Definition at line 1330 of file taskprocessor.c.

1331{
1332 int previous_size;
1333 int was_empty;
1334
1335 if (!tps) {
1336 ast_log(LOG_ERROR, "Taskprocessor is NULL!\n");
1337 return -1;
1338 }
1339
1340 if (!t) {
1341 ast_log(LOG_ERROR, "Task is NULL!\n");
1342 return -1;
1343 }
1344
1345 if (t->file) {
1346 ast_debug(3, "Taskprocessor '%s': Task pushed from %s:%d (%s)\n",
1347 tps->name, t->file, t->line, t->function);
1348 }
1349
1350 ao2_lock(tps);
1351 AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
1352 previous_size = tps->tps_queue_size++;
1353
1354 if (tps->tps_queue_high <= tps->tps_queue_size) {
1355 if (!tps->high_water_alert) {
1356 ast_log(LOG_WARNING, "Taskprocessor '%s' queue reached %ld scheduled tasks (high water mark: %ld)%s.\n",
1357 tps->name, tps->tps_queue_size, tps->tps_queue_high, tps->high_water_warned ? " again" : "");
1358 tps->high_water_warned = 1;
1359 tps->high_water_alert = 1;
1360 tps_alert_add(tps, +1);
1361 }
1362 }
1363
1364 /* The currently executing task counts as still in queue */
1365 was_empty = tps->executing ? 0 : previous_size == 0;
1366 ao2_unlock(tps);
1367 tps->listener->callbacks->task_pushed(tps->listener, was_empty);
1368 return 0;
1369}
#define ast_debug(level,...)
Log a DEBUG message.
#define AST_LIST_INSERT_TAIL(head, elm, field)
Appends a list entry to the tail of a list.
void(* task_pushed)(struct ast_taskprocessor_listener *listener, int was_empty)
Indicates a task was pushed to the processor.
unsigned int high_water_warned

References ao2_lock, ao2_unlock, ast_debug, AST_LIST_INSERT_TAIL, ast_log, ast_taskprocessor_listener::callbacks, ast_taskprocessor::executing, tps_task::file, tps_task::function, ast_taskprocessor::high_water_alert, ast_taskprocessor::high_water_warned, tps_task::line, ast_taskprocessor::listener, LOG_ERROR, LOG_WARNING, ast_taskprocessor::name, ast_taskprocessor_listener_callbacks::task_pushed, tps_alert_add(), ast_taskprocessor::tps_queue, ast_taskprocessor::tps_queue_high, and ast_taskprocessor::tps_queue_size.

Referenced by __ast_taskprocessor_push(), and __ast_taskprocessor_push_local().

◆ tps_alert_add()

static void tps_alert_add ( struct ast_taskprocessor tps,
int  delta 
)
static

Definition at line 979 of file taskprocessor.c.

980{
981 unsigned int old;
982
984 old = tps_alert_count;
985 tps_alert_count += delta;
986 if (DEBUG_ATLEAST(3)
987 /* and tps_alert_count becomes zero or non-zero */
988 && !old != !tps_alert_count) {
989 ast_log(LOG_DEBUG, "Taskprocessor '%s' %s the high water alert (total alerts: %u).\n",
990 tps->name, tps_alert_count ? "triggered" : "cleared", tps_alert_count);
991 }
992
993 if (tps->subsystem[0] != '\0') {
994 if (delta > 0) {
996 } else {
998 }
999 }
1000
1002}
#define DEBUG_ATLEAST(level)
#define LOG_DEBUG
#define ast_rwlock_wrlock(a)
Definition lock.h:243
static void subsystem_alert_decrement(const char *subsystem)
static void subsystem_alert_increment(const char *subsystem)

References ast_log, ast_rwlock_unlock, ast_rwlock_wrlock, DEBUG_ATLEAST, LOG_DEBUG, ast_taskprocessor::name, ast_taskprocessor::subsystem, subsystem_alert_decrement(), subsystem_alert_increment(), tps_alert_count, and tps_alert_lock.

Referenced by ast_taskprocessor_alert_set_levels(), taskprocessor_push(), tps_taskprocessor_dtor(), and tps_taskprocessor_pop().

◆ tps_cmp_cb()

static int tps_cmp_cb ( void *  obj,
void *  arg,
int  flags 
)
static

Definition at line 806 of file taskprocessor.c.

807{
808 struct ast_taskprocessor *lhs = obj, *rhs = arg;
809 const char *rhsname = flags & OBJ_KEY ? arg : rhs->name;
810
811 return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0;
812}
@ CMP_MATCH
Definition astobj2.h:1027
@ CMP_STOP
Definition astobj2.h:1028

References CMP_MATCH, CMP_STOP, ast_taskprocessor::name, and OBJ_KEY.

Referenced by ast_tps_init().

◆ tps_hash_cb()

static int tps_hash_cb ( const void *  obj,
const int  flags 
)
static

Definition at line 797 of file taskprocessor.c.

798{
799 const struct ast_taskprocessor *tps = obj;
800 const char *name = flags & OBJ_KEY ? obj : tps->name;
801
802 return ast_str_case_hash(name);
803}
static force_inline int attribute_pure ast_str_case_hash(const char *str)
Compute a hash value on a case-insensitive string.
Definition strings.h:1303

References ast_str_case_hash(), ast_taskprocessor::name, name, and OBJ_KEY.

Referenced by ast_tps_init().

◆ tps_ping_handler()

static int tps_ping_handler ( void *  datap)
static

Definition at line 506 of file taskprocessor.c.

507{
508 ast_mutex_lock(&cli_ping_cond_lock);
509 ast_cond_signal(&cli_ping_cond);
510 ast_mutex_unlock(&cli_ping_cond_lock);
511 return 0;
512}
#define ast_cond_signal(cond)
Definition lock.h:210

References ast_cond_signal, ast_mutex_lock, and ast_mutex_unlock.

Referenced by cli_tps_ping().

◆ tps_report_taskprocessor_list()

static int tps_report_taskprocessor_list ( int  fd,
const char *  like 
)
static

Definition at line 644 of file taskprocessor.c.

645{
646 int tps_count = 0;
647 int word_len;
648 struct ao2_container *sorted_tps;
649 struct ast_taskprocessor *tps;
650 struct ao2_iterator iter;
651
653 NULL);
654 if (!sorted_tps
655 || ao2_container_dup(sorted_tps, tps_singletons, 0)) {
656 ast_debug(1, "Failed to retrieve sorted taskprocessors\n");
657 ao2_cleanup(sorted_tps);
658 return 0;
659 }
660
661 word_len = strlen(like);
662 iter = ao2_iterator_init(sorted_tps, AO2_ITERATOR_UNLINK);
663 while ((tps = ao2_iterator_next(&iter))) {
664 if (like) {
665 if (!strncasecmp(like, tps->name, word_len)) {
667 tps_count++;
668 }
669 } else {
671 tps_count++;
672 }
674 }
676 ao2_ref(sorted_tps, -1);
677
678 return tps_count;
679}
int ao2_container_dup(struct ao2_container *dest, struct ao2_container *src, enum search_flags flags)
Copy all object references in the src container into the dest container.
@ AO2_ALLOC_OPT_LOCK_NOLOCK
Definition astobj2.h:367
#define ao2_cleanup(obj)
Definition astobj2.h:1934
@ AO2_ITERATOR_UNLINK
Definition astobj2.h:1863
#define ao2_container_alloc_rbtree(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a red-black tree container.
Definition astobj2.h:1349
Generic container type.
static int tps_sort_cb(const void *obj_left, const void *obj_right, int flags)
static void tps_report_taskprocessor_list_helper(int fd, struct ast_taskprocessor *tps)

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_cleanup, ao2_container_alloc_rbtree, ao2_container_dup(), ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, AO2_ITERATOR_UNLINK, ao2_ref, ast_debug, ast_taskprocessor_unreference(), ast_taskprocessor::name, NULL, tps_report_taskprocessor_list_helper(), and tps_sort_cb().

Referenced by cli_tps_report().

◆ tps_report_taskprocessor_list_helper()

static void tps_report_taskprocessor_list_helper ( int  fd,
struct ast_taskprocessor tps 
)
static

◆ tps_reset_stats()

static void tps_reset_stats ( struct ast_taskprocessor tps)
static

◆ tps_shutdown()

static void tps_shutdown ( void  )
static

Definition at line 324 of file taskprocessor.c.

325{
326 int objcount;
327 int tries;
328 struct ao2_container *sorted_tps;
329 struct ast_taskprocessor *tps;
330 struct ao2_iterator iter;
331 struct timespec delay = {1, 0};
332
333 /* During shutdown there may still be taskprocessor threads running and those
334 * taskprocessors reference tps_singletons. When those taskprocessors finish
335 * they will call ast_taskprocessor_unreference, creating a race condition which
336 * can result in tps_singletons being referenced after being deleted. To try and
337 * avoid this we check the container count and if greater than zero, give the
338 * running taskprocessors a chance to finish */
339 objcount = ao2_container_count(tps_singletons);
340 if (objcount > 0) {
342 "Taskprocessor shutdown: Waiting for %d taskprocessor(s) to complete.\n",
343 objcount);
344
345 /* give the running taskprocessors a chance to finish, up to
346 * AST_TASKPROCESSOR_SHUTDOWN_MAX_WAIT seconds */
347 for (tries = 0; tries < AST_TASKPROCESSOR_SHUTDOWN_MAX_WAIT; tries++) {
348 while (nanosleep(&delay, &delay));
349 objcount = ao2_container_count(tps_singletons);
350 /* if count is 0, we are done waiting */
351 if (objcount == 0) {
352 break;
353 }
354 delay.tv_sec = 1;
355 delay.tv_nsec = 0;
357 "Taskprocessor shutdown: Still waiting for %d taskprocessor(s) after %d second(s).\n",
358 objcount, tries + 1);
359 }
360 }
361
362 /* rather than try forever, risk an assertion on shutdown. This probably indicates
363 * a taskprocessor was not cleaned up somewhere */
364 if (objcount > 0) {
366 "Taskprocessor shutdown: %d taskprocessor(s) still running after %d seconds. Assertion may occur:\n",
368
370 NULL);
371 if (!sorted_tps || ao2_container_dup(sorted_tps, tps_singletons, 0)) {
372 ast_log(LOG_ERROR, "Unable to get sorted list of taskprocessors for shutdown report\n");
373 }
374 else {
375 iter = ao2_iterator_init(sorted_tps, AO2_ITERATOR_UNLINK);
376 while ((tps = ao2_iterator_next(&iter))) {
377 ast_log(LOG_ERROR, " - Taskprocessor '%s' (queue size: %ld)\n",
378 tps->name, tps->tps_queue_size);
379 }
380 }
381
382 ao2_cleanup(sorted_tps);
383 }
384 else {
386 "Taskprocessor shutdown: All taskprocessors completed successfully.\n");
387 }
388
389 ast_cli_unregister_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
390 AST_VECTOR_CALLBACK_VOID(&overloaded_subsystems, ast_free);
391 AST_VECTOR_RW_FREE(&overloaded_subsystems);
392 ao2_t_ref(tps_singletons, -1, "Unref tps_singletons in shutdown");
393 tps_singletons = NULL;
394}
void ast_cli_unregister_multiple(void)
Definition ael_main.c:408
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
#define AST_TASKPROCESSOR_SHUTDOWN_MAX_WAIT
How many seconds to wait for running taskprocessors to finish on shutdown.
#define AST_VECTOR_RW_FREE(vec)
Deallocates this locked vector.
Definition vector.h:213

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_cleanup, ao2_container_alloc_rbtree, ao2_container_count(), ao2_container_dup(), ao2_iterator_init(), ao2_iterator_next, AO2_ITERATOR_UNLINK, ao2_t_ref, ARRAY_LEN, ast_cli_unregister_multiple(), ast_free, ast_log, AST_TASKPROCESSOR_SHUTDOWN_MAX_WAIT, AST_VECTOR_CALLBACK_VOID, AST_VECTOR_RW_FREE, LOG_DEBUG, LOG_ERROR, ast_taskprocessor::name, NULL, ast_taskprocessor_listener_callbacks::start, ast_taskprocessor::tps_queue_size, and tps_sort_cb().

Referenced by ast_tps_init().

◆ tps_sort_cb()

static int tps_sort_cb ( const void *  obj_left,
const void *  obj_right,
int  flags 
)
static

Definition at line 592 of file taskprocessor.c.

593{
594 const struct ast_taskprocessor *tps_left = obj_left;
595 const struct ast_taskprocessor *tps_right = obj_right;
596 const char *right_key = obj_right;
597 int cmp;
598
599 switch (flags & OBJ_SEARCH_MASK) {
600 default:
602 right_key = tps_right->name;
603 /* Fall through */
604 case OBJ_SEARCH_KEY:
605 cmp = strcasecmp(tps_left->name, right_key);
606 break;
608 cmp = strncasecmp(tps_left->name, right_key, strlen(right_key));
609 break;
610 }
611 return cmp;
612}
@ OBJ_SEARCH_PARTIAL_KEY
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
Definition astobj2.h:1116
@ OBJ_SEARCH_OBJECT
The arg parameter is an object of the same type.
Definition astobj2.h:1087
@ OBJ_SEARCH_MASK
Search option field mask.
Definition astobj2.h:1072
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
Definition astobj2.h:1101

References ast_taskprocessor::name, OBJ_SEARCH_KEY, OBJ_SEARCH_MASK, OBJ_SEARCH_OBJECT, and OBJ_SEARCH_PARTIAL_KEY.

Referenced by tps_report_taskprocessor_list(), and tps_shutdown().

◆ tps_task_alloc()

static struct tps_task * tps_task_alloc ( int(*)(void *datap)  task_exe,
void *  datap,
const char *  file,
int  line,
const char *  function 
)
static

Definition at line 422 of file taskprocessor.c.

424{
425 struct tps_task *t;
426 if (!task_exe) {
427 ast_log(LOG_ERROR, "Task callback function is NULL!\n");
428 return NULL;
429 }
430
431 t = ast_calloc(1, sizeof(*t));
432 if (!t) {
433 ast_log(LOG_ERROR, "Failed to allocate memory for task!\n");
434 return NULL;
435 }
436
437 t->callback.execute = task_exe;
438 t->datap = datap;
439 t->file = file;
440 t->line = line;
441 t->function = function;
442
443 return t;
444}

References ast_calloc, ast_log, tps_task::callback, tps_task::datap, tps_task::execute, tps_task::file, tps_task::function, tps_task::line, LOG_ERROR, and NULL.

Referenced by __ast_taskprocessor_push().

◆ tps_task_alloc_local()

static struct tps_task * tps_task_alloc_local ( int(*)(struct ast_taskprocessor_local *local)  task_exe,
void *  datap,
const char *  file,
int  line,
const char *  function 
)
static

Definition at line 446 of file taskprocessor.c.

448{
449 struct tps_task *t;
450 if (!task_exe) {
451 ast_log(LOG_ERROR, "Task callback function is NULL!\n");
452 return NULL;
453 }
454
455 t = ast_calloc(1, sizeof(*t));
456 if (!t) {
457 ast_log(LOG_ERROR, "Failed to allocate memory for task!\n");
458 return NULL;
459 }
460
461 t->callback.execute_local = task_exe;
462 t->datap = datap;
463 t->wants_local = 1;
464 t->file = file;
465 t->line = line;
466 t->function = function;
467
468 return t;
469}

References ast_calloc, ast_log, tps_task::callback, tps_task::datap, tps_task::execute_local, tps_task::file, tps_task::function, tps_task::line, LOG_ERROR, NULL, and tps_task::wants_local.

Referenced by __ast_taskprocessor_push_local().

◆ tps_task_free()

static void * tps_task_free ( struct tps_task task)
static

Definition at line 472 of file taskprocessor.c.

473{
474 ast_free(task);
475 return NULL;
476}

References ast_free, NULL, and task().

Referenced by ast_taskprocessor_execute(), and tps_taskprocessor_dtor().

◆ tps_taskprocessor_dtor()

static void tps_taskprocessor_dtor ( void *  tps)
static

Definition at line 1051 of file taskprocessor.c.

1052{
1053 struct ast_taskprocessor *t = tps;
1054 struct tps_task *task;
1055
1056 while ((task = AST_LIST_REMOVE_HEAD(&t->tps_queue, list))) {
1058 }
1059 t->tps_queue_size = 0;
1060
1061 if (t->high_water_alert) {
1062 t->high_water_alert = 0;
1063 tps_alert_add(t, -1);
1064 }
1065
1067 t->listener = NULL;
1068}
#define AST_LIST_REMOVE_HEAD(head, field)
Removes and returns the head entry from a list.

References ao2_cleanup, AST_LIST_REMOVE_HEAD, ast_taskprocessor::high_water_alert, tps_task::list, ast_taskprocessor::listener, NULL, task(), tps_alert_add(), ast_taskprocessor::tps_queue, ast_taskprocessor::tps_queue_size, and tps_task_free().

Referenced by __allocate_taskprocessor().

◆ tps_taskprocessor_pop()

static struct tps_task * tps_taskprocessor_pop ( struct ast_taskprocessor tps)
static

Definition at line 1071 of file taskprocessor.c.

1072{
1073 struct tps_task *task;
1074
1075 if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
1076 --tps->tps_queue_size;
1077 if (tps->high_water_alert && tps->tps_queue_size <= tps->tps_queue_low) {
1078 tps->high_water_alert = 0;
1079 tps_alert_add(tps, -1);
1080 }
1081 }
1082 return task;
1083}

References AST_LIST_REMOVE_HEAD, ast_taskprocessor::high_water_alert, tps_task::list, task(), tps_alert_add(), ast_taskprocessor::tps_queue, ast_taskprocessor::tps_queue_low, and ast_taskprocessor::tps_queue_size.

Referenced by ast_taskprocessor_execute().

◆ tps_taskprocessor_tab_complete()

static char * tps_taskprocessor_tab_complete ( struct ast_cli_args a)
static

Definition at line 483 of file taskprocessor.c.

484{
485 int tklen;
486 struct ast_taskprocessor *p;
487 struct ao2_iterator i;
488
489 tklen = strlen(a->word);
490 i = ao2_iterator_init(tps_singletons, 0);
491 while ((p = ao2_iterator_next(&i))) {
492 if (!strncasecmp(a->word, p->name, tklen)) {
495 break;
496 }
497 }
499 }
501
502 return NULL;
503}
#define ast_strdup(str)
A wrapper for strdup()
Definition astmm.h:241
int ast_cli_completion_add(char *value)
Add a result to a request for completion options.
Definition main/cli.c:2737

References a, ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, ast_cli_completion_add(), ast_strdup, ast_taskprocessor_unreference(), ast_taskprocessor::name, and NULL.

Referenced by cli_tps_ping(), cli_tps_report(), cli_tps_reset_stats(), and cli_tps_show_taskprocessor().

Variable Documentation

◆ default_listener_callbacks

const struct ast_taskprocessor_listener_callbacks default_listener_callbacks
static

Definition at line 310 of file taskprocessor.c.

310 {
311 .start = default_listener_start,
312 .task_pushed = default_task_pushed,
313 .shutdown = default_listener_shutdown,
315};
static void default_listener_pvt_dtor(struct ast_taskprocessor_listener *listener)
static void default_listener_shutdown(struct ast_taskprocessor_listener *listener)
static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
static int default_listener_start(struct ast_taskprocessor_listener *listener)

Referenced by ast_taskprocessor_get().

◆ tps_alert_count

unsigned int tps_alert_count
static

Count of the number of taskprocessors in high water alert.

Definition at line 966 of file taskprocessor.c.

Referenced by ast_taskprocessor_alert_get(), and tps_alert_add().

◆ tps_alert_lock

ast_rwlock_t tps_alert_lock = AST_RWLOCK_INIT_VALUE
static

Access protection for tps_alert_count.

Definition at line 969 of file taskprocessor.c.

Referenced by ast_taskprocessor_alert_get(), and tps_alert_add().