Streamsets gives this error when trying to parse valid JSON

Problem description

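I am setting up StreamSets for a project. The pipeline's origin is a Kafka Consumer. It works for smaller messages, but when a message is larger it throws this error.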

com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input in field name at [Source: com.streamsets.pipeline.api.ext.io.OverrunReader@39c37ffd; line: 1, column: 4191]

I have already set Max Object Length (chars) to 1000000 and the parser.limit property to 10335040, but I cannot figure out what is causing this.


The full stack trace is:

KAFKA_37 - Cannot parse record from message 'rms-search-data::0::61950': com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input in field name at [Source: com.streamsets.pipeline.api.ext.io.OverrunReader@39c37ffd; line: 1, column: 4191]
com.streamsets.pipeline.api.base.OnRecordErrorException: KAFKA_37 - Cannot parse record from message 'rms-search-data::0::61950': com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input in field name
 at [Source: com.streamsets.pipeline.api.ext.io.OverrunReader@39c37ffd; line: 1, column: 4191]
    at com.streamsets.pipeline.stage.origin.kafka.BaseKafkaSource.processKafkaMessageDefault(BaseKafkaSource.java:265)
    at com.streamsets.pipeline.stage.origin.kafka.BaseKafkaSource.processKafkaMessageDefault(BaseKafkaSource.java:224)
    at com.streamsets.pipeline.stage.origin.kafka.StandaloneKafkaSource.produce(StandaloneKafkaSource.java:86)
    at com.streamsets.pipeline.api.base.configurablestage.DSource.produce(DSource.java:38)
    at com.streamsets.datacollector.runner.StageRuntime.lambda$execute$2(StageRuntime.java:283)
    at com.streamsets.pipeline.api.impl.CreateByRef.call(CreateByRef.java:40)
    at com.streamsets.datacollector.runner.StageRuntime.execute(StageRuntime.java:235)
    at com.streamsets.datacollector.runner.StageRuntime.execute(StageRuntime.java:298)
    at com.streamsets.datacollector.runner.StagePipe.process(StagePipe.java:219)
    at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.processPipe(ProductionPipelineRunner.java:810)
    at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.runPollSource(ProductionPipelineRunner.java:554)
    at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.run(ProductionPipelineRunner.java:383)
    at com.streamsets.datacollector.runner.Pipeline.run(Pipeline.java:527)
    at com.streamsets.datacollector.execution.runner.common.ProductionPipeline.run(ProductionPipeline.java:109)
    at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunnable.run(ProductionPipelineRunnable.java:75)
    at com.streamsets.datacollector.execution.runner.standalone.StandaloneRunner.start(StandaloneRunner.java:703)
    at com.streamsets.datacollector.execution.runner.common.AsyncRunner.lambda$start$3(AsyncRunner.java:151)
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:226)
    at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:33)
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:222)
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:226)
    at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:33)
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:222)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at com.streamsets.datacollector.metrics.MetricSafeScheduledExecutorService$MetricsTask.run(MetricSafeScheduledExecutorService.java:100)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input in field name
 at [Source: com.streamsets.pipeline.api.ext.io.OverrunReader@39c37ffd; line: 1, column: 4191]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportInvalidEOF(ParserMinimalBase.java:483)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._parseName2(ReaderBasedJsonParser.java:1716)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._parseName(ReaderBasedJsonParser.java:1700)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextFieldName(ReaderBasedJsonParser.java:921)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringKeyMap(MapDeserializer.java:493)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:362)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:27)
    at com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:3786)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2115)
    at com.fasterxml.jackson.core.JsonParser.readValueAs(JsonParser.java:1627)
    at com.streamsets.datacollector.json.OverrunJsonObjectReaderImpl$MapDeserializer.deserialize(OverrunJsonObjectReaderImpl.java:126)
    at com.streamsets.datacollector.json.OverrunJsonObjectReaderImpl$MapDeserializer.deserialize(OverrunJsonObjectReaderImpl.java:122)
    at com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializer.deserialize(UntypedObjectDeserializer.java:223)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringKeyMap(MapDeserializer.java:517)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:362)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:27)
    at com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:3786)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2115)
    at com.fasterxml.jackson.core.JsonParser.readValueAs(JsonParser.java:1627)
    at com.streamsets.datacollector.json.OverrunJsonObjectReaderImpl$MapDeserializer.deserialize(OverrunJsonObjectReaderImpl.java:126)
    at com.streamsets.datacollector.json.OverrunJsonObjectReaderImpl$MapDeserializer.deserialize(OverrunJsonObjectReaderImpl.java:122)
    at com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializer.deserialize(UntypedObjectDeserializer.java:223)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringKeyMap(MapDeserializer.java:517)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:362)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:27)
    at com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:3786)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2115)
    at com.fasterxml.jackson.core.JsonParser.readValueAs(JsonParser.java:1627)
    at com.streamsets.datacollector.json.OverrunJsonObjectReaderImpl$MapDeserializer.deserialize(OverrunJsonObjectReaderImpl.java:126)
    at com.streamsets.datacollector.json.OverrunJsonObjectReaderImpl$MapDeserializer.deserialize(OverrunJsonObjectReaderImpl.java:122)
    at com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializer.deserialize(UntypedObjectDeserializer.java:223)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringKeyMap(MapDeserializer.java:517)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:362)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:27)
    at com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:3786)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2115)
    at com.fasterxml.jackson.core.JsonParser.readValueAs(JsonParser.java:1627)
    at com.streamsets.datacollector.json.OverrunJsonObjectReaderImpl$MapDeserializer.deserialize(OverrunJsonObjectReaderImpl.java:126)
    at com.streamsets.datacollector.json.OverrunJsonObjectReaderImpl$MapDeserializer.deserialize(OverrunJsonObjectReaderImpl.java:122)
    at com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializer.deserialize(UntypedObjectDeserializer.java:223)
    at com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:3786)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2115)
    at com.fasterxml.jackson.core.JsonParser.readValueAs(JsonParser.java:1627)
    at com.streamsets.datacollector.json.JsonObjectReaderImpl.readObjectFromStream(JsonObjectReaderImpl.java:199)
    at com.streamsets.datacollector.json.OverrunJsonObjectReaderImpl.readObjectFromStream(OverrunJsonObjectReaderImpl.java:196)
    at com.streamsets.datacollector.json.JsonObjectReaderImpl.read(JsonObjectReaderImpl.java:111)
    at com.streamsets.pipeline.lib.parser.json.JsonCharDataParser.parse(JsonCharDataParser.java:70)
    at com.streamsets.pipeline.lib.parser.WrapperDataParserFactory$WrapperDataParser.lambda$parse$0(WrapperDataParserFactory.java:105)
    at com.streamsets.pipeline.api.impl.CreateByRef.call(CreateByRef.java:40)
    at com.streamsets.pipeline.lib.parser.WrapperDataParserFactory$WrapperDataParser.parse(WrapperDataParserFactory.java:105)
    at com.streamsets.pipeline.stage.origin.kafka.BaseKafkaSource.processKafkaMessageDefault(BaseKafkaSource.java:244)
    ... 29 more

This JSON fails:

{"payload":{"data":{"aIndex":"application0502","aType":"application","pIndex":"profile000","pType":"profile","da":{"clientId":"168613","clientType":"1","statusDataList":{"68348":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68348","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:18:59","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68349":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68349","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:18:59","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68351":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68351","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68365":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68365","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:18:59","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68366":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68366","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68367":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68367","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68369":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68367","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68370":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68367","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68371":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68367","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68372":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68367","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"}},"recruiterId":"76866550","isActivity":false},"ignoreParamsForIndexing":{"statusDetailsForAsyncActions":{"clientId":"168613","statusId":"1949","subStatusId":null,"assessmentTestId":"","feedbackFormIds":[],"hiring manager":[],"isBillingEnabled":null,"isOfferGenerationEnabled":null,"statusDataJson":{"assessment":{"action":1,"sendToNew":false,"resendToAll":false,"statusId":"1949","subStatusId":null},"CURR_STATUS_DATE":"2019-05-21 17:18:59"}},"projectDetailsForAsyncActions":{"projectId":"15463"}},"optn":{"_routing":"168613"},"action":22,"activityField":"STATUS_CHANGED"},"dataArray":null,"retryCount":3,"additionalHeaders":{},"routingKey":"168613","topic":"rms-search-data"},"headers":{"AppId":123,"SystemId":"1234","X-TRANSACTION-ID":"27108593751"}}

This JSON succeeds:

{"payload":{"data":{"aIndex":"application0502","aType":"application","pIndex":"profile000","pType":"profile","da":{"clientId":"168613","clientType":"1","statusDataList":{"68348":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68348","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:18:59","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68349":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68349","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:18:59","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68351":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68351","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68365":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68365","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:18:59","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68366":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68366","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68367":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68367","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68369":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68367","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68370":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68367","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68371":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68367","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"}},"recruiterId":"76866550","isActivity":false},"ignoreParamsForIndexing":{"statusDetailsForAsyncActions":{"clientId":"168613","statusId":"1949","subStatusId":null,"assessmentTestId":"","feedbackFormIds":[],"hiring manager":[],"isBillingEnabled":null,"isOfferGenerationEnabled":null,"statusDataJson":{"assessment":{"action":1,"sendToNew":false,"resendToAll":false,"statusId":"1949","subStatusId":null},"CURR_STATUS_DATE":"2019-05-21 17:18:59"}},"projectDetailsForAsyncActions":{"projectId":"15463"}},"optn":{"_routing":"168613"},"action":22,"activityField":"STATUS_CHANGED"},"dataArray":null,"retryCount":3,"additionalHeaders":{},"routingKey":"168613","topic":"rms-search-data"},"headers":{"AppId":123,"SystemId":"1234","X-TRANSACTION-ID":"27108593751"}}

Tags: streamsets, buffer-overrun

Solution


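I wrote a quick pipeline to try to reproduce this, but it works as I would expect. As you did, I had to set Max Object Length (chars) in the Kafka Consumer's Data Format configuration; after that it read and parsed the data just fine.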

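Check that the data is arriving from Kafka intact: duplicate the pipeline, change the Kafka Consumer's Data Format to Text, and send the output to a file. You should then be able to see whether all of the data is being read from the Kafka topic. It is possible that the maximum message size in Kafka has been set to 4k, which would cause the messages to be truncated.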
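As a rough way to inspect such a dump, the sketch below assumes the Text output was written to a file with one Kafka message per line. That file layout and the function names `check_json_message` and `check_dump` are assumptions made for illustration; they are not part of StreamSets. It uses only Python's standard `json` module to report each message's length and whether it parses as complete JSON.

```python
import json


def check_json_message(text):
    """Report the length of one message and whether it parses as complete JSON."""
    result = {"length": len(text), "valid": True, "error_pos": None}
    try:
        json.loads(text)
    except json.JSONDecodeError as exc:
        # exc.pos is the character offset reported by Python's parser.
        result["valid"] = False
        result["error_pos"] = exc.pos
    return result


def check_dump(path):
    """Check every non-empty line of a dump file (assumed: one message per line)."""
    with open(path, encoding="utf-8") as handle:
        return [
            check_json_message(line.rstrip("\n"))
            for line in handle
            if line.strip()
        ]
```

If the failing messages all have roughly the same length (for instance close to 4096 characters) while shorter ones parse cleanly, that would point to truncation before the data reaches the parser rather than to a problem with the JSON itself.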

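Another thing to check is that you are using the correct stage library. In fact, as explained in the comments, that turned out to be the fix: Deep was using the CDH 2.x consumer, and when he changed it to Kafka 0.11.0.0 it started working properly.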

