通過佇列實現非同步批量處理
阿新 • • 發佈:2020-10-14
特別提示:本人部落格部分有參考網路其他部落格,但均是本人親手編寫過並驗證通過。如發現部落格有錯誤,請及時提出以免誤導其他人,謝謝!歡迎轉載,但記得標明文章出處:http://www.cnblogs.com/mao2080/
1、問題描述
最近有一個非同步寫日誌的需求,為了提高效率採用了非同步批量插入的方式,大致思路是:有日誌產生時存入指定佇列,一個執行緒從佇列中批量讀取固定數量的日誌,同時設定一個超時時間,避免了長時間未產生足夠的日誌從而不存庫的情況。下面的demo模擬了基本流程。
2、操作方法
1、引入pom檔案
<dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>28.1-jre</version> <scope>compile</scope> </dependency>
2、程式碼片段
package com; import com.google.common.collect.Queues; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; public class T { /** * 批量存入數量 */ private static int NUM_ELEMENTS = 10; /** * 等待超時時間(單位:秒) */ private static int TIME_OUT = 5; public static void main(String[] args) { final BlockingQueue<Long> queue = new LinkedBlockingQueue<Long>();
//模擬產生日誌 new Thread(new Runnable(){ public void run() { long num=1L; while(true) { try { queue.put(num); num++; long time = (long) (Math.random()*1000); Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } } }}).start(); while (true) { try { List<Long> list = new ArrayList<Long>(NUM_ELEMENTS); Queues.drain(queue, list, NUM_ELEMENTS, TIME_OUT, TimeUnit.SECONDS); System.out.println(list+", size:"+list.size());
//執行存庫動作 } catch (InterruptedException e) { e.printStackTrace(); } } } }
3、執行效果
console日誌,從結果上看和預期結果一致,即有滿足指定數量10條的情況,也有滿足超時(不夠10條的情況)
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10], size:10 [11, 12, 13, 14, 15, 16, 17, 18], size:8 [19, 20, 21, 22, 23, 24, 25, 26, 27, 28], size:10 [29, 30, 31, 32, 33, 34, 35, 36, 37], size:9 [38, 39, 40, 41, 42, 43, 44, 45, 46, 47], size:10 [48, 49, 50, 51, 52, 53, 54, 55, 56, 57], size:10 [58, 59, 60, 61, 62, 63, 64, 65, 66, 67], size:10 [68, 69, 70, 71, 72, 73, 74, 75], size:8 [76, 77, 78, 79, 80, 81, 82, 83, 84], size:9 [85, 86, 87, 88, 89, 90, 91, 92, 93, 94], size:10 [95, 96, 97, 98, 99, 100, 101, 102, 103], size:9 [104, 105, 106, 107, 108, 109, 110], size:7