SparkStream手動提交KafkaOffset實現資料容災處理JAVA版本
阿新 • • 發佈:2019-01-08
啥也不說了,直接貼程式碼:
package net.icsoc.bigdata.utils;
import org.I0Itec.zkclient.ZkClient;
/**
 * Holds a single, lazily created {@link ZkClient} that is shared by the whole JVM.
 */
public class ZKUtils {
    /** Shared client; {@code null} until the first call to {@link #getZKClient(String)}. */
    private static ZkClient zkClient;

    /**
     * Returns the shared ZooKeeper client, creating it on the first call.
     *
     * <p>The instance is cached, so repeated calls reuse one connection instead of
     * opening a new one each time. Only the {@code zkServer} passed on the first call
     * is used; later calls return the already connected client regardless of the
     * argument.
     *
     * @param zkServer ZooKeeper connection string used to create the client
     * @return the shared client instance
     */
    public static synchronized ZkClient getZKClient(String zkServer) {
        if (zkClient == null) {
            // Store the new client so subsequent calls reuse it.
            zkClient = new ZkClient(zkServer);
        }
        return zkClient;
    }
}
package net.icsoc.bigdata.utils;
import kafka.common.TopicAndPartition;
import kafka.utils.ZKGroupDirs;
import kafka.utils.ZKGroupTopicDirs;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.spark.streaming.kafka.OffsetRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
/**
 * Reads and writes Kafka consumer offsets in ZooKeeper so that a streaming job can
 * resume from the last successfully processed position.
 */
public class KafkaOffsetManager {
    private static final Logger log = LoggerFactory.getLogger(KafkaOffsetManager.class);

    /** Consumer group under which the offsets are stored in ZooKeeper. */
    private static final String CONSUMER_GROUP = "test-consumer-group";

    /**
     * Builds the ZooKeeper directory that holds the per-partition offsets of a topic.
     *
     * <p>Uses the group/topic specific offset directory rather than the shared
     * consumers root, so that the children of the returned path are the partitions
     * of {@code topic} only.
     *
     * @param topic Kafka topic name
     * @return ZooKeeper path of the offset directory for {@code topic}
     */
    public static String zkTopicPath(String topic) {
        ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(CONSUMER_GROUP, topic);
        return topicDirs.consumerOffsetDir();
    }

    /**
     * Counts the child nodes below {@code zkTopicPath}.
     *
     * @param zkServer    ZooKeeper connection string
     * @param zkTopicPath path whose children are counted
     * @return number of children reported by the ZooKeeper client
     */
    public static int countTopicPath(String zkServer, String zkTopicPath) {
        ZkClient zkClient = ZKUtils.getZKClient(zkServer);
        return zkClient.countChildren(zkTopicPath);
    }

    /**
     * Reads the stored offset of partitions {@code 0 .. countChildren-1}.
     *
     * @param zkServer      ZooKeeper connection string
     * @param zkTopicPath   directory whose children are named after partition numbers
     * @param countChildren number of partitions to read
     * @param topic         topic the partitions belong to
     * @return map from topic/partition to the stored offset
     * @throws NumberFormatException if a stored value is not a decimal long
     */
    public static Map<TopicAndPartition, Long> readOffsets(String zkServer, String zkTopicPath,
                                                           int countChildren, String topic) {
        ZkClient zkClient = ZKUtils.getZKClient(zkServer);
        Map<TopicAndPartition, Long> offsetsMap = new HashMap<>();
        for (int partition = 0; partition < countChildren; partition++) {
            String path = zkTopicPath + "/" + partition;
            String offset = zkClient.readData(path);
            offsetsMap.put(new TopicAndPartition(topic, partition), Long.parseLong(offset));
        }
        return offsetsMap;
    }

