热门角色不仅是灵感来源,更是你的效率助手。通过精挑细选的角色提示词,你可以快速生成高质量内容、提升创作灵感,并找到最契合你需求的解决方案。让创作更轻松,让价值更直接!
我们根据不同用户需求,持续更新角色库,让你总能找到合适的灵感入口。
根据用户指定的编程语言和具体开发任务,提供高效、安全、可维护的开发最佳实践,并配套简要说明和小型代码示例,帮助开发者快速理解并应用于实际项目中。
以下内容面向在 FastAPI 上实现完整鉴权与授权模块的工程实践,结合 JWT(访问令牌+刷新令牌)、Argon2id 密码哈希、RBAC/细粒度权限、Pydantic v2 校验、PostgreSQL+SQLAlchemy 2.x 异步、Redis 限流和刷新令牌黑名单、统一错误处理/结构化日志/OpenAPI 文档、pytest 单元与集成测试,并给出最小可运行示例。
一、总体设计与最佳实践要点
二、最小可运行示例(FastAPI + SQLAlchemy 2.x 异步 + Redis) 提示:
文件:app.py (运行:pip install "fastapi>=0.115" "uvicorn[standard]" "sqlalchemy[asyncpg]>=2.0" "pydantic>=2" "pydantic-settings>=2" "argon2-cffi" "PyJWT" "redis>=5" "structlog")
# app.py
import asyncio
import logging
import time
import uuid
from collections.abc import AsyncIterator
from datetime import datetime, timedelta
from typing import Annotated, Optional, Sequence

import jwt # PyJWT
import structlog
from fastapi import FastAPI, Depends, HTTPException, status, Request
from fastapi.responses import JSONResponse
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel, EmailStr, Field, SecretStr, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
from redis.asyncio import Redis
from sqlalchemy import ForeignKey, String, select, text
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
# -------------------- Config --------------------
class Settings(BaseSettings):
    """Service configuration read from ``APP_``-prefixed variables or ``.env``."""

    model_config = SettingsConfigDict(
        env_prefix="APP_",
        env_file=".env",
        env_file_encoding="utf-8",
    )

    # --- Application ---
    app_name: str = "Auth Service"
    environment: str = "dev"

    # --- Database (passed to create_async_engine) ---
    db_url: str = "postgresql+asyncpg://postgres:postgres@localhost:5432/appdb"
    db_pool_size: int = 5
    db_max_overflow: int = 5
    db_pool_timeout: int = 30
    db_pool_recycle: int = 1800

    # --- Redis ---
    redis_url: str = "redis://localhost:6379/0"
    rate_limit_login_per_minute: int = 5

    # --- JWT ---
    # Placeholder value; in production load it from a KMS / secrets manager.
    jwt_secret: str = "CHANGE_ME_IN_PROD"
    jwt_kid: str = "kid-1"
    access_token_ttl_minutes: int = 15
    refresh_token_ttl_days: int = 7
    jwt_algorithm: str = "HS256"
    jwt_issuer: str = "your-company"
    jwt_audience: str = "internal-platform"

    # --- Argon2id (passed to argon2.PasswordHasher) ---
    argon_time_cost: int = 3
    argon_memory_cost: int = 65536  # KiB = 64 MiB
    argon_parallelism: int = 2
    argon_hash_len: int = 32
    argon_salt_len: int = 16


# Module-level singleton used by the rest of the file.
settings = Settings()
# -------------------- Logging --------------------
# Structured JSON logging.
# ``merge_contextvars`` runs first so that anything bound through
# ``structlog.contextvars`` (for example a request id) is merged into every
# event; without it, bound context variables never appear in the output.
logging.basicConfig(level=logging.INFO)
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.add_log_level,
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(),
    ]
)
log = structlog.get_logger()
# -------------------- DB Setup --------------------
class Base(DeclarativeBase):
    """Declarative base shared by every ORM model in this module."""
class User(Base):
    """Account row in ``users``; roles are reached through ``user_roles``."""

    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    password_hash: Mapped[str] = mapped_column(String(255))
    is_active: Mapped[bool] = mapped_column(default=True)
    # NOTE(review): datetime.utcnow produces naive datetimes; confirm the
    # column type before switching to timezone-aware values.
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
    last_login: Mapped[Optional[datetime]] = mapped_column(default=None)

    # Many-to-many with Role, loaded with the "selectin" strategy.
    roles: Mapped[Sequence["Role"]] = relationship(
        secondary="user_roles",
        back_populates="users",
        lazy="selectin",
    )
class Role(Base):
    """Named role in ``roles``; links users to permissions."""

    __tablename__ = "roles"

    id: Mapped[int] = mapped_column(primary_key=True)
    code: Mapped[str] = mapped_column(String(64), unique=True, index=True)

    # Reverse side of User.roles.
    users: Mapped[Sequence[User]] = relationship(
        secondary="user_roles",
        back_populates="roles",
        lazy="selectin",
    )
    # Many-to-many with Permission through ``role_permissions``.
    permissions: Mapped[Sequence["Permission"]] = relationship(
        secondary="role_permissions",
        back_populates="roles",
        lazy="selectin",
    )
class Permission(Base):
    """Permission code in ``permissions`` (for example ``items:read``)."""

    __tablename__ = "permissions"

    id: Mapped[int] = mapped_column(primary_key=True)
    code: Mapped[str] = mapped_column(String(64), unique=True, index=True)

    # Reverse side of Role.permissions.
    roles: Mapped[Sequence[Role]] = relationship(
        secondary="role_permissions",
        back_populates="permissions",
        lazy="selectin",
    )
class UserRole(Base):
    """Association table ``user_roles`` with a composite (user, role) key."""

    __tablename__ = "user_roles"

    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), primary_key=True)
    role_id: Mapped[int] = mapped_column(ForeignKey("roles.id"), primary_key=True)
class RolePermission(Base):
    """Association table ``role_permissions`` with a composite (role, permission) key."""

    __tablename__ = "role_permissions"

    role_id: Mapped[int] = mapped_column(ForeignKey("roles.id"), primary_key=True)
    permission_id: Mapped[int] = mapped_column(
        ForeignKey("permissions.id"), primary_key=True
    )
# Shared async engine; pool sizing comes from Settings.
_pool_options = {
    "pool_size": settings.db_pool_size,
    "max_overflow": settings.db_max_overflow,
    "pool_timeout": settings.db_pool_timeout,
    "pool_recycle": settings.db_pool_recycle,
}
engine: AsyncEngine = create_async_engine(settings.db_url, echo=False, **_pool_options)

# Session factory; expire_on_commit=False keeps loaded attributes usable after commit.
SessionLocal = async_sessionmaker(
    engine,
    expire_on_commit=False,
    class_=AsyncSession,
)
async def get_session() -> AsyncIterator[AsyncSession]:
    """Yield one ``AsyncSession`` for the duration of a request.

    This is an async generator (used with ``Depends``), so the return
    annotation is an iterator of sessions rather than a session.

    Yields:
        A session created by ``SessionLocal``; it is closed when the
        ``async with`` block exits after the request finishes.
    """
    async with SessionLocal() as session:
        yield session
# -------------------- Redis --------------------
# Assigned in the startup hook; ``None`` until then.
redis: Redis | None = None


async def get_redis() -> Redis:
    """Return the module-level Redis client.

    Raises:
        RuntimeError: If the client has not been created yet. An explicit
            raise is used instead of ``assert`` because assertions are
            removed when Python runs with ``-O``.
    """
    if redis is None:
        raise RuntimeError("Redis client is not initialised")
    return redis
# -------------------- Security --------------------
from argon2 import PasswordHasher

# Single shared hasher; every cost parameter comes from Settings.
_argon2_params = dict(
    time_cost=settings.argon_time_cost,
    memory_cost=settings.argon_memory_cost,
    parallelism=settings.argon_parallelism,
    hash_len=settings.argon_hash_len,
    salt_len=settings.argon_salt_len,
)
ph = PasswordHasher(**_argon2_params)
def hash_password(password: str) -> str:
    """Hash ``password`` with the module-level ``PasswordHasher``."""
    encoded = ph.hash(password)
    return encoded
def verify_password(password: str, hashed: str) -> bool:
    """Check ``password`` against an Argon2 hash.

    Args:
        password: Plain-text candidate.
        hashed: Stored encoded hash.

    Returns:
        The result of ``ph.verify`` on success, ``False`` when verification
        fails or the stored hash is not a valid Argon2 hash.
    """
    # Imported locally so the block stays self-contained; only verification
    # failures are treated as "wrong password", other errors propagate.
    from argon2.exceptions import InvalidHash, VerificationError

    try:
        return ph.verify(hashed, password)
    except (VerificationError, InvalidHash):
        return False
def now_ts() -> int:
    """Return the current Unix time truncated to whole seconds."""
    seconds = time.time()
    return int(seconds)
def create_access_token(sub: str, roles: list[str], perms: list[str]) -> str:
    """Sign a short-lived access token.

    Args:
        sub: Subject claim (the user id as a string).
        roles: Role codes embedded in the ``roles`` claim.
        perms: Permission codes embedded in the ``perms`` claim.

    Returns:
        The encoded JWT, signed with the configured secret and algorithm.
    """
    # One clock reading for iat/nbf/exp. The expiry is derived from epoch
    # seconds: calling .timestamp() on a naive utcnow() value would treat it
    # as local time and shift ``exp`` by the host's UTC offset.
    issued_at = now_ts()
    payload = {
        "sub": sub,
        "jti": str(uuid.uuid4()),
        "type": "access",
        "roles": roles,
        "perms": perms,
        "iat": issued_at,
        "nbf": issued_at,
        "exp": issued_at + settings.access_token_ttl_minutes * 60,
        "iss": settings.jwt_issuer,
        "aud": settings.jwt_audience,
    }
    headers = {"kid": settings.jwt_kid}
    return jwt.encode(
        payload,
        settings.jwt_secret,
        algorithm=settings.jwt_algorithm,
        headers=headers,
    )
def create_refresh_token(sub: str) -> tuple[str, str, int]:
    """Sign a long-lived refresh token.

    Args:
        sub: Subject claim (the user id as a string).

    Returns:
        ``(token, jti, exp)``: the encoded JWT, its unique id, and its
        expiry as Unix seconds.
    """
    # Expiry is computed from epoch seconds; .timestamp() on a naive
    # utcnow() value would be interpreted in the host's local timezone.
    issued_at = now_ts()
    jti = str(uuid.uuid4())
    exp = issued_at + settings.refresh_token_ttl_days * 24 * 60 * 60
    payload = {
        "sub": sub,
        "jti": jti,
        "type": "refresh",
        "iat": issued_at,
        "nbf": issued_at,
        "exp": exp,
        "iss": settings.jwt_issuer,
        "aud": settings.jwt_audience,
    }
    headers = {"kid": settings.jwt_kid}
    token = jwt.encode(
        payload,
        settings.jwt_secret,
        algorithm=settings.jwt_algorithm,
        headers=headers,
    )
    return token, jti, exp
# Bearer-token extractor; tokenUrl backs the "Authorize" button in Swagger UI.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
# -------------------- Schemas (Pydantic v2) --------------------
class TokenPair(BaseModel):
    """Response body returned by the login and refresh endpoints."""

    access_token: str = Field(..., description="短时访问令牌")
    refresh_token: str = Field(..., description="长时刷新令牌")
    token_type: str = "bearer"
    expires_in: int = Field(..., description="访问令牌剩余秒数")
