1. 程式人生 > >Spark Java API 計算 Levenshtein 距離

Spark Java API 計算 Levenshtein 距離

clust 數據保存 repr apache pan 哈哈 tell 實現 header

Spark Java API 計算 Levenshtein 距離

在上一篇文章中,完成了Spark開發環境的搭建,最終的目標是對用戶昵稱信息做聚類分析,找出違規的昵稱。聚類分析需要一個距離,用來衡量兩個昵稱之間的相似度。這裏采用levenshtein距離。現在就來開始第一個小目標,用Spark JAVA API 計算字符串之間的Levenshtein距離。

1. 數據準備

樣本數據如下:

{"name":"Michael", "nick":"Mich","age":50}
{"name":"Andy", "nick":"Anc","age":30}
{"name":"Anch", "nick":"MmAc","age":19}

把數據保存成文件並上傳到hdfs上:./bin/hdfs dfs -put levestein.json /user/panda

2. 代碼實現

定義一個類表示樣本數據:

    public static class User{
        private String name;
        private String nick;
        private int age;

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getNick() {
            return nick;
        }

        public void setNick(String nick) {
            this.nick = nick;
        }

        public int getAge() {
            return age;
        }

        public void setAge(int age) {
            this.age = age;
        }
    }

創建SparkSession

SparkSession sparkSession = SparkSession.builder()
                .appName("levenshtein example")
                .master("spark://172.25.129.170:7077")
                .config("spark.some.config.option", "some-value")
                .getOrCreate();

在Spark命令行./bin/pyspark啟動Spark時,會默認創建一個名稱為 spark 的SparkSession。而這裏是寫代碼,也需要創建SparkSession對象。

The SparkSession instance is the way Spark executes user-defined
manipulations across the cluster. There is a one-to-one correspondence between a SparkSession and
a Spark Application.

定義數據類型

Encoder<User> userEncoder = Encoders.bean(User.class);

JAVA裏面定義了一套數據類型,比如java.util.String是字符串類型;類似地,Spark也有自己的數據類型,因此Encoder就定義了如何將Java對象映射成Spark裏面的對象。

Used to convert a JVM object of type T to and from the internal Spark SQL representation.

To efficiently support domain-specific objects, an Encoder is required. The encoder maps the domain specific type T to Spark‘s internal type system. For example, given a class Person with two fields, name (string) and age (int), an encoder is used to tell Spark to generate code at runtime to serialize the Person object into a binary structure. This binary structure often has much lower memory footprint as well as are optimized for efficiency in data processing (e.g. in a columnar format). To understand the internal binary representation for data, use the schema function.

構建Dataset:

Dataset<User> userDataset = sparkSession.read().json(path).as(userEncoder);

說明一下Dataset與DataFrame區別,Dataset是針對Scala和JAVA特有的。Dataset是有類型的,Dataset的每一行是某種類型的數據,比如上面的User類型。

A Dataset is a strongly typed collection of domain-specific objects that can be transformed in parallel using functional or relational operations. Each Dataset also has an untyped view called a DataFrame, which is a Dataset of Row.

而DataFrame的每一行的類型是Row(看官方文檔,我就這樣理解了,哈哈。。)

DataFrame is represented by a Dataset of Row。While, in Java API, users need to use Dataset<Row> to represent a DataFrame.

技術分享圖片

這個圖就很好地解釋了DataFrame和Dataset的區別。

計算levenshtein距離,將之 transform 成一個新DataFrame中:

Column lev_res = functions.levenshtein(userDataset.col("name"), userDataset.col("nick"));
Dataset<Row> leveDataFrame = userDataset.withColumn("distance", lev_res);

完整代碼

import org.apache.spark.sql.*;

public class LevenstenDistance {
    public static class User{
        private String name;
        private String nick;
        private int age;

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getNick() {
            return nick;
        }

        public void setNick(String nick) {
            this.nick = nick;
        }

        public int getAge() {
            return age;
        }

        public void setAge(int age) {
            this.age = age;
        }
    }

    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder()
                .appName("levenshtein example")
                .master("spark://172.25.129.170:7077")
                .config("spark.some.config.option", "some-value")
                .getOrCreate();
        String path = "hdfs://172.25.129.170:9000/user/panda/levestein.json";
        Encoder<User> userEncoder = Encoders.bean(User.class);
        Dataset<User> userDataset = sparkSession.read().json(path).as(userEncoder);
        userDataset.show();

        Column lev_res = functions.levenshtein(userDataset.col("name"), userDataset.col("nick"));
        Dataset<Row> leveDataFrame = userDataset.withColumn("distance", lev_res);
//        userDataset.show();
        leveDataFrame.show();
        System.out.println(lev_res.toString());
    }
}

原來的Dataset:

技術分享圖片

計算Levenshtein距離後的得到的DataFrame:

技術分享圖片

根據上面的示例,下面來演示一下一個更實際點的例子:計算昵稱和簽名之間的levenshtein距離,若levenshtein距離相同,就代表該用戶的 昵稱 和 簽名 是相同的:

數據格式如下:

{"nick":"賴求","uid":123456}
{"details":"時尚是一種態度,時尚第一品牌。看我的。","nick":"冰冷世家@蹦蹦","signature":"輕裝時代看我的。艾萊依時尚羽絨服。。","uid":123456}
{"nick":"[潗團軍-6]明 明『招 募』","signature":"我是來擂人的,擂死人不償命!","uid":123456}

  1. 加載數據

            Dataset<Row> dataset = spark.read().format("json")
                    .option("header", "false")
                    .load("hdfs://172.25.129.170:9000/user/panda/profile_noempty.json");

    ?

  2. 取出昵稱和簽名

            //空字符串 與 null 是不同的
            Dataset<Row> nickSign = dataset.filter(col("nick").isNotNull())
                    .filter(col("signature").isNotNull())
                    .select(col("nick"), col("signature"), col("uid"));

    ?

  3. 計算昵稱和簽名的Levenshtein距離

    Column lev_distance = functions.levenshtein(nickSign.col("nick"), nickSign.col("signature"));
            Dataset<Row> nickSignDistance = nickSign.withColumn("distance", lev_distance);

    ?

  4. 按距離進行過濾

    Dataset<Row> sameNickSign = nickSignDistance.filter("distance = 0");

    ?

這樣就能找出昵稱和簽名完全一樣的用戶了。

原文:https://www.cnblogs.com/hapjin/p/9954191.html

Spark Java API 計算 Levenshtein 距離