unit-testing - 编写单元测试来断言 flink 函数实例是可序列化的
问题描述
我在运行时遇到了这个异常:
org.apache.flink.api.common.InvalidProgramException: The implementation of the RichFlatMapFunction is not serializable. The object probably contains or references non serializable fields.
虽然我了解正在发生的事情并知道如何解决它,但我想确保它不会再次发生。当有人向这个 RichFlatMapFunction 类添加不可序列化的字段时,我希望单元测试失败而不是在运行时失败。
有没有办法编写一个单元测试来断言该函数是可序列化的,使用与 flink 相同的函数序列化代码?
解决方案
对于这种情况,请改用集成测试。
在以下代码中,该行将env.execute();
运行管道并序列化运算符MultiplyByTwo
和CollectSink
.
您可以使用相同的方式来测试是否RichFlatMapFunction
可序列化。
public class ExampleIntegrationTest extends AbstractTestBase {
@Test
public void testMultiply() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// configure your test environment
env.setParallelism(1);
// values are collected in a static variable
CollectSink.values.clear();
// create a stream of custom elements and apply transformations
env.fromElements(1L, 21L, 22L)
.map(new MultiplyByTwo())
.addSink(new CollectSink());
// execute
env.execute();
// verify your results
assertEquals(Lists.newArrayList(2L, 42L, 44L), CollectSink.values);
}
// create a testing sink
private static class CollectSink implements SinkFunction<Long> {
// must be static
public static final List<Long> values = new ArrayList<>();
@Override
public synchronized void invoke(Long value) throws Exception {
values.add(value);
}
}
}
参考:https ://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#integration-testing
推荐阅读
- html - 如何在 HTML 中依次列出动态数据?(意思是)
- django - Django:如何创建一个只有一个自动字段的对象来获取它的 ID
- javascript - 如何在垫表顶部插入新行
- angular - 用户名必须至少为 8 个字符,并且必须是字母和数字的组合
- python - python数据框中的Splittig数据并自动获取数组值
- mysql - 如何通过另一个表中的类似数据更新sql中的列
- unity3d - 创建自定义形状
- node.js - Mocha 使用 ts-mockito 的测试代码不起作用
- c# - 如何从 2 个文本字段中减去一个值并将该值存储在第二个文本字段中?
- apache-flink - 使用 Flink Sql 选择 Top N Per Group