1. 程式人生 > >ElasticSearch大批量資料入庫

ElasticSearch大批量資料入庫

最近著手處理大批量資料的任務。

現狀是這樣的,一個數據採集程式承載大批量資料的儲存和檢索。後期可能需要對大批量資料進行統計。

資料分佈情況

13個點定時生成採集結果到4個檔案(小檔案生成周期是5分鐘)

名稱                                                 大小(b)
gather_1_2014-02-27-14-50-0.txt                      568497
gather_1_2014-02-27-14-50-1.txt                      568665
gather_1_2014-02-27-14-50-2.txt                      568172
gather_1_2014-02-27-14-50-3.txt                      568275

同步使用shell指令碼對四個檔案入到sybase_iq庫的一張表tab_tmp_2014_2_27中.

每天資料量大概是3億條,所以小檔案的總量大概是3G。小檔案數量大,單表容量大執行復合主鍵查詢,由原來2s延時變成了,5~10分鐘。

針對上述情況需要對目前的儲存結構進行優化。

才是看了下相關係統 catior使用的是環狀資料庫,儲存相關的資料優點方便生成MRTG圖,缺點不利於資料統計。後來引入elasticsearch來對大資料檢索進行優化。

測試平臺

cpu: AMD Opteron(tm) Processor 6136 64bit 2.4GHz   * 32
記憶體: 64G
硬碟:1.5T
作業系統:Red Hat Enterprise Linux Server release 6.4 (Santiago)

讀取檔案的目錄結構:

[[email protected] data]$ ls
0  1  2  3

 簡單測試程式碼:

public class FileReader
{

	private File file;
	private String splitCharactor;
	private Map<String, Class<?>> colNames;
	private static final Logger LOG = Logger.getLogger(FileReader.class);

	/**
	 * @param path
	 *            檔案路徑
	 * @param fileName
	 *            檔名
	 * @param splitCharactor
	 *            拆分字元
	 * @param colNames
	 *            主鍵名稱
	 */
	public FileReader(File file, String splitCharactor, Map<String, Class<?>> colNames)
	{
		this.file = file;
		this.splitCharactor = splitCharactor;
		this.colNames = colNames;
	}

	/**
	 * 讀取檔案
	 * 
	 * @return
	 * @throws Exception
	 */
	public List<Map<String, Object>> readFile() throws Exception
	{
		List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
		if (!file.isFile())
		{
			throw new Exception("File not exists." + file.getName());
		}
		LineIterator lineIterator = null;
		try
		{
			lineIterator = FileUtils.lineIterator(file, "UTF-8");
			while (lineIterator.hasNext())
			{
				String line = lineIterator.next();
				String[] values = line.split(splitCharactor);
				if (colNames.size() != values.length)
				{
					continue;
				}
				Map<String, Object> map = new HashMap<String, Object>();
				Iterator<Entry<String, Class<?>>> iterator = colNames.entrySet()
						.iterator();
				int count = 0;
				while (iterator.hasNext())
				{
					Entry<String, Class<?>> entry = iterator.next();
					Object value = values[count];
					if (!String.class.equals(entry.getValue()))
					{
						value = entry.getValue().getMethod("valueOf", String.class)
								.invoke(null, value);
					}
					map.put(entry.getKey(), value);
					count++;
				}
				list.add(map);
			}
		}
		catch (IOException e)
		{
			LOG.error("File reading line error." + e.toString(), e);
		}
		finally
		{
			LineIterator.closeQuietly(lineIterator);
		}
		return list;
	}
}
public class StreamIntoEs
{

	public static class ChildThread extends Thread
	{

		int number;

		public ChildThread(int number)
		{
			this.number = number;
		}

