java - Spring Kafka Binder 未收到任何消息但已连接到主题
问题描述
您好,我一直在使用 Spring Kafka Binder 作为消费者。查看日志,我能够连接到该主题,尽管我不确定它为什么不处理来自生产者的任何消息。
关于可能缺少什么的任何想法?谢谢!
聚甲醛
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
应用 YML
cloud:
zookeeper:
connect-string: port1.test.com:2181,port2.test.com:2181,port3.test.com:2181
stream:
kafka:
binder:
brokers:
- port1.test.com:6667
- port2.test.com:6667
- port3.test.com:6667
auto-create-topics: false
auto-add-partitions: false
jaas:
controlFlag: REQUIRED
loginModule: com.sun.security.auth.module.Krb5LoginModule
options:
useKeyTab: true
storeKey: true
serviceName: kafka
# Change location to your local location
keyTab: C:\\Users\\src\\main\\resources\\kafka\\kafka_user.keytab
principal: kafka_user@TEST.COM
debug: true
configuration:
security:
protocol: SASL_PLAINTEXT
bindings:
stream-input:
binder: kafka
destination: TOPIC
group: service-dev
security:
krb5conf:
# Change location to your local location
location: C:\\Users\\src\\main\\resources\\kafka\\krb5nonprod.conf
消费类
public interface EventConsumer {
@Input("stream-input")
SubscribableChannel consumeMessage();
}
监听类
@Slf4j
@Component
@RequiredArgsConstructor
@EnableBinding(EventConsumer.class)
public class EventListener {
@StreamListener(target = "stream-input")
public void processMessage(Object msg) {
日志
Started Application in 75.471 seconds (JVM running for 184.663)
2021-09-29 19:45:01.342 INFO 30340 --- [container-0-C-1] org.apache.kafka.clients.Metadata
: [Consumer clientId=consumer-2, groupId=service-dev] Cluster ID: qa_IFa70SravzxvdcDhHA
2021-09-29 19:45:01.390 INFO 30340 --- [container-0-C-1]
o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=service-dev]
Discovered group coordinator port1.test.com:6667 (id: 2147482644 rack: null)
2021-09-29 19:45:01.399 INFO 30340 --- [container-0-C-1]
o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=service-dev]
Revoking previously assigned partitions []
2021-09-29 19:45:01.400 INFO 30340 --- [container-0-C-1]
o.s.c.s.b.k.KafkaMessageChannelBinder$1 : service-dev: partitions revoked: []
2021-09-29 19:45:01.401 INFO 30340 --- [container-0-C-1]
o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=service-dev]
(Re-)joining group
2021-09-29 19:45:01.854 INFO 30340 --- [container-0-C-1]
o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=service-dev]
(Re-)joining group
2021-09-29 19:45:04.387 INFO 30340 --- [container-0-C-1]
o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=service-dev]
Successfully joined group with generation 36
2021-09-29 19:45:04.400 INFO 30340 --- [container-0-C-1]
o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=service-dev]
Setting newly assigned partitions: TOPIC-0
2021-09-29 19:45:04.481 INFO 30340 --- [container-0-C-1]
o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=service-dev]
Setting offset for partition TOPIC-0 to the committed offset FetchPosition{offset=1076,
offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=port1.test.com:6667 (id: 1003
rack: /default-rack), epoch=2}}
2021-09-29 19:45:04.557 INFO 30340 --- [container-0-C-1]
o.s.c.s.b.k.KafkaMessageChannelBinder$1 : service-dev: partitions assigned: [TOPIC-0]
从日志中可以看出,它能够连接到主题。虽然我不确定为什么我没有收到生产者的任何消息。是因为撤销了分区吗?这与我为什么没有收到任何消息有关吗?生产者来自第三方,他需要做一些事情才能让我接收消息吗?很明显,我能够连接到该主题。谢谢!
解决方案
日志看起来不错;这意味着在该组的当前提交偏移之后没有更多记录可读取
>Setting offset for partition TOPIC-0 to the committed offset FetchPosition{offset=1076 ...
如果您想重新阅读整个主题,您可以设置 resetOffsets kafka 消费者绑定属性,或者将其更改为group
没有提交偏移量的属性。
推荐阅读
- java - 如何在java中的给定时间(即上午11:00)和另一个时间(即下午1:00)之间遍历?
- ruby - 意外的keyword_end MongoDB注入
- bitbucket - 工件未在 bitbucket 管道中发布
- mysql - 在sql中加入查询
- jenkins - 如何为所有 Jenkins 作业创建构建后脚本
- python - 如何解决此错误“当前路径,注销,与其中任何一个都不匹配。”
- html - Python 不会从 CSS 加载图像
- algorithm - 分治算法的复杂性
- php - 防止 Wireshark 从 URL Codeigniter 请求获取方法的安全问题
- reactjs - 当用户在另一个数组中使用onchangeevent react js取消选择选项时,我想删除对象数组中的元素