1. 程式人生 > >大資料學習之路111-大資料專案(中國移動運營資料分析)

大資料學習之路111-大資料專案(中國移動運營資料分析)

業務一:

業務二:

統計每個省份的充值失敗資料量,並以地圖的方式顯示分佈情況。

資料說明:

充值的整個過程是包括:

訂單建立->支付請求->支付通知->充值請求->充值通知

而我們需要處理的就是充值通知部分的資料。而我們的資料中是包含上面這五種型別的資料的。

那麼我們如何從那麼多資料中確定哪條資料是充值通知的資料呢?

我們可以通過serviceName欄位來確定,如果該欄位是reChargeNotifyReq則代表該條資料是充值通知部分的資料。

為什麼呢?

針對業務一:

充值訂單量我們只需要通過有多少行數就可以確定有多少筆。

對於充值金額

,我們首先需要確定到充值成功的訂單數(欄位bussinessRst如果為0000則代表成功)

找到充值成功的訂單之後,我們可以將該資料的chargefee欄位進行累加。就可以得到總金額。

充值成功率:我們只要知道總交易筆數和成功的筆數即可求。

充值平均時長:首先我們需要知道開始時間和結束時間,我們才能知道充值所花費的時間。

開始時間:對於開始時間,這裡有一個RequestId欄位,它是由時間戳+隨機數生成的。

結束時間:即為接到充值通知的時間,為欄位(receiveNotifyTime)

針對業務二:

對於業務失敗量的分佈,首先我們需要知道在哪個省份,哪個地區。

我們可以根據provinceCode欄位來確定省份

對於失敗的訂單我們可以通過統計bussinessRst為不是0000的情況來確定。

接下來我們就開始寫業務:

下面是我們的資料截圖,該檔名叫cmcc.log

首先我們用flume採集資料到kafka:

我們先寫配置檔案:

# 定義這個agent中各元件的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 描述和配置source元件:r1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/flumedata/


# 描述和配置sink元件:k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = cmccThree
a1.sinks.k1.kafka.bootstrap.servers = marshal:9092,marshal01:9092,marshal02:9092,marshal03:9092,marshal04:9092,marshal05:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy


# 描述和配置channel元件,此處使用是記憶體快取的方式
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 描述和配置source  channel   sink之間的連線關係
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

然後啟動flume:

執行結果:

現在我們已經將資料匯入kafka了,在kafka的data目錄中,該資料的主題是cmccTwo

接下來我們可以寫程式碼了,在寫程式碼之前我們 可以再確認一下kafka中是否已經寫進去資料了。

bin/kafka-console-consumer.sh --bootstrap-server marshal:9092 --from-beginning --topic cmccThree

我們可以看到執行結果如下:

一共有40883條記錄。

到此為止我們就完成了將資料匯入到kafka的工作。

-------------------------------------------------------------------------------------------------------------------------------------------------------------------------

 接下來我們需要從kafka中拉取資料:

我們在寫程式碼的過程中應該有分離程式碼的思想,我們首先將一些kafka的配置資訊寫出去:

application.conf

#kafka相關引數
kafka.topic = "cmcc"
kafka.broker.list = "marshal:9092,marshal01:9092,marshal02:9092,marshal03:9092,marshal04:9092,marshal05:9092"
kafka.group.id = "20181016"

AppParams.scala

package com.sheep.utils

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.kafka.common.serialization.StringDeserializer

object AppParams {
  /**
    * 解析application.conf的配置檔案
    * 載入resource下面的配置,預設規則application.conf -> application.json -> application.properties
    */
    private lazy val config: Config = ConfigFactory.load()
  /**
    * 返回訂閱的主題,這裡用,分割是因為可能有多個主題
    */
  val topic = config.getString("kafka.topic").split(",")
  /**
    * kafka叢集所在的主機和埠
    */
  val brokers = config.getString("kafka.broker.list")
  /**
    * 消費者的id
    */
  val groupId = config.getString("kafka.group.id")

  val kafkaParams = Map[String,Object](
    "bootstrap.servers" -> brokers,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> groupId,
    //這個代表,任務啟動之前產生的資料也要讀
    "auto.offset.reset" -> "earliest",
    "enable.auto.commit" -> (false:java.lang.Boolean)
  )
}

