java - Apache Beam 数据流中的外部 api 调用
问题描述
我有一个用例,我读入存储在谷歌云存储中的换行 json 元素并开始处理每个 json。在处理每个 json 时,我必须调用外部 API 来执行重复数据删除,无论该 json 元素是否以前被发现。我在每个 json 上做ParDo
一个。DoFn
DoFn
我还没有看到任何在线教程说明如何从 apache beam Dataflow调用外部 API 端点。
我正在使用JAVA
Beam 的 SDK。我研究的一些教程解释了使用startBundle
,FinishBundle
但我不清楚如何使用它
解决方案
如果您需要检查外部存储中每个 JSON 记录的重复项,那么您仍然可以使用DoFn
它。有几个注解,如@Setup
、@StartBundle
、@FinishBundle
等,可用于对DoFn
.
例如,如果您需要实例化一个客户端对象以向您的外部数据库发送请求,那么您可能希望在@Setup
方法中执行此操作(如 POJO 构造函数),然后在您的@ProcessElement
方法中利用此客户端对象。
让我们考虑一个简单的例子:
static class MyDoFn extends DoFn<Record, Record> {
static transient MyClient client;
@Setup
public void setup() {
client = new MyClient("host");
}
@ProcessElement
public void processElement(ProcessContext c) {
// process your records
Record r = c.element();
// check record ID for duplicates
if (!client.isRecordExist(r.id()) {
c.output(r);
}
}
@Teardown
public void teardown() {
if (client != null) {
client.close();
client = null;
}
}
}
此外,为避免对每条记录进行远程调用,您可以将记录批处理到内部缓冲区(将输入数据拆分为捆绑包)并以批处理模式检查重复项(如果您的客户端支持此操作)。为此,您可以使用@StartBundle
和@FinishBundle
注释的方法,这些方法将在相应地处理 Beam 包之前和之后调用。
对于更复杂的示例,我建议查看不同 Beam IO 中的 Sink 实现,例如KinesisIO。
推荐阅读
- javascript - 如何声明嵌套的反应组件?
- c - 在 C 中使用 3D-Array 时出现内存读取异常
- namespaces - 由于命名空间更改,Service Fabric 升级失败
- java - RDBMS 中的并发选择和记录创建
- python - 我可以设置神经网络的预测值吗?
- ms-word - 根据值更改百分比格式
- asp.net-core - 如何同时使用 IHttpClientFactory 和 Typed HttpClient?
- angular - 使用单例服务从 URL 延迟加载 Angular 模块
- c++ - 调用没有名字的方法
- tensorflow - 当每个依赖于不同版本的protobuf时,是否可以使用2个不同的liberies?