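java - Kafka Streams subscription error - invalid version
Question
When I try to connect to a topic from a Java Jetty microservice, I get this Kafka internal version mismatch error: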
stream-thread [App-94d44dcd-f1d4-49a6-9dd3-8d4eee06f82a-StreamThread-1] Encountered the following error during processing:
java.lang.IllegalArgumentException: version must be between 1 and 3; was: 4
at org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.<init>(SubscriptionInfo.java:67)
at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.subscription(StreamsPartitionAssignor.java:312)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.metadata(ConsumerCoordinator.java:176)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest(AbstractCoordinator.java:515)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.initiateJoinGroup(AbstractCoordinator.java:466)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:412)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:333)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1175)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:861)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:814)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
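Any ideas about what could cause this exception?
Answer
I ran into this error myself. It is most likely because you are using a non-unique APPLICATION_ID_CONFIG and/or CLIENT_ID_CONFIG. The application.id is also used as the consumer group id, so two different Streams applications that share it join the same group and exchange subscription metadata with each other. If they run different Kafka Streams versions, that metadata can carry a version the older library does not accept.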
// Give the Streams application a unique name. The name must be unique in the Kafka cluster
// against which the application is run.
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "my-client");
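To make the snippet above a bit more concrete, here is a minimal sketch of one way to keep the two ids distinct. It is an illustration under assumptions, not the asker's code: the class name StreamsIds, the helper identityConfig, and the id "orders-enricher-v1" are hypothetical. It uses plain java.util.Properties with the literal keys "application.id" and "client.id" (the strings behind StreamsConfig.APPLICATION_ID_CONFIG and StreamsConfig.CLIENT_ID_CONFIG) so that it runs without the Kafka jars on the classpath.

```java
import java.util.Properties;
import java.util.UUID;

public class StreamsIds {

    // Hypothetical helper (not part of Kafka): builds only the identity-related settings.
    public static Properties identityConfig(String applicationId, String instanceSuffix) {
        Properties props = new Properties();
        // One id per logical application; every instance of that same application shares it.
        props.put("application.id", applicationId);
        // One id per running instance, derived from the application id plus a suffix.
        props.put("client.id", applicationId + "-" + instanceSuffix);
        return props;
    }

    public static void main(String[] args) {
        // Assumption: a random UUID is an acceptable per-instance suffix; a hostname would also work.
        Properties props = identityConfig("orders-enricher-v1", UUID.randomUUID().toString());
        System.out.println("application.id = " + props.getProperty("application.id"));
        System.out.println("client.id      = " + props.getProperty("client.id"));
    }
}
```

In a real service you would merge these properties with the rest of your Streams configuration (bootstrap servers, serdes, and so on) before constructing KafkaStreams. If the error persists, it may be worth checking whether some other service in the cluster already uses the same application.id.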