Herb Sutter在DDJ pillars of concurrency一文中拋出並行編程的三個簡單論點,一是分離任務,使用更細粒度的鎖或者無鎖編程;二是盡量通過並行任務使用CPU資源,以提高系統吞吐量及擴展性;三是保證對共享資源訪問的一致性。第三點已經被atomicmutexlockcondition_variable解決了,第一點和第二點則可以歸結為如何對任務進行粒度劃分並投遞到任務的執行單元中去調度執行。任務劃分依賴於各種不同業務的理解,例如網路和渲染,很難抽取出其共性。而任務的調度執行則是一種通用的結構,可以分為四個部分:

  1. 任務的封裝 在c++11里提供了三種最基本的任務封裝形式future, promise,packaged_task
  2. 任務的結構 在c++17里補全了任務結構控制,主要是提供了then, when_all, when_any這三個用來關聯多個future的函數
  3. 任務的執行 任務執行者基本都是使用線程池,每個線程不斷的嘗試獲取一個任務並執行,類似於一個while循環
  4. 任務的調度 這部分負責了任務的投遞和分發,他在多線程之間維持了一個任務容器集合,提供的介面主要包括接受新任務、取出一個任務和判斷容器是否為空

在整個並發任務系統中,在任務容器集合之上的任務調度結構是核心。現在使用的最廣泛的任務容器是concurrent queue,下面我們來對concurrent queue的多種實現來做一下分析。

naive concurrent queue

queue是一個維持先進先出(FIFO)隊列的結構,在很多STL的實現之中採取的是多塊連續內存的雙向鏈表來維持其先進先出結構。為了在多線程中使用std::queue,最簡單的方法就是使用鎖來解決data race,同時修改原始提供的介面,使得這個數據結構不會被用錯。

#include <queue>
#include <atomic>
#include <mutex>
template<typename T>
class concurrent_queue
{
private:
mutable std::mutex mut;
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
concurrent_queue()
{
// pass
}
void push(T new_value)
{
std::lock_guard<std::mutex> lk(mut);
data_queue.push(std::move(data));
data_cond.notify_one();
}
void wait_and_pop(T& value)
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this]{
return !data_queue.empty();
})
value = std::move(data_queue.front());
data_queue.pop();
}
std::shared_ptr<T> wait_and_pop()
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this]{
return !data_queue.empty();
})
auto res = std::make_shared<T>(std::move(data_queue.front()));
data_queue.pop();
return res;
}
bool try_pop(T& value)
{
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
{
return false;
}
value = std::move(data_queue.front());
data_queue.pop()
return True;
}
std::shared_ptr<T> try_pop()
{
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
{
return std::shared_ptr<T>();
}
auto res = std::make_shared<T>(std::move(data_queue.front()));
data_queue.pop()
return res;
}
bool empty() const
{
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
}

上述代碼的主要考量如下:

  1. 由於多線程的干擾,常規的查詢empty之後再pop的處理流程是錯誤的,這兩個操作必須封裝在一起,所以這裡提供了try_popwait_and_pop這兩個介面來獲取數據。
  2. 為了避免在數據拷貝的時候出現異常導致的數據不一致,返回數據的時候採取兩套方案,一個是調用者提供引用,一個是返回一個shared_ptr。這樣就保證了如果在拷貝構造front的時候出了trace也能維持整個queue的結構完整。

這個concurrent_queue並不是很高效,主要的drawback包括如下三個方面:

  1. 每次訪問介面的時候都需要調用鎖,而且是同一個鎖
  2. 在嘗試獲得數據的時候失敗會觸發yield,從而導致線程切換
  3. 維持了一個全局的先進先出序列,在多消費者的情況下這個強制唯一序是沒有意義的,在單消費者的情況下也很少會有這種要求。

