程式人生 > 基於mongodb的日誌收集模組設計

基於mongodb的日誌收集模組設計

由於 log4mongo-0.7.4.jar 只支援 MongoDB 2.x,而目前 MongoDB 已經發展到 3.6;改用較新的 log4mongo-0.9.0.jar 整合後卻會報錯,查閱資料後得知是該套件自身的 bug(也可能是筆者水平有限,沒找到解決辦法)。因此我參考網路上某篇 blog 的思路,自行實現了 log4j 與 MongoDB 的整合。

使用jar包情況:mongo-java-driver-3.4.2.jar

擴充套件log4j整合類(繼承AppenderSkeleton)

具體程式碼如下:


import com.alibaba.fastjson.JSONObject;
import com.mongodb.*;
import com.mongodb.client.MongoDatabase;

import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.spi.ErrorCode;
import org.apache.log4j.spi.LoggingEvent;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public class MongoDbAppender extends AppenderSkeleton {
 private final static String DEFAULT_MONGO_DB_HOSTNAME = "localhost";
 private final static String DEFAULT_MONGO_DB_PORT = "27017";
 private final static String DEFAULT_MONGO_DB_DATABASE_NAME = "expmis";
 private final static String DEFAULT_MONGO_DB_COLLECTION_NAME = "log";
 private static final String KEY_TIMESTAMP = "timestamp";
 
 private String hostname = DEFAULT_MONGO_DB_HOSTNAME;
 private String port = DEFAULT_MONGO_DB_PORT;
 private String databaseName = DEFAULT_MONGO_DB_DATABASE_NAME;
 private String collectionName = DEFAULT_MONGO_DB_COLLECTION_NAME;
 private String userName = null;
 
 private String password = null;
 private String writeConcern = null;
 
 private  MongoClient client = null;
 private DBCollection collection = null;
 MongoDatabase db = null;
 
 private WriteConcern concern;
 private boolean initialized = false;
 private MongoDbEnvent mongoDbEnvent = new MongoDbEnvent();; 

 //列印日誌核心方法:abstract protected void append(LoggingEvent event);
 //正常情況下我們只需要覆蓋append方法即可
 @Override
 protected void append(final LoggingEvent loggingEvent) {
  try {
   mongoDbEnvent.setDb(db);
   mongoDbEnvent.setCollectionName(collectionName);
   Object obj=loggingEvent.getMessage();
   if(obj instanceof JSONObject){
    JSONObject message = (JSONObject)loggingEvent.getMessage();
    //message.put(KEY_HOSTNAME, InetAddress.getLocalHost().getHostName());
    //message.put(KEY_IP, InetAddress.getLocalHost().getHostAddress());
    message.put(KEY_TIMESTAMP, new Date(loggingEvent.getTimeStamp()));
    mongoDbEnvent.log(message.toJSONString());
   }else{
    JSONObject message = new JSONObject();
    message.put(KEY_TIMESTAMP, new Date(loggingEvent.getTimeStamp()));
    message.put("content", loggingEvent.getMessage()+"");
    mongoDbEnvent.log(message.toJSONString());
   }
  } catch (Exception e) {
  e.printStackTrace();
  }
 } 

 //釋放資源:public void close()
 @Override
 public void close() {
 mongoDbEnvent.stop();
     if (client != null) {
         collection = null;
         client.close();
     }
 }
 
 //否需要按格式輸出文字:public boolean requiresLayout()
 @Override
 public boolean requiresLayout() {
 //TODO Auto-generated method stub
 return false;
 }
 
 /**
  * @see org.apache.log4j.AppenderSkeleton#activateOptions()
  */
 @Override
 public void activateOptions() {
     try { 
     List<ServerAddress> seeds = new ArrayList<ServerAddress>(); 
         ServerAddress serverAddress = new ServerAddress(hostname, Integer.valueOf(port)); 
       
         seeds.add(serverAddress); 
         if(userName!=null && !"".equals(userName)){
             MongoCredential credentials = MongoCredential.createScramSha1Credential(userName, databaseName, 
               password.toCharArray()); 
                List<MongoCredential> credentialsList = new ArrayList<MongoCredential>(); 
                credentialsList.add(credentials); 
                client = new MongoClient(seeds, credentialsList);
         }else{
         client = new MongoClient(seeds);
         }
         db = client.getDatabase(databaseName);
        
         initialized = true;
     } catch (Exception e) { 
         e.printStackTrace(); 
         errorHandler.error("Unexpected exception while initialising MongoDbAppender.", e,
                 ErrorCode.GENERIC_FAILURE);
        
     } 
 }


 /*
  * This method could be overridden to provide the DB instance from an existing connection.
  */
 protected DB getDatabase(Mongo mongo, String databaseName) {
     return mongo.getDB(databaseName);
 }
 
 /*
  * This method could be overridden to provide the Mongo instance from an existing connection.
  */
 protected Mongo getMongo(List<ServerAddress> addresses) {
     if (addresses.size() < 2) {
         return new Mongo(addresses.get(0));
     } else {
         // Replica set
         return new Mongo(addresses);
     }
 }
 
 /**
  * Note: this method is primarily intended for use by the unit tests.
  *
  * @param collection
  *            The MongoDB collection to use when logging events.
  */
 public void setCollection(final DBCollection collection) {
     assert collection != null : "collection must not be null";
    
     this.collection = collection;
 }
 
 /**
  * @return The hostname of the MongoDB server <i>(will not be null, empty or blank)</i>.
  */
 public String getHostname() {
     return hostname;
 }
 
 /**
  * @param hostname
  *            The MongoDB hostname to set <i>(must not be null, empty or blank)</i>.
  */
 public void setHostname(final String hostname) {
     assert hostname != null : "hostname must not be null";
     assert hostname.trim().length() > 0 : "hostname must not be empty or blank";
 
 
     this.hostname = hostname;
 }
 
 /**
  * @return The port of the MongoDB server <i>(will be > 0)</i>.
  */
 public String getPort() {
     return port;
 }
 
 /**
  * @param port
  *            The port to set <i>(must not be null, empty or blank)</i>.
  */
 public void setPort(final String port) {
     assert port != null : "port must not be null";
     assert port.trim().length() > 0 : "port must not be empty or blank";
 
 
     this.port = port;
 }
 
 
 /**
  * @return The database used in the MongoDB server <i>(will not be null, empty or blank)</i>.
  */
 public String getDatabaseName() {
     return databaseName;
 }
 
 
 /**
  * @param databaseName
  *            The database to use in the MongoDB server <i>(must not be null, empty or
  *            blank)</i>.
  */
 public void setDatabaseName(final String databaseName) {
     assert databaseName != null : "database must not be null";
     assert databaseName.trim().length() > 0 : "database must not be empty or blank";
 
 
     this.databaseName = databaseName;
 }
 
 /**
  * @return The collection used within the database in the MongoDB server <i>(will not be null,
  *         empty or blank)</i>.
  */
 public String getCollectionName() {
     return collectionName;
 }
 
 /**
  * @param collectionName
  *            The collection used within the database in the MongoDB server <i>(must not be
  *            null, empty or blank)</i>.
  */
 public void setCollectionName(final String collectionName) {
 
     assert collectionName != null : "collection must not be null";
     assert collectionName.trim().length() > 0 : "collection must not be empty or blank";
 
 
     this.collectionName = collectionName;
 }
 
 
 /**
  * @return The userName used to authenticate with MongoDB <i>(may be null)</i>.
  */
 public String getUserName() {
     return userName;
 }
 
 
 /**
  * @param userName
  *            The userName to use when authenticating with MongoDB <i>(may be null)</i>.
  */
 public void setUserName(final String userName) {
     this.userName = userName;
 }
 
 
 /**
  * @param password
  *            The password to use when authenticating with MongoDB <i>(may be null)</i>.
  */
 public void setPassword(final String password) {
     this.password = password;
 }
 
 /**
  * @return the writeConcern setting for Mongo.
  */
 public String getWriteConcern() {
 return writeConcern;
 }
 
 /**
  * @param writeConcern
  * The WriteConcern setting for Mongo.<i>(may be null). If null, set to default of dbCollection's writeConcern.</i>
  */
 public void setWriteConcern(final String writeConcern) {
 this.writeConcern = writeConcern;
 concern = WriteConcern.valueOf(writeConcern);
 }
 
 public WriteConcern getConcern() {
 if (concern == null) {
 concern = getCollection().getWriteConcern();
 }
 return concern;
 }
 
 /**
  * Returns true if appender was successfully initialized. If this method returns false, the
  * appender should not attempt to log events.
  *
  * @return true if appender was successfully initialized
  */
 public boolean isInitialized() {
     return initialized;
 }
 
 
 /**
  *
  * @return The MongoDB collection to which events are logged.
  */
 protected DBCollection getCollection() {
     return collection;
 }
 
 
 /**
  * Returns a List of ServerAddress objects for each host specified in the hostname property.
  * Returns an empty list if configuration is detected to be invalid, e.g.:
  * <ul>
  * <li>Port property doesn't contain either one port or one port per host</li>
  * <li>After parsing port property to integers, there isn't either one port or one port per host
  * </li>
  * </ul>
  *
  * @param hostname
  *            Blank space delimited hostnames
  * @param port
  *            Blank space delimited ports. Must specify one port for all hosts or a port per
  *            host.
  * @return List of ServerAddresses to connect to
  */
 private List<ServerAddress> getServerAddresses(String hostname, String port) {
     List<ServerAddress> addresses = new ArrayList<ServerAddress>();
 
 
     String[] hosts = hostname.split(" ");
     String[] ports = port.split(" ");
 
 
     if (ports.length != 1 && ports.length != hosts.length) {
         errorHandler.error(
                 "MongoDB appender port property must contain one port or a port per host",
                 null, ErrorCode.ADDRESS_PARSE_FAILURE);
     } else {
         List<Integer> portNums = getPortNums(ports);
         // Validate number of ports again after parsing
         if (portNums.size() != 1 && portNums.size() != hosts.length) {
             errorHandler
                     .error("MongoDB appender port property must contain one port or a valid port per host",
                             null, ErrorCode.ADDRESS_PARSE_FAILURE);
         } else {
             boolean onePort = (portNums.size() == 1);
 
 
             int i = 0;
             for (String host : hosts) {
                 int portNum = (onePort) ? portNums.get(0) : portNums.get(i);
                 try {
                     addresses.add(new ServerAddress(host.trim(), portNum));
                 } catch (Exception e) {
                     errorHandler.error(
                             "MongoDB appender hostname property contains unknown host", e,
                             ErrorCode.ADDRESS_PARSE_FAILURE);
                 }
                 i++;
             }
         }
     }
     return addresses;
 }
 
 private List<Integer> getPortNums(String[] ports) {
     List<Integer> portNums = new ArrayList<Integer>();
 
 
     for (String port : ports) {
         try {
             Integer portNum = Integer.valueOf(port.trim());
             if (portNum < 0) {
                 errorHandler.error(
                         "MongoDB appender port property can't contain a negative integer",
                         null, ErrorCode.ADDRESS_PARSE_FAILURE);
             } else {
                 portNums.add(portNum);
             }
         } catch (NumberFormatException e) {
             errorHandler.error(
                     "MongoDB appender can't parse a port property value into an integer", e,
                     ErrorCode.ADDRESS_PARSE_FAILURE);
         }
 
 
     }
     return portNums;
 }
}

