首页 > 解决方案 > 为什么 Clickhouse 不支持向 kafka 表添加列

问题描述

我在 ClickHouse 中向 Kafka 队列添加列时遇到问题。

我用命令创建了一个表

CREATE TABLE my_db.my_queue ON CLUSTER my_cluster
(
    `ts` String,
    .... some other columns
)
ENGINE = Kafka()
SETTINGS 
kafka_broker_list = '172.21.0.3:9092', 
kafka_topic_list = 'my_topic', 
kafka_group_name = 'my_group', 
kafka_format = 'JSONEachRow', 
kafka_row_delimiter = '\n', 
kafka_num_consumers = 1, 
kafka_skip_broken_messages = 10;

然后尝试添加一列

ALTER TABLE my_db.my_queue  ON CLUSTER my_cluster ADD COLUMN new_column String;

但是出现错误

SQL Error [48]: ClickHouse exception, code: 48, host: 172.21.0.4, port: 8123; Code: 48,
e.displayText() = DB::Exception: There was an error on [clickhouse-server:9000]: Code: 48,
e.displayText() = DB::Exception: Alter of type 'ADD COLUMN' is not supported by storage Kafka
(version 20.11.4.13 (official build)) (version 20.11.4.13 (official build))

我不熟悉 ClickHouse 和任何分析数据库。所以我想知道为什么不支持它?或者我应该以另一种方式添加一列?

标签: apache-kafkaclickhouse

解决方案


支持来自 Kafka 队列的具有不同模式的消息的一种方法包括存储原始 JSON 消息,如下所示:

CREATE TABLE my_db.my_queue ON CLUSTER my_cluster
(
    `message` String
)
ENGINE = Kafka()
SETTINGS 
kafka_broker_list = '172.21.0.3:9092', 
kafka_topic_list = 'my_topic', 
kafka_group_name = 'my_group', 
kafka_format = 'JSONAsString', 
kafka_row_delimiter = '\n', 
kafka_num_consumers = 1, 
kafka_skip_broken_messages = 10;

JSONAsString格式将原始 JSON 存储在message中。通过这种方式,您可以从 Kafka 表中通过物化视图和JSON 函数对每个新行进行后处理。

例如:

CREATE TABLE my_db.post_processed_data (
  `ts` String,
  `another_column` String
)
-- use a proper engine
Engine=Log;

CREATE MATERIALIZED VIEW my_db.my_queue_mv TO my_db.post_processed_data 
AS
SELECT 
    JSONExtractString(message, 'ts') AS ts,
    JSONExtractString(message, 'another_column') AS another_column
FROM my_db.my_queue;

如果 Kafka 队列的 JSON 模式有任何变化,您可以相应ALTER TABLE .. ADD COLUMN ..地在post_processed_data表中执行操作并相应地更新物化视图。这样卡夫卡表将保持原样。


推荐阅读