1. 程式人生 > >spark或sparksql對錶進行Join並將結果存為Avro格式

spark或sparksql對錶進行Join並將結果存為Avro格式

描述一下需求

有個表的結構如下

    Emp (
           Eno CHAR(4),
           Ename CHAR(8),
           Esex CHAR(1) CHECK(Esex IN  ('M','F')),
           EDno CHAR(4) REFERENCES Dept (Dno),
           PRIMARY KEY (Eno)
    );
    Dept (
           Dno CHAR(4)  NOT NULL UNIQUE,
           Dname CHAR(20),
           Daddr CHAR(30)
    );

上表簡單的分為Emp.txt和Dept.txt兩個文字檔案,資料具體內容自定義

用Spark和Spark SQL分別實現以下功能:

1.提取Eno,Ename,Esex,Dname,Daddr

2.用Avro格式將資料存到指定目錄下

在這裡主要想記錄的是:

1.使用spark,sql如何進行表的連線(Join)

2.如何將資料儲存為需要的格式

先放pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.huangxiao</groupId>
    <artifactId>local_spark</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.3.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.3.0</version>
    </dependency>
        <!-- https://mvnrepository.com/artifact/com.databricks/spark-avro -->
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-avro_2.11</artifactId>
        <version>4.0.0</version>
    </dependency>
    </dependencies>

</project>

然後spark部分

package scala

import org.apache.spark.sql.{Row, SparkSession, types}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
import com.databricks.spark.avro._

//用spark完成對資料的提取和儲存操作
//用Spark和Spark SQL分別實現以下功能:提取Eno,Ename,Esex,Dname,Daddr,用Avro格式將資料存到xx目錄下

object quiz1_for_spark_521 {
  def main(args: Array[String]) {
    if (args.length < 2) {
      println("Usage:SparkWordCount FileName")
      System.exit(1)
    }

    val conf = new SparkConf().setAppName("log_deal").setMaster("local")
    val sc = new SparkContext(conf)
    val sparkSession = SparkSession.builder().appName("RDD to DataFrame").config(conf).getOrCreate()//新的sql api,等價於sqlContext().


    val Deptfile = sc.textFile(args(1))
    val DeptRDD=Deptfile.map{l =>
      val line = l.split(',')
      val dno=line(0)
      val dname=line(1)
      val daddr=line(2)
      (dno,dname+','+daddr)
    }

    val Empfile = sc.textFile(args(0))
    val EmpRDD=Empfile.map{l =>
      val line=l.split(',')
      val name=line(1)
      val sex=line(2)
      val edno=line(3)
      (edno,line(0)+','+name+','+sex)}
    //join兩個RDD
    val joinRDD=EmpRDD.join(DeptRDD).map(x=>x._2._1+','+x._2._2)
      .map{l=>
      val l_l=l.split(',')
      val eno=l_l(0)
      val ename=l_l(1)
      val esex= l_l(2)
      val dname= l_l(3)
      val daddr= l_l(4)
      Row(eno,ename,esex,dname,daddr)}

    //在這裡需要宣告一下表的結構。因為arvo需要列名
    val schema_final = StructType(
      Seq(
        //這裡的StringType注意別用錯了,是org.apache.spark.sql.types.StringType。不注意的話會用到org.apache.spark.sql.StringType
        StructField("Eno",types.StringType,true)
        ,StructField("Ename",types.StringType,true)
        ,StructField("Esex",types.StringType,true)
        ,StructField("Dname",types.StringType,true)
        ,StructField("Daddr",types.StringType,true)
      )
    )
    //將RDD與表結構進行匹配,生成DF便於avro格式的儲存
    val df=sparkSession.createDataFrame(joinRDD,schema_final)
    //這個方法是cloudera官網的例項,很多用法都有具體的例子 https://www.cloudera.com/documentation/enterprise/latest/topics/spark_avro.html  
    df.write.format("com.databricks.spark.avro").save(args(2))
    sc.stop()
  }

}

sparkSQL部分

package scala

import org.apache.spark.sql.{Row, SparkSession, types}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
import com.databricks.spark.avro._

