Notes on a Storm-to-HBase Write Performance Problem

zhengchunhao 2016-07-08 15:31

1. Cluster Storm version:

   Output of the storm version command:

Storm 0.10.0.2.3.0.0-2557
URL git@github.com:hortonworks/storm.git -r 38fad7c05bd00ac4ca61b68abf7411d9abc6189c
Branch (no branch)
Compiled by jenkins on 2015-07-14T14:45Z
From source with checksum 43c4b3baaad6a0bca88145356d46327

 

Local Storm version: apache-storm-0.10.1. Note that this does not match the cluster version.

     storm-hbase jar version:

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-hbase</artifactId>
        <version>0.10.0</version>
    </dependency>

HBase version:

[root@node3 tmp]# hbase version
2016-07-08 14:02:14,137 INFO  [main] util.VersionInfo: HBase 1.1.1.2.3.0.0-2557
2016-07-08 14:02:14,140 INFO  [main] util.VersionInfo: Source code repository git://ip-10-0-0-89.ec2.internal/grid/0/jenkins/workspace/HDP-dal-centos6/bigtop/build/hbase/rpm/BUILD/hbase-1.1.1.2.3.0.0 revision=6a55f21850cfccf19fa651b9e2c74c7f99bbd4f9
2016-07-08 14:02:14,140 INFO  [main] util.VersionInfo: Compiled by jenkins on Tue Jul 14 09:41:13 EDT 2015
2016-07-08 14:02:14,140 INFO  [main] util.VersionInfo: From source with checksum 8f076e3255b10e166a73c2436c2b1706

 

2. Writing data to HBase was tested in local mode. The topology definition is shown below; it uses the HBaseBolt class that ships with storm-hbase (a sketch of the MyHBaseMapper it references follows the code).

public class PersistTopology {

    private static final String KAFKA_SPOUT = "KAFKA_SPOUT";
    private static final String HBASE_BOLT = "HBASE_BOLT";
    
    public static void main(String[] args) throws Exception {
        
        /* define spout */
        KafkaSpout kafkaSpout = new KafkaSpout();
 
        System.setProperty("hadoop.home.dir", "E:\\eclipse\\");
        
        /* define HBASE Bolt */
        HBaseMapper mapper = new MyHBaseMapper();
        HBaseBolt hbaseBolt = new HBaseBolt("testhbasebolt", mapper).withConfigKey("hbase.conf");
        
        /* define topology*/
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(KAFKA_SPOUT, kafkaSpout);
        builder.setBolt(HBASE_BOLT, hbaseBolt, 2).shuffleGrouping(KAFKA_SPOUT);

        Config conf = new Config();
        conf.setDebug(true);

        Map<String, Object> hbConf = new HashMap<String, Object>();
        conf.put("hbase.conf", hbConf);

        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);

            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {

            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, builder.createTopology());
            Utils.sleep(60000000);
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }
}
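
The MyHBaseMapper implementation is not shown in the original post. A minimal sketch of what such an HBaseMapper might look like with storm-hbase 0.10.0 (the tuple fields "id" and "value" and the column family "cf" are assumptions, not taken from the original code):

import backtype.storm.tuple.Tuple;
import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
import org.apache.storm.hbase.common.ColumnList;

public class MyHBaseMapper implements HBaseMapper {

    @Override
    public byte[] rowKey(Tuple tuple) {
        // Assumed tuple field "id", used as the HBase row key.
        return tuple.getStringByField("id").getBytes();
    }

    @Override
    public ColumnList columns(Tuple tuple) {
        // Assumed column family "cf" and qualifier "value", filled from tuple field "value".
        ColumnList cols = new ColumnList();
        cols.addColumn("cf".getBytes(), "value".getBytes(),
                tuple.getStringByField("value").getBytes());
        return cols;
    }
}

HBaseBolt calls rowKey() and columns() on the mapper for every incoming tuple to build the mutations it writes to the table.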

Test result: just over 5.2 million records took several hours to write, an average of only a few hundred records per second (5.2 million rows over the five-plus hours mentioned below works out to under 300 rows/s).

Root cause: reading the HBaseBolt source shows that this version does not actually batch writes. As shown below, execute() is called for every tuple received and the tuple is immediately sent via batchMutate, so each tuple costs its own round trip to HBase.

    @Override
    public void execute(Tuple tuple) {
        byte[] rowKey = this.mapper.rowKey(tuple);
        ColumnList cols = this.mapper.columns(tuple);
        List<Mutation> mutations = hBaseClient.constructMutationReq(rowKey, cols, writeToWAL? Durability.SYNC_WAL : Durability.SKIP_WAL);

        try {
            this.hBaseClient.batchMutate(mutations);
        } catch(Exception e){
            this.collector.reportError(e);
            this.collector.fail(tuple);
            return;
        }

        this.collector.ack(tuple);
    }

3. Downloading the apache-storm-1.0.1 source shows that the execute() implementation has become a true batched send:

    @Override
    public void execute(Tuple tuple) {
        boolean flush = false;
        try {
            if (TupleUtils.isTick(tuple)) {
                LOG.debug("TICK received! current batch status [{}/{}]", tupleBatch.size(), batchSize);
                collector.ack(tuple);
                flush = true;
            } else {
                byte[] rowKey = this.mapper.rowKey(tuple);
                ColumnList cols = this.mapper.columns(tuple);
                List<Mutation> mutations = hBaseClient.constructMutationReq(rowKey, cols, writeToWAL? Durability.SYNC_WAL : Durability.SKIP_WAL);
                batchMutations.addAll(mutations);
                tupleBatch.add(tuple);
                if (tupleBatch.size() >= batchSize) {
                    flush = true;
                }
            }

            if (flush && !tupleBatch.isEmpty()) {
                this.hBaseClient.batchMutate(batchMutations);
                LOG.debug("acknowledging tuples after batchMutate");
                for(Tuple t : tupleBatch) {
                    collector.ack(t);
                }
                tupleBatch.clear();
                batchMutations.clear();
            }
        } catch(Exception e){
            this.collector.reportError(e);
            for (Tuple t : tupleBatch) {
                collector.fail(t);
            }
            tupleBatch.clear();
            batchMutations.clear();
        }
    }
batchSize is configurable: once the number of buffered tuples reaches this value, they are written to HBase in a single batch. The if (TupleUtils.isTick(tuple)) branch is a timer-like mechanism: the bolt periodically receives a tick tuple, which guarantees that buffered data is still flushed in time even when fewer than batchSize tuples have accumulated. Note also that acks are deferred until batchMutate succeeds, so a failed batch fails every buffered tuple and lets the spout replay them. The tick interval can be set either in code or in storm.yaml. In code:
        conf.put("hbase.conf", hbConf);
        
        //conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE,             8);
//        conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE,            32);
//        conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
//        conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE,    16384);
          conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,    1);

Via the configuration file it is set as follows; the code setting presumably takes precedence over the file (not yet verified):

[root@node1 conf]# more storm.yaml |grep tuple
topology.tick.tuple.freq.secs : 1
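
The post does not show how batchSize itself is chosen. In the upstream 1.x storm-hbase, HBaseBolt exposes builder-style setters for the batch size and flush interval; if a back-ported bolt keeps the same pattern, wiring it up would look roughly like this (withBatchSize and withFlushIntervalSecs are the 1.x method names and are only an assumption for a patched 0.10 bolt; the value 2000 is just an example):

        // Sketch only: assumes the patched bolt exposes the 1.x-style setters.
        HBaseBolt hbaseBolt = new HBaseBolt("testhbasebolt", mapper)
                .withConfigKey("hbase.conf")
                .withBatchSize(2000)          // flush once 2000 tuples are buffered
                .withFlushIntervalSecs(1);    // and at least once per second via tick tuples

As far as I can tell from the 1.0.1 source, withFlushIntervalSecs registers the tick frequency in the bolt's own component configuration, so the topology-level topology.tick.tuple.freq.secs setting above would not be strictly required there.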

Instead of upgrading Storm, I copied the optimized code from the newer version into the bolt shipped with the current version and tested on the cluster (the supporting fields this requires are sketched below).
Test result: with the same volume of data there was a huge improvement; data that previously took more than five hours to write finished in roughly twenty minutes.
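
For reference, the copied execute() also relies on state that the 0.10 bolt does not have. A minimal sketch of the supporting additions inside the patched HBaseBolt, using the field names from the 1.0.1 code (the default of 15000 matches the 1.0.1 source; the prepare() override is an assumption about how the patch was wired):

    // Additions inside the existing org.apache.storm.hbase.bolt.HBaseBolt (0.10) class;
    // extra imports needed include java.util.LinkedList and java.util.List.
    int batchSize = 15000;                 // 1.0.1 default; tune as needed
    List<Mutation> batchMutations;         // mutations accumulated since the last flush
    List<Tuple> tupleBatch;                // tuples to ack/fail together with that flush

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
        super.prepare(map, topologyContext, collector);
        this.batchMutations = new LinkedList<Mutation>();
        this.tupleBatch = new LinkedList<Tuple>();
    }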

 
