1. 程式人生 > >Multi-Programming-17 Semaphore & CountDownLatch訊號量和計數栓之間的關係

Multi-Programming-17 Semaphore & CountDownLatch訊號量和計數栓之間的關係

1. 訊號量與計數栓Semaphore & CountdownLatch

關於該問題的Q&A主要來自此處,可以自行檢視。

訊號量:Semaphore is used to control the number of concurrent threads that are using a resource. That resource can be something like a file, or could be cpu by limiting the number of threads executing. The count on a semaphore can go up and down as different threads acquire() and release().
訊號量是用來限制併發訪問某一共享資源的執行緒的數量的。這裡的共享資源可以比如是:一個檔案,CPU等。訊號量的計數會隨著不同執行緒呼叫acquire()和release()而增減。

計數栓:CountDownLatch is used to start a series of threads and then wait until all of them are complete (or until they call countDown() a given number of times).
計數栓是用來保證某個執行緒可以在其他一些執行緒完成之後開始執行的。

1.1 關於Semaphore的程式碼及執行結果。

package com.fqyuan._12semAndlatch;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * Demo entry point for the semaphore example: ten tasks are submitted to a
 * cached thread pool, but a {@link Semaphore} with three permits lets at most
 * three of them run {@link TaskWithSemaphore#doWork()} at the same time.
 */
public class Main {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newCachedThreadPool();

        // For semaphore test here:
        Semaphore semaphore = new Semaphore(3);
        TaskWithSemaphore task = new TaskWithSemaphore(semaphore);
        for (int i = 0; i < 10; i++) {
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    task.doWork();
                }
            });
        }

        // For latch test here:
        // CountDownLatch latch = new CountDownLatch(3);
        // TaskWithLatch task1 = new TaskWithLatch("Network", 1000, latch);
        // TaskWithLatch task2 = new TaskWithLatch("DiskService", 2000, latch);
        // TaskWithLatch task3 = new TaskWithLatch("Battery", 2500, latch);
        // TaskWithLatch[] tasks = { task1, task2, task3 };
        // for (TaskWithLatch t : tasks)
        //     executor.submit(new Runnable() {
        //
        //         @Override
        //         public void run() {
        //             t.doWork();
        //         }
        //     });
        // // Cause current thread to wait until latch has counted to zero.
        // latch.await();

        // Stop accepting new tasks; the already-submitted ones still run to completion.
        executor.shutdown();
        // System.out.println("Main Thread work starts!");
    }
}

/**
 * Semaphore: used to control the number of concurrent threads accessing
 * shared data.
 */
class TaskWithSemaphore {
    private final Semaphore semaphore;

    /**
     * @param semaphore the semaphore whose permits bound how many callers may
     *                  be inside {@link #doWork()} at once
     */
    public TaskWithSemaphore(Semaphore semaphore) {
        this.semaphore = semaphore;
    }

    /**
     * Acquires one permit, prints the current thread's name, sleeps for two
     * seconds and then gives the permit back.
     *
     * <p>If the thread is interrupted while waiting for a permit, no permit is
     * released (none was obtained) and the interrupt flag is restored.
     */
    public void doWork() {
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // No permit was obtained, so there is nothing to release.
            Thread.currentThread().interrupt();
            return;
        }
        try {
            System.out.println(Thread.currentThread().getName() + " Running.");
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            // Only reached after a successful acquire(), so the permit count stays balanced.
            semaphore.release();
        }
    }
}

/**
 * CountDownLatch: used to start a series of threads and then wait until they
 * are complete.
 */
class TaskWithLatch {
    private final String name;
    private final int timeToRun;
    private final CountDownLatch latch;

    /**
     * @param name  label printed in the progress messages
     * @param time  how long the simulated work sleeps, in milliseconds
     * @param latch latch that is counted down once when the work ends
     */
    public TaskWithLatch(String name, int time, CountDownLatch latch) {
        this.name = name;
        this.timeToRun = time;
        this.latch = latch;
    }

