Asterisk - The Open Source Telephony Project GIT-master-ff80666
chan_websocket.c
Go to the documentation of this file.
1/*
2 * Asterisk -- An open source telephony toolkit.
3 *
4 * Copyright (C) 2025, Sangoma Technologies Corporation
5 *
6 * George Joseph <gjoseph@sangoma.com>
7 *
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
13 *
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
17 */
18
19/*! \file
20 *
21 * \author George Joseph <gjoseph@sangoma.com>
22 *
23 * \brief Websocket Media Channel
24 *
25 * \ingroup channel_drivers
26 */
27
28/*** MODULEINFO
29 <depend>res_http_websocket</depend>
30 <depend>res_websocket_client</depend>
31 <support_level>core</support_level>
32 ***/
33
34/*** DOCUMENTATION
35 <info name="Dial_Resource" language="en_US" tech="WebSocket">
36 <para>WebSocket Dial Strings:</para>
37 <para><literal>Dial(WebSocket/connectionid[/websocket_options])</literal></para>
38 <para>WebSocket Parameters:</para>
39 <enumlist>
40 <enum name="connectionid">
41 <para>For outgoing WebSockets, this is the ID of the connection
42 in websocket_client.conf to use for the call. To accept incoming
43 WebSocket connections use the literal <literal>INCOMING</literal></para>
44 </enum>
45 <enum name="websocket_options">
46 <para>Options to control how the WebSocket channel behaves.</para>
47 <enumlist>
48 <enum name="c(codec) - Specify the codec to use in the channel">
49 <para></para>
50 <para> If not specified, the first codec from the caller's channel will be used.
51 </para>
52 </enum>
53 <enum name="n - Don't auto answer">
54 <para>Normally, the WebSocket channel will be answered when
55 connection is established with the remote app. If this
56 option is specified however, the channel will not be
57 answered until the <literal>ANSWER</literal> command is
58 received from the remote app or the remote app calls the
59 /channels/answer ARI endpoint.
60 </para>
61 </enum>
62 <enum name="p - Passthrough mode">
63 <para>In passthrough mode, the channel driver won't attempt
64 to re-frame or re-time media coming in over the websocket from
65 the remote app. This can be used for any codec but MUST be used
66 for codecs that use packet headers or whose data stream can't be
67 broken up on arbitrary byte boundaries. In this case, the remote
68 app is fully responsible for correctly framing and timing media
69 sent to Asterisk and the MEDIA text commands that could be sent
70 over the websocket are disabled. Currently, passthrough mode is
71 automatically set for the opus, speex and g729 codecs.
72 </para>
73 </enum>
74 <enum name="v(uri_parameters) - Add parameters to the outbound URI">
75 <para>This option allows you to add additional parameters to the
76 outbound URI. The format is:
77 <literal>v(param1=value1,param2=value2...)</literal>
78 </para>
79 <para>You must ensure that no parameter name or value contains
80 characters not valid in a URL. The easiest way to do this is to
81 use the URIENCODE() dialplan function to encode them. Be aware
82 though that each name and value must be encoded separately. You
83 can't simply encode the whole string.</para>
84 </enum>
85 </enumlist>
86 </enum>
87 </enumlist>
88 <para>Examples:
89 </para>
90 <example title="Make an outbound WebSocket connection using connection 'connection1' and the 'sln16' codec.">
91 same => n,Dial(WebSocket/connection1/c(sln16))
92 </example>
93 <example title="Make an outbound WebSocket connection using connection 'connection1' and the 'opus' codec. Passthrough mode will automatically be set.">
94 same => n,Dial(WebSocket/connection1/c(opus))
95 </example>
96 <example title="Listen for an incoming WebSocket connection and don't auto-answer it.">
97 same => n,Dial(WebSocket/INCOMING/n)
98 </example>
99 <example title="Add URI parameters.">
100 same => n,Dial(WebSocket/connection1/v(${URIENCODE(vari able)}=${URIENCODE(${CHANNEL})},variable2=${URIENCODE(${EXTEN})}))
101 </example>
102 </info>
103***/
104#include "asterisk.h"
105
106#include "asterisk/app.h"
107#include "asterisk/causes.h"
108#include "asterisk/channel.h"
109#include "asterisk/codec.h"
112#include "asterisk/frame.h"
113#include "asterisk/lock.h"
114#include "asterisk/mod_format.h"
115#include "asterisk/module.h"
116#include "asterisk/pbx.h"
117#include "asterisk/uuid.h"
118#include "asterisk/timing.h"
119#include "asterisk/translate.h"
121
123
/*! \brief Container of instance proxies; instance_proxy_cb() unlinks entries from it. */
124static struct ao2_container *instances = NULL;
125
153};
154
/*
 * Identifier strings. INCOMING_CONNECTION_ID is the literal used as the
 * connection id in the dial string to accept an incoming WebSocket
 * connection. NOTE(review): the two MEDIA_WEBSOCKET_* names look like
 * channel variable names - confirm against their users.
 */
155#define MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE "MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE"
156#define MEDIA_WEBSOCKET_CONNECTION_ID "MEDIA_WEBSOCKET_CONNECTION_ID"
157#define INCOMING_CONNECTION_ID "INCOMING"
158
/*
 * TEXT commands accepted from the remote app. Each of these is
 * matched against the received text in process_text_message().
 */
159#define ANSWER_CHANNEL "ANSWER"
160#define HANGUP_CHANNEL "HANGUP"
161#define START_MEDIA_BUFFERING "START_MEDIA_BUFFERING"
162#define STOP_MEDIA_BUFFERING "STOP_MEDIA_BUFFERING"
163#define FLUSH_MEDIA "FLUSH_MEDIA"
164#define GET_DRIVER_STATUS "GET_STATUS"
165#define REPORT_QUEUE_DRAINED "REPORT_QUEUE_DRAINED"
166#define PAUSE_MEDIA "PAUSE_MEDIA"
167#define CONTINUE_MEDIA "CONTINUE_MEDIA"
168
/*
 * TEXT messages sent to the remote app. MEDIA_START is written by
 * read_thread_handler() and MEDIA_BUFFERING_COMPLETED is queued as an
 * option frame by process_text_message().
 * NOTE(review): the senders of the remaining names are not all visible
 * in this part of the file - confirm before relying on this grouping.
 */
169#define MEDIA_START "MEDIA_START"
170#define MEDIA_XON "MEDIA_XON"
171#define MEDIA_XOFF "MEDIA_XOFF"
172#define QUEUE_DRAINED "QUEUE_DRAINED"
173#define DRIVER_STATUS "STATUS"
174#define MEDIA_BUFFERING_COMPLETED "MEDIA_BUFFERING_COMPLETED"
175#define DTMF_END "DTMF_END"
176
/*
 * Frame queue thresholds, all compared against frame_queue_length:
 * - at or above QUEUE_LENGTH_MAX incoming binary messages are ignored,
 * - at or above QUEUE_LENGTH_XOFF_LEVEL the queue is flagged as full,
 * - below QUEUE_LENGTH_XON_LEVEL the full flag is cleared again.
 */
