Asterisk - The Open Source Telephony Project GIT-master-590b490
Loading...
Searching...
No Matches
res_stasis_broadcast.c
Go to the documentation of this file.
1/*
2 * Asterisk -- An open source telephony toolkit.
3 *
4 * Copyright (C) 2026, Aurora Innovation AB
5 *
6 * Daniel Donoghue <daniel.donoghue@aurorainnovation.com>
7 *
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
13 *
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
17 */
18
19/*! \file
20 *
21 * \brief Stasis application broadcast resource
22 *
23 * \author Daniel Donoghue <daniel.donoghue@aurorainnovation.com>
24 */
25
26/*** MODULEINFO
27 <depend type="module">res_stasis</depend>
28 <depend type="module">res_ari</depend>
29 <support_level>extended</support_level>
30 ***/
31
32#include "asterisk.h"
33
34#include <errno.h>
35#include <regex.h>
36
37#include "asterisk/astobj2.h"
38#include "asterisk/channel.h"
39#include "asterisk/http.h"
40#include "asterisk/json.h"
41#include "asterisk/lock.h"
42#include "asterisk/module.h"
43#include "asterisk/pbx.h"
44#include "asterisk/stasis_app.h"
47#include "asterisk/time.h"
48#include "asterisk/utils.h"
49
50#define AST_API_MODULE /* Mark this as the module providing the API */
52
53#define BROADCAST_BUCKETS 37
54
55/*! \brief Maximum length for app_filter regex pattern */
56#define MAX_REGEX_LENGTH 256
57
58/*! \brief Maximum depth for regex group nesting */
59#define MAX_GROUP_DEPTH 10
60
61/*! \brief Maximum number of quantified groups, e.g. (...)+ or (...){2,3}, allowed in a regex (counted across the whole pattern, not per nesting level) */
62#define MAX_NESTED_QUANTIFIERS 3
63
64/*! \brief Maximum value for brace quantifier bounds {m,n} */
65#define MAX_QUANTIFIER_BOUND 100
66
67/*! \brief Maximum alternations allowed in deeply nested groups */
68#define MAX_ALTERNATIONS 20
69
70/*! \brief Group depth threshold for alternation limits */
71#define ALTERNATION_DEPTH_THRESHOLD 2
72
73/*! \brief Maximum broadcast timeout in milliseconds (24 hours) */
74#define MAX_BROADCAST_TIMEOUT_MS (24 * 60 * 60 * 1000)
75
76/*! \brief Interval in ms between hangup checks while waiting for a claim */
77#define BROADCAST_POLL_INTERVAL_MS 200
78
79/*! \brief Broadcast context stored on channel */
81 /*! The unique ID of the channel */
83 /*! Name of the winning application (dynamically allocated, NULL until claimed) */
85 /*! Regex pattern used to filter broadcast recipients */
87 /*! Compiled regex for app_filter (valid only when filter_compiled is set) */
89 /*! Flag indicating if channel was claimed */
90 unsigned int claimed:1;
91 /*! Whether compiled_filter is valid and must be freed */
92 unsigned int filter_compiled:1;
93 /*! Set when the PBX thread retrieves the winner; prevents late claims */
94 unsigned int finished:1;
95 /*! Broadcast behaviour flags (STASIS_BROADCAST_FLAG_*) */
96 unsigned int flags;
97 /*! Reference to the global container (prevents use-after-free during module unload) */
99 /*! Condition variable for claim notification */
101};
102
103/*! \brief Container for all active broadcast contexts */
105
106/*! \brief Destructor for broadcast datastore
107 *
108 * Called when the channel is destroyed. Ensures the broadcast context
109 * is unlinked from the global container even if the caller never
110 * reached stasis_app_broadcast_cleanup (e.g. abnormal channel teardown).
111 */
112static void broadcast_datastore_destroy(void *data)
113{
114 struct stasis_broadcast_ctx *ctx = data;
115
116 if (ctx->container) {
117 ao2_unlink(ctx->container, ctx);
118 }
119 ao2_cleanup(ctx);
120}
121
122/*! \brief Datastore information for broadcast context */
124 .type = "stasis_broadcast_context",
126};
127
130
131/*! \brief Destructor for broadcast context */
132static void broadcast_ctx_destructor(void *obj)
133{
134 struct stasis_broadcast_ctx *ctx = obj;
 /* winner_app is only assigned by a successful claim (ast_strdup);
  * it is still NULL for a context that was never claimed. */
135 ast_free(ctx->winner_app);
 /* compiled_filter holds a regcomp() result only when filter_compiled is set. */
136 if (ctx->filter_compiled) {
137 regfree(&ctx->compiled_filter);
138 }
 /* NOTE(review): listing line 139 is absent from this extract; presumably it
  * releases ctx->container -- confirm against the repository. */
 /* Pairs with the ast_cond_init() performed when the context is created. */
140 ast_cond_destroy(&ctx->cond);
141}
142
143static int validate_regex_pattern(const char *pattern);
144
145/*! \brief Create a new broadcast context
146 *
147 * Validates and compiles the app_filter regex if provided. On regex
148 * failure the context is still created but broadcasts will be sent
149 * to all applications (i.e. no filtering).
150 */
152 const char *channel_id, const char *app_filter, unsigned int flags)
153{
154 struct stasis_broadcast_ctx *ctx;
155
156 ctx = ao2_alloc(sizeof(*ctx), broadcast_ctx_destructor);
157 if (!ctx) {
158 return NULL;
159 }
160
161 /* ao2_alloc zeroes the struct; only set non-zero fields explicitly */
162 ast_copy_string(ctx->channel_id, channel_id, sizeof(ctx->channel_id));
163 ctx->flags = flags;
165 ast_cond_init(&ctx->cond, NULL);
166
167 /* Validate and compile app_filter regex if provided */
169 ast_copy_string(ctx->app_filter, app_filter, sizeof(ctx->app_filter));
172 "Channel %s: rejecting app_filter regex as potentially dangerous: %s\n",
174 } else if (regcomp(&ctx->compiled_filter, app_filter,
175 REG_EXTENDED | REG_NOSUB) != 0) {
177 "Channel %s: failed to compile app_filter regex '%s'\n",
179 } else {
180 ctx->filter_compiled = 1;
181 }
182
183 if (!ctx->filter_compiled) {
185 "Channel %s: proceeding without application filtering due to invalid regex\n",
186 channel_id);
187 }
188 }
189
190 ast_debug(1, "Created broadcast context for channel %s (filter: %s, flags: 0x%x)\n",
191 ctx->channel_id,
192 ctx->filter_compiled ? ctx->app_filter : "none",
193 ctx->flags);
194
195 return ctx;
196}
197
198/*!
199 * \brief Validate a regex pattern for safety
200 *
201 * Checks that the regex pattern is within length limits and doesn't contain
202 * patterns that could cause excessive backtracking or denial of service.
203 *
204 * \param pattern The regex pattern to validate
205 * \return 0 if valid, -1 if invalid
206 */
207static int validate_regex_pattern(const char *pattern)
208{
209 size_t len;
210 int group_depth = 0;
211 int quantified_groups = 0;
212 int in_class = 0; /* Inside [...] */
213 /* Track alternations per group depth. Index 0 is outside groups and unused. */
214 int alternations_per_depth[MAX_GROUP_DEPTH + 1] = { 0 };
215 const char *p;
216
217 if (ast_strlen_zero(pattern)) {
218 return 0; /* Empty pattern is valid (will be skipped) */
219 }
220
221 /* Check maximum length to prevent excessive regex compilation time */
222 len = strlen(pattern);
223 if (len > MAX_REGEX_LENGTH) {
224 ast_debug(3, "Regex pattern exceeds maximum length of %d characters (got %zu)\n",
226 return -1;
227 }
228
229 /*
230 * Check for potentially dangerous patterns that could cause
231 * excessive regex compilation or matching time. Look for:
232 * - Excessive group nesting depth
233 * - Too many quantified groups (groups followed by +, *, or ?)
234 *
235 * Note: This is a heuristic approach that catches common dangerous
236 * patterns. Combined with the length limit, it provides reasonable
237 * protection against ReDoS while allowing legitimate regex usage.
238 */
239 for (p = pattern; *p; p++) {
240 /* Handle character classes: enter on unescaped '[' and exit on unescaped ']' */
241 if (!in_class && *p == '[' && (p == pattern || *(p - 1) != '\\')) {
242 in_class = 1;
243 /* In POSIX ERE, ']' immediately after '[' or '[^' is a
244 * literal, not the end of the class. Advance past the
245 * optional negation caret and the literal ']' so the
246 * main loop does not leave in_class prematurely. */
247 if (*(p + 1) == '^') {
248 p++;
249 }
250 if (*(p + 1) == ']') {
251 p++;
252 }
253 continue;
254 } else if (in_class) {
255 if (*p == '\\') {
256 /* Skip the next escaped character inside character class */
257 if (*(p + 1)) {
258 p++;
259 }
260 continue;
261 }
262 if (*p == ']') {
263 in_class = 0;
264 }
265 /* Ignore everything inside character classes for heuristics */
266 continue;
267 }
268 switch (*p) {
269 case '(':
270 group_depth++;
271 if (group_depth > MAX_GROUP_DEPTH) {
272 ast_debug(3, "Regex pattern has too many nested groups (max %d)\n",
274 return -1;
275 }
276 /* Reset alternation counter for newly entered group depth */
277 alternations_per_depth[group_depth] = 0;
278 break;
279 case ')':
280 if (group_depth > 0) {
281 /* Clear alternations count for this depth before leaving */
282 alternations_per_depth[group_depth] = 0;
283 group_depth--;
284 }
285 break;
286 case '+':
287 case '*':
288 case '?':
289 /*
290 * Count quantified groups - patterns like (...)+ or (...)*
291 * Too many of these can cause slow matching on certain inputs.
292 */
293 if (p > pattern && *(p - 1) == ')') {
294 quantified_groups++;
295 }
296 break;
297 case '{': {
298 /* Parse POSIX quantifier {m}, {m,}, {m,n} with overflow and bound checks */
299 const char *q = p + 1;
300 long m = 0, n = -1; /* n=-1 means open upper bound */
301 int valid = 0;
302 int digit;
303 int overflow = 0;
304
305 if (*q >= '0' && *q <= '9') {
306 /* Parse m safely */
307 while (*q >= '0' && *q <= '9') {
308 digit = (*q - '0');
309 if (m > (LONG_MAX - digit) / 10) { /* overflow on next step */
310 overflow = 1;
311 break;
312 }
313 m = (m * 10) + digit;
314 if (m > MAX_QUANTIFIER_BOUND) { /* early bound exceed */
315 overflow = 1;
316 break;
317 }
318 q++;
319 }
320 if (!overflow && *q == ',') {
321 q++;
322 if (*q >= '0' && *q <= '9') {
323 long nn = 0;
324 while (*q >= '0' && *q <= '9') {
325 digit = (*q - '0');
326 if (nn > (LONG_MAX - digit) / 10) {
327 overflow = 1;
328 break;
329 }
330 nn = (nn * 10) + digit;
331 if (nn > MAX_QUANTIFIER_BOUND) {
332 overflow = 1;
333 break;
334 }
335 q++;
336 }
337 n = nn;
338 } else {
339 n = -1; /* open upper bound */
340 }
341 } else if (!overflow) {
342 n = m; /* {m} */
343 }
344 if (!overflow && *q == '}') {
345 valid = 1;
346 }
347 }
348 if (overflow) {
349 ast_debug(3, "Regex quantifier overflow or exceeds max bound (max %d)\n", MAX_QUANTIFIER_BOUND);
350 return -1;
351 }
352 if (valid) {
353 /* Additional bounds check (defensive) */
354 if (m > MAX_QUANTIFIER_BOUND || (n != -1 && n > MAX_QUANTIFIER_BOUND)) {
355 ast_debug(3, "Regex quantifier bounds too large (max %d)\n", MAX_QUANTIFIER_BOUND);
356 return -1;
357 }
358 if (p > pattern && *(p - 1) == ')') {
359 quantified_groups++;
360 }
361 p = q; /* q currently points to '}' */
362 }
363 break;
364 }
365 case '|':
366 if (group_depth > 0) {
367 alternations_per_depth[group_depth]++;
368 if (group_depth > ALTERNATION_DEPTH_THRESHOLD &&
369 alternations_per_depth[group_depth] > MAX_ALTERNATIONS) {
370 ast_debug(3,
371 "Regex has too many alternations in deep group (depth %d, count %d, max %d)\n",
372 group_depth,
373 alternations_per_depth[group_depth],
375 return -1;
376 }
377 }
378 break;
379 case '\\':
380 /*
381 * Skip the next character entirely from heuristic processing.
382 * This ensures escaped characters (metacharacters in BRE or literals
383 * in ERE like \‍(, \‍), \+, \*, \?, etc.) do not affect group depth
384 * or quantified group counts.
385 */
386 if (*(p + 1)) {
387 p++;
388 }
389 /* Continue to next loop iteration without evaluating the escaped char */
390 continue;
391 }
392 }
393
394 /*
395 * Reject patterns with too many quantified groups, as these are
396 * often indicators of potentially slow patterns that could be
397 * exploited for denial of service.
398 */
399 if (quantified_groups > MAX_NESTED_QUANTIFIERS) {
400 ast_debug(3, "Regex pattern has too many quantified groups (max %d)\n",
402 return -1;
403 }
404
405 return 0;
406}
407
408/*! \brief Create and send broadcast event to all applications
409 *
410 * Uses the compiled regex cached in \a ctx for application filtering.
411 */
412static int send_broadcast_event(struct ast_channel *chan,
413 struct stasis_broadcast_ctx *ctx)
414{
 /* NOTE(review): listing lines 415 and 417 are absent from this extract;
  * they presumably declare 'event' and 'apps', both of which are used
  * below. How those two are released on the early returns cannot be
  * confirmed from what is visible here. */
416 RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
418 struct ao2_iterator iter;
419 char *app_name;
420 const char *caller = NULL;
421 const char *called = NULL;
422
423 /* Get snapshot and caller/called info under a single channel lock */
424 ast_channel_lock(chan);
425 snapshot = ao2_bump(ast_channel_snapshot(chan));
426 caller = ast_strdupa(S_OR(ast_channel_caller(chan)->id.number.str, ""));
427 called = ast_strdupa(S_OR(ast_channel_exten(chan), ""));
428 ast_channel_unlock(chan);
429
 /* caller/called were copied with ast_strdupa() while the channel was
  * locked, so they are private copies by the time the lock is dropped. */
430 /* Build the broadcast event. Channel variables configured in
431 * ari.conf "channelvars" are already included in the channel
432 * snapshot produced by ast_channel_snapshot_to_json(). */
433 event = ast_json_pack("{s: s, s: o, s: o, s: s?, s: s?}",
434 "type", "CallBroadcast",
435 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
436 "channel", ast_channel_snapshot_to_json(snapshot, NULL),
437 "caller", caller,
438 "called", called);
439
440 if (!event) {
441 ast_log(LOG_ERROR, "Channel %s: failed to create broadcast event\n",
442 ast_channel_name(chan));
443 return -1;
444 }
445
446 /* Get all registered applications */
 /* NOTE(review): listing line 447, which presumably assigns 'apps', is
  * absent from this extract. */
448 if (!apps) {
449 ast_log(LOG_ERROR, "Channel %s: failed to get stasis applications\n",
450 ast_channel_name(chan));
451 return -1;
452 }
453
454 ast_debug(2, "Broadcasting to %d registered Stasis applications\n",
456
457 /*
458 * Broadcast to all matching applications.
459 *
460 * We collect matching apps into a plain array, Fisher-Yates shuffle it,
461 * then call stasis_app_send() for each. stasis_app_send() writes
462 * directly to each app's WebSocket socket synchronously on the calling
463 * thread. The shuffle ensures no single ARI application is consistently
464 * first to receive the event — every app gets a fair chance to claim the
465 * channel regardless of its position in the ao2 hash container.
466 */
467 {
468 int app_count;
469 char **matching_arr;
470 int n = 0;
471 int i;
472
 /* An empty container is reported as success (0), not as an error. */
473 app_count = ao2_container_count(apps);
474 if (app_count == 0) {
475 ast_debug(2, "Channel %s: no Stasis applications registered\n",
477 return 0;
478 }
479
 /* Sized for the worst case in which every registered app matches. */
480 matching_arr = ast_malloc(app_count * sizeof(*matching_arr));
481 if (!matching_arr) {
482 ast_log(LOG_ERROR, "Channel %s: failed to allocate matching apps array\n",
483 ast_channel_name(chan));
484 return -1;
485 }
486
487 /* First pass: collect all matching app names (transfer refs to array) */
 /* NOTE(review): the iterator is advanced before 'n < app_count' is
  * tested, so if it ever yields an object once the array is full, that
  * object's reference is neither stored nor released. This looks
  * unreachable only if 'apps' cannot grow after ao2_container_count()
  * -- confirm. */
488 iter = ao2_iterator_init(apps, 0);
489 while ((app_name = ao2_iterator_next(&iter)) && n < app_count) {
 /* With no compiled filter every app is kept. */
490 if (ctx->filter_compiled &&
491 regexec(&ctx->compiled_filter, app_name, 0, NULL, 0) == REG_NOMATCH) {
492 ast_debug(3, "App '%s' does not match filter, skipping\n", app_name);
493 ao2_ref(app_name, -1);
494 continue;
495 }
496 matching_arr[n++] = app_name; /* ref transferred to array */
497 }
 /* NOTE(review): listing line 498 is absent from this extract; the
  * iterator initialised above is presumably destroyed there. */
499
500 ast_debug(2, "Broadcasting channel %s to %d matching applications\n",
501 ast_channel_uniqueid(chan), n);
502
503 /* Fisher-Yates shuffle: randomise delivery order so no app is
504 * consistently first to receive the broadcast event. */
505 for (i = n - 1; i > 0; i--) {
506 int j = ast_random() % (i + 1);
507 char *tmp = matching_arr[i];
508 matching_arr[i] = matching_arr[j];
509 matching_arr[j] = tmp;
510 }
511
512 /*
513 * Second pass: send to each matching app. A deep copy of the event
514 * is required for each call because stasis_app_send() mutates the
515 * message in-place (adds "asterisk_id" via ast_json_object_set).
516 */
517 for (i = 0; i < n; i++) {
518 char *match_name = matching_arr[i];
519 struct ast_json *event_copy;
520
521 ast_debug(3, "Sending broadcast to app '%s'\n", match_name);
522
 /* A failed copy skips this app only; the remaining apps are still
  * served and the function still returns 0. The array's reference to
  * the name is dropped on both the failure and the success path. */
523 event_copy = ast_json_deep_copy(event);
524 if (!event_copy) {
526 "Channel %s: failed to deep-copy event for app '%s'\n",
527 ast_channel_uniqueid(chan), match_name);
528 ao2_ref(match_name, -1);
529 continue;
530 }
531
532 stasis_app_send(match_name, event_copy);
533 ast_json_unref(event_copy);
534 ao2_ref(match_name, -1);
535 }
536
537 ast_free(matching_arr);
538 }
539
540 return 0;
541}
542
543/*!
544 * \brief Start a broadcast for a channel
545 * \param chan The channel to broadcast
546 * \param timeout_ms Timeout in milliseconds
547 * \param app_filter Optional regex filter for applications
548 * \return 0 on success, -1 on error
549 */
551 const char *app_filter, unsigned int flags)
552{
554 struct ast_datastore *datastore;
555
556 if (!chan) {
557 return -1;
558 }
559
560 if (!broadcast_contexts) {
561 return -1;
562 }
563
564 /* Remove any previous broadcast datastore from a prior attempt.
565 * This supports failover scenarios where StasisBroadcast() is
566 * called multiple times for the same channel. The datastore
567 * destructor unlinks the old context from the container. */
568 {
569 struct ast_datastore *old_ds;
570 ast_channel_lock(chan);
572 if (old_ds) {
573 ast_channel_datastore_remove(chan, old_ds);
574 }
575 ast_channel_unlock(chan);
576 if (old_ds) {
577 ast_datastore_free(old_ds);
578 }
579 }
580
581 /* Create broadcast context (validates and compiles app_filter regex) */
582 ctx = broadcast_ctx_create(ast_channel_uniqueid(chan), app_filter, flags);
583 if (!ctx) {
584 ast_log(LOG_ERROR, "Channel %s: failed to create broadcast context\n",
586 return -1;
587 }
588
589 /* Store context in container */
591
592 /* Create and attach datastore to channel */
594 if (!datastore) {
595 ast_log(LOG_ERROR, "Channel %s: failed to allocate broadcast datastore\n",
598 return -1;
599 }
600
601 datastore->data = ao2_bump(ctx);
602 ast_channel_lock(chan);
603 if (ast_channel_datastore_add(chan, datastore)) {
604 ast_channel_unlock(chan);
605 ast_log(LOG_ERROR, "Channel %s: failed to attach broadcast datastore\n",
607 ast_datastore_free(datastore);
609 return -1;
610 }
611 ast_channel_unlock(chan);
612
613 ast_debug(1, "Starting broadcast for channel %s (timeout: %dms, filter: %s)\n",
614 ast_channel_uniqueid(chan), timeout_ms, app_filter ? app_filter : "none");
615
616 /* Send broadcast event to all matching applications */
617 if (send_broadcast_event(chan, ctx) != 0) {
618 ast_log(LOG_ERROR, "Channel %s: failed to send broadcast event\n",
620 ast_channel_lock(chan);
621 ast_channel_datastore_remove(chan, datastore);
622 ast_channel_unlock(chan);
623 ast_datastore_free(datastore);
625 return -1;
626 }
627
628 return 0;
629}
630
631/*!
632 * \brief Attempt to claim a broadcast channel
633 * \param channel_id The unique ID of the channel
634 * \param app_name The name of the application claiming the channel
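635 * \return 0 if claim successful; -1 if an argument is empty, no broadcast context exists for the channel, the broadcast has already finished, or the winner name cannot be allocated; -2 if already claimed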
636 */
637int AST_OPTIONAL_API_NAME(stasis_app_claim_channel)(const char *channel_id, const char *app_name)
638{
 /* NOTE(review): listing line 639 is absent from this extract; it
  * presumably declares 'ctx'. Whether the reference returned by
  * ao2_find() below is released automatically on every return path
  * cannot be confirmed from what is visible here. */
640
641 if (ast_strlen_zero(channel_id) || ast_strlen_zero(app_name)) {
642 return -1;
643 }
644
 /* The container is NULL before load / after unload of this module. */
645 if (!broadcast_contexts) {
646 return -1;
647 }
648
649 /* Find broadcast context */
650 ctx = ao2_find(broadcast_contexts, channel_id, OBJ_SEARCH_KEY);
651 if (!ctx) {
652 ast_debug(1, "No broadcast context found for channel %s\n", channel_id);
653 return -1;
654 }
655
656 /* Atomically check and set claimed flag.
657 * Check claimed before finished: if the channel was claimed and then the
658 * broadcast finished, a late claim should return -2 (409 Conflict) rather
659 * than -1 (404) so callers can distinguish "already taken" from "not found". */
660 ao2_lock(ctx);
661 if (ctx->claimed) {
662 ast_debug(1, "Channel %s already claimed by %s (attempt by %s denied)\n",
663 channel_id, ctx->winner_app ? ctx->winner_app : "(unknown)", app_name);
664 ao2_unlock(ctx);
665 return -2;
666 }
667 if (ctx->finished) {
668 ast_debug(1, "Channel %s broadcast already finished (late claim by %s rejected)\n",
669 channel_id, app_name);
670 ao2_unlock(ctx);
671 return -1;
672 }
 /* The winner name is copied before 'claimed' is set, all under the ctx
  * lock, so 'claimed' is never set without a winner name. An allocation
  * failure leaves the channel unclaimed. */
673 ctx->winner_app = ast_strdup(app_name);
674 if (!ctx->winner_app) {
 /* NOTE(review): listing line 675 (the start of the log call whose
  * arguments follow) is absent from this extract. */
676 "Failed to allocate winner app name for channel %s\n",
677 channel_id);
678 ao2_unlock(ctx);
679 return -1;
680 }
681 ctx->claimed = 1;
682 ast_verb(3, "Channel %s claimed by application %s\n",
683 channel_id, app_name);
684 /* Signal waiting thread that channel was claimed */
685 ast_cond_signal(&ctx->cond);
686 ao2_unlock(ctx);
687
 /* From here on the claim has succeeded; nothing below changes the
  * return value. In the code visible here ctx->flags is only written
  * when the context is created, and it is read below without the lock. */
688 /* Send CallClaimed event to matching apps */
689 if (!(ctx->flags & STASIS_BROADCAST_FLAG_SUPPRESS_CLAIMED)) {
690 RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
 /* NOTE(review): listing lines 691-692 are absent from this extract;
  * they presumably declare 'event' and 'apps'. */
693 struct ao2_iterator iter;
694 char *app_name_iter;
695
696 snapshot = ast_channel_snapshot_get_latest(channel_id);
697 if (snapshot) {
698 event = ast_json_pack("{s: s, s: o, s: o, s: s}",
699 "type", "CallClaimed",
700 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
701 "channel", ast_channel_snapshot_to_json(snapshot, NULL),
702 "winner_app", app_name);
703 }
704 if (event) {
 /* NOTE(review): listing line 705 is absent from this extract; 'apps'
  * is presumably fetched here, i.e. only when the event was built. */
706 }
707 if (apps) {
708 iter = ao2_iterator_init(apps, 0);
709 while ((app_name_iter = ao2_iterator_next(&iter))) {
710 struct ast_json *event_copy;
711
712 /* Only send to apps that matched the original broadcast filter */
713 if (ctx->filter_compiled &&
714 regexec(&ctx->compiled_filter, app_name_iter,
715 0, NULL, 0) == REG_NOMATCH) {
716 ao2_ref(app_name_iter, -1);
717 continue;
718 }
719
 /* A failed copy skips this app; the iterator's reference to the
  * name is released on every path through the loop body. */
720 event_copy = ast_json_deep_copy(event);
721 if (!event_copy) {
722 ao2_ref(app_name_iter, -1);
723 continue;
724 }
725
726 stasis_app_send(app_name_iter, event_copy);
727 ast_json_unref(event_copy);
728 ao2_ref(app_name_iter, -1);
729 }
 /* NOTE(review): listing line 730 is absent from this extract; the
  * iterator is presumably destroyed there. */
731 }
732 }
733
734 return 0;
735}
736
737/*!
738 * \brief Get the winner app name for a broadcast channel
739 * \param channel_id The unique ID of the channel
740 * \return A copy of the winner app name (caller must free with ast_free),
741 * or NULL if not claimed or not found
742 */
744{
746 char *winner = NULL;
747
748 if (ast_strlen_zero(channel_id)) {
749 return NULL;
750 }
751
752 ctx = ao2_find(broadcast_contexts, channel_id, OBJ_SEARCH_KEY);
753 if (!ctx) {
754 return NULL;
755 }
756
757 ao2_lock(ctx);
758 if (ctx->claimed) {
759 winner = ast_strdup(ctx->winner_app);
760 }
761 /* Mark the broadcast as finished so no new claims can succeed.
762 * This closes the race window between reading the winner and
763 * the subsequent broadcast_cleanup call. */
764 ctx->finished = 1;
765 ao2_unlock(ctx);
766
767 return winner;
768}
769
770/*!
771 * \brief Wait for a broadcast channel to be claimed
772 *
773 * Blocks until the channel is claimed, the timeout expires, or the
774 * channel hangs up. The hangup check runs every
775 * #BROADCAST_POLL_INTERVAL_MS so that a dead channel does not tie up
776 * a PBX thread for the full timeout period.
777 *
778 * \param chan The channel
779 * \param timeout_ms Maximum time to wait in milliseconds
780 * \return 0 if claimed within timeout, -1 otherwise
781 */
783{
785 const char *channel_id;
786 struct timeval deadline;
787 int result = -1;
788
789 if (!chan) {
790 return -1;
791 }
792
793 channel_id = ast_channel_uniqueid(chan);
794
795 if (!broadcast_contexts) {
796 return -1;
797 }
798
799 ctx = ao2_find(broadcast_contexts, channel_id, OBJ_SEARCH_KEY);
800 if (!ctx) {
801 ast_log(LOG_WARNING, "No broadcast context for channel %s\n", channel_id);
802 return -1;
803 }
804
805 /* Cap excessive timeouts to prevent arithmetic overflow */
806 if (timeout_ms < 0) {
807 timeout_ms = 0;
808 } else if (timeout_ms > MAX_BROADCAST_TIMEOUT_MS) {
809 timeout_ms = MAX_BROADCAST_TIMEOUT_MS;
810 }
811
812 /* Calculate absolute deadline */
813 deadline = ast_tvadd(ast_tvnow(),
814 ast_tv(timeout_ms / 1000, (timeout_ms % 1000) * 1000));
815
816 ao2_lock(ctx);
817 while (!ctx->claimed) {
818 struct timeval now;
819 struct timespec poll_spec;
820 long remaining_ms;
821 long poll_ms;
822 int wait_result;
823
824 /* Check for hangup so we don't block on a dead channel */
825 if (ast_check_hangup(chan)) {
826 ast_debug(3, "Channel %s hung up during broadcast wait\n",
827 channel_id);
828 break;
829 }
830
831 /* Check if we've passed the overall deadline */
832 now = ast_tvnow();
833 remaining_ms = ast_tvdiff_ms(deadline, now);
834 if (remaining_ms <= 0) {
835 ast_debug(3, "Broadcast timeout for channel %s after %dms\n",
836 channel_id, timeout_ms);
837 break;
838 }
839
840 /* Sleep for the shorter of the remaining time and the poll interval */
841 poll_ms = remaining_ms;
842 if (poll_ms > BROADCAST_POLL_INTERVAL_MS) {
844 }
845
846 poll_spec.tv_sec = now.tv_sec + (poll_ms / 1000);
847 poll_spec.tv_nsec = (long)(now.tv_usec) * 1000L
848 + (long)(poll_ms % 1000) * 1000000L;
849 while (poll_spec.tv_nsec >= 1000000000) {
850 poll_spec.tv_sec++;
851 poll_spec.tv_nsec -= 1000000000;
852 }
853
854 wait_result = ast_cond_timedwait(&ctx->cond, ao2_object_get_lockaddr(ctx), &poll_spec);
855 if (wait_result != 0 && wait_result != ETIMEDOUT) {
857 "Channel %s: unexpected error waiting for claim: %s (%d)\n",
858 channel_id, strerror(wait_result), wait_result);
859 break;
860 }
861 /* Loop back: re-check claimed, then hangup, then deadline */
862 }
863
864 if (ctx->claimed) {
865 ast_debug(1, "Channel %s claimed by %s\n",
866 channel_id, ctx->winner_app);
867 result = 0;
868 }
869 ao2_unlock(ctx);
870
871 return result;
872}
873
874/*!
875 * \brief Clean up broadcast context for a channel
876 *
877 * This is the normal-path cleanup called by the dialplan application
878 * after the broadcast completes. The channel datastore destructor
879 * (broadcast_datastore_destroy) also unlinks the context as a safety
880 * net for abnormal teardown; ao2_unlink is idempotent so the double
881 * call is harmless.
882 *
883 * \param channel_id The unique ID of the channel
884 */
886{
888
889 if (ast_strlen_zero(channel_id) || !broadcast_contexts) {
890 return;
891 }
892
894 if (ctx) {
895 ast_debug(3, "Cleaning up broadcast context for %s\n", channel_id);
896 }
897}
898
899static int load_module(void)
900{
 /* NOTE(review): listing line 901 is absent from this extract; the
  * argument list below presumably completes an assignment to
  * broadcast_contexts from ao2_container_alloc_hash() -- confirm. */
902 BROADCAST_BUCKETS, stasis_broadcast_ctx_hash_fn, NULL, stasis_broadcast_ctx_cmp_fn);
903
 /* Several entry points in this file bail out when broadcast_contexts is
  * NULL, so a failed allocation is detected here at load time. */
904 if (!broadcast_contexts) {
 /* NOTE(review): listing line 905 (the failure return) is absent. */
906 }
907
908 ast_debug(1, "Stasis broadcast module loaded\n");
 /* NOTE(review): listing line 909 (the success return) is absent. */
910}
911
912static int unload_module(void)
913{
914 /* NULL the global pointer before releasing the reference so that
915 * concurrent lookups see NULL (safe) rather than a freed pointer. */
916 {
917 struct ao2_container *old_contexts = broadcast_contexts;
 /* NOTE(review): listing line 918 is absent from this extract;
  * presumably it is where broadcast_contexts is set to NULL, as the
  * comment above describes -- confirm. */
 /* Drops this module's reference to the container saved above. */
919 ao2_cleanup(old_contexts);
920 }
921
922 ast_debug(1, "Stasis broadcast module unloaded\n");
923 return 0;
924}
925
927 "Stasis application broadcast",
928 .support_level = AST_MODULE_SUPPORT_EXTENDED,
929 .load = load_module,
930 .unload = unload_module,
931 .requires = "res_stasis,res_ari,http",
932 .load_pri = AST_MODPRI_APP_DEPEND - 1,
char digit
Asterisk main include file. File version handling, generic pbx functions.
#define ast_free(a)
Definition astmm.h:180
#define ast_strdup(str)
A wrapper for strdup()
Definition astmm.h:241
#define ast_strdupa(s)
duplicate a string in memory from the stack
Definition astmm.h:298
#define ast_malloc(len)
A wrapper for malloc()
Definition astmm.h:191
#define ast_log
Definition astobj2.c:42
#define ao2_iterator_next(iter)
Definition astobj2.h:1911
#define ao2_link(container, obj)
Add an object to a container.
Definition astobj2.h:1532
@ AO2_ALLOC_OPT_LOCK_MUTEX
Definition astobj2.h:363
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
#define AO2_STRING_FIELD_CMP_FN(stype, field)
Creates a compare function for a structure string field.
Definition astobj2.h:2048
#define ao2_cleanup(obj)
Definition astobj2.h:1934
#define ao2_unlink(container, obj)
Remove an object from a container.
Definition astobj2.h:1578
#define ao2_find(container, arg, flags)
Definition astobj2.h:1736
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
#define ao2_unlock(a)
Definition astobj2.h:729
#define ao2_lock(a)
Definition astobj2.h:717
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition astobj2.h:459
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
Definition astobj2.c:476
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition astobj2.h:480
#define AO2_STRING_FIELD_HASH_FN(stype, field)
Creates a hash function for a structure string field.
Definition astobj2.h:2032
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
@ OBJ_UNLINK
Definition astobj2.h:1039
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
Definition astobj2.h:1101
#define ao2_alloc(data_size, destructor_fn)
Definition astobj2.h:409
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition astobj2.h:1303
static PGresult * result
Definition cel_pgsql.c:84
General Asterisk PBX channel definitions.
const char * ast_channel_name(const struct ast_channel *chan)
#define AST_MAX_PUBLIC_UNIQUEID
Definition channel.h:147
int ast_channel_datastore_add(struct ast_channel *chan, struct ast_datastore *datastore)
Add a datastore to a channel.
Definition channel.c:2376
int ast_channel_datastore_remove(struct ast_channel *chan, struct ast_datastore *datastore)
Remove a datastore from a channel.
Definition channel.c:2385
#define ast_channel_lock(chan)
Definition channel.h:2982
const char * ast_channel_uniqueid(const struct ast_channel *chan)
int ast_check_hangup(struct ast_channel *chan)
Check to see if a channel is needing hang up.
Definition channel.c:446
struct ast_party_caller * ast_channel_caller(struct ast_channel *chan)
const char * ast_channel_exten(const struct ast_channel *chan)
#define ast_channel_unlock(chan)
Definition channel.h:2983
struct ast_datastore * ast_channel_datastore_find(struct ast_channel *chan, const struct ast_datastore_info *info, const char *uid)
Find a datastore on a channel.
Definition channel.c:2390
#define ast_datastore_alloc(info, uid)
Definition datastore.h:85
int ast_datastore_free(struct ast_datastore *datastore)
Free a data store object.
Definition datastore.c:68
static char * wait_result(void)
static int len(struct ast_channel *chan, const char *cmd, char *data, char *buf, size_t buflen)
struct ast_channel_snapshot * ast_channel_snapshot_get_latest(const char *uniqueid)
Obtain the latest ast_channel_snapshot from the Stasis Message Bus API cache. This is an ao2 object,...
Support for Private Asterisk HTTP Servers.
#define ast_debug(level,...)
Log a DEBUG message.
#define LOG_ERROR
#define ast_verb(level,...)
#define LOG_WARNING
Asterisk JSON abstraction layer.
struct ast_json * ast_json_deep_copy(const struct ast_json *value)
Copy a JSON value, and its children.
Definition json.c:641
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition json.c:73
struct ast_json * ast_json_pack(char const *format,...)
Helper for creating complex JSON values.
Definition json.c:612
struct ast_json * ast_json_timeval(const struct timeval tv, const char *zone)
Construct a timeval as JSON.
Definition json.c:670
Asterisk locking-related definitions:
#define ast_cond_destroy(cond)
Definition lock.h:209
#define ast_cond_init(cond, attr)
Definition lock.h:208
#define ast_cond_timedwait(cond, mutex, time)
Definition lock.h:213
pthread_cond_t ast_cond_t
Definition lock.h:185
#define ast_cond_signal(cond)
Definition lock.h:210
Asterisk module definitions.
@ AST_MODFLAG_LOAD_ORDER
Definition module.h:331
@ AST_MODFLAG_GLOBAL_SYMBOLS
Definition module.h:330
#define AST_MODULE_INFO(keystr, flags_to_set, desc, fields...)
Definition module.h:557
@ AST_MODPRI_APP_DEPEND
Definition module.h:342
@ AST_MODULE_SUPPORT_EXTENDED
Definition module.h:122
#define ASTERISK_GPL_KEY
The text the key() function should return.
Definition module.h:46
@ AST_MODULE_LOAD_SUCCESS
Definition module.h:70
@ AST_MODULE_LOAD_DECLINE
Module has failed to load, may be in an inconsistent state.
Definition module.h:78
#define AST_OPTIONAL_API_NAME(name)
Expands to the name of the implementation function.
Core PBX routines and definitions.
const char * app_name(struct ast_app *app)
Definition pbx_app.c:475
#define MAX_GROUP_DEPTH
Maximum depth for regex group nesting.
void AST_OPTIONAL_API_NAME() stasis_app_broadcast_cleanup(const char *channel_id)
Clean up broadcast context for a channel.
int AST_OPTIONAL_API_NAME() stasis_app_broadcast_wait(struct ast_channel *chan, int timeout_ms)
Wait for a broadcast channel to be claimed.
static void broadcast_ctx_destructor(void *obj)
Destructor for broadcast context.
static int send_broadcast_event(struct ast_channel *chan, struct stasis_broadcast_ctx *ctx)
Create and send broadcast event to all applications.
#define ALTERNATION_DEPTH_THRESHOLD
Group depth threshold for alternation limits.
static int validate_regex_pattern(const char *pattern)
Validate a regex pattern for safety.
#define MAX_BROADCAST_TIMEOUT_MS
Maximum broadcast timeout in milliseconds (24 hours)
static struct ao2_container * broadcast_contexts
Container for all active broadcast contexts.
#define MAX_QUANTIFIER_BOUND
Maximum value for brace quantifier bounds {m,n}.
static void broadcast_datastore_destroy(void *data)
Destructor for broadcast datastore.
#define MAX_NESTED_QUANTIFIERS
Maximum number of nested quantifiers in regex.
static int load_module(void)
int AST_OPTIONAL_API_NAME() stasis_app_broadcast_channel(struct ast_channel *chan, int timeout_ms, const char *app_filter, unsigned int flags)
Start a broadcast for a channel.
#define BROADCAST_POLL_INTERVAL_MS
Interval in ms between hangup checks while waiting for a claim.
static const struct ast_datastore_info broadcast_datastore_info
Datastore information for broadcast context.
static int unload_module(void)
#define MAX_ALTERNATIONS
Maximum alternations allowed in deeply nested groups.
char *AST_OPTIONAL_API_NAME() stasis_app_broadcast_winner(const char *channel_id)
Get the winner app name for a broadcast channel.
static struct stasis_broadcast_ctx * broadcast_ctx_create(const char *channel_id, const char *app_filter, unsigned int flags)
Create a new broadcast context.
#define BROADCAST_BUCKETS
int AST_OPTIONAL_API_NAME() stasis_app_claim_channel(const char *channel_id, const char *app_name)
Attempt to claim a broadcast channel.
#define MAX_REGEX_LENGTH
Maximum length for app_filter regex pattern.
#define NULL
Definition resample.c:96
Stasis Application API. See Stasis Application API for detailed documentation.
int stasis_app_send(const char *app_name, struct ast_json *message)
Send a message to the given Stasis application.
struct ao2_container * stasis_app_get_all(void)
Gets the names of all registered Stasis applications.
Stasis Application Broadcast API.
#define STASIS_BROADCAST_FLAG_SUPPRESS_CLAIMED
Suppress CallClaimed event for this broadcast.
Backend API for implementing components of res_stasis.
struct ast_json * ast_channel_snapshot_to_json(const struct ast_channel_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_channel_snapshot.
#define S_OR(a, b)
Returns the equivalent of logical OR for strings: the first one if it is not empty, otherwise the second one.
Definition strings.h:80
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition strings.h:65
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
Definition strings.h:425
Generic container type.
When we need to walk through a container, we use an ao2_iterator to keep track of the current position.
Definition astobj2.h:1821
Registered applications container.
Definition pbx_app.c:69
Structure representing a snapshot of channel state.
Main Channel structure associated with a channel.
struct ast_flags flags
Structure for a data store type.
Definition datastore.h:31
const char * type
Definition datastore.h:32
Structure for a data store object.
Definition datastore.h:64
void * data
Definition datastore.h:66
Abstract JSON element (object, array, string, int, ...).
Number structure.
Broadcast context stored on channel.
char app_filter[MAX_REGEX_LENGTH+1]
struct ao2_container * container
char channel_id[AST_MAX_PUBLIC_UNIQUEID]
Time-related functions and macros.
struct timeval ast_tvadd(struct timeval a, struct timeval b)
Returns the sum of two timevals a + b.
Definition extconf.c:2280
int64_t ast_tvdiff_ms(struct timeval end, struct timeval start)
Computes the difference (in milliseconds) between two struct timeval instances.
Definition time.h:107
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition time.h:159
struct timeval ast_tv(ast_time_t sec, ast_suseconds_t usec)
Returns a timeval from sec, usec.
Definition time.h:235
Utility functions.
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition utils.h:981
long int ast_random(void)
Definition utils.c:2346