對應的常見解決方案:

  1. 使用無鎖的方式去代替mutex,同時由於無鎖最大的問題是內存分配,有些並發隊列通過預先設置最大大小的方式來預分配內存,從而繞過了著名的ABA問題
  2. 使用雙鏈表結構去維持隊列,而不是使用queue,這樣我們就可以分離頭節點和尾節點的訪問;如果是固定大小的隊列則可以採取ring buffer的形式來維持隊列結構。
  3. 當嘗試獲得數據失敗的時候,先輪詢一段時間,如果這段時間內還是沒有數據,則調用yield,也就是對condition_variable封裝了一層。
  4. 每個生產者維護其投遞隊列,每個消費者根據對各個生產者任務隊列的優先順序去遍歷獲取任務。

事實上,在這是一個並發queue的時候,首先要明確如下幾個問題:

  1. 這個queue的生產者和消費者各有多少個,常見的有單生產者單消費者(SPSC)、單生產者多消費者(SPMC)、多生產者單消費者(MPSC)和多生產者多消費者(MPMC)
  2. 這個queue的最大元素大小是否確定,如果可以確定最大大小,則動態內存分配就可以避免,直接採取環形隊列當作容器即可;如果無法確定最大大小,則只能通過動態內存分配的形式去處理,這裡的難度加大了很多,因為要處理多線程的內存分配。

下面我們來看一下現在主流的幾種concurrent_queue的實現,來分析一下他們對concurrent_queue的實現優化。

intel spsc concurrent queue

intel官方網站上提供了一個SPSC queue,但是這個queue沒有限制最大元素大小,如果臨時內存不夠的話會調用new,可能會觸發鎖。

// load with consume (data-dependent) memory ordering
template<typename T>
T load_consume(T const* addr)
{
// hardware fence is implicit on x86
T v = *const_cast<T const volatile*>(addr);
__memory_barrier(); // compiler fence
return v;
}

// store with release memory ordering
template<typename T>
void store_release(T* addr, T v)
{
// hardware fence is implicit on x86
__memory_barrier(); // compiler fence
*const_cast<T volatile*>(addr) = v;
}

// cache line size on modern x86 processors (in bytes)
size_t const cache_line_size = 64;

// single-producer/single-consumer queue
template<typename T>
class spsc_queue
{
public:
spsc_queue()
{
node* n = new node;
n->next_ = 0;
tail_ = head_ = first_= tail_copy_ = n;
}

~spsc_queue()
{
node* n = first_;
do
{
node* next = n->next_;
delete n;
n = next;
}
while (n);
}

void enqueue(T v)
{
node* n = alloc_node();
n->next_ = 0;
n->value_ = v;
store_release(&head_->next_, n);
head_ = n;
}

// returns false if queue is empty
bool dequeue(T& v)
{
if (load_consume(&tail_->next_))
{
v = tail_->next_->value_;
store_release(&tail_, tail_->next_);
return true;
}
else
{
return false;
}
}

private:
// internal node structure
struct node
{
node* next_;
T value_;
};

// consumer part
// accessed mainly by consumer, infrequently be producer
node* tail_; // tail of the queue

// delimiter between consumer part and producer part,
// so that they situated on different cache lines
char cache_line_pad_ [cache_line_size];

// producer part
// accessed only by producer
node* head_; // head of the queue
node* first_; // last unused node (tail of node cache)
node* tail_copy_; // helper (points somewhere between first_ and tail_)

node* alloc_node()
{
// first tries to allocate node from internal node cache,
// if attempt fails, allocates node via ::operator new()

if (first_ != tail_copy_)
{
node* n = first_;
first_ = first_->next_;
return n;
}
tail_copy_ = load_consume(&tail_);
if (first_ != tail_copy_)
{
node* n = first_;
first_ = first_->next_;
return n;
}
node* n = new node;
return n;
}

spsc_queue(spsc_queue const&);
spsc_queue& operator = (spsc_queue const&);
};