class LoginRequest(BaseModel):
    """Login payload: e-mail plus a password held as ``SecretStr``."""

    email: EmailStr
    password: SecretStr

    @field_validator("password")
    @classmethod
    def validate_password(cls, v: SecretStr):
        """Require at least 8 characters with one letter and one digit."""
        raw = v.get_secret_value()
        long_enough = len(raw) >= 8
        has_digit = any(ch.isdigit() for ch in raw)
        has_letter = any(ch.isalpha() for ch in raw)
        if not (long_enough and has_digit and has_letter):
            raise ValueError("密码至少8位,包含字母与数字")
        return v
class MeResponse(BaseModel):
    """Profile returned by ``GET /me``."""

    id: int
    email: EmailStr
    roles: list[str]
    permissions: list[str]
# -------------------- Error Handling --------------------
class AppError(Exception):
def __init__(self, message: str, code: str = "bad_request", status_code: int = 400, details: dict | None = None):
self.message = message
self.code = code
self.status_code = status_code
self.details = details or {}
app = FastAPI(title=settings.app_name)


@app.exception_handler(AppError)
async def app_error_handler(request: Request, exc: AppError):
    """Render an ``AppError`` as a JSON error envelope.

    The request id is taken from ``request.state.request_id`` when something
    upstream has stored one there, then from the ``X-Request-ID`` header, and
    only generated as a last resort, so the id in the response can match the
    one used elsewhere for the same request.
    """
    rid = (
        getattr(request.state, "request_id", None)
        or request.headers.get("X-Request-ID")
        or str(uuid.uuid4())
    )
    body = {
        "code": exc.code,
        "message": exc.message,
        "details": exc.details,
        "request_id": rid,
    }
    return JSONResponse(status_code=exc.status_code, content=body)
@app.middleware("http")
async def log_middleware(request: Request, call_next):
    """Log one structured ``request`` event per HTTP request.

    The request id comes from ``X-Request-ID`` or is generated; it is stored
    on ``request.state.request_id`` and bound as a structlog context variable
    for the duration of the request.
    """
    rid = request.headers.get("X-Request-ID") or str(uuid.uuid4())
    request.state.request_id = rid
    structlog.contextvars.clear_contextvars()
    structlog.contextvars.bind_contextvars(request_id=rid)

    start = time.perf_counter()
    # Stays None when call_next raises, so the finally block never touches an
    # unbound ``response`` and the original exception propagates unchanged.
    status_code: int | None = None
    try:
        response = await call_next(request)
        status_code = response.status_code
        return response
    finally:
        duration = (time.perf_counter() - start) * 1000
        log.info(
            "request",
            request_id=rid,
            method=request.method,
            path=request.url.path,
            status=status_code,
            duration_ms=round(duration, 2),
        )
        structlog.contextvars.clear_contextvars()
# -------------------- Auth Dependencies --------------------
async def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)],
    session: Annotated[AsyncSession, Depends(get_session)],
) -> tuple[User, list[str], list[str]]:
    """Resolve the bearer access token to ``(user, role codes, permission codes)``.

    Raises:
        HTTPException: 401 when the token is invalid or expired, is not an
            access token, has a non-integer subject, or the user is missing
            or inactive.
    """
    try:
        payload = jwt.decode(
            token,
            settings.jwt_secret,
            algorithms=[settings.jwt_algorithm],
            audience=settings.jwt_audience,
            issuer=settings.jwt_issuer,
            options={"require": ["exp", "sub", "type"], "verify_exp": True},
        )
    except jwt.PyJWTError:
        raise HTTPException(status_code=401, detail="令牌无效或已过期")
    if payload.get("type") != "access":
        raise HTTPException(status_code=401, detail="访问令牌类型错误")

    # A correctly signed token with a non-numeric subject must be a 401,
    # not an unhandled ValueError.
    try:
        user_id = int(payload["sub"])
    except (TypeError, ValueError):
        raise HTTPException(status_code=401, detail="令牌无效或已过期") from None

    user = await session.get(User, user_id)
    if not user or not user.is_active:
        raise HTTPException(status_code=401, detail="用户不可用")

    # Roles and permissions are read from the database rather than trusted
    # from the claims carried inside the token.
    roles = [role.code for role in user.roles]
    perm_codes = {perm.code for role in user.roles for perm in role.permissions}
    return user, roles, sorted(perm_codes)
def require_roles(required: list[str]):
    """Build a dependency that demands every role code in ``required``.

    The returned dependency passes the ``get_current_user`` tuple through
    unchanged, or raises a 403 ``HTTPException`` when a role is missing.
    """
    async def dep(ctx: Annotated[tuple[User, list[str], list[str]], Depends(get_current_user)]):
        _user, held_roles, _perms = ctx
        missing = set(required) - set(held_roles)
        if missing:
            raise HTTPException(status_code=403, detail="缺少角色")
        return ctx

    return dep
def require_permissions(required: list[str]):
    """Build a dependency that demands every permission code in ``required``.

    The returned dependency passes the ``get_current_user`` tuple through
    unchanged, or raises a 403 ``HTTPException`` when a permission is missing.
    """
    async def dep(ctx: Annotated[tuple[User, list[str], list[str]], Depends(get_current_user)]):
        _user, _roles, held_perms = ctx
        missing = set(required) - set(held_perms)
        if missing:
            raise HTTPException(status_code=403, detail="缺少权限")
        return ctx

    return dep
# -------------------- Rate Limiting & Blacklist --------------------
async def check_login_rate_limit(r: Redis, username: str, ip: str):
    """Fixed-window limiter keyed by ``username`` and ``ip``.

    Increments a counter and, on the first hit, gives the key a 60 second
    expiry. Raises ``AppError`` (429) once the counter exceeds
    ``settings.rate_limit_login_per_minute``.
    """
    bucket = f"rl:login:{username}:{ip}"
    attempts = await r.incr(bucket)
    # NOTE(review): INCR and EXPIRE are two separate commands; if the second
    # one is never issued the key keeps no TTL.
    if attempts == 1:
        await r.expire(bucket, 60)
    limit = settings.rate_limit_login_per_minute
    if attempts <= limit:
        return
    raise AppError("登录过于频繁,请稍后再试", code="too_many_requests", status_code=429)
async def blacklist_refresh_jti(r: Redis, jti: str, exp_ts: int):
    """Mark a refresh-token id as revoked until the token's own expiry.

    The key's TTL is the remaining lifetime in seconds, floored at 1.
    """
    remaining = exp_ts - now_ts()
    ttl = remaining if remaining > 1 else 1
    key = f"rt:blacklist:{jti}"
    await r.setex(key, ttl, "1")
async def is_refresh_jti_blacklisted(r: Redis, jti: str) -> bool:
    """Return True when the blacklist key for ``jti`` exists in Redis."""
    key = f"rt:blacklist:{jti}"
    hits = await r.exists(key)
    return hits == 1
# -------------------- Routes --------------------
@app.post("/auth/login", response_model=TokenPair, tags=["auth"])
async def login(
    req: LoginRequest,
    request: Request,
    session: Annotated[AsyncSession, Depends(get_session)],
    r: Annotated[Redis, Depends(get_redis)],
):
    """Authenticate by e-mail and password and issue a token pair.

    Raises:
        AppError: 429 when the per-minute login limit is exceeded.
        HTTPException: 401 for unknown user or wrong password, 403 for a
            disabled account.
    """
    # request.client can be None (for example under some test transports).
    client_host = request.client.host if request.client else None
    ip = client_host or "unknown"
    await check_login_rate_limit(r, req.email, ip)

    # ORM query; the e-mail is sent as a bound parameter.
    stmt = select(User).where(User.email == req.email)
    user = (await session.execute(stmt)).scalar_one_or_none()
    if not user or not verify_password(req.password.get_secret_value(), user.password_hash):
        # Same response for "no such user" and "wrong password".
        raise HTTPException(status_code=401, detail="用户名或密码错误")
    if not user.is_active:
        raise HTTPException(status_code=403, detail="账号已禁用")

    roles = [role.code for role in user.roles]
    perms = sorted({perm.code for role in user.roles for perm in role.permissions})

    # The SELECT above already opened a transaction on this session, so
    # calling session.begin() here would raise; commit the open one instead.
    user.last_login = datetime.utcnow()
    await session.commit()

    access = create_access_token(str(user.id), roles, perms)
    refresh, _jti, _exp_ts = create_refresh_token(str(user.id))
    expires_in = settings.access_token_ttl_minutes * 60
    return TokenPair(access_token=access, refresh_token=refresh, expires_in=expires_in)
class RefreshRequest(BaseModel):
    """Request body for ``POST /auth/refresh``."""

    refresh_token: str = Field(..., description="刷新令牌")
@app.post("/auth/refresh", response_model=TokenPair, tags=["auth"])
async def refresh_tokens(
    body: RefreshRequest,
    session: Annotated[AsyncSession, Depends(get_session)],
    r: Annotated[Redis, Depends(get_redis)],
):
    """Exchange a valid refresh token for a new token pair (rotation).

    The presented refresh token's ``jti`` is blacklisted before the new pair
    is returned.

    Raises:
        HTTPException: 401 when the token is invalid, expired, of the wrong
            type, already blacklisted, or the user is missing or inactive.
    """
    try:
        payload = jwt.decode(
            body.refresh_token,
            settings.jwt_secret,
            algorithms=[settings.jwt_algorithm],
            audience=settings.jwt_audience,
            issuer=settings.jwt_issuer,
            # "jti" is required because it is read unconditionally below.
            options={"require": ["exp", "sub", "type", "jti"], "verify_exp": True},
        )
    except jwt.PyJWTError:
        raise HTTPException(status_code=401, detail="刷新令牌无效或已过期")
    if payload.get("type") != "refresh":
        raise HTTPException(status_code=401, detail="令牌类型错误")

    jti = payload["jti"]
    if await is_refresh_jti_blacklisted(r, jti):
        raise HTTPException(status_code=401, detail="刷新令牌已失效")

    # A non-numeric subject is an invalid token, not a server error.
    try:
        user_id = int(payload["sub"])
    except (TypeError, ValueError):
        raise HTTPException(status_code=401, detail="刷新令牌无效或已过期") from None

    user = await session.get(User, user_id)
    if not user or not user.is_active:
        raise HTTPException(status_code=401, detail="用户不可用")

    roles = [role.code for role in user.roles]
    perms = sorted({perm.code for role in user.roles for perm in role.permissions})

    # Rotation: revoke the token that was just used.
    await blacklist_refresh_jti(r, jti, payload["exp"])

    access = create_access_token(str(user.id), roles, perms)
    new_refresh, _new_jti, _new_exp = create_refresh_token(str(user.id))
    return TokenPair(
        access_token=access,
        refresh_token=new_refresh,
        expires_in=settings.access_token_ttl_minutes * 60,
    )
class LogoutRequest(BaseModel):
    """Request body for ``POST /auth/logout``."""

    refresh_token: str
