65 const pj_sockaddr_t *rem_addr,
68 pjsip_transport_callback callback)
71 uint64_t
len = tdata->buf.cur - tdata->buf.start;
106 shutdown(fd, SHUT_RDWR);
121 pj_atomic_destroy(wstransport->
transport.ref_cnt);
125 pj_lock_destroy(wstransport->
transport.lock);
132 if (wstransport->
rdata.tp_info.pool) {
133 pjsip_endpt_release_pool(wstransport->
transport.endpt, wstransport->
rdata.tp_info.pool);
142 pjsip_transport_shutdown(&wstransport->
transport);
165 pjsip_tp_state_callback state_cb;
168 struct pjsip_tpmgr *tpmgr = pjsip_endpt_get_tpmgr(endpt);
183 snprintf(newtransport->
transport.obj_name, PJ_MAX_OBJ_NAME,
"ws%p-%d",
188 if (!(pool = pjsip_endpt_create_pool(endpt,
"ws", 512, 512))) {
200 if (
status != PJ_SUCCESS) {
204 status = pj_lock_create_recursive_mutex(pool, pool->obj_name, &newtransport->
transport.lock);
205 if (
status != PJ_SUCCESS) {
220 ast_debug(4,
"Creating websocket transport for %s:%s\n",
221 newtransport->
transport.type_name, ws_addr_str);
224 strlen(newtransport->
transport.type_name) + strlen(ws_addr_str) +
sizeof(
" to "));
225 sprintf(newtransport->
transport.info,
"%s to %s", newtransport->
transport.type_name, ws_addr_str);
227 pj_sockaddr_parse(pj_AF_UNSPEC(), 0, pj_cstr(&
buf, ws_addr_str), &newtransport->
transport.key.rem_addr);
228 if (newtransport->
transport.key.rem_addr.addr.sa_family == pj_AF_INET6()) {
234 newtransport->
transport.addr_len = pj_sockaddr_get_len(&newtransport->
transport.key.rem_addr);
237 pj_sockaddr_parse(pj_AF_UNSPEC(), 0, pj_cstr(&
buf, ws_addr_str), &newtransport->
transport.local_addr);
243 newtransport->
transport.flag = pjsip_transport_get_flag_from_type((pjsip_transport_type_e)newtransport->
transport.key.type);
244 newtransport->
transport.dir = PJSIP_TP_DIR_INCOMING;
251 (pjsip_transport *)newtransport);
252 if (
status != PJ_SUCCESS) {
260 newtransport->
rdata.tp_info.pool = pjsip_endpt_create_pool(endpt,
"rtd%p",
261 PJSIP_POOL_RDATA_LEN, PJSIP_POOL_RDATA_INC);
262 if (!newtransport->
rdata.tp_info.pool) {
264 pjsip_transport_destroy((pjsip_transport *)newtransport);
271 state_cb = pjsip_tpmgr_get_state_cb(newtransport->
transport.tpmgr);
273 pjsip_transport_state_info state_info;
275 memset(&state_info, 0,
sizeof(state_info));
276 state_cb(&newtransport->
transport, PJSIP_TP_STATE_CONNECTED, &state_info);
301 pjsip_rx_data *rdata = &newtransport->
rdata;
306 pj_gettimeofday(&rdata->pkt_info.timestamp);
309 pj_memcpy(rdata->pkt_info.packet, read_data->
payload, pjsip_pkt_len);
310 rdata->pkt_info.len = pjsip_pkt_len;
311 rdata->pkt_info.zero = 0;
314 rdata->pkt_info.src_addr_len =
sizeof(rdata->pkt_info.src_addr);
319 recvd = pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr, rdata);
321 pj_pool_reset(rdata->tp_info.pool);
328 int write_timeout = -1;
347 ast_debug(5,
"Found %s transport with write timeout: %d\n",
437 pjsip_param *x_orig_host;
440#define MAX_PORT_LEN 5
442 if (rdata->msg_info.msg->type != PJSIP_REQUEST_MSG ||
443 rdata->msg_info.msg->line.req.method.id != PJSIP_REGISTER_METHOD) {
447 ast_debug(1,
"Saving contact '%.*s:%d'\n",
448 (
int)uri->host.slen, uri->host.ptr, uri->port);
450 x_orig_host = PJ_POOL_ALLOC_T(rdata->tp_info.pool, pjsip_param);
451 x_orig_host->name = pj_strdup3(rdata->tp_info.pool,
"x-ast-orig-host");
453 p_value.ptr = (
char*)pj_pool_alloc(rdata->tp_info.pool, p_value.slen + 1);
454 p_value.slen = snprintf(p_value.ptr, p_value.slen + 1,
"%.*s:%d", (
int)uri->host.slen, uri->host.ptr, uri->port);
455 pj_strassign(&x_orig_host->value, &p_value);
456 pj_list_insert_before(&uri->other_param, x_orig_host);
466 static const pj_str_t STR_WS = {
"ws", 2 };
467 pjsip_contact_hdr *contact;
469 long type = rdata->tp_info.transport->key.type;
475 contact = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_CONTACT,
NULL);
478 && (PJSIP_URI_SCHEME_IS_SIP(contact->uri) || PJSIP_URI_SCHEME_IS_SIPS(contact->uri))) {
479 pjsip_sip_uri *uri = pjsip_uri_get_uri(contact->uri);
480 const pj_str_t *txp_str = &STR_WS;
488 const char *ipv6_s =
"", *ipv6_e =
"";
490 if (pj_strchr(&uri->host,
':')) {
495 ast_log(
LOG_DEBUG,
"%s re-writing Contact URI from %s%.*s%s:%d%s%.*s to %s;transport=%s\n",
496 pjsip_rx_data_get_info(rdata),
497 ipv6_s, (
int) pj_strlen(&uri->host), pj_strbuf(&uri->host), ipv6_e, uri->port,
498 pj_strlen(&uri->transport_param) ?
";transport=" :
"",
499 (
int) pj_strlen(&uri->transport_param), pj_strbuf(&uri->transport_param),
500 pj_sockaddr_print(&rdata->pkt_info.src_addr, src_addr_buffer,
sizeof(src_addr_buffer), 3),
504 pj_strdup2(rdata->tp_info.pool, &uri->host, rdata->pkt_info.src_name);
505 uri->port = rdata->pkt_info.src_port;
506 pj_strdup(rdata->tp_info.pool, &uri->transport_param, txp_str);
509 rdata->msg_info.via->rport_param = 0;
515 .name = {
"WebSocket Transport Module", 26 },
517 .priority = PJSIP_MOD_PRIORITY_TRANSPORT_LAYER,
525 if (
session->inv_session->state == PJSIP_INV_STATE_NULL) {
549 pjsip_transport_register_type(PJSIP_TRANSPORT_RELIABLE | PJSIP_TRANSPORT_SECURE,
"ws", 5060, &
transport_type_wss);
550 pjsip_transport_register_type(PJSIP_TRANSPORT_RELIABLE | PJSIP_TRANSPORT_SECURE | PJSIP_TRANSPORT_IPV6,
"ws", 5060, &
transport_type_wss_ipv6);
581 .
requires =
"res_pjsip,res_http_websocket",
Asterisk main include file. File version handling, generic pbx functions.
static struct ast_mansession session
#define ao2_iterator_next(iter)
@ AO2_ALLOC_OPT_LOCK_NOLOCK
#define ao2_t_alloc_options(data_size, destructor_fn, options, debug_msg)
Allocate and initialize an object.
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
static struct ao2_container * transport_states
static int len(struct ast_channel *chan, const char *cmd, char *data, char *buf, size_t buflen)
struct ast_taskprocessor * ast_sip_create_serializer(const char *name)
Create a new serializer for SIP tasks.
int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int(*sip_task)(void *), void *task_data)
Push a task to the serializer and wait for it to complete.
Support for WebSocket connections within the Asterisk HTTP server and client WebSocket connections to...
int ast_websocket_is_secure(struct ast_websocket *session)
Get whether the WebSocket session is using a secure transport or not.
int ast_websocket_remove_protocol(const char *name, ast_websocket_callback callback)
Remove a sub-protocol handler from the default /ws server.
int ast_websocket_set_timeout(struct ast_websocket *session, int timeout)
Set the timeout on a non-blocking WebSocket session.
int ast_websocket_close(struct ast_websocket *session, uint16_t reason)
Close a WebSocket session by sending a message with the CLOSE opcode and an optional code.
int ast_websocket_set_nonblock(struct ast_websocket *session)
Set the socket of a WebSocket session to be non-blocking.
int ast_websocket_read(struct ast_websocket *session, char **payload, uint64_t *payload_len, enum ast_websocket_opcode *opcode, int *fragmented)
Read a WebSocket frame and handle it.
struct ast_sockaddr * ast_websocket_remote_address(struct ast_websocket *session)
Get the remote address for a WebSocket connected session.
int ast_websocket_wait_for_input(struct ast_websocket *session, int timeout)
Wait for the WebSocket session to be ready to be read.
struct ast_sockaddr * ast_websocket_local_address(struct ast_websocket *session)
Get the local address for a WebSocket connection session.
void ast_websocket_ref(struct ast_websocket *session)
Increase the reference count for a WebSocket session.
#define AST_DEFAULT_WEBSOCKET_WRITE_TIMEOUT
Default websocket write timeout, in ms.
ast_websocket_opcode
WebSocket operation codes.
@ AST_WEBSOCKET_OPCODE_BINARY
@ AST_WEBSOCKET_OPCODE_CLOSE
@ AST_WEBSOCKET_OPCODE_TEXT
void ast_websocket_unref(struct ast_websocket *session)
Decrease the reference count for a WebSocket session.
int ast_websocket_fd(struct ast_websocket *session)
Get the file descriptor for a WebSocket session.
int ast_websocket_write(struct ast_websocket *session, enum ast_websocket_opcode opcode, char *payload, uint64_t payload_size)
Construct and transmit a WebSocket frame.
int ast_websocket_add_protocol(const char *name, ast_websocket_callback callback)
Add a sub-protocol handler to the default /ws server.
#define DEBUG_ATLEAST(level)
#define ast_debug(level,...)
Log a DEBUG message.
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Asterisk module definitions.
#define AST_MODULE_INFO(keystr, flags_to_set, desc, fields...)
@ AST_MODULE_SUPPORT_CORE
#define ASTERISK_GPL_KEY
The text the key() function should return.
@ AST_MODULE_LOAD_SUCCESS
@ AST_MODULE_LOAD_DECLINE
Module has failed to load, may be in an inconsistent state.
static char * ast_sockaddr_stringify(const struct ast_sockaddr *addr)
Wrapper around ast_sockaddr_stringify_fmt() with default format.
#define ast_sockaddr_port(addr)
Get the port number of a socket address.
static char * ast_sockaddr_stringify_addr(const struct ast_sockaddr *addr)
Wrapper around ast_sockaddr_stringify_fmt() to return an address only.
#define AST_SOCKADDR_BUFLEN
void ast_sip_unregister_service(pjsip_module *module)
struct ao2_container * ast_sip_get_transport_states(void)
Retrieves all transport states.
int ast_sip_register_service(pjsip_module *module)
Register a SIP service in Asterisk.
pjsip_endpoint * ast_sip_get_pjsip_endpoint(void)
Get a pointer to the PJSIP endpoint.
@ AST_SIP_SUPPLEMENT_PRIORITY_FIRST
struct ast_sorcery * ast_sip_get_sorcery(void)
Get a pointer to the SIP sorcery structure.
#define ast_sip_session_register_supplement(supplement)
void ast_sip_session_unregister_supplement(struct ast_sip_session_supplement *supplement)
Unregister a an supplement to SIP session processing.
static void save_orig_contact_host(pjsip_rx_data *rdata, pjsip_sip_uri *uri)
static void transport_dtor(void *arg)
static int get_write_timeout(void)
static int ws_obj_name_serial
static void websocket_outgoing_invite_request(struct ast_sip_session *session, struct pjsip_tx_data *tdata)
Function called when an INVITE goes out.
static pj_status_t ws_send_msg(pjsip_transport *transport, pjsip_tx_data *tdata, const pj_sockaddr_t *rem_addr, int addr_len, void *token, pjsip_transport_callback callback)
Send a message over the WebSocket connection.
static pjsip_module websocket_module
static pj_status_t ws_shutdown(pjsip_transport *transport)
Shut down the pjsip transport.
static int transport_create(void *data)
Create a pjsip transport.
static int transport_type_wss_ipv6
static struct ast_taskprocessor * create_websocket_serializer(void)
static struct ast_sip_session_supplement websocket_supplement
Supplement for adding Websocket functionality to dialog.
static void websocket_cb(struct ast_websocket *session, struct ast_variable *parameters, struct ast_variable *headers)
WebSocket connection handler.
static int transport_type_wss
WebSocket transport module.
static int transport_shutdown(void *data)
static int load_module(void)
static pj_status_t ws_destroy(pjsip_transport *transport)
Destroy the pjsip transport.
static int transport_read(void *data)
Pass WebSocket data into pjsip transport manager.
static int unload_module(void)
static pj_bool_t websocket_on_rx_msg(pjsip_rx_data *rdata)
Store the transport a message came in on, so it can be used for outbound messages to that contact.
void * ast_sorcery_retrieve_by_id(const struct ast_sorcery *sorcery, const char *type, const char *id)
Retrieve an object using its unique identifier.
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
A supplement to SIP message processing.
A structure describing a SIP session.
Structure for SIP transport information.
A ast_taskprocessor structure is a singleton by name.
Structure for variables, used for configurations and for channel variables.
Structure definition for session.
struct ws_transport * transport
struct ast_websocket * ws_session
struct ws_transport * transport
Wrapper for pjsip_transport, for storing the WebSocket session.
pjsip_transport transport
struct ast_websocket * ws_session
An API for managing task processing threads that can be shared across modules.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format,...)
Build a taskprocessor name with a sequence number on the end.
#define AST_TASKPROCESSOR_MAX_NAME
Suggested maximum taskprocessor name length (less null terminator).