Search code examples
pythonnetworkingscapybacnet

Scapy sr() with custom layers


When using the sr() or srp() function - how does Scapy know that a received packet is an answer to the packet I have sent?

I have written a custom protocol which imitates BACNet. I can send a WHO_IS packet to a BACNet device and the device answers with an I_AM packet, which is correctly disassembled because of layer binding, but the sr function does not recognize it as an answer. How do I make Scapy accept those packets as answers?

Update: Thats what my layer classes look like. I think the answers() methods look alright, but it still doesn't work. Is there anything I might have misunderstood when implementing the answers() methods? In my understanding self refers to the class of the layer itself and other is the received packet in question. So, in order to pass the payload on to the next higher layer I pass other.payload and call the answers() method with the next higher layers class. The layers are stacked like Ether/IP/UDP/BVLC/NPDU/APDU.

class BVLC(Packet):

    name = "BVLC"
    fields_desc = [
       # many fields
                   ]

    def post_build(self, pkt, pay):
        if self.length == None:                
            pkt = pkt[0:2] + struct.pack('!H', len(pay)+len(pkt)) + pkt[4:]  
        return pkt+pay

    def answers(self, other):
        if not isinstance(other,BVLC):
            return 0
        else:
            return NPDU.answers(other.payload)


class NPDU(Packet):

    name = "NPDU"
    fields_desc = [ 
        # again, many fields
                   ]

    def answers(self, other):
        if not isinstance(other,NPDU):
            return 0
        else:
            return APDU.answers(other.payload)

class APDU(Packet):

    name = "APDU"
    fields_desc = [
          # many fields and the only relevant field in this case
            ConditionalField(
                ByteEnumField(
                    "serviceChoice", None, APDU_Service_Unconfirmed.revDict()),
                    lambda pkt: pkt.pduType == APDU_Type.CONFIRMED_SERVICE_REQUEST or
                                pkt.pduType == APDU_Type.UNCONFIRMED_SERVICE_REQUEST), 
                   ]

    def answers(self, other):
        if not isinstance(other,APDU):
            return 0
        else:
            if self.serviceChoice == APDU_Service_Unconfirmed.WHO_IS and\
               other.serviceChoice == APDU_Service_Unconfirmed.I_AM:
                return 1
        return 0

Solution

  • Scapy sr just calls recv_packet.answers(sent_packet) for each packet that got sent. You will therefore have to implement def answers() for your layer, see:

    def answers(self, other):
        """DEV: true if self is an answer from other"""
        if other.__class__ == self.__class__:
            return self.payload.answers(other.payload)
        return 0
    

    Here's an excerpt of the original TCP layer:

    def answers(self, other):
        if not isinstance(other, TCP):
            return 0
        if conf.checkIPsrc:
            if not ((self.sport == other.dport) and
                    (self.dport == other.sport)):
                return 0
        if (abs(other.seq-self.ack) > 2+len(other.payload)):
            return 0
        return 1