Asterisk - The Open Source Telephony Project GIT-master-4f2b068
Loading...
Searching...
No Matches
Data Structures | Macros | Enumerations | Functions | Variables
chan_websocket.c File Reference

Websocket Media Channel. More...

#include "asterisk.h"
#include "asterisk/app.h"
#include "asterisk/causes.h"
#include "asterisk/channel.h"
#include "asterisk/codec.h"
#include "asterisk/http_websocket.h"
#include "asterisk/format_cache.h"
#include "asterisk/frame.h"
#include "asterisk/json.h"
#include "asterisk/lock.h"
#include "asterisk/mod_format.h"
#include "asterisk/module.h"
#include "asterisk/pbx.h"
#include "asterisk/uuid.h"
#include "asterisk/timing.h"
#include "asterisk/translate.h"
#include "asterisk/websocket_client.h"
#include "asterisk/sorcery.h"
Include dependency graph for chan_websocket.c:

Go to the source code of this file.

Data Structures

struct  instance_proxy
 
struct  webchan_conf_global
 
struct  websocket_pvt
 

Macros

#define _create_event_MEDIA_XOFF(_instance)   _create_event_nodata(_instance, "MEDIA_XOFF");
 
#define _create_event_MEDIA_XON(_instance)   _create_event_nodata(_instance, "MEDIA_XON");
 
#define _create_event_QUEUE_DRAINED(_instance)   _create_event_nodata(_instance, "QUEUE_DRAINED");
 
#define ANSWER_CHANNEL   "ANSWER"
 
#define CONTINUE_MEDIA   "CONTINUE_MEDIA"
 
#define create_event(_instance, _event, ...)    _create_event_ ## _event(_instance, ##__VA_ARGS__)
 Use this macro to create events passing in any event-specific parameters.
 
#define FLUSH_MEDIA   "FLUSH_MEDIA"
 
#define GET_DRIVER_STATUS   "GET_STATUS"
 
#define HANGUP_CHANNEL   "HANGUP"
 
#define INCOMING_CONNECTION_ID   "INCOMING"
 
#define MARK_MEDIA   "MARK_MEDIA"
 
#define MAX_TEXT_MESSAGE_LEN   MIN(128, (AST_WEBSOCKET_MAX_RX_PAYLOAD_SIZE - 1))
 
#define MEDIA_WEBSOCKET_CONNECTION_ID   "MEDIA_WEBSOCKET_CONNECTION_ID"
 
#define MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE   "MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE"
 
#define PAUSE_MEDIA   "PAUSE_MEDIA"
 
#define QUEUE_LENGTH_MAX   1000
 
#define QUEUE_LENGTH_XOFF_LEVEL   900
 
#define QUEUE_LENGTH_XON_LEVEL   800
 
#define REPORT_QUEUE_DRAINED   "REPORT_QUEUE_DRAINED"
 
#define send_event(_instance, _event, ...)
 Use this macro to create and send events passing in any event-specific parameters.
 
#define START_MEDIA_BUFFERING   "START_MEDIA_BUFFERING"
 
#define STOP_MEDIA_BUFFERING   "STOP_MEDIA_BUFFERING"
 
#define websocket_request_hangup(_instance, _cause, _tech)    _websocket_request_hangup(_instance, _cause, _tech, __LINE__, __FUNCTION__)
 
#define WS_TIMER_FDNO   (AST_EXTENDED_FDS + 1)
 
#define WS_WEBSOCKET_FDNO   (AST_EXTENDED_FDS + 2)
 

Enumerations

enum  {
  OPT_WS_CODEC = (1 << 0) , OPT_WS_NO_AUTO_ANSWER = (1 << 1) , OPT_WS_URI_PARAM = (1 << 2) , OPT_WS_PASSTHROUGH = (1 << 3) ,
  OPT_WS_MSG_FORMAT = (1 << 4)
}
 
enum  {
  OPT_ARG_WS_CODEC , OPT_ARG_WS_NO_AUTO_ANSWER , OPT_ARG_WS_URI_PARAM , OPT_ARG_WS_PASSTHROUGH ,
  OPT_ARG_WS_MSG_FORMAT , OPT_ARG_ARRAY_SIZE
}
 
enum  webchan_control_msg_format { WEBCHAN_CONTROL_MSG_FORMAT_PLAIN = 0 , WEBCHAN_CONTROL_MSG_FORMAT_JSON , WEBCHAN_CONTROL_MSG_FORMAT_INVALID }
 

Functions

static void __reg_module (void)
 
static void __unreg_module (void)
 
static char * _create_event_DTMF_END (struct websocket_pvt *instance, const char digit)
 
static char * _create_event_ERROR (struct websocket_pvt *instance, const char *format,...)
 
static char * _create_event_MEDIA_BUFFERING_COMPLETED (struct websocket_pvt *instance, const char *id)
 
static char * _create_event_MEDIA_MARK_PROCESSED (struct websocket_pvt *instance, const char *id)
 
static char * _create_event_MEDIA_START (struct websocket_pvt *instance)
 
static char * _create_event_nodata (struct websocket_pvt *instance, char *event)
 
static char * _create_event_STATUS (struct websocket_pvt *instance)
 
static void _websocket_request_hangup (struct websocket_pvt *instance, int ast_cause, enum ast_websocket_status_code tech_cause, int line, const char *function)
 
struct ast_moduleAST_MODULE_SELF_SYM (void)
 
static enum webchan_control_msg_format control_msg_format_from_str (const char *value)
 
static const char * control_msg_format_to_str (enum webchan_control_msg_format value)
 
static struct ast_framedequeue_frame (struct websocket_pvt *instance)
 
static void * global_alloc (const char *name)
 
static int global_apply (const struct ast_sorcery *sorcery, void *obj)
 
static int global_control_message_format_from_str (const struct aco_option *opt, struct ast_variable *var, void *obj)
 
static int global_control_message_format_to_str (const void *obj, const intptr_t *args, char **buf)
 
static int handle_command (struct websocket_pvt *instance, char *buffer)
 
static void incoming_ws_established_cb (struct ast_websocket *ast_ws_session, struct ast_variable *get_params, struct ast_variable *upgrade_headers)
 
static int incoming_ws_http_callback (struct ast_tcptls_session_instance *ser, const struct ast_http_uri *urih, const char *uri, enum ast_http_method method, struct ast_variable *get_params, struct ast_variable *headers)
 
static void instance_proxy_cb (void *weakproxy, void *data)
 
static int load_config (void)
 
static int load_module (void)
 Function called when our module is loaded.
 
static int process_binary_message (struct websocket_pvt *instance, char *payload, uint64_t payload_len)
 
static int process_text_message (struct websocket_pvt *instance, char *payload, uint64_t payload_len)
 
static int queue_frame_from_buffer (struct websocket_pvt *instance, char *buffer, size_t len)
 
static int queue_option_frame (struct websocket_pvt *instance, char *buffer)
 
static int read_from_ws_and_queue (struct websocket_pvt *instance)
 
static int reload_module (void)
 
static void set_channel_format (struct websocket_pvt *instance, struct ast_format *fmt)
 
static int set_channel_timer (struct websocket_pvt *instance)
 
static int set_channel_variables (struct websocket_pvt *instance)
 
static int set_instance_silence_frame (struct websocket_pvt *instance)
 
static int set_instance_translator (struct websocket_pvt *instance)
 
static int unload_module (void)
 Function called when our module is unloaded.
 
static int validate_uri_parameters (const char *uri_params)
 
static int webchan_call (struct ast_channel *ast, const char *dest, int timeout)
 
static int webchan_hangup (struct ast_channel *ast)
 
static struct ast_framewebchan_read (struct ast_channel *ast)
 
static struct ast_channelwebchan_request (const char *type, struct ast_format_cap *cap, const struct ast_assigned_ids *assignedids, const struct ast_channel *requestor, const char *data, int *cause)
 
static int webchan_send_dtmf_text (struct ast_channel *ast, char digit, unsigned int duration)
 
static int webchan_write (struct ast_channel *ast, struct ast_frame *f)
 Function called when we should write a frame to the channel.
 
static void websocket_destructor (void *data)
 
static int websocket_handoff_to_channel (struct websocket_pvt *instance)
 
static struct websocket_pvtwebsocket_new (const char *chan_name, const char *connection_id, struct ast_format *fmt)
 

Variables

static struct ast_module_info __mod_info = { .name = AST_MODULE, .flags = AST_MODFLAG_LOAD_ORDER , .description = "Websocket Media Channel" , .key = ASTERISK_GPL_KEY , .buildopt_sum = AST_BUILDOPT_SUM, .support_level = AST_MODULE_SUPPORT_CORE, .load = load_module, .unload = unload_module, .reload = reload_module, .load_pri = AST_MODPRI_CHANNEL_DRIVER, .requires = "res_http_websocket,res_websocket_client", }
 
static const struct ast_module_infoast_module_info = &__mod_info
 
static struct ast_websocket_serverast_ws_server
 
static struct ast_http_uri http_uri
 
static struct ao2_containerinstances = NULL
 
static const char * msg_format_map []
 
static struct ast_sorcerysorcery = NULL
 
static const struct ast_app_option websocket_options [128] = { [ 'c' ] = { .flag = OPT_WS_CODEC , .arg_index = OPT_ARG_WS_CODEC + 1 }, [ 'n' ] = { .flag = OPT_WS_NO_AUTO_ANSWER }, [ 'v' ] = { .flag = OPT_WS_URI_PARAM , .arg_index = OPT_ARG_WS_URI_PARAM + 1 }, [ 'p' ] = { .flag = OPT_WS_PASSTHROUGH }, [ 'f' ] = { .flag = OPT_WS_MSG_FORMAT , .arg_index = OPT_ARG_WS_MSG_FORMAT + 1 }, }
 
static struct ast_channel_tech websocket_tech
 

Detailed Description

Websocket Media Channel.

Author
George Joseph gjose.nosp@m.ph@s.nosp@m.angom.nosp@m.a.co.nosp@m.m

Definition in file chan_websocket.c.

Macro Definition Documentation

◆ _create_event_MEDIA_XOFF

#define _create_event_MEDIA_XOFF (   _instance)    _create_event_nodata(_instance, "MEDIA_XOFF");

Definition at line 206 of file chan_websocket.c.

◆ _create_event_MEDIA_XON

#define _create_event_MEDIA_XON (   _instance)    _create_event_nodata(_instance, "MEDIA_XON");

Definition at line 205 of file chan_websocket.c.

◆ _create_event_QUEUE_DRAINED

#define _create_event_QUEUE_DRAINED (   _instance)    _create_event_nodata(_instance, "QUEUE_DRAINED");

Definition at line 207 of file chan_websocket.c.

◆ ANSWER_CHANNEL

#define ANSWER_CHANNEL   "ANSWER"

Definition at line 119 of file chan_websocket.c.

◆ CONTINUE_MEDIA

#define CONTINUE_MEDIA   "CONTINUE_MEDIA"

Definition at line 128 of file chan_websocket.c.

◆ create_event

#define create_event (   _instance,
  _event,
  ... 
)     _create_event_ ## _event(_instance, ##__VA_ARGS__)

Use this macro to create events passing in any event-specific parameters.

Definition at line 422 of file chan_websocket.c.

