首页 > 解决方案 > flink如何集成测试无界流,主要方法无限期运行

问题描述

我有一个运行 Flink 作业的主要方法,它在最后调用env.execute() 方法来触发 flink 作业,而我正在运行测试,它无限期地运行,我做错了什么。

 @ClassRule
  public static DockerComposeContainer environment =
      new DockerComposeContainer(new File("docker-compose.yml"))
          .withExposedService("zookeeper_1", 2181)
          .withExposedService("kafka_1", 29092);

  @ClassRule
  public static MiniClusterResource flinkCluster =
      new MiniClusterResource(
          new MiniClusterResourceConfiguration.Builder()
              .setNumberSlotsPerTaskManager(2)
              .setNumberTaskManagers(1)
              .build());
   @Test
  public void testMain() throws Exception {
    
    // trigger job, it create flink env, run a map function on unbounded stream, and at  last call execute method 
    StreamingJob.main(null);

    // code execution not coming here
    assert (4 == 4);
  }

标签: integration-testingapache-flink

解决方案


我认为这是因为您正在调用StreamingJob. 您有一些如何调用其中包含的方法:

public void execute() throws Exception {
   StreamExecutionEnvironment env = 
   StreamExecutionEnvironment.getExecutionEnvironment();
   // env.YOUR_DATASTREAM_PIPELINE
   env.execute();
}

那么您的集成测试将类似于:

  @Test
  public void test() throws Exception {
    
    StreamingJob streamJob = new StreamingJob();
    streamJob.execute();

    // execute your assertions
  }

看看这个Flink 集成测试的例子。


推荐阅读