1. 程式人生 > >storm七之storm java示例

storm七之storm java示例

通過前面6個章節,我們大致瞭解apache storm的核心細節了,現在我們開始寫一些簡單的程式碼,來感受下storm的魅力。

場景——移動呼叫日誌分析

行動電話呼叫號及其持續時間將作為Apache stormd輸入,storm根據撥號方和接收方之間的電話號碼以及通話次數進行分組

Spout Creation

Spoutstorm用於資料生成一個元件,。

通常,Spout實現一個IRichSpout介面。

IRichSpout介面有以下重要的方法

1.open−提供Spout以及spout執行環境。executors執行這個方法來初始化spout

2.nextTuple

通過收集器傳送產生的資料

3.close關閉Spout時呼叫close方法

4.declareOutputFields宣告輸出元組schema

5.ack處理特定的元組

6.fail指定一個特定的不處理和再加工元組。

open

open方法簽名如下:

open(map conf,TopologyContext context,SpoutOutputCollectorcollector)

引數解析:

confSpout提供storm配置。

context:topology中提供Spout的完整資訊,包括:任務id,輸入輸出資訊

collector:保證我們傳送的資料能被

bolt處理

nextTuple

nextTuple方法簽名如下:

nextTuple()

nextTuple()定期方法定期的被相同迴圈中的ack()方法和fail()方法呼叫

當沒有工作要做的時候必須釋放執行緒,以保證其他方法有機會被呼叫。

因此,nextTuple首先要檢查處理是否已經完成。

如果完成,在結果返回之前,為了降低處理器的負載,該執行緒應該至少睡1毫秒。

close

Close方法簽名如下:

close()

declareOutputFields

declareOutputFields方法前面如下:

declareOutputFields(OutputFieldsDeclarer declarer)

引數說明:

Declarer宣告輸出流ids,輸出欄位,等等。

這個方法用於指定tuple輸出的shema

ack

ack方法的簽名如下:

ack(Object msgId)

這個方法表明指定的tuple已經被處理過。

fail

fail方法簽名如下:

fail(Object msgId)

表明spout傳送出的資料並沒有被完全處理,storm會重新處理這個資料。

FakeCallLogReaderSpout

現在我們要收集手機日誌的詳細資訊,包含:

1.主叫號碼

2.被叫號碼

3.通話時長

因為我們沒有實時通話記錄資訊,那麼我們就自己模擬通話記錄。

Random隨機類產生模擬的通話資訊。

完整的程式程式碼如下所示。

Coding FakeCallLogReaderSpout.java

import java.util.*;

//import storm tuple packages

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Values;

//import Spout interface packages

import backtype.storm.topology.IRichSpout;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.task.TopologyContext;

//Create a class FakeLogReaderSpout which implement IRichSpout interface

   to access functionalities

public class FakeCallLogReaderSpout implements IRichSpout {

   //Create instance for SpoutOutputCollector which passes tuples to bolt.

   private SpoutOutputCollector collector;

   private boolean completed = false;

   //Create instance for TopologyContext which contains topology data.

   private TopologyContext context;

   //Create instance for Random class.

   private Random randomGenerator = new Random();

   private Integer idx = 0;

   @Override

   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {

      this.context = context;

      this.collector = collector;

   }

   @Override

   public void nextTuple() {

      if(this.idx <= 1000) {

         List<String> mobileNumbers = new ArrayList<String>();

         mobileNumbers.add("1234123401");

         mobileNumbers.add("1234123402");

         mobileNumbers.add("1234123403");

         mobileNumbers.add("1234123404");

         Integer localIdx = 0;

         while(localIdx++ < 100 && this.idx++ < 1000) {

            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));

            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));

            while(fromMobileNumber == toMobileNumber) {

               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));

            }

            Integer duration = randomGenerator.nextInt(60);

            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));

         }

      }

   }

   @Override

   public void declareOutputFields(OutputFieldsDeclarer declarer) {

      declarer.declare(new Fields("from", "to", "duration"));

   }

   //Override all the interface methods

   @Override

   public void close() {}

   public boolean isDistributed() {

      return false;

   }

   @Override

   public void activate() {}

   @Override

   public void deactivate() {}

   @Override

   public void ack(Object msgId) {}

   @Override

   public void fail(Object msgId) {}

   @Override

   public Map<String, Object> getComponentConfiguration() {

      return null;

   }

}

