我将为你提供一个从简单到相对完整的实现思路和指南,并使用 Python 来演示核心代码片段,这只是一个教学示例,用于学习原理,不适合直接在生产环境使用。

1、权威DNS服务器:托管特定域区的域名记录(如example.com),并对其拥有权威性,当有人查询这些域名时,它提供最终的答案。
2、递归DNS服务器:接收客户端的查询请求,替客户端进行递归查询(询问根服务器 -> TLD 服务器 -> 权威服务器),最后将结果返回给客户端,我们常用的8.8.8.8 (Google DNS) 和1.1.1.1 (Cloudflare DNS) 就是递归服务器。
我们将实现一个简单的权威DNS服务器 和一个简单的递归DNS服务器 的核心部分。
这个服务器将监听 UDP 53 端口,并响应对特定域名的查询。
DNS 消息是二进制的,格式如下:

+---------------------+ | Header | +---------------------+ | Question | // 查询的问题 +---------------------+ | Answer | // 响应资源记录 (RRs) +---------------------+ | Authority | // 授权资源记录 +---------------------+ | Additional | // 附加信息记录 +---------------------+
Header 部分包含了一些标志位,如是否是查询/响应、操作码、返回码等。
我们需要编写函数来解析收到的请求和构建发出的响应。
步骤 2:使用 Python 的socket 库监听 UDP 53 端口
import socket
创建一个 UDP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
绑定到本地地址和端口 53
server_address = ('0.0.0.0', 53)
sock.bind(server_address)
print("DNS server is listening on port 53...")
while True:
# 接收数据和客户端地址
data, client_address = sock.recvfrom(512) # DNS UDP 报文通常小于 512 字节
print(f"Received query from {client_address}")
# 在这里处理 DNS 请求
response = handle_dns_query(data)
# 将响应发送回客户端
sock.sendto(response, client_address)这是一个简化的handle_dns_query 函数,我们需要解析请求中的问题部分,看看查询的是什么域名和类型。
def handle_dns_query(data):
# 1. 解析请求的 Header 和 Question 部分
transaction_id = data[:2] # 前两个字节是事务ID,需要原样返回
flags = data[2:4] # 标志位,我们需要修改它来构造响应
# 问题数量 (通常为1)
qdcount = data[4:6]
# 跳过 Header (12字节),开始解析 Question 部分
# Question 格式: <域名><类型(2字节)><类(2字节)>
# 域名是压缩的: 例如3www6google3com0 表示www.google.com.
domain, offset = parse_domain_name(data, 12) # 从第12字节开始解析域名
qtype = data[offset:offset+2] # 查询类型 (e.g., A记录是 \x00\x01)
qclass = data[offset+2:offset+4] # 查询类 (通常是 \x00\x01, 表示 IN)
# 2. 准备响应
# 2.1 响应 Header
response = transaction_id # 事务ID相同
response += b'\x81\x80' # 标准响应标志位:QR=1(响应), OPCODE=0, AA=1(权威答案), etc.
response += qdcount # 问题数量相同
response += b'\x00\x01' # Answer RRs 数量:我们只提供一个答案
response += b'\x00\x00' # Authority RRs 数量:0
response += b'\x00\x00' # Additional RRs 数量:0
# 2.2 把原来的 Question 部分完整地放回去
response += data[12:offset+4] # 从Header后开始,一直到Question结束
# 3. 构建 Answer 部分 (假设我们只响应 A 记录查询)
if qtype == b'\x00\x01': # A record query
# Answer RR 格式:
# - NAME: 指向Question中的域名 (通常用指针 \xc0\x0c 表示,指向报文偏移量12的位置)
# - TYPE: \x00\x01 (A记录)
# - CLASS: \x00\x01 (IN)
# - TTL: 4字节 (生存时间,\x00\x00\x00\x3c 是60秒)
# - RDLENGTH: 2字节,表示后面RDATA的长度 (对于IPv4是4)
# - RDATA: 数据 (对于A记录是4字节的IP地址)
response += b'\xc0\x0c' # 指针,指向报文偏移量12(即Question的域名开始处)
response += qtype # TYPE
response += qclass # CLASS
response += b'\x00\x00\x00\x3c' # TTL: 60 seconds
response += b'\x00\x04' # RDLENGTH: 4 bytes
# 这里我们硬编码一个IP地址,192.168.1.100
response += b'\xc0\xa8\x01\x64' # 192.168.1.100 in hex
else:
# 如果不是A记录查询,可以返回其他响应或错误码
# 这里简单返回一个空的Answer部分
pass
return response
def parse_domain_name(data, offset):
"""解析DNS格式的域名 (e.g., 3www6google3com0)"""
domain = ''
while True:
length = data[offset]
if length == 0:
offset += 1
break
if (length & 0xC0) == 0xC0: # 处理压缩指针
# 这是一个指针,跳过
pointer = int.from_bytes(data[offset:offset+2], 'big') & 0x3FFF
part, _ = parse_domain_name(data, pointer)
domain += part
offset += 2
break
else:
offset += 1
domain += data[offset:offset+length].decode('utf-8') + '.'
offset += length
return domain, offset1、 将上面的代码保存为dns_server.py。

