Asterisk - The Open Source Telephony Project GIT-master-f36a736
Data Structures | Functions
serializer.c File Reference
#include "asterisk.h"
#include "asterisk/astobj2.h"
#include "asterisk/serializer.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/threadpool.h"
#include "asterisk/utils.h"
#include "asterisk/vector.h"
Include dependency graph for serializer.c:

Go to the source code of this file.

Data Structures

struct  ast_serializer_pool
 

Functions

struct ast_serializer_poolast_serializer_pool_create (const char *name, unsigned int size, struct ast_threadpool *threadpool, int timeout)
 Create a serializer pool. More...
 
int ast_serializer_pool_destroy (struct ast_serializer_pool *pool)
 Destroy the serializer pool. More...
 
struct ast_taskprocessorast_serializer_pool_get (struct ast_serializer_pool *pool)
 Retrieve a serializer from the pool. More...
 
const char * ast_serializer_pool_name (const struct ast_serializer_pool *pool)
 Retrieve the base name of the serializer pool. More...
 
int ast_serializer_pool_set_alerts (struct ast_serializer_pool *pool, long high, long low)
 Set taskprocessor alert levels for the serializers in the pool. More...
 

Function Documentation

◆ ast_serializer_pool_create()

struct ast_serializer_pool * ast_serializer_pool_create ( const char *  name,
unsigned int  size,
struct ast_threadpool threadpool,
int  timeout 
)

Create a serializer pool.

Create a serializer pool with an optional shutdown group. If a timeout greater than -1 is specified then a shutdown group is enabled on the pool.

Parameters
nameThe base name for the pool, and used when building taskprocessor(s)
sizeThe size of the pool
threadpoolThe backing threadpool to use
timeoutThe timeout used if using a shutdown group (-1 = disabled)
Returns
A newly allocated serializer pool object
Return values
NULLon error

Definition at line 76 of file serializer.c.

