1. 程式人生 > 實用技巧 >基於Spark Streaming + Canal + Kafka對Mysql增量資料實時進行監測分析

基於Spark Streaming + Canal + Kafka對Mysql增量資料實時進行監測分析

1.Canal是什麼
2.如何編寫Canal客戶端
3.如何編寫一個數據庫操作的Spark程式程式碼?
4.開發Spark專案時容易發生哪些衝突問題?



Spark中的Spark Streaming可以用於實時流專案的開發,實時流專案的資料來源除了可以來源於日誌、檔案、網路埠等,常常也有這種需求,那就是實時分析處理MySQL中的增量資料。面對這種需求當然我們可以通過JDBC的方式定時查詢Mysql,然後再對查詢到的資料進行處理也能得到預期的結果,但是Mysql往往還有其他業務也在使用,這些業務往往比較重要,通過JDBC方式頻繁查詢會對Mysql造成大量無形的壓力,甚至可能會影響正常業務的使用,在基本不影響其他Mysql正常使用的情況下完成對增量資料的處理,那就需要 Canal 了。


假設Mysql中canal_test庫下有一張表policy_cred,需要統計實時統計policy_status狀態為1的mor_rate的的變化趨勢,並標註比率的風險預警等級。


1. Canal
Canal[kə'næl]是阿里巴巴開源的純java開發的基於資料庫binlog的增量訂閱&消費元件。Canal的原理是模擬為一個Mysql slave的互動協議,偽裝自己為MySQL slave,向Mysql Master傳送dump協議,然後Mysql master接收到這個請求後將binary log推送給slave(也就是Canal),Canal解析binary log物件。詳細可以查閱Canal的官方文件
[alibaba/canal wiki]


1.1 Canal 安裝
Canal的server mode在1.1.x版本支援的有TPC、KafkaRocketMQ。本次安裝的canal版本為1.1.2,Canal版本最後在1.1.1之後。server端採用MQ模式,MQ選用Kafka。伺服器系統為Centos7,其他環境為:jdk8、Scala 2.11、Mysql、Zookeeper、Kafka。

1.1.1 準備
安裝Canal之前我們先把如下安裝好
Mysql
a.如果沒有Mysql: 詳細的安裝過程可參考我的另一篇部落格[Centos7環境下離線安裝mysql 5.7 / mysql 8.0]
b.開啟Mysql的binlog。修改/etc/my.cnf
,在[mysqld]下新增如下配置,改完之後重啟Mysql/etc/init.d/mysql restart

[mysqld]
#新增這一行就ok
log-bin=mysql-bin
#選擇row模式
binlog-format=ROW
#配置mysql replaction需要定義,不能和canal的slaveId重複
server_id=1

  c.建立一個Mysql使用者並賦予相應許可權,用於Canal使用

mysql>  CREATE USER canal IDENTIFIED BY 'canal';  
mysql>  GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
mysql>  GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
mysql>  FLUSH PRIVILEGES;

Zookeeper
因為安裝Kafka時需要Zookeeper,例如ZK安裝後地址為:cdh3:2181,cdh4:2181,cdh5:2181

Kafka
例如安裝後的地址為:node1:9092,node2:9092,node3:9092
安裝後建立一個Topic,例如建立一個example

kafka-topics.sh --create --zookeeper cdh3:2181,cdh4:2181,cdh5:2181 --partitions 2 --replication-factor 1 --topic example

1.1.2 安裝Canal
1. 下載Canal
訪問Canal的Release頁canal v1.1.2

wget [url=https://github.com/alibaba/canal/releases/download/canal-1.1.2/canal.deployer-1.1.2.tar.gz]https://github.com/alibaba/canal ... ployer-1.1.2.tar.gz[/url]

  2. 解壓
注意這裡一定要先創建出一個目錄,直接解壓會覆蓋檔案

mkdir -p /usr/local/canal
mv canal.deployer-1.1.2.tar.gz /usr/local/canal/
tar -zxvf canal.deployer-1.1.2.tar.gz

3. 修改instance 配置檔案
vim $CANAL_HOME/conf/example/instance.properties,修改如下項,其他預設即可

## mysql serverId , v1.0.26+ will autoGen , 不要和server_id重複
canal.instance.mysql.slaveId=3
     
# position info。Mysql的url
canal.instance.master.address=node1:3306
     
# table meta tsdb info
canal.instance.tsdb.enable=false
     
# 這裡配置前面在Mysql分配的使用者名稱和密碼
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
# 配置需要檢測的庫名,可以不配置,這裡只檢測canal_test庫
canal.instance.defaultDatabaseName=canal_test
# enable druid Decrypt database password
canal.instance.enableDruid=false
     
# 配置過濾的正則表示式,監測canal_test庫下的所有表
canal.instance.filter.regex=canal_test\\..*
     
# 配置MQ
## 配置上在Kafka建立的那個Topic名字
canal.mq.topic=example
## 配置分割槽編號為1
canal.mq.partition=1

4. 修改canal.properties配置檔案
vim $CANAL_HOME/conf/canal.properties,修改如下項,其他預設即可

# 這個是如果開啟的是tcp模式,會佔用這個11111埠,canal客戶端通過這個埠獲取資料
canal.port = 11111
     
# 可以配置為:tcp, kafka, RocketMQ,這裡配置為kafka
canal.serverMode = kafka
     
# 這裡將這個註釋掉,否則啟動會有一個警告
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
     
##################################################
#########                      MQ                      #############
##################################################
canal.mq.servers = node1:9092,node2:9092,node3:9092
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# Canal的batch size, 預設50K, 由於kafka最大訊息體限制請勿超過1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get資料的超時時間, 單位: 毫秒, 空為不限超時
canal.mq.canalGetTimeout = 100
# 是否為flat json格式物件
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# kafka訊息投遞是否使用事務
#canal.mq.transaction = false

5. 啟動Canal

$CANAL_HOME/bin/startup.sh

6. 驗證
檢視日誌
啟動後會在logs下生成兩個日誌檔案:logs/canal/canal.log、logs/example/example.log,檢視這兩個日誌,保證沒有報錯日誌。
如果是在虛擬機器安裝,最好給2個核數以上。
確保登陸的系統的hostname可以ping通。

在Mysql資料庫中進行增刪改查的操作,然後檢視Kafka的topic為 example 的資料

kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --from-beginning --topic example

7. 關閉Canal
不用的時候一定要通過這個命令關閉如果是用kill或者關機,當再次啟動依然會提示要先執行stop.sh指令碼後才能再啟動。

$CANAL_HOME/bin/stop.sh

*1.2 Canal 客戶端程式碼
如果我們不使用Kafka作為Canal客戶端,我們也可以用程式碼編寫自己的Canal客戶端,然後在程式碼中指定我們的資料去向。此時只需要將canal.properties配置檔案中的canal.serverMode值改為tcp。編寫我們的客戶端程式碼。

在Maven專案的pom中引入:

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.2</version>
</dependency>

編寫程式碼:

/**
 * Canal客戶端。
 *  注意:canal服務端只會連線一個客戶端,當啟用多個客戶端時,其他客戶端是就無法獲取到資料。所以啟動一個例項即可
 * 
 * @see <a href="https://github.com/alibaba/canal/wiki/ClientExample">官方文件:ClientSample程式碼</a>
 *
 * Created by yore on 2019/3/16 10:50
 */
public class SimpleCanalClientExample {
     
    public static void main(String args[]) {
     
        /**
         * 建立連結
         *      SocketAddress: 如果提交到canal服務端所在的伺服器上執行這裡可以改為 new InetSocketAddress(AddressUtils.getHostIp(), 11111)
         *      destination 通服務端canal.properties中的canal.destinations = example配置對應
         *      username:
         *      password:
         */
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("node1", 11111),
                "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // 獲取指定數量的資料
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }
  
                connector.ack(batchId); // 提交確認
                // connector.rollback(batchId); // 處理失敗, 回滾資料
            }
  
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }
  
    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
  
            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }
  
            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
             
            /**
             * 如果只對某些庫的資料操作,可以加如下判斷:
             * if("庫名".equals(entry.getHeader().getSchemaName())){
             *      //TODO option
             *  }
             * 
             * 如果只對某些表的資料變動操作,可以加如下判斷:
             * if("表名".equals(entry.getHeader().getTableName())){
             *     //todo option
             * }
             * 
             */
  
            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }
  
    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
     
}

