Using Apache Spark and MySQL for Data Analysis
阿新 • Published: 2018-12-26
Reading data from a MySQL table with spark-shell
Step 1: Start spark-shell to get an interactive prompt, passing the MySQL JDBC driver jar with --jars. The command is:
~/run/spark/bin$ ./spark-shell --master spark://ubuntu1:7077 --jars /home/bigdata/run/spark/mysql-connector-java-5.1.30-bin.jar
The output looks like this:
~/run/spark/bin$ ./spark-shell --master spark://ubuntu1:7077 --jars /home/bigdata/run/spark/mysql-connector-java-5.1.30-bin.jar
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/05/08 01:40:28 WARN spark.SparkConf: The configuration key 'spark.history.updateInterval' has been deprecated as of Spark 1.3 and may be removed in the future. Please use the new key 'spark.history.fs.update.interval' instead.
17/05/08 01:40:46 WARN spark.SparkConf: The configuration key 'spark.history.updateInterval' has been deprecated as of Spark 1.3 and may be removed in the future. Please use the new key 'spark.history.fs.update.interval' instead.
17/05/08 01:40:46 WARN spark.SparkConf: The configuration key 'spark.history.updateInterval' has been deprecated as of Spark 1.3 and may be removed in the future. Please use the new key 'spark.history.fs.update.interval' instead.
17/05/08 01:40:47 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/05/08 01:40:57 WARN spark.SparkConf: The configuration key 'spark.history.updateInterval' has been deprecated as of Spark 1.3 and may be removed in the future. Please use the new key 'spark.history.fs.update.interval' instead.
17/05/08 01:41:01 WARN DataNucleus.General: Plugin (Bundle) "org.datanucleus.api.jdo" is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL "file:/home/bigdata/run/spark/jars/datanucleus-api-jdo-3.2.6.jar" is already registered, and you are trying to register an identical plugin located at URL "file:/home/bigdata/run/spark-2.1.0-bin-hadoop2.7/jars/datanucleus-api-jdo-3.2.6.jar."
17/05/08 01:41:01 WARN DataNucleus.General: Plugin (Bundle) "org.datanucleus.store.rdbms" is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL "file:/home/bigdata/run/spark-2.1.0-bin-hadoop2.7/jars/datanucleus-rdbms-3.2.9.jar" is already registered, and you are trying to register an identical plugin located at URL "file:/home/bigdata/run/spark/jars/datanucleus-rdbms-3.2.9.jar."
17/05/08 01:41:01 WARN DataNucleus.General: Plugin (Bundle) "org.datanucleus" is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL "file:/home/bigdata/run/spark-2.1.0-bin-hadoop2.7/jars/datanucleus-core-3.2.10.jar" is already registered, and you are trying to register an identical plugin located at URL "file:/home/bigdata/run/spark/jars/datanucleus-core-3.2.10.jar."
17/05/08 01:41:10 WARN metastore.ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://10.3.19.171:4040
Spark context available as 'sc' (master = spark://ubuntu1:7077, app id = app-20170508014050-0004).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_25)
Type in expressions to have them evaluated.
Type :help for more information.

scala>
Step 2: Create the sqlContext variable
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
warning: there was one deprecation warning; re-run with -deprecation for details
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@…
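The deprecation warning above appears because the SQLContext constructor has been deprecated since Spark 2.0 in favor of SparkSession. The startup log in Step 1 shows that the shell already provides a session as 'spark', so one possible alternative is to take the SQLContext from that session. This is a minimal sketch meant to be pasted into spark-shell; the names session and sqlContextFromSession are chosen here for illustration and do not come from the original article.

```scala
import org.apache.spark.sql.{SQLContext, SparkSession}

// `spark` is the SparkSession that spark-shell creates at startup
// (see "Spark session available as 'spark'" in the Step 1 log).
val session: SparkSession = spark

// Hypothetical name: the SQLContext that wraps the session,
// obtained without calling the deprecated constructor.
val sqlContextFromSession: SQLContext = session.sqlContext
```

The remaining steps should work the same way with this value in place of sqlContext, or with spark.read and spark.sql used directly.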
Step 3: Load data from MySQL
scala> val dataframe_mysql = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://127.0.0.1:3306/mydatabase").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "mytable").option("user", "myname").option("password", "mypassword").load()
dataframe_mysql: org.apache.spark.sql.DataFrame = [id: string, grouptype: int ... 16 more fields]
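The long chain of option(...) calls in Step 3 can also be written with the connection settings collected in one Map and passed through DataFrameReader.options. The sketch below assumes the same placeholder values as Step 3; jdbcOptions and loadMysqlTable are hypothetical names introduced for illustration. The final call is left commented out because it needs a reachable MySQL server and the driver jar from Step 1.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Connection settings copied from Step 3; every value is a placeholder.
val jdbcOptions: Map[String, String] = Map(
  "url"      -> "jdbc:mysql://127.0.0.1:3306/mydatabase",
  "driver"   -> "com.mysql.jdbc.Driver",
  "dbtable"  -> "mytable",
  "user"     -> "myname",
  "password" -> "mypassword"
)

// Hypothetical helper: same read as Step 3, with the options passed as a Map.
def loadMysqlTable(session: SparkSession, options: Map[String, String]): DataFrame =
  session.read.format("jdbc").options(options).load()

// Uncomment when the database is reachable:
// val dataframe_mysql = loadMysqlTable(spark, jdbcOptions)
```

Keeping the settings in a Map makes it easier to reuse one connection for several tables, for example by passing jdbcOptions.updated("dbtable", "othertable"), where othertable is again a made-up name.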
Step 4: Display the data in the DataFrame
scala> dataframe_mysql.show
+---+---------+-------+---------+------+------+---+--------------------+---+-----------+-----+----+--------------------+--------------------+-----+------+----------+---+
| id|grouptype|groupid|loginname|  name|   pwd|sex|            birthday|tel|mobilephone|email|isOk|       lastLoginTime|             addtime|intro|credit|experience|img|
+---+---------+-------+---------+------+------+---+--------------------+---+-----------+-----+----+--------------------+--------------------+-----+------+----------+---+
|  1|        1|      1|    admin| admin| admin|  1|2016-05-05 14:51:...|  1|          1|    1|   1|2016-05-10 14:52:...|2016-05-08 14:52:...|    1|     1|         1|  1|
|  2|        2|      2|   wanghb|wanghb|wanghb|  2|2016-05-10 14:56:...|  2|          2|    2|   2|2016-05-11 14:57:...|2016-05-10 14:57:...|    2|     2|        22|  2|
+---+---------+-------+---------+------+------+---+--------------------+---+-----------+-----+----+--------------------+--------------------+-----+------+----------+---+
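The timestamps in the table above end in "..." because show shortens cells longer than 20 characters by default. The following sketch illustrates how the full values and the column types could be inspected. It uses a small in-memory DataFrame as a stand-in for dataframe_mysql so that it runs in spark-shell without a database; the name sample and its three columns are assumptions made for this example.

```scala
import spark.implicits._

// Hypothetical stand-in for dataframe_mysql: two rows, three columns.
val sample = Seq(
  (1, "admin",  "2016-05-05 14:51:58.0"),
  (2, "wanghb", "2016-05-10 14:56:58.0")
).toDF("id", "loginname", "birthday")

sample.show()        // default: long cells are shortened
sample.show(false)   // truncation disabled: full cell contents
sample.printSchema() // column names and types
```

Against the real table, dataframe_mysql.show(false) and dataframe_mysql.printSchema() would be the corresponding calls.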
Step 5: Register the DataFrame as a temporary table so that it can be queried with SQL
scala> dataframe_mysql.registerTempTable("tmp_tablename")
warning: there was one deprecation warning; re-run with -deprecation for details
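The warning above is printed because registerTempTable has been deprecated since Spark 2.0; its replacement is createOrReplaceTempView. The sketch below shows that call on a small in-memory DataFrame so it can be run without MySQL. The names users and tmp_sample are hypothetical and were chosen so the example does not overwrite tmp_tablename.

```scala
import spark.implicits._

// Hypothetical stand-in for dataframe_mysql.
val users = Seq((1, "admin"), (2, "wanghb")).toDF("id", "loginname")

// Non-deprecated equivalent of registerTempTable in Spark 2.x.
users.createOrReplaceTempView("tmp_sample")

// The view can now be looked up by name in the current session.
val viewRowCount: Long = spark.table("tmp_sample").count()
```

For the DataFrame from Step 3, the corresponding call would be dataframe_mysql.createOrReplaceTempView("tmp_tablename"). The view lives only as long as the current Spark session.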
Step 6: Query the data from the temporary table "tmp_tablename"
scala> dataframe_mysql.sqlContext.sql("select * from tmp_tablename").collect.foreach(println)
[1,1,1,admin,admin,admin,1,2016-05-05 14:51:58.0,1,1,1,1,2016-05-10 14:52:07.0,2016-05-08 14:52:12.0,1,1,1,1]
[2,2,2,wanghb,wanghb,wanghb,2,2016-05-10 14:56:58.0,2,2,2,2,2016-05-11 14:57:05.0,2016-05-10 14:57:08.0,2,2,22,2]
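Once a temporary view exists, any Spark SQL statement can be run against it, not only select *. As a rough illustration, the sketch below filters and projects rows through spark.sql and reads a field from the collected result. It builds its own in-memory view so that it is runnable in spark-shell without the database; accounts, tmp_accounts and the sample values are assumptions for this example only.

```scala
import spark.implicits._

// Hypothetical sample data registered under its own view name.
val accounts = Seq((1, "admin", 1), (2, "wanghb", 22))
  .toDF("id", "loginname", "experience")
accounts.createOrReplaceTempView("tmp_accounts")

// Select two columns and keep only rows with experience above 10.
val experienced = spark.sql(
  "select id, loginname from tmp_accounts where experience > 10")

// collect() brings the matching rows to the driver as Array[Row].
val rows = experienced.collect()
rows.foreach(println)

// Fields of a Row can be read by column name.
val firstLogin: String = rows(0).getAs[String]("loginname")
```

Note that collect() copies every matching row to the driver, so on large tables it is usually safer to narrow the query first or to use show or take(n) instead.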