Spark連線MySQL,Hive,HBase
阿新 • • 發佈:2021-01-09
Spark連線MySQL
import java.util.Properties

import org.apache.spark.sql.{DataFrame, SparkSession}

object ConnectMysql {

  /**
   * Reads the `hive_shop` table from MySQL over JDBC and prints it.
   *
   * Fix: the original snippet was truncated — the closing braces of `main`
   * and the object were missing, so it did not compile.
   */
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local[4]")
      .appName(this.getClass.getName)
      .getOrCreate()

    // JDBC URL of the target MySQL database and the table to read.
    val url = "jdbc:mysql://singer:3306/kb10"
    val tableName = "hive_shop"

    // MySQL user, password and driver class.
    // NOTE(review): credentials are hard-coded — consider reading them from
    // configuration or the environment instead of the source file.
    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "kb10")
    props.setProperty("driver", "com.mysql.jdbc.Driver")

    // Load the MySQL table into a DataFrame via spark.read.jdbc.
    val df: DataFrame = spark.read.jdbc(url, tableName, props)
    df.show()

    // Write the DataFrame back to MySQL in append mode (intentionally disabled).
    // df.write.mode("append").jdbc(url, tableName, props)
  }
}
Spark和MySQL中的執行結果一致:
Spark連線Hive
object ConnectHive {

  /** Connects to a remote Hive metastore and lists the available databases. */
  def main(args: Array[String]): Unit = {
    // Hive-enabled session pointed at the remote thrift metastore.
    val session: SparkSession = SparkSession.builder()
      .master("local[2]")
      .enableHiveSupport()
      .config("hive.metastore.uris", "thrift://192.168.181.129:9083")
      .appName(this.getClass.getName)
      .getOrCreate()

    // Execute a HiveQL statement and print the resulting DataFrame.
    val databases: DataFrame = session.sql("show databases")
    databases.show()
  }
}
Spark和Hive的執行結果截圖一致:
Spark連線Hbase
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession

object ConnectHbase {

  /** Scans the `kb10:customer` HBase table and prints selected columns per row. */
  def main(args: Array[String]): Unit = {
    // HBase connection settings: ZooKeeper quorum, client port, source table.
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "192.168.181.129")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "kb10:customer")

    val spark = SparkSession.builder()
      .appName("HBaseTest")
      .master("local[2]")
      .getOrCreate()
    val sc = spark.sparkContext

    // Read the table as (row key, Result) pairs and cache for the two passes.
    val hbaseRdd = sc.newAPIHadoopRDD(
      hbaseConf,
      classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result]
    ).cache()

    println(s"count=${hbaseRdd.count()}")

    import spark.implicits._

    // Print each row: the row key plus values from the addr and order families.
    hbaseRdd.foreach { case (_, result) =>
      // result.getRow yields the row key; result.getValue(family, qualifier)
      // fetches one cell — both take byte arrays, hence getBytes.
      val rowKey  = Bytes.toString(result.getRow)
      val city    = Bytes.toString(result.getValue("addr".getBytes, "city".getBytes))
      val country = Bytes.toString(result.getValue("addr".getBytes, "country".getBytes))
      val age     = Bytes.toString(result.getValue("order".getBytes, "age".getBytes))
      println(s"Row key:$rowKey city:$city country:$country age:$age")
    }
  }
}