¥
立即购买

Python装饰器设计与实现

0 浏览
0 试用
0 购买
Dec 10, 2025更新

本提示词专为Python开发者设计,提供专业的装饰器开发指导。通过明确的功能需求和应用场景分析,生成结构完整、注释清晰的装饰器代码,并附带详细的使用示例和注意事项。能够有效解决函数功能扩展、性能监控、权限验证等常见开发需求,提升代码的可维护性和复用性。适用于Web开发、数据处理、系统工具等多种Python应用场景。

装饰器功能概述

一个可复用的性能监控装饰器,支持同步与异步函数,提供如下能力:

  • 统计:记录调用次数、异常次数、累计耗时与最近耗时,便于后续聚合分析
  • 定时:测量函数调用耗时(高精度:perf_counter_ns)
  • 结构化日志:输出统一埋点结构,适用于开发与生产环境
  • 慢调用告警:当耗时超过阈值时触发自定义告警回调
  • 采样控制:按 sample_rate 采样输出日志,避免高流量下的过量日志
  • 上下文标签:支持全局/装饰器级/请求级标签合并,统一业务标签输出
  • 元信息保持:不破坏原函数名、文档字符串等

典型场景:

  • Web 接口(如 FastAPI/Flask/Django)的请求处理监控
  • 数据处理任务(ETL、批处理、定时任务)的性能追踪
  • 统一开发与生产埋点输出

参数说明(装饰器参数):

  • threshold_ms: int = 200,慢调用阈值(毫秒)
  • log_level: str = "INFO" | "DEBUG",基础日志级别(慢调用自动提升为 WARNING,异常为 ERROR)
  • emit: Callable[[dict], None | Awaitable] = 自定义日志写入函数,默认使用内置 logging 输出 JSON
  • alert: Callable[[dict], None | Awaitable] = 慢调用告警回调(仅当耗时超过阈值时触发)
  • sample_rate: float = 0.3,日志采样率(0~1)
  • tags: dict[str, Any] = 业务标签(与上下文标签合并)
  • env: Optional[str] = 环境标识,默认读取 APP_ENV(未设置则为 "dev")

附加能力:

  • monitor_bind(tags): 上下文标签绑定(基于 contextvars),适合在中间件中添加 request_id、user_id 等
  • 获取统计:通过 wrapped_func.monitor_stats() 获得当前函数统计信息
  • 重置统计:wrapped_func.reset_monitor_stats()

完整代码实现

from __future__ import annotations

import asyncio
import functools
import inspect
import json
import logging
import os
import random
import threading
import time
from contextlib import contextmanager
from contextvars import ContextVar
from dataclasses import dataclass, asdict
from typing import Any, Awaitable, Callable, Dict, Mapping, Optional, Tuple, Union

# -------------------------
# 上下文标签(请求级/任务级)
# -------------------------
_CTX_TAGS: ContextVar[Dict[str, Any]] = ContextVar("_CTX_TAGS", default={})


@contextmanager
def monitor_bind(tags: Mapping[str, Any]):
    """
    绑定临时上下文标签,适合在 Web 中间件或任务入口使用。
    示例:
        with monitor_bind({"trace_id": "abc", "user_id": 42}):
            handler()
    """
    if not isinstance(tags, Mapping):
        raise TypeError("monitor_bind(tags): tags must be a mapping")
    current = _CTX_TAGS.get()
    merged = {**current, **dict(tags)}
    token = _CTX_TAGS.set(merged)
    try:
        yield
    finally:
        _CTX_TAGS.reset(token)


def _current_ctx_tags() -> Dict[str, Any]:
    # 返回当前上下文标签的浅拷贝,避免外部修改
    return dict(_CTX_TAGS.get() or {})


# -------------------------
# 统计器
# -------------------------
@dataclass
class _Stats:
    total_calls: int = 0
    total_exceptions: int = 0
    total_time_ns: int = 0
    last_duration_ns: int = 0
    _lock: threading.Lock = threading.Lock()

    def update(self, duration_ns: int, errored: bool) -> None:
        with self._lock:
            self.total_calls += 1
            if errored:
                self.total_exceptions += 1
            self.total_time_ns += duration_ns
            self.last_duration_ns = duration_ns

    def snapshot(self) -> Dict[str, Any]:
        with self._lock:
            avg_ns = (self.total_time_ns / self.total_calls) if self.total_calls else 0.0
            return {
                "total_calls": self.total_calls,
                "total_exceptions": self.total_exceptions,
                "total_time_ns": self.total_time_ns,
                "last_duration_ns": self.last_duration_ns,
                "avg_ms": avg_ns / 1_000_000.0,
            }

    def reset(self) -> None:
        with self._lock:
            self.total_calls = 0
            self.total_exceptions = 0
            self.total_time_ns = 0
            self.last_duration_ns = 0


# -------------------------
# 工具函数
# -------------------------
def _now_iso() -> str:
    # 使用 time.time() + UTC ISO 格式,避免引入 datetime 依赖成本
    ts = time.time()
    # 保留毫秒,统一 ISO-like 字符串
    return time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(ts)) + f".{int((ts % 1) * 1000):03d}Z"


def _parse_log_level(level_str: str) -> int:
    level_str = (level_str or "INFO").upper()
    if level_str not in {"INFO", "DEBUG"}:
        # 限制在 INFO/DEBUG,其他值回退 INFO
        level_str = "INFO"
    return logging.DEBUG if level_str == "DEBUG" else logging.INFO


def _default_emit(record: Dict[str, Any], level: int) -> None:
    logger = logging.getLogger("perf.monitor")
    logger.log(level, json.dumps(record, ensure_ascii=False, separators=(",", ":")))


def _coerce_tags(tags: Optional[Mapping[str, Any]]) -> Dict[str, Any]:
    if tags is None:
        return {}
    if not isinstance(tags, Mapping):
        raise TypeError("tags must be a mapping")
    # 规范化 key 为 str,避免不可序列化/冲突
    return {str(k): v for k, v in dict(tags).items()}


def _should_sample(rate: float) -> bool:
    if rate <= 0:
        return False
    if rate >= 1:
        return True
    return random.random() < rate


def _get_func_location(fn: Callable[..., Any]) -> Tuple[str, Optional[str], Optional[int]]:
    module = getattr(fn, "__module__", "<unknown>")
    file = None
    line = None
    try:
        src_file = inspect.getsourcefile(fn) or inspect.getfile(fn)
        file = src_file
        try:
            _, line = inspect.getsourcelines(fn)
        except OSError:
            line = None
    except OSError:
        pass
    return module, file, line


async def _maybe_call_async(fn: Callable[[Dict[str, Any]], Any], record: Dict[str, Any]) -> None:
    """
    在异步上下文中调用 emit/alert:
    - 如果是 async 函数,await 它
    - 如果是普通函数,直接调用
    """
    if fn is None:
        return
    try:
        result = fn(record)
        if inspect.isawaitable(result):
            await result  # type: ignore[func-returns-value]
    except Exception:  # 不让外部回调影响主逻辑
        logging.getLogger("perf.monitor").exception("monitor callback failed")


def _maybe_call_sync(fn: Callable[[Dict[str, Any]], Any], record: Dict[str, Any]) -> None:
    """
    在同步上下文中调用 emit/alert:
    - 如果是 async 函数,尝试获取运行中的事件循环创建任务,否则使用 asyncio.run
    - 如果是普通函数,直接调用
    """
    if fn is None:
        return
    try:
        result = fn(record)
        if inspect.isawaitable(result):
            try:
                loop = asyncio.get_running_loop()
                # 在已有事件循环中调度(例如在某些框架里)
                loop.create_task(result)  # fire-and-forget
            except RuntimeError:
                # 无事件循环,直接运行
                asyncio.run(result)
    except Exception:
        logging.getLogger("perf.monitor").exception("monitor callback failed")