    /**
     * Simulates the work by sleeping, then counts the latch down exactly once
     * and reports completion.
     */
    public void doWork() {
        try {
            System.out.println(name + " preparing...");
            Thread.sleep(timeToRun);
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            // Always count down so a thread blocked in latch.await() cannot hang.
            latch.countDown();
        }
        System.out.println(name + " finished...");
    }
}
//Running result:
pool-1-thread-2 Running.
pool-1-thread-1 Running.
pool-1-thread-3 Running.
pool-1-thread-4 Running.
pool-1-thread-6 Running.
pool-1-thread-5 Running.
pool-1-thread-7 Running.
pool-1-thread-8 Running.
pool-1-thread-9 Running.
pool-1-thread-10 Running.

1.2. CountDownLatch 程式碼及執行結果

package com.fqyuan._12semAndlatch;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * Demo entry point for the CountDownLatch example: three simulated services
 * start on a cached thread pool and the main thread continues only after each
 * of them has counted the shared latch down.
 */
public class Main {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newCachedThreadPool();

        // Semaphore variant (disabled in this listing):
        // TaskWithSemaphore task = new TaskWithSemaphore();
        // for (int i = 0; i < 10; i++)
        //     pool.submit(new Runnable() {
        //
        //         @Override
        //         public void run() {
        //             task.doWork();
        //         }
        //     });

        // Latch variant: one count per service that has to finish.
        CountDownLatch latch = new CountDownLatch(3);
        TaskWithLatch[] services = {
            new TaskWithLatch("Network", 1000, latch),
            new TaskWithLatch("DiskService", 2000, latch),
            new TaskWithLatch("Battery", 2500, latch),
        };
        for (TaskWithLatch service : services) {
            Runnable job = service::doWork;
            pool.submit(job);
        }

        // Block the main thread until the latch has counted down to zero.
        latch.await();
        pool.shutdown();
        System.out.println("Main Thread work starts!");
    }
}

/**
 * Semaphore: used to control the number of concurrent threads accessing
 * shared data. This variant owns a semaphore with three permits.
 */
class TaskWithSemaphore {
    private final Semaphore semaphore = new Semaphore(3);

    /**
     * Acquires one permit, prints the current thread's name, sleeps for two
     * seconds and then gives the permit back.
     *
     * <p>If the thread is interrupted while waiting for a permit, no permit is
     * released (none was obtained) and the interrupt flag is restored.
     */
    public void doWork() {
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // No permit was obtained, so there is nothing to release.
            Thread.currentThread().interrupt();
            return;
        }
        try {
            System.out.println(Thread.currentThread().getName() + " Running.");
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            // Only reached after a successful acquire(), so the permit count stays balanced.
            semaphore.release();
        }
    }
}

/**
 * CountDownLatch: used to start a series of threads and then wait until they
 * are complete.
 */
class TaskWithLatch {
    private final String name;
    private final int timeToRun;
    private final CountDownLatch latch;

    /**
     * @param name  label printed in the progress messages
     * @param time  how long the simulated work sleeps, in milliseconds
     * @param latch latch that is counted down once when the work ends
     */
    public TaskWithLatch(String name, int time, CountDownLatch latch) {
        this.name = name;
        this.timeToRun = time;
        this.latch = latch;
    }

    /**
     * Simulates the work by sleeping, then counts the latch down exactly once
     * and reports completion.
     */
    public void doWork() {
        try {
            System.out.println(name + " preparing...");
            Thread.sleep(timeToRun);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Keep the interruption visible to whoever owns this thread.
            Thread.currentThread().interrupt();
        } finally {
            // Always count down so a thread blocked in latch.await() cannot hang.
            latch.countDown();
        }
        System.out.println(name + " finished...");
    }
}
//Running result:
DiskService preparing...
Battery preparing...
Network preparing...
Network finished...
DiskService finished...
Battery finished...
Main Thread work starts!