429 { \
430 int _res = -1; \
431 char *_payload = _create_event_ ## _event(_instance, ##__VA_ARGS__); \
432 if (_payload && _instance->websocket) { \
433 _res = ast_websocket_write_string(_instance->websocket, _payload); \
434 if (_res != 0) { \
435 ast_log(LOG_ERROR, "%s: Unable to send event %s\n", \
436 ast_channel_name(instance->channel), _payload); \
437 } else { \
438 ast_debug(3, "%s: Sent %s\n", \
439 ast_channel_name(instance->channel), _payload); \
440 }\
441 ast_free(_payload); \
442 } \
443 (_res); \
444})
445
446static void set_channel_format(struct websocket_pvt * instance,
447 struct ast_format *fmt)
448{
453 ast_debug(4, "Switching readformat to %s\n", ast_format_get_name(fmt));
454 }
455}
456
457/*
458 * Reminder... This function gets called by webchan_read which is
459 * triggered by the channel timer firing. It always gets called
460 * every 20ms (or whatever the timer is set to) even if there are
461 * no frames in the queue.
462 */
463static struct ast_frame *dequeue_frame(struct websocket_pvt *instance)
464{
465 struct ast_frame *queued_frame = NULL;
466 SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK,
468
469 /*
470 * If the queue is paused, don't read a frame. Processing
471 * will continue down the function and a silence frame will
472 * be sent in its place.
473 */
474 if (instance->queue_paused) {
475 return NULL;
476 }
477
478 /*
479 * We need to check if we need to send an XON before anything
480 * else because there are multiple escape paths in this function
481 * and we don't want to accidentally keep the queue in a "full"
482 * state.
483 */
484 if (instance->queue_full && instance->frame_queue_length < QUEUE_LENGTH_XON_LEVEL) {
485 instance->queue_full = 0;
486 ast_debug(4, "%s: WebSocket sending MEDIA_XON\n",
487 ast_channel_name(instance->channel));
488 send_event(instance, MEDIA_XON);
489 }
490
491 queued_frame = AST_LIST_REMOVE_HEAD(&instance->frame_queue, frame_list);
492
493 /*
494 * If there are no frames in the queue, we need to
495 * return NULL so we can send a silence frame. We also need
496 * to send the QUEUE_DRAINED notification if we were requested
497 * to do so.
498 */
499 if (!queued_frame) {
500 if (instance->report_queue_drained) {
501 instance->report_queue_drained = 0;
502 ast_debug(4, "%s: WebSocket sending QUEUE_DRAINED\n",
503 ast_channel_name(instance->channel));
504 send_event(instance, QUEUE_DRAINED);
505 }
506 return NULL;
507 }
508
509 /*
510 * The only way a control frame could be present here is as
511 * a result of us calling queue_option_frame() in response
512 * to an incoming TEXT command from the websocket.
513 * We'll be safe and make sure it's a AST_CONTROL_OPTION
514 * frame anyway.
515 *
516 * It's quite possible that there are multiple control frames
517 * in a row in the queue so we need to process consecutive ones
518 * immediately.
519 *
520 * In any case, processing a control frame MUST not use up
521 * a media timeslot so after all control frames have been
522 * processed, we need to read an audio frame and process it.
523 */
524 while (queued_frame && queued_frame->frametype == AST_FRAME_CONTROL) {
525 if (queued_frame->subclass.integer == AST_CONTROL_OPTION) {
526 /*
527 * We just need to send the data to the websocket.
528 * The data should already be NULL terminated.
529 */
531 queued_frame->data.ptr);
532 ast_debug(4, "%s: Sent %s\n",
533 ast_channel_name(instance->channel), (char *)queued_frame->data.ptr);
534 }
535 /*
536 * We do NOT send these to the core so we need to free
537 * the frame and grab the next one. If it's also a
538 * control frame, we need to process it otherwise
539 * continue down in the function.
540 */
541 ast_frame_free(queued_frame, 0);
542 queued_frame = AST_LIST_REMOVE_HEAD(&instance->frame_queue, frame_list);
543 /*
544 * Jut FYI... We didn't bump the queue length when we added the control
545 * frames so we don't need to decrement it here.
546 */
547 }
548
549 /*
550 * If, after reading all control frames, there are no frames
551 * left in the queue, we need to return NULL so we can send
552 * a silence frame.
553 */
554 if (!queued_frame) {
555 return NULL;
556 }
557
558 instance->frame_queue_length--;
559
560 return queued_frame;
561}
562/*!
563 * \internal
564 *
565 * There are two file descriptors on this channel that can trigger
566 * this function...
567 *
568 * The timer fd (WS_TIMER_FDNO) which gets triggered at a constant
569 * rate determined by the format. In this case, we need to pull a
570 * frame OFF the queue and return it to the core.
571 *
572 * The websocket fd (WS_WEBSOCKET_FDNO) which gets triggered when
573 * there's incoming data to read from the websocket. In this case,
574 * we read the data and put it ON the queue. We'll return a null frame.
575 *
576 */
577static struct ast_frame *webchan_read(struct ast_channel *ast)
578{
579 struct websocket_pvt *instance = NULL;
580 struct ast_frame *native_frame = NULL;
581 struct ast_frame *slin_frame = NULL;
582 int fdno = ast_channel_fdno(ast);
583
584 instance = ast_channel_tech_pvt(ast);
585 if (!instance) {
586 return NULL;
587 }
588
589 if (fdno == WS_WEBSOCKET_FDNO) {
590 read_from_ws_and_queue(instance);
591 return &ast_null_frame;
592 }
593 if (fdno != WS_TIMER_FDNO) {
594 return &ast_null_frame;
595 }
596
598 ast_timer_ack(instance->timer, 1);
599 }
600
601 native_frame = dequeue_frame(instance);
602
603 /*
604 * No frame when the timer fires means we have to create and
605 * return a silence frame in its place.
606 */
607 if (!native_frame) {
608 ast_debug(5, "%s: WebSocket read timer fired with no frame available. Returning silence.\n", ast_channel_name(ast));
609 set_channel_format(instance, instance->slin_format);
610 slin_frame = ast_frdup(&instance->silence);
611 return slin_frame;
612 }
613
614 /*
615 * If we're in passthrough mode or the frame length is already optimal_frame_size,
616 * we can just return it.
617 */
618 if (instance->passthrough || native_frame->datalen == instance->optimal_frame_size) {
619 set_channel_format(instance, instance->native_format);
620 return native_frame;
621 }
622
623 /*
624 * If we're here, we have a short frame that we need to pad
625 * with silence.
626 */
627
628 if (instance->translator) {
629 slin_frame = ast_translate(instance->translator, native_frame, 0);
630 if (!slin_frame) {
631 ast_log(LOG_WARNING, "%s: Failed to translate %d byte frame\n",
632 ast_channel_name(ast), native_frame->datalen);
633 return NULL;
634 }
635 ast_frame_free(native_frame, 0);
636 } else {
637 /*
638 * If there was no translator then the native format
639 * was already slin.
640 */
641 slin_frame = native_frame;
642 }
643
644 set_channel_format(instance, instance->slin_format);
645
646 /*
647 * So now we have an slin frame but it's probably still short
648 * so we create a new data buffer with the correct length
649 * which is filled with zeros courtesy of ast_calloc.
650 * We then copy the short frame data into the new buffer
651 * and set the offset to AST_FRIENDLY_OFFSET so that
652 * the core can read the data without any issues.
653 * If the original frame data was mallocd, we need to free the old
654 * data buffer so we don't leak memory and we need to set
655 * mallocd to AST_MALLOCD_DATA so that the core knows
656 * it needs to free the new data buffer when it's done.
657 */
658
659 if (slin_frame->datalen != instance->silence.datalen) {
660 char *old_data = slin_frame->data.ptr;
661 int old_len = slin_frame->datalen;
662 int old_offset = slin_frame->offset;
663 ast_debug(4, "%s: WebSocket read short frame. Expected %d got %d. Filling with silence\n",
664 ast_channel_name(ast), instance->silence.datalen,
665 slin_frame->datalen);
666
667 slin_frame->data.ptr = ast_calloc(1, instance->silence.datalen + AST_FRIENDLY_OFFSET);
668 if (!slin_frame->data.ptr) {
669 ast_frame_free(slin_frame, 0);
670 return NULL;
671 }
672 slin_frame->data.ptr += AST_FRIENDLY_OFFSET;
673 slin_frame->offset = AST_FRIENDLY_OFFSET;
674 memcpy(slin_frame->data.ptr, old_data, old_len);
675 if (slin_frame->mallocd & AST_MALLOCD_DATA) {
676 ast_free(old_data - old_offset);
677 }
678 slin_frame->mallocd |= AST_MALLOCD_DATA;
679 slin_frame->datalen = instance->silence.datalen;
680 slin_frame->samples = instance->silence.samples;
681 }
682
683 return slin_frame;
684}
685
686static int queue_frame_from_buffer(struct websocket_pvt *instance,
687 char *buffer, size_t len)
688{
689 struct ast_frame fr = { 0, };
690 struct ast_frame *duped_frame = NULL;
691
692 AST_FRAME_SET_BUFFER(&fr, buffer, 0, len);
694 fr.subclass.format = instance->native_format;
695 fr.samples = instance->native_codec->samples_count(&fr);
696
697 duped_frame = ast_frisolate(&fr);
698 if (!duped_frame) {
699 ast_log(LOG_WARNING, "%s: Failed to isolate frame\n",
700 ast_channel_name(instance->channel));
701 return -1;
702 }
703
704 {
705 SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK,
707 AST_LIST_INSERT_TAIL(&instance->frame_queue, duped_frame, frame_list);
708 instance->frame_queue_length++;
709 if (!instance->queue_full && instance->frame_queue_length >= QUEUE_LENGTH_XOFF_LEVEL) {
710 instance->queue_full = 1;
711 send_event(instance, MEDIA_XOFF);
712 }
713 }
714
715 ast_debug(5, "%s: Queued %d byte frame\n", ast_channel_name(instance->channel),
716 duped_frame->datalen);
717
718 return 0;
719}
720
721static int queue_option_frame(struct websocket_pvt *instance,
722 char *buffer)
723{
724 struct ast_frame fr = { 0, };
725 struct ast_frame *duped_frame = NULL;
726
727 AST_FRAME_SET_BUFFER(&fr, buffer, 0, strlen(buffer) + 1);
730
731 duped_frame = ast_frisolate(&fr);
732 if (!duped_frame) {
733 ast_log(LOG_WARNING, "%s: Failed to isolate frame\n",
734 ast_channel_name(instance->channel));
735 return -1;
736 }
737
738 AST_LIST_LOCK(&instance->frame_queue);
739 AST_LIST_INSERT_TAIL(&instance->frame_queue, duped_frame, frame_list);
740 AST_LIST_UNLOCK(&instance->frame_queue);
741
742 ast_debug(4, "%s: Queued '%s' option frame\n",
743 ast_channel_name(instance->channel), buffer);
744
745 return 0;
746}
747
748/*!
749 * \internal
750 * \brief Handle commands from the websocket
751 *
752 * \param instance
753 * \param buffer Allocated by caller so don't free.
754 * \retval 0 Success
755 * \retval -1 Failure
756 */
757static int handle_command(struct websocket_pvt *instance, char *buffer)
758{
759 int res = 0;
760 RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
761 const char *command = NULL;
762 char *data = NULL;
763
765 struct ast_json_error json_error;
766
767 json = ast_json_load_buf(buffer, strlen(buffer), &json_error);
768 if (!json) {
769 send_event(instance, ERROR, "Unable to parse JSON command");
770 return -1;
771 }
772 command = ast_json_object_string_get(json, "command");
773 } else {
774 command = buffer;
775 data = strchr(buffer, ' ');
776 if (data) {
777 *data = '\0';
778 data++;
779 }
780 }
781
782 if (ast_strings_equal(command, ANSWER_CHANNEL)) {
784
785 } else if (ast_strings_equal(command, HANGUP_CHANNEL)) {
787
788 } else if (ast_strings_equal(command, START_MEDIA_BUFFERING)) {
789 if (instance->passthrough) {
790 send_event(instance, ERROR, "%s not supported in passthrough mode", command);
791 ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
792 ast_channel_name(instance->channel), command);
793 return 0;
794 }
795 AST_LIST_LOCK(&instance->frame_queue);
796 instance->bulk_media_in_progress = 1;
797 AST_LIST_UNLOCK(&instance->frame_queue);
798
799 } else if (ast_strings_equal(command, STOP_MEDIA_BUFFERING)) {
800 const char *id;
801 char *option;
802 SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK,
804
806 id = ast_json_object_string_get(json, "correlation_id");
807 } else {
808 id = data;
809 }
810
811 if (instance->passthrough) {
812 send_event(instance, ERROR, "%s not supported in passthrough mode", command);
813 ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
814 ast_channel_name(instance->channel), command);
815 return 0;
816 }
817
818 ast_debug(4, "%s: WebSocket %s '%s' with %d bytes in leftover_data.\n",
820 (int)instance->leftover_len);
821
822 instance->bulk_media_in_progress = 0;
823 if (instance->leftover_len > 0) {
824 res = queue_frame_from_buffer(instance, instance->leftover_data, instance->leftover_len);
825 if (res != 0) {
826 return res;
827 }
828 }
829 instance->leftover_len = 0;
830 option = create_event(instance, MEDIA_BUFFERING_COMPLETED, id);
831 if (!option) {
832 return -1;
833 }
834 res = queue_option_frame(instance, option);
835 ast_free(option);
836
837 } else if (ast_strings_equal(command, MARK_MEDIA)) {
838 const char *id;
839 char *option;
840 SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK,
842
843 if (instance->passthrough) {
844 send_event(instance, ERROR, "%s not supported in passthrough mode", command);
845 ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
846 ast_channel_name(instance->channel), command);
847 return 0;
848 }
849
851 id = ast_json_object_string_get(json, "correlation_id");
852 } else {
853 id = data;
854 }
855
856 ast_debug(4, "%s: %s %s\n",
857 ast_channel_name(instance->channel), MARK_MEDIA, id);
858
859 option = create_event(instance, MEDIA_MARK_PROCESSED, id);
860 if (!option) {
861 return -1;
862 }
863 res = queue_option_frame(instance, option);
864 ast_free(option);
865
866 } else if (ast_strings_equal(command, FLUSH_MEDIA)) {
867 struct ast_frame *frame = NULL;
868
869 if (instance->passthrough) {
870 send_event(instance, ERROR, "FLUSH_MEDIA not supported in passthrough mode");
871 ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
872 ast_channel_name(instance->channel), command);
873 return 0;
874 }
875
876 AST_LIST_LOCK(&instance->frame_queue);
877 while ((frame = AST_LIST_REMOVE_HEAD(&instance->frame_queue, frame_list))) {
878 ast_frfree(frame);
879 }
880 instance->frame_queue_length = 0;
881 instance->bulk_media_in_progress = 0;
882 instance->leftover_len = 0;
883 AST_LIST_UNLOCK(&instance->frame_queue);
884
885 } else if (ast_strings_equal(command, REPORT_QUEUE_DRAINED)) {
886 if (instance->passthrough) {
887 send_event(instance, ERROR, "%s not supported in passthrough mode", command);
888 ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
889 ast_channel_name(instance->channel), command);
890 return 0;
891 }
892
893 AST_LIST_LOCK(&instance->frame_queue);
894 instance->report_queue_drained = 1;
895 AST_LIST_UNLOCK(&instance->frame_queue);
896
897 } else if (ast_strings_equal(command, GET_DRIVER_STATUS)) {
898 return send_event(instance, STATUS);
899
900 } else if (ast_strings_equal(command, PAUSE_MEDIA)) {
901 if (instance->passthrough) {
902 send_event(instance, ERROR, "%s not supported in passthrough mode", command);
903 ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
904 ast_channel_name(instance->channel), command);
905 return 0;
906 }
907 AST_LIST_LOCK(&instance->frame_queue);
908 instance->queue_paused = 1;
909 AST_LIST_UNLOCK(&instance->frame_queue);
910
911 } else if (ast_strings_equal(command, CONTINUE_MEDIA)) {
912 if (instance->passthrough) {
913 send_event(instance, ERROR, "%s not supported in passthrough mode", command);
914 ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
915 ast_channel_name(instance->channel), command);
916 return 0;
917 }
918 AST_LIST_LOCK(&instance->frame_queue);
919 instance->queue_paused = 0;
920 AST_LIST_UNLOCK(&instance->frame_queue);
921
922 } else {
923 ast_log(LOG_WARNING, "%s: WebSocket %s command unknown\n",
924 ast_channel_name(instance->channel), command);
925 }
926
927 return res;
928}
929
930static int process_text_message(struct websocket_pvt *instance,
931 char *payload, uint64_t payload_len)
932{
933 char *command;
934
935 if (payload_len == 0) {
936 ast_log(LOG_WARNING, "%s: WebSocket TEXT message has 0 length\n",
937 ast_channel_name(instance->channel));
938 return 0;
939 }
940
941 if (payload_len > MAX_TEXT_MESSAGE_LEN) {
942 ast_log(LOG_WARNING, "%s: WebSocket TEXT message of length %d exceeds maximum length of %d\n",
943 ast_channel_name(instance->channel), (int)payload_len, MAX_TEXT_MESSAGE_LEN);
944 return 0;
945 }
946
947 /*
948 * Unfortunately, payload is not NULL terminated even when it's
949 * a TEXT frame so we need to allocate a new buffer, copy
950 * the data into it, and NULL terminate it.
951 */
952 command = ast_alloca(payload_len + 1);
953 memcpy(command, payload, payload_len); /* Safe */
954 command[payload_len] = '\0';
955 command = ast_strip(command);
956
957 ast_debug(4, "%s: Received: %s\n",
958 ast_channel_name(instance->channel), command);
959
960 return handle_command(instance, command);
961}
962
963static int process_binary_message(struct websocket_pvt *instance,
964 char *payload, uint64_t payload_len)
965{
966 char *next_frame_ptr = NULL;
967 size_t bytes_read = 0;
968 int res = 0;
969 size_t bytes_left = 0;
970
971 {
972 SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK,
974 if (instance->frame_queue_length >= QUEUE_LENGTH_MAX) {
975 ast_debug(4, "%s: WebSocket queue is full. Ignoring incoming binary message.\n",
976 ast_channel_name(instance->channel));
977 return 0;
978 }
979 }
980
981 next_frame_ptr = payload;
982 instance->bytes_read += payload_len;
983
984 if (instance->passthrough) {
985 res = queue_frame_from_buffer(instance, payload, payload_len);
986 return res;
987 }
988
989 if (instance->bulk_media_in_progress && instance->leftover_len > 0) {
990 /*
991 * We have leftover data from a previous websocket message.
992 * Try to make a complete frame by appending data from
993 * the current message to the leftover data.
994 */
995 char *append_ptr = instance->leftover_data + instance->leftover_len;
996 size_t bytes_needed_for_frame = instance->optimal_frame_size - instance->leftover_len;
997 /*
998 * It's possible that even the current message doesn't have enough
999 * data to make a complete frame.
1000 */
1001 size_t bytes_avail_to_copy = MIN(bytes_needed_for_frame, payload_len);
1002
1003 /*
1004 * Append whatever we can to the end of the leftover data
1005 * even if it's not enough to make a complete frame.
1006 */
1007 memcpy(append_ptr, payload, bytes_avail_to_copy);
1008
1009 /*
1010 * If leftover data is still short, just return and wait for the
1011 * next websocket message.
1012 */
1013 if (bytes_avail_to_copy < bytes_needed_for_frame) {
1014 ast_debug(4, "%s: Leftover data %d bytes but only %d new bytes available of %d needed. Appending and waiting for next message.\n",
1015 ast_channel_name(instance->channel), (int)instance->leftover_len, (int)bytes_avail_to_copy, (int)bytes_needed_for_frame);
1016 instance->leftover_len += bytes_avail_to_copy;
1017 return 0;
1018 }
1019
1020 res = queue_frame_from_buffer(instance, instance->leftover_data, instance->optimal_frame_size);
1021 if (res < 0) {
1022 return -1;
1023 }
1024
1025 /*
1026 * We stole data from the current payload so decrement payload_len
1027 * and set the next frame pointer after the data in payload
1028 * we just copied.
1029 */
1030 payload_len -= bytes_avail_to_copy;
1031 next_frame_ptr = payload + bytes_avail_to_copy;
1032
1033 ast_debug(5, "%s: --- BR: %4d FQ: %4d PL: %4d LOL: %3d P: %p NFP: %p OFF: %4d NPL: %4d BAC: %3d\n",
1034 ast_channel_name(instance->channel),
1035 instance->frame_queue_length,
1036 (int)instance->bytes_read,
1037 (int)(payload_len + bytes_avail_to_copy),
1038 (int)instance->leftover_len,
1039 payload,
1040 next_frame_ptr,
1041 (int)(next_frame_ptr - payload),
1042 (int)payload_len,
1043 (int)bytes_avail_to_copy
1044 );
1045
1046
1047 instance->leftover_len = 0;
1048 }
1049
1050 if (!instance->bulk_media_in_progress && instance->leftover_len > 0) {
1051 instance->leftover_len = 0;
1052 }
1053
1054 bytes_left = payload_len;
1055 while (bytes_read < payload_len && bytes_left >= instance->optimal_frame_size) {
1056 res = queue_frame_from_buffer(instance, next_frame_ptr,
1057 instance->optimal_frame_size);
1058 if (res < 0) {
1059 break;
1060 }
1061 bytes_read += instance->optimal_frame_size;
1062 next_frame_ptr += instance->optimal_frame_size;
1063 bytes_left -= instance->optimal_frame_size;
1064 }
1065
1066 if (instance->bulk_media_in_progress && bytes_left > 0) {
1067 /*
1068 * We have a partial frame. Save the leftover data.
1069 */
1070 ast_debug(5, "%s: +++ BR: %4d FQ: %4d PL: %4d LOL: %3d P: %p NFP: %p OFF: %4d BL: %4d\n",
1071 ast_channel_name(instance->channel),
1072 (int)instance->bytes_read,
1073 instance->frame_queue_length,
1074 (int)payload_len,
1075 (int)instance->leftover_len,
1076 payload,
1077 next_frame_ptr,
1078 (int)(next_frame_ptr - payload),
1079 (int)bytes_left
1080 );
1081 memcpy(instance->leftover_data, next_frame_ptr, bytes_left);
1082 instance->leftover_len = bytes_left;
1083 }
1084
1085 return 0;
1086}
1087
1088static int read_from_ws_and_queue(struct websocket_pvt *instance)
1089{
1090 uint64_t payload_len = 0;
1091 char *payload = NULL;
1092 enum ast_websocket_opcode opcode;
1093 int fragmented = 0;
1094 int res = 0;
1095
1096 if (!instance->websocket) {
1097 ast_log(LOG_WARNING, "%s: WebSocket session not found\n",
1098 ast_channel_name(instance->channel));
1099 return -1;
1100 }
1101
1102 res = ast_websocket_read(instance->websocket, &payload, &payload_len,
1103 &opcode, &fragmented);
1104
1105 if (res) {
1106 ast_debug(3, "%s: WebSocket read error\n",
1107 ast_channel_name(instance->channel));
1109 return -1;
1110 }
1111 ast_debug(5, "%s: WebSocket read %d bytes\n", ast_channel_name(instance->channel),
1112 (int)payload_len);
1113
1114 if (opcode == AST_WEBSOCKET_OPCODE_TEXT) {
1115 return process_text_message(instance, payload, payload_len);
1116 }
1117
1118 if (opcode == AST_WEBSOCKET_OPCODE_PING || opcode == AST_WEBSOCKET_OPCODE_PONG) {
1119 return 0;
1120 }
1121
1122 if (opcode == AST_WEBSOCKET_OPCODE_CLOSE) {
1123 ast_debug(3, "%s: WebSocket closed by remote\n",
1124 ast_channel_name(instance->channel));
1126 return -1;
1127 }
1128
1129 if (opcode != AST_WEBSOCKET_OPCODE_BINARY) {
1130 ast_log(LOG_WARNING, "%s: WebSocket frame type %d not supported\n",
1131 ast_channel_name(instance->channel), (int)opcode);
1133 return 0;
1134 }
1135
1136 return process_binary_message(instance, payload, payload_len);
1137}
1138
1139static int websocket_handoff_to_channel(struct websocket_pvt *instance)
1140{
1141 int res = 0;
1142 int nodelay = 1;
1143 struct ast_sockaddr *remote_addr = ast_websocket_remote_address(instance->websocket);
1144
1145 instance->remote_addr = ast_strdup(ast_sockaddr_stringify(remote_addr));
1146 ast_debug(3, "%s: WebSocket connection with %s established\n",
1147 ast_channel_name(instance->channel), instance->remote_addr);
1148
1149 if (setsockopt(ast_websocket_fd(instance->websocket),
1150 IPPROTO_TCP, TCP_NODELAY, (char *) &nodelay, sizeof(nodelay)) < 0) {
1151 ast_log(LOG_WARNING, "Failed to set TCP_NODELAY on websocket connection: %s\n", strerror(errno));
1152 }
1153
1155
1156 res = send_event(instance, MEDIA_START);
1157 if (res != 0 ) {
1158 if (instance->type == AST_WS_TYPE_SERVER) {
1160 } else {
1161 /*
1162 * We were called by webchan_call so just need to set causes.
1163 * The core will hangup the channel.
1164 */
1167 }
1168 return -1;
1169 }
1170
1171 if (!instance->no_auto_answer) {
1172 ast_debug(3, "%s: ANSWER by auto_answer\n", ast_channel_name(instance->channel));
1174 }
1175
1176 return 0;
1177}
1178
1179static void _websocket_request_hangup(struct websocket_pvt *instance, int ast_cause,
1180 enum ast_websocket_status_code tech_cause, int line, const char *function)
1181{
1182 if (!instance || !instance->channel) {
1183 return;
1184 }
1185 ast_debug(3, "%s:%s: Hangup requested from %s line %d. cause: %s(%d) tech_cause: %s(%d)",
1186 ast_channel_name(instance->channel), instance->remote_addr,
1187 function, line,
1188 ast_cause2str(ast_cause), ast_cause, ast_websocket_status_to_str(tech_cause), tech_cause);
1189
1190 if (tech_cause) {
1191 ast_channel_tech_hangupcause_set(instance->channel, tech_cause);
1192 }
1193 ast_queue_hangup_with_cause(instance->channel, ast_cause);
1194}
1195
1196/*! \brief Function called when we should write a frame to the channel */
1197static int webchan_write(struct ast_channel *ast, struct ast_frame *f)
1198{
1199 struct websocket_pvt *instance = ast_channel_tech_pvt(ast);
1200
1201 if (!instance || !instance->websocket) {
1202 ast_log(LOG_WARNING, "%s: WebSocket instance or client not found\n",
1203 ast_channel_name(ast));
1204 return -1;
1205 }
1206
1207 if (f->frametype == AST_FRAME_CNG) {
1208 return 0;
1209 }
1210
1211 if (f->frametype != AST_FRAME_VOICE) {
1212 ast_log(LOG_WARNING, "%s: This WebSocket channel only supports AST_FRAME_VOICE frames\n",
1213 ast_channel_name(ast));
1214 return 0;
1215 }
1216
1218 ast_log(LOG_WARNING, "%s: This WebSocket channel only supports the '%s' format, not '%s'\n",
1221 return -1;
1222 }
1223
1225 (char *)f->data.ptr, (uint64_t)f->datalen);
1226}
1227
1228/*!
1229 * \internal
1230 *
1231 * Called by the core to actually call the remote.
1232 * The core will hang up the channel if a non-zero is returned.
1233 * We just need to set hangup causes if appropriate.
1234 */
1235static int webchan_call(struct ast_channel *ast, const char *dest,
1236 int timeout)
1237{
1238 struct websocket_pvt *instance = ast_channel_tech_pvt(ast);
1240
1241 if (!instance) {
1242 ast_log(LOG_WARNING, "%s: WebSocket instance not found\n",
1243 ast_channel_name(ast));
1245 return -1;
1246 }
1247
1248 if (instance->type == AST_WS_TYPE_SERVER) {
1249 ast_debug(3, "%s: Websocket call incoming\n", ast_channel_name(instance->channel));
1250 return 0;
1251 }
1252 ast_debug(3, "%s: Websocket call outgoing\n", ast_channel_name(instance->channel));
1253
1254 if (!instance->client) {
1255 ast_log(LOG_WARNING, "%s: WebSocket client not found\n",
1256 ast_channel_name(ast));
1258 return -1;
1259 }
1260
1261 ast_debug(3, "%s: WebSocket call requested to %s. cid: %s\n",
1262 ast_channel_name(ast), dest, instance->connection_id);
1263
1264 if (!ast_strlen_zero(instance->uri_params)) {
1266 }
1267
1268 instance->websocket = ast_websocket_client_connect(instance->client,
1269 instance, ast_channel_name(ast), &result);
1270 if (!instance->websocket || result != WS_OK) {
1271 ast_log(LOG_WARNING, "%s: WebSocket connection failed to %s: %s\n",
1274 return -1;
1275 }
1276
1277 return websocket_handoff_to_channel(instance);
1278}
1279
1280static void websocket_destructor(void *data)
1281{
1282 struct websocket_pvt *instance = data;
1283 struct ast_frame *frame = NULL;
1284 ast_debug(3, "%s: WebSocket instance freed\n", instance->connection_id);
1285
1286 AST_LIST_LOCK(&instance->frame_queue);
1287 while ((frame = AST_LIST_REMOVE_HEAD(&instance->frame_queue, frame_list))) {
1288 ast_frfree(frame);
1289 }
1290 AST_LIST_UNLOCK(&instance->frame_queue);
1291
1292 if (instance->timer) {
1293 ast_timer_close(instance->timer);
1294 instance->timer = NULL;
1295 }
1296
1297 if (instance->channel) {
1298 ast_channel_unref(instance->channel);
1299 instance->channel = NULL;
1300 }
1301 if (instance->websocket) {
1302 ast_websocket_unref(instance->websocket);
1303 instance->websocket = NULL;
1304 }
1305
1306 ao2_cleanup(instance->client);
1307 instance->client = NULL;
1308
1309 ao2_cleanup(instance->native_codec);
1310 instance->native_codec = NULL;
1311
1312 ao2_cleanup(instance->native_format);
1313 instance->native_format = NULL;
1314
1315 ao2_cleanup(instance->slin_codec);
1316 instance->slin_codec = NULL;
1317
1318 ao2_cleanup(instance->slin_format);
1319 instance->slin_format = NULL;
1320
1321 if (instance->silence.data.ptr) {
1322 ast_free(instance->silence.data.ptr);
1323 instance->silence.data.ptr = NULL;
1324 }
1325
1326 if (instance->translator) {
1328 instance->translator = NULL;
1329 }
1330
1331 if (instance->leftover_data) {
1332 ast_free(instance->leftover_data);
1333 instance->leftover_data = NULL;
1334 }
1335
1336 ast_free(instance->uri_params);
1337 ast_free(instance->remote_addr);
1338}
1339
1340struct instance_proxy {
1341 AO2_WEAKPROXY();
1342 /*! \brief The name of the module owning this sorcery instance */
1343 char connection_id[0];
1344};
1345
1346static void instance_proxy_cb(void *weakproxy, void *data)
1347{
1348 struct instance_proxy *proxy = weakproxy;
1349 ast_debug(3, "%s: WebSocket instance removed from instances\n", proxy->connection_id);
1350 ao2_unlink(instances, weakproxy);
1351}
1352
1353static struct websocket_pvt* websocket_new(const char *chan_name,
1354 const char *connection_id, struct ast_format *fmt)
1355{
1356 RAII_VAR(struct instance_proxy *, proxy, NULL, ao2_cleanup);
1357 RAII_VAR(struct websocket_pvt *, instance, NULL, ao2_cleanup);
1358 char uuid[AST_UUID_STR_LEN];
1359 enum ast_websocket_type ws_type;
1360
1361 SCOPED_AO2WRLOCK(locker, instances);
1362
1365 ws_type = AST_WS_TYPE_SERVER;
1366 } else {
1367 ws_type = AST_WS_TYPE_CLIENT;
1368 }
1369
1370 proxy = ao2_weakproxy_alloc(sizeof(*proxy) + strlen(connection_id) + 1, NULL);
1371 if (!proxy) {
1372 return NULL;
1373 }
1374 strcpy(proxy->connection_id, connection_id); /* Safe */
1375
1376 instance = ao2_alloc(sizeof(*instance) + strlen(connection_id) + 1,
1378 if (!instance) {
1379 return NULL;
1380 }
1381 strcpy(instance->connection_id, connection_id); /* Safe */
1382
1383 instance->type = ws_type;
1384 if (ws_type == AST_WS_TYPE_CLIENT) {
1386 if (!instance->client) {
1387 ast_log(LOG_ERROR, "%s: WebSocket client connection '%s' not found\n",
1388 chan_name, instance->connection_id);
1389 return NULL;
1390 }
1391 }
1392
1393 AST_LIST_HEAD_INIT(&instance->frame_queue);
1394
1395 /*
1396 * We need the codec to calculate the number of samples in a frame
1397 * so we'll get it once and store it in the instance.
1398 *
1399 * References for native_format and native_codec are now held by the
1400 * instance and will be released when the instance is destroyed.
1401 */
1402 instance->native_format = fmt;
1403 instance->native_codec = ast_format_get_codec(instance->native_format);
1404 /*
1405 * References for native_format and native_codec are now held by the
1406 * instance and will be released when the instance is destroyed.
1407 */
1408
1409 /*
1410 * It's not possible for us to re-time or re-frame media if the data
1411 * stream can't be broken up on arbitrary byte boundaries. This is usually
1412 * indicated by the codec's minimum_bytes being small (10 bytes or less).
1413 * We need to force passthrough mode in this case.
1414 */
1415 if (instance->native_codec->minimum_bytes <= 10) {
1416 instance->passthrough = 1;
1417 instance->optimal_frame_size = 0;
1418 } else {
1419 instance->optimal_frame_size =
1420 (instance->native_codec->default_ms * instance->native_codec->minimum_bytes)
1421 / instance->native_codec->minimum_ms;
1422 instance->leftover_data = ast_calloc(1, instance->optimal_frame_size);
1423 if (!instance->leftover_data) {
1424 return NULL;
1425 }
1426 }
1427
1428 ast_debug(3,
1429 "%s: WebSocket channel native format '%s' Sample rate: %d ptime: %dms minms: %u minbytes: %u passthrough: %d optimal_frame_size: %d\n",
1430 chan_name, ast_format_get_name(instance->native_format),
1435 instance->passthrough,
1436 instance->optimal_frame_size);
1437
1438 /* We have exclusive access to proxy and sorcery, no need for locking here. */
1439 if (ao2_weakproxy_set_object(proxy, instance, OBJ_NOLOCK)) {
1440 return NULL;
1441 }
1442
1444 return NULL;
1445 }
1446
1447 if (!ao2_link_flags(instances, proxy, OBJ_NOLOCK)) {
1448 ast_log(LOG_ERROR, "%s: Unable to link WebSocket instance to instances\n",
1449 proxy->connection_id);
1450 return NULL;
1451 }
1452 ast_debug(3, "%s: WebSocket instance created and linked\n", proxy->connection_id);
1453
1454 return ao2_bump(instance);
1455}
1456
1457static int set_instance_translator(struct websocket_pvt *instance)
1458{
1460 instance->slin_format = ao2_bump(instance->native_format);
1461 instance->slin_codec = ast_format_get_codec(instance->slin_format);
1462 return 0;
1463 }
1464
1466 if (!instance->slin_format) {
1467 ast_log(LOG_ERROR, "%s: Unable to get slin format for rate %d\n",
1468 ast_channel_name(instance->channel), instance->native_codec->sample_rate);
1469 return -1;
1470 }
1471 ast_debug(3, "%s: WebSocket channel slin format '%s' Sample rate: %d ptime: %dms\n",
1475
1476 instance->translator = ast_translator_build_path(instance->slin_format, instance->native_format);
1477 if (!instance->translator) {
1478 ast_log(LOG_ERROR, "%s: Unable to build translator path from '%s' to '%s'\n",
1480 ast_format_get_name(instance->slin_format));
1481 return -1;
1482 }
1483
1484 instance->slin_codec = ast_format_get_codec(instance->slin_format);
1485 return 0;
1486}
1487
1488static int set_instance_silence_frame(struct websocket_pvt *instance)
1489{
1490 instance->silence.frametype = AST_FRAME_VOICE;
1491 instance->silence.datalen =
1492 (instance->slin_codec->default_ms * instance->slin_codec->minimum_bytes) / instance->slin_codec->minimum_ms;
1493 instance->silence.samples = instance->silence.datalen / sizeof(uint16_t);
1494 /*
1495 * Even though we'll calloc the data pointer, we don't mark it as
1496 * mallocd because this frame will be around for a while and we don't
1497 * want it accidentally freed before we're done with it.
1498 */
1499 instance->silence.mallocd = 0;
1500 instance->silence.offset = 0;
1501 instance->silence.src = __PRETTY_FUNCTION__;
1502 instance->silence.subclass.format = instance->slin_format;
1503 instance->silence.data.ptr = ast_calloc(1, instance->silence.datalen);
1504 if (!instance->silence.data.ptr) {
1505 return -1;
1506 }
1507
1508 return 0;
1509}
1510
1511static int set_channel_timer(struct websocket_pvt *instance)
1512{
1513 int rate = 0;
1514 instance->timer = ast_timer_open();
1515 if (!instance->timer) {
1516 return -1;
1517 }
1518 /* Rate is the number of ticks per second, not the interval. */
1519 rate = 1000 / ast_format_get_default_ms(instance->native_format);
1520 ast_debug(3, "%s: WebSocket timer rate %d\n",
1521 ast_channel_name(instance->channel), rate);
1522 ast_timer_set_rate(instance->timer, rate);
1523 /*
1524 * Calling ast_channel_set_fd will cause the channel thread to call
1525 * webchan_read at 'rate' times per second.
1526 */
1528
1529 return 0;
1530}
1531
1532static int set_channel_variables(struct websocket_pvt *instance)
1533{
1534 char *pkt_size = NULL;
1535 int res = ast_asprintf(&pkt_size, "%d", instance->optimal_frame_size);
1536 if (res <= 0) {
1537 return -1;
1538 }
1539
1541 pkt_size);
1542 ast_free(pkt_size);
1544 instance->connection_id);
1545
1546 return 0;
1547}
1548
1549static int validate_uri_parameters(const char *uri_params)
1550{
1551 char *params = ast_strdupa(uri_params);
1552 char *nvp = NULL;
1553 char *nv = NULL;
1554
1555 /*
1556 * uri_params should be a comma-separated list of key=value pairs.
1557 * For example:
1558 * name1=value1,name2=value2
1559 * We're verifying that each name and value either doesn't need
1560 * to be encoded or that it already is.
1561 */
1562
1563 while((nvp = ast_strsep(&params, ',', 0))) {
1564 /* nvp will be name1=value1 */
1565 while((nv = ast_strsep(&nvp, '=', 0))) {
1566 /* nv will be either name1 or value1 */
1567 if (!ast_uri_verify_encoded(nv)) {
1568 return 0;
1569 }
1570 }
1571 }
1572
1573 return 1;
1574}
1575
1576enum {
1577 OPT_WS_CODEC = (1 << 0),
1578 OPT_WS_NO_AUTO_ANSWER = (1 << 1),
1579 OPT_WS_URI_PARAM = (1 << 2),
1580 OPT_WS_PASSTHROUGH = (1 << 3),
1581 OPT_WS_MSG_FORMAT = (1 << 4),
1582};
1583
1584enum {
1591};
1592
1599 END_OPTIONS );
1600
1601static struct ast_channel *webchan_request(const char *type,
1602 struct ast_format_cap *cap, const struct ast_assigned_ids *assignedids,
1603 const struct ast_channel *requestor, const char *data, int *cause)
1604{
1605 char *parse;
1606 RAII_VAR(struct websocket_pvt *, instance, NULL, ao2_cleanup);
1607 struct ast_channel *chan = NULL;
1608 struct ast_format *fmt = NULL;
1609 struct ast_format_cap *caps = NULL;
1611 AST_APP_ARG(connection_id);
1613 );
1614 struct ast_flags opts = { 0, };
1615 char *opt_args[OPT_ARG_ARRAY_SIZE];
1616 const char *requestor_name = requestor ? ast_channel_name(requestor) :
1617 (assignedids && !ast_strlen_zero(assignedids->uniqueid) ? assignedids->uniqueid : "<unknown>");
1618 RAII_VAR(struct webchan_conf_global *, global_cfg, NULL, ao2_cleanup);
1619
1620 global_cfg = ast_sorcery_retrieve_by_id(sorcery, "global", "global");
1621
1622 ast_debug(3, "%s: WebSocket channel requested\n",
1623 requestor_name);
1624
1625 if (ast_strlen_zero(data)) {
1626 ast_log(LOG_ERROR, "%s: A connection id is required for the 'WebSocket' channel\n",
1627 requestor_name);
1628 goto failure;
1629 }
1630 parse = ast_strdupa(data);
1631 AST_NONSTANDARD_APP_ARGS(args, parse, '/');
1632
1633 if (ast_strlen_zero(args.connection_id)) {
1634 ast_log(LOG_ERROR, "%s: connection_id is required for the 'WebSocket' channel\n",
1635 requestor_name);
1636 goto failure;
1637 }
1638
1639 if (!ast_strlen_zero(args.options)
1640 && ast_app_parse_options(websocket_options, &opts, opt_args,
1641 ast_strdupa(args.options))) {
1642 ast_log(LOG_ERROR, "%s: 'WebSocket' channel options '%s' parse error\n",
1643 requestor_name, args.options);
1644 goto failure;
1645 }
1646
1647 if (ast_test_flag(&opts, OPT_WS_CODEC)
1648 && !ast_strlen_zero(opt_args[OPT_ARG_WS_CODEC])) {
1649 fmt = ast_format_cache_get(opt_args[OPT_ARG_WS_CODEC]);
1650 } else {
1651 /*
1652 * If codec wasn't specified in the dial string,
1653 * use the first format in the capabilities.
1654 */
1655 fmt = ast_format_cap_get_format(cap, 0);
1656 }
1657
1658 if (!fmt) {
1659 ast_log(LOG_WARNING, "%s: No codec found for sending media to connection '%s'\n",
1660 requestor_name, args.connection_id);
1661 goto failure;
1662 }
1663
1664 ast_debug(3, "%s: Using format %s from %s\n",
1665 requestor_name, ast_format_get_name(fmt),
1666 ast_test_flag(&opts, OPT_WS_CODEC) ? "dialstring" : "requester");
1667
1668 instance = websocket_new(requestor_name, args.connection_id, fmt);
1669 if (!instance) {
1670 ast_log(LOG_ERROR, "%s: Failed to allocate WebSocket channel pvt\n",
1671 requestor_name);
1672 goto failure;
1673 }
1674
1676 if (!instance->passthrough) {
1677 instance->passthrough = ast_test_flag(&opts, OPT_WS_PASSTHROUGH);
1678 }
1679
1681 && !ast_strlen_zero(opt_args[OPT_ARG_WS_URI_PARAM])) {
1682 char *comma;
1683
1684 if (ast_strings_equal(args.connection_id, INCOMING_CONNECTION_ID)) {
1686 "%s: URI parameters are not allowed for 'WebSocket/INCOMING' channels\n",
1687 requestor_name);
1688 goto failure;
1689 }
1690
1691 ast_debug(3, "%s: Using URI parameters '%s'\n",
1692 requestor_name, opt_args[OPT_ARG_WS_URI_PARAM]);
1693
1695 ast_log(LOG_ERROR, "%s: Invalid URI parameters '%s' in WebSocket/%s dial string\n",
1696 requestor_name, opt_args[OPT_ARG_WS_URI_PARAM],
1697 args.connection_id);
1698 goto failure;
1699 }
1700
1701 instance->uri_params = ast_strdup(opt_args[OPT_ARG_WS_URI_PARAM]);
1702 comma = instance->uri_params;
1703 /*
1704 * The normal separator for query string components is an
1705 * ampersand ('&') but the Dial app interprets them as additional
1706 * channels to dial in parallel so we instruct users to separate
1707 * the parameters with commas (',') instead. We now have to
1708 * convert those commas back to ampersands.
1709 */
1710 while ((comma = strchr(comma,','))) {
1711 *comma = '&';
1712 }
1713 ast_debug(3, "%s: Using final URI '%s'\n", requestor_name, instance->uri_params);
1714 }
1715
1716 if (ast_test_flag(&opts, OPT_WS_MSG_FORMAT)) {
1718
1720 ast_log(LOG_WARNING, "%s: 'f/control message format' dialstring parameter value missing or invalid. "
1721 "Defaulting to 'plain-text'\n",
1722 ast_channel_name(requestor));
1724 }
1725 } else if (global_cfg) {
1726 instance->control_msg_format = global_cfg->control_msg_format;
1727 }
1728
1729 chan = ast_channel_alloc(1, AST_STATE_DOWN, "", "", "", "", "", assignedids,
1730 requestor, 0, "WebSocket/%s/%p", args.connection_id, instance);
1731 if (!chan) {
1732 ast_log(LOG_ERROR, "%s: Unable to alloc channel\n", ast_channel_name(requestor));
1733 goto failure;
1734 }
1735
1736 /* Prevent device state caching as this channel involves ephemeral destinations or sources */
1738 ast_debug(3, "%s: WebSocket channel %s allocated for connection %s\n",
1739 ast_channel_name(chan), requestor_name,
1740 instance->connection_id);
1741
1742 instance->channel = ao2_bump(chan);
1744
1745 if (set_instance_translator(instance) != 0) {
1746 goto failure;
1747 }
1748
1749 if (set_instance_silence_frame(instance) != 0) {
1750 goto failure;
1751 }
1752
1753 if (set_channel_timer(instance) != 0) {
1754 goto failure;
1755 }
1756
1757 if (set_channel_variables(instance) != 0) {
1758 goto failure;
1759 }
1760
1762 if (!caps) {
1763 ast_log(LOG_ERROR, "%s: Unable to alloc caps\n", requestor_name);
1764 goto failure;
1765 }
1766
1767 ast_format_cap_append(caps, instance->native_format, 0);
1768 ast_channel_nativeformats_set(instance->channel, caps);
1771 ast_channel_set_readformat(instance->channel, instance->native_format);
1773 ast_channel_tech_pvt_set(chan, ao2_bump(instance));
1774 ast_channel_unlock(chan);
1775 ao2_cleanup(caps);
1776
1777 ast_debug(3, "%s: WebSocket channel created to %s\n",
1778 ast_channel_name(chan), args.connection_id);
1779
1780 return chan;
1781
1782failure:
1783 if (chan) {
1784 ast_channel_unlock(chan);
1785 }
1786 *cause = AST_CAUSE_FAILURE;
1787 return NULL;
1788}
1789
1790/*!
1791 * \internal
1792 *
1793 * Called by the core to hang up the channel.
1794 */
1795static int webchan_hangup(struct ast_channel *ast)
1796{
1797 struct websocket_pvt *instance = ast_channel_tech_pvt(ast);
1798
1799 if (!instance) {
1800 return -1;
1801 }
1802 ast_debug(3, "%s: WebSocket call hangup. cid: %s\n",
1803 ast_channel_name(ast), instance->connection_id);
1804
1805 if (instance->websocket) {
1807 ast_websocket_unref(instance->websocket);
1808 instance->websocket = NULL;
1809 }
1811
1812 /* Clean up the reference from adding the instance to the channel */
1813 ao2_cleanup(instance);
1814
1815 return 0;
1816}
1817
1818static int webchan_send_dtmf_text(struct ast_channel *ast, char digit, unsigned int duration)
1819{
1820 struct websocket_pvt *instance = ast_channel_tech_pvt(ast);
1821
1822 if (!instance) {
1823 return -1;
1824 }
1825
1826 return send_event(instance, DTMF_END, digit);
1827}
1828
1829/*!
1830 * \internal
1831 *
1832 * Called by res_http_websocket after a client has connected and
1833 * successfully upgraded from HTTP to WebSocket.
1834 *
1835 * Depends on incoming_ws_http_callback parsing the connection_id from
1836 * the HTTP request and storing it in get_params.
1837 */
1838static void incoming_ws_established_cb(struct ast_websocket *ast_ws_session,
1839 struct ast_variable *get_params, struct ast_variable *upgrade_headers)
1840{
1841 RAII_VAR(struct ast_websocket *, s, ast_ws_session, ast_websocket_unref);
1842 struct ast_variable *v;
1843 const char *connection_id = NULL;
1844 struct websocket_pvt *instance = NULL;
1845
1846 ast_debug(3, "WebSocket established\n");
1847
1848 for (v = upgrade_headers; v; v = v->next) {
1849 ast_debug(4, "Header-> %s: %s\n", v->name, v->value);
1850 }
1851 for (v = get_params; v; v = v->next) {
1852 ast_debug(4, " Param-> %s: %s\n", v->name, v->value);
1853 }
1854
1855 connection_id = ast_variable_find_in_list(get_params, "CONNECTION_ID");
1856 if (!connection_id) {
1857 /*
1858 * This can't really happen because websocket_http_callback won't
1859 * let it get this far if it can't add the connection_id to the
1860 * get_params.
1861 * Just in case though...
1862 */
1863 ast_log(LOG_WARNING, "WebSocket connection id not found\n");
1866 return;
1867 }
1868
1870 if (!instance) {
1871 /*
1872 * This also can't really happen because websocket_http_callback won't
1873 * let it get this far if it can't find the instance.
1874 * Just in case though...
1875 */
1876 ast_log(LOG_WARNING, "%s: WebSocket instance not found\n", connection_id);
1879 return;
1880 }
1881 instance->websocket = ao2_bump(ast_ws_session);
1882
1884 ao2_cleanup(instance);
1885 /*
1886 * The instance is the channel's responsibility now.
1887 * We just return here.
1888 */
1889}
1890
1891/*!
1892 * \internal
1893 *
1894 * Called by the core http server after a client connects but before
1895 * the upgrade from HTTP to Websocket. We need to save the URI in
1896 * the CONNECTION_ID in a get_param because it contains the connection UUID
1897 * we gave to the client when they used externalMedia to create the channel.
1898 * incoming_ws_established_cb() will use this to retrieve the chan_websocket
1899 * instance.
1900 */
1902 const struct ast_http_uri *urih, const char *uri,
1903 enum ast_http_method method, struct ast_variable *get_params,
1904 struct ast_variable *headers)
1905{
1906 struct ast_http_uri fake_urih = {
1908 };
1909 int res = 0;
1910 /*
1911 * Normally the http server will destroy the get_params
1912 * when the session ends but if there weren't any initially
1913 * and we create some and add them to the list, the http server
1914 * won't know about it so we have to destroy it ourselves.
1915 */
1916 int destroy_get_params = (get_params == NULL);
1917 struct ast_variable *v = NULL;
1918 RAII_VAR(struct websocket_pvt *, instance, NULL, ao2_cleanup);
1919
1920 ast_debug(2, "URI: %s Starting\n", uri);
1921
1922 /*
1923 * The client will have issued the GET request with a URI of
1924 * /media/<connection_id>
1925 *
1926 * Since this callback is registered for the /media URI prefix the
1927 * http server will strip that off the front of the URI passing in
1928 * only the path components after that in the 'uri' parameter.
1929 * This should leave only the connection id without a leading '/'.
1930 */
1931 instance = ao2_weakproxy_find(instances, uri, OBJ_SEARCH_KEY | OBJ_NOLOCK, "");
1932 if (!instance) {
1933 ast_log(LOG_WARNING, "%s: WebSocket instance not found\n", uri);
1934 ast_http_error(ser, 404, "Not found", "WebSocket instance not found");
1935 return -1;
1936 }
1937
1938 /*
1939 * We don't allow additional connections using the same connection id.
1940 */
1941 if (instance->websocket) {
1942 ast_log(LOG_WARNING, "%s: Websocket already connected for channel '%s'\n",
1943 uri, instance->channel ? ast_channel_name(instance->channel) : "unknown");
1944 ast_http_error(ser, 409, "Conflict", "Another websocket connection exists for this connection id");
1945 return -1;
1946 }
1947
1948 v = ast_variable_new("CONNECTION_ID", uri, "");
1949 if (!v) {
1950 ast_http_error(ser, 500, "Server error", "");
1951 return -1;
1952 }
1953 ast_variable_list_append(&get_params, v);
1954
1955 for (v = get_params; v; v = v->next) {
1956 ast_debug(4, " Param-> %s: %s\n", v->name, v->value);
1957 }
1958
1959 /*
1960 * This will ultimately call internal_ws_established_cb() so
1961 * this function will block until the websocket is closed and
1962 * internal_ws_established_cb() returns;
1963 */
1964 res = ast_websocket_uri_cb(ser, &fake_urih, uri, method,
1965 get_params, headers);
1966 if (destroy_get_params) {
1967 ast_variables_destroy(get_params);
1968 }
1969
1970 ast_debug(2, "URI: %s DONE\n", uri);
1971
1972 return res;
1973}
1974
1975static struct ast_http_uri http_uri = {
1977 .description = "Media over Websocket",
1978 .uri = "media",
1979 .has_subtree = 1,
1980 .data = NULL,
1981 .key = __FILE__,
1982 .no_decode_uri = 1,
1983};
1984
1988
1989static int global_control_message_format_from_str(const struct aco_option *opt,
1990 struct ast_variable *var, void *obj)
1991{
1992 struct webchan_conf_global *cfg = obj;
1993
1995
1997 ast_log(LOG_ERROR, "chan_websocket.conf: Invalid value '%s' for "
1998 "control_mesage_format. Must be 'plain-text' or 'json'\n",
1999 var->value);
2000 return -1;
2001 }
2002
2003 return 0;
2004}
2005
2006static int global_control_message_format_to_str(const void *obj, const intptr_t *args, char **buf)
2007{
2008 const struct webchan_conf_global *cfg = obj;
2009
2011
2012 return 0;
2013}
2014
2015static void *global_alloc(const char *name)
2016{
2018 sizeof(*cfg), NULL);
2019
2020 if (!cfg) {
2021 return NULL;
2022 }
2023
2024 return cfg;
2025}
2026
2027static int global_apply(const struct ast_sorcery *sorcery, void *obj)
2028{
2029 struct webchan_conf_global *cfg = obj;
2030
2031 ast_debug(1, "control_msg_format: %s\n",
2033
2034 return 0;
2035}
2036
2037static int load_config(void)
2038{
2039 ast_debug(2, "Initializing Websocket Client Configuration\n");
2041 if (!sorcery) {
2042 ast_log(LOG_ERROR, "Failed to open sorcery\n");
2043 return -1;
2044 }
2045
2046 ast_sorcery_apply_default(sorcery, "global", "config",
2047 "chan_websocket.conf,criteria=type=global,single_object=yes,explicit_name=global");
2048
2050 ast_log(LOG_ERROR, "Failed to register chan_websocket global object with sorcery\n");
2052 sorcery = NULL;
2053 return -1;
2054 }
2055
2056 ast_sorcery_object_field_register_nodoc(sorcery, "global", "type", "", OPT_NOOP_T, 0, 0);
2057 ast_sorcery_register_cust(global, control_message_format, "plain-text");
2058
2060
2061 return 0;
2062}
2063
2064/*! \brief Function called when our module is unloaded */
2065static int unload_module(void)
2066{
2070
2074
2076 instances = NULL;
2077
2079 sorcery = NULL;
2080
2081 return 0;
2082}
2083
2084static int reload_module(void)
2085{
2086 ast_debug(2, "Reloading chan_websocket configuration\n");
2088
2089 return 0;
2090}
2091
2092/*! \brief Function called when our module is loaded */
2093static int load_module(void)
2094{
2095 int res = 0;
2096 struct ast_websocket_protocol *protocol;
2097
2098 res = load_config();
2099 if (res != 0) {
2101 }
2102
2105 }
2106
2109 ast_log(LOG_ERROR, "Unable to register channel class 'WebSocket'\n");
2110 unload_module();
2112 }
2113
2115 AO2_CONTAINER_ALLOC_OPT_DUPS_REPLACE, 17, instance_proxy_hash_fn,
2116 instance_proxy_sort_fn, instance_proxy_cmp_fn);
2117 if (!instances) {
2119 "Failed to allocate the chan_websocket instance registry\n");
2120 unload_module();
2122 }
2123
2125 if (!ast_ws_server) {
2126 unload_module();
2128 }
2129
2130 protocol = ast_websocket_sub_protocol_alloc("media");
2131 if (!protocol) {
2132 unload_module();
2134 }
2137
2139
2141}
2142
2143AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "Websocket Media Channel",
2144 .support_level = AST_MODULE_SUPPORT_CORE,
2145 .load = load_module,
2146 .unload = unload_module,
2148 .load_pri = AST_MODPRI_CHANNEL_DRIVER,
2149 .requires = "res_http_websocket,res_websocket_client",
2150);
char digit
enum queue_result id
Definition app_queue.c:1790
#define var
Definition ast_expr2f.c:605
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
Definition astmm.h:288
#define ast_free(a)
Definition astmm.h:180
#define ast_strdup(str)
A wrapper for strdup()
Definition astmm.h:241
#define ast_strdupa(s)
duplicate a string in memory from the stack
Definition astmm.h:298
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition astmm.h:267
#define ast_calloc(num, len)
A wrapper for calloc()
Definition astmm.h:202
#define ast_log
Definition astobj2.c:42
#define ao2_weakproxy_set_object(weakproxy, obj, flags)
Associate weakproxy with obj.
Definition astobj2.h:579
int ao2_weakproxy_subscribe(void *weakproxy, ao2_weakproxy_notification_cb cb, void *data, int flags)
Request notification when weakproxy points to NULL.
Definition astobj2.c:934
@ AO2_ALLOC_OPT_LOCK_RWLOCK
Definition astobj2.h:365
#define AO2_STRING_FIELD_CMP_FN(stype, field)
Creates a compare function for a structure string field.
Definition astobj2.h:2048
#define ao2_cleanup(obj)
Definition astobj2.h:1934
#define ao2_unlink(container, obj)
Remove an object from a container.
Definition astobj2.h:1578
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
Definition astobj2.h:1554
#define AO2_STRING_FIELD_SORT_FN(stype, field)
Creates a sort function for a structure string field.
Definition astobj2.h:2064
#define ao2_weakproxy_find(c, arg, flags, tag)
Perform an ao2_find on a container with ao2_weakproxy objects, returning the real object.
Definition astobj2.h:1748
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition astobj2.h:480
#define ao2_weakproxy_alloc(data_size, destructor_fn)
Allocate an ao2_weakproxy object.
Definition astobj2.h:550
#define AO2_STRING_FIELD_HASH_FN(stype, field)
Creates a hash function for a structure string field.
Definition astobj2.h:2032
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
Definition astobj2.h:1063
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
Definition astobj2.h:1101
#define ao2_alloc(data_size, destructor_fn)
Definition astobj2.h:409
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition astobj2.h:1303
@ AO2_CONTAINER_ALLOC_OPT_DUPS_REPLACE
Replace objects with duplicate keys in container.
Definition astobj2.h:1211
#define AST_CAUSE_FAILURE
Definition causes.h:150
#define AST_CAUSE_NORMAL
Definition causes.h:151
#define AST_CAUSE_NETWORK_OUT_OF_ORDER
Definition causes.h:121
#define AST_CAUSE_NO_ROUTE_DESTINATION
Definition causes.h:100
static PGresult * result
Definition cel_pgsql.c:84
static const char type[]
static int set_instance_translator(struct websocket_pvt *instance)
#define FLUSH_MEDIA
static void _websocket_request_hangup(struct websocket_pvt *instance, int ast_cause, enum ast_websocket_status_code tech_cause, int line, const char *function)
#define QUEUE_LENGTH_XON_LEVEL
#define MARK_MEDIA
#define send_event(_instance, _event,...)
Use this macro to create and send events passing in any event-specific parameters.
static int incoming_ws_http_callback(struct ast_tcptls_session_instance *ser, const struct ast_http_uri *urih, const char *uri, enum ast_http_method method, struct ast_variable *get_params, struct ast_variable *headers)
#define ANSWER_CHANNEL
static void instance_proxy_cb(void *weakproxy, void *data)
static struct ast_frame * dequeue_frame(struct websocket_pvt *instance)
static void * global_alloc(const char *name)
static const char * control_msg_format_to_str(enum webchan_control_msg_format value)
static int validate_uri_parameters(const char *uri_params)
static int webchan_write(struct ast_channel *ast, struct ast_frame *f)
Function called when we should write a frame to the channel.
static int webchan_hangup(struct ast_channel *ast)
static struct ast_channel_tech websocket_tech
#define HANGUP_CHANNEL
static struct ast_frame * webchan_read(struct ast_channel *ast)
@ OPT_ARG_WS_PASSTHROUGH
@ OPT_ARG_WS_URI_PARAM
@ OPT_ARG_WS_MSG_FORMAT
@ OPT_ARG_WS_CODEC
@ OPT_ARG_WS_NO_AUTO_ANSWER
@ OPT_ARG_ARRAY_SIZE
#define websocket_request_hangup(_instance, _cause, _tech)
#define MEDIA_WEBSOCKET_CONNECTION_ID
#define WS_WEBSOCKET_FDNO
static int read_from_ws_and_queue(struct websocket_pvt *instance)
@ WEBCHAN_CONTROL_MSG_FORMAT_JSON
@ WEBCHAN_CONTROL_MSG_FORMAT_PLAIN
@ WEBCHAN_CONTROL_MSG_FORMAT_INVALID
static int handle_command(struct websocket_pvt *instance, char *buffer)
static int global_control_message_format_from_str(const struct aco_option *opt, struct ast_variable *var, void *obj)
static int webchan_send_dtmf_text(struct ast_channel *ast, char digit, unsigned int duration)
static struct ast_http_uri http_uri
static int reload_module(void)
#define GET_DRIVER_STATUS
static int set_channel_variables(struct websocket_pvt *instance)
@ OPT_WS_CODEC
@ OPT_WS_PASSTHROUGH
@ OPT_WS_URI_PARAM
@ OPT_WS_MSG_FORMAT
@ OPT_WS_NO_AUTO_ANSWER
static int process_text_message(struct websocket_pvt *instance, char *payload, uint64_t payload_len)
static void websocket_destructor(void *data)
static int queue_frame_from_buffer(struct websocket_pvt *instance, char *buffer, size_t len)
#define WS_TIMER_FDNO
static struct ast_sorcery * sorcery
#define START_MEDIA_BUFFERING
static int set_channel_timer(struct websocket_pvt *instance)
#define QUEUE_LENGTH_XOFF_LEVEL
static const struct ast_app_option websocket_options[128]
#define PAUSE_MEDIA
#define create_event(_instance, _event,...)
Use this macro to create events passing in any event-specific parameters.
static struct ao2_container * instances
#define MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE
static void set_channel_format(struct websocket_pvt *instance, struct ast_format *fmt)
#define INCOMING_CONNECTION_ID
static struct websocket_pvt * websocket_new(const char *chan_name, const char *connection_id, struct ast_format *fmt)
#define REPORT_QUEUE_DRAINED
#define CONTINUE_MEDIA
#define STOP_MEDIA_BUFFERING
static int load_module(void)
Function called when our module is loaded.
static int webchan_call(struct ast_channel *ast, const char *dest, int timeout)
static enum webchan_control_msg_format control_msg_format_from_str(const char *value)
static int process_binary_message(struct websocket_pvt *instance, char *payload, uint64_t payload_len)
static int set_instance_silence_frame(struct websocket_pvt *instance)
static struct ast_websocket_server * ast_ws_server
static int unload_module(void)
Function called when our module is unloaded.
static int global_control_message_format_to_str(const void *obj, const intptr_t *args, char **buf)
#define MAX_TEXT_MESSAGE_LEN
static void incoming_ws_established_cb(struct ast_websocket *ast_ws_session, struct ast_variable *get_params, struct ast_variable *upgrade_headers)
#define QUEUE_LENGTH_MAX
static int websocket_handoff_to_channel(struct websocket_pvt *instance)
static struct ast_channel * webchan_request(const char *type, struct ast_format_cap *cap, const struct ast_assigned_ids *assignedids, const struct ast_channel *requestor, const char *data, int *cause)
static int load_config(void)
static int global_apply(const struct ast_sorcery *sorcery, void *obj)
static int queue_option_frame(struct websocket_pvt *instance, char *buffer)
const char * ast_channel_name(const struct ast_channel *chan)
int ast_channel_tech_hangupcause(const struct ast_channel *chan)
void * ast_channel_tech_pvt(const struct ast_channel *chan)
struct ast_format * ast_channel_rawreadformat(struct ast_channel *chan)
void ast_channel_tech_hangupcause_set(struct ast_channel *chan, int value)
#define ast_channel_alloc(needqueue, state, cid_num, cid_name, acctcode, exten, context, assignedids, requestor, amaflag,...)
Create a channel structure.
Definition channel.h:1299
void ast_channel_nativeformats_set(struct ast_channel *chan, struct ast_format_cap *value)
int ast_channel_fdno(const struct ast_channel *chan)
void ast_channel_unregister(const struct ast_channel_tech *tech)
Unregister a channel technology.
Definition channel.c:570
int ast_queue_control(struct ast_channel *chan, enum ast_control_frame_type control)
Queue a control frame without payload.
Definition channel.c:1288
struct ast_flags * ast_channel_flags(struct ast_channel *chan)
int ast_set_read_format(struct ast_channel *chan, struct ast_format *format)
Sets read format on channel chan.
Definition channel.c:5757
int ast_queue_hangup_with_cause(struct ast_channel *chan, int cause)
Queue a hangup frame with hangupcause set.
Definition channel.c:1212
void ast_channel_set_rawreadformat(struct ast_channel *chan, struct ast_format *format)
void ast_channel_tech_pvt_set(struct ast_channel *chan, void *value)
void ast_channel_set_rawwriteformat(struct ast_channel *chan, struct ast_format *format)
void ast_channel_set_readformat(struct ast_channel *chan, struct ast_format *format)
int ast_channel_register(const struct ast_channel_tech *tech)
Register a channel technology (a new channel driver) Called by a channel module to register the kind ...
Definition channel.c:539
#define ast_channel_unref(c)
Decrease channel reference count.
Definition channel.h:3018
const char * ast_cause2str(int cause) attribute_pure
Gives the string form of a given cause code.
Definition channel.c:612
void ast_channel_set_fd(struct ast_channel *chan, int which, int fd)
Definition channel.c:2416
@ AST_FLAG_DISABLE_DEVSTATE_CACHE
Definition channel.h:1049
void ast_channel_hangupcause_set(struct ast_channel *chan, int value)
void ast_channel_tech_set(struct ast_channel *chan, const struct ast_channel_tech *value)
#define ast_channel_unlock(chan)
Definition channel.h:2983
void ast_channel_set_writeformat(struct ast_channel *chan, struct ast_format *format)
struct ast_format * ast_channel_readformat(struct ast_channel *chan)
@ AST_STATE_DOWN
@ AST_MEDIA_TYPE_UNKNOWN
Definition codec.h:31
@ OPT_NOOP_T
Type for a default handler that should do nothing.
char buf[BUFSIZE]
Definition eagi_proxy.c:66
struct ast_codec * ast_format_get_codec(const struct ast_format *format)
Get the codec associated with a format.
Definition format.c:324
unsigned int ast_format_get_minimum_bytes(const struct ast_format *format)
Get the minimum number of bytes expected in a frame for this format.
Definition format.c:374
unsigned int ast_format_get_sample_rate(const struct ast_format *format)
Get the sample rate of a media format.
Definition format.c:379
unsigned int ast_format_get_minimum_ms(const struct ast_format *format)
Get the minimum amount of media carried in this format.
Definition format.c:364
enum ast_format_cmp_res ast_format_cmp(const struct ast_format *format1, const struct ast_format *format2)
Compare two formats.
Definition format.c:201
@ AST_FORMAT_CMP_NOT_EQUAL
Definition format.h:38
const char * ast_format_get_name(const struct ast_format *format)
Get the name associated with a format.
Definition format.c:334
unsigned int ast_format_get_default_ms(const struct ast_format *format)
Get the default framing size (in milliseconds) for a format.
Definition format.c:359
int ast_format_cache_is_slinear(struct ast_format *format)
Determines if a format is one of the cached slin formats.
#define ast_format_cache_get(name)
Retrieve a named format from the cache.
struct ast_format * ast_format_cache_get_slin_by_rate(unsigned int rate)
Retrieve the best signed linear format given a sample rate.
int ast_format_cap_append_by_type(struct ast_format_cap *cap, enum ast_media_type type)
Add all codecs Asterisk knows about for a specific type to the capabilities structure.
Definition format_cap.c:216
struct ast_format * ast_format_cap_get_format(const struct ast_format_cap *cap, int position)
Get the format at a specific index.
Definition format_cap.c:400
@ AST_FORMAT_CAP_FLAG_DEFAULT
Definition format_cap.h:38
#define ast_format_cap_append(cap, format, framing)
Add format capability to capabilities structure.
Definition format_cap.h:99
#define ast_format_cap_alloc(flags)
Allocate a new ast_format_cap structure.
Definition format_cap.h:49
static const char name[]
Definition format_mp3.c:68
static int len(struct ast_channel *chan, const char *cmd, char *data, char *buf, size_t buflen)
static int uuid(struct ast_channel *chan, const char *cmd, char *data, char *buf, size_t len)
Definition func_uuid.c:52
ast_http_method
HTTP Request methods known by Asterisk.
Definition http.h:58
void ast_http_uri_unlink(struct ast_http_uri *urihandler)
Unregister a URI handler.
Definition http.c:721
void ast_http_error(struct ast_tcptls_session_instance *ser, int status, const char *title, const char *text)
Send HTTP error message and close socket.
Definition http.c:664
int ast_http_uri_link(struct ast_http_uri *urihandler)
Register a URI handler.
Definition http.c:689
int AST_OPTIONAL_API_NAME() ast_websocket_write(struct ast_websocket *session, enum ast_websocket_opcode opcode, char *payload, uint64_t payload_size)
Construct and transmit a WebSocket frame.
ast_websocket_status_code
Websocket Status Codes from RFC-6455.
@ AST_WEBSOCKET_STATUS_UNSUPPORTED_DATA
@ AST_WEBSOCKET_STATUS_NORMAL
@ AST_WEBSOCKET_STATUS_GOING_AWAY
@ AST_WEBSOCKET_STATUS_INTERNAL_ERROR
int AST_OPTIONAL_API_NAME() ast_websocket_server_add_protocol2(struct ast_websocket_server *server, struct ast_websocket_protocol *protocol)
Add a sub-protocol handler to the given server.
int AST_OPTIONAL_API_NAME() ast_websocket_write_string(struct ast_websocket *ws, const char *buf)
Construct and transmit a WebSocket frame containing string data.
struct ast_sockaddr *AST_OPTIONAL_API_NAME() ast_websocket_remote_address(struct ast_websocket *session)
Get the remote address for a WebSocket connected session.
int AST_OPTIONAL_API_NAME() ast_websocket_uri_cb(struct ast_tcptls_session_instance *ser, const struct ast_http_uri *urih, const char *uri, enum ast_http_method method, struct ast_variable *get_vars, struct ast_variable *headers)
Callback suitable for use with a ast_http_uri.
ast_websocket_result
Result code for a websocket client.
@ WS_OK
int AST_OPTIONAL_API_NAME() ast_websocket_fd(struct ast_websocket *session)
Get the file descriptor for a WebSocket session.
ast_websocket_opcode
WebSocket operation codes.
@ AST_WEBSOCKET_OPCODE_PING
@ AST_WEBSOCKET_OPCODE_PONG
@ AST_WEBSOCKET_OPCODE_BINARY
@ AST_WEBSOCKET_OPCODE_CLOSE
@ AST_WEBSOCKET_OPCODE_TEXT
const char *AST_OPTIONAL_API_NAME() ast_websocket_status_to_str(enum ast_websocket_status_code code)
Convert a websocket status code to a string.
ast_websocket_type
WebSocket connection/configuration types.
@ AST_WS_TYPE_CLIENT
@ AST_WS_TYPE_SERVER
struct ast_websocket_protocol *AST_OPTIONAL_API_NAME() ast_websocket_sub_protocol_alloc(const char *name)
Allocate a websocket sub-protocol instance.
int AST_OPTIONAL_API_NAME() ast_websocket_read(struct ast_websocket *session, char **payload, uint64_t *payload_len, enum ast_websocket_opcode *opcode, int *fragmented)
Read a WebSocket frame and handle it.
int AST_OPTIONAL_API_NAME() ast_websocket_close(struct ast_websocket *session, uint16_t reason)
Close a WebSocket session by sending a message with the CLOSE opcode and an optional code.
struct ast_websocket_server *AST_OPTIONAL_API_NAME() ast_websocket_server_create(void)
Creates a ast_websocket_server.
void AST_OPTIONAL_API_NAME() ast_websocket_unref(struct ast_websocket *session)
Decrease the reference count for a WebSocket session.
const char *AST_OPTIONAL_API_NAME() ast_websocket_result_to_str(enum ast_websocket_result result)
Convert a websocket result code to a string.
#define AST_APP_ARG(name)
Define an application argument.
#define END_OPTIONS
#define AST_APP_OPTIONS(holder, options...)
Declares an array of options for an application.
#define AST_APP_OPTION_ARG(option, flagno, argno)
Declares an application option that accepts an argument.
#define AST_DECLARE_APP_ARGS(name, arglist)
Declare a structure to hold an application's arguments.
#define BEGIN_OPTIONS
#define AST_APP_OPTION(option, flagno)
Declares an application option that does not accept an argument.
#define AST_NONSTANDARD_APP_ARGS(args, parse, sep)
Performs the 'nonstandard' argument separation process for an application.
int ast_app_parse_options(const struct ast_app_option *options, struct ast_flags *flags, char **args, char *optstr)
Parses a string containing application options and sets flags/arguments.
Definition main/app.c:3066
const char * ast_variable_find_in_list(const struct ast_variable *list, const char *variable)
Gets the value of a variable from a variable list by name.
#define ast_variable_new(name, value, filename)
#define ast_variable_list_append(head, new_var)
void ast_variables_destroy(struct ast_variable *var)
Free variable list.
Definition extconf.c:1260
#define ast_frisolate(fr)
Makes a frame independent of any static storage.
void ast_frame_free(struct ast_frame *frame, int cache)
Frees a frame or list of frames.
Definition main/frame.c:176
#define ast_frdup(fr)
Copies a frame.
#define ast_frfree(fr)
#define AST_MALLOCD_DATA
#define AST_FRAME_SET_BUFFER(fr, _base, _ofs, _datalen)
#define AST_FRIENDLY_OFFSET
Offset into a frame's data buffer.
@ AST_FRAME_CONTROL
@ AST_CONTROL_ANSWER
@ AST_CONTROL_OPTION
struct ast_frame ast_null_frame
Definition main/frame.c:79
#define ast_debug(level,...)
Log a DEBUG message.
#define LOG_ERROR
#define LOG_WARNING
#define ast_json_object_string_get(object, key)
Get a string field from a JSON object.
Definition json.h:600
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition json.c:73
struct ast_json * ast_json_load_buf(const char *buffer, size_t buflen, struct ast_json_error *error)
Parse buffer with known length into a JSON object or array.
Definition json.c:585
#define AST_LIST_INSERT_TAIL(head, elm, field)
Appends a list entry to the tail of a list.
#define AST_LIST_HEAD_INIT(head)
Initializes a list head structure.
#define AST_LIST_LOCK(head)
Locks a list.
Definition linkedlists.h:40
#define AST_LIST_REMOVE_HEAD(head, field)
Removes and returns the head entry from a list.
#define AST_LIST_UNLOCK(head)
Attempts to unlock a list.
#define SCOPED_AO2WRLOCK(varname, obj)
scoped lock specialization for ao2 write locks.
Definition lock.h:621
#define SCOPED_LOCK(varname, lock, lockfunc, unlockfunc)
Scoped Locks.
Definition lock.h:590
int errno
@ AST_MODFLAG_LOAD_ORDER
Definition module.h:331
#define AST_MODULE_INFO(keystr, flags_to_set, desc, fields...)
Definition module.h:557
@ AST_MODPRI_CHANNEL_DRIVER
Definition module.h:341
@ AST_MODULE_SUPPORT_CORE
Definition module.h:121
#define ASTERISK_GPL_KEY
The text the key() function should return.
Definition module.h:46
@ AST_MODULE_LOAD_SUCCESS
Definition module.h:70
@ AST_MODULE_LOAD_DECLINE
Module has failed to load, may be in an inconsistent state.
Definition module.h:78
static char * ast_sockaddr_stringify(const struct ast_sockaddr *addr)
Wrapper around ast_sockaddr_stringify_fmt() with default format.
Definition netsock2.h:256
int pbx_builtin_setvar_helper(struct ast_channel *chan, const char *name, const char *value)
Add a variable to the channel variable stack, removing the most recently set value for the same name.
static int reload(void)
const char * method
Definition res_pjsip.c:1273
static struct @522 args
#define NULL
Definition resample.c:96
#define ast_sorcery_unref(sorcery)
Decrease the reference count of a sorcery structure.
Definition sorcery.h:1500
#define ast_sorcery_object_field_register_nodoc(sorcery, type, name, default_val, opt_type, flags,...)
Register a field within an object without documentation.
Definition sorcery.h:987
#define ast_sorcery_register_cust(object, option, def_value)
Register a custom field within an object.
Definition sorcery.h:1767
void ast_sorcery_load(const struct ast_sorcery *sorcery)
Inform any wizards to load persistent objects.
Definition sorcery.c:1441
void * ast_sorcery_retrieve_by_id(const struct ast_sorcery *sorcery, const char *type, const char *id)
Retrieve an object using its unique identifier.
Definition sorcery.c:1917
#define ast_sorcery_object_register(sorcery, type, alloc, transform, apply)
Register an object type.
Definition sorcery.h:837
void ast_sorcery_reload(const struct ast_sorcery *sorcery)
Inform any wizards to reload persistent objects.
Definition sorcery.c:1472
void * ast_sorcery_generic_alloc(size_t size, ao2_destructor_fn destructor)
Allocate a generic sorcery capable object.
Definition sorcery.c:1792
#define ast_sorcery_apply_default(sorcery, type, name, data)
Definition sorcery.h:476
#define ast_sorcery_open()
Open a new sorcery structure.
Definition sorcery.h:406
int ast_strings_equal(const char *str1, const char *str2)
Compare strings for equality checking for NULL.
Definition strings.c:238
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition strings.h:65
char * ast_strip(char *s)
Strip leading/trailing whitespace from a string.
Definition strings.h:223
char * ast_strsep(char **s, const char sep, uint32_t flags)
Act like strsep but ignore separators inside quotes.
Definition utils.c:1871
Structure to pass both assignedid values to channel drivers.
Definition channel.h:606
struct ast_format_cap * capabilities
Definition channel.h:652
Main Channel structure associated with a channel.
const char * data
unsigned int sample_rate
Sample rate (number of samples carried in a second)
Definition codec.h:52
unsigned int minimum_bytes
Length in bytes of the data payload of a minimum_ms frame.
Definition codec.h:60
unsigned int default_ms
Default length of media carried (in milliseconds) in a frame.
Definition codec.h:58
int(* samples_count)(struct ast_frame *frame)
Retrieve the number of samples in a frame.
Definition codec.h:68
unsigned int minimum_ms
Minimum length of media that can be carried (in milliseconds) in a frame.
Definition codec.h:54
Structure used to handle boolean flags.
Definition utils.h:220
Format capabilities structure, holds formats + preference order + etc.
Definition format_cap.c:54
Definition of a media format.
Definition format.c:43
struct ast_format * format
Data structure associated with a single frame of data.
struct ast_frame_subclass subclass
enum ast_frame_type frametype
union ast_frame::@239 data
Definition of a URI handler.
Definition http.h:102
ast_http_callback callback
Definition http.h:107
void * data
Definition http.h:116
JSON parsing error information.
Definition json.h:887
Abstract JSON element (object, array, string, int, ...).
Socket address structure.
Definition netsock2.h:97
Full structure for sorcery.
Definition sorcery.c:231
describes a server instance
Definition tcptls.h:151
Structure for variables, used for configurations and for channel variables.
struct ast_variable * next
A websocket protocol implementation.
ast_websocket_callback session_established
Callback called when a new session is established. Mandatory.
Structure definition for session.
char connection_id[0]
The name of the module owning this sorcery instance.
enum webchan_control_msg_format control_msg_format
char connection_id[0]
struct ast_format * native_format
struct websocket_pvt::@145 frame_queue
struct ast_channel * channel
struct ast_codec * slin_codec
enum webchan_control_msg_format control_msg_format
enum ast_websocket_type type
struct ast_timer * timer
struct ast_format * slin_format
struct ast_frame silence
struct ast_codec * native_codec
struct ast_websocket_client * client
struct ast_websocket * websocket
struct ast_trans_pvt * translator
static struct aco_type global
static struct test_options options
void ast_timer_close(struct ast_timer *handle)
Close an opened timing handle.
Definition timing.c:154
int ast_timer_ack(const struct ast_timer *handle, unsigned int quantity)
Acknowledge a timer event.
Definition timing.c:171
int ast_timer_set_rate(const struct ast_timer *handle, unsigned int rate)
Set the timing tick rate.
Definition timing.c:166
enum ast_timer_event ast_timer_get_event(const struct ast_timer *handle)
Retrieve timing event.
Definition timing.c:186
struct ast_timer * ast_timer_open(void)
Open a timer.
Definition timing.c:122
@ AST_TIMING_EVENT_EXPIRED
Definition timing.h:58
int ast_timer_fd(const struct ast_timer *handle)
Get a poll()-able file descriptor for a timer.
Definition timing.c:161
struct ast_frame * ast_translate(struct ast_trans_pvt *tr, struct ast_frame *f, int consume)
translates one or more frames Apply an input frame into the translator and receive zero or one output...
Definition translate.c:566
void ast_translator_free_path(struct ast_trans_pvt *tr)
Frees a translator path Frees the given translator path structure.
Definition translate.c:476
struct ast_trans_pvt * ast_translator_build_path(struct ast_format *dest, struct ast_format *source)
Builds a translator path Build a path (possibly NULL) from source to dest.
Definition translate.c:486
#define ast_test_flag(p, flag)
Definition utils.h:64
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition utils.h:981
#define MIN(a, b)
Definition utils.h:252
#define ast_set_flag(p, flag)
Definition utils.h:71
int ast_uri_verify_encoded(const char *string)
Verify if a string is valid as a URI component.
Definition utils.c:781
#define AST_UUID_STR_LEN
Definition uuid.h:27
char * ast_uuid_generate_str(char *buf, size_t size)
Generate a UUID string.
Definition uuid.c:141
void ast_websocket_client_add_uri_params(struct ast_websocket_client *wc, const char *uri_params)
Add additional parameters to the URI.
struct ast_websocket_client * ast_websocket_client_retrieve_by_id(const char *id)
Retrieve a websocket client object by ID.
struct ast_websocket * ast_websocket_client_connect(struct ast_websocket_client *wc, void *lock_obj, const char *display_name, enum ast_websocket_result *result)
Connect to a websocket server using the configured authentication, retry and TLS options.

◆ FLUSH_MEDIA

#define FLUSH_MEDIA   "FLUSH_MEDIA"

Definition at line 124 of file chan_websocket.c.

◆ GET_DRIVER_STATUS

#define GET_DRIVER_STATUS   "GET_STATUS"

Definition at line 125 of file chan_websocket.c.

◆ HANGUP_CHANNEL

#define HANGUP_CHANNEL   "HANGUP"

Definition at line 120 of file chan_websocket.c.

◆ INCOMING_CONNECTION_ID

#define INCOMING_CONNECTION_ID   "INCOMING"

Definition at line 117 of file chan_websocket.c.

◆ MARK_MEDIA

#define MARK_MEDIA   "MARK_MEDIA"

Definition at line 123 of file chan_websocket.c.

◆ MAX_TEXT_MESSAGE_LEN

#define MAX_TEXT_MESSAGE_LEN   MIN(128, (AST_WEBSOCKET_MAX_RX_PAYLOAD_SIZE - 1))

Definition at line 133 of file chan_websocket.c.

◆ MEDIA_WEBSOCKET_CONNECTION_ID

#define MEDIA_WEBSOCKET_CONNECTION_ID   "MEDIA_WEBSOCKET_CONNECTION_ID"

Definition at line 116 of file chan_websocket.c.

◆ MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE

#define MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE   "MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE"

Definition at line 115 of file chan_websocket.c.

◆ PAUSE_MEDIA

#define PAUSE_MEDIA   "PAUSE_MEDIA"

Definition at line 127 of file chan_websocket.c.

◆ QUEUE_LENGTH_MAX

#define QUEUE_LENGTH_MAX   1000

Definition at line 130 of file chan_websocket.c.

◆ QUEUE_LENGTH_XOFF_LEVEL

#define QUEUE_LENGTH_XOFF_LEVEL   900

Definition at line 131 of file chan_websocket.c.

◆ QUEUE_LENGTH_XON_LEVEL

#define QUEUE_LENGTH_XON_LEVEL   800

Definition at line 132 of file chan_websocket.c.

◆ REPORT_QUEUE_DRAINED

#define REPORT_QUEUE_DRAINED   "REPORT_QUEUE_DRAINED"

Definition at line 126 of file chan_websocket.c.

◆ send_event

#define send_event (   _instance,
  _event,
  ... 
)

Use this macro to create and send events passing in any event-specific parameters.

Definition at line 429 of file chan_websocket.c.

430 { \
431 int _res = -1; \
432 char *_payload = _create_event_ ## _event(_instance, ##__VA_ARGS__); \
433 if (_payload && _instance->websocket) { \
434 _res = ast_websocket_write_string(_instance->websocket, _payload); \
435 if (_res != 0) { \
436 ast_log(LOG_ERROR, "%s: Unable to send event %s\n", \
437 ast_channel_name(instance->channel), _payload); \
438 } else { \
439 ast_debug(3, "%s: Sent %s\n", \
440 ast_channel_name(instance->channel), _payload); \
441 }\
442 ast_free(_payload); \
443 } \
444 (_res); \
445})

