首页 > 解决方案 > 从rabbitMQ消费时,Flask消费者不执行回调

问题描述

所以我有这个问题。我想同时使用两者FlaskRabbitMQ做一个能够完成一些计算繁重任务的微服务。我基本上想要文档中的 远程过程调用 (RPC)教程之类的东西,但需要 REST Api 开销。

因此,到目前为止,我已经提供了该代码:

server.py

from flask import Flask

import sys
import os
import json

import pika
import uuid
import time

''' HEADERS = {'Content-type': 'audio/*', 'Accept': 'text/plain'}'''

class RPIclient(object):
        def __init__(self):
            self.connection = pika.BlockingConnection(
                pika.ConnectionParameters(host='rabbitmq'))
            self.channel = self.connection.channel()
            self.channel.basic_qos(prefetch_count=1)
            self.channel.exchange_declare(exchange='kaldi_expe', exchange_type='topic')

            # Create all the queue and bind them to the corresponding routing key
            self.channel.queue_declare('request', durable=True)
            result = self.channel.queue_declare('answer', durable=True)

            self.channel.queue_bind(exchange='kaldi_expe', queue='request', routing_key='kaldi_expe.web.request')
            self.channel.queue_bind(exchange='kaldi_expe', queue='answer', routing_key='kaldi_expe.kaldi.answer')
            self.callback_queue = result.method.queue

            self.channel.basic_consume(queue="answer", on_message_callback=self.on_response)

        def on_response(self, ch, method, props, body):
            print("from server, correlation id : " + str(props.correlation_id), file=sys.stderr)
            self.response = body
            ch.basic_ack(delivery_tag=method.delivery_tag)

        def call(self, n):
            print("Launched Call ")
            self.response = None
            self.corr_id = str(uuid.uuid4())
            self.channel.basic_publish(
                exchange='kaldi_expe',
                routing_key='kaldi_expe.web.request',
                properties=pika.BasicProperties(
                    correlation_id=self.corr_id,
                ),
                body=str(n))
            while self.response is None:
                self.connection.process_data_events()
            return int(2)

def flask_app():
    app = Flask("__name__")

    @app.route('/', methods=['GET'])
    def server_is_up():
        return 'server is up', 200

    @app.route('/add-job/<cmd>')
    def add(cmd):
        app.config['RPIclient'].call(10)
        return "Call RPI client",404

    return app

if __name__ == '__main__':
    print("Waiting for RabbitMq")
    time.sleep(20)
    rpiClient = RPIclient()
    app = flask_app()
    app.config['RPIclient'] = rpiClient
    print("Rabbit MQ is connected, starting server", file=sys.stderr)
    app.run(debug=True, threaded=False, host='0.0.0.0')

worker.py

import pika
import time
import sys

print(' [*] Waiting for RabbitMQ ...')
time.sleep(20)

print(' [*] Connecting to server ...')
channel = connection.channel()

print(' [*] Waiting for messages.')
def callback(ch, method, properties, body):
    print(" [x] Received %s" % body)
    print(" [x] Executing task ")
    print("from worker, correlation id : " + str(properties.correlation_id))
    ch.basic_publish(
                exchange='kaldi_expe',
                routing_key='kaldi_expe.kaldi.answer',
                properties=pika.BasicProperties(correlation_id = properties.correlation_id),
                body="response")
    print(" [x] Done")

    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='request', on_message_callback=callback)

channel.start_consuming()

可悲的是,当我发回一条消息(从工作人员到服务器)时,似乎服务器确实使用了消息,但从不执行回调(它显示消息已使用,但兔子 mq 接口上没有 ACK .另外,打印不显示)。

我很迷茫,因为消息似乎已被使用,但回调似乎没有被执行。你知道它可能来自哪里吗?

标签: pythonflaskrabbitmqmicroservices

解决方案


您确实将回调方法附加on_response到队列answer,但您从未告诉您的服务器开始使用队列。

看起来您self.channel.start_consuming()在类初始化结束时丢失了。


推荐阅读