1. 程式人生 > 其它 >java中執行緒同步的幾種方法

java中執行緒同步的幾種方法

1.同步關鍵字

   Synchronized

2.併發包中鎖

   Lock

3.object物件等待通知

   ObjectMonitor

   wait

   notify


4.鎖對應的條件變數

   併發包中鎖的條件變數

   condition

   await

   signal


5.併發包中的阻塞佇列

   BlockingQueue
6.併發包中的原子操作

   Atomic

7.volatile


8.final

 

9.Thread.join


10.CountDownLatch

    

相當於執行緒中的方法join方法,不過功能更多。CountDownLatch能夠使一個執行緒在等待另外一些執行緒完成各自工作之後,再繼續執行。使用一個計數器進行實現。
計數器初始值為執行緒的數量。當每一個執行緒完成自己任務後,計數器的值就會減一。當計數器的值為0時,表示所有的執行緒都已經完成了任務,然後在CountDownLatch上等待的執行緒就可以恢復執行任務。

 


11.CyclicBarrier

CyclicBarrier 的字面意思是可迴圈使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組執行緒到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個執行緒到達屏障時,屏障才會開門,
所有被屏障攔截的執行緒才會繼續幹活。CyclicBarrier預設的構造方法是CyclicBarrier(int parties),其引數表示屏障攔截的執行緒數量,每個執行緒呼叫await方法告訴CyclicBarrier我已經到達了屏障,然後當前執行緒被阻塞。

 


12.Semaphore

控制併發執行緒數的semaphore, Semaphore翻譯成字面意思為 訊號量,Semaphore可以控同時訪問的執行緒個數,通過 acquire() 獲取一個許可,如果沒有就等待,而 release() 釋放一個許可。
如下所示程式碼中雖然有30個執行緒在執行,但是隻允許10個併發執行,Semaphore(int
permits) 接受一個整形數字,表示可用的許可證數量,Semaphore(10)表示允許10個執行緒獲取許可證,也就是併發數為10.

 


13.Phaser

Phaser類的特點是把多個執行緒協作執行的任務劃分成多個階段(phase),在每個階段上都可以有任意個參與者參與。執行緒可以隨時註冊並參與到某個階段的執行中來。當一個階段中所有的執行緒都成功完成之後,Phaser類的物件會自動進入下一個階段,如此迴圈下去,直到Phaser類的物件中不再包含任何參與者,此時它會自動結束。功能強大,可以替代CountDownLatch和CyclicBarrier。

Phaser的構造器可指定初始的參與者的個數。

(
1)register 動態新增參與者 (2)arriveAndAwaitAdvance 完成之後等待其他參與者完成,會阻塞直到Phaser類的物件成功進入下一個階段 (3)arriveAndDeregister 執行完成之後取消自己的註冊,不參與下一個階段的執行

 

示例程式碼如下:

public class PhaserDemo {
 
    private final Phaser phaser = new Phaser(1);
    private final Pattern imageUrlPattern = Pattern.compile("src=['\"]?(.*?(\\.jpg|\\.gif|\\.png))['\"]?[\\s>]+", Pattern.CASE_INSENSITIVE);
    public void download(URL url, final Path path, Charset charset) throws IOException {
        if (charset == null) {
            charset = StandardCharsets.UTF_8;
        }
        String content = getContent(url, charset);
        List<URL> imageUrls = extractImageUrls(content);
        for (final URL imageUrl : imageUrls) {
            phaser.register();
            new Thread() {
                public void run() {
                    //等待其他執行緒建立完成
                    phaser.arriveAndAwaitAdvance();
                    //進入圖片下載階段
                    try {
                        InputStream is = imageUrl.openStream();
                        Files.copy(is, getSavedPath(path, imageUrl), StandardCopyOption.REPLACE_EXISTING);
                    } catch (IOException e) {
                        e.printStackTrace();
                    } finally {
                        phaser.arriveAndDeregister();
                    }
                }
            }.start();
        }
        //等待其他下載執行緒建立完成
        phaser.arriveAndAwaitAdvance();
        //等待下載階段的下載執行緒執行完成
        phaser.arriveAndAwaitAdvance();
        //下載完成之後登出自己
        phaser.arriveAndDeregister();
    }
    private String getContent(URL url, Charset charset) throws IOException {
        InputStream is = url.openStream();
        return IOUtils.toString(new InputStreamReader(is, charset.name()));
    }
    private List<URL> extractImageUrls(String content) {
        List<URL> result = new ArrayList<URL>();
        Matcher matcher = imageUrlPattern.matcher(content);
        while (matcher.find()) {
            try {
                result.add(new URL(matcher.group(1)));
            } catch (MalformedURLException e) {
                //忽略
            }
        }
        return result;
    }
    private Path getSavedPath(Path parent, URL url) {
        String urlString = url.toString();
        int index = urlString.lastIndexOf("/");
        String fileName = urlString.substring(index + 1);
        return parent.resolve(fileName);
    }
 
