Asterisk - The Open Source Telephony Project GIT-master-55f4e6d
serializer.c
Go to the documentation of this file.
1/*
2 * Asterisk -- An open source telephony toolkit.
3 *
4 * Copyright (C) 2019, Sangoma Technologies Corporation
5 *
6 * Kevin Harwell <kharwell@digium.com>
7 *
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
13 *
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
17 */
18
19#include "asterisk.h"
20
21#include "asterisk/astobj2.h"
22#include "asterisk/serializer.h"
24#include "asterisk/threadpool.h"
25#include "asterisk/utils.h"
26#include "asterisk/vector.h"
27
29 /*! Shutdown group to monitor serializers. */
31 /*! Time to wait if using a shutdown group. */
33 /*! A pool of taskprocessor(s) */
34 AST_VECTOR_RW(, struct ast_taskprocessor *) serializers;
35 /*! Base name for the pool */
36 char name[];
37};
38
40{
41 if (!pool) {
42 return 0;
43 }
44
45 /* Clear out the serializers */
46 AST_VECTOR_RW_WRLOCK(&pool->serializers);
48 AST_VECTOR_RW_UNLOCK(&pool->serializers);
49
50 /* If using a shutdown group then wait for all threads to complete */
51 if (pool->shutdown_group) {
52 int remaining;
53
54 ast_debug(3, "Waiting on serializers before destroying pool '%s'\n", pool->name);
55
58
59 if (remaining) {
60 /* If we've timed out don't fully cleanup yet */
61 ast_log(LOG_WARNING, "'%s' serializer pool destruction timeout. "
62 "'%d' dependencies still processing.\n", pool->name, remaining);
63 return remaining;
64 }
65
66 ao2_ref(pool->shutdown_group, -1);
67 pool->shutdown_group = NULL;
68 }
69
70 AST_VECTOR_RW_FREE(&pool->serializers);
71 ast_free(pool);
72
73 return 0;
74}
75
77 unsigned int size, struct ast_threadpool *threadpool, int timeout)
78{
79 struct ast_serializer_pool *pool;
80 char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
81 size_t idx;
82
83 ast_assert(size > 0);
84
85 pool = ast_malloc(sizeof(*pool) + strlen(name) + 1);
86 if (!pool) {
87 return NULL;
88 }
89
90 strcpy(pool->name, name); /* safe */
91
92 pool->shutdown_group_timeout = timeout;
94
95 AST_VECTOR_RW_INIT(&pool->serializers, size);
96
97 for (idx = 0; idx < size; ++idx) {
98 struct ast_taskprocessor *tps;
99
100 /* Create name with seq number appended. */
101 ast_taskprocessor_name_append(tps_name, sizeof(tps_name), name);
102
104 if (!tps) {
106 ast_log(LOG_ERROR, "Pool create: unable to create named serializer '%s'\n",
107 tps_name);
108 return NULL;
109 }
110
111 if (AST_VECTOR_APPEND(&pool->serializers, tps)) {
113 ast_log(LOG_ERROR, "Pool create: unable to append named serializer '%s'\n",
114 tps_name);
115 return NULL;
116 }
117 }
118
119 return pool;
120}
121
122const char *ast_serializer_pool_name(const struct ast_serializer_pool *pool)
123{
124 return pool->name;
125}
126
128{
129 struct ast_taskprocessor *res;
130 size_t idx;
131
132 if (!pool) {
133 return NULL;
134 }
135
136 AST_VECTOR_RW_RDLOCK(&pool->serializers);
137 if (AST_VECTOR_SIZE(&pool->serializers) == 0) {
138 AST_VECTOR_RW_UNLOCK(&pool->serializers);
139 return NULL;
140 }
141
142 res = AST_VECTOR_GET(&pool->serializers, 0);
143
144 /* Choose the taskprocessor with the smallest queue */
145 for (idx = 1; idx < AST_VECTOR_SIZE(&pool->serializers); ++idx) {
146 struct ast_taskprocessor *cur = AST_VECTOR_GET(&pool->serializers, idx);
148 res = cur;
149 }
150 }
151
152 AST_VECTOR_RW_UNLOCK(&pool->serializers);
153 return res;
154}
155
156int ast_serializer_pool_set_alerts(struct ast_serializer_pool *pool, long high, long low)
157{
158 size_t idx;
159 long tps_queue_high;
160 long tps_queue_low;
161
162 if (!pool) {
163 return 0;
164 }
165
166 tps_queue_high = high;
167 if (tps_queue_high <= 0) {
168 ast_log(AST_LOG_WARNING, "Invalid '%s-*' taskprocessor high water alert "
169 "trigger level '%ld'\n", pool->name, tps_queue_high);
171 }
172
173 tps_queue_low = low;
175 ast_log(AST_LOG_WARNING, "Invalid '%s-*' taskprocessor low water clear alert "
176 "level '%ld'\n", pool->name, tps_queue_low);
177 tps_queue_low = -1;
178 }
179
180 for (idx = 0; idx < AST_VECTOR_SIZE(&pool->serializers); ++idx) {
181 struct ast_taskprocessor *cur = AST_VECTOR_GET(&pool->serializers, idx);
183 ast_log(AST_LOG_WARNING, "Failed to set alert levels for serializer '%s'.\n",
185 }
186 }
187
188 return 0;
189}
Asterisk main include file. File version handling, generic pbx functions.
#define ast_free(a)
Definition: astmm.h:180
#define ast_malloc(len)
A wrapper for malloc()
Definition: astmm.h:191
#define ast_log
Definition: astobj2.c:42
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
static const char name[]
Definition: format_mp3.c:68
#define AST_LOG_WARNING
#define ast_debug(level,...)
Log a DEBUG message.
#define LOG_ERROR
#define LOG_WARNING
#define NULL
Definition: resample.c:96
struct ast_taskprocessor * ast_serializer_pool_get(struct ast_serializer_pool *pool)
Retrieve a serializer from the pool.
Definition: serializer.c:127
int ast_serializer_pool_set_alerts(struct ast_serializer_pool *pool, long high, long low)
Set taskprocessor alert levels for the serializers in the pool.
Definition: serializer.c:156
const char * ast_serializer_pool_name(const struct ast_serializer_pool *pool)
Retrieve the base name of the serializer pool.
Definition: serializer.c:122
struct ast_serializer_pool * ast_serializer_pool_create(const char *name, unsigned int size, struct ast_threadpool *threadpool, int timeout)
Create a serializer pool.
Definition: serializer.c:76
int ast_serializer_pool_destroy(struct ast_serializer_pool *pool)
Destroy the serializer pool.
Definition: serializer.c:39
static struct ast_threadpool * threadpool
Thread pool for observers.
Definition: sorcery.c:86
struct ast_serializer_shutdown_group * shutdown_group
Definition: serializer.c:30
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
long tps_queue_low
Taskprocessor low water clear alert level.
Definition: taskprocessor.c:76
long tps_queue_high
Taskprocessor high water alert trigger level.
Definition: taskprocessor.c:78
An opaque threadpool structure.
Definition: threadpool.c:36
An API for managing task processing threads that can be shared across modules.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
void ast_taskprocessor_name_append(char *buf, unsigned int size, const char *name)
Append the next sequence number to the given string, and copy into the buffer.
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
const char * ast_taskprocessor_name(struct ast_taskprocessor *tps)
Return the name of the taskprocessor singleton.
#define AST_TASKPROCESSOR_HIGH_WATER_LEVEL
Definition: taskprocessor.h:64
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.
#define AST_TASKPROCESSOR_MAX_NAME
Suggested maximum taskprocessor name length (less null terminator).
Definition: taskprocessor.h:61
struct ast_taskprocessor * ast_threadpool_serializer_group(const char *name, struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Serialized execution of tasks within a ast_threadpool.
Definition: threadpool.c:1398
int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout)
Wait for the serializers in the group to shutdown with timeout.
Definition: threadpool.c:1241
struct ast_serializer_shutdown_group * ast_serializer_shutdown_group_alloc(void)
Create a serializer group shutdown control object.
Definition: threadpool.c:1229
Utility functions.
#define ast_assert(a)
Definition: utils.h:739
Vector container support.
#define AST_VECTOR_RESET(vec, cleanup)
Reset vector.
Definition: vector.h:625
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:609
#define AST_VECTOR_RW_WRLOCK(vec)
Obtain write lock on vector.
Definition: vector.h:887
#define AST_VECTOR_RW_UNLOCK(vec)
Unlock vector.
Definition: vector.h:897
#define AST_VECTOR_RW_FREE(vec)
Deallocates this locked vector.
Definition: vector.h:202
#define AST_VECTOR_RW(name, type)
Define a vector structure with a read/write lock.
Definition: vector.h:93
#define AST_VECTOR_RW_RDLOCK(vec)
Obtain read lock on vector.
Definition: vector.h:877
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:680
#define AST_VECTOR_RW_INIT(vec, size)
Initialize a vector with a read/write lock.
Definition: vector.h:158