Search code examples
pythonpacket-sniffersimpacketpcapy

Packet sniffer in python using pcapy impacket


I'm trying to create a packet sniffer using pcapy and impacket. I'm stuck with data extraction phase. Unfortunately impacket is not properly documented. At least i could n't find one. Could anyone tel me where to find the documentation or what functions i could use to extract data from captured packet?

edit

my current code

import datetime
import pcapy
import sys
from impacket.ImpactPacket import *
from impacket.ImpactDecoder import *

def main(argv):

    dev='ppp0'
    print "Sniffing device " + dev

    cap = pcapy.open_live(dev , 65536 , 1 , 0)

    while(1) :
        try:
            (header, packet) = cap.next()            
            eth= LinuxSLLDecoder().decode(packet)
            ip=eth.child()  #internet layer
            trans=ip.child()#transport layer

            try:                
                print 'protocol=',
                if ip.get_ip_p() == UDP.protocol:
                    print 'UDP'
                if ip.get_ip_p() == TCP.protocol:
                    print 'TCP','port=',trans.get_th_dport()
                    print trans.child()
                if ip.get_ip_p() == ICMP.protocol:
                    print 'ICMP'

                print 'src=',ip.get_ip_src(),'dest=',ip.get_ip_dst()

                print ''

            except:
                pass


        except pcapy.PcapError:
            continue             


if __name__ == "__main__":
  main(sys.argv)

Sample Output

src= xxx.xxx.xxx.xx dest= xx.xxx.xx.xx

protocol= TCP port= 443

1703 0300 2400 0000 0000 0000 07e2 a2a5    ....$...........
09fe 5b15 3cf1 803d 0c83 8ada 082e 8269    ..[.<..=.......i
0007 8b33 7d6b 5c1a 01                     ...3}k\..

What i want to do is extract more data, For example extract the url (if there is a url in packet)


Solution

  • I ran into similar problem. I guess when there is no documentation, the best documentation is the source code! And with python we are lucky to have source code most of the time. Anyway, I would suggest looking into ImpactDecoder.py and ImpactPacket.py. First one give some insights as far as how packets get decoded and second gives information on actual packets as a class and their methods. For instance, ImpactPacket.py and class PacketBuffer has following methods that you were probably looking for::

    def set_bytes_from_string(self, data):
        "Sets the value of the packet buffer from the string 'data'"
        self.__bytes = array.array('B', data)
    
    def get_buffer_as_string(self):
        "Returns the packet buffer as a string object"
        return self.__bytes.tostring()
    
    def get_bytes(self):
        "Returns the packet buffer as an array"
        return self.__bytes
    
    def set_bytes(self, bytes):
        "Set the packet buffer from an array"
        # Make a copy to be safe
        self.__bytes = array.array('B', bytes.tolist())
    
    def set_byte(self, index, value):
        "Set byte at 'index' to 'value'"
        index = self.__validate_index(index, 1)
        self.__bytes[index] = value
    
    def get_byte(self, index):
        "Return byte at 'index'"
        index = self.__validate_index(index, 1)
        return self.__bytes[index]
    
    def set_word(self, index, value, order = '!'):
        "Set 2-byte word at 'index' to 'value'. See struct module's documentation to understand the meaning of 'order'."
        index = self.__validate_index(index, 2)
        ary = array.array("B", struct.pack(order + 'H', value))
        if -2 == index:
            self.__bytes[index:] = ary
        else:
            self.__bytes[index:index+2] = ary
    
    def get_word(self, index, order = '!'):
        "Return 2-byte word at 'index'. See struct module's documentation to understand the meaning of 'order'."
        index = self.__validate_index(index, 2)
        if -2 == index:
            bytes = self.__bytes[index:]
        else:
            bytes = self.__bytes[index:index+2]
        (value,) = struct.unpack(order + 'H', bytes.tostring())
        return value
    
    def set_long(self, index, value, order = '!'):
        "Set 4-byte 'value' at 'index'. See struct module's documentation to understand the meaning of 'order'."
        index = self.__validate_index(index, 4)
        ary = array.array("B", struct.pack(order + 'L', value))
        if -4 == index:
            self.__bytes[index:] = ary
        else:
            self.__bytes[index:index+4] = ary
    
    def get_long(self, index, order = '!'):
        "Return 4-byte value at 'index'. See struct module's documentation to understand the meaning of 'order'."
        index = self.__validate_index(index, 4)
        if -4 == index:
            bytes = self.__bytes[index:]
        else:
            bytes = self.__bytes[index:index+4]
        (value,) = struct.unpack(order + 'L', bytes.tostring())
        return value
    
    def set_long_long(self, index, value, order = '!'):
        "Set 8-byte 'value' at 'index'. See struct module's documentation to understand the meaning of 'order'."
        index = self.__validate_index(index, 8)
        ary = array.array("B", struct.pack(order + 'Q', value))
        if -8 == index:
            self.__bytes[index:] = ary
        else:
            self.__bytes[index:index+8] = ary
    
    def get_long_long(self, index, order = '!'):
        "Return 8-byte value at 'index'. See struct module's documentation to understand the meaning of 'order'."
        index = self.__validate_index(index, 8)
        if -8 == index:
            bytes = self.__bytes[index:]
        else:
            bytes = self.__bytes[index:index+8]
        (value,) = struct.unpack(order + 'Q', bytes.tostring())
        return value
    
    
    def get_ip_address(self, index):
        "Return 4-byte value at 'index' as an IP string"
        index = self.__validate_index(index, 4)
        if -4 == index:
            bytes = self.__bytes[index:]
        else:
            bytes = self.__bytes[index:index+4]
        return socket.inet_ntoa(bytes.tostring())
    
    def set_ip_address(self, index, ip_string):
        "Set 4-byte value at 'index' from 'ip_string'"
        index = self.__validate_index(index, 4)
        raw = socket.inet_aton(ip_string)
        (b1,b2,b3,b4) = struct.unpack("BBBB", raw)
        self.set_byte(index, b1)
        self.set_byte(index + 1, b2)
        self.set_byte(index + 2, b3)
        self.set_byte(index + 3, b4)
    

    The other super useful class from ImpactPacket.py is ProtocolLayer, that gives us following methods::

    def child(self):
        "Return the child of this protocol layer"
        return self.__child
    
    def parent(self):
        "Return the parent of this protocol layer"
        return self.__parent
    

    So, basically impacket uses matreshka doll approach, and you can go to any layer you want using child and parent methods and use any methods of the PacketBuffer class on any layer. Pretty cool, huh? Furthermore, particular layers (or packets) have their specific methods but you would have to go dig ImpactPacket.py and ImpactDecoder.py if you want to find more about those.

    Good luck and cheers mate!