lambda - Kafka Streams 中的 RocksDB 异常
问题描述
在一个简单的 Kafka Stream 程序中,当我使用以下代码时,它可以正常工作而不会引发任何错误:
KTable<String, Long> result= source.mapValues(textLine
->textLine.toLowerCase()) .flatMapValues(lowercasedTextLine ->
Arrays.asList(lowercasedTextLine.split(" "))) .selectKey((ignoredKey,word) ->
word) .groupByKey() .count("Counts");
result.to(Serdes.String(), Serdes.Long(), "wc-output");
但是,当我使用以下代码时,出现错误:
KStream<String, String> source = builder.stream("wc-input");
source.groupBy((key, word) -> word).windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(5000))).count()
.toStream().map((key, value) -> new KeyValue<>(key.key(), value))
.to("wc-output", Produced.with(Serdes.String(), Serdes.Long()));
线程“streams-wordcount-b160d715-f0e0-42ee-831e-0e4eed7e9424-StreamThread-1”org.apache.kafka.streams.errors.StreamsException 中的异常:在进程中捕获异常。taskId=1_0,processor=KSTREAM-SOURCE-0000000006,topic=streams-wordcount-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition,partition=0,offset=0 at org.apache.kafka.streams.processor.internals。 StreamTask.process(StreamTask.java:232) at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403) at org.apache.kafka.streams.processor.internals.TaskManager.process( TaskManager.java:317) 在 org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942) 在 org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java: 822)在org.apache。
解决方案
当您使用窗口聚合时,以不同的方式存储 a,并且 Kafka 中存在1.0.0
影响 Windows 操作系统的错误:窗口存储的名称包含:
Windows 操作系统上不允许的 a。该错误已在版本中修复,1.0.1
并且1.1.0
推荐阅读
- apache-kafka - 如何理解 Kafka HA?
- nginx - 您如何将 NGINX 指向多个站点的标准 PHP?
- docker - Kubernetes 服务:随机连接被拒绝
- database - typeorm 中的复杂查询
- python - 使用 dask 作为并行后端时的问题
- string - VIsual Basic Strings.Trim(String) 方法和 .NET String.Trim 方法有区别吗?
- php - 使用通配符键从数组中提取键和值
- ansible - Ansible 不接受客户失败模块
- javascript - 无法读取未定义的属性 - 小的随机报价应用程序错误
- azure - 使用 ARM 模板和 GitHub 操作将 .NET Core 应用程序部署到 Azure Web 应用程序