首页 > 解决方案 > 在一个 http 请求中发布到生产者和来自消费者的请求

问题描述

我有一些需要由外部库处理的数据,我通过 Flask 中的 POST 请求将数据发送给 Kafka 生产者,处理完外部库后将数据发送给消费者。

在 app.py 文件中,我有这样的内容:

from flask import request, jsonify
from app.common import kafka_buffer


@app.route('/process', methods=['POST'])
def process_data():
    data = request.get_json()
    response = kafka_buffer(data)
    return jsonify([r for r in response])

在 common.py 文件中,我有这样的内容:

from kafka import KafkaProducer, KafkaConsumer


producer = KafkaProducer(bootstrap_servers='localhost:9092')
consumer = KafkaConsumer('some_consumer', bootstrap_servers='localhost:9092')


kafka_buffer(text):
    producer.send('some_producer', bytes(text, 'utf-8'))
    for message in consumer:
        yield message.value

当我尝试从消费者那里获取数据作为响应时,我得到一个超时错误,并且永远不会得到处理过的数据,它只是挂在那里。

我想要的是一次性将处理后的数据作为 HTTP 响应发送回来,我的意思是在同一个请求中。

有任何想法吗?

标签: flaskapache-kafka

解决方案


推荐阅读