Asterisk - The Open Source Telephony Project GIT-master-590b490
Loading...
Searching...
No Matches
Data Structures | Macros | Functions | Variables
res_stasis_broadcast.c File Reference

Stasis application broadcast resource. More...

#include "asterisk.h"
#include <errno.h>
#include <regex.h>
#include "asterisk/astobj2.h"
#include "asterisk/channel.h"
#include "asterisk/http.h"
#include "asterisk/json.h"
#include "asterisk/lock.h"
#include "asterisk/module.h"
#include "asterisk/pbx.h"
#include "asterisk/stasis_app.h"
#include "asterisk/stasis_app_impl.h"
#include "asterisk/stasis_channels.h"
#include "asterisk/time.h"
#include "asterisk/utils.h"
#include "asterisk/stasis_app_broadcast.h"
Include dependency graph for res_stasis_broadcast.c:

Go to the source code of this file.

Data Structures

struct  stasis_broadcast_ctx
 Broadcast context stored on channel. More...
 

Macros

#define ALTERNATION_DEPTH_THRESHOLD   2
 Group depth threshold for alternation limits.
 
#define AST_API_MODULE   /* Mark this as the module providing the API */
 
#define BROADCAST_BUCKETS   37
 
#define BROADCAST_POLL_INTERVAL_MS   200
 Interval in ms between hangup checks while waiting for a claim.
 
#define MAX_ALTERNATIONS   20
 Maximum alternations allowed in deeply nested groups.
 
#define MAX_BROADCAST_TIMEOUT_MS   (24 * 60 * 60 * 1000)
 Maximum broadcast timeout in milliseconds (24 hours)
 
#define MAX_GROUP_DEPTH   10
 Maximum depth for regex group nesting.
 
#define MAX_NESTED_QUANTIFIERS   3
 Maximum number of nested quantifiers in regex.
 
#define MAX_QUANTIFIER_BOUND   100
 Maximum value for brace quantifier bounds {m,n}.
 
#define MAX_REGEX_LENGTH   256
 Maximum length for app_filter regex pattern.
 

Functions

static void __reg_module (void)
 
static void __unreg_module (void)
 
struct ast_module * AST_MODULE_SELF_SYM (void)
 
static struct stasis_broadcast_ctx * broadcast_ctx_create (const char *channel_id, const char *app_filter, unsigned int flags)
 Create a new broadcast context.
 
static void broadcast_ctx_destructor (void *obj)
 Destructor for broadcast context.
 
static void broadcast_datastore_destroy (void *data)
 Destructor for broadcast datastore.
 
static int load_module (void)
 
static int send_broadcast_event (struct ast_channel *chan, struct stasis_broadcast_ctx *ctx)
 Create and send broadcast event to all applications.
 
int AST_OPTIONAL_API_NAME() stasis_app_broadcast_channel (struct ast_channel *chan, int timeout_ms, const char *app_filter, unsigned int flags)
 Start a broadcast for a channel.
 
void AST_OPTIONAL_API_NAME() stasis_app_broadcast_cleanup (const char *channel_id)
 Clean up broadcast context for a channel.
 
int AST_OPTIONAL_API_NAME() stasis_app_broadcast_wait (struct ast_channel *chan, int timeout_ms)
 Wait for a broadcast channel to be claimed.
 
char *AST_OPTIONAL_API_NAME() stasis_app_broadcast_winner (const char *channel_id)
 Get the winner app name for a broadcast channel.
 
int AST_OPTIONAL_API_NAME() stasis_app_claim_channel (const char *channel_id, const char *app_name)
 Attempt to claim a broadcast channel.
 
static int unload_module (void)
 
static int validate_regex_pattern (const char *pattern)
 Validate a regex pattern for safety.
 

Variables

static struct ast_module_info __mod_info = { .name = AST_MODULE, .flags = AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER , .description = "Stasis application broadcast" , .key = ASTERISK_GPL_KEY , .buildopt_sum = AST_BUILDOPT_SUM, .support_level = AST_MODULE_SUPPORT_EXTENDED, .load = load_module, .unload = unload_module, .requires = "res_stasis,res_ari,http", .load_pri = AST_MODPRI_APP_DEPEND - 1, }
 
static const struct ast_module_info * ast_module_info = &__mod_info
 
static struct ao2_container * broadcast_contexts
 Container for all active broadcast contexts.
 
static const struct ast_datastore_info broadcast_datastore_info
 Datastore information for broadcast context.
 

Detailed Description

Stasis application broadcast resource.

Author
Daniel Donoghue danie.nosp@m.l.do.nosp@m.noghu.nosp@m.e@au.nosp@m.rorai.nosp@m.nnov.nosp@m.ation.nosp@m..com

Definition in file res_stasis_broadcast.c.

Macro Definition Documentation

◆ ALTERNATION_DEPTH_THRESHOLD

#define ALTERNATION_DEPTH_THRESHOLD   2

Group depth threshold for alternation limits.

Definition at line 71 of file res_stasis_broadcast.c.

◆ AST_API_MODULE

#define AST_API_MODULE   /* Mark this as the module providing the API */

Definition at line 50 of file res_stasis_broadcast.c.

◆ BROADCAST_BUCKETS

#define BROADCAST_BUCKETS   37

Definition at line 53 of file res_stasis_broadcast.c.

◆ BROADCAST_POLL_INTERVAL_MS

#define BROADCAST_POLL_INTERVAL_MS   200

Interval in ms between hangup checks while waiting for a claim.

Definition at line 77 of file res_stasis_broadcast.c.

◆ MAX_ALTERNATIONS

#define MAX_ALTERNATIONS   20

Maximum alternations allowed in deeply nested groups.

Definition at line 68 of file res_stasis_broadcast.c.

◆ MAX_BROADCAST_TIMEOUT_MS

#define MAX_BROADCAST_TIMEOUT_MS   (24 * 60 * 60 * 1000)

Maximum broadcast timeout in milliseconds (24 hours)

Definition at line 74 of file res_stasis_broadcast.c.

◆ MAX_GROUP_DEPTH

#define MAX_GROUP_DEPTH   10

Maximum depth for regex group nesting.

Definition at line 59 of file res_stasis_broadcast.c.

◆ MAX_NESTED_QUANTIFIERS

#define MAX_NESTED_QUANTIFIERS   3

Maximum number of nested quantifiers in regex.

Definition at line 62 of file res_stasis_broadcast.c.

◆ MAX_QUANTIFIER_BOUND

#define MAX_QUANTIFIER_BOUND   100

Maximum value for brace quantifier bounds {m,n}.

Definition at line 65 of file res_stasis_broadcast.c.

◆ MAX_REGEX_LENGTH

#define MAX_REGEX_LENGTH   256

Maximum length for app_filter regex pattern.

Definition at line 56 of file res_stasis_broadcast.c.

Function Documentation

◆ __reg_module()

static void __reg_module ( void  )
static

Definition at line 933 of file res_stasis_broadcast.c.

◆ __unreg_module()

static void __unreg_module ( void  )
static

Definition at line 933 of file res_stasis_broadcast.c.

◆ AST_MODULE_SELF_SYM()

struct ast_module * AST_MODULE_SELF_SYM ( void  )

Definition at line 933 of file res_stasis_broadcast.c.

◆ broadcast_ctx_create()

static struct stasis_broadcast_ctx * broadcast_ctx_create ( const char *  channel_id,
const char *  app_filter,
unsigned int  flags 
)
static

Create a new broadcast context.

Validates and compiles the app_filter regex if provided. On regex failure the context is still created but broadcasts will be sent to all applications (i.e. no filtering).

Definition at line 151 of file res_stasis_broadcast.c.

153{
154 struct stasis_broadcast_ctx *ctx;
155
156 ctx = ao2_alloc(sizeof(*ctx), broadcast_ctx_destructor);
157 if (!ctx) {
158 return NULL;
159 }
160
161 /* ao2_alloc zeroes the struct; only set non-zero fields explicitly */
162 ast_copy_string(ctx->channel_id, channel_id, sizeof(ctx->channel_id));
163 ctx->flags = flags;
165 ast_cond_init(&ctx->cond, NULL);
166
167 /* Validate and compile app_filter regex if provided */
169 ast_copy_string(ctx->app_filter, app_filter, sizeof(ctx->app_filter));
172 "Channel %s: rejecting app_filter regex as potentially dangerous: %s\n",
174 } else if (regcomp(&ctx->compiled_filter, app_filter,
175 REG_EXTENDED | REG_NOSUB) != 0) {
177 "Channel %s: failed to compile app_filter regex '%s'\n",
179 } else {
180 ctx->filter_compiled = 1;
181 }
182
183 if (!ctx->filter_compiled) {
185 "Channel %s: proceeding without application filtering due to invalid regex\n",
186 channel_id);
187 }
188 }
189
190 ast_debug(1, "Created broadcast context for channel %s (filter: %s, flags: 0x%x)\n",
191 ctx->channel_id,
192 ctx->filter_compiled ? ctx->app_filter : "none",
193 ctx->flags);
194
195 return ctx;
196}
#define ast_log
Definition astobj2.c:42
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition astobj2.h:480
#define ao2_alloc(data_size, destructor_fn)
Definition astobj2.h:409
#define ast_debug(level,...)
Log a DEBUG message.
#define LOG_WARNING
#define ast_cond_init(cond, attr)
Definition lock.h:208
static void broadcast_ctx_destructor(void *obj)
Destructor for broadcast context.
static int validate_regex_pattern(const char *pattern)
Validate a regex pattern for safety.
static struct ao2_container * broadcast_contexts
Container for all active broadcast contexts.
#define NULL
Definition resample.c:96
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition strings.h:65
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
Definition strings.h:425
Broadcast context stored on channel.
char app_filter[MAX_REGEX_LENGTH+1]
struct ao2_container * container
char channel_id[AST_MAX_PUBLIC_UNIQUEID]

References ao2_alloc, ao2_bump, stasis_broadcast_ctx::app_filter, ast_cond_init, ast_copy_string(), ast_debug, ast_log, ast_strlen_zero(), broadcast_contexts, broadcast_ctx_destructor(), stasis_broadcast_ctx::channel_id, stasis_broadcast_ctx::compiled_filter, stasis_broadcast_ctx::cond, stasis_broadcast_ctx::container, stasis_broadcast_ctx::filter_compiled, stasis_broadcast_ctx::flags, LOG_WARNING, NULL, and validate_regex_pattern().

Referenced by stasis_app_broadcast_channel().

◆ broadcast_ctx_destructor()

static void broadcast_ctx_destructor ( void *  obj)
static

Destructor for broadcast context.

Definition at line 132 of file res_stasis_broadcast.c.

133{
134 struct stasis_broadcast_ctx *ctx = obj;
135 ast_free(ctx->winner_app);
136 if (ctx->filter_compiled) {
137 regfree(&ctx->compiled_filter);
138 }
140 ast_cond_destroy(&ctx->cond);
141}
#define ast_free(a)
Definition astmm.h:180
#define ao2_cleanup(obj)
Definition astobj2.h:1934
#define ast_cond_destroy(cond)
Definition lock.h:209

References ao2_cleanup, ast_cond_destroy, ast_free, stasis_broadcast_ctx::compiled_filter, stasis_broadcast_ctx::cond, stasis_broadcast_ctx::container, stasis_broadcast_ctx::filter_compiled, and stasis_broadcast_ctx::winner_app.

Referenced by broadcast_ctx_create().

◆ broadcast_datastore_destroy()

static void broadcast_datastore_destroy ( void *  data)
static

Destructor for broadcast datastore.

Called when the channel is destroyed. Ensures the broadcast context is unlinked from the global container even if the caller never reached stasis_app_broadcast_cleanup (e.g. abnormal channel teardown).

Definition at line 112 of file res_stasis_broadcast.c.

113{
114 struct stasis_broadcast_ctx *ctx = data;
115
116 if (ctx->container) {
117 ao2_unlink(ctx->container, ctx);
118 }
119 ao2_cleanup(ctx);
120}
#define ao2_unlink(container, obj)
Remove an object from a container.
Definition astobj2.h:1578

References ao2_cleanup, ao2_unlink, and stasis_broadcast_ctx::container.

◆ load_module()

static int load_module ( void  )
static

Definition at line 899 of file res_stasis_broadcast.c.

900{
902 BROADCAST_BUCKETS, stasis_broadcast_ctx_hash_fn, NULL, stasis_broadcast_ctx_cmp_fn);
903
904 if (!broadcast_contexts) {
906 }
907
908 ast_debug(1, "Stasis broadcast module loaded\n");
910}
@ AO2_ALLOC_OPT_LOCK_MUTEX
Definition astobj2.h:363
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition astobj2.h:1303
@ AST_MODULE_LOAD_SUCCESS
Definition module.h:70
@ AST_MODULE_LOAD_DECLINE
Module has failed to load, may be in an inconsistent state.
Definition module.h:78
#define BROADCAST_BUCKETS

References AO2_ALLOC_OPT_LOCK_MUTEX, ao2_container_alloc_hash, ast_debug, AST_MODULE_LOAD_DECLINE, AST_MODULE_LOAD_SUCCESS, BROADCAST_BUCKETS, broadcast_contexts, and NULL.

◆ send_broadcast_event()

static int send_broadcast_event ( struct ast_channel *  chan,
struct stasis_broadcast_ctx *  ctx 
)
static

Create and send broadcast event to all applications.

Uses the compiled regex cached in ctx for application filtering.

Definition at line 412 of file res_stasis_broadcast.c.

414{
416 RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
418 struct ao2_iterator iter;
419 char *app_name;
420 const char *caller = NULL;
421 const char *called = NULL;
422
423 /* Get snapshot and caller/called info under a single channel lock */
424 ast_channel_lock(chan);
425 snapshot = ao2_bump(ast_channel_snapshot(chan));
426 caller = ast_strdupa(S_OR(ast_channel_caller(chan)->id.number.str, ""));
427 called = ast_strdupa(S_OR(ast_channel_exten(chan), ""));
428 ast_channel_unlock(chan);
429
430 /* Build the broadcast event. Channel variables configured in
431 * ari.conf "channelvars" are already included in the channel
432 * snapshot produced by ast_channel_snapshot_to_json(). */
433 event = ast_json_pack("{s: s, s: o, s: o, s: s?, s: s?}",
434 "type", "CallBroadcast",
435 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
436 "channel", ast_channel_snapshot_to_json(snapshot, NULL),
437 "caller", caller,
438 "called", called);
439
440 if (!event) {
441 ast_log(LOG_ERROR, "Channel %s: failed to create broadcast event\n",
442 ast_channel_name(chan));
443 return -1;
444 }
445
446 /* Get all registered applications */
448 if (!apps) {
449 ast_log(LOG_ERROR, "Channel %s: failed to get stasis applications\n",
450 ast_channel_name(chan));
451 return -1;
452 }
453
454 ast_debug(2, "Broadcasting to %d registered Stasis applications\n",
456
457 /*
458 * Broadcast to all matching applications.
459 *
460 * We collect matching apps into a plain array, Fisher-Yates shuffle it,
461 * then call stasis_app_send() for each. stasis_app_send() writes
462 * directly to each app's WebSocket socket synchronously on the calling
463 * thread. The shuffle ensures no single ARI application is consistently
464 * first to receive the event — every app gets a fair chance to claim the
465 * channel regardless of its position in the ao2 hash container.
466 */
467 {
468 int app_count;
469 char **matching_arr;
470 int n = 0;
471 int i;
472
473 app_count = ao2_container_count(apps);
474 if (app_count == 0) {
475 ast_debug(2, "Channel %s: no Stasis applications registered\n",
477 return 0;
478 }
479
480 matching_arr = ast_malloc(app_count * sizeof(*matching_arr));
481 if (!matching_arr) {
482 ast_log(LOG_ERROR, "Channel %s: failed to allocate matching apps array\n",
483 ast_channel_name(chan));
484 return -1;
485 }
486
487 /* First pass: collect all matching app names (transfer refs to array) */
488 iter = ao2_iterator_init(apps, 0);
489 while ((app_name = ao2_iterator_next(&iter)) && n < app_count) {
490 if (ctx->filter_compiled &&
491 regexec(&ctx->compiled_filter, app_name, 0, NULL, 0) == REG_NOMATCH) {
492 ast_debug(3, "App '%s' does not match filter, skipping\n", app_name);
493 ao2_ref(app_name, -1);
494 continue;
495 }
496 matching_arr[n++] = app_name; /* ref transferred to array */
497 }
499
500 ast_debug(2, "Broadcasting channel %s to %d matching applications\n",
501 ast_channel_uniqueid(chan), n);
502
503 /* Fisher-Yates shuffle: randomise delivery order so no app is
504 * consistently first to receive the broadcast event. */
505 for (i = n - 1; i > 0; i--) {
506 int j = ast_random() % (i + 1);
507 char *tmp = matching_arr[i];
508 matching_arr[i] = matching_arr[j];
509 matching_arr[j] = tmp;
510 }
511
512 /*
513 * Second pass: send to each matching app. A deep copy of the event
514 * is required for each call because stasis_app_send() mutates the
515 * message in-place (adds "asterisk_id" via ast_json_object_set).
516 */
517 for (i = 0; i < n; i++) {
518 char *match_name = matching_arr[i];
519 struct ast_json *event_copy;
520
521 ast_debug(3, "Sending broadcast to app '%s'\n", match_name);
522
523 event_copy = ast_json_deep_copy(event);
524 if (!event_copy) {
526 "Channel %s: failed to deep-copy event for app '%s'\n",
527 ast_channel_uniqueid(chan), match_name);
528 ao2_ref(match_name, -1);
529 continue;
530 }
531
532 stasis_app_send(match_name, event_copy);
533 ast_json_unref(event_copy);
534 ao2_ref(match_name, -1);
535 }
536
537 ast_free(matching_arr);
538 }
539
540 return 0;
541}
#define ast_strdupa(s)
duplicate a string in memory from the stack
Definition astmm.h:298
#define ast_malloc(len)
A wrapper for malloc()
Definition astmm.h:191
#define ao2_iterator_next(iter)
Definition astobj2.h:1911
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition astobj2.h:459
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
const char * ast_channel_name(const struct ast_channel *chan)
#define ast_channel_lock(chan)
Definition channel.h:2982
const char * ast_channel_uniqueid(const struct ast_channel *chan)
struct ast_party_caller * ast_channel_caller(struct ast_channel *chan)
const char * ast_channel_exten(const struct ast_channel *chan)
#define ast_channel_unlock(chan)
Definition channel.h:2983
#define LOG_ERROR
struct ast_json * ast_json_deep_copy(const struct ast_json *value)
Copy a JSON value, and its children.
Definition json.c:641
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition json.c:73
struct ast_json * ast_json_pack(char const *format,...)
Helper for creating complex JSON values.
Definition json.c:612
struct ast_json * ast_json_timeval(const struct timeval tv, const char *zone)
Construct a timeval as JSON.
Definition json.c:670
const char * app_name(struct ast_app *app)
Definition pbx_app.c:475
int stasis_app_send(const char *app_name, struct ast_json *message)
Send a message to the given Stasis application.
struct ao2_container * stasis_app_get_all(void)
Gets the names of all registered Stasis applications.
struct ast_json * ast_channel_snapshot_to_json(const struct ast_channel_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_channel_snapshot.
#define S_OR(a, b)
returns the equivalent of logic or for strings: first one if not empty, otherwise second one.
Definition strings.h:80
Generic container type.
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
Definition astobj2.h:1821
Registered applications container.
Definition pbx_app.c:69
Structure representing a snapshot of channel state.
Abstract JSON element (object, array, string, int, ...).
Number structure.
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition time.h:159
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition utils.h:981
long int ast_random(void)
Definition utils.c:2346

References ao2_bump, ao2_cleanup, ao2_container_count(), ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, ao2_ref, app_name(), ast_channel_caller(), ast_channel_exten(), ast_channel_lock, ast_channel_name(), ast_channel_snapshot_to_json(), ast_channel_uniqueid(), ast_channel_unlock, ast_debug, ast_free, ast_json_deep_copy(), ast_json_pack(), ast_json_timeval(), ast_json_unref(), ast_log, ast_malloc, ast_random(), ast_strdupa, ast_tvnow(), stasis_broadcast_ctx::compiled_filter, stasis_broadcast_ctx::filter_compiled, LOG_ERROR, NULL, RAII_VAR, S_OR, stasis_app_get_all(), and stasis_app_send().

Referenced by stasis_app_broadcast_channel().

◆ stasis_app_broadcast_channel()

int AST_OPTIONAL_API_NAME() stasis_app_broadcast_channel ( struct ast_channel *  chan,
int  timeout_ms,
const char *  app_filter,
unsigned int  flags 
)

Start a broadcast for a channel.

Parameters
chan: The channel to broadcast
timeout_ms: Timeout in milliseconds
app_filter: Optional regex filter for applications
Returns
0 on success, -1 on error

Definition at line 550 of file res_stasis_broadcast.c.

552{
554 struct ast_datastore *datastore;
555
556 if (!chan) {
557 return -1;
558 }
559
560 if (!broadcast_contexts) {
561 return -1;
562 }
563
564 /* Remove any previous broadcast datastore from a prior attempt.
565 * This supports failover scenarios where StasisBroadcast() is
566 * called multiple times for the same channel. The datastore
567 * destructor unlinks the old context from the container. */
568 {
569 struct ast_datastore *old_ds;
570 ast_channel_lock(chan);
572 if (old_ds) {
573 ast_channel_datastore_remove(chan, old_ds);
574 }
575 ast_channel_unlock(chan);
576 if (old_ds) {
577 ast_datastore_free(old_ds);
578 }
579 }
580
581 /* Create broadcast context (validates and compiles app_filter regex) */
582 ctx = broadcast_ctx_create(ast_channel_uniqueid(chan), app_filter, flags);
583 if (!ctx) {
584 ast_log(LOG_ERROR, "Channel %s: failed to create broadcast context\n",
586 return -1;
587 }
588
589 /* Store context in container */
591
592 /* Create and attach datastore to channel */
594 if (!datastore) {
595 ast_log(LOG_ERROR, "Channel %s: failed to allocate broadcast datastore\n",
598 return -1;
599 }
600
601 datastore->data = ao2_bump(ctx);
602 ast_channel_lock(chan);
603 if (ast_channel_datastore_add(chan, datastore)) {
604 ast_channel_unlock(chan);
605 ast_log(LOG_ERROR, "Channel %s: failed to attach broadcast datastore\n",
607 ast_datastore_free(datastore);
609 return -1;
610 }
611 ast_channel_unlock(chan);
612
613 ast_debug(1, "Starting broadcast for channel %s (timeout: %dms, filter: %s)\n",
614 ast_channel_uniqueid(chan), timeout_ms, app_filter ? app_filter : "none");
615
616 /* Send broadcast event to all matching applications */
617 if (send_broadcast_event(chan, ctx) != 0) {
618 ast_log(LOG_ERROR, "Channel %s: failed to send broadcast event\n",
620 ast_channel_lock(chan);
621 ast_channel_datastore_remove(chan, datastore);
622 ast_channel_unlock(chan);
623 ast_datastore_free(datastore);
625 return -1;
626 }
627
628 return 0;
629}
#define ao2_link(container, obj)
Add an object to a container.
Definition astobj2.h:1532
int ast_channel_datastore_add(struct ast_channel *chan, struct ast_datastore *datastore)
Add a datastore to a channel.
Definition channel.c:2376
int ast_channel_datastore_remove(struct ast_channel *chan, struct ast_datastore *datastore)
Remove a datastore from a channel.
Definition channel.c:2385
struct ast_datastore * ast_channel_datastore_find(struct ast_channel *chan, const struct ast_datastore_info *info, const char *uid)
Find a datastore on a channel.
Definition channel.c:2390
#define ast_datastore_alloc(info, uid)
Definition datastore.h:85
int ast_datastore_free(struct ast_datastore *datastore)
Free a data store object.
Definition datastore.c:68
static int send_broadcast_event(struct ast_channel *chan, struct stasis_broadcast_ctx *ctx)
Create and send broadcast event to all applications.
static const struct ast_datastore_info broadcast_datastore_info
Datastore information for broadcast context.
static struct stasis_broadcast_ctx * broadcast_ctx_create(const char *channel_id, const char *app_filter, unsigned int flags)
Create a new broadcast context.
Structure for a data store object.
Definition datastore.h:64
void * data
Definition datastore.h:66

References ao2_bump, ao2_cleanup, ao2_link, ao2_unlink, ast_channel_datastore_add(), ast_channel_datastore_find(), ast_channel_datastore_remove(), ast_channel_lock, ast_channel_uniqueid(), ast_channel_unlock, ast_datastore_alloc, ast_datastore_free(), ast_debug, ast_log, broadcast_contexts, broadcast_ctx_create(), broadcast_datastore_info, ast_datastore::data, LOG_ERROR, NULL, RAII_VAR, and send_broadcast_event().

Referenced by stasis_broadcast_exec().

◆ stasis_app_broadcast_cleanup()

void AST_OPTIONAL_API_NAME() stasis_app_broadcast_cleanup ( const char *  channel_id)

Clean up broadcast context for a channel.

This is the normal-path cleanup called by the dialplan application after the broadcast completes. The channel datastore destructor (broadcast_datastore_destroy) also unlinks the context as a safety net for abnormal teardown; ao2_unlink is idempotent so the double call is harmless.

Parameters
channel_id: The unique ID of the channel

Definition at line 885 of file res_stasis_broadcast.c.

886{
888
889 if (ast_strlen_zero(channel_id) || !broadcast_contexts) {
890 return;
891 }
892
894 if (ctx) {
895 ast_debug(3, "Cleaning up broadcast context for %s\n", channel_id);
896 }
897}
#define ao2_find(container, arg, flags)
Definition astobj2.h:1736
@ OBJ_UNLINK
Definition astobj2.h:1039
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
Definition astobj2.h:1101

References ao2_cleanup, ao2_find, ast_debug, ast_strlen_zero(), broadcast_contexts, NULL, OBJ_SEARCH_KEY, OBJ_UNLINK, and RAII_VAR.

Referenced by stasis_broadcast_exec().

◆ stasis_app_broadcast_wait()

int AST_OPTIONAL_API_NAME() stasis_app_broadcast_wait ( struct ast_channel *  chan,
int  timeout_ms 
)

Wait for a broadcast channel to be claimed.

Blocks until the channel is claimed, the timeout expires, or the channel hangs up. The hangup check runs every BROADCAST_POLL_INTERVAL_MS so that a dead channel does not tie up a PBX thread for the full timeout period.

Parameters
chan: The channel
timeout_ms: Maximum time to wait in milliseconds
Returns
0 if claimed within timeout, -1 otherwise

Definition at line 782 of file res_stasis_broadcast.c.

783{
785 const char *channel_id;
786 struct timeval deadline;
787 int result = -1;
788
789 if (!chan) {
790 return -1;
791 }
792
793 channel_id = ast_channel_uniqueid(chan);
794
795 if (!broadcast_contexts) {
796 return -1;
797 }
798
799 ctx = ao2_find(broadcast_contexts, channel_id, OBJ_SEARCH_KEY);
800 if (!ctx) {
801 ast_log(LOG_WARNING, "No broadcast context for channel %s\n", channel_id);
802 return -1;
803 }
804
805 /* Cap excessive timeouts to prevent arithmetic overflow */
806 if (timeout_ms < 0) {
807 timeout_ms = 0;
808 } else if (timeout_ms > MAX_BROADCAST_TIMEOUT_MS) {
809 timeout_ms = MAX_BROADCAST_TIMEOUT_MS;
810 }
811
812 /* Calculate absolute deadline */
813 deadline = ast_tvadd(ast_tvnow(),
814 ast_tv(timeout_ms / 1000, (timeout_ms % 1000) * 1000));
815
816 ao2_lock(ctx);
817 while (!ctx->claimed) {
818 struct timeval now;
819 struct timespec poll_spec;
820 long remaining_ms;
821 long poll_ms;
822 int wait_result;
823
824 /* Check for hangup so we don't block on a dead channel */
825 if (ast_check_hangup(chan)) {
826 ast_debug(3, "Channel %s hung up during broadcast wait\n",
827 channel_id);
828 break;
829 }
830
831 /* Check if we've passed the overall deadline */
832 now = ast_tvnow();
833 remaining_ms = ast_tvdiff_ms(deadline, now);
834 if (remaining_ms <= 0) {
835 ast_debug(3, "Broadcast timeout for channel %s after %dms\n",
836 channel_id, timeout_ms);
837 break;
838 }
839
840 /* Sleep for the shorter of the remaining time and the poll interval */
841 poll_ms = remaining_ms;
842 if (poll_ms > BROADCAST_POLL_INTERVAL_MS) {
844 }
845
846 poll_spec.tv_sec = now.tv_sec + (poll_ms / 1000);
847 poll_spec.tv_nsec = (long)(now.tv_usec) * 1000L
848 + (long)(poll_ms % 1000) * 1000000L;
849 while (poll_spec.tv_nsec >= 1000000000) {
850 poll_spec.tv_sec++;
851 poll_spec.tv_nsec -= 1000000000;
852 }
853
854 wait_result = ast_cond_timedwait(&ctx->cond, ao2_object_get_lockaddr(ctx), &poll_spec);
855 if (wait_result != 0 && wait_result != ETIMEDOUT) {
857 "Channel %s: unexpected error waiting for claim: %s (%d)\n",
858 channel_id, strerror(wait_result), wait_result);
859 break;
860 }
861 /* Loop back: re-check claimed, then hangup, then deadline */
862 }
863
864 if (ctx->claimed) {
865 ast_debug(1, "Channel %s claimed by %s\n",
866 channel_id, ctx->winner_app);
867 result = 0;
868 }
869 ao2_unlock(ctx);
870
871 return result;
872}
#define ao2_unlock(a)
Definition astobj2.h:729
#define ao2_lock(a)
Definition astobj2.h:717
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
Definition astobj2.c:476
static PGresult * result
Definition cel_pgsql.c:84
int ast_check_hangup(struct ast_channel *chan)
Check to see if a channel is needing hang up.
Definition channel.c:446
static char * wait_result(void)
#define ast_cond_timedwait(cond, mutex, time)
Definition lock.h:213
#define MAX_BROADCAST_TIMEOUT_MS
Maximum broadcast timeout in milliseconds (24 hours)
#define BROADCAST_POLL_INTERVAL_MS
Interval in ms between hangup checks while waiting for a claim.
struct timeval ast_tvadd(struct timeval a, struct timeval b)
Returns the sum of two timevals a + b.
Definition extconf.c:2280
int64_t ast_tvdiff_ms(struct timeval end, struct timeval start)
Computes the difference (in milliseconds) between two struct timeval instances.
Definition time.h:107
struct timeval ast_tv(ast_time_t sec, ast_suseconds_t usec)
Returns a timeval from sec, usec.
Definition time.h:235

References ao2_cleanup, ao2_find, ao2_lock, ao2_object_get_lockaddr(), ao2_unlock, ast_channel_uniqueid(), ast_check_hangup(), ast_cond_timedwait, ast_debug, ast_log, ast_tv(), ast_tvadd(), ast_tvdiff_ms(), ast_tvnow(), broadcast_contexts, BROADCAST_POLL_INTERVAL_MS, LOG_WARNING, MAX_BROADCAST_TIMEOUT_MS, NULL, OBJ_SEARCH_KEY, RAII_VAR, result, and wait_result().

Referenced by stasis_broadcast_exec().

◆ stasis_app_broadcast_winner()

char *AST_OPTIONAL_API_NAME() stasis_app_broadcast_winner ( const char *  channel_id)

Get the winner app name for a broadcast channel.

Parameters
channel_id: The unique ID of the channel
Returns
A copy of the winner app name (caller must free with ast_free), or NULL if not claimed or not found

Definition at line 743 of file res_stasis_broadcast.c.

744{
746 char *winner = NULL;
747
748 if (ast_strlen_zero(channel_id)) {
749 return NULL;
750 }
751
752 ctx = ao2_find(broadcast_contexts, channel_id, OBJ_SEARCH_KEY);
753 if (!ctx) {
754 return NULL;
755 }
756
757 ao2_lock(ctx);
758 if (ctx->claimed) {
759 winner = ast_strdup(ctx->winner_app);
760 }
761 /* Mark the broadcast as finished so no new claims can succeed.
762 * This closes the race window between reading the winner and
763 * the subsequent broadcast_cleanup call. */
764 ctx->finished = 1;
765 ao2_unlock(ctx);
766
767 return winner;
768}
#define ast_strdup(str)
A wrapper for strdup()
Definition astmm.h:241

References ao2_cleanup, ao2_find, ao2_lock, ao2_unlock, ast_strdup, ast_strlen_zero(), broadcast_contexts, NULL, OBJ_SEARCH_KEY, and RAII_VAR.

Referenced by stasis_broadcast_exec().

◆ stasis_app_claim_channel()

int AST_OPTIONAL_API_NAME() stasis_app_claim_channel ( const char *  channel_id,
const char *  app_name 
)

Attempt to claim a broadcast channel.

Parameters
channel_id: The unique ID of the channel
app_name: The name of the application claiming the channel
Returns
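0 if claim successful; -1 if channel not found (also returned when channel_id or app_name is empty, when the broadcast container is not initialized, when the broadcast already finished without being claimed, or when the winner app name cannot be allocated); -2 if already claimed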

Definition at line 637 of file res_stasis_broadcast.c.

638{
640
641 if (ast_strlen_zero(channel_id) || ast_strlen_zero(app_name)) {
642 return -1;
643 }
644
645 if (!broadcast_contexts) {
646 return -1;
647 }
648
649 /* Find broadcast context */
650 ctx = ao2_find(broadcast_contexts, channel_id, OBJ_SEARCH_KEY);
651 if (!ctx) {
652 ast_debug(1, "No broadcast context found for channel %s\n", channel_id);
653 return -1;
654 }
655
656 /* Atomically check and set claimed flag.
657 * Check claimed before finished: if the channel was claimed and then the
658 * broadcast finished, a late claim should return -2 (409 Conflict) rather
659 * than -1 (404) so callers can distinguish "already taken" from "not found". */
660 ao2_lock(ctx);
661 if (ctx->claimed) {
662 ast_debug(1, "Channel %s already claimed by %s (attempt by %s denied)\n",
663 channel_id, ctx->winner_app ? ctx->winner_app : "(unknown)", app_name);
664 ao2_unlock(ctx);
665 return -2;
666 }
667 if (ctx->finished) {
668 ast_debug(1, "Channel %s broadcast already finished (late claim by %s rejected)\n",
669 channel_id, app_name);
670 ao2_unlock(ctx);
671 return -1;
672 }
673 ctx->winner_app = ast_strdup(app_name);
674 if (!ctx->winner_app) {
676 "Failed to allocate winner app name for channel %s\n",
677 channel_id);
678 ao2_unlock(ctx);
679 return -1;
680 }
681 ctx->claimed = 1;
682 ast_verb(3, "Channel %s claimed by application %s\n",
683 channel_id, app_name);
684 /* Signal waiting thread that channel was claimed */
685 ast_cond_signal(&ctx->cond);
686 ao2_unlock(ctx);
687
688 /* Send CallClaimed event to matching apps */
689 if (!(ctx->flags & STASIS_BROADCAST_FLAG_SUPPRESS_CLAIMED)) {
690 RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
693 struct ao2_iterator iter;
694 char *app_name_iter;
695
696 snapshot = ast_channel_snapshot_get_latest(channel_id);
697 if (snapshot) {
698 event = ast_json_pack("{s: s, s: o, s: o, s: s}",
699 "type", "CallClaimed",
700 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
701 "channel", ast_channel_snapshot_to_json(snapshot, NULL),
702 "winner_app", app_name);
703 }
704 if (event) {
706 }
707 if (apps) {
708 iter = ao2_iterator_init(apps, 0);
709 while ((app_name_iter = ao2_iterator_next(&iter))) {
710 struct ast_json *event_copy;
711
712 /* Only send to apps that matched the original broadcast filter */
713 if (ctx->filter_compiled &&
714 regexec(&ctx->compiled_filter, app_name_iter,
715 0, NULL, 0) == REG_NOMATCH) {
716 ao2_ref(app_name_iter, -1);
717 continue;
718 }
719
720 event_copy = ast_json_deep_copy(event);
721 if (!event_copy) {
722 ao2_ref(app_name_iter, -1);
723 continue;
724 }
725
726 stasis_app_send(app_name_iter, event_copy);
727 ast_json_unref(event_copy);
728 ao2_ref(app_name_iter, -1);
729 }
731 }
732 }
733
734 return 0;
735}
struct ast_channel_snapshot * ast_channel_snapshot_get_latest(const char *uniqueid)
Obtain the latest ast_channel_snapshot from the Stasis Message Bus API cache. This is an ao2 object,...
#define ast_verb(level,...)
#define ast_cond_signal(cond)
Definition lock.h:210
#define STASIS_BROADCAST_FLAG_SUPPRESS_CLAIMED
Suppress CallClaimed event for this broadcast.

References ao2_cleanup, ao2_find, ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, ao2_lock, ao2_ref, ao2_unlock, app_name(), ast_channel_snapshot_get_latest(), ast_channel_snapshot_to_json(), ast_cond_signal, ast_debug, ast_json_deep_copy(), ast_json_pack(), ast_json_timeval(), ast_json_unref(), ast_log, ast_strdup, ast_strlen_zero(), ast_tvnow(), ast_verb, broadcast_contexts, LOG_ERROR, NULL, OBJ_SEARCH_KEY, RAII_VAR, stasis_app_get_all(), stasis_app_send(), and STASIS_BROADCAST_FLAG_SUPPRESS_CLAIMED.

Referenced by ast_ari_events_claim_channel().

◆ unload_module()

static int unload_module ( void  )
static

Definition at line 912 of file res_stasis_broadcast.c.

913{
914 /* NULL the global pointer before releasing the reference so that
915 * concurrent lookups see NULL (safe) rather than a freed pointer. */
916 {
917 struct ao2_container *old_contexts = broadcast_contexts;
919 ao2_cleanup(old_contexts);
920 }
921
922 ast_debug(1, "Stasis broadcast module unloaded\n");
923 return 0;
924}

References ao2_cleanup, ast_debug, broadcast_contexts, and NULL.

◆ validate_regex_pattern()

static int validate_regex_pattern ( const char *  pattern)
static

Validate a regex pattern for safety.

Checks that the regex pattern is within length limits and doesn't contain patterns that could cause excessive backtracking or denial of service.

Parameters
pattern: The regex pattern to validate
Returns
0 if valid, -1 if invalid

Definition at line 207 of file res_stasis_broadcast.c.

208{
209 size_t len;
210 int group_depth = 0;
211 int quantified_groups = 0;
212 int in_class = 0; /* Inside [...] */
213 /* Track alternations per group depth. Index 0 is outside groups and unused. */
214 int alternations_per_depth[MAX_GROUP_DEPTH + 1] = { 0 };
215 const char *p;
216
217 if (ast_strlen_zero(pattern)) {
218 return 0; /* Empty pattern is valid (will be skipped) */
219 }
220
221 /* Check maximum length to prevent excessive regex compilation time */
222 len = strlen(pattern);
223 if (len > MAX_REGEX_LENGTH) {
224 ast_debug(3, "Regex pattern exceeds maximum length of %d characters (got %zu)\n",
226 return -1;
227 }
228
229 /*
230 * Check for potentially dangerous patterns that could cause
231 * excessive regex compilation or matching time. Look for:
232 * - Excessive group nesting depth
233 * - Too many quantified groups (groups followed by +, *, or ?)
234 *
235 * Note: This is a heuristic approach that catches common dangerous
236 * patterns. Combined with the length limit, it provides reasonable
237 * protection against ReDoS while allowing legitimate regex usage.
238 */
239 for (p = pattern; *p; p++) {
240 /* Handle character classes: enter on unescaped '[' and exit on unescaped ']' */
241 if (!in_class && *p == '[' && (p == pattern || *(p - 1) != '\\')) {
242 in_class = 1;
243 /* In POSIX ERE, ']' immediately after '[' or '[^' is a
244 * literal, not the end of the class. Advance past the
245 * optional negation caret and the literal ']' so the
246 * main loop does not leave in_class prematurely. */
247 if (*(p + 1) == '^') {
248 p++;
249 }
250 if (*(p + 1) == ']') {
251 p++;
252 }
253 continue;
254 } else if (in_class) {
255 if (*p == '\\') {
256 /* Skip the next escaped character inside character class */
257 if (*(p + 1)) {
258 p++;
259 }
260 continue;
261 }
262 if (*p == ']') {
263 in_class = 0;
264 }
265 /* Ignore everything inside character classes for heuristics */
266 continue;
267 }
268 switch (*p) {
269 case '(':
270 group_depth++;
271 if (group_depth > MAX_GROUP_DEPTH) {
272 ast_debug(3, "Regex pattern has too many nested groups (max %d)\n",
274 return -1;
275 }
276 /* Reset alternation counter for newly entered group depth */
277 alternations_per_depth[group_depth] = 0;
278 break;
279 case ')':
280 if (group_depth > 0) {
281 /* Clear alternations count for this depth before leaving */
282 alternations_per_depth[group_depth] = 0;
283 group_depth--;
284 }
285 break;
286 case '+':
287 case '*':
288 case '?':
289 /*
290 * Count quantified groups - patterns like (...)+ or (...)*
291 * Too many of these can cause slow matching on certain inputs.
292 */
293 if (p > pattern && *(p - 1) == ')') {
294 quantified_groups++;
295 }
296 break;
297 case '{': {
298 /* Parse POSIX quantifier {m}, {m,}, {m,n} with overflow and bound checks */
299 const char *q = p + 1;
300 long m = 0, n = -1; /* n=-1 means open upper bound */
301 int valid = 0;
302 int digit;
303 int overflow = 0;
304
305 if (*q >= '0' && *q <= '9') {
306 /* Parse m safely */
307 while (*q >= '0' && *q <= '9') {
308 digit = (*q - '0');
309 if (m > (LONG_MAX - digit) / 10) { /* overflow on next step */
310 overflow = 1;
311 break;
312 }
313 m = (m * 10) + digit;
314 if (m > MAX_QUANTIFIER_BOUND) { /* early bound exceed */
315 overflow = 1;
316 break;
317 }
318 q++;
319 }
320 if (!overflow && *q == ',') {
321 q++;
322 if (*q >= '0' && *q <= '9') {
323 long nn = 0;
324 while (*q >= '0' && *q <= '9') {
325 digit = (*q - '0');
326 if (nn > (LONG_MAX - digit) / 10) {
327 overflow = 1;
328 break;
329 }
330 nn = (nn * 10) + digit;
331 if (nn > MAX_QUANTIFIER_BOUND) {
332 overflow = 1;
333 break;
334 }
335 q++;
336 }
337 n = nn;
338 } else {
339 n = -1; /* open upper bound */
340 }
341 } else if (!overflow) {
342 n = m; /* {m} */
343 }
344 if (!overflow && *q == '}') {
345 valid = 1;
346 }
347 }
348 if (overflow) {
349 ast_debug(3, "Regex quantifier overflow or exceeds max bound (max %d)\n", MAX_QUANTIFIER_BOUND);
350 return -1;
351 }
352 if (valid) {
353 /* Additional bounds check (defensive) */
354 if (m > MAX_QUANTIFIER_BOUND || (n != -1 && n > MAX_QUANTIFIER_BOUND)) {
355 ast_debug(3, "Regex quantifier bounds too large (max %d)\n", MAX_QUANTIFIER_BOUND);
356 return -1;
357 }
358 if (p > pattern && *(p - 1) == ')') {
359 quantified_groups++;
360 }
361 p = q; /* q currently points to '}' */
362 }
363 break;
364 }
365 case '|':
366 if (group_depth > 0) {
367 alternations_per_depth[group_depth]++;
368 if (group_depth > ALTERNATION_DEPTH_THRESHOLD &&
369 alternations_per_depth[group_depth] > MAX_ALTERNATIONS) {
370 ast_debug(3,
371 "Regex has too many alternations in deep group (depth %d, count %d, max %d)\n",
372 group_depth,
373 alternations_per_depth[group_depth],
375 return -1;
376 }
377 }
378 break;
379 case '\\':
380 /*
381 * Skip the next character entirely from heuristic processing.
382 * This ensures escaped characters (metacharacters in BRE or literals
383 * in ERE like \(, \), \+, \*, \?, etc.) do not affect group depth
384 * or quantified group counts.
385 */
386 if (*(p + 1)) {
387 p++;
388 }
389 /* Continue to next loop iteration without evaluating the escaped char */
390 continue;
391 }
392 }
393
394 /*
395 * Reject patterns with too many quantified groups, as these are
396 * often indicators of potentially slow patterns that could be
397 * exploited for denial of service.
398 */
399 if (quantified_groups > MAX_NESTED_QUANTIFIERS) {
400 ast_debug(3, "Regex pattern has too many quantified groups (max %d)\n",
402 return -1;
403 }
404
405 return 0;
406}
char digit
static int len(struct ast_channel *chan, const char *cmd, char *data, char *buf, size_t buflen)
#define MAX_GROUP_DEPTH
Maximum depth for regex group nesting.
#define ALTERNATION_DEPTH_THRESHOLD
Group depth threshold for alternation limits.
#define MAX_QUANTIFIER_BOUND
Maximum value for brace quantifier bounds {m,n}.
#define MAX_NESTED_QUANTIFIERS
Maximum number of nested quantifiers in regex.
#define MAX_ALTERNATIONS
Maximum alternations allowed in deeply nested groups.
#define MAX_REGEX_LENGTH
Maximum length for app_filter regex pattern.

References ALTERNATION_DEPTH_THRESHOLD, ast_debug, ast_strlen_zero(), digit, len(), MAX_ALTERNATIONS, MAX_GROUP_DEPTH, MAX_NESTED_QUANTIFIERS, MAX_QUANTIFIER_BOUND, and MAX_REGEX_LENGTH.

Referenced by broadcast_ctx_create().

Variable Documentation

◆ __mod_info

struct ast_module_info __mod_info = { .name = AST_MODULE, .flags = AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER , .description = "Stasis application broadcast" , .key = ASTERISK_GPL_KEY , .buildopt_sum = AST_BUILDOPT_SUM, .support_level = AST_MODULE_SUPPORT_EXTENDED, .load = load_module, .unload = unload_module, .requires = "res_stasis,res_ari,http", .load_pri = AST_MODPRI_APP_DEPEND - 1, }
static

Definition at line 933 of file res_stasis_broadcast.c.

◆ ast_module_info

const struct ast_module_info* ast_module_info = &__mod_info
static

Definition at line 933 of file res_stasis_broadcast.c.

◆ broadcast_contexts

struct ao2_container* broadcast_contexts
static

◆ broadcast_datastore_info

const struct ast_datastore_info broadcast_datastore_info
static
Initial value:
= {
.type = "stasis_broadcast_context",
}
static void broadcast_datastore_destroy(void *data)
Destructor for broadcast datastore.

Datastore information for broadcast context.

Definition at line 123 of file res_stasis_broadcast.c.

123 {
124 .type = "stasis_broadcast_context",
126};

Referenced by stasis_app_broadcast_channel().