spring - 目前是否可以在 spring-cloud-streams 中将 pulsar 换成 Kafka
问题描述
我一直在尝试进行一些搜索(Google、Slack、Stack),但还没有找到答案。我们有一些使用Spring Cloud Streams编写的应用程序,并且有兴趣将后端从 Kafka 交换到 Pulsar。Spring 目前没有对 Kafka 的任何原生支持,但是 pulsar 似乎提供了使用 Kafka API 直接与 pulsar 通信的能力(https://pulsar.apache.org/docs/en/adaptors-kafka)。
我想知道是否有人已经尝试Kafka-clients
在 Spring 云消息传递的上下文中使用这个替代库来替代库。
当然,另一种有效的方法是重新编写代码——但我想求助于社区,看看是否有人已经走上了这条路。
谢谢
解决方案
虽然不完全符合您的要求,但我尝试使用阴影库 org.apache.pulsar:pulsar-client-kafka:2.5.0 将 Pulsar 与 Spring-Kafka 集成,并且它与此堆栈跟踪有关:
Caused by: java.lang.UnsupportedOperationException: null
at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:650) ~[pulsar-client-kafka-2.5.0.jar:2.5.0]
at org.springframework.kafka.listener.AbstractMessageListenerContainer.checkTopics(AbstractMessageListenerContainer.java:312) ~[spring-kafka-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:136) ~[spring-kafka-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:292) ~[spring-kafka-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:311) ~[spring-kafka-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:255) ~[spring-kafka-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ~[spring-context-5.1.10.RELEASE.jar:5.1.10.RELEASE]
可以使用 AbstractMessageListenerContainer.checkTopics 的自定义实现绕过此特定错误(对于这些特定库版本),但您可能会遇到更多问题。
推荐阅读
- azure - 什么是 ADF Mapping 数据流表达语言?
- javascript - 我需要测试一组累积奖金结果。我需要一个函数,如果所有结果都相同则返回 true,如果它们不同则返回 false
- php - Symfony 4 更改默认表单名称
- php - Laravel 6 - 显示验证消息的新方法
- c# - RegisterWaitForSingleObject vs System.Timers
- blockchain - 如何将英特尔 SGX 平台服务与 Hyperledger Sawtooth 结合使用?
- django - 列表字典输出不是我所期望的
- javascript - 更改应用状态后清除存储数据
- json - 转换 json 列表时无法使用 xmltodict.unparse 将 json 解析为 xml
- ios - 如何用 3 个带有标题的子项扩展 tableview