首页 > 解决方案 > 如何以json格式将数据从rds推送到kafka队列

问题描述

我使用kafka主题从mysql数据库接收消息。我需要编写python代码将json格式的数据从mysql推送到kafka主题。我的要求是获取json格式的输出,而不是原始字符串。

以下是将mysql表数据以json格式转储到kafka主题的python代码。

代码:

connection = mysql.connector.connect(host='xyz.us-east-1.rds.amazonaws.com', database='testdb',user='stdnt', password='pssw123')

cursor=connection.cursor()

statement='SELECT * FROM patients_vital_info'

cursor.execute(statement)
data=cursor.fetchall()

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                                api_version=(0,11,5),value_serializer=lambda x:
                                                        json.dumps(x).encode('utf-8'))
for i in data:
    producer.send('test',i)
    sleep(1)

原始字符串格式的 kafka 主题输出:

[3, 69, 175]
[4, 68, 171]
[5, 72, 177]
[1, 78, 162]
[2, 66, 157]
[3, 72, 156]

在将消息写入 kafka 队列时,输出应以 json 格式推送。

预期输出:

{"bp":140,"heartBeat":73,"Customerid":1}

标签: mysqljsonapache-kafka

解决方案


cursor.fetchall()返回行迭代器,而不是具有列到值的键值对的字典。您的数据也是正确的 JSON 数组

如果您想包含列名或使用 Kafka Connect JDBC 源/Debezium 而不是 Python 来完全满足您的需求,您需要自己构建 JSON


推荐阅读