# -------------------------
# 装饰器主体
# -------------------------
def monitor(
    *,
    threshold_ms: int = 200,
    log_level: str = "INFO",
    emit: Optional[Callable[[Dict[str, Any]], Union[None, Awaitable[Any]]]] = None,
    alert: Optional[Callable[[Dict[str, Any]], Union[None, Awaitable[Any]]]] = None,
    sample_rate: float = 0.3,
    tags: Optional[Mapping[str, Any]] = None,
    env: Optional[str] = None,
):
    """
    性能监控装饰器(支持同步/异步函数)。
    统计调用次数与异常、记录结构化日志、慢调用触发告警。

    注意:
    - 日志输出遵循采样率(sample_rate)
    - 告警在超阈值时总是触发(不受采样影响)
    - 异常会记录为 ERROR 级别日志(不受采样影响)

    返回:包装后的函数,其上挂载:
        .monitor_stats() -> Dict[str, Any]
        .reset_monitor_stats() -> None
    """
    # 参数校验
    if not isinstance(threshold_ms, int) or threshold_ms < 0:
        raise ValueError("threshold_ms must be a non-negative integer")
    if not isinstance(sample_rate, (float, int)) or not (0.0 <= float(sample_rate) <= 1.0):
        raise ValueError("sample_rate must be a float in [0, 1]")
    base_level = _parse_log_level(log_level)
    emit = emit or _default_emit
    dec_tags = _coerce_tags(tags)
    env = env or os.getenv("APP_ENV", "dev")

    def _decorate(func: Callable[..., Any]):
        is_coro = inspect.iscoroutinefunction(func)
        module, file, line = _get_func_location(func)
        qualname = getattr(func, "__qualname__", getattr(func, "__name__", "<anonymous>"))

        stats = _Stats()

        def monitor_stats() -> Dict[str, Any]:
            d = stats.snapshot()
            d.update(
                {
                    "function": qualname,
                    "module": module,
                    "file": file,
                    "line": line,
                }
            )
            return d

        def reset_monitor_stats() -> None:
            stats.reset()

        def build_record(
            duration_ns: int,
            status: str,
            sampled: bool,
            extra_tags: Optional[Mapping[str, Any]],
            exc_repr: Optional[str],
        ) -> Dict[str, Any]:
            duration_ms = duration_ns / 1_000_000.0
            s = stats.snapshot()
            ctx = _current_ctx_tags()
            merged_tags = {**dec_tags, **ctx}
            if extra_tags:
                merged_tags.update(_coerce_tags(extra_tags))
            slow = duration_ms >= float(threshold_ms)
            # 动态级别提升
            level = base_level
            if status == "error":
                level = logging.ERROR
            elif slow:
                level = max(base_level, logging.WARNING)

            record = {
                "ts": _now_iso(),
                "env": env,
                "function": qualname,
                "module": module,
                "file": file,
                "line": line,
                "duration_ms": round(duration_ms, 3),
                "threshold_ms": threshold_ms,
                "slow": slow,
                "status": status,  # ok | error
                "calls": s["total_calls"],
                "exceptions": s["total_exceptions"],
                "avg_ms": round(s["avg_ms"], 3),
                "sampled": sampled,
                "tags": merged_tags or {},
            }
            if exc_repr:
                record["error"] = exc_repr
            return record, level

        if is_coro:

            @functools.wraps(func)
            async def async_wrapper(*args, **kwargs):
                # 允许通过特殊关键字 _monitor_tags 传入临时标签(不会透传给原函数)
                extra_tags = kwargs.pop("_monitor_tags", None)
                start = time.perf_counter_ns()
                status = "ok"
                exc_repr = None
                try:
                    result = await func(*args, **kwargs)
                    return result
                except Exception as e:
                    status = "error"
                    exc_repr = f"{type(e).__name__}: {e}"
                    raise
                finally:
                    end = time.perf_counter_ns()
                    duration_ns = end - start
                    stats.update(duration_ns, errored=(status == "error"))

                    # 构建记录与输出策略
                    sampled = _should_sample(float(sample_rate))
                    record, level = build_record(duration_ns, status, sampled, extra_tags, exc_repr)

                    # 日志输出:异常或采样命中则输出
                    if status == "error" or record["sampled"]:
                        await _maybe_call_async(emit, record)

                    # 告警:慢调用时总是触发(不受采样影响)
                    if record["slow"] and alert is not None:
                        await _maybe_call_async(alert, record)

            # 挂载统计接口
            async_wrapper.monitor_stats = monitor_stats  # type: ignore[attr-defined]
            async_wrapper.reset_monitor_stats = reset_monitor_stats  # type: ignore[attr-defined]
            return async_wrapper

        else:

            @functools.wraps(func)
            def sync_wrapper(*args, **kwargs):
                extra_tags = kwargs.pop("_monitor_tags", None)
                start = time.perf_counter_ns()
                status = "ok"
                exc_repr = None
                try:
                    result = func(*args, **kwargs)
                    return result
                except Exception as e:
                    status = "error"
                    exc_repr = f"{type(e).__name__}: {e}"
                    raise
                finally:
                    end = time.perf_counter_ns()
                    duration_ns = end - start
                    stats.update(duration_ns, errored=(status == "error"))

                    sampled = _should_sample(float(sample_rate))
                    record, level = build_record(duration_ns, status, sampled, extra_tags, exc_repr)

                    if status == "error" or record["sampled"]:
                        # 默认 emit 会使用 level,但我们已在 record 中不携带 level,仅在内部决定
                        # 因为 emit 接口仅接受 record,这里通过 logger.level 约定避免泄露实现细节
                        # 所以传级别在默认 emit 内部处理,这里只需要调用 emit
                        # 为支持自定义 emit(可能忽略动态级别),我们在 record 中不强制写 level
                        # 如果有需要,可在自定义 emit 中根据 slow/status 决定输出级别
                        # 但为兼容默认 emit,我们仍统一调用一次
                        # Note: 为保证自定义 emit 能获知 level,允许它从记录字段推断
                        # 我们不额外传 level 参数,保持 emit(record) 签名简单。
                        # 若需要强传,可在 emit 环境闭包中自取逻辑。
                        pass

                    # 为了默认 emit 使用正确级别,这里单独调用默认 emit 时带级别;
                    # 对于用户自定义 emit(只接 record),我们仍会调用一次,兼容两者。
                    # 因此这里统一处理两路:如果 emit 是默认 emit,使用带级别的版本;
                    # 否则,直接调用用户 emit(record)。
                    if status == "error" or record["sampled"]:
                        if emit is _default_emit:
                            _default_emit(record, level)
                        else:
                            _maybe_call_sync(emit, record)

                    if record["slow"] and alert is not None:
                        _maybe_call_sync(alert, record)

            sync_wrapper.monitor_stats = monitor_stats  # type: ignore[attr-defined]
            sync_wrapper.reset_monitor_stats = reset_monitor_stats  # type: ignore[attr-defined]
            return sync_wrapper

    return _decorate

使用示例

  • 基本使用(同步函数)
import logging

logging.basicConfig(level=logging.DEBUG)

@monitor(threshold_ms=200, sample_rate=0.5, tags={"service": "billing", "component": "settlement"})
def calc_heavy(a: int, b: int) -> int:
    # 模拟耗时
    time.sleep(0.25)
    return a + b

with monitor_bind({"request_id": "req-123", "user_id": 42}):
    for _ in range(3):
        try:
            calc_heavy(1, 2)
        except Exception:
            pass

