Pthreads是Linux下常用的多線程庫。多線程程序很常見,就算沒有多核CPU即便如嵌入式系統單核也可以跑多線程,比如我需要以100Hz的頻率對電機調速,同時又需要1秒種改變一次LED的狀態,這種任務當然可以在一個定時周期內設置flag來進行人為調度,但更方便、又能擴展的方式還是使用多線程。多線程具有獨立的調用棧,也就是說同一個函數內局部變數在線程間是不可見的,另一個特點則是具有shared memory, 像全局變數對線程是可見的。

Shared memory model

  為什麼是採用shared memory 而不是設計某種通信機制來保護數據呢? 這個問題我覺得主要是shared memory相比通信延遲(latency)小,帶寬(bandwidth)高。下圖是pthreads 和MPI的數據傳輸帶寬對比

系統memory帶寬:

memory bandwidth

集群間採用MPI通信,帶寬受限於網路,如千兆交換機、萬兆交換機。千兆交換機1000Mb/s=125MB/s,速度在CPU memory面前簡直不可同日而語。Pthreads 使用shared memory帶了一個重要的問題就是數據的保護,這項工作是交給開發者來完成的。比如shared memory里變數a, 同一時刻兩個線程進行write:線程1賦值a=3, 線程2賦值a=5, 那麼a到底該是多少? 因此相比進程間通信的MPI,Pthreads 需要用戶顯式地對數據進行保護。多線程程序有一個名詞叫線程安全(thread safety),所謂線程安全,指的是能夠多線程並發執行而不造成shared memory數據被破壞或者出現線程競爭,比如STL vector不是線程安全的,因為vector的內存管理並沒有進行保護,每個線程都可以進行銷毀、擴容等操作。

Pthreads 還具有輕量級、實時調度等特點。基於Pthreads擴展為多線程程序也是較為easy 的事,Pthreads的API不多,主要有四種類型:

  • 管理線程的, 如pthread_create用於創建線程, pthread_exit 退出線程,pthread_join 用於線程同步;
  • 互斥鎖(Mutex lock),用於保護shared memory data;
  • 條件變數(Condition variables),  線程間通信, 此外除了flag,還有semaphore這種也不錯;
  • 線程同步(Synchronization), 線程同步。

Pthreads 在linux下直接包含頭文件pthread.h即可,gcc編譯時加入鏈接選項-lpthread或-pthread, 如果使用CMake 編譯,在CMakeLists.txt加入一句

set_target_properties( obj PROPERTIES COMPILE_FLAGS -pthread LINK_FLAGS -pthread)

一個簡單的例子:

#include <pthread.h>
#include <stdio.h>
#define NUM_THREADS 5

