java - 来自kafka流的context().headers()修改期间的异常标点调用
问题描述
我有一个使用处理器 API 的 kafka 流应用程序。我有一个基于挂钟的标点符号,用于检查本地 statestore 中的陈旧条目并删除它们并在其他服务正在侦听的 kafa 主题上发布消息。比如说,如果在标点调用期间从 statestore 中的 100 个条目中识别出 10 个过时的条目,则这 10 个条目中的每一个都将被删除并在一个 kafka 主题上发布。该应用程序的 num.stream.threads 设置为 3。有 4 个输入主题,每个主题有 100 个分区 - 所以我的 localstatestore 也有 100 个分区。我有 2.1.1 客户端和 2.1+ 经纪人(不确定确切数字)。这一切都很好。
最近,一个用于 kafka 消息的消费应用程序要求添加一个特定于每个被删除条目的标头,这样如果消费应用程序不感兴趣标头,它们就不需要打开有效负载。由于标题特定于每个条目;我正在执行以下操作:
this.context().schedule(Duration.ofMinutes(EXPIRED_MINUTES), PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
@Override
public void punctuate(long timestamp) {
try {
expireEntries(myStore);
} catch (Exception e ) {
LOG.error("Exception: ", e);
}
}
});
private void expireEntries(KeyValueStore<String, byte[]> store) {
try (KeyValueIterator<String, byte[]> range = store.all()) {
while (range.hasNext()) {
KeyValue<String, byte[]> next = range.next();
if (store.isExpired(next.key, expiredMs)) {
addPublishEvent(next.key, next.value);
store.delete(next.key);
}
}
void addPublishEvent(String key, Message message, String topic) {
if (message.hasInterestingProperty()) {
for (Iterator<Header> iterator = context().headers().iterator(); iterator.hasNext();)
{
Header h = iterator.next();
if (h.key().equals("header-key")) {
iterator.remove();
}
}
context().headers().add("header-key", getHeaderValue(message).getBytes());
}
context().forward(key, message.toByteArray(), To.child(App.SINK_PREFIX + topic));
String getHeaderValue(Message m) {
// return m's property of interest to app;
}
我得到以下异常
异常:java.util.ConcurrentModificationException: null\n at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)\n at java.util.ArrayList$Itr.next(ArrayList.java:859)\n at org.apache.kafka.common.header.internals.RecordHeaders$1.next(RecordHeaders.java:136)\n 在 org.apache.kafka.common.header.internals.RecordHeaders$1.next(RecordHeaders.java:129)\ n 在 org.apache.kafka.streams.processor.internals.ProcessorRecordContext.sizeBytes(ProcessorRecordContext.java:91)\n 在 org.apache.kafka.streams.state.internals.ContextualRecord.sizeBytes(ContextualRecord.java:42)\ n 在 org.apache.kafka.streams.state.internals.LRUCacheEntry.(LRUCacheEntry.java:53)\n 在 org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:243)\n在 org.apache.kafka.streams.state.internals.CachingKeyValueStore。deleteInternal(CachingKeyValueStore.java:290)\n 在 org.apache.kafka.streams.state.internals.CachingKeyValueStore.delete(CachingKeyValueStore.java:282)\n 在 org.apache.kafka.streams.state.internals.CachingKeyValueStore。删除(CachingKeyValueStore.java:38)\n 在 org.apache.kafka.streams.state.internals.MeteredKeyValueStore.delete(MeteredKeyValueStore.java:195)\n
异常:java.util.ConcurrentModificationException: null\n at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)\n at java.util.ArrayList$Itr.remove(ArrayList.java:873)\n at org.apache.kafka.common.header.internals.RecordHeaders$1.remove(RecordHeaders.java:142)\n
那么,问题来了:为什么我会收到 ConcurrentModificationException?如果我将 num.stream.threads 设为 1,它会停止吗?我不能永远将线程更改为 1;那么如何避免遇到此异常并为我发布到接收器主题/应用程序的每条消息添加具有相同键但不同值的标头?
解决方案
AConcurrentModificationException
与多线程无关,但如果Iterator
修改了 an 的底层集合,则会抛出它。坦率地说,我不确定您为什么会遇到异常,因为您似乎使用iterator.remove()
通常可以避免异常的方法。
但是,与其遍历 ,不如Headers
直接通过以下方式删除所有带有相应键的标头会更简单Headers#remove()
:
context().headers().remove("header-key");
推荐阅读
- sql - 如何在 Oracle SQL 中对函数进行分组和应用
- gitlab - 是否可以同时在 gitlab 项目中添加多个成员?
- java - Java 客户端返回 401 Unauthorize 错误但在 Postman 中工作
- css - Shinyapps.io闪亮应用程序中的Custum字体
- linux - 错误 {org.wso2.carbon.identity.application.authentication.endpoint.util.TenantMgtAdminServiceClient} - 向 URL 发送 POST 请求
- html - 仅冻结顶行和滚动内容
- django - 第二张图像未使用序列化程序保存
- ruby-on-rails - 如何将Ruby on Rails的所有输入字符大写
- javascript - 使用开发工具观看响应式设计
- python - 在二维数组上使用贝叶斯统计分析进行音频过滤的 Numpy 建议