不止热门角色,我们为你扩展了更多细分角色分类,覆盖职场提升、商业增长、内容创作、学习规划等多元场景。精准匹配不同目标,让每一次生成都更有方向、更高命中率。
立即探索更多角色分类,找到属于你的增长加速器。
下面给出一套可落地的DDD+CQRS+Saga+Outbox设计与脚手架骨架,贴合你现状(单体、表结构、接口)与技术栈(Java 21, Spring Boot 3.3, Spring Cloud, JPA 写模型, MyBatis-Plus 读模型, Kafka 3.x, Debezium Outbox, MySQL 8, Redis 7, Flyway, MapStruct, Testcontainers, OpenAPI Generator, OpenTelemetry, Kubernetes/Helm),目标是迭代式拆分为订单/库存微服务、在促销高峰抑制超卖与死锁、读写分离并保证3秒内一致性收敛与P95<300ms。
一、总体架构与迁移路径
二、目录与包结构(多模块)
三、领域模型与命令事件(示例骨架)
示例代码(节选): public enum OrderStatus { PENDING, AWAITING_STOCK, STOCK_RESERVED, PAYMENT_AUTHORIZED, CONFIRMED, CANCELED, FAILED }
@Entity @Table(name = "orders") public class OrderAggregate { @Id private Long id; private Long userId; private BigDecimal total; @Enumerated(EnumType.STRING) private OrderStatus status; @Version private Long version;
@OneToMany(mappedBy = "order", cascade = CascadeType.ALL, orphanRemoval = true, fetch = FetchType.EAGER)
private List
// domain behaviors
public List
public List
public List
public List
public List
public List
@Embeddable public class OrderItem { private Long skuId; private Integer qty; private BigDecimal price; }
/**
 * Common contract for events emitted by an aggregate.
 *
 * <p>The members were previously collapsed onto one line, so the first
 * {@code //} comment swallowed every later method and the closing brace.
 */
public interface DomainEvent {

    /** Unique identifier of this event (a UUID string). */
    String eventId();

    /** Name of the emitting service, e.g. {@code "order-service"}. */
    String source();

    /** Event type name, e.g. {@code "OrderCreated"}. */
    String type();

    /** Instant at which the event occurred. */
    Instant occurredAt();

    /** Key used for idempotent handling of this event. */
    String idempotencyKey();
}
public record OrderCreated(Long orderId, Long userId, List
public record StockReservationRequested(Long orderId, List
@Transactional public OrderId handle(PlaceOrderCommand cmd) { if (!outbox.tryStartIdempotent("PlaceOrder", cmd.idempotencyKey())) { return new OrderId(outbox.lookupEntityIdByKey(cmd.idempotencyKey())); } var order = new OrderAggregate(); var events = order.placeOrder(cmd); orderRepo.save(order); outbox.appendAll(events, "orders", order.getId().toString()); // topic, key return new OrderId(order.getId()); }
@Transactional public void handleStockReserved(Long orderId, String reservationId, String eventId) { if (!outbox.tryStartIdempotent("StockReserved", eventId)) return; var order = orderRepo.getByIdForUpdate(orderId); // or findById + optimistic lock var events = order.markStockReserved(reservationId); orderRepo.save(order); events.forEach(e -> outbox.append(e, "payments", orderId.toString())); } }
@Component public class OrderSagaOrchestrator { private final OutboxAppender outbox;
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) public void onOrderCreated(OrderCreated e) { outbox.append(new StockReservationRequested(e.orderId(), e.items(), e.idempotencyKey()), "inventory", e.orderId().toString()); }
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) public void onStockReserved(StockReserved e) { outbox.append(new PaymentAuthorizationRequested(e.orderId(), /amount/ null), "payment", e.orderId().toString()); }
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) public void onPaymentAuthorized(PaymentAuthorized e) { outbox.append(new OrderConfirmed(e.orderId()), "orders", e.orderId().toString()); } }
四、Inventory聚合与并发控制
public List
public List
@Transactional public void handle(ReserveStockCommand cmd) { if (resRepo.existsByIdempotencyKey(cmd.idempotencyKey())) return; // idempotent var inv = repo.findById(cmd.getSkuId()).orElseThrow(); var events = inv.reserve(cmd.getOrderId(), cmd.getQty(), cmd.getIdempotencyKey()); repo.save(inv); // optimistic lock; retry on OptimisticLockException outbox.appendAll(events, "orders", cmd.getOrderId().toString()); } }
五、Outbox表与Debezium
Flyway示例: -- V1__orders_and_items.sql 现有表增加必要索引/约束 ALTER TABLE orders ADD COLUMN version BIGINT NOT NULL DEFAULT 0; CREATE INDEX idx_orders_user ON orders(user_id); ALTER TABLE order_items ADD PRIMARY KEY(order_id, sku_id);
-- V2__outbox.sql CREATE TABLE outbox_event ( id BIGINT PRIMARY KEY AUTO_INCREMENT, aggregate_id VARCHAR(64) NOT NULL, aggregate_type VARCHAR(64) NOT NULL, type VARCHAR(128) NOT NULL, payload JSON NOT NULL, headers JSON NULL, occurred_at TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), idempotency_key VARCHAR(128) NOT NULL, partition_key VARCHAR(128) NOT NULL, UNIQUE KEY uk_idem (aggregate_type, idempotency_key) );
-- V3__inventory_reservations.sql(推荐) CREATE TABLE inventory_reservations ( order_id BIGINT NOT NULL, sku_id BIGINT NOT NULL, qty INT NOT NULL, status VARCHAR(32) NOT NULL, version BIGINT NOT NULL DEFAULT 0, idempotency_key VARCHAR(128) NOT NULL, PRIMARY KEY(order_id, sku_id), UNIQUE KEY uk_res_idem (idempotency_key) );
六、Kafka主题与消费者
主题划分
分区与Key
消费者(有序、可重放、幂等) @Component public class InventoryEventConsumer { private final InventoryCommandHandler handler; private final ProcessedEventStore processed; // 去重
@KafkaListener(topics = "inventory.stock-reserved", containerFactory = "kafkaContainerFactory") public void onStockReserved(ConsumerRecord<String, String> rec) { var event = parse(rec.value(), StockReserved.class); if (processed.alreadyProcessed(event.eventId())) return; try { handler.handleOrderSideEffect(event); // e.g., markStockReserved command processed.markProcessed(event.eventId()); } catch (TransientException ex) { throw ex; // 让Kafka重试 } } }
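去重存储可用 Redis(每个 event_id 一个键,例如 `SET processed:{eventId} 1 NX EX <ttl>`;不建议用单个 Set,因为 Set 的成员无法单独设置过期时间)或 MySQL 表 processed_event(event_id PK, processed_at),去重记录保留 7–14 天后过期或定期清理。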
消费者并发数设为分区数;每个分区只由一个线程消费,以保证分区内有序。在 Spring Kafka 中可通过监听容器工厂的 concurrency(`ConcurrentKafkaListenerContainerFactory#setConcurrency`)设置为分区数。
七、读模型(MyBatis-Plus + Redis)
物化视图表 -- V4__read_model.sql CREATE TABLE order_view ( id BIGINT PRIMARY KEY, user_id BIGINT NOT NULL, status VARCHAR(32) NOT NULL, total DECIMAL(18,2) NOT NULL, created_at DATETIME NOT NULL, updated_at DATETIME NOT NULL ); CREATE TABLE order_item_view ( order_id BIGINT NOT NULL, sku_id BIGINT NOT NULL, qty INT NOT NULL, price DECIMAL(18,2) NOT NULL, PRIMARY KEY(order_id, sku_id) );
读侧消费者
@Service public class OrderProjectionHandler { @Autowired private OrderViewMapper viewMapper; @Autowired private RedisTemplate<String, String> redis;
@Transactional public void apply(OrderCreated e) { viewMapper.insert(new OrderViewDO(e.orderId(), e.userId(), "AWAITING_STOCK", ...)); cachePut(e.orderId()); }
/**
 * Returns the order read model, preferring the Redis copy stored under
 * {@code "order:" + id}. On a cache miss the row is loaded once, cached for
 * ten minutes and returned.
 *
 * @param id order id
 * @return the order DTO, or {@code null} when no view row exists
 */
public OrderDTO getById(Long id) {
    var key = "order:" + id;
    var cached = redis.opsForValue().get(key);
    if (cached != null) {
        return deserialize(cached);
    }
    var view = viewMapper.selectById(id);
    if (view == null) {
        return null;
    }
    // Cache the row already loaded instead of calling cachePut, which would
    // select the same row a second time.
    var dto = map(view);
    redis.opsForValue().set(key, serialize(dto), Duration.ofMinutes(10));
    return dto;
}

/** Reloads the view row for {@code id} and writes it to Redis with a ten-minute TTL. */
private void cachePut(Long id) {
    var view = viewMapper.selectById(id);
    if (view == null) {
        // No row to cache; skip rather than pass null to map/serialize.
        return;
    }
    redis.opsForValue().set("order:" + id, serialize(map(view)), Duration.ofMinutes(10));
}
}
八、接口契约与代码生成
九、接口层与幂等拦截器
十、性能与并发要点
十一、测试与可复现性
十二、可观测性与运维
十三、控制器示例与客户端调用 @RestController @RequestMapping("/orders") @RequiredArgsConstructor public class OrderController { private final OrderCommandHandler handler;
@PostMapping
public ResponseEntity
@GetMapping("/{id}")
public ResponseEntity
十四、读侧与缓存的一致性策略
十五、与现有API的过渡
十六、避免超卖与关键细节
总结价值
下面给出一个可在两周内落地的“模块化单体 + Clean Architecture”骨架方案,面向你的技术栈与现状问题,包含目录布局、边界与依赖约束、典型用例、端口/适配器、事务与幂等、测试与观测。目标是让控制器“瘦身”、打掉跨模块 import、让用例可单测、外部支付可替身测试,并为后续独立部署(演进为多个进程或服务)保留路径。
一、架构映射与依赖规则
二、Nx 工作区与目录布局(模块化单体) 建议 Nx libs 以 scope 与 layer 打标签,apps 只做装配/部署:
Nx tags(示例):
三、Prisma 模型(现有表 + outbox + idempotency) prisma/schema.prisma 关键片段:
model users {
id String @id @default(uuid())
email String @unique
// ...
}
model plans {
id String @id @default(uuid())
price Int
currency String
}
model subscriptions {
id String @id @default(uuid())
user_id String
plan_id String
status String // 'active' | 'canceled' | 'pending'
renew_at DateTime?
@@index([user_id])
@@index([plan_id])
}
model invoices {
id String @id @default(uuid())
subscription_id String
amount Int
status String // 'pending' | 'processing' | 'paid' | 'failed' | 'canceled'
currency String
created_at DateTime @default(now())
updated_at DateTime @updatedAt
@@index([subscription_id])
}
model IdempotencyKey {
id String @id @default(uuid())
scope String // e.g. 'CreateSubscription','ChargeInvoice'
key String
requestHash String?
status String // 'started' | 'completed' | 'expired'
response Json?
created_at DateTime @default(now())
updated_at DateTime @updatedAt
@@unique([scope, key])
}
// Transactional outbox row: written together with the business change,
// published later by a background worker.
model OutboxEvent {
  id           String   @id @default(uuid())
  event_type   String
  aggregateId  String?
  payload      Json
  status       String   @default("pending") // 'pending' | 'processing' | 'done' | 'failed'
  created_at   DateTime @default(now())
  available_at DateTime @default(now())
  retry_count  Int      @default(0)

  // Supports selecting due rows by status and availability time.
  // NOTE(review): assumes the worker polls "status = pending AND available_at <= now";
  // confirm against the actual query.
  @@index([status, available_at])
}
四、端口接口(Ports) billing/application/src/ports/payment.port.ts
export type PaymentIntent = { id: string; status: 'requires_action' | 'succeeded' | 'failed'; clientSecret?: string };
export interface PaymentPort {
createPaymentIntent(params: {
invoiceId: string;
amount: number;
currency: string;
metadata?: Record<string, string>;
idempotencyKey?: string;
}): Promise<PaymentIntent>;
// 用于回调确认场景可选扩展
retrievePaymentIntent(id: string): Promise<PaymentIntent>;
}
subscriptions/application/src/ports/email.port.ts
export interface EmailPort {
send(to: string, template: string, data: Record<string, unknown>): Promise<void>;
}
仓储端口(示例)
export interface SubscriptionRepoPort {
findById(id: string): Promise<Subscription | null>;
findActiveByUserAndPlan(userId: string, planId: string): Promise<Subscription | null>;
create(s: Subscription, tx?: PrismaTransaction): Promise<void>;
cancel(id: string, at: Date, tx?: PrismaTransaction): Promise<void>;
}
export interface InvoiceRepoPort {
findById(id: string): Promise<Invoice | null>;
create(i: Invoice, tx?: PrismaTransaction): Promise<void>;
markProcessing(id: string, tx?: PrismaTransaction): Promise<void>;
markPaid(id: string, tx?: PrismaTransaction): Promise<void>;
markFailed(id: string, reason: string, tx?: PrismaTransaction): Promise<void>;
}
五、领域实体与事件(轻量) subscriptions/domain/src/subscription.entity.ts
export class Subscription {
constructor(
public readonly id: string,
public readonly userId: string,
public readonly planId: string,
public status: 'active' | 'canceled' | 'pending',
public renewAt?: Date | null,
) {}
}
subscriptions/domain/src/events/subscription.events.ts
export type SubscriptionCreated = {
type: 'SubscriptionCreated';
subscriptionId: string;
userId: string;
planId: string;
};
export type SubscriptionCanceled = {
type: 'SubscriptionCanceled';
subscriptionId: string;
userId: string;
at: string;
};
六、用例(Application):CreateSubscription、ChargeInvoice、CancelSubscription DTO 输入输出(class-validator + Swagger 注解,控制器层);应用层内部用 Zod 二次校验(可选)。
subscriptions/application/src/usecases/create-subscription.usecase.ts
import { z } from 'zod';
import { UnitOfWork } from '@app/platform/prisma';
import { DomainEvents } from '@app/platform/messaging';
import { IdempotencyService } from '@app/platform/prisma';
import { SubscriptionRepoPort } from '../ports/subscription.repo.port';
const InputSchema = z.object({
userId: z.string().uuid(),
planId: z.string().uuid(),
idempotencyKey: z.string().optional(),
});
export type CreateSubscriptionInput = z.infer<typeof InputSchema>;
export type CreateSubscriptionOutput = { subscriptionId: string; status: 'active' | 'pending' };
/**
 * Creates a subscription for a user/plan pair and emits `SubscriptionCreated`
 * in the same transaction. When a non-canceled subscription already exists for
 * the pair, it is returned instead of creating a duplicate.
 */
export class CreateSubscriptionUseCase {
  constructor(
    private readonly uow: UnitOfWork,
    private readonly subsRepo: SubscriptionRepoPort,
    private readonly ide: IdempotencyService,
    private readonly events: DomainEvents,
  ) {}

  /**
   * @param input validated against `InputSchema` (throws on invalid input)
   * @returns the id and status of the created or already-existing subscription
   */
  async execute(input: CreateSubscriptionInput): Promise<CreateSubscriptionOutput> {
    const { userId, planId, idempotencyKey } = InputSchema.parse(input);
    return this.ide.withIdempotency<CreateSubscriptionOutput>('CreateSubscription', idempotencyKey, async () => {
      return this.uow.runInTransaction(async (tx): Promise<CreateSubscriptionOutput> => {
        // Avoid duplicate subscriptions. The explicit 'canceled' guard narrows
        // `existing.status` to the statuses the output type allows; a canceled
        // record falls through and a fresh subscription is created.
        const existing = await this.subsRepo.findActiveByUserAndPlan(userId, planId);
        if (existing && existing.status !== 'canceled') {
          return { subscriptionId: existing.id, status: existing.status };
        }
        const subId = crypto.randomUUID();
        const sub = new Subscription(subId, userId, planId, 'active', computeRenewAt());
        await this.subsRepo.create(sub, tx);
        await this.events.emitInTx(
          { type: 'SubscriptionCreated', subscriptionId: subId, userId, planId },
          tx,
        );
        return { subscriptionId: subId, status: 'active' };
      });
    });
  }
}
/**
 * Returns the renewal date one calendar month after `from`.
 *
 * The day of month is clamped to the last day of the target month, so
 * Jan 31 renews on Feb 28/29. A plain `setMonth(+1)` rolls over into March.
 *
 * @param from start date; defaults to the current time
 * @returns a new Date; `from` is not mutated
 */
function computeRenewAt(from: Date = new Date()): Date {
  const d = new Date(from.getTime());
  const day = d.getDate();
  // Move to the 1st before changing the month so the month step cannot overflow.
  d.setDate(1);
  d.setMonth(d.getMonth() + 1);
  // Day 0 of the following month is the last day of the target month.
  const lastDay = new Date(d.getFullYear(), d.getMonth() + 1, 0).getDate();
  d.setDate(Math.min(day, lastDay));
  return d;
}
billing/application/src/usecases/charge-invoice.usecase.ts
const InputSchema = z.object({
invoiceId: z.string().uuid(),
idempotencyKey: z.string().optional(),
});
/**
 * Starts payment for an invoice.
 *
 * The invoice is marked as processing in a short database transaction, and the
 * payment provider is called only after that transaction has finished. Calling
 * the provider inside the transaction keeps it open for the whole HTTP round trip.
 */
export class ChargeInvoiceUseCase {
  constructor(
    private readonly uow: UnitOfWork,
    private readonly invoices: InvoiceRepoPort,
    private readonly payments: PaymentPort,
    private readonly ide: IdempotencyService,
  ) {}

  /**
   * @param input invoice id plus optional idempotency key (validated with `InputSchema`)
   * @returns `{ status: 'paid' }` when already paid, otherwise the processing
   *          status with the payment intent id and client secret
   * @throws UseCaseError with code `NOT_FOUND` when the invoice does not exist
   */
  async execute(input: { invoiceId: string; idempotencyKey?: string }) {
    const { invoiceId, idempotencyKey } = InputSchema.parse(input);
    return this.ide.withIdempotency('ChargeInvoice', idempotencyKey, async () => {
      // Step 1: claim the invoice in a short transaction.
      const claimed = await this.uow.runInTransaction(async (tx) => {
        const inv = await this.invoices.findById(invoiceId);
        if (!inv) throw new UseCaseError('NOT_FOUND', 'Invoice not found');
        if (inv.status === 'paid') return null;
        await this.invoices.markProcessing(inv.id, tx);
        return inv;
      });
      if (!claimed) return { status: 'paid' };

      // Step 2: call the provider outside any database transaction.
      try {
        const intent = await this.payments.createPaymentIntent({
          invoiceId: claimed.id,
          amount: claimed.amount,
          currency: claimed.currency,
          idempotencyKey,
        });
        // The frontend may need the clientSecret.
        return { status: 'processing', paymentIntentId: intent.id, clientSecret: intent.clientSecret };
      } catch (err) {
        // Compensation: the 'processing' mark is already committed, so record the
        // failure instead of leaving the invoice stuck. Best effort, so that the
        // provider error below is the one the caller sees.
        await this.invoices.markFailed(claimed.id, 'payment_intent_creation_failed').catch(() => undefined);
        throw err;
      }
    });
  }
}
billing/application/src/usecases/confirm-invoice-payment.usecase.ts(供支付回调调用)
/**
 * Applies a payment outcome reported by the payment provider callback.
 *
 * An invoice that is already paid is left as is, so a duplicated or late
 * callback cannot move it back to failed.
 */
export class ConfirmInvoicePaymentUseCase {
  constructor(private readonly uow: UnitOfWork, private readonly invoices: InvoiceRepoPort) {}

  /**
   * @param input invoice id, outcome flag and optional failure reason
   * @returns the resulting invoice status (`'paid'` or `'failed'`)
   * @throws UseCaseError with code `NOT_FOUND` when the invoice does not exist
   */
  async execute(input: { invoiceId: string; success: boolean; reason?: string }) {
    return this.uow.runInTransaction(async (tx) => {
      const inv = await this.invoices.findById(input.invoiceId);
      if (!inv) throw new UseCaseError('NOT_FOUND', 'Invoice not found');
      // Paid is terminal: ignore replays and out-of-order failure callbacks.
      if (inv.status === 'paid') return { status: 'paid' };

      if (input.success) {
        await this.invoices.markPaid(inv.id, tx);
        return { status: 'paid' };
      }
      await this.invoices.markFailed(inv.id, input.reason ?? 'failed', tx);
      return { status: 'failed' };
    });
  }
}
subscriptions/application/src/usecases/cancel-subscription.usecase.ts
/**
 * Cancels a subscription and emits `SubscriptionCanceled` in the same transaction.
 * Cancelling an already-canceled subscription does nothing and emits no event.
 */
export class CancelSubscriptionUseCase {
  constructor(
    private readonly uow: UnitOfWork,
    private readonly subsRepo: SubscriptionRepoPort,
    private readonly events: DomainEvents,
  ) {}

  /**
   * @param input subscription id and optional cancellation time (defaults to now)
   * @returns the subscription id with status `'canceled'`
   * @throws UseCaseError with code `NOT_FOUND` when the subscription does not exist
   */
  async execute(input: { subscriptionId: string; at?: Date }) {
    const at = input.at ?? new Date();
    return this.uow.runInTransaction(async (tx) => {
      const sub = await this.subsRepo.findById(input.subscriptionId);
      if (!sub) throw new UseCaseError('NOT_FOUND', 'Subscription not found');
      // Idempotent: do not cancel twice or emit a duplicate event.
      if (sub.status === 'canceled') {
        return { subscriptionId: sub.id, status: 'canceled' };
      }
      await this.subsRepo.cancel(sub.id, at, tx);
      await this.events.emitInTx(
        { type: 'SubscriptionCanceled', subscriptionId: sub.id, userId: sub.userId, at: at.toISOString() },
        tx,
      );
      return { subscriptionId: sub.id, status: 'canceled' };
    });
  }
}
七、事件与 outbox(轻量领域事件 + BullMQ) platform/messaging/src/domain-events.ts
export type DomainEvent = { type: string; [k: string]: any };
export interface DomainEvents {
emitInTx(event: DomainEvent, tx: PrismaTransaction): Promise<void>;
emit(event: DomainEvent): Promise<void>;
register(type: string, handler: (e: DomainEvent) => Promise<void>): void;
}
简单实现策略:
billing/adapter-handlers/src/subscription-created.handler.ts
events.register('SubscriptionCreated', async (evt) => {
await uow.runInTransaction(async (tx) => {
const invoice = Invoice.createForNewSubscription(evt.subscriptionId, /* amount/currency from plan cache */);
await invoices.create(invoice, tx);
});
});
八、适配器:REST 控制器与 HTTP/内存支付 subscriptions/adapter-rest/src/subscriptions.controller.ts
@Controller('subscriptions')
export class SubscriptionsController {
constructor(private readonly createSub: CreateSubscriptionUseCase) {}
@Post()
async create(@Body() dto: { userId: string; planId: string }, @Headers('Idempotency-Key') idem?: string) {
const res = await this.createSub.execute({ ...dto, idempotencyKey: idem });
return res;
}
}
billing/adapter-rest/src/invoices.controller.ts
@Controller('invoices')
export class InvoicesController {
constructor(private readonly chargeInvoice: ChargeInvoiceUseCase) {}
@Post(':id/pay')
async pay(@Param('id') id: string, @Headers('Idempotency-Key') idem?: string) {
return this.chargeInvoice.execute({ invoiceId: id, idempotencyKey: idem });
}
}
billing/adapter-rest/src/payments.webhook.controller.ts
// Receives payment result callbacks and forwards them to ConfirmInvoicePaymentUseCase.
// NOTE(review): the body is used as received; no signature or origin check is visible
// in this controller. Confirm that callbacks are authenticated somewhere before this runs.
@Controller('webhooks/payments')
export class PaymentsWebhookController {
constructor(private readonly confirm: ConfirmInvoicePaymentUseCase) {}
// Maps the reported status to a success flag and always acknowledges with { received: true }.
@Post()
async handle(@Body() body: { invoiceId: string; status: 'succeeded' | 'failed'; reason?: string }) {
// Only 'succeeded' counts as success.
const ok = body.status === 'succeeded';
await this.confirm.execute({ invoiceId: body.invoiceId, success: ok, reason: body.reason });
return { received: true };
}
}
billing/infra-payment-http/src/payment.http.adapter.ts
/**
 * PaymentPort implementation that talks to the payment provider over HTTP.
 *
 * NOTE(review): assumes `HttpService` is the NestJS (`@nestjs/axios`) service, whose
 * `post`/`get` return Observables, so `.then(...)` on them does not exist. Calls go
 * through `axiosRef`, the underlying promise-based Axios instance. Confirm if a
 * custom HttpService is in use.
 */
export class HttpPaymentAdapter implements PaymentPort {
  constructor(private readonly http: HttpService) {}

  /**
   * Creates a payment intent for an invoice.
   * The `Idempotency-Key` header is sent only when a key is provided.
   */
  async createPaymentIntent(params: {
    invoiceId: string;
    amount: number;
    currency: string;
    metadata?: Record<string, string>;
    idempotencyKey?: string;
  }) {
    const headers: Record<string, string> = {};
    if (params.idempotencyKey) {
      headers['Idempotency-Key'] = params.idempotencyKey;
    }
    const { data } = await this.http.axiosRef.post('/payment_intents', params, {
      headers,
      timeout: 1500,
    });
    return { id: data.id, status: data.status, clientSecret: data.client_secret };
  }

  /** Fetches a payment intent by id. */
  async retrievePaymentIntent(id: string) {
    // Same timeout as the create call; the id is encoded because it becomes a path segment.
    const { data } = await this.http.axiosRef.get(`/payment_intents/${encodeURIComponent(id)}`, {
      timeout: 1500,
    });
    return { id: data.id, status: data.status, clientSecret: data.client_secret };
  }
}
billing/infra-payment-inmemory/src/payment.inmemory.adapter.ts(测试替身)
/**
 * In-memory PaymentPort test double. Every intent succeeds immediately.
 *
 * Repeating a create call with the same idempotency key returns the intent
 * created the first time, so tests can exercise retry paths.
 */
export class InMemoryPaymentAdapter implements PaymentPort {
  private store = new Map<string, { id: string; status: 'succeeded'; clientSecret: string }>();
  // idempotency key -> intent id
  private byIdempotencyKey = new Map<string, string>();

  /** Creates (or replays) a succeeded payment intent. */
  async createPaymentIntent(p: {
    invoiceId: string;
    amount: number;
    currency: string;
    metadata?: Record<string, string>;
    idempotencyKey?: string;
  }) {
    if (p.idempotencyKey) {
      const previousId = this.byIdempotencyKey.get(p.idempotencyKey);
      const previous = previousId ? this.store.get(previousId) : undefined;
      if (previous) return previous;
    }
    const id = 'pi_' + crypto.randomUUID();
    const intent = { id, status: 'succeeded' as const, clientSecret: 'secret_' + id };
    this.store.set(id, intent);
    if (p.idempotencyKey) this.byIdempotencyKey.set(p.idempotencyKey, id);
    return intent;
  }

  /**
   * Returns a previously created intent.
   * @throws Error when no intent with this id was created by this adapter
   */
  async retrievePaymentIntent(id: string) {
    const intent = this.store.get(id);
    if (!intent) {
      // The port promises a PaymentIntent; fail loudly instead of resolving to undefined.
      throw new Error(`PaymentIntent not found: ${id}`);
    }
    return intent;
  }
}
九、Prisma/事务脚本与 UoW、幂等 platform/prisma/src/unit-of-work.ts
/**
 * Thin unit-of-work wrapper around Prisma interactive transactions.
 */
@Injectable()
export class UnitOfWork {
  constructor(private readonly prisma: PrismaService) {}

  /**
   * Runs `fn` inside a Prisma transaction (5 s timeout, Serializable isolation)
   * and resolves with whatever `fn` resolves with.
   *
   * @param fn work to execute; receives the transaction client
   */
  async runInTransaction<T>(fn: (tx: Prisma.TransactionClient) => Promise<T>): Promise<T> {
    const outcome = await this.prisma.$transaction(
      async (client) => fn(client),
      { timeout: 5000, isolationLevel: 'Serializable' },
    );
    return outcome;
  }
}
platform/prisma/src/idempotency.service.ts
/**
 * Runs an operation at most once per (scope, key) and replays the stored
 * response for repeated requests.
 *
 * The key is claimed with an INSERT, so the unique (scope, key) constraint
 * decides which caller owns it. An upsert cannot tell "I created the row" from
 * "the row already existed in 'started' state", which lets two concurrent
 * requests with the same key both execute the operation.
 */
@Injectable()
export class IdempotencyService {
  // A 'started' row older than this is treated as abandoned (e.g. the owning
  // process crashed) and may be taken over by a retry.
  private static readonly STALE_STARTED_MS = 60_000;

  constructor(private readonly prisma: PrismaService) {}

  /**
   * @param scope logical operation name, e.g. 'CreateSubscription'
   * @param key   client-supplied idempotency key; when absent `fn` simply runs
   * @param fn    the operation to protect
   * @returns the result of `fn`, or the stored response of a completed earlier run
   * @throws Error when another request with the same key is still in progress
   */
  async withIdempotency<T>(scope: string, key: string | undefined, fn: () => Promise<T>): Promise<T> {
    if (!key) return fn();
    const where = { scope_key: { scope, key } };

    try {
      await this.prisma.idempotencyKey.create({ data: { scope, key, status: 'started' } });
    } catch (err) {
      // P2002 is Prisma's unique-constraint violation: the key already exists.
      if ((err as { code?: string } | null)?.code !== 'P2002') throw err;

      const existing = await this.prisma.idempotencyKey.findUnique({ where });
      if (!existing) {
        // The owner failed and released the key between our insert and read; claim again.
        return this.withIdempotency(scope, key, fn);
      }
      if (existing.status === 'completed') {
        return existing.response as T;
      }

      const ageMs = Date.now() - existing.updated_at.getTime();
      const reclaimable = existing.status === 'expired' || ageMs > IdempotencyService.STALE_STARTED_MS;
      // Compare-and-swap on updated_at so only one retry can take over an abandoned row.
      const takeover = reclaimable
        ? await this.prisma.idempotencyKey.updateMany({
            where: { scope, key, status: existing.status, updated_at: existing.updated_at },
            data: { status: 'started' },
          })
        : { count: 0 };
      if (takeover.count !== 1) {
        throw new Error(`Idempotent request already in progress: ${scope}/${key}`);
      }
    }

    let result: T;
    try {
      result = await fn();
    } catch (err) {
      // Release the key so the client can retry. Best effort, so that the
      // operation's own error is the one that propagates.
      await this.prisma.idempotencyKey.delete({ where }).catch(() => undefined);
      throw err;
    }

    await this.prisma.idempotencyKey.update({
      where,
      data: { status: 'completed', response: result as any },
    });
    return result;
  }
}
十、中间件与异常映射 platform/middleware/src/auth.middleware.ts
@Injectable()
export class AuthMiddleware implements NestMiddleware {
use(req: any, res: any, next: () => void) {
// 简化:从 Header 注入 userId,真实场景解析 JWT
req.context = req.context || {};
req.context.userId = req.headers['x-user-id'] ?? null;
next();
}
}
platform/middleware/src/tenant.middleware.ts
/**
 * Copies the tenant id from the `x-tenant-id` header into `req.context`,
 * falling back to 'default' when the header is absent.
 *
 * Marked @Injectable() like AuthMiddleware so both middlewares are declared
 * the same way.
 */
@Injectable()
export class TenantMiddleware implements NestMiddleware {
  use(req: any, res: any, next: () => void) {
    req.context = req.context || {};
    // NOTE(review): the tenant id is taken from a client-controlled header as is.
    req.context.tenantId = req.headers['x-tenant-id'] ?? 'default';
    next();
  }
}
platform/middleware/src/audit.interceptor.ts
/**
 * Logs path, method, user id and elapsed time for every handled request,
 * including requests whose handler fails.
 */
@Injectable()
export class AuditInterceptor implements NestInterceptor {
  intercept(ctx: ExecutionContext, next: CallHandler<any>): Observable<any> {
    const req = ctx.switchToHttp().getRequest();
    const start = Date.now();
    const record = (outcome: 'ok' | 'error') =>
      logger.info({
        path: req.url,
        method: req.method,
        userId: req.context?.userId,
        ms: Date.now() - start,
        outcome,
      });
    // A bare tap(fn) only observes `next`, so failed requests were never logged.
    // The observer form also sees errors and leaves them propagating unchanged.
    return next.handle().pipe(
      tap({
        next: () => record('ok'),
        error: () => record('error'),
      }),
    );
  }
}
platform/middleware/src/http-exception.filter.ts
/**
 * Global exception filter.
 *
 * - UseCaseError: mapped through `mapUseCaseCodeToStatus`.
 * - Framework HTTP exceptions (anything exposing getStatus/getResponse, such as
 *   404 for unknown routes or 400 from validation pipes): passed through with
 *   their own status instead of being flattened to 500.
 * - Anything else: generic 500 without leaking internals.
 */
@Catch()
export class HttpErrorMappingFilter implements ExceptionFilter {
  catch(err: any, host: ArgumentsHost) {
    const ctx = host.switchToHttp();
    const res = ctx.getResponse<Response>();

    if (err instanceof UseCaseError) {
      const status = mapUseCaseCodeToStatus(err.code);
      return res.status(status).json({ code: err.code, message: err.message });
    }

    // Duck-typed check so no extra import is needed; matches NestJS HttpException.
    if (typeof err?.getStatus === 'function' && typeof err?.getResponse === 'function') {
      const body = err.getResponse();
      const payload = typeof body === 'string' ? { code: 'HTTP_ERROR', message: body } : body;
      return res.status(err.getStatus()).json(payload);
    }

    // ZodError/class-validator...
    return res.status(500).json({ code: 'INTERNAL_ERROR', message: 'Unexpected error' });
  }
}
十一、依赖边界校验 .eslintrc.json(片段)
{
"overrides": [
{
"files": ["*.ts"],
"rules": {
"@nx/enforce-module-boundaries": [
"error",
{
"enforceBuildableLibDependency": true,
"allow": [],
"depConstraints": [
{ "sourceTag": "layer:framework", "onlyDependOnLibsWithTags": ["layer:adapter","layer:framework"] },
{ "sourceTag": "layer:adapter", "onlyDependOnLibsWithTags": ["layer:application","layer:domain","layer:adapter","scope:platform","scope:shared"] },
{ "sourceTag": "layer:application","onlyDependOnLibsWithTags": ["layer:domain","scope:shared"] },
{ "sourceTag": "layer:domain", "onlyDependOnLibsWithTags": ["scope:shared"] }
]
}
]
}
}
]
}
dependency-cruiser(.dependency-cruiser.js)(片段)
module.exports = {
forbidden: [
{ name: 'no-circular', severity: 'error', from: {}, to: { circular: true } },
{ name: 'no-layer-violation', severity: 'error',
from: { path: 'libs/.+', pathNot: '\\.spec\\.ts$',
pathNot: 'libs/clients/.+' },
to: { path: 'libs/.+', pathRegexNot: (from) => {
// 自定义:禁止从内层到外层、禁止跨 scope 直接 adapter->adapter
} }
},
],
};
建议用 tags 维持:项目创建 libs 时写入 project.json 的 tags,例如 ["scope:subscriptions","layer:application"]。
十二、OpenAPI 与客户端生成
十三、测试策略(>70% 覆盖)
十四、性能与可运维
十五、控制器与查询用例(示例) users/adapter-rest/src/users.controller.ts
@Controller('users')
export class UsersController {
constructor(private readonly queryBus: UsersQueryService) {}
@Get(':id/subscriptions')
async listSubscriptions(@Param('id') userId: string) {
return this.queryBus.listUserSubscriptions(userId);
}
}
UsersQueryService 可以位于 users/application(读模型),内部可直接用 prisma 只读查询或通过订阅仓储暴露的查询接口。查询属于 adapter/application 均可,但不要从 users 反向依赖 subscriptions。
十六、装配(AppModule) apps/api/src/app.module.ts(绑定端口到适配器实现)
@Module({
imports: [
// Adapter modules only. Application/Domain are libs providers.
],
providers: [
PrismaService, UnitOfWork, IdempotencyService, DomainEventsImpl, OutboxService,
// 配置:PaymentPort -> HttpPaymentAdapter 或 InMemoryPaymentAdapter
{ provide: PaymentPortToken, useClass: HttpPaymentAdapter },
// EmailPort -> SMTP/Mock
],
controllers: [
SubscriptionsController, InvoicesController, PaymentsWebhookController, UsersController
]
})
export class AppModule {}
十七、迁移与数据对齐
十八、两周落地计划(可运行骨架)
十九、价值回顾(针对你的痛点)
二十、典型请求流(概念示例)
如需,我可以进一步提供:
这套骨架能在两周内产出“可运行、可测、可扩展”的模块化单体,并清晰沉淀用户、账单、订阅边界,为后续独立部署与微服务演进做好准备。
下面给出一套可端到端运行的概念性实现,体现“零信任与多租户隔离优先”的架构原则,并与您的技术栈一致。内容包含:租户解析中间件、自动注入租户过滤的SQLAlchemy查询基类/Hook、PostgreSQL RLS策略迁移脚本、OPA策略示例与客户端封装、审计日志拦截器、Dashboard/Report API与跨租户只读管理端点、测试用例,以及Docker Compose用于一键启动。该方案分层落实以下要点:
目录结构(示例)
一、Docker Compose(Postgres + OPA + 应用) 文件:docker-compose.yml 内容:
二、OPA策略(ABAC,默认拒绝,策略热更新) 文件:opa/policy.rego 说明:
示例: package app
default allow := false
# Platform-wide read access: a subject holding the "platform:read" scope may
# issue GET requests under /analytics/admin/.
# Statements are one per line; collapsed onto a single line without separators
# the rule does not parse.
allow {
    some s
    s := input.subject.scopes[_]
    s == "platform:read"
    startswith(input.resource.path, "/analytics/admin/")
    input.resource.method == "GET"
}
# Same-tenant access: the resource must belong to the caller's tenant and pass
# the label, visibility and action checks.
# Statements are one per line; collapsed onto a single line without separators
# the rule does not parse.
allow {
    input.resource.tenant_id == input.subject.tenant_id
    allow_by_labels
    allow_by_visibility
    action_allowed
}
# Label check.
# No labels configured (attribute missing or an empty list) means no restriction.
# The empty-list case matters when the caller normalises a missing claim to [].
allow_by_labels {
    count(object.get(input.subject, "allowed_labels", [])) == 0
}

# Otherwise at least one allowed label must appear on the resource.
allow_by_labels {
    some l
    l := input.subject.allowed_labels[_]
    l == input.resource.labels[_]
}
# Visibility check.
# Reports are readable when their visibility is "tenant" or "public".
# (Rego has no `or` keyword; membership in a set expresses the alternative.)
allow_by_visibility {
    input.resource.type == "report"
    visible := {"tenant", "public"}
    visible[input.resource.visibility]
}

# Dashboards are allowed within the same tenant by default.
allow_by_visibility {
    input.resource.type == "dashboard"
}
# Action check.
# Read-style actions are open to every subject that passed the other checks.
# (Rego has no `or` keyword; membership in a set expresses the alternative.)
action_allowed {
    read_actions := {"read", "list"}
    read_actions[input.action]
}

# Write-style actions require the tenant_admin role.
action_allowed {
    write_actions := {"create", "update", "delete"}
    write_actions[input.action]
    some r
    r := input.subject.roles[_]
    r == "tenant_admin"
}
label_filter := lf { lf := {"enabled": true, "labels": input.subject.allowed_labels} }
三、应用配置与安全 文件:app/settings.py import os
class Settings: APP_NAME = "saas-analytics" JWT_ALG = os.getenv("JWT_ALG", "HS256") JWT_SECRET = os.getenv("JWT_SECRET", "dev-secret") # 演示用,生产请用JWKS JWT_AUDIENCE = os.getenv("JWT_AUD", "saas-analytics") OPA_URL = os.getenv("OPA_URL", "http://opa:8181/v1/data/app/allow") OPA_LABELS_URL = os.getenv("OPA_LABELS_URL", "http://opa:8181/v1/data/app/label_filter") DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+psycopg2://postgres:postgres@db:5432/saas") OTEL_SERVICE_NAME = os.getenv("OTEL_SERVICE_NAME", "saas-analytics") OTEL_EXPORTER_OTLP_ENDPOINT = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "") LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
settings = Settings()
文件:app/security.py import time from typing import Dict, Any, List import jwt from fastapi import HTTPException, status
def verify_and_parse_jwt(token: str, secret: str, algorithms: List[str], audience: str | None = None) -> Dict[str, Any]: try: options = {"verify_aud": bool(audience)} claims = jwt.decode( token, secret, algorithms=algorithms, audience=audience if audience else None, options=options, ) # 必要最小字段 for k in ["sub", "tenant_id", "scope", "exp", "iat"]: if k not in claims: raise ValueError(f"missing claim: {k}") if claims["exp"] < time.time(): raise ValueError("token expired") # 规范化 scopes = claims["scope"].split() if isinstance(claims["scope"], str) else claims["scope"] claims["scopes"] = scopes claims.setdefault("roles", []) claims.setdefault("allowed_labels", []) return claims except Exception as e: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="invalid_token") from e
四、数据库与ORM(RLS、自动租户过滤、会话变量) 文件:app/db.py from sqlalchemy import create_engine, event, text from sqlalchemy.orm import sessionmaker, DeclarativeBase, Mapped, mapped_column from sqlalchemy import String from contextvars import ContextVar from app.settings import settings
current_tenant: ContextVar[str | None] = ContextVar("current_tenant", default=None) platform_read: ContextVar[bool] = ContextVar("platform_read", default=False)
engine = create_engine(settings.DATABASE_URL, pool_pre_ping=True, future=True) SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False, future=True)
class Base(DeclarativeBase): pass
class TenantScopedMixin: tenant_id: Mapped[str] = mapped_column(String, index=True)
@event.listens_for(SessionLocal, "after_begin") def set_pg_vars(session, transaction, connection): tid = current_tenant.get() pr = platform_read.get() # SET LOCAL 在事务内有效 connection.exec_driver_sql("SELECT set_config('app.tenant_id', %s, true)", (str(tid) if tid else "",)) connection.exec_driver_sql("SELECT set_config('app.platform_read', %s, true)", ('1' if pr else '0',))
from sqlalchemy.orm import with_loader_criteria from sqlalchemy.sql import Select from sqlalchemy.orm import Session from sqlalchemy import false
@event.listens_for(SessionLocal, "do_orm_execute") def _add_tenant_filter(execute_state): # 仅对SELECT自动注入过滤;平台只读提升时跳过,由RLS控制 if execute_state.is_select and not platform_read.get(): tid = current_tenant.get() if not tid: # 无租户时构造永不返回 execute_state.statement = execute_state.statement.where(false()) return stmt = execute_state.statement execute_state.statement = stmt.options( with_loader_criteria(TenantScopedMixin, lambda cls: cls.tenant_id == tid, include_aliases=True) )
文件:app/models.py from sqlalchemy.orm import Mapped, mapped_column, relationship from sqlalchemy import String, Integer, JSON, Text, TIMESTAMP, func from app.db import Base, TenantScopedMixin
class Tenant(Base):
    """Tenant row, stored in the ``tenants`` table."""

    # SQLAlchemy reads the dunder attribute; a plain ``tablename`` is ignored
    # and the declarative mapping fails for lack of a table.
    __tablename__ = "tenants"

    id: Mapped[str] = mapped_column(String, primary_key=True)
    name: Mapped[str] = mapped_column(String, nullable=False)
class User(Base, TenantScopedMixin):
    """User row, stored in the ``users`` table; ``tenant_id`` comes from the mixin."""

    # SQLAlchemy reads the dunder attribute; a plain ``tablename`` is ignored.
    __tablename__ = "users"

    id: Mapped[str] = mapped_column(String, primary_key=True)
    # One of: user | tenant_admin | auditor
    role: Mapped[str] = mapped_column(String, nullable=False)
class Dashboard(Base, TenantScopedMixin):
    """Dashboard row, stored in the ``dashboards`` table; ``tenant_id`` comes from the mixin."""

    # SQLAlchemy reads the dunder attribute; a plain ``tablename`` is ignored.
    __tablename__ = "dashboards"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    # Stored as a JSON array of strings.
    labels: Mapped[list] = mapped_column(JSON, default=list)
class Report(Base, TenantScopedMixin):
    """Report row, stored in the ``reports`` table; ``tenant_id`` comes from the mixin."""

    # SQLAlchemy reads the dunder attribute; a plain ``tablename`` is ignored.
    __tablename__ = "reports"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    # One of: public | tenant | private
    visibility: Mapped[str] = mapped_column(String, default="tenant")
class AuditLog(Base):
    """Audit trail entry, stored in the ``audit_logs`` table."""

    # SQLAlchemy reads the dunder attribute; a plain ``tablename`` is ignored.
    __tablename__ = "audit_logs"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    actor: Mapped[str] = mapped_column(String)
    action: Mapped[str] = mapped_column(String)
    resource: Mapped[str] = mapped_column(String)
    tenant_id: Mapped[str] = mapped_column(String, index=True)
    # Filled in by the database (server-side default now()).
    at: Mapped[str] = mapped_column(TIMESTAMP(timezone=True), server_default=func.now())
    extra: Mapped[dict | None] = mapped_column(JSON, nullable=True)
文件:app/schemas.py from pydantic import BaseModel, Field from typing import List, Optional
class DashboardIn(BaseModel): labels: List[str] = Field(default_factory=list)
class DashboardOut(BaseModel): id: int tenant_id: str labels: List[str]
class ReportIn(BaseModel): visibility: str = "tenant"
class ReportOut(BaseModel): id: int tenant_id: str visibility: str
五、OPA客户端封装(REST决策API) 文件:app/opa_client.py import httpx from app.settings import settings from typing import Any, Dict
class OPAClient:
    """Small async client for the OPA decision endpoints."""

    # The constructor must be ``__init__``; a method named ``init`` is never
    # called on instantiation, leaving ``_client`` and the URLs unset.
    def __init__(self, base_url: str = settings.OPA_URL, labels_url: str = settings.OPA_LABELS_URL):
        self.base_url = base_url
        self.labels_url = labels_url
        # Short timeout to protect P95 latency.
        self._client = httpx.AsyncClient(timeout=0.2)

    async def allow(self, subject: Dict[str, Any], resource: Dict[str, Any], action: str, env: Dict[str, Any]) -> bool:
        """Ask OPA for an allow decision.

        Returns the boolean ``result`` of the decision, or False when the
        response carries no result. HTTP errors propagate to the caller.
        """
        payload = {"input": {"subject": subject, "resource": resource, "action": action, "env": env}}
        r = await self._client.post(self.base_url, json=payload)
        r.raise_for_status()
        return bool(r.json().get("result", False))

    async def label_filter(self, subject: Dict[str, Any], resource_type: str) -> dict:
        """Fetch the label filter for a list query.

        Returns OPA's ``result`` object, or ``{"enabled": False}`` when the
        response carries no result. HTTP errors propagate to the caller.
        """
        payload = {"input": {"subject": subject, "resource": {"type": resource_type}, "action": "list", "env": {}}}
        r = await self._client.post(self.labels_url, json=payload)
        r.raise_for_status()
        return r.json().get("result", {"enabled": False})
opa_client = OPAClient()
六、中间件与依赖(JWT解析、上下文注入、审计拦截、DB Session) 文件:app/middleware.py import json import time from typing import Callable from fastapi import Request, Response from starlette.middleware.base import BaseHTTPMiddleware from app.security import verify_and_parse_jwt from app.settings import settings from app.db import current_tenant, platform_read
class JWTContextMiddleware(BaseHTTPMiddleware):
    """Authenticates the bearer token and publishes tenant context for the request.

    Sets the ``current_tenant`` and ``platform_read`` context variables for the
    duration of the downstream call and stores the claims on ``request.state``.
    """

    async def dispatch(self, request: Request, call_next: Callable):
        # Local import: HTTPException is not among this module's imports.
        from fastapi import HTTPException

        # X-Tenant-Id is not trusted; only record that a client still sends it.
        if "x-tenant-id" in request.headers:
            request.state.legacy_tenant_header_present = True

        # Parse the bearer token.
        auth = request.headers.get("authorization", "")
        if not auth.lower().startswith("bearer "):
            return Response(status_code=401, content="missing_bearer")
        token = auth.split(" ", 1)[1]

        # An HTTPException raised inside a BaseHTTPMiddleware is not handled by
        # FastAPI's exception handlers and would surface as a 500, so it is turned
        # into the intended response here.
        try:
            claims = verify_and_parse_jwt(token, settings.JWT_SECRET, [settings.JWT_ALG], settings.JWT_AUDIENCE)
        except HTTPException as exc:
            return Response(status_code=exc.status_code, content=str(exc.detail))

        # Platform read elevation applies only to GET requests on admin paths
        # made with the platform:read scope.
        is_admin_path = request.url.path.startswith("/analytics/admin/")
        pr = is_admin_path and ("platform:read" in claims["scopes"]) and request.method.upper() == "GET"

        # Both tokens are bound before the try block, so the finally clause can
        # never reference an unassigned name.
        tenant_token = current_tenant.set(claims["tenant_id"])
        pr_token = platform_read.set(pr)
        try:
            request.state.claims = claims
            request.state.platform_read = pr
            return await call_next(request)
        finally:
            platform_read.reset(pr_token)
            current_tenant.reset(tenant_token)
文件:app/deps.py from typing import Generator from sqlalchemy.orm import Session from fastapi import Depends from app.db import SessionLocal from fastapi import Request
def get_db() -> Generator[Session, None, None]: db = SessionLocal() try: yield db db.commit() except: db.rollback() raise finally: db.close()
def get_subject(request: Request): return request.state.claims
def is_platform_read(request: Request) -> bool: return bool(getattr(request.state, "platform_read", False))
文件:app/audit.py import json import logging from starlette.middleware.base import BaseHTTPMiddleware from fastapi import Request from app.db import SessionLocal from app.models import AuditLog
logger = logging.getLogger("audit")
class AuditMiddleware(BaseHTTPMiddleware):
    """Write one ``AuditLog`` row per handled request on a best-effort basis.

    A failure to persist the audit row is logged and never changes the
    response returned to the client.
    """

    async def dispatch(self, request: Request, call_next):
        """Run the downstream stack, then record who did what and the outcome."""
        response = await call_next(request)

        # Claims are read only after the downstream stack has run, so they are
        # picked up even when the layer that sets request.state.claims sits
        # inside this middleware.
        # NOTE(review): relies on request.state being shared between middleware
        # layers through the ASGI scope — confirm for the Starlette version in use.
        claims = getattr(request.state, "claims", None)

        # Written synchronously for simplicity; an async or batched writer would scale better.
        db = None
        try:
            db = SessionLocal()
            db.add(AuditLog(
                actor=claims["sub"] if claims else "anonymous",
                action=f"{request.method} {request.url.path}",
                resource=request.url.path,
                tenant_id=(claims["tenant_id"] if claims else ""),
                extra={
                    "status": response.status_code,
                    "legacy_header": getattr(request.state, "legacy_tenant_header_present", False),
                    "platform_read": getattr(request.state, "platform_read", False),
                },
            ))
            db.commit()
        except Exception:
            logger.exception("audit_log_failed")
        finally:
            # Release the session even when add/commit failed.
            if db is not None:
                db.close()
        return response
七、API路由(Dashboard/Report + 管理端跨租户只读) 文件:app/routers/dashboards.py from fastapi import APIRouter, Depends, HTTPException, Request from sqlalchemy.orm import Session from sqlalchemy import select from app.models import Dashboard from app.schemas import DashboardIn, DashboardOut from app.deps import get_db, get_subject from app.opa_client import opa_client
router = APIRouter(prefix="/dashboards", tags=["dashboards"])
@router.get("", response_model=list[DashboardOut])
async def list_dashboards(db: Session = Depends(get_db), subject=Depends(get_subject), request: Request = None):
    """List the dashboards the caller is allowed to read.

    The query is optionally narrowed by the label filter suggested by OPA,
    after which every candidate row is individually authorised; rows that are
    not explicitly allowed are dropped (default deny).
    """
    # Ask OPA for a label-filter suggestion for this subject.
    label_hint = await opa_client.label_filter(subject, "dashboard")

    query = select(Dashboard)
    if label_hint.get("enabled") and label_hint.get("labels"):
        # Keep rows whose labels overlap the suggested ones, using the "?|" operator.
        wanted = label_hint["labels"]
        query = query.where(Dashboard.labels.op("?|")(wanted))
    candidates = db.execute(query).scalars().all()

    # Per-row ABAC check.
    visible = []
    for dash in candidates:
        resource = {
            "type": "dashboard",
            "tenant_id": dash.tenant_id,
            "labels": dash.labels,
            "path": str(request.url.path),
            "method": "GET",
        }
        context = {"ip": request.client.host if request.client else ""}
        if await opa_client.allow(subject, resource, "read", context):
            visible.append(DashboardOut(id=dash.id, tenant_id=dash.tenant_id, labels=dash.labels))
    return visible
@router.post("", response_model=DashboardOut)
async def create_dashboard(payload: DashboardIn, db: Session = Depends(get_db), subject=Depends(get_subject), request: Request = None):
    """Create a dashboard in the caller's tenant after an OPA "create" check.

    Raises ``HTTPException(403)`` when OPA does not allow the operation.
    """
    resource = {
        "type": "dashboard",
        "tenant_id": subject["tenant_id"],
        "labels": payload.labels,
        "path": str(request.url.path),
        "method": "POST",
    }
    permitted = await opa_client.allow(subject, resource, "create", {})
    if not permitted:
        raise HTTPException(403, "forbidden")

    dashboard = Dashboard(tenant_id=subject["tenant_id"], labels=payload.labels)
    db.add(dashboard)
    # Flush here so the row is sent to the database before the response is
    # built; the commit itself is issued by the get_db dependency.
    db.flush()
    return DashboardOut(id=dashboard.id, tenant_id=dashboard.tenant_id, labels=dashboard.labels)
文件:app/routers/reports.py from fastapi import APIRouter, Depends, HTTPException, Request from sqlalchemy.orm import Session from sqlalchemy import select from app.models import Report from app.schemas import ReportIn, ReportOut from app.deps import get_db, get_subject from app.opa_client import opa_client
router = APIRouter(prefix="/reports", tags=["reports"])
@router.get("", response_model=list[ReportOut])
async def list_reports(db: Session = Depends(get_db), subject=Depends(get_subject), request: Request = None):
    """List the reports the caller is allowed to read.

    Every selected row is authorised individually through OPA; only rows that
    are explicitly allowed are returned.
    """
    candidates = db.execute(select(Report)).scalars().all()

    visible = []
    for report in candidates:
        resource = {
            "type": "report",
            "tenant_id": report.tenant_id,
            "visibility": report.visibility,
            "path": str(request.url.path),
            "method": "GET",
        }
        if await opa_client.allow(subject, resource, "read", {}):
            visible.append(ReportOut(id=report.id, tenant_id=report.tenant_id, visibility=report.visibility))
    return visible
@router.post("", response_model=ReportOut)
async def create_report(payload: ReportIn, db: Session = Depends(get_db), subject=Depends(get_subject), request: Request = None):
    """Create a report in the caller's tenant after an OPA "create" check.

    Raises ``HTTPException(403)`` when OPA does not allow the operation.
    """
    resource = {
        "type": "report",
        "tenant_id": subject["tenant_id"],
        "visibility": payload.visibility,
        "path": str(request.url.path),
        "method": "POST",
    }
    permitted = await opa_client.allow(subject, resource, "create", {})
    if not permitted:
        raise HTTPException(403, "forbidden")

    report = Report(tenant_id=subject["tenant_id"], visibility=payload.visibility)
    db.add(report)
    # Flush here so the row is sent to the database before the response is
    # built; the commit itself is issued by the get_db dependency.
    db.flush()
    return ReportOut(id=report.id, tenant_id=report.tenant_id, visibility=report.visibility)
文件:app/routers/admin.py(跨租户只读) from fastapi import APIRouter, Depends, HTTPException, Request from sqlalchemy.orm import Session from sqlalchemy import select from app.models import Report from app.deps import get_db, get_subject, is_platform_read from app.opa_client import opa_client from app.schemas import ReportOut
router = APIRouter(prefix="/analytics/admin", tags=["admin"])
@router.get("/reports", response_model=list[ReportOut])
async def list_reports_all(db: Session = Depends(get_db), subject=Depends(get_subject), pr: bool = Depends(is_platform_read), request: Request = None):
    """Return reports across tenants for platform read-only callers.

    Two gates must both pass, otherwise ``HTTPException(403)`` is raised:
    the platform-read flag derived by the middleware, and an explicit OPA
    "read" decision on a wildcard-tenant report resource.
    """
    if not pr:
        raise HTTPException(403, "forbidden")

    resource = {
        "type": "report",
        "tenant_id": "*",
        "path": str(request.url.path),
        "method": "GET",
    }
    permitted = await opa_client.allow(subject, resource, "read", {})
    if not permitted:
        raise HTTPException(403, "forbidden")

    # No tenant filter is added at the ORM level; cross-tenant visibility is
    # left to the database row-level-security policy for platform reads.
    all_reports = db.execute(select(Report)).scalars().all()
    return [
        ReportOut(id=report.id, tenant_id=report.tenant_id, visibility=report.visibility)
        for report in all_reports
    ]
八、应用主入口与可观测性 文件:app/main.py import logging, sys from fastapi import FastAPI from app.middleware import JWTContextMiddleware from app.audit import AuditMiddleware from app.routers import dashboards, reports, admin from app.settings import settings
# Route all logging through a single stdout handler that emits JSON-shaped lines.
# NOTE(review): the message is interpolated without JSON escaping, so a message
# containing a double quote produces a line that is not valid JSON — confirm
# whether log consumers parse these lines strictly.
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('{"ts":"%(asctime)s","level":"%(levelname)s","logger":"%(name)s","msg":"%(message)s"}')
handler.setFormatter(formatter)
root = logging.getLogger()
root.setLevel(settings.LOG_LEVEL)
# Replaces any handlers previously attached to the root logger.
root.handlers = [handler]

# The FastAPI application object.
app = FastAPI(title=settings.APP_NAME)

# Middleware registration order matters.
# NOTE(review): Starlette is understood to wrap later-added middleware around
# earlier ones, which would make AuditMiddleware the outer layer here — confirm
# against the Starlette version in use.
app.add_middleware(JWTContextMiddleware)
app.add_middleware(AuditMiddleware)

# Mount the tenant-facing routers and the cross-tenant admin router.
app.include_router(dashboards.router)
app.include_router(reports.router)
app.include_router(admin.router)
# Optional OpenTelemetry instrumentation. The application must still start when
# the packages are absent or instrumentation fails, but the reason is logged
# instead of being silently discarded.
try:
    from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
    from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
    from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
    from app.db import engine
    FastAPIInstrumentor.instrument_app(app)
    SQLAlchemyInstrumentor().instrument(engine=engine.sync_engine)
    HTTPXClientInstrumentor().instrument()
except ImportError:
    # Instrumentation packages (or app.db.engine) are not importable: run without tracing.
    logging.getLogger(__name__).info("opentelemetry_instrumentation_unavailable")
except Exception:
    # Anything else is a real setup problem worth seeing in the logs.
    logging.getLogger(__name__).exception("opentelemetry_instrumentation_failed")
九、Alembic迁移(RLS与策略) 文件:alembic/versions/20250101_enable_rls.py from alembic import op import sqlalchemy as sa
revision = "20250101_enable_rls" down_revision = None branch_labels = None depends_on = None
def upgrade():
    """Create the base tables if absent and install row-level-security policies.

    Steps, in order:
      1. create the tables and tenant indexes (``IF NOT EXISTS``),
      2. enable and force row-level security on every table,
      3. add a per-tenant SELECT policy to every table (``tenants`` matches on
         ``id``, the others on ``tenant_id``),
      4. add a per-tenant ``FOR ALL`` policy to every table except ``tenants``,
      5. add a SELECT-only policy gated on the ``app.platform_read`` setting.
    No default allow policy is created.
    """
    rls_tables = ["users", "dashboards", "reports", "audit_logs", "tenants"]
    writable_tables = ["users", "dashboards", "reports", "audit_logs"]

    # Base tables (simplified).
    op.execute("""
    CREATE TABLE IF NOT EXISTS tenants(id text primary key, name text not null);
    CREATE TABLE IF NOT EXISTS users(id text primary key, tenant_id text not null, role text not null);
    CREATE INDEX IF NOT EXISTS idx_users_tenant ON users(tenant_id);
    CREATE TABLE IF NOT EXISTS dashboards(id serial primary key, tenant_id text not null, labels jsonb default '[]'::jsonb);
    CREATE INDEX IF NOT EXISTS idx_dashboards_tenant ON dashboards(tenant_id);
    CREATE TABLE IF NOT EXISTS reports(id serial primary key, tenant_id text not null, visibility text default 'tenant');
    CREATE INDEX IF NOT EXISTS idx_reports_tenant ON reports(tenant_id);
    CREATE TABLE IF NOT EXISTS audit_logs(id serial primary key, actor text, action text, resource text, tenant_id text, at timestamptz default now(), extra jsonb);
    """)

    # Turn row-level security on, and force it.
    for table in rls_tables:
        op.execute(f"ALTER TABLE {table} ENABLE ROW LEVEL SECURITY")
        op.execute(f"ALTER TABLE {table} FORCE ROW LEVEL SECURITY")

    # Same-tenant SELECT policy; tenants is keyed by id rather than tenant_id.
    for table in rls_tables:
        if table == "tenants":
            op.execute("""
            DROP POLICY IF EXISTS tenants_tenant_select ON tenants;
            CREATE POLICY tenants_tenant_select ON tenants FOR SELECT
            USING (id::text = current_setting('app.tenant_id', true));
            """)
        else:
            op.execute(f"""
            DROP POLICY IF EXISTS {table}_tenant_select ON {table};
            CREATE POLICY {table}_tenant_select ON {table} FOR SELECT
            USING (tenant_id::text = current_setting('app.tenant_id', true));
            """)

    # Same-tenant read/write policy for the tables that carry a tenant_id column.
    for table in writable_tables:
        op.execute(f"""
        DROP POLICY IF EXISTS {table}_tenant_modify ON {table};
        CREATE POLICY {table}_tenant_modify ON {table} FOR ALL
        USING (tenant_id::text = current_setting('app.tenant_id', true))
        WITH CHECK (tenant_id::text = current_setting('app.tenant_id', true));
        """)

    # Platform cross-tenant access, SELECT only.
    for table in rls_tables:
        op.execute(f"""
        DROP POLICY IF EXISTS {table}_platform_read ON {table};
        CREATE POLICY {table}_platform_read ON {table} FOR SELECT
        USING (current_setting('app.platform_read', true) = '1');
        """)
def downgrade():
    """Remove the row-level-security setup installed by ``upgrade``.

    For every table the three policy names used by ``upgrade`` are dropped
    (``IF EXISTS``, since ``tenants`` has no ``*_tenant_modify`` policy), the
    FORCE flag is cleared, and row-level security is disabled. The tables are
    left in place because ``upgrade`` creates them with ``IF NOT EXISTS`` and
    they may pre-date this revision.
    """
    policy_suffixes = ("tenant_select", "tenant_modify", "platform_read")
    for tbl in ["users","dashboards","reports","audit_logs","tenants"]:
        # Drop the policies first so none are left behind to take effect
        # again if row-level security is later re-enabled on the table.
        for suffix in policy_suffixes:
            op.execute(f"DROP POLICY IF EXISTS {tbl}_{suffix} ON {tbl}")
        op.execute(f"ALTER TABLE {tbl} NO FORCE ROW LEVEL SECURITY")
        op.execute(f"ALTER TABLE {tbl} DISABLE ROW LEVEL SECURITY")
说明:
十、测试用例(关键路径) 文件:tests/test_multi_tenant.py import jwt, time from fastapi.testclient import TestClient from app.main import app from app.settings import settings from app.db import SessionLocal from app.models import Dashboard, Report
def make_token(tenant_id: str, scopes: list[str], roles: list[str]=None, allowed_labels: list[str]=None):
    """Build a signed test JWT for the given tenant.

    The token expires 300 seconds after issue. ``scopes`` are emitted as a
    single space-delimited ``scope`` claim; ``roles`` falls back to
    ``["user"]`` and ``allowed_labels`` to an empty list when not supplied.
    """
    issued_at = int(time.time())
    effective_roles = roles if roles else ["user"]
    effective_labels = allowed_labels if allowed_labels else []
    claims = {
        "sub": f"user-{tenant_id}",
        "tenant_id": tenant_id,
        "scope": " ".join(scopes),
        "roles": effective_roles,
        "allowed_labels": effective_labels,
        "iat": issued_at,
        "exp": issued_at + 300,
        "aud": settings.JWT_AUDIENCE,
    }
    return jwt.encode(claims, settings.JWT_SECRET, algorithm=settings.JWT_ALG)
client = TestClient(app)
def seed():
    """Insert the fixture rows used by the tests in this module.

    Adds three dashboards (two for ``t1``, one for ``t2``) and two reports
    (one per tenant) and commits them. The session is always closed, even when
    adding or committing fails.
    """
    db = SessionLocal()
    try:
        db.add_all([
            Dashboard(tenant_id="t1", labels=["finance"]),
            Dashboard(tenant_id="t1", labels=["sales"]),
            Dashboard(tenant_id="t2", labels=["finance"]),
        ])
        db.add_all([
            Report(tenant_id="t1", visibility="tenant"),
            Report(tenant_id="t2", visibility="public"),
        ])
        db.commit()
    finally:
        # Do not leak the session when the insert fails.
        db.close()
def test_tenant_isolation():
    """Each tenant's token only ever lists dashboards of that same tenant."""
    seed()
    # Check t1 first, then t2: neither may see the other tenant's rows.
    for tenant in ("t1", "t2"):
        bearer = make_token(tenant, scopes=["openid"])
        resp = client.get("/dashboards", headers={"Authorization": f"Bearer {bearer}"})
        assert resp.status_code == 200
        assert all(item["tenant_id"] == tenant for item in resp.json())
def test_platform_read_cross_tenant():
    """A token with the platform:read scope can list reports across tenants."""
    bearer = make_token("t0", scopes=["platform:read"])
    auth_headers = {"Authorization": f"Bearer {bearer}"}
    resp = client.get("/analytics/admin/reports", headers=auth_headers)
    assert resp.status_code == 200
    # Reports of both tenants are expected to be visible.
    body = resp.json()
    assert len(body) >= 2
def test_legacy_header_ignored():
    """A conflicting X-Tenant-Id header must not override the token's tenant."""
    bearer = make_token("t1", scopes=["openid"])
    # Deliberately send a header that names a different tenant than the token.
    spoofed_headers = {"Authorization": f"Bearer {bearer}", "X-Tenant-Id": "t2"}
    resp = client.get("/dashboards", headers=spoofed_headers)
    assert resp.status_code == 200
    assert all(item["tenant_id"] == "t1" for item in resp.json())
十一、启动方式
十二、关键设计点与实践价值
十三、迁移与上线顺序(与需求一致)
说明与扩展
至此,完整链路已满足:JWT为唯一信任根;ABAC默认拒绝;RLS强隔离与平台受控只读;应用层双重保障;热更新策略;全链路审计与可观测。
帮助用户将复杂的软件架构原则或模式转化为可操作的解决方案,以便更高效、更专业地优化软件开发项目,解决特定场景中的实际挑战。