1. 程式人生 > >基於Storm的WordCount

基於Storm的WordCount

Storm WordCount 工作過程

Storm 版本:
1、Spout 從外部資料來源中讀取資料,隨機發送一個元組物件出去;
2、SplitBolt 接收 Spout 中輸出的元組物件,將元組中的資料切分成單詞,並將切分後的單詞發射出去;
3、WordCountBolt 接收 SplitBolt 中輸出的單詞陣列,對裡面單詞的頻率進行累加,將累加後的結果輸出。

Java 版本:
1、讀取檔案中的資料,一行一行的讀取;
2、將讀到的資料進行切割;
3、對切割後的陣列中的單詞進行計算。

Hadoop 版本:
1、按行讀取檔案中的資料;
2、在 Mapper()函式中對每一行的資料進行切割,並輸出切割後的資料陣列;

3、接收 Mapper()中輸出的資料陣列,在 Reducer()函式中對陣列中的單詞進行計算,將計算後的統計結果輸出。

原始碼

storm的配置、eclipse裡maven的配置以及建立專案部分省略。

Mainclass

package com.test.stormwordcount;
import backtype.storm.Config; 
import backtype.storm.LocalCluster; 
import backtype.storm.StormSubmitter; 
import backtype.storm.generated.AlreadyAliveException; 
import backtype.storm.generated.InvalidTopologyException; 
import backtype.storm.topology.TopologyBuilder; 
import backtype.storm.tuple.Fields; 

public class MainClass { 

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {         
        //建立一個 TopologyBuilder         
        TopologyBuilder tb = new TopologyBuilder();         
        tb.setSpout("SpoutBolt", new SpoutBolt(), 2);         tb.setBolt("SplitBolt", new SplitBolt(), 2).shuffleGrouping("SpoutBolt");         
        tb.setBolt("CountBolt", new CountBolt(), 4).fieldsGrouping("SplitBolt", new Fields("word"));         
        //建立配置         
        Config conf = new Config();         
        //設定 worker 數量         
        conf.setNumWorkers(2);         
        //提交任務         
        //叢集提交         
        //StormSubmitter.submitTopology("myWordcount", conf, tb.createTopology());         
        //本地提交         
        LocalCluster localCluster = new LocalCluster();         
        localCluster.submitTopology("myWordcount", conf, tb.createTopology()); 
    }  
} 

SplitBolt 部分

package com.test.stormwordcount;
import java.util.Map; 
import backtype.storm.task.OutputCollector; 
import backtype.storm.task.TopologyContext; 
import backtype.storm.topology.OutputFieldsDeclarer; 
import backtype.storm.topology.base.BaseRichBolt; 
import backtype.storm.tuple.Fields; 
import backtype.storm.tuple.Tuple; 
import backtype.storm.tuple.Values; 

public class SplitBolt extends BaseRichBolt{      
    OutputCollector collector; 

    /**      * 初始化      */     
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {         
        this.collector = collector;     
        } 

    /**      * 執行方法      */     
    public void execute(Tuple input) {         
        String line = input.getString(0);         
        String[] split = line.split(" ");         
        for (String word : split) {             
            collector.emit(new Values(word));         
            }     
        } 

    /**      * 輸出      */     
    public void declareOutputFields(OutputFieldsDeclarer declarer) {         
        declarer.declare(new Fields("word"));     
        } 
} 

CountBolt 部分

package com.test.stormwordcount;
import java.util.HashMap; 
import java.util.Map; 
import backtype.storm.task.OutputCollector; 
import backtype.storm.task.TopologyContext; 
import backtype.storm.topology.OutputFieldsDeclarer; 
import backtype.storm.topology.base.BaseRichBolt; 
import backtype.storm.tuple.Tuple; 

public class CountBolt extends BaseRichBolt{ 

    OutputCollector collector;
    Map<String, Integer> map = new HashMap<String, Integer>(); 

    /**      * 初始化      */     
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {         
        this.collector = collector;     
        } 


    /**      * 執行方法      */     
public void execute(Tuple input) {         
    String word = input.getString(0);         
    if(map.containsKey(word)){             
    Integer c = map.get(word);             
        map.put(word, c+1);         
        }else{             
        map.put(word, 1);         
        }         
    //測試輸出         
    System.out.println("結果:"+map);     
    } 

    /**      * 輸出      */     
public void declareOutputFields(OutputFieldsDeclarer declarer) {     
    
} 
} 