Bolt Creation

Bolt是一個以元組作為輸入處理元組產生新的元組作為輸出的元件

Bolt通常需要實現IRichBolt介面。

在這個程式中,兩個boltCallLogCreatorBolt和CallLogCounterBolt用來執行的操作處理

IRichBolt介面有如下方法:

1.prepare 準備−提供bolt以及bolt執行環境。

executors會執行這個方法去初始化bolt

2.execute處理輸入的單個tuple

3.cleanup 要關閉bolt時被呼叫

4.declareOutputFields宣告輸出元組schema

Prepare

prepare方法的簽名如下:

prepare(Map conf, TopologyContext context, OutputCollector collector)

引數說明:

Conf:為bolt提供配置

Context:topology提供完整bolt位置資訊,包括它的任務id,輸入和輸出資訊等。

Collector:保證處理過的tuple能被髮送出去。

Execute

execute方法簽名如下:

execute(Tuple tuple)//這裡的tuple是將被處理的輸入tuple

execute方法每次處理一個元組。

通過tuple的getValue方法訪問元組的資料。

輸入元組不是必須立即處理的,可以過一會再處理

可以處理多個元組處理後產生單個tuple作為輸出tuple

處理過的tuple可以使用OutputCollector類傳送出去

Cleanup

cleanup方法簽名如下:

Cleanup()

declareOutputFields

declareOutputFields方法簽名如下:

declareOutputFields(OutputFieldsDeclarer declarer)//這裡的declarer用來宣告輸出流的ids,輸出欄位等資訊

這個方法用來指定tuple的輸出shema

Call log Creator Bolt

Call log creator bolt 接收通話日誌tuple

通話日誌tuple包含主叫號碼,被叫號碼和通話時長

This bolt simply creates a new value by combining the caller number and the receiver number.

這個螺栓簡單地建立了一個新值通過呼叫者數量和接收方號碼。

格式化後的新值欄位叫call,格式是Caller number – Receiver number

完整的程式碼如下所示:

//import util packages

import java.util.HashMap;

import java.util.Map;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;

import backtype.storm.task.TopologyContext;

//import Storm IRichBolt package

import backtype.storm.topology.IRichBolt;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.tuple.Tuple;

//Create a class CallLogCreatorBolt which implement IRichBolt interface

public class CallLogCreatorBolt implements IRichBolt {

   //Create instance for OutputCollector which collects and emits tuples to produce output

   private OutputCollector collector;

   @Override

   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {

      this.collector = collector;

   }

   @Override

   public void execute(Tuple tuple) {

      String from = tuple.getString(0);

      String to = tuple.getString(1);

      Integer duration = tuple.getInteger(2);

      collector.emit(new Values(from + " - " + to, duration));

   }

   @Override

   public void cleanup() {}

   @Override

   public void declareOutputFields(OutputFieldsDeclarer declarer) {

      declarer.declare(new Fields("call", "duration"));

   }

   @Override

   public Map<String, Object> getComponentConfiguration() {

      return null;

   }

}

Call log Counter Bolt

Call log counter bolt 接收上一個bolt call及其持續時間作為一個tuple

boltprepare方法中初始化一個字典(Map)物件。

execute 方法,它檢查字典中的tuple併為tuple中的每一個新的call”值建立一個條目entry,並設定字典的值為1

對於字典中現有的條目,則將其值+1

簡而言之,這個bolt在字典中儲存call和它的數量

