1. 程式人生 > 實用技巧 >Flink SQL 1.11 on Zeppelin 平臺化實踐

Flink SQL 1.11 on Zeppelin 平臺化實踐

簡介:鑑於有很多企業都無法配備專門的團隊來解決 Flink SQL 平臺化的問題,那麼到底有沒有一個開源的、開箱即用的、功能相對完善的元件呢?答案就是本文的主角——Apache Zeppelin。

作者:LittleMagic

大資料領域 SQL 化開發的風潮方興未艾(所謂"Everybody knows SQL"),Flink 自然也不能“免俗”。Flink SQL 是 Flink 系統內部最高級別的 API,也是流批一體思想的集大成者。使用者可以通過簡單明瞭的 SQL 語句像查表一樣執行流任務或批任務,遮蔽了底層 DataStream/DataSet API 的複雜細節,降低了使用門檻。

但是,Flink SQL 的預設開發方式是通過 Java/Scala API 編寫,與純 SQL 化、平臺化的目標相去甚遠。目前官方提供的 Flink SQL Client 僅能在配備 Flink 客戶端的本地使用,侷限性很大。而 Ververica 開源的 Flink SQL Gateway 元件是基於 REST API 的,仍然需要二次開發才能供給上層使用,並不是很方便。

鑑於有很多企業都無法配備專門的團隊來解決 Flink SQL 平臺化的問題,那麼到底有沒有一個開源的、開箱即用的、功能相對完善的元件呢?答案就是本文的主角——Apache Zeppelin。

Flink SQL on Zeppelin!

Zeppelin 是基於 Web 的互動式資料分析筆記本,支援 SQL、Scala、Python 等語言。Zeppelin 通過外掛化的 Interpreter(直譯器)來解析使用者提交的程式碼,並將其轉化到對應的後端(計算框架、資料庫等)執行,靈活性很高。其架構簡圖如下所示。

Flink Interpreter 就是 Zeppelin 原生支援的眾多 Interpreters 之一。只要配置好 Flink Interpreter 以及相關的執行環境,我們就可以將 Zeppelin 用作 Flink SQL 作業的開發平臺了(當然,Scala 和 Python 也是沒問題的)。接下來本文就逐步介紹 Flink on Zeppelin 的整合方法。

配置 Zeppelin

目前 Zeppelin 的最新版本是 0.9.0-preview2,可以在官網下載包含所有 Interpreters 的 zeppelin-0.9.0-preview2-bin-all.tgz,並解壓到伺服器的合適位置。

接下來進入 conf 目錄。將環境配置檔案 zeppelin-env.sh.template 更名為 zeppelin-env.sh,並修改:

# JDK目錄
export JAVA_HOME=/opt/jdk1.8.0_172
# 方便之後配置Interpreter on YARN模式。注意必須安裝Hadoop,且hadoop必須配置在系統環境變數PATH中
export USE_HADOOP=true
# Hadoop配置檔案目錄
export HADOOP_CONF_DIR=/etc/hadoop/hadoop-conf

將服務配置檔案 zeppelin-site.xml.template 更名為 zeppelin-site.xml,並修改:

<!-- 服務地址。預設為127.0.0.1,改為0.0.0.0使得可以在外部訪問 -->
<property>
  <name>zeppelin.server.addr</name>
  <value>0.0.0.0</value>
  <description>Server binding address</description>
</property>

<!-- 服務埠。預設為8080,如果已佔用,可以修改之 -->
<property>
  <name>zeppelin.server.port</name>
  <value>18080</value>
  <description>Server port.</description>
</property>

最基礎的配置就完成了。執行 bin/zeppelin-daemon.sh start 命令,返回 Zeppelin start [ OK ]的提示之後,訪問<伺服器地址>:18080,出現下面的頁面,就表示 Zeppelin 服務啟動成功。

當然,為了一步到位適應生產環境,也可以適當修改 zeppelin-site.xml 中的以下引數:

<!-- 將Notebook repo更改為HDFS儲存 -->
<property>
  <name>zeppelin.notebook.storage</name>
  <value>org.apache.zeppelin.notebook.repo.FileSystemNotebookRepo</value>
  <description>Hadoop compatible file system notebook persistence layer implementation, such as local file system, hdfs, azure wasb, s3 and etc.</description>
</property>

<!-- Notebook在HDFS上的儲存路徑 -->
<property>
  <name>zeppelin.notebook.dir</name>
  <value>/zeppelin/notebook</value>
  <description>path or URI for notebook persist</description>
</property>

<!-- 啟用Zeppelin的恢復功能。當Zeppelin服務掛掉並重啟之後,能連線到原來執行的Interpreter -->
<property>
  <name>zeppelin.recovery.storage.class</name>
  <value>org.apache.zeppelin.interpreter.recovery.FileSystemRecoveryStorage</value>
  <description>ReoveryStorage implementation based on hadoop FileSystem</description>
</property>

<!-- Zeppelin恢復元資料在HDFS上的儲存路徑 -->
<property>
  <name>zeppelin.recovery.dir</name>
  <value>/zeppelin/recovery</value>
  <description>Location where recovery metadata is stored</description>