本地執行上述程式碼,我們修改Mysql資料中的資料,可在控制檯中看到資料的改變:

empty count : 20
empty count : 21
empty count : 22
================> binlog[mysql-bin.000009:1510] , name[canal_test,customer] , eventType : INSERT
id : 4    update=true
name : spark    update=true
empty count : 1
empty count : 2
empty count : 3

  2. Spark
通過上一步我們已經能夠獲取到canal_test庫的變化資料,並且已經可將將變化的資料實時推送到Kafka中,Kafka中接收到的資料是一條Json格式的資料,我們需要對INSERTUPDATE型別的資料處理,並且只處理狀態為1的資料,然後需要計算mor_rate的變化,並判斷 mor_rate 的風險等級0-75%為G1等級,75%-80%為R1等級,80%-100%為R2等級。最後將處理的結果儲存到DB,可以儲存到Redis、Mysql、MongoDB,或者推送到Kafka都可以。這裡是將結果資料儲存到了Mysql。

2.1 在Mysql中建立如下兩張表:

-- 在canal_test庫下建立表
CREATE TABLE `policy_cred` (
        p_num varchar(22) NOT NULL,
        policy_status varchar(2) DEFAULT NULL COMMENT '狀態:0、1',
        mor_rate decimal(20,4) DEFAULT NULL,
        load_time datetime DEFAULT NULL,
        PRIMARY KEY (`p_num`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  
-- 在real_result庫下建立表
CREATE TABLE `real_risk` (
        p_num varchar(22) NOT NULL,
        risk_rank varchar(8) DEFAULT NULL COMMENT '等級:G1、R1、R2',
        mor_rate decimal(20,4) ,
        ch_mor_rate decimal(20,4),
        load_time datetime DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

  2.2 Spark程式碼開發:
2.2.1 在resources下new一個專案的配置檔案my.properties

## spark
# spark://cdh3:7077
spark.master=local[2]
spark.app.name=m_policy_credit_app
spark.streaming.durations.sec=10
spark.checkout.dir=src/main/resources/checkpoint
  
  
## Kafka
bootstrap.servers=node1:9092,node2:9092,node3:9092
group.id=m_policy_credit_gid
# latest, earliest, none
auto.offset.reset=latest
enable.auto.commit=false
kafka.topic.name=example
  
  
## Mysql
mysql.jdbc.driver=com.mysql.jdbc.Driver
mysql.db.url=jdbc:mysql://node1:3306/real_result
mysql.user=root
mysql.password=123456
mysql.connection.pool.size=10

2.2.2 在pom.xml檔案中引入如下:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.4.0</spark.version>
    <canal.client.version>1.1.2</canal.client.version>
</properties>
 
<dependencies>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>${canal.client.version}</version>
        <exclusions>
            <exclusion>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
 
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
 
    <!-- Spark -->
    <!-- spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- spark-streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- spark-streaming-kafka -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- spark-sql -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
 
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.1</version>
    </dependency>
 
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.51</version>
    </dependency>
 
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.47</version>
    </dependency>
 
</dependencies>

2.2.3 在scala原始碼目錄下的包下編寫配置檔案的工具類

package yore.spark
  
import java.util.Properties
  
/**
  * Properties的工具類
  *
  * Created by yore on 2018-06-29 14:05
  */
object PropertiesUtil {
  
  private val properties: Properties = new Properties
  
  /**
    *
    * 獲取配置檔案Properties物件
    *
    * @author yore
    * @return java.util.Properties
    * date 2018/6/29 14:24
    */
  def getProperties() :Properties = {
    if(properties.isEmpty){
      //讀取原始碼中resource資料夾下的my.properties配置檔案
      val reader = getClass.getResourceAsStream("/my.properties")
      properties.load(reader)
    }
    properties
  }
  
  /**
    *
    * 獲取配置檔案中key對應的字串值
    *
    * @author yore
    * @return java.util.Properties
    * @date 2018/6/29 14:24
    */
  def getPropString(key : String) : String = {
    getProperties().getProperty(key)
  }
  
  /**
    *
    * 獲取配置檔案中key對應的整數值
    *
    * @author yore
    * @return java.util.Properties
    * @date 2018/6/29 14:24
    */
  def getPropInt(key : String) : Int = {
    getProperties().getProperty(key).toInt
  }
  
  /**
    *
    * 獲取配置檔案中key對應的布林值
    *
    * @author yore
    * @return java.util.Properties
    * @date 2018/6/29 14:24
    */
  def getPropBoolean(key : String) : Boolean = {
    getProperties().getProperty(key).toBoolean
  }
  
}

  2.2.5 在scala原始碼目錄下的包下編寫Spark程式程式碼

package yore.spark
  
import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
  
import scala.collection.mutable.ListBuffer
  
/**
  *
  * Created by yore on 2019/3/16 15:11
  */
object M_PolicyCreditApp {
  
  def main(args: Array[String]): Unit = {
  
    // 設定日誌的輸出級別
    Logger.getLogger("org").setLevel(Level.ERROR)
  
    val conf = new SparkConf()
      .setMaster(PropertiesUtil.getPropString("spark.master"))
      .setAppName(PropertiesUtil.getPropString("spark.app.name"))
      // !!必須設定,否則Kafka資料會報無法序列化的錯誤
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    //如果環境中已經配置HADOOP_HOME則可以不用設定hadoop.home.dir
    System.setProperty("hadoop.home.dir", "/Users/yoreyuan/soft/hadoop-2.9.2")
  
    val ssc = new StreamingContext(conf,  Seconds(PropertiesUtil.getPropInt("spark.streaming.durations.sec").toLong))
    ssc.sparkContext.setLogLevel("ERROR")
    ssc.checkpoint(PropertiesUtil.getPropString("spark.checkout.dir"))
  
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> PropertiesUtil.getPropString("bootstrap.servers"),
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> PropertiesUtil.getPropString("group.id"),
      "auto.offset.reset" -> PropertiesUtil.getPropString("auto.offset.reset"),
      "enable.auto.commit" -> (PropertiesUtil.getPropBoolean("enable.auto.commit"): java.lang.Boolean)
    )
    val topics = Array(PropertiesUtil.getPropString("kafka.topic.name"))
  
    val kafkaStreaming = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
  
  
    kafkaStreaming.map[JSONObject](line => { // str轉成JSONObject
      println("$$$\t" + line.value())
      JSON.parseObject(line.value)
    }).filter(jsonObj =>{   // 過濾掉非 INSERT和UPDATE的資料
      if(null == jsonObj || !"canal_test".equals(jsonObj.getString("database")) ){
        false
      }else{
        val chType = jsonObj.getString("type")
        if("INSERT".equals(chType) || "UPDATE".equals(chType)){
          true
        }else{
          false
        }
      }
    }).flatMap[(JSONObject, JSONObject)](jsonObj => {   // 將改變前和改變後的資料轉成Tuple
      var oldJsonArr: JSONArray = jsonObj.getJSONArray("old")
      val dataJsonArr: JSONArray = jsonObj.getJSONArray("data")
      if("INSERT".equals(jsonObj.getString("type"))){
        oldJsonArr = new JSONArray()
        val oldJsonObj2 = new JSONObject()
        oldJsonObj2.put("mor_rate", "0")
        oldJsonArr.add(oldJsonObj2)
      }
  
      val result = ListBuffer[(JSONObject, JSONObject)]()
  
      for(i <- 0 until oldJsonArr.size ) {
        val jsonTuple = (oldJsonArr.getJSONObject(i), dataJsonArr.getJSONObject(i))
        result += jsonTuple
      }
      result
    }).filter(t => {  // 過濾狀態不為1的資料,和mor_rate沒有改變的資料
      val policyStatus = t._2.getString("policy_status")
      if(null != policyStatus && "1".equals(policyStatus) && null!= t._1.getString("mor_rate")){
        true
      }else{
        false
      }
    }).map(t => {
      val p_num = t._2.getString("p_num")
      val nowMorRate = t._2.getString("mor_rate").toDouble
      val chMorRate = nowMorRate - t._1.getDouble("mor_rate")
      val riskRank = gainRiskRank(nowMorRate)
  
      // p_num, risk_rank, mor_rate, ch_mor_rate, load_time
      (p_num, riskRank, nowMorRate, chMorRate, new java.util.Date)
    }).foreachRDD(rdd => {
      rdd.foreachPartition(p => {
        val paramsList = ListBuffer[ParamsList]()
        val jdbcWrapper = JDBCWrapper.getInstance()
        while (p.hasNext){
          val record = p.next()
          val paramsListTmp = new ParamsList
          paramsListTmp.p_num = record._1
          paramsListTmp.risk_rank = record._2
          paramsListTmp.mor_rate = record._3
          paramsListTmp.ch_mor_rate = record._4
          paramsListTmp.load_time = record._5
          paramsListTmp.params_Type = "real_risk"
          paramsList += paramsListTmp
        }
        /**
          * VALUES(p_num, risk_rank, mor_rate, ch_mor_rate, load_time)
          */
        val insertNum = jdbcWrapper.doBatch("INSERT INTO real_risk VALUES(?,?,?,?,?)", paramsList)
        println("INSERT TABLE real_risk: " + insertNum.mkString(", "))
      })
    })
  
    ssc.start()
    ssc.awaitTermination()
  
  }
  
  
  
  def gainRiskRank(rate: Double): String = {
    var result = ""
    if(rate>=0.75 && rate<0.8){
      result = "R1"
    }else if(rate >=0.80 && rate<=1){
      result = "R2"
    }else{
      result = "G1"
    }
    result
  }
  
}
  
/**
  * 結果表對應的引數實體物件
  */
class ParamsList extends Serializable{
  var p_num: String = _
  var risk_rank: String = _
  var mor_rate: Double = _
  var ch_mor_rate: Double = _
  var load_time:java.util.Date = _
  var params_Type : String = _
  override def toString = s"ParamsList($p_num, $risk_rank, $mor_rate, $ch_mor_rate, $load_time)"
}

  3. 測試
啟動 ZK、Kafka、Canal。
在 canal_test 庫下的policy_cred 表中插入或者修改資料
然後檢視 real_result 庫下的real_risk表中結果。


更新一條資料時Kafka接收到的json資料如下(這是canal投送到Kafka中的資料格式,包含原始資料、修改後的資料、庫名、表名等資訊):

{
  "data": [
    {
      "p_num": "1",
      "policy_status": "1",
      "mor_rate": "0.8800",
      "load_time": "2019-03-17 12:54:57"
    }
  ],
  "database": "canal_test",
  "es": 1552698141000,
  "id": 10,
  "isDdl": false,
  "mysqlType": {
    "p_num": "varchar(22)",
    "policy_status": "varchar(2)",
    "mor_rate": "decimal(20,4)",
    "load_time": "datetime"
  },
  "old": [
    {
      "mor_rate": "0.5500"
    }
  ],
  "sql": "",
  "sqlType": {
    "p_num": 12,
    "policy_status": 12,
    "mor_rate": 3,
    "load_time": 93
  },
  "table": "policy_cred",
  "ts": 1552698141621,
  "type": "UPDATE"
}

檢視Mysql中的結果表



4、出現的問題
在開發Spark程式碼是有時專案可能會引入大量的依賴包,依賴包之間可能就會發生衝突,比如發生如下錯誤:

Exception in thread "main" java.lang.NoSuchMethodError: io.netty.buffer.PooledByteBufAllocator.<init>(ZIIIIIIIZ)V
        at org.apache.spark.network.util.NettyUtils.createPooledByteBufAllocator(NettyUtils.java:120)
        at org.apache.spark.network.client.TransportClientFactory.<init>(TransportClientFactory.java:106)
        at org.apache.spark.network.TransportContext.createClientFactory(TransportContext.java:99)
        at org.apache.spark.rpc.netty.NettyRpcEnv.<init>(NettyRpcEnv.scala:71)
        at org.apache.spark.rpc.netty.NettyRpcEnvFactory.create(NettyRpcEnv.scala:461)
        at org.apache.spark.rpc.RpcEnv$.create(RpcEnv.scala:57)
        at org.apache.spark.SparkEnv$.create(SparkEnv.scala:249)
        at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:175)
        at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:257)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:424)
        at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:838)
        at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:85)
        at yore.spark.M_PolicyCreditApp$.main(M_PolicyCreditApp.scala:33)
        at yore.spark.M_PolicyCreditApp.main(M_PolicyCreditApp.scala)

我們可以在專案的根目錄下的命令視窗中輸人:mvn dependency:tree -Dverbose> dependency.log

然後可以在專案根目錄下生產一個dependency.log檔案,檢視這個檔案,在檔案中搜索io.netty關鍵字,找到其所在的依賴包:

然就在canal.clientio.netty排除掉

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>${canal.client.version}</version>
    <exclusions>
        <exclusion>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
        </exclusion>
    </exclusions>
</dependency>