Spark -實時綜合實戰
阿新 • 發佈:2022-05-05
# Start HDFS
hadoop-daemon.sh start namenode
hadoop-daemon.sh start datanode
# Start YARN
yarn-daemon.sh start resourcemanager
yarn-daemon.sh start nodemanager
# Start MRHistoryServer
mr-jobhistory-daemon.sh start historyserver
# Start Spark HistoryServer
/export/server/spark/sbin/start-history-server.sh
# Start Zookeeper
zookeeper-daemon.sh start
# Start Kafka
kafka-server-start.sh -daemon /export/server/kafka/config/server.properties
# Start HBase
hbase-daemon.sh start master
hbase-daemon.sh start regionserver
# Start search
elasticsearch-daemon.sh start
# Start Redis
/export/server/redis/bin/redis-server /export/server/redis/conf/redis.conf

整個實時綜合案例所涉及的大資料技術框架,基本上都是企業實時業務使用的,通過此案例框架整合使用,進一步掌握大資料框架技術與應用。
1.2.2 應用開發環境
在前面建立的Maven Project工程中建立Maven Module模組,pom.xml檔案中新增相關依賴:

<!-- 指定倉庫位置,依次為aliyun、cloudera和jboss倉庫 -->
<repositories>
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
        <id>jboss</id>
        <url>http://repository.jboss.com/nexus/content/groups/public</url>
    </repository>
</repositories>

<properties>
    <scala.version>2.11.12</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
    <spark.version>2.4.5</spark.version>
    <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
    <hbase.version>1.2.0-cdh5.16.2</hbase.version>
    <kafka.version>2.0.0</kafka.version>
    <mysql.version>8.0.19</mysql.version>
</properties>

<dependencies>
    <!-- 依賴Scala語言 -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- Spark Core 依賴 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Spark SQL 依賴 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Structured Streaming + Kafka 依賴 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Spark Streaming 依賴 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Spark Streaming 與Kafka 0.10.0 整合依賴-->
    <!--
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    -->
    <!-- Spark Streaming 與Kafka 0.8.2.1 整合依賴 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Hadoop Client 依賴 -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <!-- HBase Client 依賴 -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-hadoop2-compat</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <!-- MySQL Client 依賴 -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>
    <!-- 管理配置檔案 -->
    <dependency>
        <groupId>com.typesafe</groupId>
        <artifactId>config</artifactId>
        <version>1.2.1</version>
    </dependency>
    <!-- 根據ip轉換為省市區 -->
    <dependency>
        <groupId>org.lionsoul</groupId>
        <artifactId>ip2region</artifactId>
        <version>1.7.2</version>
    </dependency>
    <!-- JSON解析庫:fastjson -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.47</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>6.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>transport</artifactId>
        <version>6.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-20_2.11</artifactId>
        <version>6.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.redislabs</groupId>
        <artifactId>spark-redis_2.11</artifactId>
        <version>2.4.2</version>
    </dependency>
</dependencies>

<build>
    <outputDirectory>target/classes</outputDirectory>
    <testOutputDirectory>target/test-classes</testOutputDirectory>
    <resources>
        <resource>
            <directory>${project.basedir}/src/main/resources</directory>
        </resource>
    </resources>
    <!-- Maven 編譯的外掛 -->
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

