python - 在 Celery 链中并行运行结果列表
问题描述
这是我要实现的逻辑:
1. Get a list of URLs by scraping a home page
2. Get, scrape and store a sublist of URLs in parallel by visiting each link in above list
我首先尝试创建一个主任务,它首先从主页抓取所有 URL,然后在 for 循环中获取子 URL:
@task
def master_task():
urls = scrape_list_of_urls()
job = group([scrape_url_and_save.s(url) for url in urls]) # scrape_url takes around 200ms each URL, and there are thousands of URLs. Hence I want it to run in parallel
result = job.apply_async()
result.join()
@task
def scrape_url_and_save(url):
save_to_db(contents_of_url_by_scraping)
...
def scrape(request): # In Django
master_task.delay()
...
但这会导致错误:
RuntimeError:永远不要在任务中调用 result.get()!
我在 Django 应用程序中使用 Celery 4。这master_task
必须是一项任务,因为我不希望用户在抓取主页时等待。我不确定我的代码逻辑是否正确。一个更好的逻辑将不胜感激。
解决方案
这是实现所需工作流程的方法 - 您的主任务应该返回一组子任务,例如
@app.task(bind=True)
def master_task(self):
urls = scrape_list_of_urls()
job = group((self.app.signature('tasks_module.scrape_url_and_save', (url,)) for url in urls))
# run scrape_url_and_save in parallel e.g. using gevent/eventlet worker pool
return job.delay()
推荐阅读
- android - Android安装后会自动删除obb扩展文件吗?
- java - 重新声明抽象方法公开的规则?
- postgresql - 使用 pg_dump 导出 postgres db 不起作用,数据库“db_name”不存在
- ibm-watson - IBM Watson Dialog 节点响应未到来
- javascript - 如何将项目推送到 Vuejs 中数据对象的数组中?Vue 似乎没有关注 .push() 方法
- c# - 将内部版本号应用于程序集的 VSTS 问题
- arrays - 如何将 PNG 生成的字节列表转换为每个像素的 RGB 值?
- neo4j - Neo4J 如何聚合和统计相同的查询结果?
- mongodb - 使用双嵌套元素数组进行查询
- google-apps-script - 如何将邮件正文的文本保存到 Google Docs