Asterisk - The Open Source Telephony Project GIT-master-77d630f
serializer.c
Go to the documentation of this file.
1/*
2 * Asterisk -- An open source telephony toolkit.
3 *
4 * Copyright (C) 2019, Sangoma Technologies Corporation
5 *
6 * Kevin Harwell <kharwell@digium.com>
7 *
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
13 *
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
17 */
18
19#include "asterisk.h"
20
21#include "asterisk/astobj2.h"
22#include "asterisk/serializer.h"
24#include "asterisk/threadpool.h"
25#include "asterisk/taskpool.h"
26#include "asterisk/utils.h"
27#include "asterisk/vector.h"
29
/*
 * Members of the serializer pool object that the functions in this file
 * access through 'pool->...'.
 *
 * NOTE(review): this listing has dropped original lines 30, 32 and 34, i.e.
 * the struct opener and the two members that the first two comments below
 * describe. Code further down reads pool->shutdown_group and
 * pool->shutdown_group_timeout, so those are presumably the missing
 * members -- confirm against the upstream file before relying on this.
 */
31 /*! Shutdown group to monitor serializers. */
33 /*! Time to wait if using a shutdown group. */
35 /*! A pool of taskprocessor(s) */
36 AST_VECTOR_RW(, struct ast_taskprocessor *) serializers;
37 /*! Base name for the pool */
/* Flexible array member: the create functions allocate strlen(name) + 1 extra bytes for it. */
38 char name[];
39};
40
/*
 * Body of the pool destructor.
 *
 * NOTE(review): the signature (original line 41) is missing from this
 * listing. The symbol index at the end of the page gives it as
 *   int ast_serializer_pool_destroy(struct ast_serializer_pool *pool)
 *
 * Returns 0 when 'pool' is NULL or when the pool was released completely.
 * When a shutdown group is in use and 'remaining' is non-zero after the wait,
 * that value is returned instead and neither the vector nor the pool memory
 * is freed on that path.
 */
42{
43 if (!pool) {
44 return 0;
45 }
46
47 /* Clear out the serializers */
48 AST_VECTOR_RW_WRLOCK(&pool->serializers);
/*
 * NOTE(review): original line 49 is missing here. The index lists
 * AST_VECTOR_RESET(vec, cleanup) and ast_taskprocessor_unreference(), which
 * presumably form the dropped statement -- confirm against upstream.
 */
50 AST_VECTOR_RW_UNLOCK(&pool->serializers);
51
52 /* If using a shutdown group then wait for all threads to complete */
53 if (pool->shutdown_group) {
54 int remaining;
55
56 ast_debug(3, "Waiting on serializers before destroying pool '%s'\n", pool->name);
57
/*
 * NOTE(review): original lines 58-59 are missing here; 'remaining' must be
 * assigned on them, presumably from ast_serializer_shutdown_group_join()
 * (listed in the index) -- confirm against upstream.
 */
60
61 if (remaining) {
62 /* If we've timed out don't fully cleanup yet */
63 ast_log(LOG_WARNING, "'%s' serializer pool destruction timeout. "
64 "'%d' dependencies still processing.\n", pool->name, remaining);
65 return remaining;
66 }
67
/* Drop this pool's reference on the shutdown group and forget the pointer. */
68 ao2_ref(pool->shutdown_group, -1);
69 pool->shutdown_group = NULL;
70 }
71
72 AST_VECTOR_RW_FREE(&pool->serializers);
73 ast_free(pool);
74
75 return 0;
76}
77
/*
 * Threadpool-backed pool constructor.
 *
 * NOTE(review): the first signature line (original line 78) is missing from
 * this listing. The symbol index at the end of the page gives the full
 * signature as
 *   struct ast_serializer_pool *ast_serializer_pool_create(const char *name,
 *       unsigned int size, struct ast_threadpool *threadpool, int timeout)
 *
 * Allocates the pool with room for a copy of 'name', records 'timeout', then
 * loops 'size' times generating a sequence-numbered taskprocessor name and
 * appending a serializer to the pool's vector. Returns the pool, or NULL when
 * the allocation, serializer creation, or vector append fails.
 */
