【Spark】SparkSql分析結果寫入Mysql
文章目錄
- 前言
- 裝備
- Core Code
- 1. Mysql資料庫建結果表
- 2. DB配置檔案
- 3. 搞個檔案
- 4. 資料分層
- 5. SparkJob父類
- 6. MetroAnalysisJob(具體業務sparkjob)
- 7. SparkJob工具類
- 8. MySQLjdbcConfig
- 9. Running Result
- Github
- 總結
前言
hadoop完全分散式叢集搭建剛寫完。還是先寫一下應用。這裡寫一下sparksql怎麼應用起來。
Spark SQL是spark家族裡面最常用的。在實際開發當中也比RDD常用。
題外話。想了解RDD的java函數語言程式設計可以看
【Spark】SparkRDD開發手冊(JavaAPI函數語言程式設計) https://blog.csdn.net/HuHui_/article/details/83905308
-
我們先看一下wiki(來歷):
Spark SQL在Spark核心上帶出一種名為SchemaRDD的資料抽象化概念,提供結構化和半結構化資料相關的支援。Spark SQL提供了領域特定語言,可使用Scala、Java或Python來操縱SchemaRDDs。它還支援使用使用命令列介面和ODBC/JDBC伺服器操作SQL語言。在Spark 1.3版本,SchemaRDD被重新命名為DataFrame。
-
重點注意
以前專案經常能看到Hive解析MapReduce提交到叢集上執行。後來出現了Spark,又有了Hive解析成SparkJob提交到叢集上執行。這裡解釋一下
SparkSQL的前身是Shark,但又因為Shark對於Hive的太多依賴,2014年spark團隊停止對Shark的開發,將所有資源放SparkSQL專案上,SparkSQL作為Spark生態的一員逐漸發展,而不再受限於Hive,只是相容Hive;Hive on Spark是由Cloudera發起,由Intel、MapR等公司共同參與的開源專案,2014年spark團隊停止對Shark的開發,將所有資源放SparkSQL專案上,也就是說,Hive將不再受限於一個引擎,可以採用Map-Reduce、Tez、Spark等引擎
-
應用場景
可以說大資料應用來說是最簡單的,最方便應用的計算方式。通過對資料來源的讀取後,使用sql語言即可分析並解決大部分的大資料分析計算問題。一般在資料分層裡面不管是源資料計算還是業務資料分析都十分常用。
-
小編提醒
我github的例子不需要你去安裝spark環境。單機版的在本機local既可用。
以後更新完所有的基礎用法。用一個專案把全部應用起來。就不能用單機版跑了。
但是學習過程並不需要被這些束縛。
最常用的是計算第一第二層資料,存hbase,然後計算第三層資料(業務相關),存到結果表(ES or Mysql…etc)。
裝備
- maven依賴 我這裡使用spark2.1.1
- sql基礎
- mysql(其他database也可以,用於計算結果儲存)
Core Code
1. Mysql資料庫建結果表
CREATE TABLE `hui_metro_test` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`from_station` varchar(255) DEFAULT NULL,
`to_station` varchar(255) DEFAULT NULL,
`count` int(11) DEFAULT NULL,
`distance` double DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=201743 DEFAULT CHARSET=utf8;
2. DB配置檔案
db.url=jdbc:mysql://127.0.0.1:3306/hui?characterEncoding=UTF-8
db.user=root
db.password=123456
db.driver=com.mysql.jdbc.Driver
3. 搞個檔案
{ "from_station":"西朗","to_station":"坑口","count":"1","distance":"1.6" },
{ "from_station":"西朗","to_station":"花地灣","count":"2","distance":"2.5" },
{ "from_station":"西朗","to_station":"芳村","count":"3","distance":"3.8" },
{ "from_station":"西朗","to_station":"黃沙","count":"4","distance":"5.2" },
{ "from_station":"西朗","to_station":"長壽路","count":"5","distance":"6.1" },
{ "from_station":"西朗","to_station":"陳家祠","count":"6","distance":"7.3" },
{ "from_station":"西朗","to_station":"西門口","count":"7","distance":"8.3" },
{ "from_station":"西朗","to_station":"公園前","count":"8","distance":"9.1" },
{ "from_station":"西朗","to_station":"農講所","count":"9","distance":"10.3" },
{ "from_station":"西朗","to_station":"烈士陵園","count":"10","distance":"11.3" }
4. 資料分層
我這裡沒有什麼資料分層,根據實際需要。我是直接把分析結果檔案存入mysql
5. SparkJob父類
/**
* <b><code>SparkJob</code></b>
* <p/>
* Description:
* <p/>
* <b>Creation Time:</b> 2018/11/11 17:39.
*
* @author Hu Weihui
*/
public class SparkJob {
/**
* The constant LOGGER.
*
* @since hui_project 1.0.0
*/
private static final Logger LOGGER = LoggerFactory.getLogger(SparkJob.class);
/**
* The constant serialVersionUID.
*
* @since hui_project 1.0.0
*/
private static final long serialVersionUID = 771902776566370732L;
/**
* Instantiates a new Spark job.
*/
protected SparkJob(){}
/**
* Execute.
*
* @param sparkContext the spark context
* @param args the args
* @since hui_project 1.0.0
*/
public void execute(JavaSparkContext sparkContext, String[] args) {
}
/**
* Execute.
*
* @param sparkContext the spark context
* @since hui_project 1.0.0
*/
public void execute(JavaSparkContext sparkContext) {
}
}
6. MetroAnalysisJob(具體業務sparkjob)
/**
* <b><code>MetroAnalysisJob</code></b>
* <p/>
* Description:
* <p/>
* <b>Creation Time:</b> 2018/11/11 17:32.
*
* @author Hu Weihui
*/
public class MetroAnalysisJob extends SparkJob {
private static Logger LOGGER = LoggerFactory.getLogger(MetroAnalysisJob.class);
private static final String INPUT_FILE_PATH
= MetroAnalysisJob.class.getClassLoader().getResource("test.json").toString();
private static final String OUTPUT_FILE_PATH
= "D:/test/test";
private static final String SQL = "select * from hui_metro_testjson";
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf()
.setAppName("test")
.setMaster("local[4]");
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
MetroAnalysisJob metroAnalysisJob = new MetroAnalysisJob();
metroAnalysisJob.execute(sparkContext, args);
}
@Override
public void execute(JavaSparkContext sparkContext, String[] args) {
super.execute(sparkContext, args);
deal(sparkContext, INPUT_FILE_PATH, OUTPUT_FILE_PATH);
}
/**
* 資料邏輯處理
* @param sparkContext
* @param inPutPath
* @param outPutPath
*/
private void deal(JavaSparkContext sparkContext, String inPutPath, String outPutPath) {
SparkJobUtil.checkFileExists(inPutPath);
SQLContext sqlContext = new SQLContext(sparkContext);
// sqlContext.setConf("spark.sql.parquet.binaryAsString","true");
//建立快照臨時表
Dataset<Row> dataset = sqlContext.read().json(inPutPath);
dataset.registerTempTable("hui_metro_testjson");
dataset.show(10);
Dataset<Row> resultFrame = sqlContext.sql(SQL);
if (resultFrame.count() > 0) {
resultFrame.repartition(3).write()
.mode(SaveMode.Append).json(outPutPath);
}
resultFrame.show(10);
//結果寫入資料庫
MySQLJdbcConfig jdbcConfig = new MySQLJdbcConfig();
jdbcConfig.init();
resultFrame.write().mode("append")
.jdbc(jdbcConfig.getUrl(), "hui_metro_test", jdbcConfig.getConnectionProperties());
}
}
7. SparkJob工具類
/**
* <b><code>SparkJobUtil</code></b>
* <p/>
* Description:
* <p/>
* <b>Creation Time:</b> 2018/11/11 17:48.
*
* @author Hu Weihui
*/
public class SparkJobUtil {
/**
* The constant LOGGER.
*
* @since hui_project 1.0.0
*/
private static final Logger LOGGER = LoggerFactory.getLogger(SparkJobUtil.class);
/**
* Close quietly.
*
* @param fileSystem the file system
* @since hui_project 1.0.0
*/
public static void closeQuietly(FileSystem fileSystem) {
if (fileSystem != null) {
try {
fileSystem.close();
} catch (IOException e) {
LOGGER.error("Fail to close FileSystem:" + fileSystem, e);
}
}
}
/**
* Check file exists.
*
* @param path the path
* @since hui_project 1.0.0
*/
public static void checkFileExists(String path) {
Configuration configuration = new Configuration();
FileSystem fileSystem = null;
try {
fileSystem = FileSystem.get(configuration);
if (!fileSystem.exists(new Path(path))) {
throw new FileNotFoundException(path);
}
} catch (IOException e) {
e.printStackTrace();
}finally {
closeQuietly(fileSystem);
}
}
}
8. MySQLjdbcConfig
主要是讀取配置檔案然後傳入DataFrame去連線
/**
* <b><code>MySQLJdbcConfig</code></b>
* <p/>
* Description:
* <p/>
* <b>Creation Time:</b> 2018/11/11 17:32.
*
* @author Hu Weihui
*/
public class MySQLJdbcConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(MySQLJdbcConfig.class);
private String table;
private String url;
private Properties connectionProperties;
public void init(){
Properties properties = new Properties();
InputStream resourceAsStream = this.getClass().getClassLoader().getResourceAsStream("jdbc.properties");
try {
properties.load(resourceAsStream);
setUrl(properties.getProperty("db.url"));
//考慮多資料來源的情況,另外建立properties傳入
Properties connectionProperties = new Properties();
connectionProperties.setProperty("user",properties.getProperty("db.user"));
connectionProperties.setProperty("password",properties.getProperty("db.password"));
connectionProperties.setProperty("url",properties.getProperty("db.url"));
setConnectionProperties(connectionProperties);
} catch (IOException e) {
LOGGER.info("讀取配置檔案失敗");
}
}
public String getTable() {
return table;
}
public void setTable(String table) {
this.table = table;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public Properties getConnectionProperties() {
return connectionProperties;
}
public void setConnectionProperties(Properties connectionProperties) {
this.connectionProperties = connectionProperties;
}
}
9. Running Result
Github
只要自己建立個數據庫就可以跑了。
往後更新H2資料庫 直接跑H2,資料庫都不需要建立
https://github.com/ithuhui/hui-bigdata-spark/tree/master/src/main/java/com/bigdata/spark/sparksql
總結
- Spark不難,但是原理需要我們去理解。以後再更新原始碼方面,更詳細的東西
- 資料分層很重要
- 轉載註明一下作者唄 。 感謝小哥哥小姐姐~
- 喜歡的話留下評論討論問題,如果能幫到你們很開心。