首页 > 解决方案 > Python Pika - 与rabbitmq相关的错误

问题描述

我正在创建一个包含 API 和 Broker Rabbitmq 的包含应用程序。我的应用程序将在端点中回答。我的rabbitmq,在测试中,处理我的数据并正常返回数据。

所以,在我的 API 中,我创建了一个库并将我的生产者代码放在这个路径中。我创建一个类并调用它,但显示错误:

在此处输入图像描述

测试中的相同代码将正常运行。

这是我的生产代码:

import base64
import os
from decouple import config
import pika

class Producer:

    def __init__(self, corr_id, img):
        self.corr_id = id
        self.img = img
        
    def producer(self):
        BROKER = config('BROKER_URL')
        BROKER_PORT = config('BROKER_PORT')
        USER = config('USER')
        PASSWORD = config('PASSWORD')
        VHOST = config('VHOST')
        
        uri = f'amqp://{USER}:{PASSWORD}@{BROKER}:{BROKER_PORT}/{VHOST}'

        connection = pika.BlockingConnection(uri)
        channel = connection.channel()

        queue_declared = channel.queue_declare('', exclusive=True)
        callback_queue = queue_declared.method.queue

        channel.exchange_declare(exchange='image_hash', exchange_type='headers')

        def on_response(ch, method, props, body):
            if corr_id == props.correlation_id:
                print(body)
                response = str(body)

        channel.basic_consume(
            queue = callback_queue,
            on_message_callback = on_response, 
            auto_ack = True
        )

        corr_id = str(self.corr_id)
        published_message = channel.basic_publish(exchange='', 
                        routing_key='rpc_queue', 
                        properties=pika.BasicProperties(
                            correlation_id = corr_id,
                            delivery_mode=2,
                            reply_to=callback_queue
                        ),
                        body = self.img
                    )
        channel.start_consuming()
        return published_message

正常发送和接收信息的测试代码是:

import base64
from decouple import config
import pika
import uuid


BROKER = config('BROKER_URL')
BROKER_PORT = config('BROKER_PORT')
USER = config('USERRABBITMQ')
PASSWORD = config('PASSRABBITMQ')
VHOST = config('VHOST')

credentials = pika.PlainCredentials(USER, PASSWORD)
parameters = pika.ConnectionParameters(BROKER,
       BROKER_PORT,
       VHOST,
       credentials
    )

def test_consumer(data_img):

    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()

    queue_declared = channel.queue_declare('', exclusive=True)
    callback_queue = queue_declared.method.queue

    channel.exchange_declare(exchange='image_hash', exchange_type='headers')

    def on_response(ch, method, props, body):
        if corr_id == props.correlation_id:
            print(body)
            response = str(body)

    channel.basic_consume(
        queue = callback_queue,
        on_message_callback = on_response, 
        auto_ack = True
    )


    corr_id = str(uuid.uuid4())
    published_message = channel.basic_publish(exchange='', 
                    routing_key='rpc_queue', 
                    properties=pika.BasicProperties(
                        correlation_id = corr_id,
                        delivery_mode=2,
                        reply_to=callback_queue
                    ),
                    body = data_img
                )
    channel.start_consuming()
    return published_message



file = 'C:\\Users\\ER20259240\\workspace\\image_hash\\services\\consumers\\tests\\img\\front.jpg'
with open(file, 'rb') as img_file:
    data_img = base64.b64encode(img_file.read())
    response = test_consumer(data_img)

我不知道为什么在测试代码中没有问题,而在我的课堂上却发生了错误!

我的 api 仅使用路由 /image 并接收 um id 和 image。是用 fastapi 构建的。

我的路线:

@app.post('/images')
async def images(files: List[UploadFile] = File(...), id:str = Form(...)):
    # Aqui deve ser feita a chamada da classe para responder a requisição de hash.
    for file in files:
        corr_id = id
        name = file.filename

    producer = Producer(corr_id, file)
    hash = producer.producer()
    return hash

为什么会发生错误,我该如何解决?

标签: rabbitmqpython-3.8pika

解决方案


推荐阅读