apache-kafka - Spring Cloud @StreamListener 消费者未在消费者组中注册 CONSUMER-ID、HOST 和 CLIENT-ID
问题描述
我们有一个 spring 云消费者来读取来自一个 kafka 主题的消息。以下是频道界面
@Component
public interface CollectionStreams {
String INPUT_REPORT = "report-in";
String OUTPUT_REPORT = "report-out";
@Input(INPUT_REPORT)
SubscribableChannel inboundReport();
@Output(OUTPUT_REPORT)
MessageChannel outboundReportToJM();
}
我们面临的问题是,在消费者组“报告”中列出时,我们无法按预期看到 CONSUMER-ID、HOST 和 CLIENT-ID。
[root@innolx131112 templates]# kubectl -n tmo-ccm exec kafka-test-client -- /usr/bin/kafka-consumer-groups --bootstrap-server kafka:9092 --describe -group report
Note: This will not show information about old Zookeeper-based consumers.
Consumer group 'report' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
report 3 2 3 1 - - -
report 1 4 4 0 - - -
report 2 1 1 0 - - -
report 4 2 2 0 - - -
report 0 2 2 0 - - -
顺便说一句,我们在 Kubernetes 中运行我们的应用程序以及 Kafka。
由于这个问题,我们无法将我们的应用程序的多个 POD 作为所有 POD
以下是频道界面
@Component
public interface CollectionStreams {
String INPUT_REPORT = "report-in";
String OUTPUT_REPORT = "report-out";
@Input(INPUT_REPORT)
SubscribableChannel inboundReport();
@Output(OUTPUT_REPORT)
MessageChannel outboundReportToJM();
}
我们已经定义了如下方法来从主题中读取消息。
@StreamListener(CollectionStreams.INPUT_REPORT)
//public void handleMessage(@Payload MessageT message) {
public void handleMessage(@Payload MessageT message, @Headers MessageHeaders msg) {
以下是配置yaml
**
cloud:
stream:
kafka:
binder:
brokers: kafka
autoCreateTopics: false
bindings:
report-in:
consumer:
autoCommitOffset: false
autoCommit: false
auto-offset-reset: earliest
autoCommitOnError: false
resetOffsets: false
autoRebalanceEnabled: false
ackEachRecord: false
bindings:
report-in:
destination: report
contentType: application/json
group: report
consumer:
concurrency: 5
partitioned: true
report-out:
destination: jobmanager
contentType: application/json
group: jobmanager
producer:
autoAddPartitions: true
**
我们还有另一个消费者,我们没有为其设置任何与 kafka 相关的消费者道具,令人惊讶的是,这些消费者正在正确地注册自己。
配置:
cloud:
stream:
kafka:
binder:
autoCreateTopics: false
brokers: kafka
bindings:
parse-in:
destination: parser
contentType: application/json
group: parser
consumer:
concurrency: 3
partitioned: true
parse-out:
destination: jobmanager
contentType: application/json
group: jobmanager
producer:
partitionKeyExpression: headers['contentType']
autoAddPartitions: true
并描述命令输出如下
[root@innolx131112 shyama]# kubectl -n tmo-ccm exec kafka-test-client -- /usr/bin/kafka-consumer-groups --bootstrap-server kafka:9092 --describe -group parser
Note: This will not show information about old Zookeeper-based consumers.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
parser 0 14 14 0 consumer-3-cb99e45e-21b3-4efb-ac7b-4f642a9486be /192.168.0.26 consumer-3
parser 1 13 13 0 consumer-3-cb99e45e-21b3-4efb-ac7b-4f642a9486be /192.168.0.26 consumer-3
parser 2 15 15 0 consumer-4-bec1e4af-d771-47fe-83ad-3440f3a6d4bd /192.168.0.26 consumer-4
parser 3 15 15 0 consumer-4-bec1e4af-d771-47fe-83ad-3440f3a6d4bd /192.168.0.26 consumer-4
parser 4 12 12 0 consumer-5-b9ac7e36-58cf-40cb-b37d-a0fa092a0d56 /192.168.0.26 consumer-5
是不是因为我们为第一个消费者(报告消费者)提供了kafka相关的道具,所以无法注册???
解决方案
Consumer group 'report' has no active members.
这仅仅意味着当你输入命令时你的应用程序没有运行。
该信息是暂时的,在应用停止时不会保留。
下次可能会将分区分配给不同的实例。
编辑
每当我们使用同一组实例化更多消费者时,他们正在处理已由旧消费者处理的相同旧消费者
好吧,更仔细地查看您的配置,这正是您所要求的......
autoRebalanceEnabled: false
... 表示 Kafka 不会使用组管理,分区将由 Spring Cloud 流分配。
autoCommitOffset: false
意味着 Spring 不会提交任何偏移量,这是您的应用程序的责任。如果你不提交偏移量,你会得到你观察到的行为。
推荐阅读
- angular - 有多少种方法可以提高角度性能?
- python - 使用 ImageField 对 Django ModelForm 进行单元测试,测试显示无效的表单
- c - 在 C 宏和转换方面需要帮助
- c# - 在 C# 中将 IByteBuffer 转换为字符串
- selenium - 如何在 Junit 中同时使用 Jmeter 和 Selenium Webdriver 运行 1000 个用户而不导致 PC 挂起?
- date - 如何使用没有数字偏移的默认区域格式化日期
- python - 将pymysql结果转换为对象
- javascript - 如何使用 ASP.NET MVC 将 javascript 图像文件发布到 ActionResult?
- android - 使用 AOSP 更改构建自定义 SDK
- azure - Azure 应用服务 - 用户没有足够的权限来收集 Windows 性能计数器