▎作者:Manu NALEPA

編譯:公眾號翻譯部

本文中介紹的庫只支持LinuxMacOS

安裝文件文末下載

什麼問題困擾著我們?

對於Pandas,當你運行以下代碼行時:

df.apply(func)

只使用一個CPU

即使你的計算機有多個CPU,也只有一個CPU是完全用於計算的。

我們希望用一種簡單的方法來代替CPU的使用:

所有CPU齊上陣

Pandaral·lel如何解決這個問題?

Pandaral·lel的理念是將Pandas計算分配到計算機上所有可用的CPU上,以獲得顯著的速度提升。

▍安裝

$ pip install pandarallel [--user]

▍初始化

# Import
from pandarallel import pandarallel

# Initialization
pandarallel.initialize()

▍用法

對於一個帶有Pandas DataFrame df的簡單用例和一個應用func的函數,只需用parallel_apply替換經典的apply。

# Standard pandas apply
df.apply(func)

# Parallel apply
df.parallel_apply(func)

注意,如果不想並行化計算,仍然可以使用經典的apply方法。

你還可以通過在initialize函數中傳遞progress_bar=True來顯示每個工作CPU的一個進度條。

在一個更加複雜的Pandas DataFrame df用例中,DataFrame column1和column2的兩列,以及一個函數用於func:

# Standard pandas apply
df.groupby(column1).column2.rolling(4).apply(func)

# Parallel apply
df.groupby(column1).column2.rolling(4).parallel_apply(func)

基準

以下是其中四個例子的配置:

  • 操作系統:Linux Ubuntu 16.04
  • 硬體:Intel Core i7 @ 3.40 GHz - 4核

%load_ext autoreload
%autoreload 2
import pandas as pd
import time
from pandarallel import pandarallel
import math
import numpy as np

▍DataFrame.apply

f_size = int(5e6)
df = pd.DataFrame(dict(a=np.random.randint(1, 8, df_size),b=np.random.rand(df_size)))

def func(x):
return math.sin(x.a**2) + math.sin(x.b**2)
%%time
res = df.apply(func, axis=1)
%%time
res_parallel = df.parallel_apply(func, axis=1)
res.equals(res_parallel)

▍DataFrame.applymap

df_size = int(1e7)
df = pd.DataFrame(dict(a=np.random.randint(1, 8, df_size),b=np.random.rand(df_size)))

def func(x):
return math.sin(x**2) - math.cos(x**2)
%%time
res = df.applymap(func)
%%time
res_parallel = df.parallel_applymap(func)
res.equals(res_parallel)

▍DataFrame.groupby.apply

df_size = int(3e7)
df = pd.DataFrame(dict(a=np.random.randint(1, 1000, df_size),
b=np.random.rand(df_size)))

def func(df):
dum = 0
for item in df.b:
dum += math.log10(math.sqrt(math.exp(item**2)))
return dum / len(df.b)
%%time
res = df.groupby("a").apply(func)
%%time
res_parallel = df.groupby("a").parallel_apply(func)
res.equals(res_parallel)

▍DataFrame.groupby.rolling.apply

df_size = int(1e6)
df = pd.DataFrame(dict(a=np.random.randint(1, 300, df_size),b=np.random.rand(df_size)))

def func(x):
return x.iloc[0] + x.iloc[1] ** 2 + x.iloc[2] ** 3 + x.iloc[3] ** 4
%%time
res = df.groupby(a).b.rolling(4).apply(func, raw=False)
%%time
res_parallel = df.groupby(a).b.rolling(4).parallel_apply(func, raw=False)
res.equals(res_parallel)

標準 vs 並行四核(越低越好)

除了df.groupby.col_name.rolling。如果應用程序的速度只增加了x3.2倍,那麼平均速度就會增加大約x4倍,即使用的計算機上的內核數量。

▍Series.map

df_size = int(5e7)
df = pd.DataFrame(dict(a=np.random.rand(df_size) + 1))
def func(x):
return math.log10(math.sqrt(math.exp(x**2)))
%%time
res = df.a.map(func)
%%time
res_parallel = df.a.parallel_map(func)
res.equals(res_parallel)

▍Series.apply

df_size = int(3.5e7)
df = pd.DataFrame(dict(a=np.random.rand(df_size) + 1))
def func(x, power, bias=0):
return math.log10(math.sqrt(math.exp(x**power))) + bias
%%time
res = df.a.apply(func, args=(2,), bias=3)
%%time
res_parallel = df.a.parallel_apply(func, args=(2,), bias=3)
res.equals(res_parallel)

▍Series.rolling.apply

df_size = int(1e6)
df = pd.DataFrame(dict(a=np.random.randint(1, 8, df_size),
b=list(range(df_size))))
def func(x):
return x.iloc[0] + x.iloc[1] ** 2 + x.iloc[2] ** 3 + x.iloc[3] ** 4
%%time
res = df.b.rolling(4).apply(func, raw=False)
%%time
res_parallel = df.b.rolling(4).parallel_apply(func, raw=False)
res.equals(res_parallel)

警告

1、進度條是一項實驗性功能。這可能導致相當大的性能損失。parallel_apply不能用於DataFrameGroupy.parallel_apply。

2、使用df一個Pandas DataFrame,series 一個 Pandas Series,func一個函數來應用/ map,args1,args2一些參數&col_name一個列名:

具體如何操作的?

調用parallel_apply時,Pandaral·lel:

  • 實例化Pyarrow Plasma shared memory。

The Plasma In-Memory Object Store?

arrow.apache.org

  • 為每個CPU創建一個子進程,並要求每個CPU處理DataFrame的一個子部分。
  • 在父進程中組合所有結果。

與其他進程間通信媒介相比,使用共享內存的主要優點是不存在序列化/反序列化,這可能會導致CPU開銷過大。

警告

1、並行化是有條件的(實現新進程,通過共享內存發送數據等等),所以只有當並行化的計算量足夠高時,並行化纔有效。對於很少的數據,使用parallezation並不總是有效的。

2、應用的函數不應該是lambda函數。

from pandarallel import pandarallel
from math import sin

pandarallel.initialize()

# FORBIDDEN
df.parallel_apply(lambda x: sin(x**2), axis=1)

# ALLOWED
def func(x):
return sin(x**2)

df.parallel_apply(func, axis=1)

問題排除

我有8個CPU,但是 parallel_apply 只加快了大約x4的計算速度。為什麼?

實際上,Pandarallel 只能加快計算速度,直到你計算機有大約核心的數量。最近大多數CPU(如Intel core-i7)都使用超線程。例如,一個4核的超線程CPU將向操作系統顯示8個CPU,但實際上只有4個物理計算單元。

在Ubuntu上,你可以用如下獲得內核的數量:

$ grep - m1 cpu core /proc/cpuinfo.

當我運行如下時:

from pandarallel import pandarallel

我得到:

ModuleNotFoundError: No module named pyarrow._plasma。

為什麼?

目前,Pyarrow Plasma只在Linux和macOS上工作(不支持Windows)

如何獲取代碼

乾貨 | 如何用一行代碼在多CPU環境下高效並行Pandas?

mp.weixin.qq.com圖標

—End—

量化投資與機器學習微信公眾號,是業內垂直於QuantMFECST等專業的主流自媒體。公眾號擁有來自公募、私募、券商、銀行、海外等眾多圈內10W+關注者。每日發布行業前沿研究成果和最新資訊。


推薦閱讀:
相關文章