flume+spark streaming+redis完整篇
一.前言
本篇是用flume作為資料來源,spark streaming來實時處理,然後把結果存在redis供查詢.
本篇介紹的是一個實時統計網站訪問的pv的例子.
本篇採用的各種版本如下 scala-2.10.4 spark-1.6.1 flume-1.6.0
本篇採用的spark叢集為sdandalone模式
二.資料來源flume配置
flume的詳細說明請自行百度.這裡的flume source採用http,並且使用json handler來處理.
source 配置:
a1.sources.r2.type = http
a1.sources.r2.bind=10.8.23.58
a1.sources.r2.port = 5140
a1.sources.r2.channels = c3
a1.sources.r2.handler = org.apache.flume.source.http.JSONHandler
channel因為測試,所以選擇的是記憶體方式,實際根據情況,建議使用flie模式,並且配置checkpoint防止資料丟失.
channel配置:
a1.channels.c3.type = memory
a1.channels.c3.capacity = 100
a1.channels.c3.transactionCapacity = 100
sink有兩種,如下
flume=>spark streaming有兩種方式
1.推模式:這種模式比較簡單,直接連線上對應的avro埠即可,但是有個最大的問題,你必須先啟動spark streaming任務,然後觀察這個埠開在哪臺spark節點上,並且每次啟動都會隨機到某一個節點,然後再去改flume的配置,往那臺機器上發資料,這就比較蛋疼,如果你有100臺flume,那會讓人瘋狂的.因此這個模式,只有你的flume數量比較少的情況下適用.
推模式的sink配置:
a1.sinks.k3.type = avro
a1.sinks.k3.channel = c3
a1.sinks.k3.hostname = 10.8.23.32
a1.sinks.k3.port = 4545
2.拉模式:
這裡建議先使用推模式,等推模式跑通了,再切換到拉模式. 拉模式相比推模式稍微複雜點,主要複雜在flume的配置,官網有很詳細的說明,我這裡會把用到的都描述出來.
首先需要把3個jar放到flume/lib下.(分別是:spark-streaming-flume-sink_2.10-1.6.1.jar ,scala-library-2.10.5.jar, commons-lang3-3.3.2.jar,在最後的網盤地址裡面有)
拉模式的sink配置
a1.sinks = spark
a1.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.spark.hostname = 10.2.23.58
a1.sinks.spark.port = 4545
a1.sinks.spark.channel = c3
至此 flume配置完成.可以啟動觀察一下.
三.程式碼開發
object PvStatistic{def main(args: Array[String]): Unit = { val masterUrl = "spark://10.8.23.112:7077" val conf = new SparkConf().setMaster(masterUrl).setAppName("PvStatistic") val ssc = new StreamingContext(conf, Seconds(15)) // 推模式 // val flumeStream = FlumeUtils.createStream(ssc, "10.8.23.58", 4545, StorageLevel.MEMORY_ONLY_SER_2) // 拉模式 val flumeStream = FlumeUtils.createPollingStream(ssc, "10.8.23.58", 4545, StorageLevel.MEMORY_ONLY_SER_2) flumeStream.foreachRDD(rdd => { rdd.foreachPartition(it=>{ val jedis = RedisClient.pool.getResource it.foreach(event=>{ val sensorInfo = new String(event.event.getBody.array()) //單行記錄 // println(sensorInfo) val json = JSONObject.fromObject(sensorInfo); val url=json.getString("url")jedis.hincrBy("Spark:PV", url, 1); }) RedisClient.pool.returnResource(jedis) }) }) ssc.start() ssc.awaitTermination() }
兩種模式對於資料來源的獲取只是方法不一樣,處理邏輯都一樣.
需要的jar包 我直接貼pom檔案
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.6.1</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId> <version>1.6.1</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume_2.10 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-flume_2.10</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.8.1</version> </dependency> <dependency> <groupId>net.sf.json-lib</groupId> <artifactId>json-lib</artifactId> <version>2.4</version> <classifier>jdk15</classifier> </dependency>
完成以後就是打包上傳到spark主節點了,這裡提一下,因為需要很多依賴包,所以建議直接打成一個包含所有依賴的jar.
推薦一個maven外掛來做這個事.
<plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.defonds.RsaEncryptor</mainClass> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>META-INF/spring.handlers</resource> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>META-INF/spring.schemas</resource> </transformer> </transformers> </configuration> </execution> </executions> </plugin>
有了這個外掛,直接maven clean package打包就行了,打出來的包包含所有依賴檔案.
然後把這個包上傳到spark主節點,在spark/bin下執行
spark-submit --master spark://cdh112:7077 --class com.dome.PvStatistic /opt/spark/job/analyseSys-1.0-SNAPSHOT.jar
然後可以向之前配置的flume的接收埠傳送http請求測試了
傳送內容:
[
{
"body":"{\"url\": \"http://20160926 16:02\"}",
"headers":{"v1":"log"}
}
]
在redis裡面可以看到 SparkPv開頭的key下面 每個訪問地址的訪問次數.
上面提到的jar包
連結:http://pan.baidu.com/s/1kUEJJJx 密碼:9fxv
最後感謝一下群內的各位兄弟以及群主aDog~ 有興趣交流的可以加入QQ群:459898801