×
¥
查看详情
🔥 会员专享 文生代码 其它

架构原则应用指导

👁️ 577 次查看
📅 Nov 19, 2025
💡 核心价值: 本提示词根据用户提供的架构原则、项目框架与实际挑战,生成清晰的架构应用指导,包括原理解析、项目情境分析与概念性示例。所有内容均基于用户显式输入,确保可控、可复现,不依赖外部推断。适用于系统设计、架构优化、技术评审等场景,帮助开发者理解如何将架构思想有效落地到项目实践中。

🎯 可自定义参数(4个)

架构原则
架构原则说明
项目框架与技术栈
项目框架与技术栈描述
实际项目挑战
实际项目挑战描述
项目背景与当前结构
项目背景与当前结构描述

🎨 效果示例

下面给出一套可落地的DDD+CQRS+Saga+Outbox设计与脚手架骨架,贴合你现状(单体、表结构、接口)与技术栈(Java 21, Spring Boot 3.3, Spring Cloud, JPA 写模型, MyBatis-Plus 读模型, Kafka 3.x, Debezium Outbox, MySQL 8, Redis 7, Flyway, MapStruct, Testcontainers, OpenAPI Generator, OpenTelemetry, Kubernetes/Helm),目标是迭代式拆分为订单/库存微服务、在促销高峰抑制超卖与死锁、读写分离并保证3秒内一致性收敛与P95<300ms。

一、总体架构与迁移路径

  • 限界上下文
    • Order BC:订单聚合(Order为聚合根,OrderItem为值对象/实体),写模型用JPA;读模型MyBatis-Plus+Redis物化视图。
    • Inventory BC:SkuInventory聚合(库存与Reservation),写模型JPA,读模型可选。
    • Payment BC:预授权(可先Stub,事件驱动接入)。
  • 交互与一致性
    • CQRS:写模型围绕聚合+命令/事件;读模型物化视图(order_view, order_item_view)+Redis,领域事件异步驱动更新。
    • Saga(编排):Order Orchestrator负责编排库存预留->支付预授权->确认/取消。
    • Outbox:两端写库均使用Outbox表,Debezium Outbox Connector推送到Kafka,避免双写。
    • 事件命名用过去式:OrderCreated, StockReserved, PaymentAuthorized...
    • 事件携带幂等键(eventId, source, idempotencyKey),消费端去重并保证同聚合同分区有序与可重放。
    • 并发控制:乐观锁+version(orders.version, inventory.version)。
    • API契约OpenAPI优先,代码由OpenAPI Generator生成,禁止跨库访问。
  • 迁移步骤
    • 迭代1(不停服):在单体中引入Outbox与领域事件(仅改写侧),保持原GET读旧库;POST /orders改为异步受理(返回PENDING,<300ms),本地内的库存预留仍可调用模块方法或经REST(同进程/同库期间仍有本地事务,但通过对库存更新引入乐观锁消除死锁)。引入幂等、版本号、Trace。
    • 迭代2:拆分为order-service与inventory-service;启用Debezium Outbox->Kafka;用事件驱动物化视图与Redis;读侧从MyBatis-Plus读新读库,缓存收敛≤3s;切断跨库访问与本地事务耦合。
    • 之后:引入payment-service或支付预授权回调,完善全链路Saga。
  • SLO与性能
    • 下单接口返回立即受理状态(PENDING/ AWAITING_STOCK),P95<300ms;
    • 读一致性:事件传播+读侧更新+缓存更新流水线在3秒内收敛;
    • 可用性99.9%:消费者幂等+有序+重放,服务可独立扩缩容。

二、目录与包结构(多模块)

  • repo
    • order-service
      • interfaces (REST, OpenAPI, IdempotencyInterceptor)
      • application (CommandHandler, QueryHandler, SagaOrchestrator)
      • domain (aggregate, commands, events, repository)
      • infrastructure
        • persistence (JPA entities, Spring Data Repos, Flyway)
        • outbox (entity, mapper, publisher config for Debezium Outbox)
        • messaging (Kafka consumers/producers for cross-context)
        • mapping (MapStruct)
        • observability (OpenTelemetry)
    • inventory-service(同上布局)
    • order-query-service(读侧构建与API,MyBatis-Plus+Redis)
    • contracts
      • openapi (order.yaml, inventory.yaml)
      • events (JSON Schema/Avro或CloudEvents规范)
    • helm-charts (order, inventory, query)
    • integration-tests (Testcontainers 场景)

三、领域模型与命令事件(示例骨架)

  1. Order聚合根(写模型,JPA)
  • 状态:PENDING -> AWAITING_STOCK -> STOCK_RESERVED -> PAYMENT_AUTHORIZED -> CONFIRMED;失败流转:CANCELED/FAILED
  • 聚合不直接依赖外部服务,仅产生领域事件

示例代码(节选): public enum OrderStatus { PENDING, AWAITING_STOCK, STOCK_RESERVED, PAYMENT_AUTHORIZED, CONFIRMED, CANCELED, FAILED }

