Using Kafka Streams with Java results in error: "Unexpected state transition from STARTING to PARTITIONS_ASSIGNED"

Problem description

Hello everyone, I am running into a problem using kafka-connect with Java. It is not working for me. Can anyone help?


Stack trace

[est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] Unexpected state transition from STARTING to PARTITIONS_ASSIGNED
[est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] Error caught during partition assignment, will abort the current process and re-throw at the end of rebalance
org.apache.kafka.streams.errors.StreamsException: stream-thread [est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] Unexpected state transition from STARTING to PARTITIONS_ASSIGNED
    at org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:216)
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:280)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:272)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:400)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:421)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:963)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:863)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)
[est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] partition assignment took 2 ms.
    current active tasks: []
    current standby tasks: []
    previous active tasks: []

[est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
org.apache.kafka.streams.errors.StreamsException: stream-thread [est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] Failed to rebalance.
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:972)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:863)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)
Caused by: org.apache.kafka.streams.errors.StreamsException: stream-thread [est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] Unexpected state transition from STARTING to PARTITIONS_ASSIGNED
    at org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:216)
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:280)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:272)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:400)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:421)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:963)
    ... 3 more
[est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] State transition from STARTING to PENDING_SHUTDOWN


Code

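import java.math.BigInteger;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.json.JSONObject; // assumed: the org.json library, which matches the calls used below

// The enclosing class is not shown in the original post; this name is a placeholder.
public class TweetFilterApp {

    public static void main(String[] args) {
        String bootstrapServer = "127.0.0.1:9092";
        String applicationId = "kafka-streams-test";

        // Creating properties
        Properties properties = new Properties();
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                       Serdes.StringSerde.class.getName());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                       Serdes.StringSerde.class.getName());

        // Creating topology
        StreamsBuilder streamsBuilder = new StreamsBuilder();

        // Input topic
        KStream<String, String> inputTopic = streamsBuilder.stream("tweets");

        // Keep only tweets whose author passes the follower-count filter
        KStream<String, String> filteredStream = inputTopic.filter(
                (k, tweets) -> filterTweet(extractFollowerNumberFromTweets(tweets))
        );
        filteredStream.to("deletesTweets");

        // Building the topology and starting the application
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
        kafkaStreams.start();
    }

    // Reads payload.User.FollowersCount from the tweet JSON; returns 0 if parsing fails.
    private static BigInteger extractFollowerNumberFromTweets(String tweet) {
        try {
            JSONObject jsonObject = new JSONObject(tweet);
            return jsonObject.getJSONObject("payload").getJSONObject("User").getBigInteger("FollowersCount");
        } catch (Exception ex) {
            return BigInteger.ZERO;
        }
    }

    // True when the follower count is strictly greater than 1000.
    static boolean filterTweet(BigInteger followerCount) {
        return followerCount.compareTo(BigInteger.valueOf(1000)) > 0;
    }
}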

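I have already created my topics and pushed some tweets to them.

The stack trace above shows the logs I get after running the application.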

Tags: java, apache-kafka, apache-kafka-streams

Solution


This happened to me with version 2.3.1 of the kafka-streams library. I upgraded it to version 2.5.0 and the problem went away.
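
Since the fix here is a version bump, it may help to fail fast if an older library version ends up on the classpath again. The following is only a minimal sketch of comparing dotted version strings such as "2.3.1" and "2.5.0". The class name `VersionCheck` and its methods are hypothetical helpers, not part of Kafka, and how you obtain the version string of the library actually in use (build file, dependency report, and so on) is left open as an assumption.

```java
// Hypothetical helper, not part of any Kafka API.
final class VersionCheck {

    // Compares two dotted, purely numeric versions (e.g. "2.3.1" vs "2.5.0").
    // Returns a negative value, zero, or a positive value.
    // Missing components count as 0, so "2.5" equals "2.5.0".
    // Non-numeric parts such as "-SNAPSHOT" are not handled and will throw
    // NumberFormatException.
    static int compareVersions(String a, String b) {
        String[] partsA = a.split("\\.");
        String[] partsB = b.split("\\.");
        int length = Math.max(partsA.length, partsB.length);
        for (int i = 0; i < length; i++) {
            int x = i < partsA.length ? Integer.parseInt(partsA[i]) : 0;
            int y = i < partsB.length ? Integer.parseInt(partsB[i]) : 0;
            if (x != y) {
                return Integer.compare(x, y);
            }
        }
        return 0;
    }

    // True when 'version' is the same as or newer than 'minimum'.
    static boolean isAtLeast(String version, String minimum) {
        return compareVersions(version, minimum) >= 0;
    }

    public static void main(String[] args) {
        // The two versions mentioned in this answer.
        System.out.println(isAtLeast("2.3.1", "2.5.0")); // false
        System.out.println(isAtLeast("2.5.0", "2.5.0")); // true
    }
}
```

The comparison is numeric per component, so "2.10.0" is correctly treated as newer than "2.9.1", which a plain string comparison would get wrong.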

