热门角色不仅是灵感来源，更是你的效率助手。通过精挑细选的角色提示词，你可以快速生成高质量内容、提升创作灵感，并找到最契合你需求的解决方案。让创作更轻松，让价值更直接！
我们根据不同用户需求，持续更新角色库，让你总能找到合适的灵感入口。
本提示词专为后端开发场景设计，能够根据具体功能需求生成结构清晰、逻辑严谨的伪代码。通过系统化的需求分析和架构设计，确保生成的伪代码具备良好的可读性和可执行性，涵盖数据模型设计、API接口定义、业务逻辑流程等核心要素，帮助开发者在实际编码前明确技术方案和实现思路，提升开发效率和质量。
需求分析
架构概述
伪代码
// 通用辅助
function require_auth(token):
    // Resolve the caller from the token; a null result means the token was rejected.
    principal = decode_and_validate_token(token)
    if principal is not null:
        return principal
    raise Unauthorized
function rate_limit_check(ip, user_id, route):
    // Throttle per route, keyed by the user id when present and by the client IP otherwise.
    subject = user_id or ip
    counter_key = "rl:" + route + ":" + subject
    hits = redis_incr_with_expiry(counter_key, window=1s)
    if hits <= threshold(route):
        return
    raise TooManyRequests
function get_trace_context():
    // Each call hands back a newly created trace id.
    trace_id = new_trace_id()
    return trace_id
// 产品与SKU查询
API GET /products?keyword&page:
    // Keyword search over products, one page per request, with a 60s result cache.
    trace = get_trace_context()
    rate_limit_check(ip, null, "/products")
    // The cache key must carry the page as well: keyed on the keyword alone,
    // whichever page was cached first would be served for every page.
    cache_key = "products:" + keyword + ":" + page
    cached_result = cache_get(cache_key)
    if cached_result is not null: return cached_result
    products = db_query_products(keyword, page)
    cache_set(cache_key, products, ttl=60s)
    return products
API GET /skus?product_id:
    // List a product's SKUs with a live "available" figure on each.
    trace = get_trace_context()
    rate_limit_check(ip, null, "/skus")
    cache_key = "skus:" + product_id
    sku_list = cache_get(cache_key)
    if sku_list is null:
        sku_list = db_query_skus(product_id)
        cache_set(cache_key, sku_list, ttl=60s)
    // Sellable quantity = stock minus the reserved counter held in Redis, floored at zero.
    for each sku in sku_list:
        held = redis_get("inv:sku:" + sku.id + ":reserved") or 0
        sku.available = max(sku.stock - held, 0)
    return sku_list
// 购物车
API GET /cart:
    // Return the authenticated caller's cart items.
    caller = require_auth(token)
    rate_limit_check(ip, caller.id, "/cart:get")
    return db_query_cart_items(caller.id)
API POST /cart/items {sku_id, qty}:
    // Add (or update) a cart line for the caller, storing a price snapshot with it.
    user = require_auth(token)
    rate_limit_check(ip, user.id, "/cart:add")
    // Validate the payload before touching the database.
    if qty <= 0: raise BadRequest
    sku = db_get_sku(sku_id)
    // An unknown sku_id would otherwise fail on sku.price below.
    if sku is null: raise NotFound
    // Price snapshot: lock the current price at add time so later promotion changes
    // do not make the displayed price inconsistent.
    price_snapshot = compute_base_price(sku.price, user.tier)
    upsert_cart_item(user.id, sku_id, qty, price_snapshot)
    bump_cart_version(user.id) // cart_version is used for idempotency and concurrency detection
    return success
API DELETE /cart/items/{id}:
    // Remove one cart line; the caller's id is passed along with the item id.
    owner = require_auth(token)
    rate_limit_check(ip, owner.id, "/cart:delete")
    delete_cart_item(id, owner.id)
    // Any cart mutation advances the cart version.
    bump_cart_version(owner.id)
    return success
API POST /cart/apply-promotion {promotion_id}:
    // Price the caller's cart against a single promotion and return the price breakdown.
    user = require_auth(token)
    rate_limit_check(ip, user.id, "/cart:apply-promo")
    promotion = get_promotion_cached(promotion_id)
    // Do not hand a null promotion to the engine.
    if promotion is null: raise NotFound
    cart = db_query_cart_items(user.id)
    result = PromotionService.apply(cart, [promotion], user)
    persist_applied_promotion(user.id, promotion_id, cart.version) // optional record
    // PromotionService.apply exposes the breakdown as `price_detail`;
    // there is no `price_breakdown` field on its result.
    return result.price_detail
// 促销引擎
// 输入:购物车项、候选促销集合、用户与店铺上下文
function PromotionService.apply(cart_items, promotions, user):
    // Prices a cart against candidate promotions.
    // Returns { total, price_detail, degraded, applied_promotions } on both the normal and degraded paths.

    // Filter: time window, scope and user tier must match; coupons must also be usable by this user.
    applicable = []
    for p in promotions:
        if is_in_time_window(p) and scope_match(p, cart_items) and user_tier_match(p, user):
            if p.type == "COUPON" and not coupon_usable(user, p): continue
            applicable.append(p)
    // Highest priority first (local renamed so it does not shadow a sort builtin).
    ranked = sort(applicable, by=priority_desc)
    // Mutual exclusion: greedy pass that skips any promotion sharing a mutex tag with one already chosen.
    selected = []
    used_mutex_tags = set()
    for p in ranked:
        if intersects(p.mutex_tags, used_mutex_tags): continue
        if improves_total(cart_items, p):
            selected.append(p)
            used_mutex_tags.add_all(p.mutex_tags)
    // Price detail
    price_detail = new_price_detail()
    base_total = sum(item.price_snapshot * item.qty for item in cart_items)
    total = base_total
    for p in selected:
        discount = compute_discount(p.ruleset, cart_items)
        price_detail.add(p.id, discount, p.type)
        total = total - discount
        if total < 0: total = 0
    // Degraded path: fall back to the undiscounted total.
    if promotion_engine_error():
        // Carry the same keys as the normal result so readers of applied_promotions
        // (e.g. record_metrics) do not hit a missing field.
        return { total: base_total, price_detail: [], degraded: true, applied_promotions: [] }
    return { total: total, price_detail: price_detail, degraded: false, applied_promotions: selected }
