自学内容网 自学内容网

python基础——并发编程

前言

一、并发编程介绍

1.1 串行、并行与并发的区别

1.2 进程、线程、协程的区别

1.2.1 进程是什么? 

1.2.2线程是什么?

1.3 同步和异步介绍

二、线程

2.1 什么是线程?

2.2 线程的特点

2.3 线程的创建方式

2.3.1方法包装的线程创建

2.3.2 类方法线程包装的实现 

2.4 join()方法的使用

 2.5 守护线程

 2.6 全局锁GIL问题

2.7 线程同步和互斥锁

2.7.1 线程同步的概念 

2.7.2锁机制实现银行账户的访问(互斥锁)

2.8 死锁

2.9 信号量(Semaphore)

2.10 事件(EVent)

2.11 生产者和消费者模式

三、进程process

3.1 什么是进程

3.2 进程的优缺点

3.3 进程的创建方式

3.3.1 方法模式

3.3.2继承process类

3.4 Queue实现进程间通信

3.5 Pipe实现进程间通信

3.6 Manager管理器

3.7 进程池(pool)

四、协程

4.1 协程是什么

4.2 协程的核心

4.3asyncio实现协程(重点)

参考文献

前言

本部分开始对并发编程的学习

一、并发编程介绍

1.1 串行、并行与并发的区别

1.串行(serial):一个CPU上,按顺序完成多个任务
2 并行(parallelism):指的是任务数小于等于cpu核数,即任务真的是一起执行的
3.并发(concurrency):一个CPU采用时间片管理方式,交替的处理多个任务。一般是是任务数多余cpu核数,通过操作系统的各种任务调度算法,实现用多个任务“一起”执行(实际上总有一些任务不在执行,因为切换任务的速度相当快,看上去一起执行而已)

1.2 进程、线程、协程的区别

一个故事说明进程、线程、协程的关系

1线程是程序执行的最小单位,而进程是操作系统分配资源的最小单位;
2 一个进程由一个或多个线程组成,线程是一个进程中代码的不同执行路线;

3进程之间相互独立,但同一进程下的各个线程之间共享程序的内存空间(包括代码段、数据集、堆等)及一些进程级的资源(如打开文件和信号),某进程内的线程在其它进程不可见;
4 调度和切换:线程上下文切换比进程上下文切换要快得多

进程(Process):拥有自己独立的堆和栈,既不共享堆,也不共享栈,进程由操作系统调度;进程切换需要的资源很最大,效率低
线程(Thread):拥有自己独立的栈和共享的堆,共享堆,不共享栈,标准线程由操作系统调度;线程切换需要的资源一般,效率一般(当然了在不考虑GIL的情况下)
协程(coroutine):拥有自己独立的栈和共享的堆,共享堆,不共享栈,协程由程序员在协程的代码里显示调度;协程切换任务资源很小,效率高

1.2.1 进程是什么?
 

进程(Process)是一个具有一定独立功能的程序关于某个数据集合的一次运行活动

        现代操作系统比如Mac OS X,Linux,Windows等,都是支持“多任务”的操作系统,叫“多任务”呢?简单地说,就是操作系统可以同时运行多个任务。打个比方,你一边在用逛淘宝,一边在听音乐,一边在用微信聊天,这就是多任务,至少同时有3个任务正在运行。还有很多任务悄悄地在后台同时运行着,只是桌面上没有显示而已。对于操作系统来说,一个任务就是一个进程(Process),比如打开一个浏览器就是启动一个浏览器进程,就启动了一个记事本进程,打开两个记事本就启动了两个记事本进程,打开一个Word就启动了一个Word进程。

1.2.2线程是什么?

线程(Thread)是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。

        有些进程还不止同时干一件事,比如微信,它可以同时进行打字聊天,视频聊天,朋友圈等事情。在一个进程内部,要同时干多件事,就需要同时运行多个“子任务”,我们把进程内的这些“子任务”称为线程(Thread)

