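python-3.x - Beam - Filtering out records from BigQuery
Problem description
I am new to Apache Beam and I am trying to do three things:
- Read the top 30 items from the table
- Read the top 30 stores from the table
- Select the required columns from BigQuery and apply a Filter on the Items and Stores columns.
I have the following code to run the pipeline: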
with beam.Pipeline(options=pipeline_args) as p:
    #read the dataset from bigquery
    query_top_30_items = (
        p
        | 'GetTopItemNumbers' >> beam.io.ReadFromBigQuery(
            query="""SELECT item_number, COUNT(item_number) AS freq_count FROM
            [bigquery-public-data.iowa_liquor_sales.sales] GROUP BY item_number
            ORDER BY freq_count DESC
            LIMIT 30"""
        )
        | 'ReadItemNumbers' >> beam.Map(lambda elem: elem['item_number'])
        | 'ItemNumberAsList' >> beam.combiners.ToList()
    )
    query_top_30_stores = (
        p
        | 'GetTopStores' >> beam.io.ReadFromBigQuery(
            query = """SELECT store_number, COUNT(store_number) AS store_count
            FROM [bigquery-public-data.iowa_liquor_sales.sales] GROUP BY
            store_number ORDER BY store_count DESC LIMIT 30"""
        )
        | 'ReadStoreValues' >> beam.Map(lambda elem:elem['store_number'])
        | 'StoreValuesAsList' >> beam.combiners.ToList()
    )
    query_whole_table = (
        (query_top_30_items, query_top_30_stores)
        |'ReadTable' >> beam.io.ReadFromBigQuery(
            query="""SELECT item_number, store_number, bottles_sold,
            state_bottle_retail FROM [bigquery-public-data.iowa_liquor_sales.sales]""")
        | 'FilterByItems' >> beam.Filter(lambda row:row['item_number'] in query_top_30_items)
        | 'FilterByStore' >> beam.Filter(lambda row:row['store_number'] in query_top_30_stores)
    )
I have attached the traceback for reference. How can I resolve this error?
Traceback (most recent call last):
  File "run.py", line 113, in <module>
    run()
  File "run.py", line 100, in run
    | 'FilterByStore' >> beam.Filter(lambda row:row['store_number'] in query_top_30_stores)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 1058, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 573, in __ror__
    result = p.apply(self, pvalueish, label)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 646, in apply
    return self.apply(transform, pvalueish)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 689, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 188, in apply
    return m(transform, input, options)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 218, in apply_PTransform
    return transform.expand(input)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1881, in expand
    temp_location = pcoll.pipeline.options.view_as(
AttributeError: 'tuple' object has no attribute 'pipeline'
Since I am new to Beam, the code is not very optimized. If it can be optimized further, please let me know.
Thank you for your time and help!
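Solution
Applying a filter condition against a function's result in this way does not work inside a pipeline. You have two options for achieving the same thing:
- Apply the filter condition within the pipeline.
- Apply the filter condition in the BigQuery SQL.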
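For the first option, one common way to filter inside the pipeline is to pass the two top-30 lists to the filter as side inputs. The following is a minimal sketch, not a drop-in fix: the names `in_top_lists`, `filter_rows` and `run_demo` are made up for illustration, and the demo uses small in-memory data via `beam.Create` in place of the BigQuery reads.

```python
def in_top_lists(row, top_items, top_stores):
    """Keep a row only if its item and its store are both in the given lists."""
    return (row['item_number'] in top_items
            and row['store_number'] in top_stores)


def filter_rows(rows, top_items, top_stores):
    """Filter `rows`, passing the two list PCollections as side inputs.

    `top_items` and `top_stores` are assumed to be outputs of
    beam.combiners.ToList(), so each holds exactly one element (a list).
    """
    # Imported here so the predicate above can be tested without Beam installed.
    import apache_beam as beam

    return rows | 'FilterByItemAndStore' >> beam.Filter(
        in_top_lists,
        top_items=beam.pvalue.AsSingleton(top_items),
        top_stores=beam.pvalue.AsSingleton(top_stores),
    )


def run_demo():
    """Run the filter on in-memory data (hypothetical values, no BigQuery)."""
    import apache_beam as beam

    with beam.Pipeline() as p:
        top_items = (p | 'CreateItems' >> beam.Create(['A', 'B'])
                       | 'ItemsAsList' >> beam.combiners.ToList())
        top_stores = (p | 'CreateStores' >> beam.Create([1, 2])
                        | 'StoresAsList' >> beam.combiners.ToList())
        rows = p | 'CreateRows' >> beam.Create([
            {'item_number': 'A', 'store_number': 1},  # passes both checks
            {'item_number': 'C', 'store_number': 1},  # item not in the list
            {'item_number': 'B', 'store_number': 9},  # store not in the list
        ])
        filter_rows(rows, top_items, top_stores) | 'Print' >> beam.Map(print)
```

Calling `run_demo()` with Beam installed runs this on the local runner. In the real pipeline, `rows` would come from `p | beam.io.ReadFromBigQuery(...)`, applied to the pipeline object `p` rather than to a tuple of PCollections.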
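A filter condition on a function is ambiguous about what the function returns to its caller. In the posted code, query_top_30_items and query_top_30_stores are PCollections, not Python lists, so the `in` test inside the lambdas cannot check membership against them. The AttributeError itself arises because ReadFromBigQuery is applied to the tuple (query_top_30_items, query_top_30_stores) instead of to the pipeline object p. So modify your code to apply the filter condition in either of the two places listed above.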
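For the second option, the filtering can be pushed into a single query so that Beam only reads rows that already match. The sketch below builds such a query as a string. It assumes standard SQL (the question uses legacy bracket syntax), the helper names `top_n_subquery`, `build_filtered_query` and `read_filtered_rows` are hypothetical, and the query has not been run against the dataset here.

```python
TABLE = 'bigquery-public-data.iowa_liquor_sales.sales'


def top_n_subquery(column, n=30, table=TABLE):
    """Subquery returning the n most frequent values of `column`."""
    return (f"SELECT {column} FROM `{table}` "
            f"GROUP BY {column} ORDER BY COUNT({column}) DESC LIMIT {n}")


def build_filtered_query(n=30, table=TABLE):
    """Select the required columns, keeping only top-n items and top-n stores."""
    return (
        "SELECT item_number, store_number, bottles_sold, state_bottle_retail "
        f"FROM `{table}` "
        f"WHERE item_number IN ({top_n_subquery('item_number', n, table)}) "
        f"AND store_number IN ({top_n_subquery('store_number', n, table)})"
    )


def read_filtered_rows(pipeline):
    """Read the pre-filtered rows; `pipeline` is the beam.Pipeline object."""
    # Imported here so the query builders can be tested without Beam installed.
    import apache_beam as beam

    return pipeline | 'ReadFilteredRows' >> beam.io.ReadFromBigQuery(
        query=build_filtered_query(),
        use_standard_sql=True,  # the backtick table syntax needs standard SQL
    )
```

With this approach the three reads collapse into one, which also addresses the optimization question: the top-30 lists never need to be materialized in the pipeline.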