python并行计算---concurrent.futures模块的使用方法
python并行计算---concurrent.futures模块的使用方法
concurrent.futures提供了两种Executor的子类分别为: ThreadPoolExecutor和ProcessPoolExecutor。前者创建一个可以提交作业的线程池,后者创建一个进程池。
1. 选择多线程ThreadPoolExecutor和多进程ProcessPoolExecutor的经验法则
- 执行重 I/O 操作的任务 (IO 密集型) 选择 ThreadPoolExecutor,例如请求网页数据,文件读写等涉及网络、磁盘 I/O 相关的内容。
需要注意的是,线程池会受到python的全局解释器(GIL)的影响,如果使用多线程处理重CPU的任务,耗时很大可能性还不如顺序执行。但在 I/O 密集型任务中,线程大部分时间都在等待 I/O 操作完成(如网络下载、磁盘读写),而不是执行 CPU 计算。Python 的 GIL 在线程等待 I/O 时会释放锁,使得其他线程可以继续运行。这意味着在 I/O 密集型任务中,GIL 的影响并不明显,多线程仍然可以显著提高性能 - 执行重 CPU 的任务 (CPU 密集型) 选择 ProcessPoolExecutor,例如大量消耗 CPU 的数学与逻辑运算、视频编解码等内容。
ProcessPoolExecutor可以避开 GIL 的问题,但是由于需要传递参数给工作进程,所以正常情况下只有可序列化的对象可以执行并返回(具体解释见小标3)
2.一个重CPU计算的例子,对比顺序执行、多线程、多进程的耗时情况
import concurrent.futures
import time
number_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
def evaluate_item(x):
result_item = count(x)
# return result_item
def count(number) :
for i in range(0, 10000):
i=i+1
# return i * number
if __name__ == "__main__":
# 顺序执行
startTime = time.time()
for item in number_list:
# print(evaluate_item(item))
evaluate_item(item)
print("Sequential execution use {} s".format(time.time() - startTime))
# 线程池执行
startTime1 = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(evaluate_item, item) for item in number_list]
for future in concurrent.futures.as_completed(futures):
# print(future.result())
future.result()
print ("Thread pool execution use {} s".format(time.time() - startTime1))
# 进程池
startTime2 = time.time()
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(evaluate_item, item) for item in number_list]
for future in concurrent.futures.as_completed(futures):
# print(future.result())
future.result()
print ("Process pool execution {} s".format(time.time() - startTime2))
Sequential execution use 0.004363298416137695 s
Thread pool execution use 0.01605534553527832 s
Process pool execution 0.4858131408691406 s
从计算结果来看,对于这种重cpu的计算,线程池方式不如进程池,甚至于不如进程池
3. 为什么进程池ProcessPoolExecutor只有可序列化的对象可以执行并返回
3.1 什么是可序列化的对象?
在 Python 中,可序列化的对象意味着该对象可以被“序列化”(即转换成字节流),然后在不同进程之间传递。Python 使用 pickle 模块来实现这种序列化,因此所有传递给 ProcessPoolExecutor 的参数和返回值都必须是 pickle 支持的类型(例如基本数据类型、列表、字典、元组等)。
3.2 为什么这会带来限制?
Python 的大多数基础数据类型都是可序列化的,但某些复杂类型(例如打开的文件句柄、网络连接、锁对象等)是不可序列化的。如果你尝试将这些不可序列化的对象传递给 ProcessPoolExecutor 的子进程,会导致 pickle 抛出错误。
- 示例1
以下是一个使用 ProcessPoolExecutor 的示例:- 正常情况:传递可序列化对象
假设你有一个简单的 CPU 密集型任务(如计算平方),可以将整数列表作为输入传递给子进程,因为整数是可序列化的。
- 正常情况:传递可序列化对象
from concurrent.futures import ProcessPoolExecutor
def square(x):
return x * x
if __name__ == "__main__":
with ProcessPoolExecutor() as executor:
numbers = [1, 2, 3, 4, 5]
results = list(executor.map(square, numbers))
print(results) # 输出: [1, 4, 9, 16, 25]
在这个示例中:
square 函数被传递给子进程执行,numbers 列表的每个元素 x 被传递给 square。
由于整数是可序列化的对象,ProcessPoolExecutor 能够正常处理并返回结果。
- 示例2
- 不可序列化对象的情况
如果尝试在多进程中传递一个不可序列化的对象(例如打开的文件句柄),则会导致错误:
- 不可序列化对象的情况
from concurrent.futures import ProcessPoolExecutor
def read_file(file):
return file.read()
if __name__ == "__main__":
with open("example.txt", "r") as file:
with ProcessPoolExecutor() as executor:
# 尝试传递文件句柄
results = list(executor.map(read_file, [file])) # 将会抛出序列化错误
在这个示例中:文件句柄 file 被传递给 read_file 函数,但由于文件句柄不可序列化,ProcessPoolExecutor 会抛出错误,无法传递该对象给子进程。
3.3 怎么解决解决不可序列化对象的限制
如果遇到不可序列化对象,通常可以通过以下方式解决:
- 在子进程中重新创建资源:在子进程中打开文件或建立数据库连接,而不是在主进程中传递这些资源。
def read_file(filename):
with open(filename, "r") as file:
return file.read()
if __name__ == "__main__":
with ProcessPoolExecutor() as executor:
results = list(executor.map(read_file, ["example.txt"]))
- 只传递路径或标识:传递文件路径等可序列化的标识,而不是直接传递不可序列化的对象(如文件句柄)
4. submit与map方法
ProcessPoolExecutor和ThreadPoolExecutor类中最重要的 2 个方法如下:
- submit提交任务,并返回 Future 对象代表可调用对象的执行。
- map和 Python 自带的 map 函数功能类似,只不过是以异步的方式把函数依次作用在列表的每个元素上。
如果一次性提交一批任务可以使用map,如果单个任务提交用submit
可以简单参考如下脚本:
其中process_single_line是个单参数函数,另外注意这种多线程对进程的方式,并不仅仅面对单参函数,多参数函数也一样使用,就在submit或者map后的参数新增就行。见小标5
# 多线程ThreadPoolExecutor模式
with ThreadPoolExecutor(max_workers = args.num_worker) as executor:
# # submit提交
# results = [executor.submit(self.process_single_line, line) for line in lines]
# for future in tqdm(concurrent.futures.as_completed(results), total=len(lines)):
# relative, absoluted = future.result()
# self.outputJsonListRelative.append(relative)
# self.outputJsonListAbsoluted.append(absoluted)
# map提交
results = list(tqdm(executor.map(self.process_single_line, lines), total=len(lines)))
for relative, absoluted in results:
self.outputJsonListRelative.append(relative)
self.outputJsonListAbsoluted.append(absoluted)
# # 多线程ThreadPoolExecutor模式
# with ProcessPoolExecutor(max_workers = args.num_worker) as executor:
# # submit提交
# results = [executor.submit(self.process_single_line, line) for line in lines]
# for future in tqdm(concurrent.futures.as_completed(results), total=len(lines)):
# relative, absoluted = future.result()
# self.outputJsonListRelative.append(relative)
# self.outputJsonListAbsoluted.append(absoluted)
# # # map提交
# # results = list(tqdm(executor.map(self.process_single_line, lines), total=len(lines)))
# # for relative, absoluted in results:
# # self.outputJsonListRelative.append(relative)
# # self.outputJsonListAbsoluted.append(absoluted)
map和submit处理多参函数
- submit
from concurrent.futures import ThreadPoolExecutor
# 定义 process 函数
def process(a, b):
return a + b
A = [1, 2, 3, 4, 5]
B = [11, 12, 13, 14, 15]
# 使用 submit 提交任务
with ThreadPoolExecutor() as executor:
futures = [executor.submit(process, a, b) for a, b in zip(A, B)]
# 获取结果
results = [future.result() for future in futures]
print(results) # 输出: [12, 14, 16, 18, 20]
- map
from concurrent.futures import ThreadPoolExecutor
# 定义 process 函数
def process(a, b):
return a + b
A = [1, 2, 3, 4, 5]
B = [11, 12, 13, 14, 15]
# 使用 map 提交任务
with ThreadPoolExecutor() as executor:
results = list(executor.map(process, A, B))
print(results) # 输出: [12, 14, 16, 18, 20]
原文地址:https://blog.csdn.net/qhu1600417010/article/details/143732454
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!