编程任务最佳实践指南

252 浏览
23 试用
5 购买
Oct 23, 2025更新

根据用户指定的编程语言和具体开发任务,提供高效、安全、可维护的开发最佳实践,并配套简要说明和小型代码示例,帮助开发者快速理解并应用于实际项目中。

以下内容面向在 FastAPI 上实现完整鉴权与授权模块的工程实践,结合 JWT(访问令牌+刷新令牌)、Argon2id 密码哈希、RBAC/细粒度权限、Pydantic v2 校验、PostgreSQL+SQLAlchemy 2.x 异步、Redis 限流和刷新令牌黑名单、统一错误处理/结构化日志/OpenAPI 文档、pytest 单元与集成测试,并给出最小可运行示例。

一、总体设计与最佳实践要点

  • 身份认证与令牌
    • 使用短时访问令牌(例如 15 分钟)+ 长时刷新令牌(例如 7–30 天),刷新令牌仅用于轮换访问令牌。
    • JWT 建议包含:sub(用户ID)、jti(令牌ID)、type(access/refresh)、iat/nbf/exp、iss/aud、roles/permissions(仅访问令牌可携带)。
    • 使用非对称算法(RS256/EdDSA)或强 HMAC(HS256)并规划密钥轮换(kid 标识)。生产使用 JWKS 分发或 KMS 管理。
    • 刷新令牌应“单次使用+轮换”:每次刷新作废旧 jti 并生成新 jti;Redis 保存黑名单(jti->TTL=exp)。
  • 密码与登录安全
    • Argon2id 参数建议:time_cost=3、memory_cost=64MB(65536 KiB)、parallelism=2、salt_len=16、hash_len=32;在高并发环境按 CPU/内存调优并限制登录速率防止资源耗尽。
    • 登录限流:组合 username + IP 的固定窗口或滑动窗口计数;成功登录重置计数。
    • 登录失败不要暴露用户存在性;统一返回“用户名或密码错误”。
  • 授权(RBAC + 细粒度)
    • 模型:User 多对多 Roles;Roles 多对多 Permissions(如 "items:read"、"items:write"),便于细粒度授权。
    • 在依赖中加载 current_user 后进行权限检查;坚持“默认拒绝”原则,最小授权。
    • 对跨租户或资源级限制,增加资源所有权检查(如 user_id 与资源 owner_id 比较)。
  • 输入校验与防 SQL 注入
    • 使用 Pydantic v2 严格类型、字段校验器;对密码复杂度、Email 格式、枚举值等进行校验。
    • 所有数据库访问采用 ORM/参数化查询,避免字符串拼接。若 raw SQL 使用 text() + bindparam。
  • SQLAlchemy 2.x 异步 + PostgreSQL
    • 使用 AsyncEngine + async_sessionmaker;每请求一个 session(依赖注入),显式事务边界(async with session.begin():)。
    • 连接池:合理配置 pool_size、max_overflow、pool_timeout、pool_recycle;使用 asyncpg 驱动。
  • Redis
    • redis.asyncio 用于登录尝试计数与刷新令牌黑名单。key 设计包含租户/环境前缀。
    • 黑名单 TTL 应匹配刷新令牌 exp。
  • 统一错误处理与结构化日志
    • 定义统一错误响应结构:code/message/details/request_id。为常见领域错误定义异常类与全局异常处理器。
    • 结构化日志(JSON),注入 request_id、user_id、路径、耗时;避免记录敏感数据(密码、令牌)。
  • OpenAPI 文档
    • 为路由添加 tags、描述、响应模型与示例;用 response_model 限制输出。
  • 测试(pytest)
    • 单元测试:哈希/验证、JWT 签发与刷新、权限检查函数。
    • 集成测试:FastAPI app + httpx AsyncClient;准备临时 Postgres/Redis(推荐 Testcontainers)或使用依赖覆盖。
    • 覆盖鉴权中间件与限流逻辑,包括成功与异常分支。
  • 配置分层与密钥管理
    • pydantic-settings 管理配置:Base + Dev + Prod;环境变量/容器秘密/文件;区分敏感字段。
    • 密钥从环境或密钥管理服务加载;计划轮换;最小可见范围;不写入日志。
  • 容器化与部署安全清单
    • 使用瘦基镜像(python:3.x-slim/Distroless);只复制必要文件;多阶段构建;只读根文件系统。
    • 以非 root 用户运行;限制 Linux capabilities;启用 seccomp/AppArmor。
    • 依赖锁定与漏洞扫描(pip-tools/SBOM);禁用调试/自动文档在生产可按策略限制。
    • 强制 HTTPS/TLS;CORS 白名单;若使用 Cookie 承载刷新令牌则必须 HttpOnly/SameSite + CSRF 防护。
    • 资源与速率限制(CPU/Mem、连接池上限);健康检查与熔断;监控告警。
    • 数据库权限最小化(应用用户仅 DML,不授予 DDL);参数化查询;启用审计日志。

二、最小可运行示例(FastAPI + SQLAlchemy 2.x 异步 + Redis) 提示:

  • 需要本地 PostgreSQL 和 Redis 服务。
  • 仅为演示,将 JWT 使用 HS256。生产推荐非对称算法与密钥轮换。
  • Alembic 迁移未包含,示例在启动时自动建表并创建一个演示用户。

文件:app.py (运行:pip install "fastapi>=0.115" "uvicorn[standard]" "sqlalchemy[asyncpg]>=2.0" "pydantic>=2" "pydantic-settings>=2" "argon2-cffi" "PyJWT" "redis>=5" "structlog")

# app.py
import asyncio
import logging
import time
import uuid
from datetime import datetime, timedelta
from typing import Annotated, Optional, Sequence

import jwt  # PyJWT
import structlog
from fastapi import FastAPI, Depends, HTTPException, status, Request
from fastapi.responses import JSONResponse
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel, EmailStr, Field, SecretStr, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
from redis.asyncio import Redis
from sqlalchemy import ForeignKey, String, select, text
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship

# -------------------- Config --------------------
class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_prefix="APP_", env_file=".env", env_file_encoding="utf-8")

    # App
    app_name: str = "Auth Service"
    environment: str = "dev"

    # DB
    db_url: str = "postgresql+asyncpg://postgres:postgres@localhost:5432/appdb"
    db_pool_size: int = 5
    db_max_overflow: int = 5
    db_pool_timeout: int = 30
    db_pool_recycle: int = 1800

    # Redis
    redis_url: str = "redis://localhost:6379/0"
    rate_limit_login_per_minute: int = 5

    # JWT
    jwt_secret: str = "CHANGE_ME_IN_PROD"  # 生产从 KMS/Secrets Manager 加载
    jwt_kid: str = "kid-1"
    access_token_ttl_minutes: int = 15
    refresh_token_ttl_days: int = 7
    jwt_algorithm: str = "HS256"
    jwt_issuer: str = "your-company"
    jwt_audience: str = "internal-platform"

    # Argon2id
    argon_time_cost: int = 3
    argon_memory_cost: int = 65536  # KiB = 64 MiB
    argon_parallelism: int = 2
    argon_hash_len: int = 32
    argon_salt_len: int = 16

settings = Settings()

# -------------------- Logging --------------------
# 结构化日志:注入 request_id & user_id
logging.basicConfig(level=logging.INFO)
structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.add_log_level,
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(),
    ]
)
log = structlog.get_logger()

# -------------------- DB Setup --------------------
class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    password_hash: Mapped[str] = mapped_column(String(255))
    is_active: Mapped[bool] = mapped_column(default=True)
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
    last_login: Mapped[Optional[datetime]] = mapped_column(default=None)

    roles: Mapped[Sequence["Role"]] = relationship(
        secondary="user_roles", back_populates="users", lazy="selectin"
    )

class Role(Base):
    __tablename__ = "roles"
    id: Mapped[int] = mapped_column(primary_key=True)
    code: Mapped[str] = mapped_column(String(64), unique=True, index=True)
    users: Mapped[Sequence[User]] = relationship(
        secondary="user_roles", back_populates="roles", lazy="selectin"
    )
    permissions: Mapped[Sequence["Permission"]] = relationship(
        secondary="role_permissions", back_populates="roles", lazy="selectin"
    )

class Permission(Base):
    __tablename__ = "permissions"
    id: Mapped[int] = mapped_column(primary_key=True)
    code: Mapped[str] = mapped_column(String(64), unique=True, index=True)
    roles: Mapped[Sequence[Role]] = relationship(
        secondary="role_permissions", back_populates="permissions", lazy="selectin"
    )

class UserRole(Base):
    __tablename__ = "user_roles"
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), primary_key=True)
    role_id: Mapped[int] = mapped_column(ForeignKey("roles.id"), primary_key=True)

class RolePermission(Base):
    __tablename__ = "role_permissions"
    role_id: Mapped[int] = mapped_column(ForeignKey("roles.id"), primary_key=True)
    permission_id: Mapped[int] = mapped_column(ForeignKey("permissions.id"), primary_key=True)

engine: AsyncEngine = create_async_engine(
    settings.db_url,
    pool_size=settings.db_pool_size,
    max_overflow=settings.db_max_overflow,
    pool_timeout=settings.db_pool_timeout,
    pool_recycle=settings.db_pool_recycle,
    echo=False,
)
SessionLocal = async_sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)

async def get_session() -> AsyncSession:
    async with SessionLocal() as session:
        yield session  # 请求结束自动关闭

# -------------------- Redis --------------------
redis: Redis | None = None
async def get_redis() -> Redis:
    assert redis is not None
    return redis

# -------------------- Security --------------------
from argon2 import PasswordHasher
ph = PasswordHasher(
    time_cost=settings.argon_time_cost,
    memory_cost=settings.argon_memory_cost,
    parallelism=settings.argon_parallelism,
    hash_len=settings.argon_hash_len,
    salt_len=settings.argon_salt_len,
)

def hash_password(password: str) -> str:
    return ph.hash(password)

