apache-kafka - 如何在 kafka 流中滚动主题的日志段时克服错误?
问题描述
我正在使用 faust kafka 流式 python 包,每 5 秒间隔使用一次数据,而在某个时间点,它会抛出一个错误,指出"Error while rolling log for topic"
在此之后主题的日志被自动删除并且无法使用之后的消息这点?
如何在 faust kafka 流式 python 包中避免这种情况?
[2020-02-07 20:03:27,692] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets
in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-02-07 20:12:51,562] ERROR Error while rolling log segment for ble_rtls-1 in dir /var/lib/kafka/data (kafka.server.LogDirFailureChannel)
java.io.FileNotFoundException: /var/lib/kafka/data/ble_rtls-1/00000000000022839498.index (No such file or directory)
at java.io.RandomAccessFile.open0(Native Method)
at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
at kafka.log.AbstractIndex$$anonfun$resize$1.apply$mcZ$sp(AbstractIndex.scala:121)
at kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:115)
at kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:115)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
at kafka.log.AbstractIndex.resize(AbstractIndex.scala:115)
at kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply$mcZ$sp(AbstractIndex.scala:184)
at kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:184)
at kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:184)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
以下是我的日志保留政策,
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
delete.topic.enable = false
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=1
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
解决方案
推荐阅读
- c# - 如何在列表中获取放置项目 - WPF
- javascript - Angular 2:从服务返回数据时未定义
- c# - C#中委托()方法的确切含义是什么?
- javascript - 为“Cesium PolylineCollection”添加“名称”和“描述”框
- javascript - 如何创建动态正则表达式
- markov-models - TML(Tractable Markov Logic)是一个很棒的模型!为什么我没有看到它被用于人工智能的广泛应用场景?
- java - java sample web service started with "java.lang.NullPointerException", how to fix?
- hyperledger-fabric - 读取 PEM 文件的 hyperledger explorer 错误问题 :: Error: EISDIR
- android - 工具使用方法:openDrawer = "bottom"
- c - C 语言 K60 系列 GPIO 键盘