java - Kafka Consumer 对多线程访问不安全异常
问题描述
当我使用 kafka 创建消费者时,出现以下异常:
例外:
Kafka Consumer 对多线程访问不安全异常
100多个消费者同时订阅主题,同时获取记录。
我添加了以下代码
//KafkaConsumer Configuration
KafkaConsumer<String, Object> kafkaConsumer;
Properties properties = new Properties();
properties.put("bootstrap.servers", KAFKA_SERVER_URL + ":" + KAFKA_SERVER_PORT);
properties.put("group.id", GROUP_ID);
properties.put("enable.auto.commit", "false");
properties.put("auto.commit.interval.ms", "1000");
properties.put("auto.offset.reset", "earliest");
properties.put("key.deserializer", JsonDeserializer.class);
properties.put("value.deserializer", JsonDeserializer.class);
kafkaConsumer = new KafkaConsumer<>(properties);
//Subscribe Consumer
kafkaConsumer.subscribe(Arrays.asList("Topic_Name"));
//Get Records From Beginning
while (true) {
kafkaConsumer.seekToBeginning(kafkaConsumer.assignment());
ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, Object> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(),
record.key(), record.value());
}
}
//Get Records Using WebSocket this section continuously
ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, Object> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(),
record.key(), record.value());
}
解决方案
推荐阅读
- java - 如何在 Android 表单中进行过滤后的类似选择选项的下拉菜单?
- linux - ext4 中的项目配额
- python - 如何正确 numpy.nanmean 和 3D 数组上的轴关键字?
- haskell - 并行数字积分器功能比顺序版本慢。为什么?
- mysql - 如何在 Phpmyadmin 中使用 PopSQL
- git - Git:错误:无法读取 MacOSX.sdk 的 SDK 设置
- hangouts-chat - 如何通过环聊 API 删除一个环聊/聊天室中的所有消息?
- macos - 错误:0:10:未声明的标识符“texture2D”的无效调用
- javascript - 为什么我的状态不能使用 Redux 正确改变?
- ios - Ionic 应用程序无法在 iphone 7 plus 中打开显示白屏,但它在 iphone 6 中工作?