配置檔案寫完之後就可以獲取kafka中的資料了:

package com.sheep.app

import com.alibaba.fastjson.{JSON, JSONObject}
import com.sheep.utils.AppParams
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object BootStrapApp {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val conf: SparkConf = new SparkConf()
      .setAppName("中國移動運營實時監控平臺-Monitor")
      //如果是在叢集上執行的話需要去掉setMaster
      .setMaster("local[*]")
    //SparkStreaming傳輸的是離散流,離散流是由RDD組成的
    //資料傳輸的時候可以對RDD進行壓縮,壓縮的目的是減少記憶體的佔用
    //預設採用org.apache.spark.serializer.JavaSerializer
    //這是最基本的優化
    conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
    //rdd壓縮
    conf.set("spark.rdd.compress","true")
    //設定每次拉取的數量,為了防止一下子拉取的資料過多,系統處理不過來
    //這裡並不是拉取100條,是有公式的。
    //batchSize = partitionNum * 分割槽數量 * 取樣時間
    conf.set("spark.streaming.kafka.maxRatePerPartition","100")
    //設定優雅的結束,這樣可以避免資料的丟失
    conf.set("spark.streaming.stopGracefullyOnShutdown","true")
    val ssc: StreamingContext = new StreamingContext(conf,Seconds(2))
    //獲取kafka的資料
    /**
      *   指定kafka資料來源
      *   ssc:StreamingContext的例項
      *   LocationStrategies:位置策略,如果kafka的broker節點跟Executor在同一臺機器上給一種策略,不在一臺機器上給另外一種策略
      *       設定策略後會以最優的策略進行獲取資料
      *       一般在企業中kafka節點跟Executor不會放到一臺機器的,原因是kakfa是訊息儲存的,Executor用來做訊息的計算,
      *       因此計算與儲存分開,儲存對磁碟要求高,計算對記憶體、CPU要求高
      *       如果Executor節點跟Broker節點在一起的話使用PreferBrokers策略,如果不在一起的話使用PreferConsistent策略
      *       使用PreferConsistent策略的話,將來在kafka中拉取了資料以後儘量將資料分散到所有的Executor上
      *   ConsumerStrategies:消費者策略(指定如何消費)
      *
      */
    val directStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String,String](AppParams.topic,AppParams.kafkaParams)
    )

寫到這裡我們就已經獲取了kafka中的資料了。接下來就是對他進行處理:

我們首先做的是計算充值成功的筆數:

由於我們的資料是json的所以要想對資料進行分析的話,就要使用json解析工具:

我們匯入json解析工具的依賴:

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.51</version>
        </dependency>

我們的思路是:過濾出serviceName欄位為reChargeNotifyReq的資料,這些資料就是和充值通知有關的,我們需要處理的資料。

在過濾出來的這些資料中,有成功的,也有失敗的。

過濾完資料之後我們對資料操作,將成功的資料設為1,失敗的設為0,並且從requestId中截取出前8位和標誌位組成一個元組返回。

然後使用reduceByKey就可以統計出當天交易成功的數量了。如果是將結果列印在控制檯上結果是這樣的:

所以要想統計最終的我們可以將資料寫入redis 累加。

