Spark Streaming: Real-Time WordCount on Network Port Data and HDFS
阿新 · Published 2018-12-29
I. POM Configuration
Because we use IntelliJ IDEA on Windows to connect to Spark running in a virtual machine, the following dependencies need to be configured.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.imooc.spark</groupId>
  <artifactId>sparktrain</artifactId>
  <version>1.0</version>
  <inceptionYear>2008</inceptionYear>

  <properties>
    <scala.version>2.11.4</scala.version>
    <kafka.version>1.0.0</kafka.version>
    <spark.version>2.4.0</spark.version>
    <hadoop.version>2.6.1</hadoop.version>
    <hbase.version>1.2.6</hbase.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>1.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
      <version>4.1.17.Final</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-scala_2.11</artifactId>
      <version>2.9.1</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.31</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>${hbase.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.4.0</version>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-target:jvm-1.5</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>
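Note that the _2.11 suffix on kafka_2.11, jackson-module-scala_2.11 and spark-streaming_2.11 must match the Scala major version declared in scala.version (2.11.4 here); mixing artifacts built for different Scala major versions typically fails at runtime with NoSuchMethodError.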
II. Receiving Network Data in Real Time
1. Code
package Sparkstreaming

import java.sql.DriverManager
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetWorkCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("NetWorkCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val lines = ssc.socketTextStream("192.168.116.10", 9999)
    // equivalent to reduceByKey((x, y) => x + y): when two records share the
    // same key, their values x and y are combined into x + y
    val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    // System.setProperty("hadoop.home.dir", "E:\\MVN\\hadoop-common-2.2.0-bin-master")
    result.print()
    // write each batch's counts to MySQL, one connection per partition
    result.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        val connection = createconnection()
        partitionOfRecords.foreach { record =>
          val sql = "insert into wordcount values('" + record._1 + "'," + record._2 + ")"
          connection.createStatement().execute(sql)
        }
        connection.close()
      }
    }
    ssc.start()
    // awaitTermination blocks the main thread until the streaming job is stopped
    ssc.awaitTermination()
  }

  def createconnection() = {
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection("jdbc:mysql://192.168.116.10:3306/test", "root", "123456")
  }
}
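The INSERT statement above assumes a two-column table named wordcount already exists in the test database. Below is a minimal one-off sketch to create it; the column names word and cnt are assumptions, since the INSERT only fixes the table name and the column order (word first, count second):

package Sparkstreaming

import java.sql.DriverManager

object CreateWordcountTable {
  def main(args: Array[String]): Unit = {
    // same driver and connection settings as createconnection() above
    Class.forName("com.mysql.jdbc.Driver")
    val connection = DriverManager.getConnection("jdbc:mysql://192.168.116.10:3306/test", "root", "123456")
    val statement = connection.createStatement()
    // column names are assumed; only the order is dictated by the INSERT
    statement.execute("CREATE TABLE IF NOT EXISTS wordcount (word VARCHAR(100), cnt INT)")
    statement.close()
    connection.close()
  }
}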
2. Testing
Open a new terminal window in the virtual machine and run nc -lk 9999 (the port must match the one passed to socketTextStream above).
Then run the Spark program from IDEA and type a few words into the nc window; the word counts show up in the IDEA console.
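For example, typing hello hello world into the nc window should produce console output of roughly this shape in the next 5-second batch (the timestamp will differ):

-------------------------------------------
Time: 1546066565000 ms
-------------------------------------------
(hello,2)
(world,1)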
3. Troubleshooting
When running on Windows, you may get the error "Failed to locate the winutils binary in the hadoop binary path". Download the whole bin directory from GitHub, then set the local environment variables (HADOOP_HOME pointing at the directory that contains bin). Type hadoop in a cmd window to check whether the environment is set up correctly.
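Alternatively, the hadoop.home.dir property can be set from code before the SparkConf is created, as in the line commented out in the program above; the path is only an example and must be a directory that contains bin\winutils.exe:

// must run before any Spark/Hadoop classes initialize; the path is an example
System.setProperty("hadoop.home.dir", "E:\\MVN\\hadoop-common-2.2.0-bin-master")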
III. Receiving HDFS Data in Real Time
1. Code
package Sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Filewordcount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Filewordcount").setMaster("local")
    val ssc = new StreamingContext(conf, Seconds(5))
    // monitor an HDFS directory and stream in newly created files
    val lines = ssc.textFileStream("hdfs://192.168.116.10:9000/sqoop/hdfs/")
    val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
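Note that textFileStream does not use a receiver thread, which is why setMaster("local") with a single core is enough here. It only picks up files that appear in the monitored directory after the job has started, so to trigger a batch, copy a new file in, for example with hdfs dfs -put words.txt /sqoop/hdfs/ (the file name is just an example).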
IV. Name-List Filtering
1. Code
package Sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Filtername {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Filtername").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    // build the name list to filter against
    val names = List("good", "better")
    val namesRDD = ssc.sparkContext.parallelize(names).map((_, true))
    val lines = ssc.socketTextStream("192.168.116.10", 6789)
    // key each "id,name" line by its name, left-outer-join with the name list,
    // and keep only the lines whose name appears in the list
    val result = lines.map(x => (x.split(",")(1), x))
      .transform(rdd => rdd.leftOuterJoin(namesRDD))
      .filter(x => x._2._2.getOrElse("good") != "good")
      .map(x => x._2._1)
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
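To see why only listed names survive: an input line 2018,good is keyed as ("good", "2018,good"), and the left outer join yields ("good", ("2018,good", Some(true))), so getOrElse("good") returns true, the != "good" test passes, and the line is kept. A line like 2019,bad joins to ("bad", ("2019,bad", None)); getOrElse("good") falls back to the sentinel value "good", the test fails, and the line is dropped.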
2. Testing
Run nc -lk 6789 in the virtual machine and type a few lines into the console; you will find that only 2018,good is printed, because only lines whose name field appears in the list pass the filter.