按照應用開發分層結構,需要在【src原始碼目錄】下建立相關目錄和包,具體如下:
# local mode: when true, applications run with the local master below
app.is.local=true
app.spark.master=local[3]
# kafka config
kafka.bootstrap.servers=node1.itcast.cn:9092
# NOTE(review): 'largest' is the legacy (Kafka 0.8) consumer value; the new
# consumer / Structured Streaming expects 'latest' -- confirm which API reads this key.
kafka.auto.offset.reset=largest
# raw order topic and ETL-ed order topic
kafka.source.topics=orderTopic
kafka.etl.topic=orderEtlTopic
# max offsets consumed per micro-batch trigger (Structured Streaming)
kafka.max.offsets.per.trigger=100000
# Kafka Consumer Group ID
streaming.etl.group.id=order-etl-1000
# Zookeeper Server
kafka.zk.url=node1.itcast.cn:2181/kafka200
# streaming checkpoint (local-filesystem paths used during development)
streaming.etl.ckpt=datas/order-apps/ckpt/etl-ckpt/
streaming.hbase.ckpt=datas/order-apps/ckpt/hbase-ckpt/
streaming.es.ckpt=datas/order-apps/ckpt/es-ckpt/
streaming.amt.total.ckpt=datas/order-apps/ckpt/amt-total-ckpt/
streaming.amt.province.ckpt=datas/order-apps/ckpt/amt-province-ckpt/
streaming.amt.city.ckpt=datas/order-apps/ckpt/amt-city-ckpt/
# HDFS checkpoint paths for test/production -- uncomment these and comment out the local ones above
##streaming.etl.ckpt=/spark/order-apps/ckpt/etl-ckpt/
##streaming.hbase.ckpt=/spark/order-apps/ckpt/hbase-ckpt/
##streaming.es.ckpt=/spark/order-apps/ckpt/es-ckpt/
##streaming.amt.total.ckpt=/spark/order-apps/ckpt/amt-total-ckpt/
##streaming.amt.province.ckpt=/spark/order-apps/ckpt/amt-province-ckpt/
##streaming.amt.city.ckpt=/spark/order-apps/ckpt/amt-city-ckpt/
# streaming stop file: marker files checked to stop each streaming app gracefully
stop.etl.file=datas/order-apps/stop/etl-stop
stop.hbase.file=datas/order-apps/stop/hbase-stop
stop.es.file=datas/order-apps/stop/es-stop
stop.state.file=datas/order-apps/stop/state-stop
# HDFS stop-file paths for test/production -- uncomment these and comment out the local ones above
##stop.etl.file=/spark/order-apps/stop/etl-stop
##stop.hbase.file=/spark/order-apps/stop/hbase-stop
##stop.es.file=/spark/order-apps/stop/es-stop
##stop.state.file=/spark/order-apps/stop/state-stop
# HBase Config
hbase.zk.hosts=node1.itcast.cn
hbase.zk.port=2181
hbase.zk.znode=/hbase
hbase.order.table=htb_orders
hbase.table.family=info
# comma-separated column list, split by ApplicationConfig into an Array[String]
hbase.table.columns=orderId,userId,orderTime,ip,orderMoney,orderStatus,province,city
# Elasticsearch Config
es.nodes=node1.itcast.cn
es.port=9200
es.index.auto.create=true
es.write.operation=upsert
es.index.name=orders/index
es.mapping.id=orderId
# Redis Config
redis.host=node1.itcast.cn
redis.port=6379
redis.db=0
# Dictionary data: ip2region database file used to resolve an IP address to province/city
ipdata.region.path=dataset/ip2region.db
# HDFS dictionary path for test/production -- uncomment and comment out the local one above
##ipdata.region.path=hdfs://node1.itcast.cn:8020/spark/dataset/ip2region.db
其中應用開發時,採用本地模式執行,相關資料儲存在本地檔案系統;測試生產時使用HDFS檔案系統。
編寫載入屬性檔案工具類:ApplicationConfig,位於【cn.itcast.spark.config】包,具體程式碼如下:
package cn.itcast.spark.config

import com.typesafe.config.{Config, ConfigFactory}

/**
 * Loads the application configuration file `config.properties` from the
 * classpath and exposes each property as a typed value.
 *
 * Every member is declared `lazy`, so a property is only read from the
 * underlying Config the first time the member is referenced.
 */
object ApplicationConfig {

  // Typesafe Config instance backed by config.properties on the classpath.
  private val config: Config = ConfigFactory.load("config.properties")

  /* Run mode: local for development/testing; in production the master is passed via --master. */
  lazy val APP_LOCAL_MODE: Boolean = config.getBoolean("app.is.local")
  lazy val APP_SPARK_MASTER: String = config.getString("app.spark.master")

