java分散式簡單實現
案例:文章推薦
論壇進入文章頁面後,顯示一個推薦列表:看過這篇文章的人還看過哪些文章,包含列為文章article、點選數count。
可能有很好很簡單的解決辦法,但是到最後再講。
傳統的方法是:建一張表,欄位有article和user。每點選一次,增加一條記錄。一個大論壇幾天之內記錄數就能達到千萬條。而沒有必要建索引,其他優化的辦法,我還想不到,這樣的查詢別提多慢了。
傳統資料庫解決不了,那麼分散式就該上場了。如果功能特別簡單,完全可以不去使用MAPREDUCE和HBASE,自己動手搞一個吧。
這裡最簡單的實現:資料儲存在txt檔案,用java IO讀寫,for迴圈掃描全表進行篩選,現成的Collections排序。
sql:
SELECT T1.ARTICLE,COUNT(*) C
FROM ATB2 T1 INNER JOIN (SELECT T.USER FROM ATB2 T WHERE T.ARTICLE=888) T2
WHERE T1.USER=T2.USER
AND T1.ARTICLE!=888
GROUP BY T1.ARTICLE
ORDER BY C DESC
LIMIT 10;
先檢視過文章的使用者列表,再查這些使用者看過的文章列表,聚合,排序
package com.src.reader; import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import com.src.entity.ATB; public class DataReader { public static void main(String[] args) throws Exception{ long start = System.currentTimeMillis(); select(888); long end = System.currentTimeMillis(); System.out.println("Select cost time: "+(end-start)/1000.0+" seconds."); } public static void select(int article) throws Exception{ //讀檔案到字串 File file = new File("d://b.txt"); String str = reader1(file); //字串切割為陣列 ATB[] all = converStr2Array1(str); System.out.println("陣列長度為"+all.length); //檢視過文章的使用者列表,去重 Set<Integer> users = getUsersByArticle(article, all); //遍歷使用者列表,查每個使用者看過的文章,去掉引數文章(:先遍歷全表再遍歷使用者列表) List<Integer> articles = getArticlesByUsers(all, users, article); //以文章分類,查每篇文章的總數 Map<Integer,Integer> map = groupBy(articles); //排序 // List<String> result = orderAll(map); List<String> result = limitAndOrder(map, 10); // System.out.println(result); } public static String reader1(File file) throws Exception{ long start = System.currentTimeMillis(); BufferedReader br = new BufferedReader(new FileReader(file)); StringBuffer sb = new StringBuffer(); while(br.ready()){ sb.append(br.readLine()); } br.close(); long end = System.currentTimeMillis(); System.out.println("讀檔案完成,用時"+(end-start)/1000.0+"秒。"); return sb.toString(); } public static ATB[] converStr2Array1(String str){ long start = System.currentTimeMillis(); String[] arr = str.split(";"); System.out.println("字串切割用時"+(System.currentTimeMillis()-start)/1000.0+"秒。"); ATB[] all = new ATB[arr.length]; for(int i=0;i<arr.length;i++){ int article = Integer.parseInt(arr[i].split(",")[0]); int user = Integer.parseInt(arr[i].split(",")[1]); all[i] = new ATB(article,user); } long end = System.currentTimeMillis(); System.out.println("字串轉換為陣列完成,用時"+(end-start)/1000.0+"秒。"); return all; } public static Set<Integer> getUsersByArticle(int article,ATB[] all){ long start = System.currentTimeMillis(); Set<Integer> set = new HashSet<Integer>(); for(ATB a:all){ if(a.getArticle()==article){ set.add(a.getUser()); } } long end = System.currentTimeMillis(); System.out.println("查詢user列表完成,用時"+(end-start)/1000.0+"秒。"); return set; } public static List<Integer> getArticlesByUsers(ATB[] all,Set<Integer> users,int article){ long start = System.currentTimeMillis(); List<Integer> list = new ArrayList<Integer>(); for(ATB a:all){ if(article!=a.getArticle()&&users.contains(a.getUser())){ list.add(a.getArticle()); } } long end = System.currentTimeMillis(); System.out.println("由user列表查詢article列表完成,用時"+(end-start)/1000.0+"秒。"); return list; } public static Map<Integer, Integer> groupBy(List<Integer> list){ long start = System.currentTimeMillis(); Map<Integer,Integer> map = new HashMap<Integer, Integer>(); for(Integer i:list){ if(map.containsKey(i)){ map.put(i, map.get(i)+1); }else{ map.put(i, 1); } } long end = System.currentTimeMillis(); System.out.println("group 完成,用時"+(end-start)/1000.0+"秒。"); return map; } public static List<String> limitAndOrder(Map<Integer,Integer> map,int limit){ //排序辦法:把value排序,取限制條數,去重,遍歷map,由value取key long start = System.currentTimeMillis(); List<String> result = new ArrayList<String>(); List<Integer> values = new ArrayList<Integer>(map.values()); Collections.sort(values,new Comparator<Integer>() { public int compare(Integer i,Integer j){ return (j - i); } }); long end = System.currentTimeMillis(); System.out.println("value排序完成,用時"+(end-start)/1000.0+"秒。"); values = values.subList(0, limit); //去重省略 Iterator<Entry<Integer, Integer>> itr = map.entrySet().iterator(); while(itr.hasNext()){ Map.Entry<Integer, Integer> entry = (Entry<Integer, Integer>) itr.next(); int article = entry.getKey(); int count = entry.getValue(); if(values.contains(count)){ String str = leftFillWith0(String.valueOf(count)) + "," + String.valueOf(article); result.add(str); //由value查到的key可能有多個,一種辦法是在新增前判斷到達長度限制時刪除result列表中count最小的行 //或者再次排序和取限 } } //再次排序和取限 Collections.sort(result, new Comparator<String>() { public int compare(String str1,String str2){ return - str1.compareTo(str2); } }); result = result.subList(0, limit); long end2 = System.currentTimeMillis(); System.out.println("排序和取限完成,總共用時"+(end2-start)/1000.0+"秒。"); return result; } public static List<String> orderAll(Map<Integer,Integer> map){ //排序辦法分兩種:1把value排序,由value取key;2重組字串 long start = System.currentTimeMillis(); List<String> result = new ArrayList<String>(); Iterator<Entry<Integer, Integer>> itr = map.entrySet().iterator(); while(itr.hasNext()){ Map.Entry<Integer, Integer> entry = (Entry<Integer, Integer>) itr.next(); int article = entry.getKey(); int count = entry.getValue(); String str = leftFillWith0(String.valueOf(count)) + "," + String.valueOf(article); result.add(str); } Collections.sort(result, new Comparator<String>() { public int compare(String str1,String str2){ return - str1.compareTo(str2); } }); long end = System.currentTimeMillis(); System.out.println("排序完成,用時"+(end-start)/1000.0+"秒。"); return result; } public static String leftFillWith0(String str){ int length = 8; String s = ""; for(int i=0;i<length-str.length();i++){ s = s + "0"; } return s + str; } } //讀檔案完成,用時0.253秒。 //字串轉換為陣列完成,用時4.054秒。 //陣列長度為543243 //查詢user列表完成,用時0.0080秒。 //由user列表查詢article列表完成,用時0.085秒。 //group 完成,用時0.04秒。 //排序完成,用時0.069秒。 //Select cost time: 4.526 seconds. //讀檔案完成,用時0.25秒。 //字串切割用時0.72秒。 //字串轉換為陣列完成,用時4.634秒。 //陣列長度為543243 //查詢user列表完成,用時0.0090秒。 //由user列表查詢article列表完成,用時0.096秒。 //group 完成,用時0.036秒。 //value排序完成,用時0.034秒。 //排序和取限完成,總共用時0.064秒。 //Select cost time: 5.113 seconds.
這段程式碼很多地方可以優化。
現在用十臺主機作為分散式節點NODE,每臺開啟一個hessian伺服器,提供一個處理資料的介面。一臺主專案MASTER中呼叫這十臺NODE。可以開10個執行緒去呼叫。
假如5000萬條記錄,新增記錄時平均分佈到每個節點,這樣每臺主機有500萬資料。然後儲存在100個文字檔案,每個檔案就是5萬條記錄。
然後再同時開100個甚至更多執行緒,同時處理這100個檔案,把CPU撐到爆。
對於現在這個案例來說,分散式的處理過程是這樣的:
MASTER通過hessian發起請求,只有一個引數article,每個節點接下來做的事情一樣,最終要得到一個列表,如
補0是為方便排序,左邊是count右邊是article。[00000020,9980, 00000020,9731, 00000020,8783, 00000020,8374, 00000018,9908, 00000018,9391, 00000018,8728, 00000018,8725, 00000017,9789, 00000017,9511]
這些節點得到的列表可能存在重複,如9980在節點1裡面查出來點選了20次,在節點2裡面查到點選了19次,這樣所以要在MASTER做一個彙總,話說回來,前面一步是MAP,這一步就是REDUCE。
彙總的過程先是merge,得到以article為key,count為value的一個HashMap,然後是排序order by,然後分頁。
merge和排序的開銷可能又會很大,那還是老辦法,再想辦法分發到各個節點去做。其中排序我想到的方法,同時做分頁的話比較容易,比如取點選量最大的100條,那在每個節點先做排序,取前100條返回到MASTER,然後MASTER給這1000條排序。如果查100-200條,在節點裡面全表排序取前面200條,MASTER要排序的有2000條。依次下去假如每個節點總共查出10000條記錄,分頁在4900-5000的話,每個節點返回給MASTER有5000行,(查9000-10000行可以倒序排列只返回100行),所以這樣下去還不是個完美好辦法。
無所謂,再開執行緒,加節點就是了。
一來,在節點之中查最大100條,可以分給多個執行緒或者節點去做,意思是把10000條記錄分成幾段,查出每一段的前100條,然後彙總。
二來,在10個節點查出各自的100條之後,不會由MASTER全部處理,而是分成5份每份200條傳送到五個節點分別去前100個,然後剩下500條資料,如果資料量大就再加節點。
---------------------------------------------------------------------------------------------------------------------------------------------
(三,查最中間100條的時候,能運用分散式的辦法,是先查出之前的所有資料,比如用一個執行緒查第一個100條,第二個執行緒查第二個100條,全部查出,最後減去這些資料,剩下就不多了。這個方法確實是分散式,但是笨到家了。
四,全表排序如果要運用分散式,還是可以用上面的方法,100條100條的查出來,拼一下。)
---------------------------------------------------------------------------------------------------------------------------------------------
這樣行不通,後來才發現其實分散式排序很簡單。
比如MASTER有1000個數字,根據數字大小,分到10個節點,第一個節點儲存0-100,依次101-200。然後每個節點查出來可以直接合並,這才是達到分散式的效果。
而首先還要做一個數據分佈取樣,以保證每個節點分到的資料量平均。取樣的過程,也很容易分佈化。
擴充套件:
如果節點資料儲存在mysql而不是文字檔案上面,貌似更加方便的很。
節點可以使用記憶體儲存管理資料。
資料異常與備份。異常的處理。
最後,這個案例還有一個辦法,資料表設定兩個欄位,article為唯一主鍵,第二個列記錄所有user的點選數。如果嫌這個字串太大,那就放到檔案裡用java IO讀吧。
這樣article和user都是唯一的。可以建索引。
如果用java做,那就儲存在一個HashMap,article作為key,value也是一個HashMap,記錄user和count。
這種實現,估計是最理想的。
所以,能用資料庫和java做好的,就不要搞分散式了。儘量還是要用傳統的方法。