Asterisk - The Open Source Telephony Project GIT-master-20e40a9
Loading...
Searching...
No Matches
Data Structures | Macros | Enumerations | Functions
taskprocessor.h File Reference

An API for managing task processing threads that can be shared across modules. More...

This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  ast_taskprocessor_listener_callbacks
 
struct  ast_taskprocessor_local
 Local data parameter. More...
 

Macros

#define AST_TASKPROCESSOR_HIGH_WATER_LEVEL   500
 
#define AST_TASKPROCESSOR_MAX_NAME   70
 Suggested maximum taskprocessor name length (less null terminator).
 
#define ast_taskprocessor_push(tps, task_exe, datap)    __ast_taskprocessor_push(tps, task_exe, datap, __FILE__, __LINE__, __PRETTY_FUNCTION__)
 
#define ast_taskprocessor_push_local(tps, task_exe, datap)    __ast_taskprocessor_push_local(tps, task_exe, datap, __FILE__, __LINE__, __PRETTY_FUNCTION__)
 

Enumerations

enum  ast_tps_options { TPS_REF_DEFAULT = 0 , TPS_REF_IF_EXISTS = (1 << 0) }
 ast_tps_options for specification of taskprocessor options More...
 

Functions

int __ast_taskprocessor_push (struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap, const char *file, int line, const char *function) attribute_warn_unused_result
 Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
 
int __ast_taskprocessor_push_local (struct ast_taskprocessor *tps, int(*task_exe)(struct ast_taskprocessor_local *local), void *datap, const char *file, int line, const char *function) attribute_warn_unused_result
 Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
 
unsigned int ast_taskprocessor_alert_get (void)
 Get the current taskprocessor high water alert count.
 
int ast_taskprocessor_alert_set_levels (struct ast_taskprocessor *tps, long low_water, long high_water)
 Set the high and low alert water marks of the given taskprocessor queue.
 
void ast_taskprocessor_build_name (char *buf, unsigned int size, const char *format,...)
 Build a taskprocessor name with a sequence number on the end.
 
struct ast_taskprocessorast_taskprocessor_create_with_listener (const char *name, struct ast_taskprocessor_listener *listener)
 Create a taskprocessor with a custom listener.
 
int ast_taskprocessor_execute (struct ast_taskprocessor *tps)
 Pop a task off the taskprocessor and execute it.
 
struct ast_taskprocessorast_taskprocessor_get (const char *name, enum ast_tps_options create)
 Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.
 
unsigned int ast_taskprocessor_get_subsystem_alert (const char *subsystem)
 Get the current taskprocessor high water alert count by subsystem.
 
int ast_taskprocessor_is_suspended (struct ast_taskprocessor *tps)
 Get the task processor suspend status.
 
int ast_taskprocessor_is_task (struct ast_taskprocessor *tps)
 Am I the given taskprocessor's current task.
 
struct ast_taskprocessor_listenerast_taskprocessor_listener (struct ast_taskprocessor *tps)
 Return the listener associated with the taskprocessor.
 
struct ast_taskprocessor_listenerast_taskprocessor_listener_alloc (const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
 Allocate a taskprocessor listener.
 
struct ast_taskprocessorast_taskprocessor_listener_get_tps (const struct ast_taskprocessor_listener *listener)
 Get a reference to the listener's taskprocessor.
 
void * ast_taskprocessor_listener_get_user_data (const struct ast_taskprocessor_listener *listener)
 Get the user data from the listener.
 
const char * ast_taskprocessor_name (struct ast_taskprocessor *tps)
 Return the name of the taskprocessor singleton.
 
void ast_taskprocessor_name_append (char *buf, unsigned int size, const char *name)
 Append the next sequence number to the given string, and copy into the buffer.
 
unsigned int ast_taskprocessor_seq_num (void)
 Get the next sequence number to create a human friendly taskprocessor name.
 
void ast_taskprocessor_set_local (struct ast_taskprocessor *tps, void *local_data)
 Sets the local data associated with a taskprocessor.
 
long ast_taskprocessor_size (struct ast_taskprocessor *tps)
 Return the current size of the taskprocessor queue.
 
int ast_taskprocessor_suspend (struct ast_taskprocessor *tps)
 Indicate the taskprocessor is suspended.
 
void * ast_taskprocessor_unreference (struct ast_taskprocessor *tps)
 Unreference the specified taskprocessor and its reference count will decrement.
 
int ast_taskprocessor_unsuspend (struct ast_taskprocessor *tps)
 Indicate the taskprocessor is unsuspended.
 

Detailed Description

An API for managing task processing threads that can be shared across modules.

Author
Dwayne M. Hubbard dhubb.nosp@m.ard@.nosp@m.digiu.nosp@m.m.co.nosp@m.m
Note
A taskprocessor is a named object containing a task queue that serializes tasks pushed into it by [a] module(s) that reference the taskprocessor. A taskprocessor is created the first time its name is requested via the ast_taskprocessor_get() function or the ast_taskprocessor_create_with_listener() function and destroyed when the taskprocessor reference count reaches zero. A taskprocessor also contains an accompanying listener that is notified when changes in the task queue occur.

A task is a wrapper around a task-handling function pointer and a data pointer. A task is pushed into a taskprocessor queue using the ast_taskprocessor_push(taskprocessor, taskhandler, taskdata) function and freed by the taskprocessor after the task handling function returns. A module releases its reference to a taskprocessor using the ast_taskprocessor_unreference() function which may result in the destruction of the taskprocessor if the taskprocessor's reference count reaches zero. When the taskprocessor's reference count reaches zero, its listener's shutdown() callback will be called. Any further attempts to execute tasks will be denied.

The taskprocessor listener has the flexibility of doling out tasks to best fit the module's needs. For instance, a taskprocessor listener may have a single dispatch thread that handles all tasks, or it may dispatch tasks to a thread pool.

There is a default taskprocessor listener that will be used if a taskprocessor is created without any explicit listener. This default listener runs tasks sequentially in a single thread. The listener will execute tasks as long as there are tasks to be processed. When the taskprocessor is shut down, the default listener will stop processing tasks and join its execution thread.

Definition in file taskprocessor.h.

Macro Definition Documentation

◆ AST_TASKPROCESSOR_HIGH_WATER_LEVEL

#define AST_TASKPROCESSOR_HIGH_WATER_LEVEL   500

Default taskprocessor high water level alert trigger

Definition at line 64 of file taskprocessor.h.

◆ AST_TASKPROCESSOR_MAX_NAME

#define AST_TASKPROCESSOR_MAX_NAME   70

Suggested maximum taskprocessor name length (less null terminator).

Definition at line 61 of file taskprocessor.h.

◆ ast_taskprocessor_push

#define ast_taskprocessor_push (   tps,
  task_exe,
  datap 
)     __ast_taskprocessor_push(tps, task_exe, datap, __FILE__, __LINE__, __PRETTY_FUNCTION__)

