python - 多线程的引入并没有减少 Python 程序的执行时间
问题描述
我是 python 新手,我第一次使用它来处理 pcap 文件。到目前为止,我已经有了一个程序,它可以过滤掉属于特定 IP 和 PROTOCOL 的数据包,并将它们写入一个新的 pcap 文件。
from scapy.all import *
import re
import glob
def process_pcap(path, hosts, ports):
pktdump = PcapWriter("temp11.pcap", append=True, sync=True)
count=0;
for pcap in glob.glob(os.path.join(path, '*.pcapng')):
print "Reading file", pcap
packets=rdpcap(pcap)
for pkt in packets:
if (TCP in pkt and (pkt[TCP].sport in ports or pkt[TCP].dport in ports)):
if (pkt[IP].src in hosts or pkt[IP].dst in hosts):
count=count+1
print "Writing packets " , count
#wrpcap("temp.pcap", pkt)
pktdump.write(pkt)
path="\workspace\pcaps"
file_ip = open('ip_list.txt', 'r') #Text file with many ip address
o = file_ip.read()
hosts = re.findall( r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}", o )
ports=[443] # Protocols to be added in filter
process_pcap(path, hosts, ports)
此代码花费的时间太长,因为它需要匹配的 IP 列表可以是 1000 个 IP,并且目录中的 pcap 文件也可以是千兆字节。这就是为什么有必要引入多线程。为此,我将代码更改如下;
from scapy.all import *
import re
import glob
import threading
def process_packet(pkt, pktdump, packets, ports):
count = 0
if (TCP in pkt and (pkt[TCP].sport in ports or pkt[TCP].dport in ports)):
if (pkt[IP].src in hosts or pkt[IP].dst in hosts):
count=count+1
print "Writing packets " , count
#wrpcap("temp.pcap", pkt)
pktdump.write(pkt)
def process_pcap(path, hosts, ports):
pktdump = PcapWriter("temp11.pcap", append=True, sync=True)
ts=list()
for pcap in glob.glob(os.path.join(path, '*.pcapng')):
print "Reading file", pcap
packets=rdpcap(pcap)
for pkt in packets:
t=threading.Thread(target=process_packet,args=(pkt,pktdump, packets,ports,))
ts.append(t)
t.start()
for t in ts:
t.join()
path="\workspace\pcaps"
file_ip = open('ip_list.txt', 'r') #Text file with many ip address
o = file_ip.read()
hosts = re.findall( r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}", o )
ports=[443] # Protocos to be added in filter
process_pcap(path, hosts, ports)
但我认为我没有以最好的方式做到这一点,因为时间并没有减少。
请有任何建议!
编辑:
我已经根据响应更改了代码,运行时我很糟糕,但线程并没有自行终止。python中多线程的所有示例都不需要显式终止线程。请查明此代码中的问题;
from scapy.all import *
import re
import glob
import threading
import Queue
import multiprocessing
#global variables declaration
path="\pcaps"
pcapCounter = len(glob.glob1(path,"*.pcapng")) #size of the queue
q = Queue.Queue(pcapCounter) # queue to hold all pcaps in directory
pcap_lock = threading.Lock()
ports=[443] # Protocols to be added in filter
def safe_print(content):
print "{0}\n".format(content),
def process_pcap (hosts):
content = "Thread no ", threading.current_thread().name, " in action"
safe_print(content)
if not q.empty():
with pcap_lock:
content = "IN LOCK ", threading.current_thread().name
safe_print(content)
pcap=q.get()
content = "OUT LOCK", threading.current_thread().name, " and reading packets from ", pcap
safe_print(content)
packets=rdpcap(pcap)
pktdump = PcapWriter(threading.current_thread().name+".pcapng", append=True, sync=True)
pList=[]
for pkt in packets:
if (TCP in pkt and (pkt[TCP].sport in ports or pkt[TCP].dport in ports)):
if (pkt[IP].src in hosts or pkt[IP].dst in hosts):
pList.append(pkt)
content="Wrting Packets to pcap ", threading.current_thread().name
safe_print(content)
pktdump.write(pList)
else:
content = "DONE!! QUEUE IS EMPTY", threading.current_thread().name
safe_print(content)
for pcap in glob.glob(os.path.join(path, '*.pcapng')):
q.put(pcap)
file_ip = open('ip_list.txt', 'r') #Text file with many ip addresses
o = file_ip.read()
hosts = re.findall( r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}", o )
threads = []
cpu = multiprocessing.cpu_count()
for i in range(cpu):
t = threading.Thread(target=process_pcap, args=(hosts,), name = i)
t.start()
threads.append(t)
for t in threads:
t.join()
print "Exiting Main Thread"
这是对上述程序的回应;它从不打印“退出主线程”
('Thread no ', 'Thread-1', ' in action')
('Thread no ', '3', ' in action')
('Thread no ', '1', ' in action')
('Thread no ', '2', ' in action')
('IN LOCK ', 'Thread-1')
('IN LOCK ', '3')
('OUT LOCK', 'Thread-1', ' and reading packets from ', 'path to\\test.pcapng')
('OUT LOCK', '3', ' and reading packets from ', 'path to\\test11.pcapng')
('IN LOCK ', '1')
('Wrting Packets to pcap ', '3')
('Wrting Packets to pcap ', 'Thread-1')
编辑 2:我在长度检查之前锁定了队列,一切正常。
谢谢你。
解决方案
您正在为每个数据包创建一个线程。这是根本问题。
此外,您正在对每个已处理的数据包执行 I/O 步骤,而不是写入一批数据包
您的 PC 上可能有 1-10 个内核。对于您正在处理的数据包计数,创建 1000 多个线程的开销超过了每个内核的并行度值。有一个非常快的收益递减规律,运行线程多于可用内核。
这是一种更好的方法,您将在其中意识到并行性的好处。
主线程创建一个全局队列和锁以供后续线程共享。在创建任何线程之前,主线程会枚举*.pcapng
文件列表并将每个文件名放入队列中。它还读取 IP 地址列表以及用于过滤数据包。
然后产生N个线程。其中 N 是设备上的内核数 (N = os.cpu_count()
)。
每个线程进入一个锁,将下一个文件从主线程建立的队列中弹出,然后释放锁。然后线程将文件读入packets
列表并删除它不需要的文件。然后保存回一个单独的唯一文件,该文件代表原始输入文件的过滤结果。理想情况下,pktdump 对象支持一次写回超过 1 个数据包,因为批处理 I/O 操作可以节省大量时间。
线程处理完单个文件后,重新进入锁,从队列中弹出下一个文件,释放锁,对下一个文件重复处理。
当文件名队列为空时,线程退出。
主线程等待所有 N 个线程完成。现在你有了一整套要合并的 K 文件。您的主线程只需要重新打开由线程创建的这些 K 个文件,并将每个文件连接回单个输出文件。
推荐阅读
- javascript - 聚合物 3 元素 - NotSupportedError
- ubuntu - CMake 到 Bazel 的迁移
- google-apps-script - Google App Script GAS 表中的最后修改日期
- regex - 正则表达式从文件路径中间提取值
- python - 在 Python 3 中将元素传递给多个函数
- json - 从传入的数据流中解析 json 以在 Flink 中执行简单的转换
- python - 是否可以在 tkinter 窗口屏幕上获得鼠标输入?
- python - 根据格式分隔部分字符串
- html - HTML 如何将图像向右对齐,然后使用边距/填充?或居中,将一个 x px 向右对齐,另一张图像稍微向左对齐
- r - 在 R 中:如何在两个整数之间找到的向量中替换 NA