1. 程式人生 > 其它 >大資料實時處理實戰

大資料實時處理實戰

隨著網際網路時代的發展,運營商作為內容傳送的管道服務商,在資料領域具有巨大的優勢,如何將這些資料轉化為價值,越來越被運營商所重視。

運營商的大資料具有體量大,種類多的特點,如各類話單、信令等,通常一種話單每天的資料量就有上百億條。隨著業務分析需求對資料處理實時性的要求越來越高,也給我們的大資料處理架構帶來了巨大的挑戰,參照網路上可查的例子,運用到實際處理架構上,經常會因為實時資料流量大,造成系統執行不穩定及各種異常。從大資料實時處理架構開發到上線,耗時近2個月時間,經過大量優化,我們的系統才趨於穩定。最終我們使用10臺伺服器的叢集,實時處理每天上百億條的資料,這裡每條資料的欄位數量有100個,最長的欄位內容超過1000位元組。

下面就來分享一下我們在實時大資料處理大體量資料的過程中,總結出來的酸甜苦辣。

  • 專案目標

在有限伺服器叢集數量的基礎上,實現對每天超過百億條、體量超過20T的某話單進行實時處理。具體需求是FTP收集多臺話單伺服器上的詳單,進行實時處理後將資料儲存到Hbase資料庫供使用者即時詳單查詢,同時將話單儲存到Hdfs供離線分析使用。

  • 硬體資源

10臺x86伺服器,單機配置16盒CPU,128G記憶體,2T硬碟*10,300G硬碟*2(系統盤)。

  • 系統架構

10臺伺服器組成hadoop叢集,其中NameNode節點同時作為採集機安裝FTP和Flume,選取其他5臺伺服器安裝Kafka,Zookeeper和Storm實現大資料實時流處理架構,為了充分利用叢集計算資源,這5臺伺服器也配置了少量的Yarn計算資源,參與日常的離線資料分析需求。剩下的4臺伺服器我們安裝了Hbase滿足大資料下的秒級查詢需求,系統拓撲圖如下:

圖一 系統拓撲圖

  • 專案實施

1.使用的相關技術

我們先來回顧一下相關的大資料架構和開源技術,大資料處理分離線分析架構和實時處理架構。離線分析架構(如Hive,Map/Reduce,Spark Sql等)可以滿足資料後分析,資料探勘的應用需求。對於實時性要求高的應用,如使用者即時詳單查詢,業務量監控等,需要應用實時處理架構。目前大資料開源實時處理架構最常見的是Storm和Spark Streaming,相比Spark Streaming準實時批處理系統,Strom是更純粹的實時處理系統,即來一條事件就處理一條,具有更高的實時性。

Flume是Cloudera提供的一個高可用的,高可靠的,分散式的海量日誌採集、聚合和傳輸的系統。Flume支援單機也支援叢集,支援多種資料來源,如不斷寫入的檔案、Socket、不斷生成新檔案的資料夾等,支援多種輸出,如Hdfs、Kafka、Mysql資料庫等。Flume使用時僅需實現簡單配置,無需開發程式。

Kafka是一種高吞吐量的分散式釋出訂閱訊息系統,類似一個大資料量的快取池,支援一份資料多使用者消費。ZooKeeper是一個分散式的,開源的分散式應用程式協調服務,負責儲存叢集間部分元件的狀態同步資訊。Storm分散式實時計算系統,包含Nimbus主節點和Supervisor從節點(從storm1.0以後,增加了Nimbus備份節點),節點之間需要依靠Zookeeper做狀態同步。Storm叢集元件:

  • Nimbus:是Storm叢集的master節點,負責資源分配和任務排程。
  • Supervisor:是Storm叢集的slave節點,負責接受nimbus分配的任務,啟動和停止屬於自己管理的worker程序,是真正意義上的分散式計算節點。

圖二 Storm叢集元件

