future介紹

在多線程環境下,我們經常需要同時啟動多個任務。有些任務是比較耗時,而且我們並不急於獲得其結果。對於這些任務,我們可以使用std::futurestd::async來封裝其非同步執行流程。通過std::async來註冊非同步任務,然後返回一個該非同步結果的句柄std::future

std::future < double > result_future = std::async(std::power,2, 1000);

當我們需要得到這個非同步過程的結果時,我們可以顯示的請求。

double result=result_future.get();

如果該非同步過程已經執行完,則可以直接獲得結果;如果還未執行,則當前線程被阻塞,直到執行完成。

在後文中,我們將分為兩個部分:

  • 非同步過程的執行,即處理std::async封裝後的任務調度;

  • 非同步過程的生成,即處理future=async()中非同步過程的註冊和結果句柄的生成;
  • 非同步過程的串聯,即處理future=future.then(async())中非同步過程的序列化。

非同步過程的執行

基本執行結構

首先我們考慮無返回值、無參數的非同步過程的執行。在這種情況下,最直接的處理方案就是利用一個隊列來存儲提交的非同步任務,同時建立一個線程池來消費這個任務隊列。為此,我們需要實現兩個部分:多線程的任務隊列,以及任務的提交和請求。

對於多線程的任務隊列,可以參考下面代碼

class notification_queue
{
queue<function<void()>> _q;
bool _done{ false };
mutex _mutex;
condition_variable _ready;
public:
void done()
{
{
lock_guard<mutex> lock{ _mutex };
_done = true;
}
_ready.notify_all();
}
bool try_pop(function<void()>& x)
{
unique_lock<mutex> lock{ _mutex };
_ready.wait(lock, [&]()
{
return !_q.empty()||_done;
});
if (_q.empty()||done)
{
return false;
}
x = move(_q.front());
_q.pop();
return true;
}

template<typename F>
bool try_push(F&& f)
{
{
lock_guard<mutex> lock{ _mutex,try_to_lock };
if (!lock||done)
{
return false;
}
_q.emplace_back(forward<F>(f));
}
_ready.notify_one();
return true;
}

};

上述代碼與一般的多線程隊列的不同之處在於增加了done這個bool變數,用來停止任務的提交和請求。其實更好的多線程任務隊列實現應該以std::shared_ptr作為返回值,以防止內存分配時異常所導致的數據不一致。當前文檔只是為了做概念性說明作用,因此以最簡實現作為展示。

在此多線程隊列的支持下,初步的線程池系統可以有如下實現:

class task_system
{
const unsigned _count{ thread::hardware_concurrency() };
vector<thread> _threads;
notification_queue _q;
bool _done{false};
void run()
{
while (!done)
{
function<void()> f;
if (!_q.try_pop(f))
{
this_thread.yield();
}
else
{
f();
}
}
}
public:
task_system()
{
for (unsigned n = 0; n != _count; ++n)
{
_threads.emplace_back([&] { run(); });
}
}
~task_system()
{
_done=true;
_q.done();
for (auto& e : _threads)
{
e.join();
}
}
template <typename F>
bool async_(F&& f)
{
return _q.try_push(forward<F>(f));
}
void done()
{
_done = true;
}
};

上述代碼雖然實現了一個簡單的線程池,但是在效率上是有問題的。主要的原因就是所有的工作線程都在爭奪任務隊列的控制權,產生了contention。為了緩解contention現象,可以從以下兩個方面來入手:

  • 顯式的以待頭節點的鏈表來實現隊列,從而使得任務的提交和請求所需要的鎖分開;
  • 為每一個線程分配一個專有的任務隊列,同時允許線程向其他任務隊列請求任務,即 work_steal

良構鎖隊列

所謂良構鎖隊列,就是儘可能的減少不必要的鎖佔用時間。因此,我們採取鏈表來作為良構鎖隊列的底層數據結構,其數據結構如下:

template<typename T>
class notification_queue
{
private:
struct node
{
std::shared_ptr<T> data;
std::unique_ptr<node> next;
};
std::mutex head_mutex;
std::unique_ptr<node> head;
std::mutex tail_mutex;
node* tail;
}

