平臺搭建---storm 教程
批處理和流式處理的應用程式對比
解決方案 | Storm | Spark Streaming | Flink | S4 | Hadoop |
---|---|---|---|---|---|
開發者 | UC Berkeley AMPLab | Apache | Yahoo! | Apache | |
型別描述 | Twitter的流式處理大資料分析方案 | 支援記憶體資料集和彈性恢復的分析平臺 | 針對流資料和批資料的分散式處理引擎,所有的資料都看作流 | Yahoo!的分散式流式計算平臺 | MapReduce正規化的第一個開源實現 |
吞吐量 | 低 | 高 | 高 | ||
延遲 | 毫秒級 | 秒級 | 亳秒級 | ||
語義保障 | at least once | exactly once | exactly once/ at least once | ||
處理模式 | 單條資料 | 處理批量資料處理 | 單條、批量資料處理 | ||
成熟度 | 成熟 | 成熟 | 新興框架 | ||
SQL支援 | Beta | 成熟 | 新興 |
storm是什麼
Storm is a distributed realtime computation system.
關鍵詞:分散式、實時、計算
你什麼時候需要storm
當你有海量資料需要進行實時處理的時候,在這種場景下你往往需要利用到多臺機器,而且讓你關注的某一類資料按一定的規則路由到確切的節點,從而實現對資訊流(往往需是有狀態的)的連續計算。
實際上分散式計算就是一大堆節點(一般是在多臺機器上)之間的互相通訊,而storm管理了這些節點,定義了一個計算的模型(topology)讓開發者可以忽略很多細節(比如叢集管理、訊息佇列),從而把實現實時分散式計算任務簡單化。
storm的哲學
storm的元件
- Nimbus: 分發程式碼,分發任務,監控錯誤
- Zookeeper: 管理各個元件,保持系統穩定
- Supervisor: 執行任務,往往多個組成一個拓撲(Topology)
storm的計算模型
- topology: 拓撲,實際上是一副圖,代表了對某個計算過程的描述,他的組成部分有 Spout, Bolt, stream
- Spout: 產生資料流,資料流的起點
- Bolt: 接收資料流,執行計算或者重新轉發出資料流
- Stream: 資料流,即上圖的箭頭
- Tuple: 資料流在計算模型中是由無數個tuple組成的所有的節點在這個拓撲中都是併發執行的。
storm的幾種路由方式
路由(grouping)定義了stream如何在各個節點之中流動,下面只介紹幾種常見方式,如下:
Shuffle grouping: 洗牌模式。隨機平均地發配到下游節點上。
Fields grouping: 按照某一個欄位來分配,擁有相同值的欄位會分配到同一個節點上(即可連續跟蹤某個固定特徵的資料流)
Global grouping: 強制到某唯一的節點,實際上如果有多個節點去到任務號最低的節點。
all grouping: 強制到所有節點,需小心使用。
Partial Key grouping: 最新支援的,帶負載均衡的Fields grouping。
None grouping: 不關心資料流是如何分配的,當前等同於Shuffle grouping。
Direct grouping: 手動指定要流動到的節點。
**Local or shuffle grouping: ** 如果bolt有多個任務,那麼資料流只會分配當正在處理的任務;其他情況與shuffle grouping一樣 。
《Storm入門》
章節目錄
介紹Storm的特性以及可能的應用場景。
第二章 起步
講述了Storm的執行模式,Storm工程包含的元件,以及如何建立一個Storm工程。
第三章 拓撲
對Storm的拓撲結構,各個元件如何分工協作做了詳細介紹,資料流分組是本章重點。
介紹Storm的資料來源——spouts,Storm的所有資料都從這裡開始。
介紹Storm處理資料的元件。
以一個簡單的WEB應用講解如何Storm進行資料分析。
以PHP為例講述如何使用非JVM語言開發Storm工程。
講解支援事務的拓撲,當然不要把這裡的事務跟關係型資料庫的事務等同起來。
附錄A
安裝Storm客戶端,以及常用命令。
附錄B
安裝與部署Storm叢集。
附錄C
如何執行第六章的例子
- 兩步:一是建立topologies ,二是用其他語言來執行spouts 和bolts 。
- 用其他語言建立topologies很容易,因為topologies是thrift 框架(連線到storm.shift)
- 用其他語言來執行spouts和bolts被稱作 “multilang components” 或者"shelling"
- thrift 框架讓你能夠明確地以程式或指令碼的方式來定義多語言模組(比如python,py檔案會執行你的bolt)
- 對於java語言,通過覆蓋ShellBolt 或ShellSpout來建立多語言模組。
- 注意:輸出欄位聲明瞭thrift 框架中要發生的事情,所以在java中,你可以通過下面的方式來建立多語言模組。
- 用java來宣告欄位,然後用其它語言來實現處理邏輯並在shellbolt構造器中指明。
- 注意:輸出欄位聲明瞭thrift 框架中要發生的事情,所以在java中,你可以通過下面的方式來建立多語言模組。
- 各語言都是以json 作為標準輸入輸出資料格式,以便和其他過程進行通訊。
- Storm附帶了 Ruby, Python, and Fancy 的介面卡的庫。以python為例
- python 支援emitting, anchoring, acking, and logging
- “storm shell” 命令使得構建和上傳jar變得很容易
- 建立jar並上傳
- 通過nimbus 的主機/埠和jarfile id來呼叫你的應用。
用非java語言執行DSL
從 src/storm.thrift開始是個不錯的選擇,因為storm的拓撲結構就是Thrift 框架,Nimbus 是一個Thrift 守護程序。你可以用任何語言建立和提交拓撲。
當你為spouts 和bolts建立Thrift 結構時,spout 或bolt 相關的程式碼是在ComponentObject 結構中指明。
union ComponentObject {
1: binary serialized_java;
2: ShellComponent shell;
3: JavaObject java_object;
}
對於非java DSL,你需要利用 “2” 和"3",ShellComponent 可以設定執行那個元件的指令碼(比如你的python程式碼),JavaObject 可以設定原生java語言的spout 和 bolt (storm會使用對映去建立那個spout 或bolt)。
有一個storm shell命令可以提交一個拓撲結構,用法如下:
storm shell resources/ python topology.py arg1 arg2
storm shell 會將resources/ 下的檔案打包進一個jar包,上傳這個jar包到Nimbus,並像下面那樣呼叫你的topology.py 指令碼:
python topology.py arg1 arg2 {nimbus-host} {nimbus-port} {uploaded-jar-location}
然後你就可以用thift API連線到Nimbus 並提交這個拓撲結構,呼叫時需要將 {uploaded-jar-location}作為引數傳遞給提交的submitTopology 方法。下面給出了一個submitTopology 定義的參考:
void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology)
throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite);
利用python 操作storm
一般的入門會讓你開始你的第一個java程式來提交topology,這裡會使用python(對,只需要python)來進行示例。
Python目前有兩個庫,一個是pyleus(yelp公司出品),一個是streamparse。前者在github上已經有兩年都不更新了,只支援到storm 0.9。後者一直在更新,需要選擇配套的storm和streamparse版本。
python玩storm趟坑記
二十八、在storm上執行python程式
二十九、在storm上執行python程式(修正)
streamparse 快速上手
依賴
Java and Clojure
具體講:
1.JDK 7+
2.lein ,lein是Clojure的包管理工具和編譯工具,可通過 Leiningen project page 或github
lein的安裝有兩種方式,一種是用指令碼下載安裝,一種是直接linux系統安裝,如下所示,可能需要新增可靠的源。
yum install lein
我自己的安裝方式是
- 下載指令碼,windows是一個指令碼。具體看官網。
我將指令碼另存為lein - 把指令碼複製到shell 可以找到的地方,比如/usr/local/bin
- 讓指令碼可執行
chmod a+x /usr/local/bin/lein
- 執行指令碼,就會自己下載相關的包leiningen-2.8.1-standalone.jar.
lein檔案一定要放在系統路徑上
可通過lein version
檢視lein是否安裝,成功安裝會有類似如下的顯示:
Leiningen 2.3.4 on Java 1.7.0_55 Java HotSpot(TM) 64-Bit Server VM
3.Apache Storm 的開發環境,至少需要 0.10.0以上版本。
具體安裝參考另一篇元件安裝的資料。
可通過storm version
檢視strom是否安裝。安裝成功是有類似如下的顯示:
Running: java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/opt/apache-storm-1.0.1 -Dstorm.log.dir=/opt/apache-storm-1.0.1/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /opt/apache-storm-1.0.1/lib/reflectasm-1.10.1.jar:/opt/apache-storm-1.0.1/lib/kryo-3.0.3.jar:/opt/apache-storm-1.0.1/lib/log4j-over-slf4j-1.6.6.jar:/opt/apache-storm-1.0.1/lib/clojure-1.7.0.jar:/opt/apache-storm-1.0.1/lib/log4j-slf4j-impl-2.1.jar:/opt/apache-storm-1.0.1/lib/servlet-api-2.5.jar:/opt/apache-storm-1.0.1/lib/disruptor-3.3.2.jar:/opt/apache-storm-1.0.1/lib/objenesis-2.1.jar:/opt/apache-storm-1.0.1/lib/storm-core-1.0.1.jar:/opt/apache-storm-1.0.1/lib/slf4j-api-1.7.7.jar:/opt/apache-storm-1.0.1/lib/storm-rename-hack-1.0.1.jar:/opt/apache-storm-1.0.1/lib/log4j-api-2.1.jar:/opt/apache-storm-1.0.1/lib/log4j-core-2.1.jar:/opt/apache-storm-1.0.1/lib/minlog-1.3.0.jar:/opt/apache-storm-1.0.1/lib/asm-5.0.3.jar:/opt/apache-storm-1.0.1/conf org.apache.storm.utils.VersionInfo
Storm 1.0.1
URL https://git-wip-us.apache.org/repos/asf/storm.git -r b5c16f919ad4099e6fb25f1095c9af8b64ac9f91
Branch (no branch)
Compiled by tgoetz on 2016-04-29T20:44Z
From source with checksum 1aea9df01b9181773125826339b9587e
安裝streamparse
pip3 install streamparse
即然是一個python庫就會有各種安裝方法,這是一個安裝包的位置,上面還有一個wordcount的例子。
由於使用pip安裝可能會需要libffi等系統依賴(也可以先安裝這些系統依賴)
yum install libffi-devel
如果提示“致命錯誤:openssl/opensslv.h:沒有那個檔案或目錄”
,可參考:作者yum info openssl
發現,openssl已經安裝過了,怎麼還是會缺少openssl.c的檔案呢?openssl是已經安裝二進位制的可執行程式,而這裡的安裝scrapy則需要的是openssl的原始檔程式,比如openssl.h。故這裡需要補充安裝的是openssh.h的開發版,其中包含相關的安裝原始碼檔案。在確認了問題之後,接下來就是安裝openssl-devel的安裝包了:yum install openssl-deve
由於我的系統上有兩個版本的python,系統預設是較低版本,自己安裝的是python3.5,spark中已經設定預設指向python3.5,strom中如何設定還要研究。但是個人覺得streamparse即是python的一個庫,同時又有點獨立應用的感覺(就是可以自己單獨執行,像spark下的pyspark一樣)。故而如果我將streamparse安裝在python3.5下,同時將streamparse路徑新增到環境變數。啟動streamparse應該就是使用python3.5環境來運行了。
安裝完streamparse後會在對應版本python的bin目錄下產生以下幾個檔案
/usr/local/python35/bin/sparse
/usr/local/python35/bin/streamparse
/usr/local/python35/bin/streamparse_run
如果bin目錄不在系統路徑中,那麼應該把sparse 或streamparse_run(連結到linux shell可以識別的地方我把streamparse也連結過去了)。後面我們就可以在shell裡通過sparse來執行相關程式的。
Your First Project
lein安裝好,streamparse安裝好,strom安裝並啟動(nimbus和supervisor都要啟動)後,就可以啟動streamparse來執行我們的程式了。
建立專案檔案
執行
sparse quickstart wordcount
這是streamparse團隊編的一個例子,執行後會自動在當前目錄產生一個wordcount目錄,內含
wordcount/.gitignore
wordcount/config.json
wordcount/fabfile.py
wordcount/project.clj
wordcount/README.md
wordcount/src
wordcount/src/bolts
wordcount/src/bolts/__init__.py
wordcount/src/bolts/wordcount.py
wordcount/src/spouts
wordcount/src/spouts/__init__.py
wordcount/src/spouts/words.py
wordcount/topologies
wordcount/topologies/wordcount.py
wordcount/virtualenvs
wordcount/virtualenvs/wordcount.txt
其實就是建立了一個python語言的storm專案,內含config.json等配置檔案,拓撲的定義檔案,spouts的定義檔案等。
執行本地拓撲
先修改專案配置檔案,配置config.json如下:
{
"serializer": "json",
"topology_specs": "topologies/",
"virtualenv_specs": "virtualenvs/",
"envs": {
"prod": {
"user": "digger",
"ssh_password": "",
"nimbus": "localhost",
"workers": ["localhost"],
"log": {
"path": "",
"max_bytes": 1000000,
"backup_count": 10,
"level": "info"
},
"virtualenv_root": "~/tmp/wordcount/virtualenvs"
}
}
}
應該是隻是nimbus和workers地址設定比較重要。
cd wordcount
sparse run
需要跑到專案下執行,可能是以config.json
的位置為參考;這個應該是可以設定的吧,要不執行起來太不靈活了。不過執行過程會自動將相關程式打包成jar檔案,個人感覺這個jar檔案才是關鍵,然後這個jar被提交的storm.
如果碰到下面這樣的錯誤
ValueError: Local Storm version, 1.2.2, is not the same as the version in your project.clj, 1.1.0. The versions must match.
就要修改wordcount/project.clj
檔案,以及更換對應版本的storm.
我是僅修改了project.clj檔案對應strom版本號,沒有重新安裝storm。
如果碰到下面的錯誤
Caused by: java.io.IOException: Cannot run program "streamparse_run" (in directory "/tmp/b5e287fa-4eba-4097-a8d5-22e2a4911694/supervisor/stormdist/wordcount-1-1540393384/resources"): error=2, 沒有那個檔案或目錄
那可能是linux系統不認識streamparse_run,應該像sparse命令一樣,新增到shell可以識別的路徑。
機器會花一定時間來編譯JAR檔案,然後就能看到實時流的輸出了。
除了編譯還會執行一大堆的東西,包括各種檔案的複製。最終持續進行中的狀態是類似下的輸出內容不斷滾動
102540 [Thread-29] INFO o.a.s.t.ShellBolt - ShellLog pid:61460, name:count_bolt 2018-10-25 00:05:23,417 - pystorm.component.count_bolt - counted [360,000] words [pid=61460]
103257 [Thread-29] INFO o.a.s.t.ShellBolt - ShellLog pid:61460, name:count_bolt 2018-10-25 00:05:24,135 - pystorm.component.count_bolt - counted [363,000] words [pid=61460]
103914 [Thread-29] INFO o.a.s.t.ShellBolt - ShellLog pid:61460, name:count_bolt 2018-10-25 00:05:24,791 - pystorm.component.count_bolt - counted [366,000] words [pid=61460]
104643 [Thread-29] INFO o.a.s.t.ShellBolt - ShellLog pid:61460, name:count_bolt 2018-10-25 00:05:25,521 - pystorm.component.count_bolt - counted [369,000] words [pid=61460]
這個快速上手教程提供了一個簡單的拓撲例子,可以進一步檢視和修改。
更多命令
如果想要看sparse有哪些命令,可以sparse -h
,具體的命令如下:
jar Create a deployable JAR for a topology.
kill Kill the specified Storm topology
list List the currently running Storm topologies
quickstart Create new streamparse project template.
remove_logs Remove logs from Storm workers.
run Run the local topology with the given arguments
slot_usage Display slots used by every topology on the cluster.
stats Display stats about running Storm topologies.
submit Submit a Storm topology to Nimbus.
tail Tail logs for specified Storm topology.
update_virtualenv Create or update a virtualenv on Storm workers.
visualize Create a Graphviz visualization of the topology
worker_uptime Display uptime for Storm workers.
help Print help information about other commands.
專案結構
File/Folder | Contents |
---|---|
config.json | Configuration information for all of your topologies. |
Optional custom fabric tasks. | |
project.clj | leiningen project file (can be used to add external JVM dependencies). |
src/ | Python source files (bolts/spouts/etc.) for topologies. |
Optional custom invoke tasks. | |
topologies/ | Contains topology definitions written using the Topology DSL. |
virtualenvs/ | Contains pip requirements files used to install dependencies on remote Storm servers. |
定義拓撲結構
storm是基於shift框架的,可以用純python語言來定義拓撲結構。
下面是例子中的拓撲結構
"""
Word count topology
"""
from streamparse import Grouping, Topology
from bolts.wordcount import WordCountBolt
from spouts.words import WordSpout
class WordCount(Topology):
word_spout = WordSpout.spec()
count_bolt = WordCountBolt.spec(inputs={word_spout: Grouping.fields('word')},
par=2)
count_bolt 告訴storm,輸入的元組是按單詞作為域進行路由。storm提供了豐富的路由方式可供選擇,用得最多的是隨機分配和按域分配。
Spouts 和 Bolts
一般用streamparse 建立Spouts 和 Bolts的方式是將兩類檔案加入專案下的src資料夾,並更新拓撲檔案。
下面是定義傳送句子的spout 。
import itertools
from streamparse.spout import Spout
class SentenceSpout(Spout):
outputs = ['sentence']
def initialize(self, stormconf, context):
self.sentences = [
"She advised him to take a long holiday, so he immediately quit work and took a trip around the world",
"I was very glad to get a present from her",
"He will be here in half an hour",
"She saw him eating a sandwich",
]
self.sentences = itertools.cycle(self.sentences)
def next_tuple(self):
sentence = next(self.sentences)
self.emit([sentence])
def ack(self, tup_id):
pass # if a tuple is processed properly, do nothing
def fail(self, tup_id):
pass # if a tuple fails to process, do nothing
一旦spout 進入主程式,streamparse 就是呼叫initialize() 方法,初始化完成後streamparse就會不斷地呼叫next_tuple()方法。在這個函式中會不斷髮送元組,而通過拓撲的設定,對應的bolt就能接收到這些元組。
下面是一個bolt 的例子,它接收句子,並將它拆分成單詞。
import re
from streamparse.bolt import Bolt
class SentenceSplitterBolt(Bolt):
outputs = ['word']
def process(self, tup):
sentence = tup.values[0] # extract the sentence
sentence = re.sub(r"[,.;!\?]", "", sentence) # get rid of punctuation
words = [[word.strip()] for word in sentence.split(" ") if word.strip()]
if not words:
# no words to process in the sentence, fail the tuple
self.fail(tup)
return
for word in words:
self.emit([word])
# tuple acknowledgement is handled automatically
bolt 執行的操作更簡單,這裡只是簡單的覆蓋process()方法,當由spout 或其他bolt傳送的輸入流到達的時候streamparse 就會呼叫process()方法來進行相應處理。在這裡你可以定義自己的處理邏輯,並將結果傳送給下游。
如果呼叫process()時發生意外,那麼streamparse 會先丟掉當前元組資訊,然後再殺死python程序。
Failed Tuples
Bolt Configuration Options
Handling Tick Tuples
如何接收外部資料輸入和輸出資料到外部系統
Spouts 和 Bolts是我們資料處理的核心。但是之前介紹Spouts都是手動製造的資料 ,而介紹Bolt時只介紹了一個簡單的資料處理方式和傳送機制,並沒有真正傳送到外部系統(預設是到標準輸出用螢幕列印?)
外部資料輸入
:
這是一個混合了java和python例子,資料輸入部分是用java從kafka讀取資料實現的spout,資料輸出是由python寫的bolt實現。
PixelSpout.java
package pixelcount.spouts;
import storm.kafka.SpoutConfig;
import storm.kafka.KafkaSpout;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.spout.SchemeAsMultiScheme;
public class PixelSpout extends KafkaSpout {
public PixelSpout(SpoutConfig spoutConf) {
super(spoutConf);
}
public PixelSpout() {
this(PixelSpout.defaultSpoutConfig());
}
public static SpoutConfig defaultSpoutConfig() {
ZkHosts hosts = new ZkHosts("streamparse-box:2181", "/brokers");
SpoutConfig spoutConf = new SpoutConfig(hosts, "pixels", "/kafka_storm", "pixel_reader");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConf.forceFromStart = true;
return spoutConf;
}
}
如何執行在python環境:
第一種方法是把這個java寫的spout當成一個後續資料接入的配置檔案,我們只要修改裡面kafka相關的內容,就可以實現不同場景的移植。這樣即使我不會java我也可以完成自己的專案。
第二種方法是用python重寫,spout的結構我們是知道的,用python寫的spout的也見過,只是之前是自己造的資料來源;如果直接在spout.py檔案中讀入kafka資料,並處理這些資料,處理結果給spout的相關介面,應該也是可以。有時間可以試一下。
整個用streamparse操作storm跟用pyspark-streaming處理流式資料很像,程式都要提交給一個框架。
結果給出的外部系統
from collections import Counter
from redis import StrictRedis
from streamparse import Bolt
class WordCountBolt(Bolt):
outputs = ['word', 'count']
def initialize(self, conf, ctx):
self.counter = Counter()
self.total = 0
def _increment(self, word, inc_by):
self.counter[word] += inc_by
self.total += inc_by
def process(self, tup):
word = tup.values[0]
self._increment(word, 10 if word == "dog" else 1)
if self.total % 1000 == 0:
self.logger.info("counted %i words", self.total)
self.emit([word, self.counter[word]])
class RedisWordCountBolt(Bolt):
outputs = ['word', 'count']
def initialize(self, conf, ctx):
self.redis = StrictRedis()
self.total = 0
def _increment(self, word, inc_by):
self.total += inc_by
return self.redis.zincrby("words", word, inc_by)
def process(self, tup):
word = tup.values[0]
count = self._increment(word, 10 if word == "dog" else 1)
if self.total % 1000 == 0:
self.logger.info("counted %i words", self.total)
self.emit([word, count])
其中的initialize()和_increment()函式就是定義資料輸出到redis的。process()是資料處理過程。