首页 > 解决方案 > NiFi 定制处理器在一段时间后不会发布数据到下一个处理器

问题描述

我编写了一个 Redis 数据丰富处理器(RedisDataEnricher),它根据 macid 从 Redis 中获取规则(rules)和超时(timeOut)等键值。它能正常工作一段时间,但之后就不再把流文件发送到下一个处理器(处理器仍显示为运行状态,却不再输出流文件)。NiFi 以集群模式运行。下面的处理器(RedisDataEnricher)是否存在问题?

在下面的代码中,我只使用了一次 redis 连接,之后我使用相同的连接从 redis 中获取数据。

public class RedisDataEnricher extends AbstractProcessor {

    private volatile Jedis jedis;


    public static final PropertyDescriptor ConnectionHost = new PropertyDescriptor
            .Builder().name("ConnectionHost")
            .displayName("ConnectionHost")
            .description("ConnectionHost")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();
    public static final PropertyDescriptor ConnectionPort = new PropertyDescriptor
            .Builder().name("ConnectionPort")
            .displayName("ConnectionPort")
            .description("ConnectionPort")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final PropertyDescriptor JSONKEY = new PropertyDescriptor
            .Builder().name("JSONKEY")
            .displayName("JSONKEY")
            .description("JSON key to be fetched from input")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();
    public static final PropertyDescriptor MACIDKEY = new PropertyDescriptor
            .Builder().name("MACIDKEY")
            .displayName("MACIDKEY")
            .description("MACIDKEY to be fetched from input")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final Relationship SUCCESS = new Relationship.Builder()
            .name("SUCCESS")
            .description("SUCCESS")
            .build();
    public static final Relationship FAILURE = new Relationship.Builder()
            .name("FAILURE")
            .description("FAILURE")
            .build();

    private List<PropertyDescriptor> descriptors;

    private Set<Relationship> relationships;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(JSONKEY);
        descriptors.add(MACIDKEY);
        descriptors.add(ConnectionHost);
        descriptors.add(ConnectionPort);
        this.descriptors = Collections.unmodifiableList(descriptors);

        final Set<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(SUCCESS);
        relationships.add(FAILURE);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    @OnScheduled
    public void onScheduled(final ProcessContext context) {
        try {
            jedis = new Jedis(context.getProperty("ConnectionHost").toString(),Integer.parseInt(context.getProperty("ConnectionPort").toString()));
        } catch (Exception e) {
            getLogger().error("Unable to establish Redis connection.");
        }

    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if ( flowFile == null ) {
            return;
        }

        else{
            try{
                InputStream inputStream = session.read(flowFile);
                StringWriter writer = new StringWriter();
                IOUtils.copy(inputStream, writer, "UTF-8");
                Jedis jedis1=jedis;
                JSONObject json=new JSONObject(writer.toString());
                inputStream.close();
                JSONObject json1=new JSONObject();
                String rules=jedis1.hget(json.getJSONObject(context.getProperty("JSONKEY").toString()).getString(context.getProperty("MACIDKEY").toString()), "rules");
                json1.put("data", json.getJSONObject(context.getProperty("JSONKEY").toString()));
                json1.put("timeOut", jedis1.hget(json.getJSONObject(context.getProperty("JSONKEY").toString()).getString(context.getProperty("MACIDKEY").toString()),"timeOut"));
                json1.put("rules", rules!=null?new ArrayList<String>(Arrays.asList(rules.split(" , "))):new ArrayList<>());
                flowFile = session.write(flowFile, new OutputStreamCallback() {
                    @Override
                    public void process(OutputStream out) throws IOException {
                        out.write(json1.toString().getBytes());
                    }


            });
                flowFile = session.putAttribute(flowFile, "OutBound", jedis1.hget(json.getJSONObject(context.getProperty("JSONKEY").toString()).getString(context.getProperty("MACIDKEY").toString()),"OutBound"));
                session.transfer(flowFile, SUCCESS);
            }
            catch(Exception e)
            {
                session.transfer(flowFile, FAILURE);
            }
        }
    }
}

标签: apache, apache-nifi

解决方案


推荐阅读