1. 程式人生 > >SparkStreaming(7):例項-wordcount統計結果寫入到MySQL

SparkStreaming(7):例項-wordcount統計結果寫入到MySQL

一、功能概述

DStreams的輸出操作,即將DStreams輸出到對應的目的地。輸出操作包括:print、saveAsTextFiles、saveAsObjectFiles、saveAsHadoopFiles、foreachRDD。本例將使用foreachRDD把資料輸出到外部mysql資料庫。

【參考:http://spark.apache.org/docs/2.1.0/streaming-programming-guide.html

二、功能實現

1.前提工作:在mysql資料庫中建立表

(1)開啟資料庫

mysql -h192.168.31.3 -uroot -p 

(2) 建立資料庫

create database spark;

(3)建立表

use spark;
create table wordcount(
word varchar(50) default null,
wordcount int(10) default null
);

 

2.scala程式碼

(1)依賴

<!-- mysql driver jar -->
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.27</version>
</dependency>

(2)程式碼

package Spark

import java.sql.DriverManager

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * 使用spark streaming完成有狀態統計,並且將結果寫入到mysql資料庫中
  *
  */
object ForeachRDDApp {
  def main(args: Array[String]): Unit = {

    val sparkConf=new SparkConf().setAppName("StatefulWordcount").setMaster("local[2]")
    val ssc=new StreamingContext(sparkConf,Seconds(5))

    val lines=ssc.socketTextStream("bigdata.ibeifeng.com",6789)
    val results=lines.flatMap( _.split(" "))
      .map((_,1)).reduceByKey(_+_)

    //TODO... 將結果寫入到MYSQL
    //參考:http://spark.apache.org/docs/2.1.0/streaming-programming-guide.html
    results.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>{   //partitionOfRecords是整個分割槽的資料
        
          val connection = createConnection()
          partitionOfRecords.foreach(record =>{     //record這個record才是每一條資料
            val sql=" insert into wordcount(word, wordcount) values('" + record._1 + "'," + record._2 + ")"
            connection.createStatement().execute(sql)
          })
          connection.close()

      }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }

  /**
    * 獲取mysql連線
    * @return
    */
  def createConnection()={
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection("jdbc:mysql://bigdata.ibeifeng.com:3306/spark","root","123456")
  }

}

三、測試

1.啟動nc -lk 6789,輸入測試資料

aa bb cc dd ee dd 
aa bb cc dd ee dd 
aa bb cc dd ee dd 


2.mysql中的是

        mysql> select * from wordcount;
        +------+-----------+
        | word | wordcount |
        +------+-----------+
        | ee   |         3 |
        | aa   |         3 |
        | dd   |         6 |
        | bb   |         3 |
        | cc   |         3 |
        +------+-----------+
        5 rows in set (0.00 sec)

(經測試,成功!)