apache-kafka - JHipster 与 kafka 真实应用使用示例
问题描述
在 JHipster 6.6.0 版本中,Kafka 的使用模型已从标准的生产者/消费者类更改为 WebResource 级别。目前没有真实的示例能够说明这种变化的优势是什么,以及如何在实际应用程序中使用它。
假设我们有Service A和Service B。这两个服务之间的通信必须通过 Kafka 事件来完成。
问题是:我必须怎样做,才能让服务 B 开始监听来自服务 A 主题的事件?在当前配置中,看起来我必须手动触发 /consume
端点,但这没有任何意义,因为我期望服务在应用程序启动并运行后就开始侦听指定的主题列表。
如果有人能就此主题给出任何评论,帮助我理解这一点,我将不胜感激。
示例:jhipster 7.1.0 生成此资源:
服务 A -网关
package com.stukans.refirmware.gateway.web.rest;
import com.stukans.refirmware.gateway.config.KafkaProperties;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;
/**
 * REST facade over Kafka for the gateway application.
 *
 * <p>Offers two endpoints under {@code /api/gateway-kafka}: one publishes a single message to a
 * topic, the other streams the values of the records received from a list of topics for as long
 * as the caller stays subscribed to the response.
 */
@RestController
@RequestMapping("/api/gateway-kafka")
public class GatewayKafkaResource {

    private static final Logger log = LoggerFactory.getLogger(GatewayKafkaResource.class);

    private final KafkaProperties kafkaProperties;

    // Built once in the constructor and shared by every publish request; never reassigned.
    private final KafkaSender<String, String> sender;

    /**
     * Creates the resource and the Kafka sender it publishes with.
     *
     * @param kafkaProperties source of the producer and consumer properties
     */
    public GatewayKafkaResource(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
        this.sender = KafkaSender.create(SenderOptions.create(kafkaProperties.getProducerProps()));
    }

    /**
     * Publishes one message to the given topic.
     *
     * @param topic   topic taken from the request path
     * @param message record value
     * @param key     optional record key; may be {@code null}
     * @return the topic, partition, offset and timestamp reported for the sent record
     */
    @PostMapping("/publish/{topic}")
    public Mono<PublishResult> publish(
        @PathVariable String topic,
        @RequestParam String message,
        @RequestParam(required = false) String key
    ) {
        log.debug("REST request to send to Kafka topic {} with key {} the message : {}", topic, key, message);
        return Mono
            // Only topic, key and value are supplied; the remaining arguments are left null.
            .just(SenderRecord.create(topic, null, null, key, message, null))
            .as(sender::send)
            // Exactly one record is sent, so the first result is the one of interest.
            .next()
            .map(SenderResult::recordMetadata)
            .map(
                metadata ->
                    new PublishResult(metadata.topic(), metadata.partition(), metadata.offset(), Instant.ofEpochMilli(metadata.timestamp()))
            );
    }

    /**
     * Streams the values of records received from the requested topics.
     *
     * <p>A new receiver is created for every request. All request parameters except
     * {@code topic} are copied into the consumer properties before the receiver is built.
     *
     * @param topics         topics to subscribe to (repeated {@code topic} request parameter)
     * @param consumerParams every request parameter, used as consumer property overrides
     * @return the record values, in the order the receiver emits them
     */
    @GetMapping("/consume")
    public Flux<String> consume(@RequestParam("topic") List<String> topics, @RequestParam Map<String, String> consumerParams) {
        log.debug("REST request to consume records from Kafka topics {}", topics);
        // assumes getConsumerProps() hands out a fresh mutable map on each call — TODO confirm
        Map<String, Object> consumerProps = kafkaProperties.getConsumerProps();
        // NOTE(review): callers can override any configured consumer property through the query
        // string; confirm this endpoint is not reachable by untrusted clients.
        consumerProps.putAll(consumerParams);
        // "topic" arrived as a request parameter too, but it is not a consumer property.
        consumerProps.remove("topic");
        ReceiverOptions<String, String> receiverOptions = ReceiverOptions.<String, String>create(consumerProps).subscription(topics);
        return KafkaReceiver.create(receiverOptions).receive().map(ConsumerRecord::value);
    }

    /** Payload returned by {@link #publish}: where and when the record was written. */
    private static class PublishResult {

        public final String topic;
        public final int partition;
        public final long offset;
        public final Instant timestamp;

        private PublishResult(String topic, int partition, long offset, Instant timestamp) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }
}
服务 B -代理
package com.stukans.refirmware.agent.web.rest;
import com.stukans.refirmware.agent.config.KafkaProperties;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;
/**
 * REST facade over Kafka for the agent application.
 *
 * <p>Offers two endpoints under {@code /api/agent-kafka}: one publishes a single message to a
 * topic, the other streams the values of the records received from a list of topics for as long
 * as the caller stays subscribed to the response.
 */
@RestController
@RequestMapping("/api/agent-kafka")
public class AgentKafkaResource {

    private static final Logger log = LoggerFactory.getLogger(AgentKafkaResource.class);

    private final KafkaProperties kafkaProperties;

    // Built once in the constructor and shared by every publish request; never reassigned.
    private final KafkaSender<String, String> sender;

    /**
     * Creates the resource and the Kafka sender it publishes with.
     *
     * @param kafkaProperties source of the producer and consumer properties
     */
    public AgentKafkaResource(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
        this.sender = KafkaSender.create(SenderOptions.create(kafkaProperties.getProducerProps()));
    }

