python - 通过 KAFKA 发送带有多个 JSON 对象的 JSON 文件
问题描述
我有一个文件,其中包含以下格式的多个 json 文档。
{"attribute1": "value1", "attribute2": "value2", "attribute3": "value3", "attribute4": "value4"}
{"attribute1": "value11", "attribute2": "value12", "attribute3": "value13", "attribute4": "value14"}
{"attribute1": "value21", "attribute22": "value2", "attribute23": "value3", "attribute4": "value24"}
我正在尝试将单个 json 文档发送到 kafka。该脚本以退出代码 0 执行,但我看不到 KAFKA 消费者上没有任何消息。我不确定我哪里出错了。
我的代码如下:
import csv
import json
bootstrap = ['hostname:9092']
valueSerializer = lambda x: dumps(x).encode('utf-8')
producer = KafkaProducer(bootstrap_servers = bootstrap, value_serializer = valueSerializer)
table = []
with open('~/json_file_name.json', 'r') as json_file:
for line in json_file:
table.append(json.loads(line))
#numrows = len(table)
#print(numrows)
for row in table:
print(row)
producer.send('Topic_Name', value=row)
解决方案
您可能没有为生产者发送足够的数据来刷新其批次。您尚未显示 KafkaProducer 的导入,但请查看是否可以producer.flush()
在脚本末尾执行
顺便说一句,您不需要表变量,只需在读取文件行时发送即可。您也不需要,dumps(x)
因为您正在发送json.loads
已经获得的字符串
您还可以删除 csv 导入
推荐阅读
- oracle - 是否可以使用 Oracle APEX 自动化启动 SMS?
- cgal - corefine_and_compute_difference CGAL 错误:违反先决条件
- python - UnknownError:OSError:图像文件被截断(未处理 30 个字节):
- c++ - CMake,Qt6 - 未安装模块“QtQuick.Controls”
- tensorflow - tf.keras.layers.CategoryEncoding output_mode='multi_hot' 行为的解释
- three.js - 直接更新 Shape 几何体的位置属性会破坏 Mesh 上的纹理。如何修复纹理?
- gcc - 使用“riscv32/64-unknown-elf-gcc”时如何设置数据内存地址?
- reactjs - 如何在 React 中返回两个 JSX 元素以在不同的地方呈现?
- android - 如果我添加firebase并导入它,将输出错误
- android - 错误:无法访问的语句 myBluetoothAdapter = BluetoothAdapter.getDefaultAdapter();