×
¥
查看详情
🔥 会员专享 文生文 代码

编程任务最佳实践指南

👁️ 510 次查看
📅 Oct 23, 2025
💡 核心价值: 根据用户指定的编程语言和具体开发任务,提供高效、安全、可维护的开发最佳实践,并配套简要说明和小型代码示例,帮助开发者快速理解并应用于实际项目中。

🎯 可自定义参数(3个)

编程语言
编程语言
任务目标
任务目标
示例风格
示例风格

🎨 效果示例

以下内容面向在 FastAPI 上实现完整鉴权与授权模块的工程实践,结合 JWT(访问令牌+刷新令牌)、Argon2id 密码哈希、RBAC/细粒度权限、Pydantic v2 校验、PostgreSQL+SQLAlchemy 2.x 异步、Redis 限流和刷新令牌黑名单、统一错误处理/结构化日志/OpenAPI 文档、pytest 单元与集成测试,并给出最小可运行示例。

一、总体设计与最佳实践要点

  • 身份认证与令牌
    • 使用短时访问令牌(例如 15 分钟)+ 长时刷新令牌(例如 7–30 天),刷新令牌仅用于轮换访问令牌。
    • JWT 建议包含:sub(用户ID)、jti(令牌ID)、type(access/refresh)、iat/nbf/exp、iss/aud、roles/permissions(仅访问令牌可携带)。
    • 使用非对称算法(RS256/EdDSA)或强 HMAC(HS256)并规划密钥轮换(kid 标识)。生产使用 JWKS 分发或 KMS 管理。
    • 刷新令牌应“单次使用+轮换”:每次刷新作废旧 jti 并生成新 jti;Redis 保存黑名单(jti->TTL=exp)。
  • 密码与登录安全
    • Argon2id 参数建议:time_cost=3、memory_cost=64MB(65536 KiB)、parallelism=2、salt_len=16、hash_len=32;在高并发环境按 CPU/内存调优并限制登录速率防止资源耗尽。
    • 登录限流:组合 username + IP 的固定窗口或滑动窗口计数;成功登录重置计数。
    • 登录失败不要暴露用户存在性;统一返回“用户名或密码错误”。
  • 授权(RBAC + 细粒度)
    • 模型:User 多对多 Roles;Roles 多对多 Permissions(如 "items:read"、"items:write"),便于细粒度授权。
    • 在依赖中加载 current_user 后进行权限检查;坚持“默认拒绝”原则,最小授权。
    • 对跨租户或资源级限制,增加资源所有权检查(如 user_id 与资源 owner_id 比较)。
  • 输入校验与防 SQL 注入
    • 使用 Pydantic v2 严格类型、字段校验器;对密码复杂度、Email 格式、枚举值等进行校验。
    • 所有数据库访问采用 ORM/参数化查询,避免字符串拼接。若 raw SQL 使用 text() + bindparam。
  • SQLAlchemy 2.x 异步 + PostgreSQL
    • 使用 AsyncEngine + async_sessionmaker;每请求一个 session(依赖注入),显式事务边界(async with session.begin():)。
    • 连接池:合理配置 pool_size、max_overflow、pool_timeout、pool_recycle;使用 asyncpg 驱动。
  • Redis
    • redis.asyncio 用于登录尝试计数与刷新令牌黑名单。key 设计包含租户/环境前缀。
    • 黑名单 TTL 应匹配刷新令牌 exp。
  • 统一错误处理与结构化日志
    • 定义统一错误响应结构:code/message/details/request_id。为常见领域错误定义异常类与全局异常处理器。
    • 结构化日志(JSON),注入 request_id、user_id、路径、耗时;避免记录敏感数据(密码、令牌)。
  • OpenAPI 文档
    • 为路由添加 tags、描述、响应模型与示例;用 response_model 限制输出。
  • 测试(pytest)
    • 单元测试:哈希/验证、JWT 签发与刷新、权限检查函数。
    • 集成测试:FastAPI app + httpx AsyncClient;准备临时 Postgres/Redis(推荐 Testcontainers)或使用依赖覆盖。
    • 覆盖鉴权中间件与限流逻辑,包括成功与异常分支。
  • 配置分层与密钥管理
    • pydantic-settings 管理配置:Base + Dev + Prod;环境变量/容器秘密/文件;区分敏感字段。
    • 密钥从环境或密钥管理服务加载;计划轮换;最小可见范围;不写入日志。
  • 容器化与部署安全清单
    • 使用瘦基镜像(python:3.x-slim/Distroless);只复制必要文件;多阶段构建;只读根文件系统。
    • 以非 root 用户运行;限制 Linux capabilities;启用 seccomp/AppArmor。
    • 依赖锁定与漏洞扫描(pip-tools/SBOM);禁用调试/自动文档在生产可按策略限制。
    • 强制 HTTPS/TLS;CORS 白名单;若使用 Cookie 承载刷新令牌则必须 HttpOnly/SameSite + CSRF 防护。
    • 资源与速率限制(CPU/Mem、连接池上限);健康检查与熔断;监控告警。
    • 数据库权限最小化(应用用户仅 DML,不授予 DDL);参数化查询;启用审计日志。

二、最小可运行示例(FastAPI + SQLAlchemy 2.x 异步 + Redis) 提示:

  • 需要本地 PostgreSQL 和 Redis 服务。
  • 仅为演示,将 JWT 使用 HS256。生产推荐非对称算法与密钥轮换。
  • Alembic 迁移未包含,示例在启动时自动建表并创建一个演示用户。

文件:app.py (运行:pip install "fastapi>=0.115" "uvicorn[standard]" "sqlalchemy[asyncpg]>=2.0" "pydantic>=2" "pydantic-settings>=2" "argon2-cffi" "PyJWT" "redis>=5" "structlog")

# app.py
import asyncio
import logging
import time
import uuid
from datetime import datetime, timedelta
from typing import Annotated, Optional, Sequence

import jwt  # PyJWT
import structlog
from fastapi import FastAPI, Depends, HTTPException, status, Request
from fastapi.responses import JSONResponse
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel, EmailStr, Field, SecretStr, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
from redis.asyncio import Redis
from sqlalchemy import ForeignKey, String, select, text
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship

# -------------------- Config --------------------
class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_prefix="APP_", env_file=".env", env_file_encoding="utf-8")

    # App
    app_name: str = "Auth Service"
    environment: str = "dev"

    # DB
    db_url: str = "postgresql+asyncpg://postgres:postgres@localhost:5432/appdb"
    db_pool_size: int = 5
    db_max_overflow: int = 5
    db_pool_timeout: int = 30
    db_pool_recycle: int = 1800

    # Redis
    redis_url: str = "redis://localhost:6379/0"
    rate_limit_login_per_minute: int = 5

    # JWT
    jwt_secret: str = "CHANGE_ME_IN_PROD"  # 生产从 KMS/Secrets Manager 加载
    jwt_kid: str = "kid-1"
    access_token_ttl_minutes: int = 15
    refresh_token_ttl_days: int = 7
    jwt_algorithm: str = "HS256"
    jwt_issuer: str = "your-company"
    jwt_audience: str = "internal-platform"

    # Argon2id
    argon_time_cost: int = 3
    argon_memory_cost: int = 65536  # KiB = 64 MiB
    argon_parallelism: int = 2
    argon_hash_len: int = 32
    argon_salt_len: int = 16

settings = Settings()

# -------------------- Logging --------------------
# 结构化日志:注入 request_id & user_id
logging.basicConfig(level=logging.INFO)
structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.add_log_level,
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(),
    ]
)
log = structlog.get_logger()

# -------------------- DB Setup --------------------
class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    password_hash: Mapped[str] = mapped_column(String(255))
    is_active: Mapped[bool] = mapped_column(default=True)
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
    last_login: Mapped[Optional[datetime]] = mapped_column(default=None)

    roles: Mapped[Sequence["Role"]] = relationship(
        secondary="user_roles", back_populates="users", lazy="selectin"
    )

class Role(Base):
    __tablename__ = "roles"
    id: Mapped[int] = mapped_column(primary_key=True)
    code: Mapped[str] = mapped_column(String(64), unique=True, index=True)
    users: Mapped[Sequence[User]] = relationship(
        secondary="user_roles", back_populates="roles", lazy="selectin"
    )
    permissions: Mapped[Sequence["Permission"]] = relationship(
        secondary="role_permissions", back_populates="roles", lazy="selectin"
    )

class Permission(Base):
    __tablename__ = "permissions"
    id: Mapped[int] = mapped_column(primary_key=True)
    code: Mapped[str] = mapped_column(String(64), unique=True, index=True)
    roles: Mapped[Sequence[Role]] = relationship(
        secondary="role_permissions", back_populates="permissions", lazy="selectin"
    )

class UserRole(Base):
    __tablename__ = "user_roles"
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), primary_key=True)
    role_id: Mapped[int] = mapped_column(ForeignKey("roles.id"), primary_key=True)

class RolePermission(Base):
    __tablename__ = "role_permissions"
    role_id: Mapped[int] = mapped_column(ForeignKey("roles.id"), primary_key=True)
    permission_id: Mapped[int] = mapped_column(ForeignKey("permissions.id"), primary_key=True)

engine: AsyncEngine = create_async_engine(
    settings.db_url,
    pool_size=settings.db_pool_size,
    max_overflow=settings.db_max_overflow,
    pool_timeout=settings.db_pool_timeout,
    pool_recycle=settings.db_pool_recycle,
    echo=False,
)
SessionLocal = async_sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)

async def get_session() -> AsyncSession:
    async with SessionLocal() as session:
        yield session  # 请求结束自动关闭

# -------------------- Redis --------------------
redis: Redis | None = None
async def get_redis() -> Redis:
    assert redis is not None
    return redis

# -------------------- Security --------------------
from argon2 import PasswordHasher
ph = PasswordHasher(
    time_cost=settings.argon_time_cost,
    memory_cost=settings.argon_memory_cost,
    parallelism=settings.argon_parallelism,
    hash_len=settings.argon_hash_len,
    salt_len=settings.argon_salt_len,
)

def hash_password(password: str) -> str:
    return ph.hash(password)

