首页 > 解决方案 > Kafka Streams 中的 RocksDB 异常

问题描述

在一个简单的 Kafka Stream 程序中,当我使用以下代码时,它可以正常工作而不会引发任何错误:

      KTable<String, Long> result= source.mapValues(textLine
      ->textLine.toLowerCase()) .flatMapValues(lowercasedTextLine ->
      Arrays.asList(lowercasedTextLine.split(" "))) .selectKey((ignoredKey,word) ->
      word) .groupByKey() .count("Counts");

      result.to(Serdes.String(), Serdes.Long(), "wc-output");

但是,当我使用以下代码时,出现错误:

    KStream<String, String> source = builder.stream("wc-input");
    source.groupBy((key, word) -> word).windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(5000))).count()
            .toStream().map((key, value) -> new KeyValue<>(key.key(), value))
            .to("wc-output", Produced.with(Serdes.String(), Serdes.Long()));

线程“streams-wordcount-b160d715-f0e0-42ee-831e-0e4eed7e9424-StreamThread-1”org.apache.kafka.streams.errors.StreamsException 中的异常:在进程中捕获异常。taskId=1_0,processor=KSTREAM-SOURCE-0000000006,topic=streams-wordcount-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition,partition=0,offset=0 at org.apache.kafka.streams.processor.internals。 StreamTask.process(StreamTask.java:232) at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403) at org.apache.kafka.streams.processor.internals.TaskManager.process( TaskManager.java:317) 在 org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942) 在 org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java: 822)在org.apache。

标签: lambdaapache-kafkakafka-consumer-apiapache-kafka-streams

解决方案


当您使用窗口聚合时,以不同的方式存储 a,并且 Kafka 中存在1.0.0影响 Windows 操作系统的错误:窗口存储的名称包含:Windows 操作系统上不允许的 a。该错误已在版本中修复,1.0.1并且1.1.0

参照。https://issues.apache.org/jira/browse/KAFKA-6167


推荐阅读