
mapr-hadoop 2013-10-11 15:23

HBase data import:

I was following http://blog.csdn.net/hua840812/article/details/7414875, but after copying the code over, running it always failed with:

java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.hbase.io.ImmutableBytesWritable, recieved org.apache.hadoop.io.LongWritable;

The cause: with a raw extends Mapper, your map method never actually overrides the generic one, so Hadoop runs the built-in identity map and passes the LongWritable input key straight through, tripping the ImmutableBytesWritable check. On the current API the mapper must be declared with explicit type parameters, i.e. extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>; you can't simply write extends Mapper.
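To make the failure mode concrete, here is a minimal sketch (RawMapper and TypedMapper are names I made up; the types match the full listing below). With a raw Mapper the inherited signature erases to map(Object, Object, Context), so a method declared with LongWritable/Text parameters is an overload rather than an override, and the framework silently runs the identity map:

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Broken: raw Mapper, so the identity map runs and emits LongWritable keys.
class RawMapper extends Mapper {
    // Adding @Override here would be a compile error -- this is an overload.
    protected void map(LongWritable key, Text value, Context context) {
        // never invoked by the framework
    }
}

// Correct: the type parameters make map() a real override.
class TypedMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
    @Override
    protected void map(LongWritable key, Text value, Context context) {
        // real mapping logic goes here (see the full listing below)
    }
}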

Here is the full code anyway:

The code that generates the HFiles:

package com.bonc.db;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TestHFileToHBase {

    public static class TestHFileToHBaseMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Input lines look like "rowkey|type"; split on the literal pipe.
            String[] values = value.toString().split("\\|");
            byte[] row = values[0].trim().getBytes();
            ImmutableBytesWritable rowkey = new ImmutableBytesWritable(row);
            KeyValue kvProtocol;
            if (values.length > 1) {
                kvProtocol = new KeyValue(row, "url_type".getBytes(), "type".getBytes(),
                        System.currentTimeMillis(), values[1].trim().getBytes());
            } else {
                // No type field on this line; store a literal "NULL" placeholder.
                kvProtocol = new KeyValue(row, "url_type".getBytes(), "type".getBytes(),
                        System.currentTimeMillis(), "NULL".getBytes());
            }
            context.write(rowkey, kvProtocol);
        }

    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conf, "TestHFileToHBase");
        job.setJarByClass(TestHFileToHBase.class);

        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(KeyValue.class);

        job.setMapperClass(TestHFileToHBaseMapper.class);
        job.setReducerClass(KeyValueSortReducer.class);
//        job.setOutputFormatClass(org.apache.hadoop.hbase.mapreduce.HFileOutputFormat.class);
        job.setOutputFormatClass(HFileOutputFormat.class);
        // job.setNumReduceTasks(4);
        // job.setPartitionerClass(org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner.class);

        // HBaseAdmin admin = new HBaseAdmin(conf);
        HTable table = new HTable(conf, "url_rule");

        HFileOutputFormat.configureIncrementalLoad(job, table);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}
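One thing worth knowing about this driver: HFileOutputFormat.configureIncrementalLoad(job, table) already does most of the wiring on its own. As far as I can tell, it looks at the table's regions, installs a TotalOrderPartitioner over the region start keys, sets the reduce-task count to the number of regions, and picks both the reducer (KeyValueSortReducer here, because the map output value is KeyValue) and the output format. So the explicit setReducerClass/setOutputFormatClass calls above are harmless but redundant, and main() could be trimmed to something like this (a sketch, using the same imports as the listing above):

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conf, "TestHFileToHBase");
        job.setJarByClass(TestHFileToHBase.class);
        job.setMapperClass(TestHFileToHBaseMapper.class);
        // configureIncrementalLoad() reads these to choose KeyValueSortReducer.
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);
        // Sets partitioner, reducer, reduce-task count and output format
        // from the current region boundaries of the target table.
        HFileOutputFormat.configureIncrementalLoad(job, new HTable(conf, "url_rule"));
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }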

And the code that bulk-loads the HFiles into the table:

package com.bonc.db;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;

public class TestLoadIncrementalHFileToHBase {

    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        // args[0]: target table name; args[1]: directory of generated HFiles
        byte[] TABLE = Bytes.toBytes(args[0]);
        HTable table = new HTable(conf, TABLE);
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        // Moves (renames) the HFiles into the table's region directories.
        loader.doBulkLoad(new Path(args[1]), table);
    }
}
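As an aside, on the HBase builds I've looked at, LoadIncrementalHFiles also implements Tool and is exposed as the completebulkload command, so the little driver above can probably be reduced to a ToolRunner call. Treat this as a sketch and check your version; note the tool takes the HFile directory first and the table name second:

package com.bonc.db;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.util.ToolRunner;

public class BulkLoadTool {
    public static void main(String[] args) throws Exception {
        // Same as: hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles <hfile-dir> <table>
        // -- the reverse of the argument order TestLoadIncrementalHFileToHBase uses.
        System.exit(ToolRunner.run(
                new LoadIncrementalHFiles(HBaseConfiguration.create()), args));
    }
}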

Tragically, the HFile-to-table load step then failed with:

java.io.IOException: java.io.IOException: Failed rename of maprfs://133.0.76.41:7222/user/hbasetest/url_type/4447706551787973235 to maprfs://133.0.76.41:7222/hbase/url_rule/732e6d3d150caa8bd3d8d228e3d9c9a0/url_type/914168143008836217
        at org.apache.hadoop.hbase.regionserver.StoreFile.rename(StoreFile.java:512)

The solution is supposedly here:

http://answers.mapr.com/questions/685/hbase-and-completebulkload
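If I had to guess at the gist of that thread: doBulkLoad doesn't copy the HFiles, it renames them into HBase's region directories, so the generated files have to live on the same filesystem as HBase and be movable by whatever user the RegionServer runs as. Something like the sketch below might loosen the permissions on the output directory (purely my assumption, not the verified fix from the link; OpenUpHFileDir and its argument are made up for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class OpenUpHFileDir {
    // Hypothetical fix sketch (assumption, not verified): bulk load is a
    // rename, so the RegionServer's user must be able to move the HFiles.
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        FileSystem fs = FileSystem.get(conf);
        Path outputDir = new Path(args[0]); // the HFileOutputFormat output dir
        FsPermission open = new FsPermission((short) 0777);
        fs.setPermission(outputDir, open);
        // HFileOutputFormat writes one subdirectory per column family;
        // rename needs write access on these parent directories.
        for (FileStatus family : fs.listStatus(outputDir)) {
            fs.setPermission(family.getPath(), open);
        }
    }
}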

but I honestly couldn't make sense of it. So I fell back on the most primitive method:

I used split to chop the file into small pieces, and then:

package com.bonc.db;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;

import com.bonc.URLMatch.HBaseMain;

public class URL_HBase {

    public static void main(String[] args) throws IOException {
        // args[0]: absolute path of one of the split input files
        FileReader fr = new FileReader(args[0]);
        HTablePool pool = new HTablePool(HBaseMain.conf, 1000);
        HTable table = (HTable) pool.getTable("url_rule");
        BufferedReader br = new BufferedReader(fr);
        // Read each line exactly once: calling readLine() both in the loop
        // condition and in the body would drop every other record.
        String line;
        while ((line = br.readLine()) != null) {
            String[] s = line.split("\\|");
            if (s.length > 1) {
                Put put = new Put(s[0].trim().getBytes());
                put.add("url_type".getBytes(), "type".getBytes(), s[1].trim().getBytes());
                table.put(put);
            } else {
                // Log the malformed line itself, not the array's toString().
                System.out.println(line);
            }
        }
        br.close();
    }
}
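One more note on this brute-force loader: by default each put() is a separate RPC, which is slow for a big file. On the old client API you can buffer the writes client-side; a minimal sketch, assuming your HTable version still has these methods (BufferedPutExample is just an illustrative name):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;

public class BufferedPutExample {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "url_rule");
        table.setAutoFlush(false);                 // buffer puts client-side
        table.setWriteBufferSize(4 * 1024 * 1024); // flush roughly every 4 MB
        // ... run the same read/split/put loop as in URL_HBase ...
        table.flushCommits();                      // send any puts still buffered
        table.close();
    }
}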

It finally worked. I still hope someone can translate that answer for me and explain what the fix is actually supposed to mean... sigh.

 
