首页 > 解决方案 > Flink + Kafka,并行度 > 1 时出现 java.lang.OutOfMemoryError

问题描述

我有一个玩具 Flink 工作,它从 3 个 kafka 主题中读取,然后合并所有这 3 个流。就是这样,没有额外的工作。

如果在我的 Flink 工作中使用并行度 1,一切似乎都很好,只要我更改并行度 > 1,它就会失败:

java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:693)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
    at sun.nio.ch.IOUtil.read(IOUtil.java:195)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:257)

为什么它适用于并行度 1 而不是并行度 > 1?

它与kafka服务器端设置有关吗?或者它与我的 java 代码中的消费者设置有关(我的代码中还没有特殊配置)?

我知道这里提供的信息可能还不够,但我无法触及 kafka 集群。我只是希望一些大师之前可能会遇到同样的错误,并且可以与我分享一些建议。

我正在使用 kafka 0.10,flink 1.5。

非常感谢。

标签: apache-kafkaapache-flink

解决方案


正如您在错误日志中看到的,此错误来自您的 Kafka 集群。当 Kafka Broker 的 Direct Buffer Memory超过分配给 JVM的堆大小时,会出现此问题。Direct Buffer Memory 根据应用程序的需要从 JVM 的堆中分配。当并行度大于 1 时,多个 Flink 任务min(Number of Flink Slots, Number of Kafka partitions)将同时消耗来自 Kafka 的数据,导致与并行度等于 1 时相比,使用更多的 Kafka brokers 堆大小就会发生所谓的错误。标准解决方案是通过将KAFKA_HEAP_OPTS变量添加到Kafka env 文件或作为操作系统环境变量。例如,添加以下行将堆大小设置为 2 GB:

export KAFKA_HEAP_OPTS="-Xms2G -Xmx2G"

但是在您无法访问 Kafka 代理的情况下(根据您的问题),您可以减少在一次调用 poll() 时返回的记录数,因此代理中对堆内存的需求将减少。(这不是标准解决方案,我建议只是为了消除错误)。

这个答案

Kafka Consumers 通过以下两个参数处理数据积压,

max.poll.interval.ms
使用消费者组管理时调用 poll() 之间的最大延迟。这为消费者在获取更多记录之前可以空闲的时间量设置了上限。如果在此超时到期之前未调用 poll(),则认为消费者失败,组将重新平衡,以便将分区重新分配给另一个成员。默认值为 300000。

max.poll.records
在一次 poll() 调用中返回的最大记录数。默认值为 500。

忽略根据要求设置上述两个参数可能会导致轮询消费者可能无法使用可用资源处理的最大数据,从而导致 OutOfMemory 或有时无法提交消费者偏移量。因此,始终建议使用 max.poll.records 和 max.poll.interval.ms 参数。

因此,对于测试,将max.poll.records的值减小到例如 250 并检查是否会发生错误。

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", BOOTSTRAPSERVERS);
properties.setProperty("group.id", ID);
properties.setProperty("key.deserializer", Serializer);
properties.setProperty("value.deserializer", Deserializer);
properties.setProperty("max.poll.records", "250");

FlinkKafkaConsumer08<String> myConsumer =
    new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);

推荐阅读