    /**
     * Persists the {@code untilOffset} of every range currently held in
     * {@code offsetRanges}, one ZooKeeper node per partition.
     *
     * <p>Failures are logged and not rethrown, so a ZooKeeper problem does not abort
     * the caller; the offsets simply stay at their previous value.
     *
     * @param zkServer     ZooKeeper connection string
     * @param zkTopicPath  directory under which the partition nodes are written
     * @param offsetRanges holder of the ranges of the most recent batch; may hold {@code null}
     */
    public static void writeOffset(String zkServer, String zkTopicPath,
                                   AtomicReference<OffsetRange[]> offsetRanges) {
        try {
            ZkClient zkClient = ZKUtils.getZKClient(zkServer);
            OffsetRange[] offsets = offsetRanges.get();
            if (log.isDebugEnabled()) {
                // Render the array explicitly: passing it directly would be taken as the
                // logger's varargs array and only its first element would be printed.
                log.debug("offsets {} ", java.util.Arrays.toString(offsets));
            }
            if (offsets != null) {
                for (OffsetRange offset : offsets) {
                    String zkPath = zkTopicPath + "/" + offset.partition();
                    ZkUtils.updatePersistentPath(zkClient, zkPath, offset.untilOffset() + "");
                }
            }
        } catch (Exception e) {
            // Pass the exception itself so the message and stack trace are logged.
            log.error("write data to zk error ", e);
        }
    }
}
package net.icsoc.bigdata.loaddata;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
import ch.qos.logback.core.util.StatusPrinter;
import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import net.icsoc.bigdata.common.config.PropertiesLoad;
import net.icsoc.bigdata.utils.KafkaOffsetManager;
import org.I0Itec.zkclient.ZkClient;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
/***
* Spark Stream 拉取kafka的訊息
*/
public class LoadDataBySparkStream {
private static Logger log = LoggerFactory.getLogger(LoadDataBySparkStream.class);
private static HiveContext hiveContxt;
private static SparkConf sparkConf;
private static StructType schema;
private static JavaSparkContext sparkContext;
private static Map<String,String> params;
private static Set<String> topics;
private static JavaStreamingContext jsc;
private static JavaPairInputDStream<String,String> stream;
//zk手動提交Offset問題
private static AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();
private static String zkTopicPath;
private static int countChildren;
private static Map<TopicAndPartition,Long> fromOffsets;
private static String zkServer;
public static void main(String[] args) {
// initLogback();
//最好的方式是從引數傳入
PropertiesLoad.environment = args[0];
//組裝生成kafkaDStream所需要的引數
createParams();
//組裝Spark啟動需要的配置的引數
createSparkConf();
//建立zk服務
createZKServer();
//組裝spark全域性的SparkContext
createSparkContext();
//建立hiveContxt用於向hive寫入資料
createHiveContext(sparkContext);
//建立StreamContext
createJavaStreamContext();
//建立解析輸入JSON資料的格式
createOutputSchema();
//建立消費kafka生成Dstream
createDStream();
try {
jsc.start();
jsc.awaitTermination();
}catch (Exception e) {
System.out.println("Stream Context Exception!");
}finally {
jsc.stop();
}
}
private static void createZKServer(){
zkTopicPath = KafkaOffsetManager.zkTopicPath(topics.iterator().next());
countChildren = KafkaOffsetManager.countTopicPath(zkServer,zkTopicPath);
fromOffsets = KafkaOffsetManager.readOffsets(zkServer,zkTopicPath,countChildren,topics.iterator().next());
}
/**
* 建立DStream物件,用於從kafka中讀取,b並且返回DStream
*/
private static void createDStream(){
log.debug("countChildren is {}" ,countChildren);
if (countChildren>0) {
//如果大於0 表示改程式之前執行過,並且儲存過offset在kafka路徑中
KafkaUtils.createDirectStream(jsc, String.class, String.class,
StringDecoder.class, StringDecoder.class, String.class,params, fromOffsets,
new Function<MessageAndMetadata<String,String>,String>(){
@Override
public String call(MessageAndMetadata<String, String> tuple) {
return tuple.message();
}
}).transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
@Override
public JavaRDD<String> call(JavaRDD<String> tuple) throws Exception {
OffsetRange[] offsets = ((HasOffsetRanges)tuple.rdd()).offsetRanges();
log.debug("offsets {}",offsets);
offsetRanges.set(offsets);
return tuple;
}
}).foreachRDD(new VoidFunction<JavaRDD<String>>() {
@Override
public void call(JavaRDD<String> jsonRdd) throws Exception {
if (jsonRdd.isEmpty()) return;
writeToHive(jsonRdd);
}
});
}else { //否咋表示程式是初次執行
KafkaUtils.createDirectStream(jsc,String.class,String.class,StringDecoder.class,StringDecoder.class,params,topics)
.transformToPair(new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
@Override
public JavaPairRDD<String, String> call(JavaPairRDD<String, String> tuple) throws Exception {
OffsetRange[] offsets = ((HasOffsetRanges)tuple.rdd()).offsetRanges();
log.debug("offsets {}",offsets);
offsetRanges.set(offsets);
return tuple;
}
}).map(new Function<Tuple2<String,String>, String>() {
@Override
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
}).foreachRDD(new VoidFunction<JavaRDD<String>>() {
@Override
public void call(JavaRDD<String> jsonRdd) throws Exception {
if (jsonRdd.isEmpty()) return;
writeToHive(jsonRdd);
}
});
}
}
/**
* 使用SparkConxt物件,建立SparkStreamContx物件用於SparkStream任務排程
*/
private static void createJavaStreamContext(){
jsc = new JavaStreamingContext(sparkContext,new Duration(PropertiesLoad.getConfig().getInt("stream.duration.time")));
}
/**
* 組裝kafkaUtils工具建立DStream所需要的配置檔案
*/
private static void createParams(){
params = new HashMap<>();
params.put("bootstrap.servers",PropertiesLoad.getConfig().getString("bootstrap.servers"));
topics = new HashSet<>();
topics.add(PropertiesLoad.getConfig().getString("stream.kafka.topics"));
zkServer = PropertiesLoad.getConfig().getString("zookeeper.servers");
}
/**
* 建立SparkContext全域性物件
*/
private static void createSparkContext(){
sparkContext = new JavaSparkContext(sparkConf);
}
/**
* 建立SparkConf配置檔案物件
*/
private static void createSparkConf(){
sparkConf = new SparkConf()
.set("fs.hdfs.impl", DistributedFileSystem.class.getName())
.set("fs.file.impl", LocalFileSystem.class.getName())
.set("spark.sql.warehouse.dir", PropertiesLoad.getConfig().getString("spark.sql.warehouse.dir"))
.set("dfs.client.use.datanode.hostname", "true")
.set("fs.defaultFS", PropertiesLoad.getConfig().getString("fs.defaultFS"))
.set("ffs.default.name", PropertiesLoad.getConfig().getString("fs.default.name"))
.set("hive.server2.thrift.bind.host", PropertiesLoad.getConfig().getString("hive.server2.thrift.bind.host"))
.set("hive.server2.webui.host", PropertiesLoad.getConfig().getString("hive.server2.webui.host"))
.set("javax.jdo.option.ConnectionURL", PropertiesLoad.getConfig().getString("javax.jdo.option.ConnectionURL"))
.set("hive.metastore.uris", PropertiesLoad.getConfig().getString("hive.metastore.uris"))
.set("mapred.job.tracker", PropertiesLoad.getConfig().getString("mapred.job.tracker"))
.set("dfs.support.append", "true")
.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER")
.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true").setAppName("load-data-to-hive");
}
/**
* 建立SparkSql中的HiveContx物件
* @param sparkContext
*/
private static void createHiveContext(JavaSparkContext sparkContext){
hiveContxt = new HiveContext(sparkContext);
}
/**
*
* 建立解析JSON資料的schema結構
*
*/
private static void createOutputSchema(){
List<StructField> outputFields = new ArrayList<>();
outputFields.add(DataTypes.createStructField("vccId",DataTypes.StringType,true));
outputFields.add(DataTypes.createStructField("clientName",DataTypes.StringType,true));
outputFields.add(DataTypes.createStructField("clientPhone",DataTypes.StringType,true));
outputFields.add(DataTypes.createStructField("ticketNo",DataTypes.StringType,true));
outputFields.add(DataTypes.createStructField("ticketStatus",DataTypes.StringType,true));
outputFields.add(DataTypes.createStructField("ticketSource",DataTypes.StringType,true));
outputFields.add(DataTypes.createStructField("ticketSourceOrder",DataTypes.StringType,true));
outputFields.add(DataTypes.createStructField("ticketPriority",DataTypes.StringType,true));
outputFields.add(DataTypes.createStructField("ticketPriorityOrder",DataTypes.IntegerType,true));
outputFields.add(DataTypes.createStructField("flowName",DataTypes.StringType,true));
outputFields.add(DataTypes.createStructField("flowId",DataTypes.IntegerType,true));
outputFields.add(DataTypes.createStructField("ticketTypes",DataTypes.StringType,true));
outputFields.add(DataTypes.createStructField("currentNodeName",DataTypes.StringType,true));
outputFields.add(DataTypes.createStructField("createUserName",DataTypes.StringType,true));
outputFields.add(DataTypes.createStructField("currentUserName",DataTypes.StringType,true));
outputFields.add(DataTypes.createStructField("participants",DataTypes.createArrayType(DataTypes.StringType),true));
outputFields.add(DataTypes.createStructField("doneUserName",DataTypes.StringType,true));
outputFields.add(DataTypes.createStructField("remindNums",DataTypes.IntegerType,true));
outputFields.add(DataTypes.createStructField("createTime",DataTypes.StringType,true));
outputFields.add(DataTypes.createStructField("updateTime",DataTypes.StringType,true));
outputFields.add(DataTypes.createStructField("lastHandleTime",DataTypes.StringType,true));
outputFields.add(DataTypes.createStructField("lastSubmitTime",DataTypes.StringType,true));
outputFields.add(DataTypes.createStructField("doneTime",DataTypes.StringType,true));
outputFields.add(DataTypes.createStructField("sendBackTime",DataTypes.StringType,true));
outputFields.add(DataTypes.createStructField("transferTime",DataTypes.StringType,true));
outputFields.add(DataTypes.createStructField("nodeAssignTime",DataTypes.createArrayType(DataTypes.StringType),true));
outputFields.add(DataTypes.createStructField("nodeStartTime",DataTypes.createArrayType(DataTypes.StringType),true));
outputFields.add(DataTypes.createStructField("nodeCreateTime",DataTypes.createArrayType(DataTypes.StringType),true));
outputFields.add(DataTypes.createStructField("nodeExpireTime",DataTypes.createArrayType(DataTypes.StringType),true));
outputFields.add(DataTypes.createStructField("nodeWaitSecs",DataTypes.createArrayType(DataTypes.StringType),true));
outputFields.add(DataTypes.createStructField("nodeHandleSecs",DataTypes.createArrayType(DataTypes.StringType),true));
outputFields.add(DataTypes.createStructField("evaluteSendTime",DataTypes.StringType,true));
outputFields.add(DataTypes.createStructField("evaluteEndTime",DataTypes.StringType,true));
outputFields.add(DataTypes.createStructField("evaluteStar",DataTypes.StringType,true));
outputFields.add(DataTypes.createStructField("evaluteTags",DataTypes.createArrayType(DataTypes.StringType),true));
outputFields.add(DataTypes.createStructField("totalSecs",DataTypes.IntegerType,true));
outputFields.add(DataTypes.createStructField("isFirstDone",DataTypes.IntegerType,true));
outputFields.add(DataTypes.createStructField("nodeForm",DataTypes.createMapType(DataTypes.StringType,DataTypes.StringType),true));
outputFields.add(DataTypes.createStructField("nodeDynamicField",DataTypes.createMapType(DataTypes.StringType,DataTypes.StringType),true));
schema = DataTypes.createStructType(outputFields);
}
/**
* 利用Hql語句的insert into table select from table 將解析的DataSet<Row>通過HiveContext儲存入hive中
* @param jsonRdd
*/
private static void writeToHive(JavaRDD<String> jsonRdd){
Dataset<Row> row=null;
try {
row = hiveContxt.jsonRDD(jsonRdd.repartition(1), schema);
row.createOrReplaceTempView("tempview");
String sql = "insert into " + PropertiesLoad.getConfig().getString("hive.table.ticket") +
" PARTITION(year_month_day='" + org.apache.tools.ant.util.DateUtils.format(new Date(), "yyyy-MM-dd") + "') "
+ "select vccId as vcc_id, clientName as client_name, clientPhone as client_phone, ticketNo as ticket_no, ticketStatus as ticket_statuts , " +
"ticketSource as ticket_source ,ticketSourceOrder as ticket_source_order, ticketPriority as ticket_priority ,ticketPriorityOrder as ticket_priority_order , " +
"flowName flow_name, flowId as flow_id, ticketTypes as ticket_types , currentNodeName as current_node_name , createUserName as create_user_name ," +
" currentUserName as current_user_name , participants , doneUserName as done_user_name, remindNums as remind_nums, createTime as create_time ," +
"updateTime as update_time, lastHandleTime as laste_handle_time , lastSubmitTime as last_submit_time , doneTime as done_time, sendBackTime as " +
"send_back_time , transferTime as transfer_time, nodeAssignTime as node_assign_time , nodeStartTime as node_start_time , nodeCreateTime as " +
"node_create_time , nodeExpireTime as node_expire_time, nodeWaitSecs as node_secs , nodeHandleSecs as node_handle_secs , evaluteSendTime as " +
"evalute_send_time , evaluteEndTime as evalute_end_time, evaluteStar as evalute_star , evaluteTags as evalute_tags , totalSecs as total_secs , " +
"isFirstDone as is_first_secs , nodeForm as node_form , nodeDynamicField as node_dynamic_fie from tempview";
long start = System.currentTimeMillis();
hiveContxt.sql(sql);
long end = System.currentTimeMillis();
//如果寫入到Hive成功才可以將Offset寫入到zk,否則不寫入到zk,只有資料處理成功才可以寫日誌
KafkaOffsetManager.writeOffset(zkServer,zkTopicPath,offsetRanges);
log.warn("insert into hive Cost Time {} ones time size is {} " ,(end - start) , row.count());
}catch (Exception e){
if (row!=null){
//列印失敗的資料的日誌
row.foreach(new ForeachFunction<Row>() {
@Override
public void call(Row row) throws Exception {
log.error("Insert into hive Error! ticket_no is {} and update Time is {}",row.getString(3),row.getString(20));
}
});
}else {
log.error("row is null!");
}
}
}
/**
* 初始化logback
*/
private static void initLogback() {
InputStream inStream = null;
//載入 logback配置資訊
try {
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
inStream = LoadDataBySparkStream.class.getClassLoader().getResourceAsStream("logback.xml");
configurator.doConfigure(inStream);
StatusPrinter.printInCaseOfErrorsOrWarnings(lc);
} catch (JoranException e) {
e.printStackTrace();
log.error("load logback.xml error! ", e);
System.exit(0);
} finally {
if (inStream != null) {
try {
inStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}