1. 程式人生 > >Storm(四)並行度和流分組

Storm(四)並行度和流分組

並行度(parallelism)概念

  • 一個執行中的拓撲是由什麼構成的:工作程序(worker processes),執行器(executors)和任務(tasks)
  • 在 Worker 中執行的是拓撲的一個子集。一個 worker 程序是從屬於某一個特定的拓撲的,在 worker
    程序中會執行一個或者多個與拓撲中的元件相關聯的 executor。一個執行中的拓撲就是由這些運行於 Storm叢集中的很多機器上的程序組成的。
  • 一個 executor 是由 worker 程序生成的一個執行緒。在 executor 中可能會有一個或者多個 task,這些 task
    都是為同一個元件(spout 或者 bolt)服務的。
  • task 是實際執行資料處理的最小工作單元(注意,task 並不是執行緒) —— 在你的程式碼中實現的每個 spout 或者 bolt 都會在叢集中執行很多個 task。在拓撲的整個生命週期中每個元件的 task 數量都是保持不變的,不過每個元件的 executor數量卻是有可能會隨著時間變化。在預設情況下 task 的數量是和 executor 的數量一樣的,也就是說,預設情況下 Storm會在每個執行緒上執行一個 task。

Storm的流分組策略

  • Storm的分組策略對結果有著直接的影響,不同的分組的結果一定是不一樣的。其次,不同的分組策略對資源的利用也是有著非常大的不同
  • 拓撲定義的一部分就是為每個Bolt指定輸入的資料流,而資料流分組則定義了在Bolt的task之間如何分配資料流。

八種流分組定義

Shuffle grouping:

  • 隨機分組:隨機的將tuple分發給bolt的各個task,每個bolt例項接收到相同數量的tuple。

Fields grouping:

  • 按欄位分組:根據指定的欄位的值進行分組,舉個栗子,流按照“user-id”進行分組,那麼具有相同的“user-id”的tuple會發到同一個task,而具有不同“user-id”值的tuple可能會發到不同的task上。這種情況常常用在單詞計數,而實際情況是很少用到,因為如果某個欄位的某個值太多,就會導致task不均衡的問題。

Partial Key grouping:

  • 部分欄位分組:流由分組中指定的欄位分割槽,如“欄位”分組,但是在兩個下游Bolt之間進行負載平衡,當輸入資料歪斜時,可以更好地利用資源。優點。有了這個分組就完全可以不用Fields grouping了

All grouping:

  • 廣播分組:將所有的tuple都複製之後再分發給Bolt所有的task,每一個訂閱資料流的task都會接收到一份相同的完全的tuple的拷貝。

Global grouping:

  • 全域性分組:這種分組會將所有的tuple都發到一個taskid最小的task上。由於所有的tuple都發到唯一一個task上,勢必在資料量大的時候會造成資源不夠用的情況。

None grouping:

  • 不分組:不指定分組就表示你不關心資料流如何分組。目前來說不分組和隨機分組效果是一樣的,但是最終,Storm可能會使用與其訂閱的bolt或spout在相同程序的bolt來執行這些tuple

Direct grouping:

  • 指向分組:這是一種特殊的分組策略。以這種方式分組的流意味著將由元組的生成者決定消費者的哪個task能接收該元組。指向分組只能在已經宣告為指向資料流的資料流中宣告。tuple的發射必須使用emitDirect種的一種方法。Bolt可以通過使用TopologyContext或通過在OutputCollector(返回元組傳送到的taskID)中跟蹤emit方法的輸出來獲取其消費者的taskID。

Local or shuffle grouping:
本地或隨機分組:和隨機分組類似,但是如果目標Bolt在同一個工作程序中有一個或多個任務,那麼元組將被隨機分配到那些程序內task。簡而言之就是如果傳送者和接受者在同一個worker則會減少網路傳輸,從而提高整個拓撲的效能。有了此分組就完全可以不用shuffle grouping了。

示例

package com.qxw.topology;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

import com.qxw.bolt.OutBolt;
import com.qxw.bolt.OutBolt2;
import com.qxw.spout.DataSource;