@app.post("/auth/logout", tags=["auth"])
async def logout(body: LogoutRequest, r: Annotated[Redis, Depends(get_redis)]):
    """Revoke a refresh token by blacklisting its ``jti``.

    Always answers with the same ``{"message": ...}`` body, whether or not
    the token was valid, so the response does not reveal token validity.
    Access tokens are left to expire on their own.
    """
    logged_out = {"message": "已退出"}
    try:
        payload = jwt.decode(
            body.refresh_token,
            settings.jwt_secret,
            algorithms=[settings.jwt_algorithm],
            audience=settings.jwt_audience,
            issuer=settings.jwt_issuer,
            # "jti" is required because it is read unconditionally below.
            options={"require": ["exp", "sub", "type", "jti"], "verify_exp": True},
        )
    except jwt.PyJWTError:
        # Return normally instead of raising HTTPException(200), which
        # produced a differently shaped {"detail": ...} body.
        return logged_out
    if payload.get("type") != "refresh":
        return logged_out
    await blacklist_refresh_jti(r, payload["jti"], payload["exp"])
    return logged_out
@app.get("/me", response_model=MeResponse, tags=["users"])
async def me(ctx: Annotated[tuple[User, list[str], list[str]], Depends(get_current_user)]):
    """Return the authenticated user's id, e-mail, roles and permissions."""
    current_user, role_codes, perm_codes = ctx
    return MeResponse(
        id=current_user.id,
        email=current_user.email,
        roles=role_codes,
        permissions=perm_codes,
    )
@app.get("/admin/secret", tags=["admin"])
async def admin_secret(_: Annotated[tuple[User, list[str], list[str]], Depends(require_roles(["admin"]))]):
    """Example endpoint guarded by the ``admin`` role."""
    payload = {"secret": "admin-only"}
    return payload
@app.get("/items", tags=["items"])
async def list_items(_: Annotated[tuple[User, list[str], list[str]], Depends(require_permissions(["items:read"]))]):
    """Example endpoint guarded by the ``items:read`` permission.

    Always returns an empty list in this demo.
    """
    # If raw SQL is ever needed here, use text() with bound parameters, e.g.
    # text("SELECT * FROM items WHERE owner_id = :uid").bindparams(uid=...)
    items: list = []
    return {"items": items}
# -------------------- Lifecycle --------------------
@app.on_event("startup")
async def on_startup():
    """Create the Redis client, create tables, and seed demo data.

    When the ``users`` table is empty, one ``admin`` role with
    ``items:read`` / ``items:write`` and one admin user are inserted.
    """
    global redis
    redis = Redis.from_url(settings.redis_url, decode_responses=True)

    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    # Demo seed, run inside a single transaction.
    async with SessionLocal() as session, session.begin():
        any_user = (await session.execute(select(User).limit(1))).scalar_one_or_none()
        if not any_user:
            admin_role = Role(code="admin")
            admin_role.permissions.extend(
                [Permission(code="items:read"), Permission(code="items:write")]
            )
            admin_user = User(
                email="admin@example.com",
                password_hash=hash_password("Passw0rd!"),
            )
            admin_user.roles.append(admin_role)
            session.add_all([admin_user])

    log.info("startup_complete", environment=settings.environment)
@app.on_event("shutdown")
async def on_shutdown():
    """Close the Redis client (if one was created) and dispose the engine.

    The engine is disposed in ``finally`` so that an error while closing
    Redis cannot leave database connections open.
    """
    try:
        if redis:
            await redis.close()
    finally:
        await engine.dispose()
三、Pydantic v2、SQL 注入与事务的要点示例
class CreateItem(BaseModel):
    """Example input model with field constraints and a custom validator."""

    name: str = Field(min_length=1, max_length=64)
    price: float = Field(gt=0)
    owner_id: int

    @field_validator("name")
    @classmethod
    def no_dangerous_chars(cls, v: str):
        """Reject names containing ``;`` or ``--``."""
        forbidden = (";", "--")
        if any(marker in v for marker in forbidden):
            raise ValueError("非法字符")
        return v
from sqlalchemy import text, bindparam
stmt = text("UPDATE users SET last_login = NOW() WHERE id = :uid").bindparams(bindparam("uid", value=user_id))
await session.execute(stmt)
await session.commit()
async with session.begin():
session.add(entity)
# 多条写操作在同一事务中
四、OpenAPI 文档与示例提示
五、测试(pytest/pytest-asyncio):建议使用 Testcontainers 启动真实的 Postgres 与 Redis。这里给出一个基础集成测试示例,使用 httpx AsyncClient 调用登录接口与 /me。安装依赖:pip install pytest pytest-asyncio httpx
文件:tests/test_auth.py
import pytest
import asyncio
from httpx import AsyncClient
from app import app # 假设示例文件名为 app.py
@pytest.mark.asyncio
async def test_login_and_me():
    """Log in as the seeded admin and read the profile from ``/me``."""
    # The AsyncClient(app=...) shortcut is gone from current httpx; an
    # explicit ASGI transport is the supported way to call the app in-process.
    from httpx import ASGITransport

    # The transport does not send lifespan events, so enter the app's
    # lifespan manually to run the startup/shutdown handlers.
    async with app.router.lifespan_context(app):
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            # Login
            resp = await client.post(
                "/auth/login",
                json={"email": "admin@example.com", "password": "Passw0rd!"},
            )
            assert resp.status_code == 200
            tokens = resp.json()
            assert "access_token" in tokens

            # Call /me with the access token
            headers = {"Authorization": f"Bearer {tokens['access_token']}"}
            me = await client.get("/me", headers=headers)
            assert me.status_code == 200
            assert me.json()["email"] == "admin@example.com"
测试覆盖建议:
六、密钥管理、配置分层、容器化与部署安全清单
七、补充建议
该示例可直接运行(需本机 PostgreSQL 与 Redis),展示路由、依赖注入、令牌签发/刷新与退出登录,满足需求中的关键要点。实际生产请替换 HS256 为非对称算法、接入密钥托管、完善日志/监控、使用迁移与更全面的测试。
以下是面向 React + TypeScript 的多步骤注册表单的实用模板与常见最佳实践,覆盖:Zod 共享校验、React Hook Form 状态管理(含动态字段与文件上传)、无障碍支持、用户名查重的防抖与取消、XSS/CSRF 防护、输入掩码与 i18n、代码分割、Vitest/Playwright 测试,以及可复用控件、提交流程与错误边界示例。
总原则
示例:shared/schemas/registration.ts
import { z } from "zod";
// Allowed username characters: ASCII letters, digits and underscore.
const USERNAME_PATTERN = /^[a-zA-Z0-9_]+$/;

/** Username: 3 to 20 characters matching USERNAME_PATTERN. */
export const UsernameSchema = z
  .string()
  .min(3, "用户名至少 3 个字符")
  .max(20, "用户名最多 20 个字符")
  .regex(USERNAME_PATTERN, "仅限字母、数字和下划线");
/**
 * Step 1 (account): e-mail, username, password with confirmation, and the
 * agreement checkbox. The refinement reports a mismatch on confirmPassword.
 */
export const StepAccountSchema = z
  .object({
    email: z.string().email("邮箱格式不正确"),
    username: UsernameSchema,
    // Each rule is a separate check so every unmet rule yields its own message.
    password: z
      .string()
      .min(8, "密码至少 8 位")
      .regex(/[A-Z]/, "至少包含一个大写字母")
      .regex(/[a-z]/, "至少包含一个小写字母")
      .regex(/\d/, "至少包含一个数字")
      .regex(/[^A-Za-z0-9]/, "至少包含一个特殊字符"),
    confirmPassword: z.string(),
    agree: z.literal(true, { errorMap: () => ({ message: "需同意协议" }) }),
  })
  .refine(
    (account) => account.password === account.confirmPassword,
    { path: ["confirmPassword"], message: "两次密码不一致" },
  );
/**
 * Step 2 (profile): names, optional birth date, phone and account type.
 * Company accounts must supply a company name that is not blank.
 */
export const StepProfileSchema = z
  .object({
    firstName: z.string().min(1, "必填"),
    lastName: z.string().min(1, "必填"),
    // Kept as a string on the client; the backend may transform it to a Date.
    birthDate: z.string().optional(),
    phone: z
      .string()
      .min(10, "电话长度不正确")
      .regex(/^[0-9\-+\s()]+$/, "电话格式不正确"),
    accountType: z.enum(["personal", "company"]),
    companyName: z.string().optional(),
  })
  .refine(
    // Trim before testing so a whitespace-only name does not satisfy the rule.
    (d) => d.accountType === "personal" || (d.companyName ?? "").trim().length > 0,
    { path: ["companyName"], message: "公司帐号需填写公司名称" },
  );
/** Step 3 (preferences): newsletter opt-in and UI language, both defaulted. */
export const StepPreferencesSchema = z.object({
  newsletter: z.boolean().default(false),
  language: z.enum(["zh-CN", "en-US"]).default("zh-CN"),
});
const acceptedImage = ["image/png", "image/jpeg"];
const acceptedDoc = ["application/pdf"];

// Size ceilings in bytes.
const MAX_AVATAR_BYTES = 2 * 1024 * 1024;
const MAX_DOCUMENT_BYTES = 5 * 1024 * 1024;

/** True when the file's MIME type is allowed and its size is within the limit. */
const fits = (file: File, types: string[], maxBytes: number) =>
  types.includes(file.type) && file.size <= maxBytes;

/** Step 4 (documents): optional avatar image and a required PDF document. */
export const StepDocumentsSchema = z.object({
  avatar: z
    .instanceof(File)
    .optional()
    .refine((f) => !f || fits(f, acceptedImage, MAX_AVATAR_BYTES), "头像需 PNG/JPEG 且不超过 2MB"),
  idDocument: z
    .instanceof(File)
    .refine((f) => fits(f, acceptedDoc, MAX_DOCUMENT_BYTES), "证件需 PDF 且不超过 5MB"),
});
// Intersection of all four step schemas, built up one step at a time.
const accountAndProfile = StepAccountSchema.and(StepProfileSchema);
const withPreferences = accountAndProfile.and(StepPreferencesSchema);

/** Full registration payload: every step's fields and rules combined. */
export const RegistrationSchema = withPreferences.and(StepDocumentsSchema);

/** Static type inferred from RegistrationSchema. */
export type RegistrationData = z.infer<typeof RegistrationSchema>;
后端(示例 Express):
// server/routes/register.ts
import { ZodError } from "zod";
import { RegistrationSchema } from "../../shared/schemas/registration";
import type { Request, Response } from "express";

// Shape of multi-field uploads: one array of file objects per field name.
type UploadedFiles = Record<string, unknown[] | undefined>;

/**
 * POST handler for registration.
 *
 * Validates the body (plus uploaded files) against RegistrationSchema and
 * answers 201 on success, 400 with the validation error when the schema
 * rejects the input, and 500 for any other failure.
 */
