Beam - Filter out records from BigQuery

Problem description

I am new to Apache Beam and I am trying to accomplish three tasks:

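  1. Read the top 30 items from the table.
  2. Read the top 30 stores from the table.
  3. Select the required columns from BigQuery and apply a filter on the columns Items and Stores.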

I have the following code for running the pipeline:

with beam.Pipeline(options=pipeline_args) as p:
        #read the dataset from bigquery
        query_top_30_items = (
            p 
            | 'GetTopItemNumbers' >> beam.io.ReadFromBigQuery(
                query="""SELECT item_number, COUNT(item_number) AS freq_count FROM 
                [bigquery-public-data.iowa_liquor_sales.sales] GROUP BY item_number 
                ORDER BY freq_count DESC
                LIMIT 30"""
            )
            | 'ReadItemNumbers' >> beam.Map(lambda elem: elem['item_number'])
            | 'ItemNumberAsList' >> beam.combiners.ToList()
        )


        query_top_30_stores = (
            p
            | 'GetTopStores' >> beam.io.ReadFromBigQuery(
                query="""SELECT store_number, COUNT(store_number) AS store_count
                 FROM [bigquery-public-data.iowa_liquor_sales.sales] GROUP BY
                 store_number ORDER BY store_count DESC LIMIT 30"""
            )
            | 'ReadStoreValues' >> beam.Map(lambda elem:elem['store_number'])
            | 'StoreValuesAsList' >> beam.combiners.ToList()
        )

        query_whole_table = (
            (query_top_30_items, query_top_30_stores)
            |'ReadTable' >> beam.io.ReadFromBigQuery(
                query="""SELECT item_number, store_number, bottles_sold,
                    state_bottle_retail  FROM [bigquery-public-data.iowa_liquor_sales.sales]""")
            | 'FilterByItems' >> beam.Filter(lambda row:row['item_number'] in query_top_30_items)
            | 'FilterByStore' >> beam.Filter(lambda row:row['store_number'] in query_top_30_stores)
        )

I have attached the traceback for reference. How can I fix this error?

Traceback (most recent call last):
  File "run.py", line 113, in <module>
    run()
  File "run.py", line 100, in run
    | 'FilterByStore' >> beam.Filter(lambda row:row['store_number'] in query_top_30_stores)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 1058, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 573, in __ror__
    result = p.apply(self, pvalueish, label)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 646, in apply
    return self.apply(transform, pvalueish)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 689, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 188, in apply
    return m(transform, input, options)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 218, in apply_PTransform
    return transform.expand(input)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1881, in expand
    temp_location = pcoll.pipeline.options.view_as(
AttributeError: 'tuple' object has no attribute 'pipeline'

Since I am new to Beam, the code is not very optimized. Please let me know if I can optimize it further.

Thanks for your time and help!

Tags: python-3.x, apache-beam, dataflow

Solution


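Applying the filter condition through a function, the way your lambdas do, does not work in a pipeline: query_top_30_items and query_top_30_stores are PCollections (deferred handles to data that only exists once the pipeline runs), not Python lists, so row['item_number'] in query_top_30_items cannot test membership against their contents. You have two options to achieve the same result: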

  1. Apply the filter condition inside the pipeline.
  2. Apply the filter condition in the BigQuery SQL query.

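Separately, beam.io.ReadFromBigQuery is a source and must be applied to the pipeline p itself; piping the tuple (query_top_30_items, query_top_30_stores) into it is what raises AttributeError: 'tuple' object has no attribute 'pipeline'. So modify your code to read the table directly from p and apply the filter condition in either of the two places listed above.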
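For option 2, the filter can be pushed into a single standard-SQL query with IN subqueries, so BigQuery does the filtering and Beam only reads the matching rows, which also addresses the optimization question. The sketch below only builds that query string; top_n_subquery, build_filtered_query and TABLE are hypothetical names, and the SQL is an untested sketch that is worth trying in the BigQuery console first.

```python
# Hypothetical constant: the public table used in the question.
TABLE = 'bigquery-public-data.iowa_liquor_sales.sales'


def top_n_subquery(column, n=30):
    """Subquery returning the n most frequent values of one column."""
    return (f'SELECT {column} FROM `{TABLE}` '
            f'GROUP BY {column} ORDER BY COUNT({column}) DESC LIMIT {n}')


def build_filtered_query(n=30):
    """Standard-SQL query keeping only rows from the top-n items and stores."""
    return (
        'SELECT item_number, store_number, bottles_sold, state_bottle_retail\n'
        f'FROM `{TABLE}`\n'
        f"WHERE item_number IN ({top_n_subquery('item_number', n)})\n"
        f"  AND store_number IN ({top_n_subquery('store_number', n)})"
    )


if __name__ == '__main__':
    print(build_filtered_query())
```

In the pipeline, the whole read-and-filter step then reduces to p | beam.io.ReadFromBigQuery(query=build_filtered_query(), use_standard_sql=True).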