并发编程解决方案

1.3 同步和异步介绍

同步和异步强调的是消息通信机制 (synchronous communication/asynchronous communication)。
同步(synchronous):A调用B,等待B返回结果后,A继续执行
异步(asynchronous ):A调用B,A继续执行,不等待B返回结果;B
有结果了,通知A,A再做处理。

二、线程

2.1 什么是线程?

线程(Thread)是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。

        有些进程还不止同时干一件事,比如微信,它可以同时进行打字聊天,视频聊天,朋友圈等事情。在一个进程内部,要同时干多件事,就需要同时运行多个“子任务”,我们把进程内的这些“子任务”称为线程(Thread)

2.2 线程的特点


1线程(Thread)是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位
2 线程是程序执行的最小单位,而进程是操作系统分配资源的最小单位;
3 一个进程由一个或多个线程组成,线程是一个进程中代码的不同执行路线;
4 拥有自己独立的栈和共享的堆,共享堆,不共享栈,标准线程由操作系统调度;
5 调度和切换:线程上下文切换比进程上下文切换要快得多

2.3 线程的创建方式

        Python的标准库提供了两个模块: _thread 和 threading , _thread 是低级模块, threading 是高级模块,对 _thread 进行了封装。绝大多数情况
下,我们只需要使用 threading 这个高级模块。线程的创建可以通过分为两种方式:
1. 方法包装
2. 类包装
线程的执行统一通过 start() 方法

2.3.1方法包装的线程创建

#coding=utf-8
from threading import Thread
from time import sleep

def func1(name):
    print(f"线程{name},start")  #format
    for i in range(3):
        print(f"线程{name},{i}")
        sleep(3)
    print(f"线程{name},end")

if __name__ == '__main__':
    print("主线程,start")
    #创建线程
    t1 = Thread(target=func1,args=("t1",))
    t2 = Thread(target=func1,args=("t2",))
    #启动线程
    t1.start()
    t2.start()
    print("主线程,end")

运行结果可能会出现换行问题,是因为多个线程抢夺控制台输出的IO流。比如,如下的输出换行就没有按照预想的显示: 

2.3.2 类方法线程包装的实现 

#coding=utf-8
from threading import Thread
from time import sleep
class MyThread(Thread):
    def __init__(self,name):
        Thread.__init__(self)
        self.name = name
    def run(self):
        print(f"线程{self.name},start")  # format
        for i in range(3):
            print(f"线程{self.name},{i}")
            sleep(3)
        print(f"线程{self.name},end")
if __name__ == '__main__':
    print("主线程,start")
    #创建线程
    t1 = MyThread("t1")
    t2 = MyThread("t2")
    #启动线程
    t1.start()
    t2.start()
    print("主线程,end")

 结果如下所示:

主线程,start
线程t1,start
线程t1,0
线程t2,start主线程,end

线程t2,0
线程t1,1线程t2,1

线程t2,2线程t1,2

线程t1,end线程t2,end

2.4 join()方法的使用

        之前的代码,主线程不会等待子线程结束。如果需要等待子线程结束后,再结束主线程,可使用join()方法。简单来说,以往的多线程中的主程序不会等待子程序的结束,使用join()之后可以让主程序等待子程序结束才会继续执行。代码如下所示:

#encoding=utf-8
from threading import Thread
from time import sleep
def func1(name):
    for i in range(3):
        print(f"thread:{name} :{i}")
        sleep(1)

if __name__ == '__main__':
    print("主线程,start")
    #创建线程
    t1 = Thread(target=func1,args=("t1",))
    t2 = Thread(target=func1,args=("t2",))
    #启动线程
    t1.start()
    t2.start()
    #主线程会等待t1,t2结束后,再往下执行
    t1.join()
    t2.join()
    print("主线程,end")

 执行结果如下所示:

主线程,start
thread:t1 :0
thread:t2 :0
thread:t1 :1thread:t2 :1

thread:t1 :2thread:t2 :2

