一、Chromium的消息循環

1.1 tls_delegate、thread_task_runner_tls

tls_delegate和thread_task_runner_tls是TLS(線程局部存儲)中兩個重要變數,都使用了延遲構造實例LazyInstance,在構造message_loop_時就會構造出它們。

在類型上,tls_delegate雖是RunLoop::Delegate,但其實就是MessageLoop。thread_task_runner_tls是SingleThreadTaskRunner(下面稱TaskRunner),type是TYPE_IO時,對應的真實類型是MessageLoopTaskRunner。

1.2 RunLoop、MessageLoop和MessagePumpForIO

圖1 Chromium中的消息循環(MessageLoop)

RunLoop血緣上是獨立的,即不是任何類的基類或派生類。要進行消息循環時直接用new base::RunLoop()構造。

std::unique_ptr<base::RunLoop> loop(new base::RunLoop());

RunLoop::Run()阻塞式執行消息循環,循環不執行完不退出。

RunLoop::Delegate* delegate_;
void RunLoop::Run() {
if (!BeforeRun())
return;
delegate_->Run(application_tasks_allowed);
AfterRun();
}

中間的delegate_->Run是主體,那delegate_是什麼?——類型是RunLoop::Delegate。RunLoop::Delegate是半協議類,真實執行時需要從它派生、實現出自個派生類,這個類就是MessageLoop。但是MessageLoop不僅繼承自RunLoop::Delegate,還有MessagePump::Delegate,圖1中後面的MessagePumpForIO須要這個繼承關係。

構造RunLoop時用不須要參數的new base::RunLoop(),MessageLoop實例是如何傳給成員變數delegate_?

RunLoop::RunLoop(Type type)
: delegate_(tls_delegate.Get().Get())
, ...
{...}

前面已說過,tls_delegate.Get().Get()返回該線程中的MessageLoop實例,所以只要是這線程中創建的RunLoop,都會共享這個MessageLoop。

綜上所述,delegate_->Run就是MessageLoop::Run,接下看它執行什麼。

std::unique_ptr<MessagePump> pump_;
void MessageLoop::Run(bool application_tasks_allowed) {
pump_->Run(this);
}

MessagePump是個半協議類,真正執行是它的派生類,這個派生類根據不同系統不同定義,像windows是MessagePumpWin。pump_是MessageLoop成員變員,在構造函數中創建(嚴格說是BindToCurrentThread)。

MessageLoop::MessageLoop(Type type, ...)
: type_(type)
, incoming_task_queue_(new internal::IncomingTaskQueue(this))
, unbound_task_runner_(new internal::MessageLoopTaskRunner(incoming_task_queue_))
, task_runner_(unbound_task_runner_)
......
{
......
pump_ = CreateMessagePumpForType(type_);
}

圖1中,MessagePumpForIO::DoRunLoop()是消息循環主體,它圍繞一個任務隊列(TaskQueue)不斷地進行循環,直到被通知停止為止。在圍繞任務隊列循環期間,它會不斷地檢查任務隊列是否為空。如果不為空,那麼就會將裡面的任務(Task)取出來,並且進行處理。這樣,一個線程如果要請求另外一個線程執行某一個操作,那麼只需要將該操作封裝成一個任務,並且發送到目標線程的任務隊列去即可。

在代碼中,可認為一個任務對應一次函數回調。

1.3 退出RunLoop::Run、DoIdleWork

DoIdleWork在DoRunLoop,主要功能是退出RunLoop::Run。

bool MessageLoop::DoIdleWork() {
if (ProcessNextDelayedNonNestableTask())
return true;

if (ShouldQuitWhenIdle())
pump_->Quit();
...
return false;
}

bool RunLoop::Delegate::ShouldQuitWhenIdle() {
return should_quit_when_idle_callback_.Run();
}

