1. 程式人生 > >Storm之——程式設計案例

Storm之——程式設計案例

一、程式需求

今天,我們再次為大家帶來一篇關於Storm的文章,以便為大家起到複習Storm的效果。這篇文章的程式設計案例基於Maven實現,主要的功能是:從檔案讀取內容——>切分單詞,去掉首尾空格並將單詞轉化為小寫——>統計單詞數量並列印結果。

好了,明確了程式要實現的功能之後,我們就正式進入Storm的開發。

二、程式實現

1、建立工程

首先我們建立一個Maven工程,編譯pom.xml檔案如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
             http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.lyz</groupId>
	<artifactId>storm-test</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>2.3.2</version>
				<configuration>
					<source>1.7</source>
					<target>1.7</target>
					<compilerVersion>1.7</compilerVersion>
				</configuration>
			</plugin>
		</plugins>
	</build>
	<repositories>
		<!-- Repository where we can found the storm dependencies -->
		<repository>
			<id>clojars.org</id>
			<url>http://clojars.org/repo</url>
		</repository>
	</repositories>
	<dependencies>
		<!-- Storm Dependency -->
		<dependency>
			<groupId>storm</groupId>
			<artifactId>storm</artifactId>
			<version>0.6.0</version>
		</dependency>
	</dependencies>
</project>

工程的目錄結構如下:


2、編寫Spout類WordReader

這個類的主要作用是負責從檔案按行讀取文字,並把文字行提供給第一個bolt。

主要程式碼如下:

package com.lyz.storm.spouts;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/**
 *WordReader負責從檔案按行讀取文字,並把文字行提供給第一個bolt。
 * @author liuyazhuang
 *
 */
public class WordReader implements IRichSpout {

	private static final long serialVersionUID = -850307559130820088L;

	private SpoutOutputCollector collector;
	private FileReader fileReader;
	private boolean completed = false;
	private TopologyContext context;
	@Override
	public boolean isDistributed() {
		return false;
	}
	@Override
	public void ack(Object msgId) {
		System.out.println("OK:" + msgId);
	}
	@Override
	public void close() {
	}
	@Override
	public void fail(Object msgId) {
		System.out.println("FAIL:" + msgId);
	}

	/**
	 * 這個方法做的惟一一件事情就是分發檔案中的文字行
	 */
	@Override
	public void nextTuple() {
		/**
		 * 這個方法會不斷的被呼叫,直到整個檔案都讀完了,我們將等待並返回。
		 */
		if (completed) {
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				// 什麼也不做
			}
			return;
		}
		String str;
		// 建立reader
		BufferedReader reader = new BufferedReader(fileReader);
		try {
			// 讀所有文字行
			while ((str = reader.readLine()) != null) {
				/**
				 * 按行釋出一個新值
				 */
				this.collector.emit(new Values(str), str);
			}
		} catch (Exception e) {
			throw new RuntimeException("Error reading tuple", e);
		} finally {
			completed = true;
		}
	}

	/**
	 * 我們將建立一個檔案並維持一個collector物件
	 */
	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		try {
			this.context = context;
			this.fileReader = new FileReader(conf.get("wordsFile").toString());
		} catch (FileNotFoundException e) {
			throw new RuntimeException("Error reading file [" + conf.get("wordFile") + "]");
		}
		this.collector = collector;
	}

	/**
	 * 宣告輸入域"word"
	 */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("line"));
	}

}

3、編寫第一個bolt類WordNormalizer

這個類的主要作用是:負責得到並標準化每行文字。它把文字行切分成單詞,大寫轉化成小寫,去掉頭尾空白符,並將結果傳送給第二個bolt類。

具體程式碼如下:

package com.lyz.storm.bolts;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * 負責得到並標準化每行文字。它把文字行切分成單詞,大寫轉化成小寫,去掉頭尾空白符。
 * @author liuyazhuang
 *
 */
public class WordNormalizer implements IRichBolt {
	private static final long serialVersionUID = -2127001114476106896L;
	
	private OutputCollector collector;
	
	@Override
	public void cleanup() {
		
	}

	/**
	 * *bolt*從單詞檔案接收到文字行,並標準化它。 文字行會全部轉化成小寫,並切分它,從中得到所有單詞。
	 */
	@Override
	public void execute(Tuple input) {
		String sentence = input.getString(0);
		String[] words = sentence.split(" ");
		for (String word : words) {
			word = word.trim();
			if (!word.isEmpty()) {
				word = word.toLowerCase();
				// 釋出這個單詞
				List<Tuple> a = new ArrayList<Tuple>();
				a.add(input);
				collector.emit(a, new Values(word));
			}
		}
		// 對元組做出應答
		collector.ack(input);
	}
	@Override
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
	}

	/**
	 * 這個*bolt*只會釋出“word”域
	 */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word"));
	}
}