SpoutBolt 部分

package com.test.stormwordcount;
import java.util.Map; 
import backtype.storm.spout.SpoutOutputCollector; 
import backtype.storm.task.TopologyContext; 
import backtype.storm.topology.OutputFieldsDeclarer; 
import backtype.storm.topology.base.BaseRichSpout; 
import backtype.storm.tuple.Fields; 
import backtype.storm.tuple.Values; 

public class SpoutBolt extends BaseRichSpout{ 

    SpoutOutputCollector collector;
    /**      * 初始化方法      */     
    public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {         
        this.collector = collector;     
        } 

    /**      * 重複呼叫方法      */     
    public void nextTuple() {         
        collector.emit(new Values("hello world this is a test"));     
        } 

    /**      * 輸出      */     
    public void declareOutputFields(OutputFieldsDeclarer declarer) {         
        declarer.declare(new Fields("test"));     
        } 
} 

POM.XML 檔案內容

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.test</groupId>
<artifactId>stormwordcount</artifactId>
<version>0.9.6</version>
<packaging>jar</packaging>

<name>stormwordcount</name>
<url>http://maven.apache.org</url>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>0.9.6</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass>com.test.stormwordcount.MainClass</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.7</source>
                <target>1.7</target>
            </configuration>
        </plugin>
    </plugins>
</build>

遇到的問題

基於Storm的WordCount需要eclipse安裝了maven外掛,之前的大資料實踐安裝的eclipse版本為Eclipse IDE for Eclipse Committers4.5.2,這個版本不自帶maven外掛,後續安裝失敗了幾次(網上很多的教程都已經失效),這裡分享一下我成功安裝的方法:
使用連結下載,Help->Install New SoftWare

點選Add,name輸入隨意,在location輸入下載eclipse的maven外掛,下載地址可以這樣獲取
點選連線:http://www.eclipse.org/m2e/index.html 進入網站後點擊download,拉到最下面可以看到很多eclipse maven外掛的版本和釋出時間,選在適合eclipse的版本複製連結即可。建議取消選中Contack all update sites during install to find required software(耗時太久)。

但是安裝成功後還是無法配置(這裡原因不太清楚,沒找到解決辦法),就直接上官網換成自己maven外掛的JavaEE IDE了...

後續的maven的配置這些都比較順利,第一次建立maven-archetype-quickstat專案報錯,試了網上很多辦法都還沒成功,然後開啟 Windows->Preferencs->Maven->Installation發現之前配置了的maven的安裝路徑沒了...重新配置了下就可以建立專案了。

最後執行成功的結果:

相關推薦

SOA與基於CDIF的API的聯動

網絡協議 sca 流行 大發 一致性 ice 們的 硬件 形象 幾千年來,巴別塔的故事一直是人類面對的一個核心的困境。為了交流和溝通我們人類創造出語言,但溝通與交流仍然存在障礙……相同語言之間的溝通依語境的不同,尚且存在巨大的鴻溝,

python基礎之socket編程-------基於tcp的套接字實現遠程執行命令的操作