RunLoop::Delegate::Delegate()
: should_quit_when_idle_callback_(base::BindRepeating([](Delegate* self) {
return self->active_run_loops_.top()->quit_when_idle_received_;
},
...
{...}

由以上代碼可得知,當該線程最項端RunLoop的quit_when_idle_received_是true時,執行DoIdleWork將導致退出RunLoop::Run。有什麼方法讓quit_when_idle_received_置為true?1)RunLoop::QuitWhenIdle(),2)RunLoop::QuitCurrentWhenIdleDeprecated()(static函數),3)向它投遞base::MessageLoop::QuitWhenIdleClosure()。以下是幾種退出RunLoop::Run方法。

  • RunLoop::Quit()。直接退出,不用等到DoIdleWork。
  • RunLoop::QuitWhenIdle()。要等到DoIdleWork。
  • RunLoop::QuitCurrentWhenIdleDeprecated()(static函數)。基於QuitWhenIdle,要等到DoIdleWork。
  • base::ThreadTaskRunnerHandle::Get()->PostTask(FROM_HERE, base::MessageLoop::QuitWhenIdleClosure())。基於QuitWhenIdle,要等到DoIdleWork。

1.4 在非Thread創建的線程使用消息循環

chromium中,線程base::Thread的成員message_loop_指示該線程要使用的消息循環(MessageLoop),Thread::StartWithOptions時則創建該成員(過程可參考「Chromium中Thread的創建」)。但那些非base::Thread創建的線程,該怎麼使用消息循環。

由「1.2 RunLoop、MessageLoop和MessagePumpForIO」可知,有了MessageLoop就可創建RunLoop,創建MessageLoop時會自動創建MessagePumpForIO,因而消息循環的核心是創建MessageLoop。只要有了MessageLoop,app就可像在使用base::Thread中的一樣,用它構造RunLoop,然後調用RunLoop::Run。於是如何使用消息循環問題就歸結為如何創建MessageLoop。

void trtspcapture::VideoReceiveStream::DecodeThreadFunction(void* ptr)
{
base::MessageLoop message_loop(base::MessageLoop::TYPE_IO);
message_loop.set_runloop_quit_with_delete_tasks(true);
while (static_cast<trtspcapture::VideoReceiveStream*>(ptr)->Decode()) {}
}

以上代碼是「Chromium(3/4):rtsp客戶端」中DecodingThread的線程函數,DecodingThread是webrtc創建。為了向socket thread接收幀,它需要一個消息循環。正如上面代碼所示,創建消息循環只要一條構造MessageLoop語句就夠了,後面set_runloop_quit_with_delete_tasks和清空任務隊列有關,詳見後面的「1.6 清空任務隊列」。

對一般線程,的確一條構造MessageLoop語句就夠了,但對主線程(以main開始的函數),還需再構造一個TaskScheduler對象。TaskScheduler會啟動一個線程(TaskSchedulerServiceThread),該線程執行調度任務。其它線程想在後臺執行任務A,但又不想新開一個線程,就把任務A投遞到調度線程。要用這功能的一個例子是HostResolverImpl在解析域名時,解析域名要花點時間,它就把解析任務投遞到TaskScheduler。

