java - java jdbctobigquery 管道
问题描述
我正在编写一个从 JDBC 到 BigQuery 的 Java 管道，但我想添加一个额外的步骤，检查某一列中的值是否满足"长度大于 4 个字符"的条件。我应该使用什么方法？
这是我从 jdbc 读取、转换为 tablerow 并将 tablerow 附加到现有 bigquery 表的代码。
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.io.DynamicJdbcIO;
import com.google.cloud.teleport.templates.common.JdbcConverters;
import com.google.cloud.teleport.util.KMSEncryptedNestedValueProvider;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
/**
* A template that copies data from a relational database using JDBC to an existing BigQuery table.
*/
public class JdbcToBigQuery {
private static ValueProvider<String> maybeDecrypt(
ValueProvider<String> unencryptedValue, ValueProvider<String> kmsKey) {
return new KMSEncryptedNestedValueProvider(unencryptedValue, kmsKey);
}
/**
* Main entry point for executing the pipeline. This will run the pipeline asynchronously. If
* blocking execution is required, use the {@link
* JdbcToBigQuery#run(JdbcConverters.JdbcToBigQueryOptions)} method to start the pipeline and
* invoke {@code result.waitUntilFinish()} on the {@link PipelineResult}
*
* @param args The command-line arguments to the pipeline.
*/
public static void main(String[] args) {
// Parse the user options passed from the command-line
JdbcConverters.JdbcToBigQueryOptions options =
PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(JdbcConverters.JdbcToBigQueryOptions.class);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
private static PipelineResult run(JdbcConverters.JdbcToBigQueryOptions options) {
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
/*
* Steps: 1) Read records via JDBC and convert to TableRow via RowMapper
* 2) Append TableRow to BigQuery via BigQueryIO
*/
pipeline
/*
* Step 1: Read records via JDBC and convert to TableRow
* via {@link org.apache.beam.sdk.io.jdbc.JdbcIO.RowMapper}
*/
.apply(
"Read from JdbcIO",
DynamicJdbcIO.<TableRow>read()
.withDataSourceConfiguration(
DynamicJdbcIO.DynamicDataSourceConfiguration.create(
options.getDriverClassName(),
maybeDecrypt(options.getConnectionURL(), options.getKMSEncryptionKey()))
.withUsername(
maybeDecrypt(options.getUsername(), options.getKMSEncryptionKey()))
.withPassword(
maybeDecrypt(options.getPassword(), options.getKMSEncryptionKey()))
.withDriverJars(options.getDriverJars())
.withConnectionProperties(options.getConnectionProperties()))
.withQuery(options.getQuery())
.withCoder(TableRowJsonCoder.of())
.withRowMapper(JdbcConverters.getResultSetToTableRow()))
/*
* Step 2: Append TableRow to an existing BigQuery table
*/
.apply(
"Write to BigQuery",
BigQueryIO.writeTableRows()
.withoutValidation()
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory())
.to(options.getOutputTable()));
// Execute the pipeline and return the result.
return pipeline.run();
}
}```
解决方案
对于您的条件，最直接的方法是在 JDBC 读取和 BigQuery 写入这两个步骤之间插入一个过滤转换：使用 `org.apache.beam.sdk.transforms.Filter.by(...)`，在谓词中取出该列的值并检查其字符串长度是否大于 4；如果还需要更复杂的校验或分流逻辑，也可以改用 `ParDo` 实现。另外说明一点：`withMethod(Method.DIRECT_READ)`、`withSelectedFields` 和 `withRowRestriction` 属于 BigQueryIO 的*读取*（BigQuery Storage API）一侧，只有当数据源本身是 BigQuery 时才能用它们做列投影和行过滤，并不适用于本例中"JDBC 读取后过滤再写入 BigQuery"的场景。有关其他信息，请参阅此链接。
推荐阅读
- python - python中5 ode系统求解中标量变量错误的无效索引
- reactjs - Expo无法从资产文件夹加载任何图像
- python - 如何使用 subplot 绘制 nrows=12,ncols=2
- r - 无法以 CSV 格式从 Rstudo 导出到 Excel。我究竟做错了什么?下面的错误信息:
- arduino - 如何在 Arduino nano IoT 上在 Wifi 和 BLE 之间切换?
- amazon-sqs - 通过 asin 从 AnyOfferChangedNotification 获取产品 sku
- javascript - 如何更改 C3Chart 工具提示背景颜色?
- vue.js - Vue js表格动作按钮不显示
- angular - 如何在这样的自定义场景中以角度调用http post?
- mysql - 无法将 OKE Kubernetes 集群连接到集群外部的 Oracle MySQL 数据库实例