主线程,end

 2.5 守护线程

        在行为上还有一种叫守护线程,主要的特征是它的生命周期。主线程死亡,它也就随之死亡。在python中,线程通过 setDaemon(True|False)来设置是否为守护线程。

守护线程的作用:守护线程作用是为其他线程提供便利服务,守护线程最典型的应用就是 GC (垃圾收集器)。

#encoding=utf-8
from threading import Thread
from time import sleep
class MyThread(Thread):
    def __init__(self,name):
        Thread.__init__(self)
        self.name =name
    def run(self):
        for i in range(3):
            print(f"thread:{self.name} :{i}")
            sleep(1)
if __name__ == '__main__':
    print("主线程,start")
    #创建线程(类的方式)
    t1 = MyThread('t1')
    #t1设置为守护线程
    t1.daemon = True
    # t1.setDaemon(True)
    #启动线程
    t1.start()
    print("主线程,end")

 2.6 全局锁GIL问题

在python中,无论你有多少核,在Cpython解释器中永远都是假象。无论你是4核,8核,还是16核.......不好意思,同一时间执行的线程只有一个线程,它就是这个样子的。这个是python的一个开发时候,设计的一个缺陷,所以说python中的线程是“含有水分的线程”。

Python代码的执行由Python虚拟机(也叫解释器主循环)来控制。Python在设计之初就考虑到要在主循环中,同时只有一个线程在执行。虽然 Python 解释器中可以“运行”多个线程,但在任意时刻只有一个线程在解释器中运行。

对Python虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同一时刻只有一个线程在运行。

在多线程环境中,Python 虚拟机按以下方式执行:

  1. 设置 GIL;
  2. 切换到一个线程去运行;
  3. 运行指定数量的字节码指令或者线程主动让出控制(可以调用 time.sleep(0));
  4. 把线程设置为睡眠状态;
  5. 解锁 GIL;
  6. 再次重复以上所有步骤。

在调用外部代码(如 C/C++扩展函数)的时候,GIL将会被锁定,直到这个函数结束为止(由于在这期间没有Python的字节码被运行,所以不会做线程切换)编写扩展的程序员可以主动解锁GIL。

2.7 线程同步和互斥锁

同一个资源,多人想用?排队啊!

现实生活中,我们会遇到“同一个资源,多个人都想使用”的问题。 比如:教室里,只有一台电脑,多个人都想使用。天然的解决办法就是,在电脑旁边,大家排队。前一人使用完后,后一人再使用。再比如,上厕所排队。

2.7.1 线程同步的概念 

        处理多线程问题时,多个线程访问同一个对象,并且某些线程还想修改这个对象。 这时候,我们就需要用到“线程同步”。 线程同步其实就是一种等待机制,多个需要同时访问此对象的线程进入这个对象的等待池形成队列,等待前面的线程使用完毕后,下一个线程再使用。

首先我们模拟一下不使用锁的情况下如何会出现什么问题?

#coding=utf-8
#未使用线程同步和互斥锁的情况
from threading import Thread
from time import sleep

class Account:
    def __init__(self,money,name):
        self.money = money
        self.name = name

#模拟提款的操作
class Drawing(Thread):
    def __init__(self,drawingNum,account):
        Thread.__init__(self)
        self.drawingNum = drawingNum
        self.account = account
        self.expenseTotal = 0
    def run(self):
        if self.account.money<self.drawingNum:
            return
        sleep(1) #判断完可以取钱,则阻塞。就是为了测试发生冲突问题
        self.account.money -=self.drawingNum
        self.expenseTotal += self.drawingNum
        print(f"账户:{self.account.name},余额是:{self.account.money}")
        print(f"账户:{self.account.name},总共取了:{self.expenseTotal}")


if __name__ == '__main__':
    a1 = Account(100,"gaoqi")
    draw1 = Drawing(80,a1)  #定义一个取钱的线程
    draw2 = Drawing(80,a1)  #定义一个取钱的线程
    draw1.start()
    draw2.start()

