python - 从rabbitMQ消费时,Flask消费者不执行回调
问题描述
所以我有这个问题。我想同时使用两者Flask
并RabbitMQ
做一个能够完成一些计算繁重任务的微服务。我基本上想要文档中的
远程过程调用 (RPC)教程之类的东西,但需要 REST Api 开销。
因此,到目前为止,我已经提供了该代码:
server.py
from flask import Flask
import sys
import os
import json
import pika
import uuid
import time
''' HEADERS = {'Content-type': 'audio/*', 'Accept': 'text/plain'}'''
class RPIclient(object):
def __init__(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host='rabbitmq'))
self.channel = self.connection.channel()
self.channel.basic_qos(prefetch_count=1)
self.channel.exchange_declare(exchange='kaldi_expe', exchange_type='topic')
# Create all the queue and bind them to the corresponding routing key
self.channel.queue_declare('request', durable=True)
result = self.channel.queue_declare('answer', durable=True)
self.channel.queue_bind(exchange='kaldi_expe', queue='request', routing_key='kaldi_expe.web.request')
self.channel.queue_bind(exchange='kaldi_expe', queue='answer', routing_key='kaldi_expe.kaldi.answer')
self.callback_queue = result.method.queue
self.channel.basic_consume(queue="answer", on_message_callback=self.on_response)
def on_response(self, ch, method, props, body):
print("from server, correlation id : " + str(props.correlation_id), file=sys.stderr)
self.response = body
ch.basic_ack(delivery_tag=method.delivery_tag)
def call(self, n):
print("Launched Call ")
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(
exchange='kaldi_expe',
routing_key='kaldi_expe.web.request',
properties=pika.BasicProperties(
correlation_id=self.corr_id,
),
body=str(n))
while self.response is None:
self.connection.process_data_events()
return int(2)
def flask_app():
app = Flask("__name__")
@app.route('/', methods=['GET'])
def server_is_up():
return 'server is up', 200
@app.route('/add-job/<cmd>')
def add(cmd):
app.config['RPIclient'].call(10)
return "Call RPI client",404
return app
if __name__ == '__main__':
print("Waiting for RabbitMq")
time.sleep(20)
rpiClient = RPIclient()
app = flask_app()
app.config['RPIclient'] = rpiClient
print("Rabbit MQ is connected, starting server", file=sys.stderr)
app.run(debug=True, threaded=False, host='0.0.0.0')
worker.py
import pika
import time
import sys
print(' [*] Waiting for RabbitMQ ...')
time.sleep(20)
print(' [*] Connecting to server ...')
channel = connection.channel()
print(' [*] Waiting for messages.')
def callback(ch, method, properties, body):
print(" [x] Received %s" % body)
print(" [x] Executing task ")
print("from worker, correlation id : " + str(properties.correlation_id))
ch.basic_publish(
exchange='kaldi_expe',
routing_key='kaldi_expe.kaldi.answer',
properties=pika.BasicProperties(correlation_id = properties.correlation_id),
body="response")
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='request', on_message_callback=callback)
channel.start_consuming()
可悲的是,当我发回一条消息(从工作人员到服务器)时,似乎服务器确实使用了消息,但从不执行回调(它显示消息已使用,但兔子 mq 接口上没有 ACK .另外,打印不显示)。
我很迷茫,因为消息似乎已被使用,但回调似乎没有被执行。你知道它可能来自哪里吗?
解决方案
您确实将回调方法附加on_response
到队列answer
,但您从未告诉您的服务器开始使用队列。
看起来您self.channel.start_consuming()
在类初始化结束时丢失了。
推荐阅读
- java - 我的天蓝色管道没有运行。如何在 azure 管道中删除代理池
- blazor-server-side - 将 blazor 参数传递到另一个页面
- nestjs - 如何使用 NestJS KafkaClient 订阅 Kafka 主题?
- r - 将常量列表添加到不同列中的所有行
- python-3.x - 无法定位元素(按钮)
- javascript - 如何使用 javascript 使我的 topnav 具有粘性?
- docker - Google OAuth2 重定向循环 - 防火墙规则?
- node.js - 在全球运行 NPM 和 Yarn 的任何冲突
- c# - 限制 Linq GroupBy 返回的项目数
- python - 是否有 python 或 pandas 函数可以将列表分成大小相等或尽可能对称的组?