java - Wiered Spring kafka 消费者问题。具有相同配置的两个消费者具有不同的行为
问题描述
目前消费者正在按预期工作,我们不确定为什么,因为没有更改配置。一旦我们有更多信息,将更新问题
我们正在部署一个新的 Kafka 管道来处理数据。我们有两个消费者在听同一个话题。一位消费者正在按预期工作,而另一位则没有。
期望只有新消息应该由两个侦听器处理。侦听器 1 一次又一次地重新处理所有消息(未预期),而侦听器 2 仅处理新消息(预期)。我们不完全确定为什么会发生这种情况,因为两个 Listner 的配置相似,并且他们正在收听相同的主题。
Listner 1 的平均处理时间 = 4-5 秒。
Listner 2 的平均处理时间 = 3-4 秒。
卡夫卡消费者工厂。
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String,Object> consumerFactory(){
Map<String,Object> configs=new HashMap<>();
String bootstrap=System.getenv().getOrDefault("BOOTSTRAP_SERVERS_CONFIG","10.99.2.135:9093,10.99.2.136:9093,10.99.2.134:9093");
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000");
configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000");
configs.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "20000");
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
//configs.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "update");
return new DefaultKafkaConsumerFactory<>(configs,new StringDeserializer(),new JsonDeserializer<>(Object.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<String, Object>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(5);
factory.getContainerProperties().setPollTimeout(10000);
return factory;
}
@Bean
public ProducerFactory<String, String> producerConfigs() {
Map<String, Object> props = new HashMap<>();
String bootstrap=System.getenv().getOrDefault("BOOTSTRAP_SERVERS_CONFIG","10.99.2.135:9093,10.99.2.136:9093,10.99.2.134:9093");
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrap);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerConfigs());
}
}
侦听器 1,在一段时间后重新处理所有消息。
@KafkaListener(groupId="updateCache",topics="omp-orderBuffApi",containerFactory="kafkaListenerContainerFactory")
public void consume(Object object){
long time = System.currentTimeMillis();
logger.info("Start Time of consumer for Details Cache: {}" ,time);
ConsumerRecord<String, Object> consumerRecord = (ConsumerRecord<String, Object>) object;
LinkedHashMap<String, Object> map=(LinkedHashMap<String, Object>) consumerRecord.value();
logger.info("Payload for Details Cache : {}" ,map.get("payload"));
Map<String,Object> payload=(Map<String, Object>) map.get("payload");
String order = (String) payload.get("ORDERITEMNUMBER");
logger.info("OrderItemNumber : {}",order);
long atime=System.currentTimeMillis();
logger.info("Start Time of api - Details Cache: {}" ,atime);
solrController.updateOrderDetailsCache1(payload);
float atimex = (System.currentTimeMillis() - atime)/1000F;
logger.info("End Time of api - Details Cache: {}" ,atimex);
float timex = (System.currentTimeMillis() - time)/1000F;
logger.info("End Time of consumer for Details Cache: {}" ,timex);
logger.info("***************************************************");
countDownLatch0.countDown();
}
侦听器 2,按预期工作,不再重新处理任何消息。
@KafkaListener(groupId = "updateSolr", topics = "omp-orderBuffApi", containerFactory = "kafkaListenerContainerFactory")
public void consumerForSolr(Object object) {
long time = System.currentTimeMillis();
logger.info("Start Time of consumer for Sorl Details Cache: {}", time);
ConsumerRecord<String, Object> consumerRecord = (ConsumerRecord<String, Object>) object;
LinkedHashMap<String, Object> map = (LinkedHashMap<String, Object>) consumerRecord.value();
Map<String, Object> payload = (Map<String, Object>) map.get("payload");
logger.info("payload for Solr Details : {} ", payload);
String order = (String) payload.get("ORDERITEMNUMBER");
logger.info("OrderItemNumber Sorl Details Cache: {} ", order);
long atime = System.currentTimeMillis();
logger.info("Start Time of api - Solr Details Cache: {}", atime);
try {
solrController.updateSolrDate(payload);
} catch (SolrServerException | IOException | ParseException e) {
e.printStackTrace();
}
float atimex = (System.currentTimeMillis() - atime) / 1000F;
logger.info("End Time of api - Solr Details Cache: {}", atimex);
float timex = (System.currentTimeMillis() - time) / 1000F;
logger.info("end Time of consumer - Solr Details Cache: {}", timex);
logger.info("***************************************************");
countDownLatch3.countDown();
}
解决方案
推荐阅读
- algorithm - 如果必须满足所有先决条件,则最大化 K 课程的值总和
- python - AttributeError:“str”对象在 pandas read_csv 中没有属性“decode”
- .htaccess - 将子域重写到现有文件夹
- arrays - 将所有 Outlook 文件夹添加到数组中
- python - 打印 Powerset 时重复
- java - 导入 org.openqa.selenium.chrome 无法解析
- android - 如何安装具有所有权限并使用 FileProvider 的新 apk?
- php - 如何访问我的 Web Api 在 PHP 中返回的 json?
- python - HTTPError:HTTP 错误 403:禁止。wget 与 python
- java - 应用程序未启动,但可以正确编译