◆ START_MEDIA_BUFFERING

#define START_MEDIA_BUFFERING   "START_MEDIA_BUFFERING"

Definition at line 121 of file chan_websocket.c.

◆ STOP_MEDIA_BUFFERING

#define STOP_MEDIA_BUFFERING   "STOP_MEDIA_BUFFERING"

Definition at line 122 of file chan_websocket.c.

◆ websocket_request_hangup

#define websocket_request_hangup (   _instance,
  _cause,
  _tech 
)     _websocket_request_hangup(_instance, _cause, _tech, __LINE__, __FUNCTION__)

Definition at line 146 of file chan_websocket.c.

◆ WS_TIMER_FDNO

#define WS_TIMER_FDNO   (AST_EXTENDED_FDS + 1)

Definition at line 112 of file chan_websocket.c.

◆ WS_WEBSOCKET_FDNO

#define WS_WEBSOCKET_FDNO   (AST_EXTENDED_FDS + 2)

Definition at line 113 of file chan_websocket.c.

Enumeration Type Documentation

◆ anonymous enum

anonymous enum
Enumerator
OPT_WS_CODEC 
OPT_WS_NO_AUTO_ANSWER 
OPT_WS_URI_PARAM 
OPT_WS_PASSTHROUGH 
OPT_WS_MSG_FORMAT 

Definition at line 1577 of file chan_websocket.c.

1577 {
1578 OPT_WS_CODEC = (1 << 0),
1579 OPT_WS_NO_AUTO_ANSWER = (1 << 1),
1580 OPT_WS_URI_PARAM = (1 << 2),
1581 OPT_WS_PASSTHROUGH = (1 << 3),
1582 OPT_WS_MSG_FORMAT = (1 << 4),
1583};

◆ anonymous enum

anonymous enum
Enumerator
OPT_ARG_WS_CODEC 
OPT_ARG_WS_NO_AUTO_ANSWER 
OPT_ARG_WS_URI_PARAM 
OPT_ARG_WS_PASSTHROUGH 
OPT_ARG_WS_MSG_FORMAT 
OPT_ARG_ARRAY_SIZE 

Definition at line 1585 of file chan_websocket.c.

◆ webchan_control_msg_format

Enumerator
WEBCHAN_CONTROL_MSG_FORMAT_PLAIN 
WEBCHAN_CONTROL_MSG_FORMAT_JSON 
WEBCHAN_CONTROL_MSG_FORMAT_INVALID 

Definition at line 56 of file chan_websocket.c.

Function Documentation

◆ __reg_module()

static void __reg_module ( void  )
static

Definition at line 2151 of file chan_websocket.c.

◆ __unreg_module()

static void __unreg_module ( void  )
static

Definition at line 2151 of file chan_websocket.c.

◆ _create_event_DTMF_END()

static char * _create_event_DTMF_END ( struct websocket_pvt instance,
const char  digit 
)
static

Definition at line 315 of file chan_websocket.c.

317{
318 char *payload = NULL;
320 struct ast_json *msg = ast_json_pack("{s:s, s:s, s:s#}",
321 "event", "DTMF_END",
322 "channel_id", ast_channel_uniqueid(instance->channel),
323 "digit", &digit, 1
324 );
325 if (!msg) {
326 return NULL;
327 }
329 ast_json_unref(msg);
330 } else {
331 ast_asprintf(&payload, "%s digit:%c channel_id:%s",
332 "DTMF_END", digit, ast_channel_uniqueid(instance->channel));
333 }
334
335 return payload;
336}
const char * ast_channel_uniqueid(const struct ast_channel *chan)
struct ast_json * ast_json_pack(char const *format,...)
Helper for creating complex JSON values.
Definition json.c:612
@ AST_JSON_COMPACT
Definition json.h:793
char * ast_json_dump_string_format(struct ast_json *root, enum ast_json_encoding_format format)
Encode a JSON value to a string.
Definition json.c:484

References ast_asprintf, ast_channel_uniqueid(), AST_JSON_COMPACT, ast_json_dump_string_format(), ast_json_pack(), ast_json_unref(), websocket_pvt::channel, websocket_pvt::control_msg_format, digit, NULL, and WEBCHAN_CONTROL_MSG_FORMAT_JSON.

◆ _create_event_ERROR()

static char * _create_event_ERROR ( struct websocket_pvt instance,
const char *  format,
  ... 
)
static

Definition at line 383 of file chan_websocket.c.

385{
386 char *payload = NULL;
387 char *error_text = NULL;
388 va_list ap;
389 int res = 0;
390
391 va_start(ap, format);
392 res = ast_vasprintf(&error_text, format, ap);
393 va_end(ap);
394 if (res < 0 || !error_text) {
395 return NULL;
396 }
397
399 struct ast_json *msg = ast_json_pack("{s:s, s:s, s:s}",
400 "event", "ERROR",
401 "channel_id", ast_channel_uniqueid(instance->channel),
402 "error_text", error_text);
403 ast_free(error_text);
404 if (!msg) {
405 return NULL;
406 }
408 ast_json_unref(msg);
409 } else {
410 ast_asprintf(&payload, "%s channel_id:%s error_text:%s",
411 "ERROR", ast_channel_uniqueid(instance->channel), error_text);
412 ast_free(error_text);
413 }
414
415 return payload;
416}
#define ast_vasprintf(ret, fmt, ap)
A wrapper for vasprintf()
Definition astmm.h:278

References ast_asprintf, ast_channel_uniqueid(), ast_free, AST_JSON_COMPACT, ast_json_dump_string_format(), ast_json_pack(), ast_json_unref(), ast_vasprintf, NULL, and WEBCHAN_CONTROL_MSG_FORMAT_JSON.

◆ _create_event_MEDIA_BUFFERING_COMPLETED()

static char * _create_event_MEDIA_BUFFERING_COMPLETED ( struct websocket_pvt instance,
const char *  id 
)
static

Definition at line 255 of file chan_websocket.c.

257{
258 char *payload = NULL;
260 struct ast_json *msg = ast_json_pack("{s:s, s:s, s:s}",
261 "event", "MEDIA_BUFFERING_COMPLETED",
262 "channel_id", ast_channel_uniqueid(instance->channel),
263 "correlation_id", S_OR(id, "")
264 );
265 if (!msg) {
266 return NULL;
267 }
269 ast_json_unref(msg);
270 } else {
271 ast_asprintf(&payload, "%s%s%s",
272 "MEDIA_BUFFERING_COMPLETED",
273 S_COR(id, " ",""), S_OR(id, ""));
274
275 }
276
277 return payload;
278}
#define S_OR(a, b)
returns the equivalent of logic or for strings: first one if not empty, otherwise second one.
Definition strings.h:80
#define S_COR(a, b, c)
returns the equivalent of logic or for strings, with an additional boolean check: second one if not e...
Definition strings.h:87

References ast_asprintf, ast_channel_uniqueid(), AST_JSON_COMPACT, ast_json_dump_string_format(), ast_json_pack(), ast_json_unref(), websocket_pvt::channel, websocket_pvt::control_msg_format, NULL, S_COR, S_OR, and WEBCHAN_CONTROL_MSG_FORMAT_JSON.

◆ _create_event_MEDIA_MARK_PROCESSED()

static char * _create_event_MEDIA_MARK_PROCESSED ( struct websocket_pvt instance,
const char *  id 
)
static

Definition at line 285 of file chan_websocket.c.

287{
288 char *payload = NULL;
290 struct ast_json *msg = ast_json_pack("{s:s, s:s, s:s}",
291 "event", "MEDIA_MARK_PROCESSED",
292 "channel_id", ast_channel_uniqueid(instance->channel),
293 "correlation_id", S_OR(id, "")
294 );
295 if (!msg) {
296 return NULL;
297 }
299 ast_json_unref(msg);
300 } else {
301 ast_asprintf(&payload, "%s%s%s",
302 "MEDIA_MARK_PROCESSED",
303 S_COR(id, " ",""), S_OR(id, ""));
304
305 }
306
307 return payload;
308}

References ast_asprintf, ast_channel_uniqueid(), AST_JSON_COMPACT, ast_json_dump_string_format(), ast_json_pack(), ast_json_unref(), websocket_pvt::channel, websocket_pvt::control_msg_format, NULL, S_COR, S_OR, and WEBCHAN_CONTROL_MSG_FORMAT_JSON.

◆ _create_event_MEDIA_START()

static char * _create_event_MEDIA_START ( struct websocket_pvt instance)
static

Definition at line 214 of file chan_websocket.c.

215{
216 char *payload = NULL;
217
219 struct ast_json *msg = ast_json_pack("{s:s, s:s, s:s, s:s, s:s, s:i, s:i, s:o }",
220 "event", "MEDIA_START",
221 "connection_id", instance->connection_id,
222 "channel", ast_channel_name(instance->channel),
223 "channel_id", ast_channel_uniqueid(instance->channel),
224 "format", ast_format_get_name(instance->native_format),
225 "optimal_frame_size", instance->optimal_frame_size,
226 "ptime", instance->native_codec->default_ms,
227 "channel_variables", ast_json_channel_vars(ast_channel_varshead(
228 instance->channel))
229 );
230 if (!msg) {
231 return NULL;
232 }
234 ast_json_unref(msg);
235 } else {
236 ast_asprintf(&payload, "%s %s:%s %s:%s %s:%s %s:%s %s:%d %s:%d",
237 "MEDIA_START",
238 "connection_id", instance->connection_id,
239 "channel", ast_channel_name(instance->channel),
240 "channel_id", ast_channel_uniqueid(instance->channel),
241 "format", ast_format_get_name(instance->native_format),
242 "optimal_frame_size", instance->optimal_frame_size,
243 "ptime", instance->native_codec->default_ms
244 );
245 }
246
247 return payload;
248}
struct varshead * ast_channel_varshead(struct ast_channel *chan)
struct ast_json * ast_json_channel_vars(struct varshead *channelvars)
Construct a JSON object from a ast_var_t list.
Definition json.c:941

References ast_asprintf, ast_channel_name(), ast_channel_uniqueid(), ast_channel_varshead(), ast_format_get_name(), ast_json_channel_vars(), AST_JSON_COMPACT, ast_json_dump_string_format(), ast_json_pack(), ast_json_unref(), websocket_pvt::channel, websocket_pvt::connection_id, websocket_pvt::control_msg_format, ast_codec::default_ms, websocket_pvt::native_codec, websocket_pvt::native_format, NULL, websocket_pvt::optimal_frame_size, and WEBCHAN_CONTROL_MSG_FORMAT_JSON.

◆ _create_event_nodata()

static char * _create_event_nodata ( struct websocket_pvt instance,
char *  event 
)
static

Definition at line 186 of file chan_websocket.c.

187{
188 char *payload = NULL;
190 struct ast_json * msg = ast_json_pack("{ s:s s:s }",
191 "event", event,
192 "channel_id", ast_channel_uniqueid(instance->channel));
193 if (!msg) {
194 return NULL;
195 }
197 ast_json_unref(msg);
198 } else {
199 payload = ast_strdup(event);
200 }
201
202 return payload;
203}

References ast_channel_uniqueid(), AST_JSON_COMPACT, ast_json_dump_string_format(), ast_json_pack(), ast_json_unref(), ast_strdup, websocket_pvt::channel, websocket_pvt::control_msg_format, NULL, and WEBCHAN_CONTROL_MSG_FORMAT_JSON.

◆ _create_event_STATUS()

static char * _create_event_STATUS ( struct websocket_pvt instance)
static

Definition at line 343 of file chan_websocket.c.

344{
345 char *payload = NULL;
346
348 struct ast_json *msg = ast_json_pack("{s:s, s:s, s:i, s:i, s:i, s:b, s:b, s:b }",
349 "event", "STATUS",
350 "channel_id", ast_channel_uniqueid(instance->channel),
351 "queue_length", instance->frame_queue_length,
352 "xon_level", QUEUE_LENGTH_XON_LEVEL,
353 "xoff_level", QUEUE_LENGTH_XOFF_LEVEL,
354 "queue_full", instance->queue_full,
355 "bulk_media", instance->bulk_media_in_progress,
356 "media_paused", instance->queue_paused
357 );
358 if (!msg) {
359 return NULL;
360 }
362 ast_json_unref(msg);
363 } else {
364 ast_asprintf(&payload, "%s channel_id:%s queue_length:%d xon_level:%d xoff_level:%d queue_full:%s bulk_media:%s media_paused:%s",
365 "STATUS",
366 ast_channel_uniqueid(instance->channel),
369 S_COR(instance->queue_full, "true", "false"),
370 S_COR(instance->bulk_media_in_progress, "true", "false"),
371 S_COR(instance->queue_paused, "true", "false")
372 );
373 }
374
375 return payload;
376}

References ast_asprintf, ast_channel_uniqueid(), AST_JSON_COMPACT, ast_json_dump_string_format(), ast_json_pack(), ast_json_unref(), websocket_pvt::bulk_media_in_progress, websocket_pvt::channel, websocket_pvt::control_msg_format, websocket_pvt::frame_queue_length, NULL, websocket_pvt::queue_full, QUEUE_LENGTH_XOFF_LEVEL, QUEUE_LENGTH_XON_LEVEL, websocket_pvt::queue_paused, S_COR, and WEBCHAN_CONTROL_MSG_FORMAT_JSON.

◆ _websocket_request_hangup()

static void _websocket_request_hangup ( struct websocket_pvt instance,
int  ast_cause,
enum ast_websocket_status_code  tech_cause,
int  line,
const char *  function 
)
static

Definition at line 1180 of file chan_websocket.c.

1182{
1183 if (!instance || !instance->channel) {
1184 return;
1185 }
1186 ast_debug(3, "%s:%s: Hangup requested from %s line %d. cause: %s(%d) tech_cause: %s(%d)",
1187 ast_channel_name(instance->channel), instance->remote_addr,
1188 function, line,
1189 ast_cause2str(ast_cause), ast_cause, ast_websocket_status_to_str(tech_cause), tech_cause);
1190
1191 if (tech_cause) {
1192 ast_channel_tech_hangupcause_set(instance->channel, tech_cause);
1193 }
1194 ast_queue_hangup_with_cause(instance->channel, ast_cause);
1195}

References ast_cause2str(), ast_channel_name(), ast_channel_tech_hangupcause_set(), ast_debug, ast_queue_hangup_with_cause(), ast_websocket_status_to_str(), websocket_pvt::channel, and websocket_pvt::remote_addr.

◆ AST_MODULE_SELF_SYM()

struct ast_module * AST_MODULE_SELF_SYM ( void  )

Definition at line 2151 of file chan_websocket.c.

◆ control_msg_format_from_str()

static enum webchan_control_msg_format control_msg_format_from_str ( const char *  value)
static

◆ control_msg_format_to_str()

static const char * control_msg_format_to_str ( enum webchan_control_msg_format  value)
static

Definition at line 173 of file chan_websocket.c.

174{
176 return NULL;
177 }
178 return msg_format_map[value];
179}
#define ARRAY_IN_BOUNDS(v, a)
Checks to see if value is within the bounds of the given array.
Definition utils.h:727

References ARRAY_IN_BOUNDS, msg_format_map, NULL, and value.

Referenced by global_apply(), and global_control_message_format_to_str().

◆ dequeue_frame()

static struct ast_frame * dequeue_frame ( struct websocket_pvt instance)
static

Definition at line 464 of file chan_websocket.c.

465{
466 struct ast_frame *queued_frame = NULL;
467 SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK,
469
470 /*
471 * If the queue is paused, don't read a frame. Processing
472 * will continue down the function and a silence frame will
473 * be sent in its place.
474 */
475 if (instance->queue_paused) {
476 return NULL;
477 }
478
479 /*
480 * We need to check if we need to send an XON before anything
481 * else because there are multiple escape paths in this function
482 * and we don't want to accidentally keep the queue in a "full"
483 * state.
484 */
485 if (instance->queue_full && instance->frame_queue_length < QUEUE_LENGTH_XON_LEVEL) {
486 instance->queue_full = 0;
487 ast_debug(4, "%s: WebSocket sending MEDIA_XON\n",
488 ast_channel_name(instance->channel));
489 send_event(instance, MEDIA_XON);
490 }
491
492 queued_frame = AST_LIST_REMOVE_HEAD(&instance->frame_queue, frame_list);
493
494 /*
495 * If there are no frames in the queue, we need to
496 * return NULL so we can send a silence frame. We also need
497 * to send the QUEUE_DRAINED notification if we were requested
498 * to do so.
499 */
500 if (!queued_frame) {
501 if (instance->report_queue_drained) {
502 instance->report_queue_drained = 0;
503 ast_debug(4, "%s: WebSocket sending QUEUE_DRAINED\n",
504 ast_channel_name(instance->channel));
505 send_event(instance, QUEUE_DRAINED);
506 }
507 return NULL;
508 }
509
510 /*
511 * The only way a control frame could be present here is as
512 * a result of us calling queue_option_frame() in response
513 * to an incoming TEXT command from the websocket.
514 * We'll be safe and make sure it's a AST_CONTROL_OPTION
515 * frame anyway.
516 *
517 * It's quite possible that there are multiple control frames
518 * in a row in the queue so we need to process consecutive ones
519 * immediately.
520 *
521 * In any case, processing a control frame MUST not use up
522 * a media timeslot so after all control frames have been
523 * processed, we need to read an audio frame and process it.
524 */
525 while (queued_frame && queued_frame->frametype == AST_FRAME_CONTROL) {
526 if (queued_frame->subclass.integer == AST_CONTROL_OPTION) {
527 /*
528 * We just need to send the data to the websocket.
529 * The data should already be NULL terminated.
530 */
532 queued_frame->data.ptr);
533 ast_debug(4, "%s: Sent %s\n",
534 ast_channel_name(instance->channel), (char *)queued_frame->data.ptr);
535 }
536 /*
537 * We do NOT send these to the core so we need to free
538 * the frame and grab the next one. If it's also a
539 * control frame, we need to process it otherwise
540 * continue down in the function.
541 */
542 ast_frame_free(queued_frame, 0);
543 queued_frame = AST_LIST_REMOVE_HEAD(&instance->frame_queue, frame_list);
544 /*
545 * Jut FYI... We didn't bump the queue length when we added the control
546 * frames so we don't need to decrement it here.
547 */
548 }
549
550 /*
551 * If, after reading all control frames, there are no frames
552 * left in the queue, we need to return NULL so we can send
553 * a silence frame.
554 */
555 if (!queued_frame) {
556 return NULL;
557 }
558
559 instance->frame_queue_length--;
560
561 return queued_frame;
562}

References ast_channel_name(), AST_CONTROL_OPTION, ast_debug, AST_FRAME_CONTROL, ast_frame_free(), AST_LIST_LOCK, AST_LIST_REMOVE_HEAD, AST_LIST_UNLOCK, ast_websocket_write_string(), websocket_pvt::channel, ast_frame::data, websocket_pvt::frame_queue, websocket_pvt::frame_queue_length, ast_frame::frametype, ast_frame_subclass::integer, NULL, ast_frame::ptr, websocket_pvt::queue_full, QUEUE_LENGTH_XON_LEVEL, websocket_pvt::queue_paused, websocket_pvt::report_queue_drained, SCOPED_LOCK, send_event, ast_frame::subclass, and websocket_pvt::websocket.

Referenced by webchan_read().

◆ global_alloc()

static void * global_alloc ( const char *  name)
static

Definition at line 2016 of file chan_websocket.c.

2017{
2019 sizeof(*cfg), NULL);
2020
2021 if (!cfg) {
2022 return NULL;
2023 }
2024
2025 return cfg;
2026}

References ast_sorcery_generic_alloc(), and NULL.

Referenced by load_config().

◆ global_apply()

static int global_apply ( const struct ast_sorcery sorcery,
void *  obj 
)
static

Definition at line 2028 of file chan_websocket.c.

2029{
2030 struct webchan_conf_global *cfg = obj;
2031
2032 ast_debug(1, "control_msg_format: %s\n",
2034
2035 return 0;
2036}

References ast_debug, webchan_conf_global::control_msg_format, and control_msg_format_to_str().

Referenced by load_config().

◆ global_control_message_format_from_str()

static int global_control_message_format_from_str ( const struct aco_option opt,
struct ast_variable var,
void *  obj 
)
static

Definition at line 1990 of file chan_websocket.c.

1992{
1993 struct webchan_conf_global *cfg = obj;
1994
1996
1998 ast_log(LOG_ERROR, "chan_websocket.conf: Invalid value '%s' for "
1999 "control_mesage_format. Must be 'plain-text' or 'json'\n",
2000 var->value);
2001 return -1;
2002 }
2003
2004 return 0;
2005}

References ast_log, webchan_conf_global::control_msg_format, control_msg_format_from_str(), LOG_ERROR, var, and WEBCHAN_CONTROL_MSG_FORMAT_INVALID.

◆ global_control_message_format_to_str()

static int global_control_message_format_to_str ( const void *  obj,
const intptr_t *  args,
char **  buf 
)
static

Definition at line 2007 of file chan_websocket.c.

2008{
2009 const struct webchan_conf_global *cfg = obj;
2010
2012
2013 return 0;
2014}

References ast_strdup, buf, webchan_conf_global::control_msg_format, and control_msg_format_to_str().

◆ handle_command()

static int handle_command ( struct websocket_pvt instance,
char *  buffer 
)
static

Definition at line 758 of file chan_websocket.c.

