首页 > 解决方案 > 来自kafka流的context().headers()修改期间的异常标点调用

问题描述

我有一个使用处理器 API 的 kafka 流应用程序。我有一个基于挂钟的标点符号,用于检查本地 statestore 中的陈旧条目并删除它们并在其他服务正在侦听的 kafa 主题上发布消息。比如说,如果在标点调用期间从 statestore 中的 100 个条目中识别出 10 个过时的条目,则这 10 个条目中的每一个都将被删除并在一个 kafka 主题上发布。该应用程序的 num.stream.threads 设置为 3。有 4 个输入主题,每个主题有 100 个分区 - 所以我的 localstatestore 也有 100 个分区。我有 2.1.1 客户端和 2.1+ 经纪人(不确定确切数字)。这一切都很好。

最近,一个用于 kafka 消息的消费应用程序要求添加一个特定于每个被删除条目的标头,这样如果消费应用程序不感兴趣标头,它们就不需要打开有效负载。由于标题特定于每个条目;我正在执行以下操作:

this.context().schedule(Duration.ofMinutes(EXPIRED_MINUTES), PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
      @Override
      public void punctuate(long timestamp) {
        try {
          expireEntries(myStore);
        } catch (Exception e ) {
          LOG.error("Exception: ", e);
        }
      }
    });

  private void expireEntries(KeyValueStore<String, byte[]> store) {
     try (KeyValueIterator<String, byte[]> range = store.all()) {
      while (range.hasNext()) {
        KeyValue<String, byte[]> next = range.next();
        if (store.isExpired(next.key, expiredMs)) {
            addPublishEvent(next.key, next.value);
            store.delete(next.key);
        }
  }

    void addPublishEvent(String key, Message message, String topic) {

        if (message.hasInterestingProperty()) {
            for (Iterator<Header> iterator = context().headers().iterator(); iterator.hasNext();) 
            {
                Header h = iterator.next();
                if (h.key().equals("header-key")) {
                    iterator.remove();
                }
            }
            context().headers().add("header-key",  getHeaderValue(message).getBytes());
        }
        context().forward(key, message.toByteArray(), To.child(App.SINK_PREFIX + topic));

      String getHeaderValue(Message m) {
         // return m's property of interest to app;
      }

我得到以下异常

  1. 异常:java.util.ConcurrentModificationException: null\n at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)\n at java.util.ArrayList$Itr.next(ArrayList.java:859)\n at org.apache.kafka.common.header.internals.RecordHeaders$1.next(RecordHeaders.java:136)\n 在 org.apache.kafka.common.header.internals.RecordHeaders$1.next(RecordHeaders.java:129)\ n 在 org.apache.kafka.streams.processor.internals.ProcessorRecordContext.sizeBytes(ProcessorRecordContext.java:91)\n 在 org.apache.kafka.streams.state.internals.ContextualRecord.sizeBytes(ContextualRecord.java:42)\ n 在 org.apache.kafka.streams.state.internals.LRUCacheEntry.(LRUCacheEntry.java:53)\n 在 org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:243)\n在 org.apache.kafka.streams.state.internals.CachingKeyValueStore。deleteInternal(CachingKeyValueStore.java:290)\n 在 org.apache.kafka.streams.state.internals.CachingKeyValueStore.delete(CachingKeyValueStore.java:282)\n 在 org.apache.kafka.streams.state.internals.CachingKeyValueStore。删除(CachingKeyValueStore.java:38)\n 在 org.apache.kafka.streams.state.internals.MeteredKeyValueStore.delete(MeteredKeyValueStore.java:195)\n

  2. 异常:java.util.ConcurrentModificationException: null\n at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)\n at java.util.ArrayList$Itr.remove(ArrayList.java:873)\n at org.apache.kafka.common.header.internals.RecordHeaders$1.remove(RecordHeaders.java:142)\n

那么,问题来了:为什么我会收到 ConcurrentModificationException?如果我将 num.stream.threads 设为 1,它会停止吗?我不能永远将线程更改为 1;那么如何避免遇到此异常并为我发布到接收器主题/应用程序的每条消息添加具有相同键但不同值的标头?

标签: javaapache-kafkaapache-kafka-streams

解决方案


AConcurrentModificationException与多线程无关,但如果Iterator修改了 an 的底层集合,则会抛出它。坦率地说,我不确定您为什么会遇到异常,因为您似乎使用iterator.remove()通常可以避免异常的方法。

但是,与其遍历 ,不如Headers直接通过以下方式删除所有带有相应键的标头会更简单Headers#remove()

context().headers().remove("header-key");

参考:https ://kafka.apache.org/21/javadoc/org/apache/kafka/common/header/Headers.html#remove-java.lang.String-


推荐阅读