如果不儲存在字典中,我們也可以把它儲存懂啊一個數據源中。

而不是儲存呼叫及其計數在字典裡,我們也可以將它儲存到一個數據源。

完整的程式程式碼如下

Coding − CallLogCounterBolt.java

import java.util.HashMap;

import java.util.Map;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.IRichBolt;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {

   Map<String, Integer> counterMap;

   private OutputCollector collector;

   @Override

   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {

      this.counterMap = new HashMap<String, Integer>();

      this.collector = collector;

   }

   @Override

   public void execute(Tuple tuple) {

      String call = tuple.getString(0);

      Integer duration = tuple.getInteger(1);

      if(!counterMap.containsKey(call)){

         counterMap.put(call, 1);

      }else{

         Integer c = counterMap.get(call) + 1;

         counterMap.put(call, c);

      }

      collector.ack(tuple);

   }

   @Override

   public void cleanup() {

      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){

         System.out.println(entry.getKey()+" : " + entry.getValue());

      }

   }

   @Override

   public void declareOutputFields(OutputFieldsDeclarer declarer) {

      declarer.declare(new Fields("call"));

   }

   @Override

   public Map<String, Object> getComponentConfiguration() {

      return null;

   }

}

Creating Topology

通常stormtopology是一個Thrift結構。

TopologyBuilder類提供了簡單易用的方法來建立複雜的topology

TopologyBuilder類提供了方法來設spout(setSpout)和bolt(setBolt)。

總之,TopologyBuilder createTopology建立topology

下面的程式碼片段為建立topology的事例:

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt()).shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt()).fieldsGrouping("call-log-creator-bolt", new Fields("call"));

shuffleGrouping和fieldsGrouping方法幫助spoutboltstream進行分組。

Local Cluster

為了便於開發,我們可以使用“LocalCluster”物件建立一個本地叢集,然後使用“LocalCluster”類的“submitTopology”方法提交topology

其中,“submitTopology”的引數之一是Config”類的一個例項。

Config”類的作用是提交topology之前設定配置選項。

This configuration option will be merged with the cluster configuration at run time and sent to all task (spout and bolt) with the prepare method.

這種配置選項將合併在執行時間和傳送到所有叢集配置任務(壺嘴和螺栓)的準備方法。

一旦topology提交到叢集,我們需要等待10秒以便叢集計算提交topology,然後使用shutdown方法關閉叢集。

完整的程式程式碼如下

Coding − LogAnalyserStorm.java

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Values;

//import storm configuration packages

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.topology.TopologyBuilder;

//Create main class LogAnalyserStorm submit topology.

public class LogAnalyserStorm {

   public static void main(String[] args) throws Exception{

      //Create Config instance for cluster configuration

      Config config = new Config();

      config.setDebug(true);

      //

      TopologyBuilder builder = new TopologyBuilder();

      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())

         .shuffleGrouping("call-log-reader-spout");

      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())

         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

      LocalCluster cluster = new LocalCluster();

      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());

      Thread.sleep(10000);

      //Stop the topology

      cluster.shutdown();

   }

}

Building and Running the Application

完整的應用程式有四個Java程式碼

1.FakeCallLogReaderSpout.java2.CallLogCreaterBolt.java3.CallLogCounterBolt.java4.LogAnalyerStorm.java

應用程式可以使用下面的命令構建:

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

應用程式可以使用以下的命令執行:

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:.LogAnalyserStorm

Output

一旦應用程式啟動,它將輸出完整的叢集啟動程序的細節,poutbolt處理過程,最後,叢集關閉這些處理過程

 "CallLogCounterBolt"程式碼中,我們列印了callcount的具體資訊。

這些資訊將顯示在控制檯如下

