Asterisk - The Open Source Telephony Project GIT-master-80b953f
Loading...
Searching...
No Matches
chan_websocket.c
Go to the documentation of this file.
1/*
2 * Asterisk -- An open source telephony toolkit.
3 *
4 * Copyright (C) 2025, Sangoma Technologies Corporation
5 *
6 * George Joseph <gjoseph@sangoma.com>
7 *
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
13 *
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
17 */
18
19/*! \file
20 *
21 * \author George Joseph <gjoseph@sangoma.com>
22 *
23 * \brief Websocket Media Channel
24 *
25 * \ingroup channel_drivers
26 */
27
28/*** MODULEINFO
29 <depend>res_http_websocket</depend>
30 <depend>res_websocket_client</depend>
31 <support_level>core</support_level>
32 ***/
33
34#include "asterisk.h"
35
36#include "asterisk/app.h"
37#include "asterisk/causes.h"
38#include "asterisk/channel.h"
39#include "asterisk/codec.h"
42#include "asterisk/frame.h"
43#include "asterisk/json.h"
44#include "asterisk/lock.h"
45#include "asterisk/mod_format.h"
46#include "asterisk/module.h"
47#include "asterisk/pbx.h"
48#include "asterisk/uuid.h"
49#include "asterisk/timing.h"
50#include "asterisk/translate.h"
52#include "asterisk/sorcery.h"
53
54static struct ast_sorcery *sorcery = NULL;
55
61
62static const char *msg_format_map[] = {
63 [WEBCHAN_CONTROL_MSG_FORMAT_PLAIN] = "plain-text",
66};
67
72
73/* This is from the perspective of the app, NOT Asterisk */
79
80static const char *websocket_media_direction_map[] = {
84};
85
87
88static struct ao2_container *instances = NULL;
89
117
118/*
119 * These are the indexes in the channel's file descriptor array
120 * not the file descriptors themselves.
 *
 * webchan_read() compares ast_channel_fdno() against these to tell a
 * timer tick (WS_TIMER_FDNO) from incoming websocket data
 * (WS_WEBSOCKET_FDNO).
121 */
122#define WS_TIMER_FDNO (AST_EXTENDED_FDS + 1)
123#define WS_WEBSOCKET_FDNO (AST_EXTENDED_FDS + 2)
124
125#define MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE "MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE"
126#define MEDIA_WEBSOCKET_CONNECTION_ID "MEDIA_WEBSOCKET_CONNECTION_ID"
127#define INCOMING_CONNECTION_ID "INCOMING"
128
/*
 * Command names accepted from the websocket. handle_command() compares
 * the received command against each of these with ast_strings_equal().
 */
129#define ANSWER_CHANNEL "ANSWER"
130#define HANGUP_CHANNEL "HANGUP"
131#define START_MEDIA_BUFFERING "START_MEDIA_BUFFERING"
132#define STOP_MEDIA_BUFFERING "STOP_MEDIA_BUFFERING"
133#define MARK_MEDIA "MARK_MEDIA"
134#define FLUSH_MEDIA "FLUSH_MEDIA"
135#define GET_DRIVER_STATUS "GET_STATUS"
136#define REPORT_QUEUE_DRAINED "REPORT_QUEUE_DRAINED"
137#define PAUSE_MEDIA "PAUSE_MEDIA"
138#define CONTINUE_MEDIA "CONTINUE_MEDIA"
139#define SET_MEDIA_DIRECTION "SET_MEDIA_DIRECTION"
140
/*
 * Frame queue thresholds, compared against frame_queue_length:
 *  - at QUEUE_LENGTH_MAX, process_binary_message() ignores incoming
 *    binary messages;
 *  - on reaching QUEUE_LENGTH_XOFF_LEVEL, queue_frame_from_buffer()
 *    marks the queue full and sends MEDIA_XOFF;
 *  - once a full queue drops below QUEUE_LENGTH_XON_LEVEL,
 *    dequeue_frame() clears the full flag and sends MEDIA_XON.
 */
141#define QUEUE_LENGTH_MAX 1000
142#define QUEUE_LENGTH_XOFF_LEVEL 900
143#define QUEUE_LENGTH_XON_LEVEL 800
/* Longest TEXT message that process_text_message() will accept. */
144#define MAX_TEXT_MESSAGE_LEN MIN(128, (AST_WEBSOCKET_MAX_RX_PAYLOAD_SIZE - 1))
145
146/* Forward declarations */
147static int read_from_ws_and_queue(struct websocket_pvt *instance);
148static void _websocket_request_hangup(struct websocket_pvt *instance, int ast_cause,
149 enum ast_websocket_status_code tech_cause, int line, const char *function);
150static struct ast_channel *webchan_request(const char *type, struct ast_format_cap *cap, const struct ast_assigned_ids *assignedids, const struct ast_channel *requestor, const char *data, int *cause);
151static int webchan_call(struct ast_channel *ast, const char *dest, int timeout);
152static struct ast_frame *webchan_read(struct ast_channel *ast);
153static int webchan_write(struct ast_channel *ast, struct ast_frame *f);
154static int webchan_hangup(struct ast_channel *ast);
155static int webchan_send_dtmf_text(struct ast_channel *ast, char digit, unsigned int duration);
156static int set_channel_timer(struct websocket_pvt *instance);
157
/*
 * Wrapper around _websocket_request_hangup() that supplies the calling
 * line and function, which that function includes in its debug message.
 */
158#define websocket_request_hangup(_instance, _cause, _tech) \
159 _websocket_request_hangup(_instance, _cause, _tech, __LINE__, __FUNCTION__)
160
162 .type = "WebSocket",
163 .description = "Media over WebSocket Channel Driver",
164 .requester = webchan_request,
165 .call = webchan_call,
166 .read = webchan_read,
167 .write = webchan_write,
168 .hangup = webchan_hangup,
169 .send_digit_end = webchan_send_dtmf_text,
170};
171
184
186{
188 return NULL;
189 }
190 return msg_format_map[value];
191}
192
193/*!
194 * \internal
195 * \brief Catch-all to print events that don't have any data.
 *
 * \param instance Supplies the channel whose unique id is reported as
 *                 "channel_id" in the JSON form of the event.
 * \param event    The event name.
 *
 * \return The event text, or NULL if the JSON object could not be built.
 *         send_event() releases the string with ast_free().
 *
 * \note NOTE(review): two source lines are absent from this listing (201
 *       and 208): the condition that opens the JSON branch and the
 *       statement that turns msg into payload. Presumably the condition is
 *       the control_msg_format check visible in _create_event_ERROR() -
 *       confirm against the real file.
196 * \warning Do not call directly.
197 */
198static char *_create_event_nodata(struct websocket_pvt *instance, char *event)
199{
200 char *payload = NULL;
202 struct ast_json * msg = ast_json_pack("{ s:s s:s }",
203 "event", event,
204 "channel_id", ast_channel_uniqueid(instance->channel));
205 if (!msg) {
206 return NULL;
207 }
209 ast_json_unref(msg);
210 } else {
 /* Non-JSON form: the payload is just a copy of the event name. */
211 payload = ast_strdup(event);
212 }
213
214 return payload;
215}
216
/*
 * Events that carry no event-specific data are all built by
 * _create_event_nodata(). These wrappers exist so that create_event() and
 * send_event() can form the builder's name by token pasting.
 *
 * The expansions intentionally do NOT end with a semicolon so the macros
 * behave like ordinary function calls and can be used inside expressions
 * and un-braced if/else bodies.
 */
#define _create_event_MEDIA_XON(_instance) _create_event_nodata((_instance), "MEDIA_XON")
#define _create_event_MEDIA_XOFF(_instance) _create_event_nodata((_instance), "MEDIA_XOFF")
#define _create_event_QUEUE_DRAINED(_instance) _create_event_nodata((_instance), "QUEUE_DRAINED")
220
221/*!
222 * \internal
223 * \brief Print the MEDIA_START event.
 *
 * The JSON form reports connection_id, channel name, channel_id, format
 * name, optimal_frame_size, ptime (the native codec's default_ms) and the
 * channel variables. The plain-text form carries the same fields except
 * the channel variables.
 *
 * \param instance The websocket channel private structure to describe.
 *
 * \return The event text, or NULL if the JSON object could not be built.
 *         send_event() releases the string with ast_free().
 *
 * \note NOTE(review): source lines 230 and 245 are absent from this
 *       listing (the condition opening the JSON branch and the statement
 *       that turns msg into payload) - confirm against the real file.
224 * \warning Do not call directly.
225 */
226static char *_create_event_MEDIA_START(struct websocket_pvt *instance)
227{
228 char *payload = NULL;
229
231 struct ast_json *msg = ast_json_pack("{s:s, s:s, s:s, s:s, s:s, s:i, s:i, s:o }",
232 "event", "MEDIA_START",
233 "connection_id", instance->connection_id,
234 "channel", ast_channel_name(instance->channel),
235 "channel_id", ast_channel_uniqueid(instance->channel),
236 "format", ast_format_get_name(instance->native_format),
237 "optimal_frame_size", instance->optimal_frame_size,
238 "ptime", instance->native_codec->default_ms,
239 "channel_variables", ast_json_channel_vars(ast_channel_varshead(
240 instance->channel))
241 );
242 if (!msg) {
243 return NULL;
244 }
246 ast_json_unref(msg);
247 } else {
 /* The return value of ast_asprintf() is not checked here. */
248 ast_asprintf(&payload, "%s %s:%s %s:%s %s:%s %s:%s %s:%d %s:%d",
249 "MEDIA_START",
250 "connection_id", instance->connection_id,
251 "channel", ast_channel_name(instance->channel),
252 "channel_id", ast_channel_uniqueid(instance->channel),
253 "format", ast_format_get_name(instance->native_format),
254 "optimal_frame_size", instance->optimal_frame_size,
255 "ptime", instance->native_codec->default_ms
256 );
257 }
258
259 return payload;
260}
261
262/*!
263 * \internal
264 * \brief Print the MEDIA_BUFFERING_COMPLETED event.
265 * \warning Do not call directly.
266 */
268 const char *id)
269{
270 char *payload = NULL;
272 struct ast_json *msg = ast_json_pack("{s:s, s:s, s:s}",
273 "event", "MEDIA_BUFFERING_COMPLETED",
274 "channel_id", ast_channel_uniqueid(instance->channel),
275 "correlation_id", S_OR(id, "")
276 );
277 if (!msg) {
278 return NULL;
279 }
281 ast_json_unref(msg);
282 } else {
283 ast_asprintf(&payload, "%s%s%s",
284 "MEDIA_BUFFERING_COMPLETED",
285 S_COR(id, " ",""), S_OR(id, ""));
286
287 }
288
289 return payload;
290}
291
292/*!
293 * \internal
294 * \brief Print the MEDIA_MARK_PROCESSED event.
295 * \warning Do not call directly.
296 */
298 const char *id)
299{
300 char *payload = NULL;
302 struct ast_json *msg = ast_json_pack("{s:s, s:s, s:s}",
303 "event", "MEDIA_MARK_PROCESSED",
304 "channel_id", ast_channel_uniqueid(instance->channel),
305 "correlation_id", S_OR(id, "")
306 );
307 if (!msg) {
308 return NULL;
309 }
311 ast_json_unref(msg);
312 } else {
313 ast_asprintf(&payload, "%s%s%s",
314 "MEDIA_MARK_PROCESSED",
315 S_COR(id, " ",""), S_OR(id, ""));
316
317 }
318
319 return payload;
320}
321
322/*!
323 * \internal
324 * \brief Print the DTMF_END event.
 *
 * \param instance Supplies the channel whose unique id is reported.
 * \param digit    The DTMF digit to report.
 *
 * \return The event text, or NULL if the JSON object could not be built.
 *         send_event() releases the string with ast_free().
 *
 * \note NOTE(review): source lines 331 and 340 are absent from this
 *       listing (the condition opening the JSON branch and the statement
 *       that turns msg into payload) - confirm against the real file.
325 * \warning Do not call directly.
326 */
327static char *_create_event_DTMF_END(struct websocket_pvt *instance,
328 const char digit)
329{
330 char *payload = NULL;
 /* "s#" takes a pointer and a length, so the single char needs no terminator. */
332 struct ast_json *msg = ast_json_pack("{s:s, s:s, s:s#}",
333 "event", "DTMF_END",
334 "channel_id", ast_channel_uniqueid(instance->channel),
335 "digit", &digit, 1
336 );
337 if (!msg) {
338 return NULL;
339 }
341 ast_json_unref(msg);
342 } else {
343 ast_asprintf(&payload, "%s digit:%c channel_id:%s",
344 "DTMF_END", digit, ast_channel_uniqueid(instance->channel));
345 }
346
347 return payload;
348}
349
350/*!
351 * \internal
352 * \brief Print the STATUS event.
 *
 * Reports the current frame queue length, the XON/XOFF thresholds and the
 * queue_full, bulk_media_in_progress and queue_paused flags of the
 * instance.
 *
 * \param instance The websocket channel private structure to describe.
 *
 * \return The event text, or NULL if the JSON object could not be built.
 *         send_event() releases the string with ast_free().
 *
 * \note NOTE(review): source lines 359, 373, 379 and 380 are absent from
 *       this listing. Besides the JSON branch condition and the msg to
 *       payload conversion, that includes the arguments for the three %d
 *       conversions of the plain-text format - confirm against the real
 *       file.
353 * \warning Do not call directly.
354 */
355static char *_create_event_STATUS(struct websocket_pvt *instance)
356{
357 char *payload = NULL;
358
360 struct ast_json *msg = ast_json_pack("{s:s, s:s, s:i, s:i, s:i, s:b, s:b, s:b }",
361 "event", "STATUS",
362 "channel_id", ast_channel_uniqueid(instance->channel),
363 "queue_length", instance->frame_queue_length,
364 "xon_level", QUEUE_LENGTH_XON_LEVEL,
365 "xoff_level", QUEUE_LENGTH_XOFF_LEVEL,
366 "queue_full", instance->queue_full,
367 "bulk_media", instance->bulk_media_in_progress,
368 "media_paused", instance->queue_paused
369 );
370 if (!msg) {
371 return NULL;
372 }
374 ast_json_unref(msg);
375 } else {
376 ast_asprintf(&payload, "%s channel_id:%s queue_length:%d xon_level:%d xoff_level:%d queue_full:%s bulk_media:%s media_paused:%s",
377 "STATUS",
378 ast_channel_uniqueid(instance->channel),
381 S_COR(instance->queue_full, "true", "false"),
382 S_COR(instance->bulk_media_in_progress, "true", "false"),
383 S_COR(instance->queue_paused, "true", "false")
384 );
385 }
386
387 return payload;
388}
389
390/*!
391 * \internal
392 * \brief Print the ERROR event.
 *
 * \param instance Supplies the control message format and the channel
 *                 whose unique id is reported.
 * \param format   printf-style format for the error text.
 * \param ...      Arguments for \a format.
 *
 * \return The event text, or NULL if the error text could not be
 *         formatted or the JSON object could not be built. send_event()
 *         releases the string with ast_free().
 *
 * \note NOTE(review): source line 419 (the statement that turns msg into
 *       payload in the JSON branch) is absent from this listing - confirm
 *       against the real file.
393 * \warning Do not call directly.
394 */
395static __attribute__ ((format (gnu_printf, 2, 3))) char *_create_event_ERROR(
396 struct websocket_pvt *instance, const char *format, ...)
397{
398 char *payload = NULL;
399 char *error_text = NULL;
400 va_list ap;
401 int res = 0;
402
 /* Expand the caller's format and arguments into a temporary string first. */
403 va_start(ap, format);
404 res = ast_vasprintf(&error_text, format, ap);
405 va_end(ap);
406 if (res < 0 || !error_text) {
407 return NULL;
408 }
409
410 if (instance->control_msg_format == WEBCHAN_CONTROL_MSG_FORMAT_JSON) {
411 struct ast_json *msg = ast_json_pack("{s:s, s:s, s:s}",
412 "event", "ERROR",
413 "channel_id", ast_channel_uniqueid(instance->channel),
414 "error_text", error_text);
 /* error_text is freed before the msg check so the failure return cannot leak it. */
415 ast_free(error_text);
416 if (!msg) {
417 return NULL;
418 }
420 ast_json_unref(msg);
421 } else {
422 ast_asprintf(&payload, "%s channel_id:%s error_text:%s",
423 "ERROR", ast_channel_uniqueid(instance->channel), error_text);
424 ast_free(error_text);
425 }
426
427 return payload;
428}
429
430/*!
431 * \def create_event
432 * \brief Use this macro to create events passing in any event-specific parameters.
 *
 * Expands to a call of _create_event_<_event>(_instance, ...), so _event
 * must be the suffix of one of the _create_event_* builders. The result is
 * whatever that builder returns; handle_command() releases it with
 * ast_free() after queueing it.
433 */
434#define create_event(_instance, _event, ...) \
435 _create_event_ ## _event(_instance, ##__VA_ARGS__)
436
/*!
 * \def send_event
 * \brief Use this macro to create and send events passing in any event-specific parameters.
 *
 * Expands to a call of _create_event_<_event>(_instance, ...), writes the
 * resulting string to the instance's websocket with
 * ast_websocket_write_string() and then frees the string.
 *
 * \param _instance The websocket_pvt to send the event on. Evaluated more
 *                  than once, so it must not have side effects.
 * \param _event    Suffix of the _create_event_* builder to use.
 * \param ...       Event-specific parameters passed to the builder.
 *
 * \return The result of ast_websocket_write_string(), or -1 if the event
 *         could not be created or the instance has no websocket.
 *
 * \note The payload is freed on every path, including the one where it was
 *       created but there is no websocket to write it to.
 */