Definition at line 219 of file taskprocessor.h.

222 {
223 /*! Local data, associated with the taskprocessor. */
224 void *local_data;
225 /*! Data pointer passed with this task. */
226 void *data;
227};
228
229/*!
230 * \brief Push a task into the specified taskprocessor queue and signal the
231 * taskprocessor thread.
232 *
233 * The callback receives a \ref ast_taskprocessor_local struct, which contains
234 * both the provided \a datap pointer, and any local data set on the
235 * taskprocessor with ast_taskprocessor_set_local().
236 *
237 * \param tps The taskprocessor structure
238 * \param task_exe The task handling function to push into the taskprocessor queue
239 * \param datap The data to be used by the task handling function
240 * \retval 0 success
241 * \retval -1 failure
242 * \since 12.0.0
243 */
245 int (*task_exe)(struct ast_taskprocessor_local *local), void *datap,
246 const char *file, int line, const char *function) attribute_warn_unused_result;
247#define ast_taskprocessor_push_local(tps, task_exe, datap) \
248 __ast_taskprocessor_push_local(tps, task_exe, datap, __FILE__, __LINE__, __PRETTY_FUNCTION__)
249
250/*!
251 * \brief Indicate the taskprocessor is suspended.
252 *
253 * \since 13.12.0
254 *
255 * \param tps Task processor.
256 * \retval 0 success
257 * \retval -1 failure
258 */
260
261/*!
262 * \brief Indicate the taskprocessor is unsuspended.
263 *
264 * \since 13.12.0
265 *
266 * \param tps Task processor.
267 * \retval 0 success
268 * \retval -1 failure
269 */
271
272/*!
273 * \brief Get the task processor suspend status
274 *
275 * \since 13.12.0
276 *
277 * \param tps Task processor.
278 * \retval non-zero if the task processor is suspended
279 */
281
282/*!
283 * \brief Pop a task off the taskprocessor and execute it.
284 *
285 * \since 12.0.0
286 *
287 * \param tps The taskprocessor from which to execute.
288 * \retval 0 There is no further work to be done.
289 * \retval 1 Tasks still remain in the taskprocessor queue.
290 */
292
293/*!
294 * \brief Am I the given taskprocessor's current task.
295 * \since 12.7.0
296 *
297 * \param tps Taskprocessor to check.
298 *
299 * \retval non-zero if current thread is the taskprocessor thread.
300 */
302
303/*!
304 * \brief Get the next sequence number to create a human friendly taskprocessor name.
305 * \since 13.8.0
306 *
307 * \return Sequence number for use in creating human friendly taskprocessor names.
308 */
309unsigned int ast_taskprocessor_seq_num(void);
310
311/*!
312 * \brief Append the next sequence number to the given string, and copy into the buffer.
313 *
314 * \param buf Where to copy the appended taskprocessor name.
315 * \param size How large is buf including null terminator.
316 * \param name A name to append the sequence number to.
317 */
318void ast_taskprocessor_name_append(char *buf, unsigned int size, const char *name);
319
320/*!
321 * \brief Build a taskprocessor name with a sequence number on the end.
322 * \since 13.8.0
323 *
324 * \param buf Where to put the built taskprocessor name.
325 * \param size How large is buf including null terminator.
326 * \param format printf format to create the non-sequenced part of the name.
327 *
328 * \note The user supplied part of the taskprocessor name is truncated
329 * to allow the full sequence number to be appended within the supplied
330 * buffer size.
331 */
332void __attribute__((format(printf, 3, 4))) ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format, ...);
333
334/*!
335 * \brief Return the name of the taskprocessor singleton
336 * \since 1.6.1
337 */
338const char *ast_taskprocessor_name(struct ast_taskprocessor *tps);
339
340/*!
341 * \brief Return the current size of the taskprocessor queue
342 * \since 13.7.0
343 */
345
346/*!
347 * \brief Return the listener associated with the taskprocessor
348 */
350
351/*!
352 * \brief Get the current taskprocessor high water alert count.
353 * \since 13.10.0
354 *
355 * \retval 0 if no taskprocessors are in high water alert.
356 * \retval non-zero if some task processors are in high water alert.
357 */
358unsigned int ast_taskprocessor_alert_get(void);
359
360
361/*!
362 * \brief Get the current taskprocessor high water alert count by subsystem.
363 * \since 13.26.0
364 * \since 16.3.0
365 *
366 * \param subsystem The subsystem name
367 *
368 * \retval 0 if no taskprocessors are in high water alert.
369 * \retval non-zero if some task processors are in high water alert.
370 */
371unsigned int ast_taskprocessor_get_subsystem_alert(const char *subsystem);
372
373/*!
374 * \brief Set the high and low alert water marks of the given taskprocessor queue.
375 * \since 13.10.0
376 *
377 * \param tps Taskprocessor to update queue water marks.
378 * \param low_water New queue low water mark. (-1 to set as 90% of high_water)
379 * \param high_water New queue high water mark.
380 *
381 * \retval 0 on success.
382 * \retval -1 on error (water marks not changed).
383 */
384int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water);
385
386#endif /* __AST_TASKPROCESSOR_H__ */
#define attribute_warn_unused_result
Definition compiler.h:71
char buf[BUFSIZE]
Definition eagi_proxy.c:66
static const char name[]
Definition format_mp3.c:68
A listener for taskprocessors.
Local data parameter.
A ast_taskprocessor structure is a singleton by name.
int ast_taskprocessor_is_suspended(struct ast_taskprocessor *tps)
Get the task processor suspend status.
unsigned int ast_taskprocessor_alert_get(void)
Get the current taskprocessor high water alert count.
int ast_taskprocessor_is_task(struct ast_taskprocessor *tps)
Am I the given taskprocessor's current task.
unsigned int ast_taskprocessor_seq_num(void)
Get the next sequence number to create a human friendly taskprocessor name.
void ast_taskprocessor_name_append(char *buf, unsigned int size, const char *name)
Append the next sequence number to the given string, and copy into the buffer.
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
int ast_taskprocessor_unsuspend(struct ast_taskprocessor *tps)
Indicate the taskprocessor is unsuspended.
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
Pop a task off the taskprocessor and execute it.
unsigned int ast_taskprocessor_get_subsystem_alert(const char *subsystem)
Get the current taskprocessor high water alert count by subsystem.
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format,...)
Build a taskprocessor name with a sequence number on the end.
int __ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int(*task_exe)(struct ast_taskprocessor_local *local), void *datap, const char *file, int line, const char *function) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
const char * ast_taskprocessor_name(struct ast_taskprocessor *tps)
Return the name of the taskprocessor singleton.
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.
int ast_taskprocessor_suspend(struct ast_taskprocessor *tps)
Indicate the taskprocessor is suspended.

