1. 程式人生 > >平臺搭建---storm 教程

平臺搭建---storm 教程

批處理和流式處理的應用程式對比

解決方案 Storm Spark Streaming Flink S4 Hadoop
開發者 Twitter UC Berkeley AMPLab Apache Yahoo! Apache
型別描述 Twitter的流式處理大資料分析方案 支援記憶體資料集和彈性恢復的分析平臺 針對流資料和批資料的分散式處理引擎,所有的資料都看作流 Yahoo!的分散式流式計算平臺 MapReduce正規化的第一個開源實現
吞吐量
延遲 毫秒級 秒級 亳秒級
語義保障 at least once exactly once exactly once/ at least once
處理模式 單條資料 處理批量資料處理 單條、批量資料處理
成熟度 成熟 成熟 新興框架
SQL支援 Beta 成熟 新興

storm是什麼

Storm is a distributed realtime computation system.

關鍵詞:分散式、實時、計算

你什麼時候需要storm

當你有海量資料需要進行實時處理的時候,在這種場景下你往往需要利用到多臺機器,而且讓你關注的某一類資料按一定的規則路由到確切的節點,從而實現對資訊流(往往需是有狀態的)的連續計算。
實際上分散式計算就是一大堆節點(一般是在多臺機器上)之間的互相通訊,而storm管理了這些節點,定義了一個計算的模型(topology)讓開發者可以忽略很多細節(比如叢集管理、訊息佇列),從而把實現實時分散式計算任務簡單化。

storm的哲學

storm的元件

storm的元件
  • Nimbus: 分發程式碼,分發任務,監控錯誤
  • Zookeeper: 管理各個元件,保持系統穩定
  • Supervisor: 執行任務,往往多個組成一個拓撲(Topology)

storm的計算模型

storm的計算模型
  • topology: 拓撲,實際上是一副圖,代表了對某個計算過程的描述,他的組成部分有 Spout, Bolt, stream
  • Spout: 產生資料流,資料流的起點
  • Bolt: 接收資料流,執行計算或者重新轉發出資料流
  • Stream: 資料流,即上圖的箭頭
  • Tuple: 資料流在計算模型中是由無數個tuple組成的所有的節點在這個拓撲中都是併發執行的。

storm的幾種路由方式

路由(grouping)定義了stream如何在各個節點之中流動,下面只介紹幾種常見方式,如下:
Shuffle grouping: 洗牌模式。隨機平均地發配到下游節點上。
Fields grouping: 按照某一個欄位來分配,擁有相同值的欄位會分配到同一個節點上(即可連續跟蹤某個固定特徵的資料流)
Global grouping: 強制到某唯一的節點,實際上如果有多個節點去到任務號最低的節點。
all grouping: 強制到所有節點,需小心使用。
Partial Key grouping: 最新支援的,帶負載均衡的Fields grouping。
None grouping: 不關心資料流是如何分配的,當前等同於Shuffle grouping。
Direct grouping: 手動指定要流動到的節點。
**Local or shuffle grouping: ** 如果bolt有多個任務,那麼資料流只會分配當正在處理的任務;其他情況與shuffle grouping一樣 。

《Storm入門》

來源:

章節目錄

介紹Storm的特性以及可能的應用場景。

第二章 起步

講述了Storm的執行模式,Storm工程包含的元件,以及如何建立一個Storm工程。

第三章 拓撲

對Storm的拓撲結構,各個元件如何分工協作做了詳細介紹,資料流分組是本章重點。

介紹Storm的資料來源——spouts,Storm的所有資料都從這裡開始。

介紹Storm處理資料的元件。

以一個簡單的WEB應用講解如何Storm進行資料分析。

以PHP為例講述如何使用非JVM語言開發Storm工程。

講解支援事務的拓撲,當然不要把這裡的事務跟關係型資料庫的事務等同起來。

附錄A

安裝Storm客戶端,以及常用命令。

附錄B

安裝與部署Storm叢集。

附錄C

如何執行第六章的例子

  • 兩步:一是建立topologies ,二是用其他語言來執行spouts 和bolts 。
  • 用其他語言建立topologies很容易,因為topologies是thrift 框架(連線到storm.shift)
  • 用其他語言來執行spouts和bolts被稱作 “multilang components” 或者"shelling"
    • thrift 框架讓你能夠明確地以程式或指令碼的方式來定義多語言模組(比如python,py檔案會執行你的bolt)
    • 對於java語言,通過覆蓋ShellBolt 或ShellSpout來建立多語言模組。
      • 注意:輸出欄位聲明瞭thrift 框架中要發生的事情,所以在java中,你可以通過下面的方式來建立多語言模組。
        • 用java來宣告欄位,然後用其它語言來實現處理邏輯並在shellbolt構造器中指明。
    • 各語言都是以json 作為標準輸入輸出資料格式,以便和其他過程進行通訊。
    • Storm附帶了 Ruby, Python, and Fancy 的介面卡的庫。以python為例
      • python 支援emitting, anchoring, acking, and logging
  • “storm shell” 命令使得構建和上傳jar變得很容易
    • 建立jar並上傳
    • 通過nimbus 的主機/埠和jarfile id來呼叫你的應用。