#define send_event(_instance, _event, ...) \
({ \
	int _res = -1; \
	char *_payload = _create_event_ ## _event((_instance), ##__VA_ARGS__); \
	if (_payload && (_instance)->websocket) { \
		_res = ast_websocket_write_string((_instance)->websocket, _payload); \
		if (_res != 0) { \
			ast_log(LOG_ERROR, "%s: Unable to send event %s\n", \
				ast_channel_name((_instance)->channel), _payload); \
		} else { \
			ast_debug(3, "%s: Sent %s\n", \
				ast_channel_name((_instance)->channel), _payload); \
		} \
	} \
	ast_free(_payload); \
	(_res); \
})
458
459/*
460 * Reminder... This function gets called by webchan_read which is
461 * triggered by the channel timer firing. It always gets called
462 * every 20ms (or whatever the timer is set to) even if there are
463 * no frames in the queue.
 *
 * Returns the next non-control frame from the instance's frame queue,
 * or NULL when the queue is paused, empty, or held only control frames.
 * Control frames found ahead of it are consumed here and never
 * returned to the caller.
 *
 * NOTE(review): source lines 469 and 532 are absent from this listing
 * (the rest of the SCOPED_LOCK statement and the start of the call that
 * takes queued_frame->data.ptr) - confirm against the real file.
464 */
465static struct ast_frame *dequeue_frame(struct websocket_pvt *instance)
466{
467 struct ast_frame *queued_frame = NULL;
468 SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK,
470
471 /*
472 * If the queue is paused, don't read a frame. Return NULL
473 * right away; webchan_read() returns a null frame to the core
474 * in its place.
475 */
476 if (instance->queue_paused) {
477 return NULL;
478 }
479
480 /*
481 * We need to check if we need to send an XON before anything
482 * else because there are multiple escape paths in this function
483 * and we don't want to accidentally keep the queue in a "full"
484 * state.
485 */
486 if (instance->queue_full && instance->frame_queue_length < QUEUE_LENGTH_XON_LEVEL) {
487 instance->queue_full = 0;
488 ast_debug(4, "%s: WebSocket sending MEDIA_XON\n",
489 ast_channel_name(instance->channel));
490 send_event(instance, MEDIA_XON);
491 }
492
493 queued_frame = AST_LIST_REMOVE_HEAD(&instance->frame_queue, frame_list);
494
495 /*
496 * If there are no frames in the queue, we need to
497 * return NULL so we can send a silence frame. We also need
498 * to send the QUEUE_DRAINED notification if we were requested
499 * to do so.
500 */
501 if (!queued_frame) {
502 if (instance->report_queue_drained) {
 /* The request is one-shot: clear it before reporting. */
503 instance->report_queue_drained = 0;
504 ast_debug(4, "%s: WebSocket sending QUEUE_DRAINED\n",
505 ast_channel_name(instance->channel));
506 send_event(instance, QUEUE_DRAINED);
507 }
508 return NULL;
509 }
510
511 /*
512 * The only way a control frame could be present here is as
513 * a result of us calling queue_option_frame() in response
514 * to an incoming TEXT command from the websocket.
515 * We'll be safe and make sure it's a AST_CONTROL_OPTION
516 * frame anyway.
517 *
518 * It's quite possible that there are multiple control frames
519 * in a row in the queue so we need to process consecutive ones
520 * immediately.
521 *
522 * In any case, processing a control frame MUST not use up
523 * a media timeslot so after all control frames have been
524 * processed, we need to read an audio frame and process it.
525 */
526 while (queued_frame && queued_frame->frametype == AST_FRAME_CONTROL) {
527 if (queued_frame->subclass.integer == AST_CONTROL_OPTION) {
528 /*
529 * We just need to send the data to the websocket.
530 * The data should already be NULL terminated.
531 */
533 queued_frame->data.ptr);
534 ast_debug(4, "%s: Sent %s\n",
535 ast_channel_name(instance->channel), (char *)queued_frame->data.ptr);
536 }
537 /*
538 * We do NOT send these to the core so we need to free
539 * the frame and grab the next one. If it's also a
540 * control frame, we need to process it otherwise
541 * continue down in the function.
542 */
543 ast_frame_free(queued_frame, 0);
544 queued_frame = AST_LIST_REMOVE_HEAD(&instance->frame_queue, frame_list);
545 /*
546 * Just FYI... We didn't bump the queue length when we added the control
547 * frames so we don't need to decrement it here.
548 */
549 }
550
551 /*
552 * If, after reading all control frames, there are no frames
553 * left in the queue, we need to return NULL so we can send
554 * a silence frame.
555 */
556 if (!queued_frame) {
557 return NULL;
558 }
559
 /* Only the frame handed back to the caller counts against the queue length. */
560 instance->frame_queue_length--;
561
562 return queued_frame;
563}
564/*!
565 * \internal
566 *
567 * There are two file descriptors on this channel that can trigger
568 * this function...
569 *
570 * The timer fd (WS_TIMER_FDNO) which gets triggered at a constant
571 * rate determined by the format. In this case, we need to pull a
572 * frame OFF the queue and return it to the core.
573 *
574 * The websocket fd (WS_WEBSOCKET_FDNO) which gets triggered when
575 * there's incoming data to read from the websocket. In this case,
576 * we read the data and put it ON the queue. We'll return a null frame.
577 *
 * \param ast The channel being read.
 *
 * \return NULL if the channel has no tech_pvt, a dequeued frame when the
 *         timer fired and one was available, otherwise &ast_null_frame.
 *
 * \note NOTE(review): source line 598 (the condition guarding
 *       ast_timer_ack()) is absent from this listing - confirm against
 *       the real file.
578 */
579static struct ast_frame *webchan_read(struct ast_channel *ast)
580{
581 struct websocket_pvt *instance = NULL;
582 struct ast_frame *native_frame = NULL;
583 int fdno = ast_channel_fdno(ast);
584
585 instance = ast_channel_tech_pvt(ast);
586 if (!instance) {
587 return NULL;
588 }
589
590 if (fdno == WS_WEBSOCKET_FDNO) {
 /* The result is deliberately not inspected; a null frame is returned either way. */
591 read_from_ws_and_queue(instance);
592 return &ast_null_frame;
593 }
 /* Anything other than the timer fd is not ours to service. */
594 if (fdno != WS_TIMER_FDNO) {
595 return &ast_null_frame;
596 }
597
599 ast_timer_ack(instance->timer, 1);
600 }
601
602 native_frame = dequeue_frame(instance);
603
604 /*
605 * No frame when the timer fires means we have to return a null frame in its place.
606 */
607 if (!native_frame) {
608 ast_debug(4, "%s: WebSocket read timer fired with no frame available. Returning NULL frame.\n",
609 ast_channel_name(ast));
610 return &ast_null_frame;
611 }
612
613 return native_frame;
614}
615
/*!
 * \internal
 * \brief Wrap a buffer of media in a frame and append it to the frame queue.
 *
 * A frame describing \a buffer is built on the stack in the instance's
 * native format and a copy made with ast_frisolate() is appended to the
 * queue. When the queue length reaches QUEUE_LENGTH_XOFF_LEVEL and the
 * queue is not already marked full, it is marked full and MEDIA_XOFF is
 * sent.
 *
 * \param instance The websocket channel private structure.
 * \param buffer   The media data.
 * \param len      Number of bytes in \a buffer.
 *
 * \retval 0  The frame was queued.
 * \retval -1 ast_frisolate() failed.
 *
 * \note NOTE(review): source lines 623 and 636 are absent from this
 *       listing (one frame field assignment and the rest of the
 *       SCOPED_LOCK statement) - confirm against the real file.
 */
616static int queue_frame_from_buffer(struct websocket_pvt *instance,
617 char *buffer, size_t len)
618{
619 struct ast_frame fr = { 0, };
620 struct ast_frame *duped_frame = NULL;
621
622 AST_FRAME_SET_BUFFER(&fr, buffer, 0, len);
624 fr.subclass.format = instance->native_format;
 /* The native codec computes the sample count from the frame itself. */
625 fr.samples = instance->native_codec->samples_count(&fr);
626
627 duped_frame = ast_frisolate(&fr);
628 if (!duped_frame) {
629 ast_log(LOG_WARNING, "%s: Failed to isolate frame\n",
630 ast_channel_name(instance->channel));
631 return -1;
632 }
633
 /* The braces limit how long the queue lock is held. */
634 {
635 SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK,
637 AST_LIST_INSERT_TAIL(&instance->frame_queue, duped_frame, frame_list);
638 instance->frame_queue_length++;
639 if (!instance->queue_full && instance->frame_queue_length >= QUEUE_LENGTH_XOFF_LEVEL) {
640 instance->queue_full = 1;
641 send_event(instance, MEDIA_XOFF);
642 }
643 }
644
 /*
  * NOTE(review): duped_frame is read here after it has been placed on
  * the queue and the locked scope has ended - confirm nothing else can
  * dequeue and free it in between.
  */
645 ast_debug(5, "%s: Queued %d byte frame\n", ast_channel_name(instance->channel),
646 duped_frame->datalen);
647
648 return 0;
649}
650
/*!
 * \internal
 * \brief Append a frame carrying a text string to the frame queue.
 *
 * The frame's data is \a buffer including its terminating NUL. A copy made
 * with ast_frisolate() is appended to the queue, so the caller keeps
 * ownership of \a buffer. frame_queue_length is not changed here.
 *
 * \param instance The websocket channel private structure.
 * \param buffer   NUL-terminated text to carry in the frame.
 *
 * \retval 0  The frame was queued.
 * \retval -1 ast_frisolate() failed.
 *
 * \note NOTE(review): source lines 658 and 659 are absent from this
 *       listing. Presumably they set the frame type and subclass to the
 *       AST_FRAME_CONTROL / AST_CONTROL_OPTION values that dequeue_frame()
 *       checks for - confirm against the real file.
 */