79 unsigned int size, struct ast_threadpool *threadpool, int timeout)
80{
81 struct ast_serializer_pool *pool;
82 char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
83 size_t idx;
84
85 ast_assert(size > 0);
86
87 pool = ast_malloc(sizeof(*pool) + strlen(name) + 1);
88 if (!pool) {
89 return NULL;
90 }
91
/* The allocation above reserved strlen(name) + 1 bytes for the trailing name[] member. */
92 strcpy(pool->name, name); /* safe */
93
94 pool->shutdown_group_timeout = timeout;
/*
 * NOTE(review): original line 95 is missing here. pool->shutdown_group is
 * read later and the pool memory comes from ast_malloc(), so it is presumably
 * initialised on the dropped line (the index lists
 * ast_serializer_shutdown_group_alloc()) -- confirm against upstream.
 */
96
97 AST_VECTOR_RW_INIT(&pool->serializers, size);
98
99 for (idx = 0; idx < size; ++idx) {
100 struct ast_taskprocessor *tps;
101
102 /* Create name with seq number appended. */
103 ast_taskprocessor_name_append(tps_name, sizeof(tps_name), name);
104
/*
 * NOTE(review): original line 105 is missing here; 'tps' must be assigned on
 * it, presumably from ast_threadpool_serializer_group() (listed in the
 * index). Original lines 107 and 114 are also missing from the two error
 * branches below; as shown, they return without releasing 'pool' -- check
 * upstream for cleanup on the dropped lines.
 */
106 if (!tps) {
108 ast_log(LOG_ERROR, "Pool create: unable to create named serializer '%s'\n",
109 tps_name);
110 return NULL;
111 }
112
113 if (AST_VECTOR_APPEND(&pool->serializers, tps)) {
115 ast_log(LOG_ERROR, "Pool create: unable to append named serializer '%s'\n",
116 tps_name);
117 return NULL;
118 }
119 }
120
121 return pool;
122}
123
/*
 * Taskpool-backed pool constructor; the visible lines mirror the
 * threadpool-backed constructor except for the 'taskpool' parameter.
 *
 * NOTE(review): the first signature line (original line 124) is missing from
 * this listing. The symbol index at the end of the page gives the full
 * signature as
 *   struct ast_serializer_pool *ast_serializer_taskpool_create(const char *name,
 *       unsigned int size, struct ast_taskpool *taskpool, int timeout)
 *
 * Allocates the pool with room for a copy of 'name', records 'timeout', then
 * loops 'size' times generating a sequence-numbered taskprocessor name and
 * appending a serializer to the pool's vector. Returns the pool, or NULL when
 * the allocation, serializer creation, or vector append fails.
 */
125 unsigned int size, struct ast_taskpool *taskpool, int timeout)
126{
127 struct ast_serializer_pool *pool;
128 char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
129 size_t idx;
130
131 ast_assert(size > 0);
132
133 pool = ast_malloc(sizeof(*pool) + strlen(name) + 1);
134 if (!pool) {
135 return NULL;
136 }
137
/* The allocation above reserved strlen(name) + 1 bytes for the trailing name[] member. */
138 strcpy(pool->name, name); /* safe */
139
140 pool->shutdown_group_timeout = timeout;
/*
 * NOTE(review): original line 141 is missing here. pool->shutdown_group is
 * read later and the pool memory comes from ast_malloc(), so it is presumably
 * initialised on the dropped line -- confirm against upstream.
 */
142
143 AST_VECTOR_RW_INIT(&pool->serializers, size);
144
145 for (idx = 0; idx < size; ++idx) {
146 struct ast_taskprocessor *tps;
147
148 /* Create name with seq number appended. */
149 ast_taskprocessor_name_append(tps_name, sizeof(tps_name), name);
150
/*
 * NOTE(review): original line 151 is missing here; 'tps' must be assigned on
 * it, presumably from ast_taskpool_serializer_group() (listed in the index).
 * Original lines 153 and 160 are also missing from the two error branches
 * below; as shown, they return without releasing 'pool' -- check upstream for
 * cleanup on the dropped lines.
 */
152 if (!tps) {
154 ast_log(LOG_ERROR, "Pool create: unable to create named serializer '%s'\n",
155 tps_name);
156 return NULL;
157 }
158
159 if (AST_VECTOR_APPEND(&pool->serializers, tps)) {
161 ast_log(LOG_ERROR, "Pool create: unable to append named serializer '%s'\n",
162 tps_name);
163 return NULL;
164 }
165 }
166
167 return pool;
168}
169
170const char *ast_serializer_pool_name(const struct ast_serializer_pool *pool)
171{
172 return pool->name;
173}
174
/*
 * Body of the pool getter.
 *
 * NOTE(review): the signature (original line 175) is missing from this
 * listing. The symbol index at the end of the page gives it as
 *   struct ast_taskprocessor *ast_serializer_pool_get(struct ast_serializer_pool *pool)
 *
 * Returns NULL when 'pool' is NULL or its serializer vector is empty;
 * otherwise returns one of the vector's elements, chosen under the vector's
 * read lock. No reference-count adjustment on the returned pointer is visible
 * in this listing.
 */