def verify_password(password: str, hashed: str) -> bool:
    try:
        return ph.verify(hashed, password)
    except Exception:
        return False

def now_ts() -> int:
    return int(time.time())

def create_access_token(sub: str, roles: list[str], perms: list[str]) -> str:
    jti = str(uuid.uuid4())
    payload = {
        "sub": sub,
        "jti": jti,
        "type": "access",
        "roles": roles,
        "perms": perms,
        "iat": now_ts(),
        "nbf": now_ts(),
        "exp": int((datetime.utcnow() + timedelta(minutes=settings.access_token_ttl_minutes)).timestamp()),
        "iss": settings.jwt_issuer,
        "aud": settings.jwt_audience,
    }
    headers = {"kid": settings.jwt_kid}
    return jwt.encode(payload, settings.jwt_secret, algorithm=settings.jwt_algorithm, headers=headers)

def create_refresh_token(sub: str) -> tuple[str, str, int]:
    jti = str(uuid.uuid4())
    exp = int((datetime.utcnow() + timedelta(days=settings.refresh_token_ttl_days)).timestamp())
    payload = {
        "sub": sub,
        "jti": jti,
        "type": "refresh",
        "iat": now_ts(),
        "nbf": now_ts(),
        "exp": exp,
        "iss": settings.jwt_issuer,
        "aud": settings.jwt_audience,
    }
    headers = {"kid": settings.jwt_kid}
    token = jwt.encode(payload, settings.jwt_secret, algorithm=settings.jwt_algorithm, headers=headers)
    return token, jti, exp

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")  # Swagger 上用于授权按钮

# -------------------- Schemas (Pydantic v2) --------------------
class TokenPair(BaseModel):
    access_token: str = Field(..., description="短时访问令牌")
    refresh_token: str = Field(..., description="长时刷新令牌")
    token_type: str = "bearer"
    expires_in: int = Field(..., description="访问令牌剩余秒数")

class LoginRequest(BaseModel):
    email: EmailStr
    password: SecretStr

    @field_validator("password")
    @classmethod
    def validate_password(cls, v: SecretStr):
        s = v.get_secret_value()
        if len(s) < 8 or not any(c.isdigit() for c in s) or not any(c.isalpha() for c in s):
            raise ValueError("密码至少8位,包含字母与数字")
        return v

class MeResponse(BaseModel):
    id: int
    email: EmailStr
    roles: list[str]
    permissions: list[str]

# -------------------- Error Handling --------------------
class AppError(Exception):
    def __init__(self, message: str, code: str = "bad_request", status_code: int = 400, details: dict | None = None):
        self.message = message
        self.code = code
        self.status_code = status_code
        self.details = details or {}

app = FastAPI(title=settings.app_name)

@app.exception_handler(AppError)
async def app_error_handler(request: Request, exc: AppError):
    rid = request.headers.get("X-Request-ID") or str(uuid.uuid4())
    return JSONResponse(
        status_code=exc.status_code,
        content={"code": exc.code, "message": exc.message, "details": exc.details, "request_id": rid},
    )

@app.middleware("http")
async def log_middleware(request: Request, call_next):
    rid = request.headers.get("X-Request-ID") or str(uuid.uuid4())
    start = time.perf_counter()
    try:
        response = await call_next(request)
    finally:
        duration = (time.perf_counter() - start) * 1000
        structlog.contextvars.clear_contextvars()
        log.info("request",
                 request_id=rid, method=request.method, path=request.url.path,
                 status=getattr(response, "status_code", None), duration_ms=round(duration, 2))
    return response

# -------------------- Auth Dependencies --------------------
async def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)],
    session: Annotated[AsyncSession, Depends(get_session)],
) -> tuple[User, list[str], list[str]]:
    # 解析访问令牌
    try:
        payload = jwt.decode(
            token,
            settings.jwt_secret,
            algorithms=[settings.jwt_algorithm],
            audience=settings.jwt_audience,
            issuer=settings.jwt_issuer,
            options={"require": ["exp", "sub", "type"], "verify_exp": True},
        )
    except jwt.PyJWTError:
        raise HTTPException(status_code=401, detail="令牌无效或已过期")
    if payload.get("type") != "access":
        raise HTTPException(status_code=401, detail="访问令牌类型错误")

    user_id = int(payload["sub"])
    user = await session.get(User, user_id)
    if not user or not user.is_active:
        raise HTTPException(status_code=401, detail="用户不可用")
    # 加载角色与权限(避免将权限完全信赖客户端令牌内容)
    roles = [r.code for r in user.roles]
    perm_codes: set[str] = set()
    for r in user.roles:
        for p in r.permissions:
            perm_codes.add(p.code)
    return user, roles, sorted(list(perm_codes))

def require_roles(required: list[str]):
    async def dep(ctx: Annotated[tuple[User, list[str], list[str]], Depends(get_current_user)]):
        user, roles, perms = ctx
        if not set(required).issubset(set(roles)):
            raise HTTPException(status_code=403, detail="缺少角色")
        return ctx
    return dep

def require_permissions(required: list[str]):
    async def dep(ctx: Annotated[tuple[User, list[str], list[str]], Depends(get_current_user)]):
        user, roles, perms = ctx
        if not set(required).issubset(set(perms)):
            raise HTTPException(status_code=403, detail="缺少权限")
        return ctx
    return dep

# -------------------- Rate Limiting & Blacklist --------------------
async def check_login_rate_limit(r: Redis, username: str, ip: str):
    # 固定窗口:每分钟最多 N 次
    key = f"rl:login:{username}:{ip}"
    count = await r.incr(key)
    if count == 1:
        await r.expire(key, 60)
    if count > settings.rate_limit_login_per_minute:
        raise AppError("登录过于频繁,请稍后再试", code="too_many_requests", status_code=429)

async def blacklist_refresh_jti(r: Redis, jti: str, exp_ts: int):
    ttl = max(exp_ts - now_ts(), 1)
    await r.setex(f"rt:blacklist:{jti}", ttl, "1")

async def is_refresh_jti_blacklisted(r: Redis, jti: str) -> bool:
    return await r.exists(f"rt:blacklist:{jti}") == 1

# -------------------- Routes --------------------
@app.post("/auth/login", response_model=TokenPair, tags=["auth"])
async def login(
    req: LoginRequest,
    request: Request,
    session: Annotated[AsyncSession, Depends(get_session)],
    r: Annotated[Redis, Depends(get_redis)],
):
    ip = request.client.host or "unknown"
    await check_login_rate_limit(r, req.email, ip)

    # 查找用户(使用 ORM 防注入)
    stmt = select(User).where(User.email == req.email)
    user = (await session.execute(stmt)).scalar_one_or_none()
    if not user or not verify_password(req.password.get_secret_value(), user.password_hash):
        # 不泄露账号存在性
        raise HTTPException(status_code=401, detail="用户名或密码错误")

    if not user.is_active:
        raise HTTPException(status_code=403, detail="账号已禁用")

    # 加载角色与权限(懒加载已设置为 selectin)
    roles = [r.code for r in user.roles]
    perms = sorted(set(p.code for r in user.roles for p in r.permissions))

    # 更新最后登录时间(事务)
    async with session.begin():
        user.last_login = datetime.utcnow()

    access = create_access_token(str(user.id), roles, perms)
    refresh, jti, exp_ts = create_refresh_token(str(user.id))
    # 选项:可记录 session/设备;此处仅演示不持久化
    expires_in = settings.access_token_ttl_minutes * 60
    return TokenPair(access_token=access, refresh_token=refresh, expires_in=expires_in)

class RefreshRequest(BaseModel):
    refresh_token: str = Field(..., description="刷新令牌")

@app.post("/auth/refresh", response_model=TokenPair, tags=["auth"])
async def refresh_tokens(
    body: RefreshRequest,
    session: Annotated[AsyncSession, Depends(get_session)],
    r: Annotated[Redis, Depends(get_redis)],
):
    # 验证刷新令牌
    try:
        payload = jwt.decode(
            body.refresh_token,
            settings.jwt_secret,
            algorithms=[settings.jwt_algorithm],
            audience=settings.jwt_audience,
            issuer=settings.jwt_issuer,
            options={"require": ["exp", "sub", "type"], "verify_exp": True},
        )
    except jwt.PyJWTError:
        raise HTTPException(status_code=401, detail="刷新令牌无效或已过期")
    if payload.get("type") != "refresh":
        raise HTTPException(status_code=401, detail="令牌类型错误")

    jti = payload["jti"]
    if await is_refresh_jti_blacklisted(r, jti):
        raise HTTPException(status_code=401, detail="刷新令牌已失效")

    user = await session.get(User, int(payload["sub"]))
    if not user or not user.is_active:
        raise HTTPException(status_code=401, detail="用户不可用")

    roles = [r.code for r in user.roles]
    perms = sorted(set(p.code for r in user.roles for p in r.permissions))

    # 令牌轮换:作废旧刷新令牌
    await blacklist_refresh_jti(r, jti, payload["exp"])
    access = create_access_token(str(user.id), roles, perms)
    new_refresh, new_jti, new_exp = create_refresh_token(str(user.id))
    return TokenPair(access_token=access, refresh_token=new_refresh, expires_in=settings.access_token_ttl_minutes * 60)

class LogoutRequest(BaseModel):
    refresh_token: str

@app.post("/auth/logout", tags=["auth"])
async def logout(body: LogoutRequest, r: Annotated[Redis, Depends(get_redis)]):
    # 注销:将刷新令牌 jti 加入黑名单。访问令牌让其自然过期。
    try:
        payload = jwt.decode(
            body.refresh_token,
            settings.jwt_secret,
            algorithms=[settings.jwt_algorithm],
            audience=settings.jwt_audience,
            issuer=settings.jwt_issuer,
            options={"require": ["exp", "sub", "type"], "verify_exp": True},
        )
    except jwt.PyJWTError:
        # 静默处理避免提示有效性
        raise HTTPException(status_code=200, detail="已退出")  # 也可返回 204

    if payload.get("type") != "refresh":
        return {"message": "已退出"}

    await blacklist_refresh_jti(r, payload["jti"], payload["exp"])
    return {"message": "已退出"}

