1. 程式人生 > >Spark Stream 實時讀kafka寫redis,rdd轉換其他型別

Spark Stream 實時讀kafka寫redis,rdd轉換其他型別

做一個實時系統,用到了kafka,redis,sparkStream,很經典的一個架構。

kafka的生產者就不寫了,這邊只涉及sparksteam寫消費者程式碼,存到redis。

KafkaToRedis kafkaToRedis=new KafkaToRedis();     SparkConf conf = new SparkConf().setAppName("kafka_to_redis");     JavaStreamingContext jssc = new JavaStreamingContext(conf,Durations.seconds(10));         Map<String, String> kafkaParams = new HashMap<>();         kafkaParams.put("bootstrap.servers","wxincentos1:9092,wxincentos3:9092");         kafkaParams.put("metadata.broker.list", "wxincentos1:9092,wxincentos3:9092");         kafkaParams.put("group.id", "1");Set<String> topics = new HashSet<String>();         topics.add("test3");         JavaPairInputDStream<String,String> lines = KafkaUtils.createDirectStream(                 jssc,                  String.class, // key型別                 String.class, // value型別                 StringDecoder.class, // 解碼器                 StringDecoder.class,                 kafkaParams,                  topics);

到此就看後面的需求了,網上太多的wordcount的列子,看的都快吐了,想找個能把資料內容取出來的都那麼困難,大多還是停留在rdd上,沒辦法只好自己想辦法,看了好多列子,發現其實就是個rdd轉其他型別的問題,就好辦多了。

把rdd collect一下就能得到一個數組,操作也簡單,說幹就幹。

        JavaDStream<String> valueDStream = lines.map(new Function<Tuple2<String, String>, String>() {             public String call(Tuple2<String, String> v1) throws Exception {                 return v1._2();             }         });

List a=rdd.collect();             for(int i = 0; i<a.size();i++){

a.get(i);

}

資料就輕輕鬆鬆地取出來啦

寫redis的話就是例項化個redis客戶端,操作操作即可。就不贅述啦,後面講講redis叢集怎麼用的。