java - 从 Dataflow 中的管道外部将变量传递给 ParDo
问题描述
如何将变量从管道外部传递给 ParDo 函数到 Dataflow 作业。下面是一个示例,我试图在创建管道之前派生 fileDate 并将其传递给 ParDo 函数。我在接口中声明了变量
public interface CsvToBq extends DataflowPipelineOptions {
@Description("File Date")
String getFileDate();
void setFileDate(String value);
}
我将工作中的值设置为
public static void main(String[] args) {
PipelineOptionsFactory.register(CsvToBq.class);
CsvToBq options = PipelineOptionsFactory
.fromArgs(args)
.withValidation()
.as(CsvToBq.class);
Date date = new Date();
String fileDate = formatter.format(date);
options.setFileDate(fileDate);
我正在访问 ParDo 函数中的变量
private static class WikiParDo extends DoFn<String, TableRow> {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
PipelineOptions options = c.getPipelineOptions();
String fileDate = options.getFileDate();
String[] split = c.element().split(",");
TableRow row = new TableRow();
for (int i = 0; i < split.length; i++) {
TableFieldSchema col = getTableSchema().getFields().get(i);
row.set(col.getName(), split[i]);
}
row.set("file_date", fileDate);
c.output(row);
}
}
这是完整的代码
public class CsvToBq {
public static void main(String[] args) {
PipelineOptionsFactory.register(CsvToBq.class);
CsvToBq options = PipelineOptionsFactory
.fromArgs(args)
.withValidation()
.as(CsvToBq.class);
Date date = new Date();
String fileDate = formatter.format(date);
options.setFileDate(fileDate);
Pipeline pipeline = Pipeline.create(options);
pipeline.apply("READ", TextIO.read().from("gs://bucket/file.csv"))
.apply("TRANSFORM", ParDo.of(new WikiParDo()))
.apply("WRITE", BigQueryIO.writeTableRows()
.to(String.format("%s:dataset_name.wiki_demo", options.getProject()))
.withCreateDisposition(CREATE_IF_NEEDED)
.withWriteDisposition(WRITE_TRUNCATE)
.withSchema(getTableSchema()));
pipeline.run();
}
private static TableSchema getTableSchema() {
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("year").setType("INTEGER"));
fields.add(new TableFieldSchema().setName("month").setType("INTEGER"));
fields.add(new TableFieldSchema().setName("day").setType("INTEGER"));
fields.add(new TableFieldSchema().setName("wikimedia_project").setType("STRING"));
fields.add(new TableFieldSchema().setName("language").setType("STRING"));
fields.add(new TableFieldSchema().setName("title").setType("STRING"));
fields.add(new TableFieldSchema().setName("views").setType("INTEGER"));
fields.add(new TableFieldSchema().setName("file_date").setType("STRING"));
return new TableSchema().setFields(fields);
}
public interface CsvToBq extends DataflowPipelineOptions {
@Description("File Date")
String getFileDate();
void setFileDate(String value);
}
private static class WikiParDo extends DoFn<String, TableRow> {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
PipelineOptions options = c.getPipelineOptions();
String fileDate = options.getFileDate();
String[] split = c.element().split(",");
TableRow row = new TableRow();
for (int i = 0; i < split.length; i++) {
TableFieldSchema col = getTableSchema().getFields().get(i);
row.set(col.getName(), split[i]);
}
row.set("file_date", fileDate);
c.output(row);
}
}
}
但这不起作用。我尝试使用 StaticValueProvider 和 sideinputs,但看起来它没有达到目的。
解决方案
我认为您将需要类似的东西:
CsvToBq options = c.getPipelineOptions().as(CsvToBq.class);
String fileDate = options.getFileDate();
此外,如果您不打算使用 ValueProviders(将参数传递给 Dataflow 模板的当前要求),您还可以执行以下操作:
private static class WikiParDo extends DoFn<String, TableRow> {
String fileName;
public WikiParDo(String fileName) {
this.fileName = fileName;
}
请注意您存储的内容需要是可序列化的。joda.time Instant objects 如果我记得没问题。
推荐阅读
- laravel - 选择器的 Laravel 黄昏问题
- r - 希伯来语 R 代码转换为问号
- json - jq substr() 相当于格式化一个值
- python - 在 Python 中连接整个字符串之前和之后
- reactjs - 从 API 检索数据的简单函数不返回数据
- reactjs - 如何导入和调用使用 redux connect 及其函数的反应纯函数?(只有 React 功能组件没有类)
- ios - 使用 URL 将图像加载到 UIButton 不起作用
- java - 用Java从文本文件中的一行读取特定数据到相应的数组
- javascript - 使用 html 提交所选选项的任何文本
- amazon-web-services - Amazon HTTP API 网关无法通过 VPC 链接工作