apache-kafka - 连接到现有 kafka 代理时,ksqlDB 服务器无法启动
问题描述
我在 <myurl>:9092 有一个现有的 kafka 经纪人。该代理正在运行 Apache Kafka 2.2.0 版。我想使用 ksqlDB 对来自该 kafka 代理的主题的数据进行一些流处理。因此,根据https://docs.confluent.io/platform/current/installation/versions-interoperability.html上的兼容性表,我正在使用 Confluent Platform 5.2 版的 ksqlDB 。
我已经入驻bootstrap.servers=<myurl>:9092
了ksql/ksql-server.properties
。
但是,当我尝试通过运行启动 ksql-server 时ksql-server-start etc/ksql/ksql-server.properties
,出现以下错误:
ERROR Failed to start KSQL (io.confluent.ksql.rest.server.KsqlServerMain:53)
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition _confluent-ksql-default__command_topic-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `io.confluent.ksql.rest.server.computation.Command`, problem: `java.lang.NullPointerException`
at [Source: (byte[])"("statement":"CREATE STREAM KSQL_PROCESSING_LOG (logger VARCHAR, level VARCHAR, time BIGINT, message STRUCT<type INT, deserializationError STRUCT<errorMessage VARCHAR, recordB64 VARCHAR, cause ARRAY<VARCHAR>, `topic` VARCHAR>, recordProcessingError STRUCT<errorMessage VARCHAR, record VARCHAR, cause ARRAY<VARCHAR>>, productionError STRUCT<errorMessage VARCHAR>>) WITH(KAFKA_TOPIC='default_ksql_processing_log', VALUE_FORMAT='JSON');","originalProperties":{"ksql.extension.dir":"ext","ksql.streams.ca"[truncated 3011 bytes]; line: 1, column: 3511]
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `io.confluent.ksql.rest.server.computation.Command`, problem: `java.lang.NullPointerException`
at [Source: (byte[])"("statement":"CREATE STREAM KSQL_PROCESSING_LOG (logger VARCHAR, level VARCHAR, time BIGINT, message STRUCT<type INT, deserializationError STRUCT<errorMessage VARCHAR, recordB64 VARCHAR, cause ARRAY<VARCHAR>, `topic` VARCHAR>, recordProcessingError STRUCT<errorMessage VARCHAR, record VARCHAR, cause ARRAY<VARCHAR>>, productionError STRUCT<errorMessage VARCHAR>>) WITH(KAFKA_TOPIC='default_ksql_processing_log', VALUE_FORMAT='JSON');","originalProperties":{"ksql.extension.dir":"ext","ksql.streams.ca"[truncated 3011 bytes]; line: 1, column: 3511]
...
如果我使用本地代理并设置bootstrap.servers=localhost:9092
,ksql-server 启动没有任何问题。
如何解决这个空记录/反序列化问题,以便将 ksqldb 服务器连接到我现有的 kafka 代理?
解决方案
正如@OneCricketeer 指出的那样,问题是由集群上现有的命令主题引起的。ksql.service.id
您可以通过更改ksqlDB 服务器属性来使用新的命令主题。请参阅配置 ksqlDB 服务器。
推荐阅读
- r - Calculating the time spent with dates and time format in r
- c# - 仅下载 Azure Blob 的前几个字节
- maven - 从依赖项中读取 pom.xml 中的文件 - Maven
- cordova - 运行 ionic cordova build 命令时,你能帮我解决这个构建失败的问题吗
- azure-devops-rest-api - 在 AzureDevOps 服务器(本地)中,如何获取上次查看拉取请求时的时间戳?
- php - 如何从 POST 请求中正确输出特定的 cURL 响应
- d3.js - 带有分箱时间数据的 dc 折线图不显示空箱
- javascript - 上传失败,gapi php
- delphi - 在 Delphi 2007 项目中使用 Delphi 10.3 组件甚至是可能的。有解决方法吗?
- css - 如何使图像占据整个网格图块