首页 > 解决方案 > 超时后如何在自定义java过滤器插件中生成logstash事件?

问题描述

我想在过滤器中关联发送到logstash的消息,并将它们发送到管道内的下一个logstash过滤器。

但是,我已经根据弹性文档成功设置了自定义 logstash 过滤器插件。

我的过滤器插件代码如下所示:

package org.logstashplugins;

import co.elastic.logstash.api.Configuration;
import co.elastic.logstash.api.Context;
import co.elastic.logstash.api.Event;
import co.elastic.logstash.api.Filter;
import co.elastic.logstash.api.FilterMatchListener;
import co.elastic.logstash.api.LogstashPlugin;
import co.elastic.logstash.api.PluginConfigSpec;
import org.apache.commons.lang3.StringUtils;

import java.util.Collection;
import java.util.Collections;
import java.util.*;

// class name must match plugin name
@LogstashPlugin(name = "java_filter_example")
public class JavaFilterExample implements Filter {

    public static final PluginConfigSpec<String> SOURCE_CONFIG =
            PluginConfigSpec.stringSetting("source", "message");

    private String id;
    private String sourceField;
    private List<String> buffer;
    

    public JavaFilterExample(String id, Configuration config, Context context)
    {
        // constructors should validate configuration options
        this.id = id;
        this.sourceField = config.get(SOURCE_CONFIG);
        this.buffer = new ArrayList<String>();
    }

    @Override
    public Collection<Event> filter(Collection<Event> events, FilterMatchListener matchListener)
    {
        for (Event e : events)
        {
            Object f = e.getField(sourceField);
            if (f instanceof String)
            {
                buffer.add((String) f);
                matchListener.filterMatched(e);
            }
        }
        return events;
    }

    @Override
    public Collection<PluginConfigSpec<?>> configSchema()
    {
        // should return a list of all configuration options for this plugin
        return Collections.singletonList(SOURCE_CONFIG);
    }

    @Override
    public String getId() 
    {
        return this.id;
    }
}

现在我想添加一个正在后台执行的线程,该线程在包含消息的缓冲区上运行,并在发生超时或包含“完成”消息的新事件到达后发送相关事件。

我现在的问题是,如何触发新事件并将其发送到 logstash 管道中的下一个过滤器?java API 或类是什么?

我知道有一个聚合过滤器插件可以部分满足我的要求,但是我必须为消息实现不同的解析器,所以最好自己实现一个自定义插件。但是根据文档,如何在不返回过滤器方法中修改的事件集合的情况下发送新事件并不清楚。你能帮我实现这个目标吗?

我需要这样的东西(在伪代码中):

public void anotherFunctionInsideAnotherJavaClass(...)
{
    Logstash.sendEvent(new Event(...));
}

我需要在过滤器方法之外的其他上下文中生成和发送事件。这可能吗?

希望很清楚我的问题是什么。

谢谢。

标签: javaelasticsearcheventslogstashpipeline

解决方案


推荐阅读