java - 带有 WindowStore 单元测试 ClassCastException 的 Kafka 流处理器
问题描述
我试图为实现Processor
包含 的接口的类编写单元测试WindowStore
,但运行测试失败,并ClassCastException
尝试InMemoryWindowStore.init
强制转换MockProcessorContext
为InternalProcessorContext
.
谷歌搜索这个问题并没有发现任何有用的信息,或者表明这是一个已知问题。我正在尝试遵循文档中的指导,但也许我仍然遗漏了一些东西。
如果有人愿意直接引用它,我已经创建了一个带有最小示例的存储库,以在Github上重现该问题。
但除此之外,处理器类是:
package com.cantgetthistowork;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.WindowStore;
public class InMemWindowProcessor implements Processor<String, String> {
private ProcessorContext context;
private WindowStore<String, String> windowStore;
@Override
public void init(ProcessorContext context) {
this.context = context;
windowStore = (WindowStore<String, String>) context.getStateStore("my-win-store");
}
@Override
public void process(String key, String value) {
}
@Override
public void close() {
}
}
测试类是:
package com.cantgetthistowork;
import java.time.Duration;
import java.time.Instant;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.junit.Before;
import org.junit.Test;
public class InMemWindowProcessorTest {
InMemWindowProcessor processor = null;
MockProcessorContext context = null;
@Before
public void setup() {
processor = new InMemWindowProcessor();
context = new MockProcessorContext();
WindowStore<String, String> store =
Stores.windowStoreBuilder(
Stores.inMemoryWindowStore(
"my-win-store",
Duration.ofMinutes(10),
Duration.ofSeconds(10),
false
),
Serdes.String(),
Serdes.String()
)
.withLoggingDisabled()
.build();
store.init(context, store);
context.register(store, null);
processor.init(context);
}
@Test
public void testThings() {
Instant baseTime = Instant.now();
context.setTimestamp(baseTime.toEpochMilli());
context.setTopic("topic-name");
processor.process("key1", "value1");
}
}
我使用 maven 作为我的构建工具mvn --version
输出:
Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 2017-04-03T13:39:06-06:00)
Maven home: ~/opt/apache-maven-3.5.0
Java version: 1.8.0_212, vendor: Oracle Corporation
Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family: "unix"
最后,mvn test
输出:
-------------------------------------------------------
T E S T S
-------------------------------------------------------
Running com.cantgetthistowork.InMemWindowProcessorTest
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.076 sec <<< FAILURE!
testThings(com.cantgetthistowork.InMemWindowProcessorTest) Time elapsed: 0.05 sec <<< ERROR!
java.lang.ClassCastException: org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to org.apache.kafka.streams.processor.internals.InternalProcessorContext
at org.apache.kafka.streams.state.internals.InMemoryWindowStore.init(InMemoryWindowStore.java:91)
at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:90)
at com.cantgetthistowork.InMemWindowProcessorTest.setup(InMemWindowProcessorTest.java:36)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:252)
at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:141)
at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:112)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:189)
at org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:165)
at org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:85)
at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:115)
at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:75)
Results :
Tests in error:
testThings(com.cantgetthistowork.InMemWindowProcessorTest): org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to org.apache.kafka.streams.processor.internals.InternalProcessorContext
Tests run: 1, Failures: 0, Errors: 1, Skipped: 0
所以,是的,也许我只是缺少一些配置,或者做一些不正常的事情,或者其他愚蠢的事情。与此同时,我打算编写使用 的测试TopologyTestDriver
,并希望它有效。
我期待人们可能有的任何回应/想法。
解决方案
推荐阅读
- java - 如何更新 Spring Data JPA 中的递归实体
- javascript - 如何根据索引值将状态变量更改为真
- python - 使用相关列选择数据框
- bots - 使用电报机器人从私人群组获取消息更新
- wordpress - Facebook og:图像不加载图像
- python - 将包安装为具有相同顶级模块的多个包的可编辑中断导入
- google-people-api - 检测数据*离开* Google 表格
- react-redux - CRA 样板无法读取 Redux -Saga & Reselect & redux-toolkit 中未定义的属性“initialLoad”
- python - 从父目录python导入文件中的函数
- r - 如何使用 Caret 训练功能提取/存储“非最佳模型”?