◆ ast_taskprocessor_push_local

#define ast_taskprocessor_push_local (   tps,
  task_exe,
  datap 
)     __ast_taskprocessor_push_local(tps, task_exe, datap, __FILE__, __LINE__, __PRETTY_FUNCTION__)

Definition at line 248 of file taskprocessor.h.

Enumeration Type Documentation

◆ ast_tps_options

ast_tps_options for specification of taskprocessor options

Specify whether a taskprocessor should be created via ast_taskprocessor_get() if the taskprocessor does not already exist. The default behavior is to create a taskprocessor if it does not already exist and provide its reference to the calling function. To only return a reference to a taskprocessor if and only if it exists, use the TPS_REF_IF_EXISTS option in ast_taskprocessor_get().

Enumerator
TPS_REF_DEFAULT 

return a reference to a taskprocessor, create one if it does not exist

TPS_REF_IF_EXISTS 

return a reference to a taskprocessor ONLY if it already exists

Definition at line 74 of file taskprocessor.h.

74 {
75 /*! \brief return a reference to a taskprocessor, create one if it does not exist */
77 /*! \brief return a reference to a taskprocessor ONLY if it already exists */
78 TPS_REF_IF_EXISTS = (1 << 0),
79};
@ TPS_REF_IF_EXISTS
return a reference to a taskprocessor ONLY if it already exists
@ TPS_REF_DEFAULT
return a reference to a taskprocessor, create one if it does not exist

Function Documentation

◆ __ast_taskprocessor_push()

int __ast_taskprocessor_push ( struct ast_taskprocessor tps,
int(*)(void *datap)  task_exe,
void *  datap,
const char *  file,
int  line,
const char *  function 
)

Push a task into the specified taskprocessor queue and signal the taskprocessor thread.

Parameters
tpsThe taskprocessor structure
task_exeThe task handling function to push into the taskprocessor queue
datapThe data to be used by the task handling function
Return values
0success
-1failure
Since
1.6.1

Definition at line 1371 of file taskprocessor.c.

1373{
1374 return taskprocessor_push(tps, tps_task_alloc(task_exe, datap, file, line, function));
1375}
static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
static struct tps_task * tps_task_alloc(int(*task_exe)(void *datap), void *datap, const char *file, int line, const char *function)

References taskprocessor_push(), and tps_task_alloc().

Referenced by __ast_sip_push_task(), __ast_taskpool_push(), __ast_threadpool_push(), and ast_taskprocessor_push().

◆ __ast_taskprocessor_push_local()

int __ast_taskprocessor_push_local ( struct ast_taskprocessor tps,
int(*)(struct ast_taskprocessor_local *local)  task_exe,
void *  datap,
const char *  file,
int  line,
const char *  function 
)

Push a task into the specified taskprocessor queue and signal the taskprocessor thread.

The callback receives a ast_taskprocessor_local struct, which contains both the provided datap pointer, and any local data set on the taskprocessor with ast_taskprocessor_set_local().

Parameters
tpsThe taskprocessor structure
task_exeThe task handling function to push into the taskprocessor queue
datapThe data to be used by the task handling function
Return values
0success
-1failure
Since
12.0.0

◆ ast_taskprocessor_alert_get()

unsigned int ast_taskprocessor_alert_get ( void  )

Get the current taskprocessor high water alert count.

Since
13.10.0
Return values
0if no taskprocessors are in high water alert.
non-zeroif some task processors are in high water alert.

Definition at line 1004 of file taskprocessor.c.

1005{
1006 unsigned int count;
1007
1009 count = tps_alert_count;
1011
1012 return count;
1013}
#define ast_rwlock_rdlock(a)
Definition lock.h:242
#define ast_rwlock_unlock(a)
Definition lock.h:241
static unsigned int tps_alert_count
static ast_rwlock_t tps_alert_lock