</property>

<!-- 禁止使用匿名使用者 -->
<property>
  <name>zeppelin.anonymous.allowed</name>
  <value>true</value>
  <description>Anonymous user allowed by default</description>
</property>

Zeppelin 集成了 Shiro 實現許可權管理。禁止使用匿名使用者之後,可以在 conf 目錄下的 shiro.ini 中配置使用者名稱、密碼、角色等,不再贅述。注意每次修改配置都需要執行 bin/zeppelin-daemon.sh restart 重啟 Zeppelin 服務。

配置 Flink Interpreter on YARN

在使用 Flink Interpreter 之前,我們有必要對它進行配置,使 Flink 作業和 Interpreter 本身在 YARN 環境中執行。

點選首頁使用者名稱區域選單中的 Interpreter 項(上一節圖中已經示出),搜尋 Flink,就可以看到引數列表。

Interpreter Binding

首先,將 Interpreter Binding 模式修改為 Isolated per Note,如下圖所示。

在這種模式下,每個 Note 在執行時會分別啟動 Interpreter 程序,類似於 Flink on YARN 的 Per-job 模式,最符合生產環境的需要。

Flink on YARN 引數

以下是需要修改的部分基礎引數。注意這些引數也可以在 Note 中指定,每個作業自己的配置會覆蓋掉這裡的預設配置。

  • FLINK_HOME:Flink 1.11所在的目錄;
  • HADOOP_CONF_DIR:Hadoop 配置檔案所在的目錄;
  • flink.execution.mode:Flink 作業的執行模式,指定為 YARN 以啟用 Flink on YARN;
  • flink.jm.memory:JobManager 的記憶體量(MB);
  • flink.tm.memory:TaskManager 的記憶體量(MB);
  • flink.tm.slot:TaskManager 的 Slot 數;
  • flink.yarn.appName:YARN Application 的預設名稱;
  • flink.yarn.queue:提交作業的預設 YARN 佇列。

Hive Integration 引數

如果我們想訪問 Hive 資料,以及用 HiveCatalog 管理 Flink SQL 的元資料,還需要配置與 Hive 的整合。

  • HIVE_CONF_DIR:Hive 配置檔案(hive-site.xml)所在的目錄;
  • zeppelin.flink.enableHive:設為 true 以啟用 Hive Integration;
  • zeppelin.flink.hive.version:Hive 版本號。
  • 複製與 Hive Integration 相關的依賴到 $FLINK_HOME/lib 目錄下,包括:
  • flink-connector-hive_2.11-1.11.0.jar
  • flink-hadoop-compatibility_2.11-1.11.0.jar
  • hive-exec-..jar
  • 如果 Hive 版本是1.x,還需要額外加入 hive-metastore-1.*.jar、libfb303-0.9.2.jar 和 libthrift-0.9.2.jar
  • 保證 Hive 元資料服務(Metastore)啟動。注意不能是 Embedded 模式,即必須以外部資料庫(MySQL、Postgres等)作為元資料儲存。

Interpreter on YARN 引數

在預設情況下,Interpreter 程序是在部署 Zeppelin 服務的節點上啟動的。隨著提交的任務越來越多,就會出現單點問題。因此我們需要讓 Interpreter 也在 YARN 上執行,如下圖所示。

  • zeppelin.interpreter.yarn.resource.cores:Interpreter Container 佔用的vCore 數量;
  • zeppelin.interpreter.yarn.resource.memory:Interpreter Container 佔用的記憶體量(MB);
  • zeppelin.interpreter.yarn.queue:Interpreter 所處的 YARN 佇列名稱。

配置完成之後,Flink on Zeppelin 整合完畢,可以測試一下了。

測試 Flink SQL on Zeppelin

建立一個 Note,Interpreter 指定為 Flink。然後寫入第一個 Paragraph:

以 %flink.conf 標記的 Paragraph 用於指定這個 Note 中的作業配置,支援 Flink 的所有配置引數(參見 Flink 官網)。另外,flink.execution.packages 引數支援以 Maven GAV 座標的方式引入外部依賴項。

接下來建立第二個 Paragraph,建立 Kafka 流表:

%flink.ssql 表示利用 StreamTableEnvironment 執行流處理 SQL,相對地,%flink.bsql 表示利用 BatchTableEnvironment 執行批處理 SQL。注意表引數中的 properties.bootstrap.servers 利用了 Zeppelin Credentials 來填寫,方便不同作業之間複用。

執行上述 SQL 之後會輸出資訊:

同時在 Hive 中可以看到該表的元資料。

最後寫第三個 Paragraph,從流表中查詢,並實時展現出來:

點選右上角的 FLINK JOB 標記,可以開啟作業的 Web UI。上述作業的 JobGraph 如下。

除 SELECT 查詢外,通過 Zeppelin 也可以執行 INSERT 查詢,實現更加豐富的功能。關於 Flink SQL on Zeppelin 的更多應用,筆者在今後的文章中會繼續講解。

原文連結
本文為阿里雲原創內容,未經允許不得轉載。