Search code examples
pythonbufferscapypcapbytesio

Writing pcap to Buffer with Scapy


I'm having trouble writing a pcap to a file buffer, it's important I do not touch disk for these pcap captures and yes they must be live.

sudo scapy
>>> import io
>>> cap = sniff(timeout=30)
>>> buf = io.BytesIO()
>>> wrpcap(buf, cap)
Traceback (most recent call last):
  File "<console>", line 1, in <module>
  File "/usr/lib/python2.7/dist-packages/scapy/utils.py", line 524, in wrpcap
    with PcapWriter(filename, *args, **kargs) as fdesc:
  File "/usr/lib/python2.7/dist-packages/scapy/utils.py", line 682, in __init__
    self.f = [open,gzip.open][gz](filename,append and "ab" or "wb", gz and 9 or bufsz)
TypeError: coercing to Unicode: need string or buffer, _io.BytesIO found

This usually happens when you do open(None), is this a bug in the PcapWriter function in Scapy Utils?

I've also tried this with: buf.seek(0) before writing and it still fails.


Solution

  • I got code from scapy (utils.py) and created memwrpcap which can write to io.BytesIO.

    buf = io.BytesIO()
    memwrpcap(buf, cap)
    

    (after writing it doesn't close buffer and you can move to beginning to read from buffer.)

    After that I used standard open() and write() to save data from io.BytesIO and compared this file with file created with wrpcap

    diff -c test-std.pcap test-mem.pcap
    

    and it seems they are identical so io.BytesIO has data in pcap format.

    Full code - memwrpcam, MemoryPcapWriter and code which I used to test it.

    #
    # from: scapy/utils.py
    #
    
    from scapy.all import *
    
    def memwrpcap(filename, pkt, *args, **kargs):
        """Write a list of packets to a pcap file
        gz: set to 1 to save a gzipped capture
        linktype: force linktype value
        endianness: "<" or ">", force endianness"""
    
        # use MemoryPcapWriter instead of PcapWriter
        with MemoryPcapWriter(filename, *args, **kargs) as fdesc:
            fdesc.write(pkt)
    
    
    class MemoryPcapWriter(PcapWriter):
        """A stream PCAP writer with more control than wrpcap()"""
        def __init__(self, filename, linktype=None, gz=False, endianness="", append=False, sync=False):
            """
            linktype: force linktype to a given value. If None, linktype is taken
                      from the first writter packet
            gz: compress the capture on the fly
            endianness: force an endianness (little:"<", big:">"). Default is native
            append: append packets to the capture file instead of truncating it
            sync: do not bufferize writes to the capture file
            """
    
            self.linktype = linktype
            self.header_present = 0
            self.append=append
            self.gz = gz
            self.endian = endianness
            self.filename=filename
            self.sync=sync
            bufsz=4096
            if sync:
                bufsz=0
    
            # use filename or file-like object 
            if isinstance(self.filename, str):
                self.f = [open,gzip.open][gz](filename,append and "ab" or "wb", gz and 9 or bufsz)
            else: # file-like object 
                self.f = filename
    
        def __exit__(self, exc_type, exc_value, tracback):
            self.flush()
            if isinstance(self.filename, str):
                self.close() # don't close file-like object
    
    
    # --- main ---
    
    #
    # run script with sudo
    #
    # compare results (on Linux)
    #    diff -s test-std.pcap test-mem.pcap
    #
    
    from scapy.all import *
    
    import io
    
    cap = sniff(timeout=5)
    
    # save to pcap file
    wrpcap('test-std.pcap', cap)
    
    # save to buffer
    buf = io.BytesIO()
    memwrpcap(buf, cap)
    
    # move to beginning and save to file
    #print('current position:', buf.tell())
    buf.seek(0)
    #print('current position:', buf.tell())
    
    with open('test-mem.pcap', 'wb') as fp:
        fp.write(buf.read())