@app.get("/me", response_model=MeResponse, tags=["users"])
async def me(ctx: Annotated[tuple[User, list[str], list[str]], Depends(get_current_user)]):
    user, roles, perms = ctx
    return MeResponse(id=user.id, email=user.email, roles=roles, permissions=perms)

@app.get("/admin/secret", tags=["admin"])
async def admin_secret(_: Annotated[tuple[User, list[str], list[str]], Depends(require_roles(["admin"]))]):
    return {"secret": "admin-only"}

@app.get("/items", tags=["items"])
async def list_items(_: Annotated[tuple[User, list[str], list[str]], Depends(require_permissions(["items:read"]))]):
    # 示例:如需 raw SQL,务必使用 text() 与参数绑定防止注入
    # stmt = text("SELECT * FROM items WHERE owner_id = :uid").bindparams(uid=current_user.id)
    return {"items": []}

# -------------------- Lifecycle --------------------
@app.on_event("startup")
async def on_startup():
    global redis
    redis = Redis.from_url(settings.redis_url, decode_responses=True)
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    # 演示数据:创建一个 admin 用户及权限
    async with SessionLocal() as session:
        async with session.begin():
            # 如果没有用户,创建一个
            exists = (await session.execute(select(User).limit(1))).scalar_one_or_none()
            if not exists:
                admin_role = Role(code="admin")
                p_read = Permission(code="items:read")
                p_write = Permission(code="items:write")
                admin_role.permissions.extend([p_read, p_write])
                u = User(email="admin@example.com", password_hash=hash_password("Passw0rd!"))
                u.roles.append(admin_role)
                session.add_all([u])
    log.info("startup_complete", environment=settings.environment)

@app.on_event("shutdown")
async def on_shutdown():
    if redis:
        await redis.close()
    await engine.dispose()

三、Pydantic v2、SQL 注入与事务的要点示例

  • 字段校验器(v2)与严格类型
class CreateItem(BaseModel):
    name: str = Field(min_length=1, max_length=64)
    price: float = Field(gt=0)
    owner_id: int

    @field_validator("name")
    @classmethod
    def no_dangerous_chars(cls, v: str):
        if ";" in v or "--" in v:
            raise ValueError("非法字符")
        return v
  • 原生 SQL 的安全用法
from sqlalchemy import text, bindparam
stmt = text("UPDATE users SET last_login = NOW() WHERE id = :uid").bindparams(bindparam("uid", value=user_id))
await session.execute(stmt)
await session.commit()
  • 异步事务边界(单元工作)
async with session.begin():
    session.add(entity)
    # 多条写操作在同一事务中

四、OpenAPI 文档与示例提示

  • 在路由中使用 response_model、tags、描述;为请求/响应模型添加 Field 的描述与示例。
  • 生产可限制文档访问(如仅内网/带授权访问)。

五、测试(pytest/pytest-asyncio) 建议使用 Testcontainers 启动真实 Postgres 与 Redis;这里给出一个基础集成测试示例,使用 httpx AsyncClient 调用登录与 /me: 安装:pip install pytest pytest-asyncio httpx

文件:tests/test_auth.py

import pytest
import asyncio
from httpx import AsyncClient
from app import app  # 假设示例文件名为 app.py

@pytest.mark.asyncio
async def test_login_and_me():
    async with AsyncClient(app=app, base_url="http://test") as client:
        # 登录
        resp = await client.post("/auth/login", json={"email": "admin@example.com", "password": "Passw0rd!"})
        assert resp.status_code == 200
        tokens = resp.json()
        assert "access_token" in tokens
        # 访问 /me
        headers = {"Authorization": f"Bearer {tokens['access_token']}"}
        me = await client.get("/me", headers=headers)
        assert me.status_code == 200
        assert me.json()["email"] == "admin@example.com"

测试覆盖建议:

  • 密码哈希与验证(正确/错误);
  • 访问令牌过期与刷新令牌轮换(旧 jti 被黑名单);
  • 登录限流逻辑(触发 429);
  • RBAC 与权限拒绝(返回 403);
  • 统一错误响应结构与日志中 request_id 注入。

六、密钥管理、配置分层、容器化与部署安全清单

  • 密钥管理
    • 不将密钥写入仓库;从环境变量/密钥管理服务(AWS KMS/Secrets Manager、HashiCorp Vault)加载。
    • 启用密钥轮换:JWT header 使用 kid;后端支持多活密钥验证(旧密钥验证、用新密钥签发)。
    • 日志中永不打印令牌或密码;对 JWT 仅记录 jti、sub。
  • 配置分层
    • BaseSettings + env 前缀分层;区分 Dev/Staging/Prod;对敏感字段(jwt_secret、db_url)单独密文存储。
    • 使用配置模板与默认安全值(严格 CORS、debug=false)。
  • 容器化与部署安全
    • 非 root 用户运行;裁剪镜像;只读文件系统;限制 /tmp 写入。
    • 依赖锁定(requirements.txt/poetry.lock);CI 扫描漏洞;生成 SBOM。
    • 启用 TLS;在反向代理/Nginx/Ingress 上强制 HSTS;限制 CORS。
    • 数据库最小权限账户;隔离网络;监控连接池指标与慢查询。
    • 资源配额与限流(登录、刷新、敏感路由);报警与审计日志。
    • 打包时排除 .env 等敏感文件;在云环境中通过 secrets 注入。

七、补充建议

  • Alembic 迁移与种子数据脚本;在迁移层面维护角色/权限。
  • 对多租户系统,在 JWT 中加入 tenant_id 并在查询层过滤;权限包含资源归属校验。
  • 若前端为浏览器 SPA,推荐将刷新令牌置于 HttpOnly Cookie 并启用 CSRF(令牌双提交等);访问令牌仍用 Authorization Bearer。

该示例可直接运行(需本机 PostgreSQL 与 Redis),展示路由、依赖注入、令牌签发/刷新与退出登录,满足需求中的关键要点。实际生产请替换 HS256 为非对称算法、接入密钥托管、完善日志/监控、使用迁移与更全面的测试。

以下是面向 React + TypeScript 的多步骤注册表单的实用模板与常见最佳实践,覆盖:Zod 共享校验、React Hook Form 状态管理(含动态字段与文件上传)、无障碍支持、用户名查重的防抖与取消、XSS/CSRF 防护、输入掩码与 i18n、代码分割、Vitest/Playwright 测试,以及可复用控件、提交流程与错误边界示例。

总原则

  • 单一数据源与类型安全:用 Zod 定义前后端共享 schema,类型推导贯穿 UI、API、DB。
  • 按步拆分:每一步独立 schema、独立组件,最后统一合并验证。
  • 组件化与可复用:基础控件 Input/Select/FileUpload 与 FormProvider 结合。
  • 无障碍优先:标签关联、错误提示关联、键盘导航可达、进度区域朗读。
  • 网络健壮性:防抖、AbortController、幂等提交、错误边界。
  • 安全与隐私:默认不使用 innerHTML、CSRF token、Content-Security-Policy、文件类型/大小限制。
  • 性能:懒加载步骤、预加载下一步、只注册可见字段(shouldUnregister)。
  1. Zod 共享校验 schema(前后端复用)
  • 前端/后端共用同一 schema 文件,前端用 @hookform/resolvers/zod,后端在路由层校验。
  • 按步骤定义子 schema,最终合并。

示例:shared/schemas/registration.ts

import { z } from "zod";

export const UsernameSchema = z
  .string()
  .min(3, "用户名至少 3 个字符")
  .max(20, "用户名最多 20 个字符")
  .regex(/^[a-zA-Z0-9_]+$/, "仅限字母、数字和下划线");

export const StepAccountSchema = z.object({
  email: z.string().email("邮箱格式不正确"),
  username: UsernameSchema,
  password: z
    .string()
    .min(8, "密码至少 8 位")
    .regex(/[A-Z]/, "至少包含一个大写字母")
    .regex(/[a-z]/, "至少包含一个小写字母")
    .regex(/\d/, "至少包含一个数字")
    .regex(/[^A-Za-z0-9]/, "至少包含一个特殊字符"),
  confirmPassword: z.string(),
  agree: z.literal(true, { errorMap: () => ({ message: "需同意协议" }) }),
}).refine((d) => d.password === d.confirmPassword, {
  path: ["confirmPassword"],
  message: "两次密码不一致",
});

export const StepProfileSchema = z.object({
  firstName: z.string().min(1, "必填"),
  lastName: z.string().min(1, "必填"),
  birthDate: z.string().optional(), // 前端字符串,后端可 transform 为 Date
  phone: z
    .string()
    .min(10, "电话长度不正确")
    .regex(/^[0-9\-+\s()]+$/ , "电话格式不正确"),
  accountType: z.enum(["personal", "company"]),
  companyName: z.string().optional(),
}).refine((d) => d.accountType === "personal" || !!d.companyName, {
  path: ["companyName"],
  message: "公司帐号需填写公司名称",
});

export const StepPreferencesSchema = z.object({
  newsletter: z.boolean().default(false),
  language: z.enum(["zh-CN", "en-US"]).default("zh-CN"),
});

const acceptedImage = ["image/png", "image/jpeg"];
const acceptedDoc = ["application/pdf"];

export const StepDocumentsSchema = z.object({
  avatar: z
    .instanceof(File)
    .optional()
    .refine((f) => !f || (acceptedImage.includes(f.type) && f.size <= 2 * 1024 * 1024), "头像需 PNG/JPEG 且不超过 2MB"),
  idDocument: z
    .instanceof(File)
    .refine((f) => acceptedDoc.includes(f.type) && f.size <= 5 * 1024 * 1024, "证件需 PDF 且不超过 5MB"),
});

export const RegistrationSchema = StepAccountSchema
  .and(StepProfileSchema)
  .and(StepPreferencesSchema)
  .and(StepDocumentsSchema);

export type RegistrationData = z.infer<typeof RegistrationSchema>;

后端(示例 Express):

// server/routes/register.ts
import { RegistrationSchema } from "../../shared/schemas/registration";
import type { Request, Response } from "express";

