I was working with a network contains Rpi and esp8266. I am trying to understand the DHCP message handshaking. From the tests, I understood that, esp8266 will initiate the DHCPDISCOVER messages in intervals 2,4,8,16sec if no DHCPOFFER is received. This test was carried out by blocking the specific esp8266 in dnsmasq.conf file.
Now I need to test, what if the DHCPACK is not received at the esp8266. Will they reissue the command DHCPREQUEST? or DHCPDISCOVER?
But DHCP server running in rpi(dnsmasq) has no provision for blocking the DHCPACK signal. So how can I do the test?
I tried to simulate the DHCP server using python but nothing is worked.
Anyway! I got my answer. This may be useful for others and that's why i'm sharing it.
I made the DHCP server using python and I was able to control the DHCP packets sent by the server. The code is below.
import os
os.sys.path.append('/usr/local/lib/python3.6/site-packages')
# DHCP Discover Package
from scapy.all import *
import binascii
import _thread
import time
import logging
from logging.handlers import RotatingFileHandler
log_formatter = logging.Formatter('%(asctime)s : %(message)s')
logFile = '/home/pi/simulation/dhcp.log'
rotate_handler = RotatingFileHandler(logFile, mode='a', maxBytes=1024*1024,
backupCount=1)
rotate_handler.setFormatter(log_formatter)
Logging = logging.getLogger()
Logging.setLevel(logging.DEBUG)
Logging.addHandler(rotate_handler)
Version = "1.0.0"
Logging.debug("DHCP server "+ Version)
interface="eth1"
server_mac="70:88:6b:82:7d:30"
client_ip="192.168.42.25"
server_ip="192.168.42.1"
subnet_mask="255.255.255.0"
lease_time=86400
renewal_time = int(lease_time/2)
rebind_time = int(float(lease_time)*.875)
block_ack=True
block_offer=True
# Define a function for threads
def sent_discover( threadName, delay):
while(1):
xid_random = random.randint(1, 900000000)
mac_random = str(RandMAC())
print(mac_random)
client_mac_id = binascii.unhexlify(mac_random.replace(':', ''))
dhcp_discover = Ether(src=mac_random, dst="ff:ff:ff:ff:ff:ff") / IP(src="0.0.0.0", dst="255.255.255.255") / UDP(sport=68, dport=67) / BOOTP(chaddr=client_mac_id, xid=xid_random) / DHCP(options=[("message-type", "discover"), "end"])
sendp(dhcp_discover, iface=interface)
time.sleep(delay)
def sniff_discover(threadName, delay):
print("sniff called")
def detect_dhcp(pkt):
#print("Detecting..")
if DHCP in pkt:
##Below DHCP server
if pkt[DHCP].options[0][1] == 1:
Logging.debug("DHCPDISCOVER(%s) %s"%(interface, pkt[Ether].src))
Ether_Request = Ether(src=server_mac, dst=pkt[Ether].src)
IP_Request = IP(src=server_ip, dst=client_ip)
UDP_Request = UDP(sport=67, dport=68)
BOOTP_Request = BOOTP(op=2,yiaddr=client_ip,siaddr=server_ip,chaddr=pkt[BOOTP].chaddr, xid=pkt[BOOTP].xid)
DHCP_Request = DHCP(options=[("message-type", 'offer'), ("server_id", server_ip),
("lease_time", lease_time), ("renewal_time", renewal_time),
("rebinding_time", rebind_time), ("subnet_mask", subnet_mask),
("broadcast_address", "192.168.42.255"), ("router", "192.168.42.1"),
("name_server", "192.168.42.1"), ("domain", "pyClar"), "end"])
Offer = Ether_Request / IP_Request / UDP_Request / BOOTP_Request / DHCP_Request
if not block_offer:
sendp(Offer, iface=interface)
Logging.debug("DHCPOFFER(%s) %s %s"%(interface, client_ip, pkt[Ether].src))
print("DHCPOFFER sent")
else:
print("DHCPOFFER ignored")
if pkt[DHCP].options[0][1] == 3:
Logging.debug("DHCPREQUEST(%s) %s %s"%(interface, client_ip, pkt[Ether].src))
Ether_Request = Ether(src=server_mac, dst=pkt[Ether].src)
IP_Request = IP(src=server_ip, dst=client_ip)
UDP_Request = UDP(sport=67, dport=68)
BOOTP_Request = BOOTP(op=2,yiaddr=client_ip,siaddr=server_ip,chaddr=pkt[BOOTP].chaddr, xid=pkt[BOOTP].xid)
DHCP_Request = DHCP(options=[("message-type", 'ack'), ("server_id", server_ip),
("lease_time", lease_time), ("renewal_time", renewal_time),
("rebinding_time", rebind_time), ("subnet_mask", subnet_mask),
("broadcast_address", "192.168.42.255"), ("router", "192.168.42.1"),
("name_server", "192.168.42.1"), ("domain", "pyClar"), "end"])
Ack = Ether_Request / IP_Request / UDP_Request / BOOTP_Request / DHCP_Request
if not block_ack:
sendp(Ack, iface=interface)
Logging.debug("DHCPACK(%s) %s %s"%(interface, client_ip, pkt[Ether].src))
print("DHCPACK sent")
else:
print("DHCPACK is ignored")
sniff(filter="port 67", iface=interface, prn=detect_dhcp)
try:
_thread.start_new_thread( sniff_discover, ("Thread-1", 0, ) )
#_thread.start_new_thread( sent_discover, ("Thread-2", 10, ) )
except:
print ("Error: Unable to start thread")
while 1:
pass