首页 > 解决方案 > 在 Celery 链中并行运行结果列表

问题描述

这是我要实现的逻辑:

1. Get a list of URLs by scraping a home page
2. Get, scrape and store a sublist of URLs in parallel by visiting each link in above list

我首先尝试创建一个主任务,它首先从主页抓取所有 URL,然后在 for 循环中获取子 URL:

@task
def master_task():
    urls = scrape_list_of_urls()
    job = group([scrape_url_and_save.s(url) for url in urls])  # scrape_url takes around 200ms each URL, and there are thousands of URLs. Hence I want it to run in parallel
    result = job.apply_async()
    result.join()
@task
def scrape_url_and_save(url):
    save_to_db(contents_of_url_by_scraping)
...
def scrape(request):  # In Django
    master_task.delay()
    ...

但这会导致错误:

RuntimeError:永远不要在任务中调用 result.get()!

我在 Django 应用程序中使用 Celery 4。这master_task必须是一项任务,因为我不希望用户在抓取主页时等待。我不确定我的代码逻辑是否正确。一个更好的逻辑将不胜感激。

标签: pythoncelerypython-3.7django-celery

解决方案


这是实现所需工作流程的方法 - 您的主任务应该返回一组子任务,例如

@app.task(bind=True)
def master_task(self):
    urls = scrape_list_of_urls()
    job = group((self.app.signature('tasks_module.scrape_url_and_save', (url,)) for url in urls))  
    # run scrape_url_and_save in parallel e.g. using gevent/eventlet worker pool
    return job.delay()

推荐阅读