Asterisk - The Open Source Telephony Project GIT-master-9647a4f
Loading...
Searching...
No Matches
Data Structures | Macros | Enumerations | Functions | Variables
chan_websocket.c File Reference

Websocket Media Channel. More...

#include "asterisk.h"
#include "asterisk/app.h"
#include "asterisk/causes.h"
#include "asterisk/channel.h"
#include "asterisk/codec.h"
#include "asterisk/http_websocket.h"
#include "asterisk/format_cache.h"
#include "asterisk/frame.h"
#include "asterisk/json.h"
#include "asterisk/lock.h"
#include "asterisk/mod_format.h"
#include "asterisk/module.h"
#include "asterisk/pbx.h"
#include "asterisk/uuid.h"
#include "asterisk/timing.h"
#include "asterisk/translate.h"
#include "asterisk/websocket_client.h"
#include "asterisk/sorcery.h"
Include dependency graph for chan_websocket.c:

Go to the source code of this file.

Data Structures

struct  instance_proxy
 
struct  webchan_conf_global
 
struct  websocket_pvt
 

Macros

#define _create_event_MEDIA_XOFF(_instance)   _create_event_nodata(_instance, "MEDIA_XOFF");
 
#define _create_event_MEDIA_XON(_instance)   _create_event_nodata(_instance, "MEDIA_XON");
 
#define _create_event_QUEUE_DRAINED(_instance)   _create_event_nodata(_instance, "QUEUE_DRAINED");
 
#define ANSWER_CHANNEL   "ANSWER"
 
#define CONTINUE_MEDIA   "CONTINUE_MEDIA"
 
#define create_event(_instance, _event, ...)    _create_event_ ## _event(_instance, ##__VA_ARGS__)
 Use this macro to create events passing in any event-specific parameters.
 
#define FLUSH_MEDIA   "FLUSH_MEDIA"
 
#define GET_DRIVER_STATUS   "GET_STATUS"
 
#define HANGUP_CHANNEL   "HANGUP"
 
#define INCOMING_CONNECTION_ID   "INCOMING"
 
#define MARK_MEDIA   "MARK_MEDIA"
 
#define MAX_TEXT_MESSAGE_LEN   MIN(128, (AST_WEBSOCKET_MAX_RX_PAYLOAD_SIZE - 1))
 
#define MEDIA_WEBSOCKET_CONNECTION_ID   "MEDIA_WEBSOCKET_CONNECTION_ID"
 
#define MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE   "MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE"
 
#define PAUSE_MEDIA   "PAUSE_MEDIA"
 
#define QUEUE_LENGTH_MAX   1000
 
#define QUEUE_LENGTH_XOFF_LEVEL   900
 
#define QUEUE_LENGTH_XON_LEVEL   800
 
#define REPORT_QUEUE_DRAINED   "REPORT_QUEUE_DRAINED"
 
#define send_event(_instance, _event, ...)
 Use this macro to create and send events passing in any event-specific parameters.
 
#define START_MEDIA_BUFFERING   "START_MEDIA_BUFFERING"
 
#define STOP_MEDIA_BUFFERING   "STOP_MEDIA_BUFFERING"
 

Enumerations

enum  {
  OPT_WS_CODEC = (1 << 0) , OPT_WS_NO_AUTO_ANSWER = (1 << 1) , OPT_WS_URI_PARAM = (1 << 2) , OPT_WS_PASSTHROUGH = (1 << 3) ,
  OPT_WS_MSG_FORMAT = (1 << 4)
}
 
enum  {
  OPT_ARG_WS_CODEC , OPT_ARG_WS_NO_AUTO_ANSWER , OPT_ARG_WS_URI_PARAM , OPT_ARG_WS_PASSTHROUGH ,
  OPT_ARG_WS_MSG_FORMAT , OPT_ARG_ARRAY_SIZE
}
 
enum  webchan_control_msg_format { WEBCHAN_CONTROL_MSG_FORMAT_PLAIN = 0 , WEBCHAN_CONTROL_MSG_FORMAT_JSON , WEBCHAN_CONTROL_MSG_FORMAT_INVALID }
 

Functions

static void __reg_module (void)
 
static void __unreg_module (void)
 
static char * _create_event_DTMF_END (struct websocket_pvt *instance, const char digit)
 
static char * _create_event_ERROR (struct websocket_pvt *instance, const char *format,...)
 
static char * _create_event_MEDIA_BUFFERING_COMPLETED (struct websocket_pvt *instance, const char *id)
 
static char * _create_event_MEDIA_MARK_PROCESSED (struct websocket_pvt *instance, const char *id)
 
static char * _create_event_MEDIA_START (struct websocket_pvt *instance)
 
static char * _create_event_nodata (struct websocket_pvt *instance, char *event)
 
static char * _create_event_STATUS (struct websocket_pvt *instance)
 
struct ast_moduleAST_MODULE_SELF_SYM (void)
 
static enum webchan_control_msg_format control_msg_format_from_str (const char *value)
 
static const char * control_msg_format_to_str (enum webchan_control_msg_format value)
 
static struct ast_framedequeue_frame (struct websocket_pvt *instance)
 
static void * global_alloc (const char *name)
 
static int global_apply (const struct ast_sorcery *sorcery, void *obj)
 
static int global_control_message_format_from_str (const struct aco_option *opt, struct ast_variable *var, void *obj)
 
static int global_control_message_format_to_str (const void *obj, const intptr_t *args, char **buf)
 
static int handle_command (struct websocket_pvt *instance, char *buffer)
 
static void incoming_ws_established_cb (struct ast_websocket *ast_ws_session, struct ast_variable *get_params, struct ast_variable *upgrade_headers)
 
static int incoming_ws_http_callback (struct ast_tcptls_session_instance *ser, const struct ast_http_uri *urih, const char *uri, enum ast_http_method method, struct ast_variable *get_params, struct ast_variable *headers)
 
static void instance_proxy_cb (void *weakproxy, void *data)
 
static int load_config (void)
 
static int load_module (void)
 Function called when our module is loaded.
 
static int process_binary_message (struct websocket_pvt *instance, char *payload, uint64_t payload_len)
 
static int process_text_message (struct websocket_pvt *instance, char *payload, uint64_t payload_len)
 
static int queue_frame_from_buffer (struct websocket_pvt *instance, char *buffer, size_t len)
 
static int queue_option_frame (struct websocket_pvt *instance, char *buffer)
 
static int read_from_ws_and_queue (struct websocket_pvt *instance)
 
static void * read_thread_handler (void *obj)
 
static int reload_module (void)
 
static void set_channel_format (struct websocket_pvt *instance, struct ast_format *fmt)
 
static int set_channel_timer (struct websocket_pvt *instance)
 
static int set_channel_variables (struct websocket_pvt *instance)
 
static int set_instance_silence_frame (struct websocket_pvt *instance)
 
static int set_instance_translator (struct websocket_pvt *instance)
 
static int unload_module (void)
 Function called when our module is unloaded.
 
static int validate_uri_parameters (const char *uri_params)
 
static int webchan_call (struct ast_channel *ast, const char *dest, int timeout)
 
static int webchan_hangup (struct ast_channel *ast)
 
static struct ast_framewebchan_read (struct ast_channel *ast)
 
static struct ast_channelwebchan_request (const char *type, struct ast_format_cap *cap, const struct ast_assigned_ids *assignedids, const struct ast_channel *requestor, const char *data, int *cause)
 
static int webchan_send_dtmf_text (struct ast_channel *ast, char digit, unsigned int duration)
 
static int webchan_write (struct ast_channel *ast, struct ast_frame *f)
 Function called when we should write a frame to the channel.
 
static void websocket_destructor (void *data)
 
static struct websocket_pvtwebsocket_new (const char *chan_name, const char *connection_id, struct ast_format *fmt)
 

Variables

static struct ast_module_info __mod_info = { .name = AST_MODULE, .flags = AST_MODFLAG_LOAD_ORDER , .description = "Websocket Media Channel" , .key = ASTERISK_GPL_KEY , .buildopt_sum = AST_BUILDOPT_SUM, .support_level = AST_MODULE_SUPPORT_CORE, .load = load_module, .unload = unload_module, .reload = reload_module, .load_pri = AST_MODPRI_CHANNEL_DRIVER, .requires = "res_http_websocket,res_websocket_client", }
 
static const struct ast_module_infoast_module_info = &__mod_info
 
static struct ast_websocket_serverast_ws_server
 
static struct ast_http_uri http_uri
 
static struct ao2_containerinstances = NULL
 
static const char * msg_format_map []
 
static struct ast_sorcerysorcery = NULL
 
static const struct ast_app_option websocket_options [128] = { [ 'c' ] = { .flag = OPT_WS_CODEC , .arg_index = OPT_ARG_WS_CODEC + 1 }, [ 'n' ] = { .flag = OPT_WS_NO_AUTO_ANSWER }, [ 'v' ] = { .flag = OPT_WS_URI_PARAM , .arg_index = OPT_ARG_WS_URI_PARAM + 1 }, [ 'p' ] = { .flag = OPT_WS_PASSTHROUGH }, [ 'f' ] = { .flag = OPT_WS_MSG_FORMAT , .arg_index = OPT_ARG_WS_MSG_FORMAT + 1 }, }
 
static struct ast_channel_tech websocket_tech
 

Detailed Description

Websocket Media Channel.

Author
George Joseph gjose.nosp@m.ph@s.nosp@m.angom.nosp@m.a.co.nosp@m.m

Definition in file chan_websocket.c.

Macro Definition Documentation

◆ _create_event_MEDIA_XOFF

#define _create_event_MEDIA_XOFF (   _instance)    _create_event_nodata(_instance, "MEDIA_XOFF");

Definition at line 192 of file chan_websocket.c.

◆ _create_event_MEDIA_XON

#define _create_event_MEDIA_XON (   _instance)    _create_event_nodata(_instance, "MEDIA_XON");

Definition at line 191 of file chan_websocket.c.

◆ _create_event_QUEUE_DRAINED

#define _create_event_QUEUE_DRAINED (   _instance)    _create_event_nodata(_instance, "QUEUE_DRAINED");

Definition at line 193 of file chan_websocket.c.

◆ ANSWER_CHANNEL

#define ANSWER_CHANNEL   "ANSWER"

Definition at line 111 of file chan_websocket.c.

◆ CONTINUE_MEDIA

#define CONTINUE_MEDIA   "CONTINUE_MEDIA"

Definition at line 120 of file chan_websocket.c.

◆ create_event

#define create_event (   _instance,
  _event,
  ... 
)     _create_event_ ## _event(_instance, ##__VA_ARGS__)

Use this macro to create events passing in any event-specific parameters.

Definition at line 408 of file chan_websocket.c.