def verify_password(password: str, hashed: str) -> bool:
    try:
        return ph.verify(hashed, password)
    except Exception:
        return False

def now_ts() -> int:
    return int(time.time())

def create_access_token(sub: str, roles: list[str], perms: list[str]) -> str:
    jti = str(uuid.uuid4())
    payload = {
        "sub": sub,
        "jti": jti,
        "type": "access",
        "roles": roles,
        "perms": perms,
        "iat": now_ts(),
        "nbf": now_ts(),
        "exp": int((datetime.utcnow() + timedelta(minutes=settings.access_token_ttl_minutes)).timestamp()),
        "iss": settings.jwt_issuer,
        "aud": settings.jwt_audience,
    }
    headers = {"kid": settings.jwt_kid}
    return jwt.encode(payload, settings.jwt_secret, algorithm=settings.jwt_algorithm, headers=headers)

def create_refresh_token(sub: str) -> tuple[str, str, int]:
    jti = str(uuid.uuid4())
    exp = int((datetime.utcnow() + timedelta(days=settings.refresh_token_ttl_days)).timestamp())
    payload = {
        "sub": sub,
        "jti": jti,
        "type": "refresh",
        "iat": now_ts(),
        "nbf": now_ts(),
        "exp": exp,
        "iss": settings.jwt_issuer,
        "aud": settings.jwt_audience,
    }
    headers = {"kid": settings.jwt_kid}
    token = jwt.encode(payload, settings.jwt_secret, algorithm=settings.jwt_algorithm, headers=headers)
    return token, jti, exp

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")  # Swagger 上用于授权按钮

# -------------------- Schemas (Pydantic v2) --------------------
class TokenPair(BaseModel):
    access_token: str = Field(..., description="短时访问令牌")
    refresh_token: str = Field(..., description="长时刷新令牌")
    token_type: str = "bearer"
    expires_in: int = Field(..., description="访问令牌剩余秒数")

class LoginRequest(BaseModel):
    email: EmailStr
    password: SecretStr

    @field_validator("password")
    @classmethod
    def validate_password(cls, v: SecretStr):
        s = v.get_secret_value()
        if len(s) < 8 or not any(c.isdigit() for c in s) or not any(c.isalpha() for c in s):
            raise ValueError("密码至少8位,包含字母与数字")
        return v

class MeResponse(BaseModel):
    id: int
    email: EmailStr
    roles: list[str]
    permissions: list[str]

# -------------------- Error Handling --------------------
class AppError(Exception):
    def __init__(self, message: str, code: str = "bad_request", status_code: int = 400, details: dict | None = None):
        self.message = message
        self.code = code
        self.status_code = status_code
        self.details = details or {}

app = FastAPI(title=settings.app_name)

@app.exception_handler(AppError)
async def app_error_handler(request: Request, exc: AppError):
    rid = request.headers.get("X-Request-ID") or str(uuid.uuid4())
    return JSONResponse(
        status_code=exc.status_code,
        content={"code": exc.code, "message": exc.message, "details": exc.details, "request_id": rid},
    )

@app.middleware("http")
async def log_middleware(request: Request, call_next):
    rid = request.headers.get("X-Request-ID") or str(uuid.uuid4())
    start = time.perf_counter()
    try:
        response = await call_next(request)
    finally:
        duration = (time.perf_counter() - start) * 1000
        structlog.contextvars.clear_contextvars()
        log.info("request",
                 request_id=rid, method=request.method, path=request.url.path,
                 status=getattr(response, "status_code", None), duration_ms=round(duration, 2))
    return response

# -------------------- Auth Dependencies --------------------
async def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)],
    session: Annotated[AsyncSession, Depends(get_session)],
) -> tuple[User, list[str], list[str]]:
    # 解析访问令牌
    try:
        payload = jwt.decode(
            token,
            settings.jwt_secret,
            algorithms=[settings.jwt_algorithm],
            audience=settings.jwt_audience,
            issuer=settings.jwt_issuer,
            options={"require": ["exp", "sub", "type"], "verify_exp": True},
        )
    except jwt.PyJWTError:
        raise HTTPException(status_code=401, detail="令牌无效或已过期")
    if payload.get("type") != "access":
        raise HTTPException(status_code=401, detail="访问令牌类型错误")

    user_id = int(payload["sub"])
    user = await session.get(User, user_id)
    if not user or not user.is_active:
        raise HTTPException(status_code=401, detail="用户不可用")
    # 加载角色与权限(避免将权限完全信赖客户端令牌内容)
    roles = [r.code for r in user.roles]
    perm_codes: set[str] = set()
    for r in user.roles:
        for p in r.permissions:
            perm_codes.add(p.code)
    return user, roles, sorted(list(perm_codes))

def require_roles(required: list[str]):
    async def dep(ctx: Annotated[tuple[User, list[str], list[str]], Depends(get_current_user)]):
        user, roles, perms = ctx
        if not set(required).issubset(set(roles)):
            raise HTTPException(status_code=403, detail="缺少角色")
        return ctx
    return dep

def require_permissions(required: list[str]):
    async def dep(ctx: Annotated[tuple[User, list[str], list[str]], Depends(get_current_user)]):
        user, roles, perms = ctx
        if not set(required).issubset(set(perms)):
            raise HTTPException(status_code=403, detail="缺少权限")
        return ctx
    return dep

# -------------------- Rate Limiting & Blacklist --------------------
async def check_login_rate_limit(r: Redis, username: str, ip: str):
    # 固定窗口:每分钟最多 N 次
    key = f"rl:login:{username}:{ip}"
    count = await r.incr(key)
    if count == 1:
        await r.expire(key, 60)
    if count > settings.rate_limit_login_per_minute:
        raise AppError("登录过于频繁,请稍后再试", code="too_many_requests", status_code=429)

async def blacklist_refresh_jti(r: Redis, jti: str, exp_ts: int):
    ttl = max(exp_ts - now_ts(), 1)
    await r.setex(f"rt:blacklist:{jti}", ttl, "1")

async def is_refresh_jti_blacklisted(r: Redis, jti: str) -> bool:
    return await r.exists(f"rt:blacklist:{jti}") == 1

# -------------------- Routes --------------------
@app.post("/auth/login", response_model=TokenPair, tags=["auth"])
async def login(
    req: LoginRequest,
    request: Request,
    session: Annotated[AsyncSession, Depends(get_session)],
    r: Annotated[Redis, Depends(get_redis)],
):
    ip = request.client.host or "unknown"
    await check_login_rate_limit(r, req.email, ip)

    # 查找用户(使用 ORM 防注入)
    stmt = select(User).where(User.email == req.email)
    user = (await session.execute(stmt)).scalar_one_or_none()
    if not user or not verify_password(req.password.get_secret_value(), user.password_hash):
        # 不泄露账号存在性
        raise HTTPException(status_code=401, detail="用户名或密码错误")

    if not user.is_active:
        raise HTTPException(status_code=403, detail="账号已禁用")

    # 加载角色与权限(懒加载已设置为 selectin)
    roles = [r.code for r in user.roles]
    perms = sorted(set(p.code for r in user.roles for p in r.permissions))

    # 更新最后登录时间(事务)
    async with session.begin():
        user.last_login = datetime.utcnow()

    access = create_access_token(str(user.id), roles, perms)
    refresh, jti, exp_ts = create_refresh_token(str(user.id))
    # 选项:可记录 session/设备;此处仅演示不持久化
    expires_in = settings.access_token_ttl_minutes * 60
    return TokenPair(access_token=access, refresh_token=refresh, expires_in=expires_in)

class RefreshRequest(BaseModel):
    refresh_token: str = Field(..., description="刷新令牌")

@app.post("/auth/refresh", response_model=TokenPair, tags=["auth"])
async def refresh_tokens(
    body: RefreshRequest,
    session: Annotated[AsyncSession, Depends(get_session)],
    r: Annotated[Redis, Depends(get_redis)],
):
    # 验证刷新令牌
    try:
        payload = jwt.decode(
            body.refresh_token,
            settings.jwt_secret,
            algorithms=[settings.jwt_algorithm],
            audience=settings.jwt_audience,
            issuer=settings.jwt_issuer,
            options={"require": ["exp", "sub", "type"], "verify_exp": True},
        )
    except jwt.PyJWTError:
        raise HTTPException(status_code=401, detail="刷新令牌无效或已过期")
    if payload.get("type") != "refresh":
        raise HTTPException(status_code=401, detail="令牌类型错误")

    jti = payload["jti"]
    if await is_refresh_jti_blacklisted(r, jti):
        raise HTTPException(status_code=401, detail="刷新令牌已失效")

    user = await session.get(User, int(payload["sub"]))
    if not user or not user.is_active:
        raise HTTPException(status_code=401, detail="用户不可用")

    roles = [r.code for r in user.roles]
    perms = sorted(set(p.code for r in user.roles for p in r.permissions))

    # 令牌轮换:作废旧刷新令牌
    await blacklist_refresh_jti(r, jti, payload["exp"])
    access = create_access_token(str(user.id), roles, perms)
    new_refresh, new_jti, new_exp = create_refresh_token(str(user.id))
    return TokenPair(access_token=access, refresh_token=new_refresh, expires_in=settings.access_token_ttl_minutes * 60)

class LogoutRequest(BaseModel):
    refresh_token: str

@app.post("/auth/logout", tags=["auth"])
async def logout(body: LogoutRequest, r: Annotated[Redis, Depends(get_redis)]):
    # 注销:将刷新令牌 jti 加入黑名单。访问令牌让其自然过期。
    try:
        payload = jwt.decode(
            body.refresh_token,
            settings.jwt_secret,
            algorithms=[settings.jwt_algorithm],
            audience=settings.jwt_audience,
            issuer=settings.jwt_issuer,
            options={"require": ["exp", "sub", "type"], "verify_exp": True},
        )
    except jwt.PyJWTError:
        # 静默处理避免提示有效性
        raise HTTPException(status_code=200, detail="已退出")  # 也可返回 204

    if payload.get("type") != "refresh":
        return {"message": "已退出"}

    await blacklist_refresh_jti(r, payload["jti"], payload["exp"])
    return {"message": "已退出"}