// 结算与库存短锁
API POST /checkout {cart_id} with Idempotency-Key = user_id + cart_version:
    // Price the cart and place short-lived inventory holds for every item.
    user = require_auth(token)
    rate_limit_check(ip, user.id, "/checkout")
    // Idempotency check
    idem_key = "idem:checkout:" + user.id + ":" + get_cart_version(user.id)
    if redis_set_nx(idem_key, "1", ttl=120s) == false:
        return conflict_or_previous_result(idem_key)
    cart = db_query_cart_items(user.id)
    promotions = get_all_promotions_cached()
    pricing = PromotionService.apply(cart, promotions, user)
    // Availability check and short hold (atomic Redis script per item)
    ttl = 10 minutes
    for each item in cart:
        success = InventoryService.try_reserve(item.sku_id, item.qty, cart_id, ttl)
        if not success:
            // Roll back the holds already taken for this cart.
            InventoryService.release_cart_reservations(cart_id)
            // Release the idempotency key as well: the response asks the client to retry,
            // and with the key still set that retry would be answered as a conflict for up to 120s.
            redis_del(idem_key)
            return { error: "InventoryLockFailed", retry_after_ms: random_backoff(200~800) }
    // Return pricing together with the hold expiry.
    return { pricing: pricing, lock_expires_at: now() + ttl }
// 库存短锁(Redis)
function InventoryService.try_reserve(sku_id, qty, cart_id, ttl):
    // Atomically hold `qty` units of a SKU for a cart. Returns 1 on success, 0 when too few units are free.
    script:
        // NOTE(review): a Redis script cannot call the database; db_fallback_stock presumably
        // has to be resolved by the caller before the script runs — confirm.
        stock = GET "sku:" + sku_id + ":stock" or db_fallback_stock(sku_id) // optional pre-warmed cache
        reserved = GET "inv:sku:" + sku_id + ":reserved" or 0
        // What this cart already holds for the SKU. Without this, a repeated checkout would
        // INCRBY a second time while SET merely overwrites, inflating the reserved counter.
        held = GET "inv:cart:" + cart_id + ":sku:" + sku_id or 0
        available = stock - reserved + held
        if available >= qty:
            INCRBY "inv:sku:" + sku_id + ":reserved" (qty - held)
            SET "inv:cart:" + cart_id + ":sku:" + sku_id qty EX ttl
            // Per-cart index so all holds can be released by cart id.
            SADD "inv:cart:" + cart_id + ":skus" sku_id
            // SADD has no EX option; the index set needs its own EXPIRE.
            EXPIRE "inv:cart:" + cart_id + ":skus" ttl
            return 1
        else:
            return 0
    return eval(script)
function InventoryService.release_cart_reservations(cart_id):
    // Give back every hold recorded for the cart and drop the cart's bookkeeping keys.
    skus = SMEMBERS "inv:cart:" + cart_id + ":skus"
    for each sku_id in skus:
        // The per-SKU key carries a TTL and may already be gone; read it null-safely,
        // like the other reserved-counter reads in this file.
        qty = GET "inv:cart:" + cart_id + ":sku:" + sku_id or 0
        if qty > 0:
            DECRBY "inv:sku:" + sku_id + ":reserved" qty
        DEL "inv:cart:" + cart_id + ":sku:" + sku_id
    DEL "inv:cart:" + cart_id + ":skus"
// 下单与订单事件
API POST /orders {address_id, payment_channel}:
    // Turn the caller's checked-out cart into an UNPAID order and announce it.
    user = require_auth(token)
    rate_limit_check(ip, user.id, "/orders:create")
    cart = db_query_cart_items(user.id)
    // The short inventory hold must still be alive.
    if not InventoryService.cart_lock_valid(cart.id):
        return { error: "LockExpired", action: "RetryCheckout" }
    promotions = get_applied_promotions(user.id, cart.version) or reapply_promotions(cart, user)
    pricing = PromotionService.apply(cart, promotions, user)
    // Create the order (transaction)
    tx_begin()
    order_id = insert_order(user.id, cart.items, pricing.total, pay_status="UNPAID", address_id, payment_channel)
    // Record the inventory holds in the DB (audit)
    for item in cart.items:
        insert_inventory_lock(order_id, item.sku_id, item.qty, ttl_remaining(cart.id))
    tx_commit()
    // Publish the order-created event (with retry and dead-lettering).
    // Payload fields are spelled out: user_id, items and total were not bound in this scope.
    MQ.publish("order.created", {order_id, user_id: user.id, items: cart.items, total: pricing.total, payment_channel})
    // Clear the checkout idempotency key
    redis_del("idem:checkout:" + user.id + ":" + get_cart_version(user.id))
    return { order_id, payable_amount: pricing.total }
// 支付回调与最终扣减
function PaymentCallbackHandler.handle(payment_event):
    // Processes a payment provider callback: final stock deduction on success, cancellation otherwise.
    // Idempotency check
    idem_key = "idem:paycb:" + payment_event.order_id + ":" + payment_event.tx_id
    if redis_set_nx(idem_key, "1", ttl=300s) == false:
        return already_processed()
    order = db_get_order(payment_event.order_id)
    if order is null:
        // Unknown order: free the key so a later delivery is not swallowed as "already processed".
        redis_del(idem_key)
        return fail
    if order.pay_status == "PAID": return success // idempotent
    if payment_event.status == "SUCCESS":
        tx_begin()
        // Final stock deduction; the WHERE clause only matches while enough stock remains.
        for item in order.items:
            affected = db_exec("UPDATE sku SET stock = stock - ? WHERE id = ? AND stock >= ?", item.qty, item.sku_id, item.qty)
            if affected == 0:
                tx_rollback()
                // Not enough stock: flag the order for manual handling or automatic restock.
                mark_order_exception(order.id, "StockInsufficientOnFinalDeduct")
                // order_id was not bound in this scope; use the loaded order's id.
                MQ.publish("order.exception", {order_id: order.id, type:"FinalDeductFail"})
                return fail
        update_order_pay_status(order.id, "PAID")
        tx_commit()
        // Release the short holds
        InventoryService.release_cart_reservations(order.cart_id)
        // Publish the paid event
        MQ.publish("order.paid", {order_id: order.id})
        // Confirm coupon consumption (when applicable)
        PromotionService.confirm_coupon_usage(order.user_id, order.applied_coupons, order.id)
        return success
    else:
        // Payment failed or timed out
        update_order_pay_status(order.id, "CANCELED")
        InventoryService.release_cart_reservations(order.cart_id)
        PromotionService.rollback_coupon_usage(order.user_id, order.applied_coupons)
        MQ.publish("order.canceled", {order_id: order.id})
        return success