415 { \
416 int _res = -1; \
417 char *_payload = _create_event_ ## _event(_instance, ##__VA_ARGS__); \
418 if (_payload) { \
419 _res = ast_websocket_write_string(_instance->websocket, _payload); \
420 if (_res != 0) { \
421 ast_log(LOG_ERROR, "%s: Unable to send event %s\n", \
422 ast_channel_name(instance->channel), _payload); \
423 } else { \
424 ast_debug(4, "%s: Sent %s\n", \
425 ast_channel_name(instance->channel), _payload); \
426 }\
427 ast_free(_payload); \
428 } \
429 (_res); \
430})
431
432static void set_channel_format(struct websocket_pvt * instance,
433 struct ast_format *fmt)
434{
439 ast_debug(4, "Switching readformat to %s\n", ast_format_get_name(fmt));
440 }
441}
442
443/*
444 * Reminder... This function gets called by webchan_read which is
445 * triggered by the channel timer firing. It always gets called
446 * every 20ms (or whatever the timer is set to) even if there are
447 * no frames in the queue.
448 */
449static struct ast_frame *dequeue_frame(struct websocket_pvt *instance)
450{
451 struct ast_frame *queued_frame = NULL;
452 SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK,
454
455 /*
456 * If the queue is paused, don't read a frame. Processing
457 * will continue down the function and a silence frame will
458 * be sent in its place.
459 */
460 if (instance->queue_paused) {
461 return NULL;
462 }
463
464 /*
465 * We need to check if we need to send an XON before anything
466 * else because there are multiple escape paths in this function
467 * and we don't want to accidentally keep the queue in a "full"
468 * state.
469 */
470 if (instance->queue_full && instance->frame_queue_length < QUEUE_LENGTH_XON_LEVEL) {
471 instance->queue_full = 0;
472 ast_debug(4, "%s: WebSocket sending MEDIA_XON\n",
473 ast_channel_name(instance->channel));
474 send_event(instance, MEDIA_XON);
475 }
476
477 queued_frame = AST_LIST_REMOVE_HEAD(&instance->frame_queue, frame_list);
478
479 /*
480 * If there are no frames in the queue, we need to
481 * return NULL so we can send a silence frame. We also need
482 * to send the QUEUE_DRAINED notification if we were requested
483 * to do so.
484 */
485 if (!queued_frame) {
486 if (instance->report_queue_drained) {
487 instance->report_queue_drained = 0;
488 ast_debug(4, "%s: WebSocket sending QUEUE_DRAINED\n",
489 ast_channel_name(instance->channel));
490 send_event(instance, QUEUE_DRAINED);
491 }
492 return NULL;
493 }
494
495 /*
496 * The only way a control frame could be present here is as
497 * a result of us calling queue_option_frame() in response
498 * to an incoming TEXT command from the websocket.
499 * We'll be safe and make sure it's a AST_CONTROL_OPTION
500 * frame anyway.
501 *
502 * It's quite possible that there are multiple control frames
503 * in a row in the queue so we need to process consecutive ones
504 * immediately.
505 *
506 * In any case, processing a control frame MUST not use up
507 * a media timeslot so after all control frames have been
508 * processed, we need to read an audio frame and process it.
509 */
510 while (queued_frame && queued_frame->frametype == AST_FRAME_CONTROL) {
511 if (queued_frame->subclass.integer == AST_CONTROL_OPTION) {
512 /*
513 * We just need to send the data to the websocket.
514 * The data should already be NULL terminated.
515 */
517 queued_frame->data.ptr);
518 ast_debug(4, "%s: Sent %s\n",
519 ast_channel_name(instance->channel), (char *)queued_frame->data.ptr);
520 }
521 /*
522 * We do NOT send these to the core so we need to free
523 * the frame and grab the next one. If it's also a
524 * control frame, we need to process it otherwise
525 * continue down in the function.
526 */
527 ast_frame_free(queued_frame, 0);
528 queued_frame = AST_LIST_REMOVE_HEAD(&instance->frame_queue, frame_list);
529 /*
530 * Jut FYI... We didn't bump the queue length when we added the control
531 * frames so we don't need to decrement it here.
532 */
533 }
534
535 /*
536 * If, after reading all control frames, there are no frames
537 * left in the queue, we need to return NULL so we can send
538 * a silence frame.
539 */
540 if (!queued_frame) {
541 return NULL;
542 }
543
544 instance->frame_queue_length--;
545
546 return queued_frame;
547}
548/*!
549 * \internal
550 *
551 * Called by the core channel thread each time the instance timer fires.
552 *
553 */
554static struct ast_frame *webchan_read(struct ast_channel *ast)
555{
556 struct websocket_pvt *instance = NULL;
557 struct ast_frame *native_frame = NULL;
558 struct ast_frame *slin_frame = NULL;
559
560 instance = ast_channel_tech_pvt(ast);
561 if (!instance) {
562 return NULL;
563 }
564
566 ast_timer_ack(instance->timer, 1);
567 }
568
569 native_frame = dequeue_frame(instance);
570
571 /*
572 * No frame when the timer fires means we have to create and
573 * return a silence frame in its place.
574 */
575 if (!native_frame) {
576 ast_debug(5, "%s: WebSocket read timer fired with no frame available. Returning silence.\n", ast_channel_name(ast));
577 set_channel_format(instance, instance->slin_format);
578 slin_frame = ast_frdup(&instance->silence);
579 return slin_frame;
580 }
581
582 /*
583 * If we're in passthrough mode or the frame length is already optimal_frame_size,
584 * we can just return it.
585 */
586 if (instance->passthrough || native_frame->datalen == instance->optimal_frame_size) {
587 set_channel_format(instance, instance->native_format);
588 return native_frame;
589 }
590
591 /*
592 * If we're here, we have a short frame that we need to pad
593 * with silence.
594 */
595
596 if (instance->translator) {
597 slin_frame = ast_translate(instance->translator, native_frame, 0);
598 if (!slin_frame) {
599 ast_log(LOG_WARNING, "%s: Failed to translate %d byte frame\n",
600 ast_channel_name(ast), native_frame->datalen);
601 return NULL;
602 }
603 ast_frame_free(native_frame, 0);
604 } else {
605 /*
606 * If there was no translator then the native format
607 * was already slin.
608 */
609 slin_frame = native_frame;
610 }
611
612 set_channel_format(instance, instance->slin_format);
613
614 /*
615 * So now we have an slin frame but it's probably still short
616 * so we create a new data buffer with the correct length
617 * which is filled with zeros courtesy of ast_calloc.
618 * We then copy the short frame data into the new buffer
619 * and set the offset to AST_FRIENDLY_OFFSET so that
620 * the core can read the data without any issues.
621 * If the original frame data was mallocd, we need to free the old
622 * data buffer so we don't leak memory and we need to set
623 * mallocd to AST_MALLOCD_DATA so that the core knows
624 * it needs to free the new data buffer when it's done.
625 */
626
627 if (slin_frame->datalen != instance->silence.datalen) {
628 char *old_data = slin_frame->data.ptr;
629 int old_len = slin_frame->datalen;
630 int old_offset = slin_frame->offset;
631 ast_debug(4, "%s: WebSocket read short frame. Expected %d got %d. Filling with silence\n",
632 ast_channel_name(ast), instance->silence.datalen,
633 slin_frame->datalen);
634
635 slin_frame->data.ptr = ast_calloc(1, instance->silence.datalen + AST_FRIENDLY_OFFSET);
636 if (!slin_frame->data.ptr) {
637 ast_frame_free(slin_frame, 0);
638 return NULL;
639 }
640 slin_frame->data.ptr += AST_FRIENDLY_OFFSET;
641 slin_frame->offset = AST_FRIENDLY_OFFSET;
642 memcpy(slin_frame->data.ptr, old_data, old_len);
643 if (slin_frame->mallocd & AST_MALLOCD_DATA) {
644 ast_free(old_data - old_offset);
645 }
646 slin_frame->mallocd |= AST_MALLOCD_DATA;
647 slin_frame->datalen = instance->silence.datalen;
648 slin_frame->samples = instance->silence.samples;
649 }
650
651 return slin_frame;
652}
653
654static int queue_frame_from_buffer(struct websocket_pvt *instance,
655 char *buffer, size_t len)
656{
657 struct ast_frame fr = { 0, };
658 struct ast_frame *duped_frame = NULL;
659
660 AST_FRAME_SET_BUFFER(&fr, buffer, 0, len);
662 fr.subclass.format = instance->native_format;
663 fr.samples = instance->native_codec->samples_count(&fr);
664
665 duped_frame = ast_frisolate(&fr);
666 if (!duped_frame) {
667 ast_log(LOG_WARNING, "%s: Failed to isolate frame\n",
668 ast_channel_name(instance->channel));
669 return -1;
670 }
671
672 {
673 SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK,
675 AST_LIST_INSERT_TAIL(&instance->frame_queue, duped_frame, frame_list);
676 instance->frame_queue_length++;
677 if (!instance->queue_full && instance->frame_queue_length >= QUEUE_LENGTH_XOFF_LEVEL) {
678 instance->queue_full = 1;
679 send_event(instance, MEDIA_XOFF);
680 }
681 }
682
683 ast_debug(5, "%s: Queued %d byte frame\n", ast_channel_name(instance->channel),
684 duped_frame->datalen);
685
686 return 0;
687}
688
689static int queue_option_frame(struct websocket_pvt *instance,
690 char *buffer)
691{
692 struct ast_frame fr = { 0, };
693 struct ast_frame *duped_frame = NULL;
694
695 AST_FRAME_SET_BUFFER(&fr, buffer, 0, strlen(buffer) + 1);
698
699 duped_frame = ast_frisolate(&fr);
700 if (!duped_frame) {
701 ast_log(LOG_WARNING, "%s: Failed to isolate frame\n",
702 ast_channel_name(instance->channel));
703 return -1;
704 }
705
706 AST_LIST_LOCK(&instance->frame_queue);
707 AST_LIST_INSERT_TAIL(&instance->frame_queue, duped_frame, frame_list);
708 AST_LIST_UNLOCK(&instance->frame_queue);
709
710 ast_debug(4, "%s: Queued '%s' option frame\n",
711 ast_channel_name(instance->channel), buffer);
712
713 return 0;
714}
715
716/*!
717 * \internal
718 * \brief Handle commands from the websocket
719 *
720 * \param instance
721 * \param buffer Allocated by caller so don't free.
722 * \retval 0 Success
723 * \retval -1 Failure
724 */
725static int handle_command(struct websocket_pvt *instance, char *buffer)
726{
727 int res = 0;
728 RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
729 const char *command = NULL;
730 char *data = NULL;
731
733 struct ast_json_error json_error;
734
735 json = ast_json_load_buf(buffer, strlen(buffer), &json_error);
736 if (!json) {
737 send_event(instance, ERROR, "Unable to parse JSON command");
738 return -1;
739 }
740 command = ast_json_object_string_get(json, "command");
741 } else {
742 command = buffer;
743 data = strchr(buffer, ' ');
744 if (data) {
745 *data = '\0';
746 data++;
747 }
748 }
749
750 if (ast_strings_equal(command, ANSWER_CHANNEL)) {
752
753 } else if (ast_strings_equal(command, HANGUP_CHANNEL)) {
755
756 } else if (ast_strings_equal(command, START_MEDIA_BUFFERING)) {
757 if (instance->passthrough) {
758 send_event(instance, ERROR, "%s not supported in passthrough mode", command);
759 ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
760 ast_channel_name(instance->channel), command);
761 return 0;
762 }
763 AST_LIST_LOCK(&instance->frame_queue);
764 instance->bulk_media_in_progress = 1;
765 AST_LIST_UNLOCK(&instance->frame_queue);
766
767 } else if (ast_strings_equal(command, STOP_MEDIA_BUFFERING)) {
768 const char *id;
769 char *option;
770 SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK,
772
774 id = ast_json_object_string_get(json, "correlation_id");
775 } else {
776 id = data;
777 }
778
779 if (instance->passthrough) {
780 send_event(instance, ERROR, "%s not supported in passthrough mode", command);
781 ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
782 ast_channel_name(instance->channel), command);
783 return 0;
784 }
785
786 ast_debug(4, "%s: WebSocket %s '%s' with %d bytes in leftover_data.\n",
788 (int)instance->leftover_len);
789
790 instance->bulk_media_in_progress = 0;
791 if (instance->leftover_len > 0) {
792 res = queue_frame_from_buffer(instance, instance->leftover_data, instance->leftover_len);
793 if (res != 0) {
794 return res;
795 }
796 }
797 instance->leftover_len = 0;
798 option = create_event(instance, MEDIA_BUFFERING_COMPLETED, id);
799 if (!option) {
800 return -1;
801 }
802 res = queue_option_frame(instance, option);
803 ast_free(option);
804
805 } else if (ast_strings_equal(command, MARK_MEDIA)) {
806 const char *id;
807 char *option;
808 SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK,
810
811 if (instance->passthrough) {
812 send_event(instance, ERROR, "%s not supported in passthrough mode", command);
813 ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
814 ast_channel_name(instance->channel), command);
815 return 0;
816 }
817
819 id = ast_json_object_string_get(json, "correlation_id");
820 } else {
821 id = data;
822 }
823
824 ast_debug(4, "%s: %s %s\n",
825 ast_channel_name(instance->channel), MARK_MEDIA, id);
826
827 option = create_event(instance, MEDIA_MARK_PROCESSED, id);
828 if (!option) {
829 return -1;
830 }
831 res = queue_option_frame(instance, option);
832 ast_free(option);
833
834 } else if (ast_strings_equal(command, FLUSH_MEDIA)) {
835 struct ast_frame *frame = NULL;
836
837 if (instance->passthrough) {
838 send_event(instance, ERROR, "FLUSH_MEDIA not supported in passthrough mode");
839 ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
840 ast_channel_name(instance->channel), command);
841 return 0;
842 }
843
844 AST_LIST_LOCK(&instance->frame_queue);
845 while ((frame = AST_LIST_REMOVE_HEAD(&instance->frame_queue, frame_list))) {
846 ast_frfree(frame);
847 }
848 instance->frame_queue_length = 0;
849 instance->bulk_media_in_progress = 0;
850 instance->leftover_len = 0;
851 AST_LIST_UNLOCK(&instance->frame_queue);
852
853 } else if (ast_strings_equal(command, REPORT_QUEUE_DRAINED)) {
854 if (instance->passthrough) {
855 send_event(instance, ERROR, "%s not supported in passthrough mode", command);
856 ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
857 ast_channel_name(instance->channel), command);
858 return 0;
859 }
860
861 AST_LIST_LOCK(&instance->frame_queue);
862 instance->report_queue_drained = 1;
863 AST_LIST_UNLOCK(&instance->frame_queue);
864
865 } else if (ast_strings_equal(command, GET_DRIVER_STATUS)) {
866 return send_event(instance, STATUS);
867
868 } else if (ast_strings_equal(command, PAUSE_MEDIA)) {
869 if (instance->passthrough) {
870 send_event(instance, ERROR, "%s not supported in passthrough mode", command);
871 ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
872 ast_channel_name(instance->channel), command);
873 return 0;
874 }
875 AST_LIST_LOCK(&instance->frame_queue);
876 instance->queue_paused = 1;
877 AST_LIST_UNLOCK(&instance->frame_queue);
878
879 } else if (ast_strings_equal(command, CONTINUE_MEDIA)) {
880 if (instance->passthrough) {
881 send_event(instance, ERROR, "%s not supported in passthrough mode", command);
882 ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
883 ast_channel_name(instance->channel), command);
884 return 0;
885 }
886 AST_LIST_LOCK(&instance->frame_queue);
887 instance->queue_paused = 0;
888 AST_LIST_UNLOCK(&instance->frame_queue);
889
890 } else {
891 ast_log(LOG_WARNING, "%s: WebSocket %s command unknown\n",
892 ast_channel_name(instance->channel), command);
893 }
894
895 return res;
896}
897
898static int process_text_message(struct websocket_pvt *instance,
899 char *payload, uint64_t payload_len)
900{
901 char *command;
902
903 if (payload_len == 0) {
904 ast_log(LOG_WARNING, "%s: WebSocket TEXT message has 0 length\n",
905 ast_channel_name(instance->channel));
906 return 0;
907 }
908
909 if (payload_len > MAX_TEXT_MESSAGE_LEN) {
910 ast_log(LOG_WARNING, "%s: WebSocket TEXT message of length %d exceeds maximum length of %d\n",
911 ast_channel_name(instance->channel), (int)payload_len, MAX_TEXT_MESSAGE_LEN);
912 return 0;
913 }
914
915 /*
916 * Unfortunately, payload is not NULL terminated even when it's
917 * a TEXT frame so we need to allocate a new buffer, copy
918 * the data into it, and NULL terminate it.
919 */
920 command = ast_alloca(payload_len + 1);
921 memcpy(command, payload, payload_len); /* Safe */
922 command[payload_len] = '\0';
923 command = ast_strip(command);
924
925 ast_debug(4, "%s: Received: %s\n",
926 ast_channel_name(instance->channel), command);
927
928 return handle_command(instance, command);
929}
930
931static int process_binary_message(struct websocket_pvt *instance,
932 char *payload, uint64_t payload_len)
933{
934 char *next_frame_ptr = NULL;
935 size_t bytes_read = 0;
936 int res = 0;
937 size_t bytes_left = 0;
938
939 {
940 SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK,
942 if (instance->frame_queue_length >= QUEUE_LENGTH_MAX) {
943 ast_debug(4, "%s: WebSocket queue is full. Ignoring incoming binary message.\n",
944 ast_channel_name(instance->channel));
945 return 0;
946 }
947 }
948
949 next_frame_ptr = payload;
950 instance->bytes_read += payload_len;
951
952 if (instance->passthrough) {
953 res = queue_frame_from_buffer(instance, payload, payload_len);
954 return res;
955 }
956
957 if (instance->bulk_media_in_progress && instance->leftover_len > 0) {
958 /*
959 * We have leftover data from a previous websocket message.
960 * Try to make a complete frame by appending data from
961 * the current message to the leftover data.
962 */
963 char *append_ptr = instance->leftover_data + instance->leftover_len;
964 size_t bytes_needed_for_frame = instance->optimal_frame_size - instance->leftover_len;
965 /*
966 * It's possible that even the current message doesn't have enough
967 * data to make a complete frame.
968 */
969 size_t bytes_avail_to_copy = MIN(bytes_needed_for_frame, payload_len);
970
971 /*
972 * Append whatever we can to the end of the leftover data
973 * even if it's not enough to make a complete frame.
974 */
975 memcpy(append_ptr, payload, bytes_avail_to_copy);
976
977 /*
978 * If leftover data is still short, just return and wait for the
979 * next websocket message.
980 */
981 if (bytes_avail_to_copy < bytes_needed_for_frame) {
982 ast_debug(4, "%s: Leftover data %d bytes but only %d new bytes available of %d needed. Appending and waiting for next message.\n",
983 ast_channel_name(instance->channel), (int)instance->leftover_len, (int)bytes_avail_to_copy, (int)bytes_needed_for_frame);
984 instance->leftover_len += bytes_avail_to_copy;
985 return 0;
986 }
987
988 res = queue_frame_from_buffer(instance, instance->leftover_data, instance->optimal_frame_size);
989 if (res < 0) {
990 return -1;
991 }
992
993 /*
994 * We stole data from the current payload so decrement payload_len
995 * and set the next frame pointer after the data in payload
996 * we just copied.
997 */
998 payload_len -= bytes_avail_to_copy;
999 next_frame_ptr = payload + bytes_avail_to_copy;
1000
1001 ast_debug(5, "%s: --- BR: %4d FQ: %4d PL: %4d LOL: %3d P: %p NFP: %p OFF: %4d NPL: %4d BAC: %3d\n",
1002 ast_channel_name(instance->channel),
1003 instance->frame_queue_length,
1004 (int)instance->bytes_read,
1005 (int)(payload_len + bytes_avail_to_copy),
1006 (int)instance->leftover_len,
1007 payload,
1008 next_frame_ptr,
1009 (int)(next_frame_ptr - payload),
1010 (int)payload_len,
1011 (int)bytes_avail_to_copy
1012 );
1013
1014
1015 instance->leftover_len = 0;
1016 }
1017
1018 if (!instance->bulk_media_in_progress && instance->leftover_len > 0) {
1019 instance->leftover_len = 0;
1020 }
1021
1022 bytes_left = payload_len;
1023 while (bytes_read < payload_len && bytes_left >= instance->optimal_frame_size) {
1024 res = queue_frame_from_buffer(instance, next_frame_ptr,
1025 instance->optimal_frame_size);
1026 if (res < 0) {
1027 break;
1028 }
1029 bytes_read += instance->optimal_frame_size;
1030 next_frame_ptr += instance->optimal_frame_size;
1031 bytes_left -= instance->optimal_frame_size;
1032 }
1033
1034 if (instance->bulk_media_in_progress && bytes_left > 0) {
1035 /*
1036 * We have a partial frame. Save the leftover data.
1037 */
1038 ast_debug(5, "%s: +++ BR: %4d FQ: %4d PL: %4d LOL: %3d P: %p NFP: %p OFF: %4d BL: %4d\n",
1039 ast_channel_name(instance->channel),
1040 (int)instance->bytes_read,
1041 instance->frame_queue_length,
1042 (int)payload_len,
1043 (int)instance->leftover_len,
1044 payload,
1045 next_frame_ptr,
1046 (int)(next_frame_ptr - payload),
1047 (int)bytes_left
1048 );
1049 memcpy(instance->leftover_data, next_frame_ptr, bytes_left);
1050 instance->leftover_len = bytes_left;
1051 }
1052
1053 return 0;
1054}
1055
1056static int read_from_ws_and_queue(struct websocket_pvt *instance)
1057{
1058 uint64_t payload_len = 0;
1059 char *payload = NULL;
1060 enum ast_websocket_opcode opcode;
1061 int fragmented = 0;
1062 int res = 0;
1063
1064 if (!instance || !instance->websocket) {
1065 ast_log(LOG_WARNING, "%s: WebSocket instance not found\n",
1066 ast_channel_name(instance->channel));
1067 return -1;
1068 }
1069
1070 ast_debug(9, "%s: Waiting for websocket to have data\n", ast_channel_name(instance->channel));
1071 res = ast_wait_for_input(
1072 ast_websocket_fd(instance->websocket), -1);
1073 if (res <= 0) {
1074 ast_log(LOG_WARNING, "%s: WebSocket read failed: %s\n",
1075 ast_channel_name(instance->channel), strerror(errno));
1076 return -1;
1077 }
1078
1079 /*
1080 * We need to lock here to prevent the websocket handle from
1081 * being pulled out from under us if the core sends us a
1082 * hangup request.
1083 */
1084 ao2_lock(instance);
1085 if (!instance->websocket) {
1086 ao2_unlock(instance);
1087 return -1;
1088 }
1089
1090 res = ast_websocket_read(instance->websocket, &payload, &payload_len,
1091 &opcode, &fragmented);
1092 ao2_unlock(instance);
1093 if (res) {
1094 return -1;
1095 }
1096 ast_debug(5, "%s: WebSocket read %d bytes\n", ast_channel_name(instance->channel),
1097 (int)payload_len);
1098
1099 if (opcode == AST_WEBSOCKET_OPCODE_TEXT) {
1100 return process_text_message(instance, payload, payload_len);
1101 }
1102
1103 if (opcode == AST_WEBSOCKET_OPCODE_CLOSE) {
1104 ast_debug(5, "%s: WebSocket closed by remote\n",
1105 ast_channel_name(instance->channel));
1106 return -1;
1107 }
1108
1109 if (opcode != AST_WEBSOCKET_OPCODE_BINARY) {
1110 ast_debug(5, "%s: WebSocket frame type %d not supported. Ignoring.\n",
1111 ast_channel_name(instance->channel), (int)opcode);
1112 return 0;
1113 }
1114
1115 return process_binary_message(instance, payload, payload_len);
1116}
1117
1118/*!
1119 * \internal
1120 *
1121 * For incoming websocket connections, this function gets called by
1122 * incoming_ws_established_cb() and is run in the http server thread
1123 * handling the websocket connection.
1124 *
1125 * For outgoing websocket connections, this function gets started as
1126 * a background thread by webchan_call().
1127 */
1128static void *read_thread_handler(void *obj)
1129{
1130 RAII_VAR(struct websocket_pvt *, instance, obj, ao2_cleanup);
1131 int res = 0;
1132
1133 ast_debug(3, "%s: Read thread started\n", ast_channel_name(instance->channel));
1134
1135 res = send_event(instance, MEDIA_START);
1136 if (res != 0 ) {
1138 return NULL;
1139 }
1140
1141 if (!instance->no_auto_answer) {
1142 ast_debug(3, "%s: ANSWER by auto_answer\n", ast_channel_name(instance->channel));
1144 }
1145
1146 while (read_from_ws_and_queue(instance) == 0)
1147 {
1148 }
1149
1150 /*
1151 * websocket_hangup will take care of closing the websocket if needed.
1152 */
1153 ast_debug(3, "%s: HANGUP by websocket close/error\n", ast_channel_name(instance->channel));
1155
1156 return NULL;
1157}
1158
1159/*! \brief Function called when we should write a frame to the channel */
1160static int webchan_write(struct ast_channel *ast, struct ast_frame *f)
1161{
1162 struct websocket_pvt *instance = ast_channel_tech_pvt(ast);
1163
1164 if (!instance || !instance->websocket) {
1165 ast_log(LOG_WARNING, "%s: WebSocket instance or client not found\n",
1166 ast_channel_name(ast));
1167 return -1;
1168 }
1169
1170 if (f->frametype == AST_FRAME_CNG) {
1171 return 0;
1172 }
1173
1174 if (f->frametype != AST_FRAME_VOICE) {
1175 ast_log(LOG_WARNING, "%s: This WebSocket channel only supports AST_FRAME_VOICE frames\n",
1176 ast_channel_name(ast));
1177 return 0;
1178 }
1179
1181 ast_log(LOG_WARNING, "%s: This WebSocket channel only supports the '%s' format, not '%s'\n",
1184 return -1;
1185 }
1186
1188 (char *)f->data.ptr, (uint64_t)f->datalen);
1189}
1190
1191/*!
1192 * \internal
1193 *
1194 * Called by the core to actually call the remote.
1195 */
1196static int webchan_call(struct ast_channel *ast, const char *dest,
1197 int timeout)
1198{
1199 struct websocket_pvt *instance = ast_channel_tech_pvt(ast);
1200 int nodelay = 1;
1202
1203 if (!instance) {
1204 ast_log(LOG_WARNING, "%s: WebSocket instance not found\n",
1205 ast_channel_name(ast));
1206 return -1;
1207 }
1208
1209 if (instance->type == AST_WS_TYPE_SERVER) {
1210 ast_debug(3, "%s: Websocket call incoming\n", ast_channel_name(instance->channel));
1211 return 0;
1212 }
1213 ast_debug(3, "%s: Websocket call outgoing\n", ast_channel_name(instance->channel));
1214
1215 if (!instance->client) {
1216 ast_log(LOG_WARNING, "%s: WebSocket client not found\n",
1217 ast_channel_name(ast));
1218 return -1;
1219 }
1220
1221 ast_debug(3, "%s: WebSocket call requested to %s. cid: %s\n",
1222 ast_channel_name(ast), dest, instance->connection_id);
1223
1224 if (!ast_strlen_zero(instance->uri_params)) {
1226 }
1227
1228 instance->websocket = ast_websocket_client_connect(instance->client,
1229 instance, ast_channel_name(ast), &result);
1230 if (!instance->websocket || result != WS_OK) {
1231 ast_log(LOG_WARNING, "%s: WebSocket connection failed to %s: %s\n",
1233 return -1;
1234 }
1235
1236 if (setsockopt(ast_websocket_fd(instance->websocket),
1237 IPPROTO_TCP, TCP_NODELAY, (char *) &nodelay, sizeof(nodelay)) < 0) {
1238 ast_log(LOG_WARNING, "Failed to set TCP_NODELAY on websocket connection: %s\n", strerror(errno));
1239 }
1240
1241 ast_debug(3, "%s: WebSocket connection to %s established\n",
1242 ast_channel_name(ast), dest);
1243
1244 /* read_thread_handler() will clean up the bump */
1246 read_thread_handler, ao2_bump(instance))) {
1247 ast_log(LOG_WARNING, "%s: Failed to create thread.\n", ast_channel_name(ast));
1248 ao2_cleanup(instance);
1249 return -1;
1250 }
1251
1252 return 0;
1253}
1254
1255static void websocket_destructor(void *data)
1256{
1257 struct websocket_pvt *instance = data;
1258 struct ast_frame *frame = NULL;
1259 ast_debug(3, "%s: WebSocket instance freed\n", instance->connection_id);
1260
1261 AST_LIST_LOCK(&instance->frame_queue);
1262 while ((frame = AST_LIST_REMOVE_HEAD(&instance->frame_queue, frame_list))) {
1263 ast_frfree(frame);
1264 }
1265 AST_LIST_UNLOCK(&instance->frame_queue);
1266
1267 if (instance->timer) {
1268 ast_timer_close(instance->timer);
1269 instance->timer = NULL;
1270 }
1271
1272 if (instance->channel) {
1273 ast_channel_unref(instance->channel);
1274 instance->channel = NULL;
1275 }
1276 if (instance->websocket) {
1277 ast_websocket_unref(instance->websocket);
1278 instance->websocket = NULL;
1279 }
1280
1281 ao2_cleanup(instance->client);
1282 instance->client = NULL;
1283
1284 ao2_cleanup(instance->native_codec);
1285 instance->native_codec = NULL;
1286
1287 ao2_cleanup(instance->native_format);
1288 instance->native_format = NULL;
1289
1290 ao2_cleanup(instance->slin_codec);
1291 instance->slin_codec = NULL;
1292
1293 ao2_cleanup(instance->slin_format);
1294 instance->slin_format = NULL;
1295
1296 if (instance->silence.data.ptr) {
1297 ast_free(instance->silence.data.ptr);
1298 instance->silence.data.ptr = NULL;
1299 }
1300
1301 if (instance->translator) {
1303 instance->translator = NULL;
1304 }
1305
1306 if (instance->leftover_data) {
1307 ast_free(instance->leftover_data);
1308 instance->leftover_data = NULL;
1309 }
1310
1311 ast_free(instance->uri_params);
1312}
1313
1314struct instance_proxy {
1315 AO2_WEAKPROXY();
1316 /*! \brief The name of the module owning this sorcery instance */
1317 char connection_id[0];
1318};
1319
1320static void instance_proxy_cb(void *weakproxy, void *data)
1321{
1322 struct instance_proxy *proxy = weakproxy;
1323 ast_debug(3, "%s: WebSocket instance removed from instances\n", proxy->connection_id);
1324 ao2_unlink(instances, weakproxy);
1325}
1326
1327static struct websocket_pvt* websocket_new(const char *chan_name,
1328 const char *connection_id, struct ast_format *fmt)
1329{
1330 RAII_VAR(struct instance_proxy *, proxy, NULL, ao2_cleanup);
1331 RAII_VAR(struct websocket_pvt *, instance, NULL, ao2_cleanup);
1332 char uuid[AST_UUID_STR_LEN];
1333 enum ast_websocket_type ws_type;
1334
1335 SCOPED_AO2WRLOCK(locker, instances);
1336
1339 ws_type = AST_WS_TYPE_SERVER;
1340 } else {
1341 ws_type = AST_WS_TYPE_CLIENT;
1342 }
1343
1344 proxy = ao2_weakproxy_alloc(sizeof(*proxy) + strlen(connection_id) + 1, NULL);
1345 if (!proxy) {
1346 return NULL;
1347 }
1348 strcpy(proxy->connection_id, connection_id); /* Safe */
1349
1350 instance = ao2_alloc(sizeof(*instance) + strlen(connection_id) + 1,
1352 if (!instance) {
1353 return NULL;
1354 }
1355 strcpy(instance->connection_id, connection_id); /* Safe */
1356
1357 instance->type = ws_type;
1358 if (ws_type == AST_WS_TYPE_CLIENT) {
1360 if (!instance->client) {
1361 ast_log(LOG_ERROR, "%s: WebSocket client connection '%s' not found\n",
1362 chan_name, instance->connection_id);
1363 return NULL;
1364 }
1365 }
1366
1367 AST_LIST_HEAD_INIT(&instance->frame_queue);
1368
1369 /*
1370 * We need the codec to calculate the number of samples in a frame
1371 * so we'll get it once and store it in the instance.
1372 *
1373 * References for native_format and native_codec are now held by the
1374 * instance and will be released when the instance is destroyed.
1375 */
1376 instance->native_format = fmt;
1377 instance->native_codec = ast_format_get_codec(instance->native_format);
1378 /*
1379 * References for native_format and native_codec are now held by the
1380 * instance and will be released when the instance is destroyed.
1381 */
1382
1383 /*
1384 * It's not possible for us to re-time or re-frame media if the data
1385 * stream can't be broken up on arbitrary byte boundaries. This is usually
1386 * indicated by the codec's minimum_bytes being small (10 bytes or less).
1387 * We need to force passthrough mode in this case.
1388 */
1389 if (instance->native_codec->minimum_bytes <= 10) {
1390 instance->passthrough = 1;
1391 instance->optimal_frame_size = 0;
1392 } else {
1393 instance->optimal_frame_size =
1394 (instance->native_codec->default_ms * instance->native_codec->minimum_bytes)
1395 / instance->native_codec->minimum_ms;
1396 instance->leftover_data = ast_calloc(1, instance->optimal_frame_size);
1397 if (!instance->leftover_data) {
1398 return NULL;
1399 }
1400 }
1401
1402 ast_debug(3,
1403 "%s: WebSocket channel native format '%s' Sample rate: %d ptime: %dms minms: %u minbytes: %u passthrough: %d optimal_frame_size: %d\n",
1404 chan_name, ast_format_get_name(instance->native_format),
1409 instance->passthrough,
1410 instance->optimal_frame_size);
1411
1412 /* We have exclusive access to proxy and sorcery, no need for locking here. */
1413 if (ao2_weakproxy_set_object(proxy, instance, OBJ_NOLOCK)) {
1414 return NULL;
1415 }
1416
1418 return NULL;
1419 }
1420
1421 if (!ao2_link_flags(instances, proxy, OBJ_NOLOCK)) {
1422 ast_log(LOG_ERROR, "%s: Unable to link WebSocket instance to instances\n",
1423 proxy->connection_id);
1424 return NULL;
1425 }
1426 ast_debug(3, "%s: WebSocket instance created and linked\n", proxy->connection_id);
1427
1428 return ao2_bump(instance);
1429}
1430
1431static int set_instance_translator(struct websocket_pvt *instance)
1432{
1434 instance->slin_format = ao2_bump(instance->native_format);
1435 instance->slin_codec = ast_format_get_codec(instance->slin_format);
1436 return 0;
1437 }
1438
1440 if (!instance->slin_format) {
1441 ast_log(LOG_ERROR, "%s: Unable to get slin format for rate %d\n",
1442 ast_channel_name(instance->channel), instance->native_codec->sample_rate);
1443 return -1;
1444 }
1445 ast_debug(3, "%s: WebSocket channel slin format '%s' Sample rate: %d ptime: %dms\n",
1449
1450 instance->translator = ast_translator_build_path(instance->slin_format, instance->native_format);
1451 if (!instance->translator) {
1452 ast_log(LOG_ERROR, "%s: Unable to build translator path from '%s' to '%s'\n",
1454 ast_format_get_name(instance->slin_format));
1455 return -1;
1456 }
1457
1458 instance->slin_codec = ast_format_get_codec(instance->slin_format);
1459 return 0;
1460}
1461
1462static int set_instance_silence_frame(struct websocket_pvt *instance)
1463{
1464 instance->silence.frametype = AST_FRAME_VOICE;
1465 instance->silence.datalen =
1466 (instance->slin_codec->default_ms * instance->slin_codec->minimum_bytes) / instance->slin_codec->minimum_ms;
1467 instance->silence.samples = instance->silence.datalen / sizeof(uint16_t);
1468 /*
1469 * Even though we'll calloc the data pointer, we don't mark it as
1470 * mallocd because this frame will be around for a while and we don't
1471 * want it accidentally freed before we're done with it.
1472 */
1473 instance->silence.mallocd = 0;
1474 instance->silence.offset = 0;
1475 instance->silence.src = __PRETTY_FUNCTION__;
1476 instance->silence.subclass.format = instance->slin_format;
1477 instance->silence.data.ptr = ast_calloc(1, instance->silence.datalen);
1478 if (!instance->silence.data.ptr) {
1479 return -1;
1480 }
1481
1482 return 0;
1483}
1484
1485static int set_channel_timer(struct websocket_pvt *instance)
1486{
1487 int rate = 0;
1488 instance->timer = ast_timer_open();
1489 if (!instance->timer) {
1490 return -1;
1491 }
1492 /* Rate is the number of ticks per second, not the interval. */
1493 rate = 1000 / ast_format_get_default_ms(instance->native_format);
1494 ast_debug(3, "%s: WebSocket timer rate %d\n",
1495 ast_channel_name(instance->channel), rate);
1496 ast_timer_set_rate(instance->timer, rate);
1497 /*
1498 * Calling ast_channel_set_fd will cause the channel thread to call
1499 * webchan_read at 'rate' times per second.
1500 */
1501 ast_channel_set_fd(instance->channel, 0, ast_timer_fd(instance->timer));
1502
1503 return 0;
1504}
1505
1506static int set_channel_variables(struct websocket_pvt *instance)
1507{
1508 char *pkt_size = NULL;
1509 int res = ast_asprintf(&pkt_size, "%d", instance->optimal_frame_size);
1510 if (res <= 0) {
1511 return -1;
1512 }
1513
1515 pkt_size);
1516 ast_free(pkt_size);
1518 instance->connection_id);
1519
1520 return 0;
1521}
1522
1523static int validate_uri_parameters(const char *uri_params)
1524{
1525 char *params = ast_strdupa(uri_params);
1526 char *nvp = NULL;
1527 char *nv = NULL;
1528
1529 /*
1530 * uri_params should be a comma-separated list of key=value pairs.
1531 * For example:
1532 * name1=value1,name2=value2
1533 * We're verifying that each name and value either doesn't need
1534 * to be encoded or that it already is.
1535 */
1536
1537 while((nvp = ast_strsep(&params, ',', 0))) {
1538 /* nvp will be name1=value1 */
1539 while((nv = ast_strsep(&nvp, '=', 0))) {
1540 /* nv will be either name1 or value1 */
1541 if (!ast_uri_verify_encoded(nv)) {
1542 return 0;
1543 }
1544 }
1545 }
1546
1547 return 1;
1548}
1549
1550enum {
1551 OPT_WS_CODEC = (1 << 0),
1552 OPT_WS_NO_AUTO_ANSWER = (1 << 1),
1553 OPT_WS_URI_PARAM = (1 << 2),
1554 OPT_WS_PASSTHROUGH = (1 << 3),
1555 OPT_WS_MSG_FORMAT = (1 << 4),
1556};
1557
1558enum {
1565};
1566
1573 END_OPTIONS );
1574
1575static struct ast_channel *webchan_request(const char *type,
1576 struct ast_format_cap *cap, const struct ast_assigned_ids *assignedids,
1577 const struct ast_channel *requestor, const char *data, int *cause)
1578{
1579 char *parse;
1580 RAII_VAR(struct websocket_pvt *, instance, NULL, ao2_cleanup);
1581 struct ast_channel *chan = NULL;
1582 struct ast_format *fmt = NULL;
1583 struct ast_format_cap *caps = NULL;
1585 AST_APP_ARG(connection_id);
1587 );
1588 struct ast_flags opts = { 0, };
1589 char *opt_args[OPT_ARG_ARRAY_SIZE];
1590 const char *requestor_name = requestor ? ast_channel_name(requestor) :
1591 (assignedids && !ast_strlen_zero(assignedids->uniqueid) ? assignedids->uniqueid : "<unknown>");
1592 RAII_VAR(struct webchan_conf_global *, global_cfg, NULL, ao2_cleanup);
1593
1594 global_cfg = ast_sorcery_retrieve_by_id(sorcery, "global", "global");
1595
1596 ast_debug(3, "%s: WebSocket channel requested\n",
1597 requestor_name);
1598
1599 if (ast_strlen_zero(data)) {
1600 ast_log(LOG_ERROR, "%s: A connection id is required for the 'WebSocket' channel\n",
1601 requestor_name);
1602 goto failure;
1603 }
1604 parse = ast_strdupa(data);
1605 AST_NONSTANDARD_APP_ARGS(args, parse, '/');
1606
1607 if (ast_strlen_zero(args.connection_id)) {
1608 ast_log(LOG_ERROR, "%s: connection_id is required for the 'WebSocket' channel\n",
1609 requestor_name);
1610 goto failure;
1611 }
1612
1613 if (!ast_strlen_zero(args.options)
1614 && ast_app_parse_options(websocket_options, &opts, opt_args,
1615 ast_strdupa(args.options))) {
1616 ast_log(LOG_ERROR, "%s: 'WebSocket' channel options '%s' parse error\n",
1617 requestor_name, args.options);
1618 goto failure;
1619 }
1620
1621 if (ast_test_flag(&opts, OPT_WS_CODEC)
1622 && !ast_strlen_zero(opt_args[OPT_ARG_WS_CODEC])) {
1623 fmt = ast_format_cache_get(opt_args[OPT_ARG_WS_CODEC]);
1624 } else {
1625 /*
1626 * If codec wasn't specified in the dial string,
1627 * use the first format in the capabilities.
1628 */
1629 fmt = ast_format_cap_get_format(cap, 0);
1630 }
1631
1632 if (!fmt) {
1633 ast_log(LOG_WARNING, "%s: No codec found for sending media to connection '%s'\n",
1634 requestor_name, args.connection_id);
1635 goto failure;
1636 }
1637
1638 ast_debug(3, "%s: Using format %s from %s\n",
1639 requestor_name, ast_format_get_name(fmt),
1640 ast_test_flag(&opts, OPT_WS_CODEC) ? "dialstring" : "requester");
1641
1642 instance = websocket_new(requestor_name, args.connection_id, fmt);
1643 if (!instance) {
1644 ast_log(LOG_ERROR, "%s: Failed to allocate WebSocket channel pvt\n",
1645 requestor_name);
1646 goto failure;
1647 }
1648
1650 if (!instance->passthrough) {
1651 instance->passthrough = ast_test_flag(&opts, OPT_WS_PASSTHROUGH);
1652 }
1653
1655 && !ast_strlen_zero(opt_args[OPT_ARG_WS_URI_PARAM])) {
1656 char *comma;
1657
1658 if (ast_strings_equal(args.connection_id, INCOMING_CONNECTION_ID)) {
1660 "%s: URI parameters are not allowed for 'WebSocket/INCOMING' channels\n",
1661 requestor_name);
1662 goto failure;
1663 }
1664
1665 ast_debug(3, "%s: Using URI parameters '%s'\n",
1666 requestor_name, opt_args[OPT_ARG_WS_URI_PARAM]);
1667
1669 ast_log(LOG_ERROR, "%s: Invalid URI parameters '%s' in WebSocket/%s dial string\n",
1670 requestor_name, opt_args[OPT_ARG_WS_URI_PARAM],
1671 args.connection_id);
1672 goto failure;
1673 }
1674
1675 instance->uri_params = ast_strdup(opt_args[OPT_ARG_WS_URI_PARAM]);
1676 comma = instance->uri_params;
1677 /*
1678 * The normal separator for query string components is an
1679 * ampersand ('&') but the Dial app interprets them as additional
1680 * channels to dial in parallel so we instruct users to separate
1681 * the parameters with commas (',') instead. We now have to
1682 * convert those commas back to ampersands.
1683 */
1684 while ((comma = strchr(comma,','))) {
1685 *comma = '&';
1686 }
1687 ast_debug(3, "%s: Using final URI '%s'\n", requestor_name, instance->uri_params);
1688 }
1689
1690 if (ast_test_flag(&opts, OPT_WS_MSG_FORMAT)) {
1692
1694 ast_log(LOG_WARNING, "%s: 'f/control message format' dialstring parameter value missing or invalid. "
1695 "Defaulting to 'plain-text'\n",
1696 ast_channel_name(requestor));
1698 }
1699 } else if (global_cfg) {
1700 instance->control_msg_format = global_cfg->control_msg_format;
1701 }
1702
1703 chan = ast_channel_alloc(1, AST_STATE_DOWN, "", "", "", "", "", assignedids,
1704 requestor, 0, "WebSocket/%s/%p", args.connection_id, instance);
1705 if (!chan) {
1706 ast_log(LOG_ERROR, "%s: Unable to alloc channel\n", ast_channel_name(requestor));
1707 goto failure;
1708 }
1709
1710 ast_debug(3, "%s: WebSocket channel %s allocated for connection %s\n",
1711 ast_channel_name(chan), requestor_name,
1712 instance->connection_id);
1713
1714 instance->channel = ao2_bump(chan);
1716
1717 if (set_instance_translator(instance) != 0) {
1718 goto failure;
1719 }
1720
1721 if (set_instance_silence_frame(instance) != 0) {
1722 goto failure;
1723 }
1724
1725 if (set_channel_timer(instance) != 0) {
1726 goto failure;
1727 }
1728
1729 if (set_channel_variables(instance) != 0) {
1730 goto failure;
1731 }
1732
1734 if (!caps) {
1735 ast_log(LOG_ERROR, "%s: Unable to alloc caps\n", requestor_name);
1736 goto failure;
1737 }
1738
1739 ast_format_cap_append(caps, instance->native_format, 0);
1740 ast_channel_nativeformats_set(instance->channel, caps);
1743 ast_channel_set_readformat(instance->channel, instance->native_format);
1745 ast_channel_tech_pvt_set(chan, ao2_bump(instance));
1746 ast_channel_unlock(chan);
1747 ao2_cleanup(caps);
1748
1749 ast_debug(3, "%s: WebSocket channel created to %s\n",
1750 ast_channel_name(chan), args.connection_id);
1751
1752 return chan;
1753
1754failure:
1755 if (chan) {
1756 ast_channel_unlock(chan);
1757 }
1758 *cause = AST_CAUSE_FAILURE;
1759 return NULL;
1760}
1761
1762/*!
1763 * \internal
1764 *
1765 * Called by the core to hang up the channel.
1766 */
1767static int webchan_hangup(struct ast_channel *ast)
1768{
1769 struct websocket_pvt *instance = ast_channel_tech_pvt(ast);
1770
1771 if (!instance) {
1772 return -1;
1773 }
1774 ast_debug(3, "%s: WebSocket call hangup. cid: %s\n",
1775 ast_channel_name(ast), instance->connection_id);
1776
1777 /*
1778 * We need to lock because read_from_ws_and_queue() is probably waiting
1779 * on the websocket file descriptor and will unblock and immediately try to
1780 * check the websocket and read from it. We don't want to pull the
1781 * websocket out from under it between the check and read.
1782 */
1783 ao2_lock(instance);
1784 if (instance->websocket) {
1785 ast_websocket_close(instance->websocket, 1000);
1786 ast_websocket_unref(instance->websocket);
1787 instance->websocket = NULL;
1788 }
1790 ao2_unlock(instance);
1791
1792 /* Clean up the reference from adding the instance to the channel */
1793 ao2_cleanup(instance);
1794
1795 return 0;
1796}
1797
1798static int webchan_send_dtmf_text(struct ast_channel *ast, char digit, unsigned int duration)
1799{
1800 struct websocket_pvt *instance = ast_channel_tech_pvt(ast);
1801
1802 if (!instance) {
1803 return -1;
1804 }
1805
1806 return send_event(instance, DTMF_END, digit);
1807}
1808
1809/*!
1810 * \internal
1811 *
1812 * Called by res_http_websocket after a client has connected and
1813 * successfully upgraded from HTTP to WebSocket.
1814 *
1815 * Depends on incoming_ws_http_callback parsing the connection_id from
1816 * the HTTP request and storing it in get_params.
1817 */
1818static void incoming_ws_established_cb(struct ast_websocket *ast_ws_session,
1819 struct ast_variable *get_params, struct ast_variable *upgrade_headers)
1820{
1821 RAII_VAR(struct ast_websocket *, s, ast_ws_session, ast_websocket_unref);
1822 struct ast_variable *v;
1823 const char *connection_id = NULL;
1824 struct websocket_pvt *instance = NULL;
1825 int nodelay = 1;
1826
1827 ast_debug(3, "WebSocket established\n");
1828
1829 for (v = upgrade_headers; v; v = v->next) {
1830 ast_debug(4, "Header-> %s: %s\n", v->name, v->value);
1831 }
1832 for (v = get_params; v; v = v->next) {
1833 ast_debug(4, " Param-> %s: %s\n", v->name, v->value);
1834 }
1835
1836 connection_id = ast_variable_find_in_list(get_params, "CONNECTION_ID");
1837 if (!connection_id) {
1838 /*
1839 * This can't really happen because websocket_http_callback won't
1840 * let it get this far if it can't add the connection_id to the
1841 * get_params.
1842 * Just in case though...
1843 */
1844 ast_log(LOG_WARNING, "WebSocket connection id not found\n");
1846 ast_websocket_close(ast_ws_session, 1000);
1847 return;
1848 }
1849
1851 if (!instance) {
1852 /*
1853 * This also can't really happen because websocket_http_callback won't
1854 * let it get this far if it can't find the instance.
1855 * Just in case though...
1856 */
1857 ast_log(LOG_WARNING, "%s: WebSocket instance not found\n", connection_id);
1859 ast_websocket_close(ast_ws_session, 1000);
1860 return;
1861 }
1862 instance->websocket = ao2_bump(ast_ws_session);
1863
1864 if (setsockopt(ast_websocket_fd(instance->websocket),
1865 IPPROTO_TCP, TCP_NODELAY, (char *) &nodelay, sizeof(nodelay)) < 0) {
1866 ast_log(LOG_WARNING, "Failed to set TCP_NODELAY on manager connection: %s\n", strerror(errno));
1867 }
1868
1869 /* read_thread_handler cleans up the bump */
1870 read_thread_handler(ao2_bump(instance));
1871
1872 ao2_cleanup(instance);
1873 ast_debug(3, "WebSocket closed\n");
1874}
1875
1876/*!
1877 * \internal
1878 *
1879 * Called by the core http server after a client connects but before
1880 * the upgrade from HTTP to Websocket. We need to save the URI in
1881 * the CONNECTION_ID in a get_param because it contains the connection UUID
1882 * we gave to the client when they used externalMedia to create the channel.
1883 * incoming_ws_established_cb() will use this to retrieve the chan_websocket
1884 * instance.
1885 */
1887 const struct ast_http_uri *urih, const char *uri,
1888 enum ast_http_method method, struct ast_variable *get_params,
1889 struct ast_variable *headers)
1890{
1891 struct ast_http_uri fake_urih = {
1893 };
1894 int res = 0;
1895 /*
1896 * Normally the http server will destroy the get_params
1897 * when the session ends but if there weren't any initially
1898 * and we create some and add them to the list, the http server
1899 * won't know about it so we have to destroy it ourselves.
1900 */
1901 int destroy_get_params = (get_params == NULL);
1902 struct ast_variable *v = NULL;
1903 RAII_VAR(struct websocket_pvt *, instance, NULL, ao2_cleanup);
1904
1905 ast_debug(2, "URI: %s Starting\n", uri);
1906
1907 /*
1908 * The client will have issued the GET request with a URI of
1909 * /media/<connection_id>
1910 *
1911 * Since this callback is registered for the /media URI prefix the
1912 * http server will strip that off the front of the URI passing in
1913 * only the path components after that in the 'uri' parameter.
1914 * This should leave only the connection id without a leading '/'.
1915 */
1916 instance = ao2_weakproxy_find(instances, uri, OBJ_SEARCH_KEY | OBJ_NOLOCK, "");
1917 if (!instance) {
1918 ast_log(LOG_WARNING, "%s: WebSocket instance not found\n", uri);
1919 ast_http_error(ser, 404, "Not found", "WebSocket instance not found");
1920 return -1;
1921 }
1922
1923 /*
1924 * We don't allow additional connections using the same connection id.
1925 */
1926 if (instance->websocket) {
1927 ast_log(LOG_WARNING, "%s: Websocket already connected for channel '%s'\n",
1928 uri, instance->channel ? ast_channel_name(instance->channel) : "unknown");
1929 ast_http_error(ser, 409, "Conflict", "Another websocket connection exists for this connection id");
1930 return -1;
1931 }
1932
1933 v = ast_variable_new("CONNECTION_ID", uri, "");
1934 if (!v) {
1935 ast_http_error(ser, 500, "Server error", "");
1936 return -1;
1937 }
1938 ast_variable_list_append(&get_params, v);
1939
1940 for (v = get_params; v; v = v->next) {
1941 ast_debug(4, " Param-> %s: %s\n", v->name, v->value);
1942 }
1943
1944 /*
1945 * This will ultimately call internal_ws_established_cb() so
1946 * this function will block until the websocket is closed and
1947 * internal_ws_established_cb() returns;
1948 */
1949 res = ast_websocket_uri_cb(ser, &fake_urih, uri, method,
1950 get_params, headers);
1951 if (destroy_get_params) {
1952 ast_variables_destroy(get_params);
1953 }
1954
1955 ast_debug(2, "URI: %s DONE\n", uri);
1956
1957 return res;
1958}
1959
1960static struct ast_http_uri http_uri = {
1962 .description = "Media over Websocket",
1963 .uri = "media",
1964 .has_subtree = 1,
1965 .data = NULL,
1966 .key = __FILE__,
1967 .no_decode_uri = 1,
1968};
1969
1973
1974static int global_control_message_format_from_str(const struct aco_option *opt,
1975 struct ast_variable *var, void *obj)
1976{
1977 struct webchan_conf_global *cfg = obj;
1978
1980
1982 ast_log(LOG_ERROR, "chan_websocket.conf: Invalid value '%s' for "
1983 "control_mesage_format. Must be 'plain-text' or 'json'\n",
1984 var->value);
1985 return -1;
1986 }
1987
1988 return 0;
1989}
1990
1991static int global_control_message_format_to_str(const void *obj, const intptr_t *args, char **buf)
1992{
1993 const struct webchan_conf_global *cfg = obj;
1994
1996
1997 return 0;
1998}
1999
2000static void *global_alloc(const char *name)
2001{
2003 sizeof(*cfg), NULL);
2004
2005 if (!cfg) {
2006 return NULL;
2007 }
2008
2009 return cfg;
2010}
2011
2012static int global_apply(const struct ast_sorcery *sorcery, void *obj)
2013{
2014 struct webchan_conf_global *cfg = obj;
2015
2016 ast_debug(1, "control_msg_format: %s\n",
2018
2019 return 0;
2020}
2021
2022static int load_config(void)
2023{
2024 ast_debug(2, "Initializing Websocket Client Configuration\n");
2026 if (!sorcery) {
2027 ast_log(LOG_ERROR, "Failed to open sorcery\n");
2028 return -1;
2029 }
2030
2031 ast_sorcery_apply_default(sorcery, "global", "config",
2032 "chan_websocket.conf,criteria=type=global,single_object=yes,explicit_name=global");
2033
2035 ast_log(LOG_ERROR, "Failed to register chan_websocket global object with sorcery\n");
2037 sorcery = NULL;
2038 return -1;
2039 }
2040
2041 ast_sorcery_object_field_register_nodoc(sorcery, "global", "type", "", OPT_NOOP_T, 0, 0);
2042 ast_sorcery_register_cust(global, control_message_format, "plain-text");
2043
2045
2046 return 0;
2047}
2048
2049/*! \brief Function called when our module is unloaded */
2050static int unload_module(void)
2051{
2055
2059
2061 instances = NULL;
2062
2064 sorcery = NULL;
2065
2066 return 0;
2067}
2068
2069static int reload_module(void)
2070{
2071 ast_debug(2, "Reloading chan_websocket configuration\n");
2073
2074 return 0;
2075}
2076
2077/*! \brief Function called when our module is loaded */
2078static int load_module(void)
2079{
2080 int res = 0;
2081 struct ast_websocket_protocol *protocol;
2082
2083 res = load_config();
2084 if (res != 0) {
2086 }
2087
2090 }
2091
2094 ast_log(LOG_ERROR, "Unable to register channel class 'WebSocket'\n");
2095 unload_module();
2097 }
2098
2100 AO2_CONTAINER_ALLOC_OPT_DUPS_REPLACE, 17, instance_proxy_hash_fn,
2101 instance_proxy_sort_fn, instance_proxy_cmp_fn);
2102 if (!instances) {
2104 "Failed to allocate the chan_websocket instance registry\n");
2105 unload_module();
2107 }
2108
2110 if (!ast_ws_server) {
2111 unload_module();
2113 }
2114
2115 protocol = ast_websocket_sub_protocol_alloc("media");
2116 if (!protocol) {
2117 unload_module();
2119 }
2122
2124
2126}
2127
2128AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "Websocket Media Channel",
2129 .support_level = AST_MODULE_SUPPORT_CORE,
2130 .load = load_module,
2131 .unload = unload_module,
2133 .load_pri = AST_MODPRI_CHANNEL_DRIVER,
2134 .requires = "res_http_websocket,res_websocket_client",
2135);
char digit
enum queue_result id
Definition app_queue.c:1771
#define var
Definition ast_expr2f.c:605
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
Definition astmm.h:288
#define ast_free(a)
Definition astmm.h:180
#define ast_strdup(str)
A wrapper for strdup()
Definition astmm.h:241
#define ast_strdupa(s)
duplicate a string in memory from the stack
Definition astmm.h:298
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition astmm.h:267
#define ast_calloc(num, len)
A wrapper for calloc()
Definition astmm.h:202
#define ast_log
Definition astobj2.c:42
#define ao2_weakproxy_set_object(weakproxy, obj, flags)
Associate weakproxy with obj.
Definition astobj2.h:579
int ao2_weakproxy_subscribe(void *weakproxy, ao2_weakproxy_notification_cb cb, void *data, int flags)
Request notification when weakproxy points to NULL.
Definition astobj2.c:934
@ AO2_ALLOC_OPT_LOCK_RWLOCK
Definition astobj2.h:365
#define AO2_STRING_FIELD_CMP_FN(stype, field)
Creates a compare function for a structure string field.
Definition astobj2.h:2048
#define ao2_cleanup(obj)
Definition astobj2.h:1934
#define ao2_unlink(container, obj)
Remove an object from a container.
Definition astobj2.h:1578
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
Definition astobj2.h:1554
#define AO2_STRING_FIELD_SORT_FN(stype, field)
Creates a sort function for a structure string field.
Definition astobj2.h:2064
#define ao2_unlock(a)
Definition astobj2.h:729
#define ao2_weakproxy_find(c, arg, flags, tag)
Perform an ao2_find on a container with ao2_weakproxy objects, returning the real object.
Definition astobj2.h:1748
#define ao2_lock(a)
Definition astobj2.h:717
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition astobj2.h:480
#define ao2_weakproxy_alloc(data_size, destructor_fn)
Allocate an ao2_weakproxy object.
Definition astobj2.h:550
#define AO2_STRING_FIELD_HASH_FN(stype, field)
Creates a hash function for a structure string field.
Definition astobj2.h:2032
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
Definition astobj2.h:1063
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
Definition astobj2.h:1101
#define ao2_alloc(data_size, destructor_fn)
Definition astobj2.h:409
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition astobj2.h:1303
@ AO2_CONTAINER_ALLOC_OPT_DUPS_REPLACE
Replace objects with duplicate keys in container.
Definition astobj2.h:1211
#define AST_CAUSE_FAILURE
Definition causes.h:150
static PGresult * result
Definition cel_pgsql.c:84
static const char type[]
static int set_instance_translator(struct websocket_pvt *instance)
#define FLUSH_MEDIA
#define QUEUE_LENGTH_XON_LEVEL
#define MARK_MEDIA
#define send_event(_instance, _event,...)
Use this macro to create and send events passing in any event-specific parameters.
static int incoming_ws_http_callback(struct ast_tcptls_session_instance *ser, const struct ast_http_uri *urih, const char *uri, enum ast_http_method method, struct ast_variable *get_params, struct ast_variable *headers)
#define ANSWER_CHANNEL
static void instance_proxy_cb(void *weakproxy, void *data)
static struct ast_frame * dequeue_frame(struct websocket_pvt *instance)
static void * global_alloc(const char *name)
static const char * control_msg_format_to_str(enum webchan_control_msg_format value)
static int validate_uri_parameters(const char *uri_params)
static int webchan_write(struct ast_channel *ast, struct ast_frame *f)
Function called when we should write a frame to the channel.
static int webchan_hangup(struct ast_channel *ast)
static struct ast_channel_tech websocket_tech
#define HANGUP_CHANNEL
static struct ast_frame * webchan_read(struct ast_channel *ast)
@ OPT_ARG_WS_PASSTHROUGH
@ OPT_ARG_WS_URI_PARAM
@ OPT_ARG_WS_MSG_FORMAT
@ OPT_ARG_WS_CODEC
@ OPT_ARG_WS_NO_AUTO_ANSWER
@ OPT_ARG_ARRAY_SIZE
#define MEDIA_WEBSOCKET_CONNECTION_ID
static int read_from_ws_and_queue(struct websocket_pvt *instance)
@ WEBCHAN_CONTROL_MSG_FORMAT_JSON
@ WEBCHAN_CONTROL_MSG_FORMAT_PLAIN
@ WEBCHAN_CONTROL_MSG_FORMAT_INVALID
static int handle_command(struct websocket_pvt *instance, char *buffer)
static int global_control_message_format_from_str(const struct aco_option *opt, struct ast_variable *var, void *obj)
static int webchan_send_dtmf_text(struct ast_channel *ast, char digit, unsigned int duration)
static struct ast_http_uri http_uri
static int reload_module(void)
#define GET_DRIVER_STATUS
static int set_channel_variables(struct websocket_pvt *instance)
@ OPT_WS_CODEC
@ OPT_WS_PASSTHROUGH
@ OPT_WS_URI_PARAM
@ OPT_WS_MSG_FORMAT
@ OPT_WS_NO_AUTO_ANSWER
static void * read_thread_handler(void *obj)
static int process_text_message(struct websocket_pvt *instance, char *payload, uint64_t payload_len)
static void websocket_destructor(void *data)
static int queue_frame_from_buffer(struct websocket_pvt *instance, char *buffer, size_t len)
static struct ast_sorcery * sorcery
#define START_MEDIA_BUFFERING
static int set_channel_timer(struct websocket_pvt *instance)
#define QUEUE_LENGTH_XOFF_LEVEL
static const struct ast_app_option websocket_options[128]
#define PAUSE_MEDIA
#define create_event(_instance, _event,...)
Use this macro to create events passing in any event-specific parameters.
static struct ao2_container * instances
#define MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE
static void set_channel_format(struct websocket_pvt *instance, struct ast_format *fmt)
#define INCOMING_CONNECTION_ID
static struct websocket_pvt * websocket_new(const char *chan_name, const char *connection_id, struct ast_format *fmt)
#define REPORT_QUEUE_DRAINED
#define CONTINUE_MEDIA
#define STOP_MEDIA_BUFFERING
static int load_module(void)
Function called when our module is loaded.
static int webchan_call(struct ast_channel *ast, const char *dest, int timeout)
static enum webchan_control_msg_format control_msg_format_from_str(const char *value)
static int process_binary_message(struct websocket_pvt *instance, char *payload, uint64_t payload_len)
static int set_instance_silence_frame(struct websocket_pvt *instance)
static struct ast_websocket_server * ast_ws_server
static int unload_module(void)
Function called when our module is unloaded.
static int global_control_message_format_to_str(const void *obj, const intptr_t *args, char **buf)
#define MAX_TEXT_MESSAGE_LEN
static void incoming_ws_established_cb(struct ast_websocket *ast_ws_session, struct ast_variable *get_params, struct ast_variable *upgrade_headers)
#define QUEUE_LENGTH_MAX
static struct ast_channel * webchan_request(const char *type, struct ast_format_cap *cap, const struct ast_assigned_ids *assignedids, const struct ast_channel *requestor, const char *data, int *cause)
static int load_config(void)
static int global_apply(const struct ast_sorcery *sorcery, void *obj)
static int queue_option_frame(struct websocket_pvt *instance, char *buffer)
const char * ast_channel_name(const struct ast_channel *chan)
void * ast_channel_tech_pvt(const struct ast_channel *chan)
struct ast_format * ast_channel_rawreadformat(struct ast_channel *chan)
#define ast_channel_alloc(needqueue, state, cid_num, cid_name, acctcode, exten, context, assignedids, requestor, amaflag,...)
Create a channel structure.
Definition channel.h:1299
void ast_channel_nativeformats_set(struct ast_channel *chan, struct ast_format_cap *value)
void ast_channel_unregister(const struct ast_channel_tech *tech)
Unregister a channel technology.
Definition channel.c:570
int ast_queue_control(struct ast_channel *chan, enum ast_control_frame_type control)
Queue a control frame without payload.
Definition channel.c:1288
int ast_set_read_format(struct ast_channel *chan, struct ast_format *format)
Sets read format on channel chan.
Definition channel.c:5757
void ast_channel_set_rawreadformat(struct ast_channel *chan, struct ast_format *format)
void ast_channel_tech_pvt_set(struct ast_channel *chan, void *value)
void ast_channel_set_rawwriteformat(struct ast_channel *chan, struct ast_format *format)
void ast_channel_set_readformat(struct ast_channel *chan, struct ast_format *format)
int ast_channel_register(const struct ast_channel_tech *tech)
Register a channel technology (a new channel driver) Called by a channel module to register the kind ...
Definition channel.c:539
#define ast_channel_unref(c)
Decrease channel reference count.
Definition channel.h:3018
void ast_channel_set_fd(struct ast_channel *chan, int which, int fd)
Definition channel.c:2416
void ast_channel_tech_set(struct ast_channel *chan, const struct ast_channel_tech *value)
#define ast_channel_unlock(chan)
Definition channel.h:2983
void ast_channel_set_writeformat(struct ast_channel *chan, struct ast_format *format)
struct ast_format * ast_channel_readformat(struct ast_channel *chan)
@ AST_STATE_DOWN
@ AST_MEDIA_TYPE_UNKNOWN
Definition codec.h:31
@ OPT_NOOP_T
Type for a default handler that should do nothing.
char buf[BUFSIZE]
Definition eagi_proxy.c:66
struct ast_codec * ast_format_get_codec(const struct ast_format *format)
Get the codec associated with a format.
Definition format.c:324
unsigned int ast_format_get_minimum_bytes(const struct ast_format *format)
Get the minimum number of bytes expected in a frame for this format.
Definition format.c:374
unsigned int ast_format_get_sample_rate(const struct ast_format *format)
Get the sample rate of a media format.
Definition format.c:379
unsigned int ast_format_get_minimum_ms(const struct ast_format *format)
Get the minimum amount of media carried in this format.
Definition format.c:364
enum ast_format_cmp_res ast_format_cmp(const struct ast_format *format1, const struct ast_format *format2)
Compare two formats.
Definition format.c:201
@ AST_FORMAT_CMP_NOT_EQUAL
Definition format.h:38
const char * ast_format_get_name(const struct ast_format *format)
Get the name associated with a format.
Definition format.c:334
unsigned int ast_format_get_default_ms(const struct ast_format *format)
Get the default framing size (in milliseconds) for a format.
Definition format.c:359
int ast_format_cache_is_slinear(struct ast_format *format)
Determines if a format is one of the cached slin formats.
#define ast_format_cache_get(name)
Retrieve a named format from the cache.
struct ast_format * ast_format_cache_get_slin_by_rate(unsigned int rate)
Retrieve the best signed linear format given a sample rate.
int ast_format_cap_append_by_type(struct ast_format_cap *cap, enum ast_media_type type)
Add all codecs Asterisk knows about for a specific type to the capabilities structure.
Definition format_cap.c:216
struct ast_format * ast_format_cap_get_format(const struct ast_format_cap *cap, int position)
Get the format at a specific index.
Definition format_cap.c:400
@ AST_FORMAT_CAP_FLAG_DEFAULT
Definition format_cap.h:38
#define ast_format_cap_append(cap, format, framing)
Add format capability to capabilities structure.
Definition format_cap.h:99
#define ast_format_cap_alloc(flags)
Allocate a new ast_format_cap structure.
Definition format_cap.h:49
static const char name[]
Definition format_mp3.c:68
static int len(struct ast_channel *chan, const char *cmd, char *data, char *buf, size_t buflen)
static int uuid(struct ast_channel *chan, const char *cmd, char *data, char *buf, size_t len)
Definition func_uuid.c:52
ast_http_method
HTTP Request methods known by Asterisk.
Definition http.h:58
void ast_http_uri_unlink(struct ast_http_uri *urihandler)
Unregister a URI handler.
Definition http.c:721
void ast_http_error(struct ast_tcptls_session_instance *ser, int status, const char *title, const char *text)
Send HTTP error message and close socket.
Definition http.c:664
int ast_http_uri_link(struct ast_http_uri *urihandler)
Register a URI handler.
Definition http.c:689
int AST_OPTIONAL_API_NAME() ast_websocket_write(struct ast_websocket *session, enum ast_websocket_opcode opcode, char *payload, uint64_t payload_size)
Construct and transmit a WebSocket frame.
int AST_OPTIONAL_API_NAME() ast_websocket_server_add_protocol2(struct ast_websocket_server *server, struct ast_websocket_protocol *protocol)
Add a sub-protocol handler to the given server.
int AST_OPTIONAL_API_NAME() ast_websocket_write_string(struct ast_websocket *ws, const char *buf)
Construct and transmit a WebSocket frame containing string data.
int AST_OPTIONAL_API_NAME() ast_websocket_uri_cb(struct ast_tcptls_session_instance *ser, const struct ast_http_uri *urih, const char *uri, enum ast_http_method method, struct ast_variable *get_vars, struct ast_variable *headers)
Callback suitable for use with a ast_http_uri.
ast_websocket_result
Result code for a websocket client.
@ WS_OK
int AST_OPTIONAL_API_NAME() ast_websocket_fd(struct ast_websocket *session)
Get the file descriptor for a WebSocket session.
ast_websocket_opcode
WebSocket operation codes.
@ AST_WEBSOCKET_OPCODE_BINARY
@ AST_WEBSOCKET_OPCODE_CLOSE
@ AST_WEBSOCKET_OPCODE_TEXT
ast_websocket_type
WebSocket connection/configuration types.
@ AST_WS_TYPE_CLIENT
@ AST_WS_TYPE_SERVER
struct ast_websocket_protocol *AST_OPTIONAL_API_NAME() ast_websocket_sub_protocol_alloc(const char *name)
Allocate a websocket sub-protocol instance.
int AST_OPTIONAL_API_NAME() ast_websocket_read(struct ast_websocket *session, char **payload, uint64_t *payload_len, enum ast_websocket_opcode *opcode, int *fragmented)
Read a WebSocket frame and handle it.
int AST_OPTIONAL_API_NAME() ast_websocket_close(struct ast_websocket *session, uint16_t reason)
Close a WebSocket session by sending a message with the CLOSE opcode and an optional code.
struct ast_websocket_server *AST_OPTIONAL_API_NAME() ast_websocket_server_create(void)
Creates a ast_websocket_server.
void AST_OPTIONAL_API_NAME() ast_websocket_unref(struct ast_websocket *session)
Decrease the reference count for a WebSocket session.
const char *AST_OPTIONAL_API_NAME() ast_websocket_result_to_str(enum ast_websocket_result result)
Convert a websocket result code to a string.
#define AST_APP_ARG(name)
Define an application argument.
#define END_OPTIONS
#define AST_APP_OPTIONS(holder, options...)
Declares an array of options for an application.
#define AST_APP_OPTION_ARG(option, flagno, argno)
Declares an application option that accepts an argument.
#define AST_DECLARE_APP_ARGS(name, arglist)
Declare a structure to hold an application's arguments.
#define BEGIN_OPTIONS
#define AST_APP_OPTION(option, flagno)
Declares an application option that does not accept an argument.
#define AST_NONSTANDARD_APP_ARGS(args, parse, sep)
Performs the 'nonstandard' argument separation process for an application.
int ast_app_parse_options(const struct ast_app_option *options, struct ast_flags *flags, char **args, char *optstr)
Parses a string containing application options and sets flags/arguments.
Definition main/app.c:3066
const char * ast_variable_find_in_list(const struct ast_variable *list, const char *variable)
Gets the value of a variable from a variable list by name.
#define ast_variable_new(name, value, filename)
#define ast_variable_list_append(head, new_var)
void ast_variables_destroy(struct ast_variable *var)
Free variable list.
Definition extconf.c:1260
#define ast_frisolate(fr)
Makes a frame independent of any static storage.
void ast_frame_free(struct ast_frame *frame, int cache)
Frees a frame or list of frames.
Definition main/frame.c:176
#define ast_frdup(fr)
Copies a frame.
#define ast_frfree(fr)
#define AST_MALLOCD_DATA
#define AST_FRAME_SET_BUFFER(fr, _base, _ofs, _datalen)
#define AST_FRIENDLY_OFFSET
Offset into a frame's data buffer.
@ AST_FRAME_CONTROL
@ AST_CONTROL_ANSWER
@ AST_CONTROL_HANGUP
@ AST_CONTROL_OPTION
#define ast_debug(level,...)
Log a DEBUG message.
#define LOG_ERROR
#define LOG_WARNING
#define ast_json_object_string_get(object, key)
Get a string field from a JSON object.
Definition json.h:600
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition json.c:73
struct ast_json * ast_json_load_buf(const char *buffer, size_t buflen, struct ast_json_error *error)
Parse buffer with known length into a JSON object or array.
Definition json.c:585
#define AST_LIST_INSERT_TAIL(head, elm, field)
Appends a list entry to the tail of a list.
#define AST_LIST_HEAD_INIT(head)
Initializes a list head structure.
#define AST_LIST_LOCK(head)
Locks a list.
Definition linkedlists.h:40
#define AST_LIST_REMOVE_HEAD(head, field)
Removes and returns the head entry from a list.
#define AST_LIST_UNLOCK(head)
Attempts to unlock a list.
#define SCOPED_AO2WRLOCK(varname, obj)
scoped lock specialization for ao2 write locks.
Definition lock.h:621
#define SCOPED_LOCK(varname, lock, lockfunc, unlockfunc)
Scoped Locks.
Definition lock.h:590
int errno
@ AST_MODFLAG_LOAD_ORDER
Definition module.h:331
#define AST_MODULE_INFO(keystr, flags_to_set, desc, fields...)
Definition module.h:557
@ AST_MODPRI_CHANNEL_DRIVER
Definition module.h:341
@ AST_MODULE_SUPPORT_CORE
Definition module.h:121
#define ASTERISK_GPL_KEY
The text the key() function should return.
Definition module.h:46
@ AST_MODULE_LOAD_SUCCESS
Definition module.h:70
@ AST_MODULE_LOAD_DECLINE
Module has failed to load, may be in an inconsistent state.
Definition module.h:78
int pbx_builtin_setvar_helper(struct ast_channel *chan, const char *name, const char *value)
Add a variable to the channel variable stack, removing the most recently set value for the same name.
static int reload(void)
const char * method
Definition res_pjsip.c:1279
static struct @519 args
#define NULL
Definition resample.c:96
#define ast_sorcery_unref(sorcery)
Decrease the reference count of a sorcery structure.
Definition sorcery.h:1500
#define ast_sorcery_object_field_register_nodoc(sorcery, type, name, default_val, opt_type, flags,...)
Register a field within an object without documentation.
Definition sorcery.h:987
#define ast_sorcery_register_cust(object, option, def_value)
Register a custom field within an object.
Definition sorcery.h:1767
void ast_sorcery_load(const struct ast_sorcery *sorcery)
Inform any wizards to load persistent objects.
Definition sorcery.c:1441
void * ast_sorcery_retrieve_by_id(const struct ast_sorcery *sorcery, const char *type, const char *id)
Retrieve an object using its unique identifier.
Definition sorcery.c:1917
#define ast_sorcery_object_register(sorcery, type, alloc, transform, apply)
Register an object type.
Definition sorcery.h:837
void ast_sorcery_reload(const struct ast_sorcery *sorcery)
Inform any wizards to reload persistent objects.
Definition sorcery.c:1472
void * ast_sorcery_generic_alloc(size_t size, ao2_destructor_fn destructor)
Allocate a generic sorcery capable object.
Definition sorcery.c:1792
#define ast_sorcery_apply_default(sorcery, type, name, data)
Definition sorcery.h:476
#define ast_sorcery_open()
Open a new sorcery structure.
Definition sorcery.h:406
int ast_strings_equal(const char *str1, const char *str2)
Compare strings for equality checking for NULL.
Definition strings.c:238
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition strings.h:65
char * ast_strip(char *s)
Strip leading/trailing whitespace from a string.
Definition strings.h:223
char * ast_strsep(char **s, const char sep, uint32_t flags)
Act like strsep but ignore separators inside quotes.
Definition utils.c:1871
Structure to pass both assignedid values to channel drivers.
Definition channel.h:606
struct ast_format_cap * capabilities
Definition channel.h:652
Main Channel structure associated with a channel.
const char * data
unsigned int sample_rate
Sample rate (number of samples carried in a second)
Definition codec.h:52
unsigned int minimum_bytes
Length in bytes of the data payload of a minimum_ms frame.
Definition codec.h:60
unsigned int default_ms
Default length of media carried (in milliseconds) in a frame.
Definition codec.h:58
int(* samples_count)(struct ast_frame *frame)
Retrieve the number of samples in a frame.
Definition codec.h:68
unsigned int minimum_ms
Minimum length of media that can be carried (in milliseconds) in a frame.
Definition codec.h:54
Structure used to handle boolean flags.
Definition utils.h:220
Format capabilities structure, holds formats + preference order + etc.
Definition format_cap.c:54
Definition of a media format.
Definition format.c:43
struct ast_format * format
Data structure associated with a single frame of data.
struct ast_frame_subclass subclass
enum ast_frame_type frametype
union ast_frame::@239 data
Definition of a URI handler.
Definition http.h:102
ast_http_callback callback
Definition http.h:107
void * data
Definition http.h:116
JSON parsing error information.
Definition json.h:887
Abstract JSON element (object, array, string, int, ...).
Full structure for sorcery.
Definition sorcery.c:231
describes a server instance
Definition tcptls.h:151
Structure for variables, used for configurations and for channel variables.
struct ast_variable * next
A websocket protocol implementation.
ast_websocket_callback session_established
Callback called when a new session is established. Mandatory.
Structure definition for session.
char connection_id[0]
The name of the module owning this sorcery instance.
enum webchan_control_msg_format control_msg_format
char connection_id[0]
struct ast_format * native_format
struct websocket_pvt::@145 frame_queue
struct ast_channel * channel
struct ast_codec * slin_codec
enum webchan_control_msg_format control_msg_format
enum ast_websocket_type type
struct ast_timer * timer
struct ast_format * slin_format
pthread_t outbound_read_thread
struct ast_frame silence
struct ast_codec * native_codec
struct ast_websocket_client * client
struct ast_websocket * websocket
struct ast_trans_pvt * translator
static struct aco_type global
static struct test_options options
void ast_timer_close(struct ast_timer *handle)
Close an opened timing handle.
Definition timing.c:154
int ast_timer_ack(const struct ast_timer *handle, unsigned int quantity)
Acknowledge a timer event.
Definition timing.c:171
int ast_timer_set_rate(const struct ast_timer *handle, unsigned int rate)
Set the timing tick rate.
Definition timing.c:166
enum ast_timer_event ast_timer_get_event(const struct ast_timer *handle)
Retrieve timing event.
Definition timing.c:186
struct ast_timer * ast_timer_open(void)
Open a timer.
Definition timing.c:122
@ AST_TIMING_EVENT_EXPIRED
Definition timing.h:58
int ast_timer_fd(const struct ast_timer *handle)
Get a poll()-able file descriptor for a timer.
Definition timing.c:161
struct ast_frame * ast_translate(struct ast_trans_pvt *tr, struct ast_frame *f, int consume)
translates one or more frames Apply an input frame into the translator and receive zero or one output...
Definition translate.c:566
void ast_translator_free_path(struct ast_trans_pvt *tr)
Frees a translator path Frees the given translator path structure.
Definition translate.c:476
struct ast_trans_pvt * ast_translator_build_path(struct ast_format *dest, struct ast_format *source)
Builds a translator path Build a path (possibly NULL) from source to dest.
Definition translate.c:486
#define ast_test_flag(p, flag)
Definition utils.h:64
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition utils.h:981
#define MIN(a, b)
Definition utils.h:252
int ast_wait_for_input(int fd, int ms)
Definition utils.c:1734
#define ast_pthread_create_detached_background(a, b, c, d)
Definition utils.h:637
int ast_uri_verify_encoded(const char *string)
Verify if a string is valid as a URI component.
Definition utils.c:781
#define AST_UUID_STR_LEN
Definition uuid.h:27
char * ast_uuid_generate_str(char *buf, size_t size)
Generate a UUID string.
Definition uuid.c:141
void ast_websocket_client_add_uri_params(struct ast_websocket_client *wc, const char *uri_params)
Add additional parameters to the URI.
struct ast_websocket_client * ast_websocket_client_retrieve_by_id(const char *id)
Retrieve a websocket client object by ID.
struct ast_websocket * ast_websocket_client_connect(struct ast_websocket_client *wc, void *lock_obj, const char *display_name, enum ast_websocket_result *result)
Connect to a websocket server using the configured authentication, retry and TLS options.