print("stats:", calc_heavy.monitor_stats())
# 重置统计
calc_heavy.reset_monitor_stats()
  • 异步函数(如在 FastAPI/asyncio 中)
import asyncio

async def my_alert(record: dict):
    # 慢调用告警(仅示例)
    print("[ALERT]", record["function"], "duration:", record["duration_ms"], "ms")

@monitor(threshold_ms=50, log_level="DEBUG", sample_rate=1.0, alert=my_alert, tags={"service": "api"})
async def fetch_user(uid: int):
    await asyncio.sleep(0.08)  # 80ms,超过阈值 50ms,会触发 alert
    return {"id": uid}

async def main():
    async with asyncio.TaskGroup() as tg:
        for i in range(3):
            tg.create_task(fetch_user(i))

    print("async stats:", fetch_user.monitor_stats())

asyncio.run(main())
  • Web 场景(以 FastAPI 为例)
from fastapi import FastAPI, Request
app = FastAPI()

@app.middleware("http")
async def bind_tags(request: Request, call_next):
    # 每个请求绑定 trace 信息
    with monitor_bind({"trace_id": request.headers.get("x-trace-id", "-"),
                       "path": request.url.path,
                       "method": request.method}):
        response = await call_next(request)
        return response

@monitor(threshold_ms=150, tags={"service": "user-api", "endpoint": "get_user"})
async def get_user_logic(user_id: int):
    await asyncio.sleep(0.2)  # 模拟慢调用
    return {"id": user_id}

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    return await get_user_logic(user_id)
  • 自定义 emit(将结构化日志写入任意系统)
def my_emit(record: dict):
    # 例如写入 stdout 或消息队列(仅示意)
    print("[LOG]", json.dumps(record, ensure_ascii=False))

@monitor(emit=my_emit, sample_rate=1.0, tags={"team": "core"})
def work(x):
    time.sleep(0.01)
    return x * 2

work(21)
  • 在调用处临时传入标签(不会传递给原函数)
@monitor(sample_rate=1.0)
def compute(x):
    time.sleep(0.02)

compute(1, _monitor_tags={"job_id": "j-7788", "step": "map"})

技术说明

  • 高精度计时:使用 time.perf_counter_ns() 获取纳秒级时间戳,避免系统时间回拨影响。
  • 异步支持:基于 inspect.iscoroutinefunction 区分同步/异步函数,分别包装。回调 emit/alert 同样支持 sync/async,两类上下文分别处理:
    • 异步上下文:如果回调返回 awaitable,则 await;否则直接调用。
    • 同步上下文:如果回调返回 awaitable,优先投递到现有事件循环(若存在),否则使用 asyncio.run 执行。
  • 结构化日志:统一输出字段,便于在开发/生产环境通过同一 schema 进行分析:
    • 时间/环境:ts、env(默认 APP_ENV 或 dev)
    • 代码位置信息:function、module、file、line
    • 性能数据:duration_ms、threshold_ms、slow、avg_ms
    • 可靠性:status(ok|error)、calls、exceptions
    • 打点控制:sampled
    • 业务维度:tags(合并装饰器标签、上下文标签、调用时标签)
  • 动态级别:
    • 基础级别由 log_level 决定(INFO/DEBUG)
    • 慢调用自动提升为 WARNING
    • 发生异常自动提升为 ERROR
  • 统计器:
    • 每个被装饰函数持有独立 _Stats(线程安全)
    • 提供 monitor_stats()/reset_monitor_stats() 接口读取/清零
  • 采样策略:
    • 日志输出遵循 sample_rate
    • 告警不受采样限制(只要慢调用,即触发)
    • 异常日志不受采样限制(总是输出)
  • 元信息保留:使用 functools.wraps 保留原函数名、文档、注解等信息。

注意事项

  • 性能开销:装饰器尽量保持轻量,仍建议在高 QPS 场景下合理设置 sample_rate,避免日志风暴。
  • 线程与进程:统计仅在当前进程内聚合;多进程/容器环境需要在外部汇总(例如通过日志/指标系统)。
  • 回调安全:
    • emit/alert 的异常会被捕获并记录,不会影响主流程。
    • 若在异步路径中使用阻塞型 emit/alert,可能阻塞事件循环;建议提供异步版本或在回调内部异步投递。
  • 标签与隐私:tags 建议只包含必要的业务信息,避免敏感数据泄露到日志系统。
  • 调用时临时标签:通过关键字参数 _monitor_tags 传入,仅被装饰器消费,不会传给原函数。名称以单下划线前缀以尽量减少冲突。
  • 环境标识:env 默认读取 APP_ENV;如需严格区分(dev/staging/prod),建议在部署环境设置 APP_ENV。
  • 慢调用阈值选择:threshold_ms 应结合实际 SLO 制定;建议在不同端点/任务使用不同阈值。
  • 日志落地:默认 emit 写入 logging 的 perf.monitor logger,输出 JSON 字符串;可按需替换为写入文件、消息队列、APM 等。

如需扩展(例如 Prometheus 指标导出、异常单独告警、参数/返回值采样记录等),可以在现有结构化日志基础上对 emit/alert 进行替换或在装饰器中增加可选参数。

装饰器功能概述

  • 基于角色与权限点的访问控制装饰器,适用于管理后台操作与内部 API。
  • 支持:
    • 按角色(roles)与权限点(perms,全部必需)双重校验
    • 用户白名单(whitelist)优先放行
    • 从请求上下文提取用户(resolver,可自定义)
    • 未授权时返回统一错误(on_deny,可自定义)
  • 兼容同步与异步函数;使用 ContextVar 实现上下文隔离,避免并发污染。
  • 不破坏原函数的元信息(使用 functools.wraps),有良好可维护性与扩展性。

完整代码实现

from __future__ import annotations

from contextvars import ContextVar
from functools import wraps
from typing import Any, Callable, Iterable, Mapping, Optional, Set, Union, Dict, Tuple
import inspect

# ========== 上下文与类型 ==========

# 全局上下文变量:框架中可由中间件/依赖注入设置当前用户
CURRENT_USER: ContextVar[Optional[Any]] = ContextVar("CURRENT_USER", default=None)


def set_current_user(user: Any):
    """
    将用户写入上下文,返回 token 以便重置。
    用法:
        token = set_current_user(user)
        try:
            ...
        finally:
            CURRENT_USER.reset(token)
    """
    return CURRENT_USER.set(user)


# ========== 工具函数 ==========

def _to_str_set(values: Optional[Iterable[Union[str, Any]]]) -> Set[str]:
    """将可迭代对象转换为去重的字符串集合;None -> 空集合。"""
    if not values:
        return set()
    result: Set[str] = set()
    for v in values:
        if v is None:
            continue
        result.add(str(v))
    return result


def _get_from_attr_or_mapping(obj: Any, name: str, default: Any = None) -> Any:
    """
    兼容对象属性与字典取值。
    - 优先属性;其次 Mapping;否则 default。
    """
    if obj is None:
        return default
    if hasattr(obj, name):
        return getattr(obj, name)
    if isinstance(obj, Mapping):
        return obj.get(name, default)
    return default


def _extract_user_id(user: Any) -> Optional[str]:
    """
    从用户对象提取 ID。支持常见字段:id / user_id / uid。
    """
    for key in ("id", "user_id", "uid"):
        val = _get_from_attr_or_mapping(user, key)
        if val is not None:
            return str(val)
    return None


