1. 程式人生 > >java執行緒池在web專案中應用

java執行緒池在web專案中應用

執行緒池

JANUARY 8,2016

遇到了問題

依稀還記得是15年初的時候,一些業務資料發生了錯誤,經過仔細分析,認為是重複發起了請求引起的,經過多方確認任務重複請求不是認為操作,分析程式發現程式也不會無緣無故發起二次請求。因為這個情況只發生過一次兩次,再加上仔細檢查程式碼任務不肯能發生的事一般會推給操作人誤操作,所以問題就這麼擱置了。 再後來因為操作越來越頻繁上面的情況越來越多,然後才意識到這個問題的嚴重性,以至於到處google baidu 論壇提問,當時找到了遇到相同問題的貼子瀏覽器在一定超時後會自動再次請求,然後又把問題推給瀏覽器(哈哈,反正當時不認為程式有問題)

再後來有機會了解了Nginx 其中

Nginx配置重複提交現象我從前臺提交一個get請求。後臺處理了兩次!頓時引起了我的注意。再繼續瞭解到F5也有tiemout屬性配置,然後再通過後臺數據重複提交的間隔大概都是5分鐘,聯想到timeout屬性配置是300秒才意識到是執行的業務方法時間過長,瀏覽器得不到響應F5自動觸發了二次請求。

解決問題

上面說了,引起重複提交是由於方法執行時間過長導致F5自動觸發二次請求。 解決方案1:使方法執行時間縮短(肯定不可能做到,執行方法時間不可估計) 解決方案2:使長時間執行的方法新建一個執行緒,如果頁面請求直接告訴前端頁面說明方法執行中,剩下的交給後臺執行緒去執行(最後使用的)。

再次遇到問題

由於執行時間長的方法都新啟動執行緒去執行,方法多而導致新開的執行緒多,最後伺服器會由執行緒池滿了而崩潰,伺服器崩潰我也跟著崩潰

瞭解執行緒池

使用執行緒池的好處

1,減少在建立和銷燬執行緒上所花的時間及系統資源開銷 2,減少系統建立大量執行緒而導致消耗完系統記憶體及“過渡切換” 3,池中的執行緒數量是固定的,如果需要執行大量執行緒方法超過執行緒池數量會由排隊策略決定執行緒的執行過程

新建執行緒池

package net.uni.ap.thread;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 處理內容:執行緒池工廠類
 */
public class ThreadPoolExecutorFactory {
	/**
	 * corePoolSize 池中所儲存的執行緒數,包括空閒執行緒。  
	 */
	private static final int corePoolSize = 40;
	/**
	 * maximumPoolSize - 池中允許的最大執行緒數(採用LinkedBlockingQueue時沒有作用)。  
	 */
	private static final int maximumPoolSize = 40;
	/**
	 * keepAliveTime -當執行緒數大於核心時,此為終止前多餘的空閒執行緒等待新任務的最長時間,執行緒池維護執行緒所允許的空閒時間
	 */
	private static final int keepAliveTime = 60;
	
	/**
	 * 執行前用於保持任務的佇列(緩衝佇列)
	 */
	private static final int capacity = 300;
	
	/**
	 * 執行緒池物件
	 */
	private static ThreadPoolExecutor threadPoolExecutor = null;
	
	//構造方法私有化
	private ThreadPoolExecutorFactory(){}
	
	public static ThreadPoolExecutor getThreadPoolExecutor(){
		if(null == threadPoolExecutor){
			ThreadPoolExecutor t;
			synchronized (ThreadPoolExecutor.class) {
				t = threadPoolExecutor;
				if(null == t){
					synchronized (ThreadPoolExecutor.class) {
						t = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),new ThreadPoolExecutor.DiscardOldestPolicy());
					}
					threadPoolExecutor = t;
				}
			}
		}
		return threadPoolExecutor;
	}
}

