1. 程式人生 > >Storm處理流程, 基本引數配置

Storm處理流程, 基本引數配置

文章來源:http://www.cnblogs.com/chengxin1982/p/4001275.html

配置選項名稱

配置選項作用

topology.max.task.parallelism

每個Topology執行時最大的executor數目

topology.workers

每個Topology執行時的worker的預設數目,若在程式碼中設定,則此選項值被覆蓋

storm.zookeeper.servers

zookeeper叢集的節點列表

storm.local.dir

Storm用於儲存jar包和臨時檔案的本地儲存目錄

storm.zookeeper.root

Storm在zookeeper叢集中的根目錄,預設是“/”

ui.port

Storm叢集的UI地址埠號,預設是8080

nimbus.host:

Nimbus節點的host

supervisor.slots.ports

Supervisor 節點的worker佔位槽,叢集中的所有Topology公用這些槽位數,即使提交時設定了較大數值的槽位數,系統也會按照當前叢集中實際剩餘的槽位數來 進行分配,當所有的槽位數都分配完時,新提交的Topology只能等待,系統會一直監測是否有空餘的槽位空出來,如果有,就再次給新提交的 Topology分配

supervisor.worker.timeout.secs

Worker的超時時間,單位為秒,超時後,Storm認為當前worker程序死掉,會重新分配其執行著的task任務

drpc.servers

在使用drpc服務時,drpc server的伺服器列表

drpc.port

在使用drpc服務時,drpc 

本地模式下, 基本併發度控制

conf.setMaxTaskParallelism(5);   本地模式下一個元件能夠執行的最大執行緒數

builder.setSpout("spout", new RandomSentenceSpout(), 10);  最後的引數parallelism_hint 表示executor的數目

,每個作為一個thread在work下工作,  但是如果超過setMaxTaskParallelism定義的上限,則使用setMaxTaskParallelism設定的TOPOLOGY_MAX_TASK_PARALLELISM

builder.setSpout("spout", new RandomSentenceSpout(), 5).setNumTasks(2);  ,task的數目,預設和executor是1:1 的關係,就是每個task執行在一個物理執行緒上,

在這裡設定的是taskNum為2,executor 是5,表示RandomSentenceSpout建立2次,實際只有兩個2個executor,  executor不能超過NumTask

builder.setSpout("spout", new RandomSentenceSpout(), 2).setNumTasks(5);   

在這裡設定的是taskNum為5,executor 是2, 表示RandomSentenceSpout建立5次,2個executor在兩個物理執行緒上執行,  每個executor執行1/2的任務

這麼寫感覺意義都不大, 只是個人為了理解storm executor task概念, 在0.8以後,幾個executor有可能是共用一個物理執行緒,由上面測試能看出。

突然想起這個其實還是有好處的,因為在storm中 TaskNum是靜態的, executor是動態的, 比如tasknum是5,exector是2,這時候是在兩個物理執行緒執行, 如果我們將executor改成3,  這時會變成在3個物理執行緒上執行,提高了併發性. 物理執行緒公式應該Min(executor, tasknum),  這個未在任何文件上見過,個人的一個推斷.

動態調整引數

# Reconfigure the topology "mytopology" to use 5 worker processes,
# the spout "blue-spout" to use 3 executors and
# the bolt "yellow-bolt" to use 10 executors.

$ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10

builder.setBolt("split", new SplitSentence(), 8).setNumTasks(1).shuffleGrouping("spout"); 這裡和上面一樣,會負載均衡地放入一個執行緒中執行

conf.setDebug(true);                         // 
conf.setMaxSpoutPending(2);          //  這個設定一個spout task上面最多有多少個沒有處理(ack/fail)的tuple,防止tuple佇列過大, 只對可靠任務起作用
conf.setMessageTimeoutSecs(1);    //  訊息處理延時, 就是訊息超過延時後, emit發射源會認為是fail , storm預設是30秒,如果實現的為Irichbolt介面,沒有ack和ack延時都會觸發,這個時間過短的話,如果自定義重發,bolt可能會多處理,tuple在發射過程中, 但是還沒有到達bolt, 但是已經延時了,emit發射源會認為已經失敗了,但是bolt還是收到這個tuple, 所以storm引入了事務拓撲,0.8以後叫trident. 如果實現的為IBaseBolt,則只會在延時情況下觸發, 預設會呼叫ack,但是這個ack如果有再次發射, 這個ack就會自動錨定了.