◆ FLUSH_MEDIA

#define FLUSH_MEDIA   "FLUSH_MEDIA"

Definition at line 116 of file chan_websocket.c.

◆ GET_DRIVER_STATUS

#define GET_DRIVER_STATUS   "GET_STATUS"

Definition at line 117 of file chan_websocket.c.

◆ HANGUP_CHANNEL

#define HANGUP_CHANNEL   "HANGUP"

Definition at line 112 of file chan_websocket.c.

◆ INCOMING_CONNECTION_ID

#define INCOMING_CONNECTION_ID   "INCOMING"

Definition at line 109 of file chan_websocket.c.

◆ MARK_MEDIA

#define MARK_MEDIA   "MARK_MEDIA"

Definition at line 115 of file chan_websocket.c.

◆ MAX_TEXT_MESSAGE_LEN

#define MAX_TEXT_MESSAGE_LEN   MIN(128, (AST_WEBSOCKET_MAX_RX_PAYLOAD_SIZE - 1))

Definition at line 125 of file chan_websocket.c.

◆ MEDIA_WEBSOCKET_CONNECTION_ID

#define MEDIA_WEBSOCKET_CONNECTION_ID   "MEDIA_WEBSOCKET_CONNECTION_ID"

Definition at line 108 of file chan_websocket.c.

