google-cloud-dataflow - 是否可以允许(或扩展)IO 连接器,以便它们可以读取以前的 PCollection 项目?
问题描述
问题可能不像我想的那么清楚,但首先要解释我在这里想要实现的目标很复杂。
拥有一个窗口,是否可以从全局窗口中制作“较小”的窗口并单独触发每个窗口?一点伪代码可以稍微澄清一下水域。
pipeline
.apply("InputStream", stream)
.apply("3600s windowDuration",
Window.into(FixedWindows.of(Duration.standardSeconds(3600)))
)
.apply("/// groupBy 'timestamp, store_id and collection_name'? ///", ...)
.apply("Write to MongoDB",
MongoDbIO
.write()
.withCollection(msg -> msg.getCollection()) // Admits a String but doesn't admit reading from the previous typed PCollection.
);
是否可以基于消息数据应用 MongoDB 写入?或者至少配置类来这样做?一开始似乎不可能,所以我不知道是否有任何其他与 Beam 相关的解决方法可以允许多次插入。
我的想法是在全局 3600 内部设置更小的内部窗口,然后再应用插入。有点FileIO
像它的.by
方法。
解决方案
如果我理解正确,您想根据对象上列出的集合写入 Mongo 集合,对吗?
不幸的是,目前看来这是不可能的(参见Javadoc for MongoDBIO)。
如果集合列表相对较小,并且是先验已知的,那么您可以将事件路由到多个MongoDBIO.write
转换,但这可能是不可能的。有点像这样:
myelements.apply(
Filter.by(msg -> msg.getCollection().equals(COLLECTION1))
.apply("Write to MongoDB - coll1",
MongoDbIO
.write()
.withCollection(COLLECTION1));
myelements.apply(
Filter.by(msg -> msg.getCollection().equals(COLLECTION2))
.apply("Write to MongoDB - coll2",
MongoDbIO
.write()
.withCollection(COLLECTION2));
如果这对您不起作用,那么您可能需要编写自己的逻辑来写入 MongoDB。如果您需要自己编写,我建议您GroupIntoBatches
为每个 MongoDB 集合创建一批元素,然后将它们写出来。
推荐阅读
- python - python tkinter中的奇怪情况,有人可以解释吗?
- c++ - 将结构标头与 vec 相互转换
- java - 如何解决 Java 编译器错误“找不到符号”?
- c - 当我打印修改后的字符串时从函数返回后,它显示了垃圾值
- javascript - 如何从哈巴狗数组文本表单中获取信息到 javascript 文件
- python-3.x - Python:动态定义一个函数,其参数来自字符串列表
- python-3.x - Python 3:在包含字母和数字的文件中打印测试分数的平均值
- javascript - 错误类型错误:无法读取未定义的属性“0”
- wordpress - Docker 本地 Wordpress 开发
- architecture - 使用事件溯源的复式记账系统