執行過程是: 1)當池子大小小於corePoolSize就新建執行緒,並處理請求
2)當池子大小等於corePoolSize,把請求放入workQueue中,池子裡的空閒執行緒就去從workQueue中取任務並處理
3)當workQueue放不下新入的任務時,新建執行緒入池,並處理請求,如果池子大小撐到了maximumPoolSize就用RejectedExecutionHandler來做拒絕處理
4)另外,當池子的執行緒數大於corePoolSize的時候,多餘的執行緒會等待keepAliveTime長的時間,如果無請求可處理就自行銷燬

Executors工具

建立執行緒池另一種方法就是使用:Executors.newFixedThreadPool(int)這個方法,因為它既可以限制數量,而且執行緒用完後不會一直被cache住;那麼就通過它來看看原始碼,回過頭來再看其他構造方法的區別:
<span style="font-size:10px;">public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}</span>
其實你可以自己new一個ThreadPoolExecutor,來達到自己的引數可控的程度,例如,可以將LinkedBlockingQueue換成其它的(如:SynchronousQueue),只是可讀性會降低,這裡只是使用了一種設計模式。我們現在來看看ThreadPoolExecutor的原始碼是怎麼樣的。這裡來看下構造方法中對那些屬性做了賦值

原始碼1

public ThreadPoolExecutor(int corePoolSize,
                           int maximumPoolSize,
                           long keepAliveTime,
                           TimeUnit unit,
                           BlockingQueue<Runnable> workQueue,
                           ThreadFactory threadFactory,
                           RejectedExecutionHandler handler) {
     if (corePoolSize < 0 ||
         maximumPoolSize <= 0 ||
         maximumPoolSize < corePoolSize ||
         keepAliveTime < 0)
         throw new IllegalArgumentException();
     if (workQueue == null || threadFactory == null || handler == null)
         throw new NullPointerException();
     this.corePoolSize = corePoolSize;
     this.maximumPoolSize = maximumPoolSize;
     this.workQueue = workQueue;
     this.keepAliveTime = unit.toNanos(keepAliveTime);
     this.threadFactory = threadFactory;
     this.handler = handler;
 }
這裡你可以看到最終賦值的過程,可以先大概知道下引數的意思: corePoolSize:核心執行的poolSize,也就是當超過這個範圍的時候,就需要將新的Thread放入到等待佇列中了; maximumPoolSize:一般你用不到,當大於了這個值就會將Thread由一個丟棄處理機制來處理,但是當你發生:newFixedThreadPool的時候,corePoolSize和maximumPoolSize是一樣的,而corePoolSize是先執行的,所以他會先被放入等待佇列,而不會執行到下面的丟棄處理中,看了後面的程式碼你就知道了。workQueue:等待佇列,當達到corePoolSize的時候,就向該等待佇列放入執行緒資訊(預設為一個LinkedBlockingQueue),執行中的佇列屬性為:workers,為一個HashSet;內部被包裝了一層,後面會看到這部分程式碼。 keepAliveTime:預設都是0,當執行緒沒有任務處理後,保持多長時間,cachedPoolSize是預設60s,不推薦使用。threadFactory:是構造Thread的方法,你可以自己去包裝和傳遞,主要實現newThread方法即可;handler:也就是引數maximumPoolSize達到後丟棄處理的方法,java提供了5種丟棄處理的方法,當然你也可以自己弄,主要是要實現介面:RejectedExecutionHandler中的方法:public void rejectedExecution(Runnabler, ThreadPoolExecutor e)java預設的是使用:AbortPolicy,他的作用是當出現這中情況的時候會丟擲一個異常;其餘的還包含: 1、CallerRunsPolicy:如果發現執行緒池還在執行,就直接執行這個執行緒 2、DiscardOldestPolicy:線上程池的等待佇列中,將頭取出一個拋棄,然後將當前執行緒放進去。 3、DiscardPolicy:什麼也不做 4、AbortPolicy:java預設,丟擲一個異常:RejectedExecutionException。 通常你得到執行緒池後,會呼叫其中的:submit方法或execute方法去操作;其實你會發現,submit方法最終會呼叫execute方法來進行操作,只是他提供了一個Future來託管返回值的處理而已,當你呼叫需要有返回值的資訊時,你用它來處理是比較好的;這個Future會包裝對Callable資訊,並定義一個Sync物件(),當你發生讀取返回值的操作的時候,會通過Sync物件進入鎖,直到有返回值的資料通知,具體細節先不要看太多,繼續向下:來看看execute最為核心的方法吧:

