首页 > 技术文章 > pyqt5 threading 和queue和signal 以及子域名的爆破的各种问题

Triomphe 2020-04-19 02:32 原文

问题描述

这几天想写一个子域名爆破的程序,已经实现了控制台的所有逻辑代码运行正常,但是一直想不到怎么把程序放入到Ui中去.

目前已经可以运行正常了.写一下这期间遇到过的所有的坑.

各种解决思路

问题1 我要实现的是一个线程读取文件,其他多个线程从文件中去拿关键字,然后去http请求主要是文件有132w行,如何解决呢.

想了一下,其实这就是生产者消费者模型,读取文件中的每行添加到共享区域,然后多个线程从共享区域取数据,但是要实现的是线程安全的.百度找到了一个很好的方法.就是python中的Queue队列,因为Queue队列是线程安全的.大致代码思路

q=queue.Queue(30)
for i in range(10):
    t=threading.Thread(target=test2(q))
    t.start()
t2=threading.Thread(target=test(q))
t2.start()
#从队列中拿去数据,然后去http请求
def test(q):
    args=q.get()
    dosomething(args)
#向队列中放入数据
def test(q):
    q.put('test')

这其中呢还有一些细节问题.Queue.get(block=True, timeout=None),队列如果为空,get就会阻塞,所以我设置了20秒的等待时间.到了还没有反应就判断是程序执行完毕.

问题2,我的多线程模块可能要在很多爆破的地方使用,这样写冗余代码很没有意思,所以我就要实现自己的MyThread类(虽然最后没有用上)

class MyThread(threading.Thread):
def __init__(self,queue,func):
    threading.Thread.__init__(self)
    self.queue=queue
    self.func=func
    self.thread_stop=False

def run(self):
    while not self.thread_stop:     #为了跳出循环,停止线程
        try:
            #args为func的参数.
            args= self.queue.get(block=True,timeout=20)
            # print(args)
            try:
                self.func(*args)
                self.queue.task_done()
            except Exception as e:
                logging.debug('多线程尝试执行函数失败')
                self.queue.task_done()
        except Exception as e2:
            print('多线程尝试获取队列数据失败')
            break

def stop(self):
    self.thread_stop = True

func就是函数名字.线程每次从队列中获取参数,然后执行func函数.因为队列中获取的数据是不知道多少的,所以使用args.在self.func(*args)就能传入所有的参数.这样MyThread类就写好了.

问题3当我实现了控制台子域名爆破,然后把它加入到qt中的时候就异常退出,还没有错误显示.(使用try except可以快速找到错误)

后来发现时我的这行代码with open(filename,'r',encoding='utf-8') as f:出现了问题,报错说显示没有这个文件.当时的情况是这样子的.

#a.py文件
class Scan():
    dosomething():
        with open('./b.txt','r',encoding='utf-8') as f:
    start()
if __name__ == '__main__':
    a=Scan()
    a.start()

这样子我的程序执行时成功的.文件我使用的是相对路径.a.py和b.txt是在同一个目录的.当我把程序重新放到qt中去.

#a.py
class Scan():
    dosomething()
    start()
#c.py
if __name__ == '__main__':
    a=Scan()
    a.start()
然后他就报错了.因为c.py和a.py没在同一个目录下.然后open函数默认的相对路径是用的main函数的路径,所以就找不到文件.后面就用了绝对路径
```python
root_path = os.path.abspath(os.path.dirname(__file__)).split(根目录名)[0]
#找到根目录的绝对路径,然后错误解决了.
```

问题4我这个类发送的signal信号,主窗口怎么接收到呢.以及怎么实现的问题.

两个类,在不同的两个Python文件中传递信号
在文件B中定义B类,B类中没有main函数

class B(QtCore.QObject):
    signal = pyqtSignal(str)
    def __init__(self):
        pass
    def emit_signal(self):        
        self.signal.emit('xxx')
在文件A中定义A类,A类中有主函数,同时在A类中声明B类,B定义的窗口调用show()方法,显示

class A():
    def __init__(self):
        pass
    def open(self):        
        b=B()        
        #将信号槽连接起来
        b.signal.connect(self.get_text)
    def get_text(self,strings):
        #获取到B的函数发送的数据
        print(strings)