這裡我們存儲了鏈表的頭節點和尾節點,同時為頭節點和尾節點添加相應的鎖。同時我們將鏈表初始化為有一個頭節點的鏈表:

notification_queue() :head(new node),tail(head.get())
{

}

此外,在此鏈表中加入一個頭節點,以減少pop操作對於尾節點鎖的爭用。對於push操作來說,實現是很直白的,因為這個過程只涉及到了尾節點的鎖。唯一需要注意的一點是在獲得鎖之前就通過make_shared分配好內存空間,以防止在鎖的作用域內拋出異常,從而導致數據不一致。

void push(T new_value)
{
std::shared_ptr<T> new_data(
std::make_shared<T>(std::move(new_value)));
std::unique_ptr<node> p(new node);
node* const new_tail = p.get();
std::lock_guard<std::mutex> tail_lock(tail_mutex);
tail->data = new_data;
tail->next = std::move(p);
tail = new_tail;
}

此時的pop操作則需要小心:

node* get_tail()
{
std::lock_guard<std::mutex> tail_lock(tail_mutex);
return tail;
};
bool try_pop(T& result)
{
std::lock_guard<std::mutex> head_lock(head_mutex);
if (head.get() == get_tail())
{
return false;
}
std::unique_ptr<node> old_head = std::move(head);
head = std::move(old_head->next);
result = *(old_head->data);
return true;
}

try_pop的過程中,我們首先判斷當前鏈表是否為空,這裡的get_tail必須放在head_lock之後。否則的話:我們獲得head時,可能其他線程已經多次進行了try_poppush。從而導致head獲得時,隊列可能已經為空,而之前獲得的尾節點所指向的並非當前尾節點。在這種情況下headtail的比較失敗,返回head,從而導致錯誤。

work_steal線程池

work_steal的實現比較直白。首先我們修改其數據結構為:

class task_system
{
const unsigned _count{ thread::hardware_concurrency() };
vector<thread> _threads;
vector<notification_queue> _q{ _count };
atomic<unsigned> _index{ 0 };
bool _done{ false };
}

對於每一個邏輯執行單元,我們都分配一個專有任務隊列。提交新任務時,我們直接採取輪轉法來平衡負載,用index自增來實現:

template <typename F>
void async_(F&& f)
{
auto i = _index++;
for (unsigned n = 0; n != _count; ++n)
{
if (_q[(i + n) % _count].try_push(forward<F>(f)))
{
return;
}
}
_q[i % _count].push(forward<F>(f));
}

而對於run函數,我們添加一個參數來指明該線程所對應的專有任務隊列:

void run(unsigned i)
{
while (true)
{
function<void()> f;
for (unsigned n = 0; n != _count; ++n)
{
if (_q[(i + n) % _count].try_pop(f))
{
break;
}
}
if (!f && !_q[i].try_pop(f))
{
break;
}
f();
}
}

同時,線程池的構造函數和析構函數也要做相應的修改,以實現RAII:

task_system()
{
for (unsigned n = 0; n != _count; ++n)
{
_threads.emplace_back([&, n] { run(n); });
}
}
~task_system()
{
for (auto& e : _q)
{
e.done();
}
for (auto& e : _threads)
{
e.join();
}
}

至此,一個帶work_steal的線程池就完成了

非同步過程的生成

返回值保存

現有的代碼處理的只是function<void()>類型的任務。對於普通函數來說,函數簽名則是function<R(Args...)>的。但是非同步提交任務時其參數列表就已經確定,需要處理的只是返回值,即function<R()>。為此,我們可以定義一個shared_base,來存儲返回值相關信息。

template <typename R>
struct shared_base
{
optional<R> _r;
mutex _mutex;
condition_variable _ready;
};

這裡的optional<R> _r用來存儲非同步過程的執行結果。

下面就是這個類所附帶的幾個函數:虛析構函數、setget

virtual ~shared_base() {}
void set(R&& r)
{
{
lock_guard<mutex> lock{ _mutex };
_r=forward<R>(r);
}
_ready.notify_all();
}
const R& get()
{
lock_guard<mutex> lock{ _mutex };
_ready.wait(lock, [&]()
{
return static_cast<bool>(_r);
});
return _r.value();
}

