python - 使用 Python KafkaConsumer 访问 Kafka 元数据
问题描述
我有一个简单的 Kafka 阅读器类。我真的不记得我在哪里得到这个代码。可能已经找到它,或者我以前的自己可能已经从各种示例中创建了它。无论哪种方式,它都可以让我快速阅读 kafka 主题。
class KafkaStreamReader():
def __init__(self, schema_name, topic, server_list):
self.schema = get_schema(schema_name)
self.topic = topic
self.server_list = server_list
self.consumer = KafkaConsumer(topic, bootstrap_servers=server_list,
auto_offset_reset = 'latest',
security_protocol="PLAINTEXT")
def decode(self, msg, schema):
parsed_schema = avro.schema.parse(schema)
bytes_reader = io.BytesIO(msg)
decoder = avro.io.BinaryDecoder(bytes_reader)
reader = avro.io.DatumReader(parsed_schema)
record = reader.read(decoder)
return record
def fetch_msg(self):
event = next(self.consumer).value
record = self.decode(event, self.schema)
return record
为了使用它,我实例化一个对象并循环读取数据,如下所示:
consumer = KafkaStreamReader(schema, topic, server_list)
while True:
message = consumer.fetch_msg()
print message
我确信有更好的解决方案,但这对我有用。
我想从中得到的是 Kafka 记录上的元数据。另一组的同事使用 Java 或 Node,并且能够在记录中看到以下信息。
{
topic: 'clickstream-v2.origin.test',
value:
{
schema:payload_data/jsonschema/1-0-3',
data: [ [Object] ] },
offset: 16,
partition: 0,
highWaterOffset: 17,
key: null,
timestamp: 2018-07-25T17:01:36.959Z
}
}
我想使用 Python KafkaConsumer 访问时间戳字段。
解决方案
我有一个解决方案。如果我更改 fetch_msg 方法,我可以弄清楚如何访问它。
def fetch_msg(self):
event = next(self.consumer)
timestamp = event.timestamp
record = self.decode(event.value, self.schema)
return record, timestamp
不是最优雅的解决方案,因为我个人不喜欢返回多个值的方法。但是,它说明了如何访问我所追求的事件数据。我可以研究更优雅的解决方案
推荐阅读
- sql - 使用 SQL Server 中另一列中的行值计数更新列
- c# - Entity Framework DB First:如何使字段可选
- javascript - 解析 json 表单数据以提交给 API
- python - HackerRank Python print STDOUT 似乎不起作用
- asp.net-core-mvc - 无法在 MVC.NET Core 应用程序的单选按钮字段上看到必需消息
- docker - 无法让流量进入 docker 容器
- c++ - C++:避免构造函数中成员的 nullptr 初始化
- javascript - Javascript:从 URL 解析参数,然后在 URL 中删除它们
- visual-studio-2017 - Visual Studio webtest 找不到自己的程序集
- npm - 无法通过其他设备上的IP地址访问Vue项目vue cli 3 npm