首页 > 技术文章 > 记一次Hbase查询速度优化经历

10158wsj 2018-01-08 14:29 原文

项目背景

  在这次影像系统中,我们利用大数据平台做的是文件(图片、视频等)批次的增删改查,每个批次都包含多个文件,上传完成以后要添加文件索引(文件信息及批次信息),由于在Hbase存储的过程中,每个文件都对应一个文件rowKey,一个批次就会有很多个RoweKey,查询的下载的时候就必须根据每个文件的rowkey找到对应的文件,如果一个批次有很多个文件的话,就需要查找很多次,这样是很浪费时间的,一开始没注意这么多,开发并且完成功能测试后,觉得一切OK,但是作为大数据后台,对效率的要求非常高,在压力测试的时候出现了问题,并发量上来之后,查询下载的速度非常慢,TPS总上不去,仔细分析代码后,发现了问题。

改进之前的部分代码如下:

public List<FileInfo> batchGetFileMeta (String systemType, String batchNo,

                         String fileName,String versionNo,BufferedOutputStream bw) {

                 List<FileInfo> fileInfoList = new ArrayList<FileInfo>();
                 FileStoreInfo fileStoreInfo = batchGetFileStoreInfo(systemType,batchNo, versionNo,bw);
                 if(fileStoreInfo == null){
                         return null;
                 }
                 List<String> fileNameList=batchGetFileNameByBathNO(systemType,batchNo, fileName,
                                  versionNo,bw);
                 if(fileNameList == null || fileNameList.size()==0 ){
                         return null;
                 }
                 if(fileNameList.size()==1 && ("".equals(fileNameList.get(0)))){
                         fileInfoList.add(null);
                         return fileInfoList;
                 }
                 int hash = batchNo.hashCode();
                 String rowKey = "";
                 String fileNName = fileStoreInfo.getFile_N_Name();
                 String[] fileNNameArray = fileNName.split(Constants.SPLIT);
                 for(int i=0;i<fileNNameArray.length;i++){
                         for(int j=0;j<fileNameList.size();j++){
                                  String[] fileNInfo = fileNNameArray[i].split(Constants.SPLITF);
                                  if(fileNInfo[0].equals(fileNameList.get(j))){
                                          String version = fileNInfo[1];
                                          String versionNow = version;
                                          if(versionNow != null && !versionNow.equals("")){
                                                  int length2 = versionNow.length();
                                                  for (int k=0 ;k<3-length2 ;k++) {
                                                           versionNow = "0"+versionNow;
                                                  }
                                          }
                                          rowKey =  hash + "1" +batchNo + versionNow + fileNInfo[0];
                                          FileInfo fileInfo = batchGetFileMetaByIndex(systemType, rowKey, bw);
                                          if(fileInfo == null){
                                                  return null;
                                          }
                                          fileInfo.setFileVersionNO(version);
                                          fileInfoList.add(fileInfo);
                                  }
                         }
                 }      
                 return fileInfoList;
         }
public FileInfo batchGetFileMetaByIndex(String systemType, String rowKey,
                         BufferedOutputStream bw) {
                 Map<String,String> fileInfoMaps = new HashMap<String,String>();
                 fileInfoMaps = HbaseUtil.queryBykey(Constants.HBASE_TAB+systemType, rowKey,
                                  Constants.HBASE_FAMILYY_CF1, Constants.HBASE_COLUMN_L);
                 if(fileInfoMaps == null ){
                         return null;
                 }
            String fileInfoStr = fileInfoMaps.get("value");
            FileInfo fileInfo = new FileInfo();
            fileInfo = (FileInfo) Utils.jsonToObj(fileInfoStr,fileInfo);
            if(fileInfo == null){
                     return null;
            }
            String userdefinede = getUserDefinedE(systemType, rowKey);
            fileInfo.setUserDefined(userdefinede);
            return fileInfo;
        }
public Map<String,String > queryBykey(String tableName, String rowKey,String fam, String col) {
                 Map<String, String> result =  new HashMap<String, String>();
                 HTable table=null;
                 try {
                         if(isExistTable(tableName)){
                                  table = new HTable(conf, tableName);
                                  Get scan = new Get(rowKey.getBytes());
                                  Result r = table.get(scan);
                                  byte[] bs = r.getValue(Bytes.toBytes(fam), Bytes.toBytes(col));
                                  String value = Bytes.toString(bs);
                                  result.put("value", value);
                                  table.close();
                                  return result;
                         }else{
                                  return null;
                         }
                 } catch (IOException e) {
                         e.printStackTrace();
                         return null;
                 }

测试结果如下:

  虽然时间比较少,但是远远不能满足效率要求。仔细分析上面代码不难发现:由于业务需要查询数据的时候要校验文件信息,所以代码中出现了循环套循环的情况,如果某批次的文件数量特别多的话那么循环查询的次数的增长不是一个数量级的,相当大的一个数字,问题的原因在于拼接rowkey,然后拿着rowkey去查询,循环多少次就查多少次,虽然Hbase查询速度快,但这样也是在浪费时间,经过思考和研究HbaseAPI的时候发现,Hbase支持rowkey批量查询,思路大概是这样的:

1)  循环文件信息,循环之中得到拼接rowkey的信息

