1. 程式人生 > >一億條資料的排序處理

一億條資料的排序處理

假設場景:

某大型網站,活躍使用者上億個。(當然不是指同時線上人數,這裡指的是再一段時間內有訪問操作的使用者數量,比如一個小時內)。

現在要每隔1小時,統計一次活躍使用者排行榜(使用者點選本網站的一個連線,活躍度就加1,按活躍度進行排名)。

首先,在此場景下,解決此問題不涉及資料庫操作(也不可能使用者點選一下,就更新一下資料庫!),訪問記錄就是記錄在日誌檔案中,例如:

zhangsan, http://a.com/b/

zhangsan, http://a.com/c/

lisi, http://a.com/b/

lisi, http://a.com/e/

lisi, http://a.com/x/

然後,我們不考慮使用者訪問量的統計過程,假設根據日誌檔案已經得出了這樣的檔案:

zhangsan 2

lisi 3

其中,2、3分別表示對應使用者的活躍度,我們要按此進行排序,但是使用者總量有一億個!

接著,我們繼續抽象、簡化。既然活躍度用整數表示,我們就單獨來考慮整數排序的問題,即,使用者名稱也先不考慮了,就處理一億個整數的排序。

先嚐試直接使用TreeSet來排序。

TreeSet底層是紅黑樹實現的,排序是很高效的,這裡不深究,我們就用它來完成排序:

1. 生產測試資料

package com.bebebird.data.handler;

import java.io.File;
import java.io.PrintWriter;
import java.util.Random;

/**
 * 
 * @author sundeveloper
 * 
 * 建立測試資料
 *
 */
public class DataProducer {

	/**
	 * 建立資料
	 * @param count 資料量
	 * @param out 輸出檔案路徑
	 */
	public static void produce(int count, String out) {
		long t1 = System.currentTimeMillis();
		File file = new File(out);
		if(file.exists())
			file.delete();
		
		try (PrintWriter writer = new PrintWriter(file, "UTF-8");) {
			Random random = new Random();
			for(int i=0; i<count; i++){
				writer.write(random.nextInt(count) + "\n");
			}
		}catch (Exception e){
			e.printStackTrace();
		}
		long t2 = System.currentTimeMillis();
		System.out.println("建立成功!耗時:" + (t2 - t1) + "毫秒。");
	}
	
}

呼叫produce()方法,指定資料量和資料輸出路徑,來生產測試資料。

2. 利用TreeSet對資料進行排序:

package com.bebebird.data.handler;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.TreeSet;

/**
 * 
 * @author sundeveloper
 * 
 * 使用TreeSet自動將資料排序
 * 
 * 處理資料量能達到千萬級,一千萬資料排序大約用時20秒。
 *
 */
public class SimpleTreeSetHandler {

	private Integer[] datas = null;
	
	/**
	 * 排序
	 * @param in 資料檔案路徑
	 */
	public void sort(String in){
		long t1 = System.currentTimeMillis();
		File file = new File(in);
		if(!file.exists())
			return;
		
		try(BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(file),"UTF-8"));){
			TreeSet<Integer> set = new TreeSet<>();
			String line = null;
			while((line = reader.readLine()) != null && !"".equals(line)){
				set.add(new Integer(line));
			}
			this.datas = set.toArray(new Integer[set.size()]);
		}catch(Exception e){
			e.printStackTrace();
		}
		long t2 = System.currentTimeMillis();
		
		System.out.println("排序完成!耗時:" + (t2 - t1) + "毫秒。");
	}
	
	/**
	 * 從pos開始,獲取count個數
	 * @param pos
	 * @param count
	 * @return
	 */
	public Integer[] limit(int pos, int count){
		long t1 = System.currentTimeMillis();
		if(pos < 0 || count <= 0){
			return null;
		}
		Integer[] result = new Integer[count];
		for (int i = 0; i < count && pos + i < this.datas.length; i++) {
			result[i] = this.datas[pos + i];
		}
		long t2 = System.currentTimeMillis();
		System.out.println("取數成功!耗時:" + (t2 - t1) + "毫秒。");
		return result;
	}
	
	// 測試:
	// 建立1千萬隨機數,進行排序
	public static void main(String[] args) {
		DataProducer.produce(10000000, "data");
		
		SimpleTreeSetHandler handler = new SimpleTreeSetHandler();
		handler.sort("data");
		
		Integer[] limit = handler.limit(10, 10);
		System.out.println(Arrays.asList(limit));
	}
}

呼叫SimpleTreeSetHandler的sort()方法,指定資料檔案路徑,對其排序。

經測試,直接使用TreeSet來處理,一千萬資料量很輕鬆就能處理,大概排序耗時20秒左右。

