Spark Streaming Real-Time Processing Application
阿新 · Published 2018-12-21

The driver below consumes JSON log records from Kafka with Spark Streaming's direct approach, runs a set of scene-recognition detectors (AuthorityPromote, IllegalDownload, IllegalUpload, IllegalLogin, IllegalProcess, IllegalLogOperation, plus host-relation and statistics jobs) over each micro-batch, and writes the findings to PostgreSQL.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import kafka.serializer.StringDecoder;
import scala.Tuple2;

/**
 * Scene-recognition driver: consumes log records from Kafka with Spark
 * Streaming's direct approach, runs the scene detectors over each
 * micro-batch, and persists the results to PostgreSQL.
 */
public class SceneRecognition {

    private static Logger logger = Logger.getLogger(SceneRecognition.class);

    private static String appName = Constants.SPARK_CONFIG_APPNAME;
    private static int duration = Constants.SPARK_CONFIG_DURATION;
    private static String groupId = Constants.SPARK_CONFIG_GROUPID;
    private static String kafkaTopic = Constants.CHECK_SOURCE_TOPIC;

    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        // Initialize Redis, configuration, and PostgreSQL
        RedisTool.init();
        Configs.init();
        StandardInput.PGinit();
        System.out.println("scene recognition Configs/Redis/PG init done.");
        logger.warn("scene recognition Configs/Redis/PG init done.");

        // Print the effective configuration
        System.out.println("Start Launch scene recognition engine Job ...");
        System.out.println("----- Configurations ------");
        System.out.println("AppName: " + appName);
        System.out.println("Duration: " + duration);
        System.out.println("Group: " + groupId);
        System.out.println("Topic: " + kafkaTopic);
        System.out.println("Broker List: " + Configs.getBrokerList());
        System.out.println("---------------------------");

        // Initialize SparkConf; the per-partition rate cap is the global
        // Kafka max rate divided by the partition count
        SparkConf conf = new SparkConf();
        conf.setAppName(appName);
        // conf.setMaster("local[4]"); // enable for local mode
        conf.set("spark.streaming.kafka.maxRatePerPartition",
                String.valueOf(Configs.getKafkaMaxrate() / Configs.getPartitionNum()));
        System.out.println("scene recognition spark conf init done.");
        logger.warn("scene recognition spark conf init done.");

        // Initialize the Spark context
        JavaSparkContext jsc = new JavaSparkContext(conf);
        System.out.println("scene recognition spark context init done.");
        logger.warn("scene recognition spark context init done.");

