Why can't my RabbitMQ messages be serialized with Apache Beam?

Problem description

I am trying to read a RabbitMQ queue with Apache Beam. I wrote a few transforms that write the messages to Kafka, and I even tested my scenario with simple text messages.

Now I am trying to deploy it with the real messages those transforms are meant to process. These are moderately sized JSON messages.

Strangely, when I try to read the "production" messages, I get this exception stack trace.

java.lang.IllegalArgumentException: Unable to encode element 'ValueWithRecordId{id=[], value=org.apache.beam.sdk.io.rabbitmq.RabbitMqMessage@f179a7f}' with coder 'ValueWithRecordId$ValueWithRecordIdCoder(org.apache.beam.sdk.coders.SerializableCoder@76190fb2)'.
        org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)
        org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
        org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:564)
        org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:480)
        org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:400)
        org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:125)
        org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:64)
        org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1283)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:147)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1020)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString
        java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        java.util.HashMap.internalWriteEntries(HashMap.java:1785)
        java.util.HashMap.writeObject(HashMap.java:1362)
        sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        java.lang.reflect.Method.invoke(Method.java:498)
        java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
        java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:183)
        org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:53)
        org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:105)
        org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:99)
        org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:81)
        org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:297)
        org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
        org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:564)
        org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:480)
        org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:400)
        org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:125)
        org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:64)
        org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1283)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:147)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1020)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)

My understanding is that the RabbitMQ reader considers the messages large enough to require a LongString, which is not serializable.

Am I right about this? If so, how can I tell RabbitMQ to use a plain String (which would be enough for these messages)?
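
For readers following the trace: the `HashMap.writeObject` and `HashMap.internalWriteEntries` frames under `Caused by` suggest that the non-serializable object is a value stored in a map inside the message (most likely its headers) rather than the message body itself. The sketch below reproduces that failure mode with plain Java serialization only. `FakeLongString` is a hypothetical stand-in for the RabbitMQ client's `LongString` implementation, and the header names are made up for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class HeaderSerializationDemo {

    // Hypothetical stand-in for a LongString implementation:
    // it wraps bytes and deliberately does NOT implement java.io.Serializable.
    static final class FakeLongString {
        private final byte[] bytes;

        FakeLongString(String text) {
            this.bytes = text.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public String toString() {
            return new String(bytes, StandardCharsets.UTF_8);
        }
    }

    // Returns true if Java serialization accepts the object,
    // false if it fails with NotSerializableException.
    static boolean isJavaSerializable(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // A map holding only String values serializes without trouble.
        Map<String, Object> plainHeaders = new HashMap<>();
        plainHeaders.put("source", "test");

        // A map holding one non-serializable value fails as a whole.
        Map<String, Object> clientHeaders = new HashMap<>();
        clientHeaders.put("source", new FakeLongString("production"));

        System.out.println(isJavaSerializable(plainHeaders));  // true
        System.out.println(isJavaSerializable(clientHeaders)); // false
    }
}
```

If this reading is right, the size of the message would not be the trigger. Test messages published without string headers would simply never put such a value into the map.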

Tags: rabbitmq, apache-beam

Solution


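This is an Apache Beam bug ( https://issues.apache.org/jira/browse/BEAM-7414 ). A fix exists but has not yet been merged into the Apache Beam repository, out of pure laziness (which is bad). Anyone who needs the fix right away can build my branch: https://github.com/Riduidel/beam/tree/fix/rabbitmq-message-not-serializable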
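
As a rough illustration of the kind of conversion such a fix needs (not necessarily what the linked branch does), the sketch below copies a header map and replaces every value that Java serialization would reject with its `toString()` form, recursing into nested maps and lists. `HeaderSanitizer`, `FakeLongString`, and `serializedSize` are hypothetical names introduced for this example; `FakeLongString` stands in for the RabbitMQ client's non-serializable `LongString` implementation.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HeaderSanitizer {

    // Hypothetical stand-in for a non-serializable LongString implementation.
    static final class FakeLongString {
        private final String text;

        FakeLongString(String text) {
            this.text = text;
        }

        @Override
        public String toString() {
            return text;
        }
    }

    // Returns a Java-serializable copy of a header value.
    // Maps and lists are copied recursively; any other value that is not
    // Serializable is replaced by its toString() representation.
    static Serializable toSerializable(Object value) {
        if (value instanceof Map) {
            HashMap<String, Serializable> copy = new HashMap<>();
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
                copy.put(String.valueOf(entry.getKey()), toSerializable(entry.getValue()));
            }
            return copy;
        }
        if (value instanceof List) {
            ArrayList<Serializable> copy = new ArrayList<>();
            for (Object item : (List<?>) value) {
                copy.add(toSerializable(item));
            }
            return copy;
        }
        if (value == null || value instanceof Serializable) {
            return (Serializable) value;
        }
        return value.toString();
    }

    // Serializes the value with plain Java serialization and returns the byte count.
    static int serializedSize(Serializable value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.size();
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("source", new FakeLongString("production"));
        headers.put("retries", 3);

        Serializable safeHeaders = toSerializable(headers);
        System.out.println(safeHeaders);
        System.out.println(serializedSize(safeHeaders) + " bytes");
    }
}
```

The stack trace shows the encoding failing inside the read step itself (`ReadOperation.runReadLoop`), so a conversion like this would presumably have to live in the message class or its coder. A transform applied after the read would run too late to help.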