1234123402 - 1234123401 : 781234123402 - 1234123404 : 881234123402 - 1234123403 : 1051234123401 - 1234123404 : 741234123401 - 1234123403 : 811234123401 - 1234123402 : 811234123403 - 1234123404 : 861234123404 - 1234123401 : 631234123404 - 1234123402 : 821234123403 - 1234123402 : 831234123404 - 1234123403 : 861234123403 - 1234123401 : 93

JVM外的其他語言

Storm topology通過Thrift介面實現,這使得很容易任何語言去提交topologystorm叢集中

Storm支援Ruby、Python和許多其他語言。

讓我們看看使用python事例:

Python Binding

Python是一種解釋,互動的、面向物件的高階程式語言。

Storm支援Python實現其topology

Python支援 emitting, anchoring, acking, and logging operations

如你所知,bolt可以使用任何語言定義

Bolts written in another language are executed as sub-processes, and Storm communicates with those sub-processes with JSON messages over stdin/stdout.

下面來看一個用python編寫的bolt來計算單詞出現次數的事例:

public static class WordCount implements IRichBolt {public WordSplit() {super("python", "splitword.py");}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}}

Here the class WordCount implements the IRichBolt interface and running with python implementation specified super method argument "splitword.py".

現在建立一個名為“splitword.py”的python實現。

import stormclass WordCountBolt(storm.BasicBolt):def process(self, tup):words = tup.values[0].split(" ")for word in words:storm.emit([word])WordCountBolt().run()

這是Python實現計數的示例。

同樣你也可以其他支援語言實現

相關推薦

stormstorm java示例

通過前面6個章節,我們大致瞭解apache storm的核心細節了,現在我們開始寫一些簡單的程式碼,來感受下storm的魅力。 場景——移動呼叫日誌分析 行動電話呼叫號及其持續時間將作為Apache stormd的輸入流,storm將根據撥號方和接收方之間的電話號碼以及通話

大數據入門第十天——storm上遊數據源 kafka詳解(一)入門

不同 這也 接受 blog 存儲 發送 records ant post 一、概述   1.kafka是什麽     根據標題可以有個概念:kafka是storm的上遊數據源之一,也是一對經典的組合,就像郭德綱和於謙     根據官網:http://kafka.apa

Storm解讀路(二、基本 Java-API 篇)

寫這些東西其實本質上是記錄因工作接觸 Storm 之後的學習進度,既然是工作,當然要敲程式碼,所以這一篇就分享下基本 Java-API 吧。 首先看下面的圖(畫圖不行見諒),這是 Storm API 使用中最基本的介面和抽象類關係。 OK,這裡我們可以

轉: 【Java並發編程】:深入Java內存模型—內存操作規則總結

tle 沒有 article 類型 javase 感知 執行引擎 要求 lock 轉載請註明出處:http://blog.csdn.net/ns_code/article/details/17377197 主內存與工作內存 Java內存模型的主要目標是定義程序中

HadoopStorm基礎

1、離線計算是什麼   離線計算:批量獲取資料,批量傳輸資料,週期性批量計算資料,資料展示   代表技術:sqoop批量匯入資料,hdfs批量儲存資料,mapreduce批量計算資料,hive批量計算資料,***任務排程 2、流式就算是什麼   流式計算:資料實時產生,資料實時傳輸,資料實時計算,實時展示

j ava程式設計師從笨鳥到菜鳥)一—java資料庫操作

分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!        

Storm案例詞頻統計

1.案例需求 在本地模式下使用Storm實現統計指定檔案中的詞頻個數統計 2.需求分析 Spout來讀取指定檔案的資料,並把每一行資料傳送出去 Bolt來實現具體邏輯,單詞分割和統計 將結果輸出到控制檯 Spout——>Bolt——>Bolt 3.匯入Stor

Storm案例自增數字求和

1.案例需求 實現自增數字相加的和 1+2+3+4+5+6+........ 2.需求分析 Spout來發送數字作為input 使用Bolt來實現求和邏輯 將結果輸出到控制檯 3.匯入Storm的pom依賴 <dependency> <gro