78{
79 struct ast_serializer_pool *pool;
80 char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
81 size_t idx;
82
83 ast_assert(size > 0);
84
85 pool = ast_malloc(sizeof(*pool) + strlen(name) + 1);
86 if (!pool) {
87 return NULL;
88 }
89
90 strcpy(pool->name, name); /* safe */
91
92 pool->shutdown_group_timeout = timeout;
94
95 AST_VECTOR_RW_INIT(&pool->serializers, size);
96
97 for (idx = 0; idx < size; ++idx) {
98 struct ast_taskprocessor *tps;
99
100 /* Create name with seq number appended. */
101 ast_taskprocessor_name_append(tps_name, sizeof(tps_name), name);
102
104 if (!tps) {
106 ast_log(LOG_ERROR, "Pool create: unable to create named serializer '%s'\n",
107 tps_name);
108 return NULL;
109 }
110
111 if (AST_VECTOR_APPEND(&pool->serializers, tps)) {
113 ast_log(LOG_ERROR, "Pool create: unable to append named serializer '%s'\n",
114 tps_name);
115 return NULL;
116 }
117 }
118
119 return pool;
120}
#define ast_malloc(len)
A wrapper for malloc()
Definition: astmm.h:191
#define ast_log
Definition: astobj2.c:42
static const char name[]
Definition: format_mp3.c:68
#define LOG_ERROR
#define NULL
Definition: resample.c:96
int ast_serializer_pool_destroy(struct ast_serializer_pool *pool)
Destroy the serializer pool.
Definition: serializer.c:39
static struct ast_threadpool * threadpool
Thread pool for observers.
Definition: sorcery.c:86
struct ast_serializer_shutdown_group * shutdown_group
Definition: serializer.c:30
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
void ast_taskprocessor_name_append(char *buf, unsigned int size, const char *name)
Append the next sequence number to the given string, and copy into the buffer.
#define AST_TASKPROCESSOR_MAX_NAME
Suggested maximum taskprocessor name length (less null terminator).
Definition: taskprocessor.h:61
struct ast_taskprocessor * ast_threadpool_serializer_group(const char *name, struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Serialized execution of tasks within a ast_threadpool.
Definition: threadpool.c:1398
struct ast_serializer_shutdown_group * ast_serializer_shutdown_group_alloc(void)
Create a serializer group shutdown control object.
Definition: threadpool.c:1229
#define ast_assert(a)
Definition: utils.h:739
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256
#define AST_VECTOR_RW_INIT(vec, size)
Initialize a vector with a read/write lock.
Definition: vector.h:158

References ast_assert, ast_log, ast_malloc, ast_serializer_pool_destroy(), ast_serializer_shutdown_group_alloc(), AST_TASKPROCESSOR_MAX_NAME, ast_taskprocessor_name_append(), ast_threadpool_serializer_group(), AST_VECTOR_APPEND, AST_VECTOR_RW_INIT, LOG_ERROR, name, NULL, ast_serializer_pool::shutdown_group, ast_serializer_pool::shutdown_group_timeout, and threadpool.

Referenced by AST_TEST_DEFINE(), and load_module().

◆ ast_serializer_pool_destroy()

int ast_serializer_pool_destroy ( struct ast_serializer_pool pool)

Destroy the serializer pool.

Attempt to destroy the serializer pool. If a shutdown group has been enabled, and times out waiting for threads to complete, then this function will return the number of remaining threads, and the pool will not be destroyed.

Parameters
poolThe pool to destroy

Definition at line 39 of file serializer.c.

40{
41 if (!pool) {
42 return 0;
43 }
44
45 /* Clear out the serializers */
46 AST_VECTOR_RW_WRLOCK(&pool->serializers);
48 AST_VECTOR_RW_UNLOCK(&pool->serializers);
49
50 /* If using a shutdown group then wait for all threads to complete */
51 if (pool->shutdown_group) {
52 int remaining;
53
54 ast_debug(3, "Waiting on serializers before destroying pool '%s'\n", pool->name);
55
58
59 if (remaining) {
60 /* If we've timed out don't fully cleanup yet */
61 ast_log(LOG_WARNING, "'%s' serializer pool destruction timeout. "
62 "'%d' dependencies still processing.\n", pool->name, remaining);
63 return remaining;
64 }
65
66 ao2_ref(pool->shutdown_group, -1);
67 pool->shutdown_group = NULL;
68 }
69
70 AST_VECTOR_RW_FREE(&pool->serializers);
71 ast_free(pool);
72
73 return 0;
74}
#define ast_free(a)
Definition: astmm.h:180
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
#define ast_debug(level,...)
Log a DEBUG message.
#define LOG_WARNING
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout)
Wait for the serializers in the group to shutdown with timeout.
Definition: threadpool.c:1241
#define AST_VECTOR_RESET(vec, cleanup)
Reset vector.
Definition: vector.h:625
#define AST_VECTOR_RW_WRLOCK(vec)
Obtain write lock on vector.
Definition: vector.h:887
#define AST_VECTOR_RW_UNLOCK(vec)
Unlock vector.
Definition: vector.h:897
#define AST_VECTOR_RW_FREE(vec)
Deallocates this locked vector.
Definition: vector.h:202

References ao2_ref, ast_debug, ast_free, ast_log, ast_serializer_shutdown_group_join(), ast_taskprocessor_unreference(), AST_VECTOR_RESET, AST_VECTOR_RW_FREE, AST_VECTOR_RW_UNLOCK, AST_VECTOR_RW_WRLOCK, LOG_WARNING, NULL, ast_serializer_pool::shutdown_group, and ast_serializer_pool::shutdown_group_timeout.

Referenced by ast_serializer_pool_create(), AST_TEST_DEFINE(), load_module(), and unload_module().

◆ ast_serializer_pool_get()

struct ast_taskprocessor * ast_serializer_pool_get ( struct ast_serializer_pool pool)

Retrieve a serializer from the pool.

Parameters
poolThe pool object
Returns
A serializer/taskprocessor

Definition at line 127 of file serializer.c.

