1. 程式人生 > >java分散式簡單實現

java分散式簡單實現

案例:文章推薦

論壇進入文章頁面後,顯示一個推薦列表:看過這篇文章的人還看過哪些文章,包含列為文章article、點選數count。

可能有很好很簡單的解決辦法,但是到最後再講。

傳統的方法是:建一張表,欄位有article和user。每點選一次,增加一條記錄。一個大論壇幾天之內記錄數就能達到千萬條。而沒有必要建索引,其他優化的辦法,我還想不到,這樣的查詢別提多慢了。

傳統資料庫解決不了,那麼分散式就該上場了。如果功能特別簡單,完全可以不去使用MAPREDUCE和HBASE,自己動手搞一個吧。

這裡最簡單的實現:資料儲存在txt檔案,用java IO讀寫,for迴圈掃描全表進行篩選,現成的Collections排序。

sql:

SELECT T1.ARTICLE,COUNT(*) C  
	FROM ATB2 T1 INNER JOIN (SELECT T.USER FROM ATB2 T WHERE T.ARTICLE=888) T2
	WHERE T1.USER=T2.USER
	AND T1.ARTICLE!=888
	GROUP BY T1.ARTICLE
	ORDER BY C DESC
	LIMIT 10;

先檢視過文章的使用者列表,再查這些使用者看過的文章列表,聚合,排序
package com.src.reader;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import com.src.entity.ATB;

public class DataReader {
	public static void main(String[] args) throws Exception{
		long start = System.currentTimeMillis();
		select(888);
		long end = System.currentTimeMillis();
		System.out.println("Select cost time: "+(end-start)/1000.0+" seconds.");
	}
	public static void select(int article) throws Exception{
		//讀檔案到字串
		File file = new File("d://b.txt");
		String str = reader1(file);
		//字串切割為陣列
		ATB[] all = converStr2Array1(str);
		System.out.println("陣列長度為"+all.length);
		//檢視過文章的使用者列表,去重
		Set<Integer> users = getUsersByArticle(article, all);
		//遍歷使用者列表,查每個使用者看過的文章,去掉引數文章(:先遍歷全表再遍歷使用者列表)
		List<Integer> articles = getArticlesByUsers(all, users, article);
		//以文章分類,查每篇文章的總數
		Map<Integer,Integer> map = groupBy(articles);
		//排序
//		List<String> result = orderAll(map);
		List<String> result = limitAndOrder(map, 10);
//		System.out.println(result);
	}
	public static String reader1(File file) throws Exception{
		long start = System.currentTimeMillis();
		BufferedReader br = new BufferedReader(new FileReader(file));
		StringBuffer sb = new StringBuffer();
		while(br.ready()){
			sb.append(br.readLine());
		}
		br.close();
		long end = System.currentTimeMillis();
		System.out.println("讀檔案完成,用時"+(end-start)/1000.0+"秒。");
		return sb.toString();
	}
	
	public static ATB[] converStr2Array1(String str){
		long start = System.currentTimeMillis();
		String[] arr = str.split(";");
		System.out.println("字串切割用時"+(System.currentTimeMillis()-start)/1000.0+"秒。");
		ATB[] all = new ATB[arr.length];
		for(int i=0;i<arr.length;i++){
			int article = Integer.parseInt(arr[i].split(",")[0]);
			int user = Integer.parseInt(arr[i].split(",")[1]);
			all[i] = new ATB(article,user);
		}
		long end = System.currentTimeMillis();
		System.out.println("字串轉換為陣列完成,用時"+(end-start)/1000.0+"秒。");
		return all;
	}
	
	public static Set<Integer> getUsersByArticle(int article,ATB[] all){
		long start = System.currentTimeMillis();
		Set<Integer> set = new HashSet<Integer>();
		for(ATB a:all){
			if(a.getArticle()==article){
				set.add(a.getUser());
			}
		}
		long end = System.currentTimeMillis();
		System.out.println("查詢user列表完成,用時"+(end-start)/1000.0+"秒。");
		return set;
	}
	
