integration-testing - flink如何集成测试无界流,主要方法无限期运行
问题描述
我有一个运行 Flink 作业的主要方法,它在最后调用env.execute() 方法来触发 flink 作业,而我正在运行测试,它无限期地运行,我做错了什么。
@ClassRule
public static DockerComposeContainer environment =
new DockerComposeContainer(new File("docker-compose.yml"))
.withExposedService("zookeeper_1", 2181)
.withExposedService("kafka_1", 29092);
@ClassRule
public static MiniClusterResource flinkCluster =
new MiniClusterResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(2)
.setNumberTaskManagers(1)
.build());
@Test
public void testMain() throws Exception {
// trigger job, it create flink env, run a map function on unbounded stream, and at last call execute method
StreamingJob.main(null);
// code execution not coming here
assert (4 == 4);
}
解决方案
我认为这是因为您正在调用StreamingJob
. 您有一些如何调用其中包含的方法:
public void execute() throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// env.YOUR_DATASTREAM_PIPELINE
env.execute();
}
那么您的集成测试将类似于:
@Test
public void test() throws Exception {
StreamingJob streamJob = new StreamingJob();
streamJob.execute();
// execute your assertions
}
看看这个Flink 集成测试的例子。
推荐阅读
- sas - SAS 字符比较
- windows - 如何使用 CMD 和 reg.exe 添加具有包含空格和参数的长路径的程序可执行文件?
- firebase - 如何在日期为 datenow 的 firebase 选择条件中查询
- c# - 在使用 MVVM 单击 ListView 项目内的按钮时选择 ListView 中的项目
- scrapy - 如何使用 Scrapyd 和 ScrapydWeb 在集群中分布蜘蛛?
- ios - 在 IOS 13 中,当 ViewController 未推送时,Google Map 相机位置未显示准确。附上两个屏幕截图
- assembly - 在 GAS .intel-syntax 程序集中的内存标签和寄存器上使用“偏移”和“[]”有什么区别?
- firebase - 为什么 Firebase 身份验证在没有 google dns 的情况下不起作用?
- python - python中所有可能的组合
- go - Golang 将 gob 字符串转换为接口