1. 程式人生 > 其它 >多執行緒設計模式-兩階段結束設計模式(Two Phase Termination)

多執行緒設計模式-兩階段結束設計模式(Two Phase Termination)

技術標籤:java併發程式設計java多執行緒多執行緒設計模式設計模式多執行緒

兩個階段結束設計模式(Two Phase Termination)

執行緒一般結束後,執行緒中的資料還存在,需要釋放,這樣的情況為兩個階段,也就是目前所說的兩階段結束執行緒設計模式

執行緒統計資料

package com.ln.concurrent;

import java.util.Random;

/**
 * @ProjectName: java-concurrency
 * @Package: com.ln.concurrent
 * @Name:CounterIncrement
 * @Author:linianest
 * @CreateTime:2021/1/4 11:39
 * @version:1.0
 * @Description TODO: 執行緒統計資料
 */
public class CounterIncrement extends Thread { private volatile boolean terminated = false; private int counter = 0; private static final Random random = new Random(System.currentTimeMillis()); @Override public void run() { try { while (!terminated) { System.
out.println(Thread.currentThread().getName() + " " + counter++); Thread.sleep(random.nextInt(1_000)); } } catch (InterruptedException e) { // e.printStackTrace(); } finally { this.clean(); } } private void clean
() { System.out.println("do some clean work for the second phase.current counter=" + counter); } public void close() { this.terminated = true; this.interrupt(); } }

兩階段終結執行緒設計模式

package com.ln.concurrent.chapter15;

import com.ln.concurrent.CounterIncrement;

/**
 * @ProjectName: java-concurrency
 * @Package: com.ln.concurrent.chapter15
 * @Name:CounterTest
 * @Author:linianest
 * @CreateTime:2021/1/4 11:46
 * @version:1.0
 * @Description TODO: 兩階段終結執行緒設計模式
 */
public class CounterTest {
    public static void main(String[] args) throws InterruptedException {
        CounterIncrement counterIncrement = new CounterIncrement();
        counterIncrement.start();
        Thread.sleep(10_000L);
        counterIncrement.close();
    }
}

案例:請求後臺資料,後臺出現異常,終結執行緒並釋放請求的執行緒資源,會用到前面學的Thread-pre-Message模式,可以通過telnet測試

請求後臺資料,後臺出現異常,終結執行緒並釋放請求的執行緒資源,會用到前面學的Thread-pre-Message模式

package com.ln.concurrent.chapter16;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @ProjectName: java-concurrency
 * @Package: com.ln.concurrent.chapter15
 * @Name:AppServer
 * @Author:linianest
 * @CreateTime:2021/1/4 11:50
 * @version:1.0
 * @Description TODO: 請求後臺資料,後臺出現異常,終結執行緒並釋放請求的執行緒資源,會用到前面學的Thread-pre-Message模式
 */
public class AppServer extends Thread {
    private int port;
    private static final int DEFULT_PORT = 12722;
    private volatile boolean start = true;
    private List<ClientHandle> clientHandlers = new ArrayList<>();
    private final ExecutorService executor = Executors.newFixedThreadPool(10);

    private ServerSocket server;

    public AppServer() {
        this(DEFULT_PORT);
    }

    public AppServer(int port) {
        this.port = port;
    }

    @Override
    public void run() {
        try {
            this.server = new ServerSocket(port);
            while (start) {
                Socket client = this.server.accept();
                ClientHandle clientHandler = new ClientHandle(client);
                executor.submit(clientHandler);
                this.clientHandlers.add(clientHandler);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            this.dispose();
        }
    }

    /**
     * 釋放資源
     */
    private void dispose() {
        this.clientHandlers.stream().forEach(ClientHandle::stop);
        this.executor.shutdown();
    }

    public void shutdown() throws IOException {
        this.start = false;
        this.server.close();
        this.interrupt();
    }
}

客戶端

package com.ln.concurrent.chapter16;

import java.io.*;
import java.net.Socket;
import java.util.stream.Stream;

/**
 * @ProjectName: java-concurrency
 * @Package: com.ln.concurrent.chapter16
 * @Name:ClientHandle
 * @Author:linianest
 * @CreateTime:2021/1/4 12:07
 * @version:1.0
 * @Description TODO: 客戶端
 */
public class ClientHandle implements Runnable {
    private final Socket socket;
    private volatile boolean running = true;

    public ClientHandle(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        /**
         * 使用try resource方式,出現異常,資料流關閉
         */
        try (InputStream inputStream = socket.getInputStream();
             OutputStream outputStream = socket.getOutputStream();
             final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
             final PrintWriter printWriter = new PrintWriter(outputStream);) {
            while (running) {
                final String message = reader.readLine();
                if (null == message) {
                    break;
                }
                System.out.println("Come from client >" + message);
                printWriter.write("echo " + message + "\n");
                // 將資料刷到管道中
                printWriter.flush();
            }
        } catch (IOException e) {
            e.printStackTrace();
            this.running = false;
        } finally {
            this.stop();
        }

    }

    public void stop() {
        if (running) {
            return;
        }
        this.running = false;
        try {
            this.socket.close();
        } catch (IOException e) {
//            e.printStackTrace();
        }
    }
}

測試socket server連線

package com.ln.concurrent.chapter16;

import java.io.IOException;

/**
 * @ProjectName: java-concurrency
 * @Package: com.ln.concurrent.chapter16
 * @Name:AppServerClient
 * @Author:linianest
 * @CreateTime:2021/1/4 13:08
 * @version:1.0
 * @Description TODO: 測試socket server連線
 */
public class AppServerClient {
    public static void main(String[] args) throws IOException, InterruptedException {
        final AppServer server = new AppServer(13345);
        server.start();

        Thread.sleep(45_000L);
        server.shutdown();
    }
}