2、 使用sudo 运行(因为需要绑定端口53):sudo python3 dns_server.py。
3、 在另一台机器或使用dig 命令测试(假设服务器IP是192.168.1.100):
dig @192.168.1.100 example.com A 你应该会收到一个包含192.168.1.100 的响应。
递归服务器更复杂,因为它需要代表客户端去查询整个DNS层级系统。
1、接收客户端查询:解析客户端发来的域名和记录类型。
2、检查缓存:首先检查本地缓存是否有记录,有则直接返回。
3、询问根服务器:如果没有,从根服务器(a.root-servers.net 等)开始询问,根服务器会返回负责对应顶级域(如.com)的TLD服务器地址。
4、询问TLD服务器:然后向TLD服务器询问,TLD服务器会返回负责该域名的权威服务器的地址。
5、询问权威服务器:最后向权威服务器询问,它会返回最终的答案(如IP地址)。
6、缓存并返回:将答案缓存起来(根据TTL),然后将答案返回给最初的客户端。
... (之前的socket监听代码相同) ...
def recursive_query(domain, qtype):
"""对一个域名进行递归查询,返回答案"""
# 1. 这里应该先检查缓存
# if answer in cache: return answer
# 2. 从根服务器开始 (这里硬编码一个根服务器的IP)
nameservers = ['198.41.0.4'] # a.root-servers.net
for ns in nameservers:
# 向这个 nameserver 发送查询
# 注意:这里需要构建一个标准的DNS查询请求(QR=0),而不是响应
query = build_query(domain, qtype)
response_data = send_udp_query(query, ns)
# 解析响应
# 如果响应中有答案 (ANSWER SECTION > 0),并且是我们想要的,返回它
# 如果响应中有授权信息 (AUTHORITY SECTION),里面有下一级 nameserver 的地址 (NS记录) 和它们的IP (A记录,在ADDITIONAL SECTION)
# 则更新 nameservers 列表为这些新的 nameserver 的IP,然后跳出内层循环,继续外层循环去问这些新的nameservers
# 如果只有NS记录没有A记录,需要递归查询这些NS记录的A记录
# 重复这个过程,直到得到最终答案
return response_data
def handle_dns_query(data):
# ... (解析客户端请求,获取 domain 和 qtype) ...
# 调用递归查询函数
response_data = recursive_query(domain, qtype)
# 修改响应头,将其变为给客户端的响应 (例如设置AA位为0,因为你不是权威)
# 然后将 response_data 返回给客户端
return modified_response_data
def build_query(domain, qtype):
"""构建一个DNS查询请求"""
# 构建Header: QR=0 (查询), RD=1 (要求递归)
# 构建Question: 包含域名和类型
# 返回二进制数据
pass
def send_udp_query(query, ip_address):
"""向指定IP的53端口发送UDP查询并等待响应"""
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.sendto(query, (ip_address, 53))
data, _ = s.recvfrom(512)
return data实现一个完整的递归服务器需要处理很多边界情况(超时、压缩指针、各种记录类型等),代码量会大很多。
1、从简单开始:先实现那个简单的权威服务器,理解如何解析和构建DNS报文。
2、使用库:对于生产环境或严肃的项目,绝对不要从头造轮子,可以使用成熟的库,
Python:dnslib
Go: 标准库中的net/dns 包非常强大,是编写DNS服务器的绝佳选择(性能远优于Python),著名的CoreDNS 就是用Go写的。
C:ldns,unbound 等库。
3、学习RFC:要真正理解细节,请阅读 RFC 1034 和 RFC 1035。
4、测试工具:使用dig、drill、Wireshark 等工具来调试和测试你的服务器。
希望这个指南为你提供了一个清晰的起点!祝你编码愉快。
文章摘自:https://idc.huochengrm.cn/dns/14986.html
评论
郜听枫
回复编写DNS服务器需掌握网络协议,熟悉域名系统原理及编程语言技术。