结果如下所示:

E:\software\miniconda\envs\python382\python.exe D:/workplace20240513/pythonbase/cn/并发编程/多个线程访问一个资源.py
账户:gaoqi,余额是:20账户:gaoqi,余额是:-60
账户:gaoqi,总共取了:80
账户:gaoqi,总共取了:80

 没有线程同步机制,两个线程同时操作同一个账户对象,竟然从只有100元的账户,轻松取出80*2=160元,账户余额竟然成为了-60。这么大的问题,显然银行不会答应的。

2.7.2锁机制实现银行账户的访问(互斥锁)

互斥锁: 对共享数据进行锁定,保证同一时刻只能有一个线程去操作。
注意: 互斥锁是多个线程一起去抢,抢到锁的线程先执行,没有抢到锁的线程需要等待,等互斥锁使用完释放后,其它等待的线程再去抢这个锁。
 threading 模块中定义了 Lock 变量,这个变量本质上是一个函数,通过调用这个函数可以获取一把互斥锁。

代码如下所示:

#coding=utf-8
#使用互斥锁的案例
from threading import Thread, Lock
from time import sleep

class Account:
    def __init__(self,money,name):
        self.money = money
        self.name = name

#模拟提款的操作
class Drawing(Thread):
    def __init__(self,drawingNum,account):
        Thread.__init__(self)
        self.drawingNum = drawingNum
        self.account = account
        self.expenseTotal = 0
    def run(self):
        lock1.acquire()# 首先需要在这个位置拿到锁
        if self.account.money<self.drawingNum:
            print("账户余额不足!")
            return
        sleep(1) #判断完可以取钱,则阻塞。就是为了测试发生冲突问题
        self.account.money -=self.drawingNum
        self.expenseTotal += self.drawingNum
        lock1.release()# 完成之后需要在这个位置释放锁
        print(f"账户:{self.account.name},余额是:{self.account.money}")
        print(f"账户:{self.account.name},总共取了:{self.expenseTotal}")


if __name__ == '__main__':
    a1 = Account(100,"gaoqi")
    lock1 = Lock()
    draw1 = Drawing(80,a1)  #定义一个取钱的线程
    draw2 = Drawing(80,a1)  #定义一个取钱的线程
    draw1.start()
    draw2.start()

结果如下所示:

账户:gaoqi,余额是:20账户余额不足!

账户:gaoqi,总共取了:80

 acquire 和 release 方法之间的代码同一时刻只能有一个线程去操作如果在调用 acquire 方法的时候 其他线程已经使用了这个互斥锁,那么此时 acquire 方法会
堵塞,直到这个互斥锁释放后才能再次上锁

2.8 死锁

死锁是指在两个或多个进程或线程之间发生的一种现象,其中每个进程都在等待其他进程释放资源,但这些资源永远不会被释放,导致这些进程无法继续执行。换句话说,死锁是一种相互等待的局面,进程之间形成了一个循环依赖,导致它们都陷入无法继续执行的状态。

死锁的四个必要条件(形成死锁的条件):

  1. 互斥条件:一个资源在同一时刻只能被一个进程占用。

  2. 占有并等待条件:一个进程已经占有了至少一个资源,并且还在等待其他进程占用的资源。

  3. 不可剥夺条件:已经分配给一个进程的资源不能被强制剥夺,只能由该进程主动释放。

  4. 循环等待条件:存在一个进程链,链中的每个进程都在等待下一个进程占用的资源,形成了循环等待的局面。

举例说明:

假设有两个进程 P1P2,以及两个资源 R1R2。下面是导致死锁的一个场景:

  1. 进程 P1 获得了资源 R1,同时进程 P2 获得了资源 R2

  2. 现在 P1 需要 R2 来继续执行,于是它在等待 P2 释放 R2

  3. 但是,P2 也需要 R1 来继续执行,于是它在等待 P1 释放 R1

  4. 结果就是 P1P2 都在等待对方释放资源,而双方都不会释放各自的资源,导致两者都无法继续执行,形成了死锁。

