1. 程式人生 > 其它 >併發程式設計-ThreadLocal&ForkJoinPool(使用以及原理分析)

併發程式設計-ThreadLocal&ForkJoinPool(使用以及原理分析)

併發程式設計-ThreadLocal&ForkJoinPool(使用以及原理分析)

本章只要聊聊兩個東西,這裡會給出如何使用他們,並且分析各自的原始碼以及原理。

ThreadLocal】:在指定執行緒中儲存資料,資料儲存後只有指定執行緒可以獲得

ForkJoinpool】:實際上他類似於【hadoop】他是將一個大任務分成若干個小任務,然後對每個小任務進行計算,最後彙總結果(其運用【分治法】和【工作竊取】兩種思想)

ThreadLocal

使用

他是用來解決執行緒安全問題的一個工具(本質上執行緒安全問題就是多個執行緒對同一個共享變數讀和寫的操作導致的),他就通過【執行緒隔離

】解決這一問題,試想咱們現在有一個變數,每個執行緒拿到的這個變數都是一樣的,各自對這個變數修改對彼此不可見。

public class ThreadLocalExample {
    static ThreadLocal<Integer> integerThreadLocal = ThreadLocal.withInitial(() -> 0);
    public static void main(String[] args) {
        Thread[] threads = new Thread[5];
        for (int i = 0; i < 5; i++) {
            threads[i] 
= new Thread(() -> {
          //獲得執行緒中的資料
int i1 = integerThreadLocal.get().intValue();
          //儲存進去 integerThreadLocal.set(i1
+= 5); System.out.println(integerThreadLocal.get() + "-->" + Thread.currentThread().getName()); }); }
for (int i = 0; i < 5; i++) { threads[i].start(); } } }

結果(每個執行緒獲取的資料是一樣的):我們其實可以用它儲存使用者資訊,當前端傳遞過來一個token然後我們token進行解析,就可以當前使用者資訊儲存在這個中。

如何應用在現實中(我們知道simpledatefromate不是執行緒安全的,那我們多個執行緒用同一個simpledatefromate一定會有問題,我們就可以用他來解決)

public class SimpleDateExample {
    static ThreadLocal<SimpleDateFormat> simpleDateFormatThreadLocal = new ThreadLocal<>();

    static SimpleDateFormat getDateFormate() {
        //從當前執行緒中獲取
        SimpleDateFormat simpleDateFormat = simpleDateFormatThreadLocal.get();
        if (simpleDateFormat == null) {
            // 在當前執行緒中設定一個
            simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
            simpleDateFormatThreadLocal.set(simpleDateFormat);
        }
        return simpleDateFormat;
    }

