I made a custom packet layer by the module "Scapy". I want to use the sr
function in the same module to send a UDP packet carrying this custom layer as its payload to a remote device and sniff responses from the device. Below is my code to implement for this purpose:
from scapy.all import *
PORTNUMBER = 8888
class MyLayer(Packet):
name = "MyLayer"
fields_desc = [BitField("id", 1, 16),
FieldLenField("len", None, length_of = "data"),
StrLenField("data", "", length_from = lambda pkt: pkt.len)]
def answers(self, other):
print(">>> Running answers of MyLayer")
if other.__class__ == self.__class__:
return 1
return 0
bind_layers(UDP, MyLayer, dport = PORTNUMBER)
if __name__ == "__main__":
src = "192.168.1.200"
dst = "192.168.1.255"
ipHeader = IP(src = src, dst = dst)
udpHeader = UDP(sport = PORTNUMBER, dport = PORTNUMBER)
myLayer = MyLayer(data = "abcd1234")
reqPacket = ipHeader/udpHeader/myLayer
results, _ = sr(reqPacket,
timeout = 2,
filter = f"udp dst port {PORTNUMBER}",
iface = "NIC for Test")
As the code above illustrates, I create the custom protocol layer called MyLayer
which is the custom protocol I need. I hope sr
will use the answers
function in the MyLayer
class to check if a sniffed packet should be regarded as a response for the packet it sent. And to confirm that the answers
function of MyLayer
is called by sr
, I print the message ">>> Running answers of MyLayer" in the beginning of the answers
function.
However, this code does not work. The response packets from the remote device have all been disregarded by the sr
function. And the message ">>> Running answers of MyLayer" has never been printed. I think the reason for why the code above did not work is that the received packet failed to pass check of sr
function by the answer
function of the IP
layer.
Can anyone tell me if there is any way to make the sr
function able to regard the received packets as legitimate answers for the packet it sent? Thank you in advance.
Thanks to the reply of MonteChrist0, I found that the root cause of my problem is related to the broadcast IP address of the request packet. So I add the line conf.checkIPaddr = 0
to my code. Moreover, the multi
parameter of the sr
function has to be assigned the value True
or the function will only return the request packet as the answer. Below is the code which works for me:
from scapy.all import *
PORTNUMBER = 8888
conf.checkIPaddr = 0
class MyLayer(Packet):
name = "MyLayer"
fields_desc = [BitField("id", 1, 16),
FieldLenField("len", None, length_of = "data"),
StrLenField("data", "", length_from = lambda pkt: pkt.len)]
if __name__ == "__main__":
src = "192.168.1.200"
dst = "192.168.1.255"
ipHeader = IP(src = src, dst = dst)
udpHeader = UDP(sport = PORTNUMBER, dport = PORTNUMBER)
myLayer = MyLayer(data = "abcd1234")
reqPacket = ipHeader/udpHeader/myLayer
results, _ = sr(reqPacket,
timeout = 2,
filter = f"udp dst port {PORTNUMBER}",
multi = True,
iface = "NIC for Test")
I remove the lines for the answers
function and bind_layers
in the code above to prove that adding the line conf.checkIPaddr = 0
alone is enough to make sc
to recognize the response packets for the broadcast packets sent by sr
as answers in my case.
But as MonteChrist0 pointed out earlier, if we want to implement the answers
function of the MyLayer
class and want it to be called by sr
, the line for the bind_layers
will be required. So a more complete version of the code will be as follows:
from scapy.all import *
PORTNUMBER = 8888
conf.checkIPaddr = 0
class MyLayer(Packet):
name = "MyLayer"
fields_desc = [BitField("id", 1, 16),
FieldLenField("len", None, length_of = "data"),
StrLenField("data", "", length_from = lambda pkt: pkt.len)]
def answers(self, other):
if other.__class__ == self.__class__:
return 1
return 0
bind_layers(UDP, MyLayer, dport = PORTNUMBER)
if __name__ == "__main__":
nic = "192.168.1.200"
dst = "192.168.1.255"
ipHeader = IP(src = nic, dst = dst)
udpHeader = UDP(sport = PORTNUMBER, dport = PORTNUMBER)
myLayer = MyLayer(data = "abcd1234")
reqPacket = ipHeader/udpHeader/myLayer
results, _ = sr(reqPacket,
timeout = 2,
filter = f"dst host {nic} and udp dst port {PORTNUMBER}",
multi = True,
iface = "NIC for Test")
( I think it should be better to filter the request packet by the filter
parameter of sr
so that the request packet will not be regarded as an answer packet by sr
. That is why I modify the condition following the filter
parameter in the code above as well. )