代码如下所示:

#encoding=utf-8
from threading import Thread, Lock
from time import sleep
def fun1():
    lock1.acquire()
    print('fun1拿到菜刀')
    sleep(2)
    lock2.acquire()
    print('fun1拿到锅')

    lock2.release()
    print('fun1释放锅')
    lock1.release()
    print('fun1释放菜刀')

def fun2():
    lock2.acquire()
    print('fun2拿到锅')
    lock1.acquire()
    print('fun2拿到菜刀')
    lock1.release()
    print('fun2释放菜刀')
    lock2.release()
    print('fun2释放锅')


if __name__ == '__main__':
    lock1 = Lock()
    lock2 = Lock()
    t1 = Thread(target=fun1)
    t2 = Thread(target=fun2)
    t1.start()
    t2.start()

  • 代码中有两个锁:lock1lock2,分别可以理解为两个独立的资源(在注释中比喻为“菜刀”和“锅”)。
  • 使用锁是为了确保线程在访问共享资源时,其他线程无法同时使用该资源。

2.9 信号量(Semaphore)

        互斥锁使用后,一个资源同时只有一个线程访问。如果某个资源,我们同时想让N个(指定数值)线程访问?这时候,可以使用信号量。信号量控制同时访问资源的数量。信号量和锁相似,锁同一时间只允许一个对象(进程)通过,信号量同一时间允许多个对象(进程)通过。

1.在读写文件的时候,一般只能只有一个线程在写,而读可以有多个线程同时进行,如果需要限制同时读文件的线程个数,这时候就可以用到信号量了(如果用互斥锁,就是限制同一时刻只能有一个线程读取文件)。
2.在做爬虫抓取数据时。

#coding=utf-8
#一个房子,依次只允许两个人进来
from threading import Semaphore,Thread
from time import sleep
def home(name,se):
    se.acquire()
    print(f"{name}进入房间")
    sleep(3)
    print(f"****{name}走出房间")
    se.release()

if __name__ == '__main__':
    se = Semaphore(5)   #信号量对象
    for i in range(7):
        t = Thread(target=home,args=(f"tom{i}",se))
        t.start()

2.10 事件(EVent)

事件Event主要用于唤醒正在阻塞等待状态的线程;

原理:Event 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在初始情况下,event 对象中的信号标志被设置假。如果有线程等待一个 event 对象,而这个 event 对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个 event 对象的信号标志设置为真,它将唤醒所有等待个 event 对象的线程。如果一个线程等待一个已经被设置为真的 event 对象,那么它将忽略这个事件,继续执行

Event() 可以创建一个事件管理标志,该标志(event)默认为False,event对象主要有四种方法可以调用:

#coding:utf-8
#小伙伴a,b,c围着吃火锅,当菜上齐了,请客的主人说:开吃!
#于是小伙伴一起动筷子,这种场景如何实现
import threading
import time
def chihuoguo(name):
    #等待事件,进入等待阻塞状态
    print(f'{name}已经启动')
    print(f'小伙伴{name}已经进入就餐状态!')
    time.sleep(1)
    event.wait()
    # 收到事件后进入运行状态
    print(f'{name}收到通知了.' )
    print(f'小伙伴{name}开始吃咯!')
if __name__ == '__main__':
    event = threading.Event()
    # 创建新线程
    thread1 = threading.Thread(target=chihuoguo, args=("tom", ))
    thread2 = threading.Thread(target=chihuoguo, args=("cherry", ))
    # 开启线程
    thread1.start()
    thread2.start()
    time.sleep(10)
    # 发送事件通知
    print('---->>>主线程通知小伙伴开吃咯!')
    event.set()
