Java多執行緒設計模式 -- 流水線模式(Pipeline)
阿新 • • 發佈:2019-02-14
十一、流水線模式(Pipeline)
1、核心思想
將一個任務處理分解為若干個處理階段,其中每個處理階段的輸出作為下一個處理階段的輸入,並且各個處理階段都有相應的工作者執行緒去執行相應的計算。
2、評價:
充分利用CPU,提高其計算效率。
允許子任務間存在依賴關係的條件下實現平行計算。
非常便於採用單執行緒模型實現對子任務的處理。
具備統一的錯誤處理機制:各處理階段丟擲的 PipeException 會交由 PipeContext 的 handleError 處理。
3、適用場景
a、適合於處理規模較大的任務,否則可能得不償失。各個處理階段所使用的工作者執行緒或者執行緒池、輸入輸出物件的建立和轉移都有自身的時間和空間消耗。
/**
 * Abstraction of one processing stage.
 * A stage processes its input and passes its output on as the input of the next stage.
 *
 * @author huzhiqiang
 *
 * @param <IN>  type of the elements accepted by this stage
 * @param <OUT> type of the elements produced by this stage
 */
public interface Pipe<IN, OUT> {

    /**
     * Hands the {@link PipeContext} to this Pipe instance.
     *
     * @param pipeCtx the context to use
     */
    void init(PipeContext pipeCtx);

    /**
     * Sets the Pipe instance that follows the current one.
     *
     * @param nextPipe the next Pipe instance
     */
    void setNextPipe(Pipe<?, ?> nextPipe);

    /**
     * Processes the given input element; the result of the processing is used
     * as the input of the next Pipe instance.
     *
     * @param input the element to process
     * @throws InterruptedException declared so that implementations may block
     */
    void process(IN input) throws InterruptedException;

    /**
     * Shuts this Pipe instance down.
     *
     * @param timeout time limit; how it is applied is up to the implementation
     * @param unit    unit of {@code timeout}
     */
    void shutdown(long timeout, TimeUnit unit);
}
/**
 * Abstraction of a composite Pipe: one PipeLine instance may contain several
 * {@link Pipe} instances.
 *
 * @author huzhiqiang
 *
 * @param <IN>  input element type, as for {@link Pipe}
 * @param <OUT> output element type, as for {@link Pipe}
 */
public interface PipeLine<IN, OUT> extends Pipe<IN, OUT> {

    /**
     * Adds a Pipe instance to this pipeline.
     *
     * @param pipe the Pipe instance to add
     */
    void addPipe(Pipe<?, ?> pipe);
}
/**
 * Skeleton implementation of {@link Pipe}.
 * Subclasses supply the actual work in {@link #doProcess(Object)}; this class
 * forwards a non-null result to the next Pipe and routes {@link PipeException}s
 * to the {@link PipeContext} received in {@link #init(PipeContext)}.
 *
 * @param <IN>  type of the elements accepted by this stage
 * @param <OUT> type of the elements produced by this stage
 */
public abstract class AbsractPipe<IN, OUT> implements Pipe<IN, OUT> {
    /** The stage that receives this stage's output; {@code null} when there is none. */
    protected volatile Pipe<?, ?> nextPipe = null;
    /** Error-handling context; {@code null} until {@link #init(PipeContext)} has been called. */
    protected volatile PipeContext PipeCtx = null;

    @Override
    public void setNextPipe(Pipe<?, ?> nextPipe) {
        this.nextPipe = nextPipe;
    }