logs lose stream res std 遠程控制 python log out 遠程實現cmd功能: import socket import subprocess phone=socket.socket(socket.AF_INET,socket.SOC

國家商用password(五)基於SM2的軟件授權碼生成及校驗

clas 信息 ecp register 方法 序列號 mod 生成 pub 將公開密鑰算法作為軟件註冊算法的優點是Cracker非常難通過跟蹤驗證算法得到註冊機。以下。將介紹使用SM2國密算法進行軟件註冊的方法。 生成授權碼 選擇SM2橢圓曲線參數(P,a,b,N,

day39-Spring 08-Spring的AOP:基於AspectJ的註解

ima spring mage 開發 技術 asp day3 cnblogs ring 基於AspectJ的註解的開發要重點掌握. day39-Spring 08-Spring的AOP:基於AspectJ的註解

從零開始——基於角色的權限管理01(補充)

itl jsp mage logs log sonar class htm -1 此博文較為詳細的介紹從零開始——基於角色的權限管理01文中的兩個部分的流程(解釋代碼)。 1)  index.jsp中提交跳轉action      action的login,獲取jsp頁面傳

Web驗證碼圖片的生成-基於Java的實現

submit esc page resp ioe 代碼 oge cnblogs pro 驗證碼圖片是由程序動態產生的,每次訪問的內容都是隨機的。那麽如何采用程序動態產生圖片,並能夠顯示在客戶端頁面中呢?原理很簡單,對於java而言,我們首先開發一個Servlet,這個Se

基於OpenGL編寫一個簡易的2D渲染框架-04 繪制圖片

著色器 drawtext 結構 渲染 images ron renderer make 制圖 閱讀文章前需要了解的知識,紋理:https://learnopengl-cn.github.io/01%20Getting%20started/06%20Textures/  

聊天程序(基於Socket、Thread)

客戶端信息 -- 聊天程序 soc 數組 net 運行 人的 圖解 聊天程序簡述 1、目的:主要是為了闡述Socket,以及應用多線程,本文側重Socket相關網路編程的闡述。如果您對多線程不了解,大家可以看下我的上一篇博文淺解多線程 。 2、功能:此聊天程序功能實現了服

day39-Spring 11-Spring的AOP:基於AspectJ的XML配置方式

asp 技術 mage bsp aop src xml配置方式 img aspectj day39-Spring 11-Spring的AOP:基於AspectJ的XML配置方式

基於upd的socketserver,即udp的多線程

rip strip() pri socket send while handle for hand 服務端 #udp服務端多進程import socketserverclass My_server(socketserver.BaseRequestHandler): d

基於udp的socket

nco utf utf-8 decode dcl 收發信息 是否 客戶端 用戶 服務端 #udp叫用戶數據報協議,它不會出現粘包形象,但會出現信息缺失#udp服務端單進程‘‘‘import socketudpserver =socket.socket(socket.AF_I

基於tcp的socketserver,即tcp的多線程

握手 for 數據 ket 函數 線程 listen utf-8 thread tcp是數據流式的,它的收發信息,是通過管道的.在進行鏈接時,必須雙方同時答應,故有三次握手,四次斷開機制服務端import socketserver #用於進行都並發,即服務端能同時接收多個

Asp.net基於session實現購物車的方法

lai 程序 clas contain ext info border mode man 本文實例講述了asp.net基於session實現購物車的方法。分享給大家供大家參考,具體如下: ? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 1

2.3 基於寬度優先搜索的網頁爬蟲原理講解

什麽 每一個 empty 目錄 except open 要求 and ref 上一節我們下載並使用了寬度優先的爬蟲,這一節我們來具體看一下這個爬蟲的原理。 首先,查看HTML.py的源代碼。 第一個函數: def get_html(url): try:

基於矢量數據的逆地理編碼功能實現

逆地理編碼 地圖矢量數據 地圖地物搜索 地圖範圍搜索 地圖框選搜索 地理編碼和逆地理編碼概述地理編碼,是指將大家熟悉的地址文字描述,轉換為經緯度,如輸入某某省、某某市、某某路,在地圖上根據經緯度定位到輸入的地圖上的某個點,直觀的顯示所在位置。逆地理編碼是指將經緯度轉化為大家都能看懂文字描述

簡單的登錄基於代碼的

上帝 logs password 賬號密碼 com 購物平臺 .com class usr 1 #!/usr/bin/env python 2 # -*- coding: utf-8 -*- 3 4 import getpass 5 print(‘歡迎來到不知

基於OpenGL編寫一個簡易的2D渲染框架-05 渲染文本

new 坐標 false 證明 ont 獲取 simple 了解 param 閱讀文章前需要了解的知識:文本渲染 https://learnopengl-cn.github.io/06%20In%20Practice/02%20Text%20Rendering/ 簡要步

基於Cocos2dx + box2d 實現的憤慨的小鳥Demo

space 程序 box nbsp 源碼 source span cocos2 lan 1. Demo初始界面 2. 遊戲界面 3. 精確碰撞檢測 4. 下載 壓縮文件文件夾 AngryBird source 憤慨的小鳥Demo源碼,基於C

擼代碼--linux進程通信(基於共享內存)

-- log pac 字符指針 clas fcn eno csdn printf 1.實現親緣關系進程的通信,父寫子讀 思路分析:1)首先我們須要創建一個共享內存。 2)父子進程的創建要用到fork函數。fork函數創建後,兩

pyDash:一個基於 web 的 Linux 性能監測工具

亮顯 依賴關系 stat 運行 tld 下一個 google avi 查看 pyDash 是一個輕量且基於 web 的 Linux 性能監測工具,它是用 Python 和 Django 加上 Chart.js 來寫的。經測試,在下面這些主流 Linux 發行版上可運行:Ce