首页 > 解决方案 > Beam:如何按日期唯一数据

问题描述

我想从给定数据中获取用户登录天数

  uid     ts
  u1     2019-08-01
  u2     2019-08-01
  u2     2019-08-01
  u1     2019-08-01
  u1     2019-08-02
  u3     2019-08-02
  u1     2019-08-03
  u3     2019-08-03
  u3     2019-08-03

和结果

uid  day number
u1    3
u2    1
u3    2

ts我当前的解决方案,在一小时后unqiue window,然后将结果存储redis setuidkey 和tsvalue。

class UniqueFn(beam.CombineFn):
    def create_accumulator(self):
        return set()

    def add_input(self, mutable_accumulator, element):
        if element not in mutable_accumulator:
            mutable_accumulator.add(element)
        return mutable_accumulator

    def merge_accumulators(self, accumulators):
        return set.union(*accumulators)

    def extract_output(self, accumulator):
        return list(accumulator)

...

        window_events = (
            filtered_events
            | "UseFixedWindow" >> beam.WindowInto(beam.window.FixedWindows(60 * 60))
        )

        (
            window_events
            | 'Group Date By User ID' >> beam.Map(lambda elem: (elem['uid'], elem['ts']))
            | 'Compute Unique User Date' >> beam.CombinePerKey(UniqueFn())
        )

我计划将窗口扩大到一天(24 * 60 * 60),但是一天有很多数据。我不确定google data flow能不能很好地处理它。

有没有更好的解决方案来解决它?

标签: pythongoogle-cloud-dataflowapache-beam

解决方案


数据流应该能够处理大量数据。然而,它们是某些限制,可以通过重组管道来缓解。这只有在您对数据的形状有一个粗略的估计后才能知道。

首先,我认为使用以下代码应该简单明了。

class ExtractWindow(beam.DoFn):
  def process(self, element, window=beam.DoFn.WindowParam):
    val, count = element
    yield (val, count, window.start)


window_events = (
        filtered_events
        | "UseFixedWindow" >> beam.WindowInto(beam.window.FixedWindows(24* 60 * 60))
    )



(
        window_events
        | 'Group Date By User ID' >> beam.Map(lambda elem: elem['uid'])
        | 'Compute Unique User Date' >> beam.combiners.Count.PerElement()
        | 'Extract Window' >> beam.ParDo(ExtractWindow())
    )

推荐阅读