Apache Kafka ksqlDB: forwarding a stream to an existing topic

Problem description

I am still experimenting with ksqlDB to learn what it can and cannot do, so I may be missing something obvious.

Here is what I am trying to do:

-- Define input stream for raw data
CREATE STREAM user_input (
  a VARCHAR,
  b VARCHAR
) WITH (
  KAFKA_TOPIC = 'user_input',
  PARTITIONS = 1,
  VALUE_FORMAT = 'JSON'
);

-- Then define table that would hold input data, keyed
CREATE TABLE user_input_keyed (
  key VARCHAR PRIMARY KEY,
  a VARCHAR,
  b VARCHAR
) WITH (
  KAFKA_TOPIC = 'user_input_keyed',
  PARTITIONS = 1,
  VALUE_FORMAT = 'JSON'
);

-- And then define substream that would append key to input stream
CREATE STREAM user_input_keyer WITH (
    KAFKA_TOPIC = 'user_input_keyed', -- this is the same topic as for TABLE user_input_keyed 
    VALUE_FORMAT = 'JSON'
) AS SELECT UUID() key, a, b
FROM user_input
EMIT CHANGES;

-- This construction would allow me to eliminate certain data entries by newly 
-- generated key by sending tombstone message like this:
INSERT INTO user_input_keyed (key) VALUES (${some UUID});

-- Even more: by using this technique I could actually invoke ksqlDB-level
-- tombstones as well.
-- We can imagine this scheme being extended to not only receive new data, but 
-- some updates as well.
-- So I could have it like this

CREATE STREAM user_input_update (
  key VARCHAR KEY,
  a VARCHAR,
  b VARCHAR
) WITH (
  KAFKA_TOPIC = 'user_input_update',
  VALUE_FORMAT = 'JSON'
);
CREATE STREAM user_input_removed WITH ( 
  KAFKA_TOPIC = 'user_input_keyer',
  VALUE_FORMAT = 'JSON'
) AS SELECT key 
FROM user_input_update
WHERE a IS NULL
EMIT CHANGES;

So the question is: is this actually supported?

I have seen this approach in some code I found online, but I have not seen it documented anywhere.

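Also, this code does actually work, but it sometimes mixes up some fields/values, so I would like to know whether I am knocking on the wrong door and this is simply more than ksqlDB can do as of v0.13.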
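
To pin down the table behavior that the listing above relies on, here is a minimal Python sketch of how I understand a keyed topic to be folded into a table. It is not ksqlDB code. The names `materialize` and `changelog` are made up for illustration, and the two rules it encodes are assumptions about the semantics: a record with a null value acts as a tombstone, and a record with no key cannot be applied to a table.

```python
from typing import Dict, Iterable, Optional, Tuple

# A changelog record is (key, value). A value of None models a tombstone.
Record = Tuple[Optional[str], Optional[dict]]


def materialize(changelog: Iterable[Record]) -> Dict[str, dict]:
    """Fold a changelog into the latest value per key, table-style."""
    table: Dict[str, dict] = {}
    for key, value in changelog:
        if key is None:
            # Assumption: a record without a key cannot update a table row,
            # so it is skipped.
            continue
        if value is None:
            # Tombstone: remove the row if it exists.
            table.pop(key, None)
        else:
            # Upsert: the newest value for a key wins.
            table[key] = value
    return table


changelog = [
    ("k1", {"a": "x", "b": "y"}),
    ("k2", {"a": "p", "b": "q"}),
    ("k1", None),                        # tombstone for k1
    (None, {"a": "lost", "b": "lost"}),  # keyless record, skipped
]

state = materialize(changelog)
print(state)  # {'k2': {'a': 'p', 'b': 'q'}}
```

If this model is right, anything written to `user_input_keyed` without a proper record key would never show up in the table, which is why it matters where the generated `key` column ends up in the record.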

Tags: apache-kafka-streams, ksqldb

Solution

