apache-kafka - 在 ksqlDB 中创建的流显示 NULL 值
问题描述
我正在尝试在 ksqlDB 中创建一个流,以从 kafka 主题中获取数据并对其执行查询。
CREATE STREAM test_location (
id VARCHAR,
name VARCHAR,
location VARCHAR
)
WITH (KAFKA_TOPIC='public.location',
VALUE_FORMAT='JSON',
PARTITIONS=10);
主题 public.location 中的数据采用 JSON 格式。
更新的主题消息。
print 'public.location' from beginning limit 1;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: JSON or KAFKA_STRING
rowtime: 2021/05/23 11:27:39.429 Z, key: <null>, value: {"sourceTable":{"id":"1","name":Sam,"location":Manchester,"ConnectorVersion":null,"connectorId":null,"ConnectorName":null,"DbName":null,"DbSchema":null,"TableName":null,"payload":null,"schema":null},"ConnectorVersion":null,"connectorId":null,"ConnectorName":null,"DbName":null,"DbSchema":null,"TableName":null,"payload":null,"schema":null}, partition: 3
创建流后,对创建的流执行 SELECT 后,我在输出中得到 NULL。虽然题目有数据。
select * from test_location
>EMIT CHANGES limit 5;
+-----------------------------------------------------------------+-----------------------------------------------------------------+-----------------------------------------------------------------+
|ID |NAME |LOCATION |
+-----------------------------------------------------------------+-----------------------------------------------------------------+-----------------------------------------------------------------+
|null |null |null |
|null |null |null |
|null |null |null |
|null |null |null |
|null |null |null |
Limit Reached
Query terminated
这是来自 docker 文件的详细信息
version: '2'
services:
ksqldb-server:
image: confluentinc/ksqldb-server:0.18.0
hostname: ksqldb-server
container_name: ksqldb-server
depends_on:
- schema-registry
ports:
- "8088:8088"
environment:
KSQL_LISTENERS: "http://0.0.0.0:8088"
KSQL_BOOTSTRAP_SERVERS: "broker:29092"
KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
# Configuration to embed Kafka Connect support.
KSQL_CONNECT_GROUP_ID: "ksql-connect-01"
KSQL_CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
KSQL_CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
KSQL_CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
KSQL_CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
KSQL_CONNECT_CONFIG_STORAGE_TOPIC: "_ksql-connect-01-configs"
KSQL_CONNECT_OFFSET_STORAGE_TOPIC: "_ksql-connect-01-offsets"
KSQL_CONNECT_STATUS_STORAGE_TOPIC: "_ksql-connect-01-statuses"
KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
KSQL_CONNECT_PLUGIN_PATH: "/usr/share/kafka/plugins"
更新:这是我在 Kafka 中看到的主题中的一条消息
{
"sourceTable": {
"id": "1",
"name": Sam,
"location": Manchester,
"ConnectorVersion": null,
"connectorId": null,
"ConnectorName": null,
"DbName": null,
"DbSchema": null,
"TableName": null,
"payload": null,
"schema": null
},
"ConnectorVersion": null,
"connectorId": null,
"ConnectorName": null,
"DbName": null,
"DbSchema": null,
"TableName": null,
"payload": null,
"schema": null
}
我缺少哪个步骤或配置?
解决方案
给定您的有效负载,您需要声明嵌套模式,因为id
,name
和location
不是 Json 中的“顶级”字段,但它们嵌套在sourceTable
.
CREATE STREAM est_location (
sourceTable STRUCT<id VARCHAR, name VARCHAR, location VARCHAR>
)
在定义模式时不可能“解包”数据,但模式必须与主题中的内容相匹配。除此之外,sourceTable
您还可以将ConnectorVersion
etc 添加到架构中,因为它们也是 JSON 中的“顶级”字段。底线是,ksqlDB 中的该列只能在顶级字段上声明。其他所有内容都是可以使用STRUCT
类型访问的嵌套数据。
当然稍后,当您查询时,您可以通过etcest_location
引用各个字段。sourceTable->id
如果要取消嵌套模式,也可以声明派生的 STREAM:
CREATE STREAM unnested_est_location AS
SELECT sourceTable->id AS id,
sourceTable->name AS name,
sourceTable->location AS location
FROM est_location;
当然,这会将数据写入新主题。
推荐阅读
- cassandra - Cassandra vs RDBMS:聚类列
- python - 使用“in”检查 Pandas 系列中是否存在值
- node.js - 在 Nodejs 中执行命令并将输出附加到控制台
- javascript - 将参数传递给 Firebase Cloud Functions
- sql - 为什么 hasura 控制台不将数据作为我的自定义 sql 函数返回?
- javascript - firebase 托管的反应应用程序返回“您需要启用 JavaScript 才能运行此应用程序”
- pandas - 如何将 DataFrame 列转换为浮动并永久保持这种方式
- ms-access - MS-Access 粘贴期间的字段/记录丢失
- swift - Swift - 结合多种折扣类型
- python - 使用元组格式化字符串时删除尾随逗号