1. 程式人生 > >kafka二次開發程式碼例項

kafka二次開發程式碼例項

配置檔案b.txt

#選擇輸出型別(0.篩選輸出 1.全體輸出 2.不輸出資料)
input_type = 1
#篩選條件 source_mac(多條檢索)、user(多條檢索)、message(模糊查詢) #time(1.當輸入一個時間 格式:yyyy-MM-dd HH:mm:ss 或者輸入一個時間戳[秒級時間戳10位] 當輸入時間小於等於當前時間不會收集資料 當輸入時間大於當前時間將收集當前時間到輸入時間資料 2.當輸入二個時間將篩選倆時間段間資料)
Screening_conditions = message
#篩選內容
Screening_content = www
#輸入左面時間小,右面時間大2017-12-06 12:12:12,2017-12-06 15:12:12
#資料輸出路徑
output_Route = E:\data1\shenji.txt
#資料輸出檔案大小(位元組) 1M=1048576B 1G=1073741824B
output_Size = 150000
----------------------------------------
#探針資料配置
#選擇輸出型別(0.篩選輸出 1.全體輸出 2.不輸出資料)
tanzhen_input_type = 1
#篩選條件 ap_mac(裝置mac) sta_mac(使用者mac) time(時間)
tanzhen_Screening_conditions = ap_mac
#篩選內容
tanzhen_Screening_content = 60:cd:a9:01:1f:db,60:cd:a9:01:0e:89
tanzhen_output_Route=E:\data1\tanzhen
tanzhen_output_Size = 15000000
-----------------------------------------
#使用者資料配置
#選擇輸出型別(0.篩選輸出 1.全體輸出 2.不輸出資料)
yonghu_input_type = 1
#篩選條件 ap_mac(裝置mac) sta_mac(使用者mac) time(時間) phone(手機等驗證資訊)
yonghu_Screening_conditions = time
#篩選內容
yonghu_Screening_content =	1111
yonghu_output_Route=E:\data1\yonghu.txt
yonghu_output_Size = 150000

Demo.java

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.io.*;
import java.text.SimpleDateFormat;
import java.util.*;

/**
 * Consumes the probe topic ({@code KafkaProperties.TOPIC1}) and writes the selected messages
 * to size-limited files that are grouped into one directory per calendar day.
 *
 * <p>Every setting is looked up through {@code NewsScreening} again for each message, using a
 * fresh {@code NewsScreening} instance per lookup. An output type other than "0" (filtered) or
 * "1" (everything) ends the consumer loop.
 */
public class Demo {

    /**
     * Runs the probe consumer until the iterator ends or the configured output type is
     * neither "0" nor "1".
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ConsumerIterator<byte[], byte[]> iterator = new KafkaConsumer(KafkaProperties.TOPIC1)
                .dateOutput(KafkaProperties.TOPIC1, KafkaProperties.ZK, KafkaProperties.GROUP_ID);

        // The configured route supplies the base directory and the base file name.
        File configuredFile = new File(setting("tanzhen_output_Route"));
        String baseDir = configuredFile.getParentFile().toString();
        String fileName = configuredFile.getName();

        SimpleDateFormat dayFormat = new SimpleDateFormat("yyyy-MM-dd");
        SimpleDateFormat timeFormat = new SimpleDateFormat("HH-mm-ss");

        String currentPath = ensureDayDirectory(baseDir, dayFormat.format(new Date())) + "/" + fileName;
        FileOutputStream out;
        try {
            out = new FileOutputStream(new File(currentPath));
        } catch (FileNotFoundException e) {
            // Without an output file nothing can be written; stop instead of failing on a null stream later.
            e.printStackTrace();
            return;
        }

        try {
            while (iterator.hasNext()) {
                // Use the current date on every pass so that the day directory follows the calendar.
                Date now = new Date();
                String dayDir = ensureDayDirectory(baseDir, dayFormat.format(now));

                long currentSize = new NewsScreening().fileSize(currentPath);
                long sizeLimit = Long.parseLong(setting("tanzhen_output_Size"));
                if (currentSize >= sizeLimit) {
                    // Roll over to "<name><HH-mm-ss>" inside the directory of the current day.
                    String nextPath = dayDir + "/" + fileName + timeFormat.format(now);
                    try {
                        FileOutputStream next = new FileOutputStream(nextPath);
                        closeQuietly(out);
                        out = next;
                        currentPath = nextPath;
                    } catch (FileNotFoundException e) {
                        // Keep writing to the current file; the rollover is retried on the next message.
                        e.printStackTrace();
                    }
                }

                String message = new String(iterator.next().message());
                String type = setting("tanzhen_input_type");
                if ("0".equals(type)) {
                    if (passesFilter(message)) {
                        new NewsScreening().fileFlow(out, message);
                    }
                } else if ("1".equals(type)) {
                    new NewsScreening().fileFlow(out, message);
                } else {
                    break;
                }
            }
        } finally {
            closeQuietly(out);
        }
    }

    /** Looks up one configuration value with a fresh {@code NewsScreening} instance. */
    private static String setting(String key) {
        return new NewsScreening().output(key);
    }

