sparkstreaming結合sparksql-2.x實時向hive中寫資料
今天主要來介紹一下SparkSql,2.x新版本操作hive的一個寫法.
Spark SQL 的功能之一是執行 SQL 查詢.Spark SQL 也能夠被用於從已存在的 Hive 環境中讀取資料
廢話不多說,直接上程式碼:
package spark import java.io.File import java.util import kafka.{PropertiesScalaUtils, RedisKeysListUtils} import kafka.SparkStreamingKafka.{dbIndex, kafkaStreams} import net.sf.json.JSONObject import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.serialization.StringDeserializer import org.apache.log4j.{Level, Logger} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies} import redis.RedisPool /** * 利用sparksql 2.0向hive中寫資料; */ object SparkSqlDemo { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.INFO) Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.INFO) Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.INFO) val warehouseLocation = new File("hdfs://cluster/hive/warehouse").getAbsolutePath val spark = SparkSession.builder().appName("Spark SQL Jason").config("spark.sql.warehouse.dir", warehouseLocation).enableHiveSupport().getOrCreate() spark.conf.set("spark.streaming.kafka.maxRatePerPartition", "2000") spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") spark.conf.set("spark.streaming.concurrentJobs", "10") spark.conf.set("spark.streaming.kafka.maxRetries", "50") @transient val sc = spark.sparkContext val scc = new StreamingContext(sc, Seconds(2)) val topic = "jason_20180511" val topicSet: Set[String] = Set(topic) //設定kafka的topic; val kafkaParams = Map[String, Object]( "auto.offset.reset" -> "latest", "value.deserializer" -> classOf[StringDeserializer] , "key.deserializer" -> classOf[StringDeserializer] , "bootstrap.servers" -> PropertiesScalaUtils.loadProperties("broker") , "group.id" -> PropertiesScalaUtils.loadProperties("groupId") , "enable.auto.commit" -> (false: java.lang.Boolean) ) val maxTotal = 200 val maxIdle = 100 val minIdle = 10 val testOnBorrow = false val testOnReturn = false val maxWaitMillis = 500 RedisPool.makePool(PropertiesScalaUtils.loadProperties("redisHost"), PropertiesScalaUtils.loadProperties("redisPort").toInt, maxTotal, maxIdle, minIdle, testOnBorrow, testOnReturn, maxWaitMillis) val jedis = RedisPool.getPool.getResource jedis.select(dbIndex) val keys: util.Set[String] = jedis.keys(topic + "*") if (keys.size() == 0) { kafkaStreams = KafkaUtils.createDirectStream[String, String]( scc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams)) } else { val fromOffsets: Map[TopicPartition, Long] = RedisKeysListUtils.getKeysList(PropertiesScalaUtils.loadProperties("redisHost"), PropertiesScalaUtils.loadProperties("redisPort").toInt, topic) kafkaStreams = KafkaUtils.createDirectStream[String, String]( scc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams, fromOffsets)) } RedisPool.getPool.returnResource(jedis) kafkaStreams.foreachRDD(rdd=>{ val jedis_jason = RedisPool.getPool.getResource jedis_jason.select(dbIndex) val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges import spark.sql if(!rdd.isEmpty()){ val rowRDD:RDD[Row] = rdd.map(x=>{ val json = JSONObject.fromObject(x.value().toString) val a = json.get("name") val b = json.get("addr") Row(a,b) }) val schemaString = "name addr" val field = schemaString.split(" ").map(x=> StructField(x,StringType,nullable = true)) val schema = StructType(field) val df = spark.createDataFrame(rowRDD, schema) df.show() df.createOrReplaceTempView("tempTable") val sq = "insert into test_2 select * from tempTable" sql(sq) println("插入hive成功了") } offsetRanges.foreach { offsetRange => println("partition : " + offsetRange.partition + " fromOffset: " + offsetRange.fromOffset + " untilOffset: " + offsetRange.untilOffset) val topic_partition_key_new = offsetRange.topic + "_" + offsetRange.partition jedis_jason.set(topic_partition_key_new, offsetRange.untilOffset + "") } }) scc.start() scc.awaitTermination() } }
需要注意的是: spark.sql.warehouse.dir 配置的目錄,該目錄預設為Spark應用程式當前目錄中的 spark-warehouse 目錄 但請注意,自從2.0.0以來,hive-site.xml 中的 hive.metastore.warehouse.dir 屬性已被棄用。 而是使用 spark.sql.warehouse.dir 來指定倉庫中資料庫的預設位置。
還有一個問題是,這樣寫的話,會在hdfs上這個表的目錄下生成很多的小檔案,這個時候如果想在hive中進行統計,計算的時候,會產生很多個map,嚴重影響計算的速度,大家可以先考慮下這個問題.
為了解決在表目錄下面生成很多小檔案的問題,我們可以把hive表建成一個分割槽表,怎麼建分割槽表在我的另一篇blog裡面有寫到,或者可以直接用: insert overwrite table combine_data partition (day_time='2018-08-01') select data,enter_time from combine_data where day_time = '2018-08-01';來合併小檔案.
或者也可把用reparation減少分割槽數,但是這麼寫,會減少rdd的並行度,降低效能,自己參考使用.
如果有寫的不對的地方,歡迎大家指正,如果有什麼疑問,可以加QQ群:340297350,謝謝
參考:
http://spark.apachecn.org/docs/cn/2.2.0/sql-programming-guide.html點選開啟連結
相關推薦
sparkstreaming結合sparksql-2.x實時向hive中寫資料
今天主要來介紹一下SparkSql,2.x新版本操作hive的一個寫法. Spark SQL 的功能之一是執行 SQL 查詢.Spark SQL 也能夠被用於從已存在的 Hive 環境中讀取資料 廢話不多說,直接上程式碼: package spark import j
使用Sqoop從PostgreSQL向Hive中遷移資料遇到的問題
postgreSQL的介面 跟mysql不同之處就是,多了一個 2 ,這也是導致資料遷移錯誤原因 1.資料庫名稱 2.schema 3.表名 PostgreSQL中的物件屬於三層模型,即database->schema->table。
SparkStreaming向Hbase中寫資料
在SparkStreaming中統計了資料之後,我們需要將結果寫入外部檔案系統。 首先,需要說一下,下面的這個方法。 foreachRDD(func) 最通用的輸出操作,把func作用於從stream生成的每一個RDD。 注意:這個函式是在 執行streaming程式的dr
sparkStreaming結合sparkSql進行日誌分析
pass mysql foreach eat instance dataframe sel park array package testimport java.util.Propertiesimport org.apache.spark.SparkConfimport o
sparkStreaming結合SparkSql實例
top alt comment each 語句 cit 計算 相關 分隔 SparkSQL結合SparkStreaming的使用 Flume+Kafka+SparkStreaming已經發展為一個比較成熟的實時日誌收集與計算架構,利用Kafka,即可以支持將用於離線分析的
sparksql 2.x demo I
package com.ws.sparksql import org.apache.spark.rdd.RDD import org.apache.spark.sql.types._ import org.apache.spark.sql._ import or
sparksql讀取hive中的資料儲存到hdfs中
package wondersgroup_0905_Test import org.apache.spark.sql.SparkSession object sparkHive { def main(args: Array[String]): Unit = { //資料庫名稱
SparkSQL讀取Hive中的資料
注意紅色字。 ---------------------- 由於我Spark採用的是Cloudera公司的CDH,並且安裝的時候是線上自動安裝和部署的叢集。最近在學習SparkSQL,看到SparkSQL on HIVE。下面主要是介紹一下如何通過SparkSQL在
新版VirtualBox5.2.x設定Ubuntu與windows共享資料夾的方法
1、安裝增強功能首先點選“裝置”——“安裝增強功能”。即可在主資料夾下看到:點選執行軟體,完成增強功能的安裝。2、設定共享資料夾點選“裝置”——“共享資料夾”,新增共享資料夾,並勾選“自動掛載”和“固定分配”。如圖:重啟ubuntu,即可看到共享資料夾(所在目錄為“ /med
Spark 2.x 決策樹 示例程式碼-IRIS資料集
資料集下載 下載連結 程式碼 package Iris; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import o
分散式sparkSQL引擎應用:從遠端通過thriftServer連線spark叢集處理hive中的資料
實現原理: 客戶端(java程式)與thriftServer連線,thriftServer再代理客戶端轉換成spark的操作流程,再載入hive的資料到spark的worker節點,並執行Map-Re
Hive通過查詢語句向表中插入資料過程中發現的坑
前言最近在學習使用Hive(版本0.13.1)的過程中,發現了一些坑,它們或許是Hive提倡的比關係資料庫更加自由的體現(同時引來一些問題),或許是一些bug。總而言之,這些都需要使用Hive的開發人員額外注意。本文旨在列舉我發現的3個通過查詢語句向表中插入資料過程中的問題,
Boot 2.x 普羅米修斯資料採集
<!-- boot2.x 相容--> <!-- The client --> <dependency
Spring Boot 2.x基礎教程:配置元資料的應用
在使用Spring Boot開發應用的時候,你是否有發現這樣的情況:自定義屬性是有高量背景的,滑鼠放上去,有一個`Cannot resolve configuration property`的配置警告。 ![](https://img2020.cnblogs.com/other/626506/202101/
通過資料庫批量向kettle中插入資料,建立trans和job的模板(按照不同的要求需要自行調整)
import psycopg2 # 用來操作資料庫的類 class GPCommand(object): # 類的初始化 def __init__(self): self.hostname = 'XXX.XX.X.XX' self.username
VS2013 c++連結資料庫,應用儲存過程,向資料庫中寫入資料
// ConsoleApplication1.cpp : 定義控制檯應用程式的入口點。 // #include "stdafx.h" #include "iomanip" using namespace std; #import "c:\Program Files\Common Files\S
使用spark將hive中的資料匯入到mongodb
import com.huinong.truffle.push.process.domain.common.constant.Constants; import com.mongodb.spark.MongoSpark; import com.mongodb.spark.config.WriteConf
Pig指令碼從Hive中load資料並存入到Hbase中
1、我們先建一個Hive表test01: create table test01(name String, age int, phone String,province String, city String) ROW FORMAT DELIMITED FIELDS TERMINATED B
【嵌入式 C】向地址空間寫資料
做嵌入式開發的,一定免不了和硬體打交道。我們通過讀寫暫存器的方式來管理相關的硬體,這裡的暫存器並不是CPU空間中的暫存器,我們的暫存器實際上是一個地址單元(屬於特殊暫存器),燒寫程式時要被對映到地址空間上。所以我們通過讀寫地址這種方法來訪問暫存器是避免不了的。 方法: (*(volatile
【JEECG示例文件】使用Kettle從mysql向oracle中抽取資料
分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!