apache-nifi - Nifi处理器被触发两次单个输入流文件
问题描述
我目前是 Apache Nifi 的新手,并且仍在探索它。我制作了一个自定义处理器,我将在其中通过分页从服务器获取数据。我传递了包含属性“url”的输入文件。最后在输出流文件中传输响应,因为我通过分页获取数据,所以我为每个页面制作了一个新的输出流文件并将其传输到成功关系。下面是代码部分:
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile incomingFlowFile = session.get();
String api = null;
if (incomingFlowFile == null) {
logger.info("empty input flow file");
session.commit();
return;
} else {
api=incomingFlowFile.getAttribute("url");
}
session.remove(incomingFlowFile);
if(api == null) {
logger.warn("API url is null");
session.commit();
return;
}
int page = Integer.parseInt(context.getProperty(PAGE).getValue());
while(page < 3) {
try {
String url = api + "&curpg=" + page;
logger.info("input url is: {}", url);
HttpResponse response = httpGetApiCall(url, 10000);
if(response == null || response.getEntity() == null) {
logger.warn("response null");
session.commit();
return;
}
String resp = EntityUtils.toString(response.getEntity());
InputStream is = new ByteArrayInputStream(StandardCharsets.UTF_16.encode(resp).array());
FlowFile outFlowFile = session.create();
outFlowFile = session.importFrom(is, outFlowFile);
session.transfer(outFlowFile, SUCCESSFUL);
} catch (IOException e) {
logger.warn("IOException :{}", e.getMessage());
return;
}
++page;
}
session.commit();
}
我面临的问题是,对于单个输入流文件,该处理器被触发两次,因此它为单个输入流文件生成 4 个流文件。
我无法弄清楚我做错了什么。请帮助解决这个问题。提前致谢。
==================================================== ==================== 处理器组 1(Nifi_Parvin)
解决方案
推荐阅读
- python - 如何合并两个数据框以匹配匹配的第一行或下一行
- hapijs - 如何在 Hapi ^18 上设置 statusCode 和特定消息
- html - 无法左对齐侧边栏链接,使其居中
- arrays - Julia - 如何将数组中的子字符串/字符串放入自己的单独数组中
- angular - 不允许在 registerForm angular8 中使用“@”
- jupyter-notebook - 如何使用 JupyterLab 的部分名称找到特定的 Jupyter Notebook?
- javascript - 创建 API 以使用 id 更新记录
- python - Django 的非社交提供者
- android - 滚动浏览Tabview时Android自动选择Tab
- android - 对 Android 设备上的退化三角带的普遍支持?