759{
760 int res = 0;
761 RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
762 const char *command = NULL;
763 char *data = NULL;
764
766 struct ast_json_error json_error;
767
768 json = ast_json_load_buf(buffer, strlen(buffer), &json_error);
769 if (!json) {
770 send_event(instance, ERROR, "Unable to parse JSON command");
771 return -1;
772 }
773 command = ast_json_object_string_get(json, "command");
774 } else {
775 command = buffer;
776 data = strchr(buffer, ' ');
777 if (data) {
778 *data = '\0';
779 data++;
780 }
781 }
782
783 if (ast_strings_equal(command, ANSWER_CHANNEL)) {
785
786 } else if (ast_strings_equal(command, HANGUP_CHANNEL)) {
788
789 } else if (ast_strings_equal(command, START_MEDIA_BUFFERING)) {
790 if (instance->passthrough) {
791 send_event(instance, ERROR, "%s not supported in passthrough mode", command);
792 ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
793 ast_channel_name(instance->channel), command);
794 return 0;
795 }
796 AST_LIST_LOCK(&instance->frame_queue);
797 instance->bulk_media_in_progress = 1;
798 AST_LIST_UNLOCK(&instance->frame_queue);
799
800 } else if (ast_strings_equal(command, STOP_MEDIA_BUFFERING)) {
801 const char *id;
802 char *option;
803 SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK,
805
807 id = ast_json_object_string_get(json, "correlation_id");
808 } else {
809 id = data;
810 }
811
812 if (instance->passthrough) {
813 send_event(instance, ERROR, "%s not supported in passthrough mode", command);
814 ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
815 ast_channel_name(instance->channel), command);
816 return 0;
817 }
818
819 ast_debug(4, "%s: WebSocket %s '%s' with %d bytes in leftover_data.\n",
821 (int)instance->leftover_len);
822
823 instance->bulk_media_in_progress = 0;
824 if (instance->leftover_len > 0) {
825 res = queue_frame_from_buffer(instance, instance->leftover_data, instance->leftover_len);
826 if (res != 0) {
827 return res;
828 }
829 }
830 instance->leftover_len = 0;
831 option = create_event(instance, MEDIA_BUFFERING_COMPLETED, id);
832 if (!option) {
833 return -1;
834 }
835 res = queue_option_frame(instance, option);
836 ast_free(option);
837
838 } else if (ast_strings_equal(command, MARK_MEDIA)) {
839 const char *id;
840 char *option;
841 SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK,
843
844 if (instance->passthrough) {
845 send_event(instance, ERROR, "%s not supported in passthrough mode", command);
846 ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
847 ast_channel_name(instance->channel), command);
848 return 0;
849 }
850
852 id = ast_json_object_string_get(json, "correlation_id");
853 } else {
854 id = data;
855 }
856
857 ast_debug(4, "%s: %s %s\n",
858 ast_channel_name(instance->channel), MARK_MEDIA, id);
859
860 option = create_event(instance, MEDIA_MARK_PROCESSED, id);
861 if (!option) {
862 return -1;
863 }
864 res = queue_option_frame(instance, option);
865 ast_free(option);
866
867 } else if (ast_strings_equal(command, FLUSH_MEDIA)) {
868 struct ast_frame *frame = NULL;
869
870 if (instance->passthrough) {
871 send_event(instance, ERROR, "FLUSH_MEDIA not supported in passthrough mode");
872 ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
873 ast_channel_name(instance->channel), command);
874 return 0;
875 }
876
877 AST_LIST_LOCK(&instance->frame_queue);
878 while ((frame = AST_LIST_REMOVE_HEAD(&instance->frame_queue, frame_list))) {
879 ast_frfree(frame);
880 }
881 instance->frame_queue_length = 0;
882 instance->bulk_media_in_progress = 0;
883 instance->leftover_len = 0;
884 AST_LIST_UNLOCK(&instance->frame_queue);
885
886 } else if (ast_strings_equal(command, REPORT_QUEUE_DRAINED)) {
887 if (instance->passthrough) {
888 send_event(instance, ERROR, "%s not supported in passthrough mode", command);
889 ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
890 ast_channel_name(instance->channel), command);
891 return 0;
892 }
893
894 AST_LIST_LOCK(&instance->frame_queue);
895 instance->report_queue_drained = 1;
896 AST_LIST_UNLOCK(&instance->frame_queue);
897
898 } else if (ast_strings_equal(command, GET_DRIVER_STATUS)) {
899 return send_event(instance, STATUS);
900
901 } else if (ast_strings_equal(command, PAUSE_MEDIA)) {
902 if (instance->passthrough) {
903 send_event(instance, ERROR, "%s not supported in passthrough mode", command);
904 ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
905 ast_channel_name(instance->channel), command);
906 return 0;
907 }
908 AST_LIST_LOCK(&instance->frame_queue);
909 instance->queue_paused = 1;
910 AST_LIST_UNLOCK(&instance->frame_queue);
911
912 } else if (ast_strings_equal(command, CONTINUE_MEDIA)) {
913 if (instance->passthrough) {
914 send_event(instance, ERROR, "%s not supported in passthrough mode", command);
915 ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
916 ast_channel_name(instance->channel), command);
917 return 0;
918 }
919 AST_LIST_LOCK(&instance->frame_queue);
920 instance->queue_paused = 0;
921 AST_LIST_UNLOCK(&instance->frame_queue);
922
923 } else {
924 ast_log(LOG_WARNING, "%s: WebSocket %s command unknown\n",
925 ast_channel_name(instance->channel), command);
926 }
927
928 return res;
929}

References ANSWER_CHANNEL, AST_CAUSE_NORMAL, ast_channel_name(), AST_CONTROL_ANSWER, ast_debug, ast_free, ast_frfree, ast_json_load_buf(), ast_json_object_string_get, ast_json_unref(), AST_LIST_LOCK, AST_LIST_REMOVE_HEAD, AST_LIST_UNLOCK, ast_log, ast_queue_control(), ast_strings_equal(), AST_WEBSOCKET_STATUS_NORMAL, websocket_pvt::bulk_media_in_progress, websocket_pvt::channel, CONTINUE_MEDIA, websocket_pvt::control_msg_format, create_event, ast_frame::data, FLUSH_MEDIA, websocket_pvt::frame_queue, websocket_pvt::frame_queue_length, GET_DRIVER_STATUS, HANGUP_CHANNEL, id, websocket_pvt::leftover_data, websocket_pvt::leftover_len, LOG_WARNING, MARK_MEDIA, NULL, websocket_pvt::passthrough, PAUSE_MEDIA, queue_frame_from_buffer(), queue_option_frame(), websocket_pvt::queue_paused, RAII_VAR, websocket_pvt::report_queue_drained, REPORT_QUEUE_DRAINED, SCOPED_LOCK, send_event, START_MEDIA_BUFFERING, STOP_MEDIA_BUFFERING, WEBCHAN_CONTROL_MSG_FORMAT_JSON, and websocket_request_hangup.

Referenced by process_text_message().

◆ incoming_ws_established_cb()

static void incoming_ws_established_cb ( struct ast_websocket ast_ws_session,
struct ast_variable get_params,
struct ast_variable upgrade_headers 
)
static

Definition at line 1839 of file chan_websocket.c.

1841{
1842 RAII_VAR(struct ast_websocket *, s, ast_ws_session, ast_websocket_unref);
1843 struct ast_variable *v;
1844 const char *connection_id = NULL;
1845 struct websocket_pvt *instance = NULL;
1846
1847 ast_debug(3, "WebSocket established\n");
1848
1849 for (v = upgrade_headers; v; v = v->next) {
1850 ast_debug(4, "Header-> %s: %s\n", v->name, v->value);
1851 }
1852 for (v = get_params; v; v = v->next) {
1853 ast_debug(4, " Param-> %s: %s\n", v->name, v->value);
1854 }
1855
1856 connection_id = ast_variable_find_in_list(get_params, "CONNECTION_ID");
1857 if (!connection_id) {
1858 /*
1859 * This can't really happen because websocket_http_callback won't
1860 * let it get this far if it can't add the connection_id to the
1861 * get_params.
1862 * Just in case though...
1863 */
1864 ast_log(LOG_WARNING, "WebSocket connection id not found\n");
1867 return;
1868 }
1869
1871 if (!instance) {
1872 /*
1873 * This also can't really happen because websocket_http_callback won't
1874 * let it get this far if it can't find the instance.
1875 * Just in case though...
1876 */
1877 ast_log(LOG_WARNING, "%s: WebSocket instance not found\n", connection_id);
1880 return;
1881 }
1882 instance->websocket = ao2_bump(ast_ws_session);
1883
1885 ao2_cleanup(instance);
1886 /*
1887 * The instance is the channel's responsibility now.
1888 * We just return here.
1889 */
1890}

References ao2_bump, ao2_cleanup, ao2_weakproxy_find, AST_CAUSE_FAILURE, ast_debug, ast_log, ast_variable_find_in_list(), ast_websocket_close(), AST_WEBSOCKET_STATUS_INTERNAL_ERROR, ast_websocket_unref(), websocket_pvt::connection_id, instances, LOG_WARNING, ast_variable::name, ast_variable::next, NULL, OBJ_NOLOCK, OBJ_SEARCH_KEY, RAII_VAR, ast_variable::value, websocket_pvt::websocket, websocket_handoff_to_channel(), and websocket_request_hangup.

Referenced by load_module().

◆ incoming_ws_http_callback()

static int incoming_ws_http_callback ( struct ast_tcptls_session_instance ser,
const struct ast_http_uri urih,
const char *  uri,
enum ast_http_method  method,
struct ast_variable get_params,
struct ast_variable headers 
)
static

Definition at line 1902 of file chan_websocket.c.

1906{
1907 struct ast_http_uri fake_urih = {
1909 };
1910 int res = 0;
1911 /*
1912 * Normally the http server will destroy the get_params
1913 * when the session ends but if there weren't any initially
1914 * and we create some and add them to the list, the http server
1915 * won't know about it so we have to destroy it ourselves.
1916 */
1917 int destroy_get_params = (get_params == NULL);
1918 struct ast_variable *v = NULL;
1919 RAII_VAR(struct websocket_pvt *, instance, NULL, ao2_cleanup);
1920
1921 ast_debug(2, "URI: %s Starting\n", uri);
1922
1923 /*
1924 * The client will have issued the GET request with a URI of
1925 * /media/<connection_id>
1926 *
1927 * Since this callback is registered for the /media URI prefix the
1928 * http server will strip that off the front of the URI passing in
1929 * only the path components after that in the 'uri' parameter.
1930 * This should leave only the connection id without a leading '/'.
1931 */
1932 instance = ao2_weakproxy_find(instances, uri, OBJ_SEARCH_KEY | OBJ_NOLOCK, "");
1933 if (!instance) {
1934 ast_log(LOG_WARNING, "%s: WebSocket instance not found\n", uri);
1935 ast_http_error(ser, 404, "Not found", "WebSocket instance not found");
1936 return -1;
1937 }
1938
1939 /*
1940 * We don't allow additional connections using the same connection id.
1941 */
1942 if (instance->websocket) {
1943 ast_log(LOG_WARNING, "%s: Websocket already connected for channel '%s'\n",
1944 uri, instance->channel ? ast_channel_name(instance->channel) : "unknown");
1945 ast_http_error(ser, 409, "Conflict", "Another websocket connection exists for this connection id");
1946 return -1;
1947 }
1948
1949 v = ast_variable_new("CONNECTION_ID", uri, "");
1950 if (!v) {
1951 ast_http_error(ser, 500, "Server error", "");
1952 return -1;
1953 }
1954 ast_variable_list_append(&get_params, v);
1955
1956 for (v = get_params; v; v = v->next) {
1957 ast_debug(4, " Param-> %s: %s\n", v->name, v->value);
1958 }
1959
1960 /*
1961 * This will ultimately call internal_ws_established_cb() so
1962 * this function will block until the websocket is closed and
1963 * internal_ws_established_cb() returns;
1964 */
1965 res = ast_websocket_uri_cb(ser, &fake_urih, uri, method,
1966 get_params, headers);
1967 if (destroy_get_params) {
1968 ast_variables_destroy(get_params);
1969 }
1970
1971 ast_debug(2, "URI: %s DONE\n", uri);
1972
1973 return res;
1974}

References ao2_cleanup, ao2_weakproxy_find, ast_channel_name(), ast_debug, ast_http_error(), ast_log, ast_variable_list_append, ast_variable_new, ast_variables_destroy(), ast_websocket_uri_cb(), ast_ws_server, ast_http_uri::data, instances, LOG_WARNING, method, ast_variable::name, ast_variable::next, NULL, OBJ_NOLOCK, OBJ_SEARCH_KEY, RAII_VAR, and ast_variable::value.

◆ instance_proxy_cb()

static void instance_proxy_cb ( void *  weakproxy,
void *  data 
)
static

Definition at line 1347 of file chan_websocket.c.

1348{
1349 struct instance_proxy *proxy = weakproxy;
1350 ast_debug(3, "%s: WebSocket instance removed from instances\n", proxy->connection_id);
1351 ao2_unlink(instances, weakproxy);
1352}

References ao2_unlink, ast_debug, instance_proxy::connection_id, and instances.

Referenced by websocket_new().

◆ load_config()

static int load_config ( void  )
static

Definition at line 2038 of file chan_websocket.c.

2039{
2040 ast_debug(2, "Initializing Websocket Client Configuration\n");
2042 if (!sorcery) {
2043 ast_log(LOG_ERROR, "Failed to open sorcery\n");
2044 return -1;
2045 }
2046
2047 ast_sorcery_apply_default(sorcery, "global", "config",
2048 "chan_websocket.conf,criteria=type=global,single_object=yes,explicit_name=global");
2049
2051 ast_log(LOG_ERROR, "Failed to register chan_websocket global object with sorcery\n");
2053 sorcery = NULL;
2054 return -1;
2055 }
2056
2057 ast_sorcery_object_field_register_nodoc(sorcery, "global", "type", "", OPT_NOOP_T, 0, 0);
2058 ast_sorcery_register_cust(global, control_message_format, "plain-text");
2059
2061
2062 return 0;
2063}

References ast_debug, ast_log, ast_sorcery_apply_default, ast_sorcery_load(), ast_sorcery_object_field_register_nodoc, ast_sorcery_object_register, ast_sorcery_open, ast_sorcery_register_cust, ast_sorcery_unref, global, global_alloc(), global_apply(), LOG_ERROR, NULL, OPT_NOOP_T, and sorcery.

Referenced by load_module().

◆ load_module()

static int load_module ( void  )
static

Function called when our module is loaded.

Definition at line 2094 of file chan_websocket.c.

2095{
2096 int res = 0;
2097 struct ast_websocket_protocol *protocol;
2098
2099 res = load_config();
2100 if (res != 0) {
2102 }
2103
2106 }
2107
2110 ast_log(LOG_ERROR, "Unable to register channel class 'WebSocket'\n");
2111 unload_module();
2113 }
2114
2116 AO2_CONTAINER_ALLOC_OPT_DUPS_REPLACE, 17, instance_proxy_hash_fn,
2117 instance_proxy_sort_fn, instance_proxy_cmp_fn);
2118 if (!instances) {
2120 "Failed to allocate the chan_websocket instance registry\n");
2121 unload_module();
2123 }
2124
2126 if (!ast_ws_server) {
2127 unload_module();
2129 }
2130
2131 protocol = ast_websocket_sub_protocol_alloc("media");
2132 if (!protocol) {
2133 unload_module();
2135 }
2138
2140
2142}

References AO2_ALLOC_OPT_LOCK_RWLOCK, ao2_container_alloc_hash, AO2_CONTAINER_ALLOC_OPT_DUPS_REPLACE, ast_channel_register(), ast_format_cap_alloc, ast_format_cap_append_by_type(), AST_FORMAT_CAP_FLAG_DEFAULT, ast_http_uri_link(), ast_log, AST_MEDIA_TYPE_UNKNOWN, AST_MODULE_LOAD_DECLINE, AST_MODULE_LOAD_SUCCESS, ast_websocket_server_add_protocol2(), ast_websocket_server_create(), ast_websocket_sub_protocol_alloc(), ast_ws_server, ast_channel_tech::capabilities, http_uri, incoming_ws_established_cb(), instances, load_config(), LOG_ERROR, LOG_WARNING, ast_websocket_protocol::session_established, unload_module(), and websocket_tech.

◆ process_binary_message()

static int process_binary_message ( struct websocket_pvt instance,
char *  payload,
uint64_t  payload_len 
)
static

Definition at line 964 of file chan_websocket.c.

966{
967 char *next_frame_ptr = NULL;
968 size_t bytes_read = 0;
969 int res = 0;
970 size_t bytes_left = 0;
971
972 {
973 SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK,
975 if (instance->frame_queue_length >= QUEUE_LENGTH_MAX) {
976 ast_debug(4, "%s: WebSocket queue is full. Ignoring incoming binary message.\n",
977 ast_channel_name(instance->channel));
978 return 0;
979 }
980 }
981
982 next_frame_ptr = payload;
983 instance->bytes_read += payload_len;
984
985 if (instance->passthrough) {
986 res = queue_frame_from_buffer(instance, payload, payload_len);
987 return res;
988 }
989
990 if (instance->bulk_media_in_progress && instance->leftover_len > 0) {
991 /*
992 * We have leftover data from a previous websocket message.
993 * Try to make a complete frame by appending data from
994 * the current message to the leftover data.
995 */
996 char *append_ptr = instance->leftover_data + instance->leftover_len;
997 size_t bytes_needed_for_frame = instance->optimal_frame_size - instance->leftover_len;
998 /*
999 * It's possible that even the current message doesn't have enough
1000 * data to make a complete frame.
1001 */
1002 size_t bytes_avail_to_copy = MIN(bytes_needed_for_frame, payload_len);
1003
1004 /*
1005 * Append whatever we can to the end of the leftover data
1006 * even if it's not enough to make a complete frame.
1007 */
1008 memcpy(append_ptr, payload, bytes_avail_to_copy);
1009
1010 /*
1011 * If leftover data is still short, just return and wait for the
1012 * next websocket message.
1013 */
1014 if (bytes_avail_to_copy < bytes_needed_for_frame) {
1015 ast_debug(4, "%s: Leftover data %d bytes but only %d new bytes available of %d needed. Appending and waiting for next message.\n",
1016 ast_channel_name(instance->channel), (int)instance->leftover_len, (int)bytes_avail_to_copy, (int)bytes_needed_for_frame);
1017 instance->leftover_len += bytes_avail_to_copy;
1018 return 0;
1019 }
1020
1021 res = queue_frame_from_buffer(instance, instance->leftover_data, instance->optimal_frame_size);
1022 if (res < 0) {
1023 return -1;
1024 }
1025
1026 /*
1027 * We stole data from the current payload so decrement payload_len
1028 * and set the next frame pointer after the data in payload
1029 * we just copied.
1030 */
1031 payload_len -= bytes_avail_to_copy;
1032 next_frame_ptr = payload + bytes_avail_to_copy;
1033
1034 ast_debug(5, "%s: --- BR: %4d FQ: %4d PL: %4d LOL: %3d P: %p NFP: %p OFF: %4d NPL: %4d BAC: %3d\n",
1035 ast_channel_name(instance->channel),
1036 instance->frame_queue_length,
1037 (int)instance->bytes_read,
1038 (int)(payload_len + bytes_avail_to_copy),
1039 (int)instance->leftover_len,
1040 payload,
1041 next_frame_ptr,
1042 (int)(next_frame_ptr - payload),
1043 (int)payload_len,
1044 (int)bytes_avail_to_copy
1045 );
1046
1047
1048 instance->leftover_len = 0;
1049 }
1050
1051 if (!instance->bulk_media_in_progress && instance->leftover_len > 0) {
1052 instance->leftover_len = 0;
1053 }
1054
1055 bytes_left = payload_len;
1056 while (bytes_read < payload_len && bytes_left >= instance->optimal_frame_size) {
1057 res = queue_frame_from_buffer(instance, next_frame_ptr,
1058 instance->optimal_frame_size);
1059 if (res < 0) {
1060 break;
1061 }
1062 bytes_read += instance->optimal_frame_size;
1063 next_frame_ptr += instance->optimal_frame_size;
1064 bytes_left -= instance->optimal_frame_size;
1065 }
1066
1067 if (instance->bulk_media_in_progress && bytes_left > 0) {
1068 /*
1069 * We have a partial frame. Save the leftover data.
1070 */
1071 ast_debug(5, "%s: +++ BR: %4d FQ: %4d PL: %4d LOL: %3d P: %p NFP: %p OFF: %4d BL: %4d\n",
1072 ast_channel_name(instance->channel),
1073 (int)instance->bytes_read,
1074 instance->frame_queue_length,
1075 (int)payload_len,
1076 (int)instance->leftover_len,
1077 payload,
1078 next_frame_ptr,
1079 (int)(next_frame_ptr - payload),
1080 (int)bytes_left
1081 );
1082 memcpy(instance->leftover_data, next_frame_ptr, bytes_left);
1083 instance->leftover_len = bytes_left;
1084 }
1085
1086 return 0;
1087}

References ast_channel_name(), ast_debug, AST_LIST_LOCK, AST_LIST_UNLOCK, websocket_pvt::bulk_media_in_progress, websocket_pvt::bytes_read, websocket_pvt::channel, websocket_pvt::frame_queue, websocket_pvt::frame_queue_length, websocket_pvt::leftover_data, websocket_pvt::leftover_len, MIN, NULL, websocket_pvt::optimal_frame_size, websocket_pvt::passthrough, queue_frame_from_buffer(), QUEUE_LENGTH_MAX, and SCOPED_LOCK.

Referenced by read_from_ws_and_queue().

◆ process_text_message()

static int process_text_message ( struct websocket_pvt instance,
char *  payload,
uint64_t  payload_len 
)
static

Definition at line 931 of file chan_websocket.c.

933{
934 char *command;
935
936 if (payload_len == 0) {
937 ast_log(LOG_WARNING, "%s: WebSocket TEXT message has 0 length\n",
938 ast_channel_name(instance->channel));
939 return 0;
940 }
941
942 if (payload_len > MAX_TEXT_MESSAGE_LEN) {
943 ast_log(LOG_WARNING, "%s: WebSocket TEXT message of length %d exceeds maximum length of %d\n",
944 ast_channel_name(instance->channel), (int)payload_len, MAX_TEXT_MESSAGE_LEN);
945 return 0;
946 }
947
948 /*
949 * Unfortunately, payload is not NULL terminated even when it's
950 * a TEXT frame so we need to allocate a new buffer, copy
951 * the data into it, and NULL terminate it.
952 */
953 command = ast_alloca(payload_len + 1);
954 memcpy(command, payload, payload_len); /* Safe */
955 command[payload_len] = '\0';
956 command = ast_strip(command);
957
958 ast_debug(4, "%s: Received: %s\n",
959 ast_channel_name(instance->channel), command);
960
961 return handle_command(instance, command);
962}

References ast_alloca, ast_channel_name(), ast_debug, ast_log, ast_strip(), websocket_pvt::channel, handle_command(), LOG_WARNING, and MAX_TEXT_MESSAGE_LEN.

Referenced by read_from_ws_and_queue().

◆ queue_frame_from_buffer()

static int queue_frame_from_buffer ( struct websocket_pvt instance,
char *  buffer,
size_t  len 
)
static

Definition at line 687 of file chan_websocket.c.

689{
690 struct ast_frame fr = { 0, };
691 struct ast_frame *duped_frame = NULL;
692
693 AST_FRAME_SET_BUFFER(&fr, buffer, 0, len);
695 fr.subclass.format = instance->native_format;
696 fr.samples = instance->native_codec->samples_count(&fr);
697
698 duped_frame = ast_frisolate(&fr);
699 if (!duped_frame) {
700 ast_log(LOG_WARNING, "%s: Failed to isolate frame\n",
701 ast_channel_name(instance->channel));
702 return -1;
703 }
704
705 {
706 SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK,
708 AST_LIST_INSERT_TAIL(&instance->frame_queue, duped_frame, frame_list);
709 instance->frame_queue_length++;
710 if (!instance->queue_full && instance->frame_queue_length >= QUEUE_LENGTH_XOFF_LEVEL) {
711 instance->queue_full = 1;
712 send_event(instance, MEDIA_XOFF);
713 }
714 }
715
716 ast_debug(5, "%s: Queued %d byte frame\n", ast_channel_name(instance->channel),
717 duped_frame->datalen);
718
719 return 0;
720}

References ast_channel_name(), ast_debug, AST_FRAME_SET_BUFFER, AST_FRAME_VOICE, ast_frisolate, AST_LIST_INSERT_TAIL, AST_LIST_LOCK, AST_LIST_UNLOCK, ast_log, websocket_pvt::channel, ast_frame::datalen, ast_frame_subclass::format, websocket_pvt::frame_queue, websocket_pvt::frame_queue_length, ast_frame::frametype, len(), LOG_WARNING, websocket_pvt::native_codec, websocket_pvt::native_format, NULL, websocket_pvt::queue_full, QUEUE_LENGTH_XOFF_LEVEL, ast_frame::samples, ast_codec::samples_count, SCOPED_LOCK, send_event, and ast_frame::subclass.

Referenced by handle_command(), and process_binary_message().

◆ queue_option_frame()

static int queue_option_frame ( struct websocket_pvt instance,
char *  buffer 
)
static

Definition at line 722 of file chan_websocket.c.

724{
725 struct ast_frame fr = { 0, };
726 struct ast_frame *duped_frame = NULL;
727
728 AST_FRAME_SET_BUFFER(&fr, buffer, 0, strlen(buffer) + 1);
731
732 duped_frame = ast_frisolate(&fr);
733 if (!duped_frame) {
734 ast_log(LOG_WARNING, "%s: Failed to isolate frame\n",
735 ast_channel_name(instance->channel));
736 return -1;
737 }
738
739 AST_LIST_LOCK(&instance->frame_queue);
740 AST_LIST_INSERT_TAIL(&instance->frame_queue, duped_frame, frame_list);
741 AST_LIST_UNLOCK(&instance->frame_queue);
742
743 ast_debug(4, "%s: Queued '%s' option frame\n",
744 ast_channel_name(instance->channel), buffer);
745
746 return 0;
747}

References ast_channel_name(), AST_CONTROL_OPTION, ast_debug, AST_FRAME_CONTROL, AST_FRAME_SET_BUFFER, ast_frisolate, AST_LIST_INSERT_TAIL, AST_LIST_LOCK, AST_LIST_UNLOCK, ast_log, websocket_pvt::channel, websocket_pvt::frame_queue, ast_frame::frametype, ast_frame_subclass::integer, LOG_WARNING, NULL, and ast_frame::subclass.

