首页 > 解决方案 > 如何阻止来自 dnsmasq 的 DHCPACK?

问题描述

我正在使用一个包含 Rpi 和 esp8266 的网络。我正在尝试了解 DHCP 消息握手。从测试中,我了解到,如果没有收到 DHCPOFFER,esp8266 将在 2、4、8、16 秒的时间间隔内启动 DHCPDISCOVER 消息。该测试是通过阻止 dnsmasq.conf 文件中的特定 esp8266 来进行的。

现在我需要测试另一种情况:如果 esp8266 没有收到 DHCPACK,会发生什么?它会重新发送 DHCPREQUEST,还是重新发送 DHCPDISCOVER?

但是在 rpi 上运行的 DHCP 服务器(dnsmasq)没有提供阻止 DHCPACK 报文的选项。那么我该如何进行测试呢?

我尝试使用 python 模拟 DHCP 服务器,但没有任何效果。

标签: networking raspberry-pi3 esp8266 dhcp

解决方案


总之,我找到了答案。这可能对其他人有用,所以在这里分享。

我使用python制作了DHCP服务器,并且能够控制服务器发送的DHCP数据包。代码如下。

import os
os.sys.path.append('/usr/local/lib/python3.6/site-packages')
# DHCP Discover Package

from scapy.all import *
import binascii
import _thread
import time


import logging
from logging.handlers import RotatingFileHandler
# File logging: every record is written as "<timestamp> : <message>" to a
# rotating file capped at 1 MiB, with a single backup file kept.
logFile = '/home/pi/simulation/dhcp.log'
log_formatter = logging.Formatter('%(asctime)s : %(message)s')

rotate_handler = RotatingFileHandler(
    logFile,
    mode='a',
    maxBytes=1024 * 1024,
    backupCount=1,
)
rotate_handler.setFormatter(log_formatter)

# The root logger is used, so records from any module end up in the same file.
Logging = logging.getLogger()
Logging.addHandler(rotate_handler)
Logging.setLevel(logging.DEBUG)

# Record which version of the script produced the log entries that follow.
Version = "1.0.0"
Logging.debug("DHCP server "+ Version)

# Network interface the simulated server sniffs on and sends replies through.
interface = "eth1"

# Addresses placed in the replies: the server's own MAC/IP and the single
# address that is handed out to whichever client asks.
server_mac = "70:88:6b:82:7d:30"
client_ip = "192.168.42.25"
server_ip = "192.168.42.1"
subnet_mask = "255.255.255.0"

# Values sent in the lease_time / renewal_time / rebinding_time options:
# renewal is half of the lease and rebinding is 87.5% of it.
lease_time = 86400
renewal_time = lease_time // 2
rebind_time = lease_time * 7 // 8

# Test switches: when True the corresponding reply is built but never sent,
# so the client's retry behaviour can be observed.
block_ack = True
block_offer = True
# Define a function for threads
def sent_discover(threadName, delay):
    """Broadcast a DHCPDISCOVER from a new random MAC every *delay* seconds.

    Runs forever and is meant to be started as a thread target. *threadName*
    is accepted to match the thread-start call but is not used.
    """
    while True:
        transaction_id = random.randint(1, 900000000)
        fake_mac = str(RandMAC())
        print(fake_mac)
        # BOOTP wants the hardware address as raw bytes, not "aa:bb:..." text.
        hw_addr = binascii.unhexlify(fake_mac.replace(':', ''))

        frame = Ether(src=fake_mac, dst="ff:ff:ff:ff:ff:ff")
        frame = frame / IP(src="0.0.0.0", dst="255.255.255.255")
        frame = frame / UDP(sport=68, dport=67)
        frame = frame / BOOTP(chaddr=hw_addr, xid=transaction_id)
        frame = frame / DHCP(options=[("message-type", "discover"), "end"])

        sendp(frame, iface=interface)
        time.sleep(delay)
