java - 从 HDFS 流式传输文件与将其复制到本地磁盘
问题描述
在我的 Java 应用程序中,我使用的是保存在 HDFS 中的文本文件(大小 ~ 300 MB)。文件的每一行都包含一个字符串和一个以逗号分隔的整数 ID。我正在逐行读取文件并从中创建 Hashmaps(String, ID) 。
该文件如下所示:
String1,Integer1
String2,Integer2
...
现在,我目前正在使用 Apacha Hadoop 配置和 FileSystem 对象直接从 HDFS 读取文件。
Configuration conf = new Configuration();
conf.addResource("core-site.xml"));
conf.addResource("hdfs-site.xml"));
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
path= "<some location in HDFS>"
FileSystem fs = FileSystem.get(URI.create(path), conf);
in = fs.open(new Path(path));
输入流“in”被传递给另一个名为read(InputStream in)的函数来读取文件。
public void init(InputStream is) throws Exception {
ConcurrentMap<String, String> pageToId = new ConcurrentHashMap();
ConcurrentMap<String, String> idToPage = new ConcurrentHashMap();
logger.info("Free memory: " + Runtime.getRuntime().freeMemory());
InputStreamReader stream = new InputStreamReader(is, StandardCharsets.UTF_8);
BufferedReader reader = new BufferedReader(stream);
List<String> pageIdMappingColumns = ServerProperties.getInstance().getIdMappingColumns();
String line;
int line_no=0;
while (true) {
try {
line = reader.readLine();
if (line == null) {
break;
}
line_no++;
//System.out.println("Free memory: " + Runtime.getRuntime().freeMemory());
String[] values = line.split(COMMA);
//System.out.println("Free memory: " + Runtime.getRuntime().freeMemory());
if (values.length < pageIdMappingColumns.size()) {
throw new RuntimeException(PAGEMAPPER_INVALID_MAPPING_FILE_FORMAT);
}
String id = EMPTY_STR;
String page = EMPTY_STR;
for (int i = 0; i < values.length; i++) {
String s = values[i].trim();
if (PAGEID.equals(pageIdMappingColumns.get(i))) {
id = s;
continue;
}
if (PAGENAME.equals(pageIdMappingColumns.get(i))) {
page = s;
}
}
pageToId.put(page, id);
idToPage.put(id, page);
} catch (Exception e) {
logger.error(PAGEMAPPER_INIT + e.toString() + " on line " + line_no);
}
}
logger.info("Free memory: " + Runtime.getRuntime().freeMemory());
logger.info("Total number of lines: " + line_no);
reader.close();
ConcurrentMap<String, String> oldPageToId = pageToIdRef.get();
ConcurrentMap<String, String> oldIdToPage = idToPageRef.get();
idToPage.put(MINUS_1, START);
idToPage.put(MINUS_2, EXIT);
pageToId.put(START, MINUS_1);
pageToId.put(EXIT, MINUS_2);
/* Update the Atomic reference hashmaps in memory in two conditions
1. If there was no map in memory(first iteration)
2. If the number of page-names and page-id pairs in the mappings.txt file are more than the previous iteration
*/
if (oldPageToId == null || oldIdToPage != null && oldIdToPage.size() <= idToPage.size() && oldPageToId.size() <= pageToId.size()) {
idToPageRef.set(idToPage);
pageToIdRef.set(pageToId);
logger.info(PAGEMAPPER_INIT + " " + PAGEMAPPER_UPDATE_MAPPING);
} else {
logger.info(PAGEMAPPER_INIT + " " + PAGEMAPPER_LOG_MSZ);
}
}
像这样完成工作后,我将关闭流:
IOUtils.closeQuietly(is);
我每 1 小时执行一次上述代码,因为在此期间文件在 HDFS 中被更改。所以现在,我得到了 java.lang.OutOfMemoryError: Java heap space。
我的问题是:就内存要求而言,将文件复制到磁盘然后使用它而不是直接从 HDFS 访问它更好吗?
注意:该文件有 > 3200000 行。
解决方案
流始终是选择的方式。
您收到 OutOfMemory 是因为您从不关闭流,因此内存泄漏。
手动关闭您的流或使用 try-with-resource
编辑
pageToId.put(page, id);
idToPage.put(id, page);
您将至少 2 倍的文件大小存储在内存中。大约是 600MB。
之后,将该值分配给某个ref
变量:
idToPageRef.set(idToPage);
pageToIdRef.set(pageToId);
我猜您仍然在ref
某处引用旧数据,因此未发布内部地图数据。
你也有资源泄漏
throw new RuntimeException(PAGEMAPPER_INVALID_MAPPING_FILE_FORMAT);
finally
您应该使用 try-with-resource 或在块中手动关闭您的流。
推荐阅读
- python - Python,“[i]”附近的Sqlite:语法错误错误
- javascript - 这个 JavaScript 表达式是什么意思?
- reactjs - 使用 ReactJS 和 useEffect 单击时无法暂停间隔
- twilio - Twilio 在使用循环重复语音通话时有暂停期
- postgresql - 设置 Kubernetes 集群并运行数据库
- javascript - 我们不能在 React.js 的数组中迭代 material-ui 吗?
- c++ - 如何打印 NXN 框的对角线?检查我的代码?
- reactjs - 使用 useEffect 清理先前组件的 React Redux 双挂载
- python - 为 Django 组中的用户分配权限
- python - 避免 Zipfile.write 中的日期更改