首页 > 解决方案 > ksqldb REST API - 如何从带有推送查询的表中接收墓碑值?

问题描述

如何使用ksqldb REST API接收 tombstone 值?

表格示例:

CREATE TABLE movies (
      title VARCHAR PRIMARY KEY,
      id INT,
      release_year INT
    ) WITH (
      KAFKA_TOPIC='movies',
      PARTITIONS=1,
      VALUE_FORMAT = 'JSON'
    );

INSERT INTO MOVIES (ID, TITLE, RELEASE_YEAR) VALUES (48, 'Aliens', 1986);

罗宾。M.发布了如何将墓碑值插入表的解决方法。

通过 REST API 进行的以下 ksql 查询不会收到那些“已删除”的值:

POST /query HTTP/1.1
Accept: application/vnd.ksql.v1+json
Content-Type: application/vnd.ksql.v1+json

{"ksql":"SELECT * FROM Movies EMIT CHANGES;","streamsProperties":{"ksql.streams.auto.offset.reset":"earliest"}}

收到的行插入示例:

[{"header":{"queryId":"none","schema":"`TITLE` STRING, `ID` INTEGER, `RELEASE_YEAR` INTEGER"}},
{"row":{"columns":["Die Hard",2,1998]}},

插入 tombstone 值后的预期响应是具有布尔 row.tombstone 属性的行:

{"row":{"columns":["Die Hard",null,null],"tombstone":true}}

该值不会发送给客户端。这是一个错误还是我做错了什么?如果可以接收这些值,我可以对 /query-stream API 做同样的事情吗?

标签: ksqldb

解决方案


我在 GitHub 上发现了这个问题,所以我尝试了最近发布的 0.15.0 版本,它可以工作。所以这似乎是 v0.14.0 中的一个错误。


推荐阅读