1. 程式人生 > >測試 ThreadPoolExecutor 中遇到的近似死鎖問題

測試 ThreadPoolExecutor 中遇到的近似死鎖問題

今天在做一個簡單的小test時,發現了ThreadPoolExecutor的一個問題,先列出程式碼:主要功能是往一個連結中插入、刪除資料

連結串列的節點:

public class BoundedNode {
	 
    public Object value;
    public BoundedNode next;
    public BoundedNode(Object x){
         value=x;
         next=null;
    }
}

增加、刪除節點的操作:
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedQueue {
  	  ReentrantLock enqlock,deqlock;
	  Condition notempty,notfull;
	  AtomicInteger size;
	  BoundedNode head,tail;//佇列頭哨兵和尾哨兵
	  int capacity;
	
	  public BoundedQueue(int capacity) throws InterruptedException{
	      this.capacity=capacity;
	      head=new BoundedNode(null);
	      tail=head;
	      size=new AtomicInteger(0);
	      enqlock=new ReentrantLock();
	      deqlock=new ReentrantLock();
	      notfull=enqlock.newCondition();
	      notempty=deqlock.newCondition();
	  }
	
	  public void enq(Object x) throws InterruptedException{
	      boolean weakdeq=false;
	      //入隊者首先獲取入隊鎖
	      enqlock.lock();
	
	      try{
	          //判斷佇列是否為滿,通過迴圈判斷,結合上面的加鎖,因此此方法也稱為自旋//加鎖,優勢效率較高,缺點造成CPU消耗較大
	          while(size.get()==capacity){
	           
		          //如果佇列滿,則在“不滿”條件上等待,直到佇列不滿的條件發生,等待時會//暫時釋放入隊鎖
		          notfull.await();
		       }
	
		       //如果“不滿”條件滿足,則構建新的佇列元素,並將新的佇列元素掛接到佇列//尾部
		       BoundedNode e=new BoundedNode(x);
		       tail.next=tail=e;
		       
		       System.out.println("add:"+size);
		
		       //獲取元素入隊前佇列容量,並在獲取後將入隊前佇列容量增加1
		       if(size.getAndIncrement()==0){
		       //如果入隊前佇列容量等於0,則說明有出隊執行緒正在等待出隊條件notempty
		       //發生,因此要將相關標誌置為true
		           weakdeq=true;
		       }
	    }finally{
	       //入隊者釋放入隊鎖
	       enqlock.unlock();
	    }
	
	    //判斷出隊等待標誌
	    if(weakdeq){
	        //入隊執行緒獲取出隊鎖
	        deqlock.lock();
	
	        try{
	        //觸發出隊條件,即佇列“不空”條件,使等待出隊的執行緒能夠繼續執行
	          notempty.signalAll();
	       }finally{
	          //入隊執行緒釋放出隊鎖
	          deqlock.unlock();
	       }
	    }
	}
	
	public Object deq() throws InterruptedException{
	
	    Object result=null;
	    boolean weakenq=false;
	    //出隊者首先獲取出隊鎖
	    deqlock.lock();
	    try{
	
	        //判斷佇列是否為空,通過迴圈判斷,結合上面的加鎖,因此此方法也稱為自旋//加鎖,優勢效率較高,缺點造成CPU消耗較大
	        while(size.get()==0){
	            //如果佇列空,則在“不空”條件上等待,直到佇列不空的條件發生,等待時會//暫時釋放出隊鎖
	            notempty.await();
	       }
	
	      //如果“不空”條件滿足,則通過佇列頭部哨兵獲取首節點,並獲取佇列元素值
	
	       result=head.next.value;
	       head=head.next;
	       //獲取元素出隊前佇列容量,並在獲取後將出隊前佇列容量減少1
	       System.out.println("delete:"+size);
	       if(size.getAndDecrement()==capacity){
	            //如果出隊前佇列容量等於佇列限額,則說明有入隊執行緒正在等待入隊條件//notfull發生,因此要將相關標誌置為true
	            weakenq=true;
	       }
	    }finally{
	        //出隊者釋放出隊鎖
	        deqlock.unlock();
	    }
	
	    //判斷入隊等待標誌
	    if(weakenq){
	        //出隊執行緒獲取入隊鎖
	        enqlock.lock();
	       try{
	            //觸發入隊條件,即佇列“不滿”條件,使等待入隊的執行緒能夠繼續執行
	            notfull.signalAll();
	       }finally{
	            //出隊執行緒釋放入隊鎖
	            enqlock.unlock();
	       }
	    }
	
	    return result;
	}
}

兩個執行緒,一個增加、一個刪除

import java.util.concurrent.Callable;

public class ThreadAdd implements Callable<String>{
	
	public BoundedQueue boundedQueue;
	
	public ThreadAdd(BoundedQueue boundedQueue){
		this.boundedQueue = boundedQueue;
	}

	@Override
	public String call() {
		try {
			boundedQueue.enq("x");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return "";
	}

}

import java.util.concurrent.Callable;

public class ThreadDelete implements Callable<String>{

	public BoundedQueue boundedQueue;
	
	public ThreadDelete(BoundedQueue boundedQueue){
		this.boundedQueue = boundedQueue;
	}

	@Override
	public String call() {
		
		try {
			boundedQueue.deq();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return "";
		
	}
}

測試案例:

package pool;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class mainTest {
	
	public static void main(String[] args) throws InterruptedException{

		BoundedQueue boundedQueue = new BoundedQueue(10);
		ThreadPoolExecutor x = new ThreadPoolExecutor(1,1,1000,TimeUnit.MILLISECONDS,   
				 			new LinkedBlockingQueue<Runnable>());
		List<Future<String>> list = new ArrayList<Future<String>>();
		
		for(int i=0;i<3;i++){
			Future<String> n = x.submit(new ThreadAdd(boundedQueue));  //增加
			Future<String> m = x.submit(new ThreadDelete(boundedQueue));  //刪除
			Future<String> l = x.submit(new ThreadDelete(boundedQueue));  //刪除
			
			list.add(n);
			list.add(m);
			list.add(l);
		}
		
		Thread.sleep(3000);
		
		for(Future<String> future : list){
			System.out.println(future.isDone());
		}
		
		x.shutdown();
	}
}

執行結果:
add:0
delete:1
true
true
false
false
false
false
false
false
false
發現只有前面兩個執行緒操作成功,其它的都陷入等待狀態;

分析原因發現可能是執行緒池設定過小有關,調整如下:

ThreadPoolExecutor x = new ThreadPoolExecutor(10,10,1000,TimeUnit.MILLISECONDS,   
				 	 new LinkedBlockingQueue<Runnable>());
此時開了10個執行緒的執行緒池,按理來說足夠用了,執行下:
add:0
delete:1
add:0
delete:1
add:0
delete:1
true
true
true
true
true
false
true
false
false
發現還是有三個執行緒未能操作成功,分析原因可知,是由於佇列為空,刪除操作的執行緒陷入了等待的狀態。


總體分析原因:

1、執行緒池大小為1時,由於刪除操作過多,導致某次刪除執行緒A的操作掛起,等待能刪的時候繼續執行;A一直等某個新增執行緒操作後才能繼續刪除

     而由於執行緒池大小設計為1,剩餘的執行緒都被放入佇列進行等待A執行完後繼續執行,但是A被掛起,一直不能執行,導致整個操作都相當於死鎖了~~~

2、執行緒池大小為10時,刪除的操作比新增的操作多,就會產生刪除執行緒一直等待這個問題;