python - Beam:如何按日期唯一数据
问题描述
我想从给定数据中获取用户登录天数
uid ts
u1 2019-08-01
u2 2019-08-01
u2 2019-08-01
u1 2019-08-01
u1 2019-08-02
u3 2019-08-02
u1 2019-08-03
u3 2019-08-03
u3 2019-08-03
和结果
uid day number
u1 3
u2 1
u3 2
ts
我当前的解决方案,在一小时后unqiue window
,然后将结果存储redis
set
为uid
key 和ts
value。
class UniqueFn(beam.CombineFn):
def create_accumulator(self):
return set()
def add_input(self, mutable_accumulator, element):
if element not in mutable_accumulator:
mutable_accumulator.add(element)
return mutable_accumulator
def merge_accumulators(self, accumulators):
return set.union(*accumulators)
def extract_output(self, accumulator):
return list(accumulator)
...
window_events = (
filtered_events
| "UseFixedWindow" >> beam.WindowInto(beam.window.FixedWindows(60 * 60))
)
(
window_events
| 'Group Date By User ID' >> beam.Map(lambda elem: (elem['uid'], elem['ts']))
| 'Compute Unique User Date' >> beam.CombinePerKey(UniqueFn())
)
我计划将窗口扩大到一天(24 * 60 * 60),但是一天有很多数据。我不确定google data flow
能不能很好地处理它。
有没有更好的解决方案来解决它?
解决方案
数据流应该能够处理大量数据。然而,它们是某些限制,可以通过重组管道来缓解。这只有在您对数据的形状有一个粗略的估计后才能知道。
首先,我认为使用以下代码应该简单明了。
class ExtractWindow(beam.DoFn):
def process(self, element, window=beam.DoFn.WindowParam):
val, count = element
yield (val, count, window.start)
window_events = (
filtered_events
| "UseFixedWindow" >> beam.WindowInto(beam.window.FixedWindows(24* 60 * 60))
)
(
window_events
| 'Group Date By User ID' >> beam.Map(lambda elem: elem['uid'])
| 'Compute Unique User Date' >> beam.combiners.Count.PerElement()
| 'Extract Window' >> beam.ParDo(ExtractWindow())
)
推荐阅读
- c# - “数据连接的 TLS 会话未恢复或会话与控制连接不匹配”错误
- python - 需要将pdf提取到excel中
- javascript - 获取子元素或组件的焦点信息
- amazon-iam - 提供的角色没有足够的权限访问 CodeDeploy
- sql - SQL 查询以 JSON 格式为 SQLite 返回嵌套的对象数组
- templates - 模板不呈现任何内容,也没有错误,但状态为 200
- java - 春天。Spring 是如何管理 Controller 的
- amazon-web-services - 带有 IRSA 授权错误的 AWS ALB 入口控制器
- java - 如何从java资源文件夹中的文件夹中获取所有文件的列表
- javascript - 从对象数组中过滤