def _extract_roles(user: Any) -> Set[str]:
    """
    从用户对象提取角色集合。支持字段:
    - roles: Iterable[str]
    - role: str 或 Iterable[str]
    """
    roles = _get_from_attr_or_mapping(user, "roles")
    if roles is None:
        roles = _get_from_attr_or_mapping(user, "role")
    if isinstance(roles, str):
        return {roles}
    return _to_str_set(roles)


def _extract_perms(user: Any) -> Set[str]:
    """
    从用户对象提取权限点集合。支持字段:
    - perms / permissions / scopes: Iterable[str] 或空
    """
    for key in ("perms", "permissions", "scopes"):
        perms = _get_from_attr_or_mapping(user, key)
        if perms is not None:
            return _to_str_set(perms)
    return set()


def _default_resolver() -> Optional[Any]:
    """默认从 ContextVar 读取当前用户。"""
    return CURRENT_USER.get()


def _default_on_deny(
    *,
    func: Callable[..., Any],
    user: Optional[Any],
    reason: str,
    required: Dict[str, Set[str]],
    missing: Dict[str, Set[str]],
    context: Dict[str, Any],
) -> Dict[str, Any]:
    """
    默认拒绝处理器:返回统一结构字典。
    可在实际项目中用自定义 on_deny 替换(如抛 HTTP 异常)。
    """
    status = 401 if reason in ("NO_CONTEXT", "NO_USER") else 403
    messages = {
        "NO_CONTEXT": "未发现请求上下文用户,请先登录",
        "NO_USER": "未登录或登录已失效",
        "ROLE_MISMATCH": "角色不允许访问",
        "MISSING_PERMS": "缺少必要权限点",
    }
    return {
        "ok": False,
        "status": status,
        "error": {
            "code": "UNAUTHORIZED" if status == 401 else "FORBIDDEN",
            "message": messages.get(reason, "访问被拒绝"),
            "reason": reason,
            "required": {k: sorted(v) for k, v in required.items() if v},
            "missing": {k: sorted(v) for k, v in missing.items() if v},
        },
    }


# ========== 核心装饰器 ==========

def access_control(
    *,
    roles: Optional[Iterable[str]] = None,
    perms: Optional[Iterable[str]] = None,
    whitelist: Optional[Iterable[str]] = None,
    resolver: Optional[Callable[[], Any]] = None,
    on_deny: Optional[
        Callable[
            ...,
            # 形参采用关键字参数约定,便于扩展
        ]
    ] = None,
):
    """
    访问控制装饰器(支持同步/异步函数)。

    参数:
    - roles: 允许访问的角色列表(至少满足其一)。为空/None 表示不校验角色。
    - perms: 必需的权限点(全部必需)。为空/None 表示不校验权限点。
    - whitelist: 用户白名单(user_id 列表),命中白名单直接放行。
    - resolver: 上下文取用户的函数;默认从 ContextVar CURRENT_USER 获取。
    - on_deny: 自定义拒绝处理器;默认返回统一错误结构,可替换为抛出 HTTP 异常等。

    校验顺序:
    1) 无用户 -> 拒绝
    2) 白名单 -> 放行
    3) 角色校验(如配置)-> 不满足 -> 拒绝
    4) 权限点校验(如配置)-> 缺少 -> 拒绝
    5) 通过 -> 调用原函数
    """
    allowed_roles = _to_str_set(roles)
    required_perms = _to_str_set(perms)
    white_ids = _to_str_set(whitelist)

    _resolver = resolver or _default_resolver
    _on_deny = on_deny or _default_on_deny

    def _check(user: Any) -> Tuple[bool, str, Dict[str, Set[str]]]:
        """
        返回 (通过与否, 原因, 缺失明细)。
        原因:
        - NO_CONTEXT / NO_USER / ROLE_MISMATCH / MISSING_PERMS
        """
        if user is None:
            return False, "NO_USER", {"roles": set(), "perms": set()}

        # 白名单优先
        uid = _extract_user_id(user)
        if uid and uid in white_ids:
            return True, "", {"roles": set(), "perms": set()}

        # 角色校验
        if allowed_roles:
            user_roles = _extract_roles(user)
            if allowed_roles.isdisjoint(user_roles):
                return False, "ROLE_MISMATCH", {"roles": allowed_roles, "perms": set()}

        # 权限点校验(全部必需)
        if required_perms:
            user_perms = _extract_perms(user)
            missing = required_perms - user_perms
            if missing:
                return False, "MISSING_PERMS", {"roles": set(), "perms": missing}

        return True, "", {"roles": set(), "perms": set()}

    def decorator(func: Callable[..., Any]):
        is_async = inspect.iscoroutinefunction(func)

        if is_async:
            @wraps(func)
            async def async_wrapper(*args, **kwargs):
                user = _resolver()
                if user is None:
                    # 有 resolver 但返回 None 时统一视为 NO_USER
                    return _on_deny(
                        func=func,
                        user=None,
                        reason="NO_USER",
                        required={"roles": allowed_roles, "perms": required_perms},
                        missing={"roles": set(), "perms": set()},
                        context={"args": args, "kwargs": kwargs},
                    )

                ok, reason, missing = _check(user)
                if not ok:
                    return _on_deny(
                        func=func,
                        user=user,
                        reason=reason,
                        required={"roles": allowed_roles, "perms": required_perms},
                        missing=missing,
                        context={"args": args, "kwargs": kwargs},
                    )
                return await func(*args, **kwargs)

            return async_wrapper

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            user = _resolver()
            if user is None:
                return _on_deny(
                    func=func,
                    user=None,
                    reason="NO_USER",
                    required={"roles": allowed_roles, "perms": required_perms},
                    missing={"roles": set(), "perms": set()},
                    context={"args": args, "kwargs": kwargs},
                )

            ok, reason, missing = _check(user)
            if not ok:
                return _on_deny(
                    func=func,
                    user=user,
                    reason=reason,
                    required={"roles": allowed_roles, "perms": required_perms},
                    missing=missing,
                    context={"args": args, "kwargs": kwargs},
                )
            return func(*args, **kwargs)

        return sync_wrapper

    return access_control  # type: ignore[return-value]

使用示例

  • 示例 1:基础用法(同步函数)
# 假设你的用户结构
class User:
    def __init__(self, uid: str, roles, perms):
        self.id = uid
        self.roles = set(roles)
        self.perms = set(perms)

admin = User("1001", roles=["admin"], perms=["article.read", "article.write"])
guest = User("1002", roles=["guest"], perms=["article.read"])

# 设置当前用户(模拟中间件)
token = set_current_user(admin)

# 声明:需要 admin 角色,且需要 article.write 权限点
@access_control(roles=["admin"], perms=["article.write"])
def publish_article(title: str):
    return {"ok": True, "title": title}

print(publish_article("Hello"))  # -> {"ok": True, "title": "Hello"}

# 切换为 guest
CURRENT_USER.reset(token)
token = set_current_user(guest)
print(publish_article("Hello"))  # -> {"ok": False, "status": 403, "error": {...}}

CURRENT_USER.reset(token)
  • 示例 2:白名单优先(常用于紧急放行或审计账号)
vip = User("root", roles=["guest"], perms=[])
token = set_current_user(vip)

@access_control(whitelist=["root"], perms=["danger.op"])
def dangerous_op():
    return "ok"

print(dangerous_op())  # -> "ok"(虽然缺少权限,但在白名单内)
CURRENT_USER.reset(token)
  • 示例 3:异步函数(如在 FastAPI/ASGI 环境)
import asyncio

admin = {"user_id": "u1", "roles": ["admin"], "permissions": ["ops.view"]}

@access_control(roles=["admin"], perms=["ops.view"])
async def fetch_ops():
    await asyncio.sleep(0.01)
    return {"data": "secret"}