使用執行緒阻塞佇列往mongodb插入記錄,參考程式碼如下:

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.bson.Document;

import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;


public class MongoDbEnvent {

    private final BlockingQueue<String> queue;
    private final LogThread logThread = new LogThread();
   
    private boolean isShutdown = false;
    private int reservations;
   
    private static final String defaultFormat = "yyyy-MM-dd";

    private String collectionName = "log";
   


    MongoDatabase db = null;
   
   
    public MongoDatabase getDb() {
     return db;
 }
 
 public void setDb(MongoDatabase db) {
  this.db = db;
 }
 
 public String getCollectionName() {
     return collectionName;
 }
 

 public void setCollectionName(String collectionName) {
  this.collectionName = collectionName;
 }


 public MongoDbEnvent() {
        this.queue = new LinkedBlockingQueue<String>();
        logThread.setDaemon(true);
        logThread.start();
        System.out.println("啟動日誌執行緒*************************************");
    }
 
    public void start() {
        logThread.start();
    }
 
    public void stop() {
        System.out.println("關閉日誌執行緒*************************************");
        synchronized (this) {
            isShutdown = true;
        }
        logThread.interrupt();
    }
 
    public void log(String msg) throws InterruptedException {
        synchronized (this) {
            if (isShutdown) {
                throw new IllegalStateException("log service stop!!");
            }
            ++reservations;
        }
        queue.put(msg);
    }

