Asterisk - The Open Source Telephony Project  GIT-master-a1fa8df
taskprocessor.h File Reference

An API for managing task processing threads that can be shared across modules.

Data Structures

struct ast_taskprocessor_listener_callbacks

struct ast_taskprocessor_local
 Local data parameter.
 

Macros

#define AST_TASKPROCESSOR_HIGH_WATER_LEVEL   500

#define AST_TASKPROCESSOR_MAX_NAME   70
 Suggested maximum taskprocessor name length (less null terminator).
 

Enumerations

enum ast_tps_options { TPS_REF_DEFAULT = 0, TPS_REF_IF_EXISTS = (1 << 0) }
 ast_tps_options for specification of taskprocessor options
 

Functions

unsigned int ast_taskprocessor_alert_get (void)
 Get the current taskprocessor high water alert count.

int ast_taskprocessor_alert_set_levels (struct ast_taskprocessor *tps, long low_water, long high_water)
 Set the high and low alert water marks of the given taskprocessor queue.

void ast_taskprocessor_build_name (char *buf, unsigned int size, const char *format,...)
 Build a taskprocessor name with a sequence number on the end.

struct ast_taskprocessor * ast_taskprocessor_create_with_listener (const char *name, struct ast_taskprocessor_listener *listener)
 Create a taskprocessor with a custom listener.

int ast_taskprocessor_execute (struct ast_taskprocessor *tps)
 Pop a task off the taskprocessor and execute it.

struct ast_taskprocessor * ast_taskprocessor_get (const char *name, enum ast_tps_options create)
 Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.

unsigned int ast_taskprocessor_get_subsystem_alert (const char *subsystem)
 Get the current taskprocessor high water alert count by subsystem.

int ast_taskprocessor_is_suspended (struct ast_taskprocessor *tps)
 Get the task processor suspend status.

int ast_taskprocessor_is_task (struct ast_taskprocessor *tps)
 Am I the given taskprocessor's current task.

struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc (const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
 Allocate a taskprocessor listener.

struct ast_taskprocessor * ast_taskprocessor_listener_get_tps (const struct ast_taskprocessor_listener *listener)
 Get a reference to the listener's taskprocessor.

void * ast_taskprocessor_listener_get_user_data (const struct ast_taskprocessor_listener *listener)
 Get the user data from the listener.

const char * ast_taskprocessor_name (struct ast_taskprocessor *tps)
 Return the name of the taskprocessor singleton.

void ast_taskprocessor_name_append (char *buf, unsigned int size, const char *name)
 Append the next sequence number to the given string, and copy into the buffer.

int ast_taskprocessor_push (struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
 Push a task into the specified taskprocessor queue and signal the taskprocessor thread.

int ast_taskprocessor_push_local (struct ast_taskprocessor *tps, int(*task_exe)(struct ast_taskprocessor_local *local), void *datap) attribute_warn_unused_result
 Push a task into the specified taskprocessor queue and signal the taskprocessor thread.

unsigned int ast_taskprocessor_seq_num (void)
 Get the next sequence number to create a human friendly taskprocessor name.

void ast_taskprocessor_set_local (struct ast_taskprocessor *tps, void *local_data)
 Sets the local data associated with a taskprocessor.

long ast_taskprocessor_size (struct ast_taskprocessor *tps)
 Return the current size of the taskprocessor queue.

int ast_taskprocessor_suspend (struct ast_taskprocessor *tps)
 Indicate the taskprocessor is suspended.

void * ast_taskprocessor_unreference (struct ast_taskprocessor *tps)
 Release a reference to the specified taskprocessor, decrementing its reference count.

int ast_taskprocessor_unsuspend (struct ast_taskprocessor *tps)
 Indicate the taskprocessor is unsuspended.
 

Detailed Description

An API for managing task processing threads that can be shared across modules.

Author
Dwayne M. Hubbard <dhubbard@digium.com>
Note
A taskprocessor is a named object containing a task queue that serializes tasks pushed into it by one or more modules that reference the taskprocessor. A taskprocessor is created the first time its name is requested via the ast_taskprocessor_get() function or the ast_taskprocessor_create_with_listener() function, and it is destroyed when its reference count reaches zero. A taskprocessor also has an accompanying listener that is notified when changes in the task queue occur.

A task is a wrapper around a task-handling function pointer and a data pointer. A task is pushed into a taskprocessor queue using the ast_taskprocessor_push(taskprocessor, taskhandler, taskdata) function and is freed by the taskprocessor after the task-handling function returns. A module releases its reference to a taskprocessor using the ast_taskprocessor_unreference() function, which destroys the taskprocessor if its reference count reaches zero. When the reference count reaches zero, the listener's shutdown() callback is called, and any further attempts to execute tasks are denied.

The taskprocessor listener has the flexibility of doling out tasks to best fit the module's needs. For instance, a taskprocessor listener may have a single dispatch thread that handles all tasks, or it may dispatch tasks to a thread pool.

There is a default taskprocessor listener that will be used if a taskprocessor is created without any explicit listener. This default listener runs tasks sequentially in a single thread. The listener will execute tasks as long as there are tasks to be processed. When the taskprocessor is shut down, the default listener will stop processing tasks and join its execution thread.
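The queueing model described above can be illustrated with a small standalone sketch. It does not use the Asterisk API: every `demo_` name is hypothetical, there is no locking or worker thread, and it only shows the idea of a task as a handler pointer plus a data pointer that runs in FIFO order and is freed after its handler returns.

```c
#include <stdlib.h>

/* Hypothetical stand-in for a queued task: a handler plus its data pointer. */
struct demo_task {
    int (*execute)(void *datap);
    void *datap;
    struct demo_task *next;
};

/* Hypothetical stand-in for a taskprocessor: a FIFO of tasks. */
struct demo_tps {
    struct demo_task *head;
    struct demo_task *tail;
    long size;
};

/* Queue a task at the tail. Returns 0 on success, -1 on failure. */
int demo_push(struct demo_tps *tps, int (*task_exe)(void *datap), void *datap)
{
    struct demo_task *t;

    if (!tps || !task_exe) {
        return -1;
    }
    t = calloc(1, sizeof(*t));
    if (!t) {
        return -1;
    }
    t->execute = task_exe;
    t->datap = datap;
    if (tps->tail) {
        tps->tail->next = t;
    } else {
        tps->head = t;
    }
    tps->tail = t;
    tps->size++;
    return 0;
}

/* Pop and run the oldest task, then free the wrapper.
 * Returns 1 if tasks remain queued, 0 otherwise. */
int demo_execute(struct demo_tps *tps)
{
    struct demo_task *t = tps->head;

    if (!t) {
        return 0;
    }
    tps->head = t->next;
    if (!tps->head) {
        tps->tail = NULL;
    }
    tps->size--;
    t->execute(t->datap);
    free(t);
    return tps->size > 0;
}

/* Example handler: appends its digit to a running number so the order is visible. */
static int demo_record = 0;

int demo_append_digit(void *datap)
{
    demo_record = demo_record * 10 + *(int *) datap;
    return 0;
}
```

Pushing three tasks whose data are 1, 2 and 3 and then calling `demo_execute()` until it returns 0 leaves `demo_record` at 123, which reflects the serialized, first-in first-out execution.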

Definition in file taskprocessor.h.

Macro Definition Documentation

◆ AST_TASKPROCESSOR_HIGH_WATER_LEVEL

#define AST_TASKPROCESSOR_HIGH_WATER_LEVEL   500

◆ AST_TASKPROCESSOR_MAX_NAME

#define AST_TASKPROCESSOR_MAX_NAME   70

Enumeration Type Documentation

◆ ast_tps_options

ast_tps_options for specification of taskprocessor options

Specify whether a taskprocessor should be created via ast_taskprocessor_get() if the taskprocessor does not already exist. The default behavior is to create a taskprocessor if it does not already exist and provide its reference to the calling function. To return a reference only if the taskprocessor already exists, use the TPS_REF_IF_EXISTS option in ast_taskprocessor_get().

Enumerator
TPS_REF_DEFAULT    return a reference to a taskprocessor, create one if it does not exist
TPS_REF_IF_EXISTS  return a reference to a taskprocessor ONLY if it already exists

Definition at line 73 of file taskprocessor.h.

enum ast_tps_options {
    /*! \brief return a reference to a taskprocessor, create one if it does not exist */
    TPS_REF_DEFAULT = 0,
    /*! \brief return a reference to a taskprocessor ONLY if it already exists */
    TPS_REF_IF_EXISTS = (1 << 0),
};
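The difference between the two options can be sketched with a standalone lookup-or-create registry. This is an illustration only: the `demo_` names, the fixed-size table and the plain integer reference count are assumptions made for the example, not the container or locking that Asterisk uses.

```c
#include <string.h>

enum demo_tps_options {
    DEMO_REF_DEFAULT = 0,
    DEMO_REF_IF_EXISTS = (1 << 0),
};

struct demo_tps {
    char name[32];
    int refcount;   /* 0 means the slot is free */
};

#define DEMO_MAX_TPS 4
static struct demo_tps demo_registry[DEMO_MAX_TPS];

/* Return a referenced entry by name; create it unless DEMO_REF_IF_EXISTS is set. */
struct demo_tps *demo_get(const char *name, enum demo_tps_options create)
{
    struct demo_tps *free_slot = NULL;
    int i;

    if (!name || !*name) {
        return NULL;
    }
    for (i = 0; i < DEMO_MAX_TPS; i++) {
        if (demo_registry[i].refcount > 0) {
            if (!strcmp(demo_registry[i].name, name)) {
                demo_registry[i].refcount++;
                return &demo_registry[i];
            }
        } else if (!free_slot) {
            free_slot = &demo_registry[i];
        }
    }
    if ((create & DEMO_REF_IF_EXISTS) || !free_slot) {
        return NULL;
    }
    strncpy(free_slot->name, name, sizeof(free_slot->name) - 1);
    free_slot->name[sizeof(free_slot->name) - 1] = '\0';
    free_slot->refcount = 1;
    return free_slot;
}

/* Drop one reference; the slot becomes free when the count reaches zero. */
void demo_unreference(struct demo_tps *tps)
{
    if (tps && tps->refcount > 0) {
        tps->refcount--;
    }
}
```

In this sketch a `DEMO_REF_IF_EXISTS` lookup of an unknown name returns NULL, while a `DEMO_REF_DEFAULT` lookup creates the entry with one reference. Every successful lookup adds a reference that the caller is expected to release.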

Function Documentation

◆ ast_taskprocessor_alert_get()

unsigned int ast_taskprocessor_alert_get (void)

Get the current taskprocessor high water alert count.

Since
13.10.0
Return values
0         if no taskprocessors are in high water alert.
non-zero  if some task processors are in high water alert.

Definition at line 819 of file taskprocessor.c.

References ast_rwlock_rdlock, ast_rwlock_unlock, tps_alert_count, and tps_alert_lock.

Referenced by AST_TEST_DEFINE(), and distributor().

unsigned int ast_taskprocessor_alert_get(void)
{
    unsigned int count;

    ast_rwlock_rdlock(&tps_alert_lock);
    count = tps_alert_count;
    ast_rwlock_unlock(&tps_alert_lock);

    return count;
}

◆ ast_taskprocessor_alert_set_levels()

int ast_taskprocessor_alert_set_levels (struct ast_taskprocessor *tps, long low_water, long high_water)

Set the high and low alert water marks of the given taskprocessor queue.

Since
13.10.0
Parameters
tps         Taskprocessor to update queue water marks.
low_water   New queue low water mark. (-1 to set as 90% of high_water)
high_water  New queue high water mark.
Return values
0   on success.
-1  on error (water marks not changed).

Definition at line 830 of file taskprocessor.c.

References ao2_lock, ao2_unlock, ast_taskprocessor::high_water_alert, tps_alert_add(), ast_taskprocessor::tps_queue_high, ast_taskprocessor::tps_queue_low, and ast_taskprocessor::tps_queue_size.

Referenced by actual_load_config(), ast_res_pjsip_init_options_handling(), ast_serializer_pool_set_alerts(), ast_sorcery_object_set_congestion_levels(), AST_TEST_DEFINE(), and stasis_subscription_set_congestion_limits().

int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
{
    if (!tps || high_water < 0 || high_water < low_water) {
        return -1;
    }

    if (low_water < 0) {
        /* Set low water level to 90% of high water level */
        low_water = (high_water * 9) / 10;
    }

    ao2_lock(tps);

    tps->tps_queue_low = low_water;
    tps->tps_queue_high = high_water;

    if (tps->high_water_alert) {
        if (!tps->tps_queue_size || tps->tps_queue_size < low_water) {
            /* Update water mark alert immediately */
            tps->high_water_alert = 0;
            tps_alert_add(tps, -1);
        }
    } else {
        if (high_water < tps->tps_queue_size) {
            /* Update water mark alert immediately */
            tps->high_water_alert = 1;
            tps_alert_add(tps, +1);
        }
    }

    ao2_unlock(tps);

    return 0;
}
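To see how the water marks interact, the validation, the 90% default and the immediate alert update from the listing above can be replayed on a plain struct. The `demo_` names are hypothetical, and the sketch leaves out the locking and the global alert counter.

```c
/* Hypothetical model of one queue's alert state. */
struct demo_alert {
    long low;
    long high;
    long size;
    int in_alert;
};

/* Mirrors the validation, default and immediate-update rules of the listing. */
int demo_set_levels(struct demo_alert *q, long low_water, long high_water)
{
    if (!q || high_water < 0 || high_water < low_water) {
        return -1;
    }
    if (low_water < 0) {
        /* Default the low mark to 90% of the high mark. */
        low_water = (high_water * 9) / 10;
    }
    q->low = low_water;
    q->high = high_water;
    if (q->in_alert) {
        /* Clear only once the queue is empty or below the low mark. */
        if (!q->size || q->size < low_water) {
            q->in_alert = 0;
        }
    } else if (high_water < q->size) {
        /* Raise as soon as the queue exceeds the high mark. */
        q->in_alert = 1;
    }
    return 0;
}
```

With 480 queued items, setting the marks to (-1, 400) yields a low mark of 360 and raises the alert. Widening them to (-1, 500) gives a low mark of 450 but keeps the alert raised, because 480 is not below 450. That gap between the raise and clear thresholds is what keeps the alert from flapping.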

◆ ast_taskprocessor_build_name()

void ast_taskprocessor_build_name (char *buf, unsigned int size, const char *format, ...)

Build a taskprocessor name with a sequence number on the end.

Since
13.8.0
Parameters
buf     Where to put the built taskprocessor name.
size    How large is buf including null terminator.
format  printf format to create the non-sequenced part of the name.
Note
The user supplied part of the taskprocessor name is truncated to allow the full sequence number to be appended within the supplied buffer size.
Returns
Nothing

Definition at line 1295 of file taskprocessor.c.

References ast_assert, ast_taskprocessor_seq_num(), NULL, and SEQ_STR_SIZE.

Referenced by alloc_playback_chan(), allocate_subscription_tree(), ast_sip_session_alloc(), create_websocket_serializer(), distributor_pool_setup(), internal_stasis_subscribe(), refer_progress_alloc(), sip_options_aor_alloc(), sip_outbound_publisher_alloc(), sip_outbound_registration_state_alloc(), and sorcery_object_type_alloc().

void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format, ...)
{
    va_list ap;
    int user_size;

    ast_assert(buf != NULL);
    ast_assert(SEQ_STR_SIZE <= size);

    va_start(ap, format);
    user_size = vsnprintf(buf, size - (SEQ_STR_SIZE - 1), format, ap);
    va_end(ap);
    if (user_size < 0) {
        /*
         * Wow!  We got an output error to a memory buffer.
         * Assume no user part of name written.
         */
        user_size = 0;
    } else if (size < user_size + SEQ_STR_SIZE) {
        /* Truncate user part of name to make sequence number fit. */
        user_size = size - SEQ_STR_SIZE;
    }

    /* Append sequence number to end of user name. */
    snprintf(buf + user_size, SEQ_STR_SIZE, "-%08x", ast_taskprocessor_seq_num());
}
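The truncation rule in the note above is easier to follow with concrete buffers. The sketch below is a standalone variant, not the Asterisk function: it takes the sequence number as an explicit argument so the result is predictable, and it assumes `SEQ_STR_SIZE` is 10 (a dash, eight hex digits and the null terminator), which is inferred from the "-%08x" format rather than stated on this page.

```c
#include <stdarg.h>
#include <stdio.h>

/* Assumed size: dash + 8 hex digits + null terminator, inferred from "-%08x". */
#define DEMO_SEQ_STR_SIZE (1 + 8 + 1)

/* Hypothetical variant that takes the sequence number as a parameter. */
void demo_build_name(char *buf, unsigned int size, unsigned int seq, const char *format, ...)
{
    va_list ap;
    int user_size;

    if (!buf || size < DEMO_SEQ_STR_SIZE) {
        return;
    }
    va_start(ap, format);
    /* Leave room for the 9 visible suffix characters; vsnprintf adds the terminator. */
    user_size = vsnprintf(buf, size - (DEMO_SEQ_STR_SIZE - 1), format, ap);
    va_end(ap);
    if (user_size < 0) {
        /* Output error: treat the user part as empty. */
        user_size = 0;
    } else if (size < (unsigned int) user_size + DEMO_SEQ_STR_SIZE) {
        /* vsnprintf reports the untruncated length, so clamp it to what was kept. */
        user_size = size - DEMO_SEQ_STR_SIZE;
    }
    snprintf(buf + user_size, DEMO_SEQ_STR_SIZE, "-%08x", seq);
}
```

With a 16-byte buffer, the format "%s/%s" applied to "subsystem" and "worker" and sequence number 0x2a produce "subsys-0000002a": the user part is cut to six characters so the full suffix fits. With a 64-byte buffer the same call produces "subsystem/worker-0000002a".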

◆ ast_taskprocessor_create_with_listener()

struct ast_taskprocessor * ast_taskprocessor_create_with_listener (const char *name, struct ast_taskprocessor_listener *listener)

Create a taskprocessor with a custom listener.

Since
12.0.0

Note that when a taskprocessor is created in this way, it does not create any threads to execute the tasks. This job is left up to the listener. The listener's start() callback will be called during this function.

Parameters
name      The name of the taskprocessor to create
listener  The listener for operations on this taskprocessor
Return values
NULL      Failure
non-NULL  Success

Definition at line 1083 of file taskprocessor.c.

References __allocate_taskprocessor(), __start_taskprocessor(), ao2_find, ao2_lock, ao2_unlock, ast_taskprocessor_unreference(), NULL, OBJ_KEY, and OBJ_NOLOCK.

Referenced by AST_TEST_DEFINE(), ast_threadpool_create(), and ast_threadpool_serializer_group().

struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
{
    struct ast_taskprocessor *p;

    ao2_lock(tps_singletons);
    p = ao2_find(tps_singletons, name, OBJ_KEY | OBJ_NOLOCK);
    if (p) {
        ao2_unlock(tps_singletons);
        ast_taskprocessor_unreference(p);
        return NULL;
    }

    p = __allocate_taskprocessor(name, listener);
    ao2_unlock(tps_singletons);

    return __start_taskprocessor(p);
}

◆ ast_taskprocessor_execute()

int ast_taskprocessor_execute (struct ast_taskprocessor *tps)

Pop a task off the taskprocessor and execute it.

Since
12.0.0
Parameters
tps  The taskprocessor from which to execute.
Return values
0  There is no further work to be done.
1  Tasks still remain in the taskprocessor queue.

Definition at line 1212 of file taskprocessor.c.

References tps_taskprocessor_stats::_tasks_processed_count, ao2_lock, ao2_unlock, AST_PTHREADT_NULL, ast_taskprocessor_size(), tps_task::callback, ast_taskprocessor_listener::callbacks, ast_taskprocessor_local::data, tps_task::datap, ast_taskprocessor_listener_callbacks::emptied, tps_task::execute, tps_task::execute_local, ast_taskprocessor::executing, ast_taskprocessor::listener, ast_taskprocessor::local_data, ast_taskprocessor_local::local_data, tps_taskprocessor_stats::max_qsize, ast_taskprocessor::stats, ast_taskprocessor::thread, tps_task_free(), tps_taskprocessor_pop(), and tps_task::wants_local.

Referenced by AST_TEST_DEFINE(), default_tps_processing_function(), execute_tasks(), and threadpool_execute().

int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
{
    struct ast_taskprocessor_local local;
    struct tps_task *t;
    long size;

    ao2_lock(tps);
    t = tps_taskprocessor_pop(tps);
    if (!t) {
        ao2_unlock(tps);
        return 0;
    }

    tps->thread = pthread_self();
    tps->executing = 1;

    if (t->wants_local) {
        local.local_data = tps->local_data;
        local.data = t->datap;
    }
    ao2_unlock(tps);

    if (t->wants_local) {
        t->callback.execute_local(&local);
    } else {
        t->callback.execute(t->datap);
    }
    tps_task_free(t);

    ao2_lock(tps);
    tps->thread = AST_PTHREADT_NULL;
    /* We need to check size in the same critical section where we reset the
     * executing bit. Avoids a race condition where a task is pushed right
     * after we pop an empty stack.
     */
    tps->executing = 0;
    size = ast_taskprocessor_size(tps);

    /* Update the stats */
    ++tps->stats._tasks_processed_count;

    /* Include the task we just executed as part of the queue size. */
    if (size >= tps->stats.max_qsize) {
        tps->stats.max_qsize = size + 1;
    }
    ao2_unlock(tps);

    /* If we executed a task, check for the transition to empty */
    if (size == 0 && tps->listener->callbacks->emptied) {
        tps->listener->callbacks->emptied(tps->listener);
    }
    return size > 0;
}

◆ ast_taskprocessor_get()

struct ast_taskprocessor * ast_taskprocessor_get (const char *name, enum ast_tps_options create)

Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.

The default behavior of instantiating a taskprocessor if one does not already exist can be disabled by specifying the TPS_REF_IF_EXISTS ast_tps_options as the second argument to ast_taskprocessor_get().

Parameters
name    The name of the taskprocessor
create  Use 0 by default or specify TPS_REF_IF_EXISTS to return NULL if the taskprocessor does not already exist
Returns
A pointer to a reference counted taskprocessor under normal conditions, or NULL if the TPS_REF_IF_EXISTS reference type is specified and the taskprocessor does not exist
Since
1.6.1

Definition at line 1044 of file taskprocessor.c.

References __allocate_taskprocessor(), __start_taskprocessor(), ao2_find, ao2_lock, ao2_ref, ao2_unlock, ast_log, ast_strlen_zero(), ast_taskprocessor_listener_alloc(), default_listener_pvt_alloc(), default_listener_pvt_destroy(), listener(), LOG_ERROR, NULL, OBJ_KEY, OBJ_NOLOCK, and TPS_REF_IF_EXISTS.

Referenced by alloc_playback_chan(), ast_dns_system_resolver_init(), ast_msg_init(), AST_TEST_DEFINE(), cli_tps_ping(), cli_tps_reset_stats(), find_request_serializer(), internal_stasis_subscribe(), load_module(), load_objects(), and threadpool_alloc().

struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_options create)
{
    struct ast_taskprocessor *p;
    struct default_taskprocessor_listener_pvt *pvt;
    struct ast_taskprocessor_listener *listener;

    if (ast_strlen_zero(name)) {
        ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
        return NULL;
    }
    ao2_lock(tps_singletons);
    p = ao2_find(tps_singletons, name, OBJ_KEY | OBJ_NOLOCK);
    if (p || (create & TPS_REF_IF_EXISTS)) {
        /* calling function does not want a new taskprocessor to be created if it doesn't already exist */
        ao2_unlock(tps_singletons);
        return p;
    }

    /* Create a new taskprocessor. Start by creating a default listener */
    pvt = default_listener_pvt_alloc();
    if (!pvt) {
        ao2_unlock(tps_singletons);
        return NULL;
    }
    listener = ast_taskprocessor_listener_alloc(&default_listener_callbacks, pvt);
    if (!listener) {
        ao2_unlock(tps_singletons);
        default_listener_pvt_destroy(pvt);
        return NULL;
    }

    p = __allocate_taskprocessor(name, listener);
    ao2_unlock(tps_singletons);
    p = __start_taskprocessor(p);
    ao2_ref(listener, -1);

    return p;
}

◆ ast_taskprocessor_get_subsystem_alert()

unsigned int ast_taskprocessor_get_subsystem_alert (const char *subsystem)

Get the current taskprocessor high water alert count by subsystem.

Since
13.26.0
16.3.0
Parameters
subsystem  The subsystem name
Return values
0         if no taskprocessors are in high water alert.
non-zero  if some task processors are in high water alert.

Definition at line 637 of file taskprocessor.c.

References subsystem_alert::alert_count, AST_VECTOR_GET, AST_VECTOR_GET_INDEX, AST_VECTOR_RW_RDLOCK, AST_VECTOR_RW_UNLOCK, and subsystem_match().

Referenced by AST_TEST_DEFINE(), and distributor().

unsigned int ast_taskprocessor_get_subsystem_alert(const char *subsystem)
{
    struct subsystem_alert *alert;
    unsigned int count = 0;
    int idx;

    AST_VECTOR_RW_RDLOCK(&overloaded_subsystems);
    idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
    if (idx >= 0) {
        alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
        count = alert->alert_count;
    }
    AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);

    return count;
}

◆ ast_taskprocessor_is_suspended()

int ast_taskprocessor_is_suspended (struct ast_taskprocessor *tps)

Get the task processor suspend status.

Since
13.12.0
Parameters
tps  Task processor.
Return values
non-zero  if the task processor is suspended

Definition at line 1207 of file taskprocessor.c.

References ast_taskprocessor::suspended.

Referenced by ast_sip_session_suspend().

int ast_taskprocessor_is_suspended(struct ast_taskprocessor *tps)
{
    return tps ? tps->suspended : -1;
}
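One consequence of the listing above is worth spelling out: a NULL taskprocessor yields -1, which is also non-zero. The sketch below reproduces the expression on a hypothetical stand-in struct and adds a stricter wrapper; both `demo_` functions are illustrations, not part of the Asterisk API.

```c
/* Hypothetical stand-in holding only the field the listing reads. */
struct demo_tps {
    unsigned int suspended;
};

/* Same expression as the listing above, applied to the stand-in struct. */
int demo_is_suspended(const struct demo_tps *tps)
{
    return tps ? (int) tps->suspended : -1;
}

/* Stricter wrapper: a missing taskprocessor counts as "not suspended". */
int demo_is_really_suspended(const struct demo_tps *tps)
{
    return demo_is_suspended(tps) > 0;
}
```

A caller that writes `if (demo_is_suspended(tps))` takes the "suspended" branch for a NULL pointer as well, so code that may hold a NULL taskprocessor probably wants to test for that case separately or compare against zero as the wrapper does.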

◆ ast_taskprocessor_is_task()

int ast_taskprocessor_is_task (struct ast_taskprocessor *tps)

Check whether the current thread is executing the given taskprocessor's current task.

Since
12.7.0
Parameters
tps  Taskprocessor to check.
Return values
non-zero  if current thread is the taskprocessor thread.

Definition at line 1266 of file taskprocessor.c.

References ao2_lock, ao2_unlock, and ast_taskprocessor::thread.

Referenced by ast_sip_push_task_wait_serializer(), ast_sip_session_suspend(), and handle_new_invite_request().

int ast_taskprocessor_is_task(struct ast_taskprocessor *tps)
{
    int is_task;

    ao2_lock(tps);
    is_task = pthread_equal(tps->thread, pthread_self());
    ao2_unlock(tps);
    return is_task;
}

◆ ast_taskprocessor_listener_alloc()

struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc (const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)

Allocate a taskprocessor listener.

Since
12.0.0

This will result in the listener being allocated with the specified callbacks.

Parameters
callbacks  The callbacks to assign to the listener
user_data  The user data for the listener
Return values
NULL      Failure
non-NULL  The newly allocated taskprocessor listener

Definition at line 930 of file taskprocessor.c.

References ao2_alloc, ast_taskprocessor_listener::callbacks, listener(), NULL, taskprocessor_listener_dtor(), and ast_taskprocessor_listener::user_data.

Referenced by ast_taskprocessor_get(), AST_TEST_DEFINE(), ast_threadpool_create(), and ast_threadpool_serializer_group().

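struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
{
    struct ast_taskprocessor_listener *listener;

    listener = ao2_alloc(sizeof(*listener), taskprocessor_listener_dtor);
    if (!listener) {
        return NULL;
    }
    listener->callbacks = callbacks;
    listener->user_data = user_data;

    return listener;
}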

◆ ast_taskprocessor_listener_get_tps()

struct ast_taskprocessor * ast_taskprocessor_listener_get_tps (const struct ast_taskprocessor_listener *listener)

Get a reference to the listener's taskprocessor.

This will return the taskprocessor with its reference count increased. Release the reference to this object by using ast_taskprocessor_unreference().

Parameters
listener  The listener that has the taskprocessor
Returns
The taskprocessor

Definition at line 944 of file taskprocessor.c.

References ao2_ref, and ast_taskprocessor_listener::tps.

Referenced by serializer_task_pushed().

struct ast_taskprocessor *ast_taskprocessor_listener_get_tps(const struct ast_taskprocessor_listener *listener)
{
    ao2_ref(listener->tps, +1);
    return listener->tps;
}

◆ ast_taskprocessor_listener_get_user_data()

void * ast_taskprocessor_listener_get_user_data (const struct ast_taskprocessor_listener *listener)

Get the user data from the listener.

Parameters
listener  The taskprocessor listener
Returns
The listener's user data

Definition at line 950 of file taskprocessor.c.

References ast_taskprocessor_listener::user_data.

Referenced by serializer_shutdown(), serializer_task_pushed(), test_emptied(), test_shutdown(), test_task_pushed(), threadpool_tps_emptied(), threadpool_tps_shutdown(), and threadpool_tps_task_pushed().

void *ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
{
    return listener->user_data;
}
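How a listener's callbacks and user data fit together can be sketched without the Asterisk headers. The model below is hypothetical throughout: it has a callback table with a single emptied() hook, a listener holding that table plus a user data pointer, and an execute step that fires the hook on the transition to an empty queue.

```c
struct demo_listener;

/* Hypothetical callback table; only an emptied() hook is modeled here. */
struct demo_listener_callbacks {
    void (*emptied)(struct demo_listener *listener);
};

struct demo_listener {
    const struct demo_listener_callbacks *callbacks;
    void *user_data;
};

void *demo_listener_get_user_data(const struct demo_listener *listener)
{
    return listener->user_data;
}

/* Consume one queued item; notify the listener when the queue becomes empty.
 * Returns 1 if items remain, 0 otherwise. */
int demo_execute_one(long *queue_size, struct demo_listener *listener)
{
    if (*queue_size == 0) {
        return 0;
    }
    --*queue_size;
    if (*queue_size == 0 && listener->callbacks->emptied) {
        listener->callbacks->emptied(listener);
    }
    return *queue_size > 0;
}

/* Example emptied() hook: counts notifications in the caller's user data. */
void demo_count_emptied(struct demo_listener *listener)
{
    int *count = demo_listener_get_user_data(listener);

    ++*count;
}
```

Starting with two queued items, the first call returns 1 without a notification and the second returns 0 and invokes the hook once. A further call on the empty queue returns 0 and does not notify again, since no transition took place.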

◆ ast_taskprocessor_name()

const char * ast_taskprocessor_name (struct ast_taskprocessor *tps)

Return the name of the taskprocessor singleton.

Since
1.6.1

Definition at line 906 of file taskprocessor.c.

References ast_log, LOG_ERROR, ast_taskprocessor::name, and NULL.

Referenced by ast_serializer_pool_set_alerts(), ast_sip_get_distributor_serializer(), distributor(), grow(), record_serializer(), shrink(), sip_options_apply_aor_configuration(), and sip_options_contact_add_task().

const char *ast_taskprocessor_name(struct ast_taskprocessor *tps)
{
    if (!tps) {
        ast_log(LOG_ERROR, "no taskprocessor specified!\n");
        return NULL;
    }
    return tps->name;
}

◆ ast_taskprocessor_name_append()

void ast_taskprocessor_name_append (char *buf, unsigned int size, const char *name)

Append the next sequence number to the given string, and copy into the buffer.

Parameters
buf   Where to copy the appended taskprocessor name.
size  How large is buf including null terminator.
name  A name to append the sequence number to.

Definition at line 1285 of file taskprocessor.c.

References ast_assert, ast_taskprocessor_seq_num(), NULL, and SEQ_STR_SIZE.

Referenced by ast_serializer_pool_create().

void ast_taskprocessor_name_append(char *buf, unsigned int size, const char *name)
{
    int final_size = strlen(name) + SEQ_STR_SIZE;

    ast_assert(buf != NULL && name != NULL);
    ast_assert(final_size <= size);

    snprintf(buf, final_size, "%s-%08x", name, ast_taskprocessor_seq_num());
}

◆ ast_taskprocessor_push()

int ast_taskprocessor_push (struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap)

Push a task into the specified taskprocessor queue and signal the taskprocessor thread.

Parameters
tps       The taskprocessor structure
task_exe  The task handling function to push into the taskprocessor queue
datap     The data to be used by the task handling function
Return values
0   success
-1  failure
Since
1.6.1

Definition at line 1175 of file taskprocessor.c.

References taskprocessor_push(), and tps_task_alloc().

Referenced by ast_cc_agent_status_response(), ast_cc_monitor_failed(), ast_cc_monitor_party_b_free(), ast_cc_monitor_status_request(), ast_cc_monitor_stop_ringing(), ast_msg_queue(), ast_sip_push_task(), ast_sorcery_create(), ast_sorcery_delete(), ast_sorcery_update(), AST_TEST_DEFINE(), ast_threadpool_push(), ast_threadpool_set_size(), async_delete_name_rec(), async_play_sound_helper(), cc_request_state_change(), cli_tps_ping(), default_listener_shutdown(), destroy_conference_bridge(), dns_system_resolver_resolve(), generic_monitor_devstate_cb(), handle_cc_status(), hepv3_send_packet(), iax2_transmit(), mwi_handle_subscribe(), mwi_handle_unsubscribe(), play_sound_helper(), sorcery_object_load(), stasis_unsubscribe(), threadpool_active_thread_idle(), threadpool_idle_thread_dead(), threadpool_tps_emptied(), threadpool_tps_task_pushed(), and threadpool_zombie_thread_dead().

int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
{
    return taskprocessor_push(tps, tps_task_alloc(task_exe, datap));
}

◆ ast_taskprocessor_push_local()

int ast_taskprocessor_push_local ( struct ast_taskprocessor tps,
int(*)(struct ast_taskprocessor_local *local)  task_exe,
void *  datap 
)

Push a task into the specified taskprocessor queue and signal the taskprocessor thread.

The callback receives a ast_taskprocessor_local struct, which contains both the provided datap pointer, and any local data set on the taskprocessor with ast_taskprocessor_set_local().

Parameters
tps       The taskprocessor structure
task_exe  The task handling function to push into the taskprocessor queue
datap     The data to be used by the task handling function
Return values
0   success
-1  failure
Since
12.0.0

Referenced by AST_TEST_DEFINE(), and dispatch_message().
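
The difference from the plain push is that the callback sees two pointers: data set once on the processor and data supplied with each task. The sketch below models only that pairing. All demo_* names and the struct layout are hypothetical stand-ins rather than the Asterisk definitions, and the callback is invoked immediately instead of being queued for the processor's thread.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for illustration; not the Asterisk definitions. */
struct demo_local {
	void *local_data; /* set once on the processor */
	void *data;       /* passed along with each task */
};

struct demo_processor {
	void *local_data;
};

/* Plays the role that ast_taskprocessor_set_local() has in the real API. */
void demo_set_local(struct demo_processor *p, void *local_data)
{
	assert(p != NULL);
	p->local_data = local_data;
}

/* Build the two-pointer view and hand it to the callback right away.
 * Returns -1 if the processor or callback is missing, otherwise the
 * callback's own return value. */
int demo_run_local(struct demo_processor *p,
	int (*task_exe)(struct demo_local *local), void *datap)
{
	struct demo_local local;

	if (!p || !task_exe) {
		return -1;
	}
	local.local_data = p->local_data;
	local.data = datap;
	return task_exe(&local);
}

/* Example callback: add the per-task value to the per-processor total. */
int demo_accumulate(struct demo_local *local)
{
	long *total = local->local_data;
	const long *delta = local->data;

	*total += *delta;
	return 0;
}
```

Keeping shared state in the processor-level pointer means each task only has to carry its own argument.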

◆ ast_taskprocessor_seq_num()

unsigned int ast_taskprocessor_seq_num ( void  )

Get the next sequence number to create a human friendly taskprocessor name.

Since
13.8.0
Returns
Sequence number for use in creating human friendly taskprocessor names.

Definition at line 1276 of file taskprocessor.c.

References ast_atomic_fetchadd_int().

Referenced by ast_taskprocessor_build_name(), and ast_taskprocessor_name_append().

{
	static int seq_num;

	return (unsigned int) ast_atomic_fetchadd_int(&seq_num, +1);
}
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition: lock.h:755
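
The same fetch-and-add idea can be sketched with C11 atomics in place of ast_atomic_fetchadd_int(). The demo_* functions below are hypothetical, and the "-%08x" suffix used to build a name is an assumption chosen for this sketch, not a format taken from this page.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdio.h>

/* Hypothetical counterpart of the sequence counter, built on C11 atomics. */
unsigned int demo_seq_num(void)
{
	static atomic_uint seq_num; /* static storage, so it starts at zero */

	/* atomic_fetch_add returns the value held before the addition. */
	return atomic_fetch_add(&seq_num, 1u);
}

/* Append "-<seq>" to a base name. The suffix format is an assumption.
 * Returns 0 on success, -1 if buf is too small for the full name. */
int demo_build_name(char *buf, size_t size, const char *base)
{
	int len;

	assert(buf != NULL && base != NULL);
	len = snprintf(buf, size, "%s-%08x", base, demo_seq_num());
	return (len < 0 || (size_t) len >= size) ? -1 : 0;
}
```

Because the counter is only ever advanced atomically, two threads that build names at the same time receive different sequence numbers without taking a lock.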

◆ ast_taskprocessor_set_local()

void ast_taskprocessor_set_local ( struct ast_taskprocessor *  tps,
void *  local_data 
)

Sets the local data associated with a taskprocessor.

Since
12.0.0

See ast_taskprocessor_push_local().

Parameters
tps         Task processor.
local_data  Local data to associate with tps.

Definition at line 1101 of file taskprocessor.c.

References ast_taskprocessor::local_data, lock, and SCOPED_AO2LOCK.

Referenced by AST_TEST_DEFINE(), and internal_stasis_subscribe().

{
	SCOPED_AO2LOCK(lock, tps);
	tps->local_data = local_data;
}
ast_mutex_t lock
Definition: app_meetme.c:1093
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:602

◆ ast_taskprocessor_size()

long ast_taskprocessor_size ( struct ast_taskprocessor *  tps)

Return the current size of the taskprocessor queue.

Since
13.7.0

Definition at line 900 of file taskprocessor.c.

References ast_taskprocessor::tps_queue_size.

Referenced by ast_serializer_pool_get(), ast_taskprocessor_execute(), AST_TEST_DEFINE(), and ast_threadpool_queue_size().

{
	return (tps) ? tps->tps_queue_size : -1;
}
long tps_queue_size
Taskprocessor current queue size.
Definition: taskprocessor.c:74

◆ ast_taskprocessor_suspend()

int ast_taskprocessor_suspend ( struct ast_taskprocessor *  tps)

Indicate the taskprocessor is suspended.

Since
13.12.0
Parameters
tps  Task processor.
Return values
0   success
-1  failure

Definition at line 1185 of file taskprocessor.c.

References ao2_lock, ao2_unlock, and ast_taskprocessor::suspended.

Referenced by ast_sip_session_suspend(), and AST_TEST_DEFINE().

{
	if (tps) {
		ao2_lock(tps);
		tps->suspended = 1;
		ao2_unlock(tps);
		return 0;
	}
	return -1;
}
unsigned int suspended
Definition: taskprocessor.c:91
#define ao2_unlock(a)
Definition: astobj2.h:730
#define ao2_lock(a)
Definition: astobj2.h:718

◆ ast_taskprocessor_unreference()

void* ast_taskprocessor_unreference ( struct ast_taskprocessor *  tps)

Drop one reference to the specified taskprocessor, decrementing its reference count.

Taskprocessors use astobj2: a taskprocessor unlinks itself from the taskprocessor singleton container and destroys itself when its reference count reaches zero.

Parameters
tps  taskprocessor to unreference
Returns
NULL
Since
1.6.1

Definition at line 1109 of file taskprocessor.c.

References ao2_lock, ao2_ref, ao2_unlink_flags, ao2_unlock, ast_taskprocessor::listener, listener_shutdown(), NULL, and OBJ_NOLOCK.

Referenced by __start_taskprocessor(), __unload_module(), ast_msg_shutdown(), ast_res_pjsip_cleanup_options_handling(), ast_serializer_pool_destroy(), ast_taskprocessor_create_with_listener(), AST_TEST_DEFINE(), ast_threadpool_shutdown(), cli_tps_ping(), cli_tps_reset_stats(), cli_tps_reset_stats_all(), destroy_conference_bridge(), distributor(), distributor_pool_shutdown(), dns_system_resolver_destroy(), execute_tasks(), exten_state_subscription_destructor(), refer_progress_destroy(), scheduler(), schtd_dtor(), serializer_task_pushed(), session_destructor(), sip_options_aor_dtor(), sip_outbound_publisher_destroy(), sip_outbound_registration_client_state_destroy(), sorcery_object_type_destructor(), subscription_dtor(), subscription_persistence_recreate(), subscription_tree_destructor(), tps_report_taskprocessor_list(), tps_shutdown_thread(), tps_taskprocessor_tab_complete(), unload_module(), and websocket_cb().

{
	if (!tps) {
		return NULL;
	}

	/* To prevent another thread from finding and getting a reference to this
	 * taskprocessor we hold the singletons lock. If we didn't do this then
	 * they may acquire it and find that the listener has been shut down.
	 */
	ao2_lock(tps_singletons);

	if (ao2_ref(tps, -1) > 3) {
		ao2_unlock(tps_singletons);
		return NULL;
	}

	/* If we're down to 3 references, then those must be:
	 * 1. The reference we just got rid of
	 * 2. The container
	 * 3. The listener
	 */
	ao2_unlink_flags(tps_singletons, tps, OBJ_NOLOCK);
	ao2_unlock(tps_singletons);

	listener_shutdown(tps->listener);
	return NULL;
}
OBJ_NOLOCK
Assume that the ao2_container is already locked.
Definition: astobj2.h:1067
struct ast_taskprocessor_listener * listener
Definition: taskprocessor.c:81
#define ao2_unlock(a)
Definition: astobj2.h:730
#define NULL
Definition: resample.c:96
static void listener_shutdown(struct ast_taskprocessor_listener *listener)
#define ao2_ref(o, delta)
Definition: astobj2.h:464
#define ao2_lock(a)
Definition: astobj2.h:718
#define ao2_unlink_flags(container, obj, flags)
Definition: astobj2.h:1622
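
The threshold test in the listing relies on the reference call returning the count as it was before the change. The sketch below models that arithmetic with a plain integer. It is not astobj2: the demo_* names are hypothetical, locking is omitted, and the idea that unlinking and listener shutdown each release exactly one reference is an assumption made for this model.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical model of a reference-counted object; this is not astobj2. */
struct demo_obj {
	int refcount;
	int in_container;     /* 1 while the registry still links the object */
	int listener_running; /* 1 until the listener is shut down */
};

/* Adjust the count and return the value it had before the change. */
int demo_ref(struct demo_obj *obj, int delta)
{
	int previous;

	assert(obj != NULL);
	previous = obj->refcount;
	obj->refcount += delta;
	return previous;
}

/* Drop one caller reference. If the count before the drop was 3 or less,
 * only the container and the listener still hold the object, so unlink it
 * and stop the listener. Returns 1 if teardown ran, 0 otherwise. */
int demo_unreference(struct demo_obj *obj)
{
	if (!obj) {
		return 0;
	}
	if (demo_ref(obj, -1) > 3) {
		return 0; /* other callers still hold references */
	}
	/* Assumption: unlinking releases the container's reference. */
	obj->in_container = 0;
	demo_ref(obj, -1);
	/* Assumption: shutting the listener down releases the last reference. */
	obj->listener_running = 0;
	demo_ref(obj, -1);
	return 1;
}
```

Starting from a count of 4, the first call only decrements, and the second call finds a previous count of 3 and tears the object down.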

◆ ast_taskprocessor_unsuspend()

int ast_taskprocessor_unsuspend ( struct ast_taskprocessor *  tps)

Indicate the taskprocessor is unsuspended.

Since
13.12.0
Parameters
tps  Task processor.
Return values
0   success
-1  failure

Definition at line 1196 of file taskprocessor.c.

References ao2_lock, ao2_unlock, and ast_taskprocessor::suspended.

Referenced by ast_sip_session_unsuspend(), and AST_TEST_DEFINE().

{
	if (tps) {
		ao2_lock(tps);
		tps->suspended = 0;
		ao2_unlock(tps);
		return 0;
	}
	return -1;
}
unsigned int suspended
Definition: taskprocessor.c:91
#define ao2_unlock(a)
Definition: astobj2.h:730
#define ao2_lock(a)
Definition: astobj2.h:718
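
Suspend and unsuspend are mirror images: each takes the object lock, writes the flag, and releases the lock. A standalone sketch of that pairing follows, with a POSIX mutex standing in for the ao2 object lock. The demo_* names are hypothetical, and the sketch models only the flag itself, not what other code does when it reads the flag.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical stand-in for a taskprocessor; only the suspend flag is modeled. */
struct demo_tps {
	pthread_mutex_t lock;
	unsigned int suspended;
};

/* Write the flag under the lock. Returns -1 when no processor is given. */
int demo_set_suspended(struct demo_tps *tps, unsigned int value)
{
	if (!tps) {
		return -1;
	}
	pthread_mutex_lock(&tps->lock);
	tps->suspended = value;
	pthread_mutex_unlock(&tps->lock);
	return 0;
}

int demo_suspend(struct demo_tps *tps)
{
	return demo_set_suspended(tps, 1);
}

int demo_unsuspend(struct demo_tps *tps)
{
	return demo_set_suspended(tps, 0);
}

/* Read the flag under the same lock so a reader never races a writer. */
int demo_is_suspended(struct demo_tps *tps)
{
	int suspended;

	assert(tps != NULL);
	pthread_mutex_lock(&tps->lock);
	suspended = tps->suspended ? 1 : 0;
	pthread_mutex_unlock(&tps->lock);
	return suspended;
}
```

Routing both operations through one helper keeps the locking in a single place, whereas the listings above repeat the lock, write, and unlock sequence in each function.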