4、編寫第二個bolt類WordCounter

這個類的主要作用是:統計每個單詞的數量並列印結果。

package com.lyz.storm.bolts;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

/**
 * 負責為單詞計數。這個拓撲結束時(cleanup()方法被呼叫時),我們將顯示每個單詞的數量。
 * 這個例子的bolt什麼也沒釋出,它把資料儲存在map裡,但是在真實的場景中可以把資料儲存到資料庫。
 * @author liuyazhuang
 *
 */
public class WordCounter implements IRichBolt{
	
	private static final long serialVersionUID = 6323893801667766697L;
	Integer id;
    String name;
    Map<String,Integer> counters;
    private OutputCollector collector;

    /**
      * 這個spout結束時(叢集關閉的時候),我們會顯示單詞數量
      */
    @Override
    public void cleanup(){
        System.out.println("-- 單詞數 【"+name+"-"+id+"】 --");
        for(Map.Entry<String,Integer> entry : counters.entrySet()){
            System.out.println(entry.getKey()+": "+entry.getValue());
        }
    }

    /**
     *  為每個單詞計數
     */
    @Override
    public void execute(Tuple input) {
        String str=input.getString(0);
        /**
         * 如果單詞尚不存在於map,我們就建立一個,如果已在,我們就為它加1
         */
        if(!counters.containsKey(str)){
        	counters.put(str,1);
        }else{
            Integer c = counters.get(str) + 1;
            counters.put(str,c);
        }
        //對元組作為應答
        collector.ack(input);
    }

    /**
     * 初始化
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector){
        this.counters = new HashMap<String, Integer>();
        this.collector = collector;
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}
}

5、編寫執行程式的入口類TopologyMain

這個類的主要作用是:作為程式的入口,以本地模式執行。
具體程式碼如下:

package com.lyz.storm;
import com.lyz.storm.bolts.WordCounter;
import com.lyz.storm.bolts.WordNormalizer;
import com.lyz.storm.spouts.WordReader;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

/**
 * 執行程式的主類,本例項以本地模式執行
 * @author liuyazhuang
 *
 */
public class TopologyMain {
	
	public static void main(String[] args) throws InterruptedException {
		// 定義拓撲
		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("word-reader", new WordReader());
		builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
		builder.setBolt("word-counter", new WordCounter(), 2).fieldsGrouping("word-normalizer", new Fields("word"));

		// 配置
		Config conf = new Config();
		conf.put("wordsFile", "D:/Workspaces/Hadoop/storm-test/src/main/resources/word.txt");
		conf.setDebug(false);

		// 執行拓撲
		conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
		LocalCluster cluster = new LocalCluster();
		cluster.submitTopology("Getting-Started-Topologie", conf, builder.createTopology());
		Thread.sleep(1000);
		cluster.shutdown();
	}
}

6、建立word.txt檔案

在工程的src/main/resources目錄下建立word.txt檔案如下:

Storm test are great is an Storm simple application but very powerful really Storm is great great great great great great great great great great great great great great great great great great great great great great great 
至此,整個工程建立完畢。

三、執行程式

我們執行程式的入口類TopologyMain

可以看到控制檯輸出如下日誌:

1501 [Thread-24] INFO  backtype.storm.util  - Async loop interrupted!
-- 單詞數 【word-counter-3】 --
really: 1
but: 1
great: 24
an: 1
storm: 3
1504 [main] INFO  backtype.storm.daemon.task  - Shut down task Getting-Started-Topologie-1-1509248619:3
1504 [main] INFO  backtype.storm.daemon.task  - Shutting down task Getting-Started-Topologie-1-1509248619:2
1504 [Thread-26] INFO  backtype.storm.util  - Async loop interrupted!
-- 單詞數 【word-counter-2】 --
application: 1
is: 2
are: 1
test: 1
simple: 1
powerful: 1
very: 1
1507 [main] INFO  backtype.storm.daemon.task  - Shut down task Getting-Started-Topologie-1-1509248619:2
其中打印出了word.txt檔案中的每個單詞的統計數量。至此,整個應用程式編寫測試完畢。

四、溫馨提示