ksqldb - kafka 流中的 Consumer_failed_message:记录未从主题推送
问题描述
我有一个流程,从 IBM 大型机 IIDR,我将记录发送到 Kafka 主题。进入 Kafka 主题的value_format
消息是 AVRO,密钥也是 AVRO 格式。记录被推送到 Kafka 主题中。我有一个与该主题相关的流。但是记录不会传递到流中。主题示例test_iidr
-
rowtime: 5/30/20 7:06:34 PM UTC, key: {"col1": "A", "col2": 1}, value: {"col1": "A", "col2": 11, "col3": 2, "iidr_tran_type": "QQ", "iidr_a_ccid": "0", "iidr_a_user": " ", "iidr_src_upd_ts": "2020-05-30 07:06:33.262931000", "iidr_a_member": " "}
流中的 value_format 是 AVRO 并且列名都被检查。
流创建查询 -
CREATE STREAM test_iidr (
col1 STRING,
col2 DECIMAL(2,0),
col3 DECIMAL(1,0),
iidr_tran_type STRING,
iidr_a_ccid STRING,
iidr_a_user STRING,
iidr_src_upd_ts STRING,
iidr_a_member STRING)
WITH (KAFKA_TOPIC='test_iidr', PARTITIONS=1, REPLICAS=3, VALUE_FORMAT='AVRO');
KEY
由于声明中未提及,它是否无法从主题加载到流中WITH
?模式注册表在其中注册了test_iidr-value
和test_iidr-key
主题。
泊坞窗中的key.converter
and设置为 - 。这是在制造这个问题吗?value.converter
Kafka-connect
org.apache.kafka.connect.json.JsonConverter
JsonConverter
我用不同的流创建了一个完全不同的管道,并使用insert into
语句手动插入了相同的数据。有效。只有 IIDR 流不起作用,并且记录不会从主题推送到流中。
我正在使用 Confluent kafka 5.5.0 版。
解决方案
连接配置中的JsonConverter
很可能会将您的 Avro 数据转换为 JSON。
要确定键和值序列化格式,您可以使用PRINT
命令(我可以看到您已经运行)。 PRINT
运行时会输出键值格式。例如:
ksql> PRINT some_topic FROM BEGINNING LIMIT 1;
Key format: JSON or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 5/30/20 7:06:34 PM UTC, key: {"col1": "A", "col2": 1}, value: {"col1": "A", "col2": 11, "col3": 2, "iidr_tran_type": "QQ", "iidr_a_ccid": "0", "iidr_a_user": " ", "iidr_src_upd_ts": "2020-05-30 07:06:33.262931000", "iidr_a_member": " "}
因此,首先要检查的是 PRINT 为键和值输出的格式,然后CREATE
相应地更新您的语句。
请注意,ksqlDB尚不支持 Avro/Json 键,因此您可能希望/需要重新分区数据,请参阅:https ://docs.ksqldb.io/en/latest/developer-guide/syntax-reference/#what-待办事项,如果您的密钥未设置或采用不同格式
旁注:如果值的架构存储在架构注册表中,那么您不需要在 CREATE 语句中定义列,因为 ksqlDB 将从架构注册表加载列
旁注:您不需要PARTITIONS=1, REPLICAS=3
在WITH
现有主题的子句中,只有当您希望 ksqlDB 为您创建主题时。
推荐阅读
- c# - 有时会抛出“System.InvalidOperationException:序列不包含元素”
- web-services - Websphere 自由服务器 JCA(Java 连接器架构)、JNDI 和资源适配器
- javascript - 如何在多标签页上的标签加载时显示第一张幻灯片
- mingw-w64 - windows10 pro 缺少 objdump(安装了 msys2)
- javascript - 如何根据两行数据在谷歌表格上创建if语句
- spring - 为什么@WebFluxTest 在 Spring Boot 2.1.0.RELEASE 中启用 Spring Security
- javascript - 如何在页面中获取所有包含字体的列表?
- javascript - Asp.Net Validation 控件仅与 MS Edge 中的 javascript 函数冲突
- android - 使用 LAME 将原始 PCM 文件转换为 MP3 会返回失真的音频
- git - 致命:配置“protocol.version”的未知值:2