首页 > 解决方案 > 是否可以允许(或扩展)IO 连接器,以便它们可以读取以前的 PCollection 项目?

问题描述

问题可能不像我想的那么清楚,但首先要解释我在这里想要实现的目标很复杂。

拥有一个窗口,是否可以从全局窗口中制作“较小”的窗口并单独触发每个窗口?一点伪代码可以稍微澄清一下水域。

        pipeline
                .apply("InputStream", stream)
                .apply("3600s windowDuration",
                        Window.into(FixedWindows.of(Duration.standardSeconds(3600)))
                )
                .apply("/// groupBy 'timestamp, store_id and collection_name'? ///", ...)
                .apply("Write to MongoDB",
                        MongoDbIO
                                .write()
                                .withCollection(msg -> msg.getCollection()) // Admits a String but doesn't admit reading from the previous typed PCollection.
                );

是否可以基于消息数据应用 MongoDB 写入?或者至少配置类来这样做?一开始似乎不可能,所以我不知道是否有任何其他与 Beam 相关的解决方法可以允许多次插入。

我的想法是在全局 3600 内部设置更小的内部窗口,然后再应用插入。有点FileIO像它的.by方法。

标签: google-cloud-dataflowapache-beam

解决方案


如果我理解正确,您想根据对象上列出的集合写入 Mongo 集合,对吗?

不幸的是,目前看来这是不可能的(参见Javadoc for MongoDBIO)。

如果集合列表相对较小,并且是先验已知的,那么您可以将事件路由到多个MongoDBIO.write转换,但这可能是不可能的。有点像这样:

myelements.apply(
          Filter.by(msg -> msg.getCollection().equals(COLLECTION1))
          .apply("Write to MongoDB - coll1",
                        MongoDbIO
                                .write()
                                .withCollection(COLLECTION1));

myelements.apply(
          Filter.by(msg -> msg.getCollection().equals(COLLECTION2))
          .apply("Write to MongoDB - coll2",
                        MongoDbIO
                                .write()
                                .withCollection(COLLECTION2));

如果这对您不起作用,那么您可能需要编写自己的逻辑来写入 MongoDB。如果您需要自己编写,我建议您GroupIntoBatches为每个 MongoDB 集合创建一批元素,然后将它们写出来。


推荐阅读