自学内容网 自学内容网

python并行计算---concurrent.futures模块的使用方法


concurrent.futures提供了两种Executor的子类分别为: ThreadPoolExecutor和ProcessPoolExecutor。前者创建一个可以提交作业的线程池,后者创建一个进程池。

1. 选择多线程ThreadPoolExecutor和多进程ProcessPoolExecutor的经验法则

  • 执行重 I/O 操作的任务 (IO 密集型) 选择 ThreadPoolExecutor,例如请求网页数据,文件读写等涉及网络、磁盘 I/O 相关的内容。
    需要注意的是,线程池会受到python的全局解释器(GIL)的影响,如果使用多线程处理重CPU的任务,耗时很大可能性还不如顺序执行。但在 I/O 密集型任务中,线程大部分时间都在等待 I/O 操作完成(如网络下载、磁盘读写),而不是执行 CPU 计算。Python 的 GIL 在线程等待 I/O 时会释放锁,使得其他线程可以继续运行。这意味着在 I/O 密集型任务中,GIL 的影响并不明显,多线程仍然可以显著提高性能
  • 执行重 CPU 的任务 (CPU 密集型) 选择 ProcessPoolExecutor,例如大量消耗 CPU 的数学与逻辑运算、视频编解码等内容。
    ProcessPoolExecutor可以避开 GIL 的问题,但是由于需要传递参数给工作进程,所以正常情况下只有可序列化的对象可以执行并返回(具体解释见小标3)

2.一个重CPU计算的例子,对比顺序执行、多线程、多进程的耗时情况

import concurrent.futures
import time

number_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

def evaluate_item(x):
    result_item = count(x)
    # return result_item

def count(number) :
    for i in range(0, 10000):
        i=i+1
    # return i * number

if __name__ == "__main__":
    # 顺序执行
    startTime = time.time()
    for item in number_list:
            # print(evaluate_item(item))
            evaluate_item(item)
    print("Sequential execution use {} s".format(time.time() - startTime))

    # 线程池执行
    startTime1 = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            futures = [executor.submit(evaluate_item, item) for item in number_list]

            for future in concurrent.futures.as_completed(futures):
                    # print(future.result())
                    future.result()

    print ("Thread pool execution use {} s".format(time.time() - startTime1))

    # 进程池
    startTime2 = time.time()
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
            futures = [executor.submit(evaluate_item, item) for item in number_list]

            for future in concurrent.futures.as_completed(futures):
                    # print(future.result())
                    future.result()

    print ("Process pool execution {} s".format(time.time() - startTime2))
Sequential execution use 0.004363298416137695 s
Thread pool execution use 0.01605534553527832 s
Process pool execution 0.4858131408691406 s

从计算结果来看,对于这种重cpu的计算,线程池方式不如进程池,甚至于不如进程池

3. 为什么进程池ProcessPoolExecutor只有可序列化的对象可以执行并返回

3.1 什么是可序列化的对象?

在 Python 中,可序列化的对象意味着该对象可以被“序列化”(即转换成字节流),然后在不同进程之间传递。Python 使用 pickle 模块来实现这种序列化,因此所有传递给 ProcessPoolExecutor 的参数和返回值都必须是 pickle 支持的类型(例如基本数据类型、列表、字典、元组等)。

3.2 为什么这会带来限制?

Python 的大多数基础数据类型都是可序列化的,但某些复杂类型(例如打开的文件句柄、网络连接、锁对象等)是不可序列化的。如果你尝试将这些不可序列化的对象传递给 ProcessPoolExecutor 的子进程,会导致 pickle 抛出错误。

  • 示例1
    以下是一个使用 ProcessPoolExecutor 的示例:
    • 正常情况:传递可序列化对象
      假设你有一个简单的 CPU 密集型任务(如计算平方),可以将整数列表作为输入传递给子进程,因为整数是可序列化的。
from concurrent.futures import ProcessPoolExecutor

def square(x):
    return x * x

if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        numbers = [1, 2, 3, 4, 5]
        results = list(executor.map(square, numbers))
    print(results)  # 输出: [1, 4, 9, 16, 25]

在这个示例中:
square 函数被传递给子进程执行,numbers 列表的每个元素 x 被传递给 square。
由于整数是可序列化的对象,ProcessPoolExecutor 能够正常处理并返回结果。

  • 示例2
    • 不可序列化对象的情况
      如果尝试在多进程中传递一个不可序列化的对象(例如打开的文件句柄),则会导致错误:
from concurrent.futures import ProcessPoolExecutor

def read_file(file):
    return file.read()

if __name__ == "__main__":
    with open("example.txt", "r") as file:
        with ProcessPoolExecutor() as executor:
            # 尝试传递文件句柄
            results = list(executor.map(read_file, [file]))  # 将会抛出序列化错误

在这个示例中:文件句柄 file 被传递给 read_file 函数,但由于文件句柄不可序列化,ProcessPoolExecutor 会抛出错误,无法传递该对象给子进程。

3.3 怎么解决解决不可序列化对象的限制

如果遇到不可序列化对象,通常可以通过以下方式解决:

  1. 在子进程中重新创建资源:在子进程中打开文件或建立数据库连接,而不是在主进程中传递这些资源。
def read_file(filename):
    with open(filename, "r") as file:
        return file.read()

if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(read_file, ["example.txt"]))
  1. 只传递路径或标识:传递文件路径等可序列化的标识,而不是直接传递不可序列化的对象(如文件句柄)

4. submit与map方法

ProcessPoolExecutor和ThreadPoolExecutor类中最重要的 2 个方法如下:

  1. submit提交任务,并返回 Future 对象代表可调用对象的执行。
  2. map和 Python 自带的 map 函数功能类似,只不过是以异步的方式把函数依次作用在列表的每个元素上。
    如果一次性提交一批任务可以使用map,如果单个任务提交用submit

可以简单参考如下脚本:
其中process_single_line是个单参数函数,另外注意这种多线程对进程的方式,并不仅仅面对单参函数,多参数函数也一样使用,就在submit或者map后的参数新增就行。见小标5

        # 多线程ThreadPoolExecutor模式
        with ThreadPoolExecutor(max_workers = args.num_worker) as executor:

            # # submit提交
            # results = [executor.submit(self.process_single_line, line) for line in lines]
            # for future in tqdm(concurrent.futures.as_completed(results), total=len(lines)):
            #     relative, absoluted = future.result()
            #     self.outputJsonListRelative.append(relative)
            #     self.outputJsonListAbsoluted.append(absoluted)   

            # map提交
            results = list(tqdm(executor.map(self.process_single_line, lines), total=len(lines)))
            for relative, absoluted in results:
                self.outputJsonListRelative.append(relative)
                self.outputJsonListAbsoluted.append(absoluted)

        # # 多线程ThreadPoolExecutor模式
        # with ProcessPoolExecutor(max_workers = args.num_worker) as executor:

        #     # submit提交
        #     results = [executor.submit(self.process_single_line, line) for line in lines]
        #     for future in tqdm(concurrent.futures.as_completed(results), total=len(lines)):
        #         relative, absoluted = future.result()
        #         self.outputJsonListRelative.append(relative)
        #         self.outputJsonListAbsoluted.append(absoluted)   

        #     # # map提交
        #     # results = list(tqdm(executor.map(self.process_single_line, lines), total=len(lines)))
        #     # for relative, absoluted in results:
        #         # self.outputJsonListRelative.append(relative)
        #         # self.outputJsonListAbsoluted.append(absoluted)

map和submit处理多参函数

  • submit
from concurrent.futures import ThreadPoolExecutor

# 定义 process 函数
def process(a, b):
    return a + b

A = [1, 2, 3, 4, 5]
B = [11, 12, 13, 14, 15]

# 使用 submit 提交任务
with ThreadPoolExecutor() as executor:
    futures = [executor.submit(process, a, b) for a, b in zip(A, B)]
    
    # 获取结果
    results = [future.result() for future in futures]

print(results)  # 输出: [12, 14, 16, 18, 20]

  • map
from concurrent.futures import ThreadPoolExecutor

# 定义 process 函数
def process(a, b):
    return a + b

A = [1, 2, 3, 4, 5]
B = [11, 12, 13, 14, 15]

# 使用 map 提交任务
with ThreadPoolExecutor() as executor:
    results = list(executor.map(process, A, B))

print(results)  # 输出: [12, 14, 16, 18, 20]


原文地址:https://blog.csdn.net/qhu1600417010/article/details/143732454

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!