651static int queue_option_frame(struct websocket_pvt *instance,
652 char *buffer)
653{
654 struct ast_frame fr = { 0, };
655 struct ast_frame *duped_frame = NULL;
656
 /* +1 so the terminating NUL travels with the data. */
657 AST_FRAME_SET_BUFFER(&fr, buffer, 0, strlen(buffer) + 1);
660
661 duped_frame = ast_frisolate(&fr);
662 if (!duped_frame) {
663 ast_log(LOG_WARNING, "%s: Failed to isolate frame\n",
664 ast_channel_name(instance->channel));
665 return -1;
666 }
667
668 AST_LIST_LOCK(&instance->frame_queue);
669 AST_LIST_INSERT_TAIL(&instance->frame_queue, duped_frame, frame_list);
670 AST_LIST_UNLOCK(&instance->frame_queue);
671
672 ast_debug(4, "%s: Queued '%s' option frame\n",
673 ast_channel_name(instance->channel), buffer);
674
675 return 0;
676}
677
/*
 * Guard for commands that are not supported in passthrough mode.
 *
 * If the instance is in passthrough mode, an ERROR event naming the
 * command is sent, the rejection is logged at debug level 4 and the
 * ENCLOSING function returns 0. Otherwise nothing happens.
 *
 * Must be used as a statement inside a function that returns int.
 * Both arguments are evaluated more than once.
 */
#define ERROR_ON_PASSTHROUGH_MODE_RTN(instance, command) \
do { \
	if ((instance)->passthrough) { \
		send_event(instance, ERROR, "%s not supported in passthrough mode", (command)); \
		ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n", \
			ast_channel_name((instance)->channel), (command)); \
		return 0; \
	} \
} while (0)
687
/*
 * Guard for commands that are not supported while the instance's media
 * direction equals the given direction.
 *
 * If instance->media_direction equals direction, an ERROR event naming
 * the command and the direction (looked up in
 * websocket_media_direction_map) is sent, the rejection is logged at debug
 * level 4 and the ENCLOSING function returns 0. Otherwise nothing happens.
 *
 * Must be used as a statement inside a function that returns int.
 * All arguments are evaluated more than once.
 */
#define ERROR_ON_INVALID_MEDIA_DIRECTION_RTN(instance, command, direction) \
do { \
	if ((instance)->media_direction == (direction)) { \
		send_event(instance, ERROR, "%s not supported while media direction " \
			"is '%s'", (command), websocket_media_direction_map[(direction)]); \
		ast_debug(4, "%s: WebSocket media direction is '%s'. Ignoring %s command.\n", \
			ast_channel_name((instance)->channel), websocket_media_direction_map[(direction)], (command)); \
		return 0; \
	} \
} while (0)
698
699/*!
700 * \internal
701 * \brief Handle commands from the websocket
702 *
 * The command arrives either as a JSON object with a "command" member or
 * as plain text of the form "<COMMAND>[ <data>]". In the plain-text case
 * \a buffer is modified in place: the first space is overwritten with a
 * NUL so that the command and its data become separate strings.
 *
703 * \param instance
704 * \param buffer Allocated by caller so don't free.
705 * \retval 0 Success
706 * \retval -1 Failure
 *
 * \note NOTE(review): numerous source lines are absent from this listing
 *       (for example 715, 734, 737, 741 and most of the
 *       SET_MEDIA_DIRECTION branch bodies), so several branches below look
 *       empty or unbalanced. Confirm against the real file before relying
 *       on the comments added here.
707 */
708static int handle_command(struct websocket_pvt *instance, char *buffer)
709{
710 int res = 0;
711 RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
712 const char *command = NULL;
713 char *data = NULL;
714
716 struct ast_json_error json_error;
717
718 json = ast_json_load_buf(buffer, strlen(buffer), &json_error);
719 if (!json) {
720 send_event(instance, ERROR, "Unable to parse JSON command");
721 return -1;
722 }
723 command = ast_json_object_string_get(json, "command");
724 } else {
 /* Plain text: split at the first space; data stays NULL if there is none. */
725 command = buffer;
726 data = strchr(buffer, ' ');
727 if (data) {
728 *data = '\0';
729 data++;
730 }
731 }
732
733 if (ast_strings_equal(command, ANSWER_CHANNEL)) {
735
736 } else if (ast_strings_equal(command, HANGUP_CHANNEL)) {
738
739 } else if (ast_strings_equal(command, START_MEDIA_BUFFERING)) {
740 ERROR_ON_PASSTHROUGH_MODE_RTN(instance, command);
742 AST_LIST_LOCK(&instance->frame_queue);
743 instance->bulk_media_in_progress = 1;
744 AST_LIST_UNLOCK(&instance->frame_queue);
745
746 } else if (ast_strings_equal(command, STOP_MEDIA_BUFFERING)) {
747 const char *id;
748 char *option;
 /*
  * NOTE(review): queue_frame_from_buffer() and queue_option_frame(),
  * called below, lock the same list again - this presumably relies on
  * the list lock being recursive; confirm.
  */
749 SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK,
751
753 id = ast_json_object_string_get(json, "correlation_id");
754 } else {
755 id = data;
756 }
757
758 ERROR_ON_PASSTHROUGH_MODE_RTN(instance, command);
760
761 ast_debug(4, "%s: WebSocket %s '%s' with %d bytes in leftover_data.\n",
763 (int)instance->leftover_len);
764
765 instance->bulk_media_in_progress = 0;
 /* Queue whatever partial frame is still buffered as a final, short frame. */
766 if (instance->leftover_len > 0) {
767 res = queue_frame_from_buffer(instance, instance->leftover_data, instance->leftover_len);
768 if (res != 0) {
769 return res;
770 }
771 }
772 instance->leftover_len = 0;
 /*
  * The completion event is queued behind the media rather than sent
  * now, so dequeue_frame() reaches it only after the media ahead of it.
  */
773 option = create_event(instance, MEDIA_BUFFERING_COMPLETED, id);
774 if (!option) {
775 return -1;
776 }
777 res = queue_option_frame(instance, option);
 /* queue_option_frame() queued its own copy, so ours can go. */
778 ast_free(option);
779
780 } else if (ast_strings_equal(command, MARK_MEDIA)) {
781 const char *id;
782 char *option;
783 SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK,
785
786 ERROR_ON_PASSTHROUGH_MODE_RTN(instance, command);
788
790 id = ast_json_object_string_get(json, "correlation_id");
791 } else {
792 id = data;
793 }
794
 /*
  * NOTE(review): id is NULL when a plain-text command carries no data
  * (and presumably when the JSON has no correlation_id), yet it is
  * passed to %s here - confirm this is acceptable.
  */
795 ast_debug(4, "%s: %s %s\n",
796 ast_channel_name(instance->channel), MARK_MEDIA, id);
797
798 option = create_event(instance, MEDIA_MARK_PROCESSED, id);
799 if (!option) {
800 return -1;
801 }
802 res = queue_option_frame(instance, option);
803 ast_free(option);
804
805 } else if (ast_strings_equal(command, FLUSH_MEDIA)) {
806 struct ast_frame *frame = NULL;
807
808 ERROR_ON_PASSTHROUGH_MODE_RTN(instance, command);
809
 /* Discard everything queued and reset all buffering state. */
810 AST_LIST_LOCK(&instance->frame_queue);
811 while ((frame = AST_LIST_REMOVE_HEAD(&instance->frame_queue, frame_list))) {
812 ast_frfree(frame);
813 }
814 instance->frame_queue_length = 0;
815 instance->bulk_media_in_progress = 0;
816 instance->leftover_len = 0;
817 AST_LIST_UNLOCK(&instance->frame_queue);
818
819 } else if (ast_strings_equal(command, REPORT_QUEUE_DRAINED)) {
820 ERROR_ON_PASSTHROUGH_MODE_RTN(instance, command);
821
 /* dequeue_frame() sends QUEUE_DRAINED once and clears this flag. */
822 AST_LIST_LOCK(&instance->frame_queue);
823 instance->report_queue_drained = 1;
824 AST_LIST_UNLOCK(&instance->frame_queue);
825
826 } else if (ast_strings_equal(command, GET_DRIVER_STATUS)) {
827 return send_event(instance, STATUS);
828
829 } else if (ast_strings_equal(command, PAUSE_MEDIA)) {
830 ERROR_ON_PASSTHROUGH_MODE_RTN(instance, command);
832 AST_LIST_LOCK(&instance->frame_queue);
833 instance->queue_paused = 1;
834 AST_LIST_UNLOCK(&instance->frame_queue);
835
836 } else if (ast_strings_equal(command, CONTINUE_MEDIA)) {
837 ERROR_ON_PASSTHROUGH_MODE_RTN(instance, command);
839 AST_LIST_LOCK(&instance->frame_queue);
840 instance->queue_paused = 0;
841 AST_LIST_UNLOCK(&instance->frame_queue);
842
843 } else if (ast_strings_equal(command, SET_MEDIA_DIRECTION)) {
844 const char *direction;
845
846 ERROR_ON_PASSTHROUGH_MODE_RTN(instance, command);
847
849 send_event(instance, ERROR, "%s only supports JSON format.\n", command);
850 return 0;
851 }
852
853 direction = ast_json_object_string_get(json, "direction");
854 if (!direction) {
855 send_event(instance, ERROR, "%s requires a 'direction' parameter.\n", command);
856 return 0;
857 }
858
859 if (!strcmp("both", direction)) {
861 return 0;
862 }
863
864 if (!instance->timer) {
865 set_channel_timer(instance);
867 }
868
870
871 } else if (!strcmp("out", direction)) {
873 return 0;
874 }
875
876 if (!instance->timer) {
877 set_channel_timer(instance);
879 }
880
882
883 } else if (!strcmp("in", direction)) {
885 return 0;
886 }
887
 /* For "in", an existing channel timer is closed and forgotten. */
