muduo_base程式碼剖析之Thread/Mutex/MutexLockGuard /Condition/CountDownLatch
阿新 • • 發佈:2018-11-26
(1) 執行緒:Thread、CurrentThread
Thread.h、Thread.cc程式碼解讀
功能介紹:
- 建立執行緒時,把執行緒函式當作形參傳遞進去:muduo::Thread t(std::bind(threadFunc2, 42), “thread for free function with argument”);
說明:執行緒形參func_是void func()型別的,可以使用boost::bind進行適配 - 啟動執行緒,使執行緒回撥執行緒函式func_:t.start()
- 使執行緒不先退出:t.join
原始碼分析:Thread類
class Thread : noncopyable
{
public:
typedef std::function<void ()> ThreadFunc; //執行緒函式型別
//建立執行緒時,形參:func是執行緒函式,當start()時,func()開始執行
explicit Thread(ThreadFunc func, const string& n)
: started_(false),joined_(false),pthreadId_(0),tid_(0),
func_(std::move(func)), //執行緒函式
name_ (n),latch_(1)
{
setDefaultName();
}
~Thread();
void start() //最後呼叫了傳進來的func()函式
{//呼叫順序:start()-->pthread_create()-->startThread()-->runInThread()-->func_()
assert(!started_);
started_ = true;
// FIXME: move(func_)
detail::ThreadData* data = new detail::ThreadData( func_, name_, &tid_, &latch_);
if (pthread_create(&pthreadId_, NULL, &detail::startThread, data))
{
started_ = false;
delete data; // or no delete?
LOG_SYSFATAL << "Failed in pthread_create";
}
else
{
latch_.wait();
assert(tid_ > 0);
}
}
int join();
bool started() const { return started_; }
pid_t tid() const { return tid_; }
const string& name() const { return name_; }
static int numCreated() { return numCreated_.get(); }
private:
void setDefaultName();
bool started_;
bool joined_;
pthread_t pthreadId_;
pid_t tid_;
ThreadFunc func_; //執行緒函式
string name_;
CountDownLatch latch_;
static AtomicInt32 numCreated_;
};
測試程式碼 ./test/Thread_test.c
#include <muduo/base/Thread.h>
#include <muduo/base/CurrentThread.h>
#include <string>
#include <stdio.h>
#include <unistd.h>
void threadFunc()
{
printf("tid=%d\n", muduo::CurrentThread::tid());
}
void threadFunc2(int x)
{
printf("tid=%d, x=%d\n", muduo::CurrentThread::tid(), x);
}
class Foo
{
public:
explicit Foo(double x)
: x_(x)
{
}
void memberFunc()
{
printf("tid=%d, Foo::x_=%f\n", muduo::CurrentThread::tid(), x_);
}
void memberFunc2(const std::string& text)
{
printf("tid=%d, Foo::x_=%f, text=%s\n", muduo::CurrentThread::tid(), x_, text.c_str());
}
private:
double x_;
};
int main()
{
printf("pid=%d, tid=%d\n", ::getpid(), muduo::CurrentThread::tid());
//threadFunc無參,無需適配
muduo::Thread t1(threadFunc);
t1.start();
printf("t1.tid=%d\n", t1.tid());
t1.join();
//threadFunc一個引數,需適配
muduo::Thread t2(std::bind(threadFunc2, 42),
"thread for free function with argument");
t2.start();
printf("t2.tid=%d\n", t2.tid());
t2.join();
//Foo物件的成員函式memberFunc
Foo foo(87.53);
muduo::Thread t3(std::bind(&Foo::memberFunc, &foo),
"thread for member function without argument");
t3.start();
t3.join();
//Foo物件的成員函式memberFunc2
muduo::Thread t4(std::bind(&Foo::memberFunc2, std::ref(foo), std::string("Shuo Chen")));
t4.start();
t4.join();
printf("number of created threads %d\n", muduo::Thread::numCreated());
}
CurrentThread.h
namespace CurrentThread
{
# __thread修飾的變數是執行緒區域性儲存的,每個執行緒都有一份
__thread int t_cachedTid = 0; # 執行緒真實pid(tid)的快取,是為
#了提高獲取tid的效率,較少::syscall(SYS_gettid)系統呼叫的次數
__thread char t_tidString[32]; # tid的字串表示形式
__thread int t_tidStringLength = 6;
__thread const char* t_threadName = "unknown"; #每個執行緒的名稱
(2)互斥鎖:MutexLock、MutexLockGuard
MutexLock
功能:對Linux C下的互斥鎖Mutex進行了封裝,功能一致
class CAPABILITY("mutex") MutexLock : noncopyable
{
private:
pthread_mutex_t mutex_;
pid_t holder_; //當前擁有該鎖的真實的執行緒id
public:
MutexLock()
: holder_(0)
{
MCHECK(pthread_mutex_init(&mutex_, NULL));
}
~MutexLock()
{
assert(holder_ == 0);
MCHECK(pthread_mutex_destroy(&mutex_));
}
// must be called when locked, i.e. for assertion
bool isLockedByThisThread() const // 是否當前執行緒擁有該鎖
{
return holder_ == CurrentThread::tid();
}
void assertLocked() const ASSERT_CAPABILITY(this)
{
assert(isLockedByThisThread());
}
// internal usage
void lock() ACQUIRE() //加鎖
{
MCHECK(pthread_mutex_lock(&mutex_));
assignHolder();
}
void unlock() RELEASE()
{
unassignHolder();
MCHECK(pthread_mutex_unlock(&mutex_));
}
pthread_mutex_t* getPthreadMutex() /* non-const */
{
return &mutex_;
}
... ...
};
MutexLockGuard
與MutexLock不同點:當MutexLockGuard型別的鎖物件被銷燬時,將呼叫解構函式,導致加的鎖將自動釋放
class SCOPED_CAPABILITY MutexLockGuard : noncopyable
{
public:
explicit MutexLockGuard(MutexLock& mutex) ACQUIRE(mutex)
: mutex_(mutex) // 構造時,對mutex_加鎖
{
mutex_.lock();
}
~MutexLockGuard() RELEASE() //析構時,對mutex_解鎖
{
mutex_.unlock();
}
private:
MutexLock& mutex_;
};
// 巨集:不允許定義一個匿名的MutexLockGuard物件
#define MutexLockGuard(x) error "Missing guard object name"
示例程式碼
#include <muduo/base/Mutex.h>
#include <muduo/base/Thread.h>
#include <muduo/base/Timestamp.h>
#include <vector>
#include <stdio.h>
using namespace muduo;
using namespace std;
MutexLock g_mutex;
vector<int> g_vec;
const int kCount = 10*1000*1000;
void threadFunc()
{
for (int i = 0; i < kCount; ++i)
{
MutexLockGuard lock(g_mutex); //建構函式中,加鎖
g_vec.push_back(i);
} //函式退出,呼叫MutexLockGuard解構函式,解鎖(因此不需要手動的解鎖)
}
int main()
{
const int kMaxThreads = 8;
g_vec.reserve(kMaxThreads * kCount);
//單個執行緒不加鎖
Timestamp start(Timestamp::now());
for (int i = 0; i < kCount; ++i)
{
g_vec.push_back(i);
}
printf("single thread without lock %f\n", timeDifference(Timestamp::now(), start));
//單個執行緒加鎖
start = Timestamp::now();
threadFunc();
printf("single thread with lock %f\n", timeDifference(Timestamp::now(), start));
//分別建立1、2、3、4、5、6、7個執行緒,統計時間開銷
for (int nthreads = 1; nthreads < kMaxThreads; ++nthreads)
{
std::vector<std::unique_ptr<Thread>> threads;
g_vec.clear();
start = Timestamp::now();
for (int i = 0; i < nthreads; ++i)
{
threads.emplace_back(new Thread(&threadFunc));
threads.back()->start();
}
for (int i = 0; i < nthreads; ++i)
{
threads[i]->join();
}
printf("%d thread(s) with lock %f\n", nthreads, timeDifference(Timestamp::now(), start));
}
}
(3) 條件變數:Condition、CountDownLatch
Condition
主要功能:對Linux C下的條件變數Condition進行了封裝,功能一致
class Condition : noncopyable
{
private:
MutexLock& mutex_; // 條件變數要配合互斥鎖一起使用
pthread_cond_t pcond_;
public:
explicit Condition(MutexLock& mutex)
: mutex_(mutex)
{
MCHECK(pthread_cond_init(&pcond_, NULL));
}
~Condition()
{
MCHECK(pthread_cond_destroy(&pcond_));
}
void wait()
{
MutexLock::UnassignGuard ug(mutex_);
MCHECK(pthread_cond_wait(&pcond_, mutex_.getPthreadMutex()));
}
// returns true if time out, false otherwise.
bool waitForSeconds(double seconds) // 呼叫了pthread_cond_timedwait
{
struct timespec abstime;
// FIXME: use CLOCK_MONOTONIC or CLOCK_MONOTONIC_RAW to prevent time rewind.
clock_gettime(CLOCK_REALTIME, &abstime);
const int64_t kNanoSecondsPerSecond = 1000000000;
int64_t nanoseconds = static_cast<int64_t>(seconds * kNanoSecondsPerSecond);
abstime.tv_sec += static_cast<time_t>((abstime.tv_nsec + nanoseconds) / kNanoSecondsPerSecond);
abstime.tv_nsec = static_cast<long>((abstime.tv_nsec + nanoseconds) % kNanoSecondsPerSecond);
MutexLock::UnassignGuard ug(mutex_);
return ETIMEDOUT == pthread_cond_timedwait(&pcond_, mutex_.getPthreadMutex(), &abstime);
}
void notify()
{
MCHECK(pthread_cond_signal(&pcond_));
}
void notifyAll()
{
MCHECK(pthread_cond_broadcast(&pcond_));
}
};
CountDownLatch
CountDownLatch類,對條件變數類進行了封裝
- 使用條件變數Condition,可以使得執行緒阻塞在一個條件上
- 使用CountDownLatch,可以使得執行緒阻塞在count_變數是否為0上。那麼,哪個執行緒將會阻塞在count!=0條件上呢?答案:呼叫wait()函式的執行緒
class CountDownLatch : noncopyable
{
private:
mutable MutexLock mutex_;
Condition condition_ ;
int count_ ;
public:
explicit CountDownLatch::CountDownLatch(int count)
: mutex_(),condition_(mutex_),count_(count)
{
}
//呼叫wait()函式的執行緒,當count_>0時將會阻塞在wait()函式處
void wait()
{
MutexLockGuard lock(mutex_);
while (count_ > 0)
{
condition_.wait();
}
}
void countDown()
{
MutexLockGuard lock(mutex_);
--count_;
if (count_ == 0)
{
condition_.notifyAll();
}
}
int getCount() const
{
MutexLockGuard lock(mutex_);
return count_;
}
};