程式碼如下:

 directStream.foreachRDD(
      rdd =>{
        //rdd.map(_.value()).foreach(println)
        //取得所有充值通知日誌
        val baseData: RDD[JSONObject] = rdd.map(cr =>JSON.parseObject(cr.value()))
        .filter(obj => obj.getString("serviceName").equalsIgnoreCase("reChargeNotifyReq")).cache()
        //bussinessRst是業務結果,如果是0000則為成功,其他返回錯誤編碼
        val totalSucc = baseData.map(obj => {
          val reqId = obj.getString("requestId")
          val day = reqId.substring(0, 8)
          //取出該條充值是否成功的標誌
          val result = obj.getString("bussinessRst")
          val flag = if (result.equals("0000")) 1 else 0
          (day, flag)
        }).reduceByKey(_+_)

-------------------------------------------------------------------------------------------------------------------------------------------------------------------------現在我們需要做的是統計出當天的交易成功的總金額,我們只要對上面的程式進行修改一下就好了。

之前的程式碼中如果交易成功返回的是1,而現在我們只要返回交易金額就好了。

程式碼如下:

//獲取充值成功的訂單金額
        val totalMoney = baseData.map(obj => {
          val reqId = obj.getString("requestId")
          val day = reqId.substring(0, 8)
          //取出該條充值是否成功的標誌
          val result = obj.getString("bussinessRst")
          val fee = if (result.equals("0000")) obj.getString("chargefee").toDouble else 0
          (day, fee)
        }).reduceByKey(_+_)

輸出結果如下:

-------------------------------------------------------------------------------------------------------------------------------------------------------------------------到現在為止我們已經充值成功的訂單量和充值金額寫好了。接下來就要算充值成功率了。

充值成功率 = 成功訂單數 / 總訂單數

總訂單量只要使用count就可以得出。而充值成功的訂單我們前面已經算出來了。

-------------------------------------------------------------------------------------------------------------------------------------------------------------------------

接下來要算的就是充值成功的充值時長:

如果是交易成功的則用結束時間(即通知充值成功的時間)- 開始時間(即requestId的前17位)

如果是交易失敗的話,則返回0

程式碼如下:

 /**
          * 獲取充值成功的充值時長
          */
        val totalTime: RDD[(String, Long)] = baseData.map(obj => {
          val reqId = obj.getString("requestId")
          //獲取日期
          val day = reqId.substring(0, 8)
          //取出該條充值是否成功的標誌
          val result = obj.getString("bussinessRst")
          //時間格式為:yyyyMMddHHmmssSSS(年月日時分秒毫秒)
          val endTime = obj.getString("receiveNotifyTime")
          val startTime: String = reqId.substring(0, 17)
          val format: SimpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmssSSS")
          val cost = if (result.equals("0000")) format.parse(endTime).getTime - format.parse(startTime).getTime else 0
          (day, cost)
        }).reduceByKey(_ + _)

輸出結果如下:

這裡返回的時間是毫秒數

-------------------------------------------------------------------------------------------------------------------------------------------------------------------------

接下來我們嘗試將充值成功的訂單數寫入redis :

我們首先在application.conf中新增redis的一些配置:

# redis
redis.host = "marshal"
redis.db.index = 1

接下來在AppParams中新增,對redis引數的訪問:


  /**
    * redis伺服器地址
    */
  val redisHost = config.getString("redis.host")

  /**
    * 將資料寫入到哪個庫
    */
  val redisDbIndex = config.getString("redis.db.index").toInt

然後寫一個從連線池獲取連線的方法:

package com.sheep.cmcc.utils

import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.{Jedis, JedisPool}

object Jpools {
  private val poolConfig = new GenericObjectPoolConfig()
  //連線池中最大的空閒連線數,預設是8
  poolConfig.setMaxIdle(5)
  //只支援最大的連線數,連線池中最大的連線數,預設是8
  poolConfig.setMaxTotal(2000)
  private lazy val jedisPool: JedisPool = new JedisPool(poolConfig,AppParams.redisHost)

  def getJedis = {
    val jedis: Jedis = jedisPool.getResource
    jedis.select(AppParams.redisDbIndex)
    jedis
  }
}

接下來寫入資料庫:

//將充值成功的訂單數寫入redis
        totalSucc.foreachPartition(it => {
          val jedis: Jedis = Jpools.getJedis
          it.foreach(
            tp => {
              jedis.incrBy("CMCC-"+tp._1,tp._2)
            })
          jedis.close()
        })

執行結果如下:

兩次重新整理之後的結果不一樣,是因為他不停的在讀資料,處理資料,然後做累加。

但是這樣的寫法很不好,而且存在很多問題:

比如頻繁的使用reduceByKey,會不停的產生shuffle,這樣對效能會有影響。

----------------------------------------------------------------------------------------------------------------------------------------------------------------------

寫到現在我們的程式還不夠優化,我們的各項指標都是單獨計算的,每次計算都會產生shuffle.這樣的效能是非常低的。所以我們針對現有的程式進行一個改造。