token = set_current_user(admin)
print(asyncio.run(fetch_ops()))  # -> {"data": "secret"}
CURRENT_USER.reset(token)
  • 示例 4:在框架中自定义 resolver 与 on_deny(以 FastAPI 为例)
# 1) 在中间件/依赖中设置 CURRENT_USER(伪代码)
# from fastapi import FastAPI, Request
# app = FastAPI()
# @app.middleware("http")
# async def inject_user(request: Request, call_next):
#     # 假设从 token 解出用户对象 user
#     token = set_current_user(user)
#     try:
#         return await call_next(request)
#     finally:
#         CURRENT_USER.reset(token)

# 2) 自定义拒绝处理为抛出 HTTP 异常
# from fastapi import HTTPException
def fastapi_on_deny(*, func, user, reason, required, missing, context):
    detail = {"reason": reason, "required": required, "missing": missing}
    status = 401 if reason in ("NO_CONTEXT", "NO_USER") else 403
    # 注意:此函数不依赖框架本身,示例中返回字典,真实项目可直接 raise HTTPException
    return {"detail": detail, "status_code": status}

# 3) 直接使用(resolver 默认从 CURRENT_USER)
@access_control(roles=["admin"], perms=["ops.manage"], on_deny=fastapi_on_deny)
def manage_ops():
    return {"ok": True}
  • 示例 5:在 Flask 中从 g 读取用户
# from flask import g
# def flask_resolver():
#     return getattr(g, "user", None)
#
# @access_control(resolver=flask_resolver, roles=["admin"])
# def admin_view():
#     return "ok"

技术说明

  • 设计要点

    • 参数标准化:在装饰器定义阶段将 roles/perms/whitelist 统一转为 set[str],避免每次调用重复处理,提升性能。
    • 访问校验顺序与短路策略:白名单优先、角色再到权限,失败时立即返回,减少无谓计算与函数执行风险(隔离敏感函数)。
    • 上下文隔离:使用 ContextVar 表达“请求上下文”,与线程/协程安全兼容,适用于 ASGI/WSGI 环境。
    • 兼容性:用户对象同时支持对象属性与字典形式;roles/perms 支持多种常见字段名(roles/role、perms/permissions/scopes)。
    • 元信息保留:使用 functools.wraps 保留原函数名、文档与注解,避免工具/框架反射失效。
    • 同步/异步双形态:根据 inspect.iscoroutinefunction 生成对应包装器,避免在同步代码中误用 await。
  • 错误处理

    • 默认 on_deny 返回统一结构(包含 status、reason、缺失明细),便于前后端一致处理。
    • 支持注入自定义 on_deny 来抛出框架特定的异常(如 FastAPI 的 HTTPException)或返回响应对象。

注意事项

  • 用户解析
    • 默认从 ContextVar CURRENT_USER 获取用户。请在中间件或依赖中正确设置它。
    • 用户对象需能解析出 id(id/user_id/uid)、roles(roles/role)、perms(perms/permissions/scopes)。
  • 白名单
    • whitelist 以用户 ID 为准,命中后直接放行,优先级最高。请谨慎维护。
  • 权限点策略
    • perms 为“全部必需”的语义;如需“任一满足”的策略,可在后续版本中扩展参数或自行复制装饰器修改。
  • 敏感函数隔离
    • 未设置用户上下文时默认拒绝,避免被内部代码绕过;确保所有入口(路由/任务)都正确注入用户上下文。
  • 返回类型
    • 默认 on_deny 返回字典;在具体框架中请替换为框架的响应/异常类型以保持一致。
  • 可测试性
    • 可通过 set_current_user 与 CURRENT_USER.reset 在单元测试中注入与还原用户上下文。
  • 性能
    • 参数集合在装饰期标准化,不在调用期重复构建。校验过程为 O(R+P) 集合运算,适用于大多数后台场景。
  • 安全
    • 严禁在 resolver/on_deny 内进行不受控的系统调用。确保拒绝路径不泄露敏感信息(如具体权限缺失的细粒度信息可按需裁剪)。

如需扩展更多策略(如租户隔离、资源级权限、多策略组合等),可以在当前实现的基础上增加参数与校验分支,并保持 on_deny 的可插拔性。

装饰器功能概述

本装饰器为纯函数与 I/O 密集函数提供可配置缓存,面向数据分析计算与三方接口聚合等高频重复计算/请求场景,支持同步与异步函数两种形态。核心特性:

  • 缓存后端可选:内存(LRU)或 Redis
  • TTL 过期控制
  • 自定义键生成(key)与条件跳过缓存(unless)
  • 命名空间(namespace)隔离与失效策略(按参失效/按命名空间清空)
  • 自定义序列化(serializer)
  • 防缓存击穿的同参并发锁(同步/异步分别处理)
  • 完整保留原函数元信息(name、docstring、annotations)

适用方向:

  • 数据分析中昂贵的统计/聚合计算
  • 对三方 API 的聚合/拼装(避免重复请求)
  • 复杂查询或跨系统接口的结果缓存

完整代码实现

from __future__ import annotations

import asyncio
import functools
import hashlib
import inspect
import pickle
import threading
import time
from collections import OrderedDict
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional, Protocol, Tuple, TypeVar, ParamSpec, Union

P = ParamSpec("P")
R = TypeVar("R")

# -------------------------
# Serializer abstraction
# -------------------------
class Serializer(Protocol):
    def dumps(self, obj: Any) -> bytes: ...
    def loads(self, data: bytes) -> Any: ...

class PickleSerializer:
    """默认序列化实现,使用 pickle 的最高协议。"""
    def dumps(self, obj: Any) -> bytes:
        return pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
    def loads(self, data: bytes) -> Any:
        return pickle.loads(data)

# -------------------------
# Cache Backend abstraction
# -------------------------
class BaseCacheBackend:
    """缓存后端抽象,提供同步与异步两套 API(异步默认调用同步实现)。"""

    def get(self, full_key: str) -> Any:
        raise NotImplementedError

    def set(self, full_key: str, value: Any, ttl: Optional[int]) -> None:
        raise NotImplementedError

    def delete(self, full_key: str) -> None:
        raise NotImplementedError

    def clear_namespace(self, namespace: str) -> None:
        raise NotImplementedError

    async def aget(self, full_key: str) -> Any:
        # 默认异步转同步
        return self.get(full_key)

    async def aset(self, full_key: str, value: Any, ttl: Optional[int]) -> None:
        self.set(full_key, value, ttl)

    async def adelete(self, full_key: str) -> None:
        self.delete(full_key)

    async def aclear_namespace(self, namespace: str) -> None:
        self.clear_namespace(namespace)

