热门角色不仅是灵感来源,更是你的效率助手。通过精挑细选的角色提示词,你可以快速生成高质量内容、提升创作灵感,并找到最契合你需求的解决方案。让创作更轻松,让价值更直接!
我们根据不同用户需求,持续更新角色库,让你总能找到合适的灵感入口。
根据需求生成高质量示例代码或程序模板
以下是一个使用 Python 实现的观察者模式的代码片段,展示一种事件驱动架构。此方案包括一个发布者 (Publisher) 和多个订阅者 (Subscribers)。代码包含注释,以便于理解和维护,同时设计为可扩展。
from typing import List, Callable
class Publisher:
"""
Publisher负责维护订阅者列表并向其广播事件。
"""
def __init__(self):
# 保存订阅者的列表,每个订阅者是一个实现特定接口的对象或回调函数
self._subscribers: List[Callable] = []
def subscribe(self, subscriber: Callable):
"""
添加订阅者到通知列表。
:param subscriber: 订阅者,必须是可调用对象
"""
if subscriber not in self._subscribers:
self._subscribers.append(subscriber)
else:
print("Subscriber already subscribed.")
def unsubscribe(self, subscriber: Callable):
"""
从订阅列表中移除订阅者。
:param subscriber: 要移除的订阅者
"""
if subscriber in self._subscribers:
self._subscribers.remove(subscriber)
else:
print("Subscriber not found in the list.")
def notify(self, *args, **kwargs):
"""
通知所有订阅者,广播事件。
:param args: 传递给订阅者的参数
:param kwargs: 传递给订阅者的关键字参数
"""
print("Notifying subscribers...")
for subscriber in self._subscribers:
subscriber(*args, **kwargs)
class Subscriber:
"""
示例订阅者类,可以实现特定的事件处理逻辑。
"""
def __init__(self, name: str):
self.name = name
def update(self, message: str):
"""
处理来自发布者的通知。
:param message: 从发布者接收到的消息
"""
print(f"Subscriber {self.name} received message: {message}")
# 示例:利用观察者模式实现事件驱动架构
if __name__ == "__main__":
# 创建发布者
event_publisher = Publisher()
# 创建订阅者
subscriber1 = Subscriber("Subscriber1")
subscriber2 = Subscriber("Subscriber2")
# 订阅事件
event_publisher.subscribe(subscriber1.update) # 通过方法引用传递 update 函数
event_publisher.subscribe(subscriber2.update)
# 发布事件,订阅者将被通知
event_publisher.notify(message="Event 1 triggered")
# 取消订阅
event_publisher.unsubscribe(subscriber1.update)
# 再次发布事件,只有剩余订阅者会被通知
event_publisher.notify(message="Event 2 triggered")
# 添加额外订阅者
event_publisher.subscribe(lambda message: print(f"Anonymous subscriber received message: {message}"))
# 发布另外的事件
event_publisher.notify(message="Event 3 triggered")
Publisher 类:
subscribe() 方法用于添加订阅者,unsubscribe() 方法移除订阅者,notify() 方法广播事件。Subscriber 类:
update)。通知机制:
notify() 方法时,会依次调用所有订阅者的回调方法(update() 或匿名函数)。事件参数:
notify() 支持灵活的参数传递(*args 和 **kwargs),这样发布者可以动态发送不同的事件数据。扩展功能:
与其他系统集成:
asyncio 或消息队列)结合,为事件处理提供异步能力。下面是一个实现发布-订阅(Publisher-Subscriber)系统的 Python 模板代码,它采用模块化设计和功能扩展性,支持动态注册、取消订阅以及快速集成新功能:
from typing import Callable, Dict, List, Any
class Event:
"""
Represents an event type in the Pub-Sub system.
"""
def __init__(self, name: str):
self.name = name
def __repr__(self):
return f"Event(name={self.name})"
class Subscriber:
"""
Base class for subscribers. Can be extended for specific behaviors.
"""
def __init__(self, name: str):
self.name = name
def notify(self, event: Event, data: Any):
"""
Action performed when the subscriber receives an event notification.
Override this method to define custom behavior.
"""
print(f"{self.name} received event '{event.name}' with data: {data}")
class Publisher:
"""
Publisher class to manage events and subscribers.
"""
def __init__(self):
self._subscribers: Dict[str, List[Callable]] = {}
def register(self, event_name: str, callback: Callable):
"""
Registers a subscriber (callback function) to an event.
"""
if event_name not in self._subscribers:
self._subscribers[event_name] = []
self._subscribers[event_name].append(callback)
print(f"Registered callback for event '{event_name}'.")
def unregister(self, event_name: str, callback: Callable):
"""
Unregisters a subscriber (callback function) from an event.
"""
if event_name in self._subscribers:
try:
self._subscribers[event_name].remove(callback)
print(f"Unregistered callback for event '{event_name}'.")
if not self._subscribers[event_name]:
del self._subscribers[event_name] # Remove event if no subscribers
except ValueError:
print(f"Callback not found for event '{event_name}'.")
else:
print(f"No such event '{event_name}' to unregister from.")
def notify(self, event_name: str, data: Any):
"""
Notifies all subscribers of a particular event with provided data.
"""
if event_name in self._subscribers:
print(f"Notifying subscribers of event '{event_name}'...")
for callback in self._subscribers[event_name]:
callback(data)
else:
print(f"No subscribers to notify for event '{event_name}'.")
# Example: Testing the Pub-Sub System
if __name__ == "__main__":
# Create a Publisher
pub = Publisher()
# Create Subscribers
alice = Subscriber("Alice")
bob = Subscriber("Bob")
# Define Event Handlers (callbacks)
def alice_handler(data):
alice.notify(Event("TestEvent"), data)
def bob_handler(data):
bob.notify(Event("TestEvent"), data)
# Register Handlers
pub.register("TestEvent", alice_handler)
pub.register("TestEvent", bob_handler)
# Trigger Events
pub.notify("TestEvent", {"message": "Hello, World!"})
# Unregister one subscriber
pub.unregister("TestEvent", alice_handler)
# Trigger Events again
pub.notify("TestEvent", {"message": "Goodbye, World!"})
模块化设计:
Event 类表示事件对象(可以扩展为更复杂的事件数据结构)。Subscriber 类是一个基础类,支持自定义行为(通过重写 notify 方法)。Publisher 类是核心,负责维护事件与回调的关系。动态注册与取消订阅:
register 方法动态添加回调函数。unregister 方法在运行时移除回调。扩展性与适配器模式:
Subscriber 可以被子类化,添加特定的通知行为。性能优化与可维护性:
asyncio 或线程池实现异步通知机制。Publisher 类,将事件通知历史记录存储到文件或数据库中。notify 方法中添加数据过滤规则,让特定数据只发送到符合条件的订阅者。这是一个简单但功能强大的发布订阅系统模板,可以根据业务需求进一步定制和扩展。
好的!以下是一个简单明了的 Python 版观察者模式(Observer Pattern)示例。通过这个例子,初学者可以快速理解这一设计模式的基本概念和实现方式。
观察者模式用于一个对象(被观察者)状态发生变化时,通知所有依赖它的其他对象(观察者)。本例展示了如何实现这一模式。
# 观察者模式示例
# 功能: 当"被观察者"的状态发生变化时, 通知所有"观察者"
class Subject:
"""
被观察者(Subject):负责维护观察者列表,并在自身状态更改时通知观察者。
"""
def __init__(self):
self._observers = [] # 存储观察者的列表
def add_observer(self, observer):
"""
添加观察者到列表中
"""
if observer not in self._observers:
self._observers.append(observer)
def remove_observer(self, observer):
"""
从列表中移除观察者
"""
if observer in self._observers:
self._observers.remove(observer)
def notify_observers(self):
"""
通知所有的观察者,调用它们的更新方法
"""
for observer in self._observers:
observer.update(self)
class Observer:
"""
观察者(Observer):定义一个接口,要求所有观察者实现 update 方法,用以接收通知。
"""
def update(self, subject):
raise NotImplementedError("Subclass must implement abstract method")
# 实现具体的被观察者
class ConcreteSubject(Subject):
"""
具体的被观察者:维护一个特定的数据状态,状态修改时通知观察者。
"""
def __init__(self):
super().__init__()
self._state = None # 被观察的数据状态
@property
def state(self):
return self._state
@state.setter
def state(self, value):
self._state = value
print(f"Subject state changed to: {self._state}")
self.notify_observers()
# 实现具体的观察者
class ConcreteObserver(Observer):
"""
具体的观察者:在接收到被观察者的通知时,实现自己特定的处理逻辑。
"""
def __init__(self, name):
self._name = name # 观察者的名字
def update(self, subject):
"""
在被观察者状态变化时,被调用。
"""
print(f"Observer {self._name} notified. New state: {subject.state}")
# 示例:如何使用观察者模式
if __name__ == "__main__":
# 创建一个具体的被观察者
subject = ConcreteSubject()
# 创建多个具体的观察者并添加到被观察者中
observer1 = ConcreteObserver("Observer1")
observer2 = ConcreteObserver("Observer2")
subject.add_observer(observer1)
subject.add_observer(observer2)
# 修改被观察者的状态
subject.state = "State1"
subject.state = "State2"
# 移除一个观察者,测试后续通知
subject.remove_observer(observer1)
subject.state = "State3"
Subject类(被观察者基类):
add_observer 和 remove_observer 方法,用于管理观察者列表。notify_observers 方法,负责通知所有观察者。Observer类(观察者基类):
update 方法,强制要求所有具体观察者实现该方法,用以接收状态更新。ConcreteSubject类(具体的被观察者):
notify_observers 通知所有观察者。ConcreteObserver类(具体观察者):
update方法来响应被观察者的状态变化。示例使用:
运行代码后,输出如下:
Subject state changed to: State1
Observer Observer1 notified. New state: State1
Observer Observer2 notified. New state: State1
Subject state changed to: State2
Observer Observer1 notified. New state: State2
Observer Observer2 notified. New state: State2
Subject state changed to: State3
Observer Observer2 notified. New state: State3
支持多种通知的机制:
add、delete等)触发不同的通知。与其他系统的集成:
weakref.WeakSet 来改进观察者集合存储,避免内存泄漏。通过这段代码,初学者不仅能够理解观察者模式,还能学习如何在 Python 中实现这种模式的基本方法。如果有更多问题或扩展需求,欢迎提出!
通过快速生成高质量代码片段与模板,减少重复劳动,加速任务执行,专注于构建核心功能。
根据业务逻辑设计精准代码结构与设计模式,确保代码的可扩展性与维护性更高,助力整体技术架构优化。
自定义生成学习用例,轻松理解不同编程语言的实现方法与最佳实践,提高学习效率。
帮助开发者快速生成高质量示例代码或程序模板,以满足特定需求,同时提供相关技术指导,从而提高开发效率,降低学习成本并加速项目推进。
将模板生成的提示词复制粘贴到您常用的 Chat 应用(如 ChatGPT、Claude 等),即可直接对话使用,无需额外开发。适合个人快速体验和轻量使用场景。
把提示词模板转化为 API,您的程序可任意修改模板参数,通过接口直接调用,轻松实现自动化与批量处理。适合开发者集成与业务系统嵌入。
在 MCP client 中配置对应的 server 地址,让您的 AI 应用自动调用提示词模板。适合高级用户和团队协作,让提示词在不同 AI 工具间无缝衔接。
免费获取高级提示词-优惠即将到期