1. 程式人生 > >Spark流程式設計指引(三)-------------------------------------初始化StreamingContext

Spark流程式設計指引(三)-------------------------------------初始化StreamingContext

基本概念

接下來,我們在上一節例子的基礎上,來闡述Spark Streaming的基本知識。

連結

和Spark類似,Spark Streaming也包含在maven的中央倉庫中。為了寫基於Spark Streaming的程式,你需要為你的SBT或Maven工程分別新增以下依懶:

Maven:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.4.0</version>
</dependency>

SBT:

libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.4.0"

為了從Kafka, Flume, 和 Kinesis等源中提取資料,你需要新增額外的依懶。因為Spark Streaming的核心API並不直接支援。下面是一些常用的依懶:

Source Artifact
Kafka spark-streaming-kafka_2.10
Flume spark-streaming-flume_2.10
Kinesis spark-streaming-kinesis-asl_2.10 [Amazon Software License]
Twitter spark-streaming-twitter_2.10
ZeroMQ spark-streaming-zeromq_2.10
MQTT spark-streaming-mqtt_2.10

要獲得最新的列表,請在maven中央倉庫Maven repository檢視支援的源和座標。

初始化StreamingContext

為了初始化一個Spark Streaming程式,需要建立一個StreamingContext物件,它是所有Spark Streaming功能的主入口點。

可以從一個SparkConf物件建立一個StreamingContext

import org.apache.spark._
import org.apache.spark.streaming._ val conf = new SparkConf().setAppName(appName).setMaster(master) val ssc = new StreamingContext(conf, Seconds(1))

appName引數是你的應用程式在叢集UI上的名字。master是Spark, Mesos或者YARN cluster 的URL,或者用"local[*]'字串表示執行在本地模式。

在實踐中,當在叢集上執行程式時,你不希望硬編碼master的值。你可以通過從spark-submit啟動你的程式,並指明引數。然而,為了本地測試或者單元測試,你可以傳遞"local[*]”引數來執行Spark Streaming程式。注意,你可以通過ssc.SparkContext的方式來訪問SparkContext.

批處理的間隔需要根據你的應用程式的延遲要求和可用的叢集資源來設定。

SparkStreaming物件也可以由已經存在的SparkContext來建立:

import org.apache.spark.streaming._

val sc = ...                // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))

在建立了context之後,你需要做以下幾步:

1.通過建立輸入DStreams來定義輸入源

2.通過對DStreams應用轉變操作和輸出操作來定義流計算。

3.用streamingContext.start()來開始接收資料和處理流程。

4.通過streamingContext.awaitTermination()方法來等待處理結束(手動結束或因為錯誤)

5.還可以通過streamingContext.stop()來手動結束程序。

需要記住的關鍵點:

1.一旦context啟動後,就不能再新增新的streaming計算或操作。

2.一旦context停止後,就不能再重新啟動它了。

3.在同一時間內,一個JVM上只能執行一個StreamingContext

4.在StreamingContext上的stop()操作也會將SparkContext停止。如果只想停止StreamingContext,對stop的可選引數stopSparkContext設定為false.

5.一個SparkContext可以用來建立多個StreamingContext,只要前一個StreamingContext已經停止了。

相關推薦

Spark程式設計指引()-------------------------------------初始StreamingContext

基本概念 接下來,我們在上一節例子的基礎上,來闡述Spark Streaming的基本知識。 連結 和Spark類似,Spark Streaming也包含在maven的中央倉庫中。為了寫基於Spark Streaming的程式,你需要為你的SBT或Maven工程分別新增以

Spark程式設計指引(五)-----------------------------DStreams上的轉換操作

與RDDs類似,轉換操作允許對來自輸入DStreams的資料進行修改。DStreams支援許多在通常Spark RDD上的轉換操作。下面是一些常見的: 轉換 含義 map(func) Return a new DStream by passing each element

Minecraft Forge程式設計入門初始專案結構和邏輯”

經過前面兩個教程Minecraft Forge程式設計入門一 “環境搭建”和Minecraft Forge程式設計入門二 “工藝和食譜”,我們大體知道了如何自定義合成配方,主要是在 Mod類的init方法中進行註冊,但可想而之隨著專案的進行需要註冊的內容會越來

Spark程式設計指引(四)---------------------------DStreams基本模型,輸入DStreams和接收者

離散流(DStreams) 離散流或者稱為DStreams是Spark流程式設計提供的基本抽象。它代表了持續的資料流,從一個數據源接收到的資料流或者是在一個輸入流上應用轉變操作處理後的資料流。 在內部實現上,DStream代表了一系列連續的RDDs.RDDs是Spark對不

Spark程式設計指引()-----------------RDD操作,shuffle和持久化