// usage example
int main()
{
spsc_queue<int> q;
q.enqueue(1);
q.enqueue(2);
int v;
bool b = q.dequeue(v);
b = q.dequeue(v);
q.enqueue(3);
q.enqueue(4);
b = q.dequeue(v);
b = q.dequeue(v);
b = q.dequeue(v);
}

這個代碼的實現很簡單粗暴,核心是一個單鏈表,對於單鏈表的任何操作都是wait_free的,這個鏈表有四個指針:

  1. tail指針,指向下一個應該dequeue的位置
  2. head指針,指向最新的一個enqueue的位置
  3. first_指針,指向第一個可以回收node的位置
  4. tail_copy指針,指向一個安全的可以回收的nodenext位置,他不一定指向tail

在這個鏈表裡,指針之間有如下關係:$first le tail_copy le tail le head$ 。這裡做的核心優化就是按需去更新tail_copy,沒必要每次更新tail的時候都把tail_copy更新一遍,只有發現first == tail_copy的時候才去更新一下。每個操作都沒有使用到CAS,因此都是wait_free的,當然那一行調用了new的除外。

這裡為了避免False Sharing使用了padding。由於讀線程只需要更改tail,所以只需要在tail之後加個padding即可。

facebook spsc concurrent queue

facebook提供了固定大小的SPSC queue,代碼在follyProducerConsumerQueue里。

/*
* Copyright 2017 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// @author Bo Hu ([email protected])
// @author Jordan DeLong ([email protected])

#pragma once

#include <atomic>
#include <cassert>
#include <cstdlib>
#include <memory>
#include <stdexcept>
#include <type_traits>
#include <utility>

#include <folly/concurrency/CacheLocality.h>

namespace folly {

/*
* ProducerConsumerQueue is a one producer and one consumer queue
* without locks.
*/
template<class T>
struct ProducerConsumerQueue {
typedef T value_type;

ProducerConsumerQueue(const ProducerConsumerQueue&) = delete;
ProducerConsumerQueue& operator = (const ProducerConsumerQueue&) = delete;

// size must be >= 2.
//
// Also, note that the number of usable slots in the queue at any
// given time is actually (size-1), so if you start with an empty queue,
// isFull() will return true after size-1 insertions.
explicit ProducerConsumerQueue(uint32_t size)
: size_(size)
, records_(static_cast<T*>(std::malloc(sizeof(T) * size)))
, readIndex_(0)
, writeIndex_(0)
{
assert(size >= 2);
if (!records_) {
throw std::bad_alloc();
}
}

~ProducerConsumerQueue() {
// We need to destruct anything that may still exist in our queue.
// (No real synchronization needed at destructor time: only one
// thread can be doing this.)
if (!std::is_trivially_destructible<T>::value) {
size_t readIndex = readIndex_;
size_t endIndex = writeIndex_;
while (readIndex != endIndex) {
records_[readIndex].~T();
if (++readIndex == size_) {
readIndex = 0;
}
}
}

std::free(records_);
}

template<class ...Args>
bool write(Args&&... recordArgs) {
auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
auto nextRecord = currentWrite + 1;
if (nextRecord == size_) {
nextRecord = 0;
}
if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
new (&records_[currentWrite]) T(std::forward<Args>(recordArgs)...);
writeIndex_.store(nextRecord, std::memory_order_release);
return true;
}

// queue is full
return false;
}

// move (or copy) the value at the front of the queue to given variable
bool read(T& record) {
auto const currentRead = readIndex_.load(std::memory_order_relaxed);
if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
// queue is empty
return false;
}

auto nextRecord = currentRead + 1;
if (nextRecord == size_) {
nextRecord = 0;
}
record = std::move(records_[currentRead]);
records_[currentRead].~T();
readIndex_.store(nextRecord, std::memory_order_release);
return true;
}

// pointer to the value at the front of the queue (for use in-place) or
// nullptr if empty.
T* frontPtr() {
auto const currentRead = readIndex_.load(std::memory_order_relaxed);
if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
// queue is empty
return nullptr;
}
return &records_[currentRead];
}

