急中生智~利用Spark core完成"ETL"!
今天接到老板分配的一個小任務:開發一個程序,實現從數據庫中抽取數據並生成報表的功能(這是我們數據庫審計平臺準備上線的一個功能)。既然是要生成報表,那麽首先得有數據,於是便想到從該業務系統的測試環境抽取業務表的數據,然後裝載至自己雲主機上的Mysql中。
本來以為只要"select ...into outfile"和"load data infile..."兩個命令就可以搞定的,可是還是出了意外。測試環境導出的
txt文件在雲主機load時,報了"Row 1 doesn‘t contain data for all columns"這樣的warning,表中的數據自然也是淩亂且不完整的。
1.由於測試環境的網段是隔離的,所以為了拿到"select ...into outfile"時生成的數據,我是打開CRT的日誌,然後執行
"cat xxx.txt",變相地將數據獲取到了本地,然後上傳至雲主機的;
2.測試環境的Mysql和雲主機上Mysql的小版本不一致。
這兩個問題看似都沒法解決,現在只有文本文件,怎麽辦?使用Spark不就得了!
之前也寫過一篇使用Spark分析Mysql慢日誌的博文,自己也對Spark core的各種算子比較熟悉,所以決定試一試。
實戰演練:
表結構如下:
mysql> desc claims_case_loss_document; +---------------+-------------+------+-----+---------+----------------+ | Field | Type | Null | Key | Default | Extra | +---------------+-------------+------+-----+---------+----------------+ | id | int(11) | NO | PRI | NULL | auto_increment | | case_id | varchar(22) | NO | | NULL | | | case_times | varchar(2) | NO | | NULL | | | document_list | text | NO | | NULL | | | create_time | timestamp | YES | | NULL | | | update_time | timestamp | YES | | NULL | | +---------------+-------------+------+-----+---------+----------------+ 6 rows in set (0.00 sec)
文本結構如下:
1147 90100002700021437455 1 100100_收款方賬戶信息;001003_事故證明;001001_駕駛證;100000_收款方×××明;001002_索賠申請書 2017-11-16 12:08:08 2017-11-16 12:08:08
觀察文本結構可知,每個字段間都有數個空格,而且兩兩字段間的空格數並不一致,所以得先使用Spark core將文本中字段間的空格數都變為1,以便後續切分。
閑話少說,直接上程序!(以下程序均使用scala在eclipse ide for scala中編寫和執行)
package cn.spark.study.sql import org.apache.spark.SparkConf import org.apache.spark.SparkContext import scala.collection.mutable.ArrayBuffer import java.sql.DriverManager object insertToMysql { def main(args: Array[String]): Unit = { val t1=System.nanoTime() val conf = new SparkConf() .setAppName("insertToMysql") .setMaster("local") val sc = new SparkContext(conf) val lines = sc.textFile("D://Users//GAOZHONGZHENG186//Desktop//text001.txt", 1); val words = lines.map { line => line.split(" ") } val wordsNotNull = words.map{ word => val wordArray_raw = new ArrayBuffer[String]() val wordArray = new ArrayBuffer[String]() for(i<-0 until word.length){ if (word(i)!=""){ wordArray_raw+=word(i) } } for(i<-0 until wordArray_raw.length-4){ wordArray+=wordArray_raw(i) } wordArray+=wordArray_raw(4)+" "+wordArray_raw(5) wordArray+=wordArray_raw(6)+" "+wordArray_raw(7) wordArray } wordsNotNull.foreach { word => Class.forName("com.mysql.cj.jdbc.Driver") val conn = DriverManager.getConnection("jdbc:mysql://10.25.80.7:3306/db1", "root", "123456") try { val statement = conn.createStatement() val sql="insert into claims_case_loss_document values ("+ word(0)+","+ "‘"+word(1)+"‘"+","+ "‘"+word(2)+"‘"+","+ "‘"+word(3)+"‘"+","+ "‘"+word(4)+"‘"+","+ "‘"+word(5)+"‘"+")" //執行插入 //println(sql) statement.executeUpdate(sql) } catch{ case e:Exception =>e.printStackTrace } finally { conn.close } } val t2=System.nanoTime() //打印程序運行時間 println((t2-t1)/1000000000 +"s") } }
在插入的過程中,第一條記錄總是會報錯(後續語句插入正常),在eclipse中將報錯的insert語句手工粘貼至mysql執行,仍報相同錯誤:
從報錯看是遇到了bug,並且1147這個值有問題,將相鄰語句放入notePad對比:
從圖中可看出,1147的千位上的1確實發生了異常改變,而第二條語句中的1148是正常的,猜測可能是某個未知bug導致了第一條記錄發生了異常改變。這個猜測在後續得到了證實:當把1147所在行從文本中刪除後(此時1148所在行為第一條記錄),1148所在行也報出同樣的錯誤,而後續語句均可正常插入。
由於數據是作分析用的,所以丟失一條無傷大雅,而且這個bug實在詭異,這裏就不再深究了。
細心的童鞋在看了代碼後應該會問:數據插入的效率如何?實不相瞞,效率很差!5000條的數據足足用了近半個小時,即使是在這樣的OLAP場景下,這樣的效率也是不可容忍的!
仔細研究代碼可發現,在對RDD調用foreach方法進行插入的時候,每一條記錄都要創建一個連接,並且每一次insert都會在Mysql中觸發一次commit操作(autocommit參數默認是打開的),這些都是很消耗資源的操作,插入效率自然很差。
發現這些問題後,針對代碼進行了修改:
package cn.spark.study.sql
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import java.sql.DriverManager
import scala.collection.mutable.ArrayBuffer
object insert2Mysql {
def main(args: Array[String]): Unit = {
val t1=System.nanoTime()
val conf = new SparkConf()
.setAppName("insert2Mysql")
.setMaster("local")
val sc = new SparkContext(conf)
//textFile方法只能讀取字符集為utf-8的文件,否則中文會亂碼。windows下,將文件另存為時,可以選擇utf-8字符集
//也可在代碼中實施轉換,但比較繁瑣
val lines = sc.textFile("D://Users//GAOZHONGZHENG186//Desktop//text01.txt", 1);
val words = lines.map { line => line.split(" ") }
val wordsNotNull = words.map{ word =>
val wordArray_raw = new ArrayBuffer[String]()
val wordArray = new ArrayBuffer[String]()
for(i<-0 until word.length){
if (word(i)!=""){
wordArray_raw+=word(i)
}
}
for(i<-0 until wordArray_raw.length-4){
wordArray+=wordArray_raw(i)
}
wordArray+=wordArray_raw(4)+" "+wordArray_raw(5)
wordArray+=wordArray_raw(6)+" "+wordArray_raw(7)
wordArray
}
val sqlRDD=wordsNotNull.map{ word =>
val sql="insert into claims_case_loss_document values ("+
word(0)+","+
"‘"+word(1)+"‘"+","+
"‘"+word(2)+"‘"+","+
"‘"+word(3)+"‘"+","+
"‘"+word(4)+"‘"+","+
"‘"+word(5)+"‘"+")"
sql
}
val sqlArray=sqlRDD.toArray()
//加載驅動
Class.forName("com.mysql.cj.jdbc.Driver")
val conn = DriverManager.getConnection("jdbc:mysql://10.25.80.7:3306/db1", "root", "123456")
try {
conn.setAutoCommit(false)
val statement = conn.createStatement()
//這裏有bug,處理出來的第一行格式都會報ERROR 1054 (42S22): Unknown column ‘?1147‘ in ‘field list‘
//為了避免程序跳出循環,所以循環從1開始,即從第2條開始插入
for(i<-1 until sqlArray.length){
//執行插入
println(sqlArray(i))
statement.executeUpdate(sqlArray(i))
}
conn.commit()
}
catch{
case e:Exception =>e.printStackTrace
}
finally{
conn.close
}
val t2=System.nanoTime()
println((t2-t1)/1000000000 +"s")
}
}
修改後的代碼規避了上述缺陷,在同樣插入5000條數據的情況下,只用了221s!效率大大提升!
到Mysql驗證數據:
mysql> select count(*) from claims_case_loss_document;
+----------+
| count(*) |
+----------+
| 4999 |
+----------+
1 row in set (0.00 sec)
mysql> select * from claims_case_loss_document limit 1\G
*************************** 1. row ***************************
id: 1148
case_id: 90100002700021437450
case_times: 1
document_list: 100100_收款方賬戶信息;001003_事故證明;001001_駕駛證;100000_收款方×××明;001002_索賠申請書
create_time: 2017-11-16 12:08:08
update_time: 2017-11-16 12:08:08
1 row in set (0.00 sec)
至此,問題圓滿解決!整個過程和數據倉庫領域的ETL很接近,抽取-轉換-裝載,三個環節都有涉及,只是沒有使用
kettle之類的工具罷了。
總結:
在大數據時代,DBA應該積極做出改變,掌握一定開發技能,以便更好地適應時代變化,切不可固守自己的一畝三分地!
最後,給我們上海分組自研的數據庫審計平臺打個廣告 ^.^
數據庫審計平臺是我們分組歷時兩年打造的產品,可用於Mysql、Oracle、Postgres等多種數據庫,具備以下核心工能:
1.審計違規sql,前端一鍵生成報告
2.對相同功能點的sql可實現自動歸類,方便後續統一整改
3.內嵌Percona toolkit,前端一鍵調用
4.一鍵抓取低效sql,並自動給出優化建議
還有很多很酷的功能就不一一介紹了,總之,誰用誰說好!感興趣的DBA童鞋可以留言,可免費試用哦!
急中生智~利用Spark core完成"ETL"!