首页 > 解决方案 > Python Proxy Scraper / Checker 添加多线程麻烦

问题描述

我设法拼凑了一个代理刮板/检查器,它确实有效,但速度很慢。我听说添加线程可以加快进程,这已经超出了我的能力,我想知道是否有人可以帮助我展示如何在代码中实现线程。我读到线程库包含在 python 中,我试图添加它,但它似乎创建了第二个线程,做的完全一样,所以它只是通过相同的代理列表同时保存重复项。这是代码。

import requests
from bs4 import BeautifulSoup
from random import choice
import threading
import time
    
stop_flag = 0
    
def get_proxies():
    link = 'https://api.proxyscrape.com/?request=displayproxies&proxytype=all&timeout=5000&country=all&anonymity=all&ssl=no'
    other = 'https://www.proxy-list.download/api/v1/get?type=http'
    get_list1 = requests.get(link).text
    get_list2 = requests.get(other).text
    soup1 = BeautifulSoup(get_list1, 'lxml')
    soup2 = BeautifulSoup(get_list2, 'lxml')
    list1 = soup1.find('body').get_text().strip()
    list2 = soup2.find('body').get_text().strip()
    mix = list1+'\n'+list2+'\n'
    raw_proxies = mix.splitlines()
    t = threading.Thread(target=check_proxy, args=(raw_proxies,))
    t.start()
    time.sleep(0.5)
    return check_proxy(raw_proxies)

def check_proxy(proxies):
    check = 'http://icanhazip.com'       
    for line in proxies:

        headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:66.0) Gecko/20100101 Firefox/66.0","Accept-Encoding": "*","Connection": "keep-alive"}
        try:    
            response = requests.get(check, proxies={'http': 'http://'+line}, headers=headers, timeout=5)
            status = response.status_code

            outfile = open('good_proxies.txt', 'a')
            if status is 200:
                print('good - '+line)
                outfile.write(line+'\n')
            else:
        
                pass
        except Exception:
            print('bad  - '+line)
    outfile.close()        

get_proxies()

标签: pythonmultithreading

解决方案


以下应该运行更快。最好在主线程中完成所有文件的写入和打印,并让工作线程简单地返回结果:

import requests
from bs4 import BeautifulSoup
from random import choice
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import partial

stop_flag = 0

def get_list(session, url):
    get_list = session.get(url).text
    soup = BeautifulSoup(get_list, 'lxml')
    return soup.find('body').get_text().strip()


def get_proxies(session, executor):
    link = 'https://api.proxyscrape.com/?request=displayproxies&proxytype=all&timeout=5000&country=all&anonymity=all&ssl=no'
    other = 'https://www.proxy-list.download/api/v1/get?type=http'
    lists = list(executor.map(partial(get_list, session), (link, other)))
    mix = lists[0] + '\n' + lists[1] + '\n'
    raw_proxies = mix.splitlines()
    with open('good_proxies.txt', 'a') as outfile:
        headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:66.0) Gecko/20100101 Firefox/66.0","Accept-Encoding": "*","Connection": "keep-alive"}
        session.headers.update(headers)
        futures = {executor.submit(partial(check_proxy, session), proxy): proxy for proxy in raw_proxies}
        for future in as_completed(futures):
            proxy = futures[future]
            is_good = future.result()
            if is_good:
                print('good -', proxy)
                outfile.write(proxy + '\n')
            else:
                print('bad -', proxy)


def check_proxy(session, proxy):
    check = 'http://icanhazip.com'
    try:
        response = session.get(check, proxies={'http': 'http://'+proxy}, timeout=5)
        status = response.status_code
        return status == 200
    except Exception:
        return False


N_THREADS=100
with requests.Session() as session:
    with ThreadPoolExecutor(max_workers=N_THREADS) as executor:
        get_proxies(session, executor)

部分输出:

bad - 104.40.158.173:80
bad - 47.105.149.144:3128
good - 185.220.115.150:80
bad - 138.197.157.32:8080
bad - 138.197.157.32:3128
good - 116.17.102.174:3128
good - 183.238.173.226:3128
good - 119.8.44.244:8080
good - 1.174.138.125:8080
good - 116.17.102.131:3128
good - 101.133.167.140:8888
good - 118.31.225.11:8000
good - 117.131.119.116:80
good - 101.200.127.78:80
good - 1.70.67.175:9999
good - 116.196.85.150:3128
good - 1.70.64.160:9999
bad - 102.129.249.120:3128
bad - 138.68.41.90:3128
bad - 138.68.240.218:3128
good - 47.106.239.215:3328
good - 183.162.158.172:4216
bad - 138.68.240.218:8080
good - 115.219.131.244:3000
bad - 138.68.161.14:3128
good - 185.49.107.1:8080
bad - 134.209.29.120:8080