// queue must not be empty
void popFront() {
auto const currentRead = readIndex_.load(std::memory_order_relaxed);
assert(currentRead != writeIndex_.load(std::memory_order_acquire));

auto nextRecord = currentRead + 1;
if (nextRecord == size_) {
nextRecord = 0;
}
records_[currentRead].~T();
readIndex_.store(nextRecord, std::memory_order_release);
}

bool isEmpty() const {
return readIndex_.load(std::memory_order_acquire) ==
writeIndex_.load(std::memory_order_acquire);
}

bool isFull() const {
auto nextRecord = writeIndex_.load(std::memory_order_acquire) + 1;
if (nextRecord == size_) {
nextRecord = 0;
}
if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
return false;
}
// queue is full
return true;
}

// * If called by consumer, then true size may be more (because producer may
// be adding items concurrently).
// * If called by producer, then true size may be less (because consumer may
// be removing items concurrently).
// * It is undefined to call this from any other thread.
size_t sizeGuess() const {
int ret = writeIndex_.load(std::memory_order_acquire) -
readIndex_.load(std::memory_order_acquire);
if (ret < 0) {
ret += size_;
}
return ret;
}

private:
char pad0_[CacheLocality::kFalseSharingRange];
const uint32_t size_;
T* const records_;

FOLLY_ALIGN_TO_AVOID_FALSE_SHARING std::atomic<unsigned int> readIndex_;
FOLLY_ALIGN_TO_AVOID_FALSE_SHARING std::atomic<unsigned int> writeIndex_;

char pad1_[CacheLocality::kFalseSharingRange - sizeof(writeIndex_)];
};

}

這裡就是使用環形隊列來作為容器,雙指針來作為頭尾,讀線程讀取readIndex直接採用relaxed,寫線程讀取writeIndex的時候也是採取relaxed,因為這兩個變數只會在對應的線程內修改,可以認為是對應線程的私有變數,如果要讀取另外一個線程的變數則需要採取acquire,當然前提是修改的時候使用了release。為了避免False Sharing這裡也使用了padding,只不過是用宏做的。

其實這裡也可以做一點優化,就像前面intel的延遲處理tail_copy一樣,首次讀取另外一個線程變數的時候先用relaxed,如果發現不能操作了,則再使用acquire

總的來說,這個無鎖spsc queue也是wait_free的。

moodycamel spsc concurrent queue

moody camel 在上面的基礎上做了一些改進:在支持無大小限制的情況下,將動態內存分配的需求降得很低,同時支持了容量的動態增長。其容器結構是兩層的queue of queue,第一層是循環鏈表,第二層是循環隊列。第一層循環鏈表的控制基本等價於intelspsc里的代碼,而第二層的循環隊列的控制基本等價於folly的代碼。當enqueue的時候,發現沒有空閑內存的時候會調用malloc,不過這種動態內存分配比起intel的每個新node都分配來說簡單多了,總的來說還是比較符合wait_free的。這個的代碼我就不分析了,直接貼作者的解釋吧。

# Enqueue
If room in tail block, add to tail
Else check next block
If next block is not the head block, enqueue on next block
Else create a new block and enqueue there
Advance tail to the block we just enqueued to

# Dequeue
Remember where the tail block is
If the front block has an element in it, dequeue it
Else
If front block was the tail block when we entered the function, return false
Else advance to next block and dequeue the item there

naive spmc concurrent queue

在這前面介紹的spsc並發隊列的基礎上,我們可以比較容易的構建出一個spmc的並發隊列,而構造一個mpsc的並發隊列則難很多。其原因主要是在enqueue的時候,可能會涉及到動態內存分配,如果有好幾個線程都搶著進行動態內存分配的話,就會出現malloc的鎖徵用。而多個線程搶佔dequeue的時候,只需要採取CAS來保持tail的更新即可,雖說這個不是waitfree的,但是lockfree還是可以基本保證的。

boost mpmc concurrent queue

