kafka二次開發程式碼例項
阿新 • • 發佈:2019-02-03
配置檔案b.txt
#選擇輸出型別(0.篩選輸出 1.全體輸出 2.不輸出資料)
input_type = 1
#篩選條件 source_mac(多條檢索)、user(多條檢索)、message(模糊查詢)
#time(1.當輸入一個時間 格式:yyyy-MM-dd HH:mm:ss 或者輸入一個時間戳[秒級時間戳10位]
#  當輸入時間小於等於當前時間不會收集資料 當輸入時間大於當前時間將收集當前時間到輸入時間資料
#  2.當輸入二個時間將篩選倆時間段間資料)
Screening_conditions = message
#篩選內容
Screening_content = www
#輸入左面時間小,右面時間大 例如:2017-12-06 12:12:12,2017-12-06 15:12:12
#資料輸出路徑
output_Route = E:\data1\shenji.txt
#資料輸出檔案大小(位元組) 1M=1048576B 1G=1073741824B
output_Size = 150000
----------------------------------------
#探針資料配置
#選擇輸出型別(0.篩選輸出 1.全體輸出 2.不輸出資料)
tanzhen_input_type = 1
#篩選條件 ap_mac(裝置mac) sta_mac(使用者mac) time(時間)
tanzhen_Screening_conditions = ap_mac
#篩選內容
tanzhen_Screening_content = 60:cd:a9:01:1f:db,60:cd:a9:01:0e:89
tanzhen_output_Route=E:\data1\tanzhen
tanzhen_output_Size = 15000000
-----------------------------------------
#使用者資料配置
#選擇輸出型別(0.篩選輸出 1.全體輸出 2.不輸出資料)
yonghu_input_type = 1
#篩選條件 ap_mac(裝置mac) sta_mac(使用者mac) time(時間) phone(手機等驗證資訊)
yonghu_Screening_conditions = time
#篩選內容
yonghu_Screening_content = 1111
yonghu_output_Route=E:\data1\yonghu.txt
yonghu_output_Size = 150000
demo.java
import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import java.io.*; import java.text.SimpleDateFormat; import java.util.*; public class Demo { //探針資料 public static void main(String[] args) { // // KafkaConsumer kafkaConsumer = new KafkaConsumer(KafkaProperties.TOPIC1); // ConsumerConnector consumer=new KafkaConsumer(KafkaProperties.TOPIC1).createConnector(); // Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); // topicCountMap.put(kafkaConsumer.topic, 1); // Map<String, List<KafkaStream<byte[], byte[]>>> messageStream = consumer.createMessageStreams(topicCountMap); // // KafkaStream<byte[], byte[]> stream = messageStream.get(kafkaConsumer.topic).get(0); // //獲取我們每次接收到的資料 // // ConsumerIterator<byte[], byte[]> iterator = stream.iterator(); ConsumerIterator<byte[], byte[]> iterator=new KafkaConsumer(KafkaProperties.TOPIC1).dateOutput(KafkaProperties.TOPIC1,new KafkaProperties().ZK, new KafkaProperties().GROUP_ID); // FileOutputStream fileOutputStream=new NewsScreening().createOutput("tanzhen_output_Route"); long ts; //第一次輸入時間 long ds; //第二次輸入時間 String strings = new NewsScreening().output("tanzhen_output_Route");//讀取輸出檔案 File file = new File(strings); Date date=new Date(); SimpleDateFormat simpleDateFormat1 = new SimpleDateFormat("yyyy-MM-dd"); String time4=simpleDateFormat1.format(date); File file3=file; File file4=file3.getParentFile();//獲取輸入目錄 String name=file3.getName();//獲取輸出名字 String path=file4.toString(); String path1=""; // String path2=path+"\\"+time4+"\\"+name; // File files=new File(path+"\\"+time4); String path2=path+"/"+time4+"/"+name; File files=new File(path+"/"+time4); if(!files.exists()){ files.mkdirs(); } FileOutputStream fileOutputStream1= null; try { fileOutputStream1 = new FileOutputStream(new File(path2)); } catch (FileNotFoundException e) { e.printStackTrace(); } while (iterator.hasNext()) { Date date1=new Date(); SimpleDateFormat simpleDateFormat = new 
SimpleDateFormat("HH-mm-ss"); String times=simpleDateFormat.format(date1); String time3=simpleDateFormat1.format(date);//時間年月日 // File file5=new File(path+"\\"+time3); File file5=new File(path+"/"+time3); if(file5 !=null &&file5.exists()){ // path1=path+"\\"+time3+"\\"+name; path1=path+"/"+time3+"/"+name; }else{ //建立目錄 file5.mkdirs(); // path1=path+"\\"+time3+"\\"+name; path1=path+"/"+time3+"/"+name; } long size=new NewsScreening().fileSize(path2); long sizes=Long.parseLong(new NewsScreening().output("tanzhen_output_Size")); if (size >= sizes) { //獲取輸出路徑 path2=path1+""+times; try { fileOutputStream1=new FileOutputStream(path2); } catch (FileNotFoundException e) { e.printStackTrace(); } try { file.createNewFile(); } catch (IOException e) { e.printStackTrace(); } } String message = new String(iterator.next().message()); String type=new NewsScreening().output("tanzhen_input_type");//輸出型別選擇 long dates = System.currentTimeMillis() / 1000; //獲取當前時間戳 if(type.equals("0")){ //篩選輸出 String string=new NewsScreening().output("tanzhen_Screening_conditions"); //選擇篩選條件 if(string.equals("ap_mac")){ String ap_mac=message.split("\"")[1]; String aa[] = new NewsScreening().screeningContent("tanzhen_Screening_content"); //篩選內容 List<String> list = Arrays.asList(aa); if(list.contains(ap_mac)){ new NewsScreening().fileFlow(fileOutputStream1, message); } }else if(string.equals("sta_mac")){ String sta_mac=message.split("\"")[3]; String aa[] = new NewsScreening().screeningContent("tanzhen_Screening_content"); //篩選內容 List<String> list = Arrays.asList(aa); if(list.contains(sta_mac)){ new NewsScreening().fileFlow(fileOutputStream1, message); } }else if(string.equals("time")) { String time = message.split("\"")[19]; String aa[] = new NewsScreening().screeningContent("tanzhen_Screening_content"); if (aa.length == 1) { String time1 = aa[0]; //獲取輸入檔案時間 boolean authentication = new NewsScreening().isNumeric(time1); if (authentication) { ts = Long.parseLong(time1);//輸入時間 } else { ts = new 
NewsScreening().timeTransformation(time1); } if (ts >= dates) { new NewsScreening().fileFlow(fileOutputStream1, message); } } else { String time1 = aa[0]; //獲取輸入檔案時間1 boolean authentication = new NewsScreening().isNumeric(time1); if (!authentication) { //輸入第一次的時間 ts = new NewsScreening().timeTransformation(time1); } else { ts = Long.parseLong(time1); } String time2 = aa[1]; //獲取輸入檔案時間2 boolean authentication1 = new NewsScreening().isNumeric(time2); if (!authentication1) { //輸入第二次的時間 ds = new NewsScreening().timeTransformation(time2); } else { ds = Long.parseLong(time2); } if (ds >= dates && dates >= ts) { new NewsScreening().fileFlow(fileOutputStream1, message); } } } }else if (type.equals("1")) { //輸出全部 new NewsScreening().fileFlow(fileOutputStream1, message); }else{ break; } } } }
生產者程式碼
package JavaKafka;

import kafka.producer.KeyedMessage;
import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;
import java.util.Properties;
import java.io.*;

/**
 * Kafka producer thread that sends a numbered test message to one topic every
 * two seconds until the thread is interrupted.
 */
public class KafkaProducer extends Thread {

    private final String topic;
    private final Producer<Integer, String> producer;

    /**
     * Creates a producer for the given topic, connected to the brokers listed in
     * {@code KafkaProperties.BROKERLIST}.
     *
     * @param topic topic every message is sent to
     */
    public KafkaProducer(String topic) {
        this.topic = topic;
        Properties properties = new Properties();
        properties.put("metadata.broker.list", KafkaProperties.BROKERLIST);
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("request.required.acks", "1");
        producer = new Producer<Integer, String>(new ProducerConfig(properties));
    }

    /**
     * Sends {@code message_1}, {@code message_2}, ... with a two second pause
     * between sends. Interrupting the thread ends the loop; the producer is
     * closed on the way out.
     */
    @Override
    public void run() {
        int messageNo = 1;
        try {
            while (!Thread.currentThread().isInterrupted()) {
                String message = "message_" + messageNo;
                producer.send(new KeyedMessage<Integer, String>(topic, message));
                System.out.println("Sent: " + message);
                messageNo++;
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition sees the interruption and stops.
                    Thread.currentThread().interrupt();
                }
            }
        } finally {
            producer.close();
        }
    }
}
消費者程式碼
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.io.*;
/**
 * Thin wrapper around the Kafka high-level consumer API: builds a connector
 * from a ZooKeeper address and group id and exposes the message iterator of
 * a single topic.
 */