根據具體業務需求選擇合適的Bolt
conf.setNumAckers(2);                     //  訊息處理的acker數量.預設1,可以根據實際處理情況調大

真實環境

conf.setNumWorkers(5); // 設定工作程序 ,  如果不新增埠, 預設會是4個worker程序

需要在storm.yaml下新增埠

supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
- 6704

每個worker使用一個埠. 

 在uI視窗是spout bolt acker幾個的累加.

storm.yaml引數參考  

配置項 配置說明
storm.zookeeper.servers ZooKeeper伺服器列表
storm.zookeeper.port ZooKeeper連線埠
storm.local.dir storm使用的本地檔案系統目錄(必須存在並且storm程序可讀寫)
storm.cluster.mode Storm叢集執行模式([distributed|local])
storm.local.mode.zmq Local模式下是否使用ZeroMQ作訊息系統,如果設定為false則使用java訊息系統。預設為false
storm.zookeeper.root ZooKeeper中Storm的根目錄位置
storm.zookeeper.session.timeout 客戶端連線ZooKeeper超時時間
storm.id 執行中拓撲的id,由storm name和一個唯一隨機陣列成。
nimbus.host nimbus伺服器地址
nimbus.thrift.port nimbus的thrift監聽埠
nimbus.childopts 通過storm-deploy專案部署時指定給nimbus程序的jvm選項
nimbus.task.timeout.secs 心跳超時時間,超時後nimbus會認為task死掉並重分配給另一個地址。
nimbus.monitor.freq.secs nimbus檢查心跳和重分配任務的時間間隔.注意如果是機器宕掉nimbus會立即接管並處理。
nimbus.supervisor.timeout.secs supervisor的心跳超時時間,一旦超過nimbus會認為該supervisor已死並停止為它分發新任務.
nimbus.task.launch.secs task啟動時的一個特殊超時設定.在啟動後第一次心跳前會使用該值來臨時替代nimbus.task.timeout.secs.
nimbus.reassign 當發現task失敗時nimbus是否重新分配執行。預設為真,不建議修改。
nimbus.file.copy.expiration.secs nimbus判斷上傳/下載連結的超時時間,當空閒時間超過該設定時nimbus會認為連結死掉並主動斷開
ui.port Storm UI的服務埠
drpc.servers DRPC伺服器列表,以便DRPCSpout知道和誰通訊
drpc.port Storm DRPC的服務埠
supervisor.slots.ports supervisor上能夠執行workers的埠列表.每個worker佔用一個埠,且每個埠只執行一個worker.通過這項配置可以調整每臺機器上執行的worker數.(調整slot數/每機)
supervisor.childopts 在storm-deploy專案中使用,用來配置supervisor守護程序的jvm選項
supervisor.worker.timeout.secs supervisor中的worker心跳超時時間,一旦超時supervisor會嘗試重啟worker程序.
supervisor.worker.start.timeout.secs supervisor初始啟動時,worker的心跳超時時間,當超過該時間supervisor會嘗試重啟worker。因為JVM初始啟動和配置會帶來的額外消耗,從而使得第一次心跳會超過supervisor.worker.timeout.secs的設定
supervisor.enable supervisor是否應當執行分配給他的workers.預設為true,該選項用來進行Storm的單元測試,一般不應修改.
supervisor.heartbeat.frequency.secs supervisor心跳傳送頻率(多久傳送一次)
supervisor.monitor.frequency.secs supervisor檢查worker心跳的頻率
worker.childopts supervisor啟動worker時使用的jvm選項.所有的”%ID%”字串會被替換為對應worker的識別符號
worker.heartbeat.frequency.secs worker的心跳傳送時間間隔
task.heartbeat.frequency.secs task彙報狀態心跳時間間隔
task.refresh.poll.secs task與其他tasks之間連結同步的頻率.(如果task被重分配,其他tasks向它傳送訊息需要重新整理連線).一般來講,重分配發生時其他tasks會理解得到通知。該配置僅僅為了防止未通知的情況。
topology.debug 如果設定成true,Storm將記錄發射的每條資訊。
topology.optimize master是否在合適時機通過在單個執行緒內執行多個task以達到優化topologies的目的.
topology.workers 執行該topology叢集中應當啟動的程序數量.每個程序內部將以執行緒方式執行一定數目的tasks.topology的元件結合該引數和並行度提示來優化效能
topology.ackers topology中啟動的acker任務數.Acker儲存由spout傳送的tuples的記錄,並探測tuple何時被完全處理.當Acker探測到tuple被處理完畢時會向spout傳送確認資訊.通常應當根據topology的吞吐量來確定acker的數目,但一般不需要太多.當設定為0時,相當於禁用了訊息可靠性,storm會在spout傳送tuples後立即進行確認.
topology.message.timeout.secs topology中spout傳送訊息的最大處理超時時間.如果一條訊息在該時間視窗內未被成功ack,Storm會告知spout這條訊息失敗。而部分spout實現了失敗訊息重播功能。
topology.kryo.register 註冊到Kryo(Storm底層的序列化框架)的序列化方案列表.序列化方案可以是一個類名,或者是com.esotericsoftware.kryo.Serializer的實現.
topology.skip.missing.kryo.registrations Storm是否應該跳過它不能識別的kryo序列化方案.如果設定為否task可能會裝載失敗或者在執行時丟擲錯誤.
topology.max.task.parallelism 在一個topology中能夠允許的最大元件並行度.該項配置主要用在本地模式中測試執行緒數限制.
topology.max.spout.pending 一個spout task中處於pending狀態的最大的tuples數量.該配置應用於單個task,而不是整個spouts或topology.
topology.state.synchronization.timeout.secs 元件同步狀態源的最大超時時間(保留選項,暫未使用)
topology.stats.sample.rate 用來產生task統計資訊的tuples抽樣百分比
topology.fall.back.on.java.serialization topology中是否使用java的序列化方案
zmq.threads 每個worker程序內zeromq通訊用到的執行緒數
zmq.linger.millis 當連線關閉時,連結嘗試重新發送訊息到目標主機的持續時長.這是一個不常用的高階選項,基本上可以忽略.
java.library.path JVM啟動(如Nimbus,Supervisor和workers)時的java.library.path設定.該選項告訴JVM在哪些路徑下定位本地庫.

