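java - Using Kafka Streams with Java causes error: "Unexpected state transition from STARTING to PARTITIONS_ASSIGNED"
Problem description
Hi all, I'm running into a problem when using kafka-connect with Java; it does not work for me. Can anyone help?
- Kafka: localhost:9092
- ZooKeeper: localhost:2181
Stack trace: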
[est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] Unexpected state transition from STARTING to PARTITIONS_ASSIGNED
[est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] Error caught during partition assignment, will abort the current process and re-throw at the end of rebalance
org.apache.kafka.streams.errors.StreamsException: stream-thread [est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] Unexpected state transition from STARTING to PARTITIONS_ASSIGNED
at org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:216)
at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:280)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:272)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:400)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:421)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:963)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:863)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)
[est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] partition assignment took 2 ms.
current active tasks: []
current standby tasks: []
previous active tasks: []
[est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
org.apache.kafka.streams.errors.StreamsException: stream-thread [est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] Failed to rebalance.
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:972)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:863)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)
Caused by: org.apache.kafka.streams.errors.StreamsException: stream-thread [est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] Unexpected state transition from STARTING to PARTITIONS_ASSIGNED
at org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:216)
at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:280)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:272)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:400)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:421)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:963)
... 3 more
[est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] State transition from STARTING to PENDING_SHUTDOWN
Code:
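import java.math.BigInteger;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.json.JSONObject; // assumption: the post does not say which JSON library is used

// The class name is illustrative; the original post shows only the methods.
public class TweetFilterApp {

    public static void main(String[] args) {
        String bootstrapServer = "127.0.0.1:9092";
        String applicationId = "kafka-streams-test";

        // Creating properties
        Properties properties = new Properties();
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                Serdes.StringSerde.class.getName());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                Serdes.StringSerde.class.getName());

        // Creating topology
        StreamsBuilder streamsBuilder = new StreamsBuilder();

        // Input topic
        KStream<String, String> inputTopic = streamsBuilder.stream("tweets");

        // Keep only tweets whose author has more than 1000 followers
        KStream<String, String> filteredStream = inputTopic.filter(
                (k, tweets) -> filterTweet(extractFollowerNumberFromTweets(tweets))
        );
        filteredStream.to("deletesTweets");

        // Building the topology and starting the application
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
        kafkaStreams.start();
    }

    // Reads payload.User.FollowersCount from the tweet JSON; returns 0 if it cannot be parsed
    private static BigInteger extractFollowerNumberFromTweets(String tweet) {
        try {
            JSONObject jsonObject = new JSONObject(tweet);
            return jsonObject.getJSONObject("payload").getJSONObject("User").getBigInteger("FollowersCount");
        } catch (Exception ex) {
            return BigInteger.ZERO;
        }
    }

    // True when the follower count is strictly greater than 1000
    static boolean filterTweet(BigInteger followerCount) {
        return followerCount.compareTo(BigInteger.valueOf(1000)) > 0;
    }
}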
I have already created my topic and pushed some tweets to it.
Solution
This happened to me with version 2.3.1 of the kafka-streams library. After I upgraded it to version 2.5.0, the problem went away.
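Since the fix was purely a library version change, it may be worth checking which kafka-streams and kafka-clients jars actually end up on your runtime classpath (jar file names normally carry the version). Below is a minimal sketch that uses only the JDK. The class name KafkaJarLocator and the helper locate are made up for illustration, and this does not prove that a version mismatch was the cause of the error in the question.

```java
import java.security.CodeSource;
import java.util.Optional;

/** Prints which jar each Kafka library class was loaded from (illustrative sketch, JDK only). */
final class KafkaJarLocator {

    /**
     * Returns the location a class was loaded from, or empty if the class
     * is not on the classpath or has no code source (for example JDK classes).
     */
    static Optional<String> locate(String className) {
        try {
            // initialize=false: only the Class object is needed, not its static initializers
            Class<?> type = Class.forName(className, false, KafkaJarLocator.class.getClassLoader());
            CodeSource source = type.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                return Optional.empty();
            }
            return Optional.of(source.getLocation().toString());
        } catch (ClassNotFoundException | LinkageError e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        String[] classes = {
            "org.apache.kafka.streams.KafkaStreams",           // shipped in the kafka-streams jar
            "org.apache.kafka.clients.consumer.KafkaConsumer"  // shipped in the kafka-clients jar
        };
        for (String name : classes) {
            System.out.println(name + " -> " + locate(name).orElse("not found on classpath"));
        }
    }
}
```

Run it with the same classpath as the streams application. If the two printed jar names show different Kafka versions, aligning them in the build file would be a reasonable first thing to try.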