python - 每个芹菜工人的定期任务
问题描述
我正在寻找一种方法来对每个 celery worker执行特定类型的任务。确切的用例是一个周期性的健康作业,它确保成功的任务初始化和进展的各种先决条件(这些指标报告给不同的服务)。例如,确保可以建立与 DB 的连接。
我发现远程控制和检查命令可用于该目的(有一些固定的调度),但当 AWS SQS 用作后端代理时,它们不受支持。
知道如何在不向分叉进程任务添加任何内存占用的情况下实现这一目标吗?也许通过在工作进程中启动另一个线程?
解决方案
为了解决这个问题,我使用了Celery custom worker bootstep。它在启动时注册一个计划任务,每 X 秒将一个健康检查任务省略到工作执行池中。
该解决方案与后端代理无关,并利用定制的工作执行池。
class WorkerHealthMonitor(bootsteps.StartStopStep):
requires = {'celery.worker.components:Timer',
'celery.worker.components:Pool'}
def __init__(self, worker, **kwargs):
self.tref = None
self.interval = 60
def start(self, worker):
logger.info("Registering health monitor timer with %d seconds interval", self.interval)
self.tref = worker.timer.call_repeatedly(
self.interval, schedule_health_check, (worker,), priority=10,
)
def stop(self, worker):
if self.tref:
self.tref.cancel()
self.tref = None
def schedule_health_check(worker):
worker.pool.apply_async(health_check, callback=health_check_completed)
def health_check(**kwargs):
logger.info('Running Health Check...')
return 'I am alive'
def health_check_completed(result):
logger.info('Health check completed with msg: %s', result)
任务注册:
app = Celery('tasks', broker=BROKER_URL, backend=BACKEND_URL)
app.steps['worker'].add(WorkerHealthMonitor)
推荐阅读
- java - Websocket 通信与 Netty 环境
- mysql - 如果列具有特定数据,mysql将忽略所有行
- tensorflow - TensorFlow 加载的模型无法在 GPU 上运行
- mongodb - 如何以最优化的方式从不同的集合中获取国家、州、城市名称
- python - 我如何创建一个刽子手计时器,以便它在 python 中每 10 秒扣除一次用户的分数?
- javascript - 如何在expo react native中将图像上传到firebase
- c++ - 是否有任何 C++ 函数可以对哈希表进行排序?
- android - 启用 PRODUCT_FULL_TREBLE_OVERRIDE 标志会影响 SELINUX 策略吗?
- python - Xpath没有给出结果scrapy python
- php - soapclient 请求 https 服务器错误,“您正在对启用 SSL 的服务器端口使用纯 HTTP”