How do the Flink and Beam SDKs handle windowing, and which is more efficient?

Problem description

I am comparing the Apache Beam SDK with the Flink SDK for stream processing, in order to establish the cost/benefit of using Beam as an additional framework.

I have a very simple setup in which a stream of data is read from a Kafka source and processed in parallel by a cluster of nodes running Flink.

From my understanding of how these SDKs work, the simplest way to process a stream of data window by window is:

  1. Using Apache Beam (running on Flink):

    1.1. Create a Pipeline object.

    1.2. Create a PCollection of Kafka records.

    1.3. Apply the windowing function.

    1.4. Transform the pipeline to key by window.

    1.5. Group the records by key (i.e. by window).

    1.6. Apply whatever function is required to the windowed records.

  2. Using the Flink SDK:

    2.1. Create a data stream from the Kafka source.

    2.2. Transform it into a keyed stream by providing a key function.

    2.3. Apply the windowing function.

    2.4. Apply whatever function is required to the windowed records.

While the Flink solution appears syntactically cleaner, in my experience it is less efficient at high volumes of data. I can only imagine that the key-extraction function introduces the overhead, since Beam does not require that step.

My question is: am I comparing like for like? Are these processes not equivalent? What could explain the Beam approach being more efficient, given that it uses Flink as a runner (and all other conditions are the same)?

Here is the code using the Beam SDK:

    PipelineOptions options = PipelineOptionsFactory.create();

    //Run with Flink
    FlinkPipelineOptions flinkPipelineOptions = options.as(FlinkPipelineOptions.class);
    flinkPipelineOptions.setRunner(FlinkRunner.class);
    flinkPipelineOptions.setStreaming(true);
    flinkPipelineOptions.setParallelism(-1); //Pick this up from the user interface at runtime

    // Create the Pipeline object with the options we defined above.
    Pipeline p = Pipeline.create(flinkPipelineOptions);

    // Create a PCollection of Kafka records
    PCollection<KafkaRecord<byte[], byte[]>> kafkaCollection = p.apply(KafkaIO.<Long, String>readBytes()
            .withBootstrapServers(KAFKA_IP + ":" + KAFKA_PORT)
            .withTopics(ImmutableList.of(REAL_ENERGY_TOPIC, IT_ENERGY_TOPIC))
            .updateConsumerProperties(ImmutableMap.of("group.id", CONSUMER_GROUP)));

    //Apply Windowing Function    
    PCollection<KafkaRecord<byte[], byte[]>> windowedKafkaCollection = kafkaCollection.apply(
            Window.into(SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1))));

    //Transform the pipeline to key by window
    PCollection<KV<IntervalWindow, KafkaRecord<byte[], byte[]>>> keyedByWindow =
            windowedKafkaCollection.apply(
                    ParDo.of(
                            new DoFn<KafkaRecord<byte[], byte[]>, KV<IntervalWindow, KafkaRecord<byte[], byte[]>>>() {
                                @ProcessElement
                                public void processElement(ProcessContext context, IntervalWindow window) {
                                    context.output(KV.of(window, context.element()));
                                }
                            }));
    //Group records by key (window)
    PCollection<KV<IntervalWindow, Iterable<KafkaRecord<byte[], byte[]>>>> groupedByWindow = keyedByWindow
            .apply(GroupByKey.<IntervalWindow, KafkaRecord<byte[], byte[]>>create());

    //Process windowed data
    PCollection<KV<IIntervalWindowResult, IPueResult>> processed = groupedByWindow
            .apply("filterAndProcess", ParDo.of(new PueCalculatorFn()));

    // Run the pipeline.
    p.run().waitUntilFinish();

Here is the code using the Flink SDK:

// Create a Streaming Execution Environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();    
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setParallelism(6);

//Connect to Kafka
Properties properties = new Properties();   
properties.setProperty("bootstrap.servers", KAFKA_IP + ":" + KAFKA_PORT);
properties.setProperty("group.id", CONSUMER_GROUP);

DataStream<ObjectNode> stream = env
            .addSource(new FlinkKafkaConsumer010<>(Arrays.asList(REAL_ENERGY_TOPIC, IT_ENERGY_TOPIC), new JSONDeserializationSchema(), properties));

//Key by id
stream.keyBy((KeySelector<ObjectNode, Integer>) jsonNode -> jsonNode.get("id").asInt())

        //Set the windowing function.
        .timeWindow(Time.seconds(5L), Time.seconds(1L))

        //Process Windowed Data
        .process(new PueCalculatorFn(), TypeInformation.of(ImmutablePair.class));

// execute program
env.execute("Using Flink SDK");

Many thanks in advance for any insight.

Edit

I thought I should add some metrics that may be relevant.

Network Bytes Received: Flink SDK vs. Beam (charts not included)

CPU Utilisation (max): Flink SDK vs. Beam (charts not included)
Beam seems to use considerably more networking, whereas Flink uses significantly more CPU. Could this suggest that Beam is parallelising the processing in a more efficient way?

Edit No. 2

I am pretty sure the PueCalculatorFn classes are equivalent, but I will share the code here to see whether there is any obvious discrepancy between the two processes.

Beam

public class PueCalculatorFn extends DoFn<KV<IntervalWindow, Iterable<KafkaRecord<byte[], byte[]>>>, KV<IIntervalWindowResult, IPueResult>> implements Serializable {
private transient List<IKafkaConsumption> realEnergyRecords;
private transient List<IKafkaConsumption> itEnergyRecords;

@ProcessElement
public void processElement(DoFn<KV<IntervalWindow, Iterable<KafkaRecord<byte[], byte[]>>>, KV<IIntervalWindowResult, IPueResult>>.ProcessContext c, BoundedWindow w) {
    KV<IntervalWindow, Iterable<KafkaRecord<byte[], byte[]>>> element = c.element();
    Instant windowStart = Instant.ofEpochMilli(element.getKey().start().getMillis());
    Instant windowEnd = Instant.ofEpochMilli(element.getKey().end().getMillis());
    Iterable<KafkaRecord<byte[], byte[]>> records = element.getValue();

    //Calculate Pue
    IPueResult result = calculatePue(element.getKey(), records);

    //Create IntervalWindowResult object to return
    DateTimeFormatter formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.of("UTC"));
    IIntervalWindowResult intervalWindowResult = new IntervalWindowResult(formatter.format(windowStart),
            formatter.format(windowEnd), realEnergyRecords, itEnergyRecords);

    //Return Pue keyed by Window
    c.output(KV.of(intervalWindowResult, result));
}

private PueResult calculatePue(IntervalWindow window, Iterable<KafkaRecord<byte[], byte[]>> records) {
    //Define accumulators to gather readings
    final DoubleAccumulator totalRealIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0);
    final DoubleAccumulator totalItIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0);

    //Declare variable to store the result
    BigDecimal pue = BigDecimal.ZERO;

    //Initialise transient lists
    realEnergyRecords = new ArrayList<>();
    itEnergyRecords = new ArrayList<>();

    //Transform the results into a stream
    Stream<KafkaRecord<byte[], byte[]>> streamOfRecords = StreamSupport.stream(records.spliterator(), false);

    //Iterate through each reading and add to the increment count
    streamOfRecords
            .map(record -> {
                byte[] valueBytes = record.getKV().getValue();
                assert valueBytes != null;
                String valueString = new String(valueBytes);
                assert !valueString.isEmpty();
                return KV.of(record, valueString);
            }).map(kv -> {
        Gson gson = new GsonBuilder().registerTypeAdapter(KafkaConsumption.class, new KafkaConsumptionDeserialiser()).create();
        KafkaConsumption consumption = gson.fromJson(kv.getValue(), KafkaConsumption.class);
        return KV.of(kv.getKey(), consumption);

    }).forEach(consumptionRecord -> {
                switch (consumptionRecord.getKey().getTopic()) {
                    case REAL_ENERGY_TOPIC:
                        totalRealIncrement.accumulate(consumptionRecord.getValue().getEnergyConsumed());
                        realEnergyRecords.add(consumptionRecord.getValue());
                        break;
                    case IT_ENERGY_TOPIC:
                        totalItIncrement.accumulate(consumptionRecord.getValue().getEnergyConsumed());
                        itEnergyRecords.add(consumptionRecord.getValue());
                        break;
                }
            }
    );

    assert totalRealIncrement.doubleValue() > 0.0;
    assert totalItIncrement.doubleValue() > 0.0;

    //Beware of division by zero
    if (totalItIncrement.doubleValue() != 0.0) {
        //Calculate PUE
        pue = BigDecimal.valueOf(totalRealIncrement.getThenReset()).divide(BigDecimal.valueOf(totalItIncrement.getThenReset()), 9, BigDecimal.ROUND_HALF_UP);
    }

    //Create a PueResult object to return
    IWindow intervalWindow = new Window(window.start().getMillis(), window.end().getMillis());
    return new PueResult(intervalWindow, pue.stripTrailingZeros());
}

@Override
protected void finalize() throws Throwable {
    super.finalize();
    RecordSenderFactory.closeSender();
    WindowSenderFactory.closeSender();
}
} 

Flink

public class PueCalculatorFn extends ProcessWindowFunction<ObjectNode, ImmutablePair, Integer, TimeWindow> {
private transient List<KafkaConsumption> realEnergyRecords;
private transient List<KafkaConsumption> itEnergyRecords;

@Override
public void process(Integer integer, Context context, Iterable<ObjectNode> iterable, Collector<ImmutablePair> collector) throws Exception {
    Instant windowStart = Instant.ofEpochMilli(context.window().getStart());
    Instant windowEnd = Instant.ofEpochMilli(context.window().getEnd());
    BigDecimal pue = calculatePue(iterable);

    //Create IntervalWindowResult object to return
    DateTimeFormatter formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.of("UTC"));
    IIntervalWindowResult intervalWindowResult = new IntervalWindowResult(formatter.format(windowStart),
            formatter.format(windowEnd), realEnergyRecords
            .stream()
            .map(e -> (IKafkaConsumption) e)
            .collect(Collectors.toList()), itEnergyRecords
            .stream()
            .map(e -> (IKafkaConsumption) e)
            .collect(Collectors.toList()));


    //Create PueResult object to return
    IPueResult pueResult = new PueResult(new Window(windowStart.toEpochMilli(), windowEnd.toEpochMilli()), pue.stripTrailingZeros());

    //Collect result
    collector.collect(new ImmutablePair<>(intervalWindowResult, pueResult));

}

protected BigDecimal calculatePue(Iterable<ObjectNode> iterable) {
    //Define accumulators to gather readings
    final DoubleAccumulator totalRealIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0);
    final DoubleAccumulator totalItIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0);

    //Declare variable to store the result
    BigDecimal pue = BigDecimal.ZERO;

    //Initialise transient lists
    realEnergyRecords = new ArrayList<>();
    itEnergyRecords = new ArrayList<>();

    //Iterate through each reading and add to the increment count
    StreamSupport.stream(iterable.spliterator(), false)
            .forEach(object -> {
                switch (object.get("topic").textValue()) {
                    case REAL_ENERGY_TOPIC:
                        totalRealIncrement.accumulate(object.get("energyConsumed").asDouble());
                        realEnergyRecords.add(KafkaConsumptionDeserialiser.deserialize(object));
                        break;
                    case IT_ENERGY_TOPIC:
                        totalItIncrement.accumulate(object.get("energyConsumed").asDouble());
                        itEnergyRecords.add(KafkaConsumptionDeserialiser.deserialize(object));
                        break;
                }

            });

    assert totalRealIncrement.doubleValue() > 0.0;
    assert totalItIncrement.doubleValue() > 0.0;

    //Beware of division by zero
    if (totalItIncrement.doubleValue() != 0.0) {
        //Calculate PUE
        pue = BigDecimal.valueOf(totalRealIncrement.getThenReset()).divide(BigDecimal.valueOf(totalItIncrement.getThenReset()), 9, BigDecimal.ROUND_HALF_UP);
    }
    return pue;
}

}

And here is the custom deserialiser I use in the Beam example.

KafkaConsumptionDeserialiser

public class KafkaConsumptionDeserialiser implements JsonDeserializer<KafkaConsumption> {

public KafkaConsumption deserialize(JsonElement jsonElement, Type type, JsonDeserializationContext jsonDeserializationContext) throws JsonParseException {
    if(jsonElement == null) {
        return null;
    } else {
        JsonObject jsonObject = jsonElement.getAsJsonObject();
        JsonElement id = jsonObject.get("id");
        JsonElement energyConsumed = jsonObject.get("energyConsumed");
        Gson gson = (new GsonBuilder()).registerTypeAdapter(Duration.class, new DurationDeserialiser()).registerTypeAdapter(ZonedDateTime.class, new ZonedDateTimeDeserialiser()).create();
        Duration duration = (Duration)gson.fromJson(jsonObject.get("duration"), Duration.class);
        JsonElement topic = jsonObject.get("topic");
        Instant eventTime = (Instant)gson.fromJson(jsonObject.get("eventTime"), Instant.class);
        return new KafkaConsumption(Integer.valueOf(id != null?id.getAsInt():0), Double.valueOf(energyConsumed != null?energyConsumed.getAsDouble():0.0D), duration, topic != null?topic.getAsString():"", eventTime);
    }
  }

}

Tags: stream, bigdata, apache-flink, apache-beam, stream-processing

Solution


I am not sure why the Beam pipeline you wrote is faster, but semantically it is not the same as the Flink job. Similar to how windowing works in Flink, once you assign windows in Beam, all following operations automatically take the windowing into account. You do not need to group by window.

Your Beam pipeline definition can be simplified as follows:

// Create the Pipeline object with the options we defined above.
Pipeline p = Pipeline.create(flinkPipelineOptions);

// Create a PCollection of Kafka records
PCollection<KafkaRecord<byte[], byte[]>> kafkaCollection = ...

//Apply Windowing Function
PCollection<KafkaRecord<byte[], byte[]>> windowedKafkaCollection = kafkaCollection.apply(
 Window.into(SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1))));

//Process windowed data
PCollection<KV<IIntervalWindowResult, IPueResult>> processed = windowedKafkaCollection
    .apply("filterAndProcess", ParDo.of(new PueCalculatorFn()));

// Run the pipeline.
p.run().waitUntilFinish();
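
Note that without the GroupByKey, PueCalculatorFn no longer receives a pre-grouped Iterable per window, so any per-window aggregation has to be expressed as a transform applied after the window assignment. As a minimal illustration of the point that windowing is carried through automatically, the fragment below sums a value per sliding window without any explicit window key; the JSON parsing step and the "energyConsumed" extraction are assumptions made for the sake of the example, not part of the original pipelines.

//Illustrative sketch only: once Window.into(...) has been applied, an aggregation such
//as Sum is computed per window automatically, with no grouping by window required.
PCollection<Double> energyPerWindow = windowedKafkaCollection
        .apply("extractEnergy", MapElements
                .into(TypeDescriptors.doubles())
                //Assumed parsing step: read the "energyConsumed" field from the record value (Gson).
                .via((KafkaRecord<byte[], byte[]> record) -> new JsonParser()
                        .parse(new String(record.getKV().getValue()))
                        .getAsJsonObject().get("energyConsumed").getAsDouble()))
        .apply("sumPerWindow", Sum.doublesGlobally().withoutDefaults());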

As for the performance, it depends on many factors, but keep in mind that Beam is an abstraction layer on top of Flink. Generally speaking, I would be surprised if you saw increased performance with Beam on Flink.

Edit: to clarify further, you do not group on the JSON "id" field in the Beam pipeline, whereas you do in the Flink snippet.
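
For completeness, reproducing the Flink job's keyBy on the JSON "id" field in Beam would mean extracting that key explicitly before a GroupByKey. The following is only a sketch under stated assumptions: the Jackson ObjectMapper (com.fasterxml.jackson.databind) parsing is an assumption (the original code uses Gson on the Beam side and a JSONDeserializationSchema on the Flink side), and the field and record types simply mirror the pipelines above.

//Hypothetical sketch: key each windowed record by its JSON "id" field so that the
//grouping matches the Flink keyBy(jsonNode -> jsonNode.get("id").asInt()).
PCollection<KV<Integer, KafkaRecord<byte[], byte[]>>> keyedById = windowedKafkaCollection.apply(
        "keyById",
        ParDo.of(new DoFn<KafkaRecord<byte[], byte[]>, KV<Integer, KafkaRecord<byte[], byte[]>>>() {
            //Assumed JSON parser; created once per DoFn instance rather than per element.
            private transient ObjectMapper mapper;

            @Setup
            public void setup() {
                mapper = new ObjectMapper();
            }

            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                int id = mapper.readTree(c.element().getKV().getValue()).get("id").asInt();
                c.output(KV.of(id, c.element()));
            }
        }));

//The window assignment is carried through automatically, so this GroupByKey already
//groups per id and per sliding window, with no explicit window key needed.
PCollection<KV<Integer, Iterable<KafkaRecord<byte[], byte[]>>>> groupedById =
        keyedById.apply(GroupByKey.<Integer, KafkaRecord<byte[], byte[]>>create());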

