课程咨询 :186 8716 1620      qq:2066486918

昆明Java培训 > 达内新闻 > Java消息队列任务的平滑关闭问题分析
  • Java消息队列任务的平滑关闭问题分析

    发布:昆明Java培训      来源:达内新闻      时间:2016-11-16

  • 昆明Java培训班的老师给大家讲平滑关闭的问题分析。

    平滑关闭的思路如下:

    在关闭程序时,首先关闭消息订阅,这个时候消息都在发送者队列中

    关闭本地消息处理线程池(等待本地线程池中的消息处理完毕)

    程序退出

    关闭消息订阅:一般消息队列的客户端都提供关闭连接的方法,具体可以自行查看api

    关闭线程池:Java的ThreadPoolExecutor线程池提供shutdown()和shutdownNow()两个方法,区别是前者会等待线程池中的消息都处理完毕,后者直接停止线程的执行并返回list集合。因为我们需要使用shutdown()方法进行关闭,并通过isTerminated(), 法判断线程池是否已经关闭.

    那么问题又来了,我们如何通知到程序,需要执行关闭操作呢?

    在Linux中,我们可以用kill -9 pid关闭进程,除了-9之外,我们可以通过kill -l查看kill命令的其它信号量,比如使用12) SIGUSR2信号量

    我们可以在Java程序启动时,注册对应的信号量,对信号量进行监听,在收到对应的kill操作时,执行相关的业务操作。

    伪代码如下

    //注册linux kill信号量 kill -12

    Signal sig = new Signal("USR2");

    Signal.handle(sig, ew SignalHandler() {

    @Override

    public void handle(Signal signal) {

    //关闭订阅者

    //关闭线程池

    //退出

    }

    });

    下面通过一个demo模拟相关逻辑操作

    首先模拟一个生产者,每秒生产5个消息

    然后模拟一个订阅者,收到消息后交给线程池进行处理,线程池固定4个线程,每个消息处理时间1秒,这样线程池每秒会积压1个消息。

    package com.lujianing.demo;

    import sun.misc.Signal;

    import sun.misc.SignalHandler;

    import java.util.concurrent.*;

    /**

    * @author lujianing01@58.com

    * @Description:

    * @date 2016/11/14

    */

    public class MsgClient {

    //模拟消息队列订阅者同时4个线程处理

    private static final ThreadPoolExecutor THREAD_POOL = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);

    //模拟消息队列生产者

    private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();

    //用于判断是否关闭订阅

    private static volatile boolean isClose = false;

    public static void main(String[] args) throws InterruptedException {

    BlockingQueue <String> queue = ew ArrayBlockingQueue<String>(100);

    producer(queue);

    consumer(queue);

    }

    //模拟消息队列生产者

    private static void producer(final BlockingQueue queue){

    //每200毫秒向队列中放入一个消息

    SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(new Runnable() {

    public void run() {

    queue.offer("");

    }

    }, 0L, 200L, TimeUnit.MILLISECONDS);

    }

    //模拟消息队列消费者生产者每秒生产5个  消费者4个线程消费1个1秒 每秒积压1个

    private static void consumer(final BlockingQueue queue) throws InterruptedException {

    while (!isClose){

    getPoolBacklogSize();

    //从队列中拿到消息

    final String msg = (String)queue.take();

    //放入线程池处理

    if(!THREAD_POOL.isShutdown()) {

    THREAD_POOL.execute(new Runnable() {

    public void run() {

    try {

    //System.out.println(msg);

    TimeUnit.MILLISECONDS.sleep(1000L);

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    }

    });

    }

    }

    }

    //查看线程池堆积消息个数

    private static long getPoolBacklogSize(){

    long backlog = THREAD_POOL.getTaskCount()- THREAD_POOL.getCompletedTaskCount();

    System.out.println(String.format("[%s]THREAD_POOL backlog:%s",System.currentTimeMillis(),backlog));

    return backlog;

    }

    static {

    String osName = System.getProperty("os.name").toLowerCase();

    if(osName != null && osName.indexOf("window") == -1) {

    //注册linux kill信号量 kill -12

    Signal sig = new Signal("USR2");

    Signal.handle(sig, new SignalHandler() {

    @Override

    public void handle(Signal signal) {

    System.out.println("收到kill消息,执行关闭操作");

    //关闭订阅消费

    isClose = true;

    //关闭线程池,等待线程池积压消息处理

    THREAD_POOL.shutdown();

    //判断线程池是否关闭

    while (!THREAD_POOL.isTerminated()) {

    try {

    //每200毫秒判断线程池积压数量

    getPoolBacklogSize();

    TimeUnit.MILLISECONDS.sleep(200L);

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    }

    System.out.println("订阅者关闭,线程池处理完毕");

    System.exit(0);

    }

    });

    }

    }

    }

    当我们在服务上运行时,通过控制台可以看到相关的输出信息,demo中输出了线程池的积压消息个数

    java -cp /home/work/lujianing/msg-queue-client/* com.lujianing.demo.MsgClient

    输入图片说明

    另打开一个终端,通过ps命令查看进程号,或者通过nohup启动Java进程拿到进程id

    ps -fe|grep MsgClient

    输入图片说明

    当我们执行kill -12 pid的时候可以看到关闭业务逻辑

    昆明Java培训班的老师对问题的总结

    在部门的实际业务中,消息队列的消息量还是挺大的,某些业务高峰时每秒有几百的消息量,因此对消息的处理要保证速度,避免消息积压,也可以通过负载解决单个订阅节点的压力。

    在某些业务场景中,对消息的完整性要求不那么高,那么就不用考虑重启时的一点损耗。反之,就需要好好思考和设计了。

    推荐文章

上一篇:Java消息队列任务的平滑关闭问题背景

下一篇:RxJava 2.0.1发布,Rx的Java实现

最新开班日期  |  更多

Java--零基础全日制班

Java--零基础全日制班

开班日期:11/30

Java--零基础业余班

Java--零基础业余班

开班日期:11/30

Java--周末提升班

Java--周末提升班

开班日期:11/30

Java--零基础周末班

Java--零基础周末班

开班日期:11/30

  • 网址:http://km .java.tedu.cn      地址:昆明市官渡区春城路62号证券大厦附楼6楼
  • 课程培训电话:186 8716 1620      qq:2066486918    全国服务监督电话:400-827-0010
  • 服务邮箱 ts@tedu.cn
  • 2001-2016 达内国际公司(TARENA INTERNATIONAL,INC.) 版权所有 京ICP证08000853号-56