1. 程式人生 > >[轉]Spark 踩坑記:數據庫(Hbase+Mysql)

[轉]Spark 踩坑記:數據庫(Hbase+Mysql)

cep 直接 策略 https rds 但是 更新 base ID

https://cloud.tencent.com/developer/article/1004820

Spark 踩坑記:數據庫(Hbase+Mysql)
前言
在使用Spark Streaming的過程中對於計算產生結果的進行持久化時,我們往往需要操作數據庫,去統計或者改變一些值。

最近一個實時消費者處理任務,在使用spark streaming進行實時的數據流處理時,我需要將計算好的數據更新到hbase和mysql中,所以本文對spark操作hbase和mysql的內容進行總結,並且對自己踩到的一些坑進行記錄。

Spark Streaming持久化設計模式
DStreams輸出操作
print:打印driver結點上每個Dstream中的前10個batch元素,常用於開發和調試

saveAsTextFiles(prefix, [suffix]):將當前Dstream保存為文件,每個interval batch的文件名命名規則基於 prefix 和 suffix :”prefix-TIME_IN_MS[.suffix]”.
saveAsObjectFiles(prefix, [suffix]):將當前的Dstream內容作為Java可序列化對象的序列化文件進行保存,每個interval batch的文件命名規則基於prefix和suffix:: “prefix-TIME_IN_MS[.suffix]”.
saveAsHadoopFiles(prefix, [suffix]):將Dstream以hadoop文件的形式進行保存,每個interval batch的文件命名規則基於prefix和suffix:: “prefix-TIME_IN_MS[.suffix]”.
foreachRDD(func):最通用的輸出操作,可以對從數據流中產生的每一個RDD應用函數fun。通常fun會將每個RDD中的數據保存到外部系統,如:將RDD保存到文件,或者通過網絡連接保存到數據庫。值得註意的是:fun執行在跑應用的driver進程中,並且通常會包含RDD action以促使數據流RDD開始計算。
使用foreachRDD的設計模式
dstream.foreachRDD對於開發而言提供了很大的靈活性,但在使用時也要避免很多常見的坑。我們通常將數據保存到外部系統中的流程是:建立遠程連接->通過連接傳輸數據到遠程系統->關閉連接。針對這個流程我們很直接的想到了下面的程序代碼:

dstream.foreachRDD { rdd =>
val connection = createNewConnection() // executed at the driver
rdd.foreach { record =>
connection.send(record) // executed at the worker
}
}
在上一篇文章《spark踩坑記——初試》中,對spark的worker和driver進行了整理,我們知道在集群模式下,上述代碼中的connection需要通過序列化對象的形式從driver發送到worker,但是connection是無法在機器之間傳遞的,即connection是無法序列化的,這樣可能會引起Cserialization errors (connection object not serializable)的錯誤。為了避免這種錯誤,我們將conenction在worker當中建立,代碼如下:

dstream.foreachRDD { rdd =>
rdd.foreach { record =>
val connection = createNewConnection()
connection.send(record)
connection.close()
}
}
似乎這樣問題解決了?但是細想下,我們在每個rdd的每條記錄當中都進行了connection的建立和關閉,這會導致不必要的高負荷並且降低整個系統的吞吐量。

所以一個更好的方式是使用rdd.foreachPartition即對於每一個rdd的partition建立唯一的連接(註:每個partition是內的rdd是運行在同一worker之上的),代碼如下:

dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
}
}
這樣我們降低了頻繁建立連接的負載,通常我們在連接數據庫時會使用連接池,把連接池的概念引入,代碼優化如下:

dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
通過持有一個靜態連接池對象,我們可以重復利用connection而進一步優化了連接建立的開銷,從而降低了負載。另外值得註意的是,同數據庫的連接池類似,我們這裏所說的連接池同樣應該是lazy的按需建立連接,並且及時的收回超時的連接。

另外值得註意的是:

如果在spark streaming中使用了多次foreachRDD,它們之間是按照程序順序向下執行的
Dstream對於輸出操作的執行策略是lazy的,所以如果我們在foreachRDD中不添加任何RDD action,那麽系統僅僅會接收數據然後將數據丟棄。
Spark訪問Hbase
上面我們闡述了將spark streaming的Dstream輸出到外部系統的基本設計模式,這裏我們闡述如何將Dstream輸出到Hbase集群。

Hbase通用連接類
Scala連接Hbase是通過zookeeper獲取信息,所以在配置時需要提供zookeeper的相關信息,如下:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Connection
import org.apache.hadoop.hbase.HConstants
import org.apache.hadoop.hbase.client.ConnectionFactory

object HbaseUtil extends Serializable {
private val conf = HBaseConfiguration.create()
private val para = Conf.hbaseConfig // Conf為配置類,獲取hbase的配置
conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, para.get("port").getOrElse("2181"))
conf.set(HConstants.ZOOKEEPER_QUORUM, para.get("quorum").getOrElse("127-0-0-1")) // hosts
private val connection = ConnectionFactory.createConnection(conf)

