
Java Synchronizers: The Underlying Implementation of ReentrantLock (Part 2)

Locks in Java fall into two broad categories: the synchronized lock, and the locks in the java.util.concurrent package (JUC locks). The synchronized lock is a capability of the Java language itself and is not covered here; this article focuses on the JUC ReentrantLock.
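As a quick reminder of the API under discussion, the canonical usage is lock() paired with unlock() in a finally block, so the lock is released even if the critical section throws:

import java.util.concurrent.locks.ReentrantLock;

class Counter {
    private final ReentrantLock lock = new ReentrantLock();
    private int count;

    void increment() {
        lock.lock();        // may block the calling thread; how it blocks is the subject of this article
        try {
            count++;
        } finally {
            lock.unlock();
        }
    }
}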

1. The JDK Layer

1.1 AbstractQueuedSynchronizer

ReentrantLock's lock() and unlock() APIs are in fact implemented by an internal synchronizer, Sync (note: not synchronized). Sync comes in two variants, FairSync and NonfairSync, which, as the names suggest, are the fair and nonfair versions.

When ReentrantLock.lock() is called, it simply delegates to the synchronizer's lock() method:

/** Synchronizer providing all implementation mechanics */
private final Sync sync;

/**
 * Base of synchronization control for this lock. Subclassed
 * into fair and nonfair versions below. Uses AQS state to
 * represent the number of holds on the lock.
 */
abstract static class Sync extends AbstractQueuedSynchronizer {
    ...
}

public void lock() {
    sync.lock();
}

So what is this sync? Sync extends AbstractQueuedSynchronizer (AQS), the cornerstone of the concurrent package. AQS itself implements no synchronization interface (such as lock, unlock, or countDown); rather, it defines a framework for controlling access to a shared resource (applying the template method design pattern). It provides acquire and release for exclusive acquisition and release of the resource, and acquireShared and releaseShared for shared acquisition and release. For example, acquire/release are used to implement ReentrantLock, while acquireShared/releaseShared are used to implement CountDownLatch and Semaphore. The skeleton of acquire looks like this:

/**
 * Acquires in exclusive mode, ignoring interrupts.  Implemented
 * by invoking at least once {@link #tryAcquire},
 * returning on success.  Otherwise the thread is queued, possibly
 * repeatedly blocking and unblocking, invoking {@link
 * #tryAcquire} until success.  This method can be used
 * to implement method {@link Lock#lock}.
 *
 * @param arg the acquire argument.  This value is conveyed to
 *        {@link #tryAcquire} but is otherwise uninterpreted and
 *        can represent anything you like.
 */
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

The overall logic: first attempt tryAcquire once. If it succeeds, there is nothing more to do and the caller simply continues. If it fails, execute addWaiter and then acquireQueued. tryAcquire() must be implemented by subclasses according to their own synchronization semantics, whereas acquireQueued() and addWaiter() are provided by AQS itself. addWaiter appends the current thread to the tail of AQS's internal synchronization queue; acquireQueued blocks the current thread while tryAcquire() keeps failing.
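To see the template method pattern in action before diving into the queue machinery, here is a sketch of a minimal AQS subclass (a simplified variant of the Mutex example in the AQS javadoc): the subclass supplies only tryAcquire/tryRelease, and AQS contributes all of the queuing and blocking discussed next.

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class Mutex {
    private static class Sync extends AbstractQueuedSynchronizer {
        protected boolean tryAcquire(int ignored) {
            return compareAndSetState(0, 1);   // state 0 = free, 1 = held
        }
        protected boolean tryRelease(int ignored) {
            setState(0);                       // non-reentrant: release unconditionally
            return true;
        }
    }
    private final Sync sync = new Sync();
    public void lock()   { sync.acquire(1); }
    public void unlock() { sync.release(1); }
}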

Now back to the AQS internals. The code of addWaiter:

/**
 * Creates and enqueues node for current thread and given mode.
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
private Node addWaiter(Node mode) {
    // Create a node, recording the associated thread and the mode (exclusive or shared)
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    // If the tail is non-null, the synchronization queue has already been initialized
    if (pred != null) {
        // Set the new node's predecessor to the current tail
        node.prev = pred;
        // CAS the new node in as the tail
        if (compareAndSetTail(pred, node)) {
            // Link the old tail's next to the new tail; the queue is thus a doubly linked list
            pred.next = node;
            return node;
        }
    }
    // The tail is null: the queue is uninitialized, so initialize the head node and enqueue
    enq(node);
    return node;
}

And the code of enq(node):

/**
 * Inserts node into queue, initializing if necessary. See picture above.
 * @param node the node to insert
 * @return node's predecessor
 */
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            // If tail is null, create a new head node and point both head and tail at it.
            // This head node is the "sentinel" (or "dummy") node; it is associated with no thread.
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // The second loop iteration takes this branch
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

After addWaiter completes, the synchronization queue is a doubly linked list: a sentinel head node at the front, and the newly added node at the tail.

The code of acquireQueued:

/**
 * Acquires in exclusive uninterruptible mode for thread already in
 * queue. Used by condition wait methods as well as acquire.
 *
 * @param node the node
 * @param arg the acquire argument
 * @return {@code true} if interrupted while waiting
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // Get the current node's predecessor
            final Node p = node.predecessor();
            // If the predecessor is the head node, this thread is first in line: try to acquire the lock
            if (p == head && tryAcquire(arg)) {
                // On success, make the current node the new head (sentinel) node
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

The logic of acquireQueued is:

If the current node is the first queued node in the synchronization queue (its predecessor is the head), it attempts to acquire the lock; on success, it becomes the new head (sentinel) node.

If the node is not first in line, or tryAcquire fails, it calls shouldParkAfterFailedAcquire, whose main job is to CAS the predecessor's wait status from its initial value to SIGNAL, meaning the current thread will block and wait for a SIGNAL wakeup. If the CAS fails, the infinite loop in acquireQueued retries until it succeeds, after which parkAndCheckInterrupt is called. parkAndCheckInterrupt suspends the current thread until it is woken. Its implementation relies on the layers below, which is the focus of this article; the following sections work through them layer by layer.
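For reference, parkAndCheckInterrupt (JDK 8) is just a thin wrapper over LockSupport.park:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);        // block the current thread
    return Thread.interrupted();   // report (and clear) the interrupt status
}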

1.2 ReentrantLock

Now let's see how ReentrantLock builds its semantics on top of AbstractQueuedSynchronizer.

Internally, ReentrantLock uses FairSync or NonfairSync, both subclasses of AQS. The main code of FairSync:

/**
 * Sync object for fair locks
 */
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }

    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

The single most important field in AQS is state; locks and synchronizers are all implemented around modifications of this field. One reason AQS can underpin so many different locks and synchronizers is that each of them may assign its own meaning to the synchronization state and override the corresponding tryAcquire, tryRelease, tryAcquireShared, and tryReleaseShared methods to manipulate that state.

Let's walk through the tryAcquire logic of ReentrantLock's FairSync:

  1. If state (private volatile int state) is 0, nobody holds the lock. Because this is a fair lock, the thread first checks via hasQueuedPredecessors that no other thread is queued ahead of it, and only then tries to set the state to 1. If that succeeds, it owns the lock. compareAndSetState is implemented with CAS; CAS is atomic and state is volatile, so updates to state are thread-safe.
  2. If state is not 0, check whether the current thread is already the owner of the lock; if so, increment state. If the increment overflows, an Error is thrown; otherwise return true, meaning the lock was acquired again.
  3. If neither case applies, return false: acquiring the lock failed.
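Which Sync variant is used is fixed at construction time; the boolean constructor argument is the entire difference between a fair and a nonfair ReentrantLock:

import java.util.concurrent.locks.ReentrantLock;

class LockChoice {
    static final ReentrantLock FAIR    = new ReentrantLock(true);  // uses FairSync
    static final ReentrantLock NONFAIR = new ReentrantLock();      // uses NonfairSync (the default)
}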

That wraps up the JAVA-level implementation. To summarize, the whole chain is: ReentrantLock.lock() → sync.lock() → AQS.acquire() → tryAcquire()/addWaiter()/acquireQueued() → parkAndCheckInterrupt() → LockSupport.park() → Unsafe.park().

For reasons of space, the unlock implementation is not discussed. The remainder of this article analyzes how the lock path suspends the current thread, that is, how Unsafe.park() is implemented.
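The bridge between AQS and the JVM is LockSupport: parkAndCheckInterrupt calls LockSupport.park(this), which in JDK 8 records the blocker (for monitoring tools) and then invokes the native Unsafe.park:

public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);        // record what we are waiting on
    UNSAFE.park(false, 0L);        // the native call analyzed below
    setBlocker(t, null);
}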

2. The JVM Layer

Unsafe.park and Unsafe.unpark are native methods of sun.misc.Unsafe:

public native void unpark(Object var1);

public native void park(boolean var1, long var2);

These two methods are implemented in the JVM, in hotspot/src/share/vm/prims/unsafe.cpp:

UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))
  UnsafeWrapper("Unsafe_Park");
  EventThreadPark event;
#ifndef USDT2
  HS_DTRACE_PROBE3(hotspot, thread__park__begin, thread->parker(), (int) isAbsolute, time);
#else /* USDT2 */
   HOTSPOT_THREAD_PARK_BEGIN(
                             (uintptr_t) thread->parker(), (int) isAbsolute, time);
#endif /* USDT2 */
  JavaThreadParkedState jtps(thread, time != 0);
  thread->parker()->park(isAbsolute != 0, time);
#ifndef USDT2
  HS_DTRACE_PROBE1(hotspot, thread__park__end, thread->parker());
#else /* USDT2 */
  HOTSPOT_THREAD_PARK_END(
                          (uintptr_t) thread->parker());
#endif /* USDT2 */
  if (event.should_commit()) {
    const oop obj = thread->current_park_blocker();
    if (time == 0) {
      post_thread_park_event(&event, obj, min_jlong, min_jlong);
    } else {
      if (isAbsolute != 0) {
        post_thread_park_event(&event, obj, min_jlong, time);
      } else {
        post_thread_park_event(&event, obj, time, min_jlong);
      }
    }
  }
UNSAFE_END

The core logic is thread->parker()->park(isAbsolute != 0, time): fetch the Java thread's Parker object and call its park method. Every Java thread has a Parker instance, and the Parker class is defined like this:

class Parker : public os::PlatformParker {
private:
  volatile int _counter ;
  ...
public:
  void park(bool isAbsolute, jlong time);
  void unpark();
  ...
}
class PlatformParker : public CHeapObj<mtInternal> {
  protected:
    enum {
        REL_INDEX = 0,
        ABS_INDEX = 1
    };
    int _cur_index;  // which cond is in use: -1, 0, 1
    pthread_mutex_t _mutex [1] ;
    pthread_cond_t  _cond  [2] ; // one for relative times and one for abs.

  public:       // TODO-FIXME: make dtor private
    ~PlatformParker() { guarantee (0, "invariant") ; }

  public:
    PlatformParker() {
      int status;
      status = pthread_cond_init (&_cond[REL_INDEX], os::Linux::condAttr());
      assert_status(status == 0, status, "cond_init rel");
      status = pthread_cond_init (&_cond[ABS_INDEX], NULL);
      assert_status(status == 0, status, "cond_init abs");
      status = pthread_mutex_init (_mutex, NULL);
      assert_status(status == 0, status, "mutex_init");
      _cur_index = -1; // mark as unused
    }
};

The park method:

void Parker::park(bool isAbsolute, jlong time) {
  // Return immediately if a permit is available.
  // We depend on Atomic::xchg() having full barrier semantics
  // since we are doing a lock-free update to _counter.
  if (Atomic::xchg(0, &_counter) > 0) return;

  Thread* thread = Thread::current();
  assert(thread->is_Java_thread(), "Must be JavaThread");
  JavaThread *jt = (JavaThread *)thread;

  if (Thread::is_interrupted(thread, false)) {
    return;
  }

  // Next, demultiplex/decode time arguments
  timespec absTime;
  if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all
    return;
  }
  if (time > 0) {
    unpackTime(&absTime, isAbsolute, time);
  }

  // Enter the safepoint region and change the thread state to blocked
  ThreadBlockInVM tbivm(jt);
  // Don't wait if cannot get lock since interference arises from
  // unblocking.  Also. check interrupt before trying wait
  if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
    // The thread was interrupted, or trylock on the mutex failed (e.g. it is held by another thread): return immediately
    return;
  }
  // Getting here means pthread_mutex_trylock(_mutex) succeeded
  int status ;
  if (_counter > 0)  { // no wait needed
    _counter = 0;
    status = pthread_mutex_unlock(_mutex);
    assert (status == 0, "invariant") ;
    OrderAccess::fence();
    return;
  }

#ifdef ASSERT
  // Don't catch signals while blocked; let the running threads have the signals.
  // (This allows a debugger to break into the running thread.)
  sigset_t oldsigs;
  sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals();
  pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs);
#endif
  OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
  jt->set_suspend_equivalent();
  // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()

  assert(_cur_index == -1, "invariant");
  if (time == 0) {
    _cur_index = REL_INDEX; // arbitrary choice when not timed
    status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;
  } else {
    _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
    status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;
    if (status != 0 && WorkAroundNPTLTimedWaitHang) {
      pthread_cond_destroy (&_cond[_cur_index]) ;
      pthread_cond_init    (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr());
    }
  }
  _cur_index = -1;
  assert_status(status == 0 || status == EINTR ||
                status == ETIME || status == ETIMEDOUT,
                status, "cond_timedwait");

#ifdef ASSERT
  pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);
#endif

  _counter = 0 ;
  status = pthread_mutex_unlock(_mutex) ;
  assert_status(status == 0, status, "invariant") ;
  // Paranoia to ensure our locked and lock-free paths interact
  // correctly with each other and Java-level accesses.
  OrderAccess::fence();

  // If externally suspended while waiting, re-suspend
  if (jt->handle_special_suspend_equivalent_condition()) {
    jt->java_suspend_self();
  }
}

The idea behind park: the Parker has a key field, _counter, which records the so-called "permit". When _counter is greater than 0, a permit is available; the thread sets _counter to 0 (thereby consuming the permit) and continues executing. If _counter is not greater than 0, the thread waits for that condition to become true.

Let's now walk through the implementation of park step by step:

  1. When park is called, it first tries to grab the permit directly: if _counter > 0, it sets _counter to 0 and returns.
  2. Otherwise, it sets the thread state to _thread_in_vm and then _thread_blocked. _thread_in_vm means the thread is currently executing inside the JVM; _thread_blocked means it is blocked.
  3. After acquiring the mutex, it checks _counter again: if it is now > 0, it sets _counter to 0, unlocks the mutex, and returns.
  4. If _counter is still not greater than 0, it checks whether the wait time is 0 and calls the corresponding pthread_cond_wait-family function to wait. When the wait returns (i.e. someone unparked the thread, notifying it via pthread_cond_signal), it sets _counter to 0, unlocks the mutex, and returns.
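The permit semantics are easy to observe from the Java side through LockSupport; note that unpark may even happen before park, in which case the permit is banked and park returns immediately. A minimal demo:

import java.util.concurrent.locks.LockSupport;

public class ParkDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(() -> {
            System.out.println("parking");
            LockSupport.park();        // consumes the permit, or blocks while _counter == 0
            System.out.println("unparked");
        });
        t.start();
        Thread.sleep(100);
        LockSupport.unpark(t);         // makes the permit available (_counter = 1)
        t.join();
    }
}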

So, at bottom, LockSupport.park is implemented with the pthread library's condition variable, pthread_cond_t. Let's now look at how pthread_cond_t itself is implemented.

3. The GLIBC Layer

A typical use of pthread_cond_t looks like this:

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; /* initialize the mutex */
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;    /* initialize the condition variable */

void *thread1(void *);
void *thread2(void *);

int i = 1;
int main(void)
{
        pthread_t t_a;
        pthread_t t_b;
        pthread_create(&t_a, NULL, thread1, (void *)NULL); /* create thread t_a */
        pthread_create(&t_b, NULL, thread2, (void *)NULL); /* create thread t_b */
        pthread_join(t_b, NULL); /* wait for thread t_b to finish */
        pthread_mutex_destroy(&mutex);
        pthread_cond_destroy(&cond);
        exit(0);
}
void *thread1(void *junk)
{
        for (i = 1; i <= 9; i++)
        {
             pthread_mutex_lock(&mutex); /* lock the mutex */
             if (i % 3 == 0)
                 pthread_cond_signal(&cond); /* condition changed: signal thread t_b */
              else
                  printf("thread1:%d\n", i);
              pthread_mutex_unlock(&mutex); /* unlock the mutex */
              printf("Up Unlock Mutex\n");
              sleep(1);
        }
        return NULL;
}
void *thread2(void *junk)
{
        while (i < 9)
        {
            pthread_mutex_lock(&mutex);
            if (i % 3 != 0)
                   pthread_cond_wait(&cond, &mutex); /* wait */
            printf("thread2:%d\n", i);
            pthread_mutex_unlock(&mutex);
            printf("Down Unlock Mutex\n");
            sleep(1);
         }
         return NULL;
}

The key point: both pthread_cond_wait and pthread_cond_signal must be preceded by pthread_mutex_lock. Without this protection a race condition can arise and a signal can be missed. pthread_cond_wait() automatically releases the mutex as it enters the wait state, and when another thread wakes it via pthread_cond_signal or pthread_cond_broadcast, making pthread_cond_wait() return, it automatically re-acquires the mutex. The whole handshake is: lock the mutex, check the condition, wait (atomically releasing the mutex), get signalled, re-acquire the mutex, re-check, unlock.
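The same protocol appears one level up in Java's own Lock/Condition API, which makes it easy to demonstrate in this article's language — a minimal sketch (the Box class and its fields are invented for illustration):

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class Box {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition nonEmpty = lock.newCondition();
    private Object item;

    void put(Object x) {
        lock.lock();                 // must hold the lock to signal
        try {
            item = x;
            nonEmpty.signal();       // like pthread_cond_signal
        } finally {
            lock.unlock();
        }
    }

    Object take() throws InterruptedException {
        lock.lock();                 // must hold the lock to wait
        try {
            while (item == null)     // re-check guards against spurious wakeups
                nonEmpty.await();    // releases the lock while waiting, like pthread_cond_wait
            Object x = item;
            item = null;
            return x;
        } finally {
            lock.unlock();
        }
    }
}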

3.1 pthread_mutex_lock

On Linux this is built on a mechanism called Futex (short for "fast userspace mutex"). Under this scheme, the atomic test-and-modify operation on the mutex word is performed in user space. If the result shows there is no contention on the lock, the call to pthread_mutex_lock returns without any context switch into the kernel, so acquiring an uncontended mutex is very fast. Only when contention is detected does the system call (named futex) happen; it context-switches into the kernel and puts the calling thread to sleep until the mutex is released. There are many more details, especially around robust and/or priority-inheritance mutexes, but that is the essence of it.

Here is the source, from glibc's nptl/pthread_mutex_lock.c:

int
PTHREAD_MUTEX_LOCK (pthread_mutex_t *mutex)
{
  /* See concurrency notes regarding mutex type which is loaded from __kind
     in struct __pthread_mutex_s in sysdeps/nptl/bits/thread-shared-types.h.  */
  unsigned int type = PTHREAD_MUTEX_TYPE_ELISION (mutex);

  LIBC_PROBE (mutex_entry, 1, mutex);

  if (__builtin_expect (type & ~(PTHREAD_MUTEX_KIND_MASK_NP
                                 | PTHREAD_MUTEX_ELISION_FLAGS_NP), 0))
    return __pthread_mutex_lock_full (mutex);

  if (__glibc_likely (type == PTHREAD_MUTEX_TIMED_NP))
    {
      FORCE_ELISION (mutex, goto elision);
    simple:
      /* Normal mutex.  */
      LLL_MUTEX_LOCK_OPTIMIZED (mutex);
      assert (mutex->__data.__owner == 0);
    }
#if ENABLE_ELISION_SUPPORT
  else if (__glibc_likely (type == PTHREAD_MUTEX_TIMED_ELISION_NP))
    {
  elision: __attribute__((unused))
      /* This case can never happen on a system without elision,
         as the mutex type initialization functions will not
         allow to set the elision flags.  */
      /* Don't record owner or users for elision case.  This is a
         tail call.  */
      return LLL_MUTEX_LOCK_ELISION (mutex);
    }
#endif
  else if (__builtin_expect (PTHREAD_MUTEX_TYPE (mutex)
                             == PTHREAD_MUTEX_RECURSIVE_NP, 1))
    {
      /* Recursive mutex.  */
      pid_t id = THREAD_GETMEM (THREAD_SELF, tid);

      /* Check whether we already hold the mutex.  */
      if (mutex->__data.__owner == id)
        {
          /* Just bump the counter.  */
          if (__glibc_unlikely (mutex->__data.__count + 1 == 0))
            /* Overflow of the counter.  */
            return EAGAIN;

          ++mutex->__data.__count;

          return 0;
        }

      /* We have to get the mutex.  */
      LLL_MUTEX_LOCK_OPTIMIZED (mutex);

      assert (mutex->__data.__owner == 0);
      mutex->__data.__count = 1;
    }
  else if (__builtin_expect (PTHREAD_MUTEX_TYPE (mutex)
                          == PTHREAD_MUTEX_ADAPTIVE_NP, 1))
    {
      if (LLL_MUTEX_TRYLOCK (mutex) != 0)
        {
          int cnt = 0;
          int max_cnt = MIN (max_adaptive_count (),
                             mutex->__data.__spins * 2 + 10);
          do
            {
              if (cnt++ >= max_cnt)
                {
                  LLL_MUTEX_LOCK (mutex);
                  break;
                }
              atomic_spin_nop ();
            }
          while (LLL_MUTEX_TRYLOCK (mutex) != 0);

          mutex->__data.__spins += (cnt - mutex->__data.__spins) / 8;
        }
      assert (mutex->__data.__owner == 0);
    }
  else
    {
      pid_t id = THREAD_GETMEM (THREAD_SELF, tid);
      assert (PTHREAD_MUTEX_TYPE (mutex) == PTHREAD_MUTEX_ERRORCHECK_NP);
      /* Check whether we already hold the mutex.  */
      if (__glibc_unlikely (mutex->__data.__owner == id))
        return EDEADLK;
      goto simple;
    }

  pid_t id = THREAD_GETMEM (THREAD_SELF, tid);

  /* Record the ownership.  */
  mutex->__data.__owner = id;
#ifndef NO_INCR
  ++mutex->__data.__nusers;
#endif

  LIBC_PROBE (mutex_acquired, 1, mutex);

  return 0;
}

pthread_mutex_t is defined as follows:

typedef union
{
  struct __pthread_mutex_s
  {
    int __lock;
    unsigned int __count;
    int __owner;
    unsigned int __nusers;
    int __kind;
    int __spins;
    __pthread_list_t __list;
  } __data;
  ......
} pthread_mutex_t;

The __kind field encodes the type of the lock, with these possible values:

/* Mutex types.  */
enum
{ 
  PTHREAD_MUTEX_TIMED_NP,
  PTHREAD_MUTEX_RECURSIVE_NP,
  PTHREAD_MUTEX_ERRORCHECK_NP,
  PTHREAD_MUTEX_ADAPTIVE_NP
#if defined __USE_UNIX98 || defined __USE_XOPEN2K8
  ,
  PTHREAD_MUTEX_NORMAL = PTHREAD_MUTEX_TIMED_NP,
  PTHREAD_MUTEX_RECURSIVE = PTHREAD_MUTEX_RECURSIVE_NP,
  PTHREAD_MUTEX_ERRORCHECK = PTHREAD_MUTEX_ERRORCHECK_NP,
  PTHREAD_MUTEX_DEFAULT = PTHREAD_MUTEX_NORMAL
#endif
#ifdef __USE_GNU
  /* For compatibility.  */
  , PTHREAD_MUTEX_FAST_NP = PTHREAD_MUTEX_TIMED_NP
#endif
};

Where:

  • PTHREAD_MUTEX_TIMED_NP: the default value, i.e. an ordinary mutex.
  • PTHREAD_MUTEX_RECURSIVE_NP: a recursive (reentrant) mutex; the same thread may successfully acquire the same lock multiple times, releasing it with a matching number of unlocks.
  • PTHREAD_MUTEX_ERRORCHECK_NP: an error-checking mutex; if the same thread requests the same lock again, EDEADLK is returned; otherwise it behaves like PTHREAD_MUTEX_TIMED_NP.
  • PTHREAD_MUTEX_ADAPTIVE_NP: an adaptive mutex, a hybrid of a spin lock and an ordinary mutex.
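ReentrantLock is the Java counterpart of the recursive flavor: as we saw in tryAcquire, the hold count lives in the AQS state field. A quick demo:

import java.util.concurrent.locks.ReentrantLock;

public class ReentrancyDemo {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock();                              // same thread: AQS state goes 1 -> 2
        System.out.println(lock.getHoldCount());  // prints 2
        lock.unlock();
        lock.unlock();                            // fully released only after the second unlock
    }
}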

A mutex defaults to PTHREAD_MUTEX_TIMED_NP, so execution reaches LLL_MUTEX_LOCK_OPTIMIZED, which is a macro:

# define LLL_MUTEX_LOCK_OPTIMIZED(mutex) lll_mutex_lock_optimized (mutex)
lll_mutex_lock_optimized (pthread_mutex_t *mutex)
{
  /* The single-threaded optimization is only valid for private
     mutexes.  For process-shared mutexes, the mutex could be in a
     shared mapping, so synchronization with another process is needed
     even without any threads.  If the lock is already marked as
     acquired, POSIX requires that pthread_mutex_lock deadlocks for
     normal mutexes, so skip the optimization in that case as
     well.  */
  int private = PTHREAD_MUTEX_PSHARED (mutex);
  if (private == LLL_PRIVATE && SINGLE_THREAD_P && mutex->__data.__lock == 0)
    mutex->__data.__lock = 1;
  else
    lll_lock (mutex->__data.__lock, private);
}

In a multi-threaded process the single-threaded shortcut does not apply, so we fall through to lll_lock, which is also a macro:

#define lll_lock(futex, private)
  __lll_lock (&(futex), private)

Note that futex has now appeared; the rest of this article revolves around it.

#define __lll_lock(futex, private)
  ((void)
   ({
     int *__futex = (futex);
     if (__glibc_unlikely
         (atomic_compare_and_exchange_bool_acq (__futex, 1, 0)))
       {
         if (__builtin_constant_p (private) && (private) == LLL_PRIVATE)
           __lll_lock_wait_private (__futex);
         else
           __lll_lock_wait (__futex, private);
       }
   }))

Here, atomic_compare_and_exchange_bool_acq atomically attempts to change __futex (that is, mutex->__data.__lock) from 0 to 1. If that succeeds, we return right away; if it fails, __lll_lock_wait is called, with the following code:

void
__lll_lock_wait(int*futex, int private)
{
  if (atomic_load_relaxed (futex) == 2)
    goto futex;

  while (atomic_exchange_acquire (futex, 2) != 0)
    {
    futex:
      LIBC_PROBE (lll_lock_wait, 1, futex);
      futex_wait ((unsigned int *) futex, 2, private); /* Wait if *futex == 2.  */
    }
}

Some background first: pthread defines three lock states for a futex:

  • 0: the lock is free; it can be taken quickly in user space, with no need to enter the kernel.
  • 1: a thread holds the lock; if another thread wants it now, that thread must mark the futex as "contended" and enter the kernel via the futex system call to suspend itself.
  • 2: the lock is contended; other threads are queued, or about to queue, in the kernel's futex wait queue.

So, having failed to take the lock and arrived in __lll_lock_wait, we first check whether the futex equals 2. If it does, everyone is already queuing, so just join the queue (jump straight to futex_wait). If it does not, this thread is the first contender: it sets the futex to 2, telling later arrivals to queue up, and then leads by example and queues itself.
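To make the protocol concrete, here is a minimal Java sketch of the same 0/1/2 state machine, with an AtomicInteger standing in for the futex word and LockSupport.park/unpark standing in for futex_wait/futex_wake. The class and field names are invented for illustration; this is not glibc's code, and a real futex keeps its wait queue in the kernel:

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

class TriStateMutex {
    // 0 = free, 1 = locked with no waiters, 2 = locked and contended
    private final AtomicInteger word = new AtomicInteger(0);
    private final ConcurrentLinkedQueue<Thread> waiters = new ConcurrentLinkedQueue<>();

    void lock() {
        if (word.compareAndSet(0, 1))
            return;                          // fast path: 0 -> 1, no "kernel" involved
        while (word.getAndSet(2) != 0) {     // mark contended, like __lll_lock_wait
            waiters.add(Thread.currentThread());
            if (word.get() == 2)             // mimic futex_wait's "wait only if *futex == 2"
                LockSupport.park(this);
            waiters.remove(Thread.currentThread());
        }
    }

    void unlock() {
        if (word.getAndSet(0) == 2) {        // wake someone only if the lock was contended
            Thread t = waiters.peek();
            if (t != null)
                LockSupport.unpark(t);       // stands in for futex_wake
        }
    }
}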

futex_wait essentially issues the futex system call. In Section 4 we analyze this system call in detail.

3.2 pthread_cond_wait

It too ultimately bottoms out in the futex system call; for reasons of space it is not expanded here.

4. The Kernel Layer

Why does futex exist? What problem does it solve, and when was it added to the kernel?

In short, futex's approach is: in the uncontended case, everything happens in user space with no system call at all; only when contention occurs does the thread enter the kernel for the corresponding handling (wait or wake up). Futex is therefore a hybrid user-mode/kernel-mode synchronization mechanism that needs both modes cooperating to work. The futex variable lives in user space rather than being a kernel object, and the futex code is likewise split into a user-mode part and a kernel-mode part: the uncontended path runs in user mode, while contention is handled in kernel mode through the sys_futex system call.

The user-space half was covered above; this section focuses on the kernel half of futex.

futex is designed around three basic data structures: futex_hash_bucket, futex_key, and futex_q:

struct futex_hash_bucket {
        atomic_t waiters;
        spinlock_t lock;
        struct plist_head chain;
} ____cacheline_aligned_in_smp;
struct futex_q {
        struct plist_node list;
        struct task_struct *task;
        spinlock_t *lock_ptr;
        union futex_key key;   /* key uniquely identifying uaddr */
        struct futex_pi_state *pi_state;
        struct rt_mutex_waiter *rt_waiter;
        union futex_key *requeue_pi_key;
        u32 bitset;
};
union futex_key { 
        struct {
                unsigned long pgoff;
                struct inode *inode;
                int offset;
        } shared;
        struct {
                unsigned long address;
                struct mm_struct *mm;
                int offset;
        } private; 
        struct {
                unsigned long word;
                void *ptr;
                int offset;
        } both;
};

There is also a struct __futex_data, shown below, which holds the global futex hash table:

static struct {
        struct futex_hash_bucket *queues;
        unsigned long            hashsize;
} __futex_data __read_mostly __aligned(2*sizeof(long));

#define futex_queues   (__futex_data.queues)
#define futex_hashsize (__futex_data.hashsize)

When futex initializes (futex_init), the hash size is determined as roundup_pow_of_two(256 * num_possible_cpus()); for example, with 24 CPUs, hashsize = roundup_pow_of_two(6144) = 8192.

It then calls alloc_large_system_hash with this hashsize to allocate the array and initialize the relevant fields of each element, such as plist_head and lock:

static int __init futex_init(void)
{
        unsigned int futex_shift;
        unsigned long i;

#if CONFIG_BASE_SMALL
        futex_hashsize = 16;
#else
        futex_hashsize = roundup_pow_of_two(256 * num_possible_cpus());
#endif

        futex_queues = alloc_large_system_hash("futex", sizeof(*futex_queues),
                                               futex_hashsize, 0,
                                               futex_hashsize < 256 ? HASH_SMALL : 0,
                                               &futex_shift, NULL,
                                               futex_hashsize, futex_hashsize);
        futex_hashsize = 1UL << futex_shift;

        futex_detect_cmpxchg();

        for (i = 0; i < futex_hashsize; i++) {
                atomic_set(&futex_queues[i].waiters, 0);
                plist_head_init(&futex_queues[i].chain);
                spin_lock_init(&futex_queues[i].lock);
        }

        return 0;
}

The relationship between these structures: the global futex_queues array holds the futex_hash_buckets; each bucket's chain is a priority list of futex_q entries; and each futex_q identifies its uaddr through a futex_key and its sleeping thread through task.

With the data structures in mind, the flow is easy to follow. The overall flow of futex_wait:

static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val,
                      ktime_t *abs_time, u32 bitset)
{
        struct hrtimer_sleeper timeout, *to = NULL;
        struct restart_block *restart;
        struct futex_hash_bucket *hb;
        struct futex_q q = futex_q_init;
        int ret;

        if (!bitset)
                return -EINVAL;
        q.bitset = bitset;

        if (abs_time) {
                to = &timeout;
                hrtimer_init_on_stack(&to->timer, (flags & FLAGS_CLOCKRT) ?
                                      CLOCK_REALTIME : CLOCK_MONOTONIC,
                                      HRTIMER_MODE_ABS);
                hrtimer_init_sleeper(to, current);
                hrtimer_set_expires_range_ns(&to->timer, *abs_time,
                                             current->timer_slack_ns);
        }

retry:
        /*
         * Prepare to wait on uaddr. On success, holds hb lock and increments
         * q.key refs.
         */
        ret = futex_wait_setup(uaddr, val, flags, &q, &hb);
        if (ret)
                goto out;

        /* queue_me and wait for wakeup, timeout, or a signal. */
        futex_wait_queue_me(hb, &q, to);

        /* If we were woken (and unqueued), we succeeded, whatever. */
        ret = 0;
        /* unqueue_me() drops q.key ref */
        if (!unqueue_me(&q))
                goto out;
        ret = -ETIMEDOUT;
        if (to && !to->task)
                goto out;

        /*
         * We expect signal_pending(current), but we might be the
         * victim of a spurious wakeup as well.
         */
        if (!signal_pending(current))
                goto retry;

        ret = -ERESTARTSYS;
        if (!abs_time)
                goto out;

        restart = &current->restart_block;
        restart->fn = futex_wait_restart;
        restart->futex.uaddr = uaddr;
        restart->futex.val = val;
        restart->futex.time = *abs_time;
        restart->futex.bitset = bitset;
        restart->futex.flags = flags | FLAGS_HAS_TIMEOUT;

        ret = -ERESTART_RESTARTBLOCK;

out:
        if (to) {
                hrtimer_cancel(&to->timer);
                destroy_hrtimer_on_stack(&to->timer);
        }
        return ret;
}

The function futex_wait_setup does two main things: first, it hashes uaddr to locate the futex_hash_bucket and takes the spinlock on it; second, it checks whether *uaddr still holds the expected value. If it does not, futex_wait_setup returns immediately and user space retries the trylock.

/*
 * futex_wait_setup() - Prepare to wait on a futex
 * @uaddr:      the futex userspace address
 * @val:        the expected value
 * @flags:      futex flags (FLAGS_SHARED, etc.)
 * @q:          the associated futex_q
 * @hb:         storage for hash_bucket pointer to be returned to caller
 *
 * Setup the futex_q and locate the hash_bucket.  Get the futex value and
 * compare it with the expected value.  Handle atomic faults internally.
 * Return with the hb lock held and a q.key reference on success, and unlocked
 * with no q.key reference on failure.
 *
 * Return:
 *  -  0 - uaddr contains val and hb has been locked;
 *  - <1 - -EFAULT or -EWOULDBLOCK (uaddr does not contain val) and hb is unlocked
 */
static int futex_wait_setup(u32 __user *uaddr, u32 val, unsigned int flags,
                           struct futex_q *q, struct futex_hash_bucket **hb)
{
        u32 uval;
        int ret;        
retry:
        /* Initialize the futex_q: store uaddr's identity into the futex_key;
           futex_wake will later find this futex through the same key. */
        ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &q->key, VERIFY_READ);
        if (unlikely(ret != 0))
                return ret;

retry_private:
        /* Hash the key, then find the corresponding futex_hash_bucket in the array */
        *hb = queue_lock(q);
        /* Atomically read the value at uaddr into uval */
        ret = get_futex_value_locked(&uval, uaddr);

        if (ret) {
                queue_unlock(*hb);

                ret = get_user(uval, uaddr);
                if (ret)
                        goto out;

                if (!(flags & FLAGS_SHARED))
                        goto retry_private;

                put_futex_key(&q->key);
                goto retry;
        }
        /* If the value at uaddr no longer equals val, some other task has
           modified it and the wait condition no longer holds, so return
           without blocking. */
        if (uval != val) {
                queue_unlock(*hb);
                ret = -EWOULDBLOCK;
        }

out:
        if (ret)
                put_futex_key(&q->key);
        return ret;
}

Then futex_wait_queue_me is called to suspend the current task:

/**
 * futex_wait_queue_me() - queue_me() and wait for wakeup, timeout, or signal
 * @hb:         the futex hash bucket, must be locked by the caller
 * @q:          the futex_q to queue up on
 * @timeout:    the prepared hrtimer_sleeper, or null for no timeout
 */
static void futex_wait_queue_me(struct futex_hash_bucket *hb, struct futex_q *q,
                                struct hrtimer_sleeper *timeout)
{
        /*
         * The task state is guaranteed to be set before another task can
         * wake it. set_current_state() is implemented using smp_store_mb() and
         * queue_me() calls spin_unlock() upon completion, both serializing
         * access to the hash list and forcing another memory barrier.
         */
        set_current_state(TASK_INTERRUPTIBLE);
        queue_me(q, hb);

        /* Arm the timer */
        if (timeout)
                hrtimer_start_expires(&timeout->timer, HRTIMER_MODE_ABS);

        /*
         * If we have been removed from the hash list, then another task
         * has tried to wake us, and we can skip the call to schedule().
         */
        if (likely(!plist_node_empty(&q->list))) {
                /*
                 * If the timer has already expired, current will already be
                 * flagged for rescheduling. Only call schedule if there
                 * is no timeout, or if it has yet to expire.
                 */
                if (!timeout || timeout->task)
                        freezable_schedule();
        }
        __set_current_state(TASK_RUNNING);
}

futex_wait_queue_me does three main things:

  1. Insert the current task into the wait queue, i.e. hang the futex_q on the futex_hash_bucket.
  2. Arm the timer, if a timeout was supplied.
  3. Voluntarily invoke the kernel scheduler to give up the CPU.