首页 > 解决方案 > Java JdbcToBigQuery 管道

问题描述

我正在编写一个从 JDBC 读取数据并写入 BigQuery 的 Java 管道，但我想添加一个额外的步骤，检查某一列的值是否满足"长度大于 4 个字符"的条件。应该使用什么方法？

这是我的代码：从 JDBC 读取数据，转换为 TableRow，然后将 TableRow 追加到现有的 BigQuery 表中。

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.io.DynamicJdbcIO;
import com.google.cloud.teleport.templates.common.JdbcConverters;
import com.google.cloud.teleport.util.KMSEncryptedNestedValueProvider;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;

/**
* A template that copies data from a relational database using JDBC to an existing BigQuery table.
*/
public class JdbcToBigQuery {

 private static ValueProvider<String> maybeDecrypt(
     ValueProvider<String> unencryptedValue, ValueProvider<String> kmsKey) {
   return new KMSEncryptedNestedValueProvider(unencryptedValue, kmsKey);
 }

 /**
  * Main entry point for executing the pipeline. This will run the pipeline asynchronously. If
  * blocking execution is required, use the {@link
  * JdbcToBigQuery#run(JdbcConverters.JdbcToBigQueryOptions)} method to start the pipeline and
  * invoke {@code result.waitUntilFinish()} on the {@link PipelineResult}
  *
  * @param args The command-line arguments to the pipeline.
  */
 public static void main(String[] args) {

   // Parse the user options passed from the command-line
   JdbcConverters.JdbcToBigQueryOptions options =
       PipelineOptionsFactory.fromArgs(args)
           .withValidation()
           .as(JdbcConverters.JdbcToBigQueryOptions.class);

   run(options);
 }

 /**
  * Runs the pipeline with the supplied options.
  *
  * @param options The execution parameters to the pipeline.
  * @return The result of the pipeline execution.
  */
 private static PipelineResult run(JdbcConverters.JdbcToBigQueryOptions options) {
   // Create the pipeline
   Pipeline pipeline = Pipeline.create(options);

   /*
    * Steps: 1) Read records via JDBC and convert to TableRow via RowMapper
    *        2) Append TableRow to BigQuery via BigQueryIO
    */
   pipeline
       /*
        * Step 1: Read records via JDBC and convert to TableRow
        *         via {@link org.apache.beam.sdk.io.jdbc.JdbcIO.RowMapper}
        */
       .apply(
           "Read from JdbcIO",
           DynamicJdbcIO.<TableRow>read()
               .withDataSourceConfiguration(
                   DynamicJdbcIO.DynamicDataSourceConfiguration.create(
                           options.getDriverClassName(),
                           maybeDecrypt(options.getConnectionURL(), options.getKMSEncryptionKey()))
                       .withUsername(
                           maybeDecrypt(options.getUsername(), options.getKMSEncryptionKey()))
                       .withPassword(
                           maybeDecrypt(options.getPassword(), options.getKMSEncryptionKey()))
                       .withDriverJars(options.getDriverJars())
                       .withConnectionProperties(options.getConnectionProperties()))
               .withQuery(options.getQuery())
               .withCoder(TableRowJsonCoder.of())
               .withRowMapper(JdbcConverters.getResultSetToTableRow()))
       /*
        * Step 2: Append TableRow to an existing BigQuery table
        */
       .apply(
           "Write to BigQuery",
           BigQueryIO.writeTableRows()
               .withoutValidation()
               .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
               .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
               .withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory())
               .to(options.getOutputTable()));

   // Execute the pipeline and return the result.
   return pipeline.run();
 }
}```

标签: java, google-cloud-platform, pipeline, dataflow

解决方案


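对于您的条件，如果数据是用 BigQueryIO 从 BigQuery 读取的，可以使用 withMethod(Method.DIRECT_READ)，通过 BigQuery Storage API 执行读取操作。若要过滤和验证数据，可以使用列投影和行过滤功能，分别通过 withSelectedFields 和 withRowRestriction 指定。有关其他信息，请参阅此链接。注意：这些选项只适用于从 BigQuery 读取的场景。问题中的管道是从 JDBC 读取的，因此更直接的做法有两种：一是在 "Read from JdbcIO" 和 "Write to BigQuery" 之间插入一个 Filter.by(...) 步骤，只保留指定列的字符串长度大于 4 的 TableRow；二是直接在 JDBC 查询中加入类似 WHERE LENGTH(col) > 4 的条件。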


推荐阅读