python - 没有 Spark 的 Kafka 到 Pandas 数据框
问题描述
我正在从 kafka 主题中读取流数据,并且我想将其中的某些部分存储在 pandas 数据框中。
from confluent_kafka import Consumer, KafkaError
c = Consumer({
'bootstrap.servers': "###",
'group.id': '###',
'default.topic.config': {
'auto.offset.reset': 'latest' }
})
c.subscribe(['scorestore'])
while True:
msg = c.poll(1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print(msg.error())
break
print('Received message: {}'.format(msg.value().decode('utf-8')))
c.close()
收到的消息是一个json
{
"messageHeader" : {
"messageId" : "4b604b33-7256-47b6-89d6-eb1d92a282e6",
"timestamp" : 152520000,
"sourceHost" : "test",
"sourceLocation" : "test",
"tags" : [ ],
"version" : "1.0"
},
"id_value" : {
"id" : "1234",
"value" : "333.0"
}
}
我正在尝试创建一个包含时间戳、id 和值列的数据框,例如
timestamp id value
0 152520000 1234 333.0
有没有办法在不解析 json 消息并将我需要的值逐行附加到数据帧的情况下完成此操作?
解决方案
我提出的解决方案可能有点棘手。想象一下,您的 JSON 消息位于一个名为“msg_str”的字符串中:
import pandas as pd
msg_str = '{ "messageHeader" : { "messageId" : "4b604b33-7256-47b6-89d6-eb1d92a282e6", "timestamp" : 152520000, "sourceHost" : "test", "sourceLocation" : "test", "tags" : [ ], "version" : "1.0" }, "id_value" : { "id" : "1234", "value" : "333.0" }}'
#first create a dataframe with read_json
p = pd.read_json(msg_str)
# Now you have a dataframe with two columns. Where a column has a value, the other
# has a NaN. Now create a new column only with the values which are not 'NaN'
p['fussion'] = p['id_value'].fillna(p['messageHeader'])
# Delete columns 'id_value' and 'messageHeader' as you don't need them anymore
p = p[['fussion']].reset_index()
# Create a temporal column only to be the index to do a pivot
p['tmp'] = 0
# Do the pivot to convert rows into columns
p = p.pivot(index = 'tmp' ,values='fussion', columns='index')
# Finally get the columns that you are interested in
p = p.reset_index()[['timestamp','id','value']]
print(p)
结果:
index timestamp id value
0 152520000 1234 333
然后,您可以将此数据框附加到您正在累积结果的数据框中。
也许有一个最简单的解决方案,但如果不是这样,我希望它对您有所帮助。
推荐阅读
- python - 在我定义 mixin 类的文件中,如何使用使用 mixin 的文件中的记录器?
- hadoop - 如何停止从 hdfs 读取 .tmp 文件的 spark 文件流式传输作业?
- powershell - 如果其他任务在 Azure Devops 中失败,如何运行任务
- pip - 在 mac os 10.15.6 中安装 pycld2 时遇到问题。它说 pycld2 的构建轮失败
- php - 为什么提交表单后它不重定向我?
- excel - 创建工作表并设置其标签颜色
- c# - 意外的相等性测试,Equals(a, a) 评估为假
- gremlin - 遍历 Graph 的子集以进行复杂查询
- mysql - Drush “要求” SSL 连接到数据库
- python - Python + Selenium:如何在 while 循环中找到新出现的元素?