hive - Simple Hive write does not work
Problem description
I am trying to write a simple POC using Apache Beam and Hive:
public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory
        .fromArgs(args)
        .withValidation()
        .as(PVAOptions.class);

    Pipeline p = Pipeline.create(options);
    p
        .apply(TextIO.read().from("src/test/resources/words.txt"))
        .apply(ParDo.of(new PukeHive()))
        .apply(HCatalogIO.write()
            .withBatchSize(100)
            .withConfigProperties(getHiveConfigProperties())
            .withTable(getHiveTable()));

    p.run().waitUntilFinish();
}

static class PukeHive extends DoFn<String, HCatRecord> {
    @ProcessElement
    public void processElement(ProcessContext c) throws IOException {
        DefaultHCatRecord rec = new DefaultHCatRecord(1);
        rec.set(0, c.element());
        c.output(rec);
    }
}
This results in the exception below. Debugging shows that it happens because Beam's WritableCoder tries to call newInstance() on the abstract class HCatRecord.
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: org.apache.beam.sdk.coders.CoderException: unable to deserialize record
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish (DirectRunner.java:349)
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish (DirectRunner.java:319)
at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:210)
at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:66)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:311)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:297)
at com.comp.beam.Main.main (Main.java:48)
at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke (Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
at java.lang.Thread.run (Thread.java:748)
Caused by: java.lang.RuntimeException: org.apache.beam.sdk.coders.CoderException: unable to deserialize record
at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add (ImmutabilityCheckingBundleFactory.java:114)
at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output (ParDoEvaluator.java:242)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue (SimpleDoFnRunner.java:219)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.access$700 (SimpleDoFnRunner.java:69)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output (SimpleDoFnRunner.java:517)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output (SimpleDoFnRunner.java:505)
at com.comp.beam.Main$PukeHive.processElement (Main.java:61)
Caused by: org.apache.beam.sdk.coders.CoderException: unable to deserialize record
at org.apache.beam.sdk.io.hadoop.WritableCoder.decode (WritableCoder.java:92)
at org.apache.beam.sdk.io.hadoop.WritableCoder.decode (WritableCoder.java:54)
at org.apache.beam.sdk.coders.Coder.decode (Coder.java:170)
at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream (CoderUtils.java:122)
at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray (CoderUtils.java:105)
at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray (CoderUtils.java:99)
at org.apache.beam.sdk.util.CoderUtils.clone (CoderUtils.java:148)
at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init> (MutationDetectors.java:117)
at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder (MutationDetectors.java:46)
at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add (ImmutabilityCheckingBundleFactory.java:112)
at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output (ParDoEvaluator.java:242)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue (SimpleDoFnRunner.java:219)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.access$700 (SimpleDoFnRunner.java:69)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output (SimpleDoFnRunner.java:517)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output (SimpleDoFnRunner.java:505)
at com.comp.beam.Main$PukeHive.processElement (Main.java:61)
at com.comp.beam.Main$PukeHive$DoFnInvoker.invokeProcessElement (Unknown Source)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement (SimpleDoFnRunner.java:185)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.processElement (SimpleDoFnRunner.java:149)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows (SimplePushbackSideInputDoFnRunner.java:78)
at org.apache.beam.runners.direct.ParDoEvaluator.processElement (ParDoEvaluator.java:189)
at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement (DoFnLifecycleManagerRemovingTransformEvaluator.java:55)
at org.apache.beam.runners.direct.DirectTransformExecutor.processElements (DirectTransformExecutor.java:161)
at org.apache.beam.runners.direct.DirectTransformExecutor.run (DirectTransformExecutor.java:125)
at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
at java.util.concurrent.FutureTask.run (FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
at java.lang.Thread.run (Thread.java:748)
Caused by: java.lang.InstantiationException
at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance (InstantiationExceptionConstructorAccessorImpl.java:48)
at java.lang.reflect.Constructor.newInstance (Constructor.java:423)
at org.apache.beam.sdk.io.hadoop.WritableCoder.decode (WritableCoder.java:85)
at org.apache.beam.sdk.io.hadoop.WritableCoder.decode (WritableCoder.java:54)
at org.apache.beam.sdk.coders.Coder.decode (Coder.java:170)
at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream (CoderUtils.java:122)
at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray (CoderUtils.java:105)
at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray (CoderUtils.java:99)
at org.apache.beam.sdk.util.CoderUtils.clone (CoderUtils.java:148)
at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init> (MutationDetectors.java:117)
at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder (MutationDetectors.java:46)
at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add (ImmutabilityCheckingBundleFactory.java:112)
at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output (ParDoEvaluator.java:242)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue (SimpleDoFnRunner.java:219)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.access$700 (SimpleDoFnRunner.java:69)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output (SimpleDoFnRunner.java:517)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output (SimpleDoFnRunner.java:505)
at com.comp.beam.Main$PukeHive.processElement (Main.java:61)
at com.comp.beam.Main$PukeHive$DoFnInvoker.invokeProcessElement (Unknown Source)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement (SimpleDoFnRunner.java:185)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.processElement (SimpleDoFnRunner.java:149)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows (SimplePushbackSideInputDoFnRunner.java:78)
at org.apache.beam.runners.direct.ParDoEvaluator.processElement (ParDoEvaluator.java:189)
at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement (DoFnLifecycleManagerRemovingTransformEvaluator.java:55)
at org.apache.beam.runners.direct.DirectTransformExecutor.processElements (DirectTransformExecutor.java:161)
at org.apache.beam.runners.direct.DirectTransformExecutor.run (DirectTransformExecutor.java:125)
at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
at java.util.concurrent.FutureTask.run (FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
at java.lang.Thread.run (Thread.java:748)
How can I feed data into Hive using Beam?
Solution
I believe you need to register a coder for HCatRecord:
Pipeline p = Pipeline.create(options);
p.getCoderRegistry()
    .registerCoderForClass(HCatRecord.class, WritableCoder.of(DefaultHCatRecord.class));
To test this, I added the following to the HCatalogIOTest class in the Beam project. It uses a different schema, but it should demonstrate a complete example:
@Test
@NeedsEmptyTestTables
public void testSOKricket() {
    // Register the coder
    defaultPipeline
        .getCoderRegistry()
        .registerCoderForClass(HCatRecord.class, WritableCoder.of(DefaultHCatRecord.class));

    defaultPipeline
        .apply(TextIO.read().from("/tmp/words.txt"))
        .apply(ParDo.of(new PukeHive()))
        .apply(
            HCatalogIO.write()
                .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
                .withDatabase(TEST_DATABASE)
                .withTable(TEST_TABLE)
                .withPartition(new java.util.HashMap<>())
                .withBatchSize(1L));

    defaultPipeline.run();
}

static class PukeHive extends DoFn<String, HCatRecord> {
    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
        // our test schema is (mycol1 string, mycol2 int)
        DefaultHCatRecord rec = new DefaultHCatRecord(2);
        rec.set(0, c.element());
        rec.set(1, 1);
        c.output(rec);
    }
}
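As an aside, the root cause is plain Java reflection: you cannot newInstance() an abstract class, which is exactly what WritableCoder attempts when it is bound to HCatRecord rather than DefaultHCatRecord. The following stdlib-only sketch reproduces that failure mode; AbstractRecord and ConcreteRecord are hypothetical stand-ins for Beam's types, not the real classes:

```java
public class AbstractInstantiationDemo {
    // Hypothetical stand-in for the abstract HCatRecord class
    static abstract class AbstractRecord {}
    // Hypothetical stand-in for DefaultHCatRecord, a concrete subclass
    static class ConcreteRecord extends AbstractRecord {}

    // Mimics what a reflective decoder does: create an instance via the no-arg constructor.
    static boolean canInstantiate(Class<?> cls) {
        try {
            cls.getDeclaredConstructor().newInstance();
            return true;
        } catch (ReflectiveOperationException e) {
            // An abstract class lands here with an InstantiationException,
            // just like WritableCoder.decode() in the stack trace above.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(canInstantiate(AbstractRecord.class)); // false
        System.out.println(canInstantiate(ConcreteRecord.class)); // true
    }
}
```

This is why registering the coder against the concrete DefaultHCatRecord class resolves the error: the coder then has an instantiable type to deserialize into.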