export async function registerHandler(req: Request, res: Response) {
  // With multipart middleware handling several named fields, the files are
  // exposed on `req.files` keyed by field name, not on `req.file`.
  const files = (req as unknown as { files?: UploadedFiles }).files;
  try {
    RegistrationSchema.parse({
      ...req.body,
      avatar: files?.avatar?.[0],
      idDocument: files?.idDocument?.[0],
    });
    // NOTE(review): the schema checks `instanceof File`; confirm the upload
    // middleware's file objects satisfy that on the server.
  } catch (e) {
    if (e instanceof ZodError) {
      res.status(400).json({ ok: false, errors: e });
      return;
    }
    // Anything that is not a validation error is a server fault; do not
    // echo the raw error object back to the client.
    res.status(500).json({ ok: false });
    return;
  }
  // TODO: persist the registration
  res.status(201).json({ ok: true });
}
示例:Form 容器与步骤懒加载
// App.tsx
import { Suspense, lazy, useMemo, useState } from "react";
import { useForm, FormProvider } from "react-hook-form";
import { zodResolver } from "@hookform/resolvers/zod";
import { RegistrationSchema, StepAccountSchema, StepProfileSchema, StepPreferencesSchema, StepDocumentsSchema, type RegistrationData } from "../shared/schemas/registration";
// Each step is its own lazily loaded chunk.
const StepAccount = lazy(() => import("./steps/StepAccount"));
const StepProfile = lazy(() => import("./steps/StepProfile"));
const StepPreferences = lazy(() => import("./steps/StepPreferences"));
const StepDocuments = lazy(() => import("./steps/StepDocuments"));
const Review = lazy(() => import("./steps/Review"));

type StepKey = "account" | "profile" | "preferences" | "documents" | "review";

// Wizard order, first to last.
const stepOrder: StepKey[] = [
  "account",
  "profile",
  "preferences",
  "documents",
  "review",
];

// Schema validated at each step; the review step validates everything.
const stepResolvers = {
  account: StepAccountSchema,
  profile: StepProfileSchema,
  preferences: StepPreferencesSchema,
  documents: StepDocumentsSchema,
  review: RegistrationSchema,
};
/**
 * Multi-step registration form container.
 *
 * Holds the current step, owns the single react-hook-form instance shared by
 * all steps through FormProvider, and renders the active step lazily.
 */
export default function App() {
  const [step, setStep] = useState<StepKey>("account");
  const methods = useForm<RegistrationData>({
    // The resolver follows the active step.
    resolver: zodResolver(stepResolvers[step]),
    mode: "onBlur",
    // Only one step is mounted at a time. Values of unmounted steps must be
    // kept, otherwise the review step has nothing left to validate or submit.
    shouldUnregister: false,
    defaultValues: { newsletter: false, language: "zh-CN" } as Partial<RegistrationData>,
  });

  // Validate with the current step's schema, then advance.
  const next = async () => {
    const valid = await methods.trigger();
    if (!valid) return;
    const idx = stepOrder.indexOf(step);
    setStep(stepOrder[idx + 1] ?? step);
  };

  const back = () => {
    const idx = stepOrder.indexOf(step);
    setStep(stepOrder[idx - 1] ?? step);
  };

  return (
    <FormProvider {...methods}>
      <form onSubmit={methods.handleSubmit(() => setStep("review"))} noValidate>
        <Suspense fallback={<div aria-busy="true">加载中…</div>}>
          {step === "account" && <StepAccount onNext={next} />}
          {step === "profile" && <StepProfile onNext={next} onBack={back} />}
          {step === "preferences" && <StepPreferences onNext={next} onBack={back} />}
          {step === "documents" && <StepDocuments onNext={next} onBack={back} />}
          {step === "review" && <Review onBack={back} />}
        </Suspense>
      </form>
    </FormProvider>
  );
}
可复用基础控件(带无障碍)
// components/FormField.tsx
import { useFormContext } from "react-hook-form";
import { useId } from "react";
type Props = {
  name: string;
  label: string;
  type?: string;
  placeholder?: string;
  autoComplete?: string;
  inputProps?: React.InputHTMLAttributes<HTMLInputElement>;
};

/**
 * Labelled text input wired to the surrounding react-hook-form context.
 * Shows the field's error message (if any) and links it to the input via
 * aria-describedby.
 */
export function FormField({ name, label, type = "text", placeholder, autoComplete, inputProps }: Props) {
  const form = useFormContext();
  const fieldId = useId();
  const labelId = `${fieldId}-label`;
  const errorId = `${fieldId}-desc`;
  // Only top-level field names are looked up here.
  const errorText = (form.formState.errors as any)[name]?.message as string | undefined;
  const hasError = !!errorText;

  return (
    <div role="group" aria-labelledby={labelId} className="field">
      <label id={labelId} htmlFor={fieldId}>{label}</label>
      <input
        id={fieldId}
        type={type}
        placeholder={placeholder}
        autoComplete={autoComplete}
        aria-invalid={hasError}
        aria-describedby={hasError ? errorId : undefined}
        {...form.register(name)}
        {...inputProps}
      />
      {errorText && (
        <div id={errorId} role="alert" aria-live="polite" className="error">
          {errorText}
        </div>
      )}
    </div>
  );
}
动态字段与输入掩码
// steps/StepProfile.tsx
import { useFormContext } from "react-hook-form";
import InputMask from "react-input-mask";
import { FormField } from "../components/FormField";
/**
 * Profile step: account type, first/last name, masked phone input and, for
 * company accounts only, the company name. Navigation is delegated to the
 * onNext / onBack callbacks supplied by the parent.
 */
export default function StepProfile({ onNext, onBack }: { onNext: () => void; onBack: () => void }) {
const { register, watch, setValue, formState: { errors } } = useFormContext();
// Re-render when the account type changes so the company field can toggle.
const accountType = watch("accountType");
return (
<section aria-label="个人信息">
<div>
<label htmlFor="accountType">账户类型</label>
<select id="accountType" {...register("accountType")}>
<option value="personal">个人</option>
<option value="company">公司</option>
</select>
</div>
<FormField name="firstName" label="名" />
<FormField name="lastName" label="姓" />
{/* Phone input mask */}
<div>
<label htmlFor="phone">电话</label>
<InputMask mask="+99 999-999-999" {...register("phone")} id="phone">
{(inputProps: any) => <input {...inputProps} aria-invalid={!!errors.phone} />}
</InputMask>
{errors.phone?.message && <div role="alert">{String(errors.phone.message)}</div>}
</div>
{accountType === "company" && (
<FormField name="companyName" label="公司名称" />
)}
<div className="nav">
<button type="button" onClick={onBack}>上一步</button>
<button type="button" onClick={onNext}>下一步</button>
</div>
</section>
);
}
文件上传(大小/类型校验、进度展示)
// components/FileUpload.tsx
import { useFormContext } from "react-hook-form";
import { useId, useState, useRef } from "react";
type FileUploadProps = {
  name: "avatar" | "idDocument";
  label: string;
  accept: string;
};

/**
 * File input bound to the form context that also uploads the chosen file
 * immediately via XMLHttpRequest, showing progress and a cancel button.
 */
export function FileUpload({ name, label, accept }: FileUploadProps) {
  const { register, setValue, formState: { errors } } = useFormContext();
  const id = useId();
  const [progress, setProgress] = useState<number>(0);
  // Rendering is driven by state: mutating a ref does not trigger a
  // re-render, so the cancel button could not follow the ref alone.
  const [uploading, setUploading] = useState<boolean>(false);
  const xhrRef = useRef<XMLHttpRequest | null>(null);
  const errMsg = (errors as any)[name]?.message as string | undefined;

  const onUpload = async (file: File) => {
    // A newly chosen file supersedes any upload still in flight.
    xhrRef.current?.abort();

    const form = new FormData();
    form.append(name, file);
    const xhr = new XMLHttpRequest();
    xhrRef.current = xhr;
    setProgress(0);
    setUploading(true);

    xhr.upload.onprogress = (e) => {
      if (e.lengthComputable) setProgress(Math.round((e.loaded / e.total) * 100));
    };
    xhr.onreadystatechange = () => {
      // Ignore completion of a request that has already been replaced.
      if (xhr.readyState === 4 && xhrRef.current === xhr) {
        xhrRef.current = null;
        setUploading(false);
      }
    };
    xhr.open("POST", `/api/upload/${name}`);
    xhr.setRequestHeader("X-CSRF-Token", (document.querySelector('meta[name="csrf-token"]') as HTMLMetaElement)?.content || "");
    xhr.send(form);
  };

  return (
    <div>
      <label htmlFor={id}>{label}</label>
      <input
        id={id}
        type="file"
        accept={accept}
        {...register(name as any)}
        onChange={(e) => {
          const file = e.target.files?.[0];
          if (file) {
            setValue(name, file, { shouldValidate: true });
            onUpload(file); // could instead be deferred to the final submit
          }
        }}
        aria-invalid={!!errMsg}
        aria-describedby={errMsg ? `${id}-error` : undefined}
      />
      {errMsg && <div id={`${id}-error`} role="alert">{errMsg}</div>}
      <div aria-live="polite" aria-atomic="true">{progress > 0 && `上传进度:${progress}%`}</div>
      {uploading && <button type="button" onClick={() => xhrRef.current?.abort()}>取消上传</button>}
    </div>
  );
}
示例:聚焦第一个错误字段
// utils/focusFirstError.ts
import { FieldErrors } from "react-hook-form";
/**
 * Move keyboard focus to the element whose `name` attribute equals the first
 * key of `errors`. Does nothing when there are no errors or no such element.
 */
export function focusFirstError(errors: FieldErrors) {
  const firstKey = Object.keys(errors)[0];
  if (!firstKey) return;
  // Escape the name so characters such as quotes or brackets cannot break
  // (or alter) the attribute selector.
  const selector = `[name="${CSS.escape(firstKey)}"]`;
  const el = document.querySelector(selector) as HTMLElement | null;
  el?.focus();
}
示例:useUsernameCheck Hook
// hooks/useUsernameCheck.ts
import { useEffect, useRef } from "react";
import { UseFormSetError, UseFormClearErrors, UseFormWatch } from "react-hook-form";
/**
 * Debounced (300 ms) remote availability check for the `username` field.
 *
 * Subscribes to form changes; for usernames of 3+ characters it calls
 * `/api/username/check` and sets or clears a `validate` error on `username`.
 * Pending timers and in-flight requests are cancelled whenever the value
 * changes again and when the component unmounts.
 */
export function useUsernameCheck(watch: UseFormWatch<any>, setError: UseFormSetError<any>, clearErrors: UseFormClearErrors<any>) {
  const timerRef = useRef<number | null>(null);
  const abortRef = useRef<AbortController | null>(null);

  useEffect(() => {
    // Drop any scheduled or running check so a stale answer cannot land later.
    const cancelPending = () => {
      if (timerRef.current !== null) {
        window.clearTimeout(timerRef.current);
        timerRef.current = null;
      }
      abortRef.current?.abort();
      abortRef.current = null;
    };

    const subscription = watch((value, { name }) => {
      if (name !== "username") return;
      cancelPending();
      const username = value.username;
      if (!username || username.length < 3) {
        clearErrors("username");
        return;
      }
      timerRef.current = window.setTimeout(async () => {
        timerRef.current = null;
        const ac = new AbortController();
        abortRef.current = ac;
        try {
          const res = await fetch(`/api/username/check?u=${encodeURIComponent(username)}`, { signal: ac.signal });
          // A non-2xx answer is a failed check, not "username is free".
          if (!res.ok) throw new Error(`HTTP ${res.status}`);
          const data = await res.json();
          if (ac.signal.aborted) return;
          if (data.taken) {
            setError("username", { type: "validate", message: "用户名已被占用" });
          } else {
            clearErrors("username");
          }
        } catch (e: any) {
          if (e.name !== "AbortError") {
            setError("username", { type: "validate", message: "检查失败,请稍后重试" });
          }
        }
      }, 300);
    });

    return () => {
      subscription.unsubscribe();
      cancelPending();
    };
  }, [watch, setError, clearErrors]);
}
在步骤组件中使用:
// steps/StepAccount.tsx
import { useFormContext } from "react-hook-form";
import { useUsernameCheck } from "../hooks/useUsernameCheck";
import { FormField } from "../components/FormField";
/**
 * Account step: e-mail, username (with remote availability check), password,
 * confirmation and the agreement checkbox.
 */
