Asterisk - The Open Source Telephony Project GIT-master-590b490
Loading...
Searching...
No Matches
Macros | Functions
stasis_app_broadcast.h File Reference

Stasis Application Broadcast API. More...

#include "asterisk/channel.h"
#include "asterisk/optional_api.h"
Include dependency graph for stasis_app_broadcast.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Macros

#define STASIS_BROADCAST_FLAG_SUPPRESS_CLAIMED   (1 << 0)
 Suppress CallClaimed event for this broadcast.
 

Functions

int AST_OPTIONAL_API_NAME() stasis_app_broadcast_channel (struct ast_channel *chan, int timeout_ms, const char *app_filter, unsigned int flags)
 Start a broadcast for a channel.
 
void AST_OPTIONAL_API_NAME() stasis_app_broadcast_cleanup (const char *channel_id)
 Clean up broadcast context for a channel.
 
int AST_OPTIONAL_API_NAME() stasis_app_broadcast_wait (struct ast_channel *chan, int timeout_ms)
 Wait for a broadcast channel to be claimed.
 
char *AST_OPTIONAL_API_NAME() stasis_app_broadcast_winner (const char *channel_id)
 Get the winner app name for a broadcast channel.
 
int AST_OPTIONAL_API_NAME() stasis_app_claim_channel (const char *channel_id, const char *app_name)
 Attempt to claim a broadcast channel.
 

Detailed Description

Stasis Application Broadcast API.

Author
Daniel Donoghue danie.nosp@m.l.do.nosp@m.noghu.nosp@m.e@au.nosp@m.rorai.nosp@m.nnov.nosp@m.ation.nosp@m..com

This module provides the infrastructure for broadcasting incoming channels to multiple ARI applications and handling first-claim winner logic.

Definition in file stasis_app_broadcast.h.

Macro Definition Documentation

◆ STASIS_BROADCAST_FLAG_SUPPRESS_CLAIMED

#define STASIS_BROADCAST_FLAG_SUPPRESS_CLAIMED   (1 << 0)

Suppress CallClaimed event for this broadcast.

Definition at line 36 of file stasis_app_broadcast.h.

Function Documentation

◆ stasis_app_broadcast_channel()

int AST_OPTIONAL_API_NAME() stasis_app_broadcast_channel ( struct ast_channel chan,
int  timeout_ms,
const char *  app_filter,
unsigned int  flags 
)

Start a broadcast for a channel.

Since
20

Broadcasts a channel to all ARI applications (or filtered applications) allowing them to claim the channel. Only the first claim will succeed.

When a channel is claimed, a CallClaimed event is sent only to applications that matched the app_filter (or all apps if no filter was set). This can be suppressed entirely with STASIS_BROADCAST_FLAG_SUPPRESS_CLAIMED.

Parameters
chanThe channel to broadcast
timeout_msTimeout in milliseconds to wait for a claim
app_filterOptional regex filter for application names (NULL for all)
flagsCombination of STASIS_BROADCAST_FLAG_* values
Return values
0on success
-1on error
AST_OPTIONAL_API_UNAVAILABLEif res_stasis_broadcast is not loaded
Parameters
chanThe channel to broadcast
timeout_msTimeout in milliseconds
app_filterOptional regex filter for applications
Returns
0 on success, -1 on error

Definition at line 550 of file res_stasis_broadcast.c.