object quiz1_for_sql_521 {
  def main(args: Array[String]) {
    if (args.length < 2) {
      println("Usage:SparkWordCount FileName")
      System.exit(1)
    }

    val conf = new SparkConf().setAppName("log_deal").setMaster("local")
    val sc = new SparkContext(conf)
    val sparkSession = SparkSession.builder().appName("RDD to DataFrame")
      .config(conf).getOrCreate()

    val EmpFile = sc.textFile(args(0))
    val EmpRDD = EmpFile.map{l=>
      val l_l=l.split(',')
      val eno=l_l(0)
      val ename=l_l(1)
      val esex= l_l(2)
      val edno = l_l(3)
      Row(eno,ename,esex,edno)}

    val schema_E = StructType(
      Seq(
        StructField("Eno",types.StringType,true)
        ,StructField("Ename",types.StringType,true)
        ,StructField("Esex",types.StringType,true)
        ,StructField("Dno",types.StringType,true)
      )
    )
    val df_E=sparkSession.createDataFrame(EmpRDD,schema_E)
//    df_E.createTempView("E")

    val DeptFile = sc.textFile(args(1))
    val DeptRDD = DeptFile.map{l=>
      val l_l=l.split(',')
      val dno=l_l(0)
      val dname=l_l(1)
      val daddr= l_l(2)
      Row(dno,dname,daddr)}

    val schema = StructType(
      Seq(
        StructField("Dno",types.StringType,true)
        ,StructField("Dname",types.StringType,true)
        ,StructField("Daddr",types.StringType,true)
      )
    )


    val df_D=sparkSession.createDataFrame(DeptRDD,schema)
//    df_D.createTempView("D")
    //此處是DataFrame的join方法
    val final_df=df_E.join(df_D,"Dno")

    //這裡用的也是sql新的api,建立一個檢視後就可以中sql語句進行分析啦
    final_df.createTempView("E_D")
    val results = sparkSession.sql("SELECT Eno,Ename,Esex,Dname,Daddr FROM E_D")
    results.show()
    results.write.format("com.databricks.spark.avro").save(args(2))
    sc.stop()
  }

}


相關推薦

sparksparksql進行Join結果Avro格式