/**
 * 拓撲的並行性
 * @author qxw
 * @data 2018年9月17日下午2:49:09
 */
public class TopologyTest2 {

	public static void main(String[] args) throws Exception {
		//配置
		Config cfg = new Config();
		cfg.setNumWorkers(2);//指定工作程序數  (jvm數量,分散式環境下可用,本地模式設定無意義)
		cfg.setDebug(false);
		
		//構造拓撲流程圖
		TopologyBuilder builder = new TopologyBuilder();
		//設定資料來源(產生2個執行器和倆個任務)
		builder.setSpout("dataSource", new DataSource(),2).setNumTasks(2);
		//設定資料建流處理元件(產生2個執行器和4個任務)
		builder.setBolt("out-bolt", new OutBolt(),2).shuffleGrouping("dataSource").setNumTasks(4); //隨機分組
		//設定bolt的並行度和任務數:(產生6個執行器和6個任務)
//		builder.setBolt("out-bol2", new OutBolt2(),6).shuffleGrouping("out-bolt").setNumTasks(6); //隨機分組
		
		//設定欄位分組(產生8個執行器和8個任務)欄位分組 
		builder.setBolt("out-bol2", new OutBolt2(),8).fieldsGrouping("out-bolt", new Fields("outdata")).setNumTasks(8);
		//設定廣播分組
		//builder.setBolt("write-bolt", new OutBolt2(), 4).allGrouping("print-bolt");
		//設定全域性分組
		//builder.setBolt("write-bolt", new OutBolt2(), 4).globalGrouping("print-bolt");
		
		//1 本地模式
		LocalCluster cluster = new LocalCluster();
		
		//提交拓撲圖  會一直輪詢執行
		cluster.submitTopology("topo", cfg, builder.createTopology());

		
		//2 叢集模式
//		StormSubmitter.submitTopology("topo", cfg, builder.createTopology());
		
	}
}

}

相關推薦

Storm()並行分組

並行度(parallelism)概念 一個執行中的拓撲是由什麼構成的:工作程序(worker processes),執行器(executors)和任務(tasks) 在 Worker 中執行的是拓撲的

Storm並行分組詳解

並行度 對於一個拓撲來說,並行度其實就是task,task是最小的計算單元,每個spout/bolt的相關程式碼副本都會執行在一個task中。並不是executor,因為預設情況下一個executor只有一個task,executor的數量和task是相等的。

storm並行

storm的並行度 是什麼組成了一個執行中的topology:工作程序(worker processes),執行器(executors)和任務(tasks) 在一個 Storm 叢集中,Storm 主要通過以下三個部件來執行拓撲: 工作程序(worker processes) 執

Storm並行詳解

文章來源:http://www.bubuko.com/infodetail-822074.html Storm的並行度詳解 Storm的並行度是非常重要的,通過提高並行度可以提高storm程式的計算能力。 那strom是如何提高並行度的呢? Strom程式的執

kafka中topic的partition數量customerGroup的customer數量關係以及storm消費kafka時並行設定問題總結:

前段時間通過網上查詢和自己測試仔細研究了partition和customer關係以及工作中遇到的storm並行度調整的問題,認真梳理了一下現總結如下: 一、先說kafka部分: produce方面: 如果有多個分割槽,傳送的時候按照key值hashCode%partit

跟我學storm教程2-並行機制及資料分組

topology的四個組成部分 Nodes(伺服器) 即為storm叢集中的supervisor,會執行topology的一部分運算,一個storm叢集一般會有多個node workers(JVM虛擬機器) node節點上執行的相互獨立

storm並行

概念 worker 一個實體機可以執行一個或者多個worker 一個worker只能執行一個topology上的部分或全部component 一個worker是一個獨立的程序 在執行過程中可以調整worker的數量 executor 一個worker中可以

【原】【譯文】理解storm拓撲並行

rec 分享 矩形 bolt pos div pro out data 原文地址: http://storm.apache.org/releases/1.2.1/Understanding-the-parallelism-of-a-Storm-topology.html 什

