首页 > 解决方案 > 如何在 Fink 中实例化 MapStateDescriptor 以计算多个平均流查询?

问题描述

我正在尝试计算 3 个不同房间的温度平均值,每个房间都有 3 个温度传感器。我正在使用 Flink(在 Java 中)。首先,我通过房间(A、B 或 C)的键来拆分传感器,然后我创建一个RichFlatMapFunction用于MapState保存温度的键,而我在 3 次测量之前没有。三次测量后,我计算平均值。为了使用MapState我需要一个MapStateDescriptor我不知道如何正确实例化的。有人可以帮我弄这个吗?谢谢。

public class SensorsMultipleReadingMqttEdgentQEP2 {

    private boolean checkpointEnable = false;
    private long checkpointInterval = 10000;
    private CheckpointingMode checkpointMode = CheckpointingMode.EXACTLY_ONCE;

    public SensorsMultipleReadingMqttEdgentQEP2() throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        if (checkpointEnable) {
            env.enableCheckpointing(checkpointInterval, checkpointMode);
        }

        DataStream<MqttTemperature> temperatureStream01 = env.addSource(new TemperatureMqttConsumer("topic-edgent-01"));
        DataStream<MqttTemperature> temperatureStream02 = env.addSource(new TemperatureMqttConsumer("topic-edgent-02"));
        DataStream<MqttTemperature> temperatureStream03 = env.addSource(new TemperatureMqttConsumer("topic-edgent-03"));

        DataStream<Tuple2<String, Double>> averageStream01 = temperatureStream01.map(new SensorMatcher()).keyBy(0)
                .flatMap(new AverageTempMapper());
        DataStream<Tuple2<String, Double>> averageStream02 = temperatureStream02.map(new SensorMatcher()).keyBy(0)
                .flatMap(new AverageTempMapper());
        DataStream<Tuple2<String, Double>> averageStream03 = temperatureStream03.map(new SensorMatcher()).keyBy(0)
                .flatMap(new AverageTempMapper());

        DataStream<Tuple2<String, Double>> averageStreams = averageStream01.union(averageStream02)
                .union(averageStream03);

        averageStreams.print();
        env.execute("SensorsMultipleReadingMqttEdgentQEP");
    }

    public static class SensorMatcher implements MapFunction<MqttTemperature, Tuple2<String, MqttTemperature>> {

        private static final long serialVersionUID = 7035756567190539683L;

        @Override
        public Tuple2<String, MqttTemperature> map(MqttTemperature value) throws Exception {
            String key = "no-room";
            if (value.getId().equals(1) || value.getId().equals(2) || value.getId().equals(3)) {
                key = "room-A";
            } else if (value.getId().equals(4) || value.getId().equals(5) || value.getId().equals(6)) {
                key = "room-B";
            } else if (value.getId().equals(7) || value.getId().equals(8) || value.getId().equals(9)) {
                key = "room-C";
            } else {
                System.err.println("Sensor not defined in any room.");
            }
            return new Tuple2<>(key, value);
        }
    }

    public static class AverageTempMapper
            extends RichFlatMapFunction<Tuple2<String, MqttTemperature>, Tuple2<String, Double>> {

        private static final long serialVersionUID = -4780146677198295204L;
        private MapState<String, Tuple2<Integer, Double>> modelState;

        @Override
        public void open(Configuration parameters) throws Exception {
            TypeInformation<Tuple2<String, Tuple2<Integer, Double>>> typeInformation = TypeInformation
                    .of(new TypeHint<Tuple2<String, Tuple2<Integer, Double>>>() {
                    });

            // HOW TO INSTANTIATE THIS descriptor?
            MapStateDescriptor<String, Tuple2<Integer, Double>> descriptor = new MapStateDescriptor<>("modelState",
                    String.class, Tuple2.class);
            modelState = getRuntimeContext().getMapState(descriptor);
        }

        @Override
        public void flatMap(Tuple2<String, MqttTemperature> value, Collector<Tuple2<String, Double>> out)
                throws Exception {
            Double temp = null;
            Integer count = 0;
            if (modelState.contains(value.f0)) {
                count = modelState.get(value.f0).f0 + 1;
                temp = (modelState.get(value.f0).f1 + value.f1.getTemp());
            } else {
                count = 1;
                temp = value.f1.getTemp();
            }
            modelState.put(value.f0, Tuple2.of(count, temp));

            if (count >= 3) {
                out.collect(Tuple2.of("room", null));
            }
        }
    }
}

标签: javaapache-flinkstateful

解决方案


为了定义MapStateDescriptor您可以执行以下操作:

MapStateDescriptor<String, Tuple2<Integer, Double>> modelState = new MapStateDescriptor<>(
    "modelState", 
    BasicTypeInfo.STRING_TYPE_INFO, 
    TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Double.class));
this.modelState = getRuntimeContext().getMapState(modelState);

但是,实际上没有必要MapState在您的情况下使用 a 。由于流已经被键控,因此使用ValueState. 代码如下所示:

public static class AverageTempMapper extends RichFlatMapFunction<Tuple2<String, MqttTemperature>, Tuple2<String, Double>> {

    private static final long serialVersionUID = -4780146677198295204L;
    private ValueState<Tuple2<Integer, Double>> modelState;

    @Override
    public void open(Configuration parameters) {
        this.modelState = getRuntimeContext().getState(new ValueStateDescriptor<>("modelState", TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Double.class)));
    }

    @Override
    public void flatMap(Tuple2<String, MqttTemperature> value, Collector<Tuple2<String, Double>> out) throws Exception {
        Double temp;
        Integer count;
        if (modelState.value() != null) {
            Tuple2<Integer, Double> state = modelState.value();
            count = state.f0 + 1;
            temp = state.f1 + value.f1.getTemp();
        } else {
            count = 1;
            temp = value.f1.getTemp();
        }
        modelState.update(Tuple2.of(count, temp));

        if (count >= 3) {
            out.collect(Tuple2.of(value.f0, temp/count));
        }
    }
}

推荐阅读