    private static Date parse(String strDate) throws ParseException {
        return getDateFormate().parse(strDate);
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 20; i++) {
            executorService.execute(() -> {
                try {
                    System.out.println(parse("1998-05-08"));
                } catch (ParseException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

【原理】

主要有兩個方法

  • set:在當前執行緒中設定一個數,儲存在ThreadLocal中,這個數值僅對當前執行緒可見【這就相當於在當前執行緒中建立了一個副本】
  • get:從當前執行緒中取出set方法設定的數值

原理猜想:

  • 能實現執行緒隔離,當前儲存的執行緒只會儲存在當前範圍內
  • 一定有一個容器儲存這些副本,並且這個容器中的值肯定和每一個執行緒相關聯
  • 容器的key肯定和當前執行緒有關係,因為我們在get資料的時候沒有傳入任何key

整體剖析(原始碼解析):其實是在每個執行緒中都把threadLoca儲存在他當前的一個【ThreadLocalMap】中,key指向了ThreadLocalMap而value就是儲存的資料

set():

public void set(T value) {
    Thread t = Thread.currentThread();
    //根據當前執行緒獲取ThreadLocalMap
    ThreadLocalMap map = getMap(t);
    if (map != null)
        //不為空則直接儲存
        map.set(this, value);
    else
        //初始化
        createMap(t, value);
}

初始化

ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
    //預設長度為10
    table = new Entry[INITIAL_CAPACITY];
    //計算陣列下標
    int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
    table[i] = new Entry(firstKey, firstValue);
    size = 1;
    setThreshold(INITIAL_CAPACITY);
}

儲存資料

private void set(ThreadLocal<?> key, Object value) {

    Entry[] tab = table;
    int len = tab.length;
    //根據key計算陣列下標
    int i = key.threadLocalHashCode & (len-1);

    //線性探索(這裡是怕通過TheradLocal計算的陣列下標是同一個,所以這裡是解決hash衝突的)
    for (Entry e = tab[i];
         e != null;
         e = tab[i = nextIndex(i, len)]) {
        ThreadLocal<?> k = e.get();
        //如果這個位置有內容則直接進行替換
        if (k == key) {
            e.value = value;
            return;
        }

        //如果key為空則證明這裡儲存的資料是一個髒資料,需要被清理(因為這裡是弱引用,當threadLocal被Gc了那所引用他的物件也就為空了)
        if (k == null) {
        //我們一會主要聊聊這個方法 replaceStaleEntry(key, value, i);
return; } } tab[i] = new Entry(key, value); int sz = ++size; if (!cleanSomeSlots(i, sz) && sz >= threshold) rehash(); }

get()

public T get() {
    Thread t = Thread.currentThread();
    // 獲取當前執行緒的ThreadLocalMap
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        //通過當前的ThreadLocal去獲取資料
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    return setInitialValue();
}
private T setInitialValue() {
    //獲取咱們設定的初始資料
    T value = initialValue();
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
    return value;
}

在設定set內容的時候有一段程式碼比較特殊,我們來分析一下【replaceStaleEntry】:他主要做兩件事情

【把當前的value儲存在entry中】【並且清理無效的key】但是裡面牽扯到【線性定址法】可以解決hash衝突,具體是這樣的

從通過ThreadLocal生成的下標中向前查詢,然後再向後查詢。

髒enrty】:指的是key為null

可以替換的enrty】:傳入的通過當前ThreadLocal計算的hash值和整個entry中的某個key相同,則把兩個位置做一個替換

具體情況如下

  • 向前查找到了髒的enrty(✔),向後有可以替換的enrty(✔)->這一步是最繁瑣的,剩下的就簡單啦
    •   向前查詢直到找到key為null結束然後使用【slotToExpunge】記住髒entry的下標
    •   向後查詢,假設我們要新增一個value為6的entry,我們換算出來的下標為4,向後查詢,我們發現傳入的key的hash值和下標是6的hash值是相同的,那我就互換他們的位置(如果不替換他們的位置,我們設定內容的時候就會存在衝突的問題
    • 這個時候從【slotToExpunge】這裡還是給後面清理,從我們記住 的髒entry開始給後面遍歷entry,把enrty中key為null的value設定為空
  • 向前查找到了髒的enrty(✔),向後沒有發現可以替換的enrty(✖)
    •   這裡就直接插入資料,因為沒有hash衝突,然後我們向前查詢,找到【slotToExpunge】,然後從這裡再給遍歷把key為null的value設定為空
  • 向前查詢沒有髒的entry(✖),向後發現了可以替換的enrty(✔)
    •   那就找到衝突的hash然後替換。
  • 向前沒有找到髒的enrty(✖),向後沒有發現可以替換的enrty(✖)
    •   那直接插入即可
  1. 如果當前下標對應的entry中的key為null,則向前查詢是否還存在key為null的entry進行清理
  2. 通過線性探索解決hash衝突的問題
private void replaceStaleEntry(ThreadLocal<?> key, Object value,
                               int staleSlot) {
    Entry[] tab = table;
    int len = tab.length;
    Entry e;
    //這裡定義了一個需要開始清理的位置
    int slotToExpunge = staleSlot;
    //這裡向前查詢
    for (int i = prevIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = prevIndex(i, len))
        //找到前面的key為null,然後記錄開始查詢位置
        if (e.get() == null)
            slotToExpunge = i;

    //向下查詢
    for (int i = nextIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = nextIndex(i, len)) {
        ThreadLocal<?> k = e.get();


        //如果找到一個相同的key
        if (k == key) {
            //這裡就對他們進行替換
            e.value = value;

            tab[i] = tab[staleSlot];
            tab[staleSlot] = e;

            if (slotToExpunge == staleSlot)
                slotToExpunge = i;
            //這裡清理無效的key
            cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
            return;
        }


        if (k == null && slotToExpunge == staleSlot)
            slotToExpunge = i;
    }

    tab[staleSlot].value = null;
    tab[staleSlot] = new Entry(key, value);

    if (slotToExpunge != staleSlot)
        cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}

對無效的key進行清理

//這裡就是我們向前查到後找到的key為null的下標
private int expungeStaleEntry(int staleSlot) {
    Entry[] tab = table;
    int len = tab.length;

    tab[staleSlot].value = null;
    tab[staleSlot] = null;
    size--;

    Entry e;
    int i;
    //這裡迴圈查詢
    for (i = nextIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = nextIndex(i, len)) {
        ThreadLocal<?> k = e.get();
        //只要key為null,則設定他的value也為null
        if (k == null) {
            e.value = null;
            tab[i] = null;
            size--;
        } else {
            int h = k.threadLocalHashCode & (len - 1);
            if (h != i) {
                tab[i] = null;

              
                while (tab[h] != null)
                    h = nextIndex(h, len);
                tab[h] = e;
            }
        }
    }
    return i;
}

關於記憶體洩漏問題,是會有極端情況下產生,你我們每次用完後就使用remove即可,remove底層也是使用了線性探索對key和value進行移除

Fork/Join

它可以將一個大的任務拆分成多個子任務進行並行處理,最後將子任務結果合併成最後的計算結果,有點像Hadoop

【使用】

API:

ForkJoinTask:

  RecursiveAction:沒有返回結果的

  RecursiveTask:有返回結果的

  CountedCompleter:沒有返回值,但是任務觸發後會進行回撥

fork():讓task非同步執行

join():讓task同步通同步,等待獲得返回值

ForkJoinPool:專門用來執行ForkJoinTask的執行緒池

public class ForkJoinExample {

    //以200為一個單位進行拆分
    private static Integer separateLine = 10;

    static class CalcJoinTask extends RecursiveTask<Integer> {
        //子任務開始計算的數值
        private Integer startValue;
        //子任務開始結束計算的數值
        private Integer endValue;

        CalcJoinTask(Integer startValue, Integer endValue) {
            this.startValue = startValue;
            this.endValue = endValue;
        }

        @Override
        protected Integer compute() {
            // 如果當前的資料計算已經小於給定數值則不需要進行拆分,否則會進行不斷的拆分
            if (endValue - startValue < separateLine) {
                System.out.println("開始計算startValue:" + startValue + "endValue:" + endValue);
                return endValue+startValue;
            }
            //從開始位置到一半的位置
            CalcJoinTask calcJoinTask = new CalcJoinTask(startValue, (startValue + endValue) / 2);
            calcJoinTask.fork();
            //總數一半加一的地方,到最大的數值
            CalcJoinTask calcJoinTask1 = new CalcJoinTask((startValue + endValue) / 2 + 1, endValue);
            calcJoinTask1.fork();
            //這裡彙總總數
            return calcJoinTask.join() + calcJoinTask1.join();
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CalcJoinTask calcJoinTask = new CalcJoinTask(1, 100);
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Integer> result = forkJoinPool.submit(calcJoinTask);
        System.out.println("result"+result.get());
    }
}

工作佇列

我們看fork方法的底層是一個工作佇列【workQueue】,他會把當前的任務放在一個workQueue,他是一個【雙端佇列】(是一個兩端進行新增和操作元素的佇列),他它可以實現後進先出(fifo),每個執行緒中都私有這樣一個雙端佇列,這個佇列中儲存的就是fork新增進行去的任務,每次執行任務都從這個佇列中取,因為是後進先出,所以第一個元素就是最後push進去的元素,執行完成第一個元素,把執行的資料傳遞給下一個佇列中的元素。但是如果一個執行緒執行完成了他自己的任務,那怎麼辦,我們不能讓他閒著,所有這裡使用了一個工作竊取的演算法

工作竊取

如果某個執行緒的【workQueue】執行完成了,那我先不閒著,我從其他的佇列中去獲取一個任務,從他們的對尾獲取,這樣就不會衝突,因為其他執行緒是從隊頭獲取任務執行,這樣就提升了效能

如何應用到實際業務中(拆分多個rpc任務去查詢商品資訊)

在一個電商系統中需要查詢:商品資訊查詢(【RPC->】商品基本資訊查詢 & 商品評價資訊查詢) 店鋪資訊查詢 (【RPC->】銷售情況,店鋪基本資訊 )然後我們把每個查詢都給他變成一個task

這裡是所有的pojo類

/**
 * @author : lizi
 * @date : 2021-06-25 18:01
 * 商品資訊
 **/
public class Shop {
    String name;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
View Code

這裡是我們模擬的所有的rpc服務

/**
 * @author : lizi
 * @date : 2021-06-25 18:08
 * //商品評價資訊查詢
 **/
@Service
public  class CommentService extends AbstractLodeDataProcessor {
    @Override
    public void load(Context context) {
        //這裡是模擬rpc呼叫
        Comment comment=new Comment();
        comment.setContent("Glen");
        comment.setName("everything is ok");
        context.setComment(comment);
    }
}/**
 * @author : lizi
 * @date : 2021-06-25 18:11
 * 商品基本資訊查詢
 **/
@Service
public class ItemService  extends AbstractLodeDataProcessor {
    @Override
    public void load(Context context) {
        Item item=new Item();
        item.setNum(100);
        item.setProductName("battery");
        context.setItem(item);
    }
}
/**
 * @author : lizi
 * @date : 2021-06-25 20:25
 * 銷售情況
 **/
@Service
public class SellerService extends AbstractLodeDataProcessor {
    @Override
    public void load(Context context) {
        Seller seller=new Seller();
        seller.setTotalNum(1000);
        seller.setSellerNum(100);
        context.setSeller(seller);
    }
}
/**
 * @author : lizi
 * @date : 2021-06-25 20:27
 * 店鋪基本資訊
 **/
@Service
public class ShopService extends AbstractLodeDataProcessor {
    @Override
    public void load(Context context) {
        Shop shop=new Shop();
        shop.setName("Glen’s shop");
        context.setShop(shop);
    }
}
View Code

這裡進行所有任務的聚合

/**
 * @author : lizi
 * @date : 2021-06-25 20:24
 **/
@Service
public class ComplexTradeService extends AbstractLodeDataProcessor implements ApplicationContextAware {
    ApplicationContext applicationContext;
    List<AbstractLodeDataProcessor> abstractLodeDataProcessors=new ArrayList<>();
    @Override
    public void load(Context context) {
        abstractLodeDataProcessors.forEach(x->{
            x.setContext(this.context);
            //建立一個fork task
            x.fork();
        });
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext=applicationContext;
        abstractLodeDataProcessors.add(applicationContext.getBean(SellerService.class));
        abstractLodeDataProcessors.add(applicationContext.getBean(ShopService.class));
    }

    @Override
    public Context getContext() {
        this.abstractLodeDataProcessors.forEach(ForkJoinTask::join);
        return super.getContext();
    }
}
/**
 * @author : lizi
 * @date : 2021-06-25 20:14
 * 對於商品資訊的聚合任務 實現ApplicationContextAware去獲取上下文
 **/
@Service
public class ItemTaskForkJoinDataProcessor extends AbstractLodeDataProcessor implements ApplicationContextAware {
    ApplicationContext applicationContext;

    List<AbstractLodeDataProcessor> abstractLodeDataProcessors=new ArrayList<>();

    // 在這裡我們對於每個task進行一個fork
    @Override
    public void load(Context context) {
        abstractLodeDataProcessors.forEach(x->{
            x.setContext(this.context);
            //建立一個fork task
            x.fork();
        });
    }

    @Override
    public Context getContext() {
        //這裡獲取每個任務的等待
        this.abstractLodeDataProcessors.forEach(ForkJoinTask::join);
        return super.getContext();
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext=applicationContext;
        abstractLodeDataProcessors.add(applicationContext.getBean(CommentService.class));
        abstractLodeDataProcessors.add(applicationContext.getBean(ItemService.class));
        abstractLodeDataProcessors.add(applicationContext.getBean(ComplexTradeService.class));
    }
}
View Code

我們需要有一個上下文去儲存獲取的資訊

public class Context {
    public Comment getComment() {
        return comment;
    }

    public void setComment(Comment comment) {
        this.comment = comment;
    }

    public Item getItem() {
        return item;
    }

    public void setItem(Item item) {
        this.item = item;
    }

    public Seller getSeller() {
        return seller;
    }

    public void setSeller(Seller seller) {
        this.seller = seller;
    }

    public Shop getShop() {
        return shop;
    }

    public void setShop(Shop shop) {
        this.shop = shop;
    }

    Comment comment;
    Item item;
    Seller seller;
    Shop shop;

    @Override
    public String toString() {
        return "Context{" +
                "comment=" + comment +
                ", item=" + item +
                ", seller=" + seller +
                ", shop=" + shop +
                '}';
    }
}
View Code

定一個一個模板模式的基礎類,所有的任務類都需要繼承這個類,並且去實現模板方法

/**
 * @author : lizi
 * @date : 2021-06-25 18:03
 **/
//因為我們不需要返回值所以用 RecursiveAction
public abstract  class AbstractLodeDataProcessor extends RecursiveAction implements  ILodeDataProcessor {

    public void setContext(Context context) {
        this.context = context;
    }

    public Context getContext() {
        // 得到一個聚合的結果
        this.join();
        return context;
    }

    protected Context context;

    @Override
    protected void compute() {
        //呼叫子類的具體實現,【所有實現了AbstractLodeDataProcessor的子類都會實現load方法】
        load(context);
    }
}

並且我們需要一個介面,這樣方便進行管理,並且遵循相關的設計思想

public interface ILodeDataProcessor {
    //載入資料
    void  load( Context context);
}
View Code

這裡這是一個controller進行訪問測試

@RestController
public class IndexController {
    @Autowired
    ItemTaskForkJoinDataProcessor itemTaskForkJoinDataProcessor;

    @GetMapping("/say")
    public Context index() {
        Context context = new Context();
        itemTaskForkJoinDataProcessor.setContext(context);
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        forkJoinPool.submit(itemTaskForkJoinDataProcessor);
        return itemTaskForkJoinDataProcessor.getContext();
    }
}