		@Override
		public void run()
		{
			Settings settings = ImmutableSettings.settingsBuilder()
					.put("client.transport.sniff", true)
					.put("client.transport.ping_timeout", 100)
					.put("cluster.name", "elasticsearch").build();
			TransportClient client = new TransportClient(settings)
					.addTransportAddress(new InetSocketTransportAddress("192.168.32.228",
							9300));
			File dir = new File("/export/home/es/data/" + number);
			LinkedHashMap<String, Class<?>> colNames = new LinkedHashMap<String, Class<?>>();
			colNames.put("aa", Long.class);
			colNames.put("bb", String.class);
			colNames.put("cc", String.class);
			colNames.put("dd", Integer.class);
			colNames.put("ee", Long.class);
			colNames.put("ff", Long.class);
			colNames.put("hh", Long.class);
			int count = 0;
			long startTime = System.currentTimeMillis();
			for (File file : dir.listFiles())
			{
				int currentCount = 0;
				long startCurrentTime = System.currentTimeMillis();
				FileReader reader = new FileReader(file, "\\$", colNames);
				BulkResponse resp = null;
				<strong>BulkRequestBuilder bulkRequest = client.prepareBulk();</strong>
				try
				{
					List<Map<String, Object>> results = reader.readFile();
					for (Map<String, Object> col : results)
					{
						bulkRequest.add(client.prepareIndex("flux", "fluxdata")
								.setSource(JSON.toJSONString(col)).setId(col.get("getway")+"##"+col.get("port_info")+"##"+col.get("device_id")+"##"+col.get("collecttime")));
						count++;
						currentCount++;
					}
					resp = bulkRequest.execute().actionGet();
				}
				catch (Exception e)
				{
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
				long endCurrentTime = System.currentTimeMillis();
				System.out.println("[thread-" + number + "-]per count:" + currentCount);
				System.out.println("[thread-" + number + "-]per time:"
						+ (endCurrentTime - startCurrentTime));
				System.out.println("[thread-" + number + "-]per count/s:"
						+ (float) currentCount / (endCurrentTime - startCurrentTime)
						* 1000);
				System.out.println("[thread-" + number + "-]per count/s:"
						+ resp.toString());
			}
			long endTime = System.currentTimeMillis();
			System.out.println("[thread-" + number + "-]total count:" + count);
			System.out.println("[thread-" + number + "-]total time:"
					+ (endTime - startTime));
			System.out.println("[thread-" + number + "-]total count/s:" + (float) count
					/ (endTime - startTime) * 1000);
			// IndexRequest request =
			// = client.index(request);
		}
	}

	public static void main(String args[])
	{
		for (int i = 0; i < 4; i++)
		{
			ChildThread childThread = new ChildThread(i);
			childThread.start();
		}
	}
}

 起了4個執行緒來做入庫,每個檔案解析完成進行一次批處理。

初始化指令碼:

curl -XDELETE 'http://192.168.32.228:9200/twitter/'
curl -XPUT 'http://192.168.32.228:9200/twitter/' -d '
{
     "index" :{
          "number_of_shards" : 5,
          "number_of_replicas ": 0,
          <strong>"index.refresh_interval": "-1",
         "index.translog.flush_threshold_ops": "100000"</strong>
     }
}'
curl -XPUT 'http://192.168.32.228:9200/twiter/twiterdata/_mapping' -d '
{
             "<span style="font-size: 1em; line-height: 1.5;">twiterdata</span><span style="font-size: 1em; line-height: 1.5;">": {</span>
                    "aa" : {"type" : "long", "index" : "not_analyzed"},
                    "bb" : {"type" : "String", "index" : "not_analyzed"},
                    "cc" : {"type" : "String", "index" : "not_analyzed"},
                    "dd" : {"type" : "integer", "index" : "not_analyzed"},
                    "ee" : {"type" : "long", "index" : "no"},
                    "ff" : {"type" : "long", "index" : "no"},
                    "gg" : {"type" : "long", "index" : "no"},
                    "hh" : {"type" : "long", "index" : "no"},
                    "ii" : {"type" : "long", "index" : "no"},
                    "jj" : {"type" : "long", "index" : "no"},
                    "kk" : {"type" : "long", "index" : "no"},
                }
}