# -------------------------
# Memory LRU cache backend
# -------------------------
class MemoryCache(BaseCacheBackend):
    """线程安全的内存 LRU 缓存,带 TTL 与命名空间前缀。"""
    def __init__(self, maxsize: int = 500, namespace: str = "default") -> None:
        self.maxsize = maxsize
        self.namespace = namespace
        self._store: "OrderedDict[str, Tuple[Optional[float], Any]]" = OrderedDict()
        self._lock = threading.RLock()

    def _now(self) -> float:
        return time.time()

    def _is_expired(self, exp: Optional[float]) -> bool:
        return exp is not None and self._now() >= exp

    def _compose(self, key: str) -> str:
        return f"{self.namespace}:{key}"

    def _evict_if_needed(self) -> None:
        # 优先清理过期;否则按 LRU 淘汰
        if self.maxsize <= 0:
            self._store.clear()
            return
        # 先尽量清掉过期项
        keys_to_delete = [k for k, (exp, _) in self._store.items() if self._is_expired(exp)]
        for k in keys_to_delete:
            self._store.pop(k, None)
        while len(self._store) > self.maxsize:
            self._store.popitem(last=False)

    def get(self, full_key: str) -> Any:
        with self._lock:
            item = self._store.get(full_key)
            if not item:
                return None
            exp, value = item
            if self._is_expired(exp):
                self._store.pop(full_key, None)
                return None
            # 命中则移动到 LRU 末尾
            self._store.move_to_end(full_key, last=True)
            return value

    def set(self, full_key: str, value: Any, ttl: Optional[int]) -> None:
        with self._lock:
            exp = None if not ttl or ttl <= 0 else (self._now() + ttl)
            self._store[full_key] = (exp, value)
            self._store.move_to_end(full_key, last=True)
            self._evict_if_needed()

    def delete(self, full_key: str) -> None:
        with self._lock:
            self._store.pop(full_key, None)

    def clear_namespace(self, namespace: str) -> None:
        prefix = f"{namespace}:"
        with self._lock:
            keys = [k for k in self._store.keys() if k.startswith(prefix)]
            for k in keys:
                self._store.pop(k, None)

    # 异步 API 直接复用同步(内存操作非阻塞)
    # aget/aset/adelete/aclear_namespace 继承自父类的默认实现

# -------------------------
# Redis cache backend
# -------------------------
class RedisCache(BaseCacheBackend):
    """
    Redis 后端。
    - 支持 redis.Redis(同步)或 redis.asyncio.Redis(异步)
    - 若仅提供同步 client 且在异步调用中使用,会通过 asyncio.to_thread 转线程池避免阻塞事件循环
    """
    def __init__(self, client: Any, serializer: Serializer, namespace: str = "default") -> None:
        self.client = client
        self.serializer = serializer
        self.namespace = namespace
        # 检测是否是异步客户端(redis.asyncio)
        self._is_async_client = any(
            inspect.iscoroutinefunction(getattr(client, name, None))
            for name in ("get", "set", "delete", "scan", "scan_iter")
        )

    def _compose(self, key: str) -> str:
        return f"{self.namespace}:{key}"

    # ---- Sync API ----
    def get(self, full_key: str) -> Any:
        if self._is_async_client:
            # 避免 event loop 中误用同步 API
            raise RuntimeError("Async Redis client in sync context; use aget() wrapper.")
        data = self.client.get(full_key)
        if data is None:
            return None
        return self.serializer.loads(data)

    def set(self, full_key: str, value: Any, ttl: Optional[int]) -> None:
        if self._is_async_client:
            raise RuntimeError("Async Redis client in sync context; use aset() wrapper.")
        data = self.serializer.dumps(value)
        ex = ttl if ttl and ttl > 0 else None
        # ex=TTL 秒
        self.client.set(full_key, data, ex=ex)

    def delete(self, full_key: str) -> None:
        if self._is_async_client:
            raise RuntimeError("Async Redis client in sync context; use adelete() wrapper.")
        self.client.delete(full_key)

    def clear_namespace(self, namespace: str) -> None:
        if self._is_async_client:
            raise RuntimeError("Async Redis client in sync context; use aclear_namespace() wrapper.")
        prefix = f"{namespace}:"
        # 使用 scan_iter 避免大键空间阻塞
        if hasattr(self.client, "scan_iter"):
            for k in self.client.scan_iter(match=f"{prefix}*"):
                self.client.delete(k)
        else:
            # 回退到 scan 循环
            cursor = 0
            while True:
                cursor, keys = self.client.scan(cursor=cursor, match=f"{prefix}*", count=1000)
                if keys:
                    self.client.delete(*keys)
                if cursor == 0:
                    break

    # ---- Async API ----
    async def aget(self, full_key: str) -> Any:
        if self._is_async_client:
            data = await self.client.get(full_key)
        else:
            data = await asyncio.to_thread(self.client.get, full_key)
        if data is None:
            return None
        return self.serializer.loads(data)

    async def aset(self, full_key: str, value: Any, ttl: Optional[int]) -> None:
        data = self.serializer.dumps(value)
        if self._is_async_client:
            ex = ttl if ttl and ttl > 0 else None
            await self.client.set(full_key, data, ex=ex)
        else:
            await asyncio.to_thread(self.client.set, full_key, data, None if not ttl or ttl <= 0 else ttl)

    async def adelete(self, full_key: str) -> None:
        if self._is_async_client:
            await self.client.delete(full_key)
        else:
            await asyncio.to_thread(self.client.delete, full_key)

    async def aclear_namespace(self, namespace: str) -> None:
        prefix = f"{namespace}:"
        if self._is_async_client:
            # 优先使用异步 scan_iter
            if hasattr(self.client, "scan_iter"):
                async for k in self.client.scan_iter(match=f"{prefix}*"):
                    await self.client.delete(k)
            else:
                cursor = 0
                while True:
                    cursor, keys = await self.client.scan(cursor=cursor, match=f"{prefix}*", count=1000)
                    if keys:
                        await self.client.delete(*keys)
                    if cursor == 0:
                        break
        else:
            # 同步客户端转线程
            def _clear():
                if hasattr(self.client, "scan_iter"):
                    for k in self.client.scan_iter(match=f"{prefix}*"):
                        self.client.delete(k)
                else:
                    cursor = 0
                    while True:
                        cursor, keys = self.client.scan(cursor=cursor, match=f"{prefix}*", count=1000)
                        if keys:
                            self.client.delete(*keys)
                        if cursor == 0:
                            break
            await asyncio.to_thread(_clear)

# -------------------------
# Key builder
# -------------------------
def _stable_hash_bytes(obj: Any) -> bytes:
    """尽量稳定、可跨进程的参数摘要:优先 pickle;失败则退回 repr()."""
    try:
        return pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
    except Exception:
        return repr(obj).encode("utf-8", errors="ignore")

def default_key_builder(func: Callable[..., Any], namespace: str, args: Tuple[Any, ...], kwargs: Dict[str, Any]) -> str:
    """
    产生稳定且短小的 key:namespace + 函数标识 + 参数摘要(blake2b)。
    """
    hasher = hashlib.blake2b(digest_size=16)
    hasher.update(_stable_hash_bytes(args))
    # kwargs 按 key 排序
    if kwargs:
        items = tuple(sorted(kwargs.items(), key=lambda kv: kv[0]))
        hasher.update(_stable_hash_bytes(items))
    digest = hasher.hexdigest()
    fn_id = f"{func.__module__}.{getattr(func, '__qualname__', func.__name__)}"
    return f"{fn_id}:{digest}"

# -------------------------
# Decorator factory
# -------------------------
@dataclass
class CacheConfig:
    backend: str = "memory"  # "memory" | "redis"
    ttl: int = 60
    key: Optional[Callable[[Callable[..., Any], str, Tuple[Any, ...], Dict[str, Any]], str]] = None
    unless: Optional[Callable[..., bool]] = None    # True => 跳过缓存
    maxsize: int = 500                               # 仅对 memory 生效
    namespace: str = "default"
    serializer: Optional[Serializer] = None          # Redis 使用;若不传用 Pickle
    backend_options: Optional[Dict[str, Any]] = None # e.g. {'client': redis_client}

