java - 将 ByteBuffer 字段写入 Cassandra 的异常
问题描述
我们有一个 apache Beam 数据流作业,它从 Big Query 读取数据,然后使用 Datastax 驱动程序将它们写入 Cassandra 之前将它们转换为 POJO。我最近向表中添加了一个新的 blob 列,并向 POJO 添加了一个 ByteBuffer 字段。
我如何创建 ByteBuffer
String str = objectMapper.writeValueAsString(installSkuAttributes);
byte[] bytes = str.getBytes( StandardCharsets.UTF_8 );
pojo.setInstallAttributes(ByteBuffer.wrap(bytes));
这是管道片段 public void executePipeline() throws Exception {
Pipeline pipeline =
Pipeline.create(jobMetaDataBean.getDataflowPipelineOptions());
.... writeDataToCassandra(installSkuData);
pipeline.run();
在 DAO 编写器中进行必要的更改后,我在运行作业时遇到以下异常。我正在使用 Datastax 驱动程序
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Forbidden IOException when writing to OutputStream
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:349)
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:319)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:210)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:66)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
at com.homedepot.productassortment.fullfeed.dataflow.InstallSkusFullFeed.executePipeline(InstallSkusFullFeed.java:216)
at com.homedepot.productassortment.fullfeed.dataflow.InstallSkusFullFeed.main(InstallSkusFullFeed.java:221)
Caused by: java.lang.IllegalArgumentException: Forbidden IOException when writing to OutputStream
at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:88)
at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:69)
at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:54)
at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:148)
at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:117)
at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:242)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:219)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:69)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:517)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:505)
at com.homedepot.productassortment.fullfeed.dataflow.InstallSkusFullFeed$PrepareDataForCassandraWrite.processElement(InstallSkusFullFeed.java:160)
Caused by: java.io.NotSerializableException: java.nio.HeapByteBuffer
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:166)
at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:52)
at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:99)
at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
at org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:85)
at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:69)
at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:54)
at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:148)
at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:117)
at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:242)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:219)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:69)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:517)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:505)
at com.homedepot.productassortment.fullfeed.dataflow.InstallSkusFullFeed$PrepareDataForCassandraWrite.processElement(InstallSkusFullFeed.java:160)
at com.homedepot.productassortment.fullfeed.dataflow.InstallSkusFullFeed$PrepareDataForCassandraWrite$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:185)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:149)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:78)
at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:189)
at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:55)
at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:161)
at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:125)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
解决方案
真正的问题在这里:
java.io.NotSerializableException: java.nio.HeapByteBuffer
这是由于ByteBuffer
不可序列化的事实造成的,因此它不能用于分布式工作。您可以通过byte[]
直接用作属性来避免这种情况,或者实现一个ByteBuffer
可序列化的包装器。
推荐阅读
- node.js - 在我的 testcafe 项目中尝试 Jenkins,但构建失败
- javascript - Redux thunk 不会更改输入字段的值
- extjs - Ext js 7 现代面板.Resizer 拆分器配置
- python - 我将python转换为c编程。是否可以在另一个函数中定义一个函数?蟒蛇到C
- php - Laravel 8:试图获取非对象的属性“名称”
- python - 通过python将图表从excel复制到powerpoint
- postgresql - 用于创建 PostgreSQL 服务器并启用自定义密钥加密的 Azure ARM 模板
- android - LocationRequest (FusedLocationProviderClient) - 已弃用
- javascript - Emscripten预加载文件我做错了什么?
- python - 问题乘以一个浮点值