flask - Celery + SQS 两次接收相同的任务,同时具有相同的任务 ID
问题描述
在烧瓶应用程序中使用带有 SQS 的celery
但celery 同时接收相同任务两次具有相同的任务 ID,像
这样运行工人,
celery worker -A app.jobs.run -l info --pidfile=/var/run/celery/celery.pid --logfile=/var/log/celery/celery.log --time-limit=7200 --concurrency=8
这里是 celery 的日志
[2019-11-29 08:07:35,464: INFO/MainProcess] Received task: app.jobs.booking.bookFlightTask[657985d5-c3a3-438d-a524-dbb129529443]
[2019-11-29 08:07:35,465: INFO/MainProcess] Received task: app.jobs.booking.bookFlightTask[657985d5-c3a3-438d-a524-dbb129529443]
[2019-11-29 08:07:35,471: WARNING/ForkPoolWorker-4] in booking funtion1
[2019-11-29 08:07:35,473: WARNING/ForkPoolWorker-3] in booking funtion1
[2019-11-29 08:07:35,537: WARNING/ForkPoolWorker-3] book_request_pp
[2019-11-29 08:07:35,543: WARNING/ForkPoolWorker-4] book_request_pp
两次收到相同的任务并且两者同时运行,
在 python 烧瓶中使用 celery==4.4.0rc4 , boto3==1.9.232,kombu==4.6.6 和 SQS。
在 SQS 中,默认可见性超时是 30 分钟,而我的任务是没有 ETA 并且没有 ack
我的任务.py
from app import app as flask_app
from app.jobs.run import capp
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy(flask_app)
class BookingTasks:
def addBookingToTask(self):
request_data = request.json
print ('in addBookingToTask',request_data['request_id'])
print (request_data)
bookFlightTask.delay(request_data)
return 'addBookingToTask added'
@capp.task(max_retries=0)
def bookFlightTask(request_data):
task_id = capp.current_task.request.id
try:
print ('in booking funtion1')
----
我的配置文件 config.py
import os
from urllib.parse import quote_plus
aws_access_key = quote_plus(os.getenv('AWS_ACCESS_KEY'))
aws_secret_key = quote_plus(os.getenv('AWS_SECRET_KEY'))
broker_url = "sqs://{aws_access_key}:{aws_secret_key}@".format(
aws_access_key=aws_access_key, aws_secret_key=aws_secret_key,
)
imports = ('app.jobs.run',)
## Using the database to store task state and results.
result_backend = 'db' + '+' + os.getenv('SQLALCHEMY_DATABASE_URI')
最后是我的 celery 应用程序文件 run.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from flask import Flask
from app import app as flask_app
import sqlalchemy
capp = Celery()
capp.config_from_object('app.jobs.config')
# Optional configuration, see the capplication user guide.
capp.conf.update(
result_expires=3600,
)
# SQS_QUEUE_NAME is like 'celery_test.fifo' , .fifo is required
capp.conf.task_default_queue = os.getenv('FLIGHT_BOOKINNG_SQS_QUEUE_NAME')
if __name__ == '__main__':
capp.start()
解决方案
默认 SQS visiblity_timeout 为 30 秒。您需要更新 celery 配置值:
broker_transport_options={'visibility_timeout': 3600}
.
当 celery 去创建队列时,它会将可见性超时设置为 1h。
注意:如果您指定了 task_default_queue,并且队列已经创建而没有指定broker_transport_options={'visibility_timeout': 3600}
,则 celery 在重新启动时不会更新可见性超时broker_transport_options={'visibility_timeout': 3600}
。您将需要删除队列并让 celery 重新创建它。
推荐阅读
- arrays - 测试我的排序算法错误:控制到达非空函数的结尾
- google-bigquery - BigQuery 客户端网址
- r - 删除几列中的重复值但保留行
- html - 通过过渡将svg渐变更改为纯白色?
- python - 从 kivy.properties 导入 Pycharm 的 ObjectProperty 错误
- javascript - 如何处理 https 云功能内的 Firestore 事务中的错误?
- curl - curl:(6)无法解析主机:curl
- reactjs - 如何在功能组件中只提取一次值?
- jquery - jQuery datepicker - 将 mindDate 增加一天
- javascript - 有没有办法将线性渐变与 HSLA 一起使用?