我将为你提供一个从简单到相对完整的实现思路和指南,并使用 Python 来演示核心代码片段,这只是一个教学示例,用于学习原理,不适合直接在生产环境使用。
1、权威DNS服务器:托管特定域区的域名记录(如example.com
),并对其拥有权威性,当有人查询这些域名时,它提供最终的答案。
2、递归DNS服务器:接收客户端的查询请求,替客户端进行递归查询(询问根服务器 -> TLD 服务器 -> 权威服务器),最后将结果返回给客户端,我们常用的8.8.8.8
(Google DNS) 和1.1.1.1
(Cloudflare DNS) 就是递归服务器。
我们将实现一个简单的权威DNS服务器 和一个简单的递归DNS服务器 的核心部分。
这个服务器将监听 UDP 53 端口,并响应对特定域名的查询。
DNS 消息是二进制的,格式如下:
+---------------------+ | Header | +---------------------+ | Question | // 查询的问题 +---------------------+ | Answer | // 响应资源记录 (RRs) +---------------------+ | Authority | // 授权资源记录 +---------------------+ | Additional | // 附加信息记录 +---------------------+
Header 部分包含了一些标志位,如是否是查询/响应、操作码、返回码等。
我们需要编写函数来解析收到的请求和构建发出的响应。
步骤 2:使用 Python 的socket
库监听 UDP 53 端口
import socket 创建一个 UDP socket sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 绑定到本地地址和端口 53 server_address = ('0.0.0.0', 53) sock.bind(server_address) print("DNS server is listening on port 53...") while True: # 接收数据和客户端地址 data, client_address = sock.recvfrom(512) # DNS UDP 报文通常小于 512 字节 print(f"Received query from {client_address}") # 在这里处理 DNS 请求 response = handle_dns_query(data) # 将响应发送回客户端 sock.sendto(response, client_address)
这是一个简化的handle_dns_query
函数,我们需要解析请求中的问题部分,看看查询的是什么域名和类型。
def handle_dns_query(data): # 1. 解析请求的 Header 和 Question 部分 transaction_id = data[:2] # 前两个字节是事务ID,需要原样返回 flags = data[2:4] # 标志位,我们需要修改它来构造响应 # 问题数量 (通常为1) qdcount = data[4:6] # 跳过 Header (12字节),开始解析 Question 部分 # Question 格式: <域名><类型(2字节)><类(2字节)> # 域名是压缩的: 例如3www6google3com0
表示www.google.com.
domain, offset = parse_domain_name(data, 12) # 从第12字节开始解析域名 qtype = data[offset:offset+2] # 查询类型 (e.g., A记录是 \x00\x01) qclass = data[offset+2:offset+4] # 查询类 (通常是 \x00\x01, 表示 IN) # 2. 准备响应 # 2.1 响应 Header response = transaction_id # 事务ID相同 response += b'\x81\x80' # 标准响应标志位:QR=1(响应), OPCODE=0, AA=1(权威答案), etc. response += qdcount # 问题数量相同 response += b'\x00\x01' # Answer RRs 数量:我们只提供一个答案 response += b'\x00\x00' # Authority RRs 数量:0 response += b'\x00\x00' # Additional RRs 数量:0 # 2.2 把原来的 Question 部分完整地放回去 response += data[12:offset+4] # 从Header后开始,一直到Question结束 # 3. 构建 Answer 部分 (假设我们只响应 A 记录查询) if qtype == b'\x00\x01': # A record query # Answer RR 格式: # - NAME: 指向Question中的域名 (通常用指针 \xc0\x0c 表示,指向报文偏移量12的位置) # - TYPE: \x00\x01 (A记录) # - CLASS: \x00\x01 (IN) # - TTL: 4字节 (生存时间,\x00\x00\x00\x3c 是60秒) # - RDLENGTH: 2字节,表示后面RDATA的长度 (对于IPv4是4) # - RDATA: 数据 (对于A记录是4字节的IP地址) response += b'\xc0\x0c' # 指针,指向报文偏移量12(即Question的域名开始处) response += qtype # TYPE response += qclass # CLASS response += b'\x00\x00\x00\x3c' # TTL: 60 seconds response += b'\x00\x04' # RDLENGTH: 4 bytes # 这里我们硬编码一个IP地址,192.168.1.100 response += b'\xc0\xa8\x01\x64' # 192.168.1.100 in hex else: # 如果不是A记录查询,可以返回其他响应或错误码 # 这里简单返回一个空的Answer部分 pass return response def parse_domain_name(data, offset): """解析DNS格式的域名 (e.g., 3www6google3com0)""" domain = '' while True: length = data[offset] if length == 0: offset += 1 break if (length & 0xC0) == 0xC0: # 处理压缩指针 # 这是一个指针,跳过 pointer = int.from_bytes(data[offset:offset+2], 'big') & 0x3FFF part, _ = parse_domain_name(data, pointer) domain += part offset += 2 break else: offset += 1 domain += data[offset:offset+length].decode('utf-8') + '.' offset += length return domain, offset
1、 将上面的代码保存为dns_server.py
。
2、 使用sudo
运行(因为需要绑定端口53):sudo python3 dns_server.py
。
3、 在另一台机器或使用dig
命令测试(假设服务器IP是192.168.1.100
):
dig @192.168.1.100 example.com A
你应该会收到一个包含192.168.1.100
的响应。
递归服务器更复杂,因为它需要代表客户端去查询整个DNS层级系统。
1、接收客户端查询:解析客户端发来的域名和记录类型。
2、检查缓存:首先检查本地缓存是否有记录,有则直接返回。
3、询问根服务器:如果没有,从根服务器(a.root-servers.net
等)开始询问,根服务器会返回负责对应顶级域(如.com
)的TLD服务器地址。
4、询问TLD服务器:然后向TLD服务器询问,TLD服务器会返回负责该域名的权威服务器的地址。
5、询问权威服务器:最后向权威服务器询问,它会返回最终的答案(如IP地址)。
6、缓存并返回:将答案缓存起来(根据TTL),然后将答案返回给最初的客户端。
... (之前的socket监听代码相同) ... def recursive_query(domain, qtype): """对一个域名进行递归查询,返回答案""" # 1. 这里应该先检查缓存 # if answer in cache: return answer # 2. 从根服务器开始 (这里硬编码一个根服务器的IP) nameservers = ['198.41.0.4'] # a.root-servers.net for ns in nameservers: # 向这个 nameserver 发送查询 # 注意:这里需要构建一个标准的DNS查询请求(QR=0),而不是响应 query = build_query(domain, qtype) response_data = send_udp_query(query, ns) # 解析响应 # 如果响应中有答案 (ANSWER SECTION > 0),并且是我们想要的,返回它 # 如果响应中有授权信息 (AUTHORITY SECTION),里面有下一级 nameserver 的地址 (NS记录) 和它们的IP (A记录,在ADDITIONAL SECTION) # 则更新 nameservers 列表为这些新的 nameserver 的IP,然后跳出内层循环,继续外层循环去问这些新的nameservers # 如果只有NS记录没有A记录,需要递归查询这些NS记录的A记录 # 重复这个过程,直到得到最终答案 return response_data def handle_dns_query(data): # ... (解析客户端请求,获取 domain 和 qtype) ... # 调用递归查询函数 response_data = recursive_query(domain, qtype) # 修改响应头,将其变为给客户端的响应 (例如设置AA位为0,因为你不是权威) # 然后将 response_data 返回给客户端 return modified_response_data def build_query(domain, qtype): """构建一个DNS查询请求""" # 构建Header: QR=0 (查询), RD=1 (要求递归) # 构建Question: 包含域名和类型 # 返回二进制数据 pass def send_udp_query(query, ip_address): """向指定IP的53端口发送UDP查询并等待响应""" with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: s.sendto(query, (ip_address, 53)) data, _ = s.recvfrom(512) return data
实现一个完整的递归服务器需要处理很多边界情况(超时、压缩指针、各种记录类型等),代码量会大很多。
1、从简单开始:先实现那个简单的权威服务器,理解如何解析和构建DNS报文。
2、使用库:对于生产环境或严肃的项目,绝对不要从头造轮子,可以使用成熟的库,
Python:dnslib
Go: 标准库中的net/dns
包非常强大,是编写DNS服务器的绝佳选择(性能远优于Python),著名的CoreDNS
就是用Go写的。
C:ldns
,unbound
等库。
3、学习RFC:要真正理解细节,请阅读 RFC 1034 和 RFC 1035。
4、测试工具:使用dig
、drill
、Wireshark
等工具来调试和测试你的服务器。
希望这个指南为你提供了一个清晰的起点!祝你编码愉快。
文章摘自:https://idc.huochengrm.cn/dns/14986.html
评论