 執行效率參考:

不開啟refresh_interval
[[email protected] bin]$ more StreamIntoEs.out|grep total
[thread-2-]total count:1199411
[thread-2-]total time:1223718
[thread-2-]total count/s:980.1368
[thread-1-]total count:1447214
[thread-1-]total time:1393528
[thread-1-]total count/s:1038.5253
[thread-0-]total count:1508043
[thread-0-]total time:1430167
[thread-0-]total count/s:1054.4524
[thread-3-]total count:1650576
[thread-3-]total time:1471103
[thread-3-]total count/s:1121.9989
4195.1134

開啟refresh_interval
[[email protected] bin]$ more StreamIntoEs.out |grep total
[thread-2-]total count:1199411
[thread-2-]total time:996111
[thread-2-]total count/s:1204.0938
[thread-1-]total count:1447214
[thread-1-]total time:1163207
[thread-1-]total count/s:1244.1586
[thread-0-]total count:1508043
[thread-0-]total time:1202682
[thread-0-]total count/s:1253.9
[thread-3-]total count:1650576
[thread-3-]total time:1236239
[thread-3-]total count/s:1335.1593
5037.3117

開啟refresh_interval  欄位型別轉換
[[email protected] bin]$ more StreamIntoEs.out |grep total
[thread-2-]total count:1199411
[thread-2-]total time:1065229
[thread-2-]total count/s:1125.9653
[thread-1-]total count:1447214
[thread-1-]total time:1218342
[thread-1-]total count/s:1187.8552
[thread-0-]total count:1508043
[thread-0-]total time:1230474
[thread-0-]total count/s:1225.5789
[thread-3-]total count:1650576
[thread-3-]total time:1274027
[thread-3-]total count/s:1295.5581
4834.9575

開啟refresh_interval  欄位型別轉換 設定id
[thread-2-]total count:1199411
[thread-2-]total time:912251
[thread-2-]total count/s:1314.7817
[thread-1-]total count:1447214
[thread-1-]total time:1067117
[thread-1-]total count/s:1356.1906
[thread-0-]total count:1508043
[thread-0-]total time:1090577
[thread-0-]total count/s:1382.7937
[thread-3-]total count:1650576
[thread-3-]total time:1128490
[thread-3-]total count/s:1462.6412
5516.4072

 580M的資料平均用時大概是20分鐘。索引檔案大約為1.76G