    /**
     * Runs {@link #doProcess(Object)} on the input and, when both a next Pipe
     * and a non-null result exist, passes the result to the next Pipe.
     * An {@link InterruptedException} is not propagated; the interrupt status
     * of the current thread is restored instead.
     *
     * @param input the element to process
     * @throws IllegalStateException if processing failed with a
     *         {@link PipeException} but no {@link PipeContext} has been set yet;
     *         the PipeException is attached as the cause
     */
    @SuppressWarnings("unchecked")
    @Override
    public void process(IN input) throws InterruptedException {
        try {
            OUT out = doProcess(input);
            // Read the volatile field once so the null check and the call see the same instance.
            final Pipe<?, ?> next = nextPipe;
            if (null != next && null != out) {
                ((Pipe<OUT, ?>) next).process(out);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (PipeException e) {
            final PipeContext ctx = PipeCtx;
            if (null == ctx) {
                // Without a context the failure would otherwise be replaced by a bare NullPointerException.
                throw new IllegalStateException(
                        "Pipe failed before init(PipeContext) supplied an error handler", e);
            }
            ctx.handleError(e);
        }
    }

    @Override
    public void init(PipeContext pipeCtx) {
        this.PipeCtx = pipeCtx;
    }

    @Override
    public void shutdown(long timeout, TimeUnit unit) {
        // Nothing to do.
    }

    /**
     * Left to subclasses: implements the task-processing logic of the stage.
     *
     * @param input the element to process
     * @return the result; {@code null} means nothing is passed to the next Pipe
     * @throws PipeException if processing fails
     */
    public abstract OUT doProcess(IN input) throws PipeException;
}
/**
 * A Pipe whose processing of one input element is split into several sub-tasks
 * that are executed through an {@link ExecutorService} and then combined.
 *
 * @param <IN>  type of the elements accepted by this stage
 * @param <OUT> type of the elements produced by this stage
 * @param <V>   result type of a single sub-task
 */
public abstract class AbstractParallePipe<IN, OUT, V> extends AbsractPipe<IN, OUT> {
    private final ExecutorService executorService;

    /**
     * @param queue           not used by this class; kept so existing callers still compile
     * @param executorService executor that runs the sub-tasks
     */
    public AbstractParallePipe(BlockingQueue<IN> queue, ExecutorService executorService) {
        super();
        this.executorService = executorService;
    }

    /**
     * Left to subclasses: builds the group of sub-tasks for the given input element.
     *
     * @param input the element to process
     * @return the sub-tasks to run
     * @throws Exception if the tasks cannot be built
     */
    protected abstract List<Callable<V>> buildTasks(IN input) throws Exception;

    /**
     * Left to subclasses: merges the results of the sub-tasks into the output
     * for the corresponding input element.
     *
     * @param subTaskResults futures of the sub-tasks
     * @return the combined output
     * @throws Exception if combining fails
     */
    protected abstract OUT combineResults(List<Future<V>> subTaskResults) throws Exception;

    /**
     * Executes a group of sub-tasks via {@link ExecutorService#invokeAll(java.util.Collection)}.
     *
     * @param tasks the sub-tasks to run
     * @return one future per sub-task
     * @throws Exception if the invocation fails or the calling thread is interrupted
     */
    protected List<Future<V>> invokeParallel(List<Callable<V>> tasks) throws Exception {
        return executorService.invokeAll(tasks);
    }

    /**
     * Builds the sub-tasks, runs them and combines their results.
     *
     * @param input the element to process
     * @return the combined output
     * @throws PipeException wrapping any exception raised along the way
     */
    @Override
    public OUT doProcess(IN input) throws PipeException {
        try {
            return combineResults(invokeParallel(buildTasks(input)));
        } catch (InterruptedException e) {
            // Wrapping alone would hide the interruption from the calling thread.
            Thread.currentThread().interrupt();
            throw new PipeException(this, input, "Task failed", e);
        } catch (Exception e) {
            throw new PipeException(this, input, "Task failed", e);
        }
    }
}
/**
 * A {@link PipeLine} that keeps its Pipe instances in insertion order, links
 * them together in {@link #init(PipeContext)} and feeds every input to the
 * first one.
 *
 * @param <IN>  type of the elements accepted by the pipeline
 * @param <OUT> type of the elements produced by the pipeline
 */
public class SimplePipeline<IN, OUT> extends AbsractPipe<IN, OUT> implements PipeLine<IN, OUT> {
    private final Queue<Pipe<?, ?>> pipes = new LinkedList<Pipe<?, ?>>();
    private final ExecutorService helperService;

    /**
     * Creates a pipeline whose helper executor is a single daemon thread.
     */
    public SimplePipeline() {
        this(Executors.newSingleThreadExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "SimplePpeLine-Helper");
                t.setDaemon(true);
                return t;
            }
        }));
    }

    /**
     * @param helperService executor used to initialise the pipes and to run the default error handler
     */
    public SimplePipeline(final ExecutorService helperService) {
        super();
        this.helperService = helperService;
    }

    /**
     * Shuts down every contained Pipe in insertion order, removing it from the
     * pipeline, and then shuts down the helper executor.
     */
    @Override
    public void shutdown(long timeout, TimeUnit unit) {
        Pipe<?, ?> pipe;
        while (null != (pipe = pipes.poll())) {
            pipe.shutdown(timeout, unit);
        }
        helperService.shutdown();
    }

    @Override
    public void addPipe(Pipe<?, ?> pipe) {
        pipes.add(pipe);
    }

    /**
     * Not used: {@link #process(Object)} is overridden and delegates to the first Pipe.
     *
     * @return always {@code null}
     */
    @Override
    public OUT doProcess(IN input) throws PipeException {
        return null;
    }

    /**
     * Passes the input to the first Pipe of the pipeline.
     *
     * @param input the element to process
     * @throws IllegalStateException if the pipeline currently contains no Pipe
     *         (none was added, or {@link #shutdown(long, TimeUnit)} already removed them)
     */
    @Override
    public void process(IN input) throws InterruptedException {
        @SuppressWarnings("unchecked")
        Pipe<IN, ?> firstPipe = (Pipe<IN, ?>) pipes.peek();
        if (null == firstPipe) {
            throw new IllegalStateException(
                    "SimplePipeline contains no Pipe: add one before calling process, and do not call it after shutdown");
        }
        firstPipe.process(input);
    }

    /**
     * Chains the pipes in insertion order and submits a task to the helper
     * executor that calls {@code init(pipeCtx)} on each of them.
     * The initialisation is only submitted here; this method does not wait for
     * it to complete.
     *
     * @param pipeCtx the context handed to every contained Pipe
     */
    @Override
    public void init(final PipeContext pipeCtx) {
        Pipe<?, ?> prevPipe = this;
        // Establish the order in which the stages process a task.
        for (Pipe<?, ?> pipe : pipes) {
            prevPipe.setNextPipe(pipe);
            prevPipe = pipe;
        }
        Runnable task = new Runnable() {
            @Override
            public void run() {
                for (Pipe<?, ?> pipe : pipes) {
                    pipe.init(pipeCtx);
                }
            }
        };
        helperService.submit(task);
    }

    /**
     * Adds the delegate wrapped in a {@link WorkThreadPipeDecorator}.
     *
     * @param delegate  the Pipe doing the actual work
     * @param workCount number of worker threads passed to the decorator
     */
    public <INPUT, OUTPUT> void addAsWorkerThreadBasedPipe(Pipe<INPUT, OUTPUT> delegate, int workCount) {
        addPipe(new WorkThreadPipeDecorator<INPUT, OUTPUT>(delegate, workCount));
    }

    /**
     * Adds the delegate wrapped in a {@link ThreadPoolPipeDecorator}.
     *
     * @param delegate        the Pipe doing the actual work
     * @param executorService executor passed to the decorator
     */
    public <INPUT, OUTPUT> void addAsThreadBasedPipe(Pipe<INPUT, OUTPUT> delegate, ExecutorService executorService) {
        addPipe(new ThreadPoolPipeDecorator<INPUT, OUTPUT>(delegate, executorService));
    }

    /**
     * Creates a context whose error handler prints the stack trace of the
     * exception on the helper executor.
     *
     * @return a new default context
     */
    public PipeContext newDefaultPipeContext() {
        return new PipeContext() {
            @Override
            public void handleError(final PipeException exp) {
                helperService.submit(new Runnable() {
                    @Override
                    public void run() {
                        exp.printStackTrace();
                    }
                });
            }
        };
    }
}
/**
 * Pipe decorator that runs the delegate's {@code process} as a task on an
 * {@link ExecutorService}.
 * Outstanding tasks are counted in a {@code TerminationToken}; the token is
 * looked up per executor, so decorators created with the same executor share
 * one token, while each decorator has its own completion latch.
 *
 * @param <IN>  type of the elements accepted by this stage
 * @param <OUT> type of the elements produced by this stage
 */