def _dhcp_message_type(options):
    """Return the value of the "message-type" entry in *options*, or None.

    *options* is the option list of a DHCP layer: a mix of (name, value)
    tuples and bare strings such as "end". The list is scanned instead of
    reading options[0], so an empty list or a packet whose first option is
    something other than the message type is handled correctly.
    """
    for option in options:
        if isinstance(option, tuple) and len(option) > 1 and option[0] == "message-type":
            return option[1]
    return None


def _build_dhcp_reply(pkt, message_type):
    """Build the server reply of the given *message_type* ('offer' or 'ack') to *pkt*.

    The reply is addressed to the requesting client's MAC, echoes its
    transaction id and hardware address, and offers the configured client_ip.
    """
    return (
        Ether(src=server_mac, dst=pkt[Ether].src)
        / IP(src=server_ip, dst=client_ip)
        / UDP(sport=67, dport=68)
        / BOOTP(op=2, yiaddr=client_ip, siaddr=server_ip,
                chaddr=pkt[BOOTP].chaddr, xid=pkt[BOOTP].xid)
        / DHCP(options=[
            ("message-type", message_type),
            ("server_id", server_ip),
            ("lease_time", lease_time),
            ("renewal_time", renewal_time),
            ("rebinding_time", rebind_time),
            ("subnet_mask", subnet_mask),
            ("broadcast_address", "192.168.42.255"),
            ("router", "192.168.42.1"),
            ("name_server", "192.168.42.1"),
            ("domain", "pyClar"),
            "end",
        ])
    )


def sniff_discover(threadName, delay):
    """Act as a minimal DHCP server on *interface*.

    Sniffs traffic on port 67 and, for every DHCP packet seen:
      * message type 1 (DHCPDISCOVER) -> replies with a DHCPOFFER unless
        block_offer is set;
      * message type 3 (DHCPREQUEST)  -> replies with a DHCPACK unless
        block_ack is set.
    Any other message type is ignored. Blocks inside sniff(), so it is meant
    to be run as a thread target. *threadName* and *delay* are accepted to
    match the thread-start call but are not used.
    """
    print("sniff called")

    def detect_dhcp(pkt):
        # Per-packet callback for sniff(); returns None so nothing is printed.
        if DHCP not in pkt:
            return

        message_type = _dhcp_message_type(pkt[DHCP].options)

        if message_type == 1:
            Logging.debug("DHCPDISCOVER(%s) %s", interface, pkt[Ether].src)
            if block_offer:
                print("DHCPOFFER ignored")
                return
            sendp(_build_dhcp_reply(pkt, 'offer'), iface=interface)
            Logging.debug("DHCPOFFER(%s) %s %s", interface, client_ip, pkt[Ether].src)
            print("DHCPOFFER sent")
        elif message_type == 3:
            Logging.debug("DHCPREQUEST(%s) %s %s", interface, client_ip, pkt[Ether].src)
            if block_ack:
                print("DHCPACK is ignored")
                return
            sendp(_build_dhcp_reply(pkt, 'ack'), iface=interface)
            Logging.debug("DHCPACK(%s) %s %s", interface, client_ip, pkt[Ether].src)
            print("DHCPACK sent")

    sniff(filter="port 67", iface=interface, prn=detect_dhcp)
# Start the DHCP sniffer/responder in a background thread.
try:
    _thread.start_new_thread(sniff_discover, ("Thread-1", 0))
    # Optional traffic generator: enable to also broadcast a DHCPDISCOVER
    # from a random MAC every 10 seconds.
    #_thread.start_new_thread( sent_discover, ("Thread-2", 10, ) )
except Exception:
    # Narrower than a bare except so Ctrl-C / SystemExit are not swallowed.
    print ("Error: Unable to start thread")

# Keep the main thread alive so the background thread can keep running.
# Sleeping instead of spinning on `pass` avoids pinning a CPU core and
# competing with the sniffer thread for the interpreter.
while True:
    time.sleep(1)

推荐阅读