描述一下需求有個表的結構如下 Emp ( Eno CHAR(4), Ename CHAR(8), Esex CHAR(1) CHECK(Esex IN ('M','F')), ED

php 高效、非遞迴迴圈所有下級,結果一維陣列

1.取出所有資料 public function teammember($id){ $next = pdo_fetchall("select id,openid,agentid from ".tablename("ewei_shop_member")."

pvuv的程式碼開發及提交spark程式jar包執行讀取資料來源結果寫入MySQL中

目錄 PvUvToMysql類 ConnectionUtils類 jdbc.properties檔案 在IDEA中打jar包的兩種方式 IDEA打jar包 IDEA中maven方式打jar包 提交spark程式ja

spark讀hdfs檔案實現wordcount結果回hdfs

package iie.udps.example.operator.spark; import scala.Tuple2; import org.apache.spark.SparkConf; import org.apache.spark.api.java.Jav

JAVA-阿里雲OSS檔案下載檔案壓縮ZIP格式儲存

一,引言 由於公司業務功能需求,需要從阿里雲OSS(Object Storage Service,物件儲存服務)中獲取檔案並打壓縮成ZIP格式,在這次開發中使用了OSS檔案下載相關服務,檔案壓縮功能使用的是commons-compress-x.x.jar中提供的功能。 二

mysql查詢某一欄位,結果拼接一個字串

select GROUP_CONCAT(uid) from users使用GROUP_CONCAT()函式,預設以‘,’將拼接的字串隔開,得到類似以下形式的字串:“1,2,3,4,5,6,”使用DIST

ffmpeg學習八:軟體生成yuv420p視訊其編碼H264格式

通過前面對ffmpeg中常用的幾個api的原始碼分析,從而對api有了更好的理解。之前已經做過視訊的解碼了,今天來嘗試視訊的編碼。ffmpeg已經給我們提供了相應的可供參考的程式:doc/examples/decoding_encoding.c檔案就是解碼和編碼

ORACLE中建立表、進行增刪改查的語法

最近在學習ORACLE,現將在ORACLE中建立表、對錶進行增刪該查的語法總結如下: 表是一種資料庫物件,是基本的資料儲存單位,由行和列組成 表的建立(以課程資訊表為例): CREATE TABLE OBJECTS       

分割流:例如,一個圖片(53k)以10k單位(單位的大小可隨著檔案的大小進行調整)進行分割,並將分割資訊以鍵值的形式儲存到.properties檔案中。最後還可以分割的檔案能夠完整的合併在一起

將一個53k的圖片以10k為單位進行分割,最後再將分割的檔案合併到一起。 首先進行檔案的分割,這裡使用了兩種方法: (1)其中splitFile(file)方法只是簡單地將圖片進行了分割。 (2)splitFile_2(file)方法除了將檔案進行分割,還將一些配置資訊進行了儲存

使用線上重定義進行分割槽 -- 基於rowid

RDBMS 11.2.0.4  之前對錶進行 線上重定義分割槽。主要使用的是primary key的方式。這次因為表上沒有primary key,所以通過rowid進行。以下為一個簡單的示例,該示例未涉及到索引等(其實索引也很簡單,設定到tmp表上帶過去就行了) 原始的表

第二章 集合與排序 3-1 進行聚合排序

一、聚合函式 用於彙總的函式稱為聚合函式或者聚集函式。所謂聚合,就是將多行彙總為一行。 二、計算表中資料的行數(COUNT()函式) 1、計算全部資料的行數。 SELECT COUNT(*) FROM Product; 2、計算NULL之外的資料的行數 將包含NULL值的列作為引數時,輸出的結果為非空的行

python 連線oracle資料庫進行增刪改查操作

Python 建立連線oracle資料庫的三種方式: 方式一:使用者名稱、密碼和監聽分開寫 import cx_Oracle db=cx_Oracle.connect('username/[email protected]/orcl') db.close()

小王說來來鬥地主,但是木有牌,怎麼辦-------模擬鬥地主洗牌和發牌 進行排序 同時使用Map,List,Set等集合

直接上程式碼了。 package cn.ketang.lianxi03; import java.util.ArrayList; import java.util.Collections; /** * ArrayList實現 * 模擬鬥地主洗牌和發牌 * 分析:

Docker Maven外掛(專案進行打包使用docker執行產生映象)

1. 使用dokerfile 進行構建    建立dockerfile2. maven 外掛12345678910111213141516<plugin><groupId>com.spotify</groupId><artifactI

在entity framework 中使用 LINQ 進行左關聯查詢且group by 分組查詢的示例,並且按小時分組查詢時間段

有表RealTimeDatas的欄位RecordTime儲存了實時時間,格式為DateTime 現在需要以小時進行分組統計每個時間段的最大值,最小值,和平均值 同時,另一個表Devices中有標準溫度溼度最大最小值範圍,需要將這個結果一併關聯到查詢結果中    

mono for android中使用dapperpetapocosqlite進行資料操作

在mono for android中使用dapper或petapoco,很簡單,新建android 類庫專案,直接把原來的檔案複製過來,對Connection連線報錯部分進行註釋和修改就可以運行了.(用可移植類庫PCL專案也可以的.) 如果需要原始碼可以聯絡我.10元收費哈.. 以下內容包括 1.在安卓中建

MAP 進行排序 遍歷取值

/** * 對map進行降序排列 * @param goodsCateMap * @return */ private Map<String, Long> Descend

mybatis環境搭建,進行增刪改查(通過id,查詢所有行(list返回),通過兩個關鍵字進行查詢)

搭建mybatis 開發環境 1.    引入jar包 Mybatis 3.2.2.jar    ojdbc5.jar    log4j-1.2.17.jar(列印日誌,可以看到mybatis的具體實現) 2.    為mybatis 設定執行環境(通過配置檔案) myba

visio 禁止允許形狀進行更改

若要進行形狀保護,需要能看到“開發工具”選項卡。預設情況下,該選項卡是隱藏的。 檢視“開發工具”選項卡 單擊“檔案”選項卡。單擊“選項”。單擊“高階”,然後向下滾動到“常規”部分。選擇“以開發人員模式執行”。 禁止或允許對形狀屬性進行更改 選擇形狀。在“開發工具”選項卡上的“形狀設計”組中,單擊“保護”。選

mybatis(3)---使用mybatis進行CRUD操作

前一節介紹過如何使用mybatis查詢資料庫表t_user中的資料,本節介紹如何使用mybatis對資料庫表t_user進行crud操作 步驟如下: 1、UserMapper.xml檔案內容如下:<?xml version="1.0" encoding="UTF-8"