原始碼2

//corePoolSize 核心執行緒池數量(線上程數量小於corePoolSize時,新增加執行任務時會優先
//建立新的執行緒執行該任務,在=核心執行緒池數量時,會將任務加入BlockingQueue阻塞佇列,當隊
//列滿時會又建立新的執行緒執行該任務,直到等於maximumPoolSize,當更進一步的時候將會執
//行reject(command))
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
               //佇列滿且執行緒池呼叫了shutdown後,還在呼叫execute方法
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}

附錄

package net.uni.ap.thread;

/**
 * 
 * 處理內容:執行緒處理類
 * @version: 1.0
 * @see:net.uni.ap.thread.IThreadPoolExecutorHandler.java
 * @date:2015-5-13
 * @author:梅海波
 */
public interface IThreadPoolExecutorHandler extends Runnable{
	
}
package net.uni.ap.thread;
import java.lang.reflect.Method;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ThreadHandler<T> extends ThreadHandlerAbstract{
	private static final Logger logger = LoggerFactory.getLogger(ThreadHandler.class);
	protected T t;
	protected Class<T> modelClass;
	protected String method = "";
	
	
	
	@SuppressWarnings("unchecked")
	public ThreadHandler(Integer threadCount,T t){
		this.t = t;
		modelClass = (Class<T>) t.getClass();
		if(null != threadCount){
			super.countDownLatch = new CountDownLatch(threadCount);
		}
	}
	
	
	@Override
	public void run() {
		try{
			Method[] methods = this.modelClass.getMethods();
			
			for (Method method : methods) {
				if(method.getName().equals(this.method)){
					method.invoke(t);
				}
			}
			if(null != super.countDownLatch){
				super.countDownLatch.countDown();
			}
			if(null != ThreadPoolExecutorFactory.getThreadPoolExecutor().getQueue() && (ThreadPoolExecutorFactory.getThreadPoolExecutor().getQueue().size() < 20
					|| ThreadPoolExecutorFactory.getThreadPoolExecutor().getQueue().size() == 0 )){
				 ThreadPoolExecutorFactory.getThreadPoolExecutor().setCorePoolSize(20);
			}else{
				ThreadPoolExecutorFactory.getThreadPoolExecutor().setCorePoolSize(40);
			}
		}catch (Exception e) {
			e.printStackTrace();
			logger.error("執行緒池處理異常 方法:" + this.method,e);
		}
		
	}

	@Override
	public void execute(IThreadPoolExecutorHandler threadPoolExecutorHandler,String method)throws Exception{
		this.method = method;
		
		try {
			ThreadPoolExecutor threadPoolExecutor = ThreadPoolExecutorFactory.getThreadPoolExecutor();
			threadPoolExecutor.execute(threadPoolExecutorHandler);
		} catch (Exception e) {
			e.printStackTrace();
			logger.error("執行緒池處理異常 execute 方法:" + this.method,e);
			throw new Exception(e.getMessage(),e);
		}
	
	}


	@Override
	public void await() throws Exception {
		try {
			if(super.countDownLatch != null){
				countDownLatch.await();
			}
		} catch (Exception e) {
			e.printStackTrace();
			logger.error("執行緒池處理異常 await 方法:" + this.method,e);
			throw new Exception(e.getMessage(),e);
		}
		
	}


	@Override
	public void shutdown() throws Exception {
		try {
			ThreadPoolExecutor threadPoolExecutor = ThreadPoolExecutorFactory.getThreadPoolExecutor();
			threadPoolExecutor.shutdown();
		} catch (Exception e) {
			e.printStackTrace();
			logger.error("執行緒池處理異常 shutdown 方法:" + this.method,e);
			throw new Exception(e.getMessage(),e);
		}
		
	}
	
	public int getPoolSize(){
		ThreadPoolExecutor threadPoolExecutor = ThreadPoolExecutorFactory.getThreadPoolExecutor();
		return threadPoolExecutor.getPoolSize();
		
	}
	
	/**
	 * 獲取執行緒池佇列狀態數量
	 * @return
	 * @方法說明 
	 * @date 2015-8-26
	 * @author 梅海波
	 */
	public int getQueueSize(){
		ThreadPoolExecutor threadPoolExecutor = ThreadPoolExecutorFactory.getThreadPoolExecutor();
		return threadPoolExecutor.getQueue().size();
	}
	
	
	public static void main(String[] args) {
		ThreadPoolExecutor threadPoolExecutor = ThreadPoolExecutorFactory.getThreadPoolExecutor();
		for (int i = 0; i < 10000; i++) {
			final int index = i;
			threadPoolExecutor.execute(
					new Runnable() {
						@Override
						public void run() {
							try {
								System.out.println(index + "佇列數:" + ThreadPoolExecutorFactory.getThreadPoolExecutor().getQueue().size());
								Thread.sleep(200);
							} catch (InterruptedException e) {
								e.printStackTrace();
							}
						}
					});

		}
	}
	
}
package net.uni.ap.thread;

