1. 程式人生 > >[Spark][Streaming]Spark讀取網路輸入的例子

[Spark][Streaming]Spark讀取網路輸入的例子

Spark讀取網路輸入的例子:

參考如下的URL進行試驗

https://stackoverflow.com/questions/46739081/how-to-get-record-in-string-format-from-sockettextstream
http://www.cnblogs.com/FG123/p/5324743.html

發現 先執行 nc -lk 9999 ,再執行 spark 程式之後,
如果停止 nc ,spark程式會報錯:

類似於:

-------------------------------------------
Time: 2017-10-28 19:32:02
-------------------------------------------

17/10/28 19:32:23 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error connecting to localhost:9999 - java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at java.net.Socket.connect(Socket.java:538)
at java.net.Socket.<init>(Socket.java:434)
at java.net.Socket.<init>(Socket.java:211)
at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:73)
at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:59)

這表明,兩者已經建立 的 通訊。但是沒有看到預想的 word count 輸出。我猜測是 用於參與計算的程序數不夠,所以進行如下改動:

sc = SparkContext("local[2]", "streamwordcount")

改為:

sc = SparkContext("local[3]", "streamwordcount")

整個程式如下:

[[email protected] ab]$ cat test.py
#showing remote messages

from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":

sc = SparkContext("local[3]", "streamwordcount")
# 建立本地的SparkContext物件,包含3個執行執行緒

ssc = StreamingContext(sc, 2)
# 建立本地的StreamingContext物件,處理的時間片間隔時間,設定為2s

lines = ssc.socketTextStream("localhost", 9999)

words = lines.flatMap(lambda line: line.split(" "))
# 使用flatMap和Split對2秒內收到的字串進行分割

pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

wordCounts.pprint()

ssc.start() 
# 啟動Spark Streaming應用

ssc.awaitTermination()

再次執行 nc 程式

[[email protected] ~]$ nc -lk 9999

執行 spark 程式:

[[email protected] ~]$ spark-submit /home/training/ab/test.py

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/flume-ng/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

在nc視窗中輸入一些資料:

aaa bbb ccc
ddd aaa sss
sss bbb bbb

kkk jjj mmm
ooo kkk jjj
mmm ccc ddd
eee fff sss
rrr nnn ooo
ppp sss zzz
mmm sss ttt
kkk sss ttt
rrr ooo ppp
kkk qqq kkk
lll nnn jjj
rrr ooo sss
kkk aaa ddd
aaa aaa fff
eee sss nnn
ooo ppp qqq
qqq sss eee
sss mmm nnn

此時,經過一小會,可以看到,spark 程式的視窗輸出:

------------------------------------------- 
Time: 2017-10-28 19:33:50
-------------------------------------------

------------------------------------------- 
Time: 2017-10-28 19:33:52
-------------------------------------------

------------------------------------------- 
Time: 2017-10-28 19:33:54
-------------------------------------------

------------------------------------------- 
Time: 2017-10-28 19:33:56
-------------------------------------------

------------------------------------------- 
Time: 2017-10-28 19:33:58
-------------------------------------------

------------------------------------------- 
Time: 2017-10-28 19:34:00
-------------------------------------------
(u'', 1)
(u'mmm', 2)
(u'bbb', 3)
(u'nnn', 1)
(u'ccc', 2)
(u'rrr', 1)
(u'sss', 3)
(u'fff', 1)
(u'aaa', 2)
(u'ooo', 2)
...

------------------------------------------- 
Time: 2017-10-28 19:34:02
-------------------------------------------

------------------------------------------- 
Time: 2017-10-28 19:34:04
-------------------------------------------
(u'ppp', 1)
(u'sss', 1)
(u'zzz', 1)

------------------------------------------- 
Time: 2017-10-28 19:34:06
-------------------------------------------

------------------------------------------- 
Time: 2017-10-28 19:34:08
-------------------------------------------
(u'mmm', 1)
(u'sss', 1)
(u'ttt', 1)

------------------------------------------- 
Time: 2017-10-28 19:34:10
-------------------------------------------

------------------------------------------- 
Time: 2017-10-28 19:34:12
-------------------------------------------
(u'sss', 1)
(u'ttt', 1)
(u'kkk', 1)

------------------------------------------- 
Time: 2017-10-28 19:34:14
-------------------------------------------

------------------------------------------- 
Time: 2017-10-28 19:34:16
-------------------------------------------
(u'ppp', 1)
(u'rrr', 1)
(u'ooo', 1)

------------------------------------------- 
Time: 2017-10-28 19:34:18
-------------------------------------------
(u'qqq', 1)
(u'kkk', 2)

------------------------------------------- 
Time: 2017-10-28 19:34:20
-------------------------------------------

------------------------------------------- 
Time: 2017-10-28 19:34:22
-------------------------------------------

相關推薦

[Spark][Streaming]Spark讀取網路輸入例子