function InventoryService.cart_lock_valid(cart_id):
    // The hold counts as live while the cart's SKU index key still exists (i.e. has not expired).
    index_key = "inv:cart:" + cart_id + ":skus"
    return EXISTS index_key
// 提示:运营促销配置(后台)
API POST /promotions {scope, type, priority, mutex_tags, ruleset}:
    // Back-office entry point: creates a promotion draft and submits it for review.
    caller = require_auth(token)
    require_permission(caller, "PROMOTION_WRITE") // subject to a review workflow
    create_promotion_draft(...)
    submit_for_review(...)
    return success
// 观测与指标
// Emits checkout metrics and a trace annotation for one pricing result.
// NOTE(review): `elapsed_ms` and `locks_all_success` are not bound in this function and the
// `locks` parameter is never read — presumably both should be derived from the caller's context; confirm.
function record_metrics(pricing, locks):
    // Count of promotions the engine applied to this cart.
    metrics.inc("promotion_hit_count", len(pricing.applied_promotions))
    metrics.observe("checkout_p95_latency", elapsed_ms)
    // One counter for fully successful holds, another for conflicts.
    metrics.inc("inv_lock_hits") if locks_all_success else metrics.inc("inv_lock_conflicts")
    // Surface on the trace whether pricing ran in degraded mode.
    tracing.annotate("degraded", pricing.degraded)
技术说明
促销规则与互斥
库存短锁设计(Redis)
最终扣减与数据库事务
幂等与并发控制
缓存策略
限流与安全
MQ与可靠投递
可观测与性能
测试建议
部署与容错
该方案在数据一致性、并发控制与性能之间保持平衡，使用Redis短锁+DB最终扣减的经典模式，促销引擎采用优先级+互斥贪心选择，支持跨店铺叠加与降级路径，满足中等复杂度Web服务的要求。
需求分析
架构概述
伪代码
FUNCTION EnsureIdempotency(scope, key):
    // Claims (scope, key) for processing.
    // Returns: the stored result_ref when the key already completed,
    //          "RETRY_LATER" when another attempt is in progress,
    //          null when the caller has just claimed the key and should proceed.
    existing = SELECT * FROM IdempotencyStore WHERE scope = scope AND key = key
    IF existing EXISTS AND existing.status == "COMPLETED":
        RETURN existing.result_ref
    ELSE IF existing EXISTS AND existing.status == "IN_PROGRESS":
        // Returned, not raised: every caller in this file tests `== "RETRY_LATER"` on the
        // return value, so a raised error would bypass those checks.
        RETURN "RETRY_LATER"
    ELSE:
        // NOTE(review): SELECT-then-INSERT is not atomic; presumably a unique constraint on
        // (scope, key) is expected to reject a concurrent second insert — confirm.
        INSERT INTO IdempotencyStore(scope, key, status="IN_PROGRESS", created_at=NOW)
        RETURN null
FUNCTION CompleteIdempotency(scope, key, result_ref):
    // Mark the (scope, key) record as finished and store the reference to its result.
    UPDATE IdempotencyStore
        SET status="COMPLETED", result_ref=result_ref
        WHERE scope=scope AND key=key
FUNCTION WriteOutbox(event_type, aggregate_id, payload):
    // Append a PENDING event row to the outbox and return the id generated for it.
    new_event_id = GENERATE_UUID()
    INSERT INTO Outbox(event_type, aggregate_id, payload, event_id=new_event_id, created_at=NOW, status="PENDING")
    RETURN new_event_id
PROCESS OutboxRelayer:
    // Polls PENDING outbox rows in creation order and publishes them to Kafka.
    LOOP:
        rows = SELECT * FROM Outbox WHERE status="PENDING" ORDER BY created_at LIMIT N
        FOR each row IN rows:
            TRY:
                PUBLISH_TO_KAFKA(topic=row.event_type, key=row.aggregate_id, message=payload_with_event_id(row))
                UPDATE Outbox SET status="SENT", sent_at=NOW WHERE id=row.id
            CATCH transient_error:
                UPDATE Outbox SET retry_count = retry_count + 1 WHERE id=row.id
                // Compare the row's own counter, including this attempt;
                // a bare `retry_count` is not bound in this scope.
                IF row.retry_count + 1 > MAX_RETRY:
                    // NOTE(review): assumes MOVE_TO_DLQ also takes the row out of PENDING;
                    // otherwise the next poll selects it again — confirm.
                    MOVE_TO_DLQ(row)
        SLEEP short_interval
CONSUMER OnOrdersPaid(event from orders.paid):
    // Turns a paid order into one points accrual request per (order, member).
    member_id = event.member_id
    order_id = event.order_id
    amount = event.amount
    idempotency_key = CONCAT(order_id, ":", member_id)
    IF IsDuplicateEvent(event.event_id):
        ACK; RETURN
    prior = EnsureIdempotency(scope="ORDERS_PAID", key=idempotency_key)
    IF prior == "RETRY_LATER":
        NACK; RETURN
    IF prior IS NOT null:
        // A stored result means this order was already handled; do not accrue a second time.
        ACK; RETURN
    points_input = TRANSFORM_TO_POINTS_INPUT(amount, event.category, event.timestamp)
    TRY:
        // member_id is included because /points/accrue reads request.member_id.
        CALL points-service POST /points/accrue {order_id, member_id, amount=points_input.amount, idempotency_key}
        CompleteIdempotency("ORDERS_PAID", idempotency_key, result_ref=order_id)
        ACK
    CATCH client_error (4xx):
        // Business validation failed; finalize idempotency to avoid infinite retry
        CompleteIdempotency("ORDERS_PAID", idempotency_key, result_ref="FAILED")
        ACK
    CATCH server_error (5xx or timeout):
        // Drop the IN_PROGRESS marker before asking for redelivery; left in place, every
        // redelivery would see it, get "RETRY_LATER", and NACK forever.
        DELETE FROM IdempotencyStore WHERE scope="ORDERS_PAID" AND key=idempotency_key AND status="IN_PROGRESS"
        NACK
