程式人生 > 使用 Spark 將 Hive 中的資料匯入到 MongoDB

使用 Spark 將 Hive 中的資料匯入到 MongoDB

import com.huinong.truffle.push.process.domain.common.constant.Constants;
import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.config.WriteConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession; import java.io.Serializable; import java.util.*; @Slf4j public class SynchronizeData implements Serializable{ public static SparkSession getSparkSession(String mongoUrl ,String dbName ,String outputDabase ,String outputCollection){ SparkSession spark
= null; try{ spark = SparkSession.builder().master("local[*]") .appName("SparkHive") .config("spark.sql.warehouse.dir", Constants.WAREHOUSE_DIR).enableHiveSupport() .config("spark.mongodb.output.uri", mongoUrl + dbName) .config(
"spark.mongodb.output.database",outputDabase) .config("spark.mongodb.output.collection",outputCollection) .getOrCreate(); spark.sql("show databases").show(); spark.sql("show tables").show(); }catch (Exception e){ log.error("建立spark session失敗",e); } return spark; } public static void sync(String sql ,String mongoUrl ,String dbName ,String outputDabase ,String outputCollection) throws Exception{ SparkSession spark = getSparkSession(mongoUrl ,dbName ,outputDabase ,outputCollection); JavaSparkContext jc = new JavaSparkContext(spark.sparkContext()); System.out.println("===========================開始.........."+System.currentTimeMillis()); Dataset<Row> dataset = spark.sql(sql); if (dataset != null && dataset.count() > 0){ MongoSpark.save(dataset); } System.out.println("===========================結束.........."+System.currentTimeMillis()); jc.close(); } }

呼叫範例:

private String mongoUrl = "mongodb://10.10.3.241:27017/";

    /**
     * Copies every row of the Hive table {@code hn_application.push_userinfos} to MongoDB
     * through {@link SynchronizeData#sync}. The output is configured with database
     * {@code push} and collection {@code push_userinfos}.
     *
     * @throws Exception whatever {@code SynchronizeData.sync} throws
     */
    public void synchronizeUserinfos() throws Exception {
        final String query = "select * from hn_application.push_userinfos";
        // Appended to mongoUrl to form the connector's output URI.
        final String uriDatabase = "push_userinfos";
        // Passed as spark.mongodb.output.database / spark.mongodb.output.collection.
        final String targetDatabase = "push";
        final String targetCollection = "push_userinfos";
        SynchronizeData.sync(query, this.mongoUrl, uriDatabase, targetDatabase, targetCollection);
    }
/**
 * Hive warehouse directory. Presumably the definition of {@code Constants.WAREHOUSE_DIR},
 * which SynchronizeData passes as {@code spark.sql.warehouse.dir} (confirm).
 */
public static final String WAREHOUSE_DIR = "/user/hive/warehouse";

參考資料:

https://www.cnblogs.com/kaiwen1/p/9179035.html

 

參考資料中提到要把叢集的三個設定檔放到 resources 目錄下;我這邊只放了 hive-site.xml,也能正常執行。