Storm應用涉及到Java程式的開發,程式設計模型中涉及的概念:

  • Topology:Storm中執行的一個實時應用程式,各個元件間的訊息流動形成邏輯上的一個拓撲結構,Topology一旦啟動,就會常駐記憶體並佔用worker資源。
  • Spout:在一個Topology中產生源資料流的元件。通常情況下Spout會從外部資料來源中讀取資料,然後轉換為Topology內部的源資料。
  • Bolt:在一個Topology中接受資料然後執行處理的元件。Bolt可以執行過濾、函式操作、合併、寫資料庫等任何操作。
  • Tuple:一次訊息傳遞的基本單元。

2.開源元件安裝及配置

a)Flume安裝及配置

從http://flume.apache.org/下載flume的安裝包,解壓縮;如果使用Cloudera Manager或者Ambari安裝,僅需通過相應的管理頁面安裝配置。我們僅安裝了單機的Flume,未安裝Flume叢集,單機Flume處理效率非常高,完全能夠滿足我們每天處理上百億條資料的需求,但需要說明一點的是Flume魯棒性非常差,經常出現程序在、但資料不處理的程序卡死狀態,使用Flume時要注意以下幾點:

  • flume監控目錄中不能含有目錄;
  • flume正在處理的檔案,其他程序不能更改(如FTP正在傳送中的檔案,需要設定過濾條件,避免flume處理)。建議flume監控目錄與FTP實時傳送目錄分開,避免flume處理FTP傳送中的檔案,導致異常,也可以設定正則表示式忽略正在傳送的檔案:
a1.sources.r1.ignorePattern = ^(.)*\.tmp$
  • flume處理的檔案中可能含有特殊字元,導致flume程序卡死。設定遇到不能識別的字元忽略跳過:
a1.sources.r1.decodeErrorPolicy = IGNORE
  • flume執行過程中出現GC over的記憶體溢位錯誤,配置flume-env.sh中記憶體配置(預設值很小);
export JAVA_OPTS="-Xms1024m -Xmx2048m -Dcom.sun.management.jmxremote"
  • flume啟動時-c後面要給全到詳細flume配置檔案目錄,否則flume-env.sh中的配置不會載入,會使用預設配置,例如下面啟動命令給全配置檔案目錄:
/hadoop/apache-flume-1.6.0-bin/bin/flume-ng agent -c /hadoop/apache-flume-1.6.0-bin/conf/
  • 如果使用記憶體佇列,請注意記憶體佇列訊息數的配置,設定transactionCapacity佇列大小必須大於等於batchSize;
a1.channels.c1.transactionCapacity = 2000a1.sinks.k1.batchSize = 2000
  • 增加batchSize可以提升flume處理速度,原理是flume處理的event都儲存在transaction佇列中,直到滿足了batchSize的數量條件,才一次性批量向sink傳送。但是要注意實際資料量的大小,如果實際資料量很小,batchSize就不能配置過大,否則資料達不到batchSize的數量條件,會長時間積壓在transaction佇列中,後面的實時處理程式反而得不到資料,導致實時性變差;
  • flume中讀取的一條記錄長度超過2048字元,也就是4096位元組就會被截斷,可以在配置檔案中增加如下配置項解決:
producer.sources.s.deserializer.maxLineLength=65535
  • flume字元轉換異常問題,java.nio.charset.MalformedInputException: Input length = 1,可以在配置檔案中增加如下配置項解決:
a1.sources.r1.inputCharset = ISO8859-1
  • flume遇到亂碼停止,報異常:java.nio.charset.MalformedInputException,可以在配置檔案中增加如下配置,忽略錯誤資料(預設是FAIL,拋異常報錯,flume會停止)解決;
producer.sources.s.decodeErrorPolicy=IGNORE
  • 預設情況下,Flume處理完成的檔案會增加.completed字尾,在資料量很大的情況下,會很快撐滿採集機硬碟,可以在配置檔案中增加如下配置,讓flume處理完後自動刪除該資料檔案解決。
a1.sources.r1.deletePolicy = immediate

Flume配置:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
a1.sources.r1.spoolDir = /ftpdata/xdr/HTTP_tmp
a1.sources.r1.ignorePattern = ^(.)*\.tmp$
a1.sources.r1.fileHeader = false
a1.sources.r1.deletePolicy = immediate
a1.sources.r1.inputCharset = ISO8859-1
a1.sources.r1.deserializer.maxLineLength = 8192
a1.sources.r1.decodeErrorPolicy = IGNORE