API POST /points/accrue(request):
    // Posts one ledger entry per idempotency key and publishes points.accrued in the same transaction.
    key = request.idempotency_key
    IF EnsureIdempotency(scope="POINTS_ACCRUE", key=key) == "RETRY_LATER":
        RETURN 409 Retry
    BEGIN TRANSACTION
    // The ledger is the source of truth for "already posted".
    existing = SELECT * FROM PointLedger WHERE idempotency_key = key
    IF existing EXISTS:
        COMMIT
        CompleteIdempotency("POINTS_ACCRUE", key, result_ref=existing.id)
        RETURN 200 existing
    delta = APPLY_POINTS_RULES(order_id=request.order_id, amount=request.amount, member_profile=LOAD_MEMBER(request.member_id))
    // SUM over zero rows is NULL; a member with no ledger history starts from 0.
    current_balance = COALESCE(SELECT SUM(delta) FROM PointLedger WHERE member_id = request.member_id AND status IN ("POSTED", "COMPENSATION_POSTED"), 0)
    new_balance = current_balance + delta
    // Generate the id once so the row, the event payload and the response all agree.
    ledger_id = NEW_ID
    INSERT INTO PointLedger(id=ledger_id, member_id=request.member_id, delta=delta, source="ORDER", idempotency_key=key, balance=new_balance, status="POSTED", created_at=NOW)
    event_id = WriteOutbox(event_type="points.accrued",
        aggregate_id=request.member_id,
        payload = { ledger_id: ledger_id, member_id: request.member_id, delta: delta, balance: new_balance, idempotency_key: key, source: "ORDER" })
    COMMIT
    CompleteIdempotency("POINTS_ACCRUE", key, result_ref=ledger_id)
    RETURN 200 { ledger_id: ledger_id, balance: new_balance }
FUNCTION ReverseLedgerByKey(original_key, reason):
    // Posts a compensating entry that negates the POSTED entry found under original_key.
    // Returns "NOOP" when there is nothing to reverse, "ALREADY_COMPENSATED" when a reversal exists.
    BEGIN TRANSACTION
    posted = SELECT * FROM PointLedger WHERE idempotency_key = original_key AND status="POSTED"
    IF NOT posted EXISTS:
        COMMIT; RETURN "NOOP"
    // The reversal has its own key, derived from the original one.
    comp_key = CONCAT(original_key, ":COMP")
    IF SELECT * FROM PointLedger WHERE idempotency_key = comp_key EXISTS:
        COMMIT; RETURN "ALREADY_COMPENSATED"
    balance_before = SELECT SUM(delta) FROM PointLedger WHERE member_id = posted.member_id AND status IN ("POSTED", "COMPENSATION_POSTED")
    balance_after = balance_before - posted.delta
    INSERT INTO PointLedger(id=NEW_ID, member_id=posted.member_id, delta=-posted.delta, source="COMPENSATION", idempotency_key=comp_key, balance=balance_after, status="COMPENSATION_POSTED", created_at=NOW)
    // Published on the same points.accrued topic, carrying the negative delta.
    event_id = WriteOutbox(event_type="points.accrued",
        aggregate_id=posted.member_id,
        payload = { ledger_id: NEW_ID, member_id: posted.member_id, delta: -posted.delta, balance: balance_after, idempotency_key: comp_key, source: "COMPENSATION", reason: reason })
    COMMIT
// Re-evaluates a member's tier after a points event and records an upgrade when one is due.
CONSUMER OnPointsAccrued(event from points.accrued):
    IF IsDuplicateEvent(event.event_id):
        ACK; RETURN
    member_id = event.member_id
    // Weighted points: computed over the active tier window with the loaded weight model
    weighted_score = COMPUTE_WEIGHTED_SCORE(member_id, window=ACTIVE_TIER_WINDOW(), weight_model=LOAD_TIER_WEIGHT_MODEL())
    // Determine the target tier from the thresholds
    current_tier = SELECT tier FROM Member WHERE id = member_id
    target_tier = EVALUATE_TIER(weighted_score, thresholds=LOAD_TIERS())
    // NOTE(review): the strictly-greater test means compensation entries (negative delta, also
    // published as points.accrued) never lower a tier — confirm this is intended.
    IF target_tier > current_tier:
        // Tier change and its outbox event are written in one transaction.
        BEGIN TRANSACTION
        UPDATE Member SET tier = target_tier, updated_at=NOW WHERE id = member_id
        event_id = WriteOutbox(event_type="tier.changed",
            aggregate_id=member_id,
            payload = { member_id: member_id, from_tier: current_tier, to_tier: target_tier, reason: "POINTS_ACCUMULATED", effective_at: NOW })
        COMMIT
    ACK
FUNCTION COMPUTE_WEIGHTED_SCORE(member_id, window, weight_model):
    // Sums the member's ledger deltas inside the window, each scaled by a per-source weight and a time decay.
    points = SELECT delta, source, created_at FROM PointLedger WHERE member_id=member_id AND created_at IN window
    score = 0
    FOR p IN points:
        // The weight is looked up by the row's own source; a bare `source` is not bound here.
        w = weight_model[p.source] OR DEFAULT_WEIGHT
        decay = TIME_DECAY(window, p.created_at)
        score = score + p.delta * w * decay
    RETURN score