但是,一億資料量時就廢了!CPU滿,記憶體佔用上2.5G左右,並且N多分鐘後不出結果,只能結束程序!(有條件的話,可以試試,具體多久能排出來)

機器配置簡要說明:2.5 GHz Intel Core i5,系統記憶體10G。

3. 既然用TreeSet處理一千萬資料很容易,那麼把一億條分成10個一千萬不就能夠處理了?每個一千萬用時20秒,10個一千萬大概200秒,三分鐘拍出來還是可以接受的!(當然,這麼算不準確,但大概是這個數量級的!)

package com.bebebird.data.handler;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/**
 * 
 * @author sundeveloper
 *
 * 將資料進行分成若干片段;
 * 分別對每個片段進行排序,存入臨時檔案;
 * 將臨時檔案進行合併
 *
 */
public class DivideTreeSetHandler {

	/**
	 * 排序
	 * @param in 資料檔案路徑
	 * @param size 每個資料檔案的大小(行數)
	 */
	public List<String> divide(String in, int size){
		long t1 = System.currentTimeMillis();
		File file = new File(in);
		if(!file.exists())
			return null;
		
		List<String> outs = new ArrayList<String>();
		
		try(BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(file),"UTF-8"));){
			int fileNo = 0; // 臨時檔案編號
			Set<Integer> set = new TreeSet<Integer>();
			while(true){
				String line = reader.readLine();
				
				// 讀取結束!
				if(line == null){
					writeSetToTmpFile(set, fileNo, outs);
					break;
				}
				
				// 空行,跳過
				if("".equals(line.trim())){
					continue;
				}
				
				set.add(new Integer(line));
				
				// 資料量達到:
				if(set.size() >= size){
					writeSetToTmpFile(set, fileNo, outs);
					fileNo ++;
				}
			}
			
		}catch(Exception e){
			e.printStackTrace();
		}
		long t2 = System.currentTimeMillis();
		
		System.out.println("拆分完成!耗時:" + (t2 - t1) + "毫秒。");
		
		return outs;
	}
	
	// set資料寫入到檔案中:
	private void writeSetToTmpFile(Set<Integer> set, int fileNo, List<String> outs) {
		long t1 = System.currentTimeMillis();
		File file = new File("tmp_" + fileNo);
		if(file.exists())
			file.delete();
		
		try (PrintWriter writer = new PrintWriter(file, "UTF-8");) {
			Iterator<Integer> iterator = set.iterator();
			while(iterator.hasNext()){
				writer.write(iterator.next() + "\n");
			}
			set.clear();
		}catch (Exception e){
			e.printStackTrace();
		}
		long t2 = System.currentTimeMillis();
		System.out.println("生成臨時檔案:" + file.getAbsolutePath() + "!耗時:" + (t2 - t1) + "毫秒。");
		outs.add(file.getAbsolutePath());
	}
	
	/**
	 * 合併資料
	 * @param ins
	 */
	public String combine(List<String> ins) {
		long t1 = System.currentTimeMillis();
		
		if(ins == null || ins.size() <= 1)
			return null;
		
		File file = new File("tmp");
		if(file.exists())
			file.delete();
		
		try(PrintWriter writer = new PrintWriter(file, "UTF-8");){
			List<BufferedReader> readers = new ArrayList<>();
			for (String in : ins) {
				readers.add(new BufferedReader(new InputStreamReader(new FileInputStream(in),"UTF-8")));
			}
			
			while(readers.size() > 0){
				BufferedReader reader0 = readers.get(0);
				while(true){
					String line = reader0.readLine();
					if(line == null){
						readers.remove(0);
						break;
					}
					if("".equals(line.trim()))
						continue;
					
					// 用個set記錄從多個檔案中取出的資料,這些資料需要繼續排序:
					Set<Integer> set = new TreeSet<Integer>();
					
					int data = new Integer(line);
					
					// 先把data放入set:
					set.add(data);
					
					for(int i = readers.size() - 1; i > 0; i--){
						BufferedReader readeri = readers.get(i);
						while(true){
							// 設定一個標記,如果後邊datai大於data了,需要reset到此處!
							readeri.mark(1024); 
							
							String linei = readeri.readLine();
							if(linei == null){
								readers.remove(i);
								break;
							}
							if("".equals(linei.trim()))
								continue;
							
							int datai = new Integer(linei);
							
							// datai小於data,則把datai放入set,會自動排序
							if(datai < data){
								set.add(datai);
							}
							// datai等於data,則暫時退出,停止讀取
							else if(datai == data){
								break;
							}
							// datai大於data,則往回退一行(退到標記處),停止讀取
							else{
								readeri.reset();
								break;
							}
						}
					}
					
					// 按data查詢,小於data的值,都已經存入set了,此時把set輸出到檔案中:
					Iterator<Integer> iterator = set.iterator();
					while(iterator.hasNext()){
						writer.write(iterator.next() + "\n");
					}
					set.clear();
				}
			}
		}catch(Exception e){
			e.printStackTrace();
		}
		
		long t2 = System.currentTimeMillis();
		System.out.println("合併完成!耗時:" + (t2 - t1) + "毫秒。");
		
		return file.getAbsolutePath();
	}

	/**
	 * 從pos開始,獲取count個數
	 * @param pos
	 * @param count
	 * @return
	 */
	public Integer[] limit(int pos, int count, String in){
		// TODO : 從排序後的檔案中讀取資料即可!不寫了!
		return null;
	}
	
	// 測試:
	public static void main(String[] args) {
		// 資料量:
		int dataCount = 100000000;
		// 分頁數(拆分檔案數):
		int pageCount = 10;
		// 每頁資料量:
		int perPageCount = dataCount / pageCount;
		
		// 生成一億資料:
		DataProducer.produce(dataCount, "data");
		
		DivideTreeSetHandler handler = new DivideTreeSetHandler();
		
		// 拆分排序:
		List<String> tmps = handler.divide("data", perPageCount);
		
		// 合併排序:
		String tmp = handler.combine(tmps);
		
		// 獲取資料:
		Integer[] limit = handler.limit(10, 10, tmp);
	}

}

