首页 > 技术文章 > 爬虫里的多线程基本使用

cheflone 2021-04-01 19:37 原文

最近拜读瑞安·米切尔的书关于并行抓取问题有很通俗的介绍:

  “网页抓去的速度很快,起码通常比雇佣几十个实习生手动网上复制数据要快很多。当然随着技术的不断进步和享乐适应,人们还是在某个时刻觉得‘不够快’,于是把目光转向分布式计算。

 和其他领域不同的是,网页抓取不能单纯依靠‘给问题增加更多进程’来提升速度,虽然运行一个process很快,但是两个进程未必能让速度提升一倍,而当运行三个乃更多时,可能你的所有请求都会被远程服务器封杀,因为他认为你是在恶意攻击。”

然而,某些场景里使用网页并行抓取或者并行线程(thread)/进程仍然有些好处:

 

1.从多个数据源(多个远程服务器),而不只是在一个数据源收集数据。

2.在已经收集到的数据上执行更加复杂/执行时间更长的操作(例如图像分析或者OCR处理)。

3.从大型web服务器收集数据,如果你已经付费,或者创建多个连接是使用协议允许的行为。

 

一.pythom3.x 使用的是_thread,而thread模块已经废弃

1.1下面用_thread实现一个小例子

 

 1 import time
 2 import  _thread
 3 def print. time(threadName, delay, iterations):
 4 
 5     start = int(time . time())
 6 
 7     for i in range(0 ,iterations):
 8 
 9         time .sleep(delay)
10 
11         seconds_ elapsed = str(int(time.time()) - start)
12 
13         print ("[] []". format(seconds_ elapsed, threadName))
14     
15 try:
16     _thread.start_new_thread(print_time, ('Fizz', 3, 33))
17     _thread.start_new_thread(print_time, ('Buzz',5,20))
18     _thread.start_new_thread(print_time,('Counter',1,100))
19 except:
20 
21     print ('Error:unable to start_thread')
22 
23 while 1:
24 
25 pass                

上面的例子开启了三个线程,分别3,5,1秒打印不同次数的名字

 

1.2 threading 模块

 

 _thread是python相当底层的模块,虽然可以有详细的管理操作,但是苦于没有高级函数,使用起来不太方便。

threadign模块是一个高级接口,更加便捷使用线程,与此同时也体现了_thread模块的特性。

在threading与os模块里,分别调用了current_thread()和getpid() 该函数来获取当前的线程/进程号,能有更好的执行效果。

 1 import os
 2 import threading
 3 from threading import Thread
 4 
 5 import time
 6 
 7 userlist = ["黎明", "张学友", "刘德华"]
 8 
 9 
10 def work(n):
11     print(f"开始给{n}打电话,进程号:{os.getpid()},线程号是:{threading.current_thread()}")  # 这里加f 是字符串格式化,作用类似于format
12     time.sleep(3)
13     print(f"给{n}打电话结束。进程号:{os.getpid()}线程号是:{threading.current_thread()}")
14 
15 
16 if __name__ == '__main__':
17     # 单线程
18     # for d in userlist:
19     #     work(d)
20     plist = []
21     # # 循环创建多进程部门,类似增加
22     # for d in userlist:
23     #     # 进程对象
24     #     p = Process(target=work, args=(d,))
25     #     # 生成进程
26     #     p.start()
27     #     # 生成的进程加入列表
28     #     plist.append(p)
29     #
30     # # 阻塞终止进程的执行
31     # [i.join() for i in plist]
32 
33     # 多线程类似部门增加人手
34     for d in userlist: 
35         # 利用for来生成线程对象,然后再为它们传入你想要传入的数据(可以不一样)
36         p = Thread(target=work, args=(d,))
37         # 生成线程
38         p.start()
39         # 生成的线程加入列表
40         plist.append(p)
41 
42     # 阻塞终止线程的执行(join():创建的都是子线程。如果不加join().主线程运行完,程序就结束了,作用就是等待子线程运行完毕。)
43     [i.join() for i in plist]

你也可以单个的创建,threading的优势是创建的线程其他线程无法访问的线程局部数据(local thread data),这样的优势对于爬虫尤其明显,它们抓取不同的网站,那么每个线程都可以

专注于自己要抓取的目标。

import threading

def  crawler(url):
       data = threading.local()
       data.visited = []
       #爬去目标网站

threading.Thread(target=crawler, args=('http://www.douban.com')).start()

这样就解决了线程之间的共享对象导致竞争条件问题。目标很明显:不需要共享就不要共享(使用local data),为了安全的共享,就需要用到Queue模块(后面会有示例)

