protocol-buffers - 自定义 FileInputFormat 始终将一个文件拆分分配给一个插槽
问题描述
我一直在将 protobuf 记录写入我们的 s3 存储桶。我想使用 flink dataset api 来读取它。所以我实现了一个自定义的 FileInputFormat 来实现这一点。代码如下。
public class ProtobufInputFormat extends FileInputFormat<StandardLog.Pageview> {
public ProtobufInputFormat() {
}
private transient boolean reachedEnd = false;
@Override
public boolean reachedEnd() throws IOException {
return reachedEnd;
}
@Override
public StandardLog.Pageview nextRecord(StandardLog.Pageview reuse) throws IOException {
StandardLog.Pageview pageview = StandardLog.Pageview.parseDelimitedFrom(stream);
if (pageview == null) {
reachedEnd = true;
}
return pageview;
}
@Override
public boolean supportsMultiPaths() {
return true;
}
}
public class BatchReadJob {
public static void main(String... args) throws Exception {
String readPath1 = args[0];
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ProtobufInputFormat inputFormat = new ProtobufInputFormat();
inputFormat.setNestedFileEnumeration(true);
inputFormat.setFilePaths(readPath1);
DataSet<StandardLog.Pageview> dataSource = env.createInput(inputFormat);
dataSource.map(new MapFunction<StandardLog.Pageview, String>() {
@Override
public String map(StandardLog.Pageview value) throws Exception {
return value.getId();
}
}).writeAsText("s3://xxx", FileSystem.WriteMode.OVERWRITE);
env.execute();
}
}
问题是 flink 总是将一个文件拆分分配给一个并行槽。换句话说,它总是处理与并行数相同的文件拆分数。
我想知道实现自定义 FileInputFormat 的正确方法是什么。
谢谢。
解决方案
我相信您看到的行为是因为ExecutionJobVertex
调用该FileInputFormat. createInputSplits()
方法的minNumSplits
参数等于顶点(数据源)并行度。因此,如果您想要不同的行为,那么您必须重写该createInputSplits
方法。
虽然你没有说你真正想要的行为。例如,如果您只想对每个文件进行一次拆分,那么您可以覆盖testForUnsplittable()
子类中的方法FileInputFormat
以始终返回 true;它还应该将(受保护的)unsplittable
布尔值设置为 true。
推荐阅读
- python - 使用 PyPDF 将页码居中在页脚中
- lstm - Pytorch 将 PackSequence 参数传递给 LSTM
- android - 在 html webview 网页上开始新活动不可用
- sql-server - 仅重新创建数据库模式
- python - Python 发现没有使用 conda install 安装的名为 package 的模块
- python - 同时线程和初始化(它是如何工作的?)
- android - Charles Proxy 在 Android 上阻止 SSL 流量
- android - 如何将多个整数值从活动 A 中的 EditText 传递到活动 B 中的文本视图?
- ms-access - 如何获取两个 sql 查询并将它们组合成一个查询
- javascript - React 组件忽略显示空 obj 的文本