tom已经启动
小伙伴tom已经进入就餐状态!
cherry已经启动
小伙伴cherry已经进入就餐状态!
---->>>主线程通知小伙伴开吃咯!
cherry收到通知了.
tom收到通知了.
小伙伴cherry开始吃咯!
小伙伴tom开始吃咯!

2.11 生产者和消费者模式

多线程环境下,我们经常需要多个线程的并发和协作。这个时候,就需要了解一个重要的多线程并发协作模型“生产者/消费者模式”。 

生产者指的是负责生产数据的模块(这里模块可能是:方法、对象、线程、进程)。 

消费者指的是负责处理数据的模块(这里模块可能是:方法、对象、线程、进程)

消费者不能直接使用生产者的数据,它们之间有个“缓冲区”。生产者将生产好的数据放入“缓冲区”,消费者从“缓冲区”拿要处理的数据

#coding=utf-8
from queue import Queue
from threading import Thread
from time import sleep
def producer():
    num = 1
    while True:
        if queue.qsize()<5:
            print(f"生产{num}号,大馒头")
            queue.put(f"大馒头:{num}号")
            num +=1
        else:
            print("馒头框满了,等待来人消费啊!")
        sleep(1)
def consumer():
    while True:
        print(f"获取馒头:{queue.get()}")
        sleep(1)
if __name__ == '__main__':
    queue = Queue()
    t1 = Thread(target=producer)
    t2 = Thread(target=consumer)
    t1.start()
    t2.start()

三、进程process

3.1 什么是进程

        进程(Process):拥有自己独立的堆和栈,既不共享堆,也不共享栈,进程由操作系统调度;进程切换需要的资源很最大,效率低。对于操作系统来说,一个任务就是一个进程(Process),比如打开一个浏览器就是启动一个浏览器进程,就启动了一个记事本进程,打开两个记事本就启动了两个记事本进程,打开一个Word就启动了一个Word进程。

        进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。在早期面向进程设计的计算机结构中,进程是程序的基本执行实体;在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体。

狭义定义:进程是正在运行的程序的实例(an instance of a computer program that is being executed)。

广义定义:进程是一个具有一定独立功能的程序关于某个数据集合的一次运行活动。它是操作系统动态执行的基本单元,在传统的操作系统中,进程既是基本的分配单元,也是基本的执行单元。

  1. 进程是一个实体。每一个进程都有它自己的地址空间,一般情况下,包括文本区域(text region)、数据区域(data region)和堆栈(stack region)。文本区域存储处理器执行的代码;数据区域存储变量和进程执行期间使用的动态分配的内存;堆栈区域存储着活动过程调用的指令和本地变量。
  2. 进程是一个“执行中的程序”。程序是一个没有生命的实体,只有处理器赋予程序生命时(操作系统执行之),它才能成为一个活动的实体,我们称其为进程。

进程是操作系统中最基本、重要的概念。是多道程序系统出现后,为了刻画系统内部出现的动态情况,描述系统内部各道程序的活动规律引进的一个概念,所有多道程序设计操作系统都建立在进程的基础上。

3.2 进程的优缺点

优点:

1.可以使用计算机多核,进行任务的并行执行,提高执行效率
2 运行不受其他进程影响,创建方便
3 空间独立,数据安全

缺点:进程的创建和删除消耗的系统资源较多

3.3 进程的创建方式

3.3.1 方法模式

#coding=utf-8
import os
from multiprocessing import Process
from time import sleep
def fun1(name):
    print(f"当前进程ID:{os.getpid()}")
    print(f"父进程ID:{os.getppid()}")
    print(f"Process:{name},start")
    sleep(3)
    print(f"Process:{name},end")

#windows上多进程实现的bug。如果不加main的限制,就会无限制的创建子进程,从而报错。
if __name__ == '__main__':
    print("当前进程ID:",os.getpid())
    #创建进程
    p1 = Process(target=fun1,args=("p1",))
    p2 = Process(target=fun1, args=("p2",))
    #启动进程
    p1.start()
    p2.start()

3.3.2继承process类

