▎作者:Manu NALEPA
▎編譯:公眾號翻譯部
本文中介紹的庫只支持Linux和MacOS。
安裝文件文末下載
對於Pandas,當你運行以下代碼行時:
df.apply(func) 只使用一個CPU
df.apply(func)
即使你的計算機有多個CPU,也只有一個CPU是完全用於計算的。
我們希望用一種簡單的方法來代替CPU的使用:
Pandaral·lel的理念是將Pandas計算分配到計算機上所有可用的CPU上,以獲得顯著的速度提升。
▍安裝
$ pip install pandarallel [--user]
▍初始化
# Import from pandarallel import pandarallel
# Initialization pandarallel.initialize()
▍用法
對於一個帶有Pandas DataFrame df的簡單用例和一個應用func的函數,只需用parallel_apply替換經典的apply。
# Standard pandas apply df.apply(func)
# Parallel apply df.parallel_apply(func)
注意,如果不想並行化計算,仍然可以使用經典的apply方法。
你還可以通過在initialize函數中傳遞progress_bar=True來顯示每個工作CPU的一個進度條。
在一個更加複雜的Pandas DataFrame df用例中,DataFrame column1和column2的兩列,以及一個函數用於func:
# Standard pandas apply df.groupby(column1).column2.rolling(4).apply(func)
# Parallel apply df.groupby(column1).column2.rolling(4).parallel_apply(func)
以下是其中四個例子的配置:
%load_ext autoreload %autoreload 2 import pandas as pd import time from pandarallel import pandarallel import math import numpy as np
▍DataFrame.apply
f_size = int(5e6) df = pd.DataFrame(dict(a=np.random.randint(1, 8, df_size),b=np.random.rand(df_size)))
def func(x): return math.sin(x.a**2) + math.sin(x.b**2) %%time res = df.apply(func, axis=1) %%time res_parallel = df.parallel_apply(func, axis=1) res.equals(res_parallel)
▍DataFrame.applymap
df_size = int(1e7) df = pd.DataFrame(dict(a=np.random.randint(1, 8, df_size),b=np.random.rand(df_size)))
def func(x): return math.sin(x**2) - math.cos(x**2) %%time res = df.applymap(func) %%time res_parallel = df.parallel_applymap(func) res.equals(res_parallel)
▍DataFrame.groupby.apply
df_size = int(3e7) df = pd.DataFrame(dict(a=np.random.randint(1, 1000, df_size), b=np.random.rand(df_size)))
def func(df): dum = 0 for item in df.b: dum += math.log10(math.sqrt(math.exp(item**2))) return dum / len(df.b) %%time res = df.groupby("a").apply(func) %%time res_parallel = df.groupby("a").parallel_apply(func) res.equals(res_parallel)
▍DataFrame.groupby.rolling.apply
df_size = int(1e6) df = pd.DataFrame(dict(a=np.random.randint(1, 300, df_size),b=np.random.rand(df_size)))
def func(x): return x.iloc[0] + x.iloc[1] ** 2 + x.iloc[2] ** 3 + x.iloc[3] ** 4 %%time res = df.groupby(a).b.rolling(4).apply(func, raw=False) %%time res_parallel = df.groupby(a).b.rolling(4).parallel_apply(func, raw=False) res.equals(res_parallel)
除了df.groupby.col_name.rolling。如果應用程序的速度只增加了x3.2倍,那麼平均速度就會增加大約x4倍,即使用的計算機上的內核數量。
df_size = int(5e7) df = pd.DataFrame(dict(a=np.random.rand(df_size) + 1)) def func(x): return math.log10(math.sqrt(math.exp(x**2))) %%time res = df.a.map(func) %%time res_parallel = df.a.parallel_map(func) res.equals(res_parallel)
▍Series.apply
df_size = int(3.5e7) df = pd.DataFrame(dict(a=np.random.rand(df_size) + 1)) def func(x, power, bias=0): return math.log10(math.sqrt(math.exp(x**power))) + bias %%time res = df.a.apply(func, args=(2,), bias=3) %%time res_parallel = df.a.parallel_apply(func, args=(2,), bias=3) res.equals(res_parallel)
df_size = int(1e6) df = pd.DataFrame(dict(a=np.random.randint(1, 8, df_size), b=list(range(df_size)))) def func(x): return x.iloc[0] + x.iloc[1] ** 2 + x.iloc[2] ** 3 + x.iloc[3] ** 4 %%time res = df.b.rolling(4).apply(func, raw=False) %%time res_parallel = df.b.rolling(4).parallel_apply(func, raw=False) res.equals(res_parallel)
警告
1、進度條是一項實驗性功能。這可能導致相當大的性能損失。parallel_apply不能用於DataFrameGroupy.parallel_apply。
2、使用df一個Pandas DataFrame,series 一個 Pandas Series,func一個函數來應用/ map,args1,args2一些參數&col_name一個列名:
調用parallel_apply時,Pandaral·lel:
The Plasma In-Memory Object Store?arrow.apache.org
與其他進程間通信媒介相比,使用共享內存的主要優點是不存在序列化/反序列化,這可能會導致CPU開銷過大。
1、並行化是有條件的(實現新進程,通過共享內存發送數據等等),所以只有當並行化的計算量足夠高時,並行化纔有效。對於很少的數據,使用parallezation並不總是有效的。
2、應用的函數不應該是lambda函數。
from pandarallel import pandarallel from math import sin
pandarallel.initialize()
# FORBIDDEN df.parallel_apply(lambda x: sin(x**2), axis=1)
# ALLOWED def func(x): return sin(x**2)
df.parallel_apply(func, axis=1)
我有8個CPU,但是 parallel_apply 只加快了大約x4的計算速度。為什麼?
實際上,Pandarallel 只能加快計算速度,直到你計算機有大約核心的數量。最近大多數CPU(如Intel core-i7)都使用超線程。例如,一個4核的超線程CPU將向操作系統顯示8個CPU,但實際上只有4個物理計算單元。
在Ubuntu上,你可以用如下獲得內核的數量:
$ grep - m1 cpu core /proc/cpuinfo.
當我運行如下時:
from pandarallel import pandarallel
我得到:
ModuleNotFoundError: No module named pyarrow._plasma。
為什麼?
目前,Pyarrow Plasma只在Linux和macOS上工作(不支持Windows)
乾貨 | 如何用一行代碼在多CPU環境下高效並行Pandas?mp.weixin.qq.com
—End—
量化投資與機器學習微信公眾號,是業內垂直於Quant、MFE、CST等專業的主流自媒體。公眾號擁有來自公募、私募、券商、銀行、海外等眾多圈內10W+關注者。每日發布行業前沿研究成果和最新資訊。