用spark實現hive中的collect_set函式的功能
阿新 • • 發佈:2019-01-04
import org.apache.spark.SparkContext._
import org.apache.spark._

/**
 * Created by xiaojun on 2015/3/9.
 *
 * Demo that reproduces the effect of Hive's `collect_set` with Spark's
 * `aggregateByKey`: records read from `test.txt` are grouped by `(id, name)`
 * and the distinct `vtm` values of each group are printed as one
 * comma-separated line.
 */
object SparkDemo2 {

  /**
   * One parsed input record. Fields are taken positionally from a
   * comma-separated line: id, name, vtm, url.
   *
   * Declared as a member of the object rather than locally inside `main`.
   */
  private final case class User(id: String, name: String, vtm: String, url: String)

  /**
   * Entry point: builds a local SparkContext, aggregates the distinct `vtm`
   * values per `(id, name)` key and prints one line per key.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val hadoopUser = "hdfs"
    // Set the user name used to access Spark
    System.setProperty("user.name", hadoopUser)
    // Set the user name used to access Hadoop
    System.setProperty("HADOOP_USER_NAME", hadoopUser)

    val conf = new SparkConf()
      .setAppName("wordcount")
      .setMaster("local")
      .setExecutorEnv("HADOOP_USER_NAME", hadoopUser)
    val sc = new SparkContext(conf)

    // Always release the context, even if reading or aggregating fails.
    try {
      val users = sc.textFile("test.txt")
        .map(_.split(","))
        // Blank or truncated lines have fewer than four fields; skip them
        // instead of failing the whole job with an index-out-of-bounds error.
        .filter(_.length >= 4)
        .map(fields => User(fields(0), fields(1), fields(2), fields(3)))

      val usersByKey = users.map(user => ((user.id, user.name), user))

      // Within a partition: put the new vtm at the head and drop any later
      // duplicate. Values are prepended as they are seen, so the order inside
      // an output line reflects processing order, not sort order.
      val seqOp = (acc: List[String], user: User) => (user.vtm :: acc).distinct
      // Across partitions: concatenate the partial lists and de-duplicate.
      val combOp = (left: List[String], right: List[String]) => (left ::: right).distinct

      println("-----------------------------------------")
      val vtmsByKey = usersByKey
        .aggregateByKey(List.empty[String])(seqOp, combOp)
        .mapValues(_.mkString(","))
      vtmsByKey.collect().foreach { case (_, vtms) => println(vtms) }
      println("-----------------------------------------")
    } finally {
      sc.stop()
    }
  }
}
test.txt檔案內容(隨便造的,沒具體含義)如下:
id1,user1,2,http://www.hupu.com
id1,user1,2,http://www.hupu.com
id1,user1,3,http://www.hupu.com
id1,user1,100,http://www.hupu.com
id2,user2,2,http://www.hupu.com
id2,user2,1,http://www.hupu.com
id2,user2,50,http://www.hupu.com
id2,user2,2,http://touzhu.hupu.com

最終得出的結果為:
100,3,2
2,50,1
和collect_set效果一致.