首页 > 解决方案 > Confluent Kafka SMT:时间戳转换问题

问题描述

我对 Kafka 完全陌生,并且一直在尝试让一些基本用例工作。

我想要完成什么?从 Oracle 中的表将数据加载到 Kafka 主题中。(使用基于查询的 CDC)

目标:当我从主题中读取数据时,获取正确的日期格式。

日期列的数据将作为纪元出现。我在连接器配置中包含了时间戳转换步骤,以将其转换为 dd-MM-yyyy hh:mm:ss 格式,但无法使其正常工作。

以下是我在连接器配置中的内容。我尝试了几种格式组合,将目标类型更改为字符串,但我仍然看到纪元值。

            "incrementing.column.name": "MD_LOAD_NUMBER",
            "validate.non.null": "false",
            "key.converter": "io.confluent.connect.avro.AvroConverter",
            "key.converter.schema.registry.url": "http://localhost:8081",
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "value.converter.schema.registry.url": "http://localhost:8081",
            "value.converter.schemas.enable": "false",

            "transforms":"insrt_dt, updt_dt",
            "transforms.insrt_dt.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
            "transforms.insrt_dt.target.type": "Timestamp",
            "transforms.insrt_dt.field": "MD_INSERT_DATE",
            "transforms.insrt_dt.format": "yyyy-MM-dd HH:mm:ss",
            "transforms.updt_dt.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
            "transforms.updt_dt.target.type": "Timestamp",
            "transforms.updt_dt.field": "MD_UPDATE_DATE",
            "transforms.updt_dt.format": "yyyy-MM-dd HH:mm:ss"

ksql> 从头开始​​打印 biORGANIZATION_DIM;

密钥格式:SESSION(AVRO) or HOPPING(AVRO) or TUMBLING(AVRO) or AVRO or SESSION(PROTOBUF) or HOPPING(PROTOBUF) or TUMBLING(PROTOBUF) or PROTOBUF or SESSION(JSON) or HOPPING(JSON) or TUMBLING(JSON) ) 或 JSON 或 SESSION(JSON_SR) 或 HOPPING(JSON_SR) 或 TUMBLING(JSON_SR) 或 JSON_SR 或 SESSION(KAFKA_INT) 或 HOPPING(KAFKA_INT) 或 TUMBLING(KAFKA_INT) 或 KAFKA_INT 或 SESSION(KAFKA_BIGINT) 或 HOPPING(KAFKA_BIGINT) 或 TUMBLING( KAFKA_BIGINT) 或 KAFKA_BIGINT 或 SESSION(KAFKA_DOUBLE) 或 HOPPING(KAFKA_DOUBLE) 或 TUMBLING(KAFKA_DOUBLE) 或 KAFKA_DOUBLE 或 SESSION(KAFKA_STRING) 或 HOPPING(KAFKA_STRING) 或 TUMBLING(KAFKA_STRING) 或 KAFKA_STRING

值格式:AVRO

行时间:太平洋夏令时间 6/4/20 上午 10:49:19,键:,值:{“ORGANIZATION_KEY”:-1,“ORGANIZATION_ID”:null,“ORGANIZATION_NAME”:“UNASSIGNED”,“SET_OF_BOOKS_ID”:null,“MD_INSERT_DATE “:1404321287000,“MD_UPDATE_DATE”:1404321287000,“MD_DELETED_FLAG”:“N”}

我提到了https://www.confluent.de/blog/kafka-connect-deep-dive-jdbc-source-connector/https://docs.confluent.io/current/connect/transforms/timestampconverter.html

如何将日期转换为易于阅读的格式?感谢您提供任何帮助,并感谢您阅读长篇文章!

版本:融合本地版本本地命令仅适用于单节点开发环境,不适用于生产使用。https://docs.confluent.io/current/cli/index.html

Confluent 社区软件:5.5.0

标签: apache-kafkaapache-kafka-connectconfluent-platform

解决方案


更改为 transforms.insrt_dt.target.type :"string"。它应该可以工作。


推荐阅读