基於mongodb的日誌收集模組設計
由於log4mongo0.7.4.jar只支援mongodb2.X版本,而現在的mongodb已經到3.6了。之後我又找到它的新版本log4mongo0.9.0.jar,整合進去後卻發現報錯;查了一些資料,說是它自身的bug(也可能是我水平有限,沒找到解決辦法)。因此我參考網上某篇blog的思路,自行實現了log4j與mongodb的整合。
使用jar包情況:mongo-java-driver-3.4.2.jar
擴充log4j的整合類(繼承AppenderSkeleton)
具體程式碼如下:
import com.alibaba.fastjson.JSONObject;
import com.mongodb.*;
import com.mongodb.client.MongoDatabase;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.spi.ErrorCode;
import org.apache.log4j.spi.LoggingEvent;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
public class MongoDbAppender extends AppenderSkeleton {
private final static String DEFAULT_MONGO_DB_HOSTNAME = "localhost";
private final static String DEFAULT_MONGO_DB_PORT = "27017";
private final static String DEFAULT_MONGO_DB_DATABASE_NAME = "expmis";
private final static String DEFAULT_MONGO_DB_COLLECTION_NAME = "log";
private static final String KEY_TIMESTAMP = "timestamp";
private String hostname = DEFAULT_MONGO_DB_HOSTNAME;
private String port = DEFAULT_MONGO_DB_PORT;
private String databaseName = DEFAULT_MONGO_DB_DATABASE_NAME;
private String collectionName = DEFAULT_MONGO_DB_COLLECTION_NAME;
private String userName = null;
private String password = null;
private String writeConcern = null;
private MongoClient client = null;
private DBCollection collection = null;
MongoDatabase db = null;
private WriteConcern concern;
private boolean initialized = false;
private MongoDbEnvent mongoDbEnvent = new MongoDbEnvent();;
//列印日誌核心方法:abstract protected void append(LoggingEvent event);
//正常情況下我們只需要覆蓋append方法即可
@Override
protected void append(final LoggingEvent loggingEvent) {
try {
mongoDbEnvent.setDb(db);
mongoDbEnvent.setCollectionName(collectionName);
Object obj=loggingEvent.getMessage();
if(obj instanceof JSONObject){
JSONObject message = (JSONObject)loggingEvent.getMessage();
//message.put(KEY_HOSTNAME, InetAddress.getLocalHost().getHostName());
//message.put(KEY_IP, InetAddress.getLocalHost().getHostAddress());
message.put(KEY_TIMESTAMP, new Date(loggingEvent.getTimeStamp()));
mongoDbEnvent.log(message.toJSONString());
}else{
JSONObject message = new JSONObject();
message.put(KEY_TIMESTAMP, new Date(loggingEvent.getTimeStamp()));
message.put("content", loggingEvent.getMessage()+"");
mongoDbEnvent.log(message.toJSONString());
}
} catch (Exception e) {
e.printStackTrace();
}
}
//釋放資源:public void close()
@Override
public void close() {
mongoDbEnvent.stop();
if (client != null) {
collection = null;
client.close();
}
}
//否需要按格式輸出文字:public boolean requiresLayout()
@Override
public boolean requiresLayout() {
//TODO Auto-generated method stub
return false;
}
/**
* @see org.apache.log4j.AppenderSkeleton#activateOptions()
*/
@Override
public void activateOptions() {
try {
List<ServerAddress> seeds = new ArrayList<ServerAddress>();
ServerAddress serverAddress = new ServerAddress(hostname, Integer.valueOf(port));
seeds.add(serverAddress);
if(userName!=null && !"".equals(userName)){
MongoCredential credentials = MongoCredential.createScramSha1Credential(userName, databaseName,
password.toCharArray());
List<MongoCredential> credentialsList = new ArrayList<MongoCredential>();
credentialsList.add(credentials);
client = new MongoClient(seeds, credentialsList);
}else{
client = new MongoClient(seeds);
}
db = client.getDatabase(databaseName);
initialized = true;
} catch (Exception e) {
e.printStackTrace();
errorHandler.error("Unexpected exception while initialising MongoDbAppender.", e,
ErrorCode.GENERIC_FAILURE);
}
}
/*
* This method could be overridden to provide the DB instance from an existing connection.
*/
protected DB getDatabase(Mongo mongo, String databaseName) {
return mongo.getDB(databaseName);
}
/*
* This method could be overridden to provide the Mongo instance from an existing connection.
*/
protected Mongo getMongo(List<ServerAddress> addresses) {
if (addresses.size() < 2) {
return new Mongo(addresses.get(0));
} else {
// Replica set
return new Mongo(addresses);
}
}
/**
* Note: this method is primarily intended for use by the unit tests.
*
* @param collection
* The MongoDB collection to use when logging events.
*/
public void setCollection(final DBCollection collection) {
assert collection != null : "collection must not be null";
this.collection = collection;
}
/**
* @return The hostname of the MongoDB server <i>(will not be null, empty or blank)</i>.
*/
public String getHostname() {
return hostname;
}
/**
* @param hostname
* The MongoDB hostname to set <i>(must not be null, empty or blank)</i>.
*/
public void setHostname(final String hostname) {
assert hostname != null : "hostname must not be null";
assert hostname.trim().length() > 0 : "hostname must not be empty or blank";
this.hostname = hostname;
}
/**
* @return The port of the MongoDB server <i>(will be > 0)</i>.
*/
public String getPort() {
return port;
}
/**
* @param port
* The port to set <i>(must not be null, empty or blank)</i>.
*/
public void setPort(final String port) {
assert port != null : "port must not be null";
assert port.trim().length() > 0 : "port must not be empty or blank";
this.port = port;
}
/**
* @return The database used in the MongoDB server <i>(will not be null, empty or blank)</i>.
*/
public String getDatabaseName() {
return databaseName;
}
/**
* @param databaseName
* The database to use in the MongoDB server <i>(must not be null, empty or
* blank)</i>.
*/
public void setDatabaseName(final String databaseName) {
assert databaseName != null : "database must not be null";
assert databaseName.trim().length() > 0 : "database must not be empty or blank";
this.databaseName = databaseName;
}
/**
* @return The collection used within the database in the MongoDB server <i>(will not be null,
* empty or blank)</i>.
*/
public String getCollectionName() {
return collectionName;
}
/**
* @param collectionName
* The collection used within the database in the MongoDB server <i>(must not be
* null, empty or blank)</i>.
*/
public void setCollectionName(final String collectionName) {
assert collectionName != null : "collection must not be null";
assert collectionName.trim().length() > 0 : "collection must not be empty or blank";
this.collectionName = collectionName;
}
/**
* @return The userName used to authenticate with MongoDB <i>(may be null)</i>.
*/
public String getUserName() {
return userName;
}
/**
* @param userName
* The userName to use when authenticating with MongoDB <i>(may be null)</i>.
*/
public void setUserName(final String userName) {
this.userName = userName;
}
/**
* @param password
* The password to use when authenticating with MongoDB <i>(may be null)</i>.
*/
public void setPassword(final String password) {
this.password = password;
}
/**
* @return the writeConcern setting for Mongo.
*/
public String getWriteConcern() {
return writeConcern;
}
/**
* @param writeConcern
* The WriteConcern setting for Mongo.<i>(may be null). If null, set to default of dbCollection's writeConcern.</i>
*/
public void setWriteConcern(final String writeConcern) {
this.writeConcern = writeConcern;
concern = WriteConcern.valueOf(writeConcern);
}
public WriteConcern getConcern() {
if (concern == null) {
concern = getCollection().getWriteConcern();
}
return concern;
}
/**
* Returns true if appender was successfully initialized. If this method returns false, the
* appender should not attempt to log events.
*
* @return true if appender was successfully initialized
*/
public boolean isInitialized() {
return initialized;
}
/**
*
* @return The MongoDB collection to which events are logged.
*/
protected DBCollection getCollection() {
return collection;
}
/**
* Returns a List of ServerAddress objects for each host specified in the hostname property.
* Returns an empty list if configuration is detected to be invalid, e.g.:
* <ul>
* <li>Port property doesn't contain either one port or one port per host</li>
* <li>After parsing port property to integers, there isn't either one port or one port per host
* </li>
* </ul>
*
* @param hostname
* Blank space delimited hostnames
* @param port
* Blank space delimited ports. Must specify one port for all hosts or a port per
* host.
* @return List of ServerAddresses to connect to
*/
private List<ServerAddress> getServerAddresses(String hostname, String port) {
List<ServerAddress> addresses = new ArrayList<ServerAddress>();
String[] hosts = hostname.split(" ");
String[] ports = port.split(" ");
if (ports.length != 1 && ports.length != hosts.length) {
errorHandler.error(
"MongoDB appender port property must contain one port or a port per host",
null, ErrorCode.ADDRESS_PARSE_FAILURE);
} else {
List<Integer> portNums = getPortNums(ports);
// Validate number of ports again after parsing
if (portNums.size() != 1 && portNums.size() != hosts.length) {
errorHandler
.error("MongoDB appender port property must contain one port or a valid port per host",
null, ErrorCode.ADDRESS_PARSE_FAILURE);
} else {
boolean onePort = (portNums.size() == 1);
int i = 0;
for (String host : hosts) {
int portNum = (onePort) ? portNums.get(0) : portNums.get(i);
try {
addresses.add(new ServerAddress(host.trim(), portNum));
} catch (Exception e) {
errorHandler.error(
"MongoDB appender hostname property contains unknown host", e,
ErrorCode.ADDRESS_PARSE_FAILURE);
}
i++;
}
}
}
return addresses;
}
private List<Integer> getPortNums(String[] ports) {
List<Integer> portNums = new ArrayList<Integer>();
for (String port : ports) {
try {
Integer portNum = Integer.valueOf(port.trim());
if (portNum < 0) {
errorHandler.error(
"MongoDB appender port property can't contain a negative integer",
null, ErrorCode.ADDRESS_PARSE_FAILURE);
} else {
portNums.add(portNum);
}
} catch (NumberFormatException e) {
errorHandler.error(
"MongoDB appender can't parse a port property value into an integer", e,
ErrorCode.ADDRESS_PARSE_FAILURE);
}
}
return portNums;
}
}
使用背景執行緒搭配阻塞佇列,將日誌記錄批次插入mongodb,參考程式碼如下:
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.bson.Document;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
/**
 * Asynchronous MongoDB log writer.
 *
 * <p>Producers call {@link #log(String)} with a JSON document; a daemon thread drains
 * the queue and inserts the documents in batches. A batch is written when it reaches
 * {@link #BATCH_SIZE} messages or when {@link #FLUSH_INTERVAL_NANOS} has elapsed since
 * the previous write, whichever comes first. Messages still buffered when
 * {@link #stop()} is called are written before the thread exits.
 *
 * <p>A batch that cannot be written is reported on {@code System.err} and discarded,
 * so a long database outage cannot grow the buffer without bound.
 */
