python - 第 3 方通知上的 Flask + Celery 任务重复
问题描述
我有一个烧瓶应用程序,它使用 ETA/Countdown celery 功能在特定时间向用户发送电子邮件/SMS,并使用 Redis 作为代理,问题是电子邮件和 SMS 任务随机重复 - 有时用户会收到 10 封电子邮件/SMS,有时用户会收到20+ 用于这些任务,并且该任务应该只运行一次。数据流:
- 初始函数 schedule_event_main 使用通知调用 ETA 任务
date_event = datetime.combine(day, time.max)
schedule_ratings_email.apply_async([str(event[0])], eta=date_event)
schedule_ratings_sms.apply_async([str(event[0])], eta=date_event)
- 内部函数 schedule_ratings_email 和 schedule_ratings_sms 任务是 .delay 任务函数,它创建单独的 celery 任务,以将电子邮件和 SMS 发送给活动的各种客人。
@app.task(bind=True)
def schedule_ratings_email(self,event_id):
""" Fetch feed of URLs to crawl and queue up a task to grab and process
each url. """
try:
url = SITE_URL + 'user/dashboard'
guests = db.session.query(EventGuest).filter(EventGuest.event_id == int(event_id)).all()
event_details = db.session.query(Event).filter(Event.id == event_id).first()
if guests:
if event_details.status == "archived":
for guest in guests:
schedule_individual_ratings_emails.delay(guest.user.first_name, guest.event.host.first_name, guest.user.email,url)
except Exception as e:
log.error("Error processing ratings email for %s" % event_id, exc_info=e)
# self.retry()
- 这是发送通知的最终 .delay 个人任务:
@app.task()
def schedule_individual_ratings_emails(guest_name, host, guest, url):
try:
email_rating(guest_name, host, guest, url)
except Exception as e:
log.error("Error processing ratings email for %s", exc_info=e)
我尝试了多个 SO 答案并调整了很多变量,包括 celery 设置,但通知仍在重复。这只是 ETA/Countdown 任务,并且仅适用于 3rd 方服务器,因为我有某些 ETA 任务具有数据库数据写入,并且这些任务没有任何问题。
这在本地和 heroku(生产)上都是一个问题。当前技术栈:Flask==1.0.2 celery==4.1.0 Redis 4.0.9 Celery 启动:worker: celery worker --app openseat.tasks --beat --concurrency 1 --loglevel info Celery 配置详细信息:
CELERY_ACKS_LATE = True
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'Africa/Johannesburg'
CELERY_ENABLE_UTC = True
解决方案
推荐阅读
- python - 无法在 tkinter python 3.6 中打印出条目
- dart - 如何在 showModalBottomSheet 中换行?
- python - Python Pandas Dataframe - 使用行值和列名的计算填充空列
- weblate - 在 Weblate 界面中添加自定义插件
- angular - 将 FileURL 或 Data URL 转换为 Blob 格式
- python - 从共享网络驱动器加载 .npy 文件
- java - 统计java创建的对象个数
- ruby-on-rails - 如何在 Oracle Linux 7 上安装 ruby 2.3
- java - Spring RestTemplate 身份验证的性能问题
- sql - SQL - 如何计算过去 7 天内每天的不同 ID?