热门角色不仅是灵感来源,更是你的效率助手。通过精挑细选的角色提示词,你可以快速生成高质量内容、提升创作灵感,并找到最契合你需求的解决方案。让创作更轻松,让价值更直接!
我们根据不同用户需求,持续更新角色库,让你总能找到合适的灵感入口。
本提示词可针对指定设计模式,详细解析其结构、关键角色、使用场景和优缺点,并结合具体编程语言提供清晰代码示例,帮助开发者快速理解并在组件设计中应用。
以下内容围绕策略模式在 Python 的作用、结构与实践展开,并包含一个面向结算域(Checkout)的进阶 FastAPI 服务端示例,通过策略模式将促销与税费规则解耦,支持渠道与会员等级组合、运行时策略链选择、A/B 测试与灰度发布。
一、策略模式在 Python 的作用
二、结构(关键组件与角色)
三、常见使用场景
四、优缺点
五、Python FastAPI 服务端示例(结算域:促销与税费解耦、策略链组合) 说明:
代码示例(可直接运行的结构化示例,省略持久化与真实配置中心接入,以内存模拟配置下发):
from __future__ import annotations
from typing import List, Dict, Any, Optional, Callable
from decimal import Decimal, ROUND_HALF_UP, getcontext
from dataclasses import dataclass, replace
from enum import Enum
from fastapi import FastAPI, Depends, HTTPException
from pydantic import BaseModel, Field, condecimal, conint, validator
# -----------------------------
# Money & enums
# -----------------------------
getcontext().prec = 28 # sufficient precision
def money(v: Decimal) -> Decimal:
return v.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
class Channel(str, Enum):
web = "web"
mobile = "mobile"
partner = "partner"
class MemberLevel(str, Enum):
standard = "standard"
silver = "silver"
gold = "gold"
# -----------------------------
# Pydantic models (request/response)
# -----------------------------
class LineItem(BaseModel):
sku: str = Field(..., min_length=1)
qty: conint(gt=0) = 1
unit_price: condecimal(gt=0, max_digits=12, decimal_places=2)
class CheckoutRequest(BaseModel):
site: str = Field(..., min_length=1)
channel: Channel
member_level: MemberLevel
user_tags: List[str] = []
currency: str = "CNY"
items: List[LineItem]
base_shipping_fee: condecimal(ge=0, max_digits=12, decimal_places=2) = Decimal("0.00")
ab_bucket: Optional[str] = None
@validator("items")
def non_empty_items(cls, v):
if not v:
raise ValueError("items cannot be empty")
return v
class StrategyApplication(BaseModel):
key: str
description: str
delta: condecimal(max_digits=12, decimal_places=2)
meta: Dict[str, Any] = {}
class CheckoutResponse(BaseModel):
currency: str
subtotal: condecimal(max_digits=12, decimal_places=2)
discount_total: condecimal(max_digits=12, decimal_places=2)
shipping_fee: condecimal(max_digits=12, decimal_places=2)
tax_total: condecimal(max_digits=12, decimal_places=2)
final_total: condecimal(max_digits=12, decimal_places=2)
applied: List[StrategyApplication]
# -----------------------------
# Runtime PricingContext
# -----------------------------
@dataclass(frozen=True)
class PricingContext:
site: str
channel: Channel
member_level: MemberLevel
user_tags: List[str]
currency: str
subtotal: Decimal
total_qty: int
discount_total: Decimal
shipping_fee: Decimal
tax_total: Decimal
final_total: Decimal
applied: List[StrategyApplication]
def with_updates(
self,
*,
discount_delta: Decimal = Decimal("0.00"),
shipping_delta: Decimal = Decimal("0.00"),
tax_delta: Decimal = Decimal("0.00"),
application: Optional[StrategyApplication] = None,
) -> "PricingContext":
new_discount = money(self.discount_total + discount_delta)
new_shipping = money(self.shipping_fee + shipping_delta)
new_tax = money(self.tax_total + tax_delta)
new_final = money(self.subtotal - new_discount + new_shipping + new_tax)
new_applied = list(self.applied)
if application:
new_applied.append(application)
return replace(
self,
discount_total=new_discount,
shipping_fee=new_shipping,
tax_total=new_tax,
final_total=new_final,
applied=new_applied,
)
# -----------------------------
# Strategy interface & registry
# -----------------------------
class PriceStrategy:
key: str
description: str
order: int # lower first; promotions < shipping < tax
def is_applicable(self, ctx: PricingContext) -> bool:
return True
def apply(self, ctx: PricingContext) -> PricingContext:
raise NotImplementedError
# Strategy spec from ConfigService
class StrategySpec(BaseModel):
type: str # e.g., "percentage_discount"
key: str
params: Dict[str, Any] = {}
order: int = 100 # default
StrategyFactory = Callable[[StrategySpec], PriceStrategy]
class StrategyRegistry:
def __init__(self):
self._builders: Dict[str, StrategyFactory] = {}
def register(self, type_name: str, builder: StrategyFactory):
self._builders[type_name] = builder
def build(self, spec: StrategySpec) -> PriceStrategy:
if spec.type not in self._builders:
raise KeyError(f"Unknown strategy type: {spec.type}")
strategy = self._builders[spec.type](spec)
# allow spec to override default order
strategy.order = spec.order
return strategy
# -----------------------------
# Concrete strategies
# -----------------------------
class PercentageDiscountStrategy(PriceStrategy):
def __init__(self, key: str, percent: Decimal, allowed_channels: List[Channel], allowed_levels: List[MemberLevel]):
self.key = key
self.description = f"{percent}% off"
self.percent = Decimal(percent)
self.allowed_channels = set(allowed_channels)
self.allowed_levels = set(allowed_levels)
self.order = 10
def is_applicable(self, ctx: PricingContext) -> bool:
return ctx.channel in self.allowed_channels and ctx.member_level in self.allowed_levels
def apply(self, ctx: PricingContext) -> PricingContext:
discount = money(ctx.subtotal * self.percent / Decimal("100"))
if discount <= Decimal("0.00"):
return ctx
return ctx.with_updates(
discount_delta=discount,
application=StrategyApplication(
key=self.key, description=self.description, delta=-discount, meta={"percent": str(self.percent)}
),
)
class FullReductionStrategy(PriceStrategy):
def __init__(self, key: str, rules: List[Dict[str, Any]], allowed_channels: List[Channel]):
"""
rules: [{threshold: 300, reduce: 40}, {threshold: 600, reduce: 90}]
pick the best rule for subtotal
"""
self.key = key
self.description = "Full reduction"
self.rules = sorted(rules, key=lambda r: Decimal(str(r["threshold"])))
self.allowed_channels = set(allowed_channels)
self.order = 20
def is_applicable(self, ctx: PricingContext) -> bool:
return ctx.channel in self.allowed_channels
def apply(self, ctx: PricingContext) -> PricingContext:
applicable = [r for r in self.rules if ctx.subtotal >= Decimal(str(r["threshold"]))]
if not applicable:
return ctx
best = max(applicable, key=lambda r: Decimal(str(r["reduce"])))
reduce_amt = money(Decimal(str(best["reduce"])))
return ctx.with_updates(
discount_delta=reduce_amt,
application=StrategyApplication(
key=self.key, description=f"满减: 满{best['threshold']}减{best['reduce']}", delta=-reduce_amt, meta=best
),
)
class TieredPricingStrategy(PriceStrategy):
def __init__(self, key: str, tiers: List[Dict[str, Any]], allowed_levels: List[MemberLevel]):
"""
tiers: [{min_qty: 3, percent: 5}, {min_qty: 5, percent: 10}]
applies percent discount based on total_qty
"""
self.key = key
self.description = "Tiered pricing by qty"
self.tiers = sorted(tiers, key=lambda t: int(t["min_qty"]))
self.allowed_levels = set(allowed_levels)
self.order = 15
def is_applicable(self, ctx: PricingContext) -> bool:
return ctx.member_level in self.allowed_levels
def apply(self, ctx: PricingContext) -> PricingContext:
applicable = [t for t in self.tiers if ctx.total_qty >= int(t["min_qty"])]
if not applicable:
return ctx
best = max(applicable, key=lambda t: Decimal(str(t["percent"])))
percent = Decimal(str(best["percent"]))
discount = money(ctx.subtotal * percent / Decimal("100"))
if discount <= Decimal("0.00"):
return ctx
return ctx.with_updates(
discount_delta=discount,
application=StrategyApplication(
key=self.key, description=f"阶梯价: 满{best['min_qty']}件打{percent}%", delta=-discount, meta=best
),
)
class FreeShippingStrategy(PriceStrategy):
def __init__(self, key: str, threshold: Optional[Decimal], allowed_channels: List[Channel], allowed_levels: List[MemberLevel]):
self.key = key
self.description = "Free shipping"
self.threshold = Decimal(str(threshold)) if threshold is not None else None
self.allowed_channels = set(allowed_channels)
self.allowed_levels = set(allowed_levels)
self.order = 30 # after promotions but before tax
def is_applicable(self, ctx: PricingContext) -> bool:
if ctx.channel not in self.allowed_channels or ctx.member_level not in self.allowed_levels:
return False
if self.threshold is None:
return True
# Apply free shipping if subtotal after discounts >= threshold
subtotal_after_discount = money(ctx.subtotal - ctx.discount_total)
return subtotal_after_discount >= self.threshold
def apply(self, ctx: PricingContext) -> PricingContext:
if ctx.shipping_fee <= Decimal("0.00"):
return ctx
waive = ctx.shipping_fee
return ctx.with_updates(
shipping_delta=-waive,
application=StrategyApplication(
key=self.key, description="包邮", delta=-waive, meta={"threshold": str(self.threshold) if self.threshold else None}
),
)
class VATTaxStrategy(PriceStrategy):
def __init__(self, key: str, vat_percent: Decimal):
self.key = key
self.description = "VAT tax"
self.vat_percent = Decimal(vat_percent)
self.order = 40 # taxes last
def is_applicable(self, ctx: PricingContext) -> bool:
return True
def apply(self, ctx: PricingContext) -> PricingContext:
taxable_base = money(ctx.subtotal - ctx.discount_total + ctx.shipping_fee)
tax = money(taxable_base * self.vat_percent / Decimal("100"))
if tax <= Decimal("0.00"):
return ctx
return ctx.with_updates(
tax_delta=tax,
application=StrategyApplication(
key=self.key, description=f"VAT {self.vat_percent}%", delta=tax, meta={"base": str(taxable_base)}
),
)
# -----------------------------
# Config service (simulated)
# -----------------------------
class ConfigService:
"""
Simulates strategy specs from config center.
In real system, this could read from a KV store / feature flag service.
"""
def get_strategy_specs(self, site: str, channel: Channel, member_level: MemberLevel, user_tags: List[str], ab_bucket: Optional[str]) -> List[StrategySpec]:
# Simplified rules for demo; would be data-driven in production
specs: List[StrategySpec] = []
# Example A/B buckets
bucket = ab_bucket or "A"
# Percentage discount: mobile + gold users get 10% in bucket A, 12% in bucket B
percent = "12" if bucket == "B" else "10"
specs.append(StrategySpec(
type="percentage_discount",
key=f"percent-{percent}-mobile-gold",
params={"percent": percent, "allowed_channels": ["mobile"], "allowed_levels": ["gold"]},
order=10,
))
# Tiered pricing for silver/gold
specs.append(StrategySpec(
type="tiered_pricing",
key="tier-qty",
params={"tiers": [{"min_qty": 3, "percent": 5}, {"min_qty": 5, "percent": 10}], "allowed_levels": ["silver", "gold"]},
order=15,
))
# Full reduction on web + mobile
specs.append(StrategySpec(
type="full_reduction",
key="full-300-40",
params={"rules": [{"threshold": 300, "reduce": 40}, {"threshold": 600, "reduce": 90}], "allowed_channels": ["web", "mobile"]},
order=20,
))
# Free shipping over 200 for web/mobile, gold only
specs.append(StrategySpec(
type="free_shipping",
key="free-ship-200",
params={"threshold": "200", "allowed_channels": ["web", "mobile"], "allowed_levels": ["gold"]},
order=30,
))
# VAT tax 7%
specs.append(StrategySpec(
type="vat_tax",
key="vat-7",
params={"vat_percent": "7"},
order=40,
))
return specs
# -----------------------------
# Registry DI setup
# -----------------------------
def create_registry() -> StrategyRegistry:
reg = StrategyRegistry()
reg.register("percentage_discount", lambda spec: PercentageDiscountStrategy(
key=spec.key,
percent=Decimal(str(spec.params["percent"])),
allowed_channels=[Channel(c) for c in spec.params.get("allowed_channels", [])],
allowed_levels=[MemberLevel(l) for l in spec.params.get("allowed_levels", [])],
))
reg.register("full_reduction", lambda spec: FullReductionStrategy(
key=spec.key,
rules=spec.params["rules"],
allowed_channels=[Channel(c) for c in spec.params.get("allowed_channels", [])],
))
reg.register("tiered_pricing", lambda spec: TieredPricingStrategy(
key=spec.key,
tiers=spec.params["tiers"],
allowed_levels=[MemberLevel(l) for l in spec.params.get("allowed_levels", [])],
))
reg.register("free_shipping", lambda spec: FreeShippingStrategy(
key=spec.key,
threshold=Decimal(str(spec.params.get("threshold"))) if "threshold" in spec.params else None,
allowed_channels=[Channel(c) for c in spec.params.get("allowed_channels", [])],
allowed_levels=[MemberLevel(l) for l in spec.params.get("allowed_levels", [])],
))
reg.register("vat_tax", lambda spec: VATTaxStrategy(
key=spec.key,
vat_percent=Decimal(str(spec.params["vat_percent"])),
))
return reg
# DI providers for FastAPI
def get_config_service() -> ConfigService:
return ConfigService()
def get_strategy_registry() -> StrategyRegistry:
return create_registry()
# -----------------------------
# Pipeline executor (test-friendly)
# -----------------------------
def execute_pricing_pipeline(
ctx: PricingContext,
specs: List[StrategySpec],
registry: StrategyRegistry,
) -> PricingContext:
strategies: List[PriceStrategy] = []
for spec in specs:
st = registry.build(spec)
strategies.append(st)
# sort by order; if same order, keep config order
strategies.sort(key=lambda s: s.order)
current = ctx
for s in strategies:
if s.is_applicable(current):
current = s.apply(current)
return current
# -----------------------------
# FastAPI app & handler
# -----------------------------
app = FastAPI(title="Checkout Pricing Service")
@app.post("/checkout/price", response_model=CheckoutResponse)
def checkout_price(
req: CheckoutRequest,
cfg: ConfigService = Depends(get_config_service),
reg: StrategyRegistry = Depends(get_strategy_registry),
):
subtotal = sum((Decimal(item.unit_price) * item.qty for item in req.items), start=Decimal("0.00"))
subtotal = money(subtotal)
total_qty = sum(item.qty for item in req.items)
initial = PricingContext(
site=req.site,
channel=req.channel,
member_level=req.member_level,
user_tags=req.user_tags,
currency=req.currency,
subtotal=subtotal,
total_qty=total_qty,
discount_total=Decimal("0.00"),
shipping_fee=money(Decimal(req.base_shipping_fee)),
tax_total=Decimal("0.00"),
final_total=money(subtotal + Decimal(req.base_shipping_fee)),
applied=[],
)
try:
specs = cfg.get_strategy_specs(req.site, req.channel, req.member_level, req.user_tags, req.ab_bucket)
final_ctx = execute_pricing_pipeline(initial, specs, reg)
except KeyError as e:
raise HTTPException(status_code=400, detail=str(e))
return CheckoutResponse(
currency=final_ctx.currency,
subtotal=money(final_ctx.subtotal),
discount_total=money(final_ctx.discount_total),
shipping_fee=money(final_ctx.shipping_fee),
tax_total=money(final_ctx.tax_total),
final_total=money(final_ctx.final_total),
applied=final_ctx.applied,
)
# -----------------------------
# Unit test hints (not executed here)
# -----------------------------
"""
Example test for strategies:
def test_full_reduction():
ctx = PricingContext(
site="cn", channel=Channel.web, member_level=MemberLevel.standard, user_tags=[],
currency="CNY", subtotal=Decimal("650.00"), total_qty=2,
discount_total=Decimal("0.00"), shipping_fee=Decimal("20.00"),
tax_total=Decimal("0.00"), final_total=Decimal("670.00"), applied=[]
)
spec = StrategySpec(
type="full_reduction", key="full-300-40",
params={"rules":[{"threshold":300,"reduce":40},{"threshold":600,"reduce":90}], "allowed_channels":["web"]},
order=20
)
reg = create_registry()
st = reg.build(spec)
assert st.is_applicable(ctx)
new_ctx = st.apply(ctx)
assert new_ctx.discount_total == Decimal("90.00")
assert any(a.key=="full-300-40" for a in new_ctx.applied)
# Integration test:
def test_pipeline_order():
reg = create_registry()
cfg = ConfigService()
# Build context and pipeline as in endpoint, assert final_total and applied order.
"""
# -----------------------------
# Strategy extension template
# -----------------------------
class NewPriceStrategyTemplate(PriceStrategy):
"""
Template for introducing a new strategy.
1) Define key, description, order.
2) Implement is_applicable(ctx) with channel/level/tags/geo rules if needed.
3) Implement apply(ctx) returning a NEW context via ctx.with_updates(...)
4) Register builder in StrategyRegistry.
"""
def __init__(self, key: str, params: Dict[str, Any]):
self.key = key
self.description = "New Strategy"
self.order = params.get("order", 25)
self.params = params
def is_applicable(self, ctx: PricingContext) -> bool:
# Example: only on partner channel and silver+
allowed_channels = set([Channel.partner])
return ctx.channel in allowed_channels and ctx.member_level in {MemberLevel.silver, MemberLevel.gold}
def apply(self, ctx: PricingContext) -> PricingContext:
# Example: fixed discount of 15
discount = money(Decimal("15.00"))
return ctx.with_updates(
discount_delta=discount,
application=StrategyApplication(key=self.key, description=self.description, delta=-discount, meta=self.params),
)
# To enable the template:
# reg.register("new_strategy_type", lambda spec: NewPriceStrategyTemplate(key=spec.key, params=spec.params))
设计要点与落地建议:
下面从作用、结构、常见场景、优缺点四个方面讲解观察者模式,并给出一个原生 JavaScript 的前端组件示例,展示轻量状态中心的 subscribe/notify 机制,含取消订阅、防抖更新、一次性订阅与内存泄漏防护,以及简单的对接和测试用例。
一、作用
二、结构(关键组件与角色)
三、常见使用场景
四、优缺点
五、代码示例(原生 JavaScript,Web Components,含取消订阅、防抖、一次性订阅、内存泄漏防护) 功能目标:在前端仪表盘中,多个小部件订阅主题(theme)、汇率(fxRate)、告警等级(alertLevel)、语言(locale)变化,通过观察者模式的轻量状态中心实现解耦更新。
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
<title>Observer Pattern Demo</title>
<style>
body { font-family: system-ui, Arial, sans-serif; }
.panel { border: 1px solid #ccc; padding: 8px; margin: 8px; border-radius: 6px; }
.dark { background: #222; color: #eee; }
.light { background: #fff; color: #222; }
</style>
</head>
<body>
<theme-widget class="panel"></theme-widget>
<fx-widget class="panel"></fx-widget>
<alert-widget class="panel"></alert-widget>
<script>
// ---------- 工具:防抖 ----------
function debounce(fn, wait) {
let t = null, lastArgs = null;
const wrapped = function(...args) {
lastArgs = args;
if (t) clearTimeout(t);
t = setTimeout(() => {
t = null;
try { fn(...lastArgs); } catch (e) { console.error('[debounced handler error]', e); }
}, wait);
};
wrapped.cancel = () => { if (t) { clearTimeout(t); t = null; } };
return wrapped;
}
// ---------- 观察者核心:发布/订阅中心 ----------
class ObserverCenter {
constructor() {
this._subs = new Map(); // topic => Set<entry>
}
subscribe(topic, handler, options = {}) {
const { once = false, debounceMs = 0, signal } = options;
let fn = handler;
if (debounceMs > 0) fn = debounce(handler, debounceMs);
const entry = { handler, fn, once, signal };
let set = this._subs.get(topic);
if (!set) { set = new Set(); this._subs.set(topic, set); }
set.add(entry);
const unsubscribe = () => {
const s = this._subs.get(topic);
if (!s || !s.has(entry)) return;
s.delete(entry);
if (entry.fn && entry.fn.cancel) entry.fn.cancel(); // 清理定时器,避免内存泄漏
};
// 生命周期绑定:AbortSignal 触发自动取消订阅(内存泄漏防护)
if (signal) {
if (signal.aborted) unsubscribe();
else signal.addEventListener('abort', unsubscribe, { once: true });
}
return unsubscribe;
}
notify(topic, payload) {
const set = this._subs.get(topic);
if (!set || set.size === 0) return;
// 复制集合避免回调内修改集合带来的遍历问题
for (const entry of Array.from(set)) {
try { entry.fn(payload); } catch (e) { console.error('[notify handler error]', e); }
if (entry.once) {
set.delete(entry);
if (entry.fn && entry.fn.cancel) entry.fn.cancel();
}
}
}
subscriberCount(topic) {
const set = this._subs.get(topic);
return set ? set.size : 0;
}
}
// ---------- 业务状态中心 ----------
const Topics = {
theme: 'theme',
fxRate: 'fxRate',
alertLevel: 'alertLevel',
locale: 'locale',
};
class DashboardStore extends ObserverCenter {
constructor() {
super();
this.state = {
theme: 'light',
fxRate: 7.20,
alertLevel: 'info',
locale: 'en-US',
};
}
setTheme(theme) { this.state.theme = theme; this.notify(Topics.theme, theme); }
pushFxRate(rate) { this.state.fxRate = rate; this.notify(Topics.fxRate, rate); }
setAlertLevel(level) { this.state.alertLevel = level; this.notify(Topics.alertLevel, level); }
setLocale(locale) { this.state.locale = locale; this.notify(Topics.locale, locale); }
getState() { return { ...this.state }; }
}
const store = new DashboardStore();
// ---------- Web Components ----------
class ThemeWidget extends HTMLElement {
constructor() {
super();
this.attachShadow({ mode: 'open' });
this._abort = new AbortController(); // 生命周期绑定
}
connectedCallback() {
const s = store.getState();
this.render(s.theme, s.locale);
// 订阅主题变化
store.subscribe(Topics.theme, (theme) => {
this.applyTheme(theme);
this.shadowRoot.getElementById('theme-text').textContent = `Theme: ${theme}`;
}, { signal: this._abort.signal });
// 一次性订阅:仅首次语言变化时,更新欢迎文案
store.subscribe(Topics.locale, (locale) => {
this.shadowRoot.getElementById('welcome').textContent = this.getWelcome(locale);
}, { once: true, signal: this._abort.signal });
// 手动取消订阅示例(非必须):在2秒后不再关注主题(演示取消机制)
this._manualUnsub = store.subscribe(Topics.theme, (theme) => {
console.log('[ThemeWidget second handler]', theme);
}, { signal: this._abort.signal });
setTimeout(() => {
this._manualUnsub(); // 取消订阅
console.log('[ThemeWidget] manually unsubscribed second handler');
}, 2000);
}
disconnectedCallback() {
// 自动清理所有绑定了 this._abort.signal 的订阅
this._abort.abort();
}
applyTheme(theme) {
const host = this.shadowRoot.host;
host.classList.remove('light', 'dark');
host.classList.add(theme);
}
getWelcome(locale) {
return locale === 'zh-CN' ? '欢迎使用仪表盘' : 'Welcome to the dashboard';
}
render(theme = 'light', locale = 'en-US') {
this.shadowRoot.innerHTML = `
<div>
<div id="welcome">${this.getWelcome(locale)}</div>
<div id="theme-text">Theme: ${theme}</div>
<button id="toggle">Toggle Theme</button>
</div>
`;
this.applyTheme(theme);
this.shadowRoot.getElementById('toggle').onclick = () => {
const next = store.getState().theme === 'light' ? 'dark' : 'light';
store.setTheme(next);
};
}
}
customElements.define('theme-widget', ThemeWidget);
class FxWidget extends HTMLElement {
constructor() {
super();
this.attachShadow({ mode: 'open' });
this._abort = new AbortController();
}
connectedCallback() {
const s = store.getState();
this.shadowRoot.innerHTML = `
<div>
<div>FX Rate: <span id="rate">${s.fxRate.toFixed(2)}</span></div>
<button id="burst">Burst Updates</button>
</div>
`;
// 高频汇率更新:使用防抖减少 DOM 更新频率
store.subscribe(Topics.fxRate, (rate) => {
this.shadowRoot.getElementById('rate').textContent = rate.toFixed(2);
}, { debounceMs: 150, signal: this._abort.signal });
// 一次性订阅:仅首次汇率推送时校准一次
store.subscribe(Topics.fxRate, (rate) => {
console.log('[FxWidget] first tick calibrated:', rate);
}, { once: true, signal: this._abort.signal });
this.shadowRoot.getElementById('burst').onclick = () => {
// 模拟快速推送:仅触发少量防抖后的 DOM 更新
const base = store.getState().fxRate;
for (let i = 1; i <= 10; i++) {
setTimeout(() => store.pushFxRate(base + i * 0.01), i * 20);
}
};
}
disconnectedCallback() { this._abort.abort(); }
}
customElements.define('fx-widget', FxWidget);
class AlertWidget extends HTMLElement {
constructor() {
super();
this.attachShadow({ mode: 'open' });
this._abort = new AbortController();
}
connectedCallback() {
const s = store.getState();
this.shadowRoot.innerHTML = `
<div>
<div>Alert: <span id="level">${s.alertLevel}</span></div>
<button id="warn">Warn</button>
<button id="crit">Critical</button>
</div>
`;
store.subscribe(Topics.alertLevel, (level) => {
const el = this.shadowRoot.getElementById('level');
el.textContent = level;
el.style.color = level === 'critical' ? 'red' : (level === 'warning' ? 'orange' : '');
}, { signal: this._abort.signal });
// 一次性订阅:首次出现 critical 级别时提示一次
store.subscribe(Topics.alertLevel, (level) => {
console.log('[AlertWidget] first alert event:', level);
}, { once: true, signal: this._abort.signal });
this.shadowRoot.getElementById('warn').onclick = () => store.setAlertLevel('warning');
this.shadowRoot.getElementById('crit').onclick = () => store.setAlertLevel('critical');
}
disconnectedCallback() { this._abort.abort(); }
}
customElements.define('alert-widget', AlertWidget);
// ---------- 演示:主题/语言初始推送 ----------
setTimeout(() => store.setLocale('zh-CN'), 500); // 仅 ThemeWidget 第一次语言订阅会生效
setTimeout(() => store.setTheme('dark'), 800);
setTimeout(() => store.setAlertLevel('info'), 1000);
// ---------- 简单测试用例(控制台断言/检查) ----------
(function tests() {
console.log('--- Tests start ---');
// 1) 订阅数量检查
console.log('theme subscribers:', store.subscriberCount(Topics.theme)); // >= 1
// 2) 一次性订阅在首次触发后应被移除
const onceCheck = [];
const unsub = store.subscribe(Topics.theme, (t) => onceCheck.push(t), { once: true });
store.setTheme('light'); // 触发一次
store.setTheme('dark'); // 不应再记录
console.log('onceCheck:', onceCheck); // 期望 ['light']
// 3) 防抖:多次快速推送仅触发少量更新
let count = 0;
const unsubFx = store.subscribe(Topics.fxRate, () => count++, { debounceMs: 100 });
for (let i = 0; i < 10; i++) store.pushFxRate(7.5 + i * 0.01);
setTimeout(() => {
unsubFx();
console.log('debounced updates count ~', count, '(should be << 10)');
}, 500);
// 4) 内存泄漏防护:绑定 AbortSignal 的订阅在组件卸载后自动清理
const temp = new AbortController();
store.subscribe(Topics.theme, () => {}, { signal: temp.signal });
console.log('temp theme subscribers before abort:', store.subscriberCount(Topics.theme));
temp.abort();
console.log('temp theme subscribers after abort:', store.subscriberCount(Topics.theme));
console.log('--- Tests end ---');
})();
</script>
</body>
</html>
说明与要点
扩展建议
下面以资深架构师视角系统讲解 C# 中的装饰器模式,并给出一个面向服务端的完整示例(订单查询微服务),演示如何用装饰器为仓储接口分层增加 Logging、Caching、Retry 三个能力,避免污染核心查询逻辑,同时展示 IoC 注册、装饰器链顺序控制、缓存键规范与滑动过期策略,以及在 ASP.NET Minimal API 中的接入与单元测试验证。
一、概念与作用
二、结构(关键组件与角色)
三、常见使用场景
四、优缺点
五、示例:订单查询微服务(ASP.NET Minimal API) 目标:
NuGet 依赖(关键包)
Program.cs(一个文件即可运行;为简洁演示,核心仓储用内存数据并模拟瞬时故障与延迟)
using System.Collections.Concurrent;
using System.Diagnostics;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Scrutor;
// ---------- Domain ----------
public record Order(string Id, string CustomerId, decimal Amount);
// ---------- Abstraction ----------
public interface IOrderRepository
{
Task<Order?> GetByIdAsync(string id, CancellationToken ct = default);
}
// ---------- Core Implementation (ConcreteComponent) ----------
public class OrderRepository : IOrderRepository
{
private readonly ConcurrentDictionary<string, Order> _store = new();
private readonly ConcurrentDictionary<string, int> _attemptsPerId = new();
private readonly ILogger<OrderRepository> _logger;
public int InvocationCount { get; private set; }
public OrderRepository(ILogger<OrderRepository> logger)
{
_logger = logger;
// seed data
_store.TryAdd("o-001", new Order("o-001", "c-100", 199.99m));
_store.TryAdd("o-002", new Order("o-002", "c-101", 99.5m));
}
public async Task<Order?> GetByIdAsync(string id, CancellationToken ct = default)
{
InvocationCount++;
// 模拟下游延迟
await Task.Delay(TimeSpan.FromMilliseconds(80), ct);
// 为演示 Retry,一次性制造“首次查询该 id 必失败”的瞬时错误
int attempt = _attemptsPerId.AddOrUpdate(id, 1, (_, current) => current + 1);
if (attempt == 1)
{
_logger.LogWarning("Simulated transient failure for id={Id} on first attempt.", id);
throw new TimeoutException("Transient timeout");
}
_store.TryGetValue(id, out var order);
return order;
}
}
// ---------- Options ----------
public class OrderCacheOptions
{
public TimeSpan SlidingExpiration { get; set; } = TimeSpan.FromMinutes(5);
// 可扩展:Size、NegativeCaching 等
}
public class RetryOptions
{
public int MaxAttempts { get; set; } = 3;
public TimeSpan BaseDelay { get; set; } = TimeSpan.FromMilliseconds(100);
public double BackoffMultiplier { get; set; } = 2.0; // 指数退避系数
public int JitterMs { get; set; } = 50; // 抖动范围(毫秒)
}
// ---------- Cache Key Spec ----------
public static class CacheKeys
{
// 接口名:方法:参数 进行规范化,避免冲突
public static string OrderById(string id) => $"IOrderRepository:GetById:{id}";
}
// ---------- Decorators ----------
// 1) Retry Decorator(最内层,确保只对真正下游调用进行重试)
public class RetryOrderRepository : IOrderRepository
{
private readonly IOrderRepository _inner;
private readonly ILogger<RetryOrderRepository> _logger;
private readonly RetryOptions _options;
public RetryOrderRepository(IOrderRepository inner,
ILogger<RetryOrderRepository> logger,
IOptions<RetryOptions> options)
{
_inner = inner;
_logger = logger;
_options = options.Value;
}
public async Task<Order?> GetByIdAsync(string id, CancellationToken ct = default)
{
int attempt = 0;
Exception? last = null;
TimeSpan delay = _options.BaseDelay;
while (attempt < _options.MaxAttempts)
{
attempt++;
try
{
_logger.LogDebug("Retry decorator attempt {Attempt} for id={Id}", attempt, id);
return await _inner.GetByIdAsync(id, ct);
}
catch (Exception ex) when (IsTransient(ex))
{
last = ex;
if (attempt >= _options.MaxAttempts) break;
var jitter = TimeSpan.FromMilliseconds(Random.Shared.Next(0, _options.JitterMs));
var wait = delay + jitter;
_logger.LogWarning(ex, "Transient error on attempt {Attempt} for id={Id}. Backing off {Delay} ms...", attempt, id, wait.TotalMilliseconds);
await Task.Delay(wait, ct);
delay = TimeSpan.FromMilliseconds(delay.TotalMilliseconds * _options.BackoffMultiplier);
}
}
_logger.LogError(last, "GetById({Id}) failed after {MaxAttempts} attempts.", id, _options.MaxAttempts);
throw new Exception($"GetById({id}) failed after {_options.MaxAttempts} attempts.", last);
}
private static bool IsTransient(Exception ex) =>
ex is TimeoutException || ex is HttpRequestException;
}
// 2) Caching Decorator(中层,命中缓存则不触发重试与内层调用)
public class CachingOrderRepository : IOrderRepository
{
private readonly IOrderRepository _inner;
private readonly IMemoryCache _cache;
private readonly OrderCacheOptions _options;
private readonly ILogger<CachingOrderRepository> _logger;
public CachingOrderRepository(IOrderRepository inner,
IMemoryCache cache,
IOptions<OrderCacheOptions> options,
ILogger<CachingOrderRepository> logger)
{
_inner = inner;
_cache = cache;
_options = options.Value;
_logger = logger;
}
public async Task<Order?> GetByIdAsync(string id, CancellationToken ct = default)
{
var key = CacheKeys.OrderById(id);
if (_cache.TryGetValue<Order>(key, out var cached))
{
_logger.LogInformation("Cache hit: {Key}", key);
return cached;
}
var result = await _inner.GetByIdAsync(id, ct);
if (result != null)
{
// 滑动过期:每次访问会刷新过期时间
_cache.Set(key, result, new MemoryCacheEntryOptions
{
SlidingExpiration = _options.SlidingExpiration
});
_logger.LogInformation("Cache miss: {Key}. Added with sliding expiration={ExpirationSeconds}s",
key, _options.SlidingExpiration.TotalSeconds);
}
return result;
}
}
// 3) Logging Decorator(最外层,统一记录调用前后与耗时)
public class LoggingOrderRepository : IOrderRepository
{
private readonly IOrderRepository _inner;
private readonly ILogger<LoggingOrderRepository> _logger;
public LoggingOrderRepository(IOrderRepository inner, ILogger<LoggingOrderRepository> logger)
{
_inner = inner;
_logger = logger;
}
public async Task<Order?> GetByIdAsync(string id, CancellationToken ct = default)
{
var sw = Stopwatch.StartNew();
_logger.LogInformation("-> GetById({Id})", id);
try
{
var result = await _inner.GetByIdAsync(id, ct);
_logger.LogInformation("<- GetById({Id}) took {Elapsed} ms, found={Found}",
id, sw.ElapsedMilliseconds, result != null);
return result;
}
catch (Exception ex)
{
_logger.LogError(ex, "<x GetById({Id}) failed after {Elapsed} ms", id, sw.ElapsedMilliseconds);
throw;
}
}
}
// ---------- ASP.NET Minimal API ----------
var builder = WebApplication.CreateBuilder(args);
// Logging
builder.Logging.ClearProviders();
builder.Logging.AddConsole();
// Memory cache
builder.Services.AddMemoryCache();
// Options
builder.Services.Configure<OrderCacheOptions>(o => o.SlidingExpiration = TimeSpan.FromSeconds(30)); // demo用短些
builder.Services.Configure<RetryOptions>(o =>
{
o.MaxAttempts = 3;
o.BaseDelay = TimeSpan.FromMilliseconds(100);
o.BackoffMultiplier = 2.0;
o.JitterMs = 50;
});
// 注册核心仓储为可供测试访问的单例,同时将 IOrderRepository 初始绑定到它
builder.Services.AddSingleton<OrderRepository>();
builder.Services.AddTransient<IOrderRepository>(sp => sp.GetRequiredService<OrderRepository>());
// 使用 Scrutor 控制装饰器链顺序:Core -> Retry -> Caching -> Logging(outermost)
builder.Services.Decorate<IOrderRepository, RetryOrderRepository>();
builder.Services.Decorate<IOrderRepository, CachingOrderRepository>();
builder.Services.Decorate<IOrderRepository, LoggingOrderRepository>();
var app = builder.Build();
// Minimal API
app.MapGet("/orders/{id}", async (string id, IOrderRepository repo, CancellationToken ct) =>
{
var order = await repo.GetByIdAsync(id, ct);
return order is null ? Results.NotFound() : Results.Ok(order);
});
app.MapGet("/debug/invocations", (OrderRepository core) => new { core.InvocationCount });
app.Run();
装饰器链顺序说明
六、调用与日志(示例)
七、单元测试(xUnit 示例) 下面的测试构建同样的 DI 容器,验证调用次序(通过核心仓储的 InvocationCount)与性能收益(第二次调用更快)。实际项目中也可以用 InMemoryLoggerProvider 捕获日志顺序关键字进行断言。
using System.Diagnostics;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Scrutor;
using Xunit;
public class OrderRepositoryDecoratorTests
{
private ServiceProvider BuildProvider()
{
var services = new ServiceCollection();
services.AddLogging(b => b.AddDebug().AddConsole());
services.AddMemoryCache();
services.Configure<OrderCacheOptions>(o => o.SlidingExpiration = TimeSpan.FromSeconds(30));
services.Configure<RetryOptions>(o =>
{
o.MaxAttempts = 3;
o.BaseDelay = TimeSpan.FromMilliseconds(20); // 测试更快
o.BackoffMultiplier = 2.0;
o.JitterMs = 10;
});
services.AddSingleton<OrderRepository>();
services.AddTransient<IOrderRepository>(sp => sp.GetRequiredService<OrderRepository>());
services.Decorate<IOrderRepository, RetryOrderRepository>();
services.Decorate<IOrderRepository, CachingOrderRepository>();
services.Decorate<IOrderRepository, LoggingOrderRepository>();
return services.BuildServiceProvider();
}
[Fact]
public async Task Second_call_should_hit_cache_and_reduce_latency()
{
var sp = BuildProvider();
var repo = sp.GetRequiredService<IOrderRepository>();
var core = sp.GetRequiredService<OrderRepository>();
var sw1 = Stopwatch.StartNew();
var o1 = await repo.GetByIdAsync("o-001");
sw1.Stop();
var sw2 = Stopwatch.StartNew();
var o2 = await repo.GetByIdAsync("o-001");
sw2.Stop();
Assert.NotNull(o1);
Assert.NotNull(o2);
Assert.Equal(o1, o2);
// 首次会重试(Core 至少调用一次),第二次命中缓存不再调用 Core
Assert.Equal(1, core.InvocationCount);
// 第二次调用应显著更快(缓存返回)
Assert.True(sw2.ElapsedMilliseconds < sw1.ElapsedMilliseconds);
}
}
八、设计要点与实践建议
通过以上装饰器架构,你可以在不污染核心仓储代码的前提下,灵活地为查询能力叠加日志、缓存与重试,并在生产系统中以配置驱动各策略参数,保证可维护性与可观测性。
帮助用户深入理解并掌握指定设计模式的用途、结构及实现方法,通过清晰的讲解和实例代码提升技术技能,从而解决实际开发中的架构问题,并在项目实施中充分运用到设计模式的核心思想。
从事系统开发的程序员,使用提示词快速掌握设计模式理论,学习代码实现,并将其用于项目开发。
想提升团队架构能力的技术领袖,利用本提示词快速生成教学材料并推广设计模式知识,使团队开发更高效。
从事技术教程、博客、书籍撰写的内容创作者,通过提示词快速建立高质量架构文章模板,提高创作效率。
将模板生成的提示词复制粘贴到您常用的 Chat 应用(如 ChatGPT、Claude 等),即可直接对话使用,无需额外开发。适合个人快速体验和轻量使用场景。
把提示词模板转化为 API,您的程序可任意修改模板参数,通过接口直接调用,轻松实现自动化与批量处理。适合开发者集成与业务系统嵌入。
在 MCP client 中配置对应的 server 地址,让您的 AI 应用自动调用提示词模板。适合高级用户和团队协作,让提示词在不同 AI 工具间无缝衔接。
免费获取高级提示词-优惠即将到期