10億條long型資料外部排序的檔案分割實踐及優化過程(JAVA)
一、題目
生成10億個long隨機數正整數,把它寫入一個檔案裡。然後實現一個函式 fetch(int k,int n)。(fetch函式的輸出結果是這10億個正整數中從小到大中第k個開始(不包含第k個)往後取n個數。) 給定記憶體為2G(一開始為1G)。
二、題目分析
(1)首先生成10億個long隨機正整數,可考慮使用ThreadLocalRandom和多執行緒生成隨機數。由於全部資料記憶體佔用10幾G,需要分批寫入檔案。(一個數據一行,行末為\n) (2)fetch函式的實現: 1.先對隨機數進行外部排序。由於隨機數檔案較大,無法一次性讀取全部資料進行排序,所以必須對隨機數檔案進行分割成多個完成資料排序的小檔案,然後通過多路歸併實現外部排序。 2.然後實現fetch函式,輸出結果 因此本文主要針對外部排序的檔案分割部分進行說明,至於多路歸併和fetch函式的實現本文暫不開展。
三、程式設計
本文主要針對大檔案切割的程式設計進行分析。 設計思路:通過BufferedReader的readLine()方法讀取每一行資料為String(read),並將String轉換為long(parse),存放在一個long[]數組裡。當裝滿long[]時通過Araay.sort()排序(sort),將排序好的long[]按行輸出(write)。(long[]大小自行設定)通過多次反覆操作實現大檔案分割。(為了方便描述,後文將大檔案切割分為read、parse、sort、write四個部分來描述)為了加快效率,我用一個執行緒執行read、parse,另一個執行緒執行sort、write,兩執行緒間用BlockingQueue交流資料。 本方案程式碼實現簡單,但是效率不忍直視,而且由於過大的記憶體開銷,很容易就堆滿了,沒等它執行成功就對它進行優化了。。。 優化分為以下幾個部分 read部分: 10億個資料要讀10億次。。。因此read部分必須進行優化。因此採用RandomAccessFile和多執行緒結合進行讀取,根據偏移量進行分次讀取,每次讀取32M(這個量是比較快而且不容易出現堆滿的)。讀取的位元組資料存放在byte[]資料裡,這時會出現一個新問題:每次讀取的資料的末尾不一定是以“\n”結束,那麼必定有個隨機數被分割了! 由於採用多執行緒進行IO讀操作,因此為了解決隨機數被分割問題肥了點心思。 主要思路是每完成一次read(b, 0, length)之後,往後繼續read()一個byte,直到遇到第一個[10](即'\n');同時還要判斷每次read起始部分是否為完整的一個隨機數,從byte[0]開始判斷直到遇見第一個[10](即'\n')。這樣才能在parse的時候資料時完整的。由於程式碼不小心刪了,這裡就提供一個思路。 parse部分: 將byte[]轉為String,再用parseLong()轉為long,然後放入ArrayList。。。其實這很蠢。 sort部分: 直接呼叫sort函式。 write部分: 將long資料按行寫出為字元。後來考慮到分割的檔案是臨時檔案,用完就刪。所以用DataOutputStream包裝BufferOutputStream輸出為一個個8位元組的long,這樣減少了一半以上的檔案大小,且能提高輸出效率。(ps:這算是一個不錯的想法) 歷經千辛萬苦,跑了20幾分鐘才分割完資料,太慢了。 反思:方案一有許多不足之處。 採用多執行緒進行I/O操作並不一定會提高效率,有時反而會影響效率。因為一個磁碟一個時間段內只能進行一個I/O操作,如果通過多執行緒進行I/O操作,可能造成每次I/O是磁頭尋道的偏移量較大,也就是尋道時間長,反而增加了I/O時間。 其次parse部分。都說了好蠢。謹慎使用String,每次新的一個String都會佔用常量池。為了避免使用String,我尋思如何直接將byte[]資料轉換為long。於是乎想到了迭代計算,同時參考parseLong()的原始碼,進行優化。
優化分析:
這個問題想要實現質的優化,必須合理使用多執行緒。後經高人點播:既然大檔案分割分為read、parse、sort、write四個部分,而且電腦是四核(二核四執行緒),那麼一個部分用一個執行緒進行操作,形成一條流水線,流水線上的資料通過BlockingQueue來傳遞。
read部分:
//方案一 class ReadData implements Runnable{ private int spiltSize;//每次讀取檔案的大小 private File file;//原始檔 private int spiltNum;//read次數,spiltNum=file.length/spiltSize(有餘數就加1) private long startPosition ;//每次read的偏移量 private BlockingQueue<byte[]> bq; ReadData(File file,int spiltSize,int spiltNum,BlockingQueue<byte[]> bq){ this.file=file; this.spiltSize=spiltSize; this.spiltNum=spiltNum; this.bq=bq; } public void run() { try(RandomAccessFile raf = new RandomAccessFile(file,"r");){ byte[] b=new byte[spiltSize]; for(int k=0;k<spiltNum;k++) { startPosition =(long)k*(long)spiltSize; raf.seek(startPosition); int read = raf.read(b, 0, spiltSize); if(read==spiltSize) { bq.put(b); }else { byte[] temp = Arrays.copyOfRange(b, 0, read); bq.put(temp); } } } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
這裡將每次讀取的資料直接傳遞給parse。為了減少寫出檔案的數量,我儘可能的將spiltSize設定大,這個方式並沒有充分利用多執行緒,read時間較長,後置的執行緒(parse、sort、write)處於阻塞狀態。由於缺乏對磁碟IO的理解,我侷限的認為一次性讀取的資料越大,減少I/O次數而提高效率,同時我又想保證每次寫出的資料也越多越好,這樣也可以減少後續歸併的路數。因此將每次讀取檔案大小盡可能調大。 通過對磁碟IO的瞭解,影響磁碟的關鍵因數是磁碟服務時間,即磁碟完成一個I/O請求所花費的時間,它由尋道時間、旋轉延遲和資料傳輸時間三部分構成。其中尋道時間、旋轉延遲是佔主要的,資料傳輸時間可以忽略。由於磁碟上每個扇區512byte,而作業系統的檔案系統不是一個扇區一個扇區的來讀資料,所以有了block(塊)的概念,它是一個塊一個塊的讀取的,塊(block)是基本的資料傳輸單元(一般的作業系統block size為4k)。那麼在磁碟上的同樣儲存位置,JAVA進行1024次4k的IO請求和進行1次4M的IO請求,在磁碟服務時間應該是差不多的(不知道這麼理解對不對,如有不對之處請指出)。那麼減少每次read的大小(保證為block size 的整數倍),保證流水線一直處於執行狀態,提高CPU的利用率。 ps:作業系統層對於IO的影響這邊就不考慮了。 針對這個問題做了優化。 (1)使用FileChannel包裝(FileInputStream)來進行讀檔案,每次讀取檔案大小spiltSize為8k。(由於電腦較差,每次執行結果波動很大,不能判斷哪種spiltSize取多少最好,經測試8k比4k好,16k比8k略差。這個地方有待繼續論證,但不影響整體的設計思路。FileChannel包裝也可以包裝RandomAccessFile,還沒測試出FileInputStream和RandomAccessFile哪個好,有高手可以指出) (2)考慮的原始檔來源可能是網路傳輸,並不能識別原始檔實際大小,無法判斷實際分割的小檔案的個數,因此取消了`for(int i=0;i<spiltNum;i++)`來判定迴圈,而改用`while(true) `的死迴圈,直到原始檔讀取完畢退出,同時向下遊傳送一個size=0的陣列,作為結束的標誌,這種處理方式提高了程式的健壯性。(值得推廣) 程式碼優化如下:
//方案二
public void run() {
try(
FileInputStream fis = new FileInputStream(file); //z這裡也可以包裝RandomAccessFile
FileChannel fc = fis.getChannel();
){
ByteBuffer bb= ByteBuffer.allocate(spiltSize);//spiltSize
long startPosition ;
int read;
int k=0;
byte[] temp;
while(true) {
startPosition =(long)k*(long)spiltSize;
read=fc.read(bb, startPosition);
if(read!=-1) {
temp =new byte[read];
bb.flip();
bb.get(temp);
bb.clear();
bq.put(temp);
k++;
}else {
bq.put(new byte[0]);
break;
}
}
}catch(IOException e) {
e.printStackTrace();
}catch(InterruptedException e1) {
e1.printStackTrace();
}
}
parse部分:
parseLong()方法也是直接將byte[]轉化為long。那麼鑑於此,我通過迭代計算將byte[]陣列轉換為long,同時用位運算計算乘法更快。 一開始由於不知道byte[]中可以parse出多少long資料,無法宣告一個固定大小的陣列,所以我這裡考慮用List,這樣增加的拆裝與包裝的過程。在read部分優化之後,考慮到每次讀取資料為8k,需要積累一定數量的long資料再輸出比較好好,所以通過自定義一個固定大小的long[]陣列來儲存parse後的long型資料。這樣可以避免使用LIst<Long>,一定程度減小了記憶體開銷和提高了效能。
//方案二,方案一使用List,這裡不給出程式碼
class Parse implements Runnable{
private BlockingQueue<byte[]> bq;
private BlockingQueue<long[]> bq1;
private int num ;//自定義儲存long資料的個數
Parse(BlockingQueue<byte[]> bq,BlockingQueue<long[]> bq1,int num){
this.bq=bq;
this.bq1=bq1;
this.num=num;
}
public void run() {
byte[] b;
int count=0;
long l=0L;//byte[]轉為long型資料,臨時
long[] temp =new long[num];
try {
while(true) {
b=bq.take();
if(b.length==0) {
long[] temp2= new long[count];
System.arraycopy(temp, 0,temp2,0,count);
bq1.put(temp2);
bq1.put(new long[0]);
break;
}else {
for(int j=0,len=b.length;j<len;j++) {
if(b[j]!='\n') {
//l=l*10+(long)(b[j]-48);優化前
l=(l<<3)+(l<<1)+(b[j]-'0');//優化後
}else{
temp[count]=l;
count++;
l=0L;
if(count==num) {
bq1.put(temp);
count=0;
temp =new long[num];
}
}
}
}
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
sort部分:
這個部分沒啥問題,sort部分是4個部分中執行速度最快的。
//方案二
class Sort implements Runnable{
private BlockingQueue<long[]> bq1;
private BlockingQueue<long[]> bq2;
Sort(BlockingQueue<long[]> bq1,BlockingQueue<long[]> bq2){
this.bq1=bq1;
this.bq2=bq2;
}
public void run() {
long[] longs;
try {
while(true) {
longs=bq1.take();
if(longs.length==0) {
bq2.put(new long[0]);
break;
}else {
Arrays.sort(longs);
bq2.put(longs);
}
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
write部分:
方案一write部分的寫操作是通過遍歷一個個寫出long,效率比較低,造成一定的堵塞狀態,需考慮其他IO方式,一次性寫出較多的資料。後通過FileChannel 和 ByteBuffer寫資料,具體使用方法可以上網查閱NIO的API。以緩衝塊的方式寫出資料比一個一個寫明顯快多了。
//方案一
/*
long[] l=bq2.take();
DataOutputStream dos = new DataOutputStream(new BufferedOutputStream (new FileOutputStream(new File(tempFolder.getAbsolutePath()+"\\檔案"+i+".txt"))));
for(long a:l) {
dos.writeLong(a);
}
dos.close();
*/
//方案二
class WriteData implements Runnable{
private BlockingQueue<long[]> bq2;
private File tempFolder;//臨時資料夾
private CountDownLatch end;//告訴主執行緒可以繼續執行,主要用於計時
private int num ;//自定義儲存long資料的個數
private long[] longs;
private int count =0;
WriteData(BlockingQueue<long[]> bq2,File tempFolder,CountDownLatch end,int num){
this.bq2=bq2;
this.end=end;
this.tempFolder=tempFolder;
this.num=num;
}
public void run() {
try {
ByteBuffer bb= ByteBuffer.allocate(num*8);
while(true) {
longs=bq2.take();
bb.asLongBuffer().put(longs);
if(longs.length==num) {
try(
FileOutputStream fos = new FileOutputStream(new File(tempFolder.getAbsolutePath()+"\\檔案"+count+".txt"));
FileChannel fc = fos.getChannel();
){
fc.write(bb);
bb.clear();
count++;
}catch (IOException e) {
e.printStackTrace();
}
}else if(longs.length!=0) {
bb.limit(longs.length*8);
try(
FileOutputStream fos = new FileOutputStream(new File(tempFolder.getAbsolutePath()+"\\檔案"+count+".txt"));
FileChannel fc = fos.getChannel();
){
fc.write(bb);
bb.clear();
}catch (IOException e) {
e.printStackTrace();
}
}else {
end.countDown();
break;
}
}
}catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
總結: 通過分析,保證read和write能夠不間斷的佔用磁碟活動,且提高IO速度才是提高程式效能的關鍵。通過監控各個階段的BlockingQueue阻塞情況得出read<parse>sort<write(>表示前者單次迴圈用時大於後者,<反之)。由於parse部分消耗時間比較多,read的部分可能存在堵塞,需要合理設定BlockingQueue的大小。由於read操作比write操作頻繁,兩執行緒會同時爭搶磁碟IO操作,在系統層面排程可能造成不能及時write導致輕微堵塞情況。我考慮是不是利用鎖或者同一個執行緒來控制IO操作,保證write能及時將資料寫出。 同時方案二中還存在記憶體開銷較大的情況,GC次數較多,影響IO效率。如:迴圈中不斷new byte[]和new long[]。這裡需要考慮記憶體複用,減少記憶體開銷。這對整體的效能有很大的影響。除了優化GC次數外,過大的記憶體開銷也會降低磁碟IO的速率(原因本人暫不瞭解,有高人可以指點一下),因此記憶體複用,降低記憶體開銷是必須的。
記憶體複用以及在應用層控制IO:
首先是記憶體複用問題。 在原來的方案中,在迴圈內不停new新的物件,很容易就出現的堆滿異常。因此記憶體複用對效能的優化很重要。 將陣列比作盤子,資料比作食物。如果使用new的方式建立盤子(陣列),食物被吃完了盤子就等GC把它當垃圾回收了。不斷建立和銷燬盤子是消耗效能的。因此將被清洗乾淨的盤子重新利用起來保證記憶體複用是很重要的。具體怎麼實現呢?我的思路是先分析某一種盤子(如byte[]陣列)在程式執行中同一時刻最大的使用量,盤子(如byte[]陣列)的最大使用量為BlockingQueue中的數量加上執行緒中處理的數量。(這樣的話需要對BlockingQueue的大小進行限制,這需要根據程式實際執行情況來設定大小。)因此一開始建立最大需求量的盤子,通過迴圈使用來減少記憶體開銷。 具體實現如下,以read部分程式碼為例:
class ReadData implements Runnable{
private File file;
private BlockingQueue<byte[]> bq;
private int spiltSize;
private LinkedList<byte[]> list;//裝盤子的容器
ReadData(File file,BlockingQueue<byte[]> bq,int spiltSize){
this.spiltSize=spiltSize;
this.file=file;
this.bq=bq;
}
/*
*init()初始化byte[]陣列並存在LinkedList裡面
*而後通過temp =list.remove()從頭取陣列和list.add(temp)從尾加陣列
*這種的方式保證List裡的盤子總數量不變,且第一個被使用的陣列內的資料處理完之前不會複用這個陣列
*其實也可以通過get(index)的方式迴圈取陣列,但是這種方式計算開銷會略大一點
*/
public void init() {//初始化盤子
list = new LinkedList<byte[]>();
for(int i=0;i<30;i++) {
byte[] b = new byte[spiltSize];
list.add(b);
}
}
public void run() {
try(
RandomAccessFile raf = new RandomAccessFile(file,"r");
FileChannel fc = raf.getChannel();
){
ByteBuffer bb= ByteBuffer.allocate(spiltSize);
long startPosition ;
int read;
int k=0;
byte[] temp;
while(true) {
startPosition =(long)k*(long)spiltSize;
read=fc.read(bb, startPosition);
if(read!=spiltSize) {//最後的臨界情況單獨考慮
temp =new byte[read];
bb.flip();
bb.get(temp);
bb.clear();
bq.put(temp);
bq.put(new byte[0]);
break;
}else {
temp =list.remove();//取盤子
bb.flip();
bb.get(temp);
bb.clear();
bq.put(temp);
k++;
list.add(temp); //將盤子放在隊尾,等待複用
}
}
}catch(IOException e) {
e.printStackTrace();
}catch(InterruptedException e1) {
e1.printStackTrace();
}
}
}
這裡有個不足之處,需要根據程式的執行情況,手動設定一個合理的總盤子數。可能需要一個更靈活的方法。 應用層控制IO 由於windows系統層對IO的排程存在不確定性,write程序的IO請求有時候可能會滯後,為保證write程序的資料能及時寫出,考慮採用synchronized方式對read()和write()進行上鎖(切記保證正確性的情況下,synchronized程式碼塊越小越好)。`synchronized(lock){read=fc.read(bb, startPosition);}`和`synchronized(lock){fc.write(bb);}`。執行的結果與不用鎖的情況(優化記憶體開銷後的方案二)比起來略好一點或者說更穩定一些,這可能是硬體和作業系統的問題。按理說增加獲取鎖的開銷,效能應該差一些。(這裡稱之為**雙執行緒有鎖控制IO**和**雙執行緒無鎖控制IO**,這裡是讀和寫執行緒分開) 另一種方式只用一個執行緒來控制所有IO操作。 一種想法是通過**單執行緒無腦執行IO操作**。每一次的read和write作為任務投遞給一個執行緒執行,用一個BlockingQueue裝著。這樣執行緒拿到任務就無腦執行了。當然這裡還有一個問題:因為投遞任務的速度很快,而且read任務和write任務不是一個數量級(20000:1),會造成write任務被read任務阻塞情況出現。由於BlockingQueue一致處於阻塞狀態,執行一個任務加一個任務,write任務一定正常執行,所以效能上並沒有什麼大的影響(總不至於20000次put的都是read任務吧,這概率得多小)。還有一個問題就是結束問題!因為新增read任務的人(主執行緒)不知道read任務的總數是多少,這時候會出現無效的read任務,這裡只能通過read任務的執行情況的反饋來告知主執行緒什麼時候停止任務。
BlockingQueue<Runnable> bq3 = new LinkedBlockingQueue<Runnable>(200);//這queue的大小可以調整
while(true) {
if(rd.read!=-1) {//原始檔讀完後表示讀任務結束
bq3.put(rd);
else {
bq3.remove(rd);//將多餘的無效任務刪除
break;
}
}
但sort完成後投遞write任務,由最後一次write任務讓程式結束執行
while(true) {
longs=bq1.take();
if(longs.length==0) {
bq2.put(new long[0]);
bq3.put(wd);
break;
}else {
Arrays.sort(longs);
bq2.put(longs);
bq3.put(wd);
}
}
}
還有一種**單執行緒有序執行IO任務**,就是執行任務的執行緒一邊執行一邊一天新增任務。因為只有執行任務的執行緒才能第一時間直到任務啥時候結束。當然效率上肯定比無腦執行任務那種方式差一點。
public void run() {
try (
RandomAccessFile raf = new RandomAccessFile(file,"r");
FileChannel fc = raf.getChannel();
){
while(flag) { //flag初始化為true,最後一次寫結束後flag=false
if(read!=-1) {
readData(fc);
if(Rcount%1000==0&&bq2.size()>0) {//迴圈一定次數後再判斷bq2中是否有資料需要寫出
writeData();
}
}else {
writeData();
}
}
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
通過對以上幾種IO控制的方案進行對比(**雙執行緒有鎖控制IO**、**雙執行緒無鎖控制IO**、**單執行緒無腦執行IO操作**、**單執行緒有序執行IO任務**),其中**單執行緒無腦執行IO操作**執行出最快333s,平均IO速率將近80M/每秒。(讀檔案大小18.5G,寫檔案總大小7.45G)。由於電腦硬體老化的緣故,每次執行的結果相差較大,出現幾十秒的差距。因此從程式碼層面來講,**雙執行緒無鎖控制IO**效能也是可以的,只是將IO排程的控制交由作業系統處理了。不過由於執行IO的排程方式不同,導致他們的執行時記憶體佔用也不同。(後續進行更充分的測試補充結論)
總結:
JAVA不同的IO類有不同的特點,根據不同情況進行合理選擇,提高IO速度。在對IO進行優化時,需理解磁碟的物理結構和工作原理,避免走入誤區。(一開始沒找到問題根源所在,亂用多執行緒優化,浪費了大量的時間)每次讀取資料大塊檔案時,以4k的整數倍比較好。 充分利用多執行緒,提高CPU利用率,通過多執行緒進行非同步操作,就像“燒開水泡茶”一樣。 在對資料的處理時,應該儘量減少記憶體的開支。new一個物件是比較消耗效能的,應該儘量複用記憶體,減少GC次數。務必避免大量使用String,這個很佔記憶體。有些計算使用位運算更快。總之就是減少計算,減少記憶體開支,當然正確性是前提。 在保證正確性的前提下,提高程式的健壯性和可讀性也很重要。同時,設計程式首先想著不是如何修改功能,而是如何擴充套件功能。 不足之處: parse的耗時較長,sort的耗時較小,如果將parse部分的計算量勻一部分到sort這樣可以提高CPU的利用率,減少堵塞情況。 記憶體複用的方式不夠好,應該設計一個更靈活的方法。 IO是否還有提升空間?嘗試使用記憶體對映的方式進行寫操作,但是由於寫檔案總大小較大,記憶體佔用(堆外記憶體)上升,IO效能下降。 程式中BlockingQueue的大小設定、每次讀寫檔案的大小對程式效能的影響還沒有明確的一個結論。本文中給出的引數,主要通過本人破電腦測試,選取結果較好的,但不一定就是好的。
本人作為剛入門的新人,第一次寫部落格,不足之處還望各位指正。希望各位大佬指點指點