Stream created in ksqlDB shows NULL values

Problem description

I am trying to create a stream in ksqlDB to read data from a Kafka topic and run queries on it.

CREATE STREAM test_location (
  id VARCHAR,
  name VARCHAR,
  location VARCHAR
  )

 WITH (KAFKA_TOPIC='public.location',
       VALUE_FORMAT='JSON',
       PARTITIONS=10);

The data in the topic public.location is in JSON format.

Update: a message from the topic.

print 'public.location' from beginning limit 1;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: JSON or KAFKA_STRING
rowtime: 2021/05/23 11:27:39.429 Z, key: <null>, value: {"sourceTable":{"id":"1","name":Sam,"location":Manchester,"ConnectorVersion":null,"connectorId":null,"ConnectorName":null,"DbName":null,"DbSchema":null,"TableName":null,"payload":null,"schema":null},"ConnectorVersion":null,"connectorId":null,"ConnectorName":null,"DbName":null,"DbSchema":null,"TableName":null,"payload":null,"schema":null}, partition: 3

After creating the stream, running a SELECT against it returns NULL in every column, even though the topic contains data.

select * from test_location
>EMIT CHANGES limit 5;
+-----------------------------------------------------------------+-----------------------------------------------------------------+-----------------------------------------------------------------+
|ID                                                               |NAME                                                            |LOCATION                                                          |
+-----------------------------------------------------------------+-----------------------------------------------------------------+-----------------------------------------------------------------+
|null                                                             |null                                                             |null                                                             |
|null                                                             |null                                                             |null                                                             |
|null                                                             |null                                                             |null                                                             |
|null                                                             |null                                                             |null                                                             |
|null                                                             |null                                                             |null                                                             |
Limit Reached
Query terminated

Here are the details from the Docker Compose file:

version: '2'

services:

  ksqldb-server:
    image: confluentinc/ksqldb-server:0.18.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - schema-registry
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      # Configuration to embed Kafka Connect support.
      KSQL_CONNECT_GROUP_ID: "ksql-connect-01"
      KSQL_CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      KSQL_CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_CONNECT_CONFIG_STORAGE_TOPIC: "_ksql-connect-01-configs"
      KSQL_CONNECT_OFFSET_STORAGE_TOPIC: "_ksql-connect-01-offsets"
      KSQL_CONNECT_STATUS_STORAGE_TOPIC: "_ksql-connect-01-statuses"
      KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_PLUGIN_PATH: "/usr/share/kafka/plugins"

Update: this is one of the messages I see in the Kafka topic:

{
   "sourceTable": {
      "id": "1",
      "name": Sam,
      "location": Manchester,
      "ConnectorVersion": null,
      "connectorId": null,
      "ConnectorName": null,
      "DbName": null,
      "DbSchema": null,
      "TableName": null,
      "payload": null,
      "schema": null
   },
   "ConnectorVersion": null,
   "connectorId": null,
   "ConnectorName": null,
   "DbName": null,
   "DbSchema": null,
   "TableName": null,
   "payload": null,
   "schema": null
}

Which step or configuration am I missing?

Tags: apache-kafka, confluent-platform, ksqldb

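Solution

Given your payload, you need to declare a nested schema: id, name, and location are not top-level fields in the JSON, they are nested inside sourceTable. Because the original stream declares them as top-level columns, ksqlDB finds no matching top-level keys and returns null for each of them.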

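CREATE STREAM est_location (
  sourceTable STRUCT<id VARCHAR, name VARCHAR, location VARCHAR>
) WITH (
  KAFKA_TOPIC='public.location',  -- topic and format taken from the question
  VALUE_FORMAT='JSON'
);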

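You cannot "unwrap" the data while defining the schema; the schema must match what is in the topic. Besides sourceTable, you could also add ConnectorVersion and the other fields to the schema, because they are top-level fields in the JSON as well. The bottom line is that columns in ksqlDB can only be declared on top-level fields. Everything else is nested data that you access through the STRUCT type.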
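As a rough sketch of what declaring some of the other top-level fields could look like: the stream name est_location_full is hypothetical, and the VARCHAR types for ConnectorVersion, connectorId, and DbName are assumptions, since the sample message only shows null for them. JSON fields you do not declare are simply ignored.

```sql
-- Hypothetical stream name; the types of the null-valued fields are assumed.
CREATE STREAM est_location_full (
  sourceTable STRUCT<id VARCHAR, name VARCHAR, location VARCHAR>,
  ConnectorVersion VARCHAR,
  connectorId VARCHAR,
  DbName VARCHAR
) WITH (
  KAFKA_TOPIC='public.location',
  VALUE_FORMAT='JSON'
);
```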

Later, when you query est_location, you can reference the individual fields with the -> operator, for example sourceTable->id.
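A minimal sketch of such a query, assuming the est_location stream defined above. Setting auto.offset.reset to earliest is an addition here, so that messages already in the topic are read rather than only new ones.

```sql
-- Read the topic from the start rather than only newly arriving messages.
SET 'auto.offset.reset' = 'earliest';

-- Dereference the nested fields with the -> operator.
SELECT sourceTable->id AS id,
       sourceTable->name AS name,
       sourceTable->location AS location
FROM est_location
EMIT CHANGES
LIMIT 5;
```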

If you want to unnest the schema, you can also declare a derived STREAM:

CREATE STREAM unnested_est_location AS
  SELECT sourceTable->id AS id,
         sourceTable->name AS name,
         sourceTable->location AS location
  FROM est_location;

Note that this writes the data into a new topic.
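If you would rather choose the name of that new topic yourself, one option is to set it in a WITH clause on the derived stream. This is a sketch only: the topic name location_flat is hypothetical, and the statement is an alternative to the one above, so run one or the other, not both.

```sql
-- Alternative form of the derived stream with an explicit sink topic.
-- 'location_flat' is a hypothetical topic name.
CREATE STREAM unnested_est_location
  WITH (KAFKA_TOPIC='location_flat', VALUE_FORMAT='JSON') AS
  SELECT sourceTable->id AS id,
         sourceTable->name AS name,
         sourceTable->location AS location
  FROM est_location
  EMIT CHANGES;
```

Without the WITH clause, ksqlDB should fall back to a default topic name derived from the stream name.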