用非java語言執行DSL

從 src/storm.thrift開始是個不錯的選擇,因為storm的拓撲結構就是Thrift 框架,Nimbus 是一個Thrift 守護程序。你可以用任何語言建立和提交拓撲。
當你為spouts 和bolts建立Thrift 結構時,spout 或bolt 相關的程式碼是在ComponentObject 結構中指明。

union ComponentObject {
  1: binary serialized_java;
  2: ShellComponent shell;
  3: JavaObject java_object;
}

對於非java DSL,你需要利用 “2” 和"3",ShellComponent 可以設定執行那個元件的指令碼(比如你的python程式碼),JavaObject 可以設定原生java語言的spout 和 bolt (storm會使用對映去建立那個spout 或bolt)。
有一個storm shell命令可以提交一個拓撲結構,用法如下:

storm shell resources/ python topology.py arg1 arg2

storm shell 會將resources/ 下的檔案打包進一個jar包,上傳這個jar包到Nimbus,並像下面那樣呼叫你的topology.py 指令碼:

python topology.py arg1 arg2 {nimbus-host} {nimbus-port} {uploaded-jar-location}

然後你就可以用thift API連線到Nimbus 並提交這個拓撲結構,呼叫時需要將 {uploaded-jar-location}作為引數傳遞給提交的submitTopology 方法。下面給出了一個submitTopology 定義的參考:

void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology)
    throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite);

利用python 操作storm

一般的入門會讓你開始你的第一個java程式來提交topology,這裡會使用python(對,只需要python)來進行示例。

Python目前有兩個庫,一個是pyleus(yelp公司出品),一個是streamparse。前者在github上已經有兩年都不更新了,只支援到storm 0.9。後者一直在更新,需要選擇配套的storm和streamparse版本。
python玩storm趟坑記
二十八、在storm上執行python程式
二十九、在storm上執行python程式(修正)

streamparse 快速上手

依賴

Java and Clojure
具體講:
1.JDK 7+
2.lein ,lein是Clojure的包管理工具和編譯工具,可通過 Leiningen project pagegithub
lein的安裝有兩種方式,一種是用指令碼下載安裝,一種是直接linux系統安裝,如下所示,可能需要新增可靠的源。

yum install lein

我自己的安裝方式是

  • 下載指令碼,windows是一個指令碼。具體看官網
    我將指令碼另存為lein
  • 把指令碼複製到shell 可以找到的地方,比如/usr/local/bin
  • 讓指令碼可執行chmod a+x /usr/local/bin/lein
  • 執行指令碼,就會自己下載相關的包leiningen-2.8.1-standalone.jar.
    lein檔案一定要放在系統路徑上

可通過lein version檢視lein是否安裝,成功安裝會有類似如下的顯示:
Leiningen 2.3.4 on Java 1.7.0_55 Java HotSpot(TM) 64-Bit Server VM
3.Apache Storm 的開發環境,至少需要 0.10.0以上版本。
具體安裝參考另一篇元件安裝的資料。
可通過storm version檢視strom是否安裝。安裝成功是有類似如下的顯示:

Running: java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/opt/apache-storm-1.0.1 -Dstorm.log.dir=/opt/apache-storm-1.0.1/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /opt/apache-storm-1.0.1/lib/reflectasm-1.10.1.jar:/opt/apache-storm-1.0.1/lib/kryo-3.0.3.jar:/opt/apache-storm-1.0.1/lib/log4j-over-slf4j-1.6.6.jar:/opt/apache-storm-1.0.1/lib/clojure-1.7.0.jar:/opt/apache-storm-1.0.1/lib/log4j-slf4j-impl-2.1.jar:/opt/apache-storm-1.0.1/lib/servlet-api-2.5.jar:/opt/apache-storm-1.0.1/lib/disruptor-3.3.2.jar:/opt/apache-storm-1.0.1/lib/objenesis-2.1.jar:/opt/apache-storm-1.0.1/lib/storm-core-1.0.1.jar:/opt/apache-storm-1.0.1/lib/slf4j-api-1.7.7.jar:/opt/apache-storm-1.0.1/lib/storm-rename-hack-1.0.1.jar:/opt/apache-storm-1.0.1/lib/log4j-api-2.1.jar:/opt/apache-storm-1.0.1/lib/log4j-core-2.1.jar:/opt/apache-storm-1.0.1/lib/minlog-1.3.0.jar:/opt/apache-storm-1.0.1/lib/asm-5.0.3.jar:/opt/apache-storm-1.0.1/conf org.apache.storm.utils.VersionInfo
Storm 1.0.1
URL https://git-wip-us.apache.org/repos/asf/storm.git -r b5c16f919ad4099e6fb25f1095c9af8b64ac9f91
Branch (no branch)
Compiled by tgoetz on 2016-04-29T20:44Z
From source with checksum 1aea9df01b9181773125826339b9587e