def cacheable(
    *,
    backend: str = "memory",
    ttl: int = 60,
    key: Optional[Callable[[Callable[..., Any], str, Tuple[Any, ...], Dict[str, Any]], str]] = None,
    unless: Optional[Callable[..., bool]] = None,
    maxsize: int = 500,
    namespace: str = "default",
    serializer: Optional[Serializer] = None,
    backend_options: Optional[Dict[str, Any]] = None,
) -> Callable[[Callable[P, R]], Callable[P, R]]:
    """
    可配置缓存装饰器:
    - backend: "memory" | "redis"
    - ttl: 秒;<=0 表示不过期(但仍可手动失效)
    - key: 自定义键函数 (func, namespace, args, kwargs) -> str
    - unless: 返回 True 则跳过缓存(不读不写)
    - maxsize: 内存 LRU 容量上限
    - namespace: 业务隔离用前缀
    - serializer: 序列化器(默认 PickleSerializer);Redis 必需/默认
    - backend_options: 后端额外配置,Redis 需提供 {'client': redis_client}
    """
    cfg = CacheConfig(
        backend=backend,
        ttl=ttl,
        key=key or default_key_builder,
        unless=unless,
        maxsize=maxsize,
        namespace=namespace,
        serializer=serializer or PickleSerializer(),
        backend_options=backend_options or {},
    )

    # 构造后端
    if cfg.backend not in ("memory", "redis"):
        raise ValueError("backend must be 'memory' or 'redis'.")

    if cfg.backend == "memory":
        cache_backend: BaseCacheBackend = MemoryCache(maxsize=cfg.maxsize, namespace=cfg.namespace)
    else:
        client = cfg.backend_options.get("client")
        if client is None:
            raise ValueError("For backend='redis', backend_options['client'] is required.")
        cache_backend = RedisCache(client=client, serializer=cfg.serializer, namespace=cfg.namespace)

    # 并发锁:按 key 细粒度
    _thread_locks: Dict[str, threading.Lock] = {}
    _async_locks: Dict[str, asyncio.Lock] = {}

    def _get_thread_lock(k: str) -> threading.Lock:
        # 轻量线程安全:双重检查
        lock = _thread_locks.get(k)
        if lock:
            return lock
        with threading.Lock():
            return _thread_locks.setdefault(k, threading.Lock())

    def _get_async_lock(k: str) -> asyncio.Lock:
        lock = _async_locks.get(k)
        if lock:
            return lock
        lock = asyncio.Lock()
        _async_locks[k] = lock
        return lock

    def decorator(func: Callable[P, R]) -> Callable[P, R]:
        is_coro = inspect.iscoroutinefunction(func)

        @functools.wraps(func)
        def sync_wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            # unless 为 True => 直接调用原函数
            if cfg.unless and cfg.unless(*args, **kwargs):
                return func(*args, **kwargs)

            k = cfg.key(func, cfg.namespace, args, kwargs)  # base key
            full_key = f"{cfg.namespace}:{k}"

            # 1st 尝试读取
            cached = cache_backend.get(full_key)
            if cached is not None:
                return cached

            # 防击穿:按 key 加锁
            lk = _get_thread_lock(full_key)
            with lk:
                # 进入临界区后再次尝试读取
                cached2 = cache_backend.get(full_key)
                if cached2 is not None:
                    return cached2
                # 计算与写入
                result = func(*args, **kwargs)
                cache_backend.set(full_key, result, cfg.ttl)
                return result

        @functools.wraps(func)
        async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            if cfg.unless and cfg.unless(*args, **kwargs):
                return await func(*args, **kwargs)  # type: ignore[misc]

            k = cfg.key(func, cfg.namespace, args, kwargs)
            full_key = f"{cfg.namespace}:{k}"

            cached = await cache_backend.aget(full_key)
            if cached is not None:
                return cached

            lk = _get_async_lock(full_key)
            async with lk:
                cached2 = await cache_backend.aget(full_key)
                if cached2 is not None:
                    return cached2
                result = await func(*args, **kwargs)  # type: ignore[misc]
                await cache_backend.aset(full_key, result, cfg.ttl)
                return result

        # 选择同步/异步 wrapper
        wrapper: Union[Callable[P, R], Callable[P, R]] = async_wrapper if is_coro else sync_wrapper

        # 失效 API 挂载(与函数形态一致)
        if is_coro:
            async def invalidate(*iargs: P.args, **ikwargs: P.kwargs) -> None:
                k = cfg.key(func, cfg.namespace, iargs, ikwargs)
                full_key = f"{cfg.namespace}:{k}"
                await cache_backend.adelete(full_key)

            async def clear_namespace() -> None:
                await cache_backend.aclear_namespace(cfg.namespace)

            setattr(wrapper, "invalidate", invalidate)
            setattr(wrapper, "clear_namespace", clear_namespace)
        else:
            def invalidate(*iargs: P.args, **ikwargs: P.kwargs) -> None:
                k = cfg.key(func, cfg.namespace, iargs, ikwargs)
                full_key = f"{cfg.namespace}:{k}"
                cache_backend.delete(full_key)

            def clear_namespace() -> None:
                cache_backend.clear_namespace(cfg.namespace)

            setattr(wrapper, "invalidate", invalidate)
            setattr(wrapper, "clear_namespace", clear_namespace)

        # 便于调试/扩展:暴露 backend 与配置
        setattr(wrapper, "cache_backend", cache_backend)
        setattr(wrapper, "cache_config", cfg)

        return wrapper  # type: ignore[return-value]

    return decorator

使用示例

示例 1:数据分析计算(同步),内存 LRU 缓存

# 假设为昂贵的聚合计算
@cacheable(
    backend="memory",
    ttl=120,               # 2 分钟
    maxsize=1000,
    namespace="analytics",
)
def heavy_agg(series: list[int], topk: int = 10) -> dict:
    time.sleep(0.5)        # 模拟昂贵计算
    sorted_vals = sorted(series, reverse=True)[:topk]
    return {"topk": sorted_vals, "sum": sum(series)}

# 调用
data = [1,2,3,4,5,6,7,8,9,10]
print(heavy_agg(data, topk=3))    # 首次 ~0.5s
print(heavy_agg(data, topk=3))    # 命中缓存,基本 0ms

# 按参数失效(仅移除对应参数组合的缓存)
heavy_agg.invalidate(data, topk=3)

# 按命名空间清空
heavy_agg.clear_namespace()

示例 2:三方接口聚合(异步),Redis 缓存

import asyncio
from typing import Any

# 使用 redis-py 4.x 的 asyncio 客户端
from redis.asyncio import Redis
redis_client = Redis(host="127.0.0.1", port=6379, db=0)

# unless:当 request_no_cache=True 时跳过缓存
def unless_no_cache(*args, **kwargs) -> bool:
    return kwargs.get("request_no_cache", False)

# 自定义键:以 user_id 与 path 构建
def api_key(func, namespace, args, kwargs) -> str:
    user_id = kwargs.get("user_id", "anon")
    path = kwargs.get("path", "/")
    # 与默认行为一致,末尾仍加入参数摘要,避免同 user_id/path 不同 query 的冲突
    return f"{func.__module__}.{func.__qualname__}:{user_id}:{path}:{hashlib.blake2b(pickle.dumps(kwargs, protocol=pickle.HIGHEST_PROTOCOL), digest_size=8).hexdigest()}"

@cacheable(
    backend="redis",
    ttl=30,
    key=api_key,
    unless=unless_no_cache,
    namespace="api-aggregate",
    backend_options={"client": redis_client},
)
async def aggregate_third_party(*, user_id: str, path: str, query: dict[str, Any]) -> dict[str, Any]:
    # 模拟聚合多个第三方 API
    await asyncio.sleep(0.2)
    return {"user": user_id, "path": path, "result": {"q": query}, "ts": time.time()}

async def main():
    print(await aggregate_third_party(user_id="42", path="/profile", query={"lang": "zh"}))  # 首次 ~0.2s
    print(await aggregate_third_party(user_id="42", path="/profile", query={"lang": "zh"}))  # 命中 Redis,基本 0ms

    # 跳过缓存强制刷新
    print(await aggregate_third_party(user_id="42", path="/profile", query={"lang": "zh"}, request_no_cache=True))

    # 失效与清理
    await aggregate_third_party.invalidate(user_id="42", path="/profile", query={"lang": "zh"})
    await aggregate_third_party.clear_namespace()

