首页 > 解决方案 > 在数据流中循环 - 用于操作的 apache 束

问题描述

我在appache Beam中写了一个日常批处理操作的代码——数据流python代码。我正在尝试制作日期范围跑步者。即目前它昨天运行良好 - 如果我想从开始日期运行到结束日期它很困难。请求您为此建议任何方法。请找到我昨天运行的代码片段。

start_date = '20180101'
end_date = '20190101'


p = beam.Pipeline(options=options)

read = (
        p
        | 'BQRead: ' >> BQReader(
    query=test_query.format(date=date))
)

transformed = (
        read
        | 'Transform 1 ' >> beam.ParDo(Transform1())
)

transformed | 'BQWrite' >> BQWriter(table + date, table_schema)

我尝试如下,但它不工作

start_date = datetime.strptime('20190101', "%Y%m%d")
end_date = datetime.strptime('20190110', "%Y%m%d")
dates = list(rrule.rrule(rrule.DAILY, dtstart=start_date, until=end_date))

for date in dates:
    ds_nd = date.strftime('%Y%m%d')

    p = beam.Pipeline(options=options)

    read = (
        p
        | 'BQRead: ' >> BQReader(
    query=test_query.format(date=ds_nd))
    )

    transformed = (
        read
        | 'Transform 1 ' >> beam.ParDo(Transform1())
    )

    transformed | 'BQWrite' >> BQWriter(table + ds_nd, table_schema)

标签: python-2.7apache-beamdataflow

解决方案


推荐阅读