proc_task_runner_ = base::CreateTaskRunnerWithTraits({base::MayBlock(), ...);
......
proc_task_runner_->PostTask(FROM_HERE, base::Bind(&ProcTask::DoLookup, ...));

調用TaskScheduler::GetInstance()可得到TaskScheduler對象,全局只有一個,存儲在g_task_scheduler。

chromium已考慮到有app會在主線程使用消息循環和創建TaskScheduler,提供了一個叫base::test::ScopedTaskEnvironment類,只要構造該類的對象就能實現那兩個操作。Rose自寫了base::rose::ScopedTaskEnvironment,它源自base::test::ScopedTaskEnvironment,差別只是去掉了TestMockTimeTaskRunner。具體實現上,base_instance內置類型是ScopedTaskEnvironment的成員chromium_env_,base_instance構造函數中被構造。

base_instance::base_instance(...)
: chromium_env_(base::rose::ScopedTaskEnvironment::MainThreadType::IO)
, ......
{......}

1.5 任務如何加入TaskQueue

讓重看上面的MessageLoop構造函數,incoming_task_queue_對應TaskQueue,要由它繼續構造task_runner_,這個TaskRunner是什麼作用?——要有一個線程間可共享對象向外暴露這個TaskQueue,Chromium沒有把TaskQueue作為共享對象,而是TaskRunner。於是想操作TaskQueue時,用靜態方法base::ThreadTaskRunnerHandle::Get()先得到TaskRunner,然後調用它的成員函數把任務加入到TaskQueue。

  • PostTask,發送需要馬上進行處理的並且可以在嵌套消息循環中處理的消息。
  • PostDelayedTask,發送需要延遲處理的並且可以在嵌套消息循環中處理的消息。
  • PostNonNestableTask,發送需要馬上進行處理的並且不可以在嵌套消息循環中處理的消息。
  • PostNonNestableDelayedTask,發送需要延遲處理的並且不可以在嵌套消息循環中處理的消息。

bool MessageLoopTaskRunner::PostDelayedTask(const Location& from_here, OnceClosure task, base::TimeDelta delay)
{
return incoming_queue_->AddToIncomingQueue(from_here, std::move(task), delay, Nestable::kNestable);
}

上面是TaskRunner的PostDelayedTask實現,直接把任務派發向內中的incoming_queue_這個TaskQueue。

1.6 清空任務隊列

如果不做修改,析構RunLoop不會清空TaskQueue中的任務,這可能會造成不便。還是以「Chromium(3/4):rtsp客戶端」中的DecodingThread為例,它向socket線程投遞收一幀的任務後,就向自已投遞5秒後執行的Delayed任務:quit_runloop。如果5秒內socket線程收到一幀,RunLoop退出,否則執行quit_runloop,叫退出RunLoop。也就是說,當第N次是5秒內退出時,此次投遞的quit_runloop將不會被執行,而是放在了TaskQueue中的delayed_tasks_,當運行到第N+i次RunLoop::Run,時間離第N次投遞時刻超過5秒,於是調用第N次投遞的quit_runloop,從而造成本次RunLoop不該有的退出。為此有個疑問,有沒有辦法主動清空TaskQueue中任務。

void MessageLoop::DeletePendingTasks() {
incoming_task_queue_->triage_tasks().Clear();
incoming_task_queue_->deferred_tasks().Clear();
incoming_task_queue_->delayed_tasks().Clear();
}

DeletePendingTasks是Chromium提供的清任務函數,但光有它是不夠的。

圖2 task-A在TaskQueue中流動過程

bool MessageLoop::DoWork() {
...
while (incoming_task_queue_->triage_tasks().HasTasks()) {
PendingTask pending_task = incoming_task_queue_->triage_tasks().Pop();
if (!pending_task.delayed_run_time.is_null()) {
...
incoming_task_queue_->delayed_tasks().Push(std::move(pending_task));
} else if (DeferOrRunPendingTask(std::move(pending_task))) {
return true;
}
}

向TaskQueue投遞task-A時,任務首先被放在incoming_tasks_。執行MessageLoop::DoWork()期間,調用triage_tasks_.HasTask(),把任務從incoming_tasks_移到triage_tasks_,並且判斷任務是否設了延時。沒延時的立即執行,有延時的放到delayed_tasks_。假設發生一種情況,task-A被執行期間,像處於DeferOrRunPendingTask時,TaskQueue又收到了task-B,讓分析會發生什麼。後續DeferOrRunPendingTask(task-A)返回true後,會退出DoWork,一旦執行task-A會設置state_->should_quit=true,此次DoWork退出會導致RunLoop:Run的退出,就出現一個結果:此時的task-B是放在incoming_task_queue_,而不是任何一個DeletePendingTasks清空的x_tasks中。以下是自寫的清空任務的增強版。

void MessageLoop::DeletePendingTasksEx() {
if (!runloop_quit_with_delete_tasks_) {
return;
}
DeletePendingTasks();
if (incoming_task_queue_->triage_tasks().HasTasks()) {
incoming_task_queue_->triage_tasks().Clear();
}
}

RunLoop::~RunLoop() {
delegate_->DeletePendingTasksEx();
}

清空任務邏輯是這樣:如果該MessageLoop設了runloop_quit_with_delete_tasks_=true,基於它的RunLoop::Run被析構時,會自動清空TaskQueue中任務。runloop_quit_with_delete_tasks_默認值是false,可用set_runloop_quit_with_delete_tasks修改它。

void MessageLoop::set_runloop_quit_with_delete_tasks(bool with_delete_tasks) {
runloop_quit_with_delete_tasks_ = with_delete_tasks;
}

Rose主線程的MessageLoop由base::rose::ScopedTaskEnvironment構造,默認就把runloop_quit_with_delete_tasks_設為true。即基於Rose寫的app,對主線中的RunLoop,析構時一定會清空任務隊列。

二、Chromium的socket庫

幾個關於socket庫的結論。

  • 基於同一個socket上的操作必須放在同一個線程,包括創建、連接、讀、寫、關閉。——如何快速得出這結論?TCPSocketWin有個thread_checker_成員,執行操作時會要求DCHECK_CALLED_ON_VALID_THREAD(thread_checker_)。
  • 只支持非同步方式,當操作返回值是ERR_IO_PENDING,表示此次是非同步返回。
  • 如果Connect/Read/Write返回ERR_IO_PENDING,在沒觸發下次連接完成/可讀/可寫事件前,禁止再次調用Connect/Read/Write。——如何快速得出這結論?像TCPSocketWin::Read首先會要求core_->read_iobuffer_必須是nullptr,一次觸發了ERR_IO_PENDING的非同步讀會把read_iobuffer_設為有效值,此個非同步讀的後半部完成後,read_iobuffer_恢復到nullptr。

2.1 非同步(Async/Unblock/Overlapped)

以Windows為例,描述Chromium如何實現非同步。

圖3 Windows下的非同步實現
  1. app在socket線程調用socket庫的api:Read。buf、len是和收到數據後要放到緩衝區相關的兩個參數。callback是發生非同步時,recv讀出數據後要調用的函數,它會保存到變數read_callback_。如果此次是同步讀出,不會調用這函數。
  2. Read調用內部函數ReadIfReady,ReadIfReady也須要一個回調函數作為參數,值是RetryRead,它會保存到變數read_if_ready_callback_。
  3. ReadIfReady調用系統api:recv。在Windows,recv返回SOCKET_ERROR、並且WSAGetLastError返回WSAEWOULDBLOCK,表示發生非同步了。接下調用core_->WatchForRead()。
  4. WatchForRead會調用ObjectWatcher::StartWatchingInternal。後者執行三個操作,1)調用SequencedTaskRunnerHandle::Get()得到本線程的TaskRunner,賦給task_runner_,後面要用它把運行環境從ntdll.dll線程轉到socket線程。2)把ObjectWatcher::Signal賦給callback_。3)調用系統api:RegisterWaitForSingleObject,等待操作系統完成此次非同步讀。對RegisterWaitForSingleObject中參數,回調設的是DoneWaiting。等待時間設INFINITE,即無限等待,app想中止等待,只能通過關閉這個socket。——至此TCPSocketWin::Read結束了,返回值ERR_IO_PENDING。
  5. 將來某個時刻,系統檢測到這個socket上的非同步讀完成,調用ObjectWatcher::DoneWaiting。運行DoneWaiting是系統內部線程(ntdll.dll),為便於後續處理,立即向task_runner_投遞任務callback_,即ObjectWatcher::Signal。
  6. 上面已說過,task_runner_是socket線程的TaskRunner,於是轉為在socket線程執行ObjectWatcher::Signal,它基本等同調用ReadDelegate::OnObjectSignaled。
  7. OnObjectSignaled調用TCPSocketWin::DidSignalRead,後者調用read_if_ready_callback_,即RetryRead。RetryRead執行兩個操作,1)調用系統函數recv讀出數據,2)調用app在Read時傳入的回調read_callback_。——至此操作轉入app在Read時傳入的回調,運行環境是socket線程。