  /* Kafka connection and topic settings. */
  lazy val KAFKA_BOOTSTRAP_SERVERS: String = config.getString("kafka.bootstrap.servers")
  lazy val KAFKA_AUTO_OFFSET_RESET: String = config.getString("kafka.auto.offset.reset")
  lazy val KAFKA_SOURCE_TOPICS: String = config.getString("kafka.source.topics")
  lazy val KAFKA_ETL_TOPIC: String = config.getString("kafka.etl.topic")
  lazy val KAFKA_MAX_OFFSETS: String = config.getString("kafka.max.offsets.per.trigger")
  lazy val KAFKA_ZK_URL: String = config.getString("kafka.zk.url")
  lazy val STREAMING_ETL_GROUP_ID: String = config.getString("streaming.etl.group.id")

  /* Checkpoint directories for the streaming applications. */
  lazy val STREAMING_ETL_CKPT: String = config.getString("streaming.etl.ckpt")
  lazy val STREAMING_HBASE_CKPT: String = config.getString("streaming.hbase.ckpt")
  lazy val STREAMING_ES_CKPT: String = config.getString("streaming.es.ckpt")
  lazy val STREAMING_AMT_TOTAL_CKPT: String = config.getString("streaming.amt.total.ckpt")
  lazy val STREAMING_AMT_PROVINCE_CKPT: String = config.getString("streaming.amt.province.ckpt")
  lazy val STREAMING_AMT_CITY_CKPT: String = config.getString("streaming.amt.city.ckpt")

  /* Marker files used to stop each streaming application gracefully. */
  lazy val STOP_ETL_FILE: String = config.getString("stop.etl.file")
  lazy val STOP_HBASE_FILE: String = config.getString("stop.hbase.file")
  lazy val STOP_ES_FILE: String = config.getString("stop.es.file")
  lazy val STOP_STATE_FILE: String = config.getString("stop.state.file")

  /* HBase connection and table settings. */
  lazy val HBASE_ZK_HOSTS: String = config.getString("hbase.zk.hosts")
  lazy val HBASE_ZK_PORT: String = config.getString("hbase.zk.port")
  lazy val HBASE_ZK_ZNODE: String = config.getString("hbase.zk.znode")
  lazy val HBASE_ORDER_TABLE: String = config.getString("hbase.order.table")
  lazy val HBASE_ORDER_TABLE_FAMILY: String = config.getString("hbase.table.family")
  // Comma-separated column list in the properties file, exposed as an array.
  lazy val HBASE_ORDER_TABLE_COLUMNS: Array[String] = config.getString("hbase.table.columns").split(",")

  /* Elasticsearch connection settings. */
  lazy val ES_NODES: String = config.getString("es.nodes")
  lazy val ES_PORT: String = config.getString("es.port")
  lazy val ES_INDEX_AUTO_CREATE: String = config.getString("es.index.auto.create")
  lazy val ES_WRITE_OPERATION: String = config.getString("es.write.operation")
  lazy val ES_INDEX_NAME: String = config.getString("es.index.name")
  lazy val ES_MAPPING_ID: String = config.getString("es.mapping.id")

  /* Redis connection settings. */
  lazy val REDIS_HOST: String = config.getString("redis.host")
  lazy val REDIS_PORT: String = config.getString("redis.port")
  lazy val REDIS_DB: String = config.getString("redis.db")

  // Path of the ip2region dictionary file used for IP -> province/city lookups.
  lazy val IPS_DATA_REGION_PATH: String = config.getString("ipdata.region.path")
}

每個屬性變數前使用lazy,表示懶載入初始化,當第一次使用變數時,才會進行初始化。
package cn.itcast.spark.utils

import cn.itcast.spark.config.ApplicationConfig
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Utility for constructing SparkSession and StreamingContext instances
 * configured from [[cn.itcast.spark.config.ApplicationConfig]].
 */
object SparkUtils {

  /**
   * Builds (or reuses) a SparkSession for the given application class.
   *
   * @param clazz application class object; its simple name (without the
   *              trailing '$' of a Scala object) becomes the app name
   * @return the active SparkSession
   */
  def createSparkSession(clazz: Class[_]): SparkSession = {
    val conf = new SparkConf()
      .setAppName(clazz.getSimpleName.stripSuffix("$"))
      .set("spark.debug.maxToStringFields", "2000")
      .set("spark.sql.debug.maxToStringFields", "2000")

    // In local mode set the master and a small shuffle-partition count;
    // in production both are supplied on the spark-submit command line.
    if (ApplicationConfig.APP_LOCAL_MODE) {
      conf
        .setMaster(ApplicationConfig.APP_SPARK_MASTER)
        .set("spark.sql.shuffle.partitions", "3")
    }

    SparkSession
      .builder()
      .config(conf)
      .getOrCreate()
  }

