Asterisk - The Open Source Telephony Project GIT-master-f36a736
spandspflow2pcap.py
Go to the documentation of this file.
1#!/usr/bin/env python
2# vim: set ts=8 sw=4 sts=4 et ai tw=79:
3'''
4Usage: ./spandspflow2pcap.py SPANDSP_LOG SENDFAX_PCAP
5
6Takes a log from Asterisk with SpanDSP, extracts the "received" data
7and puts it in a pcap file. Use 'fax set debug on' and configure
8logger.conf to get fax logs.
9
10Input data should look something like this::
11
12 [2013-08-07 15:17:34] FAX[23479] res_fax.c: FLOW T.38 Rx 5: IFP c0 ...
13
14Output data will look like a valid pcap file ;-)
15
16This allows you to reconstruct received faxes into replayable pcaps.
17
18Replaying is expected to be done by SIPp with sipp-sendfax.xml. The
19SIPp binary used for replaying must have image (fax) support. This means
20you'll need a version higher than 3.5.0 (unreleased when writing this),
21or the git master branch: https://github.com/SIPp/sipp
22
23
24Author: Walter Doekes, OSSO B.V. (2013,2015,2016,2019)
25License: Public Domain
26'''
27from base64 import b16decode
28from collections import namedtuple
29from datetime import datetime, timedelta
30from re import search
31from time import mktime
32from struct import pack
33import os
34import sys
35
36
37LOSSY = False
38EMPTY_RECOVERY = False
39
40
41IFP = namedtuple('IFP', 'date seqno data') # datetime, int, bytearray
42
43
44def n2b(text):
45 """
46 Convert "aa bb cc" to bytearray('\xaa\xbb\xcc').
47 """
48 return bytearray(
49 b16decode(text.replace(' ', '').replace('\n', '').upper()))
50
51
52class SkipPacket(Exception):
53 pass
54
55
56class FaxPcap(object):
57 PCAP_PREAMBLE = n2b(
58 'd4 c3 b2 a1 02 00 04 00'
59 '00 00 00 00 00 00 00 00'
60 'ff ff 00 00 71 00 00 00')
61
62 def __init__(self, outfile):
63 self.outfile = outfile
64 self.date = None
65 self.seqno = None
66 self.udpseqno = 128
67 self.prev_data = None
68
69 # Only do this if at pos 0?
70
71 def add(self, ifp):
72 """
73 Add the IFP packet.
74
75 T.38 basic format of UDPTL payload section with redundancy:
76
77 UDPTL_SEQNO
78 - 2 sequence number (big endian)
79 UDPTL_PRIMARY_PAYLOAD (T30?)
80 - 1 subpacket length (excluding this byte)
81 - 1 type of message (e.g. 0xd0 for data(?))
82 - 1 items in data field (e.g. 0x01)
83 - 2 length of data (big endian)
84 - N data
85 RECOVERY (optional)
86 - 2 count of previous seqno packets (big endian)
87 - N UDPTL_PRIMARY_PAYLOAD of (seqno-1)
88 - N UDPTL_PRIMARY_PAYLOAD of (seqno-2)
89 - ...
90 """
91 # First packet?
92 if self.seqno is None:
93 # Add preamble.
94 self._add_preamble()
95 # Start a second late (optional).
96 self._add_garbage(ifp.date)
97
98 # Set sequence, and fill with missing leading zeroes.
99 self.seqno = 0
100 for i in range(ifp.seqno):
101 self.add(IFP(date=ifp.date, seqno=i, data=bytearray([0])))
102
103 # Auto-increasing dates
104 if self.date is None or ifp.date > self.date:
105 self.date = ifp.date
106 elif ifp.date < self.date.replace(microsecond=0):
107 assert False, 'More packets than expected in 1s? {!r}/{!r}'.format(
108 ifp.date, self.date)
109 else:
110 self.date += timedelta(microseconds=9000)
111
112 # Add packet.
113 self.seqno = ifp.seqno
114 try:
115 self.outfile.write(self._make_packet(ifp.data))
116 except SkipPacket:
117 pass
118
119 def _add_preamble(self):
120 self.outfile.write(self.PCAP_PREAMBLE)
121
122 def _add_garbage(self, date):
123 if self.date is None or date > self.date:
124 self.date = date
125
126 self.seqno = 0xffff
127 self.outfile.write(self._make_packet(
128 bytearray(b'GARBAGE'), is_ifp=False))
129
130 def _make_packet(self, ifp_data, is_ifp=True):
131 sum16 = bytearray(b'\x43\x21') # the OS fixes the checksums for us
132
133 data = bytearray()
134 if is_ifp:
135 data.append(len(ifp_data)) # length
136 data.extend(ifp_data) # data
137 self.prev_data, prev_data = data[:], self.prev_data
138 else:
139 data.extend(ifp_data)
140 prev_data = None
141
142 if prev_data:
143 if LOSSY and (self.seqno % 3) == 2:
144 self.udpseqno += 1
145 raise SkipPacket()
146
147 if EMPTY_RECOVERY:
148 # struct ast_frame f[16], we have room for a few
149 # packets.
150 packets = 14
151 data.extend([0, packets + 1] + [0] * packets)
152 data.extend(prev_data)
153 else:
154 # Add 1 previous packet, without the seqno.
155 data.extend([0, 1])
156 data.extend(prev_data)
157
158 # Wrap it in UDP
159 udp = bytearray(
160 b'\x00\x01\x00\x02%(len)s%(sum16)s%(seqno)s%(data)s' % {
161 b'len': pack('>H', len(data) + 10),
162 b'sum16': sum16,
163 b'seqno': pack('>H', self.seqno),
164 b'data': data})
165
166 # Wrap it in IP
167 ip = bytearray(
168 b'\x45\xb8%(len)s%(udpseqno)s\x00\x00\xf9\x11%(sum16)s'
169 b'\x01\x01\x01\x01\x02\x02\x02\x02%(udp)s' % {
170 b'len': pack('>H', len(udp) + 20),
171 b'udpseqno': pack('>H', self.udpseqno),
172 b'sum16': sum16,
173 b'udp': udp})
174
175 # Wrap it in Ethernet
176 ethernet = bytearray(
177 b'\x00\x00\x00\x01\x00\x06\x00\x30\x48\xb1\x1c\x34\x00\x00'
178 b'\x08\x00%(ip)s' % {b'ip': ip})
179
180 # Wrap it in a pcap packet
181 packet = bytearray(b'%(prelude)s%(ethernet)s' % {
182 b'prelude': pack(
183 '<IIII', int(mktime(self.date.timetuple())),
184 self.date.microsecond, len(ethernet), len(ethernet)),
185 b'ethernet': ethernet})
186
187 # Increase values.
188 self.udpseqno += 1
189
190 return packet
191
192
194 def __init__(self, fp):
195 self._fp = fp
196
197 def __iter__(self):
198 r"""
199 Looks for lines line:
200
201 [2013-08-07 15:17:34] FAX[23479] res_fax.c: \
202 FLOW T.38 Rx 5: IFP c0 01 80 00 00 ff
203
204 And yields:
205
206 IFP(date=..., seqno=..., data=...)
207 """
208 prev_seqno = None
209
210 for lineno, line in enumerate(self._fp):
211 if 'FLOW T.38 Rx' not in line:
212 continue
213 if 'IFP' not in line:
214 continue
215
216 match = search(r'(\d{4})-(\d\d)-(\d\d) (\d\d):(\d\d):(\d\d)', line)
217 assert match
218 date = datetime(*[int(i) for i in match.groups()])
219
220 match = search(r'Rx\s*(\d+):', line)
221 assert match
222 seqno = int(match.groups()[0])
223
224 match = search(r': IFP ([0-9a-f ]+)', line)
225 assert match
226 data = n2b(match.groups()[0])
227
228 if prev_seqno is not None:
229 # Expected all sequence numbers. But you can safely disable
230 # this check.
231 assert seqno == prev_seqno + 1, '%s+1 != %s' % (
232 seqno, prev_seqno)
233 pass
234 prev_seqno = seqno
235
236 yield IFP(date=date, seqno=seqno, data=data)
237
238
239def main(logname, pcapname):
240 with open(sys.argv[1], 'r') as infile:
241 log = SpandspLog(infile)
242
243 # with open(sys.argv[2], 'xb') as outfile: # py3 exclusive write, bin
244 create_or_fail = os.O_CREAT | os.O_EXCL | os.O_WRONLY
245 try:
246 fd = os.open(sys.argv[2], create_or_fail, 0o600)
247 except Exception:
248 raise
249 else:
250 with os.fdopen(fd, 'wb') as outfile:
251 pcap = FaxPcap(outfile)
252 for data in log:
253 pcap.add(data)
254
255
256if __name__ == '__main__':
257 if len(sys.argv) != 3:
258 sys.stderr.write('Usage: {} LOGFILE PCAP\n'.format(sys.argv[0]))
259 sys.exit(1)
260
261 main(sys.argv[1], sys.argv[2])
def _add_garbage(self, date)
def _make_packet(self, ifp_data, is_ifp=True)
def __init__(self, outfile)
static int replace(struct ast_channel *chan, const char *cmd, char *data, struct ast_str **buf, ssize_t len)
Definition: func_strings.c:888
static int len(struct ast_channel *chan, const char *cmd, char *data, char *buf, size_t buflen)
def main(logname, pcapname)