0073 spark streaming從埠接受資料進行實時處理的方法
阿新 • • 發佈:2019-01-01
一,環境
Windows_x64 系統 Java1.8 Scala2.10.6 spark1.6.0 hadoop2.7.5 IDEA IntelliJ 2017.2 nmap工具(用到其中的ncat命令,對應Linux中的nc命令)二,本地應用搭建
2.1 環境變數
設定方法:系統引數--》新增變數--》形式為:XXX_HOME,然後把對應安裝包的根目錄複製作為變數值;在PATH變數中新增: %XXX_HOME%\bin; 1,Hadoop需要設定環境變數; 2,Scala最好自己下載安裝相應版本,設定環境變數; 3,spark直接解壓即可;2.2 搭建測試
利用SBT工具非常方便的可以完成搭建,利用sbt建立Scala專案。專案結構生成為:其中SparkStreaming.scala為:/** * notes: To test scala and spark and hadoop * date: 2017.12.20 * author: gendlee */ import org.apache.spark.{SparkConf,SparkContext} import org.apache.log4j.{Level,Logger} import com.test.SparkStreaming object test { Logger.getLogger("org").setLevel(Level.ERROR) def main(args: Array[String]): Unit = { SparkStreaming.printWebsites() //initiate spark val sc = new SparkContext(conf) //read file from local disc val rdd = sc.textFile("F:\\Code\\scala2.10.6_spark1.6_hadoop2.8\\Test.log") } }
/** *notes: To test spark streaming * date: 2017.12.21 * author: gendlee */ package com.test import org.apache.spark.{SparkConf,SparkContext} import org.apache.spark.streaming.{Seconds, StreamingContext} object SparkStreaming { def printWebsites(): Unit= { val conf = new SparkConf().setMaster("local[2]").setAppName("PrintWebsites") val ssc = new StreamingContext(conf, Seconds(1)) val output = "F:\\Code\\scala2.10.6_spark1.6_hadoop2.8\\out\\gettedWebsites" val lines = ssc.socketTextStream("localhost", 7777) val websiteLines = lines.filter(_.contains("http")) websiteLines.print() //websiteLines.repartition(1).saveAsTextFiles(output) ssc.start() ssc.awaitTermination() } }
我要從輸入中提取出含有網址的欄位(含有http): 踩坑:
val conf = new SparkConf().setMaster("local[2]").setAppName("PrintWebsites")
這裡setMaster引數必須為local[2],應為這裡要開啟兩個程序,一個發一個收,若用預設的local將接受不到資料。
編譯後可以執行一下,發現列印這樣的資訊:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/12/22 16:39:14 INFO Slf4jLogger: Slf4jLogger started
17/12/22 16:39:14 INFO Remoting: Starting remoting
17/12/22 16:39:14 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:64905]
17/12/22 16:39:15 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Socket data stream had no more data
-------------------------------------------
Time: 1513931956000 ms
-------------------------------------------
Time: 1513931957000 ms
-------------------------------------------
出現錯誤。不著急,那是因為7777 埠沒有接受到資料,下面先暫停程式,我們需要往7777埠發資料。
利用socketTextStream()函式,我們可以從指定的主機上某個特定埠接收資料。下面看一下如何在7777埠發資料。
開啟windows的power shell或CMD,輸入:
ncat -lk -p 7777
然後再執行IDEA中的程式,這時在開啟的CMD窗空中輸入,當輸入的欄位含有http,就會在IDEA的執行展示視窗打印出來。
IDEA端過濾列印:
可見這裡有個問題,其實像https這種我是不要的,即http作為單詞的一部分這種是不要的,所以後續再想辦法看看如何過濾。
至此完成題目的要求。