  /**
   * Builds (or returns the already-active) StreamingContext.
   *
   * @param clazz         application class object used for the app name
   * @param batchInterval micro-batch interval in seconds
   * @return the active StreamingContext
   */
  def createStreamingContext(clazz: Class[_], batchInterval: Int): StreamingContext = {
    StreamingContext.getActiveOrCreate(() => {
      val conf = new SparkConf()
        .setAppName(clazz.getSimpleName.stripSuffix("$"))
        .set("spark.debug.maxToStringFields", "2000")
        .set("spark.sql.debug.maxToStringFields", "2000")
        // Allow in-flight batches to finish when the JVM shuts down.
        .set("spark.streaming.stopGracefullyOnShutdown", "true")

      // In local mode cap the per-partition Kafka rate; in production this
      // is supplied on the spark-submit command line.
      if (ApplicationConfig.APP_LOCAL_MODE) {
        conf
          .setMaster(ApplicationConfig.APP_SPARK_MASTER)
          .set("spark.streaming.kafka.maxRatePerPartition", "10000")
      }

      new StreamingContext(conf, Seconds(batchInterval))
    })
  }
}
其中應用開發本地模式執行時設定的相關屬性,在測試和生產環境使用spark-submit提交應用,通過--conf指定此屬性的值。
1.4 模擬交易訂單資料
程式設計模擬生成交易訂單資料,實時傳送至Kafka Topic,為了簡單起見交易訂單資料欄位如下,封裝到樣例類OrderRecord中:

/**
 * Order entity (case class) describing one trade order.
 *
 * @param orderId     order ID
 * @param userId      user ID
 * @param orderTime   order date-time string
 * @param ip          IP address the order was placed from
 * @param orderMoney  order amount
 * @param orderStatus order status code
 */
case class OrderRecord(
    orderId: String,
    userId: String,
    orderTime: String,
    ip: String,
    orderMoney: Double,
    orderStatus: Int
)
1.4.1 建立 Topic
在整個實時綜合案例中,原始的交易訂單資料儲存在【orderTopic】,經過ETL後交易訂單資料儲存在【orderEtlTopic】,關於Topic建立等操作命令如下:
# 啟動Zookeeper
zookeeper-daemon.sh start
# 啟動Kafka Broker
kafka-server-start.sh -daemon /export/server/kafka/config/server.properties
# 檢視Topic資訊
kafka-topics.sh --list --zookeeper node1.itcast.cn:2181/kafka200
# 建立topic
kafka-topics.sh --create --zookeeper node1.itcast.cn:2181/kafka200 --replication-factor 1 --partitions 3 --topic orderTopic
# 模擬生產者
kafka-console-producer.sh --broker-list node1.itcast.cn:9092 --topic orderTopic
# 模擬消費者
kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092 --topic orderTopic --from-beginning
# 刪除topic
kafka-topics.sh --delete --zookeeper node1.itcast.cn:2181/kafka200 --topic orderTopic
# 建立topic
kafka-topics.sh --create --zookeeper node1.itcast.cn:2181/kafka200 --replication-factor 1 --partitions 3 --topic orderEtlTopic
# 模擬消費者
kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092 --topic orderEtlTopic --from-beginning
# 刪除topic
kafka-topics.sh --delete --zookeeper node1.itcast.cn:2181/kafka200 --topic orderEtlTopic
1.4.2 模擬資料
編寫程式,實時產生交易訂單資料,使用json4s類庫轉換資料為JSON字元,傳送Kafka Topic 中,程式碼如下:

package cn.itcast.spark.mock

import java.util.Properties

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.json4s.jackson.Json

import scala.util.Random

/**
 * Mock producer that generates order data in an endless loop and publishes it
 * to the Kafka topic "orderTopic".
 *
 * Each message value is a String: an OrderRecord instance serialized to JSON
 * with the json4s library.
 */
object MockOrderProducer {

  def main(args: Array[String]): Unit = {
    var producer: KafkaProducer[String, String] = null
    try {
      // Kafka producer client configuration.
      val kafkaProps = new Properties()
      kafkaProps.put("bootstrap.servers", "node1.itcast.cn:9092")
      kafkaProps.put("acks", "1")
      kafkaProps.put("retries", "3")
      kafkaProps.put("key.serializer", classOf[StringSerializer].getName)
      kafkaProps.put("value.serializer", classOf[StringSerializer].getName)
      producer = new KafkaProducer[String, String](kafkaProps)

      val rng: Random = new Random()
      // Order status codes: 0 open, 1 cancelled, 2 closed, 3 finished.
      // The repeated zeros weight the draw heavily toward "open".
      val allStatus = Array(0, 1, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)

      while (true) {
        // Number of orders produced in this iteration (1 or 2).
        val batchNumber: Int = rng.nextInt(2) + 1
        (1 to batchNumber).foreach { number =>
          val currentTime: Long = System.currentTimeMillis()
          val orderId: String = s"${getDate(currentTime)}%06d".format(number)
          val userId: String = s"${1 + rng.nextInt(5)}%08d".format(rng.nextInt(1000))
          val orderTime: String = getDate(currentTime, format = "yyyy-MM-dd HH:mm:ss.SSS")
          val orderMoney: String = s"${5 + rng.nextInt(500)}.%02d".format(rng.nextInt(100))
          val orderStatus: Int = allStatus(rng.nextInt(allStatus.length))

          // Assemble the order record and serialize it to JSON.
          val orderRecord: OrderRecord = OrderRecord(
            orderId, userId, orderTime, getRandomIp, orderMoney.toDouble, orderStatus
          )
          val orderJson = new Json(org.json4s.DefaultFormats).write(orderRecord)
          println(orderJson)

          // Publish the JSON message to the topic.
          val message = new ProducerRecord[String, String]("orderTopic", orderJson)
          producer.send(message)
        }
        // Pause 500-1400 ms between batches.
        Thread.sleep(rng.nextInt(10) * 100 + 500)
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (null != producer) producer.close()
    }
  }

  /** Formats the given epoch-millis timestamp with the supplied pattern. */
  def getDate(time: Long, format: String = "yyyyMMddHHmmssSSS"): String =
    FastDateFormat.getInstance(format).format(time)

  /** Returns a random IPv4 address drawn from a fixed set of public ranges. */
  def getRandomIp: String = {
    // Inclusive (start, end) IP ranges encoded as signed 32-bit integers.
    val ipRanges: Array[(Int, Int)] = Array(
      (607649792, 608174079),     // 36.56.0.0   - 36.63.255.255
      (1038614528, 1039007743),   // 61.232.0.0  - 61.237.255.255
      (1783627776, 1784676351),   // 106.80.0.0  - 106.95.255.255
      (2035023872, 2035154943),   // 121.76.0.0  - 121.77.255.255
      (2078801920, 2079064063),   // 123.232.0.0 - 123.235.255.255
      (-1950089216, -1948778497), // 139.196.0.0 - 139.215.255.255
      (-1425539072, -1425014785), // 171.8.0.0   - 171.15.255.255
      (-1236271104, -1235419137), // 182.80.0.0  - 182.92.255.255
      (-770113536, -768606209),   // 210.25.0.0  - 210.47.255.255
      (-569376768, -564133889)    // 222.16.0.0  - 222.95.255.255
    )
    val rng = new Random()
    val idx = rng.nextInt(10)
    // Pick a random address within the chosen range.
    val ipValue: Int = ipRanges(idx)._1 + rng.nextInt(ipRanges(idx)._2 - ipRanges(idx)._1)
    number2IpString(ipValue)
  }

  /** Converts a signed 32-bit integer into dotted-quad IPv4 notation. */
  def number2IpString(ip: Int): String = {
    val octets = Array(
      (ip >> 24) & 0xff,
      (ip >> 16) & 0xff,
      (ip >> 8) & 0xff,
      ip & 0xff
    )
    octets.mkString(".")
  }
}