import java.util.concurrent.CountDownLatch;

public abstract class ThreadHandlerAbstract implements IThreadPoolExecutorHandler {

	/**
	 * 一個任務分多個執行緒處理時使用
	 */
	protected CountDownLatch countDownLatch = null;
	
	public ThreadHandlerAbstract(CountDownLatch countDownLatch){
		this.countDownLatch = countDownLatch;
	}
	
	public ThreadHandlerAbstract(){}
	
	public abstract void execute(IThreadPoolExecutorHandler threadPoolExecutorHandler,String method)throws Exception;
	
	public abstract void await()throws Exception;
	
	public abstract void shutdown()throws Exception;
}
內容暫時寫這麼多,後期有時間再繼續補充 作者:梅海波



相關推薦

java執行web專案應用

執行緒池 JANUARY 8,2016 遇到了問題 依稀還記得是15年初的時候,一些業務資料發生了錯誤,經過仔細分析,認為是重複發起了請求引起的,經過多方確認任務重複請求不是認為操作,分析程式發現程式也不會無緣無故發起二次請求。因為這個情況只發生過一次兩次,再加上仔細檢查程

java執行以程式碼的順序執行,主要是記錄一下繼承執行的內容

1.這個是自定義的執行緒池類,直接上程式碼 package org.jimmy.threadtest20181121; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecut

Java執行submit()和execute之間的區別?

