首页 > 解决方案 > 使用flink将kafka数据存储为hdfs中的拼花格式?

问题描述

使用 flink 将 kafka 数据以 parquet 格式存储在 hdfs 中,我正在尝试使用不起作用的 fink 文档。

我没有找到任何合适的文档将其存储为镶木地板文件

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(100);

final List<Datum> data = Arrays.asList(new Datum("a", 1), new Datum("b", 2), new Datum("c", 3));

DataStream<Datum> stream = env.addSource(new FiniteTestSource<>(data), TypeInformation.of(Datum.class));


stream.addSink(
    StreamingFileSink.forBulkFormat(
        Path.fromLocalFile(new File("path")),
        ParquetAvroWriters.forReflectRecord(String.class))
        .build());
env.execute();

我创建了一个可序列化的类

public static class Datum implements Serializable {

        public String a;
        public int b;

        public Datum() {
        }

        public Datum(String a, int b) {
            this.a = a;
            this.b = b;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }

            Datum datum = (Datum) o;
            return b == datum.b && (a != null ? a.equals(datum.a) : datum.a == null);
        }

        @Override
        public int hashCode() {
            int result = a != null ? a.hashCode() : 0;
            result = 31 * result + b;
            return result;
        }
    }

上面的代码没有将任何数据写入文件,它只是继续创建许多文件。

如果有人可以帮助提供适当的文档或代码

标签: javaapache-kafkaapache-flinkflink-streaming

解决方案


如上所写documentation of StreamingFileSink

重要提示:使用 StreamingFileSink 时需要启用检查点。零件文件只能在成功的检查点上完成。如果检查点被禁用,部分文件将永远保持in-progresspending状态,并且不能被下游系统安全读取。

要启用,只需使用

env.enableCheckpointing(1000);

你有很多选择来调整它。


这是一个完整的例子

final List<Address> data = Arrays.asList(
    new Address(1, "a", "b", "c", "12345"),
    new Address(2, "p", "q", "r", "12345"),
    new Address(3, "x", "y", "z", "12345")
);

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(100);

DataStream<Address> stream = env.addSource(
    new FiniteTestSource<>(data), TypeInformation.of(Address.class));

stream.addSink(
    StreamingFileSink.forBulkFormat(
        Path.fromLocalFile(folder),
        ParquetAvroWriters.forSpecificRecord(Address.class))
        .build());

env.execute();

推荐阅读