    /**
     * Publishes one message to the given topic.
     *
     * @param topic   topic taken from the request path
     * @param message record value
     * @param key     optional record key; may be {@code null}
     * @return the topic, partition, offset and timestamp reported for the sent record
     */
    @PostMapping("/publish/{topic}")
    public Mono<PublishResult> publish(
        @PathVariable String topic,
        @RequestParam String message,
        @RequestParam(required = false) String key
    ) {
        log.debug("REST request to send to Kafka topic {} with key {} the message : {}", topic, key, message);
        return Mono
            // Only topic, key and value are supplied; the remaining arguments are left null.
            .just(SenderRecord.create(topic, null, null, key, message, null))
            .as(sender::send)
            // Exactly one record is sent, so the first result is the one of interest.
            .next()
            .map(SenderResult::recordMetadata)
            .map(
                metadata ->
                    new PublishResult(metadata.topic(), metadata.partition(), metadata.offset(), Instant.ofEpochMilli(metadata.timestamp()))
            );
    }

    /**
     * Streams the values of records received from the requested topics.
     *
     * <p>A new receiver is created for every request. All request parameters except
     * {@code topic} are copied into the consumer properties before the receiver is built.
     *
     * @param topics         topics to subscribe to (repeated {@code topic} request parameter)
     * @param consumerParams every request parameter, used as consumer property overrides
     * @return the record values, in the order the receiver emits them
     */
    @GetMapping("/consume")
    public Flux<String> consume(@RequestParam("topic") List<String> topics, @RequestParam Map<String, String> consumerParams) {
        log.debug("REST request to consume records from Kafka topics {}", topics);
        // assumes getConsumerProps() hands out a fresh mutable map on each call — TODO confirm
        Map<String, Object> consumerProps = kafkaProperties.getConsumerProps();
        // NOTE(review): callers can override any configured consumer property through the query
        // string; confirm this endpoint is not reachable by untrusted clients.
        consumerProps.putAll(consumerParams);
        // "topic" arrived as a request parameter too, but it is not a consumer property.
        consumerProps.remove("topic");
        ReceiverOptions<String, String> receiverOptions = ReceiverOptions.<String, String>create(consumerProps).subscription(topics);
        return KafkaReceiver.create(receiverOptions).receive().map(ConsumerRecord::value);
    }

    /** Payload returned by {@link #publish}: where and when the record was written. */
    private static class PublishResult {

        public final String topic;
        public final int partition;
        public final long offset;
        public final Instant timestamp;

        private PublishResult(String topic, int partition, long offset, Instant timestamp) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }
}
这是唯一可用的与 Kafka 相关的代码。
在 6.6.0 版本之前,JHipster 会生成标准的生产者/消费者类,我可以使用它们来定义要监听的主题。现在还不清楚如何使用生成的代码来发出/监听事件。
解决方案
我编写了这样一个服务,它在 Bean 初始化完成后(@PostConstruct)自动启动消费者:
@Service
public class KafkaConsumerService {
private final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class);
private static final String GROUP_USER_CREATE_TOPIC = "GROUP_STORE.GROUP_USER.SAVE"; //<application name>.<dataset name>.<event>
private final KafkaProperties kafkaProperties;
private KafkaReceiver<String, String> kafkaReceiver;
private final ObjectMapper objectMapper = new ObjectMapper();
private final GroupMemberService groupMemberService;
public KafkaConsumerService(KafkaProperties kafkaProperties, GroupMemberService groupMemberService) {
this.kafkaProperties = kafkaProperties;
this.groupMemberService = groupMemberService;
}
@PostConstruct
public void start() {
log.info("Kafka consumer starting...");
Map<String, Object> consumerProps = kafkaProperties.getConsumerProps();
ReceiverOptions<String, String> receiverOptions = ReceiverOptions.<String, String>create(consumerProps)
.subscription(Collections.singletonList(GROUP_USER_CREATE_TOPIC));
this.kafkaReceiver = KafkaReceiver.create(receiverOptions);
consumeGroupMember().subscribe();
}
public Flux<GroupMemberDTO> consumeGroupMember() {
log.debug("consumer group member....");
return this.kafkaReceiver
.receive()
.map(ConsumerRecord::value)
.flatMap(
record -> {
try {
GroupMemberDTO groupMemberDTO = objectMapper.readValue(record, GroupMemberDTO.class);
log.debug("Complete convert object: {}", groupMemberDTO);
return groupMemberService.insert(groupMemberDTO);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
);
}
public void shutdown() {
log.info("Shutdown kafka consumer");
}
}
推荐阅读
- angular - TurboTable延迟加载具有初始值的默认过滤器?
- css - 如何在引导程序中更改容器的固定宽度?
- php - 如何使用正则表达式自动完成随机 URL?
- python - 在 TensorFlow 中,如何断言列表的值在某个集合中?
- python - 在每个列表的末尾,我得到 '\n'
- r - 如何在R中制作陷波滤波器
- python - 在 Python 中查询日期时间对象并与当前日期进行比较
- java - BaseExpandableListAdapter 的 getChildrenCount() 上出现 NullPointerException
- c# - 单击“再次播放”按钮后,我的对象不再被识别
- c# - 无法将文件附加为数据库