export async function registerHandler(req: Request, res: Response) {
  try {
    const parsed = RegistrationSchema.parse({
      ...req.body,
      // 若使用 multipart,需先通过 multer 获取文件对象
      avatar: req.file?.avatar, 
      idDocument: req.file?.idDocument,
    });
    // TODO: 持久化
    res.status(201).json({ ok: true });
  } catch (e) {
    res.status(400).json({ ok: false, errors: e });
  }
}
  1. React Hook Form 管理状态(动态字段、文件上传、进度展示)
  • 使用 FormProvider 共享上下文;每步独立组件,自身只关心其字段。
  • shouldUnregister: true 让隐藏字段自动移除,避免提交冗余数据。
  • 文件上传建议单独组件,前端先校验类型/大小,再上传(带进度)。

示例:Form 容器与步骤懒加载

// App.tsx
import { Suspense, lazy, useMemo, useState } from "react";
import { useForm, FormProvider } from "react-hook-form";
import { zodResolver } from "@hookform/resolvers/zod";
import { RegistrationSchema, StepAccountSchema, StepProfileSchema, StepPreferencesSchema, StepDocumentsSchema, type RegistrationData } from "../shared/schemas/registration";

const StepAccount = lazy(() => import("./steps/StepAccount"));
const StepProfile = lazy(() => import("./steps/StepProfile"));
const StepPreferences = lazy(() => import("./steps/StepPreferences"));
const StepDocuments = lazy(() => import("./steps/StepDocuments"));
const Review = lazy(() => import("./steps/Review"));

type StepKey = "account" | "profile" | "preferences" | "documents" | "review";
const stepOrder: StepKey[] = ["account", "profile", "preferences", "documents", "review"];
const stepResolvers = {
  account: StepAccountSchema,
  profile: StepProfileSchema,
  preferences: StepPreferencesSchema,
  documents: StepDocumentsSchema,
  review: RegistrationSchema, // 最终全量校验
};

export default function App() {
  const [step, setStep] = useState<StepKey>("account");
  const methods = useForm<RegistrationData>({
    resolver: zodResolver(stepResolvers[step]),
    mode: "onBlur",
    shouldUnregister: true,
    defaultValues: { newsletter: false, language: "zh-CN" } as Partial<RegistrationData>,
  });

  const next = async () => {
    const valid = await methods.trigger(); // 仅触发当前步骤校验
    if (!valid) return;
    const idx = stepOrder.indexOf(step);
    setStep(stepOrder[idx + 1] ?? step);
  };
  const back = () => {
    const idx = stepOrder.indexOf(step);
    setStep(stepOrder[idx - 1] ?? step);
  };

  return (
    <FormProvider {...methods}>
      <form onSubmit={methods.handleSubmit(() => setStep("review"))} noValidate>
        <Suspense fallback={<div aria-busy="true">加载中…</div>}>
          {step === "account" && <StepAccount onNext={next} />}
          {step === "profile" && <StepProfile onNext={next} onBack={back} />}
          {step === "preferences" && <StepPreferences onNext={next} onBack={back} />}
          {step === "documents" && <StepDocuments onNext={next} onBack={back} />}
          {step === "review" && <Review onBack={back} />}
        </Suspense>
      </form>
    </FormProvider>
  );
}

可复用基础控件(带无障碍)

// components/FormField.tsx
import { useFormContext } from "react-hook-form";
import { useId } from "react";

type Props = {
  name: string;
  label: string;
  type?: string;
  placeholder?: string;
  autoComplete?: string;
  inputProps?: React.InputHTMLAttributes<HTMLInputElement>;
};

export function FormField({ name, label, type = "text", placeholder, autoComplete, inputProps }: Props) {
  const { register, formState: { errors } } = useFormContext();
  const id = useId();
  const errMsg = (errors as any)[name]?.message as string | undefined;
  const descId = `${id}-desc`;
  return (
    <div role="group" aria-labelledby={`${id}-label`} className="field">
      <label id={`${id}-label`} htmlFor={id}>{label}</label>
      <input
        id={id}
        type={type}
        placeholder={placeholder}
        autoComplete={autoComplete}
        aria-invalid={!!errMsg}
        aria-describedby={errMsg ? descId : undefined}
        {...register(name)}
        {...inputProps}
      />
      {errMsg && (
        <div id={descId} role="alert" aria-live="polite" className="error">
          {errMsg}
        </div>
      )}
    </div>
  );
}

动态字段与输入掩码

// steps/StepProfile.tsx
import { useFormContext } from "react-hook-form";
import InputMask from "react-input-mask";
import { FormField } from "../components/FormField";

export default function StepProfile({ onNext, onBack }: { onNext: () => void; onBack: () => void }) {
  const { register, watch, setValue, formState: { errors } } = useFormContext();
  const accountType = watch("accountType");

  return (
    <section aria-label="个人信息">
      <div>
        <label htmlFor="accountType">账户类型</label>
        <select id="accountType" {...register("accountType")}>
          <option value="personal">个人</option>
          <option value="company">公司</option>
        </select>
      </div>

      <FormField name="firstName" label="名" />
      <FormField name="lastName" label="姓" />
      {/* 电话掩码 */}
      <div>
        <label htmlFor="phone">电话</label>
        <InputMask mask="+99 999-999-999" {...register("phone")} id="phone">
          {(inputProps: any) => <input {...inputProps} aria-invalid={!!errors.phone} />}
        </InputMask>
        {errors.phone?.message && <div role="alert">{String(errors.phone.message)}</div>}
      </div>

      {accountType === "company" && (
        <FormField name="companyName" label="公司名称" />
      )}

      <div className="nav">
        <button type="button" onClick={onBack}>上一步</button>
        <button type="button" onClick={onNext}>下一步</button>
      </div>
    </section>
  );
}

文件上传(大小/类型校验、进度展示)

// components/FileUpload.tsx
import { useFormContext } from "react-hook-form";
import { useId, useState, useRef } from "react";

type FileUploadProps = {
  name: "avatar" | "idDocument";
  label: string;
  accept: string;
};

export function FileUpload({ name, label, accept }: FileUploadProps) {
  const { register, setValue, formState: { errors } } = useFormContext();
  const id = useId();
  const [progress, setProgress] = useState<number>(0);
  const xhrRef = useRef<XMLHttpRequest | null>(null);
  const errMsg = (errors as any)[name]?.message as string | undefined;

  const onUpload = async (file: File) => {
    const form = new FormData();
    form.append(name, file);
    const xhr = new XMLHttpRequest();
    xhrRef.current = xhr;
    xhr.upload.onprogress = (e) => {
      if (e.lengthComputable) setProgress(Math.round((e.loaded / e.total) * 100));
    };
    xhr.onreadystatechange = () => {
      if (xhr.readyState === 4) {
        xhrRef.current = null;
      }
    };
    xhr.open("POST", `/api/upload/${name}`);
    xhr.setRequestHeader("X-CSRF-Token", (document.querySelector('meta[name="csrf-token"]') as HTMLMetaElement)?.content || "");
    xhr.send(form);
  };

  return (
    <div>
      <label htmlFor={id}>{label}</label>
      <input
        id={id}
        type="file"
        accept={accept}
        {...register(name as any)}
        onChange={(e) => {
          const file = e.target.files?.[0];
          if (file) {
            setValue(name, file, { shouldValidate: true });
            onUpload(file); // 可改为在最终提交时统一上传
          }
        }}
        aria-invalid={!!errMsg}
        aria-describedby={errMsg ? `${id}-error` : undefined}
      />
      {errMsg && <div id={`${id}-error`} role="alert">{errMsg}</div>}
      <div aria-live="polite" aria-atomic="true">{progress > 0 && `上传进度:${progress}%`}</div>
      {xhrRef.current && <button type="button" onClick={() => xhrRef.current?.abort()}>取消上传</button>}
    </div>
  );
}
  1. 无障碍支持(ARIA、键盘导航、错误提示关联)
  • 每个 input 有 label/aria-describedby 错误提示关联;aria-invalid 标示错误状态;对异步状态使用 aria-live。
  • 步骤容器使用 aria-label 或 aria-labelledby,按键导航保留默认 Enter 提交行为。
  • 验证失败后聚焦第一个错误字段:trigger 后取 errors 的第一个 key 并调用 document.getElementById(...).focus()。

示例:聚焦第一个错误字段

// utils/focusFirstError.ts
import { FieldErrors } from "react-hook-form";
export function focusFirstError(errors: FieldErrors) {
  const firstKey = Object.keys(errors)[0];
  if (!firstKey) return;
  const el = document.querySelector(`[name="${firstKey}"]`) as HTMLElement | null;
  el?.focus();
}
  1. 防抖 + AbortController 处理用户名查重
  • watch username 字段;300ms 防抖;每次新请求取消旧请求;仅当长度满足最小要求时触发。
  • 根据结果 setError 或 clearErrors。

示例:useUsernameCheck Hook

// hooks/useUsernameCheck.ts
import { useEffect, useRef } from "react";
import { UseFormSetError, UseFormClearErrors, UseFormWatch } from "react-hook-form";

export function useUsernameCheck(watch: UseFormWatch<any>, setError: UseFormSetError<any>, clearErrors: UseFormClearErrors<any>) {
  const timerRef = useRef<number | null>(null);
  const abortRef = useRef<AbortController | null>(null);

  useEffect(() => {
    const subscription = watch((value, { name }) => {
      if (name !== "username") return;
      const username = value.username;
      if (!username || username.length < 3) {
        clearErrors("username");
        return;
      }
      if (timerRef.current) window.clearTimeout(timerRef.current);
      timerRef.current = window.setTimeout(async () => {
        abortRef.current?.abort();
        const ac = new AbortController();
        abortRef.current = ac;
        try {
          const res = await fetch(`/api/username/check?u=${encodeURIComponent(username)}`, { signal: ac.signal });
          const data = await res.json();
          if (data.taken) {
            setError("username", { type: "validate", message: "用户名已被占用" });
          } else {
            clearErrors("username");
          }
        } catch (e: any) {
          if (e.name !== "AbortError") {
            setError("username", { type: "validate", message: "检查失败,请稍后重试" });
          }
        }
      }, 300);
    });
    return () => subscription.unsubscribe();
  }, [watch, setError, clearErrors]);
}

