SparkStreaming實時處理應用 (Spark Streaming real-time processing application)

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import kafka.serializer.StringDecoder;
import scala.Tuple2;

/**
 * 
 * @description 
 */
/**
 * Spark Streaming entry point for real-time scene recognition.
 *
 * <p>Consumes JSON log messages from a Kafka topic via a direct stream, runs a
 * fixed set of scene-recognition algorithms (privilege escalation, illegal
 * download/upload/login/process/log-operation, host-relation, statistics) on
 * each micro-batch RDD, and persists the detected events to PostgreSQL and
 * Redis. Runs until the streaming context terminates.
 */
public class SceneRecognition {

	private static final Logger logger = Logger.getLogger(SceneRecognition.class);
	// Job configuration pulled from project-wide constants.
	private static final String appName = Constants.SPARK_CONFIG_APPNAME;
	private static final int duration = Constants.SPARK_CONFIG_DURATION;
	private static final String groupId = Constants.SPARK_CONFIG_GROUPID;
	private static final String kafkaTopic = Constants.CHECK_SOURCE_TOPIC;

	@SuppressWarnings("deprecation")
	public static void main(String[] args) {
		// Initialize external resources: Redis pool, configuration, PostgreSQL.
		RedisTool.init();
		Configs.init();
		StandardInput.PGinit();
		System.out.println("scene recognition Configs/Redis/PG init done.");
		logger.warn("scene recognition Configs/Redis/PG init done.");
		// Dump the effective configuration before launching the job.
		System.out.println("Start Launch scene recognition engine Job ...");
		System.out.println("----- Configurations ------");
		System.out.println("AppName: " + appName);
		System.out.println("Duration: " + duration);
		System.out.println("Group: " + groupId);
		System.out.println("Topic: " + kafkaTopic);
		System.out.println("Broker List: " + Configs.getBrokerList());
		System.out.println("---------------------------");

		// Initialize SparkConf; cap the per-partition Kafka ingest rate so a
		// batch never exceeds the configured overall max rate.
		SparkConf conf = new SparkConf();
		conf.setAppName(appName);
		// conf.setMaster("local[4]"); // enable for local mode
		conf.set("spark.streaming.kafka.maxRatePerPartition", String.valueOf(Configs.getKafkaMaxrate() / Configs.getPartitionNum()));
		System.out.println("scene recognition spark conf init done.");
		logger.warn("scene recognition spark conf init done.");

		// Initialize the Spark context.
		JavaSparkContext jsc = new JavaSparkContext(conf);
		System.out.println("scene recognition spark context init done.");
		logger.warn("scene recognition spark context init done.");

		// Initialize the StreamingContext with the configured batch duration.
		JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(duration));
		System.out.println("scene recognition StreamingContext init done.");
		logger.warn("scene recognition StreamingContext init done.");

		// Kafka consumer parameters for the direct stream.
		Set<String> topicSet = new HashSet<String>();
		topicSet.add(kafkaTopic);
		Map<String, String> kafkaParams = new HashMap<String, String>();
		kafkaParams.put("group.id", groupId);
		kafkaParams.put("metadata.broker.list", Configs.getBrokerList());

		// Whitelist data validation runs on its own background thread.
		new Thread(new WhitelistValidation(jsc),"WhiteList data Validation Process running... ").start();

