1. 程式人生 > >聊聊併發(十)生產者消費者模式

聊聊併發(十)生產者消費者模式

本文首發於InfoQ   作者:方騰飛  校對:張龍

在併發程式設計中使用生產者和消費者模式能夠解決絕大多數併發問題。該模式通過平衡生產執行緒和消費執行緒的工作能力來提高程式的整體處理資料的速度。

為什麼要使用生產者和消費者模式

線上程世界裡,生產者就是生產資料的執行緒,消費者就是消費資料的執行緒。在多執行緒開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產資料。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為了解決這種生產消費能力不均衡的問題,所以便有了生產者和消費者模式。

什麼是生產者消費者模式

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊,所以生產者生產完資料之後不用等待消費者處理,直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從阻塞佇列裡取,阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。

這個阻塞佇列就是用來給生產者和消費者解耦的。縱觀大多數設計模式,都會找一個第三者出來進行解耦,如工廠模式的第三者是工廠類,模板模式的第三者是模板類。在學習一些設計模式的過程中,如果先找到這個模式的第三者,能幫助我們快速熟悉一個設計模式。

生產者消費者模式實戰

我和同事一起利用業餘時間開發的Yuna工具中使用了生產者和消費者模式。首先我先介紹下Yuna工具,在阿里巴巴很多同事都喜歡通過郵件分享技術文章,因為通過郵件分享很方便,同學們在網上看到好的技術文章,複製貼上傳送就完成了分享,但是我們發現技術文章不能沉澱下來,對於新來的同學看不到以前分享的技術文章,大家也很難找到以前分享過的技術文章。為了解決這問題,我們開發了Yuna工具。Yuna取名自我非常喜歡的一款RPG遊戲”最終幻想”中女主角的名字。

首先我們申請了一個專門用來收集分享郵件的郵箱,比如[email protected],同學將分享的文章傳送到這個郵箱,讓同學們每次都抄送到這個郵箱肯定很麻煩,所以我們的做法是將這個郵箱地址放在部門郵件列表裡,所以分享的同學只需要象以前一樣向整個部門分享文章就行,Yuna工具通過讀取郵件伺服器裡該郵箱的郵件,把所有分享的郵件下載下來,包括郵件的附件,圖片,和郵件回覆,我們可能會從這個郵箱裡下載到一些非分享的文章,所以我們要求分享的郵件標題必須帶有一個關鍵字,比如[內貿技術分享],下載完郵件之後,通過confluence的web service介面,把文章插入到confluence裡,這樣新同事就可以在confluence裡看以前分享過的文章,並且Yuna工具還可以自動把文章進行分類和歸檔。

為了快速上線該功能,當時我們花了三天業餘時間快速開發了Yuna1.0版本。在1.0版本中我並沒有使用生產者消費模式,而是使用單執行緒來處理,因為當時只需要處理我們一個部門的郵件,所以單執行緒明顯夠用,整個過程是序列執行的。在一個執行緒裡,程式先抽取全部的郵件,轉化為文章物件,然後新增全部的文章,最後刪除抽取過的郵件。程式碼如下:

public void extract() {
        logger.debug("開始" + getExtractorName() + "。。");
        //抽取郵件
        List<Article> articles = extractEmail();
        //新增文章
        for (Article article : articles) {
            addArticleOrComment(article);
        }
        //清空郵件
        cleanEmail();
        logger.debug("完成" + getExtractorName() + "。。");
    }

Yuna工具在推廣後,越來越多的部門使用這個工具,處理的時間越來越慢,Yuna是每隔5分鐘進行一次抽取的,而當郵件多的時候一次處理可能就花了幾分鐘,於是我在Yuna2.0版本里使用了生產者消費者模式來處理郵件,首先生產者執行緒按一定的規則去郵件系統裡抽取郵件,然後存放在阻塞佇列裡,消費者從阻塞佇列裡取出文章後插入到conflunce裡。程式碼如下:

public class QuickEmailToWikiExtractor extends AbstractExtractor {

private ThreadPoolExecutor      threadsPool;

private ArticleBlockingQueue<ExchangeEmailShallowDTO> emailQueue;

public QuickEmailToWikiExtractor() {
        emailQueue= new ArticleBlockingQueue<ExchangeEmailShallowDTO>();
        int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
        threadsPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, 10l, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2000));

    }

public void extract() {
        logger.debug("開始" + getExtractorName() + "。。");
        long start = System.currentTimeMillis();

        //抽取所有郵件放到佇列裡
        new ExtractEmailTask().start();

        // 把佇列裡的文章插入到Wiki
        insertToWiki();

        long end = System.currentTimeMillis();
        double cost = (end - start) / 1000;
        logger.debug("完成" + getExtractorName() + ",花費時間:" + cost + "秒");
    }

    /**
     * 把佇列裡的文章插入到Wiki
     */
    private void insertToWiki() {
        //登入wiki,每間隔一段時間需要登入一次
        confluenceService.login(RuleFactory.USER_NAME, RuleFactory.PASSWORD);

        while (true) {
            //2秒內取不到就退出
            ExchangeEmailShallowDTO email = emailQueue.poll(2, TimeUnit.SECONDS);
            if (email == null) {
                break;
            }
            threadsPool.submit(new insertToWikiTask(email));
        }
    }

     protected List<Article> extractEmail() {
        List<ExchangeEmailShallowDTO> allEmails = getEmailService().queryAllEmails();
        if (allEmails == null) {
            return null;
        }
        for (ExchangeEmailShallowDTO exchangeEmailShallowDTO : allEmails) {
            emailQueue.offer(exchangeEmailShallowDTO);
        }
        return null;
    }

    /**
     * 抽取郵件任務
     *
     * @author tengfei.fangtf
     */
    public class ExtractEmailTask extends Thread {
        public void run() {
            extractEmail();
        }
    }
}

