google-cloud-dataflow - 在 Apache Beam 中创建自定义窗口函数
问题描述
我有一个 Beam 管道,它首先读取多个文本文件,其中文件中的每一行代表一行,该行稍后在管道中插入 Bigtable。该场景需要确认从每个文件中提取的行数和后来插入 Bigtable 的行数匹配。为此,我计划开发一个自定义窗口策略,以便基于文件名将单个文件中的行分配给单个窗口,作为将传递给窗口函数的键。
是否有任何用于创建自定义窗口函数的代码示例?
解决方案
虽然我改变了确认插入行数的策略,但对于任何对从批处理源读取的窗口元素感兴趣的人,例如FileIO
在批处理作业中,这里是创建自定义窗口策略的代码:
public class FileWindows extends PartitioningWindowFn<Object, IntervalWindow>{
private static final long serialVersionUID = -476922142925927415L;
private static final Logger LOG = LoggerFactory.getLogger(FileWindows.class);
@Override
public IntervalWindow assignWindow(Instant timestamp) {
Instant end = new Instant(timestamp.getMillis() + 1);
IntervalWindow interval = new IntervalWindow(timestamp, end);
LOG.info("FileWindows >> assignWindow(): Window assigned with Start: {}, End: {}", timestamp, end);
return interval;
}
@Override
public boolean isCompatible(WindowFn<?, ?> other) {
return this.equals(other);
}
@Override
public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
if (!this.isCompatible(other)) {
throw new IncompatibleWindowException(other, String.format("Only %s objects are compatible.", FileWindows.class.getSimpleName()));
}
}
@Override
public Coder<IntervalWindow> windowCoder() {
return IntervalWindow.getCoder();
}
}
然后它可以在管道中使用,如下所示:
p
.apply("Assign_Timestamp_to_Each_Message", ParDo.of(new AssignTimestampFn()))
.apply("Assign_Window_to_Each_Message", Window.<KV<String,String>>into(new FileWindows())
.withAllowedLateness(Duration.standardMinutes(1))
.discardingFiredPanes());
请记住,您需要编写 ,AssignTimestampFn()
以便每条消息都带有时间戳。
推荐阅读
- selenium - ElementNotInteractableException:无法使用 python 和 firefox webdriver 将元素滚动到 selenium 中的视图
- ios - 在 UITableViewCell (Swift) 中重置步进器
- java - Spring Autowire 在通用类中找不到 R2DBC 存储库的实现
- image - Typo3 Fluid 忽略排版中设置的最大图像尺寸
- ios - UiCollectionview 单元格无法更改单元格大小
- javascript - 如何在javascript中将表格从垂直交换到水平?
- c# - 使用 Linq 更新列表的对象
- java - Paths.get("/xyz") 返回 \xyz 而不是 /xyz
- python - 如何从零除错误中纠正代码:
- android - 在颤振中使用 MVP 架构的 Chopper