CoGroupByKey not giving the desired results in Apache Beam (Python)

Problem description

I have been testing joining data read from Pub/Sub with self-created data. Below is the main pipeline method.

def run(input_topic, input_subscription, output_path, window_size=1.0, num_shards=5, pipeline_args=None):

    pipeline_options = PipelineOptions(pipeline_args, streaming=True, save_main_session=True)
    with Pipeline(options=pipeline_options) as pipeline:
        # Read from Pub/Sub and create a fixed window of 1 min.
        p1 = (
            pipeline
            | "Read from Pub/Sub" >> io.ReadFromPubSub(subscription=input_subscription)
            | "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards)
        )
        # Create sample data.
        p2 = pipeline | "creating a sample data" >> Create([
            ('Hello', 'sh 1'), ('Hello', 'sh 1.1'),
            ('Hello_world', 'sh 2'),
            ('Hello_everyone', 'sh 3'),
            ('Hello_cloud', 'sh 4'),
        ])

        ({"schdedule": p2, "timestamp": p1}
         | "merging" >> CoGroupByKey()
         | "merge print" >> Map(print))

Below are the windowing and AddTimestamp transforms.

class GroupMessagesByFixedWindows(PTransform):
    """A composite transform that groups Pub/Sub messages based on publish time
    and outputs a list of tuples, each containing a message and its publish time.
    """

    def __init__(self, window_size, num_shards=5):
        # Set window size to 30 seconds.
        self.window_size = int(window_size * 30)
        self.num_shards = num_shards

    def expand(self, pcoll):
        return (
            pcoll
            # Bind window info to each element using element timestamp (or publish time).
            | "Window into fixed intervals"
            >> WindowInto(FixedWindows(self.window_size))
            | "Add timestamp to windowed elements" >> ParDo(AddTimestamp())
                                    
        )


class AddTimestamp(DoFn):
    def process(self, element, publish_time=DoFn.TimestampParam, window=DoFn.WindowParam):
        """Processes each windowed element by extracting the message body and its
        publish time into a tuple.
        """
        yield (
            element.decode("utf-8"),
            datetime.utcfromtimestamp(float(publish_time)).strftime("%Y-%m-%d %H:%M:%S"),
        )

The result I get looks like this:

('Hello', {'schdedule': [], 'timestamp': ['2021-07-16 13:19:00']})
('Hello_world', {'schdedule': [], 'timestamp': ['2021-07-16 13:19:00']})
('Hello_everyone', {'schdedule': [], 'timestamp': ['2021-07-16 13:19:00']})

The schedule list prints as empty because the join is not happening.

The expected result is:

('Hello', {'schdedule': ['sh 1','sh 1.1'], 'timestamp': ['2021-07-16 13:19:00']})
('Hello_world', {'schdedule': ['sh 2'], 'timestamp': ['2021-07-16 13:19:00']})
('Hello_everyone', {'schdedule': ['sh 3'], 'timestamp': ['2021-07-16 13:19:00']})

I tried running GroupByKey on p2 separately; it works fine and gives me the following result:

('Hello', ['sh 1','sh 1.1'])
('Hello_world', ['sh 2'])
('Hello_everyone', ['sh 3'])

I also tried a static dictionary as a side input, and that works fine, but as soon as I apply CoGroupByKey it produces no results from the p2 pipeline. Please suggest what I am doing wrong here.

Tags: python, google-cloud-dataflow, apache-beam, google-cloud-pubsub

Solution


So, just contributing here. The real aim of this question was to join data from a dimension table or static data store with streaming data. It is clear from the question that CoGroupByKey was not joining the windowed data with the globally windowed data. So what are windowed data and globally windowed data?

Windowed: in other words, data to which windowing has been applied. This puts time boundaries on the continuously streaming data, so the number of rows in any group is never infinite.

Globally windowed: no timestamp boundaries. It could be streaming or batch data, a dimension table, or a static data store.

So we have a conflict here, because we are combining windowed data with globally windowed data.

So how can this situation be resolved?

There are different ways to do this; a few of them are listed below.

1. Window both data streams into the same window (a minimal sketch follows this list).

2. Use side inputs. See the Apache Beam documentation on side inputs for more details.

3. Use the setup method in a ParDo transform.

In my case I wanted to avoid generating windows for the static data, so I went with solutions 2 and 3.
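For completeness, here is a minimal sketch of option 1. It is illustrative only: both sides are given timestamps and pushed through the same FixedWindows so that CoGroupByKey can match them window by window. A Create and a hypothetical to_current_window helper stand in for the real Pub/Sub source and for whatever timestamping policy you choose for the static data.

import time

import apache_beam as beam
from apache_beam.transforms.window import FixedWindows, TimestampedValue


def to_current_window(kv):
    # Illustrative helper: stamp the element with "now" so it lands in the
    # same fixed window as freshly arriving elements.
    return TimestampedValue(kv, time.time())


with beam.Pipeline() as pipeline:
    # Static side: assign timestamps, then window exactly like the stream.
    p2 = (
        pipeline
        | "Create schedule" >> beam.Create([('Hello', 'sh 1'), ('Hello', 'sh 1.1'),
                                            ('Hello_world', 'sh 2')])
        | "Stamp schedule" >> beam.Map(to_current_window)
        | "Window schedule" >> beam.WindowInto(FixedWindows(60))
    )
    # Streaming side: a Create stands in here for the Pub/Sub read; in the real
    # pipeline it would be windowed with the same FixedWindows(60).
    p1 = (
        pipeline
        | "Create stream" >> beam.Create([('Hello', '2021-07-16 13:19:00')])
        | "Stamp stream" >> beam.Map(to_current_window)
        | "Window stream" >> beam.WindowInto(FixedWindows(60))
    )
    ({"schdedule": p2, "timestamp": p1}
     | "CoGroupByKey" >> beam.CoGroupByKey()
     | "Print" >> beam.Map(print))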

Solution 2

def run(input_topic, input_subscription, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
    # Set `save_main_session` to True so DoFns can access globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True
    )
    with Pipeline(options=pipeline_options) as pipeline:
        p1 = (
            pipeline
            | "Read from Pub/Sub" >> io.ReadFromPubSub(subscription=input_subscription)
            | "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards)
            # Assumes the windowing transform emits (message, raw publish time) pairs.
            | "adding time stamp value" >> Map(lambda x: (x[0], datetime.utcfromtimestamp(float(x[1])).strftime("%Y-%m-%d %H:%M:%S")))
            | "p1 group by" >> GroupByKey()
        )

        p2 = (
            pipeline
            | "generating data" >> Create([('Hello', 'sh 1'), ('Hello', 'sh 1.1'),
                                           ('Hello_world', 'sh 2'),
                                           ('Hello_everyone', 'sh 3'),
                                           ('Hello_cloud', 'sh 4')])
            | "p2 group by" >> GroupByKey()
        )

        # Join each grouped p1 element against the grouped p2 data passed as a dict side input.
        p1 | "performing join" >> Map(join_data, beam.pvalue.AsDict(p2)) | Map(print)

Solution 3

class join_data(DoFn):
    def setup(self):
        # Called once per DoFn instance / worker; build the static lookup dict here.
        self.sample_data_dict = {
            'Hello': ['sh 1', 'sh 1.1'],
            'Hello_world': 'sh 2',
            'Hello_everyone': 'sh 3',
            'Hello_cloud': 'sh 4',
        }
        return

    def process(self, ele):
        yield (ele[0], ele[1], self.sample_data_dict[ele[0]])


def run(input_topic, input_subscription, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
    # Set `save_main_session` to True so DoFns can access globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True
    )
    with Pipeline(options=pipeline_options) as pipeline:
        p1 = (
            pipeline
            | "Read from Pub/Sub" >> io.ReadFromPubSub(subscription=input_subscription)
            | "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards)
            | "adding time stamp value" >> Map(lambda x: (x[0], datetime.utcfromtimestamp(float(x[1])).strftime("%Y-%m-%d %H:%M:%S")))
            | "p1 group by" >> GroupByKey()
        )
        p1 | "performing transformation" >> ParDo(join_data()) | Map(print)

In a production pipeline we are likely to hit this problem when enriching the streaming data with dimension information, and we can easily use setup or start_bundle to create the database/BigQuery connection. Note: the setup method is called once per DoFn instance / per worker, while start_bundle is called once per bundle of elements (see the DoFn documentation). More about ParDo can be found in the Apache Beam documentation.
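As an illustration of that pattern, here is a sketch of a DoFn that loads a small dimension table from BigQuery once per worker in setup and then enriches each grouped element from the in-memory dict; the project, dataset, table, and column names are placeholders, not from the original answer.

from apache_beam import DoFn


class EnrichFromBigQuery(DoFn):
    """Illustrative sketch only: build the lookup data once per worker in setup(),
    then reuse it for every element instead of reconnecting per element."""

    def setup(self):
        # setup() runs once per DoFn instance / worker, so the client and the
        # dimension lookup are not rebuilt for every element.
        from google.cloud import bigquery  # assumes google-cloud-bigquery is installed
        client = bigquery.Client()
        rows = client.query(
            "SELECT key, schedule FROM `my_project.my_dataset.schedule`"  # placeholder table
        ).result()
        self.schedule_dict = {}
        for row in rows:
            self.schedule_dict.setdefault(row.key, []).append(row.schedule)

    def process(self, element):
        key, timestamps = element
        yield (key, {"schdedule": self.schedule_dict.get(key, []),
                     "timestamp": list(timestamps)})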

In both cases I was able to get the expected result mentioned in the question above.

