django - 如何在 Python 中使用 confluent-kafka 发送和使用 json 消息
问题描述
我对 Python 还很陌生,并且开始使用 Kafka。因此,我设置了一个 Kafka 代理,并尝试使用confluent-kafka与它进行通信。我已经能够使用它生成和使用简单的消息,但是,我有一些 django 对象,我需要对其进行序列化并将其发送到 ti kafka。
以前我使用的是kafka-python,我可以在其上发送和使用 json 消息,但是我遇到了一些奇怪的问题。
#生产者.py
def send_message(topic,message) :
try :
try :
p.produce(topic,message,callback=delivery_callback)
except BufferError as b :
sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' %len(p))
# Serve delivery callback queue.
# NOTE: Since produce() is an asynchronous API this poll() call
# will most likely not serve the delivery callback for the
# last produce()d message.
p.poll(0)
# Wait until all messages have been delivered
sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
p.flush()
except Exception as e :
import traceback
print(traceback.format_exc())
#消费者.py
conf = {'bootstrap.servers': "localhost:9092", 'group.id': 'test', 'session.timeout.ms': 6000,
'auto.offset.reset': 'earliest'}
c = Consumer(conf)
c.subscribe(["mykafka"])
try:
while True:
msg = c.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
raise KafkaException(msg.error())
else:
sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
(msg.topic(), msg.partition(), msg.offset(),
str(msg.key())))
print(msg.value())
except Exception as e:
import traceback
print(traceback.format_exc())
finally:
c.close()
我像这样序列化我的 django 模型对象:
from django.core import serializers
# assuming obj is a model instance
serialized_obj = serializers.serialize('json', [ obj, ])
那么我需要在我的生产者和消费者中进行哪些更改来生成和使用 json 消息?
解决方案
试试制作人
send_message(topic, serialized_obj)
而消费者,你会将字节反序列化为一个字符串
print(msg.value().decode('utf8'))
如果你需要 json 对象,那么你可以使用json.loads
推荐阅读
- python - 整数掩码数组的numpy方法
- r - 错误:loadNamespace(j <- i[[1L]], c(lib.loc, .libPaths()), versionCheck = vI[[j]]) 中的 'tidyverse' 的包或命名空间加载失败:
- android - 滚动时列表的图标状态恢复
- vue.js - 使用 vue js 编辑数据库中的文本字符串
- mern - 你能解决我的问题链接不起作用但是当我从控制台复制并粘贴链接时它可以工作吗?
- xamarin.android - 当我们在 Xamarin Android 上尝试 ZoomIn 和 ZoomOut 时,只有 Android 版本 7 和 8 会出现几种颜色阴影
- css - 在可滚动部分中定位粘性和间隙
- c++ - VSCode 扩展运行代码无法在终端中运行 C++
- javascript - 如何避免 JSON.stringify 和 JSON.parse 替换字符串?
- aws-cdk - AWS CDK:无法在 cxapi 中找到 REMOVE_DEFAULT_DESIRED_COUNT