1. 程式人生 > >使用scala開發SparkSql程式

使用scala開發SparkSql程式

依賴

 <properties>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.10.6</scala.version>
        <scala.compat.version>
2.10</scala.compat.version> </properties> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency
>
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.5.2</version> </dependency> <dependency> <groupId>org.apache.spark</groupId
>
<artifactId>spark-streaming_2.10</artifactId> <version>1.5.2</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>1.5.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.6.2</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.38</version> </dependency> </dependencies>

1 使用RDD和Case Class關聯進行表操作(推斷模式)

package cn.itcast.spark.sql

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object InferringSchema {
  def main(args: Array[String]) {

    //建立SparkConf()並設定App名稱
    val conf = new SparkConf().setAppName("SQL-1")
    //SQLContext要依賴SparkContext
    val sc = new SparkContext(conf)
    //建立SQLContext
    val sqlContext = new SQLContext(sc)

    //從指定的地址建立RDD
    val lineRDD = sc.textFile(args(0)).map(_.split(" "))

    //建立case class
    //將RDD和case class關聯
    val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
    //匯入隱式轉換,如果不到人無法將RDD轉換成DataFrame
    //將RDD轉換成DataFrame
    import sqlContext.implicits._
    val personDF = personRDD.toDF
    //登錄檔
    personDF.registerTempTable("t_person")
    //傳入SQL
    val df = sqlContext.sql("select * from t_person order by age desc limit 2")
    //將結果以JSON的方式儲存到指定位置
    df.write.json(args(1))
    //停止Spark Context
    sc.stop()

  }
}
//case class一定要放到外面
case class Person(id: Int, name: String, age: Int)

2 RDD對映到rowRDD並指定schema(指定模式)

package cn.itcast.spark.sql

import org.apache.spark.sql.{SaveMode, Row, SQLContext}
import org.apache.spark.sql.types._
import org.apache.spark.{SparkContext, SparkConf}

object SpecifyingSchema {
  def main(args: Array[String]) {
    //建立SparkConf()並設定App名稱
    val conf = new SparkConf().setAppName("SQL-2")
    //SQLContext要依賴SparkContext
    val sc = new SparkContext(conf)
    //建立SQLContext
    val sqlContext = new SQLContext(sc)
    //從指定的地址建立RDD
    val personRDD = sc.textFile(args(0)).map(_.split(" "))
    //通過StructType直接指定每個欄位的schema
    val schema = StructType(
      List(
        StructField("id", IntegerType, true),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
      )
    )
    //將RDD對映到rowRDD
    val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
    //將schema資訊應用到rowRDD上
    val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    //登錄檔
    personDataFrame.registerTempTable("t_person")
    //執行SQL
    val df = sqlContext.sql("select * from t_person order by age desc limit 4")
    //將結果以JSON的方式儲存到指定位置
    df.write.json(args(1))
    //停止Spark Context
    sc.stop()
  }
}

3 DataFrame的JDBC操作

package cn.itcast.spark.sql

import java.util.Properties

import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.sql.types.{StringType, IntegerType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

object JdbcRDD {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("MySQL-Demo")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    //通過並行化建立RDD
    val personRDD = sc.parallelize(Array("1 tom 5", "2 jerry 3", "3 kitty 6")).map(_.split(" "))
    //通過StructType直接指定每個欄位的schema
    val schema = StructType(
      List(
        StructField("id", IntegerType, true),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
      )
    )
    //將RDD對映到rowRDD
    val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
    //將schema資訊應用到rowRDD上
    val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    //建立Properties儲存資料庫相關屬性
    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "123456")
    //將資料追加到資料庫
    personDataFrame.write.mode("append").jdbc("jdbc:mysql://localhost:3306/bigdata", "bigdata.person", prop)
    //停止SparkContext
    sc.stop()
  }
}

相關推薦

使用scala開發SparkSql程式

依賴 <properties> <maven.compiler.source>1.7</maven.compiler.source>

Spark實戰----(1)使用Scala開發本地測試的Spark WordCount程式

第一步:JDk的安裝 第二步:Scala的安裝   不會的可以看這裡   Scala環境安裝 鑑於以上兩步較為簡單,不再詳細贅述 第三步:去Spark官方網站下載Spark包 我下載的檔名是spark-1.6.2-bin-hadoop2.6          點選Dow

學習大資料的第一步-搭建Scala開發環境,以及使用Intellij IDEA開發Scala程式

1、為什麼要學習Scala語言? 結合Spark處理大資料 這是Scala的一個主要應用,而且Spark也是那Scala寫的。 Java的指令碼語言版 可以直接寫Scala的指令碼,也可以在.sh直接使用Scala。 代替Java Sca

scala程式設計】學習大資料的第一步-搭建Scala開發環境,以及使用Intellij IDEA開發Scala程式

1、為什麼要學習Scala語言?結合Spark處理大資料 這是Scala的一個主要應用,而且Spark也是那Scala寫的。Java的指令碼語言版 可以直接寫Scala的指令碼,也可以在.sh直接使用Scala。代替Java Scala的程式設計風格更簡潔,當然也很可能降低可

IDEA搭建scala開發環境開發spark應用程式

