apache-kafka - 查看和操作kafka存储数据的快速方法?#isoBlue #isoBus
问题描述
以最快和最简单的方式,我希望 Kafka 消息日志中的二进制数据可以作为十六进制字符串值查看。
我有以下数据,其中包含二进制形式的 CAN 消息,我想将其视为十六进制字符串
case-8010-wheat-ault-072018/ ├── 清洁偏移检查点 ├── 调试-0 │ ├── 00000000000000000000.index │ ├── 00000000000000000000.log │ ├── 00000000000000000000.timeindex │ ├── 00000000000000006972.index │ ├── 00000000000000006972.log │ ├── 00000000000000006972.snapshot │ ├── 00000000000000006972.timeindex │ ├── 00000000000000079766.snapshot │ └── leader-epoch-checkpoint ├── gps-0 │ ├── 00000000000000000000.index │ ├── 00000000000000000000.log │ ├── 00000000000000000000.timeindex │ ├── 00000000000000003235.index │ ├── 00000000000000003235.log │ ├── 00000000000000003235.snapshot │ ├── 00000000000000003235.timeindex │ ├── 00000000000000029657.snapshot │ └── leader-epoch-checkpoint ├── imp-0 │ ├── 00000000000000000000.index │ ├── 00000000000000000000.log │ ├── 00000000000000000000.timeindex │ ├── 00000000000000004940.index │ ├── 00000000000000004940.log │ ├── 00000000000000004940.snapshot │ ├── 00000000000000004940.timeindex │ ├── 00000000000000915321.snapshot │ └── leader-epoch-checkpoint
数据源提供了使用数据的文档,但可能由于版本不匹配,使用 kafka_2.11-0.11.0.1.tgz 步骤失败。https://www.isoblue.org/docs/data/data/
尝试直接查看数据,我有:
1.玩过kafka.tools.DumpLogSegments
./kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files ../../case-8010-wheat-ault-072018/imp-0/00000000000000004940.log | head -n 15
我试图操纵 DumpLogSegments 输出,但数据看起来不像预期的那样。./kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files ../../case-8010-wheat-ault-072018/imp-0/00000000000000004940.log | awk -F'payload:' '{print $2}' | awk -F'offset:' '{print $1}' | od -A n -t x1 | head -n 10
2.我目前正在尝试基于:https ://docs.confluent.io/current/connect/devguide.html#connector-example编写一个文件源连接器
解决方案
我无法让 KafkaConsumer 与 Python2 Kafka 一起工作。isoBlue 脚本对我不起作用。
1.编辑$KAFKA_HOME/config/server.properties
log.dirs=<KAFKA_DATA>
2.zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
3. kafka-topics.sh --list --zookeeper localhost:2181
4. kafka-server-start.sh $KAFKA_HOME/config/server.properties
5.使用Python3:
#!/usr/bin/env python3
from kafka import KafkaConsumer
import sys
try:
bootstrap_servers = ['localhost:9092']
topicName = 'tra'
consumer = KafkaConsumer (topicName, group_id = 'can-test',bootstrap_servers = bootstrap_servers,auto_offset_reset='earliest');
for message in consumer:
wait = input("PRESS ENTER TO CONTINUE.")
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
except KeyboardInterrupt:
sys.exit()
6.观察服务器是否正在运行以及数据是否仍然存在,这对我很有帮助。
watch -n 5 ls -alF <KAFKA_DATA> | head
watch -n 5 netstat -a | grep 9092
推荐阅读
- c - 使用 O_CREAT 时 open 的执行
- python - 有没有办法使用 python 脚本在 Visual Studio 中调试 C++ 应用程序的内存?
- mrtk - 如何配置 MRTK 以在编辑器和移动设备上使用触摸输入?
- python - 如何从 python 中的 html 源代码中提取以下链接?
- python - Numba:将 numpy 数组转换为可散列对象
- c++ - 将 std::list 附加到 std::list 的向量中
- lambda - 如何使用部署到 Netlify 的无服务器 lambda 函数隐藏条带密钥
- slurm - 如何在 SLURM 的分区中按节点组分配作业
- javascript - 使用 deleteRow 按 id 删除一行
- java - 使用 JNI 调用 C++ 库函数以及哪个进程执行该 C++ 库