重入鎖、讀寫鎖,鎖的高階深化
阿新 • • 發佈:2018-12-27
import java.util.concurrent.CountDownLatch; public class UseCountDownLatch { public static void main(String[] args) { final CountDownLatch countDown = new CountDownLatch(2); Thread t1 = new Thread(new Runnable() { @Override public void run() { try { System.out.println("進入執行緒t1" + "等待其他執行緒處理完成..."); countDown.await(); System.out.println("t1執行緒繼續執行..."); } catch (InterruptedException e) { e.printStackTrace(); } } },"t1"); Thread t2 = new Thread(new Runnable() { @Override public void run() { try { System.out.println("t2執行緒進行初始化操作..."); Thread.sleep(3000); System.out.println("t2執行緒初始化完畢,通知t1執行緒繼續..."); countDown.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread t3 = new Thread(new Runnable() { @Override public void run() { try { System.out.println("t3執行緒進行初始化操作..."); Thread.sleep(4000); System.out.println("t3執行緒初始化完畢,通知t1執行緒繼續..."); countDown.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } }); t1.start(); t2.start(); t3.start(); } }
import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooKeeper; public class ZookeeperBase { /** zookeeper地址 */ static final String CONNECT_ADDR = "192.168.80.88:2181,192.168.80.87:2181,192.168.80.86:2181"; /** session超時時間 */ static final int SESSION_OUTTIME = 2000;//ms /** 訊號量,阻塞程式執行,用於等待zookeeper連線成功,傳送成功訊號 */ static final CountDownLatch connectedSemaphore = new CountDownLatch(1); public static void main(String[] args) throws Exception{ ZooKeeper zk = new ZooKeeper(CONNECT_ADDR, SESSION_OUTTIME, new Watcher(){ @Override public void process(WatchedEvent event) { //獲取事件的狀態 KeeperState keeperState = event.getState(); EventType eventType = event.getType(); //如果是建立連線 if(KeeperState.SyncConnected == keeperState){ if(EventType.None == eventType){ //如果建立連線成功,則傳送訊號量,讓後續阻塞程式向下執行 connectedSemaphore.countDown(); System.out.println("zk 建立連線"); } } } }); //進行阻塞 connectedSemaphore.await(); System.out.println(".."); //建立父節點 // zk.create("/testRoot", "testRoot".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //建立子節點,使用EPHEMERAL,主程式執行完成後該節點被刪除,只在本次會話內有效,可以用作分散式鎖。 // zk.create("/testRoot/children", "children data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); //獲取節點資訊 // byte[] data = zk.getData("/testRoot", false, null); // System.out.println(new String(data)); // System.out.println(zk.getChildren("/testRoot", false)); //修改節點的值,-1表示跳過版本檢查,其他正數表示如果傳入的版本號與當前版本號不一致,則修改不成功,刪除是同樣的道理。 // zk.setData("/testRoot", "modify data root".getBytes(), -1); // byte[] data = zk.getData("/testRoot", false, null); // System.out.println(new String(data)); //判斷節點是否存在 // System.out.println(zk.exists("/testRoot/children", false)); //刪除節點 // zk.delete("/testRoot/children", -1); // System.out.println(zk.exists("/testRoot/children", false)); zk.close(); } }
import java.io.IOException; import java.util.Random; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class UseCyclicBarrier { static class Runner implements Runnable { private CyclicBarrier barrier; private String name; public Runner(CyclicBarrier barrier, String name) { this.barrier = barrier; this.name = name; } @Override public void run() { try { Thread.sleep(1000 * (new Random()).nextInt(5)); System.out.println(name + " 準備OK."); barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println(name + " Go!!"); } } public static void main(String[] args) throws IOException, InterruptedException { CyclicBarrier barrier = new CyclicBarrier(3); // 3 ExecutorService executor = Executors.newFixedThreadPool(3); executor.submit(new Thread(new Runner(barrier, "zhangsan"))); executor.submit(new Thread(new Runner(barrier, "lisi"))); executor.submit(new Thread(new Runner(barrier, "wangwu"))); executor.shutdown(); } }
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
public class UseFuture implements Callable<String>{
private String para;
public UseFuture(String para){
this.para = para;
}
/**
* 這裡是真實的業務邏輯,其執行可能很慢
*/
@Override
public String call() throws Exception {
//模擬執行耗時
Thread.sleep(5000);
String result = this.para + "處理完成";
return result;
}
//主控制函式
public static void main(String[] args) throws Exception {
String queryStr = "query";
//構造FutureTask,並且傳入需要真正進行業務邏輯處理的類,該類一定是實現了Callable介面的類
FutureTask<String> future = new FutureTask<String>(new UseFuture(queryStr));
FutureTask<String> future2 = new FutureTask<String>(new UseFuture(queryStr));
//建立一個固定執行緒的執行緒池且執行緒數為1,
ExecutorService executor = Executors.newFixedThreadPool(2);
//這裡提交任務future,則開啟執行緒執行RealData的call()方法執行
//submit和execute的區別: 第一點是submit可以傳入實現Callable介面的例項物件, 第二點是submit方法有返回值
Future f1 = executor.submit(future); //單獨啟動一個執行緒去執行的
Future f2 = executor.submit(future2);
System.out.println("請求完畢");
try {
//這裡可以做額外的資料操作,也就是主程式執行其他業務邏輯
System.out.println("處理實際的業務邏輯...");
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
//呼叫獲取資料方法,如果call()方法沒有執行完成,則依然會進行等待
System.out.println("資料:" + future.get());
System.out.println("資料:" + future2.get());
executor.shutdown();
}
}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class UseSemaphore {
public static void main(String[] args) {
// 執行緒池
ExecutorService exec = Executors.newCachedThreadPool();
// 只能5個執行緒同時訪問
final Semaphore semp = new Semaphore(5);
// 模擬20個客戶端訪問
for (int index = 0; index < 20; index++) {
final int NO = index;
Runnable run = new Runnable() {
public void run() {
try {
// 獲取許可
semp.acquire();
System.out.println("Accessing: " + NO);
//模擬實際業務邏輯
Thread.sleep((long) (Math.random() * 10000));
// 訪問完後,釋放
semp.release();
} catch (InterruptedException e) {
}
}
};
exec.execute(run);
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
//System.out.println(semp.getQueueLength());
// 退出執行緒池
exec.shutdown();
}
}
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class UseReentrantLock {
private Lock lock = new ReentrantLock();
public void method1(){
try {
lock.lock();
System.out.println("當前執行緒:" + Thread.currentThread().getName() + "進入method1..");
Thread.sleep(1000);
System.out.println("當前執行緒:" + Thread.currentThread().getName() + "退出method1..");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void method2(){
try {
lock.lock();
System.out.println("當前執行緒:" + Thread.currentThread().getName() + "進入method2..");
Thread.sleep(2000);
System.out.println("當前執行緒:" + Thread.currentThread().getName() + "退出method2..");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
final UseReentrantLock ur = new UseReentrantLock();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
ur.method1();
ur.method2();
}
}, "t1");
t1.start();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
//System.out.println(ur.lock.getQueueLength());
}
}
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class UseManyCondition {
private ReentrantLock lock = new ReentrantLock();
private Condition c1 = lock.newCondition();
private Condition c2 = lock.newCondition();
public void m1(){
try {
lock.lock();
System.out.println("當前執行緒:" +Thread.currentThread().getName() + "進入方法m1等待..");
c1.await();
System.out.println("當前執行緒:" +Thread.currentThread().getName() + "方法m1繼續..");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void m2(){
try {
lock.lock();
System.out.println("當前執行緒:" +Thread.currentThread().getName() + "進入方法m2等待..");
c1.await();
System.out.println("當前執行緒:" +Thread.currentThread().getName() + "方法m2繼續..");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void m3(){
try {
lock.lock();
System.out.println("當前執行緒:" +Thread.currentThread().getName() + "進入方法m3等待..");
c2.await();
System.out.println("當前執行緒:" +Thread.currentThread().getName() + "方法m3繼續..");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void m4(){
try {
lock.lock();
System.out.println("當前執行緒:" +Thread.currentThread().getName() + "喚醒..");
c1.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void m5(){
try {
lock.lock();
System.out.println("當前執行緒:" +Thread.currentThread().getName() + "喚醒..");
c2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
final UseManyCondition umc = new UseManyCondition();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
umc.m1();
}
},"t1");
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
umc.m2();
}
},"t2");
Thread t3 = new Thread(new Runnable() {
@Override
public void run() {
umc.m3();
}
},"t3");
Thread t4 = new Thread(new Runnable() {
@Override
public void run() {
umc.m4();
}
},"t4");
Thread t5 = new Thread(new Runnable() {
@Override
public void run() {
umc.m5();
}
},"t5");
t1.start(); // c1
t2.start(); // c1
t3.start(); // c2
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
t4.start(); // c1
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
t5.start(); // c2
}
}