使用Java佇列來處理日誌資訊(執行緒池的使用)
阿新 • • 發佈:2018-11-12
阿里的規範是使用new ThreadPoolExecutor()來建立執行緒池,二不是使用Excutor的靜態工具類來建立執行緒池,具體可以檢視部落格(兩篇):
https://blog.csdn.net/angus_Lucky/article/details/79862491
https://blog.csdn.net/qq_31615049/article/details/80756781
,以及部落格中有詳解介紹
關於執行緒池ThreadPoolExecutor,其總共有四個構造器,其中最完整的一個構造方法為如下:
public ThreadPoolExecutor(int corePoolSize, //核心執行緒池數量 int maximumPoolSize, //最大執行緒池大小 long keepAliveTime, //執行緒池中超過 corePoolSize數目的空閒執行緒最大存活時間;可以allowCoreThreadTimeOut(true)使得核心執行緒有效時間 TimeUnit unit, //keepAliveTime時間單位 BlockingQueue<Runnable> workQueue, //阻塞任務佇列 ThreadFactory threadFactory, //新建執行緒工廠 RejectedExecutionHandler handler) {...} //當提交任務數超過maxmumPoolSize+workQueue之和時,任務會交給RejectedExecutionHandler來處理
關於其中的引數介紹:
一:ThreadPoolExecutor的重要引數 corePoolSize:核心執行緒數 核心執行緒會一直存活,及時沒有任務需要執行 當執行緒數小於核心執行緒數時,即使有執行緒空閒,執行緒池也會優先建立新執行緒處理 設定allowCoreThreadTimeout=true(預設false)時,核心執行緒會超時關閉 queueCapacity:任務佇列容量(阻塞佇列) 當核心執行緒數達到最大時,新任務會放在佇列中排隊等待執行 maxPoolSize:最大執行緒數 當執行緒數>=corePoolSize,且任務佇列已滿時。執行緒池會建立新執行緒來處理任務 當執行緒數=maxPoolSize,且任務佇列已滿時,執行緒池會拒絕處理任務而丟擲異常 keepAliveTime:執行緒空閒時間 當執行緒空閒時間達到keepAliveTime時,執行緒會退出,直到執行緒數量=corePoolSize 如果allowCoreThreadTimeout=true,則會直到執行緒數量=0 allowCoreThreadTimeout:允許核心執行緒超時 rejectedExecutionHandler:任務拒絕處理器 兩種情況會拒絕處理任務: 當執行緒數已經達到maxPoolSize,切佇列已滿,會拒絕新任務 當執行緒池被呼叫shutdown()後,會等待執行緒池裡的任務執行完畢,再shutdown。如果在呼叫shutdown()和執行緒池真正shutdown之間提交任務,會拒絕新任務 執行緒池會呼叫rejectedExecutionHandler來處理這個任務。如果沒有設定預設是AbortPolicy,會丟擲異常 ThreadPoolExecutor類有幾個內部實現類來處理這類情況: AbortPolicy 丟棄任務,拋執行時異常 CallerRunsPolicy 執行任務 DiscardPolicy 忽視,什麼都不會發生 DiscardOldestPolicy 從佇列中踢出最先進入佇列(最後一個執行)的任務 實現RejectedExecutionHandler介面,可自定義處理器 二、ThreadPoolExecutor執行順序: 執行緒池按以下行為執行任務 當執行緒數小於核心執行緒數時,建立執行緒。 當執行緒數大於等於核心執行緒數,且任務佇列未滿時,將任務放入任務佇列。 當執行緒數大於等於核心執行緒數,且任務佇列已滿 若執行緒數小於最大執行緒數,建立執行緒 若執行緒數等於最大執行緒數,丟擲異常,拒絕任務 三、如何設定引數 預設值 corePoolSize=1 queueCapacity=Integer.MAX_VALUE maxPoolSize=Integer.MAX_VALUE keepAliveTime=60s allowCoreThreadTimeout=false rejectedExecutionHandler=AbortPolicy() 如何來設定 需要根據幾個值來決定 tasks :每秒的任務數,假設為500~1000 taskcost:每個任務花費時間,假設為0.1s responsetime:系統允許容忍的最大響應時間,假設為1s 做幾個計算 corePoolSize = 每秒需要多少個執行緒處理? threadcount = tasks/(1/taskcost) =tasks*taskcout = (500~1000)*0.1 = 50~100 個執行緒。corePoolSize設定應該大於50 根據8020原則,如果80%的每秒任務數小於800,那麼corePoolSize設定為80即可 queueCapacity = (coreSizePool/taskcost)*responsetime 計算可得 queueCapacity = 80/0.1*1 = 80。意思是佇列裡的執行緒可以等待1s,超過了的需要新開執行緒來執行 切記不能設定為Integer.MAX_VALUE,這樣佇列會很大,執行緒數只會保持在corePoolSize大小,當任務陡增時,不能新開執行緒來執行,響應時間會隨之陡增。 maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost) 計算可得 maxPoolSize = (1000-80)/10 = 92 (最大任務數-佇列容量)/每個執行緒每秒處理能力 = 最大執行緒數 rejectedExecutionHandler:根據具體情況來決定,任務不重要可丟棄,任務重要則要利用一些緩衝機制來處理 keepAliveTime和allowCoreThreadTimeout採用預設通常能滿足 以上都是理想值,實際情況下要根據機器效能來決定。如果在未達到最大執行緒數的情況機器cpu load已經滿了,則需要通過升級硬體(呵呵)和優化程式碼,降低taskcost來處理。
新建操作日誌的佇列: LogQueue.java
package com.tencent.queuedemo.queue; import com.tencent.queuedemo.moudel.LogEntity; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * 操作日誌的佇列 */ public class LogQueue { //佇列大小 public static final int QUEUE_MAX_SIZE = 100; /** * 訊息入隊 * @param logEntity * @return */ public static void push(LogEntity logEntity) throws Exception { //佇列已滿時,會阻塞佇列,直到未滿 blockingQueue.put(logEntity); } /** * 訊息出隊 * @return */ public static LogEntity poll() { LogEntity result = null; try { //佇列為空時會阻塞佇列,直到不是空 result = blockingQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } return result; } /** * 獲取佇列大小 * @return */ public static int size() { return blockingQueue.size(); } /** * 設定核心池大小,也就是能允許同時執行的執行緒數,corePoolSize 表示允許執行緒池中允許同時執行的最大執行緒數。 */ static int corePoolSize = 100; /** 表示執行緒沒有任務時最多保持多久然後停止。預設情況下,只有執行緒池中執行緒數大於corePoolSize 時,keepAliveTime 才會起作用。 換句話說,當執行緒池中的執行緒數大於corePoolSize,並且一個執行緒空閒時間達到了keepAliveTime,那麼就是shutdown。 * */ static long keepActiveTime = 200; /** * 執行緒池允許建立的最大執行緒數。如果佇列滿了,並且已建立的執行緒數小於最大執行緒數,則執行緒池會再建立新的執行緒執行任務。 * 值得注意的是,如果使用了無界的任務佇列這個引數就沒用了。 */ static int maximumPoolSize = 300; static TimeUnit timeUnit = TimeUnit.SECONDS; /**建立ThreadPoolExecutor執行緒池物件,並初始化該物件的各種引數 * */ public static ThreadPoolExecutor executor = null; /** 初始化阻塞佇列 * */ public static BlockingQueue<LogEntity> blockingQueue = null; static{ /** * 這是日誌佇列,用來實際操作的 */ blockingQueue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE); /** *queue:workQueue必須是BlockingQueue阻塞佇列。當執行緒池中的執行緒數超過它的corePoolSize的時候,執行緒會進入阻塞佇列進行阻塞等待。通過workQueue,執行緒池實現了阻塞功能 */ /** * 這個只是執行緒池的阻塞佇列 */ executor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepActiveTime,timeUnit,new LinkedBlockingQueue<Runnable>(100)); } /** * 初始化執行緒池 * @return */ /*public static ThreadPoolExecutor createThreadPool(){ if(executor == null){ executor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepActiveTime,timeUnit,blockingQueue); } return executor; }*/ }
建立controller,就是生產者(生產日誌,每次訪問就記錄一次日誌)
package com.tencent.queuedemo.controller;
import com.tencent.queuedemo.moudel.LogEntity;
import com.tencent.queuedemo.queue.LogQueue;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import java.util.Date;
import java.util.concurrent.BlockingQueue;
@Controller
public class LogController {
@GetMapping("/visit/{url}")
public void visitLog(@PathVariable("url") String url){
LogEntity logEntity = new LogEntity();
logEntity.setVisitTime(new Date());
logEntity.setVisitUrl(url);
System.out.println("訪問的資訊:"+logEntity.toString());
//將任務加入到佇列中去
BlockingQueue<LogEntity> blockingQueue = LogQueue.blockingQueue;
try {
blockingQueue.put(logEntity);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
日誌的實體類為
package com.tencent.queuedemo.moudel;
import java.util.Date;
/**
* 日誌實體
*/
public class LogEntity {
/**
* 訪問的時間
*/
private Date visitTime;
/**
* 訪問的url
*/
private String visitUrl;
public Date getVisitTime() {
return visitTime;
}
public void setVisitTime(Date visitTime) {
this.visitTime = visitTime;
}
public String getVisitUrl() {
return visitUrl;
}
public void setVisitUrl(String visitUrl) {
this.visitUrl = visitUrl;
}
@Override
public String toString() {
return "LogEntity{" +
"visitTime=" + visitTime +
", visitUrl='" + visitUrl + '\'' +
'}';
}
}
建立操作日誌的執行緒,到時候需要將這個執行緒放到執行緒池中去
package com.tencent.queuedemo.thread;
import com.tencent.queuedemo.moudel.LogEntity;
import org.springframework.stereotype.Component;
/**
* 日誌執行緒類,將日誌儲存到佇列中
*/
@Component
public class LogThread implements Runnable {
private LogEntity logEntity;
public LogThread() {
}
public LogThread(LogEntity logEntity) {
this.logEntity = logEntity;
}
@Override
public void run() {
try {
System.out.println("這是在將日誌儲存到佇列中去:"+logEntity.toString());
} catch (Exception e) {
e.printStackTrace();
}
}
}
建立一個定時器來講佇列中的日誌,儲存到資料庫(也就是消費者,消費日誌)
package com.tencent.queuedemo.timer;
import com.tencent.queuedemo.moudel.LogEntity;
import com.tencent.queuedemo.queue.LogQueue;
import com.tencent.queuedemo.thread.LogThread;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.concurrent.ThreadPoolExecutor;
@Component
@EnableScheduling
public class LogTimer {
@Scheduled(cron = "0/59 * * * * ? ") //每5秒執行一次
public void saveLog(){
/**
* 定點迴圈執行佇列中的任務
*/
while(true){
//獲取到阻塞佇列
//獲取到執行緒池
ThreadPoolExecutor threadPool = LogQueue.executor;
LogEntity entity = LogQueue.poll();
//執行佇列中的任務
if(null != entity){
System.out.println("hah,佇列的大小:"+LogQueue.size());
threadPool.submit(new LogThread(entity));
}
}
}
}