python - 使用 django、celery 和 redis 的一项一项任务
问题描述
我使用 django、celery 和 redis 来异步启动任务。
# tasks.py
@shared_task
def my_task():
# Do stuff of task 1
return True
# Somewhere in test1.py
my_task.delay()
# Few milli seconds later in test2.py
my_task.delay()
使用该配置,my_task 在 2 个不同的文件上启动 2 次。所以它们几乎同时在不同的线程上执行。
我需要将这两个任务一一执行。如果 my_task #1 正在执行并且另一个 my_task #2 已启动,我需要 my_task #2 等待 #1 在执行之前结束。
我不想只使用一个线程将参数传递给 celerycelery worker --concurrency=1
我的 settings.py 中的芹菜配置是基本的:
# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
我找到了许多谈论该主题的资源,但我真的不明白如何实现我的目标
解决方案
http://loose-bits.com/2010/10/distributed-task-locking-in-celery.html上的解决方案几乎奏效了。这是一些改编:
import redis
REDIS_CLIENT = redis.Redis()
def only_one(function=None, key="", timeout=None):
"""Enforce only one celery task at a time."""
def _dec(run_func):
"""Decorator."""
def _caller(*args, **kwargs):
"""Caller."""
ret_value = None
have_lock = False
lock = REDIS_CLIENT.lock(key, timeout=timeout)
try:
have_lock = lock.acquire(blocking=True)
if have_lock:
ret_value = run_func(*args, **kwargs)
finally:
if have_lock:
lock.release()
return ret_value
return _caller
return _dec(function) if function is not None else _dec
@task(name='my_app.sample.tasks.single_task')
@only_one(key="SingleTask", timeout=60 * 5)
def single_task(self, **kwargs):
"""Run task."""
print("test")
问题是,我没有在我的 settings.py 中的任何地方配置 Redis,所以我不明白它是如何找到正确的 redis 数据库的。我想它是从 celery 的配置中获取的。
推荐阅读
- asp.net-mvc - 多次登录和注销时,ASP.NET Identity Core cookie 导致 http 403 错误
- oracle - 来自元数据的 Oracle 列数据类型
- android - 'view' 容器在 Android 的 XML 中做了什么
- vagrant - 在 Vagrant libvirt 提供程序中为管理网络设置静态 IP
- python - pandas 最大数据返回元数据样式?
- opencv - /usr/bin/ld: 找不到 -lvtkRenderingOpenGL
- amazon-web-services - 如何通过 S3 静态网站托管在 API 网关上使用自定义域
- python - 获取 ttk.Combobox 中选定项目的索引
- kubernetes - EKS Kubernetes 无法在集群范围内的 API 组“”中列出资源“命名空间”
- c++ - 如何使用 gMock 创建 Mock 对象?