自学内容网 自学内容网

2024-简单点-观察者模式

先看代码:

# 导入未来模块以支持类型注解
from __future__ import annotations

# 导入抽象基类模块和随机数生成器
from abc import ABC, abstractmethod
from random import randrange

# 导入列表类型注解
from typing import List


# 定义观察者模式中的主体接口(Subject)
class Subject(ABC):

    """
    主体接口声明一组用于管理订阅者的方法。
    """

    @abstractmethod
    def attach(self, observer: Observer) -> None:
        """
        将观察者附加到主体。
        """
        pass

    @abstractmethod
    def detach(self, observer: Observer) -> None:
        """
        从主体中移除观察者。
        """
        pass

    @abstractmethod
    def notify(self) -> None:
        """
        通知所有观察者有关事件的信息。
        """
        pass


# 定义具体主体类(ConcreteSubject)
class ConcreteSubject(Subject):

    """
    具体主体拥有对所有订阅者至关重要的状态,并在状态变化时通知观察者。
    """

    _state: int = None

    """
    为了简化起见,主体的状态(对所有订阅者至关重要)存储在此变量中。
    """

    _observers: List[Observer] = []

    """
    订阅者列表。在实际应用中,订阅者列表可以更全面地存储(按事件类型分类等)。
    """

    def attach(self, observer: Observer) -> None:
        print("主体:已附加一个观察者。")
        self._observers.append(observer)

    def detach(self, observer: Observer) -> None:
        self._observers.remove(observer)

    """
    订阅管理方法。
    """

    def notify(self) -> None:
        """
        触发每个订阅者的更新。
        """
        print("主体:正在通知观察者...")
        for observer in self._observers:
            observer.update(self)

    def some_business_logic(self) -> None:
        """
        通常,订阅逻辑只是主体所能做的工作的一部分。主体通常包含一些重要的业务逻辑,
        当即将发生(或已经发生)重要事情时触发通知方法。
        """
        print("\n主体:我正在做一些重要的事情。")
        self._state = randrange(0, 10)
        print(f"主体:我的状态刚刚变更为:{self._state}")
        self.notify()


# 定义观察者接口(Observer)
class Observer(ABC):

    """
    观察者接口声明由主体使用的update方法。
    """

    @abstractmethod
    def update(self, subject: Subject) -> None:
        """
        接收主体的更新。
        """
        pass


# 定义具体观察者类(ConcreteObserverA 和 ConcreteObserverB)
"""
具体观察者对它们所附属的主体发出的更新做出反应。
"""

class ConcreteObserverA(Observer):
    def update(self, subject: Subject) -> None:
        if subject._state < 3:
            print("具体观察者A:对事件作出反应")

class ConcreteObserverB(Observer):
    def update(self, subject: Subject) -> None:
        if subject._state == 0 or subject._state >= 2:
            print("具体观察者B:对事件作出反应")


# 客户端代码
if __name__ == "__main__":
    subject = ConcreteSubject()

    observer_a = ConcreteObserverA()
    subject.attach(observer_a)

    observer_b = ConcreteObserverB()
    subject.attach(observer_b)

    subject.some_business_logic()
    subject.some_business_logic()

    subject.detach(observer_a)

    subject.some_business_logic()

输出

Subject: Attached an observer.
Subject: Attached an observer.

Subject: I'm doing something important.
Subject: My state has just changed to: 0
Subject: Notifying observers...
ConcreteObserverA: Reacted to the event
ConcreteObserverB: Reacted to the event

Subject: I'm doing something important.
Subject: My state has just changed to: 5
Subject: Notifying observers...
ConcreteObserverB: Reacted to the event

Subject: I'm doing something important.
Subject: My state has just changed to: 0
Subject: Notifying observers...
ConcreteObserverB: Reacted to the event

在这里插入图片描述
参考:
参考


原文地址:https://blog.csdn.net/weixin_43702920/article/details/137678100

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!