首页 > 解决方案 > 检查我是否使用所有键正确接收流

问题描述

我有以下场景:假设有 20 个传感器正在向我发送流式馈送。我对流应用 keyBy (sensorID) 并执行一些操作,例如平均等。这是实现的,并且运行良好(使用 Flink Java API)。

最初一切顺利,所有传感器都在向我发送信息。一段时间后,可能有几个传感器开始出现异常,我开始从它们那里得到不规则的信息,例如,我从 18 个传感器接收到信息,但有 2 个长时间不向我发送信息。

我们可以假设我已经知道 sensorId 的固定列表(可能是硬编码/或在数据库中)。如何确定哪两个没有发送提要?我在哪里可以获得 keyId 的列表以与数据库中的列表进行比较?

如果我没有收到提要,我想发出警报(例如,2 分钟、5 分钟、10 分钟等,优先级越来越高)。

有没有人使用 flink-streaming / 模式实现了这样的场景?请有任何建议。

标签: apache-flinkflink-streaming

解决方案


我只是碰巧有一个这种模式的例子。它需要一些调整以适合您的用例,但应该可以帮助您入门。

public class TimeoutFunction extends KeyedProcessFunction<String, Event, String> {

    private ValueState<Long> lastModifiedState;
    static final int TIMEOUT = 2 * 60 * 1000; // 2 minutes

    @Override
    public void open(Configuration parameters) throws Exception {

        // register our state with the state backend
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", Long.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<String> out) throws Exception {

        // update our state and timer
        Long current = lastModifiedState.value();
        if (current != null) {
            ctx.timerService().deleteEventTimeTimer(current + TIMEOUT);
        }
        current = max(current, event.timestamp());
        lastModifiedState.update(current);
        ctx.timerService().registerEventTimeTimer(current + TIMEOUT);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {

        // emit alert
        String deviceId = ctx.getCurrentKey();
        out.collect(deviceId);
    }
}

这假设一个主程序执行如下操作:

DataStream<String> result = stream
    .assignTimestampsAndWatermarks(new MyBoundedOutOfOrdernessAssigner(...))
    .keyBy(e -> e.deviceId)
    .process(new TimeoutFunction());

正如@Dominik 所说,这只会对至少出现一次的键发出警报。您可以通过引入事件的辅助源来解决此问题,该事件源为应该存在的每个源创建一个人工事件,并将该流与主要源合并。


推荐阅读