boost concurrent queue通過模板參數的方式來支持固定大小的隊列和不定大小的隊列。

如果是固定大小隊列,則會使用一個帶dummy headring buffer來存儲內容,同時使用一個頭節點索引和一個尾節點索引來標記隊列的頭尾位置。為了一次性的修改頭尾節點索引,這裡將隊列大小的上限設置為了$2^{16} - 2$ ,這樣兩個索引就可以合併為一個int32 來處理,修改的時候可以使用compare_exchange_來同時修改。如果在支持int64類型的compare_exchange_操作的平台,隊列大小的上限可以放到$2^{32} -2$ ,同時兩個索引會被壓縮為一個int64來做同時修改。

如果是不定大小的隊列,則會使用鏈表的形式來維持隊列結構, 代碼見下。

struct BOOST_LOCKFREE_CACHELINE_ALIGNMENT node
{
typedef typename detail::select_tagged_handle<node, node_based>::tagged_handle_type tagged_node_handle;
typedef typename detail::select_tagged_handle<node, node_based>::handle_type handle_type;

node(T const & v, handle_type null_handle):
data(v)//, next(tagged_node_handle(0, 0))
{
/* increment tag to avoid ABA problem */
tagged_node_handle old_next = next.load(memory_order_relaxed);
tagged_node_handle new_next (null_handle, old_next.get_next_tag());
next.store(new_next, memory_order_release);
}

node (handle_type null_handle):
next(tagged_node_handle(null_handle, 0))
{}

node(void)
{}

atomic<tagged_node_handle> next;
T data;
};

這裡比較有意思的就是第九行的注釋:對指針的tag位置進行自增來避免ABA問題。這裡的next指針是一個tagged_pointer,其分配位置是內存對齊的,對齊的大小由BOOST_LOCKFREE_CACHELINE_BYTES定義,在WIN平台下,這個宏定義如下:

#define BOOST_LOCKFREE_CACHELINE_BYTES 64
#ifdef _MSC_VER
#define BOOST_LOCKFREE_CACHELINE_ALIGNMENT __declspec(align(BOOST_LOCKFREE_CACHELINE_BYTES))

當這個指針是64位元組對齊時,最底的6位是沒有意義的,所以這6位我們可以用來存儲額外的數據,這種指針就叫做tagged_pointer,在llvm里這個指針結構也很常見。

boost lockfree queue里,數據成員定義如下:

typedef typename detail::queue_signature::bind<A0, A1, A2>::type bound_args;
static const bool has_capacity = detail::extract_capacity<bound_args>::has_capacity;
static const size_t capacity = detail::extract_capacity<bound_args>::capacity + 1; // the queue uses one dummy node
static const bool fixed_sized = detail::extract_fixed_sized<bound_args>::value;
static const bool node_based = !(has_capacity || fixed_sized);
static const bool compile_time_sized = has_capacity;
typedef typename detail::extract_allocator<bound_args, node>::type node_allocator;
typedef typename detail::select_freelist<node, node_allocator, compile_time_sized, fixed_sized, capacity>::type pool_t;
typedef typename pool_t::tagged_node_handle tagged_node_handle;
typedef typename detail::select_tagged_handle<node, node_based>::handle_type handle_type;

atomic<tagged_node_handle> head_;
static const int padding_size = BOOST_LOCKFREE_CACHELINE_BYTES - sizeof(tagged_node_handle);
char padding1[padding_size];
atomic<tagged_node_handle> tail_;
char padding2[padding_size];
pool_t pool; //代表node的pool 可以當作內存分配器

因為atomic<T*>內部只包含一個T*作為成員變數,所以atomic<T*>T*的內存布局是一樣的,所以這裡的padding_size才會這樣計算出來。這裡的padding的意義在於讓poll的開始地址是BOOST_LOCKFREE_CACHELINE_BYTES對齊的,同時這裡分為了兩個padding而不是一個padding主要是考慮到將tail head分離在兩個cache_line上,避免不同線程之間的緩存競爭。

