首页 > 技术文章 > Storm定时和批处理

thinkpad 2016-08-27 18:20 原文

  1 package com.storm.hbaseTest;
  2 
  3 import java.io.IOException;
  4 import java.util.HashMap;
  5 import java.util.Iterator;
  6 import java.util.List;
  7 import java.util.Map;
  8 
  9 import org.apache.commons.lang.StringUtils;
 10 import org.apache.hadoop.conf.Configuration;
 11 import org.apache.hadoop.hbase.HBaseConfiguration;
 12 import org.apache.hadoop.hbase.client.HConnection;
 13 import org.apache.hadoop.hbase.client.HConnectionManager;
 14 import org.apache.hadoop.hbase.client.HTableInterface;
 15 import org.apache.hadoop.hbase.client.Put;
 16 
 17 import backtype.storm.Config;
 18 import backtype.storm.Constants;
 19 import backtype.storm.task.OutputCollector;
 20 import backtype.storm.task.TopologyContext;
 21 import backtype.storm.topology.BasicOutputCollector;
 22 import backtype.storm.topology.IRichBolt;
 23 import backtype.storm.topology.OutputFieldsDeclarer;
 24 import backtype.storm.topology.base.BaseBasicBolt;
 25 import backtype.storm.tuple.Fields;
 26 import backtype.storm.tuple.Tuple;
 27 
 28 import org.slf4j.Logger;
 29 import org.slf4j.LoggerFactory;
 30 
 31 import com.google.common.collect.Lists;
 32 
 33 /** 38  * @version V1.0
 39  */
 40 @SuppressWarnings("all")
 41 public class HbaseBout implements IRichBolt{
 42     
 43       private static final long serialVersionUID = 1L;
 44       private static final Logger LOG = LoggerFactory.getLogger(HbaseBout.class);
 45       
 46       protected OutputCollector collector;
 47       protected HbaseClient hbaseClient;
 48       protected String tableName;
 49       protected String configKey ="hbase.conf";
 50       
 51       //批处理大小
 52       protected int batchSize = 15000;
 53       List<Put> batchMutations;
 54       List<Tuple> tupleBatch;
 55       //tick Time
 56       int flushIntervalSecs = 1;
 57       
 62     public void prepare(Map map, TopologyContext context,
 63             OutputCollector collector) {
 64         
 65         this.collector = collector;
 66         Configuration hbConfig = HBaseConfiguration.create();
 67         Map<String,String> conf = (Map)map.get(this.configKey);
 68         
 69         //获取Hbase配置
 70         if (conf == null) {
 71           throw new IllegalArgumentException("HBase configuration not found using key '" + this.configKey + "'");
 72         }
 73         
 74         //批大小
 75         if(map.get("batchSize") != null){
 76             this.batchSize = new Integer(map.get("batchSize").toString());
 77         }
 78         //Tick Time
 79         if(map.get("flushIntervalSecs") != null){
 80             this.flushIntervalSecs = Integer.valueOf(map.get("flushIntervalSecs").toString());
 81         }
 82         
 83         //Hbase 配置
 84         for (String key : conf.keySet()) {
 85           hbConfig.set(key, String.valueOf(conf.get(key)));
 86         }
 87         this.hbaseClient = new HbaseClient(hbConfig, this.tableName);
 88     }
 89 
 94     @Override
 95     public void execute(Tuple tuple) {
 96         
 97          boolean flush = false;
 98          
 99          try {
100              //Tick
101              if(tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)&& tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)){
102                 LOG.debug("TICK received! current batch status ", Integer.valueOf(this.tupleBatch.size()), Integer.valueOf(this.batchSize)); 
103                 flush = true;
104             }else{
105                 
106                 Put put = new Put(tuple.getStringByField("rowKey").getBytes());
107                 put.add("cf".getBytes(), "name".getBytes(), tuple.getStringByField("name").getBytes());
108                 put.add("cf".getBytes(), "sex".getBytes(), tuple.getStringByField("sex").getBytes());
109                 this.batchMutations.add(put);
110                 this.tupleBatch.add(tuple);
111                 
112                 //当前tuple批大小
113                 if (this.tupleBatch.size() >= this.batchSize) {
114                   flush = true;
115                 }
116             }
117              //持久化操作
118              if ((flush) && (!this.tupleBatch.isEmpty())) {
119                   this.hbaseClient.batchMutate(this.batchMutations);
120                   LOG.debug("acknowledging tuples after batchMutate");
121                   for (Iterator<Tuple> tuples = this.tupleBatch.iterator(); tuples.hasNext(); ) { 
122                       Tuple t = (Tuple)tuples.next();
123                         this.collector.ack(t);
124                   }
125                   this.tupleBatch.clear();
126                   this.batchMutations.clear();
127                }
128         } catch (Exception e) {
129               LOG.debug("inser batch fail");
130               this.collector.reportError(e);
131               for (Tuple t : this.tupleBatch) {
132                   this.collector.fail(t);
133                 }
134               this.tupleBatch.clear();
135               this.batchMutations.clear();
136         }
137     }
138 
143     public void declareOutputFields(OutputFieldsDeclarer declarer) {
144         
145     }
146     
151     @Override
152     public Map<String, Object> getComponentConfiguration() {
153         Config conf = new Config();
154         conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS,flushIntervalSecs);
155         return conf;
156     }
157 
158     
163     @Override
164     public void cleanup() {
165         
166     }
167 }
168 
177 class HbaseClient{
178      
179      private static final Logger LOG = LoggerFactory.getLogger(HbaseClient.class);
180       HConnection hTablePool = null;
181       HTableInterface table = null;
182       
189       public HbaseClient(final Configuration configuration, final String tableName)
190       {
191         try
192         {
193             hTablePool =  HConnectionManager.createConnection(configuration) ;
194             this.table =  hTablePool.getTable(tableName);
195         }
196         catch (Exception e) {
197           throw new RuntimeException("HBase create failed: " + e.getMessage(), e);
198         }
199       }
200 
209       public void batchMutate(List<Put> puts) throws Exception {
210           
211           try {
212             this.table.put(puts);
213         } catch (Exception e) {
214               LOG.warn("Error insert batch to HBase.", e);
215               throw e;
216         }
217       }
218 }

                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         

推荐阅读