Hadoop是對大資料集進行分散式計算的標準工具,這也是為什麼當你穿過機場時能看到”大資料(Big Data)”廣告的原因。它已經成為大資料的作業系統,提供了包括工具和技巧在內的豐富生態系統,允許使用相對便宜的商業硬體叢集進行超級計算機級別的計算。2003和2004年,兩個來自Google的觀點使Hadoop成為可能:一個分散式儲存框架(Google檔案系統),在Hadoop中被實現為HDFS;一個分散式計算框架(MapReduce)。
這兩個觀點成為過去十年規模分析(scaling analytics)、大規模機器學習(machine learning),以及其他大資料應用出現的主要推動力!但是,從技術角度上講,十年是一段非常長的時間,而且Hadoop還存在很多已知限制,尤其是MapReduce。對MapReduce程式設計明顯是困難的。對大多數分析,你都必須用很多步驟將Map和Reduce任務串接起來。這造成類SQL的計算或機器學習需要專門的系統來進行。更糟的是,MapReduce要求每個步驟間的資料要序列化到磁碟,這意味著MapReduce作業的I/O成本很高,導致互動分析和迭代演算法(iterative algorithms)開銷很大;而事實是,幾乎所有的最優化和機器學習都是迭代的
為了解決這些問題,Hadoop一直在向一種更為通用的資源管理框架轉變,即YARN(Yet Another Resource Negotiator, 又一個資源協調者)。YARN實現了下一代的MapReduce,但同時也允許應用利用分散式資源而不必採用MapReduce進行計算。通過將叢集管理一般化,研究轉到分散式計算的一般化上,來擴充套件了MapReduce的初衷。
Apache Spark™是用於大規模資料處理的統一分析引擎,是第一個脫胎於該轉變的快速、通用分散式計算正規化,並且很快流行起來。Spark使用函數語言程式設計正規化擴充套件了MapReduce模型以支援更多計算型別,可以涵蓋廣泛的工作流,這些工作流之前被實現為Hadoop之上的特殊系統。Spark使用記憶體快取來提升效能,因此進行互動式分析也足夠快速(就如同使用Python直譯器,與叢集進行互動一樣)。快取同時提升了迭代演算法的效能,這使得Spark非常適合資料理論任務,特別是機器學習。
安裝 pip install pyspark
執行 pyspark ,還需java環境
## Spark Application - execute with spark-submit ## Imports import csv import matplotlib.pyplot as plt from io importStringIO from datetime import datetime from collections import namedtuple from operator import add, itemgetter from pyspark import SparkConf, SparkContext ## Module Constants APP_NAME = "Flight Delay Analysis" DATE_FMT = "%Y-%m-%d" TIME_FMT = "%H%M" fields = ('date', 'airline', 'flightnum', 'origin', 'dest', 'dep', 'dep_delay', 'arv', 'arv_delay', 'airtime', 'distance') Flight = namedtuple('Flight', fields) ## Closure Functions def parse(row): """ Parses a row and returns a named tuple. """ row[0] = datetime.strptime(row[0], DATE_FMT).date() row[5] = datetime.strptime(row[5], TIME_FMT).time() row[6] = float(row[6]) row[7] = datetime.strptime(row[7], TIME_FMT).time() row[8] = float(row[8]) row[9] = float(row[9]) row[10] = float(row[10]) return Flight(*row[:11]) def split(line): """ Operator function for splitting a line with csv module py2 reader = csv.reader(StringIO(line)) return """ reader = csv.reader(StringIO(line)) return next(reader) def plot(delays): """ Show a bar chart of the total delay per airline """ airlines = [d[0] for d in delays] minutes = [d[1] for d in delays] index = list(range(len(airlines))) fig, axe = plt.subplots() bars = axe.barh(index, minutes) # Add the total minutes to the right for idx, air, min in zip(index, airlines, minutes): if min > 0: bars[idx].set_color('#d9230f') axe.annotate(" %0.0f min" % min, xy=(min + 1, idx + 0.5), va='center') else: bars[idx].set_color('#469408') axe.annotate(" %0.0f min" % min, xy=(10, idx + 0.5), va='center') # Set the ticks ticks = plt.yticks([idx + 0.5 for idx in index], airlines) xt = plt.xticks()[0] plt.xticks(xt, [' '] * len(xt)) # minimize chart junk plt.grid(axis='x', color='white', linestyle='-') plt.title('Total Minutes Delayed per Airline') ## Main functionality def main(sc): # Load the airlines lookup dictionary airlines = dict(sc.textFile("data/airlines.csv").map(split).collect()) # Broadcast the lookup dictionary to the cluster airline_lookup = sc.broadcast(airlines) # Read the CSV Data into an RDD flights = sc.textFile("data/flights.csv").map(split).map(parse) # Map the total delay to the airline (joined using the broadcast value) delays = f: (airline_lookup.value[f.airline], add(f.dep_delay, f.arv_delay))) # Reduce the total delay for the month to the airline delays = delays.reduceByKey(add).collect() delays = sorted(delays, key=itemgetter(1)) # Provide output from the driver for d in delays: print ("%0.0f minutes delayed\t%s" % (d[1], d[0])) # Show a bar chart of the delays plot(delays) if __name__ == "__main__": # Configure Spark conf = SparkConf().setMaster("local[*]") conf = conf.setAppName(APP_NAME) sc = SparkContext(conf=conf) # Execute Main functionality main(sc)