首页 > 解决方案 > 编写单元测试来断言 flink 函数实例是可序列化的

问题描述

我在运行时遇到了这个异常:

org.apache.flink.api.common.InvalidProgramException: The implementation of the RichFlatMapFunction is not serializable. The object probably contains or references non serializable fields. 

虽然我了解正在发生的事情并知道如何解决它,但我想确保它不会再次发生。当有人向这个 RichFlatMapFunction 类添加不可序列化的字段时,我希望单元测试失败而不是在运行时失败。

有没有办法编写一个单元测试来断言该函数是可序列化的,使用与 flink 相同的函数序列化代码?

标签: unit-testingapache-flinkserializable

解决方案


对于这种情况,请改用集成测试

在以下代码中,该行将env.execute();运行管道并序列化运算符MultiplyByTwoCollectSink.

您可以使用相同的方式来测试是否RichFlatMapFunction可序列化。

public class ExampleIntegrationTest extends AbstractTestBase {

    @Test
    public void testMultiply() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // configure your test environment
        env.setParallelism(1);

        // values are collected in a static variable
        CollectSink.values.clear();

        // create a stream of custom elements and apply transformations
        env.fromElements(1L, 21L, 22L)
                .map(new MultiplyByTwo())
                .addSink(new CollectSink());

        // execute
        env.execute();

        // verify your results
        assertEquals(Lists.newArrayList(2L, 42L, 44L), CollectSink.values);
    }

    // create a testing sink
    private static class CollectSink implements SinkFunction<Long> {

        // must be static
        public static final List<Long> values = new ArrayList<>();

        @Override
        public synchronized void invoke(Long value) throws Exception {
            values.add(value);
        }
    }
}

参考:https ://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#integration-testing


推荐阅读