2.2 讓代碼同時支持同步和非同步

不論是Connect,還是Read、Write,它們都有可能非同步(ERR_IO_PENDING)或同步返回(非ERR_IO_PENDING),代碼要同時支持這兩種情況。

<chromium>/net/socket/transport_client_socket_pool.cc
int TransportConnectJob::DoLoop(int result) {
DCHECK_NE(next_state_, STATE_NONE);

int rv = result;
do {
State state = next_state_;
next_state_ = STATE_NONE;
switch (state) {
case STATE_RESOLVE_HOST:
DCHECK_EQ(OK, rv);
rv = DoResolveHost();
break;
case STATE_RESOLVE_HOST_COMPLETE:
rv = DoResolveHostComplete(rv);
break;
case STATE_TRANSPORT_CONNECT:
DCHECK_EQ(OK, rv);
rv = DoTransportConnect();
break;
case STATE_TRANSPORT_CONNECT_COMPLETE:
rv = DoTransportConnectComplete(rv);
break;
default:
NOTREACHED();
rv = ERR_FAILED;
break;
}
} while (rv != ERR_IO_PENDING && next_state_ != STATE_NONE);

return rv;
}

TransportConnectJob用一個狀態機描述連接過程,DoTransportConnect負責連接向一個ip地址,以它為例看代碼是如何同時支持同步和非同步。