◆ MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE

#define MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE   "MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE"

Definition at line 107 of file chan_websocket.c.

◆ PAUSE_MEDIA

#define PAUSE_MEDIA   "PAUSE_MEDIA"

Definition at line 119 of file chan_websocket.c.

◆ QUEUE_LENGTH_MAX

#define QUEUE_LENGTH_MAX   1000

Definition at line 122 of file chan_websocket.c.

◆ QUEUE_LENGTH_XOFF_LEVEL

#define QUEUE_LENGTH_XOFF_LEVEL   900

Definition at line 123 of file chan_websocket.c.

◆ QUEUE_LENGTH_XON_LEVEL

#define QUEUE_LENGTH_XON_LEVEL   800

Definition at line 124 of file chan_websocket.c.

◆ REPORT_QUEUE_DRAINED

#define REPORT_QUEUE_DRAINED   "REPORT_QUEUE_DRAINED"

Definition at line 118 of file chan_websocket.c.

◆ send_event

#define send_event (   _instance,
  _event,
  ... 
)

Use this macro to create and send events passing in any event-specific parameters.

Definition at line 415 of file chan_websocket.c.

416 { \
417 int _res = -1; \
418 char *_payload = _create_event_ ## _event(_instance, ##__VA_ARGS__); \
419 if (_payload) { \
420 _res = ast_websocket_write_string(_instance->websocket, _payload); \
421 if (_res != 0) { \
422 ast_log(LOG_ERROR, "%s: Unable to send event %s\n", \
423 ast_channel_name(instance->channel), _payload); \
424 } else { \
425 ast_debug(4, "%s: Sent %s\n", \
426 ast_channel_name(instance->channel), _payload); \
427 }\
428 ast_free(_payload); \
429 } \
430 (_res); \
431})

