java - Apache Beam 中的状态处理问题
问题描述
因此,我阅读了梁的状态处理和及时处理文章,并发现了实现这些功能本身的问题。
我试图解决的问题与此类似,为每一行生成一个顺序索引。因为我确实希望能够将数据流产生的行引用到原始源的行。
public static class createIndex extends DoFn<String, KV<String, String>> {
@StateId("count")
private final StateSpec<ValueState<Long>> countState = StateSpecs.value(VarLongCoder.of());
@ProcessElement
public void processElement(ProcessContext c, @StateId("count") ValueState<Long> countState) {
String val = c.element();
long count = 0L;
if(countState.read() != null)
count = countState.read();
count = count + 1;
countState.write(count);
c.output(KV.of(String.valueOf(count), val));
}
}
Pipeline p = Pipeline.create(options);
p.apply(TextIO.read().from("gs://randomBucket/file.txt"))
.apply(ParDo.of(new createIndex()));
我关注了我在网上可以找到的任何内容,并查看了 ParDo 的原始源代码,但不确定需要做什么。我得到的错误是:
java.lang.IllegalArgumentException: ParDo requires its input to use KvCoder in order to use state and timers.
我意识到这是一个简单的问题,但由于缺乏足够的示例或文档,我无法解决问题。我会很感激任何帮助。谢谢!
解决方案
好的,所以我继续解决这个问题,在阅读了一些资料后,我能够解决这个问题。事实证明, 的输入ParDo.of(new DoFn())
要求输入的形式为KV<T,U>
。
因此,为了读取文件并为每一行创建一个索引,我需要将它传递给一个键值对对象。下面我添加了代码:
public static class FakeKvPair extends DoFn<String, KV<String, String>> {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(KV.of("", c.element()));
}
}
并将管道更改为:
Pipeline p = Pipeline.create(options);
p.apply(TextIO.read().from("gs://randomBucket/file.txt"))
.apply(ParDo.of(new FakeKvPair()))
.apply(ParDo.of(new createIndex()));
这引发的新问题是是否保留了行的顺序,因为我在被馈送到之前运行了一个额外的 ParDo 函数(这可能会改变行的顺序)createIndex()
。
在我的本地机器上,订单被保留,但我不确定它如何扩展到 Dataflow。但我会问这是一个不同的问题。
推荐阅读
- r - 线性回归:eval 中的错误(predvars、data、env):找不到对象“G3”
- reactjs - 使用 setState() 值不会改变,反应嵌套 API 调用
- neural-network - 调试 GAN 覆盖错误
- c# - 我可以在 ASP.NET Core 解决方案中使用 EF 6 吗?
- bash - 如何在 Bash 中转义颜色代码?
- azure - 具有多个输出到同一 EventHub 的 Azure 函数不起作用
- javascript - Jquery on click (tap/vclick/touchstart) 在真实的移动设备上不起作用
- scala - 在 Scala 中关闭套接字时拦截 Akka HTTP WebSocket 事件
- html - 无法在网站“https://qa01.pdng.pepsico.com/”的网站“https://qa01.pdng.pepsico.com/”中使用 selenium 和 java 在 Bootstrap 下拉列表(无框架)中输入电子邮件 ID 和密码
- vba - VBA Excel - 使用单个变量将多个值分配给数组