@app.get("/me", response_model=MeResponse, tags=["users"])
async def me(ctx: Annotated[tuple[User, list[str], list[str]], Depends(get_current_user)]):
    user, roles, perms = ctx
    return MeResponse(id=user.id, email=user.email, roles=roles, permissions=perms)

@app.get("/admin/secret", tags=["admin"])
async def admin_secret(_: Annotated[tuple[User, list[str], list[str]], Depends(require_roles(["admin"]))]):
    return {"secret": "admin-only"}

@app.get("/items", tags=["items"])
async def list_items(_: Annotated[tuple[User, list[str], list[str]], Depends(require_permissions(["items:read"]))]):
    # 示例:如需 raw SQL,务必使用 text() 与参数绑定防止注入
    # stmt = text("SELECT * FROM items WHERE owner_id = :uid").bindparams(uid=current_user.id)
    return {"items": []}

# -------------------- Lifecycle --------------------
@app.on_event("startup")
async def on_startup():
    global redis
    redis = Redis.from_url(settings.redis_url, decode_responses=True)
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    # 演示数据:创建一个 admin 用户及权限
    async with SessionLocal() as session:
        async with session.begin():
            # 如果没有用户,创建一个
            exists = (await session.execute(select(User).limit(1))).scalar_one_or_none()
            if not exists:
                admin_role = Role(code="admin")
                p_read = Permission(code="items:read")
                p_write = Permission(code="items:write")
                admin_role.permissions.extend([p_read, p_write])
                u = User(email="admin@example.com", password_hash=hash_password("Passw0rd!"))
                u.roles.append(admin_role)
                session.add_all([u])
    log.info("startup_complete", environment=settings.environment)

@app.on_event("shutdown")
async def on_shutdown():
    if redis:
        await redis.close()
    await engine.dispose()

三、Pydantic v2、SQL 注入与事务的要点示例

  • 字段校验器(v2)与严格类型
class CreateItem(BaseModel):
    name: str = Field(min_length=1, max_length=64)
    price: float = Field(gt=0)
    owner_id: int

    @field_validator("name")
    @classmethod
    def no_dangerous_chars(cls, v: str):
        if ";" in v or "--" in v:
            raise ValueError("非法字符")
        return v
  • 原生 SQL 的安全用法
from sqlalchemy import text, bindparam
stmt = text("UPDATE users SET last_login = NOW() WHERE id = :uid").bindparams(bindparam("uid", value=user_id))
await session.execute(stmt)
await session.commit()
  • 异步事务边界(单元工作)
async with session.begin():
    session.add(entity)
    # 多条写操作在同一事务中

四、OpenAPI 文档与示例提示

  • 在路由中使用 response_model、tags、描述;为请求/响应模型添加 Field 的描述与示例。
  • 生产可限制文档访问(如仅内网/带授权访问)。

五、测试(pytest/pytest-asyncio) 建议使用 Testcontainers 启动真实 Postgres 与 Redis;这里给出一个基础集成测试示例,使用 httpx AsyncClient 调用登录与 /me: 安装:pip install pytest pytest-asyncio httpx

文件:tests/test_auth.py

import pytest
import asyncio
from httpx import AsyncClient
from app import app  # 假设示例文件名为 app.py

@pytest.mark.asyncio
async def test_login_and_me():
    async with AsyncClient(app=app, base_url="http://test") as client:
        # 登录
        resp = await client.post("/auth/login", json={"email": "admin@example.com", "password": "Passw0rd!"})
        assert resp.status_code == 200
        tokens = resp.json()
        assert "access_token" in tokens
        # 访问 /me
        headers = {"Authorization": f"Bearer {tokens['access_token']}"}
        me = await client.get("/me", headers=headers)
        assert me.status_code == 200
        assert me.json()["email"] == "admin@example.com"

测试覆盖建议:

  • 密码哈希与验证(正确/错误);
  • 访问令牌过期与刷新令牌轮换(旧 jti 被黑名单);
  • 登录限流逻辑(触发 429);
  • RBAC 与权限拒绝(返回 403);
  • 统一错误响应结构与日志中 request_id 注入。

六、密钥管理、配置分层、容器化与部署安全清单

  • 密钥管理
    • 不将密钥写入仓库;从环境变量/密钥管理服务(AWS KMS/Secrets Manager、HashiCorp Vault)加载。
    • 启用密钥轮换:JWT header 使用 kid;后端支持多活密钥验证(旧密钥验证、用新密钥签发)。
    • 日志中永不打印令牌或密码;对 JWT 仅记录 jti、sub。
  • 配置分层
    • BaseSettings + env 前缀分层;区分 Dev/Staging/Prod;对敏感字段(jwt_secret、db_url)单独密文存储。
    • 使用配置模板与默认安全值(严格 CORS、debug=false)。
  • 容器化与部署安全
    • 非 root 用户运行;裁剪镜像;只读文件系统;限制 /tmp 写入。
    • 依赖锁定(requirements.txt/poetry.lock);CI 扫描漏洞;生成 SBOM。
    • 启用 TLS;在反向代理/Nginx/Ingress 上强制 HSTS;限制 CORS。
    • 数据库最小权限账户;隔离网络;监控连接池指标与慢查询。
    • 资源配额与限流(登录、刷新、敏感路由);报警与审计日志。
    • 打包时排除 .env 等敏感文件;在云环境中通过 secrets 注入。

七、补充建议

  • Alembic 迁移与种子数据脚本;在迁移层面维护角色/权限。
  • 对多租户系统,在 JWT 中加入 tenant_id 并在查询层过滤;权限包含资源归属校验。
  • 若前端为浏览器 SPA,推荐将刷新令牌置于 HttpOnly Cookie 并启用 CSRF(令牌双提交等);访问令牌仍用 Authorization Bearer。

该示例可直接运行(需本机 PostgreSQL 与 Redis),展示路由、依赖注入、令牌签发/刷新与退出登录,满足需求中的关键要点。实际生产请替换 HS256 为非对称算法、接入密钥托管、完善日志/监控、使用迁移与更全面的测试。

以下是面向 React + TypeScript 的多步骤注册表单的实用模板与常见最佳实践,覆盖:Zod 共享校验、React Hook Form 状态管理(含动态字段与文件上传)、无障碍支持、用户名查重的防抖与取消、XSS/CSRF 防护、输入掩码与 i18n、代码分割、Vitest/Playwright 测试,以及可复用控件、提交流程与错误边界示例。

总原则

  • 单一数据源与类型安全:用 Zod 定义前后端共享 schema,类型推导贯穿 UI、API、DB。
  • 按步拆分:每一步独立 schema、独立组件,最后统一合并验证。
  • 组件化与可复用:基础控件 Input/Select/FileUpload 与 FormProvider 结合。
  • 无障碍优先:标签关联、错误提示关联、键盘导航可达、进度区域朗读。
  • 网络健壮性:防抖、AbortController、幂等提交、错误边界。
  • 安全与隐私:默认不使用 innerHTML、CSRF token、Content-Security-Policy、文件类型/大小限制。
  • 性能:懒加载步骤、预加载下一步、只注册可见字段(shouldUnregister)。
  1. Zod 共享校验 schema(前后端复用)
  • 前端/后端共用同一 schema 文件,前端用 @hookform/resolvers/zod,后端在路由层校验。
  • 按步骤定义子 schema,最终合并。

示例:shared/schemas/registration.ts

import { z } from "zod";

export const UsernameSchema = z
  .string()
  .min(3, "用户名至少 3 个字符")
  .max(20, "用户名最多 20 个字符")
  .regex(/^[a-zA-Z0-9_]+$/, "仅限字母、数字和下划线");

export const StepAccountSchema = z.object({
  email: z.string().email("邮箱格式不正确"),
  username: UsernameSchema,
  password: z
    .string()
    .min(8, "密码至少 8 位")
    .regex(/[A-Z]/, "至少包含一个大写字母")
    .regex(/[a-z]/, "至少包含一个小写字母")
    .regex(/\d/, "至少包含一个数字")
    .regex(/[^A-Za-z0-9]/, "至少包含一个特殊字符"),
  confirmPassword: z.string(),
  agree: z.literal(true, { errorMap: () => ({ message: "需同意协议" }) }),
}).refine((d) => d.password === d.confirmPassword, {
  path: ["confirmPassword"],
  message: "两次密码不一致",
});

export const StepProfileSchema = z.object({
  firstName: z.string().min(1, "必填"),
  lastName: z.string().min(1, "必填"),
  birthDate: z.string().optional(), // 前端字符串,后端可 transform 为 Date
  phone: z
    .string()
    .min(10, "电话长度不正确")
    .regex(/^[0-9\-+\s()]+$/ , "电话格式不正确"),
  accountType: z.enum(["personal", "company"]),
  companyName: z.string().optional(),
}).refine((d) => d.accountType === "personal" || !!d.companyName, {
  path: ["companyName"],
  message: "公司帐号需填写公司名称",
});

export const StepPreferencesSchema = z.object({
  newsletter: z.boolean().default(false),
  language: z.enum(["zh-CN", "en-US"]).default("zh-CN"),
});

const acceptedImage = ["image/png", "image/jpeg"];
const acceptedDoc = ["application/pdf"];

export const StepDocumentsSchema = z.object({
  avatar: z
    .instanceof(File)
    .optional()
    .refine((f) => !f || (acceptedImage.includes(f.type) && f.size <= 2 * 1024 * 1024), "头像需 PNG/JPEG 且不超过 2MB"),
  idDocument: z
    .instanceof(File)
    .refine((f) => acceptedDoc.includes(f.type) && f.size <= 5 * 1024 * 1024, "证件需 PDF 且不超过 5MB"),
});

