首页 > 技术文章 > Spring与多线程

juepei 2015-06-17 17:32 原文

前言背景

在做新项目,作为中间件的项目,主要做数据服务。这次想把项目做的简洁一些,之前用的什么ActiveMq等中间件产品,这次全部不用,能自己实现就自己实现。自己用BlockingQueue阻塞队列,按照自己的数据量,1G内存也能存上两千多万数据。设计上,需要一个线程去阻塞队列中拿数据,必须是系统启动的时候就去取。没有则阻塞,直到有数据来。
首先一个问题是,在spring项目中,自定义的New对象和线程,是不受spring管理的。所以在以前的处理中,经常是写一个单例,获取ApplicationContext,这种方式是可行的。但是在用注解的时候,使用这种方式,看上去会比较丑陋。所以,我们还是希望,在使用线程的时候,可以 像以前那样,可以自动的注入我们需要的bean。

开始动手

还是用实际的项目来举个栗子。
作为数据服务,入口是一个webservices的接口,这里因为历史原因,还是采用soap方式。
入口如下:

      @WebMethod
        public FeedResult send(NocPacket nocPacket) {
                logger.info("Receive data");
                QueueManager.getInstance().put(nocPacket,Const.nocQueue);
                return "successful";
        }

这里采用生产者和消费者的模式,其中的Const.nocQueue,是我们的BlockingQueue,作为数据缓冲区。
既然如此,还应该有一个消费者。
之前说过,我们要在系统启动的时候,就去queue中取我们的数据,有则拿出来,作为业务处理,没有则阻塞,等待数据到来。同时,拿到数据后,将这些数据入库。
这里需要两个类,一个是随系统一起启动的,暂且叫守护程序吧。另一个是做主业务的。
守护程序代码:

/**
 * Created by Wl on 15-6-17.
 * 业务消费者
 */

@Component
public class Business  {
        private Logger logger = Logger.getLogger(Business.class);

        @Autowired
        private FlowDataService<FlowData> flowDataService;

        public void doBusi() {
                logger.info("业务线程开始");
                ExecutorService executor = Executors.newCachedThreadPool();
                BusiTask task = new BusiTask(Const.nocQueue,flowDataService);
                executor.submit(task);
                FlowData data = flowDataService.queryById(1);
                logger.info("Get Data :"+data.getPlateNo());
        }
}

处理业务的类:

/**
 * Created by Wl on 15-6-17.
 * 业务处理主方法
 */
public class BusiTask implements Callable<BlockingQueue<NocPacket>> {
        private Logger logger = Logger.getLogger(BusiTask.class);
        private BlockingQueue<NocPacket> queue;
        private FlowDataService<FlowData> flowDataService;

        public BusiTask(BlockingQueue<NocPacket> queue,FlowDataService<FlowData> flowDataService) {
                this.queue = queue;
                this.flowDataService = flowDataService;
        }

        public BlockingQueue<NocPacket> call() throws Exception {
                NocPacket nocPacket = queue.take();
                logger.info("Data:"+nocPacket.getGWName());
                Payload sc = NocUtil.getPayload(nocPacket);
                FlowData data = new FlowData();
                flowDataService.add(data);
                logger.info("成功插入数据");
                return null;
        }
}

此处,守护程序是随系统一起启动的,这一点可以用spring配置。这个守护程序不能有特殊性,它不能是我们自建的一个线程,也不能new一个方法,再去调用doBusi,因为这样就不受spring管理了。它的第一个目的是拿到注入的service。
这两个类,采用Future和Callable的方式进行异步线程处理的。如果直接去处理BusiTask中的任务,就会阻塞到queue的take上。同时,通过守护程序,BusiTask也可以拿到services,进行入库等操作。

结尾

spring是可以进行多线程开发的,目前没使用过,以后有时间好好研究下。

推荐阅读