177#define QUEUE_LENGTH_MAX 1000
178#define QUEUE_LENGTH_XOFF_LEVEL 900
179#define QUEUE_LENGTH_XON_LEVEL 800
/* TEXT messages longer than this are logged and dropped by process_text_message(). */
180#define MAX_TEXT_MESSAGE_LEN MIN(128, (AST_WEBSOCKET_MAX_RX_PAYLOAD_SIZE - 1))
181
182/* Forward declarations of the callbacks referenced by the channel technology table below */
183static struct ast_channel *webchan_request(const char *type, struct ast_format_cap *cap, const struct ast_assigned_ids *assignedids, const struct ast_channel *requestor, const char *data, int *cause);
184static int webchan_call(struct ast_channel *ast, const char *dest, int timeout);
185static struct ast_frame *webchan_read(struct ast_channel *ast);
186static int webchan_write(struct ast_channel *ast, struct ast_frame *f);
187static int webchan_hangup(struct ast_channel *ast);
188static int webchan_send_dtmf_text(struct ast_channel *ast, char digit, unsigned int duration);
189
191 .type = "WebSocket",
192 .description = "Media over WebSocket Channel Driver",
193 .requester = webchan_request,
194 .call = webchan_call,
195 .read = webchan_read,
196 .write = webchan_write,
197 .hangup = webchan_hangup,
198 .send_digit_end = webchan_send_dtmf_text,
199};
200
201static void set_channel_format(struct websocket_pvt * instance,
202 struct ast_format *fmt)
203{
208 ast_debug(4, "Switching readformat to %s\n", ast_format_get_name(fmt));
209 }
210}
211
212/*
213 * Reminder... This function gets called by webchan_read which is
214 * triggered by the channel timer firing. It always gets called
215 * every 20ms (or whatever the timer is set to) even if there are
216 * no frames in the queue.
217 */
218static struct ast_frame *dequeue_frame(struct websocket_pvt *instance)
219{
220 struct ast_frame *queued_frame = NULL;
221 SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK,
223
224 /*
225 * If the queue is paused, don't read a frame. Processing
226 * will continue down the function and a silence frame will
227 * be sent in its place.
228 */
229 if (instance->queue_paused) {
230 return NULL;
231 }
232
233 /*
234 * We need to check if we need to send an XON before anything
235 * else because there are multiple escape paths in this function
236 * and we don't want to accidentally keep the queue in a "full"
237 * state.
238 */
239 if (instance->queue_full && instance->frame_queue_length < QUEUE_LENGTH_XON_LEVEL) {
240 instance->queue_full = 0;
241 ast_debug(4, "%s: WebSocket sending MEDIA_XON\n",
242 ast_channel_name(instance->channel));
244 }
245
246 queued_frame = AST_LIST_REMOVE_HEAD(&instance->frame_queue, frame_list);
247
248 /*
249 * If there are no frames in the queue, we need to
250 * return NULL so we can send a silence frame. We also need
251 * to send the QUEUE_DRAINED notification if we were requested
252 * to do so.
253 */
254 if (!queued_frame) {
255 if (instance->report_queue_drained) {
256 instance->report_queue_drained = 0;
257 ast_debug(4, "%s: WebSocket sending QUEUE_DRAINED\n",
258 ast_channel_name(instance->channel));
260 }
261 return NULL;
262 }
263
264 /*
265 * The only way a control frame could be present here is as
266 * a result of us calling queue_option_frame() in response
267 * to an incoming TEXT command from the websocket.
268 * We'll be safe and make sure it's a AST_CONTROL_OPTION
269 * frame anyway.
270 *
271 * It's quite possible that there are multiple control frames
272 * in a row in the queue so we need to process consecutive ones
273 * immediately.
274 *
275 * In any case, processing a control frame MUST not use up
276 * a media timeslot so after all control frames have been
277 * processed, we need to read an audio frame and process it.
278 */
279 while (queued_frame && queued_frame->frametype == AST_FRAME_CONTROL) {
280 if (queued_frame->subclass.integer == AST_CONTROL_OPTION) {
281 /*
282 * We just need to send the data to the websocket.
283 * The data should already be NULL terminated.
284 */
286 queued_frame->data.ptr);
287 ast_debug(4, "%s: WebSocket sending %s\n",
288 ast_channel_name(instance->channel), (char *)queued_frame->data.ptr);
289 }
290 /*
291 * We do NOT send these to the core so we need to free
292 * the frame and grab the next one. If it's also a
293 * control frame, we need to process it otherwise
294 * continue down in the function.
295 */
296 ast_frame_free(queued_frame, 0);
297 queued_frame = AST_LIST_REMOVE_HEAD(&instance->frame_queue, frame_list);
298 /*
299 * Just FYI... We didn't bump the queue length when we added the control
300 * frames so we don't need to decrement it here.
301 */
302 }
303
304 /*
305 * If, after reading all control frames, there are no frames
306 * left in the queue, we need to return NULL so we can send
307 * a silence frame.
308 */
309 if (!queued_frame) {
310 return NULL;
311 }
312
313 instance->frame_queue_length--;
314
315 return queued_frame;
316}
317/*!
318 * \internal
319 *
320 * Called by the core channel thread each time the instance timer fires.
321 *
 * Pulls the next frame from the instance queue via dequeue_frame().
 * With no frame available a copy of the instance's silence frame is
 * returned. In passthrough mode, or when the frame already has
 * optimal_frame_size bytes, the frame is returned untouched. Otherwise
 * the frame is translated to slin when a translator exists and padded
 * with zeros up to the silence frame's length.
 *
 * \param ast The channel being read.
 *
 * \return The frame to hand to the caller, or NULL when there is no
 *         instance, translation fails, or the padding buffer can't be
 *         allocated.
322 */
323static struct ast_frame *webchan_read(struct ast_channel *ast)
324{
325 struct websocket_pvt *instance = NULL;
326 struct ast_frame *native_frame = NULL;
327 struct ast_frame *slin_frame = NULL;
328
329 instance = ast_channel_tech_pvt(ast);
330 if (!instance) {
331 return NULL;
332 }
333
335 ast_timer_ack(instance->timer, 1);
336 }
337
338 native_frame = dequeue_frame(instance);
339
340 /*
341 * No frame when the timer fires means we have to create and
342 * return a silence frame in its place.
343 */
344 if (!native_frame) {
345 ast_debug(5, "%s: WebSocket read timer fired with no frame available. Returning silence.\n", ast_channel_name(ast));
346 set_channel_format(instance, instance->slin_format);
347 slin_frame = ast_frdup(&instance->silence);
348 return slin_frame;
349 }
350
351 /*
352 * If we're in passthrough mode or the frame length is already optimal_frame_size,
353 * we can just return it.
354 */
355 if (instance->passthrough || native_frame->datalen == instance->optimal_frame_size) {
356 set_channel_format(instance, instance->native_format);
357 return native_frame;
358 }
359
360 /*
361 * If we're here, we have a short frame that we need to pad
362 * with silence.
363 */
364
365 if (instance->translator) {
366 slin_frame = ast_translate(instance->translator, native_frame, 0);
367 if (!slin_frame) {
368 ast_log(LOG_WARNING, "%s: Failed to translate %d byte frame\n",
369 ast_channel_name(ast), native_frame->datalen);
 /*
  * The success path below frees native_frame itself, so it
  * has to be released here as well or it leaks.
  */
 ast_frame_free(native_frame, 0);
370 return NULL;
371 }
372 ast_frame_free(native_frame, 0);
373 } else {
374 /*
375 * If there was no translator then the native format
376 * was already slin.
377 */
378 slin_frame = native_frame;
379 }
380
381 set_channel_format(instance, instance->slin_format);
382
383 /*
384 * So now we have an slin frame but it's probably still short
385 * so we create a new data buffer with the correct length
386 * which is filled with zeros courtesy of ast_calloc.
387 * We then copy the short frame data into the new buffer
388 * and set the offset to AST_FRIENDLY_OFFSET so that
389 * the core can read the data without any issues.
390 * If the original frame data was mallocd, we need to free the old
391 * data buffer so we don't leak memory and we need to set
392 * mallocd to AST_MALLOCD_DATA so that the core knows
393 * it needs to free the new data buffer when it's done.
394 */
395
396 if (slin_frame->datalen != instance->silence.datalen) {
397 char *old_data = slin_frame->data.ptr;
398 int old_len = slin_frame->datalen;
399 int old_offset = slin_frame->offset;
400 ast_debug(4, "%s: WebSocket read short frame. Expected %d got %d. Filling with silence\n",
401 ast_channel_name(ast), instance->silence.datalen,
402 slin_frame->datalen);
403
404 slin_frame->data.ptr = ast_calloc(1, instance->silence.datalen + AST_FRIENDLY_OFFSET);
405 if (!slin_frame->data.ptr) {
 /*
  * Put the original buffer back before freeing so the frame
  * is released with the data pointer it actually owns rather
  * than a NULL one.
  */
 slin_frame->data.ptr = old_data;
406 ast_frame_free(slin_frame, 0);
407 return NULL;
408 }
409 slin_frame->data.ptr += AST_FRIENDLY_OFFSET;
410 slin_frame->offset = AST_FRIENDLY_OFFSET;
411 memcpy(slin_frame->data.ptr, old_data, old_len);
412 if (slin_frame->mallocd & AST_MALLOCD_DATA) {
413 ast_free(old_data - old_offset);
414 }
415 slin_frame->mallocd |= AST_MALLOCD_DATA;
416 slin_frame->datalen = instance->silence.datalen;
417 slin_frame->samples = instance->silence.samples;
418 }
419
420 return slin_frame;
421}
422
423static int queue_frame_from_buffer(struct websocket_pvt *instance,
424 char *buffer, size_t len)
425{
426 struct ast_frame fr = { 0, };
427 struct ast_frame *duped_frame = NULL;
428
429 AST_FRAME_SET_BUFFER(&fr, buffer, 0, len);
431 fr.subclass.format = instance->native_format;
432 fr.samples = instance->native_codec->samples_count(&fr);
433
434 duped_frame = ast_frisolate(&fr);
435 if (!duped_frame) {
436 ast_log(LOG_WARNING, "%s: Failed to isolate frame\n",
437 ast_channel_name(instance->channel));
438 return -1;
439 }
440
441 {
442 SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK,
444 AST_LIST_INSERT_TAIL(&instance->frame_queue, duped_frame, frame_list);
445 instance->frame_queue_length++;
446 if (!instance->queue_full && instance->frame_queue_length >= QUEUE_LENGTH_XOFF_LEVEL) {
447 instance->queue_full = 1;
448 ast_debug(4, "%s: WebSocket sending %s\n",
451 }
452 }
453
454 ast_debug(5, "%s: Queued %d byte frame\n", ast_channel_name(instance->channel),
455 duped_frame->datalen);
456
457 return 0;
458}
459
460static int queue_option_frame(struct websocket_pvt *instance,
461 char *buffer)
462{
463 struct ast_frame fr = { 0, };
464 struct ast_frame *duped_frame = NULL;
465
466 AST_FRAME_SET_BUFFER(&fr, buffer, 0, strlen(buffer) + 1);
469
470 duped_frame = ast_frisolate(&fr);
471 if (!duped_frame) {
472 ast_log(LOG_WARNING, "%s: Failed to isolate frame\n",
473 ast_channel_name(instance->channel));
474 return -1;
475 }
476
477 AST_LIST_LOCK(&instance->frame_queue);
478 AST_LIST_INSERT_TAIL(&instance->frame_queue, duped_frame, frame_list);
479 AST_LIST_UNLOCK(&instance->frame_queue);
480
481 ast_debug(4, "%s: Queued '%s' option frame\n",
482 ast_channel_name(instance->channel), buffer);
483
484 return 0;
485}
486
/*!
 * \internal
 * \brief Handle one TEXT message received from the remote app.
 *
 * Messages longer than MAX_TEXT_MESSAGE_LEN are logged and dropped.
 * Otherwise the payload is copied into a NUL-terminated, whitespace
 * stripped buffer (command) and dispatched on its contents. Every
 * comparison must use that copy: payload itself is not NUL-terminated.
 * Most media control commands are ignored in passthrough mode.
 *
 * \param instance The websocket channel instance.
 * \param payload The raw message bytes.
 * \param payload_len Number of bytes in payload.
 *
 * \return 0 when the message was handled, ignored or unknown; otherwise
 *         the result of the failing step.
 */
487static int process_text_message(struct websocket_pvt *instance,
488 char *payload, uint64_t payload_len)
489{
490 int res = 0;
491 char *command;
492
493 if (payload_len > MAX_TEXT_MESSAGE_LEN) {
494 ast_log(LOG_WARNING, "%s: WebSocket TEXT message of length %d exceeds maximum length of %d\n",
495 ast_channel_name(instance->channel), (int)payload_len, MAX_TEXT_MESSAGE_LEN);
496 return 0;
497 }
498
499 /*
500 * Unfortunately, payload is not NULL terminated even when it's
501 * a TEXT frame so we need to allocate a new buffer, copy
502 * the data into it, and NULL terminate it.
503 */
504 command = ast_alloca(payload_len + 1);
505 memcpy(command, payload, payload_len); /* Safe */
506 command[payload_len] = '\0';
507 command = ast_strip(command);
508
509 ast_debug(4, "%s: WebSocket %s command received\n",
510 ast_channel_name(instance->channel), command);
511
512 if (ast_strings_equal(command, ANSWER_CHANNEL)) {
514
515 } else if (ast_strings_equal(command, HANGUP_CHANNEL)) {
517
518 } else if (ast_strings_equal(command, START_MEDIA_BUFFERING)) {
519 if (instance->passthrough) {
520 ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
521 ast_channel_name(instance->channel), command);
522 return 0;
523 }
524 AST_LIST_LOCK(&instance->frame_queue);
525 instance->bulk_media_in_progress = 1;
526 AST_LIST_UNLOCK(&instance->frame_queue);
527
528 } else if (ast_begins_with(command, STOP_MEDIA_BUFFERING)) {
529 char *id;
530 char *option;
531 SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK,
533
534 id = ast_strip(command + strlen(STOP_MEDIA_BUFFERING));
535
536 if (instance->passthrough) {
537 ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
538 ast_channel_name(instance->channel), command);
539 return 0;
540 }
541
542 ast_debug(4, "%s: WebSocket %s '%s' with %d bytes in leftover_data.\n",
544 (int)instance->leftover_len);
545
546 instance->bulk_media_in_progress = 0;
547 if (instance->leftover_len > 0) {
548 res = queue_frame_from_buffer(instance, instance->leftover_data, instance->leftover_len);
549 if (res != 0) {
550 return res;
551 }
552 }
553 instance->leftover_len = 0;
554 res = ast_asprintf(&option, "%s%s%s", MEDIA_BUFFERING_COMPLETED,
555 S_COR(!ast_strlen_zero(id), " ", ""), S_OR(id, ""));
556 if (res <= 0 || !option) {
557 return res;
558 }
559 res = queue_option_frame(instance, option);
560 ast_free(option);
561
562 } else if (ast_strings_equal(command, FLUSH_MEDIA)) {
563 struct ast_frame *frame = NULL;
564
565 if (instance->passthrough) {
566 ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
567 ast_channel_name(instance->channel), command);
568 return 0;
569 }
570
571 AST_LIST_LOCK(&instance->frame_queue);
572 while ((frame = AST_LIST_REMOVE_HEAD(&instance->frame_queue, frame_list))) {
573 ast_frfree(frame);
574 }
575 instance->frame_queue_length = 0;
576 instance->bulk_media_in_progress = 0;
577 instance->leftover_len = 0;
578 AST_LIST_UNLOCK(&instance->frame_queue);
579
580 } else if (ast_strings_equal(command, REPORT_QUEUE_DRAINED)) {
581 if (instance->passthrough) {
582 ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
583 ast_channel_name(instance->channel), command);
584 return 0;
585 }
586
587 AST_LIST_LOCK(&instance->frame_queue);
588 instance->report_queue_drained = 1;
589 AST_LIST_UNLOCK(&instance->frame_queue);
590
591 } else if (ast_strings_equal(command, GET_DRIVER_STATUS)) {
592 char *status = NULL;
593
594 res = ast_asprintf(&status, "%s queue_length:%d xon_level:%d xoff_level:%d queue_full:%s bulk_media:%s media_paused:%s",
598 S_COR(instance->queue_full, "true", "false"),
599 S_COR(instance->bulk_media_in_progress, "true", "false"),
600 S_COR(instance->queue_paused, "true", "false")
601 );
602 if (res <= 0 || !status) {
604 res = -1;
605 } else {
606 ast_debug(4, "%s: WebSocket status: %s\n",
607 ast_channel_name(instance->channel), status);
610 }
611
612 } else if (ast_strings_equal(command, PAUSE_MEDIA)) {
613 if (instance->passthrough) {
614 ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
615 ast_channel_name(instance->channel), command);
616 return 0;
617 }
618 AST_LIST_LOCK(&instance->frame_queue);
619 instance->queue_paused = 1;
620 AST_LIST_UNLOCK(&instance->frame_queue);
621
622 } else if (ast_strings_equal(command, CONTINUE_MEDIA)) {
623 if (instance->passthrough) {
624 ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
625 ast_channel_name(instance->channel), command);
626 return 0;
627 }
628 AST_LIST_LOCK(&instance->frame_queue);
629 instance->queue_paused = 0;
630 AST_LIST_UNLOCK(&instance->frame_queue);
631
632 } else {
633 ast_log(LOG_WARNING, "%s: WebSocket %s command unknown\n",
634 ast_channel_name(instance->channel), command);
635 }
636
637 return res;
638}
639
/*!
 * \internal
 * \brief Turn one BINARY websocket message into queued media frames.
 *
 * The message is dropped when the frame queue already holds
 * QUEUE_LENGTH_MAX frames. In passthrough mode the whole payload is
 * queued as a single frame. Otherwise the payload is cut into frames of
 * optimal_frame_size bytes. While bulk media is in progress, a trailing
 * partial frame is kept in leftover_data and completed with the start of
 * the next message; outside bulk mode any trailing partial frame is
 * discarded.
 *
 * \param instance The websocket channel instance.
 * \param payload The raw media bytes.
 * \param payload_len Number of bytes in payload.
 *
 * \retval 0 The message was queued, buffered or dropped. A failure while
 *           queuing frames in the main loop also ends up here.
 * \retval -1 Queuing the frame completed from leftover data failed.
 * \return In passthrough mode, the result of queue_frame_from_buffer().
 */