export const RegistrationSchema = StepAccountSchema
  .and(StepProfileSchema)
  .and(StepPreferencesSchema)
  .and(StepDocumentsSchema);

export type RegistrationData = z.infer<typeof RegistrationSchema>;

后端(示例 Express):

// server/routes/register.ts
import { RegistrationSchema } from "../../shared/schemas/registration";
import type { Request, Response } from "express";

export async function registerHandler(req: Request, res: Response) {
  try {
    const parsed = RegistrationSchema.parse({
      ...req.body,
      // 若使用 multipart,需先通过 multer 获取文件对象
      avatar: req.file?.avatar, 
      idDocument: req.file?.idDocument,
    });
    // TODO: 持久化
    res.status(201).json({ ok: true });
  } catch (e) {
    res.status(400).json({ ok: false, errors: e });
  }
}
  1. React Hook Form 管理状态(动态字段、文件上传、进度展示)
  • 使用 FormProvider 共享上下文;每步独立组件,自身只关心其字段。
  • shouldUnregister: true 让隐藏字段自动移除,避免提交冗余数据。
  • 文件上传建议单独组件,前端先校验类型/大小,再上传(带进度)。

示例:Form 容器与步骤懒加载

// App.tsx
import { Suspense, lazy, useMemo, useState } from "react";
import { useForm, FormProvider } from "react-hook-form";
import { zodResolver } from "@hookform/resolvers/zod";
import { RegistrationSchema, StepAccountSchema, StepProfileSchema, StepPreferencesSchema, StepDocumentsSchema, type RegistrationData } from "../shared/schemas/registration";

const StepAccount = lazy(() => import("./steps/StepAccount"));
const StepProfile = lazy(() => import("./steps/StepProfile"));
const StepPreferences = lazy(() => import("./steps/StepPreferences"));
const StepDocuments = lazy(() => import("./steps/StepDocuments"));
const Review = lazy(() => import("./steps/Review"));

type StepKey = "account" | "profile" | "preferences" | "documents" | "review";
const stepOrder: StepKey[] = ["account", "profile", "preferences", "documents", "review"];
const stepResolvers = {
  account: StepAccountSchema,
  profile: StepProfileSchema,
  preferences: StepPreferencesSchema,
  documents: StepDocumentsSchema,
  review: RegistrationSchema, // 最终全量校验
};

export default function App() {
  const [step, setStep] = useState<StepKey>("account");
  const methods = useForm<RegistrationData>({
    resolver: zodResolver(stepResolvers[step]),
    mode: "onBlur",
    shouldUnregister: true,
    defaultValues: { newsletter: false, language: "zh-CN" } as Partial<RegistrationData>,
  });

  const next = async () => {
    const valid = await methods.trigger(); // 仅触发当前步骤校验
    if (!valid) return;
    const idx = stepOrder.indexOf(step);
    setStep(stepOrder[idx + 1] ?? step);
  };
  const back = () => {
    const idx = stepOrder.indexOf(step);
    setStep(stepOrder[idx - 1] ?? step);
  };

  return (
    <FormProvider {...methods}>
      <form onSubmit={methods.handleSubmit(() => setStep("review"))} noValidate>
        <Suspense fallback={<div aria-busy="true">加载中…</div>}>
          {step === "account" && <StepAccount onNext={next} />}
          {step === "profile" && <StepProfile onNext={next} onBack={back} />}
          {step === "preferences" && <StepPreferences onNext={next} onBack={back} />}
          {step === "documents" && <StepDocuments onNext={next} onBack={back} />}
          {step === "review" && <Review onBack={back} />}
        </Suspense>
      </form>
    </FormProvider>
  );
}

可复用基础控件(带无障碍)

// components/FormField.tsx
import { useFormContext } from "react-hook-form";
import { useId } from "react";

type Props = {
  name: string;
  label: string;
  type?: string;
  placeholder?: string;
  autoComplete?: string;
  inputProps?: React.InputHTMLAttributes<HTMLInputElement>;
};

export function FormField({ name, label, type = "text", placeholder, autoComplete, inputProps }: Props) {
  const { register, formState: { errors } } = useFormContext();
  const id = useId();
  const errMsg = (errors as any)[name]?.message as string | undefined;
  const descId = `${id}-desc`;
  return (
    <div role="group" aria-labelledby={`${id}-label`} className="field">
      <label id={`${id}-label`} htmlFor={id}>{label}</label>
      <input
        id={id}
        type={type}
        placeholder={placeholder}
        autoComplete={autoComplete}
        aria-invalid={!!errMsg}
        aria-describedby={errMsg ? descId : undefined}
        {...register(name)}
        {...inputProps}
      />
      {errMsg && (
        <div id={descId} role="alert" aria-live="polite" className="error">
          {errMsg}
        </div>
      )}
    </div>
  );
}

动态字段与输入掩码

// steps/StepProfile.tsx
import { useFormContext } from "react-hook-form";
import InputMask from "react-input-mask";
import { FormField } from "../components/FormField";

export default function StepProfile({ onNext, onBack }: { onNext: () => void; onBack: () => void }) {
  const { register, watch, setValue, formState: { errors } } = useFormContext();
  const accountType = watch("accountType");

  return (
    <section aria-label="个人信息">
      <div>
        <label htmlFor="accountType">账户类型</label>
        <select id="accountType" {...register("accountType")}>
          <option value="personal">个人</option>
          <option value="company">公司</option>
        </select>
      </div>

      <FormField name="firstName" label="名" />
      <FormField name="lastName" label="姓" />
      {/* 电话掩码 */}
      <div>
        <label htmlFor="phone">电话</label>
        <InputMask mask="+99 999-999-999" {...register("phone")} id="phone">
          {(inputProps: any) => <input {...inputProps} aria-invalid={!!errors.phone} />}
        </InputMask>
        {errors.phone?.message && <div role="alert">{String(errors.phone.message)}</div>}
      </div>

      {accountType === "company" && (
        <FormField name="companyName" label="公司名称" />
      )}

      <div className="nav">
        <button type="button" onClick={onBack}>上一步</button>
        <button type="button" onClick={onNext}>下一步</button>
      </div>
    </section>
  );
}

文件上传(大小/类型校验、进度展示)

// components/FileUpload.tsx
import { useFormContext } from "react-hook-form";
import { useId, useState, useRef } from "react";

type FileUploadProps = {
  name: "avatar" | "idDocument";
  label: string;
  accept: string;
};

export function FileUpload({ name, label, accept }: FileUploadProps) {
  const { register, setValue, formState: { errors } } = useFormContext();
  const id = useId();
  const [progress, setProgress] = useState<number>(0);
  const xhrRef = useRef<XMLHttpRequest | null>(null);
  const errMsg = (errors as any)[name]?.message as string | undefined;

  const onUpload = async (file: File) => {
    const form = new FormData();
    form.append(name, file);
    const xhr = new XMLHttpRequest();
    xhrRef.current = xhr;
    xhr.upload.onprogress = (e) => {
      if (e.lengthComputable) setProgress(Math.round((e.loaded / e.total) * 100));
    };
    xhr.onreadystatechange = () => {
      if (xhr.readyState === 4) {
        xhrRef.current = null;
      }
    };
    xhr.open("POST", `/api/upload/${name}`);
    xhr.setRequestHeader("X-CSRF-Token", (document.querySelector('meta[name="csrf-token"]') as HTMLMetaElement)?.content || "");
    xhr.send(form);
  };

  return (
    <div>
      <label htmlFor={id}>{label}</label>
      <input
        id={id}
        type="file"
        accept={accept}
        {...register(name as any)}
        onChange={(e) => {
          const file = e.target.files?.[0];
          if (file) {
            setValue(name, file, { shouldValidate: true });
            onUpload(file); // 可改为在最终提交时统一上传
          }
        }}
        aria-invalid={!!errMsg}
        aria-describedby={errMsg ? `${id}-error` : undefined}
      />
      {errMsg && <div id={`${id}-error`} role="alert">{errMsg}</div>}
      <div aria-live="polite" aria-atomic="true">{progress > 0 && `上传进度:${progress}%`}</div>
      {xhrRef.current && <button type="button" onClick={() => xhrRef.current?.abort()}>取消上传</button>}
    </div>
  );
}
  1. 无障碍支持(ARIA、键盘导航、错误提示关联)
  • 每个 input 有 label/aria-describedby 错误提示关联;aria-invalid 标示错误状态;对异步状态使用 aria-live。
  • 步骤容器使用 aria-label 或 aria-labelledby,按键导航保留默认 Enter 提交行为。
  • 验证失败后聚焦第一个错误字段:trigger 后取 errors 的第一个 key 并调用 document.getElementById(...).focus()。

示例:聚焦第一个错误字段

// utils/focusFirstError.ts
import { FieldErrors } from "react-hook-form";
export function focusFirstError(errors: FieldErrors) {
  const firstKey = Object.keys(errors)[0];
  if (!firstKey) return;
  const el = document.querySelector(`[name="${firstKey}"]`) as HTMLElement | null;
  el?.focus();
}
  1. 防抖 + AbortController 处理用户名查重
  • watch username 字段;300ms 防抖;每次新请求取消旧请求;仅当长度满足最小要求时触发。
  • 根据结果 setError 或 clearErrors。

示例:useUsernameCheck Hook

// hooks/useUsernameCheck.ts
import { useEffect, useRef } from "react";
import { UseFormSetError, UseFormClearErrors, UseFormWatch } from "react-hook-form";

