首页 > 解决方案 > 每个芹菜工人的定期任务

问题描述

我正在寻找一种方法来对每个 celery worker执行特定类型的任务。确切的用例是一个周期性的健康作业,它确保成功的任务初始化和进展的各种先决条件(这些指标报告给不同的服务)。例如,确保可以建立与 DB 的连接。

我发现远程控制和检查命令可用于该目的(有一些固定的调度),但当 AWS SQS 用作后端代理时,它们不受支持。

知道如何在不向分叉进程任务添加任何内存占用的情况下实现这一目标吗?也许通过在工作进程中启动另一个线程?

标签: pythoncelery

解决方案


为了解决这个问题,我使用了Celery custom worker bootstep。它在启动时注册一个计划任务,每 X 秒将一个健康检查任务省略到工作执行池中。

该解决方案与后端代理无关,并利用定制的工作执行池。

class WorkerHealthMonitor(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Timer',
                'celery.worker.components:Pool'}

    def __init__(self, worker, **kwargs):
        self.tref = None
        self.interval = 60

    def start(self, worker):
        logger.info("Registering health monitor timer with %d seconds interval", self.interval)
        self.tref = worker.timer.call_repeatedly(
            self.interval, schedule_health_check, (worker,), priority=10,
        )

    def stop(self, worker):
        if self.tref:
            self.tref.cancel()
            self.tref = None


def schedule_health_check(worker):
    worker.pool.apply_async(health_check, callback=health_check_completed)


def health_check(**kwargs):
    logger.info('Running Health Check...')
    return 'I am alive'


def health_check_completed(result):
    logger.info('Health check completed with msg: %s', result)

任务注册:

app = Celery('tasks', broker=BROKER_URL, backend=BACKEND_URL)
app.steps['worker'].add(WorkerHealthMonitor)

推荐阅读