呼叫DivideTreeSetHandler的divide()方法,指定資料檔案、拆分的每頁放多少資料,將資料拆分。當然,拆分的時候就已經分別使用TreeSet排序了!

呼叫DivideTreeSetHandler的combine()方法,將拆分後的若干個檔案進行合併,合併的過程中同樣也會排序!

最終,輸出一個完全排序了的檔案。

經測試,一億資料量,拆分加合併共用時約3.6分鐘(包含各種IO操作的用時),可以接受。

到這裡,核心問題解決了,剩餘的就是物件排序了,把使用者、活躍度封裝成物件,用TreeSet將物件進行排序,物件實現compareTo,重寫hashcode、equals等等,就不再多說了。

當然,DivideTreeSetHandler的還有很多優化空間,比如,可以把拆分、合併用多執行緒來處理。這裡就先不搞了,有空再說。

說明:

寫程式碼時,並不知道這種排序演算法已經有名字了(叫“歸併排序”),還想著為其命名呢~

實際上,是受到hadoop的map-reduce思想的啟發,想到用這個方法來處理。

思想都是想通的:一個人搞不了了,就要分而治之!

相關推薦

資料排序處理

假設場景: 某大型網站,活躍使用者上億個。(當然不是指同時線上人數,這裡指的是再一段時間內有訪問操作的使用者數量,比如一個小時內)。 現在要每隔1小時,統計一次活躍使用者排行榜(使用者點選本網站的一個連線,活躍度就加1,按活躍度進行排名)。 首先,在此場景下,解決此問題不

維陣列資料處理排序,刪除,插入)

注意點 一:排序後陣列轉移儲存,以便後續操作,比如插入處理 二:熟練掌握陣列的錄入,排序處理 三:掌握障眼法刪減陣列元素 // // main.c // 123 // // Created by utotao on 20

sql分組(orderBy、GroupBy)獲取每組前(幾)資料

