4Usage: ./spandspflow2pcap.py SPANDSP_LOG SENDFAX_PCAP 
    6Takes a log from Asterisk with SpanDSP, extracts the "received" data 
    7and puts it in a pcap file. Use 'fax set debug on' and configure 
    8logger.conf to get fax logs. 
   10Input data should look something like this:: 
   12    [2013-08-07 15:17:34] FAX[23479] res_fax.c: FLOW T.38 Rx     5: IFP c0 ... 
   14Output data will look like a valid pcap file ;-) 
   16This allows you to reconstruct received faxes into replayable pcaps. 
   18Replaying is expected to be done by SIPp with sipp-sendfax.xml. The 
   19SIPp binary used for replaying must have image (fax) support. This means 
   20you'll need a version higher than 3.5.0 (unreleased when writing this), 
   21or the git master branch: https://github.com/SIPp/sipp 
   24Author: Walter Doekes, OSSO B.V. (2013,2015,2016,2019) 
   27from base64 
import b16decode
 
   28from collections 
import namedtuple
 
   29from datetime 
import datetime, timedelta
 
   31from time 
import mktime
 
   32from struct 
import pack
 
   41IFP = namedtuple(
'IFP', 
'date seqno data')  
 
   46    Convert "aa bb cc" to bytearray('\xaa\xbb\xcc'). 
   49        b16decode(text.replace(
' ', 
'').
replace(
'\n', 
'').upper()))
 
 
   58        'd4 c3 b2 a1 02 00 04 00' 
   59        '00 00 00 00 00 00 00 00' 
   60        'ff ff 00 00 71 00 00 00')
 
   75        T.38 basic format of UDPTL payload section with redundancy: 
   78        - 2 sequence number (big endian) 
   79        UDPTL_PRIMARY_PAYLOAD (T30?) 
   80        - 1 subpacket length (excluding this byte) 
   81        - 1 type of message (e.g. 0xd0 for data(?)) 
   82        - 1 items in data field (e.g. 0x01) 
   83        - 2 length of data (big endian) 
   86        - 2 count of previous seqno packets (big endian) 
   87        - N UDPTL_PRIMARY_PAYLOAD of (seqno-1) 
   88        - N UDPTL_PRIMARY_PAYLOAD of (seqno-2) 
   92        if self.
seqno is None:
 
  100            for i 
in range(ifp.seqno):
 
  101                self.
add(
IFP(date=ifp.date, seqno=i, data=bytearray([0])))
 
  104        if self.
date is None or ifp.date > self.
date:
 
  107            assert False, 
'More packets than expected in 1s? {!r}/{!r}'.format(
 
  110            self.
date += timedelta(microseconds=9000)
 
  113        self.
seqno = ifp.seqno
 
 
  123        if self.
date is None or date > self.
date:
 
  128            bytearray(b
'GARBAGE'), is_ifp=
False))
 
 
  131        sum16 = bytearray(b
'\x43\x21')  
 
  135            data.append(
len(ifp_data))  
 
  136            data.extend(ifp_data)       
 
  139            data.extend(ifp_data)
 
  143            if LOSSY 
and (self.
seqno % 3) == 2:
 
  151                data.extend([0, packets + 1] + [0] * packets)
 
  152                data.extend(prev_data)
 
  156                data.extend(prev_data)
 
  160            b
'\x00\x01\x00\x02%(len)s%(sum16)s%(seqno)s%(data)s' % {
 
  161                b
'len': pack(
'>H', 
len(data) + 10),
 
  163                b
'seqno': pack(
'>H', self.
seqno),
 
  168            b
'\x45\xb8%(len)s%(udpseqno)s\x00\x00\xf9\x11%(sum16)s' 
  169            b
'\x01\x01\x01\x01\x02\x02\x02\x02%(udp)s' % {
 
  170                b
'len': pack(
'>H', 
len(udp) + 20),
 
  171                b
'udpseqno': pack(
'>H', self.
udpseqno),
 
  176        ethernet = bytearray(
 
  177            b
'\x00\x00\x00\x01\x00\x06\x00\x30\x48\xb1\x1c\x34\x00\x00' 
  178            b
'\x08\x00%(ip)s' % {b
'ip': ip})
 
  181        packet = bytearray(b
'%(prelude)s%(ethernet)s' % {
 
  183                '<IIII', int(mktime(self.
date.timetuple())),
 
  184                self.
date.microsecond, 
len(ethernet), 
len(ethernet)),
 
  185            b
'ethernet': ethernet})
 
 
 
  199        Looks for lines line: 
  201            [2013-08-07 15:17:34] FAX[23479] res_fax.c: \ 
  202              FLOW T.38 Rx     5: IFP c0 01 80 00 00 ff 
  206            IFP(date=..., seqno=..., data=...) 
  210        for lineno, line 
in enumerate(self.
_fp):
 
  211            if 'FLOW T.38 Rx' not in line:
 
  213            if 'IFP' not in line:
 
  216            match = search(
r'(\d{4})-(\d\d)-(\d\d) (\d\d):(\d\d):(\d\d)', line)
 
  218            date = datetime(*[int(i) 
for i 
in match.groups()])
 
  220            match = search(
r'Rx\s*(\d+):', line)
 
  222            seqno = int(match.groups()[0])
 
  224            match = search(
r': IFP ([0-9a-f ]+)', line)
 
  226            data = 
n2b(match.groups()[0])
 
  228            if prev_seqno 
is not None:
 
  231                assert seqno == prev_seqno + 1, 
'%s+1 != %s' % (
 
  236            yield IFP(date=date, seqno=seqno, data=data)
 
 
 
  240    with open(sys.argv[1], 
'r') 
as infile:
 
  244        create_or_fail = os.O_CREAT | os.O_EXCL | os.O_WRONLY
 
  246            fd = os.open(sys.argv[2], create_or_fail, 0o600)
 
  250            with os.fdopen(fd, 
'wb') 
as outfile:
 
 
  256if __name__ == 
'__main__':
 
  257    if len(sys.argv) != 3:
 
  258        sys.stderr.write(
'Usage: {} LOGFILE PCAP\n'.format(sys.argv[0]))
 
  261    main(sys.argv[1], sys.argv[2])
 
_make_packet(self, ifp_data, is_ifp=True)
static int replace(struct ast_channel *chan, const char *cmd, char *data, struct ast_str **buf, ssize_t len)
static int len(struct ast_channel *chan, const char *cmd, char *data, char *buf, size_t buflen)