        // Initialize the StreamingContext
        JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(duration));
        System.out.println("scene recognition StreamingContext init done.");
        logger.warn("scene recognition StreamingContext init done.");

        // Kafka consumer parameters
        Set<String> topicSet = new HashSet<String>();
        topicSet.add(kafkaTopic);
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("group.id", groupId);
        kafkaParams.put("metadata.broker.list", Configs.getBrokerList());

        // Whitelist data filtering runs in a background thread
        new Thread(new WhitelistValidation(jsc), "WhiteList data Validation Process").start();

        // Consume Kafka data from the current offsets (direct approach)
        JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                jssc, String.class, String.class,
                StringDecoder.class, StringDecoder.class,
                kafkaParams, topicSet);
        System.out.println("getting message from Kafka...");
        logger.warn("getting message from Kafka...");

        // Keep only the payload: map each Tuple2<String, String> to its String value
        JavaDStream<String> logJavaDStream = messages.map(new MapFuc());

        // Scene-recognition algorithms, applied batch by batch.
        // Each scene takes one RDD as input and returns a list of records
        // in the standard output format, which is written to the database.
        logJavaDStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
            @Override
            public Void call(JavaRDD<String> msgRDD) {
                if (!msgRDD.isEmpty()) {
                    System.out.println("****Input an RDD****");
                    logger.warn("****Input an RDD****");

                    // AuthorityPromote (a filtered write is also available:
                    // SendDataToPG.sendToPG(FilterMessage.filter(result, "authLift")))
                    try {
                        writeResult("AuthorityPromote", new AuthorityPromote().authorityPromoteMain(msgRDD));
                    } catch (Exception e) {
                        System.out.println("AuthorityPromote error!");
                        e.printStackTrace();
                        logger.error("AuthorityPromote error!", e);
                    }

                    // IllegalDownload
                    try {
                        writeResult("IllegalDownload", new IllegalDownload().illegalDownloadMain(msgRDD));
                    } catch (Exception e) {
                        System.out.println("IllegalDownload error!");
                        e.printStackTrace();
                        logger.error("IllegalDownload error!", e);
                    }

                    // IllegalUpload
                    try {
                        writeResult("IllegalUpload", new IllegalUpload().illegalUploadMain(msgRDD));
                    } catch (Exception e) {
                        System.out.println("IllegalUpload error!");
                        e.printStackTrace();
                        logger.error("IllegalUpload error!", e);
                    }

                    // IllegalLogin
                    try {
                        writeResult("IllegalLogin", new IllegalLogin().ADD_loginlogMain(msgRDD));
                    } catch (Exception e) {
                        System.out.println("IllegalLogin error!");
                        e.printStackTrace();
                        logger.error("IllegalLogin error!", e);
                    }

                    // IllegalProcess
                    try {
                        writeResult("IllegalProcess", new IllegalProcess().illegalProcessMain(msgRDD));
                    } catch (Exception e) {
                        System.out.println("IllegalProcess error!");
                        e.printStackTrace();
                        logger.error("IllegalProcess error!", e);
                    }

                    // IllegalLogOperation
                    try {
                        writeResult("IllegalLogOperation", new IllegalLogOperation().illLogOpeMain(msgRDD));
                    } catch (Exception e) {
                        System.out.println("IllegalLogOperation error!");
                        e.printStackTrace();
                        logger.error("IllegalLogOperation error!", e);
                    }

                    // Host relations: uses a dedicated writer
                    try {
                        ArrayList<StandardOutput> hostrelation = new HostRelationCalc().onlineData(msgRDD);
                        System.out.println("hostrelation Writing to Database...");
                        logger.warn("hostrelation Writing to Database...");
                        if ((hostrelation != null) && !hostrelation.isEmpty()) {
                            SendDataToPG.sendonlineToPG(hostrelation);
                            hostrelation.clear();
                        }
                        System.out.println("hostrelation completed! Output to Database.");
                        logger.warn("hostrelation completed! Output to Database.");
                    } catch (Exception e) {
                        System.out.println("hostrelation error!");
                        e.printStackTrace();
                        logger.error("hostrelation error!", e);
                    }

                    // Statistics
                    try {
                        System.out.println("stats begin...");
                        logger.warn("stats begin...");
                        new StatsData().statsMain(msgRDD);
                        System.out.println("stats writing to Database...");
                        logger.warn("stats writing to Database...");
                        RedisPersistent.online_data_calc();
                        System.out.println("stats completed! Output to Database.");
                        logger.warn("stats completed! Output to Database.");
                    } catch (Exception e) {
                        System.out.println("stats error!");
                        e.printStackTrace();
                        logger.error("stats error!", e);
                    }

                    System.out.println("RDD Finished processing! Wait for next...");
                    logger.warn("RDD Finished processing! Wait for next...");
                } else {
                    System.out.println("****Input empty****");
                    logger.warn("****Input empty****");
                }
                return null;
            }
        });

        jssc.start();
        jssc.awaitTermination();
        jssc.close();
    }

    /**
     * Write one scene's non-empty result set to PostgreSQL and log each record.
     * An alternative Kafka sink: new SendDataToKafka().sendToKafka(result);
     */
    private static void writeResult(String scene, ArrayList<StandardOutput> result) {
        if ((result != null) && !result.isEmpty()) {
            SendDataToPG.sendToPG(result);
            System.out.println(scene + " completed! Output " + result.size() + " logs to Database.");
            logger.warn(scene + " completed! Output " + result.size() + " logs to Database.");
            for (StandardOutput line : result) {
                logger.warn(line.getType() + ":" + line.getError_message());
            }
            result.clear();
        }
    }

    /**
     * Maps a (key, json) Kafka record to just its json payload.
     */
    static class MapFuc implements Function<Tuple2<String, String>, String> {
        @Override
        public String call(Tuple2<String, String> jsonMessage) throws Exception {
            return jsonMessage._2;
        }
    }
}