References ast_rwlock_rdlock, ast_rwlock_unlock, tps_alert_count, and tps_alert_lock.

Referenced by AST_TEST_DEFINE(), and distributor().

◆ ast_taskprocessor_alert_set_levels()

int ast_taskprocessor_alert_set_levels ( struct ast_taskprocessor tps,
long  low_water,
long  high_water 
)

Set the high and low alert water marks of the given taskprocessor queue.

Since
13.10.0
Parameters
tpsTaskprocessor to update queue water marks.
low_waterNew queue low water mark. (-1 to set as 90% of high_water)
high_waterNew queue high water mark.
Return values
0on success.
-1on error (water marks not changed).

Definition at line 1015 of file taskprocessor.c.

1016{
1017 if (!tps || high_water < 0 || high_water < low_water) {
1018 return -1;
1019 }
1020
1021 if (low_water < 0) {
1022 /* Set low water level to 90% of high water level */
1023 low_water = (high_water * 9) / 10;
1024 }
1025
1026 ao2_lock(tps);
1027
1028 tps->tps_queue_low = low_water;
1029 tps->tps_queue_high = high_water;
1030
1031 if (tps->high_water_alert) {
1032 if (!tps->tps_queue_size || tps->tps_queue_size < low_water) {
1033 /* Update water mark alert immediately */
1034 tps->high_water_alert = 0;
1035 tps_alert_add(tps, -1);
1036 }
1037 } else {
1038 if (high_water < tps->tps_queue_size) {
1039 /* Update water mark alert immediately */
1040 tps->high_water_alert = 1;
1041 tps_alert_add(tps, +1);
1042 }
1043 }
1044
1045 ao2_unlock(tps);
1046
1047 return 0;
1048}
#define ao2_unlock(a)
Definition astobj2.h:729
#define ao2_lock(a)
Definition astobj2.h:717
long tps_queue_low
Taskprocessor low water clear alert level.
long tps_queue_high
Taskprocessor high water alert trigger level.
unsigned int high_water_alert
long tps_queue_size
Taskprocessor current queue size.
static void tps_alert_add(struct ast_taskprocessor *tps, int delta)

References ao2_lock, ao2_unlock, ast_taskprocessor::high_water_alert, tps_alert_add(), ast_taskprocessor::tps_queue_high, ast_taskprocessor::tps_queue_low, and ast_taskprocessor::tps_queue_size.

Referenced by actual_load_config(), ast_res_pjsip_init_options_handling(), ast_serializer_pool_set_alerts(), ast_sorcery_object_set_congestion_levels(), AST_TEST_DEFINE(), and stasis_subscription_set_congestion_limits().

◆ ast_taskprocessor_build_name()

void ast_taskprocessor_build_name ( char *  buf,
unsigned int  size,
const char *  format,
  ... 
)

Build a taskprocessor name with a sequence number on the end.

Since
13.8.0
Parameters
bufWhere to put the built taskprocessor name.
sizeHow large is buf including null terminator.
formatprintf format to create the non-sequenced part of the name.
Note
The user supplied part of the taskprocessor name is truncated to allow the full sequence number to be appended within the supplied buffer size.

Definition at line 1527 of file taskprocessor.c.

1528{
1529 va_list ap;
1530 int user_size;
1531
1532 ast_assert(buf != NULL);
1533 ast_assert(SEQ_STR_SIZE <= size);
1534
1535 va_start(ap, format);
1536 user_size = vsnprintf(buf, size - (SEQ_STR_SIZE - 1), format, ap);
1537 va_end(ap);
1538 if (user_size < 0) {
1539 /*
1540 * Wow! We got an output error to a memory buffer.
1541 * Assume no user part of name written.
1542 */
1543 user_size = 0;
1544 } else if (size < user_size + SEQ_STR_SIZE) {
1545 /* Truncate user part of name to make sequence number fit. */
1546 user_size = size - SEQ_STR_SIZE;
1547 }
1548
1549 /* Append sequence number to end of user name. */
1550 snprintf(buf + user_size, SEQ_STR_SIZE, "-%08x", ast_taskprocessor_seq_num());
1551}
#define NULL
Definition resample.c:96
unsigned int ast_taskprocessor_seq_num(void)
Get the next sequence number to create a human friendly taskprocessor name.
#define SEQ_STR_SIZE
#define ast_assert(a)
Definition utils.h:779

References ast_assert, ast_taskprocessor_seq_num(), buf, NULL, and SEQ_STR_SIZE.

Referenced by alloc_playback_chan(), allocate_subscription_tree(), ast_sip_session_alloc(), create_websocket_serializer(), distributor_pool_setup(), handle_cli_taskpool_push_serializer_efficiency(), handle_cli_threadpool_push_serializer_efficiency(), internal_stasis_subscribe(), refer_progress_alloc(), sip_options_aor_alloc(), sip_outbound_publisher_alloc(), sip_outbound_registration_state_alloc(), sorcery_object_type_alloc(), and taskpool_taskprocessor_alloc().

◆ ast_taskprocessor_create_with_listener()

struct ast_taskprocessor * ast_taskprocessor_create_with_listener ( const char *  name,
struct ast_taskprocessor_listener listener 
)

Create a taskprocessor with a custom listener.

Since
12.0.0

Note that when a taskprocessor is created in this way, it does not create any threads to execute the tasks. This job is left up to the listener. The listener's start() callback will be called during this function.

Parameters
nameThe name of the taskprocessor to create
listenerThe listener for operations on this taskprocessor
Return values
NULLFailure
non-NULLsuccess

Definition at line 1274 of file taskprocessor.c.

