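apache-kafka - Kafka Streams 2.1.1 class cast while flushing timed aggregation to store
Problem description
I am trying to use Kafka Streams to perform a windowed aggregation and emit the result only after a session window has closed. To do this I am using the suppress function.
The problem is that I cannot find a way to make this simple test work: when it tries to persist the state, I get a ClassCastException because it tries to cast a Windowed to a String. I tried passing the aggregate function a Materialized<Windowed<String>, Long, StateStore<>>,
but that does not type-check, because it expects the first type parameter to be a plain String.
What am I missing here?
Kafka version: 2.1.1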
package test;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.junit.Test;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.Properties;
public class TestAggregation {
    @Test
    public void aggregationTest() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream<String, Long> input = streamsBuilder.stream("input");
        input
            .groupByKey()
            .windowedBy(SessionWindows.with(Duration.ofSeconds(30)))
            .aggregate(() -> Long.valueOf(0), (key, v1, v2) -> v1 + v2, (key, agg1, agg2) -> agg1 + agg2)
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream()
            .map((k, v) -> new KeyValue<>(k.key(), v))
            .to("output");
        Topology topology = streamsBuilder.build();
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
        TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);
        ConsumerRecordFactory<String, Long> producer =
            new ConsumerRecordFactory<>("input", Serdes.String().serializer(), Serdes.Long().serializer());
        testDriver.pipeInput(producer.create("input", "key", 10L));
        ProducerRecord<String, Long> output = testDriver.readOutput("output", Serdes.String().deserializer(), Serdes.Long().deserializer());
        System.out.println(MessageFormat.format("output: k: {0}, v:{1}", output.key(), output.value()));
    }
}
This is the stack trace I get from it:
17:05:38.925 [main] DEBUG org.apache.kafka.streams.processor.internals.StreamTask - task [0_0] Committing
17:05:38.925 [main] DEBUG org.apache.kafka.streams.processor.internals.ProcessorStateManager - task [0_0] Flushing all stores registered in the state manager
17:05:38.929 [main] ERROR org.apache.kafka.streams.processor.internals.ProcessorStateManager - task [0_0] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000001:
java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.buffer(KTableSuppressProcessor.java:86)
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:78)
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:37)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
at org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:179)
at org.apache.kafka.streams.state.internals.CachingSessionStore.access$000(CachingSessionStore.java:37)
at org.apache.kafka.streams.state.internals.CachingSessionStore$1.apply(CachingSessionStore.java:86)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:124)
at org.apache.kafka.streams.state.internals.CachingSessionStore.flush(CachingSessionStore.java:198)
at org.apache.kafka.streams.state.internals.MeteredSessionStore.flush(MeteredSessionStore.java:191)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:217)
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204)
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:491)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:431)
at org.apache.kafka.streams.TopologyTestDriver.pipeInput(TopologyTestDriver.java:405)
at test.TestAggregation.aggregationTest(TestAggregation.java:49)
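Solution
There are two ways to fix the problem:
1. Use the aggregate overload that takes a Materialized. Because the stream is windowed with SessionWindows, this is the overload on SessionWindowedKStream:
SessionWindowedKStream::aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final Merger<? super K, VR> sessionMerger, final Materialized<K, VR, SessionStore<Bytes, byte[]>> materialized);
2. Use the groupByKey overload that takes a Grouped:
KStream::groupByKey(final Grouped<K, V> grouped)
In your case, that would be the following (option 1 additionally needs an import of org.apache.kafka.streams.kstream.Materialized, option 2 of org.apache.kafka.streams.kstream.Grouped):
Ad 1: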
input
    .groupByKey()
    .windowedBy(SessionWindows.with(Duration.ofSeconds(30)))
    .aggregate(() -> Long.valueOf(0), (key, v1, v2) -> v1 + v2, (key, agg1, agg2) -> agg1 + agg2, Materialized.with(Serdes.String(), Serdes.Long()))
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream()
    .map((k, v) -> new KeyValue<>(k.key(), v))
    .to("output");
Ad 2:
input
    .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
    .windowedBy(SessionWindows.with(Duration.ofSeconds(30)))
    .aggregate(() -> Long.valueOf(0), (key, v1, v2) -> v1 + v2, (key, agg1, agg2) -> agg1 + agg2)
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream()
    .map((k, v) -> new KeyValue<>(k.key(), v))
    .to("output");
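As for why explicit serdes help: judging from the stack trace, the suppress processor appears to fall back to the configured default key serde (String) when no serde is passed along the topology, and then hands it a Windowed key. Because of generic type erasure, the mismatch only surfaces at runtime, inside StringSerializer.serialize. The sketch below reproduces that mechanism in plain Java with no Kafka dependency. Serializer, StringSerializer, Windowed and WindowedSerializer are simplified, hypothetical stand-ins for the Kafka classes of the same name, bufferKey is a made-up helper, and the byte layout is illustrative only, not Kafka's actual format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class ErasedSerdeDemo {

    // Hypothetical stand-in for Kafka's Serializer interface.
    interface Serializer<T> {
        byte[] serialize(T data);
    }

    // Hypothetical stand-in for StringSerializer (the configured default key serde).
    static final class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Hypothetical stand-in for Windowed<K>: a key plus its window bounds.
    static final class Windowed<K> {
        final K key;
        final long start;
        final long end;

        Windowed(K key, long start, long end) {
            this.key = key;
            this.start = start;
            this.end = end;
        }
    }

    // Hypothetical windowed serializer that wraps the inner key serializer.
    // The layout (key bytes, start, end) is an assumption for illustration.
    static final class WindowedSerializer<K> implements Serializer<Windowed<K>> {
        private final Serializer<K> inner;

        WindowedSerializer(Serializer<K> inner) {
            this.inner = inner;
        }

        @Override
        public byte[] serialize(Windowed<K> data) {
            byte[] keyBytes = inner.serialize(data.key);
            return ByteBuffer.allocate(keyBytes.length + 2 * Long.BYTES)
                    .put(keyBytes)
                    .putLong(data.start)
                    .putLong(data.end)
                    .array();
        }
    }

    // Mimics a processor that obtains its key serializer from configuration.
    // The unchecked cast compiles and succeeds at runtime; a mismatch is only
    // detected when the serializer's bridge method casts the key.
    @SuppressWarnings("unchecked")
    static <K> byte[] bufferKey(Object configuredSerializer, K key) {
        Serializer<K> serializer = (Serializer<K>) configuredSerializer;
        return serializer.serialize(key);
    }

    // Returns true if serializing the key fails with a ClassCastException.
    static boolean failsWithClassCast(Object configuredSerializer, Object key) {
        try {
            bufferKey(configuredSerializer, key);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Windowed<String> key = new Windowed<>("key", 0L, 30_000L);
        // Default String serializer applied to a windowed key: prints true
        System.out.println(failsWithClassCast(new StringSerializer(), key));
        // Serializer that matches the windowed key type: prints false
        System.out.println(failsWithClassCast(new WindowedSerializer<>(new StringSerializer()), key));
    }
}
```

Both options above avoid this situation by giving the topology the String and Long serdes explicitly, so the downstream operators no longer have to guess the key type from the default configuration.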