深入jdk——追蹤Collections.sort 引發的bug(3)TimSort原始碼解讀
阿新 • • 發佈:2019-02-19
本來準備看Java容器原始碼的。但是看到一開始發現Arrays這個類我不是很熟,就順便把Arrays這個類給看了。Arrays類沒有什麼架構與難點,但Arrays涉及到的兩個排序演算法似乎很有意思。那順便把TimSort演算法和雙指標快速排序也研究一下吧。
首先強調一下,這是個穩定的排序演算法
看過程式碼之後覺得這個演算法沒有想象的那麼難。邏輯很清晰,整個演算法最大的特點就是充分利用陣列中已經存在順序。在歸併的過程中有一個 Galloping Mode(翻譯過來可以叫 飛奔模式),這是整個排序演算法中最不尋常的地方。簡單的理解就是,歸併過程中有兩個數列,比較的時候,有個數列連續有{MIN_GALLOP}
MIN_GALLOP
還是一個可以動態調整的值,這應該是統計優化的結果。除了演算法本身的魅力,作者的程式碼寫的很簡潔。讀起來很享受。大家有興趣可以自己讀一遍,我在下面貼出我看程式碼過程中的註釋。對邏輯所有的解釋都在註釋中。閱讀的方法是從static <T> void sort(T[] a, Comparator<? super T> c)
與 static <T> void sort(T[] a, int lo, int hi, Comparator<? super T> c)
除了上面提到的Galloping Mode,還有原始碼中還有一個概念叫作 run, 可以把它理解為一段已經排好序的數列。java的原始碼在java安裝路徑下的src.zip
檔案內,不需要要去網上下載
import java.util.Arrays; import java.util.Comparator; /** * Created by yxf on 16-5-30. * 這裡對TimSort演算法在java中的實現做了註釋,部分實現邏輯相似的註釋沒有處理,直接是原來的註釋。 * */ class TimSort<T> { /** * 參與序列合併的最短長度。比這個更短的序列將會通過二叉插入排序加長。如果整個陣列都比這個短,那就不會經過歸併排序。 * <p/> * 這個常量的值必須2的冪。Tim Perter 在C語言中的實現版本使用了64,但是根據經驗這裡的版本使用32更合適。在最壞的情況下,使用了非2的冪賦值,就必須要重寫 {@link # minRunLength}這個方法。 * 如果減小了這個值,就需要在構造方法中減小stackLen的值,不然將面臨陣列越界的風險。 */ private static final int MIN_MERGE = 32; /** * 將要被排序的陣列 */ private final T[] a; /** * 這次排序的比較器 */ private final Comparator<? super T> c; /** * 判斷資料順序連續性的閾值 * 後面結合程式碼看,會容易理解一點 */ private static final int MIN_GALLOP = 7; private int minGallop = MIN_GALLOP; /** * 歸併排序中臨時陣列的最大長度,陣列的長度也可以根據需求增長。 * 與C語言中的實現方式不同,對於相對較小的陣列,我們不用這麼大的臨時陣列。這點改變對效能有顯著的影響 */ private static final int INITIAL_TMP_STORAGE_LENGTH = 256; /** * 臨時陣列,根據泛型的內容可知,實際的儲存要用Object[],不能用T[] */ private T[] tmp; /** * 棧中待歸併的run的數量。一個run i的範圍從runBase[i]開始,一直延續到runLen[i]。 * 下面這個根據前一個run的結尾總是下一個run的開頭。 * 所以下面的等式總是成立: * runBase[i] + runLen[i] == runBase[i+1]; **/ private int stackSize = 0; //棧中run的數量 private final int[] runBase; private final int[] runLen; /** * 這個構造方法是私有的所以只能在類內部建立。 * 建立這個例項是為了儲存一次排序過程中的狀態變數。 */ private TimSort(T[] a, Comparator<? super T> c) { this.a = a; this.c = c; // 這裡是分配臨時陣列的空間。SuppressWainings是為了消除泛型陣列轉型的警告 // 臨時陣列的長度寫的很精煉,不明白的自己熟悉一下java位操作。 // 結果就是 陣列長度的一半或者是INITIAL_TMP_STORAGE_LENGTH int len = a.length; @SuppressWarnings({"unchecked", "UnnecessaryLocalVariable"}) T[] newArray = (T[]) new Object[len < 2 * INITIAL_TMP_STORAGE_LENGTH ? len >>> 1 : INITIAL_TMP_STORAGE_LENGTH]; tmp = newArray; /** * 這裡是分配儲存run的棧的空間,它不能在執行時擴充套件。 * C語言版本中的棧一直使用固定值85,但這樣對一些中小陣列來說有些浪費資源。所以, * 這個版本我們使用了相對較小容量的棧。 * 在MIN_MERGE減小的時候,這些‘魔法數’可能面臨陣列越界的風險。 * */ int stackLen = (len < 120 ? 5 : len < 1542 ? 10 : len < 119151 ? 24 : 40); runBase = new int[stackLen]; runLen = new int[stackLen]; } static <T> void sort(T[] a, Comparator<? super T> c) { sort(a, 0, a.length, c); } static <T> void sort(T[] a, int lo, int hi, Comparator<? super T> c) { if (c == null) { Arrays.sort(a, lo, hi); return; } rangeCheck(a.length, lo, hi); int nRemaining = hi - lo; if (nRemaining < 2) return; // 長度是0或者1 就不需要排序了。 // 小於MIN_MERGE長度的陣列就不用歸併排序了,殺雞焉用宰牛刀 if (nRemaining < MIN_MERGE) { int initRunLen = countRunAndMakeAscending(a, lo, hi, c); binarySort(a, lo, hi, lo + initRunLen, c); return; } /** * March over the array once, left to right, finding natural runs, * extending short natural runs to minRun elements, and merging runs * to maintain stack invariant. * * 下面將進入演算法流程的主體,首先理解原始碼註釋中run的含義,可以理解為升序序列的意思。 * * 從左到右,遍歷一邊陣列。找出自然排好序的序列(natural run),把短的自然升序序列通過二叉查詢排序 * 擴充套件到minRun長度的升序序列。最後合併棧中的所有升序序列,保證規則不變。 */ TimSort<T> ts = new TimSort<>(a, c); //新建TimSort物件,儲存棧的狀態 int minRun = minRunLength(nRemaining); do { //跟二叉查詢插入排序一樣,先找自然升序序列 int runLen = countRunAndMakeAscending(a, lo, hi, c); // If run is short, extend to min(minRun, nRemaining) // 如果 自然升序的長度不夠minRun,就把 min(minRun,nRemaining)長度的範圍內的數列排好序 if (runLen < minRun) { int force = nRemaining <= minRun ? nRemaining : minRun; binarySort(a, lo, lo + force, lo + runLen, c); runLen = force; } // Push run onto pending-run stack, and maybe merge //把已經排好序的數列壓入棧中,檢查是不是需要合併 ts.pushRun(lo, runLen); ts.mergeCollapse(); //把指標後移runLen距離,準備開始下一輪片段的排序 lo += runLen; //剩下待排序的數量相應的減少 runLen nRemaining -= runLen; } while (nRemaining != 0); // Merge all remaining runs to complete sort assert lo == hi; ts.mergeForceCollapse(); assert ts.stackSize == 1; } /** * 被優化的二分插入排序 * * 使用二分插入排序演算法給指定一部分陣列排序。這是給小陣列排序的最佳方案。最差情況下 * 它需要 O(n log n) 次比較和 O(n^2)次資料移動。 * * 如果開始的部分資料是有序的那麼我們可以利用它們。這個方法預設陣列中的位置lo(包括在內)到 * start(不包括在內)的範圍內是已經排好序的。 * * @param a 被排序的陣列 * @param lo 待排序範圍內的首個元素的位置 * @param hi 待排序範圍內最後一個元素的後一個位置 * @param start 待排序範圍內的第一個沒有排好序的位置,確保 (lo <= start <= hi) * @param c 本次排序的比較器 */ @SuppressWarnings("fallthrough") private static <T> void binarySort(T[] a, int lo, int hi, int start, Comparator<? super T> c) { assert lo <= start && start <= hi; //如果start 從起點開始,做下預處理;也就是原本就是無序的。 if (start == lo) start++; //從start位置開始,對後面的所有元素排序 for (; start < hi; start++) { //pivot 代表正在參與排序的值, T pivot = a[start]; // Set left (and right) to the index where a[start] (pivot) belongs // 把pivot應當插入的設定的邊界設定為left和right int left = lo; int right = start; assert left <= right; /* * 保證的邏輯: * pivot >= all in [lo, left). * pivot < all in [right, start). */ while (left < right) { int mid = (left + right) >>> 1; if (c.compare(pivot, a[mid]) < 0) right = mid; else left = mid + 1; } assert left == right; /** * 此時,仍然能保證: * pivot >= [lo, left) && pivot < [left,start) * 所以,pivot的值應當在left所在的位置,然後需要把[left,start)範圍內的內容整體右移一位 * 騰出空間。如果pivot與區間中的某個值相等,left指正會指向重複的值的後一位, * 所以這裡的排序是穩定的。 */ int n = start - left; //需要移動的範圍的長度 // switch語句是一條小優化,1-2個元素的移動就不需要System.arraycopy了。 // (這程式碼寫的真是簡潔,switch原來可以這樣用) switch (n) { case 2: a[left + 2] = a[left + 1]; case 1: a[left + 1] = a[left]; break; default: System.arraycopy(a, left, a, left + 1, n); } //移動過之後,把pivot的值放到應該插入的位置,就是left的位置了 a[left] = pivot; } } /** * 這一段程式碼是TimSort演算法中的一個小優化,它利用了陣列中前面一段已有的順序。 * 如果是升序,直接返回統計結果;如果是降序,在返回之前,將這段數列倒置, * 以確保這斷序列從首個位置到此位置的序列都是升序的。 * 返回的結果是這種兩種形式的,lo是這段序列的開始位置。 * * A run is the longest ascending sequence with: * * a[lo] <= a[lo + 1] <= a[lo + 2] <= ... * * or the longest descending sequence with: * * a[lo] > a[lo + 1] > a[lo + 2] > ... * * 為了保證排序的穩定性,這裡要使用嚴格的降序,這樣才能保證相等的元素不參與倒置子序列的過程, * 保證它們原本的順序不被打亂。 * * @param a 參與排序的陣列 * @param lo run中首個元素的位置 * @param hi run中最後一個元素的後面一個位置,需要確保lo<hi * @param c 本次排序的比較器 * @return 從首個元素開始的最長升序子序列的結尾位置+1 or 嚴格的降序子序列的結尾位置+1。 */ private static <T> int countRunAndMakeAscending(T[] a, int lo, int hi, Comparator<? super T> c) { assert lo < hi; int runHi = lo + 1; if (runHi == hi) return 1; // 找出最長升序序的子序列,如果降序,倒置之 if (c.compare(a[runHi++], a[lo]) < 0) { // 前兩個元素是降序,就按照降序統計 while (runHi < hi && c.compare(a[runHi], a[runHi - 1]) < 0) runHi++; reverseRange(a, lo, runHi); } else { // 前兩個元素是升序,按照升序統計 while (runHi < hi && c.compare(a[runHi], a[runHi - 1]) >= 0) runHi++; } return runHi - lo; } /** * 倒置陣列中一段範圍的元素 * * @param a 指定陣列 * @param lo 這段範圍的起始位置 * @param hi 這段範圍的終點位置的後一個位置 */ private static void reverseRange(Object[] a, int lo, int hi) { hi--; while (lo < hi) { Object t = a[lo]; a[lo++] = a[hi]; a[hi--] = t; } } /** * 返回參與合併的最小長度,如果自然排序的長度,小於此長度,那麼就通過二分查詢排序擴充套件到 * 此長度。{@link #binarySort}. * * 粗略的講,計算結果是這樣的: * * 如果 n < MIN_MERGE, 直接返回 n。(太小了,不值得做複雜的操作); * 如果 n 正好是2的冪,返回 n / 2; * 其它情況下 返回一個數 k,滿足 MIN_MERGE/2 <= k <= MIN_MERGE, * 這樣結果就能保證 n/k 非常接近但小於一個2的冪。 * 這個數字實際上是一種空間與時間的優化。 * * @param n 參與排序的陣列的長度 * @return 參與歸併的最短長度 * 這段程式碼寫得也很贊 */ private static int minRunLength(int n) { assert n >= 0; int r = 0; // 只要不是 2的冪就會置 1 while (n >= MIN_MERGE) { r |= (n & 1); n >>= 1; } return n + r; } /** * Pushes the specified run onto the pending-run stack. * 將指定的升序序列壓入等待合併的棧中 * * @param runBase 升序序列的首個元素的位置 * @param runLen 升序序列的長度 */ private void pushRun(int runBase, int runLen) { this.runBase[stackSize] = runBase; this.runLen[stackSize] = runLen; stackSize++; } /** * 檢查棧中待歸併的升序序列,如果他們不滿足下列條件就把相鄰的兩個序列合併, * 直到他們滿足下面的條件 * * 1. runLen[i - 3] > runLen[i - 2] + runLen[i - 1] * 2. runLen[i - 2] > runLen[i - 1] * * 每次新增新序列到棧中的時候都會執行一次這個操作。所以棧中的需要滿足的條件 * 需要靠呼叫這個方法來維護。 * * 最差情況下,有點像玩2048。 */ private void mergeCollapse() { while (stackSize > 1) { int n = stackSize - 2; if (n > 0 && runLen[n - 1] <= runLen[n] + runLen[n + 1]) { if (runLen[n - 1] < runLen[n + 1]) n--; mergeAt(n); } else if (runLen[n] <= runLen[n + 1]) { mergeAt(n); } else { break; // Invariant is established } } } /** * 合併棧中所有待合併的序列,最後剩下一個序列。這個方法在整次排序中只執行一次 */ private void mergeForceCollapse() { while (stackSize > 1) { int n = stackSize - 2; if (n > 0 && runLen[n - 1] < runLen[n + 1]) n--; mergeAt(n); } } /** * 在一個序列中,將一個指定的key,從左往右查詢它應當插入的位置;如果序列中存在 * 與key相同的值(一個或者多個),那返回這些值中最左邊的位置。 * * 推斷: 統計概率的原因,隨機數字來說,兩個待合併的序列的尾假設是差不多大的,從尾開始 * 做查詢找到的概率高一些。仔細算一下,最差情況下,這種查詢也是 log(n),所以這裡沒有 * 用簡單的二分查詢。 * * @param key 準備插入的key * @param a 參與排序的陣列 * @param base 序列範圍的第一個元素的位置 * @param len 整個範圍的長度,一定有len > 0 * @param hint 開始查詢的位置,有0 <= hint <= len;越接近結果查詢越快 * @param c 排序,查詢使用的比較器 * @return 返回一個整數 k, 有 0 <= k <=n, 它滿足 a[b + k - 1] < a[b + k] * 就是說key應當被放在 a[base + k], * 有 a[base,base+k) < key && key <=a [base + k, base + len) */ private static <T> int gallopLeft(T key, T[] a, int base, int len, int hint, Comparator<? super T> c) { assert len > 0 && hint >= 0 && hint < len; int lastOfs = 0; int ofs = 1; if (c.compare(key, a[base + hint]) > 0) { // key > a[base+hint] // 遍歷右邊,直到 a[base+hint+lastOfs] < key <= a[base+hint+ofs] int maxOfs = len - hint; while (ofs < maxOfs && c.compare(key, a[base + hint + ofs]) > 0) { lastOfs = ofs; ofs = (ofs << 1) + 1; if (ofs <= 0) // int overflow ofs = maxOfs; } if (ofs > maxOfs) ofs = maxOfs; // 最終的ofs是這樣確定的,滿足條件 a[base+hint+lastOfs] < key <= a[base+hint+ofs] // 的一組 // ofs: 1 3 7 15 31 63 2^n-1 ... maxOfs // lastOfs: 0 1 3 7 15 31 2^(n-1)-1 < ofs // 因為目前的offset是相對hint的,所以做相對變換 lastOfs += hint; ofs += hint; } else { // key <= a[base + hint] // 遍歷左邊,直到[base+hint-ofs] < key <= a[base+hint-lastOfs] final int maxOfs = hint + 1; while (ofs < maxOfs && c.compare(key, a[base + hint - ofs]) <= 0) { lastOfs = ofs; ofs = (ofs << 1) + 1; if (ofs <= 0) // int overflow ofs = maxOfs; } if (ofs > maxOfs) ofs = maxOfs; // 確定ofs的過程與上面相同 // ofs: 1 3 7 15 31 63 2^n-1 ... maxOfs // lastOfs: 0 1 3 7 15 31 2^(n-1)-1 < ofs // Make offsets relative to base int tmp = lastOfs; lastOfs = hint - ofs; ofs = hint - tmp; } assert -1 <= lastOfs && lastOfs < ofs && ofs <= len; /* * 現在的情況是 a[base+lastOfs] < key <= a[base+ofs], 所以,key應當在lastOfs的 * 右邊,又不超過ofs。在base+lastOfs-1到 base+ofs範圍內做一次二叉查詢。 */ lastOfs++; while (lastOfs < ofs) { int m = lastOfs + ((ofs - lastOfs) >>> 1); if (c.compare(key, a[base + m]) > 0) lastOfs = m + 1; // a[base + m] < key else ofs = m; // key <= a[base + m] } assert lastOfs == ofs; // so a[base + ofs - 1] < key <= a[base + ofs] return ofs; } /** * 與gallopLeft相似,不同的是如果發現key的值與某些元素相等,那返回這些值最後一個元素的位置的 * 後一個位置 * * @param key 需要查詢待插入位置的那個值 * @param a 待排序的陣列 * @param base 被查詢的序列中第一個元素的位置 * @param len 被查詢的序列的長度 * @param hint 開始查詢的位置,0 <= hint < len.它越接近結果所在位置,查詢越快。 * @param c 本次排序的比較器 * @return 一個整數 k, 滿足0 <= k <= n 並且 a[b + k - 1] <= key < a[b + k] */ private static <T> int gallopRight(T key, T[] a, int base, int len, int hint, Comparator<? super T> c) { assert len > 0 && hint >= 0 && hint < len; int ofs = 1; int lastOfs = 0; if (c.compare(key, a[base + hint]) < 0) { // Gallop left until a[b+hint - ofs] <= key < a[b+hint - lastOfs] int maxOfs = hint + 1; while (ofs < maxOfs && c.compare(key, a[base + hint - ofs]) < 0) { lastOfs = ofs; ofs = (ofs << 1) + 1; if (ofs <= 0) // int overflow ofs = maxOfs; } if (ofs > maxOfs) ofs = maxOfs; // Make offsets relative to b int tmp = lastOfs; lastOfs = hint - ofs; ofs = hint - tmp; } else { // a[b + hint] <= key // Gallop right until a[b+hint + lastOfs] <= key < a[b+hint + ofs] int maxOfs = len - hint; while (ofs < maxOfs && c.compare(key, a[base + hint + ofs]) >= 0) { lastOfs = ofs; ofs = (ofs << 1) + 1; if (ofs <= 0) // int overflow ofs = maxOfs; } if (ofs > maxOfs) ofs = maxOfs; // Make offsets relative to b lastOfs += hint; ofs += hint; } assert -1 <= lastOfs && lastOfs < ofs && ofs <= len; /* * Now a[b + lastOfs] <= key < a[b + ofs], so key belongs somewhere to * the right of lastOfs but no farther right than ofs. Do a binary * search, with invariant a[b + lastOfs - 1] <= key < a[b + ofs]. */ lastOfs++; while (lastOfs < ofs) { int m = lastOfs + ((ofs - lastOfs) >>> 1); if (c.compare(key, a[base + m]) < 0) ofs = m; // key < a[b + m] else lastOfs = m + 1; // a[b + m] <= key } assert lastOfs == ofs; // so a[b + ofs - 1] <= key < a[b + ofs] return ofs; } /** * 合併在棧中位於i和i+1的兩個相鄰的升序序列。 i必須為從棧頂數,第二和第三個元素。 * 換句話說i == stackSize - 2 || i == stackSize - 3 * * @param i 待合併的第一個序列所在的位置 */ private void mergeAt(int i) { //校驗 assert stackSize >= 2; assert i >= 0; assert i == stackSize - 2 || i == stackSize - 3; //內部初始化 int base1 = runBase[i]; int len1 = runLen[i]; int base2 = runBase[i + 1]; int len2 = runLen[i + 1]; assert len1 > 0 && len2 > 0; assert base1 + len1 == base2; /* * 記錄合併後的序列的長度;如果i == stackSize - 3 就把最後一個序列的資訊 * 往前移一位,因為本次合併不關它的事。i+1對應的序列被合併到i序列中了,所以 * i+1 數列可以消失了 */ runLen[i] = len1 + len2; if (i == stackSize - 3) { runBase[i + 1] = runBase[i + 2]; runLen[i + 1] = runLen[i + 2]; } //i+1消失了,所以長度也減下來了 stackSize--; /* * 找出第二個序列的首個元素可以插入到第一個序列的什麼位置,因為在此位置之前的序列已經就位了。 * 它們可以被忽略,不參加歸併。 */ int k = gallopRight(a[base2], a, base1, len1, 0, c); assert k >= 0; // 因為要忽略前半部分元素,所以起點和長度相應的變化 base1 += k; len1 -= k; // 如果序列2 的首個元素要插入到序列1的後面,那就直接結束了, // !!! 因為序列2在陣列中的位置本來就在序列1後面,也就是整個範圍本來就是有序的!!! if (len1 == 0) return; /* * 跟上面相似,看序列1的最後一個元素(a[base1+len1-1])可以插入到序列2的什麼位置(相對第二個序列起點的位置,非在陣列中的位置), * 這個位置後面的元素也是不需要參與歸併的。所以len2直接設定到這裡,後面的元素直接忽略。 */ len2 = gallopLeft(a[base1 + len1 - 1], a, base2, len2, len2 - 1, c); assert len2 >= 0; if (len2 == 0) return; // 合併剩下的兩個有序序列,並且這裡為了節省空間,臨時陣列選用 min(len1,len2)的長度 // 優化的很細呢 if (len1 <= len2) mergeLo(base1, len1, base2, len2); else mergeHi(base1, len1, base2, len2); } /** * 使用固定空間合併兩個相鄰的有序序列,保持陣列的穩定性。 * 使用本方法之前保證第一個序列的首個元素大於第二個序列的首個元素;第一個序列的末尾元素 * 大於第二個序列的所有元素 * * 為了效能,這個方法在len1 <= len2的時候呼叫;它的姐妹方法mergeHi應該在len1 >= len2 * 的時候呼叫。len1==len2的時候隨便呼叫哪個都可以 * * @param base1 index of first element in first run to be merged * @param len1 length of first run to be merged (must be > 0) * @param base2 index of first element in second run to be merged * (must be aBase + aLen) * @param len2 length of second run to be merged (must be > 0) */ private void mergeLo(int base1, int len1, int base2, int len2) { assert len1 > 0 && len2 > 0 && base1 + len1 == base2; //將第一個序列放到臨時陣列中 T[] a = this.a; // For performance T[] tmp = ensureCapacity(len1); System.arraycopy(a, base1, tmp, 0, len1); int cursor1 = 0; // 臨時陣列指標 int cursor2 = base2; // 序列2的指標,參與歸併的另一個序列 int dest = base1; // 儲存結果的指標 // 這裡先把第二個序列的首個元素,移動到結果序列中的位置,然後處理那些不需要歸併的情況 a[dest++] = a[cursor2++]; // 序列2只有一個元素的情況,把它移動到指定位置之後,剩下的臨時陣列 // 中的所有序列1的元素全部copy到後面 if (--len2 == 0) { System.arraycopy(tmp, cursor1, a, dest, len1); return; } // 序列1只有一個元素的情況,把它移動到最後一個位置,為了不覆蓋,先把序列2中的元素 // 全部移走。這個是因為序列1中的最後一個元素比序列2中的所有元素都大,這是該方法執行的條件 if (len1 == 1) { System.arraycopy(a, cursor2, a, dest, len2); a[dest + len2] = tmp[cursor1]; // Last elt of run 1 to end of merge return; } Comparator<? super T> c = this.c; // 本次排序的比較器 int minGallop = this.minGallop; // " " " " " // 不瞭解break標籤的同學要補補Java基本功了 outer: while (true) { /* * 這裡加了兩個值來記錄一個序列連續比另外一個大的次數,根據此資訊,可以做出一些 * 優化 * */ int count1 = 0; // 序列1 連續 比序列2大多少次 int count2 = 0; // 序列2 連續 比序列1大多少次 /* * 這裡是直接的歸併演算法的合併的部分,這裡會統計count1合count2, * 如果其中一個大於一個閾值,就會跳出迴圈 * */ do { assert len1 > 1 && len2 > 0; if (c.compare(a[cursor2], tmp[cursor1]) < 0) { a[dest++] = a[cursor2++]; count2++; count1 = 0; // 序列2沒有元素了就跳出整次合併 if (--len2 == 0) break outer; } else { a[dest++] = tmp[cursor1++]; count1++; count2 = 0; // 如果序列1只剩下最後一個元素了就可以跳出迴圈 if (--len1 == 1) break outer; } /* * 這個判斷相當於 count1 < minGallop && count2 <minGallop * 因為count1和count2總有一個為0 * */ } while ((count1 | count2) < minGallop); /* * 執行到這裡的話,一個序列會連續的的比另一個序列大,那麼這種連續性可能持續的 * 更長。那麼我們就按照這個邏輯試一試。直到這種連續性被打破。根據找到的長度, * 直接連續的copy就可以了,這樣可以提高copy的效率。 */ do { assert len1 > 1 && len2 > 0; // gallopRight就是之前用過的那個方法 count1 = gallopRight(a[cursor2], tmp, cursor1, len1, 0, c); if (count1 != 0) { System.arraycopy(tmp, cursor1, a, dest, count1); dest += count1; cursor1 += count1; len1 -= count1; if (len1 <= 1) // 結尾處理退化的序列 break outer; } a[dest++] = a[cursor2++]; if (--len2 == 0) //結尾處理退化的序列 break outer; count2 = gallopLeft(tmp[cursor1], a, cursor2, len2, 0, c); if (count2 != 0) { System.arraycopy(a, cursor2, a, dest, count2); dest += count2; cursor2 += count2; len2 -= count2; if (len2 == 0) break outer; } a[dest++] = tmp[cursor1++]; if (--len1 == 1) break outer; // 這裡對連續性比另外一個大的閾值減少,這樣更容易觸發這段操作, // 應該是因為前面的資料表現好,後面的資料類似的可能性更高? minGallop--; } while (count1 >= MIN_GALLOP | count2 >= MIN_GALLOP); //如果連續性還是很大的話,繼續這樣處理s if (minGallop < 0) minGallop = 0; //同樣,這裡如果跳出了那段迴圈,就證明資料的順序程度不好,應當增加閾值,避免浪費資源 minGallop += 2; } //outer 結束 this.minGallop = minGallop < 1 ? 1 : minGallop; // Write back to field //這裡處理收尾工作 if (len1 == 1) { assert len2 > 0; System.arraycopy(a, cursor2, a, dest, len2); a[dest + len2] = tmp[cursor1]; // Last elt of run 1 to end of merge } else if (len1 == 0) { //因為序列1中的最後一個值,比序列2中的所有值都大,所以,不可能序列1空了,序列2還有元素 throw new IllegalArgumentException( "Comparison method violates its general contract!"); } else { assert len2 == 0; assert len1 > 1; System.arraycopy(tmp, cursor1, a, dest, len1); } } /** * Like mergeLo, except that this method should be called only if * len1 >= len2; mergeLo should be called if len1 <= len2. (Either method * may be called if len1 == len2.) * * @param base1 index of first element in first run to be merged * @param len1 length of first run to be merged (must be > 0) * @param base2 index of first element in second run to be merged * (must be aBase + aLen) * @param len2 length of second run to be merged (must be > 0) */ private void mergeHi(int base1, int len1, int base2, int len2) { assert len1 > 0 && len2 > 0 && base1 + len1 == base2; // Copy second run into temp array T[] a = this.a; // For performance T[] tmp = ensureCapacity(len2); System.arraycopy(a, base2, tmp, 0, len2); int cursor1 = base1 + len1 - 1; // Indexes into a int cursor2 = len2 - 1; // Indexes into tmp array int dest = base2 + len2 - 1; // Indexes into a // Move last element of first run and deal with degenerate cases a[dest--] = a[cursor1--]; if (--len1 == 0) { System.arraycopy(tmp, 0, a, dest - (len2 - 1), len2); return; } if (len2 == 1) { dest -= len1; cursor1 -= len1; System.arraycopy(a, cursor1 + 1, a, dest + 1, len1); a[dest] = tmp[cursor2]; return; } Comparator<? super T> c = this.c; // Use local variable for performance int minGallop = this.minGallop; // " " " " " outer: while (true) { int count1 = 0; // Number of times in a row that first run won int count2 = 0; // Number of times in a row that second run won /* * Do the straightforward thing until (if ever) one run * appears to win consistently. */ do { assert len1 > 0 && len2 > 1; if (c.compare(tmp[cursor2], a[cursor1]) < 0) { a[dest--] = a[cursor1--]; count1++; count2 = 0; if (--len1 == 0) break outer; } else { a[dest--] = tmp[cursor2--]; count2++; count1 = 0; if (--len2 == 1) break outer; } } while ((count1 | count2) < minGallop); /* * One run is winning so consistently that galloping may be a * huge win. So try that, and continue galloping until (if ever) * neither run appears to be winning consistently anymore. */ do { assert len1 > 0 && len2 > 1; count1 = len1 - gallopRight(tmp[cursor2], a, base1, len1, len1 - 1, c); if (count1 != 0) { dest -= count1; cursor1 -= count1; len1 -= count1; System.arraycopy(a, cursor1 + 1, a, dest + 1, count1); if (len1 == 0) break outer; } a[dest--] = tmp[cursor2--]; if (--len2 == 1) break outer; count2 = len2 - gallopLeft(a[cursor1], tmp, 0, len2, len2 - 1, c); if (count2 != 0) { dest -= count2; cursor2 -= count2; len2 -= count2; System.arraycopy(tmp, cursor2 + 1, a, dest + 1, count2); if (len2 <= 1) // len2 == 1 || len2 == 0 break outer; } a[dest--] = a[cursor1--]; if (--len1 == 0) break outer; minGallop--; } while (count1 >= MIN_GALLOP | count2 >= MIN_GALLOP); if (minGallop < 0) minGallop = 0; minGallop += 2; // Penalize for leaving gallop mode } // End of "outer" loop this.minGallop = minGallop < 1 ? 1 : minGallop; // Write back to field if (len2 == 1) { assert len1 > 0; dest -= len1; cursor1 -= len1; System.arraycopy(a, cursor1 + 1, a, dest + 1, len1); a[dest] = tmp[cursor2]; // Move first elt of run2 to front of merge } else if (len2 == 0) { throw new IllegalArgumentException( "Comparison method violates its general contract!"); } else { assert len1 == 0; assert len2 > 0; System.arraycopy(tmp, 0, a, dest - (len2 - 1), len2); } } /** * 保證臨時陣列的大小能夠容納所有的臨時元素,在需要的時候要擴充套件臨時陣列的大小。 * 陣列的大小程指數增長,來保證線性的複雜度。 * * 一次申請步長太小,申請的次數必然會增多,浪費時間;一次申請的空間足夠大,必然會 * 浪費空間。正常情況下,歸併排序的臨時空間每次大的合併都會 * 2, * 最大長度不會超過陣列長度的1/2。 這個長度於2 有著緊密的聯絡。 * * @param minCapacity 臨時陣列需要的最小空間 * @return tmp 臨時陣列 */ private T[] ensureCapacity(int minCapacity) { // 如果臨時陣列長度不夠,那需要重新計算臨時陣列長度; // 如果長度夠,直接返回當前臨時陣列 if (tmp.length < minCapacity) { // 這裡是計算最小的大於minCapacity的2的冪。方法不常見,這裡分析一下。 // // 假設有無符號整型 k,它的位元組碼如下: // 00000000 10000000 00000000 00000000 k // 00000000 11000000 00000000 00000000 k |= k >> 1; // 00000000 11110000 00000000 00000000 k |= k >> 2; // 00000000 11111111 00000000 00000000 k |= k >> 4; // 00000000 11111111 11111111 00000000 k |= k >> 8; // 00000000 11111111 11111111 11111111 k |= k >> 16 // 上面的移位事實上只跟最高位有關係,移位的結果是最高位往後的bit全部變成了1 // 最後 k++ 的結果 就是剛好是比 minCapacity 大的2的冪 // 寫的真是6 int newSize = minCapacity; newSize |= newSize >> 1; newSize |= newSize >> 2; newSize |= newSize >> 4; newSize |= newSize >> 8; newSize |= newSize >> 16; newSize++; if (newSize < 0) // Not bloody likely! 估計作者在這裡遇到bug了 newSize = minCapacity; else newSize = Math.min(newSize, a.length >>> 1); @SuppressWarnings({"unchecked", "UnnecessaryLocalVariable"}) T[] newArray = (T[]) new Object[newSize]; tmp = newArray; } return tmp; } /** * 檢查範圍fromIndex到toIndex是否在陣列內,如果不是拋異常 * * @param arrayLen 整個陣列的長度 * @param fromIndex 該範圍的起點 * @param toIndex 該範圍的終點 * @throws IllegalArgumentException if fromIndex > toIndex * @throws ArrayIndexOutOfBoundsException if fromIndex < 0 or toIndex > arrayLen */ private static void rangeCheck(int arrayLen, int fromIndex, int toIndex) { if (fromIndex > toIndex) throw new IllegalArgumentException("fromIndex(" + fromIndex + ") > toIndex(" + toIndex + ")"); if (fromIndex < 0) throw new ArrayIndexOutOfBoundsException(fromIndex); if (toIndex > arrayLen) throw new ArrayIndexOutOfBoundsException(toIndex); } }