asyncio.run(main())

示例 3:为 Redis 指定 JSON 序列化(便于可视化)

import json

class JsonSerializer:
    def dumps(self, obj: Any) -> bytes:
        return json.dumps(obj, ensure_ascii=False, separators=(",", ":")).encode("utf-8")
    def loads(self, data: bytes) -> Any:
        return json.loads(data.decode("utf-8"))

json_serializer = JsonSerializer()

@cacheable(
    backend="redis",
    ttl=60,
    namespace="json-demo",
    serializer=json_serializer,
    backend_options={"client": redis_client},  # 同上示例
)
def expensive_config() -> dict:
    time.sleep(0.3)
    return {"feature": True, "version": 3, "meta": {"owner": "platform"}}

print(expensive_config())   # 首次
print(expensive_config())   # 命中

技术说明

  • 键生成(key)

    • 默认 key 使用函数全名(module + qualname)与参数摘要,摘要由 pickle 序列化失败后回退到 repr 的二次策略产生,再经 blake2b(digest_size=16) 得到短哈希,兼顾稳定性与冲突率。
    • 支持自定义 key(func, namespace, args, kwargs) -> str;装饰器会自动加上命名空间前缀 namespace:。
  • TTL 与过期

    • MemoryCache 在读时检测过期并懒删除;写入时存储过期时间戳。
    • RedisCache 通过 set(..., ex=ttl) 维护 TTL。ttl<=0 视为不过期(Memory 存 None,Redis 不设置 ex)。
  • 失效策略

    • wrapper.invalidate(*args, **kwargs):按原始参数组合失效(与 key 逻辑一致)。
    • wrapper.clear_namespace():清理当前命名空间下的全部 key。
    • 以上方法在异步函数上为异步方法(需 await)。
  • 并发与防击穿

    • 内部使用每个 key 一个细粒度锁:同步用 threading.Lock,异步用 asyncio.Lock。
    • Double-check:进入锁前后各读一次缓存,避免重复计算。
  • 序列化(serializer)

    • MemoryCache 直接缓存 Python 对象(无需序列化)。
    • RedisCache 使用 serializer 进行 bytes 化;默认 PickleSerializer。可自定义 JSON 等实现,需具备 dumps/loads。
  • 后端兼容

    • RedisCache 同时兼容 redis.Redis(同步)与 redis.asyncio.Redis(异步)。
    • 若异步函数搭配同步 redis 客户端,内部通过 asyncio.to_thread 转入线程池,避免阻塞事件循环。
  • 元信息保护

    • 使用 functools.wraps 保留原函数的名称、注释与类型注解。

注意事项

  • key 冲突与可序列化
    • 默认策略已尽量稳定,但若参数包含不可 pickle 的对象将回退到 repr,可能导致不同对象 repr 相同的极端冲突。对关键业务建议提供显式 key 函数。
  • Memory LRU 的 maxsize
    • 仅对内存后端有效;过大可能增加内存占用。系统进程内缓存,不跨进程共享。
  • Redis 客户端
    • 建议异步函数使用 redis.asyncio.Redis;同步函数使用 redis.Redis。若混用,同样可工作但会有线程切换开销。
  • unless 条件
    • unless(*args, **kwargs) 返回 True 时,本次调用完全绕过缓存(不读不写)。常用于强制刷新、灰度或带 no-cache 标志的请求。
  • TTL 配置
    • ttl<=0 表示长期缓存,需配合 invalidate/clear_namespace 管理生命周期。
  • 序列化安全
    • Pickle 在不受信任环境下反序列化存在风险;本实现仅在可信环境中使用,并不对外部输入进行反序列化。对跨边界场景建议使用 JSON 或其他安全序列化。
  • 大规模清理
    • Redis 的 clear_namespace 基于 SCAN/SCAN_ITER,已尽量避免阻塞,但在极大键空间仍可能造成一定负载;建议按业务分层 namespace。
  • 异常传播
    • 被装饰函数抛出的异常将原样传播,不会缓存失败结果。
  • 不修改函数签名
    • 装饰器不修改调用签名;失效等能力以方法属性形式暴露(invalidate/clear_namespace)。

如需扩展支持:

  • 互斥锁外溢(分布式锁)防止多进程下击穿,可在 Redis 上新增 SETNX 锁与过期机制
  • 写后异步回填、穿透保护(布隆过滤器)等,可在当前结构上叠加实现。

示例详情

解决的问题

将你的业务意图快速转化为可直接落地的 Python 装饰器方案:从需求澄清到结构化设计,再到完整代码、清晰注释、可运行示例与注意事项,一次生成;覆盖性能监控、权限校验、缓存、日志等高频场景;在安全与规范前提下,保证代码可读、可维护、可复用,统一团队风格,显著提升交付速度与质量。

适用用户

Python后端工程师

快速生成鉴权、日志、限流等装饰器,直接套用到接口与服务代码,减少样板代码并稳住线上表现。

数据工程师与数据科学家

为数据清洗与特征处理函数一键加缓存与计时,批处理更省时,异常定位有据可依,重复运行更可控。

Web框架开发者与中间件作者

产出路由、请求校验、权限控制装饰器,沉淀为团队公共组件,统一风格并提升复用率。

特征总结

按需生成专属装饰器,一键输出清晰代码与示例,开箱即可应用到现有项目
内置性能计时与日志方案,轻松包裹关键函数,定位慢点与异常更高效
支持权限校验与身份控制,几行配置即可保护接口与业务逻辑,减少绕过风险
可选缓存与重试策略,自动复用计算结果并应对临时失败,稳定性与速度同步提升
保留原函数名称与注释,兼容调试与文档生成,避免二次集成带来的维护负担
模板化参数配置,按场景组合限流、熔断、去重等能力,复杂规则也能轻松落地
自动产出注意事项与最佳实践,减少踩坑,帮助团队统一代码风格与使用方式
适配Web、数据处理与工具开发,一套思路覆盖多类项目,降低重复开发成本
从需求分析到示例验证全流程输出,确保装饰器可用、好读、易维护,快速上线
遵循安全边界与规范,默认避免高风险操作与过时用法,降低上线审查阻力

如何使用购买的提示词模板

1. 直接在外部 Chat 应用中使用

将模板生成的提示词复制粘贴到您常用的 Chat 应用(如 ChatGPT、Claude 等),即可直接对话使用,无需额外开发。适合个人快速体验和轻量使用场景。

2. 发布为 API 接口调用

把提示词模板转化为 API,您的程序可任意修改模板参数,通过接口直接调用,轻松实现自动化与批量处理。适合开发者集成与业务系统嵌入。

3. 在 MCP Client 中配置使用

在 MCP client 中配置对应的 server 地址,让您的 AI 应用自动调用提示词模板。适合高级用户和团队协作,让提示词在不同 AI 工具间无缝衔接。

AI 提示词价格
¥20.00元
先用后买,用好了再付款,超安全!

您购买后可以获得什么

获得完整提示词模板
- 共 584 tokens
- 3 个可调节参数
{ 装饰器功能 } { 应用场景 } { 参数配置 }
获得社区贡献内容的使用权
- 精选社区优质案例,助您快速上手提示词
使用提示词兑换券,低至 ¥ 9.9
了解兑换券 →
限时半价

不要错过!

半价获取高级提示词-优惠即将到期

17
:
23
小时
:
59
分钟
:
59