    /**
     * 該執行緒類是做列印日誌用的
     */
    private class LogThread extends Thread {
        public void run() {
            try {
            int i= 0;
            long time= System.currentTimeMillis();
            LinkedList<Object> log =new LinkedList();
                while (true) {
                    synchronized (MongoDbEnvent.this) {
                        if (isShutdown && reservations == 0) {
                            break;
                        }
                    }
                    try {
                        String msg = queue.take();
                        log.add(msg);
                        i++;
                        long time2= System.currentTimeMillis();
                        if(i%100==0 || time2-time>3000){
                        //插入資料庫
                       
                            assert collectionName != null : "collection must not be null";
                            assert collectionName.trim().length() > 0 : "collection must not be empty or blank";
                           
                            SimpleDateFormat sdf = new SimpleDateFormat(defaultFormat);


//                            String collectionNameDay = collectionName+"_" + sdf.format(new Date());
                        //日誌儲存到一張表中
                            MongoCollection<Document> collection = db.getCollection(collectionName);
                              
                            List<Document> documents = new ArrayList<Document>(); 
                            for (int j = 0; j < log.size(); j++) { 
                           
                                documents.add(Document.parse((String)log.get(j))); 
                            } 
                            if(documents!=null && documents.size()>0){
                            collection.insertMany(documents);
                            }
                            log =new LinkedList();
                        i=0;
                        time = time2;
                        }
                       
                        synchronized (MongoDbEnvent.this) {
                            --reservations;
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            } finally {
           
           
            }
        }
    }
}

log4j配置檔案內容如下(請將 com.* 換成 MongoDbAppender 實際所在的套件路徑):

log4j.appender.MongoDB=com.*.MongoDbAppender
log4j.appender.MongoDB.databaseName=expmis
log4j.appender.MongoDB.collectionName=log4j
log4j.appender.MongoDB.hostname=192.168.188.110
log4j.appender.MongoDB.port=27017
log4j.appender.MongoDB.userName=mongodb
log4j.appender.MongoDB.password=mongodb

歡迎大家提出意見。