多執行緒04——執行緒協作
阿新 • • 發佈:2022-03-31
多執行緒04——執行緒協作
生產消費者模式
執行緒通訊
應用場景:生產者和消費者問題
◆假設倉庫中只能存放- -件產品,生產者將生產出來的產品放入倉庫,消費者將倉庫中產品取走消費.
◆如果倉庫中沒有產品,則生產者將產品放入倉庫,否則停止生產並等待,直到倉庫中的產品被消費者取走為止.
◆如果倉庫中放有產品,則消費者可以將產品取走消費,否則停止消費並等待,直到倉庫中再次放入產品為止.
執行緒通訊分析
這是一個執行緒同步問題,生產者和消費者共享同-一個資源,並且生產者和消費者之間相互依賴,互為條件.
- 對於生產者,沒有生產產品之前,要通知消費者等待.而生產了產品之後,又需要馬上通知消費者消費
- 對於消費者,在消費之後,要通知生產者已經結束消費,需要生產新的產品以供消費.
- 在生產者消費者問題中,僅有synchronized是不夠的
- synchronized可阻止併發更新同-一個共享資源,實現了同步
- synchronized不能用來實現不同執行緒之間的訊息傳遞(通訊)
Java提供了幾個方法解決執行緒之間的通訊問題
注意:
- 均是Object類的方法,都只能在同步方法或者同步程式碼塊中
使用,否則會丟擲異常llegalMonitorStateException - Sleep是抱著鎖(產品)睡覺
解決方式1管程法
併發協作模型"生產者/消費者模式" -->管程法
◆生產者:負責生產資料的模組(可能是方法,物件,執行緒,程序);
◆消費者:負責處理資料的模組(可能是方法,物件,執行緒,程序);
◆緩衝區:消費者不能直接使用生產者的資料,他們之間有個”緩衝區
生產者將生產好的資料放入緩衝區,消費者從緩衝區拿出資料
package com.hao.gaoji; // 測試:生產消費者模型-->利用緩衝區解決:管程法 // 生產者,消費者 , 產品 ,緩衝區 public class TestPC { public static void main(String[] args) { SynContainer container = new SynContainer(); new Productor(container).start(); new Consumer(container).start(); } } //生產者 class Productor extends Thread{ SynContainer container;//新建一個容器 public Productor( SynContainer container){ this.container=container;//構造器,方便建立物件 } @Override public void run() { for (int i = 0; i < 100; i++) { container.push(new Chicken(i)); System.out.println("生產了第"+i+"只雞"); } } } //消費者 class Consumer extends Thread{ SynContainer container; public Consumer(SynContainer container){ this.container=container; } //消費 @Override public void run() { for (int i = 0; i < 100; i++) { System.out.println("消費了第-->"+container.pop().id+"只雞"); } } } //產品 class Chicken{ int id ; //雞的編號 public Chicken(int id) { this.id = id; } } //緩衝區 class SynContainer{ //需要一個容器大小 Chicken[] chickens = new Chicken[10]; //容器計數器 int count = 0; //生產者放入產品 public synchronized void push(Chicken chicken){ //如果容器滿了,就需要等待消費者消費 if (count== chickens.length){ //生產等待 try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //如果沒有滿,我們就需要丟入產品 chickens[count]=chicken; count++; //可以通知消費者消費了 this.notifyAll(); } //消費者消費產品 public synchronized Chicken pop(){ //判斷能否消費 if(count==0){ //等待生產者生產,消費者等待 try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //如果可以消費 count--; Chicken chicken=chickens[count]; //吃完了,通知生產者生產 this.notifyAll(); return chicken; } }
解決方式2訊號燈法
引入標誌位,併發協作模型“生產者/消費者模式" -->訊號燈法
package com.hao.gaoji;
//測試生產者消費者問題2:訊號燈法,標誌位解決
public class TestPC2 {
public static void main(String[] args) {
TV tv = new TV();
new Player(tv).start();
new Watcher(tv).start();
}
}
//生產者-->演員
class Player extends Thread{
TV tv;
public Player(TV tv){
this.tv=tv;
}
@Override
public void run() {
for (int i = 0; i < 20; i++) {
if (i%2==0){
this.tv.play("快樂大本營播放中");
}else{
this.tv.play("抖音記錄美好生活");
}
}
}
}
//消費者-->觀眾
class Watcher extends Thread{
TV tv;
public Watcher(TV tv){
this.tv=tv;
}
@Override
public void run() {
for (int i = 0; i < 20; i++) {
tv.watch();
}
}
}
//產品-->節目
class TV{
//演員表演的時候,觀眾等待 T
//觀眾表演的時候,演員等待 F
String voice;//表演的節目
boolean flag = true;
//表演
public synchronized void play(String voice){
if (!flag){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("演員表演了"+voice);
//通知觀眾觀看
this.notifyAll();//通知喚醒
this.voice= voice ; //聲音更新
this.flag=!this.flag;
}
//觀看
public synchronized void watch(){
if (flag){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("觀看了"+voice);
//通知演員表演
this.notifyAll();//通知喚醒
this.flag=!this.flag;
}
}
使用執行緒池
-
背景:經常建立和銷燬、使用量特別大的資源,比如併發情況下的執行緒,對效能影響很大。
-
思路:提前建立好多個執行緒,放入執行緒池中,使用時直接獲取,使用完放回池中。可以避免頻繁建立銷燬、實現重複利用。類似生活中的公共交通工具。
-
好處:
- 提高響應速度(減少了建立新執行緒的時間)
- 降低資源消耗(重複利用執行緒池中執行緒,不需要每次都建立)
- 便於執行緒管理(....)
- corePoolSize: 核心池的大小
- maximumPoolSize:最大執行緒數
- keepAliveTime:執行緒沒有任務時最多保持多長時間後會終止
-
JDK5.0起提供了執行緒池相關的API:ExecutorService和Executors
-
ExecutorService:真正的執行緒池介面。常見子類ThreadPoolExecutor
- void execute(Runnable command):執行任務/命令,沒有返回值,一般又來執行Runnable
Future submit(Callable task):執行任務,有返回值,一般又來執行Callable - void shutdown():關閉連線池
-
Executors:工具類、執行緒池的工廠類,用於建立並返回不同型別的執行緒池
package com.hao.gaoji;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
//測試執行緒池
public class TestPool {
public static void main(String[] args) {
//1.建立服務,建立執行緒池
//newFixedThreadPool 引數為:執行緒池大小
ExecutorService service = Executors.newFixedThreadPool(10);
//執行執行緒
service.execute(new MyThread());
service.execute(new MyThread());
service.execute(new MyThread());
service.execute(new MyThread());
//2.關閉連結
service.shutdown();
}
}
class MyThread implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
}