老齐教室

用 Python 实现并行计算

:不少学过点编程语言的人,都会抱怨 Python 语言的程序执行速度慢,因此对学习和使用此语言嗤之以鼻。暂且不论程序的执行速度是否是开发者追求的唯一目标(有意对此进行争论的,请参阅人民邮电出版社出版的《编程的原则》一书),单就提升 Python 计算速度而言,并行计算是一个重要的选项。本文即为这方面的入门资料。


Python 非常适合训练机器学习模型、进行数值计算、以及快速开发验证性的模型等。使用 Python ,所需要的辅助工具和也依赖项都很少。在执行这些任务时,你还希望尽可能多地使用底层硬件,以便获得更高的速度。Python 代码的并行化可以实现这一目标。但是,使用标准的 CPython 则无法充分使用底层硬件的计算能力,因为全局解释器锁(GIL)会阻止多个线程同时运行字节码。

本文汇总了一些用 Python 代码实现并行计算的常见方法,包括:

  • 基于进程的并行计算

  • 使用专用库实现并行计算

  • IPython 中的并行计算

  • 用第三方库 Ray 实现并行计算

对于每种实现并行计算的技术,本文都列出了一些优点和缺点,并展示了代码示例,以帮助你了解其使用情况。

并行化 Python 代码

有几种常见的方法可以让 Python 代码实现并行运行——可以说成“并行化”。 例如启动多个应用程序实例或启动某个脚本来并行执行程序。 若不需要在并行的进程之间交换数据时,这种方法非常有用。 否则,在进程之间共享数据会在聚合数据时显著降低运算性能。

在同一个进程中启动多个线程可以更有效地在作业之间共享数据。在这种情况下,基于线程的并行化可以将一些工作转移到后台。 然而,CPython 实现的全局解释器锁(GIL)阻止了字节码在多个线程中同时运行。

下面示例中的函数模拟了复杂计算(旨在模拟激活函数)。

1
2
3
4
5
6
7
8
9
import math
import numpy as np
from timebudget import timebudget

iterations_count = round(1e7)

def complex_operation(input_index):
print("Complex operation. Input index: {:2d}".format(input_index))
[math.exp(i) * math.sinh(i) for i in [1] * iterations_count]

为了更直观地计算时间,将函数 complex_operation() 执行多次。将输入的数据划分为几个子集,然后对这些子集并行计算。

