首页 > 解决方案 > 使用 kafka 流绑定器测试 Spring 云流:使用 TopologyTestDriver 我收到“该类不在受信任的包中”的错误

问题描述

我有这个使用 kafka 流绑定器的简单流处理器(不是消费者/生产者)。

@Bean
fun processFoo():Function<KStream<FooName, FooAddress>, KStream<FooName, FooAddressPlus>> {
    return Function { input-> input.map { key, value ->
        println("\nPAYLOAD KEY: ${key.name}\n");
        println("\nPAYLOAD value: ${value.address}\n");
        val output = FooAddressPlus()
        output.address = value.address
        output.name = value.name
        output.plus = "$value.name-$value.address"
        KeyValue(key, output)
    }}
}

我正在尝试使用 TopologyTestDriver 对其进行测试:

@SpringBootTest(
        webEnvironment = SpringBootTest.WebEnvironment.NONE,
        classes = [Application::class, FooProcessor::class]
)
class FooProcessorTests {
    var testDriver: TopologyTestDriver? = null
    val INPUT_TOPIC = "input"
    val OUTPUT_TOPIC = "output"

    val inputKeySerde: Serde<FooName> = JsonSerde<FooName>()
    val inputValueSerde: Serde<FooAddress> = JsonSerde<FooAddress>()
    val outputKeySerde: Serde<FooName> = JsonSerde<FooName>()
    val outputValueSerde: Serde<FooAddressPlus> = JsonSerde<FooAddressPlus>()

    fun getStreamsConfiguration(): Properties? {
        val streamsConfiguration = Properties()
        streamsConfiguration[StreamsConfig.APPLICATION_ID_CONFIG] = "TopologyTestDriver"
        streamsConfiguration[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = "dummy:1234"
        streamsConfiguration[JsonDeserializer.TRUSTED_PACKAGES] = "*"
        streamsConfiguration["spring.kafka.consumer.properties.spring.json.trusted.packages"] = "*"
        return streamsConfiguration
    }

    @Before
    fun setup() {
        val builder = StreamsBuilder()
        val input: KStream<FooName, FooAddress> = builder.stream(INPUT_TOPIC, Consumed.with(inputKeySerde, inputValueSerde))
        val processor = FooProcessor()
        val output: KStream<FooName, FooAddressPlus> = processor.processFoo().apply(input)
        output.to(OUTPUT_TOPIC, Produced.with(outputKeySerde, outputValueSerde))
        testDriver = TopologyTestDriver(builder.build(), getStreamsConfiguration())
    }

    @After
    fun tearDown() {
        try {
            testDriver!!.close()
        } catch (e: RuntimeException) {
            // https://issues.apache.org/jira/browse/KAFKA-6647 causes exception when executed in Windows, ignoring it
            // Logged stacktrace cannot be avoided
            println("Ignoring exception, test failing in Windows due this exception:" + e.localizedMessage)
        }
    }

    @org.junit.Test
    fun testOne() {
        val inputTopic: TestInputTopic<FooName, FooAddress> =
                testDriver!!.createInputTopic(INPUT_TOPIC, inputKeySerde.serializer(), inputValueSerde.serializer())
        val key = FooName()
        key.name = "sherlock"
        val value = FooAddress()
        value.name = "sherlock"
        value.address = "Baker street"
        inputTopic.pipeInput(key, value)
        val outputTopic: TestOutputTopic<FooName, FooAddressPlus> =
                testDriver!!.createOutputTopic(OUTPUT_TOPIC, outputKeySerde.deserializer(), outputValueSerde.deserializer())
        val message = outputTopic.readValue()

        assertThat(message.name).isEqualTo(key.name)
        assertThat(message.address).isEqualTo(value.address)
    }
}

运行它时,我收到此错误inputTopic.pipeInput(key, value)

类“.FooAddress”不在受信任的包中:[java.util, java.lang]。如果您认为此类可以安全反序列化,请提供其名称。如果序列化仅由受信任的来源完成,您还可以启用全部信任 ( )。*

关于如何解决这个问题的任何想法?设置这些属性getStreamsConfiguration()没有帮助。请注意,这是一个流处理器,而不是消费者/生产者。

非常感谢!

标签: unit-testingkotlinspring-cloud-stream

解决方案


当 Kafka 创建 Serde 本身时,它通过调用来应用属性configure()

由于您自己实例化 Serde,因此您需要configure()在属性映射中调用它。

这就是受信任的包属性传播到反序列化器的方式。

或者,您可以调用setTrustedPackages()解串器。


推荐阅读