程式人生 > Java、Spark 實現黑名單過濾

Java、Spark 實現黑名單過濾

/**
 * java,spark實現黑名單過濾
 */
public class BlackListFilter {

    public static void main(String[] args){
        SparkConf conf = new SparkConf().setAppName("Simple Application").setMaster("local[2]");

        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> javaRDD = sc.textFile("F:
\\text\\url.txt"); //黑名單 List<Tuple2<String, Boolean>> blackList = Arrays.asList(new Tuple2<String, Boolean>("a", true), new Tuple2<String, Boolean>("b", true), new Tuple2<String, Boolean>("c", true)); //list => JavaPairRDD
JavaPairRDD<String, Boolean> blackListRDD = sc.parallelizePairs(blackList); //lines => words => (word, 1) JavaPairRDD<String, Integer> wordsAndCount = javaRDD .flatMap(new FlatMapFunction<String, String>() { @Override
public Iterator<String> call(String s) throws Exception { return Arrays.asList(s.split("\\s+")).iterator(); } }).mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<String, Integer>(s, 1); } }); //(word, 1) leftOutJoin (word, true) => (word, (1, Option)) JavaPairRDD<String, Tuple2<Integer, Optional<Boolean>>> leftOuterJoin = wordsAndCount.leftOuterJoin(blackListRDD); //(word, (1, Option)) => filter => (word, (1, option = false)) => map => word JavaRDD<String> whiteList = leftOuterJoin.filter(new Function<Tuple2<String, Tuple2<Integer, Optional<Boolean>>>, Boolean>() { @Override public Boolean call(Tuple2<String, Tuple2<Integer, Optional<Boolean>>> t) throws Exception { return t._2._2.orElse(false) ? false : true; } }).map(new Function<Tuple2<String, Tuple2<Integer, Optional<Boolean>>>, String>() { @Override public String call(Tuple2<String, Tuple2<Integer, Optional<Boolean>>> t) throws Exception { return t._1; } }); System.out.println(whiteList.collect()); } }