1. 程式人生 > 實用技巧 >阻塞佇列的簡易實現

阻塞佇列的簡易實現

/**
* 業務場景:
* 檢視資料庫辦件的推送情況
* 推送限時為60秒
* 如果60秒內被推送出去,資料庫狀態為1
* 沒被推送過為0
* 超過60秒失效為-1
* (推送過的不考慮,只實現未推送的和超時的處理,
* 以及伺服器重啟後重新加入佇列的處理)
*=======隨便瞎寫,歡迎指點======
*/

入口 類:
package com...delayQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

/**
 * HTTP entry point of the demo: accepts a request asking for a number of
 * cases to be created and delegates the work to {@link DelayQueueSave}.
 */
@Controller
public class DelayQueueController {

    // Response body returned by saveUser.
    // NOTE(review): "seccess" looks like a misspelling of "success"; kept as-is
    // because clients may already match on the literal.
    private static final String SUCCESS = "seccess";

    // Not referenced anywhere in this class.
    private static final String FAILUER = "failure";

    @Autowired
    private DelayQueueSave pDelayQueueSave;

    /**
     * Handles {@code /submit}, e.g. {@code http://localhost:8080/submit?Number=5}.
     * Passes the requested count to {@link DelayQueueSave#insert(int)}, which
     * inserts that many case rows.
     *
     * @param Number how many cases to create (request parameter "Number")
     * @return the {@code SUCCESS} literal
     */
    @RequestMapping("/submit")
    @ResponseBody
    public String saveUser(@RequestParam("Number") int Number) {
        pDelayQueueSave.insert(Number);
        return SUCCESS;
    }
}

資料庫處理 類:

package com...delayQueue;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import wfc.service.database.RecordSet;
import wfc.service.database.SQL;
import javax.annotation.PostConstruct;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Random;

/**
 * Database side of the push-timeout demo: inserts new cases into
 * {@code DANGAN_FJ}, registers them with the delay queue, and on startup
 * re-registers (or expires) the cases that were still un-pushed when the
 * server went down.
 */
@Service
public class DelayQueueSave {

    @Autowired
    private DelayQueueService delayQueueService;

    public final static String UNPAY = "0";//not pushed yet
    public final static String PAYED = "1";//pushed
    public final static String EXPIRED = "-1";//expired

    /** Push time limit, in seconds. */
    private static final long EXPIRE_SECONDS = 60;

    /** Last millisecond value handed out by {@link #nextIdMillis(long)}. */
    private long lastIdMillis;

    /**
     * Inserts {@code Number} un-pushed cases and schedules each of them for
     * an expiry check after {@link #EXPIRE_SECONDS} seconds.
     *
     * @param Number how many cases to create; values &lt;= 0 create nothing
     */
    public void insert(int Number) {

        for (int i = 0; i < Number; i++) {
            long now = System.currentTimeMillis();
            long expireTime = EXPIRE_SECONDS;

            // Creation time and deadline stored with the row; the deadline is
            // derived from expireTime so the two can never disagree.
            Timestamp saveTime = new Timestamp(now);
            Timestamp saveTimes = new Timestamp(now + expireTime * 1000);

            // Business id. The millisecond part is made strictly increasing so
            // that several cases created within the same millisecond (the
            // normal situation inside this loop) do not share one id.
            String ST_FJ_ID = nextIdMillis(now) + "_" + expireTime + "_S";

            String insertSql = "insert into DANGAN_FJ(ST_FJ_ID,DANGAN_TYPE,TIME,TIMES) values (?,?,?,?)";
            Object[] insertObject = new Object[] {ST_FJ_ID, UNPAY, saveTime, saveTimes};
            SQL.execute(insertSql, insertObject);
            System.out.println("SQL新增標誌....");

            // Hand the case to the delay queue for the timeout check.
            delayQueueService.Delay(ST_FJ_ID, expireTime);
        }

    }