    /** Creates {@code baseDir/day} when it does not exist yet and returns that path. */
    private static String ensureDayDirectory(String baseDir, String day) {
        String dayDir = baseDir + "/" + day;
        File directory = new File(dayDir);
        if (!directory.exists()) {
            directory.mkdirs();
        }
        return dayDir;
    }

    /** Applies the configured filter condition (ap_mac, sta_mac or time) to one message. */
    private static boolean passesFilter(String message) {
        String condition = setting("tanzhen_Screening_conditions");
        String[] parts = message.split("\"");
        if ("ap_mac".equals(condition)) {
            return isListed(parts, 1);
        }
        if ("sta_mac".equals(condition)) {
            return isListed(parts, 3);
        }
        if ("time".equals(condition)) {
            return currentTimeIsSelected();
        }
        return false;
    }

    /**
     * Tells whether the quote-delimited element at {@code index} is one of the configured
     * filter values; a message that is too short to contain the element never matches.
     */
    private static boolean isListed(String[] parts, int index) {
        if (parts.length <= index) {
            return false;
        }
        List<String> wanted = Arrays.asList(new NewsScreening().screeningContent("tanzhen_Screening_content"));
        return wanted.contains(parts[index]);
    }

    /**
     * Evaluates the time filter against the current wall-clock time in seconds: with one
     * configured time the message is kept while that time has not passed yet, with two it is
     * kept while the current time lies between them (first value is the lower bound).
     */
    private static boolean currentTimeIsSelected() {
        long nowSeconds = System.currentTimeMillis() / 1000;
        String[] bounds = new NewsScreening().screeningContent("tanzhen_Screening_content");
        if (bounds.length == 0) {
            return false;
        }
        long first = toEpochSeconds(bounds[0]);
        if (bounds.length == 1) {
            return first >= nowSeconds;
        }
        long second = toEpochSeconds(bounds[1]);
        return second >= nowSeconds && nowSeconds >= first;
    }

    /** Reads a configured time that is either a numeric timestamp in seconds or a formatted date. */
    private static long toEpochSeconds(String text) {
        NewsScreening screening = new NewsScreening();
        if (screening.isNumeric(text)) {
            return Long.parseLong(text);
        }
        return screening.timeTransformation(text);
    }

