關於並行計算的工具和軟體非常多,甚至自身都成了一個系統。而且本人也經常會用到那些工具如spark,mesos之類的來處理數據分析等任務。但是最近覺得很多時候處理此類任何對新手來說實在太麻煩了,也就是說動靜太大了。絕大多數時候並不是要寫一個數據分析的或是都運行在集群上的產品,也不是要搞一個需要長期穩定運行的工具。這類東西實在太重量級了折騰起來反而浪費時間,殺雞用了牛刀了。

大多數時候要用python做數據處理的時候僅僅是因為單機資源不夠了,想把其它機器連接起來一起使用處理或是計算,或者說是要經常在不同機器上運行計算程序處理一部分需求之後手動合併。基本上這類任務對系統也沒啥穩定性的需求甚至也可以忍受很高的因為組建集群造成的額外資源浪費,用一次基本也就拆除掉了,因此這裡需要一個開箱即可使用的工具並且最好設置簡單高效,可以靈活適配簡單的需求。比如我已經有一個比較老的遺留程序App1,它可能是一個其他語言寫的軟體或是根本沒有任何的並行處理能力我也不想修改它,我只是想通過腳本控制的方式啟動這個程序讓其在多個機器上運行多個進程,每個進程處理一部分計算任務最終把結果以某種方式合併,此類需求在實際情況下需求很大,ipyparallel就是可以適應這類需求的開箱簡單設置就可以使用的工具,每一次用就被這個小巧的工具吸引了,果斷加入自己的常用工具箱。

ipyparallel是一個python包,直接安裝 pip install ipyparallel 就可以了,當然要在你需要連接成集群的機器上都安裝上這個工具。一般最好python的版本要保持一致,不然可能存在莫名其妙的問題。同時我測試由兩台Linux和一台Mac pro三台機器組成的集群運行狀態良好,但是加上一個Windows機器後出錯,是Windows的字元編碼問題,不想去搞清細節了,反正平時基本用不上Windows了。

ipyparallel由控制節點controller和引擎節點engines組成,所有運行engines的節點要在網路上能聯通controller節點(並不需要controller節點去連接engines節點,只要engines節點能連接到controller節點就可以)由controller節點負責協調發送計算指令給engines節點,邏輯上這個工具就是這麼簡單。

說一個我的機器的網路結構一台Linux機器是24線程ip是192.168.2.40 叫node1,一台Mac pro是8線程在192.168.2.20叫node2,另一台24線程的Linux機器在192.168.0.20叫master, 其中node1和node2在一個內網中,master是不能直接訪問到的,但是node1和node2能訪問到master。在master上啟動controller

ipcontroller --ip="*"

幾秒鐘之後ipcontroller啟動成功,如果是遠程登陸Linux伺服器,在shell下運行命令如果網路中斷了shell,程序會退出。一般推薦使用screen或是tmux之類的工具運行程序,關於這個可以自行網路搜索,這裡不是重點。

在master上啟動了控制器之後會生成一個json格式的配置文件,一般在用戶目錄如下

