首页 > 解决方案 > 使用 Scala 将 Bigquery 数据(使用 SQL)导入 S3

问题描述

我是 BigQuery 世界的新手!!!

我的任务是查询 BigQuery 数据(SQL 语句)(并打开其他方法)并上传到 S3 存储桶。查找用于查询数据并加载到 S3 存储桶中的随附代码(未按预期工作)。我们试图避免将数据存储到 GCS 并从那里提取或使用 GSUTIL。

任何指向示例或方法的指针都值得赞赏。

提前致谢

从 BigQuery 中提取数据:

import java.util.UUID
import com.google.cloud.bigquery.{Dataset, JobId, JobInfo, 
QueryJobConfiguration}
import com.gwf.Logging

class BQQuery extends Logging{
    def extractBQData: String  = {
    val bqClient = (new BigQueryClient).createBQClient
    logger.info("projectId: " + bqClient.getOptions.getProjectId)
    val START_DATE = "20180514"
    val END_DATE = "20180514"

    val query = "SELECT " + " FORMAT_TIMESTAMP('%Y-%m-%d %H:%M:%S', 
    TIMESTAMP_SECONDS(SAFE_CAST(visitStartTime+hits.time/1000 AS INT64)), 
    'America/Denver') AS hit_timestamp, " + " FORMAT_TIMESTAMP('%Y-%m-%d 
    %H:%M:%S', TIMESTAMP_SECONDS(SAFE_CAST(visitStartTime AS INT64)), 
    'America/Denver') AS hit_visitStartTime, " + " CONCAT(SUBSTR(date,0,4),'- 
    ',SUBSTR(date,5,2),'-',SUBSTR(date,7,2)) AS date, " + " visitNumber, " + " 
    visitId, " + " fullVisitorId, " + " totals.hits, " + " totals.pageviews, " + 
    " totals.timeOnSite, " + " totals.bounces, " + " totals.newVisits, " + " 
    trafficSource.referralPath, " + " trafficSource.campaign, " + " 
    trafficSource.source, " + " trafficSource.medium, " + " 
    trafficSource.keyword, " + " trafficSource.adContent, " + " device.browser, 
    " + " device.browserVersion, " + " device.browserSize, " + " 
    device.operatingSystem, " + " device.operatingSystemVersion, " + " 
    device.isMobile, " + " device.mobileDeviceBranding, " + " 
    device.flashVersion, " + " device.javaEnabled, " + " device.language, " + " 
    device.screenColors, " + " device.screenResolution, " + " 
    device.deviceCategory, " + " geoNetwork.continent, " + " 
    geoNetwork.subContinent, " + " geoNetwork.country, " + " geoNetwork.region, 
    " + " geoNetwork.metro, " + " hits.type, " + " hits.hitNumber, " + " 
    hits.social.socialInteractionNetwork, " + " 
    hits.social.socialInteractionAction, " + " hits.time/1000 AS hits_time, " + 
    " hits.hour, " + " hits.minute, " + " hits.isSecure, " + " 
    hits.isInteraction, " + " hits.isEntrance, " + " hits.isExit, " + " 
    hits.referer, " + " hits.page.pagePath, " + " hits.page.hostname, " + " 
    hits.page.pageTitle, " + " hits.page.searchKeyword, " + " 
    hits.page.searchCategory, " + " hits.eventInfo.eventCategory, " + " 
    hits.eventInfo.eventAction, " + " hits.eventInfo.eventLabel, " + " 
    hits.eventInfo.eventValue, " + " (SELECT MAX(IF(index=1, value, NULL)) FROM 
    UNNEST(hits.customDimensions)) AS LIAT_event_source_cd1, " + " (SELECT 
    MAX(IF(index=2, value, NULL)) FROM UNNEST(hits.customDimensions)) AS 
    LIAT_event_id_cd2, " + " (SELECT MAX(IF(index=3, value, NULL)) FROM 
    UNNEST(hits.customDimensions)) AS Slider_id_cd3, " + " (SELECT 
    MAX(IF(index=4, value, NULL)) FROM UNNEST(hits.customDimensions)) AS 
    Individual_id_cd4, " + " (SELECT MAX(IF(index=5, value, NULL)) FROM 
    UNNEST(hits.customDimensions)) AS Plan_id_cd5, " + " (SELECT MAX(IF(index=6, 
    value, NULL)) FROM UNNEST(hits.customDimensions)) AS Terminated_status_cd6, 
    " + " (SELECT MAX(IF(index=7, value, NULL)) FROM 
    UNNEST(hits.customDimensions)) AS LIAT_Slider_Changed_Element_cd7, " + " 
    (SELECT MAX(IF(index=8, value, NULL)) FROM UNNEST(hits.customDimensions)) AS 
    LIAT_Page_Name_cd8, " + " (SELECT MAX(IF(index=9, value, NULL)) FROM 
    UNNEST(hits.customDimensions)) AS Login_Status_cd9, " + " (SELECT 
    MAX(IF(index=10, value, NULL)) FROM UNNEST(hits.customDimensions)) AS 
    Debug_cd10, " + " (SELECT MAX(IF(index=11, value, NULL)) FROM 
    UNNEST(hits.customDimensions)) AS Age_cd11, " + " (SELECT MAX(IF(index=12, 
    value, NULL)) FROM UNNEST(hits.customDimensions)) AS Salary_Group_cd12, " + 
    " (SELECT MAX(IF(index=13, value, NULL)) FROM UNNEST(hits.customDimensions)) 
    AS Gender_cd13, " + " (SELECT MAX(IF(index=14, value, NULL)) FROM 
    UNNEST(hits.customDimensions)) AS TransactionId_cd14, " + " (SELECT 
    MAX(IF(index=15, value, NULL)) FROM UNNEST(hits.customDimensions)) AS 
    Transaction_User_Id_cd15, " + " (SELECT MAX(IF(index=16, value, NULL)) FROM 
    UNNEST(hits.customDimensions)) AS Unique_Event_Id_cd16, " + " (SELECT 
    MAX(IF(index=17, value, NULL)) FROM UNNEST(hits.customDimensions)) AS 
    not_set_dimension_cd17, " + " (SELECT MAX(IF(index=18, value, NULL)) FROM 
    UNNEST(hits.customDimensions)) AS ValueUnits_cd18, " + " (SELECT 
    MAX(IF(index=19, value, NULL)) FROM UNNEST(hits.customDimensions)) AS 
    Deferral_Type_Code_cd19, " + " (SELECT MAX(IF(index=20, value, NULL)) FROM 
    UNNEST(hits.customDimensions)) AS Income_Term_cd20, " + " (SELECT 
    MAX(IF(index=21, value, NULL)) FROM UNNEST(hits.customDimensions)) AS 
    Plan_Name_cd21, " + " (SELECT MAX(IF(index=22, value, NULL)) FROM 
    UNNEST(hits.customDimensions)) AS Company_Match_Rule_cd22, " + " (SELECT 
    MAX(IF(index=23, value, NULL)) FROM UNNEST(hits.customDimensions)) AS 
    Company_Match_Description_cd23, " + " (SELECT MAX(IF(index=24, value, NULL)) 
    FROM UNNEST(hits.customDimensions)) AS Has_Company_Match_cd24, " + " (SELECT 
    MAX(IF(index=25, value, NULL)) FROM UNNEST(hits.customDimensions)) AS 
    TimeStamp_cd25, " + " (SELECT MAX(IF(index=26, value, NULL)) FROM 
    UNNEST(hits.customDimensions)) AS Enrollment_Type_cd26, " + " (SELECT 
    MAX(IF(index=27, value, NULL)) FROM UNNEST(hits.customDimensions)) AS 
    test_Liat_Page_cd27, " + " (SELECT MAX(IF(index=28, value, NULL)) FROM 
    UNNEST(hits.customDimensions)) AS hdic_ageselection_cd28, " + " (SELECT 
    MAX(IF(index=29, value, NULL)) FROM UNNEST(hits.customDimensions)) AS 
    hdic_salaryselection_cd29, " + " (SELECT MAX(IF(index=30, value, NULL)) FROM 
    UNNEST(hits.customDimensions)) AS hdic_genderselection_cd30, " + " (SELECT 
    MAX(IF(index=31, value, NULL)) FROM UNNEST(hits.customDimensions)) AS 
    hdic_my_pct_goal_cd31, " + " (SELECT MAX(IF(index=32, value, NULL)) FROM 
    UNNEST(hits.customDimensions)) AS hdic_mycontribution_cd32, " + " (SELECT 
    MAX(IF(index=33, value, NULL)) FROM UNNEST(hits.customDimensions)) AS 
    hdic_mybalance_cd33, " + " (SELECT MAX(IF(index=34, value, NULL)) FROM 
    UNNEST(hits.customDimensions)) AS hdic_peer_pctgoal_cd34, " + " (SELECT 
    MAX(IF(index=35, value, NULL)) FROM UNNEST(hits.customDimensions)) AS 
    hdic_peer_contributionrate_cd35, " + " (SELECT MAX(IF(index=36, value, 
    NULL)) FROM UNNEST(hits.customDimensions)) AS hdic_peer_balance_cd36, " + " 
    (SELECT MAX(IF(index=37, value, NULL)) FROM UNNEST(hits.customDimensions)) 
    AS hdic_top_pct_goal_cd37, " + " (SELECT MAX(IF(index=38, value, NULL)) FROM 
    UNNEST(hits.customDimensions)) AS hdic_top_contributionrate_cd38, " + " 
    (SELECT MAX(IF(index=39, value, NULL)) FROM UNNEST(hits.customDimensions)) 
    AS hdic_top_balance_cd39, " + " (SELECT MAX(IF(index=40, value, NULL)) FROM 
    UNNEST(hits.customDimensions)) AS AgeRange_cd40, " + " (SELECT 
    MAX(IF(index=1, value, NULL)) FROM UNNEST(hits.customMetrics)) AS 
    Slider_value_cm1, " + " (SELECT MAX(IF(index=2, value, NULL)) FROM 
    UNNEST(hits.customMetrics)) AS Slider_previous_value_cm2, " + " (SELECT 
    MAX(IF(index=3, value, NULL)) FROM UNNEST(hits.customMetrics)) AS 
    Slider_Users_cm3, " + " (SELECT MAX(IF(index=4, value, NULL)) FROM 
    UNNEST(hits.customMetrics)) AS Slider_End_Value_cm4, " + " (SELECT 
    MAX(IF(index=5, value, NULL)) FROM UNNEST(hits.customMetrics)) AS 
    Slider_Start_Value_cm5, " + " (SELECT MAX(IF(index=6, value, NULL)) FROM 
    UNNEST(hits.customMetrics)) AS Saved_Retirement_Age_Start_Value_cm6, " + " 
    (SELECT MAX(IF(index=7, value, NULL)) FROM UNNEST(hits.customMetrics)) AS 
    Saved_Retirement_Age_End_Value_cm7, " + " (SELECT MAX(IF(index=8, value, 
    NULL)) FROM UNNEST(hits.customMetrics)) AS 
    Saved_Deferral_Rate_Start_Value_cm8, " + " (SELECT MAX(IF(index=9, value, 
    NULL)) FROM UNNEST(hits.customMetrics)) AS 
    Saved_Deferral_Rate_End_Value_cm9, " + " (SELECT MAX(IF(index=10, value, 
    NULL)) FROM UNNEST(hits.customMetrics)) AS Equity_Start_Value_cm10, " + " 
    (SELECT MAX(IF(index=11, value, NULL)) FROM UNNEST(hits.customMetrics)) AS 
    Equity_End_Value_cm11, " + " (SELECT MAX(IF(index=12, value, NULL)) FROM 
    UNNEST(hits.customMetrics)) AS Bond_Start_Value_cm12, " + " (SELECT 
    MAX(IF(index=13, value, NULL)) FROM UNNEST(hits.customMetrics)) AS 
    Bond_End_Value_cm13, " + " (SELECT MAX(IF(index=14, value, NULL)) FROM 
    UNNEST(hits.customMetrics)) AS Salary_cm14, " + " (SELECT MAX(IF(index=15, 
    value, NULL)) FROM UNNEST(hits.customMetrics)) AS 
    Deferral_Rate_Change_Pct_cm15, " + " (SELECT MAX(IF(index=16, value, NULL)) 
    FROM UNNEST(hits.customMetrics)) AS BNS_Recommended_Dollar_cm16, " + " 
    (SELECT MAX(IF(index=17, value, NULL)) FROM UNNEST(hits.customMetrics)) AS 
    BNS_Recommended_Pct_cm17, " + " (SELECT MAX(IF(index=18, value, NULL)) FROM 
    UNNEST(hits.customMetrics)) AS Participant_Age_cm18, " + " (SELECT 
    MAX(IF(index=19, value, NULL)) FROM UNNEST(hits.customMetrics)) AS 
    Estimated_Monthly_Income_cm19, " + " (SELECT MAX(IF(index=20, value, NULL)) 
    FROM UNNEST(hits.customMetrics)) AS Percent_of_Goal_cm20, " + " (SELECT 
    MAX(IF(index=21, value, NULL)) FROM UNNEST(hits.customMetrics)) AS 
    Income_Goal_cm21, " + " (SELECT MAX(IF(index=22, value, NULL)) FROM 
    UNNEST(hits.customMetrics)) AS Income_Gap_cm22, " + " (SELECT 
    MAX(IF(index=23, value, NULL)) FROM UNNEST(hits.customMetrics)) AS 
    test_Avg_Deferral_Start_cm23, " + " (SELECT MAX(IF(index=24, value, NULL)) 
    FROM UNNEST(hits.customMetrics)) AS test_Avg_Deferral_End_cm24, " + " 
    (SELECT MAX(IF(index=25, value, NULL)) FROM UNNEST(hits.customMetrics)) AS 
    Goal_Base_Salary_New_cm25, " + " (SELECT MAX(IF(index=26, value, NULL)) FROM 
    UNNEST(hits.customMetrics)) AS Goal_Percent_Of_Salary_New_cm26, " + " 
    (SELECT MAX(IF(index=27, value, NULL)) FROM UNNEST(hits.customMetrics)) AS 
    Goal_Base_Salary_Previous_cm27, " + " (SELECT MAX(IF(index=28, value, NULL)) 
    FROM UNNEST(hits.customMetrics)) AS Goal_Percent_Of_Salary_Previous_cm28, " 
    + "  hits.sourcePropertyInfo.sourcePropertyDisplayName, " + "  
    hits.sourcePropertyInfo.sourcePropertyTrackingId " + " FROM `<ProjectID>. 
    <DataSetId>.ga_sessions_*` , UNNEST(hits) as hits " + " WHERE _TABLE_SUFFIX 
    BETWEEN '" + START_DATE + "' AND '" + END_DATE + "' LIMIT 5"
    val queryConfig = 
    QueryJobConfiguration.newBuilder(query).setUseLegacySql(false).build()


    logger.info("query:" + query)
    val jobId = JobId.of(UUID.randomUUID().toString)
    logger.info("Jobid:" + jobId)

    val bQry =   
        bqClient.create(JobInfo.newBuilder(queryConfig)
        .setJobId(jobId).build()).getBigQuery


    /*var queryJob = 
    bqClient.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build())
    queryJob = queryJob.waitFor()

    if (queryJob.getStatus.getError != null) {
    //TODO:
    } else if (queryJob == null) {
    //TODO:
    }
    */
    bQry.getDataset(queryConfig.getDefaultDataset).toString
}

将 JSON 数据读入 DataFrame:

<code>

def loadDataFromGoogleAnalytics : DataFrame = {
    env.sparkSession
  .read
  .json((new BQQuery).extractBQData)
  .toDF()
}

</code>

标签: scalaamazon-s3google-bigquery

解决方案


推荐阅读