實時計算實踐(spark streaming+kafka+hdfs)
一直在研究大資料處理方向的各種技術和工具,但沒有真正用於實踐中,恰好目前風控措施轉向,需要實施“線上+離線”的雙線防控措施,遂在調研查閱相關文件後,決定從零開始構造(資料探勘轉工程開發,思維轉變是關鍵),這裡面涉及的幾個階段慢慢說來:
專案開發環境選擇(scala2.10.4 IDE +maven3.3)
最開始的選擇是直接在eclipse環境上安裝scala的IDE外掛,但實踐證明此種方式很不穩定,受限於網路的限制基本沒能成功,索性直接利用scala提供的IDE開發環境,這視為邁出第一步,雖然IDE提供了構建scala project的模板,但是後期證明沒有合適的版本管理工具,很難在包依賴及部署上做到得心應手,雖然網上普遍推薦SBT在該scala專案管理上的作用,但是有過java開發經驗的還是首推maven工具,為編譯打包提供極大的方便,同時實現在windows環境下編譯執行spark程式碼,不過裡面涉及很多坑後面再術。實際計算平臺(spark 1.5.2)
網上關於storm與spark ,誰在流式計算方式更具有優勢的討論甚多,這裡不做比較,本人結合已有的平臺環境加開發工具選擇了 spark streaming作為實時計算的計算引擎,另外一個原因在於spark在於機器學習支撐上的強有力地位,方便後日擴充套件,此外spark streaming也提供了針對各種資料來源的高階API,方便從不同資料來源中獲取DStearm,同時支援寫入各種儲存介質中。資料來源(kafka)
kafka作為一個分散式釋出-訂閱訊息系統,可以利用已有的API實時的從topic中pull資料,滿足實時計算的要求(前期嘗試從hdfs中讀取資料做計算並非有效),同時streaming-kafka的API友好的提供了資料來源無縫的讀取方式,一切都為達到實時計算的目的資料去向(HDFS)
利用上述提到的各種平臺工具等, 舉個簡單案例,程式碼如下:
package com.kafka.test
import java.util.HashMap
import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
object KafkaWordCounts {
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
return Some(newValues.sum +runningCount.getOrElse(0))
}
def main(args: Array[String]) {
//System.setProperty("hadoop.home.dir", "E:\\software\\hadoop-2.5.2");
val zkQuorum = "101.271.251.161:2181"
val group = "zjz-test-waf8"
val topics = "monitor_firewall_accesslog_pre"
val numThreads="2"
if (args.length >= 4){
val zkQuorum = args(0)
val group = args(1)
val topics = args(2)
val numThreads=args(3)
}
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(10))
ssc.checkpoint("checkpoint")
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split("\\s+"))
val wordCounts = words.map(x => (x, 1)).updateStateByKey[Int](updateFunction(_,_))
wordCounts.print()
wordCounts.saveAsTextFiles("sstest/kafka_waf_streaming","txt")
ssc.start()
ssc.awaitTermination()
}
}
相關推薦
實時計算實踐(spark streaming+kafka+hdfs)
一直在研究大資料處理方向的各種技術和工具,但沒有真正用於實踐中,恰好目前風控措施轉向,需要實施“線上+離線”的雙線防控措施,遂在調研查閱相關文件後,決定從零開始構造(資料探勘轉工程開發,思維轉變是關鍵),這裡面涉及的幾個階段慢慢說來: 專案開發環境選擇(sc
圖平行計算實踐(二)(spark streaming+graphx+kafka)
上回利用transform方法實現對於資料流的圖平行計算過程,今天繼續改進完善已有的計算機制,加入updateStateByKey和checkpoint機制,保障圖平行計算在故障中仍能保證零誤差。 import kafka.serializer.Strin
大資料學習之路97-kafka直連方式(spark streaming 整合kafka 0.10版本)
我們之前SparkStreaming整合Kafka的時候用的是傻瓜式的方式-----createStream,但是這種方式的效率很低。而且在kafka 0.10版本之後就不再提供了。 接下來我們使用Kafka直連的方式,這種方式其實是呼叫Kafka底層的消費資料的API,我們知道,越底層的東
基於Python的Spark Streaming+Kafka程式設計實踐及調優總結
說明Spark Streaming的原理說明的文章很多,這裡不做介紹。本文主要介紹使用Kafka作為資料來源的程式設計模型,編碼實踐,以及一些優化說明演示環境Spark:1.6Kafka:kafka_2.11-0.9.0.1實現語言:Python程式設計模型目前Spark S
基於Python的Spark Streaming+Kafka程式設計實踐
說明 Spark Streaming的原理說明的文章很多,這裡不做介紹。本文主要介紹使用Kafka作為資料來源的程式設計模型,編碼實踐,以及一些優化說明 演示環境 Spark:1.6 Kafka:kafka_2.11-0.9.0.1 實現語言:P
Spark Streaming+kafka訂單實時統計實現
package com.lm.sparkLearning.orderexmaple; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.
spark----基於Python的Spark Streaming+Kafka程式設計實踐
來源:http://blog.csdn.net/eric_sunah/article/details/54096057?utm_source=tuicool&utm_medium=referral 說明 Spark Streaming的原理說明的文章很多,這裡不
【六】Spark Streaming接入HDFS的資料Local模式(使用Scala語言)
Spark Streaming接入HDFS的資料模擬一個wordcount的功能,結果列印到控制檯,使用Local模式,使用Scala語言。 專案目錄 pom.xml <project xmlns="http://maven.apache.org/POM/4.
用Spark Streaming+Kafka實現訂單數和GMV的實時更新
前言 在雙十一這樣的節日,很多電商都會在大螢幕上顯示實時的訂單總量和GMV總額。由於訂單數量巨大,不可能每隔一秒就到資料庫裡進行一次SQL的資料統計,這時候就需要用到流式計算。本文將介紹一個簡單的Demo,講解如何通過Spark Stream消費來自Kafka中訂單資訊,
Spark Streaming + Kafka + Opencv + Face Recognizer + HDFS Sequence File + Mysql
<pre name="code" class="java">/** * Created by lwc on 6/17/16. */ import java.io.*; import java.sql.*; import java.util.*; impo
簡單實時計算方案(kafka+flink+druid/es)
最近在從事實時方面的工作,主要涉及到資料處理、加工及視覺化,在採坑的過程中總結出一套比較簡單的實時計算方案,供大家參考。主要涉及到幾個元件,kafka,flink,redis,druid和es。相信大家對以上幾個元件都比較熟悉了,這裡就不細說了。我們從一個簡單的需求,
下載基於大數據技術推薦系統實戰教程(Spark ML Spark Streaming Kafka Hadoop Mahout Flume Sqoop Redis)
大數據技術推薦系統 推薦系統實戰 地址:http://pan.baidu.com/s/1c2tOtwc 密碼:yn2r82課高清完整版,轉一播放碼。互聯網行業是大數據應用最前沿的陣地,目前主流的大數據技術,包括 hadoop,spark等,全部來自於一線互聯網公司。從應用角度講,大數據在互聯網領域主
流式大資料計算實踐(2)----Hadoop叢集和Zookeeper
一、前言 1、上一文搭建好了Hadoop單機模式,這一文繼續搭建Hadoop叢集 二、搭建Hadoop叢集 1、根據上文的流程得到兩臺單機模式的機器,並保證兩臺單機模式正常啟動,記得第二臺機器core-site.xml內的fs.defaultFS引數值要改成本機的來啟動,啟動完畢後再改回來 2、清空資
流式大數據計算實踐(2)----Hadoop集群和Zookeeper
nts 環境變量 技術 文件創建 con mon orm rm2 sam 一、前言 1、上一文搭建好了Hadoop單機模式,這一文繼續搭建Hadoop集群 二、搭建Hadoop集群 1、根據上文的流程得到兩臺單機模式的機器,並保證兩臺單機模式正常啟動,記得第二臺機器c
流式大資料計算實踐(3)----高可用的Hadoop叢集
流式大資料計算實踐(3)----高可用的Hadoop叢集 一、前言 1、上文中我們已經搭建好了Hadoop和Zookeeper的叢集,這一文來將Hadoop叢集變得高可用 2、由於Hadoop叢集是主從節點的模式,如果叢集中的namenode主節點掛掉,那麼叢集就會癱瘓,所以我們要改造成
Spark Streaming+Kafka spark 寫入 kafka
目錄 前言 在WeTest輿情專案中,需要對每天千萬級的遊戲評論資訊進行詞頻統計,在生產者一端,我們將資料按照每天的拉取時間存入了Kafka當中,而在消費者一端,我們利用了spark streaming從kafka中不斷拉取資料進行詞頻統計。本文首先對spark stre
Spark踩坑記——Spark Streaming+Kafka
目錄 前言 Spark streaming接收Kafka資料 基於Receiver的方式 直接讀取方式 Spark向kafka中寫入資料 Spark streaming+Kafka應用 Spark str
Spark Streaming+Kafka
前言 在WeTest輿情專案中,需要對每天千萬級的遊戲評論資訊進行詞頻統計,在生產者一端,我們將資料按照每天的拉取時間存入了Kafka當中,而在消費者一端,我們利用了spark streaming從kafka中不斷拉取資料進行詞頻統計。本文首先對spark streamin
Spark-Streaming+kafka實現零丟失
原文連結 kafka和sparkstreaming是兩種適配很好的技術,兩者都是分散式系統適用於處理大量資料,兩者對於實現資料的零丟失並沒有提供現成的解決方案,所以這篇文章就是希望可以幫助你完成這個目標 注:使用Spark Streaming的Direct St
流式大資料計算實踐(7)----Hive安裝
一、前言 1、這一文學習使用Hive 二、Hive介紹與安裝 Hive介紹:Hive是基於Hadoop的一個數據倉庫工具,可以通過HQL語句(類似SQL)來操作HDFS上面的資料,其原理就是將使用者寫的HQL語句轉換成MapReduce任務去執行,這樣不用開發者去寫繁瑣的MapReduce程式,直接編寫