Storm 入門的Demo教程
Storm介紹
Storm是Twitter開源的分布式實時大數據處理框架,最早開源於github,從0.9.1版本之後,歸於Apache社區,被業界稱為實時版Hadoop。隨著越來越多的場景對Hadoop的MapReduce高延遲無法容忍,比如網站統計、推薦系統、預警系統、金融系統(高頻交易、股票)等等,大數據實時處理解決方案(流計算)的應用日趨廣泛,目前已是分布式技術領域最新爆發點,而Storm更是流計算技術中的佼佼者和主流。
Storm的核心組件
- Nimbus:即Storm的Master,負責資源分配和任務調度。一個Storm集群只有一個Nimbus。
- Supervisor:即Storm的Slave,負責接收Nimbus分配的任務,管理所有Worker,一個Supervisor節點中包含多個Worker進程。
- Worker:工作進程,每個工作進程中都有多個Task。
- Task:任務,在 Storm 集群中每個 Spout 和 Bolt 都由若幹個任務(tasks)來執行。每個任務都與一個執行線程相對應。
- Topology:計算拓撲,Storm 的拓撲是對實時計算應用邏輯的封裝,它的作用與 MapReduce 的任務(Job)很相似,區別在於 MapReduce 的一個 Job 在得到結果之後總會結束,而拓撲會一直在集群中運行,直到你手動去終止它。拓撲還可以理解成由一系列通過數據流(Stream Grouping)相互關聯的 Spout 和 Bolt 組成的的拓撲結構。
- Stream:數據流(Streams)是 Storm 中最核心的抽象概念。一個數據流指的是在分布式環境中並行創建、處理的一組元組(tuple)的無界序列。數據流可以由一種能夠表述數據流中元組的域(fields)的模式來定義。
- Spout:數據源(Spout)是拓撲中數據流的來源。一般 Spout 會從一個外部的數據源讀取元組然後將他們發送到拓撲中。根據需求的不同,Spout 既可以定義為可靠的數據源,也可以定義為不可靠的數據源。一個可靠的 Spout能夠在它發送的元組處理失敗時重新發送該元組,以確保所有的元組都能得到正確的處理;相對應的,不可靠的 Spout 就不會在元組發送之後對元組進行任何其他的處理。一個 Spout可以發送多個數據流。
- Bolt:拓撲中所有的數據處理均是由 Bolt 完成的。通過數據過濾(filtering)、函數處理(functions)、聚合(aggregations)、聯結(joins)、數據庫交互等功能,Bolt 幾乎能夠完成任何一種數據處理需求。一個 Bolt 可以實現簡單的數據流轉換,而更復雜的數據流變換通常需要使用多個 Bolt 並通過多個步驟完成。
- Stream grouping:為拓撲中的每個 Bolt 的確定輸入數據流是定義一個拓撲的重要環節。數據流分組定義了在 Bolt 的不同任務(tasks)中劃分數據流的方式。在 Storm 中有八種內置的數據流分組方式。
- Reliability:可靠性。Storm 可以通過拓撲來確保每個發送的元組都能得到正確處理。通過跟蹤由 Spout 發出的每個元組構成的元組樹可以確定元組是否已經完成處理。每個拓撲都有一個“消息延時”參數,如果 Storm 在延時時間內沒有檢測到元組是否處理完成,就會將該元組標記為處理失敗,並會在稍後重新發送該元組。
Storm程序再Storm集群中運行的示例圖如下:
Topology
為什麽把Topology單獨提出來呢,因為Topology是我們開發程序主要的用的組件。
Topology和MapReduce很相像。
MapReduce是Map進行獲取數據,Reduce進行處理數據。
而Topology則是使用Spout獲取數據,Bolt來進行計算。
總的來說就是一個Topology由一個或者多個的Spout和Bolt組成。
具體流程是怎麽走,可以通過查看下面這張圖來進行了解。
示例圖:
註:圖片來源http://www.tianshouzhi.com/api/tutorials/storm/52。
圖片有三種模式,解釋如下:
第一種比較簡單,就是由一個Spout獲取數據,然後交給一個Bolt進行處理;
第二種稍微復雜點,由一個Spout獲取數據,然後交給一個Bolt進行處理一部分,然後在交給下一個Bolt進行處理其他部分。
第三種則比較復雜,一個Spout可以同時發送數據到多個Bolt,而一個Bolt也可以接受多個Spout或多個Bolt,最終形成多個數據流。但是這種數據流必須是有方向的,有起點和終點,不然會造成死循環,數據永遠也處理不完。就是Spout發給Bolt1,Bolt1發給Bolt2,Bolt2又發給了Bolt1,最終形成了一個環狀。
Storm 集群安裝
之前已經寫過了,這裏就不在說明了。
博客地址:http://www.panchengming.com/2018/01/26/pancm70/
Storm Hello World
前面講了一些Storm概念,可能在理解上不太清楚,那麽這裏我們就用一個Hello World代碼示例來體驗下Storm運作的流程吧。
環境準備
在進行代碼開發之前,首先得做好相關的準備。
本項目是使用Maven構建的,使用Storm的版本為1.1.1。
Maven的相關依賴如下:
<!--storm相關jar -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.1</version>
<scope>provided</scope>
</dependency>
具體流程
在寫代碼的時候,我們先來明確要用Storm做什麽。
那麽第一個程序,就簡單的輸出下信息。
具體步驟如下:
- 啟動topology,設置好Spout和Bolt。
- 將Spout獲取的數據傳遞給Bolt。
- Bolt接受Spout的數據進行打印。
Spout
那麽首先開始編寫Spout類。一般是實現 IRichSpout 或繼承BaseRichSpout該類,然後實現該方法。
這裏我們繼承BaseRichSpout這個類,該類需要實現這幾個主要的方法:
一、open
open()方法中是在ISpout接口中定義,在Spout組件初始化時被調用。
有三個參數,它們的作用分別是:
- Storm配置的Map;
- topology中組件的信息;
- 發射tuple的方法;
代碼示例:
@Override
public void open(Map map, TopologyContext arg1, SpoutOutputCollector collector) {
System.out.println("open:"+map.get("test"));
this.collector = collector;
}
二、nextTuple
nextTuple()方法是Spout實現的核心。
也就是主要執行方法,用於輸出信息,通過collector.emit
方法發射。
這裏我們的數據信息已經寫死了,所以這裏我們就直接將數據進行發送。
這裏設置只發送兩次。
代碼示例:
@Override
public void nextTuple() {
if(count<=2){
System.out.println("第"+count+"次開始發送數據...");
this.collector.emit(new Values(message));
}
count++;
}
三、declareOutputFields
declareOutputFields是在IComponent接口中定義,用於聲明數據格式。
即輸出的一個Tuple中,包含幾個字段。
因為這裏我們只發射一個,所以就指定一個。如果是多個,則用逗號隔開。
代碼示例:
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
System.out.println("定義格式...");
declarer.declare(new Fields(field));
}
四、ack
ack是在ISpout接口中定義,用於表示Tuple處理成功。
代碼示例:
@Override
public void ack(Object obj) {
System.out.println("ack:"+obj);
}
五、fail
fail是在ISpout接口中定義,用於表示Tuple處理失敗。
代碼示例:
@Override
public void fail(Object obj) {
System.out.println("失敗:"+obj);
}
六、close
close是在ISpout接口中定義,用於表示Topology停止。
代碼示例:
@Override
public void close() {
System.out.println("關閉...");
}
至於還有其他的,這裏就不在一一列舉了。
Bolt
Bolt是用於處理數據的組件,主要是由execute方法來進行實現。一般來說需要實現 IRichBolt 或繼承BaseRichBolt該類,然後實現其方法。
需要實現方法如下:
一、prepare
在Bolt啟動前執行,提供Bolt啟動環境配置的入口。
參數基本和Sqout一樣。
一般對於不可序列化的對象進行實例化。
這裏的我們就簡單的打印下
@Override
public void prepare(Map map, TopologyContext arg1, OutputCollector collector) {
System.out.println("prepare:"+map.get("test"));
this.collector=collector;
}
註:如果是可以序列化的對象,那麽最好是使用構造函數。
二、execute
execute()方法是Bolt實現的核心。
也就是執行方法,每次Bolt從流接收一個訂閱的tuple,都會調用這個方法。
從tuple中獲取消息可以使用 tuple.getString()
和tuple.getStringByField();
這兩個方法。個人推薦第二種,可以通過field來指定接收的消息。
註:如果繼承的是IRichBolt,則需要手動ack。這裏就不用了,BaseRichBolt會自動幫我們應答。
代碼示例:
@Override
public void execute(Tuple tuple) {
// String msg=tuple.getString(0);
String msg=tuple.getStringByField("test");
//這裏我們就不做消息的處理,只打印
System.out.println("Bolt第"+count+"接受的消息:"+msg);
count++;
/**
*
* 沒次調用處理一個輸入的tuple,所有的tuple都必須在一定時間內應答。
* 可以是ack或者fail。否則,spout就會重發tuple。
*/
// collector.ack(tuple);
}
三、declareOutputFields
和Spout的一樣。
因為到了這裏就不再輸出了,所以就什麽都沒寫。
@Override
public void declareOutputFields(OutputFieldsDeclarer arg0) {
}
cleanup
cleanup是IBolt接口中定義,用於釋放bolt占用的資源。
Storm在終止一個bolt之前會調用這個方法。
因為這裏沒有什麽資源需要釋放,所以就簡單的打印一句就行了。
@Override
public void cleanup() {
System.out.println("資源釋放");
}
Topology
這裏我們就是用main方法進行提交topology。
不過在提交topology之前,需要進行相應的設置。
這裏我就不一一細說了,代碼的註釋已經很詳細了。
代碼示例:
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
/**
*
* Title: App
* Description:
* storm測試
* Version:1.0.0
* @author pancm
* @date 2018年3月6日
*/
public class App {
private static final String str1="test1";
private static final String str2="test2";
public static void main(String[] args) {
// TODO Auto-generated method stub
//定義一個拓撲
TopologyBuilder builder=new TopologyBuilder();
//設置一個Executeor(線程),默認一個
builder.setSpout(str1, new TestSpout());
//設置一個Executeor(線程),和一個task
builder.setBolt(str2, new TestBolt(),1).setNumTasks(1).shuffleGrouping(str1);
Config conf = new Config();
conf.put("test", "test");
try{
//運行拓撲
if(args !=null&&args.length>0){ //有參數時,表示向集群提交作業,並把第一個參數當做topology名稱
System.out.println("遠程模式");
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
} else{//沒有參數時,本地提交
//啟動本地模式
System.out.println("本地模式");
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("111" ,conf, builder.createTopology() );
Thread.sleep(10000);
// 關閉本地集群
cluster.shutdown();
}
}catch (Exception e){
e.printStackTrace();
}
}
}
運行該方法,輸出結果如下:
本地模式
定義格式...
open:test
第1次開始發送數據...
第2次開始發送數據...
prepare:test
Bolt第1接受的消息:這是個測試消息!
Bolt第2接受的消息:這是個測試消息!
資源釋放
關閉...
到這裏,是不是基本上對Storm的運作有些了解了呢。
這個demo達到了上述的三種模式圖中的第一種,一個Spout傳輸數據, 一個Bolt處理數據。
那麽如果我們想達到第二種模式呢,那又該如何做呢?
假如我們想統計下在一段文本中的單詞出現頻率的話,我們只需執行一下步驟就可以了。
1.首先將Spout中的message消息進行更改為數組,並依次將消息發送到TestBolt。
2.然後TestBolt將獲取的數據進行分割,將分割的數據發送到TestBolt2。
3.TestBolt2對數據進行統計,在程序關閉的時候進行打印。
4.Topology成功配置並且啟動之後,等待20秒左右,關閉程序,然後得到輸出的結果。
代碼示例如下:
Spout
用於發送消息。
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
/**
*
* Title: TestSpout
* Description:
* 發送信息
* Version:1.0.0
* @author pancm
* @date 2018年3月6日
*/
public class TestSpout extends BaseRichSpout{
private static final long serialVersionUID = 225243592780939490L;
private SpoutOutputCollector collector;
private static final String field="word";
private int count=1;
private String[] message = {
"My nickname is xuwujing",
"My blog address is http://www.panchengming.com/",
"My interest is playing games"
};
/**
* open()方法中是在ISpout接口中定義,在Spout組件初始化時被調用。
* 有三個參數:
* 1.Storm配置的Map;
* 2.topology中組件的信息;
* 3.發射tuple的方法;
*/
@Override
public void open(Map map, TopologyContext arg1, SpoutOutputCollector collector) {
System.out.println("open:"+map.get("test"));
this.collector = collector;
}
/**
* nextTuple()方法是Spout實現的核心。
* 也就是主要執行方法,用於輸出信息,通過collector.emit方法發射。
*/
@Override
public void nextTuple() {
if(count<=message.length){
System.out.println("第"+count +"次開始發送數據...");
this.collector.emit(new Values(message[count-1]));
}
count++;
}
/**
* declareOutputFields是在IComponent接口中定義,用於聲明數據格式。
* 即輸出的一個Tuple中,包含幾個字段。
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
System.out.println("定義格式...");
declarer.declare(new Fields(field));
}
/**
* 當一個Tuple處理成功時,會調用這個方法
*/
@Override
public void ack(Object obj) {
System.out.println("ack:"+obj);
}
/**
* 當Topology停止時,會調用這個方法
*/
@Override
public void close() {
System.out.println("關閉...");
}
/**
* 當一個Tuple處理失敗時,會調用這個方法
*/
@Override
public void fail(Object obj) {
System.out.println("失敗:"+obj);
}
}
TestBolt
用於分割單詞。
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
/**
*
* Title: TestBolt
* Description:
* 對單詞進行分割
* Version:1.0.0
* @author pancm
* @date 2018年3月16日
*/
public class TestBolt extends BaseRichBolt{
/**
*
*/
private static final long serialVersionUID = 4743224635827696343L;
private OutputCollector collector;
/**
* 在Bolt啟動前執行,提供Bolt啟動環境配置的入口
* 一般對於不可序列化的對象進行實例化。
* 註:如果是可以序列化的對象,那麽最好是使用構造函數。
*/
@Override
public void prepare(Map map, TopologyContext arg1, OutputCollector collector) {
System.out.println("prepare:"+map.get("test"));
this.collector=collector;
}
/**
* execute()方法是Bolt實現的核心。
* 也就是執行方法,每次Bolt從流接收一個訂閱的tuple,都會調用這個方法。
*/
@Override
public void execute(Tuple tuple) {
String msg=tuple.getStringByField("word");
System.out.println("開始分割單詞:"+msg);
String[] words = msg.toLowerCase().split(" ");
for (String word : words) {
this.collector.emit(new Values(word));//向下一個bolt發射數據
}
}
/**
* 聲明數據格式
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("count"));
}
/**
* cleanup是IBolt接口中定義,用於釋放bolt占用的資源。
* Storm在終止一個bolt之前會調用這個方法。
*/
@Override
public void cleanup() {
System.out.println("TestBolt的資源釋放");
}
}
Test2Bolt
用於統計單詞出現次數。
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
/**
*
* Title: Test2Bolt
* Description:
* 統計單詞出現的次數
* Version:1.0.0
* @author pancm
* @date 2018年3月16日
*/
public class Test2Bolt extends BaseRichBolt{
/**
*
*/
private static final long serialVersionUID = 4743224635827696343L;
/**
* 保存單詞和對應的計數
*/
private HashMap<String, Integer> counts = null;
private long count=1;
/**
* 在Bolt啟動前執行,提供Bolt啟動環境配置的入口
* 一般對於不可序列化的對象進行實例化。
* 註:如果是可以序列化的對象,那麽最好是使用構造函數。
*/
@Override
public void prepare(Map map, TopologyContext arg1, OutputCollector collector) {
System.out.println("prepare:"+map.get("test"));
this.counts=new HashMap<String, Integer>();
}
/**
* execute()方法是Bolt實現的核心。
* 也就是執行方法,每次Bolt從流接收一個訂閱的tuple,都會調用這個方法。
*
*/
@Override
public void execute(Tuple tuple) {
String msg=tuple.getStringByField("count");
System.out.println("第"+count+"次統計單詞出現的次數");
/**
* 如果不包含該單詞,說明在該map是第一次出現
* 否則進行加1
*/
if (!counts.containsKey(msg)) {
counts.put(msg, 1);
} else {
counts.put(msg, counts.get(msg)+1);
}
count++;
}
/**
* cleanup是IBolt接口中定義,用於釋放bolt占用的資源。
* Storm在終止一個bolt之前會調用這個方法。
*/
@Override
public void cleanup() {
System.out.println("===========開始顯示單詞數量============");
for (Map.Entry<String, Integer> entry : counts.entrySet()) {
System.out.println(entry.getKey() + ": " + entry.getValue());
}
System.out.println("===========結束============");
System.out.println("Test2Bolt的資源釋放");
}
/**
* 聲明數據格式
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer arg0) {
}
}
Topology
主程序入口。
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
/**
*
* Title: App
* Description:
* storm測試
* Version:1.0.0
* @author pancm
* @date 2018年3月6日
*/
public class App {
private static final String test_spout="test_spout";
private static final String test_bolt="test_bolt";
private static final String test2_bolt="test2_bolt";
public static void main(String[] args) {
//定義一個拓撲
TopologyBuilder builder=new TopologyBuilder();
//設置一個Executeor(線程),默認一個
builder.setSpout(test_spout, new TestSpout(),1);
//shuffleGrouping:表示是隨機分組
//設置一個Executeor(線程),和一個task
builder.setBolt(test_bolt, new TestBolt(),1).setNumTasks(1).shuffleGrouping(test_spout);
//fieldsGrouping:表示是按字段分組
//設置一個Executeor(線程),和一個task
builder.setBolt(test2_bolt, new Test2Bolt(),1).setNumTasks(1).fieldsGrouping(test_bolt, new Fields("count"));
Config conf = new Config();
conf.put("test", "test");
try{
//運行拓撲
if(args !=null&&args.length>0){ //有參數時,表示向集群提交作業,並把第一個參數當做topology名稱
System.out.println("運行遠程模式");
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
} else{//沒有參數時,本地提交
//啟動本地模式
System.out.println("運行本地模式");
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("Word-counts" ,conf, builder.createTopology() );
Thread.sleep(20000);
// //關閉本地集群
cluster.shutdown();
}
}catch (Exception e){
e.printStackTrace();
}
}
}
輸出結果:
運行本地模式
定義格式...
open:test
第1次開始發送數據...
第2次開始發送數據...
第3次開始發送數據...
prepare:test
prepare:test
開始分割單詞:My nickname is xuwujing
開始分割單詞:My blog address is http://www.panchengming.com/
開始分割單詞:My interest is playing games
第1次統計單詞出現的次數
第2次統計單詞出現的次數
第3次統計單詞出現的次數
第4次統計單詞出現的次數
第5次統計單詞出現的次數
第6次統計單詞出現的次數
第7次統計單詞出現的次數
第8次統計單詞出現的次數
第9次統計單詞出現的次數
第10次統計單詞出現的次數
第11次統計單詞出現的次數
第12次統計單詞出現的次數
第13次統計單詞出現的次數
第14次統計單詞出現的次數
===========開始顯示單詞數量============
address: 1
interest: 1
nickname: 1
games: 1
is: 3
xuwujing: 1
playing: 1
my: 3
blog: 1
http://www.panchengming.com/: 1
===========結束============
Test2Bolt的資源釋放
TestBolt的資源釋放
關閉...
上述的是本地模式運行,如果想在Storm集群中進行使用,只需要將程序打包為jar,然後將程序上傳到storm集群中,
輸入:
storm jar xxx.jar xxx xxx
說明:第一個xxx是storm程序打包的包名,第二個xxx是運行主程序的路徑,第三個xxx則表示主程序輸入的參數,這個可以隨意。
如果是使用maven打包的話,則需要在pom.xml加上
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.pancm.storm.App</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
成功運行程序之後,可以在Storm集群的UI界面查看該程序的狀態。
到此,本文結束,謝謝閱讀!
Storm 入門的Demo教程