if __name__ == '__main__':
    a= A()
    a.show()

这里很重要的就是B类必须继承QtCore.QObject,否则就会报错.因为B类不是Pyqt5中的类,所以不能使用信号,也就需要继承QObject

  • 想法1 其实qt自带了QThread类,可以实现多线程.这是我写完程序才发现的.然后为了程序解耦,所以就用了threading的Thread类.
  • 想法2 最好使用logging类,给程序加上日志

问题7 2020-4-19 17:38:38 又出现了很多问题.比如信号槽其实不能用该传输大量的数据,否则会卡死.所以还是需要使用生产者消费者模型 但是又不是一个普通的生产者消费者模型.

大致思路就是这样子.

又写了新的实现代码

import threading
import queue
import BruteForce
import logging


class MyProducer(threading.Thread):
    def __init__(self,queue_in,domain,fileName):
        super(MyProducer, self).__init__()
        self.q=queue_in
        self.domain=domain
        self.file=fileName

    def run(self):
        while True:
            try:
                with open('subdomain_wordlist','r') as f:
                    for row in f:
                        row =row.replace('\n','')
                        text=str(row)+'.'+self.domain
                        self.q.put(text)
                f.close()
            except Exception as e:
                logging.error(e)

class ShowInfo(threading.Thread):

    def __init__(self,queue_out):
        super(ShowInfo, self).__init__()
        self.qout=queue_out 
    def run(self):
        while True:
            res=self.qout.get() #从队列2中拿到数据,展示结果.
            print(res)


class MyConsumer(threading.Thread):
    def __init__(self,queue_in,queue_out):
        super(MyConsumer, self).__init__()
        threading.Thread.__init__(self)
        self.qin=queue_in
        self.qout=queue_out
        self.thread_stop=False

    def run(self):
        while True:
            try:
                args= self.qin.get(block=True,timeout=20)#队列1拿走数据
                res=BruteForce.bf_subdomain(args)# 处理数据
                if res:
                    self.qout.put(res)#放入队列2中
                self.qin.task_done()
            except Exception as e2:
                print('多线程尝试获取队列数据失败')
                break

    def stop(self):
        self.thread_stop = True

if __name__ == '__main__':
    qin =queue.Queue(40)
    qout =queue.Queue(40)
    for i in range(int(30)):
        t=MyConsumer(qin,qout)
        t.start()
    t3=ShowInfo(qout)
    t3.start()
    t2=MyProducer(qin,'baidu.com','test')
    t2.start()

    print('多线程完成了')

问题8 继续报错,莫名退出程序,debug看不到正确的答案.重新架构代码.

1. 分离UI 和逻辑
2. 出现问题的原因可能是多线程竞争singal,导致死锁问题.
3. 主UI假死的原因可能是逻辑代码没有全部扔到子线程里面去的原因.

找到了一个可能正确的答案. tCore.QThread是一个管理线程的类,当我们使用其构造函数的时候,便新建了一个线程。这里要强调,QThread是一个线程管理器,不要把业务逻辑放在这个类里面,Qt的作者已经多次批评继承QThread类来实现业务逻辑的做法。

果然作者永远是最牛逼的.思想牛逼

self.worker=Worker() #逻辑封装到这里面
self.thread=QtCore.Qthread()
self.worker.moveToThread(self.thread)

这个逻辑果然很厉害,解决我的主UI假死问题.

  1. 一直把一个队列放在主线程中,这个可能是主线程假死的另外一个原因.应该吧子线程的所有逻辑和主线程分离开.然后通过信号槽通信.
    所以最后的逻辑

  1. 2020-4-20 01:21:50
    非常重要的几行代码
self.thread=QThread()  #线程管理器
self.worker=Worker()
self.worker._signal.connect(self.sds_showInfo2) #signal绑定
self.worker.moveToThread(self.thread) #把逻辑代码放入子线程
self.thread.started.connect(self.worker.work)
self.thread.finished.connect(self.worker.stop)
self.thread.start() #启动线程.

主线程结束了,子线程依旧运行的额问题.

通过Thread 中的daemon属性来设置.self.thread.setDaemon(True)成功解决问题.

推荐阅读