多執行緒 佇列 執行緒阻塞
阿新 • • 發佈:2019-02-06
使用Java.util.concurrent包下的類
一些概念:
- 阻塞:阻塞呼叫是指呼叫結果返回之前,當前執行緒會被掛起。函式只有在得到結果之後才會返回
採用LockSupport.park()
阻塞住執行緒, 採用LockSupport.unpark(Thread)
釋放程序。 - 佇列:生產者消費者模式首選資料模式為佇列。Java提供的執行緒安全的佇列可分為阻塞佇列(BlockingQueue)和非阻塞佇列(ConcurrentLinkedQueue).根據需要選擇阻塞佇列還是非阻塞佇列。
- 執行緒安全:指類內共享的全域性變數的訪問必須保證是不受多執行緒形式影響的。如果由於多執行緒的訪問(比如修改、遍歷、檢視)而使這些變數結構被破壞或者針對這些變數操作的原子性被破壞,則這個類就不是執行緒安全的。
阻塞佇列: BlockingQueue
ArrayBlockingQueue: 規定大小
LinkedBlockingQueue: 大小可規定,不規定為Interger.MAX_VALUE來決定。
PriorityBlockingQueue: 更加優先順序來決定出佇列順序
SynchronousQueue: 無快取的等待佇列。
DelayQueue: 延遲佇列
方法:
操作 | 失敗 | 丟擲異常 | 特殊值false | 阻塞 超時 |
---|---|---|---|---|
插入 | Add(e) | Offer(e) | Put(e) | Offer(e,time, TimeUnit) |
移除 | Remove() | Poll() | Take() | Poll(time, TimeUnit) |
檢查 | element | Peek() |
雙端阻塞佇列:BlockingDeque
該介面是一種雙端佇列,向其中加入元素或取出元素都是現成安全的,如果不能對雙端佇列進行加入或刪除元素,則會阻塞執行緒。
非阻塞佇列: ConcurrentHashMap
多執行緒:
宣告執行緒的方法:
- 繼承Thread extends Thread();
- 實現介面Runnable implements Runnable(); 無返回值。 一般執行:new Thread(new
Runable()).start(); - implements Callable (有返回值); 執行 ,Callable.call()
非同步執行框架Executor框架:
支援多種不同型別的任務執行策略,提供了一種標準的方法將任務的提交過程和執行過程解耦開發,基於生產者-消費者模式,其提交任務的執行緒相當於生產者,執行任務的執行緒相當於消費者,並用Runnable來表示任務,Executor的實現還提供了對生命週期的支援,以及統計資訊收集,應用程式管理機制和效能監視等機制。
Executor 子介面 ExecutorService
實現類:
AbstractExecutorService: 預設實現類
ScheduledExecutorService: 一個可定時排程任務的介面
ScheduledThreadPoolExecutor: ScheduledExecutorService實現類
ThreadPoolExecutor: 執行緒池,通過呼叫Executors中的靜態方法來建立執行緒池並返回一個ExecutorService物件。
Executor的生命週期:
- 執行 建立即執行;
- 關閉 shutdown 執行完已提交的任務關閉
- 終止 shutdownNow 強制終止所有執行中的任務,並不允許新增新的任務。
Executor 執行任務:
Executor.submit(new Runnable()); 或 Executor.execute(new Runnable())
Future f = Executor.submit(new Callable); 有返回值 f.get()
Executors
提供一系列靜態工廠方法用於建立各種執行緒池
//建立方法
//建立可重用且固定執行緒數的執行緒池。數量夠了再來則等待。
ExecutorService executor = Executors.newFixedThreadPool(int num)
//建立一個單執行緒的Executor. 如果異常結束則新建執行緒來執行後續任務
Executors.newSingleThreadExecutor()
//建立一個可延遲執行或定期執行的執行緒池
Executors.newScheduledThreadPool(int corePoolSize)
//建立可快取的執行緒池。六十秒沒用則清除。
Executors.newCachedThreadPool()
示例:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Service;
import com.thunisoft.artery.report.crs.engine.callback.imp.Callback;
import com.thunisoft.artery.services.config.ArteryConfigUtil;
@Service
public class RequestThreadHandle {
private static int queue_size;
private static int EXE_SIZE;
private static BlockingQueue<ThreadBean> queue;
private static ThreadPoolExecutor executorService;
//初始化
@PostConstruct
public void init() {
int queue_size = 200;
queue = new LinkedBlockingQueue<ThreadBean>(queue_size);
orc_size = 10;
executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(orc_size);
}
/**
* 新增到佇列中
* @param httpBean
* @return
* @throws InterruptedException
*/
public boolean offerQuery(ThreadBean httpBean) throws InterruptedException {
return queue.offer(httpBean);
}
@PostConstruct
public void consume() throws InterruptedException, ExecutionException{
Thread t = new Thread(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
while (true) {
ThreadBean httpBean = queue.poll();
if (httpBean == null) {
continue;
}
executorService.submit(httpBean);
}
}
}
});
t.start();
}
//銷燬
@PreDestroy
public void CloseExe() {
executorService.shutdown();
}
}
public class ThreadBean extends Thread{
@Override
public void run() {
//do your thing
}
}