首页 > 解决方案 > JHipster 与 kafka 真实应用使用示例

问题描述

从 JHipster 6.6.0 版本开始,生成的 Kafka 代码已从标准的生产者/消费者类改为 WebResource(REST 资源)级别的实现。但目前没有真实的示例来说明这种变化有什么优势,以及如何在实际应用程序中使用它。

假设我们有两个服务:服务 A 和服务 B。这两个服务之间的通信必须通过 Kafka 事件来完成。

问题是:我该怎样让服务 B 开始监听服务 A 的主题上的事件?在当前配置中,看起来我必须手动触发 /consume 端点,但这没有任何意义,因为我期望服务在应用程序启动并运行后就自动开始监听指定的主题列表。

如果有人能就这个问题给出任何意见,帮助我理解这一点,我将不胜感激。

示例:jhipster 7.1.0 生成此资源:

服务 A -网关

package com.stukans.refirmware.gateway.web.rest;

import com.stukans.refirmware.gateway.config.KafkaProperties;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;

/**
 * REST endpoints of the gateway service that publish to and consume from Kafka.
 *
 * <p>A single {@link KafkaSender} is created in the constructor and reused by
 * {@link #publish}; {@link #consume} creates a new {@link KafkaReceiver} on every call.
 */
@RestController
@RequestMapping("/api/gateway-kafka")
public class GatewayKafkaResource {

    private final Logger log = LoggerFactory.getLogger(GatewayKafkaResource.class);

    private final KafkaProperties kafkaProperties;
    private KafkaSender<String, String> sender;

    /**
     * Builds the resource and its sender from the producer properties.
     *
     * @param kafkaProperties source of the producer and consumer properties
     */
    public GatewayKafkaResource(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
        SenderOptions<String, String> senderOptions = SenderOptions.create(kafkaProperties.getProducerProps());
        this.sender = KafkaSender.create(senderOptions);
    }

    /**
     * Sends one record to the given topic.
     *
     * @param topic   topic taken from the request path
     * @param message record value
     * @param key     optional record key
     * @return the topic, partition, offset and timestamp taken from the record metadata of the send result
     */
    @PostMapping("/publish/{topic}")
    public Mono<PublishResult> publish(
        @PathVariable String topic,
        @RequestParam String message,
        @RequestParam(required = false) String key
    ) {
        log.debug("REST request to send to Kafka topic {} with key {} the message : {}", topic, key, message);

        // Partition, timestamp and correlation metadata are left unset (null).
        SenderRecord<String, String, Object> outbound = SenderRecord.create(topic, null, null, key, message, null);

        return sender
            .send(Mono.just(outbound))
            .next()
            .map(SenderResult::recordMetadata)
            .map(
                meta -> {
                    Instant sentAt = Instant.ofEpochMilli(meta.timestamp());
                    return new PublishResult(meta.topic(), meta.partition(), meta.offset(), sentAt);
                }
            );
    }

    /**
     * Streams the values of the records received from the requested topics.
     *
     * <p>The consumer properties are the configured ones overlaid with every request
     * parameter; the {@code topic} parameter itself is then removed from the properties.
     *
     * @param topics         topics to subscribe to
     * @param consumerParams all request parameters, applied as consumer property overrides
     * @return the record values as they are received
     */
    @GetMapping("/consume")
    public Flux<String> consume(@RequestParam("topic") List<String> topics, @RequestParam Map<String, String> consumerParams) {
        log.debug("REST request to consume records from Kafka topics {}", topics);

        Map<String, Object> props = kafkaProperties.getConsumerProps();
        props.putAll(consumerParams);
        props.remove("topic");

        ReceiverOptions<String, String> options = ReceiverOptions.<String, String>create(props).subscription(topics);
        KafkaReceiver<String, String> receiver = KafkaReceiver.create(options);
        return receiver.receive().map(received -> received.value());
    }

    /** Response body of {@link #publish}: where and when the record was written. */
    private static class PublishResult {

        public final String topic;
        public final int partition;
        public final long offset;
        public final Instant timestamp;

        private PublishResult(String topic, int partition, long offset, Instant timestamp) {
            this.timestamp = timestamp;
            this.offset = offset;
            this.partition = partition;
            this.topic = topic;
        }
    }
}

服务 B -代理

package com.stukans.refirmware.agent.web.rest;

import com.stukans.refirmware.agent.config.KafkaProperties;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;

/**
 * REST endpoints of the agent service that publish to and consume from Kafka.
 *
 * <p>A single {@link KafkaSender} is created in the constructor and reused by
 * {@link #publish}; {@link #consume} creates a new {@link KafkaReceiver} on every call.
 */
@RestController
@RequestMapping("/api/agent-kafka")
public class AgentKafkaResource {

    private final Logger log = LoggerFactory.getLogger(AgentKafkaResource.class);

    private final KafkaProperties kafkaProperties;
    private KafkaSender<String, String> sender;

