Kafka to Pandas dataframe without Spark

Problem description

I am reading streaming data from a Kafka topic, and I want to store parts of it in a pandas dataframe.

from confluent_kafka import Consumer, KafkaError

c = Consumer({
    'bootstrap.servers': "###",
    'group.id': '###',
    'default.topic.config': {
        'auto.offset.reset': 'latest'
    }
})

c.subscribe(['scorestore'])

while True:
    msg = c.poll(1.0)

    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        else:
            print(msg.error())
            break

    print('Received message: {}'.format(msg.value().decode('utf-8')))

c.close()

The received message is a JSON document:

{
  "messageHeader" : {
    "messageId" : "4b604b33-7256-47b6-89d6-eb1d92a282e6",
    "timestamp" : 152520000,
    "sourceHost" : "test",
    "sourceLocation" : "test",
    "tags" : [ ],
    "version" : "1.0"
  },
  "id_value" : {
    "id" : "1234",
    "value" : "333.0"
  }
}

I am trying to create a dataframe with timestamp, id and value columns, e.g.

   timestamp    id  value
0  152520000  1234  333.0

Is there a way to do this without parsing the JSON message and appending the values I need to the dataframe row by row?

Tags: python, json, pandas, apache-kafka

Solution


The solution I propose may be a bit tricky. Imagine that your JSON message is in a string called "msg_str":

import pandas as pd

msg_str = '{  "messageHeader" : { "messageId" : "4b604b33-7256-47b6-89d6-eb1d92a282e6",    "timestamp" : 152520000,    "sourceHost" : "test",    "sourceLocation" : "test",    "tags" : [ ],    "version" : "1.0"  },  "id_value" : {    "id" : "1234",    "value" : "333.0"  }}'


# First create a dataframe with read_json
p = pd.read_json(msg_str)
# Now you have a dataframe with two columns. Where one column has a value,
# the other has a NaN. Create a new column keeping only the non-NaN values
p['fusion'] = p['id_value'].fillna(p['messageHeader'])
# Drop columns 'id_value' and 'messageHeader' as you don't need them anymore
p = p[['fusion']].reset_index()
# Create a temporary column only to serve as the index for the pivot
p['tmp'] = 0
# Do the pivot to convert rows into columns
p = p.pivot(index='tmp', values='fusion', columns='index')
# Finally keep only the columns that you are interested in
p = p.reset_index()[['timestamp', 'id', 'value']]

print(p)

Result:

index  timestamp    id value
0      152520000  1234   333

Then you can append this dataframe to the dataframe in which you are accumulating your results.
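For example, here is a minimal sketch of that accumulation step, with the steps above wrapped in a hypothetical helper called message_to_frame and msg_str reused from the snippet above; inside the consumer loop the argument would be msg.value().decode('utf-8') instead:

import pandas as pd

# Hypothetical helper: wraps the read_json / fillna / pivot steps above and
# returns a one-row dataframe with the columns timestamp, id and value
def message_to_frame(msg_str):
    p = pd.read_json(msg_str)
    p['fusion'] = p['id_value'].fillna(p['messageHeader'])
    p = p[['fusion']].reset_index()
    p['tmp'] = 0
    p = p.pivot(index='tmp', values='fusion', columns='index')
    return p.reset_index()[['timestamp', 'id', 'value']]

# Collect the one-row dataframes in a list and concatenate once at the end;
# this avoids copying the whole accumulated dataframe on every message
frames = []
frames.append(message_to_frame(msg_str))
scores = pd.concat(frames, ignore_index=True)
print(scores)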

Maybe there is a simpler solution, but if not, I hope this helps you.
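One possibly simpler route, assuming a reasonably recent pandas (pandas.json_normalize is available from pandas 1.0; in older versions it lives under pandas.io.json), is to parse the message with json.loads and flatten the nested dict in a single call:

import json
import pandas as pd

record = json.loads(msg_str)        # msg_str as defined above
flat = pd.json_normalize(record)    # nested keys become dotted column names
flat = flat[['messageHeader.timestamp', 'id_value.id', 'id_value.value']]
flat.columns = ['timestamp', 'id', 'value']
print(flat)

This skips the fillna/pivot step entirely; note that the value column stays a string ("333.0"), so convert it with astype(float) if you need a number.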