    public static void main(String[] args) throws IOException {
        URL url = new URL("http://www.baidu.com");
        PhaserDemo downloader = new PhaserDemo();
        downloader.download(url, Paths.get("imgs"), Charset.forName("GB2312"));
    }
 
}
View Code

 


14.fork-join

fork/join是java7更新的一個新的輕量級任務執行框架,其主要目的是要更好滴利用底層平臺上的多核CPU和多處理器來進行並行處理,解決問題時通常採用分治(divide and conquer)演算法或map/reduce演算法來進行。

fork操作是把一個大問題劃分為若干較小的問題,一般是遞迴進行。

join操作是把部分解收集並組織起來,得到最終的完整解,也可能是遞迴進行的。

如果某個子問題由於等待另外一個子問題的完成而無法繼續執行,那麼處理該子問題的執行緒會主動尋找其他尚未執行的子問題來執行,減少了執行緒等待時間,提高了效能。

ForkJoinTask的子類:RecursiveTask(可返回結果)與RecursiveAction。

ForkJoinPool實現了ExecutorService介面。

示例程式碼如下:

public class ForkJoinDemo {
 
    private static final int RANGE_LENGTH = 2000;
    private final ForkJoinPool forkJoinPool = new ForkJoinPool();
 
    private static class MaxValueTask extends RecursiveTask<Long> {
        private final long[] array;
        private final int start;
        private final int end;
 
        MaxValueTask(long[] array, int start, int end) {
            this.array = array;
            this.start = start;
            this.end = end;
        }
 
        protected Long compute() {
            long max = Long.MIN_VALUE;
            if (end - start <= RANGE_LENGTH) {
                for (int i = start; i < end; i++) {
                    if (array[i] > max) {
                        max = array[i];
                    }
                }
            } else {
                int mid = (start + end) / 2;
                MaxValueTask lowTask = new MaxValueTask(array, start, mid);
                MaxValueTask highTask = new MaxValueTask(array, mid, end);
                lowTask.fork();
                highTask.fork();
                max = Math.max(max, lowTask.join());
                max = Math.max(max, highTask.join());
            }
            return max;
        }
    }
 
    public void calculate(long[] array) {
        MaxValueTask task = new MaxValueTask(array, 0, array.length);
        Long result = forkJoinPool.invoke(task);
        System.out.println(result);
    }
 
    public void calculateNormal(long[] array) {
        long max = Long.MIN_VALUE;
        for (int i = 0, n = array.length; i < n; i++) {
            if (array[i] > max) {
                max = array[i];
            }
        }
        System.out.println(max);
    }
 
    public static void main(String[] args) {
        Random random = new Random();
        int size = Integer.MAX_VALUE / 256;
        long[] array = new long[size];
        for (int i = 0; i < size; i++) {
            array[i] = random.nextLong();
        }
        ForkJoinDemo mv = new ForkJoinDemo();
        long startTime = System.currentTimeMillis();
        mv.calculate(array);
        long midTime = System.currentTimeMillis();
        System.out.println(midTime - startTime);
        mv.calculateNormal(array);
        long endTime = System.currentTimeMillis();
        System.out.println(endTime - midTime);
    }
 
}
View Code

 


15.parrelStream

 

Java8並行流ParallelStream和Stream的區別就是支援並行執行,提高程式執行效率。但是如果使用不當可能會發生執行緒安全的問題。Demo如下:
public static void concurrentFun() {
        List<Integer> listOfIntegers =
                new ArrayList<>();
        for (int i = 0; i <100; i++) {
            listOfIntegers.add(i);
        }
        List<Integer> parallelStorage = new ArrayList<>() ;
        listOfIntegers
                .parallelStream()
                .filter(i->i%2==0)
                .forEach(i->parallelStorage.add(i));
        System.out.println();

        parallelStorage
                .stream()
                .forEachOrdered(e -> System.out.print(e + " "));

        System.out.println();
        System.out.println("Sleep 5 sec");
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        parallelStorage
                .stream()
                .forEachOrdered(e -> System.out.print(e + " "));
    }
View Code