django - 不同服务器上的 Celery 任务同时启动
问题描述
我有两个 Django Celery 代码实例在两台不同的服务器上运行,以冗余访问另一台服务器上的公共数据库。我注意到当用户提交作业时,celery 在两台服务器上同时启动相同的任务。这会创建一个竞争条件并更新数据库两次。如何通过在另一台服务器中启动另一个类似任务时将任务保留在一台服务器中来防止这种情况?
解决方案
您需要创建一个锁以防止两个任务同时执行,芹菜文档http://ask.github.io/celery/cookbook/tasks.html中有一个页面,其中包含如何执行此操作的示例。小心你的实现不会陷入某种死锁,并且你在你的锁上设置了一个超时,这样如果一个工作人员崩溃了,它就不会无限期地持有锁。
# Example from the link above
from celery.task import Task
from django.core.cache import cache
from django.utils.hashcompat import md5_constructor as md5
from djangofeeds.models import Feed
LOCK_EXPIRE = 60 * 5 # Lock expires in 5 minutes
class FeedImporter(Task):
name = "feed.import"
def run(self, feed_url, **kwargs):
logger = self.get_logger(**kwargs)
# The cache key consists of the task name and the MD5 digest
# of the feed URL.
feed_url_digest = md5(feed_url).hexdigest()
lock_id = "%s-lock-%s" % (self.name, feed_url_hexdigest)
# cache.add fails if if the key already exists
acquire_lock = lambda: cache.add(lock_id, "true", LOCK_EXPIRE)
# memcache delete is very slow, but we have to use it to take
# advantage of using add() for atomic locking
release_lock = lambda: cache.delete(lock_id)
logger.debug("Importing feed: %s" % feed_url)
if acquire_lock():
try:
feed = Feed.objects.import_feed(feed_url)
finally:
release_lock()
return feed.url
logger.debug(
"Feed %s is already being imported by another worker" % (
feed_url))
return
此模式需要一个用于获取锁的缓存服务器。当任务开始时,它会在缓存服务器中获得一个基于键的锁,例如“my_task”,然后当你的任务完成时它会释放那个锁。任何其他启动的任务可能都应该有一个while
循环等待它可以获取锁。Redis 锁是原子的,这意味着获取锁的操作不会同时发生,只有一个任务能够获取锁。
推荐阅读
- java - 我无法在 Java 中为 usinf JFrame 和 contentPane 设置组件的位置?
- html - Mailto 超链接限制?
- angular - 为什么不订阅 NgRx 效果中的可观察对象?
- mysql - autocommit=true 时无法调用 commit
- c++11 - 如何在不使用循环的情况下获取 std::array 中项目的索引?
- python - 将图层连接到自身
- python - Django - 条纹。您没有设置有效的可发布密钥。尝试在 django 网站上合并条带付款时出现的错误
- postgresql - 在 PostgreSQL 中有条件地返回 SETOF
- c - 为什么我的代码在通过凯撒密码转换字母时会跳过空格和标点符号?
- ios - 如何让 UIButtons 跟随静态图像的大小调整?