阻塞佇列的簡易實現
阿新 • • 發佈:2020-09-16
/**
* 業務場景:
* 檢視資料庫辦件的推送情況
* 推送限時為60秒
* 如果60秒內被推送出去,資料庫狀態為1
* 沒被推送過為0
* 超過60秒失效為-1
* (推送過的不考慮,只實現未推送的,和超時的
* 以及伺服器重啟後從新加入佇列的處理)
*=======隨便瞎寫,歡迎指點======
*/
入口 類:
package com...delayQueue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; importorg.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; @Controller public class DelayQueueController { private static final String SUCCESS = "seccess"; private staticfinal String FAILUER = "failure"; @Autowired private DelayQueueSave pDelayQueueSave; /** * http://localhost:8080/submit?Number=5 * 接收5個請求,資料庫生成5條辦件 * @param Number * @return */ @RequestMapping("/submit") @ResponseBody public String saveUser(@RequestParam("Number")intNumber){ pDelayQueueSave.insert(Number); return SUCCESS; } }
資料庫處理 類:
package com...delayQueue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import wfc.service.database.RecordSet; import wfc.service.database.SQL; import javax.annotation.PostConstruct; import java.sql.Timestamp; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Random; @Service public class DelayQueueSave { @Autowired private DelayQueueService delayQueueService; public final static String UNPAY = "0";//未推送 public final static String PAYED = "1";//已推送 public final static String EXPIRED = "-1";//已過期 public void insert(int Number) { for(int i=0;i<Number;i++) { Long time = new Date().getTime(); Long times = time+60000; // Random random = new Random(); // long expireTime = random.nextInt(20)+5;//超時時長,單位秒5~25 long expireTime = 60;//超時時長,單位秒60 //資料庫的 儲存和過期時間 Timestamp saveTime=new Timestamp(time); Timestamp saveTimes=new Timestamp(times); String ST_FJ_ID = time+"_"+expireTime+"_S";//業務編號 String insertSql = "insert into DANGAN_FJ(ST_FJ_ID,DANGAN_TYPE,TIME,TIMES) values (?,?,?,?)"; Object[] insertObject = new Object[] {ST_FJ_ID,UNPAY,saveTime,saveTimes}; SQL.execute(insertSql,insertObject); System.out.println("SQL新增標誌...."); /*進行延時處理*/ delayQueueService.Delay(ST_FJ_ID,expireTime); } } //服務重啟的處理 @PostConstruct public void initDelay(){ System.out.println("開始掃描未推送的辦件...."); String type = "0"; String insertSql = "select * from DANGAN_FJ where DANGAN_TYPE = ? "; Object[] insertObject = new Object[] {type}; RecordSet rs = SQL.execute(insertSql,insertObject); List <String>list0 = new ArrayList<String>(); List <String>list1 = new ArrayList<String>(); while (rs.next()){ String ST_FJ_ID = rs.getString("ST_FJ_ID"); Timestamp saveTime = rs.getTimestamp("TIME"); Timestamp saveTime2 = rs.getTimestamp("TIMES"); list0.add(ST_FJ_ID); Long time = new Date().getTime(); Long times = saveTime2.getTime(); //未過期的從新放入佇列,並計算超時時間 if(times-time>0){ list1.add(ST_FJ_ID); Long expireTime = (times-time)/1000; System.out.println("未過期未推送辦件:"+ST_FJ_ID); System.out.println("剩餘過期時間:"+expireTime); //放入延遲佇列 delayQueueService.Delay(ST_FJ_ID,expireTime); }else{ //已過期的直接改 String updateSql = "update dangan_fj set dangan_type =-1 where st_fj_id = ? "; Object[] updateObject = new Object[]{ST_FJ_ID}; RecordSet updateRs = SQL.execute(updateSql, updateObject); int number = updateRs.TOTAL_RECORD_COUNT; //影響行數 System.out.println("已過期的直接改-->辦件編號為:"+ST_FJ_ID+" 辦件過期更改影響行數: " + number); } } System.out.println(String.format("總共有 "+list0.size()+" 條未推送辦件.....")); System.out.println(String.format("總共有 "+list1.size()+" 條未過期未推送辦件.....")); } }
佇列處理 類:
package com...delayQueue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.concurrent.DelayQueue; @Service public class DelayQueueService { @Autowired private ProcessToId processToId; /*負責儲存限時推送辦件的佇列*/ private static DelayQueue<DelayQueues<String>> delay = new DelayQueue<DelayQueues<String>>(); /*任務放入延遲佇列*/ public void Delay(String ST_FJ_ID,Long expireTime){ DelayQueues<String> delayQueues = new DelayQueues<String>(expireTime,ST_FJ_ID); delay.put(delayQueues); System.out.println("[辦件超時時長:"+expireTime+"秒]被推入本地檢查佇列,辦件編號:" +ST_FJ_ID); } private class TaskSend implements Runnable{ private ProcessToId processToId; public TaskSend(ProcessToId processToId){ super(); this.processToId = processToId; } @Override public void run() { System.out.println("啟動 處理未推送資料執行緒......"); while (!Thread.currentThread().isInterrupted()){ try { DelayQueues<String> delayQueues = delay.take(); if(delayQueues!=null){ processToId.doProcess(delayQueues.getData()); } } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("關閉 處理未推送資料執行緒......"); } } /*處理到期辦件的執行緒*/ private Thread thread; @PostConstruct public void init(){ thread = new Thread(new TaskSend(processToId)); thread.start(); } @PreDestroy public void close(){ thread.interrupt(); } }
佇列實現類:
package com...delayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; public class DelayQueues<T> implements Delayed { /*到期時刻 */ private long activeTime; /*業務資料,泛型*/ private T data; public long getActiveTime() { return activeTime; } public T getData() { return data; } public DelayQueues(Long activeTime, T data){ super(); this.activeTime = activeTime*1000+System.currentTimeMillis(); this.data = data; } /** * 返回元素到啟用時刻的剩餘時長 */ public long getDelay(TimeUnit unit) { long d = unit.convert(this.activeTime - System.currentTimeMillis(),unit); return d; } /**按剩餘時間排序*/ public int compareTo(Delayed o) { long d = (getDelay(TimeUnit.MILLISECONDS) -o.getDelay(TimeUnit.MILLISECONDS)); if (d==0){ return 0; }else{ if (d<0){ return -1; }else{ return 1; } } } }
改資料庫 操作:
package com...delayQueue; import org.springframework.stereotype.Service; import wfc.service.database.RecordSet; import wfc.service.database.SQL; @Service public class ProcessToId { public void doProcess(String ST_FJ_ID){ String insertSql = "select * from DANGAN_FJ where ST_FJ_ID = ? "; Object[] insertObject = new Object[] {ST_FJ_ID}; RecordSet rs = SQL.execute(insertSql,insertObject); String type=""; while (rs.next()){ type = rs.getString("DANGAN_TYPE"); } if(type.equals(DelayQueueSave.UNPAY)){ System.out.println("辦件【"+ST_FJ_ID+"】已過期,需要更改為過期辦件!"); String updateSql = "update dangan_fj set dangan_type =-1 where st_fj_id = ? "; Object[] updateObject = new Object[]{ST_FJ_ID}; RecordSet updateRs = SQL.execute(updateSql, updateObject); int number = updateRs.TOTAL_RECORD_COUNT; //影響行數 System.out.println("辦件過期更改影響行數: " + number+" 辦件編號為:"+ST_FJ_ID); } } }