
Spark Streaming: Manually Committing Kafka Offsets for Fault-Tolerant Data Processing (Java Version)

The idea is simple: instead of relying on automatic offset commits, the job tracks Kafka offsets itself, stores them in ZooKeeper only after a batch has been written to Hive successfully, and resumes from the stored offsets on restart, so a failed batch is reprocessed rather than lost.

The code consists of three classes:

- ZKUtils: a small ZkClient singleton.
- KafkaOffsetManager: reads and writes per-partition offsets under the consumer group's ZooKeeper path.
- LoadDataBySparkStream: the streaming job; it captures each batch's OffsetRange[] in transform(), writes the batch to Hive in foreachRDD(), and commits the offsets to ZooKeeper only if the write succeeds.

Without further ado, here is the code:

package net.icsoc.bigdata.utils;

import org.I0Itec.zkclient.ZkClient;

public class ZKUtils {
    private static ZkClient zkClient;

    public static ZkClient getZKClient(String zkServer){
        //Lazily create and cache a single ZkClient instance
        if (zkClient == null){
            zkClient = new ZkClient(zkServer);
        }
        return zkClient;
    }
}
package net.icsoc.bigdata.utils;

import kafka.common.TopicAndPartition;
import kafka.utils.ZKGroupDirs;
import kafka.utils.ZKGroupTopicDirs;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.spark.streaming.kafka.OffsetRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class KafkaOffsetManager {
    private static Logger log = LoggerFactory.getLogger(KafkaOffsetManager.class);

    /**
     * Build the ZooKeeper path under which this topic's offsets are stored,
     * i.e. /consumers/<group>/offsets/<topic>. The consumer group id is hard-coded here.
     */
    public static String zkTopicPath(String topic){
        ZKGroupTopicDirs zgt = new ZKGroupTopicDirs("test-consumer-group", topic);
        return zgt.consumerOffsetDir();
    }

    /**
     * Count how many partition nodes already exist under the offset path.
     * Zero means the job has never committed offsets for this topic.
     */
    public static int countTopicPath(String zkServer, String zkTopicPath){
        ZkClient zkClient = ZKUtils.getZKClient(zkServer);
        return zkClient.countChildren(zkTopicPath);
    }

    /**
     * Read the saved offset of every partition from ZooKeeper.
     */
    public static Map<TopicAndPartition, Long> readOffsets(String zkServer, String zkTopicPath,
                                                           int countChildren, String topic){
        ZkClient zkClient = ZKUtils.getZKClient(zkServer);
        Map<TopicAndPartition, Long> offsetsMap = new HashMap<>();
        for (int i = 0; i < countChildren; i++){
            String path = zkTopicPath + "/" + i;
            String offset = zkClient.readData(path);
            TopicAndPartition topicAndPartition = new TopicAndPartition(topic, i);
            offsetsMap.put(topicAndPartition, Long.parseLong(offset));
        }
        return offsetsMap;
    }

    /**
     * Persist each partition's untilOffset to ZooKeeper. Called only after the
     * batch has been written to Hive successfully.
     */
    public static void writeOffset(String zkServer, String zkTopicPath,
                                   AtomicReference<OffsetRange[]> offsetRanges){
        try {
            ZkClient zkClient = ZKUtils.getZKClient(zkServer);
            OffsetRange[] offsets = offsetRanges.get();
            log.debug("offsets {}", offsets);
            if (offsets != null) {
                for (OffsetRange offset : offsets) {
                    String zkPath = zkTopicPath + "/" + offset.partition();
                    ZkUtils.updatePersistentPath(zkClient, zkPath, offset.untilOffset() + "");
                }
            }
        } catch (Exception e){
            log.error("write data to zk error", e);
        }
    }
}
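As a quick sanity check, the two helpers above can be exercised on their own. The following sketch is not part of the original project; the class name, ZooKeeper address, topic name and offset values are placeholders, and it assumes a reachable ZooKeeper (if no offsets have been committed yet, readOffsets simply returns an empty map):

package net.icsoc.bigdata.utils;

import kafka.common.TopicAndPartition;
import org.apache.spark.streaming.kafka.OffsetRange;

import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class KafkaOffsetManagerDemo {
    public static void main(String[] args) {
        String zkServer = "localhost:2181"; // placeholder ZooKeeper address
        String topic = "demo-topic";        // placeholder topic name

        // Resolve the offset path for the consumer group and read whatever is stored there.
        String path = KafkaOffsetManager.zkTopicPath(topic);
        int partitions = KafkaOffsetManager.countTopicPath(zkServer, path);
        Map<TopicAndPartition, Long> saved =
                KafkaOffsetManager.readOffsets(zkServer, path, partitions, topic);
        System.out.println("saved offsets: " + saved);

        // Simulate a processed batch for partition 0 and commit its untilOffset back to ZooKeeper.
        OffsetRange[] ranges = {OffsetRange.create(topic, 0, 0L, 100L)}; // placeholder offsets
        AtomicReference<OffsetRange[]> ref = new AtomicReference<>(ranges);
        KafkaOffsetManager.writeOffset(zkServer, path, ref);
    }
}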
package net.icsoc.bigdata.loaddata;

import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
import ch.qos.logback.core.util.StatusPrinter;
import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import net.icsoc.bigdata.common.config.PropertiesLoad;
import net.icsoc.bigdata.utils.KafkaOffsetManager;
import org.I0Itec.zkclient.ZkClient;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

import java.io.IOException;
import java.io.InputStream;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;


/***
 * Spark Streaming job that pulls messages from Kafka and writes them into Hive.
 */
public class LoadDataBySparkStream {
    private static Logger log = LoggerFactory.getLogger(LoadDataBySparkStream.class);
    private static HiveContext hiveContxt;
    private  static SparkConf sparkConf;
    private static StructType schema;
    private static JavaSparkContext sparkContext;
    private static Map<String,String> params;
    private static  Set<String> topics;
    private static JavaStreamingContext jsc;
    private static JavaPairInputDStream<String,String> stream;

    //State used for manually committing offsets to ZooKeeper
    private static AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();
    private static String zkTopicPath;
    private static int countChildren;
    private static Map<TopicAndPartition,Long> fromOffsets;
    private static String zkServer;


    public static void main(String[] args) {
//        initLogback();
        //The environment is best passed in as a program argument
        PropertiesLoad.environment = args[0];
        //Assemble the parameters needed to create the Kafka DStream
        createParams();
        //Assemble the Spark startup configuration
        createSparkConf();
        //Read the offsets previously committed to ZooKeeper
        createZKServer();
        //Create the global SparkContext
        createSparkContext();
        //Create the HiveContext used to write data into Hive
        createHiveContext(sparkContext);
        //Create the StreamingContext
        createJavaStreamContext();
        //Create the schema used to parse the incoming JSON data
        createOutputSchema();
        //Create the DStream that consumes from Kafka
        createDStream();
        try {
            jsc.start();
            jsc.awaitTermination();
        }catch (Exception e) {
            System.out.println("Stream Context Exception!");
        }finally {
            jsc.stop();
        }
    }
    private static void createZKServer(){
        zkTopicPath = KafkaOffsetManager.zkTopicPath(topics.iterator().next());
        countChildren = KafkaOffsetManager.countTopicPath(zkServer,zkTopicPath);
        fromOffsets = KafkaOffsetManager.readOffsets(zkServer,zkTopicPath,countChildren,topics.iterator().next());
    }

    /**
     * Create the DStream object that reads from Kafka.
     */
    private static void createDStream(){
        log.debug("countChildren is {}" ,countChildren);
        if (countChildren>0) {
            //Greater than 0 means the job has run before and offsets were already saved under this path in ZooKeeper
            KafkaUtils.createDirectStream(jsc, String.class, String.class,
                    StringDecoder.class, StringDecoder.class, String.class,params, fromOffsets,
                    new Function<MessageAndMetadata<String,String>,String>(){
                        @Override
                        public String call(MessageAndMetadata<String, String> tuple) {
                            return tuple.message();
                        }
                    }).transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
                @Override
                public JavaRDD<String> call(JavaRDD<String> tuple) throws Exception {
                    OffsetRange[] offsets = ((HasOffsetRanges)tuple.rdd()).offsetRanges();
                    log.debug("offsets {}",offsets);
                    offsetRanges.set(offsets);
                    return tuple;
                }
            }).foreachRDD(new VoidFunction<JavaRDD<String>>() {
                @Override
                public void call(JavaRDD<String> jsonRdd) throws Exception {
                    if (jsonRdd.isEmpty())  return;
                    writeToHive(jsonRdd);
                }
            });
        } else { //Otherwise the job is running for the first time
            KafkaUtils.createDirectStream(jsc,String.class,String.class,StringDecoder.class,StringDecoder.class,params,topics)
                    .transformToPair(new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
                        @Override
                        public JavaPairRDD<String, String> call(JavaPairRDD<String, String> tuple) throws Exception {
                            OffsetRange[] offsets = ((HasOffsetRanges)tuple.rdd()).offsetRanges();
                            log.debug("offsets {}",offsets);
                            offsetRanges.set(offsets);
                            return tuple;
                        }
                    }).map(new Function<Tuple2<String,String>, String>() {
                @Override
                public String call(Tuple2<String, String> tuple2) {
                    return tuple2._2();
                }
            }).foreachRDD(new VoidFunction<JavaRDD<String>>() {
                @Override
                public void call(JavaRDD<String> jsonRdd) throws Exception {
                    if (jsonRdd.isEmpty()) return;
                    writeToHive(jsonRdd);
                }
            });
        }
    }

    /**
     * Use the SparkContext to create the JavaStreamingContext that schedules the streaming job.
     */
    private static void  createJavaStreamContext(){
        jsc = new JavaStreamingContext(sparkContext,new Duration(PropertiesLoad.getConfig().getInt("stream.duration.time")));
    }
    /**
     * Assemble the configuration that KafkaUtils needs to create the DStream.
     */
    private static void createParams(){
        params = new HashMap<>();
        params.put("bootstrap.servers",PropertiesLoad.getConfig().getString("bootstrap.servers"));
        topics = new HashSet<>();
        topics.add(PropertiesLoad.getConfig().getString("stream.kafka.topics"));
        zkServer = PropertiesLoad.getConfig().getString("zookeeper.servers");
    }

    /**
     * Create the global SparkContext object.
     */
    private static void createSparkContext(){
        sparkContext  = new JavaSparkContext(sparkConf);
    }

    /**
     * Create the SparkConf configuration object.
     */
    private static void createSparkConf(){
        sparkConf = new SparkConf()
                .set("fs.hdfs.impl", DistributedFileSystem.class.getName())
                .set("fs.file.impl", LocalFileSystem.class.getName())
                .set("spark.sql.warehouse.dir", PropertiesLoad.getConfig().getString("spark.sql.warehouse.dir"))
                .set("dfs.client.use.datanode.hostname", "true")
                .set("fs.defaultFS", PropertiesLoad.getConfig().getString("fs.defaultFS"))
                .set("ffs.default.name", PropertiesLoad.getConfig().getString("fs.default.name"))
                .set("hive.server2.thrift.bind.host", PropertiesLoad.getConfig().getString("hive.server2.thrift.bind.host"))
                .set("hive.server2.webui.host", PropertiesLoad.getConfig().getString("hive.server2.webui.host"))
                .set("javax.jdo.option.ConnectionURL", PropertiesLoad.getConfig().getString("javax.jdo.option.ConnectionURL"))
                .set("hive.metastore.uris", PropertiesLoad.getConfig().getString("hive.metastore.uris"))
                .set("mapred.job.tracker", PropertiesLoad.getConfig().getString("mapred.job.tracker"))
                .set("dfs.support.append", "true")
                .set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER")
                .set("dfs.client.block.write.replace-datanode-on-failure.enable", "true").setAppName("load-data-to-hive");
    }

    /**
     * Create the Spark SQL HiveContext.
     * @param sparkContext
     */
    private static void createHiveContext(JavaSparkContext sparkContext){
        hiveContxt = new HiveContext(sparkContext);
    }

    /**
     * Create the schema used to parse the JSON data.
     */
    private static void createOutputSchema(){
        List<StructField> outputFields = new ArrayList<>();
        outputFields.add(DataTypes.createStructField("vccId",DataTypes.StringType,true));
        outputFields.add(DataTypes.createStructField("clientName",DataTypes.StringType,true));
        outputFields.add(DataTypes.createStructField("clientPhone",DataTypes.StringType,true));
        outputFields.add(DataTypes.createStructField("ticketNo",DataTypes.StringType,true));
        outputFields.add(DataTypes.createStructField("ticketStatus",DataTypes.StringType,true));
        outputFields.add(DataTypes.createStructField("ticketSource",DataTypes.StringType,true));
        outputFields.add(DataTypes.createStructField("ticketSourceOrder",DataTypes.StringType,true));
        outputFields.add(DataTypes.createStructField("ticketPriority",DataTypes.StringType,true));
        outputFields.add(DataTypes.createStructField("ticketPriorityOrder",DataTypes.IntegerType,true));
        outputFields.add(DataTypes.createStructField("flowName",DataTypes.StringType,true));
        outputFields.add(DataTypes.createStructField("flowId",DataTypes.IntegerType,true));
        outputFields.add(DataTypes.createStructField("ticketTypes",DataTypes.StringType,true));
        outputFields.add(DataTypes.createStructField("currentNodeName",DataTypes.StringType,true));
        outputFields.add(DataTypes.createStructField("createUserName",DataTypes.StringType,true));
        outputFields.add(DataTypes.createStructField("currentUserName",DataTypes.StringType,true));
        outputFields.add(DataTypes.createStructField("participants",DataTypes.createArrayType(DataTypes.StringType),true));
        outputFields.add(DataTypes.createStructField("doneUserName",DataTypes.StringType,true));
        outputFields.add(DataTypes.createStructField("remindNums",DataTypes.IntegerType,true));
        outputFields.add(DataTypes.createStructField("createTime",DataTypes.StringType,true));
        outputFields.add(DataTypes.createStructField("updateTime",DataTypes.StringType,true));
        outputFields.add(DataTypes.createStructField("lastHandleTime",DataTypes.StringType,true));
        outputFields.add(DataTypes.createStructField("lastSubmitTime",DataTypes.StringType,true));
        outputFields.add(DataTypes.createStructField("doneTime",DataTypes.StringType,true));
        outputFields.add(DataTypes.createStructField("sendBackTime",DataTypes.StringType,true));
        outputFields.add(DataTypes.createStructField("transferTime",DataTypes.StringType,true));
        outputFields.add(DataTypes.createStructField("nodeAssignTime",DataTypes.createArrayType(DataTypes.StringType),true));
        outputFields.add(DataTypes.createStructField("nodeStartTime",DataTypes.createArrayType(DataTypes.StringType),true));
        outputFields.add(DataTypes.createStructField("nodeCreateTime",DataTypes.createArrayType(DataTypes.StringType),true));
        outputFields.add(DataTypes.createStructField("nodeExpireTime",DataTypes.createArrayType(DataTypes.StringType),true));
        outputFields.add(DataTypes.createStructField("nodeWaitSecs",DataTypes.createArrayType(DataTypes.StringType),true));
        outputFields.add(DataTypes.createStructField("nodeHandleSecs",DataTypes.createArrayType(DataTypes.StringType),true));
        outputFields.add(DataTypes.createStructField("evaluteSendTime",DataTypes.StringType,true));
        outputFields.add(DataTypes.createStructField("evaluteEndTime",DataTypes.StringType,true));
        outputFields.add(DataTypes.createStructField("evaluteStar",DataTypes.StringType,true));
        outputFields.add(DataTypes.createStructField("evaluteTags",DataTypes.createArrayType(DataTypes.StringType),true));
        outputFields.add(DataTypes.createStructField("totalSecs",DataTypes.IntegerType,true));
        outputFields.add(DataTypes.createStructField("isFirstDone",DataTypes.IntegerType,true));
        outputFields.add(DataTypes.createStructField("nodeForm",DataTypes.createMapType(DataTypes.StringType,DataTypes.StringType),true));
        outputFields.add(DataTypes.createStructField("nodeDynamicField",DataTypes.createMapType(DataTypes.StringType,DataTypes.StringType),true));
        schema = DataTypes.createStructType(outputFields);
    }
    /**
     * Write the parsed Dataset<Row> into Hive through HiveContext, using an
     * "insert into ... select ... from ..." HQL statement.
     * @param jsonRdd
     */
    private static void writeToHive(JavaRDD<String>  jsonRdd){
        Dataset<Row> row=null;
        try {
            row = hiveContxt.jsonRDD(jsonRdd.repartition(1), schema);
            row.createOrReplaceTempView("tempview");
            String sql = "insert into " + PropertiesLoad.getConfig().getString("hive.table.ticket") +
                    " PARTITION(year_month_day='" + org.apache.tools.ant.util.DateUtils.format(new Date(), "yyyy-MM-dd") + "') "
                    + "select vccId as vcc_id, clientName as client_name, clientPhone as client_phone, ticketNo as ticket_no, ticketStatus as ticket_statuts , " +
                    "ticketSource as ticket_source ,ticketSourceOrder as ticket_source_order, ticketPriority as ticket_priority ,ticketPriorityOrder as ticket_priority_order , " +
                    "flowName flow_name, flowId as flow_id, ticketTypes as ticket_types  , currentNodeName as current_node_name , createUserName as create_user_name ," +
                    " currentUserName as current_user_name , participants , doneUserName as done_user_name, remindNums as remind_nums, createTime as create_time ," +
                    "updateTime as update_time, lastHandleTime as laste_handle_time , lastSubmitTime as last_submit_time , doneTime as done_time, sendBackTime as " +
                    "send_back_time , transferTime  as transfer_time, nodeAssignTime as node_assign_time , nodeStartTime as node_start_time , nodeCreateTime as " +
                    "node_create_time , nodeExpireTime as node_expire_time, nodeWaitSecs as node_secs , nodeHandleSecs as node_handle_secs , evaluteSendTime as " +
                    "evalute_send_time , evaluteEndTime as evalute_end_time, evaluteStar as evalute_star , evaluteTags as evalute_tags , totalSecs as total_secs , " +
                    "isFirstDone as is_first_secs , nodeForm as node_form , nodeDynamicField as node_dynamic_fie from tempview";
            long start = System.currentTimeMillis();
            hiveContxt.sql(sql);
            long end = System.currentTimeMillis();
            //Only after the Hive write succeeds are the offsets committed to ZooKeeper; if it fails, nothing is committed and the batch will be reprocessed
            KafkaOffsetManager.writeOffset(zkServer,zkTopicPath,offsetRanges);
            log.warn("insert into hive Cost Time {}   ones time size is  {}  " ,(end - start) , row.count());
        }catch (Exception e){
            if (row!=null){
                //Log the records that failed to be written
                row.foreach(new ForeachFunction<Row>() {
                    @Override
                    public void call(Row row) throws Exception {
                        log.error("Insert into hive Error! ticket_no is {} and update Time is {}",row.getString(3),row.getString(20));
                    }
                });
            }else {
                log.error("row is null!");
            }
        }
    }


    /**
     * Initialise logback.
     */
    private  static  void initLogback() {
        InputStream inStream = null;
        //Load the logback configuration
        try {
            LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
            JoranConfigurator configurator = new JoranConfigurator();
            configurator.setContext(lc);
            lc.reset();
            inStream = LoadDataBySparkStream.class.getClassLoader().getResourceAsStream("logback.xml");
            configurator.doConfigure(inStream);
            StatusPrinter.printInCaseOfErrorsOrWarnings(lc);
        } catch (JoranException e) {
            e.printStackTrace();
            log.error("load logback.xml error! ", e);
            System.exit(0);
        } finally {
            if (inStream != null) {
                try {
                    inStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
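For completeness, these are the configuration keys the job reads through PropertiesLoad (the first program argument selects which environment to load). The actual file format depends on PropertiesLoad, which is not shown here, and every value below is a placeholder rather than the original setting:

bootstrap.servers = kafka1:9092,kafka2:9092        # Kafka broker list
stream.kafka.topics = ticket-topic                 # topic to consume
zookeeper.servers = zk1:2181,zk2:2181              # ZooKeeper ensemble used for offset storage
stream.duration.time = 10000                       # batch interval in milliseconds
spark.sql.warehouse.dir = /user/hive/warehouse
fs.defaultFS = hdfs://namenode:8020
fs.default.name = hdfs://namenode:8020
hive.server2.thrift.bind.host = hive-host
hive.server2.webui.host = hive-host
javax.jdo.option.ConnectionURL = jdbc:mysql://metastore-db:3306/hive
hive.metastore.uris = thrift://metastore-host:9083
mapred.job.tracker = jobtracker-host:8032
hive.table.ticket = ods.ticket                     # target Hive table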