编程设计模式讲解

251 浏览
24 试用
6 购买
Oct 23, 2025更新

为指定的设计模式提供详细解析,覆盖结构、关键角色、使用场景与优缺点,并结合具体编程语言附上清晰代码示例,帮助开发者快速理解并应用设计模式。

以下从软件设计角度和 Go 语言的实现习惯,系统讲解策略模式(Strategy Pattern),并给出一个中等复杂度的示例。

一、作用

  • 将可替换的算法或行为(策略)从使用它的对象中解耦,使得行为可以在运行时自由切换、扩展,而不需要修改使用方的代码。
  • 在 Go 中,策略模式常用小接口(或函数类型)来表达算法,配合组合而非继承,实现更易测试和更可维护的代码结构。
  • 典型价值:消除庞大的 if/else 或 switch 分支;遵循开闭原则;提高可插拔性与可测试性。

二、结构(关键组件与角色)

  • Strategy(策略接口):定义可替换的算法约定。在 Go 中通常是一个小接口或函数类型。
  • ConcreteStrategy(具体策略):实现不同的算法/行为。
  • Context(上下文):持有某个 Strategy,在其业务流程中调用策略方法。可以支持在运行时更换策略。
  • Client(客户端):选择和配置具体策略,注入给 Context 使用。

三、常见使用场景

  • 重试与回退(Backoff)策略:指数回退、固定间隔、带抖动等。
  • 排序/比较器策略:不同排序方式或自定义比较策略。
  • 路由/负载均衡策略:轮询、最少连接、权重优先等。
  • 缓存淘汰策略:LRU、LFU、FIFO 等。
  • 支付或计费策略:不同国家税率、折扣、费用计算方式。
  • 编码/压缩/加密策略:根据配置或数据类型选择算法。
  • 日志格式化/序列化策略:不同输出格式或字段映射。

四、优缺点

  • 优点
    • 解耦算法与使用方,实现开闭原则(新增策略不改动使用方)。
    • 便于单元测试:可注入“假策略”或“快策略”。
    • 消除重复的条件分支,使代码更清晰、更易维护。
    • 支持运行时切换策略,应对配置或环境变化。
  • 缺点
    • 类型数量增多,增加抽象与间接层,过度使用会导致过度设计。
    • 策略选择逻辑仍需管理(例如通过配置、注册表或工厂)。
    • 共享状态或并发安全需要额外注意(策略最好是不可变或无副作用)。

五、Go 语言实现示例(重试与回退策略) 说明:

  • 定义 BackoffStrategy 接口作为策略。
  • 提供 ConstantBackoff 与 ExponentialBackoff(带最大值与抖动)。
  • Retrier 作为上下文,接受 BackoffStrategy 和可自定义的重试谓词(ShouldRetry)。
  • 演示在运行时更换策略。

代码:

package main

import (
	"context"
	"fmt"
	"math"
	"math/rand"
	"time"
)

// Strategy:回退策略接口
type BackoffStrategy interface {
	NextDelay(attempt int, err error) time.Duration
}

// ConcreteStrategy:固定间隔回退
type ConstantBackoff struct {
	Interval time.Duration
}

func (c ConstantBackoff) NextDelay(attempt int, err error) time.Duration {
	return c.Interval
}

// ConcreteStrategy:指数回退(带最大值与抖动)
type ExponentialBackoff struct {
	Base   time.Duration // 初始基准延迟
	Factor float64       // 每次递增的倍率
	Max    time.Duration // 最大延迟
	Jitter float64       // 抖动比例,范围 [0,1] 较为合理,表示±百分比
}

func (e ExponentialBackoff) NextDelay(attempt int, err error) time.Duration {
	if attempt < 1 {
		attempt = 1
	}
	delay := float64(e.Base) * math.Pow(e.Factor, float64(attempt-1))
	if e.Max > 0 && time.Duration(delay) > e.Max {
		delay = float64(e.Max)
	}
	// 应用抖动:在 [1-Jitter, 1+Jitter] 区间随机缩放
	if e.Jitter > 0 {
		min := 1 - e.Jitter
		max := 1 + e.Jitter
		noise := min + rand.Float64()*(max-min)
		delay *= noise
	}
	if delay < 0 {
		delay = 0
	}
	return time.Duration(delay)
}