~/.ipython/profile_default/security/ipcontroller-engine.json 在想要運行計算引擎的機器上,要把這個配置文件複製到目標機器上去,對應也是這樣的目錄。(注意:每個重啟ipcontroler時,這個配置文件會變動,要重新複製

scp -rp [email protected]:/home/saturnman/.ipython/profile_default/security/ipcontroller-engine.json ~/.ipython/profile_default/security/

實際上master上也有非常多的計算資源的,可以在master節點上啟動計算引擎,命令如下

ipcluster engines --n=24

這樣實際上是啟動了24個計算引擎,因為這個master節點是24線程的,所以啟動24個可以全部跑滿cpu,一般應該預留一部分計算資源的,因為跑滿可能會造成系統反應緩慢。

當配置文件拷貝完成之後現在去node1和node2上運行啟動計算引擎的命令,分別為

ipcluster engines --n=24
ipcluster engines --n=8

記住,啟動計算引擎時的目錄就是python在不同機器上的工作目錄,這一點非常重要。如果工作目錄不正確可以會找不到自己寫的python模塊或是其它可執行程序。當然也可以在python代碼中使用os.chdir來改變當前工作目錄。對於機器比較多的一般建議使用一個nfs文件系統把代碼和工具數據等文件掛在到不同計算節點的相同目錄下,這樣在一處修改了別處也可以同步了,不然造成代碼軟體版本不一樣可以會造成計算錯誤非常難以排除,這一點千萬注意。

現在可以在master節點運行python代碼調用啟動起來的計算引擎了,不過一般使用jupyter notebook比較方便,可以在master節點上啟動筆記本程序

jupyter notebook --ip=192.168.0.20 --port=8888

訪問 192.168.0.20:8888/noteb 新建筆記本試試

from ipyparallel import Client
c = Client()
v = c[:]
print len(v)

64

這裡得到的v列表就可以認為是計算引擎的封裝

下同運行大一點的測試計算,計算mandelbrot集合生成個小視頻,注意master節點要安裝ffmpeg工具和相應的包,這裡不是重點就不介紹了。

%matplotlib inline
import numpy
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from ipyparallel import Client
Writer = animation.writers[ffmpeg]
writer = Writer(fps=15, metadata=dict(artist=saturnman),bitrate=2400)
fig = plt.figure()

DPI = fig.get_dpi()
fig.set_size_inches(2048.0/float(DPI),1280.0/float(DPI))

ITERATIONS = 500
#SIZE = lena.shape[0]
#注意:此參數會極大影響計算過程的內存消耗,對於小內存機器可以會讓計算進程因內存不夠而出錯,請自行調節計算
SIZE = 2048
MAX_COLOR = 255.

x_min, x_max = -2.5, 1
y_min, y_max = -1.2, 1.2

xt_min, xt_max = -0.748746707922161, -0.748746697771757
yt_min, yt_max = 0.123640844894862, 0.123640851045266

xt_mid = (xt_min+xt_max)/2
yt_mid = (yt_min+yt_max)/2

x_mid = (x_min+x_max)/2
y_mid = (y_min+y_max)/2

iter = 1
#x_min, x_max = x-width/2, x+width/2
#y_min, y_max = y-height/2, y+height/2
x,y = numpy.meshgrid(numpy.linspace(x_min,x_max,2*SIZE),numpy.linspace(y_min,y_max,SIZE));
c = x + 1j*y
z = c.copy()
fractal = numpy.zeros(z.shape,dtype=numpy.uint8)+MAX_COLOR
#
#for n in range(ITERATIONS):
# mask = numpy.abs(z) <=10
# z[mask] = z[mask]**2+c[mask]
# fractal[(fractal==MAX_COLOR) & (~mask)] = (MAX_COLOR-1)*n/ITERATIONS
# Display the fractal

def compute_frac(iter):
import numpy
ITERATIONS = 500
SIZE = 512
MAX_COLOR = 255.

x_min, x_max = -2.,2.
y_min, y_max = -1.2, 1.2

#x_min, x_max = 0.5, 2.2
#y_min, y_max = -0.3, 0.3

xt_min, xt_max = -0.748766707922161, -0.748766697771757
yt_min, yt_max = 0.128640844894862, 0.128640851045266

xt_mid = (xt_min+xt_max)/2
yt_mid = (yt_min+yt_max)/2

x_mid = (x_min+x_max)/2
y_mid = (y_min+y_max)/2

scale = 1.0/((1.0+(iter*iter+iter*iter*iter*0.01)*0.0001)/2.0)
width = (x_max-x_min)*scale
height = (y_max-y_min)*scale
x_mid_tmp = xt_mid + (x_mid-xt_mid)*scale
y_mid_tmp = yt_mid + (y_mid-yt_mid)*scale

x_min_tmp = x_mid_tmp-width/2
x_max_tmp = x_mid_tmp+width/2
y_min_tmp = y_mid_tmp-height/2
y_max_tmp = y_mid_tmp+height/2

box = numpy.zeros(6,numpy.float64)
box[0] = x_min
box[1] = x_max
box[2] = y_min
box[3] = y_max
box[4] = -0.67 - (iter)*0.0002
box[5] = 0.11

#print box
block_size = 64

x,y = numpy.meshgrid(numpy.linspace(x_min_tmp,x_max_tmp,2*SIZE),numpy.linspace(y_min_tmp,y_max_tmp,SIZE));
c = x + 1j*y
z = c.copy()
frac = numpy.zeros(z.shape,numpy.uint8)+MAX_COLOR
for n in range(ITERATIONS):
mask = numpy.abs(z) <= 10
z[mask] = z[mask]**2+c[mask]
frac[(frac==MAX_COLOR) & (~mask)] = (MAX_COLOR-1)*n/ITERATIONS
return frac
frac = compute_frac(1.0)
matplotlib.pyplot.axis(off)
im = plt.imshow(frac,cmap=plt.get_cmap(flag))
def updatefig(looper):
global frac_list
im.set_data(frac_list[looper])
return im,
c = Client()
v = c[:]
frac_list = v.map_sync(compute_frac,[d for d in numpy.arange(1,801)])

ani = animation.FuncAnimation(fig,updatefig,range(0,800),interval=100, blit=True)
ani.save(parallel_frac.mp4, writer=writer)

這裡注意關鍵的幾個地方,compute_frac是一個非常耗時的計算函數,它的參數是一個數字來定位計算的位置。關鍵是這一句

v.map_sync(compute_frac,[d for d in numpy.arange(1,801)])

這個v.map_sync是一個同步的並行計算函數,它會把compute_frac發送到不同的計算引擎上去計算,同時負責把結果返回整理出來,它的第二個參數是一個由計算函數所需要的參數組成的列表,如果參數還有,可以繼續在後面增加列表,map_sync會把參數一組一組分別從每個列表相應位置取出來傳遞給計算函數,之後把計算結果收集成一個列表。我們的計算引擎的個數是64個,我們發的計算函數實際上由參數展開是800個,我們不需要去擔心怎麼分配給每個計算引擎多少任務,怎麼收集計算好的結果,直接等最終結果就好了:)。

在計算過程中這個計算函數由主節點發送給每個計算節點,它必須是自包含的。比如要用到某個模塊要在函數內聲明import,如果某台機器上沒有這個模塊,會造成整體任務失敗,這個千萬要注意測試。

一般要發送複雜的參數給計算函數時,我個人一般使用一個類的對象列表,這樣方便處理而且直觀。還在就是ipyparallel在計算後要收集數據,如果最終要收集的數據是比較大的,那麼可能對帶寬要求比較高,比如上面我用的計算結果是一張張點陣圖,上千張圖的同步還是很大的帶寬開銷的。如果看計算引擎幾乎都沒有在工作但是任務就是計算不完,可以這時就在傳輸和同步數據,這裡推薦一個linux小工具nload可以方便用來查看系統不同網路介面的數據傳輸狀態,可用來調試使用。還有我上面舉得計算用例實際上並不太恰當,對於此類計算任務最好是使用顯卡計算,如OpenCL或是CUDA,會比這個效率高上上千倍之多,等有時間再寫關於顯卡計算的主題。

視頻封面

00:05視頻封面

00:59
推薦閱讀:
相关文章