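Java NIO: Introduction and Usage
Preface
Java NIO is the Channel- and Buffer-based input/output API added in JDK 1.4. In my view, NIO has two main advantages:
- Synchronous, non-blocking I/O: when a Selector is used, it keeps polling to check whether the registered events are ready, and non-blocking mode means the current thread does not have to sit idle until all the data has been read before moving on. Combining non-blocking mode with a Selector therefore lets a single thread manage multiple input and output channels (FileChannel cannot be used with a Selector).
- More flexible buffer handling: with ordinary stream IO you cannot move backward and forward through the buffered data, whereas an NIO buffer can be repositioned in both directions. A Channel can also seek to a specific read position in a file, which makes large files much easier to handle. In addition, when obtaining a direct memory mapping (FileChannel.map), you can map just one region of a large file.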
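To make the second point concrete, here is a minimal sketch of both kinds of random access: seeking a FileChannel with position(long) and mapping only a region with FileChannel.map. The class name RegionAccessDemo, its helper methods, and the sample file contents are assumptions made for illustration; they are not part of the project described later.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class RegionAccessDemo {
    /** Creates a small temporary file holding hypothetical sample data. */
    static Path sampleFile() {
        try {
            Path file = Files.createTempFile("nio-region", ".txt");
            file.toFile().deleteOnExit();
            Files.write(file, "ABCDEFGHIJK".getBytes(StandardCharsets.US_ASCII));
            return file;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Seeks the channel to offset, then reads up to length bytes through a ByteBuffer. */
    static String readAt(Path file, long offset, int length) {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            ByteBuffer buf = ByteBuffer.allocate(length);
            ch.position(offset);
            while (buf.hasRemaining() && ch.read(buf) != -1) {
                // keep reading until the buffer is full or the file ends
            }
            buf.flip();
            return StandardCharsets.US_ASCII.decode(buf).toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Maps only the region [offset, offset + length) of the file and copies it out. */
    static String mapRegion(Path file, long offset, int length) {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            MappedByteBuffer mbb = ch.map(FileChannel.MapMode.READ_ONLY, offset, length);
            byte[] bytes = new byte[mbb.remaining()];
            mbb.get(bytes);
            return new String(bytes, StandardCharsets.US_ASCII);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path file = sampleFile();
        System.out.println(readAt(file, 3, 4));
        System.out.println(mapRegion(file, 3, 4));
    }
}
```

Both calls start at byte offset 3 of the sample data, so neither has to read the bytes that come before it.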
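NIO is built around three main structures: Channel (data channel), Buffer, and Selector.
Basic Concepts in NIO
Channel
A Channel is similar to a stream in classic IO (InputStream/OutputStream), except that a Channel can support both reading and writing, whereas an IO stream works in one direction only. Channel implementations fall into two groups, network I/O and file I/O. The main implementations are: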
- FileChannel (file reads and writes)
- SocketChannel (network reads and writes, TCP client)
- ServerSocketChannel (accepts incoming connections, TCP server)
- DatagramChannel (UDP datagrams)
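As a small illustration of the "both directions" point, the sketch below writes to a FileChannel and then reads the same bytes back through the same channel. TwoWayChannelDemo and writeThenRead are hypothetical names, and the temporary file is only there to keep the example self-contained.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class TwoWayChannelDemo {
    /** Writes text through a channel, rewinds the channel, and reads the text back. */
    static String writeThenRead(String text) {
        try {
            Path file = Files.createTempFile("nio-two-way", ".txt");
            file.toFile().deleteOnExit();
            // One channel opened for both reading and writing
            try (FileChannel ch = FileChannel.open(file,
                    StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
                ByteBuffer out = ByteBuffer.wrap(bytes);
                while (out.hasRemaining()) {
                    ch.write(out);
                }
                ch.position(0); // move the channel back to the start of the file
                ByteBuffer in = ByteBuffer.allocate(bytes.length);
                while (in.hasRemaining() && ch.read(in) != -1) {
                    // keep reading until everything written has been read back
                }
                in.flip();
                return StandardCharsets.UTF_8.decode(in).toString();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(writeThenRead("hello nio"));
    }
}
```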
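Buffer
A Buffer has the following main properties:
- mark: a remembered position. After marking the buffer's current position, calling reset() returns to that marked position, which lets you move backward and forward in the buffer (position(int) can also set the position directly).
- position: the index of the next element to be read or written.
- limit: the boundary; reads and writes cannot go past this index.
- capacity: the total size of the buffer.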
public abstract class Buffer {
    // ......
    // Invariants: mark <= position <= limit <= capacity
    private int mark = -1;
    private int position = 0;
    private int limit;
    private int capacity;
    // ......
}
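Together, these four properties give a buffer its two basic modes, write and read.
- Write mode: position is the index where the next element will be written, and limit = capacity, meaning you can write up to the full capacity.
- Read mode: before reading, you normally call flip() to switch from write mode to read mode. Concretely, flip() sets limit = position and then position = 0.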
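The two modes can be traced with a few lines of code. The sketch below records position, limit, and capacity after each step; BufferModesDemo and its methods are names chosen only for this illustration.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

class BufferModesDemo {
    /** Formats the three properties that define the buffer's current mode. */
    static String state(ByteBuffer b) {
        return "position=" + b.position() + " limit=" + b.limit() + " capacity=" + b.capacity();
    }

    /** Walks a buffer through write mode, flip(), a read, and clear(). */
    static List<String> walkThrough() {
        List<String> states = new ArrayList<>();
        ByteBuffer buf = ByteBuffer.allocate(8);
        states.add("allocate -> " + state(buf)); // write mode: limit == capacity

        buf.put((byte) 'A').put((byte) 'B').put((byte) 'C');
        states.add("put x3 -> " + state(buf));   // position advanced by three writes

        buf.flip();
        states.add("flip -> " + state(buf));     // read mode: limit = old position, position = 0

        buf.get();
        states.add("get x1 -> " + state(buf));   // position advanced by one read

        buf.clear();
        states.add("clear -> " + state(buf));    // back to write mode; the bytes are not erased
        return states;
    }

    public static void main(String[] args) {
        for (String s : walkThrough()) {
            System.out.println(s);
        }
    }
}
```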
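Buffer comes in several types. The most commonly used are ByteBuffer, CharBuffer, and MappedByteBuffer; others include DoubleBuffer, FloatBuffer, and so on.
Before using a Buffer you need to allocate it with allocate(int capacity), which creates the backing array and initializes mark, position, limit, and capacity. For file reads and writes, the two components above are enough, because a FileChannel always operates in blocking mode and so cannot be used with a Selector. Here is a simple file-reading example: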
public static void main(String[] args) throws IOException {
    // try-with-resources closes the file and its channel when done
    try (RandomAccessFile file = new RandomAccessFile("C:\\Users\\AAA\\Desktop\\工作\\Nio.txt", "r");
         FileChannel channel = file.getChannel()) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        StringBuilder sb = new StringBuilder();
        while (channel.read(buffer) != -1) { // read the file in a loop
            buffer.flip(); // write -> read
            boolean marked = false;
            while (buffer.hasRemaining()) {
                char ch = (char) buffer.get(); // one byte per char: assumes single-byte (ASCII) content
                if (ch == 'D') {
                    buffer.mark(); // mark the position just after 'D'
                    marked = true;
                }
                sb.append(ch);
            }
            // Using the mark: reset() throws InvalidMarkException if no mark was set
            if (marked) {
                buffer.reset();
                while (buffer.hasRemaining()) {
                    sb.append((char) buffer.get());
                }
            }
            buffer.clear(); // read -> write
        }
        // Original data: ABCDEFGHIJK -> output: ABCDEFGHIJKEFGHIJK
        System.out.println(sb.toString());
    }
}
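The main Buffer methods used are:
- position(int newPosition): sets the buffer's position.
- limit(int newLimit): sets the buffer's limit. Note that if the mark is defined and greater than the new limit, the mark is discarded.
- mark(): marks the current position, i.e. mark = position.
- reset(): resets the position to the previously marked position. mark() and reset() are normally used together.
- clear(): discards the mark (mark = -1), sets position to 0 and limit = capacity, i.e. switches from read mode to write mode. The data itself is not erased.
- flip(): the counterpart of clear(); switches from write mode to read mode by setting limit = position, position = 0, and mark = -1.
- rewind(): discards the mark and sets position to 0 so the buffer can be read again; limit is unchanged.
- hasRemaining(): returns whether any elements remain between position and limit, i.e. whether there is still data to read.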
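The methods above can be exercised without any file at all. The following sketch, using hypothetical names (MarkResetDemo, replayFrom, readTwice, resetWithoutMarkFails), replays part of a buffer with mark()/reset(), rereads a buffer with rewind(), and shows that reset() without a mark throws InvalidMarkException.

```java
import java.nio.ByteBuffer;
import java.nio.InvalidMarkException;
import java.nio.charset.StandardCharsets;

class MarkResetDemo {
    /** Reads the text once, then replays everything after the first occurrence of marker. */
    static String replayFrom(String text, char marker) {
        // wrap() yields a buffer that is already readable: position = 0, limit = length
        ByteBuffer buf = ByteBuffer.wrap(text.getBytes(StandardCharsets.US_ASCII));
        StringBuilder sb = new StringBuilder();
        boolean marked = false;
        while (buf.hasRemaining()) {
            char ch = (char) buf.get();
            sb.append(ch);
            if (ch == marker && !marked) {
                buf.mark(); // remembers the position just after the marker
                marked = true;
            }
        }
        if (marked) {
            buf.reset(); // position = mark
            while (buf.hasRemaining()) {
                sb.append((char) buf.get());
            }
        }
        return sb.toString();
    }

    /** Reads the whole buffer, rewinds it, and reads it again. */
    static String readTwice(String text) {
        ByteBuffer buf = ByteBuffer.wrap(text.getBytes(StandardCharsets.US_ASCII));
        StringBuilder sb = new StringBuilder();
        while (buf.hasRemaining()) {
            sb.append((char) buf.get());
        }
        buf.rewind(); // position = 0, limit unchanged, mark discarded
        while (buf.hasRemaining()) {
            sb.append((char) buf.get());
        }
        return sb.toString();
    }

    /** Returns true if reset() on a buffer with no mark throws InvalidMarkException. */
    static boolean resetWithoutMarkFails() {
        try {
            ByteBuffer.allocate(4).reset();
            return false;
        } catch (InvalidMarkException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(replayFrom("ABCDEFGHIJK", 'D'));
        System.out.println(readTwice("AB"));
        System.out.println(resetWithoutMarkFails());
    }
}
```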
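Selector
A Selector can manage multiple Channels at the same time (by asking each Channel whether it is ready for an I/O operation). The benefit is that a single thread can manage multiple TCP or UDP channels, reducing thread context switching.
Using a Selector
Let's start with an example that shows how a Selector is used: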
/**
 * Server side
 */
public class NioSocketServer {
    // Selector that manages the channels
    private Selector selector;
    // Opens a ServerSocketChannel and initializes it
    public NioSocketServer init(int port) throws IOException {
        // Open a ServerSocketChannel
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        serverChannel.socket().bind(new InetSocketAddress(port));
        // Open the selector
        selector = Selector.open();
        // Register the channel with the selector for SelectionKey.OP_ACCEPT.
        // Selector.select() returns only once that event arrives; until then it blocks.
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        return this;
    }
    public void listen() throws IOException {
        System.out.println("Server started");
        // Poll the selector in a loop
        while (true) {
            // Returns when a registered event arrives; otherwise blocks.
            selector.select();
            // Iterate over the selected keys, i.e. the registered events that are ready
            Iterator<SelectionKey> ite = selector.selectedKeys().iterator();
            while (ite.hasNext()) {
                SelectionKey key = ite.next();
                // Remove the selected key so it is not processed again
                ite.remove();
                // A client is requesting a connection
                if (key.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    // Get the channel for the client connection
                    SocketChannel channel = server.accept();
                    channel.configureBlocking(false);
                    // Send a message to the client
                    channel.write(ByteBuffer.wrap("send message to client".getBytes()));
                    // Once connected, register the client channel for SelectionKey.OP_READ.
                    channel.register(selector, SelectionKey.OP_READ);
                    System.out.println("Client connection request");
                } else if (key.isReadable()) { // data is available to read
                    // Get the channel the client data arrived on
                    SocketChannel channel = (SocketChannel) key.channel();
                    // Create the buffer to read into
                    ByteBuffer buffer = ByteBuffer.allocate(10);
                    int read = channel.read(buffer);
                    if (read == -1) {
                        // The peer closed the connection; closing the channel also cancels its key
                        channel.close();
                        continue;
                    }
                    // Decode only the bytes actually read, not the whole backing array
                    String message = new String(buffer.array(), 0, buffer.position());
                    System.out.println("receive message from client, size:" + buffer.position() + " msg: " + message);
                    // ByteBuffer outbuffer = ByteBuffer.wrap(("server.".concat(message)).getBytes());
                    // channel.write(outbuffer);
                }
            }
        }
    }
    public static void main(String[] args) throws IOException {
        new NioSocketServer().init(9981).listen();
    }
}
/**
 * Client side
 */
public class NioClient {
    // Selector that manages the channels
    private Selector selector;
    public NioClient init(String serverIp, int port) throws IOException {
        // Open a socket channel
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);
        // Open the selector
        selector = Selector.open();
        // Connect to the server. In non-blocking mode, channel.finishConnect()
        // must be called later to actually complete the connection.
        channel.connect(new InetSocketAddress(serverIp, port));
        // Register the channel for SelectionKey.OP_CONNECT
        channel.register(selector, SelectionKey.OP_CONNECT);
        return this;
    }
    public void listen() throws IOException {
        System.out.println("Client started");
        // Poll the selector in a loop
        while (true) {
            // Select the registered I/O events (the first one is SelectionKey.OP_CONNECT)
            selector.select();
            Iterator<SelectionKey> ite = selector.selectedKeys().iterator();
            while (ite.hasNext()) {
                SelectionKey key = ite.next();
                // Remove the selected key so it is not processed again
                ite.remove();
                if (key.isConnectable()) {
                    SocketChannel channel = (SocketChannel) key.channel();
                    // If the connection is still pending, complete it
                    if (channel.isConnectionPending()) {
                        channel.finishConnect();
                    }
                    // The channel is already non-blocking (set in init), so no need to configure it again
                    // Send a message to the server
                    channel.write(ByteBuffer.wrap("send message to server.".getBytes()));
                    // Once connected, register to receive messages from the server
                    channel.register(selector, SelectionKey.OP_READ);
                    System.out.println("Client connected");
                } else if (key.isReadable()) { // data is available to read
                    SocketChannel channel = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(10);
                    int read = channel.read(buffer);
                    if (read == -1) {
                        // The server closed the connection; closing the channel also cancels its key
                        channel.close();
                        continue;
                    }
                    // Decode only the bytes actually read, not the whole backing array
                    String message = new String(buffer.array(), 0, buffer.position());
                    System.out.println("receive message from server, size:" + buffer.position() + " msg: " + message);
                    // ByteBuffer outbuffer = ByteBuffer.wrap(("client.".concat(message)).getBytes());
                    // channel.write(outbuffer);
                }
            }
        }
    }
    public static void main(String[] args) throws IOException {
        new NioClient().init("127.0.0.1", 9981).listen();
    }
}
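- Create a Selector with Selector.open(); by default the instance is returned by the openSelector() method of the SelectorProvider class.
- Put the channel into non-blocking mode (serverChannel.configureBlocking(false);). This is not possible with FileChannel, because FileChannel does not extend SelectableChannel.
- Register the Channel with the Selector (serverChannel.register(selector, SelectionKey.OP_ACCEPT);). Note the second argument, an interest set built from SelectionKey constants: it tells the Selector which events on the Channel you are interested in. The Selector keeps a Set<SelectionKey>, and each SelectionKey holds information about its channel, including its readiness state.
- Call the Selector's select() to block until a registered event occurs on one of the channels. If no event has occurred, it keeps blocking until one does. You can also pass a timeout, or use the non-blocking selectNow() method instead. A blocked selector can be woken with wakeup().
- Handle the events: get the set of SelectionKeys with selector.selectedKeys() and iterate over it. Be sure to remove each key once you have handled it; otherwise it will be processed again next time. SelectionKey provides isAcceptable(), isReadable(), and similar methods to check which events are ready, and cancel() stops listening on that key.
- When you are finished with the Selector you can close it. Closing the Selector invalidates its keys but does not close the registered channels; each channel still has to be closed separately.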
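The same steps can be tried without a network connection. The sketch below is an assumption-laden miniature, not the server above: it uses java.nio.channels.Pipe as the selectable channel, a hypothetical class PipeSelectorDemo, a one-second select timeout chosen arbitrarily, and messages short enough to fit in the pipe's internal buffer.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

class PipeSelectorDemo {
    /** Sends a short message through a Pipe and reads it back via a Selector. */
    static String roundTrip(String message) {
        byte[] bytes = message.getBytes(StandardCharsets.UTF_8);
        try (Selector selector = Selector.open()) {                // step 1: open the selector
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {
                source.configureBlocking(false);                   // step 2: non-blocking mode
                source.register(selector, SelectionKey.OP_READ);   // step 3: register interest

                // The sink stays in blocking mode; fine for a short message
                ByteBuffer out = ByteBuffer.wrap(bytes);
                while (out.hasRemaining()) {
                    sink.write(out);
                }

                ByteBuffer in = ByteBuffer.allocate(bytes.length);
                while (in.hasRemaining()) {
                    if (selector.select(1000) == 0) {              // step 4: wait for readiness
                        break;                                     // timed out, give up
                    }
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {                         // step 5: handle ready keys
                        SelectionKey key = it.next();
                        it.remove();                               // avoid handling the key twice
                        if (key.isReadable()) {
                            ((Pipe.SourceChannel) key.channel()).read(in);
                        }
                    }
                }
                in.flip();
                return StandardCharsets.UTF_8.decode(in).toString();
            }                                                      // step 6: channels closed here,
        } catch (IOException e) {                                  // then the selector itself
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("ping"));
    }
}
```

Using try-with-resources for both the channels and the Selector keeps step 6 from being forgotten, since closing the Selector alone would leave the channels open.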
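SelectionKey
register() returns a SelectionKey object. Looking at the source, a SelectionKey contains:
- interestOps (the set of operations of interest)
- readyOps (the set of operations that are ready)
- channel and selector (linking the key to both)
- attachment (an attached object)
The interest set is made up of the following operations: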
// Read
public static final int OP_READ = 1 << 0;
// Write
public static final int OP_WRITE = 1 << 2;
// Connect
public static final int OP_CONNECT = 1 << 3;
// Accept a connection
public static final int OP_ACCEPT = 1 << 4;
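Because each constant occupies its own bit, several operations can be combined into one interest set with bitwise OR and tested with bitwise AND. A minimal sketch follows; InterestOpsDemo and describe are illustrative names only.

```java
import java.nio.channels.SelectionKey;

class InterestOpsDemo {
    /** Lists the operation names contained in an interest or ready set. */
    static String describe(int ops) {
        StringBuilder sb = new StringBuilder();
        if ((ops & SelectionKey.OP_READ) != 0) sb.append("READ ");
        if ((ops & SelectionKey.OP_WRITE) != 0) sb.append("WRITE ");
        if ((ops & SelectionKey.OP_CONNECT) != 0) sb.append("CONNECT ");
        if ((ops & SelectionKey.OP_ACCEPT) != 0) sb.append("ACCEPT ");
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        int ops = SelectionKey.OP_READ | SelectionKey.OP_WRITE; // interested in both
        System.out.println(ops + " = " + describe(ops));

        ops &= ~SelectionKey.OP_WRITE;                          // drop the write interest
        System.out.println(ops + " = " + describe(ops));
    }
}
```

The same combined value can be passed as the second argument of register(), or to SelectionKey.interestOps(int) to change the interest set later.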
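What the attachment is for: you can attach an object to a SelectionKey (one at a time, though it can be a container holding several), which makes it easy to identify a given channel or carry per-channel state. There are two ways to do it:
// 1. Attach at registration time
SelectionKey key=channel.register(selector,SelectionKey.OP_READ,theObject);
// 2. Attach after registration
selectionKey.attach(theObject); // attach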
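Both ways of attaching can be checked in a few lines. The sketch below again uses a Pipe so that it runs without a network; AttachmentDemo and the "session-1"/"session-2" strings are hypothetical placeholders for whatever per-channel state an application would attach.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

class AttachmentDemo {
    /** Returns: attachment after register, value returned by attach(), attachment after attach(). */
    static String[] demo() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {
                source.configureBlocking(false);
                // 1. Attach at registration time
                SelectionKey key = source.register(selector, SelectionKey.OP_READ, "session-1");
                String first = (String) key.attachment();
                // 2. Attach later; attach() returns the previously attached object
                Object previous = key.attach("session-2");
                String second = (String) key.attachment();
                return new String[] {first, (String) previous, second};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        for (String s : demo()) {
            System.out.println(s);
        }
    }
}
```

Since attach() replaces the earlier object, a key holds only one attachment at any moment.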
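A single thread can use one Selector to handle three Channels, which is the arrangement the original diagram for this section depicted.
A Real-World Example
Here is a feature I built with NIO that extracts a time range from a large file (larger than 10 GB). The file format looks roughly like this: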
[2018-05-12 12:10:00.234] XXXX XXXXXX XXXXXX XXXXXXX XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
[2018-05-12 12:10:00.530] XXXX XXXXXX XXXXXX XXXXXXX XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
[2018-05-12 12:10:01.040] XXXX XXXXXX XXXXXX XXXXXXX XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
[2018-05-12 12:10:01.040] XXXX XXXXXX XXXXXX XXXXXXX XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
[2018-05-12 12:10:01.045] XXXX XXXXXX XXXXXX XXXXXXX XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
......
......
......
[2018-05-12 20:33:02.100] XXXX XXXXXX XXXXXX XXXXXXX XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
[2018-05-12 20:33:02.105] XXXX XXXXXX XXXXXX XXXXXXX XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
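The approach is as follows. First, note that a single direct memory mapping can cover at most about 2 GB (Integer.MAX_VALUE bytes). I chose 1 GB, and scan the file block by block (each block is 1 GB) to determine which block contains the start position. A search algorithm (binary search, for example) then locates the position within that block; take care with line boundaries here. The code is as follows: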
public class Searcher {
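/** Default number of rows to display */
static int defaultRows = SysConfig.getResultShowNum();
/** Time formats (SimpleDateFormat is not thread-safe, so shared use must be guarded) */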
static SimpleDateFormat format1 = new SimpleDateFormat("yyyyMMddHHmmssSSS");
static SimpleDateFormat format2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
static DecimalFormat decimalFormat = new DecimalFormat("#.00");
static {
format2.setTimeZone(TimeZone.getTimeZone("GMT+16"));
}
/**
* Extracts error-log entries, with optional keyword search
*
* @param indexPath index directory
* @param start start time
* @param end end time
* @param queryString query string
* @param reverse false ascending | true descending
* @param save whether to save to a file
* @param savePath save path
* @throws Exception
*/
@SuppressWarnings("resource")
public static List<String> getErrorLogSegment(String indexPath,
String start, String end, String queryString,
boolean reverse, boolean save, String savePath) throws Exception{
List<String> resultList = new ArrayList<String>();
IndexSearcher searcher = null;
IndexReader reader = null;
Analyzer analyzer = new StandardAnalyzer();
try {
reader = DirectoryReader
.open(FSDirectory.open(Paths.get(indexPath)));
searcher = new IndexSearcher(reader);// search tool
// Query configuration
// Multi-condition query
BooleanQuery.Builder builder = new BooleanQuery.Builder();
// Time range
Query query = new TermRangeQuery("date", new BytesRef(
format1.format(format2.parse(start))), new BytesRef(
format1.format(format2.parse(end))), true, true);
builder.add(query, Occur.MUST);
// Keyword query
if (queryString != null && !queryString.equals("")) {
QueryParser parser = new QueryParser("content", analyzer);
Query wordsQuery = parser.parse(queryString);
builder.add(wordsQuery, Occur.MUST);
}
Sort sort = new Sort(
new SortField("date", SortField.Type.LONG, reverse));
int searchNum = save ? Integer.MAX_VALUE : defaultRows; // number of results to fetch
TopDocs results = searcher.search(builder.build(), searchNum, sort);
ScoreDoc[] hits = results.scoreDocs;
//System.out.println("Total hits: " + hits.length);
if (save) {// save the results to a file
// try-with-resources closes (and flushes) the writer even if a write fails
try (FileWriter fwrite = new FileWriter(savePath, false)) {
for (int i = 0; i < hits.length; i++) {
Document doc = searcher.doc(hits[i].doc);
fwrite.write(format2.format(format1.parse(doc.get("date")))
+ " " + doc.get("content") + "\r\n");
}
}
return null;
} else {// return the results for display in the UI
int size = hits.length >= defaultRows ? defaultRows : hits.length;
for (int i = 0; i < size; i++) {
Document doc = searcher.doc(hits[i].doc);
resultList.add(format2.format(format1.parse(doc.get("date")))
+ " " + doc.get("content"));
}
}
} catch (Exception e) {
e.printStackTrace();
throw e;
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return resultList;
}
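/**
* Extracts a log segment, with optional keyword search
* <p>If saving to a file is selected, no results are returned; otherwise the first N results (the configured number) are returned
*
* @param filePath file path
* @param indexPath index directory
* @param start start time
* @param end end time
* @param queryRequest the request to search for
* @param save whether to save
* @param savePath save path
* @throws Exception
*/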
public static List<String> getLogSegment(String filePath, String indexPath, String start,
String end, String queryRequest, boolean save, String savePath) throws Exception {
List<String> resultList = new ArrayList<String>();
File file = new File(filePath);
try{
if (!file.exists()) {
throw new LogAnalysisException("File not found!");
}
if (queryRequest == null || queryRequest.equals("")) {
long startPos = getLogPos(file, "[" + start + "]", true);
startPos = startPos==-1?0:startPos;
long endPos = getLogPos(file, "[" + end + "]", false);
endPos = endPos==-1?file.length():endPos;
//System.out.println("pos:" + startPos + "," + endPos);
if(save){
while(true){
// FileChannel.map cannot map more than Integer.MAX_VALUE bytes (about 2 GB at a time; anything larger has to be read in several passes)
if(endPos - startPos > Integer.MAX_VALUE){
readFileByMBBAndWriter(file, startPos, (long)(startPos + Integer.MAX_VALUE), savePath);
startPos = startPos + (long)Integer.MAX_VALUE;
} else {
readFileByMBBAndWriter(file, startPos, endPos, savePath);
break;
}
}
} else {
resultList = readFileByMappedByteBuffer(file, startPos, endPos);
}
} else {
if(save){
getRequestLogSegment(indexPath, start, end, queryRequest, null, null, SortType.Default, false, save, savePath);
}else{
resultList = getRequestLogSegment(indexPath, start, end, queryRequest, null, null, SortType.Default, false, save, savePath);
}
}
} catch (Exception e) {
throw e;
}
return resultList;
}
/**
* Takes a list of positions and returns the line found at each one
*
* @param file
* @param posList list of positions
* @return
* @throws Exception
*/
@SuppressWarnings("unused")
private static List<String> readFileByMappedByteBuffer(File file, List<Long> posList) throws Exception{
List<String> resultList = new ArrayList<String>();
MappedByteBuffer mbb = null;
FileInputStream fis = null;
FileChannel fChannel = null;
try {
fis = new FileInputStream(file);
fChannel = fis.getChannel();
for (long pos : posList) {
mbb = fChannel.map(FileChannel.MapMode.READ_ONLY, pos, 10*1024);
char ch = (char) mbb.get();
StringBuilder line = new StringBuilder();
while (mbb.hasRemaining()) {
ch = (char) mbb.get();
line.append(ch);
if (ch == '\n') {
resultList.add(line.toString());
line.delete(0, line.length());
break;
}
}
}
} catch (Exception e) {
e.printStackTrace();
throw e;
} finally {
try {
if (fChannel != null)
fChannel.close();
if (fis != null)
fis.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return resultList;
}
/**
* Extracts the data between start and end in the file (read for display in the UI)
*
* @param file
* @param start
* @param end
* @return
* @throws Exception
*/
private static List<String> readFileByMappedByteBuffer(File file, long start,
long end) throws Exception {
List<String> resultList = new ArrayList<String>();
FileInputStream fis = null;
FileChannel fChannel = null;
ByteBuffer buffer = ByteBuffer.allocate(1024*1024);
Charset charset = Charset.forName("UTF-8");// avoids garbled text (a multi-byte character split across two reads can still decode incorrectly)
CharBuffer charBuffer = null;
int rows = 0;
try{
fis = new FileInputStream(file);
fChannel = fis.getChannel();
fChannel.position(start);
StringBuilder line = new StringBuilder();
outer:
while(fChannel.read(buffer) != -1){
buffer.flip();
charBuffer = charset.decode(buffer);
while (charBuffer.hasRemaining()) {
char ch = charBuffer.get();
if(ch != '\n'){
line.append(ch);
} else {
rows++;
line.append(ch);
resultList.add(line.toString());
line.delete(0, line.length());
if(rows >= defaultRows*2)
break outer;
}
}
buffer.clear();
charBuffer.clear();
}
} finally {
// Close the resources on every path; an IOException now propagates to the caller instead of being swallowed
try {
if (fChannel != null)
fChannel.close();
if (fis != null)
fis.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
return resultList;
}
/**
* Extracts the data between start and end in the file (saved to a file)
*
* @param file
* @param start
* @param end
* @param savePath
* @throws Exception
*/
private static void readFileByMBBAndWriter(File file, long start,
long end, String savePath) throws Exception {
MappedByteBuffer mbb = null;
FileInputStream fis = null;
FileChannel finChannel = null;
FileChannel foutChanel = null;
FileOutputStream fout = null;
try {
File saveFile = new File(savePath);
saveFile.getParentFile().mkdirs();
fis = new FileInputStream(file);
finChannel = fis.getChannel(); // get the channel of fis
fout = new FileOutputStream(savePath, true);
foutChanel = fout.getChannel();
// The mapped length is limited; above Integer.MAX_VALUE, map() fails with "Size exceeds Integer.MAX_VALUE"
mbb = finChannel.map(FileChannel.MapMode.READ_ONLY, start, end - start);
//byte[] buffer = new byte[(int) (end-start)];
foutChanel.write(mbb);
//mbb.get(buffer);
//fout.write(buffer);
fout.flush();
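// Reset the buffer's position and limit (this does not unmap the memory)
mbb.clear();
// Workaround for releasing the memory held by mbb
//clean(mbb);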
} catch (Exception e) {
e.printStackTrace();
throw e;
} finally {
try {
if (finChannel != null)
finChannel.close();
if (foutChanel != null)
foutChanel.close();
if (fis != null)
fis.close();
if (fout != null)
fout.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/*public static void clean(final Object buffer) {
AccessController.doPrivileged(new PrivilegedAction() {
@Override
public Object run() {
try{
Method getCleanerMethod = buffer.getClass().getMethod("cleaner", new Class[0]);
getCleanerMethod.setAccessible(true);
Cleaner cleaner = (Cleaner)getCleanerMethod.invoke(buffer, new Object[0]);
cleaner.clean();
}catch (Exception e) {
e.printStackTrace();
}
return null;
}
});
}*/
/**
* Gets the detailed log of a single request from the log file
*
* @param datetime
* @param sessionAndThread
* @param url
* @param filePath
* @return
* @throws Exception
*/
public static List<String> getRequestDetailLog(String datetime, String sessionAndThread, String url, String filePath) throws Exception{
List<String> list = new ArrayList<String>();
File file = new File(filePath);
// Find the position of the given time
long postion = getLogPos(file, datetime, true);
FileInputStream fis = null;
FileChannel finChannel = null;
ByteBuffer buffer = ByteBuffer.allocate(1024);// 1 KB (1024 bytes)
try{
fis = new FileInputStream(file);
finChannel = fis.getChannel();
finChannel.position(postion);
StringBuilder line = new StringBuilder();
outer:
while(finChannel.read(buffer) != -1){
buffer.flip();
while (buffer.hasRemaining()) {
char ch = (char) buffer.get();
if(ch != '\n'){
line.append(ch);
} else {
String tmpline = line.toString();
line.delete(0, line.length());
if(tmpline.contains(sessionAndThread)){
list.add(tmpline);
if(tmpline.contains("After request")){
break outer;
}
}
}
}
buffer.clear();
}
buffer.clear();
} catch (Exception e) {
e.printStackTrace();
throw e;
} finally {
try {
if(finChannel != null){
finChannel.close();
}
if(fis != null){
fis.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
return list;
}
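/**
* Finds the position in the file of the date being searched for
*
* @param file the file
* @param date the date to search for
* @param bigOrSmall when true, pos is before the searched date | when false, pos is after the searched log entry
* @return
* @throws Exception
*/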
private static long getLogPos(File file, String date, boolean bigOrSmall) throws Exception {
String regex = "^\\[\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{3}\\]$"; // the dot before the milliseconds is escaped so it matches a literal '.'
String formatStr = "[yyyy-MM-dd HH:mm:ss.SSS]";
FileInputStream fis = null;
FileChannel fChannel = null;
Pattern pattern = Pattern.compile(regex);
SimpleDateFormat format = new SimpleDateFormat(formatStr);
// Search position, and the size of each direct memory mapping (1 GB by default)
long searchPos = 0L, size = 1073741824L;
MappedByteBuffer mbb = null;
try {
Date start = format.parse(date);
// Size of the read buffer, 1 MB by default
byte[] buffer = null;
fis = new FileInputStream(file);
fChannel = fis.getChannel();
if(fChannel.size() >= 1048576)
buffer = new byte[1048576];
else
buffer = new byte[(int) fChannel.size()];
// Maximum number of blocks that may need to be scanned
int cyclicCount = (int) (fChannel.size() / size + 1);
// Scan the file to find which mapped block contains the start position
outer: for (int i = 0; i < cyclicCount; i++) {
// The last block covers whatever remains of the file
long blockSize = (i == cyclicCount - 1) ? fChannel.size() - i * size : size;
mbb = fChannel.map(FileChannel.MapMode.READ_ONLY, i * size, blockSize);
// Read at most one buffer's worth; a short final block would otherwise throw BufferUnderflowException
int len = Math.min(buffer.length, mbb.remaining());
mbb.get(buffer, 0, len);
StringBuilder line = new StringBuilder();
for (int k = 0; k < len; k++) {
byte b = buffer[k];
line.append((char) b);
if (b == '\n') {
if (line.length() > 25
&& pattern.matcher(line.substring(0, 25))
.matches()) {
if (format.parse(line.substring(0, 25)).getTime() > start
.getTime()) {
break outer;
}
searchPos = i * size;
break;
}
line.delete(0, line.length());
}
}
}
// Upper bound of the search range: one block past searchPos, capped at the end of the file
long highPosOffset = Math.min(size, fChannel.size() - searchPos);
//System.out.println("Range: "+searchPos+","+(searchPos + highPosOffset));
return bisearchForFile(pattern, format, mbb, fChannel, start,
searchPos, searchPos + highPosOffset, bigOrSmall);
} catch (Exception e) {
//e.printStackTrace();
throw e;
} finally {
try {
if (fChannel != null)
fChannel.close();
if (fis != null)
fis.close();
} catch (IOException e) {
e.printStackTrace();
}
}
//return -1;
}
/**
* Binary search for the position where the target time starts
*
* @param pattern
* @param format
* @param mbb
* @param fChannel
* @param start
* @param lowPos
* @param highPos
* @param bigOrSmall
* @return
* @throws ParseException
* @throws IOException
*/
private static long bisearchForFile(Pattern pattern,
SimpleDateFormat format, MappedByteBuffer mbb,
FileChannel fChannel, Date start, long lowPos, long highPos, boolean bigOrSmall)
throws java.text.ParseException, IOException {
if (lowPos <= highPos) {
long midPos = (lowPos + highPos) / 2;
byte[] tmpbuffer = new byte[1024*512];// 512 KB
Date midDate = null;
//mbb = fChannel.map(FileChannel.MapMode.READ_ONLY, midPos,
// (highPos - midPos));
//System.out.println("L:"+lowPos+", H:"+highPos);
if ((highPos - midPos)*2 > 1024*1024) {
mbb = fChannel.map(FileChannel.MapMode.READ_ONLY, midPos, highPos - midPos);
mbb.get(tmpbuffer);
} else { // the range has narrowed to within 1 MB
mbb = fChannel.map(FileChannel.MapMode.READ_ONLY, lowPos, highPos - lowPos);
//System.out.println("lowPos:"+lowPos+", highPos:"+highPos+", midPos:"+midPos);
midPos = lowPos;
tmpbuffer = new byte[(int) (highPos - lowPos)];
mbb.get(tmpbuffer);
StringBuilder line = new StringBuilder();
for (byte b : tmpbuffer) {
line.append((char) b);
if (b == '\n') {
if (line.length() > 25
&& pattern.matcher(line.substring(0, 25))
.matches()) {
midDate = format.parse(line.substring(0, 25));
if (start.getTime() <= midDate.getTime()) {
if(bigOrSmall){// before
return midPos;
} else {// after
return lowPos;
}
}
midPos = lowPos;
}
line.delete(0, line.length());
}
lowPos++;
}
return -1;
}
StringBuilder line = new StringBuilder();
for (byte b : tmpbuffer) {
line.append((char) b);
if (b == '\n') {
if (line.length() > 25
&& pattern.matcher(line.substring(0, 25)).matches()) {
midDate = format.parse(line.substring(0, 25));
break;
}
line.delete(0, line.length());
}
}
if (start.getTime() == midDate.getTime()) {
return midPos;
} else if (start.getTime() > midDate.getTime()) {// upper half
return bisearchForFile(pattern, format, mbb, fChannel, start,
midPos + 1, highPos, bigOrSmall);
} else {// lower half
return bisearchForFile(pattern, format, mbb, fChannel, start,
lowPos, midPos - 1, bigOrSmall);
}
}
return -1;
}
/**
* Finds all records of a given request within a time range and returns the times of those requests
*
* @param indexPath
* @param start
* @param end
* @param queryRequest
* @throws Exception
*/
public static List<String> getAllRequest(String indexPath, String start, String end, String queryRequest) throws Exception {
IndexSearcher searcher = null;
IndexReader reader = null;
Analyzer analyzer = new StandardAnalyzer();
List<String> requestTimes = new ArrayList<String>();
try {
reader = DirectoryReader
.open(FSDirectory.open(Paths.get(indexPath)));
searcher = new IndexSearcher(reader);// search tool
// Query configuration
// Multi-condition query
BooleanQuery.Builder builder = new BooleanQuery.Builder();
// Time range
Query query = new TermRangeQuery("date", new BytesRef(
format1.format(format2.parse(start))), new BytesRef(
format1.format(format2.parse(end))), true, true);
builder.add(query, Occur.MUST);
// Keyword query
if (queryRequest != null && !queryRequest.equals("")) {
QueryParser parser = new QueryParser("content", analyzer);
Query wordsQuery = parser.parse(queryRequest);
builder.add(wordsQuery, Occur.MUST);
}
TopDocs results = searcher.search(builder.build(), Integer.MAX_VALUE);
ScoreDoc[] hits = results.scoreDocs;
//System.out.println("Total hits: " + hits.length);
for (int i = 0; i < hits.length; i++) {
Document doc = searcher.doc(hits[i].doc);
requestTimes.add(format2.format(format1.parse(doc.get("date"))));
}
} catch (Exception e) {
e.printStackTrace();
throw e;
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return requestTimes;
}
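/**
* Extracts request logs, with optional keyword search (shows the first {@link #defaultRows} entries; if saving is selected, the results are saved to the given directory)
* <p>
* The available sort types are:
* <p>
* 1. Default sort: if neither request-duration sort nor request-count sort is selected, results are sorted by time
* <p>
* 2. Request-duration sort: results are sorted by how long each request took
* <p>
* 3. Request-count sort: results are sorted by the number of requests
*
* @param indexPath
* index directory (required)
* @param start
* start time (required)
* @param end
* end time (required)
* @param queryString
* query string (optional)
* @param session
* session (optional)
* @param thread
* thread (optional)
* @param sortType
* sort type
* @param reverse
* false ascending | true descending
* @param save
* whether to save
* @param savePath
* save path
* @return
* @throws Exception
*/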
public static List<String> getRequestLogSegment(String indexPath,
String start, String end, String queryString, String session, String thread,
SortType sortType, boolean reverse, boolean save, String savePath) throws Exception {
List<String> resultList = new ArrayList<String>();
IndexSearcher searcher = null;
IndexReader reader = null;
Analyzer analyzer = new StandardAnalyzer();
try {
reader = DirectoryReader
.open(FSDirectory.open(Paths.get(indexPath)));
searcher = new IndexSearcher(reader);// search tool
// Query configuration
// Multi-condition query
BooleanQuery.Builder builder = new BooleanQuery.Builder();
// Time range
Query query = new TermRangeQuery("date", new BytesRef(
format1.format(format2.parse(start))), new BytesRef(
format1.format(format2.parse(end))), true, true);
builder.add(query, Occur.MUST);
// Keyword query
if (queryString != null && !queryString.equals("")) {
QueryParser parser = new QueryParser("content", analyzer);
Query wordsQuery = parser.parse(queryString);
builder.add(wordsQuery, Occur.MUST);
}
if (thread != null && !thread.equals("")){
Term term = new Term("thread", thread);
Query query2 = new TermQuery(term);
builder.add(query2, Occur.MUST);
}
if (session != null && !session.equals("")){
Term term = new Term("session", session);
Query query2 = new TermQuery(term);
builder.add(query2, Occur.MUST);
}
// Sorting: in Lucene 5, a field used for sorting must be indexed as a NumericDocValuesField
if (sortType == SortType.Default) {
resultList = defaultRequestLogSegment(searcher, builder, reverse, save,
savePath);
} else if (sortType == SortType.RequestTime) {
resultList = requestTimeLogSegment(searcher, builder, !reverse, save,
savePath);
} else if (sortType == SortType.RequestFrequency) {
resultList = requestFrequencyLogSegment(searcher, builder, !reverse, save,
savePath);
}
} catch (Exception e) {
e.printStackTrace();
throw e;
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return resultList;
}
/**
* Default query; by default only the top {@link #defaultRows} results are shown
*
* @param searcher
* @param builder
* @param reverse
* @param save
* @param savePath
* @return
* @throws Exception
*/
private static List<String> defaultRequestLogSegment(
IndexSearcher searcher, Builder builder, boolean reverse,
boolean save, String savePath) throws Exception {
Sort sort = new Sort(
new SortField("date", SortField.Type.LONG, reverse));
int searchNum = save ? Integer.MAX_VALUE : defaultRows; // number of results to fetch
TopDocs results = searcher.search(builder.build(), searchNum, sort);
ScoreDoc[] hits = results.scoreDocs;
//System.out.println("Total hits: " + hits.length);
if (save) {// save the results to a file
//FileWriter fwrite = new FileWriter(savePath, false);
List<String[]> data = new ArrayList<String[]>();
for (int i = 0; i < hits.length; i++) {
Document doc = searcher.doc(hits[i].doc);
// {log level}%{session:thread}%{logger name}%{url}
data.add(new String[]{
"[" + format2.format(format1.parse(doc.get("date"))) + "]",
doc.get("content").split(" ")[0],
doc.get("content").split(" ")[1],
doc.get("content").split(" ")[2],
doc.get("content").split(" ")[3]});
}
ExcelExport.exportInfo(data, new String[]{"Time", "Log level", "session:thread", "Logger name", "URL"}, new File(savePath));
return null;
} else {// return the results for display in the UI
List<String> list = new ArrayList<String>();
int size = hits.length >= defaultRows ? defaultRows : hits.length;
for (int i = 0; i < size; i++) {
Document doc = searcher.doc(hits[i].doc);
list.add("[" + format2.format(format1.parse(doc.get("date"))).replace(" ", "_") + "] "
+ doc.get("content") + "\n");
}
return list;
}
}
/**
* Query sorted by request duration; by default only the top {@link #defaultRows} results are shown
*
* @param searcher
* @param builder
* @param reverse
* false ascending | true descending
* @param save
* @param savePath
* @return
* @throws Exception
*/
private static List<String> requestTimeLogSegment(IndexSearcher searcher,
Builder builder, boolean reverse, boolean save, String savePath)
throws Exception {
Sort sort = new Sort(new SortField("interval", SortField.Type.FLOAT,
reverse));
int searchNum = save ? Integer.MAX_VALUE : defaultRows; // number of results to fetch
TopDocs results = searcher.search(builder.build(), searchNum, sort);
ScoreDoc[] hits = results.scoreDocs;
//System.out.println("Total hits: " + hits.length);
if (save) {// save to a file
List<String[]> data = new ArrayList<String[]>();
for (int i = 0; i < hits.length; i++) {
Document doc = searcher.doc(hits[i].doc);
data.add(new String[]{ "[" + format2.format(format1.parse(doc.get("date"))) + "]",
doc.get("content").split(" ")[1],
doc.get("content").split(" ")[2],
doc.get("content").split(" ")[3],
doc.get("interval")});
}
ExcelExport.exportInfo(data, new String[]{"Time", "session:thread", "Logger name", "URL", "Execution time (ms)"}, new File(savePath));
return null;
} else {// return the results for display in the UI
List<String> list = new ArrayList<String>();
int size = hits.length >= defaultRows ? defaultRows : hits.length;
for (int i = 0; i < size; i++) {
Document doc = searcher.doc(hits[i].doc);
// {log level}%{session:thread}%{logger name}%{url}%{execution time}
list.add(format2.format(format1.parse(doc.get("date"))).replace(" ", "_") + " "
+ doc.get("content").split(" ")[1] + " "
+ doc.get("content").split(" ")[2] + " "
+ doc.get("content").split(" ")[3] + " "
+ doc.get("interval") + "\n");
}
return list;
}
}
/**
* Query sorted by request count; by default only the top {@link #defaultRows} results are shown
*
* @param searcher
* @param builder
* @param reverse
* false ascending | true descending
* @param save
* @param savePath
* @return
* @throws Exception
*/
private static List<String> requestFrequencyLogSegment(
IndexSearcher searcher, Builder builder, boolean reverse,
boolean save, String savePath) throws Exception {
Sort sort = new Sort(new SortField("date", SortField.Type.LONG));
TopDocs results = searcher.search(builder.build(), Integer.MAX_VALUE,
sort);
ScoreDoc[] hits = results.scoreDocs;
//System.out.println("Total hits: " + hits.length);
HashMap<String, RequestModel> requestMap = new HashMap<String, RequestModel>();
// Aggregate the request count and total time per URL
for (ScoreDoc scoreDoc : hits) {
Document doc = searcher.doc(scoreDoc.doc);
String url = doc.get("content").split(" ")[3];
if (requestMap.containsKey(url)) {
requestMap.put(url, new Searcher().new RequestModel(requestMap.get(url).times + 1,
requestMap.get(url).sumTime + Float.parseFloat(doc.get("interval"))));
} else {
requestMap.put(url, new Searcher().new RequestModel(1, Float.parseFloat(doc.get("interval"))));
}
}
// Sort
List<Map.Entry<String, RequestModel>> retList = new ArrayList<Map.Entry<String, RequestModel>>();
retList.addAll(requestMap.entrySet());
MapValueComparator mvComparator = new Searcher().new MapValueComparator(reverse);
// Collections.sort uses a merge sort (TimSort in current JDKs), which is a stable sort
Collections.sort(retList, mvComparator);
if (save) {// save to a file
List<String[]> data = new ArrayList<String[]>();
for (Entry<String, RequestModel> entry : retList) {
data.add(new String[]{entry.getKey(), String.valueOf(entry.getValue().times),
String.valueOf((int)(entry.getValue().sumTime/entry.getValue().times))});
}
ExcelExport.exportInfo(data, new String[]{"URL", "Request count", "Average execution time (ms)"}, new File(savePath));
return null;
} else {// return the results for display in the UI
List<String> list = new ArrayList<String>();
int showCount = 0;
for (Entry<String, RequestModel> entry : retList) {
showCount++;
list.add(entry.getKey() + " " + entry.getValue().times + " " + (int)(entry.getValue().sumTime/entry.getValue().times));
if (showCount >= defaultRows) {
break;
}
}
return list;
}
}
/**
* Compares Map entries by their values
*
* @author wuxinhui
*
*/
class MapValueComparator implements Comparator<Map.Entry<String, RequestModel>> {
public MapValueComparator() {
}
public MapValueComparator(boolean reverse) {
this.reverse = reverse;
}
boolean reverse = false;// false ascending | true descending
public void setReverse(boolean reverse) {
this.reverse = reverse;
}
@Override
public int compare(Map.Entry<String, RequestModel> map1,
Map.Entry<String, RequestModel> map2) {
if (reverse) {
return map2.getValue().times - map1.getValue().times;
} else {
return map1.getValue().times - map2.getValue().times;
}
}
}
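/**
* Shell sort (diminishing increment sort): recommended when the amount of data is small. Shell sort is an optimization of straight insertion sort.
*
* @param data
*/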
@SuppressWarnings("unused")
private static int[] shellSort(int data[]) {
int j = 0, temp = 0;
for (int increment = data.length / 2; increment > 0; increment = increment / 2) {
//System.out.println("increment:" + increment);
for (int i = increment; i < data.length; i++) {
temp = data[i];
for (j = i - increment; j >= 0; j = j - increment) {
if (temp < data[j]) {
data[j + increment] = data[j];
} else {
break;
}
}
data[j + increment] = temp;
}
for (int i = 0; i < data.length; i++) {
System.out.print(data[i] + " ");
}
}
return data;
}
class RequestModel{
int times;// number of requests
float sumTime;// total request time, used to compute the average
public RequestModel(int times, float sumTime) {
super();
this.times = times;
this.sumTime = sumTime;
}
public int getTimes() {
return times;
}
public float getSumTime() {
return sumTime;
}
}
}
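The core idea of getLogPos and bisearchForFile, a binary search over byte offsets that snaps to line boundaries, is easier to follow in isolation. The sketch below is a simplified stand-in rather than the code above: it assumes every line starts with a 25-character [yyyy-MM-dd HH:mm:ss.SSS] timestamp, that lines are sorted by time, that the searched region fits in a single mapping (under Integer.MAX_VALUE bytes), and it compares timestamps as strings, which works only because this fixed format sorts lexicographically. TimestampBisectDemo, its methods, and the sample lines are all hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class TimestampBisectDemo {
    static final int STAMP_LEN = 25; // length of "[yyyy-MM-dd HH:mm:ss.SSS]"

    /** Returns the timestamp prefix of the line that starts at lineStart. */
    static String stampAt(ByteBuffer buf, int lineStart) {
        int len = Math.min(STAMP_LEN, buf.limit() - lineStart);
        byte[] bytes = new byte[len];
        for (int i = 0; i < len; i++) {
            bytes[i] = buf.get(lineStart + i); // absolute get: position is not moved
        }
        return new String(bytes, StandardCharsets.US_ASCII);
    }

    /** Offset of the first line whose timestamp is >= target, or buf.limit() if there is none. */
    static int firstLineAtOrAfter(ByteBuffer buf, String target) {
        int lo = 0;           // always a line start
        int hi = buf.limit(); // always a line start or the end of the data
        while (lo < hi) {
            int mid = lo + (hi - lo) / 2;
            int lineStart = mid; // back up to the start of the line containing mid
            while (lineStart > lo && buf.get(lineStart - 1) != '\n') {
                lineStart--;
            }
            int next = mid;      // advance to the start of the following line
            while (next < hi && buf.get(next) != '\n') {
                next++;
            }
            if (next < hi) {
                next++;          // step past the newline
            }
            if (stampAt(buf, lineStart).compareTo(target) < 0) {
                lo = next;       // this line is too early; the answer lies after it
            } else {
                hi = lineStart;  // this line qualifies; the answer is here or earlier
            }
        }
        return lo;
    }

    /** Maps the whole file (assumed to be smaller than 2 GB) and runs the search. */
    static long find(Path file, String target) {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            ByteBuffer buf = ch.map(FileChannel.MapMode.READ_ONLY, 0, ch.size());
            return firstLineAtOrAfter(buf, target);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Writes four hypothetical 28-byte log lines to a temporary file. */
    static Path sampleLog() {
        String log = "[2018-05-12 12:10:00.234] a\n"
                + "[2018-05-12 12:10:00.530] b\n"
                + "[2018-05-12 12:10:01.040] c\n"
                + "[2018-05-12 12:10:01.045] d\n";
        try {
            Path file = Files.createTempFile("nio-bisect", ".log");
            file.toFile().deleteOnExit();
            Files.write(file, log.getBytes(StandardCharsets.US_ASCII));
            return file;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(find(sampleLog(), "[2018-05-12 12:10:01.000]"));
    }
}
```

For a file larger than one mapping, the same search would be run inside the block chosen by the block-by-block scan, with the block's start offset added to the result.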