Integration testing a Flink job

Question

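I wrote a small Flink application. It takes some input and enriches it with data from an external source. The enrichment is done in a RichAsyncFunction, and the HTTP client used for it is constructed in the open method.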

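Now I want to write an integration test for my job. But because the HTTP client is created in the open method, I have no way to supply it from outside and mock it in my integration test. I tried refactoring so that the client is passed in through the constructor, but I always get serialization errors.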

This is the example I am working from: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/asyncio.html

Thanks in advance :)

Tags: apache-flink, flink-streaming

Answer


This question was posted a year ago, but I will post an answer in case someone stumbles upon it in the future.

The serialization exception you are seeing probably looks something like this:

Exception encountered when invoking run on a nested suite. *** ABORTED *** (610 milliseconds)
  java.lang.NullPointerException:
  at java.util.Objects.requireNonNull(Objects.java:203)
  at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.<init>(StreamElementSerializer.java:64)
  at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.setup(AsyncWaitOperator.java:136)
  at org.apache.flink.streaming.api.operators.SimpleOperatorFactory.createStreamOperator(SimpleOperatorFactory.java:77)
  at org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:70)
  at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:366)
  at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.setup(OneInputStreamOperatorTestHarness.java:165)
...

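The reason is that the operator under test needs to know how to serialize and deserialize the DataStream types it works with. The way to provide this is to supply the serializer for the input type directly when constructing testHarness, and then pass the serializer for the output type to the setup() method call.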

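So, to test the example from the Flink documentation you linked, you can do something like the following (my implementation is in Scala, but you can adapt it to Java as well):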

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.AsyncDataStream.OutputMode
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator
import org.apache.flink.streaming.runtime.tasks.{StreamTaskActionExecutor, TestProcessingTimeService}
import org.apache.flink.streaming.runtime.tasks.mailbox.{MailboxExecutorImpl, TaskMailboxImpl}
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness
import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}


/**
 This test case is written using Flink 1.11+.
 Older versions likely have a simpler constructor definition for [[AsyncWaitOperator]] so you might have to remove the last two arguments (processingTimeService and mailboxExecutor)
*/
class AsyncDatabaseRequestSuite extends FunSuite with BeforeAndAfter with Matchers {

  var testHarness: OneInputStreamOperatorTestHarness[String, (String, String)] = _
  val TIMEOUT = 1000
  val CAPACITY = 1000
  val MAILBOX_PRIORITY = 0

  def createTestHarness: Unit = {
    val operator = new AsyncWaitOperator[String, (String, String)](
      new AsyncDatabaseRequest {

        override def open(configuration: Configuration): Unit = {
          client = new MockDatabaseClient(host, port, credentials) // put your mock DatabaseClient object here; host, port and credentials are placeholders
        }
      },
      TIMEOUT,
      CAPACITY,
      OutputMode.UNORDERED,
      new TestProcessingTimeService,
      new MailboxExecutorImpl(
        new TaskMailboxImpl,
        MAILBOX_PRIORITY,
        StreamTaskActionExecutor.IMMEDIATE
      )
    )

    // supply the TypeSerializer for the "input" type of the operator
    testHarness = new OneInputStreamOperatorTestHarness[String, (String, String)](
      operator,
      TypeExtractor.getForClass(classOf[String]).createSerializer(new ExecutionConfig)
    )

    // supply the TypeSerializer for the "output" type of the operator to the setup() call
    testHarness.setup(
      TypeExtractor.getForClass(classOf[(String, String)]).createSerializer(new ExecutionConfig)
    )
    testHarness.open()
  }

  before {
    createTestHarness
  }

  after {
    testHarness.close()
  }

  test("Your test case goes here") {
    // fill in your test case here
  }

}
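
As a side note on the serialization errors mentioned in the question: Flink ships user functions to the workers with Java serialization, so every non-transient field of the function has to be serializable, and an HTTP client usually is not. One common way around this is to pass a small serializable factory through the constructor and build the client from it in open; a test can then pass a factory that returns a mock. Below is a minimal sketch of that idea using plain Java serialization only. HttpClient, ClientFactory, MockClientFactory, EagerEnricher and LazyEnricher are hypothetical names made up for illustration, and the classes deliberately do not extend any Flink type so that the sketch stays self-contained.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, NotSerializableException, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a real HTTP client; deliberately NOT Serializable.
class HttpClient(val baseUrl: String) {
  def get(key: String): String = s"$baseUrl/$key"
}

// Hypothetical serializable factory: only this small object travels with the function.
trait ClientFactory extends Serializable {
  def create(): HttpClient
}

// Factory a test could use; a real test would return a mock client here.
case class MockClientFactory(baseUrl: String) extends ClientFactory {
  def create(): HttpClient = new HttpClient(baseUrl)
}

// Holding the client directly as a field makes the object impossible to serialize.
class EagerEnricher(val client: HttpClient) extends Serializable

// Holding the factory keeps the object serializable; the client is built in open().
class LazyEnricher(factory: ClientFactory) extends Serializable {
  @transient private var client: HttpClient = _

  def open(): Unit = {
    client = factory.create()
  }

  def enrich(key: String): String = client.get(key)
}

object SerializationSketch {
  // Round-trips a value through Java serialization, similar to shipping a function.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // True if serializing the value fails with NotSerializableException.
  def failsToSerialize(value: AnyRef): Boolean =
    try { roundTrip(value); false }
    catch { case _: NotSerializableException => true }
}
```

In a real job, the same shape would presumably apply to the RichAsyncFunction itself: the constructor takes the factory, open(configuration) calls factory.create(), and the production code and the test simply pass different factories.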