export default function StepAccount({ onNext }: { onNext: () => void }) {
  const { watch, setError, clearErrors } = useFormContext();
  useUsernameCheck(watch, setError, clearErrors);
  // Registration props for the agreement checkbox.
  const agreeProps = useFormContext().register("agree");

  return (
    <section aria-label="账号设置">
      <FormField name="email" label="邮箱" type="email" autoComplete="email" />
      <FormField name="username" label="用户名" />
      <FormField name="password" label="密码" type="password" autoComplete="new-password" />
      <FormField name="confirmPassword" label="确认密码" type="password" autoComplete="new-password" />
      <div>
        <input id="agree" type="checkbox" {...agreeProps} aria-describedby="agree-desc" />
        <label htmlFor="agree">我已阅读并同意</label>
        <div id="agree-desc" className="hint">勾选后可继续</div>
      </div>
      <button type="button" onClick={onNext}>下一步</button>
    </section>
  );
}
示例:安全展示与 i18n
// utils/safeHtml.ts
import DOMPurify from "dompurify";
/**
 * Sanitize `html` with DOMPurify and wrap it in the `{ __html }` object
 * shape used by React's dangerouslySetInnerHTML.
 */
export function safeHtml(html: string) {
  const sanitized = DOMPurify.sanitize(html);
  return { __html: sanitized };
}
// i18n 使用
import { useTranslation } from "react-i18next";
/** Minimal i18n example: a label whose text comes from key "form.email". */
function LabelExample() {
  const { t: translate } = useTranslation();
  const text = translate("form.email");
  return <label>{text}</label>;
}
CSRF fetch 包装器
// utils/fetchWithCsrf.ts
/**
 * fetch() wrapper that sends cookies and adds the CSRF token read from
 * `<meta name="csrf-token">` as the `X-CSRF-Token` header.
 *
 * The token is attached only to same-origin requests so it is never sent to
 * a third-party host.
 */
export async function fetchWithCsrf(input: RequestInfo, init: RequestInit = {}) {
  const token = (document.querySelector('meta[name="csrf-token"]') as HTMLMetaElement)?.content || "";
  const headers = new Headers(init.headers || {});

  // Resolve relative URLs against the current page before comparing origins.
  const target = typeof input === "string" ? input : input.url;
  const sameOrigin = new URL(target, window.location.href).origin === window.location.origin;

  if (token && sameOrigin) headers.set("X-CSRF-Token", token);
  return fetch(input, { ...init, headers, credentials: "include" });
}
// Code-splitting fragment: lazy-load a step and warm its chunk ahead of time.
// NOTE(review): this is an excerpt; `lazy` is not imported here and the
// useEffect call is presumably meant to live inside a component body.
import { useEffect } from "react";
const StepPreferences = lazy(() => import("./steps/StepPreferences"));
useEffect(() => {
// Preload the next step
import("./steps/StepPreferences");
}, []);
Vitest 示例:schema
// tests/registration.schema.test.ts
import { describe, it, expect } from "vitest";
import { StepAccountSchema } from "../shared/schemas/registration";
describe("StepAccountSchema", () => {
  // Fields shared by both cases.
  const common = { email: "a@b.com", agree: true };

  it("rejects weak password", () => {
    const input = {
      ...common,
      username: "user123",
      password: "weak",
      confirmPassword: "weak",
    };
    const { success } = StepAccountSchema.safeParse(input);
    expect(success).toBe(false);
  });

  it("accepts valid data", () => {
    const input = {
      ...common,
      username: "user_123",
      password: "Strong#123",
      confirmPassword: "Strong#123",
    };
    const { success } = StepAccountSchema.safeParse(input);
    expect(success).toBe(true);
  });
});
Vitest + Testing Library:组件错误渲染
// tests/StepAccount.test.tsx
import { render, screen } from "@testing-library/react";
import { FormProvider, useForm } from "react-hook-form";
import StepAccount from "../src/steps/StepAccount";
// Test harness: StepAccount reads the form through useFormContext, so it has
// to be rendered inside a FormProvider.
function Wrapper() {
const methods = useForm({ defaultValues: {} });
return (
<FormProvider {...methods}>
<StepAccount onNext={() => {}} />
</FormProvider>
);
}
// Smoke test: the e-mail field is reachable by its label.
// NOTE(review): `test` and `toBeInTheDocument` are not imported in this
// excerpt; presumably provided by test globals and a jest-dom setup file.
test("renders labels", () => {
render(<Wrapper />);
expect(screen.getByLabelText("邮箱")).toBeInTheDocument();
});
Playwright E2E 示例
// e2e/register.spec.ts
import { test, expect } from "@playwright/test";
// End-to-end walk through all registration steps with the username check stubbed.
test("multi-step registration", async ({ page }) => {
  // Glob with a trailing wildcard so the stub also matches the "?u=..." query.
  await page.route("**/api/username/check*", (route) => {
    const url = new URL(route.request().url());
    const u = url.searchParams.get("u");
    return route.fulfill({ json: { taken: u === "taken_user" } });
  });

  await page.goto("/register");

  // Step 1: account
  await page.getByLabel("邮箱").fill("user@example.com");
  await page.getByLabel("用户名").fill("taken_user");
  await page.waitForSelector("text=用户名已被占用");
  await page.getByLabel("用户名").fill("newuser");
  // exact: true, because "密码" is also a substring of the "确认密码" label.
  await page.getByLabel("密码", { exact: true }).fill("Strong#123");
  await page.getByLabel("确认密码", { exact: true }).fill("Strong#123");
  await page.getByLabel("我已阅读并同意").check();
  await page.getByRole("button", { name: "下一步" }).click();

  // Step 2: profile. Name and phone are required by the step's schema.
  await page.getByLabel("账户类型").selectOption("company");
  await page.getByLabel("名", { exact: true }).fill("San");
  await page.getByLabel("姓", { exact: true }).fill("Zhang");
  await page.getByLabel("电话", { exact: true }).fill("+86 138-001-380");
  await page.getByLabel("公司名称").fill("Acme Inc.");
  await page.getByRole("button", { name: "下一步" }).click();

  // Step 3: preferences (defaults are fine)
  await page.getByRole("button", { name: "下一步" }).click();

  // Step 4: documents
  await page.setInputFiles('input[type="file"][accept="image/png,image/jpeg"]', 'tests/fixtures/avatar.png');
  await page.setInputFiles('input[type="file"][accept="application/pdf"]', 'tests/fixtures/id.pdf');
  await page.getByRole("button", { name: "下一步" }).click();

  // Step 5: the review section carries this text only as its accessible
  // name (aria-label), so locate it by role rather than by visible text.
  await expect(page.getByRole("region", { name: "审核并提交" })).toBeVisible();
});
提交模板
// steps/Review.tsx
import { useRef } from "react";
import { useFormContext } from "react-hook-form";
import { RegistrationSchema, type RegistrationData } from "../../shared/schemas/registration";
import { fetchWithCsrf } from "../utils/fetchWithCsrf";

/**
 * Final step: validates the whole form, posts it as multipart form data and
 * reports a server failure in the alert region.
 */
export default function Review({ onBack }: { onBack: () => void }) {
  const { getValues, trigger, setError, clearErrors, formState: { errors } } = useFormContext<RegistrationData>();
  const alertRef = useRef<HTMLDivElement | null>(null);

  const submit = async () => {
    // Remove a failure message left over from a previous attempt.
    clearErrors("root");
    const ok = await trigger(undefined, { shouldFocus: true }); // validate every field
    if (!ok) return;
    const data = getValues();
    try {
      const form = new FormData();
      for (const [k, v] of Object.entries(data)) {
        if (v instanceof File) form.append(k, v);
        else form.append(k, String(v ?? ""));
      }
      const res = await fetchWithCsrf("/api/register", { method: "POST", body: form });
      if (!res.ok) throw new Error("注册失败");
      // Success handling
      alert("注册成功");
    } catch (e: any) {
      setError("root", { type: "server", message: e.message || "服务器错误" });
      // Focus the region that actually displays the error text.
      alertRef.current?.focus();
    }
  };

  return (
    <section aria-label="审核并提交">
      {/* Shows the server error when there is one, otherwise the hint. */}
      <div ref={alertRef} role="alert" tabIndex={-1}>
        {errors.root?.message ?? "请确认信息后提交"}
      </div>
      <div className="nav">
        <button type="button" onClick={onBack}>上一步</button>
        <button type="button" onClick={submit}>提交</button>
      </div>
    </section>
  );
}
错误边界
// components/ErrorBoundary.tsx
import { Component, ReactNode } from "react";
type ErrorBoundaryProps = { children: ReactNode };
type ErrorBoundaryState = { hasError: boolean; message?: string };

/**
 * Catches render-time errors thrown by its children and shows a retry prompt
 * in place of the broken subtree.
 */
export class ErrorBoundary extends Component<ErrorBoundaryProps, ErrorBoundaryState> {
  constructor(props: ErrorBoundaryProps) {
    super(props);
    this.state = { hasError: false };
  }

  /** Switches to the fallback UI and remembers the error message. */
  static getDerivedStateFromError(error: Error): ErrorBoundaryState {
    return { hasError: true, message: error.message };
  }

  componentDidCatch(error: Error, info: unknown) {
    // TODO: report to a logging backend
    console.error(error, info);
  }

  /** Clears the error state so the children are rendered again. */
  private reset = () => {
    this.setState({ hasError: false, message: undefined });
  };