在步骤组件中使用:

// steps/StepAccount.tsx
import { useFormContext } from "react-hook-form";
import { useUsernameCheck } from "../hooks/useUsernameCheck";
import { FormField } from "../components/FormField";

export default function StepAccount({ onNext }: { onNext: () => void }) {
  const { watch, setError, clearErrors } = useFormContext();
  useUsernameCheck(watch, setError, clearErrors);

  return (
    <section aria-label="账号设置">
      <FormField name="email" label="邮箱" type="email" autoComplete="email" />
      <FormField name="username" label="用户名" />
      <FormField name="password" label="密码" type="password" autoComplete="new-password" />
      <FormField name="confirmPassword" label="确认密码" type="password" autoComplete="new-password" />
      <div>
        <input id="agree" type="checkbox" {...(useFormContext().register("agree"))} aria-describedby="agree-desc" />
        <label htmlFor="agree">我已阅读并同意</label>
        <div id="agree-desc" className="hint">勾选后可继续</div>
      </div>
      <button type="button" onClick={onNext}>下一步</button>
    </section>
  );
}
  1. 防止 XSS/CSRF、掩码输入与本地化(i18n)
  • XSS:默认不使用 dangerouslySetInnerHTML;若需渲染用户内容,使用 DOMPurify;后端设置 Content-Security-Policy,前端对富文本来源可信度校验。
  • CSRF:后端使用 SameSite=Lax/Strict Cookie,前端提交附带 CSRF token 头;对非幂等请求使用 POST。
  • 输入掩码:电话、生日等用 react-input-mask;结合 Zod 校验原始值。
  • i18n:使用 i18next,错误消息来自 Zod 或自定义 errorMap,再通过 t() 翻译;labels、placeholders 与 aria-label 皆国际化。

示例:安全展示与 i18n

// utils/safeHtml.ts
import DOMPurify from "dompurify";
export function safeHtml(html: string) {
  return { __html: DOMPurify.sanitize(html) };
}

// i18n 使用
import { useTranslation } from "react-i18next";
function LabelExample() {
  const { t } = useTranslation();
  return <label>{t("form.email")}</label>;
}

CSRF fetch 包装器

// utils/fetchWithCsrf.ts
export async function fetchWithCsrf(input: RequestInfo, init: RequestInit = {}) {
  const token = (document.querySelector('meta[name="csrf-token"]') as HTMLMetaElement)?.content || "";
  const headers = new Headers(init.headers || {});
  if (token) headers.set("X-CSRF-Token", token);
  return fetch(input, { ...init, headers, credentials: "include" });
}
  1. 代码分割与懒加载
  • 每一步懒加载;在进入当前步后预加载下一步以减少等待。
  • Suspense fallback 提供 aria-busy。
import { useEffect } from "react";
const StepPreferences = lazy(() => import("./steps/StepPreferences"));
useEffect(() => {
  // 预加载下一步
  import("./steps/StepPreferences");
}, []);
  1. 单元测试(Vitest)与端到端测试(Playwright)
  • 单元测试:Zod schema 校验;组件渲染与错误提示;用户名查重防抖逻辑。
  • E2E:模拟用户名检查接口;逐步填写表单;上传文件;验证 aria 属性与错误提示。

Vitest 示例:schema

// tests/registration.schema.test.ts
import { describe, it, expect } from "vitest";
import { StepAccountSchema } from "../shared/schemas/registration";

describe("StepAccountSchema", () => {
  it("rejects weak password", () => {
    const res = StepAccountSchema.safeParse({
      email: "a@b.com",
      username: "user123",
      password: "weak",
      confirmPassword: "weak",
      agree: true,
    });
    expect(res.success).toBe(false);
  });
  it("accepts valid data", () => {
    const res = StepAccountSchema.safeParse({
      email: "a@b.com",
      username: "user_123",
      password: "Strong#123",
      confirmPassword: "Strong#123",
      agree: true,
    });
    expect(res.success).toBe(true);
  });
});

Vitest + Testing Library:组件错误渲染

// tests/StepAccount.test.tsx
import { render, screen } from "@testing-library/react";
import { FormProvider, useForm } from "react-hook-form";
import StepAccount from "../src/steps/StepAccount";

function Wrapper() {
  const methods = useForm({ defaultValues: {} });
  return (
    <FormProvider {...methods}>
      <StepAccount onNext={() => {}} />
    </FormProvider>
  );
}
test("renders labels", () => {
  render(<Wrapper />);
  expect(screen.getByLabelText("邮箱")).toBeInTheDocument();
});

Playwright E2E 示例

// e2e/register.spec.ts
import { test, expect } from "@playwright/test";

test("multi-step registration", async ({ page }) => {
  await page.route("/api/username/check", (route) => {
    const url = new URL(route.request().url());
    const u = url.searchParams.get("u");
    route.fulfill({ json: { taken: u === "taken_user" } });
  });

  await page.goto("/register");
  await page.getByLabel("邮箱").fill("user@example.com");
  await page.getByLabel("用户名").fill("taken_user");
  await page.waitForSelector("text=用户名已被占用");

  await page.getByLabel("用户名").fill("newuser");
  await page.getByLabel("密码").fill("Strong#123");
  await page.getByLabel("确认密码").fill("Strong#123");
  await page.getByLabel("我已阅读并同意").check();
  await page.getByRole("button", { name: "下一步" }).click();

  await page.getByLabel("账户类型").selectOption("company");
  await page.getByLabel("公司名称").fill("Acme Inc.");
  await page.getByRole("button", { name: "下一步" }).click();

  await page.getByRole("button", { name: "下一步" }).click(); // 偏好
  await page.setInputFiles('input[type="file"][accept="image/png,image/jpeg"]', 'tests/fixtures/avatar.png');
  await page.setInputFiles('input[type="file"][accept="application/pdf"]', 'tests/fixtures/id.pdf');
  await page.getByRole("button", { name: "下一步" }).click();

  await expect(page.locator("text=审核并提交")).toBeVisible();
});
  1. 可复用提交流程与错误边界模板
  • 提交流程:先全量校验,再并发上传文件与提交元数据;失败时设置 form-level 错误并滚动聚焦。
  • 错误边界:捕获渲染异常,提供重试按钮与日志上报。

提交模板

// steps/Review.tsx
import { useFormContext } from "react-hook-form";
import { RegistrationSchema, type RegistrationData } from "../../shared/schemas/registration";
import { fetchWithCsrf } from "../utils/fetchWithCsrf";

export default function Review({ onBack }: { onBack: () => void }) {
  const { getValues, trigger, setError } = useFormContext<RegistrationData>();

  const submit = async () => {
    const ok = await trigger(undefined, { shouldFocus: true }); // 全量校验
    if (!ok) return;
    const data = getValues();
    try {
      const form = new FormData();
      for (const [k, v] of Object.entries(data)) {
        if (v instanceof File) form.append(k, v);
        else form.append(k, String(v ?? ""));
      }
      const res = await fetchWithCsrf("/api/register", { method: "POST", body: form });
      if (!res.ok) throw new Error("注册失败");
      // 成功处理
      alert("注册成功");
    } catch (e: any) {
      setError("root", { type: "server", message: e.message || "服务器错误" });
      const el = document.querySelector('[role="alert"]') as HTMLElement | null;
      el?.focus();
    }
  };

  return (
    <section aria-label="审核并提交">
      <div role="alert" tabIndex={-1}>请确认信息后提交</div>
      <div className="nav">
        <button type="button" onClick={onBack}>上一步</button>
        <button type="button" onClick={submit}>提交</button>
      </div>
    </section>
  );
}

错误边界

// components/ErrorBoundary.tsx
import { Component, ReactNode } from "react";

export class ErrorBoundary extends Component<{ children: ReactNode }, { hasError: boolean; message?: string }> {
  constructor(props: any) {
    super(props);
    this.state = { hasError: false };
  }
  static getDerivedStateFromError(error: Error) {
    return { hasError: true, message: error.message };
  }
  componentDidCatch(error: Error, info: any) {
    // TODO: 上报日志
    console.error(error, info);
  }
  render() {
    if (this.state.hasError) {
      return (
        <div role="alert">
          出现错误:{this.state.message}
          <button onClick={() => this.setState({ hasError: false })}>重试</button>
        </div>
      );
    }
    return this.props.children;
  }
}

实践提示

  • 在 useForm 中开启 criteriaMode: "all" 可显示多条错误消息,但注意可读性。
  • 文件上传建议延迟到最终提交以简化回滚,或采用 S3 pre-signed URL 直传。
  • 对动态步骤的 resolver 使用单步 schema,加快校验与避免无关错误。
  • 预加载下一步组件与文案资源(i18n)以提升体验。
  • 键盘支持:确保按钮可通过 Tab 焦点访问;为自定义控件(比如 masked input)确保转发原生 input 的可访问属性。
  • CSP 示例:Content-Security-Policy: default-src 'self'; img-src 'self' data:; script-src 'self'; style-src 'self' 'unsafe-inline'; connect-src 'self';
  • 国际化错误:可通过 zod.setErrorMap((issue, ctx) => ({ message: t(validation.${issue.code}) })) 接入 i18n。

以上模板可直接作为骨架投入项目并根据业务扩展。通过 Zod 实现类型与校验统一,React Hook Form 管控状态与性能,无障碍与安全措施贯穿始终,再结合测试与代码分割确保质量与可维护性。

以下内容按“最佳实践 + 小型代码示例”的风格,给出一个用 Go 实现的订单微服务,REST 与 gRPC 同时暴露,涵盖分层架构、仓储模式、pgx 连接池与重试、幂等与并发控制、熔断与退避、可观测性(OpenTelemetry/Prometheus/zap)、优雅停机/健康探针、表驱动测试与 testcontainers 等关键点。示例代码尽量精简,能作为最小可运行骨架进行扩展。

一、分层架构与仓储模式(context 贯穿)

  • 分层建议:transport(HTTP/gRPC) -> service(业务用例) -> repository(数据访问) -> db。
  • 所有公开方法第一个参数使用 context.Context,传播超时、取消、tracing、日志字段等。
  • Repository 隔离持久化细节,service 仅关心业务流程;transport 只做编解码、验证、鉴权。