# Describe the sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.batchSize = 10000
a1.sinks.k1.brokerList = stormmaster:9092,storm01:9092,storm02:9092,storm03:9092,storm04:9092
a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder
a1.sinks.k1.requiredAcks = 0
a1.sinks.k1.producer.type = async
a1.sinks.k1.topic = sighttpnew

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 80000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.keep-alive = 30

Flume-env.sh配置:

# Enviroment variables can be set here.
export JAVA_HOME=/usr/java/jdk1.7.0_80
export FLUME_HOME=/hadoop/apache-flume-1.6.0-bin
# Give Flume more memory and pre-allocate, enable remote monitoring via JMX
export JAVA_OPTS="-Xms1024m -Xmx2048m -Dcom.sun.management.jmxremote"
# Note that the Flume conf directory is always included in the classpath.
export FLUME_CLASSPATH="/hadoop/apache-flume-1.6.0-bin/lib"

Flume啟動命令:

/hadoop/apache-flume-1.6.0-bin/bin/flume-ng agent -c /hadoop/apache-flume-1.6.0-bin/conf/ -f /hadoop/apache-flume-1.6.0-bin/conf/viewdata.conf -n producer –Dflume.root.logger=ERROR &

注意一定要給全Flume配置檔案的路徑,否則啟動Flume不能正確載入Flume-env.sh的配置。

b)Kafka叢集安裝及配置

http://kafka.apache.org/下載kafka安裝包:kafka_*.tgz,解壓後,配置server.properties檔案。

server.properties配置:

#本機在kafka叢集中的idbroker.id=48#服務埠port=9092#主機名host.name=storm01# The number of threads handling network requestsnum.network.threads=3# The number of threads doing disk I/Onum.io.threads=8# The send buffer (SO_SNDBUF) used by the socket serversocket.send.buffer.bytes=102400# The receive buffer (SO_RCVBUF) used by the socket serversocket.receive.buffer.bytes=102400# The maximum size of a request that the socket server will accept (protection against OOM)socket.request.max.bytes=104857600#kafka資料儲存位置(資料量大時,需要儲存的目錄大小也要充分)log.dirs=/data1/kafka-logs#預設topic建立partition的數量num.partitions=1# This value is recommended to be increased for installations with data dirs located in RAID array.num.recovery.threads.per.data.dir=1#kafka事件只有flash到硬碟才能被後續消費者消費,因此要配置flash時間引數,避免小資料量情況下資料重新整理時間過久log.flush.interval.messages=10000log.flush.interval.ms=1000# 資料在kafka中儲存的時間,單位小時,超時的資料kafka會自動刪除log.retention.hours=48# The maximum size of a log segment file. When this size is reached a new log segment will be created.log.segment.bytes=1073741824# The interval at which log segments are checked to see if they can be deleted according to the retention policieslog.retention.check.interval.ms=300000# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.log.cleaner.enable=false# zookeeper叢集配置zookeeper.connect=master:2181,storm01:2181,storm02:2181,storm03:2181,storm04:2181# Timeout in ms for connecting to zookeeperzookeeper.connection.timeout.ms=6000#是否能夠刪除topic的配置,預設false不能刪除topicdelete.topic.enable=true

Kafka服務啟動:jps命令可以看到kafka的程序名,說明kafka已經成功啟動。

nohup kafka-server-start.sh /home/hadoop/kafka_2.9.1-0.8.2.1/config/server.properties &

建立topic:建立複製因子2,有24個partition的topic,建立多個partition的目的是增加並行性,複製因子的目的是資料安全冗餘。

kafka-topics.sh --create --zookeeper master:2181,storm01:2181,storm02:2181,storm03:2181,storm04:2181 --replication-factor 2 --partitions 24 --topic sighttp

kafka資料儲存方式:在kafka資料儲存目錄下,可以看到以每個-方式命名的資料夾,例如sighttp-19表示topic:sighttp,partition:19,如下圖所示:

圖三

