java - Scaling Kafka with a synchronous request/reply configuration to handle higher TPS
Problem Description
I am new to Kafka, and I am building a POC to migrate our current ESB to microservices. The current ESB exposes SOAP services, so I need to keep the request/reply paradigm.
The POC consists of a Spring Boot microservice. One instance of the application acts as the producer, using the following code:
@Endpoint
public class AcctInfoSoapServiceController {

    private static final Logger LOGGER = LoggerFactory.getLogger(AcctInfoSoapServiceController.class);

    @Autowired
    KafkaAsyncService kafkaAsyncService;

    @PayloadRoot(namespace = "http://firstbankpr.com/feis/common/model", localPart = "GetAccountInfoRequest")
    @ResponsePayload
    public GetAccountInfoResponse getModelResponse(@RequestPayload GetAccountInfoRequest accountInfo) throws Exception {
        long start = System.currentTimeMillis();
        LOGGER.info("Received request for account " + accountInfo.getInGetAccountInfo().getAccountNumber());
        AccountInquiryDto modelResponse = kafkaAsyncService.getModelResponse(accountInfo.getInGetAccountInfo());
        GetAccountInfoResponse response = ObjectFactory.getGetAccountInfoResponse(modelResponse);
        long elapsedTime = System.currentTimeMillis() - start;
        LOGGER.info("Returning response in " + elapsedTime + " ms for account = " + accountInfo.getInGetAccountInfo().getAccountNumber() + " " + response);
        return response;
    }
}
public AccountInquiryDto getModelResponse(InGetAccountInfo accountInfo) throws Exception {
    LOGGER.info("Received request for account " + accountInfo);
    // create the producer record
    ProducerRecord<String, InGetAccountInfo> record = new ProducerRecord<>(requestTopic, accountInfo);
    // set the reply topic in the header
    record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes()));
    // post to the kafka topic
    RequestReplyFuture<String, InGetAccountInfo, AccountInquiryDto> sendAndReceive = kafkaTemplate.sendAndReceive(record);
    // confirm the producer produced successfully
    SendResult<String, InGetAccountInfo> sendResult = sendAndReceive.getSendFuture().get();
    // print all headers (decode the byte[] value; byte[].toString() only prints an array reference)
    sendResult.getProducerRecord().headers()
            .forEach(header -> System.out.println(header.key() + ":" + new String(header.value())));
    // block until the reply record arrives
    ConsumerRecord<String, AccountInquiryDto> consumerRecord = sendAndReceive.get();
    ObjectMapper mapper = new ObjectMapper();
    AccountInquiryDto modelResponse = mapper.convertValue(
            consumerRecord.value(),
            new TypeReference<AccountInquiryDto>() { });
    LOGGER.info("Returning record for " + modelResponse);
    return modelResponse;
}
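Note that `sendAndReceive.get()` above blocks the SOAP endpoint thread indefinitely until a reply record arrives. Since `RequestReplyFuture` implements `java.util.concurrent.Future`, the wait can be bounded with `get(timeout, unit)` instead. The following stand-in sketches that pattern without a broker, with a `CompletableFuture` (an assumption of this sketch, not part of the original code) playing the role of a reply future that is never completed:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedWaitSketch {

    // RequestReplyFuture implements java.util.concurrent.Future, so the reply
    // wait can be bounded with get(timeout, unit) rather than the unbounded
    // get() used in the method above. Here a CompletableFuture that never
    // completes stands in for a reply that never arrives.
    public static boolean timesOut(long timeoutMs) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        try {
            reply.get(timeoutMs, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true; // no reply within the deadline
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Bounding the wait does not raise throughput by itself, but it prevents load-test threads from hanging forever when a reply is lost.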
Here is the producer configuration:
@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    //props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId + "-" + UUID.randomUUID().toString());
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId + "-" + UUID.randomUUID().toString());
    //props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return props;
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    //props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    //props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    //props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId + "-" + UUID.randomUUID().toString());
    props.put(ProducerConfig.RETRIES_CONFIG, "2");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return props;
}

@Bean
public ReplyingKafkaTemplate<String, InGetAccountInfo, AccountInquiryDto> replyKafkaTemplate(
        ProducerFactory<String, InGetAccountInfo> pf,
        KafkaMessageListenerContainer<String, AccountInquiryDto> container) {
    // diamond operator instead of the raw type keeps the bean type-safe
    return new ReplyingKafkaTemplate<>(pf, container);
}

@Bean
public ProducerFactory<String, InGetAccountInfo> requestProducerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public ConsumerFactory<String, AccountInquiryDto> replyConsumerFactory() {
    JsonDeserializer<AccountInquiryDto> jsonDeserializer = new JsonDeserializer<>();
    jsonDeserializer.addTrustedPackages(InGetAccountInfo.class.getPackage().getName());
    jsonDeserializer.addTrustedPackages(AccountInquiryDto.class.getPackage().getName());
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), jsonDeserializer);
}

@Bean
public KafkaMessageListenerContainer<String, AccountInquiryDto> replyContainer(ConsumerFactory<String, AccountInquiryDto> cf) {
    ContainerProperties containerProperties = new ContainerProperties(requestReplyTopic);
    return new KafkaMessageListenerContainer<>(cf, containerProperties);
}

@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    return new KafkaAdmin(configs);
}

@Bean
public KafkaAsyncService kafkaAsyncService() {
    return new KafkaAsyncService();
}
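The batching settings commented out in `producerConfigs()` trade latency for throughput: `linger.ms` makes every send wait for a batch to fill, which hurts a blocking request/reply call, while large `batch.size` and `buffer.memory` mostly benefit high-volume asynchronous pipelines. As a sketch of what those commented-out lines would set, using the literal keys the `ProducerConfig` constants resolve to (plain strings here so the snippet needs no Kafka dependency):

```java
import java.util.HashMap;
import java.util.Map;

public class ProducerTuningSketch {

    // The same batching settings that are commented out above. For a blocking
    // request/reply path, linger.ms should stay at 0 (the Kafka default) when
    // per-request latency matters.
    public static Map<String, Object> batchingProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("batch.size", 16384);       // max bytes per in-memory partition batch
        props.put("linger.ms", 1);            // wait up to 1 ms for a batch to fill
        props.put("buffer.memory", 33554432); // total producer send buffer (32 MiB)
        return props;
    }
}
```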
A second instance of the Spring Boot application works as the Kafka consumer, using the following code:
@KafkaListener(topics = "${kafka.topic.acct-info.request}", containerFactory = "requestReplyListenerContainerFactory")
@SendTo // with @SendTo, spring-kafka routes the reply using the REPLY_TOPIC header of the request
public Message<?> listenPartition0(InGetAccountInfo accountInfo,
        @Header(KafkaHeaders.REPLY_TOPIC) byte[] replyTo,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int id) {
    try {
        LOGGER.info("Received request for partition id = " + id);
        LOGGER.info("Received request for accountInfo = " + accountInfo.getAccountNumber());
        AccountInquiryDto accountInfoDto = getAccountInquiryDto(accountInfo);
        LOGGER.info("Returning accountInfoDto = " + accountInfoDto.toString());
        return MessageBuilder.withPayload(accountInfoDto)
                .setHeader(KafkaHeaders.TOPIC, replyTo)
                .setHeader(KafkaHeaders.RECEIVED_PARTITION_ID, id)
                .build();
    } catch (Exception e) {
        LOGGER.error(e.toString(), e);
    }
    return null;
}
Here is the consumer configuration:
@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    //props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId + "-" + UUID.randomUUID().toString());
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId + "-" + UUID.randomUUID().toString());
    return props;
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId + "-" + UUID.randomUUID().toString());
    props.put(ProducerConfig.RETRIES_CONFIG, "2");
    return props;
}

@Bean
public ConsumerFactory<String, InGetAccountInfo> requestConsumerFactory() {
    JsonDeserializer<InGetAccountInfo> jsonDeserializer = new JsonDeserializer<>();
    jsonDeserializer.addTrustedPackages(InGetAccountInfo.class.getPackage().getName());
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), jsonDeserializer);
}

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, InGetAccountInfo>> requestReplyListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, InGetAccountInfo> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(requestConsumerFactory());
    factory.setConcurrency(3);
    factory.setReplyTemplate(replyTemplate());
    return factory;
}

@Bean
public ProducerFactory<String, AccountInquiryDto> replyProducerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public KafkaTemplate<String, AccountInquiryDto> replyTemplate() {
    return new KafkaTemplate<>(replyProducerFactory());
}

@Bean
public DepAcctInqConsumerController Controller() {
    return new DepAcctInqConsumerController();
}

@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    return new KafkaAdmin(configs);
}

@Bean
public NewTopic requestTopic() {
    Map<String, String> configs = new HashMap<>();
    configs.put("retention.ms", replyTimeout.toString());
    // note: a replication factor of 2 requires at least two brokers;
    // topic creation fails on a single-broker cluster
    return new NewTopic(requestTopic, 2, (short) 2).configs(configs);
}
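One structural constraint worth keeping in mind: within a consumer group, Kafka assigns each partition to at most one consumer, so any listener threads beyond the partition count sit idle. With a concurrency of 3 per instance, the request topic needs at least 3 partitions per consumer instance to keep every thread busy; the `NewTopic` bean above creates only 2. A trivial helper (a sketch of the arithmetic, not part of the original configuration) makes this explicit:

```java
public class PartitionSizing {

    // Within one consumer group, each partition goes to at most one consumer,
    // so consumers beyond the partition count are idle. The request topic
    // therefore needs at least (instances x concurrency-per-instance)
    // partitions for every listener thread to receive work.
    public static int requiredPartitions(int instances, int concurrencyPerInstance) {
        return instances * concurrencyPerInstance;
    }
}
```

For the configuration above (2 instances, concurrency 3), that is 6 partitions, not the 2 the `NewTopic` bean declares.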
Kafka uses a single topic with 5 partitions. When I run a load test with SoapUI using 20 threads, I get about 19 TPS. Increasing the partition count to 10 while keeping 20 threads leaves TPS unchanged, and raising the thread count to 40 (with up to 10 partitions) does not increase TPS either. Likewise, running 2 instances of the consumer/producer pair behind a load balancer does not change TPS (it stays around 20). Kafka appears to be the bottleneck in this scenario.
The only way I have been able to increase TPS is to give each consumer/producer instance pair its own topic. With a load balancer, TPS then rises to about 38, roughly double what I get with a single consumer/producer pair. Monitoring the servers running the Spring Boot applications is not revealing, since CPU load and memory usage remain very low. The Kafka server is also lightly loaded, at about 20% CPU utilization. Currently Kafka runs on a single broker.
I am looking for advice on how to configure Kafka so that I can add instances of the Spring Boot application against the same topic and have TPS grow as each new consumer/producer instance comes online. I understand that I will probably need to add partitions to the topic for each new instance. The end goal is to run these as pods in an OpenShift cluster, where the application should scale automatically as traffic grows.
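One way to interpret the measurements above is through Little's Law (throughput = in-flight requests / average latency), which is a general queueing result, not a Kafka-specific one. With 20 SoapUI threads sustaining about 19 TPS, each request must take roughly one second end to end, so the system looks bound by per-request round-trip latency rather than by raw Kafka throughput, which is consistent with the low CPU on both the applications and the broker:

```java
public class ThroughputEstimate {

    // Little's Law: average latency = concurrent in-flight requests / throughput.
    // 20 threads at ~19 TPS implies roughly 1.05 s per request, pointing at
    // per-request latency (broker round trips, reply listener polling) rather
    // than Kafka's raw throughput as the limiting factor.
    public static double avgLatencySeconds(int threads, double tps) {
        return threads / tps;
    }
}
```

Under this reading, adding partitions alone cannot help until the per-request latency drops or more requests are genuinely in flight concurrently against the same topic.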
Solution