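spring-boot - Spring Boot application using Spring Cloud Stream Kafka Binder + Kafka Streams Binder not working - producer does not send messages
Problem description
My Spring Boot 2.3.1 application with SCS Hoxton.SR6 was using the Kafka Streams Binder. I needed to add a Kafka producer to be used in another part of the application, so I added the Kafka binder. The problem is that the producer does not work and throws this exception: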
19:49:40.082 [scheduling-1] [900cdeb11106e199] ERROR o.s.c.stream.binding.BindingService - Failed to create producer binding; retrying in 30 seconds
org.springframework.cloud.stream.provisioning.ProvisioningException: Provisioning exception; nested exception is java.util.concurrent.TimeoutException
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:332)
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:148)
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:79)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:222)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:90)
at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:152)
at org.springframework.cloud.stream.binding.BindingService.lambda$rescheduleProducerBinding$4(BindingService.java:336)
at org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:68)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.util.concurrent.TimeoutException: null
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:108)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:272)
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:368)
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicIfNecessary(KafkaTopicProvisioner.java:342)
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:319)
This is my configuration:
spring:
  cloud:
    function:
      definition: myProducer
    stream:
      bindings:
        myKStream-in-0:
          destination: my-kstream-topic
          producer:
            useNativeEncoding: true
        myProducer-out-0:
          destination: producer-topic
          producer:
            useNativeEncoding: true
      kafka:
        binder:
          brokers: ${kafka.brokers:localhost}
          min-partition-count: 3
          replication-factor: 3
          producerProperties:
            enable:
              idempotence: true
            retries: 0x7fffffff
            acks: all
            key:
              serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
              subject:
                name:
                  strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
            value:
              serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
              subject:
                name:
                  strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
            schema:
              registry:
                url: ${schema-registry.url:http://localhost:8081}
            request:
              timeout:
                ms: 5000
        streams:
          binder:
            brokers: ${kafka.brokers:localhost}
            configuration:
              application:
                id: ${spring.application.name}
                server: ${POD_IP:localhost}:${local.server.port:8080}
              schema:
                registry:
                  url: ${schema-registry.url}
              key:
                subject:
                  name:
                    strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
              value:
                subject:
                  name:
                    strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
              processing:
                guarantee: exactly_once
              replication:
                factor: 3
              group:
                id: kpi
            deserialization-exception-handler: logandcontinue
            min-partition-count: 3
            replication-factor: 3
            state-store-retry:
              max-attempts: 20
              backoff-period: 1500
What could be the problem here?
Update
I adjusted the configuration as follows:
spring:
  cloud:
    function:
      definition: myProducer
    stream:
      function:
        definition: myKStream
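Now I no longer see any exception, but the messages do not reach the topic.
In another application that uses only the Kafka binder, the same approach works perfectly: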
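@Configuration
class KafkaProducerConfiguration {
    // Processor that application code pushes outgoing messages into.
    @Bean
    fun myProducerProcessor(): EmitterProcessor<Message<XXX>> {
        return EmitterProcessor.create()
    }

    // Supplier function picked up by Spring Cloud Stream; it exposes the processor as the outbound Flux.
    @Bean
    fun myProducer(): Supplier<Flux<Message<XXX>>> {
        return Supplier { myProducerProcessor() }
    }
}
...
@Component
class XXXProducer(@Qualifier("myProducerProcessor") private val myProducerProcessor: EmitterProcessor<Message<XXX>>) {
    fun send(...): Mono<Void> {
        // Defer the emit so nothing is published until the returned Mono is subscribed to.
        return Mono.defer {
            myProducerProcessor.onNext(message)
            Mono.empty()
        }
    }
}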
Update 2
I set logging.level.org.springframework.cloud.stream: debug
The logs show the following trace:
o.s.c.s.binder.DefaultBinderFactory - Creating binder: kstream
However, there is no corresponding entry for Creating binder: kafka.
Solution
I was missing the multi-binder configuration for using the kafka and kstream binders together (https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.RELEASE/reference/html/spring-cloud-stream-binder-kafka.html#_multi_binders_with_kafka_streams_based_binders_and_regular_kafka_binder)
So I had to list both functions in a single property: spring.cloud.stream.function.definition=myProducer;myKStream
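For readers who keep their settings in application.yml, the same property might look like the fragment below. This is only a sketch: it reuses the binding and topic names from the question and assumes that the separate spring.cloud.function.definition entry from the earlier attempt is removed, so that a single definition lists both functions. All other binder settings are assumed to stay as they were.

```yaml
spring:
  cloud:
    stream:
      function:
        # Both functions in one definition, separated by ';'
        # (myProducer is the Supplier, myKStream is the Kafka Streams function)
        definition: myProducer;myKStream
      bindings:
        # Input binding of the Kafka Streams function
        myKStream-in-0:
          destination: my-kstream-topic
        # Output binding of the regular Kafka producer
        myProducer-out-0:
          destination: producer-topic
```

With debug logging enabled as in Update 2, one would then expect a Creating binder: kafka entry to appear alongside the kstream one.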