2. 計數栓與迴圈障礙CountDownLatch & CyclicBarrier

關於該問題的Q&A主要來自此處stackoverflow,可以自行檢視。

主要有3點區別:
1. java.util.concurrent.CyclicBarrier.CyclicBarrier(int parties, Runnable barrierAction), CyclicBarrier建構函式可以傳入一個Runnable的任務,該任務會在barrier滿足之後執行。
2. CountDownLatch->NumOfCalls;
CyclicBarrier->NumOfThreads;
當我們使用CyclicBarrier時,假設我們指定了觸發障礙執行緒數量為5時,我們必須至少有5個執行緒呼叫barrier.await();
當我們使用CountDownLatch時,我們指定了呼叫countDown()函式的次數,一旦達到該次數,則所有的等待執行緒將被釋放。這意味著我們可以只使用一個執行緒完成CountDownLatch的功能。
3. 一旦barrier達到時,CyclicBarrier會被自動reset(可以重複使用),而CountDownLatch的計數歸零後無法重置!該部分程式碼如下:

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierCycles {

    static CyclicBarrier barrier;

    public static void main(String[] args) throws InterruptedException {
        barrier = new CyclicBarrier(3); 

        new Worker().start();
        Thread.sleep(1000);
        new Worker().start();
        Thread.sleep(1000);
        new Worker().start();
        Thread.sleep(1000);

        System.out.println("Barrier automatically resets.");

        new Worker().start();
        Thread.sleep(1000);
        new Worker().start();
        Thread.sleep(1000);
        new Worker().start();
    }

}


/**
 * Thread that waits on {@code CyclicBarrierCycles.barrier} and prints a
 * message once the barrier has let it through.
 */
class Worker extends Thread {
    @Override
    public void run() {
        try {
            CyclicBarrierCycles.barrier.await();
        } catch (InterruptedException | BrokenBarrierException failure) {
            // Either failure means the barrier was not passed normally; report and stop.
            failure.printStackTrace();
            return;
        }
        System.out.println("Let's play.");
    }
}

關於Barrier的Demo:

package com.fqyuan._13cyclicBarrier;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Demo of a {@link CyclicBarrier} with a barrier action: four workers collect
 * data for different durations, and the barrier action runs once all four
 * have reached the barrier.
 */
public class BarrierTest {

    public static void main(String[] args) throws InterruptedException {
        // Four parties: one per BarrierWork submitted below.
        CyclicBarrier barrier = new CyclicBarrier(4, new Runnable() {

            @Override
            public void run() {
                System.out.println("Termination condiotn fulfilled!");
            }
        });
        BarrierWork[] works = {
            new BarrierWork(barrier, "East", 1000),
            new BarrierWork(barrier, "South", 1500),
            new BarrierWork(barrier, "West", 2000),
            new BarrierWork(barrier, "North", 3000),
        };

        ExecutorService executor = Executors.newCachedThreadPool();
        for (BarrierWork work : works) {
            executor.submit(asRunnable(work));
        }

        // shutdown() must come first: awaitTermination() only returns early once a
        // shutdown has been requested; otherwise it always blocks for the full timeout.
        executor.shutdown();
        if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
            // Workers are still stuck (e.g. waiting at the barrier); interrupt them.
            executor.shutdownNow();
        }
    }

    /** Wraps a work item so its checked exceptions are handled on the pool thread. */
    private static Runnable asRunnable(final BarrierWork work) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    work.doWork();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        };
    }

}

/**
 * One party of the barrier demo: sleeps for a configured time to simulate
 * data collection, then waits at the shared barrier.
 */
class BarrierWork {
    private final CyclicBarrier barrier;
    private final String name;
    private final int time;

    /**
     * @param barrier barrier this work item waits on after its sleep
     * @param name    label used in the printed messages
     * @param time    simulated collection time in milliseconds
     */
    public BarrierWork(CyclicBarrier barrier, String name, int time) {
        this.barrier = barrier;
        this.name = name;
        this.time = time;
    }