176{
177 struct ast_taskprocessor *res;
178 size_t idx;
179
180 if (!pool) {
181 return NULL;
182 }
183
184 AST_VECTOR_RW_RDLOCK(&pool->serializers);
185 if (AST_VECTOR_SIZE(&pool->serializers) == 0) {
186 AST_VECTOR_RW_UNLOCK(&pool->serializers);
187 return NULL;
188 }
189
/* Start with the first element as the candidate and compare the rest against it. */
190 res = AST_VECTOR_GET(&pool->serializers, 0);
191
192 /* Choose the taskprocessor with the smallest queue */
193 for (idx = 1; idx < AST_VECTOR_SIZE(&pool->serializers); ++idx) {
194 struct ast_taskprocessor *cur = AST_VECTOR_GET(&pool->serializers, idx);
/*
 * NOTE(review): original line 195 is missing here; it holds the condition
 * guarding the assignment below, presumably a comparison of
 * ast_taskprocessor_size() for 'cur' and 'res' -- confirm against upstream.
 */
196 res = cur;
197 }
198 }
199
200 AST_VECTOR_RW_UNLOCK(&pool->serializers);
201 return res;
202}
203
/*!
 * \brief Set taskprocessor alert levels for the serializers in the pool.
 *
 * \param pool The pool whose serializers are updated; NULL is accepted and
 *             treated as a no-op.
 * \param high Requested high water alert trigger level; values <= 0 are
 *             logged as invalid.
 * \param low  Requested low water clear alert level; when rejected it is
 *             logged and replaced by -1.
 *
 * \return 0 on every path visible in this listing, including when setting the
 *         levels on an individual serializer is reported as failed.
 *
 * NOTE(review): original lines 218, 222, 230 and 232 are missing from this
 * listing, so the braces below do not balance as shown.
 */
