apache - NiFi 定制处理器在一段时间后不会发布数据到下一个处理器
问题描述
我已经编写了redis数据丰富器,它将基于macid从redis获取规则和超时密钥,但它正在工作一段时间,并且在一段时间后它不会将流文件发送到下一个处理器(它将处于运行状态但不会将流文件发送到下一个处理器)。Nifi 正在集群模式下工作,下面的处理器(RedisDataEnricher)是否有任何问题。
在下面的代码中,我只使用了一次 redis 连接,之后我使用相同的连接从 redis 中获取数据。
public class RedisDataEnricher extends AbstractProcessor {
private volatile Jedis jedis;
public static final PropertyDescriptor ConnectionHost = new PropertyDescriptor
.Builder().name("ConnectionHost")
.displayName("ConnectionHost")
.description("ConnectionHost")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor ConnectionPort = new PropertyDescriptor
.Builder().name("ConnectionPort")
.displayName("ConnectionPort")
.description("ConnectionPort")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor JSONKEY = new PropertyDescriptor
.Builder().name("JSONKEY")
.displayName("JSONKEY")
.description("JSON key to be fetched from input")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor MACIDKEY = new PropertyDescriptor
.Builder().name("MACIDKEY")
.displayName("MACIDKEY")
.description("MACIDKEY to be fetched from input")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final Relationship SUCCESS = new Relationship.Builder()
.name("SUCCESS")
.description("SUCCESS")
.build();
public static final Relationship FAILURE = new Relationship.Builder()
.name("FAILURE")
.description("FAILURE")
.build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
descriptors.add(JSONKEY);
descriptors.add(MACIDKEY);
descriptors.add(ConnectionHost);
descriptors.add(ConnectionPort);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<Relationship>();
relationships.add(SUCCESS);
relationships.add(FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
try {
jedis = new Jedis(context.getProperty("ConnectionHost").toString(),Integer.parseInt(context.getProperty("ConnectionPort").toString()));
} catch (Exception e) {
getLogger().error("Unable to establish Redis connection.");
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if ( flowFile == null ) {
return;
}
else{
try{
InputStream inputStream = session.read(flowFile);
StringWriter writer = new StringWriter();
IOUtils.copy(inputStream, writer, "UTF-8");
Jedis jedis1=jedis;
JSONObject json=new JSONObject(writer.toString());
inputStream.close();
JSONObject json1=new JSONObject();
String rules=jedis1.hget(json.getJSONObject(context.getProperty("JSONKEY").toString()).getString(context.getProperty("MACIDKEY").toString()), "rules");
json1.put("data", json.getJSONObject(context.getProperty("JSONKEY").toString()));
json1.put("timeOut", jedis1.hget(json.getJSONObject(context.getProperty("JSONKEY").toString()).getString(context.getProperty("MACIDKEY").toString()),"timeOut"));
json1.put("rules", rules!=null?new ArrayList<String>(Arrays.asList(rules.split(" , "))):new ArrayList<>());
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write(json1.toString().getBytes());
}
});
flowFile = session.putAttribute(flowFile, "OutBound", jedis1.hget(json.getJSONObject(context.getProperty("JSONKEY").toString()).getString(context.getProperty("MACIDKEY").toString()),"OutBound"));
session.transfer(flowFile, SUCCESS);
}
catch(Exception e)
{
session.transfer(flowFile, FAILURE);
}
}
}
}
解决方案
推荐阅读
- c++ - 使用 netlink 获取进程 inode
- java - 单击结帐按钮时总计的问题
- reactjs - 我可以在 REST API 和 RectJS 中使用会话对象和会话 ID 进行身份验证吗?
- sql - Postgres对索引列的简单选择太慢了
- laravel-valet - 为什么 Laravel Valet 不在 Catalina 上使用自制软件?
- sql - 如何获得每年 SQLite 的 x 个最佳结果
- scala - Scala中的方法调用反射会导致性能下降吗?
- javascript - 无法使用 Angular 发布到 CosmosDB
- ruby-on-rails - Rails HABTM:通过
- c++ - 执行转换错误的过程