public class KafkaConsumer extends Thread{
/** Topic this consumer was created for. */
public String topic;

/**
 * @param topic topic name stored in {@link #topic}
 */
public KafkaConsumer(String topic){
this.topic=topic;
}

/**
 * Creates a consumer connector.
 *
 * @param ZK value for the {@code zookeeper.connect} property
 * @param GROUP_ID value for the {@code group.id} property
 * @return a new connector built from those two properties
 */
public ConsumerConnector createConnector(String ZK,String GROUP_ID){
Properties properties=new Properties();
properties.put("zookeeper.connect",ZK);
properties.put("group.id",GROUP_ID);
return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
}

/**
 * Opens a single message stream on {@code TOPIC} and returns its iterator.
 *
 * <p>NOTE(review): the connector created here is not returned, so callers
 * cannot shut it down - confirm whether that is acceptable for long-running use.
 *
 * @param TOPIC topic to subscribe to
 * @param ZK ZooKeeper connect string
 * @param GROUP_ID consumer group id
 * @return iterator over the records of the first (and only) stream of the topic
 * @throws IllegalStateException if no stream is returned for the topic
 */
public ConsumerIterator<byte[], byte[]> dateOutput(String TOPIC,String ZK,String GROUP_ID ) {
ConsumerConnector consumer = createConnector(ZK,GROUP_ID);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
// One stream (one consuming thread) for the topic.
topicCountMap.put(TOPIC, 1);
Map<String, List<KafkaStream<byte[], byte[]>>> messageStream = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> topicStreams = messageStream.get(TOPIC);
if (topicStreams == null || topicStreams.isEmpty()) {
throw new IllegalStateException("No Kafka stream available for topic " + TOPIC);
}
return topicStreams.get(0).iterator();
}
}
連結程式碼
/**
 * Shared Kafka connection settings: ZooKeeper address, broker list,
 * consumer group and topic names.
 */
