java - 来自 Kafka 的消息未发送到 Clickhouse
问题描述
我的 java 应用程序向 Apache kafka 发送消息
ContainerProperties containerProps = new ContainerProperties("topic1");
final CountDownLatch latch = new CountDownLatch(1);
containerProps.setMessageListener(new MessageListener<Integer, MyData>() {
@Override
public void onMessage(ConsumerRecord<Integer, MyData> message) {
logger.info("received: " + message);
latch.countDown();
}
});
KafkaMessageListenerContainer<Integer, MyData> container = createContainer(containerProps);
container.setBeanName("testAuto");
container.start();
KafkaTemplate<Integer, MyData> template = createTemplate();
template.setDefaultTopic("topic1");
template.sendDefault(0, new MyData("foo"));
template.flush();
container.stop();
消费者属性:
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<my_host>:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.*");
发件人属性:
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<my_host>:9092");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
我可以在日志记录中看到:
received: ConsumerRecord(topic = topic1, partition = 0, leaderEpoch = 0, offset = 42, CreateTime = 1595598507889, serialized key size = 4, serialized value size = 36, headers = RecordHeaders(headers = [], isReadOnly = false), key = 0, value = MyData{name='foo'})
这意味着消息到达卡夫卡。我也在同一主机上的 Clickhouse 中有表:
CREATE TABLE IF NOT EXISTS schema_name.table (...) ENGINE = Kafka('localhost:9092', 'topic1', 'group1', 'JSONEachRow');
CREATE TABLE IF NOT EXISTS schema_name.table_view (...) ENGINE = MergeTree() ORDER BY datetime;
CREATE MATERIALIZED VIEW schema_name.consumer TO schema_name.table_view AS SELECT * FROM schema_name.table;
clickhouse 中的表格是空的。
解决方案
推荐阅读
- android - 尝试构建 github 项目时出错
- android - 在 Android 应用程序中重新启动后尝试打开文件夹时出现权限错误
- c# - 如何在局部视图 MVC.Core 中使用 @section 脚本
- mysql - 如何通过外键在角度数据表中的另一个表中的字段?
- r - 将迭代/非聚合函数应用于 R 中的多个数据子集
- firebase - 是否可以在子目录中使用通配符变量?
- python - 有没有一种简单的方法可以在 Python 或 PIL 中调整/扩展只有零的图像?
- r - 如何以分组格式获取输出并将相同格式导出到 csv 文件中?
- typescript - 我可以让 TypeScript 显示它为表达式确定的类型吗?
- php - 为什么在 Moodle 3.3 中出现 SQL 语法错误?