1. 程式人生 > >java如何實現一個Future

java如何實現一個Future

warnings 全局 tle 標識 寫到 技術分享 控制 interrupt 程序

技術分享圖片

實現Futrue接口

public class MsgFuture<V> implements java.util.concurrent.Future<V> {
    ...
    ...
}    

  

Future的主要特性為Future.get()、

  get()
 get(long timeout, TimeUnit unit)

主要思路如下:
構造MsgFuture時,設置開始時間,這裏是sendTime;設置timeout,默認get()方法的超時時間,我們的程序不可能會無限等待
默認的get()對應的值域是result,默認為一個NULL對象,標識沒有返回數據

result的值需要其他線程在做完任務後將值寫到Future對象中,這裏暴露了一個方法setResult(object)
    /**
     * 設置結果值result,喚醒condition {@link #get(long, TimeUnit)}
     * @param result
     */
    public synchronized void setResult(Object result) {
        reentrantLock.lock();
        try {
            this.result = result;
            condition.signalAll();
        }finally {
            reentrantLock.unlock();
        }

    }

  使用ReentrantLock來進行數據可見性控制

condition.signalAll()可以喚醒condition.await的阻塞wait

至於其他線程如何調用到setResult(object)方法,可以使用ConcurrentHashMap,key為msgId,值為MsgFuture對象,設置成一個全局的,或兩個線程都可訪問,其他線程根據msgId獲取到MsgFuture,然後調用setResult(object)方法

    /**
     * 獲取結果,如果到達timeout還未得到結果,則會拋出TimeoutException
     * @param timeout
     * @param unit
     * @return
     * @throws InterruptedException
     * @throws TimeoutException
     */
    @SuppressWarnings("all")
    public V get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        long left = getLeftTime(timeout, unit);                      //根據timeout配置獲取剩余的世界
        if(left < 0){
            //已經沒有剩余時間
            if(isDone()){                                            //如果已經完成,直接放回結果
                return (V)this.result;
            }else{
                //timeout
                throw new TimeoutException("返回超時,後續的響應將會被丟棄abort");
            }
        }else{

            reentrantLock.lock();                                    //同步
            try {
                //獲取鎖後先判斷是否已經完成,防止無意義的await
                if(isDone()){                                        //先判斷是否已經完成 
                    return (V)this.result;                           //直接返回  
                }
                logger.debug("await "+left+" ms");
                condition.await(getLeftTime(timeout, unit), TimeUnit.MILLISECONDS);       //沒有返回,阻塞等待,如果condition被喚醒,也會提前退出
            }finally {
                reentrantLock.unlock();
            }
            if(isDone()){                                            //被喚醒或超時時間已到,嘗試判斷是否完成
                return (V)this.result;                               //返回
            }

            throw new TimeoutException("未獲取到結果");                //超時  
        }
    }

  

    public boolean isDone() {
        return this.result != NULL;
    }

  



全部代碼
public class MsgFuture<V> implements java.util.concurrent.Future<V> {

    private final static Logger logger = LoggerFactory.getLogger(MsgFuture.class);

    /**
     * 全局的空對象,如果Future獲取到值了,那麽一定不是NULL
     */
    private final static Object NULL = new Object();
    /**
     * 主鎖
     */
    private final ReentrantLock reentrantLock = new ReentrantLock();

    /**
     * 條件,利用它的condition.await(left, TimeUnit.MILLISECONDS)和notifyAll方法來實現阻塞、喚醒
     */
    private final Condition condition = reentrantLock.newCondition();

    private int timeout;

    private volatile Object result = NULL;

    private long sendTime;

    public MsgFuture(int timeout, long sendTime) {
        this.timeout = timeout;
        this.sendTime = sendTime;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    public boolean isCancelled() {
        return false;
    }

    public boolean isDone() {
        return this.result != NULL;
    }

    /**
     * 獲取future結果
     * @return
     * @throws InterruptedException
     */
    public V get() throws InterruptedException {
        logger.debug("sendTime:{}",sendTime);
        try {
            return get(timeout, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            logger.error("獲取future結果異常", e);
        }
        return null;
    }

    /**
     * 獲取結果,如果到達timeout還未得到結果,則會拋出TimeoutException
     * @param timeout
     * @param unit
     * @return
     * @throws InterruptedException
     * @throws TimeoutException
     */
    @SuppressWarnings("all")
    public V get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        long left = getLeftTime(timeout, unit);
        if(left < 0){
            //已經沒有剩余時間
            if(isDone()){
                return (V)this.result;
            }else{
                //timeout
                throw new TimeoutException("返回超時,後續的響應將會被丟棄abort");
            }
        }else{

            reentrantLock.lock();
            try {
                //獲取鎖後先判斷是否已經完成,防止無意義的await
                if(isDone()){
                    return (V)this.result;
                }
                logger.debug("await "+left+" ms");
                condition.await(getLeftTime(timeout, unit), TimeUnit.MILLISECONDS);
            }finally {
                reentrantLock.unlock();
            }
            if(isDone()){
                return (V)this.result;
            }

            throw new TimeoutException("未獲取到結果");
        }
    }

    /**
     * 設置結果值result,喚醒condition {@link #get(long, TimeUnit)}
     * @param result
     */
    public synchronized void setResult(Object result) {
        reentrantLock.lock();
        try {
            this.result = result;
            condition.signalAll();
        }finally {
            reentrantLock.unlock();
        }

    }

    /**
     * 計算剩余時間
     * @param timeout
     * @param unit
     * @return
     */
    private long getLeftTime(long timeout, TimeUnit unit){
        long now = System.currentTimeMillis();
        timeout = unit.toMillis(timeout); // 轉為毫秒
        return timeout - (now - sendTime);
    }

    /*public static void main(String[] args) {
        MsgFuture msgFuture = new MsgFuture(2000,System.currentTimeMillis());

        //測試先喚醒、後get是否正常
        msgFuture.setResult("yoxi");

        try {
            System.out.println(msgFuture.get(2000,TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            logger.error("Interrupt異常", e);
        } catch (TimeoutException e) {
            logger.error("測試先喚醒,後get出錯", e);
        }
    }*/
}

  





java如何實現一個Future