public class KafkaProperties {

    // --- connection ---

    /** ZooKeeper connect string. */
    public static final String ZK = "node01:2181";

    /** Comma-separated broker list. */
    public static final String BROKERLIST = "node01:9093,node01:9094,node01:9095";

    /** Consumer group id. */
    public static final String GROUP_ID = "test_group1";

    // --- topics ---

    /** Renzixing audit topic. */
    public static final String TOPIC = "dess";

    /** Probe topic. */
    public static final String TOPIC1 = "tanzhen";

    public static final String TOPIC2 = "yonghu";

    public static final String TOPIC3 = "shenji";
}
二次開發程式碼
import java.io.*;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
/**
 * Helpers shared by the consumers: reading {@code key = value} settings from
 * the configuration file, parsing configured times, and writing records to
 * the output file.
 */
public class NewsScreening {

    /** Time formats accepted by {@link #timeTransformation(String)}, tried in order. */
    private static final String[] TIME_PATTERNS = {"yyyy-MM-dd HH:mm:ss", "yyyy-MM-dd-HH:mm:ss"};

    /** Configuration file to read. */
    File file = new File("/app/java/b.txt");

    /** Uses the default configuration file {@code /app/java/b.txt}. */
    public NewsScreening() {
    }

    /**
     * Uses a specific configuration file.
     *
     * @param configFile configuration file to read settings from
     */
    public NewsScreening(File configFile) {
        this.file = configFile;
    }