export function useUsernameCheck(watch: UseFormWatch<any>, setError: UseFormSetError<any>, clearErrors: UseFormClearErrors<any>) {
  const timerRef = useRef<number | null>(null);
  const abortRef = useRef<AbortController | null>(null);

  useEffect(() => {
    const subscription = watch((value, { name }) => {
      if (name !== "username") return;
      const username = value.username;
      if (!username || username.length < 3) {
        clearErrors("username");
        return;
      }
      if (timerRef.current) window.clearTimeout(timerRef.current);
      timerRef.current = window.setTimeout(async () => {
        abortRef.current?.abort();
        const ac = new AbortController();
        abortRef.current = ac;
        try {
          const res = await fetch(`/api/username/check?u=${encodeURIComponent(username)}`, { signal: ac.signal });
          const data = await res.json();
          if (data.taken) {
            setError("username", { type: "validate", message: "用户名已被占用" });
          } else {
            clearErrors("username");
          }
        } catch (e: any) {
          if (e.name !== "AbortError") {
            setError("username", { type: "validate", message: "检查失败,请稍后重试" });
          }
        }
      }, 300);
    });
    return () => subscription.unsubscribe();
  }, [watch, setError, clearErrors]);
}

在步骤组件中使用:

// steps/StepAccount.tsx
import { useFormContext } from "react-hook-form";
import { useUsernameCheck } from "../hooks/useUsernameCheck";
import { FormField } from "../components/FormField";

export default function StepAccount({ onNext }: { onNext: () => void }) {
  const { watch, setError, clearErrors } = useFormContext();
  useUsernameCheck(watch, setError, clearErrors);

  return (
    <section aria-label="账号设置">
      <FormField name="email" label="邮箱" type="email" autoComplete="email" />
      <FormField name="username" label="用户名" />
      <FormField name="password" label="密码" type="password" autoComplete="new-password" />
      <FormField name="confirmPassword" label="确认密码" type="password" autoComplete="new-password" />
      <div>
        <input id="agree" type="checkbox" {...(useFormContext().register("agree"))} aria-describedby="agree-desc" />
        <label htmlFor="agree">我已阅读并同意</label>
        <div id="agree-desc" className="hint">勾选后可继续</div>
      </div>
      <button type="button" onClick={onNext}>下一步</button>
    </section>
  );
}
  1. 防止 XSS/CSRF、掩码输入与本地化(i18n)
  • XSS:默认不使用 dangerouslySetInnerHTML;若需渲染用户内容,使用 DOMPurify;后端设置 Content-Security-Policy,前端对富文本来源可信度校验。
  • CSRF:后端使用 SameSite=Lax/Strict Cookie,前端提交附带 CSRF token 头;对非幂等请求使用 POST。
  • 输入掩码:电话、生日等用 react-input-mask;结合 Zod 校验原始值。
  • i18n:使用 i18next,错误消息来自 Zod 或自定义 errorMap,再通过 t() 翻译;labels、placeholders 与 aria-label 皆国际化。

示例:安全展示与 i18n

// utils/safeHtml.ts
import DOMPurify from "dompurify";
export function safeHtml(html: string) {
  return { __html: DOMPurify.sanitize(html) };
}

// i18n 使用
import { useTranslation } from "react-i18next";
function LabelExample() {
  const { t } = useTranslation();
  return <label>{t("form.email")}</label>;
}

CSRF fetch 包装器

// utils/fetchWithCsrf.ts
export async function fetchWithCsrf(input: RequestInfo, init: RequestInit = {}) {
  const token = (document.querySelector('meta[name="csrf-token"]') as HTMLMetaElement)?.content || "";
  const headers = new Headers(init.headers || {});
  if (token) headers.set("X-CSRF-Token", token);
  return fetch(input, { ...init, headers, credentials: "include" });
}
  1. 代码分割与懒加载
  • 每一步懒加载;在进入当前步后预加载下一步以减少等待。
  • Suspense fallback 提供 aria-busy。
import { useEffect } from "react";
const StepPreferences = lazy(() => import("./steps/StepPreferences"));
useEffect(() => {
  // 预加载下一步
  import("./steps/StepPreferences");
}, []);
  1. 单元测试(Vitest)与端到端测试(Playwright)
  • 单元测试:Zod schema 校验;组件渲染与错误提示;用户名查重防抖逻辑。
  • E2E:模拟用户名检查接口;逐步填写表单;上传文件;验证 aria 属性与错误提示。

Vitest 示例:schema

// tests/registration.schema.test.ts
import { describe, it, expect } from "vitest";
import { StepAccountSchema } from "../shared/schemas/registration";

describe("StepAccountSchema", () => {
  it("rejects weak password", () => {
    const res = StepAccountSchema.safeParse({
      email: "a@b.com",
      username: "user123",
      password: "weak",
      confirmPassword: "weak",
      agree: true,
    });
    expect(res.success).toBe(false);
  });
  it("accepts valid data", () => {
    const res = StepAccountSchema.safeParse({
      email: "a@b.com",
      username: "user_123",
      password: "Strong#123",
      confirmPassword: "Strong#123",
      agree: true,
    });
    expect(res.success).toBe(true);
  });
});

Vitest + Testing Library:组件错误渲染

// tests/StepAccount.test.tsx
import { render, screen } from "@testing-library/react";
import { FormProvider, useForm } from "react-hook-form";
import StepAccount from "../src/steps/StepAccount";

function Wrapper() {
  const methods = useForm({ defaultValues: {} });
  return (
    <FormProvider {...methods}>
      <StepAccount onNext={() => {}} />
    </FormProvider>
  );
}
test("renders labels", () => {
  render(<Wrapper />);
  expect(screen.getByLabelText("邮箱")).toBeInTheDocument();
});

Playwright E2E 示例

// e2e/register.spec.ts
import { test, expect } from "@playwright/test";

test("multi-step registration", async ({ page }) => {
  await page.route("/api/username/check", (route) => {
    const url = new URL(route.request().url());
    const u = url.searchParams.get("u");
    route.fulfill({ json: { taken: u === "taken_user" } });
  });

  await page.goto("/register");
  await page.getByLabel("邮箱").fill("user@example.com");
  await page.getByLabel("用户名").fill("taken_user");
  await page.waitForSelector("text=用户名已被占用");

  await page.getByLabel("用户名").fill("newuser");
  await page.getByLabel("密码").fill("Strong#123");
  await page.getByLabel("确认密码").fill("Strong#123");
  await page.getByLabel("我已阅读并同意").check();
  await page.getByRole("button", { name: "下一步" }).click();

  await page.getByLabel("账户类型").selectOption("company");
  await page.getByLabel("公司名称").fill("Acme Inc.");
  await page.getByRole("button", { name: "下一步" }).click();

  await page.getByRole("button", { name: "下一步" }).click(); // 偏好
  await page.setInputFiles('input[type="file"][accept="image/png,image/jpeg"]', 'tests/fixtures/avatar.png');
  await page.setInputFiles('input[type="file"][accept="application/pdf"]', 'tests/fixtures/id.pdf');
  await page.getByRole("button", { name: "下一步" }).click();

  await expect(page.locator("text=审核并提交")).toBeVisible();
});
  1. 可复用提交流程与错误边界模板
  • 提交流程:先全量校验,再并发上传文件与提交元数据;失败时设置 form-level 错误并滚动聚焦。
  • 错误边界:捕获渲染异常,提供重试按钮与日志上报。

提交模板

// steps/Review.tsx
import { useFormContext } from "react-hook-form";
import { RegistrationSchema, type RegistrationData } from "../../shared/schemas/registration";
import { fetchWithCsrf } from "../utils/fetchWithCsrf";

export default function Review({ onBack }: { onBack: () => void }) {
  const { getValues, trigger, setError } = useFormContext<RegistrationData>();

  const submit = async () => {
    const ok = await trigger(undefined, { shouldFocus: true }); // 全量校验
    if (!ok) return;
    const data = getValues();
    try {
      const form = new FormData();
      for (const [k, v] of Object.entries(data)) {
        if (v instanceof File) form.append(k, v);
        else form.append(k, String(v ?? ""));
      }
      const res = await fetchWithCsrf("/api/register", { method: "POST", body: form });
      if (!res.ok) throw new Error("注册失败");
      // 成功处理
      alert("注册成功");
    } catch (e: any) {
      setError("root", { type: "server", message: e.message || "服务器错误" });
      const el = document.querySelector('[role="alert"]') as HTMLElement | null;
      el?.focus();
    }
  };

  return (
    <section aria-label="审核并提交">
      <div role="alert" tabIndex={-1}>请确认信息后提交</div>
      <div className="nav">
        <button type="button" onClick={onBack}>上一步</button>
        <button type="button" onClick={submit}>提交</button>
      </div>
    </section>
  );
}

错误边界

// components/ErrorBoundary.tsx
import { Component, ReactNode } from "react";

export class ErrorBoundary extends Component<{ children: ReactNode }, { hasError: boolean; message?: string }> {
  constructor(props: any) {
    super(props);
    this.state = { hasError: false };
  }
  static getDerivedStateFromError(error: Error) {
    return { hasError: true, message: error.message };
  }
  componentDidCatch(error: Error, info: any) {
    // TODO: 上报日志
    console.error(error, info);
  }
  render() {
    if (this.state.hasError) {
      return (
        <div role="alert">
          出现错误:{this.state.message}
          <button onClick={() => this.setState({ hasError: false })}>重试</button>
        </div>
      );
    }
    return this.props.children;
  }
}

实践提示

  • 在 useForm 中开启 criteriaMode: "all" 可显示多条错误消息,但注意可读性。
  • 文件上传建议延迟到最终提交以简化回滚,或采用 S3 pre-signed URL 直传。
  • 对动态步骤的 resolver 使用单步 schema,加快校验与避免无关错误。
  • 预加载下一步组件与文案资源(i18n)以提升体验。
  • 键盘支持:确保按钮可通过 Tab 焦点访问;为自定义控件(比如 masked input)确保转发原生 input 的可访问属性。
  • CSP 示例:Content-Security-Policy: default-src 'self'; img-src 'self' data:; script-src 'self'; style-src 'self' 'unsafe-inline'; connect-src 'self';
  • 国际化错误:可通过 zod.setErrorMap((issue, ctx) => ({ message: t(validation.${issue.code}) })) 接入 i18n。