1275{
1276 struct ast_taskprocessor *p;
1277
1278 ao2_lock(tps_singletons);
1279 p = ao2_find(tps_singletons, name, OBJ_KEY | OBJ_NOLOCK);
1280 if (p) {
1281 ao2_unlock(tps_singletons);
1283 return NULL;
1284 }
1285
1287 ao2_unlock(tps_singletons);
1288
1289 return __start_taskprocessor(p);
1290}
static void * listener(void *unused)
Definition asterisk.c:1530
#define OBJ_KEY
Definition astobj2.h:1151
#define ao2_find(container, arg, flags)
Definition astobj2.h:1736
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
Definition astobj2.h:1063
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
static struct ast_taskprocessor * __allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener)
static struct ast_taskprocessor * __start_taskprocessor(struct ast_taskprocessor *p)

References __allocate_taskprocessor(), __start_taskprocessor(), ao2_find, ao2_lock, ao2_unlock, ast_taskprocessor_unreference(), listener(), name, NULL, OBJ_KEY, and OBJ_NOLOCK.

Referenced by ast_taskpool_serializer_group(), AST_TEST_DEFINE(), ast_threadpool_create(), and ast_threadpool_serializer_group().

◆ ast_taskprocessor_execute()

int ast_taskprocessor_execute ( struct ast_taskprocessor tps)

Pop a task off the taskprocessor and execute it.

Since
12.0.0
Parameters
tpsThe taskprocessor from which to execute.
Return values
0There is no further work to be done.
1Tasks still remain in the taskprocessor queue.

Definition at line 1420 of file taskprocessor.c.

1421{
1422 struct ast_taskprocessor_local local;
1423 struct tps_task *t;
1424 long size;
1425 struct timeval start;
1426 long elapsed;
1427 const char *task_file = NULL;
1428 int task_line = 0;
1429 const char *task_function = NULL;
1430
1431 ao2_lock(tps);
1432 t = tps_taskprocessor_pop(tps);
1433 if (!t) {
1434 ao2_unlock(tps);
1435 return 0;
1436 }
1437
1438 tps->thread = pthread_self();
1439 tps->executing = 1;
1440
1441 if (t->wants_local) {
1442 local.local_data = tps->local_data;
1443 local.data = t->datap;
1444 }
1445
1446 /* Save task origin info before we free the task */
1447 task_file = t->file;
1448 task_line = t->line;
1449 task_function = t->function;
1450 ao2_unlock(tps);
1451
1452 start = ast_tvnow();
1453
1454 if (t->wants_local) {
1455 t->callback.execute_local(&local);
1456 } else {
1457 t->callback.execute(t->datap);
1458 }
1459 tps_task_free(t);
1460
1461 ao2_lock(tps);
1463 /* We need to check size in the same critical section where we reset the
1464 * executing bit. Avoids a race condition where a task is pushed right
1465 * after we pop an empty stack.
1466 */
1467 tps->executing = 0;
1468 size = ast_taskprocessor_size(tps);
1469
1470 /* Update the stats */
1472
1473 /* Include the task we just executed as part of the queue size. */
1474 if (size >= tps->stats.max_qsize) {
1475 tps->stats.max_qsize = size + 1;
1476 }
1477
1478 elapsed = ast_tvdiff_us(ast_tvnow(), start);
1479 if (elapsed > tps->stats.highest_time_processed) {
1480 tps->stats.highest_time_processed = elapsed;
1481 tps->stats.highest_time_task_file = task_file;
1482 tps->stats.highest_time_task_line = task_line;
1483 tps->stats.highest_time_task_function = task_function;
1484 }
1485 if (elapsed < tps->stats.lowest_time_processed) {
1486 tps->stats.lowest_time_processed = elapsed;
1487 }
1488
1489 ao2_unlock(tps);
1490
1491 /* If we executed a task, check for the transition to empty */
1492 if (size == 0 && tps->listener->callbacks->emptied) {
1493 tps->listener->callbacks->emptied(tps->listener);
1494 }
1495 return size > 0;
1496}
#define AST_PTHREADT_NULL
Definition lock.h:73
void(* emptied)(struct ast_taskprocessor_listener *listener)
Indicates the task processor has become empty.
const struct ast_taskprocessor_listener_callbacks * callbacks
unsigned int executing
struct ast_taskprocessor_listener * listener
struct tps_taskprocessor_stats stats
Taskprocessor statistics.
tps_task structure is queued to a taskprocessor
int(* execute)(void *datap)
int(* execute_local)(struct ast_taskprocessor_local *local)
unsigned int wants_local
union tps_task::@440 callback
The execute() task callback function pointer.
void * datap
The data pointer for the task execute() function.
const char * file
Debug information about where the task was pushed from.
const char * function
long highest_time_processed
Highest time (in microseconds) spent processing a task.
long lowest_time_processed
Lowest time (in microseconds) spent processing a task.
unsigned long max_qsize
This is the maximum number of tasks queued at any one time.
int highest_time_task_line
Line where the highest time task was pushed from.
unsigned long _tasks_processed_count
This is the current number of tasks processed.
const char * highest_time_task_function
Function where the highest time task was pushed from.
const char * highest_time_task_file
File where the highest time task was pushed from.
static void * tps_task_free(struct tps_task *task)
static struct tps_task * tps_taskprocessor_pop(struct ast_taskprocessor *tps)
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
int64_t ast_tvdiff_us(struct timeval end, struct timeval start)
Computes the difference (in microseconds) between two struct timeval instances.
Definition time.h:87
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition time.h:159

References tps_taskprocessor_stats::_tasks_processed_count, ao2_lock, ao2_unlock, AST_PTHREADT_NULL, ast_taskprocessor_size(), ast_tvdiff_us(), ast_tvnow(), tps_task::callback, ast_taskprocessor_listener::callbacks, ast_taskprocessor_local::data, tps_task::datap, ast_taskprocessor_listener_callbacks::emptied, tps_task::execute, tps_task::execute_local, ast_taskprocessor::executing, tps_task::file, tps_task::function, tps_taskprocessor_stats::highest_time_processed, tps_taskprocessor_stats::highest_time_task_file, tps_taskprocessor_stats::highest_time_task_function, tps_taskprocessor_stats::highest_time_task_line, tps_task::line, ast_taskprocessor::listener, ast_taskprocessor_local::local_data, ast_taskprocessor::local_data, tps_taskprocessor_stats::lowest_time_processed, tps_taskprocessor_stats::max_qsize, NULL, ast_taskprocessor::stats, ast_taskprocessor::thread, tps_task_free(), tps_taskprocessor_pop(), and tps_task::wants_local.