sql資料庫實現分組並取每組的前1(幾)條資料 測試資料準備工作: 根據某一個欄位分組取最大(小)值所在行的資料: 建立表並且插入資料 CREATE table Test_orderByOrGroupBy_tb(Name nvarchar(50),Val int,Describe n

java 使用jdbc向mysql資料庫中插入1資料

<span style="font-size:14px;"><span style="font-size:14px;">package com.ddx.zhang; import java.sql.SQLException; import java

使用hbase來解決上資料的準實時響應

使用hbase來解決億級資料的準實時響應 專案中的app行為日誌,使用者授權收集的通訊錄、通話記錄、簡訊和聯絡人資訊,隨著時間的推進,資料量進入億資料級,千萬級的建立索引,來加快查詢速度的優化方式,此時可能已經不起作用了。為解決信審階段實時的查詢請求,引入hbase來解決響應

net.sz.framework 框架 ORM 消消樂超過資料排行榜分析 天王蓋地虎

序言 天王蓋地虎, 老婆馬上生孩子了,在家待產,老婆喜歡玩消消樂類似的休閒遊戲,閒置狀態,無聊的分析一下消消樂遊戲的一些技術問題; 由於我主要是伺服器研發,客戶端屬於半吊子,所以就分析一下消消樂排行榜問題; 第一章 消消樂排行榜大致分為好友排行榜和全國排行榜; 好友排行榜和全國排行榜的其實是重合的只是需要

次列表載入超資料優化

Android中列表是每個應用都會有的UI效果,而與使用者的互動無非就是使用者上下滑動、左右滑動、點選item 等等,本文就從小編遇到一次載入大量資料而影響體驗優化之旅。 專案的列表採用RecycleView + BaseMultiItemQuickAdapter 分組效果,資料量100

關於mybatis多表查詢只查詢部分欄位,而丟失資料問題

今天在寫一個list資料按某欄位排序問題時遇到了一個問題,就是有很多個數據這個欄位一樣時,只會查出來一個。 如下 頁面,5條資料只查出來3條資料 基礎資訊和接單什麼資訊的分成了2個表,根據接單數排名 select e.engineer_name,

有一個擁有1資料的表,只需要保留其中的5,其他刪除,如何做?

DELETE語句可以通過WHERE對要刪除的記錄進行選擇。而使用TRUNCATE TABLE將刪除表中的所有記錄。因此,DELETE語句更靈活。如果DELETE不加WHERE子句, DELETE可以返回被刪除的記錄數,而TRUNCATE TABLE返回的是0。如果一個表中有自增欄位,使用TRUNCATE T

2018上半年約26資料洩露

網路威脅情報公司Risk Based Security的一份報告顯示,在2018年上半年,2308起資料洩露事件被公開披露,約26億條使用者記錄被曝光。據該公司的“2018年中資料洩露QuickView”報告資料,我們可以看到與2017年上半年報告的2439起資料洩露事件

Sqoop分批匯入Mysql上資料的表到HDFS

因資料量過大,執行sqoop跑不動或者卡記憶體,於是通過寫指令碼分批匯入到HDFS,然後再載入到Hive表中。 shell指令碼如下: #!/bin/bash source /etc/profi

10long型資料外部排序的檔案分割實踐及優化過程(JAVA)

 一、題目     生成10億個long隨機數正整數,把它寫入一個檔案裡。然後實現一個函式 fetch(int k,int n)。(fetch函式的輸出結果是這10億個正整數中從小到大中第k個開始(不包含第k個)往後取n個數。)     給定記憶體為2G(一開始為1G)。

處理分頁 當前頁>1時, 操作的最後頁的最後資料後,向前提前

* handleAgentJobs({ payload }, { call, put }) { const data = yield call(handleAgentJob, payload) if (data && data.code === 200) { yiel

資料演算法-Hadoop/Spark大資料處理技巧》讀書筆記()——二次排序

寫在前面: 在做直播的時候有同學問Spark不是用Scala語言作為開發語言麼,的確是的,從網上查資料的話也會看到大把大把的用Scala編寫的Spark程式,但是仔細看就會發現這些用Scala寫的文章

js對json資料處理,將同一省裡的多資料合併為資料

test:function(){    var arr =[        {pName:'內蒙古',pId:'1',cName:'內1',cId:'11'},        {pName:'內蒙古',pId:'1',cName:'內2',cId:'12'},       

mysql 先排序分組 取組裡面最新資料

最近又遇到這個問題了,不知道是不是mysql 的bug.  一般寫sql  先排序在分組取最新的一條資料 不外乎  SELECT p.* FROM (SELECT * FROM sys_message ORDER BY id DESC  )p GROUP BY  p.m

mysql 取left join表中最近時間的資料

要求:根據狀態,最新編輯時間排序 狀態為表1中的使用者狀態,最新編輯時間為表2中最後一次編輯時間 表1為users, 表2為opt_user_log 例: SELECT u.id, u.user_name, u.last_upd_time AS audit_time, u.s

Python3 向Bmob後臺提交資料的範例

踩了不少坑之後終於得到了這個標準的範例 大家可以拿來做參考,避免踩坑了 Haha python2只需要稍微改一下引用就可以了 import urllib.request import http.client import json test_data = {'C

oracle中根據時間獲取最新的資料

1、select kd.CREATEUSERID as userid,kd.LOCATION,kd.createtime as location from KT_DEVICESTRACK kd where rownum=1 order by kd.createtime 2、SELECT *

文詳解大規模資料計算處理原理及操作重點

摘要: 大資料技術主要針對的是大規模資料的計算處理問題,那麼要想解決的這一問題,首先要解決的就是大規模資料的儲存問題。 一、RAID技術 大資料技術主要針對的是大規模資料的計算處理問題,那麼要想解決的這一問題,首先要解決的就是大規模資料的儲存問題。大規模資料儲存要解決的核心問題有三個方面: