具體解釋Redis源代碼中的部分高速排序算法(pqsort.c)
看標題。你可能會疑惑:咦?你這家夥。怎麽不解說完整的快排,僅僅講一部分快排……-。-
哎,冤枉。
“部分快排”是算法的名字。實際上本文相當具體呢。本文差點兒與普通快排無異。看懂了本文,你對普通的快排也會有更深的認識了。
高速排序算法(qsort)的原理我們大都應該了解。本文介紹的是部分高速排序算法。
事實上其算法本質是一樣的,僅僅只是限定了排序的左右區間。也就是僅僅對一個數字序列的一部分進行排序。故稱為“部分高速排序算法”。簡稱:
pqsort
Redis
項目中的pqsort.c
文件實現了pqsort()
函數。其源代碼見本文最後一節 pqsort.c源代碼 。 另外補充一句:長文慎入 :-)
導讀
外部資料
維基百科
快排基本流程不了解的童鞋。請移步 高速排序wiki
論文
實際上pqsort.c
的快排流程是改編自一個經典實現,該實現被很多庫的實現所使用。
請參考Bentley & McIlroy
所著論文 “Engineering a Sort Function”
源代碼結構
主要函數 | pqsort() |
---|---|
靜態函數 | _pqsort()、swapfuc()、med3() |
宏函數 | min()、swap()、swapcode()、vecswap()、SWAPINIT() |
整體來說,pqsort.c文件對外僅僅提供了一個函數—— pqsort()
接下來的介紹中。我會簡介宏函數和幾個靜態函數。把重點放在靜態函數_pqsort()上。它才是整個算法的核心部分。
pqsort()與qsort()
C標準庫
中有一個快排的函數qsort()
,它與本文介紹的pqsort()
所提供的編程接口極為類似,請看兩者聲明:
void qsort (void *a, size_t n, size_t es, int (*cmp)(const void *, const void *));
void pqsort(void *a, size_t n, size_t es, int (*cmp)(const void *, const void *),
size_t lrange, size_t rrange);
參數解讀
參數 | 說明 |
---|---|
a | 待排序數組的首地址 |
n | 待排序元素的個數 |
es | element size :每一個元素的字節大小 |
cmp | 回調函數。定義了比較的規則,直接影響排序結果是遞增排序或遞減排序,並支持非標準類型的排序 |
lrange | 待排序的左邊界 |
rrange | 待排序的右邊界 |
pqsort()與_pqsort()
pqsort()源代碼
void
pqsort(void *a, size_t n, size_t es,
int (*cmp) (const void *, const void *), size_t lrange, size_t rrange)
{
_pqsort(a,n,es,cmp,((unsigned char*)a)+(lrange*es),
((unsigned char*)a)+((rrange+1)*es)-1);
}
能夠看出我們的qpsort()事實上是在調用_pqsort()來完畢排序功能的。
這兩個函數非常像,區別在於參數上。
看一下兩者的函數原型:
void
pqsort (void *a, size_t n, size_t es, int (*cmp)(const void *, const void *),
size_t lrange, size_t rrange);
static void
_pqsort(void *a, size_t n, size_t es, int (*cmp)(const void *, const void *),
void *lrange, void *rrange)
差異的關鍵在於:
- pqsort() 的參數中的左右邊界值,其含義值下標
- _pqsort()的參數中的左右邊界值,其含義是指針
這樣pqsort()源代碼就不足為奇了。所以我前面說該文件的核心部分是_pqsort()
預備知識
看一下除了_pqsort()之外的源代碼部分。這些都是_pqsort()函數實現的輔助。
med3
static inline char *
med3(char *a, char *b, char *c,
int (*cmp) (const void *, const void *))
{
return cmp(a, b) < 0 ?
(cmp(b, c) < 0 ?
b : (cmp(a, c) < 0 ?
c : a ))
:(cmp(b, c) > 0 ?
b : (cmp(a, c) < 0 ?
a : c ));
}
依據回調函數cmp
指定的比較規則。則求出變量a,b,c中處於中間大小的變量。
換句話說:就是在求 中位數。
min
#define min(a, b) (a) < (b) ? a : b
這是個簡單的宏。看一眼就呵呵即可了。
SWAPINIT
#define SWAPINIT(a, es) swaptype = ((char *)a - (char *)0) % sizeof(long) || \
es % sizeof(long) ? 2 : es == sizeof(long)? 0 : 1;
該宏的目的在於,給swaptype賦值,它有例如以下幾種取值:
swaptype | 說明 |
---|---|
0 | 數組a中每一個元素的大小是sizeof(long) |
1 | 數組a中每一個元素的大小是sizeof(long) 的倍數。但不等於sizeof(long) |
2 | 數組a中每一個元素的大小不是sizeof(long) 的倍數 |
其它 | 數組a的首地址不是sizeof(long) 的倍數,即不是總線字節對齊 |
swaptype等於0、1、2的時候,數組a的首地址都是sizeof(long)
字節對齊的。
題外話:
我們常說8字節對齊,指的是64位機器中要滿足8字節對齊(首地址是8的倍數),則數據的讀取效率會更高。而32位系統應滿足的是4字節對齊。具體大小是和機器字長相關的,機器字長指的是計算機一次能讀取的二進制位數。
一般機器字長和long類型的大小同樣,所以能夠說要滿足sizeof(long)字節對齊。
以下首先介紹的是幾個與交換操作相關的函數(或宏),這裏我假定A → B表示A函數會調用B函數(宏)。
我們從右向左解讀
swapcode
這是個宏函數 。其功能是將以parmi
和parmj
為首地址的n個字節進行交換。
#define swapcode(TYPE, parmi, parmj, n) { \
- 形參
TYPE
就是指的類型。閱讀後面代碼。可知其實參是char
和long
這兩種。 - 形參
n
指定的是待交換字節數。
請同意我在宏這裏。使用了術語:形參、實參。盡管可能不搭,但目的是便於讀者理解。
size_t i = (n) / sizeof (TYPE); TYPE *pi = (TYPE *)(void *)(parmi); TYPE *pj = (TYPE *)(void *)(parmj); \
i
就是指定類型(char或long)的元素的個數。然後將參數parmi
和parmj
轉換成指定的類型的指針pi
和pj
。
do { TYPE t = *pi; *pi++ = *pj; \ //等價於*pi = *pj; pi++;
*pj++ = t; } while (--i > 0); } //end of #define
一個do-while
循環,內部執行了交換操作。
swapfunc
static inline void
swapfunc(char *a, char *b, size_t n, int swaptype)
{
if (swaptype <= 1)
swapcode(long, a, b, n)
else
swapcode(char, a, b, n)
}
簡單的if
條件語句。假設swaptype <= 1
(swaptype為0或1。即元素類型為sizeof(long)的倍數)則按long類型的大小來進行交換。否則就按char類型的大小來進行交換。
這樣做的目的主要是提高交互操作的效率。
swap
#define swap(a, b) \
if (swaptype == 0) { long t = *(long *)(void *)(a); *(long *)(void *)(a) = *(long *)(void *)(b); *(long *)(void *)(b) = t; } else swapfunc(a, b, es, swaptype)
前面已經說過了,swaptype
為0的時候,表示數組元素的大小等於long類型的大小。所以這裏進行了這種交互操作。
vecswap
#define vecswap(a, b, n) if ((n) > 0) swapfunc((a), (b), (size_t)(n), swaptype)
該宏和swap(a, b)
事實上非常像。都是在調用swapfunc
來完畢交互操作。
但而二者的不同之處是:vecswap(a, b, n)
進行的是n*2個元素的交換,而swap(a, b)
僅僅進行兩個元素之間的交換。
vecswap是vector swap的縮寫。
vector即向量,表示多個元素
好了,言歸正傳。前面說了這麽多,事實上都是基礎先修課,接下來才是真正的核心代碼呦。
_pqsort
回想一下聲明部分:
static void
_pqsort(void *a, size_t n, size_t es,
int (*cmp) (const void *, const void *), void *lrange, void *rrange);
由於a是帶排序數字序列的首地址,所以我以下希望能用數組的寫法來簡化我的描寫敘述。
比方&a[1] = (char *) a + es
,&a[n-1] = (char *) a + (n-1)*es
等號右邊的表達式是void *實現C語言泛型功能的典型方法。
誠然,在語法上。二者並非等價的。但在邏輯上是能夠理解的。僅僅是為了便於理解,簡化敘述
cmp前面我也提到了是一個回調函數,實現了自己定義的比較操作。這裏為了簡化敘述。我們假定要完畢的就是一個遞增序列,而cmp完畢的就是一般的大小比較操作。
同樣為了便於表述。我們假定我們要完畢的是數字的排序工作,而不是其它自己定義類型的排序工作。
局部變量
char *pa, *pb, *pc, *pd, *pl, *pm, *pn;
size_t d, r;
int swaptype, cmp_result;
loop循環
loop: SWAPINIT(a, es);
這一行使用SWAPINIT宏函數,求解出了swapcode的值。行首有一label(標簽)——loop:說明接下來會有一個goto的循環語句。
讀者朋友請不要在這裏跟我糾結方法論中的論調,我僅僅想說: goto 有時候確實是非常方便的。可讀性也不錯。
每循環一次完畢的是快排的一趟排序工作。
一段冒泡
if (n < 7) {
for (pm = (char *) a + es; pm < (char *) a + n * es; pm += es)
for (pl = pm; pl > (char *) a && cmp(pl - es, pl) > 0;
pl -= es)
swap(pl, pl - es);
return;
}
這段代碼。假設你使用了我前面簡化的數組表示法來代換的話,實際上不難理解。 在帶排序元素個數小於7的時候,我們採用 冒泡排序 。
在元素個數不多的時候,使用快排反而不能提高效率。倒不如傳統的冒泡來的實在。
然而究竟這個數為什麽是7,而不是6。8或其它數字。我也不得而知。
我僅僅能說這就是一個
Magic Number
(中文譯為魔數、幻數。指代碼中出現的不明所以。意義不明的數字)。
選取模糊中位數
pm = (char *) a + (n / 2) * es;
if (n > 7) {
pl = (char *) a;
pn = (char *) a + (n - 1) * es;
if (n > 40) {
d = (n / 8) * es;
pl = med3(pl, pl + d, pl + 2 * d, cmp);
pm = med3(pm - d, pm, pm + d, cmp);
pn = med3(pn - 2 * d, pn - d, pn, cmp);
}
pm = med3(pl, pm, pn, cmp);
}
swap(a, pm);
首先是pm = &a[n/2]
,在n大於7的時候。 pl =&a[0]; pn = &a[n-1];
然後在元素個數n大於40的時候:
沒錯,為什麽是40。
這又是一個
Magic Number
又一次選擇新的pl,pm,pr。d = (n / 8) * es;
我們能夠假想將n個數字分成8個子區間。
- pl是左邊三個區間首部中的中位數索引(首部指的是子區間第0個元素)
- pm是中間三個區間首部中的中位數索引
pr是右邊三個區間首部中的中位數索引
接著一個
pm = med3(pl, pm, pn, cmp);
在這三個中位數中選取中位數。所以最後我們得到的pm實際上是比較接近於整個數字序列中位數的索引。當然並非全部數字中的中位數。我們可稱它為模糊中位數。
了解快排的過程,我們就會知道每趟排序之前選取一個元素作為基準。排序之後保證該基準左邊都小於它。基準的右邊都大於它。然後該基準的左右區間在反復這一排序過程。假設我們每趟選取的基準都接近中位數,保證左右區間的長度大致同樣。那麽接下來排序的效率就更高。
swap(a, pm);
pa = pb = (char *) a + es;
pc = pd = (char *) a + (n - 1) * es;
將pm的的值與a[0]的值交互。我們的模糊中位數此時保存在了第一個元素中,接下來我稱它為基準。
然後:pa = pb = a[1];
pc = pd = a[n-1];
一趟排序
for (;;) {
while (pb <= pc && (cmp_result = cmp(pb, a)) <= 0) {
if (cmp_result == 0) {
swap(pa, pb);
pa += es;
}
pb += es;
}
while (pb <= pc && (cmp_result = cmp(pc, a)) >= 0) {
if (cmp_result == 0) {
swap(pc, pd);
pd -= es;
}
pc -= es;
}
if (pb > pc)
break;
swap(pb, pc); //能執行到這一步。說明*pb>*a,*pc<*a。交換一下。
pb += es;
pc -= es;
}
一個兩層循環。涉及代碼量較多,這裏我簡單地介紹一下它的功能。大家努力去自行理解。好吧,事實上是我說累了,懶得說了。它的功能是基本完畢了快排中的一趟排序。唯一的不足之處就是我們的基準還不在中間位置。此外該操作還把序列中和基準a[0]同樣的數都交換到了序列的左端和右端的連續區間。
所以接下來我們要把基準區間都交換到中間位置才行。
把基準交換到中間
pn = (char *) a + n * es; //pn = a[n]...不要操心越界。以下並不會訪問該內存
r = min(pa - (char *) a, pb - pa);
vecswap(a, pb - r, r);
r = min((size_t)(pd - pc), pn - pd - es);
vecswap(pb, pn - r, r);
這部分代碼就是把數字序列左右兩端的連續區間(值等於基準)都交換到序列的中間。
之所以調用min()來確定交換的個數r。是由於交換前後兩個區間是可能有重合的,所以我們要保證交換的元素個數最少。
以左端的交換為例(黃顏色的部分表示值都等於基準a[0]):
- A圖表示pa - (char *) a < pb - pa
- B圖表示pa - (char *) a > pb - pa
到此為止。我們一趟排序工作完畢了。接下來要做的就是用遞歸或循環來開始下一趟排序。
開始下一趟排序
簡單地描寫敘述一下快排過程,在一趟快排結束後。我們要用遞歸(或循環叠代)的方式在反復排序工作。此後就是在基準左邊這一區間展開一趟排序,在基準右邊區間也展開一趟排序。這就是分治思想。
if ((r = pb - pa) > es) {
void *_l = a, *_r = ((unsigned char*)a)+r-1;
if (!((lrange < _l && rrange < _l) ||
(lrange > _r && rrange > _r)))
_pqsort(a, r / es, es, cmp, lrange, rrange);
}
這段代碼是對基準左邊的區間進行一趟遞歸的快排。
註意,最外層的if
條件中對r
進行了又一次賦值(r = pb - pa)。 推斷pb - pa
這一個區間元素個數是否大於1(僅僅有一個元素顯然不須要排序的)。為什麽是推斷pb - pa
而不是推斷pa - a
呢?直接上圖(與前文中的AB兩種情況相應):
黃色左邊的白色部分。是我們要排序的區間
接著看代碼,內層也嵌套了一個if
,他的條件非常復雜。
肢解一下。這個條件有一個。
非操作。我設該條件為!T
,用偽碼表示:
// T = ((lrange < _l && rrange < _l)||(lrange > _r && rrange > _r))
if (!T)
_pqsort(...);
去理解它的逆命題(else): 假設滿足條件T
。則不會進行排序。事實上非常好理解。lrange
。rragne
是待排序的區間左右邊界。而_l
和_r
是基準左側區間的實際左右邊界。假設待排序的邊界比實際左邊界還要小。或者比實際的右邊界還要大,顯然是不滿足條件的。
實際上在整個pqsort.c源代碼中,所做的操作差點兒於普通的快排無異,唯一體現了部分快排算法的部分二字的地方就是這內層嵌套的循環而已。
if ((r = pd - pc) > es) {
void *_l, *_r;
/* Iterate rather than recurse to save stack space */
a = pn - r;
n = r / es;
_l = a;
_r = ((unsigned char*)a)+r-1;
if (!((lrange < _l && rrange < _l) ||
(lrange > _r && rrange > _r)))
goto loop;
}
這段代碼是對基準右邊的區間進行了一次快排。其過程和前面類似,就不贅述了。不同之處是關於首元素索引不再是原先的a
。而是pn - r
,這並不難理解。另外一個變化就是這一趟新排序的開始不是使用的遞歸,而是循環(goto loop
)。作者在凝視中也解釋了,沒有繼續採用遞歸是為了節省棧空間。
pqsort.c源代碼
#include <sys/types.h>
#include <errno.h>
#include <stdlib.h>
static inline char *med3 (char *, char *, char *,
int (*)(const void *, const void *));
static inline void swapfunc (char *, char *, size_t, int);
#define min(a, b) (a) < (b) ? a : b
/*
* Qsort routine from Bentley & McIlroy‘s "Engineering a Sort Function".
*/
#define swapcode(TYPE, parmi, parmj, n) { \
size_t i = (n) / sizeof (TYPE); TYPE *pi = (TYPE *)(void *)(parmi); TYPE *pj = (TYPE *)(void *)(parmj); do { TYPE t = *pi; *pi++ = *pj; *pj++ = t; } while (--i > 0); }
#define SWAPINIT(a, es) swaptype = ((char *)a - (char *)0) % sizeof(long) || \
es % sizeof(long) ? 2 : es == sizeof(long)? 0 : 1;
static inline void
swapfunc(char *a, char *b, size_t n, int swaptype)
{
if (swaptype <= 1)
swapcode(long, a, b, n)
else
swapcode(char, a, b, n)
}
#define swap(a, b) \
if (swaptype == 0) { long t = *(long *)(void *)(a); *(long *)(void *)(a) = *(long *)(void *)(b); *(long *)(void *)(b) = t; } else swapfunc(a, b, es, swaptype)
#define vecswap(a, b, n) if ((n) > 0) swapfunc((a), (b), (size_t)(n), swaptype)
static inline char *
med3(char *a, char *b, char *c,
int (*cmp) (const void *, const void *))
{
return cmp(a, b) < 0 ?
(cmp(b, c) < 0 ? b : (cmp(a, c) < 0 ?
c : a ))
:(cmp(b, c) > 0 ? b : (cmp(a, c) < 0 ? a : c ));
}
static void
_pqsort(void *a, size_t n, size_t es,
int (*cmp) (const void *, const void *), void *lrange, void *rrange)
{
char *pa, *pb, *pc, *pd, *pl, *pm, *pn;
size_t d, r;
int swaptype, cmp_result;
loop: SWAPINIT(a, es);
if (n < 7) {
for (pm = (char *) a + es; pm < (char *) a + n * es; pm += es)
for (pl = pm; pl > (char *) a && cmp(pl - es, pl) > 0;
pl -= es)
swap(pl, pl - es);
return;
}
pm = (char *) a + (n / 2) * es;
if (n > 7) {
pl = (char *) a;
pn = (char *) a + (n - 1) * es;
if (n > 40) {
d = (n / 8) * es;
pl = med3(pl, pl + d, pl + 2 * d, cmp);
pm = med3(pm - d, pm, pm + d, cmp);
pn = med3(pn - 2 * d, pn - d, pn, cmp);
}
pm = med3(pl, pm, pn, cmp);
}
swap(a, pm);
pa = pb = (char *) a + es;
pc = pd = (char *) a + (n - 1) * es;
for (;;) {
while (pb <= pc && (cmp_result = cmp(pb, a)) <= 0) {
if (cmp_result == 0) {
swap(pa, pb);
pa += es;
}
pb += es;
}
while (pb <= pc && (cmp_result = cmp(pc, a)) >= 0) {
if (cmp_result == 0) {
swap(pc, pd);
pd -= es;
}
pc -= es;
}
if (pb > pc)
break;
swap(pb, pc);
pb += es;
pc -= es;
}
pn = (char *) a + n * es;
r = min(pa - (char *) a, pb - pa);
vecswap(a, pb - r, r);
r = min((size_t)(pd - pc), pn - pd - es);
vecswap(pb, pn - r, r);
if ((r = pb - pa) > es) {
void *_l = a, *_r = ((unsigned char*)a)+r-1;
if (!((lrange < _l && rrange < _l) ||
(lrange > _r && rrange > _r)))
_pqsort(a, r / es, es, cmp, lrange, rrange);
}
if ((r = pd - pc) > es) {
void *_l, *_r;
/* Iterate rather than recurse to save stack space */
a = pn - r;
n = r / es;
_l = a;
_r = ((unsigned char*)a)+r-1;
if (!((lrange < _l && rrange < _l) ||
(lrange > _r && rrange > _r)))
goto loop;
}
/* qsort(pn - r, r / es, es, cmp);*/
}
void
pqsort(void *a, size_t n, size_t es,
int (*cmp) (const void *, const void *), size_t lrange, size_t rrange)
{
_pqsort(a,n,es,cmp,((unsigned char*)a)+(lrange*es),
((unsigned char*)a)+((rrange+1)*es)-1);
}
具體解釋Redis源代碼中的部分高速排序算法(pqsort.c)