    /**
     * Returns a millisecond value that is at least {@code now} and strictly
     * greater than every value returned before by this instance.
     * Uniqueness is only guaranteed within this instance's lifetime.
     */
    private synchronized long nextIdMillis(long now) {
        lastIdMillis = Math.max(now, lastIdMillis + 1);
        return lastIdMillis;
    }

    /**
     * Restart handling: scans all un-pushed cases. Cases whose deadline
     * ({@code TIMES}) is still in the future are put back into the delay
     * queue with their remaining time; the others are marked expired directly.
     */
    @PostConstruct
    public void initDelay() {

        System.out.println("開始掃描未推送的辦件....");
        String selectSql = "select * from DANGAN_FJ where DANGAN_TYPE = ? ";
        Object[] selectArgs = new Object[] {UNPAY};
        RecordSet rs = SQL.execute(selectSql, selectArgs);

        int unpushedCount = 0;  // every un-pushed case found
        int pendingCount = 0;   // un-pushed cases that have not expired yet
        while (rs.next()) {
            String ST_FJ_ID = rs.getString("ST_FJ_ID");
            // NOTE(review): assumes TIMES is never NULL for un-pushed rows — confirm.
            Timestamp deadline = rs.getTimestamp("TIMES");
            unpushedCount++;

            long remainingMillis = deadline.getTime() - System.currentTimeMillis();
            if (remainingMillis > 0) {
                // Not expired: re-queue with the remaining time. Round up to
                // whole seconds so the queue never fires before the stored
                // deadline (truncation turned e.g. 500 ms into 0 s).
                pendingCount++;
                long expireTime = (remainingMillis + 999) / 1000;
                System.out.println("未過期未推送辦件:" + ST_FJ_ID);
                System.out.println("剩餘過期時間:" + expireTime);
                delayQueueService.Delay(ST_FJ_ID, expireTime);
            } else {
                // Already past the deadline: mark as expired right away.
                String updateSql = "update dangan_fj set dangan_type =-1 where st_fj_id = ? ";
                Object[] updateObject = new Object[] {ST_FJ_ID};
                RecordSet updateRs = SQL.execute(updateSql, updateObject);
                int number = updateRs.TOTAL_RECORD_COUNT;
                // number is printed as the affected-row count
                System.out.println("已過期的直接改-->辦件編號為:" + ST_FJ_ID + "   辦件過期更改影響行數:  " + number);
            }
        }
        System.out.println("總共有 " + unpushedCount + " 條未推送辦件.....");
        System.out.println("總共有 " + pendingCount + " 條未過期未推送辦件.....");

    }


}

佇列處理 類:

package com...delayQueue;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.DelayQueue;

/**
 * Owns the in-memory delay queue of cases waiting for their push deadline
 * and the single worker thread that processes cases once they expire.
 */
@Service
public class DelayQueueService {

    @Autowired
    private ProcessToId processToId;

    /* Queue holding the cases whose push deadline is still pending. */
    private static final DelayQueue<DelayQueues<String>> delay
            = new DelayQueue<DelayQueues<String>>();


    /**
     * Puts a case into the delay queue.
     *
     * @param ST_FJ_ID   business id of the case
     * @param expireTime time until expiry, in seconds
     */
    public void Delay(String ST_FJ_ID, Long expireTime) {
        DelayQueues<String> delayQueues = new DelayQueues<String>(expireTime, ST_FJ_ID);
        delay.put(delayQueues);
        System.out.println("[辦件超時時長:" + expireTime + "秒]被推入本地檢查佇列,辦件編號:" + ST_FJ_ID);
    }

    /**
     * Worker loop: blocks on the queue and hands every expired case id to
     * {@link ProcessToId#doProcess(String)} until the thread is interrupted.
     */
    private static final class TaskSend implements Runnable {

        private final ProcessToId processToId;

        TaskSend(ProcessToId processToId) {
            this.processToId = processToId;
        }

