Flask Celery Task Locking

Problem Description

I'm using Flask with Celery and I'm trying to lock a specific task so that only one instance of it can run at a time. The Celery docs give an example of doing this: Ensuring a task is only executed one at a time. The example given is for Django, but I'm using Flask. I've done my best to convert it to work with Flask, yet I still see myTask1, which has the lock, able to run multiple times.

One thing I'm not clear on is whether I'm using the cache correctly; I've never used it before, so all of this is new to me. One thing mentioned in the docs but not explained is this:

Note from the docs:

In order for this to work correctly you need to be using a cache backend where the .add operation is atomic. memcached is known to work well for this purpose.

I'm not sure what that means. Should I be using the cache in conjunction with my database, and if so, how would I do that? I'm using mongodb. In my code I just have this setup for the cache, cache = Cache(app, config={'CACHE_TYPE': 'simple'}), because that's what's mentioned in the Flask-Cache docs (Flask-Cache Docs).

The other thing I'm not clear on is whether there is anything I need to do differently when I call myTask1 from within my Flask route task1.

Here is an example of the code I'm using.

from flask import (Flask, render_template, flash, redirect,
                   url_for, session, logging, request, g, render_template_string, jsonify)
from flask_caching import Cache
from contextlib import contextmanager
from celery import Celery
from Flask_celery import make_celery
from celery.result import AsyncResult
from celery.utils.log import get_task_logger
from celery.five import monotonic
from flask_pymongo import PyMongo
from hashlib import md5
import pymongo
import time


app = Flask(__name__)

cache = Cache(app, config={'CACHE_TYPE': 'simple'})
app.config['SECRET_KEY']= 'super secret key for me123456789987654321'

######################
# MONGODB SETUP
#####################
app.config['MONGO_HOST'] = 'localhost'
app.config['MONGO_DBNAME'] = 'celery-test-db'
app.config["MONGO_URI"] = 'mongodb://localhost:27017/celery-test-db'


mongo = PyMongo(app)


##############################
# CELERY ARGUMENTS
##############################


app.config['CELERY_BROKER_URL'] = 'amqp://localhost//'
app.config['CELERY_RESULT_BACKEND'] = 'mongodb://localhost:27017/celery-test-db'

# NOTE: this second assignment overrides the URI set above; the 'mongodb'
# backend is configured via CELERY_MONGODB_BACKEND_SETTINGS below
app.config['CELERY_RESULT_BACKEND'] = 'mongodb'
app.config['CELERY_MONGODB_BACKEND_SETTINGS'] = {
    "host": "localhost",
    "port": 27017,
    "database": "celery-test-db", 
    "taskmeta_collection": "celery_jobs",
}

app.config['CELERY_TASK_SERIALIZER'] = 'json'


celery = Celery('task', broker='mongodb://localhost:27017/jobs')
celery = make_celery(app)  # note: this replaces the Celery instance created on the line above


LOCK_EXPIRE = 60 * 2  # Lock expires in 2 minutes


@contextmanager
def memcache_lock(lock_id, oid):
    timeout_at = monotonic() + LOCK_EXPIRE - 3
    # cache.add fails if the key already exists
    status = cache.add(lock_id, oid, LOCK_EXPIRE)
    try:
        yield status
    finally:
        # memcache delete is very slow, but we have to use it to take
        # advantage of using add() for atomic locking
        if monotonic() < timeout_at and status:
            # don't release the lock if we exceeded the timeout
            # to lessen the chance of releasing an expired lock
            # owned by someone else
            # also don't release the lock if we didn't acquire it
            cache.delete(lock_id)



@celery.task(bind=True, name='app.myTask1')
def myTask1(self):

    self.update_state(state='IN TASK')

    lock_id = self.name

    with memcache_lock(lock_id, self.app.oid) as acquired:
        if acquired:
            # do work if we got the lock
            print('acquired is {}'.format(acquired))
            self.update_state(state='DOING WORK')
            time.sleep(90)
            return 'result'

    # otherwise, the lock was already in use
    raise self.retry(countdown=60)  # redeliver message to the queue, so the work can be done later



@celery.task(bind=True, name='app.myTask2')
def myTask2(self):
    print('you are in task2')
    self.update_state(state='STARTING')
    time.sleep(120)
    print('task2 done')


@app.route('/', methods=['GET', 'POST'])
def index():

    return render_template('index.html')

@app.route('/task1', methods=['GET', 'POST'])
def task1():

    print('running task1')
    result = myTask1.delay()

    # get async task id
    taskResult = AsyncResult(result.task_id)


    # push async taskid into db collection job_task_id
    mongo.db.job_task_id.insert({'taskid': str(taskResult), 'TaskName': 'task1'})

    return render_template('task1.html')


@app.route('/task2', methods=['GET', 'POST'])
def task2():

    print('running task2')
    result = myTask2.delay()

    # get async task id
    taskResult = AsyncResult(result.task_id)

    # push async taskid into db collection job_task_id
    mongo.db.job_task_id.insert({'taskid': str(taskResult), 'TaskName': 'task2'})

    return render_template('task2.html') 


@app.route('/status', methods=['GET', 'POST'])
def status():

    taskid_list = []
    task_state_list = []
    TaskName_list = []

    allAsyncData = mongo.db.job_task_id.find()

    for doc in allAsyncData:
        try:
            taskid_list.append(doc['taskid'])
        except:
            print('error with db connection in asyncJobStatus')

        TaskName_list.append(doc['TaskName'])

    # PASS TASK ID TO ASYNC RESULT TO GET TASK RESULT FOR THAT SPECIFIC TASK
    for item in taskid_list:
        try:
            task_state_list.append(myTask1.AsyncResult(item).state)
        except:
            task_state_list.append('UNKNOWN')

    return render_template('status.html', data_list=zip(task_state_list, TaskName_list))

Final working code

from flask import (Flask, render_template, flash, redirect,
                   url_for, session, logging, request, g, render_template_string, jsonify)
from flask_caching import Cache
from contextlib import contextmanager
from celery import Celery
from Flask_celery import make_celery
from celery.result import AsyncResult
from celery.utils.log import get_task_logger
from celery.five import monotonic
from flask_pymongo import PyMongo
from hashlib import md5
import pymongo
import time
import redis
from flask_redis import FlaskRedis


app = Flask(__name__)

# ADDING REDIS
redis_store = FlaskRedis(app)

# POINTING CACHE_TYPE TO REDIS
cache = Cache(app, config={'CACHE_TYPE': 'redis'})
app.config['SECRET_KEY']= 'super secret key for me123456789987654321'

######################
# MONGODB SETUP
#####################
app.config['MONGO_HOST'] = 'localhost'
app.config['MONGO_DBNAME'] = 'celery-test-db'
app.config["MONGO_URI"] = 'mongodb://localhost:27017/celery-test-db'


mongo = PyMongo(app)


##############################
# CELERY ARGUMENTS
##############################

# CELERY USING REDIS
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'mongodb://localhost:27017/celery-test-db'

# NOTE: this second assignment overrides the URI set above; the 'mongodb'
# backend is configured via CELERY_MONGODB_BACKEND_SETTINGS below
app.config['CELERY_RESULT_BACKEND'] = 'mongodb'
app.config['CELERY_MONGODB_BACKEND_SETTINGS'] = {
    "host": "localhost",
    "port": 27017,
    "database": "celery-test-db", 
    "taskmeta_collection": "celery_jobs",
}

app.config['CELERY_TASK_SERIALIZER'] = 'json'


celery = Celery('task', broker='mongodb://localhost:27017/jobs')
celery = make_celery(app)  # note: this replaces the instance above; make_celery builds the one actually used, from app.config


LOCK_EXPIRE = 60 * 2  # Lock expires in 2 minutes


@contextmanager
def memcache_lock(lock_id, oid):
    timeout_at = monotonic() + LOCK_EXPIRE - 3
    print('in memcache_lock and timeout_at is {}'.format(timeout_at))
    # cache.add fails if the key already exists
    status = cache.add(lock_id, oid, LOCK_EXPIRE)
    try:
        yield status
        print('memcache_lock and status is {}'.format(status))
    finally:
        # memcache delete is very slow, but we have to use it to take
        # advantage of using add() for atomic locking
        if monotonic() < timeout_at and status:
            # don't release the lock if we exceeded the timeout
            # to lessen the chance of releasing an expired lock
            # owned by someone else
            # also don't release the lock if we didn't acquire it
            cache.delete(lock_id)



@celery.task(bind=True, name='app.myTask1')
def myTask1(self):

    self.update_state(state='IN TASK')
    print('dir is {} '.format(dir(self)))

    lock_id = self.name
    print('lock_id is {}'.format(lock_id))

    with memcache_lock(lock_id, self.app.oid) as acquired:
        print('in memcache_lock and lock_id is {} self.app.oid is {} and acquired is {}'.format(lock_id, self.app.oid, acquired))
        if acquired:
            # do work if we got the lock
            print('acquired is {}'.format(acquired))
            self.update_state(state='DOING WORK')
            time.sleep(90)
            return 'result'

    # otherwise, the lock was already in use
    raise self.retry(countdown=60)  # redeliver message to the queue, so the work can be done later



@celery.task(bind=True, name='app.myTask2')
def myTask2(self):
    print('you are in task2')
    self.update_state(state='STARTING')
    time.sleep(120)
    print('task2 done')


@app.route('/', methods=['GET', 'POST'])
def index():

    return render_template('index.html')

@app.route('/task1', methods=['GET', 'POST'])
def task1():

    print('running task1')
    result = myTask1.delay()

    # get async task id
    taskResult = AsyncResult(result.task_id)


    # push async taskid into db collection job_task_id
    mongo.db.job_task_id.insert({'taskid': str(taskResult), 'TaskName': 'myTask1'})

    return render_template('task1.html')


@app.route('/task2', methods=['GET', 'POST'])
def task2():

    print('running task2')
    result = myTask2.delay()

    # get async task id
    taskResult = AsyncResult(result.task_id)

    # push async taskid into db collection job_task_id
    mongo.db.job_task_id.insert({'taskid': str(taskResult), 'TaskName': 'task2'})

    return render_template('task2.html')

@app.route('/status', methods=['GET', 'POST'])
def status():

    taskid_list = []
    task_state_list = []
    TaskName_list = []

    allAsyncData = mongo.db.job_task_id.find()

    for doc in allAsyncData:
        try:
            taskid_list.append(doc['taskid'])
        except:
            print('error with db connection in asyncJobStatus')

        TaskName_list.append(doc['TaskName'])

    # PASS TASK ID TO ASYNC RESULT TO GET TASK RESULT FOR THAT SPECIFIC TASK
    for item in taskid_list:
        try:
            task_state_list.append(myTask1.AsyncResult(item).state)
        except:
            task_state_list.append('UNKNOWN')

    return render_template('status.html', data_list=zip(task_state_list, TaskName_list))


if __name__ == '__main__':
    app.secret_key = 'super secret key for me123456789987654321'
    app.run(port=1234, host='localhost')

Here is also a screenshot so you can see that I ran myTask1 twice and myTask2 once. I now have the expected behavior for myTask1: it is run by a single worker, and if another worker attempts to pick it up, it will just keep retrying based on whatever countdown I define.

[Screenshot: Flower dashboard]

Tags: python, flask, celery, celery-task, flask-cache

Solution


In your question, you point out this warning from the Celery example you used:

In order for this to work correctly you need to be using a cache backend where the .add operation is atomic. memcached is known to work well for this purpose.

You mention that you don't really understand what this means. Indeed, the code you show demonstrates that you did not heed that warning, because your code uses a backend that is not appropriate for this purpose.

Consider this code:

with memcache_lock(lock_id, self.app.oid) as acquired:
    if acquired:
        # do some work

What you want here is for acquired to be true for only one thread at a time. If two threads enter the with block at the same time, only one should "win" and get acquired set to true. The thread that has acquired true can then proceed with its work, while the other thread has to skip doing the work and try again later to acquire the lock. In order to ensure that only one thread can have acquired true, .add must be atomic.

Here is some pseudocode showing what .add(key, value) does:

1. if <key> is already in the cache:
2.   return False    
3. else:
4.   set the cache so that <key> has the value <value>
5.   return True

If the execution of .add is not atomic, here is what could happen when two threads, A and B, both execute .add("foo", "bar"). Assume the cache starts out empty.

  1. Thread A executes line 1, if "foo" is already in the cache, finds that "foo" is not in the cache, and moves to jump to line 3, but the thread scheduler switches control to thread B.
  2. Thread B also executes line 1 and also finds that "foo" is not in the cache. So it jumps to line 3 and then executes lines 4 and 5, setting the key "foo" to the value "bar" and returning True.
  3. Eventually the scheduler gives control back to thread A, which continues with lines 3, 4 and 5, also sets the key "foo" to the value "bar", and also returns True.

What you have here is two .add calls that both return True. If those calls happen inside memcache_lock, then both threads can have acquired be true, both threads can do the work at the same time, and your memcache_lock is not doing what it is supposed to do, which is allow only one thread to do the work at a time.
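To make the race concrete, here is a minimal, self-contained sketch that is not part of the original post: the NaiveCache class and the sleep call are illustrative assumptions that deliberately reproduce the non-atomic pseudocode above.

import threading
import time

class NaiveCache:
    # A toy cache whose add() is NOT atomic: the key check and the
    # key set are separate steps, so two threads can interleave between them.
    def __init__(self):
        self._data = {}

    def add(self, key, value):
        if key in self._data:      # pseudocode line 1: check
            return False           # pseudocode line 2: key exists, add fails
        time.sleep(0.01)           # widen the race window for the demo
        self._data[key] = value    # pseudocode line 4: set the key
        return True                # pseudocode line 5: report success

cache = NaiveCache()
results = []

def worker():
    results.append(cache.add("foo", "bar"))

threads = [threading.Thread(target=worker) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# With an atomic add you would get [True, False]; with this non-atomic
# add, both threads will most likely report True.
print(results)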

And you are not using a cache that guarantees that .add is atomic. You initialize your cache like this:

cache = Cache(app, config={'CACHE_TYPE': 'simple'})

The simple backend is scoped to a single process, has no thread-safety, and has an .add operation which is not atomic. (This does not involve Mongo at all, by the way. If you wanted your cache to be backed by Mongo, you would have to specify a backend specifically made to send data to a Mongo database.)

So you have to switch to another cache backend, one that guarantees that .add is atomic. You could follow the lead of the Celery example and use the memcached backend, which does have an atomic .add operation. I don't use Flask, but I've essentially done what you are doing here with Django and Celery, and successfully used the Redis backend to provide the kind of locking you are using.
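As a rough sketch of what that switch looks like with Flask-Caching (the server addresses are assumptions for a local setup, not something prescribed by the answer):

from flask import Flask
from flask_caching import Cache

app = Flask(__name__)

# Option 1: memcached, as in the Celery docs example; this assumes a
# memcached server on 127.0.0.1:11211 and a client library such as pylibmc.
cache = Cache(app, config={
    'CACHE_TYPE': 'memcached',
    'CACHE_MEMCACHED_SERVERS': ['127.0.0.1:11211'],
})

# Option 2: Redis, which is what the final working code above switched to;
# this assumes a Redis server on localhost:6379.
# cache = Cache(app, config={
#     'CACHE_TYPE': 'redis',
#     'CACHE_REDIS_URL': 'redis://localhost:6379/0',
# })

With either backend, the existing cache.add(lock_id, oid, LOCK_EXPIRE) call in memcache_lock becomes an atomic check-and-set, so only one caller at a time can see acquired as true.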

