首页 > 解决方案 > 使用 django、celery 和 redis 的一项一项任务

问题描述

我使用 django、celery 和 redis 来异步启动任务。

# tasks.py
@shared_task
def my_task():
    # Do stuff of task 1
    return True
# Somewhere in test1.py
my_task.delay()
# Few milli seconds later in test2.py
my_task.delay()

使用该配置,my_task 在 2 个不同的文件上启动 2 次。所以它们几乎同时在不同的线程上执行。

我需要将这两个任务一一执行。如果 my_task #1 正在执行并且另一个 my_task #2 已启动,我需要 my_task #2 等待 #1 在执行之前结束。

我不想只使用一个线程将参数传递给 celerycelery worker --concurrency=1

我的 settings.py 中的芹菜配置是基本的:

# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'

我找到了许多谈论该主题的资源,但我真的不明白如何实现我的目标

标签: pythondjangomultithreadingrediscelery

解决方案


http://loose-bits.com/2010/10/distributed-task-locking-in-celery.html上的解决方案几乎奏效了。这是一些改编:

import redis

REDIS_CLIENT = redis.Redis()

def only_one(function=None, key="", timeout=None):
    """Enforce only one celery task at a time."""

    def _dec(run_func):
        """Decorator."""

        def _caller(*args, **kwargs):
            """Caller."""
            ret_value = None
            have_lock = False
            lock = REDIS_CLIENT.lock(key, timeout=timeout)
            try:
                have_lock = lock.acquire(blocking=True)
                if have_lock:
                    ret_value = run_func(*args, **kwargs)
            finally:
                if have_lock:
                    lock.release()

            return ret_value

        return _caller

    return _dec(function) if function is not None else _dec
@task(name='my_app.sample.tasks.single_task')
@only_one(key="SingleTask", timeout=60 * 5)
def single_task(self, **kwargs):
    """Run task."""
    print("test")

问题是,我没有在我的 settings.py 中的任何地方配置 Redis,所以我不明白它是如何找到正确的 redis 数据库的。我想它是从 celery 的配置中获取的。


推荐阅读