    /**
     * Builds the resource and its sender from the producer properties.
     *
     * @param kafkaProperties source of the producer and consumer properties
     */
    public AgentKafkaResource(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
        SenderOptions<String, String> senderOptions = SenderOptions.create(kafkaProperties.getProducerProps());
        this.sender = KafkaSender.create(senderOptions);
    }

    /**
     * Sends one record to the given topic.
     *
     * @param topic   topic taken from the request path
     * @param message record value
     * @param key     optional record key
     * @return the topic, partition, offset and timestamp taken from the record metadata of the send result
     */
    @PostMapping("/publish/{topic}")
    public Mono<PublishResult> publish(
        @PathVariable String topic,
        @RequestParam String message,
        @RequestParam(required = false) String key
    ) {
        log.debug("REST request to send to Kafka topic {} with key {} the message : {}", topic, key, message);

        // Partition, timestamp and correlation metadata are left unset (null).
        SenderRecord<String, String, Object> outbound = SenderRecord.create(topic, null, null, key, message, null);

        return sender
            .send(Mono.just(outbound))
            .next()
            .map(SenderResult::recordMetadata)
            .map(
                meta -> {
                    Instant sentAt = Instant.ofEpochMilli(meta.timestamp());
                    return new PublishResult(meta.topic(), meta.partition(), meta.offset(), sentAt);
                }
            );
    }

    /**
     * Streams the values of the records received from the requested topics.
     *
     * <p>The consumer properties are the configured ones overlaid with every request
     * parameter; the {@code topic} parameter itself is then removed from the properties.
     *
     * @param topics         topics to subscribe to
     * @param consumerParams all request parameters, applied as consumer property overrides
     * @return the record values as they are received
     */
    @GetMapping("/consume")
    public Flux<String> consume(@RequestParam("topic") List<String> topics, @RequestParam Map<String, String> consumerParams) {
        log.debug("REST request to consume records from Kafka topics {}", topics);

        Map<String, Object> props = kafkaProperties.getConsumerProps();
        props.putAll(consumerParams);
        props.remove("topic");

        ReceiverOptions<String, String> options = ReceiverOptions.<String, String>create(props).subscription(topics);
        KafkaReceiver<String, String> receiver = KafkaReceiver.create(options);
        return receiver.receive().map(received -> received.value());
    }

    /** Response body of {@link #publish}: where and when the record was written. */
    private static class PublishResult {

        public final String topic;
        public final int partition;
        public final long offset;
        public final Instant timestamp;

        private PublishResult(String topic, int partition, long offset, Instant timestamp) {
            this.timestamp = timestamp;
            this.offset = offset;
            this.partition = partition;
            this.topic = topic;
        }
    }
}

这是唯一可用的与 Kafka 相关的代码。

在 6.6.0 版本之前,JHipster 会生成标准的生产者/消费者类,我可以用它们来定义要监听的主题。现在还不清楚如何使用生成的代码来发出/监听事件。

标签: apache-kafka jhipster reactor-kafka

解决方案


我编写了这样一个服务,它会在应用启动时自动启动消费者:

@Service
public class KafkaConsumerService {

    private final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class);

    private static final String GROUP_USER_CREATE_TOPIC = "GROUP_STORE.GROUP_USER.SAVE"; //<application name>.<dataset name>.<event>

    private final KafkaProperties kafkaProperties;

    private KafkaReceiver<String, String> kafkaReceiver;

    private final ObjectMapper objectMapper = new ObjectMapper();

    private final GroupMemberService groupMemberService;

    public KafkaConsumerService(KafkaProperties kafkaProperties, GroupMemberService groupMemberService) {
        this.kafkaProperties = kafkaProperties;
        this.groupMemberService = groupMemberService;
    }

    @PostConstruct
    public void start() {
        log.info("Kafka consumer starting...");
        Map<String, Object> consumerProps = kafkaProperties.getConsumerProps();
        ReceiverOptions<String, String> receiverOptions = ReceiverOptions.<String, String>create(consumerProps)
            .subscription(Collections.singletonList(GROUP_USER_CREATE_TOPIC));
        this.kafkaReceiver = KafkaReceiver.create(receiverOptions);
        consumeGroupMember().subscribe();
    }

    public Flux<GroupMemberDTO> consumeGroupMember() {
        log.debug("consumer group member....");
        return this.kafkaReceiver
            .receive()
            .map(ConsumerRecord::value)
            .flatMap(
                record -> {
                    try {
                        GroupMemberDTO groupMemberDTO = objectMapper.readValue(record, GroupMemberDTO.class);
                        log.debug("Complete convert object: {}", groupMemberDTO);
                        return groupMemberService.insert(groupMemberDTO);
                    } catch (JsonProcessingException e) {
                        throw new RuntimeException(e);
                    }
                }
            );
    }

    public void shutdown() {
        log.info("Shutdown kafka consumer");
    }
}

推荐阅读