1. 程式人生 > google guava 併發程式設計使用

google guava 併發程式設計使用

package com.xiaoyuer.controller.future;


import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;


import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.RateLimiter;


/**
 * Small runnable demo of two Guava concurrency utilities: {@link RateLimiter}
 * for throttling task submission, and {@link ListenableFuture} for reacting to
 * task completion via listeners and callbacks.
 */
public class ListenableFutureDemo {

    /** Number of tasks each demo submits. */
    private static final int TASK_COUNT = 10;

    /**
     * Runs the rate-limiter demo followed by the listenable-future demo.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        testRateLimiter();
        testListenableFuture();
    }

    /**
     * Submits {@value #TASK_COUNT} tasks to a 5-thread pool, throttled by a
     * {@link RateLimiter}, and prints each result from a completion callback.
     *
     * <p>Unlike a JDK {@code Semaphore}, which bounds how many threads may use
     * a resource at the same time, a {@code RateLimiter} bounds how fast
     * permits are handed out (here: 5 per second). It does not limit how many
     * tasks run concurrently.
     */
    public static void testRateLimiter() {
        ListeningExecutorService executorService = MoreExecutors
                .listeningDecorator(Executors.newFixedThreadPool(5));
        try {
            // At most 5 permits (i.e. task submissions) per second.
            RateLimiter limiter = RateLimiter.create(5.0);

            for (int i = 0; i < TASK_COUNT; i++) {
                // Blocks until the limiter grants the next permit.
                limiter.acquire();

                ListenableFuture<Integer> listenableFuture = executorService
                        .submit(new Task("is " + i, i));

                // Explicit direct executor: same behavior as the legacy two-argument
                // addCallback overload, which newer Guava releases no longer provide.
                Futures.addCallback(listenableFuture,
                        printingCallback("getlimiter result with callback "),
                        MoreExecutors.directExecutor());
            }
        } finally {
            // Already-submitted tasks still run to completion; shutting down lets the
            // worker threads terminate so they do not keep the JVM alive afterwards.
            executorService.shutdown();
        }
    }

    /**
     * Submits {@value #TASK_COUNT} tasks to a cached thread pool one at a time.
     * For each task the result is first fetched synchronously with
     * {@code get()}, then reported again through a listener and through a
     * {@link FutureCallback}.
     *
     * <p>If the calling thread is interrupted while waiting for a result, the
     * interrupt flag is restored and the demo stops submitting further tasks.
     */
    public static void testListenableFuture() {
        ListeningExecutorService executorService = MoreExecutors
                .listeningDecorator(Executors.newCachedThreadPool());
        try {
            for (int i = 0; i < TASK_COUNT; i++) {
                final ListenableFuture<Integer> listenableFuture = executorService
                        .submit(new Task("testListenableFuture", i));

                // Fetch the result synchronously (blocks until the task finishes).
                try {
                    System.out.println(listenableFuture.get());
                } catch (InterruptedException e1) {
                    // Restore the flag so callers can observe the interruption.
                    Thread.currentThread().interrupt();
                    e1.printStackTrace();
                    return;
                } catch (ExecutionException e1) {
                    e1.printStackTrace();
                }

                // Option 1: a plain listener, run on the pool. The future has already
                // completed here, so the listener is handed to the executor right away.
                listenableFuture.addListener(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            System.out.println("get listenable future's result "
                                    + listenableFuture.get());
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            e.printStackTrace();
                        } catch (ExecutionException e) {
                            e.printStackTrace();
                        }
                    }
                }, executorService);

                // Option 2: a typed callback with separate success/failure hooks.
                Futures.addCallback(listenableFuture,
                        printingCallback("get listenable future's result with callback "),
                        MoreExecutors.directExecutor());
            }
        } finally {
            // Previously submitted work (including queued listeners) still runs;
            // shutting down releases the pool's threads once it is done.
            executorService.shutdown();
        }
    }

    /**
     * Builds a callback that prints {@code prefix + result} on success and the
     * stack trace of the failure otherwise.
     *
     * @param prefix text printed in front of the successful result
     * @return a new callback instance
     */
    private static FutureCallback<Integer> printingCallback(final String prefix) {
        return new FutureCallback<Integer>() {
            @Override
            public void onSuccess(Integer result) {
                System.out.println(prefix + result);
            }

            @Override
            public void onFailure(Throwable t) {
                t.printStackTrace();
            }
        };
    }
}


/**
 * A demo job that announces itself, pauses for one second, and then yields
 * the integer it was constructed with.
 */
class Task implements Callable<Integer> {

    /** Label included in the message printed when the task starts. */
    String str;

    /** Value returned by {@link #call()}. */
    int i;

    /**
     * Creates a task.
     *
     * @param str label printed when the task runs
     * @param i   value the task returns
     */
    public Task(String str, int i) {
        this.i = i;
        this.str = str;
    }

    /**
     * Prints a start message, sleeps for one second, and returns the stored value.
     *
     * @return the integer passed to the constructor
     * @throws Exception {@link InterruptedException} if the thread is interrupted
     *                   while sleeping
     */
    @Override
    public Integer call() throws Exception {
        final String banner = "call execute.." + this.str;
        System.out.println(banner);

        // Simulate one second of work.
        TimeUnit.SECONDS.sleep(1);

        return Integer.valueOf(this.i);
    }
}