204int ast_serializer_pool_set_alerts(struct ast_serializer_pool *pool, long high, long low)
205{
206 size_t idx;
207 long tps_queue_high;
208 long tps_queue_low;
209
210 if (!pool) {
211 return 0;
212 }
213
214 tps_queue_high = high;
215 if (tps_queue_high <= 0) {
216 ast_log(AST_LOG_WARNING, "Invalid '%s-*' taskprocessor high water alert "
217 "trigger level '%ld'\n", pool->name, tps_queue_high);
/*
 * NOTE(review): original line 218 is missing here; it presumably substitutes
 * a default such as AST_TASKPROCESSOR_HIGH_WATER_LEVEL (listed in the index)
 * -- confirm against upstream.
 */
219 }
220
221 tps_queue_low = low;
/*
 * NOTE(review): original line 222 is missing here; it holds the 'if'
 * condition that rejects the requested low level. The exact test is not
 * visible in this listing.
 */
223 ast_log(AST_LOG_WARNING, "Invalid '%s-*' taskprocessor low water clear alert "
224 "level '%ld'\n", pool->name, tps_queue_low);
225 tps_queue_low = -1;
226 }
227
/* Unlike the getter, no vector lock is visibly taken around this walk. */
228 for (idx = 0; idx < AST_VECTOR_SIZE(&pool->serializers); ++idx) {
229 struct ast_taskprocessor *cur = AST_VECTOR_GET(&pool->serializers, idx);
/*
 * NOTE(review): original lines 230 and 232 are missing around the log call
 * below: presumably the ast_taskprocessor_alert_set_levels() call whose
 * failure triggers the warning, and the final ast_log() argument (likely
 * ast_taskprocessor_name(cur)) -- confirm against upstream.
 */
231 ast_log(AST_LOG_WARNING, "Failed to set alert levels for serializer '%s'.\n",
233 }
234 }
235
236 return 0;
237}
Asterisk main include file. File version handling, generic pbx functions.
#define ast_free(a)
Definition: astmm.h:180
#define ast_malloc(len)
A wrapper for malloc()
Definition: astmm.h:191
#define ast_log
Definition: astobj2.c:42
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
static const char name[]
Definition: format_mp3.c:68
#define AST_LOG_WARNING
#define ast_debug(level,...)
Log a DEBUG message.
#define LOG_ERROR
#define LOG_WARNING
#define NULL
Definition: resample.c:96
struct ast_taskprocessor * ast_serializer_pool_get(struct ast_serializer_pool *pool)
Retrieve a serializer from the pool.
Definition: serializer.c:175
int ast_serializer_pool_set_alerts(struct ast_serializer_pool *pool, long high, long low)
Set taskprocessor alert levels for the serializers in the pool.
Definition: serializer.c:204
const char * ast_serializer_pool_name(const struct ast_serializer_pool *pool)
Retrieve the base name of the serializer pool.
Definition: serializer.c:170
struct ast_serializer_pool * ast_serializer_pool_create(const char *name, unsigned int size, struct ast_threadpool *threadpool, int timeout)
Create a serializer pool.
Definition: serializer.c:78
int ast_serializer_pool_destroy(struct ast_serializer_pool *pool)
Destroy the serializer pool.
Definition: serializer.c:41
struct ast_serializer_pool * ast_serializer_taskpool_create(const char *name, unsigned int size, struct ast_taskpool *taskpool, int timeout)
Create a serializer pool on taskpool.
Definition: serializer.c:124
int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout)
Wait for the serializers in the group to shutdown with timeout.
struct ast_serializer_shutdown_group * ast_serializer_shutdown_group_alloc(void)
Create a serializer group shutdown control object.
static struct ast_threadpool * threadpool
Thread pool for observers.
Definition: sorcery.c:87
static struct ast_taskpool * taskpool
Definition: stasis.c:374
struct ast_serializer_shutdown_group * shutdown_group
Definition: serializer.c:32
An opaque taskpool structure.
Definition: taskpool.c:62
An ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
long tps_queue_low
Taskprocessor low water clear alert level.
Definition: taskprocessor.c:76
long tps_queue_high
Taskprocessor high water alert trigger level.
Definition: taskprocessor.c:78
An opaque threadpool structure.
Definition: threadpool.c:36
struct ast_taskprocessor * ast_taskpool_serializer_group(const char *name, struct ast_taskpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Serialized execution of tasks within an ast_taskpool.
Definition: taskpool.c:789
An API for managing task processing threads that can be shared across modules.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
void ast_taskprocessor_name_append(char *buf, unsigned int size, const char *name)
Append the next sequence number to the given string, and copy into the buffer.
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
const char * ast_taskprocessor_name(struct ast_taskprocessor *tps)
Return the name of the taskprocessor singleton.
#define AST_TASKPROCESSOR_HIGH_WATER_LEVEL
Definition: taskprocessor.h:64
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.
#define AST_TASKPROCESSOR_MAX_NAME
Suggested maximum taskprocessor name length (less null terminator).
Definition: taskprocessor.h:61
struct ast_taskprocessor * ast_threadpool_serializer_group(const char *name, struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Serialized execution of tasks within an ast_threadpool.
Definition: threadpool.c:1301
Utility functions.
#define ast_assert(a)
Definition: utils.h:776
Vector container support.
#define AST_VECTOR_RESET(vec, cleanup)
Reset vector.
Definition: vector.h:636
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:620
#define AST_VECTOR_RW_WRLOCK(vec)
Obtain write lock on vector.
Definition: vector.h:898
#define AST_VECTOR_RW_UNLOCK(vec)
Unlock vector.
Definition: vector.h:908
#define AST_VECTOR_RW_FREE(vec)
Deallocates this locked vector.
Definition: vector.h:213
#define AST_VECTOR_RW(name, type)
Define a vector structure with a read/write lock.
Definition: vector.h:104
#define AST_VECTOR_RW_RDLOCK(vec)
Obtain read lock on vector.
Definition: vector.h:888
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:267
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:691
#define AST_VECTOR_RW_INIT(vec, size)
Initialize a vector with a read/write lock.
Definition: vector.h:169