Kafka--Structured Streaming--MySQL example
阿新 • Published: 2020-12-10
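Requirements
Read data from Kafka, process it with Structured Streaming, and then save it to MySQL.
Split each record on ::, keep only the rows whose genres contain Comedy, and store them in MySQL.
The Kafka data looks like this: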
1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
5::Father of the Bride Part II (1995)::Comedy
6::Heat (1995)::Action|Crime|Thriller
7::Sabrina (1995)::Comedy|Romance
8::Tom and Huck (1995)::Adventure|Children's
9::Sudden Death (1995)::Action
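The split-and-filter rule can be tried on plain strings before Kafka or Spark are involved. This is a minimal sketch in plain Scala; `ParseDemo` and `parseComedy` are hypothetical names, and it assumes that malformed lines (anything that does not split into exactly three `::`-separated fields) should simply be dropped.

```scala
object ParseDemo {
  // Hypothetical helper: split one record on "::" and keep it only if the
  // genre field (the third field) contains "Comedy". Malformed lines yield None.
  def parseComedy(line: String): Option[(String, String, String)] = {
    val arr = line.split("::")
    if (arr.length == 3 && arr(2).contains("Comedy")) Some((arr(0), arr(1), arr(2)))
    else None
  }

  def main(args: Array[String]): Unit = {
    val sample = Seq(
      "1::Toy Story (1995)::Animation|Children's|Comedy",
      "2::Jumanji (1995)::Adventure|Children's|Fantasy",
      "3::Grumpier Old Men (1995)::Comedy|Romance",
      "6::Heat (1995)::Action|Crime|Thriller"
    )
    // flatMap discards the None results, leaving only the Comedy rows
    sample.flatMap(line => parseComedy(line)).foreach(println)
  }
}
```

Running `main` should print only the Toy Story and Grumpier Old Men tuples; the same logic is what the streaming job applies to each Kafka record.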
Example
Start the Kafka console producer
kafka-console-producer.sh \
  --broker-list mypc01:9092,mypc02:9092,mypc03:9092 \
  --topic pet
The code is as follows:
import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row, SparkSession}
import java.sql.{Connection, DriverManager, PreparedStatement}

// Spark recommends defining main() instead of extending scala.App
object MysqlSink {
  def main(args: Array[String]): Unit = {
    // Build the SparkSession
    val session: SparkSession = SparkSession.builder()
      .master("local[2]")
      .appName("test")
      .getOrCreate()
    import session.implicits._

    // Consume data from Kafka (requires the spark-sql-kafka-0-10 package on the classpath)
    val df: DataFrame = session.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "mypc01:9092,mypc02:9092,mypc03:9092")
      .option("subscribe", "pet")
      .load()

    // Kafka delivers the value as binary; cast it to String
    val ds: Dataset[String] = df.selectExpr("cast(value as string)").as[String]

    // Split each line on "::" into a tuple; skip malformed lines
    // (e.g. an empty line typed into the console producer) instead of failing the query
    val ds2: Dataset[(String, String, String)] = ds.flatMap { x =>
      val arr: Array[String] = x.split("::")
      if (arr.length == 3) Seq((arr(0), arr(1), arr(2))) else Seq.empty
    }

    // Keep only the rows whose genres contain Comedy and convert to a DataFrame
    val ds3: DataFrame = ds2.filter(_._3.contains("Comedy")).toDF()

    ds3.writeStream
      .foreach(new myWriter)
      .start()
      .awaitTermination()
  }
}
class myWriter extends ForeachWriter[Row] {
  var connection: Connection = _
  var statement: PreparedStatement = _

  // Connect to MySQL
  override def open(partitionId: Long, version: Long): Boolean = {
    // Connector/J 5.x driver class; Connector/J 8.x renames it to com.mysql.cj.jdbc.Driver
    Class.forName("com.mysql.jdbc.Driver")
    connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/db2", "root", "123456")
    statement = connection.prepareStatement("insert into movie values (?,?,?)")
    true
  }

  // Insert one row into the database
  override def process(value: Row): Unit = {
    // Read the Row's fields and bind them to the MySQL parameters
    statement.setString(1, value.getString(0))
    statement.setString(2, value.getString(1))
    statement.setString(3, value.getString(2))
    // Execute the insert
    statement.execute()
  }

  override def close(errorOrNull: Throwable): Unit = {
    // Release the statement and the connection (the null checks are defensive)
    if (statement != null) statement.close()
    if (connection != null) connection.close()
  }
}
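Spark drives a `ForeachWriter` through a fixed lifecycle: for each partition of each trigger it calls `open`, calls `process` once per row only if `open` returned `true`, and then calls `close` whenever `open` returned without throwing (whatever its return value), passing along any error raised by `process`. The sketch below imitates that contract in plain Scala so it can run without a cluster. `MiniWriter`, `runPartition` and `LoggingWriter` are hypothetical stand-ins, not Spark APIs, and error handling is simplified.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical mirror of ForeachWriter's three callbacks (not the Spark class).
trait MiniWriter[T] {
  def open(partitionId: Long, epochId: Long): Boolean
  def process(value: T): Unit
  def close(errorOrNull: Throwable): Unit
}

object LifecycleDemo {
  // Hypothetical driver imitating how Spark invokes a writer for one partition.
  def runPartition[T](writer: MiniWriter[T], partitionId: Long, epochId: Long, rows: Seq[T]): Unit = {
    val opened = writer.open(partitionId, epochId)
    var error: Throwable = null
    try {
      // process is only invoked when open returned true
      if (opened) rows.foreach(writer.process)
    } catch {
      case t: Throwable => error = t
    } finally {
      // close always follows an open that returned, and receives any error from process
      writer.close(error)
    }
    if (error != null) throw error
  }

  // Stand-in for the MySQL writer: records each callback instead of issuing SQL.
  class LoggingWriter(accept: Boolean) extends MiniWriter[String] {
    val log: ListBuffer[String] = ListBuffer.empty[String]
    def open(partitionId: Long, epochId: Long): Boolean = {
      log += s"open($partitionId,$epochId)"
      accept
    }
    def process(value: String): Unit = log += s"process($value)"
    def close(errorOrNull: Throwable): Unit = {
      val err = if (errorOrNull == null) "null" else errorOrNull.getMessage
      log += s"close($err)"
    }
  }

  def main(args: Array[String]): Unit = {
    val w = new LoggingWriter(accept = true)
    runPartition(w, 0L, 1L, Seq("Toy Story (1995)", "Sabrina (1995)"))
    println(w.log.mkString(" -> "))
  }
}
```

Running `main` prints `open(0,1) -> process(Toy Story (1995)) -> process(Sabrina (1995)) -> close(null)`. This is also why the JDBC connection belongs in `close`: it is the callback that reliably follows a successful `open`.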
Sample result
Notes
- The MySQL table must exist beforehand; the code does not create it for you.
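Since the table has to be created in advance, here is a minimal sketch that creates it over JDBC. The column names `id`, `title`, `genres` and their `VARCHAR` sizes are assumptions; the only constraint implied by the code is that `movie` has exactly three string-compatible columns in that order, because the insert uses positional `values (?,?,?)`. The URL and credentials are copied from `myWriter`, and running `main` assumes a reachable MySQL server with Connector/J on the classpath.

```scala
import java.sql.{Connection, DriverManager}

object CreateMovieTable {
  // Hypothetical schema: three columns in the order used by
  // "insert into movie values (?,?,?)". Names and sizes are assumptions.
  val ddl: String =
    """CREATE TABLE IF NOT EXISTS movie (
      |  id     VARCHAR(16),
      |  title  VARCHAR(255),
      |  genres VARCHAR(255)
      |)""".stripMargin

  // Runs the DDL on an existing connection and closes the statement afterwards.
  def create(connection: Connection): Unit = {
    val stmt = connection.createStatement()
    try stmt.execute(ddl) finally stmt.close()
  }

  def main(args: Array[String]): Unit = {
    // Same URL and credentials as myWriter; requires a running MySQL server.
    val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/db2", "root", "123456")
    try create(connection) finally connection.close()
  }
}
```

`IF NOT EXISTS` makes the sketch safe to run more than once; no primary key is declared, so rows replayed after a restart would be inserted again rather than rejected.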