Java訊息佇列任務的平滑關閉
對於訊息佇列任務的監聽,我們一般使用Java寫一個獨立的程式,在Linux伺服器上執行。當訂閱者程式啟動後,會通過訊息佇列客戶端接收訊息,放入執行緒池中併發的處理。
那麼問題來了,當我們修改程式後,需要重新啟動時,如何保證訊息都能夠被處理呢?
一些開源的訊息佇列中介軟體,會提供ACK機制(訊息確認機制),當訂閱者處理完訊息後,會通知服務端刪除對應訊息,如果訂閱者出現異常,服務端未收到確認消費,則會重試傳送。
那如果訊息佇列中介軟體沒有提供ACK機制,或者為了高吞度量的考慮關閉了ACK功能,如何最大可能保證訊息都能夠被處理呢?
正常來說,訂閱者程式關閉後,訊息會在佇列中堆積,等待訂閱者下次訂閱消費,所以未接收的訊息是不會丟失的。可能出現的問題就是在關閉的一瞬間,已經從訊息佇列中取出,但還沒有被處理的訊息。
因此我們需要一套平滑關閉的機制,保證在重啟的時候,已接收的訊息可以得到正常處理。
2.問題分析
平滑關閉的思路如下:
- 在關閉程式時,首先關閉訊息訂閱,保證不再接收新的訊息。
- 關閉執行緒池,等待執行緒池中的訊息處理完畢。
- 程式退出。
關閉訊息訂閱:訊息佇列的客戶端都會提供關閉連線的方法,具體可以自行檢視API。
關閉執行緒池:Java的ThreadPoolExecutor
執行緒池提供shutdown()
和shutdownNow()
兩個方法,區別是前者會等待執行緒池中的訊息都處理完畢,後者會直接停止所有執行緒並返回未處理完的執行緒List。因為我們需要使用shutdown()
方法進行關閉,並通過isTerminated()
那麼問題又來了,我們如何通知到程式,需要執行關閉操作呢?
在Linux中,程序的關閉是通過訊號傳遞的,我們可以用kill -9 pid
關閉程序,除了-9之外,我們可以通過 kill -l
,檢視kill
命令的其它訊號量。
這裡提供兩種關閉方法:
-
程式中新增Runtime.getRuntime().addShutdownHook鉤子方法,SIGTERM,SIGINT,SIGHUP三種訊號都會觸發該方法(分別對應
kill -1
/kill -2
/kill -15
,Ctrl+C也會觸發SIGINT訊號)。 -
程式中通過Signal類註冊訊號監聽,比如USR2(對應
kill -12
補充說明:addShutdownHook方法和handle方法中如果再呼叫System.exit,會造成deadlock,使程序無法正常退出。
虛擬碼分別如下
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
//關閉訂閱者
//關閉執行緒池
//退出
}
});
//註冊linux kill訊號量 kill -12
Signal sig = new Signal("USR2");
Signal.handle(sig, new SignalHandler() {
@Override
public void handle(Signal signal) {
//關閉訂閱者
//關閉執行緒池
//退出
}
});
模擬Demo
下面通過一個demo模擬相關邏輯操作
首先模擬一個生產者,每秒生產5個訊息
然後模擬一個訂閱者,收到訊息後,放入執行緒池進行處理,執行緒池固定4個執行緒,每個執行緒處理時間1秒,這樣執行緒池每秒會積壓1個訊息。
package com.lujianing.demo;
import sun.misc.Signal;
import sun.misc.SignalHandler;
import java.util.concurrent.*;
/**
* @author [email protected]
* @Description:
* @date 2016/11/14
*/
public class MsgClient {
//模擬消費執行緒池 同時4個執行緒處理
private static final ThreadPoolExecutor THREAD_POOL = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
//模擬訊息生產任務
private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();
//用於判斷是否關閉訂閱
private static volatile boolean isClose = false;
public static void main(String[] args) throws InterruptedException {
//註冊鉤子方法
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
close();
}
});
BlockingQueue <String> queue = new ArrayBlockingQueue<String>(100);
producer(queue);
consumer(queue);
}
//模擬訊息佇列生產者
private static void producer(final BlockingQueue queue){
//每200毫秒向佇列中放入一個訊息
SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(new Runnable() {
public void run() {
queue.offer("");
}
}, 0L, 200L, TimeUnit.MILLISECONDS);
}
//模擬訊息佇列消費者 生產者每秒生產5個 消費者4個執行緒消費1個1秒 每秒積壓1個
private static void consumer(final BlockingQueue queue) throws InterruptedException {
while (!isClose){
getPoolBacklogSize();
//從佇列中拿到訊息
final String msg = (String)queue.take();
//放入執行緒池處理
if(!THREAD_POOL.isShutdown()) {
THREAD_POOL.execute(new Runnable() {
public void run() {
try {
//System.out.println(msg);
TimeUnit.MILLISECONDS.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}
//檢視執行緒池堆積訊息個數
private static long getPoolBacklogSize(){
long backlog = THREAD_POOL.getTaskCount()- THREAD_POOL.getCompletedTaskCount();
System.out.println(String.format("[%s]THREAD_POOL backlog:%s",System.currentTimeMillis(),backlog));
return backlog;
}
private static void close(){
System.out.println("收到kill訊息,執行關閉操作");
//關閉訂閱消費
isClose = true;
//關閉執行緒池,等待執行緒池積壓訊息處理
THREAD_POOL.shutdown();
//判斷執行緒池是否關閉
while (!THREAD_POOL.isTerminated()) {
try {
//每200毫秒 判斷執行緒池積壓數量
getPoolBacklogSize();
TimeUnit.MILLISECONDS.sleep(200L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("訂閱者關閉,執行緒池處理完畢");
}
static {
String osName = System.getProperty("os.name").toLowerCase();
if(osName != null && osName.indexOf("window") == -1) {
//註冊linux kill訊號量 kill -12
Signal sig = new Signal("USR2");
Signal.handle(sig, new SignalHandler() {
@Override
public void handle(Signal signal) {
close();
}
});
}
}
}
當我們在服務上執行時,通過控制檯可以看到相關的輸出資訊,demo中輸出了執行緒池的積壓訊息個數
java -cp /home/work/lujianing/msg-queue-client/* com.lujianing.demo.MsgClient
另開啟一個終端,通過ps命令檢視程序號,或者通過nohup啟動Java程序拿到程序id
ps -fe|grep MsgClient
當我們執行kill -12 pid
的時候 可以看到關閉業務邏輯
3.總結
其實不單單訊息佇列任務,在常見的RPC服務中也會見到類似的功能,比如58的SCF,在原始碼中,也會分別註冊了USR2訊號量和addShutdownHook鉤子方法。
在重啟指令碼中,首先會發送kill -12
命令,RPC服務收到訊號後會修改Server狀態為關閉。接著會發送kill -15
命令,觸發鉤子方法,關閉所有的連線。