Asterisk - The Open Source Telephony Project  GIT-master-932eae6
serializer.c
Go to the documentation of this file.
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2019, Sangoma Technologies Corporation
5  *
6  * Kevin Harwell <kharwell@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 
19 #include "asterisk.h"
20 
21 #include "asterisk/astobj2.h"
22 #include "asterisk/serializer.h"
23 #include "asterisk/taskprocessor.h"
24 #include "asterisk/threadpool.h"
25 #include "asterisk/utils.h"
26 #include "asterisk/vector.h"
27 
29  /*! Shutdown group to monitor serializers. */
31  /*! Time to wait if using a shutdown group. */
33  /*! A pool of taskprocessor(s) */
34  AST_VECTOR_RW(, struct ast_taskprocessor *) serializers;
35  /*! Base name for the pool */
36  char name[];
37 };
38 
40 {
41  if (!pool) {
42  return 0;
43  }
44 
45  /* Clear out the serializers */
46  AST_VECTOR_RW_WRLOCK(&pool->serializers);
48  AST_VECTOR_RW_UNLOCK(&pool->serializers);
49 
50  /* If using a shutdown group then wait for all threads to complete */
51  if (pool->shutdown_group) {
52  int remaining;
53 
54  ast_debug(3, "Waiting on serializers before destroying pool '%s'\n", pool->name);
55 
58 
59  if (remaining) {
60  /* If we've timed out don't fully cleanup yet */
61  ast_log(LOG_WARNING, "'%s' serializer pool destruction timeout. "
62  "'%d' dependencies still processing.\n", pool->name, remaining);
63  return remaining;
64  }
65 
66  ao2_ref(pool->shutdown_group, -1);
67  pool->shutdown_group = NULL;
68  }
69 
70  AST_VECTOR_RW_FREE(&pool->serializers);
71  ast_free(pool);
72 
73  return 0;
74 }
75 
77  unsigned int size, struct ast_threadpool *threadpool, int timeout)
78 {
79  struct ast_serializer_pool *pool;
80  char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
81  size_t idx;
82 
83  ast_assert(size > 0);
84 
85  pool = ast_malloc(sizeof(*pool) + strlen(name) + 1);
86  if (!pool) {
87  return NULL;
88  }
89 
90  strcpy(pool->name, name); /* safe */
91 
93  pool->shutdown_group = timeout > -1 ? ast_serializer_shutdown_group_alloc() : NULL;
94 
95  AST_VECTOR_RW_INIT(&pool->serializers, size);
96 
97  for (idx = 0; idx < size; ++idx) {
98  struct ast_taskprocessor *tps;
99 
100  /* Create name with seq number appended. */
101  ast_taskprocessor_name_append(tps_name, sizeof(tps_name), name);
102 
103  tps = ast_threadpool_serializer_group(tps_name, threadpool, pool->shutdown_group);
104  if (!tps) {
106  ast_log(LOG_ERROR, "Pool create: unable to create named serializer '%s'\n",
107  tps_name);
108  return NULL;
109  }
110 
111  if (AST_VECTOR_APPEND(&pool->serializers, tps)) {
113  ast_log(LOG_ERROR, "Pool create: unable to append named serializer '%s'\n",
114  tps_name);
115  return NULL;
116  }
117  }
118 
119  return pool;
120 }
121 
122 const char *ast_serializer_pool_name(const struct ast_serializer_pool *pool)
123 {
124  return pool->name;
125 }
126 
128 {
129  struct ast_taskprocessor *res;
130  size_t idx;
131 
132  if (!pool) {
133  return NULL;
134  }
135 
136  AST_VECTOR_RW_RDLOCK(&pool->serializers);
137  if (AST_VECTOR_SIZE(&pool->serializers) == 0) {
138  AST_VECTOR_RW_UNLOCK(&pool->serializers);
139  return NULL;
140  }
141 
142  res = AST_VECTOR_GET(&pool->serializers, 0);
143 
144  /* Choose the taskprocessor with the smallest queue */
145  for (idx = 1; idx < AST_VECTOR_SIZE(&pool->serializers); ++idx) {
146  struct ast_taskprocessor *cur = AST_VECTOR_GET(&pool->serializers, idx);
148  res = cur;
149  }
150  }
151 
152  AST_VECTOR_RW_UNLOCK(&pool->serializers);
153  return res;
154 }
155 
156 int ast_serializer_pool_set_alerts(struct ast_serializer_pool *pool, long high, long low)
157 {
158  size_t idx;
159  long tps_queue_high;
160  long tps_queue_low;
161 
162  if (!pool) {
163  return 0;
164  }
165 
166  tps_queue_high = high;
167  if (tps_queue_high <= 0) {
168  ast_log(AST_LOG_WARNING, "Invalid '%s-*' taskprocessor high water alert "
169  "trigger level '%ld'\n", pool->name, tps_queue_high);
170  tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;
171  }
172 
173  tps_queue_low = low;
174  if (tps_queue_low < -1 || tps_queue_high < tps_queue_low) {
175  ast_log(AST_LOG_WARNING, "Invalid '%s-*' taskprocessor low water clear alert "
176  "level '%ld'\n", pool->name, tps_queue_low);
177  tps_queue_low = -1;
178  }
179 
180  for (idx = 0; idx < AST_VECTOR_SIZE(&pool->serializers); ++idx) {
181  struct ast_taskprocessor *cur = AST_VECTOR_GET(&pool->serializers, idx);
182  if (ast_taskprocessor_alert_set_levels(cur, tps_queue_low, tps_queue_high)) {
183  ast_log(AST_LOG_WARNING, "Failed to set alert levels for serializer '%s'.\n",
185  }
186  }
187 
188  return 0;
189 }
#define AST_VECTOR_RW_INIT(vec, size)
Initialize a vector with a read/write lock.
Definition: vector.h:158
Asterisk main include file. File version handling, generic pbx functions.
struct ast_serializer_pool * ast_serializer_pool_create(const char *name, unsigned int size, struct ast_threadpool *threadpool, int timeout)
Create a serializer pool.
Definition: serializer.c:76
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
#define AST_VECTOR_RW(name, type)
Define a vector structure with a read/write lock.
Definition: vector.h:93
#define AST_VECTOR_RW_UNLOCK(vec)
Unlock vector.
Definition: vector.h:900
#define LOG_WARNING
Definition: logger.h:274
#define AST_TASKPROCESSOR_HIGH_WATER_LEVEL
Definition: taskprocessor.h:63
struct ast_serializer_shutdown_group * ast_serializer_shutdown_group_alloc(void)
Create a serializer group shutdown control object.
Definition: threadpool.c:1229
static int timeout
Definition: cdr_mysql.c:86
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256
#define AST_LOG_WARNING
Definition: logger.h:279
struct ast_taskprocessor * ast_threadpool_serializer_group(const char *name, struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Serialized execution of tasks within a ast_threadpool.
Definition: threadpool.c:1402
#define AST_VECTOR_RW_RDLOCK(vec)
Obtain read lock on vector.
Definition: vector.h:880
const char * ast_serializer_pool_name(const struct ast_serializer_pool *pool)
Retrieve the base name of the serializer pool.
Definition: serializer.c:122
const char * ast_taskprocessor_name(struct ast_taskprocessor *tps)
Return the name of the taskprocessor singleton.
#define ast_assert(a)
Definition: utils.h:650
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.
#define NULL
Definition: resample.c:96
int ast_serializer_pool_destroy(struct ast_serializer_pool *pool)
Destroy the serializer pool.
Definition: serializer.c:39
long tps_queue_high
Taskprocessor high water alert trigger level.
Definition: taskprocessor.c:78
#define AST_TASKPROCESSOR_MAX_NAME
Suggested maximum taskprocessor name length (less null terminator).
Definition: taskprocessor.h:60
int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout)
Wait for the serializers in the group to shutdown with timeout.
Definition: threadpool.c:1241
Utility functions.
#define ast_debug(level,...)
Log a DEBUG message.
Definition: logger.h:444
#define ast_log
Definition: astobj2.c:42
long tps_queue_low
Taskprocessor low water clear alert level.
Definition: taskprocessor.c:76
int ast_serializer_pool_set_alerts(struct ast_serializer_pool *pool, long high, long low)
Set taskprocessor alert levels for the serializers in the pool.
Definition: serializer.c:156
#define AST_VECTOR_RW_FREE(vec)
Deallocates this locked vector.
Definition: vector.h:202
struct ast_taskprocessor * ast_serializer_pool_get(struct ast_serializer_pool *pool)
Retrieve a serializer from the pool.
Definition: serializer.c:127
#define ao2_ref(o, delta)
Definition: astobj2.h:464
#define ast_malloc(len)
A wrapper for malloc()
Definition: astmm.h:193
static struct ast_threadpool * threadpool
Thread pool for observers.
Definition: sorcery.c:86
#define LOG_ERROR
Definition: logger.h:285
struct ast_serializer_shutdown_group * shutdown_group
Definition: serializer.c:30
static const char name[]
Definition: cdr_mysql.c:74
#define ast_free(a)
Definition: astmm.h:182
#define AST_VECTOR_RW_WRLOCK(vec)
Obtain write lock on vector.
Definition: vector.h:890
#define AST_VECTOR_RESET(vec, cleanup)
Reset vector.
Definition: vector.h:627
Vector container support.
An API for managing task processing threads that can be shared across modules.
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:682
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
An opaque threadpool structure.
Definition: threadpool.c:36
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611
void ast_taskprocessor_name_append(char *buf, unsigned int size, const char *name)
Append the next sequence number to the given string, and copy into the buffer.