    /**
     * Announces the work, sleeps, waits at the barrier and then reports completion.
     *
     * @throws InterruptedException   if interrupted while sleeping or waiting
     * @throws BrokenBarrierException if the barrier is broken while waiting
     */
    public void doWork() throws InterruptedException, BrokenBarrierException {
        String announcement = "Collecting data in " + this.name + " with " + this.time + "ms.";
        System.out.println(announcement);

        Thread.sleep(this.time);
        this.barrier.await();

        String completion = this.name + " finished!";
        System.out.println(completion);
    }
}

3. 用countdownLatch 實現的順序執行程式

package com.fqyuan.latch_sequence;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Runs three stages strictly one after another: each stage is handed to the
 * pool only after the previous stage's single-count latch has reached zero.
 */
public class LatchTest {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newCachedThreadPool();
        CountDownLatch latch1 = new CountDownLatch(1);
        CountDownLatch latch2 = new CountDownLatch(1);
        CountDownLatch latch3 = new CountDownLatch(1);

        // Stage 1: start it, then block until it has counted down.
        pool.execute(() -> new Latch1(latch1).doWork1());
        latch1.await();

        // Stage 2 is only submitted after stage 1 has finished.
        pool.execute(() -> new Latch2(latch2).doWork2());
        latch2.await();

        // Stage 3 is only submitted after stage 2 has finished.
        pool.execute(() -> new Latch3(latch3).doWork3());
        latch3.await();

        pool.shutdown();
        System.out.println("The main thread starts!");
    }

}

/** First stage of the sequential demo; counts its latch down when done. */
class Latch1 {
    private final CountDownLatch latch;

    /**
     * @param latch latch counted down once when {@link #doWork1()} ends
     */
    public Latch1(CountDownLatch latch) {
        this.latch = latch;
    }

    /**
     * Prints a progress message, sleeps for two seconds and counts the latch
     * down. The latch is counted down even if the sleep is interrupted, and
     * the interrupt flag is restored in that case.
     */
    public void doWork1() {
        System.out.println("Preparing with latch1...");
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            // Always release the waiter blocked in latch.await().
            latch.countDown();
        }
    }
}

/** Second stage of the sequential demo; counts its latch down when done. */
class Latch2 {
    private final CountDownLatch latch;

    /**
     * @param latch latch counted down once when {@link #doWork2()} ends
     */
    public Latch2(CountDownLatch latch) {
        this.latch = latch;
    }

    /**
     * Prints a progress message, sleeps for two seconds and counts the latch
     * down. The latch is counted down even if the sleep is interrupted, and
     * the interrupt flag is restored in that case.
     */
    public void doWork2() {
        System.out.println("Preparing with latch2...");
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            // Always release the waiter blocked in latch.await().
            latch.countDown();
        }
    }
}

/** Third stage of the sequential demo; counts its latch down when done. */
class Latch3 {
    private final CountDownLatch latch;

    /**
     * @param latch latch counted down once when {@link #doWork3()} ends
     */
    public Latch3(CountDownLatch latch) {
        this.latch = latch;
    }

    /**
     * Prints a progress message, sleeps for two seconds and counts the latch
     * down. The latch is counted down even if the sleep is interrupted, and
     * the interrupt flag is restored in that case.
     */
    public void doWork3() {
        System.out.println("Preparing with latch3...");
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            // Always release the waiter blocked in latch.await().
            latch.countDown();
        }
    }
}
//Running result:
Preparing with latch1...
Preparing with latch2...
Preparing with latch3...
The main thread starts!

4. 個人見解

  1. 訊號量控制了訪問共享資源的執行緒的數量;
  2. 計數栓用來使得一些執行緒在另一些執行緒完成之後進行,控制執行緒執行順序;
  3. 迴圈障礙是用來讓一組執行緒互相等待,直到全部到達同一個屏障點–latches are for waiting for events; barriers are for waiting for other threads.