python - Writing BigQuery results to GCS in CSV format using Apache Beam
Question
I am new to Apache Beam, and I am trying to write a pipeline that extracts data from Google BigQuery and writes it to GCS in CSV format using Python.
Using beam.io.Read(beam.io.BigQuerySource()) I am able to read data from BigQuery, but I am not sure how to write it to GCS in CSV format.
Is there a custom function to achieve this? Could you please help me?
import logging

import apache_beam as beam
from apache_beam.io.gcp.bigquery import BigQueryDisposition

PROJECT = 'project_id'
BUCKET = 'project_bucket'

def run():
    argv = [
        '--project={0}'.format(PROJECT),
        '--job_name=readwritebq',
        '--save_main_session',
        '--staging_location=gs://{0}/staging/'.format(BUCKET),
        '--temp_location=gs://{0}/staging/'.format(BUCKET),
        '--runner=DataflowRunner'
    ]
    with beam.Pipeline(argv=argv) as p:
        # Execute the SQL in BigQuery and read the result set.
        BQ_SQL_TO_TABLE = p | 'read_bq_view' >> beam.io.Read(
            beam.io.BigQuerySource(query='Select * from `dataset.table`',
                                   use_standard_sql=True))

        # Extract data from BigQuery to GCS in CSV format.
        # This is where I need your help
        BQ_SQL_TO_TABLE | 'Write_bq_table' >> beam.io.WriteToBigQuery(
            table='tablename',
            dataset='datasetname',
            project='project_id',
            schema='name:string,gender:string,count:integer',
            create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=BigQueryDisposition.WRITE_TRUNCATE)

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
Solution
You can use WriteToText with a .csv file_name_suffix and a header. Keep in mind that you need to parse the query results into CSV format yourself. As an example, I used the Shakespeare public dataset and the following query:
SELECT word, word_count, corpus FROM `bigquery-public-data.samples.shakespeare` WHERE CHAR_LENGTH(word) > 3 ORDER BY word_count DESC LIMIT 10
We now read the query results:
BQ_DATA = p | 'read_bq_view' >> beam.io.Read(
beam.io.BigQuerySource(query=query, use_standard_sql=True))
BQ_DATA now contains key-value pairs:
{u'corpus': u'hamlet', u'word': u'HAMLET', u'word_count': 407}
{u'corpus': u'kingrichardiii', u'word': u'that', u'word_count': 319}
{u'corpus': u'othello', u'word': u'OTHELLO', u'word_count': 313}
We can apply a beam.Map function to keep only the values:
BQ_VALUES = BQ_DATA | 'read values' >> beam.Map(lambda x: list(x.values()))
An excerpt of BQ_VALUES:
[u'hamlet', u'HAMLET', 407]
[u'kingrichardiii', u'that', 319]
[u'othello', u'OTHELLO', 313]
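Note that the column order above follows the ordering of each row's dict values. If you need a guaranteed column order, you can project the fields explicitly instead. A minimal sketch (row_to_values is a hypothetical helper; the field names are assumed from the query above):

```python
# Project fields in an explicit, fixed order instead of relying on
# dict.values() ordering (field names assumed from the query above).
FIELDS = ['word', 'word_count', 'corpus']

def row_to_values(row, fields=FIELDS):
    """Return one BigQuery row dict's values in the order given by fields."""
    return [row[f] for f in fields]

print(row_to_values({u'corpus': u'hamlet', u'word': u'HAMLET', u'word_count': 407}))
# ['HAMLET', 407, 'hamlet']
```

In the pipeline this would replace the lambda: beam.Map(row_to_values).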
Finally, map again so that all column values are joined by commas instead of remaining a list (keep in mind that double quotes need to be escaped if they can appear inside a field):
BQ_CSV = BQ_VALUES | 'CSV format' >> beam.Map(
lambda row: ', '.join(['"'+ str(column) +'"' for column in row]))
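Wrapping each value in double quotes by hand breaks if a field itself contains a quote. A safer alternative (my suggestion, not part of the original answer) is to serialize each row with the standard csv module, which handles quoting and escaping for you:

```python
import csv
import io

def to_csv_line(values):
    """Serialize one row with csv.writer so embedded quotes and commas
    are escaped per the CSV rules (quotes are doubled)."""
    buf = io.StringIO()
    csv.writer(buf, quoting=csv.QUOTE_ALL).writerow(values)
    return buf.getvalue().rstrip('\r\n')

print(to_csv_line([u'hamlet', u'say "no"', 407]))
# "hamlet","say ""no""","407"
```

This could then be used as beam.Map(to_csv_line) in place of the join above.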
Now we write the results to GCS with the suffix and header:
BQ_CSV | 'Write_to_GCS' >> beam.io.WriteToText(
'gs://{0}/results/output'.format(BUCKET), file_name_suffix='.csv', header='word, word count, corpus')
The written results:
$ gsutil cat gs://$BUCKET/results/output-00000-of-00001.csv
word, word count, corpus
"hamlet", "HAMLET", "407"
"kingrichardiii", "that", "319"
"othello", "OTHELLO", "313"
"merrywivesofwindsor", "MISTRESS", "310"
"othello", "IAGO", "299"
"antonyandcleopatra", "ANTONY", "284"
"asyoulikeit", "that", "281"
"antonyandcleopatra", "CLEOPATRA", "274"
"measureforemeasure", "your", "274"
"romeoandjuliet", "that", "270"