Spark Installation, Configuration, and Code Framework (repost)
The earlier rounds of source-code reading, performance testing, and tuning have largely wrapped up, and the project is nearing its end, so here is a consolidated summary of the Spark configuration parameters and optimization strategies for future development and configuration:
Spark Installation, Configuration, and Code Framework
spark-defaults.conf configuration
spark.executor.instances: the number of executor instances (resource pools) requested from YARN
spark.executor.cores: the number of cores in each container
spark.executor.memory: the amount of memory for each executor
spark.driver.memory: the amount of memory used by the driver
spark.storage.memoryFraction: the fraction of executor memory reserved for RDD caching (legacy memory management)
spark.kryoserializer.buffer.max: the maximum Kryo serialization buffer size, 64 MB by default
spark.shuffle.consolidateFiles: whether shuffle output files are consolidated
spark.rdd.compress: whether serialized RDD partitions are compressed
spark.sql.shuffle.partitions: the number of partitions created for shuffles in Spark SQL
spark.reducer.maxSizeInFlight: the buffer size of a shuffle read task, which determines how much data is pulled from remote executors at a time. The fetch runs as 5 parallel requests pulling from different executors, which improves fetch efficiency.
spark.shuffle.io.retryWait: the wait interval between shuffle fetch retries
spark.shuffle.manager: set to hash, used together with spark.shuffle.consolidateFiles=true. The hash manager skips sorting the intermediate results, and consolidating the intermediate files reduces the cost of opening many files. In Spark 2.0.2 hash can no longer be configured directly (it throws an error); the remaining options are sort shuffle and Tungsten sort, and the choice depends on the data volume. For the pros and cons see the post 《Spark Shuffle詳細過程》 on this blog.
spark.executor.heartbeatInterval: the interval between heartbeats from each executor to the driver
spark.driver.maxResultSize: the limit on the total size of the serialized results of all partitions
spark.yarn.am.cores: the number of CPU cores requested for the YARN Application Master in yarn-client mode
spark.master: the master URL / deployment mode to use
spark.task.maxFailures: how many times a task may fail before the job is abandoned (retries guard against transient failures such as network I/O errors)
spark.shuffle.file.buffer: enlarges the in-memory buffer a map task uses before spilling shuffle data to disk
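Putting the parameters above together, a minimal spark-defaults.conf might look like the following. The values are illustrative placeholders rather than tuned recommendations and should be adjusted to your own cluster:
spark.master                      yarn-client
spark.executor.instances          10
spark.executor.cores              2
spark.executor.memory             10g
spark.driver.memory               10g
spark.storage.memoryFraction      0.5
spark.kryoserializer.buffer.max   128m
spark.shuffle.consolidateFiles    true
spark.rdd.compress                true
spark.sql.shuffle.partitions      200
spark.reducer.maxSizeInFlight     48m
spark.shuffle.file.buffer         64k
spark.task.maxFailures            4
spark.driver.maxResultSize        2g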
spark-env.sh configuration
export HADOOP_CONF_DIR: path to the Hadoop configuration files
export HADOOP_HOME: path to the Hadoop client installation
export JAVA_HOME: path to the Java installation
export SPARK_YARN_APP_NAME: the application name
export SPARK_LOG_DIR: output directory for Spark logs
export SPARK_PID_DIR: output directory for Spark PID files
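For example, a spark-env.sh along these lines (the JDK and Hadoop paths reuse paths that appear later in this post; the log and PID directories are illustrative assumptions):
export JAVA_HOME=/usr/local/yangsy/jdk1.7.0_55
export HADOOP_HOME=/home/hadoop/ocdc/hadoop-2.6.0
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export SPARK_YARN_APP_NAME=sparkApp
export SPARK_LOG_DIR=/var/log/spark
export SPARK_PID_DIR=/var/run/spark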
Place hive-site.xml under Spark's conf directory. Change the Spark thrift port so that it is separate from Hive's thrift port, configure the MySQL data source (Hive's metadata is stored in MySQL), and configure the HDFS path used by the metastore:
<property>
<name>hive.server2.thrift.port</name>
<value>10000</value>
<description>Port number of HiveServer2 Thrift interface.
Can be overridden by setting $HIVE_SERVER2_THRIFT_PORT</description>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>Driver class name for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
<description>username to use against metastore database</description>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>root</value>
<description>password to use against metastore database</description>
</property>
Spark dynamic resource allocation test:
spark.dynamicAllocation.cachedExecutorIdleTimeout 360000000 - an executor holding cached data is not removed until this timeout expires
spark.dynamicAllocation.executorIdleTimeout 60s - an executor that has been idle for this long is removed
spark.dynamicAllocation.initialExecutors 3 - the initial number of executors started when requesting again after all executors have been removed
spark.dynamicAllocation.maxExecutors 30 - the maximum number of executors that can be started
spark.dynamicAllocation.minExecutors 1 - the minimum number of executors to keep
spark.dynamicAllocation.schedulerBacklogTimeout 1s - new executors are requested once tasks have been waiting longer than this
spark.dynamicAllocation.enabled true - enables dynamic allocation
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 1s - the interval between subsequent executor requests while the backlog persists
Startup script:
/usr/local/spark-1.6.1/sbin/start-thriftserver.sh \
--conf spark.shuffle.service.enabled=true \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.minExecutors=2 \
--conf spark.dynamicAllocation.maxExecutors=30 \
--conf spark.dynamicAllocation.sustainedSchedulerBacklogTimeout=5s \
--conf spark.dynamicAllocation.schedulerBacklogTimeout=1s \
--conf spark.dynamicAllocation.initialExecutors=2 \
--conf spark.dynamicAllocation.executorIdleTimeout=60s \
--conf spark.dynamicAllocation.cachedExecutorIdleTimeout=360000000s \
--conf spark.driver.memory=50g
Code framework:
First, we pull in the required dependencies, including the Hadoop, Spark, and HBase jars. The pom.xml is configured as follows:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>sparkApp</groupId> <artifactId>sparkApp</artifactId> <version>1.0-SNAPSHOT</version> <inceptionYear>2008</inceptionYear> <properties> <scala.version>2.10.0</scala.version> <spring.version>4.0.2.RELEASE</spring.version> <hadoop.version>2.6.0</hadoop.version> <jedis.version>2.8.1</jedis.version> </properties> <repositories> <repository> <id>scala-tools.org</id> <name>Scala-Tools Maven2 Repository</name> <url>http://scala-tools.org/repo-releases</url> </repository> </repositories> <pluginRepositories> <pluginRepository> <id>scala-tools.org</id> <name>Scala-Tools Maven2 Repository</name> <url>http://scala-tools.org/repo-releases</url> </pluginRepository> </pluginRepositories> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.4</version> <scope>test</scope> </dependency> <!-- SPARK START --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.6.1</version> </dependency> <!--<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka_2.10</artifactId> <version>1.6.1</version> </dependency>--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-mllib_2.10</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.10</artifactId> <version>1.6.1</version> </dependency> <!-- SPARK END --> <!-- HADOOP START --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>${hadoop.version}</version> <exclusions> <exclusion> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> </exclusion> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </exclusion> <exclusion> <groupId>javax.servlet.jsp</groupId> <artifactId>jsp-api</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hadoop.version}</version> </dependency> <!-- HADOOP END --> <!-- hbase START --> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>1.0.2</version> </dependency> <dependency> <groupId>org.apache.htrace</groupId> <artifactId>htrace-core</artifactId> <version>3.1.0-incubating</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.0.2</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-common</artifactId> <version>1.0.2</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-protocol</artifactId> <version>1.0.2</version> </dependency> <!-- hbase END --> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>11.0.2</version> </dependency> <dependency> 
<groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.12</version> </dependency> <!-- REDIS START --> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>${jedis.version}</version> </dependency> <!-- REDIS END --> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.1</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>2.3</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.26</version> </dependency> <!--<dependency> <groupId>org.datanucleus</groupId> <artifactId>datanucleus-api-jdo</artifactId> <version>3.2.6</version> </dependency> <dependency> <groupId>org.datanucleus</groupId> <artifactId>datanucleus-core</artifactId> <version>3.2.1</version> </dependency> <dependency> <groupId>org.datanucleus</groupId> <artifactId>datanucleus-rdbms</artifactId> <version>3.2.9</version> </dependency>--> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>5.0.0.Alpha1</version> </dependency> <!-- <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-service</artifactId> <version>1.2.1</version> </dependency>--> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-jdbc</artifactId> <version>2.1.0</version> </dependency> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-service</artifactId> <version>1.2.1</version> </dependency> </dependencies> <build> <!-- <sourceDirectory>src/main/scala</sourceDirectory> <testSourceDirectory>src/test/scala</testSourceDirectory>--> <plugins> <plugin> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> <configuration> <scalaVersion>${scala.version}</scalaVersion> <args> <arg>-target:jvm-1.5</arg> </args> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-eclipse-plugin</artifactId> <configuration> <downloadSources>true</downloadSources> <buildcommands> <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand> </buildcommands> <additionalProjectnatures> <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature> </additionalProjectnatures> <classpathContainers> <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer> <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer> </classpathContainers> </configuration> </plugin> </plugins> </build> <reporting> <plugins> <plugin> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <configuration> <scalaVersion>${scala.version}</scalaVersion> </configuration> </plugin> </plugins> </reporting> </project>
Then copy the cluster's hive-site.xml, hdfs-site.xml, and hbase-site.xml into the project.
Common HBase helper methods (partial code):
1 package hbase 2 3 import java.util.{Calendar, Date} 4 5 import org.apache.hadoop.hbase.HBaseConfiguration 6 import org.apache.hadoop.hbase.client.{Result, Scan} 7 import org.apache.hadoop.hbase.filter._ 8 import org.apache.hadoop.hbase.io.ImmutableBytesWritable 9 import org.apache.hadoop.hbase.mapreduce.TableInputFormat 10 import org.apache.hadoop.hbase.protobuf.ProtobufUtil 11 import org.apache.hadoop.hbase.util.{Base64, Bytes} 12 import org.apache.spark.{Logging, SparkContext} 13 import org.apache.spark.rdd.RDD 14 import org.slf4j.{Logger, LoggerFactory} 15 16 /** 17 * Created by ysy on 2016/11/6. 18 */ 19 object HBaseTableHelper extends Serializable { 20 21 val logger: Logger = LoggerFactory.getLogger(HBaseTableHelper.getClass) 22 23 //根據timestramp過濾載入Hbase資料 24 def tableInitByTime(sc : SparkContext,tablename:String,columns :String,fromdate: Date,todate:Date):RDD[(ImmutableBytesWritable,Result)] = { 25 val configuration = HBaseConfiguration.create() 26 configuration.set(TableInputFormat.INPUT_TABLE, tablename) 27 28 val scan = new Scan 29 scan.setTimeRange(fromdate.getTime,todate.getTime) 30 val column = columns.split(",") 31 for(columnName <- column){ 32 scan.addColumn("f1".getBytes, columnName.getBytes) 33 } 34 configuration.set(TableInputFormat.SCAN, convertScanToString(scan)) 35 val hbaseRDD = sc.newAPIHadoopRDD(configuration, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) 36 logger.info("-------count-------" + hbaseRDD.count() + "------------------") 37 hbaseRDD 38 } 39 40 def convertScanToString(scan : Scan) = { 41 val proto = ProtobufUtil.toScan(scan) 42 Base64.encodeBytes(proto.toByteArray) 43 } 44 45 //根據時間條件filter資料 46 def tableInitByFilter(sc : SparkContext,tablename : String,columns : String,time : String) : RDD[(ImmutableBytesWritable,Result)] = { 47 val configuration = HBaseConfiguration.create() 48 configuration.set(TableInputFormat.INPUT_TABLE,tablename) 49 val filter: Filter = new RowFilter(CompareFilter.CompareOp.GREATER_OR_EQUAL, new SubstringComparator(time)) 50 val scan = new Scan 51 scan.setFilter(filter) 52 val column = columns.split(",") 53 for(columnName <- column){ 54 scan.addColumn("f1".getBytes, columnName.getBytes) 55 } 56 val hbaseRDD = sc.newAPIHadoopRDD(configuration, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) 57 logger.info("-------count-------" + hbaseRDD.count() + "------------------") 58 hbaseRDD 59 } 60 61 def HBaseTableInit(): Unit ={ 62 63 } 64 65 def hbaseToHiveTable(): Unit ={ 66 67 } 68 69 //前N天的時間戳獲取 70 def getPassDays(beforeDay : Int): Date ={ 71 val calendar = Calendar.getInstance() 72 var year = calendar.get(Calendar.YEAR) 73 var dayOfYear = calendar.get(Calendar.DAY_OF_YEAR) 74 var j = 0 75 for(i <- 0 to beforeDay){ 76 calendar.set(Calendar.DAY_OF_YEAR, dayOfYear - j); 77 if (calendar.get(Calendar.YEAR) < year) { 78 //跨年了 79 j = 1; 80 //更新 標記年 81 year = year + 1; 82 //重置日曆 83 calendar.set(year,Calendar.DECEMBER,31); 84 //重新獲取dayOfYear 85 dayOfYear = calendar.get(Calendar.DAY_OF_YEAR); 86 }else{ 87 j = j + 1 88 } 89 } 90 calendar.getTime() 91 } 92 93 //根據startRow與endRow進行過濾 94 def scanHbaseByStartAndEndRow(sc : SparkContext,startRow : String,stopRow : String,tableName : String) : RDD[(ImmutableBytesWritable,Result)] ={ 95 val configuration = HBaseConfiguration.create() 96 val scan = new Scan() 97 scan.setCacheBlocks(false) 98 
scan.setStartRow(Bytes.toBytes(startRow)) 99 scan.setStopRow(Bytes.toBytes(stopRow)) 100 val filterList = new FilterList() 101 filterList.addFilter(new KeyOnlyFilter()) 102 filterList.addFilter(new InclusiveStopFilter(Bytes.toBytes(stopRow))) 103 scan.setFilter(filterList) 104 configuration.set(TableInputFormat.INPUT_TABLE,tableName) 105 configuration.set(TableInputFormat.SCAN, convertScanToString(scan)) 106 val hbaseRDD = sc.newAPIHadoopRDD(configuration, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) 107 logger.info("-------ScanHbaseCount-------" + hbaseRDD.count() + "------------------") 108 hbaseRDD 109 } 110 111 }
Common Hive helper methods (partial code):
package hive

import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{StringType, StructField, StructType}

/**
 * Created by uatcaiwy on 2016/11/6.
 */
object HiveTableHelper extends Logging {

  def hiveTableInit(sc: SparkContext): HiveContext = {
    val sqlContext = new HiveContext(sc)
    sqlContext
  }

  def writePartitionTable(HCtx: HiveContext, inputRdd: RDD[Row], tabName: String, colNames: String): Unit = {
    // Build a schema of string columns from the space-separated column-name list
    val schema = StructType(
      colNames.split(" ").map(fieldName => StructField(fieldName, StringType, true))
    )
    // Column definition list for the CREATE TABLE statement (the partition column dt is stripped out)
    val table = colNames.replace(" dt", "").split(" ").map(name => name + " String").toList.toString().replace("List(", "").replace(")", "")
    val df = HCtx.createDataFrame(inputRdd, schema)
    df.show(20)
    logInfo("----------------------------------begin write table-----------------------------------")
    val temptb = "temp" + tabName
    HCtx.sql("drop table if exists " + tabName)
    df.registerTempTable(temptb)
    HCtx.sql("CREATE EXTERNAL TABLE if not exists " + tabName + " (" + table + ") PARTITIONED BY (`dt` string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.SequenceFileInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat'")
    HCtx.sql("set hive.exec.dynamic.partition.mode = nonstrict")
    HCtx.sql("insert overwrite table " + tabName + " partition(`dt`)" + " select * from " + temptb)
  }
}
When reading files from HDFS we sometimes need to read them with the correct encoding, otherwise the text comes out garbled. A common method that converts the encoding:
package importSplitFiletoHive

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

/**
 * Created by ysy on 2016/12/7.
 */
object changeEncode {

  // Read a text file with hadoopFile and re-decode each line's bytes with the given charset
  def changeFileEncoding(sc: SparkContext, path: String, encode: String): RDD[String] = {
    sc.hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 1)
      .map(p => new String(p._2.getBytes, 0, p._2.getLength, encode))
  }
}
XML parsing with Spark (partial code):
import hive.HiveTableHelper
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.slf4j.LoggerFactory

import scala.xml._

object xmlParse extends Logging {
  val schemaString = "column1,column2...."

  def getPBOC_V1_F1(HCtx: HiveContext, rdd: RDD[String], outputTablename: String): Unit = {
    val tbrdd = rdd.filter(_.split("\t").length == 9)
      .filter(_.split("\t")(8) != "RESULTS")
      .map(data => {
        val sp = data.split("\t")
        val dt = sp(5).substring(0, 10).replaceAll("-", "")
        ((sp(0), sp(1), sp(2), sp(3), sp(4), sp(5), sp(6), "RN"), sp(8), dt)
      }).filter(_._2 != "")
      .filter(_._2.split("<").length > 2)
      .filter(data => !(data._2.indexOf("SingleQueryResultMessage0009") == -1 || data._2.indexOf("ReportMessage") == -1))
      .map(data => {
        val xml = if (XML.loadString(data._2) != null) XML.loadString(data._2) else null
        logDebug("%%%%%%%%%%%%%%%%%%finding xml-1:" + xml + "%%%%%%%%%%%%%%%%%%")
        val column1 = if ((xml \ "PBOC" \ "TYPE") != null) (xml \ "PBOC" \ "TYPE").text else "null"
        val column2 = if ((xml \ "HEAD" \ "VER") != null) (xml \ "HEAD" \ "VER").text else "null"
        val column3 = if ((xml \ "HEAD" \ "SRC") != null) (xml \ "HEAD" \ "SRC").text else "null"
        val column4 = if ((xml \ "HEAD" \ "DES") != null) (xml \ "HEAD" \ "DES").text else "null"
        ....
        Row(data._1, column1, column2, column3, ...)
      })
    HiveTableHelper.writePartitionTable(HCtx, tbrdd, outputTablename, schemaString)
  }
}
Common Redis helper methods (partial code):
1 package redis 2 3 import org.slf4j.{Logger, LoggerFactory} 4 import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig} 5 6 import scala.collection.mutable.ArrayBuffer 7 import scala.util.Random 8 9 /** 10 * Created by ysy on 2016/11/21. 11 */ 12 object RedisClient extends Serializable{ 13 val logger: Logger = LoggerFactory.getLogger(RedisClient.getClass) 14 @transient private var jedisPool : JedisPool = null 15 private var jedisPoolList = new ArrayBuffer[JedisPool] 16 private val poolSize = 0 17 makePool 18 19 def makePool() ={ 20 val pc = new PropConfig("redis.properties"); 21 if(jedisPool == null){ 22 val poolConfig : JedisPoolConfig = new JedisPoolConfig 23 poolConfig.setMaxIdle(pc.getProperty("redis.pool.maxActive").toInt) 24 poolConfig.setMaxTotal(pc.getProperty("redis.pool.maxActive").toInt) 25 poolConfig.setMaxWaitMillis(pc.getProperty("redis.pool.maxWait").toInt) 26 poolConfig.setTestOnBorrow(pc.getProperty("redis.pool.testOnBorrow").toBoolean) 27 poolConfig.setTestOnReturn(pc.getProperty("redis.pool.testOnReturn").toBoolean) 28 val hosts = pc.getProperty("redis.pool.servers").split(",") 29 .map(data => data.split(":")) 30 for(host <- hosts){ 31 jedisPool = new JedisPool(poolConfig,host(0),host(1).toInt,pc.getProperty("redis.server.timeout").toInt) 32 jedisPoolList += jedisPool 33 } 34 } 35 36 } 37 38 39 def getForString(key : String) : String = { 40 var value = "" 41 if(key != null && !key.isEmpty()){ 42 val jedis = getJedis 43 value = jedis.get(key) 44 } 45 value 46 } 47 48 def setForString(key : String,value : String) ={ 49 if(key != null && !key.isEmpty){ 50 var jedis = getJedis 51 jedis.set(key,value) 52 }else{ 53 54 } 55 } 56 57 def zexsist(key : String) : Boolean ={ 58 var flag = false 59 if(key != null && !key.isEmpty){ 60 val jedis = getJedis 61 62 val resultNum = jedis.zcard(key) 63 if("0".equals(resultNum.toLong)){ 64 flag = true 65 } 66 } 67 flag 68 } 69 70 def getJedis() : Jedis ={ 71 var ramNub = 0 72 if(poolSize == 1){ 73 ramNub = 0 74 }else{ 75 val random = new Random 76 ramNub = Math.abs(random.nextInt % 1) 77 } 78 jedisPool = jedisPoolList(ramNub) 79 jedisPool.getResource 80 } 81 82 def returnJedisResource(redis : Jedis): Unit ={ 83 if(redis != null){ 84 redis.close() 85 } 86 } 87 88 def close: Unit = { 89 for(jedisPool <- jedisPoolList){ 90 jedisPool.close 91 } 92 if(jedisPool!=null && !jedisPool.isClosed()){ 93 jedisPool.close 94 }else{ 95 jedisPool=null 96 } 97 } 98 99 }
I will not go into further detail; with that, the complete project framework is in place.
Next, create the SparkContext in a main method and start the data analysis and processing. Run it from the bin directory of the Spark installation, or wrap the command in a script:
./spark-submit --conf spark.ui.port=5566 --name "sparkApp" --master yarn-client --num-executors 3 --executor-cores 2 --executor-memory 10g --class impl.spark /usr/local/spark1.6.1/sparkApp/sparkApp.jar
(Note: the parameters given here override the values configured in spark-defaults.conf. spark.ui.port is redeclared because, when the Spark thrift server is running at the same time, a plain submit would collide with the UI port it already occupies. That concludes the Spark part.)
Hadoop Installation, Configuration, and a MapReduce Code Framework
Installation:
yum install gcc
yum install gcc-c++
yum install make
yum install autoconf automake libtool cmake
yum install ncurses-devel
yum install openssl-devel
Install protoc (must be done as root):
tar -xvf protobuf-2.5.0.tar.bz2
cd protobuf-2.5.0
./configure --prefix=/opt/protoc/
make && make install
Build Hadoop:
mvn clean package -Pdist,native -DskipTests -Dtar
The built Hadoop ends up under /home/hadoop/ocdc/hadoop-2.6.0-src/hadoop-dist/target
Configure the hosts file:
10.1.245.244 master
10.1.245.243 slave1
On the command line, run: hostname master
Passwordless login:
Generate a key pair: ssh-keygen -t rsa -P ""
Enter the directory: cd .ssh (once inside, ls -a shows the files)
Append the generated public key id_rsa.pub to authorized_keys (run: cat id_rsa.pub >> authorized_keys)
core-site.xml
<configuration>
<!-- Set the HDFS nameservice to ns1 -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://master</value>
</property>
<property>
<name>io.file.buffer.size</name>
<value>131072</value>
</property>
<!-- Directory where Hadoop stores its data -->
<property>
<name>hadoop.tmp.dir</name>
<value>/home/hadoop/ocdc/hadoop-2.6.0/tmp</value>
<description>A base for other temporary directories.</description>
</property>
<property>
<name>hadoop.proxyuser.spark.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.spark.groups</name>
<value>*</value>
</property>
<!-- ZooKeeper quorum addresses -->
<property>
<name>ha.zookeeper.quorum</name>
<value>h4:2181,h5:2181,h6:2181</value>
</property>
</configuration>
hdfs-site.xml
<configuration>
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>master:9001</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>/home/hadoop/ocdc/hadoop-2.6.0/name</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>/home/hadoop/ocdc/hadoop-2.6.0/data</value>
</property>
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
<property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
</property>
<property>
<name>dfs.nameservices</name>
<value>ns1</value>
</property>
<!-- ns1 has two NameNodes, nn1 and nn2 -->
<property>
<name>dfs.ha.namenodes.ns1</name>
<value>nn1,nn2</value>
</property>
</configuration>
yarn-site.xml
<configuration>
<!-- Site specific YARN configuration properties -->
<!-- Have the NodeManager load the shuffle service (shuffle server) at startup -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
<property>
<name>yarn.resourcemanager.address</name>
<value>master:8032</value>
</property>
<property>
<name>yarn.resourcemanager.scheduler.address</name>
<value>master:8030</value>
</property>
<property>
<name>yarn.resourcemanager.resource-tracker.address</name>
<value>master:8035</value>
</property>
<property>
<name>yarn.resourcemanager.admin.address</name>
<value>master:8033</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.address</name>
<value>master:8088</value>
</property>
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>16384</value>
</property>
<!-- ResourceManager hostname -->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>h3</value>
</property>
</configuration>
mapred-site.xml
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>mapreduce.jobhistory.address</name>
<value>master:10020</value>
</property>
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>master:19888</value>
</property>
<property>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>16384</value>
</property>
</configuration>
As for MapReduce, a year or so ago everyone found it mysterious, but it simply means processing the data in the Map phase or the Reduce phase; you can also read from and write to HBase, Redis, and so on inside the Map phase. In essence you are writing your business logic inside MapReduce. The code is as follows:
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
    // The constant 1, used to emit output in the form <word, 1>
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        // Hadoop calls map() once per input line; as many lines as there are, that many map() calls
        String line = value.toString();
        // Split the line into words; additional delimiters can be passed in
        StringTokenizer tokenizer = new StringTokenizer(line);
        // Iterate over the words
        while (tokenizer.hasMoreTokens()) {
            // Emit <word, 1>
            word.set(tokenizer.nextToken());
            output.collect(word, one);
        }
    }
}

// Note: reduce gathers the values for the same key (here, the word) into the form <word, list of 1> and then sums the 1s
public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        // Initial word count
        int sum = 0;
        while (values.hasNext()) {
            // Add up the counts
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}
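To actually run the mapper and reducer above, a driver is needed to configure and submit the job. Below is a minimal sketch using the same old mapred API; the enclosing class name WordCount and the use of command-line input/output paths are assumptions for illustration, not taken from the original project:
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class WordCount {

    // ... the Map and Reduce classes shown above go here ...

    public static void main(String[] args) throws IOException {
        // Configure the job around the mapper and reducer defined above
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);

        // Input and output paths are taken from the command line
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}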
HBase Installation, Configuration, and Code Framework
Since I use an external ZooKeeper, HBASE_MANAGES_ZK is set to false here. The settings:
export JAVA_HOME=/usr/local/yangsy/jdk1.7.0_55
export HBASE_CLASSPATH=/usr/local/hbase-1.0.2/conf
export HBASE_MANAGES_ZK=false
hbase-site.xml
<configuration>
<!-- Directory in HDFS where HBase writes its data -->
<property>
<name>hbase.rootdir</name>
<value>hdfs://master:9000/usr/local/hadoop-2.6.0/hbaseData</value>
</property>
<!-- Run HBase in distributed (cluster) mode -->
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<!-- Address and port of the HBase master -->
<property>
<name>hbase.master</name>
<value>hdfs://master:60000</value>
</property>
<!-- Port the HBase Master web UI binds to; it binds to 0.0.0.0 by default -->
<property>
<name>hbase.master.info.port</name>
<value>60010</value>
</property>
<!-- ZooKeeper client connection port (the quorum should have an odd number of nodes) -->
<property>
<name>hbase.zookeeper.property.clientPort</name>
<value>2183</value>
</property>
<!-- ZooKeeper quorum nodes -->
<property>
<name>hbase.zookeeper.quorum</name>
<value>master,slave1,slave2</value>
</property>
<!-- ZooKeeper data directory -->
<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/usr/local/zookeeper-3.4.6/data</value>
</property>
<!-- ZooKeeper session timeout -->
<property>
<name>zookeeper.session.timeout</name>
<value>60000</value>
</property>
</configuration>
Note that if you use an external ZooKeeper cluster, you need to copy ZooKeeper's zoo.cfg into HBase's conf directory; HBase loads that file automatically on startup.
In the regionservers file, list the addresses of the RegionServer nodes, one per line (see the example below).
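For example, with the hosts used in this post the file could simply contain the following; which nodes actually run RegionServers depends on your cluster layout, so treat this as an assumption:
slave1
slave2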
Code structure:
1 package HbaseTest; 2 3 import akka.io.Tcp; 4 import org.apache.hadoop.conf.Configuration; 5 import org.apache.hadoop.hbase.*; 6 import org.apache.hadoop.hbase.client.*; 7 8 import java.util.ArrayList; 9 import java.util.List; 10 11 /** 12 * Created by root on 5/30/16. 13 */ 14 public class HbaseTest { 15 private Configuration conf; 16 public void init(){ 17 conf = HBaseConfiguration.create(); 18 } 19 20 public void createTable(){ 21 Connection conn = null; 22 try{ 23 conn = ConnectionFactory.createConnection(conf); 24 HBaseAdmin hadmin = (HBaseAdmin)conn.getAdmin(); 25 HTableDescriptor desc = new HTableDescriptor("TableName".valueOf("yangsy")); 26 27 desc.addFamily(new HColumnDescriptor("f1")); 28 if(hadmin.tableExists("yangsy")){ 29 System.out.println("table is exists!"); 30 System.exit(0); 31 }else{ 32 hadmin.createTable(desc); 33 System.out.println("create table success"); 34 } 35 }catch (Exception e){ 36 e.printStackTrace(); 37 }finally { 38 { 39 if(null != conn){ 40 try{ 41 conn.close(); 42 }catch(Exception e){ 43 e.printStackTrace(); 44 } 45 } 46 } 47 } 48 } 49 50 public void query(){ 51 Connection conn = null; 52 HTable table = null; 53 ResultScanner scan = null; 54 try{ 55 conn = ConnectionFactory.createConnection(conf); 56 table = (HTable)conn.getTable(TableName.valueOf("yangsy")); 57 58 scan = table.getScanner(new Scan()); 59 60 for(Result rs : scan){ 61 System.out.println("rowkey:" + new String(rs.getRow())); 62 63 for(Cell cell : rs.rawCells()){ 64 System.out.println("column:" + new String(CellUtil.cloneFamily(cell))); 65 66 System.out.println("columnQualifier:"+new String(CellUtil.cloneQualifier(cell))); 67 68 System.out.println("columnValue:" + new String(CellUtil.cloneValue(cell))); 69 70 System.out.println("----------------------------"); 71 } 72 } 73 }catch(Exception e){ 74 e.printStackTrace(); 75 }finally{ 76 try { 77 table.close(); 78 if(null != conn) { 79 conn.close(); 80 } 81 }catch (Exception e){ 82 e.printStackTrace(); 83 } 84 } 85 } 86 87 public void queryByRowKey(){ 88 Connection conn = null; 89 ResultScanner scann = null; 90 HTable table = null; 91 try { 92 conn = ConnectionFactory.createConnection(conf); 93 table = (HTable)conn.getTable(TableName.valueOf("yangsy")); 94 95 Result rs = table.get(new Get("1445320222118".getBytes())); 96 System.out.println("yangsy the value of rokey:1445320222118"); 97 for(Cell cell : rs.rawCells()){ 98 System.out.println("family" + new String(CellUtil.cloneFamily(cell))); 99 System.out.println("value:"+new String(CellUtil.cloneValue(cell))); 100 } 101 }catch (Exception e){ 102 e.printStackTrace(); 103 }finally{ 104 if(null != table){ 105 try{ 106 table.close(); 107 }catch (Exception e){ 108 e.printStackTrace(); 109 } 110 } 111 } 112 } 113 114 public void insertData(){ 115 Connection conn = null; 116 HTable hTable = null; 117 try{ 118 conn = ConnectionFactory.createConnection(conf); 119 hTable = (HTable)conn.getTable(TableName.valueOf("yangsy")); 120 121 Put put1 = new Put(String.valueOf("1445320222118").getBytes()); 122 123 put1.addColumn("f1".getBytes(),"Column_1".getBytes(),"123".getBytes()); 124 put1.addColumn("f1".getBytes(),"Column_2".getBytes(),"456".getBytes()); 125 put1.addColumn("f1".getBytes(),"Column_3".getBytes(),"789".getBytes()); 126 127 Put put2 = new Put(String.valueOf("1445320222119").getBytes()); 128 129 put2.addColumn("f1".getBytes(),"Column_1".getBytes(),"321".getBytes()); 130 put2.addColumn("f1".getBytes(),"Column_2".getBytes(),"654".getBytes()); 131 
put2.addColumn("f1".getBytes(),"Column_3".getBytes(),"987".getBytes()); 132 133 List<Put> puts = new ArrayList<Put>(); 134 puts.add(put1); 135 puts.add(put2); 136 hTable.put(puts); 137 }catch(Exception e){ 138 e.printStackTrace(); 139 }finally{ 140 try { 141 if (null != hTable) { 142 hTable.close(); 143 } 144 }catch(Exception e){ 145 e.printStackTrace(); 146 } 147 } 148 } 149 150 public static void main(String args[]){ 151 HbaseTest test = new HbaseTest(); 152 test.init(); 153 test.createTable(); 154 test.insertData(); 155 test.query(); 156 } 157 158 159 }
Storm Installation, Configuration, and Code Framework
Topology construction:
Write the topology class and, in its constructor, add the configuration parameters, serialization registrations, and so on. The numbers of workers and calc bolts at launch time are read via CommandLine, and finally StormSubmitter's submitTopologyWithProgressBar is called with the topology name, the config, and the TopologyBuilder instance.
Data-receiving Spout:
Storm pulls its data from Kafka; the spout is created in BasicTopology, with the following configuration parameters:
kafka.brokerZkStr - the ZooKeeper servers used by Kafka
kafka.brokerZkPath - the ZooKeeper path where the Kafka broker metadata is stored
kafka.offset.zkPort - the ZooKeeper port used for storing offsets, default 2181
kafka.offset.zkRoot - the ZooKeeper path for storing offsets, e.g. /kafka-offset
stateUpdateIntervalMs - how often offset information is written to ZooKeeper, 30000 ms
When a spout or bolt initializes, it constructs the stormBeanFactory object and then uses the getBean method to obtain objects from the bean factory.
Bolt data processing:
A custom bolt extends BaseRichBolt and implements its prepare and declareOutputFields methods. In prepare, it reads the StormBeanFactory configuration and loads the business classes, as sketched below.
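A minimal sketch of such a bolt follows. It assumes the project's custom StormBeanFactory can be rebuilt from the Spring XML that BasicTopology stores in the topology config and that it exposes the two-argument getBean method seen there; the class name CalcBolt, the bean name calcService, and the StormBeanFactory constructor taking an XML string are assumptions rather than code from the original project:
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

import org.edm.storm.topo.util.StormBeanFactory;

public class CalcBolt extends BaseRichBolt {

    private OutputCollector collector;
    private transient Object calcService; // business bean loaded in prepare()

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // Assumption: rebuild the bean factory from the Spring XML carried in the topology config,
        // then load the business bean by name
        StormBeanFactory factory = new StormBeanFactory(
                (String) stormConf.get(StormBeanFactory.SPRING_BEAN_FACTORY_XML));
        this.calcService = factory.getBean("calcService", Object.class);
    }

    @Override
    public void execute(Tuple input) {
        // Business processing would go here; the tuple is then re-emitted and acked
        collector.emit(input, input.getValues());
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("hashTag"));
    }
}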
Storm configuration parameters:
dev.zookeeper.path : '/tmp/dev-storm-zookeeper' - the value of dev.zookeeper.path is used as the local directory, and the value of storm.zookeeper.port as the port, to start a fresh ZooKeeper server (for local mode)
drpc.childopts: '-Xms768m'
drpc.invocations.port 3773
drpc.port : 3772
drpc.queue.size : 128
drpc.request.timeout.secs : 600
drpc.worker.threads : 64
java.library.path : ''
logviewer.appender.name : 'A1'
logviewer.childopts : '-Xms128m'
logviewer.port : 8000
metrics.reporter.register : 'org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter'
nimbus.childopts : '-Xmx1024m -javaagent:/usr/hdp/current/storm-nimbus/contrib/storm-jmxetric/lib/jmxetric-1.0.4.jar=host=localhost,port=8649,wireformat31x-true...'
nimbus.cleanup.inbox.freq.secs : 600
nimbus.file.copy.expiration.secs : 600
storm.yaml
nimbus.host - the address of the nimbus node
nimbus.inbox.jar.expiration.secs - how long after their last modification jars in the nimbus inbox folder are cleaned up, default 3600 seconds
nimbus.monitor.freq.secs : 120
nimbus.reassign : true - whether nimbus reassigns tasks when it finds they have failed. Defaults to true; changing it is not recommended
nimbus.supervisor.timeout.secs : 60
nimbus.task.launch.secs : 120
nimbus.task.timeout.secs : 30 - task heartbeat timeout; once it expires nimbus considers the task dead and reassigns it elsewhere
nimbus.thrift.max_buffer_size : 1048576
nimbus.thrift.port : 6627
nimbus.topology.validator : 'backtype.storm.nimbus.DefaultTopologyValidator'
storm.cluster.metrics.consumer.register : [{"class" : "org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter"}]
storm.cluster.mode : 'distributed'
storm.local.dir : '/storage/disk1/hadoop/storm'
storm.local.mode.zmq : false
storm.log.dir : '/var/log/storm'
storm.messaging.netty.buffer_size : 5242880 - the buffer size for each batch of serialized tuple task messages
storm.messaging.netty.client_worker_threads : 10 - the number of netty client worker threads
storm.messaging.netty.max_retries : 60 - the maximum number of retries
storm.messaging.netty.max_wait_ms : 2000 - the maximum wait time in milliseconds
storm.messaging.netty.min_wait_ms : 100 - the minimum wait time in milliseconds
storm.messaging.netty.server_worker_threads : 10 - the number of netty server worker threads
storm.messaging.transport : 'backtype.storm.messaging.netty.Context' - the transport implementation to use
storm.thrift.transport : 'backtype.storm.security.auth.SimpleTransportPlugin'
storm.zookeeper.connection.timeout : 15000
storm.zookeeper.port : 2181
storm.zookeeper.retry.interval : 1000
storm.zookeeper.retry.intervalceiling.millis : 30000
storm.zookeeper.retry.times : 5
storm.zookeeper.root : '/storm' - the root directory for Storm in ZooKeeper
storm.zookeeper.servers : ['',''.....] - the list of ZooKeeper servers
storm.zookeeper.session.timeout : 20000 - the client session timeout when connecting to ZooKeeper
supervisor.childopts : used in the storm-deploy project to configure JVM options for the supervisor daemon
supervisor.heartbeat.frequency.secs : 5 - how often the supervisor sends heartbeats
supervisor.monitor.frequency.secs : 3 - how often the supervisor checks worker heartbeats
supervisor.slots.ports : [6700,6701,....] - the list of ports on which the supervisor can run workers. Each worker occupies one port and each port runs only one worker; this setting controls how many workers (slots) run on each machine.
supervisor.worker.start.timeout.secs : 120
supervisor.worker.timeout.secs : 30 - worker heartbeat timeout on the supervisor; once exceeded, the supervisor tries to restart the worker process
task.heartbeat.frequency.secs : 3 - the interval at which tasks report their heartbeat status
task.refresh.poll.secs : 10 - how often a task refreshes its connections to other tasks (if a task is reassigned, the tasks that send it messages need to refresh their connections). Normally the other tasks are notified promptly when a reassignment happens; this setting only guards against the case where the notification is missed.
topology.acker.executors : null
topology.builtin.metrics.bucket.size.secs : 60
topology.debug : false
topology.disruptor.wait.strategy : 'com.lmax.disruptor.BlockingWaitStrategy'
topology.enable.message.timeouts : true
topology.error.throttle.interval.secs : 10
topology.executor.receive.buffer.size : 1024
topology.executor.send.buffer.size : 1024
topology.fall.back.on.java.serialization : true
topology.kryo.factory : 'backtype.storm.serialization.DefaultKryoFactory'
topology.max.error.report.per.interval : 5
topology.max.spout.pending : null - the maximum number of tuples that can be pending in a single spout task. This applies per task, not to the spouts or the topology as a whole.
topology.max.task.parallelism : null - the maximum component parallelism allowed in a topology. Mainly used to limit the number of threads when testing in local mode.
topology.message.timeout.secs : 30 - the maximum processing timeout for messages emitted by a spout in the topology. If a message is not successfully acked within this window, Storm tells the spout the message failed; some spouts implement replay of failed messages.
topology.metrics.aggregate.metric.evict.secs : 5
topology.metrics.aggregate.per.worker : true
topology.metrics.consumer.register :
topology.metrics.expand.map.type : true
topology.metrics.metric.name.separator : ','
topology.optimize : true
topology.skip.missing.kryo.registrations : false - whether Storm should skip Kryo registrations it does not recognize. If set to false, tasks may fail to load or throw errors at runtime.
topology.sleep.spout.wait.strategy.time.ms : 1
topology.spout.wait.strategy : 'backtype.storm.spout.SleepSpoutWaitStrategy'
topology.state.synchronization.timeout.secs : 60
topology.stats.sample.rate : 0.05
topology.tick.tuple.freq.secs : null
topology.transfer.buffer.size : 1024
topology.trident.batch.emit.interval.millis : 500
topology.tuple.serializer : 'backtype.storm.serialization.types.ListDelegateSerializer'
topology.worker.childopts : null
topology.worker.shared.thread.pool.size : 4
topology.workers : 40 - the number of worker processes to start across the cluster for this topology. Each process runs some number of tasks as threads; combine this setting with the components' parallelism hints to tune performance.
transactional.zookeeper.port : null
transactional.zookeeper.root : '/transactional'
transactional.zookeeper.servers : null
ui.childopts : '-Xmx2048m'
ui.filter : null
ui.port : 8744 - the port the Storm UI listens on
worker.childopts :
worker.heartbeat.frequency.secs : 1
zmq.hwm : 0
zmq.linger.millis : 5000 - how long a connection keeps trying to resend messages to the target host after the connection is closed. A rarely used advanced option that can usually be ignored.
zmq.threads : 1 - the number of threads used for ZeroMQ communication inside each worker process
Summary of the Storm code framework:
Base topology class, BasicTopology.java:
1 import java.io.UnsupportedEncodingException; 2 import java.math.BigDecimal; 3 import java.util.Date; 4 import java.util.HashMap; 5 import java.util.List; 6 import java.util.Map; 7 8 import org.apache.commons.cli.CommandLine; 9 import org.apache.commons.cli.CommandLineParser; 10 import org.apache.commons.cli.DefaultParser; 11 import org.apache.commons.cli.HelpFormatter; 12 import org.apache.commons.cli.Options; 13 import org.apache.commons.lang3.StringUtils; 14 import org.apache.storm.guava.base.Preconditions; 15 import org.edm.storm.topo.util.DelayKafkaSpout; 16 import org.edm.storm.topo.util.StormBeanFactory; 17 import org.joda.time.DateTime; 18 19 import storm.kafka.BrokerHosts; 20 import storm.kafka.KafkaSpout; 21 import storm.kafka.SpoutConfig; 22 import storm.kafka.ZkHosts; 23 import backtype.storm.Config; 24 import backtype.storm.LocalCluster; 25 import backtype.storm.StormSubmitter; 26 import backtype.storm.topology.TopologyBuilder; 27 import backtype.storm.tuple.Fields; 28 29 public abstract class BasicTopology { 30 31 public static final String HASH_TAG = "hashTag"; 32 33 public static final Fields HASH_FIELDS = new Fields(HASH_TAG); 34 35 protected Options options = new Options(); 36 37 protected StormBeanFactory stormBeanFactory; 38 39 protected Config config = new Config(); 40 41 protected String configFile; 42 43 public BasicTopology(){ 44 config.setFallBackOnJavaSerialization(false); 45 46 config.setSkipMissingKryoRegistrations(false); 47 config.registerSerialization(Date.class); 48 config.registerSerialization(BigDecimal.class); 49 config.registerSerialization(HashMap.class); 50 config.registerSerialization(Map.class); 51 52 options.addOption("name", true, "拓撲執行時名稱"); 53 options.addOption("conf", false, "配置檔案路徑"); 54 options.addOption("workers", true, "虛擬機器數量"); 55 } 56 57 protected void setupConfig(CommandLine cmd) throws UnsupportedEncodingException{ 58 //配置檔名稱 59 String confLocation = cmd.getOptionValue("conf",getConfigName()); 60 //建立stormBeanFactory 61 stormBeanFactory = new StormBeanFactory(confLocation); 62 Map<String,Object> stormConfig = stormBeanFactory.getBean("stormConfig",Map.class); 63 Preconditions.checkNotNull(stormConfig); 64 config.putAll(stormConfig); 65 config.put(StormBeanFactory.SPRING_BEAN_FACTORY_XML, stormBeanFactory.getXml()); 66 //先預設載入,然後再載入命令列 67 String numWorkers = cmd.getOptionValue("workers"); 68 if(numWorkers != null){ 69 config.setNumWorkers(Integer.parseInt(numWorkers)); 70 }else{ 71 config.setNumWorkers(getNumWorkers()); 72 } 73 config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 180); 74 config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 2000); 75 config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384); 76 config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384); 77 config.put(Config.TOPOLOGY_ACKER_EXECUTORS, config.get(Config.TOPOLOGY_WORKERS)); 78 } 79 80 @SuppressWarnings("unchecked") 81 protected SpoutConfig getSpoutConfig(String topic){ 82 String brokerZkStr = (String)config.get("kafka.brokerZkStr"); 83 String brokerZkPath = (String)config.get("kafka.brokerZkPath"); 84 85 List<String> zkServers = (List<String>)config.get("kafka.offset.zkServers"); 86 Integer zkPort = Integer.parseInt(String.valueOf(config.get("kafka.offset.zkPort"))); 87 String zkRoot = (String)config.get("kafka.offset.zkRoot"); 88 String id = StringUtils.join(getTopoName(),"-",topic); 89 BrokerHosts kafkaBrokerZk = new ZkHosts(brokerZkStr, brokerZkPath); 90 SpoutConfig spoutConfig = new SpoutConfig(kafkaBrokerZk, topic, zkRoot, id); 91 spoutConfig.zkServers = 
zkServers; 92 spoutConfig.zkPort = zkPort; 93 spoutConfig.zkRoot = zkRoot; 94 spoutConfig.stateUpdateIntervalMs = 30000; 95 return spoutConfig; 96 } 97 98 //建立kafkaspout 99 public KafkaSpout getKafkaSpout(String topic){ 100 SpoutConfig spoutConfig = getSpoutConfig(topic); 101 return new DelayKafkaSpout(spoutConfig); 102 } 103 104 105 /** 106 * 拓撲可部署多次,但從kafka獲取資料,做唯一次過濾等用的 107 * 108 * @return 109 */ 110 public abstract String getTopoName(); 111 112 public abstract String getConfigName(); 113 114 public abstract int getNumWorkers(); 115 116 public void registerKryo(Config config){ 117 118 } 119 120 public abstract void addOptions(Options options); 121 122 public abstract void setupOptionValue(CommandLine cmd); 123 124 public abstract void createTopology(TopologyBuilder builder); 125 126 public void runLocat(String[] args) throws Exception{ 127 CommandLineParser parser = new DefaultParser(); 128 CommandLine cmd = parser.parse(options, args); 129 HelpFormatter formatter = new HelpFormatter(); 130 formatter.printHelp("topology", options); 131 setupConfig(cmd); 132 133 config.setDebug(true); 134 config.setNumWorkers(1); 135 TopologyBuilder builder = new TopologyBuilder(); 136 createTopology(builder); 137 LocalCluster cluster = new LocalCluster(); 138 String topoName = cmd.getOptionValue("name", 139 StringUtils.join(getTopoName(), "-", new DateTime().toString("yyyyMMdd-HHmmss"))); 140 cluster.submitTopology(topoName,config,builder.createTopology()); 141 142 } 143 144 public void run(String args[])
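The listing above is cut off at run(). Based on the topology-construction description earlier (parse the command line, set up the config, build the topology, then call StormSubmitter's submitTopologyWithProgressBar), a plausible sketch of how the method continues is given below; it mirrors runLocat() and is an assumption rather than the original code:
    public void run(String[] args) throws Exception {
        // Let subclasses register their own options, then parse the command line
        addOptions(options);
        CommandLineParser parser = new DefaultParser();
        CommandLine cmd = parser.parse(options, args);
        setupConfig(cmd);
        setupOptionValue(cmd);

        // Build the topology and submit it to the cluster under the resolved name
        TopologyBuilder builder = new TopologyBuilder();
        createTopology(builder);
        String topoName = cmd.getOptionValue("name",
                StringUtils.join(getTopoName(), "-", new DateTime().toString("yyyyMMdd-HHmmss")));
        StormSubmitter.submitTopologyWithProgressBar(topoName, config, builder.createTopology());
    }
}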