storm內預設引數
java.library.path:"/usr/local/lib:/opt/local/lib:/usr/lib"
### storm.* configs are general configurations
# the local dir is where jars are kept
storm.local.dir"storm-local"
storm.zookeeper.servers:
"localhost"
storm.zookeeper.port2181
storm.zookeeper.root"/storm"
storm.zookeeper.session.timeout20000
storm.zookeeper.connection.timeout15000
storm.zookeeper.retry.times5
storm.zookeeper.retry.interval1000
storm.zookeeper.retry.intervalceiling.millis30000
storm.cluster.mode"distributed" # can be distributed or local
storm.local.mode.zmqfalse
storm.thrift.transport"backtype.storm.security.auth.SimpleTransportPlugin"
storm.messaging.transport"backtype.storm.messaging.netty.Context"
storm.meta.serialization.delegate"backtype.storm.serialization.DefaultSerializationDelegate"
### nimbus.* configs are for the master
nimbus.host"localhost"
nimbus.thrift.port6627
nimbus.thrift.max_buffer_size1048576
nimbus.childopts"-Xmx1024m"
nimbus.task.timeout.secs30
nimbus.supervisor.timeout.secs60
nimbus.monitor.freq.secs10
nimbus.cleanup.inbox.freq.secs600
nimbus.inbox.jar.expiration.secs3600
nimbus.task.launch.secs120
nimbus.reassigntrue
nimbus.file.copy.expiration.secs600
nimbus.topology.validator"backtype.storm.nimbus.DefaultTopologyValidator"
### ui.* configs are for the master
ui.port8080
ui.childopts"-Xmx768m"
logviewer.port8000
logviewer.childopts"-Xmx128m"
logviewer.appender.name"A1"
drpc.port3772
drpc.worker.threads64
drpc.queue.size128
drpc.invocations.port3773
drpc.request.timeout.secs600
drpc.childopts"-Xmx768m"
transactional.zookeeper.root"/transactional"
transactional.zookeeper.serversnull
transactional.zookeeper.portnull
### supervisor.* configs are for node supervisors
# Define the amount of workers that can be run on this machine. Each worker is assigned a port to use for communication
supervisor.slots.ports:
6700
6701
6702
6703
supervisor.childopts"-Xmx256m"
#how long supervisor will wait to ensure that a worker process is started
supervisor.worker.start.timeout.secs120
#how long between heartbeats until supervisor considers that worker dead and tries to restart it
supervisor.worker.timeout.secs30
#how frequently the supervisor checks on the status of the processes it's monitoring and restarts if necessary
supervisor.monitor.frequency.secs3
#how frequently the supervisor heartbeats to the cluster state (for nimbus)
supervisor.heartbeat.frequency.secs5
supervisor.enabletrue
### worker.* configs are for task workers
worker.childopts"-Xmx768m"
worker.heartbeat.frequency.secs1
# control how many worker receiver threads we need per worker
topology.worker.receiver.thread.count1
task.heartbeat.frequency.secs3
task.refresh.poll.secs10
zmq.threads1
zmq.linger.millis5000
zmq.hwm0
storm.messaging.netty.server_worker_threads1
storm.messaging.netty.client_worker_threads1
storm.messaging.netty.buffer_size5242880 #5MB buffer
# Since nimbus.task.launch.secs and supervisor.worker.start.timeout.secs are 120, other workers should also wait at least that long before giving up on connecting to the other worker.
storm.messaging.netty.max_retries300
storm.messaging.netty.max_wait_ms1000
storm.messaging.netty.min_wait_ms100
# If the Netty messaging layer is busy(netty internal buffer not writable), the Netty client will try to batch message as more as possible up to the size of storm.messaging.netty.transfer.batch.size bytes, otherwise it will try to flush message as soon as possible to reduce latency.
storm.messaging.netty.transfer.batch.size262144
# We check with this interval that whether the Netty channel is writable and try to write pending messages if it is.
storm.messaging.netty.flush.check.interval.ms10
### topology.* configs are for specific executing storms
topology.enable.message.timeoutstrue
topology.debugfalse
topology.workers1
topology.acker.executorsnull
topology.tasksnull
# maximum amount of time a message has to complete before it's considered failed
topology.message.timeout.secs30
topology.multilang.serializer"backtype.storm.multilang.JsonSerializer"
topology.skip.missing.kryo.registrationsfalse
topology.max.task.parallelismnull
topology.max.spout.pendingnull
topology.state.synchronization.timeout.secs60
topology.stats.sample.rate0.05
topology.builtin.metrics.bucket.size.secs60
topology.fall.back.on.java.serializationtrue
topology.worker.childoptsnull
topology.executor.receive.buffer.size1024 #batched
topology.executor.send.buffer.size1024 #individual messages
topology.receiver.buffer.size# setting it too high causes a lot of problems (heartbeat thread gets starved, throughput plummets)
topology.transfer.buffer.size1024 # batched
topology.tick.tuple.freq.secsnull
topology.worker.shared.thread.pool.size4
topology.disruptor.wait.strategy"com.lmax.disruptor.BlockingWaitStrategy"
topology.spout.wait.strategy"backtype.storm.spout.SleepSpoutWaitStrategy"
topology.sleep.spout.wait.strategy.time.ms1
topology.error.throttle.interval.secs10
topology.max.error.report.per.interval5
topology.kryo.factory"backtype.storm.serialization.DefaultKryoFactory"
topology.tuple.serializer"backtype.storm.serialization.types.ListDelegateSerializer"
topology.trident.batch.emit.interval.millis500
topology.classpathnull
topology.environmentnull
dev.zookeeper.path"/tmp/dev-storm-zookeeper"