1. 程式人生 > >Apache Ignite剖析

Apache Ignite剖析

tor cfg 管理系 碎片 內存數據 rdquo AC 體系 頻繁

1.概述

  Apache Ignite和Apache Arrow很類似,屬於大數據範疇中的內存分布式管理系統。在《Apache Arrow 內存數據》中介紹了Arrow的相關內容,它統一了大數據領域各個生態系統的數據格式,避免了序列化和反序列化所帶來的資源開銷(能夠節省80%左右的CPU資源)。今天來給大家剖析下Apache Ignite的相關內容。

2.內容

  Apache Ignite是一個以內存為中心的數據平臺,具有強一致性、高可用、強大的SQL、K/V以及其所對應的應用接口(API)。結構分布圖如下所示:

技術分享圖片

  在整個Ignite集群中的多個節點中,Ignite內存中的數據模式有三種,分別是LOCAL、REPLICATED和PARTITIONED。這樣增加了Ignite的擴展性,Ignite可以自動化的控制數據如何分區,使用者也可以插入自定義的方法,或是為了提供效率將部分數據並存在一起。

  Ignite和其他關系型數據庫具有相似的行為,但是在處理約束和索引方面略有不同。Ignite支持一級和二級索引,但是只有一級索引支持唯一性。在持久化方面,Ignite固化內存在內存和磁盤中都能良好的工作,但是持久化到磁盤是可以禁用的,一般將Ignite作為一個內存數據庫來使用。

  由於Ignite是一個全功能的數據網格,它既可以用於純內存模式,也可以帶有Ignite的原生持久化。同時,它還可以與任何第三方的數據庫集成,包含RDBMS和NoSQL。比如,在和Hadoop的HDFS、Kafka等,開發基於大數據平臺下的SQL引擎,來操作HDFS、Kafka這類的大數據存儲介質。

2.1 內存和磁盤

  Apache Ignite是基於固化內存架構的,當Ignite持久化存儲特性開啟時,它可以在內存和磁盤中存儲和處理數據和索引。在固化內存和Ignite持久化存儲同時開啟時,具有以下優勢:

2.1.1 內存優勢

  • 對外內存
  • 避免顯著的GC暫停現象
  • 自動化碎片清理
  • 可預估的內存消耗
  • 高SQL性能

2.1.2 磁盤優勢

  • 可選的持久化
  • 支持SSD介質
  • 分布式存儲
  • 支持事物
  • 集群瞬時啟動

2.2 持久化過程

  Ignite的持久化存儲時一個分布式的、支持ACID、兼容SQL的磁盤存儲。它作為一個可選的磁盤層,可以將數據和索引存儲到SSD這類磁盤介質,並且可以透明的與Ignite固化內存進行集成。Ignite的持久化存儲具有以下優勢:

  • 可以在數據中執行SQL操作,不管數據在內存還是在磁盤中,這意味著Ignite可以作為一個經過內存優化的分布式SQL數據庫
  • 可以不用講所有的數據和索引保持在內存中,持久化存儲可以在磁盤上存儲數據的大數據集合,然後只在內存中保持訪問頻繁的數據子集
  • 集群是瞬時啟動,如果整個集群宕機,不需要通過預加載數據來對內存進行數據“預熱”,只需要將所有集群的節點都連接到一起,整個集群即可正常工作
  • 數據和索引在內存和磁盤中以相似的格式進行存儲,避免復雜的格式轉化,數據集只是在內存和磁盤之間進行移動

  持久化流程如下圖所示:

技術分享圖片

2.3 分布式SQL內存數據庫

  在Apache Ignite中提供了分布式SQL數據庫功能,這個內存數據庫可以水平擴展、容錯且兼容標準的SQL語法,它支持所有的SQL及DML命令,包含SELECT、INSERT、DELETE等SQL命令。依賴於固化內存架構,數據集和索引可以同時在內存和磁盤中進行存儲,這樣可以跨越不同的存儲層執行分布式SQL操作,來獲得可以固化到磁盤的內存級性能。可以使用Java、Python、C++等原生的API來操作SQL與Ignite進行數據交互,也可以使用Ignite的JDBC或者ODBC驅動,這樣就具有了真正意義上的跨平臺連接性。具體架構體系,如下圖所示:

技術分享圖片

3.代碼實踐

  了解Apache的作用後,下面我們可以通過模擬編寫一個大數據SQL引擎,來實現對Kafka的Topic的查詢。首先需要實現一個KafkaSqlFactory的類,具體實現代碼如下所示:

/**
 * TODO
 * 
 * @author smartloli.
 *
 *         Created by Mar 9, 2018
 */
public class KafkaSqlFactory {

	private static final Logger LOG = LoggerFactory.getLogger(KafkaSqlFactory.class);

	private static Ignite ignite = null;

	private static void getInstance() {
		if (ignite == null) {
			ignite = Ignition.start();
		}
	}

	private static IgniteCache<Long, TopicX> processor(List<TopicX> collectors) {
		getInstance();
		CacheConfiguration<Long, TopicX> topicDataCacheCfg = new CacheConfiguration<Long, TopicX>();
		topicDataCacheCfg.setName(TopicCache.NAME);
		topicDataCacheCfg.setCacheMode(CacheMode.PARTITIONED);
		topicDataCacheCfg.setIndexedTypes(Long.class, TopicX.class);
		IgniteCache<Long, TopicX> topicDataCache = ignite.getOrCreateCache(topicDataCacheCfg);
		for (TopicX topic : collectors) {
			topicDataCache.put(topic.getOffsets(), topic);
		}
		return topicDataCache;
	}

	public static String sql(String sql, List<TopicX> collectors) {
		try {
			IgniteCache<Long, TopicX> topicDataCache = processor(collectors);
			SqlFieldsQuery qry = new SqlFieldsQuery(sql);
			QueryCursor<List<?>> cursor = topicDataCache.query(qry);
			for (List<?> row : cursor) {
				System.out.println(row.toString());
			}
		} catch (Exception ex) {
			LOG.error("Query kafka topic has error, msg is " + ex.getMessage());
		} finally {
			close();
		}
		return "";
	}

	private static void close() {
		try {
			if (ignite != null) {
				ignite.close();
			}
		} catch (Exception ex) {
			LOG.error("Close Ignite has error, msg is " + ex.getMessage());
		} finally {
			if (ignite != null) {
				ignite.close();
			}
		}
	}

}

  然後,模擬編寫一個生產者來生產數據,並查詢數據集,實現代碼如下所示:

public static void ignite(){
		List<TopicX> collectors = new ArrayList<>();
		int count = 0;
		for (int i = 0; i < 10; i++) {
			TopicX td = new TopicX();
			if (count > 3) {
				count = 0;
			}
			td.setPartitionId(count);
			td.setOffsets(i);
			td.setMessage("hello_" + i);
			td.setTopicName("test");
			collectors.add(td);
			count++;
		}

		String sql = "select offsets,message from TopicX where offsets>6 and partitionId in (0,1) limit 1";
		long stime = System.currentTimeMillis();
		KafkaSqlFactory.sql(sql, collectors);
		System.out.println("Cost time [" + (System.currentTimeMillis() - stime) / 1000.0 + "]ms");
	}

  執行結果如下所示:

技術分享圖片

4.總結

  Apache Ignite整體來說,它基本把現在分布式的一些概念都集成了,包含分布式存儲、分布式計算、分布式服務、流式計算等等。而且,它對Java語言的支持,與JDK能夠很好的整合,能夠很友好的兼容JDK的現有API,當你開啟一個線程池,你不需要關系是本地線程池還是分布式線程池,只管提交任務就行。Apache Ignite在與RDBMS、Hadoop、Spark、Kafka等傳統關系型數據庫和主流大數據套件的集成,提供了非常靈活好用的組件API。

5.結束語

  這篇博客就和大家分享到這裏,如果大家在研究學習的過程當中有什麽問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!

Apache Ignite剖析