Referenced by ast_taskpool_serializer_push_wait(), AST_TEST_DEFINE(), default_tps_processing_function(), execute_tasks(), execute_tasks(), and threadpool_execute().

◆ ast_taskprocessor_get()

struct ast_taskprocessor * ast_taskprocessor_get ( const char *  name,
enum ast_tps_options  create 
)

Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.

The default behavior of instantiating a taskprocessor if one does not already exist can be disabled by specifying the TPS_REF_IF_EXISTS ast_tps_options as the second argument to ast_taskprocessor_get().

Parameters
nameThe name of the taskprocessor
createUse 0 by default or specify TPS_REF_IF_EXISTS to return NULL if the taskprocessor does not already exist return A pointer to a reference counted taskprocessor under normal conditions, or NULL if the TPS_REF_IF_EXISTS reference type is specified and the taskprocessor does not exist
Since
1.6.1

Definition at line 1235 of file taskprocessor.c.

1236{
1237 struct ast_taskprocessor *p;
1240
1241 if (ast_strlen_zero(name)) {
1242 ast_log(LOG_ERROR, "Cannot get taskprocessor with empty name!\n");
1243 return NULL;
1244 }
1245 ao2_lock(tps_singletons);
1246 p = ao2_find(tps_singletons, name, OBJ_KEY | OBJ_NOLOCK);
1247 if (p || (create & TPS_REF_IF_EXISTS)) {
1248 /* calling function does not want a new taskprocessor to be created if it doesn't already exist */
1249 ao2_unlock(tps_singletons);
1250 return p;
1251 }
1252
1253 /* Create a new taskprocessor. Start by creating a default listener */
1255 if (!pvt) {
1256 ao2_unlock(tps_singletons);
1257 return NULL;
1258 }
1260 if (!listener) {
1261 ao2_unlock(tps_singletons);
1263 return NULL;
1264 }
1265
1267 ao2_unlock(tps_singletons);
1268 p = __start_taskprocessor(p);
1269 ao2_ref(listener, -1);
1270
1271 return p;
1272}
#define ast_log
Definition astobj2.c:42
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition astobj2.h:459
#define LOG_ERROR
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition strings.h:65
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
static void * default_listener_pvt_alloc(void)
static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks
static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)

References __allocate_taskprocessor(), __start_taskprocessor(), ao2_find, ao2_lock, ao2_ref, ao2_unlock, ast_log, ast_strlen_zero(), ast_taskprocessor_listener_alloc(), default_listener_callbacks, default_listener_pvt_alloc(), default_listener_pvt_destroy(), listener(), LOG_ERROR, name, NULL, OBJ_KEY, OBJ_NOLOCK, and TPS_REF_IF_EXISTS.

Referenced by alloc_playback_chan(), ast_dns_system_resolver_init(), ast_msg_init(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), cli_tps_ping(), cli_tps_reset_stats(), cli_tps_show_taskprocessor(), find_request_serializer(), internal_stasis_subscribe(), load_module(), load_module(), load_module(), load_objects(), taskpool_taskprocessor_alloc(), and threadpool_alloc().

◆ ast_taskprocessor_get_subsystem_alert()

unsigned int ast_taskprocessor_get_subsystem_alert ( const char *  subsystem)

Get the current taskprocessor high water alert count by subsystem.

Since
13.26.0
16.3.0
Parameters
subsystemThe subsystem name
Return values
0if no taskprocessors are in high water alert.
non-zeroif some task processors are in high water alert.

Definition at line 824 of file taskprocessor.c.

825{
826 struct subsystem_alert *alert;
827 unsigned int count = 0;
828 int idx;
829
830 AST_VECTOR_RW_RDLOCK(&overloaded_subsystems);
831 idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
832 if (idx >= 0) {
833 alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
834 count = alert->alert_count;
835 }
836 AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
837
838 return count;
839}
unsigned int alert_count
static int subsystem_match(struct subsystem_alert *alert, const char *subsystem)
#define AST_VECTOR_GET_INDEX(vec, value, cmp)
Get the 1st index from a vector that matches the given comparison.
Definition vector.h:730
#define AST_VECTOR_RW_UNLOCK(vec)
Unlock vector.
Definition vector.h:908
#define AST_VECTOR_RW_RDLOCK(vec)
Obtain read lock on vector.
Definition vector.h:888
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition vector.h:691

References subsystem_alert::alert_count, AST_VECTOR_GET, AST_VECTOR_GET_INDEX, AST_VECTOR_RW_RDLOCK, AST_VECTOR_RW_UNLOCK, subsystem_alert::subsystem, and subsystem_match().

Referenced by AST_TEST_DEFINE(), and distributor().

◆ ast_taskprocessor_is_suspended()

int ast_taskprocessor_is_suspended ( struct ast_taskprocessor tps)

Get the task processor suspend status.

Since
13.12.0
Parameters
tpsTask processor.
Return values
non-zeroif the task processor is suspended

Definition at line 1415 of file taskprocessor.c.

1416{
1417 return tps ? tps->suspended : -1;
1418}
unsigned int suspended

References ast_taskprocessor::suspended.

Referenced by ast_sip_session_suspend().

◆ ast_taskprocessor_is_task()

int ast_taskprocessor_is_task ( struct ast_taskprocessor tps)

Am I the given taskprocessor's current task.

