首页 > 解决方案 > 理解 flink jobmanager 内存

问题描述

我有一个 flink 作业,它有一个 nfs 文件系统文件夹作为源,kafka 作为接收器。此时没有进行任何转换。

我使用了 Continuousmonitoring 功能来持续监控文件夹上的事件,并使用 ContinuousFileReaderOperator 来读取数据。

ContinuousFileMonitoringFunction<String> monitoringFunction = new ContinuousFileMonitoringFunction<>(
                inputFormat, FileProcessingMode.PROCESS_CONTINUOUSLY, env.getParallelism(),
                MONITORING_INTERVAL);

ContinuousFileReaderOperator<String> reader = new ContinuousFileReaderOperator<>(inputFormat);

文件夹的初始大小约为 40GB,其中包含 3785468 个文件(在所有子目录中)。

我创建了 1 个具有 25G 堆的作业管理器和 2 个具有 4 个任务槽和以下内存值的任务管理器。

taskmanager.memory.process.size: "26g"
taskmanager.memory.flink.size: "24g"
jobmanager.heap.size: "25g"
taskmanager.memory.jvm-overhead.max: "2g"
taskmanager.memory.task.off-heap.size: "1024M"
taskmanager.memory.task.heap.size: "16g"
taskmanager.memory.managed.fraction: 0.2
taskmanager.memory.network.max: "2g"

当工作开始时,工作经理正在准备工作,准备状态需要很长时间,大约 2 小时。一旦工作开始,它将文件传输到kafka就可以正常工作。

我正在尝试微调这项工作,任何人都可以帮助我了解在准备阶段会发生什么以及在此状态下记忆的哪一部分是重要的?

我正在尝试使用内存参数,但似乎没有任何效果,因为我不知道我无法继续使用什么内存。

我已经阅读了有关内存的 flink 文档,但不清楚托管内存的用途以及处理作业时 DirectMemory 的用途。

https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#memory-configuration

有人可以帮助我了解我应该考虑什么来微调这项工作吗?

标签: javamemory-managementapache-kafkaapache-flinkflink-streaming

解决方案


推荐阅读