這樣,我們就可以在非同步調用中調用set函數來填充返回值,同時在future中調用get來等待返回值。為了在等待線程和工作線程中同時操作這個shared_base對象,我們需要以 shared_ptr的形式來保存這個對象的指針。自此,future所需要的元素都已具備,定義future來保存非同步過程的結果句柄,其簡單實現如下所示:

template <typename R>
class future
{
shared_ptr<R> _p;
explicit future(shared_ptr<R> p) : _p(move(p))
{

}
future() = default;
template <typename F>
const R& get() const
{
return _p->get();
}
};

這裡的R就是shared_base<T>類型。

非同步過程封裝

由於線程池中只支持function<void()>類型的調用,為此我們需要在shared_base的基礎上進一步做封裝。

首先,我們將返回值封裝起來:

template <typename R, typename... Args>
struct shared<R(Args...)> : shared_base<R>
{
function<R(Args...)> _f;
template<typename F>
shared(F&& f) : _f(forward<F>(f)) {}
template <typename... A>
void operator()(A&&... args)
{
this->set(_f(forward<A>(args)...));
_f = nullptr;
}
};

當前的shared結構既保存了返回值相關信息,還保留了函數的相關信息,即當前類型是一個可調用對象。在這個對象之上,我們再定義一個packaged_task

template<typename R, typename ...Args >
class packaged_task<R(Args...)>
{
weak_ptr<shared<R(Args...)>> _p;
explicit packaged_task(weak_ptr<shared<R(Args...)>> p) : _p(move(p)) {}
public:
packaged_task() = default;
template <typename... A>
void operator()(A&&... args) const
{
auto p = _p.lock();
if (p)
{
(*p)(forward<A>(args)...);
}
}
};

這個packaged_task中存儲了一個weak_ptr<shared<R(Args...)>>,用來保留所有的調用信息和返回值信息。至於這裡為什麼用weak_ptr,可能是為了處理某些特殊情況:例如非同步結果直接被拋棄,此時則沒有必要去執行該非同步過程。即非同步過程的所有者是其返回值的所有者,而不是過程本身。

至此,我們把非同步結果的存儲和非同步過程的存儲都解決了,剩下的問題就是:我們如何根據一個可調用對象生成futurepackaged_task。為此,我們定義一個新的中間函數package來執行次任務:

template <typename S, typename F>
auto package(F&& f) -> pair<packaged_task<S>, future<result_of_t_<S>>>
{
auto p = make_shared<shared<S>>(forward<F>(f));
return make_pair(packaged_task<S>(p), future<result_of_t_<S>>(p));
}
template <typename F, typename ...Args>
auto async(F&& f, Args&&... args)
{
using result_type = result_of_t<F(Args...)>;
using packaged_type = packaged_task<result_type()>;
auto pack = package<result_type()>(bind(forward<F>(f), forward<Args>(args)...));
_system.async_(move(get<0>(pack)));
return get<1>(pack);
}

這個package函數做了兩件事:

  • 根據所提供的函數參數f來生成一個保存了返回值和參數fshared<S>,用shared_ptr p來控制其所有權。這裡利用了result_of_t這個type_traits,其作用是根據函數簽名返回該函數的返回值類型。
  • 根據上一步生成的shared_ptr p構造packaged_taskfuture對象。這裡利用了兩個自動類型轉換:一個是由shared_ptr<shared<result_type()>>構造出一個weak_ptr <shared<result_type()>>,另外一個就是根據shared_ptr<shared<result_type()>> 構造出一個shared_ptr<shared_base<result_type>>。在自動類型轉換之後,再調用這兩個對象相應的構造函數。

同時,在async中,主要包括四個過程。

  • 通過bind將函數f與其相應的參數args綁定起來,生成了一個function<result_type()類型的可調用對象。
  • 將上一步構造出的臨時可調用對象作為package的參數,生成了包含package_task<result_type()>future<result_type>pair pack
  • 這個pair中所包含的package_task在當作一個可調用對象時,其函數簽名為void(),因此可以直接向_system提交非同步任務,這裡的_system的類型就是前一節中所提到的work_steal 線程池任務系統。
  • 最後,將pair中的future<result_type>返回,作為結果句柄。

自此,從可調用對象構造出了對應的非同步執行對象packaged_taskfuture。這個packaged_task被提交到了系統的任務處理系統_system中,而future則返回到用戶線程。