安裝streamparse

pip3 install streamparse
即然是一個python庫就會有各種安裝方法,這是一個安裝包的位置,上面還有一個wordcount的例子。
由於使用pip安裝可能會需要libffi等系統依賴(也可以先安裝這些系統依賴)
yum install libffi-devel
如果提示“致命錯誤:openssl/opensslv.h:沒有那個檔案或目錄”,可參考:作者yum info openssl發現,openssl已經安裝過了,怎麼還是會缺少openssl.c的檔案呢?openssl是已經安裝二進位制的可執行程式,而這裡的安裝scrapy則需要的是openssl的原始檔程式,比如openssl.h。故這裡需要補充安裝的是openssh.h的開發版,其中包含相關的安裝原始碼檔案。在確認了問題之後,接下來就是安裝openssl-devel的安裝包了:yum install openssl-deve

由於我的系統上有兩個版本的python,系統預設是較低版本,自己安裝的是python3.5,spark中已經設定預設指向python3.5,strom中如何設定還要研究。但是個人覺得streamparse即是python的一個庫,同時又有點獨立應用的感覺(就是可以自己單獨執行,像spark下的pyspark一樣)。故而如果我將streamparse安裝在python3.5下,同時將streamparse路徑新增到環境變數。啟動streamparse應該就是使用python3.5環境來運行了。

安裝完streamparse後會在對應版本python的bin目錄下產生以下幾個檔案

 /usr/local/python35/bin/sparse
  /usr/local/python35/bin/streamparse
  /usr/local/python35/bin/streamparse_run

如果bin目錄不在系統路徑中,那麼應該把sparse 或streamparse_run(連結到linux shell可以識別的地方我把streamparse也連結過去了)。後面我們就可以在shell裡通過sparse來執行相關程式的。

Your First Project

lein安裝好,streamparse安裝好,strom安裝並啟動(nimbus和supervisor都要啟動)後,就可以啟動streamparse來執行我們的程式了。
建立專案檔案
執行

sparse quickstart wordcount

這是streamparse團隊編的一個例子,執行後會自動在當前目錄產生一個wordcount目錄,內含

wordcount/.gitignore
wordcount/config.json
wordcount/fabfile.py
wordcount/project.clj
wordcount/README.md
wordcount/src
wordcount/src/bolts
wordcount/src/bolts/__init__.py
wordcount/src/bolts/wordcount.py
wordcount/src/spouts
wordcount/src/spouts/__init__.py
wordcount/src/spouts/words.py
wordcount/topologies
wordcount/topologies/wordcount.py
wordcount/virtualenvs
wordcount/virtualenvs/wordcount.txt

其實就是建立了一個python語言的storm專案,內含config.json等配置檔案,拓撲的定義檔案,spouts的定義檔案等。
執行本地拓撲
先修改專案配置檔案,配置config.json如下:

{
    "serializer": "json",
    "topology_specs": "topologies/",
    "virtualenv_specs": "virtualenvs/",
    "envs": {
        "prod": {
            "user": "digger",
            "ssh_password": "",
            "nimbus": "localhost",
            "workers": ["localhost"],
            "log": {
                "path": "",
                "max_bytes": 1000000,
                "backup_count": 10,
                "level": "info"
            },
            "virtualenv_root": "~/tmp/wordcount/virtualenvs"
        }
    }
}

應該是隻是nimbus和workers地址設定比較重要。

cd wordcount
sparse run

需要跑到專案下執行,可能是以config.json的位置為參考;這個應該是可以設定的吧,要不執行起來太不靈活了。不過執行過程會自動將相關程式打包成jar檔案,個人感覺這個jar檔案才是關鍵,然後這個jar被提交的storm.

如果碰到下面這樣的錯誤

ValueError: Local Storm version, 1.2.2, is not the same as the version in your project.clj, 1.1.0. The versions must match.

