java如何實現一個Future
阿新 • • 發佈:2017-12-22
warnings 全局 tle 標識 寫到 技術分享 控制 interrupt 程序
實現Futrue接口
public class MsgFuture<V> implements java.util.concurrent.Future<V> { ... ... }
Future的主要特性為Future.get()、
get()
get(long timeout, TimeUnit unit)
主要思路如下:
構造MsgFuture時,設置開始時間,這裏是sendTime;設置timeout,默認get()方法的超時時間,我們的程序不可能會無限等待
默認的get()對應的值域是result,默認為一個NULL對象,標識沒有返回數據
result的值需要其他線程在做完任務後將值寫到Future對象中,這裏暴露了一個方法setResult(object)
/** * 設置結果值result,喚醒condition {@link #get(long, TimeUnit)} * @param result */ public synchronized void setResult(Object result) { reentrantLock.lock(); try { this.result = result; condition.signalAll(); }finally { reentrantLock.unlock(); } }
使用ReentrantLock來進行數據可見性控制
condition.signalAll()可以喚醒condition.await的阻塞wait
至於其他線程如何調用到setResult(object)方法,可以使用ConcurrentHashMap,key為msgId,值為MsgFuture對象,設置成一個全局的,或兩個線程都可訪問,其他線程根據msgId獲取到MsgFuture,然後調用setResult(object)方法
/** * 獲取結果,如果到達timeout還未得到結果,則會拋出TimeoutException * @param timeout * @param unit * @return * @throws InterruptedException * @throws TimeoutException */ @SuppressWarnings("all") public V get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { long left = getLeftTime(timeout, unit); //根據timeout配置獲取剩余的世界 if(left < 0){ //已經沒有剩余時間 if(isDone()){ //如果已經完成,直接放回結果 return (V)this.result; }else{ //timeout throw new TimeoutException("返回超時,後續的響應將會被丟棄abort"); } }else{ reentrantLock.lock(); //同步 try { //獲取鎖後先判斷是否已經完成,防止無意義的await if(isDone()){ //先判斷是否已經完成 return (V)this.result; //直接返回 } logger.debug("await "+left+" ms"); condition.await(getLeftTime(timeout, unit), TimeUnit.MILLISECONDS); //沒有返回,阻塞等待,如果condition被喚醒,也會提前退出 }finally { reentrantLock.unlock(); } if(isDone()){ //被喚醒或超時時間已到,嘗試判斷是否完成 return (V)this.result; //返回 } throw new TimeoutException("未獲取到結果"); //超時 } }
public boolean isDone() { return this.result != NULL; }
全部代碼
public class MsgFuture<V> implements java.util.concurrent.Future<V> { private final static Logger logger = LoggerFactory.getLogger(MsgFuture.class); /** * 全局的空對象,如果Future獲取到值了,那麽一定不是NULL */ private final static Object NULL = new Object(); /** * 主鎖 */ private final ReentrantLock reentrantLock = new ReentrantLock(); /** * 條件,利用它的condition.await(left, TimeUnit.MILLISECONDS)和notifyAll方法來實現阻塞、喚醒 */ private final Condition condition = reentrantLock.newCondition(); private int timeout; private volatile Object result = NULL; private long sendTime; public MsgFuture(int timeout, long sendTime) { this.timeout = timeout; this.sendTime = sendTime; } public boolean cancel(boolean mayInterruptIfRunning) { return false; } public boolean isCancelled() { return false; } public boolean isDone() { return this.result != NULL; } /** * 獲取future結果 * @return * @throws InterruptedException */ public V get() throws InterruptedException { logger.debug("sendTime:{}",sendTime); try { return get(timeout, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { logger.error("獲取future結果異常", e); } return null; } /** * 獲取結果,如果到達timeout還未得到結果,則會拋出TimeoutException * @param timeout * @param unit * @return * @throws InterruptedException * @throws TimeoutException */ @SuppressWarnings("all") public V get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { long left = getLeftTime(timeout, unit); if(left < 0){ //已經沒有剩余時間 if(isDone()){ return (V)this.result; }else{ //timeout throw new TimeoutException("返回超時,後續的響應將會被丟棄abort"); } }else{ reentrantLock.lock(); try { //獲取鎖後先判斷是否已經完成,防止無意義的await if(isDone()){ return (V)this.result; } logger.debug("await "+left+" ms"); condition.await(getLeftTime(timeout, unit), TimeUnit.MILLISECONDS); }finally { reentrantLock.unlock(); } if(isDone()){ return (V)this.result; } throw new TimeoutException("未獲取到結果"); } } /** * 設置結果值result,喚醒condition {@link #get(long, TimeUnit)} * @param result */ public synchronized void setResult(Object result) { reentrantLock.lock(); try { this.result = result; condition.signalAll(); }finally { reentrantLock.unlock(); } } /** * 計算剩余時間 * @param timeout * @param unit * @return */ private long getLeftTime(long timeout, TimeUnit unit){ long now = System.currentTimeMillis(); timeout = unit.toMillis(timeout); // 轉為毫秒 return timeout - (now - sendTime); } /*public static void main(String[] args) { MsgFuture msgFuture = new MsgFuture(2000,System.currentTimeMillis()); //測試先喚醒、後get是否正常 msgFuture.setResult("yoxi"); try { System.out.println(msgFuture.get(2000,TimeUnit.MILLISECONDS)); } catch (InterruptedException e) { logger.error("Interrupt異常", e); } catch (TimeoutException e) { logger.error("測試先喚醒,後get出錯", e); } }*/ }
java如何實現一個Future