Apache Storm 官方文件 —— Storm 與 Kestrel
本文說明了如何使用 Storm 從 Kestrel 叢集中消費資料。
前言
Storm
本教程中使用了 storm-kestrel 專案和 storm-starter 專案中的例子。建議讀者將這幾個專案 clone 到本地,並動手執行其中的例子。
Kestrel
本文假定讀者可以如此專案所述在本地執行一個 Kestrel 叢集。
Kestrel 伺服器與佇列
Kestrel 服務中包含有一組訊息佇列。Kestrel 佇列是一種非常簡單的訊息佇列,可以運行於 JVM 上,並使用 memcache 協議(以及一些擴充套件)與客戶端互動。詳情可以參考 storm-kestrel
每個佇列均嚴格遵循先入先出的規則。為了提高服務效能,資料都是快取在系統記憶體中的;不過,只有開頭的 128MB 是儲存在記憶體中的。在服務停止的時候,佇列的狀態會儲存到一個日誌檔案中。
請參閱此文瞭解更多詳細資訊。
Kestrel 具有 * 快速 * 小巧 * 持久 * 可靠 等特點。
例如,Twitter 就使用 Kestrel 作為訊息系統的核心環節,此文中介紹了相關資訊。
** 向 Kestrel 中新增資料
首先,我們需要一個可以向 Kestrel 的佇列新增資料的程式。下述方法使用了 storm-kestrel 專案中的 KestrelClient
private static void queueSentenceItems(KestrelClient kestrelClient, String queueName) throws ParseError, IOException { String[] sentences = new String[] { "the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature"}; Random _rand = new Random(); for(int i=1; i<=10; i++){ String sentence = sentences[_rand.nextInt(sentences.length)]; String val = "ID " + i + " " + sentence; boolean queueSucess = kestrelClient.queue(queueName, val); System.out.println("queueSucess=" +queueSucess+ " [" + val +"]"); } }
從 Kestrel 中移除資料
此方法從一個佇列中取出一個數據,但並不把該資料從佇列中刪除:
private static void dequeueItems(KestrelClient kestrelClient, String queueName) throws IOException, ParseError { for(int i=1; i<=12; i++){ Item item = kestrelClient.dequeue(queueName); if(item==null){ System.out.println("The queue (" + queueName + ") contains no items."); } else { byte[] data = item._data; String receivedVal = new String(data); System.out.println("receivedItem=" + receivedVal); } }
此方法會從佇列中取出並移除資料:
private static void dequeueAndRemoveItems(KestrelClient kestrelClient, String queueName) throws IOException, ParseError { for(int i=1; i<=12; i++){ Item item = kestrelClient.dequeue(queueName); if(item==null){ System.out.println("The queue (" + queueName + ") contains no items."); } else { int itemID = item._id; byte[] data = item._data; String receivedVal = new String(data); kestrelClient.ack(queueName, itemID); System.out.println("receivedItem=" + receivedVal); } } }
向 Kestrel 中連續新增資料
下面的程式可以向本地 Kestrel 服務的一個 sentence_queue 佇列中連續新增句子,這也是我們的最後一個程式。
可以在命令列視窗中輸入一個右中括號 ]
並回車來停止程式。
import java.io.IOException; import java.io.InputStream; import java.util.Random; import backtype.storm.spout.KestrelClient; import backtype.storm.spout.KestrelClient.Item; import backtype.storm.spout.KestrelClient.ParseError; public class AddSentenceItemsToKestrel { /** * @param args */ public static void main(String[] args) { InputStream is = System.in; char closing_bracket = ']'; int val = closing_bracket; boolean aux = true; try { KestrelClient kestrelClient = null; String queueName = "sentence_queue"; while(aux){ kestrelClient = new KestrelClient("localhost",22133); queueSentenceItems(kestrelClient, queueName); kestrelClient.close(); Thread.sleep(1000); if(is.available()>0){ if(val==is.read()) aux=false; } } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ParseError e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("end"); } }
使用 KestrelSpout
下面的拓撲使用 KestrelSpout
從一個 Kestrel 佇列中讀取句子,並將句子分割成若干個單詞(Bolt:SplitSentence),然後輸出每個單詞出現的次數(Bolt:WordCount)。資料處理的細節可以參考訊息的可靠性保證一文。
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("sentences", new KestrelSpout("localhost",22133,"sentence_queue",new StringScheme())); builder.setBolt("split", new SplitSentence(), 10) .shuffleGrouping("sentences"); builder.setBolt("count", new WordCount(), 20) .fieldsGrouping("split", new Fields("word"));
執行
首先,以生產模式或者開發者模式啟動你的本地 Kestrel 服務。
然後,等待大約 5 秒鐘以防出現網路連線異常。
現在可以執行向佇列中新增資料的程式,並啟動 Storm 拓撲。程式啟動的順序並不重要。
如果你以 TOPOLOGY_DEBUG 模式執行拓撲你會觀察到拓撲中 tuple 傳送的細節資訊。