1. 程式人生 > >chromium源碼閱讀--進程的Message Loop

chromium源碼閱讀--進程的Message Loop

delta is_null b- sco run bar oop defer sin

上一篇總結了chromium進程的啟動,接下來就看線程的消息處理,這裏的線程包含進程的主進程。

消息處理是由base::MessageLoop中實現,消息中的任務和定時器都是異步事件的。

主要如下幾點:

1、消息的類型分類

2、延時處理的消息是如何實現

一、消息分類

chromium主要將消息類型如下定義:(chromium//src/base/message_loop/message_loop.h 112行)

1  enum Type {
2     TYPE_DEFAULT,
3     TYPE_UI,
4     TYPE_CUSTOM,
5     TYPE_IO,
6 #if
defined(OS_ANDROID) 7 TYPE_JAVA, 8 #endif // defined(OS_ANDROID) 9 };

1.TYPE_DEFAULT:

處理chromium定義的Task(閉包代碼塊)和定時器任務

2.TYPE_UI:

除了TYPE_DEFAULT定義的範圍,還支持原生的UI事件消息(比如用戶操作的窗口消息),MessageLoopForUI類

3.TYPE_IO:

除了TYPE_DEFAULT定義的範圍,還支持異步IO的事件消息,MessageLoopForIO類

4.TYPE_JAVA

是Android平臺的特有的消息消息,因為Android裏,有java消息和native消息分層,native消息與java消息交互,java消息與應用程序交互,可以看做java消息接管了native消息。

5.TYPE_CUSTOM

定制消息,比較少見使用。

消息類型的不同也就會創建不同的MessagePump。對於UI消息,不同的平臺也會有不同的實現。在chromium//src/base/message_loop/message_loop.cc 166行

技術分享
 1 // static
 2 std::unique_ptr<MessagePump> MessageLoop::CreateMessagePumpForType(Type type) {
 3 // TODO(rvargas): Get rid of the OS guards.
 4 #if defined(USE_GLIB) && !defined(OS_NACL)
 5
using MessagePumpForUI = MessagePumpGlib; 6 #elif (defined(OS_LINUX) && !defined(OS_NACL)) || defined(OS_BSD) 7 using MessagePumpForUI = MessagePumpLibevent; 8 #elif defined(OS_FUCHSIA) 9 using MessagePumpForUI = MessagePumpFuchsia; 10 #endif 11 12 #if defined(OS_IOS) || defined(OS_MACOSX) 13 #define MESSAGE_PUMP_UI std::unique_ptr<MessagePump>(MessagePumpMac::Create()) 14 #elif defined(OS_NACL) || defined(OS_AIX) 15 // Currently NaCl and AIX don‘t have a UI MessageLoop. 16 // TODO(abarth): Figure out if we need this. 17 #define MESSAGE_PUMP_UI std::unique_ptr<MessagePump>() 18 #else 19 #define MESSAGE_PUMP_UI std::unique_ptr<MessagePump>(new MessagePumpForUI()) 20 #endif 21 22 #if defined(OS_MACOSX) 23 // Use an OS native runloop on Mac to support timer coalescing. 24 #define MESSAGE_PUMP_DEFAULT 25 std::unique_ptr<MessagePump>(new MessagePumpCFRunLoop()) 26 #else 27 #define MESSAGE_PUMP_DEFAULT 28 std::unique_ptr<MessagePump>(new MessagePumpDefault()) 29 #endif 30 31 if (type == MessageLoop::TYPE_UI) { 32 if (message_pump_for_ui_factory_) 33 return message_pump_for_ui_factory_(); 34 return MESSAGE_PUMP_UI; 35 } 36 if (type == MessageLoop::TYPE_IO) 37 return std::unique_ptr<MessagePump>(new MessagePumpForIO()); 38 39 #if defined(OS_ANDROID) 40 if (type == MessageLoop::TYPE_JAVA) 41 return std::unique_ptr<MessagePump>(new MessagePumpForUI()); 42 #endif 43 44 DCHECK_EQ(MessageLoop::TYPE_DEFAULT, type); 45 return MESSAGE_PUMP_DEFAULT; 46 }
View Code

二、延時消息如何處理

消息的處理與消息隊列密不可分,internal::IncomingTaskQueue實現了一個線程安全的消息隊列。 MessageLoop裏定義了(chromium//src/base/message_loop/message_loop.h 392行)

1 scoped_refptr<internal::IncomingTaskQueue> incoming_task_queue_;

接收到的消息就緩存在這個隊列裏。那麽我們先看看這個類的構造函數。

chromium//src/base/message_loop/incoming_task_queue.cc 52行
 1 IncomingTaskQueue::IncomingTaskQueue(MessageLoop* message_loop)
 2     : always_schedule_work_(AlwaysNotifyPump(message_loop->type())),
 3       triage_tasks_(this),
 4       delayed_tasks_(this),
 5       deferred_tasks_(this),
 6       message_loop_(message_loop) {
 7   // The constructing sequence is not necessarily the running sequence in the
 8   // case of base::Thread.
 9   DETACH_FROM_SEQUENCE(sequence_checker_);
10 }

構造函數裏通過MessageLoop的類型來初始化bool成員 always_schedule_work_ ,來判斷是否對消息進行調度, 並保存了message_loop指針。

繼續分析代碼,前面看到消息隊列已經初始化了,那接下來我們看看是怎麽往隊列裏添加任務的。

bool IncomingTaskQueue::AddToIncomingQueue(const Location& from_here,
                                           OnceClosure task,
                                           TimeDelta delay,
                                           Nestable nestable) {
  ......

  PendingTask pending_task(from_here, std::move(task),
                           CalculateDelayedRuntime(delay), nestable);
  ......
  return PostPendingTask(&pending_task);
}

使用了PendingTask對象,並計算了延遲的時間和是否是嵌套任務。那麽看PostPendingTask函數:

 1 bool IncomingTaskQueue::PostPendingTask(PendingTask* pending_task) {
 5   bool accept_new_tasks;
 6   bool schedule_work = false;
 7   {
 8     AutoLock auto_lock(incoming_queue_lock_);
 9     accept_new_tasks = accept_new_tasks_;
10     if (accept_new_tasks)
11       schedule_work = PostPendingTaskLockRequired(pending_task);
12   }
13 
14   if (!accept_new_tasks) {
19 pending_task->task.Reset(); 20 return false; 21 } 22 29 if (schedule_work) { 30 // Ensures |message_loop_| isn‘t destroyed while running. 31 AutoLock auto_lock(message_loop_lock_); 32 if (message_loop_) 33 message_loop_->ScheduleWork(); 34 } 35 36 return true; 37 }

這裏已經開始給線程加鎖了,那麽繼續看PostPendingTaskLockRequired函數:

 1 bool IncomingTaskQueue::PostPendingTaskLockRequired(PendingTask* pending_task) {
 2   incoming_queue_lock_.AssertAcquired();
 3   ......
 4   
 7   pending_task->sequence_num = next_sequence_num_++;
 8 
 9   task_annotator_.DidQueueTask("MessageLoop::PostTask", *pending_task);
10 
11   bool was_empty = incoming_queue_.empty();
12   incoming_queue_.push(std::move(*pending_task));
13 
14   if (is_ready_for_scheduling_ &&
15       (always_schedule_work_ || (!message_loop_scheduled_ && was_empty))) {
21     message_loop_scheduled_ = true;
22     return true;
23   }
24   return false;
25 }

這裏看到pending_task是保存在incoming_queue_ 這裏使用了std::queue容器(一個FIFO的數據結構),這個隊列裏面的任務還沒有添加到MessageLoop中,也可以看到這裏還沒有明確任務的執行方式,使用的是FIFO隊列。

下面的幾個成員變量,則就是在MessageLoop中使用了。

chromium//src/base/message_loop/incoming_task_queue.h 217行

1 // Queue for initial triaging of tasks on the |sequence_checker_| sequence.
2   TriageQueue triage_tasks_;
3 
4   // Queue for delayed tasks on the |sequence_checker_| sequence.
5   DelayedQueue delayed_tasks_;
6 
7   // Queue for non-nestable deferred tasks on the |sequence_checker_| sequence.
8   DeferredQueue deferred_tasks_;

1、TriageQueue

這是第一個按默認的任務處理順序(FIFO)接受所有任務的隊列,這個隊列的任務要馬上執行或者放到下面的DelayedQueue 或者 DeferredQueue。

triage_tasks_隊列的任務是通過 IncomingTaskQueue::ReloadWorkQueue(TaskQueue* work_queue)來實現切換的,可以將 incoming_queue_ 和 triage_tasks_看成冷熱備份的緩存,在triage_tasks_隊列的任務執行完了,即為空時,就將待執行的incoming_queue_隊列的任務與之交換。

1 void IncomingTaskQueue::TriageQueue::ReloadFromIncomingQueueIfEmpty() {
2   if (queue_.empty()) {
3     // TODO(robliao): Since these high resolution tasks aren‘t yet in the
4     // delayed queue, they technically shouldn‘t trigger high resolution timers
5     // until they are.
6     outer_->pending_high_res_tasks_ += outer_->ReloadWorkQueue(&queue_);
7   }
8 }

2、DelayedQueue

這個隊列是存放延遲執行的任務,並且按期望時間排序的

delayed_tasks_是一個優先級隊列,按delayed_run_time排序,chromium//src/base/pending_task.h 63行

1 // PendingTasks are sorted by their |delayed_run_time| property.
2 using DelayedTaskQueue = std::priority_queue<base::PendingTask>;

3、DeferredQueue

這個隊列通常是存放哪些因為MessageLoop嵌套而不能執行的任務,這些任務通常會在空閑的時候執行。

OK,看到這裏,我們回顧一下MessageLoop的執行流程:

1 void MessageLoop::Run() {
2   DCHECK_EQ(this, current());
3   pump_->Run(this);
4 }

由MessagePump來執行,那麽我們選擇默認的MessagePump來看Run的流程,chromium//src/base/message_loop/message_pump_default.cc 29行:

技術分享
 1 void MessagePumpDefault::Run(Delegate* delegate) {
 2   AutoReset<bool> auto_reset_keep_running(&keep_running_, true);
 3 
 4   for (;;) {
 5 #if defined(OS_MACOSX)
 6     mac::ScopedNSAutoreleasePool autorelease_pool;
 7 #endif
 8 
 9     bool did_work = delegate->DoWork();
10     if (!keep_running_)
11       break;
12 
13     did_work |= delegate->DoDelayedWork(&delayed_work_time_);
14     if (!keep_running_)
15       break;
16 
17     if (did_work)
18       continue;
19 
20     did_work = delegate->DoIdleWork();
21     if (!keep_running_)
22       break;
23 
24     if (did_work)
25       continue;
26 
27     ThreadRestrictions::ScopedAllowWait allow_wait;
28     if (delayed_work_time_.is_null()) {
29       event_.Wait();
30     } else {
31       event_.TimedWaitUntil(delayed_work_time_);
32     }
33   }
34 }
View Code

其中的流程是,delegate->DoWork(), delegate->DoDelayedWork(&delayed_work_time_),delegate->DoIdleWork()。

也可以看其他平臺的MessagePump,主要的流程是一致的,都是delegate的函數,而delegate指向一個MessageLoop指針,那麽又回到MessageLoop中。

上面具體的DoWork就不詳述了,接下來看看延遲任務是如何實現的:

chromium//src/base/message_loop/message_loop.cc 473行

TimeTicks next_run_time =
      incoming_task_queue_->delayed_tasks().Peek().delayed_run_time;
  if (next_run_time > recent_time_) {
    recent_time_ = TimeTicks::Now();  // Get a better view of Now();
    if (next_run_time > recent_time_) {
      *next_delayed_work_time = next_run_time;
      return false;
    }
  }

  PendingTask pending_task = incoming_task_queue_->delayed_tasks().Pop();

  if (incoming_task_queue_->delayed_tasks().HasTasks()) {
    *next_delayed_work_time =
        incoming_task_queue_->delayed_tasks().Peek().delayed_run_time;
  }

  return DeferOrRunPendingTask(std::move(pending_task));

在下一個可執行時間到來了,就會從incoming_task_queue_->delayed_tasks().Pop() 出來一個task, 如果incoming_task_queue_->delayed_tasks()裏還有延遲任務,就取裏面優先級最高的任務的延遲時間作為下次判斷的時間。

到這裏,消息的處理和延遲任務的執行都完成了。

好了,回到上面的Run()函數流程,在DoIdleWork()完成之後,當前線程開始休眠,直到有新的任務來會重新喚醒。

嗯,這篇就到這裏了,下面一篇會總結IPC消息聲明和IPC channel的創建。

chromium源碼閱讀--進程的Message Loop