下面调用函数 complex_operation() 的代码中,将其多次执行( input 的区间是 0~10 ),并使用 timebudget 包来度量执行时间( pip install timebudget

1
2
3
4
5
6
7
@timebudget
def run_complex_operations(operation, input):
for i in input:
operation(i)

input = range(10)
run_complex_operations(complex_operation, input)

执行上述程序,输出结果如下:

1
2
3
4
5
6
7
8
9
10
11
Complex operation. Input index:  0
Complex operation. Input index: 1
Complex operation. Input index: 2
Complex operation. Input index: 3
Complex operation. Input index: 4
Complex operation. Input index: 5
Complex operation. Input index: 6
Complex operation. Input index: 7
Complex operation. Input index: 8
Complex operation. Input index: 9
run_complex_operations took 34.495sec

如你所见,在本文中使用的笔记本电脑上执行这段代码大约花了 34.5 秒。这是没有采用任何并行化技术的执行结果,下面就让我们看看如何用并行化方式优化。

基于进程的并行计算

第一种方法是基于进程的并行。 使用这种方法,可以同时(即“并发”)启动多个进程,这样,它们就可以并发地执行计算。

从 Python 3开始,标准库中已经有了实现多进程的模块 multiprocessing ,用它可以非常便捷地实现多进程进程并发。multiprocessing 模块中的 Pool 类,能自动将输入划分为若干个子集,并将这些子集分配给多个进程。

在前述代码中,使用 Pool 启动 10 个进程,完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import math
import numpy as np
from timebudget import timebudget
from multiprocessing import Pool

iterations_count = round(1e7)

def complex_operation(input_index):
print("Complex operation. Input index: {:2d}".format(input_index))
[math.exp(i) * math.sinh(i) for i in [1] * iterations_count]

@timebudget
def run_complex_operations(operation, input, pool):
pool.map(operation, input)
# for i in input:
# operation(i)

# input = range(10)
# run_complex_operations(complex_operation, input)

processes_count = 10

if __name__ == '__main__':
processes_pool = Pool(processes_count)
run_complex_operations(complex_operation, range(10), processes_pool)

每个进程同时执行 complex_operations() 函数,因此,从理论上讲,这些代码可以将总的执行时间减少 10 倍。 然而,试试并非如此。以下是译者的执行结果(在翻译本文的时候,译者将所有代码重新执行,在文中显示的是译者的执行结果):

1
2
3
4
5
6
7
8
9
10
11
Complex operation. Input index:  0
Complex operation. Input index: 1
Complex operation. Input index: 2
Complex operation. Input index: 3
Complex operation. Input index: 4
Complex operation. Input index: 5
Complex operation. Input index: 6
Complex operation. Input index: 7
Complex operation. Input index: 8
Complex operation. Input index: 9
run_complex_operations took 10.645sec

与之前的运行结果比较,并没有将执行时间缩短 10 倍,其原因有多方面,首先要考察的是本地计算机中 CPU 的数量,它决定了最大进程数。

1
2
3
>>> import os
>>> print('Number of CPUs in the system: {}'.format(os.cpu_count()))
Number of CPUs in the system: 8

os 模块中的 os.cpu_count() 函数能得到本地计算机中 CPU 的数量。

另外一个导致上述程序没有如预想那样大幅度降低运算时间的原因,跟程序汇总的计算量较小也有关系。这是因为进程之间必须通过进程间通信机制实现通信,这些计算开销,对于比较小的计算任务而言,并行计算通常比 Python 编写的普通程序所执行的串行计算更慢。

总结基于进程的并行计算的优劣:

优点 劣势
应用简单 性能不如 Ray (关于 Ray ,见后续)
摆脱了 GIL 限制 因共享数据而降低性能
对结果的聚合需要手动实现

利用专用库

NumPy 等专用于计算的库可以在许多计算上不受 GIL 的限制,于是就能用进程和其他技术实现并行计算。下面就介绍将 NumPy 用于并行计算的方式。

为了比较使用 Numpy 与否在计算中的差异,需要编写如下函数。

1
2
3
4
5
def complex_operation_numpy(input_index):
print(f"Complex operation (numpy). Input index: {input_index:2d}")

data = np.ones(iterations_count)
np.exp(data) * np.sinh(data)

函数中使用 NumPy 的 np.exp()np.sinh() 两个函数对输入数据执行计算。 然后,使用进程池执行 complex_operation()complex_operation_numpy() 函数各十次,以比较它们的性能。

1
2
3
4
5
6
7
8
9
processes_count = 10
input = range(10)

if __name__ == '__main__':
processes_pool = Pool(processes_count)
print(‘Without NumPy’)
run_complex_operations(complex_operation, input, processes_pool)
print(‘NumPy’)
run_complex_operations(complex_operation_numpy, input, processes_pool)

以下为执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Without Numpy
Complex operation. Input index: 0
Complex operation. Input index: 1
Complex operation. Input index: 2
Complex operation. Input index: 3
Complex operation. Input index: 4
Complex operation. Input index: 5
Complex operation. Input index: 6
Complex operation. Input index: 8
Complex operation. Input index: 7
Complex operation. Input index: 9
run_complex_operations took 11.874sec
Numpy
Complex operation (numpy). Input index: 1
Complex operation (numpy). Input index: 2
Complex operation (numpy). Input index: 3
Complex operation (numpy). Input index: 4
Complex operation (numpy). Input index: 0
Complex operation (numpy). Input index: 5
Complex operation (numpy). Input index: 6
Complex operation (numpy). Input index: 7
Complex operation (numpy). Input index: 8
Complex operation (numpy). Input index: 9
run_complex_operations took 845.87ms

NumPy 使性能得到了大幅度提升,846ms vs 12s 。 之所 NumPy 能更快,其原因是其中的大多数处理都是向量化的。 向量化实际上使底层代码可以“并行化”,因为该操作可以一次计算多个数组元素,而不是一次遍历一个数组元素。

NumPy 的优点 NumPy 的劣势
简单易用 对结果的聚合需要手动实现
多数 NumPy 计算不受 GIL 限制,但不是全部 有限的数值计算
支持向量化 自定义算法比较麻烦

###使用 IPython 的并行计算包

IPython 是数据科学研究者使用的一个工具,能够实现交互式操作,后来被更名为 Jupyter (参阅《跟老齐学 Python:数据分析》)。除了这些之外,它还提供了一个用于并行计算的包“IPython Parallel”,安装方法如下:

1
pip install ipyparallel

的官方网站:https://ipyparallel.readthedocs.io/en/latest/

IPython Parallel 有很多优点,其中最令人神往的可能是它允许以交互的方式开发、执行和监视并行应用程序。

一种使用 IPython Parallel 的方式是参考官方文档中的样式,在 Jupyter 中直接调用。

下面演示的是另外一种方式。首先准备好代码,如下所示(文件名称 parallelprocess.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import math
import numpy as np
from timebudget import timebudget
import ipyparallel as ipp

iterations_count = round(1e7)

def complex_operation(input_index):
print("Complex operation. Input index: {:2d}".format(input_index))

[math.exp(i) * math.sinh(i) for i in [1] * iterations_count]

def complex_operation_numpy(input_index):
print("Complex operation (numpy). Input index: {:2d}".format(input_index))

data = np.ones(iterations_count)
np.exp(data) * np.sinh(data)

@timebudget
def run_complex_operations(operation, input, pool):
pool.map(operation, input)

client_ids = ipp.Client()
pool = client_ids[:]

input = range(10)
print('Without NumPy')
run_complex_operations(complex_operation, input, pool)
print('NumPy')
run_complex_operations(complex_operation_numpy, input, pool)

然后打开一个终端,输入如下 ipcluster 命令(是在命令行状态):

1
2
3
4
% ipcluster start -n 10
2021-09-17 13:21:24.805 [IPClusterStart] Starting ipcluster with [daemonize=False]
2021-09-17 13:21:25.898 [IPClusterStart] Starting 10 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>
2021-09-17 13:21:56.945 [IPClusterStart] Engines appear to have started successfully

出现上述 Engines appear to have started successfully 提示之后,再打开一个终端,执行前述程序文件,如下所示:

1
2
3
4
5
% python parallelprocess.py
Without NumPy
run_complex_operations took 8.18ms
NumPy
run_complex_operations took 6.76ms

上述结果显示,对于使用和不使用 NumPy 两种情况下下,均用 IPython Parallel 进行并行处理,运算速度远远快于前述两种条件下的执行结果。

IPython 的有点 IPython 的劣势
支持并行和分布计算 适用于较短的作业内容
能用于 Jupyter notebook 如果要执行过程的输出,需要额外的配置
配置简单

Ray

Ray 是一款实现并行和分布计算的第三方库,它具有快速、简单的特点,可以轻松地扩展应用程序,并适用于最先进的机器学习库。 使用 Ray,还是像以往那样运行 Python 代码,只需要做很小的改动。

下面会简要介绍 Ray 是如何轻松地并行化普通的 Python 代码的,但需要注意的是,Ray 及其生态系统也可以轻松地并行化其他库,如 scikit-learn,XGBoost, LightGBM, PyTorch, 等等。

首先要安装 Ray :

1
pip install ray

然后在前面的 parallelprocess.py 文件基础上进行修改,最后的完整代码如下(并命名为 rayprocess.py 文件)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import math
import numpy as np
from timebudget import timebudget
import ray

iterations_count = round(1e7)

@ray.remote
def complex_operation(input_index):
print(f"Complex operation. Input index: {input_index:2d}")
[math.exp(i) * math.sinh(i) for i in [1] * iterations_count]

@ray.remote
def complex_operation_numpy(input_index):
print(f"Complex operation (numpy). Input index: {input_index:2d}")
data = np.ones(iterations_count)
np.exp(data) * np.sinh(data)

@timebudget
def run_complex_operations(operation, input):
ray.get([operation.remote(i) for i in input])

ray.init()

input = range(10)
print('Without NumPy')
run_complex_operations(complex_operation, input)
print('NumPy')
run_complex_operations(complex_operation_numpy, input)

其中 ray.init() 的作用是启动所有相关的 Ray 进程。 默认情况下,Ray 为每个 CPU 核创建一个进程。 如果希望在集群上运行 Ray ,则需要传入一个类似于ray.init(address='insertAddressHere') 的集群地址。

用装饰器 @ray.remote 装饰一个普通的 Python 函数,从而实现创建一个 Ray 任务。这个操作可以在笔记本电脑 CPU 核之间(或 Ray 集群)实现任务调度。

在最后一步中,以 @timebudget 装饰 run_complex_operations() 函数,在 Ray 的调用时间内执行这些函数。

执行此程序后,会得到一个类似于下面的输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
% python rayprocess.py 
/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/ray/_private/services.py:238: UserWarning: Not all Ray Dashboard dependencies were found. To use the dashboard please install Ray using `pip install ray[default]`. To disable this message, set RAY_DISABLE_IMPORT_WARNING env var to '1'.
warnings.warn(warning_message)
Without NumPy
(pid=9351) Complex operation. Input index: 7
(pid=9352) Complex operation. Input index: 5
(pid=9353) Complex operation. Input index: 6
(pid=9354) Complex operation. Input index: 1
(pid=9356) Complex operation. Input index: 2
(pid=9358) Complex operation. Input index: 0
(pid=9355) Complex operation. Input index: 3
(pid=9357) Complex operation. Input index: 4
(pid=9351) Complex operation. Input index: 8
(pid=9358) Complex operation. Input index: 9
run_complex_operations took 12.731sec
NumPy
(pid=9351) Complex operation (numpy). Input index: 1
(pid=9352) Complex operation (numpy). Input index: 7
(pid=9353) Complex operation (numpy). Input index: 3
(pid=9354) Complex operation (numpy). Input index: 5
(pid=9356) Complex operation (numpy). Input index: 2
(pid=9358) Complex operation (numpy). Input index: 0
(pid=9355) Complex operation (numpy). Input index: 4
(pid=9357) Complex operation (numpy). Input index: 6
(pid=9351) Complex operation (numpy). Input index: 9
(pid=9354) Complex operation (numpy). Input index: 8
run_complex_operations took 858.52ms

结果中显示了对于当前 Ray 任务而言的、使用和不使用 NumPy 的运行时间。这里似乎没有体现出 Ray 相对于前述其他并行计算方法的优势,这是因为我们在上面演示的属于小量的计算任务,如果遇到更大的业务,Ray 的优势就会非常显著,如下图所示。

在下面的表格中,对 Ray 给予简要总结。

Ray 的优点 Ray 的劣势
支持并行计算和分布计算 针对更大型的业务才会有显著效果
可以在 Jupyter 上使用
能够应用于现有的常见机器学习和神经网络库
整合了多个 Ray 库,如 RLlib(用于强化学习)、Ray Tune(超参数调优)、Ray Serve(可伸缩模式)

结论

有多种方法可以让 Python 程序实现并行化执行,并且本文还介绍了它们的一些优缺点。并行化的代码通常会带来一些开销;并行化的好处在较大的业务中更明显,而不是在本文中的简短计算中。

特别是在处理典型的基于人工智能的任务时,你必须对你的模型进行重复的微调。 在这种情况下,Ray 提供了最好的支持,因为它拥有丰富的生态系统、自动伸缩、容错和远程服务等能力。

参考文献

https://www.anyscale.com/blog/parallelizing-python-code

使用支付宝打赏
使用微信打赏

若你觉得我的文章对你有帮助,欢迎点击上方按钮对我打赏

关注微信公众号,读文章、听课程,提升技能