1. 程式人生 > >急中生智~利用Spark core完成"ETL"!

急中生智~利用Spark core完成"ETL"!

MySQL Spark core ETL

背景介紹:
今天接到老板分配的一個小任務:開發一個程序,實現從數據庫中抽取數據並生成報表的功能(這是我們數據庫審計平臺準備上線的一個功能)。既然是要生成報表,那麽首先得有數據,於是便想到從該業務系統的測試環境抽取業務表的數據,然後裝載至自己雲主機上的Mysql中。
本來以為只要"select ...into outfile"和"load data infile..."兩個命令就可以搞定的,可是還是出了意外。測試環境導出的
txt文件在雲主機load時,報了"Row 1 doesn‘t contain data for all columns"這樣的warning,表中的數據自然也是淩亂且不完整的。
仔細分析,感覺可能是兩個方面出了問題:
1.由於測試環境的網段是隔離的,所以為了拿到"select ...into outfile"時生成的數據,我是打開CRT的日誌,然後執行
"cat xxx.txt",變相地將數據獲取到了本地,然後上傳至雲主機的;
2.測試環境的Mysql和雲主機上Mysql的小版本不一致。
這兩個問題看似都沒法解決,現在只有文本文件,怎麽辦?使用Spark不就得了!
之前也寫過一篇使用Spark分析Mysql慢日誌的博文,自己也對Spark core的各種算子比較熟悉,所以決定試一試。

實戰演練:
表結構如下:

mysql> desc claims_case_loss_document;
+---------------+-------------+------+-----+---------+----------------+
| Field         | Type        | Null | Key | Default | Extra          |
+---------------+-------------+------+-----+---------+----------------+
| id            | int(11)     | NO   | PRI | NULL    | auto_increment |
| case_id       | varchar(22) | NO   |     | NULL    |                |
| case_times    | varchar(2)  | NO   |     | NULL    |                |
| document_list | text        | NO   |     | NULL    |                |
| create_time   | timestamp   | YES  |     | NULL    |                |
| update_time   | timestamp   | YES  |     | NULL    |                |
+---------------+-------------+------+-----+---------+----------------+
6 rows in set (0.00 sec)

文本結構如下:

1147    90100002700021437455    1       100100_收款方賬戶信息;001003_事故證明;001001_駕駛證;100000_收款方×××明;001002_索賠申請書     2017-11-16 12:08:08     2017-11-16 12:08:08

觀察文本結構可知,每個字段間都有數個空格,而且兩兩字段間的空格數並不一致,所以得先使用Spark core將文本中字段間的空格數都變為1,以便後續切分。
閑話少說,直接上程序!(以下程序均使用scala在eclipse ide for scala中編寫和執行)

package cn.spark.study.sql

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.collection.mutable.ArrayBuffer
import java.sql.DriverManager

object insertToMysql {
  def main(args: Array[String]): Unit = {
    val t1=System.nanoTime()
    val conf = new SparkConf()
        .setAppName("insertToMysql")
        .setMaster("local")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("D://Users//GAOZHONGZHENG186//Desktop//text001.txt", 1);
    val words = lines.map { line => line.split(" ") }
    val wordsNotNull = words.map{ word =>
                       val wordArray_raw = new ArrayBuffer[String]()
                       val wordArray = new ArrayBuffer[String]()
                       for(i<-0 until word.length){
                         if (word(i)!=""){
                           wordArray_raw+=word(i)
                         }
                       }
                       for(i<-0 until wordArray_raw.length-4){
                         wordArray+=wordArray_raw(i)
                       }
                       wordArray+=wordArray_raw(4)+" "+wordArray_raw(5)
                       wordArray+=wordArray_raw(6)+" "+wordArray_raw(7)
                       wordArray
                      }

    wordsNotNull.foreach { word =>
                           Class.forName("com.mysql.cj.jdbc.Driver")
                           val conn = DriverManager.getConnection("jdbc:mysql://10.25.80.7:3306/db1", "root", "123456")
                           try {
                                val statement = conn.createStatement()
                                val sql="insert into claims_case_loss_document values ("+
                                        word(0)+","+
                                        "‘"+word(1)+"‘"+","+
                                        "‘"+word(2)+"‘"+","+
                                        "‘"+word(3)+"‘"+","+
                                        "‘"+word(4)+"‘"+","+
                                        "‘"+word(5)+"‘"+")"
                                //執行插入
                                //println(sql)
                                statement.executeUpdate(sql)
                                } catch{
                                        case e:Exception =>e.printStackTrace
                                       }
                                  finally {
                                          conn.close
                                          }
                         }
    val t2=System.nanoTime()
        //打印程序運行時間
    println((t2-t1)/1000000000 +"s")
  }
}

在插入的過程中,第一條記錄總是會報錯(後續語句插入正常),在eclipse中將報錯的insert語句手工粘貼至mysql執行,仍報相同錯誤:
技術分享圖片
從報錯看是遇到了bug,並且1147這個值有問題,將相鄰語句放入notePad對比:
技術分享圖片
從圖中可看出,1147的千位上的1確實發生了異常改變,而第二條語句中的1148是正常的,猜測可能是某個未知bug導致了第一條記錄發生了異常改變。這個猜測在後續得到了證實:當把1147所在行從文本中刪除後(此時1148所在行為第一條記錄),1148所在行也報出同樣的錯誤,而後續語句均可正常插入。
由於數據是作分析用的,所以丟失一條無傷大雅,而且這個bug實在詭異,這裏就不再深究了。

