apache-kafka - Kafka Streams: Retries
Problem Description
Kafka version - 1.0.1
I am randomly getting the exception below. I tried increasing request.timeout.ms to 5 minutes, but it still times out again at random intervals (a few hours apart). It is not clear why the exception occurs. Restarting the application resumes from where it left off, but that requires manual intervention, so I tried enabling retries. That seems to have no effect: I see nothing in the logs that looks like a retry sequence (a failure, then a first attempt, another failure, then a second, up to the maximum retry count). Can you shed some light on this exception and suggest how to keep the Kafka Streams application running, or at least retry once, when it occurs? Also, if we need to raise request.timeout.ms toward its maximum value, what downsides should we be aware of?
props.put(ProducerConfig.RETRIES_CONFIG, 3);
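One possible reason the retry setting appears to have no effect: with Kafka Streams, producer-level settings are unambiguously routed to the internal producers when passed with `StreamsConfig.producerPrefix(...)`. Also, in these client versions a batch that expires in the record accumulator (the "Expiring N record(s) ... ms has passed since last append" error) is failed outright rather than retried, because `retries` only covers retriable broker responses. A minimal sketch of prefixed producer settings; the application id is taken from the logs, the bootstrap server is a placeholder, and the values are illustrative, not recommendations:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsRetryConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "housestream");       // from the logs
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        // Prefix producer settings so they are routed to the internal producers.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 3);
        props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), 1000);
        // In these client versions, request.timeout.ms also bounds how long a batch
        // may sit in the accumulator since its last append before being expired.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 60_000);
        return props;
    }
}
```

On the downside of a very large `request.timeout.ms`: it mainly delays failure detection, so a genuinely unreachable broker ties up buffer memory and stream threads for much longer before any error surfaces.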
2018-07-05 06:04:25 ERROR Housestream:91 - Unknown Exception occurred
org.apache.kafka.streams.errors.StreamsException: task [1_1] Abort sending since an error caught with a previous record (key GCB21K1X value [L@5e86f18a timestamp 1530783812110)
to topic housestream-digitstore-changelog due to org.apache.kafka.common.errors.TimeoutException: Expiring 201 record(s) for housestream-digitstore-changelog: 30144 ms has passed since last append.
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:287)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 201 record(s) for housestream-digitstore-changelog: 30144 ms has passed since last append
I tried increasing the request timeout to the maximum integer value, but ran into another timeout exception:
2018-07-05 12:22:15 ERROR Housestream:179 - Unknown Exception occurred
org.apache.kafka.streams.errors.StreamsException: task [1_0] Exception caught while punctuating processor 'validatequote'
at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:267)
at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:54)
at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuateSystemTime(StreamTask.java:619)
at org.apache.kafka.streams.processor.internals.AssignedTasks.punctuate(AssignedTasks.java:430)
at org.apache.kafka.streams.processor.internals.TaskManager.punctuate(TaskManager.java:324)
at org.apache.kafka.streams.processor.internals.StreamThread.punctuate(StreamThread.java:969)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:834)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_1] Abort sending since an error caught with a previous record (key 32342 value com.int.digital.QUOTE@2c73fa63 timestamp 153083237883) to topic digital_quote due to org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms..
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:819)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:760)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:100)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:78)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:113)
at org.cox.processor.CheckQuote.handleTasks(CheckQuote.java:122)
at org.cox.processor.CheckQuote$1.punctuate(CheckQuote.java:145)
at org.apache.kafka.streams.processor.internals.ProcessorNode$4.run(ProcessorNode.java:131)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:134)
at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:263)
... 8 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.
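This second failure ("Failed to allocate memory within the configured max blocking time 60000 ms") means the producer's `buffer.memory` pool stayed full for longer than `max.block.ms`, which typically happens when records are produced faster than the broker acknowledges them (for example, while waiting out a very large request timeout). The knobs usually tuned for this are sketched below; the values are examples, not recommendations:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class BufferTuning {
    public static void apply(Properties props) {
        // Give the producer more buffer headroom (default is 32 MB)...
        props.put(StreamsConfig.producerPrefix(ProducerConfig.BUFFER_MEMORY_CONFIG), 64L * 1024 * 1024);
        // ...and allow send() to block longer before failing (default is 60 s).
        props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 120_000);
    }
}
```

Note the trade-off: larger buffers and longer blocking only mask the problem if the broker is persistently slower than the producer; they help when the pressure is a temporary spike.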
Solution
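For the "keep the application running" part of the question: Kafka 1.1.0 added a pluggable `ProductionExceptionHandler` (KIP-210) that decides whether a failed send kills the stream thread or is skipped. It is not available in 1.0.1, so this sketch assumes an upgrade to 1.1.0 or later:

```java
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

// Logs and skips records whose send failed, instead of letting the
// StreamsException terminate the stream thread.
public class ContinueOnSendFailure implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record,
                                                     Exception exception) {
        System.err.println("Dropping record for topic " + record.topic() + ": " + exception);
        return ProductionExceptionHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(Map<String, ?> configs) { }
}
```

The handler is registered with `props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ContinueOnSendFailure.class);`. Be aware that returning `CONTINUE` trades crash-on-error for silent loss of the skipped records, which may or may not be acceptable for your topology.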