// Picks the highest tier whose min_score the given score reaches.
// NOTE(review): the result when no threshold is reached is not specified here — confirm the
// threshold list always contains a base tier with min_score the lowest possible score.
FUNCTION EVALUATE_TIER(score, thresholds):
    // thresholds: list of {tier_name, min_score}
    RETURN MAX tier WHERE score >= min_score
CONSUMER OnTierChanged(event from tier.changed):
    // Starts one benefit-grant saga per active policy triggered by the member's new tier.
    IF IsDuplicateEvent(event.event_id):
        ACK; RETURN
    member_id = event.member_id
    to_tier = event.to_tier
    policies = SELECT * FROM BenefitPolicy WHERE trigger_tier = to_tier AND status="ACTIVE"
    FOR policy IN policies:
        saga_id = GENERATE_UUID()
        START_SAGA(saga_id, member_id, policy, change_event_id=event.event_id)
    // Acknowledge on the normal path too, as the other consumers in this file do;
    // without it the event would be redelivered after every successful run.
    ACK
PROCESS START_SAGA(saga_id, member_id, policy, change_event_id):
    // Two-step benefit grant: reserve (stock + RESERVED issue row), then confirm (GRANTED + event).
    // Idempotency: a given (member_id, policy_id, change_event_id) starts only once.
    IF EXISTS BenefitIssue WHERE member_id=member_id AND policy_id=policy.id AND idempotency_key=change_event_id:
        RETURN
    // Generate the issue id once so both steps address the same row.
    issue_id = NEW_ID
    // Step 1: reserve stock
    TRY:
        BEGIN TRANSACTION
        IF policy.type IN ("COUPON", "PACKAGE"):
            stock = SELECT stock FROM Benefit WHERE id = policy.benefit_id FOR UPDATE
            IF stock <= 0:
                RAISE "OUT_OF_STOCK"
            UPDATE Benefit SET stock = stock - 1 WHERE id = policy.benefit_id
        // policy_id is written because the idempotency lookup above filters on it.
        INSERT INTO BenefitIssue(id=issue_id, member_id=member_id, policy_id=policy.id, benefit_id=policy.benefit_id, status="RESERVED",
            saga_id=saga_id, idempotency_key=change_event_id, created_at=NOW)
        COMMIT
    CATCH "OUT_OF_STOCK":
        // Close the open transaction (and its row lock) before compensating.
        ROLLBACK
        HANDLE_COMPENSATION(saga_id, member_id, policy, reason="OUT_OF_STOCK")
        RETURN
    // Step 2: grant / confirm
    TRY:
        BEGIN TRANSACTION
        UPDATE BenefitIssue SET status="GRANTED", updated_at=NOW WHERE id=issue_id
        event_id = WriteOutbox(event_type="benefit.granted",
            aggregate_id=member_id,
            payload = { issue_id: issue_id, member_id: member_id, benefit_id: policy.benefit_id, type: policy.type, status: "GRANTED" })
        COMMIT
    CATCH transient_error:
        // Retryable; do not compensate yet
        SCHEDULE_RETRY(saga_id)
        RETURN
    CATCH permanent_error:
        HANDLE_COMPENSATION(saga_id, member_id, policy, reason="GRANT_FAILED")
        RETURN
FUNCTION HANDLE_COMPENSATION(saga_id, member_id, policy, reason):
    // Undoes a saga's reservation: marks the issue FAILED and returns the reserved stock unit.
    BEGIN TRANSACTION
    issue = SELECT * FROM BenefitIssue WHERE saga_id=saga_id FOR UPDATE
    IF NOT (issue EXISTS AND issue.status IN ("RESERVED", "PENDING")):
        // Nothing was reserved under this saga (e.g. OUT_OF_STOCK is raised before the issue row
        // is inserted), so there is no stock to give back and no issue to read fields from.
        COMMIT; RETURN
    UPDATE BenefitIssue SET status="FAILED", updated_at=NOW WHERE id=issue.id
    IF policy.type IN ("COUPON", "PACKAGE"):
        UPDATE Benefit SET stock = stock + 1 WHERE id = policy.benefit_id // release the reserved unit
    // Optional: roll back growth points (per policy)
    IF policy.compensation_rolls_back_points == TRUE AND issue.idempotency_key EXISTS:
        // NOTE(review): issue.idempotency_key holds the saga's change_event_id, while ledger rows
        // are keyed by the accrual request's key; this lookup presumably never matches — confirm.
        CALL points-service ReverseLedgerByKey(original_key=issue.idempotency_key, reason=reason)
    COMMIT
CONSUMER OnBenefitGranted(event from benefit.granted):
    // Notifies the member about a granted benefit and records the delivery outcome.
    IF IsDuplicateEvent(event.event_id):
        ACK; RETURN
    message = FORMAT_MESSAGE(event.member_id, event.type, event.benefit_id)
    result = SEND_TO_USER_CHANNEL(member_id=event.member_id, message=message)
    // The member id comes from the event; a bare `member_id` is not bound in this consumer.
    RECORD_DELIVERY(event.member_id, event.issue_id, status=result.status, channel=result.channel, at=NOW)
    ACK
API POST /members(body):
    // Creates a member at the base tier and publishes member.created in the same transaction.
    VALIDATE body.mobile FORMAT
    // Generate the id once so the Member row, the outbox event and the response all agree.
    member_id = NEW_ID
    BEGIN TRANSACTION
    // NOTE(review): the mobile number is stored masked, so the original value is not
    // recoverable from this row — confirm that is intended.
    INSERT INTO Member(id=member_id, mobile=MASK(body.mobile), tier=BASE_TIER, status="ACTIVE", updated_at=NOW)
    WriteOutbox(event_type="member.created", aggregate_id=member_id, payload={member_id: member_id})
    INSERT INTO AuditLog(...)
    COMMIT
    RETURN 201 { id: member_id }
API GET /members/{id}:
    // Returns one member with audit fields redacted; service-to-service authentication only.
    AUTHN_MTLS_OR_HMAC()
    member = SELECT * FROM Member WHERE id = id
    // An unknown id is a 404, not a 200 with an empty body.
    IF NOT member EXISTS:
        RETURN 404
    RETURN 200 REDACT(member, fields=["audit_info"])
