首页 > 解决方案 > 查看和操作kafka存储数据的快速方法?#isoBlue #isoBus

问题描述

以最快和最简单的方式,我希望 Kafka 消息日志中的二进制数据可以作为十六进制字符串值查看。

我有以下数据,其中包含二进制形式的 CAN 消息,我想将其视为十六进制字符串

case-8010-wheat-ault-072018/
├── 清洁偏移检查点
├── 调试-0
│ ├── 00000000000000000000.index
│ ├── 00000000000000000000.log
│ ├── 00000000000000000000.timeindex
│ ├── 00000000000000006972.index
│ ├── 00000000000000006972.log
│ ├── 00000000000000006972.snapshot
│ ├── 00000000000000006972.timeindex
│ ├── 00000000000000079766.snapshot
│ └── leader-epoch-checkpoint
├── gps-0
│ ├── 00000000000000000000.index
│ ├── 00000000000000000000.log
│ ├── 00000000000000000000.timeindex
│ ├── 00000000000000003235.index
│ ├── 00000000000000003235.log
│ ├── 00000000000000003235.snapshot
│ ├── 00000000000000003235.timeindex
│ ├── 00000000000000029657.snapshot
│ └── leader-epoch-checkpoint
├── imp-0
│ ├── 00000000000000000000.index
│ ├── 00000000000000000000.log
│ ├── 00000000000000000000.timeindex
│ ├── 00000000000000004940.index
│ ├── 00000000000000004940.log
│ ├── 00000000000000004940.snapshot
│ ├── 00000000000000004940.timeindex
│ ├── 00000000000000915321.snapshot
│ └── leader-epoch-checkpoint

数据源提供了使用数据的文档,但可能由于版本不匹配,使用 kafka_2.11-0.11.0.1.tgz 步骤失败。https://www.isoblue.org/docs/data/data/

尝试直接查看数据,我有:

1.玩过kafka.tools.DumpLogSegments

./kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --print-data-log  --files ../../case-8010-wheat-ault-072018/imp-0/00000000000000004940.log | head -n 15

我试图操纵 DumpLogSegments 输出,但数据看起来不像预期的那样。./kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files ../../case-8010-wheat-ault-072018/imp-0/00000000000000004940.log | awk -F'payload:' '{print $2}' | awk -F'offset:' '{print $1}' | od -A n -t x1 | head -n 10

2.我目前正在尝试基于:https ://docs.confluent.io/current/connect/devguide.html#connector-example编写一个文件源连接器

标签: apache-kafka

解决方案


我无法让 KafkaConsumer 与 Python2 Kafka 一起工作。isoBlue 脚本对我不起作用。

1.编辑$KAFKA_HOME/config/server.properties log.dirs=<KAFKA_DATA>

2.zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties

3. kafka-topics.sh --list --zookeeper localhost:2181

4. kafka-server-start.sh $KAFKA_HOME/config/server.properties

5.使用Python3:

#!/usr/bin/env python3

from kafka import KafkaConsumer
import sys

try:

    bootstrap_servers = ['localhost:9092']
    topicName = 'tra'
    consumer = KafkaConsumer (topicName, group_id = 'can-test',bootstrap_servers = bootstrap_servers,auto_offset_reset='earliest');

    for message in consumer:
        wait = input("PRESS ENTER TO CONTINUE.")    
        print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
except KeyboardInterrupt:
    sys.exit()

6.观察服务器是否正在运行以及数据是否仍然存在,这对我很有帮助。

watch -n 5 ls -alF <KAFKA_DATA> | head
watch -n 5 netstat -a | grep 9092

推荐阅读