128{
129 struct ast_taskprocessor *res;
130 size_t idx;
131
132 if (!pool) {
133 return NULL;
134 }
135
136 AST_VECTOR_RW_RDLOCK(&pool->serializers);
137 if (AST_VECTOR_SIZE(&pool->serializers) == 0) {
138 AST_VECTOR_RW_UNLOCK(&pool->serializers);
139 return NULL;
140 }
141
142 res = AST_VECTOR_GET(&pool->serializers, 0);
143
144 /* Choose the taskprocessor with the smallest queue */
145 for (idx = 1; idx < AST_VECTOR_SIZE(&pool->serializers); ++idx) {
146 struct ast_taskprocessor *cur = AST_VECTOR_GET(&pool->serializers, idx);
148 res = cur;
149 }
150 }
151
152 AST_VECTOR_RW_UNLOCK(&pool->serializers);
153 return res;
154}
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:609
#define AST_VECTOR_RW_RDLOCK(vec)
Obtain read lock on vector.
Definition: vector.h:877
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:680

References ast_taskprocessor_size(), AST_VECTOR_GET, AST_VECTOR_RW_RDLOCK, AST_VECTOR_RW_UNLOCK, AST_VECTOR_SIZE, and NULL.

Referenced by ast_sip_push_task(), ast_sip_push_task_wait_serializer(), AST_TEST_DEFINE(), load_module(), mwi_startup_event_cb(), mwi_stasis_cb(), send_contact_notify(), and send_notify().

◆ ast_serializer_pool_name()

const char * ast_serializer_pool_name ( const struct ast_serializer_pool pool)

Retrieve the base name of the serializer pool.

Parameters
poolThe pool object
Returns
The base name given to the pool

Definition at line 122 of file serializer.c.

123{
124 return pool->name;
125}

Referenced by AST_TEST_DEFINE().

◆ ast_serializer_pool_set_alerts()

int ast_serializer_pool_set_alerts ( struct ast_serializer_pool pool,
long  high,
long  low 
)

Set taskprocessor alert levels for the serializers in the pool.

Parameters
poolThe pool to destroy
high,low
Return values
0on success.
-1on error.

Definition at line 156 of file serializer.c.

157{
158 size_t idx;
159 long tps_queue_high;
160 long tps_queue_low;
161
162 if (!pool) {
163 return 0;
164 }
165
166 tps_queue_high = high;
167 if (tps_queue_high <= 0) {
168 ast_log(AST_LOG_WARNING, "Invalid '%s-*' taskprocessor high water alert "
169 "trigger level '%ld'\n", pool->name, tps_queue_high);
170 tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;
171 }
172
173 tps_queue_low = low;
174 if (tps_queue_low < -1 || tps_queue_high < tps_queue_low) {
175 ast_log(AST_LOG_WARNING, "Invalid '%s-*' taskprocessor low water clear alert "
176 "level '%ld'\n", pool->name, tps_queue_low);
177 tps_queue_low = -1;
178 }
179
180 for (idx = 0; idx < AST_VECTOR_SIZE(&pool->serializers); ++idx) {
181 struct ast_taskprocessor *cur = AST_VECTOR_GET(&pool->serializers, idx);
183 ast_log(AST_LOG_WARNING, "Failed to set alert levels for serializer '%s'.\n",
185 }
186 }
187
188 return 0;
189}
#define AST_LOG_WARNING
long tps_queue_low
Taskprocessor low water clear alert level.
Definition: taskprocessor.c:76
long tps_queue_high
Taskprocessor high water alert trigger level.
Definition: taskprocessor.c:78
const char * ast_taskprocessor_name(struct ast_taskprocessor *tps)
Return the name of the taskprocessor singleton.
#define AST_TASKPROCESSOR_HIGH_WATER_LEVEL
Definition: taskprocessor.h:64
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.

References ast_log, AST_LOG_WARNING, ast_taskprocessor_alert_set_levels(), AST_TASKPROCESSOR_HIGH_WATER_LEVEL, ast_taskprocessor_name(), AST_VECTOR_GET, AST_VECTOR_SIZE, ast_taskprocessor::tps_queue_high, and ast_taskprocessor::tps_queue_low.

Referenced by AST_TEST_DEFINE(), and global_loaded().