首页 > 解决方案 > 多线程的引入并没有减少 Python 程序的执行时间

问题描述

我是 python 新手,我第一次使用它来处理 pcap 文件。到目前为止,我已经有了一个程序,它可以过滤掉属于特定 IP 和 PROTOCOL 的数据包,并将它们写入一个新的 pcap 文件。

from scapy.all import *
import re
import glob

def process_pcap(path, hosts, ports):
    pktdump = PcapWriter("temp11.pcap", append=True, sync=True)
    count=0;
    for pcap in glob.glob(os.path.join(path, '*.pcapng')):
        print "Reading file", pcap
        packets=rdpcap(pcap)
        for pkt in packets:
            if (TCP in pkt and (pkt[TCP].sport in ports or pkt[TCP].dport in ports)):
                if (pkt[IP].src in hosts or pkt[IP].dst in hosts):
                    count=count+1
                    print "Writing packets " , count
                    #wrpcap("temp.pcap", pkt)
                    pktdump.write(pkt)


path="\workspace\pcaps"
file_ip = open('ip_list.txt', 'r') #Text file with many ip address
o = file_ip.read()
hosts = re.findall( r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}", o )
ports=[443] # Protocols to be added in filter
process_pcap(path, hosts, ports)  

此代码花费的时间太长,因为它需要匹配的 IP 列表可以是 1000 个 IP,并且目录中的 pcap 文件也可以是千兆字节。这就是为什么有必要引入多线程。为此,我将代码更改如下;

from scapy.all import *
import re
import glob
import threading


def process_packet(pkt, pktdump, packets, ports):
count = 0
if (TCP in pkt and (pkt[TCP].sport in ports or pkt[TCP].dport in ports)):
            if (pkt[IP].src in hosts or pkt[IP].dst in hosts):
                count=count+1
                print "Writing packets " , count
                #wrpcap("temp.pcap", pkt)
                pktdump.write(pkt)  


def process_pcap(path, hosts, ports):
pktdump = PcapWriter("temp11.pcap", append=True, sync=True)
ts=list()
for pcap in glob.glob(os.path.join(path, '*.pcapng')):
    print "Reading file", pcap
    packets=rdpcap(pcap)
    for pkt in packets:
         t=threading.Thread(target=process_packet,args=(pkt,pktdump, packets,ports,))
         ts.append(t)
         t.start()
for t in ts:
    t.join()    


path="\workspace\pcaps"
file_ip = open('ip_list.txt', 'r') #Text file with many ip address
o = file_ip.read()
hosts = re.findall( r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}", o )
ports=[443] # Protocos to be added in filter
process_pcap(path, hosts, ports)  

但我认为我没有以最好的方式做到这一点,因为时间并没有减少。

请有任何建议!

编辑:

我已经根据响应更改了代码,运行时我很糟糕,但线程并没有自行终止。python中多线程的所有示例都不需要显式终止线程。请查明此代码中的问题;

from scapy.all import *
import re
import glob
import threading
import Queue
import multiprocessing

#global variables declaration

path="\pcaps"
pcapCounter = len(glob.glob1(path,"*.pcapng")) #size of the queue
q = Queue.Queue(pcapCounter) # queue to hold all pcaps in directory
pcap_lock = threading.Lock()
ports=[443] # Protocols to be added in filter


def safe_print(content):
    print "{0}\n".format(content),

def process_pcap (hosts):
    content = "Thread no ", threading.current_thread().name, " in action"
    safe_print(content)
    if not q.empty():
        with pcap_lock:
            content = "IN LOCK ", threading.current_thread().name
            safe_print(content)
            pcap=q.get()

        content = "OUT LOCK", threading.current_thread().name, " and reading packets from ", pcap
        safe_print(content)   
        packets=rdpcap(pcap)


        pktdump = PcapWriter(threading.current_thread().name+".pcapng", append=True, sync=True)
        pList=[]
        for pkt in packets:
            if (TCP in pkt and (pkt[TCP].sport in ports or pkt[TCP].dport in ports)):
                if (pkt[IP].src in hosts or pkt[IP].dst in hosts):
                    pList.append(pkt)

                    content="Wrting Packets to pcap ", threading.current_thread().name
                    safe_print(content)
                    pktdump.write(pList) 


else:
    content = "DONE!! QUEUE IS EMPTY", threading.current_thread().name
    safe_print(content)


for pcap in glob.glob(os.path.join(path, '*.pcapng')):
    q.put(pcap)

file_ip = open('ip_list.txt', 'r') #Text file with many ip addresses
o = file_ip.read()
hosts = re.findall( r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}", o )
threads = []
cpu = multiprocessing.cpu_count() 
for i in range(cpu):
    t = threading.Thread(target=process_pcap, args=(hosts,), name = i)
    t.start()
    threads.append(t)

for t in threads:
    t.join()


print "Exiting Main Thread"

这是对上述程序的回应;它从不打印“退出主线程”

('Thread no ', 'Thread-1', ' in action')
('Thread no ', '3', ' in action')
('Thread no ', '1', ' in action')
('Thread no ', '2', ' in action')
('IN LOCK ', 'Thread-1')
('IN LOCK ', '3')
('OUT LOCK', 'Thread-1', ' and reading packets from ', 'path to\\test.pcapng')
('OUT LOCK', '3', ' and reading packets from ', 'path to\\test11.pcapng')
('IN LOCK ', '1')
('Wrting Packets to pcap ', '3')
('Wrting Packets to pcap ', 'Thread-1')

编辑 2:我在长度检查之前锁定了队列,一切正常。

谢谢你。

标签: pythonmultithreadingscapy

解决方案


您正在为每个数据包创建一个线程。这是根本问题。

此外,您正在对每个已处理的数据包执行 I/O 步骤,而不是写入一批数据包

您的 PC 上可能有 1-10 个内核。对于您正在处理的数据包计数,创建 1000 多个线程的开销超过了每个内核的并行度值。有一个非常快的收益递减规律,运行线程多于可用内核。

这是一种更好的方法,您将在其中意识到并行性的好处。

主线程创建一个全局队列和锁以供后续线程共享。在创建任何线程之前,主线程会枚举*.pcapng文件列表并将每个文件名放入队列中。它还读取 IP 地址列表以及用于过滤数据包。

然后产生N个线程。其中 N 是设备上的内核数 (N = os.cpu_count())。

每个线程进入一个锁,将下一个文件从主线程建立的队列中弹出,然后释放锁。然后线程将文件读入packets列表并删除它不需要的文件。然后保存回一个单独的唯一文件,该文件代表原始输入文件的过滤结果。理想情况下,pktdump 对象支持一次写回超过 1 个数据包,因为批处理 I/O 操作可以节省大量时间。

线程处理完单个文件后,重新进入锁,从队列中弹出下一个文件,释放锁,对下一个文件重复处理。

当文件名队列为空时,线程退出。

主线程等待所有 N 个线程完成。现在你有了一整套要合并的 K 文件。您的主线程只需要重新打开由线程创建的这些 K 个文件,并将每个文件连接回单个输出文件。


推荐阅读