Storm()Storm整合kafka

使用kafka-client jar進行Storm Apache Kafka整合 這包括新的Apache Kafka消費者API。相容性 Apache Kafka版本0.10起 引入jar包 &

大資料storm(一) --- storm簡介,核心元件,工作流程,安裝和部署,電話通訊案例分析,叢集執行,單詞統計案例分析,調整併發度

一、storm簡介 --------------------------------------------------------- 1.開源,分散式,實時計算 2.實時可靠的處理無限資料流,可以使用任何語言開發 3.適用於實時分析,線上機器學習

深入理解Java虛擬機器()Java記憶體模型

深入理解Java虛擬機器系列文章 Java記憶體模型規定了所有的變數都儲存在主記憶體,每個執行緒都有自己的工作記憶體,執行緒中的工作記憶體儲存了被該執行緒使用到的變數的主記憶體的副本拷貝。執行緒對變

StormPartialKeyGrouping關鍵字分組

一、概述        這種方式與按欄位分組很相似,根據指定欄位的值進行分組,不同的是,這種方式會考慮下游 bolt 資料處理的均衡性問題,在輸入資料來源關鍵字不平衡時會有更好的效能。  二、程式碼 1.Spout package

StormDirectGrouping指向型分組

一、概述        這種方式傳送者可以指定下游的哪個任務可以接收這個元組。只有在資料流被宣告為直接資料流時才能夠使用直接分組方式。使用直接資料流傳送元組需要使用 OutputCollector 的其中一個 emitDirect 方法。Bolt 可以

StormNoneGrouping不分組

一、概述       在功能上和隨機分組相同,為將來預留的。 二、程式碼 1.Spout package com.test.csdn.nogrouping; import org.apache.storm.spout.SpoutOutputColl

StormAllGrouping全複製分組

一、概述        將所有的 tuple 複製後分發給所有 bolt task。每個訂閱資料流的 task 都會收到 tuple 的拷貝。 二、程式碼 Spout package com.test.csdn.allgroupi

StormGlobalGrouping全域性分組

一、概述        全域性分組方式將所有的 tuples 路由到唯一一個 task 上。Storm選取最小的 task ID 來選取接收資料的 task。注意,當使用全域性分組時,設定 bolt 的 task 併發度是沒有意義的,因為所有 tupl

OpenLayers官方示例詳解圖層的最小、最大解析度(Layer Min/Max Resolution)

目錄 一、示例簡介 二、程式碼詳解 一、示例簡介     這個示例載入了一個MapBox的瓦片圖層和一個Open Street Map的瓦片圖層,同時使用最小、最大解析度限制圖層載入的比例級別。     使用滑鼠放大兩次:MapBox圖

大資料處理框架:Storm + Kafka + zookeeper 叢集

Storm kafka zookeeper 叢集 我們知道storm的作用主要是進行流式計算,對於源源不斷的均勻資料流流入處理是非常有效的,而現實生活中大部分場景並不是均勻的資料流,而是時而多時而少的資料流入,這種情況下顯然用批量處理是不合適的,如果使用storm做實時計算的話可能因為資

學習筆記Java核心技術卷I》---- 第章 異常、斷言和日誌

異常物件都是派生與Throwable的一個例項 派生於Error類或RuntimeException類的所有異常稱為非受查異常,所有其他異常稱為受查異常 一個方法必須宣告所有可能丟擲的受查異常,而非受查異常要麼不可控制,要麼就應該避免發生 關鍵字throws位於方法之

Storm——Storm+Kafka+Flume+Zookeeper+MySQL實現資料實時分析(環境搭建篇)

Storm之——Storm+Kafka+Flume+Zookeeper+MySQL實現資料實時分析(環境搭建篇) 2018年03月04日 23:05:29 冰 河 閱讀數:1602更多 所屬專欄: Hadoop生態 版權宣告:本文為博主原創文章,未經博主允許不得轉載。 https:/