spark或sparksql對錶進行Join並將結果存為Avro格式
描述一下需求
有個表的結構如下
Emp ( Eno CHAR(4), Ename CHAR(8), Esex CHAR(1) CHECK(Esex IN ('M','F')), EDno CHAR(4) REFERENCES Dept (Dno), PRIMARY KEY (Eno) ); Dept ( Dno CHAR(4) NOT NULL UNIQUE, Dname CHAR(20), Daddr CHAR(30) );
上表簡單的分為Emp.txt和Dept.txt兩個文字檔案,資料具體內容自定義
用Spark和Spark SQL分別實現以下功能:
1.提取Eno,Ename,Esex,Dname,Daddr
2.用Avro格式將資料存到指定目錄下
在這裡主要想記錄的是:
1.使用spark,sql如何進行表的連線(Join)
2.如何將資料儲存為需要的格式
先放pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.huangxiao</groupId> <artifactId>local_spark</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.3.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.3.0</version> </dependency> <!-- https://mvnrepository.com/artifact/com.databricks/spark-avro --> <dependency> <groupId>com.databricks</groupId> <artifactId>spark-avro_2.11</artifactId> <version>4.0.0</version> </dependency> </dependencies> </project>
然後spark部分
package scala import org.apache.spark.sql.{Row, SparkSession, types} import org.apache.spark.sql.types.{IntegerType, StructField, StructType} import org.apache.spark.{SparkConf, SparkContext} import com.databricks.spark.avro._ //用spark完成對資料的提取和儲存操作 //用Spark和Spark SQL分別實現以下功能:提取Eno,Ename,Esex,Dname,Daddr,用Avro格式將資料存到xx目錄下 object quiz1_for_spark_521 { def main(args: Array[String]) { if (args.length < 2) { println("Usage:SparkWordCount FileName") System.exit(1) } val conf = new SparkConf().setAppName("log_deal").setMaster("local") val sc = new SparkContext(conf) val sparkSession = SparkSession.builder().appName("RDD to DataFrame").config(conf).getOrCreate()//新的sql api,等價於sqlContext(). val Deptfile = sc.textFile(args(1)) val DeptRDD=Deptfile.map{l => val line = l.split(',') val dno=line(0) val dname=line(1) val daddr=line(2) (dno,dname+','+daddr) } val Empfile = sc.textFile(args(0)) val EmpRDD=Empfile.map{l => val line=l.split(',') val name=line(1) val sex=line(2) val edno=line(3) (edno,line(0)+','+name+','+sex)} //join兩個RDD val joinRDD=EmpRDD.join(DeptRDD).map(x=>x._2._1+','+x._2._2) .map{l=> val l_l=l.split(',') val eno=l_l(0) val ename=l_l(1) val esex= l_l(2) val dname= l_l(3) val daddr= l_l(4) Row(eno,ename,esex,dname,daddr)} //在這裡需要宣告一下表的結構。因為arvo需要列名 val schema_final = StructType( Seq( //這裡的StringType注意別用錯了,是org.apache.spark.sql.types.StringType。不注意的話會用到org.apache.spark.sql.StringType StructField("Eno",types.StringType,true) ,StructField("Ename",types.StringType,true) ,StructField("Esex",types.StringType,true) ,StructField("Dname",types.StringType,true) ,StructField("Daddr",types.StringType,true) ) ) //將RDD與表結構進行匹配,生成DF便於avro格式的儲存 val df=sparkSession.createDataFrame(joinRDD,schema_final) //這個方法是cloudera官網的例項,很多用法都有具體的例子 https://www.cloudera.com/documentation/enterprise/latest/topics/spark_avro.html df.write.format("com.databricks.spark.avro").save(args(2)) sc.stop() } }
sparkSQL部分
package scala
import org.apache.spark.sql.{Row, SparkSession, types}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
import com.databricks.spark.avro._
object quiz1_for_sql_521 {
def main(args: Array[String]) {
if (args.length < 2) {
println("Usage:SparkWordCount FileName")
System.exit(1)
}
val conf = new SparkConf().setAppName("log_deal").setMaster("local")
val sc = new SparkContext(conf)
val sparkSession = SparkSession.builder().appName("RDD to DataFrame")
.config(conf).getOrCreate()
val EmpFile = sc.textFile(args(0))
val EmpRDD = EmpFile.map{l=>
val l_l=l.split(',')
val eno=l_l(0)
val ename=l_l(1)
val esex= l_l(2)
val edno = l_l(3)
Row(eno,ename,esex,edno)}
val schema_E = StructType(
Seq(
StructField("Eno",types.StringType,true)
,StructField("Ename",types.StringType,true)
,StructField("Esex",types.StringType,true)
,StructField("Dno",types.StringType,true)
)
)
val df_E=sparkSession.createDataFrame(EmpRDD,schema_E)
// df_E.createTempView("E")
val DeptFile = sc.textFile(args(1))
val DeptRDD = DeptFile.map{l=>
val l_l=l.split(',')
val dno=l_l(0)
val dname=l_l(1)
val daddr= l_l(2)
Row(dno,dname,daddr)}
val schema = StructType(
Seq(
StructField("Dno",types.StringType,true)
,StructField("Dname",types.StringType,true)
,StructField("Daddr",types.StringType,true)
)
)
val df_D=sparkSession.createDataFrame(DeptRDD,schema)
// df_D.createTempView("D")
//此處是DataFrame的join方法
val final_df=df_E.join(df_D,"Dno")
//這裡用的也是sql新的api,建立一個檢視後就可以中sql語句進行分析啦
final_df.createTempView("E_D")
val results = sparkSession.sql("SELECT Eno,Ename,Esex,Dname,Daddr FROM E_D")
results.show()
results.write.format("com.databricks.spark.avro").save(args(2))
sc.stop()
}
}
相關推薦
spark或sparksql對錶進行Join並將結果存為Avro格式
描述一下需求有個表的結構如下 Emp ( Eno CHAR(4), Ename CHAR(8), Esex CHAR(1) CHECK(Esex IN ('M','F')), ED
php 高效、非遞迴迴圈所有下級,並將結果存為一維陣列
1.取出所有資料 public function teammember($id){ $next = pdo_fetchall("select id,openid,agentid from ".tablename("ewei_shop_member")."
pvuv的程式碼開發及提交spark程式jar包執行讀取資料來源並將結果寫入MySQL中
目錄 PvUvToMysql類 ConnectionUtils類 jdbc.properties檔案 在IDEA中打jar包的兩種方式 IDEA打jar包 IDEA中maven方式打jar包 提交spark程式ja
spark讀hdfs檔案實現wordcount並將結果存回hdfs
package iie.udps.example.operator.spark; import scala.Tuple2; import org.apache.spark.SparkConf; import org.apache.spark.api.java.Jav
JAVA-阿里雲OSS檔案下載並將檔案壓縮為ZIP格式儲存
一,引言 由於公司業務功能需求,需要從阿里雲OSS(Object Storage Service,物件儲存服務)中獲取檔案並打壓縮成ZIP格式,在這次開發中使用了OSS檔案下載相關服務,檔案壓縮功能使用的是commons-compress-x.x.jar中提供的功能。 二
mysql查詢某一欄位,並將結果拼接為一個字串
select GROUP_CONCAT(uid) from users使用GROUP_CONCAT()函式,預設以‘,’將拼接的字串隔開,得到類似以下形式的字串:“1,2,3,4,5,6,”使用DIST
ffmpeg學習八:軟體生成yuv420p視訊並將其編碼為H264格式
通過前面對ffmpeg中常用的幾個api的原始碼分析,從而對api有了更好的理解。之前已經做過視訊的解碼了,今天來嘗試視訊的編碼。ffmpeg已經給我們提供了相應的可供參考的程式:doc/examples/decoding_encoding.c檔案就是解碼和編碼
ORACLE中建立表、對錶進行增刪改查的語法
最近在學習ORACLE,現將在ORACLE中建立表、對錶進行增刪該查的語法總結如下: 表是一種資料庫物件,是基本的資料儲存單位,由行和列組成 表的建立(以課程資訊表為例): CREATE TABLE OBJECTS
分割流:例如,將一個圖片(53k)以10k為單位(單位的大小可隨著檔案的大小進行調整)進行分割,並將分割資訊以鍵值對的形式儲存到.properties檔案中。最後還可以將分割的檔案能夠完整的合併在一起
將一個53k的圖片以10k為單位進行分割,最後再將分割的檔案合併到一起。 首先進行檔案的分割,這裡使用了兩種方法: (1)其中splitFile(file)方法只是簡單地將圖片進行了分割。 (2)splitFile_2(file)方法除了將檔案進行分割,還將一些配置資訊進行了儲存
使用線上重定義對錶進行分割槽 -- 基於rowid
RDBMS 11.2.0.4 之前對錶進行 線上重定義分割槽。主要使用的是primary key的方式。這次因為表上沒有primary key,所以通過rowid進行。以下為一個簡單的示例,該示例未涉及到索引等(其實索引也很簡單,設定到tmp表上帶過去就行了) 原始的表
第二章 集合與排序 3-1 對錶進行聚合排序
一、聚合函式 用於彙總的函式稱為聚合函式或者聚集函式。所謂聚合,就是將多行彙總為一行。 二、計算表中資料的行數(COUNT()函式) 1、計算全部資料的行數。 SELECT COUNT(*) FROM Product; 2、計算NULL之外的資料的行數 將包含NULL值的列作為引數時,輸出的結果為非空的行
python 連線oracle資料庫對錶進行增刪改查操作
Python 建立連線oracle資料庫的三種方式: 方式一:使用者名稱、密碼和監聽分開寫 import cx_Oracle db=cx_Oracle.connect('username/[email protected]/orcl') db.close()
小王說來來鬥地主,但是木有牌,怎麼辦-------模擬鬥地主洗牌和發牌 對牌進行排序 並同時使用Map,List,Set等集合
直接上程式碼了。 package cn.ketang.lianxi03; import java.util.ArrayList; import java.util.Collections; /** * ArrayList實現 * 模擬鬥地主洗牌和發牌 * 分析:
Docker Maven外掛(對專案進行打包並使用docker執行產生映象)
1. 使用dokerfile 進行構建 建立dockerfile2. maven 外掛12345678910111213141516<plugin><groupId>com.spotify</groupId><artifactI
在entity framework 中使用 LINQ 對錶進行左關聯查詢且group by 分組查詢的示例,並且按小時分組查詢時間段
有表RealTimeDatas的欄位RecordTime儲存了實時時間,格式為DateTime 現在需要以小時進行分組統計每個時間段的最大值,最小值,和平均值 同時,另一個表Devices中有標準溫度溼度最大最小值範圍,需要將這個結果一併關聯到查詢結果中
mono for android中使用dapper或petapoco對sqlite進行資料操作
在mono for android中使用dapper或petapoco,很簡單,新建android 類庫專案,直接把原來的檔案複製過來,對Connection連線報錯部分進行註釋和修改就可以運行了.(用可移植類庫PCL專案也可以的.) 如果需要原始碼可以聯絡我.10元收費哈.. 以下內容包括 1.在安卓中建
對MAP 進行排序 並遍歷取值
/** * 對map進行降序排列 * @param goodsCateMap * @return */ private Map<String, Long> Descend
mybatis環境搭建,對錶進行增刪改查(通過id,查詢所有行(list返回),通過兩個關鍵字進行查詢)
搭建mybatis 開發環境 1. 引入jar包 Mybatis 3.2.2.jar ojdbc5.jar log4j-1.2.17.jar(列印日誌,可以看到mybatis的具體實現) 2. 為mybatis 設定執行環境(通過配置檔案) myba
visio 禁止或允許對形狀進行更改
若要進行形狀保護,需要能看到“開發工具”選項卡。預設情況下,該選項卡是隱藏的。 檢視“開發工具”選項卡 單擊“檔案”選項卡。單擊“選項”。單擊“高階”,然後向下滾動到“常規”部分。選擇“以開發人員模式執行”。 禁止或允許對形狀屬性進行更改 選擇形狀。在“開發工具”選項卡上的“形狀設計”組中,單擊“保護”。選
mybatis(3)---使用mybatis對錶進行CRUD操作
前一節介紹過如何使用mybatis查詢資料庫表t_user中的資料,本節介紹如何使用mybatis對資料庫表t_user進行crud操作 步驟如下: 1、UserMapper.xml檔案內容如下:<?xml version="1.0" encoding="UTF-8"