◆ START_MEDIA_BUFFERING

#define START_MEDIA_BUFFERING   "START_MEDIA_BUFFERING"

Definition at line 113 of file chan_websocket.c.

◆ STOP_MEDIA_BUFFERING

#define STOP_MEDIA_BUFFERING   "STOP_MEDIA_BUFFERING"

Definition at line 114 of file chan_websocket.c.

Enumeration Type Documentation

◆ anonymous enum

anonymous enum
Enumerator
OPT_WS_CODEC 
OPT_WS_NO_AUTO_ANSWER 
OPT_WS_URI_PARAM 
OPT_WS_PASSTHROUGH 
OPT_WS_MSG_FORMAT 

Definition at line 1551 of file chan_websocket.c.

1551 {
1552 OPT_WS_CODEC = (1 << 0),
1553 OPT_WS_NO_AUTO_ANSWER = (1 << 1),
1554 OPT_WS_URI_PARAM = (1 << 2),
1555 OPT_WS_PASSTHROUGH = (1 << 3),
1556 OPT_WS_MSG_FORMAT = (1 << 4),
1557};

◆ anonymous enum

anonymous enum
Enumerator
OPT_ARG_WS_CODEC 
OPT_ARG_WS_NO_AUTO_ANSWER 
OPT_ARG_WS_URI_PARAM 
OPT_ARG_WS_PASSTHROUGH 
OPT_ARG_WS_MSG_FORMAT 
OPT_ARG_ARRAY_SIZE 

Definition at line 1559 of file chan_websocket.c.

◆ webchan_control_msg_format

Enumerator
WEBCHAN_CONTROL_MSG_FORMAT_PLAIN 
WEBCHAN_CONTROL_MSG_FORMAT_JSON 
WEBCHAN_CONTROL_MSG_FORMAT_INVALID 

Definition at line 56 of file chan_websocket.c.

Function Documentation

◆ __reg_module()

static void __reg_module ( void  )
static

Definition at line 2136 of file chan_websocket.c.

◆ __unreg_module()

static void __unreg_module ( void  )
static

Definition at line 2136 of file chan_websocket.c.

◆ _create_event_DTMF_END()

static char * _create_event_DTMF_END ( struct websocket_pvt instance,
const char  digit 
)
static

Definition at line 301 of file chan_websocket.c.

303{
304 char *payload = NULL;
306 struct ast_json *msg = ast_json_pack("{s:s, s:s, s:s#}",
307 "event", "DTMF_END",
308 "channel_id", ast_channel_uniqueid(instance->channel),
309 "digit", &digit, 1
310 );
311 if (!msg) {
312 return NULL;
313 }
315 ast_json_unref(msg);
316 } else {
317 ast_asprintf(&payload, "%s digit:%c channel_id:%s",
318 "DTMF_END", digit, ast_channel_uniqueid(instance->channel));
319 }
320
321 return payload;
322}
const char * ast_channel_uniqueid(const struct ast_channel *chan)
struct ast_json * ast_json_pack(char const *format,...)
Helper for creating complex JSON values.
Definition json.c:612
@ AST_JSON_COMPACT
Definition json.h:793
char * ast_json_dump_string_format(struct ast_json *root, enum ast_json_encoding_format format)
Encode a JSON value to a string.
Definition json.c:484

References ast_asprintf, ast_channel_uniqueid(), AST_JSON_COMPACT, ast_json_dump_string_format(), ast_json_pack(), ast_json_unref(), websocket_pvt::channel, websocket_pvt::control_msg_format, digit, NULL, and WEBCHAN_CONTROL_MSG_FORMAT_JSON.

◆ _create_event_ERROR()

static char * _create_event_ERROR ( struct websocket_pvt instance,
const char *  format,
  ... 
)
static

Definition at line 369 of file chan_websocket.c.

371{
372 char *payload = NULL;
373 char *error_text = NULL;
374 va_list ap;
375 int res = 0;
376
377 va_start(ap, format);
378 res = ast_vasprintf(&error_text, format, ap);
379 va_end(ap);
380 if (res < 0 || !error_text) {
381 return NULL;
382 }
383
385 struct ast_json *msg = ast_json_pack("{s:s, s:s, s:s}",
386 "event", "ERROR",
387 "channel_id", ast_channel_uniqueid(instance->channel),
388 "error_text", error_text);
389 ast_free(error_text);
390 if (!msg) {
391 return NULL;
392 }
394 ast_json_unref(msg);
395 } else {
396 ast_asprintf(&payload, "%s channel_id:%s error_text:%s",
397 "ERROR", ast_channel_uniqueid(instance->channel), error_text);
398 ast_free(error_text);
399 }
400
401 return payload;
402}
#define ast_vasprintf(ret, fmt, ap)
A wrapper for vasprintf()
Definition astmm.h:278

References ast_asprintf, ast_channel_uniqueid(), ast_free, AST_JSON_COMPACT, ast_json_dump_string_format(), ast_json_pack(), ast_json_unref(), ast_vasprintf, NULL, and WEBCHAN_CONTROL_MSG_FORMAT_JSON.

◆ _create_event_MEDIA_BUFFERING_COMPLETED()

static char * _create_event_MEDIA_BUFFERING_COMPLETED ( struct websocket_pvt instance,
const char *  id 
)
static

Definition at line 241 of file chan_websocket.c.

243{
244 char *payload = NULL;
246 struct ast_json *msg = ast_json_pack("{s:s, s:s, s:s}",
247 "event", "MEDIA_BUFFERING_COMPLETED",
248 "channel_id", ast_channel_uniqueid(instance->channel),
249 "correlation_id", S_OR(id, "")
250 );
251 if (!msg) {
252 return NULL;
253 }
255 ast_json_unref(msg);
256 } else {
257 ast_asprintf(&payload, "%s%s%s",
258 "MEDIA_BUFFERING_COMPLETED",
259 S_COR(id, " ",""), S_OR(id, ""));
260
261 }
262
263 return payload;
264}
#define S_OR(a, b)
returns the equivalent of logic or for strings: first one if not empty, otherwise second one.
Definition strings.h:80
#define S_COR(a, b, c)
returns the equivalent of logic or for strings, with an additional boolean check: second one if not e...
Definition strings.h:87

References ast_asprintf, ast_channel_uniqueid(), AST_JSON_COMPACT, ast_json_dump_string_format(), ast_json_pack(), ast_json_unref(), websocket_pvt::channel, websocket_pvt::control_msg_format, NULL, S_COR, S_OR, and WEBCHAN_CONTROL_MSG_FORMAT_JSON.

◆ _create_event_MEDIA_MARK_PROCESSED()

static char * _create_event_MEDIA_MARK_PROCESSED ( struct websocket_pvt instance,
const char *  id 
)
static

Definition at line 271 of file chan_websocket.c.

273{
274 char *payload = NULL;
276 struct ast_json *msg = ast_json_pack("{s:s, s:s, s:s}",
277 "event", "MEDIA_MARK_PROCESSED",
278 "channel_id", ast_channel_uniqueid(instance->channel),
279 "correlation_id", S_OR(id, "")
280 );
281 if (!msg) {
282 return NULL;
283 }
285 ast_json_unref(msg);
286 } else {
287 ast_asprintf(&payload, "%s%s%s",
288 "MEDIA_MARK_PROCESSED",
289 S_COR(id, " ",""), S_OR(id, ""));
290
291 }
292
293 return payload;
294}

References ast_asprintf, ast_channel_uniqueid(), AST_JSON_COMPACT, ast_json_dump_string_format(), ast_json_pack(), ast_json_unref(), websocket_pvt::channel, websocket_pvt::control_msg_format, NULL, S_COR, S_OR, and WEBCHAN_CONTROL_MSG_FORMAT_JSON.

◆ _create_event_MEDIA_START()

static char * _create_event_MEDIA_START ( struct websocket_pvt instance)
static

Definition at line 200 of file chan_websocket.c.

201{
202 char *payload = NULL;
203
205 struct ast_json *msg = ast_json_pack("{s:s, s:s, s:s, s:s, s:s, s:i, s:i, s:o }",
206 "event", "MEDIA_START",
207 "connection_id", instance->connection_id,
208 "channel", ast_channel_name(instance->channel),
209 "channel_id", ast_channel_uniqueid(instance->channel),
210 "format", ast_format_get_name(instance->native_format),
211 "optimal_frame_size", instance->optimal_frame_size,
212 "ptime", instance->native_codec->default_ms,
213 "channel_variables", ast_json_channel_vars(ast_channel_varshead(
214 instance->channel))
215 );
216 if (!msg) {
217 return NULL;
218 }
220 ast_json_unref(msg);
221 } else {
222 ast_asprintf(&payload, "%s %s:%s %s:%s %s:%s %s:%s %s:%d %s:%d",
223 "MEDIA_START",
224 "connection_id", instance->connection_id,
225 "channel", ast_channel_name(instance->channel),
226 "channel_id", ast_channel_uniqueid(instance->channel),
227 "format", ast_format_get_name(instance->native_format),
228 "optimal_frame_size", instance->optimal_frame_size,
229 "ptime", instance->native_codec->default_ms
230 );
231 }
232
233 return payload;
234}
struct varshead * ast_channel_varshead(struct ast_channel *chan)
struct ast_json * ast_json_channel_vars(struct varshead *channelvars)
Construct a JSON object from a ast_var_t list.
Definition json.c:941

References ast_asprintf, ast_channel_name(), ast_channel_uniqueid(), ast_channel_varshead(), ast_format_get_name(), ast_json_channel_vars(), AST_JSON_COMPACT, ast_json_dump_string_format(), ast_json_pack(), ast_json_unref(), websocket_pvt::channel, websocket_pvt::connection_id, websocket_pvt::control_msg_format, ast_codec::default_ms, websocket_pvt::native_codec, websocket_pvt::native_format, NULL, websocket_pvt::optimal_frame_size, and WEBCHAN_CONTROL_MSG_FORMAT_JSON.

◆ _create_event_nodata()

static char * _create_event_nodata ( struct websocket_pvt instance,
char *  event 
)
static

Definition at line 172 of file chan_websocket.c.

173{
174 char *payload = NULL;
176 struct ast_json * msg = ast_json_pack("{ s:s s:s }",
177 "event", event,
178 "channel_id", ast_channel_uniqueid(instance->channel));
179 if (!msg) {
180 return NULL;
181 }
183 ast_json_unref(msg);
184 } else {
185 payload = ast_strdup(event);
186 }
187
188 return payload;
189}

References ast_channel_uniqueid(), AST_JSON_COMPACT, ast_json_dump_string_format(), ast_json_pack(), ast_json_unref(), ast_strdup, websocket_pvt::channel, websocket_pvt::control_msg_format, NULL, and WEBCHAN_CONTROL_MSG_FORMAT_JSON.

◆ _create_event_STATUS()

static char * _create_event_STATUS ( struct websocket_pvt instance)
static

Definition at line 329 of file chan_websocket.c.

330{
331 char *payload = NULL;
332
334 struct ast_json *msg = ast_json_pack("{s:s, s:s, s:i, s:i, s:i, s:b, s:b, s:b }",
335 "event", "STATUS",
336 "channel_id", ast_channel_uniqueid(instance->channel),
337 "queue_length", instance->frame_queue_length,
338 "xon_level", QUEUE_LENGTH_XON_LEVEL,
339 "xoff_level", QUEUE_LENGTH_XOFF_LEVEL,
340 "queue_full", instance->queue_full,
341 "bulk_media", instance->bulk_media_in_progress,
342 "media_paused", instance->queue_paused
343 );
344 if (!msg) {
345 return NULL;
346 }
348 ast_json_unref(msg);
349 } else {
350 ast_asprintf(&payload, "%s channel_id:%s queue_length:%d xon_level:%d xoff_level:%d queue_full:%s bulk_media:%s media_paused:%s",
351 "STATUS",
352 ast_channel_uniqueid(instance->channel),
355 S_COR(instance->queue_full, "true", "false"),
356 S_COR(instance->bulk_media_in_progress, "true", "false"),
357 S_COR(instance->queue_paused, "true", "false")
358 );
359 }
360
361 return payload;
362}

References ast_asprintf, ast_channel_uniqueid(), AST_JSON_COMPACT, ast_json_dump_string_format(), ast_json_pack(), ast_json_unref(), websocket_pvt::bulk_media_in_progress, websocket_pvt::channel, websocket_pvt::control_msg_format, websocket_pvt::frame_queue_length, NULL, websocket_pvt::queue_full, QUEUE_LENGTH_XOFF_LEVEL, QUEUE_LENGTH_XON_LEVEL, websocket_pvt::queue_paused, S_COR, and WEBCHAN_CONTROL_MSG_FORMAT_JSON.

◆ AST_MODULE_SELF_SYM()

struct ast_module * AST_MODULE_SELF_SYM ( void  )

Definition at line 2136 of file chan_websocket.c.

◆ control_msg_format_from_str()

static enum webchan_control_msg_format control_msg_format_from_str ( const char *  value)
static

◆ control_msg_format_to_str()

static const char * control_msg_format_to_str ( enum webchan_control_msg_format  value)
static

Definition at line 159 of file chan_websocket.c.

160{
162 return NULL;
163 }
164 return msg_format_map[value];
165}
#define ARRAY_IN_BOUNDS(v, a)
Checks to see if value is within the bounds of the given array.
Definition utils.h:727

References ARRAY_IN_BOUNDS, msg_format_map, NULL, and value.

Referenced by global_apply(), and global_control_message_format_to_str().

◆ dequeue_frame()

static struct ast_frame * dequeue_frame ( struct websocket_pvt instance)
static

Definition at line 450 of file chan_websocket.c.

451{
452 struct ast_frame *queued_frame = NULL;
453 SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK,
455
456 /*
457 * If the queue is paused, don't read a frame. Processing
458 * will continue down the function and a silence frame will
459 * be sent in its place.
460 */
461 if (instance->queue_paused) {
462 return NULL;
463 }
464
465 /*
466 * We need to check if we need to send an XON before anything
467 * else because there are multiple escape paths in this function
468 * and we don't want to accidentally keep the queue in a "full"
469 * state.
470 */
471 if (instance->queue_full && instance->frame_queue_length < QUEUE_LENGTH_XON_LEVEL) {
472 instance->queue_full = 0;
473 ast_debug(4, "%s: WebSocket sending MEDIA_XON\n",
474 ast_channel_name(instance->channel));
475 send_event(instance, MEDIA_XON);
476 }
477
478 queued_frame = AST_LIST_REMOVE_HEAD(&instance->frame_queue, frame_list);
479
480 /*
481 * If there are no frames in the queue, we need to
482 * return NULL so we can send a silence frame. We also need
483 * to send the QUEUE_DRAINED notification if we were requested
484 * to do so.
485 */
486 if (!queued_frame) {
487 if (instance->report_queue_drained) {
488 instance->report_queue_drained = 0;
489 ast_debug(4, "%s: WebSocket sending QUEUE_DRAINED\n",
490 ast_channel_name(instance->channel));
491 send_event(instance, QUEUE_DRAINED);
492 }
493 return NULL;
494 }
495
496 /*
497 * The only way a control frame could be present here is as
498 * a result of us calling queue_option_frame() in response
499 * to an incoming TEXT command from the websocket.
500 * We'll be safe and make sure it's a AST_CONTROL_OPTION
501 * frame anyway.
502 *
503 * It's quite possible that there are multiple control frames
504 * in a row in the queue so we need to process consecutive ones
505 * immediately.
506 *
507 * In any case, processing a control frame MUST not use up
508 * a media timeslot so after all control frames have been
509 * processed, we need to read an audio frame and process it.
510 */
511 while (queued_frame && queued_frame->frametype == AST_FRAME_CONTROL) {
512 if (queued_frame->subclass.integer == AST_CONTROL_OPTION) {
513 /*
514 * We just need to send the data to the websocket.
515 * The data should already be NULL terminated.
516 */
518 queued_frame->data.ptr);
519 ast_debug(4, "%s: Sent %s\n",
520 ast_channel_name(instance->channel), (char *)queued_frame->data.ptr);
521 }
522 /*
523 * We do NOT send these to the core so we need to free
524 * the frame and grab the next one. If it's also a
525 * control frame, we need to process it otherwise
526 * continue down in the function.
527 */
528 ast_frame_free(queued_frame, 0);
529 queued_frame = AST_LIST_REMOVE_HEAD(&instance->frame_queue, frame_list);
530 /*
531 * Jut FYI... We didn't bump the queue length when we added the control
532 * frames so we don't need to decrement it here.
533 */
534 }
535
536 /*
537 * If, after reading all control frames, there are no frames
538 * left in the queue, we need to return NULL so we can send
539 * a silence frame.
540 */
541 if (!queued_frame) {
542 return NULL;
543 }
544
545 instance->frame_queue_length--;
546
547 return queued_frame;
548}

References ast_channel_name(), AST_CONTROL_OPTION, ast_debug, AST_FRAME_CONTROL, ast_frame_free(), AST_LIST_LOCK, AST_LIST_REMOVE_HEAD, AST_LIST_UNLOCK, ast_websocket_write_string(), websocket_pvt::channel, ast_frame::data, websocket_pvt::frame_queue, websocket_pvt::frame_queue_length, ast_frame::frametype, ast_frame_subclass::integer, NULL, ast_frame::ptr, websocket_pvt::queue_full, QUEUE_LENGTH_XON_LEVEL, websocket_pvt::queue_paused, websocket_pvt::report_queue_drained, SCOPED_LOCK, send_event, ast_frame::subclass, and websocket_pvt::websocket.

Referenced by webchan_read().

◆ global_alloc()

static void * global_alloc ( const char *  name)
static

Definition at line 2001 of file chan_websocket.c.

2002{
2004 sizeof(*cfg), NULL);
2005
2006 if (!cfg) {
2007 return NULL;
2008 }
2009
2010 return cfg;
2011}

References ast_sorcery_generic_alloc(), and NULL.

Referenced by load_config().

◆ global_apply()

static int global_apply ( const struct ast_sorcery sorcery,
void *  obj 
)
static

Definition at line 2013 of file chan_websocket.c.

2014{
2015 struct webchan_conf_global *cfg = obj;
2016
2017 ast_debug(1, "control_msg_format: %s\n",
2019
2020 return 0;
2021}

References ast_debug, webchan_conf_global::control_msg_format, and control_msg_format_to_str().

Referenced by load_config().

◆ global_control_message_format_from_str()

static int global_control_message_format_from_str ( const struct aco_option opt,
struct ast_variable var,
void *  obj 
)
static

Definition at line 1975 of file chan_websocket.c.

1977{
1978 struct webchan_conf_global *cfg = obj;
1979
1981
1983 ast_log(LOG_ERROR, "chan_websocket.conf: Invalid value '%s' for "
1984 "control_mesage_format. Must be 'plain-text' or 'json'\n",
1985 var->value);
1986 return -1;
1987 }
1988
1989 return 0;
1990}

References ast_log, webchan_conf_global::control_msg_format, control_msg_format_from_str(), LOG_ERROR, var, and WEBCHAN_CONTROL_MSG_FORMAT_INVALID.

◆ global_control_message_format_to_str()

static int global_control_message_format_to_str ( const void *  obj,
const intptr_t *  args,
char **  buf 
)
static

Definition at line 1992 of file chan_websocket.c.

1993{
1994 const struct webchan_conf_global *cfg = obj;
1995
1997
1998 return 0;
1999}

References ast_strdup, buf, webchan_conf_global::control_msg_format, and control_msg_format_to_str().

◆ handle_command()

static int handle_command ( struct websocket_pvt instance,
char *  buffer 
)
static

Definition at line 726 of file chan_websocket.c.

727{
728 int res = 0;
729 RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
730 const char *command = NULL;
731 char *data = NULL;
732
734 struct ast_json_error json_error;
735
736 json = ast_json_load_buf(buffer, strlen(buffer), &json_error);
737 if (!json) {
738 send_event(instance, ERROR, "Unable to parse JSON command");
739 return -1;
740 }
741 command = ast_json_object_string_get(json, "command");
742 } else {
743 command = buffer;
744 data = strchr(buffer, ' ');
745 if (data) {
746 *data = '\0';
747 data++;
748 }
749 }
750
751 if (ast_strings_equal(command, ANSWER_CHANNEL)) {
753
754 } else if (ast_strings_equal(command, HANGUP_CHANNEL)) {
756
757 } else if (ast_strings_equal(command, START_MEDIA_BUFFERING)) {
758 if (instance->passthrough) {
759 send_event(instance, ERROR, "%s not supported in passthrough mode", command);
760 ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
761 ast_channel_name(instance->channel), command);
762 return 0;
763 }
764 AST_LIST_LOCK(&instance->frame_queue);
765 instance->bulk_media_in_progress = 1;
766 AST_LIST_UNLOCK(&instance->frame_queue);
767
768 } else if (ast_strings_equal(command, STOP_MEDIA_BUFFERING)) {
769 const char *id;
770 char *option;
771 SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK,
773
775 id = ast_json_object_string_get(json, "correlation_id");
776 } else {
777 id = data;
778 }
779
780 if (instance->passthrough) {
781 send_event(instance, ERROR, "%s not supported in passthrough mode", command);
782 ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
783 ast_channel_name(instance->channel), command);
784 return 0;
785 }
786
787 ast_debug(4, "%s: WebSocket %s '%s' with %d bytes in leftover_data.\n",
789 (int)instance->leftover_len);
790
791 instance->bulk_media_in_progress = 0;
792 if (instance->leftover_len > 0) {
793 res = queue_frame_from_buffer(instance, instance->leftover_data, instance->leftover_len);
794 if (res != 0) {
795 return res;
796 }
797 }
798 instance->leftover_len = 0;
799 option = create_event(instance, MEDIA_BUFFERING_COMPLETED, id);
800 if (!option) {
801 return -1;
802 }
803 res = queue_option_frame(instance, option);
804 ast_free(option);
805
806 } else if (ast_strings_equal(command, MARK_MEDIA)) {
807 const char *id;
808 char *option;
809 SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK,
811
812 if (instance->passthrough) {
813 send_event(instance, ERROR, "%s not supported in passthrough mode", command);
814 ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
815 ast_channel_name(instance->channel), command);
816 return 0;
817 }
818
820 id = ast_json_object_string_get(json, "correlation_id");
821 } else {
822 id = data;
823 }
824
825 ast_debug(4, "%s: %s %s\n",
826 ast_channel_name(instance->channel), MARK_MEDIA, id);
827
828 option = create_event(instance, MEDIA_MARK_PROCESSED, id);
829 if (!option) {
830 return -1;
831 }
832 res = queue_option_frame(instance, option);
833 ast_free(option);
834
835 } else if (ast_strings_equal(command, FLUSH_MEDIA)) {
836 struct ast_frame *frame = NULL;
837
838 if (instance->passthrough) {
839 send_event(instance, ERROR, "FLUSH_MEDIA not supported in passthrough mode");
840 ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
841 ast_channel_name(instance->channel), command);
842 return 0;
843 }
844
845 AST_LIST_LOCK(&instance->frame_queue);
846 while ((frame = AST_LIST_REMOVE_HEAD(&instance->frame_queue, frame_list))) {
847 ast_frfree(frame);
848 }
849 instance->frame_queue_length = 0;
850 instance->bulk_media_in_progress = 0;
851 instance->leftover_len = 0;
852 AST_LIST_UNLOCK(&instance->frame_queue);
853
854 } else if (ast_strings_equal(command, REPORT_QUEUE_DRAINED)) {
855 if (instance->passthrough) {
856 send_event(instance, ERROR, "%s not supported in passthrough mode", command);
857 ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
858 ast_channel_name(instance->channel), command);
859 return 0;
860 }
861
862 AST_LIST_LOCK(&instance->frame_queue);
863 instance->report_queue_drained = 1;
864 AST_LIST_UNLOCK(&instance->frame_queue);
865
866 } else if (ast_strings_equal(command, GET_DRIVER_STATUS)) {
867 return send_event(instance, STATUS);
868
869 } else if (ast_strings_equal(command, PAUSE_MEDIA)) {
870 if (instance->passthrough) {
871 send_event(instance, ERROR, "%s not supported in passthrough mode", command);
872 ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
873 ast_channel_name(instance->channel), command);
874 return 0;
875 }
876 AST_LIST_LOCK(&instance->frame_queue);
877 instance->queue_paused = 1;
878 AST_LIST_UNLOCK(&instance->frame_queue);
879
880 } else if (ast_strings_equal(command, CONTINUE_MEDIA)) {
881 if (instance->passthrough) {
882 send_event(instance, ERROR, "%s not supported in passthrough mode", command);
883 ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
884 ast_channel_name(instance->channel), command);
885 return 0;
886 }
887 AST_LIST_LOCK(&instance->frame_queue);
888 instance->queue_paused = 0;
889 AST_LIST_UNLOCK(&instance->frame_queue);
890
891 } else {
892 ast_log(LOG_WARNING, "%s: WebSocket %s command unknown\n",
893 ast_channel_name(instance->channel), command);
894 }
895
896 return res;
897}

