unit-testing - 使用 kafka 流绑定器测试 Spring 云流:使用 TopologyTestDriver 我收到“该类不在受信任的包中”的错误
问题描述
我有这个使用 kafka 流绑定器的简单流处理器(不是消费者/生产者)。
@Bean
fun processFoo():Function<KStream<FooName, FooAddress>, KStream<FooName, FooAddressPlus>> {
return Function { input-> input.map { key, value ->
println("\nPAYLOAD KEY: ${key.name}\n");
println("\nPAYLOAD value: ${value.address}\n");
val output = FooAddressPlus()
output.address = value.address
output.name = value.name
output.plus = "$value.name-$value.address"
KeyValue(key, output)
}}
}
我正在尝试使用 TopologyTestDriver 对其进行测试:
@SpringBootTest(
webEnvironment = SpringBootTest.WebEnvironment.NONE,
classes = [Application::class, FooProcessor::class]
)
class FooProcessorTests {
var testDriver: TopologyTestDriver? = null
val INPUT_TOPIC = "input"
val OUTPUT_TOPIC = "output"
val inputKeySerde: Serde<FooName> = JsonSerde<FooName>()
val inputValueSerde: Serde<FooAddress> = JsonSerde<FooAddress>()
val outputKeySerde: Serde<FooName> = JsonSerde<FooName>()
val outputValueSerde: Serde<FooAddressPlus> = JsonSerde<FooAddressPlus>()
fun getStreamsConfiguration(): Properties? {
val streamsConfiguration = Properties()
streamsConfiguration[StreamsConfig.APPLICATION_ID_CONFIG] = "TopologyTestDriver"
streamsConfiguration[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = "dummy:1234"
streamsConfiguration[JsonDeserializer.TRUSTED_PACKAGES] = "*"
streamsConfiguration["spring.kafka.consumer.properties.spring.json.trusted.packages"] = "*"
return streamsConfiguration
}
@Before
fun setup() {
val builder = StreamsBuilder()
val input: KStream<FooName, FooAddress> = builder.stream(INPUT_TOPIC, Consumed.with(inputKeySerde, inputValueSerde))
val processor = FooProcessor()
val output: KStream<FooName, FooAddressPlus> = processor.processFoo().apply(input)
output.to(OUTPUT_TOPIC, Produced.with(outputKeySerde, outputValueSerde))
testDriver = TopologyTestDriver(builder.build(), getStreamsConfiguration())
}
@After
fun tearDown() {
try {
testDriver!!.close()
} catch (e: RuntimeException) {
// https://issues.apache.org/jira/browse/KAFKA-6647 causes exception when executed in Windows, ignoring it
// Logged stacktrace cannot be avoided
println("Ignoring exception, test failing in Windows due this exception:" + e.localizedMessage)
}
}
@org.junit.Test
fun testOne() {
val inputTopic: TestInputTopic<FooName, FooAddress> =
testDriver!!.createInputTopic(INPUT_TOPIC, inputKeySerde.serializer(), inputValueSerde.serializer())
val key = FooName()
key.name = "sherlock"
val value = FooAddress()
value.name = "sherlock"
value.address = "Baker street"
inputTopic.pipeInput(key, value)
val outputTopic: TestOutputTopic<FooName, FooAddressPlus> =
testDriver!!.createOutputTopic(OUTPUT_TOPIC, outputKeySerde.deserializer(), outputValueSerde.deserializer())
val message = outputTopic.readValue()
assertThat(message.name).isEqualTo(key.name)
assertThat(message.address).isEqualTo(value.address)
}
}
运行它时,我收到此错误inputTopic.pipeInput(key, value)
类“包.FooAddress”不在受信任的包中:[java.util, java.lang]。如果您认为此类可以安全反序列化,请提供其名称。如果序列化仅由受信任的来源完成,您还可以启用全部信任 ( )。*
关于如何解决这个问题的任何想法?设置这些属性getStreamsConfiguration()
没有帮助。请注意,这是一个流处理器,而不是消费者/生产者。
非常感谢!
解决方案
当 Kafka 创建 Serde 本身时,它通过调用来应用属性configure()
。
由于您自己实例化 Serde,因此您需要configure()
在属性映射中调用它。
这就是受信任的包属性传播到反序列化器的方式。
或者,您可以调用setTrustedPackages()
解串器。
推荐阅读
- android - RecyclerView 在滚动时丢失了它的项目位置
- javascript - 如何在javascript中分割文本
- unity3d - 需要帮助实施 Perforce depot 或 GitHub repot,允许异地云托管与本地计算机之间的现场主服务器同步
- pandas - 在 pandas 的 groupby 期间根据列是否包含特定字符串来创建变量
- r - 如何从R中的数据框中删除重复项
- android - 有两个intent filter的activity重启了,但是重启的时候主intent-filter没有被调用,其他intentet filter被调用
- salesforce - 无法连接到 Salesforce api
- django - 移动设备上的 Django 实时访问
- java - 我在哪里可以下载用于 Java 的 rt.jar?
- python - Python pandas 替代 'map' 用于 2 个变量的函数