java - 带有 Kafka 源和 Dataflow 运行器的 Beam java SDK 2.10.0:窗口化的 Count.perElement 永远不会触发数据
问题描述
我在 Google DataFlow 上运行 Beam SDK 到 2.10.0 作业时遇到问题
流程很简单:我使用Kafka作为源,然后应用Fixed windows,然后按键计数。但看起来数据永远不会离开计数阶段,直到工作耗尽。的输出集合Count.PerElement/Combine.perKey(Count)/Combine.GroupedValues.out0
始终为零。元素仅在排空 Dataflow 作业后发布。
这是代码:
public KafkaProcessingJob(BaseOptions options) {
PCollection<GenericRecord> genericRecordPCollection = Pipeline.create(options)
.apply("Read binary Kafka messages", KafkaIO.<String, byte[]>read()
.withBootstrapServers(options.getBootstrapServers())
.updateConsumerProperties(configureConsumerProperties())
.withCreateTime(Duration.standardMinutes(1L))
.withTopics(inputTopics)
.withReadCommitted()
.commitOffsetsInFinalize()
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(ByteArrayDeserializer.class))
.apply("Map binary message to Avro GenericRecord", new DecodeBinaryKafkaMessage());
.apply("Apply windowing to records", Window.into(FixedWindows.of(Duration.standardMinutes(5)))
.triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
.discardingFiredPanes()
.withAllowedLateness(Duration.standardMinutes(5)))
.apply("Write aggregated data to BigQuery", MapElements.into(TypeDescriptors.strings()).via(rec -> getKey(rec)))
.apply(Count.<String>perElement())
.apply(
new WriteWindowedToBigQuery<>(
project,
dataset,
table,
configureWindowedTableWrite()));
}
private Map<String, Object> configureConsumerProperties() {
Map<String, Object> configUpdates = Maps.newHashMap();
configUpdates.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return configUpdates;
}
private static String getKey(GenericRecord record) {
//extract key
}
看起来流永远不会离开舞台.apply(Count.<String>perElement())
有人可以帮忙吗?
解决方案
我找到了原因。
它与此处使用的 TimestampPolicy ( .withCreateTime(Duration.standardMinutes(1L))
) 有关。
由于我们的 Kafka 主题中存在空分区,主题水印从未使用默认的 TimestampPolicy 进行高级。我需要实施自定义策略来解决问题。
推荐阅读
- reactjs - 使用 D3.js 从 SQLite3 数据库中挑选数据
- javascript - Vue路由器传递道具未定义
- javascript - 在同一元素js函数上更改bg-color
- reactjs - 无法修复警告:无法对未安装的组件执行 React 状态更新。这是一个无操作,但它表明您的应用程序中存在内存泄漏
- android - 使用 Kotlin 离线播放视频
- android - 未为类型“Map”定义运算符“[]”
颤振中的函数(动态)' - mysql - 哪些 SQL 查询必须写成动态的,不能使用静态的 sql?
- linux - 从可执行路径中查找进程
- kubernetes - 在较低的环境中重现 solr LockObtainFailed 错误问题
- android - 我为 recycleView 创建了一个布局,当我在约束布局中添加图像时,顶部总是有一个默认边距