void TransportConnectJob::OnIOComplete(int result) {
result = DoLoop(result);
if (result != ERR_IO_PENDING) {
...
}
}

int TransportConnectJob::DoTransportConnect() {
next_state_ = STATE_TRANSPORT_CONNECT_COMPLETE;
...
transport_socket_ = client_socket_factory_->CreateTransportClientSocket(addresses_, ...);
...
int rv = transport_socket_->Connect(base::Bind(&TransportConnectJob::OnIOComplete, base::Unretained(this)));
return rv;
}

DoTransportConnect調用Connect,當Connect是同步返回時,即0。回到上層TransportConnectJob::DoLoop,滿足while繼續循環條件「rv != ERR_IO_PENDING && next_state_ != STATE_NONE」,於是進入下一狀態STATE_TRANSPORT_CONNECT_COMPLETE。

當Connect是非同步返回時,即ERR_IO_PENDING。回到上層TransportConnectJob::DoLoop,不再滿足while繼續循環條件,DoLoop以ERR_IO_PENDING退出。將來某個時刻,系統檢測到這個socket上的非同步連接完成,Connect時設的回調TransportConnectJob::OnIOComplete被調用,後者調用DoLoop,於是也和同步時一樣,進入下一狀態STATE_TRANSPORT_CONNECT_COMPLETE。

代碼要同時支持同步和非同步,使用下面建議。

  • 用一個狀態機,DoLoop處理這個狀態機。
  • DoLoop中的while循環使用條件:rv != ERR_IO_PENDING && next_state_ != STATE_NONE。
  • 設置的非同步回調函數OnIOComplete,直接調用DoLoop。
  • 非OnIOComplete調用DoLoop時,要注意返回值。是ERR_IO_PENDING時,表示後續任務得觸發非同步完成才能處理了,當前極可能得結束任務,讓非同步完成觸發的函數(OnIOComplete)處理後續工作。是非ERR_IO_PENDING時,則不要結束當前任務,繼續處理後續工作。

推薦閱讀:

相關文章