Messages from Kafka are not delivered to ClickHouse

Problem description

My Java application sends messages to Apache Kafka:

ContainerProperties containerProps = new ContainerProperties("topic1");
final CountDownLatch latch = new CountDownLatch(1);
containerProps.setMessageListener(new MessageListener<Integer, MyData>() {

    @Override
    public void onMessage(ConsumerRecord<Integer, MyData> message) {
        logger.info("received: " + message);
        latch.countDown();
    }

});
KafkaMessageListenerContainer<Integer, MyData> container = createContainer(containerProps);
container.setBeanName("testAuto");
container.start();
KafkaTemplate<Integer, MyData> template = createTemplate();
template.setDefaultTopic("topic1");
template.sendDefault(0, new MyData("foo"));
template.flush();
container.stop();

Consumer properties:

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<my_host>:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.*");

Producer properties:

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<my_host>:9092");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

I can see the following in the log:

received: ConsumerRecord(topic = topic1, partition = 0, leaderEpoch = 0, offset = 42, CreateTime = 1595598507889, serialized key size = 4, serialized value size = 36, headers = RecordHeaders(headers = [], isReadOnly = false), key = 0, value = MyData{name='foo'})

This means the message reaches Kafka. I also have these tables in ClickHouse on the same host:

CREATE TABLE IF NOT EXISTS schema_name.table (...) ENGINE = Kafka('localhost:9092', 'topic1', 'group1', 'JSONEachRow');

CREATE TABLE IF NOT EXISTS schema_name.table_view (...) ENGINE = MergeTree() ORDER BY datetime;

CREATE MATERIALIZED VIEW schema_name.consumer TO schema_name.table_view AS SELECT * FROM schema_name.table;

The tables in ClickHouse are empty.

Tags: java, spring-boot, apache-kafka, clickhouse

Solution
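
One thing worth checking, offered as an assumption rather than a confirmed fix: the Java listener and the ClickHouse Kafka table both use the consumer group 'group1'. Within one consumer group, Kafka delivers each record to only one consumer, and offsets committed by the Java listener (auto-commit is enabled) are shared by the whole group, so ClickHouse would not see records the listener has already consumed. A minimal sketch of the Kafka table with its own group follows; the group name 'clickhouse_group1' is hypothetical, and the column list stays elided as in the question.

```sql
-- Sketch only: same Kafka table as in the question, but with a dedicated consumer group.
-- 'clickhouse_group1' is a hypothetical name; any group not used by the Java consumer would do.
-- If schema_name.table already exists, IF NOT EXISTS makes this statement a no-op,
-- so the existing table would have to be dropped before recreating it.
CREATE TABLE IF NOT EXISTS schema_name.table (...) ENGINE = Kafka('localhost:9092', 'topic1', 'clickhouse_group1', 'JSONEachRow');
```

If the tables stay empty after that, the next things to compare are the JSON field names produced by JsonSerializer for MyData and the column names of the Kafka table, since JSONEachRow maps fields to columns by name.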