Spark讀取網路輸入的例子: 參考如下的URL進行試驗 https://stackoverflow.com/questions/46739081/how-to-get-record-in-string-format-from-sockettextstreamhttp://www.cnblogs.com/

[Spark][Streaming]Spark讀取網絡輸入例子

trac pair keep exception clas zookeeper 包含 air blog Spark讀取網絡輸入的例子: 參考如下的URL進行試驗 https://stackoverflow.com/questions/46739081/how-to-ge

Spark修煉之道(進階篇)——Spark入門到精通:第十三節 Spark Streaming—— Spark SQL、DataFrame與Spark Streaming

主要內容 Spark SQL、DataFrame與Spark Streaming 1. Spark SQL、DataFrame與Spark Streaming import org.apache.spark.SparkConf import org

《深入理解Spark》之 結構化流(spark streaming+spark SQL 處理結構化資料)的一個demo

最近在做關於spark Streaming + spark sql 結合處理結構化的資料的業務,下面是一個小栗子,有需要的拿走! ​ package com.unistack.tamboo.compute.process.impl; import com.alibaba.

Kakfka-Spark Streaming-Spark SQL操作筆記

(kafka版本與Spark版本在maven專案中有提到) linux下測試kafka 1.偽分散式下開啟kafka服務(啟動有先後順序): nohup zookeeper-server-start.sh config/zookeeper.propert

Spark Streaming——Spark第一代實時計算引擎

雖然SparkStreaming已經停止更新,Spark的重點也放到了 Structured Streaming ,但由於Spark版本過低或者其他技術選型問題,可能還是會選擇SparkStreaming。 SparkStreaming對於時間視窗,事件時間雖然支撐較少,但還是可以滿足部分的實時計算場景的,S

spark-streaming例子程式

開發spark-streaming從伺服器埠實時接收資料進行worldcount; 環境搭建 idea+maven 其pom檔案如下: <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http:

計算成交量例子,kafka/spark streaming/zk

package com.ws.streaming import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata import kafka.serializer.StringDecoder impor

spark讀取hbase(NewHadoopAPI 例子)

package cn.piesat.controllerimport java.text.{DecimalFormat, SimpleDateFormat}import java.utilimport java.util.concurrent.{CountDownLatch, Executors, Futur

Spark Streaming整合Kafka,Mysql,實時儲存資料到Mysql(直接讀取方式)

叢集分配如下: 192.168.58.11 spark01 192.168.58.12 spark02 192.168.58.13 spark03 spark版本:spark-2.1.0-bin-hadoop2.7 kafka版本:kafka_2.11-2.0.0 Spark St

Spark Streaming 輸入DStream和Receiver詳解

輸入DStream和Receiver詳解   輸入DStream代表了來自資料來源的輸入資料流。在之前的wordcount例子中,lines就是一個輸入DStream(JavaReceiverInputDStream),代表了從netcat(nc)服務接收到的資

Spark Streaming 輸入DStream之基礎資料來源HDFS檔案

Socket:之前的wordcount例子,已經演示過了,StreamingContext.socketTextStream() HDFS檔案     基於HDFS檔案的實時計算,其實就是,監控一個

spark streaming中reduceByKeyAndWindow簡單例子

視窗的一些簡單操作 import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingCon

Spark Streaming 基本輸入

檔案流 在spark/mycode/streaming/logfile目錄下新建兩個日誌檔案log1.txt和log2.txt,隨便輸入內容。比如,在log1.txt中輸入以下內容: I love Hadoop I love Spark Spark is

spark streaming讀取HDFS

今天跑第一spark streaming程式讀取HDFS檔案,碰到很多坑: JavaDStream lines = jsc1.textFileStream("hdfs://*.*.*.*:900

Spark Streaming 讀取本地檔案壓檔案

package streamings.studys import org.apache.spark.SparkConf import org.apache.spark.streaming.dstre

常見問題----Spark Streaming 讀取User Group ID設定

在Spark Streaming DirectStream中設定User Group ID,優點是可跟蹤Kafka中此Group ID的Offset,下次重啟時從上次中斷的地方開始讀資料。但是如果Kafka中已經不儲存對應Offset的資料,則會報java.lang.ClassNotFoundExc

spark streaming讀取kafka資料,記錄offset

如下是pom.xml檔案<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocati

Spark Streaming java實現簡單例子(一)

1. 背景:之前已經學習過Spark SQL的相關知識,現在開始對Spark的另一模組Streaming部分進行學習。首先是參考官網上的Demo進行樣例的編寫,但是發現程式碼有點問題,百度之後發現,在一處程式碼處發現問題,所以寫此文。 2. 介紹:一些部落格上的 Jav

Flume+Kakfa+Spark Streaming整合(執行WordCount小例子

環境版本:Scala 2.10.5; Spark 1.6.0; Kafka 0.10.0.1; Flume 1.6.0 Flume/Kafka的安裝配置請看我之前的部落格: http://blog.c