google-cloud-platform - Reading BigQuery NUMERIC data types from a table using the SchemaAndRecord class
Problem description
While developing my code, I used the snippet below to read table data from BigQuery.
```
PCollection<ReasonCode> gpseEftReasonCodes = input
    .apply("Reading xxyyzz",
        BigQueryIO.read(new ReadTable<ReasonCode>(ReasonCode.class))
            .withoutValidation()
            .withTemplateCompatibility()
            .fromQuery("Select * from dataset.xxyyzz")
            .usingStandardSql()
            .withCoder(SerializableCoder.of(ReasonCode.class)));
```
The ReadTable class:
```
@DefaultSchema(JavaBeanSchema.class)
public class ReadTable<T> implements SerializableFunction<SchemaAndRecord, T> {
    private static final long serialVersionUID = 1L;
    private static Gson gson = new Gson();
    public static final Logger LOG = LoggerFactory.getLogger(ReadTable.class);
    private final Counter countingRecords =
        Metrics.counter(ReadTable.class, "Reading Records EFT Report");
    private Class<T> class1;

    public ReadTable(Class<T> class1) {
        this.class1 = class1;
    }

    public T apply(SchemaAndRecord schemaAndRecord) {
        Map<String, String> mapping = new HashMap<>();
        try {
            GenericRecord s = schemaAndRecord.getRecord();
            org.apache.avro.Schema s1 = s.getSchema();
            for (Field f : s1.getFields()) {
                // Look each field up by name; indexing with a running counter is off by one.
                mapping.put(f.name(),
                    null == s.get(f.name()) ? null : String.valueOf(s.get(f.name())));
            }
            countingRecords.inc();
            JsonElement jsonElement = gson.toJsonTree(mapping);
            return gson.fromJson(jsonElement, class1);
        } catch (Exception mp) {
            LOG.error("Found wrong mapping for the record: " + mapping);
            mp.printStackTrace();
            return null;
        }
    }
}
```
So after reading the data from BigQuery and mapping it from SchemaAndRecord to the POJO, columns whose data type is NUMERIC come back with values like the one below:
last_update_amount=java.nio.HeapByteBuffer[pos=0 lim=16 cap=16]
My expectation was to get the actual value, but instead I get a HeapByteBuffer. I am using Apache Beam 2.12.0. Please let me know if more information is needed.
Second attempt:
```
GenericRecord s = schemaAndRecord.getRecord();
org.apache.avro.Schema s1 = s.getSchema();
for (Field f : s1.getFields()) {
    mapping.put(f.name(),
        null == s.get(f.name()) ? null : String.valueOf(s.get(f.name())));
    if (f.name().equalsIgnoreCase("reason_code_id")) {
        BigDecimal numericValue =
            new Conversions.DecimalConversion()
                .fromBytes((ByteBuffer) s.get(f.name()),
                    Schema.create(s1.getType()), s1.getLogicalType());
        System.out.println("Numeric Con" + numericValue);
    } else {
        System.out.println("Else Condition " + f.name());
    }
}
```
Facing issue:
```
2019-05-24 (14:10:37) org.apache.avro.AvroRuntimeException: Can't create a: RECORD
```
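The exception points at `Schema.create(s1.getType())`: `s1` is the whole record's schema, and `Schema.create` cannot build a RECORD schema (records need fields), so the conversion should be given the *field's* own schema (`f.schema()`) and its logical type instead. Under the hood, BigQuery exports NUMERIC to Avro as a `decimal(38, 9)` logical type: the bytes are a big-endian two's-complement unscaled integer with a fixed scale of 9. As a self-contained sketch using only the JDK (the class and method names here are illustrative, not from the original post):

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;

public class NumericDecode {
    // BigQuery NUMERIC arrives in the Avro export as decimal(38, 9):
    // a big-endian two's-complement unscaled integer with scale 9.
    static BigDecimal fromNumericBytes(ByteBuffer buf) {
        byte[] bytes = new byte[buf.remaining()];
        buf.duplicate().get(bytes); // duplicate() leaves the buffer's position untouched
        return new BigDecimal(new BigInteger(bytes), 9);
    }

    public static void main(String[] args) {
        // 1234.5 is stored as the unscaled integer 1234500000000 (scale 9)
        ByteBuffer buf = ByteBuffer.wrap(BigInteger.valueOf(1_234_500_000_000L).toByteArray());
        System.out.println(fromNumericBytes(buf)); // prints 1234.500000000
    }
}
```

This is the same arithmetic that Avro's `Conversions.DecimalConversion` performs when handed the correct field schema.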
Stack trace:
```
java.io.IOException: Failed to start reading from source: gs://trusted-bucket/mgp/temp/BigQueryExtractTemp/3a5365f1e53d4dd393f0eda15a2c6bd4/000000000000.avro range [0, 65461)
    at org.apache.beam.runners.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:596)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
    at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:411)
    at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:380)
    at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:306)
    at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:135)
    at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:115)
    at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:102)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.avro.AvroRuntimeException: Can't create a: RECORD
    at org.apache.avro.Schema.create(Schema.java:120)
    at com.globalpay.WelcomeEmail.mapRecordToObject(WelcomeEmail.java:118)
    at com.globalpay.WelcomeEmail.access$0(WelcomeEmail.java:112)
    at com.globalpay.WelcomeEmail$1.apply(WelcomeEmail.java:54)
    at com.globalpay.WelcomeEmail$1.apply(WelcomeEmail.java:1)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase$1.apply(BigQuerySourceBase.java:221)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase$1.apply(BigQuerySourceBase.java:214)
    at org.apache.beam.sdk.io.AvroSource$AvroBlock.readNextRecord(AvroSource.java:567)
    at org.apache.beam.sdk.io.BlockBasedSource$BlockBasedReader.readNextRecord(BlockBasedSource.java:209)
    at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.advanceImpl(FileBasedSource.java:484)
    at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.startImpl(FileBasedSource.java:479)
    at org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:249)
    at org.apache.beam.runners.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:593)
    ... 14 more
```
Solution
The overall approach is correct. It is hard to tell what exactly went wrong; if possible, please paste the full stack trace. Also, take a look at the examples of how to use BigQueryIO.read(); they may help: https://beam.apache.org/releases/javadoc/2.13.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.html
Instead of read(), you can use readTableRows() and get the parsed values. Or follow the example of the TableRowParser implementation (which readTableRows() uses) to see how such a parser works: main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L449
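As a sketch of what consuming readTableRows() output can look like: a TableRow implements Map<String, Object>, and NUMERIC values typically surface as decimal strings (as in the BigQuery JSON API), so they can be parsed with BigDecimal. The plain HashMap standing in for a TableRow and the helper name below are illustrative assumptions:

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

public class TableRowNumeric {
    // In a real pipeline the row would be a com.google.api.services.bigquery.model.TableRow,
    // which implements Map<String, Object>; a HashMap stands in for it here.
    static BigDecimal numericField(Map<String, Object> row, String name) {
        Object v = row.get(name);
        // toString() handles both String and BigDecimal representations of NUMERIC.
        return v == null ? null : new BigDecimal(v.toString());
    }

    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        row.put("last_update_amount", "1234.500000000");
        System.out.println(numericField(row, "last_update_amount")); // prints 1234.500000000
    }
}
```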
Update
Apparently, the ability to read rows using Beam schemas was added recently: https://github.com/apache/beam/pull/8620
You should now be able to do something along these lines:
```
p.apply(BigQueryIO.readTableRowsWithSchema())
 .apply(Convert.to(PojoClass.class));
```
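A slightly fuller sketch of that schema-based read might look as follows. The POJO shape and the table reference are assumptions for illustration, not from the original post; with this approach Beam's schema support should map BigQuery NUMERIC to java.math.BigDecimal, so no manual byte decoding is needed:

```java
// Hypothetical POJO; property names are matched against the BigQuery column names.
@DefaultSchema(JavaBeanSchema.class)
public class ReasonCode implements Serializable {
    private BigDecimal lastUpdateAmount;
    public BigDecimal getLastUpdateAmount() { return lastUpdateAmount; }
    public void setLastUpdateAmount(BigDecimal v) { this.lastUpdateAmount = v; }
}

PCollection<ReasonCode> rows =
    p.apply(BigQueryIO.readTableRowsWithSchema()
            .from("project:dataset.xxyyzz"))   // table reference is a placeholder
     .apply(Convert.to(ReasonCode.class));
```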