
Spark 2.x external data source case study

First, create a database in MySQL and set up a DEPT table with some sample rows:

create database spark;
use spark;

CREATE TABLE DEPT(
  DEPTNO INT(2) PRIMARY KEY,
  DNAME VARCHAR(14),
  LOC VARCHAR(13)
);

INSERT INTO DEPT VALUES(10,'ACCOUNTING','NEW YORK');
INSERT INTO DEPT VALUES(20,'RESEARCH','DALLAS');
INSERT INTO DEPT VALUES(30,'SALES','CHICAGO');
INSERT INTO DEPT VALUES(40,'OPERATIONS','BOSTON');
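
The rest of the walkthrough runs in spark-shell, where a Hive-enabled SparkSession is already available as spark; the MySQL JDBC driver also has to be on the classpath, for example by starting the shell with --jars pointing at the connector jar (the exact path depends on your installation). For a standalone application the session has to be built explicitly; a minimal sketch, assuming hive-site.xml is on the classpath so that Hive tables such as emp are visible:

// Minimal sketch of a Hive-enabled session for a standalone app.
// In spark-shell this object already exists as spark.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("ExternalDataSourceDemo")   // hypothetical application name
  .enableHiveSupport()
  .getOrCreate()

With the session in place, first load the Hive table emp as a DataFrame: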

scala> val hiveDF = spark.table("emp")
hiveDF: org.apache.spark.sql.DataFrame = [empno: int, ename: string ... 6 more fields]

scala> hiveDF.show()
+-----+------+---------+----+----------+-------+------+------+
|empno| ename|      job| mgr|  hiredate|    sal|  comm|deptno|
+-----+------+---------+----+----------+-------+------+------+
| 7369| SMITH|    CLERK|7902|1980-12-17|  800.0|  null|    20|
| 7499| ALLEN| SALESMAN|7698| 1981-2-20| 1600.0| 300.0|    30|
| 7521|  WARD| SALESMAN|7698| 1981-2-22| 1250.0| 500.0|    30|
| 7566| JONES|  MANAGER|7839|  1981-4-2| 2975.0|  null|    20|
| 7654|MARTIN| SALESMAN|7698| 1981-9-28| 1250.0|1400.0|    30|
| 7698| BLAKE|  MANAGER|7839|  1981-5-1| 2850.0|  null|    30|
| 7782| CLARK|  MANAGER|7839|  1981-6-9| 2450.0|  null|    10|
| 7788| SCOTT|  ANALYST|7566| 1987-4-19| 3000.0|  null|    20|
| 7839|  KING|PRESIDENT|null|1981-11-17| 5000.0|  null|    10|
| 7844|TURNER| SALESMAN|7698|  1981-9-8| 1500.0|   0.0|    30|
| 7876| ADAMS|    CLERK|7788| 1987-5-23| 1100.0|  null|    20|
| 7900| JAMES|    CLERK|7698| 1981-12-3|  950.0|  null|    30|
| 7902|  FORD|  ANALYST|7566| 1981-12-3| 3000.0|  null|    20|
| 7934|MILLER|    CLERK|7782| 1982-1-23| 1300.0|  null|    10|
| 8888|  HIVE|  PROGRAM|7839| 1988-1-23|10300.0|  null|  null|
+-----+------+---------+----+----------+-------+------+------+
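
Note that spark.table("emp") resolves the table through the Hive metastore, which is why the session needs Hive support. The same DataFrame can also be obtained with plain Spark SQL; a one-line sketch:

// Equivalent to spark.table("emp"): query the Hive table with Spark SQL.
val hiveDF2 = spark.sql("SELECT * FROM emp")

Next, read the DEPT table from MySQL through the jdbc data source: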

scala> val mysqlDF = spark.read.format("jdbc").
     |      option("url", "jdbc:mysql://localhost:3306").
     |      option("dbtable", "spark.DEPT").
     |      option("user", "root").
     |      option("password", "oracle").
     |      option("driver", "com.mysql.jdbc.Driver").
     |      load()
mysqlDF: org.apache.spark.sql.DataFrame = [DEPTNO: int, DNAME: string ... 1 more field]

scala> mysqlDF.show()
+------+----------+--------+
|DEPTNO|     DNAME|     LOC|
+------+----------+--------+
|    10|ACCOUNTING|NEW YORK|
|    20|  RESEARCH|  DALLAS|
|    30|     SALES| CHICAGO|
|    40|OPERATIONS|  BOSTON|
+------+----------+--------+
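
The builder-style read above can also be written with the jdbc() shortcut on DataFrameReader, passing the connection settings in a java.util.Properties object; a sketch using the same connection details:

// Same MySQL read expressed through DataFrameReader.jdbc.
import java.util.Properties

val connProps = new Properties()
connProps.setProperty("user", "root")
connProps.setProperty("password", "oracle")
connProps.setProperty("driver", "com.mysql.jdbc.Driver")

val mysqlDF2 = spark.read.jdbc("jdbc:mysql://localhost:3306", "spark.DEPT", connProps)

With both DataFrames loaded, join them on the department number: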

scala> val resultDF = hiveDF.join(mysqlDF, hiveDF.col("deptno") === mysqlDF.col("DEPTNO"))
resultDF: org.apache.spark.sql.DataFrame = [empno: int, ename: string ... 9 more fields]

scala> resultDF.show()
+-----+------+---------+----+----------+------+------+------+------+----------+--------+
|empno| ename|      job| mgr|  hiredate|   sal|  comm|deptno|DEPTNO|     DNAME|     LOC|
+-----+------+---------+----+----------+------+------+------+------+----------+--------+
| 7934|MILLER|    CLERK|7782| 1982-1-23|1300.0|  null|    10|    10|ACCOUNTING|NEW YORK|
| 7839|  KING|PRESIDENT|null|1981-11-17|5000.0|  null|    10|    10|ACCOUNTING|NEW YORK|
| 7782| CLARK|  MANAGER|7839|  1981-6-9|2450.0|  null|    10|    10|ACCOUNTING|NEW YORK|
| 7902|  FORD|  ANALYST|7566| 1981-12-3|3000.0|  null|    20|    20|  RESEARCH|  DALLAS|
| 7876| ADAMS|    CLERK|7788| 1987-5-23|1100.0|  null|    20|    20|  RESEARCH|  DALLAS|
| 7788| SCOTT|  ANALYST|7566| 1987-4-19|3000.0|  null|    20|    20|  RESEARCH|  DALLAS|
| 7566| JONES|  MANAGER|7839|  1981-4-2|2975.0|  null|    20|    20|  RESEARCH|  DALLAS|
| 7369| SMITH|    CLERK|7902|1980-12-17| 800.0|  null|    20|    20|  RESEARCH|  DALLAS|
| 7900| JAMES|    CLERK|7698| 1981-12-3| 950.0|  null|    30|    30|     SALES| CHICAGO|
| 7844|TURNER| SALESMAN|7698|  1981-9-8|1500.0|   0.0|    30|    30|     SALES| CHICAGO|
| 7698| BLAKE|  MANAGER|7839|  1981-5-1|2850.0|  null|    30|    30|     SALES| CHICAGO|
| 7654|MARTIN| SALESMAN|7698| 1981-9-28|1250.0|1400.0|    30|    30|     SALES| CHICAGO|
| 7521|  WARD| SALESMAN|7698| 1981-2-22|1250.0| 500.0|    30|    30|     SALES| CHICAGO|
| 7499| ALLEN| SALESMAN|7698| 1981-2-20|1600.0| 300.0|    30|    30|     SALES| CHICAGO|
+-----+------+---------+----+----------+------+------+------+------+----------+--------+
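
The same join can also be expressed in SQL by registering both DataFrames as temporary views; a sketch (the view names below are arbitrary):

// Register both sides as temporary views and join them with Spark SQL.
hiveDF.createOrReplaceTempView("emp_hive")
mysqlDF.createOrReplaceTempView("dept_mysql")

val resultSqlDF = spark.sql(
  """SELECT e.empno, e.ename, d.DEPTNO, d.DNAME, d.LOC
    |FROM emp_hive e
    |JOIN dept_mysql d ON e.deptno = d.DEPTNO""".stripMargin)

Either form returns the same rows. Continuing with the DataFrame API, project only the columns of interest: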

scala> resultDF.select(hiveDF.col("empno"), hiveDF.col("ename"),
     |   mysqlDF.col("deptno"), mysqlDF.col("dname")).show()
+-----+------+------+----------+
|empno| ename|deptno|     dname|
+-----+------+------+----------+
| 7934|MILLER|    10|ACCOUNTING|
| 7839|  KING|    10|ACCOUNTING|
| 7782| CLARK|    10|ACCOUNTING|
| 7902|  FORD|    20|  RESEARCH|
| 7876| ADAMS|    20|  RESEARCH|
| 7788| SCOTT|    20|  RESEARCH|
| 7566| JONES|    20|  RESEARCH|
| 7369| SMITH|    20|  RESEARCH|
| 7900| JAMES|    30|     SALES|
| 7844|TURNER|    30|     SALES|
| 7698| BLAKE|    30|     SALES|
| 7654|MARTIN|    30|     SALES|
| 7521|  WARD|    30|     SALES|
| 7499| ALLEN|    30|     SALES|
+-----+------+------+----------+
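
As a final step, the joined result can itself be written out to an external data source. A sketch that saves it back to MySQL over JDBC, mirroring the read options above (the target table name spark.EMP_DEPT is illustrative):

// Write the joined result back to MySQL; connection settings mirror the read above.
resultDF.write.format("jdbc").
  option("url", "jdbc:mysql://localhost:3306").
  option("dbtable", "spark.EMP_DEPT").
  option("user", "root").
  option("password", "oracle").
  option("driver", "com.mysql.jdbc.Driver").
  mode("append").
  save()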