現在我們來看這個lockfree queue提供的介面。

首先查看empty

/** Check if the queue is empty
*
*
eturn true, if the queue is empty, false otherwise
*
ote The result is only accurate, if no other thread modifies the queue. Therefore it is rarely practical to use this
* value in program logic.
* */
bool empty(void)
{
return pool.get_handle(head_.load()) == pool.get_handle(tail_.load());
}

注釋里寫的很清楚了,這個函數的返回值是不準確的,因為在沒有鎖的情況下無法同時獲得head tail的準確值。

現在來看push,這裡分為了兩個介面push bounded_push,區分在於如果內存池已經用完,第一個push在當前隊列是大小固定的情況下會返回false,不固定的情況下會向操作系統嘗試申請更多的內存並返回;而第二個bounded_push則直接返回false

/** Pushes object t to the queue.
*
* post object will be pushed to the queue, if internal node can be allocated
*
eturns true, if the push operation is successful.
*
*
ote Thread-safe. If internal memory pool is exhausted and the memory pool is not fixed-sized, a new node will be allocated
* from the OS. This may not be lock-free.
* */
bool push(T const & t)
{
return do_push<false>(t);
}

/** Pushes object t to the queue.
*
* post object will be pushed to the queue, if internal node can be allocated
*
eturns true, if the push operation is successful.
*
*
ote Thread-safe and non-blocking. If internal memory pool is exhausted, operation will fail
* hrows if memory allocator throws
* */
bool bounded_push(T const & t)
{
return do_push<true>(t);
}

這兩個函數都調用了do_push,這個函數的定義如下:

template <bool Bounded>
bool do_push(T const & t)
{
using detail::likely;

node * n = pool.template construct<true, Bounded>(t, pool.null_handle());
handle_type node_handle = pool.get_handle(n);

if (n == NULL)
return false;

for (;;) {
tagged_node_handle tail = tail_.load(memory_order_acquire);
node * tail_node = pool.get_pointer(tail);
tagged_node_handle next = tail_node->next.load(memory_order_acquire);
node * next_ptr = pool.get_pointer(next);

tagged_node_handle tail2 = tail_.load(memory_order_acquire);
if (likely(tail == tail2)) {
if (next_ptr == 0) {
tagged_node_handle new_tail_next(node_handle, next.get_next_tag());
if ( tail_node->next.compare_exchange_weak(next, new_tail_next) ) {
tagged_node_handle new_tail(node_handle, tail.get_next_tag());
tail_.compare_exchange_strong(tail, new_tail);
return true;
}
}
else {
tagged_node_handle new_tail(pool.get_handle(next_ptr), tail.get_next_tag());
tail_.compare_exchange_strong(tail, new_tail);
}
}
}
}

這裡比較難理解的一點就是tail tail2,以及最後30行的compare_exchange_strong。這裡在19行使用判斷的意義是避免在內部做無用功,雖然不使用19行的判斷, tail改變之後,20行所在分支的檢測都會fail掉,對正確性沒影響,對性能上來說提升很大。在一個完整的成功push流程中有兩個cas操作,我們需要擔心的是在兩個cas操作之間線程被換出之後會出現何種結果,也就是在24行之前被換出。此時老的tailnext已經被修正為了新數據,而新tail卻沒有更新。在下一個線程進來的時候會發現tail->next != 0, 因此會進28號的分支,在此分支之內會嘗試將tail->next更新為tail,這樣就避免了數據更新到一半的尷尬局面。

對於pop則只有一個函數:

/** Pops object from queue.
*
* pre type U must be constructible by T and copyable, or T must be convertible to U
* post if pop operation is successful, object will be copied to ret.
*
eturns true, if the pop operation is successful, false if queue was empty.

*
*
ote Thread-safe and non-blocking

* */
template <typename U>
bool pop (U & ret)
{
using detail::likely;
for (;;) {
tagged_node_handle head = head_.load(memory_order_acquire);
node * head_ptr = pool.get_pointer(head);

tagged_node_handle tail = tail_.load(memory_order_acquire);
tagged_node_handle next = head_ptr->next.load(memory_order_acquire);
node * next_ptr = pool.get_pointer(next);

tagged_node_handle head2 = head_.load(memory_order_acquire);
if (likely(head == head2)) {
if (pool.get_handle(head) == pool.get_handle(tail)) {
if (next_ptr == 0)
return false;

tagged_node_handle new_tail(pool.get_handle(next), tail.get_next_tag());
tail_.compare_exchange_strong(tail, new_tail);

} else {
if (next_ptr == 0)
/* this check is not part of the original algorithm as published by michael and scott
*
* however we reuse the tagged_ptr part for the freelist and clear the next part during node
* allocation. we can observe a null-pointer here.
* */
continue;
detail::copy_payload(next_ptr->data, ret);

tagged_node_handle new_head(pool.get_handle(next), head.get_next_tag());
if (head_.compare_exchange_weak(head, new_head)) {
pool.template destruct<true>(head);
return true;
}
}
}
}
}

這裡就簡單多了,成功的pop只需要一個compare_exchange_weak即可,所以就不需要擔心數據更改到一半的問題,這裡的28行處理的還是tail數據更新到一半的問題。

這裡比較有意思的一點就是42行的.template,這個叫做template disambiguator, 其作用就是通知編譯器destruct<true>是一個模板,而不是destruct < true

總的來說, boost lockfree queue的注意點完全在lock free上,並沒有採取每個生產者單獨一個queue的方式來解決爭用,雖然我們可以在lockfree queue的基礎上做一個這樣的東西。

intel tbb concurrent queue

其實這個的總體實現與boost類似。占坑,以後填。粗看起來,這個東西的實現很具有STL的風格。

moodycamel concurrent queue

這個concurrent queue的實現被很多項目使用過, 值得重點分析。這個實現的突出之處在於,每個生產者都維持自己的專屬queue,而不同的消費者會以不同的順序去訪問各個生產者的queue,直到遇到一個不為空的queue。簡而言之,他所實現的MPMC(multiple producer multiple consumer)的隊列建立在了SPMC的多線程隊列的基礎上。這個SPMC的實現是lockfree的,同時還增加了bulk操作。下面來慢慢介紹這個的設計。

首先就是在構建消費者的時候,儘可能的讓消費者與生產者均衡綁定,內部實現是通過使用一個token來維持消費者與生產者之間的親和性。其實最簡單的親和性分配的方法就是每個消費者分配一個生產者的編號,dequeue的時候採取輪詢的方式,每次開始輪詢的時候都以上次dequeue成功的生產者queue開始。

處理完了多生產者多消費者之間的映射,現在剩下的內容就是如何更高效的處理單生產者多消費者。moodycamel這裡的主要改進就是單個queue的存儲結構,這裡採取的是兩層的循環隊列,第一層循環隊列存儲的是第二層循環隊列的指針。一個隊列只需要維護四個索引,考慮到原子性修改可以把消費者的兩個索引合併為一個uint64或者uint32t,因為只有消費者會發生數據競爭,為了方便比較,也順便把生產者的兩個索引合併為一個uint64t or uint32t,這樣就可以直接使用整數比較了。在enqueue的時候,數據複製完成之後,直接對生產者的索引自增即可。而dequeue的時候則沒這麼容易,此時首先自增消費者索引,然後判斷當前消費者索引是否已經越過生產者索引,如果越過了,則對則對另外一個overcommit計數器進行自增,三個計數器合用才能獲得真正的容量。

這裡使用環形緩衝來擴容而不是採取列表來擴容,主要是因為連續的空間操作可以支持批量的enqueuedequeue操作,直接預先佔據一些索引就行了。

未完待續······················

推薦閱讀:

相关文章