1. 程式人生 > 其它 >iOS給H5傳值JSON中的轉義字元\\\被自動移除問題

iOS給H5傳值JSON中的轉義字元\\\被自動移除問題

一、前言

最近幾天因為工作比較忙,已經幾天沒有及時更新文章了,在這裡先給小夥伴們說聲抱歉…臨近週末,再忙再累,我也要開始發力了。接下來的幾天,菌哥將為大家帶來關於FlinkSQL的教程,之後還會更新一些大資料實時數倉的內容,和一些熱門的元件使用!希望小夥伴們能點個關注,第一時間關注技術乾貨!


二、FlinkSQL出現的背景

Flink SQL 是 Flink 實時計算為簡化計算模型,降低使用者使用實時計算門檻而設計的一套符合標準 SQL 語義的開發語言。

自 2015 年開始,阿里巴巴開始調研開源流計算引擎,最終決定基於 Flink 打造新一代計算引擎,針對 Flink 存在的不足進行優化和改進,並且在 2019 年初將最終程式碼開源,也就是我們熟知的 Blink。Blink 在原來的 Flink 基礎上最顯著的一個貢獻就是 Flink SQL 的實現。

Flink SQL 是面向使用者的 API 層,在我們傳統的流式計算領域,比如Storm、Spark Streaming 都會提供一些 Function 或者 Datastream API,使用者通過 Java 或 Scala 寫業務邏輯,這種方式雖然靈活,但有一些不足,比如具備一定門檻且調優較難,隨著版本的不斷更新,API 也出現了很多不相容的地方。


在這個背景下,毫無疑問,SQL 就成了我們最佳選擇,之所以選擇將 SQL 作為核心 API,是因為其具有幾個非常重要的特點:

  • SQL 屬於設定式語言,使用者只要表達清楚需求即可,不需要了解具體做法;
  • SQL 可優化,內建多種查詢優化器,這些查詢優化器可為 SQL 翻譯出最優執行計劃;
  • SQL 易於理解,不同行業和領域的人都懂,學習成本較低;
  • SQL 非常穩定,在資料庫 30 多年的歷史中,SQL 本身變化較少;
  • 流與批的統一,Flink 底層 Runtime 本身就是一個流與批統一的引擎,而 SQL 可以做到 API 層的流與批統一。

三、整體介紹

3.1 什麼是 Table API 和 Flink SQL?

Flink本身是批流統一的處理框架,所以Table API和SQL,就是批流統一的上層處理API。目前功能尚未完善,處於活躍的開發階段。

Table API是一套內嵌在Java和Scala語言中的查詢API,它允許我們以非常直觀的方式,組合來自一些關係運算符的查詢(比如select、filter和join)。而對於Flink SQL,就是直接可以在程式碼中寫SQL,來實現一些查詢(Query)操作。Flink的SQL支援,基於實現了SQL標準的Apache Calcite(Apache開源SQL解析工具)。

無論輸入是批輸入還是流式輸入,在這兩套API中,指定的查詢都具有相同的語義,得到相同的結果。

3.2 需要引入的依賴

Table API 和 SQL 需要引入的依賴有兩個:plannerbridge

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
    <version>1.10.0</version>
</dependency>

其中:

flink-table-planner:planner計劃器,是table API最主要的部分,提供了執行時環境和生成程式執行計劃的planner

flink-table-api-scala-bridge:bridge橋接器,主要負責table API和 DataStream/DataSet API的連線支援,按照語言分java和scala

這裡的兩個依賴,是IDE環境下執行需要新增的;如果是生產環境,lib目錄下預設已經有了planner,就只需要有bridge就可以了。

當然,如果想使用使用者自定義函式,或是跟 kafka 做連線,需要有一個SQL client,這個包含在flink-table-common裡。

3.3 兩種planner(old & blink)的區別

1、批流統一:Blink將批處理作業,視為流式處理的特殊情況。所以,blink不支援表和DataSet之間的轉換,批處理作業將不轉換為DataSet應用程式,而是跟流處理一樣,轉換為DataStream程式來處理。