public class MongoDbEnvent {
    /** Maximum number of messages written in one insert. */
    private static final int BATCH_SIZE = 100;
    /** Maximum time a message waits in the buffer before being written. */
    private static final long FLUSH_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(3);
    /** Upper bound on how long {@link #stop()} waits for the final write. */
    private static final long STOP_JOIN_MILLIS = 5000L;

    private final BlockingQueue<String> queue;
    private final LogThread logThread = new LogThread();
    // isShutdown and reservations are guarded by synchronized (this).
    private boolean isShutdown = false;
    private int reservations;
    // Written by producer threads, read by the writer thread.
    private volatile String collectionName = "log";
    volatile MongoDatabase db = null;

    /** @return the database that batches are written to <i>(may be null)</i>. */
    public MongoDatabase getDb() {
        return db;
    }

    /** @param db the database that batches are written to. */
    public void setDb(MongoDatabase db) {
        this.db = db;
    }

    /** @return the name of the collection that batches are written to. */
    public String getCollectionName() {
        return collectionName;
    }

    /** @param collectionName the name of the collection that batches are written to. */
    public void setCollectionName(String collectionName) {
        this.collectionName = collectionName;
    }

    /** Creates the service and immediately starts its daemon writer thread. */
    public MongoDbEnvent() {
        this.queue = new LinkedBlockingQueue<String>();
        logThread.setDaemon(true);
        logThread.start();
        System.out.println("啟動日誌執行緒*************************************");
    }

