SparkStreaming(7):例項-wordcount統計結果寫入到MySQL
阿新 • • 發佈:2018-11-08
一、功能概述
DStreams的輸出操作,即將DStreams輸出到對應的目的地。輸出操作包括:print、saveAsTextFiles、saveAsObjectFiles、saveAsHadoopFiles、foreachRDD。本例將使用foreachRDD把資料輸出到外部mysql資料庫。
【參考:http://spark.apache.org/docs/2.1.0/streaming-programming-guide.html】
二、功能實現
1.前提工作:在mysql資料庫中建立表
(1)開啟資料庫
mysql -h192.168.31.3 -uroot -p
(2) 建立資料庫
create database spark;
(3)建立表
use spark;
create table wordcount(
word varchar(50) default null,
wordcount int(10) default null
);
2.scala程式碼
(1)依賴
<!-- mysql driver jar --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.27</version> </dependency>
(2)程式碼
package Spark import java.sql.DriverManager import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} /** * 使用spark streaming完成有狀態統計,並且將結果寫入到mysql資料庫中 * */ object ForeachRDDApp { def main(args: Array[String]): Unit = { val sparkConf=new SparkConf().setAppName("StatefulWordcount").setMaster("local[2]") val ssc=new StreamingContext(sparkConf,Seconds(5)) val lines=ssc.socketTextStream("bigdata.ibeifeng.com",6789) val results=lines.flatMap( _.split(" ")) .map((_,1)).reduceByKey(_+_) //TODO... 將結果寫入到MYSQL //參考:http://spark.apache.org/docs/2.1.0/streaming-programming-guide.html results.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords =>{ //partitionOfRecords是整個分割槽的資料 val connection = createConnection() partitionOfRecords.foreach(record =>{ //record這個record才是每一條資料 val sql=" insert into wordcount(word, wordcount) values('" + record._1 + "'," + record._2 + ")" connection.createStatement().execute(sql) }) connection.close() } } } ssc.start() ssc.awaitTermination() } /** * 獲取mysql連線 * @return */ def createConnection()={ Class.forName("com.mysql.jdbc.Driver") DriverManager.getConnection("jdbc:mysql://bigdata.ibeifeng.com:3306/spark","root","123456") } }
三、測試
1.啟動nc -lk 6789,輸入測試資料
aa bb cc dd ee dd
aa bb cc dd ee dd
aa bb cc dd ee dd
2.mysql中的是
mysql> select * from wordcount;
+------+-----------+
| word | wordcount |
+------+-----------+
| ee | 3 |
| aa | 3 |
| dd | 6 |
| bb | 3 |
| cc | 3 |
+------+-----------+
5 rows in set (0.00 sec)