def getHbaseConn: Connection = connection
}
根據網上資料,Hbase的連接的特殊性我們並沒有使用連接池

Hbase輸出操作
我們以put操作為例,演示將上述設計模式應用到Hbase輸出操作當中:

dstream.foreachRDD(rdd => {
if (!rdd.isEmpty) {
rdd.foreachPartition(partitionRecords => {
val connection = HbaseUtil.getHbaseConn // 獲取Hbase連接
partitionRecords.foreach(data => {
val tableName = TableName.valueOf("tableName")
val t = connection.getTable(tableName)
try {
val put = new Put(Bytes.toBytes(rowKey)) // row key
// column, qualifier, value
put.addColumn(column.getBytes, qualifier.getBytes, value.getBytes)
Try(t.put(put)).getOrElse(t.close())
// do some log(顯示在worker上)
} catch {
case e: Exception =>
// log error
e.printStackTrace()
} finally {
t.close()
}
})
})
// do some log(顯示在driver上)
}
})
關於Hbase的其他操作可以參考Spark 下操作 HBase(1.0.0 新 API)

填坑記錄
重點記錄在連接Hbase過程中配置HConstants.ZOOKEEPER_QUORUM的問題:

由於Hbase的連接不能直接使用ip地址進行訪問,往往需要配置hosts,例如我在上述代碼段中127-0-0-1(任意),我們在hosts中需要配置
127-0-0-1 127.0.0.1
在單機情況下,我們只需要配置一臺zookeeper所在Hbase的hosts即可,但是當切換到Hbase集群是遇到一個詭異的bug
問題描述:在foreachRDD中將Dstream保存到Hbase時會卡住,並且沒有任何錯誤信息爆出(沒錯!它就是卡住,沒反應)

問題分析:由於Hbase集群有多臺機器,而我們只配置了一臺Hbase機器的hosts,這樣導致Spark集群在訪問Hbase時不斷的去尋找但卻找不到就卡在那裏

解決方式:對每個worker上的hosts配置了所有hbase的節點ip,問題解決

Spark訪問Mysql
同訪問Hbase類似,我們也需要有一個可序列化的類來建立Mysql連接,這裏我們利用了Mysql的C3P0連接池

MySQL通用連接類
import java.sql.Connection
import java.util.Properties

import com.mchange.v2.c3p0.ComboPooledDataSource

class MysqlPool extends Serializable {
private val cpds: ComboPooledDataSource = new ComboPooledDataSource(true)
private val conf = Conf.mysqlConfig
try {
cpds.setJdbcUrl(conf.get("url").getOrElse("jdbc:mysql://127.0.0.1:3306/test_bee?useUnicode=true&characterEncoding=UTF-8"));
cpds.setDriverClass("com.mysql.jdbc.Driver");
cpds.setUser(conf.get("username").getOrElse("root"));
cpds.setPassword(conf.get("password").getOrElse(""))
cpds.setMaxPoolSize(200)
cpds.setMinPoolSize(20)
cpds.setAcquireIncrement(5)
cpds.setMaxStatements(180)
} catch {
case e: Exception => e.printStackTrace()
}
def getConnection: Connection = {
try {
return cpds.getConnection();
} catch {
case ex: Exception =>
ex.printStackTrace()
null
}
}
}
object MysqlManager {
var mysqlManager: MysqlPool = _
def getMysqlManager: MysqlPool = {
synchronized {
if (mysqlManager == null) {
mysqlManager = new MysqlPool
}
}
mysqlManager
}
}
我們利用c3p0建立Mysql連接池,然後訪問的時候每次從連接池中取出連接用於數據傳輸。

Mysql輸出操作
同樣利用之前的foreachRDD設計模式,將Dstream輸出到mysql的代碼如下:

dstream.foreachRDD(rdd => {
if (!rdd.isEmpty) {
rdd.foreachPartition(partitionRecords => {
//從連接池中獲取一個連接
val conn = MysqlManager.getMysqlManager.getConnection
val statement = conn.createStatement
try {
conn.setAutoCommit(false)
partitionRecords.foreach(record => {
val sql = "insert into table..." // 需要執行的sql操作
statement.addBatch(sql)
})
statement.executeBatch
conn.commit
} catch {
case e: Exception =>
// do some log
} finally {
statement.close()
conn.close()
}
})
}
})
值得註意的是:

我們在提交Mysql的操作的時候,並不是每條記錄提交一次,而是采用了批量提交的形式,所以需要將conn.setAutoCommit(false),這樣可以進一步提高mysql的效率。
如果我們更新Mysql中帶索引的字段時,會導致更新速度較慢,這種情況應想辦法避免,如果不可避免,那就硬上吧(T^T)
部署
提供一下Spark連接Mysql和Hbase所需要的jar包的maven配置:


org.apache.hbase
hbase-client
1.0.0


org.apache.hbase
hbase-common
1.0.0


org.apache.hbase
hbase-server
1.0.0


mysql
mysql-connector-java
5.1.31


c3p0
c3p0
0.9.1.2

參考文獻:

[轉]Spark 踩坑記:數據庫(Hbase+Mysql)