Spark Streaming: Real-Time WordCount from a Network Port and HDFS

I. POM Configuration

I run the code from IDEA on Windows against Spark in a virtual machine, so the following dependencies need to be configured.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.imooc.spark</groupId>
  <artifactId>sparktrain</artifactId>
  <version>1.0</version>
  <inceptionYear>2008</inceptionYear>
  <properties>
    <scala.version>2.11.4</scala.version>
    <kafka.version>1.0.0</kafka.version>
    <spark.version>2.4.0</spark.version>
    <hadoop.version>2.6.1</hadoop.version>
    <hbase.version>1.2.6</hbase.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>${kafka.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
      <version>4.1.17.Final</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-scala_2.11</artifactId>
      <version>2.9.1</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.31</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>${hbase.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-target:jvm-1.5</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>

 

II. Receiving Network Data in Real Time

1. Code

package Sparkstreaming

import java.sql.DriverManager

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}


object NetWorkCount {
  def main(args: Array[String]): Unit = {
    // local[2]: the socket receiver occupies one thread, so at least one more is needed for processing
    val conf=new SparkConf().setAppName("NetWorkCount").setMaster("local[2]")
    val ssc =new StreamingContext(conf,Seconds(5))
    val lines=ssc.socketTextStream("192.168.116.10",9999)
    // reduceByKey(_+_) is shorthand for reduceByKey((x,y) => x+y): the values x and y of
    // two records with the same key are combined into x+y
    val result=lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
    //System.setProperty("hadoop.home.dir","E:\\MVN\\hadoop-common-2.2.0-bin-master")
    result.print()
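    result.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        // Open one connection per partition rather than one per record
        val connection = createconnection()
        // A prepared statement avoids broken or injected SQL when a word contains a quote
        val statement = connection.prepareStatement("insert into wordcount values(?, ?)")
        try {
          partitionOfRecords.foreach { record =>
            statement.setString(1, record._1)
            statement.setInt(2, record._2)
            statement.executeUpdate()
          }
        } finally {
          statement.close()
          connection.close()
        }
      }
    }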
    ssc.start()
    ssc.awaitTermination()
    // awaitTermination blocks the main thread until the streaming computation is stopped or fails

  }
  def createconnection()={
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection("jdbc:mysql://192.168.116.10:3306/test","root","123456")
  }

}
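
To see what the flatMap / map / reduceByKey chain computes for a single batch, the same steps can be tried on an ordinary Scala collection without starting Spark. This is only a sketch: WordCountSketch, countWords, and the sample input are made-up names and data, and groupBy followed by reduce stands in for Spark's reduceByKey.

```scala
object WordCountSketch {
  // Counts the words in one batch of lines, mirroring
  // flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) on a plain Seq.
  def countWords(batch: Seq[String]): Map[String, Int] =
    batch
      .flatMap(_.split(" "))    // split every line into words
      .map(word => (word, 1))   // pair each word with an initial count of 1
      .groupBy(_._1)            // bring records with the same key together
      .map { case (word, pairs) =>
        // like reduceByKey((x, y) => x + y): combine the values of one key pairwise
        (word, pairs.map(_._2).reduce(_ + _))
      }

  def main(args: Array[String]): Unit = {
    // Hypothetical lines received during one 5-second batch
    val batch = Seq("hello spark", "hello hdfs")
    // Prints (hdfs,1), (hello,2), (spark,1), one per line
    countWords(batch).toSeq.sortBy(_._1).foreach(println)
  }
}
```

In the streaming job the same computation runs once per batch interval, so counts are per batch and are not accumulated across batches.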

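2. Test

Open a new terminal window in the virtual machine and run nc -lk 9999 (the port must match the one passed to socketTextStream in the code).

Then run the Spark program in IDEA and type a few words into the nc session. The word counts show up in the IDEA console.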

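3. Troubleshooting

When running on Windows, you may hit the error "Failed to locate the winutils binary in the hadoop binary path". To fix it, download the complete Hadoop bin directory (the one containing winutils.exe) from GitHub, then set the local environment variables: point HADOOP_HOME at the directory that contains bin and add that bin folder to PATH. To check the setup, type hadoop in cmd and confirm that the command is found.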
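
As an alternative to environment variables, the Hadoop home can be set from code through the hadoop.home.dir system property, which is what the commented-out line in NetWorkCount does. The sketch below first checks that winutils.exe is really there. WinutilsCheck and hasWinutils are made-up names, and the path is the one from that commented-out line, so treat it as an assumption and adjust it to your machine.

```scala
import java.io.File

object WinutilsCheck {
  // Returns true when <hadoopHome>/bin/winutils.exe exists as a regular file.
  def hasWinutils(hadoopHome: String): Boolean =
    new File(new File(hadoopHome, "bin"), "winutils.exe").isFile

  def main(args: Array[String]): Unit = {
    // Assumed location, copied from the commented-out line in NetWorkCount
    val hadoopHome = "E:\\MVN\\hadoop-common-2.2.0-bin-master"
    if (hasWinutils(hadoopHome)) {
      // Set this before any Spark or Hadoop class is used
      System.setProperty("hadoop.home.dir", hadoopHome)
      println("hadoop.home.dir set to " + hadoopHome)
    } else {
      println("winutils.exe not found under " + hadoopHome + "\\bin")
    }
  }
}
```

In a real job the System.setProperty call would go at the top of main, before the SparkConf is created.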

III. Receiving HDFS Data in Real Time

1. Code

package Sparkstreaming

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf

object Filewordcount {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setAppName("Filewordcount").setMaster("local")
    val ssc=new StreamingContext(conf,Seconds(5))
    // textFileStream monitors the directory and only reads files that appear after the stream starts
    val lines=ssc.textFileStream("hdfs://192.168.116.10:9000/sqoop/hdfs/")
    val result=lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }

}

IV. Filtering by a Name List

1. Code

package Sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Filtername {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("Filtername").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Build the name list used for filtering
    val names=List("good","better")
    val namesRDD=ssc.sparkContext.parallelize(names).map((_,true))
    val lines = ssc.socketTextStream("192.168.116.10", 6789)
    // Key each line by its second comma-separated field, join with the name list,
    // and keep only the lines whose key is in the list
    val result = lines.map(x=>(x.split(",")(1),x)).transform(rdd=>rdd.leftOuterJoin(namesRDD)).
      filter(x=>x._2._2.getOrElse(false)).map(x=>x._2._1)
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

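2. Test

Type some lines into the console (an nc session on port 6789, matching the code). Of the lines entered, only 2018,good is printed, because its second field is in the name list.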
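
The join-and-filter step in Filtername can also be checked on plain Scala collections. The sketch below imitates leftOuterJoin with a Map lookup that returns an Option, which is the shape the real join produces for the right-hand side. NameFilterSketch and keepListed are made-up names, and the second input line is a hypothetical example of a line that should be dropped.

```scala
object NameFilterSketch {
  // Keeps only the lines whose second comma-separated field is in `names`.
  def keepListed(lines: Seq[String], names: Set[String]): Seq[String] = {
    // Same shape as namesRDD: every listed name is paired with true
    val flags: Map[String, Boolean] = names.map((_, true)).toMap
    lines
      .map(line => (line.split(",")(1), line))                      // (name, line)
      .map { case (name, line) => (name, (line, flags.get(name))) } // stand-in for leftOuterJoin
      .filter { case (_, (_, flag)) => flag.getOrElse(false) }      // None means "not in the list"
      .map { case (_, (line, _)) => line }
  }

  def main(args: Array[String]): Unit = {
    // "2018,bad" is a hypothetical line whose name is not in the list
    val input = Seq("2018,good", "2018,bad")
    keepListed(input, Set("good", "better")).foreach(println) // prints 2018,good
  }
}
```

As in the streaming code, a line without a comma makes split(",")(1) throw, so real input may need a length check first.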