首页 > 技术文章 > celery执行池概览

tracydzf 2022-01-04 14:17 原文

执行池概念

当运行类似 如下命令启动一个celery进程时,其实启动的是一个管理进程,此进程不处理实际的任务,而是产生的子进程或线程 去处理具体任务;那么这些 子进程或线程 在一起就叫做 执行池;

celery worker --app=worker.app

执行池的大小(子进程或线程的个数) 决定了 celery可以并发执行任务的个数;如果想尽可能快和多的执行任务,那么 增加执行池的大小 是个可以考虑的解决方案;

选择不同的执行池工作方式

Prefork

celery默认的执行池工作方式,是多进程的执行池,一般在 计算密集型任务中使用,能充分利用cpu多核; 如果不指定 –concurrency(并发进程个数)参数,则无论什么执行池都是尽可能多的使用cpu的核数;

Solo

这种执行池有点特殊,因为此模式在处理任务时 直接在 管理进程中进行; 不是基于进程或线程的工作模式; 也就是一直只有一个 消费者 在处理任务;上个任务不结束,则下个任务就会阻塞; 但是在微服务中 比如 在使用k8s 进行 docker部署时,可以使用此模式,这样 k8s直接通过观察启动多少个docker容器 就能知道启动了多少个celery消费者;

Eventlet/Gevent

2个都是基于协程 的执行池,一般在 IO密集型任务中使用,如频繁的网络请求,数据库操作等等;

gevent是对eventlet的高级封装,一般使用时 用 gevent,因为此包有monkey.patch_all()方法将 所有能转为协程的地方都转为协程,从而增加处理能力;

此模式指定--concurrency(并发数) 参数值可以比CPU核数多,理论上 只要内存不爆,套接字够用,就可以增加;

实际使用

需求: 多任务 从mysql查询将 数据导入到mongo中;且任务间没有依赖关系

实现:使用celery+gevent实现任务并发;

用法:此任务为IO密集型任务(查询mysql,并发导入mongo[业务端只需将数据发送给mongo,其内部会实现并发处理]),且每个任务内存消耗均有一些;因此使用gevent,且设置并发数不能过多,否则可能出现内存不足情况;

"""
没有 celery定时任务,所以不需要 心跳服务
此次使用 基于协程的 gevent作为 执行池(execution pool);因为 瓶颈在于 同步到mongo的IO上;
本地测试时 可将 --pool=gevent 改为 --pool=solo 使worker顺序消费任务方便debug;
"""
from gevent import monkey
 
# 使用gevent注意调用此方法
monkey.patch_all()
import os
from app import create_app
from app.core import celery
 
app = create_app()
 
if __name__ == '__main__':
    os.environ["START_METHOD"] = "worker"
    # 本地开发调试时,注意将 --pool 值改为 solo
    celery.start(
        ["celery", "-A", "worker.celery", "worker", "--pool=gevent", "--loglevel=INFO", "-E", "-n", 'node_name_xxx',
         "-Q", "queue_name_xxx"])

 

推荐阅读