public class ThreadPoolPipeDecorator<IN, OUT> implements Pipe<IN, OUT> {
    private final Pipe<IN, OUT> delegate;
    private final TerminationToken terminationToken;
    private final ExecutorService executorService;
    private final CountDownLatch stageProcessDoneLatch = new CountDownLatch(1);

    /**
     * @param delegate        the Pipe doing the actual work
     * @param executorService executor on which the delegate's work is run
     */
    public ThreadPoolPipeDecorator(Pipe<IN, OUT> delegate, ExecutorService executorService) {
        super();
        this.delegate = delegate;
        this.executorService = executorService;
        terminationToken = TerminationToken.newInstance(executorService);
    }

    @Override
    public void setNextPipe(Pipe<?, ?> nextPipe) {
        delegate.setNextPipe(nextPipe);
    }

    /**
     * Submits a task that lets the delegate process the input.
     * The reservation is counted before the task is submitted, so a task that
     * runs immediately (for instance in the calling thread) cannot decrement
     * the counter ahead of its own increment.
     *
     * @param input the element to process
     */
    @Override
    public void process(final IN input) throws InterruptedException {
        Runnable task = new Runnable() {
            @Override
            public void run() {
                int remainingReservations = -1;
                try {
                    delegate.process(input);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // Keep the interruption visible to whoever owns this thread.
                    Thread.currentThread().interrupt();
                } finally {
                    remainingReservations = terminationToken.reservations.decrementAndGet();
                }
                if (terminationToken.isToShutDown() && 0 == remainingReservations) {
                    // The last outstanding task has finished.
                    stageProcessDoneLatch.countDown();
                }
            }
        };
        terminationToken.reservations.incrementAndGet();
        try {
            executorService.submit(task);
        } catch (RuntimeException e) {
            // The task was never accepted, so it will never release its reservation itself.
            terminationToken.reservations.decrementAndGet();
            throw e;
        }
    }