threading的保姆职责甚至可以达到高度定制:isAlive函数的默认行为查看是否有线程仍处于活跃状态。当一个线程崩溃或者重启后才会返回True:

threading.Thread(target=crawler)

t.start()

while True:
         time.sleep(1)
         if not t.isAlive:

t = Thread(target=crawler, args=(d,))

t.start()

这样可以达到简单的监控方法。

 

2021-04-01

19:36:13

1.3import threading

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import  os
import time

user_list = ['树花', '欧阳娜娜', '迪丽热巴', '鞠婧祎', '宋祖儿']

user_list1 = ['树花', '欧阳娜娜', '迪丽热巴', '鞠婧祎', '宋祖儿']


def work(n):
    print(f"开始给{n}打电话,进程号是{os.getpid()},线程号是:{threading.current_thread()}"+'\n')
    time.sleep(2)
    print(f"给{n}打电话结束,进程号是{os.getpid()},线程号是:{threading.current_thread()}"+'\n')

def work1(n):
    print(f"开始给{n}打电话,进程号是{os.getpid()},线程号是:{threading.current_thread()}"+'\n')
    time.sleep(2)
    print(f"给{n}打电话结束,进程号是{os.getpid()},线程号是:{threading.current_thread()}"+'\n')


if __name__ == '__main__':
    '''
    创建线程池的时候规定三个线程,根据指派任务为三种情况:
    1.= 三个任务  理论上同时执行三个任务
    2.> 三个任务  先执行前三个任务,在这其中根据完成速度先后继续分配线程给后面的任务
    3.< 三个任务  同时执行分配的任务,无任务线程等待阻塞 
    '''
    # 创建线程池(进程池)
with ThreadPoolExecutor(max_workers=3) as pool:
    # pool = ProcessPoolExecutor(max_workers=3)
    # 循环指派任务和参数
    [pool.submit(work,user) for user in user_list]
    # [pool.submit(work1,[user]) for user in user_list1]

    # 关闭线程池(进程池)

    pool.shutdown(wait=True)
print('主线程完成')



1.3.1

      如果程序不希望直接调用 result() 方法阻塞线程,则可通过 Future 的add_done_callback() 方法来添加回调函数,该回调函数形如 fn(future)。当线程任务完成后,程序会自动触发该回调函数,并将对应的 Future 对象作为参数传给该回调函数,直接调用result函数结果。

 

def test(value1, value2=None):
    print("%s threading is printed %s, %s"%(threading.current_thread().name, value1, value2))
    time.sleep(2)
    return 'finished'

def test_result(future):
    print(future.result())

if __name__ == "__main__":

    threadPool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="test_")
    for i in range(0,10):
        future = threadPool.submit(test, i,i+1)
        future.add_done_callback(test_result) # 回调函数
        print(future.result())

    threadPool.shutdown(wait=True)
    print('main finished')

 

1.3.2 map()

from concurrent.futures import ThreadPoolExecutor

def test(value1, value2=None):
    print("%s threading is printed %s, %s"%(threading.current_thread().name, value1, value2))
#     time.sleep(2)
    return '*******'

if __name__ == "__main__":

    args1 = [1,2]  #假设1,2分别代表一个url
    args2 = [3,4]

    with ThreadPoolExecutor(max_workers=4, thread_name_prefix="test_") as threadPool:

        threadPool.map(test, args1,args2) # 这是运行一次test的参数,众所周知map可以让test执行多次,一个[]代表参数,这里会请求1,3和2,4,执行两次。

        threadPool.shutdown(wait=True)

执行结果:

test__0 threading is printed 1, 3
test__1 threading is printed 2, 4


    上面程序使用 map() 方法来启动 4个线程(该程序的线程池包含 4 个线程,如果继续使用只包含两个线程的线程池,此时将有一个任务处于等待状态,必须等其中一个任务完成,线程空闲出来
才会获得执行的机会),map() 方法的返回值将会收集每个线程任务的返回结果。
    通过上面程序可以看出,使用 map() 方法来启动线程,并收集线程的执行结果,不仅具有代码简单的优点,而且虽然程序会以并发方式来执行 test() 函数,
但最后收集的 test() 函数的执行结果,依然与传入参数的结果保持一致。

2021-04-03

00:01:27

 

1.4 利用队列达到线程之间安全通信

 

这一个面向对象的简单爬虫,使用了上面提到的线程池和Queue。

import os
import threading
import time
from queue import Queue
from concurrent.futures import ThreadPoolExecutor
import requests
from lxml import etree


