首页 > 解决方案 > 使用 Celey+RabbitMQ+Redis 实现订单管理服务

问题描述

.zip我已经实现了使用 and 即时将一些文件打包成文件的功能zipstream,因此响应的格式是mimetype='application/zip'使用Flask。现在我想实现提交订单(即生成.zip文件的任务)、跟踪订单状态(如STARTED、、、 ) SUCCESSFAILURE撤销订单(即取消生成.zip文件或删除生成.zip文件的任务)、下载订单(即.zip文件)的服务对于当前用户。我计划使用CeleryRabbitMQ作为消息代理,Redis作为结果后端。

问题来了。仅Redis作为结果后端就足够了吗?因为用户订单的跟踪状态似乎涉及查询,user = ...而不是支持AsyncResult任务 ID查询

更新: 感谢@Tomáš Linhart。我遵循使用celery在MongoDB中存储结果的扩展方式。signal handler mechanism这是与我创建的任务相关的代码片段。我创建 and 实例的方式与@Tomáš Linhart 的回答task_tracker中的andcelery_cli相同。trackerapp

# Import task_tracker and celery_cli here
...

@task_tracker.track
@celery_cli.task(name='pack-up-tif')
def async_pack_up_tif(**kwargs):
    # Some processing here
    ...

def pack_up_tif(msg):
    result = async_pack_up_tif.delay(**msg)
    return result

但我还有一个问题。调用方法时如何拦截任务ID delay?因为我需要在触发信号find_one_and_update时将该信息存储在 MongoDB 集合中。task_success

标签: pythonredisrabbitmqcelery

解决方案


详细说明我发布的评论并回答您更新的问题。确定从哪里收集您想在每个信号中跟踪的各种数据有点复杂。一些信号为您提供任务本身,一些信号为您提供任务请求。

我以这个task_success信号处理程序结束:

def _on_task_success(self, sender, result, **other_kwargs):
    if sender.name not in self.tasks:
        return

    collection = self.mongo \
        .get_database(self.config['mongodb']['database']) \
        .get_collection(self.config['mongodb']['collection'])
    collection.find_one_and_update(
        {'_id': sender.request.id},
        {
            '$setOnInsert': {
                'name': sender.name,
                'args': sender.request.args,
                'kwargs': sender.request.kwargs
            },
            '$set': {
                'status': states.SUCCESS,
                'date_done': datetime.datetime.utcnow(),
                'retries': sender.request.retries,
                'group_id': sender.request.group,
                'chord_id': sender.request.chord,
                'root_id': sender.request.root_id,
                'parent_id': sender.request.parent_id,
                'result': result
            },
            '$push': {
                'status_history': {
                    'date': datetime.datetime.utcnow(),
                    'status': states.SUCCESS
                }
            }
        },
        upsert=True,
        return_document=ReturnDocument.AFTER)


推荐阅读