使用了生產者和消費者模式後,郵件的整體處理速度比以前要快了很多。

多生產者和多消費者場景

在多核時代,多執行緒併發處理速度比單執行緒處理速度更快,所以我們可以使用多個執行緒來生產資料,同樣可以使用多個消費執行緒來消費資料。而更復雜的情況是,消費者消費的資料,有可能需要繼續處理,於是消費者處理完資料之後,它又要作為生產者把資料放在新的佇列裡,交給其他消費者繼續處理。如下圖:

生產者消費者模式

我們在一個長連線伺服器中使用了這種模式,生產者1負責將所有客戶端傳送的訊息存放在阻塞佇列1裡,消費者1從佇列裡讀訊息,然後通過訊息ID進行hash得到N個佇列中的一個,然後根據編號將訊息存放在到不同的佇列裡,每個阻塞佇列會分配一個執行緒來消費阻塞佇列裡的資料。如果消費者2無法消費訊息,就將訊息再拋回到阻塞佇列1中,交給其他消費者處理。

以下是訊息總佇列的程式碼;

/**
 * 總訊息佇列管理
 *
 * @author tengfei.fangtf
 */
public class MsgQueueManager implements IMsgQueue{

    private static final Logger              LOGGER
 = LoggerFactory.getLogger(MsgQueueManager.class);

    /**
     * 訊息總佇列
     */
    public final BlockingQueue<Message> messageQueue;

    private MsgQueueManager() {
        messageQueue = new LinkedTransferQueue<Message>();
    }

    public void put(Message msg) {
        try {
            messageQueue.put(msg);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public Message take() {
        try {
            return messageQueue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return null;
    }

}

啟動一個訊息分發執行緒。在這個執行緒裡子佇列自動去總佇列裡獲取訊息。

/**
     * 分發訊息,負責把訊息從大佇列塞到小佇列裡
     *
     * @author tengfei.fangtf
     */
    static class DispatchMessageTask implements Runnable {
        @Override
        public void run() {
            BlockingQueue<Message> subQueue;
            for (;;) {
                //如果沒有資料,則阻塞在這裡
                Message msg = MsgQueueFactory.getMessageQueue().take();
                //如果為空,則表示沒有Session機器連線上來,
需要等待,直到有Session機器連線上來
                while ((subQueue = getInstance().getSubQueue()) == null) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                //把訊息放到小佇列裡
                try {
                    subQueue.put(msg);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

使用Hash演算法獲取一個子佇列。

/**
     * 均衡獲取一個子佇列。
     *
     * @return
     */
    public BlockingQueue<Message> getSubQueue() {
        int errorCount = 0;
        for (;;) {
            if (subMsgQueues.isEmpty()) {
                return null;
            }
            int index = (int) (System.nanoTime() % subMsgQueues.size());
            try {
                return subMsgQueues.get(index);
            } catch (Exception e) {
                //出現錯誤表示,在獲取佇列大小之後,佇列進行了一次刪除操作
                LOGGER.error("獲取子隊列出現錯誤", e);
                if ((++errorCount) < 3) {
                    continue;
                }
            }
        }
    }

使用的時候我們只需要往總佇列裡發訊息。

//往訊息佇列裡新增一條訊息
        IMsgQueue messageQueue = MsgQueueFactory.getMessageQueue();
        Packet msg = Packet.createPacket(Packet64FrameType.
TYPE_DATA, "{}".getBytes(), (short) 1);
        messageQueue.put(msg);

執行緒池與生產消費者模式

Java中的執行緒池類其實就是一種生產者和消費者模式的實現方式,但是我覺得其實現方式更加高明。生產者把任務丟給執行緒池,執行緒池建立執行緒並處理任務,如果將要執行的任務數大於執行緒池的基本執行緒數就把任務扔到阻塞佇列裡,這種做法比只使用一個阻塞佇列來實現生產者和消費者模式顯然要高明很多,因為消費者能夠處理直接就處理掉了,這樣速度更快,而生產者先存,消費者再取這種方式顯然慢一些。

我們的系統也可以使用執行緒池來實現多生產者消費者模式。比如建立N個不同規模的Java執行緒池來處理不同性質的任務,比如執行緒池1將資料讀到記憶體之後,交給執行緒池2裡的執行緒繼續處理壓縮資料。執行緒池1主要處理IO密集型任務,執行緒池2主要處理CPU密集型任務。

小結

本章講解了生產者消費者模式,並給出了例項。讀者可以在平時的工作中思考下哪些場景可以使用生產者消費者模式,我相信這種場景應該非常之多,特別是需要處理任務時間比較長的場景,比如上傳附件並處理,使用者把檔案上傳到系統後,系統把檔案丟到佇列裡,然後立刻返回告訴使用者上傳成功,最後消費者再去佇列裡取出檔案處理。比如呼叫一個遠端介面查詢資料,如果遠端服務介面查詢時需要幾十秒的時間,那麼它可以提供一個申請查詢的介面,這個介面把要申請查詢任務放資料庫中,然後該介面立刻返回。然後伺服器端用執行緒輪詢並獲取申請任務進行處理,處理完之後發訊息給呼叫方,讓呼叫方再來呼叫另外一個介面拿資料。


方 騰飛

花名清英,併發網(ifeve.com)創始人,暢銷書《Java併發程式設計的藝術》作者,螞蟻金服技術專家。目前工作於支付寶微貸事業部,關注網際網路金融,併發程式設計和敏捷實踐。微信公眾號aliqinying。