SparkStreaming向Hbase中寫資料
阿新 • • 發佈:2019-02-20
在SparkStreaming中統計了資料之後,我們需要將結果寫入外部檔案系統。
首先,需要說一下,下面的這個方法。
foreachRDD(func)
最通用的輸出操作,把func作用於從stream生成的每一個RDD。注意:這個函式是在 執行streaming程式的driver程序 中執行的。
下面跟著思路,看一下,怎麼優雅的向Hbase中寫入資料
向外部寫資料 常見的錯誤:
向外部資料庫寫資料,通常會建立連線,使用連線傳送資料(也就是儲存資料)。
開發者可能 在driver中建立連線,而在spark worker 中儲存資料
例如:
上面這種寫法是錯誤的!上面的寫法,需要connection 物件被序列化,然後從driver傳送到worker。dstream.foreachRDD { rdd => val connection = createNewConnection() // 這個會在driver中執行 rdd.foreach { record => connection.send(record) //這個會在 worker中執行 } }
這樣的connection是很少在機器之間傳輸的。知道這個問題後,我們可以寫出以下的,修改後的程式碼:
dstream.foreachRDD { rdd =>
rdd.foreach { record =>
val connection = createNewConnection()
connection.send(record)
connection.close()
}
}
很遺憾!這種寫法也是不對的。這會導致,對於每條資料,都建立一個connection(建立connection是消耗資源的)。下面的方法會好一些:
上面的方法,使用 rdd.foreachPartition 建立一個connection 物件, 一個RDD分割槽中的所有資料,都使用這一個connection。dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => val connection = createNewConnection() partitionOfRecords.foreach(record => connection.send(record)) connection.close() } }
是不是,很機智啊~~~
事實上,還可以更機智點
在多個RDD之間,connection物件是可以重用的,所以可以建立一個連線池。如下:
dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => // ConnectionPool是一個靜態的,延遲初始化的連線池 val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => connection.send(record)) ConnectionPool.returnConnection(connection) // 返回到池中 以便別人使用 } }
注意:連線池中的連線應該是,應需求而延遲建立,並且,如果一段時間沒用,就超時了(也就是關閉該連線)
到此,SparkStreaming向外部資料庫寫資料的原理就講完了。
補充
看大家對spark連線HBase的ConnectionPool工具類有需求,在這裡在補充一下:
Hbase通用連線類
Scala連線Hbase是通過zookeeper獲取資訊,所以在配置時需要提供zookeeper的相關資訊,如下:
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Connection
import org.apache.hadoop.hbase.HConstants
import org.apache.hadoop.hbase.client.ConnectionFactory
object HbaseUtil extends Serializable {
private val conf = HBaseConfiguration.create()
private val para = Conf.hbaseConfig // Conf為配置類,獲取hbase的配置
conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, para.get("port").getOrElse("2181"))
conf.set(HConstants.ZOOKEEPER_QUORUM, para.get("quorum").getOrElse("127-0-0-1")) // hosts
private val connection = ConnectionFactory.createConnection(conf)
def getHbaseConn: Connection = connection
}
Hbase並沒有使用連線池
Hbase輸出操作
以put操作為例,將上述設計模式應用到Hbase輸出操作當中:
dstream.foreachRDD(rdd => {
if (!rdd.isEmpty) {
rdd.foreachPartition(partitionRecords => {
val connection = HbaseUtil.getHbaseConn // 獲取Hbase連線
partitionRecords.foreach(data => {
val tableName = TableName.valueOf("tableName")
val t = connection.getTable(tableName)
try {
val put = new Put(Bytes.toBytes(_rowKey_)) // row key
// column, qualifier, value
put.addColumn(_column_.getBytes, _qualifier_.getBytes, _value_.getBytes)
Try(t.put(put)).getOrElse(t.close())
// do some log(顯示在worker上)
} catch {
case e: Exception =>
// log error
e.printStackTrace()
} finally {
t.close()
}
})
})
// do some log(顯示在driver上)
}
})
關於Hbase的其他操作可以參考官方文件
重點記錄在連線Hbase過程中配置HConstants.ZOOKEEPER_QUORUM的問題:
由於Hbase的連線不能直接使用ip地址進行訪問,往往需要配置hosts,例如我在上述程式碼段中127-0-0-1(任意),我們在hosts中需要配置
127-0-0-1 127.0.0.1
在單機情況下,我們只需要配置一臺zookeeper所在Hbase的hosts即可,但是當切換到Hbase叢集是遇到一個詭異的bug
問題描述:在foreachRDD中將Dstream儲存到Hbase時會卡住,並且沒有任何錯誤資訊爆出(沒錯!它就是卡住,沒反應)
問題分析:由於Hbase叢集有多臺機器,而我們只配置了一臺Hbase機器的hosts,這樣導致Spark叢集在訪問Hbase時不斷的去尋找但卻找不到就卡在那裡
解決方式:對每個worker上的hosts配置了所有hbase的節點ip,問題解決
MySQL通用連線類
import java.sql.Connection
import java.util.Properties
import com.mchange.v2.c3p0.ComboPooledDataSource
class MysqlPool extends Serializable {
private val cpds: ComboPooledDataSource = new ComboPooledDataSource(true)
private val conf = Conf.mysqlConfig
try {
cpds.setJdbcUrl(conf.get("url").getOrElse("jdbc:mysql://127.0.0.1:3306/test_bee?useUnicode=true&characterEncoding=UTF-8"));
cpds.setDriverClass("com.mysql.jdbc.Driver");
cpds.setUser(conf.get("username").getOrElse("root"));
cpds.setPassword(conf.get("password").getOrElse(""))
cpds.setMaxPoolSize(200)
cpds.setMinPoolSize(20)
cpds.setAcquireIncrement(5)
cpds.setMaxStatements(180)
} catch {
case e: Exception => e.printStackTrace()
}
def getConnection: Connection = {
try {
return cpds.getConnection();
} catch {
case ex: Exception =>
ex.printStackTrace()
null
}
}
}
object MysqlManager {
var mysqlManager: MysqlPool = _
def getMysqlManager: MysqlPool = {
synchronized {
if (mysqlManager == null) {
mysqlManager = new MysqlPool
}
}
mysqlManager
}
}
我們利用c3p0建立Mysql連線池,然後訪問的時候每次從連線池中取出連線用於資料傳輸。
Mysql輸出操作
同樣利用之前的foreachRDD設計模式,將Dstream輸出到mysql的程式碼如下:
dstream.foreachRDD(rdd => {
if (!rdd.isEmpty) {
rdd.foreachPartition(partitionRecords => {
//從連線池中獲取一個連線
val conn = MysqlManager.getMysqlManager.getConnection
val statement = conn.createStatement
try {
conn.setAutoCommit(false)
partitionRecords.foreach(record => {
val sql = "insert into table..." // 需要執行的sql操作
statement.addBatch(sql)
})
statement.executeBatch
conn.commit
} catch {
case e: Exception =>
// do some log
} finally {
statement.close()
conn.close()
}
})
}
})
值得注意的是:我們在提交Mysql的操作的時候,並不是每條記錄提交一次,而是採用了批量提交的形式,所以需要將conn.setAutoCommit(false),這樣可以進一步提高mysql的效率。
如果我們更新Mysql中帶索引的欄位時,會導致更新速度較慢,這種情況應想辦法避免,如果不可避免,那就硬上吧(T^T)
pom依賴
提供一下Spark連線Mysql和Hbase所需要的jar包的maven配置:
<dependency><!-- Hbase -->
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.0.0</version>
</dependency>
<dependency><!-- Mysql -->
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.31</version>
</dependency>
<dependency>
<groupId>c3p0</groupId>
<artifactId>c3p0</artifactId>
<version>0.9.1.2</version>
</dependency>