以上模板可直接作为骨架投入项目并根据业务扩展。通过 Zod 实现类型与校验统一,React Hook Form 管控状态与性能,无障碍与安全措施贯穿始终,再结合测试与代码分割确保质量与可维护性。

以下内容按“最佳实践 + 小型代码示例”的风格,给出一个用 Go 实现的订单微服务,REST 与 gRPC 同时暴露,涵盖分层架构、仓储模式、pgx 连接池与重试、幂等与并发控制、熔断与退避、可观测性(OpenTelemetry/Prometheus/zap)、优雅停机/健康探针、表驱动测试与 testcontainers 等关键点。示例代码尽量精简,能作为最小可运行骨架进行扩展。

一、分层架构与仓储模式(context 贯穿)

  • 分层建议:transport(HTTP/gRPC) -> service(业务用例) -> repository(数据访问) -> db。
  • 所有公开方法第一个参数使用 context.Context,传播超时、取消、tracing、日志字段等。
  • Repository 隔离持久化细节,service 仅关心业务流程;transport 只做编解码、验证、鉴权。

示例领域模型与接口

// internal/domain/order.go
package domain

import "time"

type Order struct {
    ID          string
    DedupKey    string // 幂等去重键(外部传入,需唯一约束)
    UserID      string
    AmountCents int64
    Status      string // created, paid, failed...
    Version     int64  // 乐观锁版本
    CreatedAt   time.Time
    UpdatedAt   time.Time
}

// 仓储接口
type OrderRepository interface {
    Create(ctx context.Context, o Order) (Order, error)               // 幂等:基于 DedupKey
    Get(ctx context.Context, id string) (Order, error)
    UpdateStatusOptLock(ctx context.Context, id string, fromVersion int64, newStatus string) (Order, error)
}

二、PostgreSQL(pgx)连接池、超时与重试

  • 使用 pgxpool,统一初始化连接池,设置最大连接、生命周期、acquire timeout。
  • 在 service 层设置合理的超时(如 200-500ms 查询、1-2s 写入),在 repository 直接使用传入的 ctx。
  • 对可重试的瞬时错误(序列化失败 40001、死锁 40P01、连接重置等)做有限次重试 + 指数退避。

示例:pgx 连接与重试

// internal/db/pg.go
package db

import (
    "context"
    "time"
    "errors"

    "github.com/cenkalti/backoff/v4"
    "github.com/jackc/pgx/v5/pgxpool"
    "github.com/jackc/pgx/v5"
    "github.com/jackc/pgconn"
)

func NewPool(ctx context.Context, url string) (*pgxpool.Pool, error) {
    cfg, err := pgxpool.ParseConfig(url)
    if err != nil { return nil, err }
    cfg.MaxConns = 20
    cfg.MinConns = 2
    cfg.MaxConnLifetime = time.Hour
    cfg.MaxConnIdleTime = 10 * time.Minute
    cfg.HealthCheckPeriod = 30 * time.Second
    cfg.ConnConfig.ConnectTimeout = 5 * time.Second
    return pgxpool.NewWithConfig(ctx, cfg)
}

func IsRetryablePgErr(err error) bool {
    var pgErr *pgconn.PgError
    if errors.As(err, &pgErr) {
        switch pgErr.Code {
        case "40001", "40P01": // serialization_failure, deadlock_detected
            return true
        }
    }
    // 网络层或临时 IO
    return errors.Is(err, pgx.ErrNoRows) == false && pgconn.SafeToRetry(err)
}

func WithTxRetry(ctx context.Context, pool *pgxpool.Pool, isoLevel pgx.TxIsoLevel, fn func(pgx.Tx) error) error {
    bo := backoff.NewExponentialBackOff()
    bo.InitialInterval = 20 * time.Millisecond
    bo.MaxInterval = 200 * time.Millisecond
    bo.MaxElapsedTime = 1 * time.Second

    operation := func() error {
        tx, err := pool.BeginTx(ctx, pgx.TxOptions{IsoLevel: isoLevel})
        if err != nil { return backoff.Permanent(err) }
        defer tx.Rollback(ctx)

        if err := fn(tx); err != nil {
            if IsRetryablePgErr(err) {
                return err // 触发 backoff 重试
            }
            return backoff.Permanent(err)
        }
        return tx.Commit(ctx)
    }
    return backoff.Retry(operation, bo)
}

三、幂等设计、去重键与悲观/乐观锁

  • 去重键:对外暴露 dedup_key,orders 表建立唯一约束,插入使用 INSERT ... ON CONFLICT。
  • 乐观锁:更新时带 version 条件,更新成功返回新版本,否则返回冲突错误。
  • 悲观锁:需要串行化修改的行使用 SELECT ... FOR UPDATE/UPDATE ... RETURNING,以及 SKIP LOCKED 支持并发 worker。
  • 幂等设计注意:去重键的语义稳定,建立唯一索引;在服务重试或网络抖动下可安全重放。

示例:仓储实现(幂等 + 乐观锁)

// internal/repo/order_repo.go
package repo

import (
    "context"
    "errors"
    "github.com/jackc/pgx/v5"
    "github.com/jackc/pgx/v5/pgxpool"
    "yourmod/internal/domain"
    "yourmod/internal/db"
    "time"
)

var ErrConflict = errors.New("conflict")

type OrderRepo struct {
    pool *pgxpool.Pool
}

func NewOrderRepo(pool *pgxpool.Pool) *OrderRepo { return &OrderRepo{pool: pool} }

// SQL schema 要有:
// create table orders (
//   id uuid primary key default gen_random_uuid(),
//   dedup_key text not null unique,
//   user_id text not null,
//   amount_cents bigint not null,
//   status text not null,
//   version bigint not null default 1,
//   created_at timestamptz not null default now(),
//   updated_at timestamptz not null default now()
// );

func (r *OrderRepo) Create(ctx context.Context, o domain.Order) (domain.Order, error) {
    var out domain.Order
    q := `
    insert into orders (dedup_key, user_id, amount_cents, status)
    values ($1,$2,$3,'created')
    on conflict (dedup_key) do update set updated_at = now()
    returning id, dedup_key, user_id, amount_cents, status, version, created_at, updated_at;
    `
    err := db.WithTxRetry(ctx, r.pool, pgx.Serializable, func(tx pgx.Tx) error {
        return tx.QueryRow(ctx, q, o.DedupKey, o.UserID, o.AmountCents).
            Scan(&out.ID, &out.DedupKey, &out.UserID, &out.AmountCents, &out.Status, &out.Version, &out.CreatedAt, &out.UpdatedAt)
    })
    return out, err
}

func (r *OrderRepo) Get(ctx context.Context, id string) (domain.Order, error) {
    var o domain.Order
    q := `select id, dedup_key, user_id, amount_cents, status, version, created_at, updated_at from orders where id=$1`
    err := r.pool.QueryRow(ctx, q, id).Scan(&o.ID, &o.DedupKey, &o.UserID, &o.AmountCents, &o.Status, &o.Version, &o.CreatedAt, &o.UpdatedAt)
    return o, err
}

func (r *OrderRepo) UpdateStatusOptLock(ctx context.Context, id string, fromVersion int64, newStatus string) (domain.Order, error) {
    var o domain.Order
    q := `
    update orders
       set status=$3, version=version+1, updated_at=now()
     where id=$1 and version=$2
     returning id, dedup_key, user_id, amount_cents, status, version, created_at, updated_at;
    `
    cmd, err := r.pool.Query(ctx, q, id, fromVersion, newStatus)
    if err != nil { return o, err }
    defer cmd.Close()
    if !cmd.Next() { return o, ErrConflict }
    err = cmd.Scan(&o.ID, &o.DedupKey, &o.UserID, &o.AmountCents, &o.Status, &o.Version, &o.CreatedAt, &o.UpdatedAt)
    return o, err
}

// 悲观锁示意(库存/队列工作项):SELECT ... FOR UPDATE SKIP LOCKED
// tx.Query(ctx, `select id from jobs where status='ready' for update skip locked limit 1`)

四、熔断与回退策略,外部调用重试退避

  • 统一外部调用客户端,加入重试(幂等操作)、指数退避、超时;添加熔断器保护下游。
  • 回退策略:熔断打开时快速失败并采用降级方案(例如记录待支付任务,异步补偿)。

示例:带熔断与退避的支付客户端

// internal/ext/pay_client.go
package ext

import (
    "context"
    "errors"
    "net/http"
    "time"

    "github.com/sony/gobreaker"
    "github.com/cenkalti/backoff/v4"
    "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)

type PayClient struct {
    httpc   *http.Client
    breaker *gobreaker.CircuitBreaker
}

func NewPayClient() *PayClient {
    st := gobreaker.Settings{
        Name:        "payment",
        MaxRequests: 5,
        Interval:    30 * time.Second,
        Timeout:     10 * time.Second,
        ReadyToTrip: func(counts gobreaker.Counts) bool {
            return counts.ConsecutiveFailures >= 5
        },
    }
    return &PayClient{
        httpc: &http.Client{
            Timeout: 2 * time.Second,
            Transport: otelhttp.NewTransport(http.DefaultTransport),
        },
        breaker: gobreaker.NewCircuitBreaker(st),
    }
}

func (c *PayClient) Authorize(ctx context.Context, orderID string, amount int64) error {
    op := func() error {
        _, err := c.breaker.Execute(func() (interface{}, error) {
            req, _ := http.NewRequestWithContext(ctx, http.MethodPost, "http://payment/api/authorize", nil)
            resp, err := c.httpc.Do(req)
            if err != nil { return nil, err }
            defer resp.Body.Close()
            if resp.StatusCode >= 500 { return nil, errors.New("payment 5xx") }
            if resp.StatusCode >= 400 { return nil, backoff.Permanent(errors.New("payment 4xx")) }
            return nil, nil
        })
        return err
    }
    bo := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
    return backoff.Retry(op, bo)
}

五、OpenTelemetry 分布式追踪、Prometheus 指标与结构化日志(zap)

  • 初始化 OTel:OTLP 导出器 -> Collector;HTTP 使用 otelhttp、gRPC 使用 otelgrpc 拦截器自动注入 trace。
  • Prometheus:暴露 /metrics,添加自定义业务指标(订单创建计数、延迟直方图等)。
  • 日志:zap 生产级 JSON,带 trace_id、span_id、request_id 等;不同层使用 context 传播。

示例:可观测性初始化与采样指标

// internal/observ/otel.go
package observ

import (
    "context"
    "time"
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
    sdktrace "go.opentelemetry.io/otel/sdk/trace"
    "go.opentelemetry.io/otel/sdk/resource"
    "go.opentelemetry.io/otel/propagation"
    semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
)

func InitOTel(ctx context.Context, service string) (func(context.Context) error, error) {
    exp, err := otlptracehttp.New(ctx) // 读 OTEL_EXPORTER_OTLP_ENDPOINT 等环境变量
    if err != nil { return nil, err }
    res, _ := resource.New(ctx, resource.WithAttributes(semconv.ServiceName(service)))
    tp := sdktrace.NewTracerProvider(
        sdktrace.WithBatcher(exp),
        sdktrace.WithResource(res),
        sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.TraceIDRatioBased(0.2))),
    )
    otel.SetTracerProvider(tp)
    otel.SetTextMapPropagator(propagation.TraceContext{})
    return tp.Shutdown, nil
}