// 可选:用于判断是否应该重试的谓词策略
type RetryPredicate func(error) bool

// 一个用于示例的临时错误接口
type TemporaryError interface {
	error
	Temporary() bool
}

type transientError struct{ msg string }

func (e transientError) Error() string   { return e.msg }
func (e transientError) Temporary() bool { return true }

// Context:重试器,封装重试流程并使用 BackoffStrategy
type Retrier struct {
	MaxAttempts int
	Backoff     BackoffStrategy
	ShouldRetry RetryPredicate // 可选,不设置则全部重试
}

func NewRetrier(max int, backoff BackoffStrategy, pred RetryPredicate) *Retrier {
	return &Retrier{
		MaxAttempts: max,
		Backoff:     backoff,
		ShouldRetry: pred,
	}
}

func (r *Retrier) Do(ctx context.Context, op func(context.Context) error) error {
	if r.MaxAttempts < 1 {
		return fmt.Errorf("MaxAttempts must be >= 1")
	}
	var lastErr error
	for attempt := 1; attempt <= r.MaxAttempts; attempt++ {
		err := op(ctx)
		if err == nil {
			return nil
		}
		lastErr = err
		if r.ShouldRetry != nil && !r.ShouldRetry(err) {
			return err
		}
		if attempt == r.MaxAttempts {
			break
		}
		delay := r.Backoff.NextDelay(attempt, err)
		fmt.Printf("attempt=%d failed: %v; backing off for %v\n", attempt, err, delay)

		// 尊重上下文取消或超时
		select {
		case <-time.After(delay):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return lastErr
}

// 一个不稳定的服务,用于演示
type flakyService struct {
	failUntil int // 前 failUntil 次调用都会失败
	calls     int
}

func (s *flakyService) call(ctx context.Context) error {
	s.calls++
	fmt.Printf("calling service (attempt %d)\n", s.calls)
	if s.calls <= s.failUntil {
		return transientError{msg: "network glitch"}
	}
	// 模拟成功
	fmt.Println("service succeeded")
	return nil
}

func main() {
	rand.Seed(time.Now().UnixNano())

	// 上下文设置 5 秒超时
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	svc := &flakyService{failUntil: 3}

	// 使用指数回退策略(带抖动)
	exp := ExponentialBackoff{
		Base:   100 * time.Millisecond,
		Factor: 2.0,
		Max:    1 * time.Second,
		Jitter: 0.3, // ±30% 抖动
	}
	shouldRetry := func(err error) bool {
		// 只重试临时错误
		if te, ok := err.(TemporaryError); ok && te.Temporary() {
			return true
		}
		return false
	}
	retrier := NewRetrier(5, exp, shouldRetry)

	fmt.Println("== Run with ExponentialBackoff ==")
	if err := retrier.Do(ctx, svc.call); err != nil {
		fmt.Printf("final error: %v\n", err)
	}

	// 运行时切换策略:改为固定间隔策略
	fmt.Println("\n== Switch to ConstantBackoff ==")
	retrier.Backoff = ConstantBackoff{Interval: 200 * time.Millisecond}
	svc.failUntil = 5
	svc.calls = 0
	if err := retrier.Do(ctx, svc.call); err != nil {
		fmt.Printf("final error: %v\n", err)
	}
}

要点解析(结合示例)

  • BackoffStrategy 是策略接口;ConstantBackoff、ExponentialBackoff 是具体策略。
  • Retrier 是上下文,承载通用的重试流程,与具体回退策略解耦。
  • ShouldRetry 是另一类“策略”,用函数类型表达(Go 中常见替代方案),演示策略的可组合性。
  • 在 main 中展示了运行时切换策略,使行为在不改动重试流程的情况下变化。
  • 策略尽量设计为不可变或无副作用,便于并发安全和复用。

实践建议(Go 习惯)

  • 小接口优先:接口只声明必要方法,降低耦合。
  • 使用函数类型表达简单策略:如 RetryPredicate、过滤器等,减少样板代码。
  • 在构造函数注入策略,少用 setter;需要动态切换时(如示例)确保线程安全。
  • 将策略与 I/O 分离:策略只计算延迟、选择行为,不直接做 Sleep 或网络调用;上下文负责具体的流程与控制。
  • 测试中注入“快策略”(例如固定 0 延迟)与伪造错误,快速覆盖逻辑。

通过上述结构与示例,你可以把任何可替换的算法封装为策略接口或函数类型,做到在 Go 项目中既优雅又高效地管理行为变体。

以下内容围绕观察者模式在 TypeScript 中的作用、结构、常见使用场景、优缺点,并附上一个简单清晰的代码示例。

作用

  • 解决一对多的依赖:当一个对象(主题)状态变化时,自动通知所有依赖它的对象(观察者),实现事件驱动更新。
  • 解耦:主题不知道观察者的具体实现,只依赖统一的接口,使扩展与维护更容易。
  • 在 TypeScript 中,借助泛型与接口,可以为通知的数据类型提供强类型约束,减少运行时错误。

结构(关键组件与角色)

  • Subject(主题/被观察者)
    • 职责:维护观察者列表;提供订阅/取消订阅方法;在状态变化时通知观察者。
    • 关键方法:attach/subscribe、detach/unsubscribe、notify
  • Observer(观察者)
    • 职责:定义当主题发生变化时的响应行为。
    • 关键方法:update(data) 或 update(subject)(Push vs Pull)
  • ConcreteSubject(具体主题)
    • 存储实际状态;在状态变更时调用 notify
  • ConcreteObserver(具体观察者)
    • 实现 update,以响应主题的通知

两种通知模式(重要变体)

  • Push 模式:主题在通知时将变化数据直接推送给观察者(update(data))
  • Pull 模式:主题只告诉观察者“有变化”,观察者再从主题拉取所需数据(update(subject))
  • TypeScript 中常用 Push,因为可以强类型限定数据

常见使用场景

  • UI/MV* 架构:视图观察模型数据变化(如表单状态、列表数据)
  • 事件系统与消息通知:简化组件之间事件传播(Node.js 的 EventEmitter、浏览器的 DOM 事件本质上都是观察者模式)
  • 配置或特征开关变更:变更后通知依赖模块刷新行为
  • 缓存失效:数据源更新时通知缓存层刷新或失效
  • 数据流与实时更新:WebSocket/股票行情/传感器数据
  • 与 RxJS 集成:Observable/Observer 是其核心抽象(增强了组合、操作符、错误处理)

优点

  • 解耦与扩展性好:新增观察者无需修改主题
  • 支持动态订阅/取消订阅
  • 易于广播:一次状态变更可以通知多个观察者
  • 在 TypeScript 中有良好的类型约束与可读性

缺点

  • 隐式依赖:谁订阅了谁不直观,可能增加理解与调试成本
  • 性能风险:通知链可能触发“更新风暴”,需要节流/去抖/批处理
  • 资源泄漏:忘记取消订阅会导致内存泄漏或重复通知
  • 通知顺序与错误处理:默认顺序不保证、单个观察者抛错可能影响其他通知(需隔离错误)
  • 与发布-订阅(Pub/Sub)的区别:观察者直连主题;Pub/Sub 有中介/消息总线。选择错误会导致架构不清晰

TypeScript 简单示例(Push 模式,强类型)

// 定义观察者接口(泛型,强类型)
interface Observer<T> {
  update(data: T): void;
}

// 定义主题接口
interface Subject<T> {
  attach(observer: Observer<T>): () => void; // 返回取消订阅函数,便于使用
  detach(observer: Observer<T>): void;
  notify(data: T): void;
}

// 一个具体主题:温度传感器(推送最新温度)
class TemperatureSensor implements Subject<number> {
  private observers = new Set<Observer<number>>();
  private temperature = 20;

  attach(observer: Observer<number>): () => void {
    this.observers.add(observer);
    // 返回取消订阅函数,避免遗忘 detach
    return () => this.detach(observer);
  }

  detach(observer: Observer<number>): void {
    this.observers.delete(observer);
  }

  setTemperature(value: number): void {
    this.temperature = value;
    this.notify(this.temperature);
  }

  notify(data: number): void {
    // 复制迭代,避免通知过程中集合被修改影响遍历
    for (const obs of [...this.observers]) {
      try {
        obs.update(data);
      } catch (err) {
        console.error('Observer failed:', err);
      }
    }
  }
}

// 具体观察者:仪表盘显示
class DashboardDisplay implements Observer<number> {
  update(temp: number): void {
    console.log(`Dashboard: current temperature is ${temp}°C`);
  }
}

// 具体观察者:风扇控制器
class FanController implements Observer<number> {
  update(temp: number): void {
    const on = temp >= 28;
    console.log(on ? 'FanController: Fan ON' : 'FanController: Fan OFF');
  }
}

// 使用
const sensor = new TemperatureSensor();
const dashboard = new DashboardDisplay();
const fan = new FanController();

const unsubscribeFan = sensor.attach(fan);
sensor.attach(dashboard);

sensor.setTemperature(26); // 通知:风扇关、仪表盘显示
sensor.setTemperature(31); // 通知:风扇开、仪表盘显示

unsubscribeFan();          // 取消订阅风扇
sensor.setTemperature(25); // 仅仪表盘接收通知

实践建议

  • 提供取消订阅机制(如返回函数或统一的 unsubscribe API),避免内存泄漏
  • 对频繁通知的数据做节流/去抖/批处理,或异步队列化通知
  • 在 notify 中隔离单个观察者错误,防止影响其他观察者
  • 对于复杂场景或跨模块通信,评估是否使用事件总线(Pub/Sub/Mediator)或 RxJS
  • 若担心长生命周期引用导致泄漏,可考虑 WeakRef/FinalizationRegistry(需谨慎并了解运行时支持情况)

下面从架构视角系统讲解装饰器模式在 Python 中的作用、结构、使用场景与优缺点,并附上一个较为复杂但清晰的示例代码。

一、作用

  • 在不修改原对象(组件)代码的前提下,动态地为其“按需叠加”功能。通过组合多个装饰器形成调用链,使行为横切关注(如缓存、重试、鉴权、度量、限流、熔断、压缩/加密等)以模块化方式复用。
  • 遵循开放-封闭原则:对扩展开放,对修改封闭。
  • 相比继承,更强调组合:避免“子类爆炸”,并可在运行时灵活重组功能。

二、结构(关键组件与角色)

  • Component(组件接口/抽象类):定义核心功能的统一接口。例如 Fetcher.fetch(url) -> str。
  • ConcreteComponent(具体组件):核心业务实现。例如真实的网络请求或数据加载。
  • Decorator(抽象装饰器):持有一个 Component 引用,并实现同样的接口;默认将调用委托给内部组件。
  • ConcreteDecorator(具体装饰器):在调用前后或异常路径上增加附加行为(缓存、重试、日志、鉴权等),同时保持接口一致。多个装饰器可嵌套形成调用链。

三、常见使用场景

  • I/O 或服务访问增强:重试、超时、熔断、限流、回退、幂等。
  • 性能与稳定性:缓存(LRU/TTL)、批处理、压缩。
  • 可观测性:日志、度量(计数、延迟)、追踪(trace id)。
  • 安全:鉴权、签名、加密/解密。
  • 领域规则:输入校验、幂等键、数据脱敏。

四、优缺点

  • 优点:
    • 高内聚、低耦合,职责分离清晰。
    • 组合灵活,可按需叠加,替代多层继承。
    • 符合开放-封闭原则,扩展容易。
  • 缺点:
    • 对象数量增多,调用栈加深,调试复杂。
    • 叠加顺序敏感(例如重试与熔断的相对位置影响行为)。
    • 可能引入非显式的控制流与状态,学习成本较高。
    • 若需改变接口(方法签名),装饰器难以处理。

五、代码示例(复杂示例:在数据获取组件上叠加缓存、重试、熔断、度量) 说明:

  • 定义 Fetcher 接口与一个不稳定的具体实现(模拟网络/服务偶发失败)。
  • 叠加装饰器:CircuitBreaker(熔断)、Retry(重试)、Cache(TTL+LRU)、Metrics(度量)。
  • 展示顺序对行为的影响:Cache -> Retry -> CircuitBreaker -> ConcreteComponent,最外层再加 Metrics。

代码:

import time
import random
from abc import ABC, abstractmethod
from collections import OrderedDict
from typing import Dict, Optional, Tuple, Any


# =========================
# 领域与错误类型
# =========================
class TransientError(Exception):
    """可重试的瞬时错误(例如网络抖动、服务暂不可用)。"""
    pass

class NotFoundError(Exception):
    """不可重试的错误(例如资源不存在)。"""
    pass

class CircuitOpenError(Exception):
    """熔断器打开,拒绝调用。"""
    pass


# =========================
# 组件接口与具体实现
# =========================
class Fetcher(ABC):
    """Component:数据获取接口"""
    @abstractmethod
    def fetch(self, url: str, headers: Optional[Dict[str, str]] = None) -> str:
        pass

class UnstableFetcher(Fetcher):
    """
    ConcreteComponent:不稳定的数据源,模拟真实服务的随机失败与延迟。
    - failure_rate: 随机失败概率(TransientError)
    - latency_range: 延迟范围(秒)
    - data: 简单的URL->字符串数据映射
    """
    def __init__(self, data: Dict[str, str], failure_rate: float = 0.3, latency_range: Tuple[float, float] = (0.05, 0.2)):
        self.data = data
        self.failure_rate = failure_rate
        self.latency_range = latency_range

    def fetch(self, url: str, headers: Optional[Dict[str, str]] = None) -> str:
        # 模拟随机延迟
        time.sleep(random.uniform(*self.latency_range))
        # 模拟不可重试错误:不存在
        if url not in self.data:
            raise NotFoundError(f"URL not found: {url}")
        # 模拟可重试错误:瞬时失败
        if random.random() < self.failure_rate:
            raise TransientError("Random transient failure.")
        return self.data[url]


# =========================
# 抽象装饰器
# =========================
class FetcherDecorator(Fetcher):
    """Decorator:持有被装饰对象,并实现同样的接口"""
    def __init__(self, inner: Fetcher):
        self._inner = inner

    def fetch(self, url: str, headers: Optional[Dict[str, str]] = None) -> str:
        return self._inner.fetch(url, headers=headers)


# =========================
# Metrics 度量中心(供多个装饰器共享)
# =========================
class MetricsRegistry:
    def __init__(self):
        self.counters: Dict[str, int] = {}
        self.timers: Dict[str, float] = {}

    def inc(self, key: str, n: int = 1):
        self.counters[key] = self.counters.get(key, 0) + n

    def add_time(self, key: str, ms: float):
        self.timers[key] = self.timers.get(key, 0.0) + ms

    def snapshot(self) -> Dict[str, Any]:
        return {
            "counters": dict(self.counters),
            "timers_ms": dict(self.timers),
        }


# =========================
# ConcreteDecorator:熔断器
# =========================
class CircuitBreakerFetcher(FetcherDecorator):
    """
    熔断器状态机:
      - closed: 正常。累积失败达到阈值 -> open
      - open: 拒绝请求,等待恢复时间 -> half_open
      - half_open: 允许一次试探请求;成功 -> closed;失败 -> open
    """
    def __init__(self, inner: Fetcher, failure_threshold: int = 3, recovery_time: float = 3.0, metrics: Optional[MetricsRegistry] = None):
        super().__init__(inner)
        self.failure_threshold = failure_threshold
        self.recovery_time = recovery_time
        self.metrics = metrics

        self._state = "closed"
        self._consecutive_failures = 0
        self._opened_at = 0.0
        self._half_open_trial_in_progress = False

    def _to_open(self):
        self._state = "open"
        self._opened_at = time.time()
        self._half_open_trial_in_progress = False

    def _maybe_to_half_open(self):
        if self._state == "open" and (time.time() - self._opened_at) >= self.recovery_time:
            self._state = "half_open"
            self._half_open_trial_in_progress = False

    def fetch(self, url: str, headers: Optional[Dict[str, str]] = None) -> str:
        # open -> half_open if冷却结束
        self._maybe_to_half_open()

        if self._state == "open":
            if self.metrics:
                self.metrics.inc("circuit_open_skips")
            raise CircuitOpenError("Circuit breaker is OPEN")

        if self._state == "half_open":
            # 允许一次试探请求
            if self._half_open_trial_in_progress:
                # 有另一个试探中,拒绝(简单处理并发;单线程可忽略)
                raise CircuitOpenError("Half-open trial in progress")
            self._half_open_trial_in_progress = True

        try:
            result = self._inner.fetch(url, headers=headers)
            # 成功:重置并转为closed(若half_open)
            self._consecutive_failures = 0
            if self._state == "half_open":
                self._state = "closed"
                self._half_open_trial_in_progress = False
            return result
        except Exception as e:
            # 失败:计数并根据阈值转open
            self._consecutive_failures += 1
            if self._state == "half_open":
                # half-open失败立即转open
                self._to_open()
            elif self._consecutive_failures >= self.failure_threshold:
                self._to_open()
            raise


# =========================
# ConcreteDecorator:重试(指数退避+抖动)
# =========================
class RetryFetcher(FetcherDecorator):
    """
    对指定异常进行重试,支持指数退避与随机抖动。
    - retry_exceptions: 需要重试的异常类型集合
    - giveup_exceptions: 直接放弃的异常,不重试(如 CircuitOpenError、NotFoundError)
    """
    def __init__(self, inner: Fetcher, retries: int = 3, backoff_base: float = 0.1,
                 retry_exceptions: Tuple[type, ...] = (TransientError,),
                 giveup_exceptions: Tuple[type, ...] = (CircuitOpenError, NotFoundError),
                 metrics: Optional[MetricsRegistry] = None):
        super().__init__(inner)
        self.retries = retries
        self.backoff_base = backoff_base
        self.retry_exceptions = retry_exceptions
        self.giveup_exceptions = giveup_exceptions
        self.metrics = metrics

    def fetch(self, url: str, headers: Optional[Dict[str, str]] = None) -> str:
        attempt = 0
        while True:
            try:
                return self._inner.fetch(url, headers=headers)
            except self.giveup_exceptions:
                # 直接放弃,不做重试
                raise
            except self.retry_exceptions as e:
                attempt += 1
                if self.metrics:
                    self.metrics.inc("retries_attempted")
                if attempt > self.retries:
                    raise
                # 指数退避 + 随机抖动
                delay = self.backoff_base * (2 ** (attempt - 1)) * random.uniform(0.8, 1.2)
                time.sleep(delay)


# =========================
# ConcreteDecorator:TTL + LRU 缓存
# =========================
class TTLCache:
    """简单的TTL+LRU缓存实现,用于示例,不考虑并发。"""
    def __init__(self, capacity: int = 128, ttl_seconds: float = 5.0):
        self.capacity = capacity
        self.ttl = ttl_seconds
        self.store: OrderedDict[Any, Tuple[float, Any]] = OrderedDict()

    def _evict_if_needed(self):
        while len(self.store) > self.capacity:
            self.store.popitem(last=False)

    def _purge_expired(self):
        now = time.time()
        expired_keys = [k for k, (exp, _) in self.store.items() if exp < now]
        for k in expired_keys:
            del self.store[k]

    def get(self, key: Any) -> Optional[Any]:
        self._purge_expired()
        if key in self.store:
            exp, val = self.store.pop(key)  # 触发LRU移动
            self.store[key] = (exp, val)
            return val
        return None

    def put(self, key: Any, value: Any):
        exp = time.time() + self.ttl
        if key in self.store:
            del self.store[key]
        self.store[key] = (exp, value)
        self._evict_if_needed()

class CacheFetcher(FetcherDecorator):
    def __init__(self, inner: Fetcher, cache: TTLCache, metrics: Optional[MetricsRegistry] = None):
        super().__init__(inner)
        self.cache = cache
        self.metrics = metrics

    def _key(self, url: str, headers: Optional[Dict[str, str]]) -> Tuple[str, Tuple[Tuple[str, str], ...]]:
        headers_tuple = tuple(sorted((headers or {}).items()))
        return (url, headers_tuple)

    def fetch(self, url: str, headers: Optional[Dict[str, str]] = None) -> str:
        key = self._key(url, headers)
        val = self.cache.get(key)
        if val is not None:
            if self.metrics:
                self.metrics.inc("cache_hits")
            return val
        if self.metrics:
            self.metrics.inc("cache_misses")
        val = self._inner.fetch(url, headers=headers)
        self.cache.put(key, val)
        return val


# =========================
# ConcreteDecorator:度量
# =========================
class MetricsFetcher(FetcherDecorator):
    def __init__(self, inner: Fetcher, metrics: MetricsRegistry):
        super().__init__(inner)
        self.metrics = metrics

    def fetch(self, url: str, headers: Optional[Dict[str, str]] = None) -> str:
        self.metrics.inc("fetch_calls")
        start = time.time()
        try:
            result = self._inner.fetch(url, headers=headers)
            self.metrics.inc("fetch_success")
            return result
        except Exception:
            self.metrics.inc("fetch_failure")
            raise
        finally:
            elapsed_ms = (time.time() - start) * 1000.0
            self.metrics.add_time("fetch_latency_ms_total", elapsed_ms)


# =========================
# 演示:构建装饰器调用链并运行
# =========================
if __name__ == "__main__":
    # 准备示例数据与度量中心
    data = {
        "item/42": "The answer is 42.",
        "item/7": "Lucky number is 7.",
    }
    metrics = MetricsRegistry()

    # ConcreteComponent
    base = UnstableFetcher(data=data, failure_rate=0.5, latency_range=(0.05, 0.15))

    # 构建调用链(顺序很重要):
    # Cache -> Retry -> CircuitBreaker -> base,最外层再加 Metrics
    cache = TTLCache(capacity=64, ttl_seconds=2.0)
    chain = MetricsFetcher(
        inner=CacheFetcher(
            inner=RetryFetcher(
                inner=CircuitBreakerFetcher(
                    inner=base,
                    failure_threshold=3,   # 连续3次失败打开熔断
                    recovery_time=3.0,     # 3秒后进入half-open
                    metrics=metrics
                ),
                retries=4,               # 最多4次补偿重试
                backoff_base=0.08,       # 初始退避
                retry_exceptions=(TransientError,),
                giveup_exceptions=(CircuitOpenError, NotFoundError),
                metrics=metrics
            ),
            cache=cache,
            metrics=metrics
        ),
        metrics=metrics
    )

    # 连续请求以观察:缓存、重试、熔断与恢复的行为
    url = "item/42"
    for i in range(1, 12):
        try:
            val = chain.fetch(url)
            print(f"[{i:02d}] OK: {val}")
        except CircuitOpenError as e:
            print(f"[{i:02d}] CIRCUIT OPEN: {e}")
        except NotFoundError as e:
            print(f"[{i:02d}] NOT FOUND: {e}")
        except TransientError as e:
            print(f"[{i:02d}] TRANSIENT FAILURE (no more retries): {e}")
        except Exception as e:
            print(f"[{i:02d}] OTHER ERROR: {e}")
        time.sleep(0.7)  # 控制节奏,观察TTL与熔断恢复

    print("\nMetrics snapshot:")
    print(metrics.snapshot())

要点与实践建议:

  • 装饰器顺序影响语义:
    • 尽量让 Cache 处于外层以减少下游开销。
    • Retry 通常放在 CircuitBreaker 外层,避免熔断后仍盲目重试;并配置 giveup_exceptions 包含 CircuitOpenError。
  • 将横切关注的状态与指标统一收敛到 MetricsRegistry,有利于观测与测试。
  • 在生产环境中:
    • 封装可重试异常清单(网络超时、断开连接等),加入指数退避与最大等待。
    • 熔断器需考虑并发安全、滑动窗口统计、半开并发限制。
    • 缓存策略需结合业务一致性要求(TTL、失效、回源策略)。
  • Python 中的函数/方法装饰器(@decorator)与该 OO 装饰器模式思想一致,都是在保持接口不变的前提下包装功能;本示例展示的是面向对象形式,适于需要多层组合与复杂状态的场景。

示例详情

解决的问题

帮助用户深入理解并掌握指定设计模式的用途、结构及实现方法,通过清晰的讲解和实例代码提升技术技能,从而解决实际开发中的架构问题,并在项目实施中充分运用到设计模式的核心思想。

适用用户

软件开发者

从事系统开发的程序员,使用提示词快速掌握设计模式理论,学习代码实现,并将其用于项目开发。

技术团队负责人

想提升团队架构能力的技术领袖,利用本提示词快速生成教学材料并推广设计模式知识,使团队开发更高效。

技术内容创作者

从事技术教程、博客、书籍撰写的内容创作者,通过提示词快速建立高质量架构文章模板,提高创作效率。

特征总结

系统化解析设计模式,让您轻松理解核心原理和用途,快速掌握复杂的架构设计思路。
支持多种编程语言应用,每次可定制语言环境实例,更贴近开发者实际需求。
一键生成清晰简洁的代码示例,降低学习门槛,轻松将理论转化为实践。
深入分析设计模式的组件与角色结构,帮助明确开发中的职责分工与协作逻辑。
涵盖常见应用场景,结合实际案例展示,帮助用户对号入座,找到最佳实践方式。
全面讲解优缺点,自动生成权衡建议,辅助开发者在不同项目中灵活选择实现。
模板化内容生成,高效进行知识迁移,适用于培训、文档撰写与技术知识传播。
针对用户输入的需求和行业背景,生成定制化的设计模式解读,适配场景多样性。
自动优化内容结构,逻辑清晰流畅,帮助用户快速学习,无需额外整理或加工。

如何使用购买的提示词模板

1. 直接在外部 Chat 应用中使用

将模板生成的提示词复制粘贴到您常用的 Chat 应用(如 ChatGPT、Claude 等),即可直接对话使用,无需额外开发。适合个人快速体验和轻量使用场景。

2. 发布为 API 接口调用

把提示词模板转化为 API,您的程序可任意修改模板参数,通过接口直接调用,轻松实现自动化与批量处理。适合开发者集成与业务系统嵌入。

3. 在 MCP Client 中配置使用

在 MCP client 中配置对应的 server 地址,让您的 AI 应用自动调用提示词模板。适合高级用户和团队协作,让提示词在不同 AI 工具间无缝衔接。

AI 提示词价格
¥10.00元
先用后买,用好了再付款,超安全!

您购买后可以获得什么

获得完整提示词模板
- 共 95 tokens
- 3 个可调节参数
{ 设计模式名称 } { 编程语言 } { 示例复杂度 }
获得社区贡献内容的使用权
- 精选社区优质案例,助您快速上手提示词
限时免费

不要错过!

免费获取高级提示词-优惠即将到期

17
:
23
小时
:
59
分钟
:
59