    /**
     * Tells whether a string consists only of digits.
     *
     * @param str text to check
     * @return {@code true} if {@code str} is non-empty and every character is a digit;
     *         {@code false} for {@code null} or an empty string, so that callers can
     *         safely hand a positive result to {@code Long.parseLong}
     */
    public boolean isNumeric(String str) {
        if (str == null || str.isEmpty()) {
            return false;
        }
        for (int i = 0; i < str.length(); i++) {
            if (!Character.isDigit(str.charAt(i))) {
                return false;
            }
        }
        return true;
    }

    /**
     * Converts a formatted date-time into epoch seconds, interpreted in the
     * default time zone. Both {@code yyyy-MM-dd HH:mm:ss} and
     * {@code yyyy-MM-dd-HH:mm:ss} are accepted.
     *
     * @param string formatted date-time
     * @return epoch seconds, or {@code 0} if the text matches neither format
     */
    public long timeTransformation(String string) {
        if (string != null) {
            String text = string.trim();
            for (String pattern : TIME_PATTERNS) {
                try {
                    return new SimpleDateFormat(pattern).parse(text).getTime() / 1000;
                } catch (java.text.ParseException ignored) {
                    // Not this format; try the next one.
                }
            }
        }
        System.err.println("Unparseable time value: " + string);
        return 0;
    }