就要修改wordcount/project.clj檔案,以及更換對應版本的storm.
我是僅修改了project.clj檔案對應strom版本號,沒有重新安裝storm。

如果碰到下面的錯誤

Caused by: java.io.IOException: Cannot run program "streamparse_run" (in directory "/tmp/b5e287fa-4eba-4097-a8d5-22e2a4911694/supervisor/stormdist/wordcount-1-1540393384/resources"): error=2, 沒有那個檔案或目錄

那可能是linux系統不認識streamparse_run,應該像sparse命令一樣,新增到shell可以識別的路徑。

機器會花一定時間來編譯JAR檔案,然後就能看到實時流的輸出了。
除了編譯還會執行一大堆的東西,包括各種檔案的複製。最終持續進行中的狀態是類似下的輸出內容不斷滾動

102540 [Thread-29] INFO  o.a.s.t.ShellBolt - ShellLog pid:61460, name:count_bolt 2018-10-25 00:05:23,417 - pystorm.component.count_bolt - counted [360,000] words [pid=61460]
103257 [Thread-29] INFO  o.a.s.t.ShellBolt - ShellLog pid:61460, name:count_bolt 2018-10-25 00:05:24,135 - pystorm.component.count_bolt - counted [363,000] words [pid=61460]
103914 [Thread-29] INFO  o.a.s.t.ShellBolt - ShellLog pid:61460, name:count_bolt 2018-10-25 00:05:24,791 - pystorm.component.count_bolt - counted [366,000] words [pid=61460]
104643 [Thread-29] INFO  o.a.s.t.ShellBolt - ShellLog pid:61460, name:count_bolt 2018-10-25 00:05:25,521 - pystorm.component.count_bolt - counted [369,000] words [pid=61460]

這個快速上手教程提供了一個簡單的拓撲例子,可以進一步檢視和修改。

更多命令
如果想要看sparse有哪些命令,可以sparse -h,具體的命令如下:

    jar                 Create a deployable JAR for a topology.
    kill                Kill the specified Storm topology
    list                List the currently running Storm topologies
    quickstart          Create new streamparse project template.
    remove_logs         Remove logs from Storm workers.
    run                 Run the local topology with the given arguments
    slot_usage          Display slots used by every topology on the cluster.
    stats               Display stats about running Storm topologies.
    submit              Submit a Storm topology to Nimbus.
    tail                Tail logs for specified Storm topology.
    update_virtualenv   Create or update a virtualenv on Storm workers.
    visualize           Create a Graphviz visualization of the topology
    worker_uptime       Display uptime for Storm workers.
    help                Print help information about other commands.

專案結構

File/Folder Contents
config.json Configuration information for all of your topologies.
Optional custom fabric tasks.
project.clj leiningen project file (can be used to add external JVM dependencies).
src/ Python source files (bolts/spouts/etc.) for topologies.
Optional custom invoke tasks.
topologies/ Contains topology definitions written using the Topology DSL.
virtualenvs/ Contains pip requirements files used to install dependencies on remote Storm servers.

定義拓撲結構

storm是基於shift框架的,可以用純python語言來定義拓撲結構。
下面是例子中的拓撲結構

"""
Word count topology
"""

from streamparse import Grouping, Topology

from bolts.wordcount import WordCountBolt
from spouts.words import WordSpout


class WordCount(Topology):
    word_spout = WordSpout.spec()
    count_bolt = WordCountBolt.spec(inputs={word_spout: Grouping.fields('word')},
                                    par=2)

count_bolt 告訴storm,輸入的元組是按單詞作為域進行路由。storm提供了豐富的路由方式可供選擇,用得最多的是隨機分配和按域分配。

Spouts 和 Bolts

一般用streamparse 建立Spouts 和 Bolts的方式是將兩類檔案加入專案下的src資料夾,並更新拓撲檔案。
下面是定義傳送句子的spout 。

import itertools

from streamparse.spout import Spout


class SentenceSpout(Spout):
    outputs = ['sentence']

    def initialize(self, stormconf, context):
        self.sentences = [
            "She advised him to take a long holiday, so he immediately quit work and took a trip around the world",
            "I was very glad to get a present from her",
            "He will be here in half an hour",
            "She saw him eating a sandwich",
        ]
        self.sentences = itertools.cycle(self.sentences)

    def next_tuple(self):
        sentence = next(self.sentences)
        self.emit([sentence])

    def ack(self, tup_id):
        pass  # if a tuple is processed properly, do nothing

    def fail(self, tup_id):
        pass  # if a tuple fails to process, do nothing