 相關測試結果可以參考這裡:

相關推薦

ElasticSearch批量資料入庫

最近著手處理大批量資料的任務。 現狀是這樣的,一個數據採集程式承載大批量資料的儲存和檢索。後期可能需要對大批量資料進行統計。 資料分佈情況 13個點定時生成採集結果到4個檔案(小檔案生成周期是5分鐘) 名稱 大

快速傳輸批量資料(tar+lz4+pv+ssh)

快速傳輸大批量資料(tar+lz4+pv+ssh) 伺服器之間傳輸資料平時常使用的命令如scp、rsync這兩個,一些小的檔案或目錄這兩個命令足以搞定,但是一旦資料幾十上百G,這樣傳輸就有些慢了。 前兩天做遠端資料傳輸的時候,用scp命令始終感覺有點慢,就google了一下,發現了一

高德地圖批量資料(上萬)畫歷史軌跡實現方案

轉載請註明出處:https://www.cnblogs.com/Joanna-Yan/p/9896180.html  需求:裝置傳回伺服器的軌跡點,需要在web地圖上顯示。包括畫座標點覆蓋物、軌跡路線圖。當資料量達到一定量時,介面出現卡頓。問題出現幾天前端人員都未解決。 第一反應,大量的覆蓋物肯

MySQL刪除批量資料

1.刪除大表的部分資料 一個表有1億6000萬的資料,有一個自增ID。最大值就是1億6000萬,需要刪除大於250萬以後的資料,有什麼辦法可以快速刪除? 看到mysql文件有一種解決方案:http://dev.mysql.com/doc/refman/5.0/en/delete.html  

Java 匯出批量資料excel(百萬級)(轉載)

參考資料:http://bbs.51cto.com/thread-1074293-1-1.html                 http://bbs.51cto.com/viewthread.php?tid=1074

oracle 匯入批量資料的一些坑

匯入 1.2g的dmp檔案 時候 發現報錯 oracle 的1658 意思 是 你表空間的容量 不夠了 有幾種可能性: 1: dbf檔案 所在的磁碟檔案不夠了 2: 表空間沒有設定自增 第一種情況 自行查詢 第二種情況 可以先用 SELECT FIL

[樂意黎]MySQL使用事務及 PDO 插入批量資料

<?php $serverIP = "127.0.0.1"; $userName= "user"; $passWord= "pass"; $dbname = "aerchi"; try { $pdo = new PDO("mysql:host=$serverIP;dbname=

如何解決本地批量資料的更新,和後臺的同步,講解socket的IPC和socket的通訊

說這個問題首先我先說下這個業務的使用場景。隨著網際網路的發展進入了下半場,有以前的app大而且多的局面滿滿的走向精而細的劃分,每一個app的如何基於大資料統計使用者行為是衡量一款產品的優劣標準之一,因為這些資料驅動老闆、產品、市場、運營的業務決策,深度瞭解你的使用者行為,評估

AWK命令生成批量資料並記錄到文字檔案中

AWK命令生成大批量資料並記錄到文字檔案 示例: #!/usr/bin/ksh startnum=0 endnum=100 awk ‘BEGIN{ for (j = ‘1’; j <= ‘900000’ ; ++j) { printf “10%08d|99

ORACLE資料庫更新批量資料案例

更新大批量資料的背景: 使用者需要將VIP的微信標識,傳給使用者的ERP會員檔案中,已知存量資料約50W行資料,線下的微信標識資料我們開發提供了openid和erpid的csv檔案,erpid和線下的會員檔案id對應,需要將openid也更新到會員檔案裡。

C# SqlBulkCopy類批量資料存入資料庫

最近因為要做資料效能優化,原有的資料通過foreach迴圈插入資料庫,由於每迴圈一次就要和資料庫互動一次,效能非常差,用時很長,在網上查閱了一些資料找到了SqlBulkCopy這個類,在原有的資料基礎上進行改造,效能非常高。在此將自己的一些使用心得和大家分享: private voi

shell指令碼刪除線上MySQL批量資料

【需求】 有時線上會有這種需求: 將A表中id欄位等於B表id欄位的記錄刪掉,A表和B表資料分佈在不同例項的不同庫裡,且資料量很大。 【解決辦法】 將B表的id欄位從備庫匯出,select into outfile 在A表所在例項test庫建立臨時表tmp_id,匯入資料,

資料庫插入或者更新批量資料的效能優化

1、一條SQL語句插入多條資料2、在事務中進行插入處理。3、資料有序插入。而使用合併資料+事務+有序資料的方式在資料量達到千萬級以上表現依舊是良好,在資料量較大時,有序資料索引定位較為方便,不需要頻繁對磁碟進行讀寫操作,所以可以維持較高的效能。

mysql批量資料插入技巧

轉自:https://jingyan.baidu.com/album/95c9d20d61b01dec4f75615a.html?picindex=6 首先我是簡單的寫了一個mysql的迴圈插入資料的SP,具體如下: 這是插入100W資料的過程和結果,可

MySQL插入批量資料是報錯“The total number of locks exceeds the lock table size”的解決辦法

事情的原因是:我執行了一個load into語句的SQL將一個很大的檔案匯入到我的MySQL資料庫中,執行了一段時間後報錯“The total number of locks exceeds the lock table size”。 首先使用命令 show variables like '%storage

Spring+Hibernate處理批量資料

原文:http://blog.csdn.net/ye1992/article/details/9291237 關於使用spring+hibernate進行大批量資料的插入和更新,它的效能和使用JDBC  PreparedStatement的batch批量操作以及資料庫的儲

mysql匯入批量資料出現MySQL server has gone away的解決方法

因工作需要,需要匯入一個200M左右的sql到user庫 執行命令 mysql> use user Database changed mysql> source /tmp/user.

SQL Server從讀寫頻繁的表中刪除批量資料

如果我們直接用delete from語句來刪除讀寫頻繁的大表中的資料,很有可能會因為where的條件是全表掃描從而導致整個表被鎖住了。如果該表是讀寫頻繁的生產庫那簡直就是一場災難,所有的線上讀寫請求都會因為表被鎖而超時。 如果不想如此糟糕的情況發生,我們可以先分析一下導致表被鎖住的原因。既然全表掃描會會導致

C# 多執行緒+佇列處理批量資料,進而縮短處理時間

public void DealData(){                int pageSize = 200; //建立佇列                         var queue = new MessageQueueManager<Model>

php讀取檔案使用redis的pipeline匯入批量資料

第一次寫部落格,哈哈,純屬用來記錄一下自己工作中遇到的問題及解決辦法。昨天因為工作的需求,需要做一個後臺上傳TXT檔案,讀取其中的內容,然後匯入redis庫中。要求速度快,並且支援至少10W以上的資料,而內容也就一個欄位存類似openid和QQ。我一開始做的時候就老套路,遍歷