java - 如何创建每隔一段时间进入 Apache Beam 管道的假数据流?
问题描述
我正在尝试创建小型 Apache Beam 流程序来测试想法,我认为获取数据最简单的方法是使用框架结构,例如Create.of
创建假数据。这样一来,我就不必进行超出我需要的设置,例如将 GCP Pub/Sub 主题设置为源并发布到它。
问题是我想尝试基于时间的东西,比如窗口化以及使用状态和计时器。我能够把它放在一起:
public class TestPipeline {
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
p.apply(Create.of(1, 2, 3))
.apply(ParDo.of(new DoFn<Integer, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element().toString());
}
}))
.apply(TextIO.write().to("myfile.txt"));
p.run().waitUntilFinish();
}
}
这实现了我在管道开始时发送三段数据的目标,但它同时发送了所有数据。如果我可以将其设置为每 10 秒发送一次数据,等等,我会更喜欢。
我遵循了 Apache Flink 的本教程(https://ci.apache.org/projects/flink/flink-docs-release-1.10/getting-started/walkthroughs/datastream_api.html),它显示了我正在尝试的示例去完成。我深入研究了该教程中的代码,但无法准确找出 Flink 框架的哪一部分实现了这一点。
解决方案
我最终使用了这个TestStream
类。我发现这个UnBoundedSource
类对于我的用例来说太难扩展了。博客文章https://beam.apache.org/blog/2016/10/20/test-stream.html帮助我了解了如何在我的测试中使用这个类。
推荐阅读
- wordpress - WordPress手机人像全角问题
- godot - 如何访问 MeshInstance 的网格数据?
- javascript - 出现键盘时屏幕应向上移动而不会干扰页脚
- php - Mysql与PHP的连接给出500错误
- javascript - owl carousal 中的功能问题
- azure-sql-database - 如何设置/修改从 azure web app 到 azure sql 数据库的连接池 - 慢速应用问题
- python-3.x - 软件包安装失败 - OsX 中的 psycopg2
- python - 重命名多个子目录中的单个 .txt 文件
- c# - Discord:使用 .NET 库更改区域?
- vue.js - 将数据从 Vuex getter 传递到子组件的问题