進入topic-partition目錄,可以看到很多.index和.log結尾的檔案。其中.log是資料檔案,其中儲存的是kafka快取池中的資料,.index是索引檔案,資料檔案和索引檔案成對出現,檔名為一串數字,標識了該檔案中儲存資料的起始序列號,如下:

圖四

kafka資料消費狀態查詢:消費者從kafka消費資料狀態是記錄在zookeeper中的,使用zkCli.sh命令可以檢視,如下圖查詢了消費topic:sighttp,partition:0的狀態,offset表明已經處理到49259227840行,如下圖所示:

圖五

經驗:通過消費到的行數與儲存到的行數,可以判斷資料處理程式的速度是否滿足資料生成速度的需求。

kafka消費典型異常:

[2016-10-27 16:15:42,536] ERROR [Replica Manager on Broker 51]: Error when processing fetch request for partition [sighttp,3] offset 6535061966 from consumer with correlation id 0. Possible cause: Request for offset 6535061966 but we only have log segments in the range 6580106664 to 6797636149. (kafka.server.ReplicaManager)

異常原因:kafka中由於訊息過期已經把序號是6535061966的訊息刪除了,目前kafka中只有範圍是6580106664到6797636149的日誌,但是消費者還要處理過期刪除的訊息,那就會出現此異常訊息(通常是由於資料處理速度慢,無法滿足資料生成速度的要求,導致訊息積壓,積壓的訊息到達kafka配置的過期時間,被kafka刪除)。

c)Storm叢集安裝及配置

在http://storm.apache.org/下載Storm安裝包,建議使用Storm 0.10.0 released以上版本,因為最新版本修正了很多bug,特別是STORM-935的問題(拓撲啟動後會佔用大量系統資源,導致Topology執行不穩定)。

storm.yaml檔案配置:

#zookeeper叢集伺服器配置
storm.zookeeper.servers:
    - "master"
    - "storm01"
    - "storm02"
    - "storm03"
    - "storm04"
#storm主節點
nimbus.host: "master"
#strom管理頁面服務埠
ui.port: 8081
#storm從節點服務埠配置,預設6700-6703共4個埠,意味著每臺伺服器可以提供4個worker插槽,這裡增加了6704和6705埠,即為單臺伺服器增加了2個worker插槽,worker數增加意味著storm叢集可以提供更多的計算資源。
supervisor.slots.ports:
 - 6700
 - 6701
 - 6702
 - 6703
 - 6704
 - 6705
#狀態資訊儲存位置,避免使用/tmp
storm.local.dir: "/home/hadoop/apache-storm-0.10.0/workdir"
#主節點的記憶體
nimbus.childopts: "-Xmx3072m"
#從節點的記憶體
supervisor.childopts: "-Xmx3072m"
#worker的記憶體,增加記憶體可以減少GC overload的問題
worker.childopts: "-Xmx3072m"
#預設為30,增加netty超時時長等引數,降低因Netty通訊問題,造成worker不穩定
storm.messaging.netty.max_retries:60
#增加storm.messaging.netty.max_wait_ms設定,預設為1000
storm.messaging.netty.max_wait_ms:2000

啟動服務:

  • 主節點:(啟動主節點服務和管理頁面) nohup storm nimbus & nohup storm ui &
  • 從節點: nohup storm supervisor &

Storm管理頁面:

瀏覽器輸入Storm UI所在伺服器地址+8081埠號,開啟Strom管理頁面如下圖:

圖六

從圖六Cluster Summary中可以看出Storm叢集共有4個Supervisor節點,因每臺Supervisor提供6個slot(如果在storm.yaml配置檔案中不配置supervisor.slots.ports屬性,則每個Supervisor預設提供4個slot),因此共有4*6=24個slot,已使用22個,還有2個空閒。需要注意的是每個拓撲一旦釋出,將長久佔用slot,如果沒有足夠的slot,最新發布的拓撲只會佔用空閒的slot,不會搶佔其他已經被佔用的slot資源;如果沒有slot,將無法釋出新的拓撲,此時需要挖潛Storm叢集伺服器,通過配置檔案增加slot資源或增加新的伺服器。