    /**
     * Starts the writer thread if it has not been started yet. The constructor already
     * starts it, so this is normally a no-op; a thread cannot be started twice.
     */
    public void start() {
        synchronized (this) {
            if (logThread.getState() == Thread.State.NEW) {
                logThread.start();
            }
        }
    }

    /**
     * Stops accepting messages, wakes the writer thread and waits up to
     * {@link #STOP_JOIN_MILLIS} for it to write what is still pending. The wait lets a
     * caller close the MongoDB client afterwards without cutting off the final write.
     */
    public void stop() {
        System.out.println("關閉日誌執行緒*************************************");
        synchronized (this) {
            isShutdown = true;
        }
        logThread.interrupt();
        if (Thread.currentThread() != logThread) {
            try {
                logThread.join(STOP_JOIN_MILLIS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Queues one JSON document for insertion.
     *
     * @param msg the document as a JSON string
     * @throws InterruptedException if the calling thread is interrupted while queuing
     * @throws IllegalStateException if {@link #stop()} has already been called
     */
    public void log(String msg) throws InterruptedException {
        synchronized (this) {
            if (isShutdown) {
                throw new IllegalStateException("log service stop!!");
            }
            ++reservations;
        }
        boolean queued = false;
        try {
            queue.put(msg);
            queued = true;
        } finally {
            if (!queued) {
                // Give the reservation back, otherwise the writer would wait forever
                // for a message that was never queued.
                synchronized (this) {
                    --reservations;
                }
            }
        }
    }

    /**
     * Writes the buffered messages to MongoDB and empties the buffer. The buffer is
     * emptied even when the write fails. Messages that are not valid JSON are
     * reported and skipped.
     *
     * @param batch the buffered JSON messages; cleared on return
     */
    private void flush(List<String> batch) {
        if (batch.isEmpty()) {
            return;
        }
        try {
            final MongoDatabase database = db;
            final String name = collectionName;
            if (database == null) {
                throw new IllegalStateException("MongoDB database has not been set; dropping "
                        + batch.size() + " log message(s)");
            }
            if (name == null || name.trim().length() == 0) {
                throw new IllegalStateException("collection must not be null, empty or blank; dropping "
                        + batch.size() + " log message(s)");
            }

            List<Document> documents = new ArrayList<Document>(batch.size());
            for (String json : batch) {
                try {
                    documents.add(Document.parse(json));
                } catch (RuntimeException e) {
                    // One malformed message must not cost the rest of the batch.
                    System.err.println("Skipping log message that is not valid JSON: " + e);
                }
            }
            if (!documents.isEmpty()) {
                MongoCollection<Document> collection = database.getCollection(name);
                collection.insertMany(documents);
            }
        } finally {
            batch.clear();
        }
    }

    /**
     * Writer thread: drains the queue into a buffer and flushes it by size or by time.
     */
    private class LogThread extends Thread {
        @Override
        public void run() {
            final List<String> batch = new ArrayList<String>(BATCH_SIZE);
            long deadline = System.nanoTime() + FLUSH_INTERVAL_NANOS;
            while (true) {
                synchronized (MongoDbEnvent.this) {
                    if (isShutdown && reservations == 0) {
                        break;
                    }
                }
                try {
                    // Timed poll so a partial batch is written even if no further
                    // message arrives.
                    long wait = Math.max(deadline - System.nanoTime(), 0L);
                    String msg = queue.poll(wait, TimeUnit.NANOSECONDS);
                    if (msg != null) {
                        synchronized (MongoDbEnvent.this) {
                            --reservations;
                        }
                        batch.add(msg);
                    }
                    if (batch.size() >= BATCH_SIZE || System.nanoTime() - deadline >= 0) {
                        deadline = System.nanoTime() + FLUSH_INTERVAL_NANOS;
                        flush(batch);
                    }
                } catch (InterruptedException e) {
                    // stop() interrupts only to wake this thread; the loop condition
                    // decides whether to exit. Re-setting the flag would make every
                    // following poll fail and prevent the queue from being drained.
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            // Write whatever is still buffered before the thread ends.
            try {
                flush(batch);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
log4j配置檔案內容如下:
log4j.appender.MongoDB=com.*.MongoDbAppender
log4j.appender.MongoDB.databaseName=expmis
log4j.appender.MongoDB.collectionName=log4j
log4j.appender.MongoDB.hostname=192.168.188.110
log4j.appender.MongoDB.port=27017
log4j.appender.MongoDB.userName=mongodb
log4j.appender.MongoDB.password=mongodb
歡迎大家提出意見。