    @Override
    public void init(PipeContext pipeCtx) {
        delegate.init(pipeCtx);
    }

    /**
     * Marks the token as shutting down, waits up to the given timeout on the
     * completion latch when reservations are still outstanding, and then shuts
     * the delegate down.
     * NOTE(review): because the token may be shared with other decorators on
     * the same executor, the task that observes a count of zero may belong to a
     * different decorator; in that case this wait would presumably run until
     * the timeout. Confirm whether that is intended.
     *
     * @param timeout maximum time to wait on the latch
     * @param unit    unit of {@code timeout}
     */
    @Override
    public void shutdown(long timeout, TimeUnit unit) {
        terminationToken.setIsToShutdown();
        if (terminationToken.reservations.get() > 0) {
            try {
                if (stageProcessDoneLatch.getCount() > 0) {
                    // Let the tasks in the pool finish before the delegate is shut down.
                    stageProcessDoneLatch.await(timeout, unit);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
        }
        delegate.shutdown(timeout, unit);
    }

    /**
     * Termination token with one cached instance per {@link ExecutorService}.
     */
    private static class TerminationToken extends com.threadDesign.twoPhase.TerminationToken {
        private final static ConcurrentHashMap<ExecutorService, TerminationToken>
                INSTANCE_MAP = new ConcurrentHashMap<ExecutorService, TerminationToken>();

        private TerminationToken() {
        }

        void setIsToShutdown() {
            this.toShutDown = true;
        }

        /**
         * Returns the token registered for the executor, creating and
         * registering one if none exists yet.
         */
        static TerminationToken newInstance(ExecutorService executorService) {
            TerminationToken token = INSTANCE_MAP.get(executorService);
            if (null == token) {
                token = new TerminationToken();
                TerminationToken existingToken = INSTANCE_MAP.putIfAbsent(executorService, token);
                if (null != existingToken) {
                    // Another thread registered a token first; use that one.
                    token = existingToken;
                }
            }
            return token;
        }
    }
}
/**
 * Pipe implementation based on worker threads.
 * Tasks submitted to this Pipe are handled jointly by the given number of
 * worker threads.
 *
 * @author huzhiqiang
 *
 * @param <IN>  type of the elements accepted by this stage
 * @param <OUT> type of the elements produced by this stage
 */
public class WorkThreadPipeDecorator<IN, OUT> implements Pipe<IN, OUT> {
    protected final BlockingQueue<IN> workQueue;
    protected final Set<AbstractTerminatableThread> workerThreads = new HashSet<AbstractTerminatableThread>();
    protected final TerminationToken terminationToken = new TerminationToken();
    private final Pipe<IN, OUT> delegate;

    /**
     * Creates a decorator that hands inputs to the workers through a {@link SynchronousQueue}.
     *
     * @param delegate    the Pipe doing the actual work
     * @param workerCount number of worker threads; must be positive
     */
    public WorkThreadPipeDecorator(Pipe<IN, OUT> delegate, int workerCount) {
        this(new SynchronousQueue<IN>(), delegate, workerCount);
    }

    /**
     * @param workQueue   queue from which the workers take their inputs
     * @param delegate    the Pipe doing the actual work
     * @param workerCount number of worker threads; must be positive
     * @throws IllegalArgumentException if {@code workerCount} is not positive
     */
    public WorkThreadPipeDecorator(BlockingQueue<IN> workQueue, Pipe<IN, OUT> delegate, int workerCount) {
        if (workerCount <= 0) {
            throw new IllegalArgumentException("workerCount should be positive!");
        }
        this.workQueue = workQueue;
        this.delegate = delegate;
        for (int i = 0; i < workerCount; i++) {
            workerThreads.add(new AbstractTerminatableThread() {
                @Override
                protected void doRun() throws Exception {
                    try {
                        dispatch();
                    } finally {
                        // NOTE(review): if AbstractTerminatableThread declares its own
                        // terminationToken field, this name refers to that field rather than
                        // to the decorator's token incremented in process() - confirm.
                        terminationToken.reservations.decrementAndGet();
                    }
                }
            });
        }
    }

    /** Takes one input from the queue, blocking until one is available, and lets the delegate process it. */
    private void dispatch() throws InterruptedException {
        IN input = workQueue.take();
        delegate.process(input);
    }

    @Override
    public void setNextPipe(Pipe<?, ?> nextPipe) {
        delegate.setNextPipe(nextPipe);
    }

    /**
     * Puts the input on the work queue and then counts one reservation.
     * NOTE(review): the reservation is counted only after the (possibly
     * blocking) put returns; whether that ordering matters depends on how
     * AbstractTerminatableThread uses the counter - confirm.
     *
     * @param input the element to process
     * @throws InterruptedException if interrupted while waiting to enqueue
     */
    @Override
    public void process(IN input) throws InterruptedException {
        workQueue.put(input);
        terminationToken.reservations.incrementAndGet();
    }

    /** Initialises the delegate and starts all worker threads. */
    @Override
    public void init(PipeContext pipeCtx) {
        delegate.init(pipeCtx);
        for (AbstractTerminatableThread thread : workerThreads) {
            thread.start();
        }
    }

    /**
     * Asks every worker thread to terminate, waits up to the given timeout for
     * each of them, and then shuts the delegate down.
     * A non-positive timeout skips the wait: {@code Thread.join(0)} would wait
     * forever and a negative value would be rejected.
     * If the calling thread is interrupted while waiting, the shutdown still
     * completes and the interrupt status is restored before returning.
     *
     * @param timeout maximum time to wait for each worker thread
     * @param unit    unit of {@code timeout}
     */
    @Override
    public void shutdown(long timeout, TimeUnit unit) {
        final long joinMillis = unit.toMillis(timeout);
        boolean interrupted = false;
        try {
            for (AbstractTerminatableThread thread : workerThreads) {
                thread.terminate();
                if (joinMillis <= 0) {
                    continue;
                }
                try {
                    thread.join(joinMillis);
                } catch (InterruptedException e) {
                    // Remember the interruption but keep terminating the remaining workers.
                    interrupted = true;
                }
            }
            delegate.shutdown(timeout, unit);
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
/**
 * Checked exception describing a failure inside a Pipe, together with the
 * Pipe that raised it and the input that was being processed.
 */
public class PipeException extends Exception {
    private static final long serialVersionUID = 8647786507719222800L;

    /** The Pipe instance that threw the exception. */
    public final Pipe<?, ?> sourcePipe;

    /** The input element that was being processed. */
    public final Object input;

    /**
     * @param sourcePipe the Pipe instance that threw the exception
     * @param input      the input element that was being processed
     * @param message    the detail message
     * @param cause      the underlying cause
     */
    public PipeException(Pipe<?, ?> sourcePipe, Object input, String message, Throwable cause) {
        super(message, cause);
        this.input = input;
        this.sourcePipe = sourcePipe;
    }

    /**
     * @param sourcePipe the Pipe instance that threw the exception
     * @param input      the input element that was being processed
     * @param message    the detail message
     */
    public PipeException(Pipe<?, ?> sourcePipe, Object input, String message) {
        super(message);
        this.input = input;
        this.sourcePipe = sourcePipe;
    }
}
/**
 * Abstraction of the environment the processing stages run in; mainly used
 * for exception handling.
 *
 * @author huzhiqiang
 */
public interface PipeContext {

    /**
     * Handles an exception raised by a processing stage.
     *
     * @param exp the exception to handle
     */
    void handleError(PipeException exp);
}
/**
 * Test code: builds a three-stage pipeline on one shared thread pool and
 * pushes ten tasks through it.
 *
 * @author huzhiqiang
 *
 */
public class ThreadPoolBasedPipeExample {
    public static void main(String[] args) {
        final ThreadPoolExecutor threadPoolExecutor;
        threadPoolExecutor = new ThreadPoolExecutor(1, Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.MINUTES,
                new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
        final SimplePipeline<String, String> pipeLine = new SimplePipeline<String, String>();

        // Stage 1: tag only.
        Pipe<String, String> pipe = new AbsractPipe<String, String>() {
            @Override
            public String doProcess(String input) throws PipeException {
                return tag(input, "pipe1");
            }
        };
        pipeLine.addAsThreadBasedPipe(pipe, threadPoolExecutor);

        // Stage 2: tag, then sleep for a random time below 100 ms.
        pipe = new AbsractPipe<String, String>() {
            @Override
            public String doProcess(String input) throws PipeException {
                String result = tag(input, "pipe2");
                pause(100);
                return result;
            }
        };
        pipeLine.addAsThreadBasedPipe(pipe, threadPoolExecutor);

        // Stage 3: tag, then sleep for a random time below 200 ms; its shutdown also stops the shared pool.
        pipe = new AbsractPipe<String, String>() {
            @Override
            public String doProcess(String input) throws PipeException {
                String result = tag(input, "pipe3");
                pause(200);
                return result;
            }

            @Override
            public void shutdown(long timeout, TimeUnit unit) {
                threadPoolExecutor.shutdown();
                try {
                    threadPoolExecutor.awaitTermination(timeout, unit);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    Thread.currentThread().interrupt();
                }
            }
        };
        pipeLine.addAsThreadBasedPipe(pipe, threadPoolExecutor);

        pipeLine.init(pipeLine.newDefaultPipeContext());
        int N = 10;
        try {
            for (int i = 0; i < N; i++) {
                pipeLine.process("Task-" + i);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
        pipeLine.shutdown(10, TimeUnit.SECONDS);
    }

    /** Appends the stage name and the current thread name to the input, prints the result and returns it. */
    private static String tag(String input, String stage) {
        String result = input + "->[" + stage + ", " + Thread.currentThread().getName() + "]";
        System.out.println(result);
        return result;
    }

    /**
     * Sleeps for a random number of milliseconds below the given bound.
     * An interruption ends the sleep early; the interrupt status is restored
     * so the thread's owner can still observe it.
     */
    private static void pause(int boundMillis) {
        try {
            Thread.sleep(new Random().nextInt(boundMillis));
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }
}