1. 程式人生 > >裝上小馬達的程式設計師的專欄

裝上小馬達的程式設計師的專欄

本章主要記錄近期學習執行緒池的一些知識心得體會,記錄一個通讀原始碼的過程。

一、執行緒池的目的:

在面向物件程式設計中,建立和銷燬物件是很費時間的,因為建立一個物件要獲取記憶體資源或者其它更多資源。在Java中更是如此,虛擬機器將試圖跟蹤每一個物件,以便能夠在物件銷燬後進行垃圾回收。所以提高服務程式效率的一個手段就是儘可能減少建立和銷燬物件的次數,特別是一些很耗資源的物件建立和銷燬。如何利用已有物件來服務就是一個需要解決的關鍵問題,其實這就是一些"池化資源"技術產生的原因。

JVM會為每一個執行緒分配相應的Heap空間,虛擬機器中用-Xss:設定每個執行緒的堆疊大小(也就是說,在相同實體記憶體下,減小這個值能生成更多的執行緒)。當執行緒數量超過空間的限制時會丟擲OutOfMemory異常(OOM)。

、執行緒池的組成部分:

1、執行緒池管理器(ThreadPoolManager):用於建立並管理執行緒池;

2、工作執行緒(WorkThread):執行緒池中的執行緒;

3、任務介面(Task):每個任務必須實現的介面,以供工作執行緒排程任務的執行;

4、任務佇列(Queue):用於存放沒有處理的任務,提供一種快取機制。


三、執行緒池實現過程分析:

示例一類常用的執行緒池:ThreadPoolExecutor(Spring可以通過ThreadPoolExecutorFactoryBean載入ThreadPoolExecutor,引數可通過Bean注入)

ThreadPoolExecutor主要引數包括:corePoolSize、 maximumPoolSize、keepAliveTime、Queue、ThreadFactory、RejectedExecutionHandler逐一對分析相關引數。

1、corePoolSize:執行緒池將new出的核心執行緒數。核心執行緒在處理完firstTask的之後,被執行緒池不會回收其資源,而是處於阻塞狀態,等待從任務佇列(Queue)中獲取下一個Task如此迴圈執行(但如果設定有allowCoreThreadTimeOut=true,則如果核心執行緒在等待keepAliveTime時間內沒有從佇列中獲取到新任務,也會被執行緒池回收)。相關原始碼如下:

            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            boolean timed;      // Are workers subject to culling?

            for (;;) {
                int wc = workerCountOf(c);
                timed = allowCoreThreadTimeOut || wc > corePoolSize;

                if (wc <= maximumPoolSize && ! (timedOut && timed))
                    break;
                if (compareAndDecrementWorkerCount(c))
                    return null;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
2、maximumPoolSize:執行緒池支援的最大並行執行緒數。當有新的任務請求進來時,先判斷當前keepAlive的執行緒數是否小於corePoolSize,若是,則將當前任務作為corePoolSize的firstTask啟動。若大於corePoolSize小於maximumPoolSize,則嘗試將任務加入到阻塞佇列中,如果佇列已滿,則嘗試直接新建一個裝載該任務的輔助執行緒,新建失敗則reject該請求;如果加入佇列成功,則新建一個輔助執行緒,該執行緒去阻塞佇列中獲取任務。相關原始碼如下:
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
3、keepAliveTime:輔助執行緒在執行完上一個任務之後保持活性等待下一個Task的時間,一般帶有TimeUnit單位戳。

4、BlockingQueue:存放任務的阻塞佇列,一般沒有特殊要求只用兩種,LinkedBlockingQueue 連結串列阻塞佇列與SynchronousQueue 同步阻塞佇列。當期望的佇列深度為0時,則使用後者,否則為前者指定一個佇列大小(預設為Integer.MAX_VALUE作為queueCapacity)作為儲存佇列,除coreThread的firstTask、任務加入佇列失敗這兩種情況,其他的執行緒均從佇列中獲取任務(先進先出),新新增的任務如果不能被立馬執行,也會被加入到佇列中。其中對同步阻塞佇列的解釋如下:

 /**
* A {@linkplain BlockingQueue blocking queue} in which each insert
 * operation must wait for a corresponding remove operation by another
 * thread, and vice versa.  A synchronous queue does not have any
 * internal capacity, not even a capacity of one.  You cannot
 * <tt>peek</tt> at a synchronous queue because an element is only
 * present when you try to remove it; you cannot insert an element
 * (using any method) unless another thread is trying to remove it;
 * you cannot iterate as there is nothing to iterate.  The
 * <em>head</em> of the queue is the element that the first queued
 * inserting thread is trying to add to the queue; if there is no such
 * queued thread then no element is available for removal and
 * <tt>poll()</tt> will return <tt>null</tt>.  For purposes of other
 * <tt>Collection</tt> methods (for example <tt>contains</tt>), a
 * <tt>SynchronousQueue</tt> acts as an empty collection.  This queue
 * does not permit <tt>null</tt> elements.
*/
5、ThreadFactory:執行緒工廠,所有的執行緒都從執行緒工廠中產出,由執行緒工廠同一分配建立,並統一分組、命名規範等。預設的執行緒工廠程式碼如下:
    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
6、RejectedExecutionHandler:當任務的請求由於執行緒池的限制原因無法新增執行緒處理或進入佇列時,執行的拒絕處理操作。執行緒池預設的處理操作原始碼:
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always.
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

四、執行緒池的底層實現原理(對狀態、執行緒池大小的控制)

1、執行緒池的大小及狀態用一個AtomicInteger變數進行控制:

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }
用移位運算及按位與、按位或對執行緒數、執行緒池狀態等進行控制

2、執行緒池包括5種狀態:Running、ShutDown、Stop、Tidying、Terminated

RUNNING:  Accept new tasks and process queued tasks

SHUTDOWN: Don't accept new tasks, but process queued tasks

STOP:     Don't accept new tasks, don't process queued tasks,and interrupt in-progress tasks

TIDYING:  All tasks have terminated, workerCount is zero,the thread transitioning to state TIDYING will run the terminated() hook method

TERMINATED: terminated() has completed

3、執行緒池鎖 ReentrantLock mainLock = new ReentrantLock();

五、個人總結

線上程池的學習過程中,在瞭解原始碼的同時,也學到了相當多的優化程式設計小技巧,如:

if (isRunning(c) && workQueue.offer(command))

在判斷中執行邏輯。通讀原始碼的收益還是很可觀的。

相關推薦

馬達程式設計師專欄

本章主要記錄近期學習執行緒池的一些知識心得體會,記錄一個通讀原始碼的過程。 一、執行緒池的目的: 在面向物件程式設計中,建立和銷燬物件是很費時間的,因為建立一個物件要獲取記憶體資源或者其它更多資源。在Java中更是如此,虛擬機器將試圖跟蹤每一個物件,以便能夠在物件銷燬後進行

#Java又和Go語言槓了!程式設計師:10年了!擡走,下一個

隨著IT行業越來越火,有很多小夥伴也就入了程式設計的坑。IT行業就業前景好使毋庸置疑的,從事IT行業的人,就算是跳槽再就業也是不會有什麼難度的,與其他行業相比,IT行業只要有技術,其他的就都微不足道了。 如果有想學習java的程式設計師,可來我們的java學習扣qun:72340,3928免

Java又和Go語言槓了!程式設計師:20年了,擡走,下一個!

隨著IT行業越來越火,有很多小夥伴也就入了程式設計的坑。IT行業就業前景好使毋庸置疑的,從事IT行業的人,就算是跳槽再就業也是不會有什麼難度的,與其他行業相比,IT行業只要有技術,其他的就都微不足道了。 最近看到一篇帖子,有網友說,服務端未來是go的,說go不好的都是看了點go語法的,覺得

最良心程式設計師,在程式碼註釋裡,告訴這家公司有多坑

好的程式碼本身就是最好的說明文件。為了追求這一目標,除了寫出優雅的程式碼外,給程式碼們添加註釋,也是優秀程式設計師的一個好習慣。由於註釋不會影響程式本身,因此被程式設計師們玩壞了。   有人用來娛樂,有人用來吐槽,有人用來調侃。程式設計師壓力大,需要一個地方發洩,可又不能因此

苦逼三流公司程式設計師這半年找工作經歷(3)——選擇offer

本文按照企業規模、性質、規模,分成三類,點評一下遇到的這些公司。也算是為半年找工作經歷的一個總結。 1,三流小公司     公司規模類似於筆者跳槽前所在企業,性質有外商獨資、合資,當然大多數都是民營企業,規模一般20-200人左右。這類企業雖然規模最小,但數量卻是最多的,遍

如何看待又一位深圳 24 歲程式設計師倒在工位?是否程式設計師下班晚、加班多成了常態?

這是職業的焦慮還是環境如此? 如何避免過度加班帶來的身體損傷? 8月25日下午,深圳紅孩兒資訊科技有限公司的程式設計師程某倒在了自己的崗位上。據同事反應,他當天的臉色不是很好,當時正在寫程式碼,起身倒水時暈倒在地不省人事。 程某經常加班到凌晨,有時甚至到早上五六點鐘,第

談談城市程式設計師的迷茫和堅持

看到此文標題,我要說些什麼,想必你大概可以知道一些,——小城市(二三線的城市)IT業發展比較薄弱,有些城市可能連真正做IT(軟體開發相關)的比較大的公司都沒有幾家,像我現在所在的城市襄陽(原名:襄樊),就只有那屈指可數的幾家公司;公司少,選擇和發展的空間相對也小,——這也造

程式設計師的自我修煉

作為一個新晉的小白,思考一個爭議性的問題,是先有思想還是有語言。1、語言 java、C、C#、C++每一個語言都能實踐自己定義的功能,但是這個語言開發的效率因素決定整個專案開發的速度,但我們敲下一整段程式碼和一個個功能的時候,還是引用一個個庫的功能的時候,我們就是會思考是什

五年從程式設計師到架構師!這是我見過史最好的程式設計師職業規劃

作者:大齊老師 連結:https://www.jianshu.com/p/4afeba725b2e 來源:簡書 第一部分: 對於參加工作一年以內的同學。恭喜你,這個時候,你已經擁有了一份Java的工作。這個階段是你成長極快的階段,而且你可能會經常加班。但是加班不代

公司程式設計師怎麼進大公司--容易的路越走越難走

轉載:http://blog.csdn.net/foruok/article/details/74908128 進了小公司的應屆程式設計師如何翻身進入大公司——知乎上的一個問題,有近 4700 人關注,130 多萬次瀏覽,我的回答有 125 人贊同。看來這

程式設計師的幾點建議

接納自己是一張白紙這個事實        我覺得這是一個首要的前提。也許你很優秀,有很強的學習能力,有強大的信念,有超強的小宇宙,有百折不撓的韌性……但是,你沒做過,你確實是一張白 紙。這是一個客觀事實,我們必須要要承認。我們所做的一切,都是在接納現

公司程式設計師怎麼進大公司

進了小公司的應屆程式設計師如何翻身進入大公司——知乎上的一個問題,有近 4700 人關注,130 多萬次瀏覽,我的回答有 125 人贊同。看來這是很多在小公司顛簸流離多年感到疲憊的開發者都會關注的問題。 問題描述如下: 都知道大家說畢業要去大公司,但總有不小心或實力不濟,進了小公司的應屆程式設計師。請問

60 多年前,一群程式設計師扒了一個俄羅斯的開源框架--白都能看懂的作業系統Communix的歷史(轉)

該系統程序按許可權等級分為五類,Core process(核心程序), Privilege process(特權程序), Monitor process(監控程序), Normal process(普通程序)和Vulnerable process(弱勢程序). Core process擁有至高無上的權利,控

如果你喜歡了一個程式設計師小夥,獻給所有的程式設計師女友

如果你喜歡上了一個程式設計師小夥,就不用再害怕電腦中病毒QQ被盜,因為他至少嘗試過10款以上防毒軟體,他知道用那款佔資源最少防毒效果最好的保護你的電腦。 如果你喜歡上了一個程式設計師小夥,就不用擔心他會外遇,因為他工作內容深奧無邊,每天沉醉於業務研究之中的他,沒有應酬,更沒有酒吧KTV,在他眼裡那唯一誘惑,

最全程式設計師資源分享

原文連結:http://blog.geekidentity.com/others/share/ 在這裡分享一些不錯的學習資料 以下資料只能用於學習用,東西太多了我會在部落格中不斷進行更新。 基礎知識類 天津大學 高等數學(上下冊)_蔡高廳 這個是一個公開課,

公司程式設計師的程式設計水平與BAT大廠相比,有多大差距?

BAT的牛人多,普通人也多,雖然他們不是每個人都能達到令人仰望的技術水平,但畢竟平臺高,所以眼光會變得寬闊;程式碼要求更為嚴格,所

程式設計師地鐵寫程式碼被網友抓拍,程式設計師:我這逼的6不6?

一提起程式設計師,很多人就會聯想到寫程式碼。有業內人士戲言,程式設計師忙起來,不分場合,不分時間地點,拿起電腦就開始寫程式碼,這才是真正的程式設計師風範。這不,一名在地鐵上寫程式碼的程式設計師小哥就被網友抓拍並將其照片釋出到了網路上,引起了圍觀網友的討論與熱議。 從圖片上看,這名程式設計師

王思聰吃熱狗又被程式設計師了,各種火爆遊戲都開發出來了!

近日 IG熱逐漸消退之餘 吃瓜網友盯著齣戲的王校長不放 #王思聰吃熱狗#的熱搜在微博居高不下   如今 這一張熱狗圖 已成為網友們的快樂源泉 大家冒著生命危險,也要抓住這次調戲的機會   網上迅速發酵,出現了各種頭像表情包

程式設計師在高鐵寫程式碼被人抓拍,網友熱議:我這逼的6不6?

說起程式設計師,很多人腦海中飄過的第一印象便是:一名禿頭男子坐在電腦前,身體一動不動,只有手指在鍵盤間飛快的揮舞,對著電腦全神貫注,再看看其頭頂,不是稀稀拉拉幾根頭髮就是禿頂。確實,程式設計師的工作是一個燒腦的行業,加班也比一般行業要多很多,遇到釋出版本的時候,通宵也是常事。 有一位網

“細思極逗”的段子:程式設計師哥桌子的按鈕,是不能隨便按的!

七組非常搞笑的圖片,個個都是逗比段子!     同樣都是瀏覽器,相互之間的差別怎麼這麼大呢! IE你能不能爭口氣啊,你可是老大哥啊!     剛才網咖沒機子了,我把一個小學生的機子給下了,我強行上機,就是欺負小學生!怎麼