        @Override
        public void run() {

            System.out.println("啟動 處理未推送資料執行緒......");
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // take() blocks until an element has expired; it never returns null.
                    DelayQueues<String> expired = delay.take();
                    processToId.doProcess(expired.getData());
                } catch (InterruptedException e) {
                    // take() clears the interrupt flag when it throws. Restore it,
                    // otherwise the loop condition stays true and close() could
                    // never stop this thread.
                    Thread.currentThread().interrupt();
                } catch (RuntimeException e) {
                    // A failure on one case must not kill the worker, or no later
                    // case would ever be processed.
                    e.printStackTrace();
                }
            }
            System.out.println("關閉 處理未推送資料執行緒......");
        }
    }

    /* Thread that processes expired cases. */
    private Thread thread;

    /** Starts the worker thread once the bean has been wired. */
    @PostConstruct
    public void init() {
        thread = new Thread(new TaskSend(processToId));
        thread.start();
    }

    /** Asks the worker thread to stop when the bean is destroyed. */
    @PreDestroy
    public void close() {
        if (thread != null) {
            thread.interrupt();
        }
    }


}

佇列實現類:

package com...delayQueue;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * Element of a {@link java.util.concurrent.DelayQueue}: a payload together
 * with the absolute time (epoch milliseconds) at which it becomes available.
 *
 * @param <T> type of the business payload
 */
public class DelayQueues<T> implements Delayed {

    /* Absolute expiry instant, in epoch milliseconds. */
    private final long activeTime;
    /* Business payload. */
    private final T data;

    public long getActiveTime() {
        return activeTime;
    }

    public T getData() {
        return data;
    }


    /**
     * @param activeTime delay from now until expiry, in seconds (must not be null)
     * @param data       business payload
     */
    public DelayQueues(Long activeTime, T data) {
        super();
        this.activeTime = activeTime * 1000 + System.currentTimeMillis();
        this.data = data;
    }


    /**
     * Returns the time remaining until the expiry instant, expressed in
     * {@code unit}; zero or negative once the element has expired.
     */
    @Override
    public long getDelay(TimeUnit unit) {
        // The difference is in milliseconds, so that must be the source unit.
        // Converting from `unit` to `unit` returned the raw millisecond count
        // for every unit, which makes DelayQueue (it asks for nanoseconds)
        // wake up far too often instead of sleeping until the deadline.
        return unit.convert(this.activeTime - System.currentTimeMillis(),
                TimeUnit.MILLISECONDS);
    }

    /** Orders elements by remaining time, soonest first. */
    @Override
    public int compareTo(Delayed o) {
        if (o instanceof DelayQueues) {
            // Same type: compare the fixed deadlines directly instead of two
            // clock readings taken at slightly different moments.
            return Long.compare(this.activeTime, ((DelayQueues<?>) o).activeTime);
        }
        return Long.compare(getDelay(TimeUnit.MILLISECONDS),
                o.getDelay(TimeUnit.MILLISECONDS));
    }


}

改資料庫 操作:

package com...delayQueue;


import org.springframework.stereotype.Service;
import wfc.service.database.RecordSet;
import wfc.service.database.SQL;

@Service
public class ProcessToId {


    public void doProcess(String ST_FJ_ID){

        String insertSql = "select * from DANGAN_FJ where ST_FJ_ID = ? ";
        Object[] insertObject = new Object[] {ST_FJ_ID};
        RecordSet rs = SQL.execute(insertSql,insertObject);

        String type="";
        while (rs.next()){
            type = rs.getString("DANGAN_TYPE");
        }
        if(type.equals(DelayQueueSave.UNPAY)){

            System.out.println("辦件【"+ST_FJ_ID+"】已過期,需要更改為過期辦件!");

            String updateSql = "update dangan_fj set dangan_type =-1 where st_fj_id = ? ";
            Object[] updateObject = new Object[]{ST_FJ_ID};
            RecordSet updateRs = SQL.execute(updateSql, updateObject);
            int number = updateRs.TOTAL_RECORD_COUNT;
            //影響行數
            System.out.println("辦件過期更改影響行數:  " + number+"   辦件編號為:"+ST_FJ_ID);
        }

    }





}