Search code examples
networkingraspberry-pi3esp8266dhcp

How to block DHCPACK from dnsmasq?


I am working with a network that contains a Raspberry Pi and an ESP8266, and I am trying to understand the DHCP message handshake. From my tests, I understood that the ESP8266 retransmits DHCPDISCOVER messages at intervals of 2, 4, 8, and 16 seconds if no DHCPOFFER is received. This test was carried out by blocking the specific ESP8266 in the dnsmasq.conf file.

Now I need to test what happens if the DHCPACK is not received by the ESP8266. Will it resend the DHCPREQUEST, or start over with a DHCPDISCOVER?

But the DHCP server running on the Raspberry Pi (dnsmasq) has no provision for blocking the DHCPACK message. So how can I run this test?

I tried to simulate the DHCP server using Python, but nothing worked.


Solution

  • Anyway, I got my answer. It may be useful to others, which is why I'm sharing it.

    I wrote a DHCP server in Python (using Scapy), which let me control which DHCP packets the server sends. The code is below.

    import os
    os.sys.path.append('/usr/local/lib/python3.6/site-packages')
    # DHCP Discover Package
    
    from scapy.all import *
    import binascii
    import _thread
    import time
    
    
    import logging
    from logging.handlers import RotatingFileHandler
    # Each log line is "<timestamp> : <message>".
    log_formatter = logging.Formatter('%(asctime)s : %(message)s')
    
    
    # Destination of the DHCP event log; the directory must already exist.
    logFile = '/home/pi/simulation/dhcp.log'
    
    # Append to the log and roll it over at 1 MiB, keeping one backup file.
    rotate_handler = RotatingFileHandler(logFile, mode='a', maxBytes=1024*1024, 
                                     backupCount=1)
    rotate_handler.setFormatter(log_formatter)
    
    # getLogger() without a name returns the root logger; the functions below
    # log every DHCP event through it at DEBUG level.
    Logging = logging.getLogger()
    Logging.setLevel(logging.DEBUG)
    Logging.addHandler(rotate_handler)
    
    
    Version = "1.0.0"
    Logging.debug("DHCP server "+ Version)
    
    # Network interface handed to scapy's sendp()/sniff() as ``iface``.
    interface="eth1"
    
    # Ethernet source address used for the replies built by the server.
    server_mac="70:88:6b:82:7d:30"
    # The single address placed in ``yiaddr`` of every OFFER/ACK (there is no pool).
    client_ip="192.168.42.25"
    # Used as IP source, ``siaddr`` and the ``server_id`` option of replies.
    server_ip="192.168.42.1"
    subnet_mask="255.255.255.0"
    # Value of the ``lease_time`` option (86400 s = 24 h, assuming seconds per
    # the DHCP lease-time option).
    lease_time=86400
    # Renewal timer is set to 50% of the lease, rebinding timer to 87.5%.
    renewal_time = int(lease_time/2)
    rebind_time = int(float(lease_time)*.875)
    
    # Test switches: when True the corresponding reply is built but never sent.
    # NOTE(review): with block_offer=True a client presumably never gets as far
    # as sending DHCPREQUEST, so set block_offer=False to exercise block_ack.
    block_ack=True
    block_offer=True
    # Define a function for threads
    def sent_discover(threadName, delay):
        """Broadcast a DHCPDISCOVER from a fresh random MAC every *delay* seconds.

        Runs forever, so it is meant to be started on its own thread.
        ``threadName`` is accepted for the thread-start call but is not used.
        Each iteration prints the MAC address it is about to use.
        """
        while True:
            # A new transaction id and client MAC for every DISCOVER.
            xid = random.randint(1, 900000000)
            mac_text = str(RandMAC())
            print(mac_text)
            # BOOTP wants the hardware address as raw bytes, not "aa:bb:..." text.
            chaddr = binascii.unhexlify(mac_text.replace(':', ''))

            ethernet = Ether(src=mac_text, dst="ff:ff:ff:ff:ff:ff")
            ip = IP(src="0.0.0.0", dst="255.255.255.255")
            udp = UDP(sport=68, dport=67)
            bootp = BOOTP(chaddr=chaddr, xid=xid)
            dhcp = DHCP(options=[("message-type", "discover"), "end"])

            sendp(ethernet / ip / udp / bootp / dhcp, iface=interface)
            time.sleep(delay)
    def _dhcp_message_type(options):
        """Return the numeric DHCP message type found in *options*, or None.

        *options* is the list scapy exposes as ``pkt[DHCP].options``. Besides
        ``(name, value)`` tuples it can contain bare strings such as ``"end"``
        or ``"pad"``, and the message type is not guaranteed to be the first
        entry, so the whole list is scanned instead of trusting index 0.
        """
        for option in options:
            if (isinstance(option, tuple) and len(option) >= 2
                    and option[0] == "message-type"):
                return option[1]
        return None


    def _build_dhcp_reply(pkt, message_type):
        """Build the server reply (``'offer'`` or ``'ack'``) answering *pkt*.

        The reply is addressed to the requesting client's MAC, echoes its
        ``chaddr`` and ``xid``, and always hands out the fixed ``client_ip``.
        """
        ethernet = Ether(src=server_mac, dst=pkt[Ether].src)
        ip = IP(src=server_ip, dst=client_ip)
        udp = UDP(sport=67, dport=68)
        bootp = BOOTP(op=2, yiaddr=client_ip, siaddr=server_ip,
                      chaddr=pkt[BOOTP].chaddr, xid=pkt[BOOTP].xid)
        dhcp = DHCP(options=[("message-type", message_type), ("server_id", server_ip),
                             ("lease_time", lease_time), ("renewal_time", renewal_time),
                             ("rebinding_time", rebind_time), ("subnet_mask", subnet_mask),
                             ("broadcast_address", "192.168.42.255"), ("router", "192.168.42.1"),
                             ("name_server", "192.168.42.1"), ("domain", "pyClar"), "end"])
        return ethernet / ip / udp / bootp / dhcp


    def sniff_discover(threadName, delay):
        """Sniff DHCP traffic on ``interface`` and answer it like a DHCP server.

        DHCPDISCOVER is answered with a DHCPOFFER and DHCPREQUEST with a
        DHCPACK, unless the module-level ``block_offer`` / ``block_ack``
        switches are set, in which case the reply is deliberately withheld.
        Every received message and every reply sent is written to the log.

        ``threadName`` and ``delay`` are accepted for the thread-start call but
        are not used. The call blocks inside scapy's ``sniff()``.
        """
        print("sniff called")

        def detect_dhcp(pkt):
            """Per-packet callback: classify the DHCP message and maybe reply."""
            if DHCP not in pkt:
                return

            message_type = _dhcp_message_type(pkt[DHCP].options)

            if message_type == 1:  # DHCPDISCOVER
                Logging.debug("DHCPDISCOVER(%s) %s", interface, pkt[Ether].src)
                if block_offer:
                    print("DHCPOFFER ignored")
                    return
                sendp(_build_dhcp_reply(pkt, 'offer'), iface=interface)
                Logging.debug("DHCPOFFER(%s) %s %s", interface, client_ip, pkt[Ether].src)
                print("DHCPOFFER sent")
            elif message_type == 3:  # DHCPREQUEST
                Logging.debug("DHCPREQUEST(%s) %s %s", interface, client_ip, pkt[Ether].src)
                if block_ack:
                    print("DHCPACK is ignored")
                    return
                sendp(_build_dhcp_reply(pkt, 'ack'), iface=interface)
                Logging.debug("DHCPACK(%s) %s %s", interface, client_ip, pkt[Ether].src)
                print("DHCPACK sent")

        # "port 67" also captures the server's own replies; those carry other
        # message types and therefore fall through detect_dhcp untouched.
        sniff(filter="port 67", iface=interface, prn=detect_dhcp)
    try:
        # sniff_discover blocks inside scapy's sniff(), so it gets its own thread.
        _thread.start_new_thread(sniff_discover, ("Thread-1", 0))
        # Uncomment to also run the built-in DISCOVER generator every 10 seconds.
        #_thread.start_new_thread( sent_discover, ("Thread-2", 10, ) )
    except Exception as exc:
        # Narrower than a bare ``except`` so Ctrl-C / SystemExit still propagate.
        print("Error: Unable to start thread")
        Logging.debug("Unable to start thread: %s", exc)

    # Threads started through _thread do not keep the interpreter alive, so the
    # main thread has to stay around. Sleep instead of spinning: a ``pass`` loop
    # pins a CPU core and competes with the sniffer thread for the GIL.
    while True:
        time.sleep(1)