首页 > 解决方案 > 使用角度的弹簧卡夫卡前端

问题描述

我有一个消费者使用我的 kafka 集群中的 spring 来消费数据。现在我想使用 Angular 创建 UI。我已经写完了,但是我在后端(java)中得到了这个错误

在此处输入图像描述

我知道 kafka 不适用于多线程,当我尝试在浏览器(前端)中调用 UI 时会发生此错误

在此处输入图像描述

这是我的service.ts代码,它与弹簧连接起来

  private usersUrl: string;

  constructor(private http: HttpClient) {
  this.usersUrl = 'http://localhost:8080/receiptsData';
 }

  public findAll(): Observable<ReceiptData[]> {
  return this.http.get<ReceiptData[]>(this.usersUrl, { responseType: 'json' });
 }

在我的后端,我还将它添加到我的消费者以避免访问块

@CrossOrigin(origins = "http://localhost:4200")

如果我不能从另一个来源(角度,浏览器)对我的 kafka 进行多线程访问,我可以访问数据的其他可能性是什么?

编辑:

我在我的 java 中使用一个线程,因为我不想使用while(true) 这是我的后端控制器Java 代码:

public class ExampleKafkaConsumerController extends 
ShutdownableThread {

private final KafkaConsumer<Integer, String> consumer;
private static final String RECEIVED_MESSAGE = "Received message: (";
private static final long DURATION = 5000;

public ExampleKafkaConsumerController() {

  super(DataUtils.GROUP_ID, false);
  ExampleKafkaConsumerConfig exKafkaConsumerConfig = new 
  ExampleKafkaConsumerConfig();
  consumer = exKafkaConsumerConfig.kafkaConfiguration();
}

@GetMapping("/receiptsData")
@Override
public void doWork() {
  
  consumer.subscribe(Collections.singletonList(DataUtils.TOPIC_NAME));
    ConsumerRecords<Integer, String> records = 
      consumer.poll(Duration.ofMillis(DURATION));
    System.out.println(records.count());
    for (ConsumerRecord<Integer, String> record : records) {
      System.out.println(RECEIVED_MESSAGE + record.value());
      KafkaJsonConverter kafkaJsonConverter = new 
        KafkaJsonConverter(record.value());
      System.out.println(kafkaJsonConverter.
        convertStringToJsonObject().toString());
    }
  }
}

这是我的主要课程:

@SpringBootApplication
public class ExampleConsumerRunner implements CommandLineRunner {

  public static void main(String[] args) {
   SpringApplication.run(ExampleConsumerRunner.class, args);
 }

 @Autowired
 ExampleKafkaConsumerController exKafkaConsumerController;

 @Override
 public void run(String... args) {
   try {
     exKafkaConsumerController.start();
  } catch (Exception ex) {
     System.out.println(ex.getMessage());
  }
}

标签: javaangularspringapache-kafka

解决方案


根据示例,您正在尝试通过 GET 请求从 Kafka 主题接收最新条目。您显示的错误是因为您试图在控制器的 doWork() 方法中使用单个使用者实例。要解决此特定问题,您应该为每个请求(在 doWork() 中)拥有自己的消费者实例,但我认为您可能想要重新审视解决问题的基本方法,而不是这样做。

原因是,将端点直接连接到主题订阅并不是很典型 - 因为您可能不希望每个请求单独连接到 Kafka(尤其是具有相同的 groupId),这种方法有很多缺点,并且确实不会导致高性能或确定性 API。

我建议您在以下情况下进行设计:a)服务器端数据存储(单例、内存数据库或持久数据库,具体取决于您的需要)b)服务器端消费者订阅主题并将记录写入数据库。Spring @KafkaListener 很容易做到这一点。c) REST 控制器读取在步骤 a) 中完成的数据存储

使用这种设计,您可以为主题设置一个侦听器,将 API 与侦听器分离,并允许您对 API 实施例如查询过滤器(最新的,或自日期或偏移量以来的条目等)。


推荐阅读