解释

速度的提高主要是使用线程,其次是使用包Session object提供的a requests,其主要优点是如果您向同一主机发出多个请求,则将重用相同的 TCP 连接。

Python 提供了两种线程池机制,(1) 未记录的multiprocessing.pool.ThreadPool类,它与multiprocessing.pool.Pool用于创建子进程池的类共享相同的接口;(2) 来自模块的ThreadPoolExecutor类,它与来自模块的类concurrent.futures共享相同的接口ProcessPoolExecutor相同的模块,用于创建处理器池。此代码使用ThreadPoolExecutor该类。

线程是轻量级的并且创建起来相对便宜,典型的台式计算机可以支持数千个。然而,一个给定的应用程序,取决于它正在做什么,可能不会通过创建超过某个最大值的线程来获利。线程只适用于非 CPU 密集型的“作业”。也就是说,它们经常放弃 CPU 以允许其他线程运行,因为它们正在等待 I/O 操作或 URL 获取请求完成。这是因为 Python 字节码不能在多个线程中并行运行,因为 Python 解释器会在执行字节码之前获得全局解释器锁 (GIL)。

创建一个ThreadPoolExecutor实例(分配给变量executor),使用max_workers参数指定池中的线程数。这里我比较随意地指定了 100 个线程。您可以尝试增加它并查看它是否可以提高性能。该ThreadPoolExecutor实例有两种方法可以用于将“作业”或“任务”提交到线程池以执行。请参阅concurrent.futures 文档。Functionmap类似于内置map函数,因为它返回一个迭代器,该迭代器将一个函数应用于其可迭代结果的每个项目,从而产生结果。不同之处在于,现在将通过将每个调用作为“作业”提交给线程池来同时进行函数调用。raw_proxiesget_list它负责检索单个 URL:

def get_list(session, url):
    get_list = session.get(url).text
    soup = BeautifulSoup(get_list, 'lxml')
    return soup.find('body').get_text().strip()

我现在想同时为每个 URL 调用这个函数,所以我想使用可迭代参数是 URL 列表的map函数。问题是只会将单个参数(每个调用的迭代的每个元素)传递给工作函数,但我也想传递参数。我本可以将变量分配给全局变量,但还有另一种方法。创建另一个函数,当被调用时,它的第一个参数“硬编码”被调用,所以我在调用中使用这个新函数:mapsessionsessionfunctools.partial(get_list, session)get_listsessionmap

lists = list(executor.map(partial(get_list, session), (link, other)))

我将调用返回的可迭代对象map转换为list我以后可以索引的对象。

可以用来将作业提交到线程池的另一种方法称为submit。它将工作函数和工作函数的参数作为参数,并立即返回 a 的实例,Future而无需等待作业完成。您可以将多种方法应用于此Future实例,最重要的一种是result,它会阻塞直到作业完成并从工作函数返回返回值。我可以很容易地map再次使用该函数将 .raw_proxies作为可迭代参数传递,然后遍历来自对 . 的调用的返回值map。但我会按照提交的顺序(即它们出现在raw_proxies列表)。这可能还不算太糟糕,因为在所有“工作”都完成之前,程序不会完成。但是,如果您不需要按特定顺序输出结果,则在作业完成后立即处理作业结果的效率会稍高一些,而与提交顺序无关。该submit函数返回一个Future实例,提供了这种灵活性:

我将每个代理 IP 作为单独的作业单独提交,并将结果存储Future在字典中作为键,其值是用于创建作业的 IP 值。我使用字典理解通过单个语句完成所有操作:

futures = {executor.submit(partial(check_proxy, session), proxy): proxy for proxy in raw_proxies}

然后,我使用由concurrent.futures,提供的另一种方法as_completed来遍历字典的所有键值,即 Futures,以Future在完成时返回每个实例,然后查询Future作业的返回值,该值将是TrueFalse

    for future in as_completed(futures):
        proxy = futures[future]
        is_good = future.result()
        if is_good:
            print('good -', proxy)
            outfile.write(proxy + '\n')
        else:
            print('bad -', proxy)

推荐阅读