  render() {
    if (!this.state.hasError) {
      return this.props.children;
    }
    return (
      <div role="alert">
        出现错误:{this.state.message}
        {/* type="button" keeps the retry click from submitting an enclosing form. */}
        <button type="button" onClick={this.reset}>重试</button>
      </div>
    );
  }
}
实践提示
Zod 的错误文案可通过 z.setErrorMap((issue) => ({ message: t(`validation.${issue.code}`) })) 接入 i18n。以上模板可直接作为骨架投入项目并根据业务扩展。通过 Zod 实现类型与校验统一,React Hook Form 管控状态与性能,无障碍与安全措施贯穿始终,再结合测试与代码分割确保质量与可维护性。
以下内容按“最佳实践 + 小型代码示例”的风格,给出一个用 Go 实现的订单微服务,REST 与 gRPC 同时暴露,涵盖分层架构、仓储模式、pgx 连接池与重试、幂等与并发控制、熔断与退避、可观测性(OpenTelemetry/Prometheus/zap)、优雅停机/健康探针、表驱动测试与 testcontainers 等关键点。示例代码尽量精简,能作为最小可运行骨架进行扩展。
一、分层架构与仓储模式(context 贯穿)
示例领域模型与接口
// internal/domain/order.go
package domain
import "time"
type Order struct {
ID string
DedupKey string // 幂等去重键(外部传入,需唯一约束)
UserID string
AmountCents int64
Status string // created, paid, failed...
Version int64 // 乐观锁版本
CreatedAt time.Time
UpdatedAt time.Time
}
// 仓储接口
type OrderRepository interface {
Create(ctx context.Context, o Order) (Order, error) // 幂等:基于 DedupKey
Get(ctx context.Context, id string) (Order, error)
UpdateStatusOptLock(ctx context.Context, id string, fromVersion int64, newStatus string) (Order, error)
}
二、PostgreSQL(pgx)连接池、超时与重试
示例:pgx 连接与重试
// internal/db/pg.go
package db
import (
	"context"
	"errors"
	"time"

	"github.com/cenkalti/backoff/v4"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgconn"
	"github.com/jackc/pgx/v5/pgxpool"
)
// NewPool parses url and opens a pgx connection pool with fixed sizing,
// connection-lifetime, health-check and connect-timeout settings.
// It returns the parse error unchanged when url is not a valid pool config.
func NewPool(ctx context.Context, url string) (*pgxpool.Pool, error) {
	poolCfg, parseErr := pgxpool.ParseConfig(url)
	if parseErr != nil {
		return nil, parseErr
	}

	// Pool sizing.
	poolCfg.MinConns = 2
	poolCfg.MaxConns = 20

	// Connection recycling and background health checks.
	poolCfg.MaxConnIdleTime = 10 * time.Minute
	poolCfg.MaxConnLifetime = time.Hour
	poolCfg.HealthCheckPeriod = 30 * time.Second

	// Per-connection dial timeout.
	poolCfg.ConnConfig.ConnectTimeout = 5 * time.Second

	return pgxpool.NewWithConfig(ctx, poolCfg)
}
// IsRetryablePgErr reports whether err is worth retrying: a PostgreSQL
// serialization failure (40001) or deadlock (40P01), or any error other than
// pgx.ErrNoRows that pgconn.SafeToRetry accepts.
func IsRetryablePgErr(err error) bool {
	var pgErr *pgconn.PgError
	if errors.As(err, &pgErr) && (pgErr.Code == "40001" || pgErr.Code == "40P01") {
		return true
	}
	// "No rows" is a result, not a transient failure.
	if errors.Is(err, pgx.ErrNoRows) {
		return false
	}
	// Network-level or temporary I/O failures.
	return pgconn.SafeToRetry(err)
}
// WithTxRetry runs fn inside a transaction at isoLevel and commits it,
// retrying the whole transaction with exponential backoff (20ms..200ms, at
// most ~1s in total) when fn or the commit fails with an error accepted by
// IsRetryablePgErr. Any other error stops the loop and is returned as-is.
// Retrying also stops as soon as ctx is cancelled.
func WithTxRetry(ctx context.Context, pool *pgxpool.Pool, isoLevel pgx.TxIsoLevel, fn func(pgx.Tx) error) error {
	bo := backoff.NewExponentialBackOff()
	bo.InitialInterval = 20 * time.Millisecond
	bo.MaxInterval = 200 * time.Millisecond
	bo.MaxElapsedTime = 1 * time.Second

	// classify hands retryable errors back to backoff unchanged and marks
	// everything else permanent so the loop ends immediately.
	classify := func(err error) error {
		if IsRetryablePgErr(err) {
			return err
		}
		return backoff.Permanent(err)
	}

	operation := func() error {
		tx, err := pool.BeginTx(ctx, pgx.TxOptions{IsoLevel: isoLevel})
		if err != nil {
			return backoff.Permanent(err)
		}
		// Rolls back when fn or Commit failed; the error is irrelevant after a
		// successful commit, so it is deliberately ignored.
		defer func() { _ = tx.Rollback(ctx) }()

		if err := fn(tx); err != nil {
			return classify(err)
		}
		// A commit can fail with a serialization error too; classify it instead
		// of retrying every commit failure blindly.
		if err := tx.Commit(ctx); err != nil {
			return classify(err)
		}
		return nil
	}

	// Bind the backoff to ctx so a cancelled caller is not kept waiting.
	return backoff.Retry(operation, backoff.WithContext(bo, ctx))
}
三、幂等设计、去重键与悲观/乐观锁
示例:仓储实现(幂等 + 乐观锁)
// internal/repo/order_repo.go
package repo
import (
"context"
"errors"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"yourmod/internal/domain"
"yourmod/internal/db"
"time"
)
// ErrConflict is returned by UpdateStatusOptLock when its UPDATE matches no row
// for the given id and version.
var ErrConflict = errors.New("conflict")
// OrderRepo is the pgx-backed order repository; it holds only the shared pool.
type OrderRepo struct {
pool *pgxpool.Pool
}
// NewOrderRepo wraps pool in an OrderRepo.
func NewOrderRepo(pool *pgxpool.Pool) *OrderRepo { return &OrderRepo{pool: pool} }
// SQL schema 要有:
// create table orders (
// id uuid primary key default gen_random_uuid(),
// dedup_key text not null unique,
// user_id text not null,
// amount_cents bigint not null,
// status text not null,
// version bigint not null default 1,
// created_at timestamptz not null default now(),
// updated_at timestamptz not null default now()
// );
// Create inserts o with status 'created' inside a serializable transaction run
// through db.WithTxRetry. When a row with the same dedup_key already exists the
// statement only bumps its updated_at, and that existing row is returned.
func (r *OrderRepo) Create(ctx context.Context, o domain.Order) (domain.Order, error) {
	const upsert = `
insert into orders (dedup_key, user_id, amount_cents, status)
values ($1,$2,$3,'created')
on conflict (dedup_key) do update set updated_at = now()
returning id, dedup_key, user_id, amount_cents, status, version, created_at, updated_at;
`
	var saved domain.Order

	// Runs the upsert on tx and copies the returned row into saved.
	insertAndScan := func(tx pgx.Tx) error {
		row := tx.QueryRow(ctx, upsert, o.DedupKey, o.UserID, o.AmountCents)
		return row.Scan(
			&saved.ID, &saved.DedupKey, &saved.UserID, &saved.AmountCents,
			&saved.Status, &saved.Version, &saved.CreatedAt, &saved.UpdatedAt,
		)
	}

	err := db.WithTxRetry(ctx, r.pool, pgx.Serializable, insertAndScan)
	return saved, err
}
// Get loads the order with the given id. The Scan error is returned unchanged,
// together with whatever was scanned so far.
func (r *OrderRepo) Get(ctx context.Context, id string) (domain.Order, error) {
	const byID = `select id, dedup_key, user_id, amount_cents, status, version, created_at, updated_at from orders where id=$1`

	var found domain.Order
	row := r.pool.QueryRow(ctx, byID, id)
	scanErr := row.Scan(
		&found.ID, &found.DedupKey, &found.UserID, &found.AmountCents,
		&found.Status, &found.Version, &found.CreatedAt, &found.UpdatedAt,
	)
	return found, scanErr
}
// UpdateStatusOptLock sets newStatus on the order identified by id, but only
// if its stored version still equals fromVersion; on success the version is
// incremented and the updated row is returned.
//
// It returns ErrConflict when no row matches id and fromVersion (the order was
// modified concurrently, or does not exist). Any other database error is
// returned unchanged, so a failed query is no longer reported as a conflict.
func (r *OrderRepo) UpdateStatusOptLock(ctx context.Context, id string, fromVersion int64, newStatus string) (domain.Order, error) {
	var o domain.Order
	q := `
update orders
set status=$3, version=version+1, updated_at=now()
where id=$1 and version=$2
returning id, dedup_key, user_id, amount_cents, status, version, created_at, updated_at;
`
	err := r.pool.QueryRow(ctx, q, id, fromVersion, newStatus).
		Scan(&o.ID, &o.DedupKey, &o.UserID, &o.AmountCents, &o.Status, &o.Version, &o.CreatedAt, &o.UpdatedAt)
	if errors.Is(err, pgx.ErrNoRows) {
		// The UPDATE touched nothing: the version moved on or the id is unknown.
		return domain.Order{}, ErrConflict
	}
	if err != nil {
		return domain.Order{}, err
	}
	return o, nil
}
// 悲观锁示意(库存/队列工作项):SELECT ... FOR UPDATE SKIP LOCKED
// tx.Query(ctx, `select id from jobs where status='ready' for update skip locked limit 1`)
四、熔断与回退策略,外部调用重试退避
示例:带熔断与退避的支付客户端
// internal/ext/pay_client.go
package ext
import (
"context"
"errors"
"net/http"
"time"
"github.com/sony/gobreaker"
"github.com/cenkalti/backoff/v4"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)
// PayClient calls the payment service over HTTP through a circuit breaker.
type PayClient struct {
httpc *http.Client
breaker *gobreaker.CircuitBreaker
}
// NewPayClient builds a PayClient with a 2s HTTP timeout, an OpenTelemetry
// instrumented transport, and a "payment" circuit breaker that trips after
// five consecutive failures.
func NewPayClient() *PayClient {
	// Trip once five calls in a row have failed.
	tripAfterFiveFailures := func(counts gobreaker.Counts) bool {
		return counts.ConsecutiveFailures >= 5
	}

	breaker := gobreaker.NewCircuitBreaker(gobreaker.Settings{
		Name:        "payment",
		MaxRequests: 5,
		Interval:    30 * time.Second,
		Timeout:     10 * time.Second,
		ReadyToTrip: tripAfterFiveFailures,
	})

	httpClient := &http.Client{
		Timeout:   2 * time.Second,
		Transport: otelhttp.NewTransport(http.DefaultTransport),
	}

	return &PayClient{httpc: httpClient, breaker: breaker}
}
// Authorize POSTs to the payment service's authorize endpoint through the
// circuit breaker, retrying with exponential backoff until ctx is done.
//
// Retried: transport errors and 5xx responses. Not retried: 4xx responses, a
// request that cannot be built, and calls rejected by the breaker itself
// (open, or half-open with too many probes). Retrying a rejected call would
// only spin until the context expires instead of letting the caller fall back.
//
// NOTE(review): the request is sent without a body, so orderID and amount are
// not transmitted yet; wire them in once the payment API contract is known.
func (c *PayClient) Authorize(ctx context.Context, orderID string, amount int64) error {
	op := func() error {
		// Build the request outside the breaker: a construction failure is a
		// local bug, not a payment-service failure, and must not trip it.
		req, err := http.NewRequestWithContext(ctx, http.MethodPost, "http://payment/api/authorize", nil)
		if err != nil {
			return backoff.Permanent(err)
		}

		_, err = c.breaker.Execute(func() (interface{}, error) {
			resp, err := c.httpc.Do(req)
			if err != nil {
				return nil, err
			}
			defer resp.Body.Close()
			if resp.StatusCode >= 500 {
				return nil, errors.New("payment 5xx")
			}
			if resp.StatusCode >= 400 {
				return nil, backoff.Permanent(errors.New("payment 4xx"))
			}
			return nil, nil
		})
		// Fail fast while the breaker is rejecting calls.
		if errors.Is(err, gobreaker.ErrOpenState) || errors.Is(err, gobreaker.ErrTooManyRequests) {
			return backoff.Permanent(err)
		}
		return err
	}
	bo := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
	return backoff.Retry(op, bo)
}
五、OpenTelemetry 分布式追踪、Prometheus 指标与结构化日志(zap)
示例:可观测性初始化与采样指标
// internal/observ/otel.go
package observ
import (
"context"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
)
// InitOTel installs a global tracer provider that exports over OTLP/HTTP,
// tags spans with the given service name, samples 20% of root traces
// (children follow their parent), and propagates W3C trace context.
// It returns the provider's Shutdown function for the caller to run on exit.
func InitOTel(ctx context.Context, service string) (func(context.Context) error, error) {
	// Endpoint and related settings come from the OTEL_EXPORTER_OTLP_* environment variables.
	exporter, err := otlptracehttp.New(ctx)
	if err != nil {
		return nil, err
	}

	// The resource error is intentionally discarded, as in the original skeleton.
	svcResource, _ := resource.New(ctx, resource.WithAttributes(semconv.ServiceName(service)))
	sampler := sdktrace.ParentBased(sdktrace.TraceIDRatioBased(0.2))

	provider := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exporter),
		sdktrace.WithResource(svcResource),
		sdktrace.WithSampler(sampler),
	)

	otel.SetTracerProvider(provider)
	otel.SetTextMapPropagator(propagation.TraceContext{})
	return provider.Shutdown, nil
}
// internal/observ/metrics.go
package observ
import "github.com/prometheus/client_golang/prometheus"
// Order metrics exposed to Prometheus.
var (
// OrdersCreated counts created orders.
OrdersCreated = prometheus.NewCounter(prometheus.CounterOpts{
Name: "orders_created_total", Help: "Total created orders",
})
// OrderCreateLatency records order-creation latency in seconds using the default buckets.
OrderCreateLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "order_create_seconds", Help: "Latency of order creation", Buckets: prometheus.DefBuckets,
})
)
// MustRegisterMetrics registers both collectors with the default registry.
// NOTE(review): prometheus.MustRegister panics on failure, so this presumably
// must run only once per process - confirm against callers.
func MustRegisterMetrics() {
prometheus.MustRegister(OrdersCreated, OrderCreateLatency)
}
// internal/observ/logging.go
package observ
import (
"go.uber.org/zap"
"context"
"go.opentelemetry.io/otel/trace"
)
// NewLogger returns zap's production logger. If it cannot be built, a no-op
// logger is returned instead of nil, so callers never dereference a nil
// *zap.Logger.
func NewLogger() *zap.Logger {
	l, err := zap.NewProduction()
	if err != nil {
		return zap.NewNop()
	}
	return l
}
// WithTraceFields returns l enriched with trace_id and span_id taken from the
// span stored in ctx. When that span carries no trace ID, l is returned as-is.
func WithTraceFields(ctx context.Context, l *zap.Logger) *zap.Logger {
	spanCtx := trace.SpanFromContext(ctx).SpanContext()
	if !spanCtx.HasTraceID() {
		return l
	}
	traceField := zap.String("trace_id", spanCtx.TraceID().String())
	spanField := zap.String("span_id", spanCtx.SpanID().String())
	return l.With(traceField, spanField)
}
六、服务层与路由(REST 与 gRPC)
proto(最小化)
// api/order/v1/order.proto
syntax = "proto3";
package order.v1;
// Generated Go code lives in yourmod/api/order/v1 under package name orderv1.
option go_package = "yourmod/api/order/v1;orderv1";
// OrderService is the gRPC API for creating and fetching orders.
service OrderService {
// CreateOrder creates an order and returns its id.
rpc CreateOrder(CreateOrderRequest) returns (CreateOrderResponse);
// GetOrder returns the order with the given id.
rpc GetOrder(GetOrderRequest) returns (GetOrderResponse);
}
// dedup_key is the caller-supplied idempotency key; amount_cents is the order amount in cents.
message CreateOrderRequest { string dedup_key = 1; string user_id = 2; int64 amount_cents = 3; }
message CreateOrderResponse { string order_id = 1; }
message GetOrderRequest { string order_id = 1; }
message GetOrderResponse { string order_id = 1; string status = 2; int64 amount_cents = 3; }
service 与 transport 示例
// internal/service/order_service.go
package service
import (
"context"
"time"
"yourmod/internal/domain"
"yourmod/internal/repo"
"yourmod/internal/ext"
"yourmod/internal/observ"
"go.uber.org/zap"
)
type OrderService struct {
repo repo.OrderRepo
pay *ext.PayClient
log *zap.Logger
}
func NewOrderService(r *repo.OrderRepo, p *ext.PayClient, l *zap.Logger) *OrderService {
return &OrderService{repo: *r, pay: p, log: l}
}
// Create persists an order (idempotent on dedupKey) within a 2s budget,
// records the creation latency and counter, then attempts payment
// authorization when a payment client is configured.
//
// A failed authorization is logged and does not fail the call. An error from
// the repository is returned together with whatever order it produced.
func (s *OrderService) Create(ctx context.Context, dedupKey, userID string, amount int64) (domain.Order, error) {
	ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
	defer cancel()

	start := time.Now()
	o, err := s.repo.Create(ctx, domain.Order{DedupKey: dedupKey, UserID: userID, AmountCents: amount})
	// Latency is observed for failed attempts as well.
	observ.OrderCreateLatency.Observe(time.Since(start).Seconds())
	if err != nil {
		return o, err
	}
	observ.OrdersCreated.Inc()

	// Payment is optional in this skeleton: skip it when no client was
	// injected instead of dereferencing a nil *ext.PayClient.
	if s.pay == nil {
		return o, nil
	}

	log := observ.WithTraceFields(ctx, s.log)
	// External payment authorization (optional in this demo).
	if err := s.pay.Authorize(ctx, o.ID, o.AmountCents); err != nil {
		log.Warn("payment authorize failed, fallback", zap.Error(err))
		// Fallback: mark the order as pending payment, or emit a compensation message.
	}
	return o, nil
}
// Get fetches the order with the given id from the repository, allowing the
// lookup at most 500ms.
func (s *OrderService) Get(ctx context.Context, id string) (domain.Order, error) {
	const lookupTimeout = 500 * time.Millisecond

	boundedCtx, release := context.WithTimeout(ctx, lookupTimeout)
	defer release()

	order, err := s.repo.Get(boundedCtx, id)
	return order, err
}
// internal/transport/http/router.go
package httpapi
import (
"encoding/json"
"net/http"
"github.com/go-chi/chi/v5"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"yourmod/internal/service"
"yourmod/internal/observ"
"github.com/jackc/pgx/v5/pgxpool"
)
// NewRouter builds the REST surface of the order service:
//
//	GET  /healthz      liveness, always 200
//	GET  /readyz       readiness, 503 while the database pool cannot be pinged
//	GET  /metrics      Prometheus metrics
//	POST /orders       create an order (idempotent on dedup_key)
//	GET  /orders/{id}  fetch an order
//
// All routes are wrapped in the OpenTelemetry HTTP middleware.
func NewRouter(svc *service.OrderService, pool *pgxpool.Pool) http.Handler {
	// Upper bound for a create-order request body.
	const maxOrderBodyBytes = 1 << 20

	r := chi.NewRouter()
	r.Use(otelhttp.NewMiddleware("http"))

	r.Get("/healthz", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) })
	r.Get("/readyz", func(w http.ResponseWriter, r *http.Request) {
		if err := pool.Ping(r.Context()); err != nil {
			http.Error(w, "not ready", http.StatusServiceUnavailable)
			return
		}
		w.WriteHeader(http.StatusOK)
	})
	r.Handle("/metrics", promHandler())

	r.Post("/orders", func(w http.ResponseWriter, r *http.Request) {
		var req struct {
			DedupKey string `json:"dedup_key"`
			UserID   string `json:"user_id"`
			Amount   int64  `json:"amount_cents"`
		}
		// Cap the body so a client cannot stream an unbounded payload.
		r.Body = http.MaxBytesReader(w, r.Body, maxOrderBodyBytes)
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		// An empty dedup_key would make every key-less request collapse onto
		// the same stored order, so reject incomplete input up front.
		if req.DedupKey == "" || req.UserID == "" || req.Amount <= 0 {
			http.Error(w, "dedup_key, user_id and a positive amount_cents are required", http.StatusBadRequest)
			return
		}
		o, err := svc.Create(r.Context(), req.DedupKey, req.UserID, req.Amount)
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		writeJSON(w, map[string]string{"order_id": o.ID})
	})

	r.Get("/orders/{id}", func(w http.ResponseWriter, r *http.Request) {
		id := chi.URLParam(r, "id")
		o, err := svc.Get(r.Context(), id)
		if err != nil {
			// Every lookup failure is reported as 404, as in the original skeleton.
			http.Error(w, err.Error(), http.StatusNotFound)
			return
		}
		writeJSON(w, o)
	})
	return r
}

