apache-flink - Integration testing a Flink job
Problem description
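I wrote a small Flink application. It takes some input and enriches it with data from an external source. The enrichment is done in a RichAsyncFunction, and the HTTP client used for it is built in that function's open method.
Now I want to write an integration test for my job. But because the HTTP client is created in the open method, I have no way to supply it myself and mock it in my integration test. I tried refactoring the function so the client is passed in through the constructor instead, but I always get a serialization error.
Here is the example I'm following: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/asyncio.html
Thanks in advance :)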
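Solution
This question was posted a year ago, but I'll post an answer in case someone stumbles upon it in the future.
The serialization-related exception you are seeing is probably something like this: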
Exception encountered when invoking run on a nested suite. *** ABORTED *** (610 milliseconds)
java.lang.NullPointerException:
at java.util.Objects.requireNonNull(Objects.java:203)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.<init>(StreamElementSerializer.java:64)
at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.setup(AsyncWaitOperator.java:136)
at org.apache.flink.streaming.api.operators.SimpleOperatorFactory.createStreamOperator(SimpleOperatorFactory.java:77)
at org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:70)
at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:366)
at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.setup(OneInputStreamOperatorTestHarness.java:165)
...
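The cause is that the operator under test needs to know how to serialize and deserialize the DataStream input type: AsyncWaitOperator.setup() wraps the input-type serializer in a StreamElementSerializer, and if none was configured, the null check in its constructor fails as shown above. The simplest way to provide it is to pass the input-type serializer directly when constructing the testHarness, and then pass the output-type serializer to the setup() call.
So, to test the example from the Flink documentation you linked, you can do something like the following (my implementation is in Scala, but you can adapt it to Java):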
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.AsyncDataStream.OutputMode
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator
import org.apache.flink.streaming.runtime.tasks.{StreamTaskActionExecutor, TestProcessingTimeService}
import org.apache.flink.streaming.runtime.tasks.mailbox.{MailboxExecutorImpl, TaskMailboxImpl}
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness
import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
/**
 * This test case is written using Flink 1.11+.
 * Older versions likely have a simpler constructor definition for [[AsyncWaitOperator]], so you might have to remove the last two arguments (processingTimeService and mailboxExecutor).
 */
class AsyncDatabaseRequestSuite extends FunSuite with BeforeAndAfter with Matchers {
  var testHarness: OneInputStreamOperatorTestHarness[String, (String, String)] = _
  val TIMEOUT = 1000 // async request timeout, in milliseconds
  val CAPACITY = 1000 // max number of in-flight async requests
  val MAILBOX_PRIORITY = 0

  def createTestHarness: Unit = {
    val operator = new AsyncWaitOperator[String, (String, String)](
      new AsyncDatabaseRequest {
        override def open(configuration: Configuration): Unit = {
          client = new MockDatabaseClient(host, port, credentials) // put your mock DatabaseClient object here
        }
      },
      TIMEOUT,
      CAPACITY,
      OutputMode.UNORDERED,
      new TestProcessingTimeService,
      new MailboxExecutorImpl(
        new TaskMailboxImpl,
        MAILBOX_PRIORITY,
        StreamTaskActionExecutor.IMMEDIATE
      )
    )
    // supply the TypeSerializer for the "input" type of the operator
    testHarness = new OneInputStreamOperatorTestHarness[String, (String, String)](
      operator,
      TypeExtractor.getForClass(classOf[String]).createSerializer(new ExecutionConfig)
    )
    // supply the TypeSerializer for the "output" type of the operator to the setup() call
    testHarness.setup(
      TypeExtractor.getForClass(classOf[(String, String)]).createSerializer(new ExecutionConfig)
    )
    testHarness.open()
  }

  before {
    createTestHarness
  }

  after {
    testHarness.close()
  }

  test("Your test case goes here") {
    // fill in your test case here
  }
}
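If you would rather inject the mock through the constructor, as the question attempted, keep in mind why that fails: Flink ships user functions to the task managers using Java serialization, so every non-transient field of the function must be Serializable, and most HTTP clients are not. A common workaround (not part of the answer above) is to pass a serializable factory through the constructor and create the client in open(). The plain-Scala sketch below demonstrates the principle without any Flink dependency; HttpClient, ClientFactory, MockClientFactory, EnrichWithClient, EnrichWithFactory and SerializationCheck are all hypothetical names, and open() here only stands in for RichAsyncFunction.open(Configuration).

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a real HTTP/database client.
// Like most client classes, it does NOT extend Serializable.
class HttpClient(val baseUrl: String) {
  def fetch(key: String): String = s"$baseUrl/$key"
}

// Anti-pattern: the client is a constructor field, so serializing
// the function also tries to serialize the client, which fails.
class EnrichWithClient(val client: HttpClient) extends Serializable

// Pattern: ship a small serializable factory instead of the client.
trait ClientFactory extends Serializable {
  def create(): HttpClient
}

// Hypothetical factory a test would use; it only holds a String,
// so it serializes cleanly.
class MockClientFactory(baseUrl: String) extends ClientFactory {
  def create(): HttpClient = new HttpClient(baseUrl)
}

class EnrichWithFactory(private val factory: ClientFactory) extends Serializable {
  // @transient keeps the client out of the serialized form;
  // it is (re)created on the worker side in open().
  @transient private var client: HttpClient = null

  // Stand-in for RichAsyncFunction.open(Configuration).
  def open(): Unit = {
    client = factory.create()
  }

  def enrich(key: String): String = client.fetch(key)
}

object SerializationCheck {
  // Returns true if plain Java serialization of `obj` succeeds.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

In a real job, EnrichWithFactory would extend RichAsyncFunction[String, (String, String)] and build the client in open(parameters: Configuration); the production code would pass a factory for the real client, while the integration test passes one that returns the mock. The design choice is that only the cheap, serializable recipe for the client travels with the job, never the client itself.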