首页 > 解决方案 > 在并行处理中使用对象并“重新启动”这些对象

问题描述

目标

我想从网页中检索信息(并行)。一旦其中一个“爬虫”找到我们正在寻找的结果,我们就会终止,如果没有,我们刷新我们刚刚查看的页面并再次搜索。换一种说法:


代码端

刮痧班

让我们首先定义Scraper对象:

import random
import time
import multiprocessing

# Result simulation array, False is much more likely than True
RES = [True, False, False, False, False, False, False, False, False, False, False, False, False]

class Scrape:

    def __init__(self, url):
        self.url = url
        self.result = None

    def get_result(self):
        return self.result

    def scrape(self):
        # Go to url
        # simulate work
        time.sleep(random.randrange(5))
        # simulate result
        result = RES[random.randrange(13)]
        # Return what we found on the page
        self.result = result

    def restart(self):
        # >> Some page refreshing
        self.scrape()

所以我们去网页做一些工作(scrape)然后我们可以通过访问结果get_result,如果这不是我们想要的,我们可以调用restart。请注意,实际上这个类要复杂得多,因此重新创建它会浪费启动驱动程序(与通过 重用同一类相比restart

并行代码

这是我卡住的地方,虽然我使用map了数百次,但我不知道如何保留Scrape对象并再次调用restart并将它们添加到池中。我在想这样的事情,但这并不像我想要的那样工作。也许队列是一个更好的方法,但我不熟悉它。

# Function to create the scrapers
def obj_create(url):
    print('Create')
    a = Scrape(url)
    a.scrape()
    return a

# Function to restart the scraper
def obj_restart(a):
    print('Restart')
    a.restart()
    a.scrape()
    return a

# Callback
def call_back(scrape_obj):
    if scrape_obj.get_result():
        pool.terminate()
        # Also somehow return the result...
    else:
        # Restart and add again
        pool.apply_async(obj_restart, scrape_obj, callback=call_back)


pool = multiprocessing.Pool(3)
url = 'test.com'
delay = 0.001
n = 3

for i in range(n):
    time.sleep(delay)
    pool.apply_async(obj_create, url, callback=call_back)

pool.close()
pool.join()

我尽力制作这个可重现的示例,并尽我所能解释它,但如果有任何不清楚的地方,请告诉我!

标签: pythonperformanceasynchronousparallel-processing

解决方案


Python

在平行下

似乎您从未听说过GIL?如果是这样,我很抱歉让你失望了,但你不能因此得到真正的并行性。

比那更多的。在您描述的情况下..您不需要一个。因为大多数时候你的线程会被网络接口阻塞来回复。因此,您需要一个好的异步框架,即事件循环,它将优化您的执行,从而将阻塞减少到可能的最低限度。

让我介绍一下aiohttp。它为您提供了正确实现的 HTTP(s) 调用,而出色的asyncio为您提供了取消、重试和处理错误的简洁机制。

特别是,看看收集

PSmultiprocessing将使事情并行,但由于任务的 IO 绑定性质,它在这里有点过分了。你可以留在同一个进程中,避免所有的多处理地狱,只需拒绝这种并行性,这两种方式在大多数情况下都会被阻止。


推荐阅读