// writeJSON sends v as a JSON body with the matching Content-Type header.
func writeJSON(w http.ResponseWriter, v interface{}) {
	w.Header().Set("Content-Type", "application/json")
	_ = json.NewEncoder(w).Encode(v)
}
// promHandler registers the order metrics and returns the Prometheus scrape handler.
// NOTE(review): promhttp is not in this file's import list; it presumably needs
// "github.com/prometheus/client_golang/prometheus/promhttp" - confirm and add.
func promHandler() http.Handler {
observ.MustRegisterMetrics()
return promhttp.Handler()
}
// internal/transport/grpc/server.go
package grpcapi
import (
"context"
"yourmod/api/order/v1"
"yourmod/internal/service"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)
// Server adapts service.OrderService to the generated OrderService gRPC interface.
type Server struct {
// Embedded so RPCs not overridden here fall back to the generated stub.
orderv1.UnimplementedOrderServiceServer
svc *service.OrderService
}
// NewServer returns a Server that delegates to s.
func NewServer(s *service.OrderService) *Server { return &Server{svc: s} }
// CreateOrder forwards the request fields to the order service and returns
// the id of the resulting order. Service errors are returned unchanged.
func (s *Server) CreateOrder(ctx context.Context, req *orderv1.CreateOrderRequest) (*orderv1.CreateOrderResponse, error) {
	created, err := s.svc.Create(ctx, req.DedupKey, req.UserId, req.AmountCents)
	if err != nil {
		return nil, err
	}
	resp := &orderv1.CreateOrderResponse{OrderId: created.ID}
	return resp, nil
}
// GetOrder looks the order up through the order service and maps it onto the
// response message. Service errors are returned unchanged.
func (s *Server) GetOrder(ctx context.Context, req *orderv1.GetOrderRequest) (*orderv1.GetOrderResponse, error) {
	found, err := s.svc.Get(ctx, req.OrderId)
	if err != nil {
		return nil, err
	}
	resp := &orderv1.GetOrderResponse{
		OrderId:     found.ID,
		Status:      found.Status,
		AmountCents: found.AmountCents,
	}
	return resp, nil
}
// NewGRPCServer creates a gRPC server with OpenTelemetry unary and stream
// interceptors and registers the order service and the standard health service on it.
func NewGRPCServer(s *service.OrderService) *grpc.Server {
	tracingOpts := []grpc.ServerOption{
		grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
		grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor()),
	}
	srv := grpc.NewServer(tracingOpts...)

	orderv1.RegisterOrderServiceServer(srv, NewServer(s))
	healthpb.RegisterHealthServer(srv, health.NewServer())
	return srv
}
七、优雅停机、健康检查与就绪探针
最小可运行 main
// cmd/ordersvc/main.go
package main
import (
"context"
"net"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"golang.org/x/sync/errgroup"
"go.uber.org/zap"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"yourmod/internal/db"
"yourmod/internal/observ"
"yourmod/internal/repo"
"yourmod/internal/service"
httpapi "yourmod/internal/transport/http"
grpcapi "yourmod/internal/transport/grpc"
"yourmod/internal/ext"
)
// main wires logging, tracing, the database pool, the payment client and the
// order service, serves REST on :8080 and gRPC on :9090, and shuts both down
// gracefully on SIGINT/SIGTERM or when either server fails.
func main() {
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	log := observ.NewLogger()
	defer log.Sync()

	shutdownOTel, err := observ.InitOTel(ctx, "ordersvc")
	if err != nil {
		log.Fatal("otel init", zap.Error(err))
	}
	defer shutdownOTel(context.Background())

	pool, err := db.NewPool(ctx, os.Getenv("DATABASE_URL"))
	if err != nil {
		log.Fatal("db", zap.Error(err))
	}
	defer pool.Close()

	// NewPayClient lives in the ext package; the local is named orderRepo so it
	// does not shadow the imported repo package.
	pay := ext.NewPayClient()
	orderRepo := repo.NewOrderRepo(pool)
	svc := service.NewOrderService(orderRepo, pay, log)

	httpSrv := &http.Server{
		Addr:              ":8080",
		Handler:           httpapi.NewRouter(svc, pool),
		ReadHeaderTimeout: 5 * time.Second,
	}
	grpcSrv := grpcapi.NewGRPCServer(svc)

	g, ctx := errgroup.WithContext(ctx)

	// HTTP
	g.Go(func() error {
		log.Info("http listening", zap.String("addr", httpSrv.Addr))
		if err := httpSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			return err
		}
		return nil
	})

	// gRPC
	g.Go(func() error {
		lis, err := net.Listen("tcp", ":9090")
		if err != nil {
			return err
		}
		log.Info("grpc listening", zap.String("addr", ":9090"))
		return grpcSrv.Serve(lis)
	})

	// Shutdown: runs once a signal arrives or one of the servers has failed.
	g.Go(func() error {
		<-ctx.Done()
		// Restore default signal handling so a second Ctrl-C terminates at once.
		stop()

		c, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()

		if err := httpSrv.Shutdown(c); err != nil {
			log.Warn("http shutdown", zap.Error(err))
		}

		// GracefulStop waits for in-flight RPCs with no deadline of its own;
		// bound it with the same budget and force-stop when it runs out.
		stopped := make(chan struct{})
		go func() {
			grpcSrv.GracefulStop()
			close(stopped)
		}()
		select {
		case <-stopped:
		case <-c.Done():
			grpcSrv.Stop()
		}
		return nil
	})

	if err := g.Wait(); err != nil {
		log.Error("server exit", zap.Error(err))
	}
}
八、表驱动测试与集成测试(testcontainers)
表驱动示例
// internal/service/order_service_test.go
package service_test
import (
"context"
"testing"
"yourmod/internal/service"
"yourmod/internal/domain"
)
// stubRepo is an in-memory repository double that remembers created orders by
// their dedup key.
type stubRepo struct {
	created map[string]domain.Order
}

