
Spark Real-Time Integration Project (Hands-On)

First, start all of the big data services the project depends on (HDFS, YARN, the MapReduce and Spark history servers, Zookeeper, Kafka, HBase, Elasticsearch, and Redis):

# Start HDFS
hadoop-daemon.sh start namenode
hadoop-daemon.sh start datanode
# Start YARN
yarn-daemon.sh start resourcemanager
yarn-daemon.sh start nodemanager
# Start MRHistoryServer
mr-jobhistory-daemon.sh start historyserver
# Start Spark HistoryServer
/export/server/spark/sbin/start-history-server.sh
# Start Zookeeper
zookeeper-daemon.sh start
# Start Kafka
kafka-server-start.sh -daemon /export/server/kafka/config/server.properties
# Start HBase
hbase-daemon.sh start master
hbase-daemon.sh start regionserver
# Start Elasticsearch
elasticsearch-daemon.sh start
# Start Redis
/export/server/redis/bin/redis-server /export/server/redis/conf/redis.conf
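As a quick sanity check after starting everything, you can list the running JVM processes (exact process names may vary by version; this check is optional):
jps
# Java processes should include NameNode, DataNode, ResourceManager, NodeManager, JobHistoryServer,
# HistoryServer, QuorumPeerMain, Kafka, HMaster, HRegionServer and Elasticsearch.
# Redis is not a JVM process; verify it separately with: redis-cli ping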
The big data frameworks involved in this end-to-end real-time case are essentially the ones enterprises use for real-time business. By integrating them in this case, you will further master these frameworks and how to apply them.

1.2.2 Application Development Environment

In the Maven project created earlier, create a Maven module and add the required dependencies to its pom.xml:
<!-- Repository locations: the aliyun, cloudera and jboss repositories, in that order -->
<repositories>
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
        <id>jboss</id>
        <url>http://repository.jboss.com/nexus/content/groups/public</url>
    </repository>
</repositories>

<properties>
    <scala.version>2.11.12</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
    <spark.version>2.4.5</spark.version>
    <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
    <hbase.version>1.2.0-cdh5.16.2</hbase.version>
    <kafka.version>2.0.0</kafka.version>
    <mysql.version>8.0.19</mysql.version>
</properties>

<dependencies>
    <!-- Scala language -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- Spark Core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Spark SQL -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Structured Streaming + Kafka -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Spark Streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Spark Streaming integration with Kafka 0.10.0 -->
    <!--
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    -->
    <!-- Spark Streaming integration with Kafka 0.8.2.1 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Hadoop Client -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <!-- HBase Client -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-hadoop2-compat</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <!-- MySQL Client -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>
    <!-- Configuration file management -->
    <dependency>
        <groupId>com.typesafe</groupId>
        <artifactId>config</artifactId>
        <version>1.2.1</version>
    </dependency>
    <!-- Resolve an IP address to province/city -->
    <dependency>
        <groupId>org.lionsoul</groupId>
        <artifactId>ip2region</artifactId>
        <version>1.7.2</version>
    </dependency>
    <!-- JSON parsing library: fastjson -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.47</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>6.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>transport</artifactId>
        <version>6.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-20_2.11</artifactId>
        <version>6.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.redislabs</groupId>
        <artifactId>spark-redis_2.11</artifactId>
        <version>2.4.2</version>
    </dependency>
</dependencies>

<build>
    <outputDirectory>target/classes</outputDirectory>
    <testOutputDirectory>target/test-classes</testOutputDirectory>
    <resources>
        <resource>
            <directory>${project.basedir}/src/main/resources</directory>
        </resource>
    </resources>
    <!-- Maven compiler plugins -->
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Following the layered structure of the application, create the corresponding directories and packages under the src source directory, roughly as sketched below.
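A rough sketch of the layout, inferred from the packages referenced later in this section (names follow the source code):

src/main/scala
  cn.itcast.spark.config   -- ApplicationConfig: loads config.properties
  cn.itcast.spark.utils    -- SparkUtils: builds SparkSession / StreamingContext instances
  cn.itcast.spark.mock     -- OrderRecord, MockOrderProducer: simulated order data
src/main/resources
  config.properties        -- application properties file (listed below)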

In the src/main/resources directory, create the properties file config.properties with the following content:

# local mode
app.is.local=true
app.spark.master=local[3]
# kafka config
kafka.bootstrap.servers=node1.itcast.cn:9092
kafka.auto.offset.reset=largest
kafka.source.topics=orderTopic
kafka.etl.topic=orderEtlTopic
kafka.max.offsets.per.trigger=100000
# Kafka Consumer Group ID
streaming.etl.group.id=order-etl-1000
# Zookeeper Server
kafka.zk.url=node1.itcast.cn:2181/kafka200
# streaming checkpoint
streaming.etl.ckpt=datas/order-apps/ckpt/etl-ckpt/
streaming.hbase.ckpt=datas/order-apps/ckpt/hbase-ckpt/
streaming.es.ckpt=datas/order-apps/ckpt/es-ckpt/
streaming.amt.total.ckpt=datas/order-apps/ckpt/amt-total-ckpt/
streaming.amt.province.ckpt=datas/order-apps/ckpt/amt-province-ckpt/
streaming.amt.city.ckpt=datas/order-apps/ckpt/amt-city-ckpt/
##streaming.etl.ckpt=/spark/order-apps/ckpt/etl-ckpt/
##streaming.hbase.ckpt=/spark/order-apps/ckpt/hbase-ckpt/
##streaming.es.ckpt=/spark/order-apps/ckpt/es-ckpt/
##streaming.amt.total.ckpt=/spark/order-apps/ckpt/amt-total-ckpt/
##streaming.amt.province.ckpt=/spark/order-apps/ckpt/amt-province-ckpt/
##streaming.amt.city.ckpt=/spark/order-apps/ckpt/amt-city-ckpt/
# streaming stop file
stop.etl.file=datas/order-apps/stop/etl-stop
stop.hbase.file=datas/order-apps/stop/hbase-stop
stop.es.file=datas/order-apps/stop/es-stop
stop.state.file=datas/order-apps/stop/state-stop
##stop.etl.file=/spark/order-apps/stop/etl-stop
##stop.hbase.file=/spark/order-apps/stop/hbase-stop
##stop.es.file=/spark/order-apps/stop/es-stop
##stop.state.file=/spark/order-apps/stop/state-stop
# HBase Config
hbase.zk.hosts=node1.itcast.cn
hbase.zk.port=2181
hbase.zk.znode=/hbase
hbase.order.table=htb_orders
hbase.table.family=info
hbase.table.columns=orderId,userId,orderTime,ip,orderMoney,orderStatus,province,city
# Elasticsearch Config
es.nodes=node1.itcast.cn
es.port=9200
es.index.auto.create=true
es.write.operation=upsert
es.index.name=orders/index
es.mapping.id=orderId
# Redis Config
redis.host=node1.itcast.cn
redis.port=6379
redis.db=0
# Dictionary data
ipdata.region.path=dataset/ip2region.db
##ipdata.region.path=hdfs://node1.itcast.cn:8020/spark/dataset/ip2region.db
During development the application runs in local mode and the related data is stored on the local file system; in testing and production the HDFS file system is used instead. Write the utility class ApplicationConfig, located in the cn.itcast.spark.config package, to load the properties file; the code is as follows:
package cn.itcast.spark.config

import com.typesafe.config.{Config, ConfigFactory}

/**
 * Loads the application properties file config.properties and exposes its values
 */
object ApplicationConfig {
  // Load the properties file from the classpath
  private val config: Config = ConfigFactory.load("config.properties")

  /*
    Run mode: local mode for development and testing; in production the master is passed via --master
   */
  lazy val APP_LOCAL_MODE: Boolean = config.getBoolean("app.is.local")
  lazy val APP_SPARK_MASTER: String = config.getString("app.spark.master")

  /*
    Kafka related configuration
   */
  lazy val KAFKA_BOOTSTRAP_SERVERS: String = config.getString("kafka.bootstrap.servers")
  lazy val KAFKA_AUTO_OFFSET_RESET: String = config.getString("kafka.auto.offset.reset")
  lazy val KAFKA_SOURCE_TOPICS: String = config.getString("kafka.source.topics")
  lazy val KAFKA_ETL_TOPIC: String = config.getString("kafka.etl.topic")
  lazy val KAFKA_MAX_OFFSETS: String = config.getString("kafka.max.offsets.per.trigger")
  lazy val KAFKA_ZK_URL: String = config.getString("kafka.zk.url")
  lazy val STREAMING_ETL_GROUP_ID: String = config.getString("streaming.etl.group.id")

  /*
    Checkpoint directories for the streaming applications
   */
  lazy val STREAMING_ETL_CKPT: String = config.getString("streaming.etl.ckpt")
  lazy val STREAMING_HBASE_CKPT: String = config.getString("streaming.hbase.ckpt")
  lazy val STREAMING_ES_CKPT: String = config.getString("streaming.es.ckpt")
  lazy val STREAMING_AMT_TOTAL_CKPT: String = config.getString("streaming.amt.total.ckpt")
  lazy val STREAMING_AMT_PROVINCE_CKPT: String = config.getString("streaming.amt.province.ckpt")
  lazy val STREAMING_AMT_CITY_CKPT: String = config.getString("streaming.amt.city.ckpt")

  /*
    Stop-marker files used to shut down the streaming applications
   */
  lazy val STOP_ETL_FILE: String = config.getString("stop.etl.file")
  lazy val STOP_HBASE_FILE: String = config.getString("stop.hbase.file")
  lazy val STOP_ES_FILE: String = config.getString("stop.es.file")
  lazy val STOP_STATE_FILE: String = config.getString("stop.state.file")

  /*
    HBase connection and table information
   */
  lazy val HBASE_ZK_HOSTS: String = config.getString("hbase.zk.hosts")
  lazy val HBASE_ZK_PORT: String = config.getString("hbase.zk.port")
  lazy val HBASE_ZK_ZNODE: String = config.getString("hbase.zk.znode")
  lazy val HBASE_ORDER_TABLE: String = config.getString("hbase.order.table")
  lazy val HBASE_ORDER_TABLE_FAMILY: String = config.getString("hbase.table.family")
  lazy val HBASE_ORDER_TABLE_COLUMNS: Array[String] = config.getString("hbase.table.columns").split(",")

  /*
    Elasticsearch connection information
   */
  lazy val ES_NODES: String = config.getString("es.nodes")
  lazy val ES_PORT: String = config.getString("es.port")
  lazy val ES_INDEX_AUTO_CREATE: String = config.getString("es.index.auto.create")
  lazy val ES_WRITE_OPERATION: String = config.getString("es.write.operation")
  lazy val ES_INDEX_NAME: String = config.getString("es.index.name")
  lazy val ES_MAPPING_ID: String = config.getString("es.mapping.id")

  /*
    Redis database
   */
  lazy val REDIS_HOST: String = config.getString("redis.host")
  lazy val REDIS_PORT: String = config.getString("redis.port")
  lazy val REDIS_DB: String = config.getString("redis.db")

  // Path of the ip2region dictionary file used to resolve IP addresses
  lazy val IPS_DATA_REGION_PATH: String = config.getString("ipdata.region.path")
}
Each property is declared with lazy, which means lazy initialization: the value is computed only the first time the variable is accessed.
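A minimal sketch of what lazy buys here (the object and property names below are illustrative, not part of the project):

object LazyDemo {
  // The right-hand side runs only when settings is first read, not when the object is loaded
  lazy val settings: String = { println("initializing..."); "loaded" }

  def main(args: Array[String]): Unit = {
    println("before first access")   // nothing has been initialized yet
    println(settings)                // prints "initializing..." then "loaded"
    println(settings)                // already initialized; prints only "loaded"
  }
}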
Next, write the utility class SparkUtils, located in the cn.itcast.spark.utils package, which builds SparkSession and StreamingContext instances; the code is as follows:
package cn.itcast.spark.utils

import cn.itcast.spark.config.ApplicationConfig
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Utility class that builds SparkSession and StreamingContext instances
 */
object SparkUtils {

  /**
   * Builds a SparkSession instance for the given application class
   * @param clazz the Spark application's Class object
   * @return a SparkSession instance
   */
  def createSparkSession(clazz: Class[_]): SparkSession = {
    // 1. Build the SparkConf object
    val sparkConf: SparkConf = new SparkConf()
      .setAppName(clazz.getSimpleName.stripSuffix("$"))
      .set("spark.debug.maxToStringFields", "2000")
      .set("spark.sql.debug.maxToStringFields", "2000")
    // 2. If the application runs in local mode, apply local-only settings
    if (ApplicationConfig.APP_LOCAL_MODE) {
      sparkConf
        .setMaster(ApplicationConfig.APP_SPARK_MASTER)
        // Number of partitions used during shuffles
        .set("spark.sql.shuffle.partitions", "3")
    }
    // 3. Obtain the SparkSession instance
    val session: SparkSession = SparkSession
      .builder()
      .config(sparkConf)
      .getOrCreate()
    // 4. Return the instance
    session
  }

  /**
   * Builds a StreamingContext instance for the given application class
   * @param clazz the Spark application's Class object
   * @param batchInterval batch interval in seconds
   */
  def createStreamingContext(clazz: Class[_], batchInterval: Int): StreamingContext = {
    // Reuse the active context or create a new one
    val context: StreamingContext = StreamingContext.getActiveOrCreate(
      () => {
        // 1. Build the SparkConf object
        val sparkConf: SparkConf = new SparkConf()
          .setAppName(clazz.getSimpleName.stripSuffix("$"))
          .set("spark.debug.maxToStringFields", "2000")
          .set("spark.sql.debug.maxToStringFields", "2000")
          .set("spark.streaming.stopGracefullyOnShutdown", "true")
        // 2. If the application runs in local mode, apply local-only settings
        if (ApplicationConfig.APP_LOCAL_MODE) {
          sparkConf
            .setMaster(ApplicationConfig.APP_SPARK_MASTER)
            // Maximum records consumed per partition per batch; set on the command line in production
            .set("spark.streaming.kafka.maxRatePerPartition", "10000")
        }
        // 3. Create the StreamingContext object
        new StreamingContext(sparkConf, Seconds(batchInterval))
      }
    )
    // Return the instance
    context
  }
}
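A minimal usage sketch of the utility (the demo object name and package are illustrative, not part of the project):

package cn.itcast.spark.demo

import cn.itcast.spark.utils.SparkUtils
import org.apache.spark.sql.SparkSession

object SparkUtilsDemo {
  def main(args: Array[String]): Unit = {
    // With app.is.local=true this runs with master local[3] and 3 shuffle partitions
    val spark: SparkSession = SparkUtils.createSparkSession(this.getClass)
    println(s"master = ${spark.sparkContext.master}")
    spark.stop()
  }
}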


The properties that the code sets for local-mode development are, in the testing and production environments, supplied instead with --conf when the application is submitted via spark-submit, as sketched below.
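An illustrative spark-submit invocation; the class name, jar name and resource values are placeholders, not from the project:

spark-submit \
--master yarn \
--deploy-mode cluster \
--conf spark.sql.shuffle.partitions=9 \
--conf spark.streaming.kafka.maxRatePerPartition=10000 \
--class cn.itcast.spark.app.SomeStreamingApp \
order-apps.jar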

1.4 Simulating Order Data

Write a program that simulates order data and sends it to a Kafka topic in real time. For simplicity, an order contains only the following fields, encapsulated in the case class OrderRecord:
/**
 * Order entity (case class)
 * @param orderId     order ID
 * @param userId      user ID
 * @param orderTime   order date and time
 * @param ip          IP address the order was placed from
 * @param orderMoney  order amount
 * @param orderStatus order status
 */
case class OrderRecord(
  orderId: String,
  userId: String,
  orderTime: String,
  ip: String,
  orderMoney: Double,
  orderStatus: Int
)
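For a feel of the message format, here is a small sketch of serializing one record with json4s (the same call the producer below uses); the field values are made up purely for illustration:

import org.json4s.jackson.Json

val order = OrderRecord("20211001101233123000001", "3-00000276",
  "2021-10-01 10:12:33.123", "123.233.10.8", 180.07, 0)
val json: String = new Json(org.json4s.DefaultFormats).write(order)
// json is roughly: {"orderId":"...","userId":"...","orderTime":"...","ip":"...","orderMoney":180.07,"orderStatus":0}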

1.4.1 Creating the Topics

Throughout the case, raw order data is stored in the topic orderTopic and ETL-processed order data in the topic orderEtlTopic. The commands for creating the topics and other related operations are as follows:
# Start Zookeeper
zookeeper-daemon.sh start
# Start the Kafka broker
kafka-server-start.sh -daemon /export/server/kafka/config/server.properties
# List topics
kafka-topics.sh --list --zookeeper node1.itcast.cn:2181/kafka200
# Create topic orderTopic
kafka-topics.sh --create --zookeeper node1.itcast.cn:2181/kafka200 --replication-factor 1 --partitions 3 --topic orderTopic
# Console producer (for testing)
kafka-console-producer.sh --broker-list node1.itcast.cn:9092 --topic orderTopic
# Console consumer (for testing)
kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092 --topic orderTopic --from-beginning
# Delete topic orderTopic
kafka-topics.sh --delete --zookeeper node1.itcast.cn:2181/kafka200 --topic orderTopic
# Create topic orderEtlTopic
kafka-topics.sh --create --zookeeper node1.itcast.cn:2181/kafka200 --replication-factor 1 --partitions 3 --topic orderEtlTopic
# Console consumer (for testing)
kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092 --topic orderEtlTopic --from-beginning
# Delete topic orderEtlTopic
kafka-topics.sh --delete --zookeeper node1.itcast.cn:2181/kafka200 --topic orderEtlTopic
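Optionally, after creating the topics you can check their partition and replica assignment:

kafka-topics.sh --describe --zookeeper node1.itcast.cn:2181/kafka200 --topic orderTopic
kafka-topics.sh --describe --zookeeper node1.itcast.cn:2181/kafka200 --topic orderEtlTopic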

1.4.2 Simulating Data

Write a program that continuously generates order data, converts each record to a JSON string with the json4s library, and sends it to the Kafka topic. The code is as follows:
package cn.itcast.spark.mock

import java.util.Properties

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.json4s.jackson.Json

import scala.util.Random

/**
 * Simulates order data and sends it to a Kafka topic.
 * Each message in the topic is a String in JSON format.
 * Data conversion: each OrderRecord instance is converted to a JSON string (using the json4s library).
 */
object MockOrderProducer {

  def main(args: Array[String]): Unit = {
    var producer: KafkaProducer[String, String] = null
    try {
      // 1. Kafka producer client configuration
      val props = new Properties()
      props.put("bootstrap.servers", "node1.itcast.cn:9092")
      props.put("acks", "1")
      props.put("retries", "3")
      props.put("key.serializer", classOf[StringSerializer].getName)
      props.put("value.serializer", classOf[StringSerializer].getName)
      // 2. Create the KafkaProducer with the configuration
      producer = new KafkaProducer[String, String](props)
      // Random number generator
      val random: Random = new Random()
      // Order status: 0 = opened, 1 = cancelled, 2 = closed, 3 = completed (weighted towards 0)
      val allStatus = Array(0, 1, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
      while (true) {
        // Number of orders produced in this round
        val batchNumber: Int = random.nextInt(2) + 1
        (1 to batchNumber).foreach { number =>
          val currentTime: Long = System.currentTimeMillis()
          val orderId: String = s"${getDate(currentTime)}%06d".format(number)
          val userId: String = s"${1 + random.nextInt(5)}%08d".format(random.nextInt(1000))
          val orderTime: String = getDate(currentTime, format = "yyyy-MM-dd HH:mm:ss.SSS")
          val orderMoney: String = s"${5 + random.nextInt(500)}.%02d".format(random.nextInt(100))
          val orderStatus: Int = allStatus(random.nextInt(allStatus.length))
          // 3. Build the order record
          val orderRecord: OrderRecord = OrderRecord(
            orderId, userId, orderTime, getRandomIp, orderMoney.toDouble, orderStatus
          )
          // Convert to a JSON string
          val orderJson = new Json(org.json4s.DefaultFormats).write(orderRecord)
          println(orderJson)
          // 4. Build the ProducerRecord
          val record = new ProducerRecord[String, String]("orderTopic", orderJson)
          // 5. Send the record to the topic
          producer.send(record)
        }
        Thread.sleep(random.nextInt(10) * 100 + 500)
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (null != producer) producer.close()
    }
  }

  /** ================= Format the given timestamp ================= */
  def getDate(time: Long, format: String = "yyyyMMddHHmmssSSS"): String = {
    val fastFormat: FastDateFormat = FastDateFormat.getInstance(format)
    val formatDate: String = fastFormat.format(time) // format the date
    formatDate
  }

  /** ================= Generate a random IP address ================= */
  def getRandomIp: String = {
    // IP address ranges
    val range: Array[(Int, Int)] = Array(
      (607649792, 608174079),     // 36.56.0.0   - 36.63.255.255
      (1038614528, 1039007743),   // 61.232.0.0  - 61.237.255.255
      (1783627776, 1784676351),   // 106.80.0.0  - 106.95.255.255
      (2035023872, 2035154943),   // 121.76.0.0  - 121.77.255.255
      (2078801920, 2079064063),   // 123.232.0.0 - 123.235.255.255
      (-1950089216, -1948778497), // 139.196.0.0 - 139.215.255.255
      (-1425539072, -1425014785), // 171.8.0.0   - 171.15.255.255
      (-1236271104, -1235419137), // 182.80.0.0  - 182.92.255.255
      (-770113536, -768606209),   // 210.25.0.0  - 210.47.255.255
      (-569376768, -564133889)    // 222.16.0.0  - 222.95.255.255
    )
    // Random index into the range array
    val random = new Random()
    val index = random.nextInt(10)
    val ipNumber: Int = range(index)._1 + random.nextInt(range(index)._2 - range(index)._1)
    // Convert the Int IP address to IPv4 dotted notation
    number2IpString(ipNumber)
  }

  /** ================= Convert an Int IPv4 address to a String ================= */
  def number2IpString(ip: Int): String = {
    val buffer: Array[Int] = new Array[Int](4)
    buffer(0) = (ip >> 24) & 0xff
    buffer(1) = (ip >> 16) & 0xff
    buffer(2) = (ip >> 8) & 0xff
    buffer(3) = ip & 0xff
    // Return the IPv4 address
    buffer.mkString(".")
  }
}