最近写 automation tool,需要用 Python 处理百万行的 data frame。本着能不折腾就不折腾,并不打算并行化。起初写的一两个工具将就还能在十分钟内跑完,到后面多个表格交叉查询,即使已经尽量向量化了操作,运行时间还是相当感人。只能从并行计算上下功夫。

Python 的几个向量化函数 mappandas.DataFrame.apply, 从原理上讲,非常适合并行化。它们都是对一列数据应用相同的函数,数据之间互相独立,完全可以把一列数据拆分成子列,再并行地计算这些子列,从而达到加速。

野教程的痛点

关于 Python 的并行化,网络上的野教程到处都是。然而对 map, apply, applymap 这类特殊函数的并行化,野教程们讲的并不好,如同鸡肋。我的需求其实很简单,就两点:

  1. 一个极简的,可运行的,insightful 的代码示例
  2. 代码示例可以充分展现 Python 并行化的好处

几乎没有看到一个满足了以上两个痛点的野教程。于是,又一篇(野)教程出炉了。

Python 并行化

并行化一个代码有两大选择:multithread 和 multiprocess。

Multithread,多线程,同一个进程(process)可以开启多个线程执行计算。每个线程代表了一个 CPU 核心,这么多线程可以访问同样的内存地址(所谓共享内存),实现了线程之间的通讯,算是最简单的并行模型。 多线程显然是单机跑并行程序最自然的选择。然而,由于 Python 臭名昭著的 GIL(Global Interpreter Lock 全局解释器锁),一个防止多线程并发执行机器码的的机制,多线程计算几乎没有好处(数据 IO 是反例)。

Multiprocess,多进程,则相当于同时开启多个 Python 解释器,每个解释器有自己独有的数据,自然不会有数据冲突。这篇文章,主要讲多进程的并行,只举并行 map, apply 等函数为例。

代码示例

先看一个简单的序列执行的示例代码。

import numpy as np
import pandas as pd
import time
 
def f(row):
    return sum(row)+a

a = 2   
np.random.seed(0)    
df = pd.DataFrame(np.random.rand(10**6,10))   

t1= time.time()
result_serial = df.apply(f,axis=1)
t2 = time.time()
print("Serial time =",t2-t1)

以上代码的本质是输入一个表格($10^6 \times 10$),对表格的每一行执行函数 f 操作,将结果输出成另一个表格。最要紧的一步,是 result_serial = df.apply(f,axis=1),对表格 df 的每一行执行函数 f 。执行这句代码,在我的电脑上花了 22.2 秒。

如何使用多进程加速代码的运行呢?以下是多进程的代码。

import numpy as np
import pandas as pd
import time
from multiprocessing import Pool
 
def f(row):
    return sum(row)+a
 
def apply_f(df):
    return df.apply(f,axis=1)
 
def init_process(global_vars):
    global a
    a = global_vars
   
if __name__ == '__main__':
    a = 2
    np.random.seed(0)
    df = pd.DataFrame(np.random.rand(10**6,10))
     
    t1= time.time()
    result_serial = df.apply(f,axis=1)
    t2 = time.time()
    print("Serial time =",t2-t1)
    
    df_parts=np.array_split(df,20) 
    with Pool(processes=8,initializer=init_process,initargs=(a,)) as pool:        
        result_parts = pool.map(apply_f,df_parts)

    result_parallel = pd.concat(result_parts)
    t3 = time.time()
    print("Parallel time =",t3-t2)

输出

Serial time = 22.184635877609253
Parallel time = 6.046212196350098

可见,并行化在本例提升了至少 3 倍的运行速度。

代码的变化是引入了多进程包 multiprocessing 的 Pool 类,我所知道最简单方便的并行方法。如果没有全局共享的变量 a, 代码还可以更简单。并行化的基本思路是把 dataframe 用 np.array_split 方法切割成多个子 dataframe。再调用 Pool.map 函数并行地执行。注意到顺序执行的 pandas.DataFrame.apply 是如何转化成 Pool.map 然后并行执行的。

Pool 对象是一组并行的进程,由以上代码中的

Pool(processes=8,initializer=init_process,initargs=(a,))

构建。其中,指定产生 8 个进程,每个进程的初始化需运行 init_process 函数,其参数为一个 singleton tuple a. 利用 init_processinitargs,我们可以方便的设定需要在进程间共享的全局变量(这里是 a)。

with 关键词是 context manager,用 context manager 来处理 Pool 非常方便,只需要一个 with 就行了。否则,就需要写很繁琐的处理开关进程的逻辑。

总结

本文举例用最简单的 multiprocessing.Pool 来并行化 map, pandas.DataFrame.apply 等函数。例子显示,并行化确实加速了执行。示例亦包含了常见的多进程之间共享全局变量的情形。更复杂的并行化说明,应当参考 Python 的多进程标准库 multiprocessing.