888 if (instance->timer) {
890 ast_timer_close(instance->timer);
891 instance->timer = NULL;
893 }
894
896
897 } else {
898 send_event(instance, ERROR, "'%s' is not a valid direction for %s.\n",
899 direction, command);
900 return 0;
901 }
902
903 } else {
 /* Unknown commands are logged but still reported as success. */
904 ast_log(LOG_WARNING, "%s: WebSocket %s command unknown\n",
905 ast_channel_name(instance->channel), command);
906 }
907
908 return res;
909}
910
911static int process_text_message(struct websocket_pvt *instance,
912 char *payload, uint64_t payload_len)
913{
914 char *command;
915
916 if (payload_len == 0) {
917 ast_log(LOG_WARNING, "%s: WebSocket TEXT message has 0 length\n",
918 ast_channel_name(instance->channel));
919 return 0;
920 }
921
922 if (payload_len > MAX_TEXT_MESSAGE_LEN) {
923 ast_log(LOG_WARNING, "%s: WebSocket TEXT message of length %d exceeds maximum length of %d\n",
924 ast_channel_name(instance->channel), (int)payload_len, MAX_TEXT_MESSAGE_LEN);
925 return 0;
926 }
927
928 /*
929 * Unfortunately, payload is not NULL terminated even when it's
930 * a TEXT frame so we need to allocate a new buffer, copy
931 * the data into it, and NULL terminate it.
932 */
933 command = ast_alloca(payload_len + 1);
934 memcpy(command, payload, payload_len); /* Safe */
935 command[payload_len] = '\0';
936 command = ast_strip(command);
937
938 ast_debug(4, "%s: Received: %s\n",
939 ast_channel_name(instance->channel), command);
940
941 return handle_command(instance, command);
942}
943
944static int process_binary_message(struct websocket_pvt *instance,
945 char *payload, uint64_t payload_len)
946{
947 char *next_frame_ptr = NULL;
948 size_t bytes_read = 0;
949 int res = 0;
950 size_t bytes_left = 0;
951
952 {
953 SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK,
955 if (instance->frame_queue_length >= QUEUE_LENGTH_MAX) {
956 ast_debug(4, "%s: WebSocket queue is full. Ignoring incoming binary message.\n",
957 ast_channel_name(instance->channel));
958 return 0;
959 }
960 }
961
962 next_frame_ptr = payload;
963 instance->bytes_read += payload_len;
964
965 if (instance->passthrough) {
966 res = queue_frame_from_buffer(instance, payload, payload_len);
967 return res;
968 }
969
970 if (instance->bulk_media_in_progress && instance->leftover_len > 0) {
971 /*
972 * We have leftover data from a previous websocket message.
973 * Try to make a complete frame by appending data from
974 * the current message to the leftover data.
975 */
976 char *append_ptr = instance->leftover_data + instance->leftover_len;
977 size_t bytes_needed_for_frame = instance->optimal_frame_size - instance->leftover_len;
978 /*
979 * It's possible that even the current message doesn't have enough
980 * data to make a complete frame.
981 */
982 size_t bytes_avail_to_copy = MIN(bytes_needed_for_frame, payload_len);
983
984 /*
985 * Append whatever we can to the end of the leftover data
986 * even if it's not enough to make a complete frame.
987 */
988 memcpy(append_ptr, payload, bytes_avail_to_copy);
989
990 /*
991 * If leftover data is still short, just return and wait for the
992 * next websocket message.
993 */
994 if (bytes_avail_to_copy < bytes_needed_for_frame) {
995 ast_debug(4, "%s: Leftover data %d bytes but only %d new bytes available of %d needed. Appending and waiting for next message.\n",
996 ast_channel_name(instance->channel), (int)instance->leftover_len, (int)bytes_avail_to_copy, (int)bytes_needed_for_frame);
997 instance->leftover_len += bytes_avail_to_copy;
998 return 0;
999 }
1000
1001 res = queue_frame_from_buffer(instance, instance->leftover_data, instance->optimal_frame_size);
1002 if (res < 0) {
1003 return -1;
1004 }
1005
1006 /*
1007 * We stole data from the current payload so decrement payload_len
1008 * and set the next frame pointer after the data in payload
1009 * we just copied.
1010 */
1011 payload_len -= bytes_avail_to_copy;
1012 next_frame_ptr = payload + bytes_avail_to_copy;
1013
1014 ast_debug(5, "%s: --- BR: %4d FQ: %4d PL: %4d LOL: %3d P: %p NFP: %p OFF: %4d NPL: %4d BAC: %3d\n",
1015 ast_channel_name(instance->channel),
1016 instance->frame_queue_length,
1017 (int)instance->bytes_read,
1018 (int)(payload_len + bytes_avail_to_copy),
1019 (int)instance->leftover_len,
1020 payload,
1021 next_frame_ptr,
1022 (int)(next_frame_ptr - payload),
1023 (int)payload_len,
1024 (int)bytes_avail_to_copy
1025 );
1026
1027
1028 instance->leftover_len = 0;
1029 }
1030
1031 if (!instance->bulk_media_in_progress && instance->leftover_len > 0) {
1032 instance->leftover_len = 0;
1033 }
1034
1035 bytes_left = payload_len;
1036 while (bytes_read < payload_len && bytes_left >= instance->optimal_frame_size) {
1037 res = queue_frame_from_buffer(instance, next_frame_ptr,
1038 instance->optimal_frame_size);
1039 if (res < 0) {
1040 break;
1041 }
1042 bytes_read += instance->optimal_frame_size;
1043 next_frame_ptr += instance->optimal_frame_size;
1044 bytes_left -= instance->optimal_frame_size;
1045 }
1046
1047 if (instance->bulk_media_in_progress && bytes_left > 0) {
1048 /*
1049 * We have a partial frame. Save the leftover data.
1050 */
1051 ast_debug(5, "%s: +++ BR: %4d FQ: %4d PL: %4d LOL: %3d P: %p NFP: %p OFF: %4d BL: %4d\n",
1052 ast_channel_name(instance->channel),
1053 (int)instance->bytes_read,
1054 instance->frame_queue_length,
1055 (int)payload_len,
1056 (int)instance->leftover_len,
1057 payload,
1058 next_frame_ptr,
1059 (int)(next_frame_ptr - payload),
1060 (int)bytes_left
1061 );
1062 memcpy(instance->leftover_data, next_frame_ptr, bytes_left);
1063 instance->leftover_len = bytes_left;
1064 }
1065
1066 return 0;
1067}
1068
/*!
 * \internal
 * \brief Read one message from the websocket and dispatch it by opcode.
 *
 * TEXT messages go to process_text_message(), BINARY messages to
 * process_binary_message(). PING and PONG are ignored. CLOSE and read
 * errors return -1. Any other opcode is logged and ignored.
 *
 * \param instance The websocket channel private structure.
 *
 * \retval 0  Nothing further to do for this message.
 * \retval -1 No websocket, read error, or the remote closed the
 *            connection.
 * \return Otherwise the result of the TEXT or BINARY handler.
 *
 * \note NOTE(review): source lines 1089, 1106, 1112 and 1120 are absent
 *       from this listing (statements on the read-error, CLOSE and
 *       unsupported-opcode paths, and the condition that drops BINARY
 *       media) - confirm against the real file.
 */