// internal/observ/metrics.go
package observ

import "github.com/prometheus/client_golang/prometheus"

var (
    OrdersCreated = prometheus.NewCounter(prometheus.CounterOpts{
        Name: "orders_created_total", Help: "Total created orders",
    })
    OrderCreateLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
        Name: "order_create_seconds", Help: "Latency of order creation", Buckets: prometheus.DefBuckets,
    })
)

func MustRegisterMetrics() {
    prometheus.MustRegister(OrdersCreated, OrderCreateLatency)
}

// internal/observ/logging.go
package observ

import (
    "go.uber.org/zap"
    "context"
    "go.opentelemetry.io/otel/trace"
)

func NewLogger() *zap.Logger {
    l, _ := zap.NewProduction()
    return l
}

func WithTraceFields(ctx context.Context, l *zap.Logger) *zap.Logger {
    span := trace.SpanFromContext(ctx)
    sc := span.SpanContext()
    if sc.HasTraceID() {
        l = l.With(zap.String("trace_id", sc.TraceID().String()), zap.String("span_id", sc.SpanID().String()))
    }
    return l
}

六、服务层与路由(REST 与 gRPC)

  • REST 选择 chi/gin,gRPC 使用官方 server,二者共享同一 service。
  • HTTP 路由增加 otelhttp 和请求日志中间件;gRPC 加 otelgrpc 拦截器和健康服务。
  • 健康检查与就绪探针:/healthz(活性),/readyz(检查 DB Ping)。

proto(最小化)

// api/order/v1/order.proto
syntax = "proto3";
package order.v1;
option go_package = "yourmod/api/order/v1;orderv1";

service OrderService {
  rpc CreateOrder(CreateOrderRequest) returns (CreateOrderResponse);
  rpc GetOrder(GetOrderRequest) returns (GetOrderResponse);
}

message CreateOrderRequest { string dedup_key = 1; string user_id = 2; int64 amount_cents = 3; }
message CreateOrderResponse { string order_id = 1; }
message GetOrderRequest { string order_id = 1; }
message GetOrderResponse { string order_id = 1; string status = 2; int64 amount_cents = 3; }

service 与 transport 示例

// internal/service/order_service.go
package service

import (
    "context"
    "time"
    "yourmod/internal/domain"
    "yourmod/internal/repo"
    "yourmod/internal/ext"
    "yourmod/internal/observ"
    "go.uber.org/zap"
)

type OrderService struct {
    repo repo.OrderRepo
    pay  *ext.PayClient
    log  *zap.Logger
}

func NewOrderService(r *repo.OrderRepo, p *ext.PayClient, l *zap.Logger) *OrderService {
    return &OrderService{repo: *r, pay: p, log: l}
}

func (s *OrderService) Create(ctx context.Context, dedupKey, userID string, amount int64) (domain.Order, error) {
    ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
    defer cancel()

    start := time.Now()
    o, err := s.repo.Create(ctx, domain.Order{DedupKey: dedupKey, UserID: userID, AmountCents: amount})
    observ.OrderCreateLatency.Observe(time.Since(start).Seconds())
    if err != nil { return o, err }
    observ.OrdersCreated.Inc()

    log := observ.WithTraceFields(ctx, s.log)
    // 外部支付授权(演示可选)
    if err := s.pay.Authorize(ctx, o.ID, o.AmountCents); err != nil {
        log.Warn("payment authorize failed, fallback", zap.Error(err))
        // 回退方案:标记待支付状态,或写出补偿消息
    }
    return o, nil
}

func (s *OrderService) Get(ctx context.Context, id string) (domain.Order, error) {
    ctx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
    defer cancel()
    return s.repo.Get(ctx, id)
}
// internal/transport/http/router.go
package httpapi

import (
    "encoding/json"
    "net/http"

    "github.com/go-chi/chi/v5"
    "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
    "yourmod/internal/service"
    "yourmod/internal/observ"
    "github.com/jackc/pgx/v5/pgxpool"
)

func NewRouter(svc *service.OrderService, pool *pgxpool.Pool) http.Handler {
    r := chi.NewRouter()
    r.Use(otelhttp.NewMiddleware("http"))
    r.Get("/healthz", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) })
    r.Get("/readyz", func(w http.ResponseWriter, r *http.Request) {
        if err := pool.Ping(r.Context()); err != nil { http.Error(w, "not ready", 503); return }
        w.WriteHeader(http.StatusOK)
    })
    r.Handle("/metrics", promHandler()) // 见下

    r.Post("/orders", func(w http.ResponseWriter, r *http.Request) {
        var req struct {
            DedupKey string `json:"dedup_key"`
            UserID   string `json:"user_id"`
            Amount   int64  `json:"amount_cents"`
        }
        if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
            http.Error(w, err.Error(), 400); return
        }
        o, err := svc.Create(r.Context(), req.DedupKey, req.UserID, req.Amount)
        if err != nil { http.Error(w, err.Error(), 500); return }
        _ = json.NewEncoder(w).Encode(map[string]string{"order_id": o.ID})
    })
    r.Get("/orders/{id}", func(w http.ResponseWriter, r *http.Request) {
        id := chi.URLParam(r, "id")
        o, err := svc.Get(r.Context(), id)
        if err != nil { http.Error(w, err.Error(), 404); return }
        _ = json.NewEncoder(w).Encode(o)
    })
    return r
}

func promHandler() http.Handler {
    observ.MustRegisterMetrics()
    return promhttp.Handler()
}
// internal/transport/grpc/server.go
package grpcapi

import (
    "context"
    "yourmod/api/order/v1"
    "yourmod/internal/service"

    "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
    "google.golang.org/grpc"
    "google.golang.org/grpc/health"
    healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

type Server struct {
    orderv1.UnimplementedOrderServiceServer
    svc *service.OrderService
}

func NewServer(s *service.OrderService) *Server { return &Server{svc: s} }

func (s *Server) CreateOrder(ctx context.Context, req *orderv1.CreateOrderRequest) (*orderv1.CreateOrderResponse, error) {
    o, err := s.svc.Create(ctx, req.DedupKey, req.UserId, req.AmountCents)
    if err != nil { return nil, err }
    return &orderv1.CreateOrderResponse{OrderId: o.ID}, nil
}

func (s *Server) GetOrder(ctx context.Context, req *orderv1.GetOrderRequest) (*orderv1.GetOrderResponse, error) {
    o, err := s.svc.Get(ctx, req.OrderId)
    if err != nil { return nil, err }
    return &orderv1.GetOrderResponse{OrderId: o.ID, Status: o.Status, AmountCents: o.AmountCents}, nil
}

func NewGRPCServer(s *service.OrderService) *grpc.Server {
    gs := grpc.NewServer(
        grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
        grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor()),
    )
    orderv1.RegisterOrderServiceServer(gs, NewServer(s))
    healthSrv := health.NewServer()
    healthpb.RegisterHealthServer(gs, healthSrv)
    return gs
}

七、优雅停机、健康检查与就绪探针

  • 多 server 并行启动(HTTP、gRPC、metrics),使用 errgroup + context 管理。
  • 捕获 SIGINT/SIGTERM,调用 HTTP Server.Shutdown、gRPC GracefulStop、关闭 pg 池。
  • K8s:livenessProbe 指向 /healthz;readinessProbe 指向 /readyz;gRPC 可用 grpc-health-probe。

最小可运行 main

// cmd/ordersvc/main.go
package main

import (
    "context"
    "net"
    "net/http"
    "os"
    "os/signal"
    "syscall"
    "time"

    "golang.org/x/sync/errgroup"
    "go.uber.org/zap"
    "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"

    "yourmod/internal/db"
    "yourmod/internal/observ"
    "yourmod/internal/repo"
    "yourmod/internal/service"
    httpapi "yourmod/internal/transport/http"
    grpcapi "yourmod/internal/transport/grpc"
    "yourmod/internal/ext"
)

