首页 > 技术文章 > Storm系列(十九)普通事务ITransactionalSpout及示例

jianyuan 2015-10-26 17:03 原文

普通事务API详解

public interface ITransactionalSpout<T> extends IComponent {
    public interface Coordinator<X> {
        // 事务初始化
        X initializeTransaction(BigInteger txid, X prevMetadata);
        // 启动事务,返回true表示开始
        boolean isReady();       
        // 结束时调用主要用于释放资源
        void close();
    }
10      
11      public interface Emitter<X> {
12          // 发射batch中的tuple到下一级Bolt
13          void emitBatch(TransactionAttempt tx, X coordinatorMeta, BatchOutputCollector collector);
14          // 根据事务ID进行状态数据的清理
15          void cleanupBefore(BigInteger txid);
16          // 结束时调用主要用于释放资源   
17          void close();
18      }
19          
20      Coordinator<T> getCoordinator(Map conf, TopologyContext context);
21      
22      Emitter<T> getEmitter(Map conf, TopologyContext context);
23  }

 

示例

入口类

public class TestMain {
 
    public static void main(String[] args) {
 
        TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder(
                "ttbId", "spoutid", new TestTransaction(), 1);
        builder.setBolt("bolt1", new TestTransBolt1(), 3).shuffleGrouping(
                "spoutid");
        builder.setBolt("committer", new TestTransBolt2(), 1).shuffleGrouping(
10                  "bolt1");
11   
12          Config conf = new Config();
13          conf.setDebug(false);
14   
15          if (args.length > 0) {
16              try {
17                  StormSubmitter.submitTopology(args[0], conf,
18                          builder.buildTopology());
19              } catch (AlreadyAliveException e) {
20                  e.printStackTrace();
21              } catch (InvalidTopologyException e) {
22                  e.printStackTrace();
23              }
24          } else {
25              LocalCluster localCluster = new LocalCluster();
26              localCluster.submitTopology("mytopology", conf,
27                      builder.buildTopology());
28          }
29      }
30   
31  }

 

普通事务spout

public class TestTransaction implements ITransactionalSpout<TestMetaDate> {
 
    private static final long serialVersionUID = 1L;
    private Map<Long, String> DATA_BASE = null;
 
    public TestTransaction(){
        DATA_BASE = new HashMap<Long, String>();
       
        for (long i=0;i<50;i++){
10              DATA_BASE.put(i, "TestTransaction:"+i);
11          }
12         
13          System.out.println("TestTransaction start");
14      }
15     
16      @Override
17      public void declareOutputFields(OutputFieldsDeclarer declarer) {
18          declarer.declare(new Fields("tx","count"));
19      }
20   
21      @Override
22      public Map<String, Object> getComponentConfiguration() {
23          return null;
24      }
25   
26      @Override
27      public backtype.storm.transactional.ITransactionalSpout.Coordinator<TestMetaDate> getCoordinator(
28              Map conf, TopologyContext context) {
29          System.out.println("TestTransaction getCoordinator start");
30          return new TestCoordinator();
31      }
32   
33      @Override
34      public backtype.storm.transactional.ITransactionalSpout.Emitter<TestMetaDate> getEmitter(
35              Map conf, TopologyContext context) {
36          System.out.println("TestTransaction getEmitter start");
37          return new TestEmitter(DATA_BASE);
38      }
39  }

 

元数据实现类(存储到zookeeper中)

public class TestMetaDate implements Serializable {
 
    private static final long serialVersionUID = 1L;
   
    private long _index;
    private long _size;
   
    public long get_index() {
        return _index;
10      }
11      public void set_index(long _index) {
12          this._index = _index;
13      }
14     
15      public long get_size() {
16          return _size;
17      }
18      public void set_size(long _size) {
19          this._size = _size;
20      }
21     
22      @Override
23      public String toString() {
24          return "[_index=" + _index + ", _size=" + _size + "]";
25      }
26  }

 

元数据协调处理类

public class TestCoordinator implements ITransactionalSpout.Coordinator<TestMetaDate>{
 
    public TestCoordinator(){
        System.out.println("TestCoordinator start");
    }
   
    @Override
    public TestMetaDate initializeTransaction(BigInteger txid,
            TestMetaDate prevMetadata) {
10          long index = 0L;
11          if (null == prevMetadata){
12              index = 0L;
13          }
14          else {
15              index = prevMetadata.get_index()+prevMetadata.get_size();
16          }
17          TestMetaDate metaDate = new TestMetaDate();
18          metaDate.set_index(index);
19          metaDate.set_size(10);
20          System.out.println("开始事务:"+metaDate.toString());
21          return metaDate;
22      }
23   
24      @Override
25      public boolean isReady() {
26          Utils.sleep(1000);
27          return true;
28      }
29   
30      @Override
31      public void close() {
32      }
33     
34  }

 

Batch中的tuple发送处理类

public class TestEmitter implements ITransactionalSpout.Emitter<TestMetaDate> {
 
    private Map<Long, String> _dbMap = null;
 