class quibai():
    def __init__(self):
        self.base_url = "https://www.qiushibaike.com/imgrank/page/{}/"
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.61 Safari/537.36", }
        self.queue_list_url = Queue()
        self.queue_parse_url = Queue()
        self.queue_content = Queue()

    def list_url(self):
        # return [self.base_url.format(i) for i in range(1,14)]
        for i in range(1, 14):
            url_s = self.base_url.format(i)
            self.queue_list_url.put(url_s)
            # print(self.queue_list_url.get())

    def get_imformations(self):
        while True:
            url = self.queue_list_url.get()

            response = requests.get(url=url, headers=self.headers)
            time.sleep(1)
            # return response.content.decode('utf-8')
            self.queue_parse_url.put(response.content.decode('utf-8'))
            self.queue_list_url.task_done()

    def analysis(self):
        while True:
            resp = self.queue_parse_url.get()
            res = etree.HTML(resp)
            item = {}
            picture = res.xpath('//div[@class="author clearfix"]//img/@src')
            item["picture"] = ["https:" + i for i in picture]
            name = res.xpath('//div[@class="author clearfix"]//h2/text()')
            item["name"] = [n.replace("\n", "") for n in name]
            # for n in name:

            title = res.xpath('//div[@class="content"]/span/text()')
            item["title"] = [t.replace("\n", "") for t in title]
            photo = res.xpath('//div[@class="thumb"]/a/img/@src')
            item["photo"] = ["https:" + i for i in photo]
            item["Fabulous"] = res.xpath('//span[@class="stats-vote"]/i/text()')
            item["comment"] = res.xpath('//span[@class="stats-comments"]//i/text()')

            gender = res.xpath('//div[@class="author clearfix"]/div/@class')
            for i in range(len(gender)):
                re = gender[i].split(" ")[1]
                if re == "womenIcon":
                    gender[i] = ""
                else:
                    gender[i] = ""
            item["gender"] = gender
            info = {}
            # # print(item['picture'])
            for name, gender, picture, title, photo, Fabulous, comment in zip(item['name'], item['gender'],
                                                                              item['picture'], item['title'],
                                                                              item['photo'], item['Fabulous'],
                                                                              item['comment']):
                info['name'] = name
                info['gender'] = gender
                info['picture'] = picture
                info['title'] = title
                info['photo'] = photo
                info['Fabulous'] = Fabulous
                info['comment'] = comment
            # return name, gender, picture, title, photo, Fabulous, comment
            self.queue_content.put(info)
            self.queue_parse_url.task_done()

    def save_data(self):
        if not os.path.exists("story_info"):
            os.mkdir("story_info")
        s = 1
        while True:
            item_list = self.queue_content.get()
            with open("story_info/test_{}.txt".format(s), 'w', encoding='utf-8') as f:
                f.writelines(str(item_list))
            s += 1
            self.queue_content.task_done()

    def go(self):
        """
        1.设置基础信息,链接数据库
        2.获取url列表
        3。获取请求
        4。get信息
        5.save data from 'def get' to mysql
        6.改造成线程
        :return:
        """
        # li_url = self.list_url()
        # print(li_url)
        # for j in li_url:
        # resp = self.get_imformations(j)
        # print(resp)
        # name, gender, picture, title, photo, Fabulous, comment =self.analysis(resp)
        # print(self.analysis(resp))
        thread_list = []
        # 在下面同时循环建立线程对象
        
         for j in range(3):
             t1 = threading.Thread(target=self.list_url)
             thread_list.append(t1)
        
             t2 = threading.Thread(target=self.get_imformations)
        
             thread_list.append(t2)
             t3 = threading.Thread(target=self.analysis)
        
             thread_list.append(t3)
             t4 = threading.Thread(target=self.save_data)
        
             thread_list.append(t4)
         for t in thread_list:
             t.setDaemon(True)  # 把子线程设置成守护线程,该线程不signifisant,当主线程结束,子线程结束
             t.start()
             print(t)
         for q in [self.queue_list_url, self.queue_parse_url, self.queue_content]:
             q.join()  # 让主线程队列先阻塞等待,当所有子线程完成后再完成。
             print(q)
         print("主线程结束")

        if __name__ == '__main__':
    start_time=time.time()
    star = quibai()
    star.go()
    end_time=time.time()
    print('执行完毕,花了:{}的时间'.format(end_time-start_time))

 

1.4.1  锁与线程同步

 

我上面的例子使用的了队列来实现了线程的同步和安全,而锁也可以实现共享资源的同步访问,特别在一些场景下有十分重要的应用,例如io操作、数据库查询,在博客总能看到大佬们对我这种的小白的调侃告诫 “毕加索”。 这里就不写

推荐阅读