2、因為批流統一,Blink planner也不支援BatchTableSource,而使用有界的StreamTableSource代替。

3、Blink planner只支援全新的目錄,不支援已棄用的ExternalCatalog。

4、舊 planner 和 Blink planner 的FilterableTableSource實現不相容。舊的planner會把PlannerExpressions下推到filterableTableSource中,而blink planner則會把Expressions下推。

5、基於字串的鍵值配置選項僅適用於Blink planner。

6、PlannerConfig在兩個planner中的實現不同。

7、Blink planner會將多個sink優化在一個DAG中(僅在TableEnvironment上受支援,而在StreamTableEnvironment上不受支援)。而舊 planner 的優化總是將每一個sink放在一個新的DAG中,其中所有DAG彼此獨立。

8、舊的planner不支援目錄統計,http://jintianxuesha.com/ 而Blink planner支援。

四、API 呼叫

4.1 基本程式結構

Table API 和 SQL 的程式結構,與流式處理的程式結構類似;也可以近似地認為有這麼幾步:首先建立執行環境,然後定義source、transform和sink。

具體操作流程如下:

val tableEnv = ...     // 建立表的執行環境

// 建立一張表,用於讀取資料
tableEnv.connect(...).createTemporaryTable("inputTable")
// 註冊一張表,用於把計算結果輸出
tableEnv.connect(...).createTemporaryTable("outputTable")

// 通過 Table API 查詢運算元,得到一張結果表
val result = tableEnv.from("inputTable").select(...)
// 通過 SQL查詢語句,得到一張結果表
val sqlResult  = tableEnv.sqlQuery("SELECT ... FROM inputTable ...")

// 將結果表寫入輸出表中
result.insertInto("outputTable")

4.2 建立表環境

建立表環境最簡單的方式,就是基於流處理執行環境,調create方法直接建立:

val tableEnv = StreamTableEnvironment.create(env)

表環境(TableEnvironment)是flink中整合 Table API & SQL 的核心概念。它負責:

  • 註冊catalog
  • 在內部 catalog 中登錄檔
  • 執行 SQL 查詢
  • 註冊使用者自定義函式
  • 將 DataStream 或 DataSet 轉換為表
  • 儲存對 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用

在建立TableEnv的時候,可以多傳入一個EnvironmentSettings 或者 TableConfig 引數,可以用來配置 TableEnvironment 的一些特性。

比如,配置老版本的流式查詢(Flink-Streaming-Query):

val settings = EnvironmentSettings.newInstance()
  .useOldPlanner()      // 使用老版本planner
  .inStreamingMode()    // 流處理模式
  .build()
val tableEnv = StreamTableEnvironment.create(env, settings)

基於老版本的批處理環境(Flink-Batch-Query):

val batchEnv = ExecutionEnvironment.getExecutionEnvironment
val batchTableEnv = BatchTableEnvironment.create(batchEnv)

基於 blink 版本的流處理環境(Blink-Streaming-Query):

val bsSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode().build()
val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)

基於blink版本的批處理環境(Blink-Batch-Query):

val bbSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inBatchMode().build()
val bbTableEnv = TableEnvironment.create(bbSettings)

4.3 在Catalog中登錄檔

4.3.1 表(Table)的概念

TableEnvironment 可以註冊目錄 Catalog ,並可以基於Catalog登錄檔。它會維護一個 Catalog-Table 表之間的map。

表(Table)是由一個“識別符號”來指定的,由3部分組成:Catalog名、資料庫(database)名和物件名(表名)。如果沒有指定目錄或資料庫,就使用當前的預設值。

表可以是常規的(Table,表),或者虛擬的(View,檢視)。常規表(Table)一般可以用來描述外部資料,比如檔案、資料庫表或訊息佇列的資料,也可以直接從 DataStream轉換而來。檢視可以從現有的表中建立,通常是 table API 或者SQL查詢的一個結果。

4.3.2 連線到檔案系統(Csv格式)

