
Kafka -- Structured Streaming -- MySQL example

Tags: kafka, spark, mysql

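Requirements

Read data from Kafka, process it with Structured Streaming, then save the result to MySQL.
Each record is split on ::, only the rows containing Comedy are kept, and those rows are saved to MySQL.

The Kafka data has the following format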

1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
5::Father of the Bride Part II (1995)::Comedy
6::Heat (1995)::Action|Crime|Thriller
7::Sabrina (1995)::Comedy|Romance
8::Tom and Huck (1995)::Adventure|Children's
9::Sudden Death (1995)::Action
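
The split-and-filter rule can be tried on plain strings before wiring it into Spark. The following is only a minimal sketch: the object name MovieParser and the helpers parseMovie and isGenre are hypothetical names introduced here for illustration, not part of the original code. Unlike a bare arr(0), arr(1), arr(2) lookup, it returns None for a line that does not have exactly three fields instead of throwing.

```scala
object MovieParser {
  // Hypothetical helper: split one raw record into (id, title, genres).
  // Returns None when the line does not have exactly three "::"-separated fields.
  def parseMovie(line: String): Option[(String, String, String)] =
    line.trim.split("::", -1) match {
      case Array(id, title, genres) => Some((id, title, genres))
      case _                        => None
    }

  // True when the genre field (third element) contains the given genre name.
  def isGenre(genre: String)(movie: (String, String, String)): Boolean =
    movie._3.contains(genre)

  def main(args: Array[String]): Unit = {
    val sample = Seq(
      "1::Toy Story (1995)::Animation|Children's|Comedy",
      "2::Jumanji (1995)::Adventure|Children's|Fantasy",
      "3::Grumpier Old Men (1995)::Comedy|Romance",
      "not a valid record"
    )
    // Malformed lines are dropped by flatMap; non-Comedy rows by filter.
    val comedies = sample.flatMap(line => parseMovie(line)).filter(isGenre("Comedy"))
    comedies.foreach(println)
  }
}
```

With the sample above, only records 1 and 3 survive. In a Dataset[String] pipeline the same function could be applied with flatMap, so that a malformed message does not stop the streaming query.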

Example

Start the Kafka producer

kafka-console-producer.sh \
--broker-list mypc01:9092,mypc02:9092,mypc03:9092 \
--topic pet

The code is as follows

import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row, SparkSession}

import java.sql.{Connection, DriverManager, PreparedStatement}

object MysqlSink extends App {
  // Build the SparkSession
  private val session: SparkSession = SparkSession.builder()
    .master("local[2]")
    .appName("test")
    .getOrCreate()

  // Consume data from Kafka
  private val df: DataFrame = session.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "mypc01:9092,mypc02:9092,mypc03:9092")
    .option("subscribe", "pet")
    .load()

  import session.implicits._

  // Cast the binary Kafka value to a String
  private val ds: Dataset[String] = df.selectExpr("cast(value as String)").as[String]

  // Split each record into a tuple (assumes every record has at least three fields)
  private val ds2: Dataset[(String, String, String)] = ds.map(x => {
    val arr: Array[String] = x.split("::")
    (arr(0), arr(1), arr(2))
  })

  // Keep only the rows whose third field contains Comedy, then convert to a DataFrame
  private val ds3: DataFrame = ds2.filter(_._3.contains("Comedy")).toDF

  ds3.writeStream
    .foreach(new myWriter)
    .start()
    .awaitTermination()
}

class myWriter extends ForeachWriter[Row]() {
  var statement: PreparedStatement = _
  var connection: Connection = _

  // Connect to MySQL
  override def open(partitionId: Long, version: Long): Boolean = {
    Class.forName("com.mysql.jdbc.Driver")
    connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/db2", "root", "123456")
    statement = connection.prepareStatement("insert into movie values (?,?,?)")
    true
  }

  // Insert one row into the database
  override def process(value: Row): Unit = {
    // Read the Row fields and bind them to the statement parameters
    statement.setString(1, value.getString(0))
    statement.setString(2, value.getString(1))
    statement.setString(3, value.getString(2))
    // Execute the insert
    statement.execute()
  }

  override def close(errorOrNull: Throwable): Unit = {
    // Close the statement and the connection; either may be null if open() failed
    if (statement != null) statement.close()
    if (connection != null) connection.close()
  }
}

Sample result
[Image: result screenshot]

Notes

  • The MySQL table must already exist; the code will not create it for you
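
One way to prepare the table ahead of time is a small JDBC helper run once before starting the streaming job. This is a sketch under assumptions: the original only shows that movie receives three string values per row, so the column names (id, title, genres) and the VARCHAR lengths here are hypothetical, as is the object name CreateMovieTable. The connection URL and credentials are the ones used in the example code.

```scala
import java.sql.{Connection, DriverManager}

object CreateMovieTable {
  // Assumed schema: three string columns matching the three inserted values.
  // Column names and lengths are illustrative, not taken from the original.
  val ddl: String =
    """CREATE TABLE IF NOT EXISTS movie (
      |  id     VARCHAR(32),
      |  title  VARCHAR(255),
      |  genres VARCHAR(255)
      |)""".stripMargin

  // Run the DDL on an existing connection and always release the statement.
  def createTable(connection: Connection): Unit = {
    val statement = connection.createStatement()
    try {
      statement.executeUpdate(ddl)
      ()
    } finally statement.close()
  }

  def main(args: Array[String]): Unit = {
    // Needs the MySQL JDBC driver on the classpath and a reachable db2 database.
    val connection =
      DriverManager.getConnection("jdbc:mysql://localhost:3306/db2", "root", "123456")
    try createTable(connection)
    finally connection.close()
  }
}
```

Because the statement uses IF NOT EXISTS, running it again against a database that already has the table leaves the existing table untouched.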