IO課位元組輸出字元輸出

  這節課講位元組輸出流和字元輸出流     輸出流和輸入流差不多,只不過輸入流是讀取檔案內容,輸出流是向檔案中寫入內容     直接看Demo吧:   Demo1: 通過位元組輸出流寫入檔案 public st

Spark專案實戰-實際專案中常見的優化點-分配更多的資源調節並行

1、分配更多的資源 (1)分配哪些資源?executor、cpu per executor、memory per executor、driver memory。 (2)在哪裡分配這些資源?在我們在生產環境中,提交spark作業時,用的spark-submit shell指

MapTaskReduceTask執行機制以及Map任務的並行

1、MapTask執行機制詳解以及Map任務的並行度 整個Map階段流程大體如下圖所示。 簡單概述:inputFile通過split被邏輯切分為多個split檔案,通過Record按行讀取內容給map(使用者自己實現的)進行處理,資料被map處理結束之後交給

Storm 訊息容錯機制通訊

ack 是什麼 ack 機制是 storm 整個技術體系中非常閃亮的一個創新點。 通過 ack 機制,spout 傳送的每一條資訊,都可以確定是被成功處理或失敗處理,從而可以讓開發者採取行動。比如在meta中,成功被處理,即可更新偏移量,當失敗時,重複傳送處理。 因此,

Apache Storm 官方文件 —— 理解 Storm 拓撲的並行(parallelism)概念

原文連結    譯者:魏勇 一個執行中的拓撲是由什麼構成的:工作程序(worker processes),執行器(executors)和任務(tasks) 在一個 Storm 叢集中,Storm 主要通過以下三個部件來執行拓撲: 工作程序(worker processes) 執行器(exec

HADOOP 中mapreduce的並行設定的問題

1.3 MapTask並行度決定機制 maptask的並行度決定map階段的任務處理併發度,進而影響到整個job的處理速度 那麼,mapTask並行例項是否越多越好呢?其並行度又是如何決定呢? 1.3.1 mapTask並行度的決定機制 一個job的map階段並行度由客戶

[Swift通天遁地]、網路執行緒-(3)執行緒組:使用DispatchGroup(排程組)對執行緒進行分組管理

本文將演示執行緒組的使用。 使用執行緒組可以設定在完成一個或一組任務之後,再執行另一個或一組任務。 在專案導航區,開啟檢視控制器的程式碼檔案【ViewController.swift】 現在開始編寫程式碼,實現執行緒組的功能。 1 import UIKit 2 3 class Vie

Storm學習筆記(5)- 並行

文章目錄 並行度概念詳解 Storm作業執行UI頁面上的引數詳解 並行度設定 worker數量的設定 executor數量的設定 task數量的設定 acker的設定 並行

Storm並行詳解

注意:並行度主要就是調整executor的數量,但是調整之後的executor的數量必須小於等於task的數量,如果分配的executor的執行緒數比task數量多的話也只能分配和task數量相等的executor。 TASK的存在只是為了topology擴充套件的靈活性,與並行度無關。 executor的數

理解Storm並行

一個Topology可以執行多個Worker上,這樣可以提高資料處理能力,因為一個worker就是一個程序,更確切的說是一個JVM,很自然的,我們可以想到如果一個worker中可以再起多個執行緒的話效率就會很高。事實上storm就是這麼幹的,worker並不是s

Storm部分:Storm Grouping -- 資料分組(即資料分發策略)

1. Shuffle Grouping 隨機分組,隨機派發stream裡面的tuple,保證每個bolt task接收到的tuple數目大致相同。 輪詢,平均分配 2. Fields Grouping 按欄位分組,比如,按"user-id"這個欄位來分組,那麼具有同樣"us

ggbiplot-最好看的PCA作圖:樣品PCA散點+分組橢圓+主成分丰相關

寫在前面 前幾天在《巨集基因組0》微信討論群看到了有人發了一個上面連結,點開一看居然是一條命令出帥圖,真是太實用了。我立即使用本領域的OTU表上進行了測試,效果很好,現分享給大家,歡迎大家留言補充。 ggbiplot簡介 ggbiplot是一款PC