Since
12.7.0
Parameters
tpsTaskprocessor to check.
Return values
non-zeroif current thread is the taskprocessor thread.

Definition at line 1498 of file taskprocessor.c.

1499{
1500 int is_task;
1501
1502 ao2_lock(tps);
1503 is_task = pthread_equal(tps->thread, pthread_self());
1504 ao2_unlock(tps);
1505 return is_task;
1506}

References ao2_lock, ao2_unlock, and ast_taskprocessor::thread.

Referenced by __ast_sip_push_task_wait_serializer(), ast_sip_session_suspend(), and handle_new_invite_request().

◆ ast_taskprocessor_listener()

Return the listener associated with the taskprocessor.

Definition at line 1090 of file taskprocessor.c.

1091{
1092 return tps ? tps->listener : NULL;
1093}

References ast_taskprocessor::listener, NULL, and ast_taskprocessor_listener::tps.

◆ ast_taskprocessor_listener_alloc()

struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc ( const struct ast_taskprocessor_listener_callbacks callbacks,
void *  user_data 
)

Allocate a taskprocessor listener.

Since
12.0.0

This will result in the listener being allocated with the specified callbacks.

Parameters
callbacksThe callbacks to assign to the listener
user_dataThe user data for the listener
Return values
NULLFailure
non-NULLThe newly allocated taskprocessor listener

Definition at line 1120 of file taskprocessor.c.

1121{
1123
1125 if (!listener) {
1126 return NULL;
1127 }
1128 listener->callbacks = callbacks;
1129 listener->user_data = user_data;
1130
1131 return listener;
1132}
#define ao2_alloc(data_size, destructor_fn)
Definition astobj2.h:409
struct @506 callbacks
static void taskprocessor_listener_dtor(void *obj)

References ao2_alloc, callbacks, listener(), NULL, taskprocessor_listener_dtor(), and ast_taskprocessor_listener::user_data.

Referenced by ast_taskpool_serializer_group(), ast_taskprocessor_get(), AST_TEST_DEFINE(), ast_threadpool_create(), and ast_threadpool_serializer_group().

◆ ast_taskprocessor_listener_get_tps()

struct ast_taskprocessor * ast_taskprocessor_listener_get_tps ( const struct ast_taskprocessor_listener listener)

Get a reference to the listener's taskprocessor.

This will return the taskprocessor with its reference count increased. Release the reference to this object by using ast_taskprocessor_unreference()

Parameters
listenerThe listener that has the taskprocessor
Returns
The taskprocessor

Definition at line 1134 of file taskprocessor.c.

1135{
1136 ao2_ref(listener->tps, +1);
1137 return listener->tps;
1138}

References ao2_ref, and listener().

Referenced by serializer_task_pushed(), and serializer_task_pushed().

◆ ast_taskprocessor_listener_get_user_data()

void * ast_taskprocessor_listener_get_user_data ( const struct ast_taskprocessor_listener listener)

Get the user data from the listener.

Parameters
listenerThe taskprocessor listener
Returns
The listener's user data

Definition at line 1140 of file taskprocessor.c.

1141{
1142 return listener->user_data;
1143}

References listener().

Referenced by ast_taskpool_serializer_push_wait(), execute_tasks(), serializer_shutdown(), serializer_shutdown(), serializer_task_pushed(), serializer_task_pushed(), test_emptied(), test_shutdown(), test_task_pushed(), threadpool_tps_emptied(), threadpool_tps_shutdown(), and threadpool_tps_task_pushed().

◆ ast_taskprocessor_name()

const char * ast_taskprocessor_name ( struct ast_taskprocessor tps)

Return the name of the taskprocessor singleton.

Since
1.6.1

Definition at line 1096 of file taskprocessor.c.

1097{
1098 if (!tps) {
1099 ast_log(LOG_ERROR, "no taskprocessor specified!\n");
1100 return NULL;
1101 }
1102 return tps->name;
1103}
char name[0]
Friendly name of the taskprocessor. Subsystem is appended after the name's NULL terminator.

References ast_log, LOG_ERROR, ast_taskprocessor::name, NULL, and ast_taskprocessor_listener::tps.

Referenced by ast_serializer_pool_set_alerts(), ast_sip_get_distributor_serializer(), distributor(), grow(), record_serializer(), shrink(), sip_options_apply_aor_configuration(), and sip_options_contact_add_task().

◆ ast_taskprocessor_name_append()

void ast_taskprocessor_name_append ( char *  buf,
unsigned int  size,
const char *  name 
)

Append the next sequence number to the given string, and copy into the buffer.

Parameters
bufWhere to copy the appended taskprocessor name.
sizeHow large is buf including null terminator.
nameA name to append the sequence number to.

Definition at line 1517 of file taskprocessor.c.

1518{
1519 int final_size = strlen(name) + SEQ_STR_SIZE;
1520
1521 ast_assert(buf != NULL && name != NULL);
1522 ast_assert(final_size <= size);
1523
1524 snprintf(buf, final_size, "%s-%08x", name, ast_taskprocessor_seq_num());
1525}

References ast_assert, ast_taskprocessor_seq_num(), buf, name, NULL, and SEQ_STR_SIZE.

Referenced by ast_serializer_pool_create(), and ast_serializer_taskpool_create().

◆ ast_taskprocessor_seq_num()

unsigned int ast_taskprocessor_seq_num ( void  )

Get the next sequence number to create a human friendly taskprocessor name.

Since
13.8.0
Returns
Sequence number for use in creating human friendly taskprocessor names.

Definition at line 1508 of file taskprocessor.c.

1509{
1510 static int seq_num;
1511
1512 return (unsigned int) ast_atomic_fetchadd_int(&seq_num, +1);
1513}
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition lock.h:764

References ast_atomic_fetchadd_int().

Referenced by ast_taskprocessor_build_name(), and ast_taskprocessor_name_append().

