首页 > 技术文章 > Storm HBase 集成

sunspeedzy 2017-09-04 15:59 原文

原文链接:http://storm.apache.org/releases/1.1.0/storm-hbase.html

Storm/Trident 和 Apache HBase 的集成

用法

和HBase集成的重要API是org.apache.storm.hbase.bolt.mapper.HBaseMapper接口

public interface HBaseMapper extends Serializable {
    byte[] rowKey(Tuple tuple);

    ColumnList columns(Tuple tuple);
}

rowKey()方法是简单明了的:输入一个Storm tuple,返回代表rowkey的字节数组。

columns()方法定义了要写入HBase行的内容。ColumnList类允许你添加 standard HBase columns和HBase counter columns(这两种列是什么???)

要添加一个standard column,使用addColumn()方法:

ColumnList cols = new ColumnList();
cols.addColumn(this.columnFamily, field.getBytes(), toBytes(tuple.getValueByField(field)));

要添加一个counter column,使用addCounter()方法:

ColumnList cols = new ColumnList();
cols.addCounter(this.columnFamily, field.getBytes(), toLong(tuple.getValueByField(field)));

当远程HBase启用了安全认证,一个Kerberos keytab和相应的principle名需要被提供给storm-hbase连接器。

特别地,传递给Topology的Config对象应该包含{(“storm.keytab.file”, “$keytab”), ("storm.kerberos.principal", “$principal”)}。例如:

Config config = new Config();
...
config.put("storm.keytab.file", "$keytab");
config.put("storm.kerberos.principal", "$principle");
StormSubmitter.submitTopology("$topologyName", config, builder.createTopology());

使用授权的token与启用安全认证的 HBase 集成

如果你的Topology要和启用安全认证的HBase交互,你的bolts/states需要被HBase认证。解决办法是需要所有潜在的worker主机拥有"sotrm.keytab.file"。

如果你在一个集群上有多个Topology,每一个都使用不同的hbase用户,你必须创建多个keytab并把它分发到所有worker主机。

以上做法的替代方法是:

管理员可以配置nimbus能自动地代表提交Topology的用户获取授权token。这样,nimbus需要以以下配置启动:

nimbus.autocredential.plugins.classes : ["org.apache.storm.hbase.security.AutoHBase"] 
nimbus.credential.renewers.classes : ["org.apache.storm.hbase.security.AutoHBase"]
hbase.keytab.file: "/path/to/keytab/on/nimbus" (This is the keytab of hbase super user that can impersonate other users.) 
hbase.kerberos.principal: "superuser@EXAMPLE.com" 
nimbus.credential.renewers.freq.secs : 518400 (6 days, hbase tokens by default expire every 7 days and can not be renewed, 
if you have custom settings for hbase.auth.token.max.lifetime in hbase-site.xml than you should ensure this value is atleast 1 hour less then that.)

你的Topology配置应该包括:topology.auto-credentials :["org.apache.storm.hbase.security.AutoHBase"]

如果nimbus没有以上配置,你需要添加以上配置然后重启HBase。

确保hbase配置文件(core-site.xml,hdfs-site.xml 和 hbase-site.xml)和包含storm-hbase jar的依赖文件都在nimbus的classpath里。

Nimbus会使用在配置里指定的keytab和principal进行HBase认证。在每一个Topology提交时,nimbus都要模拟提交的Topology用户,并且代表提交的Topology用户获取授权token。

如果Topology以 topology.auto-credentials :["org.apache.storm.hbase.security.AutoHBase"] 这样的配置启动,nimbus会为了你的Topology把授权token推给所有的worker,

连接hbase的 bolt/state 会以这些token进行认证。

当nimbus模拟提交的Topology用户的用户时,你需要确保在 storm.kerberos.principal 中指定的user拥有代表其他user获取token的权限。

要达到这个要求,你需要按照列在这个link中的配置方向进行配置 http://hbase.apache.org/book/security.html#security.rest.gateway

你可以阅读关于建立安全认证的HBase的内容 http://hbase.apache.org/book/security.html

SimpleHBaseMapper

storm-hbase有一个含有通用目的HBaseMapper的实现——SimpleHBaseMapper,它可以把Storm tuple映射到标准HBase column和counter column

要使用SimpleHBaseMapper,你需要告诉它哪些field要被映射成什么类型的column。

以下代码创建了一个SimpleHBaseMapper实例:

1. 使用 word tuple值作为一个 row key

2. 为tuple域word 添加一个标准的HBase列

3. 为tuple域count 添加一个HBase counter列

4. 把值写到 cf 列族中

