java - 使用角度的弹簧卡夫卡前端
问题描述
我有一个消费者使用我的 kafka 集群中的 spring 来消费数据。现在我想使用 Angular 创建 UI。我已经写完了,但是我在后端(java)中得到了这个错误
我知道 kafka 不适用于多线程,当我尝试在浏览器(前端)中调用 UI 时会发生此错误
这是我的service.ts代码,它与弹簧连接起来
private usersUrl: string;
constructor(private http: HttpClient) {
this.usersUrl = 'http://localhost:8080/receiptsData';
}
public findAll(): Observable<ReceiptData[]> {
return this.http.get<ReceiptData[]>(this.usersUrl, { responseType: 'json' });
}
在我的后端,我还将它添加到我的消费者以避免访问块
@CrossOrigin(origins = "http://localhost:4200")
如果我不能从另一个来源(角度,浏览器)对我的 kafka 进行多线程访问,我可以访问数据的其他可能性是什么?
- 对不起我的英语,我尽力解释
编辑:
我在我的 java 中使用一个线程,因为我不想使用while(true)
这是我的后端控制器Java 代码:
public class ExampleKafkaConsumerController extends
ShutdownableThread {
private final KafkaConsumer<Integer, String> consumer;
private static final String RECEIVED_MESSAGE = "Received message: (";
private static final long DURATION = 5000;
public ExampleKafkaConsumerController() {
super(DataUtils.GROUP_ID, false);
ExampleKafkaConsumerConfig exKafkaConsumerConfig = new
ExampleKafkaConsumerConfig();
consumer = exKafkaConsumerConfig.kafkaConfiguration();
}
@GetMapping("/receiptsData")
@Override
public void doWork() {
consumer.subscribe(Collections.singletonList(DataUtils.TOPIC_NAME));
ConsumerRecords<Integer, String> records =
consumer.poll(Duration.ofMillis(DURATION));
System.out.println(records.count());
for (ConsumerRecord<Integer, String> record : records) {
System.out.println(RECEIVED_MESSAGE + record.value());
KafkaJsonConverter kafkaJsonConverter = new
KafkaJsonConverter(record.value());
System.out.println(kafkaJsonConverter.
convertStringToJsonObject().toString());
}
}
}
这是我的主要课程:
@SpringBootApplication
public class ExampleConsumerRunner implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(ExampleConsumerRunner.class, args);
}
@Autowired
ExampleKafkaConsumerController exKafkaConsumerController;
@Override
public void run(String... args) {
try {
exKafkaConsumerController.start();
} catch (Exception ex) {
System.out.println(ex.getMessage());
}
}
解决方案
根据示例,您正在尝试通过 GET 请求从 Kafka 主题接收最新条目。您显示的错误是因为您试图在控制器的 doWork() 方法中使用单个使用者实例。要解决此特定问题,您应该为每个请求(在 doWork() 中)拥有自己的消费者实例,但我认为您可能想要重新审视解决问题的基本方法,而不是这样做。
原因是,将端点直接连接到主题订阅并不是很典型 - 因为您可能不希望每个请求单独连接到 Kafka(尤其是具有相同的 groupId),这种方法有很多缺点,并且确实不会导致高性能或确定性 API。
我建议您在以下情况下进行设计:a)服务器端数据存储(单例、内存数据库或持久数据库,具体取决于您的需要)b)服务器端消费者订阅主题并将记录写入数据库。Spring @KafkaListener 很容易做到这一点。c) REST 控制器读取在步骤 a) 中完成的数据存储
使用这种设计,您可以为主题设置一个侦听器,将 API 与侦听器分离,并允许您对 API 实施例如查询过滤器(最新的,或自日期或偏移量以来的条目等)。
推荐阅读
- oracle - Oracle UNIQUE 约束允许在不同情况下使用相同的值
- c# - 无法在“FooEntity”上配置键,因为它是派生类型。必须在根类型“Foo”上配置密钥
- javascript - 在精确部分显示 div
- r - 在ggplotly折线图中填充背景间隔
- sql-server - “无法继续执行,因为会话处于终止状态”错误?
- xaml - XAML UWP ContentDialog 自定义标题 HorizontalAlignment = Stretch?
- javascript - 如何连接数组对象中的两个键?JS
- redirect - 将 CloudFront 放在 AWS Amplify 前面时如何避免 301 重定向循环?
- c++ - 什么是无序动态初始化、部分有序动态初始化和有序动态初始化
- html - html在文本输入上展开元素/表单