    public TestEmitter(Map<Long, String> dbMap) {
        System.err.println("start TestEmitter");
        this._dbMap = dbMap;
    }
 
10      @Override
11      public void emitBatch(TransactionAttempt tx, TestMetaDate coordinatorMeta,
12              BatchOutputCollector collector) {
13          long index = coordinatorMeta.get_index();
14          long size = index + coordinatorMeta.get_size();
15          System.err.println("TestEmitter emitBatch size:" + size
16                  + ",_dbMap size:" + _dbMap.size());
17          if (size > _dbMap.size()) {
18              return;
19          }
20          for (; index < size; index++) {
21              if (null == _dbMap.get(index)) {
22                  System.out.println("TestEmitter continue");
23                  continue;
24              }
25              System.err.println("TestEmitter emitBatch index:"+index);
26              collector.emit(new Values(tx, _dbMap.get(index)));
27          }
28      }
29   
30      @Override
31      public void cleanupBefore(BigInteger txid) {
32      }
33   
34      @Override
35      public void close() {
36      }
37   
38  }

 

数据单元统计实现类

public class TestTransBolt1 extends BaseTransactionalBolt {
 
    private static final long serialVersionUID = 1L;
    private BatchOutputCollector _outputCollector;
    private TransactionAttempt _tx;
    private int count = 0;
    private TopologyContext _context;
 
    public TestTransBolt1() {
10          System.out.println("start TestTransBolt1 ");
11      }
12   
13      @Override
14      public void prepare(@SuppressWarnings("rawtypes") Map conf,
15              TopologyContext context, BatchOutputCollector collector,
16              TransactionAttempt id) {
17          this._context = context;
18          this._outputCollector = collector;
19          System.out.println("1 prepare TestTransBolt1 TransactionId:"
20                  + id.getTransactionId() + ",AttemptId:" + id.getAttemptId());
21   
22      }
23   
24      @Override
25      public void execute(Tuple tuple) {
26          _tx = (TransactionAttempt) tuple.getValueByField("tx");
27          String content = tuple.getStringByField("count");
28          System.out.println("1 TaskId:"+_context.getThisTaskId()+",TestTransBolt1 TransactionAttempt "
29                  + _tx.getTransactionId() + "  attemptid" + _tx.getAttemptId());
30          if (null != content && !content.isEmpty()) {
31              count++;
32          }
33      }
34   
35      @Override
36      public void finishBatch() {
37          System.out.println("1 TaskId:"+_context.getThisTaskId()+",finishBatch count:"+count);
38          _outputCollector.emit(new Values(_tx, count));
39      }
40   
41      @Override
42      public void declareOutputFields(OutputFieldsDeclarer declarer) {
43          declarer.declare(new Fields("tx", "count"));
44      }
45   
46  }

 

数据汇总统计实现类

public class TestTransBolt2 extends BaseTransactionalBolt implements ICommitter {
 
    private static final long serialVersionUID = 1L;
    private int sum = 0;
    private TransactionAttempt _tx;
    private static int _result = 0;
    private static BigInteger _curtxid=null;
 
    public TestTransBolt2() {
10          System.out.println("TestTransBolt2 start!");
11      }
12   
13      @Override
14      public void prepare(@SuppressWarnings("rawtypes") Map conf,
15              TopologyContext context, BatchOutputCollector collector,
16              TransactionAttempt id) {
17   
18          this._tx = id;
19          System.out.println("TestTransBolt2 prepare TransactionId:" + id);
20      }
21   
22      @Override
23      public void execute(Tuple tuple) {
24          _tx = (TransactionAttempt) tuple.getValueByField("tx");
25          sum += tuple.getIntegerByField("count");
26   
27          System.out.println("TestTransBolt2 execute TransactionAttempt:" + _tx);
28      }
29   
30      @Override
31      public void finishBatch() {
32          System.out.println("finishBatch _curtxid:" + _curtxid
33                  + ",getTransactionId:" + _tx.getTransactionId());
34          if (null == _curtxid || !_curtxid.equals(_tx.getTransactionId())) {
35   
36              System.out.println("****** 1 _curtxid:" + _curtxid
37                      + ",_tx.getTransactionId():" + _tx.getTransactionId());
38   
39              if (null == _curtxid) {
40                  _result = sum;
41              } else {
42                  _result += sum;
43              }
44              _curtxid = _tx.getTransactionId();
45              System.out.println("****** 2 _curtxid:" + _curtxid
46                      + ",_tx.getTransactionId():" + _tx.getTransactionId());
47          }
48   
49          System.out.println("total==========================:" + _result);
50      }
51   
52      @Override
53      public void declareOutputFields(OutputFieldsDeclarer declarer) {
54   
55      }
56  }

 

结果:

TestTransBolt2 prepare TransactionId:5:3709460125605136723
TestTransBolt2 execute TransactionAttempt:5:3709460125605136723
TestTransBolt2 execute TransactionAttempt:5:3709460125605136723
TestTransBolt2 execute TransactionAttempt:5:3709460125605136723
finishBatch _curtxid:4,getTransactionId:5
****** 1 _curtxid:4,_tx.getTransactionId():5
****** 2 _curtxid:5,_tx.getTransactionId():5
total==========================:50
开始事务:[_index=50, _size=10]
TestEmitter emitBatch size:60,_dbMap size:50

推荐阅读