void *PrintHello(void *threadid)
{
long tid;
tid = (long)threadid;
printf("Hello World! Its me, thread #%ld!
", tid);
pthread_exit(NULL);
}

int main (int argc, char *argv[])
{
pthread_t threads[NUM_THREADS];
int rc;
long t;
for(t=0; t<NUM_THREADS; t++){
printf("In main: creating thread %ld
", t);
rc = pthread_create(&threads[t], NULL, PrintHello, (void *)t);
if (rc){
printf("ERROR; return code from pthread_create() is %d
", rc);
exit(-1);
}
}

/* Last thing that main() should do */
pthread_exit(NULL);
}

開啟5個線程,每個線程只做了一件事print hello world。除了主線程,子線程也可以開啟子線程。

#include <pthread.h>

int pthread_create(pthread_t *restrict thread,
const pthread_attr_t *restrict attr,
void *(*start_routine)(void*), void *restrict arg);

pthread_create第一個參數是子線程的標識符,第二個參數設置線程屬性,默認為NULL, 第三個參數傳入函數指針,第四個參數是傳給函數的參數。這裡有個小trick, pthread_create第四個參數類型是 void *, 那麼怎麼將任意類型數據傳給函數呢?對於int, float這種內置類型,直接進行指針類型轉換就好了,就像上面的例子, 對於複雜的數據,可以傳struct指針。例如

struct Args {
long tid;
int width;
int height;
/// other data
};

void *write_image(void * arg) {
long tid = ((Args *)arg)->tid;
int width = ((Args *)arg)->width;
///...
}

至此,你可以寫出一個多線程的hello world,一次調用多次列印,童叟無欺~~

為了享受多線程共享數據的好處, 你還需要會用mutex 。mutex 使用套路是定義、初始化, 使用(lock, trylock, unlock),銷毀。舉一個栗子:

/// shared data
static float money = 0;
/// mutex for money
pthread_mutex_t mutex_money;

void *task1(void * arg) {
pthread_mutex_lock(&mutex_money);
money += 200;
pthread_mutex_unlock(&mutex_money);

pthread_exit(NULL);
}
void *task2(void * arg) {
pthread_mutex_lock(&mutex_money);
money += 300;
pthread_mutex_unlock(&mutex_money);

pthread_exit(NULL);
}

int main(int argc, char * argv[]) {
pthread_mutex_init(&mutex_money, NULL);
/// do jobs ...

pthread_mutex_destroy(&mutex_money);
return 0;
}

現在你有一個變數money,兩個線程可能同時對其寫操作, 如果不加mutex鎖,那麼結果可能是200也可能是300,而不是我們想要的200+300。mutex變數初始化時默認unlocked, 當調用pthreadlock時,如果mutex變數處於unlocked狀態,則該線程lock成功,否則線程阻塞直到mutex變數狀態變為unlocked; 如果不想阻塞,可以調用pthread_mutex_trylock,如果mutex變數處於locked狀態,該函數直接返回"busy"。 mutex的lock和unlock是成對使用的,不然可能會發生死鎖(dead lock)。

Pthreads 線程間通信可以用條件變數, 信號量(semaphore)(信號量並不是Pthreads獨有的)。

/// shared data
static float money = 0;
pthread_cond_t con_val; //!condition variable
pthread_mutex_t mutex_money; //!coupled with con_val
int count = 0;
#define COUNT_LIMIT 5
#define TCOUNT 8

void *task1(void * thread_id) {
long tid;
tid = (long)thread_id;
for(int i = 0; i < TCOUNT; ++i) {
pthread_mutex_lock(&mutex_money);
count ++;
money += 200;
cout << "count:" << count << " money:" << money << endl;
if(count == COUNT_LIMIT) {
pthread_cond_signal(&con_val);
cout<<"task1 of thread"<<tid<<" send a signal."<<endl;
}
pthread_mutex_unlock(&mutex_money);
sleep(1);
}

cout<<"thread "<<tid <<" exit now!"<<endl;
pthread_exit(NULL);
}

void *task2(void * thread_id) {
long tid = (long)thread_id;

pthread_mutex_lock(&mutex_money);
while(count < COUNT_LIMIT) {
pthread_cond_wait(&con_val, &mutex_money); //! wait for the signal
cout<<"task2 received the signal."<<endl;
}
money *= 2;
pthread_mutex_unlock(&mutex_money);

cout<<"thread "<<tid <<" exit now!"<<endl;
pthread_exit(NULL);
}

這個例子里開啟兩個線程分別跑task1和task2, 最後money會是多少呢?答案是2600。task2一直處於等待條件變數con_val的狀態(阻塞狀態), task1 將money從0增加到1000時發出signal,解鎖mutex_money, task2得到signal,然後將money翻倍,變為2000,再unlock 退出線程, task1繼續增加,從2000增加到2600後退出線程。

  以上就是基於shared memory model的Pthreads的使用,shared memory model 另一個例子就是GPU里的shared memory使用,此為後話。

參考資料:

1. CME 213 Introduction to parallel computing using MPI, openMP, and CUDA

2. POSIX Threads Programming


推薦閱讀:
相关文章