處理鍵-值對 儘管Spark的大部操作支援包含所有物件型別的RDDs,但是還有一些操作只支援鍵-值對的的RDDs.最常見的是類似"洗牌"的操作,比如以鍵值來分組或聚合所有的元素。 在Scala裡,這些操作對包含2元組的RDD是自動可用的。(Scala語言內建的元組,通過(a

【Java 程式設計】陣列初始的多種方式

文章目錄 1 陣列定義與初始化基本方式 2 Array 工具類 3 Stream 方式 4 Arrays 1 陣列定義與初始化基本方式 陣列定義的兩種基本方式: int[][] a1; int a2[][]; 與C、

Java程式設計思想ch5 初始和清理

5.1 用構造器初始化 new 類名() 將建立和初始化,綁在一起。 5.2 方法過載 5.4 this關鍵字 class Banana{void peel(int i)}{} public class BananaPeel{

Java程式設計師從笨鳥到菜鳥之(九十四)深入java虛擬機器()——類的生命週期 下)類的初始

分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!        

Jquery 頁面初始常用的種方法以及Jquery 發送ajax 請求

tree error 渲染 erro 發生 har 初始化 nload 事情 第一種 $(document).ready(function(){ //文檔就緒事件 }); 第二種是第一種的簡略寫法,效果上和第一種是等效的。 $(function(){ //文檔加載事

處理大數據常用的種Apache框架:Storm、Spark和Samza。(主要介紹Storm)

領導 hdf 客戶端 orm 至少 per yar 持續性 apache 處理實時的大數據流最常用的就是分布式計算系統,下面分別介紹Apache中處理大數據流的三大框架: Apache Storm 這是一個分布式實時大數據處理系統。Storm設計用於在容錯和

Spring Boot實戰筆記()-- Spring常用配置(Bean的初始和銷毀、Profile)

div nbsp troy string 實例化 public ive work 初始 一、Bean的初始化和銷毀   在我們的實際開發的時候,經常會遇到Bean在使用之前或之後做些必要的操作,Spring對Bean的生命周期操作提供了支持。在使用Java配置和註解配置下提

jQuery頁面加載初始常用的種方法

ini ready document 加載 clas 初始 ora win eth 當頁面打開時我們需要執行一些操作,這個時候如果我們選擇使用jquery的話,需要重寫他的3中方法,自我感覺沒什麽區 別,看個人喜好了,第二種感覺比較簡單明了: 第一種: 復制代碼代碼如下:

Spring IOC容器的初始-()BeanDefinition的註冊

store erro pan customize 註冊 failed mono def override ---恢復內容開始--- 前言 在上一篇中有一處代碼是BeanDefiniton註冊的入口,我們回顧一下。 1.BeanDefiniton在IOC容器註冊 首先我

buildroot構建項目(五)--- u-boot 2017.11 適配開發板修改 4 ---- 系統啟動初始

vid def include cmp ble soft setup.s bubuko 賦值 一、內存控制器   在關閉了MMU和caches 之後 就進入lowlevel_init 函數,對內存控制器進行初始化。lowlevel_init.S (board\samsung

Spring Core Container 源碼分析:Spring Beans 初始流程分析

turn raw time -c rri add 步驟 引用 lin 前言 本文是筆者所著的 Spring Core Container 源碼分析系列之一; 本篇文章主要試圖梳理出 Spring Beans 的初始化主流程和相關核心代碼邏輯; 本文轉載自本人的私人博客,傷神

linux文件系統 - 初始()

視圖 div 目錄遷移 oca script 方式 不能 輸出 str 一、目的 內核加載完initrd文件後,為掛載磁盤文件系統做好了必要的準備工作,包括掛載了sysfs、proc文件系統,加載了磁盤驅動程序驅動程序等。接下來,內核跳轉到用戶空間的init程序,

Linux內存初始() 內存布局

也會 mat 註冊 情況 align else if mod 而在 ech 一、前言 同樣的,本文是內存初始化文章的一份補充文檔,希望能夠通過這樣的一份文檔,細致的展示在初始化階段,Linux 4.4.6內核如何從device tree中提取信息,完成內存布局的任務。具體的

WebApplicationContext的初始方式

實例 aware web.xml span available HA 而且 util onf    ApplicationContext是Spring的核心,Context我們通常解釋為上下文環境,我想用“容器”來表述它更容易理解一些,ApplicationContext則

自己寫bootloader——mini2440(、關閉看門狗,初始始終)

com img tps image 參考 我們 csdn .net 參考資料 參考資料:https://blog.csdn.net/lee244868149/article/details/50450232      https://blog.csdn.net/lee244

Spark源碼剖析——SparkContext的初始(五)_創建任務調度器TaskScheduler

pool exec http 我們 分享 res locale sch fixed 5. 創建任務調度器TaskScheduler TaskScheduler也是SparkContext的重要組成部分,負責任務的提交,並且請求集群管理器對任務調度。TaskScheduler