首页 > 解决方案 > 不同服务器上的 Celery 任务同时启动

问题描述

我有两个 Django Celery 代码实例在两台不同的服务器上运行,以冗余访问另一台服务器上的公共数据库。我注意到当用户提交作业时,celery 在两台服务器上同时启动相同的任务。这会创建一个竞争条件并更新数据库两次。如何通过在另一台服务器中启动另一个类似任务时将任务保留在一台服务器中来防止这种情况?

标签: djangocelery

解决方案


您需要创建一个锁以防止两个任务同时执行,芹菜文档http://ask.github.io/celery/cookbook/tasks.html中有一个页面,其中包含如何执行此操作的示例。小心你的实现不会陷入某种死锁,并且你在你的锁上设置了一个超时,这样如果一个工作人员崩溃了,它就不会无限期地持有锁。

# Example from the link above
from celery.task import Task
from django.core.cache import cache
from django.utils.hashcompat import md5_constructor as md5
from djangofeeds.models import Feed

LOCK_EXPIRE = 60 * 5 # Lock expires in 5 minutes

class FeedImporter(Task):
    name = "feed.import"

    def run(self, feed_url, **kwargs):
        logger = self.get_logger(**kwargs)

        # The cache key consists of the task name and the MD5 digest
        # of the feed URL.
        feed_url_digest = md5(feed_url).hexdigest()
        lock_id = "%s-lock-%s" % (self.name, feed_url_hexdigest)

        # cache.add fails if if the key already exists
        acquire_lock = lambda: cache.add(lock_id, "true", LOCK_EXPIRE)
        # memcache delete is very slow, but we have to use it to take
        # advantage of using add() for atomic locking
        release_lock = lambda: cache.delete(lock_id)

        logger.debug("Importing feed: %s" % feed_url)
        if acquire_lock():
            try:
                feed = Feed.objects.import_feed(feed_url)
            finally:
                release_lock()
            return feed.url

        logger.debug(
            "Feed %s is already being imported by another worker" % (
                feed_url))
        return

此模式需要一个用于获取锁的缓存服务器。当任务开始时,它会在缓存服务器中获得一个基于键的锁,例如“my_task”,然后当你的任务完成时它会释放那个锁。任何其他启动的任务可能都应该有一个while循环等待它可以获取锁。Redis 锁是原子的,这意味着获取锁的操作不会同时发生,只有一个任务能够获取锁。


推荐阅读