如何構建“高效能”“大小無限”(磁碟)佇列?
假設場景:
1. 針對一個高併發的應用,你是否會選擇列印訪問日誌?
2. 針對分散式的應用,你是否會選擇將所有日誌列印到日誌中心?
解決方案:
1. 如果如果你選擇為了效能,不列印日誌,那無可厚非。但是你得考慮清楚,出問題的時候是否能夠做到快速排查?
2. 你覺得日誌分佈在各臺機器上很方便,那不用日誌中心也行!
如果,你還是會選擇列印大量的訪問日誌,如果你還是會選擇列印日誌到日誌中心,那麼本文對你有用!
如果自己實現一個日誌中心,不說很難吧,也還是要費很大力氣的,比如效能,比如容量大小!
所以,本文選擇阿里雲的 loghub 作為日誌中心,收集所有日誌!
loghub 常規操作:
在提出本文主題之前,咱們要看看loghub自己的方式,以及存在的問題!
在官方接入文件裡,就建議咱們使用 logProducer 接入。
其實 logProducer 已經做了太多的優化,比如當日志包資料達到一定數量,才統一進行傳送,非同步傳送等等!
至於為什麼還會存在本篇文章,則是由於這些優化還不夠,比如 這些日誌傳送仍然會影響業務效能,仍然會受到記憶體限制,仍然會搶佔大量cpu。。。
好吧,接入方式:
1. 引入maven依賴:
<dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>aliyun-log-logback-appender</artifactId> <version>0.1.13</version> </dependency>
2. logback中新增appender:
<appender name="LOGHUB-APPENDER" class="appender:com.aliyun.openservices.log.logback.LoghubAppender"> <endpoint>${loghub.endpoint}</endpoint> <accessKeyId>${loghub.accessKeyId}</accessKeyId> <accessKey>${loghub.accessKey}</accessKey> <projectName>${loghub.projectName}</projectName> <logstore>test-logstore</logstore> <topic>${loghub.topic}</topic> <packageTimeoutInMS>1500</packageTimeoutInMS> <logsCountPerPackage>4096</logsCountPerPackage> <!-- 4718592=4M, 3145728=3M, 2097152=2M --> <logsBytesPerPackage>3145728</logsBytesPerPackage> <!-- 17179869184=2G(溢位丟棄) , 104857600=12.5M, 2147483647=2G, 536870912=512M--> <memPoolSizeInByte>536870912</memPoolSizeInByte> <retryTimes>1</retryTimes> <maxIOThreadSizeInPool>6</maxIOThreadSizeInPool> <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> <level>INFO</level> </filter> </appender> <root level="${logging.level}"> <appender-ref ref="STDOUT"/> <appender-ref ref="LOGHUB-APPENDER" /> </root>
3. 在程式碼中進行日誌列印:
private static Logger logger = LoggerFactory.getLogger(MyClass.class); logger.warn("give me five: {}", name);
看似高效接入,存在的問題:
1. loghub日誌的傳送是非同步的沒錯,但是當傳送網路很慢時,將會出現大量記憶體堆積;
2. 堆積也不怕,如上配置,當堆積記憶體達到一定限度時,就不會再大了。他是怎麼辦到的?其實就是通過一個鎖,將後續所有請求全部阻塞了,這想想都覺得可怕;
3. 網路慢我們可以多開幾個傳送執行緒嘛,是的,這樣能在一定程度上緩解發送問題,但是基本也無補,另外,日誌傳送執行緒開多之後,執行緒的排程將會更可怕,而這只是一個可有可無的功能而已啊;
針對以上問題,我們能做什麼?
1. 去除不必要的日誌列印,這不是廢話嘛,能這麼幹早幹了!
2. 在網路慢的時候,減少日誌列印;這有點牽強,不過可以試試!
3. 直接使用非同步執行緒進行日誌接收和傳送,從根本上解決問題!
4. 如果使用非同步執行緒進行傳送,那麼當日志大量堆積怎麼辦?
5. 使用本地檔案儲存需要進行傳送的日誌,解決大量日誌堆積問題,待網路暢通後,快速傳送!
考慮到使用非同步執行緒傳送日誌、使用本地磁碟儲存大量日誌堆積,問題應該基本都解決了!
但是具體怎麼做呢?
如何非同步?
如何儲存磁碟?
這些都是很現實的問題!
如果看到這裡,覺得很low的同學,基本可以撤了!
下面我們來看看具體實施方案:
1. 如何非同步?
能想像到的,基本就是使用一個佇列來接收日誌寫請求,然後,開另外的消費執行緒進行消費即可!
但是,這樣會有什麼問題?因為外部請求訪問進來,都是併發的,這個佇列得執行緒安全吧!用 synchronized ? 用阻塞佇列?
總之,看起來都會有一個並行轉序列的問題,這會給應用併發能力帶去打擊的!
所以,我們得減輕這鎖的作用。我們可以使用多個佇列來解決這個問題,類似於分段鎖!如果併發能力不夠,則增加鎖數量即可!
說起來還是很抽象吧,現成的程式碼擼去吧!
1. 覆蓋原來的 logProducer 的 appender, 使用自己實現的appender, 主要就是解決非同步問題:
<appender name="LOGHUB-APPENDER" class="com.test.AsyncLoghubAppender"> <endpoint>${loghub.endpoint}</endpoint> <accessKeyId>${loghub.accessKeyId}</accessKeyId> <accessKey>${loghub.accessKey}</accessKey> <projectName>${loghub.projectName}</projectName> <logstore>apollo-alarm</logstore> <topic>${loghub.topic}</topic> <packageTimeoutInMS>1500</packageTimeoutInMS> <logsCountPerPackage>4096</logsCountPerPackage> <!-- 4718592=4M, 3145728=3M, 2097152=2M --> <logsBytesPerPackage>3145728</logsBytesPerPackage> <!-- 17179869184=2G(溢位丟棄) , 104857600=12.5M, 2147483647=2G, 536870912=512M--> <memPoolSizeInByte>536870912</memPoolSizeInByte> <retryTimes>1</retryTimes> <maxIOThreadSizeInPool>6</maxIOThreadSizeInPool> <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> <level>INFO</level> </filter> </appender>
2. 接下來就是核心的非同步實現: AsyncLoghubAppender
import ch.qos.logback.classic.spi.IThrowableProxy; import ch.qos.logback.classic.spi.LoggingEvent; import ch.qos.logback.classic.spi.StackTraceElementProxy; import ch.qos.logback.classic.spi.ThrowableProxyUtil; import ch.qos.logback.core.CoreConstants; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.util.IOUtils; import com.aliyun.openservices.log.common.LogItem; import com.aliyun.openservices.log.logback.LoghubAppender; import com.aliyun.openservices.log.logback.LoghubAppenderCallback; import com.test.biz.cache.LocalDiskEnhancedQueueManager; import com.test.biz.cache.LocalDiskEnhancedQueueManagerFactory; import com.test.model.LoghubItemsWrapper; import com.taobao.notify.utils.threadpool.NamedThreadFactory; import org.joda.time.DateTime; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** * 非同步寫loghub appender, 解決框架的appender 無法承受高併發寫的問題 * */ public class AsyncLoghubAppender<E> extends LoghubAppender<E> { /** * put 執行緒,從業務執行緒接收訊息過來 */ private ExecutorService puterExecutor; /** * 佇列搬運執行緒執行器 */ private ExecutorService takerExecutor; /** * mapdb 操作腳手架 */ private LocalDiskEnhancedQueueManager localDiskEnhancedQueueManager; /** * 日誌訊息傳球手 */ private List<LinkedBlockingQueue<LoghubItemsWrapper>> distributeLogItemPoster; // puter 的執行緒數,與cpu核數保持一致 private final int puterThreadNum = 4; // taker 的執行緒數,可以稍微少點 private final int takerThreadNum = 1; @Override public void start() { super.start(); // 開啟單個put 執行緒 doStart(); } private void doStart() { initMapDbQueue(); initPosterQueue(); startPutterThread(); startTakerThread(); } /** * 初始化 mapdb 資料庫 */ private void initMapDbQueue() { localDiskEnhancedQueueManager = LocalDiskEnhancedQueueManagerFactory.newMapDbQueue(); } /** * 初始化訊息傳球手資料 */ private void initPosterQueue() { distributeLogItemPoster = new ArrayList<>(); for(int i = 0; i < puterThreadNum; i++) { distributeLogItemPoster.add(new LinkedBlockingQueue<>(10000000)); } } /** * 開啟 putter 執行緒組,此執行緒組不應慢於業務執行緒太多,否則導致記憶體溢位 */ private void startPutterThread() { puterExecutor = Executors.newFixedThreadPool(puterThreadNum, new NamedThreadFactory("Async-LoghubItemPoster")); for(int i = 0; i < puterThreadNum; i++) { puterExecutor.execute(new InnerQueuePuterThread(distributeLogItemPoster.get(i))); } } /** * 初始化取數執行緒組,此執行緒組可以執行慢 */ private void startTakerThread() { takerExecutor = Executors.newFixedThreadPool(takerThreadNum, new NamedThreadFactory("Async-LoghubAppender")); for(int i = 0; i < takerThreadNum; i++) { takerExecutor.execute(new InnerQueueTakerThread()); } } @Override public void stop() { super.stop(); localDiskEnhancedQueueManager.close(); } // copy from parent @Override public void append(E eventObject) { try { appendEvent(eventObject); } catch (Exception e) { addError("Failed to append event.", e); } } /** * 優雅停機 */ public void shutdown() { puterExecutor.shutdown(); try { puterExecutor.awaitTermination(60, TimeUnit.SECONDS); } catch (InterruptedException e) { addError("【日誌appender】loghub shutdown interupt", e); Thread.currentThread().interrupt(); } } // modify from parent private void appendEvent(E eventObject) { //init Event Object if (!(eventObject instanceof LoggingEvent)) { return; } LoggingEvent event = (LoggingEvent) eventObject; List<LogItem> logItems = new ArrayList<>(); LogItem item = new LogItem(); logItems.add(item); item.SetTime((int) (event.getTimeStamp() / 1000)); DateTime dateTime = new DateTime(event.getTimeStamp()); item.PushBack("time", dateTime.toString(formatter)); item.PushBack("level", event.getLevel().toString()); item.PushBack("thread", event.getThreadName()); StackTraceElement[] caller = event.getCallerData(); if (caller != null && caller.length > 0) { item.PushBack("location", caller[0].toString()); } String message = event.getFormattedMessage(); item.PushBack("message", message); IThrowableProxy iThrowableProxy = event.getThrowableProxy(); if (iThrowableProxy != null) { String throwable = getExceptionInfo(iThrowableProxy); throwable += fullDump(event.getThrowableProxy().getStackTraceElementProxyArray()); item.PushBack("throwable", throwable); } if (this.encoder != null) { // 框架也未處理好該問題,暫時忽略 // item.PushBack("log", new String(this.encoder.encode(eventObject))); } LoghubItemsWrapper itemWrapper = new LoghubItemsWrapper(); itemWrapper.setLogItemList(logItems); putItemToPoster(itemWrapper); } /** * 將佇列放入 poster 中 * * @param itemsWrapper 日誌資訊 */ private void putItemToPoster(LoghubItemsWrapper itemsWrapper) { try { LinkedBlockingQueue<LoghubItemsWrapper> selectedQueue = getLoadBalancedQueue(); selectedQueue.put(itemsWrapper); } catch (InterruptedException e) { addError("【日誌appender】放入佇列中斷"); Thread.currentThread().interrupt(); } } /** * 選擇一個佇列進行日誌放入 * * @return 佇列容器 */ private LinkedBlockingQueue<LoghubItemsWrapper> getLoadBalancedQueue() { long selectQueueIndex = System.nanoTime() % distributeLogItemPoster.size(); return distributeLogItemPoster.get((int) selectQueueIndex); } // copy from parent private String fullDump(StackTraceElementProxy[] stackTraceElementProxyArray) { StringBuilder builder = new StringBuilder(); for (StackTraceElementProxy step : stackTraceElementProxyArray) { builder.append(CoreConstants.LINE_SEPARATOR); String string = step.toString(); builder.append(CoreConstants.TAB).append(string); ThrowableProxyUtil.subjoinPackagingData(builder, step); } return builder.toString(); } // copy from parent private String getExceptionInfo(IThrowableProxy iThrowableProxy) { String s = iThrowableProxy.getClassName(); String message = iThrowableProxy.getMessage(); return (message != null) ? (s + ": " + message) : s; } class InnerQueuePuterThread implements Runnable { private LinkedBlockingQueue<LoghubItemsWrapper> queue; public InnerQueuePuterThread(LinkedBlockingQueue<LoghubItemsWrapper> queue) { this.queue = queue; } @Override public void run() { // put the item to mapdb while (!Thread.interrupted()) { LoghubItemsWrapper itemsWrapper = null; try { itemsWrapper = queue.take(); } catch (InterruptedException e) { addError("【日誌appender】poster佇列中斷"); Thread.currentThread().interrupt(); } if(itemsWrapper != null) { flushLogItemToMapDb(itemsWrapper); } } } /** * 將記憶體佇列儲存到 mapdb 中, 由消費執行緒獲取 * * @param itemsWrapper 日誌資訊 */ private void flushLogItemToMapDb(LoghubItemsWrapper itemsWrapper) { byte[] itemBytes = JSONObject.toJSONBytes(itemsWrapper.getLogItemList()); localDiskEnhancedQueueManager.push(itemBytes); } } /** * for debug, profiler for mapdb */ private static final AtomicLong takerCounter = new AtomicLong(0); class InnerQueueTakerThread implements Runnable { @Override public void run() { long startTime = System.currentTimeMillis(); while (!Thread.interrupted()) { //item = fullLogQueues.take(); // take items without lock try { while (localDiskEnhancedQueueManager.isEmpty()) { Thread.sleep(100L); } } catch (InterruptedException e) { addError("【日誌appender】中斷異常", e); Thread.currentThread().interrupt(); break; } byte[] itemBytes = localDiskEnhancedQueueManager.pollFirstItem(); try { if(itemBytes != null && itemBytes != localDiskEnhancedQueueManager.EMPTY_VALUE_BYTE_ARRAY) { List<LogItem> itemWrapper = JSONObject.parseArray( new String(itemBytes, IOUtils.UTF8), LogItem.class); if(itemWrapper != null) { doSend(itemWrapper); } } else { // 如果資料不為空,且一直在迴圈,說明存在異常,暫時處理為重置佇列,但應從根本上解決問題 localDiskEnhancedQueueManager.reset(); } } catch (Exception e) { addError("【日誌appender】json解析異常", e); } // for debug test, todo: 上線時去除該程式碼 if(takerCounter.incrementAndGet() % 1000 == 0) { System.out.println(LocalDateTime.now() + " - " + Thread.currentThread().getName() + ": per 1000 items took time: " + (System.currentTimeMillis() - startTime) + " ms."); startTime = System.currentTimeMillis(); } } } /** * 傳送資料邏輯,主要為 loghub * * @param item logItem */ private void doSend(List<LogItem> item) { AsyncLoghubAppender.this.doSendToLoghub(item); } } /** * 傳送資料邏輯,loghub * * @param item logItem */ private void doSendToLoghub(List<LogItem> item) { producer.send(projectConfig.projectName, logstore, topic, source, item, new LoghubAppenderCallback<>(AsyncLoghubAppender.this, projectConfig.projectName, logstore, topic, source, item)); } }
如上實現,簡單說明下:
1. 開啟n個消費執行緒的 distributeLogItemPoster 阻塞佇列,用於接收業務執行緒發來的日誌請求;
2. 開啟n個消費執行緒, 將從業務執行緒接收過來的請求佇列,放入磁碟佇列中,從而避免可能記憶體溢位;
3. 開啟m個taker執行緒,從磁碟佇列中取出資料,進行loghub的傳送任務;
如上,我們已經完全將日誌的傳送任務轉移到非同步來處理了!
但是,這樣真的就ok了嗎?磁碟佇列是什麼?可靠嗎?效能如何?執行緒安全嗎?
2. 如何儲存磁碟佇列?
好吧。咱們這裡使用的是 mapdb 來實現的磁碟佇列, mapdb 的 github star數超3k, 應該還是不錯了!
但是,它更多的是用來做磁碟快取,佇列並沒有過多關注,不管怎麼樣,我們還是可以選擇的!
mapdb專案地址: https://github.com/jankotek/mapdb
其實mapdb有幾個現成的佇列可用: IndexTreeList, TreeSet. 但是我們仔細看下他的官宣,看到這些資料結構只支援少量資料時的儲存,在資料量巨大之後,效能完全無法保證,甚至 poll 一個元素需要1s+ 。
所以,還得拋棄其佇列實現,只是自己實現一個了,其 HashTree 是個不錯的選擇, 使用 HashTree 來實現佇列,唯一的難點在於,如何進行元素迭代;(大家不仿先自行思考下)
下面我們來看下我的一個實現方式:
import com.test.biz.cache.LocalDiskEnhancedQueueManager; import com.taobao.notify.utils.threadpool.NamedThreadFactory; import org.mapdb.BTreeMap; import org.mapdb.DB; import org.mapdb.DBMaker; import org.mapdb.Serializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.NavigableSet; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** * MapDb 實現的記憶體佇列工具類 * */ public class LocalDiskEnhancedQueueManagerMapDbImpl implements LocalDiskEnhancedQueueManager { private static final Logger logger = LoggerFactory.getLogger(LocalDiskEnhancedQueueManagerMapDbImpl.class); /** * 預設儲存檔案 */ private final String DEFAULT_DB_FILE = "/opt/mapdb/logappender.db"; /** * 佇列名 */ private final String LOG_ITEM_LIST_TABLE = "hub_log_appender"; private final String LOG_ITEM_TREE_SET_TABLE = "hub_log_appender_tree_set"; private final String LOG_ITEM_HASH_MAP_TABLE = "hub_log_appender_hash_map"; private final String LOG_ITEM_BTREE_TABLE = "hub_log_appender_btree"; private final String QUEUE_OFFSET_HOLDER_BTREE_TABLE = "queue_offset_holder_btree_table"; /** * db 例項 */ private final DB mapDb; // private IndexTreeList<byte[]> indexTreeListQueue; /** * 假裝是個佇列 */ private NavigableSet<byte[]> treeSetQueue; private BTreeMap<byte[], Byte> bTreeQueue; private ConcurrentMap<Long, byte[]> concurrentMapQueue; /** * 佇列偏移量持有器, 對於小容量的節點使用 btree 處理很好 */ private BTreeMap<String, Long> queueOffsetDiskHolder; /** * 讀佇列偏移器, jvm 執行時使用該值, 該值被定時重新整理到 mapdb 中 * * 會有部分資料重複情況 * */ private AtomicLong readerOfQueueOffsetJvmHolder; /** * 寫佇列偏移器, jvm 執行時使用該值, 該值被定時重新整理到 mapdb 中 * * 會有部分資料重複情況 */ private AtomicLong writerOfQueueOffsetJvmHolder; private final String readerOffsetCacheKeyName = "loghub_appender_queue_key_read_offset"; private final String writerOffsetCacheKeyName = "loghub_appender_queue_key_write_offset"; /** * mapdb 構造方法,給出佇列持有者 * */ public LocalDiskEnhancedQueueManagerMapDbImpl() { mapDb = DBMaker.fileDB(getDbFilePath()) .checksumHeaderBypass() .closeOnJvmShutdown() .fileChannelEnable() .fileMmapEnableIfSupported() // 嘗試修復刪除元素後磁碟檔案大小不變化的bug .cleanerHackEnable() .concurrencyScale(128) .make(); initQueueOffsetHolder(); initQueueOwner(); initCleanUselessSpaceJob(); } /** * 初始化佇列偏移器 */ private void initQueueOffsetHolder() { queueOffsetDiskHolder = mapDb.treeMap(QUEUE_OFFSET_HOLDER_BTREE_TABLE, Serializer.STRING, Serializer.LONG) .createOrOpen(); initQueueReaderOffset(); initQueueWriterOffset(); } /** * 初始化讀偏移資料 */ private void initQueueReaderOffset() { Long readerQueueOffsetFromDisk = queueOffsetDiskHolder.get(readerOffsetCacheKeyName); if(readerQueueOffsetFromDisk == null) { readerOfQueueOffsetJvmHolder = new AtomicLong(1); } else { readerOfQueueOffsetJvmHolder = new AtomicLong(readerQueueOffsetFromDisk); } } /** * 初始化寫偏移資料 */ private void initQueueWriterOffset() { Long writerQueueOffsetFromDisk = queueOffsetDiskHolder.get(writerOffsetCacheKeyName); if(writerQueueOffsetFromDisk == null) { writerOfQueueOffsetJvmHolder = new AtomicLong(1); } else { writerOfQueueOffsetJvmHolder = new AtomicLong(writerQueueOffsetFromDisk); } } /** * 刷入最新的讀偏移 */ private void flushQueueReaderOffset() { queueOffsetDiskHolder.put(readerOffsetCacheKeyName, readerOfQueueOffsetJvmHolder.get()); } /** * 刷入最新的讀偏移 */ private void flushQueueWriterOffset() { queueOffsetDiskHolder.put(writerOffsetCacheKeyName, writerOfQueueOffsetJvmHolder.get()); } /** * 初始化佇列容器 */ private void initQueueOwner() { // indexTreeListQueue = db.indexTreeList(LOG_ITEM_LIST_TABLE, Serializer.BYTE_ARRAY).createOrOpen(); // bTreeQueue = mapDb.treeMap(LOG_ITEM_BTREE_TABLE, // Serializer.BYTE_ARRAY, Serializer.BYTE) // .counterEnable() // .valuesOutsideNodesEnable() // .createOrOpen(); // treeSetQueue = mapDb.treeSet(LOG_ITEM_TREE_SET_TABLE, Serializer.BYTE_ARRAY) // .createOrOpen(); concurrentMapQueue = mapDb.hashMap(LOG_ITEM_HASH_MAP_TABLE, Serializer.LONG, Serializer.BYTE_ARRAY) .counterEnable() // 當處理能力很差時,就將該日誌列印丟掉 .expireMaxSize(100 * 10000 * 10000L) // 3小時後還沒消費就過期了 .expireAfterCreate(3L, TimeUnit.HOURS) .expireAfterGet() .createOrOpen(); } /** * 清理無用空間,如磁碟檔案等等 */ private void initCleanUselessSpaceJob() { ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Async-MapDbSpaceCleaner")); // 每過10分鐘清理一次無用空間,看情況調整 scheduledExecutorService.scheduleAtFixedRate(() -> { mapDb.getStore().compact(); }, 0L, 10L, TimeUnit.MINUTES); // 每過10s刷入一次讀寫偏移,允許重複和丟失 scheduledExecutorService.scheduleAtFixedRate(() -> { flushQueueWriterOffset(); flushQueueReaderOffset(); }, 30L, 10L, TimeUnit.SECONDS); } /** * 獲取檔案儲存位置,考慮後續擴充套件被子類覆蓋 * * @return db檔案地址 */ protected String getDbFilePath() { return DEFAULT_DB_FILE; } /** * 獲取下一個佇列讀編號 (確保準確性可承受,效能可承受) * * @return 佇列編號 */ private long getNextReaderId() { return readerOfQueueOffsetJvmHolder.incrementAndGet(); } /** * 獲取下一個佇列寫編號 (確保準確性可承受,效能可承受) * * @return 佇列編號 */ private long getNextWriterId() { return writerOfQueueOffsetJvmHolder.incrementAndGet(); } @Override public boolean push(byte[] itemBytes) { // return indexTreeListQueue.add(itemBytes); // bTreeQueue.put(itemBytes, (byte)1 ); // treeSetQueue.add(itemBytes); concurrentMapQueue.put(getNextWriterId(), itemBytes); return true; } @Override public byte[] pollFirstItem() { // 使用時不得使用修改元素方法 // return indexTreeListQueue.remove(index); // Map.Entry<byte[], Byte> entry = bTreeQueue.pollFirstEntry(); // return treeSetQueue.pollFirst(); return concurrentMapQueue.remove(getNextReaderId()); } @Override public boolean isEmpty() { // 佇列為空,不一定代表就沒有可供讀取的元素了,因為 counter 可能落後於併發寫操作了 // 佇列不為空,不一定代表就一定有可供讀取的元素,因為 counter 可能落後於併發 remove 操作了 // 當讀指標等於寫指標時,則代表所有元素已被讀取完成,應該是比較準確的空判定標準 return concurrentMapQueue.isEmpty() || readerOfQueueOffsetJvmHolder.get() == writerOfQueueOffsetJvmHolder.get(); } @Override public void close() { flushQueueWriterOffset(); flushQueueWriterOffset(); mapDb.close(); } @Override public void reset() { concurrentMapQueue.clear(); // 同步兩個值,非準確的 readerOfQueueOffsetJvmHolder.set(writerOfQueueOffsetJvmHolder.get()); logger.error("【mapdb快取】讀寫指標衝突,強制重置指標,請注意排查併發問題. reader:{}, writer:{}", readerOfQueueOffsetJvmHolder.get(), writerOfQueueOffsetJvmHolder.get()); } }
如上,就是使用 mapdb的hashMap 實現了磁碟佇列功能,主要思路如下:
1. 使用一個long的自增資料作為 hashMap 的key,將佇列存入value中;
2. 使用另一人 long 的自增指標做為讀key, 依次讀取資料;
3. 讀寫指標都定期刷入磁碟,以防出異常crash時無法恢復;
4. 當實在出現了未預料的bug時,允許直接丟棄衝突日誌,從一個新的讀取點開始新的工作;
最後,再加一個工廠類,生成mapdb佇列例項: LocalDiskEnhancedQueueManagerFactory
import com.test.biz.cache.impl.LocalDiskEnhancedQueueManagerMapDbImpl; /** * 本地磁碟佇列等例項工廠類 * */ public class LocalDiskEnhancedQueueManagerFactory { /** * 生產一個mapDb實現的佇列例項 * * @return mapdb 佇列例項 */ public static LocalDiskEnhancedQueueManager newMapDbQueue() { return new LocalDiskEnhancedQueueManagerMapDbImpl(); } /** * 生產一個使用 ehcache 實現的佇列例項 * * @return ehcache 佇列例項 */ public static LocalDiskEnhancedQueueManager newEhcacheQueue() { // 有興趣的同學可以實現下 return null; } /** * 生產一個使用 fqueue 實現的佇列例項 * * @return fqueue 佇列例項 */ public static LocalDiskEnhancedQueueManager newFQueueQueue() { // 有興趣的同學可以實現下, 不過不太建議 return null; } /** * 生產一個使用 自己直接寫磁碟檔案 實現的佇列例項 * * @return file 佇列例項 */ public static LocalDiskEnhancedQueueManager newOwnFileQueue() { // 有興趣的同學可以挑戰下 return null; } }
這樣,我們就實現了一個既能滿足高併發場景下的日誌列印需求了吧。業務執行緒優先,日誌執行緒非同步化、可丟棄、cpu佔用少、記憶體不限制。
老話: 優化之路,道阻且