API GET /tiers:
    // All tiers, lowest threshold first.
    tiers = SELECT * FROM Tier ORDER BY threshold ASC
    RETURN tiers
API POST /benefits/grant { member_id, policy_id, idempotency_key }:
    // Manual grant entry point; goes through the same saga as tier-triggered grants.
    prior = EnsureIdempotency(scope="BENEFIT_GRANT_API", key=idempotency_key)
    IF prior == "RETRY_LATER":
        RETURN 409 Retry
    IF prior IS NOT null:
        // This key already completed: give the same answer without starting a second saga.
        RETURN 202 Accepted
    START_SAGA(saga_id=GENERATE_UUID(), member_id=member_id, policy=LOAD_POLICY(policy_id), change_event_id=idempotency_key)
    CompleteIdempotency("BENEFIT_GRANT_API", idempotency_key, result_ref="STARTED")
    RETURN 202 Accepted
// Daily consistency sweep: ledger sums versus the last known balance, plus outbox/saga/stock health checks.
SCHEDULED TASK ReconciliationDaily:
    FOR each member IN Members:
        // Same status filter as the balance computation on the accrual and reversal paths.
        ledger_sum = SELECT SUM(delta) FROM PointLedger WHERE member_id=member.id AND status IN ("POSTED", "COMPENSATION_POSTED")
        IF ledger_sum != LAST_KNOWN_BALANCE(member.id):
            ALERT "Ledger mismatch", member.id
    CHECK_OUTBOX_STUCK(threshold_minutes=15)
    // NOTE(review): `rate_limit` is not defined in this view; presumably a configured failure-rate threshold — confirm.
    CHECK_SAGA_FAILURE_RATE(window=1h, threshold=rate_limit)
    CHECK_BENEFIT_STOCK_BELOW_MIN()
// Triage for dead-lettered events: replay the recoverable ones, open an incident for the rest, audit-log all.
CONSUMER DeadLetterHandler(event from dead-letter):
    // NOTE(review): the result of CLASSIFY is not captured, and `recoverable` below is not bound
    // in this view — presumably it is meant to be CLASSIFY's outcome; confirm.
    CLASSIFY(event)
    IF recoverable:
        REPLAY(event)
    ELSE:
        CREATE_INCIDENT(event)
    LOG_WITH_AUDIT(event)
// Reports whether an event id is already present in EventDedup.
// NOTE(review): nothing visible in this file inserts into EventDedup, so as written this
// lookup can only match rows written elsewhere — confirm where event ids are recorded.
FUNCTION IsDuplicateEvent(event_id):
    RETURN EXISTS EventDedup WHERE event_id = event_id
// Consumer setup: choose the Kafka partition key.
CONSUMER INIT:
    SET KafkaPartitionKey = member_id // keeps events of a single member in order
技术说明
需求分析
架构概述
伪代码
// ======================= auth-service =======================
FUNCTION POST /auth/login(mobile, password):
    // Password login: issues a short-lived access token and a long-lived refresh token.
    account = FindUserByMobile(mobile)
    // Unknown account and wrong password produce the same response.
    credentials_ok = account IS NOT NULL AND VerifyPassword(password, account.password_hash)
    IF NOT credentials_ok:
        RETURN 401, {error: "invalid_credentials"}
    access_token = IssueAccessToken(subject=account.id, ttl=short)
    refresh_token = IssueRefreshToken(subject=account.id, ttl=long)
    RETURN 200, {access_token, refresh_token}
FUNCTION POST /auth/refresh(refresh_token):
    // Exchanges a valid refresh token for a new short-lived access token.
    IF VerifyRefreshToken(refresh_token):
        subject_id = ExtractSubject(refresh_token)
        access_token = IssueAccessToken(subject=subject_id, ttl=short)
        RETURN 200, {access_token}
    RETURN 401, {error: "invalid_refresh_token"}
// ======================= session-gateway =======================
// New socket: start the auth timer; the auth branch of ON Message cancels it on success.
ON WebSocketConnect(conn):
    ScheduleTimeout(conn, T_AUTH_TIMEOUT)
ON Message(conn, frame):
    // Dispatches one inbound frame. Identity is established by the "auth" frame and kept on the
    // connection, because later frames arrive in separate invocations of this handler.
    IF frame.type == "auth":
        VERIFY_TLS(conn)
        token = frame.access_token
        IF NOT VerifyAccessToken(token):
            CLOSE(conn, code=401)
            // Stop here; falling through would register a rejected connection.
            RETURN
        user_id = ExtractSubject(token)
        device_id = frame.device_id
        IF NOT BindDeviceToUser(device_id, user_id):
            CLOSE(conn, code=403)
            RETURN
        conn.user_id = user_id
        conn.device_id = device_id
        RegisterConnection(device_id, conn)
        CancelTimeout(conn, T_AUTH_TIMEOUT)
        SEND(conn, {type: "auth_ack", status: "ok"})
        RETURN
    IF frame.type == "heartbeat":
        UpdateHeartbeat(conn)
        SEND(conn, {type: "heartbeat_ack"})
        RETURN
    // Every remaining frame type acts on behalf of a user and needs an authenticated connection.
    IF conn.user_id IS NULL:
        CLOSE(conn, code=401)
        RETURN
    IF frame.type == "send_message":
        RouteToMessageService(frame, conn.user_id, conn.device_id)
    ELSE IF frame.type == "ack":
        HandleClientAck(conn.user_id, frame.conv_id, frame.seq)
ON ConnectionClose(conn):
    // Drop the registry entry for whichever device this connection belonged to.
    UnregisterConnection(LookupDevice(conn))
FUNCTION RegisterConnection(device_id, conn):
    // Map the device to its live connection, then bump the connection gauge by one.
    ConnectionRegistry[device_id] = conn
    UpdateMetrics("connections", +1)
// Rate limit at gateway
FUNCTION CheckRateLimit(subject_key):
    // One token per request; TRUE means the request may proceed.
    RETURN TokenBucketTryConsume(subject_key, 1)
