apache-spark - 如何通过 Spark SQL 连接 BigQuery?
问题描述
我有一个简单的 python 代码,其中包括使用具有我的凭据的 JSON 文件连接 bigQuery。
data = pd.read_gbq(SampleQuery, project_id='XXXXXXXX', private_key='filename.json')
这里的 filename.json 具有以下格式:
{
"type": "service_account",
"project_id": "projectId",
"private_key_id": "privateKeyId",
"private_key": "privateKey",
"client_email": "clientEmail",
"client_id": "clientId",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/clientEmail"
}
现在,我需要将此代码移植到 pyspark。但是我很难找到如何使用 Spark SQL 进行查询。我正在使用 AWS EMR 集群来运行这个查询!
任何帮助,将不胜感激!
解决方案
由于使用 Spark SQL 需要 SQLContext 对象,因此需要先配置 SparkContext 以连接到 BigQuery。在我看来,BigQuery 连接器(由 sramalingam24 和 Kenneth Jung 提出)可用于查询 BigQuery 中的数据。
请注意,sramalingam24 提供了一个示例链接,以下是代码摘要:
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)
conf = {
# Input Parameters.
'mapred.bq.project.id': project,
'mapred.bq.gcs.bucket': bucket,
'mapred.bq.temp.gcs.path': input_directory,
'mapred.bq.input.project.id': 'publicdata',
'mapred.bq.input.dataset.id': 'samples',
'mapred.bq.input.table.id': 'shakespeare',
}
table_data = sc.newAPIHadoopRDD(
'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
'org.apache.hadoop.io.LongWritable',
'com.google.gson.JsonObject',
conf=conf)
word_counts = (
table_data
.map(lambda record: json.loads(record[1]))
.map(lambda x: (x['word'].lower(), int(x['word_count'])))
.reduceByKey(lambda x, y: x + y))
sql_context = SQLContext(sc)
(word_counts
.toDF(['word', 'word_count'])
.write.format('json').save(output_directory))
然后,您可以下载其他 Hadoop 集群的连接器 jar。Kenneth Jung 提供的信息链接表明选项 --jar 可用于包含连接器 (--jars gs://spark-lib/bigquery/spark-bigquery-latest.jar),这是包含的选项驱动程序和执行程序类路径上的 jar。
推荐阅读
- java - 如何在使用 google + 登录时询问个人范围的权限?
- vb.net - 如何从模块打开表单(引用项目)
- html - HTML 下拉菜单重定向到另一个页面
- objective-c - Async 块中的 Objective C 变量不会更改 Class 变量值
- node.js - 连接到 AWS DocumentDB 时出现连接错误
- ruby-on-rails - 为什么设计消毒剂返回空哈希
- ecmascript-6 - 使用选中的复选框过滤并使用未选中的复选框清除过滤器
- android - neighborlist.getCid() 总是返回 -1 作为值
- angular - 使用 ngIf 显示嵌套数组中的错误
- google-chrome-extension - 在 chrome 扩展后台脚本中使用 gapi,无效的 cookiePolicy