Count of records in a Stream within a 5-second time window

Problem description

I'm trying to get a count of the records in a Stream over a 5-second time window. The count is always 1 ?? I sent 10 records and expected the final count to be 10.

I tried to follow Fabian Hueske's advice in "How to count the number of records processed by Apache Flink in a given time window":

DataStream<Map<String, Object>> kinesisStream;
...//get data from Kinesis source into kinesisStream - works fine
final SingleOutputStreamOperator<Map<String, Object>> filterDroppedEvents = kinesisStream
    .filter(resultMap -> {
        long timestamp = Utils.getEventTimestampFromMap(resultMap);
        long currTimestamp = System.currentTimeMillis();
        long driftFromCurrTS = currTimestamp - timestamp;
        if (driftFromCurrTS < 0) {
            Object eventNameObj = resultMap.get(EVENT_NAME);
            String eventName = eventNameObj != null ? (String) eventNameObj : "";
            logger.debug("PMS - event_timestamp is > current timestamp by driftFromCurrTS:{} for event_name:{} and event_timestamp:{}", driftFromCurrTS, eventName, timestamp);
            return true;
        } else {
            return false;
        }
    });//called 10 times here - GOOD

final SingleOutputStreamOperator<CountRows> droppedEventsMapToCountRows = filterDroppedEvents
        .map(mapValue -> new CountRows(mapValue, 1L, mapValue.get(EVENT_NAME) != null ? (String) mapValue.get(EVENT_NAME) : ""));//this is called 10 times - GOOD

final KeyedStream<CountRows, String> countRowsKeyedStream = droppedEventsMapToCountRows.keyBy(new KeySelector<CountRows, String>() {
    @Override
    public String getKey(CountRows countRows) throws Exception {
        logger.info("Inside getKey");
        return countRows.getEventName();
    }
});//doesn't get in here to this logger statement ??

final AllWindowedStream<CountRows, TimeWindow> countRowsTimeWindowAllWindowedStream =  countRowsKeyedStream
        .timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.seconds(5));
//.sum("count")
final SingleOutputStreamOperator<CountRows> countRowsReduceStream = countRowsTimeWindowAllWindowedStream.reduce((accum, input) -> {
            logger.info("Inside reduce");
            return new CountRows(input.getRow(), accum.getCount() + input.getCount(), input.getEventName());// sum 1s to count
        });//don't see this logger statement "Inside reduce"

DataStream<InfluxDBPoint> droppedEventsStream =
        countRowsReduceStream.flatMap(new FlatMapFunction<CountRows, InfluxDBPoint>() {
    @Override
    public void flatMap(CountRows countRows, Collector<InfluxDBPoint> out) throws Exception {
        logger.info("Inside final map"); // only called once and countRows.getCount() is 1 - BAD - want it to be 10 ??
        Map<String, Object> mapValue = countRows.getRow();
        //long currTimestamp = System.currentTimeMillis();
        Object eventTSObj = mapValue.get(EVENT_TIMESTAMP);
        String eventTimestamp = eventTSObj != null ? (String)eventTSObj : "";
        long eventTS = Utils.getLongFromDateStr(eventTimestamp);
        Map<String, String> tags = new HashMap<>();
        Object eventNameObj = mapValue.get(Utils.EVENT_NAME);
        String eventName = eventNameObj != null ? (String)eventNameObj : "";
        tags.put(Utils.EVENT_NAME, eventName);
        Map<String, Object> fields = new HashMap<>();
        fields.put("count", countRows.getCount());
        out.collect(new InfluxDBPoint("dropped_events_count", eventTS, tags, fields));//TODO: measurement name
    }
});
        /* Tried map but doesn't work
reduceStream.map(countRows -> {
            logger.info("Inside final map");
            Map<String, Object> mapValue = countRows.getRow();
            //long currTimestamp = System.currentTimeMillis();
            Object eventTSObj = mapValue.get(EVENT_TIMESTAMP);
            String eventTimestamp = eventTSObj != null ? (String)eventTSObj : "";
            long eventTS = Utils.getLongFromDateStr(eventTimestamp);
            Map<String, String> tags = new HashMap<>();
            Object eventNameObj = mapValue.get(Utils.EVENT_NAME);
            String eventName = eventNameObj != null ? (String)eventNameObj : "";
            tags.put(Utils.EVENT_NAME, eventName);
            Map<String, Object> fields = new HashMap<>();
            fields.put("count", countRows.getCount());
            return new InfluxDBPoint("dropped_events_count", eventTS, tags, fields);//TODO: measurement name
        });*/
droppedEventsStream.addSink(influxSink);

@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@

CountRows is a POJO wrapper around Map<String, Object> that adds the count:

public static class CountRows implements Serializable, Comparable<CountRows> {
    Map<String, Object> row;
    Long count;
    String eventName;
    //default constructor and constructor with 3 attributes

…………

TIA,

Tags: functional-programming, apache-flink

Solution


One thing that stands out is that you are calling timeWindowAll on a KeyedStream. That's not how this part of the API is meant to be used. If you want a single global count across all keys, remove the keyBy; if you want a separate count of events per key, keep the keyBy and use timeWindow instead of timeWindowAll.
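As a minimal sketch of both options, reusing your droppedEventsMapToCountRows stream and CountRows class (this assumes a Flink release that still has the timeWindow/timeWindowAll shorthands; the variable names globalCounts and perKeyCounts are just illustrative):

// Option A: one global count across all keys - drop the keyBy entirely.
final SingleOutputStreamOperator<CountRows> globalCounts = droppedEventsMapToCountRows
        .timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.seconds(5)) // non-parallel window over every event
        .reduce((accum, input) -> new CountRows(
                input.getRow(),
                accum.getCount() + input.getCount(), // sums the per-record 1s
                input.getEventName()));

// Option B: one count per event name - keep the keyBy, but window with timeWindow.
final SingleOutputStreamOperator<CountRows> perKeyCounts = droppedEventsMapToCountRows
        .keyBy(CountRows::getEventName)
        .timeWindow(org.apache.flink.streaming.api.windowing.time.Time.seconds(5)) // keyed window, evaluated separately for each key
        .reduce((accum, input) -> new CountRows(
                input.getRow(),
                accum.getCount() + input.getCount(),
                input.getEventName()));

With option A you would see one result of 10 if all ten records fall into the same 5-second window; with option B you would see one result per event_name.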

I also see that you have event-time timestamps, but it doesn't look like you are using event-time windows (since I don't see a timestamp assigner or watermark generator). I don't know whether that is intentional, or whether it might be related to why the results don't match your expectations.
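If event-time windows are what you actually want, the wiring could look roughly like the sketch below. This assumes an older Flink release where setStreamTimeCharacteristic and BoundedOutOfOrdernessTimestampExtractor (from org.apache.flink.streaming.api and org.apache.flink.streaming.api.functions.timestamps) are still available, that env is your StreamExecutionEnvironment, and it reuses your own Utils.getEventTimestampFromMap helper; the 10-second out-of-orderness bound is just a placeholder.

// Switch the job to event time so windows fire on watermarks, not wall-clock time.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Assign each record's own timestamp and emit watermarks that tolerate some lateness.
DataStream<Map<String, Object>> withTimestamps = kinesisStream
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Map<String, Object>>(
                        org.apache.flink.streaming.api.windowing.time.Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(Map<String, Object> element) {
                        return Utils.getEventTimestampFromMap(element); // the question's own helper
                    }
                });

// Then build the filter / keyBy / timeWindow pipeline on withTimestamps instead of kinesisStream.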

