spark讀取redis資料(互動式,scala單機版,java單機版)
阿新 • 發佈:2019-01-23
- 互動式
第一步:向redis中新增資料
第二步:將jedis jar包放入~/lib目錄下,開啟spark服務
第三步:通過spark-shell讀取redis資料,並做相應處理
- scala單機版
package com.test
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import redis.clients.jedis.Jedis
object RedisClient {

  /**
   * Reads a comma-separated city list from the Redis key "chengshi",
   * keys each city by its string length, and prints "length:city"
   * for every element.
   *
   * Fixes over the original: `val` instead of `var` for locals; a
   * null guard (Jedis returns null for a missing key, which would
   * NPE on `split`); and a `finally` block so the Redis connection
   * and the SparkContext are always released.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("wow,my first spark app")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val jedis = new Jedis("172.171.51.154", 6379)
    try {
      // Jedis#get returns null when the key does not exist — wrap in
      // Option so a missing key yields an empty RDD instead of an NPE.
      val cities = Option(jedis.get("chengshi"))
        .map(_.split(","))
        .getOrElse(Array.empty[String])
      val rdd = sc.parallelize(cities, 3)
      val byLength = rdd.keyBy(_.length)
      // collect() is fine here: the data set is one small Redis value.
      byLength.collect().foreach { case (len, city) => println(len + ":" + city) }
    } finally {
      jedis.close() // release the Redis connection (leaked in the original)
      sc.stop()
    }
  }
}
輸出結果
- java單機版
package com.dt.spark.SparkApps.cores;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import redis.clients.jedis.Jedis;
import scala.Tuple2;
public class Redis {

    /**
     * Reads a comma-separated city list from the Redis key "chengshi",
     * maps each city to a (city, length) pair, and prints "city:length"
     * for every element.
     *
     * Fixes over the original: a null guard (Jedis returns null for a
     * missing key, which would NPE on split), and a finally block so the
     * Redis connection and the Spark context are always released.
     */
    public static void main(String[] args) throws FileNotFoundException, IOException {
        SparkConf conf = new SparkConf()
                .setAppName("Spark Read From Redis written by Java")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        Jedis jedis = new Jedis("172.171.51.154", 6379);
        try {
            String value = jedis.get("chengshi");
            // Jedis#get returns null when the key does not exist — fall back
            // to an empty array instead of crashing on split().
            String[] cities = (value == null) ? new String[0] : value.split(",");
            JavaRDD<String> cityRdd = sc.parallelize(Arrays.asList(cities), 4);
            JavaPairRDD<String, Integer> pairs =
                    cityRdd.mapToPair(new PairFunction<String, String, Integer>() {
                        public Tuple2<String, Integer> call(String word) throws Exception {
                            return new Tuple2<String, Integer>(word, word.length());
                        }
                    });
            pairs.foreach(new VoidFunction<Tuple2<String, Integer>>() {
                public void call(Tuple2<String, Integer> pair) throws Exception {
                    System.out.println(pair._1() + ":" + pair._2());
                }
            });
        } finally {
            jedis.close(); // release the Redis connection (leaked in the original)
            sc.close();
        }
    }
}
輸出結果