細心的童鞋在看了代碼後應該會問:數據插入的效率如何?實不相瞞,效率很差!5000條的數據足足用了近半個小時,即使是在這樣的OLAP場景下,這樣的效率也是不可容忍的!
仔細研究代碼可發現,在對RDD調用foreach方法進行插入的時候,每一條記錄都要創建一個連接,並且每一次insert都會在Mysql中觸發一次commit操作(autocommit參數默認是打開的),這些都是很消耗資源的操作,插入效率自然很差。
發現這些問題後,針對代碼進行了修改:

package cn.spark.study.sql

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import java.sql.DriverManager
import scala.collection.mutable.ArrayBuffer

object insert2Mysql {
    def main(args: Array[String]): Unit = {
    val t1=System.nanoTime()
    val conf = new SparkConf()
        .setAppName("insert2Mysql")
        .setMaster("local")
    val sc = new SparkContext(conf)
    //textFile方法只能讀取字符集為utf-8的文件,否則中文會亂碼。windows下,將文件另存為時,可以選擇utf-8字符集
    //也可在代碼中實施轉換,但比較繁瑣
    val lines = sc.textFile("D://Users//GAOZHONGZHENG186//Desktop//text01.txt", 1);
    val words = lines.map { line => line.split(" ") }
    val wordsNotNull = words.map{ word =>
                       val wordArray_raw = new ArrayBuffer[String]()
                       val wordArray = new ArrayBuffer[String]()
                       for(i<-0 until word.length){
                         if (word(i)!=""){
                           wordArray_raw+=word(i)
                         }
                       }
                       for(i<-0 until wordArray_raw.length-4){
                         wordArray+=wordArray_raw(i)
                       }
                       wordArray+=wordArray_raw(4)+" "+wordArray_raw(5)
                       wordArray+=wordArray_raw(6)+" "+wordArray_raw(7)
                       wordArray
                      }

    val sqlRDD=wordsNotNull.map{ word =>  
                                    val sql="insert into claims_case_loss_document values ("+
                                             word(0)+","+
                                             "‘"+word(1)+"‘"+","+
                                             "‘"+word(2)+"‘"+","+
                                             "‘"+word(3)+"‘"+","+
                                             "‘"+word(4)+"‘"+","+
                                             "‘"+word(5)+"‘"+")"
                                    sql
                                  }

    val sqlArray=sqlRDD.toArray()

    //加載驅動
    Class.forName("com.mysql.cj.jdbc.Driver") 
    val conn = DriverManager.getConnection("jdbc:mysql://10.25.80.7:3306/db1", "root", "123456")                               
    try {
        conn.setAutoCommit(false)
        val statement = conn.createStatement() 
        //這裏有bug,處理出來的第一行格式都會報ERROR 1054 (42S22): Unknown column ‘?1147‘ in ‘field list‘
        //為了避免程序跳出循環,所以循環從1開始,即從第2條開始插入
        for(i<-1 until sqlArray.length){
           //執行插入
          println(sqlArray(i))
          statement.executeUpdate(sqlArray(i))
          }
        conn.commit()
        } 
    catch{
          case e:Exception =>e.printStackTrace
          }   
    finally{
            conn.close
            }

    val t2=System.nanoTime()
    println((t2-t1)/1000000000 +"s")
  }
}

修改後的代碼規避了上述缺陷,在同樣插入5000條數據的情況下,只用了221s!效率大大提升!
到Mysql驗證數據:

mysql> select count(*) from claims_case_loss_document;
+----------+
| count(*) |
+----------+
|     4999 |
+----------+
1 row in set (0.00 sec)

mysql> select * from claims_case_loss_document limit 1\G
*************************** 1. row ***************************
           id: 1148
      case_id: 90100002700021437450
   case_times: 1
document_list: 100100_收款方賬戶信息;001003_事故證明;001001_駕駛證;100000_收款方×××明;001002_索賠申請書
  create_time: 2017-11-16 12:08:08
  update_time: 2017-11-16 12:08:08
1 row in set (0.00 sec)

至此,問題圓滿解決!整個過程和數據倉庫領域的ETL很接近,抽取-轉換-裝載,三個環節都有涉及,只是沒有使用
kettle之類的工具罷了。

總結:
在大數據時代,DBA應該積極做出改變,掌握一定開發技能,以便更好地適應時代變化,切不可固守自己的一畝三分地!

最後,給我們上海分組自研的數據庫審計平臺打個廣告 ^.^
數據庫審計平臺是我們分組歷時兩年打造的產品,可用於Mysql、Oracle、Postgres等多種數據庫,具備以下核心工能:
1.審計違規sql,前端一鍵生成報告
2.對相同功能點的sql可實現自動歸類,方便後續統一整改
3.內嵌Percona toolkit,前端一鍵調用
4.一鍵抓取低效sql,並自動給出優化建議
還有很多很酷的功能就不一一介紹了,總之,誰用誰說好!感興趣的DBA童鞋可以留言,可免費試用哦!

急中生智~利用Spark core完成"ETL"!