連線外部系統在Catalog中登錄檔,直接呼叫tableEnv.connect()就可以,裡面引數要傳入一個 ConnectorDescriptor ,也就是connector描述器。對於檔案系統的 connector 而言,flink內部已經提供了,就叫做FileSystem()。

程式碼如下:

tableEnv
.connect( new FileSystem().path("sensor.txt"))  // 定義表資料來源,外部連線
  .withFormat(new OldCsv())    // 定義從外部系統讀取資料之後的格式化方法
  .withSchema( new Schema()
    .field("id", DataTypes.STRING())
    .field("timestamp", DataTypes.BIGINT())
    .field("temperature", DataTypes.DOUBLE())
  )    // 定義表結構
  .createTemporaryTable("inputTable")    // 建立臨時表

這是舊版本的csv格式描述器。由於它是非標的,跟外部系統對接並不通用,所以將被棄用,以後會被一個符合RFC-4180標準的新format描述器取代。新的描述器就叫Csv(),但flink沒有直接提供,需要引入依賴flink-csv:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>1.10.0</version>
</dependency>

程式碼非常類似,只需要把 withFormat 裡的 OldCsv 改成Csv就可以了。

4.3.3 連線到Kafka

kafka的聯結器 flink-kafka-connector 中,1.10 版本的已經提供了 Table API 的支援。我們可以在 connect方法中直接傳入一個叫做Kafka的類,這就是kafka聯結器的描述器ConnectorDescriptor。

tableEnv.connect(
  new Kafka()
    .version("0.11") // 定義kafka的版本
    .topic("sensor") // 定義主題
    .property("zookeeper.connect", "localhost:2181") 
    .property("bootstrap.servers", "localhost:9092")
)
  .withFormat(new Csv())
  .withSchema(new Schema()
  .field("id", DataTypes.STRING())
  .field("timestamp", DataTypes.BIGINT())
  .field("temperature", DataTypes.DOUBLE())
)
  .createTemporaryTable("kafkaInputTable")

當然也可以連線到 ElasticSearch、MySql、HBase、Hive等外部系統,實現方式基本上是類似的。感興趣的 小夥伴可以自行去研究,這裡就不詳細贅述了。

4.4 表的查詢

通過上面的學習,我們已經利用外部系統的聯結器connector,我們可以讀寫資料,並在環境的Catalog中登錄檔。接下來就可以對錶做查詢轉換了。

Flink給我們提供了兩種查詢方式:Table API和 SQL。

4.4.1 Table API的呼叫

Table API是整合在Scala和Java語言內的查詢API。與SQL不同,Table API的查詢不會用字串表示,而是在宿主語言中一步一步呼叫完成的。

Table API基於代表一張“表”的Table類,並提供一整套操作處理的方法API。這些方法會返回一個新的Table物件,這個物件就表示對輸入表應用轉換操作的結果。有些關係型轉換操作,可以由多個方法呼叫組成,構成鏈式呼叫結構。例如table.select(…).filter(…),其中 select(…)表示選擇表中指定的欄位,filter(…)表示篩選條件。

程式碼中的實現如下:

val sensorTable: Table = tableEnv.from("inputTable")

val resultTable: Table = senorTable
.select("id, temperature")
.filter("id ='sensor_1'")

4.4.2 SQL查詢

Flink的SQL整合,基於的是ApacheCalcite,它實現了SQL標準。在Flink中,用常規字串來定義SQL查詢語句。SQL 查詢的結果,是一個新的 Table。

程式碼實現如下:

val resultSqlTable: Table = tableEnv.sqlQuery("select id, temperature from inputTable where id ='sensor_1'")

或者:

val resultSqlTable: Table = tableEnv.sqlQuery(
  """
    |select id, temperature
    |from inputTable
    |where id = 'sensor_1'
  """.stripMargin)

當然,也可以加上聚合操作,比如我們統計每個sensor溫度資料出現的個數,做個count統計:

val aggResultTable = sensorTable
.groupBy('id)
.select('id, 'id.count as 'count)

SQL的實現:

val aggResultSqlTable = tableEnv.sqlQuery("select id, count(id) as cnt from inputTable group by id")

這裡Table API裡指定的欄位,前面加了一個單引號’,這是Table API中定義的Expression型別的寫法,可以很方便地表示一個表中的欄位。

欄位可以直接全部用雙引號引起來,也可以用半邊單引號+欄位名的方式。以後的程式碼中,一般都用後一種形式。

4.5 將DataStream 轉換成表

Flink允許我們把Table和DataStream做轉換:我們可以基於一個DataStream,先流式地讀取資料來源,然後map成樣例類,再把它轉成Table。Table的列欄位(column fields),就是樣例類裡的欄位,這樣就不用再麻煩地定義schema了。

4.5.1 程式碼表達

程式碼中實現非常簡單,直接用 tableEnv.fromDataStream() 就可以了。預設轉換後的 Table schema 和 DataStream 中的欄位定義一一對應,也可以單獨指定出來。

這就允許我們更換欄位的順序、重新命名,或者只選取某些欄位出來,相當於做了一次map操作(或者Table API的 select操作)。

程式碼具體如下:

val inputStream: DataStream[String] = env.readTextFile("sensor.txt")
val dataStream: DataStream[SensorReading] = inputStream
  .map(data => {
   
   
    val dataArray = data.split(",")
    SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
  })

val sensorTable: Table = tableEnv.fromDataStreama(datStream)

val sensorTable2 = tableEnv.fromDataStream(dataStream, 'id, 'timestamp as 'ts)

4.5.2 資料型別與 Table schema的對應

在上節的例子中,DataStream 中的資料型別,與表的 Schema 之間的對應關係,是按照樣例類中的欄位名來對應的(name-based mapping),所以還可以用as做重新命名。

另外一種對應方式是,直接按照欄位的位置來對應(position-based mapping),對應的過程中,就可以直接指定新的欄位名了。

基於名稱的對應:

val sensorTable = tableEnv.fromDataStream(dataStream, 'timestamp as 'ts, 'id as 'myId, 'temperature)

基於位置的對應:

val sensorTable = tableEnv.fromDataStream(dataStream, 'myId, 'ts)

Flink的 DataStream 和 DataSet API 支援多種型別。

組合型別,比如元組(內建Scala和Java元組)、POJO、Scala case類和Flink的Row型別等,允許具有多個欄位的巢狀資料結構,這些欄位可以在Table的表示式中訪問。其他型別,則被視為原子型別。

元組型別和原子型別,一般用位置對應會好一些;如果非要用名稱對應,也是可以的:元組型別,預設的名稱是 “_1”, “_2”;而原子型別,預設名稱是 ”f0”。

4.6 建立臨時檢視(Temporary View)

建立臨時檢視的第一種方式,就是直接從DataStream轉換而來。同樣,可以直接對應欄位轉換;也可以在轉換的時候,指定相應的欄位。

程式碼如下:

tableEnv.createTemporaryView("sensorView", dataStream)
tableEnv.createTemporaryView("sensorView", dataStream, 'id, 'temperature, 'timestamp as 'ts)

另外,當然還可以基於Table建立檢視:

tableEnv.createTemporaryView("sensorView", sensorTable)

View和Table的Schema完全相同。事實上,在Table API中,可以認為View 和 Table 是等價的。

4.7 輸出表

表的輸出,是通過將資料寫入 TableSink 來實現的。TableSink 是一個通用介面,可以支援不同的檔案格式、儲存資料庫和訊息佇列。

具體實現,輸出表最直接的方法,就是通過 Table.insertInto() 方法將一個 Table 寫入註冊過的 TableSink 中。

4.7.1 輸出到檔案

程式碼如下:

// 註冊輸出表
tableEnv.connect(
  new FileSystem().path("…\\resources\\out.txt")
) // 定義到檔案系統的連線
  .withFormat(new Csv()) // 定義格式化方法,Csv格式
  .withSchema(new Schema()
  .field("id", DataTypes.STRING())
  .field("temp", DataTypes.DOUBLE())
) // 定義表結構
  .createTemporaryTable("outputTable") // 建立臨時表

resultSqlTable.insertInto("outputTable")

4.7.2 更新模式(Update Mode)

在流處理過程中,表的處理並不像傳統定義的那樣簡單。

對於流式查詢(Streaming Queries),需要宣告如何在(動態)表和外部聯結器之間執行轉換。與外部系統交換的訊息型別,由更新模式(update mode)指定。

Flink Table API中的更新模式有以下三種:

  • 追加模式(Append Mode)

在追加模式下,表(動態表)和外部聯結器只交換插入(Insert)訊息。

  • 撤回模式(Retract Mode)

在撤回模式下,表和外部聯結器交換的是:新增(Add)和撤回(Retract)訊息。

其中:

  • 插入(Insert)會被編碼為新增訊息;
  • 刪除(Delete)則編碼為撤回訊息;
  • 更新(Update)則會編碼為,已更新行(上一行)的撤回訊息,和更新行(新行)的新增訊息。

在此模式下,不能定義key,這一點跟upsert模式完全不同。

  • Upsert(更新插入)模式

在Upsert模式下,動態表和外部聯結器交換Upsert和Delete訊息。

這個模式需要一個唯一的key,通過這個key可以傳遞更新訊息。為了正確應用訊息,外部聯結器需要知道這個唯一key的屬性。

  • 插入(Insert)和更新(Update)都被編碼為Upsert訊息;
  • 刪除(Delete)編碼為Delete資訊

這種模式和 Retract 模式的主要區別在於,Update操作是用單個訊息編碼的,所以效率會更高。

4.7.3 輸出到Kafka

除了輸出到檔案,也可以輸出到Kafka。我們可以結合前面Kafka作為輸入資料,構建資料管道,kafka進,kafka出。

程式碼如下:

// 輸出到 kafka
tableEnv.connect(
  new Kafka()
    .version("0.11")
    .topic("sinkTest")
    .property("zookeeper.connect", "localhost:2181")
    .property("bootstrap.servers", "localhost:9092")
)
  .withFormat( new Csv() )
  .withSchema( new Schema()
    .field("id", DataTypes.STRING())
    .field("temp", DataTypes.DOUBLE())
  )
  .createTemporaryTable("kafkaOutputTable")

resultTable.insertInto("kafkaOutputTable")

4.7.4 輸出到ElasticSearch

ElasticSearch的connector可以在upsert(update+insert,更新插入)模式下操作,這樣就可以使用Query定義的鍵(key)與外部系統交換UPSERT/DELETE訊息。

另外,對於“僅追加”(append-only)的查詢,connector還可以在 append 模式下操作,這樣就可以與外部系統只交換 insert 訊息。

es目前支援的資料格式,只有Json,而 flink 本身並沒有對應的支援,所以還需要引入依賴:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.10.0</version>
</dependency>

程式碼實現如下:

// 輸出到es
tableEnv.connect(
  new Elasticsearch()
    .version("6")
    .host("localhost", 9200, "http")
    .index("sensor")
    .documentType("temp")
)
  .inUpsertMode()           // 指定是 Upsert 模式
  .withFormat(new Json())
  .withSchema( new Schema()
    .field("id", DataTypes.STRING())
    .field("count", DataTypes.BIGINT())
  )
  .createTemporaryTable("esOutputTable")

aggResultTable.insertInto("esOutputTable")

4.7.5 輸出到MySql

Flink專門為Table API的jdbc連線提供了flink-jdbc聯結器,我們需要先引入依賴:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_2.11</artifactId>
    <version>1.10.0</version>
</dependency>

jdbc連線的程式碼實現比較特殊,因為沒有對應的java/scala類實現ConnectorDescriptor,所以不能直接tableEnv.connect()。不過Flink SQL留下了執行DDL的介面:tableEnv.sqlUpdate()

對於jdbc的建立表操作,天生就適合直接寫DDL來實現,所以我們的程式碼可以這樣寫:

// 輸出到 Mysql
val sinkDDL: String =
  """
    |create table jdbcOutputTable (
    |  id varchar(20) not null,
    |  cnt bigint not null
    |) with (
    |  'connector.type' = 'jdbc',
    |  'connector.url' = 'jdbc:mysql://localhost:3306/test',
    |  'connector.table' = 'sensor_count',
    |  'connector.driver' = 'com.mysql.jdbc.Driver',
    |  'connector.username' = 'root',
    |  'connector.password' = '123456'
    |)
  """.stripMargin

tableEnv.sqlUpdate(sinkDDL)
aggResultSqlTable.insertInto("jdbcOutputTable")

4.7.6 將錶轉換成DataStream

表可以轉換為DataStream或DataSet。這樣,自定義流處理或批處理程式就可以繼續在 Table API或SQL查詢的結果上運行了。

將錶轉換為DataStream或DataSet時,需要指定生成的資料型別,即要將表的每一行轉換成的資料型別。通常,最方便的轉換型別就是Row。當然,因為結果的所有欄位型別都是明確的,我們也經常會用元組型別來表示。

表作為流式查詢的結果,是動態更新的。所以,將這種動態查詢轉換成的資料流,同樣需要對錶的更新操作進行編碼,進而有不同的轉換模式。

Table API 中表到 DataStream 有兩種模式:

  • 追加模式(Append Mode)

用於表只會被插入(Insert)操作更改的場景

  • 撤回模式(Retract Mode)

用於任何場景。有些類似於更新模式中Retract模式,它只有 Insert 和 Delete 兩類操作。

得到的資料會增加一個Boolean型別的標識位(返回的第一個欄位),用它來表示到底是新增的資料(Insert),還是被刪除的資料(老資料, Delete)

程式碼實現如下:

val resultStream: DataStream[Row] = tableEnv.toAppendStream[Row](resultTable)

val aggResultStream: DataStream[(Boolean, (String, Long))] = 
tableEnv.toRetractStream[(String, Long)](aggResultTable)

resultStream.print("result")
aggResultStream.print("aggResult")

所以,沒有經過groupby之類聚合操作,可以直接用 toAppendStream 來轉換;而如果經過了聚合,有更新操作,一般就必須用 toRetractDstream。

4.7.7 Query的解釋和執行

Table API提供了一種機制來解釋(Explain)計算表的邏輯和優化查詢計劃。這是通過TableEnvironment.explain(table)方法或TableEnvironment.explain()方法完成的。

explain方法會返回一個字串,描述三個計劃:

  • 未優化的邏輯查詢計劃
  • 優化後的邏輯查詢計劃
  • 實際執行計劃

我們可以在程式碼中檢視執行計劃:

val explaination: String = tableEnv.explain(resultTable)
println(explaination)

Query的解釋和執行過程,老planner和 blink planner 大體是一致的,又有所不同。整體來講,Query都會表示成一個邏輯查詢計劃,然後分兩步解釋:

  1. 優化查詢計劃
  2. 解釋成 DataStream 或者 DataSet程式

而 Blink 版本是批流統一的,所以所有的Query,只會被解釋成DataStream程式;另外在批處理環境 TableEnvironment 下,Blink版本要到tableEnv.execute()執行呼叫才開始解釋。

巨人的肩膀

1、http://www.atguigu.com/
2、https://www.bilibili.com/video/BV12k4y1z7LM?from=search&seid=953051020130358915
3、https://blog.csdn.net/u013411339/article/details/93267838

小結

本篇文章主要用五千多字,為大家帶來迅速入門並掌握 FlinkSQL 的技巧,包含FlinkSQL出現的背景介紹以及與 Table API 的區別,API呼叫方式更是介紹的非常詳細全面,希望小夥伴們在看了之後能夠及時複習總結,尤其是初學者。好了,本篇文章 over,大家看了之後有任何的疑惑都可以私信作者,我看到都會一一解答。下一篇我會在本篇的基礎上為大家介紹一些流處理中的特殊概念,敬請期待|ू・ω・` ),你知道的越多,你不知道的也越多,我是Alice,我們下一期見!