首页 > 解决方案 > 第 3 方通知上的 Flask + Celery 任务重复

问题描述

我有一个烧瓶应用程序,它使用 ETA/Countdown celery 功能在特定时间向用户发送电子邮件/SMS,并使用 Redis 作为代理,问题是电子邮件和 SMS 任务随机重复 - 有时用户会收到 10 封电子邮件/SMS,有时用户会收到20+ 用于这些任务,并且该任务应该只运行一次。数据流:

  1. 初始函数 schedule_event_main 使用通知调用 ETA 任务
   date_event = datetime.combine(day, time.max)
   schedule_ratings_email.apply_async([str(event[0])], eta=date_event)
   schedule_ratings_sms.apply_async([str(event[0])], eta=date_event) 
  1. 内部函数 schedule_ratings_email 和 schedule_ratings_sms 任务是 .delay 任务函数,它创建单独的 celery 任务,以将电子邮件和 SMS 发送给活动的各种客人。
@app.task(bind=True)
def schedule_ratings_email(self,event_id):
    """ Fetch feed of URLs to crawl and queue  up a task to grab and process
    each url. """
    try:
        url = SITE_URL + 'user/dashboard'
        guests = db.session.query(EventGuest).filter(EventGuest.event_id == int(event_id)).all()
        event_details = db.session.query(Event).filter(Event.id == event_id).first()
        if guests:
            if event_details.status == "archived":
                for guest in guests:
                    schedule_individual_ratings_emails.delay(guest.user.first_name, guest.event.host.first_name, guest.user.email,url)
    except Exception as e:
        log.error("Error processing ratings email for %s" % event_id, exc_info=e)
        # self.retry()

  1. 这是发送通知的最终 .delay 个人任务:
@app.task()
def schedule_individual_ratings_emails(guest_name, host, guest, url):
    try:
        email_rating(guest_name, host, guest, url)
    except Exception as e:
        log.error("Error processing ratings email for %s", exc_info=e)

我尝试了多个 SO 答案并调整了很多变量,包括 celery 设置,但通知仍在重复。这只是 ETA/Countdown 任务,并且仅适用于 3rd 方服务器,因为我有某些 ETA 任务具有数据库数据写入,并且这些任务没有任何问题。

这在本地和 heroku(生产)上都是一个问题。当前技术栈:Flask==1.0.2 celery==4.1.0 Redis 4.0.9 Celery 启动:worker: celery worker --app openseat.tasks --beat --concurrency 1 --loglevel info Celery 配置详细信息:

CELERY_ACKS_LATE = True
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'Africa/Johannesburg'
CELERY_ENABLE_UTC = True

标签: pythonflaskrediscelery

解决方案


推荐阅读