    /**
     * Writes a record to the output stream. No separator is appended, and an
     * I/O failure is reported on stderr rather than propagated.
     *
     * @param fileOutputStream stream to write to
     * @param string record text, encoded with the platform default charset
     */
    public void fileFlow(FileOutputStream fileOutputStream, String string) {
        try {
            fileOutputStream.write(string.getBytes());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Opens the output file whose path is stored under the given configuration key.
     *
     * @param string configuration key holding the output path
     * @return the opened stream, or {@code null} if the key is missing or the
     *         file cannot be opened
     */
    public FileOutputStream createOutput(String string) {
        String path = output(string);
        if (path == null) {
            System.err.println("Missing configuration key: " + string);
            return null;
        }
        try {
            return new FileOutputStream(new File(path));
        } catch (FileNotFoundException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Looks up a setting in the configuration file.
     *
     * <p>Lines have the form {@code key = value}; lines starting with {@code #}
     * and lines without {@code =} are ignored. The key must match exactly, and
     * everything after the first {@code =} is the value. If the key occurs more
     * than once, the last occurrence wins.
     *
     * @param string configuration key
     * @return the trimmed value, or {@code null} if the key is absent or the file
     *         cannot be read
     */
    public String output(String string) {
        String value = null;
        try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String found = valueFor(line, string);
                if (found != null) {
                    value = found;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return value;
    }

    /**
     * Returns the comma-separated values stored under a configuration key.
     *
     * @param string configuration key
     * @return the individual values with surrounding whitespace removed; an empty
     *         array if the key is absent or has no value
     */
    public String[] screeningContent(String string) {
        String raw = output(string);
        if (raw == null || raw.isEmpty()) {
            return new String[0];
        }
        String[] values = raw.split(",");
        for (int i = 0; i < values.length; i++) {
            values[i] = values[i].trim();
        }
        return values;
    }

    /**
     * Returns the size of a file in bytes, creating the file if it does not exist.
     *
     * @param string path of the file
     * @return the file length; {@code 0} if the file had to be created or the path
     *         is not a regular file
     */
    public long fileSize(String string) {
        File target = new File(string);
        if (target.isFile()) {
            // File.length() is exact and not capped at Integer.MAX_VALUE,
            // unlike FileInputStream.available().
            return target.length();
        }
        if (!target.exists()) {
            try {
                target.createNewFile();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return 0;
    }

    /** Returns the value of {@code line} if it is a {@code key = value} line for {@code key}, else null. */
    private static String valueFor(String line, String key) {
        String trimmed = line.trim();
        if (trimmed.isEmpty() || trimmed.charAt(0) == '#') {
            return null;
        }
        int separator = trimmed.indexOf('=');
        if (separator < 0 || !trimmed.substring(0, separator).trim().equals(key)) {
            return null;
        }
        return trimmed.substring(separator + 1).trim();
    }
}
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;
/**
 * Consumes user records from the {@code yonghu} topic and writes them to
 * size-rotated files inside a per-day directory.
 *
 * <p>All behaviour is driven by the {@code yonghu_*} keys of the configuration
 * file read through {@link NewsScreening}. The settings are re-read for every
 * record, so edits to the configuration file take effect without a restart.
 */
public class yonghu {

    /**
     * Entry point: subscribes to the user topic and processes records until the
     * stream ends or {@code yonghu_input_type} is neither {@code "0"} (filtered
     * output) nor {@code "1"} (write everything).
     *
     * @param args unused
     * @throws IllegalStateException if the output route is not configured or the
     *         initial output file cannot be opened
     */
    public static void main(String[] args) {
        ConsumerIterator<byte[], byte[]> iterator = new KafkaConsumer(KafkaProperties.TOPIC2)
                .dateOutput(KafkaProperties.TOPIC2, KafkaProperties.ZK, KafkaProperties.GROUP_ID);

        String route = setting("yonghu_output_Route");
        if (route == null) {
            throw new IllegalStateException("Missing configuration key: yonghu_output_Route");
        }
        File configured = new File(route);
        File parent = configured.getParentFile();
        String baseDir = parent == null ? "." : parent.toString();
        String name = configured.getName();

        String currentPath = dayDirectory(baseDir, new Date()) + "/" + name;
        FileOutputStream out;
        try {
            // NOTE(review): opened in truncate mode, so a restart on the same day
            // overwrites that day's base file - confirm whether append was intended.
            out = new FileOutputStream(new File(currentPath));
        } catch (FileNotFoundException e) {
            // Fail fast: continuing with a null stream only postpones the crash.
            throw new IllegalStateException("Cannot open output file " + currentPath, e);
        }

        try {
            while (iterator.hasNext()) {
                Date now = new Date();
                long limit = Long.parseLong(setting("yonghu_output_Size"));
                if (new NewsScreening().fileSize(currentPath) >= limit) {
                    // Rotate into the directory of the CURRENT day (not the start-up day),
                    // using the time of day as a file-name suffix.
                    String rotated = dayDirectory(baseDir, now) + "/" + name
                            + new SimpleDateFormat("HH-mm-ss").format(now);
                    // Re-opening the file that is already open would truncate it.
                    if (!rotated.equals(currentPath)) {
                        try {
                            FileOutputStream next = new FileOutputStream(rotated);
                            closeQuietly(out);
                            out = next;
                            currentPath = rotated;
                        } catch (FileNotFoundException e) {
                            // Keep writing to the current file; rotation is retried on the next record.
                            e.printStackTrace();
                        }
                    }
                }

                String message = new String(iterator.next().message());
                String type = setting("yonghu_input_type");
                long nowSeconds = System.currentTimeMillis() / 1000;
                if ("0".equals(type)) {
                    if (accepts(message, nowSeconds)) {
                        new NewsScreening().fileFlow(out, message);
                    }
                } else if ("1".equals(type)) {
                    new NewsScreening().fileFlow(out, message);
                } else {
                    // "2" (or anything else, including a missing key) stops the consumer.
                    break;
                }
            }
        } finally {
            closeQuietly(out);
        }
    }

    /** Reads one configuration value; a fresh reader is used so no state leaks between lookups. */
    private static String setting(String key) {
        return new NewsScreening().output(key);
    }

    /** Returns {@code baseDir/yyyy-MM-dd} for the given instant, creating the directory if needed. */
    private static String dayDirectory(String baseDir, Date when) {
        String dir = baseDir + "/" + new SimpleDateFormat("yyyy-MM-dd").format(when);
        File folder = new File(dir);
        if (!folder.exists()) {
            folder.mkdirs();
        }
        return dir;
    }

    /** Applies the configured filter condition (phone, sta_mac, ap_mac or time) to one record. */
    private static boolean accepts(String message, long nowSeconds) {
        String condition = setting("yonghu_Screening_conditions");
        if ("phone".equals(condition)) {
            return matchesField(message, 3);
        }
        if ("sta_mac".equals(condition)) {
            return matchesField(message, 7);
        }
        if ("ap_mac".equals(condition)) {
            return matchesField(message, 9);
        }
        if ("time".equals(condition)) {
            return withinWindow(nowSeconds);
        }
        return false;
    }

    /** True when the quote-delimited field at {@code index} is one of the configured values. */
    private static boolean matchesField(String message, int index) {
        String[] parts = message.split("\"");
        if (parts.length <= index) {
            return false; // malformed or short record: skip it instead of crashing the consumer
        }
        String[] wanted = new NewsScreening().screeningContent("yonghu_Screening_content");
        return Arrays.asList(wanted).contains(parts[index]);
    }

    /**
     * Evaluates the time filter against the current wall-clock time: with one
     * configured time, records are kept until that time has passed; with two,
     * records are kept while the current time lies between them.
     */
    private static boolean withinWindow(long nowSeconds) {
        String[] bounds = new NewsScreening().screeningContent("yonghu_Screening_content");
        if (bounds.length == 0) {
            return false;
        }
        long first = toEpochSeconds(bounds[0]);
        if (bounds.length == 1) {
            return first >= nowSeconds;
        }
        long second = toEpochSeconds(bounds[1]);
        return second >= nowSeconds && nowSeconds >= first;
    }

    /** Converts a configured time (epoch seconds or a formatted date) to epoch seconds. */
    private static long toEpochSeconds(String text) {
        NewsScreening helper = new NewsScreening();
        if (!text.isEmpty() && helper.isNumeric(text)) {
            return Long.parseLong(text);
        }
        return helper.timeTransformation(text);
    }

    /** Closes the stream, reporting (not propagating) any failure. */
    private static void closeQuietly(FileOutputStream stream) {
        try {
            stream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
import kafka.consumer.ConsumerIterator;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
/**
 * Consumes audit records from the {@code dess} topic and writes them to
 * size-rotated files inside a per-day directory.
 *
 * <p>All behaviour is driven by the un-prefixed keys ({@code input_type},
 * {@code Screening_conditions}, {@code Screening_content}, {@code output_Route},
 * {@code output_Size}) of the configuration file read through
 * {@link NewsScreening}. The settings are re-read for every record, so edits to
 * the configuration file take effect without a restart.
 */
public class shenji {

    /**
     * Entry point: subscribes to the audit topic and processes records until the
     * stream ends or {@code input_type} is neither {@code "0"} (filtered output)
     * nor {@code "1"} (write everything).
     *
     * @param args unused
     * @throws IllegalStateException if the output route is not configured or the
     *         initial output file cannot be opened
     */
    public static void main(String[] args) {
        ConsumerIterator<byte[], byte[]> iterator = new KafkaConsumer(KafkaProperties.TOPIC)
                .dateOutput(KafkaProperties.TOPIC, KafkaProperties.ZK, KafkaProperties.GROUP_ID);

        String route = setting("output_Route");
        if (route == null) {
            throw new IllegalStateException("Missing configuration key: output_Route");
        }
        File configured = new File(route);
        File parent = configured.getParentFile();
        String baseDir = parent == null ? "." : parent.toString();
        String name = configured.getName();

        String currentPath = dayDirectory(baseDir, new Date()) + "/" + name;
        FileOutputStream out;
        try {
            // NOTE(review): opened in truncate mode, so a restart on the same day
            // overwrites that day's base file - confirm whether append was intended.
            out = new FileOutputStream(new File(currentPath));
        } catch (FileNotFoundException e) {
            // Fail fast: continuing with a null stream only postpones the crash.
            throw new IllegalStateException("Cannot open output file " + currentPath, e);
        }

        try {
            while (iterator.hasNext()) {
                Date now = new Date();
                long limit = Long.parseLong(setting("output_Size"));
                if (new NewsScreening().fileSize(currentPath) >= limit) {
                    // Rotate into the directory of the CURRENT day (not the start-up day),
                    // using the time of day as a file-name suffix.
                    String rotated = dayDirectory(baseDir, now) + "/" + name
                            + new SimpleDateFormat("HH-mm-ss").format(now);
                    // Re-opening the file that is already open would truncate it.
                    if (!rotated.equals(currentPath)) {
                        try {
                            FileOutputStream next = new FileOutputStream(rotated);
                            closeQuietly(out);
                            out = next;
                            currentPath = rotated;
                        } catch (FileNotFoundException e) {
                            // Keep writing to the current file; rotation is retried on the next record.
                            e.printStackTrace();
                        }
                    }
                }

                String message = new String(iterator.next().message());
                // Output type: 0 = filtered output, 1 = write everything, anything else = stop.
                // Read per record, like the other consumers, so it can be changed at runtime.
                String type = setting("input_type");
                long nowSeconds = System.currentTimeMillis() / 1000;
                if ("0".equals(type)) {
                    if (accepts(message, nowSeconds)) {
                        new NewsScreening().fileFlow(out, message);
                    }
                } else if ("1".equals(type)) {
                    new NewsScreening().fileFlow(out, message);
                } else {
                    break;
                }
            }
        } finally {
            closeQuietly(out);
        }
    }

    /** Reads one configuration value; a fresh reader is used so no state leaks between lookups. */
    private static String setting(String key) {
        return new NewsScreening().output(key);
    }

    /** Returns {@code baseDir/yyyy-MM-dd} for the given instant, creating the directory if needed. */
    private static String dayDirectory(String baseDir, Date when) {
        String dir = baseDir + "/" + new SimpleDateFormat("yyyy-MM-dd").format(when);
        File folder = new File(dir);
        if (!folder.exists()) {
            folder.mkdirs();
        }
        return dir;
    }

    /**
     * Applies the configured filter condition (source_mac, user, message or time)
     * to one record. Every condition requires the record to split into exactly
     * four sections on the opening brace character; other records are skipped.
     */
    private static boolean accepts(String message, long nowSeconds) {
        String condition = setting("Screening_conditions");
        String[] sections = message.split("\\{");
        if (sections.length != 4) {
            return false;
        }
        if ("source_mac".equals(condition)) {
            // Source MAC: third section, fourth colon-separated part, second quote-separated part.
            String[] byColon = sections[2].split(":");
            if (byColon.length <= 3) {
                return false;
            }
            String[] byQuote = byColon[3].split("\"");
            return byQuote.length > 1 && configuredValues().contains(byQuote[1]);
        }
        if ("user" .equals(condition)) {
            // User name: only records whose last section has exactly 31 quote-separated parts.
            String[] fields = sections[3].split("\"");
            return fields.length == 31 && configuredValues().contains(fields[13]);
        }
        if ("message".equals(condition)) {
            // Substring match on the sixth quote-separated part from the end of the record.
            if (sections[3].split("\"").length != 31) {
                return false;
            }
            String[] all = message.split("\"");
            List<String> wanted = configuredValues();
            return all.length >= 6 && !wanted.isEmpty() && all[all.length - 6].contains(wanted.get(0));
        }
        if ("time".equals(condition)) {
            return withinWindow(nowSeconds);
        }
        return false;
    }

    /** Returns the configured filter values as a list. */
    private static List<String> configuredValues() {
        return Arrays.asList(new NewsScreening().screeningContent("Screening_content"));
    }

    /**
     * Evaluates the time filter against the current wall-clock time: with one
     * configured time, records are kept until that time has passed; with two,
     * records are kept while the current time lies between them.
     */
    private static boolean withinWindow(long nowSeconds) {
        List<String> bounds = configuredValues();
        if (bounds.isEmpty()) {
            return false;
        }
        long first = toEpochSeconds(bounds.get(0));
        if (bounds.size() == 1) {
            return first >= nowSeconds;
        }
        long second = toEpochSeconds(bounds.get(1));
        return second >= nowSeconds && nowSeconds >= first;
    }

    /** Converts a configured time (epoch seconds or a formatted date) to epoch seconds. */
    private static long toEpochSeconds(String text) {
        NewsScreening helper = new NewsScreening();
        if (!text.isEmpty() && helper.isNumeric(text)) {
            return Long.parseLong(text);
        }
        return helper.timeTransformation(text);
    }

    /** Closes the stream, reporting (not propagating) any failure. */
    private static void closeQuietly(FileOutputStream stream) {
        try {
            stream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}