How to overcome an error while rolling a log segment for a topic in Kafka streams?

Problem Description

I am consuming data at 5-second intervals with the faust Kafka streaming Python package. At some point the broker throws an error saying "Error while rolling log for topic"; after that, the topic's log is deleted automatically and no further messages can be consumed.

How can I avoid this when using the faust Kafka streaming Python package?
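The post does not include the consumer code, but for context, here is a minimal sketch of the kind of faust setup described above; the app name, broker address, and serialization are assumptions rather than details from the post:

import faust

# App name and broker address are assumed for illustration; only the
# topic name ble_rtls is taken from the error log below.
app = faust.App("ble-rtls-consumer", broker="kafka://localhost:9092")
topic = app.topic("ble_rtls", value_type=bytes)

@app.agent(topic)
async def process(stream):
    # faust delivers messages as they arrive; the 5-second cadence the
    # post mentions would come from the producer side, not from faust.
    async for event in stream:
        print(event)

if __name__ == "__main__":
    app.main()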

[2020-02-07 20:03:27,692] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-02-07 20:12:51,562] ERROR Error while rolling log segment for ble_rtls-1 in dir /var/lib/kafka/data (kafka.server.LogDirFailureChannel)
java.io.FileNotFoundException: /var/lib/kafka/data/ble_rtls-1/00000000000022839498.index (No such file or directory)
at java.io.RandomAccessFile.open0(Native Method)
at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
at kafka.log.AbstractIndex$$anonfun$resize$1.apply$mcZ$sp(AbstractIndex.scala:121)
at kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:115)
at kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:115)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
at kafka.log.AbstractIndex.resize(AbstractIndex.scala:115)
at kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply$mcZ$sp(AbstractIndex.scala:184)
at kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:184)
at kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:184)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)

Below is my log retention policy:

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
delete.topic.enable = false

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=1

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################
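Since broker-level defaults like these can be overridden per topic, it is worth confirming the effective retention settings for ble_rtls itself. A sketch using confluent-kafka's admin client (a library not mentioned in the post; the broker address is assumed):

from confluent_kafka.admin import AdminClient, ConfigResource

admin = AdminClient({"bootstrap.servers": "localhost:9092"})  # assumed address
resource = ConfigResource(ConfigResource.Type.TOPIC, "ble_rtls")

# describe_configs returns a future per resource; the result maps each
# config name to a ConfigEntry carrying the effective value.
config = admin.describe_configs([resource])[resource].result()
for name in ("retention.ms", "segment.bytes", "file.delete.delay.ms"):
    print(name, "=", config[name].value)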

Tags: apache-kafka

Solution
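A hedged direction rather than a confirmed fix: the FileNotFoundException shows the segment's .index file was already gone when the broker tried to resize it while rolling, which often points to something deleting files under /var/lib/kafka/data outside of Kafka's control, or to the one-hour retention (log.retention.hours=1, checked every five minutes per log.retention.check.interval.ms=300000) racing with the roll. Checking for external cleanup jobs on the data directory and lengthening retention for this topic are reasonable first steps. A sketch that raises retention.ms for ble_rtls (confluent-kafka again; the 24-hour value is an illustrative assumption, not a recommendation from the original post):

from confluent_kafka.admin import AdminClient, ConfigResource

admin = AdminClient({"bootstrap.servers": "localhost:9092"})  # assumed address

# alter_configs replaces the topic's full set of overrides, so include
# every topic-level config you want to keep, not just retention.ms.
resource = ConfigResource(
    ConfigResource.Type.TOPIC,
    "ble_rtls",
    set_config={"retention.ms": "86400000"},  # 24 h, illustrative
)
admin.alter_configs([resource])[resource].result()  # raises on failure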

