1. 程式人生 > >upsert在mysql中的實現(附spark應用

upsert在mysql中的實現(附spark應用

upsert概述以及在mysql中的實現

  upsert是update和insert的合體,這裡暫時不對其具體的語義進行探討,簡單對其做一個定義,基本功能為:存在時更新,不存在時插入,簡單的解釋就是,當某種條件成立時使用update,條件不成立時使用insert。

  在很多場景下,我們少不了使用upsert功能,比如資料增量處理,商品狀態更新等,而在mysql中並沒有這樣的一種語句,如果我們先查詢是否存在,再選擇使用update或者insert,這樣不僅比較複雜而且還容易造成事務上的衝突,不過後來發現有方式實現這種需求,那就是ON DUPLICATE KEY UPDATE

舉個栗子:

INSERT
INTO table (a,b,c) VALUES (1,2,3) ON DUPLICATE KEY UPDATE c=c+1;

ON DUPLICATE KEY UPDATE需要配合UNIQUE索引PRIMARY KEY使用。

  這句sql的基本解釋為,當UNIQUE索引PRIMARY KEY的欄位在表中不存在時,行作為新記錄被插入,如果欄位存在,則執行ON DUPLICATE KEY UPDATE後面的邏輯,這裡就是c=c+1。

  這裡我們可以將欄位a設定為唯一索引或者主鍵,還可以將a,b設定為聯合唯一索引,就能實現upsert的需求,具體如何應用還是要看具體的場景和需求。

spark寫入mysql使用upsert

  一般存入mysql中的資料都是結構化的資料,這裡我推薦先將RDD轉為dataframe,再進行資料庫方面的操作,因為同為結構化的資料轉換起來比較簡單,而且只需要很小的改動就能得實現我們的需求。

而一般的dataframe寫入mysql中一般是使用的

dataFrame.write().mode("append").jdbc(url, "table", mysqlProperties)

  下面過一遍原始碼,首先呼叫的dataframe的write()生成DataFrameWriter,然後再配置mode,我們這裡選用的是append,接下來來到了jdbc()中建立createConnectionFactory()以及檢查表是否存在,如果不存在則從dataframe中取出schema來建立表,然後呼叫的是saveTable(),最後使用的是savePartition()來進行資料型別對映以及資料的commit,這裡執行語句是在insertStatement()中處理的。 write.png

model.png

jdbc.png

saveTable.png

savePartition.png

insertStatement.png

  而在上面我們的mysql upsert實現中,主要還是通過對執行語句的處理來實現,也就是說我們只需要在呼叫鏈的函式中加上一個引數,並且改動一下insertStatement(),就能實現我們想要的需求,在這裡我是這樣操作的:

  def insertStatement(conn: Connection, table: String, rddSchema: StructType, accumulateColumns: Array[String]): PreparedStatement = {
    val columns = rddSchema.fields.map(_.name).mkString(",")
    val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
    var sql = s"INSERT INTO $table ($columns) VALUES ($placeholders)"
    if (!accumulateColumns.equals(null)) {
      val finalMap = accumulateColumns.map(x=>x+ "=" + "values(" + x + ")+" + x).mkString(",")
      sql = s"INSERT INTO $table ($columns) VALUES ($placeholders) ON DUPLICATE KEY UPDATE $finalMap"
    }
    conn.prepareStatement(sql)
  }

  累加的列為accumulateColumns,如果我們要累加的列為c,最後的字串會變成

INSERT INTO table (a,b,c) VALUES (?,?,?) ON DUPLICATE KEY UPDATE c=VALUES(c)+c

  最後我們編譯完之後打包,最後呼叫寫入mysql的程式碼為:

dataFrame.write().mode("append").jdbc(url, "table", mysqlProperties,accumulateColumns)

總結

  我們這裡的業務邏輯比較簡單,只是簡單增量的處理,更加複雜的邏輯還是需要個性化的定製,在這裡提供一個簡單的思路,希望對大家有所幫助。