Popular roles are more than a source of inspiration; they are your productivity assistant. With carefully curated role prompts you can quickly generate high-quality content, spark new ideas, and find the solution that best fits your needs, making creation easier and the value more immediate.
We keep updating the role library based on different user needs, so you can always find a suitable entry point for inspiration.
This prompt is designed for Python developers and provides professional guidance on decorator development. Based on a clear analysis of functional requirements and application scenarios, it generates well-structured, clearly commented decorator code together with detailed usage examples and caveats. It effectively addresses common development needs such as extending function behavior, performance monitoring, and permission checks, improving code maintainability and reusability. It suits a wide range of Python scenarios, including web development, data processing, and system tooling.
A reusable performance-monitoring decorator that supports both synchronous and asynchronous functions, providing the following capabilities:
Typical scenarios:
Parameter description (decorator parameters):
Additional capabilities:
from __future__ import annotations
import asyncio
import functools
import inspect
import json
import logging
import os
import random
import threading
import time
from contextlib import contextmanager
from contextvars import ContextVar
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Dict, Mapping, Optional, Tuple, Union
# -------------------------
# 上下文标签(请求级/任务级)
# -------------------------
_CTX_TAGS: ContextVar[Dict[str, Any]] = ContextVar("_CTX_TAGS", default={})
@contextmanager
def monitor_bind(tags: Mapping[str, Any]):
"""
绑定临时上下文标签,适合在 Web 中间件或任务入口使用。
示例:
with monitor_bind({"trace_id": "abc", "user_id": 42}):
handler()
"""
if not isinstance(tags, Mapping):
raise TypeError("monitor_bind(tags): tags must be a mapping")
current = _CTX_TAGS.get()
merged = {**current, **dict(tags)}
token = _CTX_TAGS.set(merged)
try:
yield
finally:
_CTX_TAGS.reset(token)
def _current_ctx_tags() -> Dict[str, Any]:
# 返回当前上下文标签的浅拷贝,避免外部修改
return dict(_CTX_TAGS.get() or {})
# -------------------------
# 统计器
# -------------------------
@dataclass
class _Stats:
total_calls: int = 0
total_exceptions: int = 0
total_time_ns: int = 0
last_duration_ns: int = 0
    # per-instance lock; default_factory avoids sharing one lock across all _Stats instances
    _lock: threading.Lock = field(default_factory=threading.Lock, repr=False)
def update(self, duration_ns: int, errored: bool) -> None:
with self._lock:
self.total_calls += 1
if errored:
self.total_exceptions += 1
self.total_time_ns += duration_ns
self.last_duration_ns = duration_ns
def snapshot(self) -> Dict[str, Any]:
with self._lock:
avg_ns = (self.total_time_ns / self.total_calls) if self.total_calls else 0.0
return {
"total_calls": self.total_calls,
"total_exceptions": self.total_exceptions,
"total_time_ns": self.total_time_ns,
"last_duration_ns": self.last_duration_ns,
"avg_ms": avg_ns / 1_000_000.0,
}
def reset(self) -> None:
with self._lock:
self.total_calls = 0
self.total_exceptions = 0
self.total_time_ns = 0
self.last_duration_ns = 0
# -------------------------
# 工具函数
# -------------------------
def _now_iso() -> str:
# 使用 time.time() + UTC ISO 格式,避免引入 datetime 依赖成本
ts = time.time()
# 保留毫秒,统一 ISO-like 字符串
return time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(ts)) + f".{int((ts % 1) * 1000):03d}Z"
def _parse_log_level(level_str: str) -> int:
level_str = (level_str or "INFO").upper()
if level_str not in {"INFO", "DEBUG"}:
# 限制在 INFO/DEBUG,其他值回退 INFO
level_str = "INFO"
return logging.DEBUG if level_str == "DEBUG" else logging.INFO
def _default_emit(record: Dict[str, Any], level: int) -> None:
logger = logging.getLogger("perf.monitor")
logger.log(level, json.dumps(record, ensure_ascii=False, separators=(",", ":")))
def _coerce_tags(tags: Optional[Mapping[str, Any]]) -> Dict[str, Any]:
if tags is None:
return {}
if not isinstance(tags, Mapping):
raise TypeError("tags must be a mapping")
# 规范化 key 为 str,避免不可序列化/冲突
return {str(k): v for k, v in dict(tags).items()}
def _should_sample(rate: float) -> bool:
if rate <= 0:
return False
if rate >= 1:
return True
return random.random() < rate
def _get_func_location(fn: Callable[..., Any]) -> Tuple[str, Optional[str], Optional[int]]:
module = getattr(fn, "__module__", "<unknown>")
file = None
line = None
try:
src_file = inspect.getsourcefile(fn) or inspect.getfile(fn)
file = src_file
try:
_, line = inspect.getsourcelines(fn)
except OSError:
line = None
except OSError:
pass
return module, file, line
async def _maybe_call_async(fn: Callable[[Dict[str, Any]], Any], record: Dict[str, Any]) -> None:
"""
在异步上下文中调用 emit/alert:
- 如果是 async 函数,await 它
- 如果是普通函数,直接调用
"""
if fn is None:
return
try:
result = fn(record)
if inspect.isawaitable(result):
await result # type: ignore[func-returns-value]
except Exception: # 不让外部回调影响主逻辑
logging.getLogger("perf.monitor").exception("monitor callback failed")
def _maybe_call_sync(fn: Callable[[Dict[str, Any]], Any], record: Dict[str, Any]) -> None:
"""
在同步上下文中调用 emit/alert:
- 如果是 async 函数,尝试获取运行中的事件循环创建任务,否则使用 asyncio.run
- 如果是普通函数,直接调用
"""
if fn is None:
return
try:
result = fn(record)
if inspect.isawaitable(result):
try:
loop = asyncio.get_running_loop()
# 在已有事件循环中调度(例如在某些框架里)
loop.create_task(result) # fire-and-forget
except RuntimeError:
# 无事件循环,直接运行
asyncio.run(result)
except Exception:
logging.getLogger("perf.monitor").exception("monitor callback failed")
# -------------------------
# 装饰器主体
# -------------------------
def monitor(
*,
threshold_ms: int = 200,
log_level: str = "INFO",
emit: Optional[Callable[[Dict[str, Any]], Union[None, Awaitable[Any]]]] = None,
alert: Optional[Callable[[Dict[str, Any]], Union[None, Awaitable[Any]]]] = None,
sample_rate: float = 0.3,
tags: Optional[Mapping[str, Any]] = None,
env: Optional[str] = None,
):
"""
性能监控装饰器(支持同步/异步函数)。
统计调用次数与异常、记录结构化日志、慢调用触发告警。
注意:
- 日志输出遵循采样率(sample_rate)
- 告警在超阈值时总是触发(不受采样影响)
- 异常会记录为 ERROR 级别日志(不受采样影响)
返回:包装后的函数,其上挂载:
.monitor_stats() -> Dict[str, Any]
.reset_monitor_stats() -> None
"""
# 参数校验
if not isinstance(threshold_ms, int) or threshold_ms < 0:
raise ValueError("threshold_ms must be a non-negative integer")
if not isinstance(sample_rate, (float, int)) or not (0.0 <= float(sample_rate) <= 1.0):
raise ValueError("sample_rate must be a float in [0, 1]")
base_level = _parse_log_level(log_level)
emit = emit or _default_emit
dec_tags = _coerce_tags(tags)
env = env or os.getenv("APP_ENV", "dev")
def _decorate(func: Callable[..., Any]):
is_coro = inspect.iscoroutinefunction(func)
module, file, line = _get_func_location(func)
qualname = getattr(func, "__qualname__", getattr(func, "__name__", "<anonymous>"))
stats = _Stats()
def monitor_stats() -> Dict[str, Any]:
d = stats.snapshot()
d.update(
{
"function": qualname,
"module": module,
"file": file,
"line": line,
}
)
return d
def reset_monitor_stats() -> None:
stats.reset()
def build_record(
duration_ns: int,
status: str,
sampled: bool,
extra_tags: Optional[Mapping[str, Any]],
exc_repr: Optional[str],
        ) -> Tuple[Dict[str, Any], int]:
duration_ms = duration_ns / 1_000_000.0
s = stats.snapshot()
ctx = _current_ctx_tags()
merged_tags = {**dec_tags, **ctx}
if extra_tags:
merged_tags.update(_coerce_tags(extra_tags))
slow = duration_ms >= float(threshold_ms)
# 动态级别提升
level = base_level
if status == "error":
level = logging.ERROR
elif slow:
level = max(base_level, logging.WARNING)
record = {
"ts": _now_iso(),
"env": env,
"function": qualname,
"module": module,
"file": file,
"line": line,
"duration_ms": round(duration_ms, 3),
"threshold_ms": threshold_ms,
"slow": slow,
"status": status, # ok | error
"calls": s["total_calls"],
"exceptions": s["total_exceptions"],
"avg_ms": round(s["avg_ms"], 3),
"sampled": sampled,
"tags": merged_tags or {},
}
if exc_repr:
record["error"] = exc_repr
return record, level
if is_coro:
@functools.wraps(func)
async def async_wrapper(*args, **kwargs):
# 允许通过特殊关键字 _monitor_tags 传入临时标签(不会透传给原函数)
extra_tags = kwargs.pop("_monitor_tags", None)
start = time.perf_counter_ns()
status = "ok"
exc_repr = None
try:
result = await func(*args, **kwargs)
return result
except Exception as e:
status = "error"
exc_repr = f"{type(e).__name__}: {e}"
raise
finally:
end = time.perf_counter_ns()
duration_ns = end - start
stats.update(duration_ns, errored=(status == "error"))
# 构建记录与输出策略
sampled = _should_sample(float(sample_rate))
record, level = build_record(duration_ns, status, sampled, extra_tags, exc_repr)
                    # Log output: emit on error or when sampling hits.
                    if status == "error" or record["sampled"]:
                        if emit is _default_emit:
                            # The default emitter also needs the computed log level.
                            _default_emit(record, level)
                        else:
                            await _maybe_call_async(emit, record)
# 告警:慢调用时总是触发(不受采样影响)
if record["slow"] and alert is not None:
await _maybe_call_async(alert, record)
# 挂载统计接口
async_wrapper.monitor_stats = monitor_stats # type: ignore[attr-defined]
async_wrapper.reset_monitor_stats = reset_monitor_stats # type: ignore[attr-defined]
return async_wrapper
else:
@functools.wraps(func)
def sync_wrapper(*args, **kwargs):
extra_tags = kwargs.pop("_monitor_tags", None)
start = time.perf_counter_ns()
status = "ok"
exc_repr = None
try:
result = func(*args, **kwargs)
return result
except Exception as e:
status = "error"
exc_repr = f"{type(e).__name__}: {e}"
raise
finally:
end = time.perf_counter_ns()
duration_ns = end - start
stats.update(duration_ns, errored=(status == "error"))
sampled = _should_sample(float(sample_rate))
record, level = build_record(duration_ns, status, sampled, extra_tags, exc_repr)
                # Log output: emit on error or when sampling hits. The default emitter needs the
                # computed level; custom emitters only receive the record and can infer severity
                # from the slow/status fields themselves.
if status == "error" or record["sampled"]:
if emit is _default_emit:
_default_emit(record, level)
else:
_maybe_call_sync(emit, record)
if record["slow"] and alert is not None:
_maybe_call_sync(alert, record)
sync_wrapper.monitor_stats = monitor_stats # type: ignore[attr-defined]
sync_wrapper.reset_monitor_stats = reset_monitor_stats # type: ignore[attr-defined]
return sync_wrapper
return _decorate
import logging
logging.basicConfig(level=logging.DEBUG)
@monitor(threshold_ms=200, sample_rate=0.5, tags={"service": "billing", "component": "settlement"})
def calc_heavy(a: int, b: int) -> int:
# 模拟耗时
time.sleep(0.25)
return a + b
with monitor_bind({"request_id": "req-123", "user_id": 42}):
for _ in range(3):
try:
calc_heavy(1, 2)
except Exception:
pass
print("stats:", calc_heavy.monitor_stats())
# 重置统计
calc_heavy.reset_monitor_stats()
import asyncio
async def my_alert(record: dict):
# 慢调用告警(仅示例)
print("[ALERT]", record["function"], "duration:", record["duration_ms"], "ms")
@monitor(threshold_ms=50, log_level="DEBUG", sample_rate=1.0, alert=my_alert, tags={"service": "api"})
async def fetch_user(uid: int):
await asyncio.sleep(0.08) # 80ms,超过阈值 50ms,会触发 alert
return {"id": uid}
async def main():
async with asyncio.TaskGroup() as tg:
for i in range(3):
tg.create_task(fetch_user(i))
print("async stats:", fetch_user.monitor_stats())
asyncio.run(main())
from fastapi import FastAPI, Request
app = FastAPI()
@app.middleware("http")
async def bind_tags(request: Request, call_next):
# 每个请求绑定 trace 信息
with monitor_bind({"trace_id": request.headers.get("x-trace-id", "-"),
"path": request.url.path,
"method": request.method}):
response = await call_next(request)
return response
@monitor(threshold_ms=150, tags={"service": "user-api", "endpoint": "get_user"})
async def get_user_logic(user_id: int):
await asyncio.sleep(0.2) # 模拟慢调用
return {"id": user_id}
@app.get("/users/{user_id}")
async def get_user(user_id: int):
return await get_user_logic(user_id)
def my_emit(record: dict):
# 例如写入 stdout 或消息队列(仅示意)
print("[LOG]", json.dumps(record, ensure_ascii=False))
@monitor(emit=my_emit, sample_rate=1.0, tags={"team": "core"})
def work(x):
time.sleep(0.01)
return x * 2
work(21)
@monitor(sample_rate=1.0)
def compute(x):
time.sleep(0.02)
compute(1, _monitor_tags={"job_id": "j-7788", "step": "map"})
For extensions (for example Prometheus metric export, dedicated exception alerting, or sampled recording of arguments and return values), you can swap out emit/alert on top of the existing structured logging, or add optional parameters to the decorator; a sketch of a replacement emit follows.
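A minimal sketch of such a replacement emit, assuming the prometheus_client package is available; the metric names, labels, and the resize_image example function are illustrative and not part of the implementation above.
from prometheus_client import Counter, Histogram

# Hypothetical metric names; adjust to your own naming conventions.
MONITOR_CALLS = Counter("perf_monitor_calls_total", "Monitored calls", ["function", "status"])
MONITOR_LATENCY = Histogram("perf_monitor_duration_ms", "Call duration in milliseconds", ["function"])

def prometheus_emit(record: dict) -> None:
    # record is the structured dict produced by build_record() inside monitor().
    MONITOR_CALLS.labels(record["function"], record["status"]).inc()
    MONITOR_LATENCY.labels(record["function"]).observe(record["duration_ms"])

@monitor(emit=prometheus_emit, sample_rate=1.0, tags={"service": "metrics-demo"})
def resize_image(n: int) -> int:
    time.sleep(0.01)
    return n * 2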
from __future__ import annotations
from contextvars import ContextVar
from functools import wraps
from typing import Any, Callable, Iterable, Mapping, Optional, Set, Union, Dict, Tuple
import inspect
# ========== 上下文与类型 ==========
# 全局上下文变量:框架中可由中间件/依赖注入设置当前用户
CURRENT_USER: ContextVar[Optional[Any]] = ContextVar("CURRENT_USER", default=None)
def set_current_user(user: Any):
"""
将用户写入上下文,返回 token 以便重置。
用法:
token = set_current_user(user)
try:
...
finally:
CURRENT_USER.reset(token)
"""
return CURRENT_USER.set(user)
# ========== 工具函数 ==========
def _to_str_set(values: Optional[Iterable[Union[str, Any]]]) -> Set[str]:
"""将可迭代对象转换为去重的字符串集合;None -> 空集合。"""
if not values:
return set()
result: Set[str] = set()
for v in values:
if v is None:
continue
result.add(str(v))
return result
def _get_from_attr_or_mapping(obj: Any, name: str, default: Any = None) -> Any:
"""
兼容对象属性与字典取值。
- 优先属性;其次 Mapping;否则 default。
"""
if obj is None:
return default
if hasattr(obj, name):
return getattr(obj, name)
if isinstance(obj, Mapping):
return obj.get(name, default)
return default
def _extract_user_id(user: Any) -> Optional[str]:
"""
从用户对象提取 ID。支持常见字段:id / user_id / uid。
"""
for key in ("id", "user_id", "uid"):
val = _get_from_attr_or_mapping(user, key)
if val is not None:
return str(val)
return None
def _extract_roles(user: Any) -> Set[str]:
"""
从用户对象提取角色集合。支持字段:
- roles: Iterable[str]
- role: str 或 Iterable[str]
"""
roles = _get_from_attr_or_mapping(user, "roles")
if roles is None:
roles = _get_from_attr_or_mapping(user, "role")
if isinstance(roles, str):
return {roles}
return _to_str_set(roles)
def _extract_perms(user: Any) -> Set[str]:
"""
从用户对象提取权限点集合。支持字段:
- perms / permissions / scopes: Iterable[str] 或空
"""
for key in ("perms", "permissions", "scopes"):
perms = _get_from_attr_or_mapping(user, key)
if perms is not None:
return _to_str_set(perms)
return set()
def _default_resolver() -> Optional[Any]:
"""默认从 ContextVar 读取当前用户。"""
return CURRENT_USER.get()
def _default_on_deny(
*,
func: Callable[..., Any],
user: Optional[Any],
reason: str,
required: Dict[str, Set[str]],
missing: Dict[str, Set[str]],
context: Dict[str, Any],
) -> Dict[str, Any]:
"""
默认拒绝处理器:返回统一结构字典。
可在实际项目中用自定义 on_deny 替换(如抛 HTTP 异常)。
"""
status = 401 if reason in ("NO_CONTEXT", "NO_USER") else 403
messages = {
"NO_CONTEXT": "未发现请求上下文用户,请先登录",
"NO_USER": "未登录或登录已失效",
"ROLE_MISMATCH": "角色不允许访问",
"MISSING_PERMS": "缺少必要权限点",
}
return {
"ok": False,
"status": status,
"error": {
"code": "UNAUTHORIZED" if status == 401 else "FORBIDDEN",
"message": messages.get(reason, "访问被拒绝"),
"reason": reason,
"required": {k: sorted(v) for k, v in required.items() if v},
"missing": {k: sorted(v) for k, v in missing.items() if v},
},
}
# ========== 核心装饰器 ==========
def access_control(
*,
roles: Optional[Iterable[str]] = None,
perms: Optional[Iterable[str]] = None,
whitelist: Optional[Iterable[str]] = None,
resolver: Optional[Callable[[], Any]] = None,
    # on_deny receives keyword arguments only, which keeps the hook easy to extend.
    on_deny: Optional[Callable[..., Any]] = None,
):
"""
访问控制装饰器(支持同步/异步函数)。
参数:
- roles: 允许访问的角色列表(至少满足其一)。为空/None 表示不校验角色。
- perms: 必需的权限点(全部必需)。为空/None 表示不校验权限点。
- whitelist: 用户白名单(user_id 列表),命中白名单直接放行。
- resolver: 上下文取用户的函数;默认从 ContextVar CURRENT_USER 获取。
- on_deny: 自定义拒绝处理器;默认返回统一错误结构,可替换为抛出 HTTP 异常等。
校验顺序:
1) 无用户 -> 拒绝
2) 白名单 -> 放行
3) 角色校验(如配置)-> 不满足 -> 拒绝
4) 权限点校验(如配置)-> 缺少 -> 拒绝
5) 通过 -> 调用原函数
"""
allowed_roles = _to_str_set(roles)
required_perms = _to_str_set(perms)
white_ids = _to_str_set(whitelist)
_resolver = resolver or _default_resolver
_on_deny = on_deny or _default_on_deny
def _check(user: Any) -> Tuple[bool, str, Dict[str, Set[str]]]:
"""
返回 (通过与否, 原因, 缺失明细)。
原因:
- NO_CONTEXT / NO_USER / ROLE_MISMATCH / MISSING_PERMS
"""
if user is None:
return False, "NO_USER", {"roles": set(), "perms": set()}
# 白名单优先
uid = _extract_user_id(user)
if uid and uid in white_ids:
return True, "", {"roles": set(), "perms": set()}
# 角色校验
if allowed_roles:
user_roles = _extract_roles(user)
if allowed_roles.isdisjoint(user_roles):
return False, "ROLE_MISMATCH", {"roles": allowed_roles, "perms": set()}
# 权限点校验(全部必需)
if required_perms:
user_perms = _extract_perms(user)
missing = required_perms - user_perms
if missing:
return False, "MISSING_PERMS", {"roles": set(), "perms": missing}
return True, "", {"roles": set(), "perms": set()}
def decorator(func: Callable[..., Any]):
is_async = inspect.iscoroutinefunction(func)
if is_async:
@wraps(func)
async def async_wrapper(*args, **kwargs):
user = _resolver()
if user is None:
# 有 resolver 但返回 None 时统一视为 NO_USER
return _on_deny(
func=func,
user=None,
reason="NO_USER",
required={"roles": allowed_roles, "perms": required_perms},
missing={"roles": set(), "perms": set()},
context={"args": args, "kwargs": kwargs},
)
ok, reason, missing = _check(user)
if not ok:
return _on_deny(
func=func,
user=user,
reason=reason,
required={"roles": allowed_roles, "perms": required_perms},
missing=missing,
context={"args": args, "kwargs": kwargs},
)
return await func(*args, **kwargs)
return async_wrapper
@wraps(func)
def sync_wrapper(*args, **kwargs):
user = _resolver()
if user is None:
return _on_deny(
func=func,
user=None,
reason="NO_USER",
required={"roles": allowed_roles, "perms": required_perms},
missing={"roles": set(), "perms": set()},
context={"args": args, "kwargs": kwargs},
)
ok, reason, missing = _check(user)
if not ok:
return _on_deny(
func=func,
user=user,
reason=reason,
required={"roles": allowed_roles, "perms": required_perms},
missing=missing,
context={"args": args, "kwargs": kwargs},
)
return func(*args, **kwargs)
return sync_wrapper
    return decorator
# 假设你的用户结构
class User:
def __init__(self, uid: str, roles, perms):
self.id = uid
self.roles = set(roles)
self.perms = set(perms)
admin = User("1001", roles=["admin"], perms=["article.read", "article.write"])
guest = User("1002", roles=["guest"], perms=["article.read"])
# 设置当前用户(模拟中间件)
token = set_current_user(admin)
# 声明:需要 admin 角色,且需要 article.write 权限点
@access_control(roles=["admin"], perms=["article.write"])
def publish_article(title: str):
return {"ok": True, "title": title}
print(publish_article("Hello")) # -> {"ok": True, "title": "Hello"}
# 切换为 guest
CURRENT_USER.reset(token)
token = set_current_user(guest)
print(publish_article("Hello")) # -> {"ok": False, "status": 403, "error": {...}}
CURRENT_USER.reset(token)
vip = User("root", roles=["guest"], perms=[])
token = set_current_user(vip)
@access_control(whitelist=["root"], perms=["danger.op"])
def dangerous_op():
return "ok"
print(dangerous_op()) # -> "ok"(虽然缺少权限,但在白名单内)
CURRENT_USER.reset(token)
import asyncio
admin = {"user_id": "u1", "roles": ["admin"], "permissions": ["ops.view"]}
@access_control(roles=["admin"], perms=["ops.view"])
async def fetch_ops():
await asyncio.sleep(0.01)
return {"data": "secret"}
token = set_current_user(admin)
print(asyncio.run(fetch_ops())) # -> {"data": "secret"}
CURRENT_USER.reset(token)
# 1) 在中间件/依赖中设置 CURRENT_USER(伪代码)
# from fastapi import FastAPI, Request
# app = FastAPI()
# @app.middleware("http")
# async def inject_user(request: Request, call_next):
# # 假设从 token 解出用户对象 user
# token = set_current_user(user)
# try:
# return await call_next(request)
# finally:
# CURRENT_USER.reset(token)
# 2) 自定义拒绝处理为抛出 HTTP 异常
# from fastapi import HTTPException
def fastapi_on_deny(*, func, user, reason, required, missing, context):
detail = {"reason": reason, "required": required, "missing": missing}
status = 401 if reason in ("NO_CONTEXT", "NO_USER") else 403
# 注意:此函数不依赖框架本身,示例中返回字典,真实项目可直接 raise HTTPException
return {"detail": detail, "status_code": status}
# 3) 直接使用(resolver 默认从 CURRENT_USER)
@access_control(roles=["admin"], perms=["ops.manage"], on_deny=fastapi_on_deny)
def manage_ops():
return {"ok": True}
# from flask import g
# def flask_resolver():
# return getattr(g, "user", None)
#
# @access_control(resolver=flask_resolver, roles=["admin"])
# def admin_view():
# return "ok"
Design notes
Error handling
To extend with additional policies (such as tenant isolation, resource-level permissions, or policy combinations), add parameters and check branches on top of the current implementation while keeping on_deny pluggable; one such extension is sketched below.
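As one illustration, here is a minimal sketch of a tenant-isolation layer stacked on top of access_control. The tenant_id field and the require_tenant decorator are assumptions for this sketch (sync functions only, for brevity), not part of the implementation above.
def require_tenant(expected_tenant: str):
    """Hypothetical extra layer: deny unless the current user's tenant_id matches."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            user = CURRENT_USER.get()
            tenant = _get_from_attr_or_mapping(user, "tenant_id")
            if tenant is None or str(tenant) != expected_tenant:
                return {"ok": False, "status": 403,
                        "error": {"code": "FORBIDDEN", "reason": "TENANT_MISMATCH"}}
            return func(*args, **kwargs)
        return wrapper
    return decorator

@require_tenant("acme")
@access_control(roles=["admin"])
def delete_tenant_data():
    return {"ok": True}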
This decorator provides configurable caching for pure functions and I/O-bound functions, targeting high-frequency repeated computations and requests such as data-analysis workloads and third-party API aggregation. It supports both synchronous and asynchronous functions. Core features:
Suitable for:
from __future__ import annotations
import asyncio
import functools
import hashlib
import inspect
import pickle
import threading
import time
from collections import OrderedDict
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional, Protocol, Tuple, TypeVar, ParamSpec, Union
P = ParamSpec("P")
R = TypeVar("R")
# -------------------------
# Serializer abstraction
# -------------------------
class Serializer(Protocol):
def dumps(self, obj: Any) -> bytes: ...
def loads(self, data: bytes) -> Any: ...
class PickleSerializer:
"""默认序列化实现,使用 pickle 的最高协议。"""
def dumps(self, obj: Any) -> bytes:
return pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
def loads(self, data: bytes) -> Any:
return pickle.loads(data)
# -------------------------
# Cache Backend abstraction
# -------------------------
class BaseCacheBackend:
"""缓存后端抽象,提供同步与异步两套 API(异步默认调用同步实现)。"""
def get(self, full_key: str) -> Any:
raise NotImplementedError
def set(self, full_key: str, value: Any, ttl: Optional[int]) -> None:
raise NotImplementedError
def delete(self, full_key: str) -> None:
raise NotImplementedError
def clear_namespace(self, namespace: str) -> None:
raise NotImplementedError
async def aget(self, full_key: str) -> Any:
# 默认异步转同步
return self.get(full_key)
async def aset(self, full_key: str, value: Any, ttl: Optional[int]) -> None:
self.set(full_key, value, ttl)
async def adelete(self, full_key: str) -> None:
self.delete(full_key)
async def aclear_namespace(self, namespace: str) -> None:
self.clear_namespace(namespace)
# -------------------------
# Memory LRU cache backend
# -------------------------
class MemoryCache(BaseCacheBackend):
"""线程安全的内存 LRU 缓存,带 TTL 与命名空间前缀。"""
def __init__(self, maxsize: int = 500, namespace: str = "default") -> None:
self.maxsize = maxsize
self.namespace = namespace
self._store: "OrderedDict[str, Tuple[Optional[float], Any]]" = OrderedDict()
self._lock = threading.RLock()
def _now(self) -> float:
return time.time()
def _is_expired(self, exp: Optional[float]) -> bool:
return exp is not None and self._now() >= exp
def _compose(self, key: str) -> str:
return f"{self.namespace}:{key}"
def _evict_if_needed(self) -> None:
# 优先清理过期;否则按 LRU 淘汰
if self.maxsize <= 0:
self._store.clear()
return
# 先尽量清掉过期项
keys_to_delete = [k for k, (exp, _) in self._store.items() if self._is_expired(exp)]
for k in keys_to_delete:
self._store.pop(k, None)
while len(self._store) > self.maxsize:
self._store.popitem(last=False)
def get(self, full_key: str) -> Any:
with self._lock:
item = self._store.get(full_key)
if not item:
return None
exp, value = item
if self._is_expired(exp):
self._store.pop(full_key, None)
return None
# 命中则移动到 LRU 末尾
self._store.move_to_end(full_key, last=True)
return value
def set(self, full_key: str, value: Any, ttl: Optional[int]) -> None:
with self._lock:
exp = None if not ttl or ttl <= 0 else (self._now() + ttl)
self._store[full_key] = (exp, value)
self._store.move_to_end(full_key, last=True)
self._evict_if_needed()
def delete(self, full_key: str) -> None:
with self._lock:
self._store.pop(full_key, None)
def clear_namespace(self, namespace: str) -> None:
prefix = f"{namespace}:"
with self._lock:
keys = [k for k in self._store.keys() if k.startswith(prefix)]
for k in keys:
self._store.pop(k, None)
# 异步 API 直接复用同步(内存操作非阻塞)
# aget/aset/adelete/aclear_namespace 继承自父类的默认实现
# -------------------------
# Redis cache backend
# -------------------------
class RedisCache(BaseCacheBackend):
"""
Redis 后端。
- 支持 redis.Redis(同步)或 redis.asyncio.Redis(异步)
- 若仅提供同步 client 且在异步调用中使用,会通过 asyncio.to_thread 转线程池避免阻塞事件循环
"""
def __init__(self, client: Any, serializer: Serializer, namespace: str = "default") -> None:
self.client = client
self.serializer = serializer
self.namespace = namespace
# 检测是否是异步客户端(redis.asyncio)
self._is_async_client = any(
inspect.iscoroutinefunction(getattr(client, name, None))
for name in ("get", "set", "delete", "scan", "scan_iter")
)
def _compose(self, key: str) -> str:
return f"{self.namespace}:{key}"
# ---- Sync API ----
def get(self, full_key: str) -> Any:
if self._is_async_client:
# 避免 event loop 中误用同步 API
raise RuntimeError("Async Redis client in sync context; use aget() wrapper.")
data = self.client.get(full_key)
if data is None:
return None
return self.serializer.loads(data)
def set(self, full_key: str, value: Any, ttl: Optional[int]) -> None:
if self._is_async_client:
raise RuntimeError("Async Redis client in sync context; use aset() wrapper.")
data = self.serializer.dumps(value)
ex = ttl if ttl and ttl > 0 else None
# ex=TTL 秒
self.client.set(full_key, data, ex=ex)
def delete(self, full_key: str) -> None:
if self._is_async_client:
raise RuntimeError("Async Redis client in sync context; use adelete() wrapper.")
self.client.delete(full_key)
def clear_namespace(self, namespace: str) -> None:
if self._is_async_client:
raise RuntimeError("Async Redis client in sync context; use aclear_namespace() wrapper.")
prefix = f"{namespace}:"
# 使用 scan_iter 避免大键空间阻塞
if hasattr(self.client, "scan_iter"):
for k in self.client.scan_iter(match=f"{prefix}*"):
self.client.delete(k)
else:
# 回退到 scan 循环
cursor = 0
while True:
cursor, keys = self.client.scan(cursor=cursor, match=f"{prefix}*", count=1000)
if keys:
self.client.delete(*keys)
if cursor == 0:
break
# ---- Async API ----
async def aget(self, full_key: str) -> Any:
if self._is_async_client:
data = await self.client.get(full_key)
else:
data = await asyncio.to_thread(self.client.get, full_key)
if data is None:
return None
return self.serializer.loads(data)
    async def aset(self, full_key: str, value: Any, ttl: Optional[int]) -> None:
        data = self.serializer.dumps(value)
        ex = ttl if ttl and ttl > 0 else None
        if self._is_async_client:
            await self.client.set(full_key, data, ex=ex)
        else:
            # asyncio.to_thread forwards keyword arguments, so pass ex explicitly
            # rather than relying on redis-py's positional parameter order.
            await asyncio.to_thread(self.client.set, full_key, data, ex=ex)
async def adelete(self, full_key: str) -> None:
if self._is_async_client:
await self.client.delete(full_key)
else:
await asyncio.to_thread(self.client.delete, full_key)
async def aclear_namespace(self, namespace: str) -> None:
prefix = f"{namespace}:"
if self._is_async_client:
# 优先使用异步 scan_iter
if hasattr(self.client, "scan_iter"):
async for k in self.client.scan_iter(match=f"{prefix}*"):
await self.client.delete(k)
else:
cursor = 0
while True:
cursor, keys = await self.client.scan(cursor=cursor, match=f"{prefix}*", count=1000)
if keys:
await self.client.delete(*keys)
if cursor == 0:
break
else:
# 同步客户端转线程
def _clear():
if hasattr(self.client, "scan_iter"):
for k in self.client.scan_iter(match=f"{prefix}*"):
self.client.delete(k)
else:
cursor = 0
while True:
cursor, keys = self.client.scan(cursor=cursor, match=f"{prefix}*", count=1000)
if keys:
self.client.delete(*keys)
if cursor == 0:
break
await asyncio.to_thread(_clear)
# -------------------------
# Key builder
# -------------------------
def _stable_hash_bytes(obj: Any) -> bytes:
"""尽量稳定、可跨进程的参数摘要:优先 pickle;失败则退回 repr()."""
try:
return pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
except Exception:
return repr(obj).encode("utf-8", errors="ignore")
def default_key_builder(func: Callable[..., Any], namespace: str, args: Tuple[Any, ...], kwargs: Dict[str, Any]) -> str:
"""
产生稳定且短小的 key:namespace + 函数标识 + 参数摘要(blake2b)。
"""
hasher = hashlib.blake2b(digest_size=16)
hasher.update(_stable_hash_bytes(args))
# kwargs 按 key 排序
if kwargs:
items = tuple(sorted(kwargs.items(), key=lambda kv: kv[0]))
hasher.update(_stable_hash_bytes(items))
digest = hasher.hexdigest()
fn_id = f"{func.__module__}.{getattr(func, '__qualname__', func.__name__)}"
return f"{fn_id}:{digest}"
# -------------------------
# Decorator factory
# -------------------------
@dataclass
class CacheConfig:
backend: str = "memory" # "memory" | "redis"
ttl: int = 60
key: Optional[Callable[[Callable[..., Any], str, Tuple[Any, ...], Dict[str, Any]], str]] = None
unless: Optional[Callable[..., bool]] = None # True => 跳过缓存
maxsize: int = 500 # 仅对 memory 生效
namespace: str = "default"
serializer: Optional[Serializer] = None # Redis 使用;若不传用 Pickle
backend_options: Optional[Dict[str, Any]] = None # e.g. {'client': redis_client}
def cacheable(
*,
backend: str = "memory",
ttl: int = 60,
key: Optional[Callable[[Callable[..., Any], str, Tuple[Any, ...], Dict[str, Any]], str]] = None,
unless: Optional[Callable[..., bool]] = None,
maxsize: int = 500,
namespace: str = "default",
serializer: Optional[Serializer] = None,
backend_options: Optional[Dict[str, Any]] = None,
) -> Callable[[Callable[P, R]], Callable[P, R]]:
"""
可配置缓存装饰器:
- backend: "memory" | "redis"
- ttl: 秒;<=0 表示不过期(但仍可手动失效)
- key: 自定义键函数 (func, namespace, args, kwargs) -> str
- unless: 返回 True 则跳过缓存(不读不写)
- maxsize: 内存 LRU 容量上限
- namespace: 业务隔离用前缀
- serializer: 序列化器(默认 PickleSerializer);Redis 必需/默认
- backend_options: 后端额外配置,Redis 需提供 {'client': redis_client}
"""
cfg = CacheConfig(
backend=backend,
ttl=ttl,
key=key or default_key_builder,
unless=unless,
maxsize=maxsize,
namespace=namespace,
serializer=serializer or PickleSerializer(),
backend_options=backend_options or {},
)
# 构造后端
if cfg.backend not in ("memory", "redis"):
raise ValueError("backend must be 'memory' or 'redis'.")
if cfg.backend == "memory":
cache_backend: BaseCacheBackend = MemoryCache(maxsize=cfg.maxsize, namespace=cfg.namespace)
else:
client = cfg.backend_options.get("client")
if client is None:
raise ValueError("For backend='redis', backend_options['client'] is required.")
cache_backend = RedisCache(client=client, serializer=cfg.serializer, namespace=cfg.namespace)
    # Per-key locks to prevent cache stampede (fine-grained by full cache key).
    _thread_locks: Dict[str, threading.Lock] = {}
    _async_locks: Dict[str, asyncio.Lock] = {}
    _locks_guard = threading.Lock()  # protects _thread_locks itself
    def _get_thread_lock(k: str) -> threading.Lock:
        lock = _thread_locks.get(k)
        if lock:
            return lock
        with _locks_guard:
            return _thread_locks.setdefault(k, threading.Lock())
def _get_async_lock(k: str) -> asyncio.Lock:
lock = _async_locks.get(k)
if lock:
return lock
lock = asyncio.Lock()
_async_locks[k] = lock
return lock
def decorator(func: Callable[P, R]) -> Callable[P, R]:
is_coro = inspect.iscoroutinefunction(func)
@functools.wraps(func)
def sync_wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
# unless 为 True => 直接调用原函数
if cfg.unless and cfg.unless(*args, **kwargs):
return func(*args, **kwargs)
k = cfg.key(func, cfg.namespace, args, kwargs) # base key
full_key = f"{cfg.namespace}:{k}"
# 1st 尝试读取
cached = cache_backend.get(full_key)
if cached is not None:
return cached
# 防击穿:按 key 加锁
lk = _get_thread_lock(full_key)
with lk:
# 进入临界区后再次尝试读取
cached2 = cache_backend.get(full_key)
if cached2 is not None:
return cached2
# 计算与写入
result = func(*args, **kwargs)
cache_backend.set(full_key, result, cfg.ttl)
return result
@functools.wraps(func)
async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
if cfg.unless and cfg.unless(*args, **kwargs):
return await func(*args, **kwargs) # type: ignore[misc]
k = cfg.key(func, cfg.namespace, args, kwargs)
full_key = f"{cfg.namespace}:{k}"
cached = await cache_backend.aget(full_key)
if cached is not None:
return cached
lk = _get_async_lock(full_key)
async with lk:
cached2 = await cache_backend.aget(full_key)
if cached2 is not None:
return cached2
result = await func(*args, **kwargs) # type: ignore[misc]
await cache_backend.aset(full_key, result, cfg.ttl)
return result
        # Pick the sync or async wrapper.
        wrapper: Callable[P, R] = async_wrapper if is_coro else sync_wrapper
# 失效 API 挂载(与函数形态一致)
if is_coro:
async def invalidate(*iargs: P.args, **ikwargs: P.kwargs) -> None:
k = cfg.key(func, cfg.namespace, iargs, ikwargs)
full_key = f"{cfg.namespace}:{k}"
await cache_backend.adelete(full_key)
async def clear_namespace() -> None:
await cache_backend.aclear_namespace(cfg.namespace)
setattr(wrapper, "invalidate", invalidate)
setattr(wrapper, "clear_namespace", clear_namespace)
else:
def invalidate(*iargs: P.args, **ikwargs: P.kwargs) -> None:
k = cfg.key(func, cfg.namespace, iargs, ikwargs)
full_key = f"{cfg.namespace}:{k}"
cache_backend.delete(full_key)
def clear_namespace() -> None:
cache_backend.clear_namespace(cfg.namespace)
setattr(wrapper, "invalidate", invalidate)
setattr(wrapper, "clear_namespace", clear_namespace)
# 便于调试/扩展:暴露 backend 与配置
setattr(wrapper, "cache_backend", cache_backend)
setattr(wrapper, "cache_config", cfg)
return wrapper # type: ignore[return-value]
return decorator
Example 1: data-analysis computation (sync) with an in-memory LRU cache
# 假设为昂贵的聚合计算
@cacheable(
backend="memory",
ttl=120, # 2 分钟
maxsize=1000,
namespace="analytics",
)
def heavy_agg(series: list[int], topk: int = 10) -> dict:
time.sleep(0.5) # 模拟昂贵计算
sorted_vals = sorted(series, reverse=True)[:topk]
return {"topk": sorted_vals, "sum": sum(series)}
# 调用
data = [1,2,3,4,5,6,7,8,9,10]
print(heavy_agg(data, topk=3)) # 首次 ~0.5s
print(heavy_agg(data, topk=3)) # 命中缓存,基本 0ms
# 按参数失效(仅移除对应参数组合的缓存)
heavy_agg.invalidate(data, topk=3)
# 按命名空间清空
heavy_agg.clear_namespace()
Example 2: third-party API aggregation (async) with a Redis cache
import asyncio
from typing import Any
# 使用 redis-py 4.x 的 asyncio 客户端
from redis.asyncio import Redis
redis_client = Redis(host="127.0.0.1", port=6379, db=0)
# unless:当 request_no_cache=True 时跳过缓存
def unless_no_cache(*args, **kwargs) -> bool:
return kwargs.get("request_no_cache", False)
# 自定义键:以 user_id 与 path 构建
def api_key(func, namespace, args, kwargs) -> str:
user_id = kwargs.get("user_id", "anon")
path = kwargs.get("path", "/")
# 与默认行为一致,末尾仍加入参数摘要,避免同 user_id/path 不同 query 的冲突
return f"{func.__module__}.{func.__qualname__}:{user_id}:{path}:{hashlib.blake2b(pickle.dumps(kwargs, protocol=pickle.HIGHEST_PROTOCOL), digest_size=8).hexdigest()}"
@cacheable(
backend="redis",
ttl=30,
key=api_key,
unless=unless_no_cache,
namespace="api-aggregate",
backend_options={"client": redis_client},
)
async def aggregate_third_party(*, user_id: str, path: str, query: dict[str, Any], request_no_cache: bool = False) -> dict[str, Any]:
# 模拟聚合多个第三方 API
await asyncio.sleep(0.2)
return {"user": user_id, "path": path, "result": {"q": query}, "ts": time.time()}
async def main():
print(await aggregate_third_party(user_id="42", path="/profile", query={"lang": "zh"})) # 首次 ~0.2s
print(await aggregate_third_party(user_id="42", path="/profile", query={"lang": "zh"})) # 命中 Redis,基本 0ms
# 跳过缓存强制刷新
print(await aggregate_third_party(user_id="42", path="/profile", query={"lang": "zh"}, request_no_cache=True))
# 失效与清理
await aggregate_third_party.invalidate(user_id="42", path="/profile", query={"lang": "zh"})
await aggregate_third_party.clear_namespace()
asyncio.run(main())
Example 3: JSON serialization for Redis (easier to inspect)
import json
class JsonSerializer:
def dumps(self, obj: Any) -> bytes:
return json.dumps(obj, ensure_ascii=False, separators=(",", ":")).encode("utf-8")
def loads(self, data: bytes) -> Any:
return json.loads(data.decode("utf-8"))
json_serializer = JsonSerializer()
# expensive_config below is a synchronous function, so use a synchronous Redis client here;
# the async client from Example 2 would raise a RuntimeError in a sync context.
from redis import Redis as SyncRedis
sync_redis_client = SyncRedis(host="127.0.0.1", port=6379, db=0)
@cacheable(
    backend="redis",
    ttl=60,
    namespace="json-demo",
    serializer=json_serializer,
    backend_options={"client": sync_redis_client},
)
def expensive_config() -> dict:
time.sleep(0.3)
return {"feature": True, "version": 3, "meta": {"owner": "platform"}}
print(expensive_config()) # 首次
print(expensive_config()) # 命中
Key generation (key)
TTL and expiration
Invalidation strategy
Concurrency and stampede protection
Serialization (serializer)
Backend compatibility
Metadata preservation
To extend support further: one possible extension, a custom cache backend, is sketched below.
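The BaseCacheBackend abstraction above is one natural extension point. The sketch below adds simple hit/miss counters to the in-memory backend; note that cacheable() currently constructs only the built-in "memory"/"redis" backends, so wiring in a custom backend would also require extending its backend selection. Illustrative only.
class CountingMemoryCache(MemoryCache):
    """MemoryCache subclass that counts hits and misses for quick diagnostics (illustrative)."""
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.hits = 0
        self.misses = 0

    def get(self, full_key: str):
        # A cached value of None is indistinguishable from a miss, mirroring the backend's own convention.
        value = super().get(full_key)
        if value is None:
            self.misses += 1
        else:
            self.hits += 1
        return value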
Quickly turn your business intent into a Python decorator solution you can put straight into use: from requirement clarification to structured design, then complete code, clear comments, runnable examples, and caveats, all generated in one pass. It covers high-frequency scenarios such as performance monitoring, permission checks, caching, and logging, and keeps the code readable, maintainable, and reusable within safety and coding-standard constraints, unifying team style and noticeably improving delivery speed and quality.
Quickly generate decorators for authentication, logging, rate limiting, and more, apply them directly to API and service code, cut boilerplate, and keep production behavior steady.
Add caching and timing to data-cleaning and feature-processing functions in one step, so batch jobs run faster, exceptions are easier to trace, and repeated runs are more predictable (see the sketch after this list).
Produce decorators for routing, request validation, and access control, consolidate them into shared team components, unify style, and raise reuse.
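For example, assuming the monitor and cacheable decorators from the sections above are importable in the same project, caching and timing can be stacked on a data-cleaning function as follows; the function and parameter names are hypothetical.
@monitor(threshold_ms=500, sample_rate=1.0, tags={"pipeline": "etl"})
@cacheable(backend="memory", ttl=300, namespace="etl")
def clean_rows(rows: tuple) -> list:
    # Deduplicate, strip whitespace, and drop empty entries; any picklable argument works
    # with the default key builder, a tuple is just convenient here.
    return sorted({r.strip() for r in rows if r and r.strip()})

clean_rows(("a", " b", "", "a "))   # first call computes and caches
clean_rows(("a", " b", "", "a "))   # repeat call is served from the cache; monitor logs both calls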
Copy the prompt generated from the template into the chat application you already use (such as ChatGPT or Claude) and start a conversation directly, with no extra development. Suitable for quick personal trials and lightweight use.
Turn the prompt template into an API: your program can modify the template parameters freely and call it through the interface, making automation and batch processing straightforward. Suitable for developer integration and embedding in business systems.
Configure the corresponding server address in your MCP client so that your AI application invokes the prompt template automatically. Suitable for advanced users and team collaboration, letting prompts move seamlessly across AI tools.