rabbitmq - Python Pika - 与rabbitmq相关的错误
问题描述
我正在创建一个包含 API 和 Broker Rabbitmq 的包含应用程序。我的应用程序将在端点中回答。我的rabbitmq,在测试中,处理我的数据并正常返回数据。
所以,在我的 API 中,我创建了一个库并将我的生产者代码放在这个路径中。我创建一个类并调用它,但显示错误:
测试中的相同代码将正常运行。
这是我的生产代码:
import base64
import os
from decouple import config
import pika
class Producer:
def __init__(self, corr_id, img):
self.corr_id = id
self.img = img
def producer(self):
BROKER = config('BROKER_URL')
BROKER_PORT = config('BROKER_PORT')
USER = config('USER')
PASSWORD = config('PASSWORD')
VHOST = config('VHOST')
uri = f'amqp://{USER}:{PASSWORD}@{BROKER}:{BROKER_PORT}/{VHOST}'
connection = pika.BlockingConnection(uri)
channel = connection.channel()
queue_declared = channel.queue_declare('', exclusive=True)
callback_queue = queue_declared.method.queue
channel.exchange_declare(exchange='image_hash', exchange_type='headers')
def on_response(ch, method, props, body):
if corr_id == props.correlation_id:
print(body)
response = str(body)
channel.basic_consume(
queue = callback_queue,
on_message_callback = on_response,
auto_ack = True
)
corr_id = str(self.corr_id)
published_message = channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
correlation_id = corr_id,
delivery_mode=2,
reply_to=callback_queue
),
body = self.img
)
channel.start_consuming()
return published_message
正常发送和接收信息的测试代码是:
import base64
from decouple import config
import pika
import uuid
BROKER = config('BROKER_URL')
BROKER_PORT = config('BROKER_PORT')
USER = config('USERRABBITMQ')
PASSWORD = config('PASSRABBITMQ')
VHOST = config('VHOST')
credentials = pika.PlainCredentials(USER, PASSWORD)
parameters = pika.ConnectionParameters(BROKER,
BROKER_PORT,
VHOST,
credentials
)
def test_consumer(data_img):
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
queue_declared = channel.queue_declare('', exclusive=True)
callback_queue = queue_declared.method.queue
channel.exchange_declare(exchange='image_hash', exchange_type='headers')
def on_response(ch, method, props, body):
if corr_id == props.correlation_id:
print(body)
response = str(body)
channel.basic_consume(
queue = callback_queue,
on_message_callback = on_response,
auto_ack = True
)
corr_id = str(uuid.uuid4())
published_message = channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
correlation_id = corr_id,
delivery_mode=2,
reply_to=callback_queue
),
body = data_img
)
channel.start_consuming()
return published_message
file = 'C:\\Users\\ER20259240\\workspace\\image_hash\\services\\consumers\\tests\\img\\front.jpg'
with open(file, 'rb') as img_file:
data_img = base64.b64encode(img_file.read())
response = test_consumer(data_img)
我不知道为什么在测试代码中没有问题,而在我的课堂上却发生了错误!
我的 api 仅使用路由 /image 并接收 um id 和 image。是用 fastapi 构建的。
我的路线:
@app.post('/images')
async def images(files: List[UploadFile] = File(...), id:str = Form(...)):
# Aqui deve ser feita a chamada da classe para responder a requisição de hash.
for file in files:
corr_id = id
name = file.filename
producer = Producer(corr_id, file)
hash = producer.producer()
return hash
为什么会发生错误,我该如何解决?
解决方案
推荐阅读
- spring-boot - 在@SpringBootApplication 中测试配置文件时如何禁用自动配置?
- javascript - typescript:扩展多种类型的模板化接口数组
- objective-c - 目标 C:如何继续收听标准输入上的传入数据?
- xml - XSLT 如何组合模板并将现有字段添加到某些元素
- jupyter-notebook - Jupyter Notebook 中 markdown 单元和 md('text') en 代码单元的边距差异
- javascript - 我怎样才能将此代码“封装”到一个模块中,以便它可以重用?
- here-api - 为什么Router HERE API 向Isoline 路由器HERE API 返回不同的结果?
- python - 使用 Pyglet 将 gif 缩放到屏幕
- decimal - 如何处理系统中的数字路由
- amazon-web-services - 替代 nginx 和 AWS 中的服务发现