Referenced by handle_command().

◆ read_from_ws_and_queue()

static int read_from_ws_and_queue ( struct websocket_pvt instance)
static

Definition at line 1089 of file chan_websocket.c.

1090{
1091 uint64_t payload_len = 0;
1092 char *payload = NULL;
1093 enum ast_websocket_opcode opcode;
1094 int fragmented = 0;
1095 int res = 0;
1096
1097 if (!instance->websocket) {
1098 ast_log(LOG_WARNING, "%s: WebSocket session not found\n",
1099 ast_channel_name(instance->channel));
1100 return -1;
1101 }
1102
1103 res = ast_websocket_read(instance->websocket, &payload, &payload_len,
1104 &opcode, &fragmented);
1105
1106 if (res) {
1107 ast_debug(3, "%s: WebSocket read error\n",
1108 ast_channel_name(instance->channel));
1110 return -1;
1111 }
1112 ast_debug(5, "%s: WebSocket read %d bytes\n", ast_channel_name(instance->channel),
1113 (int)payload_len);
1114
1115 if (opcode == AST_WEBSOCKET_OPCODE_TEXT) {
1116 return process_text_message(instance, payload, payload_len);
1117 }
1118
1119 if (opcode == AST_WEBSOCKET_OPCODE_PING || opcode == AST_WEBSOCKET_OPCODE_PONG) {
1120 return 0;
1121 }
1122
1123 if (opcode == AST_WEBSOCKET_OPCODE_CLOSE) {
1124 ast_debug(3, "%s: WebSocket closed by remote\n",
1125 ast_channel_name(instance->channel));
1127 return -1;
1128 }
1129
1130 if (opcode != AST_WEBSOCKET_OPCODE_BINARY) {
1131 ast_log(LOG_WARNING, "%s: WebSocket frame type %d not supported\n",
1132 ast_channel_name(instance->channel), (int)opcode);
1134 return 0;
1135 }
1136
1137 return process_binary_message(instance, payload, payload_len);
1138}

References AST_CAUSE_FAILURE, AST_CAUSE_NETWORK_OUT_OF_ORDER, AST_CAUSE_NORMAL, ast_channel_name(), ast_debug, ast_log, AST_WEBSOCKET_OPCODE_BINARY, AST_WEBSOCKET_OPCODE_CLOSE, AST_WEBSOCKET_OPCODE_PING, AST_WEBSOCKET_OPCODE_PONG, AST_WEBSOCKET_OPCODE_TEXT, ast_websocket_read(), AST_WEBSOCKET_STATUS_GOING_AWAY, AST_WEBSOCKET_STATUS_UNSUPPORTED_DATA, websocket_pvt::channel, LOG_WARNING, NULL, process_binary_message(), process_text_message(), websocket_pvt::websocket, and websocket_request_hangup.

Referenced by webchan_read().

◆ reload_module()

static int reload_module ( void  )
static

Definition at line 2085 of file chan_websocket.c.

2086{
2087 ast_debug(2, "Reloading chan_websocket configuration\n");
2089
2090 return 0;
2091}

References ast_debug, ast_sorcery_reload(), and sorcery.

◆ set_channel_format()

static void set_channel_format ( struct websocket_pvt instance,
struct ast_format fmt 
)
static

◆ set_channel_timer()

static int set_channel_timer ( struct websocket_pvt instance)
static

Definition at line 1512 of file chan_websocket.c.

1513{
1514 int rate = 0;
1515 instance->timer = ast_timer_open();
1516 if (!instance->timer) {
1517 return -1;
1518 }
1519 /* Rate is the number of ticks per second, not the interval. */
1520 rate = 1000 / ast_format_get_default_ms(instance->native_format);
1521 ast_debug(3, "%s: WebSocket timer rate %d\n",
1522 ast_channel_name(instance->channel), rate);
1523 ast_timer_set_rate(instance->timer, rate);
1524 /*
1525 * Calling ast_channel_set_fd will cause the channel thread to call
1526 * webchan_read at 'rate' times per second.
1527 */
1529
1530 return 0;
1531}

References ast_channel_name(), ast_channel_set_fd(), ast_debug, ast_format_get_default_ms(), ast_timer_fd(), ast_timer_open(), ast_timer_set_rate(), websocket_pvt::channel, websocket_pvt::native_format, websocket_pvt::timer, and WS_TIMER_FDNO.

Referenced by webchan_request().

◆ set_channel_variables()

static int set_channel_variables ( struct websocket_pvt instance)
static

Definition at line 1533 of file chan_websocket.c.

1534{
1535 char *pkt_size = NULL;
1536 int res = ast_asprintf(&pkt_size, "%d", instance->optimal_frame_size);
1537 if (res <= 0) {
1538 return -1;
1539 }
1540
1542 pkt_size);
1543 ast_free(pkt_size);
1545 instance->connection_id);
1546
1547 return 0;
1548}

References ast_asprintf, ast_free, websocket_pvt::channel, websocket_pvt::connection_id, MEDIA_WEBSOCKET_CONNECTION_ID, MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE, NULL, websocket_pvt::optimal_frame_size, and pbx_builtin_setvar_helper().

Referenced by webchan_request().

◆ set_instance_silence_frame()

static int set_instance_silence_frame ( struct websocket_pvt instance)
static

Definition at line 1489 of file chan_websocket.c.

1490{
1491 instance->silence.frametype = AST_FRAME_VOICE;
1492 instance->silence.datalen =
1493 (instance->slin_codec->default_ms * instance->slin_codec->minimum_bytes) / instance->slin_codec->minimum_ms;
1494 instance->silence.samples = instance->silence.datalen / sizeof(uint16_t);
1495 /*
1496 * Even though we'll calloc the data pointer, we don't mark it as
1497 * mallocd because this frame will be around for a while and we don't
1498 * want it accidentally freed before we're done with it.
1499 */
1500 instance->silence.mallocd = 0;
1501 instance->silence.offset = 0;
1502 instance->silence.src = __PRETTY_FUNCTION__;
1503 instance->silence.subclass.format = instance->slin_format;
1504 instance->silence.data.ptr = ast_calloc(1, instance->silence.datalen);
1505 if (!instance->silence.data.ptr) {
1506 return -1;
1507 }
1508
1509 return 0;
1510}

References ast_calloc, AST_FRAME_VOICE, ast_frame::data, ast_frame::datalen, ast_codec::default_ms, ast_frame_subclass::format, ast_frame::frametype, ast_frame::mallocd, ast_codec::minimum_bytes, ast_codec::minimum_ms, ast_frame::offset, ast_frame::ptr, ast_frame::samples, websocket_pvt::silence, websocket_pvt::slin_codec, websocket_pvt::slin_format, ast_frame::src, and ast_frame::subclass.

Referenced by webchan_request().

◆ set_instance_translator()

static int set_instance_translator ( struct websocket_pvt instance)
static

Definition at line 1458 of file chan_websocket.c.

1459{
1461 instance->slin_format = ao2_bump(instance->native_format);
1462 instance->slin_codec = ast_format_get_codec(instance->slin_format);
1463 return 0;
1464 }
1465
1467 if (!instance->slin_format) {
1468 ast_log(LOG_ERROR, "%s: Unable to get slin format for rate %d\n",
1469 ast_channel_name(instance->channel), instance->native_codec->sample_rate);
1470 return -1;
1471 }
1472 ast_debug(3, "%s: WebSocket channel slin format '%s' Sample rate: %d ptime: %dms\n",
1476
1477 instance->translator = ast_translator_build_path(instance->slin_format, instance->native_format);
1478 if (!instance->translator) {
1479 ast_log(LOG_ERROR, "%s: Unable to build translator path from '%s' to '%s'\n",
1481 ast_format_get_name(instance->slin_format));
1482 return -1;
1483 }
1484
1485 instance->slin_codec = ast_format_get_codec(instance->slin_format);
1486 return 0;
1487}

References ao2_bump, ast_channel_name(), ast_debug, ast_format_cache_get_slin_by_rate(), ast_format_cache_is_slinear(), ast_format_get_codec(), ast_format_get_default_ms(), ast_format_get_name(), ast_format_get_sample_rate(), ast_log, ast_translator_build_path(), websocket_pvt::channel, LOG_ERROR, websocket_pvt::native_codec, websocket_pvt::native_format, ast_codec::sample_rate, websocket_pvt::slin_codec, websocket_pvt::slin_format, and websocket_pvt::translator.

Referenced by webchan_request().

◆ unload_module()

static int unload_module ( void  )
static

◆ validate_uri_parameters()

static int validate_uri_parameters ( const char *  uri_params)
static

Definition at line 1550 of file chan_websocket.c.

1551{
1552 char *params = ast_strdupa(uri_params);
1553 char *nvp = NULL;
1554 char *nv = NULL;
1555
1556 /*
1557 * uri_params should be a comma-separated list of key=value pairs.
1558 * For example:
1559 * name1=value1,name2=value2
1560 * We're verifying that each name and value either doesn't need
1561 * to be encoded or that it already is.
1562 */
1563
1564 while((nvp = ast_strsep(&params, ',', 0))) {
1565 /* nvp will be name1=value1 */
1566 while((nv = ast_strsep(&nvp, '=', 0))) {
1567 /* nv will be either name1 or value1 */
1568 if (!ast_uri_verify_encoded(nv)) {
1569 return 0;
1570 }
1571 }
1572 }
1573
1574 return 1;
1575}

References ast_strdupa, ast_strsep(), ast_uri_verify_encoded(), NULL, and websocket_pvt::uri_params.

Referenced by webchan_request().

◆ webchan_call()

static int webchan_call ( struct ast_channel ast,
const char *  dest,
int  timeout 
)
static

Definition at line 1236 of file chan_websocket.c.

1238{
1239 struct websocket_pvt *instance = ast_channel_tech_pvt(ast);
1241
1242 if (!instance) {
1243 ast_log(LOG_WARNING, "%s: WebSocket instance not found\n",
1244 ast_channel_name(ast));
1246 return -1;
1247 }
1248
1249 if (instance->type == AST_WS_TYPE_SERVER) {
1250 ast_debug(3, "%s: Websocket call incoming\n", ast_channel_name(instance->channel));
1251 return 0;
1252 }
1253 ast_debug(3, "%s: Websocket call outgoing\n", ast_channel_name(instance->channel));
1254
1255 if (!instance->client) {
1256 ast_log(LOG_WARNING, "%s: WebSocket client not found\n",
1257 ast_channel_name(ast));
1259 return -1;
1260 }
1261
1262 ast_debug(3, "%s: WebSocket call requested to %s. cid: %s\n",
1263 ast_channel_name(ast), dest, instance->connection_id);
1264
1265 if (!ast_strlen_zero(instance->uri_params)) {
1267 }
1268
1269 instance->websocket = ast_websocket_client_connect(instance->client,
1270 instance, ast_channel_name(ast), &result);
1271 if (!instance->websocket || result != WS_OK) {
1272 ast_log(LOG_WARNING, "%s: WebSocket connection failed to %s: %s\n",
1275 return -1;
1276 }
1277
1278 return websocket_handoff_to_channel(instance);
1279}

References AST_CAUSE_FAILURE, AST_CAUSE_NO_ROUTE_DESTINATION, ast_channel_hangupcause_set(), ast_channel_name(), ast_channel_tech_pvt(), ast_debug, ast_log, ast_strlen_zero(), ast_websocket_client_add_uri_params(), ast_websocket_client_connect(), ast_websocket_result_to_str(), AST_WS_TYPE_SERVER, websocket_pvt::channel, websocket_pvt::client, websocket_pvt::connection_id, LOG_WARNING, result, websocket_pvt::type, websocket_pvt::uri_params, websocket_pvt::websocket, websocket_handoff_to_channel(), and WS_OK.

◆ webchan_hangup()

static int webchan_hangup ( struct ast_channel ast)
static

Definition at line 1796 of file chan_websocket.c.

1797{
1798 struct websocket_pvt *instance = ast_channel_tech_pvt(ast);
1799
1800 if (!instance) {
1801 return -1;
1802 }
1803 ast_debug(3, "%s: WebSocket call hangup. cid: %s\n",
1804 ast_channel_name(ast), instance->connection_id);
1805
1806 if (instance->websocket) {
1808 ast_websocket_unref(instance->websocket);
1809 instance->websocket = NULL;
1810 }
1812
1813 /* Clean up the reference from adding the instance to the channel */
1814 ao2_cleanup(instance);
1815
1816 return 0;
1817}

References ao2_cleanup, ast_channel_name(), ast_channel_tech_hangupcause(), ast_channel_tech_pvt(), ast_channel_tech_pvt_set(), ast_debug, ast_websocket_close(), ast_websocket_unref(), websocket_pvt::connection_id, NULL, and websocket_pvt::websocket.

◆ webchan_read()

static struct ast_frame * webchan_read ( struct ast_channel ast)
static

Definition at line 578 of file chan_websocket.c.

579{
580 struct websocket_pvt *instance = NULL;
581 struct ast_frame *native_frame = NULL;
582 struct ast_frame *slin_frame = NULL;
583 int fdno = ast_channel_fdno(ast);
584
585 instance = ast_channel_tech_pvt(ast);
586 if (!instance) {
587 return NULL;
588 }
589
590 if (fdno == WS_WEBSOCKET_FDNO) {
591 read_from_ws_and_queue(instance);
592 return &ast_null_frame;
593 }
594 if (fdno != WS_TIMER_FDNO) {
595 return &ast_null_frame;
596 }
597
599 ast_timer_ack(instance->timer, 1);
600 }
601
602 native_frame = dequeue_frame(instance);
603
604 /*
605 * No frame when the timer fires means we have to create and
606 * return a silence frame in its place.
607 */
608 if (!native_frame) {
609 ast_debug(5, "%s: WebSocket read timer fired with no frame available. Returning silence.\n", ast_channel_name(ast));
610 set_channel_format(instance, instance->slin_format);
611 slin_frame = ast_frdup(&instance->silence);
612 return slin_frame;
613 }
614
615 /*
616 * If we're in passthrough mode or the frame length is already optimal_frame_size,
617 * we can just return it.
618 */
619 if (instance->passthrough || native_frame->datalen == instance->optimal_frame_size) {
620 set_channel_format(instance, instance->native_format);
621 return native_frame;
622 }
623
624 /*
625 * If we're here, we have a short frame that we need to pad
626 * with silence.
627 */
628
629 if (instance->translator) {
630 slin_frame = ast_translate(instance->translator, native_frame, 0);
631 if (!slin_frame) {
632 ast_log(LOG_WARNING, "%s: Failed to translate %d byte frame\n",
633 ast_channel_name(ast), native_frame->datalen);
634 return NULL;
635 }
636 ast_frame_free(native_frame, 0);
637 } else {
638 /*
639 * If there was no translator then the native format
640 * was already slin.
641 */
642 slin_frame = native_frame;
643 }
644
645 set_channel_format(instance, instance->slin_format);
646
647 /*
648 * So now we have an slin frame but it's probably still short
649 * so we create a new data buffer with the correct length
650 * which is filled with zeros courtesy of ast_calloc.
651 * We then copy the short frame data into the new buffer
652 * and set the offset to AST_FRIENDLY_OFFSET so that
653 * the core can read the data without any issues.
654 * If the original frame data was mallocd, we need to free the old
655 * data buffer so we don't leak memory and we need to set
656 * mallocd to AST_MALLOCD_DATA so that the core knows
657 * it needs to free the new data buffer when it's done.
658 */
659
660 if (slin_frame->datalen != instance->silence.datalen) {
661 char *old_data = slin_frame->data.ptr;
662 int old_len = slin_frame->datalen;
663 int old_offset = slin_frame->offset;
664 ast_debug(4, "%s: WebSocket read short frame. Expected %d got %d. Filling with silence\n",
665 ast_channel_name(ast), instance->silence.datalen,
666 slin_frame->datalen);
667
668 slin_frame->data.ptr = ast_calloc(1, instance->silence.datalen + AST_FRIENDLY_OFFSET);
669 if (!slin_frame->data.ptr) {
670 ast_frame_free(slin_frame, 0);
671 return NULL;
672 }
673 slin_frame->data.ptr += AST_FRIENDLY_OFFSET;
674 slin_frame->offset = AST_FRIENDLY_OFFSET;
675 memcpy(slin_frame->data.ptr, old_data, old_len);
676 if (slin_frame->mallocd & AST_MALLOCD_DATA) {
677 ast_free(old_data - old_offset);
678 }
679 slin_frame->mallocd |= AST_MALLOCD_DATA;
680 slin_frame->datalen = instance->silence.datalen;
681 slin_frame->samples = instance->silence.samples;
682 }
683
684 return slin_frame;
685}

References ast_calloc, ast_channel_fdno(), ast_channel_name(), ast_channel_tech_pvt(), ast_debug, ast_frame_free(), ast_frdup, ast_free, AST_FRIENDLY_OFFSET, ast_log, AST_MALLOCD_DATA, ast_null_frame, ast_timer_ack(), ast_timer_get_event(), AST_TIMING_EVENT_EXPIRED, ast_translate(), ast_frame::data, ast_frame::datalen, dequeue_frame(), LOG_WARNING, ast_frame::mallocd, websocket_pvt::native_format, NULL, ast_frame::offset, websocket_pvt::optimal_frame_size, websocket_pvt::passthrough, ast_frame::ptr, read_from_ws_and_queue(), ast_frame::samples, set_channel_format(), websocket_pvt::silence, websocket_pvt::slin_format, websocket_pvt::timer, websocket_pvt::translator, WS_TIMER_FDNO, and WS_WEBSOCKET_FDNO.

◆ webchan_request()

static struct ast_channel * webchan_request ( const char *  type,
struct ast_format_cap cap,
const struct ast_assigned_ids assignedids,
const struct ast_channel requestor,
const char *  data,
int *  cause 
)
static

Definition at line 1602 of file chan_websocket.c.

1605{
1606 char *parse;
1607 RAII_VAR(struct websocket_pvt *, instance, NULL, ao2_cleanup);
1608 struct ast_channel *chan = NULL;
1609 struct ast_format *fmt = NULL;
1610 struct ast_format_cap *caps = NULL;
1612 AST_APP_ARG(connection_id);
1614 );
1615 struct ast_flags opts = { 0, };
1616 char *opt_args[OPT_ARG_ARRAY_SIZE];
1617 const char *requestor_name = requestor ? ast_channel_name(requestor) :
1618 (assignedids && !ast_strlen_zero(assignedids->uniqueid) ? assignedids->uniqueid : "<unknown>");
1619 RAII_VAR(struct webchan_conf_global *, global_cfg, NULL, ao2_cleanup);
1620
1621 global_cfg = ast_sorcery_retrieve_by_id(sorcery, "global", "global");
1622
1623 ast_debug(3, "%s: WebSocket channel requested\n",
1624 requestor_name);
1625
1626 if (ast_strlen_zero(data)) {
1627 ast_log(LOG_ERROR, "%s: A connection id is required for the 'WebSocket' channel\n",
1628 requestor_name);
1629 goto failure;
1630 }
1631 parse = ast_strdupa(data);
1632 AST_NONSTANDARD_APP_ARGS(args, parse, '/');
1633
1634 if (ast_strlen_zero(args.connection_id)) {
1635 ast_log(LOG_ERROR, "%s: connection_id is required for the 'WebSocket' channel\n",
1636 requestor_name);
1637 goto failure;
1638 }
1639
1640 if (!ast_strlen_zero(args.options)
1641 && ast_app_parse_options(websocket_options, &opts, opt_args,
1642 ast_strdupa(args.options))) {
1643 ast_log(LOG_ERROR, "%s: 'WebSocket' channel options '%s' parse error\n",
1644 requestor_name, args.options);
1645 goto failure;
1646 }
1647
1648 if (ast_test_flag(&opts, OPT_WS_CODEC)
1649 && !ast_strlen_zero(opt_args[OPT_ARG_WS_CODEC])) {
1650 fmt = ast_format_cache_get(opt_args[OPT_ARG_WS_CODEC]);
1651 } else {
1652 /*
1653 * If codec wasn't specified in the dial string,
1654 * use the first format in the capabilities.
1655 */
1656 fmt = ast_format_cap_get_format(cap, 0);
1657 }
1658
1659 if (!fmt) {
1660 ast_log(LOG_WARNING, "%s: No codec found for sending media to connection '%s'\n",
1661 requestor_name, args.connection_id);
1662 goto failure;
1663 }
1664
1665 ast_debug(3, "%s: Using format %s from %s\n",
1666 requestor_name, ast_format_get_name(fmt),
1667 ast_test_flag(&opts, OPT_WS_CODEC) ? "dialstring" : "requester");
1668
1669 instance = websocket_new(requestor_name, args.connection_id, fmt);
1670 if (!instance) {
1671 ast_log(LOG_ERROR, "%s: Failed to allocate WebSocket channel pvt\n",
1672 requestor_name);
1673 goto failure;
1674 }
1675
1676 instance->no_auto_answer = ast_test_flag(&opts, OPT_WS_NO_AUTO_ANSWER);
1677 if (!instance->passthrough) {
1678 instance->passthrough = ast_test_flag(&opts, OPT_WS_PASSTHROUGH);
1679 }
1680
1682 && !ast_strlen_zero(opt_args[OPT_ARG_WS_URI_PARAM])) {
1683 char *comma;
1684
1685 if (ast_strings_equal(args.connection_id, INCOMING_CONNECTION_ID)) {
1687 "%s: URI parameters are not allowed for 'WebSocket/INCOMING' channels\n",
1688 requestor_name);
1689 goto failure;
1690 }
1691
1692 ast_debug(3, "%s: Using URI parameters '%s'\n",
1693 requestor_name, opt_args[OPT_ARG_WS_URI_PARAM]);
1694
1696 ast_log(LOG_ERROR, "%s: Invalid URI parameters '%s' in WebSocket/%s dial string\n",
1697 requestor_name, opt_args[OPT_ARG_WS_URI_PARAM],
1698 args.connection_id);
1699 goto failure;
1700 }
1701
1702 instance->uri_params = ast_strdup(opt_args[OPT_ARG_WS_URI_PARAM]);
1703 comma = instance->uri_params;
1704 /*
1705 * The normal separator for query string components is an
1706 * ampersand ('&') but the Dial app interprets them as additional
1707 * channels to dial in parallel so we instruct users to separate
1708 * the parameters with commas (',') instead. We now have to
1709 * convert those commas back to ampersands.
1710 */
1711 while ((comma = strchr(comma,','))) {
1712 *comma = '&';
1713 }
1714 ast_debug(3, "%s: Using final URI '%s'\n", requestor_name, instance->uri_params);
1715 }
1716
1717 if (ast_test_flag(&opts, OPT_WS_MSG_FORMAT)) {
1718 instance->control_msg_format = control_msg_format_from_str(opt_args[OPT_ARG_WS_MSG_FORMAT]);
1719
1720 if (instance->control_msg_format == WEBCHAN_CONTROL_MSG_FORMAT_INVALID) {
1721 ast_log(LOG_WARNING, "%s: 'f/control message format' dialstring parameter value missing or invalid. "
1722 "Defaulting to 'plain-text'\n",
1723 ast_channel_name(requestor));
1724 instance->control_msg_format = WEBCHAN_CONTROL_MSG_FORMAT_PLAIN;
1725 }
1726 } else if (global_cfg) {
1727 instance->control_msg_format = global_cfg->control_msg_format;
1728 }
1729
1730 chan = ast_channel_alloc(1, AST_STATE_DOWN, "", "", "", "", "", assignedids,
1731 requestor, 0, "WebSocket/%s/%p", args.connection_id, instance);
1732 if (!chan) {
1733 ast_log(LOG_ERROR, "%s: Unable to alloc channel\n", ast_channel_name(requestor));
1734 goto failure;
1735 }
1736
1737 /* Prevent device state caching as this channel involves ephemeral destinations or sources */
1739 ast_debug(3, "%s: WebSocket channel %s allocated for connection %s\n",
1740 ast_channel_name(chan), requestor_name,
1741 instance->connection_id);
1742
1743 instance->channel = ao2_bump(chan);
1744 ast_channel_tech_set(instance->channel, &websocket_tech);
1745
1746 if (set_instance_translator(instance) != 0) {
1747 goto failure;
1748 }
1749
1750 if (set_instance_silence_frame(instance) != 0) {
1751 goto failure;
1752 }
1753
1754 if (set_channel_timer(instance) != 0) {
1755 goto failure;
1756 }
1757
1758 if (set_channel_variables(instance) != 0) {
1759 goto failure;
1760 }
1761
1763 if (!caps) {
1764 ast_log(LOG_ERROR, "%s: Unable to alloc caps\n", requestor_name);
1765 goto failure;
1766 }
1767
1768 ast_format_cap_append(caps, instance->native_format, 0);
1769 ast_channel_nativeformats_set(instance->channel, caps);
1770 ast_channel_set_writeformat(instance->channel, instance->native_format);
1771 ast_channel_set_rawwriteformat(instance->channel, instance->native_format);
1772 ast_channel_set_readformat(instance->channel, instance->native_format);
1773 ast_channel_set_rawreadformat(instance->channel, instance->native_format);
1774 ast_channel_tech_pvt_set(chan, ao2_bump(instance));
1775 ast_channel_unlock(chan);
1776 ao2_cleanup(caps);
1777
1778 ast_debug(3, "%s: WebSocket channel created to %s\n",
1779 ast_channel_name(chan), args.connection_id);
1780
1781 return chan;
1782
1783failure:
1784 if (chan) {
1785 ast_channel_unlock(chan);
1786 }
1787 *cause = AST_CAUSE_FAILURE;
1788 return NULL;
1789}