    /** Closes an output stream and reports, but does not propagate, a failure to do so. */
    private static void closeQuietly(FileOutputStream stream) {
        try {
            stream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

生產者程式碼

package JavaKafka;

import kafka.producer.KeyedMessage;
import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;
import java.util.Properties;
import java.io.*;
/**
 * Kafka producer thread that sends a numbered test message to one topic every two seconds.
 *
 * <p>Interrupting the thread ends the loop; the underlying producer is closed when
 * {@link #run()} returns.
 */

public class KafkaProducer extends Thread  {
    private final String topic;

    private final Producer<Integer, String> producer;

    /**
     * Creates a producer connected to the brokers listed in {@code KafkaProperties.BROKERLIST}.
     *
     * @param topic topic every message is sent to
     */
    public KafkaProducer(String topic) {
        this.topic = topic;

        Properties properties = new Properties();

        properties.put("metadata.broker.list",KafkaProperties.BROKERLIST);
        properties.put("serializer.class","kafka.serializer.StringEncoder");
        properties.put("request.required.acks","1");

        producer = new Producer<Integer, String>(new ProducerConfig(properties));
    }


    /**
     * Sends "message_1", "message_2", ... with a two second pause between messages until the
     * running thread is interrupted.
     */
    @Override
    public void run() {

        int messageNo = 1;

        try {
            while (!Thread.currentThread().isInterrupted()) {
                String message = "message_" + messageNo;
                producer.send(new KeyedMessage<Integer, String>(topic, message));
                System.out.println("Sent: " + message);

                messageNo++;

                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    // Restore the flag for the owner of the thread and stop producing.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } finally {
            // Release the broker connections held by the producer.
            producer.close();
        }

    }
}

消費者程式碼

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.io.*;


/**
 * Helper around the ZooKeeper-based Kafka consumer API that opens a single message stream
 * for a topic.
 */
public class KafkaConsumer extends Thread{
    public String topic;


    /**
     * @param topic topic name stored in {@link #topic}
     */
    public KafkaConsumer(String topic){

        this.topic=topic;
    }

    /**
     * Creates a consumer connector for the given ZooKeeper address and consumer group.
     *
     * @param ZK       value for the {@code zookeeper.connect} property
     * @param GROUP_ID value for the {@code group.id} property
     * @return the connector created from these two properties
     */
    public ConsumerConnector createConnector(String ZK,String GROUP_ID){
        Properties properties=new Properties();
        properties.put("zookeeper.connect",ZK);
        properties.put("group.id",GROUP_ID);
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }

    /**
     * Opens one stream for {@code TOPIC} and returns its iterator. The topic is taken from the
     * argument, not from {@link #topic}.
     *
     * @param TOPIC    topic to consume
     * @param ZK       ZooKeeper connection string
     * @param GROUP_ID consumer group id
     * @return iterator over the raw key/message bytes of the first (and only) requested stream
     */
    public ConsumerIterator<byte[], byte[]> dateOutput(String TOPIC,String ZK,String GROUP_ID ) {
        ConsumerConnector consumer = createConnector(ZK, GROUP_ID);

        // Request exactly one stream for the topic.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(TOPIC, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStream = consumer.createMessageStreams(topicCountMap);

        KafkaStream<byte[], byte[]> stream = messageStream.get(TOPIC).get(0);
        return stream.iterator();
    }

}

連結程式碼

/**
 * Shared Kafka settings: ZooKeeper address, broker list, consumer group and topic names.
 */

public class KafkaProperties {
    // --- connection ---

    /** ZooKeeper connection string. */
    public static final String ZK = "node01:2181";

    /** Comma-separated broker list. */
    public static final String BROKERLIST = "node01:9093,node01:9094,node01:9095";

    /** Consumer group id. */
    public static final String GROUP_ID = "test_group1";

    // --- topics ---

    /** Topic of the Renzixing audit feed. */
    public static final String TOPIC = "dess";

    /** Probe topic. */
    public static final String TOPIC1 = "tanzhen";

    public static final String TOPIC2 = "yonghu";

    public static final String TOPIC3 = "shenji";
}

二次開發程式碼

import java.io.*;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.List;

/**
 * Message-screening helpers: reads {@code key = value} settings from a configuration file,
 * converts configured times to epoch seconds and writes messages to output files.
 */
public class NewsScreening {
    /** Configuration file the settings are read from. */
    File file;

    /** Uses the default configuration file {@code /app/java/b.txt}. */
    public NewsScreening() {
        this(new File("/app/java/b.txt"));
    }

    /**
     * @param configFile configuration file to read settings from
     */
    public NewsScreening(File configFile) {
        this.file = configFile;
    }

    /**
     * Tells whether a string consists of digits only.
     *
     * @param str text to check
     * @return {@code true} for a non-empty string in which every character is a digit;
     *         {@code false} for {@code null}, the empty string or any other text
     */
    public boolean isNumeric(String str) {
        if (str == null || str.length() == 0) {
            return false;
        }
        for (int i = 0; i < str.length(); i++) {
            if (!Character.isDigit(str.charAt(i))) {
                return false;
            }
        }
        return true;
    }

    /**
     * Converts a time written as {@code yyyy-MM-dd HH:mm:ss} to a timestamp in seconds.
     *
     * @param string formatted time
     * @return seconds since the epoch, or 0 when the text is {@code null} or cannot be parsed
     */
    public long timeTransformation(String string) {
        if (string == null) {
            return 0;
        }
        try {
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Date parsed = format.parse(string);
            return parsed.getTime() / 1000;
        } catch (java.text.ParseException e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * Writes the bytes of a string to an output stream; no line separator is added.
     * An {@link IOException} is reported on standard error and not propagated.
     *
     * @param fileOutputStream stream to write to
     * @param string           text to write
     */
    public void fileFlow(FileOutputStream fileOutputStream, String string) {
        try {
            fileOutputStream.write(string.getBytes());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Opens an output stream on the file whose path is configured under the given key.
     *
     * @param string configuration key holding the output path
     * @return the opened stream, or {@code null} when the key has no value or the file
     *         cannot be opened
     */
    public FileOutputStream createOutput(String string) {
        String path = output(string);
        if (path == null) {
            System.err.println("No value configured for key: " + string);
            return null;
        }
        try {
            return new FileOutputStream(new File(path));
        } catch (FileNotFoundException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Reads one value from the configuration file.
     *
     * @param string configuration key; the text before the first {@code =} of a line must
     *               equal it after trimming
     * @return the trimmed text after the first {@code =} of the last matching line (possibly
     *         empty), or {@code null} when no line matches or the file cannot be read
     */
    public String output(String string) {
        return readValue(string);
    }

    /**
     * Reads a comma-separated filter list from the configuration file.
     *
     * @param string configuration key of the list
     * @return the trimmed items; an empty array when the key is not configured
     */
    public String[] screeningContent(String string) {
        String value = readValue(string);
        if (value == null) {
            return new String[0];
        }
        String[] items = value.split(",");
        for (int i = 0; i < items.length; i++) {
            items[i] = items[i].trim();
        }
        return items;
    }

    /**
     * Returns the size of a file, creating the file when it does not exist yet.
     *
     * @param string path of the file
     * @return length of the file in bytes; 0 when the path did not exist (an empty file is
     *         created, a failure to do so is reported on standard error) or is not a regular file
     */
    public long fileSize(String string) {
        File target = new File(string);
        if (target.exists()) {
            return target.isFile() ? target.length() : 0;
        }
        try {
            target.createNewFile();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return 0;
    }

    /** Scans the configuration file and returns the value of the last line whose key equals {@code key}. */
    private String readValue(String key) {
        String value = null;
        BufferedReader reader = null;
        try {
            reader = new BufferedReader(new FileReader(file));
            String line;
            while ((line = reader.readLine()) != null) {
                int separator = line.indexOf('=');
                if (separator < 0) {
                    continue;
                }
                if (line.substring(0, separator).trim().equals(key)) {
                    // Everything after the first '=' belongs to the value.
                    value = line.substring(separator + 1).trim();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (reader != null) {
                try {
                    reader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return value;
    }

}
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;

/**
 * Consumes the user topic ({@code KafkaProperties.TOPIC2}) and writes the selected messages
 * to size-limited files that are grouped into one directory per calendar day.
 *
 * <p>Every setting is looked up through {@code NewsScreening} again for each message, using a
 * fresh {@code NewsScreening} instance per lookup. An output type other than "0" (filtered) or
 * "1" (everything) ends the consumer loop.
 */
public class yonghu {

    /**
     * Runs the user-data consumer until the iterator ends or the configured output type is
     * neither "0" nor "1".
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ConsumerIterator<byte[], byte[]> iterator = new KafkaConsumer(KafkaProperties.TOPIC2)
                .dateOutput(KafkaProperties.TOPIC2, KafkaProperties.ZK, KafkaProperties.GROUP_ID);

        // The configured route supplies the base directory and the base file name.
        File configuredFile = new File(setting("yonghu_output_Route"));
        String baseDir = configuredFile.getParentFile().toString();
        String fileName = configuredFile.getName();

        SimpleDateFormat dayFormat = new SimpleDateFormat("yyyy-MM-dd");
        SimpleDateFormat timeFormat = new SimpleDateFormat("HH-mm-ss");

        String currentPath = ensureDayDirectory(baseDir, dayFormat.format(new Date())) + "/" + fileName;
        FileOutputStream out;
        try {
            out = new FileOutputStream(new File(currentPath));
        } catch (FileNotFoundException e) {
            // Without an output file nothing can be written; stop instead of failing on a null stream later.
            e.printStackTrace();
            return;
        }

        try {
            while (iterator.hasNext()) {
                // Use the current date on every pass so that the day directory follows the calendar.
                Date now = new Date();
                String dayDir = ensureDayDirectory(baseDir, dayFormat.format(now));

                long currentSize = new NewsScreening().fileSize(currentPath);
                long sizeLimit = Long.parseLong(setting("yonghu_output_Size"));
                if (currentSize >= sizeLimit) {
                    // Roll over to "<name><HH-mm-ss>" inside the directory of the current day.
                    String nextPath = dayDir + "/" + fileName + timeFormat.format(now);
                    try {
                        FileOutputStream next = new FileOutputStream(nextPath);
                        closeQuietly(out);
                        out = next;
                        currentPath = nextPath;
                    } catch (FileNotFoundException e) {
                        // Keep writing to the current file; the rollover is retried on the next message.
                        e.printStackTrace();
                    }
                }

                String message = new String(iterator.next().message());
                String type = setting("yonghu_input_type");
                if ("0".equals(type)) {
                    if (passesFilter(message)) {
                        new NewsScreening().fileFlow(out, message);
                    }
                } else if ("1".equals(type)) {
                    new NewsScreening().fileFlow(out, message);
                } else {
                    break;
                }
            }
        } finally {
            closeQuietly(out);
        }
    }

    /** Looks up one configuration value with a fresh {@code NewsScreening} instance. */
    private static String setting(String key) {
        return new NewsScreening().output(key);
    }

    /** Creates {@code baseDir/day} when it does not exist yet and returns that path. */
    private static String ensureDayDirectory(String baseDir, String day) {
        String dayDir = baseDir + "/" + day;
        File directory = new File(dayDir);
        if (!directory.exists()) {
            directory.mkdirs();
        }
        return dayDir;
    }

    /** Applies the configured filter condition (phone, sta_mac, ap_mac or time) to one message. */
    private static boolean passesFilter(String message) {
        String condition = setting("yonghu_Screening_conditions");
        String[] parts = message.split("\"");
        if ("phone".equals(condition)) {
            return isListed(parts, 3);
        }
        if ("sta_mac".equals(condition)) {
            return isListed(parts, 7);
        }
        if ("ap_mac".equals(condition)) {
            return isListed(parts, 9);
        }
        if ("time".equals(condition)) {
            return currentTimeIsSelected();
        }
        return false;
    }

    /**
     * Tells whether the quote-delimited element at {@code index} is one of the configured
     * filter values; a message that is too short to contain the element never matches.
     */
    private static boolean isListed(String[] parts, int index) {
        if (parts.length <= index) {
            return false;
        }
        List<String> wanted = Arrays.asList(new NewsScreening().screeningContent("yonghu_Screening_content"));
        return wanted.contains(parts[index]);
    }

    /**
     * Evaluates the time filter against the current wall-clock time in seconds: with one
     * configured time the message is kept while that time has not passed yet, with two it is
     * kept while the current time lies between them (first value is the lower bound).
     */
    private static boolean currentTimeIsSelected() {
        long nowSeconds = System.currentTimeMillis() / 1000;
        String[] bounds = new NewsScreening().screeningContent("yonghu_Screening_content");
        if (bounds.length == 0) {
            return false;
        }
        long first = toEpochSeconds(bounds[0]);
        if (bounds.length == 1) {
            return first >= nowSeconds;
        }
        long second = toEpochSeconds(bounds[1]);
        return second >= nowSeconds && nowSeconds >= first;
    }

    /** Reads a configured time that is either a numeric timestamp in seconds or a formatted date. */
    private static long toEpochSeconds(String text) {
        NewsScreening screening = new NewsScreening();
        if (screening.isNumeric(text)) {
            return Long.parseLong(text);
        }
        return screening.timeTransformation(text);
    }

    /** Closes an output stream and reports, but does not propagate, a failure to do so. */
    private static void closeQuietly(FileOutputStream stream) {
        try {
            stream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
import kafka.consumer.ConsumerIterator;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.List;

/**
 * Consumes audit ("shenji") records from Kafka and writes them to a size-rotated output file.
 *
 * <p>Settings are looked up through {@code NewsScreening} by key: {@code output_Route} (output
 * file), {@code output_Size} (rotation threshold), {@code input_type} (output mode),
 * {@code Screening_conditions} (filter field) and {@code Screening_content} (filter values).
 * Output goes to {@code <parent of output_Route>/<yyyy-MM-dd>/<file name>}; once the current file
 * reaches the threshold a new file named {@code <file name><HH-mm-ss>} is opened in the directory
 * of the current day.
 *
 * <p>NOTE(review): files are opened in truncate mode, so restarting on the same day, or rotating
 * twice within one second, overwrites an existing file. Left as in the original; confirm whether
 * append mode is wanted.
 */
public class shenji  {

    /**
     * Reads messages from the Kafka iterator until it is exhausted or the configured output mode
     * asks to stop, writing each selected message to the current output file.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ConsumerIterator<byte[], byte[]> iterator = new KafkaConsumer(KafkaProperties.TOPIC).dateOutput(KafkaProperties.TOPIC, new KafkaProperties().ZK, new KafkaProperties().GROUP_ID);

        // Configured output file; only its parent directory and file name are used for the real paths.
        File configuredFile = new File(new NewsScreening().output("output_Route"));
        String baseDir = configuredFile.getParentFile().toString();
        String fileName = configuredFile.getName();

        SimpleDateFormat dayFormat = new SimpleDateFormat("yyyy-MM-dd");
        SimpleDateFormat clockFormat = new SimpleDateFormat("HH-mm-ss");

        // Initial output file lives in the directory of the start day.
        String startDay = dayFormat.format(new Date());
        File startDir = new File(baseDir + "/" + startDay);
        if (!startDir.exists()) {
            startDir.mkdirs();
        }
        String currentPath = baseDir + "/" + startDay + "/" + fileName;

        FileOutputStream out;
        try {
            out = new FileOutputStream(new File(currentPath));
        } catch (FileNotFoundException e) {
            // Without an output stream there is nothing to write to; stop instead of
            // passing a null stream to fileFlow later on.
            e.printStackTrace();
            return;
        }

        // Output mode: "0" = write only messages matching the filter, "1" = write every message,
        // anything else = stop consuming.
        String type = new NewsScreening().output("input_type");
        try {
            while (iterator.hasNext()) {
                Date now = new Date();
                String clock = clockFormat.format(now);

                // Use the current date (not the start date) so rotated files land in the
                // directory of the day they are created on.
                String today = dayFormat.format(now);
                File dayDir = new File(baseDir + "/" + today);
                if (!dayDir.exists()) {
                    dayDir.mkdirs();
                }
                String rotationBase = baseDir + "/" + today + "/" + fileName;

                long size = new NewsScreening().fileSize(currentPath);
                long limit = Long.parseLong(new NewsScreening().output("output_Size"));
                if (size >= limit) {
                    // Rotated file name is the base name with the time of day appended.
                    String nextPath = rotationBase + "" + clock;
                    try {
                        FileOutputStream next = new FileOutputStream(nextPath);
                        // Release the previous file handle only once the replacement is open,
                        // and keep the tracked path in sync with the stream actually in use.
                        closeQuietly(out);
                        out = next;
                        currentPath = nextPath;
                    } catch (FileNotFoundException e) {
                        e.printStackTrace();
                    }

                    // NOTE(review): kept from the original. This creates an empty file at the
                    // configured output_Route path, not at the rotated path; purpose unclear.
                    try {
                        configuredFile.createNewFile();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }

                String message = new String(iterator.next().message());
                long nowSeconds = System.currentTimeMillis() / 1000; // current time, epoch seconds
                String condition = new NewsScreening().output("Screening_conditions");
                if (type.equals("0")) {
                    if (matchesFilter(condition, message, nowSeconds)) {
                        new NewsScreening().fileFlow(out, message);
                    }
                } else if (type.equals("1")) {
                    // Write everything.
                    new NewsScreening().fileFlow(out, message);
                } else {
                    break;
                }
            }
        } finally {
            closeQuietly(out);
        }
    }

    /**
     * Decides whether a message passes the configured filter.
     *
     * <p>Every filter requires the message to split into exactly four parts on '{'. The filter
     * values are read from the {@code Screening_content} setting.
     *
     * @param condition  filter field: {@code source_mac}, {@code user}, {@code message} or {@code time}
     * @param message    raw message text
     * @param nowSeconds current time in epoch seconds, used by the {@code time} filter
     * @return true if the message should be written; false for non-matching messages,
     *         messages with an unexpected layout, and unknown conditions
     */
    private static boolean matchesFilter(String condition, String message, long nowSeconds) {
        String[] sections = message.split("\\{");
        if (condition.equals("source_mac")) {
            // Exact match of the extracted source MAC against any configured value.
            if (sections.length != 4) {
                return false;
            }
            String sourceMac = sections[2].split(":")[3].split("\"")[1];
            List<String> wanted = Arrays.asList(new NewsScreening().screeningContent("Screening_content"));
            return wanted.contains(sourceMac);
        }
        if (condition.equals("user")) {
            // Exact match of the extracted user name against any configured value.
            if (sections.length != 4) {
                return false;
            }
            String[] quoted = sections[3].split("\"");
            if (quoted.length != 31) {
                return false;
            }
            List<String> wanted = Arrays.asList(new NewsScreening().screeningContent("Screening_content"));
            return wanted.contains(quoted[13]);
        }
        if (condition.equals("message")) {
            // Substring match of the first configured value against the URL field.
            if (sections.length != 4 || sections[3].split("\"").length != 31) {
                return false;
            }
            String[] quoted = message.split("\"");
            String url = quoted[quoted.length - 6];
            String needle = new NewsScreening().screeningContent("Screening_content")[0];
            return url.contains(needle);
        }
        if (condition.equals("time")) {
            // The window is compared with the time the message is received, not with a
            // timestamp inside the message.
            if (sections.length != 4) {
                return false;
            }
            String[] bounds = new NewsScreening().screeningContent("Screening_content");
            if (bounds.length == 1) {
                // Single value: collect until that moment.
                return resolveTimestamp(bounds[0]) >= nowSeconds;
            }
            // Two values: collect while the current time lies between them (inclusive).
            long from = resolveTimestamp(bounds[0]);
            long to = resolveTimestamp(bounds[1]);
            return to >= nowSeconds && nowSeconds >= from;
        }
        return false;
    }

    /**
     * Converts a configured time value to a number comparable with epoch seconds.
     *
     * @param text either a numeric timestamp or a date string understood by
     *             {@code NewsScreening.timeTransformation}
     * @return the parsed value when {@code NewsScreening.isNumeric} accepts the text,
     *         otherwise the result of {@code timeTransformation}
     */
    private static long resolveTimestamp(String text) {
        if (new NewsScreening().isNumeric(text)) {
            return Long.parseLong(text);
        }
        return new NewsScreening().timeTransformation(text);
    }

    /** Closes the stream, reporting but not propagating a failure. */
    private static void closeQuietly(FileOutputStream stream) {
        try {
            stream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}