Storm處理流程, 基本引數配置
文章來源:http://www.cnblogs.com/chengxin1982/p/4001275.html
配置選項名稱 |
配置選項作用 |
topology.max.task.parallelism |
每個Topology執行時最大的executor數目 |
topology.workers |
每個Topology執行時的worker的預設數目,若在程式碼中設定,則此選項值被覆蓋 |
storm.zookeeper.servers |
zookeeper叢集的節點列表 |
storm.local.dir |
Storm用於儲存jar包和臨時檔案的本地儲存目錄 |
storm.zookeeper.root |
Storm在zookeeper叢集中的根目錄,預設是“/” |
ui.port |
Storm叢集的UI地址埠號,預設是8080 |
nimbus.host: |
Nimbus節點的host |
supervisor.slots.ports |
Supervisor 節點的worker佔位槽,叢集中的所有Topology公用這些槽位數,即使提交時設定了較大數值的槽位數,系統也會按照當前叢集中實際剩餘的槽位數來 進行分配,當所有的槽位數都分配完時,新提交的Topology只能等待,系統會一直監測是否有空餘的槽位空出來,如果有,就再次給新提交的 Topology分配 |
supervisor.worker.timeout.secs |
Worker的超時時間,單位為秒,超時後,Storm認為當前worker程序死掉,會重新分配其執行著的task任務 |
drpc.servers |
在使用drpc服務時,drpc server的伺服器列表 |
drpc.port |
在使用drpc服務時,drpc |
本地模式下, 基本併發度控制
conf.setMaxTaskParallelism(5); 本地模式下一個元件能夠執行的最大執行緒數
builder.setSpout("spout", new RandomSentenceSpout(), 10); 最後的引數parallelism_hint 表示executor的數目
builder.setSpout("spout", new RandomSentenceSpout(), 5).setNumTasks(2); ,task的數目,預設和executor是1:1 的關係,就是每個task執行在一個物理執行緒上,
在這裡設定的是taskNum為2,executor 是5,表示RandomSentenceSpout建立2次,實際只有兩個2個executor, executor不能超過NumTask
builder.setSpout("spout", new RandomSentenceSpout(), 2).setNumTasks(5);
在這裡設定的是taskNum為5,executor 是2, 表示RandomSentenceSpout建立5次,2個executor在兩個物理執行緒上執行, 每個executor執行1/2的任務
這麼寫感覺意義都不大, 只是個人為了理解storm executor task概念, 在0.8以後,幾個executor有可能是共用一個物理執行緒,由上面測試能看出。
突然想起這個其實還是有好處的,因為在storm中 TaskNum是靜態的, executor是動態的, 比如tasknum是5,exector是2,這時候是在兩個物理執行緒執行, 如果我們將executor改成3, 這時會變成在3個物理執行緒上執行,提高了併發性. 物理執行緒公式應該Min(executor, tasknum), 這個未在任何文件上見過,個人的一個推斷.
動態調整引數
# Reconfigure the topology "mytopology" to use 5 worker processes, # the spout "blue-spout" to use 3 executors and # the bolt "yellow-bolt" to use 10 executors. $ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10
builder.setBolt("split", new SplitSentence(), 8).setNumTasks(1).shuffleGrouping("spout"); 這裡和上面一樣,會負載均衡地放入一個執行緒中執行
conf.setDebug(true); //
conf.setMaxSpoutPending(2); // 這個設定一個spout task上面最多有多少個沒有處理(ack/fail)的tuple,防止tuple佇列過大, 只對可靠任務起作用
conf.setMessageTimeoutSecs(1); // 訊息處理延時, 就是訊息超過延時後, emit發射源會認為是fail , storm預設是30秒,如果實現的為Irichbolt介面,沒有ack和ack延時都會觸發,這個時間過短的話,如果自定義重發,bolt可能會多處理,tuple在發射過程中, 但是還沒有到達bolt, 但是已經延時了,emit發射源會認為已經失敗了,但是bolt還是收到這個tuple, 所以storm引入了事務拓撲,0.8以後叫trident. 如果實現的為IBaseBolt,則只會在延時情況下觸發,
預設會呼叫ack,但是這個ack如果有再次發射, 這個ack就會自動錨定了.
根據具體業務需求選擇合適的Bolt
conf.setNumAckers(2); // 訊息處理的acker數量.預設1,可以根據實際處理情況調大
真實環境
conf.setNumWorkers(5); // 設定工作程序 , 如果不新增埠, 預設會是4個worker程序
需要在storm.yaml下新增埠
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
- 6704
每個worker使用一個埠.
在uI視窗是spout bolt acker幾個的累加.
storm.yaml引數參考
配置項 | 配置說明 |
storm.zookeeper.servers | ZooKeeper伺服器列表 |
storm.zookeeper.port | ZooKeeper連線埠 |
storm.local.dir | storm使用的本地檔案系統目錄(必須存在並且storm程序可讀寫) |
storm.cluster.mode | Storm叢集執行模式([distributed|local]) |
storm.local.mode.zmq | Local模式下是否使用ZeroMQ作訊息系統,如果設定為false則使用java訊息系統。預設為false |
storm.zookeeper.root | ZooKeeper中Storm的根目錄位置 |
storm.zookeeper.session.timeout | 客戶端連線ZooKeeper超時時間 |
storm.id | 執行中拓撲的id,由storm name和一個唯一隨機陣列成。 |
nimbus.host | nimbus伺服器地址 |
nimbus.thrift.port | nimbus的thrift監聽埠 |
nimbus.childopts | 通過storm-deploy專案部署時指定給nimbus程序的jvm選項 |
nimbus.task.timeout.secs | 心跳超時時間,超時後nimbus會認為task死掉並重分配給另一個地址。 |
nimbus.monitor.freq.secs | nimbus檢查心跳和重分配任務的時間間隔.注意如果是機器宕掉nimbus會立即接管並處理。 |
nimbus.supervisor.timeout.secs | supervisor的心跳超時時間,一旦超過nimbus會認為該supervisor已死並停止為它分發新任務. |
nimbus.task.launch.secs | task啟動時的一個特殊超時設定.在啟動後第一次心跳前會使用該值來臨時替代nimbus.task.timeout.secs. |
nimbus.reassign | 當發現task失敗時nimbus是否重新分配執行。預設為真,不建議修改。 |
nimbus.file.copy.expiration.secs | nimbus判斷上傳/下載連結的超時時間,當空閒時間超過該設定時nimbus會認為連結死掉並主動斷開 |
ui.port | Storm UI的服務埠 |
drpc.servers | DRPC伺服器列表,以便DRPCSpout知道和誰通訊 |
drpc.port | Storm DRPC的服務埠 |
supervisor.slots.ports | supervisor上能夠執行workers的埠列表.每個worker佔用一個埠,且每個埠只執行一個worker.通過這項配置可以調整每臺機器上執行的worker數.(調整slot數/每機) |
supervisor.childopts | 在storm-deploy專案中使用,用來配置supervisor守護程序的jvm選項 |
supervisor.worker.timeout.secs | supervisor中的worker心跳超時時間,一旦超時supervisor會嘗試重啟worker程序. |
supervisor.worker.start.timeout.secs | supervisor初始啟動時,worker的心跳超時時間,當超過該時間supervisor會嘗試重啟worker。因為JVM初始啟動和配置會帶來的額外消耗,從而使得第一次心跳會超過supervisor.worker.timeout.secs的設定 |
supervisor.enable | supervisor是否應當執行分配給他的workers.預設為true,該選項用來進行Storm的單元測試,一般不應修改. |
supervisor.heartbeat.frequency.secs | supervisor心跳傳送頻率(多久傳送一次) |
supervisor.monitor.frequency.secs | supervisor檢查worker心跳的頻率 |
worker.childopts | supervisor啟動worker時使用的jvm選項.所有的”%ID%”字串會被替換為對應worker的識別符號 |
worker.heartbeat.frequency.secs | worker的心跳傳送時間間隔 |
task.heartbeat.frequency.secs | task彙報狀態心跳時間間隔 |
task.refresh.poll.secs | task與其他tasks之間連結同步的頻率.(如果task被重分配,其他tasks向它傳送訊息需要重新整理連線).一般來講,重分配發生時其他tasks會理解得到通知。該配置僅僅為了防止未通知的情況。 |
topology.debug | 如果設定成true,Storm將記錄發射的每條資訊。 |
topology.optimize | master是否在合適時機通過在單個執行緒內執行多個task以達到優化topologies的目的. |
topology.workers | 執行該topology叢集中應當啟動的程序數量.每個程序內部將以執行緒方式執行一定數目的tasks.topology的元件結合該引數和並行度提示來優化效能 |
topology.ackers | topology中啟動的acker任務數.Acker儲存由spout傳送的tuples的記錄,並探測tuple何時被完全處理.當Acker探測到tuple被處理完畢時會向spout傳送確認資訊.通常應當根據topology的吞吐量來確定acker的數目,但一般不需要太多.當設定為0時,相當於禁用了訊息可靠性,storm會在spout傳送tuples後立即進行確認. |
topology.message.timeout.secs | topology中spout傳送訊息的最大處理超時時間.如果一條訊息在該時間視窗內未被成功ack,Storm會告知spout這條訊息失敗。而部分spout實現了失敗訊息重播功能。 |
topology.kryo.register | 註冊到Kryo(Storm底層的序列化框架)的序列化方案列表.序列化方案可以是一個類名,或者是com.esotericsoftware.kryo.Serializer的實現. |
topology.skip.missing.kryo.registrations | Storm是否應該跳過它不能識別的kryo序列化方案.如果設定為否task可能會裝載失敗或者在執行時丟擲錯誤. |
topology.max.task.parallelism | 在一個topology中能夠允許的最大元件並行度.該項配置主要用在本地模式中測試執行緒數限制. |
topology.max.spout.pending | 一個spout task中處於pending狀態的最大的tuples數量.該配置應用於單個task,而不是整個spouts或topology. |
topology.state.synchronization.timeout.secs | 元件同步狀態源的最大超時時間(保留選項,暫未使用) |
topology.stats.sample.rate | 用來產生task統計資訊的tuples抽樣百分比 |
topology.fall.back.on.java.serialization | topology中是否使用java的序列化方案 |
zmq.threads | 每個worker程序內zeromq通訊用到的執行緒數 |
zmq.linger.millis | 當連線關閉時,連結嘗試重新發送訊息到目標主機的持續時長.這是一個不常用的高階選項,基本上可以忽略. |
java.library.path | JVM啟動(如Nimbus,Supervisor和workers)時的java.library.path設定.該選項告訴JVM在哪些路徑下定位本地庫. |
storm內預設引數
java.library.path:"/usr/local/lib:/opt/local/lib:/usr/lib" |
### storm.* configs are general configurations |
# the local dir is where jars are kept |
storm.local.dir: "storm-local" |
storm.zookeeper.servers: |
- "localhost" |
storm.zookeeper.port: 2181 |
storm.zookeeper.root: "/storm" |
storm.zookeeper.session.timeout: 20000 |
storm.zookeeper.connection.timeout: 15000 |
storm.zookeeper.retry.times: 5 |
storm.zookeeper.retry.interval: 1000 |
storm.zookeeper.retry.intervalceiling.millis: 30000 |
storm.cluster.mode: "distributed" # can be distributed or local |
storm.local.mode.zmq: false |
storm.thrift.transport: "backtype.storm.security.auth.SimpleTransportPlugin" |
storm.messaging.transport: "backtype.storm.messaging.netty.Context" |
storm.meta.serialization.delegate: "backtype.storm.serialization.DefaultSerializationDelegate" |
### nimbus.* configs are for the master |
nimbus.host: "localhost" |
nimbus.thrift.port: 6627 |
nimbus.thrift.max_buffer_size: 1048576 |
nimbus.childopts: "-Xmx1024m" |
nimbus.task.timeout.secs: 30 |
nimbus.supervisor.timeout.secs: 60 |
nimbus.monitor.freq.secs: 10 |
nimbus.cleanup.inbox.freq.secs: 600 |
nimbus.inbox.jar.expiration.secs: 3600 |
nimbus.task.launch.secs: 120 |
nimbus.reassign: true |
nimbus.file.copy.expiration.secs: 600 |
nimbus.topology.validator: "backtype.storm.nimbus.DefaultTopologyValidator" |
### ui.* configs are for the master |
ui.port: 8080 |
ui.childopts: "-Xmx768m" |
logviewer.port: 8000 |
logviewer.childopts: "-Xmx128m" |
logviewer.appender.name: "A1" |
drpc.port: 3772 |
drpc.worker.threads: 64 |
drpc.queue.size: 128 |
drpc.invocations.port: 3773 |
drpc.request.timeout.secs: 600 |
drpc.childopts: "-Xmx768m" |
transactional.zookeeper.root: "/transactional" |
transactional.zookeeper.servers: null |
transactional.zookeeper.port: null |
### supervisor.* configs are for node supervisors |
# Define the amount of workers that can be run on this machine. Each worker is assigned a port to use for communication |
supervisor.slots.ports: |
- 6700 |
- 6701 |
- 6702 |
- 6703 |
supervisor.childopts: "-Xmx256m" |
#how long supervisor will wait to ensure that a worker process is started |
supervisor.worker.start.timeout.secs: 120 |
#how long between heartbeats until supervisor considers that worker dead and tries to restart it |
supervisor.worker.timeout.secs: 30 |
#how frequently the supervisor checks on the status of the processes it's monitoring and restarts if necessary |
supervisor.monitor.frequency.secs: 3 |
#how frequently the supervisor heartbeats to the cluster state (for nimbus) |
supervisor.heartbeat.frequency.secs: 5 |
supervisor.enable: true |
### worker.* configs are for task workers |
worker.childopts: "-Xmx768m" |
worker.heartbeat.frequency.secs: 1 |
# control how many worker receiver threads we need per worker |
topology.worker.receiver.thread.count: 1 |
task.heartbeat.frequency.secs: 3 |
task.refresh.poll.secs: 10 |
zmq.threads: 1 |
zmq.linger.millis: 5000 |
zmq.hwm: 0 |
storm.messaging.netty.server_worker_threads: 1 |
storm.messaging.netty.client_worker_threads: 1 |
storm.messaging.netty.buffer_size: 5242880 #5MB buffer |
# Since nimbus.task.launch.secs and supervisor.worker.start.timeout.secs are 120, other workers should also wait at least that long before giving up on connecting to the other worker. |
storm.messaging.netty.max_retries: 300 |
storm.messaging.netty.max_wait_ms: 1000 |
storm.messaging.netty.min_wait_ms: 100 |
# If the Netty messaging layer is busy(netty internal buffer not writable), the Netty client will try to batch message as more as possible up to the size of storm.messaging.netty.transfer.batch.size bytes, otherwise it will try to flush message as soon as possible to reduce latency. |
storm.messaging.netty.transfer.batch.size: 262144 |
# We check with this interval that whether the Netty channel is writable and try to write pending messages if it is. |
storm.messaging.netty.flush.check.interval.ms: 10 |
### topology.* configs are for specific executing storms |
topology.enable.message.timeouts: true |
topology.debug: false |
topology.workers: 1 |
topology.acker.executors: null |
topology.tasks: null |
# maximum amount of time a message has to complete before it's considered failed |
topology.message.timeout.secs: 30 |
topology.multilang.serializer: "backtype.storm.multilang.JsonSerializer" |
topology.skip.missing.kryo.registrations: false |
topology.max.task.parallelism: null |
topology.max.spout.pending: null |
topology.state.synchronization.timeout.secs: 60 |
topology.stats.sample.rate: 0.05 |
topology.builtin.metrics.bucket.size.secs: 60 |
topology.fall.back.on.java.serialization: true |
topology.worker.childopts: null |
topology.executor.receive.buffer.size: 1024 #batched |
topology.executor.send.buffer.size: 1024 #individual messages |
topology.receiver.buffer.size: 8 # setting it too high causes a lot of problems (heartbeat thread gets starved, throughput plummets) |
topology.transfer.buffer.size: 1024 # batched |
topology.tick.tuple.freq.secs: null |
topology.worker.shared.thread.pool.size: 4 |
topology.disruptor.wait.strategy: "com.lmax.disruptor.BlockingWaitStrategy" |
topology.spout.wait.strategy: "backtype.storm.spout.SleepSpoutWaitStrategy" |
topology.sleep.spout.wait.strategy.time.ms: 1 |
topology.error.throttle.interval.secs: 10 |
topology.max.error.report.per.interval: 5 |
topology.kryo.factory: "backtype.storm.serialization.DefaultKryoFactory" |
topology.tuple.serializer: "backtype.storm.serialization.types.ListDelegateSerializer" |
topology.trident.batch.emit.interval.millis: 500 |
topology.classpath: null |
topology.environment: null |
dev.zookeeper.path: "/tmp/dev-storm-zookeeper" |