// Backpressure for slow consumer
FUNCTION SendToDevice(device_id, payload):
    // Queues a payload for a connected device. Returns "offline", "deferred" or "sent".
    target = ConnectionRegistry[device_id]
    IF target IS NULL:
        RETURN "offline"
    IF OutboundQueueSize(target) <= MAX_QUEUE_SIZE:
        EnqueueOutbound(target, payload)
        RETURN "sent"
    // Queue is over the limit: signal backpressure instead of enqueuing.
    ApplyBackpressure(target)
    RETURN "deferred"
// ======================= message-service =======================
FUNCTION POST /conversations {participants[]}:
    // Validate the participant list, create the conversation, return its id.
    ValidateParticipants(participants)
    new_conv_id = CreateConversation(participants)
    RETURN 201, {conv_id: new_conv_id}
FUNCTION POST /messages {conv_id, content, client_msg_id} WITH access_token:
    // Accepts one message: dedup, audit, assign seq, persist, cache, fan out, acknowledge.
    sender_id = ExtractSubject(access_token)
    IF NOT IsParticipant(conv_id, sender_id):
        RETURN 403, {error: "not_in_conversation"}
    // Idempotency & dedup pre-check
    idem_key = MakeIdemKey(sender_id, client_msg_id)
    IF RedisExists(idem_key):
        existing = FindMessageByIdemKey(sender_id, client_msg_id)
        RETURN 200, {ack: "duplicate", seq: existing.seq, message_id: existing.id}
    // The Bloom-filter probe that used to sit here had an empty body; the DB uniqueness check
    // on insert is the authority for duplicates that fall outside the Redis window.
    // Content audit runs BEFORE a seq is taken, so a blocked message does not consume a
    // sequence number and leave a gap in the conversation's order.
    IF ContentAuditBlocks(content):
        RETURN 400, {error: "content_blocked"}
    // Assign ordered seq per conversation (atomic)
    seq = AtomicIncr("conv_seq:" + conv_id)
    // Persist (synchronous for reliability)
    message_id = InsertMessage(
        conv_id=conv_id,
        sender_id=sender_id,
        seq=seq,
        content=content,
        ack_status="SERVER_ACKED",
        client_msg_id=client_msg_id
    )
    UpdateConversationLastMsg(conv_id, message_id)
    // Write hot cache
    RedisSet(idem_key, {message_id, seq}, ttl=IDEM_TTL)
    RedisListPush("conv_recent:" + conv_id, SerializeMessage(message_id), cap=RECENT_CAP)
    FOR EACH participant IN GetParticipants(conv_id) EXCEPT sender_id:
        EnqueueDelivery(participant, conv_id, seq, message_id)
    // ACK to sender
    RETURN 200, {ack: "ok", seq: seq, message_id: message_id}
FUNCTION EnqueueDelivery(recipient_id, conv_id, seq, message_id):
    // Try every online device of the recipient; if none accepts, queue the payload and push.
    payload = {type: "message", conv_id, seq, message_id}
    reached = FALSE
    FOR EACH device_id IN ListOnlineDevices(recipient_id):
        outcome = SendToDevice(device_id, payload)
        IF outcome == "sent":
            reached = TRUE
    IF reached:
        RETURN
    // offline path: queue & push
    RedisListPush("pending:" + RecipientKey(recipient_id), payload)
    TriggerPush(recipient_id, conv_id, seq)
FUNCTION GET /messages/pending WITH access_token:
    // Pops up to `limit` queued payloads for the caller.
    user_id = ExtractSubject(access_token)
    batch_size = RequestParam("limit", DEFAULT_LIMIT)
    queue_key = "pending:" + RecipientKey(user_id)
    RETURN 200, {messages: RedisListPopN(queue_key, batch_size)}
// Gateway side of "send_message": rate-limit per device, forward to POST /messages, relay the ack.
FUNCTION RouteToMessageService(frame, user_id, device_id):
    IF NOT CheckRateLimit("device:" + device_id):
        SEND(ConnectionRegistry[device_id], {type: "error", code: 429})
        RETURN
    // parse frame fields
    conv_id = frame.conv_id
    client_msg_id = frame.client_msg_id
    content = frame.content
    // NOTE(review): POST /messages is declared WITH access_token and extracts the sender from it,
    // while this call passes user_id — confirm how the gateway authenticates to message-service.
    resp = POST /messages {conv_id, content, client_msg_id} WITH user_id
    SEND(ConnectionRegistry[device_id], {type: "server_ack", resp})
FUNCTION HandleClientAck(user_id, conv_id, seq):
    // Record that the client confirmed receipt up to/at this seq, then count the ack.
    MarkDelivered(user_id, conv_id, seq)
    UpdateMetrics("client_ack", 1)
FUNCTION GET /conversations/{id}/history?cursor:
    // Pages through a conversation: Redis recent window first, database for whatever is missing.
    conv_id = PathParam(id)
    cursor = QueryParam("cursor") // format: {last_seq, page_size, direction}
    // The first page arrives without a cursor (ResolveStartSeq already covers that case),
    // so the cursor's fields must not be read unconditionally.
    IF cursor IS NULL:
        // NOTE(review): DEFAULT_PAGE_SIZE / DEFAULT_DIRECTION are assumed configuration values — confirm.
        page_size = ValidatePageSize(DEFAULT_PAGE_SIZE)
        direction = DEFAULT_DIRECTION
    ELSE:
        page_size = ValidatePageSize(cursor.page_size)
        direction = cursor.direction
    start_seq = ResolveStartSeq(conv_id, cursor)
    // First read from Redis recent window
    recent = RedisRange("conv_recent:" + conv_id, start_seq, page_size, direction)
    IF recent.size < page_size:
        remaining = page_size - recent.size
        cold = DbFetchHistory(conv_id, start_seq, remaining, direction)
        merged = MergeBySeq(recent, cold)
        RETURN 200, {messages: merged, next_cursor: MakeCursor(conv_id, merged)}
    ELSE:
        RETURN 200, {messages: recent, next_cursor: MakeCursor(conv_id, recent)}
FUNCTION ResolveStartSeq(conv_id, cursor):
    // Continue from the cursor when there is one; otherwise start at the conversation's latest seq.
    IF cursor IS NOT NULL:
        RETURN cursor.last_seq
    RETURN GetLatestSeq(conv_id)
// ======================= push-service =======================
FUNCTION TriggerPush(recipient_id, conv_id, seq):
    // Sends a push notification to each of the recipient's devices, with one fallback attempt.
    FOR EACH d IN ListDevices(recipient_id):
        primary = SelectAdapter(d.platform)
        notification = MakePushPayload(conv_id, seq)
        outcome = primary.Send(d.push_token, notification)
        IF outcome.success:
            CONTINUE
        // Primary channel failed: try the platform's fallback if it has one, then log the failure.
        backup = SelectFallbackAdapter(d.platform)
        IF backup IS NOT NULL:
            backup.Send(d.push_token, notification)
        RecordPushFailure(d, outcome.error)
FUNCTION SelectAdapter(platform):
    // Platform-to-adapter lookup; anything unlisted gets the generic adapter.
    adapters = {"ios": APNSAdapter, "android": FCMAdapter}
    IF platform IN adapters:
        RETURN adapters[platform]
    RETURN GenericAdapter
// ======================= media-service =======================
FUNCTION POST /media/upload WITH access_token AND file:
    // Validates, scans and stores an uploaded file, then records it as an attachment.
    user_id = ExtractSubject(access_token)
    acceptable = ValidateFileSize(file.size) AND ValidateMime(file.mime)
    IF NOT acceptable:
        RETURN 400, {error: "invalid_file"}
    scan_result = VirusScan(file)
    IF scan_result == "infected":
        RETURN 400, {error: "virus_detected"}
    checksum = ComputeChecksum(file)
    url = StoreFileObject(file) // object storage
    attachment_id = InsertAttachment(url, file.size, checksum, user_id)
    RETURN 201, {attachment_id: attachment_id, url: url}
// ======================= reliability: retry & queues =======================
BACKGROUND WORKER RetryDelivery:
    // Drains the retry queue; a task that reaches no device is requeued with the next backoff.
    WHILE TRUE:
        job = PopRetryQueue()
        IF job IS NULL:
            Sleep(SHORT_INTERVAL)
            CONTINUE
        delivered = FALSE
        // Every online device is attempted, even after one has accepted the payload.
        FOR EACH device IN ListOnlineDevices(job.recipient_id):
            outcome = SendToDevice(device, job.payload)
            IF outcome == "sent":
                delivered = TRUE
        IF NOT delivered:
            Requeue(job, BackoffNext(job))
// Slow-consumer handling: stop outbound sends on the connection, then notify the client.
FUNCTION ApplyBackpressure(conn):
    PauseOutbound(conn)
    NotifyClientWindow(conn)
// ======================= security & auditing =======================
FUNCTION VerifyAccessToken(token):
    // A token passes only if it is valid and has not expired; expiry is checked second.
    IF NOT TokenIsValid(token):
        RETURN FALSE
    RETURN NotExpired(token)
FUNCTION ContentAuditBlocks(content):
    // TRUE when the audit score exceeds the threshold; blocked content is logged.
    risk = RunContentAudit(content)
    IF risk <= AUDIT_THRESHOLD:
        RETURN FALSE
    LogAuditBlock(content)
    RETURN TRUE
// ======================= metrics & monitoring =======================
// Adds `value` to the named counter.
FUNCTION UpdateMetrics(metric_name, value):
    MetricsCounter[metric_name] += value
BACKGROUND WORKER ObserveAckLatency:
    // Measures the time between the server ack being sent and the client ack arriving.
    ON ServerAckSent(message_id, timestamp):
        RecordPendingAck(message_id, timestamp)
    ON ClientAckReceived(message_id):
        start = PendingAckTimestamp(message_id)
        // A client ack can arrive for a message with no recorded start;
        // skip it rather than compute a latency from a null timestamp.
        IF start IS NULL:
            RETURN
        latency = Now() - start
        HistogramObserve("ack_latency", latency)
// ======================= rate limit =======================
FUNCTION TokenBucketTryConsume(key, tokens):
    // Takes `tokens` from the bucket when it holds enough; the bucket is saved only on success.
    bucket = LoadBucket(key)
    Refill(bucket)
    IF bucket.balance < tokens:
        RETURN FALSE
    bucket.balance -= tokens
    SaveBucket(bucket)
    RETURN TRUE
技术说明
该方案在中等复杂度下满足实时性、可靠性与安全性目标，伪代码聚焦核心处理路径，具体实现可依据工程栈替换持久层与缓存组件，但原则与流程保持一致。
将零散的业务需求快速转化为可评审、可落地的后端方案草稿;一次性生成“需求分析—架构概览—伪代码—实现建议”的完整输出,帮助团队在编码前统一共识;在迭代启动、模块重构、跨团队联调、招投标/预研、验收交付等关键节点,提供高质量的技术方案底稿;面向不同复杂度与系统类型(微服务、业务中台、数据处理等),稳定产出高可读、高一致性的逻辑设计;降低返工与沟通成本,提升交付速度与质量,推动试用转化为稳定付费。
迭代前快速产出伪代码与方案说明,明确数据模型、接口与业务流程;用作每日开发清单,减少返工与沟通时间
梳理服务拆分、模块边界与依赖;生成可评审的架构与流程材料,加速方案对齐与落地
制定统一实现规范与编码骨架;按复杂度拆分任务、安排人力与排期,降低交付风险
将模板生成的提示词复制粘贴到您常用的 Chat 应用(如 ChatGPT、Claude 等),即可直接对话使用,无需额外开发。适合个人快速体验和轻量使用场景。
把提示词模板转化为 API,您的程序可任意修改模板参数,通过接口直接调用,轻松实现自动化与批量处理。适合开发者集成与业务系统嵌入。
在 MCP client 中配置对应的 server 地址,让您的 AI 应用自动调用提示词模板。适合高级用户和团队协作,让提示词在不同 AI 工具间无缝衔接。
半价获取高级提示词-优惠即将到期