References ANSWER_CHANNEL, ast_channel_name(), AST_CONTROL_ANSWER, AST_CONTROL_HANGUP, ast_debug, ast_free, ast_frfree, ast_json_load_buf(), ast_json_object_string_get, ast_json_unref(), AST_LIST_LOCK, AST_LIST_REMOVE_HEAD, AST_LIST_UNLOCK, ast_log, ast_queue_control(), ast_strings_equal(), websocket_pvt::bulk_media_in_progress, websocket_pvt::channel, CONTINUE_MEDIA, websocket_pvt::control_msg_format, create_event, ast_frame::data, FLUSH_MEDIA, websocket_pvt::frame_queue, websocket_pvt::frame_queue_length, GET_DRIVER_STATUS, HANGUP_CHANNEL, id, websocket_pvt::leftover_data, websocket_pvt::leftover_len, LOG_WARNING, MARK_MEDIA, NULL, websocket_pvt::passthrough, PAUSE_MEDIA, queue_frame_from_buffer(), queue_option_frame(), websocket_pvt::queue_paused, RAII_VAR, websocket_pvt::report_queue_drained, REPORT_QUEUE_DRAINED, SCOPED_LOCK, send_event, START_MEDIA_BUFFERING, STOP_MEDIA_BUFFERING, and WEBCHAN_CONTROL_MSG_FORMAT_JSON.

Referenced by process_text_message().

◆ incoming_ws_established_cb()

static void incoming_ws_established_cb ( struct ast_websocket ast_ws_session,
struct ast_variable get_params,
struct ast_variable upgrade_headers 
)
static

Definition at line 1819 of file chan_websocket.c.

1821{
1822 RAII_VAR(struct ast_websocket *, s, ast_ws_session, ast_websocket_unref);
1823 struct ast_variable *v;
1824 const char *connection_id = NULL;
1825 struct websocket_pvt *instance = NULL;
1826 int nodelay = 1;
1827
1828 ast_debug(3, "WebSocket established\n");
1829
1830 for (v = upgrade_headers; v; v = v->next) {
1831 ast_debug(4, "Header-> %s: %s\n", v->name, v->value);
1832 }
1833 for (v = get_params; v; v = v->next) {
1834 ast_debug(4, " Param-> %s: %s\n", v->name, v->value);
1835 }
1836
1837 connection_id = ast_variable_find_in_list(get_params, "CONNECTION_ID");
1838 if (!connection_id) {
1839 /*
1840 * This can't really happen because websocket_http_callback won't
1841 * let it get this far if it can't add the connection_id to the
1842 * get_params.
1843 * Just in case though...
1844 */
1845 ast_log(LOG_WARNING, "WebSocket connection id not found\n");
1847 ast_websocket_close(ast_ws_session, 1000);
1848 return;
1849 }
1850
1852 if (!instance) {
1853 /*
1854 * This also can't really happen because websocket_http_callback won't
1855 * let it get this far if it can't find the instance.
1856 * Just in case though...
1857 */
1858 ast_log(LOG_WARNING, "%s: WebSocket instance not found\n", connection_id);
1860 ast_websocket_close(ast_ws_session, 1000);
1861 return;
1862 }
1863 instance->websocket = ao2_bump(ast_ws_session);
1864
1865 if (setsockopt(ast_websocket_fd(instance->websocket),
1866 IPPROTO_TCP, TCP_NODELAY, (char *) &nodelay, sizeof(nodelay)) < 0) {
1867 ast_log(LOG_WARNING, "Failed to set TCP_NODELAY on manager connection: %s\n", strerror(errno));
1868 }
1869
1870 /* read_thread_handler cleans up the bump */
1871 read_thread_handler(ao2_bump(instance));
1872
1873 ao2_cleanup(instance);
1874 ast_debug(3, "WebSocket closed\n");
1875}

References ao2_bump, ao2_cleanup, ao2_weakproxy_find, AST_CONTROL_HANGUP, ast_debug, ast_log, ast_queue_control(), ast_variable_find_in_list(), ast_websocket_close(), ast_websocket_fd(), ast_websocket_unref(), websocket_pvt::channel, websocket_pvt::connection_id, errno, instances, LOG_WARNING, ast_variable::name, ast_variable::next, NULL, OBJ_NOLOCK, OBJ_SEARCH_KEY, RAII_VAR, read_thread_handler(), ast_variable::value, and websocket_pvt::websocket.

Referenced by load_module().

◆ incoming_ws_http_callback()

static int incoming_ws_http_callback ( struct ast_tcptls_session_instance ser,
const struct ast_http_uri urih,
const char *  uri,
enum ast_http_method  method,
struct ast_variable get_params,
struct ast_variable headers 
)
static

Definition at line 1887 of file chan_websocket.c.

1891{
1892 struct ast_http_uri fake_urih = {
1894 };
1895 int res = 0;
1896 /*
1897 * Normally the http server will destroy the get_params
1898 * when the session ends but if there weren't any initially
1899 * and we create some and add them to the list, the http server
1900 * won't know about it so we have to destroy it ourselves.
1901 */
1902 int destroy_get_params = (get_params == NULL);
1903 struct ast_variable *v = NULL;
1904 RAII_VAR(struct websocket_pvt *, instance, NULL, ao2_cleanup);
1905
1906 ast_debug(2, "URI: %s Starting\n", uri);
1907
1908 /*
1909 * The client will have issued the GET request with a URI of
1910 * /media/<connection_id>
1911 *
1912 * Since this callback is registered for the /media URI prefix the
1913 * http server will strip that off the front of the URI passing in
1914 * only the path components after that in the 'uri' parameter.
1915 * This should leave only the connection id without a leading '/'.
1916 */
1917 instance = ao2_weakproxy_find(instances, uri, OBJ_SEARCH_KEY | OBJ_NOLOCK, "");
1918 if (!instance) {
1919 ast_log(LOG_WARNING, "%s: WebSocket instance not found\n", uri);
1920 ast_http_error(ser, 404, "Not found", "WebSocket instance not found");
1921 return -1;
1922 }
1923
1924 /*
1925 * We don't allow additional connections using the same connection id.
1926 */
1927 if (instance->websocket) {
1928 ast_log(LOG_WARNING, "%s: Websocket already connected for channel '%s'\n",
1929 uri, instance->channel ? ast_channel_name(instance->channel) : "unknown");
1930 ast_http_error(ser, 409, "Conflict", "Another websocket connection exists for this connection id");
1931 return -1;
1932 }
1933
1934 v = ast_variable_new("CONNECTION_ID", uri, "");
1935 if (!v) {
1936 ast_http_error(ser, 500, "Server error", "");
1937 return -1;
1938 }
1939 ast_variable_list_append(&get_params, v);
1940
1941 for (v = get_params; v; v = v->next) {
1942 ast_debug(4, " Param-> %s: %s\n", v->name, v->value);
1943 }
1944
1945 /*
1946 * This will ultimately call internal_ws_established_cb() so
1947 * this function will block until the websocket is closed and
1948 * internal_ws_established_cb() returns;
1949 */
1950 res = ast_websocket_uri_cb(ser, &fake_urih, uri, method,
1951 get_params, headers);
1952 if (destroy_get_params) {
1953 ast_variables_destroy(get_params);
1954 }
1955
1956 ast_debug(2, "URI: %s DONE\n", uri);
1957
1958 return res;
1959}

References ao2_cleanup, ao2_weakproxy_find, ast_channel_name(), ast_debug, ast_http_error(), ast_log, ast_variable_list_append, ast_variable_new, ast_variables_destroy(), ast_websocket_uri_cb(), ast_ws_server, ast_http_uri::data, instances, LOG_WARNING, method, ast_variable::name, ast_variable::next, NULL, OBJ_NOLOCK, OBJ_SEARCH_KEY, RAII_VAR, and ast_variable::value.

◆ instance_proxy_cb()

static void instance_proxy_cb ( void *  weakproxy,
void *  data 
)
static

Definition at line 1321 of file chan_websocket.c.

1322{
1323 struct instance_proxy *proxy = weakproxy;
1324 ast_debug(3, "%s: WebSocket instance removed from instances\n", proxy->connection_id);
1325 ao2_unlink(instances, weakproxy);
1326}

References ao2_unlink, ast_debug, instance_proxy::connection_id, and instances.

Referenced by websocket_new().

◆ load_config()

static int load_config ( void  )
static

Definition at line 2023 of file chan_websocket.c.

2024{
2025 ast_debug(2, "Initializing Websocket Client Configuration\n");
2027 if (!sorcery) {
2028 ast_log(LOG_ERROR, "Failed to open sorcery\n");
2029 return -1;
2030 }
2031
2032 ast_sorcery_apply_default(sorcery, "global", "config",
2033 "chan_websocket.conf,criteria=type=global,single_object=yes,explicit_name=global");
2034
2036 ast_log(LOG_ERROR, "Failed to register chan_websocket global object with sorcery\n");
2038 sorcery = NULL;
2039 return -1;
2040 }
2041
2042 ast_sorcery_object_field_register_nodoc(sorcery, "global", "type", "", OPT_NOOP_T, 0, 0);
2043 ast_sorcery_register_cust(global, control_message_format, "plain-text");
2044
2046
2047 return 0;
2048}

References ast_debug, ast_log, ast_sorcery_apply_default, ast_sorcery_load(), ast_sorcery_object_field_register_nodoc, ast_sorcery_object_register, ast_sorcery_open, ast_sorcery_register_cust, ast_sorcery_unref, global, global_alloc(), global_apply(), LOG_ERROR, NULL, OPT_NOOP_T, and sorcery.

Referenced by load_module().

◆ load_module()

static int load_module ( void  )
static

Function called when our module is loaded.

Definition at line 2079 of file chan_websocket.c.

2080{
2081 int res = 0;
2082 struct ast_websocket_protocol *protocol;
2083
2084 res = load_config();
2085 if (res != 0) {
2087 }
2088
2091 }
2092
2095 ast_log(LOG_ERROR, "Unable to register channel class 'WebSocket'\n");
2096 unload_module();
2098 }
2099
2101 AO2_CONTAINER_ALLOC_OPT_DUPS_REPLACE, 17, instance_proxy_hash_fn,
2102 instance_proxy_sort_fn, instance_proxy_cmp_fn);
2103 if (!instances) {
2105 "Failed to allocate the chan_websocket instance registry\n");
2106 unload_module();
2108 }
2109
2111 if (!ast_ws_server) {
2112 unload_module();
2114 }
2115
2116 protocol = ast_websocket_sub_protocol_alloc("media");
2117 if (!protocol) {
2118 unload_module();
2120 }
2123
2125
2127}

References AO2_ALLOC_OPT_LOCK_RWLOCK, ao2_container_alloc_hash, AO2_CONTAINER_ALLOC_OPT_DUPS_REPLACE, ast_channel_register(), ast_format_cap_alloc, ast_format_cap_append_by_type(), AST_FORMAT_CAP_FLAG_DEFAULT, ast_http_uri_link(), ast_log, AST_MEDIA_TYPE_UNKNOWN, AST_MODULE_LOAD_DECLINE, AST_MODULE_LOAD_SUCCESS, ast_websocket_server_add_protocol2(), ast_websocket_server_create(), ast_websocket_sub_protocol_alloc(), ast_ws_server, ast_channel_tech::capabilities, http_uri, incoming_ws_established_cb(), instances, load_config(), LOG_ERROR, LOG_WARNING, ast_websocket_protocol::session_established, unload_module(), and websocket_tech.

◆ process_binary_message()

static int process_binary_message ( struct websocket_pvt instance,
char *  payload,
uint64_t  payload_len 
)
static

Definition at line 932 of file chan_websocket.c.

934{
935 char *next_frame_ptr = NULL;
936 size_t bytes_read = 0;
937 int res = 0;
938 size_t bytes_left = 0;
939
940 {
941 SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK,
943 if (instance->frame_queue_length >= QUEUE_LENGTH_MAX) {
944 ast_debug(4, "%s: WebSocket queue is full. Ignoring incoming binary message.\n",
945 ast_channel_name(instance->channel));
946 return 0;
947 }
948 }
949
950 next_frame_ptr = payload;
951 instance->bytes_read += payload_len;
952
953 if (instance->passthrough) {
954 res = queue_frame_from_buffer(instance, payload, payload_len);
955 return res;
956 }
957
958 if (instance->bulk_media_in_progress && instance->leftover_len > 0) {
959 /*
960 * We have leftover data from a previous websocket message.
961 * Try to make a complete frame by appending data from
962 * the current message to the leftover data.
963 */
964 char *append_ptr = instance->leftover_data + instance->leftover_len;
965 size_t bytes_needed_for_frame = instance->optimal_frame_size - instance->leftover_len;
966 /*
967 * It's possible that even the current message doesn't have enough
968 * data to make a complete frame.
969 */
970 size_t bytes_avail_to_copy = MIN(bytes_needed_for_frame, payload_len);
971
972 /*
973 * Append whatever we can to the end of the leftover data
974 * even if it's not enough to make a complete frame.
975 */
976 memcpy(append_ptr, payload, bytes_avail_to_copy);
977
978 /*
979 * If leftover data is still short, just return and wait for the
980 * next websocket message.
981 */
982 if (bytes_avail_to_copy < bytes_needed_for_frame) {
983 ast_debug(4, "%s: Leftover data %d bytes but only %d new bytes available of %d needed. Appending and waiting for next message.\n",
984 ast_channel_name(instance->channel), (int)instance->leftover_len, (int)bytes_avail_to_copy, (int)bytes_needed_for_frame);
985 instance->leftover_len += bytes_avail_to_copy;
986 return 0;
987 }
988
989 res = queue_frame_from_buffer(instance, instance->leftover_data, instance->optimal_frame_size);
990 if (res < 0) {
991 return -1;
992 }
993
994 /*
995 * We stole data from the current payload so decrement payload_len
996 * and set the next frame pointer after the data in payload
997 * we just copied.
998 */
999 payload_len -= bytes_avail_to_copy;
1000 next_frame_ptr = payload + bytes_avail_to_copy;
1001
1002 ast_debug(5, "%s: --- BR: %4d FQ: %4d PL: %4d LOL: %3d P: %p NFP: %p OFF: %4d NPL: %4d BAC: %3d\n",
1003 ast_channel_name(instance->channel),
1004 instance->frame_queue_length,
1005 (int)instance->bytes_read,
1006 (int)(payload_len + bytes_avail_to_copy),
1007 (int)instance->leftover_len,
1008 payload,
1009 next_frame_ptr,
1010 (int)(next_frame_ptr - payload),
1011 (int)payload_len,
1012 (int)bytes_avail_to_copy
1013 );
1014
1015
1016 instance->leftover_len = 0;
1017 }
1018
1019 if (!instance->bulk_media_in_progress && instance->leftover_len > 0) {
1020 instance->leftover_len = 0;
1021 }
1022
1023 bytes_left = payload_len;
1024 while (bytes_read < payload_len && bytes_left >= instance->optimal_frame_size) {
1025 res = queue_frame_from_buffer(instance, next_frame_ptr,
1026 instance->optimal_frame_size);
1027 if (res < 0) {
1028 break;
1029 }
1030 bytes_read += instance->optimal_frame_size;
1031 next_frame_ptr += instance->optimal_frame_size;
1032 bytes_left -= instance->optimal_frame_size;
1033 }
1034
1035 if (instance->bulk_media_in_progress && bytes_left > 0) {
1036 /*
1037 * We have a partial frame. Save the leftover data.
1038 */
1039 ast_debug(5, "%s: +++ BR: %4d FQ: %4d PL: %4d LOL: %3d P: %p NFP: %p OFF: %4d BL: %4d\n",
1040 ast_channel_name(instance->channel),
1041 (int)instance->bytes_read,
1042 instance->frame_queue_length,
1043 (int)payload_len,
1044 (int)instance->leftover_len,
1045 payload,
1046 next_frame_ptr,
1047 (int)(next_frame_ptr - payload),
1048 (int)bytes_left
1049 );
1050 memcpy(instance->leftover_data, next_frame_ptr, bytes_left);
1051 instance->leftover_len = bytes_left;
1052 }
1053
1054 return 0;
1055}

References ast_channel_name(), ast_debug, AST_LIST_LOCK, AST_LIST_UNLOCK, websocket_pvt::bulk_media_in_progress, websocket_pvt::bytes_read, websocket_pvt::channel, websocket_pvt::frame_queue, websocket_pvt::frame_queue_length, websocket_pvt::leftover_data, websocket_pvt::leftover_len, MIN, NULL, websocket_pvt::optimal_frame_size, websocket_pvt::passthrough, queue_frame_from_buffer(), QUEUE_LENGTH_MAX, and SCOPED_LOCK.

Referenced by read_from_ws_and_queue().

◆ process_text_message()

static int process_text_message ( struct websocket_pvt instance,
char *  payload,
uint64_t  payload_len 
)
static

Definition at line 899 of file chan_websocket.c.

901{
902 char *command;
903
904 if (payload_len == 0) {
905 ast_log(LOG_WARNING, "%s: WebSocket TEXT message has 0 length\n",
906 ast_channel_name(instance->channel));
907 return 0;
908 }
909
910 if (payload_len > MAX_TEXT_MESSAGE_LEN) {
911 ast_log(LOG_WARNING, "%s: WebSocket TEXT message of length %d exceeds maximum length of %d\n",
912 ast_channel_name(instance->channel), (int)payload_len, MAX_TEXT_MESSAGE_LEN);
913 return 0;
914 }
915
916 /*
917 * Unfortunately, payload is not NULL terminated even when it's
918 * a TEXT frame so we need to allocate a new buffer, copy
919 * the data into it, and NULL terminate it.
920 */
921 command = ast_alloca(payload_len + 1);
922 memcpy(command, payload, payload_len); /* Safe */
923 command[payload_len] = '\0';
924 command = ast_strip(command);
925
926 ast_debug(4, "%s: Received: %s\n",
927 ast_channel_name(instance->channel), command);
928
929 return handle_command(instance, command);
930}

References ast_alloca, ast_channel_name(), ast_debug, ast_log, ast_strip(), websocket_pvt::channel, handle_command(), LOG_WARNING, and MAX_TEXT_MESSAGE_LEN.

Referenced by read_from_ws_and_queue().

◆ queue_frame_from_buffer()

static int queue_frame_from_buffer ( struct websocket_pvt instance,
char *  buffer,
size_t  len 
)
static

Definition at line 655 of file chan_websocket.c.

657{
658 struct ast_frame fr = { 0, };
659 struct ast_frame *duped_frame = NULL;
660
661 AST_FRAME_SET_BUFFER(&fr, buffer, 0, len);
663 fr.subclass.format = instance->native_format;
664 fr.samples = instance->native_codec->samples_count(&fr);
665
666 duped_frame = ast_frisolate(&fr);
667 if (!duped_frame) {
668 ast_log(LOG_WARNING, "%s: Failed to isolate frame\n",
669 ast_channel_name(instance->channel));
670 return -1;
671 }
672
673 {
674 SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK,
676 AST_LIST_INSERT_TAIL(&instance->frame_queue, duped_frame, frame_list);
677 instance->frame_queue_length++;
678 if (!instance->queue_full && instance->frame_queue_length >= QUEUE_LENGTH_XOFF_LEVEL) {
679 instance->queue_full = 1;
680 send_event(instance, MEDIA_XOFF);
681 }
682 }
683
684 ast_debug(5, "%s: Queued %d byte frame\n", ast_channel_name(instance->channel),
685 duped_frame->datalen);
686
687 return 0;
688}

References ast_channel_name(), ast_debug, AST_FRAME_SET_BUFFER, AST_FRAME_VOICE, ast_frisolate, AST_LIST_INSERT_TAIL, AST_LIST_LOCK, AST_LIST_UNLOCK, ast_log, websocket_pvt::channel, ast_frame::datalen, ast_frame_subclass::format, websocket_pvt::frame_queue, websocket_pvt::frame_queue_length, ast_frame::frametype, len(), LOG_WARNING, websocket_pvt::native_codec, websocket_pvt::native_format, NULL, websocket_pvt::queue_full, QUEUE_LENGTH_XOFF_LEVEL, ast_frame::samples, ast_codec::samples_count, SCOPED_LOCK, send_event, and ast_frame::subclass.

Referenced by handle_command(), and process_binary_message().

◆ queue_option_frame()

static int queue_option_frame ( struct websocket_pvt instance,
char *  buffer 
)
static

Definition at line 690 of file chan_websocket.c.

692{
693 struct ast_frame fr = { 0, };
694 struct ast_frame *duped_frame = NULL;
695
696 AST_FRAME_SET_BUFFER(&fr, buffer, 0, strlen(buffer) + 1);
699
700 duped_frame = ast_frisolate(&fr);
701 if (!duped_frame) {
702 ast_log(LOG_WARNING, "%s: Failed to isolate frame\n",
703 ast_channel_name(instance->channel));
704 return -1;
705 }
706
707 AST_LIST_LOCK(&instance->frame_queue);
708 AST_LIST_INSERT_TAIL(&instance->frame_queue, duped_frame, frame_list);
709 AST_LIST_UNLOCK(&instance->frame_queue);
710
711 ast_debug(4, "%s: Queued '%s' option frame\n",
712 ast_channel_name(instance->channel), buffer);
713
714 return 0;
715}

References ast_channel_name(), AST_CONTROL_OPTION, ast_debug, AST_FRAME_CONTROL, AST_FRAME_SET_BUFFER, ast_frisolate, AST_LIST_INSERT_TAIL, AST_LIST_LOCK, AST_LIST_UNLOCK, ast_log, websocket_pvt::channel, websocket_pvt::frame_queue, ast_frame::frametype, ast_frame_subclass::integer, LOG_WARNING, NULL, and ast_frame::subclass.

Referenced by handle_command().

◆ read_from_ws_and_queue()

static int read_from_ws_and_queue ( struct websocket_pvt instance)
static

Definition at line 1057 of file chan_websocket.c.