#coding=utf-8
from multiprocessing import Process
from time import sleep
class MyProcess(Process):
    def __init__(self,name):
        Process.__init__(self)
        self.name = name
    def run(self):
        print(f"Process:{self.name},start")
        sleep(3)
        print(f"Process:{self.name},end")
if __name__ == '__main__':
    #创建进程
    p1 = MyProcess("p1")
    p2 = MyProcess("p2")
    p1.start()
    p2.start()

3.4 Queue实现进程间通信

#coding=utf-8
from multiprocessing import Process, Queue
from time import sleep
class MyProcess(Process):
    def __init__(self,name,mq):
        Process.__init__(self)
        self.name = name
        self.mq = mq
    def run(self):
        print(f"Process:{self.name},start")
        print(f"get Data:{self.mq.get()}")
        sleep(2)
        self.mq.put(f"new_data:{self.name}")
        print(f"Process:{self.name},end")
if __name__ == '__main__':
    mq = Queue()
    mq.put("1")
    mq.put("2")
    mq.put("3")
    #进程列表
    p_list = []
    for i in range(3):
        p = MyProcess(f"p{i+1}",mq)
        p_list.append(p)
    for p in p_list:
        p.start()
    for p in p_list:
        p.join()
    print(mq.get())
    print(mq.get())
    print(mq.get())
  1. 主程序创建了一个队列 mq,并向其中放入 "1", "2", "3"
  2. 创建并启动了三个进程 p1, p2, p3。每个进程从队列中取出一个元素,然后休眠 2 秒,最后将一个新的元素放入队列中,元素格式为 new_data:{进程名}
  3. 当三个进程都执行完毕后,主程序从队列中获取并打印三个新元素,即 "new_data:p1", "new_data:p2", "new_data:p3"(顺序可能不同,具体取决于进程的执行顺序)。
  4. 进程间通过队列 mq 实现了数据的读取和传递。

3.5 Pipe实现进程间通信

#coding=utf-8
import multiprocessing
from time import sleep
def func1(conn1):
   sub_info = "Hello!"
   print(f"进程1--{multiprocessing.current_process().pid}发送数据:{sub_info}")
   sleep(1)
   conn1.send(sub_info)
   print(f"来自进程2:{conn1.recv()}")
   sleep(1)
def func2(conn2):
   sub_info = "你好!"
   print(f"进程2--{multiprocessing.current_process().pid}发送数据:{sub_info}")
   sleep(1)
   conn2.send(sub_info)
   print(f"来自进程1:{conn2.recv()}")
   sleep(1)

if __name__ == '__main__':
   #创建管道
   conn1,conn2 = multiprocessing.Pipe()
   # 创建子进程
   process1 = multiprocessing.Process(target=func1,args=(conn1,))
   process2 = multiprocessing.Process(target=func2,args=(conn2,))
   # 启动子进程
   process1.start()
   process2.start()

3.6 Manager管理器

管理器提供了一种创建共享数据的方法,从而可以在不同进程中共享。

#coding=utf-8
from multiprocessing import Process
from multiprocessing import Manager

def func(name,m_list,m_dict):
    m_dict['name'] = '尚学堂'
    m_list.append('你好')

if __name__ == "__main__":
    with Manager() as mgr:
        m_list = mgr.list()
        m_dict = mgr.dict()
        m_list.append('Hello!!')
        m_dict['age']=18
        #两个进程不能直接互相使用对象,需要互相传递
        p1 = Process(target=func,args=('p1',m_list,m_dict))
        p1.start()
        p1.join()   #等p1进程结束,主进程继续执行
        print(f"主进程:{m_list}")
        print(f"主进程:{m_dict}")

 输出结果:

主进程:['Hello!!', '你好']
主进程:{'age': 18, 'name': '尚学堂'}

