python - 芹菜不并行执行块组
问题描述
我有块列表,它们是文件名
chunks_list = [["file_1", "file_2", "file_3"], ["file_4", "file_5", "file_6"], ...]
我有一个处理这些块的任务:
@celery_app.task
def process_files_task(files: List[str]):
for file in files:
logger.info(file)
# processing each file
我需要并行运行块,并且chunks_list
有 8 个块。所以 celery worker 并发也是 8。我运行我的任务是这样的:
chunks_group = process_files_task.chunks([(chunk,) for chunk in chunks_list], 8).group()
chunks_group.delay().get()
问题是所有任务仅由一名工人执行。这就是我在日志中看到的:
[INFO/ForkPoolWorker-4] file_1
[INFO/ForkPoolWorker-4] file_2
[INFO/ForkPoolWorker-4] file_3
[INFO/ForkPoolWorker-4] file_4
...
请告诉我我做错了什么让每个免费的分叉者都执行任务
解决方案
您可以设置worker_prefetch_multiplier
为 1。
一次预取多少消息乘以并发进程数。默认值为 4(每个进程 4 条消息)。然而,默认设置通常是一个不错的选择——如果您有很长时间运行的任务在队列中等待并且您必须启动工作程序,请注意,第一个启动的工作程序将收到最初四倍的消息数量。因此,任务可能不会公平地分配给工人。
更多细节在这里。
推荐阅读
- ios - 如何从 Callkit 最近的通话记录 iOS Objective-c 打开应用程序?
- angular - 如何测试角度服务工作者
- python - 使用 sympy 积分指数函数
- php - 如何在 nesk/puphpeteer 中设置代理?
- c# - 在 Azure 认知搜索中将 2 个 Azure SQL 表合并为 1 个索引
- javascript - Google App Script:如何将 PDF 转换为 GDOC 以获得 OCR?
- javascript - 为什么 event.defaultPrevented 未定义?
- java - 找不到类型的即时转换器:java.time.ZonedDateTime
- r - 在 R 中绘制有效/非缺失数据时出现 Inf/-Inf 错误
- c#-4.0 - 按升序排序