從圖六Topology Summary中可以看出,叢集上已經發布了7個Topology,每個Topology佔用的worker資源,啟動的executor執行緒數,具體資源佔用多少是在Storm Topology開發程式中指定的。

d)Kafka+Storm+Hdfs+Hbase拓撲開發

我們使用Eclipse建立MAVEN工程,在pom.xml配置檔案中新增Storm及Hdfs的相關依賴,本例是Storm從Kafka中消費資料,經過ETL處理後儲存到Hdfs和Hbase中,因此需要新增Storm-Kafka、Storm-Hdfs、Storm-Hbase等依賴,注意依賴包版本要與叢集一致。

抽取過程繼承BaseRichBolt類:

public class splitBolt extends BaseRichBolt {
    private static final String TAB = ",";
    private OutputCollector collector; 
    public void prepare(Map config,TopologyContext context,OutputCollector collector){
        this.collector=collector;
    }
    public void execute(Tuple input){
            String line=input.getString(0);
            String[] words=line.split(TAB);
            if (words.length>74)
            {
                String Account;
                if (words[0].length()>0) Account=words[0]; 
                else Account="NULL";
                String LocalIPv4;
                if (words[1].length()>0) LocalIPv4=words[1];
                else LocalIPv4="NULL";
                 String RemoteIPv4;
                if (words[2].length()>0) RemoteIPv4=words[2];
                else RemoteIPv4="NULL";
                String newline=Account+"|"+LocalIPv4+"|"+RemoteIPv4;
                collector.emit(input,new Values(newline));
            }
            collector.ack(input);
    }
     public void declareOutputFields(OutputFieldsDeclarer declarer){
        declarer.declare(new Fields("newline"));
    }
}

寫Hbase需要實現HBaseMapper類:

public class myHbaseMapper implements HBaseMapper {
    public ColumnList columns(Tuple tuple) {
        String line=tuple.getString(0);
        String[] words=line.split("\|");
        ColumnList cols = new ColumnList();
         //引數依次是列族名,列名,值
        if (words[1].length()>0) cols.addColumn("content".getBytes(), "LocalIPv4".getBytes(), words[1].getBytes());
        if (words[2].length()>0) cols.addColumn("content".getBytes(), "RemoteIPv4".getBytes(), words[2].getBytes());
        return cols;
    }
     public byte[] rowKey(Tuple tuple) {
        String line=tuple.getString(0);
        String[] words=line.split("\|");
        String key;
        //rowkey設定成Account的反字串,便於hbase表內分割槽的資料均衡
        key=new StringBuilder(words[0]).reverse().toString();
        return key.getBytes();
    }
}

main函式:

public static void main(String[] args)
{  
    String zks = "master:2181,storm01:2181,storm02:2181 "; //zookeeper叢集
    String topic = "topicname"; //kafka中topic名稱
    String zkRoot = "/storm";//zookeeper中儲存狀態資訊的根目錄
    String id = "kafkatopicname";//zookeeper中儲存本拓撲狀態資訊的子目錄
    FileNameFormat fileNameFormat = new DefaultFileNameFormat()
    .withPath("/storm/tmp/").withPrefix("tmp_").withExtension(".dat");
    RecordFormat format = new DelimitedRecordFormat()
    .withFieldDelimiter("|"); //寫到hdfs的目錄檔名以’tmp_’開頭,’.dat’結尾
    //每10分鐘重寫一個hdfs的新檔案
    FileRotationPolicy rotationPolicy = new TimedRotationPolicy(10.0f, TimeUnit.MINUTES);
    BrokerHosts brokerHosts = new ZkHosts(zks);
    //配置storm拓撲的spout
    SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
    spoutConf.scheme = new SchemeAsMultiScheme(new MessageScheme());  
    spoutConf.zkServers = Arrays.asList(new String[] {"master", "storm01","storm02"});  
    spoutConf.zkPort = 2181;
    spoutConf.ignoreZkOffsets = false;//重啟拓撲時,需要從zookeeper中讀取偏移量
    //如果偏移量中的資料已經從kafka中刪除,則從kafka中儲存的最早資料開始處理。
    spoutConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
    spoutConf.useStartOffsetTimeIfOffsetOutOfRange = true;    //配置hdfs bolt
    HdfsBolt hdfsBolt = new HdfsBolt()
    .withFsUrl("hdfs://hdfsmaster:9000")
    .withFileNameFormat(fileNameFormat)
    .withRecordFormat(format)
    .withRotationPolicy(rotationPolicy)
    //hdfs資料檔案寫完後,move到新目錄
    .addRotationAction(new MoveFileAction().toDestination("/storm/http/")); 
    //例項化HBaseMapper
    HBaseMapper mapper = new myHbaseMapper();
    //例項化HBaseBolt,指定hbase中的表名
    HBaseBolt hBolt = new HBaseBolt("hbasetable", mapper).withConfigKey("hbase.conf");
    TopologyBuilder builder = new TopologyBuilder();  
    //配置spout執行緒數為24,此數要與kafka中topic的partition數一致,partition數越多,則spout讀取資料的並行性越高,處理速度越快
    builder.setSpout("kafka-reader", new KafkaSpout(spoutConf),24);    //配置bolt,此bolt開發處理邏輯,bolt可以串接多個
    builder.setBolt("etl", new splitBolt(), 24).shuffleGrouping("kafka-reader");  
    builder.setBolt("hdfs-bolt", hdfsBolt, 24).shuffleGrouping("etl");
    builder.setBolt("hbase-bolt", hBolt, 24).shuffleGrouping("etl");
    Config conf = new Config();
    //增加hbase配置,指定hbase在hdfs叢集上的目錄,zookeeper伺服器叢集
    Map<String, Object> hbConf = new HashMap<String, Object>();
    hbConf.put("hbase.rootdir", "hdfs://hdfsmaster:9000/hbase");
    hbConf.put("hbase.zookeeper.quorum","master,storm01,storm02");
    conf.put("hbase.conf", hbConf);    String name = sighttphdfs.class.getSimpleName();   
    if (args != null && args.length > 0) {  
        conf.put(Config.NIMBUS_HOST, args[0]);
        conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0); 
        //設定拓撲佔用worker數為4,根據實時處理資料量大小按需配置
        conf.setNumWorkers(4); 
        StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());  
    }
}

上面程式實現了Storm讀Kafka寫Hdfs和Hbase的例子,抽取類中可以根據不同的業務需求,通過Java程式碼實現不同的邏輯。編譯後的jar包上傳到叢集,使用storm命令列提交Topology:

storm jar ./kafkastream.jar sighdfs.sighttphdfs stormmaster

總結

經過幾個月的實際執行,我們的大資料實時處理架構能夠始終保持穩定,話單處理速度高於話單生成速度,有效的支撐了運營商大資料的各種分析查詢需求。開發和優化過程充滿挑戰,經過各種研究和嘗試,問題逐漸解決,在此我們也積累了大量的開發和優化經驗。

最後再分享2個我們實際遇到的問題:

  • Zookeeper配置造成Storm拓撲執行不穩定

因Storm叢集需要Zookeeper叢集作狀態同步,因此所有是Storm伺服器worker程序都會不停連線Zookeeper節點,Zookeeper節點的預設連線數是60,當Storm計算拓撲數量較多時,需要修改Zookeeper配置maxClientCnxns=1000,增加Zookeeper連線數。

  • Hdfs節點磁碟I/O高造成Storm拓撲執行不穩定

由於Storm是實時計算,每個環節的擁塞都將引起Storm拓撲的不穩定,在開發中我們遇到Hdfs某個節點磁碟I/O高,導致Storm寫Hdfs超時,最終引發Supervisor殺掉worker,造成拓撲不穩定的問題。究其原因是在某個Hdfs節點上,Yarn任務正在進行Reduce操作,用iostat -x 1 10命令檢視,Yarn的中間盤I/O長時間被100%佔用,同時Yarn的中間盤也是Hdfs的資料盤,導致寫入請求無法響應,最終導致Storm寫Hdfs的worker超時,引發拓撲執行不穩定。此處建議配置Yarn的中間盤時,不要使用作業系統根盤,不要使用Hdfs的資料盤,可以有效避免Storm寫Hdfs超時的問題。