1058{
1059 uint64_t payload_len = 0;
1060 char *payload = NULL;
1061 enum ast_websocket_opcode opcode;
1062 int fragmented = 0;
1063 int res = 0;
1064
1065 if (!instance || !instance->websocket) {
1066 ast_log(LOG_WARNING, "%s: WebSocket instance not found\n",
1067 ast_channel_name(instance->channel));
1068 return -1;
1069 }
1070
1071 ast_debug(9, "%s: Waiting for websocket to have data\n", ast_channel_name(instance->channel));
1072 res = ast_wait_for_input(
1073 ast_websocket_fd(instance->websocket), -1);
1074 if (res <= 0) {
1075 ast_log(LOG_WARNING, "%s: WebSocket read failed: %s\n",
1076 ast_channel_name(instance->channel), strerror(errno));
1077 return -1;
1078 }
1079
1080 /*
1081 * We need to lock here to prevent the websocket handle from
1082 * being pulled out from under us if the core sends us a
1083 * hangup request.
1084 */
1085 ao2_lock(instance);
1086 if (!instance->websocket) {
1087 ao2_unlock(instance);
1088 return -1;
1089 }
1090
1091 res = ast_websocket_read(instance->websocket, &payload, &payload_len,
1092 &opcode, &fragmented);
1093 ao2_unlock(instance);
1094 if (res) {
1095 return -1;
1096 }
1097 ast_debug(5, "%s: WebSocket read %d bytes\n", ast_channel_name(instance->channel),
1098 (int)payload_len);
1099
1100 if (opcode == AST_WEBSOCKET_OPCODE_TEXT) {
1101 return process_text_message(instance, payload, payload_len);
1102 }
1103
1104 if (opcode == AST_WEBSOCKET_OPCODE_CLOSE) {
1105 ast_debug(5, "%s: WebSocket closed by remote\n",
1106 ast_channel_name(instance->channel));
1107 return -1;
1108 }
1109
1110 if (opcode != AST_WEBSOCKET_OPCODE_BINARY) {
1111 ast_debug(5, "%s: WebSocket frame type %d not supported. Ignoring.\n",
1112 ast_channel_name(instance->channel), (int)opcode);
1113 return 0;
1114 }
1115
1116 return process_binary_message(instance, payload, payload_len);
1117}

References ao2_lock, ao2_unlock, ast_channel_name(), ast_debug, ast_log, ast_wait_for_input(), ast_websocket_fd(), AST_WEBSOCKET_OPCODE_BINARY, AST_WEBSOCKET_OPCODE_CLOSE, AST_WEBSOCKET_OPCODE_TEXT, ast_websocket_read(), websocket_pvt::channel, errno, LOG_WARNING, NULL, process_binary_message(), process_text_message(), and websocket_pvt::websocket.

Referenced by read_thread_handler().

◆ read_thread_handler()

static void * read_thread_handler ( void *  obj)
static

Definition at line 1129 of file chan_websocket.c.

1130{
1131 RAII_VAR(struct websocket_pvt *, instance, obj, ao2_cleanup);
1132 int res = 0;
1133
1134 ast_debug(3, "%s: Read thread started\n", ast_channel_name(instance->channel));
1135
1136 res = send_event(instance, MEDIA_START);
1137 if (res != 0 ) {
1138 ast_queue_control(instance->channel, AST_CONTROL_HANGUP);
1139 return NULL;
1140 }
1141
1142 if (!instance->no_auto_answer) {
1143 ast_debug(3, "%s: ANSWER by auto_answer\n", ast_channel_name(instance->channel));
1144 ast_queue_control(instance->channel, AST_CONTROL_ANSWER);
1145 }
1146
1147 while (read_from_ws_and_queue(instance) == 0)
1148 {
1149 }
1150
1151 /*
1152 * websocket_hangup will take care of closing the websocket if needed.
1153 */
1154 ast_debug(3, "%s: HANGUP by websocket close/error\n", ast_channel_name(instance->channel));
1155 ast_queue_control(instance->channel, AST_CONTROL_HANGUP);
1156
1157 return NULL;
1158}

References ao2_cleanup, ast_channel_name(), AST_CONTROL_ANSWER, AST_CONTROL_HANGUP, ast_debug, ast_queue_control(), NULL, RAII_VAR, read_from_ws_and_queue(), and send_event.

Referenced by incoming_ws_established_cb(), and webchan_call().

◆ reload_module()

static int reload_module ( void  )
static

Definition at line 2070 of file chan_websocket.c.

2071{
2072 ast_debug(2, "Reloading chan_websocket configuration\n");
2074
2075 return 0;
2076}

References ast_debug, ast_sorcery_reload(), and sorcery.

◆ set_channel_format()

static void set_channel_format ( struct websocket_pvt instance,
struct ast_format fmt 
)
static

◆ set_channel_timer()

static int set_channel_timer ( struct websocket_pvt instance)
static

Definition at line 1486 of file chan_websocket.c.

1487{
1488 int rate = 0;
1489 instance->timer = ast_timer_open();
1490 if (!instance->timer) {
1491 return -1;
1492 }
1493 /* Rate is the number of ticks per second, not the interval. */
1494 rate = 1000 / ast_format_get_default_ms(instance->native_format);
1495 ast_debug(3, "%s: WebSocket timer rate %d\n",
1496 ast_channel_name(instance->channel), rate);
1497 ast_timer_set_rate(instance->timer, rate);
1498 /*
1499 * Calling ast_channel_set_fd will cause the channel thread to call
1500 * webchan_read at 'rate' times per second.
1501 */
1502 ast_channel_set_fd(instance->channel, 0, ast_timer_fd(instance->timer));
1503
1504 return 0;
1505}

References ast_channel_name(), ast_channel_set_fd(), ast_debug, ast_format_get_default_ms(), ast_timer_fd(), ast_timer_open(), ast_timer_set_rate(), websocket_pvt::channel, websocket_pvt::native_format, and websocket_pvt::timer.

Referenced by webchan_request().

◆ set_channel_variables()

static int set_channel_variables ( struct websocket_pvt instance)
static

Definition at line 1507 of file chan_websocket.c.

1508{
1509 char *pkt_size = NULL;
1510 int res = ast_asprintf(&pkt_size, "%d", instance->optimal_frame_size);
1511 if (res <= 0) {
1512 return -1;
1513 }
1514
1516 pkt_size);
1517 ast_free(pkt_size);
1519 instance->connection_id);
1520
1521 return 0;
1522}

References ast_asprintf, ast_free, websocket_pvt::channel, websocket_pvt::connection_id, MEDIA_WEBSOCKET_CONNECTION_ID, MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE, NULL, websocket_pvt::optimal_frame_size, and pbx_builtin_setvar_helper().

Referenced by webchan_request().

◆ set_instance_silence_frame()

static int set_instance_silence_frame ( struct websocket_pvt instance)
static

Definition at line 1463 of file chan_websocket.c.

1464{
1465 instance->silence.frametype = AST_FRAME_VOICE;
1466 instance->silence.datalen =
1467 (instance->slin_codec->default_ms * instance->slin_codec->minimum_bytes) / instance->slin_codec->minimum_ms;
1468 instance->silence.samples = instance->silence.datalen / sizeof(uint16_t);
1469 /*
1470 * Even though we'll calloc the data pointer, we don't mark it as
1471 * mallocd because this frame will be around for a while and we don't
1472 * want it accidentally freed before we're done with it.
1473 */
1474 instance->silence.mallocd = 0;
1475 instance->silence.offset = 0;
1476 instance->silence.src = __PRETTY_FUNCTION__;
1477 instance->silence.subclass.format = instance->slin_format;
1478 instance->silence.data.ptr = ast_calloc(1, instance->silence.datalen);
1479 if (!instance->silence.data.ptr) {
1480 return -1;
1481 }
1482
1483 return 0;
1484}

References ast_calloc, AST_FRAME_VOICE, ast_frame::data, ast_frame::datalen, ast_codec::default_ms, ast_frame_subclass::format, ast_frame::frametype, ast_frame::mallocd, ast_codec::minimum_bytes, ast_codec::minimum_ms, ast_frame::offset, ast_frame::ptr, ast_frame::samples, websocket_pvt::silence, websocket_pvt::slin_codec, websocket_pvt::slin_format, ast_frame::src, and ast_frame::subclass.

Referenced by webchan_request().

◆ set_instance_translator()

static int set_instance_translator ( struct websocket_pvt instance)
static

Definition at line 1432 of file chan_websocket.c.

1433{
1435 instance->slin_format = ao2_bump(instance->native_format);
1436 instance->slin_codec = ast_format_get_codec(instance->slin_format);
1437 return 0;
1438 }
1439
1441 if (!instance->slin_format) {
1442 ast_log(LOG_ERROR, "%s: Unable to get slin format for rate %d\n",
1443 ast_channel_name(instance->channel), instance->native_codec->sample_rate);
1444 return -1;
1445 }
1446 ast_debug(3, "%s: WebSocket channel slin format '%s' Sample rate: %d ptime: %dms\n",
1450
1451 instance->translator = ast_translator_build_path(instance->slin_format, instance->native_format);
1452 if (!instance->translator) {
1453 ast_log(LOG_ERROR, "%s: Unable to build translator path from '%s' to '%s'\n",
1455 ast_format_get_name(instance->slin_format));
1456 return -1;
1457 }
1458
1459 instance->slin_codec = ast_format_get_codec(instance->slin_format);
1460 return 0;
1461}

References ao2_bump, ast_channel_name(), ast_debug, ast_format_cache_get_slin_by_rate(), ast_format_cache_is_slinear(), ast_format_get_codec(), ast_format_get_default_ms(), ast_format_get_name(), ast_format_get_sample_rate(), ast_log, ast_translator_build_path(), websocket_pvt::channel, LOG_ERROR, websocket_pvt::native_codec, websocket_pvt::native_format, ast_codec::sample_rate, websocket_pvt::slin_codec, websocket_pvt::slin_format, and websocket_pvt::translator.

Referenced by webchan_request().

◆ unload_module()

static int unload_module ( void  )
static

◆ validate_uri_parameters()

static int validate_uri_parameters ( const char *  uri_params)
static

Definition at line 1524 of file chan_websocket.c.

1525{
1526 char *params = ast_strdupa(uri_params);
1527 char *nvp = NULL;
1528 char *nv = NULL;
1529
1530 /*
1531 * uri_params should be a comma-separated list of key=value pairs.
1532 * For example:
1533 * name1=value1,name2=value2
1534 * We're verifying that each name and value either doesn't need
1535 * to be encoded or that it already is.
1536 */
1537
1538 while((nvp = ast_strsep(&params, ',', 0))) {
1539 /* nvp will be name1=value1 */
1540 while((nv = ast_strsep(&nvp, '=', 0))) {
1541 /* nv will be either name1 or value1 */
1542 if (!ast_uri_verify_encoded(nv)) {
1543 return 0;
1544 }
1545 }
1546 }
1547
1548 return 1;
1549}

References ast_strdupa, ast_strsep(), ast_uri_verify_encoded(), NULL, and websocket_pvt::uri_params.

Referenced by webchan_request().

◆ webchan_call()

static int webchan_call ( struct ast_channel ast,
const char *  dest,
int  timeout 
)
static

Definition at line 1197 of file chan_websocket.c.

1199{
1200 struct websocket_pvt *instance = ast_channel_tech_pvt(ast);
1201 int nodelay = 1;
1203
1204 if (!instance) {
1205 ast_log(LOG_WARNING, "%s: WebSocket instance not found\n",
1206 ast_channel_name(ast));
1207 return -1;
1208 }
1209
1210 if (instance->type == AST_WS_TYPE_SERVER) {
1211 ast_debug(3, "%s: Websocket call incoming\n", ast_channel_name(instance->channel));
1212 return 0;
1213 }
1214 ast_debug(3, "%s: Websocket call outgoing\n", ast_channel_name(instance->channel));
1215
1216 if (!instance->client) {
1217 ast_log(LOG_WARNING, "%s: WebSocket client not found\n",
1218 ast_channel_name(ast));
1219 return -1;
1220 }
1221
1222 ast_debug(3, "%s: WebSocket call requested to %s. cid: %s\n",
1223 ast_channel_name(ast), dest, instance->connection_id);
1224
1225 if (!ast_strlen_zero(instance->uri_params)) {
1227 }
1228
1229 instance->websocket = ast_websocket_client_connect(instance->client,
1230 instance, ast_channel_name(ast), &result);
1231 if (!instance->websocket || result != WS_OK) {
1232 ast_log(LOG_WARNING, "%s: WebSocket connection failed to %s: %s\n",
1234 return -1;
1235 }
1236
1237 if (setsockopt(ast_websocket_fd(instance->websocket),
1238 IPPROTO_TCP, TCP_NODELAY, (char *) &nodelay, sizeof(nodelay)) < 0) {
1239 ast_log(LOG_WARNING, "Failed to set TCP_NODELAY on websocket connection: %s\n", strerror(errno));
1240 }
1241
1242 ast_debug(3, "%s: WebSocket connection to %s established\n",
1243 ast_channel_name(ast), dest);
1244
1245 /* read_thread_handler() will clean up the bump */
1247 read_thread_handler, ao2_bump(instance))) {
1248 ast_log(LOG_WARNING, "%s: Failed to create thread.\n", ast_channel_name(ast));
1249 ao2_cleanup(instance);
1250 return -1;
1251 }
1252
1253 return 0;
1254}

References ao2_bump, ao2_cleanup, ast_channel_name(), ast_channel_tech_pvt(), ast_debug, ast_log, ast_pthread_create_detached_background, ast_strlen_zero(), ast_websocket_client_add_uri_params(), ast_websocket_client_connect(), ast_websocket_fd(), ast_websocket_result_to_str(), AST_WS_TYPE_SERVER, websocket_pvt::channel, websocket_pvt::client, websocket_pvt::connection_id, errno, LOG_WARNING, NULL, websocket_pvt::outbound_read_thread, read_thread_handler(), result, websocket_pvt::type, websocket_pvt::uri_params, websocket_pvt::websocket, and WS_OK.

◆ webchan_hangup()

static int webchan_hangup ( struct ast_channel ast)
static

Definition at line 1768 of file chan_websocket.c.

1769{
1770 struct websocket_pvt *instance = ast_channel_tech_pvt(ast);
1771
1772 if (!instance) {
1773 return -1;
1774 }
1775 ast_debug(3, "%s: WebSocket call hangup. cid: %s\n",
1776 ast_channel_name(ast), instance->connection_id);
1777
1778 /*
1779 * We need to lock because read_from_ws_and_queue() is probably waiting
1780 * on the websocket file descriptor and will unblock and immediately try to
1781 * check the websocket and read from it. We don't want to pull the
1782 * websocket out from under it between the check and read.
1783 */
1784 ao2_lock(instance);
1785 if (instance->websocket) {
1786 ast_websocket_close(instance->websocket, 1000);
1787 ast_websocket_unref(instance->websocket);
1788 instance->websocket = NULL;
1789 }
1791 ao2_unlock(instance);
1792
1793 /* Clean up the reference from adding the instance to the channel */
1794 ao2_cleanup(instance);
1795
1796 return 0;
1797}

References ao2_cleanup, ao2_lock, ao2_unlock, ast_channel_name(), ast_channel_tech_pvt(), ast_channel_tech_pvt_set(), ast_debug, ast_websocket_close(), ast_websocket_unref(), websocket_pvt::connection_id, NULL, and websocket_pvt::websocket.

◆ webchan_read()

static struct ast_frame * webchan_read ( struct ast_channel ast)
static

Definition at line 555 of file chan_websocket.c.

556{
557 struct websocket_pvt *instance = NULL;
558 struct ast_frame *native_frame = NULL;
559 struct ast_frame *slin_frame = NULL;
560
561 instance = ast_channel_tech_pvt(ast);
562 if (!instance) {
563 return NULL;
564 }
565
567 ast_timer_ack(instance->timer, 1);
568 }
569
570 native_frame = dequeue_frame(instance);
571
572 /*
573 * No frame when the timer fires means we have to create and
574 * return a silence frame in its place.
575 */
576 if (!native_frame) {
577 ast_debug(5, "%s: WebSocket read timer fired with no frame available. Returning silence.\n", ast_channel_name(ast));
578 set_channel_format(instance, instance->slin_format);
579 slin_frame = ast_frdup(&instance->silence);
580 return slin_frame;
581 }
582
583 /*
584 * If we're in passthrough mode or the frame length is already optimal_frame_size,
585 * we can just return it.
586 */
587 if (instance->passthrough || native_frame->datalen == instance->optimal_frame_size) {
588 set_channel_format(instance, instance->native_format);
589 return native_frame;
590 }
591
592 /*
593 * If we're here, we have a short frame that we need to pad
594 * with silence.
595 */
596
597 if (instance->translator) {
598 slin_frame = ast_translate(instance->translator, native_frame, 0);
599 if (!slin_frame) {
600 ast_log(LOG_WARNING, "%s: Failed to translate %d byte frame\n",
601 ast_channel_name(ast), native_frame->datalen);
602 return NULL;
603 }
604 ast_frame_free(native_frame, 0);
605 } else {
606 /*
607 * If there was no translator then the native format
608 * was already slin.
609 */
610 slin_frame = native_frame;
611 }
612
613 set_channel_format(instance, instance->slin_format);
614
615 /*
616 * So now we have an slin frame but it's probably still short
617 * so we create a new data buffer with the correct length
618 * which is filled with zeros courtesy of ast_calloc.
619 * We then copy the short frame data into the new buffer
620 * and set the offset to AST_FRIENDLY_OFFSET so that
621 * the core can read the data without any issues.
622 * If the original frame data was mallocd, we need to free the old
623 * data buffer so we don't leak memory and we need to set
624 * mallocd to AST_MALLOCD_DATA so that the core knows
625 * it needs to free the new data buffer when it's done.
626 */
627
628 if (slin_frame->datalen != instance->silence.datalen) {
629 char *old_data = slin_frame->data.ptr;
630 int old_len = slin_frame->datalen;
631 int old_offset = slin_frame->offset;
632 ast_debug(4, "%s: WebSocket read short frame. Expected %d got %d. Filling with silence\n",
633 ast_channel_name(ast), instance->silence.datalen,
634 slin_frame->datalen);
635
636 slin_frame->data.ptr = ast_calloc(1, instance->silence.datalen + AST_FRIENDLY_OFFSET);
637 if (!slin_frame->data.ptr) {
638 ast_frame_free(slin_frame, 0);
639 return NULL;
640 }
641 slin_frame->data.ptr += AST_FRIENDLY_OFFSET;
642 slin_frame->offset = AST_FRIENDLY_OFFSET;
643 memcpy(slin_frame->data.ptr, old_data, old_len);
644 if (slin_frame->mallocd & AST_MALLOCD_DATA) {
645 ast_free(old_data - old_offset);
646 }
647 slin_frame->mallocd |= AST_MALLOCD_DATA;
648 slin_frame->datalen = instance->silence.datalen;
649 slin_frame->samples = instance->silence.samples;
650 }
651
652 return slin_frame;
653}

References ast_calloc, ast_channel_name(), ast_channel_tech_pvt(), ast_debug, ast_frame_free(), ast_frdup, ast_free, AST_FRIENDLY_OFFSET, ast_log, AST_MALLOCD_DATA, ast_timer_ack(), ast_timer_get_event(), AST_TIMING_EVENT_EXPIRED, ast_translate(), ast_frame::data, ast_frame::datalen, dequeue_frame(), LOG_WARNING, ast_frame::mallocd, websocket_pvt::native_format, NULL, ast_frame::offset, websocket_pvt::optimal_frame_size, websocket_pvt::passthrough, ast_frame::ptr, ast_frame::samples, set_channel_format(), websocket_pvt::silence, websocket_pvt::slin_format, websocket_pvt::timer, and websocket_pvt::translator.

◆ webchan_request()

static struct ast_channel * webchan_request ( const char *  type,
struct ast_format_cap cap,
const struct ast_assigned_ids assignedids,
const struct ast_channel requestor,
const char *  data,
int *  cause 
)
static

Definition at line 1576 of file chan_websocket.c.

1579{
1580 char *parse;
1581 RAII_VAR(struct websocket_pvt *, instance, NULL, ao2_cleanup);
1582 struct ast_channel *chan = NULL;
1583 struct ast_format *fmt = NULL;
1584 struct ast_format_cap *caps = NULL;
1586 AST_APP_ARG(connection_id);
1588 );
1589 struct ast_flags opts = { 0, };
1590 char *opt_args[OPT_ARG_ARRAY_SIZE];
1591 const char *requestor_name = requestor ? ast_channel_name(requestor) :
1592 (assignedids && !ast_strlen_zero(assignedids->uniqueid) ? assignedids->uniqueid : "<unknown>");
1593 RAII_VAR(struct webchan_conf_global *, global_cfg, NULL, ao2_cleanup);
1594
1595 global_cfg = ast_sorcery_retrieve_by_id(sorcery, "global", "global");
1596
1597 ast_debug(3, "%s: WebSocket channel requested\n",
1598 requestor_name);
1599
1600 if (ast_strlen_zero(data)) {
1601 ast_log(LOG_ERROR, "%s: A connection id is required for the 'WebSocket' channel\n",
1602 requestor_name);
1603 goto failure;
1604 }
1605 parse = ast_strdupa(data);
1606 AST_NONSTANDARD_APP_ARGS(args, parse, '/');
1607
1608 if (ast_strlen_zero(args.connection_id)) {
1609 ast_log(LOG_ERROR, "%s: connection_id is required for the 'WebSocket' channel\n",
1610 requestor_name);
1611 goto failure;
1612 }
1613
1614 if (!ast_strlen_zero(args.options)
1615 && ast_app_parse_options(websocket_options, &opts, opt_args,
1616 ast_strdupa(args.options))) {
1617 ast_log(LOG_ERROR, "%s: 'WebSocket' channel options '%s' parse error\n",
1618 requestor_name, args.options);
1619 goto failure;
1620 }
1621
1622 if (ast_test_flag(&opts, OPT_WS_CODEC)
1623 && !ast_strlen_zero(opt_args[OPT_ARG_WS_CODEC])) {
1624 fmt = ast_format_cache_get(opt_args[OPT_ARG_WS_CODEC]);
1625 } else {
1626 /*
1627 * If codec wasn't specified in the dial string,
1628 * use the first format in the capabilities.
1629 */
1630 fmt = ast_format_cap_get_format(cap, 0);
1631 }
1632
1633 if (!fmt) {
1634 ast_log(LOG_WARNING, "%s: No codec found for sending media to connection '%s'\n",
1635 requestor_name, args.connection_id);
1636 goto failure;
1637 }
1638
1639 ast_debug(3, "%s: Using format %s from %s\n",
1640 requestor_name, ast_format_get_name(fmt),
1641 ast_test_flag(&opts, OPT_WS_CODEC) ? "dialstring" : "requester");
1642
1643 instance = websocket_new(requestor_name, args.connection_id, fmt);
1644 if (!instance) {
1645 ast_log(LOG_ERROR, "%s: Failed to allocate WebSocket channel pvt\n",
1646 requestor_name);
1647 goto failure;
1648 }
1649
1650 instance->no_auto_answer = ast_test_flag(&opts, OPT_WS_NO_AUTO_ANSWER);
1651 if (!instance->passthrough) {
1652 instance->passthrough = ast_test_flag(&opts, OPT_WS_PASSTHROUGH);
1653 }
1654
1656 && !ast_strlen_zero(opt_args[OPT_ARG_WS_URI_PARAM])) {
1657 char *comma;
1658
1659 if (ast_strings_equal(args.connection_id, INCOMING_CONNECTION_ID)) {
1661 "%s: URI parameters are not allowed for 'WebSocket/INCOMING' channels\n",
1662 requestor_name);
1663 goto failure;
1664 }
1665
1666 ast_debug(3, "%s: Using URI parameters '%s'\n",
1667 requestor_name, opt_args[OPT_ARG_WS_URI_PARAM]);
1668
1670 ast_log(LOG_ERROR, "%s: Invalid URI parameters '%s' in WebSocket/%s dial string\n",
1671 requestor_name, opt_args[OPT_ARG_WS_URI_PARAM],
1672 args.connection_id);
1673 goto failure;
1674 }
1675
1676 instance->uri_params = ast_strdup(opt_args[OPT_ARG_WS_URI_PARAM]);
1677 comma = instance->uri_params;
1678 /*
1679 * The normal separator for query string components is an
1680 * ampersand ('&') but the Dial app interprets them as additional
1681 * channels to dial in parallel so we instruct users to separate
1682 * the parameters with commas (',') instead. We now have to
1683 * convert those commas back to ampersands.
1684 */
1685 while ((comma = strchr(comma,','))) {
1686 *comma = '&';
1687 }
1688 ast_debug(3, "%s: Using final URI '%s'\n", requestor_name, instance->uri_params);
1689 }
1690
1691 if (ast_test_flag(&opts, OPT_WS_MSG_FORMAT)) {
1692 instance->control_msg_format = control_msg_format_from_str(opt_args[OPT_ARG_WS_MSG_FORMAT]);
1693
1694 if (instance->control_msg_format == WEBCHAN_CONTROL_MSG_FORMAT_INVALID) {
1695 ast_log(LOG_WARNING, "%s: 'f/control message format' dialstring parameter value missing or invalid. "
1696 "Defaulting to 'plain-text'\n",
1697 ast_channel_name(requestor));
1698 instance->control_msg_format = WEBCHAN_CONTROL_MSG_FORMAT_PLAIN;
1699 }
1700 } else if (global_cfg) {
1701 instance->control_msg_format = global_cfg->control_msg_format;
1702 }
1703
1704 chan = ast_channel_alloc(1, AST_STATE_DOWN, "", "", "", "", "", assignedids,
1705 requestor, 0, "WebSocket/%s/%p", args.connection_id, instance);
1706 if (!chan) {
1707 ast_log(LOG_ERROR, "%s: Unable to alloc channel\n", ast_channel_name(requestor));
1708 goto failure;
1709 }
1710
1711 ast_debug(3, "%s: WebSocket channel %s allocated for connection %s\n",
1712 ast_channel_name(chan), requestor_name,
1713 instance->connection_id);
1714
1715 instance->channel = ao2_bump(chan);
1716 ast_channel_tech_set(instance->channel, &websocket_tech);
1717
1718 if (set_instance_translator(instance) != 0) {
1719 goto failure;
1720 }
1721
1722 if (set_instance_silence_frame(instance) != 0) {
1723 goto failure;
1724 }
1725
1726 if (set_channel_timer(instance) != 0) {
1727 goto failure;
1728 }
1729
1730 if (set_channel_variables(instance) != 0) {
1731 goto failure;
1732 }
1733
1735 if (!caps) {
1736 ast_log(LOG_ERROR, "%s: Unable to alloc caps\n", requestor_name);
1737 goto failure;
1738 }
1739
1740 ast_format_cap_append(caps, instance->native_format, 0);
1741 ast_channel_nativeformats_set(instance->channel, caps);
1742 ast_channel_set_writeformat(instance->channel, instance->native_format);
1743 ast_channel_set_rawwriteformat(instance->channel, instance->native_format);
1744 ast_channel_set_readformat(instance->channel, instance->native_format);
1745 ast_channel_set_rawreadformat(instance->channel, instance->native_format);
1746 ast_channel_tech_pvt_set(chan, ao2_bump(instance));
1747 ast_channel_unlock(chan);
1748 ao2_cleanup(caps);
1749
1750 ast_debug(3, "%s: WebSocket channel created to %s\n",
1751 ast_channel_name(chan), args.connection_id);
1752
1753 return chan;
1754
1755failure:
1756 if (chan) {
1757 ast_channel_unlock(chan);
1758 }
1759 *cause = AST_CAUSE_FAILURE;
1760 return NULL;
1761}