一、idea社群版安裝scala外掛 因為idea預設不支援scala開發環境,所以當需要使用idea搭建scala開發環境時,首先需要安裝scala外掛,具體安裝辦法如下。 1、開啟idea,點選configure下拉選單中的plugins選項: 2、在彈出對話方塊中點選紅框按鈕: 3、在彈出最新對話

通過IDEA搭建scala開發環境開發spark應用程式

一、idea社群版安裝scala外掛因為idea預設不支援scala開發環境,所以當需要使用idea搭建scala開發環境時,首先需要安裝scala外掛,具體安裝辦法如下。1、開啟idea,點選configure下拉選單中的plugins選項:2、在彈出對話方塊中點選紅框按鈕:3、在彈出最新對話方塊的搜尋欄輸

IDEA+scala外掛開發spark程式

spark由scala語言編寫,開發spark程式,自然也少不了scala環境,這裡介紹如何利用Intellij IDEA開發spark。1、環境準備。jdk,scala,idea這些對於本文來說都已經預設安裝。2、idea中安裝scala language外掛。File-&

使用idea和maven開發和打包scala和spark程式

使用idea構建maven管理的scala和spark程式,預設已經裝好了idea、scala並在idea中安裝了scala外掛。一、新建Maven專案開啟idea,點選File—New—Project,彈出如下介面,選擇Maven專案,檢視是否是正確的JDK配置項正常來說這

Springboot中使用Scala開發

odi ID dep prop jsoup client sse 簡單例子 sna 新建maven工程,添加pom依賴: <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www

Scala系統學習(二):Scala開發環境安裝配置

www 執行 posit 令行 完成後 version 繼續 environ ava Scala可以安裝在任何基於UNIX/Linux或基於Windows的系統上。在您的機器上開始安裝Scala之前,必須在計算機上安裝Java 1.8或更高版本。 下面請按照以下步驟安裝S

IDEA搭建scala開發環境開發spark應用程序

編寫 運行程序 通過 https apach import input inf 搭建 一、idea社區版安裝scala插件 因為idea默認不支持scala開發環境,所以當需要使用idea搭建scala開發環境時,首先需要安裝scala插件,具體安裝辦法如下。 1、

安裝Scala開發環境

Scala 介紹   Step 1: 安裝 Java開發環境 Scala 版本與Java版本的相容關係 從Oracle網站下載JDK  URL: http://www.oracle.com/technetwork/java/javase/downloads/index

8年開發java程式設計師教你:JAVA開發應該學習什麼?讓你不迷茫

java入門學習有哪些內容?很多想學習java的學生都不知道怎麼學java,特別是沒有基礎的學生,今天8年開發的老程式設計師,給大家整理了一下,java入門學習有哪些內容: 第一階段 計算機基本原理,Java語言發展簡史,Java開發環境的搭建,體驗Java程式的開發,Java語法格式

關於vs開發windows程式過程中記憶體檢查二三事

做為一個C/C++程式設計師,面對資源管理是必不可少的。今天,我對我這些年的經驗的一些總結。 每一個程式在執行時都佔用一塊可用的記憶體空間,用於存放動態分配的物件,此記憶體空間稱為程式的自由儲存區或堆。 C 語言程式使用一對標準庫函式 malloc 和 free 在自由儲存區

想高效開發程式,mpvue瞭解下(一)

序言 小程式一定是今年熱門話題之一,對於我們開發者來講,開發小程式也是屬於我們的技能之一了。從去年我也玩過小程式,但當時處於內測的階段,各種反人類的設計都有,連es6都不支援,只能說瞎折騰了。到了如今,小程式迎來春天,友好度提高了不少,wepy、taro與mpvue的出現也帶來更高的開發

程式開發-小程式的組成

3.1 小程式的組成 WEB前端組成:HTML+CSS+JavaScript+AJAX+PHP介面 小程式的組成:WXML 模板 + WXSS 樣式 + JS 互動邏輯 + PHP介面 總結: WXML 模板 就是 HTML標籤,區別就是微信重新命名了新

程式開發-小程式開始開發及基本設定

3.0 小程式開始開發及基本設定 微信開發文件:https://developers.weixin.qq.com/miniprogram/dev/ 下載微信開發者工具 下載地址:https://developers.weixin.qq.com/min

程式開發-小程式介紹

小程式是什麼? 微信小程式(wei xin xiao cheng xu),簡稱小程式,英文名Mini Program,是一種不需要下載安裝即可使用的應用,它實現了應用“觸手可及”的夢想,使用者掃一掃或搜一下即可開啟應用。 小程式開發成本,大概只需要開發一個App成本的五

VS中用C#開發應用程式的除錯入門、技巧和例項(轉載)

入門篇 假設你是有著.Net平臺的程式設計師,並且使用Visual Studio 做為開發工具。 斷點:最簡單的一種,設定一個斷點,程式執行到那一句就自動中斷進入除錯狀態。設定斷點,在你覺得有問題的程式碼行,左側單擊,會出現紅色的紅點即斷點。 啟動調式:按F5,或者選單欄---調式---開始除錯,或

Cordova開發Android程式筆記一:開發環境搭建

  Cordova開發Android程式筆記一:開發環境搭建   一、Java開發環境搭建   參考資料:Eclipse+ADT+Android SDK 搭建安卓開發環境  https://www.cnblogs.com/zh719588366/p/