3.7 进程池(pool)

        Python提供了更好的管理多个进程的方式,就是使用进程池。

        进程池可以提供指定数量的进程给用户使用,即当有新的请求提交到进程池中时,如果池未满,则会创建一个新的进程用来执行该请求;反之,如果池中的进程数已经达到规定最大值,那么该请求就会等待,只要池中有进程空闲下来,该请求就能得到执行。

#coding=utf-8
from multiprocessing import Pool
import os
from time import sleep
def func1(name):
    print(f"当前进程的ID:{os.getpid()},{name}")
    sleep(2)
    return name
def func2(args):
    print(args)
if __name__ == "__main__":
    pool = Pool(5)
    pool.apply_async(func = func1,args=('sxt1',),callback=func2)
    pool.apply_async(func = func1,args=('sxt2',),callback=func2)
    pool.apply_async(func = func1,args=('sxt3',),callback=func2)
    pool.apply_async(func = func1,args=('sxt4',))
    pool.apply_async(func = func1,args=('sxt5',))
    pool.apply_async(func = func1,args=('sxt6',))
    pool.apply_async(func = func1,args=('sxt7',))
    pool.apply_async(func = func1,args=('sxt8',))
    pool.close()
    pool.join()
#coding=utf-8
from multiprocessing import Pool
import os
from time import sleep

def func1(name):
    print(f"当前进程的ID:{os.getpid()},{name}")
    sleep(2)
    return name

if __name__ == "__main__":
   with Pool(5) as pool:
        args = pool.map(func1,('sxt1,','sxt2,','sxt3,','sxt4,','sxt5,','sxt6,','sxt7,','sxt8,'))
        for a in args:
            print(a)

四、协程

4.1 协程是什么

协程,Coroutines,也叫作纤程(Fiber)协程,全称是“协同程序”,用来实现任务协作。是一种在线程中,比线程更加轻量级的存在,由程序员自己写程序来管理。
当出现IO阻塞时,CPU一直等待IO返回,处于空转状态。这时候用协程,可以执行其他任务。当IO返回结果后,再回来处理数据。充分利用了IO等待的时间,提高了效率

协程的核心(控制流的让出和恢复)
1 每个协程有自己的执行栈,可以保存自己的执行现场
2 可以由用户程序按需创建协程(比如:遇到io操作)协程“主动让出(yield)”执行权时候,会保存执行现场(保存中断时的寄存器上下文和栈),然后切换到其他协程
3协程恢复执行(resume)时,根据之前保存的执行现场恢复到中断前的状态,继续执行,这样就通过协程实现了轻量的由用户态调度的多任务模型 

4.2 协程的核心

1每个协程有自己的执行栈,可以保存自己的执行现场
2 可以由用户程序按需创建协程(比如:遇到io操作)协程“主动让出(yield)”执行权时候,会保存执行现场(保存中断时的寄存器上下文和栈),然后切换
到其他协程
3协程恢复执行(resume)时,根据之前保存的执行现场恢复到中断前的状态,继续执行,这样就通过协程实现了轻量的由用户态调度的多任务模型

4.3asyncio实现协程(重点)

#coding=utf-8
import asyncio
import time
async def func1():     #async表示方法是异步的
    for i in range(3):
        print(f'北京:第{i}次打印啦')
        await asyncio.sleep(1)
    return "func1执行完毕"
async def func2():
    for k in range(3):
        print(f'上海:第{k}次打印了' )
        await asyncio.sleep(1)
    return "func2执行完毕"
async def main():
   res = await asyncio.gather(func1(), func2())
   #await异步执行func1方法
   #返回值为函数的返回值列表
   print(res)

if __name__ == '__main__':
   start_time = time.time()
   asyncio.run(main())
   end_time = time.time()
   print(f"耗时{end_time-start_time}")   #耗时3秒,效率极大提高

参考文献

操作系统的发展史 - B站-水论文的程序猿 - 博客园

GIL全局解释器锁 - B站-水论文的程序猿 - 博客园


原文地址:https://blog.csdn.net/qq_41221411/article/details/142924919

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!