java,spark實現黑名單過濾
阿新 • 發佈:2018-11-30
/** * java,spark實現黑名單過濾 */ public class BlackListFilter { public static void main(String[] args){ SparkConf conf = new SparkConf().setAppName("Simple Application").setMaster("local[2]"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> javaRDD = sc.textFile("F:\\text\\url.txt"); //黑名單 List<Tuple2<String, Boolean>> blackList = Arrays.asList(new Tuple2<String, Boolean>("a", true), new Tuple2<String, Boolean>("b", true), new Tuple2<String, Boolean>("c", true)); //list => JavaPairRDDJavaPairRDD<String, Boolean> blackListRDD = sc.parallelizePairs(blackList); //lines => words => (word, 1) JavaPairRDD<String, Integer> wordsAndCount = javaRDD .flatMap(new FlatMapFunction<String, String>() { @Overridepublic Iterator<String> call(String s) throws Exception { return Arrays.asList(s.split("\\s+")).iterator(); } }).mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<String, Integer>(s, 1); } }); //(word, 1) leftOutJoin (word, true) => (word, (1, Option)) JavaPairRDD<String, Tuple2<Integer, Optional<Boolean>>> leftOuterJoin = wordsAndCount.leftOuterJoin(blackListRDD); //(word, (1, Option)) => filter => (word, (1, option = false)) => map => word JavaRDD<String> whiteList = leftOuterJoin.filter(new Function<Tuple2<String, Tuple2<Integer, Optional<Boolean>>>, Boolean>() { @Override public Boolean call(Tuple2<String, Tuple2<Integer, Optional<Boolean>>> t) throws Exception { return t._2._2.orElse(false) ? false : true; } }).map(new Function<Tuple2<String, Tuple2<Integer, Optional<Boolean>>>, String>() { @Override public String call(Tuple2<String, Tuple2<Integer, Optional<Boolean>>> t) throws Exception { return t._1; } }); System.out.println(whiteList.collect()); } }