首页 > 技术文章 > Python爬虫之queue线程安全实战

Guishuzhe 2018-10-25 11:52 原文

1.普通下载

import requests
import os
import re
from lxml import etree
from urllib import request


def get_detail(url):
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3554.0 Safari/537.36"
    }
    rep = requests.get(url, headers=headers)
    html = etree.HTML(rep.text)
    imgs = html.xpath('//div[@class="page-content text-center"]//img[@class!="gif"]')
    for img in imgs:
        img_url = img.get("data-original")
        # 获取图片名称
        img_name = img.get("alt")
        # 过滤特殊字符
        img_name = re.sub(r'[\??\.,。!!]', "", img_name)
        # 获取图片后缀名
        suffix = os.path.splitext(img_url)[1].split("!")[0]
        filename = img_name + suffix
        # 开始下载到本地
        request.urlretrieve(img_url, "imgs/" + filename)


def main():
    for i in range(1, 101):
        url = "http://www.doutula.com/photo/list/?page={}".format(i)
        get_detail(url)


if __name__ == '__main__':
    main()
View Code

2.开启queue多线程安全队列异步下载斗图图片

import requests
import os
import re
from lxml import etree
from urllib import request
from queue import Queue
import threading


class Producer(threading.Thread):
    """批量下载"""
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3554.0 Safari/537.36"
    }

    def __init__(self, page_queue, img_queue, *args, **kwargs):
        # 找到Producer的父类Thread,然后把Producer的对象self转换为Thread的对象,调用父类(Thread)的__init__方法,实例化对象
        super(Producer, self).__init__(*args, **kwargs)
        self.page_queue = page_queue
        self.img_queue = img_queue

    def run(self):
        while True:
            # 队列空为True,则break掉
            if self.page_queue.empty():
                break
            url = self.page_queue.get()
            self.get_detail(url)

    def get_detail(self, url):
        rep = requests.get(url, headers=self.headers)
        text = rep.text
        html = etree.HTML(text)
        imgs = html.xpath('//div[@class="page-content text-center"]//img[@class!="gif"]')
        for img in imgs:
            img_url = img.get("data-original")
            # 获取图片名称
            img_name = img.get("alt")
            # 过滤特殊字符
            img_name = re.sub(r'[\??\.,。!!\*]', "", img_name)
            # 获取图片后缀名
            suffix = os.path.splitext(img_url)[1].split("!")[0]
            filename = img_name + suffix
            # 以元组形式推送到队列中
            self.img_queue.put((img_url, filename))


class Consumer(threading.Thread):
    """批量存储"""
    def __init__(self, page_queue, img_queue, *args, **kwargs):
        # 继承同一个父类,拥有一样的方法和变量
        super(Consumer, self).__init__(*args, **kwargs)
        self.page_queue = page_queue
        self.img_queue = img_queue

    def run(self):
        while True:
            # 因为是异步下载,所以需要两个都判断是否为空
            if self.img_queue.empty() and self.page_queue.empty():
                break
            # 获取队列中元组内数据
            img_url, filename = self.img_queue.get()
            request.urlretrieve(img_url, "imgs/" + filename)
            print(filename+"下载完成!")


def main():
    page_queue = Queue(100)       # 设置最大线程数量
    img_queue = Queue(1000)
    for i in range(1, 101):
        url = "http://www.doutula.com/photo/list/?page={}".format(i)
        page_queue.put(url)
    for i in range(5):
        # 开启五个下载线程
        t = Producer(page_queue, img_queue)
        t.start()

    for x in range(5):
        # 开启五个储存线程
        t = Consumer(page_queue, img_queue)
        t.start()


if __name__ == '__main__':
    main()
View Code

3.开启queue多线程安全队列异步下载百思不得姐段子到本地csv

"""queue安全多线程下载百思不得姐内容到csv"""
import requests
import threading
import csv
from queue import Queue
from lxml import etree


class Producer(threading.Thread):
    """生产者"""
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3554.0 Safari/537.36"
    }

    def __init__(self, page_queue, content_url, *args, **kwargs):
        super(Producer, self).__init__(*args, **kwargs)
        self.page_queue = page_queue
        self.content_url = content_url

    def run(self):
        while True:
            if self.page_queue.empty():
                break
            # 从队列中获取url
            url = self.page_queue.get()
            self.get_content(url)

    def get_content(self, url):
        rep = requests.get(url, headers=self.headers)
        text = rep.text
        html = etree.HTML(text)
        contents = html.xpath("//div[@class='j-r-list-c-desc']/a/text()")
        imgs = html.xpath("//div[@class='j-r-list-c-img']//img/@data-original")
        content_tags = zip(contents, imgs)
        for content, img_url in content_tags:
            self.content_url.put((content.strip(), img_url.strip()))


class Consumer(threading.Thread):
    """消费者"""
    def __init__(self, page_queue, content_url, *args, **kwargs):
        super(Consumer, self).__init__(*args, **kwargs)
        self.page_queue = page_queue
        self.content_url = content_url

    def run(self):
        while True:
            if self.page_queue.empty() and self.content_url.empty():
                break
            content, img_url = self.content_url.get()
            # 获取的内容必须修改成csv支持的格式
            values = {(content, img_url)}
            with open("joke.csv", "a+", encoding="utf-8", newline="") as fp:
                writer = csv.writer(fp)
                # 写入多个数据
                writer.writerows(values)


def main():
    page_queue = Queue(100)          # 最大线程数量
    content_url = Queue(500)
    for i in range(1, 51):
        url = "http://www.budejie.com/{}".format(i)
        # 推送到队列中
        page_queue.put(url)
    # 开启五个线程下载
    for x in range(5):
        t = Producer(page_queue, content_url)
        t.start()
    # 开启五个线程储存
    for j in range(5):
        t = Consumer(page_queue, content_url)
        t.start()


if __name__ == '__main__':
    main()
View Code

 

推荐阅读