Asterisk - The Open Source Telephony Project GIT-master-ff80666
Functions
serializer.h File Reference
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

struct ast_serializer_pool * ast_serializer_pool_create (const char *name, unsigned int size, struct ast_threadpool *threadpool, int timeout)
 Create a serializer pool. More...
 
int ast_serializer_pool_destroy (struct ast_serializer_pool *pool)
 Destroy the serializer pool. More...
 
struct ast_taskprocessor * ast_serializer_pool_get (struct ast_serializer_pool *pool)
 Retrieve a serializer from the pool. More...
 
const char * ast_serializer_pool_name (const struct ast_serializer_pool *pool)
 Retrieve the base name of the serializer pool. More...
 
int ast_serializer_pool_set_alerts (struct ast_serializer_pool *pool, long high, long low)
 Set taskprocessor alert levels for the serializers in the pool. More...
 
struct ast_serializer_pool * ast_serializer_taskpool_create (const char *name, unsigned int size, struct ast_taskpool *taskpool, int timeout)
 Create a serializer pool on taskpool. More...
 

Function Documentation

◆ ast_serializer_pool_create()

struct ast_serializer_pool * ast_serializer_pool_create ( const char *  name,
unsigned int  size,
struct ast_threadpool *  threadpool,
int  timeout 
)

Create a serializer pool.

Create a serializer pool with an optional shutdown group. If a timeout greater than -1 is specified then a shutdown group is enabled on the pool.

Parameters
name - The base name for the pool, also used when building the taskprocessor name(s)
size - The size of the pool
threadpool - The backing threadpool to use
timeout - The timeout used if using a shutdown group (-1 = disabled)
Returns
A newly allocated serializer pool object
Return values
NULL - on error

Definition at line 78 of file serializer.c.

80{
81 struct ast_serializer_pool *pool;
82 char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
83 size_t idx;
84
85 ast_assert(size > 0);
86
87 pool = ast_malloc(sizeof(*pool) + strlen(name) + 1);
88 if (!pool) {
89 return NULL;
90 }
91
92 strcpy(pool->name, name); /* safe */
93
94 pool->shutdown_group_timeout = timeout;
96
97 AST_VECTOR_RW_INIT(&pool->serializers, size);
98
99 for (idx = 0; idx < size; ++idx) {
100 struct ast_taskprocessor *tps;
101
102 /* Create name with seq number appended. */
103 ast_taskprocessor_name_append(tps_name, sizeof(tps_name), name);
104
106 if (!tps) {
108 ast_log(LOG_ERROR, "Pool create: unable to create named serializer '%s'\n",
109 tps_name);
110 return NULL;
111 }
112
113 if (AST_VECTOR_APPEND(&pool->serializers, tps)) {
115 ast_log(LOG_ERROR, "Pool create: unable to append named serializer '%s'\n",
116 tps_name);
117 return NULL;
118 }
119 }
120
121 return pool;
122}
#define ast_malloc(len)
A wrapper for malloc()
Definition: astmm.h:191
#define ast_log
Definition: astobj2.c:42
static const char name[]
Definition: format_mp3.c:68
#define LOG_ERROR
#define NULL
Definition: resample.c:96
int ast_serializer_pool_destroy(struct ast_serializer_pool *pool)
Destroy the serializer pool.
Definition: serializer.c:41
struct ast_serializer_shutdown_group * ast_serializer_shutdown_group_alloc(void)
Create a serializer group shutdown control object.
static struct ast_threadpool * threadpool
Thread pool for observers.
Definition: sorcery.c:87
struct ast_serializer_shutdown_group * shutdown_group
Definition: serializer.c:32
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
void ast_taskprocessor_name_append(char *buf, unsigned int size, const char *name)
Append the next sequence number to the given string, and copy into the buffer.
#define AST_TASKPROCESSOR_MAX_NAME
Suggested maximum taskprocessor name length (less null terminator).
Definition: taskprocessor.h:61
struct ast_taskprocessor * ast_threadpool_serializer_group(const char *name, struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Serialized execution of tasks within a ast_threadpool.
Definition: threadpool.c:1301
#define ast_assert(a)
Definition: utils.h:776
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:267
#define AST_VECTOR_RW_INIT(vec, size)
Initialize a vector with a read/write lock.
Definition: vector.h:169

References ast_assert, ast_log, ast_malloc, ast_serializer_pool_destroy(), ast_serializer_shutdown_group_alloc(), AST_TASKPROCESSOR_MAX_NAME, ast_taskprocessor_name_append(), ast_threadpool_serializer_group(), AST_VECTOR_APPEND, AST_VECTOR_RW_INIT, LOG_ERROR, name, NULL, ast_serializer_pool::shutdown_group, ast_serializer_pool::shutdown_group_timeout, and threadpool.

Referenced by AST_TEST_DEFINE(), and load_module().

◆ ast_serializer_pool_destroy()

int ast_serializer_pool_destroy ( struct ast_serializer_pool *  pool)

Destroy the serializer pool.

Attempt to destroy the serializer pool. If a shutdown group has been enabled and the wait for its threads to complete times out, this function returns the number of remaining threads and the pool is not destroyed.

Parameters
pool - The pool to destroy

Definition at line 41 of file serializer.c.

42{
43 if (!pool) {
44 return 0;
45 }
46
47 /* Clear out the serializers */
48 AST_VECTOR_RW_WRLOCK(&pool->serializers);
50 AST_VECTOR_RW_UNLOCK(&pool->serializers);
51
52 /* If using a shutdown group then wait for all threads to complete */
53 if (pool->shutdown_group) {
54 int remaining;
55
56 ast_debug(3, "Waiting on serializers before destroying pool '%s'\n", pool->name);
57
60
61 if (remaining) {
62 /* If we've timed out don't fully cleanup yet */
63 ast_log(LOG_WARNING, "'%s' serializer pool destruction timeout. "
64 "'%d' dependencies still processing.\n", pool->name, remaining);
65 return remaining;
66 }
67
68 ao2_ref(pool->shutdown_group, -1);
69 pool->shutdown_group = NULL;
70 }
71
72 AST_VECTOR_RW_FREE(&pool->serializers);
73 ast_free(pool);
74
75 return 0;
76}
#define ast_free(a)
Definition: astmm.h:180
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
#define ast_debug(level,...)
Log a DEBUG message.
#define LOG_WARNING
int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout)
Wait for the serializers in the group to shutdown with timeout.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
#define AST_VECTOR_RESET(vec, cleanup)
Reset vector.
Definition: vector.h:636
#define AST_VECTOR_RW_WRLOCK(vec)
Obtain write lock on vector.
Definition: vector.h:898
#define AST_VECTOR_RW_UNLOCK(vec)
Unlock vector.
Definition: vector.h:908
#define AST_VECTOR_RW_FREE(vec)
Deallocates this locked vector.
Definition: vector.h:213

References ao2_ref, ast_debug, ast_free, ast_log, ast_serializer_shutdown_group_join(), ast_taskprocessor_unreference(), AST_VECTOR_RESET, AST_VECTOR_RW_FREE, AST_VECTOR_RW_UNLOCK, AST_VECTOR_RW_WRLOCK, LOG_WARNING, NULL, ast_serializer_pool::shutdown_group, and ast_serializer_pool::shutdown_group_timeout.

Referenced by ast_serializer_pool_create(), ast_serializer_taskpool_create(), AST_TEST_DEFINE(), load_module(), and unload_module().

◆ ast_serializer_pool_get()

struct ast_taskprocessor * ast_serializer_pool_get ( struct ast_serializer_pool *  pool)

Retrieve a serializer from the pool.

Parameters
pool - The pool object
Returns
A serializer/taskprocessor

Definition at line 175 of file serializer.c.

176{
177 struct ast_taskprocessor *res;
178 size_t idx;
179
180 if (!pool) {
181 return NULL;
182 }
183
184 AST_VECTOR_RW_RDLOCK(&pool->serializers);
185 if (AST_VECTOR_SIZE(&pool->serializers) == 0) {
186 AST_VECTOR_RW_UNLOCK(&pool->serializers);
187 return NULL;
188 }
189
190 res = AST_VECTOR_GET(&pool->serializers, 0);
191
192 /* Choose the taskprocessor with the smallest queue */
193 for (idx = 1; idx < AST_VECTOR_SIZE(&pool->serializers); ++idx) {
194 struct ast_taskprocessor *cur = AST_VECTOR_GET(&pool->serializers, idx);
196 res = cur;
197 }
198 }
199
200 AST_VECTOR_RW_UNLOCK(&pool->serializers);
201 return res;
202}
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:620
#define AST_VECTOR_RW_RDLOCK(vec)
Obtain read lock on vector.
Definition: vector.h:888
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:691

References ast_taskprocessor_size(), AST_VECTOR_GET, AST_VECTOR_RW_RDLOCK, AST_VECTOR_RW_UNLOCK, AST_VECTOR_SIZE, and NULL.

Referenced by ast_sip_push_task(), ast_sip_push_task_wait_serializer(), AST_TEST_DEFINE(), load_module(), mwi_startup_event_cb(), mwi_stasis_cb(), send_contact_notify(), and send_notify().

◆ ast_serializer_pool_name()

const char * ast_serializer_pool_name ( const struct ast_serializer_pool *  pool)

Retrieve the base name of the serializer pool.

Parameters
pool - The pool object
Returns
The base name given to the pool

Definition at line 170 of file serializer.c.

171{
172 return pool->name;
173}

Referenced by AST_TEST_DEFINE().

◆ ast_serializer_pool_set_alerts()

int ast_serializer_pool_set_alerts ( struct ast_serializer_pool *  pool,
long  high,
long  low 
)

Set taskprocessor alert levels for the serializers in the pool.

Parameters
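pool - The pool whose serializers will have their alert levels set
high, low - The taskprocessor high water alert trigger level and low water clear alert level to apply
Return values
0 - on success.
-1 - on error.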

Definition at line 204 of file serializer.c.

205{
206 size_t idx;
207 long tps_queue_high;
208 long tps_queue_low;
209
210 if (!pool) {
211 return 0;
212 }
213
214 tps_queue_high = high;
215 if (tps_queue_high <= 0) {
216 ast_log(AST_LOG_WARNING, "Invalid '%s-*' taskprocessor high water alert "
217 "trigger level '%ld'\n", pool->name, tps_queue_high);
218 tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;
219 }
220
221 tps_queue_low = low;
222 if (tps_queue_low < -1 || tps_queue_high < tps_queue_low) {
223 ast_log(AST_LOG_WARNING, "Invalid '%s-*' taskprocessor low water clear alert "
224 "level '%ld'\n", pool->name, tps_queue_low);
225 tps_queue_low = -1;
226 }
227
228 for (idx = 0; idx < AST_VECTOR_SIZE(&pool->serializers); ++idx) {
229 struct ast_taskprocessor *cur = AST_VECTOR_GET(&pool->serializers, idx);
231 ast_log(AST_LOG_WARNING, "Failed to set alert levels for serializer '%s'.\n",
233 }
234 }
235
236 return 0;
237}
#define AST_LOG_WARNING
long tps_queue_low
Taskprocessor low water clear alert level.
Definition: taskprocessor.c:76
long tps_queue_high
Taskprocessor high water alert trigger level.
Definition: taskprocessor.c:78
const char * ast_taskprocessor_name(struct ast_taskprocessor *tps)
Return the name of the taskprocessor singleton.
#define AST_TASKPROCESSOR_HIGH_WATER_LEVEL
Definition: taskprocessor.h:64
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.

References ast_log, AST_LOG_WARNING, ast_taskprocessor_alert_set_levels(), AST_TASKPROCESSOR_HIGH_WATER_LEVEL, ast_taskprocessor_name(), AST_VECTOR_GET, AST_VECTOR_SIZE, ast_taskprocessor::tps_queue_high, and ast_taskprocessor::tps_queue_low.

Referenced by AST_TEST_DEFINE(), and global_loaded().

◆ ast_serializer_taskpool_create()

struct ast_serializer_pool * ast_serializer_taskpool_create ( const char *  name,
unsigned int  size,
struct ast_taskpool *  taskpool,
int  timeout 
)

Create a serializer pool on taskpool.

Since
23.1.0
22.7.0
20.17.0

Create a serializer pool with an optional shutdown group. If a timeout greater than -1 is specified then a shutdown group is enabled on the pool.

Parameters
name - The base name for the pool, also used when building the taskprocessor name(s)
size - The size of the pool
taskpool - The backing taskpool to use
timeout - The timeout used if using a shutdown group (-1 = disabled)
Returns
A newly allocated serializer pool object
Return values
NULL - on error

Definition at line 124 of file serializer.c.

126{
127 struct ast_serializer_pool *pool;
128 char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
129 size_t idx;
130
131 ast_assert(size > 0);
132
133 pool = ast_malloc(sizeof(*pool) + strlen(name) + 1);
134 if (!pool) {
135 return NULL;
136 }
137
138 strcpy(pool->name, name); /* safe */
139
140 pool->shutdown_group_timeout = timeout;
142
143 AST_VECTOR_RW_INIT(&pool->serializers, size);
144
145 for (idx = 0; idx < size; ++idx) {
146 struct ast_taskprocessor *tps;
147
148 /* Create name with seq number appended. */
149 ast_taskprocessor_name_append(tps_name, sizeof(tps_name), name);
150
152 if (!tps) {
154 ast_log(LOG_ERROR, "Pool create: unable to create named serializer '%s'\n",
155 tps_name);
156 return NULL;
157 }
158
159 if (AST_VECTOR_APPEND(&pool->serializers, tps)) {
161 ast_log(LOG_ERROR, "Pool create: unable to append named serializer '%s'\n",
162 tps_name);
163 return NULL;
164 }
165 }
166
167 return pool;
168}
static struct ast_taskpool * taskpool
Definition: stasis.c:374
struct ast_taskprocessor * ast_taskpool_serializer_group(const char *name, struct ast_taskpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Serialized execution of tasks within a ast_taskpool.
Definition: taskpool.c:789

References ast_assert, ast_log, ast_malloc, ast_serializer_pool_destroy(), ast_serializer_shutdown_group_alloc(), ast_taskpool_serializer_group(), AST_TASKPROCESSOR_MAX_NAME, ast_taskprocessor_name_append(), AST_VECTOR_APPEND, AST_VECTOR_RW_INIT, LOG_ERROR, name, NULL, ast_serializer_pool::shutdown_group, ast_serializer_pool::shutdown_group_timeout, and taskpool.