◆ ast_taskprocessor_set_local()

void ast_taskprocessor_set_local ( struct ast_taskprocessor tps,
void *  local_data 
)

Sets the local data associated with a taskprocessor.

Since
12.0.0

See ast_taskprocessor_push_local().

Parameters
tpsTask processor.
local_dataLocal data to associate with tps.

Definition at line 1292 of file taskprocessor.c.

1294{
1295 SCOPED_AO2LOCK(lock, tps);
1296 tps->local_data = local_data;
1297}
ast_mutex_t lock
Definition app_sla.c:337
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition lock.h:611

References ast_taskprocessor::local_data, lock, and SCOPED_AO2LOCK.

Referenced by AST_TEST_DEFINE(), and internal_stasis_subscribe().

◆ ast_taskprocessor_size()

long ast_taskprocessor_size ( struct ast_taskprocessor tps)

Return the current size of the taskprocessor queue.

Since
13.7.0

Definition at line 1085 of file taskprocessor.c.

1086{
1087 return (tps) ? tps->tps_queue_size : -1;
1088}

References ast_taskprocessor::tps_queue_size.

Referenced by ast_serializer_pool_get(), ast_taskpool_serializer_push_wait(), ast_taskprocessor_execute(), AST_TEST_DEFINE(), ast_threadpool_queue_size(), execute_tasks(), taskpool_least_full_selector(), and taskpool_sequential_selector().

◆ ast_taskprocessor_suspend()

int ast_taskprocessor_suspend ( struct ast_taskprocessor tps)

Indicate the taskprocessor is suspended.

Since
13.12.0
Parameters
tpsTask processor.
Return values
0success
-1failure

Definition at line 1393 of file taskprocessor.c.

1394{
1395 if (tps) {
1396 ao2_lock(tps);
1397 tps->suspended = 1;
1398 ao2_unlock(tps);
1399 return 0;
1400 }
1401 return -1;
1402}

References ao2_lock, ao2_unlock, and ast_taskprocessor::suspended.

Referenced by ast_sip_session_suspend(), and AST_TEST_DEFINE().

◆ ast_taskprocessor_unreference()

void * ast_taskprocessor_unreference ( struct ast_taskprocessor tps)

Unreference the specified taskprocessor and its reference count will decrement.

Taskprocessors use astobj2 and will unlink from the taskprocessor singleton container and destroy themself when the taskprocessor reference count reaches zero.

Parameters
tpstaskprocessor to unreference
Returns
NULL
Since
1.6.1

Definition at line 1300 of file taskprocessor.c.

1301{
1302 if (!tps) {
1303 return NULL;
1304 }
1305
1306 /* To prevent another thread from finding and getting a reference to this
1307 * taskprocessor we hold the singletons lock. If we didn't do this then
1308 * they may acquire it and find that the listener has been shut down.
1309 */
1310 ao2_lock(tps_singletons);
1311
1312 if (ao2_ref(tps, -1) > 3) {
1313 ao2_unlock(tps_singletons);
1314 return NULL;
1315 }
1316
1317 /* If we're down to 3 references, then those must be:
1318 * 1. The reference we just got rid of
1319 * 2. The container
1320 * 3. The listener
1321 */
1322 ao2_unlink_flags(tps_singletons, tps, OBJ_NOLOCK);
1323 ao2_unlock(tps_singletons);
1324
1326 return NULL;
1327}
#define ao2_unlink_flags(container, obj, flags)
Remove an object from a container.
Definition astobj2.h:1600
static void listener_shutdown(struct ast_taskprocessor_listener *listener)

References ao2_lock, ao2_ref, ao2_unlink_flags, ao2_unlock, ast_taskprocessor::listener, listener_shutdown(), NULL, and OBJ_NOLOCK.

Referenced by __start_taskprocessor(), __unload_module(), ast_msg_shutdown(), ast_res_pjsip_cleanup_options_handling(), ast_serializer_pool_destroy(), ast_taskprocessor_create_with_listener(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), ast_threadpool_shutdown(), cli_tps_ping(), cli_tps_reset_stats(), cli_tps_reset_stats_all(), cli_tps_show_taskprocessor(), destroy_conference_bridge(), distributor(), distributor_pool_shutdown(), dns_system_resolver_destroy(), execute_tasks(), execute_tasks(), exten_state_subscription_destructor(), handle_cli_taskpool_push_serializer_efficiency(), handle_cli_threadpool_push_serializer_efficiency(), refer_progress_destroy(), scheduler(), schtd_dtor(), serializer_task_pushed(), serializer_task_pushed(), session_destructor(), sip_options_aor_dtor(), sip_outbound_publisher_destroy(), sip_outbound_registration_client_state_destroy(), sorcery_object_type_destructor(), subscription_dtor(), subscription_persistence_recreate(), subscription_tree_destructor(), taskpool_taskprocessor_alloc(), taskpool_taskprocessor_dtor(), tps_report_taskprocessor_list(), tps_shutdown_thread(), tps_taskprocessor_tab_complete(), unload_module(), unload_module(), unload_module(), unload_module(), unload_module(), unload_module(), and websocket_cb().

◆ ast_taskprocessor_unsuspend()

int ast_taskprocessor_unsuspend ( struct ast_taskprocessor tps)

Indicate the taskprocessor is unsuspended.

Since
13.12.0
Parameters
tpsTask processor.
Return values
0success
-1failure

Definition at line 1404 of file taskprocessor.c.

1405{
1406 if (tps) {
1407 ao2_lock(tps);
1408 tps->suspended = 0;
1409 ao2_unlock(tps);
1410 return 0;
1411 }
1412 return -1;
1413}

References ao2_lock, ao2_unlock, and ast_taskprocessor::suspended.

Referenced by ast_sip_session_unsuspend(), and AST_TEST_DEFINE().