640static int process_binary_message(struct websocket_pvt *instance,
641 char *payload, uint64_t payload_len)
642{
643 char *next_frame_ptr = NULL;
644 size_t bytes_read = 0;
645 int res = 0;
646 size_t bytes_left = 0;
647
648 {
649 SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK,
651 if (instance->frame_queue_length >= QUEUE_LENGTH_MAX) {
652 ast_debug(4, "%s: WebSocket queue is full. Ignoring incoming binary message.\n",
653 ast_channel_name(instance->channel));
654 return 0;
655 }
656 }
657
658 next_frame_ptr = payload;
659 instance->bytes_read += payload_len;
660
661 if (instance->passthrough) {
662 res = queue_frame_from_buffer(instance, payload, payload_len);
663 return res;
664 }
665
666 if (instance->bulk_media_in_progress && instance->leftover_len > 0) {
667 /*
668 * We have leftover data from a previous websocket message.
669 * Try to make a complete frame by appending data from
670 * the current message to the leftover data.
671 */
672 char *append_ptr = instance->leftover_data + instance->leftover_len;
673 size_t bytes_needed_for_frame = instance->optimal_frame_size - instance->leftover_len;
674 /*
675 * It's possible that even the current message doesn't have enough
676 * data to make a complete frame.
677 */
678 size_t bytes_avail_to_copy = MIN(bytes_needed_for_frame, payload_len);
679
680 /*
681 * Append whatever we can to the end of the leftover data
682 * even if it's not enough to make a complete frame.
683 */
684 memcpy(append_ptr, payload, bytes_avail_to_copy);
685
686 /*
687 * If leftover data is still short, just return and wait for the
688 * next websocket message.
689 */
690 if (bytes_avail_to_copy < bytes_needed_for_frame) {
691 ast_debug(4, "%s: Leftover data %d bytes but only %d new bytes available of %d needed. Appending and waiting for next message.\n",
692 ast_channel_name(instance->channel), (int)instance->leftover_len, (int)bytes_avail_to_copy, (int)bytes_needed_for_frame);
693 instance->leftover_len += bytes_avail_to_copy;
694 return 0;
695 }
696
697 res = queue_frame_from_buffer(instance, instance->leftover_data, instance->optimal_frame_size);
698 if (res < 0) {
699 return -1;
700 }
701
702 /*
703 * We stole data from the current payload so decrement payload_len
704 * and set the next frame pointer after the data in payload
705 * we just copied.
706 */
707 payload_len -= bytes_avail_to_copy;
708 next_frame_ptr = payload + bytes_avail_to_copy;
709
 /*
  * Argument order follows the labels in the format string:
  * BR is bytes_read and FQ is frame_queue_length, the same
  * order the "+++" trace further down uses.
  */
710 ast_debug(5, "%s: --- BR: %4d FQ: %4d PL: %4d LOL: %3d P: %p NFP: %p OFF: %4d NPL: %4d BAC: %3d\n",
711 ast_channel_name(instance->channel),
713 (int)instance->bytes_read,
712 instance->frame_queue_length,
714 (int)(payload_len + bytes_avail_to_copy),
715 (int)instance->leftover_len,
716 payload,
717 next_frame_ptr,
718 (int)(next_frame_ptr - payload),
719 (int)payload_len,
720 (int)bytes_avail_to_copy
721 );
722
723
724 instance->leftover_len = 0;
725 }
726
727 if (!instance->bulk_media_in_progress && instance->leftover_len > 0) {
728 instance->leftover_len = 0;
729 }
730
731 bytes_left = payload_len;
732 while (bytes_read < payload_len && bytes_left >= instance->optimal_frame_size) {
733 res = queue_frame_from_buffer(instance, next_frame_ptr,
734 instance->optimal_frame_size);
735 if (res < 0) {
736 break;
737 }
738 bytes_read += instance->optimal_frame_size;
739 next_frame_ptr += instance->optimal_frame_size;
740 bytes_left -= instance->optimal_frame_size;
741 }
742
743 if (instance->bulk_media_in_progress && bytes_left > 0) {
744 /*
745 * We have a partial frame. Save the leftover data.
746 */
747 ast_debug(5, "%s: +++ BR: %4d FQ: %4d PL: %4d LOL: %3d P: %p NFP: %p OFF: %4d BL: %4d\n",
748 ast_channel_name(instance->channel),
749 (int)instance->bytes_read,
750 instance->frame_queue_length,
751 (int)payload_len,
752 (int)instance->leftover_len,
753 payload,
754 next_frame_ptr,
755 (int)(next_frame_ptr - payload),
756 (int)bytes_left
757 );
758 memcpy(instance->leftover_data, next_frame_ptr, bytes_left);
759 instance->leftover_len = bytes_left;
760 }
761
762 return 0;
763}
764
765static int read_from_ws_and_queue(struct websocket_pvt *instance)
766{
767 uint64_t payload_len = 0;
768 char *payload = NULL;
769 enum ast_websocket_opcode opcode;
770 int fragmented = 0;
771 int res = 0;
772
773 if (!instance || !instance->websocket) {
774 ast_log(LOG_WARNING, "%s: WebSocket instance not found\n",
775 ast_channel_name(instance->channel));
776 return -1;
777 }
778
779 ast_debug(9, "%s: Waiting for websocket to have data\n", ast_channel_name(instance->channel));
780 res = ast_wait_for_input(
781 ast_websocket_fd(instance->websocket), -1);
782 if (res <= 0) {
783 ast_log(LOG_WARNING, "%s: WebSocket read failed: %s\n",
784 ast_channel_name(instance->channel), strerror(errno));
785 return -1;
786 }
787
788 /*
789 * We need to lock here to prevent the websocket handle from
790 * being pulled out from under us if the core sends us a
791 * hangup request.
792 */
793 ao2_lock(instance);
794 if (!instance->websocket) {
795 ao2_unlock(instance);
796 return -1;
797 }
798
799 res = ast_websocket_read(instance->websocket, &payload, &payload_len,
800 &opcode, &fragmented);
801 ao2_unlock(instance);
802 if (res) {
803 return -1;
804 }
805 ast_debug(5, "%s: WebSocket read %d bytes\n", ast_channel_name(instance->channel),
806 (int)payload_len);
807
808 if (opcode == AST_WEBSOCKET_OPCODE_TEXT) {
809 return process_text_message(instance, payload, payload_len);
810 }
811
812 if (opcode == AST_WEBSOCKET_OPCODE_CLOSE) {
813 ast_debug(5, "%s: WebSocket closed by remote\n",
814 ast_channel_name(instance->channel));
815 return -1;
816 }
817
818 if (opcode != AST_WEBSOCKET_OPCODE_BINARY) {
819 ast_debug(5, "%s: WebSocket frame type %d not supported. Ignoring.\n",
820 ast_channel_name(instance->channel), (int)opcode);
821 return 0;
822 }
823
824 return process_binary_message(instance, payload, payload_len);
825}
826
827/*!
828 * \internal
829 *
830 * For incoming websocket connections, this function gets called by
831 * incoming_ws_established_cb() and is run in the http server thread
832 * handling the websocket connection.
833 *
834 * For outgoing websocket connections, this function gets started as
835 * a background thread by webchan_call().
836 */
837static void *read_thread_handler(void *obj)
838{
839 RAII_VAR(struct websocket_pvt *, instance, obj, ao2_cleanup);
840 RAII_VAR(char *, command, NULL, ast_free);
841 int res = 0;
842
843 ast_debug(3, "%s: Read thread started\n", ast_channel_name(instance->channel));
844
845 /*
846 * We need to tell the remote app what channel this media is for.
847 * This is especially important for outbound connections otherwise
848 * the app won't know who the media is for.
849 */
850 res = ast_asprintf(&command, "%s connection_id:%s channel:%s format:%s optimal_frame_size:%d ptime:%d", MEDIA_START,
851 instance->connection_id, ast_channel_name(instance->channel),
852 ast_format_get_name(instance->native_format),
853 instance->optimal_frame_size, instance->native_codec->default_ms);
854 if (res <= 0 || !command) {
855 ast_queue_control(instance->channel, AST_CONTROL_HANGUP);
856 ast_log(LOG_ERROR, "%s: Failed to create MEDIA_START\n", ast_channel_name(instance->channel));
857 return NULL;
858 }
859 res = ast_websocket_write_string(instance->websocket, command);
860 if (res != 0) {
861 ast_log(LOG_ERROR, "%s: Failed to send MEDIA_START\n", ast_channel_name(instance->channel));
862 ast_queue_control(instance->channel, AST_CONTROL_HANGUP);
863 return NULL;
864 }
865 ast_debug(3, "%s: Sent %s\n", ast_channel_name(instance->channel),
866 command);
867
868 if (!instance->no_auto_answer) {
869 ast_debug(3, "%s: ANSWER by auto_answer\n", ast_channel_name(instance->channel));
870 ast_queue_control(instance->channel, AST_CONTROL_ANSWER);
871 }
872
873 while (read_from_ws_and_queue(instance) == 0)
874 {
875 }
876
877 /*
878 * websocket_hangup will take care of closing the websocket if needed.
879 */
880 ast_debug(3, "%s: HANGUP by websocket close/error\n", ast_channel_name(instance->channel));
881 ast_queue_control(instance->channel, AST_CONTROL_HANGUP);
882
883 return NULL;
884}
885
886/*! \brief Function called when we should write a frame to the channel */
887static int webchan_write(struct ast_channel *ast, struct ast_frame *f)
888{
889 struct websocket_pvt *instance = ast_channel_tech_pvt(ast);
890
891 if (!instance || !instance->websocket) {
892 ast_log(LOG_WARNING, "%s: WebSocket instance or client not found\n",
893 ast_channel_name(ast));
894 return -1;
895 }
896
897 if (f->frametype != AST_FRAME_VOICE) {
898 ast_log(LOG_WARNING, "%s: This WebSocket channel only supports AST_FRAME_VOICE frames\n",
899 ast_channel_name(ast));
900 return -1;
901 }
902
904 ast_log(LOG_WARNING, "%s: This WebSocket channel only supports the '%s' format, not '%s'\n",
907 return -1;
908 }
909
911 (char *)f->data.ptr, (uint64_t)f->datalen);
912}
913
914/*!
915 * \internal
916 *
917 * Called by the core to actually call the remote.
918 */
919static int webchan_call(struct ast_channel *ast, const char *dest,
920 int timeout)
921{
922 struct websocket_pvt *instance = ast_channel_tech_pvt(ast);
923 int nodelay = 1;
925
926 if (!instance) {
927 ast_log(LOG_WARNING, "%s: WebSocket instance not found\n",
928 ast_channel_name(ast));
929 return -1;
930 }
931
932 if (instance->type == AST_WS_TYPE_SERVER) {
933 ast_debug(3, "%s: Websocket call incoming\n", ast_channel_name(instance->channel));
934 return 0;
935 }
936 ast_debug(3, "%s: Websocket call outgoing\n", ast_channel_name(instance->channel));
937
938 if (!instance->client) {
939 ast_log(LOG_WARNING, "%s: WebSocket client not found\n",
940 ast_channel_name(ast));
941 return -1;
942 }
943
944 ast_debug(3, "%s: WebSocket call requested to %s. cid: %s\n",
945 ast_channel_name(ast), dest, instance->connection_id);
946
947 if (!ast_strlen_zero(instance->uri_params)) {
949 }
950
951 instance->websocket = ast_websocket_client_connect(instance->client,
952 instance, ast_channel_name(ast), &result);
953 if (!instance->websocket || result != WS_OK) {
954 ast_log(LOG_WARNING, "%s: WebSocket connection failed to %s: %s\n",
956 return -1;
957 }
958
959 if (setsockopt(ast_websocket_fd(instance->websocket),
960 IPPROTO_TCP, TCP_NODELAY, (char *) &nodelay, sizeof(nodelay)) < 0) {
961 ast_log(LOG_WARNING, "Failed to set TCP_NODELAY on websocket connection: %s\n", strerror(errno));
962 }
963
964 ast_debug(3, "%s: WebSocket connection to %s established\n",
965 ast_channel_name(ast), dest);
966
967 /* read_thread_handler() will clean up the bump */
969 read_thread_handler, ao2_bump(instance))) {
970 ast_log(LOG_WARNING, "%s: Failed to create thread.\n", ast_channel_name(ast));
971 ao2_cleanup(instance);
972 return -1;
973 }
974
975 return 0;
976}
977
978static void websocket_destructor(void *data)
979{
980 struct websocket_pvt *instance = data;
981 struct ast_frame *frame = NULL;
982 ast_debug(3, "%s: WebSocket instance freed\n", instance->connection_id);
983
984 AST_LIST_LOCK(&instance->frame_queue);
985 while ((frame = AST_LIST_REMOVE_HEAD(&instance->frame_queue, frame_list))) {
986 ast_frfree(frame);
987 }
988 AST_LIST_UNLOCK(&instance->frame_queue);
989
990 if (instance->timer) {
991 ast_timer_close(instance->timer);
992 instance->timer = NULL;
993 }
994
995 if (instance->channel) {
996 ast_channel_unref(instance->channel);
997 instance->channel = NULL;
998 }
999 if (instance->websocket) {
1000 ast_websocket_unref(instance->websocket);
1001 instance->websocket = NULL;
1002 }
1003
1004 ao2_cleanup(instance->client);
1005 instance->client = NULL;
1006
1007 ao2_cleanup(instance->native_codec);
1008 instance->native_codec = NULL;
1009
1010 ao2_cleanup(instance->native_format);
1011 instance->native_format = NULL;
1012
1013 ao2_cleanup(instance->slin_codec);
1014 instance->slin_codec = NULL;
1015
1016 ao2_cleanup(instance->slin_format);
1017 instance->slin_format = NULL;
1018
1019 if (instance->silence.data.ptr) {
1020 ast_free(instance->silence.data.ptr);
1021 instance->silence.data.ptr = NULL;
1022 }
1023
1024 if (instance->translator) {
1026 instance->translator = NULL;
1027 }
1028
1029 if (instance->leftover_data) {
1030 ast_free(instance->leftover_data);
1031 instance->leftover_data = NULL;
1032 }
1033
1034 ast_free(instance->uri_params);
1035}
1036
1039 /*! \brief The name of the module owning this sorcery instance */
1041};
1042
1043static void instance_proxy_cb(void *weakproxy, void *data)
1044{
1045 struct instance_proxy *proxy = weakproxy;
1046 ast_debug(3, "%s: WebSocket instance removed from instances\n", proxy->connection_id);
1047 ao2_unlink(instances, weakproxy);
1048}
1049
1050static struct websocket_pvt* websocket_new(const char *chan_name,
1051 const char *connection_id, struct ast_format *fmt)
1052{
1053 RAII_VAR(struct instance_proxy *, proxy, NULL, ao2_cleanup);
1054 RAII_VAR(struct websocket_pvt *, instance, NULL, ao2_cleanup);
1055 char uuid[AST_UUID_STR_LEN];
1056 enum ast_websocket_type ws_type;
1057
1058 SCOPED_AO2WRLOCK(locker, instances);
1059
1062 ws_type = AST_WS_TYPE_SERVER;
1063 } else {
1064 ws_type = AST_WS_TYPE_CLIENT;
1065 }
1066
1067 proxy = ao2_weakproxy_alloc(sizeof(*proxy) + strlen(connection_id) + 1, NULL);
1068 if (!proxy) {
1069 return NULL;
1070 }
1071 strcpy(proxy->connection_id, connection_id); /* Safe */
1072
1073 instance = ao2_alloc(sizeof(*instance) + strlen(connection_id) + 1,
1075 if (!instance) {
1076 return NULL;
1077 }
1078 strcpy(instance->connection_id, connection_id); /* Safe */
1079
1080 instance->type = ws_type;
1081 if (ws_type == AST_WS_TYPE_CLIENT) {
1082 instance->client = ast_websocket_client_retrieve_by_id(instance->connection_id);
1083 if (!instance->client) {
1084 ast_log(LOG_ERROR, "%s: WebSocket client connection '%s' not found\n",
1085 chan_name, instance->connection_id);
1086 return NULL;
1087 }
1088 }
1089
1090 AST_LIST_HEAD_INIT(&instance->frame_queue);
1091
1092 /*
1093 * We need the codec to calculate the number of samples in a frame
1094 * so we'll get it once and store it in the instance.
1095 *
1096 * References for native_format and native_codec are now held by the
1097 * instance and will be released when the instance is destroyed.
1098 */
1099 instance->native_format = fmt;
1100 instance->native_codec = ast_format_get_codec(instance->native_format);
1101 /*
1102 * References for native_format and native_codec are now held by the
1103 * instance and will be released when the instance is destroyed.
1104 */
1105
1106 /*
1107 * It's not possible for us to re-time or re-frame media if the data
1108 * stream can't be broken up on arbitrary byte boundaries. This is usually
1109 * indicated by the codec's minimum_bytes being small (10 bytes or less).
1110 * We need to force passthrough mode in this case.
1111 */
1112 if (instance->native_codec->minimum_bytes <= 10) {
1113 instance->passthrough = 1;
1114 instance->optimal_frame_size = 0;
1115 } else {
1116 instance->optimal_frame_size =
1117 (instance->native_codec->default_ms * instance->native_codec->minimum_bytes)
1118 / instance->native_codec->minimum_ms;
1119 instance->leftover_data = ast_calloc(1, instance->optimal_frame_size);
1120 if (!instance->leftover_data) {
1121 return NULL;
1122 }
1123 }
1124
1125 ast_debug(3,
1126 "%s: WebSocket channel native format '%s' Sample rate: %d ptime: %dms minms: %u minbytes: %u passthrough: %d optimal_frame_size: %d\n",
1127 chan_name, ast_format_get_name(instance->native_format),
1128 ast_format_get_sample_rate(instance->native_format),
1129 ast_format_get_default_ms(instance->native_format),
1130 ast_format_get_minimum_ms(instance->native_format),
1131 ast_format_get_minimum_bytes(instance->native_format),
1132 instance->passthrough,
1133 instance->optimal_frame_size);
1134
1135 /* We have exclusive access to proxy and sorcery, no need for locking here. */
1136 if (ao2_weakproxy_set_object(proxy, instance, OBJ_NOLOCK)) {
1137 return NULL;
1138 }
1139
1141 return NULL;
1142 }
1143
1144 if (!ao2_link_flags(instances, proxy, OBJ_NOLOCK)) {
1145 ast_log(LOG_ERROR, "%s: Unable to link WebSocket instance to instances\n",
1146 proxy->connection_id);
1147 return NULL;
1148 }
1149 ast_debug(3, "%s: WebSocket instance created and linked\n", proxy->connection_id);
1150
1151 return ao2_bump(instance);
1152}
1153
1154static int set_instance_translator(struct websocket_pvt *instance)
1155{
1157 instance->slin_format = ao2_bump(instance->native_format);
1158 instance->slin_codec = ast_format_get_codec(instance->slin_format);
1159 return 0;
1160 }
1161
1163 if (!instance->slin_format) {
1164 ast_log(LOG_ERROR, "%s: Unable to get slin format for rate %d\n",
1165 ast_channel_name(instance->channel), instance->native_codec->sample_rate);
1166 return -1;
1167 }
1168 ast_debug(3, "%s: WebSocket channel slin format '%s' Sample rate: %d ptime: %dms\n",
1172
1173 instance->translator = ast_translator_build_path(instance->slin_format, instance->native_format);
1174 if (!instance->translator) {
1175 ast_log(LOG_ERROR, "%s: Unable to build translator path from '%s' to '%s'\n",
1177 ast_format_get_name(instance->slin_format));
1178 return -1;
1179 }
1180
1181 instance->slin_codec = ast_format_get_codec(instance->slin_format);
1182 return 0;
1183}
1184
1185static int set_instance_silence_frame(struct websocket_pvt *instance)
1186{
1187 instance->silence.frametype = AST_FRAME_VOICE;
1188 instance->silence.datalen =
1189 (instance->slin_codec->default_ms * instance->slin_codec->minimum_bytes) / instance->slin_codec->minimum_ms;
1190 instance->silence.samples = instance->silence.datalen / sizeof(uint16_t);
1191 /*
1192 * Even though we'll calloc the data pointer, we don't mark it as
1193 * mallocd because this frame will be around for a while and we don't
1194 * want it accidentally freed before we're done with it.
1195 */
1196 instance->silence.mallocd = 0;
1197 instance->silence.offset = 0;
1198 instance->silence.src = __PRETTY_FUNCTION__;
1199 instance->silence.subclass.format = instance->slin_format;
1200 instance->silence.data.ptr = ast_calloc(1, instance->silence.datalen);
1201 if (!instance->silence.data.ptr) {
1202 return -1;
1203 }
1204
1205 return 0;
1206}
1207
1208static int set_channel_timer(struct websocket_pvt *instance)
1209{
1210 int rate = 0;
1211 instance->timer = ast_timer_open();
1212 if (!instance->timer) {
1213 return -1;
1214 }
1215 /* Rate is the number of ticks per second, not the interval. */
1216 rate = 1000 / ast_format_get_default_ms(instance->native_format);
1217 ast_debug(3, "%s: WebSocket timer rate %d\n",
1218 ast_channel_name(instance->channel), rate);
1219 ast_timer_set_rate(instance->timer, rate);
1220 /*
1221 * Calling ast_channel_set_fd will cause the channel thread to call
1222 * webchan_read at 'rate' times per second.
1223 */
1224 ast_channel_set_fd(instance->channel, 0, ast_timer_fd(instance->timer));
1225
1226 return 0;
1227}
1228
1229static int set_channel_variables(struct websocket_pvt *instance)
1230{
1231 char *pkt_size = NULL;
1232 int res = ast_asprintf(&pkt_size, "%d", instance->optimal_frame_size);
1233 if (res <= 0) {
1234 return -1;
1235 }
1236
1238 pkt_size);
1239 ast_free(pkt_size);
1241 instance->connection_id);
1242
1243 return 0;
1244}
1245
1247{
1248 char *params = ast_strdupa(uri_params);
1249 char *nvp = NULL;
1250 char *nv = NULL;
1251
1252 /*
1253 * uri_params should be a comma-separated list of key=value pairs.
1254 * For example:
1255 * name1=value1,name2=value2
1256 * We're verifying that each name and value either doesn't need
1257 * to be encoded or that it already is.
1258 */
1259
1260 while((nvp = ast_strsep(&params, ',', 0))) {
1261 /* nvp will be name1=value1 */
1262 while((nv = ast_strsep(&nvp, '=', 0))) {
1263 /* nv will be either name1 or value1 */
1264 if (!ast_uri_verify_encoded(nv)) {
1265 return 0;
1266 }
1267 }
1268 }
1269
1270 return 1;
1271}
1272
1273enum {
1274 OPT_WS_CODEC = (1 << 0),
1278};
1279
1280enum {
1287
1294
1295static struct ast_channel *webchan_request(const char *type,
1296 struct ast_format_cap *cap, const struct ast_assigned_ids *assignedids,
1297 const struct ast_channel *requestor, const char *data, int *cause)
1298{
1299 char *parse;
1300 RAII_VAR(struct websocket_pvt *, instance, NULL, ao2_cleanup);
1301 struct ast_channel *chan = NULL;
1302 struct ast_format *fmt = NULL;
1303 struct ast_format_cap *caps = NULL;
1305 AST_APP_ARG(connection_id);
1307 );
1308 struct ast_flags opts = { 0, };
1309 char *opt_args[OPT_ARG_ARRAY_SIZE];
1310 const char *requestor_name = requestor ? ast_channel_name(requestor) : "no channel";
1311
1312 ast_debug(3, "%s: WebSocket channel requested\n",
1313 requestor_name);
1314
1315 if (ast_strlen_zero(data)) {
1316 ast_log(LOG_ERROR, "%s: A connection id is required for the 'WebSocket' channel\n",
1317 requestor_name);
1318 goto failure;
1319 }
1320 parse = ast_strdupa(data);
1321 AST_NONSTANDARD_APP_ARGS(args, parse, '/');
1322
1323 if (ast_strlen_zero(args.connection_id)) {
1324 ast_log(LOG_ERROR, "%s: connection_id is required for the 'WebSocket' channel\n",
1325 requestor_name);
1326 goto failure;
1327 }
1328
1329 if (!ast_strlen_zero(args.options)
1330 && ast_app_parse_options(websocket_options, &opts, opt_args,
1331 ast_strdupa(args.options))) {
1332 ast_log(LOG_ERROR, "%s: 'WebSocket' channel options '%s' parse error\n",
1333 requestor_name, args.options);
1334 goto failure;
1335 }
1336
1337 if (ast_test_flag(&opts, OPT_WS_CODEC)
1338 && !ast_strlen_zero(opt_args[OPT_ARG_WS_CODEC])) {
1339 ast_debug(3, "%s: Using specified format %s\n",
1340 requestor_name, opt_args[OPT_ARG_WS_CODEC]);
1341 fmt = ast_format_cache_get(opt_args[OPT_ARG_WS_CODEC]);
1342 } else {
1343 /*
1344 * If codec wasn't specified in the dial string,
1345 * use the first format in the capabilities.
1346 */
1347 ast_debug(3, "%s: Using format %s from requesting channel\n",
1348 requestor_name, opt_args[OPT_ARG_WS_CODEC]);
1349 fmt = ast_format_cap_get_format(cap, 0);
1350 }
1351
1352 if (!fmt) {
1353 ast_log(LOG_WARNING, "%s: No codec found for sending media to connection '%s'\n",
1354 requestor_name, args.connection_id);
1355 goto failure;
1356 }
1357
1358 instance = websocket_new(requestor_name, args.connection_id, fmt);
1359 if (!instance) {
1360 ast_log(LOG_ERROR, "%s: Failed to allocate WebSocket channel pvt\n",
1361 requestor_name);
1362 goto failure;
1363 }
1364
1365 instance->no_auto_answer = ast_test_flag(&opts, OPT_WS_NO_AUTO_ANSWER);
1366 if (!instance->passthrough) {
1367 instance->passthrough = ast_test_flag(&opts, OPT_WS_PASSTHROUGH);
1368 }
1369
1371 && !ast_strlen_zero(opt_args[OPT_ARG_WS_URI_PARAM])) {
1372 char *comma;
1373
1374 if (ast_strings_equal(args.connection_id, INCOMING_CONNECTION_ID)) {
1376 "%s: URI parameters are not allowed for 'WebSocket/INCOMING' channels\n",
1377 requestor_name);
1378 goto failure;
1379 }
1380
1381 ast_debug(3, "%s: Using URI parameters '%s'\n",
1382 requestor_name, opt_args[OPT_ARG_WS_URI_PARAM]);
1383
1385 ast_log(LOG_ERROR, "%s: Invalid URI parameters '%s' in WebSocket/%s dial string\n",
1386 requestor_name, opt_args[OPT_ARG_WS_URI_PARAM],
1387 args.connection_id);
1388 goto failure;
1389 }
1390
1391 instance->uri_params = ast_strdup(opt_args[OPT_ARG_WS_URI_PARAM]);
1392 comma = instance->uri_params;
1393 /*
1394 * The normal separator for query string components is an
1395 * ampersand ('&') but the Dial app interprets them as additional
1396 * channels to dial in parallel so we instruct users to separate
1397 * the parameters with commas (',') instead. We now have to
1398 * convert those commas back to ampersands.
1399 */
1400 while ((comma = strchr(comma,','))) {
1401 *comma = '&';
1402 }
1403 ast_debug(3, "%s: Using final URI '%s'\n", requestor_name, instance->uri_params);
1404 }
1405
1406 chan = ast_channel_alloc(1, AST_STATE_DOWN, "", "", "", "", "", assignedids,
1407 requestor, 0, "WebSocket/%s/%p", args.connection_id, instance);
1408 if (!chan) {
1409 ast_log(LOG_ERROR, "%s: Unable to alloc channel\n", ast_channel_name(requestor));
1410 goto failure;
1411 }
1412
1413 ast_debug(3, "%s: WebSocket channel %s allocated for connection %s\n",
1414 ast_channel_name(chan), requestor_name,
1415 instance->connection_id);
1416
1417 instance->channel = ao2_bump(chan);
1418 ast_channel_tech_set(instance->channel, &websocket_tech);
1419
1420 if (set_instance_translator(instance) != 0) {
1421 goto failure;
1422 }
1423
1424 if (set_instance_silence_frame(instance) != 0) {
1425 goto failure;
1426 }
1427
1428 if (set_channel_timer(instance) != 0) {
1429 goto failure;
1430 }
1431
1432 if (set_channel_variables(instance) != 0) {
1433 goto failure;
1434 }
1435
1437 if (!caps) {
1438 ast_log(LOG_ERROR, "%s: Unable to alloc caps\n", requestor_name);
1439 goto failure;
1440 }
1441
1442 ast_format_cap_append(caps, instance->native_format, 0);
1443 ast_channel_nativeformats_set(instance->channel, caps);
1444 ast_channel_set_writeformat(instance->channel, instance->native_format);
1445 ast_channel_set_rawwriteformat(instance->channel, instance->native_format);
1446 ast_channel_set_readformat(instance->channel, instance->native_format);
1447 ast_channel_set_rawreadformat(instance->channel, instance->native_format);
1448 ast_channel_tech_pvt_set(chan, ao2_bump(instance));
1449 ast_channel_unlock(chan);
1450 ao2_cleanup(caps);
1451
1452 ast_debug(3, "%s: WebSocket channel created to %s\n",
1453 ast_channel_name(chan), args.connection_id);
1454
1455 return chan;
1456
1457failure:
1458 if (chan) {
1459 ast_channel_unlock(chan);
1460 }
1461 *cause = AST_CAUSE_FAILURE;
1462 return NULL;
1463}
1464
1465/*!
1466 * \internal
1467 *
1468 * Called by the core to hang up the channel.
1469 */
1470static int webchan_hangup(struct ast_channel *ast)
1471{
1472 struct websocket_pvt *instance = ast_channel_tech_pvt(ast);
1473
1474 if (!instance) {
1475 return -1;
1476 }
1477 ast_debug(3, "%s: WebSocket call hangup. cid: %s\n",
1478 ast_channel_name(ast), instance->connection_id);
1479
1480 /*
1481 * We need to lock because read_from_ws_and_queue() is probably waiting
1482 * on the websocket file descriptor and will unblock and immediately try to
1483 * check the websocket and read from it. We don't want to pull the
1484 * websocket out from under it between the check and read.
1485 */
1486 ao2_lock(instance);
1487 if (instance->websocket) {
1488 ast_websocket_close(instance->websocket, 1000);
1489 ast_websocket_unref(instance->websocket);
1490 instance->websocket = NULL;
1491 }
1493 ao2_unlock(instance);
1494
1495 /* Clean up the reference from adding the instance to the channel */
1496 ao2_cleanup(instance);
1497
1498 return 0;
1499}
1500
1501static int webchan_send_dtmf_text(struct ast_channel *ast, char digit, unsigned int duration)
1502{
1503 struct websocket_pvt *instance = ast_channel_tech_pvt(ast);
1504 char *command;
1505 int res = 0;
1506
1507 if (!instance) {
1508 return -1;
1509 }
1510
1511 res = ast_asprintf(&command, "%s digit:%c", DTMF_END, digit);
1512 if (res <= 0 || !command) {
1513 ast_log(LOG_ERROR, "%s: Failed to create DTMF_END\n", ast_channel_name(instance->channel));
1514 return 0;
1515 }
1516 res = ast_websocket_write_string(instance->websocket, command);
1517 if (res != 0) {
1518 ast_log(LOG_ERROR, "%s: Failed to send DTMF_END\n", ast_channel_name(instance->channel));
1519 ast_free(command);
1520 return 0;
1521 }
1522 ast_debug(3, "%s: Sent %s\n", ast_channel_name(instance->channel), command);
1523 ast_free(command);
1524 return 0;
1525}
1526
1527/*!
1528 * \internal
1529 *
1530 * Called by res_http_websocket after a client has connected and
1531 * successfully upgraded from HTTP to WebSocket.
1532 *
1533 * Depends on incoming_ws_http_callback parsing the connection_id from
1534 * the HTTP request and storing it in get_params.
1535 */
1536static void incoming_ws_established_cb(struct ast_websocket *ast_ws_session,
1537 struct ast_variable *get_params, struct ast_variable *upgrade_headers)
1538{
1539 RAII_VAR(struct ast_websocket *, s, ast_ws_session, ast_websocket_unref);
1540 struct ast_variable *v;
1541 const char *connection_id = NULL;
1542 struct websocket_pvt *instance = NULL;
1543 int nodelay = 1;
1544
1545 ast_debug(3, "WebSocket established\n");
1546
1547 for (v = upgrade_headers; v; v = v->next) {
1548 ast_debug(4, "Header-> %s: %s\n", v->name, v->value);
1549 }
1550 for (v = get_params; v; v = v->next) {
1551 ast_debug(4, " Param-> %s: %s\n", v->name, v->value);
1552 }
1553
1554 connection_id = ast_variable_find_in_list(get_params, "CONNECTION_ID");
1555 if (!connection_id) {
1556 /*
1557 * This can't really happen because incoming_ws_http_callback() won't
1558 * let it get this far if it can't add the connection_id to the
1559 * get_params.
1560 * Just in case though...
1561 */
1562 ast_log(LOG_WARNING, "WebSocket connection id not found\n");
1564 ast_websocket_close(ast_ws_session, 1000);
1565 return;
1566 }
1567
1569 if (!instance) {
1570 /*
1571 * This also can't really happen because incoming_ws_http_callback() won't
1572 * let it get this far if it can't find the instance.
1573 * Just in case though...
1574 */
1575 ast_log(LOG_WARNING, "%s: WebSocket instance not found\n", connection_id);
1577 ast_websocket_close(ast_ws_session, 1000);
1578 return;
1579 }
1580 instance->websocket = ao2_bump(ast_ws_session);
1581
1582 if (setsockopt(ast_websocket_fd(instance->websocket),
1583 IPPROTO_TCP, TCP_NODELAY, (char *) &nodelay, sizeof(nodelay)) < 0) {
1584 ast_log(LOG_WARNING, "Failed to set TCP_NODELAY on manager connection: %s\n", strerror(errno));
1585 }
1586
1587 /* read_thread_handler cleans up the bump */
1588 read_thread_handler(ao2_bump(instance));
1589
1590 ao2_cleanup(instance);
1591 ast_debug(3, "WebSocket closed\n");
1592}
1593
1594/*!
1595 * \internal
1596 *
1597 * Called by the core http server after a client connects but before
1598 * the upgrade from HTTP to Websocket. We need to save the URI in
1599 * the CONNECTION_ID in a get_param because it contains the connection UUID
1600 * we gave to the client when they used externalMedia to create the channel.
1601 * incoming_ws_established_cb() will use this to retrieve the chan_websocket
1602 * instance.
1603 */
1605 const struct ast_http_uri *urih, const char *uri,
1606 enum ast_http_method method, struct ast_variable *get_params,
1607 struct ast_variable *headers)
1608{
1609 struct ast_http_uri fake_urih = {
1611 };
1612 int res = 0;
1613 /*
1614 * Normally the http server will destroy the get_params
1615 * when the session ends but if there weren't any initially
1616 * and we create some and add them to the list, the http server
1617 * won't know about it so we have to destroy it ourselves.
1618 */
1619 int destroy_get_params = (get_params == NULL);
1620 struct ast_variable *v = NULL;
1621 RAII_VAR(struct websocket_pvt *, instance, NULL, ao2_cleanup);
1622
1623 ast_debug(2, "URI: %s Starting\n", uri);
1624
1625 /*
1626 * The client will have issued the GET request with a URI of
1627 * /media/<connection_id>
1628 *
1629 * Since this callback is registered for the /media URI prefix the
1630 * http server will strip that off the front of the URI passing in
1631 * only the path components after that in the 'uri' parameter.
1632 * This should leave only the connection id without a leading '/'.
1633 */
1634 instance = ao2_weakproxy_find(instances, uri, OBJ_SEARCH_KEY | OBJ_NOLOCK, "");
1635 if (!instance) {
1636 ast_log(LOG_WARNING, "%s: WebSocket instance not found\n", uri);
1637 ast_http_error(ser, 404, "Not found", "WebSocket instance not found");
1638 return -1;
1639 }
1640
1641 /*
1642 * We don't allow additional connections using the same connection id.
1643 */
1644 if (instance->websocket) {
1645 ast_log(LOG_WARNING, "%s: Websocket already connected for channel '%s'\n",
1646 uri, instance->channel ? ast_channel_name(instance->channel) : "unknown");
1647 ast_http_error(ser, 409, "Conflict", "Another websocket connection exists for this connection id");
1648 return -1;
1649 }
1650
1651 v = ast_variable_new("CONNECTION_ID", uri, "");
1652 if (!v) {
1653 ast_http_error(ser, 500, "Server error", "");
1654 return -1;
1655 }
1656 ast_variable_list_append(&get_params, v);
1657
1658 for (v = get_params; v; v = v->next) {
1659 ast_debug(4, " Param-> %s: %s\n", v->name, v->value);
1660 }
1661
1662 /*
1663 * This will ultimately call incoming_ws_established_cb() so
1664 * this function will block until the websocket is closed and
1665 * incoming_ws_established_cb() returns.
1666 */
1667 res = ast_websocket_uri_cb(ser, &fake_urih, uri, method,
1668 get_params, headers);
1669 if (destroy_get_params) {
1670 ast_variables_destroy(get_params);
1671 }
1672
1673 ast_debug(2, "URI: %s DONE\n", uri);
1674
1675 return res;
1676}
1677
1678static struct ast_http_uri http_uri = {
1680 .description = "Media over Websocket",
1681 .uri = "media",
1682 .has_subtree = 1,
1683 .data = NULL,
1684 .key = __FILE__,
1685 .no_decode_uri = 1,
1686};
1687
1688/*! \brief Function called when our module is unloaded */
1689static int unload_module(void)
1690{
1694
1698
1700 instances = NULL;
1701
1702 return 0;
1703}
1704
1708
1709/*! \brief Function called when our module is loaded */
1710static int load_module(void)
1711{
1712 int res = 0;
1713 struct ast_websocket_protocol *protocol;
1714
1717 }
1718
1721 ast_log(LOG_ERROR, "Unable to register channel class 'WebSocket'\n");
1722 unload_module();
1724 }
1725
1727 AO2_CONTAINER_ALLOC_OPT_DUPS_REPLACE, 17, instance_proxy_hash_fn,
1728 instance_proxy_sort_fn, instance_proxy_cmp_fn);
1729 if (!instances) {
1731 "Failed to allocate the chan_websocket instance registry\n");
1732 unload_module();
1734 }
1735
1737 if (!ast_ws_server) {
1738 unload_module();
1740 }
1741
1742 protocol = ast_websocket_sub_protocol_alloc("media");
1743 if (!protocol) {
1744 unload_module();
1746 }
1749
1751
1753}
1754
1755AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "Websocket Media Channel",
1756 .support_level = AST_MODULE_SUPPORT_CORE,
1757 .load = load_module,
1758 .unload = unload_module,
1759 .load_pri = AST_MODPRI_CHANNEL_DRIVER,
1760 .requires = "res_http_websocket,res_websocket_client",
char digit
jack_status_t status
Definition: app_jack.c:149
enum queue_result id
Definition: app_queue.c:1767
Asterisk main include file. File version handling, generic pbx functions.
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
Definition: astmm.h:288
#define ast_free(a)
Definition: astmm.h:180
#define ast_strdup(str)
A wrapper for strdup()
Definition: astmm.h:241
#define ast_strdupa(s)
duplicate a string in memory from the stack
Definition: astmm.h:298
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:267
#define ast_calloc(num, len)
A wrapper for calloc()
Definition: astmm.h:202
#define ast_log
Definition: astobj2.c:42
#define ao2_weakproxy_set_object(weakproxy, obj, flags)
Associate weakproxy with obj.
Definition: astobj2.h:579
int ao2_weakproxy_subscribe(void *weakproxy, ao2_weakproxy_notification_cb cb, void *data, int flags)
Request notification when weakproxy points to NULL.
Definition: astobj2.c:934
@ AO2_ALLOC_OPT_LOCK_RWLOCK
Definition: astobj2.h:365
#define AO2_STRING_FIELD_CMP_FN(stype, field)
Creates a compare function for a structure string field.
Definition: astobj2.h:2048
#define ao2_cleanup(obj)
Definition: astobj2.h:1934
#define ao2_unlink(container, obj)
Remove an object from a container.
Definition: astobj2.h:1578
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
Definition: astobj2.h:1554
#define AO2_STRING_FIELD_SORT_FN(stype, field)
Creates a sort function for a structure string field.
Definition: astobj2.h:2064
#define ao2_unlock(a)
Definition: astobj2.h:729
#define ao2_weakproxy_find(c, arg, flags, tag)
Perform an ao2_find on a container with ao2_weakproxy objects, returning the real object.
Definition: astobj2.h:1748
#define ao2_lock(a)
Definition: astobj2.h:717
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition: astobj2.h:480
#define ao2_weakproxy_alloc(data_size, destructor_fn)
Allocate an ao2_weakproxy object.
Definition: astobj2.h:550
#define AO2_STRING_FIELD_HASH_FN(stype, field)
Creates a hash function for a structure string field.
Definition: astobj2.h:2032
@ OBJ_NOLOCK
Assume that the ao2_container is already locked.
Definition: astobj2.h:1063
@ OBJ_SEARCH_KEY
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1101
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:409
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition: astobj2.h:1303
@ AO2_CONTAINER_ALLOC_OPT_DUPS_REPLACE
Replace objects with duplicate keys in container.
Definition: astobj2.h:1211
Internal Asterisk hangup causes.
#define AST_CAUSE_FAILURE
Definition: causes.h:150
static PGresult * result
Definition: cel_pgsql.c:84
static const char type[]
Definition: chan_ooh323.c:109
static int set_instance_translator(struct websocket_pvt *instance)
#define FLUSH_MEDIA
#define MEDIA_BUFFERING_COMPLETED
#define QUEUE_LENGTH_XON_LEVEL
#define QUEUE_DRAINED
static int incoming_ws_http_callback(struct ast_tcptls_session_instance *ser, const struct ast_http_uri *urih, const char *uri, enum ast_http_method method, struct ast_variable *get_params, struct ast_variable *headers)
#define ANSWER_CHANNEL
static void instance_proxy_cb(void *weakproxy, void *data)
static struct ast_frame * dequeue_frame(struct websocket_pvt *instance)
static int validate_uri_parameters(const char *uri_params)
static int webchan_write(struct ast_channel *ast, struct ast_frame *f)
Function called when we should write a frame to the channel.
#define MEDIA_XOFF
static int webchan_hangup(struct ast_channel *ast)
static struct ast_channel_tech websocket_tech
#define HANGUP_CHANNEL
static struct ast_frame * webchan_read(struct ast_channel *ast)
#define MEDIA_START
#define MEDIA_WEBSOCKET_CONNECTION_ID
#define DTMF_END
#define DRIVER_STATUS
static int read_from_ws_and_queue(struct websocket_pvt *instance)
static int webchan_send_dtmf_text(struct ast_channel *ast, char digit, unsigned int duration)
static struct ast_http_uri http_uri
#define GET_DRIVER_STATUS
static int set_channel_variables(struct websocket_pvt *instance)
static void * read_thread_handler(void *obj)
static int process_text_message(struct websocket_pvt *instance, char *payload, uint64_t payload_len)
static void websocket_destructor(void *data)
static int queue_frame_from_buffer(struct websocket_pvt *instance, char *buffer, size_t len)
#define START_MEDIA_BUFFERING
static int set_channel_timer(struct websocket_pvt *instance)
#define MEDIA_XON
#define QUEUE_LENGTH_XOFF_LEVEL
static const struct ast_app_option websocket_options[128]
#define PAUSE_MEDIA
static struct ao2_container * instances
#define MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE
static void set_channel_format(struct websocket_pvt *instance, struct ast_format *fmt)
#define INCOMING_CONNECTION_ID
static struct websocket_pvt * websocket_new(const char *chan_name, const char *connection_id, struct ast_format *fmt)
#define REPORT_QUEUE_DRAINED
#define CONTINUE_MEDIA
#define STOP_MEDIA_BUFFERING
static int load_module(void)
Function called when our module is loaded.
static int webchan_call(struct ast_channel *ast, const char *dest, int timeout)
static int process_binary_message(struct websocket_pvt *instance, char *payload, uint64_t payload_len)
static int set_instance_silence_frame(struct websocket_pvt *instance)
static struct ast_websocket_server * ast_ws_server
@ OPT_ARG_WS_PASSTHROUGH
@ OPT_ARG_WS_URI_PARAM
@ OPT_ARG_WS_CODEC
@ OPT_ARG_WS_NO_AUTO_ANSWER
@ OPT_ARG_ARRAY_SIZE
static int unload_module(void)
Function called when our module is unloaded.
#define MAX_TEXT_MESSAGE_LEN
static void incoming_ws_established_cb(struct ast_websocket *ast_ws_session, struct ast_variable *get_params, struct ast_variable *upgrade_headers)
#define QUEUE_LENGTH_MAX
static struct ast_channel * webchan_request(const char *type, struct ast_format_cap *cap, const struct ast_assigned_ids *assignedids, const struct ast_channel *requestor, const char *data, int *cause)
static int queue_option_frame(struct websocket_pvt *instance, char *buffer)
@ OPT_WS_CODEC
@ OPT_WS_PASSTHROUGH
@ OPT_WS_URI_PARAM
@ OPT_WS_NO_AUTO_ANSWER
General Asterisk PBX channel definitions.
const char * ast_channel_name(const struct ast_channel *chan)
void * ast_channel_tech_pvt(const struct ast_channel *chan)
struct ast_format * ast_channel_rawreadformat(struct ast_channel *chan)
#define ast_channel_alloc(needqueue, state, cid_num, cid_name, acctcode, exten, context, assignedids, requestor, amaflag,...)
Create a channel structure.
Definition: channel.h:1299
void ast_channel_nativeformats_set(struct ast_channel *chan, struct ast_format_cap *value)
void ast_channel_unregister(const struct ast_channel_tech *tech)
Unregister a channel technology.
Definition: channel.c:570
int ast_queue_control(struct ast_channel *chan, enum ast_control_frame_type control)
Queue a control frame without payload.
Definition: channel.c:1270
int ast_set_read_format(struct ast_channel *chan, struct ast_format *format)
Sets read format on channel chan.
Definition: channel.c:5730
void ast_channel_set_rawreadformat(struct ast_channel *chan, struct ast_format *format)
void ast_channel_tech_pvt_set(struct ast_channel *chan, void *value)
void ast_channel_set_rawwriteformat(struct ast_channel *chan, struct ast_format *format)
void ast_channel_set_readformat(struct ast_channel *chan, struct ast_format *format)
int ast_channel_register(const struct ast_channel_tech *tech)
Register a channel technology (a new channel driver) Called by a channel module to register the kind ...
Definition: channel.c:539
#define ast_channel_unref(c)
Decrease channel reference count.
Definition: channel.h:3008
void ast_channel_set_fd(struct ast_channel *chan, int which, int fd)
Definition: channel.c:2396
void ast_channel_tech_set(struct ast_channel *chan, const struct ast_channel_tech *value)
#define ast_channel_unlock(chan)
Definition: channel.h:2973
void ast_channel_set_writeformat(struct ast_channel *chan, struct ast_format *format)
struct ast_format * ast_channel_readformat(struct ast_channel *chan)
@ AST_STATE_DOWN
Definition: channelstate.h:36
Codec API.
@ AST_MEDIA_TYPE_UNKNOWN
Definition: codec.h:31
struct ast_codec * ast_format_get_codec(const struct ast_format *format)
Get the codec associated with a format.
Definition: format.c:324
unsigned int ast_format_get_minimum_bytes(const struct ast_format *format)
Get the minimum number of bytes expected in a frame for this format.
Definition: format.c:374
unsigned int ast_format_get_sample_rate(const struct ast_format *format)
Get the sample rate of a media format.
Definition: format.c:379
unsigned int ast_format_get_minimum_ms(const struct ast_format *format)
Get the minimum amount of media carried in this format.
Definition: format.c:364
enum ast_format_cmp_res ast_format_cmp(const struct ast_format *format1, const struct ast_format *format2)
Compare two formats.
Definition: format.c:201
@ AST_FORMAT_CMP_NOT_EQUAL
Definition: format.h:38
const char * ast_format_get_name(const struct ast_format *format)
Get the name associated with a format.
Definition: format.c:334
unsigned int ast_format_get_default_ms(const struct ast_format *format)
Get the default framing size (in milliseconds) for a format.
Definition: format.c:359
Media Format Cache API.
int ast_format_cache_is_slinear(struct ast_format *format)
Determines if a format is one of the cached slin formats.
Definition: format_cache.c:534
#define ast_format_cache_get(name)
Retrieve a named format from the cache.
Definition: format_cache.h:278
struct ast_format * ast_format_cache_get_slin_by_rate(unsigned int rate)
Retrieve the best signed linear format given a sample rate.
Definition: format_cache.c:512
int ast_format_cap_append_by_type(struct ast_format_cap *cap, enum ast_media_type type)
Add all codecs Asterisk knows about for a specific type to the capabilities structure.
Definition: format_cap.c:216
struct ast_format * ast_format_cap_get_format(const struct ast_format_cap *cap, int position)
Get the format at a specific index.
Definition: format_cap.c:400
@ AST_FORMAT_CAP_FLAG_DEFAULT
Definition: format_cap.h:38
#define ast_format_cap_append(cap, format, framing)
Add format capability to capabilities structure.
Definition: format_cap.h:99
#define ast_format_cap_alloc(flags)
Allocate a new ast_format_cap structure.
Definition: format_cap.h:49
static int len(struct ast_channel *chan, const char *cmd, char *data, char *buf, size_t buflen)
static int uuid(struct ast_channel *chan, const char *cmd, char *data, char *buf, size_t len)
Definition: func_uuid.c:52
ast_http_method
HTTP Request methods known by Asterisk.
Definition: http.h:58
void ast_http_uri_unlink(struct ast_http_uri *urihandler)
Unregister a URI handler.
Definition: http.c:721
void ast_http_error(struct ast_tcptls_session_instance *ser, int status, const char *title, const char *text)
Send HTTP error message and close socket.
Definition: http.c:664
int ast_http_uri_link(struct ast_http_uri *urihandler)
Register a URI handler.
Definition: http.c:689
Support for WebSocket connections within the Asterisk HTTP server and client WebSocket connections to a server.
struct ast_websocket_protocol * ast_websocket_sub_protocol_alloc(const char *name)
Allocate a websocket sub-protocol instance.
int ast_websocket_close(struct ast_websocket *session, uint16_t reason)
Close a WebSocket session by sending a message with the CLOSE opcode and an optional code.
int ast_websocket_read(struct ast_websocket *session, char **payload, uint64_t *payload_len, enum ast_websocket_opcode *opcode, int *fragmented)
Read a WebSocket frame and handle it.
const char * ast_websocket_result_to_str(enum ast_websocket_result result)
Convert a websocket result code to a string.
ast_websocket_result
Result code for a websocket client.
@ WS_OK
ast_websocket_opcode
WebSocket operation codes.
@ AST_WEBSOCKET_OPCODE_BINARY
@ AST_WEBSOCKET_OPCODE_CLOSE
@ AST_WEBSOCKET_OPCODE_TEXT
void ast_websocket_unref(struct ast_websocket *session)
Decrease the reference count for a WebSocket session.
int ast_websocket_uri_cb(struct ast_tcptls_session_instance *ser, const struct ast_http_uri *urih, const char *uri, enum ast_http_method method, struct ast_variable *get_vars, struct ast_variable *headers)
Callback suitable for use with an ast_http_uri.
int ast_websocket_server_add_protocol2(struct ast_websocket_server *server, struct ast_websocket_protocol *protocol)
Add a sub-protocol handler to the given server.
int ast_websocket_write_string(struct ast_websocket *ws, const char *buf)
Construct and transmit a WebSocket frame containing string data.
int ast_websocket_fd(struct ast_websocket *session)
Get the file descriptor for a WebSocket session.
ast_websocket_type
WebSocket connection/configuration types.
@ AST_WS_TYPE_CLIENT
@ AST_WS_TYPE_SERVER
struct ast_websocket_server * ast_websocket_server_create(void)
Creates an ast_websocket_server.
int ast_websocket_write(struct ast_websocket *session, enum ast_websocket_opcode opcode, char *payload, uint64_t payload_size)
Construct and transmit a WebSocket frame.
Application convenience functions, designed to give consistent look and feel to Asterisk apps.
#define AST_APP_ARG(name)
Define an application argument.
#define END_OPTIONS
#define AST_APP_OPTIONS(holder, options...)
Declares an array of options for an application.
#define AST_APP_OPTION_ARG(option, flagno, argno)
Declares an application option that accepts an argument.
#define AST_DECLARE_APP_ARGS(name, arglist)
Declare a structure to hold an application's arguments.
#define BEGIN_OPTIONS
#define AST_APP_OPTION(option, flagno)
Declares an application option that does not accept an argument.
#define AST_NONSTANDARD_APP_ARGS(args, parse, sep)
Performs the 'nonstandard' argument separation process for an application.
int ast_app_parse_options(const struct ast_app_option *options, struct ast_flags *flags, char **args, char *optstr)
Parses a string containing application options and sets flags/arguments.
Definition: main/app.c:3066
const char * ast_variable_find_in_list(const struct ast_variable *list, const char *variable)
Gets the value of a variable from a variable list by name.
Definition: main/config.c:1014
#define ast_variable_new(name, value, filename)
#define ast_variable_list_append(head, new_var)
void ast_variables_destroy(struct ast_variable *var)
Free variable list.
Definition: extconf.c:1260
Asterisk internal frame definitions.
#define ast_frisolate(fr)
Makes a frame independent of any static storage.
void ast_frame_free(struct ast_frame *frame, int cache)
Frees a frame or list of frames.
Definition: main/frame.c:176
#define ast_frdup(fr)
Copies a frame.
#define ast_frfree(fr)
#define AST_MALLOCD_DATA
#define AST_FRAME_SET_BUFFER(fr, _base, _ofs, _datalen)
#define AST_FRIENDLY_OFFSET
Offset into a frame's data buffer.
@ AST_FRAME_VOICE
@ AST_FRAME_CONTROL
@ AST_CONTROL_ANSWER
@ AST_CONTROL_HANGUP
@ AST_CONTROL_OPTION
#define ast_debug(level,...)
Log a DEBUG message.
#define LOG_ERROR
#define LOG_WARNING
#define AST_LIST_INSERT_TAIL(head, elm, field)
Appends a list entry to the tail of a list.
Definition: linkedlists.h:731
#define AST_LIST_HEAD_INIT(head)
Initializes a list head structure.
Definition: linkedlists.h:626
#define AST_LIST_LOCK(head)
Locks a list.
Definition: linkedlists.h:40
#define AST_LIST_REMOVE_HEAD(head, field)
Removes and returns the head entry from a list.
Definition: linkedlists.h:833
#define AST_LIST_UNLOCK(head)
Attempts to unlock a list.
Definition: linkedlists.h:140
#define AST_LIST_HEAD(name, type)
Defines a structure to be used to hold a list of specified type.
Definition: linkedlists.h:173
Asterisk locking-related definitions:
#define SCOPED_AO2WRLOCK(varname, obj)
scoped lock specialization for ao2 write locks.
Definition: lock.h:621
#define SCOPED_LOCK(varname, lock, lockfunc, unlockfunc)
Scoped Locks.
Definition: lock.h:590
int errno
Header for providers of file and format handling routines. Clients of these routines should include "asterisk/file.h" instead.
Asterisk module definitions.
@ AST_MODFLAG_LOAD_ORDER
Definition: module.h:331
#define AST_MODULE_INFO(keystr, flags_to_set, desc, fields...)
Definition: module.h:557
@ AST_MODPRI_CHANNEL_DRIVER
Definition: module.h:341
@ AST_MODULE_SUPPORT_CORE
Definition: module.h:121
#define ASTERISK_GPL_KEY
The text the key() function should return.
Definition: module.h:46
@ AST_MODULE_LOAD_SUCCESS
Definition: module.h:70
@ AST_MODULE_LOAD_DECLINE
Module has failed to load, may be in an inconsistent state.
Definition: module.h:78
Core PBX routines and definitions.
int pbx_builtin_setvar_helper(struct ast_channel *chan, const char *name, const char *value)
Add a variable to the channel variable stack, removing the most recently set value for the same name.
const char * method
Definition: res_pjsip.c:1279
#define NULL
Definition: resample.c:96
int ast_strings_equal(const char *str1, const char *str2)
Compare strings for equality checking for NULL.
Definition: strings.c:238
#define S_OR(a, b)
returns the equivalent of logic or for strings: first one if not empty, otherwise second one.
Definition: strings.h:80
#define S_COR(a, b, c)
returns the equivalent of logic or for strings, with an additional boolean check: second one if not empty and first one is true, otherwise third one.
Definition: strings.h:87
static force_inline int attribute_pure ast_strlen_zero(const char *s)
Definition: strings.h:65
static int force_inline attribute_pure ast_begins_with(const char *str, const char *prefix)
Checks whether a string begins with another.
Definition: strings.h:97
char * ast_strip(char *s)
Strip leading/trailing whitespace from a string.
Definition: strings.h:223
char * ast_strsep(char **s, const char sep, uint32_t flags)
Act like strsep but ignore separators inside quotes.
Definition: utils.c:1871
Generic container type.
Structure to pass both assignedid values to channel drivers.
Definition: channel.h:606
Structure to describe a channel "technology", i.e. a channel driver. See for examples:
Definition: channel.h:648
struct ast_format_cap * capabilities
Definition: channel.h:652
const char *const type
Definition: channel.h:649
Main Channel structure associated with a channel.
const char * data
Represents a media codec within Asterisk.
Definition: codec.h:42
unsigned int sample_rate
Sample rate (number of samples carried in a second)
Definition: codec.h:52
unsigned int minimum_bytes
Length in bytes of the data payload of a minimum_ms frame.
Definition: codec.h:60
unsigned int default_ms
Default length of media carried (in milliseconds) in a frame.
Definition: codec.h:58
int(* samples_count)(struct ast_frame *frame)
Retrieve the number of samples in a frame.
Definition: codec.h:68
unsigned int minimum_ms
Minimum length of media that can be carried (in milliseconds) in a frame.
Definition: codec.h:54
Structure used to handle boolean flags.
Definition: utils.h:217
Format capabilities structure, holds formats + preference order + etc.
Definition: format_cap.c:54
Definition of a media format.
Definition: format.c:43
struct ast_format * format
Data structure associated with a single frame of data.
struct ast_frame_subclass subclass
enum ast_frame_type frametype
union ast_frame::@231 data
const char * src
Definition of a URI handler.
Definition: http.h:102
ast_http_callback callback
Definition: http.h:107
void * data
Definition: http.h:116
describes a server instance
Definition: tcptls.h:151
Default structure for translators, with the basic fields and buffers, all allocated as part of the same chunk of memory.
Definition: translate.h:213
Structure for variables, used for configurations and for channel variables.
struct ast_variable * next
A websocket protocol implementation.
ast_websocket_callback session_established
Callback called when a new session is established. Mandatory.
Structure for a WebSocket server.
Structure definition for session.
char connection_id[0]
The name of the module owning this sorcery instance.
char connection_id[0]
char * leftover_data
struct ast_format * native_format
struct ast_channel * channel
struct ast_codec * slin_codec
struct websocket_pvt::@139 frame_queue
int bulk_media_in_progress
enum ast_websocket_type type
struct ast_timer * timer
struct ast_format * slin_format
pthread_t outbound_read_thread
struct ast_frame silence
struct ast_codec * native_codec
struct ast_websocket_client * client
struct ast_websocket * websocket
struct ast_trans_pvt * translator
const char * args
static struct test_options options
Timing source management.
void ast_timer_close(struct ast_timer *handle)
Close an opened timing handle.
Definition: timing.c:154
int ast_timer_ack(const struct ast_timer *handle, unsigned int quantity)
Acknowledge a timer event.
Definition: timing.c:171
int ast_timer_set_rate(const struct ast_timer *handle, unsigned int rate)
Set the timing tick rate.
Definition: timing.c:166
enum ast_timer_event ast_timer_get_event(const struct ast_timer *handle)
Retrieve timing event.
Definition: timing.c:186
struct ast_timer * ast_timer_open(void)
Open a timer.
Definition: timing.c:122
@ AST_TIMING_EVENT_EXPIRED
Definition: timing.h:58
int ast_timer_fd(const struct ast_timer *handle)
Get a poll()-able file descriptor for a timer.
Definition: timing.c:161
Support for translation of data formats. translate.c.
struct ast_frame * ast_translate(struct ast_trans_pvt *tr, struct ast_frame *f, int consume)
Translates one or more frames. Apply an input frame into the translator and receive zero or one output frames.
Definition: translate.c:566
void ast_translator_free_path(struct ast_trans_pvt *tr)
Frees a translator path. Frees the given translator path structure.
Definition: translate.c:476
struct ast_trans_pvt * ast_translator_build_path(struct ast_format *dest, struct ast_format *source)
Builds a translator path. Build a path (possibly NULL) from source to dest.
Definition: translate.c:486
#define ast_test_flag(p, flag)
Definition: utils.h:63
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:978
#define MIN(a, b)
Definition: utils.h:249
int ast_wait_for_input(int fd, int ms)
Definition: utils.c:1734
#define ast_pthread_create_detached_background(a, b, c, d)
Definition: utils.h:634
int ast_uri_verify_encoded(const char *string)
Verify if a string is valid as a URI component.
Definition: utils.c:781
Universally unique identifier support.
#define AST_UUID_STR_LEN
Definition: uuid.h:27
char * ast_uuid_generate_str(char *buf, size_t size)
Generate a UUID string.
Definition: uuid.c:141
void ast_websocket_client_add_uri_params(struct ast_websocket_client *wc, const char *uri_params)
Add additional parameters to the URI.
struct ast_websocket_client * ast_websocket_client_retrieve_by_id(const char *id)
Retrieve a websocket client object by ID.
struct ast_websocket * ast_websocket_client_connect(struct ast_websocket_client *wc, void *lock_obj, const char *display_name, enum ast_websocket_result *result)
Connect to a websocket server using the configured authentication, retry and TLS options.