示例领域模型与接口

// internal/domain/order.go
package domain

import "time"

type Order struct {
    ID          string
    DedupKey    string // 幂等去重键(外部传入,需唯一约束)
    UserID      string
    AmountCents int64
    Status      string // created, paid, failed...
    Version     int64  // 乐观锁版本
    CreatedAt   time.Time
    UpdatedAt   time.Time
}

// 仓储接口
type OrderRepository interface {
    Create(ctx context.Context, o Order) (Order, error)               // 幂等:基于 DedupKey
    Get(ctx context.Context, id string) (Order, error)
    UpdateStatusOptLock(ctx context.Context, id string, fromVersion int64, newStatus string) (Order, error)
}

二、PostgreSQL(pgx)连接池、超时与重试

  • 使用 pgxpool,统一初始化连接池,设置最大连接、生命周期、acquire timeout。
  • 在 service 层设置合理的超时(如 200-500ms 查询、1-2s 写入),在 repository 直接使用传入的 ctx。
  • 对可重试的瞬时错误(序列化失败 40001、死锁 40P01、连接重置等)做有限次重试 + 指数退避。

示例:pgx 连接与重试

// internal/db/pg.go
package db

import (
    "context"
    "time"
    "errors"

    "github.com/cenkalti/backoff/v4"
    "github.com/jackc/pgx/v5/pgxpool"
    "github.com/jackc/pgx/v5"
    "github.com/jackc/pgconn"
)

func NewPool(ctx context.Context, url string) (*pgxpool.Pool, error) {
    cfg, err := pgxpool.ParseConfig(url)
    if err != nil { return nil, err }
    cfg.MaxConns = 20
    cfg.MinConns = 2
    cfg.MaxConnLifetime = time.Hour
    cfg.MaxConnIdleTime = 10 * time.Minute
    cfg.HealthCheckPeriod = 30 * time.Second
    cfg.ConnConfig.ConnectTimeout = 5 * time.Second
    return pgxpool.NewWithConfig(ctx, cfg)
}

func IsRetryablePgErr(err error) bool {
    var pgErr *pgconn.PgError
    if errors.As(err, &pgErr) {
        switch pgErr.Code {
        case "40001", "40P01": // serialization_failure, deadlock_detected
            return true
        }
    }
    // 网络层或临时 IO
    return errors.Is(err, pgx.ErrNoRows) == false && pgconn.SafeToRetry(err)
}

func WithTxRetry(ctx context.Context, pool *pgxpool.Pool, isoLevel pgx.TxIsoLevel, fn func(pgx.Tx) error) error {
    bo := backoff.NewExponentialBackOff()
    bo.InitialInterval = 20 * time.Millisecond
    bo.MaxInterval = 200 * time.Millisecond
    bo.MaxElapsedTime = 1 * time.Second

    operation := func() error {
        tx, err := pool.BeginTx(ctx, pgx.TxOptions{IsoLevel: isoLevel})
        if err != nil { return backoff.Permanent(err) }
        defer tx.Rollback(ctx)

        if err := fn(tx); err != nil {
            if IsRetryablePgErr(err) {
                return err // 触发 backoff 重试
            }
            return backoff.Permanent(err)
        }
        return tx.Commit(ctx)
    }
    return backoff.Retry(operation, bo)
}

三、幂等设计、去重键与悲观/乐观锁

  • 去重键:对外暴露 dedup_key,orders 表建立唯一约束,插入使用 INSERT ... ON CONFLICT。
  • 乐观锁:更新时带 version 条件,更新成功返回新版本,否则返回冲突错误。
  • 悲观锁:需要串行化修改的行使用 SELECT ... FOR UPDATE/UPDATE ... RETURNING,以及 SKIP LOCKED 支持并发 worker。
  • 幂等设计注意:去重键的语义稳定,建立唯一索引;在服务重试或网络抖动下可安全重放。

示例:仓储实现(幂等 + 乐观锁)

// internal/repo/order_repo.go
package repo

import (
    "context"
    "errors"
    "github.com/jackc/pgx/v5"
    "github.com/jackc/pgx/v5/pgxpool"
    "yourmod/internal/domain"
    "yourmod/internal/db"
    "time"
)

var ErrConflict = errors.New("conflict")

type OrderRepo struct {
    pool *pgxpool.Pool
}

func NewOrderRepo(pool *pgxpool.Pool) *OrderRepo { return &OrderRepo{pool: pool} }

// SQL schema 要有:
// create table orders (
//   id uuid primary key default gen_random_uuid(),
//   dedup_key text not null unique,
//   user_id text not null,
//   amount_cents bigint not null,
//   status text not null,
//   version bigint not null default 1,
//   created_at timestamptz not null default now(),
//   updated_at timestamptz not null default now()
// );

func (r *OrderRepo) Create(ctx context.Context, o domain.Order) (domain.Order, error) {
    var out domain.Order
    q := `
    insert into orders (dedup_key, user_id, amount_cents, status)
    values ($1,$2,$3,'created')
    on conflict (dedup_key) do update set updated_at = now()
    returning id, dedup_key, user_id, amount_cents, status, version, created_at, updated_at;
    `
    err := db.WithTxRetry(ctx, r.pool, pgx.Serializable, func(tx pgx.Tx) error {
        return tx.QueryRow(ctx, q, o.DedupKey, o.UserID, o.AmountCents).
            Scan(&out.ID, &out.DedupKey, &out.UserID, &out.AmountCents, &out.Status, &out.Version, &out.CreatedAt, &out.UpdatedAt)
    })
    return out, err
}

func (r *OrderRepo) Get(ctx context.Context, id string) (domain.Order, error) {
    var o domain.Order
    q := `select id, dedup_key, user_id, amount_cents, status, version, created_at, updated_at from orders where id=$1`
    err := r.pool.QueryRow(ctx, q, id).Scan(&o.ID, &o.DedupKey, &o.UserID, &o.AmountCents, &o.Status, &o.Version, &o.CreatedAt, &o.UpdatedAt)
    return o, err
}

func (r *OrderRepo) UpdateStatusOptLock(ctx context.Context, id string, fromVersion int64, newStatus string) (domain.Order, error) {
    var o domain.Order
    q := `
    update orders
       set status=$3, version=version+1, updated_at=now()
     where id=$1 and version=$2
     returning id, dedup_key, user_id, amount_cents, status, version, created_at, updated_at;
    `
    cmd, err := r.pool.Query(ctx, q, id, fromVersion, newStatus)
    if err != nil { return o, err }
    defer cmd.Close()
    if !cmd.Next() { return o, ErrConflict }
    err = cmd.Scan(&o.ID, &o.DedupKey, &o.UserID, &o.AmountCents, &o.Status, &o.Version, &o.CreatedAt, &o.UpdatedAt)
    return o, err
}

// 悲观锁示意(库存/队列工作项):SELECT ... FOR UPDATE SKIP LOCKED
// tx.Query(ctx, `select id from jobs where status='ready' for update skip locked limit 1`)

四、熔断与回退策略,外部调用重试退避

  • 统一外部调用客户端,加入重试(幂等操作)、指数退避、超时;添加熔断器保护下游。
  • 回退策略:熔断打开时快速失败并采用降级方案(例如记录待支付任务,异步补偿)。

示例:带熔断与退避的支付客户端

// internal/ext/pay_client.go
package ext

import (
    "context"
    "errors"
    "net/http"
    "time"

    "github.com/sony/gobreaker"
    "github.com/cenkalti/backoff/v4"
    "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)

type PayClient struct {
    httpc   *http.Client
    breaker *gobreaker.CircuitBreaker
}

func NewPayClient() *PayClient {
    st := gobreaker.Settings{
        Name:        "payment",
        MaxRequests: 5,
        Interval:    30 * time.Second,
        Timeout:     10 * time.Second,
        ReadyToTrip: func(counts gobreaker.Counts) bool {
            return counts.ConsecutiveFailures >= 5
        },
    }
    return &PayClient{
        httpc: &http.Client{
            Timeout: 2 * time.Second,
            Transport: otelhttp.NewTransport(http.DefaultTransport),
        },
        breaker: gobreaker.NewCircuitBreaker(st),
    }
}

func (c *PayClient) Authorize(ctx context.Context, orderID string, amount int64) error {
    op := func() error {
        _, err := c.breaker.Execute(func() (interface{}, error) {
            req, _ := http.NewRequestWithContext(ctx, http.MethodPost, "http://payment/api/authorize", nil)
            resp, err := c.httpc.Do(req)
            if err != nil { return nil, err }
            defer resp.Body.Close()
            if resp.StatusCode >= 500 { return nil, errors.New("payment 5xx") }
            if resp.StatusCode >= 400 { return nil, backoff.Permanent(errors.New("payment 4xx")) }
            return nil, nil
        })
        return err
    }
    bo := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
    return backoff.Retry(op, bo)
}

五、OpenTelemetry 分布式追踪、Prometheus 指标与结构化日志(zap)

  • 初始化 OTel:OTLP 导出器 -> Collector;HTTP 使用 otelhttp、gRPC 使用 otelgrpc 拦截器自动注入 trace。
  • Prometheus:暴露 /metrics,添加自定义业务指标(订单创建计数、延迟直方图等)。
  • 日志:zap 生产级 JSON,带 trace_id、span_id、request_id 等;不同层使用 context 传播。

示例:可观测性初始化与采样指标

// internal/observ/otel.go
package observ

import (
    "context"
    "time"
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
    sdktrace "go.opentelemetry.io/otel/sdk/trace"
    "go.opentelemetry.io/otel/sdk/resource"
    "go.opentelemetry.io/otel/propagation"
    semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
)

func InitOTel(ctx context.Context, service string) (func(context.Context) error, error) {
    exp, err := otlptracehttp.New(ctx) // 读 OTEL_EXPORTER_OTLP_ENDPOINT 等环境变量
    if err != nil { return nil, err }
    res, _ := resource.New(ctx, resource.WithAttributes(semconv.ServiceName(service)))
    tp := sdktrace.NewTracerProvider(
        sdktrace.WithBatcher(exp),
        sdktrace.WithResource(res),
        sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.TraceIDRatioBased(0.2))),
    )
    otel.SetTracerProvider(tp)
    otel.SetTextMapPropagator(propagation.TraceContext{})
    return tp.Shutdown, nil
}