一: submit()方法,可以提供Future < T > 型別的返回值。 executor()方法,無返回值。 execute無返回值 public void execute(Runnable command) { if (command == null)

在spring boot使用java執行ExecutorService

1. 認識java執行緒池 1.1 在什麼情況下使用執行緒池? 1.單個任務處理的時間比較短 2.需處理的任務的數量大 1.2 使用執行緒池的好處: 1.減少在建立和銷燬執行緒上所花的時間以及系統資源的開銷 2.如不使用執行緒池,有可能造成系統建立大量執行緒而導致消耗完系統記

Java執行的核心執行是如何被重複利用的

在Java開發中,經常需要建立執行緒去執行一些任務,實現起來也非常方便,但如果併發的執行緒數量很多,並且每個執行緒都是執行一個時間很短的任務就結束了,這樣頻繁建立執行緒就會大大降低系統的效率,因為頻繁建立執行緒和銷燬執行緒需要時間。此時,我們很自然會想到使用執行緒池來解決這個問題。 使用執行緒池的好處

如何等待java執行所有任務完成

一、等待執行緒池所有執行緒完成: 有時候我們需要等待java thread pool中所有任務完成後再做某些操作,如想要等待所有任務完成,僅需呼叫threadPool.awaitTermination

Java執行(2)——執行的幾個重要方法詳解

【內容摘要】 在java中,如果需要進行多執行緒程式設計,可以採用java自帶的執行緒池來實現,執行緒池對於我們新手來說是一個非常好的選擇,因為我們可以不用關心執行緒池中執行緒是如何排程的,避免在多執行緒程式設計過程產生死鎖等問題。在瞭解執行緒池的使用前,本文

java 執行管理多執行操作Hbase資料庫完整專案

Hbase-site.xml配置檔案: <?xml version="1.0" encoding="UTF-8"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configur

java執行和關閉執行執行

如果執行緒經常喜歡去new的話是不對的,你需要一個池子管理。 newCachedThreadPool 這個一個帶快取的執行緒池,是個可以無限大的執行緒池,新建的執行緒放倒這個池子裡,當執行緒停掉了的時候,下個個執行緒進來,可以複用這個執行緒。 newFixe

java執行任務異常處理

首先我們看個例子,當使用執行緒池執行任務時如果某個任務出現異常會是什麼效果 import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecut

java執行以及newCachedThreadPool使用過程的問題

      為什麼要用執行緒池?原因很簡單,效能好,而且不用自己費心費力的管理執行緒       1、執行緒池基本說明及定義       從JDK 1.5開始,添加了Executors工具類,這個類定義了Executor、ExecutorService、ScheduledE

執行 —— 多執行WEB開發應用相關問題

    1. web應用中,要對某一個任務用多執行緒實現,最簡單的程式碼格式是不是必須把要執行的程式碼放在run方法中?     WEB伺服器會幫你把每個訪問請求開闢一個執行緒,你只要按照你所開發的框架,比如tomcat會讓你利用servlet這個框架來寫程式碼。具體真的一

java執行任務執行完成後再次執行

最近做爬蟲用到了執行緒池(我是建立一個固定執行緒數量的執行緒池,然後不斷往裡扔任務) 現在要求網站連結如果爬完後再次啟動任務爬取連結,所以就想到線上程池中任務都執行完成後在重新啟動任務. demo如下: public static void ex(Connection c

執行在實際專案的簡單應用

專案中如何使用多執行緒       多執行緒在專案中主要用來解決併發任務執行。java中執行緒的主要實現方式有三種:繼承Thread類 實現Runnable介面 實現Callable介面。另外還可以通過Executor類來建立多執行緒執行緒池。        執行緒生命週期:

聊聊面試Java 執行

​背景 關於 Java 的執行緒池我想大家肯定不會陌生,在工作中或者自己平時的學習中多多少少都會用到,那你真的有了解過底層的實現原理嗎?還是說只停留在用的階段呢?而且關於 Java 執行緒池也是在面試中的一個高頻的面試題,就像 HashMap 的實現原理一樣,基本上面試必問,估計都已經被問爛大街了。

Java 執行執行複用是如何實現的?

前幾天,技術群裡有個群友問了一個關於執行緒池的問題,內容如圖所示: ![](https://img-blog.csdnimg.cn/20200614214310802.jpg) 關於執行緒池相關知識可以先看下這篇:[為什麼阿里巴巴Java開發手冊中強制要求執行緒池不允許使用Executors建立?

java執行詳細入門教程即原始碼解析

##1、執行緒池概念      執行緒池是執行緒的集合,通過執行緒池我們不需要自己建立執行緒,將任務提交給執行緒池即可。為什麼要使用執行緒池,首先,使用執行緒池可以重複利用已有的執行緒繼續執行任務,避免執行緒在建立和銷燬時造成的消耗。其次,由

Java執行ThreadPoolExecutor詳解

  1、執行緒池的工作原理?   執行緒池剛建立時,裡面沒有一個執行緒。任務佇列是作為引數傳進來的。不過,就算佇列裡面有任務,執行緒池也不會馬上執行它們。 當呼叫 execute() 方法新增一個任務時,執行緒池會做如下判斷:

java 執行 ExecutorService相關歸納

public class ExecutorServiceDemo {     public static void main(String[] args) {          // 單執行緒池  &

Java執行Executor框架詳解

Java的執行緒既是工作單元,也是執行機制。從JDK 5開始,把工作單元與執行機制分離開來。工作單元包括Runnable和Callable,而執行機制由Executor框架提供。 Executor框架簡介在HotSpot VM的執行緒模型中,Java執行緒(java.lang.Thread)被一對一對映為本