首页 > 解决方案 > 当密钥未引用时,如何进行 kafka rest api 调用?

问题描述

我正在尝试将 kafka 消息发送到 android 应用程序。

我在游戏后期才意识到,唯一的方法是使用 Kafka Rest Proxy。

我已经设置好了,但是我的密钥没有被引用,导致反序列化错误。

/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic my_topic --property print.key=true --from-beginning

我收到 truck/6084/position {"timestamp":1636987919544,"truckId":6084,"driverId":36,"routeId":1177752383,"eventType":"Normal","correlationId":"597220341814695046","latitude":24.7700961485576,"longitude":46.6207663588065}

我已经将这些消息通过典型的消费者几乎一行一行地从融合中运行到 PC 应用程序,没有任何问题。

我的确切错误是

{"error_code":40801,"message":"com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'truck': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')\n at [Source: (byte[])\"truck/7103/position\"; line: 1, column: 7]"}

尝试使用“jsonschema”时

{"error_code":40801,"message":"Error deserializing key/value for partition vehicle_tracking_sysA-5 at offset 0. If needed, please seek past the record to continue consumption."}

任何见解将不胜感激。

克里斯

编辑:

Autoconnected Player Kafka - Received Expection: Sequence contains no matching element trace:   at System.Linq.Enumerable.Single[TSource] (System.Collections.Generic.IEnumerable`1[T] source, System.Func`2[T,TResult] predicate) [0x00000] in <00000000000000000000000000000000>:0 
  at Confluent.Kafka.Impl.Librdkafka.SetDelegates (System.Type nativeMethodsClass) [0x00000] in <00000000000000000000000000000000>:0 
  at Confluent.Kafka.Impl.Librdkafka.TrySetDelegates (System.Collections.Generic.List`1[T] nativeMethodCandidateTypes) [0x00000] in <00000000000000000000000000000000>:0 
  at Confluent.Kafka.Impl.Librdkafka.LoadLinuxDelegates (System.String userSpecifiedPath) [0x00000] in <00000000000000000000000000000000>:0 
  at Confluent.Kafka.Impl.Librdkafka.Initialize (System.String userSpecifiedPath) [0x00000] in <00000000000000000000000000000000>:0 
  at Confluent.Kafka.Consumer`2[TKey,TValue]..ctor (Confluent.Kafka.ConsumerBuilder`2[TKey,TValue] builder) [0x00000] in <00000000000000000000000000000000>:0 
  at Confluent.Kafka.ConsumerBuilder`2[TKey,TValue].Build () [0x00000] in <00000000000000000000000000000000>:0 
  at KafkaConsumer+threadHandle.StartKafkaListener () [0x00000] in <00000000000000000000000000000000>:0 
  at System.Threading.ThreadHelper.ThreadStart_Context (System.Object state) [0x00000] in <00000000000000000000000000000000>:0 
  at System.Threading.ExecutionContext.RunInternal (System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, System.Object state, System.Boolean preserveSyncCtx) [0x00000] in <00000000000000000000000000000000>:0 
  at System.Threading.ExecutionContext.Run (System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, System.Object state, System.Boolean preserveSyncCtx) [0x00000] in <00000000000000000000000000000000>:0 
  at System.Threading.ExecutionContext.Run (System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, System.Object state) [0x00000] in <00000000000000000000000000000000>:0 
  at System.Threading.ThreadHelper.ThreadStart () [0x00000] in <00000000000000000000000000000000>:0

标签: androidserializationapache-kafkakafka-consumer-apikafka-rest

解决方案


您似乎正在尝试使用 Jackson 来解析包含非 json 内容的整个响应。根据您提供的信息,您似乎有 2 个选项

  1. 找到一种方法来丢弃文件中不属于 json 响应的任何内容并将其提供给 Jackson
  2. 手动解析响应,取出您需要的部分

推荐阅读