apache-kafka - Kafka 连接器 HDFS Sink 5.3.1 无法生成所有 JSON 记录
问题描述
用例
我正在阅读一个已经创建的 Kafka 主题,其中一个单独的集群正在生成一些键和值。我的最终目标是以 JSON 格式写入 HDFS,为此我已经用 Kafka HDFS Sink 5.3 试验了一段时间。我面临的问题是我无法将主题中的所有记录摄取并写入 HDFS。到目前为止,如果我的主题包含数百万条记录的每小时数据,我只能以 10 万条记录的形式编写。
以下是我用于kafka-connect-standalone.properties和我的 HDFS quickstart-hdfs.properties的配置
kafka-connect-standalone.properties
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
schema.enable=false
offset.flush.interval.ms=10000
group.id=x-hdfs-consumer-group
consumer.session.timeout.ms=10000
consumer.heartbeat.interval.ms=3000
consumer.request.timeout.ms=1810000
consumer.max.poll.interval.ms=1800000
快速入门-hdfs.properties
name=hdfs-sink-mapr
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=10
topics=topic_name
hdfs.url=maprfs:///hive/x/poc_kafka_connect/
flush.size=20000
errors.tolerance=all
format.class=io.confluent.connect.hdfs.json.JsonFormat
partitioner.class=io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner
timestamp.extractor=RecordField
timestamp.field=timestamp
partition.duration.ms=3600000
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
locale=en
timezone=UTC
如果我不使用 errors.tolerance=all属性,那么我只会产生约 500 条记录。
就工作人员日志而言,我没有收到任何错误,所以我不确定我错过了什么。
由于我对 Kafka 连接器相对较新并且已经尝试了一段时间,如果有人能提供一些关于我做错了什么的见解,我将非常感激。
跟进问题
kafka 连接器也在 2 天内死亡。那就是它可以正常工作近 2 天,但过了一段时间它停止读取数据并且没有产生任何东西。我在独立模式下运行它,这可能是原因吗?我试着描述了消费者群体,似乎所有的消费者都死了。
kafka/kafka_2.12-2.3.0/bin/kafka-consumer-groups.sh --bootstrap-server <server>:9092 --describe --group connect-ajay-hdfs-sink-mapr
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
connect-ajay-hdfs-sink-mapr topic_name 21 1186755480 1187487551 732071 - - -
connect-ajay-hdfs-sink-mapr topic_name 12 957021804 957736810 715006 - - -
connect-ajay-hdfs-sink-mapr topic_name 17 957031965 957746941 714976 - - -
connect-ajay-hdfs-sink-mapr topic_name 24 957496491 958212413 715922 - - -
connect-ajay-hdfs-sink-mapr topic_name 0 956991807 957716202 724395 - - -
connect-ajay-hdfs-sink-mapr topic_name 28 956940273 957668689 728416 - - -
connect-ajay-hdfs-sink-mapr topic_name 5 957182822 957899308 716486 - - -
connect-ajay-hdfs-sink-mapr topic_name 3 956974180 957695189 721009 - - -
connect-ajay-hdfs-sink-mapr topic_name 19 956878365 957590196 711831 - - -
connect-ajay-hdfs-sink-mapr topic_name 2 956968023 957685835 717812 - - -
connect-ajay-hdfs-sink-mapr topic_name 16 957010175 957726139 715964 - - -
connect-ajay-hdfs-sink-mapr topic_name 7 956900190 957624746 724556 - - -
connect-ajay-hdfs-sink-mapr topic_name 8 957020325 957739604 719279 - - -
connect-ajay-hdfs-sink-mapr topic_name 22 957064283 957788487 724204 - - -
connect-ajay-hdfs-sink-mapr topic_name 29 957026931 957744496 717565 - - -
connect-ajay-hdfs-sink-mapr topic_name 13 957400623 958129555 728932 - - -
connect-ajay-hdfs-sink-mapr topic_name 6 956892063 957618485 726422 - - -
connect-ajay-hdfs-sink-mapr topic_name 11 957117685 957841645 723960 - - -
connect-ajay-hdfs-sink-mapr topic_name 1 957003873 957734649 730776 - - -
connect-ajay-hdfs-sink-mapr topic_name 18 957007813 957734011 726198 - - -
connect-ajay-hdfs-sink-mapr topic_name 27 957047658 957766131 718473 - - -
connect-ajay-hdfs-sink-mapr topic_name 10 956975729 957689182 713453 - - -
connect-ajay-hdfs-sink-mapr topic_name 15 957046441 957775251 728810 - - -
connect-ajay-hdfs-sink-mapr topic_name 23 957011972 957727996 716024 - - -
connect-ajay-hdfs-sink-mapr topic_name 14 957151628 957881644 730016 - - -
connect-ajay-hdfs-sink-mapr topic_name 4 957118644 957845399 726755 - - -
connect-ajay-hdfs-sink-mapr topic_name 9 957109152 957838497 729345 - - -
connect-ajay-hdfs-sink-mapr topic_name 25 956923833 957646070 722237 - - -
connect-ajay-hdfs-sink-mapr topic_name 26 957026885 957742112 715227 - - -
connect-ajay-hdfs-sink-mapr topic_name 20 957010071 957733605 723534 - - -
解决方案
为了将主题中的所有现有记录获取到接收器连接器,请将其添加到工作器属性并重新启动连接
consumer.auto.offset.reset=earliest
如果您已经启动了连接器,则需要重置其使用者组,或更改配置中的名称以创建新组
推荐阅读
- java - file_paths.xml 无法从字符串资源中读取
- c# - 获取使用自定义属性标记的方法的 MethodInfo
- python - 来自两个文本列的熊猫 to_datetime
- java - IllegalStateException 和 NullPointer 异常
- javascript - 从 Firebase 存储下载文件
- reactjs - 如何将 Typescript 代码转换为 React
- ffmpeg - 生成输出视频后持续时间较长的 FFMPEG 图像序列
- .net - 如何获取构建应用程序的 Visual Studio 的版本号
- java - 多表 td 元素的 Xpath
- django - 如何与使用 DRF 制作的 REST api 进行交互