References ao2_bump, ao2_cleanup, args, AST_APP_ARG, ast_app_parse_options(), AST_CAUSE_FAILURE, ast_channel_alloc, ast_channel_name(), ast_channel_nativeformats_set(), ast_channel_set_rawreadformat(), ast_channel_set_rawwriteformat(), ast_channel_set_readformat(), ast_channel_set_writeformat(), ast_channel_tech_pvt_set(), ast_channel_tech_set(), ast_channel_unlock, ast_debug, AST_DECLARE_APP_ARGS, ast_format_cache_get, ast_format_cap_alloc, ast_format_cap_append, AST_FORMAT_CAP_FLAG_DEFAULT, ast_format_cap_get_format(), ast_format_get_name(), ast_log, AST_NONSTANDARD_APP_ARGS, ast_sorcery_retrieve_by_id(), AST_STATE_DOWN, ast_strdup, ast_strdupa, ast_strings_equal(), ast_strlen_zero(), ast_test_flag, control_msg_format_from_str(), INCOMING_CONNECTION_ID, LOG_ERROR, LOG_WARNING, NULL, OPT_ARG_ARRAY_SIZE, OPT_ARG_WS_CODEC, OPT_ARG_WS_MSG_FORMAT, OPT_ARG_WS_URI_PARAM, OPT_WS_CODEC, OPT_WS_MSG_FORMAT, OPT_WS_NO_AUTO_ANSWER, OPT_WS_PASSTHROUGH, OPT_WS_URI_PARAM, options, RAII_VAR, set_channel_timer(), set_channel_variables(), set_instance_silence_frame(), set_instance_translator(), sorcery, ast_assigned_ids::uniqueid, validate_uri_parameters(), WEBCHAN_CONTROL_MSG_FORMAT_INVALID, WEBCHAN_CONTROL_MSG_FORMAT_PLAIN, websocket_new(), websocket_options, and websocket_tech.

◆ webchan_send_dtmf_text()

static int webchan_send_dtmf_text ( struct ast_channel ast,
char  digit,
unsigned int  duration 
)
static

Definition at line 1799 of file chan_websocket.c.

1800{
1801 struct websocket_pvt *instance = ast_channel_tech_pvt(ast);
1802
1803 if (!instance) {
1804 return -1;
1805 }
1806
1807 return send_event(instance, DTMF_END, digit);
1808}

References ast_channel_tech_pvt(), digit, and send_event.

◆ webchan_write()

static int webchan_write ( struct ast_channel ast,
struct ast_frame f 
)
static

Function called when we should write a frame to the channel.

Definition at line 1161 of file chan_websocket.c.

1162{
1163 struct websocket_pvt *instance = ast_channel_tech_pvt(ast);
1164
1165 if (!instance || !instance->websocket) {
1166 ast_log(LOG_WARNING, "%s: WebSocket instance or client not found\n",
1167 ast_channel_name(ast));
1168 return -1;
1169 }
1170
1171 if (f->frametype == AST_FRAME_CNG) {
1172 return 0;
1173 }
1174
1175 if (f->frametype != AST_FRAME_VOICE) {
1176 ast_log(LOG_WARNING, "%s: This WebSocket channel only supports AST_FRAME_VOICE frames\n",
1177 ast_channel_name(ast));
1178 return 0;
1179 }
1180
1182 ast_log(LOG_WARNING, "%s: This WebSocket channel only supports the '%s' format, not '%s'\n",
1185 return -1;
1186 }
1187
1189 (char *)f->data.ptr, (uint64_t)f->datalen);
1190}

References ast_channel_name(), ast_channel_tech_pvt(), ast_format_cmp(), AST_FORMAT_CMP_NOT_EQUAL, ast_format_get_name(), AST_FRAME_CNG, AST_FRAME_VOICE, ast_log, AST_WEBSOCKET_OPCODE_BINARY, ast_websocket_write(), ast_frame::data, ast_frame::datalen, ast_frame_subclass::format, ast_frame::frametype, LOG_WARNING, websocket_pvt::native_format, ast_frame::ptr, ast_frame::subclass, and websocket_pvt::websocket.

◆ websocket_destructor()

static void websocket_destructor ( void *  data)
static

Definition at line 1256 of file chan_websocket.c.

1257{
1258 struct websocket_pvt *instance = data;
1259 struct ast_frame *frame = NULL;
1260 ast_debug(3, "%s: WebSocket instance freed\n", instance->connection_id);
1261
1262 AST_LIST_LOCK(&instance->frame_queue);
1263 while ((frame = AST_LIST_REMOVE_HEAD(&instance->frame_queue, frame_list))) {
1264 ast_frfree(frame);
1265 }
1266 AST_LIST_UNLOCK(&instance->frame_queue);
1267
1268 if (instance->timer) {
1269 ast_timer_close(instance->timer);
1270 instance->timer = NULL;
1271 }
1272
1273 if (instance->channel) {
1274 ast_channel_unref(instance->channel);
1275 instance->channel = NULL;
1276 }
1277 if (instance->websocket) {
1278 ast_websocket_unref(instance->websocket);
1279 instance->websocket = NULL;
1280 }
1281
1282 ao2_cleanup(instance->client);
1283 instance->client = NULL;
1284
1285 ao2_cleanup(instance->native_codec);
1286 instance->native_codec = NULL;
1287
1288 ao2_cleanup(instance->native_format);
1289 instance->native_format = NULL;
1290
1291 ao2_cleanup(instance->slin_codec);
1292 instance->slin_codec = NULL;
1293
1294 ao2_cleanup(instance->slin_format);
1295 instance->slin_format = NULL;
1296
1297 if (instance->silence.data.ptr) {
1298 ast_free(instance->silence.data.ptr);
1299 instance->silence.data.ptr = NULL;
1300 }
1301
1302 if (instance->translator) {
1304 instance->translator = NULL;
1305 }
1306
1307 if (instance->leftover_data) {
1308 ast_free(instance->leftover_data);
1309 instance->leftover_data = NULL;
1310 }
1311
1312 ast_free(instance->uri_params);
1313}

References ao2_cleanup, ast_channel_unref, ast_debug, ast_free, ast_frfree, AST_LIST_LOCK, AST_LIST_REMOVE_HEAD, AST_LIST_UNLOCK, ast_timer_close(), ast_translator_free_path(), ast_websocket_unref(), websocket_pvt::channel, websocket_pvt::client, websocket_pvt::connection_id, ast_frame::data, websocket_pvt::frame_queue, websocket_pvt::leftover_data, websocket_pvt::native_codec, websocket_pvt::native_format, NULL, ast_frame::ptr, websocket_pvt::silence, websocket_pvt::slin_codec, websocket_pvt::slin_format, websocket_pvt::timer, websocket_pvt::translator, websocket_pvt::uri_params, and websocket_pvt::websocket.

Referenced by websocket_new().

◆ websocket_new()

static struct websocket_pvt * websocket_new ( const char *  chan_name,
const char *  connection_id,
struct ast_format fmt 
)
static

Definition at line 1328 of file chan_websocket.c.

1330{
1331 RAII_VAR(struct instance_proxy *, proxy, NULL, ao2_cleanup);
1332 RAII_VAR(struct websocket_pvt *, instance, NULL, ao2_cleanup);
1333 char uuid[AST_UUID_STR_LEN];
1334 enum ast_websocket_type ws_type;
1335
1336 SCOPED_AO2WRLOCK(locker, instances);
1337
1338 if (ast_strings_equal(connection_id, INCOMING_CONNECTION_ID)) {
1339 connection_id = ast_uuid_generate_str(uuid, sizeof(uuid));
1340 ws_type = AST_WS_TYPE_SERVER;
1341 } else {
1342 ws_type = AST_WS_TYPE_CLIENT;
1343 }
1344
1345 proxy = ao2_weakproxy_alloc(sizeof(*proxy) + strlen(connection_id) + 1, NULL);
1346 if (!proxy) {
1347 return NULL;
1348 }
1349 strcpy(proxy->connection_id, connection_id); /* Safe */
1350
1351 instance = ao2_alloc(sizeof(*instance) + strlen(connection_id) + 1,
1353 if (!instance) {
1354 return NULL;
1355 }
1356 strcpy(instance->connection_id, connection_id); /* Safe */
1357
1358 instance->type = ws_type;
1359 if (ws_type == AST_WS_TYPE_CLIENT) {
1360 instance->client = ast_websocket_client_retrieve_by_id(instance->connection_id);
1361 if (!instance->client) {
1362 ast_log(LOG_ERROR, "%s: WebSocket client connection '%s' not found\n",
1363 chan_name, instance->connection_id);
1364 return NULL;
1365 }
1366 }
1367
1368 AST_LIST_HEAD_INIT(&instance->frame_queue);
1369
1370 /*
1371 * We need the codec to calculate the number of samples in a frame
1372 * so we'll get it once and store it in the instance.
1373 *
1374 * References for native_format and native_codec are now held by the
1375 * instance and will be released when the instance is destroyed.
1376 */
1377 instance->native_format = fmt;
1378 instance->native_codec = ast_format_get_codec(instance->native_format);
1379 /*
1380 * References for native_format and native_codec are now held by the
1381 * instance and will be released when the instance is destroyed.
1382 */
1383
1384 /*
1385 * It's not possible for us to re-time or re-frame media if the data
1386 * stream can't be broken up on arbitrary byte boundaries. This is usually
1387 * indicated by the codec's minimum_bytes being small (10 bytes or less).
1388 * We need to force passthrough mode in this case.
1389 */
1390 if (instance->native_codec->minimum_bytes <= 10) {
1391 instance->passthrough = 1;
1392 instance->optimal_frame_size = 0;
1393 } else {
1394 instance->optimal_frame_size =
1395 (instance->native_codec->default_ms * instance->native_codec->minimum_bytes)
1396 / instance->native_codec->minimum_ms;
1397 instance->leftover_data = ast_calloc(1, instance->optimal_frame_size);
1398 if (!instance->leftover_data) {
1399 return NULL;
1400 }
1401 }
1402
1403 ast_debug(3,
1404 "%s: WebSocket channel native format '%s' Sample rate: %d ptime: %dms minms: %u minbytes: %u passthrough: %d optimal_frame_size: %d\n",
1405 chan_name, ast_format_get_name(instance->native_format),
1406 ast_format_get_sample_rate(instance->native_format),
1407 ast_format_get_default_ms(instance->native_format),
1408 ast_format_get_minimum_ms(instance->native_format),
1409 ast_format_get_minimum_bytes(instance->native_format),
1410 instance->passthrough,
1411 instance->optimal_frame_size);
1412
1413 /* We have exclusive access to proxy and sorcery, no need for locking here. */
1414 if (ao2_weakproxy_set_object(proxy, instance, OBJ_NOLOCK)) {
1415 return NULL;
1416 }
1417
1419 return NULL;
1420 }
1421
1422 if (!ao2_link_flags(instances, proxy, OBJ_NOLOCK)) {
1423 ast_log(LOG_ERROR, "%s: Unable to link WebSocket instance to instances\n",
1424 proxy->connection_id);
1425 return NULL;
1426 }
1427 ast_debug(3, "%s: WebSocket instance created and linked\n", proxy->connection_id);
1428
1429 return ao2_bump(instance);
1430}

References ao2_alloc, ao2_bump, ao2_cleanup, ao2_link_flags, ao2_weakproxy_alloc, ao2_weakproxy_set_object, ao2_weakproxy_subscribe(), ast_calloc, ast_debug, ast_format_get_codec(), ast_format_get_default_ms(), ast_format_get_minimum_bytes(), ast_format_get_minimum_ms(), ast_format_get_name(), ast_format_get_sample_rate(), AST_LIST_HEAD_INIT, ast_log, ast_strings_equal(), ast_uuid_generate_str(), AST_UUID_STR_LEN, ast_websocket_client_retrieve_by_id(), AST_WS_TYPE_CLIENT, AST_WS_TYPE_SERVER, websocket_pvt::connection_id, INCOMING_CONNECTION_ID, instance_proxy_cb(), instances, LOG_ERROR, NULL, OBJ_NOLOCK, RAII_VAR, SCOPED_AO2WRLOCK, uuid(), and websocket_destructor().

Referenced by webchan_request().

Variable Documentation

◆ __mod_info

struct ast_module_info __mod_info = { .name = AST_MODULE, .flags = AST_MODFLAG_LOAD_ORDER , .description = "Websocket Media Channel" , .key = ASTERISK_GPL_KEY , .buildopt_sum = AST_BUILDOPT_SUM, .support_level = AST_MODULE_SUPPORT_CORE, .load = load_module, .unload = unload_module, .reload = reload_module, .load_pri = AST_MODPRI_CHANNEL_DRIVER, .requires = "res_http_websocket,res_websocket_client", }
static

Definition at line 2136 of file chan_websocket.c.

◆ ast_module_info

const struct ast_module_info* ast_module_info = &__mod_info
static

Definition at line 2136 of file chan_websocket.c.

◆ ast_ws_server

struct ast_websocket_server* ast_ws_server
static

Definition at line 73 of file chan_websocket.c.

Referenced by incoming_ws_http_callback(), load_module(), and unload_module().

◆ http_uri

struct ast_http_uri http_uri
static

Definition at line 1961 of file chan_websocket.c.

1961 {
1962 .callback = incoming_ws_http_callback,
1963 .description = "Media over Websocket",
1964 .uri = "media",
1965 .has_subtree = 1,
1966 .data = NULL,
1967 .key = __FILE__,
1968 .no_decode_uri = 1,
1969};

Referenced by load_module(), and unload_module().

◆ instances

struct ao2_container* instances = NULL
static

◆ msg_format_map

const char* msg_format_map[]
static
Initial value:

Definition at line 62 of file chan_websocket.c.

62 {
63 [WEBCHAN_CONTROL_MSG_FORMAT_PLAIN] = "plain-text",
66};

Referenced by control_msg_format_from_str(), and control_msg_format_to_str().

◆ sorcery

struct ast_sorcery* sorcery = NULL
static

Definition at line 54 of file chan_websocket.c.

Referenced by __ast_sorcery_apply_config(), __ast_sorcery_apply_default(), __ast_sorcery_apply_wizard_mapping(), __ast_sorcery_insert_wizard_mapping(), __ast_sorcery_object_field_register(), __ast_sorcery_object_register(), __ast_sorcery_object_type_insert_wizard(), __ast_sorcery_object_type_remove_wizard(), __ast_sorcery_open(), __ast_sorcery_remove_wizard_mapping(), alloc_and_initialize_sorcery(), alloc_and_initialize_sorcery(), alloc_and_initialize_sorcery(), apply_list_configuration(), as_config_load(), as_config_reload(), ast_ari_asterisk_delete_object(), ast_ari_asterisk_get_object(), ast_ari_asterisk_update_object(), ast_sip_destroy_sorcery_global(), ast_sip_initialize_sorcery_auth(), ast_sip_initialize_sorcery_domain_alias(), ast_sip_initialize_sorcery_global(), ast_sip_initialize_sorcery_location(), ast_sip_initialize_sorcery_transport(), ast_sorcery_alloc(), ast_sorcery_copy(), ast_sorcery_create(), ast_sorcery_delete(), ast_sorcery_diff(), ast_sorcery_force_reload(), ast_sorcery_force_reload_object(), ast_sorcery_get_module(), ast_sorcery_get_object_type(), ast_sorcery_get_wizard_mapping(), ast_sorcery_get_wizard_mapping_count(), ast_sorcery_instance_observer_add(), ast_sorcery_instance_observer_remove(), ast_sorcery_is_stale(), ast_sorcery_load(), ast_sorcery_load_object(), ast_sorcery_object_fields_register(), ast_sorcery_object_set_congestion_levels(), ast_sorcery_object_set_copy_handler(), ast_sorcery_object_set_diff_handler(), ast_sorcery_object_unregister(), ast_sorcery_objectset_apply(), ast_sorcery_objectset_create2(), ast_sorcery_objectset_json_create(), ast_sorcery_observer_add(), ast_sorcery_observer_remove(), ast_sorcery_ref(), ast_sorcery_reload(), ast_sorcery_reload_object(), ast_sorcery_retrieve_by_fields(), ast_sorcery_retrieve_by_id(), ast_sorcery_retrieve_by_prefix(), ast_sorcery_retrieve_by_regex(), ast_sorcery_update(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), AST_TEST_DEFINE(), bucket_file_wizard_create(), bucket_file_wizard_delete(), bucket_file_wizard_is_stale(), bucket_file_wizard_retrieve(), bucket_file_wizard_update(), bucket_http_wizard_retrieve_id(), bucket_wizard_create(), bucket_wizard_delete(), bucket_wizard_is_stale(), bucket_wizard_retrieve(), can_reuse_registration(), create_object(), deinitialize_sorcery(), deinitialize_sorcery(), global_loaded_observer(), handle_aor(), handle_auth(), handle_auths(), handle_endpoint(), handle_export_primitives(), handle_identify(), handle_phoneprov(), handle_registrations(), instance_created_observer(), instance_destroying_observer(), load_config(), load_module(), memory_cache_full_update(), memory_cache_populate(), memory_cache_stale_check(), memory_cache_stale_check_object(), memory_cache_stale_update_full(), memory_cache_stale_update_object(), mock_retrieve_id(), object_type_loaded_observer(), object_type_registered_observer(), profile_load(), profile_reload(), reload_module(), return_sorcery_object(), sorcery_astdb_create(), sorcery_astdb_filter_objectset(), sorcery_astdb_retrieve_fields(), sorcery_astdb_retrieve_fields_common(), sorcery_astdb_retrieve_id(), sorcery_astdb_retrieve_multiple(), sorcery_astdb_retrieve_prefix(), sorcery_astdb_retrieve_regex(), sorcery_astdb_update(), sorcery_config_internal_load(), sorcery_config_load(), sorcery_config_reload(), sorcery_config_retrieve_fields(), sorcery_config_retrieve_multiple(), sorcery_config_retrieve_prefix(), sorcery_config_retrieve_regex(), sorcery_destructor(), sorcery_function_read(), sorcery_is_configuration_met(), sorcery_is_explicit_name_met(), sorcery_memory_cache_create(), sorcery_memory_cache_load(), sorcery_memory_cache_retrieve_fields(), sorcery_memory_cache_retrieve_id(), sorcery_memory_cache_retrieve_multiple(), sorcery_memory_cache_retrieve_prefix(), sorcery_memory_cache_retrieve_regex(), sorcery_memory_cached_object_alloc(), sorcery_memory_retrieve_fields(), sorcery_memory_retrieve_multiple(), sorcery_memory_retrieve_prefix(), sorcery_memory_retrieve_regex(), sorcery_realtime_create(), sorcery_realtime_filter_objectset(), sorcery_realtime_retrieve_fields(), sorcery_realtime_retrieve_id(), sorcery_realtime_retrieve_multiple(), sorcery_realtime_retrieve_prefix(), sorcery_realtime_retrieve_regex(), sorcery_realtime_update(), sorcery_reloadable(), sorcery_test_retrieve_id(), sorcery_wizard_load(), stale_cache_update_task_data_alloc(), stale_update_task_data_alloc(), tn_config_load(), tn_config_reload(), transport_apply(), unload_module(), vs_config_load(), vs_config_reload(), webchan_request(), and wizard_apply_handler().

◆ websocket_options

const struct ast_app_option websocket_options[128] = { [ 'c' ] = { .flag = OPT_WS_CODEC , .arg_index = OPT_ARG_WS_CODEC + 1 }, [ 'n' ] = { .flag = OPT_WS_NO_AUTO_ANSWER }, [ 'v' ] = { .flag = OPT_WS_URI_PARAM , .arg_index = OPT_ARG_WS_URI_PARAM + 1 }, [ 'p' ] = { .flag = OPT_WS_PASSTHROUGH }, [ 'f' ] = { .flag = OPT_WS_MSG_FORMAT , .arg_index = OPT_ARG_WS_MSG_FORMAT + 1 }, }
static

Definition at line 1574 of file chan_websocket.c.

Referenced by webchan_request().

◆ websocket_tech

struct ast_channel_tech websocket_tech
static

Definition at line 135 of file chan_websocket.c.

135 {
136 .type = "WebSocket",
137 .description = "Media over WebSocket Channel Driver",
138 .requester = webchan_request,
139 .call = webchan_call,
140 .read = webchan_read,
141 .write = webchan_write,
142 .hangup = webchan_hangup,
143 .send_digit_end = webchan_send_dtmf_text,
144};

Referenced by load_module(), unload_module(), and webchan_request().