首页 > 解决方案 > 无法使用 JDBC Connect 从 Kafka 流向 ClickHouse 表插入数据

问题描述

Kafka 和 Clickhouse 在 Docker 中运行。我正在尝试通过 JDBC Connect 将一些数据从 Kafka 流插入 ClickHouse 表。从流中查询数据显示数据在流中。然后我创建 Clickhouse 表,其字段与 Kafka 流中的字段相同

CREATE TABLE IF NOT EXISTS default.table
(
    id Int32, 
    ms_segment_group_id Int32,
    transact_survey_tmpl_id Int32,
    customer_id Int32, 
    customer_account_id Int32,
    agent String,
    comm_channel_id Int32,
    comm_channel_address String,
    answer_date String,
    communication_type_id Int32, 
    object_id Int32,
    create_date String,
    application_id String,
    external_id Int32, 
    transact_survey_state_id Int32, 
    code String
) ENGINE = MergeTree()
  PARTITION BY id
  ORDER BY tuple();

然后我在 ksql 中创建 JDBC CONNECT

    CREATE SOURCE CONNECTOR `clickhouse-jdbc-connector` WITH (
    'connector.class'='io.confluent.connect.jdbc.JdbcSinkConnector',
    'topics'='STREAM_TEST',
    'tasks.max'='1',
    'connection.url'='jdbc:clickhouse://clickhouse:8123/default',
    'table.name.format'='table'
    );  

ksql 的输出表明连接器已创建并且一切正常。然后我试图从 ClickHouse 表中选择并得到空结果:

图片

据我了解,ClickHouse 表中没有数据并且 JDBC 连接无法正常工作?

标签: jdbcapache-kafkaclickhouse

解决方案


将引擎从 MergeTree 更改为 Kafka 的最简单方法。首先,需要确保 Kafka 主题正在获取消息。

) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host:port',
    kafka_topic_list = 'topic1,topic2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'[,]
    [kafka_row_delimiter = 'delimiter_symbol',]
    [kafka_schema = '',]
    [kafka_num_consumers = N,]
    [kafka_max_block_size = 0,]
    [kafka_skip_broken_messages = N,]
    [kafka_commit_every_batch = 0,]
    [kafka_thread_per_consumer = 0]

推荐阅读