首页 > 解决方案 > 如何在 Python 中使用 confluent-kafka 发送和使用 json 消息

问题描述

我对 Python 还很陌生,并且开始使用 Kafka。因此,我设置了一个 Kafka 代理,并尝试使用confluent-kafka与它进行通信。我已经能够使用它生成和使用简单的消息,但是,我有一些 django 对象,我需要对其进行序列化并将其发送到 ti kafka。

以前我使用的是kafka-python,我可以在其上发送和使用 json 消息,但是我遇到了一些奇怪的问题。

#生产者.py

def send_message(topic,message) :
try :
    try :
        p.produce(topic,message,callback=delivery_callback)
    except BufferError as b :
        sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' %len(p))
    # Serve delivery callback queue.
    # NOTE: Since produce() is an asynchronous API this poll() call
    #       will most likely not serve the delivery callback for the
    #       last produce()d message.
    p.poll(0)
    # Wait until all messages have been delivered
    sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
    p.flush()
except Exception as e :
    import traceback
    print(traceback.format_exc())

#消费者.py

conf = {'bootstrap.servers': "localhost:9092", 'group.id': 'test', 'session.timeout.ms': 6000,
        'auto.offset.reset': 'earliest'}
c = Consumer(conf)
c.subscribe(["mykafka"])
try:
    while True:
        msg = c.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())
        else:
            sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
                                (msg.topic(), msg.partition(), msg.offset(),
                                str(msg.key())))
            print(msg.value())
except Exception as e:
    import traceback
    print(traceback.format_exc())
finally:
    c.close()

我像这样序列化我的 django 模型对象:

from django.core import serializers
# assuming obj is a model instance
serialized_obj = serializers.serialize('json', [ obj, ])

那么我需要在我的生产者和消费者中进行哪些更改来生成和使用 json 消息?

标签: djangopython-3.xapache-kafkaconfluent-platform

解决方案


试试制作人

send_message(topic, serialized_obj)

而消费者,你会将字节反序列化为一个字符串

print(msg.value().decode('utf8'))

如果你需要 json 对象,那么你可以使用json.loads


推荐阅读