java - Java code outside the pipeline does not run on Dataflow
Question
It seems that no code outside the pipeline runs on Dataflow. In the example below, I get a NullPointerException for TableSchema in the TableRowConverterFn.processElement method. What is the correct way to do this with Apache Beam/Dataflow?
private static TableSchema TableSchema;

public static void main(String[] args) {
    try {
        TableSchema = TableSchemaReader.read(TableSchemaResource);
    } catch (IOException e) {
        log.error("Table schema can not be read from {}. Process aborted.", TableSchemaResource);
        return;
    }

    DataflowDfpOptions options = PipelineOptionsFactory.fromArgs(args)
            //.withValidation()
            .as(DataflowDfpOptions.class);
    Pipeline pipeline = Pipeline.create(options);

    Stopwatch sw = Stopwatch.createStarted();
    log.info("DFP data transfer from GS to BQ has started.");

    pipeline.apply("ReadFromStorage", TextIO.read()
                    .from("gs://my-test/stream/*.gz")
                    .withCompression(Compression.GZIP))
            .apply("TransformToTableRow", ParDo.of(new TableRowConverterFn()))
            .apply("WriteToBigQuery", BigQueryIO.writeTableRows()
                    .to(options.getTableId())
                    .withMethod(STREAMING_INSERTS)
                    .withCreateDisposition(CREATE_NEVER)
                    .withWriteDisposition(WRITE_APPEND)
                    .withSchema(TableSchema)); //todo: use withJsonScheme(String json) method instead

    pipeline.run().waitUntilFinish();
    log.info("DFP data transfer from GS to BQ is finished in {} seconds.", sw.elapsed(TimeUnit.SECONDS));
}

/**
 * Creates a TableRow from a CSV line
 */
private static class TableRowConverterFn extends DoFn<String, TableRow> {
    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
        String[] split = c.element().split(",");

        // Ignore the header line.
        // Since this is going to be run in parallel, we can't guarantee that the first line passed to this method will be the header.
        if (split[0].equals("Time")) {
            log.info("Skipped header");
            return;
        }

        TableRow row = new TableRow();
        for (int i = 0; i < split.length; i++) {
            // This throws NPE!!!
            TableFieldSchema col = TableSchema.getFields().get(i);

            // String is the most common type, so it goes in the first if clause as a small optimization.
            if (col.getType().equals("STRING")) {
                row.set(col.getName(), split[i]);
            } else if (col.getType().equals("INTEGER")) {
                row.set(col.getName(), Long.valueOf(split[i]));
            } else if (col.getType().equals("BOOLEAN")) {
                row.set(col.getName(), Boolean.valueOf(split[i]));
            } else if (col.getType().equals("FLOAT")) {
                row.set(col.getName(), Float.valueOf(split[i]));
            } else {
                // Otherwise, simply try to write it as a String.
                //todo: Consider other BQ data types.
                row.set(col.getName(), split[i]);
            }
        }
        c.output(row);
    }
}
Answer
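Although this code may work locally on the DirectRunner, it indeed will not work on the DataflowRunner. Here is why:
With the DataflowRunner, DoFns created outside of your main function cannot see the values that main assigned to your class's variables, even static ones. I believe (although I am not 100% sure) this is because of how Dataflow stages and serializes DoFns when running in the cloud: main runs only on the machine that submits the job, the DoFn instance is serialized and sent to the workers, and static fields are not part of that serialized state, so on a worker TableSchema is still null.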
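The effect can be reproduced without Beam or Dataflow. The following is a minimal sketch using plain Java serialization; the class names StaticFieldSerializationDemo and Fn are hypothetical stand-ins for the pipeline class and the DoFn. Because everything here runs in one JVM, the static field is reset to null by hand to mimic a fresh worker JVM where main never ran, which is an assumption about what the worker sees rather than a trace of Dataflow itself.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class StaticFieldSerializationDemo {
    // Stands in for the static TableSchema field that main() assigns on the launcher.
    static String staticSchema;

    // Hypothetical stand-in for a DoFn: a Serializable object with one instance field.
    static class Fn implements Serializable {
        private static final long serialVersionUID = 1L;
        final String instanceSchema;

        Fn(String instanceSchema) {
            this.instanceSchema = instanceSchema;
        }
    }

    // Turns the object into bytes, as happens before a function object is shipped to a worker.
    static byte[] serialize(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    // Rebuilds the object from bytes, as a worker would.
    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Launcher side: main() sets the static field and builds the function object.
        staticSchema = "schema set in main";
        byte[] shipped = serialize(new Fn("schema passed to constructor"));

        // Mimic a fresh worker JVM: main() never ran there, so the static field was never assigned.
        staticSchema = null;
        Fn onWorker = (Fn) deserialize(shipped);

        System.out.println("static field on worker:   " + staticSchema);
        System.out.println("instance field on worker: " + onWorker.instanceSchema);
    }
}
```

Only the instance field travels inside the serialized bytes; the static field belongs to the class in each JVM and has to be assigned again wherever it is needed.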
Here is how you can fix this:
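private static class TableRowConverterFn extends DoFn<String, TableRow> {
    // Instance (non-static) field: it is serialized together with the DoFn and shipped to the workers.
    // A static field would not be serialized and would be null on the workers again.
    // The field's type must be Serializable; if TableSchema is not, pass the schema in a serializable
    // form instead (for example as a JSON string) and parse it on the worker.
    private final TableSchema tableSchema;

    public TableRowConverterFn(TableSchema tableSchema) {
        this.tableSchema = tableSchema;
    }

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
        // stuff
    }
}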
Then, in your main function, call:
.apply("TransformToTableRow", ParDo.of(new TableRowConverterFn(tableSchema)));
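If the schema object turns out not to be serializable, one option is to copy just the column names and types into a small serializable structure and hand that to the constructor. The following is a rough sketch in plain Java, without Beam, so that it runs on its own; CsvRowConverter and Column are hypothetical names, and a Map stands in for TableRow. In the pipeline, the same instance field and conversion loop would live inside TableRowConverterFn.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CsvRowConverter implements Serializable {
    private static final long serialVersionUID = 1L;

    // Hypothetical serializable stand-in for one TableFieldSchema entry (name and type only).
    static class Column implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;
        final String type;

        Column(String name, String type) {
            this.name = name;
            this.type = type;
        }
    }

    // Instance field, so it is part of the serialized state of the converter.
    private final ArrayList<Column> columns;

    CsvRowConverter(List<Column> columns) {
        this.columns = new ArrayList<>(columns);
    }

    // Converts one CSV line into a column-name-to-value map (a Map stands in for TableRow here).
    Map<String, Object> convert(String line) {
        String[] split = line.split(",");
        if (split.length > columns.size()) {
            throw new IllegalArgumentException("Line has more fields than the schema has columns");
        }
        Map<String, Object> row = new LinkedHashMap<>();
        for (int i = 0; i < split.length; i++) {
            Column col = columns.get(i);
            switch (col.type) {
                case "INTEGER":
                    row.put(col.name, Long.valueOf(split[i]));
                    break;
                case "BOOLEAN":
                    row.put(col.name, Boolean.valueOf(split[i]));
                    break;
                case "FLOAT":
                    // Double is used here because BigQuery FLOAT is a 64-bit type.
                    row.put(col.name, Double.valueOf(split[i]));
                    break;
                default:
                    // STRING and any type not handled above are passed through as text.
                    row.put(col.name, split[i]);
            }
        }
        return row;
    }

    public static void main(String[] args) {
        // Illustrative column names; the real ones would be copied from the table schema in main().
        CsvRowConverter converter = new CsvRowConverter(Arrays.asList(
                new Column("Time", "STRING"),
                new Column("Clicks", "INTEGER"),
                new Column("Viewable", "BOOLEAN")));
        System.out.println(converter.convert("2018-01-01,42,true"));
    }
}
```

The bounds check is an addition over the question's loop: a line with more fields than the schema would otherwise fail with an IndexOutOfBoundsException.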