注 :不少学过点编程语言的人,都会抱怨 Python 语言的程序执行速度慢,因此对学习和使用此语言嗤之以鼻。暂且不论程序的执行速度是否是开发者追求的唯一目标(有意对此进行争论的,请参阅人民邮电出版社出版的《编程的原则》一书),单就提升 Python 计算速度而言,并行计算是一个重要的选项。本文即为这方面的入门资料。
Python 非常适合训练机器学习模型、进行数值计算、以及快速开发验证性的模型等。使用 Python ,所需要的辅助工具和也依赖项都很少。在执行这些任务时,你还希望尽可能多地使用底层硬件,以便获得更高的速度。Python 代码的并行化可以实现这一目标。但是,使用标准的 CPython 则无法充分使用底层硬件的计算能力,因为全局解释器锁(GIL)会阻止多个线程同时运行字节码。
本文汇总了一些用 Python 代码实现并行计算的常见方法,包括:
基于进程的并行计算
使用专用库实现并行计算
IPython 中的并行计算
用第三方库 Ray 实现并行计算
对于每种实现并行计算的技术,本文都列出了一些优点和缺点,并展示了代码示例,以帮助你了解其使用情况。
并行化 Python 代码 有几种常见的方法可以让 Python 代码实现并行运行——可以说成“并行化”。 例如启动多个应用程序实例或启动某个脚本来并行执行程序。 若不需要在并行的进程之间交换数据时,这种方法非常有用。 否则,在进程之间共享数据会在聚合数据时显著降低运算性能。
在同一个进程中启动多个线程可以更有效地在作业之间共享数据。在这种情况下,基于线程的并行化可以将一些工作转移到后台。 然而,CPython 实现的全局解释器锁(GIL)阻止了字节码在多个线程中同时运行。
下面示例中的函数模拟了复杂计算(旨在模拟激活函数)。
1 2 3 4 5 6 7 8 9 import mathimport numpy as npfrom timebudget import timebudgetiterations_count = round(1e7 ) def complex_operation (input_index) : print("Complex operation. Input index: {:2d}" .format(input_index)) [math.exp(i) * math.sinh(i) for i in [1 ] * iterations_count]
为了更直观地计算时间,将函数 complex_operation()
执行多次。将输入的数据划分为几个子集,然后对这些子集并行计算。
下面调用函数 complex_operation()
的代码中,将其多次执行( input
的区间是 0~10
),并使用 timebudget
包来度量执行时间( pip install timebudget
)
1 2 3 4 5 6 7 @timebudget def run_complex_operations(operation, input): for i in input: operation(i) input = range(10) run_complex_operations(complex_operation, input)
执行上述程序,输出结果如下:
1 2 3 4 5 6 7 8 9 10 11 Complex operation. Input index: 0 Complex operation. Input index: 1 Complex operation. Input index: 2 Complex operation. Input index: 3 Complex operation. Input index: 4 Complex operation. Input index: 5 Complex operation. Input index: 6 Complex operation. Input index: 7 Complex operation. Input index: 8 Complex operation. Input index: 9 run_complex_operations took 34.495sec
如你所见,在本文中使用的笔记本电脑上执行这段代码大约花了 34.5 秒。这是没有采用任何并行化技术的执行结果,下面就让我们看看如何用并行化方式优化。
基于进程的并行计算 第一种方法是基于进程的并行。 使用这种方法,可以同时(即“并发”)启动多个进程,这样,它们就可以并发地执行计算。
从 Python 3开始,标准库中已经有了实现多进程的模块 multiprocessing ,用它可以非常便捷地实现多进程进程并发。multiprocessing 模块中的 Pool 类,能自动将输入划分为若干个子集,并将这些子集分配给多个进程。
在前述代码中,使用 Pool 启动 10 个进程,完整代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import mathimport numpy as npfrom timebudget import timebudgetfrom multiprocessing import Pooliterations_count = round(1e7 ) def complex_operation (input_index) : print("Complex operation. Input index: {:2d}" .format(input_index)) [math.exp(i) * math.sinh(i) for i in [1 ] * iterations_count] @timebudget def run_complex_operations (operation, input, pool) : pool.map(operation, input) processes_count = 10 if __name__ == '__main__' : processes_pool = Pool(processes_count) run_complex_operations(complex_operation, range(10 ), processes_pool)
每个进程同时执行 complex_operations()
函数,因此,从理论上讲,这些代码可以将总的执行时间减少 10 倍。 然而,试试并非如此。以下是译者的执行结果(在翻译本文的时候,译者将所有代码重新执行,在文中显示的是译者的执行结果):
1 2 3 4 5 6 7 8 9 10 11 Complex operation. Input index: 0 Complex operation. Input index: 1 Complex operation. Input index: 2 Complex operation. Input index: 3 Complex operation. Input index: 4 Complex operation. Input index: 5 Complex operation. Input index: 6 Complex operation. Input index: 7 Complex operation. Input index: 8 Complex operation. Input index: 9 run_complex_operations took 10.645sec
与之前的运行结果比较,并没有将执行时间缩短 10 倍,其原因有多方面,首先要考察的是本地计算机中 CPU 的数量,它决定了最大进程数。
1 2 3 >>> import os>>> print('Number of CPUs in the system: {}' .format(os.cpu_count()))Number of CPUs in the system: 8
用 os
模块中的 os.cpu_count()
函数能得到本地计算机中 CPU 的数量。
另外一个导致上述程序没有如预想那样大幅度降低运算时间的原因,跟程序汇总的计算量较小也有关系。这是因为进程之间必须通过进程间通信机制实现通信,这些计算开销,对于比较小的计算任务而言,并行计算通常比 Python 编写的普通程序所执行的串行计算更慢。
总结基于进程的并行计算的优劣:
优点
劣势
应用简单
性能不如 Ray (关于 Ray ,见后续)
摆脱了 GIL 限制
因共享数据而降低性能
对结果的聚合需要手动实现
利用专用库 NumPy 等专用于计算的库可以在许多计算上不受 GIL 的限制,于是就能用进程和其他技术实现并行计算。下面就介绍将 NumPy 用于并行计算的方式。
为了比较使用 Numpy 与否在计算中的差异,需要编写如下函数。
1 2 3 4 5 def complex_operation_numpy (input_index) : print(f"Complex operation (numpy). Input index: {input_index:2 d} " ) data = np.ones(iterations_count) np.exp(data) * np.sinh(data)
函数中使用 NumPy 的 np.exp()
和 np.sinh()
两个函数对输入数据执行计算。 然后,使用进程池执行 complex_operation()
和 complex_operation_numpy()
函数各十次,以比较它们的性能。
1 2 3 4 5 6 7 8 9 processes_count = 10 input = range(10 ) if __name__ == '__main__' : processes_pool = Pool(processes_count) print(‘Without NumPy’) run_complex_operations(complex_operation, input, processes_pool) print(‘NumPy’) run_complex_operations(complex_operation_numpy, input, processes_pool)
以下为执行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 Without Numpy Complex operation. Input index: 0 Complex operation. Input index: 1 Complex operation. Input index: 2 Complex operation. Input index: 3 Complex operation. Input index: 4 Complex operation. Input index: 5 Complex operation. Input index: 6 Complex operation. Input index: 8 Complex operation. Input index: 7 Complex operation. Input index: 9 run_complex_operations took 11.874sec Numpy Complex operation (numpy). Input index: 1 Complex operation (numpy). Input index: 2 Complex operation (numpy). Input index: 3 Complex operation (numpy). Input index: 4 Complex operation (numpy). Input index: 0 Complex operation (numpy). Input index: 5 Complex operation (numpy). Input index: 6 Complex operation (numpy). Input index: 7 Complex operation (numpy). Input index: 8 Complex operation (numpy). Input index: 9 run_complex_operations took 845.87ms
NumPy 使性能得到了大幅度提升,846ms vs 12s 。 之所 NumPy 能更快,其原因是其中的大多数处理都是向量化的。 向量化实际上使底层代码可以“并行化”,因为该操作可以一次计算多个数组元素,而不是一次遍历一个数组元素。
NumPy 的优点
NumPy 的劣势
简单易用
对结果的聚合需要手动实现
多数 NumPy 计算不受 GIL 限制,但不是全部
有限的数值计算
支持向量化
自定义算法比较麻烦
###使用 IPython 的并行计算包
IPython 是数据科学研究者使用的一个工具,能够实现交互式操作,后来被更名为 Jupyter (参阅《跟老齐学 Python:数据分析》 )。除了这些之外,它还提供了一个用于并行计算的包“IPython Parallel”,安装方法如下:
的官方网站:https://ipyparallel.readthedocs.io/en/latest/ 。
IPython Parallel 有很多优点,其中最令人神往的可能是它允许以交互的方式开发、执行和监视并行应用程序。
一种使用 IPython Parallel 的方式是参考官方文档中的样式,在 Jupyter 中直接调用。
下面演示的是另外一种方式。首先准备好代码,如下所示(文件名称 parallelprocess.py
)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 import mathimport numpy as npfrom timebudget import timebudgetimport ipyparallel as ippiterations_count = round(1e7 ) def complex_operation (input_index) : print("Complex operation. Input index: {:2d}" .format(input_index)) [math.exp(i) * math.sinh(i) for i in [1 ] * iterations_count] def complex_operation_numpy (input_index) : print("Complex operation (numpy). Input index: {:2d}" .format(input_index)) data = np.ones(iterations_count) np.exp(data) * np.sinh(data) @timebudget def run_complex_operations (operation, input, pool) : pool.map(operation, input) client_ids = ipp.Client() pool = client_ids[:] input = range(10 ) print('Without NumPy' ) run_complex_operations(complex_operation, input, pool) print('NumPy' ) run_complex_operations(complex_operation_numpy, input, pool)
然后打开一个终端,输入如下 ipcluster 命令(是在命令行状态):
1 2 3 4 % ipcluster start -n 10 2021-09-17 13:21:24.805 [IPClusterStart] Starting ipcluster with [daemonize=False] 2021-09-17 13:21:25.898 [IPClusterStart] Starting 10 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'> 2021-09-17 13:21:56.945 [IPClusterStart] Engines appear to have started successfully
出现上述 Engines appear to have started successfully
提示之后,再打开一个终端,执行前述程序文件,如下所示:
1 2 3 4 5 % python parallelprocess.py Without NumPy run_complex_operations took 8.18ms NumPy run_complex_operations took 6.76ms
上述结果显示,对于使用和不使用 NumPy 两种情况下下,均用 IPython Parallel 进行并行处理,运算速度远远快于前述两种条件下的执行结果。
IPython 的有点
IPython 的劣势
支持并行和分布计算
适用于较短的作业内容
能用于 Jupyter notebook
如果要执行过程的输出,需要额外的配置
配置简单
Ray Ray 是一款实现并行和分布计算的第三方库,它具有快速、简单的特点,可以轻松地扩展应用程序,并适用于最先进的机器学习库。 使用 Ray,还是像以往那样运行 Python 代码,只需要做很小的改动。
下面会简要介绍 Ray 是如何轻松地并行化普通的 Python 代码的,但需要注意的是,Ray 及其生态系统也可以轻松地并行化其他库,如 scikit-learn,XGBoost, LightGBM, PyTorch, 等等。
首先要安装 Ray :
然后在前面的 parallelprocess.py
文件基础上进行修改,最后的完整代码如下(并命名为 rayprocess.py
文件)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 import mathimport numpy as npfrom timebudget import timebudgetimport rayiterations_count = round(1e7 ) @ray.remote def complex_operation (input_index) : print(f"Complex operation. Input index: {input_index:2 d} " ) [math.exp(i) * math.sinh(i) for i in [1 ] * iterations_count] @ray.remote def complex_operation_numpy (input_index) : print(f"Complex operation (numpy). Input index: {input_index:2 d} " ) data = np.ones(iterations_count) np.exp(data) * np.sinh(data) @timebudget def run_complex_operations (operation, input) : ray.get([operation.remote(i) for i in input]) ray.init() input = range(10 ) print('Without NumPy' ) run_complex_operations(complex_operation, input) print('NumPy' ) run_complex_operations(complex_operation_numpy, input)
其中 ray.init()
的作用是启动所有相关的 Ray 进程。 默认情况下,Ray 为每个 CPU 核创建一个进程。 如果希望在集群上运行 Ray ,则需要传入一个类似于ray.init(address='insertAddressHere')
的集群地址。
用装饰器 @ray.remote
装饰一个普通的 Python 函数,从而实现创建一个 Ray 任务。这个操作可以在笔记本电脑 CPU 核之间(或 Ray 集群)实现任务调度。
在最后一步中,以 @timebudget
装饰 run_complex_operations()
函数,在 Ray 的调用时间内执行这些函数。
执行此程序后,会得到一个类似于下面的输出:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 % python rayprocess.py /Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/ray/_private/services.py:238: UserWarning: Not all Ray Dashboard dependencies were found. To use the dashboard please install Ray using `pip install ray[default]`. To disable this message, set RAY_DISABLE_IMPORT_WARNING env var to '1'. warnings.warn(warning_message) Without NumPy (pid=9351) Complex operation. Input index: 7 (pid=9352) Complex operation. Input index: 5 (pid=9353) Complex operation. Input index: 6 (pid=9354) Complex operation. Input index: 1 (pid=9356) Complex operation. Input index: 2 (pid=9358) Complex operation. Input index: 0 (pid=9355) Complex operation. Input index: 3 (pid=9357) Complex operation. Input index: 4 (pid=9351) Complex operation. Input index: 8 (pid=9358) Complex operation. Input index: 9 run_complex_operations took 12.731sec NumPy (pid=9351) Complex operation (numpy). Input index: 1 (pid=9352) Complex operation (numpy). Input index: 7 (pid=9353) Complex operation (numpy). Input index: 3 (pid=9354) Complex operation (numpy). Input index: 5 (pid=9356) Complex operation (numpy). Input index: 2 (pid=9358) Complex operation (numpy). Input index: 0 (pid=9355) Complex operation (numpy). Input index: 4 (pid=9357) Complex operation (numpy). Input index: 6 (pid=9351) Complex operation (numpy). Input index: 9 (pid=9354) Complex operation (numpy). Input index: 8 run_complex_operations took 858.52ms
结果中显示了对于当前 Ray 任务而言的、使用和不使用 NumPy 的运行时间。这里似乎没有体现出 Ray 相对于前述其他并行计算方法的优势,这是因为我们在上面演示的属于小量的计算任务,如果遇到更大的业务,Ray 的优势就会非常显著,如下图所示。
在下面的表格中,对 Ray 给予简要总结。
Ray 的优点
Ray 的劣势
支持并行计算和分布计算
针对更大型的业务才会有显著效果
可以在 Jupyter 上使用
能够应用于现有的常见机器学习和神经网络库
整合了多个 Ray 库,如 RLlib(用于强化学习)、Ray Tune(超参数调优)、Ray Serve(可伸缩模式)
结论 有多种方法可以让 Python 程序实现并行化执行,并且本文还介绍了它们的一些优缺点。并行化的代码通常会带来一些开销;并行化的好处在较大的业务中更明显,而不是在本文中的简短计算中。
特别是在处理典型的基于人工智能的任务时,你必须对你的模型进行重复的微调。 在这种情况下,Ray 提供了最好的支持,因为它拥有丰富的生态系统、自动伸缩、容错和远程服务等能力。
参考文献 https://www.anyscale.com/blog/parallelizing-python-code