spring-cloud-stream - Spring Cloud Stream 函数路由器输出尝试绑定到 Kafka 主题
问题描述
我正在尝试迁移到 Spring Cloud Stream 的新函数式编程模型,替换像这样的条件 StreamListener 注释
@StreamListener("app-input", condition = "headers['eventName']=='Funded'")
有类似的东西
@Bean
fun router() = MessageRoutingCallback {
when (it.headers["eventName"]) {
"Funded" -> "funded"
else -> "ignored"
}
}
@Bean
fun funded() = Consumer { message: Message<Funded> ->
...
}
@Bean
fun ignored() = Consumer { message: Message<*> ->
...
}
具有将频道链接到主题的相关属性
spring.cloud.function.definition=functionRouter
spring.cloud.stream.bindings.functionRouter-in-0.destination=MyTopic
我需要这种间接级别,因为有多种 Avro 消息类型都到达 MyTopic 需要反序列化并以不同方式路由
这一切都很顺利,我可以按预期使用和路由消息。然而,以这种方式使用 functionRouter 有一个意想不到的副作用,即当没有可用的主题时,它也会尝试将 functionRouter-out-0 绑定到 Kafka,因此应用程序每 30 秒尝试附加到正如您所料,代理上的一个名为“functionRouter-out-0”的主题因授权错误而失败。
2021-05-06 12:57:55.654 WARN [screening] o.s.c.s.b.k.p.KafkaTopicProvisioner : No partitions have been retrieved for the topic (functionRouter-out-0). This will affect the health check.
2021-05-06 12:57:56.198 WARN [screening] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-3] Error while fetching metadata with correlation id 4 : {functionRouter-out-0=TOPIC_AUTHORIZATION_FAILED}
2021-05-06 12:57:56.199 ERROR [screening] org.apache.kafka.clients.Metadata : [Producer clientId=producer-3] Topic authorization failed for topics [functionRouter-out-0]
2021-05-06 12:57:56.199 ERROR [screening] o.s.c.s.b.k.p.KafkaTopicProvisioner : Failed to obtain partition information
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [functionRouter-out-0]
所以问题是a)我怎样才能停止试图绑定到Kafka的functionRouter-out-0通道,或者b)我怎样才能在不需要中间通道的情况下实现这一点?
Spring Cloud Stream 事件路由功能自动创建新主题类似,但从未收到答案。
解决方案
我相信这是一个错误。如果您想关注它,我打开了一个问题:
https://github.com/spring-cloud/spring-cloud-stream/issues/2168
作为一种解决方法,只需将其指向同一个目的地
spring.cloud.function.definition=functionRouter
spring.cloud.stream.bindings.functionRouter-in-0.destination=so67419839
spring.cloud.stream.bindings.functionRouter-out-0.destination=so67419839
spring.cloud.stream.bindings.functionRouter-in-0.group=so67419839
由于代表是所有Consumer
我们永远不会真正发送任何东西。
推荐阅读
- android - 更改 jitsi-sdk for android 中的默认摄像头
- api - 使用 Here API 的超时请求
- c# - IEquatable 文档令人困惑
- oracle - Oracle APEX 导出 PDF 文件名
- latex - 胶乳在对齐内使用 bmatrix
- oauth - 无法验证 Moodle 发送的 OAuth1 签名
- c++ - 如何定义类类型转换为函数指针?
- python - 将数据框中的列从字符串类型转换为元组
- python-3.x - 构建 docker 的“python:3.5-alpine”映像时“构建正则表达式失败”
- optaplanner - Optaplanner 中的免费期间罚款