// internal/observ/metrics.go
package observ

import "github.com/prometheus/client_golang/prometheus"

var (
    OrdersCreated = prometheus.NewCounter(prometheus.CounterOpts{
        Name: "orders_created_total", Help: "Total created orders",
    })
    OrderCreateLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
        Name: "order_create_seconds", Help: "Latency of order creation", Buckets: prometheus.DefBuckets,
    })
)

func MustRegisterMetrics() {
    prometheus.MustRegister(OrdersCreated, OrderCreateLatency)
}

// internal/observ/logging.go
package observ

import (
    "go.uber.org/zap"
    "context"
    "go.opentelemetry.io/otel/trace"
)

func NewLogger() *zap.Logger {
    l, _ := zap.NewProduction()
    return l
}

func WithTraceFields(ctx context.Context, l *zap.Logger) *zap.Logger {
    span := trace.SpanFromContext(ctx)
    sc := span.SpanContext()
    if sc.HasTraceID() {
        l = l.With(zap.String("trace_id", sc.TraceID().String()), zap.String("span_id", sc.SpanID().String()))
    }
    return l
}

六、服务层与路由(REST 与 gRPC)

  • REST 选择 chi/gin,gRPC 使用官方 server,二者共享同一 service。
  • HTTP 路由增加 otelhttp 和请求日志中间件;gRPC 加 otelgrpc 拦截器和健康服务。
  • 健康检查与就绪探针:/healthz(活性),/readyz(检查 DB Ping)。

proto(最小化)

// api/order/v1/order.proto
syntax = "proto3";
package order.v1;
option go_package = "yourmod/api/order/v1;orderv1";

service OrderService {
  rpc CreateOrder(CreateOrderRequest) returns (CreateOrderResponse);
  rpc GetOrder(GetOrderRequest) returns (GetOrderResponse);
}

message CreateOrderRequest { string dedup_key = 1; string user_id = 2; int64 amount_cents = 3; }
message CreateOrderResponse { string order_id = 1; }
message GetOrderRequest { string order_id = 1; }
message GetOrderResponse { string order_id = 1; string status = 2; int64 amount_cents = 3; }

service 与 transport 示例

// internal/service/order_service.go
package service

import (
    "context"
    "time"
    "yourmod/internal/domain"
    "yourmod/internal/repo"
    "yourmod/internal/ext"
    "yourmod/internal/observ"
    "go.uber.org/zap"
)

type OrderService struct {
    repo repo.OrderRepo
    pay  *ext.PayClient
    log  *zap.Logger
}

func NewOrderService(r *repo.OrderRepo, p *ext.PayClient, l *zap.Logger) *OrderService {
    return &OrderService{repo: *r, pay: p, log: l}
}

func (s *OrderService) Create(ctx context.Context, dedupKey, userID string, amount int64) (domain.Order, error) {
    ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
    defer cancel()

    start := time.Now()
    o, err := s.repo.Create(ctx, domain.Order{DedupKey: dedupKey, UserID: userID, AmountCents: amount})
    observ.OrderCreateLatency.Observe(time.Since(start).Seconds())
    if err != nil { return o, err }
    observ.OrdersCreated.Inc()

    log := observ.WithTraceFields(ctx, s.log)
    // 外部支付授权(演示可选)
    if err := s.pay.Authorize(ctx, o.ID, o.AmountCents); err != nil {
        log.Warn("payment authorize failed, fallback", zap.Error(err))
        // 回退方案:标记待支付状态,或写出补偿消息
    }
    return o, nil
}

func (s *OrderService) Get(ctx context.Context, id string) (domain.Order, error) {
    ctx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
    defer cancel()
    return s.repo.Get(ctx, id)
}
// internal/transport/http/router.go
package httpapi

import (
    "encoding/json"
    "net/http"

    "github.com/go-chi/chi/v5"
    "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
    "yourmod/internal/service"
    "yourmod/internal/observ"
    "github.com/jackc/pgx/v5/pgxpool"
)

func NewRouter(svc *service.OrderService, pool *pgxpool.Pool) http.Handler {
    r := chi.NewRouter()
    r.Use(otelhttp.NewMiddleware("http"))
    r.Get("/healthz", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) })
    r.Get("/readyz", func(w http.ResponseWriter, r *http.Request) {
        if err := pool.Ping(r.Context()); err != nil { http.Error(w, "not ready", 503); return }
        w.WriteHeader(http.StatusOK)
    })
    r.Handle("/metrics", promHandler()) // 见下

    r.Post("/orders", func(w http.ResponseWriter, r *http.Request) {
        var req struct {
            DedupKey string `json:"dedup_key"`
            UserID   string `json:"user_id"`
            Amount   int64  `json:"amount_cents"`
        }
        if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
            http.Error(w, err.Error(), 400); return
        }
        o, err := svc.Create(r.Context(), req.DedupKey, req.UserID, req.Amount)
        if err != nil { http.Error(w, err.Error(), 500); return }
        _ = json.NewEncoder(w).Encode(map[string]string{"order_id": o.ID})
    })
    r.Get("/orders/{id}", func(w http.ResponseWriter, r *http.Request) {
        id := chi.URLParam(r, "id")
        o, err := svc.Get(r.Context(), id)
        if err != nil { http.Error(w, err.Error(), 404); return }
        _ = json.NewEncoder(w).Encode(o)
    })
    return r
}

func promHandler() http.Handler {
    observ.MustRegisterMetrics()
    return promhttp.Handler()
}
// internal/transport/grpc/server.go
package grpcapi

import (
    "context"
    "yourmod/api/order/v1"
    "yourmod/internal/service"

    "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
    "google.golang.org/grpc"
    "google.golang.org/grpc/health"
    healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

type Server struct {
    orderv1.UnimplementedOrderServiceServer
    svc *service.OrderService
}

func NewServer(s *service.OrderService) *Server { return &Server{svc: s} }

func (s *Server) CreateOrder(ctx context.Context, req *orderv1.CreateOrderRequest) (*orderv1.CreateOrderResponse, error) {
    o, err := s.svc.Create(ctx, req.DedupKey, req.UserId, req.AmountCents)
    if err != nil { return nil, err }
    return &orderv1.CreateOrderResponse{OrderId: o.ID}, nil
}

func (s *Server) GetOrder(ctx context.Context, req *orderv1.GetOrderRequest) (*orderv1.GetOrderResponse, error) {
    o, err := s.svc.Get(ctx, req.OrderId)
    if err != nil { return nil, err }
    return &orderv1.GetOrderResponse{OrderId: o.ID, Status: o.Status, AmountCents: o.AmountCents}, nil
}

func NewGRPCServer(s *service.OrderService) *grpc.Server {
    gs := grpc.NewServer(
        grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
        grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor()),
    )
    orderv1.RegisterOrderServiceServer(gs, NewServer(s))
    healthSrv := health.NewServer()
    healthpb.RegisterHealthServer(gs, healthSrv)
    return gs
}

七、优雅停机、健康检查与就绪探针

  • 多 server 并行启动(HTTP、gRPC、metrics),使用 errgroup + context 管理。
  • 捕获 SIGINT/SIGTERM,调用 HTTP Server.Shutdown、gRPC GracefulStop、关闭 pg 池。
  • K8s:livenessProbe 指向 /healthz;readinessProbe 指向 /readyz;gRPC 可用 grpc-health-probe。

最小可运行 main

// cmd/ordersvc/main.go
package main

import (
    "context"
    "net"
    "net/http"
    "os"
    "os/signal"
    "syscall"
    "time"

    "golang.org/x/sync/errgroup"
    "go.uber.org/zap"
    "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"

    "yourmod/internal/db"
    "yourmod/internal/observ"
    "yourmod/internal/repo"
    "yourmod/internal/service"
    httpapi "yourmod/internal/transport/http"
    grpcapi "yourmod/internal/transport/grpc"
    "yourmod/internal/ext"
)

func main() {
    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    log := observ.NewLogger()
    defer log.Sync()
    shutdownOTel, err := observ.InitOTel(ctx, "ordersvc")
    if err != nil { log.Fatal("otel init", zap.Error(err)) }
    defer shutdownOTel(context.Background())

    pool, err := db.NewPool(ctx, os.Getenv("DATABASE_URL"))
    if err != nil { log.Fatal("db", zap.Error(err)) }
    defer pool.Close()

    pay := NewPayClient()
    repo := repo.NewOrderRepo(pool)
    svc := service.NewOrderService(repo, pay, log)

    httpSrv := &http.Server{
        Addr:    ":8080",
        Handler: httpapi.NewRouter(svc, pool),
        ReadHeaderTimeout: 5 * time.Second,
    }
    grpcSrv := grpcapi.NewGRPCServer(svc)

    g, ctx := errgroup.WithContext(ctx)
    // HTTP
    g.Go(func() error {
        log.Info("http listening", zap.String("addr", httpSrv.Addr))
        if err := httpSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed { return err }
        return nil
    })
    // gRPC
    g.Go(func() error {
        lis, err := net.Listen("tcp", ":9090")
        if err != nil { return err }
        log.Info("grpc listening", zap.String("addr", ":9090"))
        return grpcSrv.Serve(lis)
    })

    // 等待信号
    g.Go(func() error {
        <-ctx.Done()
        c, cancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer cancel()
        _ = httpSrv.Shutdown(c)
        grpcSrv.GracefulStop()
        return nil
    })

    if err := g.Wait(); err != nil {
        log.Error("server exit", zap.Error(err))
    }
}

八、表驱动测试与集成测试(testcontainers)

  • 表驱动测试:同一用例逻辑,不同输入/预期,代码简洁可读。
  • 集成测试:用 testcontainers 启动临时 Postgres,运行 schema/migrations,执行仓储/服务真实调用,保证幂等与事务语义。

表驱动示例

// internal/service/order_service_test.go
package service_test

