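google-bigquery - Dataflow pipeline with the "update" flag fails with a "Reshuffle/GroupByKey" error
Problem description
My current code reads from Pub/Sub, applies a filter, and writes the result to a BigQuery table. The code is below.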
public class BeaconAnomalyDetectionPipeline {

  public static void main(String[] args) {
    BeaconAnomalyDetectionOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(BeaconAnomalyDetectionOptions.class);
    options.setJobName("test-name");
    run(options);
  }

  public static PipelineResult run(BeaconAnomalyDetectionOptions options) {
    Pipeline p = Pipeline.create(options);
    p.getCoderRegistry().registerCoderForType(TypeDescriptor.of(String.class),
        StringUtf8Coder.of());

    // Note: as posted, this chain yields PubsubMessage elements; the step that
    // converts them to IngestionRequest is not shown in the question.
    PCollection<IngestionRequest> ingestionRequests = p
        .apply("ReadPubSubSubscription",
            PubsubIO.readMessages()
                .fromSubscription(options.getSubscriberId()))
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))));

    // Keep only requests whose compression type value is odd.
    PCollection<IngestionRequest> anomalies =
        ingestionRequests.apply(
            "filter by Signature",
            Filter.by(ingestionRequest -> ingestionRequest.getCompressionTypeValue() % 2 != 0));

    anomalies
        .apply(
            "WriteAnomalyToBQ",
            BQWriteTransform.newBuilder()
                .setTableSpec(options.getTableSpec())
                .setMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                .build());

    return p.run();
  }
}
In the "filter by Signature" step, if I change the filter from %2!=0 to %2==0 and try to run the pipeline with the "--update" flag, I always get this error:
The new job is not compatible with 2021-04-20_14_48_00-16442341566949376119. The original job has not been aborted., The Coder or type for step WriteAnomalyToBQ/Write anomalies to bigquery table/StreamingInserts/StreamingWriteTables/Reshuffle/GroupByKey has changed.
I updated my code to specify the coders and to add explicit Reshuffle and GroupByKey steps, but I still see the same problem.
The updated code is here:
public class BeaconAnomalyDetectionPipeline {

  public static void main(String[] args) {
    BeaconAnomalyDetectionOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(BeaconAnomalyDetectionOptions.class);
    options.setJobName("test-name");
    run(options);
  }

  public static PipelineResult run(BeaconAnomalyDetectionOptions options) {
    Pipeline p = Pipeline.create(options);
    p.getCoderRegistry().registerCoderForType(TypeDescriptor.of(String.class),
        StringUtf8Coder.of());

    PCollection<IngestionRequest> ingestionRequests = p
        .apply("ReadPubSubSubscription",
            PubsubIO.readMessages()
                .fromSubscription(options.getSubscriberId()))
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        // Every element gets the same constant key, 1, with an explicit KV coder.
        .apply(WithKeys.of(input -> 1))
        .setCoder(KvCoder.of(VarIntCoder.of(), PubsubMessageWithAttributesCoder.of()))
        .apply(Reshuffle.of())
        .apply(GroupByKey.<Integer, PubsubMessage>create())
        // Combiner is not shown in the question.
        .apply(ParDo.of(new Combiner()))
        .apply("filter by compression type new",
            MapElements.via(new SimpleFunction<KV<Integer, PubsubMessage>, PubsubMessage>() {
              @Override
              public PubsubMessage apply(KV<Integer, PubsubMessage> input) {
                // This tests the key (always 1 above), not the compression type.
                if (input.getKey() % 2 != 0) {
                  return input.getValue();
                } else {
                  return null;
                }
              }
            }))
        .apply("PubSubMessagesToTableRows",
            new PubsubProtoToIngestionRequest());

    ingestionRequests.apply(
        "WriteAnomalyToBQ",
        BQWriteTransform.newBuilder()
            .setTableSpec(options.getTableSpec())
            .setMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
            .build());

    return p.run();
  }
}
After I ran the Dataflow update with the new code, the error changed to:
The new job is missing steps GroupByKey, Reshuffle/GroupByKey. If these steps have been renamed or deleted, please specify them with the update command.
I used transformNameMapping in my update script:
--update \
--transformNameMapping='{\"Reshuffle/GroupBykey\":\"\",\"filter by compression type/MapElements\":\"\",\"\":\"filter by Signature\"}' \
--jobName=test-name "
Can anyone help me find a workable solution? Thanks a lot.
Solution
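One thing that can be checked from the question alone: the step names in the second error message and the keys passed to --transformNameMapping do not match character for character. The sketch below is a plain-Java check with no Beam or Dataflow dependency. The class and method names are hypothetical, and the strings are copied from the error message and the update script in the question. It assumes step names are compared as exact, case-sensitive strings. Whether correcting the keys is enough for the update to succeed is not confirmed here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: compares the steps named in the error message with the
// keys of the transformNameMapping from the update script.
public class TransformNameMappingCheck {

    // Steps listed in the error "The new job is missing steps ...".
    public static List<String> missingStepsFromError() {
        return Arrays.asList("GroupByKey", "Reshuffle/GroupByKey");
    }

    // Mapping exactly as written in the update script (old name -> new name).
    public static Map<String, String> mappingFromQuestion() {
        Map<String, String> mapping = new LinkedHashMap<>();
        mapping.put("Reshuffle/GroupBykey", "");
        mapping.put("filter by compression type/MapElements", "");
        mapping.put("", "filter by Signature");
        return mapping;
    }

    // Missing steps that have no exactly matching key in the mapping.
    public static List<String> uncoveredSteps(List<String> missingSteps,
                                              Map<String, String> mapping) {
        List<String> uncovered = new ArrayList<>();
        for (String step : missingSteps) {
            if (!mapping.containsKey(step)) {
                uncovered.add(step);
            }
        }
        return uncovered;
    }

    // Mapping keys that match a missing step only when letter case is ignored.
    public static List<String> caseMismatches(List<String> missingSteps,
                                              Map<String, String> mapping) {
        List<String> mismatches = new ArrayList<>();
        for (String key : mapping.keySet()) {
            for (String step : missingSteps) {
                if (key.equalsIgnoreCase(step) && !key.equals(step)) {
                    mismatches.add(key);
                }
            }
        }
        return mismatches;
    }

    public static void main(String[] args) {
        List<String> missing = missingStepsFromError();
        Map<String, String> mapping = mappingFromQuestion();
        System.out.println("Not covered by an exact key: "
            + uncoveredSteps(missing, mapping));
        System.out.println("Keys differing only by case: "
            + caseMismatches(missing, mapping));
    }
}
```

With the strings from the question, neither missing step has an exactly matching key, and the key "Reshuffle/GroupBykey" differs from the reported step "Reshuffle/GroupByKey" only in the case of one letter. The mapping also has no entry for the top-level "GroupByKey" step, and the step named "filter by compression type new" in the code does not appear under that name in the mapping.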