apache-kafka - Failed to flush state store (Spring Cloud Stream with Kafka Streams binder)
Problem description
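I am trying to create a simple streaming application that reads log messages in JSON format and counts the different types of log. The application fails with a NullPointerException, which seems to be caused by there being no viable MessageConverter.
Should I use a different content type or serde? Any help would be appreciated.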
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.0.2.RELEASE)
2018-06-06 14:03:26.876 INFO 1 --- [ main] c.c.m.e.e.l.LogEventConsumerApplication : Starting LogEventConsumerApplication v0.0.1-SNAPSHOT on logevent-stream-consumer-14-xx17g with PID 1 (/deployments/logevent-stream-consumer-0.0.1-SNAPSHOT.jar started by ? in /deployments)
2018-06-06 14:03:26.889 INFO 1 --- [ main] c.c.m.e.e.l.LogEventConsumerApplication : No active profile set, falling back to default profiles: default
2018-06-06 14:03:27.047 INFO 1 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@358ee631: startup date [Wed Jun 06 14:03:27 CST 2018]; root of context hierarchy
2018-06-06 14:03:28.457 INFO 1 --- [ main] o.s.i.config.IntegrationRegistrar : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2018-06-06 14:03:29.186 INFO 1 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2018-06-06 14:03:29.192 INFO 1 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2018-06-06 14:03:29.422 INFO 1 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.kafka.annotation.KafkaBootstrapConfiguration' of type [org.springframework.kafka.annotation.KafkaBootstrapConfiguration$$EnhancerBySpringCGLIB$$3a04a801] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2018-06-06 14:03:29.594 INFO 1 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration$$EnhancerBySpringCGLIB$$d14451ad] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2018-06-06 14:03:30.599 INFO 1 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService 'taskScheduler'
Exception in thread "default-group-92b2551e-6d84-47a1-9255-2ccc7be847b6-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [1_0] Failed to flush state store default-group-counts-store
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:245)
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:196)
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:327)
at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:307)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:302)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:292)
at org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:87)
at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:452)
at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:381)
at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:310)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1018)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:835)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: java.lang.NullPointerException
at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsMessageConversionDelegate.lambda$serializeOnOutbound$0(KafkaStreamsMessageConversionDelegate.java:86)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
at org.apache.kafka.streams.state.internals.CachingWindowStore.maybeForward(CachingWindowStore.java:115)
at org.apache.kafka.streams.state.internals.CachingWindowStore.access$300(CachingWindowStore.java:36)
at org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:99)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127)
at org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:132)
at org.apache.kafka.streams.state.internals.MeteredWindowStore.flush(MeteredWindowStore.java:128)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:242)
... 14 more
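The `Caused by` frame above points at a lambda inside `serializeOnOutbound`, and the question suspects that no `MessageConverter` could handle the outbound value. The plain-Java sketch below only illustrates that general failure mode: a conversion step that returns null when nothing matches, followed by an unchecked dereference. Every name in it (`ConverterLookupSketch`, `Converter`, `TEXT_ONLY`, `unsafeLength`, `safeLength`) is hypothetical, and it does not reproduce Spring Cloud Stream's actual code.

```java
class ConverterLookupSketch {

    /** Hypothetical converter: returns null when it cannot handle the content type. */
    interface Converter {
        String convert(Object payload, String contentType);
    }

    /** Assumption for illustration: only "text/plain" is supported. */
    static final Converter TEXT_ONLY = (payload, contentType) ->
            "text/plain".equals(contentType) ? String.valueOf(payload) : null;

    /** Unchecked: a null conversion result surfaces as a bare NullPointerException. */
    static int unsafeLength(Converter converter, Object payload, String contentType) {
        return converter.convert(payload, contentType).length();
    }

    /** Checked: the same situation is reported with the content type and payload type. */
    static int safeLength(Converter converter, Object payload, String contentType) {
        String converted = converter.convert(payload, contentType);
        if (converted == null) {
            throw new IllegalStateException("No converter for content type '" + contentType
                    + "' and payload type " + payload.getClass().getName());
        }
        return converted.length();
    }
}
```

If the real cause is similar, the NullPointerException itself carries no hint about which content type or payload type was rejected, which is why checking the output binding's content type and serde settings is a reasonable first step.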
Application properties
spring.cloud.stream.bindings.es4xx5xxError.destination=es4xx5xx_error
spring.cloud.stream.bindings.es4xx5xxError.group=default-group
spring.cloud.stream.bindings.errorEvent.destination=test-event-x
spring.cloud.stream.bindings.errorEvent.group=default-group
spring.cloud.stream.kafka.streams.binder.brokers=localhost
spring.cloud.stream.kafka.streams.binder.zkNodes=localhost
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.timeWindow.length=${TIME_WINDOW_MS:60000}
spring.cloud.stream.kafka.streams.timeWindow.advanceBy=${TIME_ADVANCE_MS:60000}
Processor
@Autowired
private TimeWindows window;
@Value("${spring.cloud.stream.bindings.errorEvent.group}")
private String group;
@StreamListener("es4xx5xxError")
@SendTo("errorEvent")
public KStream<?, CodeCount> process(KStream<?, Source> data) {
    return data.mapValues(x -> x.getResponseCode() + "")
            .groupBy((key, code) -> code)
            .windowedBy(window)
            .count(Materialized.as(group + "-counts-store"))
            .toStream()
            .map((w, c) -> new KeyValue<>(null, getCodeCount(w, c)));
}
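To make the intent of the topology above easier to follow, here is a minimal plain-Java sketch of the same counting logic, with no Kafka dependency: events are grouped by response code and by the tumbling window their timestamp falls into, then counted. It assumes the window length equals the advance interval (as the default property values suggest) and non-negative timestamps. `WindowedCodeCount`, `LogEvent`, and `countByWindow` are hypothetical names introduced for illustration, not part of the question's code.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class WindowedCodeCount {

    /** Hypothetical stand-in for a parsed log message (not the question's Source class). */
    static final class LogEvent {
        final long timestampMs;
        final int responseCode;

        LogEvent(long timestampMs, int responseCode) {
            this.timestampMs = timestampMs;
            this.responseCode = responseCode;
        }
    }

    /**
     * Counts events per (window start, response code) using tumbling windows
     * of windowMs milliseconds. Keys have the form "windowStart:code".
     */
    static Map<String, Long> countByWindow(List<LogEvent> events, long windowMs) {
        Map<String, Long> counts = new TreeMap<>();
        for (LogEvent event : events) {
            // Align the timestamp down to the start of its window.
            long windowStart = event.timestampMs - (event.timestampMs % windowMs);
            // Plays the role of mapValues(code as string) followed by groupBy(code).
            String key = windowStart + ":" + event.responseCode;
            // Plays the role of count(): increment the counter for this key.
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }
}
```

With a 60000 ms window, two 404 events at 1000 ms and 59999 ms land in the same window and are counted together, while a 404 event at 60000 ms starts a new window.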
Solution