@Entity @Table(name = "orders") public class OrderAggregate { @Id private Long id; private Long userId; private BigDecimal total; @Enumerated(EnumType.STRING) private OrderStatus status; @Version private Long version;

@OneToMany(mappedBy = "order", cascade = CascadeType.ALL, orphanRemoval = true, fetch = FetchType.EAGER) private List items = new ArrayList<>();

// domain behaviors public List placeOrder(PlaceOrderCommand cmd) { Preconditions.checkArgument(status == null || status == OrderStatus.PENDING); this.id = cmd.getOrderId(); this.userId = cmd.getUserId(); this.total = cmd.getTotal(); this.status = OrderStatus.AWAITING_STOCK; this.items.clear(); this.items.addAll(OrderItemEntity.from(cmd.getItems(), this)); return List.of(new OrderCreated(cmd.getOrderId(), cmd.getUserId(), cmd.getItems(), cmd.getIdempotencyKey())); }

public List markStockReserved(String reservationId) { if (this.status != OrderStatus.AWAITING_STOCK) return List.of(); // idempotent this.status = OrderStatus.STOCK_RESERVED; return List.of(new StockReserved(this.id, reservationId)); }

public List markStockReservationFailed(String reason) { if (this.status == OrderStatus.CANCELED || this.status == OrderStatus.FAILED) return List.of(); this.status = OrderStatus.FAILED; return List.of(new StockReservationFailed(this.id, reason)); }

public List markPaymentAuthorized(String authId) { if (this.status != OrderStatus.STOCK_RESERVED) return List.of(); this.status = OrderStatus.PAYMENT_AUTHORIZED; return List.of(new PaymentAuthorized(this.id, authId)); }

public List confirm() { if (this.status != OrderStatus.PAYMENT_AUTHORIZED) return List.of(); this.status = OrderStatus.CONFIRMED; return List.of(new OrderConfirmed(this.id)); }

public List cancel(String reason) { if (this.status == OrderStatus.CONFIRMED) return List.of(); // business rule this.status = OrderStatus.CANCELED; return List.of(new OrderCanceled(this.id, reason)); } }

@Embeddable public class OrderItem { private Long skuId; private Integer qty; private BigDecimal price; }

  1. 命令与事件(动词过去式) public record PlaceOrderCommand(Long orderId, Long userId, List items, BigDecimal total, String idempotencyKey) implements Command {}

public interface DomainEvent { String eventId(); // UUID String source(); // "order-service" String type(); // e.g., "OrderCreated" Instant occurredAt(); String idempotencyKey(); }

public record OrderCreated(Long orderId, Long userId, List items, String idempotencyKey) implements DomainEvent { /* eventId()/source()/type()/occurredAt() */ }

public record StockReservationRequested(Long orderId, List items, String idempotencyKey) implements DomainEvent {} public record StockReserved(Long orderId, String reservationId) implements DomainEvent {} public record StockReservationFailed(Long orderId, String reason) implements DomainEvent {} public record PaymentAuthorizationRequested(Long orderId, BigDecimal amount) implements DomainEvent {} public record PaymentAuthorized(Long orderId, String authId) implements DomainEvent {} public record PaymentAuthorizationFailed(Long orderId, String reason) implements DomainEvent {} public record OrderConfirmed(Long orderId) implements DomainEvent {} public record OrderCanceled(Long orderId, String reason) implements DomainEvent {}

  1. CommandHandler(写端)与Outbox @Service public class OrderCommandHandler { private final OrderRepository orderRepo; private final OutboxAppender outbox; // append inside tx

@Transactional public OrderId handle(PlaceOrderCommand cmd) { if (!outbox.tryStartIdempotent("PlaceOrder", cmd.idempotencyKey())) { return new OrderId(outbox.lookupEntityIdByKey(cmd.idempotencyKey())); } var order = new OrderAggregate(); var events = order.placeOrder(cmd); orderRepo.save(order); outbox.appendAll(events, "orders", order.getId().toString()); // topic, key return new OrderId(order.getId()); }

@Transactional public void handleStockReserved(Long orderId, String reservationId, String eventId) { if (!outbox.tryStartIdempotent("StockReserved", eventId)) return; var order = orderRepo.getByIdForUpdate(orderId); // or findById + optimistic lock var events = order.markStockReserved(reservationId); orderRepo.save(order); events.forEach(e -> outbox.append(e, "payments", orderId.toString())); } }

  1. Saga编排器(在Order服务)
  • 规则:OrderCreated -> 发起StockReservationRequested
  • on StockReserved -> 发起PaymentAuthorizationRequested
  • on PaymentAuthorized -> Confirm
  • on StockReservationFailed/PaymentAuthorizationFailed -> Cancel + ReleaseStock

@Component public class OrderSagaOrchestrator { private final OutboxAppender outbox;

@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) public void onOrderCreated(OrderCreated e) { outbox.append(new StockReservationRequested(e.orderId(), e.items(), e.idempotencyKey()), "inventory", e.orderId().toString()); }

@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) public void onStockReserved(StockReserved e) { outbox.append(new PaymentAuthorizationRequested(e.orderId(), /amount/ null), "payment", e.orderId().toString()); }

@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) public void onPaymentAuthorized(PaymentAuthorized e) { outbox.append(new OrderConfirmed(e.orderId()), "orders", e.orderId().toString()); } }

四、Inventory聚合与并发控制

  1. SkuInventory聚合 @Entity @Table(name = "inventory") public class SkuInventory { @Id private Long skuId; private Integer stock; private Integer reserved; @Version private Long version;

public List reserve(Long orderId, int qty, String idemKey) { if (stock - reserved < qty) { return List.of(new StockReservationFailed(orderId, "INSUFFICIENT")); } this.reserved += qty; return List.of(new StockReserved(orderId, UUID.randomUUID().toString())); }

public List release(Long orderId, int qty) { this.reserved = Math.max(0, this.reserved - qty); return List.of(new StockReleased(orderId)); } }

  • 可选新增表 inventory_reservations(order_id, sku_id, qty, status, version) 以便按订单粒度释放,防止超卖与并发竞争更清晰。保留inventory表的stock/reserved为聚合摘要。
  1. Inventory命令处理与幂等 @Service public class InventoryCommandHandler { private final SkuInventoryRepository repo; private final ReservationRepository resRepo; private final OutboxAppender outbox;

@Transactional public void handle(ReserveStockCommand cmd) { if (resRepo.existsByIdempotencyKey(cmd.idempotencyKey())) return; // idempotent var inv = repo.findById(cmd.getSkuId()).orElseThrow(); var events = inv.reserve(cmd.getOrderId(), cmd.getQty(), cmd.getIdempotencyKey()); repo.save(inv); // optimistic lock; retry on OptimisticLockException outbox.appendAll(events, "orders", cmd.getOrderId().toString()); } }

五、Outbox表与Debezium

  1. Outbox表(通用)
  • 表:outbox_event(id PK, aggregate_id, aggregate_type, type, payload, headers, occurred_at, idempotency_key, partition_key, published_at nullable)
  • Debezium Outbox Router根据type或headers路由到Kafka主题(inventory, orders, payments等)。Kafka Key使用partition_key确保同聚合同分区。

Flyway示例: -- V1__orders_and_items.sql 现有表增加必要索引/约束 ALTER TABLE orders ADD COLUMN version BIGINT NOT NULL DEFAULT 0; CREATE INDEX idx_orders_user ON orders(user_id); ALTER TABLE order_items ADD PRIMARY KEY(order_id, sku_id);

-- V2__outbox.sql CREATE TABLE outbox_event ( id BIGINT PRIMARY KEY AUTO_INCREMENT, aggregate_id VARCHAR(64) NOT NULL, aggregate_type VARCHAR(64) NOT NULL, type VARCHAR(128) NOT NULL, payload JSON NOT NULL, headers JSON NULL, occurred_at TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), idempotency_key VARCHAR(128) NOT NULL, partition_key VARCHAR(128) NOT NULL, UNIQUE KEY uk_idem (aggregate_type, idempotency_key) );

-- V3__inventory_reservations.sql(推荐) CREATE TABLE inventory_reservations ( order_id BIGINT NOT NULL, sku_id BIGINT NOT NULL, qty INT NOT NULL, status VARCHAR(32) NOT NULL, version BIGINT NOT NULL DEFAULT 0, idempotency_key VARCHAR(128) NOT NULL, PRIMARY KEY(order_id, sku_id), UNIQUE KEY uk_res_idem (idempotency_key) );

  1. Debezium Connector(概念配置)
  • database.history.kafka.bootstrap.servers
  • transforms=outbox
  • transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
  • transforms.outbox.table.fields.additional.placement=idempotency_key:header,aggregate_type:header
  • transforms.outbox.route.by.field=type
  • route.topic.replacement=${routedTopic} 例如 type=OrderCreated -> topic=orders.order-created
  • key.field=partition_key

六、Kafka主题与消费者

  • 主题划分

    • orders.order-created, orders.order-confirmed...
    • inventory.stock-reserved, inventory.stock-reservation-failed...
  • 分区与Key

    • Key=orderId(订单相关事件)或skuId(库存内部),确保同聚合同分区有序。
  • 消费者(有序、可重放、幂等) @Component public class InventoryEventConsumer { private final InventoryCommandHandler handler; private final ProcessedEventStore processed; // 去重

    @KafkaListener(topics = "inventory.stock-reserved", containerFactory = "kafkaContainerFactory") public void onStockReserved(ConsumerRecord<String, String> rec) { var event = parse(rec.value(), StockReserved.class); if (processed.alreadyProcessed(event.eventId())) return; try { handler.handleOrderSideEffect(event); // e.g., markStockReserved command processed.markProcessed(event.eventId()); } catch (TransientException ex) { throw ex; // 让Kafka重试 } } }

  • 去重存储可用Redis Set或MySQL processed_event(event_id PK, processed_at),保留7-14天TTL。

  • 消费者并发=分区数;保证顺序:每分区单线程。可用Spring Kafka的Concurrency=分区数。

七、读模型(MyBatis-Plus + Redis)

  1. 物化视图表 -- V4__read_model.sql CREATE TABLE order_view ( id BIGINT PRIMARY KEY, user_id BIGINT NOT NULL, status VARCHAR(32) NOT NULL, total DECIMAL(18,2) NOT NULL, created_at DATETIME NOT NULL, updated_at DATETIME NOT NULL ); CREATE TABLE order_item_view ( order_id BIGINT NOT NULL, sku_id BIGINT NOT NULL, qty INT NOT NULL, price DECIMAL(18,2) NOT NULL, PRIMARY KEY(order_id, sku_id) );

  2. 读侧消费者

  • 订阅orders.*事件,按事件类型Upsert物化视图,随后更新Redis。
  • Cache策略:cache-aside,写入后置(事件驱动更新),TTL 5-15分钟,关键路径读先读缓存,未命中查读库并回填。 @Mapper public interface OrderViewMapper extends BaseMapper {}

@Service public class OrderProjectionHandler { @Autowired private OrderViewMapper viewMapper; @Autowired private RedisTemplate<String, String> redis;

@Transactional public void apply(OrderCreated e) { viewMapper.insert(new OrderViewDO(e.orderId(), e.userId(), "AWAITING_STOCK", ...)); cachePut(e.orderId()); }

public OrderDTO getById(Long id) { var key = "order:" + id; var json = redis.opsForValue().get(key); if (json != null) return deserialize(json); var view = viewMapper.selectById(id); if (view == null) return null; cachePut(id); return map(view); }

private void cachePut(Long id) { var view = viewMapper.selectById(id); redis.opsForValue().set("order:" + id, serialize(map(view)), Duration.ofMinutes(10)); } }

  • 读流量80%:将GET /orders/{id}与列表查询全部走读服务+缓存。列表页可以按user_id维度预构建order_list_view,或用Redis Sorted Set按时间维度加速。

八、接口契约与代码生成

  1. Order OpenAPI(片段) openapi: 3.0.3 info: { title: Order API, version: 1.0.0 } paths: /orders: post: operationId: placeOrder parameters: - in: header name: Idempotency-Key required: true schema: { type: string } requestBody: required: true content: application/json: schema: $ref: '#/components/schemas/PlaceOrderRequest' responses: '202': description: Accepted content: application/json: schema: { $ref: '#/components/schemas/PlaceOrderResponse' } /orders/{id}: get: operationId: getOrder parameters: - in: path name: id required: true schema: { type: integer, format: int64 } responses: '200': description: OK components: schemas: PlaceOrderRequest: type: object required: [userId, items] properties: userId: { type: integer, format: int64 } items: type: array items: type: object required: [skuId, qty, price] properties: skuId: { type: integer, format: int64 } qty: { type: integer } price: { type: number, format: double } PlaceOrderResponse: type: object properties: orderId: { type: integer, format: int64 } status: { type: string, enum: [PENDING, AWAITING_STOCK, ...] }
  • OpenAPI Generator(Maven/Gradle)生成Controller接口与客户端SDK,保证契约一致。
  • Inventory API在过渡期保留POST /inventory/reserve(同步),最终以事件为主,API用于手动修正与管理面。

九、接口层与幂等拦截器

  • REST层拦截器:读取Idempotency-Key,结合方法+请求体hash生成幂等记录,利用Redis或表 idempotency_record(method, key, request_hash, response, ttl),重复请求直接返回首次响应。 @Component public class IdempotencyInterceptor implements HandlerInterceptor { @Autowired private IdempotencyStore store; @Override public boolean preHandle(HttpServletRequest req, HttpServletResponse res, Object handler) { var key = req.getHeader("Idempotency-Key"); if (key == null) return true; var sig = buildSignature(req); var cached = store.find(sig); if (cached != null) { writeResponse(res, cached); return false; } req.setAttribute("idemSig", sig); return true; } @Override public void postHandle(HttpServletRequest req, HttpServletResponse res, Object handler, ModelAndView mv) { var sig = (String) req.getAttribute("idemSig"); if (sig != null && res.getStatus() < 500) store.save(sig, captureResponse(res)); } }

十、性能与并发要点

  • 乐观锁:JPA @Version;对库存预留采用 update ... where sku_id=? and version=?,失败重试带指数退避与随机抖动。
  • 死锁规避:拆散大事务,单聚合内更新;跨聚合交由Saga+事件;避免在库存与订单同一事务下更新。
  • Kafka分区:按orderId分区的订单相关事件确保有序;库存内部按skuId分区。
  • P95<300ms:POST /orders返回202 Accepted,生成订单与OrderCreated事件入Outbox(同事务),响应不等待库存/支付;同步路径仅包含JPA持久化与Outbox insert。
  • 收敛≤3秒:Kafka分区数=CPU核数*2,消费者并发匹配,读侧更新与缓存刷新走批量与幂等处理;监控滞后指标。

十一、测试与可复现性

  • Testcontainers:MySQL、Kafka、Redis、Debezium Connector容器化,跑端到端测试(下单->库存预留->支付->确认->读侧同步)。
  • 合约测试:OpenAPI生成的客户端配合MockServer或WireMock;消费者用Spring Kafka的Embedded Kafka或Testcontainers Kafka。
  • 数据迁移:Flyway baselineOnMigrate,灰度阶段在order-service与inventory-service上逐步放量。

十二、可观测性与运维

  • OpenTelemetry:HTTP、JPA、Kafka自动探针,traceId贯穿日志;日志字段统一:trace_id, span_id, order_id, event_id。
  • 配置中心与灰度:按环境values-{dev,staging,prod}.yaml(Helm);Kafka消费组灰度(两个消费组并行,topic版本化或事件schema兼容)。
  • 限流与降级:下单限流保护库存;库存预留失败时快速失败并可回退到等待补货策略。

十三、控制器示例与客户端调用 @RestController @RequestMapping("/orders") @RequiredArgsConstructor public class OrderController { private final OrderCommandHandler handler;

@PostMapping public ResponseEntity place(@RequestHeader("Idempotency-Key") String idem, @RequestBody PlaceOrderRequest req) { var cmd = new PlaceOrderCommand(generateId(), req.getUserId(), map(req.getItems()), req.getTotal(), idem); var orderId = handler.handle(cmd); return ResponseEntity.accepted().body(new PlaceOrderResponse(orderId.value(), "AWAITING_STOCK")); }

@GetMapping("/{id}") public ResponseEntity get(@PathVariable Long id) { // 读侧服务中实现:从Redis->读库->回填 throw new UnsupportedOperationException(); } }

  • 客户端代码(OpenAPI Generator生成的ApiClient) var api = new OrderApi(); api.getApiClient().addDefaultHeader("Idempotency-Key", UUID.randomUUID().toString()); var resp = api.placeOrder(new PlaceOrderRequest(userId, items, total)); // resp.status == AWAITING_STOCK

十四、读侧与缓存的一致性策略

  • 以事件为准,读侧Upsert后再写入Redis;同一orderId同一个分区可避免乱序;如发生乱序,在事件中携带version或event sequence,消费端校验version递增,不满足则暂存重试。
  • 支持补偿重放:从某时点起按topic/partition/offset重放以修复读库与缓存;幂等保证可重复应用。

十五、与现有API的过渡

  • 迭代1保留POST /inventory/reserve供同步预留路径(在单体中仍本地调用),同时发出StockReservationRequested事件;切到微服务后关闭同步调用,完全依赖事件。
  • 读侧在迭代1仍查旧库;迭代2上线order-query-service并切换GET流量。

十六、避免超卖与关键细节

  • 预留写法:update inventory set reserved = reserved + :qty where sku_id = :skuId and stock - reserved >= :qty and version = :version
  • 每个order+sku存reservation记录,确保释放可追踪;取消订单或支付失败时通过事件ReleaseStock执行回滚。
  • 幂等键:下单用请求头Idempotency-Key;库存预留命令也传递此键;事件eventId=UUID整体去重。

总结价值

  • 通过DDD拆分与CQRS,解耦订单与库存写模型,读多写少场景下以物化视图+缓存承载80%读流量;
  • Saga+Outbox消除跨服务分布式事务,确保一致性可收敛、可重放与观测;
  • 乐观锁+版本号避免死锁;事件有序与幂等确保促销高峰无超卖;
  • OpenAPI优先与代码生成、Testcontainers搭配可复现集成测试,加速5人团队交付;
  • 迭代式迁移保证不停服,逐步把读流量切换到读侧与缓存,满足P95<300ms与一致性≤3s的SLO。

下面给出一个可在两周内落地的“模块化单体 + Clean Architecture”骨架方案,面向你的技术栈与现状问题,包含目录布局、边界与依赖约束、典型用例、端口/适配器、事务与幂等、测试与观测。目标是让控制器“瘦身”、打掉跨模块 import、让用例可单测、外部支付可替身测试,并为后续独立部署(演进为多个进程或服务)保留路径。

一、架构映射与依赖规则

  • Clean Architecture 层次(外向内依赖):Framework(框架) → Adapters(适配器:REST/队列/DB 实现) → Application(用例/端口/仓储接口) → Domain(实体/值对象/领域事件类型)
  • 每个能力域模块化:users、subscriptions、billing(invoices 隶属 billing)
  • 仅暴露:用例接口与领域对象(DTO/Ports/Events)
  • 外部系统(支付/邮件/计费网关等)通过端口(Ports)定义;提供 InMemory 和 HTTP 适配器
  • 领域事件采用“轻量同步事件 + 事务脚本 + outbox 异步补偿”混合
  • 通过 Nx tags + ESLint enforce-module-boundaries + dependency-cruiser 双重静态检查,阻断跨模块耦合
  • 统一错误与观测:DomainError → HTTP/队列错误映射;请求日志、队列指标

二、Nx 工作区与目录布局(模块化单体) 建议 Nx libs 以 scope 与 layer 打标签,apps 只做装配/部署:

  • apps/
    • api/ (Nest 应用/框架层)
      • src/main.ts
      • src/app.module.ts(只组装,不写业务)
      • src/bootstrap/observability.ts(日志/指标)
      • src/bootstrap/swagger.ts
      • src/bootstrap/bullmq.ts
  • libs/platform/
    • prisma/ (infra)
      • src/prisma.service.ts
      • src/unit-of-work.ts
      • src/idempotency.service.ts
      • src/outbox.service.ts
      • prisma/schema.prisma
    • middleware/ (framework)
      • src/auth.middleware.ts
      • src/tenant.middleware.ts
      • src/audit.interceptor.ts
      • src/http-exception.filter.ts
    • messaging/ (adapters for events/queues)
      • src/domain-events.ts(内存事件总线)
      • src/outbox.processor.ts(BullMQ 处理器)
    • observability/
      • src/logger.ts
      • src/metrics.ts
  • libs/shared-kernel/ (domain)
    • src/errors.ts(DomainError/UseCaseError)
    • src/result.ts(Result/Either)
    • src/types.ts(Brand/ID/ValueObject)
    • src/validation.ts(Zod helper + Nest ZodPipe)
  • libs/users/
    • domain/ (domain)
      • src/user.entity.ts
    • application/ (application)
      • src/ports/user.repo.port.ts
      • src/usecases/get-user-subscriptions.query.ts(只定义接口,避免向 subscriptions 反向依赖)
    • adapter-rest/ (adapters)
      • src/users.controller.ts(GET /users/:id/subscriptions 走 query service 或事件/视图模型)
    • infra-prisma/ (adapters/infra)
      • src/user.repo.prisma.ts
  • libs/subscriptions/
    • domain/
      • src/subscription.entity.ts
      • src/events/subscription.events.ts(SubscriptionCreated/Canceled)
    • application/
      • src/ports/subscription.repo.port.ts
      • src/ports/email.port.ts
      • src/usecases/create-subscription.usecase.ts
      • src/usecases/cancel-subscription.usecase.ts
    • adapter-rest/
      • src/subscriptions.controller.ts(POST /subscriptions)
    • adapter-handlers/ (adapters)
      • src/subscription.event.handlers.ts(监听事件做本模块内副作用)
    • infra-prisma/
      • src/subscription.repo.prisma.ts
  • libs/billing/
    • domain/
      • src/invoice.entity.ts
      • src/events/invoice.events.ts
    • application/
      • src/ports/invoice.repo.port.ts
      • src/ports/payment.port.ts
      • src/ports/billing.email.port.ts
      • src/usecases/charge-invoice.usecase.ts(触发支付、置 pending)
      • src/usecases/confirm-invoice-payment.usecase.ts(回调确认)
    • adapter-rest/
      • src/invoices.controller.ts(POST /invoices/:id/pay)
      • src/payments.webhook.controller.ts(支付回调)
    • adapter-handlers/
      • src/subscription-created.handler.ts(订阅创建事件 → 生成首张发票)
    • infra-prisma/
      • src/invoice.repo.prisma.ts
    • infra-payment-http/
      • src/payment.http.adapter.ts(真实 HTTP 支付)
    • infra-payment-inmemory/
      • src/payment.inmemory.adapter.ts(测试替身)
  • libs/clients/(OpenAPI 生成的 TS 客户端,供前端/服务端内部调用)
  • tools/
    • generators/module/(可选:Nx plugin generator,统一目录/命名/边界)

Nx tags(示例):

  • scope:users|subscriptions|billing|platform|shared
  • layer:domain|application|adapter|framework|infra

三、Prisma 模型(现有表 + outbox + idempotency) prisma/schema.prisma 关键片段:

model users {
  id     String @id @default(uuid())
  email  String @unique
  // ...
}

model plans {
  id       String @id @default(uuid())
  price    Int
  currency String
}

model subscriptions {
  id       String  @id @default(uuid())
  user_id  String
  plan_id  String
  status   String  // 'active' | 'canceled' | 'pending'
  renew_at DateTime?

  @@index([user_id])
  @@index([plan_id])
}

model invoices {
  id               String   @id @default(uuid())
  subscription_id  String
  amount           Int
  status           String   // 'pending' | 'processing' | 'paid' | 'failed' | 'canceled'
  currency         String
  created_at       DateTime @default(now())
  updated_at       DateTime @updatedAt

  @@index([subscription_id])
}

model IdempotencyKey {
  id          String   @id @default(uuid())
  scope       String   // e.g. 'CreateSubscription','ChargeInvoice'
  key         String
  requestHash String?
  status      String   // 'started' | 'completed' | 'expired'
  response    Json?
  created_at  DateTime @default(now())
  updated_at  DateTime @updatedAt

  @@unique([scope, key])
}

model OutboxEvent {
  id          String   @id @default(uuid())
  event_type  String
  aggregateId String?
  payload     Json
  status      String   @default("pending") // 'pending' | 'processing' | 'done' | 'failed'
  created_at  DateTime @default(now())
  available_at DateTime @default(now())
  retry_count Int      @default(0)
}

四、端口接口(Ports) billing/application/src/ports/payment.port.ts

export type PaymentIntent = { id: string; status: 'requires_action' | 'succeeded' | 'failed'; clientSecret?: string };

export interface PaymentPort {
  createPaymentIntent(params: {
    invoiceId: string;
    amount: number;
    currency: string;
    metadata?: Record<string, string>;
    idempotencyKey?: string;
  }): Promise<PaymentIntent>;

  // 用于回调确认场景可选扩展
  retrievePaymentIntent(id: string): Promise<PaymentIntent>;
}

subscriptions/application/src/ports/email.port.ts

export interface EmailPort {
  send(to: string, template: string, data: Record<string, unknown>): Promise<void>;
}

仓储端口(示例)

export interface SubscriptionRepoPort {
  findById(id: string): Promise<Subscription | null>;
  findActiveByUserAndPlan(userId: string, planId: string): Promise<Subscription | null>;
  create(s: Subscription, tx?: PrismaTransaction): Promise<void>;
  cancel(id: string, at: Date, tx?: PrismaTransaction): Promise<void>;
}

export interface InvoiceRepoPort {
  findById(id: string): Promise<Invoice | null>;
  create(i: Invoice, tx?: PrismaTransaction): Promise<void>;
  markProcessing(id: string, tx?: PrismaTransaction): Promise<void>;
  markPaid(id: string, tx?: PrismaTransaction): Promise<void>;
  markFailed(id: string, reason: string, tx?: PrismaTransaction): Promise<void>;
}

五、领域实体与事件(轻量) subscriptions/domain/src/subscription.entity.ts

export class Subscription {
  constructor(
    public readonly id: string,
    public readonly userId: string,
    public readonly planId: string,
    public status: 'active' | 'canceled' | 'pending',
    public renewAt?: Date | null,
  ) {}
}

subscriptions/domain/src/events/subscription.events.ts

export type SubscriptionCreated = {
  type: 'SubscriptionCreated';
  subscriptionId: string;
  userId: string;
  planId: string;
};
export type SubscriptionCanceled = {
  type: 'SubscriptionCanceled';
  subscriptionId: string;
  userId: string;
  at: string;
};

六、用例(Application):CreateSubscription、ChargeInvoice、CancelSubscription DTO 输入输出(class-validator + Swagger 注解,控制器层);应用层内部用 Zod 二次校验(可选)。

subscriptions/application/src/usecases/create-subscription.usecase.ts

import { z } from 'zod';
import { UnitOfWork } from '@app/platform/prisma';
import { DomainEvents } from '@app/platform/messaging';
import { IdempotencyService } from '@app/platform/prisma';
import { SubscriptionRepoPort } from '../ports/subscription.repo.port';

const InputSchema = z.object({
  userId: z.string().uuid(),
  planId: z.string().uuid(),
  idempotencyKey: z.string().optional(),
});

export type CreateSubscriptionInput = z.infer<typeof InputSchema>;
export type CreateSubscriptionOutput = { subscriptionId: string; status: 'active' | 'pending' };

export class CreateSubscriptionUseCase {
  constructor(
    private readonly uow: UnitOfWork,
    private readonly subsRepo: SubscriptionRepoPort,
    private readonly ide: IdempotencyService,
    private readonly events: DomainEvents,
  ) {}

  async execute(input: CreateSubscriptionInput): Promise<CreateSubscriptionOutput> {
    const { userId, planId, idempotencyKey } = InputSchema.parse(input);

    return this.ide.withIdempotency<CreateSubscriptionOutput>('CreateSubscription', idempotencyKey, async () => {
      return this.uow.runInTransaction(async (tx) => {
        // 示例:避免重复订阅
        const existing = await this.subsRepo.findActiveByUserAndPlan(userId, planId);
        if (existing) {
          return { subscriptionId: existing.id, status: existing.status };
        }

        const subId = crypto.randomUUID();
        const sub = new Subscription(subId, userId, planId, 'active', computeRenewAt());
        await this.subsRepo.create(sub, tx);

        await this.events.emitInTx(
          { type: 'SubscriptionCreated', subscriptionId: subId, userId, planId },
          tx,
        );

        return { subscriptionId: subId, status: 'active' };
      });
    });
  }
}

function computeRenewAt() {
  const d = new Date();
  d.setMonth(d.getMonth() + 1);
  return d;
}

billing/application/src/usecases/charge-invoice.usecase.ts

const InputSchema = z.object({
  invoiceId: z.string().uuid(),
  idempotencyKey: z.string().optional(),
});

export class ChargeInvoiceUseCase {
  constructor(
    private readonly uow: UnitOfWork,
    private readonly invoices: InvoiceRepoPort,
    private readonly payments: PaymentPort,
    private readonly ide: IdempotencyService,
  ) {}

  async execute(input: { invoiceId: string; idempotencyKey?: string }) {
    const { invoiceId, idempotencyKey } = InputSchema.parse(input);

    return this.ide.withIdempotency('ChargeInvoice', idempotencyKey, async () => {
      return this.uow.runInTransaction(async (tx) => {
        const inv = await this.invoices.findById(invoiceId);
        if (!inv) throw new UseCaseError('NOT_FOUND', 'Invoice not found');
        if (inv.status === 'paid') return { status: 'paid' };

        await this.invoices.markProcessing(inv.id, tx);
        const intent = await this.payments.createPaymentIntent({
          invoiceId: inv.id,
          amount: inv.amount,
          currency: inv.currency,
          idempotencyKey,
        });

        // 前端可能需要 clientSecret
        return { status: 'processing', paymentIntentId: intent.id, clientSecret: intent.clientSecret };
      });
    });
  }
}

billing/application/src/usecases/confirm-invoice-payment.usecase.ts(供支付回调调用)

export class ConfirmInvoicePaymentUseCase {
  constructor(private readonly uow: UnitOfWork, private readonly invoices: InvoiceRepoPort) {}

  async execute(input: { invoiceId: string; success: boolean; reason?: string }) {
    return this.uow.runInTransaction(async (tx) => {
      if (input.success) await this.invoices.markPaid(input.invoiceId, tx);
      else await this.invoices.markFailed(input.invoiceId, input.reason ?? 'failed', tx);
      return { status: input.success ? 'paid' : 'failed' };
    });
  }
}

subscriptions/application/src/usecases/cancel-subscription.usecase.ts

export class CancelSubscriptionUseCase {
  constructor(
    private readonly uow: UnitOfWork,
    private readonly subsRepo: SubscriptionRepoPort,
    private readonly events: DomainEvents,
  ) {}

  async execute(input: { subscriptionId: string; at?: Date }) {
    const at = input.at ?? new Date();
    return this.uow.runInTransaction(async (tx) => {
      const sub = await this.subsRepo.findById(input.subscriptionId);
      if (!sub) throw new UseCaseError('NOT_FOUND', 'Subscription not found');

      await this.subsRepo.cancel(sub.id, at, tx);
      await this.events.emitInTx({ type: 'SubscriptionCanceled', subscriptionId: sub.id, userId: sub.userId, at: at.toISOString() }, tx);

      return { subscriptionId: sub.id, status: 'canceled' };
    });
  }
}

七、事件与 outbox(轻量领域事件 + BullMQ) platform/messaging/src/domain-events.ts

export type DomainEvent = { type: string; [k: string]: any };

export interface DomainEvents {
  emitInTx(event: DomainEvent, tx: PrismaTransaction): Promise<void>;
  emit(event: DomainEvent): Promise<void>;
  register(type: string, handler: (e: DomainEvent) => Promise<void>): void;
}

简单实现策略:

  • emitInTx: 将事件写入 OutboxEvent,随同业务提交
  • 一个 BullMQ processor 轮询/消费 outbox,分发到注册的内存处理器(或直接发布到事件总线)
  • 同进程内可直接 register 处理跨模块副作用,如 SubscriptionCreated → 生成首张 invoice(billing.adapter-handlers)

billing/adapter-handlers/src/subscription-created.handler.ts

events.register('SubscriptionCreated', async (evt) => {
  await uow.runInTransaction(async (tx) => {
    const invoice = Invoice.createForNewSubscription(evt.subscriptionId, /* amount/currency from plan cache */);
    await invoices.create(invoice, tx);
  });
});

八、适配器:REST 控制器与 HTTP/内存支付 subscriptions/adapter-rest/src/subscriptions.controller.ts

@Controller('subscriptions')
export class SubscriptionsController {
  constructor(private readonly createSub: CreateSubscriptionUseCase) {}

  @Post()
  async create(@Body() dto: { userId: string; planId: string }, @Headers('Idempotency-Key') idem?: string) {
    const res = await this.createSub.execute({ ...dto, idempotencyKey: idem });
    return res;
  }
}

billing/adapter-rest/src/invoices.controller.ts

@Controller('invoices')
export class InvoicesController {
  constructor(private readonly chargeInvoice: ChargeInvoiceUseCase) {}

  @Post(':id/pay')
  async pay(@Param('id') id: string, @Headers('Idempotency-Key') idem?: string) {
    return this.chargeInvoice.execute({ invoiceId: id, idempotencyKey: idem });
  }
}

billing/adapter-rest/src/payments.webhook.controller.ts

@Controller('webhooks/payments')
export class PaymentsWebhookController {
  constructor(private readonly confirm: ConfirmInvoicePaymentUseCase) {}

  @Post()
  async handle(@Body() body: { invoiceId: string; status: 'succeeded' | 'failed'; reason?: string }) {
    const ok = body.status === 'succeeded';
    await this.confirm.execute({ invoiceId: body.invoiceId, success: ok, reason: body.reason });
    return { received: true };
  }
}

billing/infra-payment-http/src/payment.http.adapter.ts

export class HttpPaymentAdapter implements PaymentPort {
  constructor(private readonly http: HttpService) {}

  async createPaymentIntent(params: { invoiceId: string; amount: number; currency: string; idempotencyKey?: string }) {
    const resp = await this.http.post('/payment_intents', params, {
      headers: { 'Idempotency-Key': params.idempotencyKey },
      timeout: 1500,
    }).then(r => r.data);
    return { id: resp.id, status: resp.status, clientSecret: resp.client_secret };
  }

  async retrievePaymentIntent(id: string) {
    const data = await this.http.get(`/payment_intents/${id}`).then(r => r.data);
    return { id: data.id, status: data.status, clientSecret: data.client_secret };
  }
}

billing/infra-payment-inmemory/src/payment.inmemory.adapter.ts(测试替身)

export class InMemoryPaymentAdapter implements PaymentPort {
  private store = new Map<string, any>();

  async createPaymentIntent(p) {
    const id = 'pi_' + crypto.randomUUID();
    const intent = { id, status: 'succeeded' as const, clientSecret: 'secret_' + id };
    this.store.set(id, intent);
    return intent;
  }
  async retrievePaymentIntent(id: string) {
    return this.store.get(id);
  }
}

九、Prisma/事务脚本与 UoW、幂等 platform/prisma/src/unit-of-work.ts

@Injectable()
export class UnitOfWork {
  constructor(private readonly prisma: PrismaService) {}
  async runInTransaction<T>(fn: (tx: Prisma.TransactionClient) => Promise<T>): Promise<T> {
    return this.prisma.$transaction(async (tx) => fn(tx), { timeout: 5000, isolationLevel: 'Serializable' });
  }
}

platform/prisma/src/idempotency.service.ts

@Injectable()
export class IdempotencyService {
  constructor(private readonly prisma: PrismaService) {}

  async withIdempotency<T>(scope: string, key: string | undefined, fn: () => Promise<T>): Promise<T> {
    if (!key) return fn();
    const started = await this.prisma.idempotencyKey.upsert({
      where: { scope_key: { scope, key } },
      update: {},
      create: { scope, key, status: 'started' },
    });
    if (started.status === 'completed' && started.response) return started.response as T;

    const result = await fn();
    await this.prisma.idempotencyKey.update({
      where: { scope_key: { scope, key } },
      data: { status: 'completed', response: result as any },
    });
    return result;
  }
}

十、中间件与异常映射 platform/middleware/src/auth.middleware.ts

@Injectable()
export class AuthMiddleware implements NestMiddleware {
  use(req: any, res: any, next: () => void) {
    // 简化:从 Header 注入 userId,真实场景解析 JWT
    req.context = req.context || {};
    req.context.userId = req.headers['x-user-id'] ?? null;
    next();
  }
}

platform/middleware/src/tenant.middleware.ts

export class TenantMiddleware implements NestMiddleware {
  use(req: any, res: any, next: () => void) {
    req.context = req.context || {};
    req.context.tenantId = req.headers['x-tenant-id'] ?? 'default';
    next();
  }
}

platform/middleware/src/audit.interceptor.ts

@Injectable()
export class AuditInterceptor implements NestInterceptor {
  intercept(ctx: ExecutionContext, next: CallHandler<any>): Observable<any> {
    const req = ctx.switchToHttp().getRequest();
    const start = Date.now();
    return next.handle().pipe(tap(() => {
      logger.info({ path: req.url, method: req.method, userId: req.context?.userId, ms: Date.now() - start });
    }));
  }
}

platform/middleware/src/http-exception.filter.ts

@Catch()
export class HttpErrorMappingFilter implements ExceptionFilter {
  catch(err: any, host: ArgumentsHost) {
    const ctx = host.switchToHttp();
    const res = ctx.getResponse<Response>();
    if (err instanceof UseCaseError) {
      const status = mapUseCaseCodeToStatus(err.code);
      return res.status(status).json({ code: err.code, message: err.message });
    }
    // ZodError/class-validator...
    return res.status(500).json({ code: 'INTERNAL_ERROR', message: 'Unexpected error' });
  }
}

十一、依赖边界校验 .eslintrc.json(片段)

{
  "overrides": [
    {
      "files": ["*.ts"],
      "rules": {
        "@nx/enforce-module-boundaries": [
          "error",
          {
            "enforceBuildableLibDependency": true,
            "allow": [],
            "depConstraints": [
              { "sourceTag": "layer:framework", "onlyDependOnLibsWithTags": ["layer:adapter","layer:framework"] },
              { "sourceTag": "layer:adapter",   "onlyDependOnLibsWithTags": ["layer:application","layer:domain","layer:adapter","scope:platform","scope:shared"] },
              { "sourceTag": "layer:application","onlyDependOnLibsWithTags": ["layer:domain","scope:shared"] },
              { "sourceTag": "layer:domain",    "onlyDependOnLibsWithTags": ["scope:shared"] }
            ]
          }
        ]
      }
    }
  ]
}

dependency-cruiser(.dependency-cruiser.js)(片段)

module.exports = {
  forbidden: [
    { name: 'no-circular', severity: 'error', from: {}, to: { circular: true } },
    { name: 'no-layer-violation', severity: 'error',
      from: { path: 'libs/.+', pathNot: '\\.spec\\.ts$',
              pathNot: 'libs/clients/.+' },
      to:   { path: 'libs/.+', pathRegexNot: (from) => {
                // 自定义:禁止从内层到外层、禁止跨 scope 直接 adapter->adapter
              } }
    },
  ],
};

建议用 tags 维持:项目创建 libs 时写入 project.json 的 tags,例如 ["scope:subscriptions","layer:application"]。

十二、OpenAPI 与客户端生成

  • 在 apps/api/src/bootstrap/swagger.ts 配置 SwaggerModule
  • 脚本:
    • "openapi:gen": "node tools/scripts/generate-openapi.js"
    • generate-openapi.js:从 http://localhost:3000/openapi-json 拉取写入 openapi.json
    • 前端/服务端客户端生成:使用 openapi-typescript 和 openapi-fetch 到 libs/clients
  • 控制器使用 Swagger 注解,DTO 用 class-validator 生成 schema

十三、测试策略(>70% 覆盖)

  • 单元测试(用例层):对 CreateSubscription/ChargeInvoice/CancelSubscription 使用 InMemoryPaymentAdapter + 测试用 InMemoryRepo(或 Prisma test tx + sqlite/pg 测试容器)
  • Pact(或类似):对 PaymentPort 的 HTTP 适配器做 provider/consumer 合同测试
  • E2E:supertest 跑关键路径(POST /subscriptions,POST /invoices/:id/pay,GET /users/:id/subscriptions)
  • 覆盖 outbox 处理器的集成测试(使用 Redis 容器)
  • Docker Compose(本地集成):Postgres 15、Redis 7、api 服务
  • CI:lint + dep-cruiser + test + e2e + Pact verify

十四、性能与可运维

  • P95<200ms
    • 控制器瘦:只做 DTO/调用用例
    • 读路径缓存:plans 放 Redis(60s TTL);用户订阅列表可先直查(加索引)
    • Prisma 连接池与 Node.js 线程池参数调优;设置查询超时
    • BullMQ 队列需要独立并发与重试策略,避免阻塞请求
  • 观测
    • 请求日志(pino)+ X-Request-Id
    • BullMQ 指标(活跃/失败/等待)暴露到 Prometheus(可选简单日志)
    • 统一错误码

十五、控制器与查询用例(示例) users/adapter-rest/src/users.controller.ts

@Controller('users')
export class UsersController {
  constructor(private readonly queryBus: UsersQueryService) {}

  @Get(':id/subscriptions')
  async listSubscriptions(@Param('id') userId: string) {
    return this.queryBus.listUserSubscriptions(userId);
  }
}

UsersQueryService 可以位于 users/application(读模型),内部可直接用 prisma 只读查询或通过订阅仓储暴露的查询接口。查询属于 adapter/application 均可,但不要从 users 反向依赖 subscriptions。

十六、装配(AppModule) apps/api/src/app.module.ts(绑定端口到适配器实现)

@Module({
  imports: [
    // Adapter modules only. Application/Domain are libs providers.
  ],
  providers: [
    PrismaService, UnitOfWork, IdempotencyService, DomainEventsImpl, OutboxService,
    // 配置:PaymentPort -> HttpPaymentAdapter 或 InMemoryPaymentAdapter
    { provide: PaymentPortToken, useClass: HttpPaymentAdapter },
    // EmailPort -> SMTP/Mock
  ],
  controllers: [
    SubscriptionsController, InvoicesController, PaymentsWebhookController, UsersController
  ]
})
export class AppModule {}

十七、迁移与数据对齐

  • 为 invoices.status、subscriptions.status 建立 check 或枚举约束(Prisma 可用 enum)
  • 加索引:subscriptions(user_id), invoices(subscription_id)
  • outbox_events、idempotency_keys 表迁移到现有数据库
  • 读场景 GET /users/{id}/subscriptions:基于 subscriptions 表直接查询(附上 plan 信息)

十八、两周落地计划(可运行骨架)

  • D1-D2:Nx 工作区初始化、libs 结构与 tags、ESLint + dependency-cruiser、Docker Compose(PG/Redis)
  • D3-D4:Prisma 模型与迁移、PrismaService/UnitOfWork/Idempotency/Outbox
  • D5-D6:Domain/Ports/UseCases(3 个用例 + 仓储接口);InMemory 适配器;单元测试绿
  • D7:REST 控制器 + DTO + 异常映射;OpenAPI 暴露
  • D8:Billing HTTP 支付适配器 + Webhook;Pact 合同测试
  • D9:事件与 outbox 处理器(SubscriptionCreated→Invoice);队列指标
  • D10:中间件(Auth/Tenant/Audit);请求日志
  • D11-D12:E2E 测试、性能小压测、缓存 plans
  • D13-D14:CI 集成、文档与开发脚手架(生成器 stub)

十九、价值回顾(针对你的痛点)

  • 控制器瘦身:业务统一进入 UseCase;事务脚本封装在 UoW
  • 断绝跨模块环依赖:只能向内依赖;跨模块互动用事件或对外仅依赖对方“用例接口/端口”
  • 可测试性:用例层依赖端口接口,配 InMemory 即可单测;支付可用替身适配器或 Pact 校验
  • 保留演进路径:每个模块是“可独立发布”的单元(层清晰 + 端口明确 + outbox/队列独立),将来可拆为进程或服务时,适配器替换为 RPC/HTTP 即可

二十、典型请求流(概念示例)

  • POST /subscriptions
    • Controller → CreateSubscriptionUseCase → 事务内写 subscriptions → outbox 写 SubscriptionCreated → 返回 200
    • Outbox worker 异步消费 → Billing handler 创建首张 invoice
  • POST /invoices/{id}/pay
    • Controller → ChargeInvoiceUseCase → 标记 processing → 调 PaymentPort.createPaymentIntent → 返回 clientSecret
    • 支付平台回调 /webhooks/payments → ConfirmInvoicePaymentUseCase → mark paid/failed
  • GET /users/{id}/subscriptions
    • Controller → UsersQueryService → 读 subscriptions(可缓存)

如需,我可以进一步提供:

  • 完整 Nest Providers 装配清单(tokens、useFactory)
  • Nx 自定义 generator 样板(按 scope/layer 生成模块)
  • 示例测试(Jest 单测 + Pact 脚手架)
  • Docker Compose 与本地启动脚本
  • OpenAPI 生成脚本与客户端示例用法

这套骨架能在两周内产出“可运行、可测、可扩展”的模块化单体,并清晰沉淀用户、账单、订阅边界,为后续独立部署与微服务演进做好准备。

下面给出一套可端到端运行的概念性实现,体现“零信任与多租户隔离优先”的架构原则,并与您的技术栈一致。内容包含:租户解析中间件、自动注入租户过滤的SQLAlchemy查询基类/Hook、PostgreSQL RLS策略迁移脚本、OPA策略示例与客户端封装、审计日志拦截器、Dashboard/Report API与跨租户只读管理端点、测试用例,以及Docker Compose用于一键启动。该方案分层落实以下要点:

  • 唯一信任根:OIDC短时JWT(PyJWT)校验;统一以JWT中的tenant_id与scope为准,禁用X-Tenant-Id。
  • 授权:OPA/Rego ABAC(主体/资源/环境),默认拒绝;策略热加载(OPA --watch)。
  • 数据库强隔离:PostgreSQL RLS启用并FORCE,强制tenant过滤;可受控提升跨租户只读(通过app.platform_read会话变量+RLS仅SELECT)。
  • 应用层双重保障:租户解析中间件+SQLAlchemy自动租户过滤(with_loader_criteria),默认拒绝越权;跨租户路径仅当scope=platform:read时启用平台只读标志并交由RLS控制。
  • 审计与可观测:结构化日志、审计流水表拦截器、OpenTelemetry贯通HTTP/DB/HTTPX(OPA)。

目录结构(示例)

  • docker-compose.yml
  • opa/policy.rego
  • app/
    • main.py
    • settings.py
    • security.py
    • middleware.py
    • opa_client.py
    • db.py
    • models.py
    • schemas.py
    • deps.py
    • audit.py
    • routers/
      • dashboards.py
      • reports.py
      • admin.py
  • alembic/
    • env.py
    • versions/
      • 20250101_enable_rls.py
  • tests/
    • test_multi_tenant.py

一、Docker Compose(Postgres + OPA + 应用) 文件:docker-compose.yml 内容:

  • postgres:15
  • opa: 本地policy挂载,--server --watch
  • app: FastAPI + Uvicorn,环境变量注入JWT_SECRET等

二、OPA策略(ABAC,默认拒绝,策略热更新) 文件:opa/policy.rego 说明:

  • package app
  • 入口规则 allow,默认false
  • 细粒度:tenant_admin可基于允许的labels进行视图/报表的查看;platform审计员(scope: platform:read)允许访问/analytics/admin/*只读场景
  • 返回标签过滤(可选)label_filter用于列表端点查询优化

示例: package app

default allow := false

输入约定

input = {

"subject": {"sub": "...", "tenant_id": "t1", "roles": ["user"|"tenant_admin"], "scopes": ["..."], "allowed_labels": ["finance","sales"]},

"resource": {"type": "dashboard"|"report", "tenant_id": "t1", "labels": ["..."], "visibility": "private"|"tenant"|"public", "path": "/api/..", "method": "GET"},

"action": "read"|"create"|"update"|"delete"|"list",

"env": {"now": "...", "ip": "..."}

}

平台审计员跨租户只读,仅限admin路径与GET

allow { some s s := input.subject.scopes[_] s == "platform:read" startswith(input.resource.path, "/analytics/admin/") input.resource.method == "GET" }

同租户访问控制(非平台审计)

allow { input.resource.tenant_id == input.subject.tenant_id allow_by_labels allow_by_visibility action_allowed }

标签约束:若策略启用标签过滤,资源应包含subject.allowed_labels之一(例:labels为字符串数组)

allow_by_labels { not input.subject.allowed_labels # 未配置则不限制 } else { some l l := input.subject.allowed_labels[] l == input.resource.labels[] }

可见性规则示例(可扩展)

allow_by_visibility { input.resource.type == "report" vis := input.resource.visibility vis == "tenant" or vis == "public" } else = true { input.resource.type == "dashboard" } # Dashboard默认允许同租户

动作与角色映射(示例)

action_allowed { a := input.action a == "read" or a == "list" } else {

写操作仅限tenant_admin

a := input.action a == "create" or a == "update" or a == "delete" some r r := input.subject.roles[_] r == "tenant_admin" }

列表过滤建议(返回一个允许的标签集合,供客户端作为DB过滤)

label_filter := lf { lf := {"enabled": true, "labels": input.subject.allowed_labels} }

三、应用配置与安全 文件:app/settings.py import os

class Settings: APP_NAME = "saas-analytics" JWT_ALG = os.getenv("JWT_ALG", "HS256") JWT_SECRET = os.getenv("JWT_SECRET", "dev-secret") # 演示用,生产请用JWKS JWT_AUDIENCE = os.getenv("JWT_AUD", "saas-analytics") OPA_URL = os.getenv("OPA_URL", "http://opa:8181/v1/data/app/allow") OPA_LABELS_URL = os.getenv("OPA_LABELS_URL", "http://opa:8181/v1/data/app/label_filter") DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+psycopg2://postgres:postgres@db:5432/saas") OTEL_SERVICE_NAME = os.getenv("OTEL_SERVICE_NAME", "saas-analytics") OTEL_EXPORTER_OTLP_ENDPOINT = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "") LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")

settings = Settings()

文件:app/security.py import time from typing import Dict, Any, List import jwt from fastapi import HTTPException, status

def verify_and_parse_jwt(token: str, secret: str, algorithms: List[str], audience: str | None = None) -> Dict[str, Any]: try: options = {"verify_aud": bool(audience)} claims = jwt.decode( token, secret, algorithms=algorithms, audience=audience if audience else None, options=options, ) # 必要最小字段 for k in ["sub", "tenant_id", "scope", "exp", "iat"]: if k not in claims: raise ValueError(f"missing claim: {k}") if claims["exp"] < time.time(): raise ValueError("token expired") # 规范化 scopes = claims["scope"].split() if isinstance(claims["scope"], str) else claims["scope"] claims["scopes"] = scopes claims.setdefault("roles", []) claims.setdefault("allowed_labels", []) return claims except Exception as e: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="invalid_token") from e

四、数据库与ORM(RLS、自动租户过滤、会话变量) 文件:app/db.py from sqlalchemy import create_engine, event, text from sqlalchemy.orm import sessionmaker, DeclarativeBase, Mapped, mapped_column from sqlalchemy import String from contextvars import ContextVar from app.settings import settings

请求上下文变量

current_tenant: ContextVar[str | None] = ContextVar("current_tenant", default=None) platform_read: ContextVar[bool] = ContextVar("platform_read", default=False)

engine = create_engine(settings.DATABASE_URL, pool_pre_ping=True, future=True) SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False, future=True)

class Base(DeclarativeBase): pass

class TenantScopedMixin: tenant_id: Mapped[str] = mapped_column(String, index=True)

事务开始时设置PG会话变量,RLS依赖

@event.listens_for(SessionLocal, "after_begin") def set_pg_vars(session, transaction, connection): tid = current_tenant.get() pr = platform_read.get() # SET LOCAL 在事务内有效 connection.exec_driver_sql("SELECT set_config('app.tenant_id', %s, true)", (str(tid) if tid else "",)) connection.exec_driver_sql("SELECT set_config('app.platform_read', %s, true)", ('1' if pr else '0',))

ORM级自动租户过滤(双重保障)

from sqlalchemy.orm import with_loader_criteria from sqlalchemy.sql import Select from sqlalchemy.orm import Session from sqlalchemy import false

@event.listens_for(SessionLocal, "do_orm_execute") def _add_tenant_filter(execute_state): # 仅对SELECT自动注入过滤;平台只读提升时跳过,由RLS控制 if execute_state.is_select and not platform_read.get(): tid = current_tenant.get() if not tid: # 无租户时构造永不返回 execute_state.statement = execute_state.statement.where(false()) return stmt = execute_state.statement execute_state.statement = stmt.options( with_loader_criteria(TenantScopedMixin, lambda cls: cls.tenant_id == tid, include_aliases=True) )

文件:app/models.py from sqlalchemy.orm import Mapped, mapped_column, relationship from sqlalchemy import String, Integer, JSON, Text, TIMESTAMP, func from app.db import Base, TenantScopedMixin

class Tenant(Base): tablename = "tenants" id: Mapped[str] = mapped_column(String, primary_key=True) name: Mapped[str] = mapped_column(String, nullable=False)

class User(Base, TenantScopedMixin): tablename = "users" id: Mapped[str] = mapped_column(String, primary_key=True) role: Mapped[str] = mapped_column(String, nullable=False) # user | tenant_admin | auditor # tenant_id in mixin

class Dashboard(Base, TenantScopedMixin): tablename = "dashboards" id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) labels: Mapped[list] = mapped_column(JSON, default=list) # 存储字符串数组

class Report(Base, TenantScopedMixin): tablename = "reports" id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) visibility: Mapped[str] = mapped_column(String, default="tenant") # public | tenant | private

class AuditLog(Base): tablename = "audit_logs" id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) actor: Mapped[str] = mapped_column(String) action: Mapped[str] = mapped_column(String) resource: Mapped[str] = mapped_column(String) tenant_id: Mapped[str] = mapped_column(String, index=True) at: Mapped[str] = mapped_column(TIMESTAMP(timezone=True), server_default=func.now()) extra: Mapped[dict | None] = mapped_column(JSON, nullable=True)

文件:app/schemas.py from pydantic import BaseModel, Field from typing import List, Optional

class DashboardIn(BaseModel): labels: List[str] = Field(default_factory=list)

class DashboardOut(BaseModel): id: int tenant_id: str labels: List[str]

class ReportIn(BaseModel): visibility: str = "tenant"

class ReportOut(BaseModel): id: int tenant_id: str visibility: str

五、OPA客户端封装(REST决策API) 文件:app/opa_client.py import httpx from app.settings import settings from typing import Any, Dict

class OPAClient: def init(self, base_url: str = settings.OPA_URL, labels_url: str = settings.OPA_LABELS_URL): self.base_url = base_url self.labels_url = labels_url self._client = httpx.AsyncClient(timeout=0.2) # 限时,保障P95

async def allow(self, subject: Dict[str, Any], resource: Dict[str, Any], action: str, env: Dict[str, Any]) -> bool:
    payload = {"input": {"subject": subject, "resource": resource, "action": action, "env": env}}
    r = await self._client.post(self.base_url, json=payload)
    r.raise_for_status()
    return bool(r.json().get("result", False))

async def label_filter(self, subject: Dict[str, Any], resource_type: str) -> dict:
    payload = {"input": {"subject": subject, "resource": {"type": resource_type}, "action": "list", "env": {}}}
    r = await self._client.post(self.labels_url, json=payload)
    r.raise_for_status()
    return r.json().get("result", {"enabled": False})

opa_client = OPAClient()

六、中间件与依赖(JWT解析、上下文注入、审计拦截、DB Session) 文件:app/middleware.py import json import time from typing import Callable from fastapi import Request, Response from starlette.middleware.base import BaseHTTPMiddleware from app.security import verify_and_parse_jwt from app.settings import settings from app.db import current_tenant, platform_read

class JWTContextMiddleware(BaseHTTPMiddleware): async def dispatch(self, request: Request, call_next: Callable): # 禁用X-Tenant-Id,记录告警(不作为信任依据) if "x-tenant-id" in request.headers: request.state.legacy_tenant_header_present = True # 解析Bearer auth = request.headers.get("authorization", "") if not auth.lower().startswith("bearer "): return Response(status_code=401, content="missing_bearer") token = auth.split(" ", 1)[1] claims = verify_and_parse_jwt(token, settings.JWT_SECRET, [settings.JWT_ALG], settings.JWT_AUDIENCE) # 注入上下文 token_t = current_tenant.set(claims["tenant_id"]) try: # 平台只读提升仅在admin路径且scope包含platform:read时启用 is_admin_path = request.url.path.startswith("/analytics/admin/") pr = is_admin_path and ("platform:read" in claims["scopes"]) and request.method.upper() == "GET" pr_token = platform_read.set(pr) request.state.claims = claims request.state.platform_read = pr response = await call_next(request) finally: current_tenant.reset(token_t) platform_read.reset(pr_token) return response

文件:app/deps.py from typing import Generator from sqlalchemy.orm import Session from fastapi import Depends from app.db import SessionLocal from fastapi import Request

def get_db() -> Generator[Session, None, None]: db = SessionLocal() try: yield db db.commit() except: db.rollback() raise finally: db.close()

def get_subject(request: Request): return request.state.claims

def is_platform_read(request: Request) -> bool: return bool(getattr(request.state, "platform_read", False))

文件:app/audit.py import json import logging from starlette.middleware.base import BaseHTTPMiddleware from fastapi import Request from app.db import SessionLocal from app.models import AuditLog

logger = logging.getLogger("audit")

class AuditMiddleware(BaseHTTPMiddleware): async def dispatch(self, request: Request, call_next): start = request.headers.get("x-request-id", None) claims = getattr(request.state, "claims", None) response = await call_next(request) try: # 异步/后台批量更优,这里简单直写 db = SessionLocal() db.add(AuditLog( actor=claims["sub"] if claims else "anonymous", action=f"{request.method} {request.url.path}", resource=request.url.path, tenant_id=(claims["tenant_id"] if claims else ""), extra={ "status": response.status_code, "legacy_header": getattr(request.state, "legacy_tenant_header_present", False), "platform_read": getattr(request.state, "platform_read", False), } )) db.commit() db.close() except Exception as e: logger.exception("audit_log_failed") return response

七、API路由(Dashboard/Report + 管理端跨租户只读) 文件:app/routers/dashboards.py from fastapi import APIRouter, Depends, HTTPException, Request from sqlalchemy.orm import Session from sqlalchemy import select from app.models import Dashboard from app.schemas import DashboardIn, DashboardOut from app.deps import get_db, get_subject from app.opa_client import opa_client

router = APIRouter(prefix="/dashboards", tags=["dashboards"])

@router.get("", response_model=list[DashboardOut]) async def list_dashboards(db: Session = Depends(get_db), subject=Depends(get_subject), request: Request = None): # 调OPA获取label过滤建议 lf = await opa_client.label_filter(subject, "dashboard") stmt = select(Dashboard) if lf.get("enabled") and lf.get("labels"): # 简化:labels数组与subject.allowed_labels求交集(Postgres jsonb '?|' 操作),用text避免复杂表达式 labels = lf["labels"] stmt = stmt.where(Dashboard.labels.op("?|")(labels)) rows = db.execute(stmt).scalars().all() # ABAC逐条检查(保证默认拒绝) out = [] for d in rows: allowed = await opa_client.allow( subject, {"type": "dashboard", "tenant_id": d.tenant_id, "labels": d.labels, "path": str(request.url.path), "method": "GET"}, "read", {"ip": request.client.host if request.client else ""} ) if allowed: out.append(DashboardOut(id=d.id, tenant_id=d.tenant_id, labels=d.labels)) return out

@router.post("", response_model=DashboardOut) async def create_dashboard(payload: DashboardIn, db: Session = Depends(get_db), subject=Depends(get_subject), request: Request = None): ok = await opa_client.allow( subject, {"type": "dashboard", "tenant_id": subject["tenant_id"], "labels": payload.labels, "path": str(request.url.path), "method": "POST"}, "create", {} ) if not ok: raise HTTPException(403, "forbidden") d = Dashboard(tenant_id=subject["tenant_id"], labels=payload.labels) db.add(d) db.flush() return DashboardOut(id=d.id, tenant_id=d.tenant_id, labels=d.labels)

文件:app/routers/reports.py from fastapi import APIRouter, Depends, HTTPException, Request from sqlalchemy.orm import Session from sqlalchemy import select from app.models import Report from app.schemas import ReportIn, ReportOut from app.deps import get_db, get_subject from app.opa_client import opa_client

router = APIRouter(prefix="/reports", tags=["reports"])

@router.get("", response_model=list[ReportOut]) async def list_reports(db: Session = Depends(get_db), subject=Depends(get_subject), request: Request = None): rows = db.execute(select(Report)).scalars().all() out = [] for r in rows: ok = await opa_client.allow( subject, {"type": "report", "tenant_id": r.tenant_id, "visibility": r.visibility, "path": str(request.url.path), "method": "GET"}, "read", {} ) if ok: out.append(ReportOut(id=r.id, tenant_id=r.tenant_id, visibility=r.visibility)) return out

@router.post("", response_model=ReportOut) async def create_report(payload: ReportIn, db: Session = Depends(get_db), subject=Depends(get_subject), request: Request = None): ok = await opa_client.allow( subject, {"type": "report", "tenant_id": subject["tenant_id"], "visibility": payload.visibility, "path": str(request.url.path), "method": "POST"}, "create", {} ) if not ok: raise HTTPException(403, "forbidden") r = Report(tenant_id=subject["tenant_id"], visibility=payload.visibility) db.add(r) db.flush() return ReportOut(id=r.id, tenant_id=r.tenant_id, visibility=r.visibility)

文件:app/routers/admin.py(跨租户只读) from fastapi import APIRouter, Depends, HTTPException, Request from sqlalchemy.orm import Session from sqlalchemy import select from app.models import Report from app.deps import get_db, get_subject, is_platform_read from app.opa_client import opa_client from app.schemas import ReportOut

router = APIRouter(prefix="/analytics/admin", tags=["admin"])

@router.get("/reports", response_model=list[ReportOut]) async def list_reports_all(db: Session = Depends(get_db), subject=Depends(get_subject), pr: bool = Depends(is_platform_read), request: Request = None): # 平台只读由JWT中scope + 中间件设置platform_read + RLS控制。这里再做OPA显式授权。 if not pr: raise HTTPException(403, "forbidden") ok = await opa_client.allow( subject, {"type": "report", "tenant_id": "*", "path": str(request.url.path), "method": "GET"}, "read", {} ) if not ok: raise HTTPException(403, "forbidden") rows = db.execute(select(Report)).scalars().all() # ORM不再添加租户过滤,依赖RLS允许SELECT跨租户 return [ReportOut(id=r.id, tenant_id=r.tenant_id, visibility=r.visibility) for r in rows]

八、应用主入口与可观测性 文件:app/main.py import logging, sys from fastapi import FastAPI from app.middleware import JWTContextMiddleware from app.audit import AuditMiddleware from app.routers import dashboards, reports, admin from app.settings import settings

结构化日志

handler = logging.StreamHandler(sys.stdout) formatter = logging.Formatter('{"ts":"%(asctime)s","level":"%(levelname)s","logger":"%(name)s","msg":"%(message)s"}') handler.setFormatter(formatter) root = logging.getLogger() root.setLevel(settings.LOG_LEVEL) root.handlers = [handler]

app = FastAPI(title=settings.APP_NAME)

中间件顺序:JWT解析 -> 审计

app.add_middleware(JWTContextMiddleware) app.add_middleware(AuditMiddleware)

路由

app.include_router(dashboards.router) app.include_router(reports.router) app.include_router(admin.router)

OpenTelemetry(可选)

try: from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor from app.db import engine FastAPIInstrumentor.instrument_app(app) SQLAlchemyInstrumentor().instrument(engine=engine.sync_engine) HTTPXClientInstrumentor().instrument() except Exception: pass

九、Alembic迁移(RLS与策略) 文件:alembic/versions/20250101_enable_rls.py from alembic import op import sqlalchemy as sa

revision = "20250101_enable_rls" down_revision = None branch_labels = None depends_on = None

def upgrade(): # 基础表(简化) op.execute(""" CREATE TABLE IF NOT EXISTS tenants(id text primary key, name text not null); CREATE TABLE IF NOT EXISTS users(id text primary key, tenant_id text not null, role text not null); CREATE INDEX IF NOT EXISTS idx_users_tenant ON users(tenant_id); CREATE TABLE IF NOT EXISTS dashboards(id serial primary key, tenant_id text not null, labels jsonb default '[]'::jsonb); CREATE INDEX IF NOT EXISTS idx_dashboards_tenant ON dashboards(tenant_id); CREATE TABLE IF NOT EXISTS reports(id serial primary key, tenant_id text not null, visibility text default 'tenant'); CREATE INDEX IF NOT EXISTS idx_reports_tenant ON reports(tenant_id); CREATE TABLE IF NOT EXISTS audit_logs(id serial primary key, actor text, action text, resource text, tenant_id text, at timestamptz default now(), extra jsonb); """)

# 启用RLS
for tbl in ["users","dashboards","reports","audit_logs","tenants"]:
    op.execute(f"ALTER TABLE {tbl} ENABLE ROW LEVEL SECURITY")
    op.execute(f"ALTER TABLE {tbl} FORCE ROW LEVEL SECURITY")

# 统一默认拒绝,不创建默认allow策略
# 同租户策略(读/写)
for tbl in ["users","dashboards","reports","audit_logs","tenants"]:
    op.execute(f"""
    DROP POLICY IF EXISTS {tbl}_tenant_select ON {tbl};
    CREATE POLICY {tbl}_tenant_select ON {tbl} FOR SELECT
    USING (tenant_id::text = current_setting('app.tenant_id', true));
    """) if tbl != "tenants" else op.execute("""
    DROP POLICY IF EXISTS tenants_tenant_select ON tenants;
    CREATE POLICY tenants_tenant_select ON tenants FOR SELECT
    USING (id::text = current_setting('app.tenant_id', true));
    """)

for tbl in ["users","dashboards","reports","audit_logs"]:
    op.execute(f"""
    DROP POLICY IF EXISTS {tbl}_tenant_modify ON {tbl};
    CREATE POLICY {tbl}_tenant_modify ON {tbl} FOR ALL
    USING (tenant_id::text = current_setting('app.tenant_id', true))
    WITH CHECK (tenant_id::text = current_setting('app.tenant_id', true));
    """)

# 平台跨租户只读(仅SELECT)
for tbl in ["users","dashboards","reports","audit_logs","tenants"]:
    op.execute(f"""
    DROP POLICY IF EXISTS {tbl}_platform_read ON {tbl};
    CREATE POLICY {tbl}_platform_read ON {tbl} FOR SELECT
    USING (current_setting('app.platform_read', true) = '1');
    """)

def downgrade(): for tbl in ["users","dashboards","reports","audit_logs","tenants"]: op.execute(f"ALTER TABLE {tbl} DISABLE ROW LEVEL SECURITY") # 可选:删除策略

说明:

  • 默认拒绝:未满足任一USING条件即不可见。
  • 同租户策略使用current_setting('app.tenant_id', true);若未设置则返回NULL,不匹配,从而拒绝。
  • 平台跨租户读使用current_setting('app.platform_read', true) = '1'仅允许SELECT。
  • FORCE RLS确保表拥有者也受RLS约束(非超级用户)。

十、测试用例(关键路径) 文件:tests/test_multi_tenant.py import jwt, time from fastapi.testclient import TestClient from app.main import app from app.settings import settings from app.db import SessionLocal from app.models import Dashboard, Report

def make_token(tenant_id: str, scopes: list[str], roles: list[str]=None, allowed_labels: list[str]=None): now = int(time.time()) payload = { "sub": f"user-{tenant_id}", "tenant_id": tenant_id, "scope": " ".join(scopes), "roles": roles or ["user"], "allowed_labels": allowed_labels or [], "iat": now, "exp": now + 300, "aud": settings.JWT_AUDIENCE } return jwt.encode(payload, settings.JWT_SECRET, algorithm=settings.JWT_ALG)

client = TestClient(app)

def seed(): db = SessionLocal() db.add_all([ Dashboard(tenant_id="t1", labels=["finance"]), Dashboard(tenant_id="t1", labels=["sales"]), Dashboard(tenant_id="t2", labels=["finance"]), ]) db.add_all([ Report(tenant_id="t1", visibility="tenant"), Report(tenant_id="t2", visibility="public"), ]) db.commit() db.close()

def test_tenant_isolation(): seed() tok_t1 = make_token("t1", scopes=["openid"]) r = client.get("/dashboards", headers={"Authorization": f"Bearer {tok_t1}"}) assert r.status_code == 200 assert all(d["tenant_id"] == "t1" for d in r.json())

# t1不可越权到t2
tok_t2 = make_token("t2", scopes=["openid"])
r2 = client.get("/dashboards", headers={"Authorization": f"Bearer {tok_t2}"})
assert r2.status_code == 200
assert all(d["tenant_id"] == "t2" for d in r2.json())

def test_platform_read_cross_tenant(): tok_platform = make_token("t0", scopes=["platform:read"]) r = client.get("/analytics/admin/reports", headers={"Authorization": f"Bearer {tok_platform}"}) assert r.status_code == 200 # 跨租户两条都可见(RLS platform_read允许SELECT) assert len(r.json()) >= 2

def test_legacy_header_ignored(): tok_t1 = make_token("t1", scopes=["openid"]) # 强制带错的X-Tenant-Id,不应被使用 r = client.get("/dashboards", headers={"Authorization": f"Bearer {tok_t1}", "X-Tenant-Id": "t2"}) assert r.status_code == 200 assert all(d["tenant_id"] == "t1" for d in r.json())

十一、启动方式

  • 初始化数据库与策略:alembic upgrade head
  • 启动:docker-compose up -d(Postgres, OPA, App)
  • 健康验证:
    • 准备JWT(HS256 dev-secret),调用/dashboards、/reports
    • 使用scope=platform:read调用/analytics/admin/reports验证跨租户只读
  • 策略热更新:修改opa/policy.rego,OPA以--watch运行将自动生效;应用无需重启

十二、关键设计点与实践价值

  • 零信任与统一信任根:JWT校验严格化(exp/aud/claims齐备),历史X-Tenant-Id退役仅用于审计记录,避免再次成为隐式信任源。
  • 双层隔离(应用+数据库):
    • 应用层:SQLAlchemy do_orm_execute自动注入with_loader_criteria,任何遗漏手写where的查询仍被限定tenant_id;跨租户管理端严格依赖platform_read上下文才跳过应用层过滤。
    • 数据库层:RLS为最终裁决,缺少会话变量即拒绝;即使开发者误写查询,RLS仍阻断越权。FORCE RLS确保表所有者也受控。
  • 受控提升与最小权限:仅在GET /analytics/admin/*且scope=platform:read才将app.platform_read设置为1,RLS仅放开SELECT;写操作无任何提升通道。
  • ABAC策略即代码:OPA/Rego默认拒绝;主体/资源/环境三元输入;列表端点通过label_filter指导DB层过滤以减噪并保性能;单项资源再调用allow二次确认,避免误放权。
  • 可观察与性能:HTTP、DB、HTTPX(OPA)均接入Tracing;OPA调用设置短超时;结构化审计流水表记录每次访问结果、是否带了遗留头、是否平台提升;P95<200ms实践建议:缓存OPA label_filter短期结果、合理连接池与索引、限制标签数量及响应长度。
  • 策略热更新:OPA --watch监测rego变更即刻生效;应用无状态从OPA取决策,天然热更新。

十三、迁移与上线顺序(与需求一致)

  1. 接入JWT中间件并禁用X-Tenant-Id(本实现JWTContextMiddleware已完成;审计记录遗留头出现)
  2. 为各表添加RLS与默认策略(Alembic脚本)
  3. 关键端点接入OPA授权(路由中已对读/写与admin端点调用opa_client.allow)
  4. ORM查询基类/Hook注入租户过滤与审计(TenantScopedMixin + do_orm_execute;审计中间件)

说明与扩展

  • 生产JWT验证建议切换至JWKS(PyJWT+缓存JWKS),校验kid、iss、aud。
  • OPA策略可拆分bundle并加签;可返回更复杂的查询约束(如数据域、时间窗)并下推到SQL层。
  • 审计建议异步写入(队列或批量)并对关键事件(提升、拒绝)高亮告警。
  • 更严密的DB会话变量设置可使用SET LOCAL app.tenant_id=...于每次事务开始,当前实现已在after_begin事件设置;对只读端点强制BEGIN READ ONLY可进一步加固。
  • JSONB标签查询可建立GIN索引以达成P95目标:CREATE INDEX idx_dashboards_labels_gin ON dashboards USING GIN(labels jsonb_path_ops);

至此,完整链路已满足:JWT为唯一信任根;ABAC默认拒绝;RLS强隔离与平台受控只读;应用层双重保障;热更新策略;全链路审计与可观测。

示例详情

该提示词已被收录:
“程序员必备:提升开发效率的专业AI提示词合集”
让 AI 成为你的第二双手,从代码生成到测试文档全部搞定,节省 80% 开发时间
√ 立即可用 · 零学习成本
√ 参数化批量生成
√ 专业提示词工程师打磨

📖 如何使用

30秒出活:复制 → 粘贴 → 搞定
与其花几十分钟和AI聊天、试错,不如直接复制这些经过千人验证的模板,修改几个 {{变量}} 就能立刻获得专业级输出。省下来的时间,足够你轻松享受两杯咖啡!
加载中...
💬 不会填参数?让 AI 反过来问你
不确定变量该填什么?一键转为对话模式,AI 会像资深顾问一样逐步引导你,问几个问题就能自动生成完美匹配你需求的定制结果。零门槛,开口就行。
转为对话模式
🚀 告别复制粘贴,Chat 里直接调用
无需切换,输入 / 唤醒 8000+ 专家级提示词。 插件将全站提示词库深度集成于 Chat 输入框。基于当前对话语境,系统智能推荐最契合的 Prompt 并自动完成参数化,让海量资源触手可及,从此彻底告别"手动搬运"。
即将推出
🔌 接口一调,提示词自己会进化
手动跑一次还行,跑一百次呢?通过 API 接口动态注入变量,接入批量评价引擎,让程序自动迭代出更高质量的提示词方案。Prompt 会自己进化,你只管收结果。
发布 API
🤖 一键变成你的专属 Agent 应用
不想每次都配参数?把这条提示词直接发布成独立 Agent,内嵌图片生成、参数优化等工具,分享链接就能用。给团队或客户一个"开箱即用"的完整方案。
创建 Agent

✅ 特性总结

轻松生成架构指导:通过简单输入,即可获得针对特定框架和需求的架构原则应用指导,快速优化项目设计。
智能解决疑难杂症:按需生成概念性示例,高效解析复杂系统挑战,助力定位问题解决方案。
多场景适配:兼容多种项目框架与开发需求,灵活满足各种技术栈团队的实战需求。
深入理解设计模式:直观展示设计思路,帮助团队成员快速掌握架构模式的实践价值。
快捷迭代优化:结合项目实际挑战,提供清晰可实践的架构建议,支持敏捷开发目标。
降低学习与实践门槛:无需高深专业背景,轻松理解复杂架构思想,高效转化为实操。
全面支持开发者协作:模版化内容适合团队内部培训,助力协作更高效。
提升项目稳健性:通过应用成熟架构原则,增强系统鲁棒性与扩展能力。
节省时间成本:不需查阅大量文档或花费时间调研,即时获得专业级指导解决方案。

🎯 解决的问题

帮助用户将复杂的软件架构原则或模式转化为可操作的解决方案,以便更高效、更专业地优化软件开发项目,解决特定场景中的实际挑战。

🕒 版本历史

当前版本
v2.1 2024-01-15
优化输出结构,增强情节连贯性
  • ✨ 新增章节节奏控制参数
  • 🔧 优化人物关系描述逻辑
  • 📝 改进主题深化引导语
  • 🎯 增强情节转折点设计
v2.0 2023-12-20
重构提示词架构,提升生成质量
  • 🚀 全新的提示词结构设计
  • 📊 增加输出格式化选项
  • 💡 优化角色塑造引导
v1.5 2023-11-10
修复已知问题,提升稳定性
  • 🐛 修复长文本处理bug
  • ⚡ 提升响应速度
v1.0 2023-10-01
首次发布
  • 🎉 初始版本上线
COMING SOON
版本历史追踪,即将启航
记录每一次提示词的进化与升级,敬请期待。

💬 用户评价

4.8
⭐⭐⭐⭐⭐
基于 28 条评价
5星
85%
4星
12%
3星
3%
👤
电商运营 - 张先生
⭐⭐⭐⭐⭐ 2025-01-15
双十一用这个提示词生成了20多张海报,效果非常好!点击率提升了35%,节省了大量设计时间。参数调整很灵活,能快速适配不同节日。
效果好 节省时间
👤
品牌设计师 - 李女士
⭐⭐⭐⭐⭐ 2025-01-10
作为设计师,这个提示词帮我快速生成创意方向,大大提升了工作效率。生成的海报氛围感很强,稍作调整就能直接使用。
创意好 专业
COMING SOON
用户评价与反馈系统,即将上线
倾听真实反馈,在这里留下您的使用心得,敬请期待。
加载中...