552{
554 struct ast_datastore *datastore;
555
556 if (!chan) {
557 return -1;
558 }
559
560 if (!broadcast_contexts) {
561 return -1;
562 }
563
564 /* Remove any previous broadcast datastore from a prior attempt.
565 * This supports failover scenarios where StasisBroadcast() is
566 * called multiple times for the same channel. The datastore
567 * destructor unlinks the old context from the container. */
568 {
569 struct ast_datastore *old_ds;
570 ast_channel_lock(chan);
572 if (old_ds) {
573 ast_channel_datastore_remove(chan, old_ds);
574 }
575 ast_channel_unlock(chan);
576 if (old_ds) {
577 ast_datastore_free(old_ds);
578 }
579 }
580
581 /* Create broadcast context (validates and compiles app_filter regex) */
582 ctx = broadcast_ctx_create(ast_channel_uniqueid(chan), app_filter, flags);
583 if (!ctx) {
584 ast_log(LOG_ERROR, "Channel %s: failed to create broadcast context\n",
586 return -1;
587 }
588
589 /* Store context in container */
591
592 /* Create and attach datastore to channel */
594 if (!datastore) {
595 ast_log(LOG_ERROR, "Channel %s: failed to allocate broadcast datastore\n",
598 return -1;
599 }
600
601 datastore->data = ao2_bump(ctx);
602 ast_channel_lock(chan);
603 if (ast_channel_datastore_add(chan, datastore)) {
604 ast_channel_unlock(chan);
605 ast_log(LOG_ERROR, "Channel %s: failed to attach broadcast datastore\n",
607 ast_datastore_free(datastore);
609 return -1;
610 }
611 ast_channel_unlock(chan);
612
613 ast_debug(1, "Starting broadcast for channel %s (timeout: %dms, filter: %s)\n",
614 ast_channel_uniqueid(chan), timeout_ms, app_filter ? app_filter : "none");
615
616 /* Send broadcast event to all matching applications */
617 if (send_broadcast_event(chan, ctx) != 0) {
618 ast_log(LOG_ERROR, "Channel %s: failed to send broadcast event\n",
620 ast_channel_lock(chan);
621 ast_channel_datastore_remove(chan, datastore);
622 ast_channel_unlock(chan);
623 ast_datastore_free(datastore);
625 return -1;
626 }
627
628 return 0;
629}
#define ast_log
Definition astobj2.c:42
#define ao2_link(container, obj)
Add an object to a container.
Definition astobj2.h:1532
#define ao2_cleanup(obj)
Definition astobj2.h:1934
#define ao2_unlink(container, obj)
Remove an object from a container.
Definition astobj2.h:1578
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition astobj2.h:480
int ast_channel_datastore_add(struct ast_channel *chan, struct ast_datastore *datastore)
Add a datastore to a channel.
Definition channel.c:2376
int ast_channel_datastore_remove(struct ast_channel *chan, struct ast_datastore *datastore)
Remove a datastore from a channel.
Definition channel.c:2385
#define ast_channel_lock(chan)
Definition channel.h:2982
const char * ast_channel_uniqueid(const struct ast_channel *chan)
#define ast_channel_unlock(chan)
Definition channel.h:2983
struct ast_datastore * ast_channel_datastore_find(struct ast_channel *chan, const struct ast_datastore_info *info, const char *uid)
Find a datastore on a channel.
Definition channel.c:2390
#define ast_datastore_alloc(info, uid)
Definition datastore.h:85
int ast_datastore_free(struct ast_datastore *datastore)
Free a data store object.
Definition datastore.c:68
#define ast_debug(level,...)
Log a DEBUG message.
#define LOG_ERROR
static int send_broadcast_event(struct ast_channel *chan, struct stasis_broadcast_ctx *ctx)
Create and send broadcast event to all applications.
static struct ao2_container * broadcast_contexts
Container for all active broadcast contexts.
static const struct ast_datastore_info broadcast_datastore_info
Datastore information for broadcast context.
static struct stasis_broadcast_ctx * broadcast_ctx_create(const char *channel_id, const char *app_filter, unsigned int flags)
Create a new broadcast context.
#define NULL
Definition resample.c:96
Structure for a data store object.
Definition datastore.h:64
void * data
Definition datastore.h:66
Broadcast context stored on channel.
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition utils.h:981

References ao2_bump, ao2_cleanup, ao2_link, ao2_unlink, ast_channel_datastore_add(), ast_channel_datastore_find(), ast_channel_datastore_remove(), ast_channel_lock, ast_channel_uniqueid(), ast_channel_unlock, ast_datastore_alloc, ast_datastore_free(), ast_debug, ast_log, broadcast_contexts, broadcast_ctx_create(), broadcast_datastore_info, ast_datastore::data, LOG_ERROR, NULL, RAII_VAR, and send_broadcast_event().

Referenced by stasis_broadcast_exec().

◆ stasis_app_broadcast_cleanup()

void AST_OPTIONAL_API_NAME() stasis_app_broadcast_cleanup ( const char *  channel_id)

Clean up broadcast context for a channel.

Since
20

Removes the broadcast context when the channel is done or leaving the broadcast state.

Parameters
channel_idThe unique ID of the channel

This is the normal-path cleanup called by the dialplan application after the broadcast completes. The channel datastore destructor (broadcast_datastore_destroy) also unlinks the context as a safety net for abnormal teardown; ao2_unlink is idempotent so the double call is harmless.

Parameters
channel_idThe unique ID of the channel

Definition at line 885 of file res_stasis_broadcast.c.

886{
888
889 if (ast_strlen_zero(channel_id) || !broadcast_contexts) {
890 return;
891 }
892
894 if (ctx) {
895 ast_debug(3, "Cleaning up broadcast context for %s\n", channel_id);
896 }
897}
#define ao2_find(container, arg, flags)
Definition astobj2.h:1736
@ OBJ_UNLINK
Definition astobj2.h:1039
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
Definition astobj2.h:1101
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition strings.h:65

References ao2_cleanup, ao2_find, ast_debug, ast_strlen_zero(), broadcast_contexts, NULL, OBJ_SEARCH_KEY, OBJ_UNLINK, and RAII_VAR.

Referenced by stasis_broadcast_exec().

◆ stasis_app_broadcast_wait()

int AST_OPTIONAL_API_NAME() stasis_app_broadcast_wait ( struct ast_channel chan,
int  timeout_ms 
)

Wait for a broadcast channel to be claimed.

Since
20

Blocks until the channel is claimed or the timeout expires.

Parameters
chanThe channel
timeout_msMaximum time to wait in milliseconds
Return values
0if claimed within timeout
-1if timeout expired or error
AST_OPTIONAL_API_UNAVAILABLEif res_stasis_broadcast is not loaded

Blocks until the channel is claimed, the timeout expires, or the channel hangs up. The hangup check runs every BROADCAST_POLL_INTERVAL_MS so that a dead channel does not tie up a PBX thread for the full timeout period.

Parameters
chanThe channel
timeout_msMaximum time to wait in milliseconds
Returns
0 if claimed within timeout, -1 otherwise

Definition at line 782 of file res_stasis_broadcast.c.

783{
785 const char *channel_id;
786 struct timeval deadline;
787 int result = -1;
788
789 if (!chan) {
790 return -1;
791 }
792
793 channel_id = ast_channel_uniqueid(chan);
794
795 if (!broadcast_contexts) {
796 return -1;
797 }
798
799 ctx = ao2_find(broadcast_contexts, channel_id, OBJ_SEARCH_KEY);
800 if (!ctx) {
801 ast_log(LOG_WARNING, "No broadcast context for channel %s\n", channel_id);
802 return -1;
803 }
804
805 /* Cap excessive timeouts to prevent arithmetic overflow */
806 if (timeout_ms < 0) {
807 timeout_ms = 0;
808 } else if (timeout_ms > MAX_BROADCAST_TIMEOUT_MS) {
809 timeout_ms = MAX_BROADCAST_TIMEOUT_MS;
810 }
811
812 /* Calculate absolute deadline */
813 deadline = ast_tvadd(ast_tvnow(),
814 ast_tv(timeout_ms / 1000, (timeout_ms % 1000) * 1000));
815
816 ao2_lock(ctx);
817 while (!ctx->claimed) {
818 struct timeval now;
819 struct timespec poll_spec;
820 long remaining_ms;
821 long poll_ms;
822 int wait_result;
823
824 /* Check for hangup so we don't block on a dead channel */
825 if (ast_check_hangup(chan)) {
826 ast_debug(3, "Channel %s hung up during broadcast wait\n",
827 channel_id);
828 break;
829 }
830
831 /* Check if we've passed the overall deadline */
832 now = ast_tvnow();
833 remaining_ms = ast_tvdiff_ms(deadline, now);
834 if (remaining_ms <= 0) {
835 ast_debug(3, "Broadcast timeout for channel %s after %dms\n",
836 channel_id, timeout_ms);
837 break;
838 }
839
840 /* Sleep for the shorter of the remaining time and the poll interval */
841 poll_ms = remaining_ms;
842 if (poll_ms > BROADCAST_POLL_INTERVAL_MS) {
844 }
845
846 poll_spec.tv_sec = now.tv_sec + (poll_ms / 1000);
847 poll_spec.tv_nsec = (long)(now.tv_usec) * 1000L
848 + (long)(poll_ms % 1000) * 1000000L;
849 while (poll_spec.tv_nsec >= 1000000000) {
850 poll_spec.tv_sec++;
851 poll_spec.tv_nsec -= 1000000000;
852 }
853
854 wait_result = ast_cond_timedwait(&ctx->cond, ao2_object_get_lockaddr(ctx), &poll_spec);
855 if (wait_result != 0 && wait_result != ETIMEDOUT) {
857 "Channel %s: unexpected error waiting for claim: %s (%d)\n",
858 channel_id, strerror(wait_result), wait_result);
859 break;
860 }
861 /* Loop back: re-check claimed, then hangup, then deadline */
862 }
863
864 if (ctx->claimed) {
865 ast_debug(1, "Channel %s claimed by %s\n",
866 channel_id, ctx->winner_app);
867 result = 0;
868 }
869 ao2_unlock(ctx);
870
871 return result;
872}
#define ao2_unlock(a)
Definition astobj2.h:729
#define ao2_lock(a)
Definition astobj2.h:717
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
Definition astobj2.c:476
static PGresult * result
Definition cel_pgsql.c:84
int ast_check_hangup(struct ast_channel *chan)
Check to see if a channel is needing hang up.
Definition channel.c:446
static char * wait_result(void)
#define LOG_WARNING
#define ast_cond_timedwait(cond, mutex, time)
Definition lock.h:213
#define MAX_BROADCAST_TIMEOUT_MS
Maximum broadcast timeout in milliseconds (24 hours)
#define BROADCAST_POLL_INTERVAL_MS
Interval in ms between hangup checks while waiting for a claim.
struct timeval ast_tvadd(struct timeval a, struct timeval b)
Returns the sum of two timevals a + b.
Definition extconf.c:2280
int64_t ast_tvdiff_ms(struct timeval end, struct timeval start)
Computes the difference (in milliseconds) between two struct timeval instances.
Definition time.h:107
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition time.h:159
struct timeval ast_tv(ast_time_t sec, ast_suseconds_t usec)
Returns a timeval from sec, usec.
Definition time.h:235

References ao2_cleanup, ao2_find, ao2_lock, ao2_object_get_lockaddr(), ao2_unlock, ast_channel_uniqueid(), ast_check_hangup(), ast_cond_timedwait, ast_debug, ast_log, ast_tv(), ast_tvadd(), ast_tvdiff_ms(), ast_tvnow(), broadcast_contexts, BROADCAST_POLL_INTERVAL_MS, LOG_WARNING, MAX_BROADCAST_TIMEOUT_MS, NULL, OBJ_SEARCH_KEY, RAII_VAR, result, and wait_result().

Referenced by stasis_broadcast_exec().

◆ stasis_app_broadcast_winner()

char *AST_OPTIONAL_API_NAME() stasis_app_broadcast_winner ( const char *  channel_id)

Get the winner app name for a broadcast channel.

Since
20
Parameters
channel_idThe unique ID of the channel
Returns
A copy of the winner app name (caller must free with ast_free), or NULL if not claimed or not found
Return values
NULLif res_stasis_broadcast is not loaded
Parameters
channel_idThe unique ID of the channel
Returns
A copy of the winner app name (caller must free with ast_free), or NULL if not claimed or not found

Definition at line 743 of file res_stasis_broadcast.c.

744{
746 char *winner = NULL;
747
748 if (ast_strlen_zero(channel_id)) {
749 return NULL;
750 }
751
752 ctx = ao2_find(broadcast_contexts, channel_id, OBJ_SEARCH_KEY);
753 if (!ctx) {
754 return NULL;
755 }
756
757 ao2_lock(ctx);
758 if (ctx->claimed) {
759 winner = ast_strdup(ctx->winner_app);
760 }
761 /* Mark the broadcast as finished so no new claims can succeed.
762 * This closes the race window between reading the winner and
763 * the subsequent broadcast_cleanup call. */
764 ctx->finished = 1;
765 ao2_unlock(ctx);
766
767 return winner;
768}
#define ast_strdup(str)
A wrapper for strdup()
Definition astmm.h:241

References ao2_cleanup, ao2_find, ao2_lock, ao2_unlock, ast_strdup, ast_strlen_zero(), broadcast_contexts, NULL, OBJ_SEARCH_KEY, and RAII_VAR.

Referenced by stasis_broadcast_exec().

◆ stasis_app_claim_channel()

int AST_OPTIONAL_API_NAME() stasis_app_claim_channel ( const char *  channel_id,
const char *  app_name 
)

Attempt to claim a broadcast channel.

Since
20

Atomically attempts to claim a channel that is in broadcast state. Only the first claim for a given channel will succeed.

Parameters
channel_idThe unique ID of the channel
app_nameThe name of the application claiming the channel
Return values
0if claim successful
-1if channel not found
-2if already claimed by another application
AST_OPTIONAL_API_UNAVAILABLEif res_stasis_broadcast is not loaded
Parameters
channel_idThe unique ID of the channel
app_nameThe name of the application claiming the channel
Returns
0 if claim successful, -1 if channel not found, -2 if already claimed

Definition at line 637 of file res_stasis_broadcast.c.

638{
640
641 if (ast_strlen_zero(channel_id) || ast_strlen_zero(app_name)) {
642 return -1;
643 }
644
645 if (!broadcast_contexts) {
646 return -1;
647 }
648
649 /* Find broadcast context */
650 ctx = ao2_find(broadcast_contexts, channel_id, OBJ_SEARCH_KEY);
651 if (!ctx) {
652 ast_debug(1, "No broadcast context found for channel %s\n", channel_id);
653 return -1;
654 }
655
656 /* Atomically check and set claimed flag.
657 * Check claimed before finished: if the channel was claimed and then the
658 * broadcast finished, a late claim should return -2 (409 Conflict) rather
659 * than -1 (404) so callers can distinguish "already taken" from "not found". */
660 ao2_lock(ctx);
661 if (ctx->claimed) {
662 ast_debug(1, "Channel %s already claimed by %s (attempt by %s denied)\n",
663 channel_id, ctx->winner_app ? ctx->winner_app : "(unknown)", app_name);
664 ao2_unlock(ctx);
665 return -2;
666 }
667 if (ctx->finished) {
668 ast_debug(1, "Channel %s broadcast already finished (late claim by %s rejected)\n",
669 channel_id, app_name);
670 ao2_unlock(ctx);
671 return -1;
672 }
673 ctx->winner_app = ast_strdup(app_name);
674 if (!ctx->winner_app) {
676 "Failed to allocate winner app name for channel %s\n",
677 channel_id);
678 ao2_unlock(ctx);
679 return -1;
680 }
681 ctx->claimed = 1;
682 ast_verb(3, "Channel %s claimed by application %s\n",
683 channel_id, app_name);
684 /* Signal waiting thread that channel was claimed */
685 ast_cond_signal(&ctx->cond);
686 ao2_unlock(ctx);
687
688 /* Send CallClaimed event to matching apps */
689 if (!(ctx->flags & STASIS_BROADCAST_FLAG_SUPPRESS_CLAIMED)) {
690 RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
693 struct ao2_iterator iter;
694 char *app_name_iter;
695
696 snapshot = ast_channel_snapshot_get_latest(channel_id);
697 if (snapshot) {
698 event = ast_json_pack("{s: s, s: o, s: o, s: s}",
699 "type", "CallClaimed",
700 "timestamp", ast_json_timeval(ast_tvnow(), NULL),
701 "channel", ast_channel_snapshot_to_json(snapshot, NULL),
702 "winner_app", app_name);
703 }
704 if (event) {
706 }
707 if (apps) {
708 iter = ao2_iterator_init(apps, 0);
709 while ((app_name_iter = ao2_iterator_next(&iter))) {
710 struct ast_json *event_copy;
711
712 /* Only send to apps that matched the original broadcast filter */
713 if (ctx->filter_compiled &&
714 regexec(&ctx->compiled_filter, app_name_iter,
715 0, NULL, 0) == REG_NOMATCH) {
716 ao2_ref(app_name_iter, -1);
717 continue;
718 }
719
720 event_copy = ast_json_deep_copy(event);
721 if (!event_copy) {
722 ao2_ref(app_name_iter, -1);
723 continue;
724 }
725
726 stasis_app_send(app_name_iter, event_copy);
727 ast_json_unref(event_copy);
728 ao2_ref(app_name_iter, -1);
729 }
731 }
732 }
733
734 return 0;
735}
#define ao2_iterator_next(iter)
Definition astobj2.h:1911
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition astobj2.h:459
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
struct ast_channel_snapshot * ast_channel_snapshot_get_latest(const char *uniqueid)
Obtain the latest ast_channel_snapshot from the Stasis Message Bus API cache. This is an ao2 object,...
#define ast_verb(level,...)
struct ast_json * ast_json_deep_copy(const struct ast_json *value)
Copy a JSON value, and its children.
Definition json.c:641
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition json.c:73
struct ast_json * ast_json_pack(char const *format,...)
Helper for creating complex JSON values.
Definition json.c:612
struct ast_json * ast_json_timeval(const struct timeval tv, const char *zone)
Construct a timeval as JSON.
Definition json.c:670
#define ast_cond_signal(cond)
Definition lock.h:210
const char * app_name(struct ast_app *app)
Definition pbx_app.c:475
int stasis_app_send(const char *app_name, struct ast_json *message)
Send a message to the given Stasis application.
struct ao2_container * stasis_app_get_all(void)
Gets the names of all registered Stasis applications.
#define STASIS_BROADCAST_FLAG_SUPPRESS_CLAIMED
Suppress CallClaimed event for this broadcast.
struct ast_json * ast_channel_snapshot_to_json(const struct ast_channel_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_channel_snapshot.
Generic container type.
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
Definition astobj2.h:1821
Registered applications container.
Definition pbx_app.c:69
Structure representing a snapshot of channel state.
Abstract JSON element (object, array, string, int, ...).

References ao2_cleanup, ao2_find, ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, ao2_lock, ao2_ref, ao2_unlock, app_name(), ast_channel_snapshot_get_latest(), ast_channel_snapshot_to_json(), ast_cond_signal, ast_debug, ast_json_deep_copy(), ast_json_pack(), ast_json_timeval(), ast_json_unref(), ast_log, ast_strdup, ast_strlen_zero(), ast_tvnow(), ast_verb, broadcast_contexts, LOG_ERROR, NULL, OBJ_SEARCH_KEY, RAII_VAR, stasis_app_get_all(), stasis_app_send(), and STASIS_BROADCAST_FLAG_SUPPRESS_CLAIMED.

Referenced by ast_ari_events_claim_channel().