首页 > 解决方案 > Apache Beam 中的嵌套管道

问题描述

我的 Apache Beam 管道接收无限的消息流。每条消息扇出 N 个元素(N 约为 1000,并且每个输入都不同)。然后对于前一阶段产生的每个元素,都有一个映射操作产生新的 N 个元素,应该使用top 1操作减少这些元素(元素按从队列中读取的原始消息分组)。top 1的结果保存到外部存储中。在 Spark 中,我可以通过从流中读取消息并为每条执行 map + reduce 的消息创建 RDD 来轻松做到这一点。由于 Apache Beam 没有嵌套管道,我看不到在 Beam 中使用无限流输入实现它的方法。例子:

Infinite stream elements: A, B

Step 1 (fan out, N = 3): A -> A1, A2, A3
                (N = 2): B -> B1, B2

Step 2 (map): A1, A2, A3 -> A1', A2', A3'
              B1, B2, B3 -> B1', B2'

Step 3 (top1): A1', A2', A3' -> A2'
               B1', B2' -> B3'

Output: A2', B2'

A 和 B 元素之间没有依赖关系。A2' 和 B2' 是它们所在组的顶部元素。流是无限的。映射操作可能需要几秒钟到几分钟的时间。为执行映射操作所需的最长时间创建窗口水印将使快速映射操作的整体管道时间慢得多。嵌套管道会有所帮助,因为这样我可以为每条消息创建一个管道。

标签: apache-sparkstreamgoogle-cloud-platformgoogle-cloud-dataflowapache-beam

解决方案


看起来你不需要一个“嵌套管道”。让我向您展示 Beam Python SDK 中的样子(Java 类似):

例如,尝试将数字和撇号附加到字符串(例如"A"=> "A1'")的虚拟操作,您会执行以下操作:

def my_fn(value):
  def _inner(elm):
    return (elm, elm + str(value) + "'")  # A KV-pair
  return _inner

# my_stream has [A, B]
pcoll_1 = (my_stream
           | beam.Map(my_fn(1)))
pcoll_2 = (my_stream
           | beam.Map(my_fn(2)))
pcoll_3 = (my_stream
           | beam.Map(my_fn(3)))

def top_1(elms):
  ... # Some operation

result = ((pcoll_1, pcoll_2, pcoll_3)
          | beam.CoGroupByKey()
          | beam.Map(top_1))

推荐阅读