java - 我们能否以及如何将 apache nifi 流文件及其属性保存到磁盘,以便我们可以使用/重新读取它们以用于自定义处理器单元测试用例
问题描述
我正在尝试探索如何为 Apache Nifi 编写单元测试用例,这样我就可以避免“更改代码,构建 nar,将 nar 粘贴到 lib 文件夹中,重新启动 nifi”循环。但是,我想,为此我还需要将流文件捕获到本地磁盘并在每次运行单元测试用例时重新加载它们。我遇到了这篇文章,它要求将流文件序列化到磁盘,然后读取这些文件并将它们排入单元测试中的处理器,以将它们提供给我目前正在开发的自定义处理器。文章要求使用MergeContent
withFlowFileV3
选项,然后使用PutFile
. 我能够以.pkg
格式保存这些文件。我正在按照同一篇文章中的建议,GetFile
使用IndetifyMimeType
和UnpackContent
处理器。但是我在下面的代码中这样做:
//Get File
TestRunner getFileRunner = TestRunners.newTestRunner(new GetFile());
getFileRunner.setProperty(GetFile.DIRECTORY, "C:\\Mahesh\\delete\\serialized-flow-file-2");
getFileRunner.setProperty(GetFile.KEEP_SOURCE_FILE, "true");
getFileRunner.run(1);
List<MockFlowFile> getFileResult = getFileRunner.getFlowFilesForRelationship(GetFile.REL_SUCCESS);
List<? extends FlowFile> getFileFFResult = getFileResult;
//IdentifyMimeType
TestRunner identifyMimeTypeRunner = TestRunners.newTestRunner(new IdentifyMimeType());
identifyMimeTypeRunner.enqueue(getFileFFResult.toArray(new FlowFile[getFileFFResult.size()]));
identifyMimeTypeRunner.run(1);
List<MockFlowFile> identifyMimeTypeResult = identifyMimeTypeRunner.getFlowFilesForRelationship(IdentifyMimeType.REL_SUCCESS);
List<? extends FlowFile> identifyMimeTypeFFResult = identifyMimeTypeResult;
//UnpackContent
TestRunner unpackContentRunner = TestRunners.newTestRunner(new UnpackContent());
unpackContentRunner.enqueue(identifyMimeTypeFFResult.toArray(new FlowFile[identifyMimeTypeFFResult.size()]));
unpackContentRunner.run(1);
List<MockFlowFile> unpackContentResult = unpackContentRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
List<? extends FlowFile> unpackContentFFResult = unpackContentResult;
但是我收到以下异常:
17:39:36.676 [pool-1-thread-1] INFO org.apache.nifi.processors.standard.GetFile - GetFile[id=2e2161db-48a7-4a13-b7dd-ec75ce2b30dc] added FlowFile[0,618912147321300.pkg,556530B] to flow
17:40:08.772 [pool-2-thread-1] INFO org.apache.nifi.processors.standard.IdentifyMimeType - IdentifyMimeType[id=aefc3abe-0820-48a0-8935-e905aeadb191] Identified FlowFile[0,618912147321300.pkg,556530B] as having MIME Type application/flowfile-v3
17:40:48.625 [pool-3-thread-1] ERROR org.apache.nifi.processors.standard.UnpackContent - UnpackContent[id=6840bef5-4e52-48ac-be2f-1f9580eeb144] UnpackContent[id=6840bef5-4e52-48ac-be2f-1f9580eeb144] failed to process due to java.lang.IllegalStateException: FlowFile[0,620665804273900.mockFlowFile,0B] already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed; rolling back session: java.lang.IllegalStateException: FlowFile[0,620665804273900.mockFlowFile,0B] already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed
17:40:48.630 [pool-3-thread-1] ERROR org.apache.nifi.processors.standard.UnpackContent -
java.lang.IllegalStateException: FlowFile[0,620665804273900.mockFlowFile,0B] already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed
at org.apache.nifi.util.MockProcessSession.validateState(MockProcessSession.java:1014)
at org.apache.nifi.util.MockProcessSession.putAllAttributes(MockProcessSession.java:488)
at org.apache.nifi.util.MockProcessSession.inheritAttributes(MockProcessSession.java:1044)
at org.apache.nifi.util.MockProcessSession.create(MockProcessSession.java:299)
at org.apache.nifi.util.MockProcessSession.create(MockProcessSession.java:62)
at org.apache.nifi.processors.standard.UnpackContent$FlowFileStreamUnpacker$1.process(UnpackContent.java:415)
at org.apache.nifi.util.MockProcessSession.read(MockProcessSession.java:547)
at org.apache.nifi.util.MockProcessSession.read(MockProcessSession.java:529)
at org.apache.nifi.processors.standard.UnpackContent$FlowFileStreamUnpacker.unpack(UnpackContent.java:409)
at org.apache.nifi.processors.standard.UnpackContent.onTrigger(UnpackContent.java:255)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.util.StandardProcessorTestRunner$RunProcessor.call(StandardProcessorTestRunner.java:251)
at org.apache.nifi.util.StandardProcessorTestRunner$RunProcessor.call(StandardProcessorTestRunner.java:245)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
我有些疑惑:
首先,很明显,为什么我会收到以下错误:
FlowFile already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed
?我做得对吗?我的方法是使用保存这些流文件
MergeContent
,PutFile
然后使用 , 读取它们GetFile
,IndentifyMimeType
并且UnpackContent
正确吗?我正在考虑将我的提要的输出提供UnpackContent
给我的自定义处理器TestRunner
?这一切都正确吗?或者他们是我完全错过的其他一些更可取/标准的方法?这种方法是否会保留流文件的属性(如文章中所述),以便我可以盲目地将它们排入自定义处理器的测试运行器并且它将运行干净(如果我成功修复了上述异常)?
编辑
在调试时,我进入了一些框架类,然后在 eclipse 调试 shell 中,我做了e.printStackTrace()
,它打印了这个:
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:89)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:41)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:541)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:763)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:463)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:209)
Caused by: java.lang.AssertionError: java.lang.IllegalStateException: FlowFile[0,622261873281800.mockFlowFile,0B] already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed
at org.apache.nifi.util.StandardProcessorTestRunner.run(StandardProcessorTestRunner.java:201)
at org.apache.nifi.util.StandardProcessorTestRunner.run(StandardProcessorTestRunner.java:160)
at org.apache.nifi.util.StandardProcessorTestRunner.run(StandardProcessorTestRunner.java:155)
at org.apache.nifi.util.StandardProcessorTestRunner.run(StandardProcessorTestRunner.java:150)
at MyCustomProcessorTest.testOnTrigger(MyCustomProcessorTest.java:47)
... 23 more
Caused by: java.lang.IllegalStateException: FlowFile[0,622261873281800.mockFlowFile,0B] already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed
at org.apache.nifi.util.MockProcessSession.validateState(MockProcessSession.java:1014)
at org.apache.nifi.util.MockProcessSession.putAllAttributes(MockProcessSession.java:488)
at org.apache.nifi.util.MockProcessSession.inheritAttributes(MockProcessSession.java:1044)
at org.apache.nifi.util.MockProcessSession.create(MockProcessSession.java:299)
at org.apache.nifi.util.MockProcessSession.create(MockProcessSession.java:62)
at org.apache.nifi.processors.standard.UnpackContent$FlowFileStreamUnpacker$1.process(UnpackContent.java:415)
at org.apache.nifi.util.MockProcessSession.read(MockProcessSession.java:547)
at org.apache.nifi.util.MockProcessSession.read(MockProcessSession.java:529)
at org.apache.nifi.processors.standard.UnpackContent$FlowFileStreamUnpacker.unpack(UnpackContent.java:409)
at org.apache.nifi.processors.standard.UnpackContent.onTrigger(UnpackContent.java:255)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.util.StandardProcessorTestRunner$RunProcessor.call(StandardProcessorTestRunner.java:251)
at org.apache.nifi.util.StandardProcessorTestRunner$RunProcessor.call(StandardProcessorTestRunner.java:245)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
MyCustomProcessorTest.java:47
在哪里unpackContentRunner.run(1)
。
解决方案
模拟框架并不是真的要编写将多个处理器链接在一起的测试。模拟框架用于对单个处理器进行单元测试。
使用 mock 框架设置流文件有很多不同的方法。流文件的内容可以来自文件、字符串、输入流或字节数组:
可以指定一个可选的属性映射来设置预期的流文件属性。
一种常见的方法是在 src/test/resources 中为您的自定义处理器期望的任何数据设置文件,然后调用 testRunner.enqueue(pathToTestFile)。
推荐阅读
- python - 使用 Python 和流式方法将文件从 HDFS 复制到 SFTP 服务器
- javascript - × TypeError:无法读取未定义的属性“地图”。发现错误
- rabbitmq - 我们可以通过 masstransit 一起使用 RabbitMQ 和 Mediatr 吗?
- angular - 图像框的拖放
- javascript - 有没有办法通过单击 VSCode 上的按钮/快捷方式同时运行“tsc -watch”和“npm start”?
- android - FAILURE:构建失败并出现异常。应为 BEGIN_ARRAY,但为 BEGIN_OBJECT
- python - 如何将最相似的rand生成字符串作为主字符串,通过世代变异随机字符串以实现原始字符串?
- npm - 错误 403 - 尝试 npm install 时被禁止
- python - 在 Python 3.4 (Windows 10) 上升级后 pip 不起作用。如何降级?
- python - forward() 接受 1 个位置参数,但给出了 2 个