2)  把得到的rowkey放入list中

3)  循环完毕,用List去查Hbase,将得到的信息放入Map返回

4)  获取Map中的信息

下面是改进之后的代码:

改进后:

public List<FileInfo> batchGetFileMetaByBathNo(String systemType, String batchNo, 
            String fileName,String versionNo,BufferedOutputStream bw) {
        List<FileInfo> fileInfoList = new ArrayList<FileInfo>();
        FileStoreInfo fileStoreInfo =batchGetFileStoreInfo(systemType, batchNo, versionNo,bw);
        if(fileStoreInfo == null){
            return null;
        }
        List<String>fileNameList=batchGetFileNameByBathNO(systemType, batchNo, fileName, 
                versionNo,bw);
        if(fileNameList == null || fileNameList.size()==0 ){
            return null;
        }
        if(fileNameList.size()==1 && ("".equals(fileNameList.get(0)))){
            fileInfoList.add(null);
            return fileInfoList;
        }
        String rowKey = "";
        List<String> rowkeylist=new ArrayList<>();
        
        String fileNName = fileStoreInfo.getFile_N_Name();
        String[] fileNNameArray = fileNName.split(Constants.SPLIT);
        for(int i=0;i<fileNNameArray.length;i++){
            for(int j=0;j<fileNameList.size();j++){
                String[] fileNInfo = fileNNameArray[i].split(Constants.SPLITF);
                if(fileNInfo[0].equals(fileNameList.get(j))){
                    String version = fileNInfo[1];
                    String versionNow = version;
                    if(versionNow != null && !versionNow.equals("")){
                        versionNow=PublicMethod.chengeVerNo(versionNow);
                    }
                    rowKey = batchNo.hashCode()+"1"+batchNo+ versionNow + fileNInfo[0];
                    rowkeylist.add(rowKey);
                }
            }
        }    
        fileInfoList = batchGetFileMetaByIndex(systemType, rowkeylist, bw);
        return fileInfoList;
     }


public List<FileInfo> batchGetFileMetaByIndex(String systemType, List<String> rowKey,
            BufferedOutputStream bw) {
        List<FileInfo> fileInfoList=new ArrayList<>();
    List<Map<String,String>>list=HbaseUtil.queryByList(Constants.HBASE_TAB+systemType,
                rowKey);
        if(list.size()==0){
            return null;
        }
        for(Map<String,String> resultMap:list){
            FileInfo fileInfo = new FileInfo();
            String LValue = resultMap.get(Constants.HBASE_COLUMN_L);
            String EValue=resultMap.get(Constants.HBASE_COLUMN_E);
            if(!"".equals(LValue)&&null!=LValue){
                fileInfo = (FileInfo) Utils.jsonToObj(LValue,fileInfo);
            }
            if(!"".equals(EValue)&&null!=EValue&&fileInfo!=null){
                fileInfo.setUserDefined(EValue);
            }
            fileInfoList.add(fileInfo);
        }
        return fileInfoList;
    }

public List<Map<String, String>> queryByList(String tableName,List<String> rowKeyList){
        
        Connection connection=null;
        List<Map<String, String>> list=new ArrayList<>();
        List<Get> getList=new ArrayList<Get>();
        try {
            connection=ConnectionFactory.createConnection(conf);
            Table table=connection.getTable(TableName.valueOf(tableName));
            for(String rowKey:rowKeyList){
                Get get=new Get(Bytes.toBytes(rowKey));
                get.addFamily(Bytes.toBytes(Constants.HBASE_FAMILYY_CF1));
                getList.add(get);
            }
            
            Result[]results=table.get(getList);
            for (Result result:results) {
                Map<String, String> listMap=new HashMap<>();
                for(Cell kv:result.rawCells()){
            if(Bytes.toString(kv.getQualifier()).equals(Constants.HBASE_COLUMN_E)){
                        listMap.put(Constants.HBASE_COLUMN_E, 
                                Bytes.toString(CellUtil.cloneValue(kv)));
                    }else{
                        listMap.put(Constants.HBASE_COLUMN_L, 
                                Bytes.toString(CellUtil.cloneValue(kv)));
                    }
                }
                list.add(listMap);
            }
        } catch (IOException e) {
            e.fillInStackTrace();
        }finally{
            try {
                if(connection!=null&&!connection.isClosed()){
                    connection.close();
                }
            } catch (IOException e) {
                e.fillInStackTrace();
            }
        }
        return list;
    }

  Hbase是支持批量查询的,经过改进之后,从代码中就可以看出,效率提升了很多,我们对10000条数据进行了测试,发现提升的效率非常明显,下面是测试图:

  进过优化后,从时间上,我们可以看到,提升的效率非常明显,这就告诉我们在做项目写代码的时候,不要只局限于功能的实现,还要考虑效率上的可行性,从一开始就要做好铺垫,否则到后期再改是非常麻烦的。

推荐阅读