多執行緒設計模式——Producer-Consumer生產者消費者模式
這些都是根據我最近看的《Java實戰指南多執行緒程式設計(設計模式篇)》所得整理。
模式名稱
Producer-Consumer生產者消費者模式
模式面對的問題
有的執行緒的結果是另外一些執行緒的原料,也就是說,一些執行緒是生產者,另外一些執行緒是消費者,消費者需要生產者生產的東西才能正常執行,協調兩者的關係成了一個大的問題。
解決思路
有一箇中間的儲存位置,用來儲存生產者生產出來的東西,稱之為通道。
Producer生產者
Product生產者所生產的任務
Channel通道的抽象
BlockingQueueChannel基於阻塞佇列的Channel實現
Consumer消費者
例子程式碼
某內容管理系統需要支援對文件附件中的檔案進行全文檢索。改系統中,附件會被上傳到專用的檔案伺服器上,對附件進行全文檢索的功能模組也是部署在檔案伺服器上的。
模式主類
public class AttachmentProcessor {
private final String ATTACHMENT_STORE_BASE_DIR = "/home/viscent/tmp/attachments/" ;
//模式角色Producer-Consumer.Channer
priuvate final Channer<File> channer = new BlockingQueueChannel<File>(
new ArrayBlockingQueue<File>(200));
//模式角色Producer-Consumer.Consumer
private final AbstractTerminatableThread indexingThread = new
AbstractTerminatableThread(){
@Override
protected void doRun()throws Exception{
File file =null;
file = channel.take();
try{
indexFile(file);
}catch(Exception e){
e.printStackTrace();
}finally{
terminationToken.reservations.decrementAndGet();
}
}
//根據制定檔案生成全文搜尋所需的索引檔案
private void indexFile(File file) throws Exception{
//省略與模式無關的程式碼
//模擬生成索引檔案的時間消耗
Radom rnd =new Random();
try{
Thread.sleep(rnd.nextInt(100));
}catch(InterruptedException e){
;
}
}
};
public void init(){
indexingThread.start();
}
public void shutdown(){
indexingThread.terminate();
}
public void saveAttachment(InputStream in,
String documentId,String originalFileName)throws IOException{
File file = saveAsFile(in,documentId,originalFileName);
try{
channel.put(file);
}catch(InterruptedException e){
;
}
indexingTread.terminationToken.reservations.incrementAndGet();
}
private FTPClient initFTPClient(String ftpServer,String userName,
String password) throws Exception{
FTPClient ftpClient = new FTPClient();
FTOClientConfig config = new FTPClientConfig();
ftpClient.config(config);
int reply;
ftpClient.connect(ftpServer);
System.out.print(ftpClient.getReplyString());
reply = ftpClient.getReplyCode();
if(!dirName.equals(file.getCanoicalFile().getParent())){
throw new SecurityException("Invalid originalFileName:"+originalFileName);
}
BufferedOutputStream bos =null;
BufferedInputStream bis = new BufferedInputStream(in);
byte[]buf = new byte[2048];
int len = -1;
try{
bos =new BufferedOutputStream(new FileOutputSteram(file));
while((len = bis.read(buf) > 0)){
bos.write(buf,0,len);
}
bos.flush();
}finally{
try{
bis.close();
}catch(IOException e){
;
}
try{
if(null != bos){
bos.close();
}
}catch(IOException e){
;
}
}
ftpClient.setFileType(FTP.ASCII_FILE_TYPE);
return ftpClient;
}
}
Channel介面
public interface Channel<P> {
//從通道中取一個"產品"。
P take() throws InterruptedException;
//往通道里面儲存一個"產品"。
void put(P product) throws InterruptedException;
}
BlockingQueueChannel類
public class BlockingQueueChannel<P> implements Channel<P> {
private final BlockingQueue<P>queue;
public BlockingQueueChannel(BlockingQueue<P>queue){
this.queue = queue;
}
@Override
public P take() throws InterruptedException{
return queue.take();
}
@Override
public void put(P product) throws InterruptedException{
queue.put(product);
}
}
模式的評價與實現考量
生產這消費者模式是一個經典的執行緒模式嗎,但是它也有一些容易出現的問題:
1. 管道積壓:生產者消費者模式中消費者的處理能力往往低於生產這的處理能力,會出現管道擠壓的現象。處理這種現象,有集中方法:使用有界阻塞佇列,佇列到一定數量就不在生產,等待消費;使用有流量控制的無界阻塞佇列,線上程的時間分配時對生產者的時間進行限制來平衡。
2. 工作竊取演算法:如果是多個消費者從管道中取得產品,會出現執行緒安全的問題,所以會有一個通道例項對應多個佇列例項來處理。
3. 執行緒的停止:整個模式也可以看做一個執行緒,這個執行緒的停止會比一般的執行緒要複雜一些,需要注意處理。
4. 高效能高可靠性:這裡的示例程式碼是一個比較一般的實現,如果有較高的要求,可以考慮Producer-Consumer模式實現庫LMAX Disruptor:https://github.com/LMAX-Exchange/disruptor