Apache Flink 零基礎入門(十一)Flink transformation
前面講了常用的DataSource的用法,DataSource其實是把資料載入進來,載入進來之後就需要做Transformation操作了。
Data transformations transform one or more DataSets into a new DataSet. Programs can combine multiple transformations into sophisticated assemblies.
資料轉化可以將一個或多個DataSets轉化到一個新的DataSet。就是一個演算法的綜合使用。
Map Function
Scala
新建一個Object
object DataSetTransformationApp {
def main(args: Array[String]): Unit = {
val environment = ExecutionEnvironment.getExecutionEnvironment
}
def mapFunction(env: ExecutionEnvironment): Unit = {
val data = env.fromCollection(List(1,2,3,4,5,6,7,8,9,10))
}
}
這裡的資料來源是一個1到10的list集合。Map的原理是:假設data資料集中有N個元素,將每一個元素進行轉化:
data.map { x => x.toInt }
好比:y=f(x)
// 對data中的每一個元素都去做一個+1操作
data.map((x:Int) => x + 1 ).print()
然後對每一個元素都做一個+1操作。
簡單寫法:
如果這個裡面只有一個元素,就可以直接寫成下面形式:
data.map((x) => x + 1).print()
更簡潔的寫法:
data.map(x => x + 1).print()
更簡潔的方法:
data.map(_ + 1).print()
Java
public static void main(String[] args) throws Exception { ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment(); mapFunction(executionEnvironment); } public static void mapFunction(ExecutionEnvironment executionEnvironment) throws Exception { List<String> list = new ArrayList<>(); for (int i = 1; i <= 10; i++) { list.add(i + ""); } DataSource<String> data = executionEnvironment.fromCollection(list); data.map(new MapFunction<String, Integer>() { public Integer map(String input) { return Integer.parseInt(input) + 1; } }).print(); }
因為我們定義的List是一個String的泛型,因此MapFunction的泛型是<String, Integer>,第一個引數表示輸入的型別,第二個引數表示輸出是一個Integer型別。
Filter Function
將每個元素執行+1操作,並取出大於5的元素。
Scala
def filterFunction(env: ExecutionEnvironment): Unit = {
val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
data.map(_ + 1).filter(_ > 5).print()
}
filter只會返回滿足條件的記錄。
Java
public static void filterFunction(ExecutionEnvironment env) throws Exception {
List<Integer> list = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
list.add(i);
}
DataSource<Integer> data = env.fromCollection(list);
data.map(new MapFunction<Integer, Integer>() {
public Integer map(Integer input) {
return input + 1;
}
}).filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer input) throws Exception {
return input > 5;
}
}).print();
}
MapPartition Function
map function 與 MapPartition function有什麼區別?
需求:DataSource 中有100個元素,把結果儲存在資料庫中
如果使用map function ,那麼實現方法如下:
// DataSource 中有100個元素,把結果儲存在資料庫中
def mapPartitionFunction(env: ExecutionEnvironment): Unit = {
val students = new ListBuffer[String]
for (i <- 1 to 100) {
students.append("Student" + i)
}
val data = env.fromCollection(students)
data.map(x=>{
// 每一個元素要儲存到資料庫中去,肯定需要先獲取到connection
val connection = DBUtils.getConnection()
println(connection + " ... ")
// TODO .... 儲存資料到DB
DBUtils.returnConnection(connection)
}).print()
}
列印結果,將會列印100個獲取DBUtils.getConnection()的請求。如果資料量增多,顯然不停的獲取連線是不現實的。
因此MapPartition就應運而生了,轉換一個分割槽裡面的資料,也就是說一個分割槽中的資料呼叫一次。
因此要首先設定分割槽:
val data = env.fromCollection(students).setParallelism(4)
設定4個分割槽,也就是並行度,然後使用mapPartition來處理:
data.mapPartition(x => {
val connection = DBUtils.getConnection()
println(connection + " ... ")
// TODO .... 儲存資料到DB
DBUtils.returnConnection(connection)
x
}).print()
那麼就會的到4次連線請求,每一個分割槽獲取一個connection。
Java
public static void mapPartitionFunction(ExecutionEnvironment env) throws Exception {
List<String> list = new ArrayList<>();
for (int i = 1; i <= 100; i++) {
list.add("student:" + i);
}
DataSource<String> data = env.fromCollection(list);
/*data.map(new MapFunction<String, String>() {
@Override
public String map(String input) throws Exception {
String connection = DBUtils.getConnection();
System.out.println("connection = [" + connection + "]");
DBUtils.returnConnection(connection);
return input;
}
}).print();*/
data.mapPartition(new MapPartitionFunction<String, Object>() {
@Override
public void mapPartition(Iterable<String> values, Collector<Object> out) throws Exception {
String connection = DBUtils.getConnection();
System.out.println("connection = [" + connection + "]");
DBUtils.returnConnection(connection);
}
}).print();
}
first groupBy sortGroup
first表示獲取前幾個,groupBy表示分組,sortGroup表示分組內排序
def firstFunction(env:ExecutionEnvironment): Unit = {
val info = ListBuffer[(Int, String)]()
info.append((1, "hadoop"))
info.append((1, "spark"))
info.append((1, "flink"))
info.append((2, "java"))
info.append((2, "springboot"))
info.append((3, "linux"))
info.append((4, "vue"))
val data = env.fromCollection(info)
data.first(3).print()
data.groupBy(0).first(2).print()
data.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print()
}