References ao2_bump, ao2_cleanup, args, AST_APP_ARG, ast_app_parse_options(), AST_CAUSE_FAILURE, ast_channel_alloc, ast_channel_flags(), ast_channel_name(), ast_channel_nativeformats_set(), ast_channel_set_rawreadformat(), ast_channel_set_rawwriteformat(), ast_channel_set_readformat(), ast_channel_set_writeformat(), ast_channel_tech_pvt_set(), ast_channel_tech_set(), ast_channel_unlock, ast_debug, AST_DECLARE_APP_ARGS, AST_FLAG_DISABLE_DEVSTATE_CACHE, ast_format_cache_get, ast_format_cap_alloc, ast_format_cap_append, AST_FORMAT_CAP_FLAG_DEFAULT, ast_format_cap_get_format(), ast_format_get_name(), ast_log, AST_NONSTANDARD_APP_ARGS, ast_set_flag, ast_sorcery_retrieve_by_id(), AST_STATE_DOWN, ast_strdup, ast_strdupa, ast_strings_equal(), ast_strlen_zero(), ast_test_flag, control_msg_format_from_str(), INCOMING_CONNECTION_ID, LOG_ERROR, LOG_WARNING, NULL, OPT_ARG_ARRAY_SIZE, OPT_ARG_WS_CODEC, OPT_ARG_WS_MSG_FORMAT, OPT_ARG_WS_URI_PARAM, OPT_WS_CODEC, OPT_WS_MSG_FORMAT, OPT_WS_NO_AUTO_ANSWER, OPT_WS_PASSTHROUGH, OPT_WS_URI_PARAM, options, RAII_VAR, set_channel_timer(), set_channel_variables(), set_instance_silence_frame(), set_instance_translator(), sorcery, ast_assigned_ids::uniqueid, validate_uri_parameters(), WEBCHAN_CONTROL_MSG_FORMAT_INVALID, WEBCHAN_CONTROL_MSG_FORMAT_PLAIN, websocket_new(), websocket_options, and websocket_tech.

◆ webchan_send_dtmf_text()

static int webchan_send_dtmf_text ( struct ast_channel ast,
char  digit,
unsigned int  duration 
)
static

Definition at line 1819 of file chan_websocket.c.

1820{
1821 struct websocket_pvt *instance = ast_channel_tech_pvt(ast);
1822
1823 if (!instance) {
1824 return -1;
1825 }
1826
1827 return send_event(instance, DTMF_END, digit);
1828}

References ast_channel_tech_pvt(), digit, and send_event.

◆ webchan_write()

static int webchan_write ( struct ast_channel ast,
struct ast_frame f 
)
static

Function called when we should write a frame to the channel.

Definition at line 1198 of file chan_websocket.c.

1199{
1200 struct websocket_pvt *instance = ast_channel_tech_pvt(ast);
1201
1202 if (!instance || !instance->websocket) {
1203 ast_log(LOG_WARNING, "%s: WebSocket instance or client not found\n",
1204 ast_channel_name(ast));
1205 return -1;
1206 }
1207
1208 if (f->frametype == AST_FRAME_CNG) {
1209 return 0;
1210 }
1211
1212 if (f->frametype != AST_FRAME_VOICE) {
1213 ast_log(LOG_WARNING, "%s: This WebSocket channel only supports AST_FRAME_VOICE frames\n",
1214 ast_channel_name(ast));
1215 return 0;
1216 }
1217
1219 ast_log(LOG_WARNING, "%s: This WebSocket channel only supports the '%s' format, not '%s'\n",
1222 return -1;
1223 }
1224
1226 (char *)f->data.ptr, (uint64_t)f->datalen);
1227}

References ast_channel_name(), ast_channel_tech_pvt(), ast_format_cmp(), AST_FORMAT_CMP_NOT_EQUAL, ast_format_get_name(), AST_FRAME_CNG, AST_FRAME_VOICE, ast_log, AST_WEBSOCKET_OPCODE_BINARY, ast_websocket_write(), ast_frame::data, ast_frame::datalen, ast_frame_subclass::format, ast_frame::frametype, LOG_WARNING, websocket_pvt::native_format, ast_frame::ptr, ast_frame::subclass, and websocket_pvt::websocket.

◆ websocket_destructor()

static void websocket_destructor ( void *  data)
static

Definition at line 1281 of file chan_websocket.c.

1282{
1283 struct websocket_pvt *instance = data;
1284 struct ast_frame *frame = NULL;
1285 ast_debug(3, "%s: WebSocket instance freed\n", instance->connection_id);
1286
1287 AST_LIST_LOCK(&instance->frame_queue);
1288 while ((frame = AST_LIST_REMOVE_HEAD(&instance->frame_queue, frame_list))) {
1289 ast_frfree(frame);
1290 }
1291 AST_LIST_UNLOCK(&instance->frame_queue);
1292
1293 if (instance->timer) {
1294 ast_timer_close(instance->timer);
1295 instance->timer = NULL;
1296 }
1297
1298 if (instance->channel) {
1299 ast_channel_unref(instance->channel);
1300 instance->channel = NULL;
1301 }
1302 if (instance->websocket) {
1303 ast_websocket_unref(instance->websocket);
1304 instance->websocket = NULL;
1305 }
1306
1307 ao2_cleanup(instance->client);
1308 instance->client = NULL;
1309
1310 ao2_cleanup(instance->native_codec);
1311 instance->native_codec = NULL;
1312
1313 ao2_cleanup(instance->native_format);
1314 instance->native_format = NULL;
1315
1316 ao2_cleanup(instance->slin_codec);
1317 instance->slin_codec = NULL;
1318
1319 ao2_cleanup(instance->slin_format);
1320 instance->slin_format = NULL;
1321
1322 if (instance->silence.data.ptr) {
1323 ast_free(instance->silence.data.ptr);
1324 instance->silence.data.ptr = NULL;
1325 }
1326
1327 if (instance->translator) {
1329 instance->translator = NULL;
1330 }
1331
1332 if (instance->leftover_data) {
1333 ast_free(instance->leftover_data);
1334 instance->leftover_data = NULL;
1335 }
1336
1337 ast_free(instance->uri_params);
1338 ast_free(instance->remote_addr);
1339}

References ao2_cleanup, ast_channel_unref, ast_debug, ast_free, ast_frfree, AST_LIST_LOCK, AST_LIST_REMOVE_HEAD, AST_LIST_UNLOCK, ast_timer_close(), ast_translator_free_path(), ast_websocket_unref(), websocket_pvt::channel, websocket_pvt::client, websocket_pvt::connection_id, ast_frame::data, websocket_pvt::frame_queue, websocket_pvt::leftover_data, websocket_pvt::native_codec, websocket_pvt::native_format, NULL, ast_frame::ptr, websocket_pvt::remote_addr, websocket_pvt::silence, websocket_pvt::slin_codec, websocket_pvt::slin_format, websocket_pvt::timer, websocket_pvt::translator, websocket_pvt::uri_params, and websocket_pvt::websocket.

Referenced by websocket_new().

◆ websocket_handoff_to_channel()

static int websocket_handoff_to_channel ( struct websocket_pvt instance)
static

Definition at line 1140 of file chan_websocket.c.

1141{
1142 int res = 0;
1143 int nodelay = 1;
1144 struct ast_sockaddr *remote_addr = ast_websocket_remote_address(instance->websocket);
1145
1146 instance->remote_addr = ast_strdup(ast_sockaddr_stringify(remote_addr));
1147 ast_debug(3, "%s: WebSocket connection with %s established\n",
1148 ast_channel_name(instance->channel), instance->remote_addr);
1149
1150 if (setsockopt(ast_websocket_fd(instance->websocket),
1151 IPPROTO_TCP, TCP_NODELAY, (char *) &nodelay, sizeof(nodelay)) < 0) {
1152 ast_log(LOG_WARNING, "Failed to set TCP_NODELAY on websocket connection: %s\n", strerror(errno));
1153 }
1154
1156
1157 res = send_event(instance, MEDIA_START);
1158 if (res != 0 ) {
1159 if (instance->type == AST_WS_TYPE_SERVER) {
1161 } else {
1162 /*
1163 * We were called by webchan_call so just need to set causes.
1164 * The core will hangup the channel.
1165 */
1168 }
1169 return -1;
1170 }
1171
1172 if (!instance->no_auto_answer) {
1173 ast_debug(3, "%s: ANSWER by auto_answer\n", ast_channel_name(instance->channel));
1175 }
1176
1177 return 0;
1178}

References AST_CAUSE_NETWORK_OUT_OF_ORDER, ast_channel_hangupcause_set(), ast_channel_name(), ast_channel_set_fd(), ast_channel_tech_hangupcause_set(), AST_CONTROL_ANSWER, ast_debug, ast_log, ast_queue_control(), ast_sockaddr_stringify(), ast_strdup, ast_websocket_fd(), ast_websocket_remote_address(), AST_WEBSOCKET_STATUS_GOING_AWAY, AST_WS_TYPE_SERVER, websocket_pvt::channel, errno, LOG_WARNING, websocket_pvt::no_auto_answer, websocket_pvt::remote_addr, send_event, websocket_pvt::type, websocket_pvt::websocket, websocket_request_hangup, and WS_WEBSOCKET_FDNO.

Referenced by incoming_ws_established_cb(), and webchan_call().

◆ websocket_new()

static struct websocket_pvt * websocket_new ( const char *  chan_name,
const char *  connection_id,
struct ast_format fmt 
)
static

Definition at line 1354 of file chan_websocket.c.

1356{
1357 RAII_VAR(struct instance_proxy *, proxy, NULL, ao2_cleanup);
1358 RAII_VAR(struct websocket_pvt *, instance, NULL, ao2_cleanup);
1359 char uuid[AST_UUID_STR_LEN];
1360 enum ast_websocket_type ws_type;
1361
1362 SCOPED_AO2WRLOCK(locker, instances);
1363
1364 if (ast_strings_equal(connection_id, INCOMING_CONNECTION_ID)) {
1365 connection_id = ast_uuid_generate_str(uuid, sizeof(uuid));
1366 ws_type = AST_WS_TYPE_SERVER;
1367 } else {
1368 ws_type = AST_WS_TYPE_CLIENT;
1369 }
1370
1371 proxy = ao2_weakproxy_alloc(sizeof(*proxy) + strlen(connection_id) + 1, NULL);
1372 if (!proxy) {
1373 return NULL;
1374 }
1375 strcpy(proxy->connection_id, connection_id); /* Safe */
1376
1377 instance = ao2_alloc(sizeof(*instance) + strlen(connection_id) + 1,
1379 if (!instance) {
1380 return NULL;
1381 }
1382 strcpy(instance->connection_id, connection_id); /* Safe */
1383
1384 instance->type = ws_type;
1385 if (ws_type == AST_WS_TYPE_CLIENT) {
1386 instance->client = ast_websocket_client_retrieve_by_id(instance->connection_id);
1387 if (!instance->client) {
1388 ast_log(LOG_ERROR, "%s: WebSocket client connection '%s' not found\n",
1389 chan_name, instance->connection_id);
1390 return NULL;
1391 }
1392 }
1393
1394 AST_LIST_HEAD_INIT(&instance->frame_queue);
1395
1396 /*
1397 * We need the codec to calculate the number of samples in a frame
1398 * so we'll get it once and store it in the instance.
1399 *
1400 * References for native_format and native_codec are now held by the
1401 * instance and will be released when the instance is destroyed.
1402 */
1403 instance->native_format = fmt;
1404 instance->native_codec = ast_format_get_codec(instance->native_format);
1405 /*
1406 * References for native_format and native_codec are now held by the
1407 * instance and will be released when the instance is destroyed.
1408 */
1409
1410 /*
1411 * It's not possible for us to re-time or re-frame media if the data
1412 * stream can't be broken up on arbitrary byte boundaries. This is usually
1413 * indicated by the codec's minimum_bytes being small (10 bytes or less).
1414 * We need to force passthrough mode in this case.
1415 */
1416 if (instance->native_codec->minimum_bytes <= 10) {
1417 instance->passthrough = 1;
1418 instance->optimal_frame_size = 0;
1419 } else {
1420 instance->optimal_frame_size =
1421 (instance->native_codec->default_ms * instance->native_codec->minimum_bytes)
1422 / instance->native_codec->minimum_ms;
1423 instance->leftover_data = ast_calloc(1, instance->optimal_frame_size);
1424 if (!instance->leftover_data) {
1425 return NULL;
1426 }
1427 }
1428
1429 ast_debug(3,
1430 "%s: WebSocket channel native format '%s' Sample rate: %d ptime: %dms minms: %u minbytes: %u passthrough: %d optimal_frame_size: %d\n",
1431 chan_name, ast_format_get_name(instance->native_format),
1432 ast_format_get_sample_rate(instance->native_format),
1433 ast_format_get_default_ms(instance->native_format),
1434 ast_format_get_minimum_ms(instance->native_format),
1435 ast_format_get_minimum_bytes(instance->native_format),
1436 instance->passthrough,
1437 instance->optimal_frame_size);
1438
1439 /* We have exclusive access to proxy and sorcery, no need for locking here. */
1440 if (ao2_weakproxy_set_object(proxy, instance, OBJ_NOLOCK)) {
1441 return NULL;
1442 }
1443
1445 return NULL;
1446 }
1447
1448 if (!ao2_link_flags(instances, proxy, OBJ_NOLOCK)) {
1449 ast_log(LOG_ERROR, "%s: Unable to link WebSocket instance to instances\n",
1450 proxy->connection_id);
1451 return NULL;
1452 }
1453 ast_debug(3, "%s: WebSocket instance created and linked\n", proxy->connection_id);
1454
1455 return ao2_bump(instance);
1456}

References ao2_alloc, ao2_bump, ao2_cleanup, ao2_link_flags, ao2_weakproxy_alloc, ao2_weakproxy_set_object, ao2_weakproxy_subscribe(), ast_calloc, ast_debug, ast_format_get_codec(), ast_format_get_default_ms(), ast_format_get_minimum_bytes(), ast_format_get_minimum_ms(), ast_format_get_name(), ast_format_get_sample_rate(), AST_LIST_HEAD_INIT, ast_log, ast_strings_equal(), ast_uuid_generate_str(), AST_UUID_STR_LEN, ast_websocket_client_retrieve_by_id(), AST_WS_TYPE_CLIENT, AST_WS_TYPE_SERVER, websocket_pvt::connection_id, INCOMING_CONNECTION_ID, instance_proxy_cb(), instances, LOG_ERROR, NULL, OBJ_NOLOCK, RAII_VAR, SCOPED_AO2WRLOCK, uuid(), and websocket_destructor().

Referenced by webchan_request().

Variable Documentation

◆ __mod_info

struct ast_module_info __mod_info = { .name = AST_MODULE, .flags = AST_MODFLAG_LOAD_ORDER , .description = "Websocket Media Channel" , .key = ASTERISK_GPL_KEY , .buildopt_sum = AST_BUILDOPT_SUM, .support_level = AST_MODULE_SUPPORT_CORE, .load = load_module, .unload = unload_module, .reload = reload_module, .load_pri = AST_MODPRI_CHANNEL_DRIVER, .requires = "res_http_websocket,res_websocket_client", }
static

Definition at line 2151 of file chan_websocket.c.

◆ ast_module_info

const struct ast_module_info* ast_module_info = &__mod_info
static

Definition at line 2151 of file chan_websocket.c.

◆ ast_ws_server

struct ast_websocket_server* ast_ws_server
static

Definition at line 73 of file chan_websocket.c.

Referenced by incoming_ws_http_callback(), load_module(), and unload_module().

◆ http_uri

struct ast_http_uri http_uri
static

Definition at line 1976 of file chan_websocket.c.

1976 {
1977 .callback = incoming_ws_http_callback,
1978 .description = "Media over Websocket",
1979 .uri = "media",
1980 .has_subtree = 1,
1981 .data = NULL,
1982 .key = __FILE__,
1983 .no_decode_uri = 1,
1984};

Referenced by load_module(), and unload_module().

◆ instances

struct ao2_container* instances = NULL
static

◆ msg_format_map

const char* msg_format_map[]
static
Initial value:

Definition at line 62 of file chan_websocket.c.

62 {
63 [WEBCHAN_CONTROL_MSG_FORMAT_PLAIN] = "plain-text",
66};

Referenced by control_msg_format_from_str(), and control_msg_format_to_str().

◆ sorcery

struct ast_sorcery* sorcery = NULL
static

Definition at line 54 of file chan_websocket.c.

Referenced by __ast_sorcery_apply_config(), __ast_sorcery_apply_default(), __ast_sorcery_apply_wizard_mapping(), __ast_sorcery_insert_wizard_mapping(), __ast_sorcery_object_field_register(), __ast_sorcery_object_register(), __ast_sorcery_object_type_insert_wizard(), __ast_sorcery_object_type_remove_wizard(), __ast_sorcery_open(), __ast_sorcery_remove_wizard_mapping(), alloc_and_initialize_sorcery(), alloc_and_initialize_sorcery(), alloc_and_initialize_sorcery(), apply_list_configuration(), as_config_load(), as_config_reload(), ast_ari_asterisk_delete_object(), ast_ari_asterisk_get_object(), ast_ari_asterisk_update_object(), ast_sip_destroy_sorcery_global(), ast_sip_initialize_sorcery_auth(), ast_sip_initialize_sorcery_domain_alias(), ast_sip_initialize_sorcery_global(), ast_sip_initialize_sorcery_location(), ast_sip_initialize_sorcery_transport(), ast_sorcery_alloc(), ast_sorcery_copy(), ast_sorcery_create(), ast_sorcery_delete(), ast_sorcery_diff(), ast_sorcery_force_reload(), ast_sorcery_force_reload_object(), ast_sorcery_get_module(), ast_sorcery_get_object_type(), ast_sorcery_get_wizard_mapping(), ast_sorcery_get_wizard_mapping_count(), ast_sorcery_instance_observer_add(), ast_sorcery_instance_observer_remove(), ast_sorcery_is_stale(), ast_sorcery_load(), ast_sorcery_load_object(), ast_sorcery_object_fields_register(), ast_sorcery_object_set_congestion_levels(), ast_sorcery_object_set_copy_handler(), ast_sorcery_object_set_diff_handler(), ast_sorcery_object_unregister(), ast_sorcery_objectset_apply(), ast_sorcery_objectset_create2(), ast_sorcery_objectset_json_create(), ast_sorcery_observer_add(), ast_sorcery_observer_remove(), ast_sorcery_ref(), ast_sorcery_reload(), ast_sorcery_reload_object(), ast_sorcery_retrieve_by_fields(), ast_sorcery_retrieve_by_id(), ast_sorcery_retrieve_by_prefix(), ast_sorcery_retrieve_by_regex(), ast_sorcery_updatebucket_file_wizard_create(), bucket_file_wizard_delete(), bucket_file_wizard_is_stale(), bucket_file_wizard_retrieve(), bucket_file_wizard_update(), bucket_http_wizard_retrieve_id(), bucket_wizard_create(), bucket_wizard_delete(), bucket_wizard_is_stale(), bucket_wizard_retrieve(), can_reuse_registration(), create_object(), deinitialize_sorcery(), deinitialize_sorcery(), global_loaded_observer(), handle_aor(), handle_auth(), handle_auths(), handle_endpoint(), handle_export_primitives(), handle_identify(), handle_phoneprov(), handle_registrations(), instance_created_observer(), instance_destroying_observer(), load_config(), load_module(), memory_cache_full_update(), memory_cache_populate_external(), memory_cache_populate_internal(), memory_cache_stale_check(), memory_cache_stale_check_object(), memory_cache_stale_update_full(), memory_cache_stale_update_object(), mock_retrieve_id(), object_type_loaded_observer(), object_type_registered_observer(), profile_load(), profile_reload(), reload_module(), return_sorcery_object(), sorcery_astdb_create(), sorcery_astdb_filter_objectset(), sorcery_astdb_retrieve_fields(), sorcery_astdb_retrieve_fields_common(), sorcery_astdb_retrieve_id(), sorcery_astdb_retrieve_multiple(), sorcery_astdb_retrieve_prefix(), sorcery_astdb_retrieve_regex(), sorcery_astdb_update(), sorcery_config_internal_load(), sorcery_config_load(), sorcery_config_reload(), sorcery_config_retrieve_fields(), sorcery_config_retrieve_multiple(), sorcery_config_retrieve_prefix(), sorcery_config_retrieve_regex(), sorcery_destructor(), sorcery_function_read(), sorcery_is_configuration_met(), sorcery_is_explicit_name_met(), sorcery_memory_cache_ami_populate(), sorcery_memory_cache_create(), sorcery_memory_cache_load(), sorcery_memory_cache_populate(), sorcery_memory_cache_retrieve_fields(), sorcery_memory_cache_retrieve_id(), sorcery_memory_cache_retrieve_multiple(), sorcery_memory_cache_retrieve_prefix(), sorcery_memory_cache_retrieve_regex(), sorcery_memory_cached_object_alloc(), sorcery_memory_retrieve_fields(), sorcery_memory_retrieve_multiple(), sorcery_memory_retrieve_prefix(), sorcery_memory_retrieve_regex(), sorcery_realtime_create(), sorcery_realtime_filter_objectset(), sorcery_realtime_retrieve_fields(), sorcery_realtime_retrieve_id(), sorcery_realtime_retrieve_multiple(), sorcery_realtime_retrieve_prefix(), sorcery_realtime_retrieve_regex(), sorcery_realtime_update(), sorcery_reloadable(), sorcery_test_retrieve_id(), sorcery_wizard_load(), stale_cache_update_task_data_alloc(), stale_update_task_data_alloc(), tn_config_load(), tn_config_reload(), transport_apply(), unload_module(), vs_config_load(), vs_config_reload(), webchan_request(), and wizard_apply_handler().

◆ websocket_options

const struct ast_app_option websocket_options[128] = { [ 'c' ] = { .flag = OPT_WS_CODEC , .arg_index = OPT_ARG_WS_CODEC + 1 }, [ 'n' ] = { .flag = OPT_WS_NO_AUTO_ANSWER }, [ 'v' ] = { .flag = OPT_WS_URI_PARAM , .arg_index = OPT_ARG_WS_URI_PARAM + 1 }, [ 'p' ] = { .flag = OPT_WS_PASSTHROUGH }, [ 'f' ] = { .flag = OPT_WS_MSG_FORMAT , .arg_index = OPT_ARG_WS_MSG_FORMAT + 1 }, }
static

Definition at line 1600 of file chan_websocket.c.

Referenced by webchan_request().

◆ websocket_tech

struct ast_channel_tech websocket_tech
static

Definition at line 149 of file chan_websocket.c.

149 {
150 .type = "WebSocket",
151 .description = "Media over WebSocket Channel Driver",
152 .requester = webchan_request,
153 .call = webchan_call,
154 .read = webchan_read,
155 .write = webchan_write,
156 .hangup = webchan_hangup,
157 .send_digit_end = webchan_send_dtmf_text,
158};

Referenced by load_module(), unload_module(), and webchan_request().