Getting Started with Storm (Part 1)
I. Storm is a real-time stream-computing framework
This shows directly in its two core components:
Spout: a data source
Bolt: a processing node
In short, spouts keep supplying data and bolts keep processing it, which forms a data-processing stream.
II. A word-count example
SentenceSpout (Spout, produces sentences) -> SplitSentenceBolt (Bolt, splits each sentence) -> WordCountBolt (Bolt, counts the split words) -> ReportBolt (Bolt, outputs the count results)
The whole SentenceSpout -> SplitSentenceBolt -> WordCountBolt -> ReportBolt pipeline forms one concept: a Topology.
SentenceSpout.java
package com.zte.StormTest;
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
public class SentenceSpout extends BaseRichSpout
{
    private static final long serialVersionUID = -2521640424426565301L;
    private SpoutOutputCollector collector;
    private String[] sentences = {
        "my dog has fleas",
        "i like cold beverages",
        "the dog ate my homework",
        "don't have a cow man",
        "i don't think i like fleas"
    };
    private int index = 0;

    //Storm calls this repeatedly; emit one sentence per call, cycling through the array
    @Override
    public void nextTuple() {
        this.collector.emit(new Values(sentences[index]));
        index++;
        if(index >= sentences.length)
        {
            index = 0;
        }
    }

    //Called when the spout component is initialized
    //Map contains the Storm configuration
    //TopologyContext provides information about the components in the topology, e.g. the current component ID
    //SpoutOutputCollector provides the methods for emitting tuples
    @Override
    public void open(Map config, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}
SplitSentenceBolt.java
package com.zte.StormTest;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
public class SplitSentenceBolt extends BaseRichBolt
{
    private static final long serialVersionUID = 5516446565262406488L;
    private OutputCollector collector;

    @Override
    public void execute(Tuple tuple)
    {
        //split each sentence into words and emit one tuple per word
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for(String word : words)
        {
            this.collector.emit(new Values(word));
        }
    }

    //Called when the bolt is initialized; use it to set up resources the bolt needs, e.g. database connections
    @Override
    public void prepare(Map config, TopologyContext context, OutputCollector collector)
    {
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("word"));
    }
}
WordCountBolt.java
package com.zte.StormTest;
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
public class WordCountBolt extends BaseRichBolt
{
    private static final long serialVersionUID = 3533537921679412895L;
    private OutputCollector collector;
    private HashMap<String,Long> counts = null;

    @Override
    public void execute(Tuple tuple)
    {
        String word = tuple.getStringByField("word");
        Long count = this.counts.get(word);
        if(count == null)
        {
            count = 0L;
        }
        count++;
        this.counts.put(word, count);
        this.collector.emit(new Values(word, count));
        System.out.println("word:" + word + " count:" + count);
    }

    @Override
    public void prepare(Map config, TopologyContext context, OutputCollector collector)
    {
        this.collector = collector;
        this.counts = new HashMap<String,Long>();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("word","count"));
    }
}
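The topology below also wires in a ReportBolt whose source is not listed here. A minimal sketch, assuming it only keeps the latest counts and prints them when the topology shuts down, might look like this:
ReportBolt.java
package com.zte.StormTest;
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
public class ReportBolt extends BaseRichBolt
{
    private static final long serialVersionUID = 1L;
    private HashMap<String,Long> counts = null;

    @Override
    public void prepare(Map config, TopologyContext context, OutputCollector collector)
    {
        this.counts = new HashMap<String,Long>();
    }

    @Override
    public void execute(Tuple tuple)
    {
        //remember the latest count seen for each word
        String word = tuple.getStringByField("word");
        Long count = tuple.getLongByField("count");
        this.counts.put(word, count);
    }

    //Called when the topology is killed (reliable in local mode); dump the final tallies
    @Override
    public void cleanup()
    {
        System.out.println("--- FINAL COUNTS ---");
        for(Map.Entry<String,Long> entry : counts.entrySet())
        {
            System.out.println(entry.getKey() + " : " + entry.getValue());
        }
        System.out.println("--------------------");
    }

    //This bolt is a sink; it emits nothing downstream
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
    }
}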
WordCountTopology.java
package com.zte.StormTest;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
public class WordCountTopology
{
    private static final String SENTENCE_SPOUT_ID = "sentence-spout";
    private static final String SPLIT_BOLT_ID = "split-bolt";
    private static final String COUNT_BOLT_ID = "count-bolt";
    private static final String REPORT_BOLT_ID = "report-bolt";
    private static final String TOPOLOGY_NAME = "word-count-topology";

    public static void main(String[] args) throws Exception
    {
        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt splitBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();
        ReportBolt reportBolt = new ReportBolt();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SENTENCE_SPOUT_ID, spout);
        builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);
        builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
        builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);

        Config config = new Config();

        //Run locally
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
        //When running locally it is best to sleep before shutting down: stopping the
        //components takes some time, and the sleep lets you see the count output first
        Thread.sleep(5000);
        cluster.killTopology(TOPOLOGY_NAME);
        Thread.sleep(30000);
        cluster.shutdown();

        //For real deployment to a Storm cluster, use StormSubmitter.submitTopology
        // StormSubmitter.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
    }
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.zte.apt</groupId>
    <artifactId>StormTest</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>StormTest</name>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
    <dependencies>
        <!-- provided scope: the cluster supplies storm-core at runtime, so it is not packaged into the jar -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.1.1</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>
III. Storm basic concepts
1. Nodes (servers): the machines that make up a Storm cluster; a cluster can contain one or more worker nodes.
2. Workers (JVM processes): independent JVM processes running on a node. Each node can be configured to run one or more workers, and each worker is bound to exactly one topology.
Set the number of worker processes with, for example, Config.setNumWorkers(3).
3. Executors (threads): Java threads running inside a worker's JVM process. Multiple tasks can be assigned to the same executor; by default, Storm assigns one task per executor.
Set the number of executors (threads) with, for example, builder.setBolt(SPLIT_BOLT_ID, splitBolt, 2).
4. Tasks (spout/bolt instances): a task is an instance of a spout or bolt; its nextTuple() and execute() methods are called by an executor thread.
Set the number of tasks with builder.setBolt(SPLIT_BOLT_ID, splitBolt, 2).setNumTasks(4); the sketch below shows all three settings together.
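A minimal sketch combining these settings, reusing the word-count components above (the class name ParallelismDemo and the concrete numbers are illustrative, not from the original):
package com.zte.StormTest;
import org.apache.storm.Config;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
public class ParallelismDemo
{
    public static void main(String[] args)
    {
        Config config = new Config();
        //Workers: request 2 JVM processes for this topology
        config.setNumWorkers(2);

        TopologyBuilder builder = new TopologyBuilder();
        //Executors: the third argument is the number of threads for the component
        builder.setSpout("sentence-spout", new SentenceSpout(), 1);
        //Tasks: 2 executors run 4 SplitSentenceBolt instances, i.e. 2 tasks per thread
        builder.setBolt("split-bolt", new SplitSentenceBolt(), 2)
               .setNumTasks(4)
               .shuffleGrouping("sentence-spout");
        builder.setBolt("count-bolt", new WordCountBolt(), 2)
               .fieldsGrouping("split-bolt", new Fields("word"));
        //Submit with LocalCluster or StormSubmitter exactly as in WordCountTopology above
    }
}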
IV. Stream grouping strategies
1. Shuffle grouping: tuples are distributed randomly; across all of a bolt's threads, the total received equals the total emitted.
2. Fields grouping: tuples are routed by the values of the specified combination of fields; tuples with equal values always go to the same bolt task.
For example, in word counting the routing stays fixed: a -> bolt1, b -> bolt2, c -> bolt3, d -> bolt1.
3. All grouping: every bolt task receives a copy of every tuple; if 10 tuples are emitted, each task receives all 10.
4. Direct grouping: the source (spout/bolt) calls emitDirect to decide which task receives each tuple; it can only be used on streams declared as direct streams.
For example, a spout can specify that certain data may only be processed by the bolt task whose task ID is 4.
5. Global grouping: all tuples go to the bolt task with the lowest task ID, so parallelism settings have no effect.
6. None grouping: currently equivalent to shuffle grouping.
7. CustomStreamGrouping: implement this interface for custom routing; see the sketch after this list.
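A sketch of a custom grouping, assuming we route by the hash of the tuple's first field (the class FirstFieldHashGrouping is hypothetical, not part of the original example):
package com.zte.StormTest;
import java.util.Arrays;
import java.util.List;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.grouping.CustomStreamGrouping;
import org.apache.storm.task.WorkerTopologyContext;
//Route each tuple by the hash of its first field, so equal values always
//land on the same task (similar in spirit to fields grouping)
public class FirstFieldHashGrouping implements CustomStreamGrouping
{
    private static final long serialVersionUID = 1L;
    private List<Integer> targetTasks;

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks)
    {
        //task IDs of the downstream bolt
        this.targetTasks = targetTasks;
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values)
    {
        int hash = values.get(0) == null ? 0 : values.get(0).hashCode();
        //mask the sign bit so the index is always non-negative
        int index = (hash & Integer.MAX_VALUE) % targetTasks.size();
        return Arrays.asList(targetTasks.get(index));
    }
}
It would be wired in with builder.setBolt(COUNT_BOLT_ID, countBolt, 2).customGrouping(SPLIT_BOLT_ID, new FirstFieldHashGrouping());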
V. Running Storm topologies
1. Locally: use LocalCluster and simply run the main class from Eclipse.
2. On a cluster: use StormSubmitter.submitTopology, package the project (the storm dependency has provided scope and must not be bundled into the jar), then run:
bin/storm jar WordCount.jar com.zte.StormTest.WordCountTopology
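A common pattern (an assumption here, not shown in the original) is a single main method that picks the mode from the command line, so the same class works both in Eclipse and on the cluster:
package com.zte.StormTest;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
public class WordCountRunner
{
    public static void main(String[] args) throws Exception
    {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("sentence-spout", new SentenceSpout());
        builder.setBolt("split-bolt", new SplitSentenceBolt()).shuffleGrouping("sentence-spout");
        builder.setBolt("count-bolt", new WordCountBolt()).fieldsGrouping("split-bolt", new Fields("word"));
        builder.setBolt("report-bolt", new ReportBolt()).globalGrouping("count-bolt");
        Config config = new Config();

        if(args != null && args.length > 0)
        {
            //Cluster mode: the first argument is used as the topology name
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        }
        else
        {
            //Local mode: run for a while, then shut down
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count-topology", config, builder.createTopology());
            Thread.sleep(10000);
            cluster.killTopology("word-count-topology");
            cluster.shutdown();
        }
    }
}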
VI. Installing Storm
Make sure JDK 1.8 is installed.
1. Install ZooKeeper
Download the ZooKeeper package and extract it.
(1) Set up the configuration file first:
rename zoo_sample.cfg in the conf directory to zoo.cfg; the default port is 2181.
(2) Start ZooKeeper with bin/zkServer.sh start.
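For reference, a standalone zoo.cfg only needs a few lines (the dataDir path below is illustrative):
tickTime=2000
dataDir=/tmp/zookeeper
clientPort=2181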
2. Install Storm
Extract the package.
(1) The bin directory contains the startup scripts.
(2) The conf directory holds the configuration; storm.yaml contains the configuration items, including those for ZooKeeper (default: localhost). For example:
storm.zookeeper.servers:
    - "storm-01.test.com" (host name or IP, e.g. 10.42.27.1)
    - "storm-02.test.com"
    - "storm-03.test.com"
nimbus.seeds configures the master server(s).
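Combining the two, a minimal conf/storm.yaml for this layout might look like the following (treating storm-01.test.com as the nimbus host is an assumption):
storm.zookeeper.servers:
    - "storm-01.test.com"
    - "storm-02.test.com"
    - "storm-03.test.com"
nimbus.seeds: ["storm-01.test.com"]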
Once everything is configured, copy the entire storm folder to the other servers as well.
(3) Start the master node: bin/storm nimbus &
(4) Start the worker nodes: bin/storm supervisor &
(5) Start the web UI: bin/storm ui &
(6) Start the log-viewer process: bin/storm logviewer &
Then open ip:8080/index.html to access the UI, e.g. 192.168.1.104:8080/index.html.