flink中文文件-快速開始 安裝部署 v1.4
阿新 • • 發佈:2019-01-06
Flink入門及實戰-上:
Flink入門及實戰-下:
- 下載啟動flink
- 檢視程式碼
- 執行例子
- 下一步
下載啟動flink
flink可以在Linux, Mac OS X, 和Windows平臺上執行。為了執行flink,只需要安裝JAVA7.x(或者更高版本)。windows使用者,請點選此連結檢視相關文件。
你可以使用下面命令檢查安裝的java版本
java -version
如果你已經安裝了java8,你將會看到下面的資料。
java version "1.8.0_111" Java(TM) SE Runtime Environment (build 1.8.0_111-b14) Java HotSpot(TM) 64-Bit Server VM (build 25.111-b14, mixed mode)
下面以在linux上安裝為例(mac上安裝也可以參考這個):
- 點此連結下載flink安裝包。你可以選擇任何hadoop/scala的組合。如果你計劃使用本地檔案系統(安裝本地叢集),那麼你選擇任何hadoop版本對應的flink都可以。如果是生產環境,那麼建議根據你叢集上的hadoop版本選擇對應的flink版本。
- 進入檔案的下載目錄
- 解壓檔案
$ cd ~/Downloads # 進入檔案的下載目錄
$ tar xzf flink-*.tgz # 解壓下載的壓縮包
$ cd flink-1.4.1
安裝本地flink叢集
./bin/start-local.sh # 啟動 Flink 叢集
你也可以在log日誌目錄中檢查系統執行情況
$ tail log/flink-*-jobmanager-*.log
INFO ... - Starting JobManager
INFO ... - Starting JobManager web frontend
INFO ... - Web frontend listening at 127.0.0.1:8081
INFO ... - Registered TaskManager at 127.0.0.1 (akka://flink/user/taskmanager)
檢視程式碼
你可以在github上發現SocketWindowWordCount
scala程式碼
object SocketWindowWordCount {
def main(args: Array[String]) : Unit = {
// port 表示需要連線的埠
val port: Int = try {
ParameterTool.fromArgs(args).getInt("port")
} catch {
case e: Exception => {
System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
return
}
}
// 獲取執行環境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 連線此socket獲取輸入資料
val text = env.socketTextStream("localhost", port, '\n')
// 解析資料, 分組, 視窗化, 並且聚合求SUM
import org.apache.flink.api.scala._ //需要加上這一行隱式轉換 否則在呼叫flatmap方法的時候會報錯
val windowCounts = text
.flatMap { w => w.split("\\s") }
.map { w => WordWithCount(w, 1) }
.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(1))
.sum("count")
// 使用一個單執行緒列印結果
windowCounts.print().setParallelism(1)
env.execute("Socket Window WordCount")
}
// 定義一個數據型別儲存單詞出現的次數
case class WordWithCount(word: String, count: Long)
}
java程式碼
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {
// port 表示需要連線的埠
final int port;
try {
final ParameterTool params = ParameterTool.fromArgs(args);
port = params.getInt("port");
} catch (Exception e) {
System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
return;
}
// 獲取執行環境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 連線此socket獲取輸入資料
DataStream<String> text = env.socketTextStream("localhost", port, "\n");
// 解析資料, 分組, 視窗化, 並且聚合求SUM
DataStream<WordWithCount> windowCounts = text
.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String value, Collector<WordWithCount> out) {
for (String word : value.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(1))
.reduce(new ReduceFunction<WordWithCount>() {
@Override
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.word, a.count + b.count);
}
});
// 使用一個單執行緒列印結果
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
}
// 定義一個數據型別儲存單詞出現的次數
public static class WordWithCount {
public String word;
public long count;
public WordWithCount() {}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return word + " : " + count;
}
}
}
執行這個例子
現在,我們將要執行這個flink例子。它將會從socket獲取資料,並且每隔5秒列印一次計算的單詞出現的次數。
- 首先,我們使用netcat啟動一個本地socket
$ nc -l 9000
- 提交flink程式
這個程式連線到socket,然後等待資料。你可以通過webui介面檢視job的執行情況$ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000 Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123 Using address 127.0.0.1:6123 to connect to JobManager. JobManager web interface address http://127.0.0.1:8081 Starting execution of program Submitting job with JobID: 574a10c8debda3dccd0c78a3bde55e1b. Waiting for job completion. Connected to JobManager at Actor[akka.tcp://[email protected]:6123/user/jobmanager#297388688] 11/04/2016 14:04:50 Job execution switched to status RUNNING. 11/04/2016 14:04:50 Source: Socket Stream -> Flat Map(1/1) switched to SCHEDULED 11/04/2016 14:04:50 Source: Socket Stream -> Flat Map(1/1) switched to DEPLOYING 11/04/2016 14:04:50 Fast TumblingProcessingTimeWindows(5000) of WindowedStream.main(SocketWindowWordCount.java:79) -> Sink: Unnamed(1/1) switched to SCHEDULED 11/04/2016 14:04:51 Fast TumblingProcessingTimeWindows(5000) of WindowedStream.main(SocketWindowWordCount.java:79) -> Sink: Unnamed(1/1) switched to DEPLOYING 11/04/2016 14:04:51 Fast TumblingProcessingTimeWindows(5000) of WindowedStream.main(SocketWindowWordCount.java:79) -> Sink: Unnamed(1/1) switched to RUNNING 11/04/2016 14:04:51 Source: Socket Stream -> Flat Map(1/1) switched to RUNNING
- 每5秒計算一次單詞,並且列印到控制檯。監控taskmanager的日誌檔案輸出,並且在nc控制檯輸入一些內容,每一行輸入完成以後需要輸入回車。
$ nc -l 9000
lorem ipsum
ipsum ipsum ipsum
bye
這個.out檔案將會打印出來在指定時間內單詞出現的次數
$ tail -f log/flink-*-taskmanager-*.out
lorem : 1
bye : 1
ipsum : 4
實驗結束,停止flink。
$ ./bin/stop-local.sh
下一步
檢視更多例子來熟悉flink程式的api。當你已經做完這些的時候,繼續讀下面的流處理指南
獲取更多大資料資料,視訊以及技術交流請加群:
QQ群號1:295505811(已滿)
QQ群號2:54902210
QQ群號3:555684318