一旦spout 進入主程式,streamparse 就是呼叫initialize() 方法,初始化完成後streamparse就會不斷地呼叫next_tuple()方法。在這個函式中會不斷髮送元組,而通過拓撲的設定,對應的bolt就能接收到這些元組。
下面是一個bolt 的例子,它接收句子,並將它拆分成單詞。

import re

from streamparse.bolt import Bolt

class SentenceSplitterBolt(Bolt):
    outputs = ['word']

    def process(self, tup):
        sentence = tup.values[0]  # extract the sentence
        sentence = re.sub(r"[,.;!\?]", "", sentence)  # get rid of punctuation
        words = [[word.strip()] for word in sentence.split(" ") if word.strip()]
        if not words:
            # no words to process in the sentence, fail the tuple
            self.fail(tup)
            return

        for word in words:
            self.emit([word])
        # tuple acknowledgement is handled automatically

bolt 執行的操作更簡單,這裡只是簡單的覆蓋process()方法,當由spout 或其他bolt傳送的輸入流到達的時候streamparse 就會呼叫process()方法來進行相應處理。在這裡你可以定義自己的處理邏輯,並將結果傳送給下游。
如果呼叫process()時發生意外,那麼streamparse 會先丟掉當前元組資訊,然後再殺死python程序。

Failed Tuples

Bolt Configuration Options

Handling Tick Tuples

如何接收外部資料輸入和輸出資料到外部系統

Spouts 和 Bolts是我們資料處理的核心。但是之前介紹Spouts都是手動製造的資料 ,而介紹Bolt時只介紹了一個簡單的資料處理方式和傳送機制,並沒有真正傳送到外部系統(預設是到標準輸出用螢幕列印?)

外部資料輸入


這是一個混合了java和python例子,資料輸入部分是用java從kafka讀取資料實現的spout,資料輸出是由python寫的bolt實現。

PixelSpout.java

package pixelcount.spouts;

import storm.kafka.SpoutConfig;
import storm.kafka.KafkaSpout;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.spout.SchemeAsMultiScheme;

public class PixelSpout extends KafkaSpout {

	public PixelSpout(SpoutConfig spoutConf) {
		super(spoutConf);
	}

	public PixelSpout() {
		this(PixelSpout.defaultSpoutConfig());
	}

	public static SpoutConfig defaultSpoutConfig() {
		ZkHosts hosts = new ZkHosts("streamparse-box:2181", "/brokers");
		SpoutConfig spoutConf = new SpoutConfig(hosts, "pixels", "/kafka_storm", "pixel_reader");
		spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
		spoutConf.forceFromStart = true;
		return spoutConf;
	}
}

如何執行在python環境:
第一種方法是把這個java寫的spout當成一個後續資料接入的配置檔案,我們只要修改裡面kafka相關的內容,就可以實現不同場景的移植。這樣即使我不會java我也可以完成自己的專案。
第二種方法是用python重寫,spout的結構我們是知道的,用python寫的spout的也見過,只是之前是自己造的資料來源;如果直接在spout.py檔案中讀入kafka資料,並處理這些資料,處理結果給spout的相關介面,應該也是可以。有時間可以試一下。
整個用streamparse操作storm跟用pyspark-streaming處理流式資料很像,程式都要提交給一個框架。

結果給出的外部系統

from collections import Counter

from redis import StrictRedis

from streamparse import Bolt


class WordCountBolt(Bolt):
    outputs = ['word', 'count']

    def initialize(self, conf, ctx):
        self.counter = Counter()
        self.total = 0

    def _increment(self, word, inc_by):
        self.counter[word] += inc_by
        self.total += inc_by

    def process(self, tup):
        word = tup.values[0]
        self._increment(word, 10 if word == "dog" else 1)
        if self.total % 1000 == 0:
            self.logger.info("counted %i words", self.total)
        self.emit([word, self.counter[word]])


class RedisWordCountBolt(Bolt):
    outputs = ['word', 'count']

    def initialize(self, conf, ctx):
        self.redis = StrictRedis()
        self.total = 0

    def _increment(self, word, inc_by):
        self.total += inc_by
        return self.redis.zincrby("words", word, inc_by)

    def process(self, tup):
        word = tup.values[0]
        count = self._increment(word, 10 if word == "dog" else 1)
        if self.total % 1000 == 0:
            self.logger.info("counted %i words", self.total)
self.emit([word, count])

其中的initialize()和_increment()函式就是定義資料輸出到redis的。process()是資料處理過程。

Remote Deployment

Setting up a Storm Cluster

Submit

Disabling & Configuring Virtualenv Creation

Using unofficial versions of Storm

Local Clusters

Setting Submit Options in config.json

Logging