非同步過程的串聯

單後繼串聯

在處理多個非同步任務的系統設計中,非同步任務的串聯(即任務之間的依賴)是很重要的一個特性。設想一下場景中,我們需要依次調用兩個函數:

std::future < double > result_future_1 = std::async(std::power,2, 100);
double result_1=result_future_1.get();
std::future < double > result_future_2 = std::async(std::power,result_1, 10);
double result_2=result_future_2.get();

在上述實例中,result_2依賴於result_1。因此執行時我們必須顯式的調用future.get()來獲得result_1,然後才能提交第二個非同步任務。這種顯示的同步在依賴路徑變長時就會顯得非常繁雜,而且引入了多次同步的需求。理想情況下,我們想要的只是最後的執行結果,只需要調用一次future.get()就可以。簡化的代碼可以變成這樣:

std::future < double > result_future_1 = std::async(std::power,2, 100);
std::future < double > result_future_2 = result_future_1.then(std::bind(std::power,_1, 10));
double result_2=result_future_2.get();

同時也可以變成這樣

std::future < double > result_future_1 = std::async(std::power,2, 100).then(std::bind(std::power,_1, 10));
double result_1=result_future_1.get();

但是C++11中的std::future並不支持串聯,所以我們就不得不去造輪子。現在我們就來實現future.then()

為了支持then操作,我們必須在future中保存後續的執行路徑。我們來研究future中對then的處理:

template <typename F>
auto then(F&& f)
{
auto pack = package<result_of_t<F(R)>()>([p = _p, f = forward<F>(f)]()
{
return f(p->_r.back());
});
_p->then(move(pack.first));
return pack.second;
}

這裡的then操作每次生成一個新的pair<packaged_task,future>。其中的packaged_task部分捕獲了調用者futureshared_ptr<shared_base>成員_p,同時這個packaged_task註冊到了_p._then隊列中。pair中的future則保留了新的返回值信息。現在剩下的任務就是實現shared_base.then了。

為此我們首先修改share_base,增加一個保存後續任務的optional,同樣以vector的形式來實現。此時shared_base<R>的數據成員如下:

optional<R> _r; // optional
mutex _mutex;
condition_variable _ready;
optional<function<void()>> _then;

這個_then成員就是用來保存後續的執行路徑的。其實也不算路徑,因為只保留路徑中的直接子節點,可以當作路徑中的next指針。

then操作就比較直接了:

template <typename F>
void then(F&& f)
{
bool resolved{ false };
{
lock_guard<mutex> lock{ _mutex };
if (!_r)
{
_then=forward<F>(f);
}
else
{
resolved = true;
}
}
if (resolved)
{
_system.async_(forward<F>(f));
}
}

在執行then操作時,首先判斷是否之前的非同步任務已經執行完全。這個判斷是通過_r來執行的,因為optional有一個到bool的自動類型轉換。

如果之前的非同步任務已執行,則提交當前任務到任務調度系統;否則將當前任務掛載在next下,即_then

同時,set函數也要做一些修改:

void set(R&& r)
{
optional<function<void()>> then;
{
lock_guard<mutex> lock{ _mutex };
_r=forward<R>(r);
swap(_then, then);
}
_ready.notify_all();
if(then)
{
_system.async_(move(then.value()));
}
}

這段代碼也是比較直接:在當前shared_base獲得了初始值之後,再去執行下一個非同步過程。至於這裡為什麼用swap操作,目前還不是很清楚,需要測試一下才能知道這裡的swap到底發生了什麼。

多後繼串聯

之前的代碼處理的只是包含了一個後續任務依賴的情況,但是當我們談到依賴的時候,我們所提到的都是依賴樹,而不是依賴序列。這種依賴樹的情況,一個packaged_task可能有多個任務都在等待其完成。

std::future < double > result_future_1 = std::async(std::power,2, 100);
double result_1=result_future_1.get();
std::future < double > result_future_2 = std::async(std::power,result_1, 10);
std::future < double > result_future_3 = std::async(std::power,result_1, 10);
double result_2=result_future_2.get();
double result_3=result_future_3.get();

為了支持這種多後繼的操作,我們需要對shared_base做修改。

