大資料學習之路111-大資料專案(中國移動運營資料分析)
業務一:
業務二:
統計每個省份的充值失敗資料量,並以地圖的方式顯示分佈情況。
資料說明:
充值的整個過程是包括:
訂單建立->支付請求->支付通知->充值請求->充值通知
而我們需要處理的就是充值通知部分的資料。而我們的資料中是包含上面這五種型別的資料的。
那麼我們如何從那麼多資料中確定哪條資料是充值通知的資料呢?
我們可以通過serviceName欄位來確定,如果該欄位是reChargeNotifyReq則代表該條資料是充值通知部分的資料。
為什麼呢?
針對業務一:
充值訂單量我們只需要通過有多少行數就可以確定有多少筆。
對於充值金額
找到充值成功的訂單之後,我們可以將該資料的chargefee欄位進行累加。就可以得到總金額。
充值成功率:我們只要知道總交易筆數和成功的筆數即可求。
充值平均時長:首先我們需要知道開始時間和結束時間,我們才能知道充值所花費的時間。
開始時間:對於開始時間,這裡有一個RequestId欄位,它是由時間戳+隨機數生成的。
結束時間:即為接到充值通知的時間,為欄位(receiveNotifyTime)
針對業務二:
對於業務失敗量的分佈,首先我們需要知道在哪個省份,哪個地區。
我們可以根據provinceCode欄位來確定省份
對於失敗的訂單我們可以通過統計bussinessRst為不是0000的情況來確定。
接下來我們就開始寫業務:
下面是我們的資料截圖,該檔名叫cmcc.log
首先我們用flume採集資料到kafka:
我們先寫配置檔案:
# 定義這個agent中各元件的名字 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 描述和配置source元件:r1 a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /root/flumedata/ # 描述和配置sink元件:k1 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = cmccThree a1.sinks.k1.kafka.bootstrap.servers = marshal:9092,marshal01:9092,marshal02:9092,marshal03:9092,marshal04:9092,marshal05:9092 a1.sinks.k1.kafka.flumeBatchSize = 20 a1.sinks.k1.kafka.producer.acks = 1 a1.sinks.k1.kafka.producer.linger.ms = 1 a1.sinks.k1.kafka.producer.compression.type = snappy # 描述和配置channel元件,此處使用是記憶體快取的方式 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 描述和配置source channel sink之間的連線關係 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
然後啟動flume:
執行結果:
現在我們已經將資料匯入kafka了,在kafka的data目錄中,該資料的主題是cmccTwo
接下來我們可以寫程式碼了,在寫程式碼之前我們 可以再確認一下kafka中是否已經寫進去資料了。
bin/kafka-console-consumer.sh --bootstrap-server marshal:9092 --from-beginning --topic cmccThree
我們可以看到執行結果如下:
一共有40883條記錄。
到此為止我們就完成了將資料匯入到kafka的工作。
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------
接下來我們需要從kafka中拉取資料:
我們在寫程式碼的過程中應該有分離程式碼的思想,我們首先將一些kafka的配置資訊寫出去:
application.conf
#kafka相關引數
kafka.topic = "cmcc"
kafka.broker.list = "marshal:9092,marshal01:9092,marshal02:9092,marshal03:9092,marshal04:9092,marshal05:9092"
kafka.group.id = "20181016"
AppParams.scala
package com.sheep.utils
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.kafka.common.serialization.StringDeserializer
object AppParams {
/**
* 解析application.conf的配置檔案
* 載入resource下面的配置,預設規則application.conf -> application.json -> application.properties
*/
private lazy val config: Config = ConfigFactory.load()
/**
* 返回訂閱的主題,這裡用,分割是因為可能有多個主題
*/
val topic = config.getString("kafka.topic").split(",")
/**
* kafka叢集所在的主機和埠
*/
val brokers = config.getString("kafka.broker.list")
/**
* 消費者的id
*/
val groupId = config.getString("kafka.group.id")
val kafkaParams = Map[String,Object](
"bootstrap.servers" -> brokers,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> groupId,
//這個代表,任務啟動之前產生的資料也要讀
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false:java.lang.Boolean)
)
}
配置檔案寫完之後就可以獲取kafka中的資料了:
package com.sheep.app
import com.alibaba.fastjson.{JSON, JSONObject}
import com.sheep.utils.AppParams
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object BootStrapApp {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val conf: SparkConf = new SparkConf()
.setAppName("中國移動運營實時監控平臺-Monitor")
//如果是在叢集上執行的話需要去掉setMaster
.setMaster("local[*]")
//SparkStreaming傳輸的是離散流,離散流是由RDD組成的
//資料傳輸的時候可以對RDD進行壓縮,壓縮的目的是減少記憶體的佔用
//預設採用org.apache.spark.serializer.JavaSerializer
//這是最基本的優化
conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
//rdd壓縮
conf.set("spark.rdd.compress","true")
//設定每次拉取的數量,為了防止一下子拉取的資料過多,系統處理不過來
//這裡並不是拉取100條,是有公式的。
//batchSize = partitionNum * 分割槽數量 * 取樣時間
conf.set("spark.streaming.kafka.maxRatePerPartition","100")
//設定優雅的結束,這樣可以避免資料的丟失
conf.set("spark.streaming.stopGracefullyOnShutdown","true")
val ssc: StreamingContext = new StreamingContext(conf,Seconds(2))
//獲取kafka的資料
/**
* 指定kafka資料來源
* ssc:StreamingContext的例項
* LocationStrategies:位置策略,如果kafka的broker節點跟Executor在同一臺機器上給一種策略,不在一臺機器上給另外一種策略
* 設定策略後會以最優的策略進行獲取資料
* 一般在企業中kafka節點跟Executor不會放到一臺機器的,原因是kakfa是訊息儲存的,Executor用來做訊息的計算,
* 因此計算與儲存分開,儲存對磁碟要求高,計算對記憶體、CPU要求高
* 如果Executor節點跟Broker節點在一起的話使用PreferBrokers策略,如果不在一起的話使用PreferConsistent策略
* 使用PreferConsistent策略的話,將來在kafka中拉取了資料以後儘量將資料分散到所有的Executor上
* ConsumerStrategies:消費者策略(指定如何消費)
*
*/
val directStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String,String](AppParams.topic,AppParams.kafkaParams)
)
寫到這裡我們就已經獲取了kafka中的資料了。接下來就是對他進行處理:
我們首先做的是計算充值成功的筆數:
由於我們的資料是json的所以要想對資料進行分析的話,就要使用json解析工具:
我們匯入json解析工具的依賴:
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.51</version>
</dependency>
我們的思路是:過濾出serviceName欄位為reChargeNotifyReq的資料,這些資料就是和充值通知有關的,我們需要處理的資料。
在過濾出來的這些資料中,有成功的,也有失敗的。
過濾完資料之後我們對資料操作,將成功的資料設為1,失敗的設為0,並且從requestId中截取出前8位和標誌位組成一個元組返回。
然後使用reduceByKey就可以統計出當天交易成功的數量了。如果是將結果列印在控制檯上結果是這樣的:
所以要想統計最終的我們可以將資料寫入redis 累加。
程式碼如下:
directStream.foreachRDD(
rdd =>{
//rdd.map(_.value()).foreach(println)
//取得所有充值通知日誌
val baseData: RDD[JSONObject] = rdd.map(cr =>JSON.parseObject(cr.value()))
.filter(obj => obj.getString("serviceName").equalsIgnoreCase("reChargeNotifyReq")).cache()
//bussinessRst是業務結果,如果是0000則為成功,其他返回錯誤編碼
val totalSucc = baseData.map(obj => {
val reqId = obj.getString("requestId")
val day = reqId.substring(0, 8)
//取出該條充值是否成功的標誌
val result = obj.getString("bussinessRst")
val flag = if (result.equals("0000")) 1 else 0
(day, flag)
}).reduceByKey(_+_)
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------現在我們需要做的是統計出當天的交易成功的總金額,我們只要對上面的程式進行修改一下就好了。
之前的程式碼中如果交易成功返回的是1,而現在我們只要返回交易金額就好了。
程式碼如下:
//獲取充值成功的訂單金額
val totalMoney = baseData.map(obj => {
val reqId = obj.getString("requestId")
val day = reqId.substring(0, 8)
//取出該條充值是否成功的標誌
val result = obj.getString("bussinessRst")
val fee = if (result.equals("0000")) obj.getString("chargefee").toDouble else 0
(day, fee)
}).reduceByKey(_+_)
輸出結果如下:
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------到現在為止我們已經充值成功的訂單量和充值金額寫好了。接下來就要算充值成功率了。
充值成功率 = 成功訂單數 / 總訂單數
總訂單量只要使用count就可以得出。而充值成功的訂單我們前面已經算出來了。
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------
接下來要算的就是充值成功的充值時長:
如果是交易成功的則用結束時間(即通知充值成功的時間)- 開始時間(即requestId的前17位)
如果是交易失敗的話,則返回0
程式碼如下:
/**
* 獲取充值成功的充值時長
*/
val totalTime: RDD[(String, Long)] = baseData.map(obj => {
val reqId = obj.getString("requestId")
//獲取日期
val day = reqId.substring(0, 8)
//取出該條充值是否成功的標誌
val result = obj.getString("bussinessRst")
//時間格式為:yyyyMMddHHmmssSSS(年月日時分秒毫秒)
val endTime = obj.getString("receiveNotifyTime")
val startTime: String = reqId.substring(0, 17)
val format: SimpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmssSSS")
val cost = if (result.equals("0000")) format.parse(endTime).getTime - format.parse(startTime).getTime else 0
(day, cost)
}).reduceByKey(_ + _)
輸出結果如下:
這裡返回的時間是毫秒數
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------
接下來我們嘗試將充值成功的訂單數寫入redis :
我們首先在application.conf中新增redis的一些配置:
# redis
redis.host = "marshal"
redis.db.index = 1
接下來在AppParams中新增,對redis引數的訪問:
/**
* redis伺服器地址
*/
val redisHost = config.getString("redis.host")
/**
* 將資料寫入到哪個庫
*/
val redisDbIndex = config.getString("redis.db.index").toInt
然後寫一個從連線池獲取連線的方法:
package com.sheep.cmcc.utils
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.{Jedis, JedisPool}
object Jpools {
private val poolConfig = new GenericObjectPoolConfig()
//連線池中最大的空閒連線數,預設是8
poolConfig.setMaxIdle(5)
//只支援最大的連線數,連線池中最大的連線數,預設是8
poolConfig.setMaxTotal(2000)
private lazy val jedisPool: JedisPool = new JedisPool(poolConfig,AppParams.redisHost)
def getJedis = {
val jedis: Jedis = jedisPool.getResource
jedis.select(AppParams.redisDbIndex)
jedis
}
}
接下來寫入資料庫:
//將充值成功的訂單數寫入redis
totalSucc.foreachPartition(it => {
val jedis: Jedis = Jpools.getJedis
it.foreach(
tp => {
jedis.incrBy("CMCC-"+tp._1,tp._2)
})
jedis.close()
})
執行結果如下:
兩次重新整理之後的結果不一樣,是因為他不停的在讀資料,處理資料,然後做累加。
但是這樣的寫法很不好,而且存在很多問題:
比如頻繁的使用reduceByKey,會不停的產生shuffle,這樣對效能會有影響。
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
寫到現在我們的程式還不夠優化,我們的各項指標都是單獨計算的,每次計算都會產生shuffle.這樣的效能是非常低的。所以我們針對現有的程式進行一個改造。