昆明java培训
达内昆明广州春城路

18487146383

热门课程

Java消息队列任务的平滑关闭问题分析

  • 时间:2016-11-16
  • 发布:昆明Java培训
  • 来源:达内新闻

昆明Java培训班的老师给大家讲平滑关闭的问题分析。

平滑关闭的思路如下:

在关闭程序时,首先关闭消息订阅,这个时候消息都在发送者队列中

关闭本地消息处理线程池(等待本地线程池中的消息处理完毕)

程序退出

关闭消息订阅:一般消息队列的客户端都提供关闭连接的方法,具体可以自行查看api

关闭线程池:Java的ThreadPoolExecutor线程池提供shutdown()和shutdownNow()两个方法,区别是前者会等待线程池中的消息都处理完毕,后者直接停止线程的执行并返回list集合。因为我们需要使用shutdown()方法进行关闭,并通过isTerminated(),方法判断线程池是否已经关闭.

那么问题又来了,我们如何通知到程序,需要执行关闭操作呢?

在Linux中,我们可以用kill -9 pid关闭进程,除了-9之外,我们可以通过kill -l查看kill命令的其它信号量,比如使用12) SIGUSR2信号量

我们可以在Java程序启动时,注册对应的信号量,对信号量进行监听,在收到对应的kill操作时,执行相关的业务操作。

伪代码如下

//注册linux kill信号量 kill -12

Signal sig = new Signal("USR2");

Signal.handle(sig, ew SignalHandler() {

@Override

public void handle(Signal signal) {

//关闭订阅者

//关闭线程池

//退出

}

});

下面通过一个demo模拟相关逻辑操作

首先模拟一个生产者,每秒生产5个消息

然后模拟一个订阅者,收到消息后交给线程池进行处理,线程池固定4个线程,每个消息处理时间1秒,这样线程池每秒会积压1个消息。

package com.lujianing.demo;

import sun.misc.Signal;

import sun.misc.SignalHandler;

import java.util.concurrent.*;

/**

* @author lujianing01@58.com

* @Description:

* @date 2016/11/14

*/

public class MsgClient {

//模拟消息队列订阅者同时4个线程处理

private static final ThreadPoolExecutor THREAD_POOL = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);

//模拟消息队列生产者

private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();

//用于判断是否关闭订阅

private static volatile boolean isClose = false;

public static void main(String[] args) throws InterruptedException {

BlockingQueue <String> queue = ew ArrayBlockingQueue<String>(100);

producer(queue);

consumer(queue);

}

//模拟消息队列生产者

private static void producer(final BlockingQueue queue){

//每200毫秒向队列中放入一个消息

SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(new Runnable() {

public void run() {

queue.offer("");

}

}, 0L, 200L, TimeUnit.MILLISECONDS);

}

//模拟消息队列消费者生产者每秒生产5个  消费者4个线程消费1个1秒 每秒积压1个

private static void consumer(final BlockingQueue queue) throws InterruptedException {

while (!isClose){

getPoolBacklogSize();

//从队列中拿到消息

final String msg = (String)queue.take();

//放入线程池处理

if(!THREAD_POOL.isShutdown()) {

THREAD_POOL.execute(new Runnable() {

public void run() {

try {

//System.out.println(msg);

TimeUnit.MILLISECONDS.sleep(1000L);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

});

}

}

}

//查看线程池堆积消息个数

private static long getPoolBacklogSize(){

long backlog = THREAD_POOL.getTaskCount()- THREAD_POOL.getCompletedTaskCount();

System.out.println(String.format("[%s]THREAD_POOL backlog:%s",System.currentTimeMillis(),backlog));

return backlog;

}

static {

String osName = System.getProperty("os.name").toLowerCase();

if(osName != null && osName.indexOf("window") == -1) {

//注册linux kill信号量 kill -12

Signal sig = new Signal("USR2");

Signal.handle(sig, new SignalHandler() {

@Override

public void handle(Signal signal) {

System.out.println("收到kill消息,执行关闭操作");

//关闭订阅消费

isClose = true;

//关闭线程池,等待线程池积压消息处理

THREAD_POOL.shutdown();

//判断线程池是否关闭

while (!THREAD_POOL.isTerminated()) {

try {

//每200毫秒判断线程池积压数量

getPoolBacklogSize();

TimeUnit.MILLISECONDS.sleep(200L);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

System.out.println("订阅者关闭,线程池处理完毕");

System.exit(0);

}

});

}

}

}

当我们在服务上运行时,通过控制台可以看到相关的输出信息,demo中输出了线程池的积压消息个数

java -cp /home/work/lujianing/msg-queue-client/* com.lujianing.demo.MsgClient

输入图片说明

另打开一个终端,通过ps命令查看进程号,或者通过nohup启动Java进程拿到进程id

ps -fe|grep MsgClient

输入图片说明

当我们执行kill -12 pid的时候可以看到关闭业务逻辑

昆明Java培训班的老师对问题的总结

在部门的实际业务中,消息队列的消息量还是挺大的,某些业务高峰时每秒有几百的消息量,因此对消息的处理要保证速度,避免消息积压,也可以通过负载解决单个订阅节点的压力。

在某些业务场景中,对消息的完整性要求不那么高,那么就不用考虑重启时的一点损耗。反之,就需要好好思考和设计了。

上一篇:Java消息队列任务的平滑关闭问题背景
下一篇:RxJava 2.0.1发布,Rx的Java实现

程序猿必学之Java 9——昆明达内

java集合-Iterator迭代

达内编程培训大神带你修炼技术!

AI初创企谷歌跑在前面了!达内java培训

选择城市和中心
贵州省

广西省

海南省