enhanced future task system
future介紹
在多線程環境下,我們經常需要同時啟動多個任務。有些任務是比較耗時,而且我們並不急於獲得其結果。對於這些任務,我們可以使用std::future
和std::async
來封裝其非同步執行流程。通過std::async
來註冊非同步任務,然後返回一個該非同步結果的句柄std::future
。
std::future < double > result_future = std::async(std::power,2, 1000);
當我們需要得到這個非同步過程的結果時,我們可以顯示的請求。
double result=result_future.get();
如果該非同步過程已經執行完,則可以直接獲得結果;如果還未執行,則當前線程被阻塞,直到執行完成。
在後文中,我們將分為兩個部分:
- 非同步過程的執行,即處理
std::async
封裝後的任務調度; - 非同步過程的生成,即處理
future=async()
中非同步過程的註冊和結果句柄的生成; - 非同步過程的串聯,即處理
future=future.then(async())
中非同步過程的序列化。
非同步過程的執行
基本執行結構
首先我們考慮無返回值、無參數的非同步過程的執行。在這種情況下,最直接的處理方案就是利用一個隊列來存儲提交的非同步任務,同時建立一個線程池來消費這個任務隊列。為此,我們需要實現兩個部分:多線程的任務隊列,以及任務的提交和請求。
對於多線程的任務隊列,可以參考下面代碼
class notification_queue
{
queue<function<void()>> _q;
bool _done{ false };
mutex _mutex;
condition_variable _ready;
public:
void done()
{
{
lock_guard<mutex> lock{ _mutex };
_done = true;
}
_ready.notify_all();
}
bool try_pop(function<void()>& x)
{
unique_lock<mutex> lock{ _mutex };
_ready.wait(lock, [&]()
{
return !_q.empty()||_done;
});
if (_q.empty()||done)
{
return false;
}
x = move(_q.front());
_q.pop();
return true;
}
template<typename F>
bool try_push(F&& f)
{
{
lock_guard<mutex> lock{ _mutex,try_to_lock };
if (!lock||done)
{
return false;
}
_q.emplace_back(forward<F>(f));
}
_ready.notify_one();
return true;
}
};
上述代碼與一般的多線程隊列的不同之處在於增加了done
這個bool
變數,用來停止任務的提交和請求。其實更好的多線程任務隊列實現應該以std::shared_ptr
作為返回值,以防止內存分配時異常所導致的數據不一致。當前文檔只是為了做概念性說明作用,因此以最簡實現作為展示。
在此多線程隊列的支持下,初步的線程池系統可以有如下實現:
class task_system
{
const unsigned _count{ thread::hardware_concurrency() };
vector<thread> _threads;
notification_queue _q;
bool _done{false};
void run()
{
while (!done)
{
function<void()> f;
if (!_q.try_pop(f))
{
this_thread.yield();
}
else
{
f();
}
}
}
public:
task_system()
{
for (unsigned n = 0; n != _count; ++n)
{
_threads.emplace_back([&] { run(); });
}
}
~task_system()
{
_done=true;
_q.done();
for (auto& e : _threads)
{
e.join();
}
}
template <typename F>
bool async_(F&& f)
{
return _q.try_push(forward<F>(f));
}
void done()
{
_done = true;
}
};
上述代碼雖然實現了一個簡單的線程池,但是在效率上是有問題的。主要的原因就是所有的工作線程都在爭奪任務隊列的控制權,產生了contention
。為了緩解contention
現象,可以從以下兩個方面來入手:
- 顯式的以待頭節點的鏈表來實現隊列,從而使得任務的提交和請求所需要的鎖分開;
- 為每一個線程分配一個專有的任務隊列,同時允許線程向其他任務隊列請求任務,即
work_steal
。
良構鎖隊列
所謂良構鎖隊列,就是儘可能的減少不必要的鎖佔用時間。因此,我們採取鏈表來作為良構鎖隊列的底層數據結構,其數據結構如下:
template<typename T>
class notification_queue
{
private:
struct node
{
std::shared_ptr<T> data;
std::unique_ptr<node> next;
};
std::mutex head_mutex;
std::unique_ptr<node> head;
std::mutex tail_mutex;
node* tail;
}
這裡我們存儲了鏈表的頭節點和尾節點,同時為頭節點和尾節點添加相應的鎖。同時我們將鏈表初始化為有一個頭節點的鏈表:
notification_queue() :head(new node),tail(head.get())
{
}
此外,在此鏈表中加入一個頭節點,以減少pop
操作對於尾節點鎖的爭用。對於push
操作來說,實現是很直白的,因為這個過程只涉及到了尾節點的鎖。唯一需要注意的一點是在獲得鎖之前就通過make_shared
分配好內存空間,以防止在鎖的作用域內拋出異常,從而導致數據不一致。
void push(T new_value)
{
std::shared_ptr<T> new_data(
std::make_shared<T>(std::move(new_value)));
std::unique_ptr<node> p(new node);
node* const new_tail = p.get();
std::lock_guard<std::mutex> tail_lock(tail_mutex);
tail->data = new_data;
tail->next = std::move(p);
tail = new_tail;
}
此時的pop
操作則需要小心:
node* get_tail()
{
std::lock_guard<std::mutex> tail_lock(tail_mutex);
return tail;
};
bool try_pop(T& result)
{
std::lock_guard<std::mutex> head_lock(head_mutex);
if (head.get() == get_tail())
{
return false;
}
std::unique_ptr<node> old_head = std::move(head);
head = std::move(old_head->next);
result = *(old_head->data);
return true;
}
在try_pop
的過程中,我們首先判斷當前鏈表是否為空,這裡的get_tail
必須放在head_lock
之後。否則的話:我們獲得head
時,可能其他線程已經多次進行了try_pop
和push
。從而導致head
獲得時,隊列可能已經為空,而之前獲得的尾節點所指向的並非當前尾節點。在這種情況下head
與 tail
的比較失敗,返回head
,從而導致錯誤。
work_steal線程池
work_steal
的實現比較直白。首先我們修改其數據結構為:
class task_system
{
const unsigned _count{ thread::hardware_concurrency() };
vector<thread> _threads;
vector<notification_queue> _q{ _count };
atomic<unsigned> _index{ 0 };
bool _done{ false };
}
對於每一個邏輯執行單元,我們都分配一個專有任務隊列。提交新任務時,我們直接採取輪轉法來平衡負載,用index
自增來實現:
template <typename F>
void async_(F&& f)
{
auto i = _index++;
for (unsigned n = 0; n != _count; ++n)
{
if (_q[(i + n) % _count].try_push(forward<F>(f)))
{
return;
}
}
_q[i % _count].push(forward<F>(f));
}
而對於run
函數,我們添加一個參數來指明該線程所對應的專有任務隊列:
void run(unsigned i)
{
while (true)
{
function<void()> f;
for (unsigned n = 0; n != _count; ++n)
{
if (_q[(i + n) % _count].try_pop(f))
{
break;
}
}
if (!f && !_q[i].try_pop(f))
{
break;
}
f();
}
}
同時,線程池的構造函數和析構函數也要做相應的修改,以實現RAII:
task_system()
{
for (unsigned n = 0; n != _count; ++n)
{
_threads.emplace_back([&, n] { run(n); });
}
}
~task_system()
{
for (auto& e : _q)
{
e.done();
}
for (auto& e : _threads)
{
e.join();
}
}
至此,一個帶work_steal
的線程池就完成了
非同步過程的生成
返回值保存
現有的代碼處理的只是function<void()>
類型的任務。對於普通函數來說,函數簽名則是function<R(Args...)>
的。但是非同步提交任務時其參數列表就已經確定,需要處理的只是返回值,即function<R()>
。為此,我們可以定義一個shared_base
,來存儲返回值相關信息。
template <typename R>
struct shared_base
{
optional<R> _r;
mutex _mutex;
condition_variable _ready;
};
這裡的optional<R> _r
用來存儲非同步過程的執行結果。
下面就是這個類所附帶的幾個函數:虛析構函數、set
、get
。
virtual ~shared_base() {}
void set(R&& r)
{
{
lock_guard<mutex> lock{ _mutex };
_r=forward<R>(r);
}
_ready.notify_all();
}
const R& get()
{
lock_guard<mutex> lock{ _mutex };
_ready.wait(lock, [&]()
{
return static_cast<bool>(_r);
});
return _r.value();
}
這樣,我們就可以在非同步調用中調用set
函數來填充返回值,同時在future
中調用get
來等待返回值。為了在等待線程和工作線程中同時操作這個shared_base
對象,我們需要以 shared_ptr
的形式來保存這個對象的指針。自此,future
所需要的元素都已具備,定義future
來保存非同步過程的結果句柄,其簡單實現如下所示:
template <typename R>
class future
{
shared_ptr<R> _p;
explicit future(shared_ptr<R> p) : _p(move(p))
{
}
future() = default;
template <typename F>
const R& get() const
{
return _p->get();
}
};
這裡的R
就是shared_base<T>
類型。
非同步過程封裝
由於線程池中只支持function<void()>
類型的調用,為此我們需要在shared_base
的基礎上進一步做封裝。
首先,我們將返回值封裝起來:
template <typename R, typename... Args>
struct shared<R(Args...)> : shared_base<R>
{
function<R(Args...)> _f;
template<typename F>
shared(F&& f) : _f(forward<F>(f)) {}
template <typename... A>
void operator()(A&&... args)
{
this->set(_f(forward<A>(args)...));
_f = nullptr;
}
};
當前的shared
結構既保存了返回值相關信息,還保留了函數的相關信息,即當前類型是一個可調用對象。在這個對象之上,我們再定義一個packaged_task
。
template<typename R, typename ...Args >
class packaged_task<R(Args...)>
{
weak_ptr<shared<R(Args...)>> _p;
explicit packaged_task(weak_ptr<shared<R(Args...)>> p) : _p(move(p)) {}
public:
packaged_task() = default;
template <typename... A>
void operator()(A&&... args) const
{
auto p = _p.lock();
if (p)
{
(*p)(forward<A>(args)...);
}
}
};
這個packaged_task
中存儲了一個weak_ptr<shared<R(Args...)>>
,用來保留所有的調用信息和返回值信息。至於這裡為什麼用weak_ptr
,可能是為了處理某些特殊情況:例如非同步結果直接被拋棄,此時則沒有必要去執行該非同步過程。即非同步過程的所有者是其返回值的所有者,而不是過程本身。
至此,我們把非同步結果的存儲和非同步過程的存儲都解決了,剩下的問題就是:我們如何根據一個可調用對象生成future
和packaged_task
。為此,我們定義一個新的中間函數package
來執行次任務:
template <typename S, typename F>
auto package(F&& f) -> pair<packaged_task<S>, future<result_of_t_<S>>>
{
auto p = make_shared<shared<S>>(forward<F>(f));
return make_pair(packaged_task<S>(p), future<result_of_t_<S>>(p));
}
template <typename F, typename ...Args>
auto async(F&& f, Args&&... args)
{
using result_type = result_of_t<F(Args...)>;
using packaged_type = packaged_task<result_type()>;
auto pack = package<result_type()>(bind(forward<F>(f), forward<Args>(args)...));
_system.async_(move(get<0>(pack)));
return get<1>(pack);
}
這個package
函數做了兩件事:
- 根據所提供的函數參數
f
來生成一個保存了返回值和參數f
的shared<S>
,用shared_ptr p
來控制其所有權。這裡利用了result_of_t
這個type_traits
,其作用是根據函數簽名返回該函數的返回值類型。 - 根據上一步生成的
shared_ptr p
構造packaged_task
和future
對象。這裡利用了兩個自動類型轉換:一個是由shared_ptr<shared<result_type()>>
構造出一個weak_ptr <shared<result_type()>>
,另外一個就是根據shared_ptr<shared<result_type()>>
構造出一個shared_ptr<shared_base<result_type>>
。在自動類型轉換之後,再調用這兩個對象相應的構造函數。
同時,在async
中,主要包括四個過程。
- 通過
bind
將函數f
與其相應的參數args
綁定起來,生成了一個function<result_type()
類型的可調用對象。 - 將上一步構造出的臨時可調用對象作為
package
的參數,生成了包含package_task<result_type()>
和future<result_type>
的pair pack
。 - 這個
pair
中所包含的package_task
在當作一個可調用對象時,其函數簽名為void()
,因此可以直接向_system
提交非同步任務,這裡的_system
的類型就是前一節中所提到的work_steal
線程池任務系統。 - 最後,將
pair
中的future<result_type>
返回,作為結果句柄。
自此,從可調用對象構造出了對應的非同步執行對象packaged_task
和future
。這個packaged_task
被提交到了系統的任務處理系統_system
中,而future
則返回到用戶線程。
非同步過程的串聯
單後繼串聯
在處理多個非同步任務的系統設計中,非同步任務的串聯(即任務之間的依賴)是很重要的一個特性。設想一下場景中,我們需要依次調用兩個函數:
std::future < double > result_future_1 = std::async(std::power,2, 100);
double result_1=result_future_1.get();
std::future < double > result_future_2 = std::async(std::power,result_1, 10);
double result_2=result_future_2.get();
在上述實例中,result_2
依賴於result_1
。因此執行時我們必須顯式的調用future.get()
來獲得result_1
,然後才能提交第二個非同步任務。這種顯示的同步在依賴路徑變長時就會顯得非常繁雜,而且引入了多次同步的需求。理想情況下,我們想要的只是最後的執行結果,只需要調用一次future.get()
就可以。簡化的代碼可以變成這樣:
std::future < double > result_future_1 = std::async(std::power,2, 100);
std::future < double > result_future_2 = result_future_1.then(std::bind(std::power,_1, 10));
double result_2=result_future_2.get();
同時也可以變成這樣
std::future < double > result_future_1 = std::async(std::power,2, 100).then(std::bind(std::power,_1, 10));
double result_1=result_future_1.get();
但是C++11
中的std::future
並不支持串聯,所以我們就不得不去造輪子。現在我們就來實現future.then()
。
為了支持then
操作,我們必須在future
中保存後續的執行路徑。我們來研究future
中對then
的處理:
template <typename F>
auto then(F&& f)
{
auto pack = package<result_of_t<F(R)>()>([p = _p, f = forward<F>(f)]()
{
return f(p->_r.back());
});
_p->then(move(pack.first));
return pack.second;
}
這裡的then
操作每次生成一個新的pair<packaged_task,future>
。其中的packaged_task
部分捕獲了調用者future
的shared_ptr<shared_base>
成員_p
,同時這個packaged_task
註冊到了_p._then
隊列中。pair
中的future
則保留了新的返回值信息。現在剩下的任務就是實現shared_base.then
了。
為此我們首先修改share_base
,增加一個保存後續任務的optional
,同樣以vector
的形式來實現。此時shared_base<R>
的數據成員如下:
optional<R> _r; // optional
mutex _mutex;
condition_variable _ready;
optional<function<void()>> _then;
這個_then
成員就是用來保存後續的執行路徑的。其實也不算路徑,因為只保留路徑中的直接子節點,可以當作路徑中的next
指針。
而then
操作就比較直接了:
template <typename F>
void then(F&& f)
{
bool resolved{ false };
{
lock_guard<mutex> lock{ _mutex };
if (!_r)
{
_then=forward<F>(f);
}
else
{
resolved = true;
}
}
if (resolved)
{
_system.async_(forward<F>(f));
}
}
在執行then
操作時,首先判斷是否之前的非同步任務已經執行完全。這個判斷是通過_r
來執行的,因為optional
有一個到bool
的自動類型轉換。
如果之前的非同步任務已執行,則提交當前任務到任務調度系統;否則將當前任務掛載在next
下,即_then
。
同時,set
函數也要做一些修改:
void set(R&& r)
{
optional<function<void()>> then;
{
lock_guard<mutex> lock{ _mutex };
_r=forward<R>(r);
swap(_then, then);
}
_ready.notify_all();
if(then)
{
_system.async_(move(then.value()));
}
}
這段代碼也是比較直接:在當前shared_base
獲得了初始值之後,再去執行下一個非同步過程。至於這裡為什麼用swap
操作,目前還不是很清楚,需要測試一下才能知道這裡的swap
到底發生了什麼。
多後繼串聯
之前的代碼處理的只是包含了一個後續任務依賴的情況,但是當我們談到依賴的時候,我們所提到的都是依賴樹,而不是依賴序列。這種依賴樹的情況,一個packaged_task
可能有多個任務都在等待其完成。
std::future < double > result_future_1 = std::async(std::power,2, 100);
double result_1=result_future_1.get();
std::future < double > result_future_2 = std::async(std::power,result_1, 10);
std::future < double > result_future_3 = std::async(std::power,result_1, 10);
double result_2=result_future_2.get();
double result_3=result_future_3.get();
為了支持這種多後繼的操作,我們需要對shared_base
做修改。
首先,_then
成員不再是一個optional<function<void()>>
,而是一個vector<function<void>>
,因為我們所存儲的後續任務不再僅限於一個。
optional<R> _r; // optional
mutex _mutex;
condition_variable _ready;
vector<function<void()>> _then;
同時,我們也要修改對應的then
和set
的代碼實現,從optional
轉換到vector
上來。
template <typename F>
void then(F&& f)
{
bool resolved{ false };
{
lock_guard<mutex> lock{ _mutex };
if (!_r)
{
_then.push_back(forward<F>(f));
}
else
{
resolved = true;
}
}
if (resolved)
{
_system.async_(forward<F>(f));
}
}
:::c++
void set(R&& r)
{
vector<function<void()>> then;
{
lock_guard<mutex> lock{ _mutex };
_r=forward<R>(r);
swap(_then, then);
}
_ready.notify_all();
for(const auto& f:then)
{
_system.async_(move(f));
}
}
在經過這種修改之後,之前的示例代碼可以這樣寫:
:::c++
std::future < double > result_future_1 = std::async(std::power,2, 100);
std::future < double > result_future_2 = result_future_1.then(bind(std::power,_1, 10));
std::future < double > result_future_3 = result_future_1.then(bind(std::power,_1, 10));
double result_2=result_future_2.get();
double result_3=result_future_3.get();
多前驅串聯
wait_all 串聯
對於wait_for_all
形式,一個任務的啟動需要多個任務都已完成。我們需要等待所有過程的執行完全,所以需要顯示的等待。為了支持這種顯示的同步操作,我們繼續修改之前的shared_base
和future
,都加入顯示的等待wait
。
template <typename R>
void shared_base<R>::wait()
{
lock_guard<mutex> lock{ _mutex };
_ready.wait(lock, [&]()
{
return !_r.empty();
});
}
template <typename R>
void future<R>::wait() const
{
_p->wait();
}
這樣我們可以以一下形式來實現wait_for_all
:
template<typename F1, typename... Fs>
void wait_for_all(F1& f1, Fs&... fs)
{
bool dummy[] = { (f1.wait(), true), (fs.wait(), true)... };
// prevent unused parameter warning
(void) dummy;
}
雖然上述代碼實現了wait_for_all
這個介面,但是這種實現並不利於future
的進一步組合。更好的實現應該是綜合所依賴的future
生成一個新的future
,類似於如下形式:
template < typename R,typename... Arg>
auto all_package(function<R(Arg...)>&& _f, future<Arg>&& all_futures...)
->pair<packaged_task<R()>, future<R>>
{
auto lambda_task = [all=move(all_futures)...,f=move(f)]()
{
return f(all.get()...);
}
auto p = make_shared<shared<R()>>(move(lambda_task));
return make_pair(packaged_task<R()>(p), future<R>(p));
}
同時添加async_all
template <typename R, typename ...Args>
auto async_all(function<R(Args...)>&& f, future<Args>&&... args)
{
auto pack = all_package<R,Args...>(forward<function<R(Args...)>>(f), forward<future<Args>>(args)...);
_system.async_(move(get<0>(pack)));
return get<1>(pack);
}
wait_any 串聯
wait_any
相對來說就比較麻煩了,需要實現一個通知機制。只要所等待的future
其中一個有了信號,其他的future
就不需要再執行了。我們可以採取atomic_bool
的形式來實現這個once_flag
,剩下的任務就是如何將這個atomic_bool
掛載到各個future
之上,這個掛載操作我們可以利用then
。但是用then
的話就無法控制其他future
的執行,因為當我們執行then
操作的時候之前的結果肯定已經計算完成。所以,我只能說毫無辦法。
如果實在想這麼做的話,需要在這些future
生成之前就定義好這個atomic_bool
。同時修改這些任務的執行邏輯,執行前首先判斷atomic_bool
是否已經set
了,沒有的話才執行後續任務。
後記
這篇文章基本翻譯自Sean Parent的三年前的一個ppt
,現在c++17
已經出來了,文中提到的future
基礎設施基本已經添加進去了,可以直接用stl
去調用了。
目前倒是有個想法,使用這個非同步任務系統實現以下行為樹的任務調度,看起來應該能行,核心還是在action
的自動註冊身上。
推薦閱讀: