¥
立即购买

后端开发伪代码生成器

27 浏览
2 试用
0 购买
Dec 1, 2025更新

本提示词专为后端开发场景设计,能够根据具体功能需求生成结构清晰、逻辑严谨的伪代码。通过系统化的需求分析和架构设计,确保生成的伪代码具备良好的可读性和可执行性,涵盖数据模型设计、API接口定义、业务逻辑流程等核心要素,帮助开发者在实际编码前明确技术方案和实现思路,提升开发效率和质量。

需求分析

  • 为在线商店提供完整的后端结算链路:商品与SKU查询、购物车多店铺多SKU操作、促销规则应用(满减、满折、直降、优惠券)带优先级与互斥控制、结算时库存短期锁定(Redis)、订单创建与支付回调后的库存最终扣减、订单事件投递到消息队列供履约与风控。
  • 非功能性要求:并发控制(同SKU加锁、幂等键user_id+cart_version)、缓存(SKU与促销规则本地+Redis)、限流(IP+用户)、性能(结算P95 < 150ms)、可观测(请求追踪、促销命中率、库存锁命中/冲突比)、容错(促销降级、库存锁失败返回重试建议、消息队列重试/死信)、安全(鉴权、运营促销配置审核)、测试覆盖关键场景。
  • 数据模型已给出,促销与优惠券通过Promotion(type=COUPON)统一管理;库存短锁通过Redis与InventoryLock结合。

架构概述

  • 组件划分
    • API Gateway/Web层(无状态,鉴权:Bearer Token,限流:IP+用户)
    • ProductService:产品与SKU查询,SKU可售量计算(库存-锁定)
    • CartService:购物车增删改查,维护cart_version与price_snapshot
    • PromotionService:促销规则解析、优先级与互斥、生成价格明细
    • InventoryService:库存短锁(Redis原子脚本)、锁记录(InventoryLock)
    • CheckoutService:结算编排,幂等控制(user_id+cart_version)
    • OrderService:订单创建、事件投递MQ、支付状态回写、最终扣减
    • PaymentCallbackHandler:支付回调处理与锁释放/扣减
    • CacheLayer:SKU与促销规则本地缓存+Redis缓存,失效与重建
    • RateLimiter:IP+用户限流
    • Observability:Tracing(request_id/trace_id),Metrics(promotions_hit率、inv_lock_hit/冲突比、P95),Logs
  • 数据存储与消息
    • MySQL(主从):User、Product、SKU、Promotion、CartItem、Order、InventoryLock
    • Redis集群:库存短锁key、购物车版本/幂等、热数据缓存、限流计数
    • MQ:订单事件(order.created, order.paid, order.canceled),带重试与死信队列
  • 关键设计点
    • 可售量计算:available = SKU.stock - sum(reserved_qty_in_redis_for_sku)
    • 库存短锁:按SKU原子检查并预留数量;锁写入Redis(保留TTL),同时记录InventoryLock(用于审计/恢复);锁过期或结算取消时释放
    • 促销互斥与优先级:按priority排序,基于mutex_tags冲突图选择集合;支持跨店铺叠加仅在不互斥且scope兼容时
    • 幂等:结算与下单幂等键 = user_id + cart_version;支付回调幂等键 = order_id + payment_tx_id
    • 订单创建与支付:下单不扣减库存,仅短锁;支付成功事务性扣减库存并释放锁;支付失败释放锁并恢复优惠券使用状态(如适用)
    • 容错与降级:促销引擎异常则回退基础价;锁失败返回重试建议(含等待时间);MQ失败重试并入死信
    • 性能:SKU与促销规则走本地+Redis缓存;锁脚本批量执行;数据库事务小而快;结算路径目标P95 < 150ms

伪代码

// 通用辅助
function require_auth(token):
    user = decode_and_validate_token(token)
    if user is null: raise Unauthorized
    return user

function rate_limit_check(ip, user_id, route):
    key = "rl:" + route + ":" + (user_id or ip)
    count = redis_incr_with_expiry(key, window=1s)
    if count > threshold(route): raise TooManyRequests

function get_trace_context():
    return new_trace_id()

// 产品与SKU查询
API GET /products?keyword&page:
    trace = get_trace_context()
    rate_limit_check(ip, null, "/products")
    if keyword cached: return cached_result
    products = db_query_products(keyword, page)
    cache_set(keyword, products, ttl=60s)
    return products

API GET /skus?product_id:
    trace = get_trace_context()
    rate_limit_check(ip, null, "/skus")
    skus = cache_get("skus:" + product_id)
    if skus is null:
        skus = db_query_skus(product_id)
        cache_set("skus:" + product_id, skus, ttl=60s)
    // 计算可售量:stock - reserved_in_redis
    for each sku in skus:
        reserved = redis_get("inv:sku:" + sku.id + ":reserved") or 0
        sku.available = max(sku.stock - reserved, 0)
    return skus

// 购物车
API GET /cart:
    user = require_auth(token)
    rate_limit_check(ip, user.id, "/cart:get")
    items = db_query_cart_items(user.id)
    return items

API POST /cart/items {sku_id, qty}:
    user = require_auth(token)
    rate_limit_check(ip, user.id, "/cart:add")
    sku = db_get_sku(sku_id)
    if qty <= 0: raise BadRequest
    // 价格快照:加入时锁定当前价以防促销变动造成展示不一致
    price_snapshot = compute_base_price(sku.price, user.tier)
    upsert_cart_item(user.id, sku_id, qty, price_snapshot)
    bump_cart_version(user.id)  // 增加cart_version用于幂等与并发检测
    return success

API DELETE /cart/items/{id}:
    user = require_auth(token)
    rate_limit_check(ip, user.id, "/cart:delete")
    delete_cart_item(id, user.id)
    bump_cart_version(user.id)
    return success

API POST /cart/apply-promotion {promotion_id}:
    user = require_auth(token)
    rate_limit_check(ip, user.id, "/cart:apply-promo")
    promotion = get_promotion_cached(promotion_id)
    cart = db_query_cart_items(user.id)
    result = PromotionService.apply(cart, [promotion], user)
    persist_applied_promotion(user.id, promotion_id, cart.version) // 可选记录
    return result.price_breakdown

// 促销引擎
// 输入:购物车项、候选促销集合、用户与店铺上下文
function PromotionService.apply(cart_items, promotions, user):
    // 过滤可用促销:scope匹配(商品/店铺/全局)、用户等级、时间窗、库存状态、优惠券可用性
    applicable = []
    for p in promotions:
        if is_in_time_window(p) and scope_match(p, cart_items) and user_tier_match(p, user):
            if p.type == "COUPON" and not coupon_usable(user, p): continue
            applicable.append(p)
    // 按priority降序排序
    sorted = sort(applicable, by=priority_desc)
    // 互斥控制:基于mutex_tags选择最大收益集合(贪心+互斥检查)
    selected = []
    used_mutex_tags = set()
    for p in sorted:
        if intersects(p.mutex_tags, used_mutex_tags): continue
        if improves_total(cart_items, p): 
            selected.append(p)
            used_mutex_tags.add_all(p.mutex_tags)
    // 计算价格明细
    price_detail = new_price_detail()
    base_total = sum(item.price_snapshot * item.qty for item in cart_items)
    total = base_total
    for p in selected:
        discount = compute_discount(p.ruleset, cart_items)
        price_detail.add(p.id, discount, p.type)
        total = total - discount
    if total < 0: total = 0
    // 促销降级
    if promotion_engine_error():
        return { total: base_total, price_detail: [], degraded: true }
    return { total: total, price_detail: price_detail, degraded: false, applied_promotions: selected }

// 结算与库存短锁
API POST /checkout {cart_id} with Idempotency-Key = user_id + cart_version:
    user = require_auth(token)
    rate_limit_check(ip, user.id, "/checkout")
    // 幂等检查
    idem_key = "idem:checkout:" + user.id + ":" + get_cart_version(user.id)
    if redis_set_nx(idem_key, "1", ttl=120s) == false:
        return conflict_or_previous_result(idem_key)
    cart = db_query_cart_items(user.id)
    promotions = get_all_promotions_cached()
    pricing = PromotionService.apply(cart, promotions, user)
    // 可售量校验与短锁(Redis原子脚本,批量)
    ttl = 10 minutes
    for each item in cart:
        success = InventoryService.try_reserve(item.sku_id, item.qty, cart_id, ttl)
        if not success:
            // 回滚已锁定
            InventoryService.release_cart_reservations(cart_id)
            return { error: "InventoryLockFailed", retry_after_ms: random_backoff(200~800) }
    // 返回结算结果与锁信息(包含锁过期时间)
    return { pricing: pricing, lock_expires_at: now() + ttl }

// 库存短锁(Redis)
function InventoryService.try_reserve(sku_id, qty, cart_id, ttl):
    // 原子脚本:检查剩余量并增加reserved,同时记录cart局部锁用于释放
    script:
        stock = GET "sku:" + sku_id + ":stock" or db_fallback_stock(sku_id) // 可选提前缓存
        reserved = GET "inv:sku:" + sku_id + ":reserved" or 0
        available = stock - reserved
        if available >= qty:
            INCRBY "inv:sku:" + sku_id + ":reserved" qty
            SET "inv:cart:" + cart_id + ":sku:" + sku_id qty EX ttl
            // 记录一个聚合索引便于按cart释放
            SADD "inv:cart:" + cart_id + ":skus" sku_id; EX ttl
            return 1
        else:
            return 0
    return eval(script)

function InventoryService.release_cart_reservations(cart_id):
    skus = SMEMBERS "inv:cart:" + cart_id + ":skus"
    for each sku_id in skus:
        qty = GET "inv:cart:" + cart_id + ":sku:" + sku_id
        if qty > 0:
            DECRBY "inv:sku:" + sku_id + ":reserved" qty
        DEL "inv:cart:" + cart_id + ":sku:" + sku_id
    DEL "inv:cart:" + cart_id + ":skus"

// 下单与订单事件
API POST /orders {address_id, payment_channel}:
    user = require_auth(token)
    rate_limit_check(ip, user.id, "/orders:create")
    cart = db_query_cart_items(user.id)
    // 校验短锁仍有效
    if not InventoryService.cart_lock_valid(cart.id):
        return { error: "LockExpired", action: "RetryCheckout" }
    promotions = get_applied_promotions(user.id, cart.version) or reapply_promotions(cart, user)
    pricing = PromotionService.apply(cart, promotions, user)
    // 创建订单(事务)
    tx_begin()
        order_id = insert_order(user.id, cart.items, pricing.total, pay_status="UNPAID", address_id, payment_channel)
        // 记录InventoryLock到DB(审计)
        for item in cart.items:
            insert_inventory_lock(order_id, item.sku_id, item.qty, ttl_remaining(cart.id))
    tx_commit()
    // 投递订单创建事件(带重试与死信)
    MQ.publish("order.created", {order_id, user_id, items, total, payment_channel})
    // 清除结算幂等键
    redis_del("idem:checkout:" + user.id + ":" + get_cart_version(user.id))
    return { order_id, payable_amount: pricing.total }

// 支付回调与最终扣减
function PaymentCallbackHandler.handle(payment_event):
    // 幂等检查
    idem_key = "idem:paycb:" + payment_event.order_id + ":" + payment_event.tx_id
    if redis_set_nx(idem_key, "1", ttl=300s) == false:
        return already_processed()
    order = db_get_order(payment_event.order_id)
    if order.pay_status == "PAID": return success // 幂等
    if payment_event.status == "SUCCESS":
        tx_begin()
            // 最终库存扣减(乐观检查)
            for item in order.items:
                affected = db_exec("UPDATE sku SET stock = stock - ? WHERE id = ? AND stock >= ?", item.qty, item.sku_id, item.qty)
                if affected == 0: 
                    tx_rollback()
                    // 库存不够,标记异常并触发人工介入或自动补货
                    mark_order_exception(order.id, "StockInsufficientOnFinalDeduct")
                    MQ.publish("order.exception", {order_id, type:"FinalDeductFail"})
                    return fail
            update_order_pay_status(order.id, "PAID")
        tx_commit()
        // 释放短锁
        InventoryService.release_cart_reservations(order.cart_id)
        // 投递paid事件
        MQ.publish("order.paid", {order_id})
        // 优惠券消费确认(如适用)
        PromotionService.confirm_coupon_usage(order.user_id, order.applied_coupons, order_id)
        return success
    else:
        // 支付失败或超时
        update_order_pay_status(order.id, "CANCELED")
        InventoryService.release_cart_reservations(order.cart_id)
        PromotionService.rollback_coupon_usage(order.user_id, order.applied_coupons)
        MQ.publish("order.canceled", {order_id})
        return success

function InventoryService.cart_lock_valid(cart_id):
    // 判断锁索引是否存在且未过期
    return EXISTS "inv:cart:" + cart_id + ":skus"

// 提示:运营促销配置(后台)
API POST /promotions {scope, type, priority, mutex_tags, ruleset}:
    operator = require_auth(token)
    require_permission(operator, "PROMOTION_WRITE") // 需审核流程
    create_promotion_draft(...)
    submit_for_review(...)
    return success

// 观测与指标
function record_metrics(pricing, locks):
    metrics.inc("promotion_hit_count", len(pricing.applied_promotions))
    metrics.observe("checkout_p95_latency", elapsed_ms)
    metrics.inc("inv_lock_hits") if locks_all_success else metrics.inc("inv_lock_conflicts")
    tracing.annotate("degraded", pricing.degraded)

技术说明

  • 促销规则与互斥

    • 促销优先级:数字越大优先级越高。先排序再贪心选择,遇到mutex_tags冲突则跳过。
    • scope支持:商品级、店铺级、全局。跨店铺叠加需检查scope兼容且不互斥。
    • 类型实现建议:
      • 满减:ruleset = {threshold, reduce_amount}
      • 满折:ruleset = {threshold, discount_rate}
      • 直降:ruleset = {reduce_amount_per_sku or bundle}
      • 优惠券:ruleset = {coupon_code or grant_id, user_binding, expire_at};建议维护优惠券使用状态表或缓存,并在支付成功时确认使用,支付失败时回滚。
    • 降级路径:促销引擎异常或规则加载失败时,返回基础价与标记degraded,避免阻塞交易。
  • 库存短锁设计(Redis)

    • 键设计:
      • inv:sku:{sku_id}:reserved → 当前SKU已预留总量
      • inv:cart:{cart_id}:sku:{sku_id} → 该购物车在该SKU的预留量(带TTL)
      • inv:cart:{cart_id}:skus → 该购物车涉及的SKU集合(便于批量释放,带TTL)
    • 原子性:使用Lua脚本实现“检查可用量并预留”的原子操作;支持批量执行以降低延迟。
    • TTL与释放:结算给出锁过期时间;下单后保留锁直到支付成功或失败;支付完成释放锁并做最终扣减。
    • 可售量计算:API展示时用 stock - reserved;避免超卖。
  • 最终扣减与数据库事务

    • 在支付成功时进行最终扣减,语句形式为条件更新:UPDATE sku SET stock = stock - qty WHERE id = ? AND stock >= qty,确保不会扣成负数。
    • 如扣减失败,标记订单异常并告警;需要人工或异步补偿流程。
  • 幂等与并发控制

    • 结算幂等:键 = user_id + cart_version;每次购物车变更都 bump cart_version。
    • 支付回调幂等:键 = order_id + payment_tx_id。
    • 同SKU并发:通过Redis原子脚本控制预留;数据库最终扣减使用条件更新确保安全。
  • 缓存策略

    • SKU与促销规则:本地内存+Redis双层缓存,设置合理TTL(例如60秒),并在运营修改促销后通过消息或版本号失效。
    • 热路径:SKU stock可选镜像到Redis以便计算available更快;保持以DB为权威,在写路径使用DB。
  • 限流与安全

    • 基于IP+用户的滑动窗口限流,关键接口设置门槛。
    • 鉴权:Bearer Token;运营促销配置走权限与审核流程。
    • 防重放:幂等键与短期锁结合,防止重复结算与重复支付回调。
  • MQ与可靠投递

    • 订单事件:order.created、order.paid、order.canceled;发布失败重试,超限入死信队列(DLQ)。
    • 消费方:履约与风控系统。建议携带trace_id以便端到端追踪。
  • 可观测与性能

    • Tracing:在请求进入、促销应用、锁定库存、订单创建、MQ投递、支付回调各节点打点。
    • Metrics:促销命中率、锁命中/冲突比、结算延迟(P95/P99)。
    • 目标P95 < 150ms:采用批量Redis脚本、缓存命中率优化、减少数据库交互步骤。
  • 测试建议

    • 促销互斥与优先级:多规则组合验证结果一致性。
    • 并发加购与结算:同SKU高并发下锁冲突与可售量准确性。
    • 支付失败回滚:锁释放、优惠券回溯、订单状态正确。
    • 跨店铺促销叠加:确保scope与互斥规则正确执行。
  • 部署与容错

    • 无状态Web层滚动发布与灰度;MySQL主从,读写分离;Redis集群高可用;MQ具备重试与DLQ。
    • 促销引擎降级、库存锁失败返回重试建议(带指数退避)、MQ失败重试/死信保障稳定性。

该方案在数据一致性、并发控制与性能之间保持平衡,使用Redis短锁+DB最终扣减的经典模式,促销引擎采用优先级+互斥贪心选择,支持跨店铺叠加与降级路径,满足中等复杂度Web服务的要求。

需求分析

  • 构建一个事件驱动的会员成长值与权益发放微服务集,服务包括:member-service、points-service、benefits-service、order-listener、notification-service。
  • 事件通道:Kafka 主题 orders.paid、points.accrued、tier.changed、benefit.granted;所有出站事件通过 Outbox 模式确保数据库更新与事件发布一致。
  • 数据模型:Member、PointLedger、Tier、Benefit、BenefitIssue,覆盖成长值账本、等级、权益库存与发放记录。
  • 核心流程:
    1. 订单支付事件进入 order-listener,基于幂等键(order_id+member_id)防重,转换为成长值规则输入;
    2. points-service 计算成长值并写入账本,发布 points.accrued;
    3. member-service 订阅成长值事件评估等级(门槛、有效期、多维权重),升级则发布 tier.changed;
    4. benefits-service 订阅等级变化,按策略发放权益,写入 BenefitIssue 并发布 benefit.granted;
    5. notification-service 消费通知事件推送给用户。
  • 跨服务一致性:Saga 编排,失败时通过补偿步骤撤销权益或回滚成长值。所有接口要求幂等。发放流程使用状态机驱动。
  • 非功能:高可用(多副本)、可扩展(按 member_id 分区)、告警(滞留事件、补偿失败)、合规(审计日志、数据脱敏)、运维(定期对账、死信监控、灰度发布)、安全(服务间 mTLS,外部限流与签名)、测试(重复事件幂等、等级边界升级、库存耗尽补偿、跨区分片一致性)。

架构概述

  • 服务与职责
    • order-listener:订阅 orders.paid,校验幂等,转换为成长值规则输入,调用 points-service 接口。
    • points-service:成长值规则引擎,账本写入与余额维护,Outbox 发布 points.accrued。
    • member-service:订阅 points.accrued,评估等级(门槛、有效期、多维权重),更新 Member,发布 tier.changed。
    • benefits-service:订阅 tier.changed,按策略发放权益,状态机与 Saga 编排,发布 benefit.granted。
    • notification-service:订阅 benefit.granted(和可选 tier.changed),推送消息。
  • 数据存储(每服务独立数据库)
    • Member(id, mobile, tier, status, updated_at, audit_info)
    • PointLedger(id, member_id, delta, source, idempotency_key, balance, created_at, status)
    • Tier(id, name, threshold, valid_window, weight_model)
    • Benefit(id, type, stock, valid_time, policy_id, status)
    • BenefitIssue(id, member_id, benefit_id, status, saga_id, idempotency_key, created_at, updated_at)
    • Outbox(id, event_type, aggregate_id, payload, event_id, created_at, sent_at, status, retry_count)
    • IdempotencyStore(key, scope, status, result_ref, created_at, expires_at)
    • AuditLog(actor, action, resource_id, resource_type, before, after, at, redacted_fields)
  • 事件与分区
    • Kafka 主题:orders.paid、points.accrued、tier.changed、benefit.granted、dead-letter。
    • 分区键:member_id(确保单会员内事件顺序)。
  • Outbox 模式
    • 数据变更与 Outbox 记录在同一事务内提交;后台 OutboxRelayer 拉取并发布到 Kafka,成功后标记 sent。
    • 发布语义:至少一次 + 事件幂等(event_id 去重)。
  • Saga 编排
    • 编排位置:由 benefits-service 内置 SagaManager 按策略驱动(不新增独立服务,符合服务清单)。
    • Saga 实例:与“等级变更触发的权益发放”或“API主动发放”绑定,包含步骤与补偿。
    • 补偿策略:失败时撤销权益(释放库存,标记失败/已补偿),可选回滚成长值(写反向账本)按策略开关。
  • 状态机(核心对象)
    • BenefitIssue 状态:PENDING -> RESERVED -> GRANTED -> NOTIFIED;失败路径:PENDING/RESERVED -> FAILED -> COMPENSATED。
    • PointLedger 状态:POSTED,COMPENSATION_POSTED(负向冲正)。
  • API(对外)
    • POST /members
    • GET /members/{id}
    • POST /points/accrue {order_id, amount, idempotency_key}
    • GET /tiers
    • POST /benefits/grant {member_id, policy_id, idempotency_key}
  • 安全与合规
    • mTLS 服务间鉴权;外部接口 HMAC 签名 + 限流;审计日志与数据脱敏。
  • 观测与运维
    • 指标:Outbox 滞留、DLQ 消息计数、Saga 失败率、库存可用量、账本对账差值。
    • 定期对账、死信队列消费与重放、灰度发布逐步扩容。

伪代码

  • 通用组件:Idempotency 与 Outbox
FUNCTION EnsureIdempotency(scope, key):
    existing = SELECT * FROM IdempotencyStore WHERE scope = scope AND key = key
    IF existing EXISTS AND existing.status == "COMPLETED":
        RETURN existing.result_ref
    ELSE IF existing EXISTS AND existing.status == "IN_PROGRESS":
        RAISE "RETRY_LATER"
    ELSE:
        INSERT INTO IdempotencyStore(scope, key, status="IN_PROGRESS", created_at=NOW)
        RETURN null

FUNCTION CompleteIdempotency(scope, key, result_ref):
    UPDATE IdempotencyStore SET status="COMPLETED", result_ref=result_ref WHERE scope=scope AND key=key

FUNCTION WriteOutbox(event_type, aggregate_id, payload):
    event_id = GENERATE_UUID()
    INSERT INTO Outbox(event_type, aggregate_id, payload, event_id, created_at=NOW, status="PENDING")
    RETURN event_id

PROCESS OutboxRelayer:
    LOOP:
        rows = SELECT * FROM Outbox WHERE status="PENDING" ORDER BY created_at LIMIT N
        FOR each row IN rows:
            TRY:
                PUBLISH_TO_KAFKA(topic=row.event_type, key=row.aggregate_id, message=payload_with_event_id(row))
                UPDATE Outbox SET status="SENT", sent_at=NOW WHERE id=row.id
            CATCH transient_error:
                UPDATE Outbox SET retry_count = retry_count + 1 WHERE id=row.id
                IF retry_count > MAX_RETRY:
                    MOVE_TO_DLQ(row)
            SLEEP short_interval
  • order-listener(消费 orders.paid 事件并调用 points-service)
CONSUMER OnOrdersPaid(event from orders.paid):
    member_id = event.member_id
    order_id = event.order_id
    amount = event.amount
    idempotency_key = CONCAT(order_id, ":", member_id)

    IF IsDuplicateEvent(event.event_id):
        ACK; RETURN

    IF EnsureIdempotency(scope="ORDERS_PAID", key=idempotency_key) == "RETRY_LATER":
        NACK; RETURN

    points_input = TRANSFORM_TO_POINTS_INPUT(amount, event.category, event.timestamp)

    TRY:
        CALL points-service POST /points/accrue {order_id, amount=points_input.amount, idempotency_key}
        CompleteIdempotency("ORDERS_PAID", idempotency_key, result_ref=order_id)
        ACK
    CATCH client_error (4xx):
        // Business validation failed; finalize idempotency to avoid infinite retry
        CompleteIdempotency("ORDERS_PAID", idempotency_key, result_ref="FAILED")
        ACK
    CATCH server_error (5xx or timeout):
        // retry later
        NACK
  • points-service(成长值计算与账本写入,Outbox 发布 points.accrued)
API POST /points/accrue(request):
    key = request.idempotency_key
    IF EnsureIdempotency(scope="POINTS_ACCRUE", key=key) == "RETRY_LATER":
        RETURN 409 Retry

    BEGIN TRANSACTION
        existing = SELECT * FROM PointLedger WHERE idempotency_key = key
        IF existing EXISTS:
            COMMIT
            CompleteIdempotency("POINTS_ACCRUE", key, result_ref=existing.id)
            RETURN 200 existing

        delta = APPLY_POINTS_RULES(order_id=request.order_id, amount=request.amount, member_profile=LOAD_MEMBER(request.member_id))
        current_balance = SELECT SUM(delta) FROM PointLedger WHERE member_id = request.member_id AND status IN ("POSTED", "COMPENSATION_POSTED")
        new_balance = current_balance + delta

        INSERT INTO PointLedger(id=NEW_ID, member_id=request.member_id, delta=delta, source="ORDER", idempotency_key=key, balance=new_balance, status="POSTED", created_at=NOW)

        event_id = WriteOutbox(event_type="points.accrued",
                               aggregate_id=request.member_id,
                               payload = { ledger_id: NEW_ID, member_id: request.member_id, delta: delta, balance: new_balance, idempotency_key: key, source: "ORDER" })
    COMMIT

    CompleteIdempotency("POINTS_ACCRUE", key, result_ref=NEW_ID)
    RETURN 200 { ledger_id: NEW_ID, balance: new_balance }

FUNCTION ReverseLedgerByKey(original_key, reason):
    BEGIN TRANSACTION
        orig = SELECT * FROM PointLedger WHERE idempotency_key = original_key AND status="POSTED"
        IF NOT orig EXISTS:
            COMMIT; RETURN "NOOP"
        comp_key = CONCAT(original_key, ":COMP")
        IF SELECT * FROM PointLedger WHERE idempotency_key = comp_key EXISTS:
            COMMIT; RETURN "ALREADY_COMPENSATED"

        current_balance = SELECT SUM(delta) FROM PointLedger WHERE member_id = orig.member_id AND status IN ("POSTED", "COMPENSATION_POSTED")
        new_balance = current_balance - orig.delta

        INSERT INTO PointLedger(id=NEW_ID, member_id=orig.member_id, delta=-orig.delta, source="COMPENSATION", idempotency_key=comp_key, balance=new_balance, status="COMPENSATION_POSTED", created_at=NOW)

        event_id = WriteOutbox(event_type="points.accrued",
                               aggregate_id=orig.member_id,
                               payload = { ledger_id: NEW_ID, member_id: orig.member_id, delta: -orig.delta, balance: new_balance, idempotency_key: comp_key, source: "COMPENSATION", reason: reason })
    COMMIT
  • member-service(订阅 points.accrued,评估等级并发布 tier.changed)
CONSUMER OnPointsAccrued(event from points.accrued):
    IF IsDuplicateEvent(event.event_id):
        ACK; RETURN

    member_id = event.member_id

    // 加权积分:按有效期和权重模型计算
    weighted_score = COMPUTE_WEIGHTED_SCORE(member_id, window=ACTIVE_TIER_WINDOW(), weight_model=LOAD_TIER_WEIGHT_MODEL())

    // 基于门槛判断目标等级
    current_tier = SELECT tier FROM Member WHERE id = member_id
    target_tier = EVALUATE_TIER(weighted_score, thresholds=LOAD_TIERS())

    IF target_tier > current_tier:
        BEGIN TRANSACTION
            UPDATE Member SET tier = target_tier, updated_at=NOW WHERE id = member_id

            event_id = WriteOutbox(event_type="tier.changed",
                                   aggregate_id=member_id,
                                   payload = { member_id: member_id, from_tier: current_tier, to_tier: target_tier, reason: "POINTS_ACCUMULATED", effective_at: NOW })
        COMMIT

    ACK

FUNCTION COMPUTE_WEIGHTED_SCORE(member_id, window, weight_model):
    points = SELECT delta, source, created_at FROM PointLedger WHERE member_id=member_id AND created_at IN window
    score = 0
    FOR p IN points:
        w = weight_model[source] OR DEFAULT_WEIGHT
        decay = TIME_DECAY(window, p.created_at)
        score = score + p.delta * w * decay
    RETURN score

FUNCTION EVALUATE_TIER(score, thresholds):
    // thresholds: list of {tier_name, min_score}
    RETURN MAX tier WHERE score >= min_score
  • benefits-service(订阅 tier.changed,状态机与 Saga 编排)
CONSUMER OnTierChanged(event from tier.changed):
    IF IsDuplicateEvent(event.event_id):
        ACK; RETURN

    member_id = event.member_id
    from_tier = event.from_tier
    to_tier = event.to_tier

    policies = SELECT * FROM BenefitPolicy WHERE trigger_tier = to_tier AND status="ACTIVE"
    FOR policy IN policies:
        saga_id = GENERATE_UUID()
        START_SAGA(saga_id, member_id, policy, change_event_id=event.event_id)

PROCESS START_SAGA(saga_id, member_id, policy, change_event_id):
    // 幂等:一个 (member_id, policy_id, change_event_id) 只启动一次
    IF EXISTS BenefitIssue WHERE member_id=member_id AND policy_id=policy.id AND idempotency_key=change_event_id:
        RETURN

    // Step 1: 预留库存
    TRY:
        BEGIN TRANSACTION
            IF policy.type IN ("COUPON", "PACKAGE"):
                stock = SELECT stock FROM Benefit WHERE id = policy.benefit_id FOR UPDATE
                IF stock <= 0:
                    RAISE "OUT_OF_STOCK"
                UPDATE Benefit SET stock = stock - 1 WHERE id = policy.benefit_id

            INSERT INTO BenefitIssue(id=NEW_ID, member_id=member_id, benefit_id=policy.benefit_id, status="RESERVED",
                                     saga_id=saga_id, idempotency_key=change_event_id, created_at=NOW)
        COMMIT
    CATCH "OUT_OF_STOCK":
        HANDLE_COMPENSATION(saga_id, member_id, policy, reason="OUT_OF_STOCK")
        RETURN

    // Step 2: 发放/确认
    TRY:
        BEGIN TRANSACTION
            UPDATE BenefitIssue SET status="GRANTED", updated_at=NOW WHERE id=NEW_ID

            event_id = WriteOutbox(event_type="benefit.granted",
                                   aggregate_id=member_id,
                                   payload = { issue_id: NEW_ID, member_id: member_id, benefit_id: policy.benefit_id, type: policy.type, status: "GRANTED" })
        COMMIT
    CATCH transient_error:
        // 可重试;不立即补偿
        SCHEDULE_RETRY(saga_id)
        RETURN
    CATCH permanent_error:
        HANDLE_COMPENSATION(saga_id, member_id, policy, reason="GRANT_FAILED")
        RETURN

FUNCTION HANDLE_COMPENSATION(saga_id, member_id, policy, reason):
    BEGIN TRANSACTION
        issue = SELECT * FROM BenefitIssue WHERE saga_id=saga_id FOR UPDATE
        IF issue EXISTS AND issue.status IN ("RESERVED", "PENDING"):
            UPDATE BenefitIssue SET status="FAILED", updated_at=NOW WHERE id=issue.id

        IF policy.type IN ("COUPON", "PACKAGE"):
            UPDATE Benefit SET stock = stock + 1 WHERE id = policy.benefit_id // 释放库存

        // 可选:回滚成长值(按策略)
        IF policy.compensation_rolls_back_points == TRUE AND issue.idempotency_key EXISTS:
            CALL points-service ReverseLedgerByKey(original_key=issue.idempotency_key, reason=reason)
    COMMIT
  • notification-service(消费 benefit.granted 并推送)
CONSUMER OnBenefitGranted(event from benefit.granted):
    IF IsDuplicateEvent(event.event_id):
        ACK; RETURN

    message = FORMAT_MESSAGE(event.member_id, event.type, event.benefit_id)
    result = SEND_TO_USER_CHANNEL(member_id=event.member_id, message=message)

    RECORD_DELIVERY(member_id, event.issue_id, status=result.status, channel=result.channel, at=NOW)
    ACK
  • API 其他端点示例(简化)
API POST /members(body):
    VALIDATE body.mobile FORMAT
    BEGIN TRANSACTION
        INSERT INTO Member(id=NEW_ID, mobile=MASK(body.mobile), tier=BASE_TIER, status="ACTIVE", updated_at=NOW)
        WriteOutbox(event_type="member.created", aggregate_id=NEW_ID, payload={member_id: NEW_ID})
        INSERT INTO AuditLog(...)
    COMMIT
    RETURN 201 { id: NEW_ID }

API GET /members/{id}:
    AUTHN_MTLS_OR_HMAC()
    member = SELECT * FROM Member WHERE id = id
    RETURN 200 REDACT(member, fields=["audit_info"])

API GET /tiers:
    RETURN SELECT * FROM Tier ORDER BY threshold ASC

API POST /benefits/grant { member_id, policy_id, idempotency_key }:
    // 主动发放入口,走 Saga
    IF EnsureIdempotency(scope="BENEFIT_GRANT_API", key=idempotency_key) == "RETRY_LATER":
        RETURN 409 Retry
    START_SAGA(saga_id=GENERATE_UUID(), member_id=member_id, policy=LOAD_POLICY(policy_id), change_event_id=idempotency_key)
    CompleteIdempotency("BENEFIT_GRANT_API", idempotency_key, result_ref="STARTED")
    RETURN 202 Accepted
  • 定期对账与死信处理
SCHEDULED TASK ReconciliationDaily:
    FOR each member IN Members:
        ledger_sum = SELECT SUM(delta) FROM PointLedger WHERE member_id=member.id AND status IN ("POSTED", "COMPENSATION_POSTED")
        IF ledger_sum != LAST_KNOWN_BALANCE(member.id):
            ALERT "Ledger mismatch", member.id
    CHECK_OUTBOX_STUCK(threshold_minutes=15)
    CHECK_SAGA_FAILURE_RATE(window=1h, threshold=rate_limit)
    CHECK_BENEFIT_STOCK_BELOW_MIN()

CONSUMER DeadLetterHandler(event from dead-letter):
    CLASSIFY(event)
    IF recoverable:
        REPLAY(event)
    ELSE:
        CREATE_INCIDENT(event)
        LOG_WITH_AUDIT(event)
  • 事件去重与分区
FUNCTION IsDuplicateEvent(event_id):
    RETURN EXISTS EventDedup WHERE event_id = event_id

CONSUMER INIT:
    SET KafkaPartitionKey = member_id  // 保证单会员内顺序一致

技术说明

  • 规则引擎与成长值计算
    • points-service 将订单金额、类别、时间等转换为成长值,支持权重和时间衰减(如近30日权重更高),以配置驱动(Tier.weight_model)。
    • PointLedger 通过唯一约束(idempotency_key)保证接口幂等;每次写入更新冪等一致的 balance 快照,简化读取。
  • Outbox 一致性与事件保证
    • 在同一数据库事务中写业务数据与 Outbox,随后异步发布;避免“写库成功、发消息失败”的不一致。
    • OutboxRelayer 提供重试与 DLQ 转移,事件带 event_id;消费者侧维护 EventDedup 集合去重,提供至少一次投递的幂等处理。
  • Saga 编排与补偿
    • 采用 benefits-service 内的 SagaManager 以实例级驱动发放流程,状态变化通过 BenefitIssue 状态机体现。
    • 失败补偿包含:释放库存;标记 BenefitIssue 为 FAILED/COMPENSATED;可选触发 points-service 的反向账本(ReverseLedgerByKey),由策略配置决定是否“全链路补偿”。建议在“真实支付引发的积分”场景不回滚积分,避免违背交易事实;仅在误发/脏写场景回滚。
  • 幂等与顺序
    • 所有入口(orders.paid、/points/accrue、/benefits/grant)均要求 idempotency_key;幂等存储记录在 IdempotencyStore 并与业务对象唯一约束配合。
    • Kafka 分区键使用 member_id,确保单会员事件顺序;跨服务通过事件去重保证处理幂等。
  • 数据模型与索引建议
    • PointLedger 索引:member_id+created_at、idempotency_key 唯一、source。
    • BenefitIssue 索引:member_id、saga_id、idempotency_key 唯一、status。
    • Outbox 索引:status+created_at;event_id 唯一。
  • 安全与合规
    • 服务间 mTLS 加密与双向认证;外部接口 HMAC 签名校验、限流与黑白名单。
    • 审计日志记录重要操作(创建会员、发放权益、积分调整),敏感字段(mobile)按掩码存储与展示。
  • 高可用与扩展性
    • 各服务多副本部署;数据库主从或多 AZ;Kafka 多分区,以 member_id 为 key 扩展吞吐。
    • 读写分离:成长值账本写入优先一致性,Member/Tier 读取可走只读副本。
  • 观测与告警
    • 指标:Outbox 未发送数量、平均发送延迟、DLQ 消息量、Saga 失败比率、库存告急阈值、对账差值、事件滞留时间。
    • 告警在阈值越界时触发,并提供自动化重试/修复(如对账自动补发漏事件)。
  • 运维与发布
    • 定期对账任务核对账本与会员余额;死信队列监控与人工/自动重放。
    • 灰度发布:按成员哈希或租户分配发布比例,监控关键指标后再扩大。
  • 测试策略
    • 幂等:重复 orders.paid 与 /points/accrue、/benefits/grant 请求,确保一次性效果。
    • 等级边界:在门槛边界(=threshold±1)验证升级/不升级行为;有效期窗口切换时的等级回落/维持。
    • 库存耗尽补偿:benefits-service 在 stock=0 时触发补偿,不产生 GRANTED 事件。
    • 分片一致性:多分区 Kafka 下,同一 member_id 事件顺序性;跨区部署延迟与重放对账。
  • 实现建议
    • 事件模式采用“编排+编舞混合”:业务步骤靠事件编舞,失败补偿集中由 Saga 编排控制。
    • 规则与策略尽量配置化(权重、窗口、阈值、发放策略、是否回滚积分),避免硬编码。
    • 使用数据库事务锁(FOR UPDATE)保证库存扣减与发放记录一致,避免超发。
    • 对 OutboxRelayer 使用逐条确认发布与幂等标记,确保故障恢复后不会重复投递造成副作用。
    • 将所有事件载荷包含 idempotency_key 与 event_id,消费者侧以这两者作为去重与处理幂等依据。

需求分析

  • 构建移动后端的会话与消息系统,包含认证与令牌、WebSocket长连接网关、消息可靠收发与存储、离线推送适配、附件上传下载。
  • 核心要求:
    • 可靠性:幂等(client_msg_id+sender_id)、去重(缓存或布隆)、顺序(按conv_id分片的单序列)、失败回退(重试队列),限流与背压。
    • 实时协议:长连鉴权→分配会话→发送消息(client_msg_id),服务端分配全局seq并ACK;未ACK重试;离线消息入队并走推送;历史游标分页。
    • 非功能:P95发送延迟<200ms;并发峰值控制;冷热分层(Redis热、数据库冷、归档);监控(连接数、吞吐、ACK延迟)。
    • 安全与合规:加密传输、令牌校验、敏感内容审计;附件病毒扫描失败阻断。
    • 测试场景:弱网断连重连、离线追送、重复发送去重、会话参与者权限校验。

架构概述

  • 组件划分
    • auth-service:登录、令牌签发与刷新。
    • session-gateway:WebSocket长连接管理、鉴权、心跳、路由、背压与限流。
    • message-service:消息收发、顺序号分配、幂等去重、存储(热/冷)、投递与重试。
    • push-service:离线推送通道适配(多通道、故障切换、熔断)。
    • media-service:附件上传、下载、病毒扫描、校验与存储。
  • 数据模型(逻辑)
    • User(id, nickname, password_hash)
    • Device(id, user_id, platform, push_token)
    • Conversation(id, type, participants[], last_msg_id)
    • Message(id, conv_id, sender_id, seq, content, ack_status, created_at, client_msg_id)
    • Attachment(id, url, size, checksum)
  • 存储与分层
    • 热数据:Redis(会话seq计数器、最近消息、每设备未读队列、去重缓存、会话路由表)。
    • 冷数据:数据库(消息持久化、会话、用户、设备、附件元数据),周期归档到对象存储。
  • 顺序保障
    • 按conv_id分片,将seq作为该会话的单调递增序列(原子INCR);同一会话的消息在同一分片内处理。
  • 幂等与去重
    • 以(sender_id, client_msg_id)作为唯一键;Redis短期缓存预检 + DB唯一索引兜底;可加布隆过滤器加速拒绝明显重复。
  • 投递与离线
    • 在线:通过session-gateway路由到目标设备连接;服务端ACK给发送端。
    • 离线:消息入设备未读队列,push-service走推送通道;客户端上线后拉取pending。
  • 性能与可靠性
    • 发送路径:鉴权→幂等检查→分配seq→写DB→写Redis热→ACK;尽量将非关键操作异步(推送、指标、审计)。
    • 限流与背压:连接级、用户级、会话级令牌桶;慢消费者检测与暂停发送窗口。
  • 监控
    • 连接数、消息吞吐、ACK延迟/超时、未读队列深度、推送成功率、病毒扫描命中率。

伪代码

// ======================= auth-service =======================

FUNCTION POST /auth/login(mobile, password):
    user = FindUserByMobile(mobile)
    IF user IS NULL OR NOT VerifyPassword(password, user.password_hash):
        RETURN 401, {error: "invalid_credentials"}
    access_token = IssueAccessToken(subject=user.id, ttl=short)
    refresh_token = IssueRefreshToken(subject=user.id, ttl=long)
    RETURN 200, {access_token, refresh_token}

FUNCTION POST /auth/refresh(refresh_token):
    IF NOT VerifyRefreshToken(refresh_token):
        RETURN 401, {error: "invalid_refresh_token"}
    user_id = ExtractSubject(refresh_token)
    access_token = IssueAccessToken(subject=user_id, ttl=short)
    RETURN 200, {access_token}

// ======================= session-gateway =======================

ON WebSocketConnect(conn):
    ScheduleTimeout(conn, T_AUTH_TIMEOUT)

ON Message(conn, frame):
    IF frame.type == "auth":
        VERIFY_TLS(conn)
        token = frame.access_token
        IF NOT VerifyAccessToken(token):
            CLOSE(conn, code=401)
        user_id = ExtractSubject(token)
        device_id = frame.device_id
        IF NOT BindDeviceToUser(device_id, user_id):
            CLOSE(conn, code=403)
        RegisterConnection(device_id, conn)
        CancelTimeout(conn, T_AUTH_TIMEOUT)
        SEND(conn, {type: "auth_ack", status: "ok"})
    ELSE IF frame.type == "heartbeat":
        UpdateHeartbeat(conn)
        SEND(conn, {type: "heartbeat_ack"})
    ELSE IF frame.type == "send_message":
        RouteToMessageService(frame, user_id, device_id)
    ELSE IF frame.type == "ack":
        HandleClientAck(user_id, frame.conv_id, frame.seq)

ON ConnectionClose(conn):
    device_id = LookupDevice(conn)
    UnregisterConnection(device_id)

FUNCTION RegisterConnection(device_id, conn):
    ConnectionRegistry[device_id] = conn
    UpdateMetrics("connections", +1)

// Rate limit at gateway
FUNCTION CheckRateLimit(subject_key):
    allowed = TokenBucketTryConsume(subject_key, 1)
    RETURN allowed

// Backpressure for slow consumer
FUNCTION SendToDevice(device_id, payload):
    conn = ConnectionRegistry[device_id]
    IF conn IS NULL:
        RETURN "offline"
    IF OutboundQueueSize(conn) > MAX_QUEUE_SIZE:
        ApplyBackpressure(conn)
        RETURN "deferred"
    EnqueueOutbound(conn, payload)
    RETURN "sent"

// ======================= message-service =======================

FUNCTION POST /conversations {participants[]}:
    ValidateParticipants(participants)
    conv_id = CreateConversation(participants)
    RETURN 201, {conv_id}

FUNCTION POST /messages {conv_id, content, client_msg_id} WITH access_token:
    sender_id = ExtractSubject(access_token)
    IF NOT IsParticipant(conv_id, sender_id):
        RETURN 403, {error: "not_in_conversation"}

    // Idempotency & dedup pre-check
    idem_key = MakeIdemKey(sender_id, client_msg_id)
    IF RedisExists(idem_key):
        existing = FindMessageByIdemKey(sender_id, client_msg_id)
        RETURN 200, {ack: "duplicate", seq: existing.seq, message_id: existing.id}

    // Optional bloom quick negative check
    IF BloomMayContain(idem_key):
        // proceed to DB uniqueness check anyway

    // Assign ordered seq per conversation (atomic)
    seq = AtomicIncr("conv_seq:" + conv_id)

    // Content audit (asynchronous preferred)
    IF ContentAuditBlocks(content):
        RETURN 400, {error: "content_blocked"}

    // Persist (synchronous for reliability)
    message_id = InsertMessage(
        conv_id=conv_id,
        sender_id=sender_id,
        seq=seq,
        content=content,
        ack_status="SERVER_ACKED",
        client_msg_id=client_msg_id
    )
    UpdateConversationLastMsg(conv_id, message_id)

    // Write hot cache
    RedisSet(idem_key, {message_id, seq}, ttl=IDEM_TTL)
    RedisListPush("conv_recent:" + conv_id, SerializeMessage(message_id), cap=RECENT_CAP)
    FOR EACH participant IN GetParticipants(conv_id) EXCEPT sender_id:
        EnqueueDelivery(participant, conv_id, seq, message_id)

    // ACK to sender
    RETURN 200, {ack: "ok", seq: seq, message_id: message_id}

FUNCTION EnqueueDelivery(recipient_id, conv_id, seq, message_id):
    devices = ListOnlineDevices(recipient_id)
    payload = {type: "message", conv_id, seq, message_id}
    delivered_any = FALSE
    FOR EACH device_id IN devices:
        r = SendToDevice(device_id, payload)
        IF r == "sent":
            delivered_any = TRUE
    IF NOT delivered_any:
        // offline path: queue & push
        RedisListPush("pending:" + RecipientKey(recipient_id), payload)
        TriggerPush(recipient_id, conv_id, seq)

FUNCTION GET /messages/pending WITH access_token:
    user_id = ExtractSubject(access_token)
    max_n = RequestParam("limit", DEFAULT_LIMIT)
    items = RedisListPopN("pending:" + RecipientKey(user_id), max_n)
    RETURN 200, {messages: items}

FUNCTION RouteToMessageService(frame, user_id, device_id):
    IF NOT CheckRateLimit("device:" + device_id):
        SEND(ConnectionRegistry[device_id], {type: "error", code: 429})
        RETURN
    // parse frame fields
    conv_id = frame.conv_id
    client_msg_id = frame.client_msg_id
    content = frame.content
    resp = POST /messages {conv_id, content, client_msg_id} WITH user_id
    SEND(ConnectionRegistry[device_id], {type: "server_ack", resp})

FUNCTION HandleClientAck(user_id, conv_id, seq):
    MarkDelivered(user_id, conv_id, seq)
    UpdateMetrics("client_ack", 1)

FUNCTION GET /conversations/{id}/history?cursor:
    conv_id = PathParam(id)
    cursor = QueryParam("cursor") // format: {last_seq, page_size, direction}
    page_size = ValidatePageSize(cursor.page_size)
    start_seq = ResolveStartSeq(conv_id, cursor)
    // First read from Redis recent window
    recent = RedisRange("conv_recent:" + conv_id, start_seq, page_size, cursor.direction)
    IF recent.size < page_size:
        remaining = page_size - recent.size
        cold = DbFetchHistory(conv_id, start_seq, remaining, cursor.direction)
        merged = MergeBySeq(recent, cold)
        RETURN 200, {messages: merged, next_cursor: MakeCursor(conv_id, merged)}
    ELSE:
        RETURN 200, {messages: recent, next_cursor: MakeCursor(conv_id, recent)}

FUNCTION ResolveStartSeq(conv_id, cursor):
    IF cursor IS NULL:
        RETURN GetLatestSeq(conv_id)
    ELSE:
        RETURN cursor.last_seq

// ======================= push-service =======================

FUNCTION TriggerPush(recipient_id, conv_id, seq):
    devices = ListDevices(recipient_id)
    FOR EACH d IN devices:
        adapter = SelectAdapter(d.platform)
        payload = MakePushPayload(conv_id, seq)
        result = adapter.Send(d.push_token, payload)
        IF NOT result.success:
            alt = SelectFallbackAdapter(d.platform)
            IF alt IS NOT NULL:
                alt.Send(d.push_token, payload)
            RecordPushFailure(d, result.error)

FUNCTION SelectAdapter(platform):
    IF platform == "ios": RETURN APNSAdapter
    IF platform == "android": RETURN FCMAdapter
    RETURN GenericAdapter

// ======================= media-service =======================

FUNCTION POST /media/upload WITH access_token AND file:
    user_id = ExtractSubject(access_token)
    IF NOT ValidateFileSize(file.size) OR NOT ValidateMime(file.mime):
        RETURN 400, {error: "invalid_file"}
    IF VirusScan(file) == "infected":
        RETURN 400, {error: "virus_detected"}
    checksum = ComputeChecksum(file)
    url = StoreFileObject(file) // object storage
    att_id = InsertAttachment(url, file.size, checksum, user_id)
    RETURN 201, {attachment_id: att_id, url: url}

// ======================= reliability: retry & queues =======================

BACKGROUND WORKER RetryDelivery:
    WHILE TRUE:
        task = PopRetryQueue()
        IF task IS NULL:
            Sleep(SHORT_INTERVAL)
            CONTINUE
        recipient_id = task.recipient_id
        payload = task.payload
        devices = ListOnlineDevices(recipient_id)
        success = FALSE
        FOR EACH device IN devices:
            IF SendToDevice(device, payload) == "sent":
                success = TRUE
        IF NOT success:
            Requeue(task, BackoffNext(task))

FUNCTION ApplyBackpressure(conn):
    PauseOutbound(conn)
    NotifyClientWindow(conn)

// ======================= security & auditing =======================

FUNCTION VerifyAccessToken(token):
    RETURN TokenIsValid(token) AND NotExpired(token)

FUNCTION ContentAuditBlocks(content):
    score = RunContentAudit(content)
    IF score > AUDIT_THRESHOLD:
        LogAuditBlock(content)
        RETURN TRUE
    RETURN FALSE

// ======================= metrics & monitoring =======================

FUNCTION UpdateMetrics(metric_name, value):
    MetricsCounter[metric_name] += value

BACKGROUND WORKER ObserveAckLatency:
    ON ServerAckSent(message_id, timestamp):
        RecordPendingAck(message_id, timestamp)
    ON ClientAckReceived(message_id):
        start = PendingAckTimestamp(message_id)
        latency = Now() - start
        HistogramObserve("ack_latency", latency)

// ======================= rate limit =======================

FUNCTION TokenBucketTryConsume(key, tokens):
    bucket = LoadBucket(key)
    Refill(bucket)
    IF bucket.balance >= tokens:
        bucket.balance -= tokens
        SaveBucket(bucket)
        RETURN TRUE
    RETURN FALSE

技术说明

  • 会话与顺序
    • 按conv_id分片序列:使用原子计数器(如Redis的INCR)维护每会话的seq,确保同一会话内严格顺序;消息处理与投递也按会话分片路由,避免跨分片乱序。
  • 幂等与去重
    • 以(sender_id, client_msg_id)为唯一键,写库使用唯一约束保证不重复;写前在Redis中以短TTL缓存键加速重复判定,结合布隆过滤器降低DB压力。
    • 重复请求的响应返回已有的seq与message_id,保证客户端重试的幂等。
  • 存储冷热分层
    • 热:Redis保存最近N条消息conv_recent、待投递队列pending、会话seq、连接路由;适合快速读取与低延迟ACK。
    • 冷:数据库作为主持久层,消息插入与会话更新同步进行;定期将历史消息归档到对象存储,降低数据库存储成本。
  • ACK与延迟目标
    • P95<200ms:在发送路径中仅执行必要的操作(幂等判定、seq分配、DB单条插入、Redis写入)后立即返回ACK。内容审计与推送走异步或快速阻断策略。
    • 监控ACK延迟:记录服务端ACK时间和客户端ACK时间,统计直方图,触发告警。
  • 离线投递与推送
    • 在线设备直接通过session-gateway发送;全部离线则将消息加入用户设备的pending队列,并触发push-service。
    • push-service采用平台适配器模式;失败时进行备用通道切换并记录指标,支持熔断与退避。
  • 游标分页
    • 游标包含last_seq与方向;优先从Redis recent窗口读取,缺口再查DB,按seq合并。返回next_cursor以便客户端连续分页。
  • 限流与背压
    • 令牌桶在连接级/用户级/会话级应用,避免单连接过载。
    • 慢消费者:检测Outbound队列长度超限时暂停发送窗口(ApplyBackpressure),提示客户端收敛拉取或减速。
  • 敏感内容与安全
    • 全链路TLS加密;所有API与WebSocket消息均校验access_token。
    • 内容审计:在消息写库前进行快速审计;命中规则直接阻断并返回错误;审计规则记录到审计日志。
    • 附件病毒扫描:上传前扫描,命中立即拒绝;存储校验checksum防篡改。
  • 失败回退与重试
    • 投递失败进入重试队列,采用指数退避;设备上线事件可触发加速重试。
    • 推送通道失败:适配器选择备用通道重试;持续失败触发熔断并延迟恢复。
  • 会话并发峰值控制
    • session-gateway按设备/用户一致性哈希分片,水平扩展;连接注册与路由存于共享缓存。
    • 在峰值时对新连接施加受控接纳(连接速率限流)与优先级规则(例如优先处理已有活跃会话)。
  • 监控指标建议
    • 连接数、连接建立失败率、心跳超时率。
    • 消息吞吐(入站/出站)、服务器ACK延迟、客户端ACK延迟、未读队列深度。
    • 推送成功率、通道失败率、重试队列长度。
    • 审计命中率、病毒扫描命中率。
  • 测试要点
    • 弱网与断连:连接中断后自动重连与会话恢复;未ACK消息重试与去重验证。
    • 离线追送:设备离线期间消息入队,上线后GET /messages/pending拉取完整一致。
    • 重复发送:相同client_msg_id的重复发消息返回相同ACK与seq。
    • 权限校验:非会话参与者禁止发送与读取;越权访问返回403。
    • 顺序保证:同会话消息按seq严格递增;跨会话独立序列。

该方案在中等复杂度下满足实时性、可靠性与安全性目标,伪代码聚焦核心处理路径,具体实现可依据工程栈替换持久层与缓存组件,但原则与流程保持一致。

示例详情

解决的问题

将零散的业务需求快速转化为可评审、可落地的后端方案草稿;一次性生成“需求分析—架构概览—伪代码—实现建议”的完整输出,帮助团队在编码前统一共识;在迭代启动、模块重构、跨团队联调、招投标/预研、验收交付等关键节点,提供高质量的技术方案底稿;面向不同复杂度与系统类型(微服务、业务中台、数据处理等),稳定产出高可读、高一致性的逻辑设计;降低返工与沟通成本,提升交付速度与质量,推动试用转化为稳定付费。

适用用户

后端开发工程师

迭代前快速产出伪代码与方案说明,明确数据模型、接口与业务流程;用作每日开发清单,减少返工与沟通时间

架构师

梳理服务拆分、模块边界与依赖;生成可评审的架构与流程材料,加速方案对齐与落地

技术负责人/团队主管

制定统一实现规范与编码骨架;按复杂度拆分任务、安排人力与排期,降低交付风险

特征总结

根据需求一键生成可评审的后端伪代码与方案,清晰呈现数据关系与处理流程
自动拆解业务需求,输出模块划分与接口边界,帮助团队统一实现思路并降低沟通成本
智能设计数据模型与字段关系,减少反复修改,让数据库落地更顺畅,贯穿方案到实施
自动生成接口说明与调用流程,便于评审与联调,缩短上线周期并降低风险
按复杂度与系统类型定制输出结构,适配微服务、单体与后台管理等场景
提供可复制的伪代码骨架与关键步骤,开发者可直接细化为可运行代码
自动标注边界情况与异常处理建议,提前规避问题,降低返工成本于开发早期
输出需求概述、架构总览与流程清单,帮助团队快速对齐认知,提升评审效率
支持模板化复用与参数化输入,一键生成不同方案版本,便于快速试错

如何使用购买的提示词模板

1. 直接在外部 Chat 应用中使用

将模板生成的提示词复制粘贴到您常用的 Chat 应用(如 ChatGPT、Claude 等),即可直接对话使用,无需额外开发。适合个人快速体验和轻量使用场景。

2. 发布为 API 接口调用

把提示词模板转化为 API,您的程序可任意修改模板参数,通过接口直接调用,轻松实现自动化与批量处理。适合开发者集成与业务系统嵌入。

3. 在 MCP Client 中配置使用

在 MCP client 中配置对应的 server 地址,让您的 AI 应用自动调用提示词模板。适合高级用户和团队协作,让提示词在不同 AI 工具间无缝衔接。

AI 提示词价格
¥20.00元
先用后买,用好了再付款,超安全!

您购买后可以获得什么

获得完整提示词模板
- 共 529 tokens
- 3 个可调节参数
{ 功能需求描述 } { 技术复杂度级别 } { 目标系统类型 }
获得社区贡献内容的使用权
- 精选社区优质案例,助您快速上手提示词
使用提示词兑换券,低至 ¥ 9.9
了解兑换券 →
限时半价

不要错过!

半价获取高级提示词-优惠即将到期

17
:
23
小时
:
59
分钟
:
59