python - 芹菜正确的任务组链
问题描述
我有一个字符串列表,它们是文件名:
chunks_list = [["file_1", "file_2"], ["file_3", "file_4", "file_5"], ...]
我需要在 celery 任务中处理这些文件,所以我有一个任务:
@celery_app.task
def process_file_task(filename):
# do some staff with file
# (e.g. produce data to Kafka)
我需要并行运行文件的内部列表。并且外部列表应该是顺序的。
处理方式应该是下一个:
worker1: file_1
worker2: file_2
下一组在第一组完成后开始:
worker1: file_3
worker2: file_4
worker3: file_5
我试图以这种方式运行我的任务:
sequence_tasks = []
for chunks in chunks_list:
sequence_tasks.append(
group([process_file_task.si(filename) for filename in chunks])
)
tasks_chain = chain(
tasks_group for tasks_group in sequence_tasks
)()
tasks_chain.get()
问题是我可能有数千个文件。而且这种运行任务的方式会消耗我所有的空闲内存,即使只有几百个。请告诉我如何以正确的方式设计我的工作流程,以免内存不足。
解决方案
你没有提到设置细节:
- 你的工人在哪里跑?码头工人?(k8s/ecs/..) ec2?
- 什么是工人并发?
- 每个文件的任务(在同一组内)是否必须在单独的工作人员上运行?
你有多少工人总是有一个大写的限制。假设您有 10 个工作人员,每个工作人员具有并发 1 和 1,000 个任务在一个组中。它们将几乎并行运行(因为您没有 1,000 名工人,只有 10 名)但这很好。Celery 将确保第二组的任何任务都不会开始,直到它完成所有前 1000 个任务。如果运行每个任务所需的时间相等,则每个工作人员将处理 100 个任务,然后将继续进行下一个块。如果花费的时间不同,Celery 将通过将下一个任务分配给下一个空闲工作人员来进行优化。
换句话说,您知道设置是什么(假设具有 16GB 和 8 个内核的 EC2 实例)。您还知道(我希望)单个任务可以占用的最大内存大小是多少。如果一个平均任务占用 1GB RAM,您最多可以运行 16 个并行度为 1 的任务或 6 个并行度为 2 的任务等)
推荐阅读
- c# - Android:通过蓝牙将元数据从媒体播放器应用程序发送到汽车音响阻止广播接收器播放下一首歌曲或暂停音乐
- java - 增加彩信图片的宽度和高度
- javascript - 65536 处的 getSelection() 错误
- python - 获取在 Python 中调用方法的对象
- python - Matplotlib:所有组的堆积面积图
- c# - 如何在具有 MVC 5 的类库项目中使用 Entity Framework 6 Code First
- cron - 每个月按不同时区的国家/地区运行定期任务
- apache-kafka - Kafka ACL 导致主题复制失败
- c# - 在包含许多项目的 Excel 中查找总和为 0 的值
- android - 无法在 Android Studio 之外安装基于 CLI 构建的应用程序 - “删除失败”错误/base.apk 代码缺失