func main() {
    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    log := observ.NewLogger()
    defer log.Sync()
    shutdownOTel, err := observ.InitOTel(ctx, "ordersvc")
    if err != nil { log.Fatal("otel init", zap.Error(err)) }
    defer shutdownOTel(context.Background())

    pool, err := db.NewPool(ctx, os.Getenv("DATABASE_URL"))
    if err != nil { log.Fatal("db", zap.Error(err)) }
    defer pool.Close()

    pay := NewPayClient()
    repo := repo.NewOrderRepo(pool)
    svc := service.NewOrderService(repo, pay, log)

    httpSrv := &http.Server{
        Addr:    ":8080",
        Handler: httpapi.NewRouter(svc, pool),
        ReadHeaderTimeout: 5 * time.Second,
    }
    grpcSrv := grpcapi.NewGRPCServer(svc)

    g, ctx := errgroup.WithContext(ctx)
    // HTTP
    g.Go(func() error {
        log.Info("http listening", zap.String("addr", httpSrv.Addr))
        if err := httpSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed { return err }
        return nil
    })
    // gRPC
    g.Go(func() error {
        lis, err := net.Listen("tcp", ":9090")
        if err != nil { return err }
        log.Info("grpc listening", zap.String("addr", ":9090"))
        return grpcSrv.Serve(lis)
    })

    // 等待信号
    g.Go(func() error {
        <-ctx.Done()
        c, cancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer cancel()
        _ = httpSrv.Shutdown(c)
        grpcSrv.GracefulStop()
        return nil
    })

    if err := g.Wait(); err != nil {
        log.Error("server exit", zap.Error(err))
    }
}

八、表驱动测试与集成测试(testcontainers)

  • 表驱动测试:同一用例逻辑,不同输入/预期,代码简洁可读。
  • 集成测试:用 testcontainers 启动临时 Postgres,运行 schema/migrations,执行仓储/服务真实调用,保证幂等与事务语义。

表驱动示例

// internal/service/order_service_test.go
package service_test

import (
    "context"
    "testing"
    "yourmod/internal/service"
    "yourmod/internal/domain"
)

type stubRepo struct{ created map[string]domain.Order }
func (s *stubRepo) Create(ctx context.Context, o domain.Order) (domain.Order, error) {
    if s.created == nil { s.created = map[string]domain.Order{} }
    if existing, ok := s.created[o.DedupKey]; ok { return existing, nil }
    o.ID = "id-" + o.DedupKey; s.created[o.DedupKey] = o; return o, nil
}
// implement other methods ...

func TestCreate_Idempotent(t *testing.T) {
    r := &stubRepo{}
    svc := service.NewOrderService((*repo.OrderRepo)(r), nil, zap.NewNop())
    ctx := context.Background()

    cases := []struct{
        dedup string; wantID string
    }{
        {"k1", "id-k1"},
        {"k1", "id-k1"},
    }
    var first string
    for i, c := range cases {
        o, err := svc.Create(ctx, c.dedup, "u", 100)
        if err != nil { t.Fatal(err) }
        if i == 0 { first = o.ID }
        if o.ID != first { t.Fatalf("idempotency broken: %s != %s", o.ID, first) }
    }
}

testcontainers 集成测试示例(简化)

// internal/repo/order_repo_integration_test.go
package repo_test

import (
    "context"
    "testing"
    "time"
    "fmt"

    tc "github.com/testcontainers/testcontainers-go"
    "github.com/testcontainers/testcontainers-go/wait"
    "github.com/jackc/pgx/v5/pgxpool"
    "yourmod/internal/db"
    "yourmod/internal/repo"
    "yourmod/internal/domain"
)

func startPostgres(t *testing.T) (*pgxpool.Pool, func()) {
    ctx := context.Background()
    req := tc.ContainerRequest{
        Image:        "postgres:16-alpine",
        Env:          map[string]string{"POSTGRES_PASSWORD": "pw", "POSTGRES_DB":"test"},
        ExposedPorts: []string{"5432/tcp"},
        WaitingFor:   wait.ForListeningPort("5432/tcp").WithStartupTimeout(30*time.Second),
    }
    pgC, err := tc.GenericContainer(ctx, tc.GenericContainerRequest{ContainerRequest: req, Started: true})
    if err != nil { t.Fatal(err) }

    host, _ := pgC.Host(ctx)
    port, _ := pgC.MappedPort(ctx, "5432/tcp")
    url := fmt.Sprintf("postgres://postgres:pw@%s:%s/test?sslmode=disable", host, port.Port())

    pool, err := db.NewPool(ctx, url)
    if err != nil { t.Fatal(err) }

    schema := `
    create extension if not exists pgcrypto;
    create table if not exists orders(
      id uuid primary key default gen_random_uuid(),
      dedup_key text not null unique,
      user_id text not null,
      amount_cents bigint not null,
      status text not null,
      version bigint not null default 1,
      created_at timestamptz not null default now(),
      updated_at timestamptz not null default now()
    );`
    if _, err := pool.Exec(ctx, schema); err != nil { t.Fatal(err) }

    cleanup := func() {
        pool.Close()
        _ = pgC.Terminate(ctx)
    }
    return pool, cleanup
}

func TestRepo_Create_Idempotent(t *testing.T) {
    pool, cleanup := startPostgres(t)
    defer cleanup()

    r := repo.NewOrderRepo(pool)
    ctx := context.Background()

    o1, err := r.Create(ctx, domain.Order{DedupKey: "d1", UserID:"u1", AmountCents: 100})
    if err != nil { t.Fatal(err) }
    o2, err := r.Create(ctx, domain.Order{DedupKey: "d1", UserID:"u1", AmountCents: 100})
    if err != nil { t.Fatal(err) }
    if o1.ID != o2.ID { t.Fatalf("expect same id, got %s vs %s", o1.ID, o2.ID) }
}

九、Makefile 与容器化建议

  • Makefile 目标:build/test/run/lint/proto/docker-build。
  • Dockerfile:多阶段构建,非 root 用户,最小基础镜像(distroless/alpine),设置健康检查,合理暴露端口(HTTP 8080,gRPC 9090,/metrics 同 HTTP)。
  • 通过环境变量配置:DATABASE_URL、OTEL_EXPORTER_OTLP_ENDPOINT、LOG_LEVEL 等。

Makefile(简化)

APP=ordersvc
PKG=./...
BIN=./bin/$(APP)

build:
	go build -o $(BIN) ./cmd/ordersvc

test:
	go test $(PKG) -count=1

run:
	DATABASE_URL=postgres://user:pw@localhost:5432/db?sslmode=disable go run ./cmd/ordersvc

proto:
	protoc --go_out=. --go-grpc_out=. api/order/v1/*.proto

docker-build:
	docker build -t $(APP):latest .

Dockerfile(简化)

# build
FROM golang:1.22 AS builder
WORKDIR /src
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o /out/ordersvc ./cmd/ordersvc

# runtime
FROM gcr.io/distroless/base-debian12
USER nonroot:nonroot
WORKDIR /app
COPY --from=builder /out/ordersvc /app/ordersvc
EXPOSE 8080 9090
ENV GODEBUG=madvdontneed=1
ENTRYPOINT ["/app/ordersvc"]

K8s 探针建议

  • livenessProbe HTTP GET /healthz,periodSeconds: 10, timeoutSeconds: 2。
  • readinessProbe HTTP GET /readyz,initialDelaySeconds: 5,failureThreshold: 3。
  • gRPC 使用 grpc-health-probe 亦可。

十、其他实践建议

  • 输入校验:REST 使用 binding/validator,gRPC 使用 protoc-gen-validate。
  • 安全与合规:限制请求体大小、超时、最大并发(HTTP Server Read/Write Timeout,gRPC keepalive)、对外调用限流(token bucket)。
  • 迁移工具:goose 或 golang-migrate,CI 中执行迁移。
  • 配置管理:viper/envconfig,支持多环境;敏感信息用 Secret。
  • 性能与池化:避免全局变量污染,复用 http.Client、连接池;对热点路径加缓存(带过期与一致性策略)。
  • 灰度与回滚:版本化 API,兼容旧客户端;数据库变更遵循 expand/contract。

只要将以上骨架按模块放入 yourmod 项目中,即可得到一个最小可运行、可观测、可测试、可容器化的 Go 订单微服务,并可逐步扩展业务逻辑与非功能性需求。

示例详情

解决的问题

为用户提供在指定编程语言中完成特定任务的最佳实践指导,包括高效、安全、可维护的解决方案,并通过实际示例提升学习效果和实践应用能力。

适用用户

技术团队负责人

快速搭建标准开发流程,统一团队实践,减少沟通成本并提升团队开发效率。

资深开发者

在多语言环境下精准获取最佳实践作为参考,优化开发流程,提升交付质量。

编程新手

快速上手复杂开发任务,通过详细建议与示例学习优质的编程技巧。

特征总结

一键生成多种编程语言的最佳实践,适配技术团队多样需求。
基于任务类型智能推荐高效开发方法,快速解决实际问题。
自动提供安全性与可维护性优化建议,避免潜在技术风险。
附带实用的小型代码示例,助力开发者快速上手与理解。
支持灵活调用和定制,满足不同项目场景需求和规范。
兼顾上下文与工程规范,让输出更贴近实际的工作环境。
适用于新手与资深开发者,一站式获取任务解决方案。
高效对接多语言技术栈,提升团队协作效率。

如何使用购买的提示词模板

1. 直接在外部 Chat 应用中使用

将模板生成的提示词复制粘贴到您常用的 Chat 应用(如 ChatGPT、Claude 等),即可直接对话使用,无需额外开发。适合个人快速体验和轻量使用场景。

2. 发布为 API 接口调用

把提示词模板转化为 API,您的程序可任意修改模板参数,通过接口直接调用,轻松实现自动化与批量处理。适合开发者集成与业务系统嵌入。

3. 在 MCP Client 中配置使用

在 MCP client 中配置对应的 server 地址,让您的 AI 应用自动调用提示词模板。适合高级用户和团队协作,让提示词在不同 AI 工具间无缝衔接。

AI 提示词价格
¥10.00元
先用后买,用好了再付款,超安全!

您购买后可以获得什么

获得完整提示词模板
- 共 78 tokens
- 3 个可调节参数
{ 编程语言 } { 任务目标 } { 示例风格 }
获得社区贡献内容的使用权
- 精选社区优质案例,助您快速上手提示词
限时免费

不要错过!

免费获取高级提示词-优惠即将到期

17
:
23
小时
:
59
分钟
:
59