首页 > 技术文章 > hbase开发实例

learn21cn 2016-12-11 01:11 原文

1、put/checkAndPut

 1 package com.testdata;
 2 
 3 import java.io.IOException;
 4 import org.apache.hadoop.conf.Configuration;
 5 import org.apache.hadoop.hbase.HBaseConfiguration;
 6 import org.apache.hadoop.hbase.client.HConnection;
 7 import org.apache.hadoop.hbase.client.HConnectionManager;
 8 import org.apache.hadoop.hbase.client.HTableInterface;
 9 import org.apache.hadoop.hbase.client.Put;
10 import org.apache.hadoop.hbase.util.Bytes; 
11 
12 public class TestPut {
13 
14     public static void main(String[] args) throws IOException {
15         
16         Configuration conf = HBaseConfiguration.create();
17         conf.set("hbase.zookeeper.quorum","localhost");
18         conf.set("hbase.zookeeper.property.clientPort","2181");
19         HConnection conn = HConnectionManager.createConnection(conf);
20         
21         HTableInterface table = conn.getTable("testdata");        
22         Put testput = new Put(Bytes.toBytes("row1"));
23         testput.add(Bytes.toBytes("cf"),Bytes.toBytes("col1"),Bytes.toBytes("E"));
24         table.put(testput);
25         //使用checkAndPut
26         table.checkAndPut(Bytes.toBytes("row1"), Bytes.toBytes("cf"),Bytes.toBytes("col5"),Bytes.toBytes("E"),testput);
27         table.close();
28         conn.close();
29 
30     }
31 
32 }

使用checkAndPut时,服务端会先对数据进行验证,只有验证通过才会执行写入并返回true,否则不写入并返回false。上面的例子中,向row1中的cf:col1写入数据"E",而验证的是row1中的cf:col5的值是否为"E",注意被验证的列与被写入的列可以不同,相当于给写入加了前提条件。

 2、使用get读取数据

 1 package com.testdata;
 2 
 3 import java.io.IOException;
 4 import java.util.List;
 5 
 6 import org.apache.hadoop.conf.Configuration;
 7 import org.apache.hadoop.hbase.Cell;
 8 import org.apache.hadoop.hbase.CellUtil;
 9 import org.apache.hadoop.hbase.HBaseConfiguration;
10 import org.apache.hadoop.hbase.client.Get;
11 import org.apache.hadoop.hbase.client.HConnection;
12 import org.apache.hadoop.hbase.client.HConnectionManager;
13 import org.apache.hadoop.hbase.client.HTableInterface;
14 import org.apache.hadoop.hbase.client.Result;
15 
16 
17 import org.apache.hadoop.hbase.util.Bytes; 
18 
19 public class TestGet {
20 
21     public static void main(String[] args) throws IOException {
22         Configuration conf = HBaseConfiguration.create();
23         conf.set("hbase.zookeeper.quorum","localhost");
24         conf.set("hbase.zookeeper.property.clientPort","2181");
25         HConnection conn = HConnectionManager.createConnection(conf);        
26         HTableInterface table = conn.getTable("testdata");
27         
28         Get testget = new Get(Bytes.toBytes("row1"));
29         Result row1 = table.get(testget);
30         String value = new String(row1.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col1")));
31         System.out.println(value);        
32         //下面限定到具体的列
33         testget.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col2"));
34         Result result = table.get(testget);
35         if(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col2")) != null){
36             String value2 = new String(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col2")));
37             System.out.println(value2);
38         }
39         //另外一种读取方式
40         List<Cell> cells = row1.listCells();
41         for(Cell cell : cells){
42             String rowkey = new String(CellUtil.cloneRow(cell));
43             String family = new String(CellUtil.cloneFamily(cell));
44             String collumn = new String(CellUtil.cloneQualifier(cell));
45             String cvalue = new String(CellUtil.cloneValue(cell));
46             System.out.println("rowkey:" + rowkey + " family:" + family + " column:" + collumn +" value:" + cvalue);
47             
48         }
49 
50         //注意要关闭
51         table.close();
52         conn.close();
53 
54     }
55 
56 }

 参考结果:

3、使用scan获取数据

 1 package com.testdata;
 2 
 3 import java.io.IOException;
 4 import java.util.List;
 5 import org.apache.hadoop.conf.Configuration;
 6 import org.apache.hadoop.hbase.Cell;
 7 import org.apache.hadoop.hbase.CellUtil;
 8 import org.apache.hadoop.hbase.HBaseConfiguration;
 9 import org.apache.hadoop.hbase.client.HConnection;
10 import org.apache.hadoop.hbase.client.HConnectionManager;
11 import org.apache.hadoop.hbase.client.HTableInterface;
12 import org.apache.hadoop.hbase.client.Result;
13 import org.apache.hadoop.hbase.client.ResultScanner;
14 import org.apache.hadoop.hbase.client.Scan; 
15 
16 public class TestScan {
17 
18     public static void main(String[] args) throws IOException {
19         Configuration conf = HBaseConfiguration.create();
20         conf.set("hbase.zookeeper.quorum","localhost");
21         conf.set("hbase.zookeeper.property.clientPort","2181");
22         HConnection conn = HConnectionManager.createConnection(conf);        
23         HTableInterface table = conn.getTable("testdata");
24         
25         Scan testscan =new Scan();
26         ResultScanner rs = table.getScanner(testscan);
27         for(Result r : rs ){
28             List<Cell> cells = r.listCells();
29             for(Cell cell : cells){
30                 String rowkey = new String(CellUtil.cloneRow(cell));
31                 String family = new String(CellUtil.cloneFamily(cell));
32                 String collumn = new String(CellUtil.cloneQualifier(cell));
33                 String cvalue = new String(CellUtil.cloneValue(cell));
34                 System.out.println("rowkey:" + rowkey + " family:" + family + " column:" + collumn +" value:" + cvalue);
35                 
36             }
37         }
38         rs.close();
39         table.close();
40         conn.close();
41 
42     }
43 
44 }

4、delete/checkAndDelete

 1 package com.testdata;
 2 
 3 import org.apache.hadoop.conf.Configuration;
 4 import org.apache.hadoop.hbase.HBaseConfiguration;
 5 import org.apache.hadoop.hbase.client.Delete;
 6 import org.apache.hadoop.hbase.client.HConnection;
 7 import org.apache.hadoop.hbase.client.HConnectionManager;
 8 import org.apache.hadoop.hbase.client.HTableInterface;
 9 import java.io.IOException;
10 
11 import org.apache.hadoop.hbase.util.Bytes; 
12 
13 public class TestDelete {
14 
15     public static void main(String[] args) throws IOException {
16         Configuration conf = HBaseConfiguration.create();
17         conf.set("hbase.zookeeper.quorum","localhost");
18         conf.set("hbase.zookeeper.property.clientPort","2181");
19         HConnection conn = HConnectionManager.createConnection(conf);        
20         HTableInterface table = conn.getTable("testdata");
21         
22         Delete testdelete = new Delete(Bytes.toBytes("row1"));
23         testdelete.deleteColumns(Bytes.toBytes("cf"), Bytes.toBytes("col1"));
24         //区别只是checkAndDelete需要进行验证,相当于加了前提条件
25         //table.delete(testdelete);
26         table.checkAndDelete(Bytes.toBytes("row1"), Bytes.toBytes("cf"), Bytes.toBytes("col2"),Bytes.toBytes("BC"), testdelete);
27         table.close();
28         conn.close();
29 
30     }
31 
32 }

5、append

 1 package com.testdata;
 2 
 3 import java.io.IOException;
 4 import org.apache.hadoop.conf.Configuration;
 5 import org.apache.hadoop.hbase.HBaseConfiguration;
 6 import org.apache.hadoop.hbase.client.Append;
 7 import org.apache.hadoop.hbase.client.HConnection;
 8 import org.apache.hadoop.hbase.client.HConnectionManager;
 9 import org.apache.hadoop.hbase.client.HTableInterface;
10 import org.apache.hadoop.hbase.util.Bytes; 
11 
12 public class TestAppend {
13 
14     public static void main(String[] args) throws IOException {
15         Configuration conf = HBaseConfiguration.create();
16         conf.set("hbase.zookeeper.quorum","localhost");
17         conf.set("hbase.zookeeper.property.clientPort","2181");
18         HConnection conn = HConnectionManager.createConnection(conf);        
19         HTableInterface table = conn.getTable("testdata");
20         
21         Append testappend = new Append(Bytes.toBytes("row1")); 
22         testappend.add(Bytes.toBytes("cf"),Bytes.toBytes("col1"),Bytes.toBytes("F"));
23         table.append(testappend);
24         table.close();
25         conn.close();
26     }
27 
28 }

下面是结果,注意append是在原有的值之上附加,先前的值为"E",现在变为"EF"

 6、计数器

计数器可以用于统计用户数,点击量等信息

 1 package com.testdata;
 2 
 3 import java.io.IOException;
 4 import org.apache.hadoop.conf.Configuration;
 5 import org.apache.hadoop.hbase.HBaseConfiguration;
 6 import org.apache.hadoop.hbase.client.HConnection;
 7 import org.apache.hadoop.hbase.client.HConnectionManager;
 8 import org.apache.hadoop.hbase.client.HTableInterface;
 9 import org.apache.hadoop.hbase.util.Bytes;
10 
11 public class TestIncrement {
12 
13     public static void main(String[] args) throws IOException {
14         Configuration conf = HBaseConfiguration.create();
15         conf.set("hbase.zookeeper.quorum","localhost");
16         conf.set("hbase.zookeeper.property.clientPort","2181");
17         HConnection conn = HConnectionManager.createConnection(conf);        
18         HTableInterface table = conn.getTable("testdata");
19         
20         long result = table.incrementColumnValue(Bytes.toBytes("row1"),Bytes.toBytes("cf"),Bytes.toBytes("coli"), 10);
21         
22         System.out.println(result);
23         table.close();
24         conn.close();
25 
26     }
27 
28 }

注意 long result = table.incrementColumnValue(Bytes.toBytes("row1"),Bytes.toBytes("cf"),Bytes.toBytes("coli"), 10);

最后一个参数是增量:可以为0,意味着只读取当前值而不修改;也可以是负数,表示递减。

在HBase shell中,可以使用get_counter获取对应计数器的值,也可以使用以下命令进行操作

 incr '<table>', '<row>', '<column>', |<increment-value>|

7、filter

使用时注意性能

 1 package com.testdata;
 2 
 3 import java.io.IOException;
 4 import java.util.List;
 5 import org.apache.hadoop.conf.Configuration;
 6 import org.apache.hadoop.hbase.Cell;
 7 import org.apache.hadoop.hbase.CellUtil;
 8 import org.apache.hadoop.hbase.HBaseConfiguration;
 9 import org.apache.hadoop.hbase.client.HConnection;
10 import org.apache.hadoop.hbase.client.HConnectionManager;
11 import org.apache.hadoop.hbase.client.HTableInterface;
12 import org.apache.hadoop.hbase.client.Result;
13 import org.apache.hadoop.hbase.client.ResultScanner;
14 import org.apache.hadoop.hbase.client.Scan;
15 import org.apache.hadoop.hbase.filter.BinaryComparator;
16 import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
17 import org.apache.hadoop.hbase.filter.CompareFilter;
18 import org.apache.hadoop.hbase.filter.Filter;
19 import org.apache.hadoop.hbase.filter.QualifierFilter;
20 import org.apache.hadoop.hbase.filter.RowFilter;
21 import org.apache.hadoop.hbase.filter.SubstringComparator;
22 import org.apache.hadoop.hbase.util.Bytes; 
23 
24 public class TestSimplefilter {
25 
26     public static void main(String[] args) throws IOException {
27         Configuration conf = HBaseConfiguration.create();
28         conf.set("hbase.zookeeper.quorum","localhost");
29         conf.set("hbase.zookeeper.property.clientPort","2181");
30         HConnection conn = HConnectionManager.createConnection(conf);        
31         HTableInterface table = conn.getTable("testdata");
32         
33         Scan sc = new Scan();
34         sc.setCacheBlocks(false);
35         //行过滤器,判断"row1"与行的key是否相等
36         //Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL,new BinaryComparator(Bytes.toBytes("row1")));
37         //是否以"row"为前缀
38         //Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL,new BinaryPrefixComparator(Bytes.toBytes("row")));
39         //是否包含"row"
40         //Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL,new SubstringComparator("row"));
41         
42         //列过滤器,与行类似
43         Filter filter = new QualifierFilter(CompareFilter.CompareOp.EQUAL,new BinaryComparator(Bytes.toBytes("col1")));
44         
45         sc.setFilter(filter);
46         
47         ResultScanner rs = table.getScanner(sc);
48         for(Result r : rs ){
49             List<Cell> cells = r.listCells();
50             for(Cell cell : cells){
51                 String rowkey = new String(CellUtil.cloneRow(cell));
52                 String family = new String(CellUtil.cloneFamily(cell));
53                 String collumn = new String(CellUtil.cloneQualifier(cell));
54                 String cvalue = new String(CellUtil.cloneValue(cell));
55                 System.out.println("rowkey:" + rowkey + " family:" + family + " column:" + collumn +" value:" + cvalue);                
56             }
57         }
58         rs.close();
59         table.close();
60         conn.close();            
61     }
62 }

使用filterlist

 1 package com.testdata;
 2 
 3 import java.io.IOException;
 4 import java.util.ArrayList;
 5 import java.util.List;
 6 import org.apache.hadoop.conf.Configuration;
 7 import org.apache.hadoop.hbase.Cell;
 8 import org.apache.hadoop.hbase.CellUtil;
 9 import org.apache.hadoop.hbase.HBaseConfiguration;
10 import org.apache.hadoop.hbase.client.HConnection;
11 import org.apache.hadoop.hbase.client.HConnectionManager;
12 import org.apache.hadoop.hbase.client.HTableInterface;
13 import org.apache.hadoop.hbase.client.Result;
14 import org.apache.hadoop.hbase.client.ResultScanner;
15 import org.apache.hadoop.hbase.client.Scan;
16 import org.apache.hadoop.hbase.filter.BinaryComparator;
17 import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
18 import org.apache.hadoop.hbase.filter.CompareFilter;
19 import org.apache.hadoop.hbase.filter.Filter;
20 import org.apache.hadoop.hbase.filter.FilterList;
21 import org.apache.hadoop.hbase.filter.QualifierFilter;
22 import org.apache.hadoop.hbase.filter.RegexStringComparator;
23 import org.apache.hadoop.hbase.filter.RowFilter;
24 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
25 import org.apache.hadoop.hbase.filter.SubstringComparator;
26 import org.apache.hadoop.hbase.util.Bytes; 
27 
28 public class TestFilterList {
29 
30     public static void main(String[] args) throws IOException {
31         Configuration conf = HBaseConfiguration.create();
32         conf.set("hbase.zookeeper.quorum","localhost");
33         conf.set("hbase.zookeeper.property.clientPort","2181");
34         HConnection conn = HConnectionManager.createConnection(conf);        
35         HTableInterface table = conn.getTable("testdata");
36         
37         Scan sc = new Scan();
38         Filter filter1 = new RowFilter(CompareFilter.CompareOp.EQUAL,new BinaryComparator(Bytes.toBytes("row2")));
39         SingleColumnValueFilter filter2 = new SingleColumnValueFilter(Bytes.toBytes("cf"),Bytes.toBytes("col1"),CompareFilter.CompareOp.EQUAL,new BinaryPrefixComparator(Bytes.toBytes("B")));
40         
41         SingleColumnValueFilter filter3 = new SingleColumnValueFilter(Bytes.toBytes("cf"),Bytes.toBytes("col1"),CompareFilter.CompareOp.EQUAL,new RegexStringComparator("B|C"));
42         filter2.setFilterIfMissing(true);
43         filter3.setFilterIfMissing(true);
44         
45         //List<Filter> filters = new ArrayList<Filter>();
46         //filters.add(filter1);
47         //filters.add(filter2);        
48         //FilterList filterlist = new FilterList(filters);
49         
50         //也可以这样写,MUST_PASS_ALL标识满足所有的filter,当然也可以使用MUST_PASS_ONE,标识只需要满足一个
51         FilterList filterlist = new FilterList(FilterList.Operator.MUST_PASS_ALL);
52         filterlist.addFilter(filter1);
53         filterlist.addFilter(filter2);
54         filterlist.addFilter(filter3);        
55         sc.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("col1"));
56         sc.setFilter(filterlist);
57         
58         ResultScanner rs = table.getScanner(sc);
59         for(Result r : rs ){
60             List<Cell> cells = r.listCells();
61             for(Cell cell : cells){
62                 String rowkey = new String(CellUtil.cloneRow(cell));
63                 String family = new String(CellUtil.cloneFamily(cell));
64                 String collumn = new String(CellUtil.cloneQualifier(cell));
65                 String cvalue = new String(CellUtil.cloneValue(cell));
66                 System.out.println("rowkey:" + rowkey + " family:" + family + " column:" + collumn +" value:" + cvalue);                
67             }
68         }
69         rs.close();
70         table.close();
71         conn.close();
72     }
73 }

以上一组filter标识了这样的条件:行的key必须为"row2";cf:col1的值必须以"B"开头(filter2),并且能匹配正则"B|C"(filter3);再配合sc.addColumn,只返回cf:col1这一列。对于示例数据,满足条件的值就是"B"。

结果参考:

如果没有 sc.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("col1"));这一句,结果会是下面的样子

rowkey:row2 family:cf column:col1 value:B
rowkey:row2 family:cf column:colb value:U

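问题与 SingleColumnValueFilter filter2 = new SingleColumnValueFilter(Bytes.toBytes("cf"),Bytes.toBytes("col1"),CompareFilter.CompareOp.EQUAL,new BinaryPrefixComparator(Bytes.toBytes("B")));这一句有关,但原因并不在比较器上:SingleColumnValueFilter只是根据指定列(这里是cf:col1)的值来决定整行是否保留,一旦该行通过检查,就会返回这一行中被扫描到的所有列,因此colb也被一并返回。(如果直接打印Bytes.toBytes("B")和Bytes.toBytes("U"),会发现输出都是以"[B"开头的,那只是Java中byte数组默认的toString()结果,与实际内容无关。)所以即使换成BinaryComparator,也不会解决问题。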

这里是值得注意的地方,搜索网络可以发现一样的结论,使用时务必使用sc.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("col1"))类似的语句。

rowkey:row2 family:cf column:col1 value:B

 

推荐阅读