1069static int read_from_ws_and_queue(struct websocket_pvt *instance)
1070{
1071 uint64_t payload_len = 0;
1072 char *payload = NULL;
1073 enum ast_websocket_opcode opcode;
1074 int fragmented = 0;
1075 int res = 0;
1076
1077 if (!instance->websocket) {
1078 ast_log(LOG_WARNING, "%s: WebSocket session not found\n",
1079 ast_channel_name(instance->channel));
1080 return -1;
1081 }
1082
1083 res = ast_websocket_read(instance->websocket, &payload, &payload_len,
1084 &opcode, &fragmented);
1085
1086 if (res) {
1087 ast_debug(3, "%s: WebSocket read error\n",
1088 ast_channel_name(instance->channel));
1090 return -1;
1091 }
1092 ast_debug(5, "%s: WebSocket read %d bytes\n", ast_channel_name(instance->channel),
1093 (int)payload_len);
1094
1095 if (opcode == AST_WEBSOCKET_OPCODE_TEXT) {
1096 return process_text_message(instance, payload, payload_len);
1097 }
1098
 /* Keepalive traffic needs no action here. */
1099 if (opcode == AST_WEBSOCKET_OPCODE_PING || opcode == AST_WEBSOCKET_OPCODE_PONG) {
1100 return 0;
1101 }
1102
1103 if (opcode == AST_WEBSOCKET_OPCODE_CLOSE) {
1104 ast_debug(3, "%s: WebSocket closed by remote\n",
1105 ast_channel_name(instance->channel));
1107 return -1;
1108 }
1109
1110 if (opcode == AST_WEBSOCKET_OPCODE_BINARY) {
1111 /* If the application's media direction is 'in', drop any media we receive from it */
1113 ast_debug(5, "%s: WebSocket dropped frame (application media direction is 'in')\n",
1114 ast_channel_name(instance->channel));
1115 return 0;
1116 }
1117 } else {
1118 ast_log(LOG_WARNING, "%s: WebSocket frame type %d not supported\n",
1119 ast_channel_name(instance->channel), (int)opcode);
1121 return 0;
1122 }
1123
1124 return process_binary_message(instance, payload, payload_len);
1125}
1126
1128{
1129 int res = 0;
1130 int nodelay = 1;
1131 struct ast_sockaddr *remote_addr = ast_websocket_remote_address(instance->websocket);
1132
1133 instance->remote_addr = ast_strdup(ast_sockaddr_stringify(remote_addr));
1134 ast_debug(3, "%s: WebSocket connection with %s established\n",
1135 ast_channel_name(instance->channel), instance->remote_addr);
1136
1137 if (setsockopt(ast_websocket_fd(instance->websocket),
1138 IPPROTO_TCP, TCP_NODELAY, (char *) &nodelay, sizeof(nodelay)) < 0) {
1139 ast_log(LOG_WARNING, "Failed to set TCP_NODELAY on websocket connection: %s\n", strerror(errno));
1140 }
1141
1143
1144 res = send_event(instance, MEDIA_START);
1145 if (res != 0 ) {
1146 if (instance->type == AST_WS_TYPE_SERVER) {
1148 } else {
1149 /*
1150 * We were called by webchan_call so just need to set causes.
1151 * The core will hangup the channel.
1152 */
1155 }
1156 return -1;
1157 }
1158
1159 if (!instance->no_auto_answer) {
1160 ast_debug(3, "%s: ANSWER by auto_answer\n", ast_channel_name(instance->channel));
1162 }
1163
1164 return 0;
1165}
1166
1167static void _websocket_request_hangup(struct websocket_pvt *instance, int ast_cause,
1168 enum ast_websocket_status_code tech_cause, int line, const char *function)
1169{
1170 if (!instance || !instance->channel) {
1171 return;
1172 }
1173 ast_debug(3, "%s:%s: Hangup requested from %s line %d. cause: %s(%d) tech_cause: %s(%d)",
1174 ast_channel_name(instance->channel), instance->remote_addr,
1175 function, line,
1176 ast_cause2str(ast_cause), ast_cause, ast_websocket_status_to_str(tech_cause), tech_cause);
1177
1178 if (tech_cause) {
1179 ast_channel_tech_hangupcause_set(instance->channel, tech_cause);
1180 }
1181 ast_queue_hangup_with_cause(instance->channel, ast_cause);
1182}
1183
1184/*! \brief Function called when we should write a frame to the channel */
1185static int webchan_write(struct ast_channel *ast, struct ast_frame *f)
1186{
1187 struct websocket_pvt *instance = ast_channel_tech_pvt(ast);
1188
1189 if (!instance || !instance->websocket) {
1190 ast_log(LOG_WARNING, "%s: WebSocket instance or client not found\n",
1191 ast_channel_name(ast));
1192 return -1;
1193 }
1194
1195 /* The app doesn't want media right now */
1197 return 0;
1198 }
1199
1200 if (f->frametype == AST_FRAME_CNG) {
1201 return 0;
1202 }
1203
1204 if (f->frametype != AST_FRAME_VOICE) {
1205 ast_log(LOG_WARNING, "%s: This WebSocket channel only supports AST_FRAME_VOICE frames\n",
1206 ast_channel_name(ast));
1207 return 0;
1208 }
1209
1211 ast_log(LOG_WARNING, "%s: This WebSocket channel only supports the '%s' format, not '%s'\n",
1214 return -1;
1215 }
1216
1218 (char *)f->data.ptr, (uint64_t)f->datalen);
1219}
1220
1221/*!
1222 * \internal
1223 *
1224 * Called by the core to actually call the remote.
1225 * The core will hang up the channel if a non-zero is returned.
1226 * We just need to set hangup causes if appropriate.
1227 */
1228static int webchan_call(struct ast_channel *ast, const char *dest,
1229 int timeout)
1230{
1231 struct websocket_pvt *instance = ast_channel_tech_pvt(ast);
1233
1234 if (!instance) {
1235 ast_log(LOG_WARNING, "%s: WebSocket instance not found\n",
1236 ast_channel_name(ast));
1238 return -1;
1239 }
1240
1241 if (instance->type == AST_WS_TYPE_SERVER) {
1242 ast_debug(3, "%s: Websocket call incoming\n", ast_channel_name(instance->channel));
1243 return 0;
1244 }
1245 ast_debug(3, "%s: Websocket call outgoing\n", ast_channel_name(instance->channel));
1246
1247 if (!instance->client) {
1248 ast_log(LOG_WARNING, "%s: WebSocket client not found\n",
1249 ast_channel_name(ast));
1251 return -1;
1252 }
1253
1254 ast_debug(3, "%s: WebSocket call requested to %s. cid: %s\n",
1255 ast_channel_name(ast), dest, instance->connection_id);
1256
1257 if (!ast_strlen_zero(instance->uri_params)) {
1259 }
1260
1261 instance->websocket = ast_websocket_client_connect(instance->client,
1262 instance, ast_channel_name(ast), &result);
1263 if (!instance->websocket || result != WS_OK) {
1264 ast_log(LOG_WARNING, "%s: WebSocket connection failed to %s: %s\n",
1267 return -1;
1268 }
1269
1270 return websocket_handoff_to_channel(instance);
1271}
1272
1273static void websocket_destructor(void *data)
1274{
1275 struct websocket_pvt *instance = data;
1276 struct ast_frame *frame = NULL;
1277 ast_debug(3, "%s: WebSocket instance freed\n", instance->connection_id);
1278
1279 AST_LIST_LOCK(&instance->frame_queue);
1280 while ((frame = AST_LIST_REMOVE_HEAD(&instance->frame_queue, frame_list))) {
1281 ast_frfree(frame);
1282 }
1283 AST_LIST_UNLOCK(&instance->frame_queue);
1284
1285 if (instance->timer) {
1286 ast_timer_close(instance->timer);
1287 instance->timer = NULL;
1288 }
1289
1290 if (instance->channel) {
1291 ast_channel_unref(instance->channel);
1292 instance->channel = NULL;
1293 }
1294 if (instance->websocket) {
1295 ast_websocket_unref(instance->websocket);
1296 instance->websocket = NULL;
1297 }
1298
1299 ao2_cleanup(instance->client);
1300 instance->client = NULL;
1301
1302 ao2_cleanup(instance->native_codec);
1303 instance->native_codec = NULL;
1304
1305 ao2_cleanup(instance->native_format);
1306 instance->native_format = NULL;
1307
1308 if (instance->leftover_data) {
1309 ast_free(instance->leftover_data);
1310 instance->leftover_data = NULL;
1311 }
1312
1313 ast_free(instance->uri_params);
1314 ast_free(instance->remote_addr);
1315}
1316
1319 /*! \brief The name of the module owning this sorcery instance */
1321};
1322
1323static void instance_proxy_cb(void *weakproxy, void *data)
1324{
1325 struct instance_proxy *proxy = weakproxy;
1326 ast_debug(3, "%s: WebSocket instance removed from instances\n", proxy->connection_id);
1327 ao2_unlink(instances, weakproxy);
1328}
1329
1330static struct websocket_pvt* websocket_new(const char *chan_name,
1331 const char *connection_id, struct ast_format *fmt)
1332{
1333 RAII_VAR(struct instance_proxy *, proxy, NULL, ao2_cleanup);
1334 RAII_VAR(struct websocket_pvt *, instance, NULL, ao2_cleanup);
1335 char uuid[AST_UUID_STR_LEN];
1336 enum ast_websocket_type ws_type;
1337
1338 SCOPED_AO2WRLOCK(locker, instances);
1339
1342 ws_type = AST_WS_TYPE_SERVER;
1343 } else {
1344 ws_type = AST_WS_TYPE_CLIENT;
1345 }
1346
1347 proxy = ao2_weakproxy_alloc(sizeof(*proxy) + strlen(connection_id) + 1, NULL);
1348 if (!proxy) {
1349 return NULL;
1350 }
1351 strcpy(proxy->connection_id, connection_id); /* Safe */
1352
1353 instance = ao2_alloc(sizeof(*instance) + strlen(connection_id) + 1,
1355 if (!instance) {
1356 return NULL;
1357 }
1358 strcpy(instance->connection_id, connection_id); /* Safe */
1359
1360 instance->type = ws_type;
1361 if (ws_type == AST_WS_TYPE_CLIENT) {
1362 instance->client = ast_websocket_client_retrieve_by_id(instance->connection_id);
1363 if (!instance->client) {
1364 ast_log(LOG_ERROR, "%s: WebSocket client connection '%s' not found\n",
1365 chan_name, instance->connection_id);
1366 return NULL;
1367 }
1368 }
1369
1370 AST_LIST_HEAD_INIT(&instance->frame_queue);
1371
1372 /*
1373 * We need the codec to calculate the number of samples in a frame
1374 * so we'll get it once and store it in the instance.
1375 *
1376 * References for native_format and native_codec are now held by the
1377 * instance and will be released when the instance is destroyed.
1378 */
1379 instance->native_format = fmt;
1380 instance->native_codec = ast_format_get_codec(instance->native_format);
1381 /*
1382 * References for native_format and native_codec are now held by the
1383 * instance and will be released when the instance is destroyed.
1384 */
1385
1386 /*
1387 * It's not possible for us to re-time or re-frame media if the data
1388 * stream can't be broken up on arbitrary byte boundaries. This is usually
1389 * indicated by the codec's minimum_bytes being small (10 bytes or less).
1390 * We need to force passthrough mode in this case.
1391 */
1392 if (instance->native_codec->minimum_bytes <= 10) {
1393 instance->passthrough = 1;
1394 instance->optimal_frame_size = 0;
1395 } else {
1396 instance->optimal_frame_size =
1397 (instance->native_codec->default_ms * instance->native_codec->minimum_bytes)
1398 / instance->native_codec->minimum_ms;
1399 instance->leftover_data = ast_calloc(1, instance->optimal_frame_size);
1400 if (!instance->leftover_data) {
1401 return NULL;
1402 }
1403 }
1404
1405 ast_debug(3,
1406 "%s: WebSocket channel native format '%s' Sample rate: %d ptime: %dms minms: %u minbytes: %u passthrough: %d optimal_frame_size: %d\n",
1407 chan_name, ast_format_get_name(instance->native_format),
1408 ast_format_get_sample_rate(instance->native_format),
1409 ast_format_get_default_ms(instance->native_format),
1410 ast_format_get_minimum_ms(instance->native_format),
1411 ast_format_get_minimum_bytes(instance->native_format),
1412 instance->passthrough,
1413 instance->optimal_frame_size);
1414
1415 /* We have exclusive access to proxy and sorcery, no need for locking here. */
1416 if (ao2_weakproxy_set_object(proxy, instance, OBJ_NOLOCK)) {
1417 return NULL;
1418 }
1419
1421 return NULL;
1422 }
1423
1424 if (!ao2_link_flags(instances, proxy, OBJ_NOLOCK)) {
1425 ast_log(LOG_ERROR, "%s: Unable to link WebSocket instance to instances\n",
1426 proxy->connection_id);
1427 return NULL;
1428 }
1429 ast_debug(3, "%s: WebSocket instance created and linked\n", proxy->connection_id);
1430
1431 return ao2_bump(instance);
1432}
1433
1434static int set_channel_timer(struct websocket_pvt *instance)
1435{
1436 int rate = 0;
1437 instance->timer = ast_timer_open();
1438 if (!instance->timer) {
1439 return -1;
1440 }
1441 /* Rate is the number of ticks per second, not the interval. */
1442 rate = 1000 / ast_format_get_default_ms(instance->native_format);
1443 ast_debug(3, "%s: WebSocket timer rate %d\n",
1444 ast_channel_name(instance->channel), rate);
1445 ast_timer_set_rate(instance->timer, rate);
1446 /*
1447 * Calling ast_channel_set_fd will cause the channel thread to call
1448 * webchan_read at 'rate' times per second.
1449 */
1451
1452 return 0;
1453}
1454
1455static int set_channel_variables(struct websocket_pvt *instance)
1456{
1457 char *pkt_size = NULL;
1458 int res = ast_asprintf(&pkt_size, "%d", instance->optimal_frame_size);
1459 if (res <= 0) {
1460 return -1;
1461 }
1462
1464 pkt_size);
1465 ast_free(pkt_size);
1467 instance->connection_id);
1468
1469 return 0;
1470}
1471
1473{
1474 char *params = ast_strdupa(uri_params);
1475 char *nvp = NULL;
1476 char *nv = NULL;
1477
1478 /*
1479 * uri_params should be a comma-separated list of key=value pairs.
1480 * For example:
1481 * name1=value1,name2=value2
1482 * We're verifying that each name and value either doesn't need
1483 * to be encoded or that it already is.
1484 */
1485
1486 while((nvp = ast_strsep(&params, ',', 0))) {
1487 /* nvp will be name1=value1 */
1488 while((nv = ast_strsep(&nvp, '=', 0))) {
1489 /* nv will be either name1 or value1 */
1490 if (!ast_uri_verify_encoded(nv)) {
1491 return 0;
1492 }
1493 }
1494 }
1495
1496 return 1;
1497}
1498
1499enum {
1500 OPT_WS_CODEC = (1 << 0),
1506};
1507
1508enum {
1517
1526
1527static struct ast_channel *webchan_request(const char *type,
1528 struct ast_format_cap *cap, const struct ast_assigned_ids *assignedids,
1529 const struct ast_channel *requestor, const char *data, int *cause)
1530{
1531 char *parse;
1532 RAII_VAR(struct websocket_pvt *, instance, NULL, ao2_cleanup);
1533 struct ast_channel *chan = NULL;
1534 struct ast_format *fmt = NULL;
1535 struct ast_format_cap *caps = NULL;
1537 AST_APP_ARG(connection_id);
1539 );
1540 struct ast_flags opts = { 0, };
1541 char *opt_args[OPT_ARG_ARRAY_SIZE];
1542 const char *requestor_name = requestor ? ast_channel_name(requestor) :
1543 (assignedids && !ast_strlen_zero(assignedids->uniqueid) ? assignedids->uniqueid : "<unknown>");
1544 RAII_VAR(struct webchan_conf_global *, global_cfg, NULL, ao2_cleanup);
1545
1546 global_cfg = ast_sorcery_retrieve_by_id(sorcery, "global", "global");
1547
1548 ast_debug(3, "%s: WebSocket channel requested\n",
1549 requestor_name);
1550
1551 if (ast_strlen_zero(data)) {
1552 ast_log(LOG_ERROR, "%s: A connection id is required for the 'WebSocket' channel\n",
1553 requestor_name);
1554 goto failure;
1555 }
1556 parse = ast_strdupa(data);
1557 AST_NONSTANDARD_APP_ARGS(args, parse, '/');
1558
1559 if (ast_strlen_zero(args.connection_id)) {
1560 ast_log(LOG_ERROR, "%s: connection_id is required for the 'WebSocket' channel\n",
1561 requestor_name);
1562 goto failure;
1563 }
1564
1565 if (!ast_strlen_zero(args.options)
1566 && ast_app_parse_options(websocket_options, &opts, opt_args,
1567 ast_strdupa(args.options))) {
1568 ast_log(LOG_ERROR, "%s: 'WebSocket' channel options '%s' parse error\n",
1569 requestor_name, args.options);
1570 goto failure;
1571 }
1572
1573 if (ast_test_flag(&opts, OPT_WS_CODEC)
1574 && !ast_strlen_zero(opt_args[OPT_ARG_WS_CODEC])) {
1575 fmt = ast_format_cache_get(opt_args[OPT_ARG_WS_CODEC]);
1576 } else {
1577 /*
1578 * If codec wasn't specified in the dial string,
1579 * use the first format in the capabilities.
1580 */
1581 fmt = ast_format_cap_get_format(cap, 0);
1582 }
1583
1584 if (!fmt) {
1585 ast_log(LOG_WARNING, "%s: No codec found for sending media to connection '%s'\n",
1586 requestor_name, args.connection_id);
1587 goto failure;
1588 }
1589
1590 ast_debug(3, "%s: Using format %s from %s\n",
1591 requestor_name, ast_format_get_name(fmt),
1592 ast_test_flag(&opts, OPT_WS_CODEC) ? "dialstring" : "requester");
1593
1594 instance = websocket_new(requestor_name, args.connection_id, fmt);
1595 if (!instance) {
1596 ast_log(LOG_ERROR, "%s: Failed to allocate WebSocket channel pvt\n",
1597 requestor_name);
1598 goto failure;
1599 }
1600
1601 instance->media_direction = WEBCHAN_MEDIA_DIRECTION_BOTH;
1603 if (!strcmp("both", opt_args[OPT_ARG_WS_MEDIA_DIRECTION])) {
1604 /* The default. Don't need to do anything here other than
1605 * ensure it is an allowed value. */
1606 } else if (!strcmp("out", opt_args[OPT_ARG_WS_MEDIA_DIRECTION])) {
1607 instance->media_direction = WEBCHAN_MEDIA_DIRECTION_OUT;
1608 } else if (!strcmp("in", opt_args[OPT_ARG_WS_MEDIA_DIRECTION])) {
1609 instance->media_direction = WEBCHAN_MEDIA_DIRECTION_IN;
1610 } else {
1611 ast_log(LOG_ERROR, "Unrecognized option for media direction: '%s'.\n",
1612 opt_args[OPT_ARG_WS_MEDIA_DIRECTION]);
1613 goto failure;
1614 }
1615 }
1616
1617 instance->no_auto_answer = ast_test_flag(&opts, OPT_WS_NO_AUTO_ANSWER);
1618 if (!instance->passthrough) {
1619 instance->passthrough = ast_test_flag(&opts, OPT_WS_PASSTHROUGH);
1620 }
1621
1623 && !ast_strlen_zero(opt_args[OPT_ARG_WS_URI_PARAM])) {
1624 char *comma;
1625
1626 if (ast_strings_equal(args.connection_id, INCOMING_CONNECTION_ID)) {
1628 "%s: URI parameters are not allowed for 'WebSocket/INCOMING' channels\n",
1629 requestor_name);
1630 goto failure;
1631 }
1632
1633 ast_debug(3, "%s: Using URI parameters '%s'\n",
1634 requestor_name, opt_args[OPT_ARG_WS_URI_PARAM]);
1635
1637 ast_log(LOG_ERROR, "%s: Invalid URI parameters '%s' in WebSocket/%s dial string\n",
1638 requestor_name, opt_args[OPT_ARG_WS_URI_PARAM],
1639 args.connection_id);
1640 goto failure;
1641 }
1642
1643 instance->uri_params = ast_strdup(opt_args[OPT_ARG_WS_URI_PARAM]);
1644 comma = instance->uri_params;
1645 /*
1646 * The normal separator for query string components is an
1647 * ampersand ('&') but the Dial app interprets them as additional
1648 * channels to dial in parallel so we instruct users to separate
1649 * the parameters with commas (',') instead. We now have to
1650 * convert those commas back to ampersands.
1651 */
1652 while ((comma = strchr(comma,','))) {
1653 *comma = '&';
1654 }
1655 ast_debug(3, "%s: Using final URI '%s'\n", requestor_name, instance->uri_params);
1656 }
1657
1658 if (ast_test_flag(&opts, OPT_WS_MSG_FORMAT)) {
1659 instance->control_msg_format = control_msg_format_from_str(opt_args[OPT_ARG_WS_MSG_FORMAT]);
1660
1661 if (instance->control_msg_format == WEBCHAN_CONTROL_MSG_FORMAT_INVALID) {
1662 ast_log(LOG_WARNING, "%s: 'f/control message format' dialstring parameter value missing or invalid. "
1663 "Defaulting to 'plain-text'\n",
1664 ast_channel_name(requestor));
1665 instance->control_msg_format = WEBCHAN_CONTROL_MSG_FORMAT_PLAIN;
1666 }
1667 } else if (global_cfg) {
1668 instance->control_msg_format = global_cfg->control_msg_format;
1669 }
1670
1671 chan = ast_channel_alloc(1, AST_STATE_DOWN, "", "", "", "", "", assignedids,
1672 requestor, 0, "WebSocket/%s/%p", args.connection_id, instance);
1673 if (!chan) {
1674 ast_log(LOG_ERROR, "%s: Unable to alloc channel\n", ast_channel_name(requestor));
1675 goto failure;
1676 }
1677
1678 /* Prevent device state caching as this channel involves ephemeral destinations or sources */
1680 ast_debug(3, "%s: WebSocket channel %s allocated for connection %s\n",
1681 ast_channel_name(chan), requestor_name,
1682 instance->connection_id);
1683
1684 instance->channel = ao2_bump(chan);
1685 ast_channel_tech_set(instance->channel, &websocket_tech);
1686
1687 /* If the application's media direction is 'both' or 'out', we need the channel timer. */
1688 if (instance->media_direction != WEBCHAN_MEDIA_DIRECTION_IN
1689 && set_channel_timer(instance) != 0) {
1690 goto failure;
1691 }
1692
1693 if (set_channel_variables(instance) != 0) {
1694 goto failure;
1695 }
1696
1698 if (!caps) {
1699 ast_log(LOG_ERROR, "%s: Unable to alloc caps\n", requestor_name);
1700 goto failure;
1701 }
1702
1703 ast_format_cap_append(caps, instance->native_format, 0);
1704 ast_channel_nativeformats_set(instance->channel, caps);
1705 ast_channel_set_writeformat(instance->channel, instance->native_format);
1706 ast_channel_set_rawwriteformat(instance->channel, instance->native_format);
1707 ast_channel_set_readformat(instance->channel, instance->native_format);
1708 ast_channel_set_rawreadformat(instance->channel, instance->native_format);
1709 ast_channel_tech_pvt_set(chan, ao2_bump(instance));
1710 ast_channel_unlock(chan);
1711 ao2_cleanup(caps);
1712
1713 ast_debug(3, "%s: WebSocket channel created to %s\n",
1714 ast_channel_name(chan), args.connection_id);
1715
1716 return chan;
1717
1718failure:
1719 if (chan) {
1720 ast_channel_unlock(chan);
1721 }
1722 *cause = AST_CAUSE_FAILURE;
1723 return NULL;
1724}
1725
1726/*!
1727 * \internal
1728 *
1729 * Called by the core to hang up the channel.
1730 */
1731static int webchan_hangup(struct ast_channel *ast)
1732{
1733 struct websocket_pvt *instance = ast_channel_tech_pvt(ast);
1734
1735 if (!instance) {
1736 return -1;
1737 }
1738 ast_debug(3, "%s: WebSocket call hangup. cid: %s\n",
1739 ast_channel_name(ast), instance->connection_id);
1740
1741 if (instance->websocket) {
1743 ast_websocket_unref(instance->websocket);
1744 instance->websocket = NULL;
1745 }
1747
1748 /* Clean up the reference from adding the instance to the channel */
1749 ao2_cleanup(instance);
1750
1751 return 0;
1752}
1753
1754static int webchan_send_dtmf_text(struct ast_channel *ast, char digit, unsigned int duration)
1755{
1756 struct websocket_pvt *instance = ast_channel_tech_pvt(ast);
1757
1758 if (!instance) {
1759 return -1;
1760 }
1761
1762 return send_event(instance, DTMF_END, digit);
1763}
1764
1765/*!
1766 * \internal
1767 *
1768 * Called by res_http_websocket after a client has connected and
1769 * successfully upgraded from HTTP to WebSocket.
1770 *
1771 * Depends on incoming_ws_http_callback parsing the connection_id from
1772 * the HTTP request and storing it in get_params.
1773 */
1774static void incoming_ws_established_cb(struct ast_websocket *ast_ws_session,
1775 struct ast_variable *get_params, struct ast_variable *upgrade_headers)
1776{
1777 RAII_VAR(struct ast_websocket *, s, ast_ws_session, ast_websocket_unref);
1778 struct ast_variable *v;
1779 const char *connection_id = NULL;
1780 struct websocket_pvt *instance = NULL;
1781
1782 ast_debug(3, "WebSocket established\n");
1783
1784 for (v = upgrade_headers; v; v = v->next) {
1785 ast_debug(4, "Header-> %s: %s\n", v->name, v->value);
1786 }
1787 for (v = get_params; v; v = v->next) {
1788 ast_debug(4, " Param-> %s: %s\n", v->name, v->value);
1789 }
1790
1791 connection_id = ast_variable_find_in_list(get_params, "CONNECTION_ID");
1792 if (!connection_id) {
1793 /*
1794 * This can't really happen because incoming_ws_http_callback won't
1795 * let it get this far if it can't add the connection_id to the
1796 * get_params.
1797 * Just in case though...
1798 */
1799 ast_log(LOG_WARNING, "WebSocket connection id not found\n");
1802 return;
1803 }
1804
1806 if (!instance) {
1807 /*
1808 * This also can't really happen because incoming_ws_http_callback won't
1809 * let it get this far if it can't find the instance.
1810 * Just in case though...
1811 */
1812 ast_log(LOG_WARNING, "%s: WebSocket instance not found\n", connection_id);
1815 return;
1816 }
1817 instance->websocket = ao2_bump(ast_ws_session);
1818
1820 ao2_cleanup(instance);
1821 /*
1822 * The instance is the channel's responsibility now.
1823 * We just return here.
1824 */
1825}
1826
1827/*!
1828 * \internal
1829 *
1830 * Called by the core http server after a client connects but before
1831 * the upgrade from HTTP to Websocket. We need to save the URI in
1832 * a CONNECTION_ID get_param because it contains the connection UUID
1833 * we gave to the client when they used externalMedia to create the channel.
1834 * incoming_ws_established_cb() will use this to retrieve the chan_websocket
1835 * instance.
1836 */
1838 const struct ast_http_uri *urih, const char *uri,
1839 enum ast_http_method method, struct ast_variable *get_params,
1840 struct ast_variable *headers)
1841{
1842 struct ast_http_uri fake_urih = {
1844 };
1845 int res = 0;
1846 /*
1847 * Normally the http server will destroy the get_params
1848 * when the session ends but if there weren't any initially
1849 * and we create some and add them to the list, the http server
1850 * won't know about it so we have to destroy it ourselves.
1851 */
1852 int destroy_get_params = (get_params == NULL);
1853 struct ast_variable *v = NULL;
1854 RAII_VAR(struct websocket_pvt *, instance, NULL, ao2_cleanup);
1855
1856 ast_debug(2, "URI: %s Starting\n", uri);
1857
1858 /*
1859 * The client will have issued the GET request with a URI of
1860 * /media/<connection_id>
1861 *
1862 * Since this callback is registered for the /media URI prefix the
1863 * http server will strip that off the front of the URI passing in
1864 * only the path components after that in the 'uri' parameter.
1865 * This should leave only the connection id without a leading '/'.
1866 */
1867 instance = ao2_weakproxy_find(instances, uri, OBJ_SEARCH_KEY | OBJ_NOLOCK, "");
1868 if (!instance) {
1869 ast_log(LOG_WARNING, "%s: WebSocket instance not found\n", uri);
1870 ast_http_error(ser, 404, "Not found", "WebSocket instance not found");
1871 return -1;
1872 }
1873
1874 /*
1875 * We don't allow additional connections using the same connection id.
1876 */
1877 if (instance->websocket) {
1878 ast_log(LOG_WARNING, "%s: Websocket already connected for channel '%s'\n",
1879 uri, instance->channel ? ast_channel_name(instance->channel) : "unknown");
1880 ast_http_error(ser, 409, "Conflict", "Another websocket connection exists for this connection id");
1881 return -1;
1882 }
1883
1884 v = ast_variable_new("CONNECTION_ID", uri, "");
1885 if (!v) {
1886 ast_http_error(ser, 500, "Server error", "");
1887 return -1;
1888 }
1889 ast_variable_list_append(&get_params, v);
1890
1891 for (v = get_params; v; v = v->next) {
1892 ast_debug(4, " Param-> %s: %s\n", v->name, v->value);
1893 }
1894
1895 /*
1896 * This will ultimately call incoming_ws_established_cb() so
1897 * this function will block until the websocket is closed and
1898 * incoming_ws_established_cb() returns.
1899 */
1900 res = ast_websocket_uri_cb(ser, &fake_urih, uri, method,
1901 get_params, headers);
1902 if (destroy_get_params) {
1903 ast_variables_destroy(get_params);
1904 }
1905
1906 ast_debug(2, "URI: %s DONE\n", uri);
1907
1908 return res;
1909}
1910
1911static struct ast_http_uri http_uri = {
1913 .description = "Media over Websocket",
1914 .uri = "media",
1915 .has_subtree = 1,
1916 .data = NULL,
1917 .key = __FILE__,
1918 .no_decode_uri = 1,
1919};
1920
1924
1926 struct ast_variable *var, void *obj)
1927{
1928 struct webchan_conf_global *cfg = obj;
1929
1931
1933 ast_log(LOG_ERROR, "chan_websocket.conf: Invalid value '%s' for "
1934 "control_mesage_format. Must be 'plain-text' or 'json'\n",
1935 var->value);
1936 return -1;
1937 }
1938
1939 return 0;
1940}
1941
1942static int global_control_message_format_to_str(const void *obj, const intptr_t *args, char **buf)
1943{
1944 const struct webchan_conf_global *cfg = obj;
1945
1947
1948 return 0;
1949}
1950
1951static void *global_alloc(const char *name)
1952{
1954 sizeof(*cfg), NULL);
1955
1956 if (!cfg) {
1957 return NULL;
1958 }
1959
1960 return cfg;
1961}
1962
1963static int global_apply(const struct ast_sorcery *sorcery, void *obj)
1964{
1965 struct webchan_conf_global *cfg = obj;
1966
1967 ast_debug(1, "control_msg_format: %s\n",
1969
1970 return 0;
1971}
1972
1973static int load_config(void)
1974{
1975 ast_debug(2, "Initializing Websocket Client Configuration\n");
1977 if (!sorcery) {
1978 ast_log(LOG_ERROR, "Failed to open sorcery\n");
1979 return -1;
1980 }
1981
1982 ast_sorcery_apply_default(sorcery, "global", "config",
1983 "chan_websocket.conf,criteria=type=global,single_object=yes,explicit_name=global");
1984
1986 ast_log(LOG_ERROR, "Failed to register chan_websocket global object with sorcery\n");
1988 sorcery = NULL;
1989 return -1;
1990 }
1991
1992 ast_sorcery_object_field_register_nodoc(sorcery, "global", "type", "", OPT_NOOP_T, 0, 0);
1993 ast_sorcery_register_cust(global, control_message_format, "plain-text");
1994
1996
1997 return 0;
1998}
1999
2000/*! \brief Function called when our module is unloaded */
2019
2020static int reload_module(void)
2021{
2022 ast_debug(2, "Reloading chan_websocket configuration\n");
2024
2025 return 0;
2026}
2027
2028/*! \brief Function called when our module is loaded */
2029static int load_module(void)
2030{
2031 int res = 0;
2032 struct ast_websocket_protocol *protocol;
2033
2034 res = load_config();
2035 if (res != 0) {
2037 }
2038
2041 }
2042
2045 ast_log(LOG_ERROR, "Unable to register channel class 'WebSocket'\n");
2046 unload_module();
2048 }
2049
2051 AO2_CONTAINER_ALLOC_OPT_DUPS_REPLACE, 17, instance_proxy_hash_fn,
2052 instance_proxy_sort_fn, instance_proxy_cmp_fn);
2053 if (!instances) {
2055 "Failed to allocate the chan_websocket instance registry\n");
2056 unload_module();
2058 }
2059
2061 if (!ast_ws_server) {
2062 unload_module();
2064 }
2065
2066 protocol = ast_websocket_sub_protocol_alloc("media");
2067 if (!protocol) {
2068 unload_module();
2070 }
2073
2075
2077}
2078
2079AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "Websocket Media Channel",
2080 .support_level = AST_MODULE_SUPPORT_CORE,
2081 .load = load_module,
2082 .unload = unload_module,
2084 .load_pri = AST_MODPRI_CHANNEL_DRIVER,
2085 .requires = "res_http_websocket,res_websocket_client",
char digit
enum queue_result id
Definition app_queue.c:1790
#define var
Definition ast_expr2f.c:605
Asterisk main include file. File version handling, generic pbx functions.
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
Definition astmm.h:288
#define ast_free(a)
Definition astmm.h:180
#define ast_strdup(str)
A wrapper for strdup()
Definition astmm.h:241
#define ast_strdupa(s)
duplicate a string in memory from the stack
Definition astmm.h:298
#define ast_vasprintf(ret, fmt, ap)
A wrapper for vasprintf()
Definition astmm.h:278
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition astmm.h:267
#define ast_calloc(num, len)
A wrapper for calloc()
Definition astmm.h:202
#define ast_log
Definition astobj2.c:42
#define ao2_weakproxy_set_object(weakproxy, obj, flags)
Associate weakproxy with obj.
Definition astobj2.h:579
int ao2_weakproxy_subscribe(void *weakproxy, ao2_weakproxy_notification_cb cb, void *data, int flags)
Request notification when weakproxy points to NULL.
Definition astobj2.c:934
@ AO2_ALLOC_OPT_LOCK_RWLOCK
Definition astobj2.h:365
#define AO2_STRING_FIELD_CMP_FN(stype, field)
Creates a compare function for a structure string field.
Definition astobj2.h:2048
#define ao2_cleanup(obj)
Definition astobj2.h:1934
#define ao2_unlink(container, obj)
Remove an object from a container.
Definition astobj2.h:1578
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
Definition astobj2.h:1554
#define AO2_STRING_FIELD_SORT_FN(stype, field)
Creates a sort function for a structure string field.
Definition astobj2.h:2064
#define ao2_weakproxy_find(c, arg, flags, tag)
Perform an ao2_find on a container with ao2_weakproxy objects, returning the real object.
Definition astobj2.h:1748
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition astobj2.h:480
#define ao2_weakproxy_alloc(data_size, destructor_fn)
Allocate an ao2_weakproxy object.
Definition astobj2.h:550
#define AO2_STRING_FIELD_HASH_FN(stype, field)
Creates a hash function for a structure string field.
Definition astobj2.h:2032
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
Definition astobj2.h:1063
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
Definition astobj2.h:1101
#define ao2_alloc(data_size, destructor_fn)
Definition astobj2.h:409
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition astobj2.h:1303
@ AO2_CONTAINER_ALLOC_OPT_DUPS_REPLACE
Replace objects with duplicate keys in container.
Definition astobj2.h:1211
Internal Asterisk hangup causes.
#define AST_CAUSE_FAILURE
Definition causes.h:150
#define AST_CAUSE_NORMAL
Definition causes.h:151
#define AST_CAUSE_NETWORK_OUT_OF_ORDER
Definition causes.h:121
#define AST_CAUSE_NO_ROUTE_DESTINATION
Definition causes.h:100
static PGresult * result
Definition cel_pgsql.c:84
static const char type[]
#define ERROR_ON_PASSTHROUGH_MODE_RTN(instance, command)
#define FLUSH_MEDIA
static void _websocket_request_hangup(struct websocket_pvt *instance, int ast_cause, enum ast_websocket_status_code tech_cause, int line, const char *function)
#define QUEUE_LENGTH_XON_LEVEL
#define MARK_MEDIA
#define send_event(_instance, _event,...)
Use this macro to create and send events passing in any event-specific parameters.
static int incoming_ws_http_callback(struct ast_tcptls_session_instance *ser, const struct ast_http_uri *urih, const char *uri, enum ast_http_method method, struct ast_variable *get_params, struct ast_variable *headers)
#define ANSWER_CHANNEL
static void instance_proxy_cb(void *weakproxy, void *data)
static struct ast_frame * dequeue_frame(struct websocket_pvt *instance)
static void * global_alloc(const char *name)
@ OPT_WS_MEDIA_DIRECTION
@ OPT_WS_CODEC
@ OPT_WS_PASSTHROUGH
@ OPT_WS_URI_PARAM
@ OPT_WS_MSG_FORMAT
@ OPT_WS_NO_AUTO_ANSWER
static const char * control_msg_format_to_str(enum webchan_control_msg_format value)
static int validate_uri_parameters(const char *uri_params)
static int webchan_write(struct ast_channel *ast, struct ast_frame *f)
Function called when we should write a frame to the channel.
static char * _create_event_MEDIA_BUFFERING_COMPLETED(struct websocket_pvt *instance, const char *id)
static const char * msg_format_map[]
static int webchan_hangup(struct ast_channel *ast)
static struct ast_channel_tech websocket_tech
#define HANGUP_CHANNEL
static struct ast_frame * webchan_read(struct ast_channel *ast)
#define websocket_request_hangup(_instance, _cause, _tech)
#define MEDIA_WEBSOCKET_CONNECTION_ID
#define WS_WEBSOCKET_FDNO
static char * _create_event_MEDIA_MARK_PROCESSED(struct websocket_pvt *instance, const char *id)
static int read_from_ws_and_queue(struct websocket_pvt *instance)
webchan_control_msg_format
@ WEBCHAN_CONTROL_MSG_FORMAT_JSON
@ WEBCHAN_CONTROL_MSG_FORMAT_PLAIN
@ WEBCHAN_CONTROL_MSG_FORMAT_INVALID
static int handle_command(struct websocket_pvt *instance, char *buffer)
static int global_control_message_format_from_str(const struct aco_option *opt, struct ast_variable *var, void *obj)
static int webchan_send_dtmf_text(struct ast_channel *ast, char digit, unsigned int duration)
static struct ast_http_uri http_uri
static int reload_module(void)
#define GET_DRIVER_STATUS
static int set_channel_variables(struct websocket_pvt *instance)
webchan_media_direction
@ WEBCHAN_MEDIA_DIRECTION_BOTH
@ WEBCHAN_MEDIA_DIRECTION_OUT
@ WEBCHAN_MEDIA_DIRECTION_IN
static int process_text_message(struct websocket_pvt *instance, char *payload, uint64_t payload_len)
static void websocket_destructor(void *data)
static int queue_frame_from_buffer(struct websocket_pvt *instance, char *buffer, size_t len)
#define WS_TIMER_FDNO
static struct ast_sorcery * sorcery
#define ERROR_ON_INVALID_MEDIA_DIRECTION_RTN(instance, command, direction)
#define START_MEDIA_BUFFERING
static int set_channel_timer(struct websocket_pvt *instance)
static char * _create_event_STATUS(struct websocket_pvt *instance)
static char * _create_event_DTMF_END(struct websocket_pvt *instance, const char digit)
#define QUEUE_LENGTH_XOFF_LEVEL
static const struct ast_app_option websocket_options[128]
#define PAUSE_MEDIA
#define create_event(_instance, _event,...)
Use this macro to create events passing in any event-specific parameters.
static struct ao2_container * instances
static char * _create_event_ERROR(struct websocket_pvt *instance, const char *format,...)
#define MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE
#define INCOMING_CONNECTION_ID
static struct websocket_pvt * websocket_new(const char *chan_name, const char *connection_id, struct ast_format *fmt)
#define REPORT_QUEUE_DRAINED
#define CONTINUE_MEDIA
#define STOP_MEDIA_BUFFERING
static int load_module(void)
Function called when our module is loaded.
static const char * websocket_media_direction_map[]
static int webchan_call(struct ast_channel *ast, const char *dest, int timeout)
static enum webchan_control_msg_format control_msg_format_from_str(const char *value)
static int process_binary_message(struct websocket_pvt *instance, char *payload, uint64_t payload_len)
static struct ast_websocket_server * ast_ws_server
#define SET_MEDIA_DIRECTION
static char * _create_event_MEDIA_START(struct websocket_pvt *instance)
static int unload_module(void)
Function called when our module is unloaded.
static int global_control_message_format_to_str(const void *obj, const intptr_t *args, char **buf)
#define MAX_TEXT_MESSAGE_LEN
static void incoming_ws_established_cb(struct ast_websocket *ast_ws_session, struct ast_variable *get_params, struct ast_variable *upgrade_headers)
#define QUEUE_LENGTH_MAX
static int websocket_handoff_to_channel(struct websocket_pvt *instance)
static struct ast_channel * webchan_request(const char *type, struct ast_format_cap *cap, const struct ast_assigned_ids *assignedids, const struct ast_channel *requestor, const char *data, int *cause)
static char * _create_event_nodata(struct websocket_pvt *instance, char *event)
static int load_config(void)
static int global_apply(const struct ast_sorcery *sorcery, void *obj)
static int queue_option_frame(struct websocket_pvt *instance, char *buffer)
@ OPT_ARG_WS_PASSTHROUGH
@ OPT_ARG_WS_URI_PARAM
@ OPT_ARG_WS_MEDIA_DIRECTION
@ OPT_ARG_WS_MSG_FORMAT
@ OPT_ARG_WS_CODEC
@ OPT_ARG_WS_NO_AUTO_ANSWER
@ OPT_ARG_ARRAY_SIZE
General Asterisk PBX channel definitions.
const char * ast_channel_name(const struct ast_channel *chan)
int ast_channel_tech_hangupcause(const struct ast_channel *chan)
void * ast_channel_tech_pvt(const struct ast_channel *chan)
void ast_channel_tech_hangupcause_set(struct ast_channel *chan, int value)
struct varshead * ast_channel_varshead(struct ast_channel *chan)
#define ast_channel_alloc(needqueue, state, cid_num, cid_name, acctcode, exten, context, assignedids, requestor, amaflag,...)
Create a channel structure.
Definition channel.h:1299
void ast_channel_nativeformats_set(struct ast_channel *chan, struct ast_format_cap *value)
int ast_channel_fdno(const struct ast_channel *chan)
void ast_channel_unregister(const struct ast_channel_tech *tech)
Unregister a channel technology.
Definition channel.c:571
int ast_queue_control(struct ast_channel *chan, enum ast_control_frame_type control)
Queue a control frame without payload.
Definition channel.c:1289
struct ast_flags * ast_channel_flags(struct ast_channel *chan)
int ast_queue_frame(struct ast_channel *chan, struct ast_frame *f)
Queue one or more frames to a channel's frame queue.
Definition channel.c:1171
const char * ast_channel_uniqueid(const struct ast_channel *chan)
int ast_queue_hangup_with_cause(struct ast_channel *chan, int cause)
Queue a hangup frame with hangupcause set.
Definition channel.c:1213
void ast_channel_set_rawreadformat(struct ast_channel *chan, struct ast_format *format)
void ast_channel_tech_pvt_set(struct ast_channel *chan, void *value)
void ast_channel_set_rawwriteformat(struct ast_channel *chan, struct ast_format *format)
void ast_channel_set_readformat(struct ast_channel *chan, struct ast_format *format)
int ast_channel_register(const struct ast_channel_tech *tech)
Register a channel technology (a new channel driver). Called by a channel module to register the kind ...
Definition channel.c:540
#define ast_channel_unref(c)
Decrease channel reference count.
Definition channel.h:3018
const char * ast_cause2str(int cause) attribute_pure
Gives the string form of a given cause code.
Definition channel.c:613
void ast_channel_set_fd(struct ast_channel *chan, int which, int fd)
Definition channel.c:2417
@ AST_FLAG_DISABLE_DEVSTATE_CACHE
Definition channel.h:1049
void ast_channel_internal_fd_clear(struct ast_channel *chan, int which)
void ast_channel_hangupcause_set(struct ast_channel *chan, int value)
void ast_channel_tech_set(struct ast_channel *chan, const struct ast_channel_tech *value)
#define ast_channel_unlock(chan)
Definition channel.h:2983
void ast_channel_set_writeformat(struct ast_channel *chan, struct ast_format *format)
@ AST_STATE_DOWN
Codec API.
@ AST_MEDIA_TYPE_UNKNOWN
Definition codec.h:31
@ OPT_NOOP_T
Type for a default handler that should do nothing.
char buf[BUFSIZE]
Definition eagi_proxy.c:66
struct ast_codec * ast_format_get_codec(const struct ast_format *format)
Get the codec associated with a format.
Definition format.c:324
unsigned int ast_format_get_minimum_bytes(const struct ast_format *format)
Get the minimum number of bytes expected in a frame for this format.
Definition format.c:374
unsigned int ast_format_get_sample_rate(const struct ast_format *format)
Get the sample rate of a media format.
Definition format.c:379
unsigned int ast_format_get_minimum_ms(const struct ast_format *format)
Get the minimum amount of media carried in this format.
Definition format.c:364
enum ast_format_cmp_res ast_format_cmp(const struct ast_format *format1, const struct ast_format *format2)
Compare two formats.
Definition format.c:201
@ AST_FORMAT_CMP_NOT_EQUAL
Definition format.h:38
const char * ast_format_get_name(const struct ast_format *format)
Get the name associated with a format.
Definition format.c:334
unsigned int ast_format_get_default_ms(const struct ast_format *format)
Get the default framing size (in milliseconds) for a format.
Definition format.c:359
Media Format Cache API.
#define ast_format_cache_get(name)
Retrieve a named format from the cache.
int ast_format_cap_append_by_type(struct ast_format_cap *cap, enum ast_media_type type)
Add all codecs Asterisk knows about for a specific type to the capabilities structure.
Definition format_cap.c:216
struct ast_format * ast_format_cap_get_format(const struct ast_format_cap *cap, int position)
Get the format at a specific index.
Definition format_cap.c:400
@ AST_FORMAT_CAP_FLAG_DEFAULT
Definition format_cap.h:38
#define ast_format_cap_append(cap, format, framing)
Add format capability to capabilities structure.
Definition format_cap.h:99
#define ast_format_cap_alloc(flags)
Allocate a new ast_format_cap structure.
Definition format_cap.h:49
static const char name[]
Definition format_mp3.c:68
direction
static int len(struct ast_channel *chan, const char *cmd, char *data, char *buf, size_t buflen)
static int uuid(struct ast_channel *chan, const char *cmd, char *data, char *buf, size_t len)
Definition func_uuid.c:52
ast_http_method
HTTP Request methods known by Asterisk.
Definition http.h:58
void ast_http_uri_unlink(struct ast_http_uri *urihandler)
Unregister a URI handler.
Definition http.c:771
void ast_http_error(struct ast_tcptls_session_instance *ser, int status, const char *title, const char *text)
Send HTTP error message and close socket.
Definition http.c:714
int ast_http_uri_link(struct ast_http_uri *urihandler)
Register a URI handler.
Definition http.c:739
Support for WebSocket connections within the Asterisk HTTP server and client WebSocket connections to...
int AST_OPTIONAL_API_NAME() ast_websocket_write(struct ast_websocket *session, enum ast_websocket_opcode opcode, char *payload, uint64_t payload_size)
Construct and transmit a WebSocket frame.
ast_websocket_status_code
Websocket Status Codes from RFC-6455.
@ AST_WEBSOCKET_STATUS_UNSUPPORTED_DATA
@ AST_WEBSOCKET_STATUS_NORMAL
@ AST_WEBSOCKET_STATUS_GOING_AWAY
@ AST_WEBSOCKET_STATUS_INTERNAL_ERROR
int AST_OPTIONAL_API_NAME() ast_websocket_server_add_protocol2(struct ast_websocket_server *server, struct ast_websocket_protocol *protocol)
Add a sub-protocol handler to the given server.
int AST_OPTIONAL_API_NAME() ast_websocket_write_string(struct ast_websocket *ws, const char *buf)
Construct and transmit a WebSocket frame containing string data.
struct ast_sockaddr *AST_OPTIONAL_API_NAME() ast_websocket_remote_address(struct ast_websocket *session)
Get the remote address for a WebSocket connected session.
int AST_OPTIONAL_API_NAME() ast_websocket_uri_cb(struct ast_tcptls_session_instance *ser, const struct ast_http_uri *urih, const char *uri, enum ast_http_method method, struct ast_variable *get_vars, struct ast_variable *headers)
Callback suitable for use with an ast_http_uri.
ast_websocket_result
Result code for a websocket client.
@ WS_OK
int AST_OPTIONAL_API_NAME() ast_websocket_fd(struct ast_websocket *session)
Get the file descriptor for a WebSocket session.
ast_websocket_opcode
WebSocket operation codes.
@ AST_WEBSOCKET_OPCODE_PING
@ AST_WEBSOCKET_OPCODE_PONG
@ AST_WEBSOCKET_OPCODE_BINARY
@ AST_WEBSOCKET_OPCODE_CLOSE
@ AST_WEBSOCKET_OPCODE_TEXT
const char *AST_OPTIONAL_API_NAME() ast_websocket_status_to_str(enum ast_websocket_status_code code)
Convert a websocket status code to a string.
ast_websocket_type
WebSocket connection/configuration types.
@ AST_WS_TYPE_CLIENT
@ AST_WS_TYPE_SERVER
struct ast_websocket_protocol *AST_OPTIONAL_API_NAME() ast_websocket_sub_protocol_alloc(const char *name)
Allocate a websocket sub-protocol instance.
int AST_OPTIONAL_API_NAME() ast_websocket_read(struct ast_websocket *session, char **payload, uint64_t *payload_len, enum ast_websocket_opcode *opcode, int *fragmented)
Read a WebSocket frame and handle it.
int AST_OPTIONAL_API_NAME() ast_websocket_close(struct ast_websocket *session, uint16_t reason)
Close a WebSocket session by sending a message with the CLOSE opcode and an optional code.
struct ast_websocket_server *AST_OPTIONAL_API_NAME() ast_websocket_server_create(void)
Creates an ast_websocket_server.
void AST_OPTIONAL_API_NAME() ast_websocket_unref(struct ast_websocket *session)
Decrease the reference count for a WebSocket session.
const char *AST_OPTIONAL_API_NAME() ast_websocket_result_to_str(enum ast_websocket_result result)
Convert a websocket result code to a string.
Application convenience functions, designed to give consistent look and feel to Asterisk apps.
#define AST_APP_ARG(name)
Define an application argument.
#define END_OPTIONS
#define AST_APP_OPTIONS(holder, options...)
Declares an array of options for an application.
#define AST_APP_OPTION_ARG(option, flagno, argno)
Declares an application option that accepts an argument.
#define AST_DECLARE_APP_ARGS(name, arglist)
Declare a structure to hold an application's arguments.
#define BEGIN_OPTIONS
#define AST_APP_OPTION(option, flagno)
Declares an application option that does not accept an argument.
#define AST_NONSTANDARD_APP_ARGS(args, parse, sep)
Performs the 'nonstandard' argument separation process for an application.
int ast_app_parse_options(const struct ast_app_option *options, struct ast_flags *flags, char **args, char *optstr)
Parses a string containing application options and sets flags/arguments.
Definition main/app.c:3067
const char * ast_variable_find_in_list(const struct ast_variable *list, const char *variable)
Gets the value of a variable from a variable list by name.
#define ast_variable_new(name, value, filename)
#define ast_variable_list_append(head, new_var)
void ast_variables_destroy(struct ast_variable *var)
Free variable list.
Definition extconf.c:1260
Asterisk internal frame definitions.
#define ast_frisolate(fr)
Makes a frame independent of any static storage.
void ast_frame_free(struct ast_frame *frame, int cache)
Frees a frame or list of frames.
Definition main/frame.c:176
#define ast_frfree(fr)
#define AST_FRAME_SET_BUFFER(fr, _base, _ofs, _datalen)
@ AST_FRAME_CONTROL
@ AST_CONTROL_ANSWER
@ AST_CONTROL_OPTION
struct ast_frame ast_null_frame
Definition main/frame.c:79
#define ast_debug(level,...)
Log a DEBUG message.
#define LOG_ERROR
#define LOG_WARNING
Asterisk JSON abstraction layer.
#define ast_json_object_string_get(object, key)
Get a string field from a JSON object.
Definition json.h:600
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition json.c:73
struct ast_json * ast_json_pack(char const *format,...)
Helper for creating complex JSON values.
Definition json.c:612
@ AST_JSON_COMPACT
Definition json.h:793
struct ast_json * ast_json_load_buf(const char *buffer, size_t buflen, struct ast_json_error *error)
Parse buffer with known length into a JSON object or array.
Definition json.c:585
struct ast_json * ast_json_channel_vars(struct varshead *channelvars)
Construct a JSON object from an ast_var_t list.
Definition json.c:941
char * ast_json_dump_string_format(struct ast_json *root, enum ast_json_encoding_format format)
Encode a JSON value to a string.
Definition json.c:484
#define AST_LIST_INSERT_TAIL(head, elm, field)
Appends a list entry to the tail of a list.
#define AST_LIST_HEAD_INIT(head)
Initializes a list head structure.
#define AST_LIST_LOCK(head)
Locks a list.
Definition linkedlists.h:40
#define AST_LIST_REMOVE_HEAD(head, field)
Removes and returns the head entry from a list.
#define AST_LIST_UNLOCK(head)
Attempts to unlock a list.
#define AST_LIST_HEAD(name, type)
Defines a structure to be used to hold a list of specified type.
Asterisk locking-related definitions:
#define SCOPED_AO2WRLOCK(varname, obj)
scoped lock specialization for ao2 write locks.
Definition lock.h:621
#define SCOPED_LOCK(varname, lock, lockfunc, unlockfunc)
Scoped Locks.
Definition lock.h:590
int errno
Header for providers of file and format handling routines. Clients of these routines should include "...
Asterisk module definitions.
@ AST_MODFLAG_LOAD_ORDER
Definition module.h:331
#define AST_MODULE_INFO(keystr, flags_to_set, desc, fields...)
Definition module.h:557
@ AST_MODPRI_CHANNEL_DRIVER
Definition module.h:341
@ AST_MODULE_SUPPORT_CORE
Definition module.h:121
#define ASTERISK_GPL_KEY
The text the key() function should return.
Definition module.h:46
@ AST_MODULE_LOAD_SUCCESS
Definition module.h:70
@ AST_MODULE_LOAD_DECLINE
Module has failed to load, may be in an inconsistent state.
Definition module.h:78
static char * ast_sockaddr_stringify(const struct ast_sockaddr *addr)
Wrapper around ast_sockaddr_stringify_fmt() with default format.
Definition netsock2.h:256
Core PBX routines and definitions.
int pbx_builtin_setvar_helper(struct ast_channel *chan, const char *name, const char *value)
Add a variable to the channel variable stack, removing the most recently set value for the same name.
static int reload(void)
const char * method
Definition res_pjsip.c:1273
static struct @521 args
#define NULL
Definition resample.c:96
Sorcery Data Access Layer API.
#define ast_sorcery_unref(sorcery)
Decrease the reference count of a sorcery structure.
Definition sorcery.h:1500
#define ast_sorcery_object_field_register_nodoc(sorcery, type, name, default_val, opt_type, flags,...)
Register a field within an object without documentation.
Definition sorcery.h:987
#define ast_sorcery_register_cust(object, option, def_value)
Register a custom field within an object.
Definition sorcery.h:1767
void ast_sorcery_load(const struct ast_sorcery *sorcery)
Inform any wizards to load persistent objects.
Definition sorcery.c:1441
void * ast_sorcery_retrieve_by_id(const struct ast_sorcery *sorcery, const char *type, const char *id)
Retrieve an object using its unique identifier.
Definition sorcery.c:1917
#define ast_sorcery_object_register(sorcery, type, alloc, transform, apply)
Register an object type.
Definition sorcery.h:837
void ast_sorcery_reload(const struct ast_sorcery *sorcery)
Inform any wizards to reload persistent objects.
Definition sorcery.c:1472
void * ast_sorcery_generic_alloc(size_t size, ao2_destructor_fn destructor)
Allocate a generic sorcery capable object.
Definition sorcery.c:1792
#define ast_sorcery_apply_default(sorcery, type, name, data)
Definition sorcery.h:476
#define ast_sorcery_open()
Open a new sorcery structure.
Definition sorcery.h:406
int ast_strings_equal(const char *str1, const char *str2)
Compare strings for equality checking for NULL.
Definition strings.c:238
#define S_OR(a, b)
Returns the equivalent of logical OR for strings: the first one if not empty, otherwise the second one.
Definition strings.h:80
#define S_COR(a, b, c)
Returns the equivalent of logical OR for strings, with an additional boolean check: second one if not e...
Definition strings.h:87
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition strings.h:65
char * ast_strip(char *s)
Strip leading/trailing whitespace from a string.
Definition strings.h:223
char * ast_strsep(char **s, const char sep, uint32_t flags)
Act like strsep but ignore separators inside quotes.
Definition utils.c:1869
Generic container type.
Structure to pass both assignedid values to channel drivers.
Definition channel.h:606
const char * uniqueid
Definition channel.h:607
Structure to describe a channel "technology", i.e., a channel driver. See for examples:
Definition channel.h:648
struct ast_format_cap * capabilities
Definition channel.h:652
const char *const type
Definition channel.h:649
Main Channel structure associated with a channel.
const char * data
Represents a media codec within Asterisk.
Definition codec.h:42
unsigned int default_ms
Default length of media carried (in milliseconds) in a frame.
Definition codec.h:58
int(* samples_count)(struct ast_frame *frame)
Retrieve the number of samples in a frame.
Definition codec.h:68
Structure used to handle boolean flags.
Definition utils.h:220
Format capabilities structure, holds formats + preference order + etc.
Definition format_cap.c:54
Definition of a media format.
Definition format.c:43
struct ast_format * format
Data structure associated with a single frame of data.
struct ast_frame_subclass subclass
enum ast_frame_type frametype
union ast_frame::@235 data
Definition of a URI handler.
Definition http.h:102
ast_http_callback callback
Definition http.h:107
void * data
Definition http.h:116
JSON parsing error information.
Definition json.h:887
Abstract JSON element (object, array, string, int, ...).
Socket address structure.
Definition netsock2.h:97
Full structure for sorcery.
Definition sorcery.c:231
describes a server instance
Definition tcptls.h:151
Structure for variables, used for configurations and for channel variables.
struct ast_variable * next
A websocket protocol implementation.
ast_websocket_callback session_established
Callback called when a new session is established. Mandatory.
Structure for a WebSocket server.
Structure definition for session.
char connection_id[0]
The name of the module owning this sorcery instance.
enum webchan_control_msg_format control_msg_format
char connection_id[0]
struct ast_format * native_format
struct ast_channel * channel
enum webchan_control_msg_format control_msg_format
struct websocket_pvt::@141 frame_queue
enum ast_websocket_type type
struct ast_timer * timer
pthread_t outbound_read_thread
struct ast_codec * native_codec
struct ast_websocket_client * client
struct ast_websocket * websocket
int value
Definition syslog.c:37
static struct aco_type global
static struct test_options options
Timing source management.
void ast_timer_close(struct ast_timer *handle)
Close an opened timing handle.
Definition timing.c:154
int ast_timer_ack(const struct ast_timer *handle, unsigned int quantity)
Acknowledge a timer event.
Definition timing.c:171
int ast_timer_set_rate(const struct ast_timer *handle, unsigned int rate)
Set the timing tick rate.
Definition timing.c:166
enum ast_timer_event ast_timer_get_event(const struct ast_timer *handle)
Retrieve timing event.
Definition timing.c:186
struct ast_timer * ast_timer_open(void)
Open a timer.
Definition timing.c:122
@ AST_TIMING_EVENT_EXPIRED
Definition timing.h:58
int ast_timer_fd(const struct ast_timer *handle)
Get a poll()-able file descriptor for a timer.
Definition timing.c:161
Support for translation of data formats. translate.c.
#define ast_test_flag(p, flag)
Definition utils.h:64
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition utils.h:981
#define MIN(a, b)
Definition utils.h:252
#define ARRAY_IN_BOUNDS(v, a)
Checks to see if value is within the bounds of the given array.
Definition utils.h:727
#define ast_set_flag(p, flag)
Definition utils.h:71
int ast_uri_verify_encoded(const char *string)
Verify if a string is valid as a URI component.
Definition utils.c:779
Universally unique identifier support.
#define AST_UUID_STR_LEN
Definition uuid.h:27
char * ast_uuid_generate_str(char *buf, size_t size)
Generate a UUID string.
Definition uuid.c:141
void ast_websocket_client_add_uri_params(struct ast_websocket_client *wc, const char *uri_params)
Add additional parameters to the URI.
struct ast_websocket_client * ast_websocket_client_retrieve_by_id(const char *id)
Retrieve a websocket client object by ID.
struct ast_websocket * ast_websocket_client_connect(struct ast_websocket_client *wc, void *lock_obj, const char *display_name, enum ast_websocket_result *result)
Connect to a websocket server using the configured authentication, retry and TLS options.