SimpleHBaseMapper mapper = new SimpleHBaseMapper() 
        .withRowKeyField("word")
        .withColumnFields(new Fields("word"))
        .withCounterFields(new Fields("count"))
        .withColumnFamily("cf");

HBaseBolt

要使用HBaseBolt,需要提供 输出的表的名字 和 一个HBaseMapper的实现 进行构造

HBaseBolt hbase = new HBaseBolt("WordCount", mapper);

HBaseBolt会授权 mapper实例 明确如何把tuple数据持久化到HBase中。

HBaseValueMapper

这个类运行你把 HBase的查询结果转化成storm Values实例,这个Values实例将会被HBaseLookupBolt发射出去。

public interface HBaseValueMapper extends Serializable {
    public List<Values> toTuples(Result result) throws Exception;
    void declareOutputFields(OutputFieldsDeclarer declarer);
}

toTuples方法接收一个 HBase Result,输出一个包含Values的List。每一个由这个方法返回的值都会被HBaseLookupBolt发射。

declareOutputFields应该被用来声明HBaseLookupBolt的输出域 outputFields。

在 src/test/java 路径下有example

HBaseProjectionCriteria

这个类允许你指定HBase Get方法的投影条件。

这是lookupBolt可选的参数,如果你不指定这个实例的话,所有的列都会由HBaseLookupBolt返回。

public class HBaseProjectionCriteria implements Serializable {
    public HBaseProjectionCriteria addColumnFamily(String columnFamily);
    public HBaseProjectionCriteria addColumn(ColumnMetaData column);
}

addColumnFamily 接收列族 columnFamily。指定这个参数意味着这个列族的所有列都将被包含在投影里

addColumn 接收一个 ColumnMetaData实例。指定这个参数意味着只有这个列族的这个列会成为投影的一部分。

以下代码会创建一个HBaseProjectionCriteria实例用来指定投影的条件:

1. 包含列族 cf 的count column

2. 包含列族 cf2 的所有列

HBaseProjectionCriteria projectionCriteria = new HBaseProjectionCriteria()
    .addColumn(new HBaseProjectionCriteria.ColumnMetaData("cf", "count"))
    .addColumnFamily("cf2");

HBaseLookupBolt

要使用HBaseLookupBolt,通过 要输出到的表名、一个HBaseMapper的实例、一个HBaseRowToStormValueMapper的实例去构建。

你可以指定或不指定HBaseProjectionCriteria。

HBaseLookupBolt会使用HBaseMapper的实例mapper获取用来查询的rowKey,

会使用HBaseProjectionCriteria的实例指定哪些列会被包含在结果中,

会使用HBaseRowToStormValueMapper的实例获取要被bolt发送的value。

你可以参照 在 src/test/java下的example:topology LookupWordCount.java

示例: 持久化 Word Count

在src/test/java下有一个可以运行的例子

Setup

以下步骤假定你正在本地运行HBase,或在classpath中有hbase-site.xml指出你的HBase集群

使用 hbase shell命令创建一个schema:

> create 'WordCount', 'cf'

执行

运行org.apache.storm.hbase.topology.PersistenWordCount类(它会运行这个Topology 10秒,然后退出)

在这个Topology运行过程中或运行之后,运行org.apache.storm.hbase.topology.WordCountClient类去查看存储在HBase的计数值。你会看到以下类似的内容:

Word: 'apple', Count: 6867
Word: 'orange', Count: 6645
Word: 'pineapple', Count: 6954
Word: 'banana', Count: 6787
Word: 'watermelon', Count: 6806

为了引用,列出这个Topology示例

public class PersistentWordCount {
    private static final String WORD_SPOUT = "WORD_SPOUT";
    private static final String COUNT_BOLT = "COUNT_BOLT";
    private static final String HBASE_BOLT = "HBASE_BOLT";


    public static void main(String[] args) throws Exception {
        Config config = new Config();

        WordSpout spout = new WordSpout();
        WordCounter bolt = new WordCounter();

        SimpleHBaseMapper mapper = new SimpleHBaseMapper()
                .withRowKeyField("word")
                .withColumnFields(new Fields("word"))
                .withCounterFields(new Fields("count"))
                .withColumnFamily("cf");

        HBaseBolt hbase = new HBaseBolt("WordCount", mapper);


        // wordSpout ==> countBolt ==> HBaseBolt
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout(WORD_SPOUT, spout, 1);
        builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
        builder.setBolt(HBASE_BOLT, hbase, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));


        if (args.length == 0) {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", config, builder.createTopology());
            Thread.sleep(10000);
            cluster.killTopology("test");
            cluster.shutdown();
            System.exit(0);
        } else {
            config.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        }
    }
}

 

推荐阅读