apache-kafka-streams - Apache Kafka ksqlDB 将流转发到现有主题
问题描述
我仍在使用 ksqlDB 来了解它可以做什么和不能做什么,所以我可能会遗漏一些明显的东西。
这是我正在尝试做的事情:
-- Define input stream for raw data
CREATE STREAM user_input (
a VARCHAR,
b VARCHAR
) WITH (
KAFKA_TOPIC = 'user_input',
PARTITIONS = 1,
VALUE_FORMAT = 'JSON'
);
-- Then define table that would hold input data, keyed
CREATE TABLE user_input_keyed (
key VARCHAR PRIMARY KEY,
a VARCHAR,
B VARCHAR
) WITH (
KAFKA_TOPIC = 'user_input_keyed',
PARTITIONS = 1,
VALUE_FORMAT = 'JSON'
);
-- And then define substream that would append key to input stream
CREATE STREAM user_input_keyer WITH (
KAFKA_TOPIC = 'user_input_keyed', -- this is the same topic as for TABLE user_input_keyed
VALUE_FORMAT = 'JSON'
) AS SELECT UUID() key, a, b
FROM user_input
EMIT CHANGES;
-- This construction would allow me to eliminate certain data entries by newly
-- generated key by sending tombstone message like this:
INSERT INTO user_input_keyed (key) VALUES (${some UUID});
-- Event more - by using this technique I could actually invoke ksqlDB level
-- tombstones as well.
-- We can imagine this scheme being extended to not only receive new data, but
-- some updates as well.
-- So I could have it like this
CREATE STREAM user_input_update (
key VARCHAR KEY,
a VARCHAR,
b VARCHAR
) WITH (
KAFKA_TOPIC = 'user_input_update',
VALUE_FORMAT = 'JSON'
);
CREATE STREAM user_input_removed WITH (
KAFKA_TOPIC = 'user_input_keyer',
VALUE_FORMAT = 'JSON'
) AS SELECT key
FROM user_input_update
WHERE a IS NULL
EMIT CHANGES;
所以问题是 - 这真的支持吗?
我在网上找到的一些代码中看到了这种方法,但我还没有在任何地方看到这种方法。
此外,这段代码实际上是有效的,但有时会弄乱一些字段/值,所以我想看看我是否没有敲门,并且在 v 0.13 上并没有那么多 ksqlDB 可以做。
解决方案
推荐阅读
- terminal - 打开新终端时出现 Msys 终端错误
- python - 无法使用 miniforge (conda-forge) 下载 Python 3.6
- java - 升级到 Oracle 19C 会抛出 java.lang.NoClassDefFoundError: oracle/ons/ONS" 错误
- python - 当数字在乘法表中为偶数时打印 0
- python - python多处理数据帧行
- r - 在 R 中跨三列查找最大值
- reactjs - 如何从用户设备将图像上传到 Firebase 存储?(V 9.0^)
- docker - Docker 运行需要至少 1 个参数错误
- javascript - 如何正确验证十进制值?
- r - dplyr:简化创建匹配和绝对差异变量