spark之DataFrame分析日誌檔案
阿新 • • 發佈:2019-01-29
場景:我們利用DataFrame對日誌中出現的錯誤次數進行一個統計。
一,準備日誌檔案:
我這裡是使用的hadoop的日誌檔案,因為以前配置檔案沒有配好,所有每次啟動hadoop會有錯誤的資訊,記錄在日誌檔案。
二,書寫流程:
1,讀取日誌檔案,將檔案轉化成RDD。 2,將日誌檔案通過map函式將資料轉化行的格式返回。 3,建立元型別, 即建立schema,為RDD轉化為DataFrame提供格式。 4,根據元資料型別將JavaRDD<Row>轉化成DataFrame 5,使用過濾器篩選錯誤資訊。 6,輸出錯誤資訊統計次數。
三,程式碼展示:
import java.util.ArrayList; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; public class CountErrors2 { public static void main(String[] args) { SparkConf conf=new SparkConf(); conf.set("spark.testing.memory", "2147480000"); JavaSparkContext sc=new JavaSparkContext("local","CountErrors2",conf); System.out.println("建立連線成功:"+conf); //讀取日誌檔案 JavaRDD<String> textFile=sc.textFile("hdfs://192.168.61.128:9000/spark001/hadoop.log"); //將日誌檔案通過map函式將資料轉化行的格式返回 JavaRDD<Row> rowRDD=textFile.map(new Function<String,Row>(){ public Row call(String line) throws Exception { return RowFactory.create(line); }}); //StructField 結構化檔案格式 List<StructField> fileds=new ArrayList<StructField>(); //建立一個結構化檔案,三個引數 分別是 name 值的型別 是否設定為表 fileds.add(DataTypes.createStructField("line", DataTypes.StringType, true)); // 建立元型別, 即建立schema StructType schema=DataTypes.createStructType(fileds); SQLContext sqlContext=new org.apache.spark.sql.SQLContext(sc); //根據元資料型別將JavaRDD<Row>轉化成DataFrame DataFrame df=sqlContext.createDataFrame(rowRDD, schema); //過濾檔案 DataFrame errors=df.filter(df.col("line").like("%ERROR%")); //統計出現次數 long result=errors.count(); System.out.println("log檔案總共記錄有:"+result+":次出錯!"); } }