	public static List<Integer> getArticlesByUsers(ATB[] all,Set<Integer> users,int article){
		long start = System.currentTimeMillis();
		List<Integer> list = new ArrayList<Integer>();
		for(ATB a:all){
			if(article!=a.getArticle()&&users.contains(a.getUser())){
				list.add(a.getArticle());
			}
		}
		long end = System.currentTimeMillis();
		System.out.println("由user列表查詢article列表完成,用時"+(end-start)/1000.0+"秒。");
		return list;
	}

	public static Map<Integer, Integer> groupBy(List<Integer> list){
		long start = System.currentTimeMillis();
		Map<Integer,Integer> map = new HashMap<Integer, Integer>();
		for(Integer i:list){
			if(map.containsKey(i)){
				map.put(i, map.get(i)+1);
			}else{
				map.put(i, 1);
			}
		}
		long end = System.currentTimeMillis();
		System.out.println("group 完成,用時"+(end-start)/1000.0+"秒。");
		return map;
	}
	
	public static List<String> limitAndOrder(Map<Integer,Integer> map,int limit){
		//排序辦法:把value排序,取限制條數,去重,遍歷map,由value取key
		long start = System.currentTimeMillis();
		List<String> result = new ArrayList<String>();
		List<Integer> values = new ArrayList<Integer>(map.values());
		Collections.sort(values,new Comparator<Integer>() {
			public int compare(Integer i,Integer j){
				return (j - i);
			}
		});
		long end = System.currentTimeMillis();
		System.out.println("value排序完成,用時"+(end-start)/1000.0+"秒。");
		values = values.subList(0, limit);
		//去重省略
		Iterator<Entry<Integer, Integer>> itr = map.entrySet().iterator();
		while(itr.hasNext()){
			Map.Entry<Integer, Integer> entry = (Entry<Integer, Integer>) itr.next();
			int article = entry.getKey();
			int count = entry.getValue();
			if(values.contains(count)){
				String str = leftFillWith0(String.valueOf(count)) + "," + String.valueOf(article);
				result.add(str);
				//由value查到的key可能有多個,一種辦法是在新增前判斷到達長度限制時刪除result列表中count最小的行
				//或者再次排序和取限
			}
		}
		//再次排序和取限
		Collections.sort(result, new Comparator<String>() {
			public int compare(String str1,String str2){
				return - str1.compareTo(str2);
			}
		});
		result = result.subList(0, limit);
		long end2 = System.currentTimeMillis();
		System.out.println("排序和取限完成,總共用時"+(end2-start)/1000.0+"秒。");
		return result;
	}
	public static List<String> orderAll(Map<Integer,Integer> map){
		//排序辦法分兩種:1把value排序,由value取key;2重組字串
		long start = System.currentTimeMillis();
		List<String> result = new ArrayList<String>();
		Iterator<Entry<Integer, Integer>> itr = map.entrySet().iterator();
		while(itr.hasNext()){
			Map.Entry<Integer, Integer> entry = (Entry<Integer, Integer>) itr.next();
			int article = entry.getKey();
			int count = entry.getValue();
			String str = leftFillWith0(String.valueOf(count)) + "," + String.valueOf(article);
			result.add(str);
		}
		Collections.sort(result, new Comparator<String>() {
			public int compare(String str1,String str2){
				return - str1.compareTo(str2);
			}
		});
		long end = System.currentTimeMillis();
		System.out.println("排序完成,用時"+(end-start)/1000.0+"秒。");
		return result;
	}
	public static String leftFillWith0(String str){
		int length = 8;
		String s = "";
		for(int i=0;i<length-str.length();i++){
			s = s + "0";
		}
		return s + str;
	}
}
//讀檔案完成,用時0.253秒。
//字串轉換為陣列完成,用時4.054秒。
//陣列長度為543243
//查詢user列表完成,用時0.0080秒。
//由user列表查詢article列表完成,用時0.085秒。
//group 完成,用時0.04秒。
//排序完成,用時0.069秒。
//Select cost time: 4.526 seconds.

