mysql - 如何以json格式将数据从rds推送到kafka队列
问题描述
我使用kafka主题从mysql数据库接收消息。我需要编写python代码将json格式的数据从mysql推送到kafka主题。我的要求是获取json格式的输出,而不是原始字符串。
以下是将mysql表数据以json格式转储到kafka主题的python代码。
代码:
connection = mysql.connector.connect(host='xyz.us-east-1.rds.amazonaws.com', database='testdb',user='stdnt', password='pssw123')
cursor=connection.cursor()
statement='SELECT * FROM patients_vital_info'
cursor.execute(statement)
data=cursor.fetchall()
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
api_version=(0,11,5),value_serializer=lambda x:
json.dumps(x).encode('utf-8'))
for i in data:
producer.send('test',i)
sleep(1)
原始字符串格式的 kafka 主题输出:
[3, 69, 175]
[4, 68, 171]
[5, 72, 177]
[1, 78, 162]
[2, 66, 157]
[3, 72, 156]
在将消息写入 kafka 队列时,输出应以 json 格式推送。
预期输出:
{"bp":140,"heartBeat":73,"Customerid":1}
解决方案
cursor.fetchall()
返回行迭代器,而不是具有列到值的键值对的字典。您的数据也是正确的 JSON 数组
如果您想包含列名或使用 Kafka Connect JDBC 源/Debezium 而不是 Python 来完全满足您的需求,您需要自己构建 JSON
推荐阅读
- cmake - CMake:如何将具有不同文件名的导入库链接到库名
- javascript - Javascript:如何打印包含回车的字符串的原始内容?
- python-3.x - Change precision of floats and export to csv
- python - 如何在 Google Colaboratory 中安装第三方模块
- python - 用户输入作为嵌套字典的查询
- node.js - 如何让 npm 在终端中运行?
- java - 多张图片上传OOM问题
- django - django 权限不被尊重
- azure - 使用分区键、行键和非索引的 Azure 表查询性能
- objective-c - Swift - Obj C 与闭包/块的互操作性