		// Consume Kafka data starting from the current offset.
		JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
				jssc,
				String.class,
				String.class,
				StringDecoder.class,
				StringDecoder.class,
				kafkaParams,
				topicSet);
		System.out.println("getting message from Kafka...");
		logger.warn("getting message from Kafka...");

		// Keep only the JSON payload: map Tuple2<key,value> to the value String.
		JavaDStream<String> logJavaDStream = messages.map(new mapFuc());
		// Scene-recognition pipeline: each batch RDD is fed to every algorithm.
		logJavaDStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
			@Override
			public Void call(JavaRDD<String> msgRDD)  {
				/* Scene input: one RDD.
				   Output: a list of StandardOutput records persisted to the DB. */
				if (!msgRDD.isEmpty()) {
					System.out.println("****Input an RDD****");
					logger.warn("****Input an RDD****");
					/*AuthorityPromote*/
					try {
						publishResults("AuthorityPromote", new AuthorityPromote().authorityPromoteMain(msgRDD));
					} catch (Exception e) {
						logSceneError("AuthorityPromote", e);
					}
					/*IllegalDownload*/
					try {
						publishResults("IllegalDownload", new IllegalDownload().illegalDownloadMain(msgRDD));
					} catch (Exception e) {
						logSceneError("IllegalDownload", e);
					}
					/*IllegalUpload*/
					try {
						publishResults("IllegalUpload", new IllegalUpload().illegalUploadMain(msgRDD));
					} catch (Exception e) {
						logSceneError("IllegalUpload", e);
					}
					/*IllegalLogin*/
					try {
						publishResults("IllegalLogin", new IllegalLogin().ADD_loginlogMain(msgRDD));
					} catch (Exception e) {
						logSceneError("IllegalLogin", e);
					}
					/*IllegalProcess*/
					try {
						publishResults("IllegalProcess", new IllegalProcess().illegalProcessMain(msgRDD));
					} catch (Exception e) {
						logSceneError("IllegalProcess", e);
					}
					/*IllegalLogOperation*/
					try {
						publishResults("IllegalLogOperation", new IllegalLogOperation().illLogOpeMain(msgRDD));
					} catch (Exception e) {
						logSceneError("IllegalLogOperation", e);
					}
					/*hostrelation — uses a different sink (sendonlineToPG), so not folded into publishResults*/
					try {
						ArrayList<StandardOutput> hostrelation = new HostRelationCalc().onlineData(msgRDD);
						System.out.println("hostrelation Writing to Database...");
						logger.warn("hostrelation Writing to Database...");
						if((hostrelation!=null) && !hostrelation.isEmpty()){
							SendDataToPG.sendonlineToPG(hostrelation);
							hostrelation.clear();
						}
						System.out.println("hostrelation completed! Output to Database.");
						logger.warn("hostrelation completed! Output to Database.");
					} catch (Exception e) {
						logSceneError("hostrelation", e);
					}
					/*statistics*/
					try {
						System.out.println("stats begin...");
						logger.warn("stats begin...");
						new StatsData().statsMain(msgRDD);
						System.out.println("stats writing to Database...");
						logger.warn("stats writing to Database...");
						RedisPersistent.online_data_calc();
						System.out.println("stats completed! Output to Database.");
						logger.warn("stats completed! Output to Database.");
					} catch (Exception e) {
						logSceneError("stats", e);
					}
					System.out.println("RDD Finished processing! Wait for next...");
					logger.warn("RDD Finished processing! Wait for next...");
				}
				else {
					System.out.println("****Input empty****");
					logger.warn("****Input empty****");
				}
				return null;
			}
		});
		jssc.start();
		jssc.awaitTermination();
		jssc.close();
	}

	/**
	 * Persists one scene's detection results to the database and logs each
	 * entry. No-op when {@code result} is null or empty; clears the list after
	 * writing so the caller cannot re-send stale data.
	 *
	 * @param scene  scene name used as the prefix of the log messages
	 * @param result detection output of the scene algorithm; may be null
	 */
	private static void publishResults(String scene, ArrayList<StandardOutput> result) {
		if ((result != null) && (!result.isEmpty())) {
			//new SendDataToKafka().sendToKafka(result);
			SendDataToPG.sendToPG(result);
			System.out.println(scene + " completed! Output " + Integer.toString(result.size()) + " logs to Database.");
			logger.warn(scene + " completed! Output " + Integer.toString(result.size()) + " logs to Database.");
			for (StandardOutput line : result) {
				logger.warn(line.getType() + ":" + line.getError_message());
			}
			result.clear();
		}
	}

	/**
	 * Reports a scene-algorithm failure to stdout and log4j.
	 *
	 * <p>Passes the Throwable to {@code logger.error(Object, Throwable)} so the
	 * full stack trace is written; the previous code concatenated
	 * {@code e.getStackTrace()} into the message, which only logged the array's
	 * {@code toString()} and lost the trace entirely.
	 *
	 * @param scene scene name used as the prefix of the error messages
	 * @param e     the failure raised by the scene algorithm
	 */
	private static void logSceneError(String scene, Exception e) {
		System.out.println(scene + " error!");
		e.printStackTrace();
		logger.error(scene + " error!", e);
	}

	/**
	 * Maps a Kafka (key, json) tuple to just the json payload string.
	 */
	static class mapFuc implements Function<Tuple2<String,String>, String> {
		@Override
		public String call(Tuple2<String, String> jsonMessage) throws Exception {
			return jsonMessage._2;
		}
	}
}