import (
    "context"
    "testing"
    "yourmod/internal/service"
    "yourmod/internal/domain"
)

type stubRepo struct{ created map[string]domain.Order }
func (s *stubRepo) Create(ctx context.Context, o domain.Order) (domain.Order, error) {
    if s.created == nil { s.created = map[string]domain.Order{} }
    if existing, ok := s.created[o.DedupKey]; ok { return existing, nil }
    o.ID = "id-" + o.DedupKey; s.created[o.DedupKey] = o; return o, nil
}
// implement other methods ...

func TestCreate_Idempotent(t *testing.T) {
    r := &stubRepo{}
    svc := service.NewOrderService((*repo.OrderRepo)(r), nil, zap.NewNop())
    ctx := context.Background()

    cases := []struct{
        dedup string; wantID string
    }{
        {"k1", "id-k1"},
        {"k1", "id-k1"},
    }
    var first string
    for i, c := range cases {
        o, err := svc.Create(ctx, c.dedup, "u", 100)
        if err != nil { t.Fatal(err) }
        if i == 0 { first = o.ID }
        if o.ID != first { t.Fatalf("idempotency broken: %s != %s", o.ID, first) }
    }
}

testcontainers 集成测试示例(简化)

// internal/repo/order_repo_integration_test.go
package repo_test

import (
    "context"
    "testing"
    "time"
    "fmt"

    tc "github.com/testcontainers/testcontainers-go"
    "github.com/testcontainers/testcontainers-go/wait"
    "github.com/jackc/pgx/v5/pgxpool"
    "yourmod/internal/db"
    "yourmod/internal/repo"
    "yourmod/internal/domain"
)

func startPostgres(t *testing.T) (*pgxpool.Pool, func()) {
    ctx := context.Background()
    req := tc.ContainerRequest{
        Image:        "postgres:16-alpine",
        Env:          map[string]string{"POSTGRES_PASSWORD": "pw", "POSTGRES_DB":"test"},
        ExposedPorts: []string{"5432/tcp"},
        WaitingFor:   wait.ForListeningPort("5432/tcp").WithStartupTimeout(30*time.Second),
    }
    pgC, err := tc.GenericContainer(ctx, tc.GenericContainerRequest{ContainerRequest: req, Started: true})
    if err != nil { t.Fatal(err) }

    host, _ := pgC.Host(ctx)
    port, _ := pgC.MappedPort(ctx, "5432/tcp")
    url := fmt.Sprintf("postgres://postgres:pw@%s:%s/test?sslmode=disable", host, port.Port())

    pool, err := db.NewPool(ctx, url)
    if err != nil { t.Fatal(err) }

    schema := `
    create extension if not exists pgcrypto;
    create table if not exists orders(
      id uuid primary key default gen_random_uuid(),
      dedup_key text not null unique,
      user_id text not null,
      amount_cents bigint not null,
      status text not null,
      version bigint not null default 1,
      created_at timestamptz not null default now(),
      updated_at timestamptz not null default now()
    );`
    if _, err := pool.Exec(ctx, schema); err != nil { t.Fatal(err) }

    cleanup := func() {
        pool.Close()
        _ = pgC.Terminate(ctx)
    }
    return pool, cleanup
}

func TestRepo_Create_Idempotent(t *testing.T) {
    pool, cleanup := startPostgres(t)
    defer cleanup()

    r := repo.NewOrderRepo(pool)
    ctx := context.Background()

    o1, err := r.Create(ctx, domain.Order{DedupKey: "d1", UserID:"u1", AmountCents: 100})
    if err != nil { t.Fatal(err) }
    o2, err := r.Create(ctx, domain.Order{DedupKey: "d1", UserID:"u1", AmountCents: 100})
    if err != nil { t.Fatal(err) }
    if o1.ID != o2.ID { t.Fatalf("expect same id, got %s vs %s", o1.ID, o2.ID) }
}

九、Makefile 与容器化建议

  • Makefile 目标:build/test/run/lint/proto/docker-build。
  • Dockerfile:多阶段构建,非 root 用户,最小基础镜像(distroless/alpine),设置健康检查,合理暴露端口(HTTP 8080,gRPC 9090,/metrics 同 HTTP)。
  • 通过环境变量配置:DATABASE_URL、OTEL_EXPORTER_OTLP_ENDPOINT、LOG_LEVEL 等。

Makefile(简化)

APP=ordersvc
PKG=./...
BIN=./bin/$(APP)

build:
	go build -o $(BIN) ./cmd/ordersvc

test:
	go test $(PKG) -count=1

run:
	DATABASE_URL=postgres://user:pw@localhost:5432/db?sslmode=disable go run ./cmd/ordersvc

proto:
	protoc --go_out=. --go-grpc_out=. api/order/v1/*.proto

docker-build:
	docker build -t $(APP):latest .

Dockerfile(简化)

# build
FROM golang:1.22 AS builder
WORKDIR /src
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o /out/ordersvc ./cmd/ordersvc

# runtime
FROM gcr.io/distroless/base-debian12
USER nonroot:nonroot
WORKDIR /app
COPY --from=builder /out/ordersvc /app/ordersvc
EXPOSE 8080 9090
ENV GODEBUG=madvdontneed=1
ENTRYPOINT ["/app/ordersvc"]

K8s 探针建议

  • livenessProbe HTTP GET /healthz,periodSeconds: 10, timeoutSeconds: 2。
  • readinessProbe HTTP GET /readyz,initialDelaySeconds: 5,failureThreshold: 3。
  • gRPC 使用 grpc-health-probe 亦可。

十、其他实践建议

  • 输入校验:REST 使用 binding/validator,gRPC 使用 protoc-gen-validate。
  • 安全与合规:限制请求体大小、超时、最大并发(HTTP Server Read/Write Timeout,gRPC keepalive)、对外调用限流(token bucket)。
  • 迁移工具:goose 或 golang-migrate,CI 中执行迁移。
  • 配置管理:viper/envconfig,支持多环境;敏感信息用 Secret。
  • 性能与池化:避免全局变量污染,复用 http.Client、连接池;对热点路径加缓存(带过期与一致性策略)。
  • 灰度与回滚:版本化 API,兼容旧客户端;数据库变更遵循 expand/contract。

只要将以上骨架按模块放入 yourmod 项目中,即可得到一个最小可运行、可观测、可测试、可容器化的 Go 订单微服务,并可逐步扩展业务逻辑与非功能性需求。

示例详情

📖 如何使用

30秒出活:复制 → 粘贴 → 搞定
与其花几十分钟和AI聊天、试错,不如直接复制这些经过千人验证的模板,修改几个 {{变量}} 就能立刻获得专业级输出。省下来的时间,足够你轻松享受两杯咖啡!
加载中...
💬 不会填参数?让 AI 反过来问你
不确定变量该填什么?一键转为对话模式,AI 会像资深顾问一样逐步引导你,问几个问题就能自动生成完美匹配你需求的定制结果。零门槛,开口就行。
转为对话模式
🚀 告别复制粘贴,Chat 里直接调用
无需切换,输入 / 唤醒 8000+ 专家级提示词。 插件将全站提示词库深度集成于 Chat 输入框。基于当前对话语境,系统智能推荐最契合的 Prompt 并自动完成参数化,让海量资源触手可及,从此彻底告别"手动搬运"。
即将推出
🔌 接口一调,提示词自己会进化
手动跑一次还行,跑一百次呢?通过 API 接口动态注入变量,接入批量评价引擎,让程序自动迭代出更高质量的提示词方案。Prompt 会自己进化,你只管收结果。
发布 API
🤖 一键变成你的专属 Agent 应用
不想每次都配参数?把这条提示词直接发布成独立 Agent,内嵌图片生成、参数优化等工具,分享链接就能用。给团队或客户一个"开箱即用"的完整方案。
创建 Agent

✅ 特性总结

一键生成多种编程语言的最佳实践,适配技术团队多样需求。
基于任务类型智能推荐高效开发方法,快速解决实际问题。
自动提供安全性与可维护性优化建议,避免潜在技术风险。
附带实用的小型代码示例,助力开发者快速上手与理解。
支持灵活调用和定制,满足不同项目场景需求和规范。
兼顾上下文与工程规范,让输出更贴近实际的工作环境。
适用于新手与资深开发者,一站式获取任务解决方案。
高效对接多语言技术栈,提升团队协作效率。

🎯 解决的问题

为用户提供在指定编程语言中完成特定任务的最佳实践指导,包括高效、安全、可维护的解决方案,并通过实际示例提升学习效果和实践应用能力。

🕒 版本历史

当前版本
v2.1 2024-01-15
优化输出结构,增强情节连贯性
  • ✨ 新增章节节奏控制参数
  • 🔧 优化人物关系描述逻辑
  • 📝 改进主题深化引导语
  • 🎯 增强情节转折点设计
v2.0 2023-12-20
重构提示词架构,提升生成质量
  • 🚀 全新的提示词结构设计
  • 📊 增加输出格式化选项
  • 💡 优化角色塑造引导
v1.5 2023-11-10
修复已知问题,提升稳定性
  • 🐛 修复长文本处理bug
  • ⚡ 提升响应速度
v1.0 2023-10-01
首次发布
  • 🎉 初始版本上线
COMING SOON
版本历史追踪,即将启航
记录每一次提示词的进化与升级,敬请期待。

💬 用户评价

4.8
⭐⭐⭐⭐⭐
基于 28 条评价
5星
85%
4星
12%
3星
3%
👤
电商运营 - 张先生
⭐⭐⭐⭐⭐ 2025-01-15
双十一用这个提示词生成了20多张海报,效果非常好!点击率提升了35%,节省了大量设计时间。参数调整很灵活,能快速适配不同节日。
效果好 节省时间
👤
品牌设计师 - 李女士
⭐⭐⭐⭐⭐ 2025-01-10
作为设计师,这个提示词帮我快速生成创意方向,大大提升了工作效率。生成的海报氛围感很强,稍作调整就能直接使用。
创意好 专业
COMING SOON
用户评价与反馈系统,即将上线
倾听真实反馈,在这里留下您的使用心得,敬请期待。
加载中...