python-3.x - 将 json 对象作为键控消息传递给 apache kafka
问题描述
我有一个 json 对象,它具有不同的订单状态。我想把它推到 apache kafka。顺序至关重要,因为我设置了消费数据的消费者。我计划使用订单 ID 作为消息的键,因为它将按顺序存储在同一个分区中。
data = [{"id": 1, "orderId": "A123","status":"PLACED"},{"id": 2, "orderId": "B123","status":"PLACED"}
{"id": 3, "orderId": "A123","status":"DISPATCHED"}]
from kafka import KafkaProducer
import json
producer = KafkaProducer(bootstrap_servers=os.environ['KAFKA_BROKER_URLS'],security_protocol="SSL")
for item in data:
d = json.dumps(item)
future = producer.send(topic,key=item['orderId'], value=d)
但是我在生产者中收到了 assertionError 。我什至试过
d = json.dumps(item).encode('utf-8')
future = producer.send(topic,key=item['orderId'], value=d)
也可以通过在初始化代码中定义值序列化器,例如
producer = KafkaProducer(bootstrap_servers=os.environ['KAFKA_BROKER_URLS'],security_protocol="SSL",value_serializer=lambda v: json.dumps(v).encode('utf-8'))
他们都给出了同样的错误。如果我删除密钥并传递数据
d = json.dumps(item).encode('utf-8')
future = producer.send(topic,d)
这是有效的,消息被推送到 kafka 并显示在消费者端。但是我想传递密钥,以保持相同订单ID的顺序。我该如何解决?
更新 1:
这是错误:
{
"errorType": "AssertionError",
"stackTrace": [
" File \"/var/task/producer.py\", line 56, future = producer.send(topic,key=item['orderId'], value=d)\n",
" File \"/opt/python/lib/python3.7/site-packages/kafka/producer/kafka.py\", line 572, in send\n assert type(key_bytes) in (bytes, bytearray, memoryview, type(None))\n"
]
}
解决方案
推荐阅读
- java - NetBeans 不工作,Linux 上的新项目
- unity3d - unity:如何在没有 VIVE 设备的情况下为 HTC VIVE Pro 开发
- javascript - 使用用户选择更新多个下拉列表
- java - 使用 PDF2Dom 从 PDF 中提取字体大小和粗体
- mysql - 尝试从 Mysql 导出到 CSV 时出错
- python - 如何将 QLabel 值检索为整数
- python - CodecRegistryError:在 PyCharm 中运行 Nosetests 时,模块“encodings.ascii”中的编解码器不兼容
- javascript - 按 ID 删除一行中的特定 DIV
- python - 将两个数据框与python合并时的MemoryError
- javascript - 在 JavaScript 中计算文档中单词的开始和结束位置