// Create returns the order previously stored under o.DedupKey when there is
// one; otherwise it assigns the id "id-<dedup key>", stores o and returns it.
func (s *stubRepo) Create(ctx context.Context, o domain.Order) (domain.Order, error) {
	// Reading from a nil map is safe, so the lookup can come first.
	if prior, seen := s.created[o.DedupKey]; seen {
		return prior, nil
	}
	if s.created == nil {
		s.created = make(map[string]domain.Order)
	}
	o.ID = "id-" + o.DedupKey
	s.created[o.DedupKey] = o
	return o, nil
}
// implement other methods ...
// TestCreate_Idempotent creates an order twice with the same dedup key and
// expects both calls to return the same order id.
//
// NOTE(review): as written this test cannot compile or run:
//   - *stubRepo cannot be converted to *repo.OrderRepo (different struct types);
//   - the repo and zap packages are used but not imported by this file;
//   - svc.Create calls Authorize on the nil payment client passed here.
func TestCreate_Idempotent(t *testing.T) {
r := &stubRepo{}
svc := service.NewOrderService((*repo.OrderRepo)(r), nil, zap.NewNop())
ctx := context.Background()
// Table of calls; both rows reuse dedup key "k1".
// NOTE(review): wantID is declared but never asserted below.
cases := []struct{
dedup string; wantID string
}{
{"k1", "id-k1"},
{"k1", "id-k1"},
}
// ID returned by the first call; every later call must return the same one.
var first string
for i, c := range cases {
o, err := svc.Create(ctx, c.dedup, "u", 100)
if err != nil { t.Fatal(err) }
if i == 0 { first = o.ID }
if o.ID != first { t.Fatalf("idempotency broken: %s != %s", o.ID, first) }
}
}
testcontainers 集成测试示例(简化)
// internal/repo/order_repo_integration_test.go
package repo_test
import (
"context"
"testing"
"time"
"fmt"
tc "github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/jackc/pgx/v5/pgxpool"
"yourmod/internal/db"
"yourmod/internal/repo"
"yourmod/internal/domain"
)
// startPostgres launches a throwaway postgres:16-alpine container, opens a
// pool against it and creates the orders schema. It returns the pool and a
// cleanup function that closes the pool and terminates the container.
//
// Any setup failure aborts the test via t.Fatal; the container is terminated
// first so a half-finished setup does not leave it running.
func startPostgres(t *testing.T) (*pgxpool.Pool, func()) {
	t.Helper()
	ctx := context.Background()

	req := tc.ContainerRequest{
		Image:        "postgres:16-alpine",
		Env:          map[string]string{"POSTGRES_PASSWORD": "pw", "POSTGRES_DB": "test"},
		ExposedPorts: []string{"5432/tcp"},
		WaitingFor:   wait.ForListeningPort("5432/tcp").WithStartupTimeout(30 * time.Second),
	}
	pgC, err := tc.GenericContainer(ctx, tc.GenericContainerRequest{ContainerRequest: req, Started: true})
	if err != nil {
		t.Fatal(err)
	}
	terminate := func() { _ = pgC.Terminate(ctx) }

	host, err := pgC.Host(ctx)
	if err != nil {
		terminate()
		t.Fatal(err)
	}
	port, err := pgC.MappedPort(ctx, "5432/tcp")
	if err != nil {
		terminate()
		t.Fatal(err)
	}

	url := fmt.Sprintf("postgres://postgres:pw@%s:%s/test?sslmode=disable", host, port.Port())
	pool, err := db.NewPool(ctx, url)
	if err != nil {
		terminate()
		t.Fatal(err)
	}

	schema := `
create extension if not exists pgcrypto;
create table if not exists orders(
id uuid primary key default gen_random_uuid(),
dedup_key text not null unique,
user_id text not null,
amount_cents bigint not null,
status text not null,
version bigint not null default 1,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now()
);`
	if _, err := pool.Exec(ctx, schema); err != nil {
		pool.Close()
		terminate()
		t.Fatal(err)
	}

	cleanup := func() {
		pool.Close()
		terminate()
	}
	return pool, cleanup
}
// TestRepo_Create_Idempotent inserts the same order twice against a real
// PostgreSQL container and expects both calls to yield the same row id.
func TestRepo_Create_Idempotent(t *testing.T) {
	pool, cleanup := startPostgres(t)
	defer cleanup()

	orders := repo.NewOrderRepo(pool)
	ctx := context.Background()
	input := domain.Order{DedupKey: "d1", UserID: "u1", AmountCents: 100}

	// Issue the identical create twice and keep the returned ids.
	var ids [2]string
	for i := range ids {
		got, err := orders.Create(ctx, input)
		if err != nil {
			t.Fatal(err)
		}
		ids[i] = got.ID
	}

	if ids[0] != ids[1] {
		t.Fatalf("expect same id, got %s vs %s", ids[0], ids[1])
	}
}
九、Makefile 与容器化建议
Makefile(简化)
APP=ordersvc
PKG=./...
BIN=./bin/$(APP)

# None of these targets produce a file of the same name; declare them phony so
# a stray file called "build" or "test" cannot make them look up to date.
.PHONY: build test run proto docker-build

# Compile the service binary into ./bin.
build:
	go build -o $(BIN) ./cmd/ordersvc

# Run all tests, bypassing the test cache.
test:
	go test $(PKG) -count=1

# Run the service locally. The URL is quoted because it contains '?', which
# the shell would otherwise treat as a glob character.
run:
	DATABASE_URL='postgres://user:pw@localhost:5432/db?sslmode=disable' go run ./cmd/ordersvc

# Regenerate Go and gRPC code from the proto definitions.
proto:
	protoc --go_out=. --go-grpc_out=. api/order/v1/*.proto

# Build the container image.
docker-build:
	docker build -t $(APP):latest .
Dockerfile(简化)
# build stage: compile a CGO-free linux/amd64 binary
FROM golang:1.22 AS builder
WORKDIR /src
# Copy the module files first so the dependency download layer is cached
# until go.mod or go.sum change.
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o /out/ordersvc ./cmd/ordersvc
# runtime stage: distroless image running as the non-root user
FROM gcr.io/distroless/base-debian12
USER nonroot:nonroot
WORKDIR /app
COPY --from=builder /out/ordersvc /app/ordersvc
# 8080 = REST, 9090 = gRPC
EXPOSE 8080 9090
# NOTE(review): madvdontneed=1 is presumably already the default on recent Go
# releases - confirm whether this setting is still needed.
ENV GODEBUG=madvdontneed=1
ENTRYPOINT ["/app/ordersvc"]
K8s 探针建议
十、其他实践建议
只要将以上骨架按模块放入 yourmod 项目中,即可得到一个最小可运行、可观测、可测试、可容器化的 Go 订单微服务,并可逐步扩展业务逻辑与非功能性需求。
为用户提供在指定编程语言中完成特定任务的最佳实践指导,包括高效、安全、可维护的解决方案,并通过实际示例提升学习效果和实践应用能力。
快速搭建标准开发流程,统一团队实践,减少沟通成本并提升团队开发效率。
在多语言环境下精准获取最佳实践作为参考,优化开发流程,提升交付质量。
快速上手复杂开发任务,通过详细建议与示例学习优质的编程技巧。
将模板生成的提示词复制粘贴到您常用的 Chat 应用(如 ChatGPT、Claude 等),即可直接对话使用,无需额外开发。适合个人快速体验和轻量使用场景。
把提示词模板转化为 API,您的程序可任意修改模板参数,通过接口直接调用,轻松实现自动化与批量处理。适合开发者集成与业务系统嵌入。
在 MCP client 中配置对应的 server 地址,让您的 AI 应用自动调用提示词模板。适合高级用户和团队协作,让提示词在不同 AI 工具间无缝衔接。
免费获取高级提示词-优惠即将到期