首页 > 技术文章 > Java API 本地读取HDFS指定目录下的所有文件失败及任务提交到集群失败问题分析

irvin-chen 2020-12-16 15:09 原文

背景:要读取hdfs上指定路径分区下的所有文件内容(5个文件),并对数据内容进行处理整合成单个文件

 

正确遍历读取hdfs中指定路径下的数据文件样例代码如下:

private List<RIRInfo> getIanaDataFromHdfs(String ianaDataHdfsPath) {
List<RIRInfo> mergeList = new ArrayList<>();
List<RIRInfo> sortList = new ArrayList<>();
if (StringUtils.isNotEmpty(ianaDataHdfsPath)) {
Configuration configuration = new Configuration();
try (FileSystem fileSystem = FileSystem.get(URI.create(ianaDataHdfsPath), configuration)) {
FileStatus[] status = fileSystem.listStatus(new Path(ianaDataHdfsPath));
for (FileStatus file : status) {
try (FSDataInputStream fsDataInputStream = fileSystem.open(file.getPath());
BufferedReader br = new BufferedReader(new InputStreamReader(fsDataInputStream))) {
br.lines().forEach(line -> {
if (line.contains(ConfigConstants.VERTICALLINE)) {
String[] value = line.split("\\|");
if (value.length >= ianaConstants.MINI_FIELD_LENGTH
&& ConfigConstants.IPV4TAG.equals(value[ianaConstants.IPV4TAG_INDEX])
&& !"".equals(value[ianaConstants.COUNTRYCODE_INDEX])) {
long startIp = IpUtil.ip2Long(value[ianaConstants.STARTIP_INDEX]);
long endIp = startIp + Long.parseLong(value[ianaConstants.IPRANGE_INDEX]) - 1;
mergeList.add(new RIRInfo(startIp, endIp, value[ianaConstants.COUNTRYCODE_INDEX]));
}
}
});
} catch (IOException e) {
log.error("rirFile read exception.");
}
}
sortList = mergeList.stream().sorted(Comparator.comparing(RIRInfo::getStartIp)).collect(Collectors.toList());
} catch (IOException e) {
log.error("ianaDataHdfsPath load exception.");
}
}
return sortList;
}

没加红色部分代码,报错信息为加载了错误的文件系统 file:///

分析源码:
FileSystem.get(configuration)) 和FileSystem.get(URI.create(ianaDataHdfsPath), configuration)) 的区别
前者是没有指定URI,故读取了默认的文件系统;

后者指定URI,读取了指定的文件系统

当任务提交到集群上执行时,出现java.io.IOException:Filesystem closed报错,原因为:

任务在集群上执行时,多个datanode在getFileSystem过程中,由于Configuration一样,会得到同一个FileSystem。如果有一个datanode在使用完关闭连接,其它的datanode在访问就会出现上述异常。

解决方案:https://blog.csdn.net/bitcarmanlee/article/details/68488616

 

参考内容:

1、Java API 读取HDFS目录下的所有文件

https://blog.csdn.net/yeweiouyang/article/details/38666063

2、Java API 读取HDFS上单个文件内容

https://blog.csdn.net/alex_81d/article/details/103633658

推荐阅读