java - 使用flink将kafka数据存储为hdfs中的拼花格式?
问题描述
使用 flink 将 kafka 数据以 parquet 格式存储在 hdfs 中,我正在尝试使用不起作用的 fink 文档。
我没有找到任何合适的文档将其存储为镶木地板文件
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(100);
final List<Datum> data = Arrays.asList(new Datum("a", 1), new Datum("b", 2), new Datum("c", 3));
DataStream<Datum> stream = env.addSource(new FiniteTestSource<>(data), TypeInformation.of(Datum.class));
stream.addSink(
StreamingFileSink.forBulkFormat(
Path.fromLocalFile(new File("path")),
ParquetAvroWriters.forReflectRecord(String.class))
.build());
env.execute();
我创建了一个可序列化的类
public static class Datum implements Serializable {
public String a;
public int b;
public Datum() {
}
public Datum(String a, int b) {
this.a = a;
this.b = b;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Datum datum = (Datum) o;
return b == datum.b && (a != null ? a.equals(datum.a) : datum.a == null);
}
@Override
public int hashCode() {
int result = a != null ? a.hashCode() : 0;
result = 31 * result + b;
return result;
}
}
上面的代码没有将任何数据写入文件,它只是继续创建许多文件。
如果有人可以帮助提供适当的文档或代码
解决方案
如上所写documentation of StreamingFileSink
:
重要提示:使用 StreamingFileSink 时需要启用检查点。零件文件只能在成功的检查点上完成。如果检查点被禁用,部分文件将永远保持
in-progress
或pending
状态,并且不能被下游系统安全读取。
要启用,只需使用
env.enableCheckpointing(1000);
你有很多选择来调整它。
这是一个完整的例子
final List<Address> data = Arrays.asList(
new Address(1, "a", "b", "c", "12345"),
new Address(2, "p", "q", "r", "12345"),
new Address(3, "x", "y", "z", "12345")
);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(100);
DataStream<Address> stream = env.addSource(
new FiniteTestSource<>(data), TypeInformation.of(Address.class));
stream.addSink(
StreamingFileSink.forBulkFormat(
Path.fromLocalFile(folder),
ParquetAvroWriters.forSpecificRecord(Address.class))
.build());
env.execute();
推荐阅读
- apache-nifi - 一起处理传入队列中存在的 nifi 流文件
- protocol-buffers - 由于默认值,Protobuf 忽略 bool 和 ints 值
- javascript - angularjs中我的jsp中其他控制器的可访问性
- c# - 将 ASP.NET CORE 2.1 项目作为 Windows 服务托管时,不会启动 Https URL
- sql - liquibase 更改日志中的 WHERE 子句用于创建数据库
- xamarin - Wikiitude 图像识别重置
- c# - EPiServer - 在已安装的插件上找不到插件
- powershell - 根据年月 LastWriteTime 持久子文件夹将文件移动到子文件夹
- java - 用其他方法写出表格
- javascript - 等待数组填充 ajax 结果