首先,_then成員不再是一個optional<function<void()>>,而是一個vector<function<void>>,因為我們所存儲的後續任務不再僅限於一個。

optional<R> _r; // optional
mutex _mutex;
condition_variable _ready;
vector<function<void()>> _then;

同時,我們也要修改對應的thenset的代碼實現,從optional轉換到vector上來。

template <typename F>
void then(F&& f)
{
bool resolved{ false };
{
lock_guard<mutex> lock{ _mutex };
if (!_r)
{
_then.push_back(forward<F>(f));
}
else
{
resolved = true;
}
}
if (resolved)
{
_system.async_(forward<F>(f));
}
}
:::c++
void set(R&& r)
{
vector<function<void()>> then;
{
lock_guard<mutex> lock{ _mutex };
_r=forward<R>(r);
swap(_then, then);
}
_ready.notify_all();
for(const auto& f:then)
{
_system.async_(move(f));
}
}

在經過這種修改之後,之前的示例代碼可以這樣寫:

:::c++
std::future < double > result_future_1 = std::async(std::power,2, 100);
std::future < double > result_future_2 = result_future_1.then(bind(std::power,_1, 10));
std::future < double > result_future_3 = result_future_1.then(bind(std::power,_1, 10));
double result_2=result_future_2.get();
double result_3=result_future_3.get();

多前驅串聯

wait_all 串聯

對於wait_for_all形式,一個任務的啟動需要多個任務都已完成。我們需要等待所有過程的執行完全,所以需要顯示的等待。為了支持這種顯示的同步操作,我們繼續修改之前的shared_basefuture,都加入顯示的等待wait

template <typename R>
void shared_base<R>::wait()
{
lock_guard<mutex> lock{ _mutex };
_ready.wait(lock, [&]()
{
return !_r.empty();
});
}
template <typename R>
void future<R>::wait() const
{
_p->wait();
}

這樣我們可以以一下形式來實現wait_for_all

template<typename F1, typename... Fs>
void wait_for_all(F1& f1, Fs&... fs)
{
bool dummy[] = { (f1.wait(), true), (fs.wait(), true)... };

// prevent unused parameter warning
(void) dummy;
}

雖然上述代碼實現了wait_for_all這個介面,但是這種實現並不利於future的進一步組合。更好的實現應該是綜合所依賴的future生成一個新的future,類似於如下形式:

template < typename R,typename... Arg>
auto all_package(function<R(Arg...)>&& _f, future<Arg>&& all_futures...)
->pair<packaged_task<R()>, future<R>>
{
auto lambda_task = [all=move(all_futures)...,f=move(f)]()
{
return f(all.get()...);
}
auto p = make_shared<shared<R()>>(move(lambda_task));
return make_pair(packaged_task<R()>(p), future<R>(p));
}

同時添加async_all

template <typename R, typename ...Args>
auto async_all(function<R(Args...)>&& f, future<Args>&&... args)
{
auto pack = all_package<R,Args...>(forward<function<R(Args...)>>(f), forward<future<Args>>(args)...);
_system.async_(move(get<0>(pack)));
return get<1>(pack);
}

wait_any 串聯

wait_any相對來說就比較麻煩了,需要實現一個通知機制。只要所等待的future其中一個有了信號,其他的future就不需要再執行了。我們可以採取atomic_bool的形式來實現這個once_flag,剩下的任務就是如何將這個atomic_bool掛載到各個future之上,這個掛載操作我們可以利用then。但是用then的話就無法控制其他future的執行,因為當我們執行then操作的時候之前的結果肯定已經計算完成。所以,我只能說毫無辦法。

如果實在想這麼做的話,需要在這些future生成之前就定義好這個atomic_bool。同時修改這些任務的執行邏輯,執行前首先判斷atomic_bool是否已經set了,沒有的話才執行後續任務。

後記

這篇文章基本翻譯自Sean Parent的三年前的一個ppt,現在c++17已經出來了,文中提到的future基礎設施基本已經添加進去了,可以直接用stl去調用了。

目前倒是有個想法,使用這個非同步任務系統實現以下行為樹的任務調度,看起來應該能行,核心還是在action的自動註冊身上。

推薦閱讀:

相关文章