//讀檔案完成,用時0.25秒。
//字串切割用時0.72秒。
//字串轉換為陣列完成,用時4.634秒。
//陣列長度為543243
//查詢user列表完成,用時0.0090秒。
//由user列表查詢article列表完成,用時0.096秒。
//group 完成,用時0.036秒。
//value排序完成,用時0.034秒。
//排序和取限完成,總共用時0.064秒。
//Select cost time: 5.113 seconds.

這段程式碼很多地方可以優化。

現在用十臺主機作為分散式節點NODE,每臺開啟一個hessian伺服器,提供一個處理資料的介面。一臺主專案MASTER中呼叫這十臺NODE。可以開10個執行緒去呼叫。

假如5000萬條記錄,新增記錄時平均分佈到每個節點,這樣每臺主機有500萬資料。然後儲存在100個文字檔案,每個檔案就是5萬條記錄。

然後再同時開100個甚至更多執行緒,同時處理這100個檔案,把CPU撐到爆。

對於現在這個案例來說,分散式的處理過程是這樣的:

MASTER通過hessian發起請求,只有一個引數article,每個節點接下來做的事情一樣,最終要得到一個列表,如

[00000020,9980, 00000020,9731, 00000020,8783, 00000020,8374, 00000018,9908, 00000018,9391, 00000018,8728, 00000018,8725, 00000017,9789, 00000017,9511]
補0是為方便排序,左邊是count右邊是article。

這些節點得到的列表可能存在重複,如9980在節點1裡面查出來點選了20次,在節點2裡面查到點選了19次,這樣所以要在MASTER做一個彙總,話說回來,前面一步是MAP,這一步就是REDUCE。

彙總的過程先是merge,得到以article為key,count為value的一個HashMap,然後是排序order by,然後分頁。

merge和排序的開銷可能又會很大,那還是老辦法,再想辦法分發到各個節點去做。其中排序我想到的方法,同時做分頁的話比較容易,比如取點選量最大的100條,那在每個節點先做排序,取前100條返回到MASTER,然後MASTER給這1000條排序。如果查100-200條,在節點裡面全表排序取前面200條,MASTER要排序的有2000條。依次下去假如每個節點總共查出10000條記錄,分頁在4900-5000的話,每個節點返回給MASTER有5000行,(查9000-10000行可以倒序排列只返回100行),所以這樣下去還不是個完美好辦法。

無所謂,再開執行緒,加節點就是了。

一來,在節點之中查最大100條,可以分給多個執行緒或者節點去做,意思是把10000條記錄分成幾段,查出每一段的前100條,然後彙總。

二來,在10個節點查出各自的100條之後,不會由MASTER全部處理,而是分成5份每份200條傳送到五個節點分別去前100個,然後剩下500條資料,如果資料量大就再加節點。

---------------------------------------------------------------------------------------------------------------------------------------------

(三,查最中間100條的時候,能運用分散式的辦法,是先查出之前的所有資料,比如用一個執行緒查第一個100條,第二個執行緒查第二個100條,全部查出,最後減去這些資料,剩下就不多了。這個方法確實是分散式,但是笨到家了。

四,全表排序如果要運用分散式,還是可以用上面的方法,100條100條的查出來,拼一下。)

---------------------------------------------------------------------------------------------------------------------------------------------

這樣行不通,後來才發現其實分散式排序很簡單。

比如MASTER有1000個數字,根據數字大小,分到10個節點,第一個節點儲存0-100,依次101-200。然後每個節點查出來可以直接合並,這才是達到分散式的效果。

而首先還要做一個數據分佈取樣,以保證每個節點分到的資料量平均。取樣的過程,也很容易分佈化。

擴充套件:

如果節點資料儲存在mysql而不是文字檔案上面,貌似更加方便的很。

節點可以使用記憶體儲存管理資料。

資料異常與備份。異常的處理。

最後,這個案例還有一個辦法,資料表設定兩個欄位,article為唯一主鍵,第二個列記錄所有user的點選數。如果嫌這個字串太大,那就放到檔案裡用java IO讀吧。

這樣article和user都是唯一的。可以建索引。

如果用java做,那就儲存在一個HashMap,article作為key,value也是一個HashMap,記錄user和count。

這種實現,估計是最理想的。

所以,能用資料庫和java做好的,就不要搞分散式了。儘量還是要用傳統的方法。